Skip to content

Commit e5eed9d

Browse files
authored
Second try to fix producer test intermittency (#921)
Last week I tried to fix an extraneous producer log on shutdown in #896. Unfortunately it still happens, and worse yet, it actually leads to test failures if the logging happens during an example test where it gets recorded (example [1]). --- FAIL: Example_encryptHook (0.39s) got: Secret message: This message is encrypted in the database, but plaintext in workers. producer: Producer status update, error updating in database want: Secret message: This message is encrypted in the database, but plaintext in workers. FAIL FAIL riverqueue.com/riverpro/riverencrypt 0.556s Looking over this code again, I think the problem was the addition of this cluster of lines: var subroutineWG sync.WaitGroup subroutineWG.Add(3) subroutineCtx, cancelSubroutines := context.WithCancelCause(context.WithoutCancel(fetchCtx)) go p.heartbeatLogLoop(subroutineCtx, &subroutineWG) go p.reportQueueStatusLoop(subroutineCtx, &subroutineWG) go p.reportProducerStatusLoop(subroutineCtx, &subroutineWG) Subroutines had been set up to not produce an error when they notice a start/stop's `startstop.ErrStop` stopping error, but with the new `subroutineCtx` and use of `WithoutCancel`, `ErrStop` will never make it down to those goroutines. They don't know not to log on a context cancellation, so errors are produced. Here, try to address the problem by making sure there's an `ErrStop` in the cancel error: cancelSubroutines(fmt.Errorf("producer stopped: %w", startstop.ErrStop)) The checks to know whether to log an error are using `errors.Is` and `context.Cause`, so this should work: if err != nil && errors.Is(context.Cause(ctx), startstop.ErrStop) { return } [1] https://github.com/riverqueue/riverpro/actions/runs/15047355176/job/42293190805
1 parent 63c3a6b commit e5eed9d

1 file changed

Lines changed: 2 additions & 1 deletion

File tree

producer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/json"
77
"errors"
8+
"fmt"
89
"log/slog"
910
"math"
1011
"strings"
@@ -399,7 +400,7 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error {
399400
p.executorShutdownLoop()
400401

401402
p.Logger.Debug(p.Name+": Shutdown loop exited, awaiting subroutines", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load()))
402-
cancelSubroutines(errors.New("producer stopped"))
403+
cancelSubroutines(fmt.Errorf("producer stopped: %w", startstop.ErrStop))
403404
subroutineWG.Wait()
404405
p.Logger.Debug(p.Name+": Shutdown subroutines completed, finalizing", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load()))
405406

0 commit comments

Comments
 (0)