 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -366,14 +367,7 @@ public String map(Long value) throws Exception {
                         })
                 .name("long-to-string-map")
                 .uid("long-to-string-map")
-                .map(
-                        new FailingMapper<>(
-                                state -> false,
-                                state ->
-                                        state.completedCheckpoints >= minCheckpoints / 2
-                                                && state.runNumber == 0,
-                                state -> false,
-                                state -> false))
+                .map(getFailingMapper(minCheckpoints))
                 .name("failing-map")
                 .uid("failing-map")
                 .setParallelism(parallelism)
@@ -394,14 +388,7 @@ void addFailingSink(
             DataStream<Long> combinedSource, long minCheckpoints, boolean slotSharing) {
         combinedSource
                 .shuffle()
-                .map(
-                        new FailingMapper<>(
-                                state -> false,
-                                state ->
-                                        state.completedCheckpoints >= minCheckpoints / 2
-                                                && state.runNumber == 0,
-                                state -> false,
-                                state -> false))
+                .map(getFailingMapper(minCheckpoints))
                 .name("failing-map")
                 .uid("failing-map")
                 .slotSharingGroup(slotSharing ? "default" : "failing-map")
@@ -418,6 +405,25 @@ void addFailingSink(
                 .slotSharingGroup(slotSharing ? "default" : "sink");
     }
 
+    /**
+     * Creates a FailingMapper that fails only during snapshot operations.
+     *
+     * <p>It fails in snapshotState() when completedCheckpoints >= minCheckpoints / 2 AND
+     * runNumber == 0. After the job fails over internally, runNumber becomes the attempt
+     * number (> 0), so the failure condition is no longer satisfied. The mapper thus fails
+     * exactly once during the initial run to trigger a job failover, and never fails again
+     * after the job recovers from the checkpoint.
+     */
+    private static <T> FailingMapper<T> getFailingMapper(long minCheckpoints) {
+        return new FailingMapper<>(
+                state -> false,
+                state ->
+                        state.completedCheckpoints >= minCheckpoints / 2
+                                && state.runNumber == 0,
+                state -> false,
+                state -> false);
+    }
+
     DataStream<Long> createSourcePipeline(
             StreamExecutionEnvironment env,
             int minCheckpoints,
@@ -611,13 +617,23 @@ public UnalignedCheckpointRescaleITCase(
         this.sourceSleepMs = sourceSleepMs;
     }
 
+    /**
+     * Tests unaligned checkpoint rescaling behavior.
+     *
+     * <p>Prescale phase: the job fails via FailingMapper once completedCheckpoints >=
+     * minCheckpoints / 2 and generates the checkpoint used for the rescale test.
+     *
+     * <p>Postscale phase: the job restores from that checkpoint with a different parallelism,
+     * fails over once, and finishes after the source has generated all records.
+     */
     @Test
     public void shouldRescaleUnalignedCheckpoint() throws Exception {
         final UnalignedSettings prescaleSettings =
                 new UnalignedSettings(topology)
                         .setParallelism(oldParallelism)
                         .setExpectedFailures(1)
-                        .setSourceSleepMs(sourceSleepMs);
+                        .setSourceSleepMs(sourceSleepMs)
+                        .setExpectedFinalJobStatus(JobStatus.FAILED);
         prescaleSettings.setGenerateCheckpoint(true);
         final String checkpointDir = super.execute(prescaleSettings);
         assertThat(checkpointDir)
@@ -627,7 +643,8 @@ public void shouldRescaleUnalignedCheckpoint() throws Exception {
         final UnalignedSettings postscaleSettings =
                 new UnalignedSettings(topology)
                         .setParallelism(newParallelism)
-                        .setExpectedFailures(1);
+                        .setExpectedFailures(1)
+                        .setExpectedFinalJobStatus(JobStatus.FINISHED);
         postscaleSettings.setRestoreCheckpoint(checkpointDir);
         super.execute(postscaleSettings);
     }
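
For readers unfamiliar with the fail-once-on-first-attempt pattern that getFailingMapper() encodes through FailingMapper predicates, here is a minimal standalone sketch of the same idea. It is not code from this change: the class name SketchFailingMapper, its snapshotCount field, and the use of getRuntimeContext().getAttemptNumber() are illustrative assumptions.

// Minimal sketch, assuming a CheckpointedFunction-based mapper; illustrates failing in
// snapshotState() exactly once, on the first execution attempt, after enough checkpoints.
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class SketchFailingMapper<T> extends RichMapFunction<T, T>
        implements CheckpointedFunction {

    private final long minCheckpoints;
    // Counts snapshot attempts; the real test tracks completed checkpoints instead.
    private long snapshotCount;

    public SketchFailingMapper(long minCheckpoints) {
        this.minCheckpoints = minCheckpoints;
    }

    @Override
    public T map(T value) {
        return value; // never fail in map()
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Fail only on the first execution attempt, once half of the required checkpoints
        // have been taken; after the induced failover the attempt number is > 0, so the
        // restored job snapshots normally and can finish.
        if (snapshotCount >= minCheckpoints / 2
                && getRuntimeContext().getAttemptNumber() == 0) {
            throw new Exception("Intentional snapshot failure to trigger a failover");
        }
        snapshotCount++;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Nothing to restore in this sketch.
    }
}

In the actual change, the same condition is expressed declaratively through the predicates passed to the FailingMapper returned by getFailingMapper(minCheckpoints).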