Skip to content

Commit 8f91ed8

Browse files
committed
Subscriber middleware
1 parent aa9ef1f commit 8f91ed8

File tree

15 files changed

+458
-23
lines changed

15 files changed

+458
-23
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"Projects": [
3+
{
4+
"Name": "AWS.Messaging",
5+
"Type": "Minor",
6+
"ChangelogMessages": [
7+
"Added subscriber middleware"
8+
]
9+
}
10+
]
11+
}

AWS.Messaging.lutconfig

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<LUTConfig Version="1.0">
2+
<Repository />
3+
<ParallelBuilds>true</ParallelBuilds>
4+
<ParallelTestRuns>true</ParallelTestRuns>
5+
<TestCaseTimeout>180000</TestCaseTimeout>
6+
</LUTConfig>
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using AWS.Messaging;
5+
6+
namespace SubscriberService.Middleware;
7+
8+
public class SampleMiddleware : IMiddleware
9+
{
10+
public Task<MessageProcessStatus> InvokeAsync<T>(MessageEnvelope<T> messageEnvelope, RequestDelegate next, CancellationToken token = default)
11+
{
12+
// This middleware does not do anything, but exists to demonstrate how to implement a middleware
13+
14+
return next();
15+
}
16+
}

sampleapps/SubscriberService/Program.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using OpenTelemetry.Resources;
1212
using OpenTelemetry.Trace;
1313
using SubscriberService.MessageHandlers;
14+
using SubscriberService.Middleware;
1415
using SubscriberService.Models;
1516

1617
await Host.CreateDefaultBuilder(args)
@@ -34,6 +35,8 @@ await Host.CreateDefaultBuilder(args)
3435
if (string.IsNullOrEmpty(mpfQueueUrl))
3536
throw new InvalidOperationException("Missing required configuration parameter 'AWS:Resources:MPFQueueUrl'.");
3637

38+
builder.AddMiddleware<SampleMiddleware>();
39+
3740
builder.AddSQSPoller(mpfQueueUrl);
3841
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>("chatMessage");
3942

src/AWS.Messaging/Configuration/IMessageBusBuilder.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ public interface IMessageBusBuilder
5353
IMessageBusBuilder AddMessageHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] THandler, TMessage>(string? messageTypeIdentifier = null)
5454
where THandler : IMessageHandler<TMessage>;
5555

56+
/// <summary>
57+
/// Adds a middleware to the subscriber message bus pipeline.
58+
/// </summary>
59+
/// <remarks>
60+
/// Middleware will be executed in the order in which it is added.
61+
/// </remarks>
62+
/// <typeparam name="TMiddleware">The type that implements <see cref="IMiddleware"/></typeparam>
63+
/// <param name="serviceLifetime">The lifetime of the middleware.</param>
64+
IMessageBusBuilder AddMiddleware<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TMiddleware>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
65+
where TMiddleware : class, IMiddleware;
66+
5667
/// <summary>
5768
/// Adds an SQS queue to poll for messages.
5869
/// </summary>

src/AWS.Messaging/Configuration/IMessageConfiguration.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ public interface IMessageConfiguration
4646
/// <returns>The <see cref="SubscriberMapping"/> containing routing info for the specified message type.</returns>
4747
SubscriberMapping? GetSubscriberMapping(string messageTypeIdentifier);
4848

49+
/// <summary>
50+
/// Maps the middleware types to be used in order of execution.
51+
/// </summary>
52+
IList<SubscriberMiddleware> SubscriberMiddleware { get; }
53+
4954
/// <summary>
5055
/// List of configurations for subscriber to poll for messages from an AWS service endpoint.
5156
/// </summary>

