Skip to content

Commit 4e675d2

Browse files
committed
[FLINK-38403][tests] Fix the unexpected test that the second job does not restore from checkpoint
1 parent dae2cfe commit 4e675d2

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
import org.junit.runner.RunWith;
5555
import org.junit.runners.Parameterized;
5656

57-
import java.io.File;
5857
import java.util.Arrays;
5958
import java.util.BitSet;
6059
import java.util.Collections;
6160

6261
import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
6362
import static org.apache.flink.util.Preconditions.checkState;
63+
import static org.assertj.core.api.Assertions.assertThat;
6464
import static org.hamcrest.Matchers.equalTo;
6565

6666
/** Integration test for performing rescale of unaligned checkpoint. */
@@ -619,8 +619,10 @@ public void shouldRescaleUnalignedCheckpoint() throws Exception {
619619
.setExpectedFailures(1)
620620
.setSourceSleepMs(sourceSleepMs);
621621
prescaleSettings.setGenerateCheckpoint(true);
622-
final File checkpointDir = super.execute(prescaleSettings);
623-
622+
final String checkpointDir = super.execute(prescaleSettings);
623+
assertThat(checkpointDir)
624+
.as("First job must generate a checkpoint for rescale test to be valid.")
625+
.isNotNull();
624626
// resume
625627
final UnalignedSettings postscaleSettings =
626628
new UnalignedSettings(topology)

flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373

7474
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
7575

76+
import org.assertj.core.api.Fail;
7677
import org.junit.AfterClass;
7778
import org.junit.BeforeClass;
7879
import org.junit.Rule;
@@ -150,7 +151,7 @@ public static void afterAll() {
150151
}
151152

152153
@Nullable
153-
protected File execute(UnalignedSettings settings) throws Exception {
154+
protected String execute(UnalignedSettings settings) throws Exception {
154155
final File checkpointDir = temp.newFolder();
155156
Configuration conf = settings.getConfiguration(checkpointDir);
156157

@@ -179,14 +180,15 @@ protected File execute(UnalignedSettings settings) throws Exception {
179180
final StreamExecutionEnvironment env =
180181
StreamExecutionEnvironment.getExecutionEnvironment(conf);
181182
settings.configure(env);
183+
JobID jobID = null;
182184
try {
183185
// print the test parameters to help debugging when the case is stuck
184186
System.out.println(
185187
"Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
186188
final CompletableFuture<JobSubmissionResult> result =
187189
miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
188190

189-
final JobID jobID = result.get().getJobID();
191+
jobID = result.get().getJobID();
190192
checkCounters(
191193
miniCluster
192194
.getMiniCluster()
@@ -198,13 +200,17 @@ protected File execute(UnalignedSettings settings) throws Exception {
198200
if (settings.generateCheckpoint) {
199201
return CommonTestUtils.getLatestCompletedCheckpointPath(
200202
jobID, miniCluster.getMiniCluster())
201-
.map(File::new)
202-
.orElseThrow(() -> new AssertionError("Could not generate checkpoint"));
203+
.orElseThrow(() -> Fail.fail("Could not generate checkpoint"));
203204
}
204205
} catch (Exception e) {
205206
if (ExceptionUtils.findThrowable(e, TestException.class).isEmpty()) {
206207
throw e;
207208
}
209+
if (settings.generateCheckpoint) {
210+
return CommonTestUtils.getLatestCompletedCheckpointPath(
211+
jobID, miniCluster.getMiniCluster())
212+
.orElseThrow(() -> Fail.fail("Could not generate checkpoint"));
213+
}
208214
} finally {
209215
miniCluster.after();
210216
}
@@ -680,7 +686,7 @@ public String toString() {
680686
protected static class UnalignedSettings {
681687
private int parallelism;
682688
private final int minCheckpoints = 10;
683-
@Nullable private File restoreCheckpoint;
689+
@Nullable private String restoreCheckpoint;
684690
private boolean generateCheckpoint = false;
685691
int expectedFailures = 0;
686692
int tolerableCheckpointFailures = 0;
@@ -701,7 +707,7 @@ public UnalignedSettings setParallelism(int parallelism) {
701707
return this;
702708
}
703709

704-
public UnalignedSettings setRestoreCheckpoint(File restoreCheckpoint) {
710+
public UnalignedSettings setRestoreCheckpoint(String restoreCheckpoint) {
705711
this.restoreCheckpoint = restoreCheckpoint;
706712
return this;
707713
}
@@ -772,7 +778,7 @@ public Configuration getConfiguration(File checkpointDir) {
772778
conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
773779
conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
774780
if (restoreCheckpoint != null) {
775-
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
781+
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint);
776782
}
777783

778784
conf.set(

0 commit comments

Comments
 (0)