Skip to content

Commit 51abbeb

Browse files
committed
Suppress producer error log on shutdown
I noticed when running some benchmarking tonight that I'd occasionally get an error line at the end even though things seem to finish cleanly: May 13 18:13:36.622 ERR producer: Error fetching queue settings err="context canceled" Looking at the implementation, it looks like this is something that could happen randomly in the poll-only path. If the producer happens to be polling for queue changes as shutdown happens, it'll log an error as the context is cancelled. This is more likely to happen for SQLite because we constrain the connection pool to only one active connection. Here, only produce an error log conditionally in cases where the parent context has not been cancelled.
1 parent d04b82a commit 51abbeb

2 files changed

Lines changed: 14 additions & 12 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
### Fixed
1515

1616
- Resuming an already unpaused queue is now fully an no-op, and won't touch the row's `updated_at` like it (unintentionally) did before. [PR #870](https://github.com/riverqueue/river/pull/870).
17+
- Suppress an error log line from the producer that may occur on normal shutdown when operating in poll-only mode. [PR #XXX](https://github.com/riverqueue/river/pull/XXX).
1718

1819
## [0.22.0] - 2025-05-10
1920

producer.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -797,9 +797,20 @@ func (p *producer) pollForSettingChanges(ctx context.Context, wg *sync.WaitGroup
797797
case <-ctx.Done():
798798
return
799799
case <-ticker.C:
800-
updatedQueue, err := p.fetchQueueSettings(ctx)
800+
updatedQueue, err := func() (*rivertype.Queue, error) {
801+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
802+
defer cancel()
803+
804+
return p.exec.QueueGet(ctx, &riverdriver.QueueGetParams{
805+
Name: p.config.Queue,
806+
Schema: p.config.Schema,
807+
})
808+
}()
801809
if err != nil {
802-
p.Logger.ErrorContext(ctx, p.Name+": Error fetching queue settings", slog.String("err", err.Error()))
810+
// Don't log if this is part of a standard shutdown.
811+
if ctx.Err() == nil {
812+
p.Logger.ErrorContext(ctx, p.Name+": Error fetching queue settings", slog.String("err", err.Error()))
813+
}
803814
continue
804815
}
805816

@@ -857,16 +868,6 @@ func (p *producer) pollForSettingChanges(ctx context.Context, wg *sync.WaitGroup
857868
}
858869
}
859870

860-
func (p *producer) fetchQueueSettings(ctx context.Context) (*rivertype.Queue, error) {
861-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
862-
defer cancel()
863-
864-
return p.exec.QueueGet(ctx, &riverdriver.QueueGetParams{
865-
Name: p.config.Queue,
866-
Schema: p.config.Schema,
867-
})
868-
}
869-
870871
func (p *producer) reportProducerStatusLoop(ctx context.Context, wg *sync.WaitGroup) {
871872
defer wg.Done()
872873

0 commit comments

Comments
 (0)