src/AWS.Messaging/Configuration/MessageBusBuilder.cs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,26 @@ private IMessageBusBuilder AddPublisher([DynamicallyAccessedMembers(DynamicallyA
9999
public IMessageBusBuilder AddMessageHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] THandler, TMessage>(string? messageTypeIdentifier = null)
100100
where THandler : IMessageHandler<TMessage>
101101
{
102-
return AddMessageHandler(typeof(THandler), typeof(TMessage), () => new MessageEnvelope<TMessage>(), messageTypeIdentifier);
102+
var subscriberMapping = SubscriberMapping.Create<THandler, TMessage>(messageTypeIdentifier);
103+
_messageConfiguration.SubscriberMappings.Add(subscriberMapping);
104+
return this;
103105
}
104106

105-
private IMessageBusBuilder AddMessageHandler([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, string? messageTypeIdentifier = null)
107+
private IMessageBusBuilder AddMessageHandler([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, MethodInfo middlewareInvokeAsyncMethodInfo, string? messageTypeIdentifier = null)
106108
{
107-
var subscriberMapping = new SubscriberMapping(handlerType, messageType, envelopeFactory, messageTypeIdentifier);
109+
var subscriberMapping = new SubscriberMapping(handlerType, messageType, envelopeFactory, middlewareInvokeAsyncMethodInfo, messageTypeIdentifier);
108110
_messageConfiguration.SubscriberMappings.Add(subscriberMapping);
109111
return this;
110112
}
111113

114+
public IMessageBusBuilder AddMiddleware<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TMiddleware>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
115+
where TMiddleware : class, IMiddleware
116+
{
117+
var subscriberMiddleware = SubscriberMiddleware.Create<TMiddleware>(serviceLifetime);
118+
_messageConfiguration.SubscriberMiddleware.Add(subscriberMiddleware);
119+
return this;
120+
}
121+
112122
/// <inheritdoc/>
113123
public IMessageBusBuilder AddSQSPoller(string queueUrl, Action<SQSMessagePollerOptions>? options = null)
114124
{
@@ -216,6 +226,11 @@ public IMessageBusBuilder LoadConfigurationFromSettings(IConfiguration configura
216226

217227
if (settings.MessageHandlers != null)
218228
{
229+
// This is not Native AOT compatible but the method in general is marked
230+
// as not being Native AOT compatible due to loading dynamic types. So this
231+
// not being Native AOT compatible is okay.
232+
var middlewareInvokeMethod = typeof(IMiddleware).GetMethod(nameof(IMiddleware.InvokeAsync))!;
233+
219234
foreach (var messageHandler in settings.MessageHandlers)
220235
{
221236
var messageType = GetTypeFromAssemblies(callingAssembly, messageHandler.MessageType)
@@ -238,7 +253,12 @@ MessageEnvelope envelopeFactory()
238253
return (MessageEnvelope)envelope;
239254
}
240255

241-
AddMessageHandler(handlerType, messageType, envelopeFactory, messageHandler.MessageTypeIdentifier);
256+
// MakeGenericMethod is not Native AOT compatible but the method in general is marked
257+
// as not being Native AOT compatible due to loading dynamic types. So this
258+
// method not being Native AOT compatible is okay.
259+
var middlewareInvokeAsyncMethodInfo = middlewareInvokeMethod.MakeGenericMethod(messageType);
260+
261+
AddMessageHandler(handlerType, messageType, envelopeFactory, middlewareInvokeAsyncMethodInfo, messageHandler.MessageTypeIdentifier);
242262
}
243263
}
244264

@@ -378,6 +398,11 @@ internal void Build()
378398
{
379399
_serviceCollection.TryAddScoped(subscriberMapping.HandlerType);
380400
}
401+
402+
foreach (var subscriberMiddleware in _messageConfiguration.SubscriberMiddleware)
403+
{
404+
_serviceCollection.TryAdd(new ServiceDescriptor(subscriberMiddleware.Type, subscriberMiddleware.Type, subscriberMiddleware.ServiceLifetime));
405+
}
381406
}
382407

383408
if (_messageConfiguration.MessagePollerConfigurations.Any())

src/AWS.Messaging/Configuration/MessageConfiguration.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ public class MessageConfiguration : IMessageConfiguration
3939
return subscriberMapping;
4040
}
4141

42+
/// <inheritdoc/>
43+
public IList<SubscriberMiddleware> SubscriberMiddleware { get; } = new List<SubscriberMiddleware>();
44+
4245
/// <inheritdoc/>
4346
public IList<IMessagePollerConfiguration> MessagePollerConfigurations { get; set; } = new List<IMessagePollerConfiguration>();
4447

src/AWS.Messaging/Configuration/SubscriberMapping.cs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
using System.Diagnostics.CodeAnalysis;
5+
using System.Reflection;
56

67
namespace AWS.Messaging.Configuration;
78

@@ -17,6 +18,9 @@ public class SubscriberMapping
1718
/// <inheritdoc/>
1819
public Type MessageType { get; }
1920

21+
/// <inheritdoc/>
22+
public MethodInfo MiddlewareInvokeAsyncMethodInfo { get; }
23+
2024
/// <inheritdoc/>
2125
public string MessageTypeIdentifier { get; }
2226

@@ -32,9 +36,10 @@ public class SubscriberMapping
3236
/// <param name="handlerType">The type that implements <see cref="IMessageHandler{T}"/></param>
3337
/// <param name="messageType">The type that will be message data will deserialized into</param>
3438
/// <param name="envelopeFactory">Func for creating <see cref="MessageEnvelope{messageType}"/></param>
39+
/// <param name="middlewareInvokeAsyncMethodInfo"><see cref="MethodInfo"/> to invoke middleware of <see cref="MessageType"/>.</param>
3540
/// <param name="messageTypeIdentifier">Optional message type identifier. If not set the full name of the <see cref="MessageType"/> is used.</param>
3641

37-
internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, string? messageTypeIdentifier = null)
42+
internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, MethodInfo middlewareInvokeAsyncMethodInfo, string? messageTypeIdentifier = null)
3843
{
3944
HandlerType = handlerType;
4045
MessageType = messageType;
@@ -44,6 +49,7 @@ internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMember
4449
messageType.FullName ?? throw new InvalidMessageTypeException("Unable to retrieve the Full Name of the provided Message Type.");
4550

4651
MessageEnvelopeFactory = envelopeFactory;
52+
MiddlewareInvokeAsyncMethodInfo = middlewareInvokeAsyncMethodInfo;
4753
}
4854

4955
/// <summary>
@@ -61,6 +67,19 @@ static MessageEnvelope<TMessage> envelopeFactory()
6167
return new MessageEnvelope<TMessage>();
6268
}
6369

