Skip to content

Commit 92b9b41

Browse files
authored
Enable Parallel Sending of Delayed Messages (#1753)
* enable parallel sending of the delayed messages * added test cases for enqueueToScheduler method with enabled parallel sending * code style improvement and removed redundant test case * added retries check * use concurrentQueue instead of the list for testing purposes * tests improvements * fixed concurrency issues
1 parent 273fe35 commit 92b9b41

File tree

3 files changed

+103
-21
lines changed

3 files changed

+103
-21
lines changed

src/DotNetCore.CAP/Processor/IDispatcher.Default.cs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,28 @@ await Task.WhenAll(Enumerable.Range(0, _options.SubscriberParallelExecuteThreadC
102102
await foreach (var nextMessage in _schedulerQueue.GetConsumingEnumerable(_tasksCts.Token))
103103
{
104104
_tasksCts.Token.ThrowIfCancellationRequested();
105-
try
105+
106+
if (_enableParallelSend && nextMessage.Retries == 0)
106107
{
107-
var result = await _sender.SendAsync(nextMessage).ConfigureAwait(false);
108-
if (!result.Succeeded)
109-
{
110-
_logger.LogError("Delay message sending failed. MessageId: {MessageId} ", nextMessage.DbId);
111-
}
108+
if (!_publishedChannel.Writer.TryWrite(nextMessage))
109+
while (await _publishedChannel.Writer.WaitToWriteAsync(_tasksCts!.Token).ConfigureAwait(false))
110+
if (_publishedChannel.Writer.TryWrite(nextMessage))
111+
break;
112112
}
113-
catch (Exception ex)
113+
else
114114
{
115-
_logger.LogError(ex, "Error sending scheduled message. MessageId: {MessageId}", nextMessage.DbId);
115+
try
116+
{
117+
var result = await _sender.SendAsync(nextMessage).ConfigureAwait(false);
118+
if (!result.Succeeded)
119+
{
120+
_logger.LogError("Delay message sending failed. MessageId: {MessageId} ", nextMessage.DbId);
121+
}
122+
}
123+
catch (Exception ex)
124+
{
125+
_logger.LogError(ex, "Error sending scheduled message. MessageId: {MessageId}", nextMessage.DbId);
126+
}
116127
}
117128
}
118129

test/DotNetCore.CAP.Test/DispatcherTests.cs

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
namespace DotNetCore.CAP.Test;
1818

19-
public class DispatcherTests
19+
public class DispatcherTests
2020
{
2121
private readonly ILogger<Dispatcher> _logger;
2222
private readonly ISubscribeExecutor _executor;
@@ -41,22 +41,22 @@ public async Task EnqueueToPublish_ShouldInvokeSend_WhenParallelSendDisabled()
4141
SubscriberParallelExecuteThreadCount = 2,
4242
SubscriberParallelExecuteBufferFactor = 2
4343
});
44-
44+
4545
var dispatcher = new Dispatcher(_logger, sender, options, _executor, _storage);
4646

4747
using var cts = new CancellationTokenSource();
4848
var messageId = "testId";
49-
49+
5050
// Act
5151
await dispatcher.Start(cts.Token);
5252
await dispatcher.EnqueueToPublish(CreateTestMessage(messageId));
5353
await cts.CancelAsync();
54-
54+
5555
// Assert
5656
sender.Count.Should().Be(1);
5757
sender.ReceivedMessages.First().DbId.Should().Be(messageId);
5858
}
59-
59+
6060
[Fact]
6161
public async Task EnqueueToPublish_ShouldBeThreadSafe_WhenParallelSendDisabled()
6262
{
@@ -150,18 +150,88 @@ public async Task EnqueueToScheduler_ShouldSendMessagesInCorrectOrder_WhenEarlie
150150
// Act
151151
await dispatcher.Start(cts.Token);
152152
var dateTime = DateTime.Now;
153-
153+
154154
await dispatcher.EnqueueToScheduler(messages[0], dateTime.AddSeconds(1));
155155
await dispatcher.EnqueueToScheduler(messages[1], dateTime.AddMilliseconds(200));
156156
await dispatcher.EnqueueToScheduler(messages[2], dateTime.AddMilliseconds(100));
157157

158158
await Task.Delay(1200, CancellationToken.None);
159159
await cts.CancelAsync();
160-
160+
161+
// Assert
162+
sender.ReceivedMessages.Select(m => m.DbId).Should().Equal(["3", "2", "1"]);
163+
}
164+
165+
[Fact]
166+
public async Task EnqueueToScheduler_ShouldBeThreadSafe_WhenDelayLessThenMinuteAndParallelSendEnabled()
167+
{
168+
// Arrange
169+
var sender = new TestThreadSafeMessageSender();
170+
var options = Options.Create(new CapOptions
171+
{
172+
EnableSubscriberParallelExecute = false,
173+
EnablePublishParallelSend = true,
174+
SubscriberParallelExecuteThreadCount = 2,
175+
SubscriberParallelExecuteBufferFactor = 2
176+
});
177+
var dispatcher = new Dispatcher(_logger, sender, options, _executor, _storage);
178+
179+
using var cts = new CancellationTokenSource();
180+
var messages = Enumerable.Range(1, 10000)
181+
.Select(i => CreateTestMessage(i.ToString()))
182+
.ToArray();
183+
184+
// Act
185+
await dispatcher.Start(cts.Token);
186+
var dateTime = DateTime.Now.AddMilliseconds(50);
187+
await Parallel.ForEachAsync(messages, CancellationToken.None,
188+
async (m, ct) => { await dispatcher.EnqueueToScheduler(m, dateTime); });
189+
190+
await Task.Delay(3000, CancellationToken.None);
191+
192+
await cts.CancelAsync();
193+
194+
// Assert
195+
sender.Count.Should().Be(10000);
196+
197+
var receivedMessages = sender.ReceivedMessages.Select(m => m.DbId).Order().ToList();
198+
var expected = messages.Select(m => m.DbId).Order().ToList();
199+
expected.Should().Equal(receivedMessages);
200+
}
201+
202+
[Fact]
203+
public async Task EnqueueToScheduler_ShouldSendMessagesInCorrectOrder_WhenParallelSendEnabled()
204+
{
205+
// Arrange
206+
var sender = new TestThreadSafeMessageSender();
207+
var options = Options.Create(new CapOptions
208+
{
209+
EnableSubscriberParallelExecute = true,
210+
EnablePublishParallelSend = true,
211+
SubscriberParallelExecuteThreadCount = 2,
212+
SubscriberParallelExecuteBufferFactor = 2,
213+
});
214+
var dispatcher = new Dispatcher(_logger, sender, options, _executor, _storage);
215+
216+
using var cts = new CancellationTokenSource();
217+
var messages = Enumerable.Range(1, 3)
218+
.Select(i => CreateTestMessage(i.ToString()))
219+
.ToArray();
220+
221+
// Act
222+
await dispatcher.Start(cts.Token);
223+
var dateTime = DateTime.Now;
224+
225+
await dispatcher.EnqueueToScheduler(messages[0], dateTime.AddSeconds(1));
226+
await dispatcher.EnqueueToScheduler(messages[1], dateTime.AddMilliseconds(200));
227+
await dispatcher.EnqueueToScheduler(messages[2], dateTime.AddMilliseconds(100));
228+
229+
await Task.Delay(1200, CancellationToken.None);
230+
await cts.CancelAsync();
231+
161232
// Assert
162233
sender.ReceivedMessages.Select(m => m.DbId).Should().Equal(["3", "2", "1"]);
163234
}
164-
165235

166236
private MediumMessage CreateTestMessage(string id = "1")
167237
{
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System.Collections.Concurrent;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using System.Threading.Tasks;
45
using DotNetCore.CAP.Internal;
@@ -8,17 +9,17 @@ namespace DotNetCore.CAP.Test.Helpers;
89

910
public class TestThreadSafeMessageSender : IMessageSender
1011
{
11-
private readonly List<MediumMessage> _messagesInOrder = new();
12+
private readonly ConcurrentQueue<MediumMessage> _messagesInOrder = [];
1213

1314
public Task<OperateResult> SendAsync(MediumMessage message)
14-
{
15+
{
1516
lock (_messagesInOrder)
1617
{
17-
_messagesInOrder.Add(message);
18+
_messagesInOrder.Enqueue(message);
1819
}
1920
return Task.FromResult(OperateResult.Success);
2021
}
21-
22+
2223
public int Count => _messagesInOrder.Count;
2324
public List<MediumMessage> ReceivedMessages => _messagesInOrder.ToList();
2425
}

0 commit comments

Comments
 (0)