Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class UpdateHistorySanityCheckPlugin(
interval = Span(100, Millis),
)
eventually {
scan.automation.store.updateHistory
scan.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
}(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,14 +1093,14 @@ class DecentralizedSynchronizerMigrationIntegrationTest

withClueAndLog("Backfilled history includes ACS import") {
eventually() {
sv1ScanLocalBackend.appState.store.updateHistory.sourceHistory
sv1ScanLocalBackend.appState.automation.updateHistory.sourceHistory
.migrationInfo(1L)
.futureValue
.exists(_.complete) should be(true)
}

val backfilledUpdates =
sv1ScanLocalBackend.appState.store.updateHistory
sv1ScanLocalBackend.appState.automation.updateHistory
.getAllUpdates(None, PageLimit.tryCreate(1000))
.futureValue
backfilledUpdates.collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ class ScanHistoryBackfillingIntegrationTest
)(
"History marked as free of corrupt snapshots",
_ => {
sv1ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
sv2ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
sv1ScanBackend.appState.automation.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
sv2ScanBackend.appState.automation.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
},
)

Expand All @@ -307,15 +307,15 @@ class ScanHistoryBackfillingIntegrationTest
)(
"Backfilling is complete only on the founding SV",
_ => {
sv1ScanBackend.appState.store.updateHistory
sv1ScanBackend.appState.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
// Update history is complete at this point, but the status endpoint only reports
// as complete if the txlog is also backfilled
sv1ScanBackend.getBackfillingStatus().complete shouldBe false
readUpdateHistoryFromScan(sv1ScanBackend) should not be empty

sv2ScanBackend.appState.store.updateHistory
sv2ScanBackend.appState.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.InProgress(false, false))
sv2ScanBackend.getBackfillingStatus().complete shouldBe false
Expand Down Expand Up @@ -354,12 +354,12 @@ class ScanHistoryBackfillingIntegrationTest
)(
"All backfilling is complete",
_ => {
sv1ScanBackend.appState.store.updateHistory
sv1ScanBackend.appState.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
// Update history is complete, TxLog is not
sv1ScanBackend.getBackfillingStatus().complete shouldBe false
sv2ScanBackend.appState.store.updateHistory
sv2ScanBackend.appState.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
// Update history is complete, TxLog is not
Expand Down Expand Up @@ -444,7 +444,7 @@ class ScanHistoryBackfillingIntegrationTest
clue("Compare scan history with participant update stream") {
compareHistory(
sv1Backend.participantClient,
sv1ScanBackend.appState.store.updateHistory,
sv1ScanBackend.appState.automation.updateHistory,
ledgerBeginSv1,
)
}
Expand Down Expand Up @@ -554,7 +554,7 @@ class ScanHistoryBackfillingIntegrationTest

private def allUpdatesFromScanBackend(scanBackend: ScanAppBackendReference) = {
// Need to use the store directly, as the HTTP endpoint refuses to return data unless it's completely backfilled
scanBackend.appState.store.updateHistory
scanBackend.appState.automation.updateHistory
.getAllUpdates(None, PageLimit.tryCreate(1000))
.futureValue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ class ScanTimeBasedIntegrationTest
"Wait for backfilling to complete, as the ACS snapshot trigger is paused until then"
) {
eventually() {
sv1ScanBackend.automation.store.updateHistory
sv1ScanBackend.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
advanceTime(sv1ScanBackend.config.automation.pollingInterval.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class ScanTotalSupplyBigQueryIntegrationTest
case db: DbStorage => db
case s => fail(s"non-DB storage configured, unsupported for BigQuery: ${s.getClass}")
}
val sourceHistoryId = sv1ScanBackend.appState.store.updateHistory.historyId
val sourceHistoryId = sv1ScanBackend.appState.automation.updateHistory.historyId

copyTableToBigQuery(
"update_history_creates",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class UpdateHistoryIntegrationTest
eventually() {
compareHistory(
sv1Backend.participantClient,
sv1ScanBackend.appState.store.updateHistory,
sv1ScanBackend.appState.automation.updateHistory,
ledgerBeginSv1,
)
}
Expand All @@ -179,30 +179,16 @@ class UpdateHistoryIntegrationTest
.lookupUserWallet(aliceWalletClient.config.ledgerApiUser)
.futureValue
.getOrElse(throw new RuntimeException("Alice wallet should exist"))
.store
.automation
.updateHistory,
ledgerBeginAlice,
true,
)
}
eventually() {
compareHistory(
sv1Backend.participantClient,
sv1Backend.appState.svStore.updateHistory,
ledgerBeginSv1,
)
}
eventually() {
compareHistory(
sv1Backend.participantClient,
sv1Backend.appState.dsoStore.updateHistory,
ledgerBeginSv1,
)
}
eventually() {
compareHistory(
aliceValidatorBackend.participantClient,
aliceValidatorBackend.appState.store.updateHistory,
aliceValidatorBackend.appState.automation.updateHistory,
ledgerBeginAlice,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.wallet.subscriptions
import org.lfdecentralizedtrust.splice.config.ConfigTransforms
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithSharedEnvironment
import org.lfdecentralizedtrust.splice.store.Limit
import org.lfdecentralizedtrust.splice.store.{Limit, PageLimit}
import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.AnsSubscriptionRenewalPaymentTrigger
import org.lfdecentralizedtrust.splice.sv.config.InitialAnsConfig
import org.lfdecentralizedtrust.splice.util.{
Expand Down Expand Up @@ -1226,12 +1226,27 @@ class WalletTxLogIntegrationTest
logEntry.senderHoldingFees should beWithin(0, smallAmount)
logEntry.amuletPrice shouldBe amuletPrice
}
val expectedTxLogEntries = Seq(renewTxLog, creationTxLog, tapTxLog)
checkTxHistory(
bobValidatorWalletClient,
Seq(renewTxLog, creationTxLog, tapTxLog),
expectedTxLogEntries,
trafficTopups = IgnoreTopupsDevNet,
)

clue("Check UpdateHistory works for external parties") {
inside(
bobValidatorBackend.appState.walletManager
.valueOrFail("WalletManager is expected to be defined")
.externalPartyWalletManager
.lookupExternalPartyWallet(onboarding.party)
.valueOrFail(s"Expected ${onboarding.party} to have an external party wallet")
.updateHistory
.getAllUpdates(None, PageLimit.Max)
.futureValue
) { history =>
history.size should be >= expectedTxLogEntries.size
}
}
}

"handle failed automation (direct transfer)" in { implicit env =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ trait UpdateHistoryTestUtil extends TestCommon {
scanBackend: ScanAppBackendReference,
scanClient: ScanAppClientReference,
): Assertion = {
val historyFromStore = scanBackend.appState.store.updateHistory
val historyFromStore = scanBackend.appState.automation.updateHistory
.getAllUpdates(
None,
PageLimit.tryCreate(1000),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- if the only 2 store descriptors belong to the SV app, that means we can truncate the update_history tables

DO $$
DECLARE
descriptors TEXT[];
BEGIN

-- array equality (ordered) ensures that exactly these two, no more, no less, are there <=> it's the SV app
select array_agg(store_name order by store_name) into descriptors
from update_history_descriptors;

IF (descriptors = '{"DbSvDsoStore", "DbSvSvStore"}' OR descriptors = '{"DbSplitwellStore"}') THEN
RAISE NOTICE 'Truncating update history tables as only SV/Splitwell app descriptors are present. Descriptors: %', descriptors::text;
EXECUTE 'TRUNCATE TABLE update_history_assignments CASCADE';
EXECUTE 'TRUNCATE TABLE update_history_unassignments CASCADE';
EXECUTE 'TRUNCATE TABLE update_history_backfilling CASCADE';
EXECUTE 'TRUNCATE TABLE update_history_creates CASCADE';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: The acs_snapshot_data table has a foreign key constraint to update_history_creates. We don't need to truncate acs_snapshot_data because the SV app doesn't write any data into it (and we'd rather fail if there is any unexpected data in there after all).

EXECUTE 'TRUNCATE TABLE update_history_exercises CASCADE';
EXECUTE 'TRUNCATE TABLE update_history_transactions CASCADE';
EXECUTE 'TRUNCATE TABLE update_history_last_ingested_offsets CASCADE';
EXECUTE 'TRUNCATE TABLE update_history_descriptors CASCADE';
ELSE
RAISE NOTICE 'This is not the SV or Splitwell app, NOT truncating update history tables. Descriptors: %', descriptors::text;
END IF;

END $$;
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.reflect.ClassTag

/** Shared base class for running ingestion and task-handler automation in applications. */
abstract class AutomationService(
private val automationConfig: AutomationConfig,
protected val automationConfig: AutomationConfig,
clock: Clock,
domainTimeSync: DomainTimeSynchronization,
domainUnpausedSync: DomainUnpausedSynchronization,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.lfdecentralizedtrust.splice.store.{
AppStoreWithIngestion,
DomainTimeSynchronization,
DomainUnpausedSynchronization,
UpdateHistory,
}
import com.digitalasset.canton.time.{Clock, WallClock}
import com.digitalasset.canton.tracing.TraceContext
Expand All @@ -37,7 +38,6 @@ abstract class SpliceAppAutomationService[Store <: AppStore](
ledgerClient: SpliceLedgerClient,
retryProvider: RetryProvider,
ingestFromParticipantBegin: Boolean,
ingestUpdateHistoryFromParticipantBegin: Boolean,
parametersConfig: SpliceParametersConfig,
)(implicit
ec: ExecutionContext,
Expand Down Expand Up @@ -97,6 +97,24 @@ abstract class SpliceAppAutomationService[Store <: AppStore](
case SpliceLedgerConnectionPriority.AmuletExpiry => amuletExpiryConnection
}

final protected def registerUpdateHistoryIngestion(
updateHistory: UpdateHistory,
ingestUpdateHistoryFromParticipantBegin: Boolean,
): Unit = {
registerService(
new UpdateIngestionService(
updateHistory.getClass.getSimpleName,
updateHistory.ingestionSink,
connection(SpliceLedgerConnectionPriority.High),
automationConfig,
backoffClock = triggerContext.pollingClock,
triggerContext.retryProvider,
triggerContext.loggerFactory,
ingestUpdateHistoryFromParticipantBegin,
)
)
}

private def completionOffsetCallback(offset: Long): Future[Unit] =
store.multiDomainAcsStore.signalWhenIngestedOrShutdown(offset)(TraceContext.empty)

Expand All @@ -113,19 +131,6 @@ abstract class SpliceAppAutomationService[Store <: AppStore](
)
)

registerService(
new UpdateIngestionService(
store.updateHistory.getClass.getSimpleName,
store.updateHistory.ingestionSink,
connection(SpliceLedgerConnectionPriority.High),
automationConfig,
backoffClock = triggerContext.pollingClock,
triggerContext.retryProvider,
triggerContext.loggerFactory,
ingestUpdateHistoryFromParticipantBegin,
)
)

registerTrigger(
new DomainIngestionService(
store.domains.ingestionSink,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, *}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.resource.{DbStorage, Storage}
import com.digitalasset.canton.resource.DbStorage
import com.digitalasset.canton.tracing.TraceContext
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.Materializer
Expand Down Expand Up @@ -150,23 +150,19 @@ class SqlIndexInitializationTrigger(
object SqlIndexInitializationTrigger {

def apply(
storage: Storage,
storage: DbStorage,
triggerContext: TriggerContext,
indexActions: List[IndexAction] = defaultIndexActions,
)(implicit
ec: ExecutionContextExecutor,
tracer: Tracer,
mat: Materializer,
): SqlIndexInitializationTrigger = storage match {
case dbStorage: DbStorage =>
new SqlIndexInitializationTrigger(
dbStorage,
triggerContext,
indexActions,
)
case storageType =>
// Same behavior as in `ScanStore.apply` and similar - we only really support DbStorage in our apps.
throw new RuntimeException(s"Unsupported storage type $storageType")
): SqlIndexInitializationTrigger = {
new SqlIndexInitializationTrigger(
storage,
triggerContext,
indexActions,
)
}

sealed trait IndexStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.lfdecentralizedtrust.splice.store.{
HistoryMetrics,
TxLogAppStore,
TxLogBackfilling,
UpdateHistory,
}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.topology.PartyId
Expand All @@ -23,6 +24,7 @@ import scala.concurrent.{ExecutionContext, Future}

class TxLogBackfillingTrigger[TXE](
store: TxLogAppStore[TXE],
updateHistory: UpdateHistory,
batchSize: Int,
override protected val context: TriggerContext,
)(implicit
Expand All @@ -31,13 +33,13 @@ class TxLogBackfillingTrigger[TXE](
mat: Materializer,
) extends PollingParallelTaskExecutionTrigger[TxLogBackfillingTrigger.Task] {

private def party: PartyId = store.updateHistory.updateStreamParty
private def party: PartyId = updateHistory.updateStreamParty

override protected def extraMetricLabels = Seq(
"party" -> party.toProtoPrimitive
)

private val currentMigrationId = store.updateHistory.domainMigrationInfo.currentMigrationId
private val currentMigrationId = updateHistory.domainMigrationInfo.currentMigrationId

private val historyMetrics = new HistoryMetrics(context.metricsFactory)(
MetricsContext.Empty
Expand All @@ -48,23 +50,23 @@ class TxLogBackfillingTrigger[TXE](
)
private val backfilling = new TxLogBackfilling(
store.multiDomainAcsStore,
store.updateHistory,
updateHistory,
batchSize,
context.loggerFactory,
)

override def retrieveTasks()(implicit
tc: TraceContext
): Future[Seq[TxLogBackfillingTrigger.Task]] = {
if (!store.updateHistory.isReady) {
if (!updateHistory.isReady) {
logger.debug("UpdateHistory is not yet ready")
Future.successful(Seq.empty)
} else if (!store.multiDomainAcsStore.destinationHistory.isReady) {
logger.debug("MultiDomainAcsStore is not yet ready")
Future.successful(Seq.empty)
} else {
for {
sourceState <- store.updateHistory.getBackfillingState()
sourceState <- updateHistory.getBackfillingState()
destinationState <- store.multiDomainAcsStore.getTxLogBackfillingState()
} yield {
sourceState match {
Expand Down
Loading
Loading