Skip to content

Commit b52bd2b

Browse files
committed
Add leadership "domains" so multiple Rivers can operate in one schema
We've gotten a couple of requests so far (see #342 and #1105) to be able to start multiple River clients targeting different queues within the same database/schema, giving them the capacity to operate independently enough to be functional. This is currently not possible because a single leader is elected per schema, and it handles all maintenance operations, including non-queue ones like periodic job enqueuing. Here, add the idea of a `LeaderDomain`. This lets a user set the "domain" on which a client will elect its leader, allowing multiple leaders to be elected in a single schema. Each leader will run its own maintenance services. Setting `LeaderDomain` has the additional effect of making maintenance services operate only on the queues that their client is configured for. The idea here is to give us backwards compatibility in that the default behavior (in case of an unset `LeaderDomain`) is the same, while providing a path for multiple leaders to be interoperable with each other. There are still a few rough edges: for example, reindexing is not queue specific, so multiple leaders could be running a reindexer. I've provided guidance in the config documentation that ideally, all clients but one should have their reindexer disabled.
1 parent ccbe042 commit b52bd2b

37 files changed

Lines changed: 772 additions & 266 deletions

client.go

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"log/slog"
1010
"os"
1111
"regexp"
12+
"slices"
1213
"strings"
1314
"sync"
1415
"time"
@@ -209,6 +210,40 @@ type Config struct {
209210
// Jobs may have their own specific hooks by implementing JobArgsWithHooks.
210211
Hooks []rivertype.Hook
211212

213+
// LeaderDomain is an optional "domain" string to use for leader election.
214+
// Different clients sharing the same River schema can elect multiple
215+
// leaders as long as they're using different domains, with one leader
216+
// elected per domain.
217+
//
218+
// Setting this value also triggers the related behavior that maintenance
219+
// services start to only operate on the queues they're configured on. So
220+
// for example, given client1 handling queueA and queueB and client2
221+
// handling queueC and queueD, whichever client is elected leader will end
222+
// up running all maintenance services for all queues (queueA, queueB,
223+
// queueC, and queueD). But if client1 is using domain "domain1" and client2
224+
// is using domain "domain2", then client1 (elected in domain1) will only
225+
// run maintenance services on queueA and queueB, while client2 (elected in
226+
// domain2) will run maintenance services on queueC and queueD.
227+
//
228+
// A warning though that River *does not protect against configuration
229+
// mistakes*. If client1 on domain1 is configured for queueA and queueB, and
230+
// client2 on domain2 is *also* configured for queueA and queueB, then both
231+
// clients may end up running maintenance services on the same queues at the
232+
// same time. It's the caller's responsibility to ensure that doesn't
233+
// happen.
234+
//
235+
// Certain maintenance services that aren't queue-related like the reindexer
236+
// will continue to run on all leaders regardless of domain. If using this
237+
// feature, it's a good idea to configure ReindexerSchedule on all but a
238+
// single leader domain to river.NeverSchedule().
239+
//
240+
// In general, most River users should not need LeaderDomain, and when
241+
// running multiple Rivers may want to consider using multiple databases and
242+
// multiple schemas instead.
243+
//
244+
// Defaults to "default".
245+
LeaderDomain string
246+
212247
// Logger is the structured logger to use for logging purposes. If none is
213248
// specified, logs will be emitted to STDOUT with messages at warn level
214249
// or higher.
@@ -415,6 +450,7 @@ func (c *Config) WithDefaults() *Config {
415450
Hooks: c.Hooks,
416451
JobInsertMiddleware: c.JobInsertMiddleware,
417452
JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault),
453+
LeaderDomain: c.LeaderDomain,
418454
Logger: logger,
419455
MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault),
420456
Middleware: c.Middleware,
@@ -840,6 +876,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
840876

841877
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
842878
ClientID: config.ID,
879+
Domain: config.LeaderDomain,
843880
Schema: config.Schema,
844881
})
845882
client.services = append(client.services, client.elector)
@@ -860,6 +897,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
860897
client.services = append(client.services, pluginPilot.PluginServices()...)
861898
}
862899

