diff --git a/packages/core/src/backoff/UploadStateMachine.ts b/packages/core/src/backoff/UploadStateMachine.ts new file mode 100644 index 000000000..98d23bde4 --- /dev/null +++ b/packages/core/src/backoff/UploadStateMachine.ts @@ -0,0 +1,188 @@ +import { createStore } from '@segment/sovran-react-native'; +import type { Store, Persistor } from '@segment/sovran-react-native'; +import type { UploadStateData, RateLimitConfig, LoggerType } from '../types'; + +const INITIAL_STATE: UploadStateData = { + state: 'READY', + waitUntilTime: 0, + globalRetryCount: 0, + firstFailureTime: null, +}; + +/** + * State machine managing global rate limiting for 429 responses per the TAPI SDD. + * Implements READY/RATE_LIMITED states with persistence across app restarts. + */ +export class UploadStateMachine { + private store: Store; + private config: RateLimitConfig; + private logger?: LoggerType; + + /** + * Creates an UploadStateMachine instance. + * + * @param storeId - Unique identifier for the store (typically writeKey) + * @param persistor - Optional persistor for state persistence + * @param config - Rate limit configuration from Settings object + * @param logger - Optional logger for debugging + */ + constructor( + storeId: string, + persistor: Persistor | undefined, + config: RateLimitConfig, + logger?: LoggerType + ) { + this.config = config; + this.logger = logger; + + try { + this.store = createStore( + INITIAL_STATE, + persistor + ? { + persist: { + storeId: `${storeId}-uploadState`, + persistor, + }, + } + : undefined + ); + } catch (e) { + const errorMessage = e instanceof Error ? e.message : String(e); + this.logger?.error( + `[UploadStateMachine] Persistence failed, using in-memory store: ${errorMessage}` + ); + + try { + this.store = createStore(INITIAL_STATE); + } catch (fallbackError) { + const fallbackMessage = + fallbackError instanceof Error + ? fallbackError.message + : String(fallbackError); + this.logger?.error( + `[UploadStateMachine] CRITICAL: In-memory store creation failed: ${fallbackMessage}` + ); + throw fallbackError; + } + } + } + + /** + * Check if uploads can proceed based on rate limit state. + * Automatically transitions from RATE_LIMITED to READY when wait time has passed. + * + * @returns true if uploads should proceed, false if rate limited + */ + async canUpload(): Promise { + if (!this.config.enabled) { + return true; + } + + const state = await this.store.getState(); + const now = Date.now(); + + if (state.state === 'READY') { + return true; + } + + if (now >= state.waitUntilTime) { + await this.transitionToReady(); + return true; + } + + const waitSeconds = Math.ceil((state.waitUntilTime - now) / 1000); + this.logger?.info( + `Upload blocked: rate limited, retry in ${waitSeconds}s (retry ${state.globalRetryCount}/${this.config.maxRetryCount})` + ); + return false; + } + + /** + * Handle a 429 rate limit response by setting RATE_LIMITED state. + * Increments global retry count and enforces max retry/duration limits. + * + * @param retryAfterSeconds - Delay in seconds from Retry-After header (validated and clamped) + */ + async handle429(retryAfterSeconds: number): Promise { + if (!this.config.enabled) { + return; + } + + // Validate and clamp input + if (retryAfterSeconds < 0) { + this.logger?.warn( + `Invalid retryAfterSeconds ${retryAfterSeconds}, using 0` + ); + retryAfterSeconds = 0; + } + if (retryAfterSeconds > this.config.maxRetryInterval) { + this.logger?.warn( + `retryAfterSeconds ${retryAfterSeconds}s exceeds maxRetryInterval, clamping to ${this.config.maxRetryInterval}s` + ); + retryAfterSeconds = this.config.maxRetryInterval; + } + + const now = Date.now(); + const state = await this.store.getState(); + + const newRetryCount = state.globalRetryCount + 1; + const firstFailureTime = state.firstFailureTime ?? now; + const totalBackoffDuration = (now - firstFailureTime) / 1000; + + if (newRetryCount > this.config.maxRetryCount) { + this.logger?.warn( + `Max retry count exceeded (${this.config.maxRetryCount}), resetting rate limiter` + ); + await this.reset(); + return; + } + + if (totalBackoffDuration > this.config.maxRateLimitDuration) { + this.logger?.warn( + `Max backoff duration exceeded (${this.config.maxRateLimitDuration}s), resetting rate limiter` + ); + await this.reset(); + return; + } + + const waitUntilTime = now + retryAfterSeconds * 1000; + + await this.store.dispatch(() => ({ + state: 'RATE_LIMITED' as const, + waitUntilTime, + globalRetryCount: newRetryCount, + firstFailureTime, + })); + + this.logger?.info( + `Rate limited (429): waiting ${retryAfterSeconds}s before retry ${newRetryCount}/${this.config.maxRetryCount}` + ); + } + + /** + * Reset the state machine to READY with retry count 0. + * Called on successful upload (2xx response). + */ + async reset(): Promise { + await this.store.dispatch(() => INITIAL_STATE); + } + + /** + * Get the current global retry count for X-Retry-Count header. + * + * @returns Current global retry count + */ + async getGlobalRetryCount(): Promise { + const state = await this.store.getState(); + return state.globalRetryCount; + } + + private async transitionToReady(): Promise { + this.logger?.info('Rate limit period expired, resuming uploads'); + await this.store.dispatch((state: UploadStateData) => ({ + ...state, + state: 'READY' as const, + })); + } +} diff --git a/packages/core/src/backoff/__tests__/UploadStateMachine.test.ts b/packages/core/src/backoff/__tests__/UploadStateMachine.test.ts new file mode 100644 index 000000000..41871e6a4 --- /dev/null +++ b/packages/core/src/backoff/__tests__/UploadStateMachine.test.ts @@ -0,0 +1,282 @@ +import { UploadStateMachine } from '../UploadStateMachine'; +import type { Persistor } from '@segment/sovran-react-native'; +import type { RateLimitConfig } from '../../types'; +import { getMockLogger } from '../../test-helpers'; +import { createTestPersistor } from './test-helpers'; + +jest.mock('@segment/sovran-react-native', () => { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const helpers = require('./test-helpers'); + return { + ...jest.requireActual('@segment/sovran-react-native'), + createStore: jest.fn((initialState: unknown) => + // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access + helpers.createMockStore(initialState) + ), + }; +}); + +describe('UploadStateMachine', () => { + let sharedStorage: Record; + let mockPersistor: Persistor; + let mockLogger: ReturnType; + + const defaultConfig: RateLimitConfig = { + enabled: true, + maxRetryCount: 100, + maxRetryInterval: 300, + maxRateLimitDuration: 43200, + }; + + beforeEach(() => { + sharedStorage = {}; + mockPersistor = createTestPersistor(sharedStorage); + mockLogger = getMockLogger(); + jest.clearAllMocks(); + }); + + describe('canUpload', () => { + it('returns true in READY state', async () => { + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + expect(await sm.canUpload()).toBe(true); + }); + + it('returns false during RATE_LIMITED when waitUntilTime not reached', async () => { + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + await sm.handle429(60); + + expect(await sm.canUpload()).toBe(false); + }); + + it('transitions to READY when waitUntilTime has passed', async () => { + const now = 1000000; + jest.spyOn(Date, 'now').mockReturnValue(now); + + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + await sm.handle429(60); + jest.spyOn(Date, 'now').mockReturnValue(now + 61000); + + expect(await sm.canUpload()).toBe(true); + }); + + it('always returns true when config.enabled is false', async () => { + const disabledConfig: RateLimitConfig = { + ...defaultConfig, + enabled: false, + }; + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + disabledConfig, + mockLogger + ); + + await sm.handle429(60); + expect(await sm.canUpload()).toBe(true); + }); + }); + + describe('handle429', () => { + it('increments retry count', async () => { + const now = 1000000; + jest.spyOn(Date, 'now').mockReturnValue(now); + + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + await sm.handle429(60); + expect(await sm.getGlobalRetryCount()).toBe(1); + + await sm.handle429(60); + expect(await sm.getGlobalRetryCount()).toBe(2); + }); + + it('blocks uploads with correct wait time', async () => { + const now = 1000000; + jest.spyOn(Date, 'now').mockReturnValue(now); + + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + await sm.handle429(60); + expect(await sm.canUpload()).toBe(false); + + jest.spyOn(Date, 'now').mockReturnValue(now + 59000); + expect(await sm.canUpload()).toBe(false); + + jest.spyOn(Date, 'now').mockReturnValue(now + 60000); + expect(await sm.canUpload()).toBe(true); + }); + + it('resets when max retry count exceeded', async () => { + const limitedConfig: RateLimitConfig = { + ...defaultConfig, + maxRetryCount: 3, + }; + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + limitedConfig, + mockLogger + ); + + await sm.handle429(10); + await sm.handle429(10); + await sm.handle429(10); + await sm.handle429(10); + + expect(mockLogger.warn).toHaveBeenCalledWith( + 'Max retry count exceeded (3), resetting rate limiter' + ); + expect(await sm.getGlobalRetryCount()).toBe(0); + }); + + it('resets when max rate limit duration exceeded', async () => { + const now = 1000000; + jest.spyOn(Date, 'now').mockReturnValue(now); + + const limitedConfig: RateLimitConfig = { + ...defaultConfig, + maxRateLimitDuration: 10, + }; + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + limitedConfig, + mockLogger + ); + + await sm.handle429(5); + jest.spyOn(Date, 'now').mockReturnValue(now + 11000); + await sm.handle429(5); + + expect(mockLogger.warn).toHaveBeenCalledWith( + 'Max backoff duration exceeded (10s), resetting rate limiter' + ); + expect(await sm.getGlobalRetryCount()).toBe(0); + }); + + it('no-ops when config.enabled is false', async () => { + const disabledConfig: RateLimitConfig = { + ...defaultConfig, + enabled: false, + }; + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + disabledConfig, + mockLogger + ); + + await sm.handle429(60); + expect(await sm.getGlobalRetryCount()).toBe(0); + }); + + it('handles negative retryAfterSeconds gracefully', async () => { + const now = 1000000; + jest.spyOn(Date, 'now').mockReturnValue(now); + + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + await sm.handle429(-10); + + expect(mockLogger.warn).toHaveBeenCalledWith( + 'Invalid retryAfterSeconds -10, using 0' + ); + expect(await sm.getGlobalRetryCount()).toBe(1); + expect(await sm.canUpload()).toBe(true); // No wait time + }); + + it('handles zero retryAfterSeconds', async () => { + const now = 1000000; + jest.spyOn(Date, 'now').mockReturnValue(now); + + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + await sm.handle429(0); + + expect(await sm.getGlobalRetryCount()).toBe(1); + expect(await sm.canUpload()).toBe(true); // No wait time + }); + + it('clamps very large retryAfterSeconds to maxRetryInterval', async () => { + const now = 1000000; + jest.spyOn(Date, 'now').mockReturnValue(now); + + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + await sm.handle429(999999); + + expect(mockLogger.warn).toHaveBeenCalledWith( + 'retryAfterSeconds 999999s exceeds maxRetryInterval, clamping to 300s' + ); + expect(await sm.getGlobalRetryCount()).toBe(1); + + // Should wait maxRetryInterval, not 999999 + jest.spyOn(Date, 'now').mockReturnValue(now + 299000); + expect(await sm.canUpload()).toBe(false); + jest.spyOn(Date, 'now').mockReturnValue(now + 300000); + expect(await sm.canUpload()).toBe(true); + }); + }); + + describe('reset', () => { + it('clears to READY with retryCount 0', async () => { + const sm = new UploadStateMachine( + 'test-key', + mockPersistor, + defaultConfig, + mockLogger + ); + + await sm.handle429(60); + expect(await sm.getGlobalRetryCount()).toBe(1); + + await sm.reset(); + + expect(await sm.getGlobalRetryCount()).toBe(0); + expect(await sm.canUpload()).toBe(true); + }); + }); +}); diff --git a/packages/core/src/backoff/__tests__/test-helpers.ts b/packages/core/src/backoff/__tests__/test-helpers.ts new file mode 100644 index 000000000..bcddbb64e --- /dev/null +++ b/packages/core/src/backoff/__tests__/test-helpers.ts @@ -0,0 +1,28 @@ +import type { Persistor } from '@segment/sovran-react-native'; + +export const createMockStore = (initialState: T) => { + let state = initialState; + return { + getState: jest.fn(() => Promise.resolve(state)), + dispatch: jest.fn((action: unknown) => { + if (typeof action === 'function') { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + state = action(state); + } else { + state = (action as { payload: unknown }).payload as T; + } + return Promise.resolve(); + }), + }; +}; + +export const createTestPersistor = ( + storage: Record = {} +): Persistor => ({ + get: async (key: string): Promise => + Promise.resolve(storage[key] as T), + set: async (key: string, state: T): Promise => { + storage[key] = state; + return Promise.resolve(); + }, +}); diff --git a/packages/core/src/backoff/index.ts b/packages/core/src/backoff/index.ts new file mode 100644 index 000000000..5fe4305c4 --- /dev/null +++ b/packages/core/src/backoff/index.ts @@ -0,0 +1 @@ +export { UploadStateMachine } from './UploadStateMachine'; diff --git a/packages/core/src/config-validation.ts b/packages/core/src/config-validation.ts new file mode 100644 index 000000000..a9b51eb5d --- /dev/null +++ b/packages/core/src/config-validation.ts @@ -0,0 +1,42 @@ +import type { RateLimitConfig, LoggerType } from './types'; + +export const validateRateLimitConfig = ( + config: RateLimitConfig, + logger?: LoggerType +): RateLimitConfig => { + const validated = { ...config }; + + if (validated.maxRetryInterval < 0.1) { + logger?.warn( + `maxRetryInterval ${validated.maxRetryInterval}s clamped to 0.1s` + ); + validated.maxRetryInterval = 0.1; + } else if (validated.maxRetryInterval > 86400) { + logger?.warn( + `maxRetryInterval ${validated.maxRetryInterval}s clamped to 86400s` + ); + validated.maxRetryInterval = 86400; + } + + if (validated.maxRateLimitDuration < 60) { + logger?.warn( + `maxRateLimitDuration ${validated.maxRateLimitDuration}s clamped to 60s` + ); + validated.maxRateLimitDuration = 60; + } else if (validated.maxRateLimitDuration > 604800) { + logger?.warn( + `maxRateLimitDuration ${validated.maxRateLimitDuration}s clamped to 604800s` + ); + validated.maxRateLimitDuration = 604800; + } + + if (validated.maxRetryCount < 1) { + logger?.warn(`maxRetryCount ${validated.maxRetryCount} clamped to 1`); + validated.maxRetryCount = 1; + } else if (validated.maxRetryCount > 100) { + logger?.warn(`maxRetryCount ${validated.maxRetryCount} clamped to 100`); + validated.maxRetryCount = 100; + } + + return validated; +}; diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index b0d4e9570..b3cfc69ac 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -332,6 +332,24 @@ export interface EdgeFunctionSettings { version: string; } +export type RateLimitConfig = { + enabled: boolean; + maxRetryCount: number; + maxRetryInterval: number; + maxRateLimitDuration: number; +}; + +export type HttpConfig = { + rateLimitConfig?: RateLimitConfig; +}; + +export type UploadStateData = { + state: 'READY' | 'RATE_LIMITED'; + waitUntilTime: number; + globalRetryCount: number; + firstFailureTime: number | null; +}; + export type SegmentAPISettings = { integrations: SegmentAPIIntegrations; edgeFunction?: EdgeFunctionSettings; @@ -340,6 +358,7 @@ export type SegmentAPISettings = { }; metrics?: MetricsOptions; consentSettings?: SegmentAPIConsentSettings; + httpConfig?: HttpConfig; }; export type DestinationMetadata = {