diff --git a/v2/processor.go b/v2/processor.go index 5927685..a170871 100644 --- a/v2/processor.go +++ b/v2/processor.go @@ -72,7 +72,9 @@ type ProcessorOptions struct { // MaxReceiveCount is the maximum number of messages to receive at a time. This value is passed to the receiver. MaxReceiveCount int - // ReceiveInterval is the interval between each receive call. + // ReceiveInterval is the interval between each receive call when there is enough available concurrency. + // If we receive less than the number of messages requested, the next receive is called immediately + // without waiting for the ReceiveInterval. ReceiveInterval *time.Duration // StartMaxAttempt is the maximum number of attempts to start the processor. @@ -213,9 +215,10 @@ func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error { for _, msg := range messages { p.process(ctx, receiverEx, msg) } + receiveInterval := *p.options.ReceiveInterval for ctx.Err() == nil { select { - case <-time.After(*p.options.ReceiveInterval): + case <-time.After(receiveInterval): maxMessages := min(p.options.MaxReceiveCount, p.options.MaxConcurrency-len(p.concurrencyTokens)) if ctx.Err() != nil || maxMessages == 0 { break @@ -229,6 +232,12 @@ func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error { for _, msg := range messages { p.process(ctx, receiverEx, msg) } + // if we received less than maxMessages, the next ReceiveMessages() is called immediately + if len(messages) < maxMessages { + receiveInterval = 0 + } else { + receiveInterval = *p.options.ReceiveInterval + } case <-ctx.Done(): logger.Info(fmt.Sprintf("context done, stop receiving from processor %s", receiverName)) break