This code shows various retry approaches when using spring-cloud-stream with the apache-kafka binder.
In the Error Handling
section of the Spring Cloud Stream document, it says that when the application throws an exception, the Spring RetryTemplate is used to retry. If that fails,
then it is sent to the container's error handler in the binder's container, where the SeekToCurrentErrorHandler will also try multiple times before sending it to a
dead-letter topic. Think of these as two nested loops. This suggests two approaches
This approach does a "stateless" retry and sends the message to a DLQ if all attempts fail. Use this approach if ALL the retries can be completed within max.poll.interval.ms (typically 5 mins) of the consumer.
The retry configuration ...
spring.cloud.streams.bindings.XXXX:
consumer:
maxAttempts : 5
backOffInitialInterval : 10000 # 10seconds
backoffMultiplier : 2.0would allow a maximum of four retries before the total time exceeds the default 5mins of max.poll.interval.ms and triggers a consumer
rebalance. To avoid that, the maxAttempts should be set to 5 or less.
The following configuration enables dlq-topic as the dead-letter topic, as described the documentation
for enableDlq
spring.cloud.stream.kafka.bindings.XXX:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqPartitions: 1Note: With this approach, if enableDlq is not true, then the framework will use a default SeekToCurrentErrorHandler which will make 10 attempts and
then drop the message.
According to the documentation for enableDlq the messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[].
This approach does a "stateful" retry and sends the message to DLQ if all attempts fail. With this approach there is no upper limit on the
TOTAL amount of time spent on retries. But the delay between INDIVIDUAL retries cannot exceed max.poll.interval.ms.
- Disable the retry in the spring-cloud-stream level by setting
spring.cloud.stream.bindings.xxxx.consumer.maxAttempts=1. - Ensure that
enableDlqis false ( or that the entry is not present in the application.yml file) - Configure the
SeekToCurrentErrorHandlerto use a FixedBackOff or ExponentialBackoff.
If using ExponentialBackoff, makes sure to
- Set the
maxIntervalto a value less thanmax.poll.interval.ms - Set
maxElapsedTimeto a reasonable value. This has a default ofLong.MAX_VALUEwhich would lead to infinite number of retries
For example, using
long initialInterval = 5000; // initial interval in milliseconds
double multiplier = 2.0; // retries will be done after 5, 10, 20, 40, 60 second interval
long maxInterval = 30 * 1000L; // maximum wait time between two retries.
// This will hence be capped to to 5, 10, 20, 30, 30
long maxElapsedTime = 80 * 1000L; // no retry will be done after this much time has elapsed
will lead to a maximum of 5 retries (i.e. 6 attempts) with a pause of 5, 10, 20, 30, and 30 seconds between retries.
See the Spring Cloud Stream documentation on Retry Template and Backoff
spring:
cloud:
stream:
bindings:
input:
binder: kafka
destination: test-topic
consumer:
maxAttempts: 4
backOffInitialInterval : 10000 # 10seconds
backOffMaxInterval : 100000 # 100 seconds
backoffMultiplier : 2.0 # retries will be done after 10 20 40 80 seconds. The total is 150 which is within the default max retry interval2020-09-12 14:57:13.654 INFO 22220 --- [container-0-C-1] com.example.demo.DemoConsumer : DemoConsumer::handle() was called
2020-09-12 14:57:23.660 INFO 22220 --- [container-0-C-1] com.example.demo.DemoConsumer : DemoConsumer::handle() was called
2020-09-12 14:57:43.665 INFO 22220 --- [container-0-C-1] com.example.demo.DemoConsumer : DemoConsumer::handle() was called
2020-09-12 14:58:23.671 INFO 22220 --- [container-0-C-1] com.example.demo.DemoConsumer : DemoConsumer::handle() was called
2020-09-12 14:58:23.675 ERROR 22220 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException:
Exception thrown while invoking DemoConsumer#handle[1 args];
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integr
The default SeekToCurrentErrorHandler in the listener's container is configured for 10 attempts with no waiting time in between (ie.,interval=0, maxAttempts=10 )
This backoff behavior and the recovery action (what to do if all retries fails) can be configured using ListenerContainerCustomizer as shown below
@Autowired
ProducerFactory<?,?> producerFactory;
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, destination, group) -> {
BackOff backoff = ... some code to return a Backoff object ....
KafkaTemplate<String,String> template = new KafkaTemplate<>((ProducerFactory<String, String>) producerFactory);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
(KafkaOperations) template,
(rec, ex) -> new TopicPartition("dlq-topic",0));
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, backoff);
container.setErrorHandler(errorHandler);
};
}