diff --git a/sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts b/sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts index 0a37b53d8..af678b3d6 100644 --- a/sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts +++ b/sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts @@ -53,6 +53,11 @@ const ORACLE_DEFAULT_ID = getOracleId( export class WebSocketDriftClientAccountSubscriberV2 implements DriftClientAccountSubscriber { + // Global counters for diagnostics + static totalSocketsOpened = 0; + static currentActiveSockets = 0; + static instancesCreated = 0; + static subscribeCalls = 0; isSubscribed: boolean; program: Program; commitment?: Commitment; @@ -82,6 +87,12 @@ export class WebSocketDriftClientAccountSubscriberV2 spotOracleMap = new Map(); spotOracleStringMap = new Map(); oracleSubscribers = new Map>(); + // Track in-flight oracle subscriptions to prevent duplicate subscriptions/races + private oracleSubscribePromises = new Map>(); + // Debounced refresh handles to avoid side-effects in getters + private perpOracleMapRefreshTimeout?: ReturnType; + private spotOracleMapRefreshTimeout?: ReturnType; + private oracleReconcileTimeout?: ReturnType; delistedMarketSetting: DelistedMarketSetting; initialPerpMarketAccountData: Map; @@ -124,10 +135,108 @@ export class WebSocketDriftClientAccountSubscriberV2 }); this.rpc = rpc; this.rpcSubscriptions = rpcSubscriptions; + + // Instance counter + WebSocketDriftClientAccountSubscriberV2.instancesCreated += 1; + if (this.resubOpts?.logResubMessages) { + console.log( + `[WS] WebSocketDriftClientAccountSubscriberV2 instance created #${WebSocketDriftClientAccountSubscriberV2.instancesCreated}` + ); + } + + // Monkey-patch rpcSubscriptions internals for minimal WS open/close logging + try { + const subsAny = this.rpcSubscriptions as any; + const PATCHED_FLAG = '__driftWsPatched'; + + const attachLoggingToWs = (ws: any) => { + if (!ws || ws.__driftPatched) return; + ws.__driftPatched = true; + WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened += 1; + WebSocketDriftClientAccountSubscriberV2.currentActiveSockets += 1; + const readyState = ws.readyState; + const activeSubs = + (subsAny?._activeSubscriptions && subsAny._activeSubscriptions.size) || 0; + if (this.resubOpts?.logResubMessages) { + console.log( + `[WS OPEN] readyState=${readyState} activeSubs=${activeSubs} totalOpened=${WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened} currentActive=${WebSocketDriftClientAccountSubscriberV2.currentActiveSockets}` + ); + } + const onClose = () => { + WebSocketDriftClientAccountSubscriberV2.currentActiveSockets = Math.max( + 0, + WebSocketDriftClientAccountSubscriberV2.currentActiveSockets - 1 + ); + const rs = ws.readyState; + const act = + (subsAny?._activeSubscriptions && + subsAny._activeSubscriptions.size) || 0; + if (this.resubOpts?.logResubMessages) { + console.log( + `[WS CLOSE] readyState=${rs} activeSubs=${act} totalOpened=${WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened} currentActive=${WebSocketDriftClientAccountSubscriberV2.currentActiveSockets}` + ); + } + }; + // Support browser/node ws + if (typeof ws.addEventListener === 'function') { + ws.addEventListener('close', onClose); + } + if (typeof ws.on === 'function') { + ws.on('close', onClose); + } + }; + + if (!subsAny[PATCHED_FLAG]) { + subsAny[PATCHED_FLAG] = true; + const originalConnect = subsAny._connectWebsocket; + if (typeof originalConnect === 'function') { + subsAny._connectWebsocket = (...args: any[]) => { + const prevWs = subsAny._ws; + const result = originalConnect.apply(subsAny, args); + const hook = () => { + try { + const ws = subsAny._ws; + // Only treat as new if object identity changed or not patched yet + if (ws && ws !== prevWs) { + // may not be open yet; still attach listeners and count as opened + attachLoggingToWs(ws); + } + } catch (_e) { + // swallow + } + }; + try { + if (result && typeof result.then === 'function') { + result.then(() => hook()).catch(() => hook()); + } else { + queueMicrotask(hook); + } + } catch (_e) { + setTimeout(hook, 0); + } + return result; + }; + } + + // Fallback: if a WS already exists, attach once and treat as opened + if (subsAny._ws && !subsAny._ws.__driftPatched) { + attachLoggingToWs(subsAny._ws); + } + } + } catch (_e) { + // swallow + } } public async subscribe(): Promise { try { + // Subscribe call counter + WebSocketDriftClientAccountSubscriberV2.subscribeCalls += 1; + if (this.resubOpts?.logResubMessages) { + console.log( + `[WS] WebSocketDriftClientAccountSubscriberV2.subscribe() call #${WebSocketDriftClientAccountSubscriberV2.subscribeCalls}` + ); + } const startTime = performance.now(); if (this.isSubscribed) { console.log( @@ -312,21 +421,6 @@ export class WebSocketDriftClientAccountSubscriberV2 })(), ]); - // const initialPerpMarketDataFromLatestData = new Map( - // Array.from(this.perpMarketAccountLatestData.values()).map((data) => [ - // data.data.marketIndex, - // data.data, - // ]) - // ); - // const initialSpotMarketDataFromLatestData = new Map( - // Array.from(this.spotMarketAccountLatestData.values()).map((data) => [ - // data.data.marketIndex, - // data.data, - // ]) - // ); - // this.initialPerpMarketAccountData = initialPerpMarketDataFromLatestData; - // this.initialSpotMarketAccountData = initialSpotMarketDataFromLatestData; - await this.handleDelistedMarketOracles(); await Promise.all([this.setPerpOracleMap(), this.setSpotOracleMap()]); @@ -355,6 +449,17 @@ export class WebSocketDriftClientAccountSubscriberV2 console.error('Subscription failed:', error); this.isSubscribing = false; this.subscriptionPromiseResolver(false); + // Best-effort cleanup of any partially created subscriptions to avoid leaks + try { + await Promise.all([ + this.stateAccountSubscriber?.unsubscribe(), + this.perpMarketAllAccountsSubscriber?.unsubscribe(), + this.spotMarketAllAccountsSubscriber?.unsubscribe(), + this.unsubscribeFromOracles(), + ]); + } catch (_cleanupErr) { + // swallow cleanup errors + } return false; } } @@ -495,6 +600,22 @@ export class WebSocketDriftClientAccountSubscriberV2 try { const oracleId = getOracleId(oracleInfo.publicKey, oracleInfo.source); + // If already subscribed, return + if (this.oracleSubscribers.has(oracleId)) { + return true; + } + // If a subscribe is already in-flight for this oracle, reuse it + const inFlight = this.oracleSubscribePromises.get(oracleId); + if (inFlight) { + return await inFlight; + } + + const subscribePromise = (async (): Promise => { + // Double-check after we were scheduled + if (this.oracleSubscribers.has(oracleId)) { + return true; + } + const client = this.oracleClientCache.get( oracleInfo.source, this.program.provider.connection, @@ -517,19 +638,25 @@ export class WebSocketDriftClientAccountSubscriberV2 if (initialOraclePriceData) { accountSubscriber.setData(initialOraclePriceData); } - await accountSubscriber.subscribe((data: OraclePriceData) => { - this.eventEmitter.emit( - 'oraclePriceUpdate', - oracleInfo.publicKey, - oracleInfo.source, - data - ); - this.eventEmitter.emit('update'); - }); + await accountSubscriber.subscribe((data: OraclePriceData) => { + this.eventEmitter.emit( + 'oraclePriceUpdate', + oracleInfo.publicKey, + oracleInfo.source, + data + ); + this.eventEmitter.emit('update'); + }); - this.oracleSubscribers.set(oracleId, accountSubscriber); + // Only after successful subscribe, store in map + this.oracleSubscribers.set(oracleId, accountSubscriber); + return true; + })().finally(() => { + this.oracleSubscribePromises.delete(oracleId); + }); - return true; + this.oracleSubscribePromises.set(oracleId, subscribePromise); + return await subscribePromise; } catch (error) { console.error( `Failed to subscribe to oracle ${oracleInfo.publicKey.toString()}:`, @@ -611,6 +738,8 @@ export class WebSocketDriftClientAccountSubscriberV2 this.perpOracleStringMap.set(perpMarketIndex, oracleId); } await Promise.all(addOraclePromises); + // Debounce oracle reconciliation to run once after both perp/spot map updates + this.scheduleOracleReconcile(); } async setSpotOracleMap() { @@ -636,6 +765,8 @@ export class WebSocketDriftClientAccountSubscriberV2 this.spotOracleStringMap.set(spotMarketIndex, oracleId); } await Promise.all(addOraclePromises); + // Debounce oracle reconciliation to run once after both perp/spot map updates + this.scheduleOracleReconcile(); } async handleDelistedMarketOracles(): Promise { @@ -710,6 +841,7 @@ export class WebSocketDriftClientAccountSubscriberV2 public getOraclePriceDataAndSlotForPerpMarket( marketIndex: number ): DataAndSlot | undefined { + this.assertIsSubscribed(); const perpMarketAccount = this.getMarketAccountAndSlot(marketIndex); const oracle = this.perpOracleMap.get(marketIndex); const oracleId = this.perpOracleStringMap.get(marketIndex); @@ -718,8 +850,8 @@ export class WebSocketDriftClientAccountSubscriberV2 } if (!perpMarketAccount.data.amm.oracle.equals(oracle)) { - // If the oracle has changed, we need to update the oracle map in background - this.setPerpOracleMap(); + // Schedule a debounced oracle map refresh instead of immediate side-effect + this.schedulePerpOracleMapRefresh(); } return this.getOraclePriceDataAndSlot(oracleId); @@ -728,6 +860,7 @@ export class WebSocketDriftClientAccountSubscriberV2 public getOraclePriceDataAndSlotForSpotMarket( marketIndex: number ): DataAndSlot | undefined { + this.assertIsSubscribed(); const spotMarketAccount = this.getSpotMarketAccountAndSlot(marketIndex); const oracle = this.spotOracleMap.get(marketIndex); const oracleId = this.spotOracleStringMap.get(marketIndex); @@ -736,10 +869,90 @@ export class WebSocketDriftClientAccountSubscriberV2 } if (!spotMarketAccount.data.oracle.equals(oracle)) { - // If the oracle has changed, we need to update the oracle map in background - this.setSpotOracleMap(); + // Schedule a debounced oracle map refresh instead of immediate side-effect + this.scheduleSpotOracleMapRefresh(); } return this.getOraclePriceDataAndSlot(oracleId); } + + /** + * Debounced refreshers to avoid side-effects in getters and reduce races + */ + private schedulePerpOracleMapRefresh(): void { + if (this.perpOracleMapRefreshTimeout) { + clearTimeout(this.perpOracleMapRefreshTimeout); + } + this.perpOracleMapRefreshTimeout = setTimeout(async () => { + try { + await this.setPerpOracleMap(); + } catch (_e) { + // swallow + } + }, 50); + } + + private scheduleSpotOracleMapRefresh(): void { + if (this.spotOracleMapRefreshTimeout) { + clearTimeout(this.spotOracleMapRefreshTimeout); + } + this.spotOracleMapRefreshTimeout = setTimeout(async () => { + try { + await this.setSpotOracleMap(); + } catch (_e) { + // swallow + } + }, 50); + } + + private scheduleOracleReconcile(): void { + if (this.oracleReconcileTimeout) { + clearTimeout(this.oracleReconcileTimeout); + } + this.oracleReconcileTimeout = setTimeout(async () => { + try { + await this.reconcileOracleSubscribers(); + } catch (_e) { + // swallow + } + }, 75); + } + + /** + * Ensure we only keep oracle subscriptions that are currently referenced by any market. + * Subscribe missing ones (if any remained due to race), and unsubscribe extras. + */ + private async reconcileOracleSubscribers(): Promise { + // Build desired set from current maps + const desired = new Set(); + for (const id of this.perpOracleStringMap.values()) { + if (id) desired.add(id); + } + for (const id of this.spotOracleStringMap.values()) { + if (id) desired.add(id); + } + + // Subscribe to any desired oracles that aren't yet active or in-flight + const subscribePromises: Promise[] = []; + for (const id of desired) { + if (!this.oracleSubscribers.has(id) && !this.oracleSubscribePromises.has(id)) { + const { publicKey, source } = getPublicKeyAndSourceFromOracleId(id); + // Skip default oracle id + if (!publicKey.equals(PublicKey.default)) { + subscribePromises.push(this.addOracle({ publicKey, source })); + } + } + } + await Promise.all(subscribePromises); + + // Unsubscribe from any oracles that are no longer desired + const unsubscribePromises: Promise[] = []; + for (const [id, sub] of this.oracleSubscribers.entries()) { + if (!desired.has(id)) { + unsubscribePromises.push(sub.unsubscribe()); + this.oracleSubscribers.delete(id); + } + } + await Promise.all(unsubscribePromises); + } }