2727import  org .apache .flink .configuration .Configuration ;
2828import  org .apache .flink .configuration .ExternalizedCheckpointRetention ;
2929import  org .apache .flink .configuration .MemorySize ;
30+ import  org .apache .flink .configuration .RestartStrategyOptions ;
3031import  org .apache .flink .configuration .StateRecoveryOptions ;
3132import  org .apache .flink .configuration .TaskManagerOptions ;
3233import  org .apache .flink .connector .datagen .source .DataGeneratorSource ;
5859import  java .util .Collections ;
5960import  java .util .Random ;
6061
62+ import  static  org .apache .flink .configuration .RestartStrategyOptions .RestartStrategyType .NO_RESTART_STRATEGY ;
63+ 
6164/** 
6265 * Integration test for rescaling jobs with mixed (UC-supported and UC-unsupported) exchanges from 
6366 * an unaligned checkpoint. 
@@ -81,7 +84,8 @@ public static Collection<ExecuteJobViaEnv> parameter() {
8184                UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMultiOutputDAG ,
8285                UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMultiInputDAG ,
8386                UnalignedCheckpointRescaleWithMixedExchangesITCase ::createRescalePartitionerDAG ,
84-                 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMixedComplexityDAG );
87+                 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMixedComplexityDAG ,
88+                 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createPartEmptyHashExchangeDAG );
8589    }
8690
8791    @ Before 
@@ -138,6 +142,7 @@ private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String re
138142        conf .set (CheckpointingOptions .CHECKPOINTING_INTERVAL , Duration .ofSeconds (1 ));
139143        // Disable aligned timeout to ensure it works with unaligned checkpoint directly 
140144        conf .set (CheckpointingOptions .ALIGNED_CHECKPOINT_TIMEOUT , Duration .ofSeconds (0 ));
145+         conf .set (RestartStrategyOptions .RESTART_STRATEGY , NO_RESTART_STRATEGY .getMainValue ());
141146        conf .set (
142147                CheckpointingOptions .EXTERNALIZED_CHECKPOINT_RETENTION ,
143148                ExternalizedCheckpointRetention .RETAIN_ON_CANCELLATION );
@@ -337,6 +342,53 @@ private static JobClient createMixedComplexityDAG(StreamExecutionEnvironment env
337342        return  env .executeAsync ();
338343    }
339344
345+     /** 
346+      * Creates a DAG where the downstream MapAfterKeyBy task receives input from two hash exchanges: 
347+      * one with actual data and one that is empty due to filtering. This tests unaligned checkpoint 
348+      * rescaling with mixed empty and non-empty hash partitions. 
349+      */ 
350+     private  static  JobClient  createPartEmptyHashExchangeDAG (StreamExecutionEnvironment  env )
351+             throws  Exception  {
352+         int  source1Parallelism  = getRandomParallelism ();
353+         DataGeneratorSource <Long > source1  =
354+                 new  DataGeneratorSource <>(
355+                         index  -> index ,
356+                         Long .MAX_VALUE ,
357+                         RateLimiterStrategy .perSecond (5000 ),
358+                         Types .LONG );
359+         DataStream <Long > sourceStream1  =
360+                 env .fromSource (source1 , WatermarkStrategy .noWatermarks (), "Source 1" )
361+                         .setParallelism (source1Parallelism );
362+ 
363+         int  source2Parallelism  = getRandomParallelism ();
364+         DataGeneratorSource <Long > source2  =
365+                 new  DataGeneratorSource <>(
366+                         index  -> index ,
367+                         Long .MAX_VALUE ,
368+                         RateLimiterStrategy .perSecond (5000 ),
369+                         Types .LONG );
370+ 
371+         // Filter all records to simulate empty state exchange 
372+         DataStream <Long > sourceStream2  =
373+                 env .fromSource (source2 , WatermarkStrategy .noWatermarks (), "Source 2" )
374+                         .setParallelism (source2Parallelism )
375+                         .filter (value  -> false )
376+                         .setParallelism (source2Parallelism );
377+ 
378+         sourceStream1 
379+                 .union (sourceStream2 )
380+                 .keyBy ((KeySelector <Long , Long >) value  -> value )
381+                 .map (
382+                         x  -> {
383+                             Thread .sleep (5 );
384+                             return  x ;
385+                         })
386+                 .name ("MapAfterKeyBy" )
387+                 .setParallelism (getRandomParallelism ());
388+ 
389+         return  env .executeAsync ();
390+     }
391+ 
340392    private  static  int  getRandomParallelism () {
341393        return  RANDOM .nextInt (MAX_SLOTS ) + 1 ;
342394    }
0 commit comments