Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/5bdfbea6-1f68-4d45-b380-4b705ed18fe2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "AWS.Messaging",
"Type": "Minor",
"ChangelogMessages": [
"Added subscriber middleware with optional error handler to override result or retry execution."
]
}
]
}
6 changes: 6 additions & 0 deletions AWS.Messaging.lutconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<LUTConfig Version="1.0">
<Repository />
<ParallelBuilds>true</ParallelBuilds>
<ParallelTestRuns>true</ParallelTestRuns>
<TestCaseTimeout>180000</TestCaseTimeout>
</LUTConfig>
16 changes: 16 additions & 0 deletions sampleapps/SubscriberService/Middleware/SampleMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging;

namespace SubscriberService.Middleware;

public class SampleMiddleware : IMiddleware
{
public Task<MessageProcessStatus> InvokeAsync<T>(MessageEnvelope<T> messageEnvelope, RequestDelegate next, CancellationToken token = default)
{
// This middleware does not do anything, but exists to demonstrate how to implement a middleware

return next();
}
}
3 changes: 3 additions & 0 deletions sampleapps/SubscriberService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using SubscriberService.MessageHandlers;
using SubscriberService.Middleware;
using SubscriberService.Models;

await Host.CreateDefaultBuilder(args)
Expand All @@ -34,6 +35,8 @@ await Host.CreateDefaultBuilder(args)
if (string.IsNullOrEmpty(mpfQueueUrl))
throw new InvalidOperationException("Missing required configuration parameter 'AWS:Resources:MPFQueueUrl'.");

builder.AddMiddleware<SampleMiddleware>();

builder.AddSQSPoller(mpfQueueUrl);
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>("chatMessage");

Expand Down
11 changes: 11 additions & 0 deletions src/AWS.Messaging/Configuration/IMessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ public interface IMessageBusBuilder
IMessageBusBuilder AddMessageHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] THandler, TMessage>(string? messageTypeIdentifier = null)
where THandler : IMessageHandler<TMessage>;

/// <summary>
/// Adds a middleware to the subscriber message bus pipeline.
/// </summary>
/// <remarks>
/// Middleware will be executed in the order in which it is added.
/// </remarks>
/// <typeparam name="TMiddleware">The type that implements <see cref="IMiddleware"/></typeparam>
/// <param name="serviceLifetime">The lifetime of the middleware.</param>
IMessageBusBuilder AddMiddleware<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TMiddleware>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
where TMiddleware : class, IMiddleware;

/// <summary>
/// Adds an SQS queue to poll for messages.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions src/AWS.Messaging/Configuration/IMessageConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public interface IMessageConfiguration
/// <returns>The <see cref="SubscriberMapping"/> containing routing info for the specified message type.</returns>
SubscriberMapping? GetSubscriberMapping(string messageTypeIdentifier);

/// <summary>
/// Maps the middleware types to be used in order of execution.
/// </summary>
IList<SubscriberMiddleware> SubscriberMiddleware { get; }

/// <summary>
/// List of configurations for subscriber to poll for messages from an AWS service endpoint.
/// </summary>
Expand Down
64 changes: 56 additions & 8 deletions src/AWS.Messaging/Configuration/MessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reflection;
using AWS.Messaging.Configuration.Internal;
using AWS.Messaging.Publishers;
using AWS.Messaging.Publishers.EventBridge;
Expand All @@ -12,13 +15,11 @@
using AWS.Messaging.Services.Backoff;
using AWS.Messaging.Services.Backoff.Policies;
using AWS.Messaging.Telemetry;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System.Diagnostics.CodeAnalysis;
using System.Reflection;

namespace AWS.Messaging.Configuration;

