From 8a8ae8cefa6100e42b39157ce67454b84a259c76 Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 23 May 2025 00:20:18 -0700 Subject: [PATCH] Second try to fix producer test intermittency Last week I tried to fix an extraneous producer log on shutdown in #896. Unfortunately it still happens, and worse yet, it actually leads to test failures if the logging happens during an example test where it gets recorded (example [1]). --- FAIL: Example_encryptHook (0.39s) got: Secret message: This message is encrypted in the database, but plaintext in workers. producer: Producer status update, error updating in database want: Secret message: This message is encrypted in the database, but plaintext in workers. FAIL FAIL riverqueue.com/riverpro/riverencrypt 0.556s Looking over this code again, I think the problem was the addition of this cluster of lines: var subroutineWG sync.WaitGroup subroutineWG.Add(3) subroutineCtx, cancelSubroutines := context.WithCancelCause(context.WithoutCancel(fetchCtx)) go p.heartbeatLogLoop(subroutineCtx, &subroutineWG) go p.reportQueueStatusLoop(subroutineCtx, &subroutineWG) go p.reportProducerStatusLoop(subroutineCtx, &subroutineWG) Subroutines had been set up to not produce an error when they notice a start/stop's `startstop.ErrStop` stopping error, but with the new `subroutineCtx` and use of `WithoutCancel`, `ErrStop` will never make it down to those goroutines. They don't know not to log on a context cancellation, so errors are produced. Here, try to address the problem by making sure there's an `ErrStop` in the cancel error: cancelSubroutines(fmt.Errorf("producer stopped: %w", startstop.ErrStop)) The checks to know whether to log an error are using `errors.Is` and `context.Cause`, so this should work: if err != nil && errors.Is(context.Cause(ctx), startstop.ErrStop) { return } [1] https://github.com/riverqueue/riverpro/actions/runs/15047355176/job/42293190805 --- producer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/producer.go b/producer.go index ff59cd81..f2417769 100644 --- a/producer.go +++ b/producer.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "log/slog" "math" "strings" @@ -399,7 +400,7 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { p.executorShutdownLoop() p.Logger.Debug(p.Name+": Shutdown loop exited, awaiting subroutines", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load())) - cancelSubroutines(errors.New("producer stopped")) + cancelSubroutines(fmt.Errorf("producer stopped: %w", startstop.ErrStop)) subroutineWG.Wait() p.Logger.Debug(p.Name+": Shutdown subroutines completed, finalizing", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load()))