64-
return new SubscriberMapping(typeof(THandler), typeof(TMessage), envelopeFactory, messageTypeIdentifier);
70+
return new SubscriberMapping(typeof(THandler), typeof(TMessage), envelopeFactory, MiddlewareMethodCache<TMessage>.InvokeAsyncMethod, messageTypeIdentifier);
71+
}
72+
73+
private static class MiddlewareMethodCache<T>
74+
{
75+
public static readonly MethodInfo InvokeAsyncMethod = MiddlewareMethod.OpenGenericInvokeAsyncMethod.MakeGenericMethod(typeof(T));
76+
}
77+
78+
private static class MiddlewareMethod
79+
{
80+
// resolved once, globally
81+
public static readonly MethodInfo OpenGenericInvokeAsyncMethod = typeof(IMiddleware)
82+
.GetMethods()
83+
.First(m => m.Name == nameof(IMiddleware.InvokeAsync) && m.IsGenericMethod);
6584
}
6685
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Diagnostics.CodeAnalysis;
5+
using Microsoft.Extensions.DependencyInjection;
6+
7+
namespace AWS.Messaging.Configuration;
8+
9+
/// <summary>
10+
/// Tracks the <see cref="IMiddleware"/> to be processed by the <see cref="Services.IHandlerInvoker"/> implementation and its <see cref="ServiceLifetime"/>.
11+
/// </summary>
12+
public class SubscriberMiddleware
13+
{
14+
/// <summary>
15+
/// Constructs an instance of <see cref="SubscriberMiddleware"/>
16+
/// </summary>
17+
/// <param name="type">The type that implements <see cref="IMiddleware"/>.</param>
18+
/// <param name="serviceLifetime">The lifetime of the middleware.</param>
19+
internal SubscriberMiddleware([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type type, ServiceLifetime serviceLifetime)
20+
{
21+
Type = type;
22+
ServiceLifetime = serviceLifetime;
23+
}
24+
25+
/// <summary>
26+
/// Type that implements <see cref="IMiddleware"/>.
27+
/// </summary>
28+
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)]
29+
public Type Type { get; }
30+
31+
/// <summary>
32+
/// Service lifetime of the middleware.
33+
/// </summary>
34+
public ServiceLifetime ServiceLifetime { get; }
35+
36+
/// <summary>
37+
/// Creates a SubscriberMiddleware from the generic parameters for the middleware.
38+
/// </summary>
39+
/// <typeparam name="TMiddleware">The type that implements <see cref="IMiddleware"/></typeparam>
40+
/// <param name="serviceLifetime">The lifetime of the middleware.</param>
41+
/// <returns><see cref="SubscriberMapping"/></returns>
42+
public static SubscriberMiddleware Create<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TMiddleware>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
43+
where TMiddleware : class, IMiddleware
44+
{
45+
return new SubscriberMiddleware(typeof(TMiddleware), serviceLifetime);
46+
}
47+
}

0 commit comments

Comments
 (0)