Ensure Executor tests use up-to-date cluster metadata #2338
kyguy wants to merge 3 commits into linkedin:main
Conversation
shutDownBroker waits for broker to stop responding to AdminClient.
danielgospodinow
left a comment
LGTM! Seems like in the worst case we would wait almost 1 min for a broker to terminate, but I guess that should be a rare case.
Hey @mimaison, been hunting down some flaky tests, could you have a pass at this as well when you get a chance?
waitUntil(
  () -> {
    try {
      _adminClient.describeLogDirs(Collections.singletonList(brokerId))
Can you detail the test flakiness you're trying to fix with this?
I don't understand how this call could work when the container has been stopped and is not part of the cluster metadata anymore.
The testBrokerDiesBeforeMovingPartition test of the ExecutorTest class has been failing intermittently in CI on some PRs. [1] [2]
com.linkedin.kafka.cruisecontrol.executor.ExecutorTest > testBrokerDiesBeforeMovingPartition FAILED
java.lang.IllegalStateException: Failed to retrieve if there are already ongoing intra-broker replica reassignments.
at com.linkedin.kafka.cruisecontrol.executor.Executor.sanityCheckOngoingMovement(Executor.java:1077)
at com.linkedin.kafka.cruisecontrol.executor.Executor.startExecution(Executor.java:1016)
at com.linkedin.kafka.cruisecontrol.executor.Executor.executeProposals(Executor.java:834)
at com.linkedin.kafka.cruisecontrol.executor.ExecutorTest.executeAndVerifyProposals(ExecutorTest.java:828)
at com.linkedin.kafka.cruisecontrol.executor.ExecutorTest.testBrokerDiesBeforeMovingPartition(ExecutorTest.java:198)
Caused by:
java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:173)
at com.linkedin.kafka.cruisecontrol.executor.ExecutorAdminUtils.hasOngoingIntraBrokerReplicaMovement(ExecutorAdminUtils.java:112)
at com.linkedin.kafka.cruisecontrol.executor.Executor.sanityCheckOngoingMovement(Executor.java:1073)
... 4 more
I don't understand how this call could work when the container has been stopped and is not part of the cluster metadata anymore.
From what I understand, even after the broker is removed from the cluster metadata, the controller may still be busy processing the broker shutdown. During this time, it can be delayed in responding, or fail to respond, to listPartitionReassignments() requests [3], leading to a timeout error [4].
This extra wait was added to ensure the controller is no longer overloaded and that the broker has completely shut down.
[1] https://circleci.com/api/v1.1/project/github/linkedin/cruise-control/7014/output/106/0?file=true&allocation-id=692e17b7cb705e774ad6fd9c-0-build%2FABCDEFGH
[2] https://productionresultssa7.blob.core.windows.net/actions-results/a214b7ca-4dfe-4f26-a306-b5c629799e56/workflow-job-run-9dd3c006-698a-57a5-a8ca-be66dd4c4066/logs/job/job-logs.txt?rsct=text%2Fplain&se=2025-12-08T14%3A20%3A34Z&sig=VC46M8yrXq%2B17YI6MaP0SJTgc%2F9goso5ZEPN82kvl1s%3D&ske=2025-12-08T22%3A35%3A22Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2025-12-08T10%3A35%3A22Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2025-11-05&sp=r&spr=https&sr=b&st=2025-12-08T14%3A10%3A29Z&sv=2025-11-05
[3] https://github.com/linkedin/cruise-control/blob/main/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java#L371
[4] https://github.com/linkedin/cruise-control/blob/main/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java#L1059-L1065
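The shutdown wait being discussed boils down to a poll loop that treats AdminClient request failures (e.g. timeouts while the controller is busy) as "keep waiting". A minimal, cluster-free sketch of that pattern, with illustrative names, timeouts, and probe (not Cruise Control's actual helper):

```java
import java.time.Duration;
import java.util.function.Supplier;

public final class ShutdownWait {
    /**
     * Polls {@code probe} until it returns true or {@code timeout} elapses.
     * A probe that throws is treated as "not ready yet" (e.g. an AdminClient
     * request timing out while the controller processes the shutdown) and
     * is simply retried on the next interval.
     */
    public static boolean waitUntil(Supplier<Boolean> probe, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            try {
                if (probe.get()) {
                    return true;
                }
            } catch (RuntimeException e) {
                // Transient failure (timeout, disconnect): keep waiting.
            }
            Thread.sleep(interval.toMillis());
        }
        return false;  // condition never held within the timeout
    }
}
```

In the real test, the probe would be an AdminClient call against the killed broker; the key design point is that exceptions reset nothing and the loop only gives up at the deadline.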
Upon further investigation, it looks like metadataClient reports the killed broker in its live broker list in the call _metadataClient.cluster().nodes().stream().mapToInt(Node::id).boxed().collect(Collectors.toSet()) [1], even though the broker has been shut down and removed from the cluster metadata [2]. The test fails when the Admin client calls in the hasOngoingIntraBrokerReplicaMovement() method attempt to query log directory information from the dead broker. [3]
From what I understand, even after migrating away from the MetadataClient and onto the public Kafka API for metadata management, we will still need some sort of wait in the tests to wait for the dead broker to be removed from the cluster metadata and the topic metadata.
I have been testing a new metadata client that exclusively uses public Kafka APIs (named MetadataAdminClient), and the describeCluster().nodes().get() Admin client calls (after the shutdown() method) sometimes list the dead broker as alive when we don't explicitly wait for the dead broker to be removed from the topic metadata beforehand. My current suspicion is that the Admin client sees the id of the dead broker in the topic metadata and thinks the broker is still alive.
Anyway, I have added an explicit wait for the topic metadata in the affected test in my latest commit so that the test passes consistently. That said, I am still considering where the best place for this wait is, either in the shutdown() method or the affected test (assuming we need the topic metadata wait).
[1] https://github.com/linkedin/cruise-control/blob/main/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java#L1073
[2] https://github.com/linkedin/cruise-control/blob/main/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java#L380-L399
[3] https://github.com/linkedin/cruise-control/blob/main/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorAdminUtils.java#L112
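The explicit topic-metadata wait mentioned above can be modeled as "poll a fetcher until a predicate accepts the snapshot, then return that snapshot". A hypothetical, cluster-free sketch (class and method names are illustrative; Cruise Control's actual test helper is waitForTopicMetadata):

```java
import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class MetadataWait {
    /**
     * Repeatedly fetches a metadata snapshot until {@code isSettled} accepts it,
     * then returns that snapshot. Fails fast with an exception if the deadline
     * passes, so a test reports "metadata never settled" instead of asserting
     * against a stale view.
     */
    public static <T> T waitForMetadata(Supplier<T> fetch, Predicate<T> isSettled, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        T snapshot = fetch.get();
        while (!isSettled.test(snapshot)) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("Metadata did not settle within " + timeout);
            }
            Thread.sleep(10);  // small pause between metadata fetches
            snapshot = fetch.get();
        }
        return snapshot;
    }
}
```

Returning the accepted snapshot (rather than just a boolean) lets the test assert on exactly the metadata it waited for, e.g. a topic description with the dead broker absent from every replica list.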
I have been testing a new metadata client that exclusively uses public Kafka APIs (named MetadataAdminClient) and the describeCluster().nodes.get() Admin client calls (after the shutdown() method) list the dead broker as alive sometimes when we don't explicitly wait for the dead broker to be removed from the topic metadata beforehand.
Wait ... I was short-circuiting my cluster metadata waits with my exception handling in the shutdown() method. It looks like I shouldn't need this extra logic after all. I'll strip it out.
}
// Wait for leader movement
TopicDescription topicAfterLeaderMove = _cluster.waitForTopicMetadata(tp.topic(), Duration.ofSeconds(60),
Do you have the logs of a failure? What are the partition replicas? Above we checked the replication factor matches the expected value, so I'm curious what the replica list is.
Do you have the logs of a failure?
Unfortunately not; the PRs that tripped the race condition have had their tests re-run. This was the assertion that failed [1]. IIRC, it was testReplicaReassignmentProgressWithThrottle() that was failing specifically. I'll try reproducing and getting logs.
I wasn't able to reproduce this issue locally or in the CI so I am going to strip this code out for now. If this assertion fails in the future I'll make sure to grab the logs and open a separate issue/fix for it.
Hi @kyguy, just an FYI that @efeg and I will be leaving the company and will probably lose admin access to this repo. Please find @mhratson to merge/approve the PRs. Meanwhile, I really appreciate you (and other active contributors) helping to review PRs and contributing a lot to make Cruise Control better. Thank you!
Signed-off-by: Kyle Liberti <kliberti.us@gmail.com>
@mimaison could you take another look at this when you get a chance?
timeout,
String.format("Broker %s did not shutdown properly.", brokerId)
);
This change does not make sense. It's trying to hide an issue specific to hasOngoingIntraBrokerReplicaMovement() that probably stems from the MetadataClient usage in Executor.
If we can't fix Executor now, I think it's acceptable to have this as an interim fix, but in that case it should be clearly documented so we remove it in the future. Have you tried updating Executor to not rely on internal Kafka APIs?
It's trying to hide an issue specific to hasOngoingIntraBrokerReplicaMovement() that probably stems from the MetadataClient usage in Executor.
After much debugging and testing, it appears the issue was in fact the MetadataClient usage in the Executor. The MetadataClient and the Admin client used in the Executor class were not detecting cluster and topic metadata changes before the testing timeouts.
I have updated the Executor to take an Admin client as a parameter so the tests can load an Admin client configured with a higher refresh rate to avoid using stale metadata.
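Passing the Admin client in as a constructor parameter (rather than constructing it internally) is what lets a test supply one with an aggressive refresh rate. A hypothetical sketch of the pattern; the class names and the minimal Admin stand-in are illustrative, and only the metadata.max.age.ms key is real Kafka client configuration:

```java
import java.util.Properties;

public final class ExecutorWiring {
    /** Minimal stand-in for the Kafka Admin client interface (illustrative only). */
    interface Admin {
        int brokerCount();
    }

    /** An Executor-like class that receives its Admin client instead of creating one. */
    static final class Executor {
        private final Admin admin;

        Executor(Admin admin) {
            this.admin = admin;  // injected: tests can pass a stub or a tuned real client
        }

        boolean hasEnoughBrokers(int minBrokers) {
            return admin.brokerCount() >= minBrokers;
        }
    }

    /**
     * Props a test might use to build a real Admin client: metadata.max.age.ms=0
     * forces a metadata refresh before requests, avoiding stale cluster views.
     */
    static Properties fastRefreshAdminProps(String bootstrap) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrap);
        props.setProperty("metadata.max.age.ms", "0");
        return props;
    }
}
```

The design benefit is the same one the comment describes: production wiring and tests share the Executor code path, but the test controls how fresh the client's metadata is.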
Hi @mimaison, could you have another pass when you get a chance?
mimaison
left a comment
Thanks for the PR. I left a few suggestions
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
testImplementation "org.testcontainers:kafka:$testcontainersVersion"
testImplementation "org.testcontainers:testcontainers-kafka:$testcontainersVersion"
How is this related to fixing the Executor tests?
Due to breaking changes in recent Docker Engine updates, we need to upgrade Testcontainers to a version that adds compatibility fixes, either 1.21.4 or 2.0.3.
That said, we could avoid this specific artifact name change by sticking with the 1.x versions instead of jumping to the 2.x versions as I have done in my commit.
Let me update the version to 1.21.4 in my next commit to avoid this artifact name change.
@Before
public void setUp() {
  Properties adminClientProps = new Properties();
  adminClientProps.setProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "0");
I was having trouble getting the tests to pass consistently without it in the past, but the tests seem to be working fine without it now. I think passing the Admin client to the Executor class was the main fix needed to address the flaky tests.
Just removed it in the latest commit.
I noticed the flakiness reappearing in the CircleCI tests without this configuration so I added this configuration back in.
From what I understand, without this configuration the Admin client may get stale metadata when querying the cluster for the number of active brokers in the hasOngoingIntraBrokerReplicaMovement() method. This happens in the testBrokerDiesBeforeMovingPartition() test, where a broker is shut down and then metadata is requested from the active brokers (and the dead broker appears in the active broker list).
From what I understand, we have a couple of options to solve this: (1) set AdminClientConfig.METADATA_MAX_AGE_CONFIG to "0", or (2) update the shutDownBroker() method of the CCContainerizedKraftCluster class to block until a few consecutive describeCluster() calls show that the shutdown broker is no longer a part of the cluster.
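Option (2), requiring several consecutive metadata views without the dead broker before declaring it gone, can be sketched generically; the names and poll counts below are illustrative, and the Supplier stands in for repeated describeCluster()-style calls:

```java
import java.util.Set;
import java.util.function.Supplier;

public final class ConsecutiveCheck {
    /**
     * Returns true once {@code liveBrokers} has omitted {@code brokerId} in
     * {@code required} consecutive polls (within {@code maxPolls} attempts).
     * A single clean view can be a stale cache hit; several in a row are much
     * more likely to reflect settled cluster metadata.
     */
    public static boolean brokerGone(Supplier<Set<Integer>> liveBrokers, int brokerId,
                                     int required, int maxPolls) {
        int consecutive = 0;
        for (int i = 0; i < maxPolls; i++) {
            if (!liveBrokers.get().contains(brokerId)) {
                if (++consecutive >= required) {
                    return true;
                }
            } else {
                consecutive = 0;  // a view still listing the broker resets the streak
            }
        }
        return false;
    }
}
```

A real shutDownBroker() variant would add a sleep between polls and a deadline, but the streak-reset logic is the essential part.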
Fair enough, it's probably something to revisit later but I guess we can have it for now.
_cluster.start();
_brokerAddressList = _cluster.getBrokerAddressList();
_bootstrapUrl = _cluster.getExternalBootstrapAddress();
_adminClient = _cluster.adminClient();
That client is never closed
From what I understand, the client is closed at the end of every test by the teardown() method, which calls _cluster.close(), which in turn calls CCContainerizedKraftCluster's stop() method:
@Override
public void stop() {
this._adminClient.close();
this._brokers.stream().parallel().forEach(GenericContainer::close);
}
Does this not suffice?
Would it be better if I called _cluster.stop() or _adminClient.close() directly in the teardown() method instead?
You're right, CCContainerizedKraftCluster will close it when tearDown() runs.
@mimaison Ready for another review (at your leisure, of course)
mimaison
left a comment
Thanks for the updates and for tracking the underlying issues. Not everything is quite perfect yet, but I think it's good enough that we can continue making progress on the other issues.
Hi @mhratson, I am not sure if you are still active here, but if you are, could you merge this when you get a chance? It will address the flaky tests that are causing the CI to fail on other PRs.
Summary
Why: The clients used to fetch metadata in the Executor tests (the admin client of the CCContainerizedKraftCluster class and the MetadataClient of the Executor class) can fetch stale cluster information while the controller propagates metadata updates to the brokers. This can cause flakiness in tests that depend on that metadata.
What: This PR updates the Executor tests to have the clients refresh metadata more frequently and removes the Executor's dependence on the MetadataClient and private Kafka APIs to avoid testing against stale metadata.
Categorization