From 80efeeef2c8d6e439463eb8d4de2a0b80ecb472c Mon Sep 17 00:00:00 2001 From: Brandur Date: Thu, 18 Jun 2026 16:38:15 -0600 Subject: [PATCH] Never rescue jobs that return a timeout of -1 Fix a bug in the job rescuer where jobs with a timeout of -1 (meaning no timeout) were being rescued even though they should never be. Fixes #1287. --- CHANGELOG.md | 1 + internal/maintenance/job_rescuer.go | 3 ++- internal/maintenance/job_rescuer_test.go | 9 +++++++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 342cccf6..b9e6dd94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql ### Fixed - Fix `JobCancel` having no effect on running jobs when using a poll-only driver (e.g. `riverdatabasesql`). The `controlActionCancel` event was silently dropped in `fetchAndRunLoop`'s `queueControlCh` handler instead of being forwarded to `maybeCancelJob`. Note: this fix only works within a single process; cross-process cancels in poll-only setups must wait for the next poll cycle. [PR #1245](https://github.com/riverqueue/river/pull/1245). +- Ensure jobs that return a custom timeout of -1 (no timeout) are never rescued. [PR #1288](https://github.com/riverqueue/river/pull/1288). 
## [0.39.0] - 2026-06-03 diff --git a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go index c7353b36..8ddafeb4 100644 --- a/internal/maintenance/job_rescuer.go +++ b/internal/maintenance/job_rescuer.go @@ -312,7 +312,8 @@ func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRo slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID)) } - if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() { + timeout := workUnit.Timeout() + if timeout < 0 || timeout > 0 && now.Sub(*job.AttemptedAt) < timeout { return jobRetryDecisionIgnore, time.Time{} } diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go index d1f44ffd..508824ee 100644 --- a/internal/maintenance/job_rescuer_test.go +++ b/internal/maintenance/job_rescuer_test.go @@ -65,6 +65,7 @@ func TestJobRescuer(t *testing.T) { const ( rescuerJobKind = "rescuer" rescuerJobKindLongTimeout = "rescuer_long_timeout" + rescuerJobKindNoTimeout = "rescuer_no_timeout" ) type testBundle struct { @@ -95,6 +96,8 @@ func TestJobRescuer(t *testing.T) { return &callbackWorkUnitFactory{Callback: emptyCallback} case rescuerJobKindLongTimeout: return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: JobRescuerRescueAfterDefault + 5*time.Minute} + case rescuerJobKindNoTimeout: + return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: -1} } panic("unhandled kind: " + kind) }, @@ -163,6 +166,8 @@ func TestJobRescuer(t *testing.T) { longTimeOutJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)}) longTimeOutJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: 
ptrutil.Ptr(bundle.rescueHorizon.Add(-6 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)}) + noTimeoutJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindNoTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-24 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)}) + require.NoError(t, rescuer.Start(ctx)) rescuer.TestSignals.FetchedBatch.WaitOrTimeout() @@ -226,6 +231,10 @@ func TestJobRescuer(t *testing.T) { notTimedOutJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: longTimeOutJob2.ID, Schema: rescuer.Config.Schema}) require.NoError(t, err) require.Equal(t, rivertype.JobStateRetryable, notTimedOutJob2After.State) + + noTimeoutJobAfter, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: noTimeoutJob.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRunning, noTimeoutJobAfter.State) }) t.Run("RescuesInBatches", func(t *testing.T) {