diff --git a/.github/workflows/nuget-package.yml b/.github/workflows/nuget-package.yml index c08c88fd..8ecfc2f3 100644 --- a/.github/workflows/nuget-package.yml +++ b/.github/workflows/nuget-package.yml @@ -171,6 +171,10 @@ jobs: src/Temporalio/bin/Release/*.snupkg src/Temporalio.Extensions.DiagnosticSource/bin/Release/*.nupkg src/Temporalio.Extensions.DiagnosticSource/bin/Release/*.snupkg + src/Temporalio.Extensions.Aws.Lambda/bin/Release/*.nupkg + src/Temporalio.Extensions.Aws.Lambda/bin/Release/*.snupkg + src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/bin/Release/*.nupkg + src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/bin/Release/*.snupkg src/Temporalio.Extensions.Hosting/bin/Release/*.nupkg src/Temporalio.Extensions.Hosting/bin/Release/*.snupkg src/Temporalio.Extensions.OpenTelemetry/bin/Release/*.nupkg diff --git a/Directory.Packages.props b/Directory.Packages.props index a57f3505..34ba9ffd 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -3,6 +3,7 @@ true + @@ -15,8 +16,11 @@ + + + diff --git a/README.md b/README.md index 5fa1d8e6..72c2362b 100644 --- a/README.md +++ b/README.md @@ -276,6 +276,26 @@ var client = await TemporalClient.ConnectAsync(new("my-namespace.a1b2c.tmprl.clo }); ``` +#### Client Configuration From Environment + +Client connection options can be loaded from a `temporal.toml` file and environment variables: + +```csharp +using Temporalio.Client; +using Temporalio.Common.EnvConfig; + +var client = await TemporalClient.ConnectAsync( + ClientEnvConfig.LoadClientConnectOptions()); +``` + +By default, the loader checks `TEMPORAL_CONFIG_FILE`; if unset, it looks for `temporal.toml` in the +user config directory under `temporalio`. The selected profile is `TEMPORAL_PROFILE`, or `default` +when unset. Environment variables such as `TEMPORAL_ADDRESS`, `TEMPORAL_NAMESPACE`, +`TEMPORAL_API_KEY`, TLS certificate/key settings, and `TEMPORAL_GRPC_META_*` override file values. + +Use `ClientEnvConfig.ProfileLoadOptions` to choose a profile, provide an explicit config source, or +disable file or environment loading. + #### Client Dependency Injection To create clients for use with dependency injection, see the diff --git a/Temporalio.sln b/Temporalio.sln index be9ec1f3..faaca396 100644 --- a/Temporalio.sln +++ b/Temporalio.sln @@ -17,8 +17,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Temporalio.Extensions.Hosti EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Temporalio.Extensions.DiagnosticSource", "src\Temporalio.Extensions.DiagnosticSource\Temporalio.Extensions.DiagnosticSource.csproj", "{CC7EA7CD-BBE7-448C-8A4B-F8B2D1E55990}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Temporalio.Extensions.Aws.Lambda", "src\Temporalio.Extensions.Aws.Lambda\Temporalio.Extensions.Aws.Lambda.csproj", "{B7CDF2C9-1D1D-446C-AF90-6C758D7DF19D}" +EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Temporalio.SimpleBench", "tests\Temporalio.SimpleBench\Temporalio.SimpleBench.csproj", "{2610AFAE-FD3A-4583-8CA5-4869E1347A3C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Temporalio.Extensions.Aws.Lambda.OpenTelemetry", "src\Temporalio.Extensions.Aws.Lambda.OpenTelemetry\Temporalio.Extensions.Aws.Lambda.OpenTelemetry.csproj", "{9A2C7274-7ED2-4C92-BE92-13887C3309B4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -48,10 +52,18 @@ Global {CC7EA7CD-BBE7-448C-8A4B-F8B2D1E55990}.Debug|Any CPU.Build.0 = Debug|Any CPU {CC7EA7CD-BBE7-448C-8A4B-F8B2D1E55990}.Release|Any CPU.ActiveCfg = Release|Any CPU {CC7EA7CD-BBE7-448C-8A4B-F8B2D1E55990}.Release|Any CPU.Build.0 = Release|Any CPU + {B7CDF2C9-1D1D-446C-AF90-6C758D7DF19D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B7CDF2C9-1D1D-446C-AF90-6C758D7DF19D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B7CDF2C9-1D1D-446C-AF90-6C758D7DF19D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B7CDF2C9-1D1D-446C-AF90-6C758D7DF19D}.Release|Any CPU.Build.0 = Release|Any CPU {2610AFAE-FD3A-4583-8CA5-4869E1347A3C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {2610AFAE-FD3A-4583-8CA5-4869E1347A3C}.Debug|Any CPU.Build.0 = Debug|Any CPU {2610AFAE-FD3A-4583-8CA5-4869E1347A3C}.Release|Any CPU.ActiveCfg = Release|Any CPU {2610AFAE-FD3A-4583-8CA5-4869E1347A3C}.Release|Any CPU.Build.0 = Release|Any CPU + {9A2C7274-7ED2-4C92-BE92-13887C3309B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9A2C7274-7ED2-4C92-BE92-13887C3309B4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9A2C7274-7ED2-4C92-BE92-13887C3309B4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9A2C7274-7ED2-4C92-BE92-13887C3309B4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {7AE1422A-0937-40D7-9A62-431DD0E2F6D5} = {758B61E2-9AB6-46BF-B53C-16BD140BF56B} @@ -59,6 +71,8 @@ Global {D4AC2E2B-1C24-491D-9175-874D448D30FE} = {758B61E2-9AB6-46BF-B53C-16BD140BF56B} {E8D1975A-5AF7-4375-BAD0-3C256DCB7F87} = {758B61E2-9AB6-46BF-B53C-16BD140BF56B} {CC7EA7CD-BBE7-448C-8A4B-F8B2D1E55990} = {758B61E2-9AB6-46BF-B53C-16BD140BF56B} + {B7CDF2C9-1D1D-446C-AF90-6C758D7DF19D} = {758B61E2-9AB6-46BF-B53C-16BD140BF56B} {2610AFAE-FD3A-4583-8CA5-4869E1347A3C} = {F2683DAA-F157-448E-96C8-DF7BB019886D} + {9A2C7274-7ED2-4C92-BE92-13887C3309B4} = {758B61E2-9AB6-46BF-B53C-16BD140BF56B} EndGlobalSection EndGlobal diff --git a/src/Temporalio.ApiDoc/api/index.md b/src/Temporalio.ApiDoc/api/index.md index 00aa778f..866d67c7 100644 --- a/src/Temporalio.ApiDoc/api/index.md +++ b/src/Temporalio.ApiDoc/api/index.md @@ -16,5 +16,7 @@ Commonly used namespaces: Extensions: * [Temporalio.Extensions.DiagnosticSource](/api/Temporalio.Extensions.DiagnosticSource.html) +* [Temporalio.Extensions.Aws.Lambda](/api/Temporalio.Extensions.Aws.Lambda.html) +* [Temporalio.Extensions.Aws.Lambda.OpenTelemetry](/api/Temporalio.Extensions.Aws.Lambda.OpenTelemetry.html) * [Temporalio.Extensions.Hosting](/api/Temporalio.Extensions.Hosting.html) -* [Temporalio.Extensions.OpenTelemetry](/api/Temporalio.Extensions.OpenTelemetry.html) \ No newline at end of file +* [Temporalio.Extensions.OpenTelemetry](/api/Temporalio.Extensions.OpenTelemetry.html) diff --git a/src/Temporalio.ApiDoc/docfx.json b/src/Temporalio.ApiDoc/docfx.json index 9adb184e..4606b23e 100644 --- a/src/Temporalio.ApiDoc/docfx.json +++ b/src/Temporalio.ApiDoc/docfx.json @@ -6,6 +6,8 @@ "files": [ "Temporalio/*.csproj", "Temporalio.Extensions.DiagnosticSource/*.csproj", + "Temporalio.Extensions.Aws.Lambda/*.csproj", + "Temporalio.Extensions.Aws.Lambda.OpenTelemetry/*.csproj", "Temporalio.Extensions.Hosting/*.csproj", "Temporalio.Extensions.OpenTelemetry/*.csproj" ], @@ -72,4 +74,4 @@ "keepFileLink": false, "disableGitFeatures": false } -} \ No newline at end of file +} diff --git a/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/LambdaWorkerOpenTelemetry.cs b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/LambdaWorkerOpenTelemetry.cs new file mode 100644 index 00000000..3db4c63c --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/LambdaWorkerOpenTelemetry.cs @@ -0,0 +1,204 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry; +using OpenTelemetry.Exporter; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; +using Temporalio.Client.Interceptors; +using Temporalio.Runtime; +using TemporalOpenTelemetry = Temporalio.Extensions.OpenTelemetry; + +namespace Temporalio.Extensions.Aws.Lambda.OpenTelemetry +{ + /// + /// OpenTelemetry helpers for Temporal workers running inside AWS Lambda. + /// + public static class LambdaWorkerOpenTelemetry + { + private const string DefaultCollectorEndpoint = "http://localhost:4317"; + private const string DefaultServiceName = "temporal-lambda-worker"; + private const string OTelExporterOtlpEndpointEnvironmentVariable = + "OTEL_EXPORTER_OTLP_ENDPOINT"; + + private const string OTelServiceNameEnvironmentVariable = "OTEL_SERVICE_NAME"; + private const string LambdaFunctionNameEnvironmentVariable = "AWS_LAMBDA_FUNCTION_NAME"; + private const string ServiceNameResourceAttribute = "service.name"; + + /// + /// Configure OpenTelemetry metrics and tracing with AWS Lambda defaults. + /// + /// Lambda worker configuration to mutate. + /// Optional OpenTelemetry configuration. + /// + /// This creates an OTLP trace exporter and tracer provider, configures Core SDK metrics + /// through a Temporal runtime, adds the Temporal tracing interceptor, and registers a + /// per-invocation shutdown hook to force-flush traces before the Lambda invocation ends. + /// + public static void ApplyDefaults( + LambdaWorkerConfig config, + LambdaWorkerOpenTelemetryOptions? options = null) + { + if (config == null) + { + throw new ArgumentNullException(nameof(config)); + } + + var resolvedOptions = ResolveOptions(options); +#pragma warning disable CA2000 // Provider is intentionally retained for Lambda warm invocations. + var tracerProvider = CreateTracerProvider(resolvedOptions); +#pragma warning restore CA2000 + + config.ClientOptions.Interceptors = AddTracingInterceptor( + config.ClientOptions.Interceptors); + config.ClientOptions.Runtime = CreateRuntime(resolvedOptions); + config.ShutdownHooks.Add( + cancellationToken => ForceFlushAsync( + tracerProvider, + config.ShutdownDeadlineBuffer, + cancellationToken)); + } + + /// + /// Resolve options using process environment variables. + /// + /// Options to resolve. + /// Resolved options. + internal static ResolvedLambdaWorkerOpenTelemetryOptions ResolveOptions( + LambdaWorkerOpenTelemetryOptions? options = null) + { + options ??= new LambdaWorkerOpenTelemetryOptions(); + if (options.MetricsExportInterval <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(options), + "MetricsExportInterval must be greater than zero"); + } + + var serviceName = FirstNonEmpty( + options.ServiceName, + Environment.GetEnvironmentVariable(OTelServiceNameEnvironmentVariable), + Environment.GetEnvironmentVariable(LambdaFunctionNameEnvironmentVariable), + DefaultServiceName); + var collectorEndpoint = FirstNonEmpty( + options.CollectorEndpoint, + Environment.GetEnvironmentVariable(OTelExporterOtlpEndpointEnvironmentVariable), + DefaultCollectorEndpoint); + + return new ResolvedLambdaWorkerOpenTelemetryOptions( + new Uri(collectorEndpoint), + serviceName, + options.MetricsExportInterval); + } + + /// + /// Force-flush the tracer provider asynchronously. + /// + /// Tracer provider to flush. + /// Maximum time to wait for the flush. + /// Cancellation token. + /// A task for the flush. + internal static async Task ForceFlushAsync( + TracerProvider tracerProvider, + TimeSpan shutdownDeadlineBuffer, + CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + var flushTask = Task.Run( + () => tracerProvider.ForceFlush(ToTimeoutMilliseconds(shutdownDeadlineBuffer))); + if (flushTask == await Task.WhenAny( + flushTask, + Task.Delay(Timeout.Infinite, cancellationToken)).ConfigureAwait(false)) + { + await flushTask.ConfigureAwait(false); + } + else + { + ObserveTaskException(flushTask); + } + } + + private static string FirstNonEmpty(params string?[] values) => + values.First(value => !string.IsNullOrEmpty(value))!; + + private static void ObserveTaskException(Task task) => + _ = task.ContinueWith( + completedTask => _ = completedTask.Exception, + CancellationToken.None, + TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + + private static TracerProvider CreateTracerProvider( + ResolvedLambdaWorkerOpenTelemetryOptions options) => + Sdk.CreateTracerProviderBuilder(). + AddXRayTraceId(). + SetResourceBuilder( + ResourceBuilder.CreateDefault().AddService(options.ServiceName)). + AddSource( + TemporalOpenTelemetry.TracingInterceptor.ClientSource.Name, + TemporalOpenTelemetry.TracingInterceptor.WorkflowsSource.Name, + TemporalOpenTelemetry.TracingInterceptor.ActivitiesSource.Name, + TemporalOpenTelemetry.TracingInterceptor.NexusSource.Name). + AddOtlpExporter(exporterOptions => + { + exporterOptions.Endpoint = options.CollectorEndpoint; +#pragma warning disable CS0618 // ADOT Lambda parity uses OTLP gRPC on localhost:4317. + exporterOptions.Protocol = OtlpExportProtocol.Grpc; +#pragma warning restore CS0618 + }). + Build(); + + private static List AddTracingInterceptor( + IReadOnlyCollection? interceptors) + { + var newInterceptors = interceptors?.ToList() ?? new List(); + newInterceptors.Add(new TemporalOpenTelemetry.TracingInterceptor()); + return newInterceptors; + } + + private static TemporalRuntime CreateRuntime( + ResolvedLambdaWorkerOpenTelemetryOptions options) + { + var openTelemetryOptions = new Temporalio.Runtime.OpenTelemetryOptions( + options.CollectorEndpoint) + { + MetricsExportInterval = options.MetricsExportInterval, + Protocol = OpenTelemetryProtocol.Grpc, + }; + return new TemporalRuntime(new TemporalRuntimeOptions + { + Telemetry = new TelemetryOptions + { + Metrics = new MetricsOptions(openTelemetryOptions) + { + GlobalTags = new[] + { + new KeyValuePair( + ServiceNameResourceAttribute, + options.ServiceName), + }, + }, + }, + }); + } + + private static int ToTimeoutMilliseconds(TimeSpan timeout) + { + if (timeout <= TimeSpan.Zero) + { + return 0; + } + if (timeout.TotalMilliseconds >= int.MaxValue) + { + return int.MaxValue; + } + return (int)timeout.TotalMilliseconds; + } + } +} diff --git a/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/LambdaWorkerOpenTelemetryOptions.cs b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/LambdaWorkerOpenTelemetryOptions.cs new file mode 100644 index 00000000..b03ad440 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/LambdaWorkerOpenTelemetryOptions.cs @@ -0,0 +1,27 @@ +using System; + +namespace Temporalio.Extensions.Aws.Lambda.OpenTelemetry +{ + /// + /// Options for . + /// + public class LambdaWorkerOpenTelemetryOptions + { + /// + /// Gets or sets how often the Core SDK exports metrics to the collector. + /// + public TimeSpan MetricsExportInterval { get; set; } = TimeSpan.FromSeconds(10); + + /// + /// Gets or sets the OpenTelemetry service name. If unset, this falls back to + /// OTEL_SERVICE_NAME, then AWS_LAMBDA_FUNCTION_NAME, then "temporal-lambda-worker". + /// + public string? ServiceName { get; set; } + + /// + /// Gets or sets the OTLP collector endpoint. If unset, this falls back to + /// OTEL_EXPORTER_OTLP_ENDPOINT, then "http://localhost:4317". + /// + public string? CollectorEndpoint { get; set; } + } +} diff --git a/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/README.md b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/README.md new file mode 100644 index 00000000..f9a487ff --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/README.md @@ -0,0 +1,71 @@ +# AWS Lambda Worker OpenTelemetry Support + +This extension adds OpenTelemetry helpers for Temporal workers running inside AWS Lambda. + +Add the `Temporalio.Extensions.Aws.Lambda.OpenTelemetry` package from +[NuGet](https://www.nuget.org/packages/Temporalio.Extensions.Aws.Lambda.OpenTelemetry). For example, using the +`dotnet` CLI: + + dotnet add package Temporalio.Extensions.Aws.Lambda.OpenTelemetry + +## Quick Start + +Call `LambdaWorkerOpenTelemetry.ApplyDefaults` from the Lambda worker configure callback: + +```csharp +using Amazon.Lambda.Core; +using Temporalio.Common; +using Temporalio.Extensions.Aws.Lambda; +using Temporalio.Extensions.Aws.Lambda.OpenTelemetry; + +private static readonly Func WorkerHandler = + TemporalLambdaWorker.CreateHandler( + new WorkerDeploymentVersion("payments-worker", "2026-05-27"), + config => + { + LambdaWorkerOpenTelemetry.ApplyDefaults(config); + + config.WorkerOptions.TaskQueue = "payments"; + config.WorkerOptions.AddWorkflow(); + config.WorkerOptions.AddActivity(PaymentActivities.ChargeAsync); + }); +``` + +`ApplyDefaults` configures Temporal tracing with `TracingInterceptor`, creates an OTLP trace exporter and tracer +provider, configures Core SDK metrics through a `TemporalRuntime`, and registers a per-invocation shutdown hook that +force-flushes traces before the Lambda invocation ends. + +## Defaults + +The OTLP collector endpoint is resolved from `LambdaWorkerOpenTelemetryOptions.CollectorEndpoint`, then +`OTEL_EXPORTER_OTLP_ENDPOINT`, then `http://localhost:4317`, which is the endpoint expected by the AWS Distro for +OpenTelemetry collector Lambda layer. + +The OpenTelemetry service name is resolved from `LambdaWorkerOpenTelemetryOptions.ServiceName`, then +`OTEL_SERVICE_NAME`, then `AWS_LAMBDA_FUNCTION_NAME`, then `temporal-lambda-worker`. + +Core SDK metrics export every 10 seconds by default. Set `MetricsExportInterval` shorter than your Lambda timeout to +increase the chance that at least one metrics export happens during each invocation. Core metrics are exported +periodically and do not have an explicit per-invocation flush API. + +```csharp +LambdaWorkerOpenTelemetry.ApplyDefaults( + config, + new LambdaWorkerOpenTelemetryOptions + { + CollectorEndpoint = "http://localhost:4317", + ServiceName = "payments-worker", + MetricsExportInterval = TimeSpan.FromSeconds(5), + }); +``` + +## AWS Lambda Setup + +Attach the ADOT collector Lambda layer and set: + +```text +OPENTELEMETRY_COLLECTOR_CONFIG_URI=/var/task/otel-collector-config.yaml +``` + +The helper uses AWS X-Ray-compatible trace IDs. It force-flushes traces on every Lambda invocation but does not shut down +the tracer provider, so warm Lambda invocations can continue using it. diff --git a/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/ResolvedLambdaWorkerOpenTelemetryOptions.cs b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/ResolvedLambdaWorkerOpenTelemetryOptions.cs new file mode 100644 index 00000000..12cf4a8e --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/ResolvedLambdaWorkerOpenTelemetryOptions.cs @@ -0,0 +1,41 @@ +using System; + +namespace Temporalio.Extensions.Aws.Lambda.OpenTelemetry +{ + /// + /// Resolved OpenTelemetry options for Lambda workers. + /// + internal sealed class ResolvedLambdaWorkerOpenTelemetryOptions + { + /// + /// Initializes a new instance of the class. + /// + /// OTLP collector endpoint. + /// OpenTelemetry service name. + /// Metrics export interval. + public ResolvedLambdaWorkerOpenTelemetryOptions( + Uri collectorEndpoint, + string serviceName, + TimeSpan metricsExportInterval) + { + CollectorEndpoint = collectorEndpoint; + ServiceName = serviceName; + MetricsExportInterval = metricsExportInterval; + } + + /// + /// Gets the OTLP collector endpoint. + /// + public Uri CollectorEndpoint { get; } + + /// + /// Gets the OpenTelemetry service name. + /// + public string ServiceName { get; } + + /// + /// Gets how often the Core SDK exports metrics to the collector. + /// + public TimeSpan MetricsExportInterval { get; } + } +} diff --git a/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/Temporalio.Extensions.Aws.Lambda.OpenTelemetry.csproj b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/Temporalio.Extensions.Aws.Lambda.OpenTelemetry.csproj new file mode 100644 index 00000000..69751efa --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/Temporalio.Extensions.Aws.Lambda.OpenTelemetry.csproj @@ -0,0 +1,38 @@ + + + + Temporal SDK .NET AWS Lambda Worker OpenTelemetry Extension + + false + true + 9.0 + README.md + true + snupkg + netstandard2.0 + + + + + + + + + + + + + + + + + + <_Parameter1>Temporalio.Tests + + + + + + + + diff --git a/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/packages.lock.json b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/packages.lock.json new file mode 100644 index 00000000..dbb7f643 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda.OpenTelemetry/packages.lock.json @@ -0,0 +1,363 @@ +{ + "version": 2, + "dependencies": { + ".NETStandard,Version=v2.0": { + "Microsoft.VisualStudio.Threading.Analyzers": { + "type": "Direct", + "requested": "[17.4.33, )", + "resolved": "17.4.33", + "contentHash": "gpf500WFCqnhJmbIEDvVYgeNLpHZ6PAyhcewyqqpE8UoKNHoCDdMgogulYuJERbRaGqywqD1pRwPNplVteNZ8g==" + }, + "NETStandard.Library": { + "type": "Direct", + "requested": "[2.0.3, )", + "resolved": "2.0.3", + "contentHash": "st47PosZSHrjECdjeIzZQbzivYBJFv6P2nv4cj2ypdI204DO+vZ7l5raGMiX4eXMJ53RfOIg+/s4DHVZ54Nu2A==", + "dependencies": { + "Microsoft.NETCore.Platforms": "1.1.0" + } + }, + "OpenTelemetry": { + "type": "Direct", + "requested": "[1.15.3, )", + "resolved": "1.15.3", + "contentHash": "N0i6WjPoHPbZyms1ugbDIFAJFuGlpeExJMU/+XSL0lQRUkg/D0utFkDoLXf8Z1km5B+xVZ2GyMXXiX8qdeNmPg==", + "dependencies": { + "Microsoft.Extensions.Diagnostics.Abstractions": "10.0.0", + "Microsoft.Extensions.Logging.Configuration": "10.0.0", + "OpenTelemetry.Api.ProviderBuilderExtensions": "1.15.3" + } + }, + "OpenTelemetry.Exporter.OpenTelemetryProtocol": { + "type": "Direct", + "requested": "[1.15.3, )", + "resolved": "1.15.3", + "contentHash": "FEXJepcseTGbATiCkUfP7ipoFEYYfl/0UmmUwi0KxCPg9PaUA8ab2P1LGopK+/HExasJ1ZutFhZrN6WvUIR23g==", + "dependencies": { + "OpenTelemetry": "1.15.3" + } + }, + "OpenTelemetry.Extensions.AWS": { + "type": "Direct", + "requested": "[1.15.1, )", + "resolved": "1.15.1", + "contentHash": "T+Vrhlv79PyG+fK6XnEtdJW9VtYp5WxSsVajplnkbuY0Q3gTyFNiLPP8tyu1qmEL19bKc9i6Wgp4JqhvupqirA==", + "dependencies": { + "OpenTelemetry": "[1.15.3, 2.0.0)" + } + }, + "StyleCop.Analyzers": { + "type": "Direct", + "requested": "[1.2.0-beta.435, )", + "resolved": "1.2.0-beta.435", + "contentHash": "TADk7vdGXtfTnYCV7GyleaaRTQjfoSfZXprQrVMm7cSJtJbFc1QIbWPyLvrgrfGdfHbGmUPvaN4ODKNxg2jgPQ==", + "dependencies": { + "StyleCop.Analyzers.Unstable": "1.2.0.435" + } + }, + "Microsoft.Bcl.AsyncInterfaces": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "vFuwSLj9QJBbNR0NeNO4YVASUbokxs+i/xbuu8B+Fs4FAZg5QaFa6eGrMaRqTzzNI5tAb97T7BhSxtLckFyiRA==", + "dependencies": { + "System.Threading.Tasks.Extensions": "4.6.3" + } + }, + "Microsoft.Extensions.Configuration": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "H4SWETCh/cC5L1WtWchHR6LntGk3rDTTznZMssr4cL8IbDmMWBxY+MOGDc/ASnqNolLKPIWHWeuC1ddiL/iNPw==", + "dependencies": { + "Microsoft.Extensions.Configuration.Abstractions": "10.0.0", + "Microsoft.Extensions.Primitives": "10.0.0" + } + }, + "Microsoft.Extensions.Configuration.Abstractions": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "d2kDKnCsJvY7mBVhcjPSp9BkJk48DsaHPg5u+Oy4f8XaOqnEedRy/USyvnpHL92wpJ6DrTPy7htppUUzskbCXQ==", + "dependencies": { + "Microsoft.Extensions.Primitives": "10.0.0" + } + }, + "Microsoft.Extensions.Configuration.Binder": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "tMF9wNh+hlyYDWB8mrFCQHQmWHlRosol1b/N2Jrefy1bFLnuTlgSYmPyHNmz8xVQgs7DpXytBRWxGhG+mSTp0g==", + "dependencies": { + "Microsoft.Extensions.Configuration": "10.0.0", + "Microsoft.Extensions.Configuration.Abstractions": "10.0.0" + } + }, + "Microsoft.Extensions.DependencyInjection": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "f0RBabswJq+gRu5a+hWIobrLWiUYPKMhCD9WO3sYBAdSy3FFH14LMvLVFZc2kPSCimBLxSuitUhsd6tb0TAY6A==", + "dependencies": { + "Microsoft.Bcl.AsyncInterfaces": "10.0.0", + "Microsoft.Extensions.DependencyInjection.Abstractions": "10.0.0", + "System.Threading.Tasks.Extensions": "4.6.3" + } + }, + "Microsoft.Extensions.DependencyInjection.Abstractions": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "L3AdmZ1WOK4XXT5YFPEwyt0ep6l8lGIPs7F5OOBZc77Zqeo01Of7XXICy47628sdVl0v/owxYJTe86DTgFwKCA==", + "dependencies": { + "Microsoft.Bcl.AsyncInterfaces": "10.0.0", + "System.Threading.Tasks.Extensions": "4.6.3" + } + }, + "Microsoft.Extensions.Diagnostics.Abstractions": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "SfK89ytD61S7DgzorFljSkUeluC1ncn6dtZgwc0ot39f/BEYWBl5jpgvodxduoYAs1d9HG8faCDRZxE95UMo2A==", + "dependencies": { + "Microsoft.Extensions.DependencyInjection.Abstractions": "10.0.0", + "Microsoft.Extensions.Options": "10.0.0", + "System.Buffers": "4.6.1", + "System.Diagnostics.DiagnosticSource": "10.0.0", + "System.Memory": "4.6.3" + } + }, + "Microsoft.Extensions.Logging.Configuration": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "j8zcwhS6bYB6FEfaY3nYSgHdpiL2T+/V3xjpHtslVAegyI1JUbB9yAt/BFdvZdsNbY0Udm4xFtvfT/hUwcOOOg==", + "dependencies": { + "Microsoft.Extensions.Configuration": "10.0.0", + "Microsoft.Extensions.Configuration.Abstractions": "10.0.0", + "Microsoft.Extensions.Configuration.Binder": "10.0.0", + "Microsoft.Extensions.DependencyInjection.Abstractions": "10.0.0", + "Microsoft.Extensions.Logging": "10.0.0", + "Microsoft.Extensions.Logging.Abstractions": "10.0.0", + "Microsoft.Extensions.Options": "10.0.0", + "Microsoft.Extensions.Options.ConfigurationExtensions": "10.0.0" + } + }, + "Microsoft.Extensions.Options": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "8oCAgXOow5XDrY9HaXX1QmH3ORsyZO/ANVHBlhLyCeWTH5Sg4UuqZeOTWJi6484M+LqSx0RqQXDJtdYy2BNiLQ==", + "dependencies": { + "Microsoft.Extensions.DependencyInjection.Abstractions": "10.0.0", + "Microsoft.Extensions.Primitives": "10.0.0", + "System.ComponentModel.Annotations": "5.0.0" + } + }, + "Microsoft.Extensions.Options.ConfigurationExtensions": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "tL9cSl3maS5FPzp/3MtlZI21ExWhni0nnUCF8HY4npTsINw45n9SNDbkKXBMtFyUFGSsQep25fHIDN4f/Vp3AQ==", + "dependencies": { + "Microsoft.Extensions.Configuration.Abstractions": "10.0.0", + "Microsoft.Extensions.Configuration.Binder": "10.0.0", + "Microsoft.Extensions.DependencyInjection.Abstractions": "10.0.0", + "Microsoft.Extensions.Options": "10.0.0", + "Microsoft.Extensions.Primitives": "10.0.0" + } + }, + "Microsoft.Extensions.Primitives": { + "type": "Transitive", + "resolved": "10.0.0", + "contentHash": "inRnbpCS0nwO/RuoZIAqxQUuyjaknOOnCEZB55KSMMjRhl0RQDttSmLSGsUJN3RQ3ocf5NDLFd2mOQViHqMK5w==", + "dependencies": { + "System.Memory": "4.6.3", + "System.Runtime.CompilerServices.Unsafe": "6.1.2" + } + }, + "Microsoft.NETCore.Platforms": { + "type": "Transitive", + "resolved": "1.1.0", + "contentHash": "kz0PEW2lhqygehI/d6XsPCQzD7ff7gUJaVGPVETX611eadGsA3A877GdSlU0LRVMCTH/+P3o2iDTak+S08V2+A==" + }, + "OpenTelemetry.Api.ProviderBuilderExtensions": { + "type": "Transitive", + "resolved": "1.15.3", + "contentHash": "SYn0lqYDwLMWhv/zlNGsQcl2yX++yTumanX46bmOZE/ZDOd1WjPBO2kZaZgKLEZTZk48pavIFGJ6vOvxXgWVFQ==", + "dependencies": { + "Microsoft.Extensions.DependencyInjection.Abstractions": "10.0.0", + "OpenTelemetry.Api": "1.15.3" + } + }, + "StyleCop.Analyzers.Unstable": { + "type": "Transitive", + "resolved": "1.2.0.435", + "contentHash": "ouwPWZxbOV3SmCZxIRqHvljkSzkCyi1tDoMzQtDb/bRP8ctASV/iRJr+A2Gdj0QLaLmWnqTWDrH82/iP+X80Lg==" + }, + "System.Buffers": { + "type": "Transitive", + "resolved": "4.6.1", + "contentHash": "N8GXpmiLMtljq7gwvyS+1QvKT/W2J8sNAvx+HVg4NGmsG/H+2k/y9QI23auLJRterrzCiDH+IWAw4V/GPwsMlw==" + }, + "System.ComponentModel.Annotations": { + "type": "Transitive", + "resolved": "5.0.0", + "contentHash": "dMkqfy2el8A8/I76n2Hi1oBFEbG1SfxD2l5nhwXV3XjlnOmwxJlQbYpJH4W51odnU9sARCSAgv7S3CyAFMkpYg==" + }, + "System.IO.Pipelines": { + "type": "Transitive", + "resolved": "9.0.4", + "contentHash": "luF2Xba+lTe2GOoNQdZLe8q7K6s7nSpWZl9jIwWNMszN4/Yv0lmxk9HISgMmwdyZ83i3UhAGXaSY9o6IJBUuuA==", + "dependencies": { + "System.Buffers": "4.5.1", + "System.Memory": "4.5.5", + "System.Threading.Tasks.Extensions": "4.5.4" + } + }, + "System.Memory": { + "type": "Transitive", + "resolved": "4.6.3", + "contentHash": "qdcDOgnFZY40+Q9876JUHnlHu7bosOHX8XISRoH94fwk6hgaeQGSgfZd8srWRZNt5bV9ZW2TljcegDNxsf+96A==", + "dependencies": { + "System.Buffers": "4.6.1", + "System.Numerics.Vectors": "4.6.1", + "System.Runtime.CompilerServices.Unsafe": "6.1.2" + } + }, + "System.Numerics.Vectors": { + "type": "Transitive", + "resolved": "4.6.1", + "contentHash": "sQxefTnhagrhoq2ReR0D/6K0zJcr9Hrd6kikeXsA1I8kOCboTavcUC4r7TSfpKFeE163uMuxZcyfO1mGO3EN8Q==" + }, + "System.Runtime.CompilerServices.Unsafe": { + "type": "Transitive", + "resolved": "6.1.2", + "contentHash": "2hBr6zdbIBTDE3EhK7NSVNdX58uTK6iHW/P/Axmm9sl1xoGSLqDvMtpecn226TNwHByFokYwJmt/aQQNlO5CRw==" + }, + "System.Text.Encodings.Web": { + "type": "Transitive", + "resolved": "9.0.4", + "contentHash": "V+5cCPpk1S2ngekUs9nDrQLHGiWFZMg8BthADQr+Fwi59a8DdHFu26S2oi9Bfgv+d67bqmkPqctJXMEXiimXUg==", + "dependencies": { + "System.Buffers": "4.5.1", + "System.Memory": "4.5.5", + "System.Runtime.CompilerServices.Unsafe": "6.0.0" + } + }, + "System.Threading.Tasks.Extensions": { + "type": "Transitive", + "resolved": "4.6.3", + "contentHash": "7sCiwilJLYbTZELaKnc7RecBBXWXA+xMLQWZKWawBxYjp6DBlSE3v9/UcvKBvr1vv2tTOhipiogM8rRmxlhrVA==", + "dependencies": { + "System.Runtime.CompilerServices.Unsafe": "6.1.2" + } + }, + "temporalio": { + "type": "Project", + "dependencies": { + "Google.Protobuf": "[3.26.1, )", + "Microsoft.Bcl.HashCode": "[6.0.0, )", + "Microsoft.Extensions.Logging.Abstractions": "[2.2.0, )", + "NexusRpc": "[0.3.0, )", + "System.Text.Json": "[9.0.4, )" + } + }, + "temporalio.extensions.aws.lambda": { + "type": "Project", + "dependencies": { + "Amazon.Lambda.Core": "[3.1.0, )", + "Temporalio": "[1.14.1, )" + } + }, + "temporalio.extensions.opentelemetry": { + "type": "Project", + "dependencies": { + "OpenTelemetry.Api": "[1.15.3, )", + "System.Diagnostics.DiagnosticSource": "[10.0.0, )", + "Temporalio": "[1.14.1, )" + } + }, + "Amazon.Lambda.Core": { + "type": "CentralTransitive", + "requested": "[3.1.0, )", + "resolved": "3.1.0", + "contentHash": "uZZ2k5lMoB9OzPTmKkkEKpyFcnLxcb7FxtxrA3+HBg/sooTzu402iCcSk5r+N62Qokhwr4Q9cbaVJSM6Dln3aA==" + }, + "Google.Protobuf": { + "type": "CentralTransitive", + "requested": "[3.26.1, )", + "resolved": "3.26.1", + "contentHash": "CHZX8zXqhF/fdUtd+AYzew8T2HFkAoe5c7lbGxZY/qryAlQXckDvM5BfOJjXlMS7kyICqQTMszj4w1bX5uBJ/w==", + "dependencies": { + "System.Memory": "4.5.3", + "System.Runtime.CompilerServices.Unsafe": "4.5.2" + } + }, + "Microsoft.Bcl.HashCode": { + "type": "CentralTransitive", + "requested": "[6.0.0, )", + "resolved": "6.0.0", + "contentHash": "GI4jcoi6eC9ZhNOQylIBaWOQjyGaR8T6N3tC1u8p3EXfndLCVNNWa+Zp+ocjvvS3kNBN09Zma2HXL0ezO0dRfw==" + }, + "Microsoft.Extensions.Logging": { + "type": "CentralTransitive", + "requested": "[10.0.0, )", + "resolved": "10.0.0", + "contentHash": "BStFkd5CcnEtarlcgYDBcFzGYCuuNMzPs02wN3WBsOFoYIEmYoUdAiU+au6opzoqfTYJsMTW00AeqDdnXH2CvA==", + "dependencies": { + "Microsoft.Bcl.AsyncInterfaces": "10.0.0", + "Microsoft.Extensions.DependencyInjection": "10.0.0", + "Microsoft.Extensions.Logging.Abstractions": "10.0.0", + "Microsoft.Extensions.Options": "10.0.0", + "System.Diagnostics.DiagnosticSource": "10.0.0" + } + }, + "Microsoft.Extensions.Logging.Abstractions": { + "type": "CentralTransitive", + "requested": "[2.2.0, )", + "resolved": "10.0.0", + "contentHash": "FU/IfjDfwaMuKr414SSQNTIti/69bHEMb+QKrskRb26oVqpx3lNFXMjs/RC9ZUuhBhcwDM2BwOgoMw+PZ+beqQ==", + "dependencies": { + "Microsoft.Extensions.DependencyInjection.Abstractions": "10.0.0", + "System.Buffers": "4.6.1", + "System.Diagnostics.DiagnosticSource": "10.0.0", + "System.Memory": "4.6.3" + } + }, + "NexusRpc": { + "type": "CentralTransitive", + "requested": "[0.3.0, )", + "resolved": "0.3.0", + "contentHash": "Kr+NMSZ5428AvxpzShdJcQxc9w6HT8SM6FXQMekC4K9wGpmC1m/L2pQJydpvVTwRBu3qAIYKPI37KWexF4Gtcg==", + "dependencies": { + "Microsoft.Bcl.HashCode": "6.0.0" + } + }, + "OpenTelemetry.Api": { + "type": "CentralTransitive", + "requested": "[1.15.3, )", + "resolved": "1.15.3", + "contentHash": "fX+fkCysfPut+qCcT3bKqyX4QN9Saf4CgX8HLOHywEVD+Xr7sULtfuypITpoDysjx8R59dn/3mWhgimMH8cm/g==", + "dependencies": { + "System.Diagnostics.DiagnosticSource": "10.0.0" + } + }, + "System.Diagnostics.DiagnosticSource": { + "type": "CentralTransitive", + "requested": "[7.0.0, )", + "resolved": "10.0.0", + "contentHash": "0KdBK+h7G13PuOSC2R/DalAoFMvdYMznvGRuICtkdcUMXgl/gYXsG6z4yUvTxHSMACorWgHCU1Faq0KUHU6yAQ==", + "dependencies": { + "System.Memory": "4.6.3", + "System.Runtime.CompilerServices.Unsafe": "6.1.2" + } + }, + "System.Text.Json": { + "type": "CentralTransitive", + "requested": "[9.0.4, )", + "resolved": "9.0.4", + "contentHash": "pYtmpcO6R3Ef1XilZEHgXP2xBPVORbYEzRP7dl0IAAbN8Dm+kfwio8aCKle97rAWXOExr292MuxWYurIuwN62g==", + "dependencies": { + "Microsoft.Bcl.AsyncInterfaces": "9.0.4", + "System.Buffers": "4.5.1", + "System.IO.Pipelines": "9.0.4", + "System.Memory": "4.5.5", + "System.Runtime.CompilerServices.Unsafe": "6.0.0", + "System.Text.Encodings.Web": "9.0.4", + "System.Threading.Tasks.Extensions": "4.5.4" + } + } + } + } +} \ No newline at end of file diff --git a/src/Temporalio.Extensions.Aws.Lambda/ILambdaWorker.cs b/src/Temporalio.Extensions.Aws.Lambda/ILambdaWorker.cs new file mode 100644 index 00000000..73be3f42 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda/ILambdaWorker.cs @@ -0,0 +1,19 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Temporalio.Extensions.Aws.Lambda +{ + /// + /// Internal abstraction for a worker that can run for a Lambda invocation. + /// + internal interface ILambdaWorker : IDisposable + { + /// + /// Run the worker until cancellation or failure. + /// + /// Cancellation token for worker shutdown. + /// Task for worker completion. + Task ExecuteAsync(CancellationToken stoppingToken); + } +} diff --git a/src/Temporalio.Extensions.Aws.Lambda/LambdaWorkerConfig.cs b/src/Temporalio.Extensions.Aws.Lambda/LambdaWorkerConfig.cs new file mode 100644 index 00000000..de36ea17 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda/LambdaWorkerConfig.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Temporalio.Client; +using Temporalio.Worker; + +namespace Temporalio.Extensions.Aws.Lambda +{ + /// + /// Configuration for . + /// + public class LambdaWorkerConfig + { + /// + /// Default time reserved after the worker run budget for worker shutdown and hooks. + /// + public static readonly TimeSpan DefaultShutdownDeadlineBuffer = TimeSpan.FromSeconds(7); + + private Func? loadClientOptions; + private TemporalClientConnectOptions? clientOptions; + private bool clientOptionsSet; + + /// + /// Initializes a new instance of the class. + /// + public LambdaWorkerConfig() + : this(null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Lazy client options loader. + internal LambdaWorkerConfig(Func? loadClientOptions) + { + this.loadClientOptions = loadClientOptions; + WorkerOptions.MaxConcurrentActivities = 2; + WorkerOptions.MaxConcurrentWorkflowTasks = 10; + WorkerOptions.MaxConcurrentLocalActivities = 2; + WorkerOptions.MaxConcurrentNexusTasks = 5; + WorkerOptions.GracefulShutdownTimeout = TimeSpan.FromSeconds(5); + WorkerOptions.MaxCachedWorkflows = 30; + WorkerOptions.MaxConcurrentWorkflowTaskPolls = 2; + WorkerOptions.MaxConcurrentActivityTaskPolls = 1; + WorkerOptions.MaxConcurrentNexusTaskPolls = 1; + WorkerOptions.DisableEagerActivityExecution = true; + } + + /// + /// Gets or sets the client connection options. + /// + public TemporalClientConnectOptions ClientOptions + { + get + { + if (!clientOptionsSet) + { + clientOptions = loadClientOptions == null ? + new TemporalClientConnectOptions() : + loadClientOptions(); + loadClientOptions = null; + clientOptionsSet = true; + } + return clientOptions!; + } + + set + { + clientOptions = value; + loadClientOptions = null; + clientOptionsSet = true; + } + } + + /// + /// Gets or sets the worker options. + /// + public TemporalWorkerOptions WorkerOptions { get; set; } = new TemporalWorkerOptions(); + + /// + /// Gets or sets the deadline buffer reserved for worker shutdown and hooks. + /// + public TimeSpan ShutdownDeadlineBuffer { get; set; } = DefaultShutdownDeadlineBuffer; + + /// + /// Gets or sets hooks to run after each invocation's worker has shut down. + /// +#pragma warning disable CA2227 // The public API intentionally allows replacing the list during configuration. + public IList> ShutdownHooks { get; set; } = + new List>(); +#pragma warning restore CA2227 + } +} diff --git a/src/Temporalio.Extensions.Aws.Lambda/README.md b/src/Temporalio.Extensions.Aws.Lambda/README.md new file mode 100644 index 00000000..9a925395 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda/README.md @@ -0,0 +1,158 @@ +# AWS Lambda Worker Support + +This extension adds an AWS Lambda handler helper for running a Temporal worker during a Lambda invocation. + +Add the `Temporalio.Extensions.Aws.Lambda` package from +[NuGet](https://www.nuget.org/packages/Temporalio.Extensions.Aws.Lambda). For example, using the `dotnet` CLI: + + dotnet add package Temporalio.Extensions.Aws.Lambda + +## Quick Start + +Create a static handler delegate and call it from the AWS Lambda handler method: + +```csharp +using Amazon.Lambda.Core; +using Temporalio.Common; +using Temporalio.Extensions.Aws.Lambda; + +public class Function +{ + private static readonly Func WorkerHandler = + TemporalLambdaWorker.CreateHandler( + new WorkerDeploymentVersion("payments-worker", "2026-05-27"), + config => + { + config.WorkerOptions.TaskQueue = "payments"; + config.WorkerOptions.AddWorkflow(); + config.WorkerOptions.AddActivity(PaymentActivities.ChargeAsync); + }); + + public Task HandlerAsync(object? input, ILambdaContext context) => + WorkerHandler(input, context); +} +``` + +The synchronous `configure` callback shown above runs once when the delegate is created, which is normally during Lambda +cold start. Use it for static worker setup such as task queues, workflow/activity registration, and options that can be +shared across warm invocations. Each invocation creates a fresh Temporal client and worker, runs until the Lambda +deadline minus `ShutdownDeadlineBuffer`, then shuts down and runs configured shutdown hooks. + +For setup that must be awaited per invocation, use the async overload: + +```csharp +private static readonly Func WorkerHandler = + TemporalLambdaWorker.CreateHandler( + new WorkerDeploymentVersion("payments-worker", "2026-05-27"), + async config => + { + config.WorkerOptions.TaskQueue = "payments"; + config.WorkerOptions.AddWorkflow(); + config.WorkerOptions.AddActivity(PaymentActivities.ChargeAsync); + + config.ClientOptions.ApiKey = await LoadTemporalApiKeyAsync(); + config.ShutdownHooks.Add(async cancellationToken => + { + await FlushPerInvocationResourceAsync(cancellationToken); + }); + }); +``` + +The async `configure` callback is awaited once per Lambda invocation, before the Temporal client connects. It receives a +fresh `LambdaWorkerConfig` each time, so use shutdown hooks for any per-invocation cleanup. + +## Configuration + +Client connection settings are pre-populated from a `temporal.toml` file and/or environment variables via +`Temporalio.Common.EnvConfig`. The Lambda config file path is selected as follows: + +1. `TEMPORAL_CONFIG_FILE`, if set. +2. Otherwise, `temporal.toml` in `$LAMBDA_TASK_ROOT`, typically `/var/task`, when `LAMBDA_TASK_ROOT` is set. +3. Otherwise, `temporal.toml` in the current working directory. + +The file is optional. If it does not exist, only environment variables are used. You can also load Lambda-aware client +options directly: + +```csharp +using Temporalio.Common.EnvConfig; +using Temporalio.Extensions.Aws.Lambda; + +config.ClientOptions = TemporalLambdaWorker.LoadClientConnectOptions( + new ClientEnvConfig.ProfileLoadOptions { Profile = "production" }); +``` + +To bypass config loading, assign explicit client options in `configure`: + +```csharp +config.ClientOptions = new TemporalClientConnectOptions +{ + TargetHost = "my-namespace.a1b2c.tmprl.cloud:7233", + Namespace = "my-namespace.a1b2c", + ApiKey = Environment.GetEnvironmentVariable("TEMPORAL_API_KEY"), + Tls = new TlsOptions(), +}; +``` + +If `TEMPORAL_TASK_QUEUE` is present, it is used as the initial `WorkerOptions.TaskQueue`. You can still override the task +queue in `configure`. If no task queue is set by the environment or by `configure`, synchronous handler creation fails; +with async configure, the invocation fails after the callback is awaited. + +`TemporalLambdaWorker.CreateHandler` requires a `WorkerDeploymentVersion` and always enables Worker Versioning by setting +`WorkerOptions.DeploymentOptions` with `UseWorkerVersioning = true`. Use a deployment name and build ID that match your +rollout process. The default versioning behavior is `AutoUpgrade`. If you need a different default versioning behavior, +configure `config.WorkerOptions.DeploymentOptions.DefaultVersioningBehavior`; the handler preserves any non-`Unspecified` +value while enforcing the deployment version passed to `CreateHandler`. + +The helper applies Lambda-oriented worker defaults before `configure`, including lower concurrency, a 5 second graceful +shutdown timeout, a smaller workflow cache, simple poller limits, and disabled eager activity execution. Values you set in +`configure` override these defaults except for the enforced deployment version. With sync configure this happens once at +handler creation; with async configure it happens for every invocation. + +## Shutdown Hooks + +Add shutdown hooks for per-invocation cleanup: + +```csharp +config.ShutdownHooks.Add(async cancellationToken => +{ + await FlushTelemetryAsync(cancellationToken); +}); +``` + +Hooks run in order after the worker has stopped. Hook failures are logged to the Lambda context logger and later hooks +still run. Hooks added from async configure are scoped to that invocation. + +## Observability + +For AWS Lambda OpenTelemetry defaults, add the `Temporalio.Extensions.Aws.Lambda.OpenTelemetry` package and call +`LambdaWorkerOpenTelemetry.ApplyDefaults` in `configure`: + +```csharp +using Temporalio.Extensions.Aws.Lambda.OpenTelemetry; + +LambdaWorkerOpenTelemetry.ApplyDefaults(config); +``` + +This configures Temporal tracing, Core SDK OTLP metrics, AWS X-Ray-compatible trace IDs, and a per-invocation trace +flush shutdown hook. The OTLP endpoint defaults to `OTEL_EXPORTER_OTLP_ENDPOINT`, then `http://localhost:4317`, which is +the endpoint expected by the ADOT collector Lambda layer. + +You can still configure tracing and metrics manually using `Temporalio.Extensions.OpenTelemetry.TracingInterceptor` and +`TemporalRuntime`: + +```csharp +config.ClientOptions.Interceptors = new[] { new TracingInterceptor() }; +config.ClientOptions.Runtime = new TemporalRuntime(new TemporalRuntimeOptions +{ + Telemetry = new TelemetryOptions + { + Metrics = new MetricsOptions(new OpenTelemetryOptions("http://collector:4317")), + }, +}); +``` + +## TLS/CA Notes + +Some AWS Lambda .NET images can override `SSL_CERT_FILE` in a way that prevents the SDK's Rust-based runtime from loading +system root CAs. See the SDK root README's +[AWS Lambda .NET CA loading workaround](https://github.com/temporalio/sdk-dotnet#aws-lambda-net-ca-loading-issues). diff --git a/src/Temporalio.Extensions.Aws.Lambda/TemporalLambdaWorker.cs b/src/Temporalio.Extensions.Aws.Lambda/TemporalLambdaWorker.cs new file mode 100644 index 00000000..bd271cb4 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda/TemporalLambdaWorker.cs @@ -0,0 +1,429 @@ +#pragma warning disable CS0618 // This package forces deployment options and clears legacy versioning fields. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Amazon.Lambda.Core; +using Temporalio.Client; +using Temporalio.Common; +using Temporalio.Common.EnvConfig; +using Temporalio.Worker; + +namespace Temporalio.Extensions.Aws.Lambda +{ + /// + /// Helpers for running a Temporal worker inside an AWS Lambda invocation. + /// + public static class TemporalLambdaWorker + { + private const string ConfigFileEnvironmentVariable = "TEMPORAL_CONFIG_FILE"; + private const string DefaultConfigFileName = "temporal.toml"; + private const string LambdaTaskRootEnvironmentVariable = "LAMBDA_TASK_ROOT"; + private const string TaskQueueEnvironmentVariable = "TEMPORAL_TASK_QUEUE"; + private static readonly TimeSpan LowWorkBudgetWarningThreshold = TimeSpan.FromSeconds(5); + + /// + /// Create an AWS Lambda handler that runs a Temporal worker for each invocation. + /// + /// Worker deployment version for this Lambda worker. + /// Callback to configure client and worker options. + /// A Lambda handler delegate. + public static Func CreateHandler( + WorkerDeploymentVersion version, + Action configure) => + CreateHandler( + version, + configure, + new TemporalLambdaWorkerHandlerOptions + { + LoadClientConnectOptions = options => LoadClientConnectOptions(options), + }); + + /// + /// Create an AWS Lambda handler that runs a Temporal worker for each invocation. + /// + /// Worker deployment version for this Lambda worker. + /// Callback to configure client and worker options per invocation. + /// A Lambda handler delegate. + public static Func CreateHandler( + WorkerDeploymentVersion version, + Func configureAsync) => + CreateHandler( + version, + configureAsync, + new TemporalLambdaWorkerHandlerOptions + { + LoadClientConnectOptions = options => LoadClientConnectOptions(options), + }); + + /// + /// Load Temporal client connection options using AWS Lambda-aware config file resolution. + /// + /// Options for loading the configuration profile. + /// Client connection options. + public static TemporalClientConnectOptions LoadClientConnectOptions( + ClientEnvConfig.ProfileLoadOptions? options = null) + { + var loadOptions = options == null ? + new ClientEnvConfig.ProfileLoadOptions() : + (ClientEnvConfig.ProfileLoadOptions)options.Clone(); + if (loadOptions.ConfigSource == null && + !loadOptions.DisableFile && + string.IsNullOrEmpty(GetEnvironmentVariable( + loadOptions, + ConfigFileEnvironmentVariable))) + { + var lambdaTaskRoot = GetEnvironmentVariable( + loadOptions, + LambdaTaskRootEnvironmentVariable); + var root = string.IsNullOrEmpty(lambdaTaskRoot) ? "." : lambdaTaskRoot; + loadOptions.ConfigSource = DataSource.FromPath( + Path.Combine(root, DefaultConfigFileName)); + } + + return ClientEnvConfig.LoadClientConnectOptions(loadOptions); + } + + /// + /// Create an AWS Lambda handler with overridable internals for tests. + /// + /// Worker deployment version for this Lambda worker. + /// Callback to configure client and worker options. + /// Internal handler options. + /// A Lambda handler delegate. + internal static Func CreateHandler( + WorkerDeploymentVersion version, + Action configure, + TemporalLambdaWorkerHandlerOptions handlerOptions) + { + ValidateCreateHandlerArgs(version, configure, nameof(configure), handlerOptions); + var config = CreateConfig(version, handlerOptions); + configure(config); + var state = PrepareHandlerState(version, config, handlerOptions); + return state.HandleAsync; + } + + /// + /// Create an AWS Lambda handler with overridable internals for tests. + /// + /// Worker deployment version for this Lambda worker. + /// Callback to configure client and worker options per invocation. + /// Internal handler options. + /// A Lambda handler delegate. + internal static Func CreateHandler( + WorkerDeploymentVersion version, + Func configureAsync, + TemporalLambdaWorkerHandlerOptions handlerOptions) + { + ValidateCreateHandlerArgs(version, configureAsync, nameof(configureAsync), handlerOptions); + var state = new AsyncLambdaWorkerHandlerState(version, configureAsync, handlerOptions); + return state.HandleAsync; + } + + private static void ValidateCreateHandlerArgs( + WorkerDeploymentVersion version, + object configure, + string configureParamName, + TemporalLambdaWorkerHandlerOptions handlerOptions) + { + if (version == null) + { + throw new ArgumentNullException(nameof(version)); + } + if (configure == null) + { + throw new ArgumentNullException(configureParamName); + } + if (handlerOptions == null) + { + throw new ArgumentNullException(nameof(handlerOptions)); + } + if (string.IsNullOrWhiteSpace(version.DeploymentName)) + { + throw new ArgumentException("Deployment name must be set", nameof(version)); + } + if (string.IsNullOrWhiteSpace(version.BuildId)) + { + throw new ArgumentException("Build ID must be set", nameof(version)); + } + } + + private static LambdaWorkerConfig CreateConfig( + WorkerDeploymentVersion version, + TemporalLambdaWorkerHandlerOptions handlerOptions) + { + var loadClientConnectOptions = handlerOptions.LoadClientConnectOptions; + var config = new LambdaWorkerConfig( + loadClientConnectOptions == null ? + null : + () => loadClientConnectOptions(null)); + var environmentTaskQueue = handlerOptions.GetEnvironmentVariable(TaskQueueEnvironmentVariable); + if (environmentTaskQueue != null) + { + config.WorkerOptions.TaskQueue = environmentTaskQueue; + } + ApplyDeploymentVersion(config.WorkerOptions, version); + + return config; + } + + private static LambdaWorkerHandlerState PrepareHandlerState( + WorkerDeploymentVersion version, + LambdaWorkerConfig config, + TemporalLambdaWorkerHandlerOptions handlerOptions) + { + if (config.ClientOptions == null) + { + throw new InvalidOperationException("ClientOptions must be set"); + } + if (config.WorkerOptions == null) + { + throw new InvalidOperationException("WorkerOptions must be set"); + } + if (config.ShutdownHooks == null) + { + throw new InvalidOperationException("ShutdownHooks must be set"); + } + if (config.ShutdownDeadlineBuffer < TimeSpan.Zero) + { + throw new InvalidOperationException("ShutdownDeadlineBuffer cannot be negative"); + } + if (string.IsNullOrWhiteSpace(config.WorkerOptions.TaskQueue)) + { + throw new InvalidOperationException( + "WorkerOptions.TaskQueue must be set or TEMPORAL_TASK_QUEUE must be present"); + } + + var postPluginConfiguration = config.WorkerOptions.PostPluginConfiguration; + config.WorkerOptions.PostPluginConfiguration = options => + { + postPluginConfiguration?.Invoke(options); + ApplyDeploymentVersion(options, version); + ClearConcurrencyLimitsIfTunerSet(options); + }; + config.WorkerOptions.ApplyPostPluginConfiguration(); + + foreach (var hook in config.ShutdownHooks) + { + if (hook == null) + { + throw new InvalidOperationException("ShutdownHooks cannot contain null entries"); + } + } + + return new LambdaWorkerHandlerState( + (TemporalClientConnectOptions)config.ClientOptions.Clone(), + (TemporalWorkerOptions)config.WorkerOptions.Clone(), + config.ShutdownDeadlineBuffer, + new List>(config.ShutdownHooks), + handlerOptions); + } + + private static void ApplyDeploymentVersion( + TemporalWorkerOptions workerOptions, + WorkerDeploymentVersion version) + { + var defaultVersioningBehavior = + workerOptions.DeploymentOptions?.DefaultVersioningBehavior is { } behavior && + behavior != VersioningBehavior.Unspecified ? + behavior : + VersioningBehavior.AutoUpgrade; + workerOptions.DeploymentOptions = new WorkerDeploymentOptions( + version, + useWorkerVersioning: true) + { + DefaultVersioningBehavior = defaultVersioningBehavior, + }; + workerOptions.BuildId = null; + workerOptions.UseWorkerVersioning = false; + } + + private static void ClearConcurrencyLimitsIfTunerSet(TemporalWorkerOptions workerOptions) + { + if (workerOptions.Tuner == null) + { + return; + } + + workerOptions.MaxConcurrentActivities = null; + workerOptions.MaxConcurrentWorkflowTasks = null; + workerOptions.MaxConcurrentLocalActivities = null; + workerOptions.MaxConcurrentNexusTasks = null; + } + + private static CancellationTokenSource CreateHookCancellationTokenSource( + TimeSpan remainingTime) + { + if (remainingTime <= TimeSpan.Zero) + { + var cts = new CancellationTokenSource(); + cts.Cancel(); + return cts; + } + return new CancellationTokenSource(remainingTime); + } + + private static string? GetEnvironmentVariable( + ClientEnvConfig.ProfileLoadOptions options, + string name) + { + if (options.OverrideEnvVars != null) + { + return options.OverrideEnvVars.TryGetValue(name, out var value) ? value : null; + } + + return Environment.GetEnvironmentVariable(name); + } + + private static void LogLine(ILambdaContext context, string message) => + context.Logger?.LogLine(message); + + private sealed class LambdaWorkerHandlerState + { + private readonly TemporalClientConnectOptions clientOptions; + private readonly TemporalWorkerOptions workerOptions; + private readonly TimeSpan shutdownDeadlineBuffer; + private readonly IReadOnlyCollection> shutdownHooks; + private readonly TemporalLambdaWorkerHandlerOptions handlerOptions; + + public LambdaWorkerHandlerState( + TemporalClientConnectOptions clientOptions, + TemporalWorkerOptions workerOptions, + TimeSpan shutdownDeadlineBuffer, + IReadOnlyCollection> shutdownHooks, + TemporalLambdaWorkerHandlerOptions handlerOptions) + { + this.clientOptions = clientOptions; + this.workerOptions = workerOptions; + this.shutdownDeadlineBuffer = shutdownDeadlineBuffer; + this.shutdownHooks = shutdownHooks; + this.handlerOptions = handlerOptions; + } + + public async Task HandleAsync(object? input, ILambdaContext context) + { + _ = input; + if (context == null) + { + throw new ArgumentNullException(nameof(context)); + } + + var initialWorkBudget = context.RemainingTime - shutdownDeadlineBuffer; + if (initialWorkBudget <= TimeSpan.Zero) + { + throw new InvalidOperationException( + "Lambda remaining time is too low to start a Temporal worker"); + } + if (initialWorkBudget < LowWorkBudgetWarningThreshold) + { + LogLine( + context, + $"WARNING: Temporal Lambda worker budget is only {initialWorkBudget.TotalSeconds:F3} seconds"); + } + + try + { + var invocationClientOptions = + (TemporalClientConnectOptions)clientOptions.Clone(); + if (invocationClientOptions.Identity == null) + { + invocationClientOptions.Identity = + $"{context.AwsRequestId}@{context.InvokedFunctionArn}"; + } + + var invocationWorkerOptions = + (TemporalWorkerOptions)workerOptions.Clone(); + var client = await handlerOptions.ConnectClientAsync( + invocationClientOptions).ConfigureAwait(false); + using (var worker = handlerOptions.CreateWorker( + client, + invocationWorkerOptions)) + { + var workBudget = context.RemainingTime - shutdownDeadlineBuffer; + if (workBudget <= TimeSpan.Zero) + { + return; + } + + using (var runCts = new CancellationTokenSource(workBudget)) + { + await ExecuteWorkerAsync(worker, runCts).ConfigureAwait(false); + } + } + } + finally + { + await RunShutdownHooksAsync(context).ConfigureAwait(false); + } + } + + private static async Task ExecuteWorkerAsync( + ILambdaWorker worker, + CancellationTokenSource runCts) + { + try + { + await worker.ExecuteAsync(runCts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (runCts.IsCancellationRequested) + { + // Expected path when the Lambda worker reaches its run budget. + } + } + + private async Task RunShutdownHooksAsync(ILambdaContext context) + { + using (var hookCts = CreateHookCancellationTokenSource(context.RemainingTime)) + { + foreach (var hook in shutdownHooks) + { +#pragma warning disable CA1031 // All hook failures are logged and later hooks still run. + try + { + await hook(hookCts.Token).ConfigureAwait(false); + } + catch (Exception e) + { + LogLine( + context, + $"ERROR: Temporal Lambda worker shutdown hook failed: {e}"); + } +#pragma warning restore CA1031 + } + } + } + } + + private sealed class AsyncLambdaWorkerHandlerState + { + private readonly WorkerDeploymentVersion version; + private readonly Func configureAsync; + private readonly TemporalLambdaWorkerHandlerOptions handlerOptions; + + public AsyncLambdaWorkerHandlerState( + WorkerDeploymentVersion version, + Func configureAsync, + TemporalLambdaWorkerHandlerOptions handlerOptions) + { + this.version = version; + this.configureAsync = configureAsync; + this.handlerOptions = handlerOptions; + } + + public async Task HandleAsync(object? input, ILambdaContext context) + { + if (context == null) + { + throw new ArgumentNullException(nameof(context)); + } + + var config = CreateConfig(version, handlerOptions); + await configureAsync(config).ConfigureAwait(false); + var state = PrepareHandlerState(version, config, handlerOptions); + await state.HandleAsync(input, context).ConfigureAwait(false); + } + } + } +} diff --git a/src/Temporalio.Extensions.Aws.Lambda/TemporalLambdaWorkerHandlerOptions.cs b/src/Temporalio.Extensions.Aws.Lambda/TemporalLambdaWorkerHandlerOptions.cs new file mode 100644 index 00000000..61a92176 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda/TemporalLambdaWorkerHandlerOptions.cs @@ -0,0 +1,40 @@ +using System; +using System.Threading.Tasks; +using Temporalio.Client; +using Temporalio.Common.EnvConfig; +using Temporalio.Worker; + +namespace Temporalio.Extensions.Aws.Lambda +{ + /// + /// Internal test seams for the Lambda worker handler. + /// + internal sealed class TemporalLambdaWorkerHandlerOptions + { + /// + /// Gets or sets the environment variable reader. + /// + public Func GetEnvironmentVariable { get; set; } = + Environment.GetEnvironmentVariable; + + /// + /// Gets or sets the client configuration loader. + /// + public Func? + LoadClientConnectOptions + { get; set; } + + /// + /// Gets or sets the client connection factory. + /// + public Func> ConnectClientAsync { get; set; } = + async options => await TemporalClient.ConnectAsync(options).ConfigureAwait(false); + + /// + /// Gets or sets the worker factory. + /// + public Func CreateWorker { get; set; } = + (client, options) => new TemporalWorkerAdapter( + new TemporalWorker((IWorkerClient)client, options)); + } +} diff --git a/src/Temporalio.Extensions.Aws.Lambda/TemporalWorkerAdapter.cs b/src/Temporalio.Extensions.Aws.Lambda/TemporalWorkerAdapter.cs new file mode 100644 index 00000000..b9a4ab27 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda/TemporalWorkerAdapter.cs @@ -0,0 +1,27 @@ +using System.Threading; +using System.Threading.Tasks; +using Temporalio.Worker; + +namespace Temporalio.Extensions.Aws.Lambda +{ + /// + /// Adapter from to . + /// + internal sealed class TemporalWorkerAdapter : ILambdaWorker + { + private readonly TemporalWorker worker; + + /// + /// Initializes a new instance of the class. + /// + /// Worker to adapt. + public TemporalWorkerAdapter(TemporalWorker worker) => this.worker = worker; + + /// + public Task ExecuteAsync(CancellationToken stoppingToken) => + worker.ExecuteAsync(stoppingToken); + + /// + public void Dispose() => worker.Dispose(); + } +} diff --git a/src/Temporalio.Extensions.Aws.Lambda/Temporalio.Extensions.Aws.Lambda.csproj b/src/Temporalio.Extensions.Aws.Lambda/Temporalio.Extensions.Aws.Lambda.csproj new file mode 100644 index 00000000..605c9250 --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda/Temporalio.Extensions.Aws.Lambda.csproj @@ -0,0 +1,34 @@ + + + + Temporal SDK .NET AWS Lambda Worker Extension + + false + true + 9.0 + README.md + true + snupkg + netstandard2.0 + + + + + + + + + + + + + + <_Parameter1>Temporalio.Tests + + + + + + + + diff --git a/src/Temporalio.Extensions.Aws.Lambda/packages.lock.json b/src/Temporalio.Extensions.Aws.Lambda/packages.lock.json new file mode 100644 index 00000000..dbf0002c --- /dev/null +++ b/src/Temporalio.Extensions.Aws.Lambda/packages.lock.json @@ -0,0 +1,164 @@ +{ + "version": 2, + "dependencies": { + ".NETStandard,Version=v2.0": { + "Amazon.Lambda.Core": { + "type": "Direct", + "requested": "[3.1.0, )", + "resolved": "3.1.0", + "contentHash": "uZZ2k5lMoB9OzPTmKkkEKpyFcnLxcb7FxtxrA3+HBg/sooTzu402iCcSk5r+N62Qokhwr4Q9cbaVJSM6Dln3aA==" + }, + "Microsoft.VisualStudio.Threading.Analyzers": { + "type": "Direct", + "requested": "[17.4.33, )", + "resolved": "17.4.33", + "contentHash": "gpf500WFCqnhJmbIEDvVYgeNLpHZ6PAyhcewyqqpE8UoKNHoCDdMgogulYuJERbRaGqywqD1pRwPNplVteNZ8g==" + }, + "NETStandard.Library": { + "type": "Direct", + "requested": "[2.0.3, )", + "resolved": "2.0.3", + "contentHash": "st47PosZSHrjECdjeIzZQbzivYBJFv6P2nv4cj2ypdI204DO+vZ7l5raGMiX4eXMJ53RfOIg+/s4DHVZ54Nu2A==", + "dependencies": { + "Microsoft.NETCore.Platforms": "1.1.0" + } + }, + "StyleCop.Analyzers": { + "type": "Direct", + "requested": "[1.2.0-beta.435, )", + "resolved": "1.2.0-beta.435", + "contentHash": "TADk7vdGXtfTnYCV7GyleaaRTQjfoSfZXprQrVMm7cSJtJbFc1QIbWPyLvrgrfGdfHbGmUPvaN4ODKNxg2jgPQ==", + "dependencies": { + "StyleCop.Analyzers.Unstable": "1.2.0.435" + } + }, + "Microsoft.Bcl.AsyncInterfaces": { + "type": "Transitive", + "resolved": "9.0.4", + "contentHash": "9VGI5kxIvrNG2mqLQZnUR6y/3fcnygD8eNpHR+CqfbnIXvea6nehnYknDKQTxZVPMpzpNca+7DxLBmpdB3q0Bw==", + "dependencies": { + "System.Threading.Tasks.Extensions": "4.5.4" + } + }, + "Microsoft.NETCore.Platforms": { + "type": "Transitive", + "resolved": "1.1.0", + "contentHash": "kz0PEW2lhqygehI/d6XsPCQzD7ff7gUJaVGPVETX611eadGsA3A877GdSlU0LRVMCTH/+P3o2iDTak+S08V2+A==" + }, + "StyleCop.Analyzers.Unstable": { + "type": "Transitive", + "resolved": "1.2.0.435", + "contentHash": "ouwPWZxbOV3SmCZxIRqHvljkSzkCyi1tDoMzQtDb/bRP8ctASV/iRJr+A2Gdj0QLaLmWnqTWDrH82/iP+X80Lg==" + }, + "System.Buffers": { + "type": "Transitive", + "resolved": "4.5.1", + "contentHash": "Rw7ijyl1qqRS0YQD/WycNst8hUUMgrMH4FCn1nNm27M4VxchZ1js3fVjQaANHO5f3sN4isvP4a+Met9Y4YomAg==" + }, + "System.IO.Pipelines": { + "type": "Transitive", + "resolved": "9.0.4", + "contentHash": "luF2Xba+lTe2GOoNQdZLe8q7K6s7nSpWZl9jIwWNMszN4/Yv0lmxk9HISgMmwdyZ83i3UhAGXaSY9o6IJBUuuA==", + "dependencies": { + "System.Buffers": "4.5.1", + "System.Memory": "4.5.5", + "System.Threading.Tasks.Extensions": "4.5.4" + } + }, + "System.Memory": { + "type": "Transitive", + "resolved": "4.5.5", + "contentHash": "XIWiDvKPXaTveaB7HVganDlOCRoj03l+jrwNvcge/t8vhGYKvqV+dMv6G4SAX2NoNmN0wZfVPTAlFwZcZvVOUw==", + "dependencies": { + "System.Buffers": "4.5.1", + "System.Numerics.Vectors": "4.4.0", + "System.Runtime.CompilerServices.Unsafe": "4.5.3" + } + }, + "System.Numerics.Vectors": { + "type": "Transitive", + "resolved": "4.4.0", + "contentHash": "UiLzLW+Lw6HLed1Hcg+8jSRttrbuXv7DANVj0DkL9g6EnnzbL75EB7EWsw5uRbhxd/4YdG8li5XizGWepmG3PQ==" + }, + "System.Runtime.CompilerServices.Unsafe": { + "type": "Transitive", + "resolved": "6.0.0", + "contentHash": "/iUeP3tq1S0XdNNoMz5C9twLSrM/TH+qElHkXWaPvuNOt+99G75NrV0OS2EqHx5wMN7popYjpc8oTjC1y16DLg==" + }, + "System.Text.Encodings.Web": { + "type": "Transitive", + "resolved": "9.0.4", + "contentHash": "V+5cCPpk1S2ngekUs9nDrQLHGiWFZMg8BthADQr+Fwi59a8DdHFu26S2oi9Bfgv+d67bqmkPqctJXMEXiimXUg==", + "dependencies": { + "System.Buffers": "4.5.1", + "System.Memory": "4.5.5", + "System.Runtime.CompilerServices.Unsafe": "6.0.0" + } + }, + "System.Threading.Tasks.Extensions": { + "type": "Transitive", + "resolved": "4.5.4", + "contentHash": "zteT+G8xuGu6mS+mzDzYXbzS7rd3K6Fjb9RiZlYlJPam2/hU7JCBZBVEcywNuR+oZ1ncTvc/cq0faRr3P01OVg==", + "dependencies": { + "System.Runtime.CompilerServices.Unsafe": "4.5.3" + } + }, + "temporalio": { + "type": "Project", + "dependencies": { + "Google.Protobuf": "[3.26.1, )", + "Microsoft.Bcl.HashCode": "[6.0.0, )", + "Microsoft.Extensions.Logging.Abstractions": "[2.2.0, )", + "NexusRpc": "[0.3.0, )", + "System.Text.Json": "[9.0.4, )" + } + }, + "Google.Protobuf": { + "type": "CentralTransitive", + "requested": "[3.26.1, )", + "resolved": "3.26.1", + "contentHash": "CHZX8zXqhF/fdUtd+AYzew8T2HFkAoe5c7lbGxZY/qryAlQXckDvM5BfOJjXlMS7kyICqQTMszj4w1bX5uBJ/w==", + "dependencies": { + "System.Memory": "4.5.3", + "System.Runtime.CompilerServices.Unsafe": "4.5.2" + } + }, + "Microsoft.Bcl.HashCode": { + "type": "CentralTransitive", + "requested": "[6.0.0, )", + "resolved": "6.0.0", + "contentHash": "GI4jcoi6eC9ZhNOQylIBaWOQjyGaR8T6N3tC1u8p3EXfndLCVNNWa+Zp+ocjvvS3kNBN09Zma2HXL0ezO0dRfw==" + }, + "Microsoft.Extensions.Logging.Abstractions": { + "type": "CentralTransitive", + "requested": "[2.2.0, )", + "resolved": "2.2.0", + "contentHash": "B2WqEox8o+4KUOpL7rZPyh6qYjik8tHi2tN8Z9jZkHzED8ElYgZa/h6K+xliB435SqUcWT290Fr2aa8BtZjn8A==" + }, + "NexusRpc": { + "type": "CentralTransitive", + "requested": "[0.3.0, )", + "resolved": "0.3.0", + "contentHash": "Kr+NMSZ5428AvxpzShdJcQxc9w6HT8SM6FXQMekC4K9wGpmC1m/L2pQJydpvVTwRBu3qAIYKPI37KWexF4Gtcg==", + "dependencies": { + "Microsoft.Bcl.HashCode": "6.0.0" + } + }, + "System.Text.Json": { + "type": "CentralTransitive", + "requested": "[9.0.4, )", + "resolved": "9.0.4", + "contentHash": "pYtmpcO6R3Ef1XilZEHgXP2xBPVORbYEzRP7dl0IAAbN8Dm+kfwio8aCKle97rAWXOExr292MuxWYurIuwN62g==", + "dependencies": { + "Microsoft.Bcl.AsyncInterfaces": "9.0.4", + "System.Buffers": "4.5.1", + "System.IO.Pipelines": "9.0.4", + "System.Memory": "4.5.5", + "System.Runtime.CompilerServices.Unsafe": "6.0.0", + "System.Text.Encodings.Web": "9.0.4", + "System.Threading.Tasks.Extensions": "4.5.4" + } + } + } + } +} \ No newline at end of file diff --git a/src/Temporalio/Bridge/EnvConfig.cs b/src/Temporalio/Bridge/EnvConfig.cs index 648acde4..0c7efd64 100644 --- a/src/Temporalio/Bridge/EnvConfig.cs +++ b/src/Temporalio/Bridge/EnvConfig.cs @@ -24,7 +24,7 @@ internal static class EnvConfig try { - var envVarsRef = options.OverrideEnvVars?.Count > 0 + var envVarsRef = options.OverrideEnvVars != null ? scope.ByteArray(JsonSerializer.Serialize(options.OverrideEnvVars)) : ByteArrayRef.Empty.Ref; @@ -60,7 +60,7 @@ public static ClientEnvConfig.Profile LoadClientConfigProfile( try { - var envVarsRef = options.OverrideEnvVars?.Count > 0 + var envVarsRef = options.OverrideEnvVars != null ? scope.ByteArray(JsonSerializer.Serialize(options.OverrideEnvVars)) : ByteArrayRef.Empty.Ref; diff --git a/src/Temporalio/Common/EnvConfig/ClientEnvConfig.cs b/src/Temporalio/Common/EnvConfig/ClientEnvConfig.cs index 82cea6c3..d0c20174 100644 --- a/src/Temporalio/Common/EnvConfig/ClientEnvConfig.cs +++ b/src/Temporalio/Common/EnvConfig/ClientEnvConfig.cs @@ -206,7 +206,7 @@ public sealed record Tls( { if (Disabled == true) { - return null; + return new TlsOptions { Disabled = true }; } return new TlsOptions @@ -465,4 +465,4 @@ public static DataSource FromBytes(byte[] data) return null; } } -} \ No newline at end of file +} diff --git a/src/Temporalio/Temporalio.csproj b/src/Temporalio/Temporalio.csproj index 817fe019..eb0f5aec 100644 --- a/src/Temporalio/Temporalio.csproj +++ b/src/Temporalio/Temporalio.csproj @@ -28,6 +28,9 @@ <_Parameter1>Temporalio.Tests + + <_Parameter1>Temporalio.Extensions.Aws.Lambda + diff --git a/src/Temporalio/Worker/TemporalWorker.cs b/src/Temporalio/Worker/TemporalWorker.cs index deea8248..7d924d93 100644 --- a/src/Temporalio/Worker/TemporalWorker.cs +++ b/src/Temporalio/Worker/TemporalWorker.cs @@ -57,6 +57,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) { plugin.ConfigureWorker(Options); } + Options.ApplyPostPluginConfiguration(); // Ensure later accesses use the modified version of options. options = Options; diff --git a/src/Temporalio/Worker/TemporalWorkerOptions.cs b/src/Temporalio/Worker/TemporalWorkerOptions.cs index e3953a91..ee335d9b 100644 --- a/src/Temporalio/Worker/TemporalWorkerOptions.cs +++ b/src/Temporalio/Worker/TemporalWorkerOptions.cs @@ -376,6 +376,14 @@ public TemporalWorkerOptions() internal Func WorkflowInstanceFactory { get; set; } = DefaultWorkflowInstanceFactory; + /// + /// Gets or sets a function to run after worker plugins configure options. + /// + /// + /// Don't expose this until there's a use case. + /// + internal Action? PostPluginConfiguration { get; set; } + /// /// Add the given delegate with as an activity. This is /// usually a method reference. @@ -517,5 +525,10 @@ internal void OnTaskCompleted(WorkflowInstance instance, Exception? failureExcep handler(instance, new(instance, failureException)); } } + + /// + /// Run post-plugin configuration. + /// + internal void ApplyPostPluginConfiguration() => PostPluginConfiguration?.Invoke(this); } } diff --git a/tests/Temporalio.SimpleBench/Program.cs b/tests/Temporalio.SimpleBench/Program.cs index d8492b6d..e16add84 100644 --- a/tests/Temporalio.SimpleBench/Program.cs +++ b/tests/Temporalio.SimpleBench/Program.cs @@ -153,4 +153,4 @@ public record Results( TimeSpan StartDuration, TimeSpan ResultDuration, decimal WorkflowsPerSecond); -} \ No newline at end of file +} diff --git a/tests/Temporalio.Tests/Common/EnvConfig/ClientConfigTests.cs b/tests/Temporalio.Tests/Common/EnvConfig/ClientConfigTests.cs index baa467f0..45632842 100644 --- a/tests/Temporalio.Tests/Common/EnvConfig/ClientConfigTests.cs +++ b/tests/Temporalio.Tests/Common/EnvConfig/ClientConfigTests.cs @@ -6,7 +6,7 @@ namespace Temporalio.Tests.Common.EnvConfig { /// /// Environment configuration tests following Python/TypeScript patterns for cross-SDK consistency. - /// Comprehensive 34-test suite covering all aspects of environment configuration. + /// Comprehensive test suite covering all aspects of environment configuration. /// public class ClientConfigTests : TestBase { @@ -196,7 +196,7 @@ public void Test_Profile_Null_Address_Preserves_Null_In_Connection_Options() Assert.Equal("test-namespace", options.Namespace); } - // === ENVIRONMENT VARIABLES TESTS (4 tests) === + // === ENVIRONMENT VARIABLES TESTS (5 tests) === [Fact] public void Test_Load_Profile_Grpc_Meta_Env_Overrides() { @@ -287,6 +287,32 @@ public void Test_Load_Profile_Disable_Env() Assert.Equal("default-address", profile.Address); } + [Fact] + public void EmptyOverrideEnvVarsSuppressesSystemEnvironment() + { + var previousAddress = Environment.GetEnvironmentVariable("TEMPORAL_ADDRESS"); + var previousNamespace = Environment.GetEnvironmentVariable("TEMPORAL_NAMESPACE"); + try + { + Environment.SetEnvironmentVariable("TEMPORAL_ADDRESS", "system-address"); + Environment.SetEnvironmentVariable("TEMPORAL_NAMESPACE", "system-namespace"); + + var profile = ClientEnvConfig.Profile.Load(new ClientEnvConfig.ProfileLoadOptions + { + DisableFile = true, + OverrideEnvVars = new Dictionary(), + }); + + Assert.Null(profile.Address); + Assert.Null(profile.Namespace); + } + finally + { + Environment.SetEnvironmentVariable("TEMPORAL_ADDRESS", previousAddress); + Environment.SetEnvironmentVariable("TEMPORAL_NAMESPACE", previousNamespace); + } + } + // === CONTROL FLAGS TESTS (3 tests) === [Fact] public void Test_Load_Profile_Disable_File() @@ -465,7 +491,8 @@ public void Test_Load_Profile_Tls_Options() Assert.True(profileDisabled.Tls.Disabled); var optionsDisabled = profileDisabled.ToClientConnectionOptions(); - Assert.Null(optionsDisabled.Tls); + Assert.NotNull(optionsDisabled.Tls); + Assert.True(optionsDisabled.Tls.Disabled); // Test TLS with certs var profileCerts = ClientEnvConfig.Profile.Load(new ClientEnvConfig.ProfileLoadOptions @@ -682,7 +709,9 @@ public void Test_Tls_Disabled_Tri_State_Behavior() ConfigSource = DataSource.FromUTF8String(tomlTrue), }); Assert.True(profileTrue.Tls!.Disabled); // explicitly disabled=true - Assert.Null(profileTrue.ToClientConnectionOptions().Tls); // TLS disabled even with API key + var optionsTrue = profileTrue.ToClientConnectionOptions(); + Assert.NotNull(optionsTrue.Tls); + Assert.True(optionsTrue.Tls.Disabled); // TLS disabled even with API key } // === ERROR HANDLING TESTS (4 tests) === diff --git a/tests/Temporalio.Tests/Extensions/Aws/Lambda/OpenTelemetry/LambdaWorkerOpenTelemetryTests.cs b/tests/Temporalio.Tests/Extensions/Aws/Lambda/OpenTelemetry/LambdaWorkerOpenTelemetryTests.cs new file mode 100644 index 00000000..f0e0c3e5 --- /dev/null +++ b/tests/Temporalio.Tests/Extensions/Aws/Lambda/OpenTelemetry/LambdaWorkerOpenTelemetryTests.cs @@ -0,0 +1,295 @@ +namespace Temporalio.Tests.Extensions.Aws.Lambda.OpenTelemetry; + +using global::OpenTelemetry; +using global::OpenTelemetry.Trace; +using Temporalio.Client; +using Temporalio.Client.Interceptors; +using Temporalio.Extensions.Aws.Lambda; +using Temporalio.Extensions.Aws.Lambda.OpenTelemetry; +using Xunit; +using TemporalOpenTelemetry = Temporalio.Extensions.OpenTelemetry; + +public class LambdaWorkerOpenTelemetryTests +{ + private const string OTelExporterOtlpEndpointEnvironmentVariable = + "OTEL_EXPORTER_OTLP_ENDPOINT"; + + private const string OTelServiceNameEnvironmentVariable = "OTEL_SERVICE_NAME"; + private const string LambdaFunctionNameEnvironmentVariable = "AWS_LAMBDA_FUNCTION_NAME"; + + [Fact] + public void ApplyDefaults_NullConfigThrows() + { + Assert.Throws(() => + LambdaWorkerOpenTelemetry.ApplyDefaults(null!)); + } + + [Fact] + public void ResolveOptions_ExplicitOptionsWin() + { + using var env = new EnvironmentScope( + KeyValuePair.Create( + OTelExporterOtlpEndpointEnvironmentVariable, + "http://env:4317"), + KeyValuePair.Create( + OTelServiceNameEnvironmentVariable, + "env-service"), + KeyValuePair.Create( + LambdaFunctionNameEnvironmentVariable, + "lambda-service")); + var resolved = LambdaWorkerOpenTelemetry.ResolveOptions(new LambdaWorkerOpenTelemetryOptions + { + CollectorEndpoint = "http://explicit:4317", + ServiceName = "explicit-service", + MetricsExportInterval = TimeSpan.FromSeconds(3), + }); + + Assert.Equal(new Uri("http://explicit:4317"), resolved.CollectorEndpoint); + Assert.Equal("explicit-service", resolved.ServiceName); + Assert.Equal(TimeSpan.FromSeconds(3), resolved.MetricsExportInterval); + } + + [Fact] + public void ResolveOptions_EnvironmentWinsOverFallbacks() + { + using var env = new EnvironmentScope( + KeyValuePair.Create( + OTelExporterOtlpEndpointEnvironmentVariable, + "http://env:4317"), + KeyValuePair.Create( + OTelServiceNameEnvironmentVariable, + "env-service"), + KeyValuePair.Create( + LambdaFunctionNameEnvironmentVariable, + "lambda-service")); + var resolved = LambdaWorkerOpenTelemetry.ResolveOptions(); + + Assert.Equal(new Uri("http://env:4317"), resolved.CollectorEndpoint); + Assert.Equal("env-service", resolved.ServiceName); + Assert.Equal(TimeSpan.FromSeconds(10), resolved.MetricsExportInterval); + } + + [Fact] + public void ResolveOptions_LambdaFunctionNameWinsOverDefaultServiceName() + { + using var env = new EnvironmentScope( + KeyValuePair.Create( + OTelExporterOtlpEndpointEnvironmentVariable, + null), + KeyValuePair.Create( + OTelServiceNameEnvironmentVariable, + null), + KeyValuePair.Create( + LambdaFunctionNameEnvironmentVariable, + "lambda-service")); + var resolved = LambdaWorkerOpenTelemetry.ResolveOptions(); + + Assert.Equal(new Uri("http://localhost:4317"), resolved.CollectorEndpoint); + Assert.Equal("lambda-service", resolved.ServiceName); + } + + [Fact] + public void ResolveOptions_UsesFallbacks() + { + using var env = new EnvironmentScope( + KeyValuePair.Create( + OTelExporterOtlpEndpointEnvironmentVariable, + null), + KeyValuePair.Create( + OTelServiceNameEnvironmentVariable, + null), + KeyValuePair.Create( + LambdaFunctionNameEnvironmentVariable, + null)); + var resolved = LambdaWorkerOpenTelemetry.ResolveOptions(); + + Assert.Equal(new Uri("http://localhost:4317"), resolved.CollectorEndpoint); + Assert.Equal("temporal-lambda-worker", resolved.ServiceName); + Assert.Equal(TimeSpan.FromSeconds(10), resolved.MetricsExportInterval); + } + + [Fact] + public void ResolveOptions_InvalidMetricsExportIntervalThrows() + { + Assert.Throws(() => + LambdaWorkerOpenTelemetry.ResolveOptions( + new LambdaWorkerOpenTelemetryOptions + { + MetricsExportInterval = TimeSpan.Zero, + })); + } + + [Fact] + public void ApplyDefaults_PreservesInterceptorsAndAddsTracing() + { + var existingInterceptor = new NoopClientInterceptor(); + var config = new LambdaWorkerConfig + { + ClientOptions = new TemporalClientConnectOptions + { + Interceptors = new IClientInterceptor[] { existingInterceptor }, + }, + }; + + LambdaWorkerOpenTelemetry.ApplyDefaults(config); + + var interceptors = Assert.IsAssignableFrom>( + config.ClientOptions.Interceptors); + Assert.Equal(2, interceptors.Count); + Assert.Same(existingInterceptor, interceptors.First()); + Assert.IsType(interceptors.Last()); + } + + [Fact] + public async Task ApplyDefaults_ConfiguresRuntimeAndShutdownHook() + { + var config = new LambdaWorkerConfig + { + ShutdownDeadlineBuffer = TimeSpan.FromMilliseconds(1), + }; + config.ShutdownHooks.Add(_ => Task.CompletedTask); + + LambdaWorkerOpenTelemetry.ApplyDefaults( + config, + new LambdaWorkerOpenTelemetryOptions + { + CollectorEndpoint = "http://localhost:4317", + ServiceName = "test-service", + MetricsExportInterval = TimeSpan.FromSeconds(1), + }); + + Assert.NotNull(config.ClientOptions.Runtime); + Assert.Equal(2, config.ShutdownHooks.Count); + await config.ShutdownHooks[1](CancellationToken.None); + } + + [Fact] + public async Task ForceFlushAsync_RunsForceFlushOffCallerThread() + { + using var flushStarted = new ManualResetEventSlim(); + using var releaseFlush = new ManualResetEventSlim(); +#pragma warning disable CA2000 // Tracer provider owns the processor/exporter. + using var provider = Sdk.CreateTracerProviderBuilder(). + AddProcessor(new SimpleActivityExportProcessor( + new BlockingForceFlushExporter(flushStarted, releaseFlush))). + Build(); +#pragma warning restore CA2000 + +#pragma warning disable CA2025 // The task is completed before the provider exits scope. + var flushTask = LambdaWorkerOpenTelemetry.ForceFlushAsync( + provider, + TimeSpan.FromSeconds(10), + CancellationToken.None); +#pragma warning restore CA2025 + + try + { + Assert.True(flushStarted.Wait(TimeSpan.FromSeconds(5))); + Assert.False(flushTask.IsCompleted); + } + finally + { + releaseFlush.Set(); + await flushTask.WaitAsync(TimeSpan.FromSeconds(5)); + } + } + + [Fact] + public async Task ForceFlushAsync_ReturnsWhenCancellationRequested() + { + using var flushStarted = new ManualResetEventSlim(); + using var releaseFlush = new ManualResetEventSlim(); + using var flushCompleted = new ManualResetEventSlim(); +#pragma warning disable CA2000 // Tracer provider owns the processor/exporter. + using var provider = Sdk.CreateTracerProviderBuilder(). + AddProcessor(new SimpleActivityExportProcessor( + new BlockingForceFlushExporter(flushStarted, releaseFlush, flushCompleted))). + Build(); +#pragma warning restore CA2000 + using var cts = new CancellationTokenSource(); + +#pragma warning disable CA2025 // The provider exits scope after the blocking flush is released. + var flushTask = LambdaWorkerOpenTelemetry.ForceFlushAsync( + provider, + TimeSpan.FromSeconds(10), + cts.Token); +#pragma warning restore CA2025 + + try + { + Assert.True(flushStarted.Wait(TimeSpan.FromSeconds(5))); + await cts.CancelAsync(); + await flushTask.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.False(flushCompleted.IsSet); + } + finally + { + releaseFlush.Set(); + Assert.True(flushCompleted.Wait(TimeSpan.FromSeconds(5))); + } + } + + private sealed class BlockingForceFlushExporter : + BaseExporter + { + private readonly ManualResetEventSlim flushStarted; + private readonly ManualResetEventSlim releaseFlush; + private readonly ManualResetEventSlim? flushCompleted; + + public BlockingForceFlushExporter( + ManualResetEventSlim flushStarted, + ManualResetEventSlim releaseFlush, + ManualResetEventSlim? flushCompleted = null) + { + this.flushStarted = flushStarted; + this.releaseFlush = releaseFlush; + this.flushCompleted = flushCompleted; + } + + public override ExportResult Export(in Batch batch) => + ExportResult.Success; + + protected override bool OnForceFlush(int timeoutMilliseconds) + { + flushStarted.Set(); + try + { + return releaseFlush.Wait(timeoutMilliseconds); + } + finally + { + flushCompleted?.Set(); + } + } + } + + private sealed class NoopClientInterceptor : IClientInterceptor + { + public ClientOutboundInterceptor InterceptClient( + ClientOutboundInterceptor nextInterceptor) => nextInterceptor; + } + + private sealed class EnvironmentScope : IDisposable + { + private readonly IReadOnlyDictionary previousValues; + + public EnvironmentScope(params KeyValuePair[] values) + { + previousValues = values.ToDictionary( + pair => pair.Key, + pair => Environment.GetEnvironmentVariable(pair.Key)); + foreach (var pair in values) + { + Environment.SetEnvironmentVariable(pair.Key, pair.Value); + } + } + + public void Dispose() + { + foreach (var pair in previousValues) + { + Environment.SetEnvironmentVariable(pair.Key, pair.Value); + } + } + } +} diff --git a/tests/Temporalio.Tests/Extensions/Aws/Lambda/TemporalLambdaWorkerNonParallelDefinition.cs b/tests/Temporalio.Tests/Extensions/Aws/Lambda/TemporalLambdaWorkerNonParallelDefinition.cs new file mode 100644 index 00000000..4265f4ed --- /dev/null +++ b/tests/Temporalio.Tests/Extensions/Aws/Lambda/TemporalLambdaWorkerNonParallelDefinition.cs @@ -0,0 +1,8 @@ +namespace Temporalio.Tests.Extensions.Aws.Lambda; + +using Xunit; + +[CollectionDefinition("TemporalLambdaWorkerNonParallel", DisableParallelization = true)] +public sealed class TemporalLambdaWorkerNonParallelDefinition +{ +} diff --git a/tests/Temporalio.Tests/Extensions/Aws/Lambda/TemporalLambdaWorkerTests.cs b/tests/Temporalio.Tests/Extensions/Aws/Lambda/TemporalLambdaWorkerTests.cs new file mode 100644 index 00000000..3d03852b --- /dev/null +++ b/tests/Temporalio.Tests/Extensions/Aws/Lambda/TemporalLambdaWorkerTests.cs @@ -0,0 +1,1095 @@ +namespace Temporalio.Tests.Extensions.Aws.Lambda; + +using Amazon.Lambda.Core; +using Temporalio.Activities; +using Temporalio.Client; +using Temporalio.Common; +using Temporalio.Common.EnvConfig; +using Temporalio.Extensions.Aws.Lambda; +using Temporalio.Worker; +using Temporalio.Worker.Tuning; +using Temporalio.Workflows; +using Xunit; + +[Collection("TemporalLambdaWorkerNonParallel")] +public class TemporalLambdaWorkerTests +{ + private static readonly WorkerDeploymentVersion Version = new("deployment", "build"); + + [Fact] + public async Task CreateHandler_DefaultsAreAppliedAndUserOverridesWin() + { + var configureCalls = 0; + TemporalClientConnectOptions? capturedClientOptions = null; + TemporalWorkerOptions? capturedWorkerOptions = null; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + configureCalls++; + Assert.Equal(2, config.WorkerOptions.MaxConcurrentActivities); + Assert.Equal(10, config.WorkerOptions.MaxConcurrentWorkflowTasks); + Assert.Equal(2, config.WorkerOptions.MaxConcurrentLocalActivities); + Assert.Equal(5, config.WorkerOptions.MaxConcurrentNexusTasks); + Assert.Equal(TimeSpan.FromSeconds(5), config.WorkerOptions.GracefulShutdownTimeout); + Assert.Equal(30, config.WorkerOptions.MaxCachedWorkflows); + Assert.Equal(2, config.WorkerOptions.MaxConcurrentWorkflowTaskPolls); + Assert.Equal(1, config.WorkerOptions.MaxConcurrentActivityTaskPolls); + Assert.Equal(1, config.WorkerOptions.MaxConcurrentNexusTaskPolls); + Assert.Null(config.WorkerOptions.WorkflowTaskPollerBehavior); + Assert.Null(config.WorkerOptions.ActivityTaskPollerBehavior); + Assert.Null(config.WorkerOptions.NexusTaskPollerBehavior); + Assert.True(config.WorkerOptions.DisableEagerActivityExecution); + Assert.NotNull(config.WorkerOptions.DeploymentOptions); + Assert.Equal(Version, config.WorkerOptions.DeploymentOptions.Version); + Assert.True(config.WorkerOptions.DeploymentOptions.UseWorkerVersioning); + Assert.Equal( + VersioningBehavior.AutoUpgrade, + config.WorkerOptions.DeploymentOptions.DefaultVersioningBehavior); + Assert.Equal("env-task-queue", config.WorkerOptions.TaskQueue); + Assert.Equal("loaded-address", config.ClientOptions.TargetHost); + Assert.Equal("loaded-namespace", config.ClientOptions.Namespace); + + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "configured-task-queue"; + config.WorkerOptions.MaxConcurrentActivities = 8; + config.WorkerOptions.MaxConcurrentActivityTaskPolls = 4; + config.WorkerOptions.MaxCachedWorkflows = 12; + config.WorkerOptions.DisableEagerActivityExecution = false; + config.WorkerOptions.DeploymentOptions = new WorkerDeploymentOptions( + new WorkerDeploymentVersion("ignored", "ignored"), + useWorkerVersioning: false) + { + DefaultVersioningBehavior = VersioningBehavior.Pinned, + }; + config.WorkerOptions.Activities.Add(DummyActivity()); + }, + new TemporalLambdaWorkerHandlerOptions + { + LoadClientConnectOptions = _ => new TemporalClientConnectOptions + { + TargetHost = "loaded-address", + Namespace = "loaded-namespace", + }, + GetEnvironmentVariable = name => + name == "TEMPORAL_TASK_QUEUE" ? "env-task-queue" : null, + ConnectClientAsync = options => + { + capturedClientOptions = options; + return Task.FromResult(new object()); + }, + CreateWorker = (_, options) => + { + capturedWorkerOptions = options; + return new FakeLambdaWorker(_ => Task.CompletedTask); + }, + }); + + Assert.Equal(1, configureCalls); + await handler(null, new FakeLambdaContext()); + + Assert.NotNull(capturedClientOptions); + Assert.NotNull(capturedWorkerOptions); + Assert.Equal("localhost:7233", capturedClientOptions.TargetHost); + Assert.Equal("loaded-namespace", capturedClientOptions.Namespace); + Assert.Equal("configured-task-queue", capturedWorkerOptions.TaskQueue); + Assert.Equal(8, capturedWorkerOptions.MaxConcurrentActivities); + Assert.Equal(10, capturedWorkerOptions.MaxConcurrentWorkflowTasks); + Assert.Equal(2, capturedWorkerOptions.MaxConcurrentLocalActivities); + Assert.Equal(5, capturedWorkerOptions.MaxConcurrentNexusTasks); + Assert.Equal(2, capturedWorkerOptions.MaxConcurrentWorkflowTaskPolls); + Assert.Equal(4, capturedWorkerOptions.MaxConcurrentActivityTaskPolls); + Assert.Equal(1, capturedWorkerOptions.MaxConcurrentNexusTaskPolls); + Assert.Null(capturedWorkerOptions.WorkflowTaskPollerBehavior); + Assert.Null(capturedWorkerOptions.ActivityTaskPollerBehavior); + Assert.Null(capturedWorkerOptions.NexusTaskPollerBehavior); + Assert.Equal(12, capturedWorkerOptions.MaxCachedWorkflows); + Assert.False(capturedWorkerOptions.DisableEagerActivityExecution); + Assert.NotNull(capturedWorkerOptions.DeploymentOptions); + Assert.Equal(Version, capturedWorkerOptions.DeploymentOptions.Version); + Assert.True(capturedWorkerOptions.DeploymentOptions.UseWorkerVersioning); + Assert.Equal( + VersioningBehavior.Pinned, + capturedWorkerOptions.DeploymentOptions.DefaultVersioningBehavior); +#pragma warning disable CS0618 // Verifying the Lambda helper clears legacy versioning options. + Assert.Null(capturedWorkerOptions.BuildId); + Assert.False(capturedWorkerOptions.UseWorkerVersioning); +#pragma warning restore CS0618 + } + + [Fact] + public async Task CreateHandler_DefaultsVersioningBehaviorToAutoUpgrade() + { + TemporalWorkerOptions? capturedWorkerOptions = null; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.WorkerOptions.AddWorkflow(); + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => Task.FromResult(new object()), + CreateWorker = (_, options) => + { + capturedWorkerOptions = options; + return new FakeLambdaWorker(_ => Task.CompletedTask); + }, + }); + + await handler(null, new FakeLambdaContext()); + + Assert.NotNull(capturedWorkerOptions); + Assert.NotNull(capturedWorkerOptions.DeploymentOptions); + Assert.Equal(Version, capturedWorkerOptions.DeploymentOptions.Version); + Assert.True(capturedWorkerOptions.DeploymentOptions.UseWorkerVersioning); + Assert.Equal( + VersioningBehavior.AutoUpgrade, + capturedWorkerOptions.DeploymentOptions.DefaultVersioningBehavior); + } + + [Fact] + public async Task CreateHandler_LoadsDefaultClientOptionsWhenNotOverridden() + { + var loadCalls = 0; + TemporalClientConnectOptions? capturedClientOptions = null; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.WorkerOptions.TaskQueue = "task-queue"; + }, + new TemporalLambdaWorkerHandlerOptions + { + LoadClientConnectOptions = _ => + { + loadCalls++; + return new TemporalClientConnectOptions + { + TargetHost = "loaded-address", + Namespace = "loaded-namespace", + }; + }, + ConnectClientAsync = options => + { + capturedClientOptions = options; + return Task.FromResult(new object()); + }, + CreateWorker = (_, _) => new FakeLambdaWorker(_ => Task.CompletedTask), + }); + + await handler(null, new FakeLambdaContext()); + + Assert.Equal(1, loadCalls); + Assert.NotNull(capturedClientOptions); + Assert.Equal("loaded-address", capturedClientOptions.TargetHost); + Assert.Equal("loaded-namespace", capturedClientOptions.Namespace); + } + + [Fact] + public async Task CreateHandler_ExplicitClientOptionsBypassDefaultConfigLoad() + { + TemporalClientConnectOptions? capturedClientOptions = null; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions = new TemporalClientConnectOptions + { + TargetHost = "explicit-address", + Namespace = "explicit-namespace", + }; + config.WorkerOptions.TaskQueue = "task-queue"; + }, + new TemporalLambdaWorkerHandlerOptions + { + LoadClientConnectOptions = _ => + throw new InvalidOperationException("Config should not be loaded"), + ConnectClientAsync = options => + { + capturedClientOptions = options; + return Task.FromResult(new object()); + }, + CreateWorker = (_, _) => new FakeLambdaWorker(_ => Task.CompletedTask), + }); + + await handler(null, new FakeLambdaContext()); + + Assert.NotNull(capturedClientOptions); + Assert.Equal("explicit-address", capturedClientOptions.TargetHost); + Assert.Equal("explicit-namespace", capturedClientOptions.Namespace); + } + + [Fact] + public async Task CreateHandler_ClearsConcurrencyDefaultsWhenTunerSet() + { + var tuner = WorkerTuner.CreateFixedSize( + workflowTaskSlots: 1, + activityTaskSlots: 2, + localActivitySlots: 3, + nexusTaskSlots: 4); + TemporalWorkerOptions? capturedWorkerOptions = null; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.WorkerOptions.Tuner = tuner; + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => Task.FromResult(new object()), + CreateWorker = (_, options) => + { + capturedWorkerOptions = options; + return new FakeLambdaWorker(_ => Task.CompletedTask); + }, + }); + + await handler(null, new FakeLambdaContext()); + + Assert.NotNull(capturedWorkerOptions); + Assert.Same(tuner, capturedWorkerOptions.Tuner); + Assert.Null(capturedWorkerOptions.MaxConcurrentActivities); + Assert.Null(capturedWorkerOptions.MaxConcurrentWorkflowTasks); + Assert.Null(capturedWorkerOptions.MaxConcurrentLocalActivities); + Assert.Null(capturedWorkerOptions.MaxConcurrentNexusTasks); + } + + [Fact] + public async Task CreateHandler_ClearsConcurrencyDefaultsWhenPluginSetsTuner() + { + var tuner = WorkerTuner.CreateFixedSize( + workflowTaskSlots: 1, + activityTaskSlots: 2, + localActivitySlots: 3, + nexusTaskSlots: 4); + TemporalWorkerOptions? capturedWorkerOptions = null; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.WorkerOptions.Plugins = new[] { new TunerPlugin(tuner) }; + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => Task.FromResult(new object()), + CreateWorker = (_, options) => + { + foreach (var plugin in options.Plugins ?? Array.Empty()) + { + plugin.ConfigureWorker(options); + } + options.ApplyPostPluginConfiguration(); + capturedWorkerOptions = options; + return new FakeLambdaWorker(_ => Task.CompletedTask); + }, + }); + + await handler(null, new FakeLambdaContext()); + + Assert.NotNull(capturedWorkerOptions); + Assert.Same(tuner, capturedWorkerOptions.Tuner); + Assert.Null(capturedWorkerOptions.MaxConcurrentActivities); + Assert.Null(capturedWorkerOptions.MaxConcurrentWorkflowTasks); + Assert.Null(capturedWorkerOptions.MaxConcurrentLocalActivities); + Assert.Null(capturedWorkerOptions.MaxConcurrentNexusTasks); + } + + [Fact] + public async Task CreateHandler_ReappliesDeploymentVersionAfterPlugins() + { + TemporalWorkerOptions? capturedWorkerOptions = null; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.WorkerOptions.Plugins = new[] { new VersioningPlugin() }; + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => Task.FromResult(new object()), + CreateWorker = (_, options) => + { + foreach (var plugin in options.Plugins ?? Array.Empty()) + { + plugin.ConfigureWorker(options); + } + options.ApplyPostPluginConfiguration(); + capturedWorkerOptions = options; + return new FakeLambdaWorker(_ => Task.CompletedTask); + }, + }); + + await handler(null, new FakeLambdaContext()); + + Assert.NotNull(capturedWorkerOptions); + Assert.NotNull(capturedWorkerOptions.DeploymentOptions); + Assert.Equal(Version, capturedWorkerOptions.DeploymentOptions.Version); + Assert.True(capturedWorkerOptions.DeploymentOptions.UseWorkerVersioning); + Assert.Equal( + VersioningBehavior.AutoUpgrade, + capturedWorkerOptions.DeploymentOptions.DefaultVersioningBehavior); +#pragma warning disable CS0618 // Verifying the Lambda helper clears legacy versioning options. + Assert.Null(capturedWorkerOptions.BuildId); + Assert.False(capturedWorkerOptions.UseWorkerVersioning); +#pragma warning restore CS0618 + } + + [Fact] + public void CreateHandler_MissingDeploymentNameOrBuildIdThrows() + { + Assert.Throws(() => + TemporalLambdaWorker.CreateHandler( + new WorkerDeploymentVersion(string.Empty, "build"), + _ => { })); + Assert.Throws(() => + TemporalLambdaWorker.CreateHandler( + new WorkerDeploymentVersion("deployment", string.Empty), + _ => { })); + } + + [Fact] + public async Task CreateHandler_TaskQueueCanComeFromEnvironment() + { + Assert.Throws(() => + TemporalLambdaWorker.CreateHandler( + Version, + _ => { }, + new TemporalLambdaWorkerHandlerOptions + { + GetEnvironmentVariable = _ => null, + })); + + TemporalWorkerOptions? capturedWorkerOptions = null; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => config.ClientOptions.TargetHost = "localhost:7233", + new TemporalLambdaWorkerHandlerOptions + { + GetEnvironmentVariable = name => + name == "TEMPORAL_TASK_QUEUE" ? "env-task-queue" : null, + ConnectClientAsync = _ => Task.FromResult(new object()), + CreateWorker = (_, options) => + { + capturedWorkerOptions = options; + return new FakeLambdaWorker(_ => Task.CompletedTask); + }, + }); + + await handler(null, new FakeLambdaContext()); + Assert.NotNull(capturedWorkerOptions); + Assert.Equal("env-task-queue", capturedWorkerOptions.TaskQueue); + } + + [Fact] + public void LoadClientConnectOptions_ExplicitConfigSourceWins() + { + var tempDir = CreateTempDirectory(); + try + { + var envConfigPath = Path.Combine(tempDir, "env.toml"); + File.WriteAllText(envConfigPath, ConfigToml("env-address", "env-namespace")); + + var options = TemporalLambdaWorker.LoadClientConnectOptions( + new ClientEnvConfig.ProfileLoadOptions + { + ConfigSource = DataSource.FromUTF8String( + ConfigToml("explicit-address", "explicit-namespace")), + OverrideEnvVars = new Dictionary + { + ["TEMPORAL_CONFIG_FILE"] = envConfigPath, + }, + }); + + Assert.Equal("explicit-address", options.TargetHost); + Assert.Equal("explicit-namespace", options.Namespace); + } + finally + { + Directory.Delete(tempDir, recursive: true); + } + } + + [Fact] + public void LoadClientConnectOptions_TemporalConfigFileWinsOverLambdaTaskRoot() + { + var tempDir = CreateTempDirectory(); + try + { + var envConfigPath = Path.Combine(tempDir, "env.toml"); + File.WriteAllText(envConfigPath, ConfigToml("env-address", "env-namespace")); + var lambdaRoot = Path.Combine(tempDir, "lambda-root"); + Directory.CreateDirectory(lambdaRoot); + File.WriteAllText( + Path.Combine(lambdaRoot, "temporal.toml"), + ConfigToml("lambda-address", "lambda-namespace")); + + var options = TemporalLambdaWorker.LoadClientConnectOptions( + new ClientEnvConfig.ProfileLoadOptions + { + OverrideEnvVars = new Dictionary + { + ["TEMPORAL_CONFIG_FILE"] = envConfigPath, + ["LAMBDA_TASK_ROOT"] = lambdaRoot, + }, + }); + + Assert.Equal("env-address", options.TargetHost); + Assert.Equal("env-namespace", options.Namespace); + } + finally + { + Directory.Delete(tempDir, recursive: true); + } + } + + [Fact] + public void LoadClientConnectOptions_UsesLambdaTaskRootTemporalToml() + { + var tempDir = CreateTempDirectory(); + try + { + File.WriteAllText( + Path.Combine(tempDir, "temporal.toml"), + ConfigToml("lambda-address", "lambda-namespace")); + + var options = TemporalLambdaWorker.LoadClientConnectOptions( + new ClientEnvConfig.ProfileLoadOptions + { + OverrideEnvVars = new Dictionary + { + ["LAMBDA_TASK_ROOT"] = tempDir, + }, + }); + + Assert.Equal("lambda-address", options.TargetHost); + Assert.Equal("lambda-namespace", options.Namespace); + } + finally + { + Directory.Delete(tempDir, recursive: true); + } + } + + [Fact] + public void LoadClientConnectOptions_FallsBackToCurrentDirectoryTemporalToml() + { + var previousDirectory = Directory.GetCurrentDirectory(); + var tempDir = CreateTempDirectory(); + try + { + File.WriteAllText( + Path.Combine(tempDir, "temporal.toml"), + ConfigToml("cwd-address", "cwd-namespace")); + Directory.SetCurrentDirectory(tempDir); + + var options = TemporalLambdaWorker.LoadClientConnectOptions( + new ClientEnvConfig.ProfileLoadOptions + { + OverrideEnvVars = new Dictionary(), + }); + + Assert.Equal("cwd-address", options.TargetHost); + Assert.Equal("cwd-namespace", options.Namespace); + } + finally + { + Directory.SetCurrentDirectory(previousDirectory); + Directory.Delete(tempDir, recursive: true); + } + } + + [Fact] + public void LoadClientConnectOptions_MissingLambdaConfigAllowsEnvOnly() + { + var previousDirectory = Directory.GetCurrentDirectory(); + var tempDir = CreateTempDirectory(); + try + { + Directory.SetCurrentDirectory(tempDir); + + var options = TemporalLambdaWorker.LoadClientConnectOptions( + new ClientEnvConfig.ProfileLoadOptions + { + OverrideEnvVars = new Dictionary + { + ["TEMPORAL_ADDRESS"] = "env-only-address", + ["TEMPORAL_NAMESPACE"] = "env-only-namespace", + }, + }); + + Assert.Equal("env-only-address", options.TargetHost); + Assert.Equal("env-only-namespace", options.Namespace); + } + finally + { + Directory.SetCurrentDirectory(previousDirectory); + Directory.Delete(tempDir, recursive: true); + } + } + + [Fact] + public async Task Invoke_SetsLambdaIdentityUnlessUserConfiguredIdentity() + { + TemporalClientConnectOptions? capturedClientOptions = null; + var context = new FakeLambdaContext + { + AwsRequestId = "request-id", + InvokedFunctionArn = "function-arn", + }; + var handler = CreateCapturingHandler( + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + }, + options => capturedClientOptions = options); + + await handler(null, context); + + Assert.NotNull(capturedClientOptions); + Assert.Equal("request-id@function-arn", capturedClientOptions.Identity); + + handler = CreateCapturingHandler( + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.ClientOptions.Identity = "user-identity"; + config.WorkerOptions.TaskQueue = "task-queue"; + }, + options => capturedClientOptions = options); + + await handler(null, context); + + Assert.NotNull(capturedClientOptions); + Assert.Equal("user-identity", capturedClientOptions.Identity); + } + + [Fact] + public async Task Invoke_DeadlineCancellationIsNormalAndRunsShutdownHooks() + { + var hookRan = false; + CancellationToken workerToken = default; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.ShutdownDeadlineBuffer = TimeSpan.FromMilliseconds(10); + config.ShutdownHooks.Add(_ => + { + hookRan = true; + return Task.CompletedTask; + }); + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => Task.FromResult(new object()), + CreateWorker = (_, _) => new FakeLambdaWorker(async token => + { + workerToken = token; + await Task.Delay(Timeout.InfiniteTimeSpan, token); + }), + }); + + await handler(null, new FakeLambdaContext { RemainingTime = TimeSpan.FromMilliseconds(40) }); + + Assert.True(workerToken.IsCancellationRequested); + Assert.True(hookRan); + } + + [Fact] + public async Task Invoke_RecomputesWorkerBudgetAfterSetupAndBeforeWorkerRun() + { + var context = new FakeLambdaContext( + TimeSpan.FromMilliseconds(200), + TimeSpan.FromMilliseconds(40), + TimeSpan.FromSeconds(1)); + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.ShutdownDeadlineBuffer = TimeSpan.FromMilliseconds(10); + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => + { + Assert.Equal(1, context.RemainingTimeReadCount); + return Task.FromResult(new object()); + }, + CreateWorker = (_, _) => + { + Assert.Equal(1, context.RemainingTimeReadCount); + return new FakeLambdaWorker(async token => + { + Assert.Equal(2, context.RemainingTimeReadCount); + await Task.Delay(Timeout.InfiniteTimeSpan, token); + }); + }, + }); + + await handler(null, context); + + Assert.Equal(3, context.RemainingTimeReadCount); + } + + [Fact] + public async Task Invoke_TightDeadlinesThrowOrWarn() + { + var connectCalls = 0; + var throwingHandler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.ShutdownDeadlineBuffer = TimeSpan.FromMilliseconds(100); + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => + { + connectCalls++; + return Task.FromResult(new object()); + }, + }); + + await Assert.ThrowsAsync(() => + throwingHandler( + null, + new FakeLambdaContext { RemainingTime = TimeSpan.FromMilliseconds(50) })); + Assert.Equal(0, connectCalls); + + var warningContext = new FakeLambdaContext { RemainingTime = TimeSpan.FromMilliseconds(40) }; + var warningHandler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.ShutdownDeadlineBuffer = TimeSpan.FromMilliseconds(10); + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => Task.FromResult(new object()), + CreateWorker = (_, _) => new FakeLambdaWorker( + token => Task.Delay(Timeout.InfiniteTimeSpan, token)), + }); + + await warningHandler(null, warningContext); + + Assert.Contains( + warningContext.CaptureLogger.Lines, + line => line.Contains("WARNING: Temporal Lambda worker budget", StringComparison.Ordinal)); + } + + [Fact] + public async Task Invoke_ShutdownHooksRunInOrderPerInvocationAndContinueAfterFailures() + { + var hookCalls = new List(); + var connectCalls = 0; + var workerCreations = 0; + var context = new FakeLambdaContext(); + var handler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.ClientOptions.TargetHost = "localhost:7233"; + config.WorkerOptions.TaskQueue = "task-queue"; + config.ShutdownHooks.Add(_ => + { + hookCalls.Add("first"); + return Task.CompletedTask; + }); + config.ShutdownHooks.Add(_ => + { + hookCalls.Add("second"); + throw new InvalidOperationException("hook failed"); + }); + config.ShutdownHooks.Add(_ => + { + hookCalls.Add("third"); + return Task.CompletedTask; + }); + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => + { + connectCalls++; + return Task.FromResult(new object()); + }, + CreateWorker = (_, _) => + { + workerCreations++; + return new FakeLambdaWorker(_ => Task.CompletedTask); + }, + }); + + await handler(null, context); + await handler(null, context); + + Assert.Equal( + new[] { "first", "second", "third", "first", "second", "third" }, + hookCalls); + Assert.Equal( + 2, + context.CaptureLogger.Lines.Count( + line => line.Contains("shutdown hook failed", StringComparison.Ordinal))); + Assert.Equal(2, connectCalls); + Assert.Equal(2, workerCreations); + } + + [Fact] + public async Task CreateHandler_AsyncConfigureRunsPerInvocationWithFreshConfig() + { + var configureCalls = 0; + var capturedConfigs = new List(); + var capturedTargets = new List(); + var capturedTaskQueues = new List(); + var hookCalls = new List(); + var handler = TemporalLambdaWorker.CreateHandler( + Version, + async config => + { + await Task.Yield(); + var call = ++configureCalls; + capturedConfigs.Add(config); + Assert.Equal("env-task-queue", config.WorkerOptions.TaskQueue); + + config.ClientOptions.TargetHost = $"target-{call}"; + config.WorkerOptions.TaskQueue = $"task-queue-{call}"; + config.ShutdownHooks.Add(_ => + { + hookCalls.Add($"hook-{call}"); + return Task.CompletedTask; + }); + }, + new TemporalLambdaWorkerHandlerOptions + { + GetEnvironmentVariable = name => + name == "TEMPORAL_TASK_QUEUE" ? "env-task-queue" : null, + ConnectClientAsync = options => + { + capturedTargets.Add(options.TargetHost); + return Task.FromResult(new object()); + }, + CreateWorker = (_, options) => + { + capturedTaskQueues.Add(options.TaskQueue); + return new FakeLambdaWorker(_ => Task.CompletedTask); + }, + }); + + await handler(null, new FakeLambdaContext()); + await handler(null, new FakeLambdaContext()); + + Assert.Equal(2, configureCalls); + Assert.Equal(2, capturedConfigs.Count); + Assert.NotSame(capturedConfigs[0], capturedConfigs[1]); + Assert.Equal(new[] { "target-1", "target-2" }, capturedTargets); + Assert.Equal(new[] { "task-queue-1", "task-queue-2" }, capturedTaskQueues); + Assert.Equal(new[] { "hook-1", "hook-2" }, hookCalls); + } + + [Fact] + public async Task CreateHandler_AsyncConfigureErrorsSurfaceOnInvocation() + { + var configureCalls = 0; + var handler = TemporalLambdaWorker.CreateHandler( + Version, + async config => + { + _ = config; + await Task.Yield(); + configureCalls++; + throw new InvalidOperationException("bad config"); + }, + new TemporalLambdaWorkerHandlerOptions()); + + var error = await Assert.ThrowsAsync(() => + handler(null, new FakeLambdaContext())); + Assert.Equal("bad config", error.Message); + Assert.Equal(1, configureCalls); + } + + [Fact] + public async Task CreateHandler_AsyncConfigureValidatesTaskQueueOnInvocation() + { + var handler = TemporalLambdaWorker.CreateHandler( + Version, + _ => Task.CompletedTask, + new TemporalLambdaWorkerHandlerOptions + { + GetEnvironmentVariable = _ => null, + }); + + await Assert.ThrowsAsync(() => + handler(null, new FakeLambdaContext())); + } + + [Fact] + public async Task CreateHandler_AsyncConfigureShutdownHooksRunAfterFailures() + { + var hookCalls = new List(); + var connectFailureHandler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.WorkerOptions.TaskQueue = "task-queue"; + config.ShutdownHooks.Add(_ => + { + hookCalls.Add("connect"); + return Task.CompletedTask; + }); + return Task.CompletedTask; + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => + throw new InvalidOperationException("connect failed"), + }); + + await Assert.ThrowsAsync(() => + connectFailureHandler(null, new FakeLambdaContext())); + + var workerFailureHandler = TemporalLambdaWorker.CreateHandler( + Version, + config => + { + config.WorkerOptions.TaskQueue = "task-queue"; + config.ShutdownHooks.Add(_ => + { + hookCalls.Add("worker"); + return Task.CompletedTask; + }); + return Task.CompletedTask; + }, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = _ => Task.FromResult(new object()), + CreateWorker = (_, _) => new FakeLambdaWorker( + _ => throw new InvalidOperationException("worker failed")), + }); + + await Assert.ThrowsAsync(() => + workerFailureHandler(null, new FakeLambdaContext())); + + Assert.Equal(new[] { "connect", "worker" }, hookCalls); + } + + private static Func CreateCapturingHandler( + Action configure, + Action captureClientOptions) => + TemporalLambdaWorker.CreateHandler( + Version, + configure, + new TemporalLambdaWorkerHandlerOptions + { + ConnectClientAsync = options => + { + captureClientOptions(options); + return Task.FromResult(new object()); + }, + CreateWorker = (_, _) => new FakeLambdaWorker(_ => Task.CompletedTask), + }); + + private static string ConfigToml(string address, string nameSpace) => $@" +[profile.default] +address = ""{address}"" +namespace = ""{nameSpace}"" +"; + + private static string CreateTempDirectory() + { + var tempDir = Path.Combine( + Path.GetTempPath(), + $"TemporalLambdaWorkerTests-{Guid.NewGuid():N}"); + Directory.CreateDirectory(tempDir); + return tempDir; + } + + private static ActivityDefinition DummyActivity() => + ActivityDefinition.Create( + "dummy", + typeof(Task), + Array.Empty(), + 0, + _ => Task.CompletedTask); + + [Workflow] + public sealed class WorkflowWithoutVersioningBehavior + { + [WorkflowRun] + public Task RunAsync() => Task.CompletedTask; + } + + private sealed class FakeLambdaWorker : ILambdaWorker + { + private readonly Func executeAsync; + + public FakeLambdaWorker(Func executeAsync) => + this.executeAsync = executeAsync; + + public Task ExecuteAsync(CancellationToken stoppingToken) => + executeAsync(stoppingToken); + + public void Dispose() + { + } + } + + private sealed class TunerPlugin : ITemporalWorkerPlugin + { + private readonly WorkerTuner tuner; + + public TunerPlugin(WorkerTuner tuner) => this.tuner = tuner; + + public string Name => "TunerPlugin"; + + public void ConfigureWorker(TemporalWorkerOptions options) => options.Tuner = tuner; + + public Task RunWorkerAsync( + TemporalWorker worker, + Func> continuation, + CancellationToken stoppingToken) => + throw new NotImplementedException(); + + public void ConfigureReplayer(WorkflowReplayerOptions options) => + throw new NotImplementedException(); + + public Task> ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func>> continuation, + CancellationToken cancellationToken) => + throw new NotImplementedException(); + + public IAsyncEnumerable ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func> continuation, + CancellationToken cancellationToken) => + throw new NotImplementedException(); + } + + private sealed class VersioningPlugin : ITemporalWorkerPlugin + { + public string Name => "VersioningPlugin"; + + public void ConfigureWorker(TemporalWorkerOptions options) + { + options.DeploymentOptions = new WorkerDeploymentOptions( + new WorkerDeploymentVersion("plugin-deployment", "plugin-build"), + useWorkerVersioning: false) + { + DefaultVersioningBehavior = VersioningBehavior.AutoUpgrade, + }; +#pragma warning disable CS0618 // Verifying the Lambda helper clears legacy versioning options. + options.BuildId = "legacy-build"; + options.UseWorkerVersioning = true; +#pragma warning restore CS0618 + } + + public Task RunWorkerAsync( + TemporalWorker worker, + Func> continuation, + CancellationToken stoppingToken) => + throw new NotImplementedException(); + + public void ConfigureReplayer(WorkflowReplayerOptions options) => + throw new NotImplementedException(); + + public Task> ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func>> continuation, + CancellationToken cancellationToken) => + throw new NotImplementedException(); + + public IAsyncEnumerable ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func> continuation, + CancellationToken cancellationToken) => + throw new NotImplementedException(); + } + + private sealed class FakeLambdaContext : ILambdaContext + { + private readonly Queue remainingTimes = new(); + private TimeSpan remainingTime = TimeSpan.FromMinutes(1); + + public FakeLambdaContext() + { + } + + public FakeLambdaContext(params TimeSpan[] remainingTimes) + { + foreach (var remaining in remainingTimes) + { + this.remainingTimes.Enqueue(remaining); + } + } + + public CaptureLambdaLogger CaptureLogger { get; } = new(); + + public string AwsRequestId { get; set; } = "request-id"; + + public IClientContext ClientContext { get; } = null!; + + public string FunctionName { get; } = "function-name"; + + public string FunctionVersion { get; } = "1"; + + public ICognitoIdentity Identity { get; } = null!; + + public string InvokedFunctionArn { get; set; } = "function-arn"; + + public ILambdaLogger Logger => CaptureLogger; + + public string LogGroupName { get; } = "log-group"; + + public string LogStreamName { get; } = "log-stream"; + + public int MemoryLimitInMB { get; } = 128; + + public int RemainingTimeReadCount { get; private set; } + + public TimeSpan RemainingTime + { + get + { + RemainingTimeReadCount++; + if (remainingTimes.Count > 0) + { + remainingTime = remainingTimes.Dequeue(); + } + return remainingTime; + } + + set + { + remainingTimes.Clear(); + remainingTime = value; + } + } + } + + private sealed class CaptureLambdaLogger : ILambdaLogger + { + public List Lines { get; } = new(); + + public void Log(string message) => Lines.Add(message); + + public void LogLine(string message) => Lines.Add(message); + } +} diff --git a/tests/Temporalio.Tests/Temporalio.Tests.csproj b/tests/Temporalio.Tests/Temporalio.Tests.csproj index f424273b..0f01826f 100644 --- a/tests/Temporalio.Tests/Temporalio.Tests.csproj +++ b/tests/Temporalio.Tests/Temporalio.Tests.csproj @@ -25,6 +25,8 @@ + + diff --git a/tests/Temporalio.Tests/packages.lock.json b/tests/Temporalio.Tests/packages.lock.json index 0aee28c3..ff3ff8af 100644 --- a/tests/Temporalio.Tests/packages.lock.json +++ b/tests/Temporalio.Tests/packages.lock.json @@ -378,16 +378,6 @@ "resolved": "5.11.0", "contentHash": "eaiXkUjC4NPcquGWzAGMXjuxvLwc6XGKMptSyOGQeT0X70BUZObuybJFZLA0OfTdueLd3US23NBPTBb6iF3V1Q==" }, - "OpenTelemetry": { - "type": "Transitive", - "resolved": "1.15.3", - "contentHash": "N0i6WjPoHPbZyms1ugbDIFAJFuGlpeExJMU/+XSL0lQRUkg/D0utFkDoLXf8Z1km5B+xVZ2GyMXXiX8qdeNmPg==", - "dependencies": { - "Microsoft.Extensions.Diagnostics.Abstractions": "10.0.0", - "Microsoft.Extensions.Logging.Configuration": "10.0.0", - "OpenTelemetry.Api.ProviderBuilderExtensions": "1.15.3" - } - }, "OpenTelemetry.Api.ProviderBuilderExtensions": { "type": "Transitive", "resolved": "1.15.3", @@ -476,6 +466,24 @@ "NexusRpc": "[0.3.0, )" } }, + "temporalio.extensions.aws.lambda": { + "type": "Project", + "dependencies": { + "Amazon.Lambda.Core": "[3.1.0, )", + "Temporalio": "[1.14.1, )" + } + }, + "temporalio.extensions.aws.lambda.opentelemetry": { + "type": "Project", + "dependencies": { + "OpenTelemetry": "[1.15.3, )", + "OpenTelemetry.Exporter.OpenTelemetryProtocol": "[1.15.3, )", + "OpenTelemetry.Extensions.AWS": "[1.15.1, )", + "Temporalio": "[1.14.1, )", + "Temporalio.Extensions.Aws.Lambda": "[1.14.1, )", + "Temporalio.Extensions.OpenTelemetry": "[1.14.1, )" + } + }, "temporalio.extensions.diagnosticsource": { "type": "Project", "dependencies": { @@ -496,6 +504,12 @@ "Temporalio": "[1.15.0, )" } }, + "Amazon.Lambda.Core": { + "type": "CentralTransitive", + "requested": "[3.1.0, )", + "resolved": "3.1.0", + "contentHash": "uZZ2k5lMoB9OzPTmKkkEKpyFcnLxcb7FxtxrA3+HBg/sooTzu402iCcSk5r+N62Qokhwr4Q9cbaVJSM6Dln3aA==" + }, "Google.Protobuf": { "type": "CentralTransitive", "requested": "[3.26.1, )", @@ -528,11 +542,40 @@ "resolved": "0.3.0", "contentHash": "Kr+NMSZ5428AvxpzShdJcQxc9w6HT8SM6FXQMekC4K9wGpmC1m/L2pQJydpvVTwRBu3qAIYKPI37KWexF4Gtcg==" }, + "OpenTelemetry": { + "type": "CentralTransitive", + "requested": "[1.15.3, )", + "resolved": "1.15.3", + "contentHash": "N0i6WjPoHPbZyms1ugbDIFAJFuGlpeExJMU/+XSL0lQRUkg/D0utFkDoLXf8Z1km5B+xVZ2GyMXXiX8qdeNmPg==", + "dependencies": { + "Microsoft.Extensions.Diagnostics.Abstractions": "10.0.0", + "Microsoft.Extensions.Logging.Configuration": "10.0.0", + "OpenTelemetry.Api.ProviderBuilderExtensions": "1.15.3" + } + }, "OpenTelemetry.Api": { "type": "CentralTransitive", "requested": "[1.15.3, )", "resolved": "1.15.3", "contentHash": "fX+fkCysfPut+qCcT3bKqyX4QN9Saf4CgX8HLOHywEVD+Xr7sULtfuypITpoDysjx8R59dn/3mWhgimMH8cm/g==" + }, + "OpenTelemetry.Exporter.OpenTelemetryProtocol": { + "type": "CentralTransitive", + "requested": "[1.15.3, )", + "resolved": "1.15.3", + "contentHash": "FEXJepcseTGbATiCkUfP7ipoFEYYfl/0UmmUwi0KxCPg9PaUA8ab2P1LGopK+/HExasJ1ZutFhZrN6WvUIR23g==", + "dependencies": { + "OpenTelemetry": "1.15.3" + } + }, + "OpenTelemetry.Extensions.AWS": { + "type": "CentralTransitive", + "requested": "[1.15.1, )", + "resolved": "1.15.1", + "contentHash": "T+Vrhlv79PyG+fK6XnEtdJW9VtYp5WxSsVajplnkbuY0Q3gTyFNiLPP8tyu1qmEL19bKc9i6Wgp4JqhvupqirA==", + "dependencies": { + "OpenTelemetry": "[1.15.3, 2.0.0)" + } } } }