diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/SchemaUpgradeHealthCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/SchemaUpgradeHealthCheck.java deleted file mode 100644 index c83545a74b..0000000000 --- a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/SchemaUpgradeHealthCheck.java +++ /dev/null @@ -1,28 +0,0 @@ -package tech.ebp.oqm.core.api.health; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import org.eclipse.microprofile.health.*; -import tech.ebp.oqm.core.api.service.schemaVersioning.ObjectSchemaUpgradeService; - -@Startup -@ApplicationScoped -public class SchemaUpgradeHealthCheck implements HealthCheck { - - @Inject - ObjectSchemaUpgradeService objectSchemaUpgradeService; - - @Override - public HealthCheckResponse call() { - HealthCheckResponseBuilder responseBuilder = HealthCheckResponse.builder() - .name("Database Schema Upgrade"); - - if(objectSchemaUpgradeService.upgradeRan()){ - responseBuilder = responseBuilder.up(); - } else { - responseBuilder = responseBuilder.down(); - } - - return responseBuilder.build(); - } -} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/LivenessHealthCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/LivenessHealthCheck.java new file mode 100644 index 0000000000..790c7faa08 --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/LivenessHealthCheck.java @@ -0,0 +1,28 @@ +package tech.ebp.oqm.core.api.health.service; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import org.eclipse.microprofile.health.Liveness; +import tech.ebp.oqm.core.api.health.utils.GenericHealthCheck; +import tech.ebp.oqm.core.api.health.utils.HasLivenessCheck; +import tech.ebp.oqm.core.api.health.utils.HealthStatus; + +@Liveness +@ApplicationScoped +public class LivenessHealthCheck extends GenericHealthCheck { + + @Inject + LivenessHealthCheck(Instance providers) { + super("Service Health - Liveness", providers); + } + + public LivenessHealthCheck() { + super("Service Health - Liveness", null); + } + + @Override + protected HealthStatus getStatus(HasLivenessCheck provider) { + return provider.getLivenessStatus(); + } +} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/ReadinessHealthCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/ReadinessHealthCheck.java new file mode 100644 index 0000000000..4b8715dca2 --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/ReadinessHealthCheck.java @@ -0,0 +1,28 @@ +package tech.ebp.oqm.core.api.health.service; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import org.eclipse.microprofile.health.Readiness; +import tech.ebp.oqm.core.api.health.utils.GenericHealthCheck; +import tech.ebp.oqm.core.api.health.utils.HasReadinessCheck; +import tech.ebp.oqm.core.api.health.utils.HealthStatus; + +@Readiness +@ApplicationScoped +public class ReadinessHealthCheck extends GenericHealthCheck { + + @Inject + ReadinessHealthCheck(Instance providers) { + super("Service Health - Readiness", providers); + } + + public ReadinessHealthCheck() { + super("Service Health - Readiness", null); + } + + @Override + protected HealthStatus getStatus(HasReadinessCheck provider) { + return provider.getReadinessStatus(); + } +} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/StartupHealthCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/StartupHealthCheck.java new file mode 100644 index 0000000000..4fc408bc15 --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/service/StartupHealthCheck.java @@ -0,0 +1,28 @@ +package tech.ebp.oqm.core.api.health.service; + +import io.quarkus.runtime.Startup; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import tech.ebp.oqm.core.api.health.utils.GenericHealthCheck; +import tech.ebp.oqm.core.api.health.utils.HasStartupCheck; +import tech.ebp.oqm.core.api.health.utils.HealthStatus; + +@Startup +@ApplicationScoped +public class StartupHealthCheck extends GenericHealthCheck { + + @Inject + StartupHealthCheck(Instance providers) { + super("Service Health - Startup", providers); + } + + public StartupHealthCheck() { + super("Service Health - Startup", null); + } + + @Override + protected HealthStatus getStatus(HasStartupCheck provider) { + return provider.getStartupStatus(); + } +} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/GenericHealthCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/GenericHealthCheck.java new file mode 100644 index 0000000000..e884222960 --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/GenericHealthCheck.java @@ -0,0 +1,34 @@ +package tech.ebp.oqm.core.api.health.utils; + +import jakarta.enterprise.inject.Instance; +import org.eclipse.microprofile.health.HealthCheck; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.eclipse.microprofile.health.HealthCheckResponseBuilder; + +public abstract class GenericHealthCheck implements HealthCheck { + private final String healthCheckName; + private final Instance providers; + + public GenericHealthCheck(String healthCheckName, Instance providers) { + this.healthCheckName = healthCheckName; + this.providers = providers; + } + + protected abstract HealthStatus getStatus(T provider); + + @Override + public HealthCheckResponse call() { + HealthCheckResponseBuilder builder = HealthCheckResponse.named(this.healthCheckName); + boolean allUp = true; + + for (T provider : this.providers) { + HealthStatus status = this.getStatus(provider); + boolean up = status.isUp(); + allUp &= up; + builder.withData(status.getName() + ".up", up); + builder.withData(status.getName() + ".status", status.getStatusMessage()); + } + + return (allUp ? builder.up() : builder.down()).build(); + } +} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasHealthCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasHealthCheck.java new file mode 100644 index 0000000000..043b75ad9b --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasHealthCheck.java @@ -0,0 +1,3 @@ +package tech.ebp.oqm.core.api.health.utils; + +public interface HasHealthCheck {} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasLivenessCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasLivenessCheck.java new file mode 100644 index 0000000000..a202084fdb --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasLivenessCheck.java @@ -0,0 +1,5 @@ +package tech.ebp.oqm.core.api.health.utils; + +public interface HasLivenessCheck extends HasHealthCheck { + HealthStatus getLivenessStatus(); +} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasReadinessCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasReadinessCheck.java new file mode 100644 index 0000000000..a7bc7001d2 --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasReadinessCheck.java @@ -0,0 +1,5 @@ +package tech.ebp.oqm.core.api.health.utils; + +public interface HasReadinessCheck extends HasHealthCheck { + HealthStatus getReadinessStatus(); +} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasStartupCheck.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasStartupCheck.java new file mode 100644 index 0000000000..27d07c44df --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HasStartupCheck.java @@ -0,0 +1,5 @@ +package tech.ebp.oqm.core.api.health.utils; + +public interface HasStartupCheck extends HasHealthCheck { + HealthStatus getStartupStatus(); +} diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HealthStatus.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HealthStatus.java new file mode 100644 index 0000000000..16f016b763 --- /dev/null +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/health/utils/HealthStatus.java @@ -0,0 +1,24 @@ +package tech.ebp.oqm.core.api.health.utils; + +import lombok.Getter; + +@Getter +public class HealthStatus { + private final String name; + private volatile boolean up = false; + private volatile String statusMessage = "Status not set"; + + public HealthStatus(String name) { + this.name = name; + } + + public void markUp(String message) { + up = true; + statusMessage = message; + } + + public void markDown(String message) { + up = false; + statusMessage = message; + } +} \ No newline at end of file diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/scheduled/LifecycleBean.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/scheduled/LifecycleBean.java index 700f79ede6..93fcf8818a 100644 --- a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/scheduled/LifecycleBean.java +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/scheduled/LifecycleBean.java @@ -8,10 +8,8 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; -import tech.ebp.oqm.core.api.model.object.upgrade.TotalUpgradeResult; import tech.ebp.oqm.core.api.service.TempFileService; import tech.ebp.oqm.core.api.service.mongo.CustomUnitService; -import tech.ebp.oqm.core.api.service.schemaVersioning.ObjectSchemaUpgradeService; import tech.ebp.oqm.core.api.service.serviceState.db.OqmDatabaseService; import java.nio.file.Paths; @@ -38,7 +36,7 @@ public class LifecycleBean { TempFileService tempFileService; @Inject - OqmDatabaseService dbService; + OqmDatabaseService dbService; private ZonedDateTime startDateTime; diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/scheduled/MongoDbInit.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/scheduled/MongoDbInit.java index 94bcc062a8..0d262df381 100644 --- a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/scheduled/MongoDbInit.java +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/scheduled/MongoDbInit.java @@ -6,21 +6,23 @@ import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import tech.ebp.oqm.core.api.model.object.upgrade.TotalUpgradeResult; import tech.ebp.oqm.core.api.service.mongo.InventoryItemService; -import tech.ebp.oqm.core.api.service.mongo.MongoDbAwareService; import tech.ebp.oqm.core.api.service.mongo.MongoService; import tech.ebp.oqm.core.api.service.schemaVersioning.ObjectSchemaUpgradeService; import tech.ebp.oqm.core.api.service.serviceState.InstanceMutexService; import tech.ebp.oqm.core.api.service.serviceState.db.DbCacheEntry; import tech.ebp.oqm.core.api.service.serviceState.db.OqmDatabaseService; +import tech.ebp.oqm.core.api.health.utils.HealthStatus; +import tech.ebp.oqm.core.api.health.utils.HasReadinessCheck; import java.util.Optional; @Singleton @Slf4j -public class MongoDbInit { +public class MongoDbInit implements HasReadinessCheck { @Inject InventoryItemService inventoryItemService; @@ -46,52 +48,64 @@ public class MongoDbInit { * This can be removed once all inventory items have mutexes. */ private void ensureItemMutexesExist() { - log.info("Ensuring inventory item mutexes exist."); - - for (DbCacheEntry curDb : this.oqmDatabaseService.getDatabases()) { - log.info("Ensuring inventory item mutexes exist for database: {}", curDb.getDbName()); - this.inventoryItemService.iterator(curDb.getDbId().toHexString()).forEachRemaining((item) -> { - this.instanceMutexService.register( - this.instanceMutexService.getMutexIdFor(curDb.getDbId().toHexString(), item) - ); - }); - log.info("DONE Ensuring inventory item mutexes exist for database: {}", curDb.getDbName()); + try { + log.info("Ensuring inventory item mutexes exist."); + + for (DbCacheEntry curDb : this.oqmDatabaseService.getDatabases()) { + log.info("Ensuring inventory item mutexes exist for database: {}", curDb.getDbName()); + this.inventoryItemService.iterator(curDb.getDbId().toHexString()).forEachRemaining((item) -> { + this.instanceMutexService.register(this.instanceMutexService.getMutexIdFor(curDb.getDbId().toHexString(), item)); + }); + log.info("DONE Ensuring inventory item mutexes exist for database: {}", curDb.getDbName()); + } + + log.info("DONE Ensuring inventory item mutexes exist."); + } catch (RuntimeException e) { + readinessStatus.markDown("Inventory item mutex initialization failed: " + e.getMessage()); + throw e; } - - log.info("DONE Ensuring inventory item mutexes exist."); } private void upgradeDbs(){ //TODO:: create flag service to check if things initted right. Setup filter to check this flag to reject requests until setup done. //TODO:: integrate into healthcheck. only DOWN if db upgrade failed - Optional schemaUpgradeResult = this.objectSchemaUpgradeService.updateSchema(); - if(schemaUpgradeResult.isEmpty()){ - log.warn("Did not upgrade schema at start."); - } else { - log.info("Schema upgrade result: {}", schemaUpgradeResult.get()); - //TODO:: rescan inv update stats + try { + Optional schemaUpgradeResult = this.objectSchemaUpgradeService.updateSchema(); + if(schemaUpgradeResult.isEmpty()){ + log.warn("Did not upgrade schema at start."); + } else { + log.info("Schema upgrade result: {}", schemaUpgradeResult.get()); + //TODO:: rescan inv update stats + } + } catch (RuntimeException e) { + readinessStatus.markDown("Database schema upgrade failed: " + e.getMessage()); + throw e; } } private void initDbs(){ - log.info("Initializing all databases."); - for(MongoService service : this.mongoServices){ - service.initDb(); + try { + log.info("Initializing all databases."); + for(MongoService service : this.mongoServices){ + service.initDb(); + } + log.info("DONE initializing all databases."); + } catch (RuntimeException e) { + readinessStatus.markDown("Database initialization failed: " + e.getMessage()); + throw e; } - log.info("DONE initializing all databases."); } - void onStart( - @Observes - StartupEvent ev - ) { - log.info("Starting initial db initialization tasks."); - - this.upgradeDbs(); - this.initDbs(); - this.ensureItemMutexesExist(); + @Getter + private final HealthStatus readinessStatus = new HealthStatus("Mongo DB Init"); - log.info("FINISHED initial db initialization tasks."); + void onStart(@Observes StartupEvent ev) { + readinessStatus.markDown("Startup initialization in progress"); + this.upgradeDbs(); + this.initDbs(); + this.ensureItemMutexesExist(); + readinessStatus.markUp("Initial db initialization tasks finished"); + log.info("FINISHED initial db initialization tasks."); } } diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/CustomUnitService.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/CustomUnitService.java index 00042eb256..a6602cc724 100644 --- a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/CustomUnitService.java +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/CustomUnitService.java @@ -7,15 +7,18 @@ import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; import jakarta.validation.Valid; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.bson.BsonDocument; import org.bson.types.ObjectId; +import tech.ebp.oqm.core.api.exception.db.DbNotFoundException; +import tech.ebp.oqm.core.api.health.utils.HasReadinessCheck; +import tech.ebp.oqm.core.api.health.utils.HealthStatus; import tech.ebp.oqm.core.api.model.collectionStats.CollectionStats; +import tech.ebp.oqm.core.api.model.rest.search.CustomUnitSearch; import tech.ebp.oqm.core.api.model.rest.unit.custom.NewCustomUnitRequest; import tech.ebp.oqm.core.api.model.units.CustomUnitEntry; import tech.ebp.oqm.core.api.model.units.UnitUtils; -import tech.ebp.oqm.core.api.model.rest.search.CustomUnitSearch; -import tech.ebp.oqm.core.api.exception.db.DbNotFoundException; import javax.measure.Unit; import java.util.ArrayList; @@ -26,94 +29,101 @@ */ @Slf4j @ApplicationScoped -public class CustomUnitService extends TopLevelMongoService { - - CustomUnitService() { - super(CustomUnitEntry.class); - } - - @PostConstruct - void readInUnits() { - log.info("Reading existing custom units from database..."); - - try ( - MongoCursor it = this.listIterator(null, Sorts.ascending("order"), null) - .batchSize(1) - .iterator() - ) { - while (it.hasNext()) { - CustomUnitEntry curEntry = it.next(); - log.debug("Registering unit {}", curEntry); - UnitUtils.registerAllUnits(curEntry); - } - } - log.info("Done reading in custom units."); - } - - public long getNextOrderValue() { - CustomUnitEntry entry = this.listIterator(null, Sorts.descending("order"), null).first(); - - if (entry == null) { - return 0; - } - return entry.getOrder() + 1L; - } - - public CustomUnitEntry getFromUnit(ClientSession clientSession, Unit unit) { - List matchList = this.listIterator( - clientSession, - Filters.eq("unitCreator.symbol", unit.getSymbol()), - null, - null - ).into(new ArrayList<>()); - - if (matchList.isEmpty()) { - throw new DbNotFoundException("Could not find custom unit " + unit, CustomUnitEntry.class); - } - if (matchList.size() != 1) { - throw new DbNotFoundException( - "Could not find custom unit " + unit + " - Too many matched units (" + matchList.size() + ")", - CustomUnitEntry.class - ); - } - - return matchList.get(0); - } - - public ObjectId add(ClientSession cs, @Valid CustomUnitEntry entry){ - log.info("Adding new custom unit."); - - UnitUtils.registerAllUnits(entry); - - ObjectId id = null; - if(cs == null){ - id = this.getTypedCollection().insertOne(entry).getInsertedId().asObjectId().getValue(); - } else { - id = this.getTypedCollection().insertOne(cs, entry).getInsertedId().asObjectId().getValue(); - } - entry.setId(id); - - log.info("New custom unit: {}", entry); - return entry.getId(); - } - - public ObjectId add(ClientSession cs, @Valid NewCustomUnitRequest ncur){ - log.info("Adding new custom unit."); - CustomUnitEntry newUnit = ncur.toCustomUnitEntry(this.getNextOrderValue()); - - return this.add(cs, newUnit); - } - - public List list(){ - return this.listIterator(null, Sorts.ascending("order"), null).into(new ArrayList<>()); - } - - public void removeAll(){ - this.getTypedCollection().deleteMany(new BsonDocument()); - } - - @Override - public int getCurrentSchemaVersion() { - return CustomUnitEntry.CUR_SCHEMA_VERSION; - } +public class CustomUnitService extends TopLevelMongoService implements HasReadinessCheck { + + CustomUnitService() { + super(CustomUnitEntry.class); + } + + @Getter + private final HealthStatus readinessStatus = new HealthStatus("Custom Unit Service"); + + @PostConstruct + void readInUnits() { + log.info("Reading existing custom units from database..."); + try ( + MongoCursor it = this.listIterator(null, Sorts.ascending("order"), null) + .batchSize(1) + .iterator() + ) { + while (it.hasNext()) { + CustomUnitEntry curEntry = it.next(); + log.debug("Registering unit {}", curEntry); + UnitUtils.registerAllUnits(curEntry); + } + } catch (Exception e) { + readinessStatus.markDown("Error occurred while reading custom units."); + log.error("Error occurred while reading in custom units from database.", e); + throw e; + } + log.info("Done reading in custom units."); + readinessStatus.markUp("Custom units read in and registered"); + } + + public long getNextOrderValue() { + CustomUnitEntry entry = this.listIterator(null, Sorts.descending("order"), null).first(); + + if (entry == null) { + return 0; + } + return entry.getOrder() + 1L; + } + + public CustomUnitEntry getFromUnit(ClientSession clientSession, Unit unit) { + List matchList = this.listIterator( + clientSession, + Filters.eq("unitCreator.symbol", unit.getSymbol()), + null, + null + ).into(new ArrayList<>()); + + if (matchList.isEmpty()) { + throw new DbNotFoundException("Could not find custom unit " + unit, CustomUnitEntry.class); + } + if (matchList.size() != 1) { + throw new DbNotFoundException( + "Could not find custom unit " + unit + " - Too many matched units (" + matchList.size() + ")", + CustomUnitEntry.class + ); + } + + return matchList.get(0); + } + + public ObjectId add(ClientSession cs, @Valid CustomUnitEntry entry) { + log.info("Adding new custom unit."); + + UnitUtils.registerAllUnits(entry); + + ObjectId id = null; + if (cs == null) { + id = this.getTypedCollection().insertOne(entry).getInsertedId().asObjectId().getValue(); + } else { + id = this.getTypedCollection().insertOne(cs, entry).getInsertedId().asObjectId().getValue(); + } + entry.setId(id); + + log.info("New custom unit: {}", entry); + return entry.getId(); + } + + public ObjectId add(ClientSession cs, @Valid NewCustomUnitRequest ncur) { + log.info("Adding new custom unit."); + CustomUnitEntry newUnit = ncur.toCustomUnitEntry(this.getNextOrderValue()); + + return this.add(cs, newUnit); + } + + public List list() { + return this.listIterator(null, Sorts.ascending("order"), null).into(new ArrayList<>()); + } + + public void removeAll() { + this.getTypedCollection().deleteMany(new BsonDocument()); + } + + @Override + public int getCurrentSchemaVersion() { + return CustomUnitEntry.CUR_SCHEMA_VERSION; + } } diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/ObjectSchemaUpgradeService.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/ObjectSchemaUpgradeService.java index bf88aff6fa..e9107205b2 100644 --- a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/ObjectSchemaUpgradeService.java +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/ObjectSchemaUpgradeService.java @@ -2,7 +2,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.mongodb.TransactionOptions; import com.mongodb.client.ClientSession; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; @@ -21,6 +20,8 @@ import tech.ebp.oqm.core.api.config.CoreApiInteractingEntity; import tech.ebp.oqm.core.api.exception.ClassUpgraderNotFoundException; import tech.ebp.oqm.core.api.exception.UpgradeFailedException; +import tech.ebp.oqm.core.api.health.utils.HasReadinessCheck; +import tech.ebp.oqm.core.api.health.utils.HealthStatus; import tech.ebp.oqm.core.api.model.object.MainObject; import tech.ebp.oqm.core.api.model.object.Versionable; import tech.ebp.oqm.core.api.model.object.history.details.FromSchemaUpgradeDetail; @@ -33,7 +34,15 @@ import tech.ebp.oqm.core.api.model.object.upgrade.TotalUpgradeResult; import tech.ebp.oqm.core.api.model.object.upgrade.UpgradeCreatedObjectsResults; import tech.ebp.oqm.core.api.model.object.upgrade.UpgradeOverallCreatedObjectsResults; -import tech.ebp.oqm.core.api.service.mongo.*; +import tech.ebp.oqm.core.api.service.mongo.InteractingEntityService; +import tech.ebp.oqm.core.api.service.mongo.InventoryItemService; +import tech.ebp.oqm.core.api.service.mongo.ItemCheckoutService; +import tech.ebp.oqm.core.api.service.mongo.MongoDbAwareService; +import tech.ebp.oqm.core.api.service.mongo.MongoHistoriedObjectService; +import tech.ebp.oqm.core.api.service.mongo.MongoHistoryService; +import tech.ebp.oqm.core.api.service.mongo.StorageBlockService; +import tech.ebp.oqm.core.api.service.mongo.StoredService; +import tech.ebp.oqm.core.api.service.mongo.TopLevelMongoService; import tech.ebp.oqm.core.api.service.mongo.transactions.AppliedTransactionService; import tech.ebp.oqm.core.api.service.mongo.utils.MongoSessionWrapper; import tech.ebp.oqm.core.api.service.schemaVersioning.upgraders.ObjectSchemaUpgrader; @@ -49,7 +58,13 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -58,12 +73,11 @@ import static com.mongodb.client.model.Filters.eq; import static com.mongodb.client.model.Filters.lt; -import static com.mongodb.client.model.Filters.type; @ApplicationScoped @Slf4j -public class ObjectSchemaUpgradeService { - +public class ObjectSchemaUpgradeService implements HasReadinessCheck { + /** Map of upgraders to provide easy access to which upgraders for which object class. */ private Map, ObjectSchemaUpgrader> upgraderMap; /** The main oqm database service. */ @@ -75,13 +89,16 @@ public class ObjectSchemaUpgradeService { /** Data structure for easy grouping of services that can/ should have their schema updated at the same time, and those groups in order. */ private List>> dbAwareUpgradeGroups; private TotalUpgradeResult startupUpgradeResult = null; - + @Getter(AccessLevel.PRIVATE) CoreApiInteractingEntity coreApiInteractingEntity; - + @ConfigProperty(name = "quarkus.uuid") String instanceUuid; - + + @Getter + private final HealthStatus readinessStatus = new HealthStatus("Object Schema Upgrade Service"); + @Inject public ObjectSchemaUpgradeService( CoreApiInteractingEntity coreApiInteractingEntity, @@ -98,14 +115,14 @@ public ObjectSchemaUpgradeService( this.coreApiInteractingEntity = coreApiInteractingEntity; this.instanceUuid = instanceUuid; this.oqmDatabaseService = oqmDatabaseService; - + this.topLevelServices = new LinkedHashMap<>(); Stream.of( interactingEntityService ).forEachOrdered((service)->{ this.topLevelServices.put(service.getClazz(), service); }); - + //This insertion order here is the order of which these are each processed. this.dbAwareUpgradeGroups = List.of( List.of( @@ -134,7 +151,7 @@ public ObjectSchemaUpgradeService( return map1; } ); - + this.upgraderMap = Stream.of( new HistoryEventSchemaUpgrader(), new InteractingEntitySchemaUpgrader(), @@ -155,27 +172,27 @@ public ObjectSchemaUpgradeService( } ); } - + public Optional getStartupUpgradeResult() { return Optional.ofNullable(this.startupUpgradeResult); } - + public boolean upgradeRan() { return this.startupUpgradeResult != null; } - - + + public ObjectSchemaUpgrader getUpgrader(@NonNull Class clazz) throws ClassUpgraderNotFoundException { if (!this.upgraderMap.containsKey(clazz)) { throw new ClassUpgraderNotFoundException(clazz); } return (ObjectSchemaUpgrader) this.upgraderMap.get(clazz); } - + private void clearUpgraderMap() { this.upgraderMap = null; } - + private void processCreatedObjects( ClientSession cs, String oqmDbId, @@ -185,25 +202,25 @@ private void processCreatedObjects( ) { MongoDbAwareService service = (MongoDbAwareService) this.oqmDbServices.get(newObjClass); ObjectSchemaUpgrader upgrader = (ObjectSchemaUpgrader) this.upgraderMap.get(newObjClass); - + if (service == null) { throw new IllegalStateException("Service for class not found: " + newObjClass.getName()); } if (upgrader == null) { throw new IllegalStateException("Upgrader for class not found: " + newObjClass.getName()); } - + List createdObjs = newObjects.stream() .map((no)->upgrader.upgrade(no).getUpgradedObject()) .filter(Optional::isPresent) .map(Optional::get) .toList(); - + createdObjs.stream() .forEach((curCreated)->{ ObjectId newId = service.getTypedCollection(oqmDbId) .insertOne(cs, curCreated).getInsertedId().asObjectId().getValue(); - + if (service instanceof MongoHistoriedObjectService) { ((MongoHistoriedObjectService) service).getHistoryService() .addHistoryFor( @@ -220,7 +237,7 @@ private void processCreatedObjects( } ); } - + private void processCreatedObjects( String upgradeId, ClientSession cs, @@ -247,7 +264,7 @@ private void processCreatedObjects( ); }); } - + /** * Handles the actual upgrading of schema data in a collection. Iterates over all elements in collection, upgrading each (if necessary). * @@ -273,15 +290,15 @@ private CollectionUpgradeResult upgradeOqmCollection( ObjectSchemaUpgrader objectVersionBumper = this.getUpgrader(objectClass); outputBuilder.collectionClass(objectClass); outputBuilder.collectionName(documentCollection.getNamespace().getCollectionName()); - + UpgradeOverallCreatedObjectsResults createdObjectResults = new UpgradeOverallCreatedObjectsResults(); outputBuilder.createdObjects(createdObjectResults); - + StopWatch sw = StopWatch.createStarted(); long numUpdated = 0; long numNotUpgraded = 0; long numDeleted = 0; - + if (objectVersionBumper.upgradesAvailable()) { try ( MongoCursor it = documentCollection @@ -301,7 +318,7 @@ private CollectionUpgradeResult upgradeOqmCollection( upgradedObject.map(Versionable::getSchemaVersion).orElse(-1) ) .build(); - + if (result.isDelObj()) { log.info("Deleting object with id {} in collection {}", doc.getObjectId("_id"), documentCollection.getNamespace().getCollectionName()); typedCollection.deleteOne(cs, eq("_id", result.getObjectId())); @@ -310,8 +327,8 @@ private CollectionUpgradeResult upgradeOqmCollection( numNotUpgraded++; } else { numUpdated++; - - + + log.info("Updating object db entry with id {} in collection {}", doc.getObjectId("_id"), documentCollection.getNamespace().getCollectionName()); T previous = typedCollection.findOneAndReplace( cs, @@ -324,7 +341,7 @@ private CollectionUpgradeResult upgradeOqmCollection( if (previous == null) { throw new RuntimeException("Previous object was not upgraded..."); } - + //TODO:: support top level collections to do these things if (oqmDbId != null) { //add upgrade event, if applicable @@ -341,7 +358,7 @@ private CollectionUpgradeResult upgradeOqmCollection( } } } - + //TODO:: support top level collections to do these things if (oqmDbId != null) { if (result.hasUpgradedCreatedObjects()) { @@ -362,16 +379,16 @@ private CollectionUpgradeResult upgradeOqmCollection( throw new RuntimeException(e); } } - + sw.stop(); outputBuilder.timeTaken(Duration.of(sw.getTime(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS)) .numObjectsUpgraded(numUpdated) .numObjectsNotUpgraded(numNotUpgraded) .numObjectsDeleted(numDeleted); - + return outputBuilder.build(); } - + /** * Handles upgrading a particular collection. Wrapper for the other method, getting specific details from the mongo service. * @@ -402,7 +419,7 @@ private CollectionUpgradeResult upgradeOqmCollection(Stri outputBuilder ); }); - + Optional> histCollOp = Optional.empty(); if (historiedService) { log.info("Service is historied, processing history events."); @@ -422,20 +439,20 @@ private CollectionUpgradeResult upgradeOqmCollection(Stri }) ); } - + collectionFuture.get(); - + if (histCollOp.isPresent()) { ((HistoriedCollectionUpgradeResult.HistoriedCollectionUpgradeResultBuilder) outputBuilder).historyCollectionUpgradeResult(histCollOp.get().get()); } - + log.info("DONE Updating schema of oqm database service {} in ", service.getClass()); return outputBuilder.build(); } - + private CollectionUpgradeResult upgradeOqmCollection(String upgradeId, ClientSession dbCs, TopLevelMongoService service) { log.info("Updating schema of top level oqm database service {}", service.getClass()); - + CollectionUpgradeResult result = this.upgradeOqmCollection( upgradeId, dbCs, @@ -446,11 +463,11 @@ private CollectionUpgradeResult upgradeOqmCollection(Stri service.getClazz(), CollectionUpgradeResult.builder() ); - + log.info("DONE Updating schema of oqm database service {} in ", service.getClass()); return result; } - + /** * This method upgrades a particular oqm db to the latest schema. *

@@ -466,7 +483,7 @@ private OqmDbUpgradeResult upgradeOqmDb(String upgradeId, OqmMongoDatabase oqmDb .dbName(oqmDb.getName()); List upgradeResults = new ArrayList<>(); outputBuilder.collectionUpgradeResults(upgradeResults); - + StopWatch dbUpgradeTime = StopWatch.createStarted(); ClientSession cs = null; try { @@ -499,15 +516,15 @@ private OqmDbUpgradeResult upgradeOqmDb(String upgradeId, OqmMongoDatabase oqmDb cs.close(); } } - + dbUpgradeTime.stop(); outputBuilder.timeTaken(Duration.of(dbUpgradeTime.getTime(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS)); - + log.info("Done updating oqm database: {}", oqmDb); - + return outputBuilder.build(); } - + /** * This method is responsible for upgrading all the collections/ databases handled by the core api. *

@@ -524,8 +541,8 @@ public Optional updateSchema(boolean force) { } final String upgradeId = UUID.randomUUID().toString(); log.info("Upgrading the schema held in the Database. Id: {}", upgradeId); - - + + AtomicReference result = new AtomicReference<>(); try (MongoSessionWrapper csw = new MongoSessionWrapper(this.oqmDatabaseService)) { csw.runTransaction(true, (ClientSession cs)->{ @@ -536,17 +553,17 @@ public Optional updateSchema(boolean force) { {//top level migration log.info("Upgrading top level collections."); List topLevelResults = new ArrayList<>(); - + //TODO:: session wrapper for all things not just one then the other for (TopLevelMongoService curTopLevelService : this.topLevelServices.values()) { topLevelResults.add(this.upgradeOqmCollection(upgradeId, cs, curTopLevelService)); } - - + + totalResultBuilder.topLevelUpgradeResults(topLevelResults); log.info("DONE upgrading top level results."); } - + List> resultMap = new ArrayList<>(); for (OqmMongoDatabase curDb : this.oqmDatabaseService.listIterator()) { resultMap.add(CompletableFuture.supplyAsync(()->{ @@ -563,18 +580,18 @@ public Optional updateSchema(boolean force) { } }) .toList()); - + cs.commitTransaction(); - + totalTime.stop(); totalResultBuilder.timeTaken(totalTime.getDuration()); result.set(totalResultBuilder.build()); - + log.info("DONE upgrading the schema held in the Database."); log.info("Running post upgrade tasks."); - + cs.startTransaction(); - + TotalUpgradeResult innerResult = result.get(); if (innerResult.wasUpgraded()) { innerResult.getTopLevelUpgradeResults().stream() @@ -593,27 +610,32 @@ public Optional updateSchema(boolean force) { log.info("Running post upgrade tasks for collection: {} / {}", curResult.getCollectionName(), curResult.getCollectionClass()); MongoDbAwareService service = this.oqmDbServices.get(curResult.getCollectionClass()); service.runPostUpgrade(curDbResult.getDbName(), cs, curResult); - + if (service instanceof MongoHistoriedObjectService) { ((MongoHistoriedObjectService) service).getHistoryService().runPostUpgrade(curDbResult.getDbName(), cs, curResult); } }); }); - + } else { log.info("No object upgraded, no reason to run post upgrade tasks."); } - + log.info("DONE running post-upgrade tasks."); }); - } - + } catch(Exception e) { + readinessStatus.markDown("Schema upgrade failed with id: " + upgradeId + ". Error: " + e.getMessage()); + log.error("Failed to upgrade schema with id: " + upgradeId, e); + throw new UpgradeFailedException("Failed to upgrade schema with id: " + upgradeId, e); + } + this.startupUpgradeResult = result.get(); - + readinessStatus.markUp("Schema upgrade completed with id: " + upgradeId); + log.info("DONE running post-upgrade tasks."); return this.getStartupUpgradeResult(); } - + public Optional updateSchema() { return this.updateSchema(false); } diff --git a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/serviceState/db/OqmDatabaseService.java b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/serviceState/db/OqmDatabaseService.java index 83e2245c99..05fc285d10 100644 --- a/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/serviceState/db/OqmDatabaseService.java +++ b/software/core/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/serviceState/db/OqmDatabaseService.java @@ -13,12 +13,13 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.bson.conversions.Bson; -import org.bson.types.ObjectId; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; +import tech.ebp.oqm.core.api.health.utils.HasReadinessCheck; import tech.ebp.oqm.core.api.model.collectionStats.CollectionStats; import tech.ebp.oqm.core.api.model.rest.search.OqmMongoDbSearch; import tech.ebp.oqm.core.api.service.mongo.TopLevelMongoService; +import tech.ebp.oqm.core.api.health.utils.HealthStatus; import java.util.ArrayList; import java.util.List; @@ -33,7 +34,7 @@ */ @Slf4j @ApplicationScoped -public class OqmDatabaseService extends TopLevelMongoService { +public class OqmDatabaseService extends TopLevelMongoService implements HasReadinessCheck { @Getter @Setter(AccessLevel.PRIVATE) @@ -46,6 +47,9 @@ public class OqmDatabaseService extends TopLevelMongoService tests() throws IOException { List tests = new ArrayList<>();