Skip to content

Commit 3ec3ebf

Browse files
committed
Active job rescue based on heartbeat
Wrote up the main description already elsewhere.
1 parent 4feea6a commit 3ec3ebf

15 files changed

Lines changed: 224 additions & 16 deletions

File tree

client_pilot_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type pilotSpy struct {
3434
}
3535

3636
type pilotSpyTestSignals struct {
37+
JobBegin testsignal.TestSignal[int64]
38+
JobEnd testsignal.TestSignal[int64]
3739
JobGetAvailable testsignal.TestSignal[struct{}]
3840
JobSetStateIfRunningMany testsignal.TestSignal[struct{}]
3941
PeriodicJobGetAll testsignal.TestSignal[struct{}]
@@ -47,6 +49,8 @@ type pilotSpyTestSignals struct {
4749
}
4850

4951
func (ts *pilotSpyTestSignals) Init(tb testutil.TestingTB) {
52+
ts.JobBegin.Init(tb)
53+
ts.JobEnd.Init(tb)
5054
ts.JobGetAvailable.Init(tb)
5155
ts.JobSetStateIfRunningMany.Init(tb)
5256
ts.PeriodicJobGetAll.Init(tb)
@@ -59,6 +63,11 @@ func (ts *pilotSpyTestSignals) Init(tb testutil.TestingTB) {
5963
ts.QueueMetadataChanged.Init(tb)
6064
}
6165

66+
func (p *pilotSpy) JobBegin(ctx context.Context, job *rivertype.JobRow) {
67+
p.testSignals.JobBegin.Signal(job.ID)
68+
p.StandardPilot.JobBegin(ctx, job)
69+
}
70+
6271
func (p *pilotSpy) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
6372
p.jobCancelCalls.Add(1)
6473
return p.StandardPilot.JobCancel(ctx, exec, params)
@@ -69,6 +78,11 @@ func (p *pilotSpy) JobCleanerQueuesExcluded() []string {
6978
return p.StandardPilot.JobCleanerQueuesExcluded()
7079
}
7180

81+
func (p *pilotSpy) JobEnd(ctx context.Context, job *rivertype.JobRow) {
82+
p.testSignals.JobEnd.Signal(job.ID)
83+
p.StandardPilot.JobEnd(ctx, job)
84+
}
85+
7286
func (p *pilotSpy) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state riverpilot.ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
7387
p.testSignals.JobGetAvailable.Signal(struct{}{})
7488
return p.StandardPilot.JobGetAvailable(ctx, exec, state, params)
@@ -324,6 +338,8 @@ func Test_Client_PilotUsage(t *testing.T) {
324338
riversharedtest.WaitOrTimeout(t, jobDone)
325339
require.NotZero(t, insertRes.Job.ID)
326340

341+
require.Equal(t, insertRes.Job.ID, pilot.testSignals.JobBegin.WaitOrTimeout())
342+
require.Equal(t, insertRes.Job.ID, pilot.testSignals.JobEnd.WaitOrTimeout())
327343
pilot.testSignals.JobGetAvailable.WaitOrTimeout()
328344
pilot.testSignals.JobSetStateIfRunningMany.WaitOrTimeout()
329345
pilot.testSignals.ProducerInit.WaitOrTimeout()

internal/jobexecutor/job_executor.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/riverqueue/river/internal/workunit"
2222
"github.com/riverqueue/river/riverdriver"
2323
"github.com/riverqueue/river/rivershared/baseservice"
24+
"github.com/riverqueue/river/rivershared/riverpilot"
2425
"github.com/riverqueue/river/rivertype"
2526
)
2627

@@ -114,6 +115,7 @@ type JobExecutor struct {
114115
HookLookupGlobal hooklookup.HookLookupInterface
115116
JobRow *rivertype.JobRow
116117
MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface
118+
Pilot riverpilot.Pilot
117119
ProducerCallbacks struct {
118120
JobDone func(jobRow *rivertype.JobRow)
119121
Stuck func()
@@ -143,7 +145,16 @@ func (e *JobExecutor) Execute(ctx context.Context) {
143145
QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt),
144146
}
145147

148+
if e.Pilot != nil {
149+
e.Pilot.JobBegin(ctx, e.JobRow)
150+
}
151+
146152
res := e.execute(ctx)
153+
154+
if e.Pilot != nil {
155+
e.Pilot.JobEnd(ctx, e.JobRow)
156+
}
157+
147158
if res.Err != nil && errors.Is(context.Cause(ctx), rivertype.ErrJobCancelledRemotely) {
148159
res.Err = context.Cause(ctx)
149160
}

internal/jobexecutor/job_executor_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,25 @@ func (h *testErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRo
112112
return h.HandlePanicFunc(ctx, job, panicVal, trace)
113113
}
114114

115+
type testPilotWithJobCallbacks struct {
116+
riverpilot.StandardPilot
117+
118+
JobBeginFunc func(ctx context.Context, job *rivertype.JobRow)
119+
JobEndFunc func(ctx context.Context, job *rivertype.JobRow)
120+
}
121+
122+
func (p *testPilotWithJobCallbacks) JobBegin(ctx context.Context, job *rivertype.JobRow) {
123+
if p.JobBeginFunc != nil {
124+
p.JobBeginFunc(ctx, job)
125+
}
126+
}
127+
128+
func (p *testPilotWithJobCallbacks) JobEnd(ctx context.Context, job *rivertype.JobRow) {
129+
if p.JobEndFunc != nil {
130+
p.JobEndFunc(ctx, job)
131+
}
132+
}
133+
115134
func TestJobExecutor_Execute(t *testing.T) {
116135
t.Parallel()
117136

@@ -995,6 +1014,90 @@ func TestJobExecutor_Execute(t *testing.T) {
9951014
require.True(t, bundle.errorHandler.HandlePanicCalled)
9961015
})
9971016

1017+
t.Run("PilotJobCallbacksJobBeginAndEnd", func(t *testing.T) {
1018+
t.Parallel()
1019+
1020+
executor, bundle := setup(t)
1021+
1022+
var events []string
1023+
1024+
executor.Pilot = &testPilotWithJobCallbacks{
1025+
JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) {
1026+
require.Equal(t, bundle.jobRow.ID, job.ID)
1027+
events = append(events, "begin")
1028+
},
1029+
JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) {
1030+
require.Equal(t, bundle.jobRow.ID, job.ID)
1031+
events = append(events, "end")
1032+
},
1033+
}
1034+
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
1035+
events = append(events, "work")
1036+
return nil
1037+
}, nil).MakeUnit(bundle.jobRow)
1038+
1039+
executor.Execute(ctx)
1040+
riversharedtest.WaitOrTimeout(t, bundle.updateCh)
1041+
1042+
require.Equal(t, []string{"begin", "work", "end"}, events)
1043+
})
1044+
1045+
t.Run("PilotJobCallbacksJobEndInvokedOnError", func(t *testing.T) {
1046+
t.Parallel()
1047+
1048+
executor, bundle := setup(t)
1049+
1050+
var events []string
1051+
1052+
executor.Pilot = &testPilotWithJobCallbacks{
1053+
JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) {
1054+
require.Equal(t, bundle.jobRow.ID, job.ID)
1055+
events = append(events, "begin")
1056+
},
1057+
JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) {
1058+
require.Equal(t, bundle.jobRow.ID, job.ID)
1059+
events = append(events, "end")
1060+
},
1061+
}
1062+
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
1063+
events = append(events, "work")
1064+
return errors.New("work failed")
1065+
}, nil).MakeUnit(bundle.jobRow)
1066+
1067+
executor.Execute(ctx)
1068+
riversharedtest.WaitOrTimeout(t, bundle.updateCh)
1069+
1070+
require.Equal(t, []string{"begin", "work", "end"}, events)
1071+
})
1072+
1073+
t.Run("PilotJobCallbacksJobEndInvokedOnPanic", func(t *testing.T) {
1074+
t.Parallel()
1075+
1076+
executor, bundle := setup(t)
1077+
1078+
var events []string
1079+
1080+
executor.Pilot = &testPilotWithJobCallbacks{
1081+
JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) {
1082+
require.Equal(t, bundle.jobRow.ID, job.ID)
1083+
events = append(events, "begin")
1084+
},
1085+
JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) {
1086+
require.Equal(t, bundle.jobRow.ID, job.ID)
1087+
events = append(events, "end")
1088+
},
1089+
}
1090+
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
1091+
events = append(events, "work")
1092+
panic("panic val")
1093+
}, nil).MakeUnit(bundle.jobRow)
1094+
1095+
executor.Execute(ctx)
1096+
riversharedtest.WaitOrTimeout(t, bundle.updateCh)
1097+
1098+
require.Equal(t, []string{"begin", "work", "end"}, events)
1099+
})
1100+
9981101
t.Run("CancelFuncCleanedUpEvenWithoutCancel", func(t *testing.T) {
9991102
t.Parallel()
10001103

internal/maintenance/job_rescuer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func TestJobRescuer(t *testing.T) {
140140

141141
stuckToRetryJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
142142
stuckToRetryJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
143+
stuckToRetryJobWithLastSeen := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), Metadata: fmt.Appendf(nil, `{"%s": %q}`, riversharedmaintenance.MetadataKeyLastSeenAt, bundle.rescueHorizon.Add(-1*time.Minute).UTC().Format(time.RFC3339Nano)), MaxAttempts: ptrutil.Ptr(5)})
143144
stuckToRetryJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued
144145

