Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 32 additions & 31 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/testfactory"
"github.com/riverqueue/river/rivershared/util/hashutil"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
Expand Down Expand Up @@ -2703,43 +2704,43 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

exec, _ := setup(ctx, t)

// Acquire the advisory lock.
_, err := exec.PGAdvisoryXactLock(ctx, 123456)
require.NoError(t, err)

// Open a new transaction and try to acquire the same lock, which should
// block because the lock can't be acquired. Verify some amount of wait,
// cancel the lock acquisition attempt, then verify return.
{
otherExec := executorWithTx(ctx, t)

goroutineDone := make(chan struct{})
// It's possible for multiple versions of this test to be running at the
// same time (from different drivers), so make sure the lock we're
// acquiring per test is unique by using the complete test name.
lockHash := hashutil.NewAdvisoryLockHash(0)
lockHash.Write([]byte(t.Name()))
lockHash.Write([]byte("123456"))
key := lockHash.Key()
Comment on lines +2707 to +2713

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good fix!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx thx.


// Tries to acquire the given lock from another test transaction and
// returns true if the lock was acquired.
tryAcquireLock := func(exec riverdriver.Executor) bool {
var lockAcquired bool
require.NoError(t, exec.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", key).Scan(&lockAcquired))
return lockAcquired
}

ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
// Start a transaction to acquire the lock so we can later release the
// lock by rolling back.
execTx, err := exec.Begin(ctx)
require.NoError(t, err)

go func() {
defer close(goroutineDone)
// Acquire the advisory lock on the main test transaction.
_, err = execTx.PGAdvisoryXactLock(ctx, key)
require.NoError(t, err)

_, err := otherExec.PGAdvisoryXactLock(ctx, 123456)
// pgx will produce a context.Canceled error, but pg swallows it to emit its own
require.Regexp(t, "(context canceled|pq: canceling statement due to user request)", err.Error())
}()
// Start another test transaction unrelated to the first.
otherExec := executorWithTx(ctx, t)

select {
case <-goroutineDone:
require.FailNow(t, "Unexpectedly acquired lock that should've held by other transaction")
case <-time.After(50 * time.Millisecond):
}
// The other test transaction is unable to acquire the lock because the
// first test transaction holds it.
require.False(t, tryAcquireLock(otherExec))

cancel()
// Roll back the first test transaction to release the lock.
require.NoError(t, execTx.Rollback(ctx))

select {
case <-goroutineDone:
case <-time.After(50 * time.Millisecond):
require.FailNow(t, "Goroutine didn't finish in a timely manner")
}
}
// The other test transaction can now acquire the lock.
require.True(t, tryAcquireLock(otherExec))
})

t.Run("QueueCreateOrSetUpdatedAt", func(t *testing.T) {
Expand Down