Skip to content

Commit 273fe35

Browse files
committed
Refactor RabbitMQ consumer logic for clarity and QoS
Refactored `HandleBasicDeliverAsync` in `RabbitMqBasicConsumer` to simplify `safeBody` handling and eliminate redundant variable usage. Updated to pass `body` directly to `BasicDeliverEventArgs` and `TransportMessage`. Enhanced `Listening` method in `RabbitMqConsumerClient` to use `BasicQosOptions` from `_rabbitMqOptions` for more flexible QoS settings. Defaulted `prefetchCount` to `_groupConcurrent` or 1 for better configurability.
1 parent 5e4d37c commit 273fe35

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del
4545
string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
4646
CancellationToken cancellationToken = default)
4747
{
48-
var safeBody = _usingTaskRun ? body.ToArray() : body;
49-
5048
if (_usingTaskRun)
5149
{
5250
await _semaphore.WaitAsync(cancellationToken);
5351

52+
var safeBody = body.ToArray();
53+
5454
_ = Task.Run(Consume, cancellationToken).ConfigureAwait(false);
5555
}
5656
else
@@ -76,15 +76,15 @@ Task Consume()
7676
if (_customHeadersBuilder != null)
7777
{
7878
var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey,
79-
properties, safeBody);
79+
properties, body);
8080
var customHeaders = _customHeadersBuilder(e, _serviceProvider);
8181
foreach (var customHeader in customHeaders)
8282
{
8383
headers[customHeader.Key] = customHeader.Value;
8484
}
8585
}
8686

87-
var message = new TransportMessage(headers, safeBody);
87+
var message = new TransportMessage(headers, body);
8888

8989
return _msgCallback(message, deliveryTag);
9090
}

src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,14 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
6060
{
6161
Connect().GetAwaiter().GetResult();
6262

63-
if (_groupConcurrent > 0)
63+
if (_rabbitMqOptions.BasicQosOptions != null)
6464
{
65-
_channel!.BasicQosAsync(prefetchSize: 0, prefetchCount: _groupConcurrent, global: false, cancellationToken).GetAwaiter().GetResult();
65+
_channel!.BasicQosAsync(0, _rabbitMqOptions.BasicQosOptions.PrefetchCount, _rabbitMqOptions.BasicQosOptions.Global, cancellationToken).GetAwaiter().GetResult();
6666
}
67-
else if (_rabbitMqOptions.BasicQosOptions != null)
67+
else
6868
{
69-
_channel!.BasicQosAsync(0, _rabbitMqOptions.BasicQosOptions.PrefetchCount, _rabbitMqOptions.BasicQosOptions.Global, cancellationToken).GetAwaiter().GetResult();
69+
ushort prefetch = _groupConcurrent > 0 ? _groupConcurrent : (ushort)1;
70+
_channel!.BasicQosAsync(prefetchSize: 0, prefetchCount: prefetch, global: false, cancellationToken).GetAwaiter().GetResult();
7071
}
7172

7273
_consumer = new RabbitMqBasicConsumer(_channel!, _groupConcurrent, _queueName, OnMessageCallback!, OnLogCallback!,

0 commit comments

Comments
 (0)