Skip to content

Commit aceb32d

Browse files
committed
KAFKA-19747: Update ClientTelemetryReporter telemetry push error handling (#20661)
When a failure occurs with a push telemetry request, any exception is treated as fatal, increasing the time interval to `Integer.MAX_VALUE` effectively turning telemetry off. This PR updates the error handling to check if the exception is a transient one with expected recovery and keeps the telemetry interval value the same in those cases since a recovery is expected. Reviewers: Apoorv Mittal <[email protected]>, Matthias Sax<[email protected]>
1 parent 6c8223c commit aceb32d

File tree

3 files changed

+173
-3
lines changed

3 files changed

+173
-3
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@
8787
files="ClientUtils.java"/>
8888

8989
<suppress checks="ClassDataAbstractionCoupling"
90-
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest).java"/>
90+
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest|ClientTelemetryReporterTest).java"/>
9191
<suppress checks="ClassDataAbstractionCoupling"
9292
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>
9393

clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.KafkaException;
2121
import org.apache.kafka.common.Uuid;
2222
import org.apache.kafka.common.errors.InterruptException;
23+
import org.apache.kafka.common.errors.RetriableException;
2324
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
2425
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
2526
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -526,13 +527,13 @@ public void handleResponse(PushTelemetryResponse response) {
526527
@Override
527528
public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) {
528529
log.debug("The broker generated an error for the get telemetry network API request", maybeFatalException);
529-
handleFailedRequest(maybeFatalException != null);
530+
handleFailedRequest(isRetryable(maybeFatalException));
530531
}
531532

532533
@Override
533534
public void handleFailedPushTelemetryRequest(KafkaException maybeFatalException) {
534535
log.debug("The broker generated an error for the push telemetry network API request", maybeFatalException);
535-
handleFailedRequest(maybeFatalException != null);
536+
handleFailedRequest(isRetryable(maybeFatalException));
536537
}
537538

538539
@Override
@@ -626,6 +627,12 @@ public void initiateClose() {
626627
}
627628
}
628629

630+
private boolean isRetryable(final KafkaException maybeFatalException) {
631+
return maybeFatalException == null ||
632+
(maybeFatalException instanceof RetriableException) ||
633+
(maybeFatalException.getCause() != null && maybeFatalException.getCause() instanceof RetriableException);
634+
}
635+
629636
private Optional<Builder<?>> createSubscriptionRequest(ClientTelemetrySubscription localSubscription) {
630637
/*
631638
If we've previously retrieved a subscription, it will contain the client instance ID

clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,13 @@
1919
import org.apache.kafka.clients.CommonClientConfigs;
2020
import org.apache.kafka.clients.consumer.ConsumerConfig;
2121
import org.apache.kafka.clients.producer.ProducerConfig;
22+
import org.apache.kafka.common.KafkaException;
2223
import org.apache.kafka.common.Uuid;
24+
import org.apache.kafka.common.errors.AuthorizationException;
25+
import org.apache.kafka.common.errors.DisconnectException;
26+
import org.apache.kafka.common.errors.NetworkException;
27+
import org.apache.kafka.common.errors.TimeoutException;
28+
import org.apache.kafka.common.errors.UnsupportedVersionException;
2329
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
2430
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
2531
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -770,6 +776,163 @@ public void testTelemetryReporterInitiateCloseAlreadyInTerminatedStates() {
770776
.telemetrySender()).state());
771777
}
772778

779+
@Test
780+
public void testHandleFailedGetTelemetrySubscriptionsRequestWithRetriableException() {
781+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
782+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
783+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
784+
785+
KafkaException retriableException = new TimeoutException("Request timed out");
786+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(retriableException);
787+
788+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
789+
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
790+
assertTrue(telemetrySender.enabled());
791+
}
792+
793+
@Test
794+
public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedRetriableException() {
795+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
796+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
797+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
798+
799+
KafkaException wrappedException = new KafkaException(new DisconnectException("Connection lost"));
800+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
801+
802+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
803+
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
804+
assertTrue(telemetrySender.enabled());
805+
}
806+
807+
@Test
808+
public void testHandleFailedGetTelemetrySubscriptionsRequestWithFatalException() {
809+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
810+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
811+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
812+
813+
KafkaException fatalException = new AuthorizationException("Not authorized for telemetry");
814+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(fatalException);
815+
816+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
817+
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
818+
assertFalse(telemetrySender.enabled());
819+
}
820+
821+
@Test
822+
public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedFatalException() {
823+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
824+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
825+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
826+
827+
KafkaException wrappedException = new KafkaException("Version check failed",
828+
new UnsupportedVersionException("Broker doesn't support telemetry"));
829+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
830+
831+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
832+
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
833+
assertFalse(telemetrySender.enabled());
834+
}
835+
836+
@Test
837+
public void testHandleFailedPushTelemetryRequestWithRetriableException() {
838+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
839+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
840+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
841+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
842+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
843+
844+
KafkaException networkException = new NetworkException("Network failure");
845+
telemetrySender.handleFailedPushTelemetryRequest(networkException);
846+
847+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
848+
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
849+
assertTrue(telemetrySender.enabled());
850+
}
851+
852+
@Test
853+
public void testHandleFailedPushTelemetryRequestWithFatalException() {
854+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
855+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
856+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
857+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
858+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
859+
860+
KafkaException authException = new AuthorizationException("Not authorized to push telemetry");
861+
telemetrySender.handleFailedPushTelemetryRequest(authException);
862+
863+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
864+
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
865+
assertFalse(telemetrySender.enabled());
866+
}
867+
868+
@Test
869+
public void testHandleFailedRequestWithMultipleRetriableExceptionsInChain() {
870+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
871+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
872+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
873+
874+
KafkaException chainedException = new TimeoutException("Outer timeout",
875+
new DisconnectException("Inner disconnect"));
876+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(chainedException);
877+
878+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
879+
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
880+
assertTrue(telemetrySender.enabled());
881+
}
882+
883+
@Test
884+
public void testHandleFailedRequestWithGenericKafkaException() {
885+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
886+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
887+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
888+
889+
KafkaException genericException = new KafkaException("Unknown error");
890+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(genericException);
891+
892+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
893+
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
894+
assertFalse(telemetrySender.enabled());
895+
}
896+
897+
@Test
898+
public void testHandleFailedRequestDuringTermination() {
899+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
900+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
901+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
902+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
903+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
904+
905+
KafkaException exception = new TimeoutException("Timeout");
906+
telemetrySender.handleFailedPushTelemetryRequest(exception);
907+
908+
assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED, telemetrySender.state());
909+
assertTrue(telemetrySender.enabled());
910+
}
911+
912+
@Test
913+
public void testSequentialFailuresWithDifferentExceptionTypes() {
914+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
915+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
916+
917+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
918+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
919+
new TimeoutException("Timeout 1"));
920+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
921+
assertTrue(telemetrySender.enabled());
922+
923+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
924+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
925+
new DisconnectException("Disconnect"));
926+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
927+
assertTrue(telemetrySender.enabled());
928+
929+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
930+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
931+
new UnsupportedVersionException("Version not supported"));
932+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
933+
assertFalse(telemetrySender.enabled());
934+
}
935+
773936
@AfterEach
774937
public void tearDown() {
775938
clientTelemetryReporter.close();

0 commit comments

Comments
 (0)