Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Temporalio/Client/StartActivityOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using Temporalio.Api.Enums.V1;
using Temporalio.Common;

Expand Down Expand Up @@ -114,6 +115,26 @@ public StartActivityOptions(string id, string taskQueue)
/// </summary>
public RpcOptions? Rpc { get; set; }

/// <summary>
/// Gets or sets the on-conflict options.
/// </summary>
internal Api.Common.V1.OnConflictOptions? OnConflictOptions { get; set; }

/// <summary>
/// Gets or sets the completion callbacks.
/// </summary>
internal IReadOnlyCollection<Api.Common.V1.Callback>? CompletionCallbacks { get; set; }

/// <summary>
/// Gets or sets the links to attach on activity start.
/// </summary>
internal IReadOnlyCollection<Api.Common.V1.Link>? Links { get; set; }

/// <summary>
/// Gets or sets the request ID for the activity start request. If null, a new GUID is used.
/// </summary>
internal string? RequestId { get; set; }

/// <summary>
/// Create a shallow copy of these options.
/// </summary>
Expand Down
17 changes: 16 additions & 1 deletion src/Temporalio/Client/TemporalClient.Activity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Temporalio.Common;
using Temporalio.Converters;
using Temporalio.Exceptions;
using Temporalio.Nexus;

#if NETCOREAPP3_0_OR_GREATER
using System.Runtime.CompilerServices;
Expand Down Expand Up @@ -118,15 +119,24 @@ public override async Task<ActivityHandle<TResult>> StartActivityAsync<TResult>(
Name = input.Options.TaskQueue!,
},
Identity = Client.Connection.Options.Identity,
RequestId = Guid.NewGuid().ToString(),
RequestId = input.Options.RequestId ?? Guid.NewGuid().ToString(),
IdReusePolicy = input.Options.IdReusePolicy,
IdConflictPolicy = input.Options.IdConflictPolicy,
RetryPolicy = input.Options.RetryPolicy?.ToProto(),
UserMetadata = await dataConverter.ToUserMetadataAsync(
input.Options.StaticSummary, input.Options.StaticDetails).
ConfigureAwait(false),
Priority = input.Options.Priority?.ToProto(),
OnConflictOptions = input.Options.OnConflictOptions,
};
if (input.Options.CompletionCallbacks is { } completionCallbacks)
{
req.CompletionCallbacks.AddRange(completionCallbacks);
}
if (input.Options.Links is { } activityLinks)
{
req.Links.AddRange(activityLinks);
}
if (input.Args.Count > 0)
{
req.Input = new Payloads();
Expand Down Expand Up @@ -173,6 +183,11 @@ public override async Task<ActivityHandle<TResult>> StartActivityAsync<TResult>(

var resp = await Client.Connection.WorkflowService.StartActivityExecutionAsync(
req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false);
if (NexusOperationExecutionContext.HasCurrent &&
resp.Link?.ToNexusLink() is { } nexusLink)
{
NexusOperationExecutionContext.Current.HandlerContext.OutboundLinks.Add(nexusLink);
}
Comment thread
Quinn-With-Two-Ns marked this conversation as resolved.
return new ActivityHandle<TResult>(
Client: Client,
Id: input.Options.Id!,
Expand Down
14 changes: 14 additions & 0 deletions src/Temporalio/Client/TemporalClient.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Temporalio.Common;
using Temporalio.Converters;
using Temporalio.Exceptions;
using Temporalio.Nexus;

#if NETCOREAPP3_0_OR_GREATER
using System.Runtime.CompilerServices;
Expand Down Expand Up @@ -739,6 +740,19 @@ private async Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowInternalAsyn
}
var resp = await Client.Connection.WorkflowService.StartWorkflowExecutionAsync(
req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false);
if (NexusOperationExecutionContext.HasCurrent)
{
// Prefer the link returned by the server; fall back to a
// WorkflowExecutionStarted link for older servers that don't populate it.
var nexusLink = resp.Link?.ToNexusLink() ?? new Link.Types.WorkflowEvent
Comment thread
jmaeagle99 marked this conversation as resolved.
{
Namespace = req.Namespace,
WorkflowId = req.WorkflowId,
RunId = resp.RunId,
EventRef = new() { EventId = 1, EventType = EventType.WorkflowExecutionStarted },
}.ToNexusLink();
NexusOperationExecutionContext.Current.HandlerContext.OutboundLinks.Add(nexusLink);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Signal-with-start drops Nexus links

Medium Severity

Outbound Nexus links for child workflow starts are only appended after a plain StartWorkflowExecution call. The SignalWithStartWorkflowExecution path returns without the same logic, so Nexus handlers that start workflows with StartSignal no longer populate OutboundLinks on the operation start response.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8520a4b. Configure here.

return new WorkflowHandle<TWorkflow, TResult>(
Client: Client,
Id: req.WorkflowId,
Expand Down
51 changes: 48 additions & 3 deletions src/Temporalio/Nexus/ITemporalNexusClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public interface ITemporalNexusClient
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <typeparam name="TResult">Workflow result type.</typeparam>
/// <param name="workflowRunCall">Invocation of workflow run method with a result.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <param name="options">Start workflow options. ID is required; TaskQueue defaults to
/// the operation's task queue when omitted.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
Task<TemporalOperationResult<TResult>> StartWorkflowAsync<TWorkflow, TResult>(
Expression<Func<TWorkflow, Task<TResult>>> workflowRunCall, WorkflowOptions options);
Expand All @@ -56,7 +57,8 @@ Task<TemporalOperationResult<TResult>> StartWorkflowAsync<TWorkflow, TResult>(
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <param name="workflowRunCall">Invocation of workflow run method with no result.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <param name="options">Start workflow options. ID is required; TaskQueue defaults to
/// the operation's task queue when omitted.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
Task<TemporalOperationResult<NoValue>> StartWorkflowAsync<TWorkflow>(
Expression<Func<TWorkflow, Task>> workflowRunCall, WorkflowOptions options);
Expand All @@ -68,9 +70,52 @@ Task<TemporalOperationResult<NoValue>> StartWorkflowAsync<TWorkflow>(
/// <typeparam name="TResult">Workflow result type.</typeparam>
/// <param name="workflow">Workflow type name.</param>
/// <param name="args">Arguments for the workflow.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <param name="options">Start workflow options. ID is required; TaskQueue defaults to
/// the operation's task queue when omitted.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
Task<TemporalOperationResult<TResult>> StartWorkflowAsync<TResult>(
string workflow, IReadOnlyCollection<object?> args, WorkflowOptions options);

/// <summary>
/// Schedule a standalone activity via a lambda invoking the activity method. Always returns
/// an async result with an activity-execution operation token.
/// </summary>
/// <typeparam name="TResult">Activity result type.</typeparam>
/// <param name="activityCall">Invocation of activity method with a result.</param>
/// <param name="options">Activity start options. Id is required and should be derived
/// deterministically from the operation input so retries of the Nexus start request are
/// idempotent. At least one of ScheduleToCloseTimeout or StartToCloseTimeout is also
/// required. TaskQueue defaults to the operation's task queue when omitted.</param>
/// <returns>An async operation result containing the activity-execution token.</returns>
Task<TemporalOperationResult<TResult>> StartActivityAsync<TResult>(
Expression<Func<Task<TResult>>> activityCall, StartActivityOptions options);

/// <summary>
/// Schedule a standalone activity via a lambda invoking the activity method with no return
/// value. Always returns an async result with an activity-execution operation token.
/// </summary>
/// <param name="activityCall">Invocation of activity method with no result.</param>
/// <param name="options">Activity start options. Id is required and should be derived
/// deterministically from the operation input so retries of the Nexus start request are
/// idempotent. At least one of ScheduleToCloseTimeout or StartToCloseTimeout is also
/// required. TaskQueue defaults to the operation's task queue when omitted.</param>
/// <returns>An async operation result containing the activity-execution token.</returns>
Task<TemporalOperationResult<NoValue>> StartActivityAsync(
Expression<Func<Task>> activityCall, StartActivityOptions options);

/// <summary>
/// Schedule a standalone activity by name. Always returns an async result with an
/// activity-execution operation token.
/// </summary>
/// <typeparam name="TResult">Activity result type.</typeparam>
/// <param name="activity">Activity type name.</param>
/// <param name="args">Arguments for the activity.</param>
/// <param name="options">Activity start options. Id is required and should be derived
/// deterministically from the operation input so retries of the Nexus start request are
/// idempotent. At least one of ScheduleToCloseTimeout or StartToCloseTimeout is also
/// required. TaskQueue defaults to the operation's task queue when omitted.</param>
/// <returns>An async operation result containing the activity-execution token.</returns>
Task<TemporalOperationResult<TResult>> StartActivityAsync<TResult>(
string activity, IReadOnlyCollection<object?> args, StartActivityOptions options);
}
}
95 changes: 95 additions & 0 deletions src/Temporalio/Nexus/NexusActivityExecutionToken.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Temporalio.Nexus
{
/// <summary>
/// Internal helper for building and parsing activity-execution operation tokens used by the
/// generic <see cref="TemporalOperationHandler"/> when an operation is backed by a standalone
/// activity.
/// </summary>
internal static class NexusActivityExecutionToken
{
/// <summary>
/// Token-type value identifying an activity-execution operation token.
/// </summary>
internal const int OperationTokenType = 2;

/// <summary>
/// Build a base64url-encoded activity-execution operation token.
/// </summary>
/// <param name="namespace_">Activity namespace.</param>
/// <param name="activityId">Activity ID.</param>
/// <param name="runId">Activity run ID. May be <c>null</c> when building the token used in
/// the completion-callback header (which is sent before the run ID is known).</param>
/// <returns>Base64url-encoded operation token.</returns>
internal static string Create(string namespace_, string activityId, string? runId) =>
NexusWorkflowRunHandle.Base64UrlEncode(JsonSerializer.SerializeToUtf8Bytes(
new Token(namespace_, activityId, runId, null),
NexusWorkflowRunHandle.TokenSerializerOptions));

/// <summary>
/// Parse an activity-execution operation token into its underlying fields.
/// </summary>
/// <param name="token">Base64url-encoded token string.</param>
/// <returns>Parsed token fields.</returns>
/// <exception cref="ArgumentException">If the token is invalid.</exception>
internal static Token Parse(string token)
{
byte[] bytes;
try
{
bytes = NexusWorkflowRunHandle.Base64UrlDecode(token);
}
catch (FormatException)
{
throw new ArgumentException("Token invalid");
}
Token? tokenObj;
try
{
tokenObj = JsonSerializer.Deserialize<Token>(
bytes, NexusWorkflowRunHandle.TokenSerializerOptions);
}
catch (JsonException e)
{
throw new ArgumentException("Token invalid", e);
}
if (tokenObj == null)
{
throw new ArgumentException("Token invalid");
}
if (tokenObj.Type != OperationTokenType)
{
throw new ArgumentException(
$"Invalid activity execution token type: {tokenObj.Type}, " +
$"expected: {OperationTokenType}");
}
if (tokenObj.Version != null && tokenObj.Version != 0)
{
throw new ArgumentException($"Unsupported token version: {tokenObj.Version}");
}
if (string.IsNullOrEmpty(tokenObj.ActivityId))
{
throw new ArgumentException("Token invalid: missing activity ID (aid)");
}
return tokenObj;
}

/// <summary>
/// Represents the fields of an activity-execution operation token.
/// </summary>
internal record Token(
[property: JsonPropertyName("ns")]
string Namespace,
[property: JsonPropertyName("aid")]
string ActivityId,
[property: JsonPropertyName("rid")]
string? RunId,
[property: JsonPropertyName("v")]
int? Version,
[property: JsonPropertyName("t")]
int Type = OperationTokenType);
}
}
74 changes: 74 additions & 0 deletions src/Temporalio/Nexus/NexusActivityStartHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using NexusRpc.Handlers;
using Temporalio.Client;

namespace Temporalio.Nexus
{
/// <summary>
/// Internal helper for starting standalone activities from Nexus operations and managing
/// activity-execution operation tokens.
/// </summary>
internal static class NexusActivityStartHelper
{
/// <summary>
/// Start a standalone activity and return the activity-execution operation token. This
/// handles all Nexus plumbing: cloning options, defaulting task queue / ID, processing
/// links, injecting callbacks, and adding outbound links.
/// </summary>
/// <param name="client">Temporal client.</param>
/// <param name="nexusStartContext">Nexus start context for callbacks and links.</param>
/// <param name="temporalContext">Temporal operation context for info and logging.</param>
/// <param name="activity">Activity type name.</param>
/// <param name="args">Activity arguments.</param>
/// <param name="options">Activity start options. Either ScheduleToCloseTimeout or
/// StartToCloseTimeout must be set; TaskQueue defaults to the operation's task queue.</param>
/// <returns>Base64url-encoded operation token.</returns>
internal static async Task<string> StartActivityAsync(
ITemporalClient client,
OperationStartContext nexusStartContext,
NexusOperationExecutionContext temporalContext,
string activity,
IReadOnlyCollection<object?> args,
StartActivityOptions options)
{
// Shallow clone so we can mutate
options = (StartActivityOptions)options.Clone();
options.TaskQueue ??= temporalContext.Info.TaskQueue;

var namespace_ = client.Options.Namespace;
var activityId = options.Id!;

// Build the callback-header token without a run ID (we don't have it yet).
var callbackToken = NexusActivityExecutionToken.Create(namespace_, activityId, runId: null);

if (options.IdConflictPolicy == Api.Enums.V1.ActivityIdConflictPolicy.UseExisting)
{
options.OnConflictOptions = new()
{
AttachLinks = true,
AttachCompletionCallbacks = true,
AttachRequestId = true,
};
}
if (NexusOperationStartHelper.CreateInboundLinks(
nexusStartContext, temporalContext) is { } links)
{
options.Links = links;
}
if (NexusOperationStartHelper.CreateCallback(
nexusStartContext, callbackToken, options.Links) is { } callback)
{
options.CompletionCallbacks = new[] { callback };
}
options.RequestId = nexusStartContext.RequestId;

// Do the start call
var handle = await client.StartActivityAsync(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the Go implementation, we check for timeout presence to raise a handler error and surface the validation error. Is that necessary here as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SDK already does some of this validation, Started a discussion in slack on where we want to do the validation.

activity, args, options).ConfigureAwait(false);
Comment thread
Quinn-With-Two-Ns marked this conversation as resolved.

// Return a token that includes the run ID from the start response.
return NexusActivityExecutionToken.Create(namespace_, activityId, handle.RunId);
}
}
}
Loading
Loading