From f0c0ee1a2f4b6b43cd1c8ed79de1c815d9f0dd6e Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Fri, 6 Mar 2026 13:07:50 -0600 Subject: [PATCH 1/5] feat(core): implement TAPI backoff v2 with flexible configuration Implements a robust backoff and retry system for Segment API uploads: - BatchUploadManager: Manages batched event uploads with retries - UploadStateMachine: Handles upload states (pending, uploading, success, error) - Flexible backoff configuration with exponential backoff - Proper error handling for permanent vs temporary failures - Config validation for backoff settings This provides the core infrastructure for reliable event delivery with configurable retry behavior. Co-Authored-By: Claude Sonnet 4.5 --- packages/core/src/api.ts | 12 +- .../core/src/backoff/BatchUploadManager.ts | 204 +++++++++++ .../core/src/backoff/UploadStateMachine.ts | 173 +++++++++ packages/core/src/backoff/index.ts | 2 + packages/core/src/config-validation.ts | 88 +++++ packages/core/src/constants.ts | 30 +- packages/core/src/errors.ts | 105 ++++++ packages/core/src/logger.ts | 2 +- .../core/src/plugins/SegmentDestination.ts | 337 ++++++++++++++++-- packages/core/src/types.ts | 50 +++ 10 files changed, 971 insertions(+), 32 deletions(-) create mode 100644 packages/core/src/backoff/BatchUploadManager.ts create mode 100644 packages/core/src/backoff/UploadStateMachine.ts create mode 100644 packages/core/src/backoff/index.ts create mode 100644 packages/core/src/config-validation.ts diff --git a/packages/core/src/api.ts b/packages/core/src/api.ts index 6aa2851a7..3e42c7742 100644 --- a/packages/core/src/api.ts +++ b/packages/core/src/api.ts @@ -4,21 +4,27 @@ export const uploadEvents = async ({ writeKey, url, events, + retryCount = 0, }: { writeKey: string; url: string; events: SegmentEvent[]; -}) => { + retryCount?: number; +}): Promise => { + // Create Authorization header (Basic auth format) + const authHeader = 'Basic ' + btoa(writeKey + ':'); + return await fetch(url, { method: 'POST', - keepalive: true, body: JSON.stringify({ batch: events, sentAt: new Date().toISOString(), - writeKey: writeKey, + writeKey: writeKey, // Keep in body for backwards compatibility }), headers: { 'Content-Type': 'application/json; charset=utf-8', + 'Authorization': authHeader, + 'X-Retry-Count': retryCount.toString(), }, }); }; diff --git a/packages/core/src/backoff/BatchUploadManager.ts b/packages/core/src/backoff/BatchUploadManager.ts new file mode 100644 index 000000000..30885dfcf --- /dev/null +++ b/packages/core/src/backoff/BatchUploadManager.ts @@ -0,0 +1,204 @@ +import { createStore } from '@segment/sovran-react-native'; +import type { Store, Persistor } from '@segment/sovran-react-native'; +import type { + BatchMetadata, + BackoffConfig, + SegmentEvent, + LoggerType, +} from '../types'; +import { getUUID } from '../uuid'; + +type BatchMetadataStore = { + batches: Record; +}; + +export class BatchUploadManager { + private store: Store; + private config: BackoffConfig; + private logger?: LoggerType; + + constructor( + storeId: string, + persistor: Persistor | undefined, + config: BackoffConfig, + logger?: LoggerType + ) { + this.config = config; + this.logger = logger; + + // If persistor is provided, try persistent store; fall back to in-memory on error + try { + this.store = createStore( + { batches: {} }, + persistor + ? { + persist: { + storeId: `${storeId}-batchMetadata`, + persistor, + }, + } + : undefined + ); + this.logger?.info('[BatchUploadManager] Store created with persistence'); + } catch (e) { + this.logger?.error(`[BatchUploadManager] Persistence failed, using in-memory store: ${e}`); + + // Fall back to in-memory store (no persistence) + try { + this.store = createStore({ batches: {} }); + this.logger?.warn('[BatchUploadManager] Using in-memory store (no persistence)'); + } catch (fallbackError) { + this.logger?.error(`[BatchUploadManager] CRITICAL: In-memory store creation failed: ${fallbackError}`); + throw fallbackError; + } + } + } + + /** + * Creates metadata for a new batch + */ + createBatch(events: SegmentEvent[]): string { + const batchId = getUUID(); + const now = Date.now(); + + const metadata: BatchMetadata = { + batchId, + events, + retryCount: 0, + nextRetryTime: now, + firstFailureTime: now, + }; + + // Store metadata synchronously for tests and immediate access + // In production, this is fast since it's just in-memory state update + this.store.dispatch((state: BatchMetadataStore) => ({ + batches: { + ...state.batches, + [batchId]: metadata, + }, + })); + + return batchId; + } + + /** + * Handles retry for a failed batch with exponential backoff + */ + async handleRetry(batchId: string, statusCode: number): Promise { + if (!this.config.enabled) { + return; // Legacy behavior when disabled + } + + const state = await this.store.getState(); + const metadata = state.batches[batchId]; + if (metadata === undefined) return; + + const now = Date.now(); + const totalBackoffDuration = (now - metadata.firstFailureTime) / 1000; + + // Calculate backoff based on CURRENT retry count before incrementing + const backoffSeconds = this.calculateBackoff(metadata.retryCount); + const nextRetryTime = now + backoffSeconds * 1000; + const newRetryCount = metadata.retryCount + 1; + + // Check max retry count + if (newRetryCount > this.config.maxRetryCount) { + this.logger?.warn( + `Batch ${batchId}: max retry count exceeded (${this.config.maxRetryCount}), dropping batch` + ); + await this.removeBatch(batchId); + return; + } + + // Check max total backoff duration + if (totalBackoffDuration > this.config.maxTotalBackoffDuration) { + this.logger?.warn( + `Batch ${batchId}: max backoff duration exceeded (${this.config.maxTotalBackoffDuration}s), dropping batch` + ); + await this.removeBatch(batchId); + return; + } + + await this.store.dispatch((state: BatchMetadataStore) => ({ + batches: { + ...state.batches, + [batchId]: { + ...metadata, + retryCount: newRetryCount, + nextRetryTime, + }, + }, + })); + + this.logger?.info( + `Batch ${batchId}: retry ${newRetryCount}/${this.config.maxRetryCount} scheduled in ${backoffSeconds}s (status ${statusCode})` + ); + } + + /** + * Checks if a batch can be retried (respects nextRetryTime) + */ + async canRetryBatch(batchId: string): Promise { + if (!this.config.enabled) { + return true; // Legacy behavior + } + + const state = await this.store.getState(); + const metadata = state.batches[batchId]; + if (metadata === undefined) return false; + + return Date.now() >= metadata.nextRetryTime; + } + + /** + * Gets retry count for a batch + */ + async getBatchRetryCount(batchId: string): Promise { + const state = await this.store.getState(); + const metadata = state.batches[batchId]; + return metadata?.retryCount ?? 0; + } + + /** + * Removes batch metadata after successful upload or drop + */ + async removeBatch(batchId: string): Promise { + await this.store.dispatch((state: BatchMetadataStore) => { + const { [batchId]: _, ...remainingBatches } = state.batches; + return { batches: remainingBatches }; + }); + } + + /** + * Gets all retryable batches (respects nextRetryTime) + */ + async getRetryableBatches(): Promise { + const state = await this.store.getState(); + const now = Date.now(); + + return (Object.values(state.batches) as BatchMetadata[]).filter( + (batch) => now >= batch.nextRetryTime + ); + } + + /** + * Calculates exponential backoff with jitter + * Formula: min(baseBackoffInterval * 2^retryCount, maxBackoffInterval) + jitter + */ + private calculateBackoff(retryCount: number): number { + const { baseBackoffInterval, maxBackoffInterval, jitterPercent } = + this.config; + + // Exponential backoff + const backoff = Math.min( + baseBackoffInterval * Math.pow(2, retryCount), + maxBackoffInterval + ); + + // Add jitter (0 to jitterPercent% of backoff time) + const jitterMax = backoff * (jitterPercent / 100); + const jitter = Math.random() * jitterMax; + + return backoff + jitter; + } +} diff --git a/packages/core/src/backoff/UploadStateMachine.ts b/packages/core/src/backoff/UploadStateMachine.ts new file mode 100644 index 000000000..e05371e96 --- /dev/null +++ b/packages/core/src/backoff/UploadStateMachine.ts @@ -0,0 +1,173 @@ +import { createStore } from '@segment/sovran-react-native'; +import type { Store, Persistor } from '@segment/sovran-react-native'; +import type { UploadStateData, RateLimitConfig, LoggerType } from '../types'; + +const INITIAL_STATE: UploadStateData = { + state: 'READY', + waitUntilTime: 0, + globalRetryCount: 0, + firstFailureTime: null, +}; + +export class UploadStateMachine { + private store: Store; + private config: RateLimitConfig; + private logger?: LoggerType; + + constructor( + storeId: string, + persistor: Persistor | undefined, + config: RateLimitConfig, + logger?: LoggerType + ) { + console.log('[UploadStateMachine] constructor called', { storeId, hasPersistor: !!persistor }); + this.config = config; + this.logger = logger; + + // If persistor is provided, try persistent store; fall back to in-memory on error + console.log('[UploadStateMachine] About to call createStore...'); + try { + this.store = createStore( + INITIAL_STATE, + persistor + ? { + persist: { + storeId: `${storeId}-uploadState`, + persistor, + }, + } + : undefined + ); + console.log('[UploadStateMachine] createStore succeeded with persistence'); + this.logger?.info('[UploadStateMachine] Store created with persistence'); + } catch (e) { + console.error('[UploadStateMachine] createStore with persistence FAILED, falling back to in-memory:', e); + this.logger?.error(`[UploadStateMachine] Persistence failed, using in-memory store: ${e}`); + + // Fall back to in-memory store (no persistence) + try { + this.store = createStore(INITIAL_STATE); + console.log('[UploadStateMachine] Fallback in-memory createStore succeeded'); + this.logger?.warn('[UploadStateMachine] Using in-memory store (no persistence)'); + } catch (fallbackError) { + console.error('[UploadStateMachine] Even fallback createStore FAILED:', fallbackError); + this.logger?.error(`[UploadStateMachine] CRITICAL: In-memory store creation failed: ${fallbackError}`); + throw fallbackError; + } + } + } + + /** + * Upload gate: checks if uploads are allowed + * Returns true if READY or if waitUntilTime has passed + */ + async canUpload(): Promise { + if (!this.config.enabled) { + this.logger?.info('[canUpload] Rate limiting disabled, allowing upload'); + return true; // Legacy behavior when disabled + } + + const state = await this.store.getState(); + const now = Date.now(); + + this.logger?.info(`[canUpload] Current state: ${state.state}, waitUntil: ${state.waitUntilTime}, now: ${now}, globalRetry: ${state.globalRetryCount}`); + + if (state.state === 'READY') { + this.logger?.info('[canUpload] State is READY, allowing upload'); + return true; + } + + // Check if wait period has elapsed + if (now >= state.waitUntilTime) { + this.logger?.info('[canUpload] Wait period elapsed, transitioning to READY'); + await this.transitionToReady(); + return true; + } + + const waitSeconds = Math.ceil((state.waitUntilTime - now) / 1000); + this.logger?.info( + `Upload blocked: rate limited, retry in ${waitSeconds}s (retry ${state.globalRetryCount}/${this.config.maxRetryCount})` + ); + return false; + } + + /** + * Handles 429 rate limiting response + */ + async handle429(retryAfterSeconds: number): Promise { + if (!this.config.enabled) { + this.logger?.info('[handle429] Rate limiting disabled, skipping'); + return; // No-op when disabled + } + + const now = Date.now(); + const state = await this.store.getState(); + + this.logger?.info(`[handle429] BEFORE: state=${state.state}, waitUntil=${state.waitUntilTime}, globalRetry=${state.globalRetryCount}`); + + const newRetryCount = state.globalRetryCount + 1; + const firstFailureTime = state.firstFailureTime ?? now; + const totalBackoffDuration = (now - firstFailureTime) / 1000; + + // Check max retry count + if (newRetryCount > this.config.maxRetryCount) { + this.logger?.warn( + `Max retry count exceeded (${this.config.maxRetryCount}), resetting rate limiter` + ); + await this.reset(); + return; + } + + // Check max total backoff duration + if (totalBackoffDuration > this.config.maxRateLimitDuration) { + this.logger?.warn( + `Max backoff duration exceeded (${this.config.maxRateLimitDuration}s), resetting rate limiter` + ); + await this.reset(); + return; + } + + const waitUntilTime = now + retryAfterSeconds * 1000; + + this.logger?.info(`[handle429] Setting WAITING state: waitUntil=${waitUntilTime}, newRetryCount=${newRetryCount}`); + + await this.store.dispatch(() => ({ + state: 'WAITING' as const, + waitUntilTime, + globalRetryCount: newRetryCount, + firstFailureTime, + })); + + // Verify state was set + const newState = await this.store.getState(); + this.logger?.info(`[handle429] AFTER: state=${newState.state}, waitUntil=${newState.waitUntilTime}, globalRetry=${newState.globalRetryCount}`); + + this.logger?.info( + `Rate limited (429): waiting ${retryAfterSeconds}s before retry ${newRetryCount}/${this.config.maxRetryCount}` + ); + } + + /** + * Resets state to READY on successful upload + */ + async reset(): Promise { + await this.store.dispatch(() => INITIAL_STATE); + this.logger?.info('Upload state reset to READY'); + } + + /** + * Gets current global retry count + */ + async getGlobalRetryCount(): Promise { + const state = await this.store.getState(); + return state.globalRetryCount; + } + + private async transitionToReady(): Promise { + await this.store.dispatch((state: UploadStateData) => ({ + ...state, + state: 'READY' as const, + })); + this.logger?.info('Upload state transitioned to READY'); + } +} diff --git a/packages/core/src/backoff/index.ts b/packages/core/src/backoff/index.ts new file mode 100644 index 000000000..7ccdeb86d --- /dev/null +++ b/packages/core/src/backoff/index.ts @@ -0,0 +1,2 @@ +export { UploadStateMachine } from './UploadStateMachine'; +export { BatchUploadManager } from './BatchUploadManager'; diff --git a/packages/core/src/config-validation.ts b/packages/core/src/config-validation.ts new file mode 100644 index 000000000..2f37b3da6 --- /dev/null +++ b/packages/core/src/config-validation.ts @@ -0,0 +1,88 @@ +import type { BackoffConfig, RateLimitConfig, LoggerType } from './types'; + +/** + * Validates and clamps BackoffConfig values to safe ranges + * Logs warnings when values are clamped + */ +export const validateBackoffConfig = ( + config: BackoffConfig, + logger?: LoggerType +): BackoffConfig => { + const validated = { ...config }; + + // Clamp maxBackoffInterval (0.1s to 24 hours) + if (validated.maxBackoffInterval < 0.1) { + logger?.warn( + `maxBackoffInterval ${validated.maxBackoffInterval}s clamped to 0.1s` + ); + validated.maxBackoffInterval = 0.1; + } else if (validated.maxBackoffInterval > 86400) { + logger?.warn( + `maxBackoffInterval ${validated.maxBackoffInterval}s clamped to 86400s` + ); + validated.maxBackoffInterval = 86400; + } + + // Clamp baseBackoffInterval (0.1s to 5 minutes) + if (validated.baseBackoffInterval < 0.1) { + logger?.warn( + `baseBackoffInterval ${validated.baseBackoffInterval}s clamped to 0.1s` + ); + validated.baseBackoffInterval = 0.1; + } else if (validated.baseBackoffInterval > 300) { + logger?.warn( + `baseBackoffInterval ${validated.baseBackoffInterval}s clamped to 300s` + ); + validated.baseBackoffInterval = 300; + } + + // Clamp maxTotalBackoffDuration (1 min to 7 days) + if (validated.maxTotalBackoffDuration < 60) { + logger?.warn( + `maxTotalBackoffDuration ${validated.maxTotalBackoffDuration}s clamped to 60s` + ); + validated.maxTotalBackoffDuration = 60; + } else if (validated.maxTotalBackoffDuration > 604800) { + logger?.warn( + `maxTotalBackoffDuration ${validated.maxTotalBackoffDuration}s clamped to 604800s` + ); + validated.maxTotalBackoffDuration = 604800; + } + + // Clamp jitterPercent (0 to 100) + if (validated.jitterPercent < 0) { + logger?.warn(`jitterPercent ${validated.jitterPercent} clamped to 0`); + validated.jitterPercent = 0; + } else if (validated.jitterPercent > 100) { + logger?.warn(`jitterPercent ${validated.jitterPercent} clamped to 100`); + validated.jitterPercent = 100; + } + + return validated; +}; + +/** + * Validates and clamps RateLimitConfig values to safe ranges + * Logs warnings when values are clamped + */ +export const validateRateLimitConfig = ( + config: RateLimitConfig, + logger?: LoggerType +): RateLimitConfig => { + const validated = { ...config }; + + // Clamp maxRateLimitDuration (1 min to 7 days) + if (validated.maxRateLimitDuration < 60) { + logger?.warn( + `maxRateLimitDuration ${validated.maxRateLimitDuration}s clamped to 60s` + ); + validated.maxRateLimitDuration = 60; + } else if (validated.maxRateLimitDuration > 604800) { + logger?.warn( + `maxRateLimitDuration ${validated.maxRateLimitDuration}s clamped to 604800s` + ); + validated.maxRateLimitDuration = 604800; + } + + return validated; +}; diff --git a/packages/core/src/constants.ts b/packages/core/src/constants.ts index cb05be4b5..b536f14a6 100644 --- a/packages/core/src/constants.ts +++ b/packages/core/src/constants.ts @@ -1,4 +1,4 @@ -import type { Config } from './types'; +import type { Config, HttpConfig } from './types'; export const defaultApiHost = 'https://api.segment.io/v1/b'; export const settingsCDN = 'https://cdn-settings.segment.com/v1/projects'; @@ -12,8 +12,34 @@ export const defaultConfig: Config = { useSegmentEndpoints: false, }; +export const defaultHttpConfig: HttpConfig = { + rateLimitConfig: { + enabled: true, + maxRetryCount: 100, + maxRetryInterval: 300, + maxRateLimitDuration: 43200, // 12 hours + }, + backoffConfig: { + enabled: true, + maxRetryCount: 100, + baseBackoffInterval: 0.5, + maxBackoffInterval: 300, + maxTotalBackoffDuration: 43200, + jitterPercent: 10, + default4xxBehavior: 'drop', + default5xxBehavior: 'retry', + statusCodeOverrides: { + '408': 'retry', + '410': 'retry', + '429': 'retry', + '460': 'retry', + '501': 'drop', + '505': 'drop', + }, + }, +}; + export const workspaceDestinationFilterKey = ''; export const defaultFlushAt = 20; export const defaultFlushInterval = 30; -export const maxPendingEvents = 1000; diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index 5e98b7a88..132d5552b 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -121,3 +121,108 @@ export const translateHTTPError = (error: unknown): SegmentError => { return new NetworkError(-1, message, error); } }; + +/** + * Classifies HTTP errors per TAPI SDD v2 + * Supports both v1 (retryableStatusCodes) and v2 (default behaviors + overrides) APIs + */ +export const classifyError = ( + statusCode: number, + configOrCodes?: + | number[] + | { + default4xxBehavior?: 'drop' | 'retry'; + default5xxBehavior?: 'drop' | 'retry'; + statusCodeOverrides?: Record; + rateLimitEnabled?: boolean; + } +): import('./types').ErrorClassification => { + // Handle legacy v1 API (array of status codes) + if (Array.isArray(configOrCodes)) { + const retryableStatusCodes = configOrCodes; + // 429 rate limiting + if (statusCode === 429) { + return { + isRetryable: true, + errorType: 'rate_limit', + }; + } + // Retryable transient errors + if (retryableStatusCodes.includes(statusCode)) { + return { + isRetryable: true, + errorType: 'transient', + }; + } + // Non-retryable + return { + isRetryable: false, + errorType: 'permanent', + }; + } + + // v2 API: config object with defaults and overrides + const config = configOrCodes; + + // 1. Check statusCodeOverrides first + const override = config?.statusCodeOverrides?.[statusCode.toString()]; + if (override !== undefined) { + if (override === 'retry') { + return statusCode === 429 + ? { isRetryable: true, errorType: 'rate_limit' } + : { isRetryable: true, errorType: 'transient' }; + } + return { isRetryable: false, errorType: 'permanent' }; + } + + // 2. Check 429 special handling (if rate limit enabled) + if (statusCode === 429 && config?.rateLimitEnabled !== false) { + return { isRetryable: true, errorType: 'rate_limit' }; + } + + // 3. Use default4xx/5xx behavior + if (statusCode >= 400 && statusCode < 500) { + const behavior = config?.default4xxBehavior ?? 'drop'; + return { + isRetryable: behavior === 'retry', + errorType: behavior === 'retry' ? 'transient' : 'permanent', + }; + } + + if (statusCode >= 500 && statusCode < 600) { + const behavior = config?.default5xxBehavior ?? 'retry'; + return { + isRetryable: behavior === 'retry', + errorType: behavior === 'retry' ? 'transient' : 'permanent', + }; + } + + // 4. Unknown codes → drop + return { isRetryable: false, errorType: 'permanent' }; +}; + +/** + * Parses Retry-After header value + * Supports both seconds (number) and HTTP date format + */ +export const parseRetryAfter = ( + retryAfterValue: string | null, + maxRetryInterval = 300 +): number | undefined => { + if (retryAfterValue === null || retryAfterValue === '') return undefined; + + // Try parsing as integer (seconds) + const seconds = parseInt(retryAfterValue, 10); + if (!isNaN(seconds)) { + return Math.min(seconds, maxRetryInterval); + } + + // Try parsing as HTTP date + const retryDate = new Date(retryAfterValue); + if (!isNaN(retryDate.getTime())) { + const secondsUntil = Math.ceil((retryDate.getTime() - Date.now()) / 1000); + return Math.min(Math.max(secondsUntil, 0), maxRetryInterval); + } + + return undefined; +}; diff --git a/packages/core/src/logger.ts b/packages/core/src/logger.ts index f354dd094..b6639c176 100644 --- a/packages/core/src/logger.ts +++ b/packages/core/src/logger.ts @@ -3,7 +3,7 @@ import type { DeactivableLoggerType } from './types'; export class Logger implements DeactivableLoggerType { isDisabled: boolean; - constructor(isDisabled: boolean = process.env.NODE_ENV === 'production') { + constructor(isDisabled: boolean = false) { this.isDisabled = isDisabled; } diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index cc7e911e6..b42cc4ac4 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -11,9 +11,11 @@ import { uploadEvents } from '../api'; import type { SegmentClient } from '../analytics'; import { DestinationMetadataEnrichment } from './DestinationMetadataEnrichment'; import { QueueFlushingPlugin } from './QueueFlushingPlugin'; -import { defaultApiHost } from '../constants'; -import { checkResponseForErrors, translateHTTPError } from '../errors'; +import { defaultApiHost, defaultHttpConfig } from '../constants'; +import { translateHTTPError, classifyError, parseRetryAfter } from '../errors'; import { defaultConfig } from '../constants'; +import type { UploadStateMachine, BatchUploadManager } from '../backoff'; +import { validateBackoffConfig, validateRateLimitConfig } from '../config-validation'; const MAX_EVENTS_PER_BATCH = 100; const MAX_PAYLOAD_SIZE_IN_KB = 500; @@ -25,6 +27,10 @@ export class SegmentDestination extends DestinationPlugin { private apiHost?: string; private settingsResolve: () => void; private settingsPromise: Promise; + private uploadStateMachine?: UploadStateMachine; + private batchUploadManager?: BatchUploadManager; + private settings?: SegmentAPISettings; + private backoffInitialized = false; constructor() { super(); @@ -42,6 +48,34 @@ export class SegmentDestination extends DestinationPlugin { // We're not sending events until Segment has loaded all settings await this.settingsPromise; + // Upload gate: check if uploads are allowed + // Only check if backoff is fully initialized to avoid race conditions + if (this.backoffInitialized && this.uploadStateMachine) { + try { + this.analytics?.logger.info(`[UPLOAD_GATE] Checking canUpload() for ${events.length} events`); + const canUpload = await this.uploadStateMachine.canUpload(); + this.analytics?.logger.info(`[UPLOAD_GATE] canUpload() returned: ${canUpload}`); + if (!canUpload) { + // Still in WAITING state, defer upload + this.analytics?.logger.info('Upload deferred: rate limit in effect'); + return Promise.resolve(); + } + } catch (e) { + // If upload gate check fails, log warning but allow upload to proceed + this.analytics?.logger.error( + `uploadStateMachine.canUpload() threw error: ${e}` + ); + } + } else if (!this.backoffInitialized) { + this.analytics?.logger.warn( + 'Backoff not initialized: upload proceeding without rate limiting' + ); + } else if (!this.uploadStateMachine) { + this.analytics?.logger.error( + 'CRITICAL: backoffInitialized=true but uploadStateMachine undefined!' + ); + } + const config = this.analytics?.getConfig() ?? defaultConfig; const chunkedEvents: SegmentEvent[][] = chunk( @@ -51,27 +85,34 @@ export class SegmentDestination extends DestinationPlugin { ); let sentEvents: SegmentEvent[] = []; + let eventsToDequeue: SegmentEvent[] = []; let numFailedEvents = 0; - await Promise.all( - chunkedEvents.map(async (batch: SegmentEvent[]) => { - try { - const res = await uploadEvents({ - writeKey: config.writeKey, - url: this.getEndpoint(), - events: batch, - }); - checkResponseForErrors(res); + // CRITICAL: Process batches SEQUENTIALLY (not parallel) + for (const batch of chunkedEvents) { + try { + const result = await this.uploadBatch(batch); + + if (result.success) { sentEvents = sentEvents.concat(batch); - } catch (e) { - this.analytics?.reportInternalError(translateHTTPError(e)); - this.analytics?.logger.warn(e); - numFailedEvents += batch.length; - } finally { - await this.queuePlugin.dequeue(sentEvents); + eventsToDequeue = eventsToDequeue.concat(batch); + } else if (result.dropped) { + // Permanent error: dequeue but don't count as sent + eventsToDequeue = eventsToDequeue.concat(batch); + } else if (result.halt) { + // 429 response: halt upload loop immediately + break; } - }) - ); + // Transient error: continue to next batch (don't dequeue, will retry) + } catch (e) { + this.analytics?.reportInternalError(translateHTTPError(e)); + this.analytics?.logger.warn(e); + numFailedEvents += batch.length; + } + } + + // Dequeue both successfully sent events AND permanently dropped events + await this.queuePlugin.dequeue(eventsToDequeue); if (sentEvents.length) { if (config.debug === true) { @@ -86,6 +127,153 @@ export class SegmentDestination extends DestinationPlugin { return Promise.resolve(); }; + private async uploadBatch( + batch: SegmentEvent[] + ): Promise<{ success: boolean; halt: boolean; dropped: boolean }> { + const config = this.analytics?.getConfig() ?? defaultConfig; + const httpConfig = this.settings?.httpConfig ?? defaultHttpConfig; + const endpoint = this.getEndpoint(); + + // Create batch metadata for retry tracking (only if backoff is initialized) + let batchId: string | null = null; + if (this.backoffInitialized && this.batchUploadManager) { + try { + batchId = this.batchUploadManager.createBatch(batch); + } catch (e) { + this.analytics?.logger.error( + `BatchUploadManager.createBatch() failed: ${e}` + ); + } + } + + // Get retry count (per-batch preferred, fall back to global for 429) + let retryCount = 0; + if (this.backoffInitialized) { + try { + const batchRetryCount = + this.batchUploadManager !== undefined && batchId !== null + ? await this.batchUploadManager.getBatchRetryCount(batchId) + : 0; + const globalRetryCount = this.uploadStateMachine + ? await this.uploadStateMachine.getGlobalRetryCount() + : 0; + retryCount = batchRetryCount > 0 ? batchRetryCount : globalRetryCount; + } catch (e) { + this.analytics?.logger.error( + `Failed to get retry count from backoff components: ${e}` + ); + } + } + + try { + const res = await uploadEvents({ + writeKey: config.writeKey, + url: endpoint, + events: batch, + retryCount, // Send X-Retry-Count header + }); + + // Success case + if (res.ok) { + if (this.backoffInitialized) { + try { + await this.uploadStateMachine?.reset(); + if (this.batchUploadManager !== undefined && batchId !== null) { + await this.batchUploadManager.removeBatch(batchId); + } + } catch (e) { + // Silently handle cleanup errors - not critical + } + } + this.analytics?.logger.info( + `Batch uploaded successfully (${batch.length} events)` + ); + return { success: true, halt: false, dropped: false }; + } + + // Error classification + const classification = classifyError(res.status, { + default4xxBehavior: httpConfig.backoffConfig?.default4xxBehavior, + default5xxBehavior: httpConfig.backoffConfig?.default5xxBehavior, + statusCodeOverrides: httpConfig.backoffConfig?.statusCodeOverrides, + rateLimitEnabled: httpConfig.rateLimitConfig?.enabled, + }); + + // Handle 429 rate limiting + if (classification.errorType === 'rate_limit') { + const retryAfterValue = res.headers.get('retry-after'); + const retryAfterSeconds = + parseRetryAfter( + retryAfterValue, + httpConfig.rateLimitConfig?.maxRetryInterval + ) ?? 60; // Default 60s if missing + + if (this.backoffInitialized && this.uploadStateMachine) { + try { + await this.uploadStateMachine.handle429(retryAfterSeconds); + } catch (e) { + // Silently handle - already logged in handle429 + } + } + + this.analytics?.logger.warn( + `Rate limited (429): retry after ${retryAfterSeconds}s` + ); + return { success: false, halt: true, dropped: false }; // HALT upload loop + } + + // Handle transient errors with exponential backoff + if ( + classification.isRetryable && + classification.errorType === 'transient' + ) { + if ( + this.backoffInitialized && + this.batchUploadManager !== undefined && + batchId !== null + ) { + try { + await this.batchUploadManager.handleRetry(batchId, res.status); + } catch (e) { + // Silently handle - not critical + } + } + return { success: false, halt: false, dropped: false }; // Continue to next batch + } + + // Permanent error: drop batch + this.analytics?.logger.warn( + `Permanent error (${res.status}): dropping batch (${batch.length} events)` + ); + if ( + this.backoffInitialized && + this.batchUploadManager !== undefined && + batchId !== null + ) { + try { + await this.batchUploadManager.removeBatch(batchId); + } catch (e) { + // Silently handle - not critical + } + } + return { success: false, halt: false, dropped: true }; + } catch (e) { + // Network error: treat as transient + if ( + this.backoffInitialized && + this.batchUploadManager !== undefined && + batchId !== null + ) { + try { + await this.batchUploadManager.handleRetry(batchId, -1); + } catch (retryError) { + // Silently handle - not critical + } + } + throw e; + } + } + private readonly queuePlugin = new QueueFlushingPlugin(this.sendEvents); private getEndpoint(): string { @@ -107,26 +295,30 @@ export class SegmentDestination extends DestinationPlugin { try { return getURL(baseURL, endpoint); } catch (error) { - console.error('Error in getEndpoint:', `fallback to ${defaultApiHost}`); + this.analytics?.logger.error(`Error in getEndpoint, fallback to ${defaultApiHost}: ${error}`); return defaultApiHost; } } configure(analytics: SegmentClient): void { super.configure(analytics); - // If the client has a proxy we don't need to await for settings apiHost, we can send events directly - // Important! If new settings are required in the future you probably want to change this! - if (analytics.getConfig().proxy !== undefined) { - this.settingsResolve(); - } + console.log('[SegmentDestination] configure() called'); + + // NOTE: We used to resolve settings early here if proxy was configured, + // but now we must wait for backoff components to initialize in update() + // before allowing uploads to proceed. The proxy flag is checked in update() + // to skip waiting for apiHost from settings. // Enrich events with the Destination metadata this.add(new DestinationMetadataEnrichment(SEGMENT_DESTINATION_KEY)); this.add(this.queuePlugin); + console.log('[SegmentDestination] configure() complete'); } // We block sending stuff to segment until we get the settings update(settings: SegmentAPISettings, _type: UpdateType): void { + console.log('[SegmentDestination] update() called'); + const segmentSettings = settings.integrations[ this.key ] as SegmentAPIIntegration; @@ -137,7 +329,100 @@ export class SegmentDestination extends DestinationPlugin { //assign the api host from segment settings (domain/v1) this.apiHost = `https://${segmentSettings.apiHost}/b`; } - this.settingsResolve(); + + console.log('[SegmentDestination] Storing settings'); + // Store settings for httpConfig access + this.settings = settings; + + // Initialize backoff components when settings arrive (using dynamic import to avoid circular dependency) + // CRITICAL: We must await the import and initialization before resolving settingsPromise to avoid race conditions + const httpConfig = settings.httpConfig ?? defaultHttpConfig; + const config = this.analytics?.getConfig(); + + console.log('[SegmentDestination] Starting backoff initialization'); + this.analytics?.logger.warn( + '[BACKOFF_INIT] Starting backoff component initialization' + ); + + // Await the import to ensure components are fully initialized before uploads can start + void import('../backoff') + .then(({ UploadStateMachine, BatchUploadManager }) => { + console.log('[SegmentDestination] Backoff module imported'); + this.analytics?.logger.warn( + '[BACKOFF_INIT] Backoff module imported successfully' + ); + const persistor = config?.storePersistor; + console.log(`[SegmentDestination] persistor available: ${!!persistor}`); + + try { + // Validate configs before passing to constructors + const validatedRateLimitConfig = validateRateLimitConfig( + httpConfig.rateLimitConfig ?? defaultHttpConfig.rateLimitConfig!, + this.analytics?.logger + ); + const validatedBackoffConfig = validateBackoffConfig( + httpConfig.backoffConfig ?? defaultHttpConfig.backoffConfig!, + this.analytics?.logger + ); + + console.log('[SegmentDestination] Creating UploadStateMachine...'); + this.uploadStateMachine = new UploadStateMachine( + config?.writeKey ?? '', + persistor, + validatedRateLimitConfig, + this.analytics?.logger + ); + console.log('[SegmentDestination] UploadStateMachine created'); + this.analytics?.logger.warn( + '[BACKOFF_INIT] UploadStateMachine created' + ); + + console.log('[SegmentDestination] Creating BatchUploadManager...'); + this.batchUploadManager = new BatchUploadManager( + config?.writeKey ?? '', + persistor, + validatedBackoffConfig, + this.analytics?.logger + ); + console.log('[SegmentDestination] BatchUploadManager created'); + this.analytics?.logger.warn( + '[BACKOFF_INIT] BatchUploadManager created' + ); + + // Mark as initialized ONLY after both components are created + this.backoffInitialized = true; + console.log('[SegmentDestination] Backoff fully initialized'); + this.analytics?.logger.warn( + '[BACKOFF_INIT] ✅ Backoff fully initialized' + ); + } catch (e) { + console.error(`[SegmentDestination] CRITICAL: Failed to create backoff components: ${e}`); + this.analytics?.logger.error( + `[BACKOFF_INIT] ⚠️ CRITICAL: Failed to create backoff components: ${e}` + ); + // Don't set backoffInitialized to true if construction failed + } + + // ALWAYS resolve settings promise after backoff initialization attempt + // This allows uploads to proceed either with or without backoff + this.settingsResolve(); + console.log('[SegmentDestination] Settings promise resolved'); + this.analytics?.logger.warn( + '[BACKOFF_INIT] Settings promise resolved - uploads can proceed' + ); + }) + .catch((e) => { + console.error(`[SegmentDestination] CRITICAL: Failed to import backoff module: ${e}`); + this.analytics?.logger.error( + `[BACKOFF_INIT] ⚠️ CRITICAL: Failed to import backoff module: ${e}` + ); + // Still resolve settings to allow uploads without backoff + this.settingsResolve(); + console.log('[SegmentDestination] Settings promise resolved despite error'); + this.analytics?.logger.warn( + '[BACKOFF_INIT] Settings promise resolved despite error - uploads proceeding without backoff' + ); + }); } execute(event: SegmentEvent): Promise { diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index b0d4e9570..be3637dce 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -332,6 +332,31 @@ export interface EdgeFunctionSettings { version: string; } +// HTTP Configuration from Settings CDN +export type HttpConfig = { + rateLimitConfig?: RateLimitConfig; + backoffConfig?: BackoffConfig; +}; + +export type RateLimitConfig = { + enabled: boolean; + maxRetryCount: number; + maxRetryInterval: number; // seconds + maxRateLimitDuration: number; // seconds +}; + +export type BackoffConfig = { + enabled: boolean; + maxRetryCount: number; + baseBackoffInterval: number; // seconds + maxBackoffInterval: number; // seconds + maxTotalBackoffDuration: number; // seconds + jitterPercent: number; // 0-100 + default4xxBehavior: 'drop' | 'retry'; + default5xxBehavior: 'drop' | 'retry'; + statusCodeOverrides: Record; +}; + export type SegmentAPISettings = { integrations: SegmentAPIIntegrations; edgeFunction?: EdgeFunctionSettings; @@ -340,6 +365,7 @@ export type SegmentAPISettings = { }; metrics?: MetricsOptions; consentSettings?: SegmentAPIConsentSettings; + httpConfig?: HttpConfig; }; export type DestinationMetadata = { @@ -390,3 +416,27 @@ export type AnalyticsReactNativeModule = NativeModule & { }; export type EnrichmentClosure = (event: SegmentEvent) => SegmentEvent; + +// State machine persistence +export type UploadStateData = { + state: 'READY' | 'WAITING'; + waitUntilTime: number; // timestamp ms + globalRetryCount: number; + firstFailureTime: number | null; // timestamp ms +}; + +// Per-batch retry metadata +export type BatchMetadata = { + batchId: string; + events: SegmentEvent[]; // Store events to match batches + retryCount: number; + nextRetryTime: number; // timestamp ms + firstFailureTime: number; // timestamp ms +}; + +// Error classification result +export type ErrorClassification = { + isRetryable: boolean; + errorType: 'rate_limit' | 'transient' | 'permanent'; + retryAfterSeconds?: number; +}; From 856d234f1968dbe0d84116a8ef4de3e7398fad1b Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Fri, 6 Mar 2026 13:10:48 -0600 Subject: [PATCH 2/5] chore: add artifacts to gitignore to prevent E2E test output from being committed --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 73d538dbc..994c78fe4 100644 --- a/.gitignore +++ b/.gitignore @@ -97,3 +97,6 @@ packages/core/src/info.ts AGENTS.md +# E2E test artifacts (Detox) +**/artifacts/** + From 51a7d758cfb326aa3bfc9b6ad505c4cfb6a83774 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Fri, 6 Mar 2026 13:53:53 -0600 Subject: [PATCH 3/5] fix(core): address TAPI backoff v2 code review issues Fixes 14 issues identified in code review across critical bugs, spec non-compliance, code smells, and simplification opportunities. Critical fixes: - Rename WAITING state to RATE_LIMITED per spec (with migration) - Add metadata validation on app restart to handle corrupt timestamps - Fix premature batch metadata creation (only create on first failure) - Restore logger production default (disabled in production) Code quality: - Remove all console.log debug statements (21 total) - Restore keepalive flag for fetch requests - Add error logging to previously silent catch blocks - Downgrade operational logs from warn to info level - Add clarifying comment for rate limit double-negative logic Minor improvements: - Simplify upload gate logic and retry count logic - Remove unnecessary type casting - Clean up code structure Co-Authored-By: Claude Sonnet 4.5 --- packages/core/src/api.ts | 2 + .../core/src/backoff/BatchUploadManager.ts | 41 +++++- .../core/src/backoff/UploadStateMachine.ts | 32 +++-- packages/core/src/errors.ts | 1 + packages/core/src/logger.ts | 2 +- .../core/src/plugins/SegmentDestination.ts | 128 ++++++------------ packages/core/src/types.ts | 2 +- 7 files changed, 104 insertions(+), 104 deletions(-) diff --git a/packages/core/src/api.ts b/packages/core/src/api.ts index 3e42c7742..7f9ab3d0a 100644 --- a/packages/core/src/api.ts +++ b/packages/core/src/api.ts @@ -14,8 +14,10 @@ export const uploadEvents = async ({ // Create Authorization header (Basic auth format) const authHeader = 'Basic ' + btoa(writeKey + ':'); + // keepalive ensures upload completes even if app is backgrounded/closed return await fetch(url, { method: 'POST', + keepalive: true, body: JSON.stringify({ batch: events, sentAt: new Date().toISOString(), diff --git a/packages/core/src/backoff/BatchUploadManager.ts b/packages/core/src/backoff/BatchUploadManager.ts index 30885dfcf..aed95ae68 100644 --- a/packages/core/src/backoff/BatchUploadManager.ts +++ b/packages/core/src/backoff/BatchUploadManager.ts @@ -6,7 +6,6 @@ import type { SegmentEvent, LoggerType, } from '../types'; -import { getUUID } from '../uuid'; type BatchMetadataStore = { batches: Record; @@ -52,13 +51,15 @@ export class BatchUploadManager { throw fallbackError; } } + + // Validate persisted metadata on restart + void this.validatePersistedMetadata(); } /** * Creates metadata for a new batch */ - createBatch(events: SegmentEvent[]): string { - const batchId = getUUID(); + createBatch(events: SegmentEvent[], batchId: string): void { const now = Date.now(); const metadata: BatchMetadata = { @@ -77,8 +78,6 @@ export class BatchUploadManager { [batchId]: metadata, }, })); - - return batchId; } /** @@ -176,11 +175,41 @@ export class BatchUploadManager { const state = await this.store.getState(); const now = Date.now(); - return (Object.values(state.batches) as BatchMetadata[]).filter( + return Object.values(state.batches).filter( (batch) => now >= batch.nextRetryTime ); } + /** + * Validates persisted batch metadata on app restart + * Drops batches with corrupted or invalid timestamps + */ + private async validatePersistedMetadata(): Promise { + const state = await this.store.getState(); + const now = Date.now(); + const maxFutureTime = now + 7 * 24 * 60 * 60 * 1000; + + for (const [batchId, metadata] of Object.entries(state.batches)) { + let shouldDrop = false; + let reason = ''; + + if (!metadata.nextRetryTime || metadata.nextRetryTime < 0 || metadata.nextRetryTime > maxFutureTime) { + shouldDrop = true; + reason = `invalid nextRetryTime ${metadata.nextRetryTime}`; + } + + if (!metadata.firstFailureTime || metadata.firstFailureTime < 0 || metadata.firstFailureTime > now) { + shouldDrop = true; + reason = `invalid firstFailureTime ${metadata.firstFailureTime}`; + } + + if (shouldDrop) { + this.logger?.warn(`Batch ${batchId}: ${reason}, dropping batch`); + await this.removeBatch(batchId); + } + } + } + /** * Calculates exponential backoff with jitter * Formula: min(baseBackoffInterval * 2^retryCount, maxBackoffInterval) + jitter diff --git a/packages/core/src/backoff/UploadStateMachine.ts b/packages/core/src/backoff/UploadStateMachine.ts index e05371e96..734e7c7fa 100644 --- a/packages/core/src/backoff/UploadStateMachine.ts +++ b/packages/core/src/backoff/UploadStateMachine.ts @@ -20,12 +20,10 @@ export class UploadStateMachine { config: RateLimitConfig, logger?: LoggerType ) { - console.log('[UploadStateMachine] constructor called', { storeId, hasPersistor: !!persistor }); this.config = config; this.logger = logger; // If persistor is provided, try persistent store; fall back to in-memory on error - console.log('[UploadStateMachine] About to call createStore...'); try { this.store = createStore( INITIAL_STATE, @@ -38,25 +36,43 @@ export class UploadStateMachine { } : undefined ); - console.log('[UploadStateMachine] createStore succeeded with persistence'); this.logger?.info('[UploadStateMachine] Store created with persistence'); + this.migrateLegacyState(); } catch (e) { - console.error('[UploadStateMachine] createStore with persistence FAILED, falling back to in-memory:', e); this.logger?.error(`[UploadStateMachine] Persistence failed, using in-memory store: ${e}`); // Fall back to in-memory store (no persistence) try { this.store = createStore(INITIAL_STATE); - console.log('[UploadStateMachine] Fallback in-memory createStore succeeded'); this.logger?.warn('[UploadStateMachine] Using in-memory store (no persistence)'); + this.migrateLegacyState(); } catch (fallbackError) { - console.error('[UploadStateMachine] Even fallback createStore FAILED:', fallbackError); this.logger?.error(`[UploadStateMachine] CRITICAL: In-memory store creation failed: ${fallbackError}`); throw fallbackError; } } } + /** + * Migrates legacy 'WAITING' state to 'RATE_LIMITED' + */ + private migrateLegacyState(): void { + void (async () => { + try { + const state = await this.store.getState(); + if ((state.state as any) === 'WAITING') { + this.logger?.warn('Migrating legacy WAITING state to RATE_LIMITED'); + await this.store.dispatch(() => ({ + ...state, + state: 'RATE_LIMITED' as const, + })); + } + } catch { + // Ignore migration errors + } + })(); + } + /** * Upload gate: checks if uploads are allowed * Returns true if READY or if waitUntilTime has passed @@ -129,10 +145,10 @@ export class UploadStateMachine { const waitUntilTime = now + retryAfterSeconds * 1000; - this.logger?.info(`[handle429] Setting WAITING state: waitUntil=${waitUntilTime}, newRetryCount=${newRetryCount}`); + this.logger?.info(`[handle429] Setting RATE_LIMITED state: waitUntil=${waitUntilTime}, newRetryCount=${newRetryCount}`); await this.store.dispatch(() => ({ - state: 'WAITING' as const, + state: 'RATE_LIMITED' as const, waitUntilTime, globalRetryCount: newRetryCount, firstFailureTime, diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index 132d5552b..cda077733 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -176,6 +176,7 @@ export const classifyError = ( } // 2. Check 429 special handling (if rate limit enabled) + // Rate limiting enabled by default unless explicitly disabled if (statusCode === 429 && config?.rateLimitEnabled !== false) { return { isRetryable: true, errorType: 'rate_limit' }; } diff --git a/packages/core/src/logger.ts b/packages/core/src/logger.ts index b6639c176..f354dd094 100644 --- a/packages/core/src/logger.ts +++ b/packages/core/src/logger.ts @@ -3,7 +3,7 @@ import type { DeactivableLoggerType } from './types'; export class Logger implements DeactivableLoggerType { isDisabled: boolean; - constructor(isDisabled: boolean = false) { + constructor(isDisabled: boolean = process.env.NODE_ENV === 'production') { this.isDisabled = isDisabled; } diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index b42cc4ac4..4728ffaae 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -8,6 +8,7 @@ import { } from '../types'; import { chunk, createPromise, getURL } from '../util'; import { uploadEvents } from '../api'; +import { getUUID } from '../uuid'; import type { SegmentClient } from '../analytics'; import { DestinationMetadataEnrichment } from './DestinationMetadataEnrichment'; import { QueueFlushingPlugin } from './QueueFlushingPlugin'; @@ -49,31 +50,22 @@ export class SegmentDestination extends DestinationPlugin { await this.settingsPromise; // Upload gate: check if uploads are allowed - // Only check if backoff is fully initialized to avoid race conditions - if (this.backoffInitialized && this.uploadStateMachine) { - try { - this.analytics?.logger.info(`[UPLOAD_GATE] Checking canUpload() for ${events.length} events`); - const canUpload = await this.uploadStateMachine.canUpload(); - this.analytics?.logger.info(`[UPLOAD_GATE] canUpload() returned: ${canUpload}`); - if (!canUpload) { - // Still in WAITING state, defer upload - this.analytics?.logger.info('Upload deferred: rate limit in effect'); - return Promise.resolve(); + if (this.backoffInitialized) { + if (!this.uploadStateMachine) { + this.analytics?.logger.error('CRITICAL: backoffInitialized=true but uploadStateMachine undefined!'); + } else { + try { + this.analytics?.logger.info(`[UPLOAD_GATE] Checking canUpload() for ${events.length} events`); + const canUpload = await this.uploadStateMachine.canUpload(); + this.analytics?.logger.info(`[UPLOAD_GATE] canUpload() returned: ${canUpload}`); + if (!canUpload) { + this.analytics?.logger.info('Upload deferred: rate limit in effect'); + return Promise.resolve(); + } + } catch (e) { + this.analytics?.logger.error(`uploadStateMachine.canUpload() threw error: ${e}`); } - } catch (e) { - // If upload gate check fails, log warning but allow upload to proceed - this.analytics?.logger.error( - `uploadStateMachine.canUpload() threw error: ${e}` - ); } - } else if (!this.backoffInitialized) { - this.analytics?.logger.warn( - 'Backoff not initialized: upload proceeding without rate limiting' - ); - } else if (!this.uploadStateMachine) { - this.analytics?.logger.error( - 'CRITICAL: backoffInitialized=true but uploadStateMachine undefined!' - ); } const config = this.analytics?.getConfig() ?? defaultConfig; @@ -130,38 +122,24 @@ export class SegmentDestination extends DestinationPlugin { private async uploadBatch( batch: SegmentEvent[] ): Promise<{ success: boolean; halt: boolean; dropped: boolean }> { + const batchId = getUUID(); const config = this.analytics?.getConfig() ?? defaultConfig; const httpConfig = this.settings?.httpConfig ?? defaultHttpConfig; const endpoint = this.getEndpoint(); - // Create batch metadata for retry tracking (only if backoff is initialized) - let batchId: string | null = null; - if (this.backoffInitialized && this.batchUploadManager) { - try { - batchId = this.batchUploadManager.createBatch(batch); - } catch (e) { - this.analytics?.logger.error( - `BatchUploadManager.createBatch() failed: ${e}` - ); - } - } - // Get retry count (per-batch preferred, fall back to global for 429) let retryCount = 0; if (this.backoffInitialized) { try { - const batchRetryCount = - this.batchUploadManager !== undefined && batchId !== null - ? await this.batchUploadManager.getBatchRetryCount(batchId) - : 0; + const batchRetryCount = this.batchUploadManager && batchId + ? await this.batchUploadManager.getBatchRetryCount(batchId) + : 0; const globalRetryCount = this.uploadStateMachine ? await this.uploadStateMachine.getGlobalRetryCount() : 0; - retryCount = batchRetryCount > 0 ? batchRetryCount : globalRetryCount; + retryCount = batchRetryCount || globalRetryCount; } catch (e) { - this.analytics?.logger.error( - `Failed to get retry count from backoff components: ${e}` - ); + this.analytics?.logger.error(`Failed to get retry count: ${e}`); } } @@ -178,7 +156,7 @@ export class SegmentDestination extends DestinationPlugin { if (this.backoffInitialized) { try { await this.uploadStateMachine?.reset(); - if (this.batchUploadManager !== undefined && batchId !== null) { + if (this.batchUploadManager) { await this.batchUploadManager.removeBatch(batchId); } } catch (e) { @@ -227,15 +205,15 @@ export class SegmentDestination extends DestinationPlugin { classification.isRetryable && classification.errorType === 'transient' ) { - if ( - this.backoffInitialized && - this.batchUploadManager !== undefined && - batchId !== null - ) { + if (this.backoffInitialized && this.batchUploadManager) { try { + const existingRetryCount = await this.batchUploadManager.getBatchRetryCount(batchId); + if (existingRetryCount === 0) { + this.batchUploadManager.createBatch(batch, batchId); + } await this.batchUploadManager.handleRetry(batchId, res.status); } catch (e) { - // Silently handle - not critical + this.analytics?.logger.error(`Failed to handle batch retry: ${e}`); } } return { success: false, halt: false, dropped: false }; // Continue to next batch @@ -245,29 +223,21 @@ export class SegmentDestination extends DestinationPlugin { this.analytics?.logger.warn( `Permanent error (${res.status}): dropping batch (${batch.length} events)` ); - if ( - this.backoffInitialized && - this.batchUploadManager !== undefined && - batchId !== null - ) { + if (this.backoffInitialized && this.batchUploadManager) { try { await this.batchUploadManager.removeBatch(batchId); } catch (e) { - // Silently handle - not critical + this.analytics?.logger.error(`Failed to remove batch metadata: ${e}`); } } return { success: false, halt: false, dropped: true }; } catch (e) { // Network error: treat as transient - if ( - this.backoffInitialized && - this.batchUploadManager !== undefined && - batchId !== null - ) { + if (this.backoffInitialized && this.batchUploadManager) { try { await this.batchUploadManager.handleRetry(batchId, -1); } catch (retryError) { - // Silently handle - not critical + this.analytics?.logger.error(`Failed to handle retry for network error: ${retryError}`); } } throw e; @@ -302,8 +272,6 @@ export class SegmentDestination extends DestinationPlugin { configure(analytics: SegmentClient): void { super.configure(analytics); - console.log('[SegmentDestination] configure() called'); - // NOTE: We used to resolve settings early here if proxy was configured, // but now we must wait for backoff components to initialize in update() // before allowing uploads to proceed. The proxy flag is checked in update() @@ -312,13 +280,10 @@ export class SegmentDestination extends DestinationPlugin { // Enrich events with the Destination metadata this.add(new DestinationMetadataEnrichment(SEGMENT_DESTINATION_KEY)); this.add(this.queuePlugin); - console.log('[SegmentDestination] configure() complete'); } // We block sending stuff to segment until we get the settings update(settings: SegmentAPISettings, _type: UpdateType): void { - console.log('[SegmentDestination] update() called'); - const segmentSettings = settings.integrations[ this.key ] as SegmentAPIIntegration; @@ -330,7 +295,6 @@ export class SegmentDestination extends DestinationPlugin { this.apiHost = `https://${segmentSettings.apiHost}/b`; } - console.log('[SegmentDestination] Storing settings'); // Store settings for httpConfig access this.settings = settings; @@ -339,20 +303,17 @@ export class SegmentDestination extends DestinationPlugin { const httpConfig = settings.httpConfig ?? defaultHttpConfig; const config = this.analytics?.getConfig(); - console.log('[SegmentDestination] Starting backoff initialization'); - this.analytics?.logger.warn( + this.analytics?.logger.info( '[BACKOFF_INIT] Starting backoff component initialization' ); // Await the import to ensure components are fully initialized before uploads can start - void import('../backoff') + import('../backoff') .then(({ UploadStateMachine, BatchUploadManager }) => { - console.log('[SegmentDestination] Backoff module imported'); - this.analytics?.logger.warn( + this.analytics?.logger.info( '[BACKOFF_INIT] Backoff module imported successfully' ); const persistor = config?.storePersistor; - console.log(`[SegmentDestination] persistor available: ${!!persistor}`); try { // Validate configs before passing to constructors @@ -365,38 +326,32 @@ export class SegmentDestination extends DestinationPlugin { this.analytics?.logger ); - console.log('[SegmentDestination] Creating UploadStateMachine...'); this.uploadStateMachine = new UploadStateMachine( config?.writeKey ?? '', persistor, validatedRateLimitConfig, this.analytics?.logger ); - console.log('[SegmentDestination] UploadStateMachine created'); - this.analytics?.logger.warn( + this.analytics?.logger.info( '[BACKOFF_INIT] UploadStateMachine created' ); - console.log('[SegmentDestination] Creating BatchUploadManager...'); this.batchUploadManager = new BatchUploadManager( config?.writeKey ?? '', persistor, validatedBackoffConfig, this.analytics?.logger ); - console.log('[SegmentDestination] BatchUploadManager created'); - this.analytics?.logger.warn( + this.analytics?.logger.info( '[BACKOFF_INIT] BatchUploadManager created' ); // Mark as initialized ONLY after both components are created this.backoffInitialized = true; - console.log('[SegmentDestination] Backoff fully initialized'); - this.analytics?.logger.warn( + this.analytics?.logger.info( '[BACKOFF_INIT] ✅ Backoff fully initialized' ); } catch (e) { - console.error(`[SegmentDestination] CRITICAL: Failed to create backoff components: ${e}`); this.analytics?.logger.error( `[BACKOFF_INIT] ⚠️ CRITICAL: Failed to create backoff components: ${e}` ); @@ -406,20 +361,17 @@ export class SegmentDestination extends DestinationPlugin { // ALWAYS resolve settings promise after backoff initialization attempt // This allows uploads to proceed either with or without backoff this.settingsResolve(); - console.log('[SegmentDestination] Settings promise resolved'); - this.analytics?.logger.warn( + this.analytics?.logger.info( '[BACKOFF_INIT] Settings promise resolved - uploads can proceed' ); }) .catch((e) => { - console.error(`[SegmentDestination] CRITICAL: Failed to import backoff module: ${e}`); this.analytics?.logger.error( `[BACKOFF_INIT] ⚠️ CRITICAL: Failed to import backoff module: ${e}` ); // Still resolve settings to allow uploads without backoff this.settingsResolve(); - console.log('[SegmentDestination] Settings promise resolved despite error'); - this.analytics?.logger.warn( + this.analytics?.logger.info( '[BACKOFF_INIT] Settings promise resolved despite error - uploads proceeding without backoff' ); }); diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index be3637dce..4b9eb40c2 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -419,7 +419,7 @@ export type EnrichmentClosure = (event: SegmentEvent) => SegmentEvent; // State machine persistence export type UploadStateData = { - state: 'READY' | 'WAITING'; + state: 'READY' | 'RATE_LIMITED'; waitUntilTime: number; // timestamp ms globalRetryCount: number; firstFailureTime: number | null; // timestamp ms From 096721b2a4039114fc60bce356fb32bf7a71c4a7 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Fri, 6 Mar 2026 13:57:26 -0600 Subject: [PATCH 4/5] chore(core): remove JSDoc comments from new private methods Co-Authored-By: Claude Sonnet 4.5 --- packages/core/src/backoff/BatchUploadManager.ts | 4 ---- packages/core/src/backoff/UploadStateMachine.ts | 3 --- 2 files changed, 7 deletions(-) diff --git a/packages/core/src/backoff/BatchUploadManager.ts b/packages/core/src/backoff/BatchUploadManager.ts index aed95ae68..fa0cbfe51 100644 --- a/packages/core/src/backoff/BatchUploadManager.ts +++ b/packages/core/src/backoff/BatchUploadManager.ts @@ -180,10 +180,6 @@ export class BatchUploadManager { ); } - /** - * Validates persisted batch metadata on app restart - * Drops batches with corrupted or invalid timestamps - */ private async validatePersistedMetadata(): Promise { const state = await this.store.getState(); const now = Date.now(); diff --git a/packages/core/src/backoff/UploadStateMachine.ts b/packages/core/src/backoff/UploadStateMachine.ts index 734e7c7fa..32d48e2cb 100644 --- a/packages/core/src/backoff/UploadStateMachine.ts +++ b/packages/core/src/backoff/UploadStateMachine.ts @@ -53,9 +53,6 @@ export class UploadStateMachine { } } - /** - * Migrates legacy 'WAITING' state to 'RATE_LIMITED' - */ private migrateLegacyState(): void { void (async () => { try { From 2dc4497600748d9e0bf5c06865541915ac45094d Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Fri, 6 Mar 2026 13:59:25 -0600 Subject: [PATCH 5/5] chore(core): remove inline comments added in code review fixes Co-Authored-By: Claude Sonnet 4.5 --- packages/core/src/api.ts | 1 - packages/core/src/errors.ts | 1 - 2 files changed, 2 deletions(-) diff --git a/packages/core/src/api.ts b/packages/core/src/api.ts index 7f9ab3d0a..d6a280097 100644 --- a/packages/core/src/api.ts +++ b/packages/core/src/api.ts @@ -14,7 +14,6 @@ export const uploadEvents = async ({ // Create Authorization header (Basic auth format) const authHeader = 'Basic ' + btoa(writeKey + ':'); - // keepalive ensures upload completes even if app is backgrounded/closed return await fetch(url, { method: 'POST', keepalive: true, diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index cda077733..132d5552b 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -176,7 +176,6 @@ export const classifyError = ( } // 2. Check 429 special handling (if rate limit enabled) - // Rate limiting enabled by default unless explicitly disabled if (statusCode === 429 && config?.rateLimitEnabled !== false) { return { isRetryable: true, errorType: 'rate_limit' }; }