From 4452b7e972cb1450ce03a8c8c8d6609f53ff16e6 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Mon, 17 Nov 2025 16:26:54 -0800 Subject: [PATCH 01/13] add operation_timeout_s parameter for total operation timeout --- baseten-performance-client/core/src/client.rs | 41 +++++++++++++++---- .../core/src/split_policy.rs | 23 +++++++++++ .../python_bindings/src/lib.rs | 24 ++++++++--- 3 files changed, 75 insertions(+), 13 deletions(-) diff --git a/baseten-performance-client/core/src/client.rs b/baseten-performance-client/core/src/client.rs index cbc2618df..7957ba7aa 100644 --- a/baseten-performance-client/core/src/client.rs +++ b/baseten-performance-client/core/src/client.rs @@ -305,18 +305,39 @@ impl PerformanceClientCore { } // Process results as they complete with fast-fail cancellation - while let Some(task_result) = join_set.join_next().await { - match process_joinset_outcome(task_result, &cancel_token) { - Ok((response, duration, start_index, batch_index)) => { - indexed_results.push((batch_index, response, duration, start_index)); + let process_results = async { + while let Some(task_result) = join_set.join_next().await { + match process_joinset_outcome(task_result, &cancel_token) { + Ok((response, duration, start_index, batch_index)) => { + indexed_results.push((batch_index, response, duration, start_index)); + } + Err(e) => { + // Cancel all remaining tasks immediately + cancel_token.store(true, Ordering::SeqCst); + join_set.abort_all(); + return Err(e); + } } - Err(e) => { - // Cancel all remaining tasks immediately + } + Ok(()) + }; + + // Apply operation timeout if configured + if let Some(operation_timeout) = config.operation_timeout_duration() { + match tokio::time::timeout(operation_timeout, process_results).await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(e), + Err(_) => { cancel_token.store(true, Ordering::SeqCst); join_set.abort_all(); - return Err(e); + return Err(ClientError::Timeout(format!( + "Batch operation timed out after {:.3}s", + 
operation_timeout.as_secs_f64() + ))); } } + } else { + process_results.await?; } // Sort results by original batch order to preserve ordering @@ -367,6 +388,7 @@ impl PerformanceClientCore { max_chars_per_request: Option, timeout_s: f64, hedge_delay: Option, + operation_timeout_s: Option, ) -> Result<(CoreOpenAIEmbeddingsResponse, Vec, Duration), ClientError> { // Create and validate config let config = RequestProcessingConfig::new( @@ -376,6 +398,7 @@ impl PerformanceClientCore { self.base_url.to_string(), hedge_delay, max_chars_per_request, + operation_timeout_s, )?; // Create batches @@ -426,6 +449,7 @@ impl PerformanceClientCore { max_chars_per_request: Option, timeout_s: f64, hedge_delay: Option, + operation_timeout_s: Option, ) -> Result<(CoreRerankResponse, Vec, Duration), ClientError> { // Create and validate config let config = RequestProcessingConfig::new( @@ -435,6 +459,7 @@ impl PerformanceClientCore { self.base_url.to_string(), hedge_delay, max_chars_per_request, + operation_timeout_s, )?; // Create batches @@ -487,6 +512,7 @@ impl PerformanceClientCore { max_chars_per_request: Option, timeout_s: f64, hedge_delay: Option, + operation_timeout_s: Option, ) -> Result<(CoreClassificationResponse, Vec, Duration), ClientError> { // Create and validate config let config = RequestProcessingConfig::new( @@ -496,6 +522,7 @@ impl PerformanceClientCore { self.base_url.to_string(), hedge_delay, max_chars_per_request, + operation_timeout_s, )?; // Create batches diff --git a/baseten-performance-client/core/src/split_policy.rs b/baseten-performance-client/core/src/split_policy.rs index dc03d5d47..6b1663224 100644 --- a/baseten-performance-client/core/src/split_policy.rs +++ b/baseten-performance-client/core/src/split_policy.rs @@ -22,6 +22,7 @@ pub struct RequestProcessingConfig { pub base_url: String, pub hedge_delay: Option, pub max_chars_per_request: Option, + pub operation_timeout_s: Option, } impl RequestProcessingConfig { @@ -33,6 +34,7 @@ impl 
RequestProcessingConfig { base_url: String, hedge_delay: Option, max_chars_per_request: Option, + operation_timeout_s: Option, ) -> Result { // Validate timeout if !(MIN_REQUEST_TIMEOUT_S..=MAX_REQUEST_TIMEOUT_S).contains(&timeout_s) { @@ -65,6 +67,21 @@ impl RequestProcessingConfig { ))); } } + if operation_timeout_s.is_some() { + let operation_timeout = operation_timeout_s.unwrap(); + if !(MIN_REQUEST_TIMEOUT_S..=MAX_REQUEST_TIMEOUT_S).contains(&operation_timeout) { + return Err(crate::errors::ClientError::InvalidParameter(format!( + "Operation timeout {:.3}s is outside the allowed range [{:.3}s, {:.3}s].", + operation_timeout, MIN_REQUEST_TIMEOUT_S, MAX_REQUEST_TIMEOUT_S + ))); + } + if operation_timeout < timeout_s { + return Err(crate::errors::ClientError::InvalidParameter(format!( + "Operation timeout {:.3}s must be greater than or equal to per-request timeout {:.3}s.", + operation_timeout, timeout_s + ))); + } + } // Validate concurrency parameters if max_concurrent_requests == 0 || max_concurrent_requests > MAX_CONCURRENCY_HIGH_BATCH { @@ -93,6 +110,7 @@ impl RequestProcessingConfig { base_url, hedge_delay, max_chars_per_request, + operation_timeout_s, }) } @@ -100,6 +118,11 @@ impl RequestProcessingConfig { pub fn timeout_duration(&self) -> std::time::Duration { std::time::Duration::from_secs_f64(self.timeout_s) } + + /// Get operation timeout duration if set + pub fn operation_timeout_duration(&self) -> Option { + self.operation_timeout_s.map(|s| std::time::Duration::from_secs_f64(s)) + } } impl SplitPolicy { diff --git a/baseten-performance-client/python_bindings/src/lib.rs b/baseten-performance-client/python_bindings/src/lib.rs index 087f9e18d..a6a3f8a7e 100644 --- a/baseten-performance-client/python_bindings/src/lib.rs +++ b/baseten-performance-client/python_bindings/src/lib.rs @@ -330,7 +330,7 @@ impl PerformanceClient { Ok(self.core_client.api_key.clone()) } - #[pyo3(signature = (input, model, encoding_format = None, dimensions = None, user = None, 
max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None))] + #[pyo3(signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] fn embed( &self, py: Python, @@ -344,6 +344,7 @@ impl PerformanceClient { timeout_s: f64, max_chars_per_request: Option, hedge_delay: Option, + operation_timeout_s: Option, ) -> PyResult { if input.is_empty() { return Err(PyValueError::new_err("Input list cannot be empty")); @@ -368,6 +369,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + operation_timeout_s, ) .await; let _ = tx.send(res); @@ -395,7 +397,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_embed", signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None))] + #[pyo3(name = "async_embed", signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] fn async_embed<'py>( &self, py: Python<'py>, @@ -409,6 +411,7 @@ impl PerformanceClient { timeout_s: f64, max_chars_per_request: Option, hedge_delay: Option, + operation_timeout_s: Option, ) -> PyResult> { if input.is_empty() { return Err(PyValueError::new_err("Input list cannot be empty")); @@ -429,6 +432,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + operation_timeout_s, ) .await 
.map_err(Self::convert_core_error_to_py_err)?; @@ -448,7 +452,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None))] + #[pyo3(signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] fn rerank( &self, py: Python, @@ -463,6 +467,7 @@ impl PerformanceClient { timeout_s: f64, max_chars_per_request: Option, hedge_delay: Option, + operation_timeout_s: Option, ) -> PyResult { if texts.is_empty() { return Err(PyValueError::new_err("Texts list cannot be empty")); @@ -489,6 +494,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + operation_timeout_s, ) .await; let _ = tx.send(res); @@ -516,7 +522,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_rerank", signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None))] + #[pyo3(name = "async_rerank", signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] fn async_rerank<'py>( &self, py: Python<'py>, @@ -531,6 +537,7 @@ 
impl PerformanceClient { timeout_s: f64, max_chars_per_request: Option, hedge_delay: Option, + operation_timeout_s: Option, ) -> PyResult> { if texts.is_empty() { return Err(PyValueError::new_err("Texts list cannot be empty")); @@ -553,6 +560,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + operation_timeout_s, ) .await .map_err(Self::convert_core_error_to_py_err)?; @@ -572,7 +580,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None))] + #[pyo3(signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] fn classify( &self, py: Python, @@ -585,6 +593,7 @@ impl PerformanceClient { timeout_s: f64, max_chars_per_request: Option, hedge_delay: Option, + operation_timeout_s: Option, ) -> PyResult { if inputs.is_empty() { return Err(PyValueError::new_err("Inputs list cannot be empty")); @@ -609,6 +618,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + operation_timeout_s, ) .await; let _ = tx.send(res); @@ -636,7 +646,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_classify", signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None))] + #[pyo3(name = "async_classify", signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", 
max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] fn async_classify<'py>( &self, py: Python<'py>, @@ -649,6 +659,7 @@ impl PerformanceClient { timeout_s: f64, max_chars_per_request: Option, hedge_delay: Option, + operation_timeout_s: Option, ) -> PyResult> { if inputs.is_empty() { return Err(PyValueError::new_err("Inputs list cannot be empty")); @@ -669,6 +680,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + operation_timeout_s, ) .await .map_err(Self::convert_core_error_to_py_err)?; From 3f5b9f0b68f366de77a6b8797476fa5ad8009d23 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Mon, 17 Nov 2025 16:38:31 -0800 Subject: [PATCH 02/13] reduce MIN_REQUEST_TIMEOUT_S from 1 to 0.3s --- baseten-performance-client/core/src/constants.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/baseten-performance-client/core/src/constants.rs b/baseten-performance-client/core/src/constants.rs index fa710cd5d..142740246 100644 --- a/baseten-performance-client/core/src/constants.rs +++ b/baseten-performance-client/core/src/constants.rs @@ -2,7 +2,7 @@ use std::time::Duration; // Request timeout constants pub const DEFAULT_REQUEST_TIMEOUT_S: f64 = 3600.0; -pub const MIN_REQUEST_TIMEOUT_S: f64 = 1.0; +pub const MIN_REQUEST_TIMEOUT_S: f64 = 0.3; pub const MAX_REQUEST_TIMEOUT_S: f64 = 3600.0; // Concurrency constants @@ -14,7 +14,7 @@ pub const MIN_CHARACTERS_PER_REQUEST: usize = 50; pub const MAX_CHARACTERS_PER_REQUEST: usize = 256000; // hedging settings: -pub const MIN_HEDGE_DELAY_S: f64 = 0.2; +pub const MIN_HEDGE_DELAY_S: f64 = 0.1; pub const HEDGE_BUDGET_PERCENTAGE: f64 = 0.10; // Batch size constants From 3bb8d2b10558828290bf2993685c901411d0756d Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Tue, 18 Nov 2025 13:43:30 -0800 Subject: [PATCH 03/13] add retry parameter --- 
baseten-performance-client/Cargo.lock | 4 +-- baseten-performance-client/core/Cargo.toml | 2 +- baseten-performance-client/core/src/client.rs | 9 ++++++- .../core/src/split_policy.rs | 26 +++++++++++++++++++ .../python_bindings/Cargo.toml | 2 +- .../python_bindings/src/lib.rs | 25 +++++++++++++----- 6 files changed, 57 insertions(+), 11 deletions(-) diff --git a/baseten-performance-client/Cargo.lock b/baseten-performance-client/Cargo.lock index 03df90b9c..2d7566691 100644 --- a/baseten-performance-client/Cargo.lock +++ b/baseten-performance-client/Cargo.lock @@ -76,7 +76,7 @@ dependencies = [ [[package]] name = "baseten_performance_client" -version = "0.0.11" +version = "0.0.12-dev.1" dependencies = [ "baseten_performance_client_core", "futures", @@ -93,7 +93,7 @@ dependencies = [ [[package]] name = "baseten_performance_client_core" -version = "0.0.11" +version = "0.0.12-dev.1" dependencies = [ "futures", "once_cell", diff --git a/baseten-performance-client/core/Cargo.toml b/baseten-performance-client/core/Cargo.toml index ca46df717..5fceaa2f9 100644 --- a/baseten-performance-client/core/Cargo.toml +++ b/baseten-performance-client/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "baseten_performance_client_core" -version = "0.0.11" +version = "0.0.12-dev.1" edition = "2021" description = "High performance HTTP client for Baseten.co and other APIs" license = "MIT" diff --git a/baseten-performance-client/core/src/client.rs b/baseten-performance-client/core/src/client.rs index 7957ba7aa..371d2ac4b 100644 --- a/baseten-performance-client/core/src/client.rs +++ b/baseten-performance-client/core/src/client.rs @@ -251,6 +251,7 @@ impl PerformanceClientCore { let mut indexed_results: Vec<(usize, R, Duration, usize)> = Vec::with_capacity(total_requests); + let max_retries = config.max_retries(); let mut current_absolute_index = 0; for (batch_index, batch) in batches.into_iter().enumerate() { let current_batch_absolute_start_index = current_absolute_index; @@ -275,7 +276,7 
@@ impl PerformanceClientCore { let request_time_start = Instant::now(); let config = SendRequestConfig { - max_retries: MAX_HTTP_RETRIES, + max_retries: max_retries, initial_backoff: Duration::from_millis(INITIAL_BACKOFF_MS), retry_budget: retry_budget, cancel_token: cancel_token.clone(), @@ -389,6 +390,7 @@ impl PerformanceClientCore { timeout_s: f64, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> Result<(CoreOpenAIEmbeddingsResponse, Vec, Duration), ClientError> { // Create and validate config let config = RequestProcessingConfig::new( @@ -399,6 +401,7 @@ impl PerformanceClientCore { hedge_delay, max_chars_per_request, operation_timeout_s, + max_retries, )?; // Create batches @@ -450,6 +453,7 @@ impl PerformanceClientCore { timeout_s: f64, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> Result<(CoreRerankResponse, Vec, Duration), ClientError> { // Create and validate config let config = RequestProcessingConfig::new( @@ -460,6 +464,7 @@ impl PerformanceClientCore { hedge_delay, max_chars_per_request, operation_timeout_s, + max_retries, )?; // Create batches @@ -513,6 +518,7 @@ impl PerformanceClientCore { timeout_s: f64, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> Result<(CoreClassificationResponse, Vec, Duration), ClientError> { // Create and validate config let config = RequestProcessingConfig::new( @@ -523,6 +529,7 @@ impl PerformanceClientCore { hedge_delay, max_chars_per_request, operation_timeout_s, + max_retries, )?; // Create batches diff --git a/baseten-performance-client/core/src/split_policy.rs b/baseten-performance-client/core/src/split_policy.rs index 6b1663224..2ac4ff899 100644 --- a/baseten-performance-client/core/src/split_policy.rs +++ b/baseten-performance-client/core/src/split_policy.rs @@ -23,6 +23,7 @@ pub struct RequestProcessingConfig { pub hedge_delay: Option, pub max_chars_per_request: Option, pub operation_timeout_s: Option, + pub max_retries: 
Option, } impl RequestProcessingConfig { @@ -35,6 +36,7 @@ impl RequestProcessingConfig { hedge_delay: Option, max_chars_per_request: Option, operation_timeout_s: Option, + max_retries: Option, ) -> Result { // Validate timeout if !(MIN_REQUEST_TIMEOUT_S..=MAX_REQUEST_TIMEOUT_S).contains(&timeout_s) { @@ -82,6 +84,24 @@ impl RequestProcessingConfig { ))); } } + // Validate and convert max_retries from i64 to u32 + let max_retries_u32 = if let Some(retries) = max_retries { + if retries < 0 { + return Err(crate::errors::ClientError::InvalidParameter(format!( + "max_retries must be non-negative, got {}", + retries + ))); + } + if retries > MAX_HTTP_RETRIES as i64 { + return Err(crate::errors::ClientError::InvalidParameter(format!( + "max_retries {} exceeds maximum allowed retries {}", + retries, MAX_HTTP_RETRIES + ))); + } + Some(retries as u32) + } else { + None + }; // Validate concurrency parameters if max_concurrent_requests == 0 || max_concurrent_requests > MAX_CONCURRENCY_HIGH_BATCH { @@ -111,6 +131,7 @@ impl RequestProcessingConfig { hedge_delay, max_chars_per_request, operation_timeout_s, + max_retries: max_retries_u32, }) } @@ -123,6 +144,11 @@ impl RequestProcessingConfig { pub fn operation_timeout_duration(&self) -> Option { self.operation_timeout_s.map(|s| std::time::Duration::from_secs_f64(s)) } + + /// Get max retries, defaulting to MAX_HTTP_RETRIES if not set + pub fn max_retries(&self) -> u32 { + self.max_retries.unwrap_or(MAX_HTTP_RETRIES) + } } impl SplitPolicy { diff --git a/baseten-performance-client/python_bindings/Cargo.toml b/baseten-performance-client/python_bindings/Cargo.toml index 874874678..042bf4807 100644 --- a/baseten-performance-client/python_bindings/Cargo.toml +++ b/baseten-performance-client/python_bindings/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "baseten_performance_client" -version = "0.0.11" +version = "0.0.12-dev.1" edition = "2021" [dependencies] diff --git a/baseten-performance-client/python_bindings/src/lib.rs 
b/baseten-performance-client/python_bindings/src/lib.rs index a6a3f8a7e..b69333d61 100644 --- a/baseten-performance-client/python_bindings/src/lib.rs +++ b/baseten-performance-client/python_bindings/src/lib.rs @@ -309,6 +309,7 @@ impl PerformanceClient { ClientError::Connect(msg) => PyValueError::new_err(msg), } } + } #[pymethods] @@ -330,7 +331,7 @@ impl PerformanceClient { Ok(self.core_client.api_key.clone()) } - #[pyo3(signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] + #[pyo3(signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn embed( &self, py: Python, @@ -345,6 +346,7 @@ impl PerformanceClient { max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> PyResult { if input.is_empty() { return Err(PyValueError::new_err("Input list cannot be empty")); @@ -370,6 +372,7 @@ impl PerformanceClient { timeout_s, hedge_delay, operation_timeout_s, + max_retries, ) .await; let _ = tx.send(res); @@ -397,7 +400,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_embed", signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] + #[pyo3(name = "async_embed", signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = 
DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn async_embed<'py>( &self, py: Python<'py>, @@ -412,6 +415,7 @@ impl PerformanceClient { max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> PyResult> { if input.is_empty() { return Err(PyValueError::new_err("Input list cannot be empty")); @@ -433,6 +437,7 @@ impl PerformanceClient { timeout_s, hedge_delay, operation_timeout_s, + max_retries, ) .await .map_err(Self::convert_core_error_to_py_err)?; @@ -452,7 +457,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] + #[pyo3(signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn rerank( &self, py: Python, @@ -468,6 +473,7 @@ impl PerformanceClient { max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> PyResult { if texts.is_empty() { return Err(PyValueError::new_err("Texts list cannot be empty")); @@ -495,6 +501,7 @@ impl PerformanceClient { timeout_s, hedge_delay, operation_timeout_s, + max_retries, ) .await; let _ = tx.send(res); @@ -522,7 +529,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_rerank", signature = (query, texts, raw_scores = false, return_text = false, truncate = false, 
truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] + #[pyo3(name = "async_rerank", signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn async_rerank<'py>( &self, py: Python<'py>, @@ -538,6 +545,7 @@ impl PerformanceClient { max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> PyResult> { if texts.is_empty() { return Err(PyValueError::new_err("Texts list cannot be empty")); @@ -561,6 +569,7 @@ impl PerformanceClient { timeout_s, hedge_delay, operation_timeout_s, + max_retries, ) .await .map_err(Self::convert_core_error_to_py_err)?; @@ -580,7 +589,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] + #[pyo3(signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn classify( &self, py: Python, @@ -594,6 +603,7 @@ impl PerformanceClient { max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> PyResult { if inputs.is_empty() { 
return Err(PyValueError::new_err("Inputs list cannot be empty")); @@ -619,6 +629,7 @@ impl PerformanceClient { timeout_s, hedge_delay, operation_timeout_s, + max_retries, ) .await; let _ = tx.send(res); @@ -646,7 +657,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_classify", signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None))] + #[pyo3(name = "async_classify", signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn async_classify<'py>( &self, py: Python<'py>, @@ -660,6 +671,7 @@ impl PerformanceClient { max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, + max_retries: Option, ) -> PyResult> { if inputs.is_empty() { return Err(PyValueError::new_err("Inputs list cannot be empty")); @@ -681,6 +693,7 @@ impl PerformanceClient { timeout_s, hedge_delay, operation_timeout_s, + max_retries, ) .await .map_err(Self::convert_core_error_to_py_err)?; From 8496b24fe532c5c7aad1a1c39227c50bef23140e Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Tue, 18 Nov 2025 15:45:40 -0800 Subject: [PATCH 04/13] update timeout min to 0.5s, reset min hedge delay to 0.2 --- .../core/src/constants.rs | 4 +-- .../python_bindings/src/lib.rs | 36 ++++++++++++------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/baseten-performance-client/core/src/constants.rs b/baseten-performance-client/core/src/constants.rs index 142740246..7fd05628f 100644 --- a/baseten-performance-client/core/src/constants.rs +++ 
b/baseten-performance-client/core/src/constants.rs @@ -2,7 +2,7 @@ use std::time::Duration; // Request timeout constants pub const DEFAULT_REQUEST_TIMEOUT_S: f64 = 3600.0; -pub const MIN_REQUEST_TIMEOUT_S: f64 = 0.3; +pub const MIN_REQUEST_TIMEOUT_S: f64 = 0.5; pub const MAX_REQUEST_TIMEOUT_S: f64 = 3600.0; // Concurrency constants @@ -14,7 +14,7 @@ pub const MIN_CHARACTERS_PER_REQUEST: usize = 50; pub const MAX_CHARACTERS_PER_REQUEST: usize = 256000; // hedging settings: -pub const MIN_HEDGE_DELAY_S: f64 = 0.1; +pub const MIN_HEDGE_DELAY_S: f64 = 0.2; pub const HEDGE_BUDGET_PERCENTAGE: f64 = 0.10; // Batch size constants diff --git a/baseten-performance-client/python_bindings/src/lib.rs b/baseten-performance-client/python_bindings/src/lib.rs index b69333d61..7cc5b3e8e 100644 --- a/baseten-performance-client/python_bindings/src/lib.rs +++ b/baseten-performance-client/python_bindings/src/lib.rs @@ -331,7 +331,7 @@ impl PerformanceClient { Ok(self.core_client.api_key.clone()) } - #[pyo3(signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn embed( &self, py: Python, @@ -342,7 +342,7 @@ impl PerformanceClient { user: Option, max_concurrent_requests: usize, batch_size: usize, - timeout_s: f64, + timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, @@ -351,6 +351,8 @@ impl PerformanceClient { if input.is_empty() { return Err(PyValueError::new_err("Input list cannot be empty")); } + // Resolve 
timeout_s: use provided value, or operation_timeout_s if set, or default + let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let rt: Arc = Arc::clone(&self.runtime); @@ -400,7 +402,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_embed", signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(name = "async_embed", signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn async_embed<'py>( &self, py: Python<'py>, @@ -411,7 +413,7 @@ impl PerformanceClient { user: Option, max_concurrent_requests: usize, batch_size: usize, - timeout_s: f64, + timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, @@ -420,6 +422,8 @@ impl PerformanceClient { if input.is_empty() { return Err(PyValueError::new_err("Input list cannot be empty")); } + // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default + let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); @@ -457,7 +461,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, 
operation_timeout_s = None, max_retries = None))] + #[pyo3(signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn rerank( &self, py: Python, @@ -469,7 +473,7 @@ impl PerformanceClient { truncation_direction: &str, max_concurrent_requests: usize, batch_size: usize, - timeout_s: f64, + timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, @@ -478,6 +482,8 @@ impl PerformanceClient { if texts.is_empty() { return Err(PyValueError::new_err("Texts list cannot be empty")); } + // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default + let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let rt = Arc::clone(&self.runtime); @@ -529,7 +535,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_rerank", signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(name = "async_rerank", signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn async_rerank<'py>( &self, py: Python<'py>, @@ -541,7 +547,7 @@ impl PerformanceClient { truncation_direction: &str, max_concurrent_requests: usize, batch_size: usize, - 
timeout_s: f64, + timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, @@ -550,6 +556,8 @@ impl PerformanceClient { if texts.is_empty() { return Err(PyValueError::new_err("Texts list cannot be empty")); } + // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default + let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let truncation_direction_owned = truncation_direction.to_string(); @@ -589,7 +597,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn classify( &self, py: Python, @@ -599,7 +607,7 @@ impl PerformanceClient { truncation_direction: &str, max_concurrent_requests: usize, batch_size: usize, - timeout_s: f64, + timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, @@ -608,6 +616,8 @@ impl PerformanceClient { if inputs.is_empty() { return Err(PyValueError::new_err("Inputs list cannot be empty")); } + // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default + let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let rt = Arc::clone(&self.runtime); @@ -657,7 +667,7 @@ impl PerformanceClient { Ok(api_response) } - 
#[pyo3(name = "async_classify", signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(name = "async_classify", signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] fn async_classify<'py>( &self, py: Python<'py>, @@ -667,7 +677,7 @@ impl PerformanceClient { truncation_direction: &str, max_concurrent_requests: usize, batch_size: usize, - timeout_s: f64, + timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, operation_timeout_s: Option, @@ -676,6 +686,8 @@ impl PerformanceClient { if inputs.is_empty() { return Err(PyValueError::new_err("Inputs list cannot be empty")); } + // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default + let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let truncation_direction_owned = truncation_direction.to_string(); From 8e8329e2101039d1e8aa21a08ec08664e7f9c3d2 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Tue, 18 Nov 2025 16:09:02 -0800 Subject: [PATCH 05/13] rename operation_timeout_s to total_timeout_s --- baseten-performance-client/core/src/client.rs | 18 +++--- .../core/src/split_policy.rs | 28 ++++----- .../python_bindings/src/lib.rs | 60 +++++++++---------- 3 files changed, 53 insertions(+), 53 deletions(-) diff --git a/baseten-performance-client/core/src/client.rs b/baseten-performance-client/core/src/client.rs index 371d2ac4b..7a267589c 100644 --- a/baseten-performance-client/core/src/client.rs +++ 
b/baseten-performance-client/core/src/client.rs @@ -324,8 +324,8 @@ impl PerformanceClientCore { }; // Apply operation timeout if configured - if let Some(operation_timeout) = config.operation_timeout_duration() { - match tokio::time::timeout(operation_timeout, process_results).await { + if let Some(total_timeout) = config.total_timeout_duration() { + match tokio::time::timeout(total_timeout, process_results).await { Ok(Ok(())) => {} Ok(Err(e)) => return Err(e), Err(_) => { @@ -333,7 +333,7 @@ impl PerformanceClientCore { join_set.abort_all(); return Err(ClientError::Timeout(format!( "Batch operation timed out after {:.3}s", - operation_timeout.as_secs_f64() + total_timeout.as_secs_f64() ))); } } @@ -389,7 +389,7 @@ impl PerformanceClientCore { max_chars_per_request: Option, timeout_s: f64, hedge_delay: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> Result<(CoreOpenAIEmbeddingsResponse, Vec, Duration), ClientError> { // Create and validate config @@ -400,7 +400,7 @@ impl PerformanceClientCore { self.base_url.to_string(), hedge_delay, max_chars_per_request, - operation_timeout_s, + total_timeout_s, max_retries, )?; @@ -452,7 +452,7 @@ impl PerformanceClientCore { max_chars_per_request: Option, timeout_s: f64, hedge_delay: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> Result<(CoreRerankResponse, Vec, Duration), ClientError> { // Create and validate config @@ -463,7 +463,7 @@ impl PerformanceClientCore { self.base_url.to_string(), hedge_delay, max_chars_per_request, - operation_timeout_s, + total_timeout_s, max_retries, )?; @@ -517,7 +517,7 @@ impl PerformanceClientCore { max_chars_per_request: Option, timeout_s: f64, hedge_delay: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> Result<(CoreClassificationResponse, Vec, Duration), ClientError> { // Create and validate config @@ -528,7 +528,7 @@ impl PerformanceClientCore { 
self.base_url.to_string(), hedge_delay, max_chars_per_request, - operation_timeout_s, + total_timeout_s, max_retries, )?; diff --git a/baseten-performance-client/core/src/split_policy.rs b/baseten-performance-client/core/src/split_policy.rs index 2ac4ff899..a78c8c9de 100644 --- a/baseten-performance-client/core/src/split_policy.rs +++ b/baseten-performance-client/core/src/split_policy.rs @@ -22,7 +22,7 @@ pub struct RequestProcessingConfig { pub base_url: String, pub hedge_delay: Option, pub max_chars_per_request: Option, - pub operation_timeout_s: Option, + pub total_timeout_s: Option, pub max_retries: Option, } @@ -35,7 +35,7 @@ impl RequestProcessingConfig { base_url: String, hedge_delay: Option, max_chars_per_request: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> Result { // Validate timeout @@ -69,18 +69,18 @@ impl RequestProcessingConfig { ))); } } - if operation_timeout_s.is_some() { - let operation_timeout = operation_timeout_s.unwrap(); - if !(MIN_REQUEST_TIMEOUT_S..=MAX_REQUEST_TIMEOUT_S).contains(&operation_timeout) { + if total_timeout_s.is_some() { + let total_timeout = total_timeout_s.unwrap(); + if !(MIN_REQUEST_TIMEOUT_S..=MAX_REQUEST_TIMEOUT_S).contains(&total_timeout) { return Err(crate::errors::ClientError::InvalidParameter(format!( - "Operation timeout {:.3}s is outside the allowed range [{:.3}s, {:.3}s].", - operation_timeout, MIN_REQUEST_TIMEOUT_S, MAX_REQUEST_TIMEOUT_S + "Total timeout {:.3}s is outside the allowed range [{:.3}s, {:.3}s].", + total_timeout, MIN_REQUEST_TIMEOUT_S, MAX_REQUEST_TIMEOUT_S ))); } - if operation_timeout < timeout_s { + if total_timeout < timeout_s { return Err(crate::errors::ClientError::InvalidParameter(format!( - "Operation timeout {:.3}s must be greater than or equal to per-request timeout {:.3}s.", - operation_timeout, timeout_s + "Total timeout {:.3}s must be greater than or equal to per-request timeout {:.3}s.", + total_timeout, timeout_s ))); } } @@ -130,7 +130,7 
@@ impl RequestProcessingConfig { base_url, hedge_delay, max_chars_per_request, - operation_timeout_s, + total_timeout_s, max_retries: max_retries_u32, }) } @@ -140,9 +140,9 @@ impl RequestProcessingConfig { std::time::Duration::from_secs_f64(self.timeout_s) } - /// Get operation timeout duration if set - pub fn operation_timeout_duration(&self) -> Option { - self.operation_timeout_s.map(|s| std::time::Duration::from_secs_f64(s)) + /// Get total timeout duration if set + pub fn total_timeout_duration(&self) -> Option { + self.total_timeout_s.map(|s| std::time::Duration::from_secs_f64(s)) } /// Get max retries, defaulting to MAX_HTTP_RETRIES if not set diff --git a/baseten-performance-client/python_bindings/src/lib.rs b/baseten-performance-client/python_bindings/src/lib.rs index 7cc5b3e8e..8e8eb1d17 100644 --- a/baseten-performance-client/python_bindings/src/lib.rs +++ b/baseten-performance-client/python_bindings/src/lib.rs @@ -331,7 +331,7 @@ impl PerformanceClient { Ok(self.core_client.api_key.clone()) } - #[pyo3(signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, total_timeout_s = None, max_retries = None))] fn embed( &self, py: Python, @@ -345,14 +345,14 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> PyResult { if input.is_empty() { return Err(PyValueError::new_err("Input list cannot be empty")); } - // Resolve timeout_s: use provided value, or operation_timeout_s if 
set, or default - let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); + // Resolve timeout_s: use provided value, or total_timeout_s if set, or default + let timeout_s = timeout_s.or(total_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let rt: Arc = Arc::clone(&self.runtime); @@ -373,7 +373,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, - operation_timeout_s, + total_timeout_s, max_retries, ) .await; @@ -402,7 +402,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_embed", signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(name = "async_embed", signature = (input, model, encoding_format = None, dimensions = None, user = None, max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, total_timeout_s = None, max_retries = None))] fn async_embed<'py>( &self, py: Python<'py>, @@ -416,14 +416,14 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> PyResult> { if input.is_empty() { return Err(PyValueError::new_err("Input list cannot be empty")); } - // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default - let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); + // Resolve timeout_s: use provided value, or total_timeout_s if set, or default + let timeout_s = timeout_s.or(total_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); @@ -440,7 +440,7 @@ impl PerformanceClient { 
max_chars_per_request, timeout_s, hedge_delay, - operation_timeout_s, + total_timeout_s, max_retries, ) .await @@ -461,7 +461,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, total_timeout_s = None, max_retries = None))] fn rerank( &self, py: Python, @@ -476,14 +476,14 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> PyResult { if texts.is_empty() { return Err(PyValueError::new_err("Texts list cannot be empty")); } - // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default - let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); + // Resolve timeout_s: use provided value, or total_timeout_s if set, or default + let timeout_s = timeout_s.or(total_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let rt = Arc::clone(&self.runtime); @@ -506,7 +506,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, - operation_timeout_s, + total_timeout_s, max_retries, ) .await; @@ -535,7 +535,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_rerank", signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", 
max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(name = "async_rerank", signature = (query, texts, raw_scores = false, return_text = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, total_timeout_s = None, max_retries = None))] fn async_rerank<'py>( &self, py: Python<'py>, @@ -550,14 +550,14 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> PyResult> { if texts.is_empty() { return Err(PyValueError::new_err("Texts list cannot be empty")); } - // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default - let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); + // Resolve timeout_s: use provided value, or total_timeout_s if set, or default + let timeout_s = timeout_s.or(total_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let truncation_direction_owned = truncation_direction.to_string(); @@ -576,7 +576,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, - operation_timeout_s, + total_timeout_s, max_retries, ) .await @@ -597,7 +597,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(signature = (inputs, raw_scores = false, truncate = false, truncation_direction = 
"Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, total_timeout_s = None, max_retries = None))] fn classify( &self, py: Python, @@ -610,14 +610,14 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, - operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> PyResult { if inputs.is_empty() { return Err(PyValueError::new_err("Inputs list cannot be empty")); } - // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default - let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); + // Resolve timeout_s: use provided value, or total_timeout_s if set, or default + let timeout_s = timeout_s.or(total_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let rt = Arc::clone(&self.runtime); @@ -638,7 +638,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, - operation_timeout_s, + total_timeout_s, max_retries, ) .await; @@ -667,7 +667,7 @@ impl PerformanceClient { Ok(api_response) } - #[pyo3(name = "async_classify", signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, operation_timeout_s = None, max_retries = None))] + #[pyo3(name = "async_classify", signature = (inputs, raw_scores = false, truncate = false, truncation_direction = "Right", max_concurrent_requests = DEFAULT_CONCURRENCY, batch_size = DEFAULT_BATCH_SIZE, timeout_s = None, max_chars_per_request = None, hedge_delay = None, total_timeout_s = None, max_retries = None))] fn async_classify<'py>( &self, py: Python<'py>, @@ -680,14 +680,14 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, - 
operation_timeout_s: Option, + total_timeout_s: Option, max_retries: Option, ) -> PyResult> { if inputs.is_empty() { return Err(PyValueError::new_err("Inputs list cannot be empty")); } - // Resolve timeout_s: use provided value, or operation_timeout_s if set, or default - let timeout_s = timeout_s.or(operation_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); + // Resolve timeout_s: use provided value, or total_timeout_s if set, or default + let timeout_s = timeout_s.or(total_timeout_s).unwrap_or(DEFAULT_REQUEST_TIMEOUT_S); let core_client = self.core_client.clone(); let truncation_direction_owned = truncation_direction.to_string(); @@ -704,7 +704,7 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, - operation_timeout_s, + total_timeout_s, max_retries, ) .await From 30991207524b480222bf98fa25e332252f7b9229 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Tue, 18 Nov 2025 16:24:01 -0800 Subject: [PATCH 06/13] add separate constants for total timeout and request timeout --- baseten-performance-client/core/src/client.rs | 2 +- baseten-performance-client/core/src/constants.rs | 4 ++++ baseten-performance-client/core/src/split_policy.rs | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/baseten-performance-client/core/src/client.rs b/baseten-performance-client/core/src/client.rs index 7a267589c..a3fcf8dbf 100644 --- a/baseten-performance-client/core/src/client.rs +++ b/baseten-performance-client/core/src/client.rs @@ -323,7 +323,7 @@ impl PerformanceClientCore { Ok(()) }; - // Apply operation timeout if configured + // Apply total timeout if configured if let Some(total_timeout) = config.total_timeout_duration() { match tokio::time::timeout(total_timeout, process_results).await { Ok(Ok(())) => {} diff --git a/baseten-performance-client/core/src/constants.rs b/baseten-performance-client/core/src/constants.rs index 7fd05628f..3f7a3d3f1 100644 --- a/baseten-performance-client/core/src/constants.rs +++ 
b/baseten-performance-client/core/src/constants.rs @@ -5,6 +5,10 @@ pub const DEFAULT_REQUEST_TIMEOUT_S: f64 = 3600.0; pub const MIN_REQUEST_TIMEOUT_S: f64 = 0.5; pub const MAX_REQUEST_TIMEOUT_S: f64 = 3600.0; +// Total timeout constants +pub const MIN_TOTAL_TIMEOUT_S: f64 = 0.5; +pub const MAX_TOTAL_TIMEOUT_S: f64 = 3600.0; + // Concurrency constants pub const MAX_CONCURRENCY_HIGH_BATCH: usize = 1024; pub const MAX_CONCURRENCY_LOW_BATCH: usize = 512; diff --git a/baseten-performance-client/core/src/split_policy.rs b/baseten-performance-client/core/src/split_policy.rs index a78c8c9de..0b7a071da 100644 --- a/baseten-performance-client/core/src/split_policy.rs +++ b/baseten-performance-client/core/src/split_policy.rs @@ -71,10 +71,10 @@ impl RequestProcessingConfig { } if total_timeout_s.is_some() { let total_timeout = total_timeout_s.unwrap(); - if !(MIN_REQUEST_TIMEOUT_S..=MAX_REQUEST_TIMEOUT_S).contains(&total_timeout) { + if !(MIN_TOTAL_TIMEOUT_S..=MAX_TOTAL_TIMEOUT_S).contains(&total_timeout) { return Err(crate::errors::ClientError::InvalidParameter(format!( "Total timeout {:.3}s is outside the allowed range [{:.3}s, {:.3}s].", - total_timeout, MIN_REQUEST_TIMEOUT_S, MAX_REQUEST_TIMEOUT_S + total_timeout, MIN_TOTAL_TIMEOUT_S, MAX_TOTAL_TIMEOUT_S ))); } if total_timeout < timeout_s { From 2155db42370b2ad3b19538ee2c69b4ac4cff6fc3 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Wed, 19 Nov 2025 11:06:56 -0800 Subject: [PATCH 07/13] add node_bindings for total_timeout_s and max_retries --- baseten-performance-client/node_bindings/src/lib.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/baseten-performance-client/node_bindings/src/lib.rs b/baseten-performance-client/node_bindings/src/lib.rs index 063c94985..5b1575069 100644 --- a/baseten-performance-client/node_bindings/src/lib.rs +++ b/baseten-performance-client/node_bindings/src/lib.rs @@ -200,6 +200,8 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: 
Option, + total_timeout_s: Option, + max_retries: Option, ) -> napi::Result { if input.is_empty() { return Err(create_napi_error("Input list cannot be empty")); @@ -225,6 +227,8 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + total_timeout_s, + max_retries, ) .await .map_err(convert_core_error_to_napi_error)?; @@ -255,6 +259,8 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, + total_timeout_s: Option, + max_retries: Option, ) -> napi::Result { if texts.is_empty() { return Err(create_napi_error("Texts list cannot be empty")); @@ -281,6 +287,8 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + total_timeout_s, + max_retries, ) .await .map_err(convert_core_error_to_napi_error)?; @@ -309,6 +317,8 @@ impl PerformanceClient { timeout_s: Option, max_chars_per_request: Option, hedge_delay: Option, + total_timeout_s: Option, + max_retries: Option, ) -> napi::Result { if inputs.is_empty() { return Err(create_napi_error("Inputs list cannot be empty")); @@ -333,6 +343,8 @@ impl PerformanceClient { max_chars_per_request, timeout_s, hedge_delay, + total_timeout_s, + max_retries, ) .await .map_err(convert_core_error_to_napi_error)?; From 23ded64d4eb9445c5f13b2ac2a31f2f41f19fa98 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Wed, 19 Nov 2025 13:45:14 -0800 Subject: [PATCH 08/13] add total_timeout_s to batch_post and async_batch_post --- baseten-performance-client/core/src/client.rs | 54 ++++++++++++++++--- .../node_bindings/src/lib.rs | 4 +- .../python_bindings/src/lib.rs | 8 ++- 3 files changed, 56 insertions(+), 10 deletions(-) diff --git a/baseten-performance-client/core/src/client.rs b/baseten-performance-client/core/src/client.rs index a3fcf8dbf..e6c63a954 100644 --- a/baseten-performance-client/core/src/client.rs +++ b/baseten-performance-client/core/src/client.rs @@ -578,6 +578,7 @@ impl PerformanceClientCore { max_concurrent_requests: usize, timeout_s: f64, hedge_delay:
Option, + total_timeout_s: Option, ) -> Result< ( Vec<( @@ -594,6 +595,23 @@ impl PerformanceClientCore { // Validate parameters internally (using batch_size of 128 for validation) let (validated_concurrency, request_timeout_duration) = self.validate_request_parameters(max_concurrent_requests, 128, timeout_s)?; + + // Validate total_timeout_s if provided + if let Some(total_timeout) = total_timeout_s { + if !(MIN_TOTAL_TIMEOUT_S..=MAX_TOTAL_TIMEOUT_S).contains(&total_timeout) { + return Err(ClientError::InvalidParameter(format!( + "Total timeout {:.3}s is outside the allowed range [{:.3}s, {:.3}s].", + total_timeout, MIN_TOTAL_TIMEOUT_S, MAX_TOTAL_TIMEOUT_S + ))); + } + if total_timeout < timeout_s { + return Err(ClientError::InvalidParameter(format!( + "Total timeout {:.3}s must be greater than or equal to per-request timeout {:.3}s.", + total_timeout, timeout_s + ))); + } + } + let semaphore = Arc::new(Semaphore::new(validated_concurrency)); let cancel_token = Arc::new(AtomicBool::new(false)); let total_payloads = payloads_json.len(); @@ -687,18 +705,40 @@ impl PerformanceClientCore { } // Process results as they complete with fast-fail cancellation - while let Some(task_result) = join_set.join_next().await { - match process_joinset_outcome(task_result, &cancel_token) { - Ok(indexed_data) => { - indexed_results.push(indexed_data); + let process_results = async { + while let Some(task_result) = join_set.join_next().await { + match process_joinset_outcome(task_result, &cancel_token) { + Ok(indexed_data) => { + indexed_results.push(indexed_data); + } + Err(e) => { + // Cancel all remaining tasks immediately + cancel_token.store(true, Ordering::SeqCst); + join_set.abort_all(); + return Err(e); + } } - Err(e) => { - // Cancel all remaining tasks immediately + } + Ok(()) + }; + + // Apply total timeout if configured + if let Some(total_timeout) = total_timeout_s { + let total_timeout_duration = Duration::from_secs_f64(total_timeout); + match 
tokio::time::timeout(total_timeout_duration, process_results).await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(e), + Err(_) => { cancel_token.store(true, Ordering::SeqCst); join_set.abort_all(); - return Err(e); + return Err(ClientError::Timeout(format!( + "Batch post operation timed out after {:.3}s", + total_timeout + ))); } } + } else { + process_results.await?; } indexed_results.sort_by_key(|&(original_index, _, _, _)| original_index); diff --git a/baseten-performance-client/node_bindings/src/lib.rs b/baseten-performance-client/node_bindings/src/lib.rs index 5b1575069..a504a2e3d 100644 --- a/baseten-performance-client/node_bindings/src/lib.rs +++ b/baseten-performance-client/node_bindings/src/lib.rs @@ -368,6 +368,8 @@ impl PerformanceClient { payloads: Vec, max_concurrent_requests: Option, timeout_s: Option, + hedge_delay: Option, + total_timeout_s: Option, ) -> napi::Result { if payloads.is_empty() { return Err(create_napi_error("Payloads list cannot be empty")); @@ -379,7 +381,7 @@ impl PerformanceClient { let result = self .core_client - .process_batch_post_requests(url_path, payloads, max_concurrent_requests, timeout_s, None) + .process_batch_post_requests(url_path, payloads, max_concurrent_requests, timeout_s, hedge_delay, total_timeout_s) .await .map_err(convert_core_error_to_napi_error)?; diff --git a/baseten-performance-client/python_bindings/src/lib.rs b/baseten-performance-client/python_bindings/src/lib.rs index 8e8eb1d17..546bcc32a 100644 --- a/baseten-performance-client/python_bindings/src/lib.rs +++ b/baseten-performance-client/python_bindings/src/lib.rs @@ -725,7 +725,7 @@ impl PerformanceClient { pyo3_async_runtimes::tokio::future_into_py(py, future) } - #[pyo3(signature = (url_path, payloads, max_concurrent_requests = DEFAULT_CONCURRENCY, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, hedge_delay = None))] + #[pyo3(signature = (url_path, payloads, max_concurrent_requests = DEFAULT_CONCURRENCY, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, hedge_delay = 
None, total_timeout_s = None))] fn batch_post( &self, py: Python, @@ -734,6 +734,7 @@ impl PerformanceClient { max_concurrent_requests: usize, timeout_s: f64, hedge_delay: Option, + total_timeout_s: Option, ) -> PyResult { if payloads.is_empty() { return Err(PyValueError::new_err("Payloads list cannot be empty")); @@ -765,6 +766,7 @@ impl PerformanceClient { max_concurrent_requests, timeout_s, hedge_delay, + total_timeout_s, ) .await; let _ = tx.send(res); @@ -821,7 +823,7 @@ impl PerformanceClient { }) } - #[pyo3(name = "async_batch_post", signature = (url_path, payloads, max_concurrent_requests = DEFAULT_CONCURRENCY, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, hedge_delay = None))] + #[pyo3(name = "async_batch_post", signature = (url_path, payloads, max_concurrent_requests = DEFAULT_CONCURRENCY, timeout_s = DEFAULT_REQUEST_TIMEOUT_S, hedge_delay = None, total_timeout_s = None))] fn async_batch_post<'py>( &self, py: Python<'py>, @@ -830,6 +832,7 @@ impl PerformanceClient { max_concurrent_requests: usize, timeout_s: f64, hedge_delay: Option, + total_timeout_s: Option, ) -> PyResult> { if payloads.is_empty() { return Err(PyValueError::new_err("Payloads list cannot be empty")); @@ -857,6 +860,7 @@ impl PerformanceClient { max_concurrent_requests, timeout_s, hedge_delay, + total_timeout_s, ) .await .map_err(Self::convert_core_error_to_py_err)?; From dc34ee6779dd9557e7f6227de8b1adc730cd26e7 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Wed, 19 Nov 2025 14:14:16 -0800 Subject: [PATCH 09/13] fix trailing whitespaces --- baseten-performance-client/core/src/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/baseten-performance-client/core/src/client.rs b/baseten-performance-client/core/src/client.rs index e6c63a954..cb880e843 100644 --- a/baseten-performance-client/core/src/client.rs +++ b/baseten-performance-client/core/src/client.rs @@ -595,7 +595,7 @@ impl PerformanceClientCore { // Validate parameters internally (using batch_size of 128 for 
validation) let (validated_concurrency, request_timeout_duration) = self.validate_request_parameters(max_concurrent_requests, 128, timeout_s)?; - + // Validate total_timeout_s if provided if let Some(total_timeout) = total_timeout_s { if !(MIN_TOTAL_TIMEOUT_S..=MAX_TOTAL_TIMEOUT_S).contains(&total_timeout) { @@ -611,7 +611,7 @@ impl PerformanceClientCore { ))); } } - + let semaphore = Arc::new(Semaphore::new(validated_concurrency)); let cancel_token = Arc::new(AtomicBool::new(false)); let total_payloads = payloads_json.len(); From dd929434ec1b704a6b7d7976c34b0beb33f1f9fa Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Wed, 19 Nov 2025 14:54:08 -0800 Subject: [PATCH 10/13] Bump baseten-performance-client Python to v0.0.12-rc.0 (PyPI only) --- baseten-performance-client/core/Cargo.toml | 2 +- baseten-performance-client/python_bindings/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/baseten-performance-client/core/Cargo.toml b/baseten-performance-client/core/Cargo.toml index 5fceaa2f9..47696e40b 100644 --- a/baseten-performance-client/core/Cargo.toml +++ b/baseten-performance-client/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "baseten_performance_client_core" -version = "0.0.12-dev.1" +version = "0.0.12-rc.0" edition = "2021" description = "High performance HTTP client for Baseten.co and other APIs" license = "MIT" diff --git a/baseten-performance-client/python_bindings/Cargo.toml b/baseten-performance-client/python_bindings/Cargo.toml index 042bf4807..5b6faae13 100644 --- a/baseten-performance-client/python_bindings/Cargo.toml +++ b/baseten-performance-client/python_bindings/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "baseten_performance_client" -version = "0.0.12-dev.1" +version = "0.0.12-rc.0" edition = "2021" [dependencies] From ad7fb796cdc3303a6ac8ce4c389501e0b0bb8d30 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Thu, 20 Nov 2025 09:28:16 -0800 Subject: [PATCH 11/13] Update .pyi file to match new timeout and retry 
parameters --- baseten-performance-client/Cargo.lock | 4 +- .../baseten_performance_client.pyi | 57 ++++++++++++++----- 2 files changed, 44 insertions(+), 17 deletions(-) diff --git a/baseten-performance-client/Cargo.lock b/baseten-performance-client/Cargo.lock index 2d7566691..47c8437c8 100644 --- a/baseten-performance-client/Cargo.lock +++ b/baseten-performance-client/Cargo.lock @@ -76,7 +76,7 @@ dependencies = [ [[package]] name = "baseten_performance_client" -version = "0.0.12-dev.1" +version = "0.0.12-rc.0" dependencies = [ "baseten_performance_client_core", "futures", @@ -93,7 +93,7 @@ dependencies = [ [[package]] name = "baseten_performance_client_core" -version = "0.0.12-dev.1" +version = "0.0.12-rc.0" dependencies = [ "futures", "once_cell", diff --git a/baseten-performance-client/python_bindings/baseten_performance_client.pyi b/baseten-performance-client/python_bindings/baseten_performance_client.pyi index ab1d31fc1..39586a8c8 100644 --- a/baseten-performance-client/python_bindings/baseten_performance_client.pyi +++ b/baseten-performance-client/python_bindings/baseten_performance_client.pyi @@ -278,9 +278,11 @@ class PerformanceClient: user: typing.Optional[builtins.str] = None, max_concurrent_requests: builtins.int = 32, # DEFAULT_CONCURRENCY batch_size: builtins.int = 16, # DEFAULT_BATCH_SIZE - timeout_s: builtins.float = 3600.0, # DEFAULT_REQUEST_TIMEOUT_S + timeout_s: typing.Optional[builtins.float] = None, max_chars_per_request: typing.Optional[builtins.int] = None, hedge_delay: typing.Optional[builtins.float] = None, + total_timeout_s: typing.Optional[builtins.float] = None, + max_retries: typing.Optional[builtins.int] = None, ) -> OpenAIEmbeddingsResponse: """ Sends a list of strings to the embedding endpoint to generate embeddings. @@ -293,9 +295,11 @@ class PerformanceClient: user: Optional user identifier. max_concurrent_requests: Maximum parallel requests. batch_size: Number of texts per batch. - timeout_s: Total timeout in seconds. 
+ timeout_s: Optional per-request timeout in seconds. max_chars_per_request: Optional character-based batching limit. hedge_delay: Optional request hedging delay in seconds. + total_timeout_s: Optional overall operation timeout in seconds. + max_retries: Optional maximum number of retries (0 to disable retries). Returns: An OpenAIEmbeddingsResponse object. @@ -326,9 +330,11 @@ class PerformanceClient: truncation_direction: builtins.str = "Right", max_concurrent_requests: builtins.int = 32, # DEFAULT_CONCURRENCY batch_size: builtins.int = 16, # DEFAULT_BATCH_SIZE - timeout_s: builtins.float = 3600.0, # DEFAULT_REQUEST_TIMEOUT_S + timeout_s: typing.Optional[builtins.float] = None, max_chars_per_request: typing.Optional[builtins.int] = None, hedge_delay: typing.Optional[builtins.float] = None, + total_timeout_s: typing.Optional[builtins.float] = None, + max_retries: typing.Optional[builtins.int] = None, ) -> RerankResponse: """ Reranks a set of texts based on the provided query. @@ -342,9 +348,11 @@ class PerformanceClient: truncation_direction: Direction for truncation ('Right' by default). max_concurrent_requests: Maximum parallel requests. batch_size: Batch size for each request. - timeout_s: Overall timeout in seconds. + timeout_s: Optional per-request timeout in seconds. max_chars_per_request: Optional character-based batching limit. hedge_delay: Optional request hedging delay in seconds. + total_timeout_s: Optional overall operation timeout in seconds. + max_retries: Optional maximum number of retries (0 to disable retries). Returns: A RerankResponse object. 
@@ -368,9 +376,11 @@ class PerformanceClient: truncation_direction: builtins.str = "Right", max_concurrent_requests: builtins.int = 32, # DEFAULT_CONCURRENCY batch_size: builtins.int = 16, # DEFAULT_BATCH_SIZE - timeout_s: builtins.float = 3600.0, # DEFAULT_REQUEST_TIMEOUT_S + timeout_s: typing.Optional[builtins.float] = None, max_chars_per_request: typing.Optional[builtins.int] = None, hedge_delay: typing.Optional[builtins.float] = None, + total_timeout_s: typing.Optional[builtins.float] = None, + max_retries: typing.Optional[builtins.int] = None, ) -> ClassificationResponse: """ Classifies each input text. @@ -382,9 +392,11 @@ class PerformanceClient: truncation_direction: Truncation direction ('Right' by default). max_concurrent_requests: Maximum parallel requests. batch_size: Batch size for each request. - timeout_s: Overall timeout in seconds. + timeout_s: Optional per-request timeout in seconds. max_chars_per_request: Optional character-based batching limit. hedge_delay: Optional request hedging delay in seconds. + total_timeout_s: Optional overall operation timeout in seconds. + max_retries: Optional maximum number of retries (0 to disable retries). Returns: A ClassificationResponse object. @@ -408,6 +420,7 @@ class PerformanceClient: max_concurrent_requests: builtins.int = 32, # DEFAULT_CONCURRENCY timeout_s: builtins.float = 3600.0, # DEFAULT_REQUEST_TIMEOUT_S hedge_delay: typing.Optional[builtins.float] = None, + total_timeout_s: typing.Optional[builtins.float] = None, ) -> BatchPostResponse: """ Sends a list of generic JSON payloads to a specified URL path concurrently. @@ -420,9 +433,9 @@ class PerformanceClient: payloads: A list of Python objects that are JSON-serializable. Each object will be the body of a POST request. max_concurrent_requests: Maximum number of parallel requests. - timeout_s: Total timeout in seconds for the entire batch operation, - also used as the timeout for each individual request. 
+ timeout_s: Per-request timeout in seconds (defaults to 3600.0). hedge_delay: Optional request hedging delay in seconds. + total_timeout_s: Optional overall operation timeout in seconds. Returns: A BatchPostResponse object containing the list of responses, @@ -454,9 +467,11 @@ class PerformanceClient: user: typing.Optional[builtins.str] = None, max_concurrent_requests: builtins.int = 32, batch_size: builtins.int = 16, - timeout_s: builtins.float = 3600.0, + timeout_s: typing.Optional[builtins.float] = None, max_chars_per_request: typing.Optional[builtins.int] = None, hedge_delay: typing.Optional[builtins.float] = None, + total_timeout_s: typing.Optional[builtins.float] = None, + max_retries: typing.Optional[builtins.int] = None, ) -> OpenAIEmbeddingsResponse: """ Asynchronously sends a list of texts to the embedding endpoint to generate embeddings. @@ -469,9 +484,11 @@ class PerformanceClient: user: Optional user identifier. max_concurrent_requests: Maximum parallel requests. batch_size: Number of texts per batch. - timeout_s: Total timeout in seconds. + timeout_s: Optional per-request timeout in seconds. max_chars_per_request: Optional character-based batching limit. hedge_delay: Optional request hedging delay in seconds. + total_timeout_s: Optional overall operation timeout in seconds. + max_retries: Optional maximum number of retries (0 to disable retries). Returns: An awaitable OpenAIEmbeddingsResponse object. 
@@ -496,9 +513,11 @@ class PerformanceClient: truncation_direction: builtins.str = "Right", max_concurrent_requests: builtins.int = 32, batch_size: builtins.int = 16, - timeout_s: builtins.float = 3600.0, + timeout_s: typing.Optional[builtins.float] = None, max_chars_per_request: typing.Optional[builtins.int] = None, hedge_delay: typing.Optional[builtins.float] = None, + total_timeout_s: typing.Optional[builtins.float] = None, + max_retries: typing.Optional[builtins.int] = None, ) -> RerankResponse: """ Asynchronously reranks a set of texts based on the provided query. @@ -512,9 +531,11 @@ class PerformanceClient: truncation_direction: Direction for truncation ('Right' by default). max_concurrent_requests: Maximum parallel requests. batch_size: Batch size for each request. - timeout_s: Overall timeout in seconds. + timeout_s: Optional per-request timeout in seconds. max_chars_per_request: Optional character-based batching limit. hedge_delay: Optional request hedging delay in seconds. + total_timeout_s: Optional overall operation timeout in seconds. + max_retries: Optional maximum number of retries (0 to disable retries). Returns: An awaitable RerankResponse object. @@ -538,9 +559,11 @@ class PerformanceClient: truncation_direction: builtins.str = "Right", max_concurrent_requests: builtins.int = 32, batch_size: builtins.int = 16, - timeout_s: builtins.float = 3600.0, + timeout_s: typing.Optional[builtins.float] = None, max_chars_per_request: typing.Optional[builtins.int] = None, hedge_delay: typing.Optional[builtins.float] = None, + total_timeout_s: typing.Optional[builtins.float] = None, + max_retries: typing.Optional[builtins.int] = None, ) -> ClassificationResponse: """ Asynchronously classifies each input text. @@ -552,9 +575,11 @@ class PerformanceClient: truncation_direction: Truncation direction ('Right' by default). max_concurrent_requests: Maximum parallel requests. batch_size: Batch size for each request. - timeout_s: Overall timeout in seconds. 
+ timeout_s: Optional per-request timeout in seconds. max_chars_per_request: Optional character-based batching limit. hedge_delay: Optional request hedging delay in seconds. + total_timeout_s: Optional overall operation timeout in seconds. + max_retries: Optional maximum number of retries (0 to disable retries). Returns: An awaitable ClassificationResponse object. @@ -578,6 +603,7 @@ class PerformanceClient: max_concurrent_requests: builtins.int = 32, timeout_s: builtins.float = 3600.0, hedge_delay: typing.Optional[builtins.float] = None, + total_timeout_s: typing.Optional[builtins.float] = None, ) -> BatchPostResponse: """ Asynchronously sends a list of generic JSON payloads to a specified URL path concurrently. @@ -586,8 +612,9 @@ class PerformanceClient: url_path: The specific API path to post to (e.g., "/v1/custom_endpoint"). payloads: A list of Python objects that are JSON-serializable. max_concurrent_requests: Maximum number of parallel requests. - timeout_s: Total timeout in seconds for the batch operation. + timeout_s: Per-request timeout in seconds (defaults to 3600.0). hedge_delay: Optional request hedging delay in seconds. + total_timeout_s: Optional overall operation timeout in seconds. Returns: An awaitable BatchPostResponse object. 
From fc732bfb9ff94635ddfe687e623a9a27c9e76ce1 Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Thu, 20 Nov 2025 09:56:55 -0800 Subject: [PATCH 12/13] Bump version to 0.0.12-rc.1 (includes updated .pyi file) --- baseten-performance-client/core/Cargo.toml | 2 +- baseten-performance-client/python_bindings/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/baseten-performance-client/core/Cargo.toml b/baseten-performance-client/core/Cargo.toml index 47696e40b..eb63b671d 100644 --- a/baseten-performance-client/core/Cargo.toml +++ b/baseten-performance-client/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "baseten_performance_client_core" -version = "0.0.12-rc.0" +version = "0.0.12-rc.1" edition = "2021" description = "High performance HTTP client for Baseten.co and other APIs" license = "MIT" diff --git a/baseten-performance-client/python_bindings/Cargo.toml b/baseten-performance-client/python_bindings/Cargo.toml index 5b6faae13..e5a3ffbb8 100644 --- a/baseten-performance-client/python_bindings/Cargo.toml +++ b/baseten-performance-client/python_bindings/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "baseten_performance_client" -version = "0.0.12-rc.0" +version = "0.0.12-rc.1" edition = "2021" [dependencies] From 3bd2fd2b3ef29583e4064c030c68cff669e9a1bf Mon Sep 17 00:00:00 2001 From: Jojo Ortiz Date: Thu, 20 Nov 2025 10:46:46 -0800 Subject: [PATCH 13/13] lower min timeout and hedge delay to 0.1 --- baseten-performance-client/Cargo.lock | 4 ++-- baseten-performance-client/core/src/constants.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/baseten-performance-client/Cargo.lock b/baseten-performance-client/Cargo.lock index 47c8437c8..246058ffc 100644 --- a/baseten-performance-client/Cargo.lock +++ b/baseten-performance-client/Cargo.lock @@ -76,7 +76,7 @@ dependencies = [ [[package]] name = "baseten_performance_client" -version = "0.0.12-rc.0" +version = "0.0.12-rc.1" dependencies = [ "baseten_performance_client_core", 
"futures", @@ -93,7 +93,7 @@ dependencies = [ [[package]] name = "baseten_performance_client_core" -version = "0.0.12-rc.0" +version = "0.0.12-rc.1" dependencies = [ "futures", "once_cell", diff --git a/baseten-performance-client/core/src/constants.rs b/baseten-performance-client/core/src/constants.rs index 3f7a3d3f1..efc0a712b 100644 --- a/baseten-performance-client/core/src/constants.rs +++ b/baseten-performance-client/core/src/constants.rs @@ -2,11 +2,11 @@ use std::time::Duration; // Request timeout constants pub const DEFAULT_REQUEST_TIMEOUT_S: f64 = 3600.0; -pub const MIN_REQUEST_TIMEOUT_S: f64 = 0.5; +pub const MIN_REQUEST_TIMEOUT_S: f64 = 0.1; pub const MAX_REQUEST_TIMEOUT_S: f64 = 3600.0; // Total timeout constants -pub const MIN_TOTAL_TIMEOUT_S: f64 = 0.5; +pub const MIN_TOTAL_TIMEOUT_S: f64 = 0.1; pub const MAX_TOTAL_TIMEOUT_S: f64 = 3600.0; // Concurrency constants @@ -18,7 +18,7 @@ pub const MIN_CHARACTERS_PER_REQUEST: usize = 50; pub const MAX_CHARACTERS_PER_REQUEST: usize = 256000; // hedging settings: -pub const MIN_HEDGE_DELAY_S: f64 = 0.2; +pub const MIN_HEDGE_DELAY_S: f64 = 0.1; pub const HEDGE_BUDGET_PERCENTAGE: f64 = 0.10; // Batch size constants