diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala
index 41f76f070e..59a09e132c 100644
--- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala
+++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala
@@ -614,47 +614,58 @@ final class DbMultiDomainAcsStore[TXE](
     )(implicit
         tc: TraceContext
     ): Future[DestinationHistory.InsertResult] = {
-      val trees = items.collect { case UpdateHistoryResponse(TransactionTreeUpdate(tree), _) =>
-        assert(
-          tree.getRecordTime.isAfter(CantonTimestamp.MinValue.toInstant),
-          "insert() must not be called with import updates",
-        )
-        tree
-      }
-      val nonEmpty = NonEmptyList
-        .fromFoldable(trees)
-        .getOrElse(
-          throw new RuntimeException("insert() must not be called with an empty sequence")
-        )
-      val firstTree = nonEmpty.foldLeft(nonEmpty.head) { case (acc, tree) =>
-        if (tree.getRecordTime.isBefore(acc.getRecordTime)) tree else acc
-      }
-      val firstRecordTime = CantonTimestamp.assertFromInstant(firstTree.getRecordTime)
-      val treesWithEntries = trees.flatMap { tree =>
-        val entries = txLogConfig.parser.parse(tree, synchronizerId, logger)
-        entries.map(e => (tree, e))
+      val trees = items.collect {
+        case UpdateHistoryResponse(TransactionTreeUpdate(tree), _)
+            if !tree.getWorkflowId.startsWith(IMPORT_ACS_WORKFLOW_ID_PREFIX) =>
+          assert(
+            tree.getRecordTime.isAfter(CantonTimestamp.MinValue.toInstant),
+            "insert() must not be called with import updates",
+          )
+          tree
       }
 
-      for {
-        _ <- storage.queryAndUpdate(
-          DBIOAction
-            .seq(
-              doInsertEntries(migrationId, synchronizerId, treesWithEntries),
-              doUpdateFirstIngestedUpdate(
-                synchronizerId,
-                migrationId,
-                firstRecordTime,
-              ),
+      NonEmptyList.fromFoldable(trees) match {
+        case None =>
+          Future.successful(
+            DestinationHistory.InsertResult(
+              backfilledUpdates = 0L,
+              backfilledEvents = 0L,
+              lastBackfilledRecordTime =
+                items.headOption.map(_.update.recordTime).getOrElse(CantonTimestamp.MinValue),
             )
-            .transactionally,
-          "destinationHistory.insert",
-        )
-      } yield DestinationHistory.InsertResult(
-        backfilledUpdates = trees.size.toLong,
-        backfilledEvents =
-          trees.foldLeft(0L)((sum, tree) => sum + tree.getEventsById.size().toLong),
-        lastBackfilledRecordTime = CantonTimestamp.assertFromInstant(nonEmpty.last.getRecordTime),
-      )
+          )
+        case Some(nonEmpty) =>
+          val firstTree = nonEmpty.foldLeft(nonEmpty.head) { case (acc, tree) =>
+            if (tree.getRecordTime.isBefore(acc.getRecordTime)) tree else acc
+          }
+          val firstRecordTime = CantonTimestamp.assertFromInstant(firstTree.getRecordTime)
+          val treesWithEntries = trees.flatMap { tree =>
+            val entries = txLogConfig.parser.parse(tree, synchronizerId, logger)
+            entries.map(e => (tree, e))
+          }
+
+          for {
+            _ <- storage.queryAndUpdate(
+              DBIOAction
+                .seq(
+                  doInsertEntries(migrationId, synchronizerId, treesWithEntries),
+                  doUpdateFirstIngestedUpdate(
+                    synchronizerId,
+                    migrationId,
+                    firstRecordTime,
+                  ),
+                )
+                .transactionally,
+              "destinationHistory.insert",
+            )
+          } yield DestinationHistory.InsertResult(
+            backfilledUpdates = trees.size.toLong,
+            backfilledEvents =
+              trees.foldLeft(0L)((sum, tree) => sum + tree.getEventsById.size().toLong),
+            lastBackfilledRecordTime =
+              CantonTimestamp.assertFromInstant(nonEmpty.last.getRecordTime),
+          )
+      }
     }
 
     override def markBackfillingComplete()(implicit tc: TraceContext): Future[Unit] = {
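Note for reviewers: the behavioural change here is that import updates (workflow IDs starting with `IMPORT_ACS_WORKFLOW_ID_PREFIX`) are now filtered out up front, and a batch that becomes empty after filtering returns a zero-work `InsertResult` instead of throwing. Below is a minimal, self-contained sketch of that control flow, not the store's actual code; the `Update`, `InsertResult`, and `importAcsWorkflowIdPrefix` names are illustrative stand-ins for the real types and constant.

```scala
import cats.data.NonEmptyList
import cats.implicits._

object EmptyBatchSketch {
  // Illustrative stand-ins for the store's real types.
  final case class Update(workflowId: String, recordTime: Long)
  final case class InsertResult(backfilledUpdates: Long, lastBackfilledRecordTime: Long)

  // Hypothetical value; the real constant lives elsewhere in the codebase.
  val importAcsWorkflowIdPrefix = "import-acs"

  def insert(items: Seq[Update]): InsertResult = {
    // Drop import updates up front, mirroring the guard added to the collect.
    val real = items.filterNot(_.workflowId.startsWith(importAcsWorkflowIdPrefix)).toList
    NonEmptyList.fromFoldable(real) match {
      case None =>
        // Everything was filtered out: report zero work, falling back to the
        // first item's record time (or a minimum sentinel for an empty batch).
        InsertResult(0L, items.headOption.map(_.recordTime).getOrElse(Long.MinValue))
      case Some(nonEmpty) =>
        // Normal path: insert as before and report the batch's last record time.
        InsertResult(nonEmpty.size.toLong, nonEmpty.last.recordTime)
    }
  }
}
```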