900+
// It's important for queuesIncluded to be `nil` in case it's not in use
901+
// for the various driver queries to work correctly.
902+
var queuesIncluded []string
903+
if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 {
904+
queuesIncluded = maputil.Keys(config.Queues)
905+
slices.Sort(queuesIncluded)
906+
}
907+
863908
//
864909
// Maintenance services
865910
//
@@ -872,6 +917,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
872917
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
873918
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
874919
QueuesExcluded: client.pilot.JobCleanerQueuesExcluded(),
920+
QueuesIncluded: queuesIncluded,
875921
Schema: config.Schema,
876922
Timeout: config.JobCleanerTimeout,
877923
}, driver.GetExecutor())
@@ -882,6 +928,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
882928
{
883929
jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
884930
ClientRetryPolicy: config.RetryPolicy,
931+
QueuesIncluded: queuesIncluded,
885932
RescueAfter: config.RescueStuckJobsAfter,
886933
Schema: config.Schema,
887934
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
@@ -897,9 +944,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
897944

898945
{
899946
jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
900-
Interval: config.schedulerInterval,
901-
NotifyInsert: client.maybeNotifyInsertForQueues,
902-
Schema: config.Schema,
947+
Interval: config.schedulerInterval,
948+
NotifyInsert: client.maybeNotifyInsertForQueues,
949+
QueuesIncluded: queuesIncluded,
950+
Schema: config.Schema,
903951
}, driver.GetExecutor())
904952
maintenanceServices = append(maintenanceServices, jobScheduler)
905953
client.testSignals.jobScheduler = &jobScheduler.TestSignals
@@ -925,6 +973,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
925973

926974
{
927975
queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
976+
QueuesIncluded: queuesIncluded,
928977
RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
929978
Schema: config.Schema,
930979
}, driver.GetExecutor())

client_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,41 @@ func Test_Client_Common(t *testing.T) {
259259
riversharedtest.WaitOrTimeout(t, workedChan)
260260
})
261261

