diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java index 9cb73d01dc962..366b2438179a7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java @@ -67,6 +67,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.SortedMap; import java.util.UUID; import java.util.stream.Collectors; @@ -392,22 +393,94 @@ public void release() { } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = - new PreviousSnapshot(Collections.emptyList()); + new PreviousSnapshot(null, -1L); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { @Nonnull private final Map confirmedSstFiles; - protected PreviousSnapshot(@Nullable Collection confirmedSstFiles) { + /** + * Constructor of PreviousSnapshot. Given a map of uploaded sst files in previous + * checkpoints, prunes the sst files which have been re-uploaded in the following + * checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to + * notification delay. For example: + * checkpoint 2 re-uploads an sst file of checkpoint 1, which is not confirmed yet; + * checkpoint 1 is confirmed late, so checkpoint 3 does not re-use checkpoint 1's copy. + * + * @param currentUploadedSstFiles the sst files uploaded in previous checkpoints. + * @param lastCompletedCheckpoint the last completed checkpoint id. + */ + protected PreviousSnapshot( + @Nullable SortedMap> currentUploadedSstFiles, + long lastCompletedCheckpoint) { this.confirmedSstFiles = - confirmedSstFiles != null - ? confirmedSstFiles.stream() + currentUploadedSstFiles != null + ? 
pruneFirstCheckpointSstFiles( + currentUploadedSstFiles, lastCompletedCheckpoint) + : Collections.emptyMap(); + } + + /** + * The last completed checkpoint's uploaded sst files are all included, then for each + * following checkpoint, if an sst file has been re-uploaded, remove it from the last + * completed checkpoint's sst files. + * + * @param currentUploadedSstFiles the uploaded sst files, keyed by checkpoint id. + * @param lastCompletedCheckpoint the last completed checkpoint id. + */ + private Map pruneFirstCheckpointSstFiles( + @Nonnull SortedMap> currentUploadedSstFiles, + long lastCompletedCheckpoint) { + Map prunedSstFiles = null; + int removedCount = 0; + for (Map.Entry> entry : + currentUploadedSstFiles.entrySet()) { + // Iterate checkpoints in ascending order of checkpoint id. + if (entry.getKey() == lastCompletedCheckpoint) { + // The last completed checkpoint's uploaded sst files are all included. + prunedSstFiles = + entry.getValue().stream() .collect( Collectors.toMap( HandleAndLocalPath::getLocalPath, - HandleAndLocalPath::getHandle)) - : Collections.emptyMap(); + HandleAndLocalPath::getHandle)); + } else if (prunedSstFiles == null) { + // The last completed checkpoint's uploaded sst files do not exist. + // So we skip the pruning process. + break; + } else if (!prunedSstFiles.isEmpty()) { + // Prune sst files which have been re-uploaded in the following checkpoints. + for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) { + if (!(handleAndLocalPath.getHandle() + instanceof PlaceholderStreamStateHandle)) { + // If it's not a placeholder handle, it means the sst file has been + // re-uploaded in the following checkpoint. + if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) { + removedCount++; + } + } + } + } + } + if (removedCount > 0 && LOG.isTraceEnabled()) { + LOG.trace( + "Removed {} re-uploaded sst files from base file set for incremental " + + "checkpoint. 
Base checkpoint id: {}", + removedCount, + currentUploadedSstFiles.firstKey()); + } + return (prunedSstFiles != null && !prunedSstFiles.isEmpty()) + ? Collections.unmodifiableMap(prunedSstFiles) + : Collections.emptyMap(); } protected Optional getUploaded(String filename) { @@ -425,5 +498,10 @@ protected Optional getUploaded(String filename) { return Optional.empty(); } } + + @Override + public String toString() { + return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}'; + } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index 436a0f2ec1c93..404d671cc38e8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -195,30 +195,29 @@ protected PreviousSnapshot snapshotMetaData( long checkpointId, @Nonnull List stateMetaInfoSnapshots) { final long lastCompletedCheckpoint; - final Collection confirmedSstFiles; + final SortedMap> currentUploadedSstFiles; // use the last completed checkpoint as the comparison base. 
synchronized (uploadedSstFiles) { lastCompletedCheckpoint = lastCompletedCheckpointId; - confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint); - LOG.trace( - "Use confirmed SST files for checkpoint {}: {}", - checkpointId, - confirmedSstFiles); + currentUploadedSstFiles = + new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint)); } + PreviousSnapshot previousSnapshot = + new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint); LOG.trace( "Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + "assuming the following (shared) confirmed files as base: {}.", checkpointId, lastCompletedCheckpoint, - confirmedSstFiles); + previousSnapshot); // snapshot meta data to save for (Map.Entry stateMetaInfoEntry : kvStateInformation.entrySet()) { stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot()); } - return new PreviousSnapshot(confirmedSstFiles); + return previousSnapshot; } /** @@ -343,6 +342,7 @@ private long uploadSnapshotFiles( List miscFilePaths = new ArrayList<>(files.length); createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths); + LOG.info("Will re-use {} SST files. 
{}", sstFiles.size(), sstFiles); final CheckpointedStateScope stateScope = sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java index c4cb705842125..899708d12d507 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java @@ -96,6 +96,41 @@ void testCheckpointIsIncremental() throws Exception { } } + @Test + void testCheckpointIsIncrementalWithLateNotification() throws Exception { + + try (CloseableRegistry closeableRegistry = new CloseableRegistry(); + RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy = + createSnapshotStrategy()) { + FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory(); + + // make checkpoint with id 1 + snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry); + + // make checkpoint with id 2 + snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry); + + // Late notify checkpoint with id 1 + checkpointSnapshotStrategy.notifyCheckpointComplete(1L); + + // make checkpoint with id 3, based on checkpoint 1 + IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 = + snapshot( + 3L, + checkpointSnapshotStrategy, + checkpointStreamFactory, + closeableRegistry); + + // Late notify checkpoint with id 2 + checkpointSnapshotStrategy.notifyCheckpointComplete(2L); + + // 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint 
re-uploaded the 1st + // one, so it should be based on nothing, thus this is effectively a full checkpoint. + assertThat(incrementalRemoteKeyedStateHandle3.getStateSize()) + .isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize()); + } + } + public RocksIncrementalSnapshotStrategy createSnapshotStrategy() throws IOException, RocksDBException {