Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ jobs:
name: lint
runs-on: ubuntu-latest
env:
GOLANGCI_LINT_VERSION: v2.0.0
GOLANGCI_LINT_VERSION: v2.1.6
permissions:
contents: read
# allow read access to pull request. Use with `only-new-issues` option.
Expand Down
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ linters:

# disabled because they're annoying/bad
- cyclop # screams into the void at "cyclomatic complexity"
- funcorder # very particular about where unexported functions can go, lots of churn.
- funlen # screams when functions are more than 60 lines long; what are we even doing here guys
- interfacebloat # we do in fact want >10 methods on the Adapter interface or wherever we see fit.
- gocognit # yells that "cognitive complexity" is too high; why
Expand Down
4 changes: 2 additions & 2 deletions riverdbtest/riverdbtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestTestSchema(t *testing.T) {

var schema string

t.Run("FirstCheckout", func(t *testing.T) {
t.Run("FirstCheckout", func(t *testing.T) { //nolint:paralleltest
schema = TestSchema(ctx, t, driver, &TestSchemaOpts{
Lines: []string{}, // non-nil empty indicates no migrations should be run
skipPackageNameCheck: true,
Expand All @@ -122,7 +122,7 @@ func TestTestSchema(t *testing.T) {

var schema string

t.Run("FirstCheckout", func(t *testing.T) {
t.Run("FirstCheckout", func(t *testing.T) { //nolint:paralleltest
schema = TestSchema(ctx, t, driver, &TestSchemaOpts{
LineTargetVersions: map[string]int{riverdriver.MigrationLineMain: 1},
skipPackageNameCheck: true,
Expand Down
164 changes: 138 additions & 26 deletions rivertest/rivertest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,21 +209,20 @@ func TestRequireInsertedTx(t *testing.T) {
t.Run("InsertOpts", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

// Verify custom insertion options.
insertRes, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
ScheduledAt: testTime,
Tags: []string{"tag1"},
})
require.NoError(t, err)
job := insertRes.Job

emptyOpts := func() *RequireInsertedOpts { return &RequireInsertedOpts{} }

insertJob := func(riverClient *river.Client[pgx.Tx], bundle *testBundle) *rivertype.JobRow {
insertRes, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
ScheduledAt: testTime,
Tags: []string{"tag1"},
})
require.NoError(t, err)
return insertRes.Job
}

sameOpts := func() *RequireInsertedOpts {
return &RequireInsertedOpts{
MaxAttempts: 78,
Expand All @@ -236,6 +235,12 @@ func TestRequireInsertedTx(t *testing.T) {
}

t.Run("MaxAttempts", func(t *testing.T) {
t.Parallel()

@brandur brandur May 25, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure you saw the data race failures, but I think we were cheating a bit on these subtests by sharing the same transaction between all these test cases. Use of a tx across goroutines is obviously not safe and that's what was failing these all so hard.

Here's a patch for a fix if you want to use it:

diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go
index a2182d6..aa19097 100644
--- a/rivertest/rivertest_test.go
+++ b/rivertest/rivertest_test.go
@@ -209,21 +209,20 @@ func TestRequireInsertedTx(t *testing.T) {
 	t.Run("InsertOpts", func(t *testing.T) {
 		t.Parallel()
 
-		riverClient, bundle := setup(t)
-
-		// Verify custom insertion options.
-		insertRes, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
-			MaxAttempts: 78,
-			Priority:    2,
-			Queue:       "another_queue",
-			ScheduledAt: testTime,
-			Tags:        []string{"tag1"},
-		})
-		require.NoError(t, err)
-		job := insertRes.Job
-
 		emptyOpts := func() *RequireInsertedOpts { return &RequireInsertedOpts{} }
 
+		insertJob := func(riverClient *river.Client[pgx.Tx], bundle *testBundle) *rivertype.JobRow {
+			insertRes, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
+				MaxAttempts: 78,
+				Priority:    2,
+				Queue:       "another_queue",
+				ScheduledAt: testTime,
+				Tags:        []string{"tag1"},
+			})
+			require.NoError(t, err)
+			return insertRes.Job
+		}
+
 		sameOpts := func() *RequireInsertedOpts {
 			return &RequireInsertedOpts{
 				MaxAttempts: 78,
@@ -238,6 +237,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("MaxAttempts", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			opts.MaxAttempts = 77
@@ -251,6 +254,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("Priority", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			opts.Priority = 3
@@ -264,6 +271,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("Queue", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			opts.Queue = "wrong_queue"
@@ -277,6 +288,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("ScheduledAt", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			opts.ScheduledAt = testTime.Add(3*time.Minute + 23*time.Second + 123*time.Microsecond)
@@ -290,6 +305,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("State", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			opts.State = rivertype.JobStateCancelled
@@ -303,6 +322,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("Tags", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			opts.Tags = []string{"tag2"}
@@ -316,6 +339,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("MultiplePropertiesSucceed", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.MaxAttempts = job.MaxAttempts
@@ -327,6 +354,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("MultiplePropertiesFails", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			opts.MaxAttempts = 77
@@ -341,6 +372,10 @@ func TestRequireInsertedTx(t *testing.T) {
 		t.Run("AllSameSucceeds", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts)
@@ -506,21 +541,20 @@ func TestRequireNotInsertedTx(t *testing.T) {
 	t.Run("InsertOpts", func(t *testing.T) {
 		t.Parallel()
 
-		riverClient, bundle := setup(t)
-
-		// Verify custom insertion options.
-		insertRes, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
-			MaxAttempts: 78,
-			Priority:    2,
-			Queue:       "another_queue",
-			ScheduledAt: testTime,
-			Tags:        []string{"tag1"},
-		})
-		require.NoError(t, err)
-		job := insertRes.Job
-
 		emptyOpts := func() *RequireInsertedOpts { return &RequireInsertedOpts{} }
 
+		insertJob := func(riverClient *river.Client[pgx.Tx], bundle *testBundle) *rivertype.JobRow {
+			insertRes, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
+				MaxAttempts: 78,
+				Priority:    2,
+				Queue:       "another_queue",
+				ScheduledAt: testTime,
+				Tags:        []string{"tag1"},
+			})
+			require.NoError(t, err)
+			return insertRes.Job
+		}
+
 		sameOpts := func() *RequireInsertedOpts {
 			return &RequireInsertedOpts{
 				MaxAttempts: 78,
@@ -535,6 +569,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("MaxAttempts", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.MaxAttempts = job.MaxAttempts
@@ -548,6 +586,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("Priority", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.Priority = job.Priority
@@ -561,6 +603,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("Queue", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.Queue = job.Queue
@@ -574,6 +620,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("ScheduledAt", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.ScheduledAt = job.ScheduledAt
@@ -587,6 +637,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("State", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.State = job.State
@@ -600,6 +654,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("Tags", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.Tags = job.Tags
@@ -613,6 +671,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("MultiplePropertiesSucceed", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.MaxAttempts = job.MaxAttempts // one property matches job, but the other does not
@@ -624,6 +686,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("MultiplePropertiesFail", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := emptyOpts()
 			opts.MaxAttempts = job.MaxAttempts
@@ -638,6 +704,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("AllSameFails", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			job := insertJob(riverClient, bundle)
+
 			mockT := testutil.NewMockT(t)
 			opts := sameOpts()
 			requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts)
@@ -650,6 +720,10 @@ func TestRequireNotInsertedTx(t *testing.T) {
 		t.Run("FailsWithTooManyInserts", func(t *testing.T) {
 			t.Parallel()
 
+			riverClient, bundle := setup(t)
+
+			_ = insertJob(riverClient, bundle)
+
 			_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
 				Priority: 3,
 			})

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, didn't even run the tests after making the changes or this would have been obvious 😆

Had to lint disable a couple cases which were intentionally reusing a schema variable between parent and subtest as part of what was actually being tested.


riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
opts.MaxAttempts = 77
Expand All @@ -247,6 +252,12 @@ func TestRequireInsertedTx(t *testing.T) {
})

t.Run("Priority", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
opts.Priority = 3
Expand All @@ -258,6 +269,12 @@ func TestRequireInsertedTx(t *testing.T) {
})

t.Run("Queue", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
opts.Queue = "wrong_queue"
Expand All @@ -269,6 +286,12 @@ func TestRequireInsertedTx(t *testing.T) {
})

t.Run("ScheduledAt", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
opts.ScheduledAt = testTime.Add(3*time.Minute + 23*time.Second + 123*time.Microsecond)
Expand All @@ -280,6 +303,12 @@ func TestRequireInsertedTx(t *testing.T) {
})

t.Run("State", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
opts.State = rivertype.JobStateCancelled
Expand All @@ -291,6 +320,12 @@ func TestRequireInsertedTx(t *testing.T) {
})

t.Run("Tags", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
opts.Tags = []string{"tag2"}
Expand All @@ -302,6 +337,12 @@ func TestRequireInsertedTx(t *testing.T) {
})

t.Run("MultiplePropertiesSucceed", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.MaxAttempts = job.MaxAttempts
Expand All @@ -311,6 +352,12 @@ func TestRequireInsertedTx(t *testing.T) {
})

t.Run("MultiplePropertiesFails", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
opts.MaxAttempts = 77
Expand All @@ -323,6 +370,12 @@ func TestRequireInsertedTx(t *testing.T) {
})

t.Run("AllSameSucceeds", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts)
Expand Down Expand Up @@ -488,21 +541,20 @@ func TestRequireNotInsertedTx(t *testing.T) {
t.Run("InsertOpts", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

// Verify custom insertion options.
insertRes, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
ScheduledAt: testTime,
Tags: []string{"tag1"},
})
require.NoError(t, err)
job := insertRes.Job

emptyOpts := func() *RequireInsertedOpts { return &RequireInsertedOpts{} }

insertJob := func(riverClient *river.Client[pgx.Tx], bundle *testBundle) *rivertype.JobRow {
insertRes, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
ScheduledAt: testTime,
Tags: []string{"tag1"},
})
require.NoError(t, err)
return insertRes.Job
}

sameOpts := func() *RequireInsertedOpts {
return &RequireInsertedOpts{
MaxAttempts: 78,
Expand All @@ -515,6 +567,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
}

t.Run("MaxAttempts", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.MaxAttempts = job.MaxAttempts
Expand All @@ -526,6 +584,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("Priority", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.Priority = job.Priority
Expand All @@ -537,6 +601,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("Queue", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.Queue = job.Queue
Expand All @@ -548,6 +618,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("ScheduledAt", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.ScheduledAt = job.ScheduledAt
Expand All @@ -559,6 +635,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("State", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.State = job.State
Expand All @@ -570,6 +652,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("Tags", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.Tags = job.Tags
Expand All @@ -581,6 +669,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("MultiplePropertiesSucceed", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.MaxAttempts = job.MaxAttempts // one property matches job, but the other does not
Expand All @@ -590,6 +684,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("MultiplePropertiesFail", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.MaxAttempts = job.MaxAttempts
Expand All @@ -602,6 +702,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("AllSameFails", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

job := insertJob(riverClient, bundle)

mockT := testutil.NewMockT(t)
opts := sameOpts()
requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts)
Expand All @@ -612,6 +718,12 @@ func TestRequireNotInsertedTx(t *testing.T) {
})

t.Run("FailsWithTooManyInserts", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_ = insertJob(riverClient, bundle)

_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
Priority: 3,
})
Expand Down
Loading