diff --git a/src/Temporalio/Client/StartActivityOptions.cs b/src/Temporalio/Client/StartActivityOptions.cs index 570accc6..2fb0ae80 100644 --- a/src/Temporalio/Client/StartActivityOptions.cs +++ b/src/Temporalio/Client/StartActivityOptions.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Temporalio.Api.Enums.V1; using Temporalio.Common; @@ -114,6 +115,26 @@ public StartActivityOptions(string id, string taskQueue) /// public RpcOptions? Rpc { get; set; } + /// + /// Gets or sets the on-conflict options. + /// + internal Api.Common.V1.OnConflictOptions? OnConflictOptions { get; set; } + + /// + /// Gets or sets the completion callbacks. + /// + internal IReadOnlyCollection? CompletionCallbacks { get; set; } + + /// + /// Gets or sets the links to attach on activity start. + /// + internal IReadOnlyCollection? Links { get; set; } + + /// + /// Gets or sets the request ID for the activity start request. If null, a new GUID is used. + /// + internal string? RequestId { get; set; } + /// /// Create a shallow copy of these options. /// diff --git a/src/Temporalio/Client/TemporalClient.Activity.cs b/src/Temporalio/Client/TemporalClient.Activity.cs index 4b9675b5..ad2c49d7 100644 --- a/src/Temporalio/Client/TemporalClient.Activity.cs +++ b/src/Temporalio/Client/TemporalClient.Activity.cs @@ -12,6 +12,7 @@ using Temporalio.Common; using Temporalio.Converters; using Temporalio.Exceptions; +using Temporalio.Nexus; #if NETCOREAPP3_0_OR_GREATER using System.Runtime.CompilerServices; @@ -118,7 +119,7 @@ public override async Task> StartActivityAsync( Name = input.Options.TaskQueue!, }, Identity = Client.Connection.Options.Identity, - RequestId = Guid.NewGuid().ToString(), + RequestId = input.Options.RequestId ?? Guid.NewGuid().ToString(), IdReusePolicy = input.Options.IdReusePolicy, IdConflictPolicy = input.Options.IdConflictPolicy, RetryPolicy = input.Options.RetryPolicy?.ToProto(), @@ -126,7 +127,16 @@ public override async Task> StartActivityAsync( input.Options.StaticSummary, input.Options.StaticDetails). ConfigureAwait(false), Priority = input.Options.Priority?.ToProto(), + OnConflictOptions = input.Options.OnConflictOptions, }; + if (input.Options.CompletionCallbacks is { } completionCallbacks) + { + req.CompletionCallbacks.AddRange(completionCallbacks); + } + if (input.Options.Links is { } activityLinks) + { + req.Links.AddRange(activityLinks); + } if (input.Args.Count > 0) { req.Input = new Payloads(); @@ -173,6 +183,11 @@ public override async Task> StartActivityAsync( var resp = await Client.Connection.WorkflowService.StartActivityExecutionAsync( req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false); + if (NexusOperationExecutionContext.HasCurrent && + resp.Link?.ToNexusLink() is { } nexusLink) + { + NexusOperationExecutionContext.Current.HandlerContext.OutboundLinks.Add(nexusLink); + } return new ActivityHandle( Client: Client, Id: input.Options.Id!, diff --git a/src/Temporalio/Client/TemporalClient.Workflow.cs b/src/Temporalio/Client/TemporalClient.Workflow.cs index 791a478e..251d3bc4 100644 --- a/src/Temporalio/Client/TemporalClient.Workflow.cs +++ b/src/Temporalio/Client/TemporalClient.Workflow.cs @@ -14,6 +14,7 @@ using Temporalio.Common; using Temporalio.Converters; using Temporalio.Exceptions; +using Temporalio.Nexus; #if NETCOREAPP3_0_OR_GREATER using System.Runtime.CompilerServices; @@ -739,6 +740,19 @@ private async Task> StartWorkflowInternalAsyn } var resp = await Client.Connection.WorkflowService.StartWorkflowExecutionAsync( req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false); + if (NexusOperationExecutionContext.HasCurrent) + { + // Prefer the link returned by the server; fall back to a + // WorkflowExecutionStarted link for older servers that don't populate it. + var nexusLink = resp.Link?.ToNexusLink() ?? new Link.Types.WorkflowEvent + { + Namespace = req.Namespace, + WorkflowId = req.WorkflowId, + RunId = resp.RunId, + EventRef = new() { EventId = 1, EventType = EventType.WorkflowExecutionStarted }, + }.ToNexusLink(); + NexusOperationExecutionContext.Current.HandlerContext.OutboundLinks.Add(nexusLink); + } return new WorkflowHandle( Client: Client, Id: req.WorkflowId, diff --git a/src/Temporalio/Nexus/ITemporalNexusClient.cs b/src/Temporalio/Nexus/ITemporalNexusClient.cs index 59bccb15..1a0a9c4e 100644 --- a/src/Temporalio/Nexus/ITemporalNexusClient.cs +++ b/src/Temporalio/Nexus/ITemporalNexusClient.cs @@ -45,7 +45,8 @@ public interface ITemporalNexusClient /// Workflow class type. /// Workflow result type. /// Invocation of workflow run method with a result. - /// Start workflow options. ID and TaskQueue are required. + /// Start workflow options. ID is required; TaskQueue defaults to + /// the operation's task queue when omitted. /// An async operation result containing the workflow-run token. Task> StartWorkflowAsync( Expression>> workflowRunCall, WorkflowOptions options); @@ -56,7 +57,8 @@ Task> StartWorkflowAsync( /// /// Workflow class type. /// Invocation of workflow run method with no result. - /// Start workflow options. ID and TaskQueue are required. + /// Start workflow options. ID is required; TaskQueue defaults to + /// the operation's task queue when omitted. /// An async operation result containing the workflow-run token. Task> StartWorkflowAsync( Expression> workflowRunCall, WorkflowOptions options); @@ -68,9 +70,52 @@ Task> StartWorkflowAsync( /// Workflow result type. /// Workflow type name. /// Arguments for the workflow. - /// Start workflow options. ID and TaskQueue are required. + /// Start workflow options. ID is required; TaskQueue defaults to + /// the operation's task queue when omitted. /// An async operation result containing the workflow-run token. Task> StartWorkflowAsync( string workflow, IReadOnlyCollection args, WorkflowOptions options); + + /// + /// Schedule a standalone activity via a lambda invoking the activity method. Always returns + /// an async result with an activity-execution operation token. + /// + /// Activity result type. + /// Invocation of activity method with a result. + /// Activity start options. Id is required and should be derived + /// deterministically from the operation input so retries of the Nexus start request are + /// idempotent. At least one of ScheduleToCloseTimeout or StartToCloseTimeout is also + /// required. TaskQueue defaults to the operation's task queue when omitted. + /// An async operation result containing the activity-execution token. + Task> StartActivityAsync( + Expression>> activityCall, StartActivityOptions options); + + /// + /// Schedule a standalone activity via a lambda invoking the activity method with no return + /// value. Always returns an async result with an activity-execution operation token. + /// + /// Invocation of activity method with no result. + /// Activity start options. Id is required and should be derived + /// deterministically from the operation input so retries of the Nexus start request are + /// idempotent. At least one of ScheduleToCloseTimeout or StartToCloseTimeout is also + /// required. TaskQueue defaults to the operation's task queue when omitted. + /// An async operation result containing the activity-execution token. + Task> StartActivityAsync( + Expression> activityCall, StartActivityOptions options); + + /// + /// Schedule a standalone activity by name. Always returns an async result with an + /// activity-execution operation token. + /// + /// Activity result type. + /// Activity type name. + /// Arguments for the activity. + /// Activity start options. Id is required and should be derived + /// deterministically from the operation input so retries of the Nexus start request are + /// idempotent. At least one of ScheduleToCloseTimeout or StartToCloseTimeout is also + /// required. TaskQueue defaults to the operation's task queue when omitted. + /// An async operation result containing the activity-execution token. + Task> StartActivityAsync( + string activity, IReadOnlyCollection args, StartActivityOptions options); } } diff --git a/src/Temporalio/Nexus/NexusActivityExecutionToken.cs b/src/Temporalio/Nexus/NexusActivityExecutionToken.cs new file mode 100644 index 00000000..2f9c466b --- /dev/null +++ b/src/Temporalio/Nexus/NexusActivityExecutionToken.cs @@ -0,0 +1,95 @@ +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Temporalio.Nexus +{ + /// + /// Internal helper for building and parsing activity-execution operation tokens used by the + /// generic when an operation is backed by a standalone + /// activity. + /// + internal static class NexusActivityExecutionToken + { + /// + /// Token-type value identifying an activity-execution operation token. + /// + internal const int OperationTokenType = 2; + + /// + /// Build a base64url-encoded activity-execution operation token. + /// + /// Activity namespace. + /// Activity ID. + /// Activity run ID. May be null when building the token used in + /// the completion-callback header (which is sent before the run ID is known). + /// Base64url-encoded operation token. + internal static string Create(string namespace_, string activityId, string? runId) => + NexusWorkflowRunHandle.Base64UrlEncode(JsonSerializer.SerializeToUtf8Bytes( + new Token(namespace_, activityId, runId, null), + NexusWorkflowRunHandle.TokenSerializerOptions)); + + /// + /// Parse an activity-execution operation token into its underlying fields. + /// + /// Base64url-encoded token string. + /// Parsed token fields. + /// If the token is invalid. + internal static Token Parse(string token) + { + byte[] bytes; + try + { + bytes = NexusWorkflowRunHandle.Base64UrlDecode(token); + } + catch (FormatException) + { + throw new ArgumentException("Token invalid"); + } + Token? tokenObj; + try + { + tokenObj = JsonSerializer.Deserialize( + bytes, NexusWorkflowRunHandle.TokenSerializerOptions); + } + catch (JsonException e) + { + throw new ArgumentException("Token invalid", e); + } + if (tokenObj == null) + { + throw new ArgumentException("Token invalid"); + } + if (tokenObj.Type != OperationTokenType) + { + throw new ArgumentException( + $"Invalid activity execution token type: {tokenObj.Type}, " + + $"expected: {OperationTokenType}"); + } + if (tokenObj.Version != null && tokenObj.Version != 0) + { + throw new ArgumentException($"Unsupported token version: {tokenObj.Version}"); + } + if (string.IsNullOrEmpty(tokenObj.ActivityId)) + { + throw new ArgumentException("Token invalid: missing activity ID (aid)"); + } + return tokenObj; + } + + /// + /// Represents the fields of an activity-execution operation token. + /// + internal record Token( + [property: JsonPropertyName("ns")] + string Namespace, + [property: JsonPropertyName("aid")] + string ActivityId, + [property: JsonPropertyName("rid")] + string? RunId, + [property: JsonPropertyName("v")] + int? Version, + [property: JsonPropertyName("t")] + int Type = OperationTokenType); + } +} diff --git a/src/Temporalio/Nexus/NexusActivityStartHelper.cs b/src/Temporalio/Nexus/NexusActivityStartHelper.cs new file mode 100644 index 00000000..6fea3ce0 --- /dev/null +++ b/src/Temporalio/Nexus/NexusActivityStartHelper.cs @@ -0,0 +1,74 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using NexusRpc.Handlers; +using Temporalio.Client; + +namespace Temporalio.Nexus +{ + /// + /// Internal helper for starting standalone activities from Nexus operations and managing + /// activity-execution operation tokens. + /// + internal static class NexusActivityStartHelper + { + /// + /// Start a standalone activity and return the activity-execution operation token. This + /// handles all Nexus plumbing: cloning options, defaulting task queue / ID, processing + /// links, injecting callbacks, and adding outbound links. + /// + /// Temporal client. + /// Nexus start context for callbacks and links. + /// Temporal operation context for info and logging. + /// Activity type name. + /// Activity arguments. + /// Activity start options. Either ScheduleToCloseTimeout or + /// StartToCloseTimeout must be set; TaskQueue defaults to the operation's task queue. + /// Base64url-encoded operation token. + internal static async Task StartActivityAsync( + ITemporalClient client, + OperationStartContext nexusStartContext, + NexusOperationExecutionContext temporalContext, + string activity, + IReadOnlyCollection args, + StartActivityOptions options) + { + // Shallow clone so we can mutate + options = (StartActivityOptions)options.Clone(); + options.TaskQueue ??= temporalContext.Info.TaskQueue; + + var namespace_ = client.Options.Namespace; + var activityId = options.Id!; + + // Build the callback-header token without a run ID (we don't have it yet). + var callbackToken = NexusActivityExecutionToken.Create(namespace_, activityId, runId: null); + + if (options.IdConflictPolicy == Api.Enums.V1.ActivityIdConflictPolicy.UseExisting) + { + options.OnConflictOptions = new() + { + AttachLinks = true, + AttachCompletionCallbacks = true, + AttachRequestId = true, + }; + } + if (NexusOperationStartHelper.CreateInboundLinks( + nexusStartContext, temporalContext) is { } links) + { + options.Links = links; + } + if (NexusOperationStartHelper.CreateCallback( + nexusStartContext, callbackToken, options.Links) is { } callback) + { + options.CompletionCallbacks = new[] { callback }; + } + options.RequestId = nexusStartContext.RequestId; + + // Do the start call + var handle = await client.StartActivityAsync( + activity, args, options).ConfigureAwait(false); + + // Return a token that includes the run ID from the start response. + return NexusActivityExecutionToken.Create(namespace_, activityId, handle.RunId); + } + } +} diff --git a/src/Temporalio/Nexus/NexusOperationStartHelper.cs b/src/Temporalio/Nexus/NexusOperationStartHelper.cs new file mode 100644 index 00000000..d4322fa1 --- /dev/null +++ b/src/Temporalio/Nexus/NexusOperationStartHelper.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.Extensions.Logging; +using NexusRpc.Handlers; +using Temporalio.Api.Common.V1; + +namespace Temporalio.Nexus +{ + /// + /// Plumbing shared by and + /// for translating inbound Nexus links and building the + /// completion callback. + /// + internal static class NexusOperationStartHelper + { + /// + /// Header name used to carry the Nexus operation token on completion callbacks. + /// + internal const string NexusOperationTokenHeader = "Nexus-Operation-Token"; + + /// + /// Translate inbound Nexus links into Temporal proto links. Unrecognized link types are + /// dropped silently; malformed links are logged and dropped. + /// + /// Nexus start context. + /// Temporal operation context (used for logging). + /// List of proto links, or null if there are no inbound links. + internal static IReadOnlyCollection? CreateInboundLinks( + OperationStartContext nexusStartContext, + NexusOperationExecutionContext temporalContext) + { + if (nexusStartContext.InboundLinks.Count == 0) + { + return null; + } + return nexusStartContext.InboundLinks.Select(link => + { + try + { + return link.ToProtoLink(); + } + catch (ArgumentException e) + { + temporalContext.Logger.LogWarning(e, "Invalid Nexus link: {Url}", link.Uri); + return null; + } + }).OfType().ToList(); + } + + /// + /// Build the completion callback from the start context, attaching the operation token + /// header (unless the caller already supplied one) and any inbound links. + /// + /// Nexus start context. + /// Operation token to inject as a callback header. + /// Links to attach to the callback (typically from + /// ). + /// Callback to attach to start options, or null if no callback URL is set. + internal static Callback? CreateCallback( + OperationStartContext nexusStartContext, + string token, + IReadOnlyCollection? links) + { + if (nexusStartContext.CallbackUrl is not { } callbackUrl) + { + return null; + } + var callback = new Callback() { Nexus = new() { Url = callbackUrl } }; + var callbackHeadersHasToken = false; + if (nexusStartContext.CallbackHeaders is { } callbackHeaders) + { + foreach (var kv in callbackHeaders) + { + callback.Nexus.Header.Add(kv.Key, kv.Value); + if (string.Equals( + kv.Key, NexusOperationTokenHeader, StringComparison.OrdinalIgnoreCase)) + { + callbackHeadersHasToken = true; + } + } + } + // Set operation token if not already present (header is case-insensitive) + if (!callbackHeadersHasToken) + { + callback.Nexus.Header[NexusOperationTokenHeader] = token; + } + if (links is { } notNullLinks) + { + callback.Links.AddRange(notNullLinks); + } + return callback; + } + } +} diff --git a/src/Temporalio/Nexus/NexusWorkflowRunHandle.cs b/src/Temporalio/Nexus/NexusWorkflowRunHandle.cs index 5740a902..1a2065b6 100644 --- a/src/Temporalio/Nexus/NexusWorkflowRunHandle.cs +++ b/src/Temporalio/Nexus/NexusWorkflowRunHandle.cs @@ -19,7 +19,10 @@ public class NexusWorkflowRunHandle /// internal const int WorkflowRunOperationTokenType = 1; - private static readonly JsonSerializerOptions TokenSerializerOptions = new() + /// + /// JSON options shared by all Nexus operation-token serialization (omits null fields). + /// + internal static readonly JsonSerializerOptions TokenSerializerOptions = new() { #pragma warning disable SYSLIB0020 // Need to use obsolete form, alternative not in all our versions IgnoreNullValues = true, @@ -84,6 +87,44 @@ internal static byte[] Base64UrlDecode(string s) return Convert.FromBase64String(s); } + /// + /// Decode just the type field from a base64url-encoded operation token. Used by cancel + /// dispatch to route by token type without committing to a particular per-type schema. + /// + /// Base64url-encoded token string. + /// Token type code (the JSON t field). + /// If the token is empty or malformed. + internal static int ParseTokenType(string token) + { + if (string.IsNullOrEmpty(token)) + { + throw new ArgumentException("Token invalid: token is empty"); + } + byte[] bytes; + try + { + bytes = Base64UrlDecode(token); + } + catch (FormatException) + { + throw new ArgumentException("Token invalid"); + } + TokenTypeOnly? partial; + try + { + partial = JsonSerializer.Deserialize(bytes, TokenSerializerOptions); + } + catch (JsonException e) + { + throw new ArgumentException("Token invalid", e); + } + if (partial == null || partial.Type == 0) + { + throw new ArgumentException("Token invalid: missing or zero token type"); + } + return partial.Type; + } + /// /// Create a handle based on the string token. /// @@ -154,6 +195,9 @@ internal record OperationToken( int? Version, [property: JsonPropertyName("t")] int Type = WorkflowRunOperationTokenType); + + private record TokenTypeOnly( + [property: JsonPropertyName("t")] int Type); } /// diff --git a/src/Temporalio/Nexus/NexusWorkflowStartHelper.cs b/src/Temporalio/Nexus/NexusWorkflowStartHelper.cs index 0640868f..01270811 100644 --- a/src/Temporalio/Nexus/NexusWorkflowStartHelper.cs +++ b/src/Temporalio/Nexus/NexusWorkflowStartHelper.cs @@ -1,10 +1,6 @@ -using System; using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; using NexusRpc.Handlers; -using Temporalio.Api.Common.V1; using Temporalio.Api.Enums.V1; using Temporalio.Client; @@ -16,8 +12,6 @@ namespace Temporalio.Nexus /// internal static class NexusWorkflowStartHelper { - private const string NexusOperationTokenHeader = "Nexus-Operation-Token"; - /// /// Start a workflow and return the workflow-run handle. This handles all Nexus plumbing: /// cloning options, setting task queue, processing links, injecting callbacks, and @@ -27,7 +21,8 @@ internal static class NexusWorkflowStartHelper /// Temporal operation context for client, info, and logging. /// Workflow type name. /// Workflow arguments. - /// Workflow start options. ID and TaskQueue are required. + /// Workflow start options. ID is required; TaskQueue defaults to + /// the operation's task queue when omitted. /// Workflow-run handle for the started workflow. internal static async Task StartWorkflowAsync( OperationStartContext nexusStartContext, @@ -58,64 +53,22 @@ internal static async Task StartWorkflowAsync( AttachRequestId = true, }; } - if (nexusStartContext.InboundLinks.Count > 0) + if (NexusOperationStartHelper.CreateInboundLinks( + nexusStartContext, temporalContext) is { } links) { - options.Links = nexusStartContext.InboundLinks.Select(link => - { - try - { - return new Link { WorkflowEvent = link.ToWorkflowEvent() }; - } - catch (ArgumentException e) - { - temporalContext.Logger.LogWarning(e, "Invalid Nexus link: {Url}", link.Uri); - return null; - } - }).OfType().ToList(); + options.Links = links; } - if (nexusStartContext.CallbackUrl is { } callbackUrl) + if (NexusOperationStartHelper.CreateCallback( + nexusStartContext, token, options.Links) is { } callback) { - var callback = new Callback() { Nexus = new() { Url = callbackUrl } }; - var callbackHeadersHasToken = false; - if (nexusStartContext.CallbackHeaders is { } callbackHeaders) - { - foreach (var kv in callbackHeaders) - { - callback.Nexus.Header.Add(kv.Key, kv.Value); - if (string.Equals( - kv.Key, NexusOperationTokenHeader, StringComparison.OrdinalIgnoreCase)) - { - callbackHeadersHasToken = true; - } - } - } - // Set operation token if not already present (header is case-insensitive) - if (!callbackHeadersHasToken) - { - callback.Nexus.Header[NexusOperationTokenHeader] = token; - } - if (options.Links is { } links) - { - callback.Links.AddRange(links); - } options.CompletionCallbacks = new[] { callback }; } options.RequestId = nexusStartContext.RequestId; // Do the start call - var wfHandle = await client.StartWorkflowAsync( + await client.StartWorkflowAsync( workflow, args, options).ConfigureAwait(false); - // Add the outbound link - nexusStartContext.OutboundLinks.Add(new Link.Types.WorkflowEvent - { - Namespace = namespace_, - WorkflowId = workflowId, - RunId = wfHandle.FirstExecutionRunId ?? - throw new InvalidOperationException("Handle unexpectedly missing run ID"), - EventRef = new() { EventId = 1, EventType = EventType.WorkflowExecutionStarted }, - }.ToNexusLink()); - return handle; } } diff --git a/src/Temporalio/Nexus/ProtoLinkExtensions.cs b/src/Temporalio/Nexus/ProtoLinkExtensions.cs index 6a975122..1c011ee0 100644 --- a/src/Temporalio/Nexus/ProtoLinkExtensions.cs +++ b/src/Temporalio/Nexus/ProtoLinkExtensions.cs @@ -32,6 +32,44 @@ internal static class ProtoLinkExtensions private static readonly char[] QuerySeparator = new[] { '&' }; private static readonly char[] QueryValueSeparator = new[] { '=' }; + /// + /// Convert a Nexus link to a Temporal proto link, dispatching by the link's type. + /// + /// Nexus link. + /// Proto link with the appropriate variant populated, or null if the type + /// is unrecognized. + /// If the link is malformed for its declared type. + public static Api.Common.V1.Link? ToProtoLink(this NexusLink link) + { + if (link.Type == Api.Common.V1.Link.Types.WorkflowEvent.Descriptor.FullName) + { + return new() { WorkflowEvent = link.ToWorkflowEvent() }; + } + if (link.Type == Api.Common.V1.Link.Types.Activity.Descriptor.FullName) + { + return new() { Activity = link.ToActivity() }; + } + return null; + } + + /// + /// Convert a Temporal proto link to a Nexus link by dispatching on its variant. + /// + /// Proto link. + /// Nexus link, or null if the variant is unrecognized. + public static NexusLink? ToNexusLink(this Api.Common.V1.Link link) + { + if (link.WorkflowEvent is { } evt) + { + return evt.ToNexusLink(); + } + if (link.Activity is { } act) + { + return act.ToNexusLink(); + } + return null; + } + /// /// Convert a workflow event to a Nexus link. /// @@ -70,6 +108,55 @@ public static NexusLink ToNexusLink(this Api.Common.V1.Link.Types.WorkflowEvent return new(builder.Uri, Api.Common.V1.Link.Types.WorkflowEvent.Descriptor.FullName); } + /// + /// Convert an activity link to a Nexus link. + /// + /// Activity link to convert. + /// Nexus link. + public static NexusLink ToNexusLink(this Api.Common.V1.Link.Types.Activity act) + { + var builder = new UriBuilder + { + Scheme = "temporal", + Path = "/namespaces/" + Uri.EscapeDataString(act.Namespace) + "/activities/" + + Uri.EscapeDataString(act.ActivityId) + "/" + Uri.EscapeDataString(act.RunId) + + "/details", + }; + return new(builder.Uri, Api.Common.V1.Link.Types.Activity.Descriptor.FullName); + } + + /// + /// Convert a Nexus link to an activity link. + /// + /// Nexus link. + /// Activity link. + /// If the link is invalid. + public static Api.Common.V1.Link.Types.Activity ToActivity(this NexusLink link) + { + if (link.Uri.Scheme != "temporal") + { + throw new ArgumentException("Invalid scheme"); + } + if (link.Uri.Host.Length > 0) + { + throw new ArgumentException("Unexpected host"); + } + var pathPieces = link.Uri.AbsolutePath.TrimStart('/').Split('/'); + if (pathPieces.Length != 6 || + pathPieces[0] != "namespaces" || + pathPieces[2] != "activities" || + pathPieces[5] != "details") + { + throw new ArgumentException("Invalid path"); + } + return new Api.Common.V1.Link.Types.Activity + { + Namespace = Uri.UnescapeDataString(pathPieces[1]), + ActivityId = Uri.UnescapeDataString(pathPieces[3]), + RunId = Uri.UnescapeDataString(pathPieces[4]), + }; + } + /// /// Convert a Nexus link to a workflow event. /// diff --git a/src/Temporalio/Nexus/TemporalNexusClient.cs b/src/Temporalio/Nexus/TemporalNexusClient.cs index cd8e8252..73cbfa02 100644 --- a/src/Temporalio/Nexus/TemporalNexusClient.cs +++ b/src/Temporalio/Nexus/TemporalNexusClient.cs @@ -75,5 +75,45 @@ public async Task> StartWorkflowAsync( options).ConfigureAwait(false); return TemporalOperationResult.AsyncResult(handle.ToToken()); } + + /// + public Task> StartActivityAsync( + Expression>> activityCall, StartActivityOptions options) + { + var (method, args) = Common.ExpressionUtil.ExtractCall(activityCall); + return StartActivityAsync( + Activities.ActivityDefinition.NameFromMethodForCall(method), + args, + options); + } + + /// + public async Task> StartActivityAsync( + Expression> activityCall, StartActivityOptions options) + { + var (method, args) = Common.ExpressionUtil.ExtractCall(activityCall); + var token = await NexusActivityStartHelper.StartActivityAsync( + TemporalClient, + nexusStartContext, + temporalContext, + Activities.ActivityDefinition.NameFromMethodForCall(method), + args, + options).ConfigureAwait(false); + return TemporalOperationResult.AsyncResult(token); + } + + /// + public async Task> StartActivityAsync( + string activity, IReadOnlyCollection args, StartActivityOptions options) + { + var token = await NexusActivityStartHelper.StartActivityAsync( + TemporalClient, + nexusStartContext, + temporalContext, + activity, + args, + options).ConfigureAwait(false); + return TemporalOperationResult.AsyncResult(token); + } } } diff --git a/src/Temporalio/Nexus/TemporalOperationHandler.cs b/src/Temporalio/Nexus/TemporalOperationHandler.cs index 377ad818..e43783da 100644 --- a/src/Temporalio/Nexus/TemporalOperationHandler.cs +++ b/src/Temporalio/Nexus/TemporalOperationHandler.cs @@ -112,29 +112,56 @@ public async Task> StartAsync( /// public Task CancelAsync(OperationCancelContext context) { - NexusWorkflowRunHandle.OperationToken token; + int tokenType; try { - token = NexusWorkflowRunHandle.ParseToken(context.OperationToken); + tokenType = NexusWorkflowRunHandle.ParseTokenType(context.OperationToken); } catch (ArgumentException e) { throw new HandlerException(HandlerErrorType.BadRequest, e.Message); } - if (token.Namespace != NexusOperationExecutionContext.Current.Info.Namespace) + switch (tokenType) { - throw new HandlerException(HandlerErrorType.BadRequest, "Invalid namespace"); - } - return token.Type switch - { - NexusWorkflowRunHandle.WorkflowRunOperationTokenType => - CancelWorkflowRunAsync( + case NexusWorkflowRunHandle.WorkflowRunOperationTokenType: + NexusWorkflowRunHandle.OperationToken wfToken; + try + { + wfToken = NexusWorkflowRunHandle.ParseToken(context.OperationToken); + } + catch (ArgumentException e) + { + throw new HandlerException(HandlerErrorType.BadRequest, e.Message); + } + if (wfToken.Namespace != NexusOperationExecutionContext.Current.Info.Namespace) + { + throw new HandlerException(HandlerErrorType.BadRequest, "Invalid namespace"); + } + return CancelWorkflowRunAsync( new TemporalOperationCancelContext(context), - new CancelWorkflowRunInput(token.WorkflowId)), - _ => throw new HandlerException( - HandlerErrorType.BadRequest, - $"Unsupported token type: {token.Type}"), - }; + new CancelWorkflowRunInput(wfToken.WorkflowId)); + case NexusActivityExecutionToken.OperationTokenType: + NexusActivityExecutionToken.Token actToken; + try + { + actToken = NexusActivityExecutionToken.Parse(context.OperationToken); + } + catch (ArgumentException e) + { + throw new HandlerException(HandlerErrorType.BadRequest, e.Message); + } + if (actToken.Namespace != NexusOperationExecutionContext.Current.Info.Namespace) + { + throw new HandlerException(HandlerErrorType.BadRequest, "Invalid namespace"); + } + return CancelActivityExecutionAsync( + new TemporalOperationCancelContext(context), + new CancelActivityExecutionInput(actToken.ActivityId, actToken.RunId)); + default: + throw new HandlerException( + HandlerErrorType.BadRequest, + $"Unsupported token type: {tokenType}"); + } } /// @@ -149,6 +176,19 @@ protected virtual Task CancelWorkflowRunAsync( TemporalOperationCancelContext context, CancelWorkflowRunInput input) => NexusOperationExecutionContext.Current.TemporalClient .GetWorkflowHandle(input.WorkflowId).CancelAsync(); + + /// + /// Called when a cancel request is received for an activity-execution token. Override to + /// customize cancel behavior. + /// Default behavior: cancels the underlying standalone activity. + /// + /// The cancel context. + /// Activity-execution cancel input. + /// Task for cancel completion. + protected virtual Task CancelActivityExecutionAsync( + TemporalOperationCancelContext context, CancelActivityExecutionInput input) => + NexusOperationExecutionContext.Current.TemporalClient + .GetActivityHandle(input.ActivityId, input.RunId).CancelAsync(); } /// @@ -170,6 +210,35 @@ public class CancelWorkflowRunInput public string WorkflowId { get; } } + /// + /// Input passed to + /// . + /// + /// WARNING: Nexus support is experimental. + public class CancelActivityExecutionInput + { + /// + /// Initializes a new instance of the class. + /// + /// Activity ID extracted from the operation token. + /// Activity run ID extracted from the operation token. + public CancelActivityExecutionInput(string activityId, string? runId) + { + ActivityId = activityId; + RunId = runId; + } + + /// + /// Gets the activity ID extracted from the operation token. + /// + public string ActivityId { get; } + + /// + /// Gets the activity run ID extracted from the operation token. + /// + public string? RunId { get; } + } + /// /// Context passed to the start function of a /// . diff --git a/src/Temporalio/Nexus/WorkflowRunOperationContext.cs b/src/Temporalio/Nexus/WorkflowRunOperationContext.cs index ea742e07..9cc4a492 100644 --- a/src/Temporalio/Nexus/WorkflowRunOperationContext.cs +++ b/src/Temporalio/Nexus/WorkflowRunOperationContext.cs @@ -31,7 +31,8 @@ internal WorkflowRunOperationContext(OperationStartContext handlerContext) => /// /// Workflow class type. /// Invocation of workflow run method with no result. - /// Start workflow options. ID and TaskQueue are required. + /// Start workflow options. ID is required; TaskQueue defaults to + /// the operation's task queue when omitted. /// Nexus workflow run handle to return in handle factory. public Task StartWorkflowAsync( Expression> workflowRunCall, WorkflowOptions options) @@ -49,7 +50,8 @@ public Task StartWorkflowAsync( /// Workflow class type. /// Workflow result type. /// Invocation of workflow run method with a result. - /// Start workflow options. ID and TaskQueue are required. + /// Start workflow options. ID is required; TaskQueue defaults to + /// the operation's task queue when omitted. /// Nexus workflow run handle to return in handle factory. public Task> StartWorkflowAsync( Expression>> workflowRunCall, WorkflowOptions options) @@ -66,7 +68,8 @@ public Task> StartWorkflowAsync /// Workflow type name. /// Arguments for the workflow. - /// Start workflow options. ID and TaskQueue are required. + /// Start workflow options. ID is required; TaskQueue defaults to + /// the operation's task queue when omitted. /// Nexus workflow run handle to return in handle factory. #pragma warning disable CA1822 // We don't want this static public Task StartWorkflowAsync( @@ -88,7 +91,8 @@ public Task StartWorkflowAsync( /// Result type. /// Workflow type name. /// Arguments for the workflow. - /// Start workflow options. ID and TaskQueue are required. + /// Start workflow options. ID is required; TaskQueue defaults to + /// the operation's task queue when omitted. /// Nexus workflow run handle to return in handle factory. #pragma warning disable CA1822 // We don't want this static public async Task> StartWorkflowAsync( diff --git a/tests/Temporalio.Tests/Nexus/NexusActivityExecutionTokenTests.cs b/tests/Temporalio.Tests/Nexus/NexusActivityExecutionTokenTests.cs new file mode 100644 index 00000000..d37eda51 --- /dev/null +++ b/tests/Temporalio.Tests/Nexus/NexusActivityExecutionTokenTests.cs @@ -0,0 +1,132 @@ +namespace Temporalio.Tests.Nexus; + +using System.Text; +using System.Text.Json; +using Temporalio.Nexus; +using Xunit; + +public class NexusActivityExecutionTokenTests +{ + [Fact] + public void Build_RoundTrips() + { + var token = NexusActivityExecutionToken.Create("my-namespace", "my-activity-id", "my-run-id"); + var decoded = NexusActivityExecutionToken.Parse(token); + + Assert.Equal("my-namespace", decoded.Namespace); + Assert.Equal("my-activity-id", decoded.ActivityId); + Assert.Equal("my-run-id", decoded.RunId); + Assert.Null(decoded.Version); + } + + [Fact] + public void Build_RoundTripsWithoutRunId() + { + var token = NexusActivityExecutionToken.Create("ns", "aid", runId: null); + var decoded = NexusActivityExecutionToken.Parse(token); + + Assert.Equal("ns", decoded.Namespace); + Assert.Equal("aid", decoded.ActivityId); + Assert.Null(decoded.RunId); + } + + [Fact] + public void Build_UsesBase64Url_NoPadding() + { + var token = NexusActivityExecutionToken.Create("my-ns", "my-aid", "my-rid"); + + Assert.DoesNotContain("+", token); + Assert.DoesNotContain("/", token); + Assert.DoesNotContain("=", token); + } + + [Fact] + public void Build_JsonUsesCorrectKeys() + { + var token = NexusActivityExecutionToken.Create("ns", "aid", "rid"); + var json = Encoding.UTF8.GetString(NexusWorkflowRunHandle.Base64UrlDecode(token)); + using var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + Assert.Equal( + NexusActivityExecutionToken.OperationTokenType, + root.GetProperty("t").GetInt32()); + Assert.Equal("ns", root.GetProperty("ns").GetString()); + Assert.Equal("aid", root.GetProperty("aid").GetString()); + Assert.Equal("rid", root.GetProperty("rid").GetString()); + } + + [Fact] + public void Parse_RejectsWorkflowRunToken() + { + // A workflow-run token (t=1) must not parse as activity-execution. + var wfToken = new NexusWorkflowRunHandle("ns", "wid", 0).ToToken(); + Assert.Throws(() => NexusActivityExecutionToken.Parse(wfToken)); + } + + [Fact] + public void Parse_RejectsMissingActivityId() + { + var json = """{"t":2,"ns":"ns"}"""; + var token = NexusWorkflowRunHandle.Base64UrlEncode(Encoding.UTF8.GetBytes(json)); + Assert.Throws(() => NexusActivityExecutionToken.Parse(token)); + } + + [Fact] + public void Parse_RejectsUnsupportedVersion() + { + var json = """{"t":2,"ns":"ns","aid":"aid","v":1}"""; + var token = NexusWorkflowRunHandle.Base64UrlEncode(Encoding.UTF8.GetBytes(json)); + Assert.Throws(() => NexusActivityExecutionToken.Parse(token)); + } + + [Fact] + public void Parse_RejectsInvalidBase64() + { + Assert.Throws( + () => NexusActivityExecutionToken.Parse("!!!invalid!!!")); + } + + [Fact] + public void LoadTokenType_DetectsActivityToken() + { + var token = NexusActivityExecutionToken.Create("ns", "aid", "rid"); + Assert.Equal( + NexusActivityExecutionToken.OperationTokenType, + NexusWorkflowRunHandle.ParseTokenType(token)); + } + + [Fact] + public void LoadTokenType_DetectsWorkflowToken() + { + var token = new NexusWorkflowRunHandle("ns", "wid", 0).ToToken(); + Assert.Equal( + NexusWorkflowRunHandle.WorkflowRunOperationTokenType, + NexusWorkflowRunHandle.ParseTokenType(token)); + } + + [Fact] + public void LoadTokenType_RejectsEmpty() + { + Assert.Throws(() => NexusWorkflowRunHandle.ParseTokenType(string.Empty)); + } + + [Fact] + public void LoadTokenType_RejectsMissingType() + { + var json = """{"ns":"ns","wid":"wid"}"""; + var token = NexusWorkflowRunHandle.Base64UrlEncode(Encoding.UTF8.GetBytes(json)); + Assert.Throws(() => NexusWorkflowRunHandle.ParseTokenType(token)); + } + + [Fact] + public void Build_SpecialCharactersRoundTrip() + { + var token = NexusActivityExecutionToken.Create("ns/with+special", "aid?id=1&foo=bar", "rid/with+special"); + var decoded = NexusActivityExecutionToken.Parse(token); + + Assert.Equal("ns/with+special", decoded.Namespace); + Assert.Equal("aid?id=1&foo=bar", decoded.ActivityId); + Assert.Equal("rid/with+special", decoded.RunId); + } +} diff --git a/tests/Temporalio.Tests/Nexus/ProtoLinkExtensionsTests.cs b/tests/Temporalio.Tests/Nexus/ProtoLinkExtensionsTests.cs new file mode 100644 index 00000000..31986ae9 --- /dev/null +++ b/tests/Temporalio.Tests/Nexus/ProtoLinkExtensionsTests.cs @@ -0,0 +1,59 @@ +namespace Temporalio.Tests.Nexus; + +using NexusRpc; +using Temporalio.Api.Common.V1; +using Temporalio.Nexus; +using Xunit; + +public class ProtoLinkExtensionsTests +{ + [Fact] + public void ActivityLink_ToNexusLink_BuildsExpectedUri() + { + var act = new Link.Types.Activity + { + Namespace = "my-ns", + ActivityId = "my-aid", + RunId = "my-run", + }; + var nexusLink = act.ToNexusLink(); + + Assert.Equal("temporal", nexusLink.Uri.Scheme); + Assert.Equal(Link.Types.Activity.Descriptor.FullName, nexusLink.Type); + Assert.Equal( + "/namespaces/my-ns/activities/my-aid/my-run/details", + nexusLink.Uri.AbsolutePath); + } + + [Fact] + public void ToActivity_ParsesServerStyleUri() + { + // Servers produce URIs in the host-less form `temporal:/namespaces/.../details`. + var link = new NexusLink( + new Uri("temporal:/namespaces/my-ns/activities/my-aid/my-run/details"), + Link.Types.Activity.Descriptor.FullName); + + var act = link.ToActivity(); + Assert.Equal("my-ns", act.Namespace); + Assert.Equal("my-aid", act.ActivityId); + Assert.Equal("my-run", act.RunId); + } + + [Fact] + public void ToActivity_RejectsNonTemporalScheme() + { + var link = new NexusLink( + new Uri("https://example/namespaces/ns/activities/aid/run/details"), + Link.Types.Activity.Descriptor.FullName); + Assert.Throws(() => link.ToActivity()); + } + + [Fact] + public void ToActivity_RejectsBadPath() + { + var link = new NexusLink( + new Uri("temporal:///namespaces/ns/workflows/wid/run/history"), + Link.Types.Activity.Descriptor.FullName); + Assert.Throws(() => link.ToActivity()); + } +} diff --git a/tests/Temporalio.Tests/Worker/NexusWorkerTests.cs b/tests/Temporalio.Tests/Worker/NexusWorkerTests.cs index 4ec728a0..0c98f130 100644 --- a/tests/Temporalio.Tests/Worker/NexusWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/NexusWorkerTests.cs @@ -2,6 +2,7 @@ namespace Temporalio.Tests.Worker; using NexusRpc; using NexusRpc.Handlers; +using Temporalio.Activities; using Temporalio.Api.Common.V1; using Temporalio.Api.Enums.V1; using Temporalio.Api.History.V1; @@ -1662,6 +1663,220 @@ await RunInWorkflowAsync(workerOptions, async () => }); } + public static class ActivityStubs + { + private static volatile TaskCompletionSource? waitForCancelReached; + + public static TaskCompletionSource? WaitForCancelReached + { + get => waitForCancelReached; + set => waitForCancelReached = value; + } + + [Activity] + public static Task EchoAsync(string input) => + Task.FromResult($"echo-activity:{input}"); + + [Activity] + public static async Task WaitForCancelAsync() + { + var ctx = ActivityExecutionContext.Current; + waitForCancelReached?.TrySetResult(); + while (!ctx.CancellationToken.IsCancellationRequested) + { + ctx.Heartbeat(); + await Task.Delay(100, ctx.CancellationToken); + } + ctx.CancellationToken.ThrowIfCancellationRequested(); + } + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_StartActivity_Succeeds() + { + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalOperationHandler.FromHandleFactory( + async (context, client, input) => + await client.StartActivityAsync( + () => ActivityStubs.EchoAsync(input), + new() + { + Id = $"echo-{input}", + ScheduleToCloseTimeout = TimeSpan.FromMinutes(1), + })))). + AddActivity(ActivityStubs.EchoAsync); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + var handle = await RunInWorkflowAsync(workerOptions, async () => + { + var result = await Workflow.CreateNexusWorkflowClient(endpoint). + ExecuteNexusOperationAsync(svc => svc.DoSomething("some-name")); + Assert.Equal("echo-activity:some-name", result); + }); + + // TODO(quinn): re-enable once dev-server supports activity links + // // Outbound link on the start event points to the activity + // var startEvent = Assert.Single( + // (await handle.FetchHistoryAsync()).Events, + // evt => evt.NexusOperationStartedEventAttributes != null); + // var link = Assert.Single(startEvent.Links); + // Assert.NotNull(link.Activity); + // Assert.False(string.IsNullOrEmpty(link.Activity.ActivityId)); + // Assert.False(string.IsNullOrEmpty(link.Activity.RunId)); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_StartActivityByName_Succeeds() + { + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalOperationHandler.FromHandleFactory( + async (context, client, input) => + await client.StartActivityAsync( + "Echo", + new object?[] { input }, + new() + { + Id = $"echo-byname-{input}", + ScheduleToCloseTimeout = TimeSpan.FromMinutes(1), + })))). + AddActivity(ActivityStubs.EchoAsync); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + await RunInWorkflowAsync(workerOptions, async () => + { + var result = await Workflow.CreateNexusWorkflowClient(endpoint). + ExecuteNexusOperationAsync(svc => svc.DoSomething("by-name")); + Assert.Equal("echo-activity:by-name", result); + }); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_CancelActivity_CancelsUnderlying() + { + ActivityStubs.WaitForCancelReached = new TaskCompletionSource(); + try + { + var activityId = $"act-{Guid.NewGuid()}"; + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalOperationHandler.FromHandleFactory( + async (context, client, input) => + await client.StartActivityAsync( + "WaitForCancel", + Array.Empty(), + new() + { + Id = activityId, + ScheduleToCloseTimeout = TimeSpan.FromMinutes(2), + HeartbeatTimeout = TimeSpan.FromSeconds(10), + })))). + AddActivity(ActivityStubs.WaitForCancelAsync); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + var wfExc = await Assert.ThrowsAsync(() => + RunInWorkflowAsync(workerOptions, async () => + { + using var cancelSource = new CancellationTokenSource(); + var handle = await Workflow.CreateNexusWorkflowClient(endpoint). + StartNexusOperationAsync( + svc => svc.DoSomething("some-name"), + new() { CancellationToken = cancelSource.Token }); +#pragma warning disable CA1849, VSTHRD103 // https://github.com/temporalio/sdk-dotnet/issues/327 + cancelSource.Cancel(); +#pragma warning restore CA1849, VSTHRD103 + await handle.GetResultAsync(); + })); + var nexusExc = Assert.IsType(wfExc.InnerException); + Assert.IsType(nexusExc.InnerException); + + // Underlying activity was canceled by the default CancelActivityExecutionAsync + await AssertMore.EventuallyAsync(async () => + { + var desc = await Client.GetActivityHandle(activityId).DescribeAsync(); + Assert.Equal(Api.Enums.V1.ActivityExecutionStatus.Canceled, desc.Status); + }); + } + finally + { + ActivityStubs.WaitForCancelReached = null; + } + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_CancelActivityOverride_Invoked() + { + ActivityStubs.WaitForCancelReached = new TaskCompletionSource(); + try + { + var activityId = $"act-{Guid.NewGuid()}"; + CancelActivityOverrideHandler? capturedHandler = null; + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + { + capturedHandler ??= new CancelActivityOverrideHandler( + async (context, client, input) => + await client.StartActivityAsync( + "WaitForCancel", + Array.Empty(), + new() + { + Id = activityId, + ScheduleToCloseTimeout = TimeSpan.FromMinutes(2), + HeartbeatTimeout = TimeSpan.FromSeconds(10), + })); + return capturedHandler; + })). + AddActivity(ActivityStubs.WaitForCancelAsync); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + await Assert.ThrowsAsync(() => + RunInWorkflowAsync(workerOptions, async () => + { + using var cancelSource = new CancellationTokenSource(); + var handle = await Workflow.CreateNexusWorkflowClient(endpoint). + StartNexusOperationAsync( + svc => svc.DoSomething("some-name"), + new() { CancellationToken = cancelSource.Token }); +#pragma warning disable CA1849, VSTHRD103 + cancelSource.Cancel(); +#pragma warning restore CA1849, VSTHRD103 + await handle.GetResultAsync(); + })); + + Assert.NotNull(capturedHandler); + Assert.True(capturedHandler!.CancelCallCount > 0); + Assert.Equal(activityId, capturedHandler.CapturedActivityId); + } + finally + { + ActivityStubs.WaitForCancelReached = null; + } + } + + private class CancelActivityOverrideHandler : TemporalOperationHandler + { + public CancelActivityOverrideHandler( + Func>> startFunc) + : base(startFunc) + { + } + + public int CancelCallCount { get; private set; } + + public string? CapturedActivityId { get; private set; } + + protected override Task CancelActivityExecutionAsync( + TemporalOperationCancelContext context, CancelActivityExecutionInput input) + { + CancelCallCount++; + CapturedActivityId = input.ActivityId; + return base.CancelActivityExecutionAsync(context, input); + } + } + [NexusService] public interface INoInputService {