-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19891: Suppress error-level heartbeat logs during consumer close #20965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
…ut/Disconnect to WARN
kirktrue
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a fix for KAFKA-19891 or no? If it is, the PR subject line should start with "KAFKA-19891: AsyncKafkaConsumer..."
In the case described in the bug, the log message is emitted on close(). One option is to update AbstractHeartbeatRequestManager to implement the signalClose() API (part of the RequestManager interface) to set a flag when it's closing. Then that flag can be checked before logging, e.g.:
});
}
+ private boolean isClosed;
+
+ @Override
+ public void signalClose() {
+ isClosed = true;
+ }
+
@SuppressWarnings("unchecked")
private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDelegate.UnsentRequest request) {
return request.whenComplete((response, exception) -> {
@@ -315,7 +322,10 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
else
logger.error("{} failed because of {}: {}", heartbeatRequestName(), error, response);
} else {
- logger.error("{} failed because of unexpected exception.", heartbeatRequestName(), exception);
+ if (isClosed)
+ logger.debug("{} failed because of exception during close.", heartbeatRequestName(), exception);
+ else
+ logger.error("{} failed because of unexpected exception.", heartbeatRequestName(), exception);
}
});
}
|
sure will make the changes |
kirktrue
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates!
There are a few other places where we've written tests that can capture the logging output, just to ensure the log level and message is as expected. One example is CoordinatorRequestManagerTest.testMarkCoordinatorUnknownLoggingAccuracy(). Is it possible to write a test that triggers this case to verify that the fix works?
| if (isClosed) { | ||
| logger.debug("{} failed because of exception during close: {}", heartbeatRequestName(), exception); | ||
| } else | ||
| logger.error("{} failed because of unexpected exception: {}", heartbeatRequestName(), exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpicky, sure, but I think we should be consistent with the use of the brackets.
| if (isClosed) { | ||
| logger.debug("StreamsGroupHeartbeatRequest failed because of exception during close: ", exception); | ||
| } else | ||
| logger.error("StreamsGroupHeartbeatRequest failed because of unexpected exception: ", exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here: both the if and the else should use brackets or both if and the else branches should not use brackets.
This PR improves the logging behavior in AbstractHeartbeatRequestManager
when the heartbeat response is null due to an exception.
Currently, TimeoutException / DisconnectException are logged at ERROR
level with full stacktrace, even though these are normal operational
conditions (e.g., broker down, network turbulence).
The classic KafkaConsumer logs such cases at WARN, and the Jira ticket
KAFKA-19891 confirms that the async consumer should behave
consistently.
This PR:
stacktrace)
No functional behaviour has been changed.