Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Token caching for the default OAuth path. Tokens obtained via `.oauth(...)` are now cached per table on the `ZerobusSdk` instance and reused across stream creations and recoveries until they near expiry, instead of minting a fresh token on every stream. This reduces load on the Unity Catalog token endpoint for clients that create many short-lived streams. Caching is on by default and can be tuned via `ZerobusSdkBuilder::token_cache_enabled` and `ZerobusSdkBuilder::token_refresh_buffer`.
- On a server-side authentication rejection during stream creation, the cached token is invalidated so the next attempt re-mints (re-checking grants at Unity Catalog), rather than reusing a rejected token until the refresh window.
- `OAuthHeadersProvider::new` now caches tokens for the lifetime of the returned provider (previously it minted a fresh token on every call). Behavior is unchanged for the common path of constructing streams through `ZerobusSdk`, which already shares a cache.
- Stream creation and recovery now default to exponential backoff with jitter instead of a fixed retry interval. `recovery_backoff_ms` remains the fixed interval for `RetryStrategy::Fixed` and becomes the base delay for `RetryStrategy::ExponentialBackoffWithJitter` before jitter, capped by `max_recovery_backoff_ms` (default 30 seconds).

### Bug Fixes

Expand All @@ -29,3 +30,4 @@
- Added `ZerobusSdkBuilder::token_cache_enabled(bool)` to enable or disable OAuth token caching (default enabled).
- Added `ZerobusSdkBuilder::token_refresh_buffer(Duration)` to configure how long before a cached token's expiry it is refreshed (default 5 minutes).
- Added `HeadersProvider::invalidate` with a default no-op implementation; the SDK calls it when the server rejects the supplied credentials so a provider can drop cached auth state. Existing trait implementations are unaffected.
- Added `RetryStrategy`, `retry_strategy(...)`, and `max_recovery_backoff_ms(...)` for Rust stream recovery configuration.
7 changes: 6 additions & 1 deletion rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ let mut stream = sdk
.max_inflight_requests(10_000)
.recovery_timeout_ms(15_000)
.recovery_backoff_ms(2_000)
.max_recovery_backoff_ms(30_000)
.recovery_retries(4)
.build()
.await?;
Expand Down Expand Up @@ -483,6 +484,7 @@ let mut stream = sdk
.max_inflight_requests(10_000)
.recovery_timeout_ms(15_000)
.recovery_backoff_ms(2_000)
.max_recovery_backoff_ms(30_000)
.recovery_retries(4)
.build()
.await?;
Expand Down Expand Up @@ -721,7 +723,9 @@ Also accepts `0` or `no`.
| `max_inflight_requests` | `usize` | 1,000,000 | Maximum unacknowledged requests in flight |
| `recovery` | `bool` | true | Enable automatic stream recovery on failure |
| `recovery_timeout_ms` | `u64` | 15,000 | Timeout for recovery operations (ms) |
| `recovery_backoff_ms` | `u64` | 2,000 | Delay between recovery retry attempts (ms) |
| `recovery_backoff_ms` | `u64` | 2,000 | Base delay for exponential recovery retries before jitter, or interval for fixed recovery retries (ms) |
| `max_recovery_backoff_ms` | `u64` | 30,000 | Maximum exponential recovery retry delay (ms) |
| `retry_strategy` | `RetryStrategy` | `ExponentialBackoffWithJitter` | Recovery retry strategy (`Fixed` or `ExponentialBackoffWithJitter`) |
| `recovery_retries` | `u32` | 4 | Maximum number of recovery attempts |
| `server_lack_of_ack_timeout_ms` | `u64` | 60,000 | Timeout waiting for server acks (ms) |
| `flush_timeout_ms` | `u64` | 300,000 | Timeout for flush operations (ms) |
Expand All @@ -742,6 +746,7 @@ let stream = sdk
.max_inflight_requests(50_000)
.recovery(true)
.recovery_timeout_ms(20_000)
.max_recovery_backoff_ms(30_000)
.recovery_retries(5)
.flush_timeout_ms(600_000)
.build()
Expand Down
2 changes: 1 addition & 1 deletion rust/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ if let Some(offset) = stream.ingest_records_offset(records).await? {
## Next Steps

- Try ingesting larger batches of records
- Experiment with different `StreamConfigurationOptions`
- Experiment with different `StreamConfigurationOptions`, including `.max_recovery_backoff_ms(...)` and `.retry_strategy(...)` for recovery tuning
- Add error handling and retry logic
- Implement monitoring and metrics
- Use the SDK in a production application
Expand Down
39 changes: 37 additions & 2 deletions rust/sdk/src/arrow_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! **Beta**: Arrow Flight ingestion is in Beta. The API is stabilising but may
//! still change before reaching GA.

use crate::stream_options::defaults;
use crate::stream_options::{defaults, RetryStrategy};
use arrow_ipc::CompressionType;

/// Configuration options for Arrow Flight stream creation and operation.
Expand Down Expand Up @@ -53,11 +53,25 @@ pub struct ArrowStreamConfigurationOptions {

/// Backoff time in milliseconds between stream recovery retry attempts.
///
/// The SDK will wait this duration before attempting another recovery after a failure.
/// For fixed retry strategy, the SDK will wait this duration before each retry.
/// For exponential retry strategy, this is the base delay before jitter is applied;
/// retries use full jitter in the range `0..=computed_delay`.
///
/// Default: 2,000 (2 seconds)
pub recovery_backoff_ms: u64,

/// Maximum recovery backoff time in milliseconds.
///
/// This caps the exponential retry strategy. It is ignored by the fixed strategy.
///
/// Default: 30,000 (30 seconds)
pub max_recovery_backoff_ms: u64,

/// Retry strategy for stream creation and recovery.
///
/// Default: `RetryStrategy::ExponentialBackoffWithJitter`
pub retry_strategy: RetryStrategy,

/// Maximum number of recovery retry attempts before giving up.
///
/// After this many failed attempts, the stream will close and return an error.
Expand Down Expand Up @@ -122,6 +136,8 @@ impl Default for ArrowStreamConfigurationOptions {
recovery: defaults::RECOVERY,
recovery_timeout_ms: defaults::RECOVERY_TIMEOUT_MS,
recovery_backoff_ms: defaults::RECOVERY_BACKOFF_MS,
max_recovery_backoff_ms: defaults::MAX_RECOVERY_BACKOFF_MS,
retry_strategy: RetryStrategy::default(),
recovery_retries: defaults::RECOVERY_RETRIES,
server_lack_of_ack_timeout_ms: defaults::SERVER_LACK_OF_ACK_TIMEOUT_MS,
flush_timeout_ms: defaults::FLUSH_TIMEOUT_MS,
Expand All @@ -131,3 +147,22 @@ impl Default for ArrowStreamConfigurationOptions {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn defaults_use_exponential_retry_strategy_with_cap() {
let options = ArrowStreamConfigurationOptions::default();

assert_eq!(
options.retry_strategy,
RetryStrategy::ExponentialBackoffWithJitter
);
assert_eq!(
options.max_recovery_backoff_ms,
defaults::MAX_RECOVERY_BACKOFF_MS
);
}
}
27 changes: 23 additions & 4 deletions rust/sdk/src/arrow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use bytes::Bytes;
use futures::{Stream, StreamExt};
use tokio::sync::{mpsc, watch, Mutex};
use tokio::time::{sleep, Duration};
use tokio_retry::strategy::FixedInterval;
use tokio_retry::RetryIf;
use tonic::transport::Channel;
use tracing::{debug, error, info, instrument, warn};
Expand All @@ -35,6 +34,7 @@ use crate::arrow_metadata::{FlightAckMetadata, FlightBatchMetadata};
use crate::errors::ZerobusError;
use crate::headers_provider::HeadersProvider;
use crate::offset_generator::{OffsetId, OffsetIdGenerator};
use crate::stream_options;
use crate::tls_config::TlsConfig;
use crate::ZerobusResult;

Expand Down Expand Up @@ -304,8 +304,12 @@ impl ZerobusArrowStream {
let table_properties = stream.table_properties.clone();
let options = stream.options.clone();
let headers_provider = Arc::clone(&stream.headers_provider);
let strategy = FixedInterval::from_millis(options.recovery_backoff_ms)
.take(options.recovery_retries as usize);
let strategy = stream_options::recovery_retry_strategy(
options.retry_strategy,
options.recovery_backoff_ms,
options.max_recovery_backoff_ms,
)
.take(options.recovery_retries as usize);

let create_attempt = || {
let endpoint = endpoint.clone();
Expand Down Expand Up @@ -618,6 +622,13 @@ impl ZerobusArrowStream {
let ack_timeout = Duration::from_millis(options.server_lack_of_ack_timeout_ms);
let mut response_stream = initial_response_stream;

let mut recovery_strategy = stream_options::recovery_retry_strategy(
options.retry_strategy,
options.recovery_backoff_ms,
options.max_recovery_backoff_ms,
)
.take(options.recovery_retries as usize);

loop {
if is_closed.load(Ordering::Relaxed) {
debug!("Supervisor: Stream closed, exiting");
Expand Down Expand Up @@ -677,7 +688,9 @@ impl ZerobusArrowStream {
is_paused.store(true, Ordering::Relaxed);

// Backoff before retry.
sleep(Duration::from_millis(options.recovery_backoff_ms)).await;
if let Some(delay) = recovery_strategy.next() {
sleep(delay).await;
}

// Clear the server error.
let _ = server_error_tx.send(None);
Expand Down Expand Up @@ -712,6 +725,12 @@ impl ZerobusArrowStream {
Ok(Ok(new_response_stream)) => {
info!("Supervisor: Recovery successful, resuming");
recovery_attempts.store(0, Ordering::Relaxed);
recovery_strategy = stream_options::recovery_retry_strategy(
options.retry_strategy,
options.recovery_backoff_ms,
options.max_recovery_backoff_ms,
)
.take(options.recovery_retries as usize);
// is_paused was already cleared inside reconnect().
response_stream = new_response_stream;
// Loop continues with new stream.
Expand Down
41 changes: 40 additions & 1 deletion rust/sdk/src/builder/stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::callbacks::AckCallback;
use crate::databricks::zerobus::RecordType;
use crate::headers_provider::{HeadersProvider, OAuthHeadersProvider};
use crate::stream_configuration::StreamConfigurationOptions;
use crate::stream_options::RetryStrategy;
use crate::{TableProperties, ZerobusError, ZerobusResult, ZerobusSdk, ZerobusStream};

#[cfg(feature = "arrow-flight")]
Expand Down Expand Up @@ -195,6 +196,9 @@ impl<'a> StreamBuilder<'a> {
}

/// Set the backoff time in milliseconds between recovery retries.
///
/// This is the interval for fixed retry strategy and the initial delay for
/// exponential retry strategy.
pub fn recovery_backoff_ms(mut self, ms: u64) -> Self {
self.grpc_config.recovery_backoff_ms = ms;
#[cfg(feature = "arrow-flight")]
Expand All @@ -204,6 +208,26 @@ impl<'a> StreamBuilder<'a> {
self
}

/// Set the maximum backoff time in milliseconds for exponential recovery retries.
pub fn max_recovery_backoff_ms(mut self, ms: u64) -> Self {
self.grpc_config.max_recovery_backoff_ms = ms;
#[cfg(feature = "arrow-flight")]
{
self.arrow_config.max_recovery_backoff_ms = ms;
}
self
}

/// Set the retry strategy for stream creation and recovery.
pub fn retry_strategy(mut self, strategy: RetryStrategy) -> Self {
self.grpc_config.retry_strategy = strategy;
#[cfg(feature = "arrow-flight")]
{
self.arrow_config.retry_strategy = strategy;
}
self
}

/// Set the maximum number of recovery retry attempts.
pub fn recovery_retries(mut self, n: u32) -> Self {
self.grpc_config.recovery_retries = n;
Expand Down Expand Up @@ -503,20 +527,26 @@ mod tests {
#[test]
fn config_setters_chain() {
let sdk = test_sdk();
let _builder = sdk
let builder = sdk
.stream_builder()
.table("t")
.oauth("a", "b")
.json()
.recovery(false)
.recovery_timeout_ms(10_000)
.recovery_backoff_ms(1_000)
.max_recovery_backoff_ms(30_000)
.retry_strategy(RetryStrategy::Fixed)
.recovery_retries(3)
.server_lack_of_ack_timeout_ms(30_000)
.flush_timeout_ms(60_000)
.max_inflight_requests(500)
.stream_paused_max_wait_time_ms(Some(5_000))
.callback_max_wait_time_ms(None);

assert_eq!(builder.grpc_config.recovery_backoff_ms, 1_000);
assert_eq!(builder.grpc_config.max_recovery_backoff_ms, 30_000);
assert_eq!(builder.grpc_config.retry_strategy, RetryStrategy::Fixed);
}

#[test]
Expand All @@ -525,6 +555,11 @@ mod tests {
let builder = sdk.stream_builder().table("t").oauth("a", "b").json();
assert_eq!(builder.grpc_config.max_inflight_requests, 1_000_000);
assert!(builder.grpc_config.recovery);
assert_eq!(
builder.grpc_config.retry_strategy,
RetryStrategy::ExponentialBackoffWithJitter
);
assert_eq!(builder.grpc_config.max_recovery_backoff_ms, 30_000);
}

#[tokio::test]
Expand Down Expand Up @@ -654,13 +689,17 @@ mod tests {
.recovery(false)
.recovery_timeout_ms(5_000)
.recovery_backoff_ms(500)
.max_recovery_backoff_ms(5_000)
.retry_strategy(RetryStrategy::Fixed)
.recovery_retries(2)
.server_lack_of_ack_timeout_ms(10_000)
.flush_timeout_ms(20_000)
.stream_paused_max_wait_time_ms(Some(5_000));
assert!(!builder.arrow_config.recovery);
assert_eq!(builder.arrow_config.recovery_timeout_ms, 5_000);
assert_eq!(builder.arrow_config.recovery_backoff_ms, 500);
assert_eq!(builder.arrow_config.max_recovery_backoff_ms, 5_000);
assert_eq!(builder.arrow_config.retry_strategy, RetryStrategy::Fixed);
assert_eq!(builder.arrow_config.recovery_retries, 2);
assert_eq!(builder.arrow_config.server_lack_of_ack_timeout_ms, 10_000);
assert_eq!(builder.arrow_config.flush_timeout_ms, 20_000);
Expand Down
10 changes: 7 additions & 3 deletions rust/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ use std::sync::Arc;
use prost::Message;
use tokio::sync::RwLock;
use tokio::time::Duration;
use tokio_retry::strategy::FixedInterval;
use tokio_retry::RetryIf;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -101,6 +100,7 @@ pub use record_types::{
ProtoBytes, ProtoEncodedRecord, ProtoMessage,
};
pub use stream_configuration::StreamConfigurationOptions;
pub use stream_options::RetryStrategy;
#[cfg(feature = "testing")]
pub use tls_config::NoTlsConfig;
pub use tls_config::{SecureTlsConfig, TlsConfig};
Expand Down Expand Up @@ -681,8 +681,12 @@ impl ZerobusStream {
let landing_zone_recovery = Arc::clone(&landing_zone);

// 1. Create a stream.
let strategy = FixedInterval::from_millis(options.recovery_backoff_ms)
.take(options.recovery_retries as usize);
let strategy = stream_options::recovery_retry_strategy(
options.retry_strategy,
options.recovery_backoff_ms,
options.max_recovery_backoff_ms,
)
.take(options.recovery_retries as usize);

let create_attempt = || {
let channel = channel.clone();
Expand Down
39 changes: 37 additions & 2 deletions rust/sdk/src/stream_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use crate::callbacks::AckCallback;
use crate::databricks::zerobus::RecordType;
use crate::stream_options::defaults;
use crate::stream_options::{defaults, RetryStrategy};

/// Configuration options for stream creation, recovery of broken streams and flushing.
///
Expand Down Expand Up @@ -52,11 +52,25 @@ pub struct StreamConfigurationOptions {

/// Backoff time in milliseconds between stream recovery retry attempts.
///
/// The SDK will wait this duration before attempting another recovery after a failure.
/// For fixed retry strategy, the SDK will wait this duration before each retry.
/// For exponential retry strategy, this is the base delay before jitter is applied;
/// retries use full jitter in the range `0..=computed_delay`.
///
/// Default: 2,000 (2 seconds)
pub recovery_backoff_ms: u64,

/// Maximum recovery backoff time in milliseconds.
///
/// This caps the exponential retry strategy. It is ignored by the fixed strategy.
///
/// Default: 30,000 (30 seconds)
pub max_recovery_backoff_ms: u64,

/// Retry strategy for stream creation and recovery.
///
/// Default: `RetryStrategy::ExponentialBackoffWithJitter`
pub retry_strategy: RetryStrategy,

/// Maximum number of recovery retry attempts before giving up.
///
/// After this many failed attempts, the stream will close and return an error.
Expand Down Expand Up @@ -166,6 +180,8 @@ impl Default for StreamConfigurationOptions {
recovery: defaults::RECOVERY,
recovery_timeout_ms: defaults::RECOVERY_TIMEOUT_MS,
recovery_backoff_ms: defaults::RECOVERY_BACKOFF_MS,
max_recovery_backoff_ms: defaults::MAX_RECOVERY_BACKOFF_MS,
retry_strategy: RetryStrategy::default(),
recovery_retries: defaults::RECOVERY_RETRIES,
server_lack_of_ack_timeout_ms: defaults::SERVER_LACK_OF_ACK_TIMEOUT_MS,
flush_timeout_ms: defaults::FLUSH_TIMEOUT_MS,
Expand All @@ -176,3 +192,22 @@ impl Default for StreamConfigurationOptions {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn defaults_use_exponential_retry_strategy_with_cap() {
let options = StreamConfigurationOptions::default();

assert_eq!(
options.retry_strategy,
RetryStrategy::ExponentialBackoffWithJitter
);
assert_eq!(
options.max_recovery_backoff_ms,
defaults::MAX_RECOVERY_BACKOFF_MS
);
}
}
Loading