145146
// Already at max attempts:
@@ -183,6 +184,7 @@ func TestJobRescuer(t *testing.T) {
183184
var err error
184185
confirmRetried(stuckToRetryJob1)
185186
confirmRetried(stuckToRetryJob2)
187+
confirmRetried(stuckToRetryJobWithLastSeen)
186188

187189
job3After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: stuckToRetryJob3.ID, Schema: rescuer.Config.Schema})
188190
require.NoError(t, err)

producer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype.
831831
HookLookupGlobal: p.config.HookLookupGlobal,
832832
MiddlewareLookupGlobal: p.config.MiddlewareLookupGlobal,
833833
JobRow: job,
834+
Pilot: p.pilot,
834835
ProducerCallbacks: struct {
835836
JobDone func(jobRow *rivertype.JobRow)
836837
Stuck func()

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 7 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdrivertest/job_read.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package riverdrivertest
22

33
import (
44
"context"
5+
"fmt"
56
"sort"
67
"strconv"
78
"testing"
@@ -12,6 +13,7 @@ import (
1213

1314
"github.com/riverqueue/river/internal/rivercommon"
1415
"github.com/riverqueue/river/riverdriver"
16+
riversharedmaintenance "github.com/riverqueue/river/rivershared/riversharedmaintenance"
1517
"github.com/riverqueue/river/rivershared/testfactory"
1618
"github.com/riverqueue/river/rivershared/util/ptrutil"
1719
"github.com/riverqueue/river/rivershared/util/sliceutil"
@@ -519,16 +521,22 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx
519521
afterHorizon = horizon.Add(1 * time.Minute)
520522
)
521523

522-
stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})
524+
metadataWithLastSeen := func(t time.Time) []byte {
525+
return fmt.Appendf(nil, `{"%s": %q}`, riversharedmaintenance.MetadataKeyLastSeenAt,
526+
t.UTC().Round(time.Millisecond).Format("2006-01-02 15:04:05.999-07:00"))
527+
}
528+
529+
stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Metadata: []byte(`{"other":"value"}`), State: ptrutil.Ptr(rivertype.JobStateRunning)})
523530
stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})
531+
stuckJob3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})
524532