262+
t.Run("Leadership_AlternateLeaderDomain", func(t *testing.T) {
263+
t.Parallel()
264+
265+
var client1 *Client[pgx.Tx]
266+
{
267+
config, bundle := setupConfig(t)
268+
config.ReindexerSchedule = &neverSchedule{}
269+
270+
var err error
271+
client1, err = NewClient(bundle.driver, config)
272+
require.NoError(t, err)
273+
client1.testSignals.Init(t)
274+
}
275+
276+
var client2 *Client[pgx.Tx]
277+
{
278+
config, bundle := setupConfig(t)
279+
config.LeaderDomain = "alternate_domain"
280+
config.Schema = client1.config.Schema
281+
config.ReindexerSchedule = &neverSchedule{}
282+
283+
var err error
284+
client2, err = NewClient(bundle.driver, config)
285+
require.NoError(t, err)
286+
client2.testSignals.Init(t)
287+
}
288+
289+
startClient(ctx, t, client1)
290+
startClient(ctx, t, client2)
291+
292+
// Both elected
293+
client1.testSignals.electedLeader.WaitOrTimeout()
294+
client2.testSignals.electedLeader.WaitOrTimeout()
295+
})
296+
262297
t.Run("Queues_Add_WhenClientWontExecuteJobs", func(t *testing.T) {
263298
t.Parallel()
264299

internal/leadership/elector.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"github.com/riverqueue/river/rivershared/util/testutil"
2323
)
2424

25+
const DomainDefault = "default"
26+
2527
const (
2628
electIntervalDefault = 5 * time.Second
2729
electIntervalJitterDefault = 1 * time.Second
@@ -82,6 +84,7 @@ func (ts *electorTestSignals) Init(tb testutil.TestingTB) {
8284

8385
type Config struct {
8486
ClientID string
87+
Domain string
8588
ElectInterval time.Duration // period on which each elector attempts elect even without having received a resignation notification
8689
ElectIntervalJitter time.Duration
8790
Schema string
@@ -121,6 +124,7 @@ func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, not
121124
return baseservice.Init(archetype, &Elector{
122125
config: (&Config{
123126
ClientID: config.ClientID,
127+
Domain: cmp.Or(config.Domain, string(DomainDefault)),
124128
ElectInterval: cmp.Or(config.ElectInterval, electIntervalDefault),
125129
ElectIntervalJitter: cmp.Or(config.ElectIntervalJitter, electIntervalJitterDefault),
126130
Schema: config.Schema,
@@ -143,9 +147,9 @@ func (e *Elector) Start(ctx context.Context) error {
143147

144148
var sub *notifier.Subscription
145149
if e.notifier == nil {
146-
e.Logger.DebugContext(ctx, e.Name+": No notifier configured; starting in poll mode", "client_id", e.config.ClientID)
150+
e.Logger.DebugContext(ctx, e.Name+": No notifier configured; starting in poll mode", "client_id", e.config.ClientID, "domain", e.config.Domain)
147151
} else {
148-
e.Logger.DebugContext(ctx, e.Name+": Listening for leadership changes", "client_id", e.config.ClientID, "topic", notifier.NotificationTopicLeadership)
152+
e.Logger.DebugContext(ctx, e.Name+": Listening for leadership changes", "client_id", e.config.ClientID, "domain", e.config.Domain, "topic", notifier.NotificationTopicLeadership)
149153
var err error
150154
sub, err = e.notifier.Listen(ctx, notifier.NotificationTopicLeadership, func(topic notifier.NotificationTopic, payload string) {
151155
e.handleLeadershipNotification(ctx, topic, payload)
@@ -180,7 +184,7 @@ func (e *Elector) Start(ctx context.Context) error {
180184
return
181185
}
182186

183-
e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID)
187+
e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID, "domain", e.config.Domain)
184188
e.testSignals.GainedLeadership.Signal(struct{}{})
185189

186190
err := e.keepLeadershipLoop(ctx)
@@ -193,7 +197,7 @@ func (e *Elector) Start(ctx context.Context) error {
193197
continue // lost leadership reelection; unusual but not a problem; don't log
194198
}
195199

196-
e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "err", err)
200+
e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "domain", e.config.Domain, "err", err)
197201
}
198202
}
199203
}()
@@ -205,10 +209,11 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
205209
var attempt int
206210
for {
207211
attempt++
208-
e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID)
212+
e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID, "domain", e.config.Domain)
209213

210214
elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{
211215
LeaderID: e.config.ClientID,
216+
Name: e.config.Domain,
212217
Now: e.Time.NowUTCOrNil(),
213218
Schema: e.config.Schema,
214219
TTL: e.leaderTTL(),
@@ -229,7 +234,7 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
229234

230235
attempt = 0
231236

232-
e.Logger.DebugContext(ctx, e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.config.ClientID)
237+
e.Logger.DebugContext(ctx, e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.config.ClientID, "domain", e.config.Domain)
233238
e.testSignals.DeniedLeadership.Signal(struct{}{})
234239

235240
select {
@@ -254,17 +259,17 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
254259
func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifier.NotificationTopic, payload string) {
255260
if topic != notifier.NotificationTopicLeadership {
256261
// This should not happen unless the notifier is broken.
257-
e.Logger.ErrorContext(ctx, e.Name+": Received unexpected notification", "client_id", e.config.ClientID, "topic", topic, "payload", payload)
262+
e.Logger.ErrorContext(ctx, e.Name+": Received unexpected notification", "client_id", e.config.ClientID, "domain", e.config.Domain, "topic", topic, "payload", payload)
258263
return
259264
}
260265

261266
notification := DBNotification{}
262267
if err := json.Unmarshal([]byte(payload), &notification); err != nil {
263-
e.Logger.ErrorContext(ctx, e.Name+": Unable to unmarshal leadership notification", "client_id", e.config.ClientID, "err", err)
268+
e.Logger.ErrorContext(ctx, e.Name+": Unable to unmarshal leadership notification", "client_id", e.config.ClientID, "domain", e.config.Domain, "err", err)
264269
return
265270
}
266271

267-
e.Logger.DebugContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID)
272+
e.Logger.DebugContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID, "domain", e.config.Domain)
268273

269274
// Do an initial context check so in case context is done, it always takes
270275
// precedence over sending a leadership notification.
@@ -359,7 +364,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
359364
case <-e.requestResignChan:
360365
// Receive a notification telling current leader to resign.
361366

362-
e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID)
367+
e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID, "domain", e.config.Domain)
363368

364369
if !timer.Stop() {
365370
<-timer.C
@@ -383,10 +388,11 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
383388
// Reelect timer expired; attempt reelection below.
384389
}
385390

386-
e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID)
391+
e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID, "domain", e.config.Domain)
387392

