diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoffManager.java b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoffManager.java
new file mode 100644
index 0000000000000..87e730f9b7601
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoffManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+/**
+ * Tracks retry attempts against a configured maximum and computes exponential backoff delays for retried requests. Not thread-safe.
+ */
+public class ExponentialBackoffManager {
+    private final int maxAttempts;
+    private int attempts;
+    private final ExponentialBackoff backoff;
+
+    public ExponentialBackoffManager(int maxAttempts, long initialInterval, int multiplier, long maxInterval, double jitter) {
+        this.maxAttempts = maxAttempts;
+        this.backoff = new ExponentialBackoff(
+            initialInterval,
+            multiplier,
+            maxInterval,
+            jitter);
+    }
+
+    public void incrementAttempt() {
+        attempts++;
+    }
+
+    public void resetAttempts() {
+        attempts = 0;
+    }
+
+    public boolean canAttempt() {
+        return attempts < maxAttempts;
+    }
+
+    public long backOff() {
+        return this.backoff.backoff(attempts);
+    }
+
+    public int attempts() {
+        return attempts;
+    }
+}
\ No newline at end of file
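A minimal usage sketch of the class above, for orientation; scheduleRetry and failPendingFutures are hypothetical stand-ins for whatever the caller does with the computed delay, not APIs from this patch:

    ExponentialBackoffManager backoff = new ExponentialBackoffManager(
        5,       // maxAttempts
        1_000L,  // initial interval (ms)
        2,       // exponential base
        30_000L, // max interval (ms)
        0.2);    // jitter fraction

    if (backoff.canAttempt()) {
        long delayMs = backoff.backOff();  // grows with attempts(), capped at the max interval
        backoff.incrementAttempt();
        scheduleRetry(delayMs);            // hypothetical: e.g. a TimerTask, as in the client below
    } else {
        failPendingFutures();              // hypothetical: give up once attempts are exhausted
    }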
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffManagerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffManagerTest.java
new file mode 100644
index 0000000000000..87fe09858584e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffManagerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ExponentialBackoffManagerTest {
+
+    private static final ArrayList<Long> BACKOFF_LIST = new ArrayList<>(List.of(100L, 200L, 400L, 800L, 1600L));
+
+    @Test
+    public void testInitialState() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+            5, 100, 2, 1000, 0.0);
+        assertEquals(0, manager.attempts());
+        assertTrue(manager.canAttempt());
+    }
+
+    @Test
+    public void testIncrementAttempt() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+            5, 100, 2, 1000, 0.0);
+        assertEquals(0, manager.attempts());
+        manager.incrementAttempt();
+        assertEquals(1, manager.attempts());
+    }
+
+    @Test
+    public void testResetAttempts() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+            5, 100, 2, 1000, 0.0);
+        manager.incrementAttempt();
+        manager.incrementAttempt();
+        manager.incrementAttempt();
+        assertEquals(3, manager.attempts());
+
+        manager.resetAttempts();
+        assertEquals(0, manager.attempts());
+        assertTrue(manager.canAttempt());
+    }
+
+    @Test
+    public void testCanAttempt() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+            3, 100, 2, 1000, 0.0);
+        // Initially can attempt
+        assertTrue(manager.canAttempt());
+        assertEquals(0, manager.attempts());
+
+        manager.incrementAttempt();
+        manager.incrementAttempt();
+        manager.incrementAttempt();
+        // After all retry attempts are exhausted
+        assertFalse(manager.canAttempt());
+        assertEquals(3, manager.attempts());
+    }
+
+    @Test
+    public void testBackOffWithoutJitter() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+            5, 100, 2, 1000, 0.0);
+        for (int i = 0; i < 5; i++) {
+            long backoff = manager.backOff();
+            // Without jitter, each value is exactly initialInterval * 2^attempts, capped at maxInterval.
+            assertEquals(Math.min(1000L, BACKOFF_LIST.get(i)), backoff);
+            manager.incrementAttempt();
+        }
+    }
+
+    @Test
+    public void testBackOffWithJitter() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+            5, 100, 2, 1000, 0.2);
+        for (int i = 0; i < 5; i++) {
+            long backoff = manager.backOff();
+            // With jitter of 0.2, each value should fall within 20% of the deterministic value.
+            assertTrue(backoff >= 0.8 * Math.min(1000L, BACKOFF_LIST.get(i)));
+            assertTrue(backoff <= 1.2 * Math.min(1000L, BACKOFF_LIST.get(i)));
+            manager.incrementAttempt();
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3cba40491cd2d..70e18a626a3c6 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -635,7 +635,8 @@ class BrokerServer(
         new LogContext(s"[NetworkPartitionMetadataClient broker=${config.brokerId}]")
       ),
       Time.SYSTEM,
-      config.interBrokerListenerName()
+      config.interBrokerListenerName(),
+      new SystemTimerReaper("network-partition-metadata-client-reaper", new SystemTimer("network-partition-metadata-client"))
     )
   }
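For context, the client now owns the timer it is handed: BrokerServer constructs the reaper-wrapped timer above, and NetworkPartitionMetadataClient.close() (below) closes it via Utils.closeQuietly. A sketch of that lifecycle, with metadataCache, clientSupplier, and listenerName as placeholder variables:

    Timer timer = new SystemTimerReaper(
        "network-partition-metadata-client-reaper",            // reaper thread that advances the wheel
        new SystemTimer("network-partition-metadata-client")); // underlying timing-wheel timer
    NetworkPartitionMetadataClient client = new NetworkPartitionMetadataClient(
        metadataCache, clientSupplier, Time.SYSTEM, listenerName, timer);
    // ... use client.listLatestOffsets(...) ...
    client.close(); // closes the timer even if the send thread was never initialized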
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
index 355b0d2fb2132..56297c408f0ce 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
@@ -30,10 +30,14 @@
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoffManager;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.util.InterBrokerSendThread;
 import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,20 +58,42 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
 
     private static final Logger log = LoggerFactory.getLogger(NetworkPartitionMetadataClient.class);
 
+    private static final long REQUEST_BACKOFF_MS = 1_000L;
+    private static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int MAX_RETRY_ATTEMPTS = 5;
+
     private final MetadataCache metadataCache;
     private final Supplier<KafkaClient> networkClientSupplier;
     private final Time time;
     private final ListenerName listenerName;
     private final AtomicBoolean initialized = new AtomicBoolean(false);
     private volatile SendThread sendThread;
+    private final Timer timer;
 
     public NetworkPartitionMetadataClient(MetadataCache metadataCache, Supplier<KafkaClient> networkClientSupplier,
-                                          Time time, ListenerName listenerName) {
+                                          Time time, ListenerName listenerName, Timer timer) {
+        if (metadataCache == null) {
+            throw new IllegalArgumentException("MetadataCache must not be null.");
+        }
+        if (networkClientSupplier == null) {
+            throw new IllegalArgumentException("NetworkClientSupplier must not be null.");
+        }
+        if (time == null) {
+            throw new IllegalArgumentException("Time must not be null.");
+        }
+        if (listenerName == null) {
+            throw new IllegalArgumentException("ListenerName must not be null.");
+        }
+        if (timer == null) {
+            throw new IllegalArgumentException("Timer must not be null.");
+        }
+
         this.metadataCache = metadataCache;
         this.networkClientSupplier = networkClientSupplier;
         this.time = time;
         this.listenerName = listenerName;
+        this.timer = timer;
     }
 
     @Override
@@ -125,6 +151,7 @@ public Map<TopicPartition, CompletableFuture<OffsetResponse>> listLatestOffsets(
     public void close() {
         // Only close sendThread if it was initialized. Note, close is called only during broker shutdown, so no need
         // for further synchronization here.
+        Utils.closeQuietly(timer, "NetworkPartitionMetadataClient timer");
         if (!initialized.get()) {
             return;
         }
@@ -186,14 +213,18 @@ private ListOffsetsRequest.Builder createListOffsetsRequest(List<TopicPartition>
      * Handles the response from a ListOffsets request.
      */
     // Visible for Testing.
-    void handleResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> partitionFutures, ClientResponse clientResponse) {
+    void handleResponse(PendingRequest pendingRequest, ClientResponse clientResponse) {
         // Handle error responses first
-        if (maybeHandleErrorResponse(partitionFutures, clientResponse)) {
+        if (maybeHandleErrorResponse(pendingRequest, clientResponse)) {
             return;
         }
 
         log.debug("ListOffsets response received successfully - {}", clientResponse);
 
+        // Reset retry attempts on success
+        pendingRequest.backoffManager().resetAttempts();
+
         ListOffsetsResponse response = (ListOffsetsResponse) clientResponse.responseBody();
+        Map<TopicPartition, CompletableFuture<OffsetResponse>> partitionFutures = pendingRequest.futures();
 
         for (ListOffsetsTopicResponse topicResponse : response.topics()) {
             String topicName = topicResponse.name();
@@ -216,11 +247,14 @@ void handleResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> parti
     }
 
     /**
-     * Handles error responses by completing all associated futures with an error. Returns true if an error was
-     * handled. Otherwise, returns false.
+     * Handles error responses by completing all associated futures with an error or retrying the request.
+     * Returns true if an error was handled. Otherwise, returns false.
      */
-    private boolean maybeHandleErrorResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> partitionFutures, ClientResponse clientResponse) {
+    private boolean maybeHandleErrorResponse(PendingRequest pendingRequest, ClientResponse clientResponse) {
+        Map<TopicPartition, CompletableFuture<OffsetResponse>> partitionFutures = pendingRequest.futures();
         Errors error;
+        boolean shouldRetry = false;
+
         if (clientResponse == null) {
             log.error("Response for ListOffsets for topicPartitions: {} is null", partitionFutures.keySet());
             error = Errors.UNKNOWN_SERVER_ERROR;
@@ -231,11 +265,13 @@ private boolean maybeHandleErrorResponse(Map<TopicPartition, CompletableFuture<
         partitionFutures.values().forEach(future -> future.complete(new OffsetResponse(-1, error)));
         return true;
     }
@@ -251,9 +304,39 @@ private boolean maybeHandleErrorResponse(Map<TopicPartition, CompletableFuture<
-    private record PendingRequest(Node node,
-                                  Map<TopicPartition, CompletableFuture<OffsetResponse>> futures,
-                                  ListOffsetsRequest.Builder requestBuilder) {
+    // Visible for testing.
+    record PendingRequest(Node node,
+                          Map<TopicPartition, CompletableFuture<OffsetResponse>> futures,
+                          ListOffsetsRequest.Builder requestBuilder,
+                          ExponentialBackoffManager backoffManager) {
+        PendingRequest(Node node,
+                       Map<TopicPartition, CompletableFuture<OffsetResponse>> futures,
+                       ListOffsetsRequest.Builder requestBuilder) {
+            this(node, futures, requestBuilder, new ExponentialBackoffManager(
+                MAX_RETRY_ATTEMPTS,
+                REQUEST_BACKOFF_MS,
+                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+                REQUEST_BACKOFF_MAX_MS,
+                CommonClientConfigs.RETRY_BACKOFF_JITTER));
+        }
+    }
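The full retry branch of maybeHandleErrorResponse is not visible in the hunks above; judging from the shouldRetry flag, the PendingRequest record, and the tests below, it presumably amounts to something like this sketch (illustrative only, not the verbatim patch):

    // On a retriable error (disconnect / timeout), back off and reschedule
    // instead of completing the futures exceptionally.
    ExponentialBackoffManager backoffManager = pendingRequest.backoffManager();
    if (shouldRetry && backoffManager.canAttempt()) {
        long delayMs = backoffManager.backOff();
        backoffManager.incrementAttempt();
        timer.add(new RetryTimerTask(delayMs, pendingRequest)); // defined below
        return true; // handled; futures stay pending until the retry resolves
    }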
+
+    /**
+     * Timer task for retrying failed requests after backoff.
+     */
+    private final class RetryTimerTask extends TimerTask {
+        private final PendingRequest pendingRequest;
+
+        RetryTimerTask(long delayMs, PendingRequest pendingRequest) {
+            super(delayMs);
+            this.pendingRequest = pendingRequest;
+        }
+
+        @Override
+        public void run() {
+            sendThread.enqueue(pendingRequest);
+            sendThread.wakeup();
+        }
+    }
 
     private class SendThread extends InterBrokerSendThread {
@@ -286,8 +369,7 @@ public Collection<RequestAndCompletionHandler> generateRequests() {
                 time.hiResClockMs(),
                 current.node,
                 requestBuilder,
-                response -> handleResponse(current.futures, response)
-            );
+                response -> handleResponse(current, response));
             requests.add(requestHandler);
         }
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
index 49e4bf8c783f9..98528de556f83 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Node;
@@ -31,8 +32,12 @@
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.ExponentialBackoffManager;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.server.util.timer.Timer;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -63,18 +68,27 @@ class NetworkPartitionMetadataClientTest {
 
     private static final MockTime MOCK_TIME = new MockTime();
     private static final MetadataCache METADATA_CACHE = mock(MetadataCache.class);
     private static final Supplier<KafkaClient> KAFKA_CLIENT_SUPPLIER = () -> mock(KafkaClient.class);
+    private static final Timer MOCK_TIMER = new MockTimer(MOCK_TIME);
     private static final String HOST = "localhost";
     private static final int PORT = 9092;
     private static final ListenerName LISTENER_NAME = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
     private static final String TOPIC = "test-topic";
     private static final int PARTITION = 0;
     private static final Node LEADER_NODE = new Node(1, HOST, PORT);
+    private static final int MAX_RETRY_ATTEMPTS = 2;
+    private static final long REQUEST_BACKOFF_MS = 1_000L;
+    private static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int RETRY_BACKOFF_EXP_BASE = CommonClientConfigs.RETRY_BACKOFF_EXP_BASE;
+    private static final double RETRY_BACKOFF_JITTER = CommonClientConfigs.RETRY_BACKOFF_JITTER;
+
     private NetworkPartitionMetadataClient networkPartitionMetadataClient;
 
     private static class NetworkPartitionMetadataClientBuilder {
         private MetadataCache metadataCache = METADATA_CACHE;
         private Supplier<KafkaClient> kafkaClientSupplier = KAFKA_CLIENT_SUPPLIER;
+        private Time time = MOCK_TIME;
+        private Timer timer = MOCK_TIMER;
 
         NetworkPartitionMetadataClientBuilder withMetadataCache(MetadataCache metadataCache) {
             this.metadataCache = metadataCache;
@@ -86,12 +100,22 @@ NetworkPartitionMetadataClientBuilder withKafkaClientSupplier(Supplier<KafkaClient>
         NetworkPartitionMetadataClientBuilder withKafkaClientSupplier(Supplier<KafkaClient> kafkaClientSupplier) {
             this.kafkaClientSupplier = kafkaClientSupplier;
             return this;
         }
 
+        NetworkPartitionMetadataClientBuilder withTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        NetworkPartitionMetadataClientBuilder withTimer(Timer timer) {
+            this.timer = timer;
+            return this;
+        }
+
         NetworkPartitionMetadataClient build() {
-            return new NetworkPartitionMetadataClient(metadataCache, kafkaClientSupplier, MOCK_TIME, LISTENER_NAME);
+            return new NetworkPartitionMetadataClient(metadataCache, kafkaClientSupplier, time, LISTENER_NAME, timer);
         }
@@ ... @@
         TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
         CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
         Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
             tp,
-            partitionFuture
-        );
+            partitionFuture);
         networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder);
         // Pass null as clientResponse.
-        networkPartitionMetadataClient.handleResponse(futures, null);
+        networkPartitionMetadataClient.handleResponse(pendingRequest, null);
         assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
         PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
         assertEquals(-1, response.offset());
@@ -290,14 +319,19 @@ public void testListLatestOffsetsAuthenticationError() throws ExecutionException
         CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
         Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
             tp,
-            partitionFuture
-        );
+            partitionFuture);
         AuthenticationException authenticationException = new AuthenticationException("Test authentication exception");
         ClientResponse clientResponse = mock(ClientResponse.class);
         // Mock authentication exception in client response.
         when(clientResponse.authenticationException()).thenReturn(authenticationException);
         networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
-        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder);
+        networkPartitionMetadataClient.handleResponse(pendingRequest, clientResponse);
         assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
         PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
         assertEquals(-1, response.offset());
@@ -310,64 +344,26 @@ public void testListLatestOffsetsVersionMismatch() throws ExecutionException, In
         CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
         Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
             tp,
-            partitionFuture
-        );
+            partitionFuture);
         UnsupportedVersionException unsupportedVersionException = new UnsupportedVersionException("Test unsupportedVersionException exception");
         ClientResponse clientResponse = mock(ClientResponse.class);
         when(clientResponse.authenticationException()).thenReturn(null);
         // Mock version mismatch exception in client response.
 when(clientResponse.versionMismatch()).thenReturn(unsupportedVersionException);
         networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
-        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder);
+        networkPartitionMetadataClient.handleResponse(pendingRequest, clientResponse);
         assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
         PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
         assertEquals(-1, response.offset());
         assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), response.error().code());
     }
 
-    @Test
-    public void testListLatestOffsetsDisconnected() throws ExecutionException, InterruptedException {
-        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
-        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
-        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
-            tp,
-            partitionFuture
-        );
-        ClientResponse clientResponse = mock(ClientResponse.class);
-        when(clientResponse.authenticationException()).thenReturn(null);
-        when(clientResponse.versionMismatch()).thenReturn(null);
-        // Mock disconnected in client response.
-        when(clientResponse.wasDisconnected()).thenReturn(true);
-        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
-        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
-        assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
-        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
-        assertEquals(-1, response.offset());
-        assertEquals(Errors.NETWORK_EXCEPTION.code(), response.error().code());
-    }
-
-    @Test
-    public void testListLatestOffsetsTimedOut() throws ExecutionException, InterruptedException {
-        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
-        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
-        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
-            tp,
-            partitionFuture
-        );
-        ClientResponse clientResponse = mock(ClientResponse.class);
-        when(clientResponse.authenticationException()).thenReturn(null);
-        when(clientResponse.versionMismatch()).thenReturn(null);
-        when(clientResponse.wasDisconnected()).thenReturn(false);
-        // Mock timed out in client response.
-        when(clientResponse.wasTimedOut()).thenReturn(true);
-        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
-        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
-        assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
-        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
-        assertEquals(-1, response.offset());
-        assertEquals(Errors.REQUEST_TIMED_OUT.code(), response.error().code());
-    }
-
     @Test
     public void testListLatestOffsetsMultiplePartitionsSameLeader() throws ExecutionException, InterruptedException {
         TopicPartition tp1 = new TopicPartition(TOPIC, PARTITION);
@@ -969,4 +965,206 @@ public void testLazyInitializationOnlyOnce() {
         // Verify supplier was still only called once (not again)
         assertEquals(1, supplierCallCount[0]);
     }
+
+    @Test
+    public void testRetryOnDisconnect() {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture);
+        MockTimer timer = new MockTimer(MOCK_TIME);
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(true);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withTimer(timer)
+            .build();
+
+        ExponentialBackoffManager exponentialBackoffManager = new ExponentialBackoffManager(
+            MAX_RETRY_ATTEMPTS,
+            REQUEST_BACKOFF_MS,
+            RETRY_BACKOFF_EXP_BASE,
+            REQUEST_BACKOFF_MAX_MS,
+            RETRY_BACKOFF_JITTER);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder,
+            exponentialBackoffManager);
+
+        // Initially, timer should be empty
+        assertEquals(0, timer.size());
+        assertEquals(0, exponentialBackoffManager.attempts());
+
+        // Handle disconnected response
+        networkPartitionMetadataClient.handleResponse(pendingRequest, clientResponse);
+
+        // Verify that a timer entry is present for retry
+        assertEquals(1, timer.size());
+        assertEquals(1, exponentialBackoffManager.attempts());
+        // Future should not be completed yet since retry is scheduled
+        assertFalse(partitionFuture.isDone());
+    }
+
+    @Test
+    public void testRetryOnTimeout() {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture);
+        MockTimer timer = new MockTimer(MOCK_TIME);
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(false);
+        when(clientResponse.wasTimedOut()).thenReturn(true);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withTimer(timer)
+            .build();
+
+        ExponentialBackoffManager exponentialBackoffManager = new ExponentialBackoffManager(
+            MAX_RETRY_ATTEMPTS,
+            REQUEST_BACKOFF_MS,
+            RETRY_BACKOFF_EXP_BASE,
+            REQUEST_BACKOFF_MAX_MS,
+            RETRY_BACKOFF_JITTER);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder,
+            exponentialBackoffManager);
+
+        // Initially, timer should be empty
+        assertEquals(0, timer.size());
+        assertEquals(0, exponentialBackoffManager.attempts());
+
+        // Handle timeout response
+        networkPartitionMetadataClient.handleResponse(pendingRequest, clientResponse);
+
+        // Verify that a timer entry is present for retry
+        assertEquals(1, timer.size());
+        assertEquals(1, exponentialBackoffManager.attempts());
+        // Future should not be completed yet since retry is scheduled
+        assertFalse(partitionFuture.isDone());
+    }
+
+    @Test
+    public void testMaxRetryAttemptsExhaustedOnDisconnect() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture);
+        MockTimer timer = new MockTimer(MOCK_TIME);
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(true);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withTimer(timer)
+            .build();
+
+        ExponentialBackoffManager exponentialBackoffManager = new ExponentialBackoffManager(
+            MAX_RETRY_ATTEMPTS,
+            REQUEST_BACKOFF_MS,
+            RETRY_BACKOFF_EXP_BASE,
+            REQUEST_BACKOFF_MAX_MS,
+            RETRY_BACKOFF_JITTER);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = mock(ListOffsetsRequest.Builder.class);
+
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder,
+            exponentialBackoffManager);
+
+        // Initially, timer should be empty
+        assertEquals(0, timer.size());
+
+        // Exhaust all retry attempts by incrementing to MAX_RETRY_ATTEMPTS
+        for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+            exponentialBackoffManager.incrementAttempt();
+        }
+
+        // Verify that attempts are exhausted
+        assertFalse(exponentialBackoffManager.canAttempt());
+
+        // Handle disconnected response with exhausted retries
+        networkPartitionMetadataClient.handleResponse(pendingRequest, clientResponse);
+
+        // Verify that no timer entry is added (max retries exhausted)
+        assertEquals(0, timer.size());
+        // Verify that future is completed with error
+        assertTrue(partitionFuture.isDone());
+        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.NETWORK_EXCEPTION.code(), response.error().code());
+    }
+
+    @Test
+    public void testMaxRetryAttemptsExhaustedOnTimeout() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture);
+        MockTimer timer = new MockTimer(MOCK_TIME);
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(false);
+        when(clientResponse.wasTimedOut()).thenReturn(true);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withTimer(timer)
+            .build();
+
+        ExponentialBackoffManager exponentialBackoffManager = new ExponentialBackoffManager(
+            MAX_RETRY_ATTEMPTS,
+            REQUEST_BACKOFF_MS,
+            RETRY_BACKOFF_EXP_BASE,
+            REQUEST_BACKOFF_MAX_MS,
+            RETRY_BACKOFF_JITTER);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = mock(ListOffsetsRequest.Builder.class);
+
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder,
+            exponentialBackoffManager);
+
+        // Initially, timer should be empty
+        assertEquals(0, timer.size());
+
+        // Exhaust all retry attempts by incrementing to MAX_RETRY_ATTEMPTS
+        for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+            exponentialBackoffManager.incrementAttempt();
+        }
+
+        // Verify that attempts are exhausted
+        assertFalse(exponentialBackoffManager.canAttempt(), "Retry attempts should be exhausted");
+
+        // Handle timeout response with exhausted retries
+        networkPartitionMetadataClient.handleResponse(pendingRequest, clientResponse);
+
+        // Verify that no timer entry is added (max retries exhausted)
+        assertEquals(0, timer.size(), "Timer should not have an entry when max retries are exhausted");
+        // Verify that future is completed with error
+        assertTrue(partitionFuture.isDone(), "Future should be completed when max retries are exhausted");
+        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), response.error().code());
+    }
 }
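The retry tests above assert that a task lands in the MockTimer but never fire it; a natural follow-up (a sketch, assuming MockTimer.advanceClock, which advances the mock clock and runs expired tasks) would drive the scheduled retry and observe the re-enqueued request:

    // Hypothetical continuation of testRetryOnDisconnect:
    timer.advanceClock(REQUEST_BACKOFF_MS * 2); // comfortably past the first (jittered) backoff
    // The RetryTimerTask should now have re-enqueued the PendingRequest on the send
    // thread, so a second response can be handled or the attempt counter inspected.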
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 43562ecc17779..6c151686e9b69 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -53,7 +53,7 @@
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
-import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.ExponentialBackoffManager;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.share.SharePartitionKey;
@@ -94,6 +94,8 @@ public class PersisterStateManager {
     public static final long REQUEST_BACKOFF_MS = 1_000L;
     public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
     private static final int MAX_FIND_COORD_ATTEMPTS = 5;
+    private static final int RETRY_BACKOFF_EXP_BASE = CommonClientConfigs.RETRY_BACKOFF_EXP_BASE;
+    private static final double RETRY_BACKOFF_JITTER = CommonClientConfigs.RETRY_BACKOFF_JITTER;
     private final Time time;
     private final Timer timer;
     private final ShareCoordinatorMetadataCacheHelper cacheHelper;
@@ -116,38 +118,6 @@ public class PersisterStateManager {
     // when generateRequests is called.
     private Runnable generateCallback;
 
-    private static class BackoffManager {
-        private final int maxAttempts;
-        private int attempts;
-        private final ExponentialBackoff backoff;
-
-        BackoffManager(int maxAttempts, long initialBackoffMs, long maxBackoffMs) {
-            this.maxAttempts = maxAttempts;
-            this.backoff = new ExponentialBackoff(
-                initialBackoffMs,
-                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
-                maxBackoffMs,
-                CommonClientConfigs.RETRY_BACKOFF_JITTER
-            );
-        }
-
-        void incrementAttempt() {
-            attempts++;
-        }
-
-        void resetAttempts() {
-            attempts = 0;
-        }
-
-        boolean canAttempt() {
-            return attempts < maxAttempts;
-        }
-
-        long backOff() {
-            return this.backoff.backoff(attempts);
-        }
-    }
-
     public enum RPCType {
         INITIALIZE,
         READ,
@@ -219,7 +189,7 @@ public void setGenerateCallback(Runnable generateCallback) {
      */
     public abstract class PersisterStateManagerHandler implements RequestCompletionHandler {
         protected Node coordinatorNode;
-        private final BackoffManager findCoordBackoff;
+        private final ExponentialBackoffManager findCoordBackoff;
         protected final Logger log;
         private Consumer<ClientResponse> onCompleteCallback;
         protected final SharePartitionKey partitionKey;
@@ -232,7 +202,12 @@ public PersisterStateManagerHandler(
             long backoffMaxMs,
             int maxRPCRetryAttempts
         ) {
-            this.findCoordBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+            this.findCoordBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
             this.onCompleteCallback = response -> {
             }; // noop
             partitionKey = SharePartitionKey.getInstance(groupId, topicId, partition);
@@ -522,7 +497,7 @@ public class InitializeStateHandler extends PersisterStateManagerHandler {
         private final int stateEpoch;
         private final long startOffset;
         private final CompletableFuture<InitializeShareGroupStateResponse> result;
-        private final BackoffManager initializeStateBackoff;
+        private final ExponentialBackoffManager initializeStateBackoff;
 
         public InitializeStateHandler(
             String groupId,
@@ -539,7 +514,12 @@ public InitializeStateHandler(
             this.stateEpoch = stateEpoch;
             this.startOffset = startOffset;
             this.result = result;
-            this.initializeStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+            this.initializeStateBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public InitializeStateHandler(
@@ -701,7 +681,7 @@ public class WriteStateHandler extends PersisterStateManagerHandler {
         private final int deliveryCompleteCount;
         private final List<PersisterStateBatch> batches;
         private final CompletableFuture<WriteShareGroupStateResponse> result;
-        private final BackoffManager writeStateBackoff;
+        private final ExponentialBackoffManager writeStateBackoff;
 
         public WriteStateHandler(
             String groupId,
@@ -724,7 +704,12 @@ public WriteStateHandler(
             this.deliveryCompleteCount = deliveryCompleteCount;
             this.batches = batches;
             this.result = result;
-            this.writeStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+            this.writeStateBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public WriteStateHandler(
@@ -887,7 +872,7 @@ protected RPCType rpcType() {
     public class ReadStateHandler extends PersisterStateManagerHandler {
         private final int leaderEpoch;
         private final CompletableFuture<ReadShareGroupStateResponse> result;
-        private final BackoffManager readStateBackoff;
+        private final ExponentialBackoffManager readStateBackoff;
 
         public ReadStateHandler(
            String groupId,
@@ -903,7 +888,12 @@ public ReadStateHandler(
             super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
             this.leaderEpoch = leaderEpoch;
             this.result = result;
-            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+            this.readStateBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public ReadStateHandler(
@@ -1057,7 +1047,7 @@ protected RPCType rpcType() {
     public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
         private final int leaderEpoch;
         private final CompletableFuture<ReadShareGroupStateSummaryResponse> result;
-        private final BackoffManager readStateSummaryBackoff;
+        private final ExponentialBackoffManager readStateSummaryBackoff;
 
         public ReadStateSummaryHandler(
             String groupId,
@@ -1073,7 +1063,12 @@ public ReadStateSummaryHandler(
             super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
             this.leaderEpoch = leaderEpoch;
             this.result = result;
-            this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+            this.readStateSummaryBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public ReadStateSummaryHandler(
@@ -1226,7 +1221,7 @@ protected RPCType rpcType() {
 
     public class DeleteStateHandler extends PersisterStateManagerHandler {
         private final CompletableFuture<DeleteShareGroupStateResponse> result;
-        private final BackoffManager deleteStateBackoff;
+        private final ExponentialBackoffManager deleteStateBackoff;
 
         public DeleteStateHandler(
             String groupId,
@@ -1239,7 +1234,12 @@ public DeleteStateHandler(
         ) {
             super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
             this.result = result;
-            this.deleteStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+            this.deleteStateBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public DeleteStateHandler(