Skip to content

Commit 8d8bce4

Browse files
committed
Add UnknownConfigure function for durable periodic jobs
Here, add an `UnknownConfigure` function to the pilot for use with rescuing and reconfiguring unknown durable periodic jobs. Full explanation in the counterpart pull request.
1 parent edec589 commit 8d8bce4

16 files changed

Lines changed: 383 additions & 299 deletions

client.go

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"log/slog"
1010
"os"
1111
"regexp"
12+
"slices"
1213
"strings"
1314
"sync"
1415
"time"
@@ -900,11 +901,28 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
900901
}
901902

902903
{
904+
unknownConfigureFunc := client.pilot.PeriodicJobUnknownConfigure()
905+
903906
periodicJobEnqueuer, err := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
904907
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
905908
Insert: client.insertMany,
906909
Pilot: client.pilot,
907910
Schema: config.Schema,
911+
UnknownConfigure: func(job *riverpilot.PeriodicJob) *maintenance.UnknownConfigureResult {
912+
if unknownConfigureFunc == nil {
913+
return nil
914+
}
915+
916+
unknownConfigureRes := unknownConfigureFunc(job)
917+
918+
return &maintenance.UnknownConfigureResult{
919+
JobConstructor: func() (*rivertype.JobInsertParams, error) {
920+
jobArgs, insertOpts := unknownConfigureRes.JobConstructor()
921+
return insertParamsFromConfigArgsAndOptions(archetype, config, jobArgs, insertOpts)
922+
},
923+
Schedule: unknownConfigureRes.Schedule.Next,
924+
}
925+
},
908926
}, driver.GetExecutor())
909927
if err != nil {
910928
return nil, err
@@ -1510,6 +1528,14 @@ func (c *Client[TTx]) ID() string {
15101528
return c.config.ID
15111529
}
15121530

1531+
// Regular expression to which the format of tags must comply. Mainly, no
1532+
// special characters, and with hyphens in the middle.
1533+
//
1534+
// A key property here (in case this is relaxed in the future) is that commas
1535+
// must never be allowed because they're used as a delimiter during batch job
1536+
// insertion for the `riverdatabasesql` driver.
1537+
var tagRE = regexp.MustCompile(`\A[\w][\w\-]+[\w]\z`)
1538+
15131539
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*rivertype.JobInsertParams, error) {
15141540
encodedArgs, err := json.Marshal(args)
15151541
if err != nil {
@@ -1562,11 +1588,11 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
15621588
var uniqueOpts UniqueOpts
15631589
if !config.Test.DisableUniqueEnforcement {
15641590
uniqueOpts = insertOpts.UniqueOpts
1565-
if uniqueOpts.isEmpty() {
1591+
if uniqueOptsIsEmpty(&uniqueOpts) {
15661592
uniqueOpts = jobInsertOpts.UniqueOpts
15671593
}
15681594
}
1569-
if err := uniqueOpts.validate(); err != nil {
1595+
if err := uniqueOptsValidate(&uniqueOpts); err != nil {
15701596
return nil, err
15711597
}
15721598

@@ -1587,7 +1613,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
15871613
State: rivertype.JobStateAvailable,
15881614
Tags: tags,
15891615
}
1590-
if !uniqueOpts.isEmpty() {
1616+
if !uniqueOptsIsEmpty(&uniqueOpts) {
15911617
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
15921618
insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
15931619
if err != nil {
@@ -2709,3 +2735,70 @@ func defaultClientIDWithHost(startedAt time.Time, host string) string {
27092735

27102736
return host + "_" + strings.Replace(startedAt.Format(rfc3339Compact), ".", "_", 1)
27112737
}
2738+
2739+
// uniqueOptsIsEmpty returns true for an empty, uninitialized options struct.
2740+
//
2741+
// This is required because we can't check against `UniqueOpts{}` because slices
2742+
// aren't comparable. Unfortunately it makes things a little more brittle
2743+
// comparatively because any new options must also be considered here for things
2744+
// to work.
2745+
//
2746+
// This is an unexported function in `river` so that it doesn't have
2747+
// to be exported from `rivertype` and doesn't become part of the public API.
2748+
func uniqueOptsIsEmpty(opts *rivertype.UniqueOpts) bool {
2749+
return !opts.ByArgs &&
2750+
opts.ByPeriod == time.Duration(0) &&
2751+
!opts.ByQueue &&
2752+
opts.ByState == nil
2753+
}
2754+
2755+
var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals
2756+
2757+
var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals
2758+
rivertype.JobStateAvailable,
2759+
rivertype.JobStatePending,
2760+
rivertype.JobStateRunning,
2761+
rivertype.JobStateScheduled,
2762+
}
2763+
2764+
// uniqueOptsValidate validates the given rivertype.UniqueOpts.
2765+
//
2766+
// This is a function instance of an instance function so that it doesn't have
2767+
// to be exported from `rivertype` and doesn't become part of the public API.
2768+
func uniqueOptsValidate(opts *rivertype.UniqueOpts) error {
2769+
if uniqueOptsIsEmpty(opts) {
2770+
return nil
2771+
}
2772+
2773+
if opts.ByPeriod != time.Duration(0) && opts.ByPeriod < 1*time.Second {
2774+
return errors.New("UniqueOpts.ByPeriod should not be less than 1 second")
2775+
}
2776+
2777+
// Job states are typed, but since the underlying type is a string, users
2778+
// can put anything they want in there.
2779+
for _, state := range opts.ByState {
2780+
// This could be turned to a map lookup, but last I checked the speed
2781+
// difference for tiny slice sizes is negligible, and map lookup might
2782+
// even be slower.
2783+
if !slices.Contains(jobStateAll, state) {
2784+
return fmt.Errorf("UniqueOpts.ByState contains invalid state %q", state)
2785+
}
2786+
}
2787+
2788+
// Skip required states validation if no custom states were provided.
2789+
if len(opts.ByState) == 0 {
2790+
return nil
2791+
}
2792+
2793+
var missingStates []string
2794+
for _, state := range requiredV3states {
2795+
if !slices.Contains(opts.ByState, state) {
2796+
missingStates = append(missingStates, string(state))
2797+
}
2798+
}
2799+
if len(missingStates) > 0 {
2800+
return fmt.Errorf("UniqueOpts.ByState must contain all required states, missing: %s", strings.Join(missingStates, ", "))
2801+
}
2802+
2803+
return nil
2804+
}

client_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8173,3 +8173,81 @@ func (f JobArgsWithHooksFunc) Hooks() []rivertype.Hook {
81738173
func (JobArgsWithHooksFunc) MarshalJSON() ([]byte, error) { return []byte("{}"), nil }
81748174

81758175
func (JobArgsWithHooksFunc) UnmarshalJSON([]byte) error { return nil }
8176+
8177+
func TestTagRE(t *testing.T) {
8178+
t.Parallel()
8179+
8180+
require.Regexp(t, tagRE, "aaa")
8181+
require.Regexp(t, tagRE, "_aaa")
8182+
require.Regexp(t, tagRE, "aaa_")
8183+
require.Regexp(t, tagRE, "777")
8184+
require.Regexp(t, tagRE, "my-tag")
8185+
require.Regexp(t, tagRE, "my_tag")
8186+
require.Regexp(t, tagRE, "my-longer-tag")
8187+
require.Regexp(t, tagRE, "my_longer_tag")
8188+
require.Regexp(t, tagRE, "My_Capitalized_Tag")
8189+
require.Regexp(t, tagRE, "ALL_CAPS")
8190+
require.Regexp(t, tagRE, "1_2_3")
8191+
8192+
require.NotRegexp(t, tagRE, "a")
8193+
require.NotRegexp(t, tagRE, "aa")
8194+
require.NotRegexp(t, tagRE, "-aaa")
8195+
require.NotRegexp(t, tagRE, "aaa-")
8196+
require.NotRegexp(t, tagRE, "special@characters$banned")
8197+
require.NotRegexp(t, tagRE, "commas,never,allowed")
8198+
}
8199+
8200+
func TestUniqueOptsIsEmpty(t *testing.T) {
8201+
t.Parallel()
8202+
8203+
require.True(t, uniqueOptsIsEmpty(&UniqueOpts{}))
8204+
require.False(t, uniqueOptsIsEmpty(&UniqueOpts{ByArgs: true}))
8205+
require.False(t, uniqueOptsIsEmpty(&UniqueOpts{ByPeriod: 1 * time.Nanosecond}))
8206+
require.False(t, uniqueOptsIsEmpty(&UniqueOpts{ByQueue: true}))
8207+
require.False(t, uniqueOptsIsEmpty(&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStateAvailable}}))
8208+
}
8209+
8210+
func TestUniqueOptsValidate(t *testing.T) {
8211+
t.Parallel()
8212+
8213+
require.NoError(t, uniqueOptsValidate(&UniqueOpts{}))
8214+
require.NoError(t, uniqueOptsValidate(&UniqueOpts{
8215+
ByArgs: true,
8216+
ByPeriod: 1 * time.Second,
8217+
ByQueue: true,
8218+
}))
8219+
8220+
require.EqualError(t, uniqueOptsValidate(&UniqueOpts{ByPeriod: 1 * time.Millisecond}), "UniqueOpts.ByPeriod should not be less than 1 second")
8221+
require.EqualError(t, uniqueOptsValidate(&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobState("invalid")}}), `UniqueOpts.ByState contains invalid state "invalid"`)
8222+
8223+
requiredStates := []rivertype.JobState{
8224+
rivertype.JobStateAvailable,
8225+
rivertype.JobStatePending,
8226+
rivertype.JobStateRunning,
8227+
rivertype.JobStateScheduled,
8228+
}
8229+
8230+
for _, state := range requiredStates {
8231+
// Test with each state individually removed from requiredStates to ensure
8232+
// it's validated.
8233+
8234+
// Create a copy of requiredStates without the current state
8235+
var testStates []rivertype.JobState
8236+
for _, s := range requiredStates {
8237+
if s != state {
8238+
testStates = append(testStates, s)
8239+
}
8240+
}
8241+
8242+
// Test validation
8243+
require.EqualError(t, uniqueOptsValidate(&UniqueOpts{ByState: testStates}), "UniqueOpts.ByState must contain all required states, missing: "+string(state))
8244+
}
8245+
8246+
// test with more than one required state missing:
8247+
require.EqualError(t, uniqueOptsValidate(&UniqueOpts{ByState: []rivertype.JobState{
8248+
rivertype.JobStateAvailable,
8249+
rivertype.JobStateScheduled,
8250+
}}), "UniqueOpts.ByState must contain all required states, missing: pending, running")
8251+
8252+
require.NoError(t, uniqueOptsValidate(&UniqueOpts{ByState: rivertype.JobStates()}))
8253+
}

insert_opts_test.go

Lines changed: 0 additions & 78 deletions
This file was deleted.

0 commit comments

Comments
 (0)