From c3fc9195b8399a010b7339da49b5e807620e6e3f Mon Sep 17 00:00:00 2001 From: Nanook Date: Sat, 20 Jun 2026 00:32:11 +0000 Subject: [PATCH] fix: add exponential recovery backoff --- rust/NEXT_CHANGELOG.md | 2 + rust/README.md | 7 +- rust/examples/README.md | 2 +- rust/sdk/src/arrow_configuration.rs | 39 +++++++++- rust/sdk/src/arrow_stream.rs | 27 ++++++- rust/sdk/src/builder/stream_builder.rs | 41 +++++++++- rust/sdk/src/lib.rs | 10 ++- rust/sdk/src/stream_configuration.rs | 39 +++++++++- rust/sdk/src/stream_options.rs | 100 ++++++++++++++++++++++++- 9 files changed, 252 insertions(+), 15 deletions(-) diff --git a/rust/NEXT_CHANGELOG.md b/rust/NEXT_CHANGELOG.md index fb8dacd5..085a25b1 100644 --- a/rust/NEXT_CHANGELOG.md +++ b/rust/NEXT_CHANGELOG.md @@ -9,6 +9,7 @@ - Token caching for the default OAuth path. Tokens obtained via `.oauth(...)` are now cached per table on the `ZerobusSdk` instance and reused across stream creations and recoveries until they near expiry, instead of minting a fresh token on every stream. This reduces load on the Unity Catalog token endpoint for clients that create many short-lived streams. Caching is on by default and can be tuned via `ZerobusSdkBuilder::token_cache_enabled` and `ZerobusSdkBuilder::token_refresh_buffer`. - On a server-side authentication rejection during stream creation, the cached token is invalidated so the next attempt re-mints (re-checking grants at Unity Catalog), rather than reusing a rejected token until the refresh window. - `OAuthHeadersProvider::new` now caches tokens for the lifetime of the returned provider (previously it minted a fresh token on every call). Behavior is unchanged for the common path of constructing streams through `ZerobusSdk`, which already shares a cache. +- Stream creation and recovery now default to exponential backoff with jitter instead of a fixed retry interval. `recovery_backoff_ms` remains the fixed interval for `RetryStrategy::Fixed` and becomes the base delay for `RetryStrategy::ExponentialBackoffWithJitter` before jitter, capped by `max_recovery_backoff_ms` (default 30 seconds). ### Bug Fixes @@ -29,3 +30,4 @@ - Added `ZerobusSdkBuilder::token_cache_enabled(bool)` to enable or disable OAuth token caching (default enabled). - Added `ZerobusSdkBuilder::token_refresh_buffer(Duration)` to configure how long before a cached token's expiry it is refreshed (default 5 minutes). - Added `HeadersProvider::invalidate` with a default no-op implementation; the SDK calls it when the server rejects the supplied credentials so a provider can drop cached auth state. Existing trait implementations are unaffected. +- Added `RetryStrategy`, `retry_strategy(...)`, and `max_recovery_backoff_ms(...)` for Rust stream recovery configuration. diff --git a/rust/README.md b/rust/README.md index ea959fc5..12c72918 100644 --- a/rust/README.md +++ b/rust/README.md @@ -444,6 +444,7 @@ let mut stream = sdk .max_inflight_requests(10_000) .recovery_timeout_ms(15_000) .recovery_backoff_ms(2_000) + .max_recovery_backoff_ms(30_000) .recovery_retries(4) .build() .await?; @@ -483,6 +484,7 @@ let mut stream = sdk .max_inflight_requests(10_000) .recovery_timeout_ms(15_000) .recovery_backoff_ms(2_000) + .max_recovery_backoff_ms(30_000) .recovery_retries(4) .build() .await?; @@ -721,7 +723,9 @@ Also accepts `0` or `no`. | `max_inflight_requests` | `usize` | 1,000,000 | Maximum unacknowledged requests in flight | | `recovery` | `bool` | true | Enable automatic stream recovery on failure | | `recovery_timeout_ms` | `u64` | 15,000 | Timeout for recovery operations (ms) | -| `recovery_backoff_ms` | `u64` | 2,000 | Delay between recovery retry attempts (ms) | +| `recovery_backoff_ms` | `u64` | 2,000 | Base delay for exponential recovery retries before jitter, or interval for fixed recovery retries (ms) | +| `max_recovery_backoff_ms` | `u64` | 30,000 | Maximum exponential recovery retry delay (ms) | +| `retry_strategy` | `RetryStrategy` | `ExponentialBackoffWithJitter` | Recovery retry strategy (`Fixed` or `ExponentialBackoffWithJitter`) | | `recovery_retries` | `u32` | 4 | Maximum number of recovery attempts | | `server_lack_of_ack_timeout_ms` | `u64` | 60,000 | Timeout waiting for server acks (ms) | | `flush_timeout_ms` | `u64` | 300,000 | Timeout for flush operations (ms) | @@ -742,6 +746,7 @@ let stream = sdk .max_inflight_requests(50_000) .recovery(true) .recovery_timeout_ms(20_000) + .max_recovery_backoff_ms(30_000) .recovery_retries(5) .flush_timeout_ms(600_000) .build() diff --git a/rust/examples/README.md b/rust/examples/README.md index aeb868b1..9b01e171 100644 --- a/rust/examples/README.md +++ b/rust/examples/README.md @@ -257,7 +257,7 @@ if let Some(offset) = stream.ingest_records_offset(records).await? { ## Next Steps - Try ingesting larger batches of records -- Experiment with different `StreamConfigurationOptions` +- Experiment with different `StreamConfigurationOptions`, including `.max_recovery_backoff_ms(...)` and `.retry_strategy(...)` for recovery tuning - Add error handling and retry logic - Implement monitoring and metrics - Use the SDK in a production application diff --git a/rust/sdk/src/arrow_configuration.rs b/rust/sdk/src/arrow_configuration.rs index 0d02840d..55bddbef 100644 --- a/rust/sdk/src/arrow_configuration.rs +++ b/rust/sdk/src/arrow_configuration.rs @@ -3,7 +3,7 @@ //! **Beta**: Arrow Flight ingestion is in Beta. The API is stabilising but may //! still change before reaching GA. -use crate::stream_options::defaults; +use crate::stream_options::{defaults, RetryStrategy}; use arrow_ipc::CompressionType; /// Configuration options for Arrow Flight stream creation and operation. @@ -53,11 +53,25 @@ pub struct ArrowStreamConfigurationOptions { /// Backoff time in milliseconds between stream recovery retry attempts. /// - /// The SDK will wait this duration before attempting another recovery after a failure. + /// For fixed retry strategy, the SDK will wait this duration before each retry. + /// For exponential retry strategy, this is the base delay before jitter is applied; + /// retries use full jitter in the range `0..=computed_delay`. /// /// Default: 2,000 (2 seconds) pub recovery_backoff_ms: u64, + /// Maximum recovery backoff time in milliseconds. + /// + /// This caps the exponential retry strategy. It is ignored by the fixed strategy. + /// + /// Default: 30,000 (30 seconds) + pub max_recovery_backoff_ms: u64, + + /// Retry strategy for stream creation and recovery. + /// + /// Default: `RetryStrategy::ExponentialBackoffWithJitter` + pub retry_strategy: RetryStrategy, + /// Maximum number of recovery retry attempts before giving up. /// /// After this many failed attempts, the stream will close and return an error. @@ -122,6 +136,8 @@ impl Default for ArrowStreamConfigurationOptions { recovery: defaults::RECOVERY, recovery_timeout_ms: defaults::RECOVERY_TIMEOUT_MS, recovery_backoff_ms: defaults::RECOVERY_BACKOFF_MS, + max_recovery_backoff_ms: defaults::MAX_RECOVERY_BACKOFF_MS, + retry_strategy: RetryStrategy::default(), recovery_retries: defaults::RECOVERY_RETRIES, server_lack_of_ack_timeout_ms: defaults::SERVER_LACK_OF_ACK_TIMEOUT_MS, flush_timeout_ms: defaults::FLUSH_TIMEOUT_MS, @@ -131,3 +147,22 @@ impl Default for ArrowStreamConfigurationOptions { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn defaults_use_exponential_retry_strategy_with_cap() { + let options = ArrowStreamConfigurationOptions::default(); + + assert_eq!( + options.retry_strategy, + RetryStrategy::ExponentialBackoffWithJitter + ); + assert_eq!( + options.max_recovery_backoff_ms, + defaults::MAX_RECOVERY_BACKOFF_MS + ); + } +} diff --git a/rust/sdk/src/arrow_stream.rs b/rust/sdk/src/arrow_stream.rs index 70ee1d26..7e10e1dd 100644 --- a/rust/sdk/src/arrow_stream.rs +++ b/rust/sdk/src/arrow_stream.rs @@ -21,7 +21,6 @@ use bytes::Bytes; use futures::{Stream, StreamExt}; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::{sleep, Duration}; -use tokio_retry::strategy::FixedInterval; use tokio_retry::RetryIf; use tonic::transport::Channel; use tracing::{debug, error, info, instrument, warn}; @@ -35,6 +34,7 @@ use crate::arrow_metadata::{FlightAckMetadata, FlightBatchMetadata}; use crate::errors::ZerobusError; use crate::headers_provider::HeadersProvider; use crate::offset_generator::{OffsetId, OffsetIdGenerator}; +use crate::stream_options; use crate::tls_config::TlsConfig; use crate::ZerobusResult; @@ -304,8 +304,12 @@ impl ZerobusArrowStream { let table_properties = stream.table_properties.clone(); let options = stream.options.clone(); let headers_provider = Arc::clone(&stream.headers_provider); - let strategy = FixedInterval::from_millis(options.recovery_backoff_ms) - .take(options.recovery_retries as usize); + let strategy = stream_options::recovery_retry_strategy( + options.retry_strategy, + options.recovery_backoff_ms, + options.max_recovery_backoff_ms, + ) + .take(options.recovery_retries as usize); let create_attempt = || { let endpoint = endpoint.clone(); @@ -618,6 +622,13 @@ impl ZerobusArrowStream { let ack_timeout = Duration::from_millis(options.server_lack_of_ack_timeout_ms); let mut response_stream = initial_response_stream; + let mut recovery_strategy = stream_options::recovery_retry_strategy( + options.retry_strategy, + options.recovery_backoff_ms, + options.max_recovery_backoff_ms, + ) + .take(options.recovery_retries as usize); + loop { if is_closed.load(Ordering::Relaxed) { debug!("Supervisor: Stream closed, exiting"); @@ -677,7 +688,9 @@ impl ZerobusArrowStream { is_paused.store(true, Ordering::Relaxed); // Backoff before retry. - sleep(Duration::from_millis(options.recovery_backoff_ms)).await; + if let Some(delay) = recovery_strategy.next() { + sleep(delay).await; + } // Clear the server error. let _ = server_error_tx.send(None); @@ -712,6 +725,12 @@ impl ZerobusArrowStream { Ok(Ok(new_response_stream)) => { info!("Supervisor: Recovery successful, resuming"); recovery_attempts.store(0, Ordering::Relaxed); + recovery_strategy = stream_options::recovery_retry_strategy( + options.retry_strategy, + options.recovery_backoff_ms, + options.max_recovery_backoff_ms, + ) + .take(options.recovery_retries as usize); // is_paused was already cleared inside reconnect(). response_stream = new_response_stream; // Loop continues with new stream. diff --git a/rust/sdk/src/builder/stream_builder.rs b/rust/sdk/src/builder/stream_builder.rs index e90ab538..810fd8fb 100644 --- a/rust/sdk/src/builder/stream_builder.rs +++ b/rust/sdk/src/builder/stream_builder.rs @@ -25,6 +25,7 @@ use crate::callbacks::AckCallback; use crate::databricks::zerobus::RecordType; use crate::headers_provider::{HeadersProvider, OAuthHeadersProvider}; use crate::stream_configuration::StreamConfigurationOptions; +use crate::stream_options::RetryStrategy; use crate::{TableProperties, ZerobusError, ZerobusResult, ZerobusSdk, ZerobusStream}; #[cfg(feature = "arrow-flight")] @@ -195,6 +196,9 @@ impl<'a> StreamBuilder<'a> { } /// Set the backoff time in milliseconds between recovery retries. + /// + /// This is the interval for fixed retry strategy and the initial delay for + /// exponential retry strategy. pub fn recovery_backoff_ms(mut self, ms: u64) -> Self { self.grpc_config.recovery_backoff_ms = ms; #[cfg(feature = "arrow-flight")] @@ -204,6 +208,26 @@ impl<'a> StreamBuilder<'a> { self } + /// Set the maximum backoff time in milliseconds for exponential recovery retries. + pub fn max_recovery_backoff_ms(mut self, ms: u64) -> Self { + self.grpc_config.max_recovery_backoff_ms = ms; + #[cfg(feature = "arrow-flight")] + { + self.arrow_config.max_recovery_backoff_ms = ms; + } + self + } + + /// Set the retry strategy for stream creation and recovery. + pub fn retry_strategy(mut self, strategy: RetryStrategy) -> Self { + self.grpc_config.retry_strategy = strategy; + #[cfg(feature = "arrow-flight")] + { + self.arrow_config.retry_strategy = strategy; + } + self + } + /// Set the maximum number of recovery retry attempts. pub fn recovery_retries(mut self, n: u32) -> Self { self.grpc_config.recovery_retries = n; @@ -503,7 +527,7 @@ mod tests { #[test] fn config_setters_chain() { let sdk = test_sdk(); - let _builder = sdk + let builder = sdk .stream_builder() .table("t") .oauth("a", "b") @@ -511,12 +535,18 @@ mod tests { .recovery(false) .recovery_timeout_ms(10_000) .recovery_backoff_ms(1_000) + .max_recovery_backoff_ms(30_000) + .retry_strategy(RetryStrategy::Fixed) .recovery_retries(3) .server_lack_of_ack_timeout_ms(30_000) .flush_timeout_ms(60_000) .max_inflight_requests(500) .stream_paused_max_wait_time_ms(Some(5_000)) .callback_max_wait_time_ms(None); + + assert_eq!(builder.grpc_config.recovery_backoff_ms, 1_000); + assert_eq!(builder.grpc_config.max_recovery_backoff_ms, 30_000); + assert_eq!(builder.grpc_config.retry_strategy, RetryStrategy::Fixed); } #[test] @@ -525,6 +555,11 @@ mod tests { let builder = sdk.stream_builder().table("t").oauth("a", "b").json(); assert_eq!(builder.grpc_config.max_inflight_requests, 1_000_000); assert!(builder.grpc_config.recovery); + assert_eq!( + builder.grpc_config.retry_strategy, + RetryStrategy::ExponentialBackoffWithJitter + ); + assert_eq!(builder.grpc_config.max_recovery_backoff_ms, 30_000); } #[tokio::test] @@ -654,6 +689,8 @@ mod tests { .recovery(false) .recovery_timeout_ms(5_000) .recovery_backoff_ms(500) + .max_recovery_backoff_ms(5_000) + .retry_strategy(RetryStrategy::Fixed) .recovery_retries(2) .server_lack_of_ack_timeout_ms(10_000) .flush_timeout_ms(20_000) @@ -661,6 +698,8 @@ mod tests { assert!(!builder.arrow_config.recovery); assert_eq!(builder.arrow_config.recovery_timeout_ms, 5_000); assert_eq!(builder.arrow_config.recovery_backoff_ms, 500); + assert_eq!(builder.arrow_config.max_recovery_backoff_ms, 5_000); + assert_eq!(builder.arrow_config.retry_strategy, RetryStrategy::Fixed); assert_eq!(builder.arrow_config.recovery_retries, 2); assert_eq!(builder.arrow_config.server_lack_of_ack_timeout_ms, 10_000); assert_eq!(builder.arrow_config.flush_timeout_ms, 20_000); diff --git a/rust/sdk/src/lib.rs b/rust/sdk/src/lib.rs index f69b70a1..82812bff 100644 --- a/rust/sdk/src/lib.rs +++ b/rust/sdk/src/lib.rs @@ -66,7 +66,6 @@ use std::sync::Arc; use prost::Message; use tokio::sync::RwLock; use tokio::time::Duration; -use tokio_retry::strategy::FixedInterval; use tokio_retry::RetryIf; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; @@ -101,6 +100,7 @@ pub use record_types::{ ProtoBytes, ProtoEncodedRecord, ProtoMessage, }; pub use stream_configuration::StreamConfigurationOptions; +pub use stream_options::RetryStrategy; #[cfg(feature = "testing")] pub use tls_config::NoTlsConfig; pub use tls_config::{SecureTlsConfig, TlsConfig}; @@ -681,8 +681,12 @@ impl ZerobusStream { let landing_zone_recovery = Arc::clone(&landing_zone); // 1. Create a stream. - let strategy = FixedInterval::from_millis(options.recovery_backoff_ms) - .take(options.recovery_retries as usize); + let strategy = stream_options::recovery_retry_strategy( + options.retry_strategy, + options.recovery_backoff_ms, + options.max_recovery_backoff_ms, + ) + .take(options.recovery_retries as usize); let create_attempt = || { let channel = channel.clone(); diff --git a/rust/sdk/src/stream_configuration.rs b/rust/sdk/src/stream_configuration.rs index 15c6f8c5..4475ca29 100644 --- a/rust/sdk/src/stream_configuration.rs +++ b/rust/sdk/src/stream_configuration.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::callbacks::AckCallback; use crate::databricks::zerobus::RecordType; -use crate::stream_options::defaults; +use crate::stream_options::{defaults, RetryStrategy}; /// Configuration options for stream creation, recovery of broken streams and flushing. /// @@ -52,11 +52,25 @@ pub struct StreamConfigurationOptions { /// Backoff time in milliseconds between stream recovery retry attempts. /// - /// The SDK will wait this duration before attempting another recovery after a failure. + /// For fixed retry strategy, the SDK will wait this duration before each retry. + /// For exponential retry strategy, this is the base delay before jitter is applied; + /// retries use full jitter in the range `0..=computed_delay`. /// /// Default: 2,000 (2 seconds) pub recovery_backoff_ms: u64, + /// Maximum recovery backoff time in milliseconds. + /// + /// This caps the exponential retry strategy. It is ignored by the fixed strategy. + /// + /// Default: 30,000 (30 seconds) + pub max_recovery_backoff_ms: u64, + + /// Retry strategy for stream creation and recovery. + /// + /// Default: `RetryStrategy::ExponentialBackoffWithJitter` + pub retry_strategy: RetryStrategy, + /// Maximum number of recovery retry attempts before giving up. /// /// After this many failed attempts, the stream will close and return an error. @@ -166,6 +180,8 @@ impl Default for StreamConfigurationOptions { recovery: defaults::RECOVERY, recovery_timeout_ms: defaults::RECOVERY_TIMEOUT_MS, recovery_backoff_ms: defaults::RECOVERY_BACKOFF_MS, + max_recovery_backoff_ms: defaults::MAX_RECOVERY_BACKOFF_MS, + retry_strategy: RetryStrategy::default(), recovery_retries: defaults::RECOVERY_RETRIES, server_lack_of_ack_timeout_ms: defaults::SERVER_LACK_OF_ACK_TIMEOUT_MS, flush_timeout_ms: defaults::FLUSH_TIMEOUT_MS, @@ -176,3 +192,22 @@ impl Default for StreamConfigurationOptions { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn defaults_use_exponential_retry_strategy_with_cap() { + let options = StreamConfigurationOptions::default(); + + assert_eq!( + options.retry_strategy, + RetryStrategy::ExponentialBackoffWithJitter + ); + assert_eq!( + options.max_recovery_backoff_ms, + defaults::MAX_RECOVERY_BACKOFF_MS + ); + } +} diff --git a/rust/sdk/src/stream_options.rs b/rust/sdk/src/stream_options.rs index 0f09003c..d9a6e8c6 100644 --- a/rust/sdk/src/stream_options.rs +++ b/rust/sdk/src/stream_options.rs @@ -2,6 +2,54 @@ //! //! This module provides common configuration constants shared between gRPC and Arrow Flight streams. +use std::time::Duration; + +use tokio_retry::strategy::{jitter, ExponentialBackoff, FixedInterval}; + +/// Retry strategy for stream creation and recovery. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +#[non_exhaustive] +pub enum RetryStrategy { + /// Retry after the configured `recovery_backoff_ms` interval. + Fixed, + /// Retry with exponential backoff and jitter, starting from `recovery_backoff_ms`. + #[default] + ExponentialBackoffWithJitter, +} + +const EXPONENTIAL_BACKOFF_BASE: u64 = 2; + +pub(crate) fn recovery_retry_strategy( + retry_strategy: RetryStrategy, + recovery_backoff_ms: u64, + max_recovery_backoff_ms: u64, +) -> Box + Send> { + match retry_strategy { + RetryStrategy::Fixed => Box::new(FixedInterval::from_millis(recovery_backoff_ms)), + RetryStrategy::ExponentialBackoffWithJitter => Box::new( + ExponentialBackoff::from_millis(EXPONENTIAL_BACKOFF_BASE) + .map(move |duration| { + capped_exponential_delay(duration, recovery_backoff_ms, max_recovery_backoff_ms) + }) + .map(jitter), + ), + } +} + +fn capped_exponential_delay( + exponential_delay: Duration, + recovery_backoff_ms: u64, + max_recovery_backoff_ms: u64, +) -> Duration { + let multiplier = exponential_delay.as_millis() / u128::from(EXPONENTIAL_BACKOFF_BASE); + let millis = u128::from(recovery_backoff_ms) + .saturating_mul(multiplier) + .min(u128::from(max_recovery_backoff_ms)) + .min(u128::from(u64::MAX)); + + Duration::from_millis(millis as u64) +} + /// Default values for stream configuration options. /// These are shared between gRPC and Arrow Flight streams. pub mod defaults { @@ -9,8 +57,10 @@ pub mod defaults { pub const RECOVERY: bool = true; /// Default: 15 seconds per recovery attempt pub const RECOVERY_TIMEOUT_MS: u64 = 15_000; - /// Default: 2 seconds backoff between retries + /// Default: 2 seconds initial backoff between retries pub const RECOVERY_BACKOFF_MS: u64 = 2_000; + /// Default: cap exponential recovery backoff at 30 seconds + pub const MAX_RECOVERY_BACKOFF_MS: u64 = 30_000; /// Default: 4 retry attempts pub const RECOVERY_RETRIES: u32 = 4; /// Default: 60 seconds lack of ack timeout @@ -23,3 +73,51 @@ pub mod defaults { /// Default: 5 seconds callback timeout pub const CALLBACK_MAX_WAIT_TIME_MS: u64 = 5_000; } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn retry_strategy_defaults_to_exponential_with_jitter() { + assert_eq!( + RetryStrategy::default(), + RetryStrategy::ExponentialBackoffWithJitter + ); + assert_eq!(defaults::MAX_RECOVERY_BACKOFF_MS, 30_000); + } + + #[test] + fn fixed_retry_strategy_uses_configured_interval() { + let mut strategy = recovery_retry_strategy(RetryStrategy::Fixed, 123, 30_000); + + assert_eq!(strategy.next(), Some(Duration::from_millis(123))); + assert_eq!(strategy.next(), Some(Duration::from_millis(123))); + } + + #[test] + fn exponential_retry_strategy_starts_from_initial_delay_and_caps() { + assert_eq!( + capped_exponential_delay(Duration::from_millis(2), 100, 1_000), + Duration::from_millis(100) + ); + assert_eq!( + capped_exponential_delay(Duration::from_millis(4), 100, 1_000), + Duration::from_millis(200) + ); + assert_eq!( + capped_exponential_delay(Duration::from_millis(16), 100, 250), + Duration::from_millis(250) + ); + } + + #[test] + fn exponential_retry_strategy_applies_jitter_under_cap() { + let mut strategy = + recovery_retry_strategy(RetryStrategy::ExponentialBackoffWithJitter, 100, 250); + + for _ in 0..10 { + assert!(strategy.next().unwrap() <= Duration::from_millis(250)); + } + } +}