Expand Down Expand Up @@ -99,16 +100,33 @@ private IMessageBusBuilder AddPublisher([DynamicallyAccessedMembers(DynamicallyA
public IMessageBusBuilder AddMessageHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] THandler, TMessage>(string? messageTypeIdentifier = null)
where THandler : IMessageHandler<TMessage>
{
return AddMessageHandler(typeof(THandler), typeof(TMessage), () => new MessageEnvelope<TMessage>(), messageTypeIdentifier);
var subscriberMapping = SubscriberMapping.Create<THandler, TMessage>(messageTypeIdentifier);
_messageConfiguration.SubscriberMappings.Add(subscriberMapping);
return this;
}

private IMessageBusBuilder AddMessageHandler([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, string? messageTypeIdentifier = null)
private IMessageBusBuilder AddMessageHandler([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, HandlerInvokerDelegate handlerInvokerDelegate, string? messageTypeIdentifier = null)
{
var subscriberMapping = new SubscriberMapping(handlerType, messageType, envelopeFactory, messageTypeIdentifier);
var subscriberMapping = new SubscriberMapping(handlerType, messageType, envelopeFactory, handlerInvokerDelegate, messageTypeIdentifier);
_messageConfiguration.SubscriberMappings.Add(subscriberMapping);
return this;
}

public IMessageBusBuilder AddMessageErrorHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] T>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
where T: IMessageErrorHandler
{
AddAdditionalService(new ServiceDescriptor(typeof(IMessageErrorHandler), typeof(T), serviceLifetime));
return this;
}

public IMessageBusBuilder AddMiddleware<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TMiddleware>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
where TMiddleware : class, IMiddleware
{
var subscriberMiddleware = SubscriberMiddleware.Create<TMiddleware>(serviceLifetime);
_messageConfiguration.SubscriberMiddleware.Add(subscriberMiddleware);
return this;
}

/// <inheritdoc/>
public IMessageBusBuilder AddSQSPoller(string queueUrl, Action<SQSMessagePollerOptions>? options = null)
{
Expand Down Expand Up @@ -223,12 +241,13 @@ public IMessageBusBuilder LoadConfigurationFromSettings(IConfiguration configura
var handlerType = GetTypeFromAssemblies(callingAssembly, messageHandler.HandlerType)
?? throw new InvalidAppSettingsConfigurationException($"Unable to find the provided message handler type '{messageHandler.HandlerType}'.");

var messageEnvelopeType = typeof(MessageEnvelope<>).MakeGenericType(messageType);

// This func is not Native AOT compatible but the method in general is marked
// as not being Native AOT compatible due to loading dynamic types. So this
// func not being Native AOT compatible is okay.
MessageEnvelope envelopeFactory()
{
var messageEnvelopeType = typeof(MessageEnvelope<>).MakeGenericType(messageType);
var envelope = Activator.CreateInstance(messageEnvelopeType);
if (envelope == null || envelope is not MessageEnvelope)
{
Expand All @@ -238,7 +257,8 @@ MessageEnvelope envelopeFactory()
return (MessageEnvelope)envelope;
}

AddMessageHandler(handlerType, messageType, envelopeFactory, messageHandler.MessageTypeIdentifier);
var handlerInoker = BuildHandlerInvoker(messageType, messageEnvelopeType);
AddMessageHandler(handlerType, messageType, envelopeFactory, handlerInoker, messageHandler.MessageTypeIdentifier);
}
}

Expand Down Expand Up @@ -283,6 +303,29 @@ MessageEnvelope envelopeFactory()
}

return this;

// This is not Native AOT compatible but the method in general is marked
// as not being Native AOT compatible due to loading dynamic types. So this
// func not being Native AOT compatible is okay.
static HandlerInvokerDelegate BuildHandlerInvoker(Type messageType, Type messageEnvelopeType)
{
var invokerParam = Expression.Parameter(typeof(HandlerInvoker), "invoker");
var envelopeParam = Expression.Parameter(typeof(MessageEnvelope), "envelope");
var mappingParam = Expression.Parameter(typeof(SubscriberMapping), "mapping");
var tokenParam = Expression.Parameter(typeof(CancellationToken), "token");

// invoker.InvokeAsync<T>( (MessageEnvelope<T>) envelope, mapping, token )
var genericMethodDef = typeof(HandlerInvoker)
.GetMethods(BindingFlags.Public | BindingFlags.Instance)
.First(m => m.Name == nameof(HandlerInvoker.InvokeAsync) && m.IsGenericMethodDefinition)
.GetGenericMethodDefinition();

var closedMethod = genericMethodDef.MakeGenericMethod(messageType);
var typedEnvelope = Expression.Convert(envelopeParam, messageEnvelopeType);
var call = Expression.Call(invokerParam, closedMethod, typedEnvelope, mappingParam, tokenParam);
var lambda = Expression.Lambda<HandlerInvokerDelegate>(call, invokerParam, envelopeParam, mappingParam, tokenParam);
return lambda.Compile();
}
}

[RequiresUnreferencedCode("This method requires loading types dynamically as defined in the configuration system.")]
Expand Down Expand Up @@ -378,6 +421,11 @@ internal void Build()
{
_serviceCollection.TryAddScoped(subscriberMapping.HandlerType);
}

foreach (var subscriberMiddleware in _messageConfiguration.SubscriberMiddleware)
{
_serviceCollection.TryAdd(new ServiceDescriptor(subscriberMiddleware.Type, subscriberMiddleware.Type, subscriberMiddleware.ServiceLifetime));
}
}

if (_messageConfiguration.MessagePollerConfigurations.Any())
Expand Down
3 changes: 3 additions & 0 deletions src/AWS.Messaging/Configuration/MessageConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class MessageConfiguration : IMessageConfiguration
return subscriberMapping;
}

