Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions v2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ type ProcessorOptions struct {
// MaxReceiveCount is the maximum number of messages to receive at a time. This value is passed to the receiver.
MaxReceiveCount int

// ReceiveInterval is the interval between each receive call.
// ReceiveInterval is the interval between each receive call when there is enough available concurrency.
// If we receive less than the number of messages requested, the next receive is called immediately
// without waiting for the ReceiveInterval.
ReceiveInterval *time.Duration

// StartMaxAttempt is the maximum number of attempts to start the processor.
Expand Down Expand Up @@ -213,9 +215,10 @@ func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error {
for _, msg := range messages {
p.process(ctx, receiverEx, msg)
}
receiveInterval := *p.options.ReceiveInterval
for ctx.Err() == nil {
select {
case <-time.After(*p.options.ReceiveInterval):
case <-time.After(receiveInterval):
maxMessages := min(p.options.MaxReceiveCount, p.options.MaxConcurrency-len(p.concurrencyTokens))
if ctx.Err() != nil || maxMessages == 0 {
break
Expand All @@ -229,6 +232,12 @@ func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error {
for _, msg := range messages {
p.process(ctx, receiverEx, msg)
}
// if we received less than maxMessages, the next ReceiveMessages() is called immediately
if len(messages) < maxMessages {
receiveInterval = 0
} else {
receiveInterval = *p.options.ReceiveInterval
}
case <-ctx.Done():
logger.Info(fmt.Sprintf("context done, stop receiving from processor %s", receiverName))
break
Expand Down
Loading