Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.utils;

/**
* Manages retry attempts and exponential backoff for requests.
*/
public class ExponentialBackoffManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really require a new class and can't use some existing mechanism?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PersisterStateManager already had a subclass called BackOffManager, and I requred a similar thing. Instead of having 2 subclasses at different places, I think having a utility class is better. It could be used in future elsewhere as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a public class now, please add a few unit tests.

private final int maxAttempts;
private int attempts;
private final ExponentialBackoff backoff;

public ExponentialBackoffManager(int maxAttempts, long initialInterval, int multiplier, long maxInterval, double jitter) {
this.maxAttempts = maxAttempts;
this.backoff = new ExponentialBackoff(
initialInterval,
multiplier,
maxInterval,
jitter);
}

public void incrementAttempt() {
attempts++;
}

public void resetAttempts() {
attempts = 0;
}

public boolean canAttempt() {
return attempts < maxAttempts;
}

public long backOff() {
return this.backoff.backoff(attempts);
}

public int attempts() {
return attempts;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.utils;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ExponentialBackoffManagerTest {

private static final ArrayList<Long> BACKOFF_LIST = new ArrayList<>(List.of(100L, 200L, 400L, 800L, 1600L));

@Test
public void testInitialState() {
ExponentialBackoffManager manager = new ExponentialBackoffManager(
5, 100, 2, 1000, 0.0);
assertEquals(0, manager.attempts());
assertTrue(manager.canAttempt());
}

@Test
public void testIncrementAttempt() {
ExponentialBackoffManager manager = new ExponentialBackoffManager(
5, 100, 2, 1000, 0.0);
assertEquals(0, manager.attempts());
manager.incrementAttempt();
assertEquals(1, manager.attempts());
}

@Test
public void testResetAttempts() {
ExponentialBackoffManager manager = new ExponentialBackoffManager(
5, 100, 2, 1000, 0.0);
manager.incrementAttempt();
manager.incrementAttempt();
manager.incrementAttempt();
assertEquals(3, manager.attempts());

manager.resetAttempts();
assertEquals(0, manager.attempts());
assertTrue(manager.canAttempt());
}

@Test
public void testCanAttempt() {
ExponentialBackoffManager manager = new ExponentialBackoffManager(
3, 100, 2, 1000, 0.0);
// Initially can attempt
assertTrue(manager.canAttempt());
assertEquals(0, manager.attempts());

manager.incrementAttempt();
manager.incrementAttempt();
manager.incrementAttempt();
// After all retry attempts are exhausted
assertFalse(manager.canAttempt());
assertEquals(3, manager.attempts());
}

@Test
public void testBackOffWithoutJitter() {
ExponentialBackoffManager manager = new ExponentialBackoffManager(
5, 100, 2, 1000, 0.0);
for (int i = 0; i < 5; i++) {
long backoff = manager.backOff();
// without jitter, the backoff values should be exact multiples.
assertEquals(Math.min(1000L, BACKOFF_LIST.get(i)), backoff);
manager.incrementAttempt();
}
}

@Test
public void testBackOffWithJitter() {
ExponentialBackoffManager manager = new ExponentialBackoffManager(
5, 100, 2, 1000, 0.2);
for (int i = 0; i < 5; i++) {
long backoff = manager.backOff();
// with jitter, the backoff values should be within 20% of the expected value.
assertTrue(backoff >= 0.8 * Math.min(1000L, BACKOFF_LIST.get(i)));
assertTrue(backoff <= 1.2 * Math.min(1000L, BACKOFF_LIST.get(i)));
manager.incrementAttempt();
}
}
}
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ class BrokerServer(
new LogContext(s"[NetworkPartitionMetadataClient broker=${config.brokerId}]")
),
Time.SYSTEM,
config.interBrokerListenerName()
config.interBrokerListenerName(),
new SystemTimerReaper("network-partition-metadata-client-reaper", new SystemTimer("network-partition-metadata-client"))
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.ExponentialBackoffManager;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,20 +58,42 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {

private static final Logger log = LoggerFactory.getLogger(NetworkPartitionMetadataClient.class);

private static final long REQUEST_BACKOFF_MS = 1_000L;
private static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
private static final int MAX_RETRY_ATTEMPTS = 5;

private final MetadataCache metadataCache;
private final Supplier<KafkaClient> networkClientSupplier;
private final Time time;
private final ListenerName listenerName;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private volatile SendThread sendThread;
private final Timer timer;

public NetworkPartitionMetadataClient(MetadataCache metadataCache,
Supplier<KafkaClient> networkClientSupplier,
Time time, ListenerName listenerName) {
Time time, ListenerName listenerName, Timer timer) {
if (metadataCache == null) {
throw new IllegalArgumentException("MetadataCache must not be null.");
}
if (networkClientSupplier == null) {
throw new IllegalArgumentException("NetworkClientSupplier must not be null.");
}
if (time == null) {
throw new IllegalArgumentException("Time must not be null.");
}
if (listenerName == null) {
throw new IllegalArgumentException("ListenerName must not be null.");
}
if (timer == null) {
throw new IllegalArgumentException("Timer must not be null.");
}

this.metadataCache = metadataCache;
this.networkClientSupplier = networkClientSupplier;
this.time = time;
this.listenerName = listenerName;
this.timer = timer;
}

@Override
Expand Down Expand Up @@ -125,6 +151,7 @@ public Map<TopicPartition, CompletableFuture<OffsetResponse>> listLatestOffsets(
public void close() {
// Only close sendThread if it was initialized. Note, close is called only during broker shutdown, so need
// for further synchronization here.
Utils.closeQuietly(timer, "NetworkPartitionMetadataClient timer");
if (!initialized.get()) {
return;
}
Expand Down Expand Up @@ -186,14 +213,18 @@ private ListOffsetsRequest.Builder createListOffsetsRequest(List<TopicPartition>
* Handles the response from a ListOffsets request.
*/
// Visible for Testing.
void handleResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> partitionFutures, ClientResponse clientResponse) {
void handleResponse(PendingRequest pendingRequest, ClientResponse clientResponse) {
// Handle error responses first
if (maybeHandleErrorResponse(partitionFutures, clientResponse)) {
if (maybeHandleErrorResponse(pendingRequest, clientResponse)) {
return;
}

log.debug("ListOffsets response received successfully - {}", clientResponse);
// Reset retry attempts on success
pendingRequest.backoffManager().resetAttempts();

ListOffsetsResponse response = (ListOffsetsResponse) clientResponse.responseBody();
Map<TopicPartition, CompletableFuture<OffsetResponse>> partitionFutures = pendingRequest.futures();

for (ListOffsetsTopicResponse topicResponse : response.topics()) {
String topicName = topicResponse.name();
Expand All @@ -216,11 +247,14 @@ void handleResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> parti
}

/**
* Handles error responses by completing all associated futures with an error. Returns true if an error was
* handled. Otherwise, returns false.
* Handles error responses by completing all associated futures with an error or retrying the request.
* Returns true if an error was handled. Otherwise, returns false.
*/
private boolean maybeHandleErrorResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> partitionFutures, ClientResponse clientResponse) {
private boolean maybeHandleErrorResponse(PendingRequest pendingRequest, ClientResponse clientResponse) {
Map<TopicPartition, CompletableFuture<OffsetResponse>> partitionFutures = pendingRequest.futures();
Errors error;
boolean shouldRetry = false;

if (clientResponse == null) {
log.error("Response for ListOffsets for topicPartitions: {} is null", partitionFutures.keySet());
error = Errors.UNKNOWN_SERVER_ERROR;
Expand All @@ -231,11 +265,13 @@ private boolean maybeHandleErrorResponse(Map<TopicPartition, CompletableFuture<O
log.error("Version mismatch exception", clientResponse.versionMismatch());
error = Errors.UNKNOWN_SERVER_ERROR;
} else if (clientResponse.wasDisconnected()) {
log.error("Response for ListOffsets for TopicPartitions: {} was disconnected - {}.", partitionFutures.keySet(), clientResponse);
log.debug("Response for ListOffsets for TopicPartitions: {} was disconnected - {}.", partitionFutures.keySet(), clientResponse);
error = Errors.NETWORK_EXCEPTION;
shouldRetry = true;
} else if (clientResponse.wasTimedOut()) {
log.error("Response for ListOffsets for TopicPartitions: {} timed out - {}.", partitionFutures.keySet(), clientResponse);
log.debug("Response for ListOffsets for TopicPartitions: {} timed out - {}.", partitionFutures.keySet(), clientResponse);
error = Errors.REQUEST_TIMED_OUT;
shouldRetry = true;
} else if (!clientResponse.hasResponse()) {
log.error("Response for ListOffsets for TopicPartitions: {} has no response - {}.", partitionFutures.keySet(), clientResponse);
error = Errors.UNKNOWN_SERVER_ERROR;
Expand All @@ -244,16 +280,63 @@ private boolean maybeHandleErrorResponse(Map<TopicPartition, CompletableFuture<O
return false;
}

// For retriable errors (disconnected or timed out), attempt retry if possible
if (shouldRetry) {
ExponentialBackoffManager backoffManager = pendingRequest.backoffManager();
if (backoffManager.canAttempt()) {
backoffManager.incrementAttempt();
long backoffMs = backoffManager.backOff();
log.debug("Retrying ListOffsets request for TopicPartitions: {} after {} ms (attempt {}/{})",
partitionFutures.keySet(), backoffMs, backoffManager.attempts(), MAX_RETRY_ATTEMPTS);
timer.add(new RetryTimerTask(backoffMs, pendingRequest));
return true;
} else {
log.error("Exhausted max retries ({}) for ListOffsets request for TopicPartitions: {}",
MAX_RETRY_ATTEMPTS, partitionFutures.keySet());
}
}

// Complete all futures with error (either non-retriable error or exhausted retries)
partitionFutures.forEach((tp, future) -> future.complete(new OffsetResponse(-1, error)));
return true;
}

/**
* Tracks a pending ListOffsets request and its associated futures.
*/
private record PendingRequest(Node node,
Map<TopicPartition, CompletableFuture<OffsetResponse>> futures,
ListOffsetsRequest.Builder requestBuilder) {
// Visible for testing.
record PendingRequest(Node node,
Map<TopicPartition, CompletableFuture<OffsetResponse>> futures,
ListOffsetsRequest.Builder requestBuilder,
ExponentialBackoffManager backoffManager) {
PendingRequest(Node node,
Map<TopicPartition, CompletableFuture<OffsetResponse>> futures,
ListOffsetsRequest.Builder requestBuilder) {
this(node, futures, requestBuilder, new ExponentialBackoffManager(
MAX_RETRY_ATTEMPTS,
REQUEST_BACKOFF_MS,
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
REQUEST_BACKOFF_MAX_MS,
CommonClientConfigs.RETRY_BACKOFF_JITTER));
}
}

/**
* Timer task for retrying failed requests after backoff.
*/
private final class RetryTimerTask extends TimerTask {
private final PendingRequest pendingRequest;

RetryTimerTask(long delayMs, PendingRequest pendingRequest) {
super(delayMs);
this.pendingRequest = pendingRequest;
}

@Override
public void run() {
sendThread.enqueue(pendingRequest);
sendThread.wakeup();
}
}

private class SendThread extends InterBrokerSendThread {
Expand Down Expand Up @@ -286,8 +369,7 @@ public Collection<RequestAndCompletionHandler> generateRequests() {
time.hiResClockMs(),
current.node,
requestBuilder,
response -> handleResponse(current.futures, response)
);
response -> handleResponse(current, response));

requests.add(requestHandler);
}
Expand Down
Loading