/// <inheritdoc/>
public IList<SubscriberMiddleware> SubscriberMiddleware { get; } = new List<SubscriberMiddleware>();

/// <inheritdoc/>
public IList<IMessagePollerConfiguration> MessagePollerConfigurations { get; set; } = new List<IMessagePollerConfiguration>();

Expand Down
17 changes: 15 additions & 2 deletions src/AWS.Messaging/Configuration/SubscriberMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics.CodeAnalysis;
using AWS.Messaging.Services;

namespace AWS.Messaging.Configuration;

Expand All @@ -17,6 +18,9 @@ public class SubscriberMapping
/// <inheritdoc/>
public Type MessageType { get; }

/// <inheritdoc/>
public HandlerInvokerDelegate HandlerInvoker { get; }

/// <inheritdoc/>
public string MessageTypeIdentifier { get; }

Expand All @@ -32,9 +36,10 @@ public class SubscriberMapping
/// <param name="handlerType">The type that implements <see cref="IMessageHandler{T}"/></param>
/// <param name="messageType">The type that will be message data will deserialized into</param>
/// <param name="envelopeFactory">Func for creating <see cref="MessageEnvelope{messageType}"/></param>
/// <param name="handlerInvoker">Delegate to invoke handler of <see cref="MessageType"/>.</param>
/// <param name="messageTypeIdentifier">Optional message type identifier. If not set the full name of the <see cref="MessageType"/> is used.</param>

internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, string? messageTypeIdentifier = null)
internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, HandlerInvokerDelegate handlerInvoker, string? messageTypeIdentifier = null)
{
HandlerType = handlerType;
MessageType = messageType;
Expand All @@ -44,6 +49,7 @@ internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMember
messageType.FullName ?? throw new InvalidMessageTypeException("Unable to retrieve the Full Name of the provided Message Type.");

MessageEnvelopeFactory = envelopeFactory;
HandlerInvoker = handlerInvoker;
}

/// <summary>
Expand All @@ -61,6 +67,13 @@ static MessageEnvelope<TMessage> envelopeFactory()
return new MessageEnvelope<TMessage>();
}

return new SubscriberMapping(typeof(THandler), typeof(TMessage), envelopeFactory, messageTypeIdentifier);
static Task<MessageProcessStatus> handlerInvoker(HandlerInvoker invoker, MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping, CancellationToken token = default)
{
return invoker.InvokeAsync((MessageEnvelope<TMessage>)messageEnvelope, subscriberMapping, token);
}

return new SubscriberMapping(typeof(THandler), typeof(TMessage), envelopeFactory, handlerInvoker, messageTypeIdentifier);
}
}

