Skip to content

Commit 87e82c3

Browse files
authored
Honor Config.Schema for the running-state transition in rivertest.Worker (#1262)
`rivertest.Worker` works a job in three database steps: it inserts the job through the client, builds an inline completer, and transitions the job to `running` via `JobUpdateFull`. The first two already thread `config.Schema` through — the client uses it internally, and it's passed explicitly to `jobcompleter.NewInlineCompleter` — but the `JobUpdateFull` call omitted it. With a non-default `Schema`, the insert lands the job in `<schema>.river_job` correctly, then the running-state update runs unqualified and resolves only through the connection's `search_path`. On a connection that doesn't include the configured schema it fails one step later with: test worker internal error: failed to update job to running state: ERROR: relation "river_job" does not exist (SQLSTATE 42P01) Pass `w.config.Schema` into `JobUpdateFullParams` so all three steps agree on the schema. This finishes custom-schema support for `rivertest.Worker`; the `rivertest.Require*` family received an analogous per-call `Schema` option in #926 (#907). The regression tests migrate River into an isolated named schema and work jobs through a transaction whose `search_path` is empty, so the tables resolve only via schema qualification — the exact condition that fails before this change. They live as `CustomSchema` subtests of `TestWorker_Work` and `TestWorker_WorkJob`, each building its own bundle inline since the schema setup differs from those tests' shared `setup`.
1 parent 61c6ac7 commit 87e82c3

3 files changed

Lines changed: 67 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313

1414
- Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259)
1515

16+
### Fixed
17+
18+
- Fixed `rivertest.Worker.Work` and `WorkJob` to honor a configured custom `Config.Schema` when transitioning a job to its running state. Previously, the running-state update ran unqualified and could fail on a connection whose `search_path` didn't include the configured schema. [PR #1262](https://github.com/riverqueue/river/pull/1262)
19+
1620
## [0.38.0] - 2026-05-22
1721

1822
### Added

rivertest/worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job
166166
AttemptedAtDoUpdate: true,
167167
AttemptedBy: append(job.AttemptedBy, w.config.ID),
168168
AttemptedByDoUpdate: true,
169+
Schema: w.config.Schema,
169170
StateDoUpdate: true,
170171
State: rivertype.JobStateRunning,
171172
})

rivertest/worker_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,34 @@ func TestWorker_Work(t *testing.T) {
450450
require.True(t, middlewareCalled)
451451
require.True(t, middlewareWithBaseServiceCalled)
452452
})
453+
454+
// Honors config.Schema: River lives in a named schema, worked through a
455+
// transaction with an empty search_path so tables resolve only via schema
456+
// qualification. Needs its own infrastructure, so it skips setup.
457+
t.Run("CustomSchema", func(t *testing.T) {
458+
t.Parallel()
459+
460+
var (
461+
dbPool = riversharedtest.DBPool(ctx, t)
462+
driver = riverpgxv5.New(dbPool)
463+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
464+
config = &river.Config{ID: "rivertest-worker", Schema: schema}
465+
)
466+
467+
tx, err := dbPool.Begin(ctx)
468+
require.NoError(t, err)
469+
t.Cleanup(func() { _ = tx.Rollback(ctx) })
470+
471+
worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error {
472+
require.Equal(t, rivertype.JobStateRunning, job.State)
473+
return nil
474+
})
475+
476+
tw := NewWorker(t, driver, config, worker)
477+
res, err := tw.Work(ctx, t, tx, testArgs{Value: "test"}, nil)
478+
require.NoError(t, err)
479+
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
480+
})
453481
}
454482

455483
func TestWorker_WorkJob(t *testing.T) {
@@ -556,4 +584,38 @@ func TestWorker_WorkJob(t *testing.T) {
556584
require.ErrorContains(t, err, "failed to update job to running state")
557585
require.Nil(t, res)
558586
})
587+
588+
// Honors config.Schema: River lives in a named schema, worked through a
589+
// transaction with an empty search_path so tables resolve only via schema
590+
// qualification. Needs its own infrastructure, so it skips setup.
591+
t.Run("CustomSchema", func(t *testing.T) {
592+
t.Parallel()
593+
594+
var (
595+
dbPool = riversharedtest.DBPool(ctx, t)
596+
driver = riverpgxv5.New(dbPool)
597+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
598+
config = &river.Config{ID: "rivertest-workjob", Schema: schema}
599+
)
600+
601+
tx, err := dbPool.Begin(ctx)
602+
require.NoError(t, err)
603+
t.Cleanup(func() { _ = tx.Rollback(ctx) })
604+
605+
client, err := river.NewClient(driver, config)
606+
require.NoError(t, err)
607+
608+
insertRes, err := client.InsertTx(ctx, tx, testArgs{Value: "test"}, nil)
609+
require.NoError(t, err)
610+
611+
worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error {
612+
require.Equal(t, rivertype.JobStateRunning, job.State)
613+
return nil
614+
})
615+
616+
tw := NewWorker(t, driver, config, worker)
617+
res, err := tw.WorkJob(ctx, t, tx, insertRes.Job)
618+
require.NoError(t, err)
619+
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
620+
})
559621
}

0 commit comments

Comments
 (0)