From 902d2fc043db6b8adc2c20bace8b2fbbfca9c20e Mon Sep 17 00:00:00 2001 From: jlizen Date: Thu, 5 Feb 2026 06:16:06 +0000 Subject: [PATCH 1/3] refactor(lambda-managed-instances): warn on `run()` with `AWS_LAMBDA_MAX_CONCURRENCY, rename feature experimental-concurrency -> concurrency->tokio --- examples/basic-lambda-concurrent/Cargo.toml | 2 +- lambda-http/Cargo.toml | 2 +- lambda-http/src/lib.rs | 43 +++++++---- lambda-http/src/streaming.rs | 32 ++++++--- lambda-runtime/Cargo.toml | 2 +- lambda-runtime/src/lib.rs | 28 ++++++-- lambda-runtime/src/runtime.rs | 79 ++++++++++++++------- 7 files changed, 130 insertions(+), 58 deletions(-) diff --git a/examples/basic-lambda-concurrent/Cargo.toml b/examples/basic-lambda-concurrent/Cargo.toml index b65220af..7c770aea 100644 --- a/examples/basic-lambda-concurrent/Cargo.toml +++ b/examples/basic-lambda-concurrent/Cargo.toml @@ -4,6 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] -lambda_runtime = { path = "../../lambda-runtime", features = ["experimental-concurrency"] } +lambda_runtime = { path = "../../lambda-runtime", features = ["concurrency-tokio"] } serde = "1.0.219" tokio = { version = "1", features = ["macros"] } diff --git a/lambda-http/Cargo.toml b/lambda-http/Cargo.toml index 6106cf4a..8f3f94a2 100644 --- a/lambda-http/Cargo.toml +++ b/lambda-http/Cargo.toml @@ -29,7 +29,7 @@ opentelemetry = ["lambda_runtime/opentelemetry"] # enables access to the OpenTel anyhow = ["lambda_runtime/anyhow"] # enables From for Diagnostic for anyhow error types, see README.md for more info eyre = ["lambda_runtime/eyre"] # enables From for Diagnostic for eyre error types, see README.md for more info miette = ["lambda_runtime/miette"] # enables From for Diagnostic for miette error types, see README.md for more info -experimental-concurrency = ["lambda_runtime/experimental-concurrency"] +concurrency-tokio = ["lambda_runtime/concurrency-tokio"] [dependencies] bytes = { workspace = true } diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 12a9cf69..bf0c2f9b 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -102,7 +102,7 @@ use std::{ }; mod streaming; -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] pub use streaming::run_with_streaming_response_concurrent; pub use streaming::{run_with_streaming_response, StreamAdapter}; @@ -211,11 +211,17 @@ where /// converting the result into a `LambdaResponse`. /// /// # Managed concurrency -/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because -/// it does not enable concurrent polling. If your handler can satisfy `Clone`, -/// prefer [`run_concurrent`] (requires the `experimental-concurrency` feature), +/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged. +/// If your handler can satisfy `Clone + Send + 'static`, +/// prefer [`run_concurrent`] (requires the `concurrency-tokio` feature), /// which honors managed concurrency and falls back to sequential behavior when /// unset. +/// +/// # Panics +/// +/// This function panics if required Lambda environment variables are missing +/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, +/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`). pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error> where S: Service, @@ -226,18 +232,29 @@ where lambda_runtime::run(Adapter::from(handler)).await } -/// Starts the Lambda Rust runtime in a mode that is compatible with -/// Lambda Managed Instances (concurrent invocations). +/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda +/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html). +/// +/// This takes care of transforming the LambdaEvent into a [`Request`] and then +/// converting the result into a `LambdaResponse`. /// -/// Requires the `experimental-concurrency` feature. +/// # Managed concurrency /// /// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this -/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own -/// `/next` polling loop. When the environment variable is unset or `<= 1`, -/// it falls back to the same sequential behavior as [`run`], so the same -/// handler can run on both classic Lambda and Lambda Managed Instances. -#[cfg(feature = "experimental-concurrency")] -#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))] +/// function spawns multiple tokio worker tasks to handle concurrent invocations. +/// When the environment variable is unset or `<= 1`, it falls back to +/// sequential behavior, so the same handler can run on both classic Lambda +/// and Lambda Managed Instances. +/// +/// # Panics +/// +/// This function panics if: +/// - Called outside of a Tokio runtime with `AWS_LAMBDA_MAX_CONCURRENCY > 1` +/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`, +/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`, +/// `AWS_LAMBDA_RUNTIME_API`) +#[cfg(feature = "concurrency-tokio")] +#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))] pub async fn run_concurrent(handler: S) -> Result<(), Error> where S: Service + Clone + Send + 'static, diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index 141068b9..2c0a99ce 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -114,10 +114,10 @@ where /// Builds a streaming-aware Tower service from a `Service` that can be /// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint. -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] type EventToRequest = fn(LambdaEvent) -> Request; -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] #[allow(clippy::type_complexity)] fn into_stream_service_cloneable( handler: S, @@ -178,10 +178,16 @@ fn event_to_request(req: LambdaEvent) -> Request { /// # Managed concurrency /// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because /// it does not enable concurrent polling. Use [`run_with_streaming_response_concurrent`] -/// (requires the `experimental-concurrency` feature) instead. +/// (requires the `concurrency-tokio` feature) instead. /// /// [AWS docs for response streaming]: /// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html +/// +/// # Panics +/// +/// This function panics if required Lambda environment variables are missing +/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, +/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`). pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error> where S: Service, Error = E>, @@ -197,13 +203,21 @@ where /// Runs the Lambda runtime with a handler that returns **streaming** HTTP /// responses, in a mode that is compatible with Lambda Managed Instances. /// -/// Requires the `experimental-concurrency` feature. +/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this +/// spawns multiple tokio worker tasks to handle concurrent invocations. When the +/// environment variable is unset or `<= 1`, it falls back to sequential +/// behavior, so the same handler can run on both classic Lambda and Lambda +/// Managed Instances. +/// +/// # Panics /// -/// This uses a cloneable, boxed service internally so it can be driven by the -/// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`, -/// it falls back to the same sequential behavior as [`run_with_streaming_response`]. -#[cfg(feature = "experimental-concurrency")] -#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))] +/// This function panics if: +/// - Called outside of a Tokio runtime +/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`, +/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`, +/// `AWS_LAMBDA_RUNTIME_API`) +#[cfg(feature = "concurrency-tokio")] +#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))] pub async fn run_with_streaming_response_concurrent(handler: S) -> Result<(), Error> where S: Service, Error = E> + Clone + Send + 'static, diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 9b02931e..20084894 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -25,7 +25,7 @@ miette = ["dep:miette"] # enables From for Diagnostic for miette error types, # as well as default features # https://github.com/aws/aws-lambda-rust-runtime/issues/984 graceful-shutdown = ["tokio/rt", "tokio/signal", "dep:lambda-extension"] -experimental-concurrency = [] +concurrency-tokio = [] [dependencies] anyhow = { version = "1.0.86", optional = true } diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index f62240f4..14805087 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -95,9 +95,9 @@ where /// [Runtime] type directly. /// /// # Managed concurrency -/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because -/// it does not enable concurrent polling. If your handler can satisfy `Clone`, -/// prefer [`run_concurrent`] (requires the `experimental-concurrency` feature), +/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged. +/// If your handler can satisfy `Clone + Send + 'static`, +/// prefer [`run_concurrent`] (requires the `concurrency-tokio` feature), /// which honors managed concurrency and falls back to sequential behavior when /// unset. /// @@ -117,6 +117,12 @@ where /// Ok(event.payload) /// } /// ``` +/// +/// # Panics +/// +/// This function panics if required Lambda environment variables are missing +/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, +/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`). pub async fn run(handler: F) -> Result<(), Error> where F: Service, Response = R>, @@ -136,10 +142,10 @@ where /// Starts the Lambda Rust runtime in a mode that is compatible with /// Lambda Managed Instances (concurrent invocations). /// -/// Requires the `experimental-concurrency` feature. +/// Requires the `concurrency-tokio` feature. /// /// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this -/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own +/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` tokio worker tasks, each running its own /// `/next` polling loop. When the environment variable is unset or `<= 1`, it /// falls back to the same sequential behavior as [`run`], so the same handler /// can run on both classic Lambda and Lambda Managed Instances. @@ -163,8 +169,16 @@ where /// Ok(event.payload) /// } /// ``` -#[cfg(feature = "experimental-concurrency")] -#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))] +/// +/// # Panics +/// +/// This function panics if: +/// - Called outside of a Tokio runtime +/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`, +/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`, +/// `AWS_LAMBDA_RUNTIME_API`) +#[cfg(feature = "concurrency-tokio")] +#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))] pub async fn run_concurrent(handler: F) -> Result<(), Error> where F: Service, Response = R> + Clone + Send + 'static, diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index e9a6bb27..191b55ae 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -4,18 +4,18 @@ use crate::{ types::{invoke_request_id, IntoFunctionResponse, LambdaEvent}, Config, Context, Diagnostic, }; -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] use futures::stream::FuturesUnordered; use http_body_util::BodyExt; use lambda_runtime_api_client::{BoxError, Client as ApiClient}; use serde::{Deserialize, Serialize}; -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] use std::fmt; -use std::{env, fmt::Debug, future::Future, io, sync::Arc}; +use std::{env, fmt::Debug, future::Future, sync::Arc}; use tokio_stream::{Stream, StreamExt}; use tower::{Layer, Service, ServiceExt}; use tracing::trace; -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] use tracing::{debug, error, info_span, warn, Instrument}; /* ----------------------------------------- INVOCATION ---------------------------------------- */ @@ -96,6 +96,13 @@ where /// Note that manually creating a [Runtime] does not add tracing to the executed handler /// as is done by [super::run]. If you want to add the default tracing functionality, call /// [Runtime::layer] with a [super::layers::TracingLayer]. + /// + /// + /// # Panics + /// + /// This function panics if required Lambda environment variables are missing + /// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, + /// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`). pub fn new(handler: F) -> Self { trace!("Loading config from env"); let config = Arc::new(Config::from_env()); @@ -154,19 +161,30 @@ impl Runtime { } } -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] impl Runtime where S: Service + Clone + Send + 'static, S::Future: Send, { - /// Start the runtime in concurrent mode when configured for Lambda managed-concurrency. + /// Start the runtime and begin polling for events on the Lambda Runtime API, + /// in a mode that is compatible with Lambda Managed Instances. + /// + /// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this + /// spawns multiple tokio worker tasks to handle concurrent invocations. When the + /// environment variable is unset or `<= 1`, it falls back to sequential + /// behavior, so the same handler can run on both classic Lambda and Lambda + /// Managed Instances. + /// + /// # Panics /// - /// If `AWS_LAMBDA_MAX_CONCURRENCY` is not set or is `<= 1`, this falls back to the - /// sequential `run_with_incoming` loop so that the same handler can run on both - /// classic Lambda and Lambda Managed Instances. - #[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))] + /// This function panics if called outside of a Tokio runtime. + #[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))] pub async fn run_concurrent(self) -> Result<(), BoxError> { + if tokio::runtime::Handle::try_current().is_err() { + panic!("`run_concurrent` must be called from within a Tokio runtime"); + } + if self.concurrency_limit > 1 { trace!("Concurrent mode: _X_AMZN_TRACE_ID is not set; use context.xray_trace_id"); Self::run_concurrent_inner(self.service, self.config, self.client, self.concurrency_limit).await @@ -259,20 +277,20 @@ where } } -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] #[derive(Debug)] enum WorkerError { CleanExit(tokio::task::Id), Failure(tokio::task::Id, BoxError), } -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] #[derive(Debug)] struct ConcurrentWorkerErrors { errors: Vec, } -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] #[derive(Serialize)] struct ConcurrentWorkerErrorsPayload<'a> { message: &'a str, @@ -282,14 +300,14 @@ struct ConcurrentWorkerErrorsPayload<'a> { failures: Vec, } -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] #[derive(Serialize)] struct WorkerFailurePayload { id: String, err: String, } -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] impl fmt::Display for ConcurrentWorkerErrors { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut clean = Vec::new(); @@ -326,7 +344,7 @@ impl fmt::Display for ConcurrentWorkerErrors { } } -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] impl std::error::Error for ConcurrentWorkerErrors {} impl Runtime @@ -335,14 +353,23 @@ where { /// Start the runtime and begin polling for events on the Lambda Runtime API. /// - /// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this returns an error because it does not enable - /// concurrent polling. Enable the `experimental-concurrency` feature and use - /// [`Runtime::run_concurrent`] instead. + /// The runtime will process requests sequentially. + /// + /// # Managed concurrency + /// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged. + /// If your handler can satisfy `Clone + Send + 'static`, + /// prefer [`Runtime::run_concurrent`] (requires the `concurrency-tokio` feature), + /// which honors managed concurrency and falls back to sequential behavior when + /// unset. pub async fn run(self) -> Result<(), BoxError> { if let Some(raw) = concurrency_env_value() { - return Err(Box::new(io::Error::other(format!( - "AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but Runtime::run does not support concurrent polling; enable the experimental-concurrency feature and use Runtime::run_concurrent instead" - )))); + if tracing::dispatcher::has_been_set() { + tracing::warn!( + "AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially", + ); + } else { + eprintln!("AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially"); + } } let incoming = incoming(&self.client); Self::run_with_incoming(self.service, self.config, incoming).await @@ -412,7 +439,7 @@ fn incoming( } /// Creates a future that polls the `/next` endpoint. -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] async fn next_event_future(client: &ApiClient) -> Result, BoxError> { let req = NextEventRequest.into_req()?; client.call(req).await @@ -429,7 +456,7 @@ fn concurrency_env_value() -> Option { env::var("AWS_LAMBDA_MAX_CONCURRENCY").ok() } -#[cfg(feature = "experimental-concurrency")] +#[cfg(feature = "concurrency-tokio")] async fn concurrent_worker_loop(mut service: S, config: Arc, client: Arc) -> Result<(), BoxError> where S: Service, @@ -760,7 +787,7 @@ mod endpoint_tests { .await } - #[cfg(feature = "experimental-concurrency")] + #[cfg(feature = "concurrency-tokio")] #[tokio::test] async fn concurrent_worker_crash_does_not_stop_other_workers() -> Result<(), Error> { let next_calls = Arc::new(AtomicUsize::new(0)); @@ -910,7 +937,7 @@ mod endpoint_tests { } #[tokio::test] - #[cfg(feature = "experimental-concurrency")] + #[cfg(feature = "concurrency-tokio")] async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> { use std::collections::HashSet; use tracing::info; From cdb65c2a4509aee6b626080831945d0aa221846f Mon Sep 17 00:00:00 2001 From: jlizen Date: Thu, 5 Feb 2026 04:33:53 +0000 Subject: [PATCH 2/3] chore: add tokio_unstable to known cfgs to avoid linter warns --- lambda-runtime/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 20084894..7d0b570b 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -14,6 +14,9 @@ categories = ["web-programming::http-server"] keywords = ["AWS", "Lambda", "API"] readme = "../README.md" +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] } + [features] default = ["tracing"] tracing = ["lambda_runtime_api_client/tracing"] # enables access to the Tracing utilities From 029dd71e36fabfac4f9261a1478d199578484e58 Mon Sep 17 00:00:00 2001 From: jlizen Date: Thu, 5 Feb 2026 04:36:46 +0000 Subject: [PATCH 3/3] chore(ci): only run integration tests on aws/aws-lambda-runtime-rust since they require creds --- .github/workflows/run-integration-test.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/run-integration-test.yml b/.github/workflows/run-integration-test.yml index ad85da12..4bebf825 100644 --- a/.github/workflows/run-integration-test.yml +++ b/.github/workflows/run-integration-test.yml @@ -10,6 +10,8 @@ on: jobs: run-integration-tests: + # Only run on the main repo, not forks + if: ${{ github.repository_owner == 'aws' }} runs-on: ubuntu-latest steps: - name: install Cargo Lambda