388393
reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{
389394
LeaderID: e.config.ClientID,
395+
Name: e.config.Domain,
390396
Now: e.Time.NowUTCOrNil(),
391397
Schema: e.config.Schema,
392398
TTL: e.leaderTTL(),
@@ -424,7 +430,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
424430
// always surrendered in a timely manner so it can be picked up quickly by
425431
// another client, even in the event of a cancellation.
426432
func (e *Elector) attemptResignLoop(ctx context.Context) {
427-
e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID)
433+
e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID, "domain", e.config.Domain)
428434

429435
// Make a good faith attempt to resign, even in the presence of errors, but
430436
// don't keep hammering if it doesn't work. In case a resignation failure,
@@ -469,7 +475,7 @@ func (e *Elector) attemptResign(ctx context.Context, attempt int) error {
469475
}
470476

471477
if resigned {
472-
e.Logger.DebugContext(ctx, e.Name+": Resigned leadership successfully", "client_id", e.config.ClientID)
478+
e.Logger.DebugContext(ctx, e.Name+": Resigned leadership successfully", "client_id", e.config.ClientID, "domain", e.config.Domain)
473479
e.testSignals.ResignedLeadership.Signal(struct{}{})
474480
}
475481

@@ -484,6 +490,7 @@ func (e *Elector) errorSlogArgs(err error, attempt int, sleepDuration time.Durat
484490
return []any{
485491
slog.Int("attempt", attempt),
486492
slog.String("client_id", e.config.ClientID),
493+
slog.String("domain", e.config.Domain),
487494
slog.String("err", err.Error()),
488495
slog.String("sleep_duration", sleepDuration.String()),
489496
}

internal/leadership/elector_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ func TestAttemptElectOrReelect(t *testing.T) {
422422

423423
elected, err := attemptElectOrReelect(ctx, bundle.exec, false, &riverdriver.LeaderElectParams{
424424
LeaderID: clientID,
425+
Name: DomainDefault,
425426
TTL: leaderTTL,
426427
Schema: "",
427428
})
@@ -451,6 +452,7 @@ func TestAttemptElectOrReelect(t *testing.T) {
451452
// the transaction.
452453
elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{
453454
LeaderID: clientID,
455+
Name: DomainDefault,
454456
TTL: 30 * time.Second,
455457
Schema: "",
456458
})
@@ -478,6 +480,7 @@ func TestAttemptElectOrReelect(t *testing.T) {
478480

479481
elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{
480482
LeaderID: "different-client-id",
483+
Name: DomainDefault,
481484
TTL: leaderTTL,
482485
Schema: "",
483486
})

internal/maintenance/job_cleaner.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ type JobCleanerConfig struct {
5656
// QueuesExcluded are queues that'll be excluded from cleaning.
5757
QueuesExcluded []string
5858

59+
// QueuesIncluded are queues that'll be included in cleaning. If set, only
60+
// these queues will be cleaned. If nil, all queues are cleaned.
61+
QueuesIncluded []string
62+
5963
// Schema where River tables are located. Empty string omits schema, causing
6064
// Postgres to default to `search_path`.
6165
Schema string
@@ -79,6 +83,12 @@ func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
7983
if c.Interval <= 0 {
8084
panic("JobCleanerConfig.Interval must be above zero")
8185
}
86+
if c.QueuesExcluded != nil && len(c.QueuesExcluded) == 0 {
87+
panic("JobCleanerConfig.QueuesExcluded should be either nil or a non-empty slice")
88+
}
89+
if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 {
90+
panic("JobCleanerConfig.QueuesIncluded should be either nil or a non-empty slice")
91+
}
8292
if c.Timeout <= 0 {
8393
panic("JobCleanerConfig.Timeout must be above zero")
8494
}
@@ -117,6 +127,7 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e
117127
CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault),
118128
DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault),
119129
QueuesExcluded: config.QueuesExcluded,
130+
QueuesIncluded: config.QueuesIncluded,
120131
Interval: cmp.Or(config.Interval, riversharedmaintenance.JobCleanerIntervalDefault),
121132
Schema: config.Schema,
122133
Timeout: cmp.Or(config.Timeout, riversharedmaintenance.JobCleanerTimeoutDefault),
@@ -205,6 +216,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
205216
DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
206217
Max: s.batchSize(),
207218
QueuesExcluded: s.Config.QueuesExcluded,
219+
QueuesIncluded: s.Config.QueuesIncluded,
208220
Schema: s.Config.Schema,
209221
})
210222
if err != nil {

0 commit comments

Comments
 (0)