-
Notifications
You must be signed in to change notification settings - Fork 56
AWS Lambda #721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
AWS Lambda #721
Changes from all commits
3d87655
7046b9b
0fd7e60
1674521
aa6c26a
c96452f
33b7e9d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -276,6 +276,26 @@ var client = await TemporalClient.ConnectAsync(new("my-namespace.a1b2c.tmprl.clo | |
| }); | ||
| ``` | ||
|
|
||
| #### Client Configuration From Environment | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please separate this out from this change and open a separate PR |
||
|
|
||
| Client connection options can be loaded from a `temporal.toml` file and environment variables: | ||
|
|
||
| ```csharp | ||
| using Temporalio.Client; | ||
| using Temporalio.Common.EnvConfig; | ||
|
|
||
| var client = await TemporalClient.ConnectAsync( | ||
| ClientEnvConfig.LoadClientConnectOptions()); | ||
| ``` | ||
|
|
||
| By default, the loader checks `TEMPORAL_CONFIG_FILE`; if unset, it looks for `temporal.toml` in the | ||
| user config directory under `temporalio`. The selected profile is `TEMPORAL_PROFILE`, or `default` | ||
| when unset. Environment variables such as `TEMPORAL_ADDRESS`, `TEMPORAL_NAMESPACE`, | ||
| `TEMPORAL_API_KEY`, TLS certificate/key settings, and `TEMPORAL_GRPC_META_*` override file values. | ||
|
|
||
| Use `ClientEnvConfig.ProfileLoadOptions` to choose a profile, provide an explicit config source, or | ||
| disable file or environment loading. | ||
|
|
||
| #### Client Dependency Injection | ||
|
|
||
| To create clients for use with dependency injection, see the | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,5 +16,7 @@ Commonly used namespaces: | |
| Extensions: | ||
|
|
||
| * [Temporalio.Extensions.DiagnosticSource](/api/Temporalio.Extensions.DiagnosticSource.html) | ||
| * [Temporalio.Extensions.Aws.Lambda](/api/Temporalio.Extensions.Aws.Lambda.html) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please alphabetize |
||
| * [Temporalio.Extensions.Aws.Lambda.OpenTelemetry](/api/Temporalio.Extensions.Aws.Lambda.OpenTelemetry.html) | ||
| * [Temporalio.Extensions.Hosting](/api/Temporalio.Extensions.Hosting.html) | ||
| * [Temporalio.Extensions.OpenTelemetry](/api/Temporalio.Extensions.OpenTelemetry.html) | ||
| * [Temporalio.Extensions.OpenTelemetry](/api/Temporalio.Extensions.OpenTelemetry.html) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,8 @@ | |
| "files": [ | ||
| "Temporalio/*.csproj", | ||
| "Temporalio.Extensions.DiagnosticSource/*.csproj", | ||
| "Temporalio.Extensions.Aws.Lambda/*.csproj", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please alphabetize |
||
| "Temporalio.Extensions.Aws.Lambda.OpenTelemetry/*.csproj", | ||
| "Temporalio.Extensions.Hosting/*.csproj", | ||
| "Temporalio.Extensions.OpenTelemetry/*.csproj" | ||
| ], | ||
|
|
@@ -72,4 +74,4 @@ | |
| "keepFileLink": false, | ||
| "disableGitFeatures": false | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,204 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using OpenTelemetry; | ||
| using OpenTelemetry.Exporter; | ||
| using OpenTelemetry.Resources; | ||
| using OpenTelemetry.Trace; | ||
| using Temporalio.Client.Interceptors; | ||
| using Temporalio.Runtime; | ||
| using TemporalOpenTelemetry = Temporalio.Extensions.OpenTelemetry; | ||
|
|
||
| namespace Temporalio.Extensions.Aws.Lambda.OpenTelemetry | ||
| { | ||
| /// <summary> | ||
| /// OpenTelemetry helpers for Temporal workers running inside AWS Lambda. | ||
| /// </summary> | ||
| public static class LambdaWorkerOpenTelemetry | ||
| { | ||
| private const string DefaultCollectorEndpoint = "http://localhost:4317"; | ||
| private const string DefaultServiceName = "temporal-lambda-worker"; | ||
| private const string OTelExporterOtlpEndpointEnvironmentVariable = | ||
| "OTEL_EXPORTER_OTLP_ENDPOINT"; | ||
|
|
||
| private const string OTelServiceNameEnvironmentVariable = "OTEL_SERVICE_NAME"; | ||
| private const string LambdaFunctionNameEnvironmentVariable = "AWS_LAMBDA_FUNCTION_NAME"; | ||
| private const string ServiceNameResourceAttribute = "service.name"; | ||
|
|
||
| /// <summary> | ||
| /// Configure OpenTelemetry metrics and tracing with AWS Lambda defaults. | ||
| /// </summary> | ||
| /// <param name="config">Lambda worker configuration to mutate.</param> | ||
| /// <param name="options">Optional OpenTelemetry configuration.</param> | ||
| /// <remarks> | ||
| /// This creates an OTLP trace exporter and tracer provider, configures Core SDK metrics | ||
| /// through a Temporal runtime, adds the Temporal tracing interceptor, and registers a | ||
| /// per-invocation shutdown hook to force-flush traces before the Lambda invocation ends. | ||
| /// </remarks> | ||
| public static void ApplyDefaults( | ||
| LambdaWorkerConfig config, | ||
| LambdaWorkerOpenTelemetryOptions? options = null) | ||
| { | ||
| if (config == null) | ||
| { | ||
| throw new ArgumentNullException(nameof(config)); | ||
| } | ||
|
|
||
| var resolvedOptions = ResolveOptions(options); | ||
| #pragma warning disable CA2000 // Provider is intentionally retained for Lambda warm invocations. | ||
| var tracerProvider = CreateTracerProvider(resolvedOptions); | ||
| #pragma warning restore CA2000 | ||
|
|
||
| config.ClientOptions.Interceptors = AddTracingInterceptor( | ||
| config.ClientOptions.Interceptors); | ||
| config.ClientOptions.Runtime = CreateRuntime(resolvedOptions); | ||
| config.ShutdownHooks.Add( | ||
| cancellationToken => ForceFlushAsync( | ||
| tracerProvider, | ||
| config.ShutdownDeadlineBuffer, | ||
| cancellationToken)); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Resolve options using process environment variables. | ||
| /// </summary> | ||
| /// <param name="options">Options to resolve.</param> | ||
| /// <returns>Resolved options.</returns> | ||
| internal static ResolvedLambdaWorkerOpenTelemetryOptions ResolveOptions( | ||
| LambdaWorkerOpenTelemetryOptions? options = null) | ||
| { | ||
| options ??= new LambdaWorkerOpenTelemetryOptions(); | ||
| if (options.MetricsExportInterval <= TimeSpan.Zero) | ||
| { | ||
| throw new ArgumentOutOfRangeException( | ||
| nameof(options), | ||
| "MetricsExportInterval must be greater than zero"); | ||
| } | ||
|
|
||
| var serviceName = FirstNonEmpty( | ||
| options.ServiceName, | ||
| Environment.GetEnvironmentVariable(OTelServiceNameEnvironmentVariable), | ||
| Environment.GetEnvironmentVariable(LambdaFunctionNameEnvironmentVariable), | ||
| DefaultServiceName); | ||
| var collectorEndpoint = FirstNonEmpty( | ||
| options.CollectorEndpoint, | ||
| Environment.GetEnvironmentVariable(OTelExporterOtlpEndpointEnvironmentVariable), | ||
| DefaultCollectorEndpoint); | ||
|
|
||
| return new ResolvedLambdaWorkerOpenTelemetryOptions( | ||
| new Uri(collectorEndpoint), | ||
| serviceName, | ||
| options.MetricsExportInterval); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Force-flush the tracer provider asynchronously. | ||
| /// </summary> | ||
| /// <param name="tracerProvider">Tracer provider to flush.</param> | ||
| /// <param name="shutdownDeadlineBuffer">Maximum time to wait for the flush.</param> | ||
| /// <param name="cancellationToken">Cancellation token.</param> | ||
| /// <returns>A task for the flush.</returns> | ||
| internal static async Task ForceFlushAsync( | ||
| TracerProvider tracerProvider, | ||
| TimeSpan shutdownDeadlineBuffer, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| if (cancellationToken.IsCancellationRequested) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| var flushTask = Task.Run( | ||
| () => tracerProvider.ForceFlush(ToTimeoutMilliseconds(shutdownDeadlineBuffer))); | ||
| if (flushTask == await Task.WhenAny( | ||
| flushTask, | ||
| Task.Delay(Timeout.Infinite, cancellationToken)).ConfigureAwait(false)) | ||
| { | ||
| await flushTask.ConfigureAwait(false); | ||
| } | ||
| else | ||
| { | ||
| ObserveTaskException(flushTask); | ||
| } | ||
| } | ||
|
|
||
| private static string FirstNonEmpty(params string?[] values) => | ||
| values.First(value => !string.IsNullOrEmpty(value))!; | ||
|
|
||
| private static void ObserveTaskException(Task task) => | ||
| _ = task.ContinueWith( | ||
| completedTask => _ = completedTask.Exception, | ||
| CancellationToken.None, | ||
| TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, | ||
| TaskScheduler.Default); | ||
|
|
||
| private static TracerProvider CreateTracerProvider( | ||
| ResolvedLambdaWorkerOpenTelemetryOptions options) => | ||
| Sdk.CreateTracerProviderBuilder(). | ||
| AddXRayTraceId(). | ||
| SetResourceBuilder( | ||
| ResourceBuilder.CreateDefault().AddService(options.ServiceName)). | ||
| AddSource( | ||
| TemporalOpenTelemetry.TracingInterceptor.ClientSource.Name, | ||
| TemporalOpenTelemetry.TracingInterceptor.WorkflowsSource.Name, | ||
| TemporalOpenTelemetry.TracingInterceptor.ActivitiesSource.Name, | ||
| TemporalOpenTelemetry.TracingInterceptor.NexusSource.Name). | ||
| AddOtlpExporter(exporterOptions => | ||
| { | ||
| exporterOptions.Endpoint = options.CollectorEndpoint; | ||
| #pragma warning disable CS0618 // ADOT Lambda parity uses OTLP gRPC on localhost:4317. | ||
| exporterOptions.Protocol = OtlpExportProtocol.Grpc; | ||
| #pragma warning restore CS0618 | ||
| }). | ||
| Build(); | ||
|
|
||
| private static List<IClientInterceptor> AddTracingInterceptor( | ||
| IReadOnlyCollection<IClientInterceptor>? interceptors) | ||
| { | ||
| var newInterceptors = interceptors?.ToList() ?? new List<IClientInterceptor>(); | ||
| newInterceptors.Add(new TemporalOpenTelemetry.TracingInterceptor()); | ||
| return newInterceptors; | ||
| } | ||
|
|
||
| private static TemporalRuntime CreateRuntime( | ||
| ResolvedLambdaWorkerOpenTelemetryOptions options) | ||
| { | ||
| var openTelemetryOptions = new Temporalio.Runtime.OpenTelemetryOptions( | ||
| options.CollectorEndpoint) | ||
| { | ||
| MetricsExportInterval = options.MetricsExportInterval, | ||
| Protocol = OpenTelemetryProtocol.Grpc, | ||
| }; | ||
| return new TemporalRuntime(new TemporalRuntimeOptions | ||
| { | ||
| Telemetry = new TelemetryOptions | ||
| { | ||
| Metrics = new MetricsOptions(openTelemetryOptions) | ||
| { | ||
| GlobalTags = new[] | ||
| { | ||
| new KeyValuePair<string, string>( | ||
| ServiceNameResourceAttribute, | ||
| options.ServiceName), | ||
| }, | ||
| }, | ||
| }, | ||
| }); | ||
| } | ||
|
|
||
| private static int ToTimeoutMilliseconds(TimeSpan timeout) | ||
| { | ||
| if (timeout <= TimeSpan.Zero) | ||
| { | ||
| return 0; | ||
| } | ||
| if (timeout.TotalMilliseconds >= int.MaxValue) | ||
| { | ||
| return int.MaxValue; | ||
| } | ||
| return (int)timeout.TotalMilliseconds; | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| using System; | ||
|
|
||
| namespace Temporalio.Extensions.Aws.Lambda.OpenTelemetry | ||
| { | ||
| /// <summary> | ||
| /// Options for <see cref="LambdaWorkerOpenTelemetry.ApplyDefaults"/>. | ||
| /// </summary> | ||
| public class LambdaWorkerOpenTelemetryOptions | ||
| { | ||
| /// <summary> | ||
| /// Gets or sets how often the Core SDK exports metrics to the collector. | ||
| /// </summary> | ||
| public TimeSpan MetricsExportInterval { get; set; } = TimeSpan.FromSeconds(10); | ||
|
|
||
| /// <summary> | ||
| /// Gets or sets the OpenTelemetry service name. If unset, this falls back to | ||
| /// OTEL_SERVICE_NAME, then AWS_LAMBDA_FUNCTION_NAME, then "temporal-lambda-worker". | ||
| /// </summary> | ||
| public string? ServiceName { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// Gets or sets the OTLP collector endpoint. If unset, this falls back to | ||
| /// OTEL_EXPORTER_OTLP_ENDPOINT, then "http://localhost:4317". | ||
| /// </summary> | ||
| public string? CollectorEndpoint { get; set; } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please alphabetize