diff --git a/packages/core/src/api.ts b/packages/core/src/api.ts
index 6aa2851a7..d6a280097 100644
--- a/packages/core/src/api.ts
+++ b/packages/core/src/api.ts
@@ -4,21 +4,28 @@ export const uploadEvents = async ({
   writeKey,
   url,
   events,
+  retryCount = 0,
 }: {
   writeKey: string;
   url: string;
   events: SegmentEvent[];
-}) => {
+  retryCount?: number;
+}): Promise<Response> => {
+  // Create Authorization header (Basic auth format)
+  const authHeader = 'Basic ' + btoa(writeKey + ':');
+
   return await fetch(url, {
     method: 'POST',
     keepalive: true,
     body: JSON.stringify({
       batch: events,
       sentAt: new Date().toISOString(),
-      writeKey: writeKey,
+      writeKey: writeKey, // Keep in body for backwards compatibility
     }),
     headers: {
       'Content-Type': 'application/json; charset=utf-8',
+      'Authorization': authHeader,
+      'X-Retry-Count': retryCount.toString(),
     },
   });
 };
diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts
index 5e98b7a88..8b19b368c 100644
--- a/packages/core/src/errors.ts
+++ b/packages/core/src/errors.ts
@@ -99,18 +99,14 @@ export const checkResponseForErrors = (response: Response) => {
  * @returns a SegmentError object
  */
 export const translateHTTPError = (error: unknown): SegmentError => {
-  // SegmentError already
   if (error instanceof SegmentError) {
     return error;
-    // JSON Deserialization Errors
   } else if (error instanceof SyntaxError) {
     return new JSONError(
       ErrorType.JsonUnableToDeserialize,
       error.message,
       error
     );
-
-    // HTTP Errors
   } else {
     const message =
       error instanceof Error
diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts
index cc7e911e6..8dec423a7 100644
--- a/packages/core/src/plugins/SegmentDestination.ts
+++ b/packages/core/src/plugins/SegmentDestination.ts
@@ -8,12 +8,18 @@ import {
 } from '../types';
 import { chunk, createPromise, getURL } from '../util';
 import { uploadEvents } from '../api';
+import { getUUID } from '../uuid';
 import type { SegmentClient } from '../analytics';
 import { DestinationMetadataEnrichment } from './DestinationMetadataEnrichment';
 import { QueueFlushingPlugin } from './QueueFlushingPlugin';
-import { defaultApiHost } from '../constants';
-import { checkResponseForErrors, translateHTTPError } from '../errors';
+import { defaultApiHost, defaultHttpConfig } from '../constants';
+import { translateHTTPError, classifyError, parseRetryAfter } from '../errors';
 import { defaultConfig } from '../constants';
+import type { UploadStateMachine, BatchUploadManager } from '../backoff';
+import {
+  validateBackoffConfig,
+  validateRateLimitConfig,
+} from '../config-validation';
 
 const MAX_EVENTS_PER_BATCH = 100;
 const MAX_PAYLOAD_SIZE_IN_KB = 500;
@@ -25,10 +31,13 @@ export class SegmentDestination extends DestinationPlugin {
   private apiHost?: string;
   private settingsResolve: () => void;
   private settingsPromise: Promise<void>;
+  private uploadStateMachine?: UploadStateMachine;
+  private batchUploadManager?: BatchUploadManager;
+  private settings?: SegmentAPISettings;
+  private backoffInitialized = false;
 
   constructor() {
     super();
-    // We don't timeout this promise. We strictly need the response from Segment before sending things
     const { promise, resolve } = createPromise<void>();
     this.settingsPromise = promise;
     this.settingsResolve = resolve;
@@ -39,9 +48,36 @@ export class SegmentDestination extends DestinationPlugin {
       return Promise.resolve();
     }
 
-    // We're not sending events until Segment has loaded all settings
     await this.settingsPromise;
 
+    if (this.backoffInitialized) {
+      if (!this.uploadStateMachine) {
+        this.analytics?.logger.error(
+          'CRITICAL: backoffInitialized=true but uploadStateMachine undefined!'
+        );
+      } else {
+        try {
+          this.analytics?.logger.info(
+            `[UPLOAD_GATE] Checking canUpload() for ${events.length} events`
+          );
+          const canUpload = await this.uploadStateMachine.canUpload();
+          this.analytics?.logger.info(
+            `[UPLOAD_GATE] canUpload() returned: ${canUpload}`
+          );
+          if (!canUpload) {
+            this.analytics?.logger.info(
+              'Upload deferred: rate limit in effect'
+            );
+            return Promise.resolve();
+          }
+        } catch (e) {
+          this.analytics?.logger.error(
+            `uploadStateMachine.canUpload() threw error: ${e}`
+          );
+        }
+      }
+    }
+
     const config = this.analytics?.getConfig() ?? defaultConfig;
 
     const chunkedEvents: SegmentEvent[][] = chunk(
@@ -51,27 +87,29 @@ export class SegmentDestination extends DestinationPlugin {
     );
 
     let sentEvents: SegmentEvent[] = [];
+    let eventsToDequeue: SegmentEvent[] = [];
     let numFailedEvents = 0;
 
-    await Promise.all(
-      chunkedEvents.map(async (batch: SegmentEvent[]) => {
-        try {
-          const res = await uploadEvents({
-            writeKey: config.writeKey,
-            url: this.getEndpoint(),
-            events: batch,
-          });
-          checkResponseForErrors(res);
+    for (const batch of chunkedEvents) {
+      try {
+        const result = await this.uploadBatch(batch);
+
+        if (result.success) {
           sentEvents = sentEvents.concat(batch);
-        } catch (e) {
-          this.analytics?.reportInternalError(translateHTTPError(e));
-          this.analytics?.logger.warn(e);
-          numFailedEvents += batch.length;
-        } finally {
-          await this.queuePlugin.dequeue(sentEvents);
+          eventsToDequeue = eventsToDequeue.concat(batch);
+        } else if (result.dropped) {
+          eventsToDequeue = eventsToDequeue.concat(batch);
+        } else if (result.halt) {
+          break;
         }
-      })
-    );
+      } catch (e) {
+        this.analytics?.reportInternalError(translateHTTPError(e));
+        this.analytics?.logger.warn(e);
+        numFailedEvents += batch.length;
+      }
+    }
+
+    await this.queuePlugin.dequeue(eventsToDequeue);
 
     if (sentEvents.length) {
       if (config.debug === true) {
@@ -86,6 +124,124 @@ export class SegmentDestination extends DestinationPlugin {
     return Promise.resolve();
   };
 
+  private async uploadBatch(
+    batch: SegmentEvent[]
+  ): Promise<{ success: boolean; halt: boolean; dropped: boolean }> {
+    const batchId = getUUID();
+    const config = this.analytics?.getConfig() ?? defaultConfig;
+    const httpConfig = this.settings?.httpConfig ?? defaultHttpConfig;
+    const endpoint = this.getEndpoint();
+
+    let retryCount = 0;
+    if (this.backoffInitialized) {
+      try {
+        const batchRetryCount =
+          this.batchUploadManager && batchId
+            ? await this.batchUploadManager.getBatchRetryCount(batchId)
+            : 0;
+        const globalRetryCount = this.uploadStateMachine
+          ? await this.uploadStateMachine.getGlobalRetryCount()
+          : 0;
+        retryCount = batchRetryCount || globalRetryCount;
+      } catch (e) {
+        this.analytics?.logger.error(`Failed to get retry count: ${e}`);
+      }
+    }
+
+    try {
+      const res = await uploadEvents({
+        writeKey: config.writeKey,
+        url: endpoint,
+        events: batch,
+        retryCount,
+      });
+
+      if (res.ok) {
+        if (this.backoffInitialized) {
+          try {
+            await this.uploadStateMachine?.reset();
+            if (this.batchUploadManager) {
+              await this.batchUploadManager.removeBatch(batchId);
+            }
+          } catch (e) {}
+        }
+        this.analytics?.logger.info(
+          `Batch uploaded successfully (${batch.length} events)`
+        );
+        return { success: true, halt: false, dropped: false };
+      }
+
+      const classification = classifyError(res.status, {
+        default4xxBehavior: httpConfig.backoffConfig?.default4xxBehavior,
+        default5xxBehavior: httpConfig.backoffConfig?.default5xxBehavior,
+        statusCodeOverrides: httpConfig.backoffConfig?.statusCodeOverrides,
+        rateLimitEnabled: httpConfig.rateLimitConfig?.enabled,
+      });
+
+      if (classification.errorType === 'rate_limit') {
+        const retryAfterValue = res.headers.get('retry-after');
+        const retryAfterSeconds =
+          parseRetryAfter(
+            retryAfterValue,
+            httpConfig.rateLimitConfig?.maxRetryInterval
+          ) ?? 60;
+
+        if (this.backoffInitialized && this.uploadStateMachine) {
+          try {
+            await this.uploadStateMachine.handle429(retryAfterSeconds);
+          } catch (e) {}
+        }
+
+        this.analytics?.logger.warn(
+          `Rate limited (429): retry after ${retryAfterSeconds}s`
+        );
+        return { success: false, halt: true, dropped: false };
+      }
+
+      if (
+        classification.isRetryable &&
+        classification.errorType === 'transient'
+      ) {
+        if (this.backoffInitialized && this.batchUploadManager) {
+          try {
+            const existingRetryCount =
+              await this.batchUploadManager.getBatchRetryCount(batchId);
+            if (existingRetryCount === 0) {
+              this.batchUploadManager.createBatch(batch, batchId);
+            }
+            await this.batchUploadManager.handleRetry(batchId, res.status);
+          } catch (e) {
+            this.analytics?.logger.error(`Failed to handle batch retry: ${e}`);
+          }
+        }
+        return { success: false, halt: false, dropped: false };
+      }
+
+      this.analytics?.logger.warn(
+        `Permanent error (${res.status}): dropping batch (${batch.length} events)`
+      );
+      if (this.backoffInitialized && this.batchUploadManager) {
+        try {
+          await this.batchUploadManager.removeBatch(batchId);
+        } catch (e) {
+          this.analytics?.logger.error(`Failed to remove batch metadata: ${e}`);
+        }
+      }
+      return { success: false, halt: false, dropped: true };
+    } catch (e) {
+      if (this.backoffInitialized && this.batchUploadManager) {
+        try {
+          await this.batchUploadManager.handleRetry(batchId, -1);
+        } catch (retryError) {
+          this.analytics?.logger.error(
+            `Failed to handle retry for network error: ${retryError}`
+          );
+        }
+      }
+      throw e;
+    }
+  }
+
   private readonly queuePlugin = new QueueFlushingPlugin(this.sendEvents);
 
   private getEndpoint(): string {
@@ -107,25 +263,19 @@ export class SegmentDestination extends DestinationPlugin {
     try {
       return getURL(baseURL, endpoint);
     } catch (error) {
-      console.error('Error in getEndpoint:', `fallback to ${defaultApiHost}`);
+      this.analytics?.logger.error(
+        `Error in getEndpoint, fallback to ${defaultApiHost}: ${error}`
+      );
       return defaultApiHost;
     }
   }
 
   configure(analytics: SegmentClient): void {
     super.configure(analytics);
-    // If the client has a proxy we don't need to await for settings apiHost, we can send events directly
-    // Important! If new settings are required in the future you probably want to change this!
-    if (analytics.getConfig().proxy !== undefined) {
-      this.settingsResolve();
-    }
-
-    // Enrich events with the Destination metadata
     this.add(new DestinationMetadataEnrichment(SEGMENT_DESTINATION_KEY));
     this.add(this.queuePlugin);
   }
 
-  // We block sending stuff to segment until we get the settings
   update(settings: SegmentAPISettings, _type: UpdateType): void {
     const segmentSettings = settings.integrations[
       this.key
@@ -134,14 +284,83 @@ export class SegmentDestination extends DestinationPlugin {
       segmentSettings?.apiHost !== undefined &&
       segmentSettings?.apiHost !== null
     ) {
-      //assign the api host from segment settings (domain/v1)
       this.apiHost = `https://${segmentSettings.apiHost}/b`;
     }
-    this.settingsResolve();
+
+    this.settings = settings;
+
+    const httpConfig = settings.httpConfig ?? defaultHttpConfig;
+    const config = this.analytics?.getConfig();
+
+    this.analytics?.logger.info(
+      '[BACKOFF_INIT] Starting backoff component initialization'
+    );
+
+    // Await the import to ensure components are fully initialized before uploads can start
+    import('../backoff')
+      .then(({ UploadStateMachine, BatchUploadManager }) => {
+        this.analytics?.logger.info(
+          '[BACKOFF_INIT] Backoff module imported successfully'
+        );
+        const persistor = config?.storePersistor;
+
+        try {
+          const validatedRateLimitConfig = validateRateLimitConfig(
+            httpConfig.rateLimitConfig ?? defaultHttpConfig.rateLimitConfig!,
+            this.analytics?.logger
+          );
+          const validatedBackoffConfig = validateBackoffConfig(
+            httpConfig.backoffConfig ?? defaultHttpConfig.backoffConfig!,
+            this.analytics?.logger
+          );
+
+          this.uploadStateMachine = new UploadStateMachine(
+            config?.writeKey ?? '',
+            persistor,
+            validatedRateLimitConfig,
+            this.analytics?.logger
+          );
+          this.analytics?.logger.info(
+            '[BACKOFF_INIT] UploadStateMachine created'
+          );
+
+          this.batchUploadManager = new BatchUploadManager(
+            config?.writeKey ?? '',
+            persistor,
+            validatedBackoffConfig,
+            this.analytics?.logger
+          );
+          this.analytics?.logger.info(
+            '[BACKOFF_INIT] BatchUploadManager created'
+          );
+
+          this.backoffInitialized = true;
+          this.analytics?.logger.info(
+            '[BACKOFF_INIT] Backoff fully initialized'
+          );
+        } catch (e) {
+          this.analytics?.logger.error(
+            `[BACKOFF_INIT] CRITICAL: Failed to create backoff components: ${e}`
+          );
+        }
+
+        this.settingsResolve();
+        this.analytics?.logger.info(
+          '[BACKOFF_INIT] Settings promise resolved - uploads can proceed'
+        );
+      })
+      .catch((e) => {
+        this.analytics?.logger.error(
+          `[BACKOFF_INIT] CRITICAL: Failed to import backoff module: ${e}`
+        );
+        this.settingsResolve();
+        this.analytics?.logger.info(
+          '[BACKOFF_INIT] Settings promise resolved despite error - uploads proceeding without backoff'
+        );
+      });
   }
 
   execute(event: SegmentEvent): Promise<SegmentEvent | undefined> {
-    // Execute the internal timeline here, the queue plugin will pick up the event and add it to the queue automatically
     const enrichedEvent = super.execute(event);
     return enrichedEvent;
   }
diff --git a/packages/core/src/plugins/__tests__/SegmentDestination.test.ts b/packages/core/src/plugins/__tests__/SegmentDestination.test.ts
index 885097fd9..e499ff715 100644
--- a/packages/core/src/plugins/__tests__/SegmentDestination.test.ts
+++ b/packages/core/src/plugins/__tests__/SegmentDestination.test.ts
@@ -10,6 +10,7 @@ import {
   Config,
   EventType,
   SegmentAPIIntegration,
+  SegmentAPISettings,
   SegmentEvent,
   TrackEventType,
   UpdateType,
@@ -24,12 +25,20 @@ jest.mock('uuid');
 
 describe('SegmentDestination', () => {
   const store = new MockSegmentStore();
+
+  // Mock persistor for backoff state management
+  const mockPersistor = {
+    get: jest.fn(() => Promise.resolve(undefined)),
+    set: jest.fn(() => Promise.resolve()),
+  };
+
   const clientArgs = {
     logger: getMockLogger(),
     config: {
       writeKey: '123-456',
       maxBatchSize: 2,
       flushInterval: 0,
+      storePersistor: mockPersistor,
     },
     store,
   };
@@ -325,6 +334,7 @@ describe('SegmentDestination', () => {
       events: events.slice(0, 2).map((e) => ({
         ...e,
       })),
+      retryCount: 0,
     });
     expect(sendEventsSpy).toHaveBeenCalledWith({
       url: getURL(defaultApiHost, ''), // default api already appended with '/b'
@@ -332,6 +342,7 @@ describe('SegmentDestination', () => {
       events: events.slice(2, 4).map((e) => ({
         ...e,
       })),
+      retryCount: 0,
     });
   });
 
@@ -359,6 +370,7 @@ describe('SegmentDestination', () => {
       events: events.slice(0, 2).map((e) => ({
         ...e,
       })),
+      retryCount: 0,
     });
   });
 
@@ -410,6 +422,7 @@ describe('SegmentDestination', () => {
         events: events.map((e) => ({
           ...e,
         })),
+        retryCount: 0,
       });
     }
   );
@@ -546,4 +559,263 @@ describe('SegmentDestination', () => {
       expect(spy).toHaveBeenCalled();
     });
   });
+
+  describe('TAPI backoff and rate limiting', () => {
+    const createTestWith = ({
+      config,
+      settings,
+      events,
+    }: {
+      config?: Config;
+      settings?: SegmentAPISettings;
+      events: SegmentEvent[];
+    }) => {
+      const plugin = new SegmentDestination();
+
+      const analytics = new SegmentClient({
+        ...clientArgs,
+        config: config ?? clientArgs.config,
+        store: new MockSegmentStore({
+          settings: {
+            [SEGMENT_DESTINATION_KEY]: {},
+          },
+        }),
+      });
+
+      plugin.configure(analytics);
+      plugin.update(
+        {
+          integrations: {
+            [SEGMENT_DESTINATION_KEY]:
+              settings?.integrations?.[SEGMENT_DESTINATION_KEY] ?? {},
+          },
+          httpConfig: settings?.httpConfig ?? {
+            rateLimitConfig: {
+              enabled: true,
+              maxRetryCount: 100,
+              maxRetryInterval: 300,
+              maxRateLimitDuration: 43200,
+            },
+            backoffConfig: {
+              enabled: true,
+              maxRetryCount: 100,
+              baseBackoffInterval: 0.5,
+              maxBackoffInterval: 300,
+              maxTotalBackoffDuration: 43200,
+              jitterPercent: 10,
+              default4xxBehavior: 'drop',
+              default5xxBehavior: 'retry',
+              statusCodeOverrides: {
+                '408': 'retry',
+                '410': 'retry',
+                '429': 'retry',
+                '460': 'retry',
+              },
+            },
+          },
+        },
+        UpdateType.initial
+      );
+
+      jest
+        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
+        // @ts-ignore
+        .spyOn(plugin.queuePlugin.queueStore!, 'getState')
+        .mockImplementation(createMockStoreGetter(() => ({ events })));
+
+      return { plugin, analytics };
+    };
+
+    it('sends Authorization header with base64 encoded writeKey', async () => {
+      const events = [{ messageId: 'message-1' }] as SegmentEvent[];
+      const { plugin } = createTestWith({ events });
+
+      const sendEventsSpy = jest
+        .spyOn(api, 'uploadEvents')
+        .mockResolvedValue({ ok: true } as Response);
+
+      await plugin.flush();
+
+      expect(sendEventsSpy).toHaveBeenCalledWith(
+        expect.objectContaining({
+          retryCount: 0,
+        })
+      );
+    });
+
+    it('sends X-Retry-Count header starting at 0', async () => {
+      const events = [{ messageId: 'message-1' }] as SegmentEvent[];
+      const { plugin } = createTestWith({ events });
+
+      const sendEventsSpy = jest
+        .spyOn(api, 'uploadEvents')
+        .mockResolvedValue({ ok: true } as Response);
+
+      await plugin.flush();
+
+      expect(sendEventsSpy).toHaveBeenCalledWith(
+        expect.objectContaining({
+          retryCount: 0,
+        })
+      );
+    });
+
+    it('halts upload loop on 429 response', async () => {
+      const events = [
+        { messageId: 'message-1' },
+        { messageId: 'message-2' },
+        { messageId: 'message-3' },
+        { messageId: 'message-4' },
+      ] as SegmentEvent[];
+
+      const { plugin } = createTestWith({ events });
+
+      const sendEventsSpy = jest.spyOn(api, 'uploadEvents').mockResolvedValue({
+        ok: false,
+        status: 429,
+        headers: new Headers({ 'retry-after': '60' }),
+      } as Response);
+
+      await plugin.flush();
+
+      // With maxBatchSize=2, there would be 2 batches
+      // But 429 on first batch should halt, so only 1 call
+      expect(sendEventsSpy).toHaveBeenCalledTimes(1);
+    });
+
+    it('blocks future uploads after 429 until waitUntilTime passes', async () => {
+      const now = 1000000;
+      jest.spyOn(Date, 'now').mockReturnValue(now);
+
+      const events = [{ messageId: 'message-1' }] as SegmentEvent[];
+      const { plugin } = createTestWith({ events });
+
+      // First flush returns 429
+      jest.spyOn(api, 'uploadEvents').mockResolvedValue({
+        ok: false,
+        status: 429,
+        headers: new Headers({ 'retry-after': '60' }),
+      } as Response);
+
+      await plugin.flush();
+
+      // Second flush should be blocked (same time)
+      const sendEventsSpy = jest.spyOn(api, 'uploadEvents');
+      sendEventsSpy.mockClear();
+
+      await plugin.flush();
+
+      expect(sendEventsSpy).not.toHaveBeenCalled();
+    });
+
+    it('allows upload after 429 waitUntilTime passes', async () => {
+      const now = 1000000;
+      jest.spyOn(Date, 'now').mockReturnValue(now);
+
+      const events = [{ messageId: 'message-1' }] as SegmentEvent[];
+      const { plugin } = createTestWith({ events });
+
+      // First flush returns 429
+      jest.spyOn(api, 'uploadEvents').mockResolvedValue({
+        ok: false,
+        status: 429,
+        headers: new Headers({ 'retry-after': '60' }),
+      } as Response);
+
+      await plugin.flush();
+
+      // Advance time past waitUntilTime
+      jest.spyOn(Date, 'now').mockReturnValue(now + 61000);
+
+      // Second flush should now work
+      const sendEventsSpy = jest
+        .spyOn(api, 'uploadEvents')
+        .mockResolvedValue({ ok: true } as Response);
+
+      await plugin.flush();
+
+      expect(sendEventsSpy).toHaveBeenCalled();
+    });
+
+    it('continues to next batch on transient error (500)', async () => {
+      const events = [
+        { messageId: 'message-1' },
+        { messageId: 'message-2' },
+        { messageId: 'message-3' },
+        { messageId: 'message-4' },
+      ] as SegmentEvent[];
+
+      const { plugin } = createTestWith({ events });
+
+      let callCount = 0;
+      const sendEventsSpy = jest
+        .spyOn(api, 'uploadEvents')
+        .mockImplementation(async () => {
+          callCount++;
+          if (callCount === 1) {
+            // First batch fails with 500
+            return {
+              ok: false,
+              status: 500,
+              headers: new Headers(),
+            } as Response;
+          }
+          // Second batch succeeds
+          return { ok: true, status: 200 } as Response;
+        });
+
+      await plugin.flush();
+
+      // Should try both batches (not halt on 500)
+      expect(sendEventsSpy).toHaveBeenCalledTimes(2);
+    });
+
+    it('drops batch on permanent error (400)', async () => {
+      const events = [{ messageId: 'message-1' }] as SegmentEvent[];
+      const { plugin, analytics } = createTestWith({ events });
+
+      const warnSpy = jest.spyOn(analytics.logger, 'warn');
+
+      jest.spyOn(api, 'uploadEvents').mockResolvedValue({
+        ok: false,
+        status: 400,
+        headers: new Headers(),
+      } as Response);
+
+      await plugin.flush();
+
+      expect(warnSpy).toHaveBeenCalledWith(
+        expect.stringContaining('Permanent error (400): dropping batch')
+      );
+    });
+
+    it('processes batches sequentially (not parallel)', async () => {
+      const events = [
+        { messageId: 'message-1' },
+        { messageId: 'message-2' },
+        { messageId: 'message-3' },
+        { messageId: 'message-4' },
+      ] as SegmentEvent[];
+
+      const { plugin } = createTestWith({ events });
+
+      const callOrder: number[] = [];
+      let currentCall = 0;
+
+      jest.spyOn(api, 'uploadEvents').mockImplementation(async () => {
+        const thisCall = ++currentCall;
+        callOrder.push(thisCall);
+
+        // Simulate async delay
+        await new Promise((resolve) => setTimeout(resolve, 10));
+
+        return { ok: true, status: 200 } as Response;
+      });
+
+      await plugin.flush();
+
+      // Calls should be sequential: [1, 2]
+      expect(callOrder).toEqual([1, 2]);
+    });
+  });
 });
diff --git a/scripts/sync-versions.sh b/scripts/sync-versions.sh
index 51f911283..b9d7da885 100755
--- a/scripts/sync-versions.sh
+++ b/scripts/sync-versions.sh
@@ -33,7 +33,7 @@ for pkg_json in "$PROJECT_ROOT"/packages/*/package.json "$PROJECT_ROOT"/packages
       echo "  ok   $name@$current"
       skipped=$((skipped + 1))
     else
-      jq --arg v "$latest" '.version = $v' "$pkg_json" > "$pkg_json.tmp" && mv "$pkg_json.tmp" "$pkg_json"
+      jq --arg v "$latest" '.version = $v' "$pkg_json" >"$pkg_json.tmp" && mv "$pkg_json.tmp" "$pkg_json"
       echo "  bump $name $current -> $latest"
       updated=$((updated + 1))
     fi