-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(persistence): completion callbacks via Defer - simplified alternative to #7937 #7954
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(persistence): completion callbacks via Defer - simplified alternative to #7937 #7954
Conversation
Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync.
Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail)
Aaronontheweb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Detailed my changes - pushing up some more test coverage but otherwise this looks good
| [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName=".NET 6.0")] | ||
| namespace Akka.Persistence | ||
| { | ||
| public sealed class AsyncHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marked as internal now
| public override void AroundPreStart() { } | ||
| protected override bool AroundReceive(Akka.Actor.Receive receive, object message) { } | ||
| public void DeferAsync<TEvent>(TEvent evt, System.Action<TEvent> handler) { } | ||
| public void DeferAsync<TEvent>(TEvent evt, System.Func<TEvent, System.Threading.Tasks.Task> handler) { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now supports Task
| protected virtual void OnRecoveryFailure(System.Exception reason, object message = null) { } | ||
| protected virtual void OnReplaySuccess() { } | ||
| public void Persist<TEvent>(TEvent @event, System.Action<TEvent> handler) { } | ||
| public void Persist<TEvent>(TEvent @event, System.Func<TEvent, System.Threading.Tasks.Task> handler) { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same sets of overloads as #7937 - we now support Task in all persistence callbacks AND an optional OnComplete handler which can be an Action or a Func<Task>
| _events.Add(evt.Data); | ||
| _completionOrder.Add($"handler:{evt.Data}"); | ||
| }, () => | ||
| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dedicated OnComplete handler
| _events.Add(evt.Data); | ||
| _completionOrder.Add($"handler:{evt.Data}"); | ||
| }, async () => | ||
| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Async completion handler
| } | ||
|
|
||
| PersistAll(events, handler); | ||
| if (onCompleteAsync != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We now compose the onComplete handler to use the appropriate Defer / DeferAsync overload.
| /// <typeparam name="TEvent">The event type.</typeparam> | ||
| /// <param name="evt">The event to pass to the handler.</param> | ||
| /// <param name="handler">The handler to invoke.</param> | ||
| internal void Defer<TEvent>(TEvent evt, Action<TEvent> handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New internal method - Defer differs from DeferAsync in that it is a stashing invocation handler, meaning that it wants to be treated like a Persist / PersistAll call and interleaved with them, rather than running only once all of them have been completed.
| { | ||
| _pendingInvocations.First.Value.Handler(payload); | ||
| // Async handler - run via RunTask | ||
| RunTask(async () => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Run async handlers inside RunTask
| // Sync handler - invoke directly | ||
| try | ||
| { | ||
| syncInv.Handler(payload); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise, run other handlers synchronously
| if (m1.ActorInstanceId == _instanceId) | ||
| { | ||
| UpdateLastSequenceNr(m1.Persistent); | ||
| try |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved all of the try...catch behavior to inside the PeekApplyHandler method.
…ethods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios
Arkatufus
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some concern about this PR, mostly regarding the "in order processing/invocation guarantee" promise
| { | ||
| if (events == null || !events.Any()) | ||
| { | ||
| onComplete?.Invoke(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is quite right, need to think that these Persist, PersistAll, and Defer as "fluent compositing" the method callbacks inside the _pendingInvocation list and they still need to be invoked in sequence. If another async operation was queued before this PersistAll call, the operations would be invoked out of order, which breaks the "in order execution guarantee" promise.
I think this should be
if (events == null || !events.Any())
{
if (onComplete != null)
Defer<object>(null, _ => onComplete());
return;
}
|
|
||
| if (_pendingInvocations.Count == 0) | ||
| { | ||
| RunTask(() => handler(evt)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure that invoking this asynchronously in a different thread as a detached task is the right move here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not though - that's how await is run inside the actor's context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i.e. ReceiveAsync is implemented on top of the RunTask handler from the UntypedActor
…maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
Arkatufus
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…ative to akkadotnet#7937 (akkadotnet#7954) * feat(persistence): add completion callbacks and async handler support Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync. * chore: update API approval for persistence completion callbacks Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail) * chore: update .NET Framework API approval for persistence completion callbacks * fixed API approvals * test(persistence): add empty events tests and convert to async test methods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios * fix(persistence): use Defer for empty events completion callbacks to maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
…ative to #7937 (#7954) (#7957) * feat(persistence): add completion callbacks and async handler support Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync. * chore: update API approval for persistence completion callbacks Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail) * chore: update .NET Framework API approval for persistence completion callbacks * fixed API approvals * test(persistence): add empty events tests and convert to async test methods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios * fix(persistence): use Defer for empty events completion callbacks to maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
Summary
This is a simplified alternative implementation to #7937, which adds completion callback and async handler support to Akka.NET persistence.
Fixes #7935
What's Different From #7937?
The Original Approach (#7937)
The original PR embedded completion callbacks directly in invocation objects with an
IsLastInBatchflag to track when the final event handler completes. This required:IsLastInBatchtracking logic to determine when to invoke completion callbacksPeekApplyHandlerto chain completion callbacksThis Simplified Approach
After discussing with @gregorius, we realized that
DeferAsyncalready provides most of what completion callbacks need - it queues a handler to run after all pending invocations complete.The key insight: instead of embedding completion callbacks in invocation objects, we can simply call
Defer/DeferAsyncafterPersistAll/PersistAllAsync:Why This Is Better
IsLastInBatchflagDefer/DeferAsyncCritical Design Detail: Stashing Semantics
There's one subtlety that required a new internal stashing
Defermethod:PersistAll(stashing) must use stashingDeferfor completion callbacksPersistAllAsync(non-stashing) usesDeferAsyncfor completion callbacksUsing
DeferAsyncforPersistAllwould break stashing - commands would unstash before the completion callback runs. The internalDefermethod increments_pendingStashingPersistInvocationsto maintain correct stashing behavior.Changes
New Interfaces
IPendingHandlerInvocation- base interface for handler invocationsISyncHandlerInvocation- sync handler withAction<object> HandlerIAsyncHandlerInvocation- async handler withFunc<object, Task> AsyncHandlerIStashingInvocation- marker interface for stashing invocationsNew Invocation Classes (4 total, vs 8 in #7937)
StashingHandlerInvocation: ISyncHandlerInvocation, IStashingInvocationStashingAsyncHandlerInvocation: IAsyncHandlerInvocation, IStashingInvocationAsyncHandlerInvocation: ISyncHandlerInvocation (non-stashing)AsyncAsyncHandlerInvocation: IAsyncHandlerInvocation (non-stashing)New Public Methods
Persist<TEvent>(TEvent, Func<TEvent, Task>)- async handlerPersistAsync<TEvent>(TEvent, Func<TEvent, Task>)- async handlerPersistAll<TEvent>(events, handler, Action onComplete)- sync completion callbackPersistAll<TEvent>(events, handler, Func<Task> onCompleteAsync)- async completion callbackPersistAll<TEvent>(events, Func<TEvent, Task>, Action)- async handler + sync completionPersistAll<TEvent>(events, Func<TEvent, Task>, Func<Task>)- async handler + async completionPersistAllAsync<TEvent>(events, handler, Action onComplete)- sync completion callbackPersistAllAsync<TEvent>(events, handler, Func<Task> onCompleteAsync)- async completion callbackPersistAllAsync<TEvent>(events, Func<TEvent, Task>, Action)- async handler + sync completionPersistAllAsync<TEvent>(events, Func<TEvent, Task>, Func<Task>)- async handler + async completionDeferAsync<TEvent>(TEvent, Func<TEvent, Task>)- async handlerInternal Methods
Defer<TEvent>(TEvent, Action<TEvent>)- stashing variant (internal)Defer<TEvent>(TEvent, Func<TEvent, Task>)- stashing async variant (internal)Updated Recovery Logic
PeekApplyHandlernow handles both sync and async handlersPersistingEventsusesIStashingInvocationmarker interfaceTest Plan
PersistenceCompletionCallbackSpec.cs- all passingPersistAllPersistAllAsynchandlers