public delegate Task<MessageProcessStatus> HandlerInvokerDelegate(HandlerInvoker invoker, MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping, CancellationToken token = default);
47 changes: 47 additions & 0 deletions src/AWS.Messaging/Configuration/SubscriberMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;

namespace AWS.Messaging.Configuration;

/// <summary>
/// Tracks the <see cref="IMiddleware"/> to be processed by the <see cref="Services.IHandlerInvoker"/> implementation and its <see cref="ServiceLifetime"/>.
/// </summary>
public class SubscriberMiddleware
{
/// <summary>
/// Constructs an instance of <see cref="SubscriberMiddleware"/>
/// </summary>
/// <param name="type">The type that implements <see cref="IMiddleware"/>.</param>
/// <param name="serviceLifetime">The lifetime of the middleware.</param>
internal SubscriberMiddleware([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type type, ServiceLifetime serviceLifetime)
{
Type = type;
ServiceLifetime = serviceLifetime;
}

/// <summary>
/// Type that implements <see cref="IMiddleware"/>.
/// </summary>
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)]
public Type Type { get; }

/// <summary>
/// Service lifetime of the middleware.
/// </summary>
public ServiceLifetime ServiceLifetime { get; }

/// <summary>
/// Creates a SubscriberMiddleware from the generic parameters for the middleware.
/// </summary>
/// <typeparam name="TMiddleware">The type that implements <see cref="IMiddleware"/></typeparam>
/// <param name="serviceLifetime">The lifetime of the middleware.</param>
/// <returns><see cref="SubscriberMapping"/></returns>
public static SubscriberMiddleware Create<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TMiddleware>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
where TMiddleware : class, IMiddleware
{
return new SubscriberMiddleware(typeof(TMiddleware), serviceLifetime);
}
}
35 changes: 35 additions & 0 deletions src/AWS.Messaging/IMessageErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging;

public interface IMessageErrorHandler
{
/// <summary>
/// Handles errors that occur during message processing.
/// </summary>
/// <param name="messageEnvelope">The message being processed.</param>
/// <param name="exception"><see cref="Exception"/> raised while processing message.</param>
/// <param name="attempts">Number of attempts made at processing this message</param>
/// <param name="token"><see cref="CancellationToken"/></param>
/// <returns><see cref="MessageErrorHandlerResponse"/></returns>
public ValueTask<MessageErrorHandlerResponse> OnHandleError<T>(MessageEnvelope<T> messageEnvelope, Exception exception, int attempts, CancellationToken token);
}

public enum MessageErrorHandlerResponse
{
/// <summary>
/// Failed response.
/// </summary>
Failed,

/// <summary>
/// Retry the message processing in the same process.
/// </summary>
Retry,

/// <summary>
/// Success response.
/// </summary>
Success
}
30 changes: 30 additions & 0 deletions src/AWS.Messaging/IMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging;

/// <summary>
/// This interface is implemented by the users of this library for each layer of middleware that should be processed.
/// </summary>
public interface IMiddleware
{
/// <summary>
/// Processes a message through the middleware pipeline, invoking the next middleware or the message handler.
/// </summary>
/// <param name="messageEnvelope">The message read from the message source wrapped around a message envelope containing message metadata.</param>
/// <param name="next">Delegate to execute the next layer of middleware. When no further middleware remains, the delegate will execute the message handler.</param>
/// <param name="token">The optional cancellation token.</param>
/// <returns>
/// The status of the processed message. For example whether the message was successfully processed.
/// Default implementations should return the result returned from the next delegate.
/// </returns>
Task<MessageProcessStatus> InvokeAsync<T>(MessageEnvelope<T> messageEnvelope, RequestDelegate next, CancellationToken token = default);
}

/// <summary>
/// The delegate used to invoke the next middleware layer or the message handler.
/// </summary>
/// <returns>
/// The status of the processed message. For example whether the message was successfully processed.
/// </returns>
public delegate Task<MessageProcessStatus> RequestDelegate();
Loading