Member

@1996fanrui 1996fanrui commented Nov 19, 2025

What is the purpose of the change

  1. Finding the latest checkpoint should happen in the catch block as well.

For more details, see https://github.com/apache/flink/pull/27119/files#r2542189639

  2. The second job is expected to fail over once and then finish after the source has generated all records, so this PR removes @ThrowableAnnotation(ThrowableType.NonRecoverableError) from TestException.

Also, I introduced ExpectedFinalJobStatus in UnalignedSettings to check the final JobStatus.

For more details, see #27254 (comment).
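
To illustrate the first point, here is a minimal, JDK-only sketch of looking up the latest checkpoint on both the normal path and the exception path. It is not the code from this PR: the class name, the method names, and the Runnable/Supplier parameters are hypothetical stand-ins for running the job and for the checkpoint lookup.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class CheckpointLookupSketch {

    /**
     * Runs the job and returns the latest checkpoint path, whether the job
     * finished normally or terminated with an exception.
     * Both parameters are hypothetical stand-ins, not Flink APIs.
     */
    static String runAndLocateCheckpoint(
            Runnable job, Supplier<Optional<String>> latestCheckpoint) {
        try {
            job.run();
        } catch (RuntimeException e) {
            // The job may fail after it has already completed a checkpoint,
            // so the lookup has to happen on this path as well.
            return locate(latestCheckpoint);
        }
        return locate(latestCheckpoint);
    }

    private static String locate(Supplier<Optional<String>> latestCheckpoint) {
        return latestCheckpoint
                .get()
                .orElseThrow(() -> new AssertionError("Could not generate checkpoint"));
    }
}
```

In this sketch a missing checkpoint fails the test on either path, so a later restore step can never silently start from empty state.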

Brief change log

[FLINK-38403][tests] Fix the unexpected test that the second job does not restore from checkpoint

@1996fanrui 1996fanrui requested a review from AHeise November 19, 2025 17:56
Collaborator

flinkbot commented Nov 19, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Comment on lines -775 to +795
- conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
+ conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint);
Member Author

restoreCheckpoint.toURI().toString() adds the wrong prefix, so I changed restoreCheckpoint to a String.
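
One way such a wrong prefix can arise, sketched with the JDK only: if the checkpoint path is already a URI-style string, wrapping it in a File and converting it back with toURI() resolves it as a relative path against the working directory. That the path already carried a "file:" scheme is an assumption here, not something this thread confirms; the class name and sample path are hypothetical.

```java
import java.io.File;

final class CheckpointPathPrefixDemo {

    /** Wraps the pointer in a File and converts it back to a URI string. */
    static String viaFile(String checkpointPointer) {
        return new File(checkpointPointer).toURI().toString();
    }

    public static void main(String[] args) {
        // Hypothetical pointer, assumed to already carry a "file:" scheme.
        String pointer = "file:/tmp/checkpoints/chk-1";
        System.out.println("original : " + pointer);
        // The File treats the whole string as a relative path, so the result
        // is prefixed with the current working directory.
        System.out.println("via File : " + viaFile(pointer));
    }
}
```

Passing the original string through unchanged avoids the round trip entirely.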

Contributor

@AHeise AHeise left a comment

Thank you very much for fixing this test! Left two remarks that need to be addressed before approval.

Comment on lines 623 to 625
assertNotNull(
        "First job must generate a checkpoint for rescale test to be valid.",
        checkpointDir);
Contributor

Please use AssertJ: assertThat(checkpointDir).as("First job must generate a checkpoint for rescale test to be valid.").isNotNull()

Comment on lines 200 to 202
return CommonTestUtils.getLatestCompletedCheckpointPath(
                jobID, miniCluster.getMiniCluster())
        .map(File::new)
        .orElseThrow(() -> new AssertionError("Could not generate checkpoint"));
Contributor

Should we Fail.fail("Expected exception") here?

@1996fanrui 1996fanrui force-pushed the 38403/ITCase-rescale-not-restore branch from 80692f1 to 4e675d2 Compare November 20, 2025 11:23
Member Author

@1996fanrui 1996fanrui left a comment

Thanks @AHeise for the quick review; all comments are addressed.

@1996fanrui 1996fanrui force-pushed the 38403/ITCase-rescale-not-restore branch from 4e675d2 to 736ad29 Compare November 20, 2025 14:23
Contributor

snuyanzin commented Nov 20, 2025

fyi: to have green ci, rebase to the latest master
e2e was fixed at FLINK-38700

@1996fanrui
Member Author

fyi: to have green ci, rebase to the latest master e2e was fixed at FLINK-38700

Thanks @snuyanzin for the reminder. I am still changing the PR and will rebase onto the master branch before the next push.

…ished eventually

Add comments to help understand the UnalignedCheckpointRescaleITCase
@1996fanrui 1996fanrui force-pushed the 38403/ITCase-rescale-not-restore branch from 736ad29 to 8938499 Compare November 26, 2025 13:05
Comment on lines +626 to +627
* <p>Postscale phase: Job restores from checkpoint with different parallelism, failovers once,
* and finishes after source generates all records.
Member Author

The second job is expected to fail over once and then finish after the source has generated all records, so this PR removes @ThrowableAnnotation(ThrowableType.NonRecoverableError) from TestException.

Also, I introduced ExpectedFinalJobStatus in UnalignedSettings to check the final JobStatus.

@1996fanrui 1996fanrui requested a review from AHeise November 28, 2025 14:22
Comment on lines +775 to +776
RestartStrategyUtils.configureFixedDelayRestartStrategy(
        env, generateCheckpoint ? expectedFailures / 2 : expectedFailures, 100L);
Contributor

What's the reasoning behind the calculated number of expected failures?

Member Author

It reverts the change in https://github.com/apache/flink/pull/27119/files#diff-ace775e80e66d4f4001bdaea6bcbaae1975bd5e9a5497532d8d7152e4090069aL752

The original intention is:

  • generateCheckpoint selects the phase: true is for the job before rescaling, and false is for the new job after rescaling.
  • The value expectedFailures / 2 is the restart limit for the first job. With this limit, the first job fails once about half of the expected exceptions have been thrown, so the second job can restore from the generated checkpoint and continue consuming.
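
The arithmetic behind the two bullets can be sketched as follows. The first method repeats the expression from the diff; the second is a toy model of a fixed-delay restart strategy, assuming the configured number is the count of restarts allowed before the job is declared failed. The class name, method names, and the sample value 10 are hypothetical.

```java
final class RestartBudgetSketch {

    /** Same expression as in the diff: the pre-rescale job gets half the restart attempts. */
    static int restartAttempts(boolean generateCheckpoint, int expectedFailures) {
        return generateCheckpoint ? expectedFailures / 2 : expectedFailures;
    }

    /**
     * Toy model: a job that is allowed `attempts` restarts survives as long as
     * it has thrown at most `attempts` exceptions; the next one is terminal.
     */
    static boolean survives(int failuresThrown, int attempts) {
        return failuresThrown <= attempts;
    }

    public static void main(String[] args) {
        int expectedFailures = 10; // hypothetical value for illustration

        int prescale = restartAttempts(true, expectedFailures);
        int postscale = restartAttempts(false, expectedFailures);

        // Pre-rescale job: runs out of restarts partway through the failures.
        System.out.println("prescale attempts  = " + prescale);
        System.out.println("survives failure #" + (prescale + 1) + "? "
                + survives(prescale + 1, prescale));

        // Post-rescale job: has enough restarts for every expected failure.
        System.out.println("postscale attempts = " + postscale);
        System.out.println("survives all failures? "
                + survives(expectedFailures, postscale));
    }
}
```

Note that the division is integer division, so an odd expectedFailures rounds down for the first job.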

@1996fanrui 1996fanrui merged commit c4d6344 into apache:master Dec 4, 2025