From 85329af0dbb991e9ff40589b9ec5fd71f3ef766b Mon Sep 17 00:00:00 2001 From: Muneeb Ahmad Date: Thu, 25 Jun 2026 12:58:07 -0700 Subject: [PATCH 1/4] Handle task queue dispatch rate limiting in WCI autoscaling --- wci/client/hook.go | 38 ++- wci/client/hook_test.go | 270 ++++++++++++++++++ wci/metrics/metric_defs.go | 3 + wci/workflow/activities.go | 3 + wci/workflow/iface/workflow.go | 14 +- .../scaling_algorithm/no_sync_match.go | 50 +++- .../scaling_algorithm/no_sync_match_test.go | 137 ++++++++- .../scaling_algorithm/rate_based_test.go | 11 +- wci/workflow/scaling_algorithm/registry.go | 2 + 9 files changed, 502 insertions(+), 26 deletions(-) create mode 100644 wci/client/hook_test.go diff --git a/wci/client/hook.go b/wci/client/hook.go index 28cb94c..08b8bcf 100644 --- a/wci/client/hook.go +++ b/wci/client/hook.go @@ -27,6 +27,7 @@ type ( timestamp time.Time syncMatchCount int noSyncMatchCount int + rateLimitedCount int } taskHookImpl struct { @@ -88,7 +89,7 @@ func (th *taskHookImpl) ProcessTaskAdd(ctx context.Context, event *hooks.TaskAdd workflowID := GenerateWorkerControllerInstanceWorkflowID(event.DeploymentVersion) // batch signals per WCI in minSignalInterval* time buckets - syncMatchBatchCount, noSyncMatchBatchCount, skip := th.batchMatchSignals(ctx, workflowID, event.IsSyncMatch) + syncMatchBatchCount, noSyncMatchBatchCount, rateLimitedBatchCount, skip := th.batchMatchSignals(ctx, workflowID, event.SyncMatchOutcome, event.IsSyncMatch) if skip { return } @@ -106,9 +107,10 @@ func (th *taskHookImpl) ProcessTaskAdd(ctx context.Context, event *hooks.TaskAdd request := &iface.SignalTaskAddRequest{ TaskQueueName: th.taskQueueName, TaskQueueType: th.taskQueueType, - IsSyncMatch: event.IsSyncMatch, + IsSyncMatch: noSyncMatchBatchCount == 0, // backward compatibility: true when no genuine not-matched events so old WCI workers don't scale up NoSyncMatchSignalsSinceLast: noSyncMatchBatchCount, SyncMatchSignalsSinceLast: syncMatchBatchCount, + 
RateLimitedSignalsSinceLast: rateLimitedBatchCount, } if err := th.client.SignalTaskAddEvent(ctx, th.namespace, event.DeploymentVersion, request); err != nil { @@ -117,7 +119,12 @@ func (th *taskHookImpl) ProcessTaskAdd(ctx context.Context, event *hooks.TaskAdd } } -func (th *taskHookImpl) batchMatchSignals(_ context.Context, workflowID string, isSyncMatch bool) (int, int, bool) { +func (th *taskHookImpl) batchMatchSignals( + _ context.Context, + workflowID string, + outcome hooks.SyncMatchOutcome, + isSyncMatchFallback bool, +) (syncCount int, noSyncCount int, rateLimitedCount int, skip bool) { now := time.Now() th.lastSignalMu.Lock() @@ -129,13 +136,27 @@ func (th *taskHookImpl) batchMatchSignals(_ context.Context, workflowID string, timestamp: time.Unix(0, 0), syncMatchCount: 0, noSyncMatchCount: 0, + rateLimitedCount: 0, } } - if isSyncMatch { + // In older or existing workflows, handle scenario when SyncMatchOutcome + // is not available in the hooks API + if outcome == hooks.SyncMatchOutcomeUnspecified { + if isSyncMatchFallback { + outcome = hooks.SyncMatchOutcomeSuccess + } else { + outcome = hooks.SyncMatchOutcomeNotMatched + } + } + + switch outcome { + case hooks.SyncMatchOutcomeSuccess: last.syncMatchCount++ - } else { + case hooks.SyncMatchOutcomeNotMatched: last.noSyncMatchCount++ + case hooks.SyncMatchOutcomeRateLimited: + last.rateLimitedCount++ // does NOT increment noSyncMatchCount } minSignalIntervalSyncMatch := WorkerControllerMinSignalIntervalSyncMatchMilliseconds.Get(th.dc)(th.namespace.Name().String()) @@ -144,6 +165,8 @@ func (th *taskHookImpl) batchMatchSignals(_ context.Context, workflowID string, } sendBy := last.timestamp.Add(time.Duration(minSignalIntervalSyncMatch) * time.Millisecond) if last.noSyncMatchCount > 0 { + // Only genuine not-matched events pull the send-by forward to the fast interval. + // Rate-limited events only increment rateLimitedCount and never accelerate the batch. 
minSignalIntervalNoSyncMatch := WorkerControllerMinSignalIntervalNoSyncMatchMilliseconds.Get(th.dc)(th.namespace.Name().String()) if minSignalIntervalNoSyncMatch <= 0 { minSignalIntervalNoSyncMatch = 500 @@ -156,9 +179,10 @@ func (th *taskHookImpl) batchMatchSignals(_ context.Context, workflowID string, timestamp: now, syncMatchCount: 0, noSyncMatchCount: 0, + rateLimitedCount: 0, } - return last.syncMatchCount, last.noSyncMatchCount, false + return last.syncMatchCount, last.noSyncMatchCount, last.rateLimitedCount, false } th.lastSignalDetails[workflowID] = last - return last.syncMatchCount, last.noSyncMatchCount, true + return last.syncMatchCount, last.noSyncMatchCount, last.rateLimitedCount, true } diff --git a/wci/client/hook_test.go b/wci/client/hook_test.go new file mode 100644 index 0000000..b63e6be --- /dev/null +++ b/wci/client/hook_test.go @@ -0,0 +1,270 @@ +package client + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/service/matching/hooks" +) + +func newHookForTest() *taskHookImpl { + ns := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-ns"}, + nil, + "", + ) + return &taskHookImpl{ + logger: log.NewNoopLogger(), + dc: dynamicconfig.NewNoopCollection(), + namespace: ns, + lastSignalDetails: map[string]*signalBatchDetails{}, + } +} + +// setLastSent pre-populates the batch window for workflowID as if a signal was +// sent at the given time. This forces the next call to batchMatchSignals to +// treat events as still-accumulating (if sentAt is recent) or immediately +// eligible to send (if sentAt is in the past beyond the interval). 
+func setLastSent(th *taskHookImpl, workflowID string, sentAt time.Time) { + th.lastSignalDetails[workflowID] = &signalBatchDetails{ + timestamp: sentAt, + syncMatchCount: 0, + noSyncMatchCount: 0, + rateLimitedCount: 0, + } +} + +const testWorkflowID = "test-wf-id" + +// TestBatchMatchSignals_Bucketing verifies that each SyncMatchOutcome increments +// the correct internal counter and is reflected in the returned counts. +func TestBatchMatchSignals_Bucketing(t *testing.T) { + ctx := context.Background() + + t.Run("success increments sync count", func(t *testing.T) { + th := newHookForTest() + syncCount, noSyncCount, rateLimitedCount, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeSuccess, true) + require.False(t, skip) + assert.Equal(t, 1, syncCount) + assert.Equal(t, 0, noSyncCount) + assert.Equal(t, 0, rateLimitedCount) + }) + + t.Run("not_matched increments no-sync count", func(t *testing.T) { + th := newHookForTest() + syncCount, noSyncCount, rateLimitedCount, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeNotMatched, false) + require.False(t, skip) + assert.Equal(t, 0, syncCount) + assert.Equal(t, 1, noSyncCount) + assert.Equal(t, 0, rateLimitedCount) + }) + + t.Run("rate_limited increments rate-limited count not no-sync count", func(t *testing.T) { + th := newHookForTest() + syncCount, noSyncCount, rateLimitedCount, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeRateLimited, false) + require.False(t, skip) + assert.Equal(t, 0, syncCount) + assert.Equal(t, 0, noSyncCount) + assert.Equal(t, 1, rateLimitedCount) + }) + + t.Run("accumulated counts from all three buckets are returned on fire", func(t *testing.T) { + th := newHookForTest() + // Pre-populate with counts from prior events in the current batch window, + // using epoch timestamp so the next call fires immediately. 
+ th.lastSignalDetails[testWorkflowID] = &signalBatchDetails{ + timestamp: time.Unix(0, 0), + syncMatchCount: 2, + noSyncMatchCount: 3, + rateLimitedCount: 1, + } + // The next call increments syncMatchCount (2 → 3) and then fires because sendBy is in the past. + // Returned counts include the current event. + syncCount, noSyncCount, rateLimitedCount, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeSuccess, true) + require.False(t, skip) + assert.Equal(t, 3, syncCount) + assert.Equal(t, 3, noSyncCount) + assert.Equal(t, 1, rateLimitedCount) + }) +} + +// TestBatchMatchSignals_Unspecified verifies the backward-compatibility fallback +// for matching service nodes that predate PR #10045. +func TestBatchMatchSignals_Unspecified(t *testing.T) { + ctx := context.Background() + + t.Run("unspecified with isSyncMatch=true routes as success", func(t *testing.T) { + th := newHookForTest() + syncCount, noSyncCount, rateLimitedCount, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeUnspecified, true) + require.False(t, skip) + assert.Equal(t, 1, syncCount) + assert.Equal(t, 0, noSyncCount) + assert.Equal(t, 0, rateLimitedCount) + }) + + t.Run("unspecified with isSyncMatch=false routes as not_matched", func(t *testing.T) { + th := newHookForTest() + syncCount, noSyncCount, rateLimitedCount, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeUnspecified, false) + require.False(t, skip) + assert.Equal(t, 0, syncCount) + assert.Equal(t, 1, noSyncCount) + assert.Equal(t, 0, rateLimitedCount) + }) +} + +// TestBatchMatchSignals_Interval verifies that rate-limited events use the slow +// (sync-match) interval and never accelerate the batch to the fast interval. 
+func TestBatchMatchSignals_Interval(t *testing.T) { + ctx := context.Background() + + // The noopCollection returns the configured defaults: + // sync interval = 60 000 ms + // no-sync interval = 500 ms + // + // We test interval selection by setting the last-sent timestamp to + // "now minus 600ms" — past the 500ms no-sync interval but well within + // the 60s sync interval. A not-matched event must fire; a rate-limited + // event must still accumulate (skip=true). + + recentEnoughForSyncOnly := time.Now().Add(-600 * time.Millisecond) + + t.Run("rate_limited alone uses slow interval accumulates within 60s window", func(t *testing.T) { + th := newHookForTest() + setLastSent(th, testWorkflowID, recentEnoughForSyncOnly) + + _, _, _, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeRateLimited, false) + assert.True(t, skip, "rate-limited event within 60s window should accumulate, not send") + }) + + t.Run("not_matched alone uses fast interval fires after 500ms", func(t *testing.T) { + th := newHookForTest() + setLastSent(th, testWorkflowID, recentEnoughForSyncOnly) + + _, _, _, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeNotMatched, false) + assert.False(t, skip, "not-matched event past 500ms window should send immediately") + }) + + t.Run("mixed not_matched and rate_limited uses fast interval", func(t *testing.T) { + th := newHookForTest() + // Accumulate a not-matched event first (within the 60s window so it accumulates). + setLastSent(th, testWorkflowID, time.Now()) + th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeNotMatched, false) + + // Now set the last-sent to 600ms ago: noSyncMatchCount > 0 means fast interval applies. + setLastSent(th, testWorkflowID, recentEnoughForSyncOnly) + // Re-add the not-matched count to the fresh batch. 
+ th.lastSignalDetails[testWorkflowID].noSyncMatchCount = 1 + + _, _, _, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeRateLimited, false) + assert.False(t, skip, "batch with not-matched events uses fast interval, must fire") + }) + + t.Run("unspecified with isSyncMatch=true does not accelerate batch", func(t *testing.T) { + th := newHookForTest() + setLastSent(th, testWorkflowID, recentEnoughForSyncOnly) + + _, _, _, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeUnspecified, true) + assert.True(t, skip, "unspecified/sync-match routes as success and must not accelerate the batch") + }) +} + +// TestBatchMatchSignals_LegacyIsSyncMatchCompat verifies that the noSyncCount returned by +// batchMatchSignals correctly drives the legacy IsSyncMatch field on SignalTaskAddRequest +// (set as noSyncMatchBatchCount == 0). Old WCI workers still running during a rolling deploy +// of the matching service use !IsSyncMatch as their scale-up trigger. Sending IsSyncMatch=true +// for rate-limited-only batches prevents those old workers from incorrectly scaling up. 
+func TestBatchMatchSignals_LegacyIsSyncMatchCompat(t *testing.T) { + ctx := context.Background() + + cases := []struct { + name string + outcome hooks.SyncMatchOutcome + wantNoSyncCount int + wantIsSyncMatch bool // derived as noSyncCount == 0 + }{ + { + name: "rate_limited_only sets IsSyncMatch=true (old workers must not scale up)", + outcome: hooks.SyncMatchOutcomeRateLimited, + wantNoSyncCount: 0, + wantIsSyncMatch: true, + }, + { + name: "not_matched sets IsSyncMatch=false (old workers must scale up)", + outcome: hooks.SyncMatchOutcomeNotMatched, + wantNoSyncCount: 1, + wantIsSyncMatch: false, + }, + { + name: "success sets IsSyncMatch=true (old workers must not scale up)", + outcome: hooks.SyncMatchOutcomeSuccess, + wantNoSyncCount: 0, + wantIsSyncMatch: true, + }, + { + name: "unspecified with isSyncMatch=false sets IsSyncMatch=false", + outcome: hooks.SyncMatchOutcomeUnspecified, + wantNoSyncCount: 1, + wantIsSyncMatch: false, + }, + { + name: "unspecified with isSyncMatch=true sets IsSyncMatch=true", + outcome: hooks.SyncMatchOutcomeUnspecified, + wantNoSyncCount: 0, + wantIsSyncMatch: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + th := newHookForTest() + isSyncMatchFallback := tc.wantIsSyncMatch // drives Unspecified fallback for the last two cases + _, noSyncCount, _, skip := th.batchMatchSignals(ctx, testWorkflowID, tc.outcome, isSyncMatchFallback) + require.False(t, skip) + assert.Equal(t, tc.wantNoSyncCount, noSyncCount) + // Verify the IsSyncMatch derivation used in ProcessTaskAdd matches expectation. + assert.Equal(t, tc.wantIsSyncMatch, noSyncCount == 0, + "IsSyncMatch = (noSyncMatchBatchCount == 0): old WCI workers use !IsSyncMatch as scale-up trigger") + }) + } +} + +// TestBatchMatchSignals_Skip verifies the skip=true / skip=false boundary. 
+func TestBatchMatchSignals_Skip(t *testing.T) { + ctx := context.Background() + + t.Run("first call always fires epoch timestamp", func(t *testing.T) { + th := newHookForTest() + _, _, _, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeSuccess, true) + assert.False(t, skip) + }) + + t.Run("second call with recent timestamp skips", func(t *testing.T) { + th := newHookForTest() + th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeSuccess, true) + // Timestamp is now set to time.Now() — well within any interval. + _, _, _, skip := th.batchMatchSignals(ctx, testWorkflowID, hooks.SyncMatchOutcomeSuccess, true) + assert.True(t, skip) + }) + + t.Run("different workflow IDs are batched independently", func(t *testing.T) { + th := newHookForTest() + // Batch for wf-a fires. + _, _, _, skipA := th.batchMatchSignals(ctx, "wf-a", hooks.SyncMatchOutcomeSuccess, true) + assert.False(t, skipA) + + // wf-b has no history — also fires. + _, _, _, skipB := th.batchMatchSignals(ctx, "wf-b", hooks.SyncMatchOutcomeSuccess, true) + assert.False(t, skipB) + + // Second call for wf-a now skips (timestamp was set). 
+ _, _, _, skipA2 := th.batchMatchSignals(ctx, "wf-a", hooks.SyncMatchOutcomeSuccess, true) + assert.True(t, skipA2) + }) +} diff --git a/wci/metrics/metric_defs.go b/wci/metrics/metric_defs.go index 74f617e..4b4ece8 100644 --- a/wci/metrics/metric_defs.go +++ b/wci/metrics/metric_defs.go @@ -13,6 +13,9 @@ var ( ScaleUpThrottledCount = metrics.NewCounterDef( "worker_controller_instance_scale_up_throttled_count", metrics.WithDescription("The number of times a scale up was throttled for a worker controller instance.")) + RateLimitedTaskCount = metrics.NewCounterDef( + "worker_controller_instance_rate_limited_task_count", + metrics.WithDescription("The number of rate-limited task-add events observed by a worker controller instance where scale-up was suppressed.")) DeferredScalingDecisionCount = metrics.NewCounterDef( "worker_controller_instance_deferred_scaling_decision_count", metrics.WithDescription("The number of times a deferred scaling decision was dispatched for a worker controller instance.")) diff --git a/wci/workflow/activities.go b/wci/workflow/activities.go index da79a54..e6f4845 100644 --- a/wci/workflow/activities.go +++ b/wci/workflow/activities.go @@ -444,6 +444,9 @@ func (a *Activities) HandleTaskAddSignal(ctx context.Context, req HandleTaskAddS if response.ThrottledCount > 0 { metricsHandler.Counter(wcimetrics.ScaleUpThrottledCount.Name()).Inc(int64(response.ThrottledCount)) } + if response.RateLimitedCount > 0 { + metricsHandler.Counter(wcimetrics.RateLimitedTaskCount.Name()).Inc(int64(response.RateLimitedCount)) + } recordSuccess() diff --git a/wci/workflow/iface/workflow.go b/wci/workflow/iface/workflow.go index 10b678f..f8b5437 100644 --- a/wci/workflow/iface/workflow.go +++ b/wci/workflow/iface/workflow.go @@ -147,9 +147,17 @@ type ( TaskQueueName string `json:"task_queue_name"` TaskQueueType enumspb.TaskQueueType `json:"task_queue_type"` - IsSyncMatch bool `json:"is_sync_match"` - SyncMatchSignalsSinceLast int 
`json:"sync_match_signals_batched,omitempty"` - NoSyncMatchSignalsSinceLast int `json:"no_sync_match_signals_batched,omitempty"` + // Deprecated: use NoSyncMatchSignalsSinceLast / RateLimitedSignalsSinceLast instead. + // Kept for backward compat with older WCI workflow instances still running during + // rolling deploys. Set to true only when noSyncMatchBatchCount == 0. + // Neither scaling algorithm reads this field for decisions after this change. + IsSyncMatch bool `json:"is_sync_match"` + + // SyncMatchSignalsSinceLast is the count of SyncMatchOutcomeSuccess events in this batch. + // Does NOT include rate-limited events — those are counted in RateLimitedSignalsSinceLast. + SyncMatchSignalsSinceLast int `json:"sync_match_signals_batched,omitempty"` + NoSyncMatchSignalsSinceLast int `json:"no_sync_match_signals_batched,omitempty"` + RateLimitedSignalsSinceLast int `json:"rate_limited_signals_batched,omitempty"` } WorkerControllerInstanceMemo struct { diff --git a/wci/workflow/scaling_algorithm/no_sync_match.go b/wci/workflow/scaling_algorithm/no_sync_match.go index c8503ed..901124f 100644 --- a/wci/workflow/scaling_algorithm/no_sync_match.go +++ b/wci/workflow/scaling_algorithm/no_sync_match.go @@ -45,6 +45,14 @@ const ( configNoSyncMetricsPollIntervalMsKey = "metrics_poll_interval_ms" configNoSyncMetricsPollIntervalMsDefault = int64(60_000) // 60s + // configNoSyncRateLimitedSuppressQuietMsKey: in ProcessMetricsPoll, suppress scale-up for this many ms + // after the last observed rate-limited signal. 0 = disabled. + // Default is 2× the default poll interval (120s); it does not follow a custom metrics_poll_interval_ms. + configNoSyncRateLimitedSuppressQuietMsKey = "rate_limited_suppress_quiet_ms" + configNoSyncRateLimitedSuppressQuietMsDefault = 2 * configNoSyncMetricsPollIntervalMsDefault + + stateLastRateLimitedTimestampKey = "last_rate_limited_time_ms" + stateLastScaleUpTimestampKey = "last_scale_up_time_ms" // stateLastDispatchRateKeyFmt is a format string for per-queue dispatch rate state keys. 
// The %s placeholder is replaced by the queue type name ("workflow", "activity", "nexus"), @@ -60,10 +68,12 @@ var noSyncValidConfigKeys = map[string]struct{}{ configNoSyncMaxWorkerLifetimeMsKey: {}, configNoSyncScaleUpDispatchRateEpsilonKey: {}, configNoSyncMetricsPollIntervalMsKey: {}, + configNoSyncRateLimitedSuppressQuietMsKey: {}, } var noSyncValidStateKeys = map[string]struct{}{ stateLastScaleUpTimestampKey: {}, + stateLastRateLimitedTimestampKey: {}, fmt.Sprintf(stateLastDispatchRateKeyFmt, "workflow"): {}, fmt.Sprintf(stateLastDispatchRateKeyFmt, "activity"): {}, fmt.Sprintf(stateLastDispatchRateKeyFmt, "nexus"): {}, @@ -111,6 +121,9 @@ func (a *scalingAlgorithmNoSync) ValidateConfig(ctx context.Context, config ifac if err := config.ValidateInt64Field(configNoSyncMetricsPollIntervalMsKey, 10000); err != nil { return err } + if err := config.ValidateInt64Field(configNoSyncRateLimitedSuppressQuietMsKey, 0); err != nil { + return err + } // Cross-field: if poll interval < cooloff, metric-driven scale-ups can never fire. 
// The guard `cooloff > 0` reflects the "0 means disabled" semantics: when cooloff is @@ -146,11 +159,12 @@ func (a *scalingAlgorithmNoSync) ProcessTaskAdd(ctx context.Context, config ifac } } + nowMs := time.Now().UnixMilli() // safe: called from activity context, not workflow + throttledCount := 0 - if !event.IsSyncMatch || event.NoSyncMatchSignalsSinceLast > 0 { + if event.NoSyncMatchSignalsSinceLast > 0 { cooloffMs := config.GetInt64Field(configNoSyncScaleUpCooloffMsKey, configNoSyncScaleUpCooloffMsDefault) lastScaleUpMs := priorState.GetInt64Field(stateLastScaleUpTimestampKey, 0) - nowMs := time.Now().UnixMilli() // safe: called from activity context, not workflow elapsedMs := nowMs - lastScaleUpMs if elapsedMs >= cooloffMs { @@ -162,7 +176,15 @@ func (a *scalingAlgorithmNoSync) ProcessTaskAdd(ctx context.Context, config ifac } } - return &TaskAddResponse{Actions: actions, Status: updatedState, ThrottledCount: throttledCount}, nil + rateLimitedCount := 0 + if event.RateLimitedSignalsSinceLast > 0 { + logger.Info("Task queue dispatch rate limiting observed, suppressing scale-up", + "rate_limited_count", event.RateLimitedSignalsSinceLast) + rateLimitedCount = event.RateLimitedSignalsSinceLast + updatedState[stateLastRateLimitedTimestampKey] = nowMs + } + + return &TaskAddResponse{Actions: actions, Status: updatedState, ThrottledCount: throttledCount, RateLimitedCount: rateLimitedCount}, nil } func (a *scalingAlgorithmNoSync) ProcessDeferredScalingDecision(_ context.Context, _ iface.ScalingAlgorithmConfig, priorState iface.ScalingAlgorithmStatus, _ iface.SignalTaskAddRequest, _ ScalingMetricsSnapshotGetter) (*TaskAddResponse, error) { @@ -199,6 +221,28 @@ func (a *scalingAlgorithmNoSync) ProcessMetricsPoll(ctx context.Context, config nowMs := time.Now().UnixMilli() // safe: called from activity context, not workflow elapsedSinceScaleUp := nowMs - lastScaleUpMs + suppressQuietMs := config.GetInt64Field(configNoSyncRateLimitedSuppressQuietMsKey, 
configNoSyncRateLimitedSuppressQuietMsDefault) + lastRateLimitedMs := priorState.GetInt64Field(stateLastRateLimitedTimestampKey, 0) + if suppressQuietMs > 0 && nowMs-lastRateLimitedMs < suppressQuietMs { + // Rate limiting recently observed. Suppress backlog-driven scale-up — adding workers + // that hit the rate cap does not help and repeats the cycle. + // Still update per-queue dispatch-rate state so the epsilon de-bounce baseline + // stays current and the first post-suppression poll compares against a fresh reference. + for _, q := range []struct { + qName string + metrics *iface.QueueTypeScalingMetrics + }{ + {"workflow", metricsSnapshot.Workflow}, + {"activity", metricsSnapshot.Activity}, + {"nexus", metricsSnapshot.Nexus}, + } { + if q.metrics != nil { + updatedState[fmt.Sprintf(stateLastDispatchRateKeyFmt, q.qName)] = float64(q.metrics.LastProcessingRate) + } + } + return &MetricsPollResponse{Actions: actions, Status: updatedState, NextPoll: &nextPoll}, nil + } + scaleUp := false for _, q := range []struct { qName string diff --git a/wci/workflow/scaling_algorithm/no_sync_match_test.go b/wci/workflow/scaling_algorithm/no_sync_match_test.go index 1ff6481..e79540b 100644 --- a/wci/workflow/scaling_algorithm/no_sync_match_test.go +++ b/wci/workflow/scaling_algorithm/no_sync_match_test.go @@ -130,7 +130,7 @@ func TestNoSyncProcessTaskAdd(t *testing.T) { }) t.Run("no-sync match nil state first call", func(t *testing.T) { - event := iface.SignalTaskAddRequest{IsSyncMatch: false, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, nil, event) require.NoError(t, err) assert.Len(t, resp.Actions, 1) @@ -142,7 +142,7 @@ func TestNoSyncProcessTaskAdd(t *testing.T) { nowMs := time.Now().UnixMilli() state := iface.ScalingAlgorithmStatus{stateLastScaleUpTimestampKey: nowMs} cfg := 
iface.ScalingAlgorithmConfig{configNoSyncScaleUpCooloffMsKey: int64(30000)} - event := iface.SignalTaskAddRequest{IsSyncMatch: false, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} resp, err := a.ProcessTaskAdd(ctx, cfg, state, event) require.NoError(t, err) assert.Len(t, resp.Actions, 0) @@ -150,7 +150,7 @@ func TestNoSyncProcessTaskAdd(t *testing.T) { t.Run("no-sync match outside cooloff", func(t *testing.T) { state := iface.ScalingAlgorithmStatus{stateLastScaleUpTimestampKey: int64(0)} - event := iface.SignalTaskAddRequest{IsSyncMatch: false, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, state, event) require.NoError(t, err) assert.Len(t, resp.Actions, 1) @@ -166,7 +166,7 @@ func TestNoSyncProcessTaskAdd(t *testing.T) { }) t.Run("activity queue type writes shared state key", func(t *testing.T) { - event := iface.SignalTaskAddRequest{IsSyncMatch: false, TaskQueueType: enumspb.TASK_QUEUE_TYPE_ACTIVITY} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1, TaskQueueType: enumspb.TASK_QUEUE_TYPE_ACTIVITY} resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, nil, event) require.NoError(t, err) assert.Len(t, resp.Actions, 1) @@ -174,7 +174,7 @@ func TestNoSyncProcessTaskAdd(t *testing.T) { }) t.Run("nexus queue type writes shared state key", func(t *testing.T) { - event := iface.SignalTaskAddRequest{IsSyncMatch: false, TaskQueueType: enumspb.TASK_QUEUE_TYPE_NEXUS} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1, TaskQueueType: enumspb.TASK_QUEUE_TYPE_NEXUS} resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, nil, event) require.NoError(t, err) assert.Len(t, resp.Actions, 1) @@ -183,7 +183,7 @@ func 
TestNoSyncProcessTaskAdd(t *testing.T) { t.Run("state threads correctly across two calls", func(t *testing.T) { // First call: fires and stores timestamp in state. - event := iface.SignalTaskAddRequest{IsSyncMatch: false, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} cfg := iface.ScalingAlgorithmConfig{configNoSyncScaleUpCooloffMsKey: int64(30_000)} resp1, err := a.ProcessTaskAdd(ctx, cfg, nil, event) require.NoError(t, err) @@ -199,12 +199,73 @@ func TestNoSyncProcessTaskAdd(t *testing.T) { nowMs := time.Now().UnixMilli() state := iface.ScalingAlgorithmStatus{stateLastScaleUpTimestampKey: nowMs} cfg := iface.ScalingAlgorithmConfig{configNoSyncScaleUpCooloffMsKey: int64(0)} - event := iface.SignalTaskAddRequest{IsSyncMatch: false, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} resp, err := a.ProcessTaskAdd(ctx, cfg, state, event) require.NoError(t, err) assert.Len(t, resp.Actions, 1) assert.Equal(t, ActionTypeInvokeWorker, resp.Actions[0].Action) }) + + t.Run("rate-limited only does not invoke worker", func(t *testing.T) { + event := iface.SignalTaskAddRequest{RateLimitedSignalsSinceLast: 3, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} + resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, nil, event) + require.NoError(t, err) + assert.Len(t, resp.Actions, 0) + assert.Equal(t, 0, resp.ThrottledCount) + assert.Equal(t, 3, resp.RateLimitedCount) + assert.NotContains(t, resp.Status, stateLastScaleUpTimestampKey, "rate-limited events must not update the scale-up timestamp") + assert.Contains(t, resp.Status, stateLastRateLimitedTimestampKey, "rate-limited events must write the rate-limited timestamp") + }) + + t.Run("rate-limited only does not invoke worker even with cooloff elapsed", func(t *testing.T) { + state 
:= iface.ScalingAlgorithmStatus{stateLastScaleUpTimestampKey: int64(0)} + event := iface.SignalTaskAddRequest{RateLimitedSignalsSinceLast: 2, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} + resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, state, event) + require.NoError(t, err) + assert.Len(t, resp.Actions, 0) + assert.Equal(t, 0, resp.ThrottledCount) + assert.Equal(t, 2, resp.RateLimitedCount) + assert.Equal(t, int64(0), resp.Status.GetInt64Field(stateLastScaleUpTimestampKey, 0), "scale-up timestamp must not change") + }) + + t.Run("mixed no-sync and rate-limited cooloff elapsed invokes worker", func(t *testing.T) { + event := iface.SignalTaskAddRequest{ + NoSyncMatchSignalsSinceLast: 2, + RateLimitedSignalsSinceLast: 1, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + } + resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, nil, event) + require.NoError(t, err) + assert.Len(t, resp.Actions, 1) + assert.Equal(t, ActionTypeInvokeWorker, resp.Actions[0].Action) + assert.Equal(t, 0, resp.ThrottledCount) + assert.Equal(t, 1, resp.RateLimitedCount) + assert.Contains(t, resp.Status, stateLastRateLimitedTimestampKey) + }) + + t.Run("mixed no-sync and rate-limited cooloff not elapsed throttles and reports both counts", func(t *testing.T) { + nowMs := time.Now().UnixMilli() + state := iface.ScalingAlgorithmStatus{stateLastScaleUpTimestampKey: nowMs} + cfg := iface.ScalingAlgorithmConfig{configNoSyncScaleUpCooloffMsKey: int64(30_000)} + event := iface.SignalTaskAddRequest{ + NoSyncMatchSignalsSinceLast: 1, + RateLimitedSignalsSinceLast: 2, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + } + resp, err := a.ProcessTaskAdd(ctx, cfg, state, event) + require.NoError(t, err) + assert.Len(t, resp.Actions, 0) + assert.Equal(t, 1, resp.ThrottledCount) + assert.Equal(t, 2, resp.RateLimitedCount) + }) + + t.Run("sync-match-only does not write rate-limited timestamp", func(t *testing.T) { + event := 
iface.SignalTaskAddRequest{SyncMatchSignalsSinceLast: 5} + resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, nil, event) + require.NoError(t, err) + assert.Len(t, resp.Actions, 0) + assert.NotContains(t, resp.Status, stateLastRateLimitedTimestampKey) + }) } func TestNoSyncCompatibleLaunchStrategies(t *testing.T) { @@ -686,7 +747,7 @@ func TestNoSyncProcessMetricsPoll(t *testing.T) { // Both methods share the same last_scale_up_time_ms key, so a scale-up via ProcessTaskAdd // must suppress a subsequent ProcessMetricsPoll within the cooloff window. cfg := iface.ScalingAlgorithmConfig{configNoSyncScaleUpCooloffMsKey: int64(30_000)} - event := iface.SignalTaskAddRequest{IsSyncMatch: false, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW} taskAddResp, err := a.ProcessTaskAdd(ctx, cfg, nil, event) require.NoError(t, err) assert.Len(t, taskAddResp.Actions, 1) @@ -699,3 +760,63 @@ func TestNoSyncProcessMetricsPoll(t *testing.T) { assert.Len(t, pollResp.Actions, 0) }) } + +func TestNoSyncProcessMetricsPollRateLimitedSuppression(t *testing.T) { + a := newNoSync() + ctx := context.Background() + + snapshot := ScalingMetricsSnapshot{ + Workflow: &iface.QueueTypeScalingMetrics{LastBacklogCount: 5}, + } + + t.Run("recent rate-limited timestamp suppresses scale-up", func(t *testing.T) { + nowMs := time.Now().UnixMilli() + state := iface.ScalingAlgorithmStatus{stateLastRateLimitedTimestampKey: nowMs} + cfg := iface.ScalingAlgorithmConfig{configNoSyncRateLimitedSuppressQuietMsKey: int64(30_000)} + resp, err := a.ProcessMetricsPoll(ctx, cfg, state, snapshot) + require.NoError(t, err) + assert.Len(t, resp.Actions, 0, "scale-up must be suppressed while rate limiting is recent") + }) + + t.Run("stale rate-limited timestamp does not suppress scale-up", func(t *testing.T) { + oldRateLimitedMs := time.Now().Add(-time.Hour).UnixMilli() + state := 
iface.ScalingAlgorithmStatus{stateLastRateLimitedTimestampKey: oldRateLimitedMs} + cfg := iface.ScalingAlgorithmConfig{ + configNoSyncRateLimitedSuppressQuietMsKey: int64(30_000), + configNoSyncScaleUpCooloffMsKey: int64(0), + } + resp, err := a.ProcessMetricsPoll(ctx, cfg, state, snapshot) + require.NoError(t, err) + assert.Len(t, resp.Actions, 1, "scale-up must fire once TTL has elapsed") + assert.Equal(t, ActionTypeInvokeWorker, resp.Actions[0].Action) + }) + + t.Run("suppress_quiet_ms=0 disables suppression", func(t *testing.T) { + nowMs := time.Now().UnixMilli() + state := iface.ScalingAlgorithmStatus{stateLastRateLimitedTimestampKey: nowMs} + cfg := iface.ScalingAlgorithmConfig{ + configNoSyncRateLimitedSuppressQuietMsKey: int64(0), + configNoSyncScaleUpCooloffMsKey: int64(0), + } + resp, err := a.ProcessMetricsPoll(ctx, cfg, state, snapshot) + require.NoError(t, err) + assert.Len(t, resp.Actions, 1, "suppression disabled via TTL=0 must allow scale-up") + }) + + t.Run("ProcessTaskAdd state threads into ProcessMetricsPoll suppression", func(t *testing.T) { + cfg := iface.ScalingAlgorithmConfig{ + configNoSyncRateLimitedSuppressQuietMsKey: int64(30_000), + configNoSyncScaleUpCooloffMsKey: int64(0), + } + // First: a rate-limited signal writes the state key. + event := iface.SignalTaskAddRequest{RateLimitedSignalsSinceLast: 2} + taskAddResp, err := a.ProcessTaskAdd(ctx, cfg, nil, event) + require.NoError(t, err) + assert.Contains(t, taskAddResp.Status, stateLastRateLimitedTimestampKey) + + // Second: ProcessMetricsPoll reads that state and suppresses. 
+ pollResp, err := a.ProcessMetricsPoll(ctx, cfg, taskAddResp.Status, snapshot) + require.NoError(t, err) + assert.Len(t, pollResp.Actions, 0, "ProcessMetricsPoll must be suppressed by rate-limited state from ProcessTaskAdd") + }) +} diff --git a/wci/workflow/scaling_algorithm/rate_based_test.go b/wci/workflow/scaling_algorithm/rate_based_test.go index 85bb59b..9cd03f8 100644 --- a/wci/workflow/scaling_algorithm/rate_based_test.go +++ b/wci/workflow/scaling_algorithm/rate_based_test.go @@ -243,7 +243,7 @@ func TestRateBasedProcessTaskAdd(t *testing.T) { configRateBasedEWMAAlphaKey: float64(0.5), configRateBasedInitialPerConsumerCapacityKey: float64(10), } - event := iface.SignalTaskAddRequest{IsSyncMatch: false} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1} resp, err := a.ProcessTaskAdd(ctx, cfg, state, event) require.NoError(t, err) @@ -264,7 +264,7 @@ func TestRateBasedProcessTaskAdd(t *testing.T) { configRateBasedInitialCountKey: int64(2), configRateBasedMaxCountKey: int64(5), } - event := iface.SignalTaskAddRequest{IsSyncMatch: false} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1} resp, err := a.ProcessTaskAdd(ctx, cfg, nil, event) require.NoError(t, err) @@ -286,7 +286,7 @@ func TestRateBasedProcessTaskAdd(t *testing.T) { configRateBasedInitialCountKey: int64(1), configRateBasedMaxCountKey: int64(10), } - event := iface.SignalTaskAddRequest{IsSyncMatch: false} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1} resp, err := a.ProcessTaskAdd(ctx, cfg, state, event) require.NoError(t, err) @@ -364,7 +364,7 @@ func TestRateBasedProcessTaskAdd(t *testing.T) { stateRateBasedWorkerCount: int64(0), stateRateBasedLastScaleUpTimestamp: oldMs(), } - event := iface.SignalTaskAddRequest{IsSyncMatch: false} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1} resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, state, event) require.NoError(t, err) @@ -379,7 +379,7 @@ func 
TestRateBasedProcessTaskAdd(t *testing.T) { stateRateBasedWorkerCount: int64(2), stateRateBasedLastScaleUpTimestamp: oldMs(), } - event := iface.SignalTaskAddRequest{IsSyncMatch: false} + event := iface.SignalTaskAddRequest{NoSyncMatchSignalsSinceLast: 1} resp, err := a.ProcessTaskAdd(ctx, iface.ScalingAlgorithmConfig{}, state, event) require.NoError(t, err) @@ -423,6 +423,7 @@ func TestRateBasedProcessTaskAdd(t *testing.T) { assert.NotContains(t, resp.Status, stateRateBasedEWMADispatchRate, "wrong-typed dispatch slot must not survive into persisted state") assert.NotContains(t, resp.Status, stateRateBasedEWMAPerConsumerCapacity, "non-finite capacity slot must not survive into persisted state") }) + } func TestRateBasedProcessDeferredScalingDecision(t *testing.T) { diff --git a/wci/workflow/scaling_algorithm/registry.go b/wci/workflow/scaling_algorithm/registry.go index 8596d2a..584cb86 100644 --- a/wci/workflow/scaling_algorithm/registry.go +++ b/wci/workflow/scaling_algorithm/registry.go @@ -42,6 +42,8 @@ type ( Status iface.ScalingAlgorithmStatus // The number of task-add events that were throttled ThrottledCount int + // The number of rate-limited task-add events observed where scale-up was suppressed + RateLimitedCount int } MetricsPollResponse struct { From 10d6074f95d134516fae5784d0f8bee054b66d84 Mon Sep 17 00:00:00 2001 From: Muneeb Ahmad Date: Thu, 25 Jun 2026 16:18:54 -0700 Subject: [PATCH 2/4] Handle task queue dispatch rate limiting in WCI autoscaling --- wci/workflow/scaling_algorithm/no_sync_match.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/wci/workflow/scaling_algorithm/no_sync_match.go b/wci/workflow/scaling_algorithm/no_sync_match.go index 901124f..6a46db7 100644 --- a/wci/workflow/scaling_algorithm/no_sync_match.go +++ b/wci/workflow/scaling_algorithm/no_sync_match.go @@ -192,6 +192,7 @@ func (a *scalingAlgorithmNoSync) ProcessDeferredScalingDecision(_ context.Contex } func (a *scalingAlgorithmNoSync) ProcessMetricsPoll(ctx 
context.Context, config iface.ScalingAlgorithmConfig, priorState iface.ScalingAlgorithmStatus, metricsSnapshot ScalingMetricsSnapshot) (*MetricsPollResponse, error) { + logger := safeActivityLogger(ctx) updatedState := maps.Clone(priorState) actions := []ScalingAction{} @@ -228,6 +229,10 @@ func (a *scalingAlgorithmNoSync) ProcessMetricsPoll(ctx context.Context, config // that hit the rate cap does not help and repeats the cycle. // Still update per-queue dispatch-rate state so the epsilon de-bounce baseline // stays current and the first post-suppression poll compares against a fresh reference. + logger.Debug("no_sync_match: ProcessMetricsPoll scale-up suppressed due to recent rate limiting", + "elapsed_since_rate_limited_ms", nowMs-lastRateLimitedMs, + "suppress_quiet_ms", suppressQuietMs, + ) for _, q := range []struct { qName string metrics *iface.QueueTypeScalingMetrics From 485976260dfa2dac439b4de6e1d066775ff7f57f Mon Sep 17 00:00:00 2001 From: Muneeb Ahmad Date: Thu, 25 Jun 2026 16:15:57 -0700 Subject: [PATCH 3/4] Address review feedback - 1 --- wci/client/hook.go | 4 ++-- wci/workflow/iface/workflow.go | 5 ++-- .../scaling_algorithm/no_sync_match.go | 24 ++++++++++++------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/wci/client/hook.go b/wci/client/hook.go index 08b8bcf..64bdf46 100644 --- a/wci/client/hook.go +++ b/wci/client/hook.go @@ -165,8 +165,8 @@ func (th *taskHookImpl) batchMatchSignals( } sendBy := last.timestamp.Add(time.Duration(minSignalIntervalSyncMatch) * time.Millisecond) if last.noSyncMatchCount > 0 { - // Only genuine not-matched events pull the send-by forward to the fast interval. - // Rate-limited events only increment rateLimitedCount and never accelerate the batch. + // Not-matched events (no worker was available) send the batch urgently at 500ms. + // Rate-limited events stay in the slow 60-second window alongside sync-matched events. 
minSignalIntervalNoSyncMatch := WorkerControllerMinSignalIntervalNoSyncMatchMilliseconds.Get(th.dc)(th.namespace.Name().String()) if minSignalIntervalNoSyncMatch <= 0 { minSignalIntervalNoSyncMatch = 500 diff --git a/wci/workflow/iface/workflow.go b/wci/workflow/iface/workflow.go index f8b5437..858cbec 100644 --- a/wci/workflow/iface/workflow.go +++ b/wci/workflow/iface/workflow.go @@ -153,8 +153,9 @@ type ( // Neither scaling algorithm reads this field for decisions after this change. IsSyncMatch bool `json:"is_sync_match"` - // SyncMatchSignalsSinceLast is the count of SyncMatchOutcomeSuccess events in this batch. - // Does NOT include rate-limited events — those are counted in RateLimitedSignalsSinceLast. + // SyncMatchSignalsSinceLast is the count of tasks successfully dispatched to a waiting worker in this batch. + // Rate-limited events (worker is available but there's no task handoff due to rate-limiting) are a + //distinct outcome tracked separately in RateLimitedSignalsSinceLast. SyncMatchSignalsSinceLast int `json:"sync_match_signals_batched,omitempty"` NoSyncMatchSignalsSinceLast int `json:"no_sync_match_signals_batched,omitempty"` RateLimitedSignalsSinceLast int `json:"rate_limited_signals_batched,omitempty"` diff --git a/wci/workflow/scaling_algorithm/no_sync_match.go b/wci/workflow/scaling_algorithm/no_sync_match.go index 6a46db7..2ca0b7d 100644 --- a/wci/workflow/scaling_algorithm/no_sync_match.go +++ b/wci/workflow/scaling_algorithm/no_sync_match.go @@ -45,12 +45,16 @@ const ( configNoSyncMetricsPollIntervalMsKey = "metrics_poll_interval_ms" configNoSyncMetricsPollIntervalMsDefault = int64(60_000) // 60s - // configNoSyncRateLimitedSuppressQuietMsKey: in ProcessMetricsPoll, suppress scale-up for this many ms - // after the last observed rate-limited signal. 0 = disabled. - // Default is 2× the poll interval. 
+ // configNoSyncRateLimitedSuppressQuietMsKey prevents ProcessMetricsPoll from invoking workers + // based on backlog growth caused by rate limiting. Without this, a backlog growing because of + // rate limiting would trigger more invocations, but those workers also hit the limit and add no throughput. + // After rate limiting is observed, backlog-driven scale-up is blocked for this many ms. 0 = disabled. + // Default is 2× the poll interval so at least one full poll cycle is suppressed. configNoSyncRateLimitedSuppressQuietMsKey = "rate_limited_suppress_quiet_ms" configNoSyncRateLimitedSuppressQuietMsDefault = 2 * configNoSyncMetricsPollIntervalMsDefault + // stateLastRateLimitedTimestampKey records when rate limiting was last observed. + // Written by ProcessTaskAdd, read by ProcessMetricsPoll to enforce the suppression window above. stateLastRateLimitedTimestampKey = "last_rate_limited_time_ms" stateLastScaleUpTimestampKey = "last_scale_up_time_ms" @@ -178,6 +182,10 @@ func (a *scalingAlgorithmNoSync) ProcessTaskAdd(ctx context.Context, config ifac rateLimitedCount := 0 if event.RateLimitedSignalsSinceLast > 0 { + // No scale-up action is taken for rate-limited events — workers are already polling, + // the bottleneck is the task queue dispatch rate limit. Record the count for the metric + // and update the timestamp so ProcessMetricsPoll suppresses backlog-driven scale-up for + // the configured quiet window. 
logger.Info("Task queue dispatch rate limiting observed, suppressing scale-up", "rate_limited_count", event.RateLimitedSignalsSinceLast) rateLimitedCount = event.RateLimitedSignalsSinceLast @@ -224,12 +232,10 @@ func (a *scalingAlgorithmNoSync) ProcessMetricsPoll(ctx context.Context, config suppressQuietMs := config.GetInt64Field(configNoSyncRateLimitedSuppressQuietMsKey, configNoSyncRateLimitedSuppressQuietMsDefault) lastRateLimitedMs := priorState.GetInt64Field(stateLastRateLimitedTimestampKey, 0) - if suppressQuietMs > 0 && nowMs-lastRateLimitedMs < suppressQuietMs { - // Rate limiting recently observed. Suppress backlog-driven scale-up — adding workers - // that hit the rate cap does not help and repeats the cycle. - // Still update per-queue dispatch-rate state so the epsilon de-bounce baseline - // stays current and the first post-suppression poll compares against a fresh reference. - logger.Debug("no_sync_match: ProcessMetricsPoll scale-up suppressed due to recent rate limiting", + // lastRateLimitedMs > 0 ensures suppression only activates when rate limiting has actually + // been observed. 
+ if suppressQuietMs > 0 && lastRateLimitedMs > 0 && nowMs-lastRateLimitedMs < suppressQuietMs { + logger.Info("no_sync_match: ProcessMetricsPoll scale-up suppressed due to recent rate limiting", "elapsed_since_rate_limited_ms", nowMs-lastRateLimitedMs, "suppress_quiet_ms", suppressQuietMs, ) From 27e03e5593c5b3760bf86db5b18808fec648a002 Mon Sep 17 00:00:00 2001 From: Muneeb Ahmad Date: Thu, 25 Jun 2026 17:41:59 -0700 Subject: [PATCH 4/4] Add rate-limited counter for PullStats --- wci/workflow/iface/workflow.go | 2 +- .../scaling_algorithm/no_sync_match.go | 48 +++++++++++---- .../scaling_algorithm/no_sync_match_test.go | 59 ++++++++++++++++++- 3 files changed, 93 insertions(+), 16 deletions(-) diff --git a/wci/workflow/iface/workflow.go b/wci/workflow/iface/workflow.go index 858cbec..9a780b8 100644 --- a/wci/workflow/iface/workflow.go +++ b/wci/workflow/iface/workflow.go @@ -155,7 +155,7 @@ type ( // SyncMatchSignalsSinceLast is the count of tasks successfully dispatched to a waiting worker in this batch. // Rate-limited events (worker is available but there's no task handoff due to rate-limiting) are a - //distinct outcome tracked separately in RateLimitedSignalsSinceLast. + // distinct outcome tracked separately in RateLimitedSignalsSinceLast. 
SyncMatchSignalsSinceLast int `json:"sync_match_signals_batched,omitempty"` NoSyncMatchSignalsSinceLast int `json:"no_sync_match_signals_batched,omitempty"` RateLimitedSignalsSinceLast int `json:"rate_limited_signals_batched,omitempty"` diff --git a/wci/workflow/scaling_algorithm/no_sync_match.go b/wci/workflow/scaling_algorithm/no_sync_match.go index 2ca0b7d..74c0b09 100644 --- a/wci/workflow/scaling_algorithm/no_sync_match.go +++ b/wci/workflow/scaling_algorithm/no_sync_match.go @@ -45,18 +45,26 @@ const ( configNoSyncMetricsPollIntervalMsKey = "metrics_poll_interval_ms" configNoSyncMetricsPollIntervalMsDefault = int64(60_000) // 60s - // configNoSyncRateLimitedSuppressQuietMsKey prevents ProcessMetricsPoll from invoking workers - // based on backlog growth caused by rate limiting. Without this, a backlog growing because of - // rate limiting would trigger more invocations, but those workers also hit the limit and add no throughput. - // After rate limiting is observed, backlog-driven scale-up is blocked for this many ms. 0 = disabled. - // Default is 2× the poll interval so at least one full poll cycle is suppressed. + // configNoSyncRateLimitedSuppressQuietMsKey is the duration to block backlog-driven scale-up + // after rate limiting is no longer actively signalled. Adding workers when the dispatch rate limit + // is being applied does not increase throughput; they will poll but not receive work any faster. + // Set to 0 to disable. Default is 2× the poll interval. configNoSyncRateLimitedSuppressQuietMsKey = "rate_limited_suppress_quiet_ms" configNoSyncRateLimitedSuppressQuietMsDefault = 2 * configNoSyncMetricsPollIntervalMsDefault - // stateLastRateLimitedTimestampKey records when rate limiting was last observed. - // Written by ProcessTaskAdd, read by ProcessMetricsPoll to enforce the suppression window above. + // stateLastRateLimitedTimestampKey records when rate limiting was last confirmed active. 
+ // Written by ProcessTaskAdd (on every observed signal) and refreshed by ProcessMetricsPoll + // (whenever the per-poll counter shows signals arrived since the last poll). Read by + // ProcessMetricsPoll to enforce the suppression window. stateLastRateLimitedTimestampKey = "last_rate_limited_time_ms" + // stateRateLimitedCountSinceLastPollKey accumulates rate-limited event counts between + // consecutive ProcessMetricsPoll runs. ProcessTaskAdd increments it; ProcessMetricsPoll + // reads it, refreshes the suppression timestamp if non-zero (signals still arriving), then + // resets it to zero. This bridges the two signal paths so suppression stays active as long + // as rate limiting is ongoing — even when idle pollers disappear because workers are busy. + stateRateLimitedCountSinceLastPollKey = "rate_limited_count_since_last_poll" + stateLastScaleUpTimestampKey = "last_scale_up_time_ms" // stateLastDispatchRateKeyFmt is a format string for per-queue dispatch rate state keys. // The %s placeholder is replaced by the queue type name ("workflow", "activity", "nexus"), @@ -78,6 +86,7 @@ var noSyncValidConfigKeys = map[string]struct{}{ var noSyncValidStateKeys = map[string]struct{}{ stateLastScaleUpTimestampKey: {}, stateLastRateLimitedTimestampKey: {}, + stateRateLimitedCountSinceLastPollKey: {}, fmt.Sprintf(stateLastDispatchRateKeyFmt, "workflow"): {}, fmt.Sprintf(stateLastDispatchRateKeyFmt, "activity"): {}, fmt.Sprintf(stateLastDispatchRateKeyFmt, "nexus"): {}, @@ -183,13 +192,15 @@ func (a *scalingAlgorithmNoSync) ProcessTaskAdd(ctx context.Context, config ifac rateLimitedCount := 0 if event.RateLimitedSignalsSinceLast > 0 { // No scale-up action is taken for rate-limited events — workers are already polling, - // the bottleneck is the task queue dispatch rate limit. Record the count for the metric - // and update the timestamp so ProcessMetricsPoll suppresses backlog-driven scale-up for - // the configured quiet window. 
+ // the bottleneck is the task queue dispatch rate limit. Record the count for the metric, + // stamp the timestamp, and accumulate into the per-poll counter so ProcessMetricsPoll + // can keep suppression active while signals keep arriving. logger.Info("Task queue dispatch rate limiting observed, suppressing scale-up", "rate_limited_count", event.RateLimitedSignalsSinceLast) rateLimitedCount = event.RateLimitedSignalsSinceLast updatedState[stateLastRateLimitedTimestampKey] = nowMs + prevCount := priorState.GetInt64Field(stateRateLimitedCountSinceLastPollKey, 0) + updatedState[stateRateLimitedCountSinceLastPollKey] = prevCount + int64(event.RateLimitedSignalsSinceLast) } return &TaskAddResponse{Actions: actions, Status: updatedState, ThrottledCount: throttledCount, RateLimitedCount: rateLimitedCount}, nil @@ -231,13 +242,24 @@ func (a *scalingAlgorithmNoSync) ProcessMetricsPoll(ctx context.Context, config elapsedSinceScaleUp := nowMs - lastScaleUpMs suppressQuietMs := config.GetInt64Field(configNoSyncRateLimitedSuppressQuietMsKey, configNoSyncRateLimitedSuppressQuietMsDefault) + rateLimitedCountSinceLastPoll := priorState.GetInt64Field(stateRateLimitedCountSinceLastPollKey, 0) lastRateLimitedMs := priorState.GetInt64Field(stateLastRateLimitedTimestampKey, 0) - // lastRateLimitedMs > 0 ensures suppression only activates when rate limiting has actually - // been observed. + + if rateLimitedCountSinceLastPoll > 0 { + // Rate-limited signals arrived since our last poll — rate limiting is still active. + // Refresh the suppression timestamp to now and reset the counter so the next poll + // starts fresh. + lastRateLimitedMs = nowMs + updatedState[stateLastRateLimitedTimestampKey] = nowMs + updatedState[stateRateLimitedCountSinceLastPollKey] = int64(0) + } + + // lastRateLimitedMs > 0 ensures suppression only activates when rate limiting has been observed. 
if suppressQuietMs > 0 && lastRateLimitedMs > 0 && nowMs-lastRateLimitedMs < suppressQuietMs { - logger.Info("no_sync_match: ProcessMetricsPoll scale-up suppressed due to recent rate limiting", + logger.Debug("no_sync_match: ProcessMetricsPoll scale-up suppressed due to recent rate limiting", "elapsed_since_rate_limited_ms", nowMs-lastRateLimitedMs, "suppress_quiet_ms", suppressQuietMs, + "rate_limited_count_since_last_poll", rateLimitedCountSinceLastPoll, ) for _, q := range []struct { qName string diff --git a/wci/workflow/scaling_algorithm/no_sync_match_test.go b/wci/workflow/scaling_algorithm/no_sync_match_test.go index e79540b..052d43a 100644 --- a/wci/workflow/scaling_algorithm/no_sync_match_test.go +++ b/wci/workflow/scaling_algorithm/no_sync_match_test.go @@ -808,15 +808,70 @@ func TestNoSyncProcessMetricsPollRateLimitedSuppression(t *testing.T) { configNoSyncRateLimitedSuppressQuietMsKey: int64(30_000), configNoSyncScaleUpCooloffMsKey: int64(0), } - // First: a rate-limited signal writes the state key. + // First: a rate-limited signal writes the state key and counter. event := iface.SignalTaskAddRequest{RateLimitedSignalsSinceLast: 2} taskAddResp, err := a.ProcessTaskAdd(ctx, cfg, nil, event) require.NoError(t, err) assert.Contains(t, taskAddResp.Status, stateLastRateLimitedTimestampKey) + assert.Equal(t, int64(2), taskAddResp.Status.GetInt64Field(stateRateLimitedCountSinceLastPollKey, 0)) - // Second: ProcessMetricsPoll reads that state and suppresses. + // Second: ProcessMetricsPoll reads counter > 0, refreshes timestamp, resets counter, suppresses. 
pollResp, err := a.ProcessMetricsPoll(ctx, cfg, taskAddResp.Status, snapshot) require.NoError(t, err) assert.Len(t, pollResp.Actions, 0, "ProcessMetricsPoll must be suppressed by rate-limited state from ProcessTaskAdd") + assert.Equal(t, int64(0), pollResp.Status.GetInt64Field(stateRateLimitedCountSinceLastPollKey, 0), + "counter must be reset to zero after ProcessMetricsPoll reads it") + }) + + t.Run("suppression sustained across multiple polls while signals keep arriving", func(t *testing.T) { + cfg := iface.ScalingAlgorithmConfig{ + configNoSyncRateLimitedSuppressQuietMsKey: int64(30_000), + configNoSyncScaleUpCooloffMsKey: int64(0), + } + + // Simulate: rate-limited signal → poll → rate-limited signal → poll → still suppressed. + event := iface.SignalTaskAddRequest{RateLimitedSignalsSinceLast: 5} + + // Round 1: signal arrives, first poll suppresses and resets counter. + taskAddResp1, err := a.ProcessTaskAdd(ctx, cfg, nil, event) + require.NoError(t, err) + pollResp1, err := a.ProcessMetricsPoll(ctx, cfg, taskAddResp1.Status, snapshot) + require.NoError(t, err) + assert.Len(t, pollResp1.Actions, 0, "first poll must be suppressed") + assert.Equal(t, int64(0), pollResp1.Status.GetInt64Field(stateRateLimitedCountSinceLastPollKey, 0)) + + // Round 2: another signal arrives (rate limiting still active), second poll also suppressed. 
+ taskAddResp2, err := a.ProcessTaskAdd(ctx, cfg, pollResp1.Status, event) + require.NoError(t, err) + assert.Equal(t, int64(5), taskAddResp2.Status.GetInt64Field(stateRateLimitedCountSinceLastPollKey, 0), + "counter must accumulate from zero after the first poll reset it") + pollResp2, err := a.ProcessMetricsPoll(ctx, cfg, taskAddResp2.Status, snapshot) + require.NoError(t, err) + assert.Len(t, pollResp2.Actions, 0, "second poll must still be suppressed while signals keep arriving") + assert.Equal(t, int64(0), pollResp2.Status.GetInt64Field(stateRateLimitedCountSinceLastPollKey, 0)) + }) + + t.Run("suppression lifts when signals stop and TTL expires", func(t *testing.T) { + cfg := iface.ScalingAlgorithmConfig{ + configNoSyncRateLimitedSuppressQuietMsKey: int64(30_000), + configNoSyncScaleUpCooloffMsKey: int64(0), + } + + // Signal arrives → poll suppresses and resets counter. + event := iface.SignalTaskAddRequest{RateLimitedSignalsSinceLast: 3} + taskAddResp, err := a.ProcessTaskAdd(ctx, cfg, nil, event) + require.NoError(t, err) + pollResp1, err := a.ProcessMetricsPoll(ctx, cfg, taskAddResp.Status, snapshot) + require.NoError(t, err) + assert.Len(t, pollResp1.Actions, 0) + + // No new signals — counter stays zero. Simulate TTL expired by backdating the timestamp. + stateAfterSignalsStopped := pollResp1.Status + stateAfterSignalsStopped[stateLastRateLimitedTimestampKey] = time.Now().Add(-time.Hour).UnixMilli() + + pollResp2, err := a.ProcessMetricsPoll(ctx, cfg, stateAfterSignalsStopped, snapshot) + require.NoError(t, err) + assert.Len(t, pollResp2.Actions, 1, "scale-up must fire once TTL has elapsed and no new signals arrived") + assert.Equal(t, ActionTypeInvokeWorker, pollResp2.Actions[0].Action) }) }