525533
t.Logf("horizon = %s", horizon)
526534
t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt)
527535
t.Logf("stuckJob2 = %s", stuckJob2.AttemptedAt)
528536

529537
t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1))
530538

531-
// Not returned because we put a maximum of two.
539+
// Not returned because we put a maximum of three.
532540
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})
533541

534542
// Not stuck because not in running state.
@@ -537,13 +545,19 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx
537545
// Not stuck because after queried horizon.
538546
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})
539547

540-
// Max two stuck
548+
// Not stuck because last seen is after queried horizon, even though attempted at is before it.
549+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Metadata: metadataWithLastSeen(afterHorizon), State: ptrutil.Ptr(rivertype.JobStateRunning)})
550+
551+
// Not stuck because attempted at is after queried horizon, even though last seen is before it.
552+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, Metadata: metadataWithLastSeen(beforeHorizon), State: ptrutil.Ptr(rivertype.JobStateRunning)})
553+
554+
// Max three stuck.
541555
stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{
542-
Max: 2,
556+
Max: 3,
543557
StuckHorizon: horizon,
544558
})
545559
require.NoError(t, err)
546-
require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID},
560+
require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID, stuckJob3.ID},
547561
sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID }))
548562
})
549563

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,13 @@ ORDER BY id;
258258
SELECT *
259259
FROM /* TEMPLATE: schema */river_job
260260
WHERE state = 'running'
261-
AND attempted_at < @stuck_horizon::timestamptz
261+
-- `last_seen_at` may still be present on a row from its last retry, so use
262+
-- `greatest` to take `attempted_at` (set on the latest lock of the job) if
263+
-- it's larger. `greatest` ignores NULLs, so a missing key is harmless.
264+
AND greatest(
265+
attempted_at,
266+
(metadata->>'river:last_seen_at')::timestamptz
267+
) < @stuck_horizon::timestamptz
262268
ORDER BY id
263269
LIMIT @max;
264270

@@ -726,4 +732,4 @@ SET
726732
metadata = CASE WHEN @metadata_do_update::boolean THEN @metadata::jsonb ELSE metadata END,
727733
state = CASE WHEN @state_do_update::boolean THEN @state::/* TEMPLATE: schema */river_job_state ELSE state END
728734
WHERE id = @id
729-
RETURNING *;
735+
RETURNING *;

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Lines changed: 7 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riversqlite/internal/dbsqlc/river_job.sql

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,15 @@ ORDER BY id;
185185
SELECT *
186186
FROM /* TEMPLATE: schema */river_job
187187
WHERE state = 'running'
188-
AND attempted_at < cast(@stuck_horizon AS text)
188+
-- `last_seen_at` may still be present on a row from its last retry, so use
189+
-- `max` to take `attempted_at` (set on the latest lock of the job) if it's
190+
-- larger.
191+
--
192+
-- `coalesce` is necessary because `max(NULL, ...)` always returns `NULL`.
193+
AND max(
194+
attempted_at,
195+
coalesce(json_extract(metadata, '$."river:last_seen_at"'), attempted_at)
196+
) < cast(@stuck_horizon AS text)
189197
ORDER BY id
190198
LIMIT @max;
191199

@@ -513,4 +521,4 @@ SET
513521
metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN json(cast(@metadata AS blob)) ELSE metadata END,
514522
state = CASE WHEN cast(@state_do_update AS boolean) THEN @state ELSE state END
515523
WHERE id = @id
516-
RETURNING *;
524+
RETURNING *;

0 commit comments

Comments
 (0)