Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,512 changes: 993 additions & 519 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

1,920 changes: 352 additions & 1,568 deletions go/vt/proto/query/query_vtproto.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/loadshed/snake.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (s *Snake) hasCapacity() bool {
// is dropped by CoDel, or the context is cancelled. The returned SafeUnlock
// must be released via defer unlock.Release().
//
// An empty valveID is valid: such requests bypass the per-valve fairness
// layer but still pass through the CoDel gate. Callers should pass the valve
// ID through unconditionally rather than gating Acquire on a non-empty ID,
// which would silently exclude all unkeyed traffic from load shedding.
//
// The priority ordering convention is that lower-valued priorities indicate
// less important requests. Lower values are shed first.
func (s *Snake) Acquire(ctx context.Context, valveID string, priority float64) (*SafeUnlock, error) {
Expand Down
6 changes: 1 addition & 5 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,11 +818,7 @@ func (qre *QueryExecutor) getConn() (*connpool.PooledConn, func(), error) {

snake := qre.tsv.qe.snake
if snake != nil {
// An empty valve ID is valid: such requests bypass the per-valve
// fairness layer but still pass through the CoDel gate. Gating the
// acquire on a non-empty ID would silently exclude all unkeyed traffic
// from load shedding.
valveID := qre.options.GetUniqueId()
valveID := qre.options.GetLoadshedValveId()
// Translate the Vitess proto priority (0 = most important) into
// Snake's convention (higher priority shed last).
snakePriority := float64(sqlparser.MaxPriorityValue - qre.tsv.getPriorityFromOptions(qre.options))
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1716,9 +1716,9 @@ func TestGetConnWithSnake(t *testing.T) {

input := "select * from test_table limit 1"

// With UniqueId set, Snake gate admits the request
// With a valve ID set, Snake gate admits the request
qre := newTestQueryExecutor(ctx, tsv, input, 0)
qre.options = &querypb.ExecuteOptions{UniqueId: "test-request-123"}
qre.options = &querypb.ExecuteOptions{LoadshedValveId: "test-request-123"}
conn, release, err := qre.getConn()
require.NoError(t, err)
require.NotNil(t, conn)
Expand Down
6 changes: 1 addition & 5 deletions go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,7 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re
}

if tp.snake != nil {
// An empty valve ID is valid: such requests bypass the per-valve
// fairness layer but still pass through the CoDel gate. Gating the
// acquire on a non-empty ID would silently exclude all unkeyed
// traffic from load shedding.
valveID := options.GetUniqueId()
valveID := options.GetLoadshedValveId()
// Translate the Vitess proto priority (0 = most important) into
// Snake's convention (higher priority shed last).
protoPriority := priorityFromOptions(options, tp.env.Config().TxThrottlerDefaultPriority)
Expand Down
24 changes: 12 additions & 12 deletions go/vt/vttablet/tabletserver/tx_pool_snake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestTxPoolSnake_BeginAcquiresSlot(t *testing.T) {
defer closer()

ctx := context.Background()
opts := &querypb.ExecuteOptions{UniqueId: "req-1"}
opts := &querypb.ExecuteOptions{LoadshedValveId: "req-1"}

conn, _, _, err := txPool.Begin(ctx, opts, false, 0, nil)
require.NoError(t, err)
Expand All @@ -96,7 +96,7 @@ func TestTxPoolSnake_CommitReleasesSlot(t *testing.T) {
defer closer()

ctx := context.Background()
opts := &querypb.ExecuteOptions{UniqueId: "req-1"}
opts := &querypb.ExecuteOptions{LoadshedValveId: "req-1"}

// Fill the single slot.
conn, _, _, err := txPool.Begin(ctx, opts, false, 0, nil)
Expand All @@ -112,7 +112,7 @@ func TestTxPoolSnake_CommitReleasesSlot(t *testing.T) {
conn2.Release(tx.TxCommit)

// Now a new begin should succeed.
conn3, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{UniqueId: "req-2"}, false, 0, nil)
conn3, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{LoadshedValveId: "req-2"}, false, 0, nil)
require.NoError(t, err)
conn3.Unlock()
conn4, err := txPool.GetAndLock(conn3.ReservedID(), "")
Expand All @@ -126,7 +126,7 @@ func TestTxPoolSnake_RollbackReleasesSlot(t *testing.T) {
defer closer()

ctx := context.Background()
opts := &querypb.ExecuteOptions{UniqueId: "req-1"}
opts := &querypb.ExecuteOptions{LoadshedValveId: "req-1"}

conn, _, _, err := txPool.Begin(ctx, opts, false, 0, nil)
require.NoError(t, err)
Expand All @@ -138,7 +138,7 @@ func TestTxPoolSnake_RollbackReleasesSlot(t *testing.T) {
txPool.RollbackAndRelease(ctx, conn2)

// Slot is free — new begin should succeed.
conn3, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{UniqueId: "req-2"}, false, 0, nil)
conn3, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{LoadshedValveId: "req-2"}, false, 0, nil)
require.NoError(t, err)
conn3.Unlock()
conn4, err := txPool.GetAndLock(conn3.ReservedID(), "")
Expand All @@ -147,13 +147,13 @@ func TestTxPoolSnake_RollbackReleasesSlot(t *testing.T) {
conn4.Release(tx.TxCommit)
}

func TestTxPoolSnake_EmptyUniqueIdAcquiresSlot(t *testing.T) {
func TestTxPoolSnake_EmptyValveIDAcquiresSlot(t *testing.T) {
_, txPool, closer := setupWithSnake(t, 2)
defer closer()

ctx := context.Background()

// A request with no UniqueId still acquires a Snake slot — it enters the
// A request with no valve ID still acquires a Snake slot — it enters the
// CoDel queue directly, bypassing only the per-valve fairness layer.
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
Expand All @@ -171,7 +171,7 @@ func TestTxPoolSnake_ReservedConnSkipsSnake(t *testing.T) {
defer closer()

ctx := context.Background()
opts := &querypb.ExecuteOptions{UniqueId: "req-1"}
opts := &querypb.ExecuteOptions{LoadshedValveId: "req-1"}

// Fill the single slot.
conn, _, _, err := txPool.Begin(ctx, opts, false, 0, nil)
Expand All @@ -187,7 +187,7 @@ func TestTxPoolSnake_ReservedConnSkipsSnake(t *testing.T) {

// Begin with reservedID on the same conn: since the first tx committed and released,
// we need a new conn to test reservedID path. Create one via Begin first.
conn3, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{UniqueId: "req-2"}, false, 0, nil)
conn3, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{LoadshedValveId: "req-2"}, false, 0, nil)
require.NoError(t, err)
rid := conn3.ReservedID()
conn3.Unlock()
Expand All @@ -210,7 +210,7 @@ func TestTxPoolSnake_CapacityExhaustedShedsLoad(t *testing.T) {
ctx := context.Background()

// Fill the single slot.
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{UniqueId: "req-1"}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{LoadshedValveId: "req-1"}, false, 0, nil)
require.NoError(t, err)
conn.Unlock()

Expand All @@ -219,7 +219,7 @@ func TestTxPoolSnake_CapacityExhaustedShedsLoad(t *testing.T) {
shortCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel()

_, _, _, err = txPool.Begin(shortCtx, &querypb.ExecuteOptions{UniqueId: "req-2"}, false, 0, nil)
_, _, _, err = txPool.Begin(shortCtx, &querypb.ExecuteOptions{LoadshedValveId: "req-2"}, false, 0, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "dml load shed")

Expand All @@ -235,7 +235,7 @@ func TestTxPoolSnake_NilSnakePassesThrough(t *testing.T) {
defer closer()

ctx := context.Background()
opts := &querypb.ExecuteOptions{UniqueId: "req-1"}
opts := &querypb.ExecuteOptions{LoadshedValveId: "req-1"}

conn, _, _, err := txPool.Begin(ctx, opts, false, 0, nil)
require.NoError(t, err)
Expand Down
10 changes: 6 additions & 4 deletions proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,12 @@ message ExecuteOptions {
// where the goal is to warm the buffer pool, not to retrieve data.
bool no_result = 21;

// unique_id identifies the logical request or job that issued this query.
// Used by the Snake load shedder's self-contention valve to detect fan-out
// from a single caller (e.g., map_async issuing N queries on the same shard).
string unique_id = 22;
// loadshed_valve_id identifies the logical request or job that issued this
// query. Used by the Snake load shedder's self-contention valve to detect
// fan-out from a single caller (e.g., map_async issuing N queries on the
// same shard). When empty, the request bypasses the per-valve fairness layer
// but still passes through the CoDel gate.
string loadshed_valve_id = 22;
}

// Field describes a single column returned by a query
Expand Down
18 changes: 18 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 89 additions & 0 deletions web/vtadmin/src/proto/vtadmin.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading