Skip to content

Commit de67cde

Browse files
committed
add JobCancel to Pilot, add Pilot test coverage
1 parent 6184b2e commit de67cde

4 files changed

Lines changed: 325 additions & 1 deletion

File tree

client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1428,7 +1428,7 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri
14281428
}
14291429

14301430
func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) {
1431-
return exec.JobCancel(ctx, &riverdriver.JobCancelParams{
1431+
return c.pilot.JobCancel(ctx, exec, &riverdriver.JobCancelParams{
14321432
ID: jobID,
14331433
CancelAttemptedAt: c.baseService.Time.NowUTC(),
14341434
ControlTopic: string(notifier.NotificationTopicControl),

client_pilot_test.go

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
package river
2+
3+
import (
4+
"context"
5+
"sync"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"github.com/jackc/pgx/v5"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/riverqueue/river/riverdbtest"
14+
"github.com/riverqueue/river/riverdriver"
15+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
16+
"github.com/riverqueue/river/rivershared/baseservice"
17+
"github.com/riverqueue/river/rivershared/riverpilot"
18+
"github.com/riverqueue/river/rivershared/riversharedtest"
19+
"github.com/riverqueue/river/rivershared/util/testutil"
20+
"github.com/riverqueue/river/rivertype"
21+
)
22+
23+
type pilotSpy struct {
24+
riverpilot.StandardPilot
25+
26+
jobCancelCalls atomic.Int64
27+
jobCleanerQueuesExcludedCalls atomic.Int64
28+
jobGetAvailableCalls atomic.Int64
29+
jobInsertManyCalls atomic.Int64
30+
jobRetryCalls atomic.Int64
31+
jobSetStateIfRunningManyCalls atomic.Int64
32+
periodicJobGetAllCalls atomic.Int64
33+
periodicJobKeepAliveCalls atomic.Int64
34+
periodicJobUpsertManyCalls atomic.Int64
35+
pilotInitCalls atomic.Int64
36+
producerInitCalls atomic.Int64
37+
producerKeepAliveCalls atomic.Int64
38+
producerShutdownCalls atomic.Int64
39+
queueMetadataChangedCalls atomic.Int64
40+
41+
jobGetAvailableCh chan struct{}
42+
jobSetStateIfRunningManyCh chan struct{}
43+
periodicJobGetAllCh chan struct{}
44+
periodicJobKeepAliveCh chan struct{}
45+
periodicJobUpsertManyCh chan struct{}
46+
pilotInitCh chan struct{}
47+
producerInitCh chan struct{}
48+
producerKeepAliveCh chan struct{}
49+
producerShutdownCh chan struct{}
50+
queueMetadataChangedCh chan struct{}
51+
}
52+
53+
func newPilotSpy() *pilotSpy {
54+
return &pilotSpy{
55+
jobGetAvailableCh: make(chan struct{}, 1),
56+
jobSetStateIfRunningManyCh: make(chan struct{}, 1),
57+
periodicJobGetAllCh: make(chan struct{}, 1),
58+
periodicJobKeepAliveCh: make(chan struct{}, 1),
59+
periodicJobUpsertManyCh: make(chan struct{}, 1),
60+
pilotInitCh: make(chan struct{}, 1),
61+
producerInitCh: make(chan struct{}, 1),
62+
producerKeepAliveCh: make(chan struct{}, 1),
63+
producerShutdownCh: make(chan struct{}, 1),
64+
queueMetadataChangedCh: make(chan struct{}, 1),
65+
}
66+
}
67+
68+
func (p *pilotSpy) signal(ch chan struct{}) {
69+
select {
70+
case ch <- struct{}{}:
71+
default:
72+
}
73+
}
74+
75+
func (p *pilotSpy) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
76+
p.jobCancelCalls.Add(1)
77+
return p.StandardPilot.JobCancel(ctx, exec, params)
78+
}
79+
80+
func (p *pilotSpy) JobCleanerQueuesExcluded() []string {
81+
p.jobCleanerQueuesExcludedCalls.Add(1)
82+
return p.StandardPilot.JobCleanerQueuesExcluded()
83+
}
84+
85+
func (p *pilotSpy) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state riverpilot.ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
86+
p.jobGetAvailableCalls.Add(1)
87+
p.signal(p.jobGetAvailableCh)
88+
return p.StandardPilot.JobGetAvailable(ctx, exec, state, params)
89+
}
90+
91+
func (p *pilotSpy) JobInsertMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) {
92+
p.jobInsertManyCalls.Add(1)
93+
return p.StandardPilot.JobInsertMany(ctx, exec, params)
94+
}
95+
96+
func (p *pilotSpy) JobRetry(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error) {
97+
p.jobRetryCalls.Add(1)
98+
return p.StandardPilot.JobRetry(ctx, exec, params)
99+
}
100+
101+
func (p *pilotSpy) JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
102+
p.jobSetStateIfRunningManyCalls.Add(1)
103+
p.signal(p.jobSetStateIfRunningManyCh)
104+
return p.StandardPilot.JobSetStateIfRunningMany(ctx, exec, params)
105+
}
106+
107+
func (p *pilotSpy) PeriodicJobGetAll(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) {
108+
p.periodicJobGetAllCalls.Add(1)
109+
p.signal(p.periodicJobGetAllCh)
110+
return p.StandardPilot.PeriodicJobGetAll(ctx, exec, params)
111+
}
112+
113+
func (p *pilotSpy) PeriodicJobKeepAliveAndReap(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobKeepAliveAndReapParams) ([]*riverpilot.PeriodicJob, error) {
114+
p.periodicJobKeepAliveCalls.Add(1)
115+
p.signal(p.periodicJobKeepAliveCh)
116+
return p.StandardPilot.PeriodicJobKeepAliveAndReap(ctx, exec, params)
117+
}
118+
119+
func (p *pilotSpy) PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) {
120+
p.periodicJobUpsertManyCalls.Add(1)
121+
p.signal(p.periodicJobUpsertManyCh)
122+
return p.StandardPilot.PeriodicJobUpsertMany(ctx, exec, params)
123+
}
124+
125+
func (p *pilotSpy) PilotInit(archetype *baseservice.Archetype, params *riverpilot.PilotInitParams) {
126+
p.pilotInitCalls.Add(1)
127+
p.signal(p.pilotInitCh)
128+
p.StandardPilot.PilotInit(archetype, params)
129+
}
130+
131+
func (p *pilotSpy) ProducerInit(ctx context.Context, exec riverdriver.Executor, params *riverpilot.ProducerInitParams) (int64, riverpilot.ProducerState, error) {
132+
p.producerInitCalls.Add(1)
133+
p.signal(p.producerInitCh)
134+
return p.StandardPilot.ProducerInit(ctx, exec, params)
135+
}
136+
137+
func (p *pilotSpy) ProducerKeepAlive(ctx context.Context, exec riverdriver.Executor, params *riverdriver.ProducerKeepAliveParams) error {
138+
p.producerKeepAliveCalls.Add(1)
139+
p.signal(p.producerKeepAliveCh)
140+
return p.StandardPilot.ProducerKeepAlive(ctx, exec, params)
141+
}
142+
143+
func (p *pilotSpy) ProducerShutdown(ctx context.Context, exec riverdriver.Executor, params *riverpilot.ProducerShutdownParams) error {
144+
p.producerShutdownCalls.Add(1)
145+
p.signal(p.producerShutdownCh)
146+
return p.StandardPilot.ProducerShutdown(ctx, exec, params)
147+
}
148+
149+
func (p *pilotSpy) QueueMetadataChanged(ctx context.Context, exec riverdriver.Executor, params *riverpilot.QueueMetadataChangedParams) error {
150+
p.queueMetadataChangedCalls.Add(1)
151+
p.signal(p.queueMetadataChangedCh)
152+
return p.StandardPilot.QueueMetadataChanged(ctx, exec, params)
153+
}
154+
155+
func Test_Client_PilotUsage(t *testing.T) {
156+
t.Parallel()
157+
158+
ctx := context.Background()
159+
160+
setup := func(t *testing.T, configMutate func(*Config)) (*Client[pgx.Tx], *pilotSpy) {
161+
t.Helper()
162+
163+
var (
164+
dbPool = riversharedtest.DBPool(ctx, t)
165+
driver = riverpgxv5.New(dbPool)
166+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
167+
config = newTestConfig(t, schema)
168+
)
169+
170+
if configMutate != nil {
171+
configMutate(config)
172+
}
173+
174+
pilot := newPilotSpy()
175+
pluginDriver := newDriverWithPlugin(t, dbPool)
176+
pluginDriver.pilot = pilot
177+
178+
client, err := NewClient(pluginDriver, config)
179+
require.NoError(t, err)
180+
181+
return client, pilot
182+
}
183+
184+
t.Run("InitUsesPilot", func(t *testing.T) {
185+
client, pilot := setup(t, nil)
186+
require.NotNil(t, client)
187+
require.Equal(t, int64(1), pilot.jobCleanerQueuesExcludedCalls.Load())
188+
require.Equal(t, int64(1), pilot.pilotInitCalls.Load())
189+
})
190+
191+
t.Run("JobInsertManyUsesPilot", func(t *testing.T) {
192+
client, pilot := setup(t, nil)
193+
194+
_, err := client.Insert(ctx, noOpArgs{}, nil)
195+
require.NoError(t, err)
196+
require.Equal(t, int64(1), pilot.jobInsertManyCalls.Load())
197+
})
198+
199+
t.Run("JobCancelUsesPilot", func(t *testing.T) {
200+
client, pilot := setup(t, nil)
201+
202+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
203+
ScheduledAt: time.Now().Add(5 * time.Minute),
204+
})
205+
require.NoError(t, err)
206+
207+
_, err = client.JobCancel(ctx, insertRes.Job.ID)
208+
require.NoError(t, err)
209+
require.Equal(t, int64(1), pilot.jobCancelCalls.Load())
210+
})
211+
212+
t.Run("JobRetryUsesPilot", func(t *testing.T) {
213+
client, pilot := setup(t, nil)
214+
215+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
216+
ScheduledAt: time.Now().Add(5 * time.Minute),
217+
})
218+
require.NoError(t, err)
219+
220+
_, err = client.JobRetry(ctx, insertRes.Job.ID)
221+
require.NoError(t, err)
222+
require.Equal(t, int64(1), pilot.jobRetryCalls.Load())
223+
})
224+
225+
t.Run("JobRetryTxUsesPilot", func(t *testing.T) {
226+
client, pilot := setup(t, nil)
227+
228+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
229+
ScheduledAt: time.Now().Add(5 * time.Minute),
230+
})
231+
require.NoError(t, err)
232+
233+
exec := client.Driver().GetExecutor()
234+
execTx, err := exec.Begin(ctx)
235+
require.NoError(t, err)
236+
237+
committed := false
238+
t.Cleanup(func() {
239+
if !committed {
240+
_ = execTx.Rollback(ctx)
241+
}
242+
})
243+
244+
tx := client.Driver().UnwrapTx(execTx)
245+
246+
_, err = client.JobRetryTx(ctx, tx, insertRes.Job.ID)
247+
require.NoError(t, err)
248+
require.Equal(t, int64(1), pilot.jobRetryCalls.Load())
249+
250+
require.NoError(t, execTx.Commit(ctx))
251+
committed = true
252+
})
253+
254+
t.Run("PeriodicJobsUsePilot", func(t *testing.T) {
255+
client, pilot := setup(t, func(config *Config) {
256+
config.PeriodicJobs = []*PeriodicJob{
257+
NewPeriodicJob(PeriodicInterval(time.Second), func() (JobArgs, *InsertOpts) {
258+
return noOpArgs{}, nil
259+
}, &PeriodicJobOpts{
260+
ID: "pilot_periodic_job",
261+
RunOnStart: true,
262+
}),
263+
}
264+
})
265+
266+
client.testSignals.Init(t)
267+
268+
startClient(ctx, t, client)
269+
client.testSignals.electedLeader.WaitOrTimeout()
270+
271+
riversharedtest.WaitOrTimeout(t, pilot.periodicJobGetAllCh)
272+
riversharedtest.WaitOrTimeout(t, pilot.periodicJobUpsertManyCh)
273+
riversharedtest.WaitOrTimeout(t, pilot.periodicJobKeepAliveCh)
274+
})
275+
276+
t.Run("ProducerAndCompleterUsePilot", func(t *testing.T) {
277+
client, pilot := setup(t, nil)
278+
279+
jobDone := make(chan struct{})
280+
281+
type JobArgs struct {
282+
testutil.JobArgsReflectKind[JobArgs]
283+
}
284+
285+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
286+
close(jobDone)
287+
return nil
288+
}))
289+
290+
require.NoError(t, client.Start(ctx))
291+
292+
stopOnce := sync.Once{}
293+
stopClient := func() {
294+
stopOnce.Do(func() {
295+
stopCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
296+
defer cancel()
297+
require.NoError(t, client.Stop(stopCtx))
298+
})
299+
}
300+
t.Cleanup(stopClient)
301+
302+
insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
303+
require.NoError(t, err)
304+
305+
riversharedtest.WaitOrTimeout(t, jobDone)
306+
require.NotZero(t, insertRes.Job.ID)
307+
308+
riversharedtest.WaitOrTimeout(t, pilot.jobGetAvailableCh)
309+
riversharedtest.WaitOrTimeout(t, pilot.jobSetStateIfRunningManyCh)
310+
riversharedtest.WaitOrTimeout(t, pilot.producerInitCh)
311+
riversharedtest.WaitOrTimeout(t, pilot.producerKeepAliveCh)
312+
riversharedtest.WaitOrTimeout(t, pilot.queueMetadataChangedCh)
313+
314+
stopClient()
315+
316+
riversharedtest.WaitOrTimeout(t, pilot.producerShutdownCh)
317+
})
318+
}

rivershared/riverpilot/pilot.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
type Pilot interface {
2020
PilotPeriodicJob
2121

22+
JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error)
23+
2224
// JobCleanerQueuesExcluded returns queues that should be excluded from the
2325
// main River client's JobCleaner. If no queues should be omitted, this
2426
// function should return nil as opposed to an empty array. (Because the

rivershared/riverpilot/standard_pilot.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex
2222
return exec.JobGetAvailable(ctx, params)
2323
}
2424

25+
func (p *StandardPilot) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
26+
return exec.JobCancel(ctx, params)
27+
}
28+
2529
func (p *StandardPilot) JobInsertMany(
2630
ctx context.Context,
2731
exec riverdriver.Executor,

0 commit comments

Comments
 (0)