Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/run-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ on:

jobs:
run-integration-tests:
# Only run on the main repo, not forks
if: ${{ github.repository_owner == 'aws' }}
runs-on: ubuntu-latest
steps:
- name: install Cargo Lambda
Expand Down
2 changes: 1 addition & 1 deletion examples/basic-lambda-concurrent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ version = "0.1.0"
edition = "2021"

[dependencies]
lambda_runtime = { path = "../../lambda-runtime", features = ["experimental-concurrency"] }
lambda_runtime = { path = "../../lambda-runtime", features = ["concurrency-tokio"] }
serde = "1.0.219"
tokio = { version = "1", features = ["macros"] }
2 changes: 1 addition & 1 deletion lambda-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ opentelemetry = ["lambda_runtime/opentelemetry"] # enables access to the OpenTel
anyhow = ["lambda_runtime/anyhow"] # enables From<T> for Diagnostic for anyhow error types, see README.md for more info
eyre = ["lambda_runtime/eyre"] # enables From<T> for Diagnostic for eyre error types, see README.md for more info
miette = ["lambda_runtime/miette"] # enables From<T> for Diagnostic for miette error types, see README.md for more info
experimental-concurrency = ["lambda_runtime/experimental-concurrency"]
concurrency-tokio = ["lambda_runtime/concurrency-tokio"]

[dependencies]
bytes = { workspace = true }
Expand Down
43 changes: 30 additions & 13 deletions lambda-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use std::{
};

mod streaming;
#[cfg(feature = "experimental-concurrency")]
#[cfg(feature = "concurrency-tokio")]
pub use streaming::run_with_streaming_response_concurrent;
pub use streaming::{run_with_streaming_response, StreamAdapter};

Expand Down Expand Up @@ -211,11 +211,17 @@ where
/// converting the result into a `LambdaResponse`.
///
/// # Managed concurrency
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
/// prefer [`run_concurrent`] (requires the `experimental-concurrency` feature),
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged.
/// If your handler can satisfy `Clone + Send + 'static`,
/// prefer [`run_concurrent`] (requires the `concurrency-tokio` feature),
/// which honors managed concurrency and falls back to sequential behavior when
/// unset.
///
/// # Panics
///
/// This function panics if required Lambda environment variables are missing
/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`,
/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`).
pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = R, Error = E>,
Expand All @@ -226,18 +232,29 @@ where
lambda_runtime::run(Adapter::from(handler)).await
}

/// Starts the Lambda Rust runtime in a mode that is compatible with
/// Lambda Managed Instances (concurrent invocations).
/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
///
/// This takes care of transforming the LambdaEvent into a [`Request`] and then
/// converting the result into a `LambdaResponse`.
///
/// Requires the `experimental-concurrency` feature.
/// # Managed concurrency
///
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
/// `/next` polling loop. When the environment variable is unset or `<= 1`,
/// it falls back to the same sequential behavior as [`run`], so the same
/// handler can run on both classic Lambda and Lambda Managed Instances.
#[cfg(feature = "experimental-concurrency")]
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
/// function spawns multiple tokio worker tasks to handle concurrent invocations.
/// When the environment variable is unset or `<= 1`, it falls back to
/// sequential behavior, so the same handler can run on both classic Lambda
/// and Lambda Managed Instances.
///
/// # Panics
///
/// This function panics if:
/// - Called outside of a Tokio runtime with `AWS_LAMBDA_MAX_CONCURRENCY > 1`
/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`,
/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`,
/// `AWS_LAMBDA_RUNTIME_API`)
#[cfg(feature = "concurrency-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
Expand Down
32 changes: 23 additions & 9 deletions lambda-http/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ where

/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
#[cfg(feature = "experimental-concurrency")]
#[cfg(feature = "concurrency-tokio")]
type EventToRequest = fn(LambdaEvent<LambdaRequest>) -> Request;

