Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql
### Fixed

- Fix `JobCancel` having no effect on running jobs when using a poll-only driver (e.g. `riverdatabasesql`). The `controlActionCancel` event was silently dropped in `fetchAndRunLoop`'s `queueControlCh` handler instead of being forwarded to `maybeCancelJob`. Note: this fix only works within a single process; cross-process cancels in poll-only setups must wait for the next poll cycle. [PR #1245](https://github.com/riverqueue/river/pull/1245).
- Ensure jobs that return a custom timeout of -1 (no timeout) are never rescued. [PR #1288](https://github.com/riverqueue/river/pull/1288).

## [0.39.0] - 2026-06-03

Expand Down
3 changes: 2 additions & 1 deletion internal/maintenance/job_rescuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRo
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
}

if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
timeout := workUnit.Timeout()
if timeout < 0 || timeout > 0 && now.Sub(*job.AttemptedAt) < timeout {
return jobRetryDecisionIgnore, time.Time{}
}

Expand Down
9 changes: 9 additions & 0 deletions internal/maintenance/job_rescuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestJobRescuer(t *testing.T) {
const (
rescuerJobKind = "rescuer"
rescuerJobKindLongTimeout = "rescuer_long_timeout"
rescuerJobKindNoTimeout = "rescuer_no_timeout"
)

type testBundle struct {
Expand Down Expand Up @@ -95,6 +96,8 @@ func TestJobRescuer(t *testing.T) {
return &callbackWorkUnitFactory{Callback: emptyCallback}
case rescuerJobKindLongTimeout:
return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: JobRescuerRescueAfterDefault + 5*time.Minute}
case rescuerJobKindNoTimeout:
return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: -1}
}
panic("unhandled kind: " + kind)
},
Expand Down Expand Up @@ -163,6 +166,8 @@ func TestJobRescuer(t *testing.T) {
longTimeOutJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
longTimeOutJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-6 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})

noTimeoutJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindNoTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-24 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})

require.NoError(t, rescuer.Start(ctx))

rescuer.TestSignals.FetchedBatch.WaitOrTimeout()
Expand Down Expand Up @@ -226,6 +231,10 @@ func TestJobRescuer(t *testing.T) {
notTimedOutJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: longTimeOutJob2.ID, Schema: rescuer.Config.Schema})
require.NoError(t, err)
require.Equal(t, rivertype.JobStateRetryable, notTimedOutJob2After.State)

noTimeoutJobAfter, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: noTimeoutJob.ID, Schema: rescuer.Config.Schema})
require.NoError(t, err)
require.Equal(t, rivertype.JobStateRunning, noTimeoutJobAfter.State)
})

t.Run("RescuesInBatches", func(t *testing.T) {
Expand Down
Loading