#[cfg(feature = "experimental-concurrency")]
#[cfg(feature = "concurrency-tokio")]
#[allow(clippy::type_complexity)]
fn into_stream_service_cloneable<S, B, E>(
handler: S,
Expand Down Expand Up @@ -178,10 +178,16 @@ fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
/// # Managed concurrency
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
/// it does not enable concurrent polling. Use [`run_with_streaming_response_concurrent`]
/// (requires the `experimental-concurrency` feature) instead.
/// (requires the `concurrency-tokio` feature) instead.
///
/// [AWS docs for response streaming]:
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
///
/// # Panics
///
/// This function panics if required Lambda environment variables are missing
/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`,
/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`).
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = Response<B>, Error = E>,
Expand All @@ -197,13 +203,21 @@ where
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses, in a mode that is compatible with Lambda Managed Instances.
///
/// Requires the `experimental-concurrency` feature.
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
/// spawns multiple tokio worker tasks to handle concurrent invocations. When the
/// environment variable is unset or `<= 1`, it falls back to sequential
/// behavior, so the same handler can run on both classic Lambda and Lambda
/// Managed Instances.
///
/// # Panics
///
/// This uses a cloneable, boxed service internally so it can be driven by the
/// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`,
/// it falls back to the same sequential behavior as [`run_with_streaming_response`].
#[cfg(feature = "experimental-concurrency")]
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
/// This function panics if:
/// - Called outside of a Tokio runtime
/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`,
/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`,
/// `AWS_LAMBDA_RUNTIME_API`)
#[cfg(feature = "concurrency-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
pub async fn run_with_streaming_response_concurrent<S, B, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
Expand Down
5 changes: 4 additions & 1 deletion lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ categories = ["web-programming::http-server"]
keywords = ["AWS", "Lambda", "API"]
readme = "../README.md"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[features]
default = ["tracing"]
tracing = ["lambda_runtime_api_client/tracing"] # enables access to the Tracing utilities
Expand All @@ -25,7 +28,7 @@ miette = ["dep:miette"] # enables From<T> for Diagnostic for miette error types,
# as well as default features
# https://github.com/aws/aws-lambda-rust-runtime/issues/984
graceful-shutdown = ["tokio/rt", "tokio/signal", "dep:lambda-extension"]
experimental-concurrency = []
concurrency-tokio = []

[dependencies]
anyhow = { version = "1.0.86", optional = true }
Expand Down
28 changes: 21 additions & 7 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ where
/// [Runtime] type directly.
///
/// # Managed concurrency
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
/// prefer [`run_concurrent`] (requires the `experimental-concurrency` feature),
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged.
/// If your handler can satisfy `Clone + Send + 'static`,
/// prefer [`run_concurrent`] (requires the `concurrency-tokio` feature),
/// which honors managed concurrency and falls back to sequential behavior when
/// unset.
///
Expand All @@ -117,6 +117,12 @@ where
/// Ok(event.payload)
/// }
/// ```
///
/// # Panics
///
/// This function panics if required Lambda environment variables are missing
/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`,
/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`).
pub async fn run<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
where
F: Service<LambdaEvent<A>, Response = R>,
Expand All @@ -136,10 +142,10 @@ where
/// Starts the Lambda Rust runtime in a mode that is compatible with
/// Lambda Managed Instances (concurrent invocations).
///
/// Requires the `experimental-concurrency` feature.
/// Requires the `concurrency-tokio` feature.
///
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` tokio worker tasks, each running its own
/// `/next` polling loop. When the environment variable is unset or `<= 1`, it
/// falls back to the same sequential behavior as [`run`], so the same handler
/// can run on both classic Lambda and Lambda Managed Instances.
Expand All @@ -163,8 +169,16 @@ where
/// Ok(event.payload)
/// }
/// ```
#[cfg(feature = "experimental-concurrency")]
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
///
/// # Panics
///
/// This function panics if:
/// - Called outside of a Tokio runtime
/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`,
/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`,
/// `AWS_LAMBDA_RUNTIME_API`)
#[cfg(feature = "concurrency-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
pub async fn run_concurrent<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
where
F: Service<LambdaEvent<A>, Response = R> + Clone + Send + 'static,
Expand Down
Loading