Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 243 additions & 30 deletions sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ const ORACLE_DEFAULT_ID = getOracleId(
export class WebSocketDriftClientAccountSubscriberV2
implements DriftClientAccountSubscriber
{
// Global counters for diagnostics
static totalSocketsOpened = 0;
static currentActiveSockets = 0;
static instancesCreated = 0;
static subscribeCalls = 0;
isSubscribed: boolean;
program: Program;
commitment?: Commitment;
Expand Down Expand Up @@ -82,6 +87,12 @@ export class WebSocketDriftClientAccountSubscriberV2
spotOracleMap = new Map<number, PublicKey>();
spotOracleStringMap = new Map<number, string>();
oracleSubscribers = new Map<string, AccountSubscriber<OraclePriceData>>();
// Track in-flight oracle subscriptions to prevent duplicate subscriptions/races
private oracleSubscribePromises = new Map<string, Promise<boolean>>();
// Debounced refresh handles to avoid side-effects in getters
private perpOracleMapRefreshTimeout?: ReturnType<typeof setTimeout>;
private spotOracleMapRefreshTimeout?: ReturnType<typeof setTimeout>;
private oracleReconcileTimeout?: ReturnType<typeof setTimeout>;
delistedMarketSetting: DelistedMarketSetting;

initialPerpMarketAccountData: Map<number, PerpMarketAccount>;
Expand Down Expand Up @@ -124,10 +135,108 @@ export class WebSocketDriftClientAccountSubscriberV2
});
this.rpc = rpc;
this.rpcSubscriptions = rpcSubscriptions;

// Instance counter
WebSocketDriftClientAccountSubscriberV2.instancesCreated += 1;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS] WebSocketDriftClientAccountSubscriberV2 instance created #${WebSocketDriftClientAccountSubscriberV2.instancesCreated}`
);
}

// Monkey-patch rpcSubscriptions internals for minimal WS open/close logging
try {
const subsAny = this.rpcSubscriptions as any;
const PATCHED_FLAG = '__driftWsPatched';

const attachLoggingToWs = (ws: any) => {
if (!ws || ws.__driftPatched) return;
ws.__driftPatched = true;
WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened += 1;
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets += 1;
const readyState = ws.readyState;
const activeSubs =
(subsAny?._activeSubscriptions && subsAny._activeSubscriptions.size) || 0;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS OPEN] readyState=${readyState} activeSubs=${activeSubs} totalOpened=${WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened} currentActive=${WebSocketDriftClientAccountSubscriberV2.currentActiveSockets}`
);
}
const onClose = () => {
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets = Math.max(
0,
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets - 1
);
const rs = ws.readyState;
const act =
(subsAny?._activeSubscriptions &&
subsAny._activeSubscriptions.size) || 0;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS CLOSE] readyState=${rs} activeSubs=${act} totalOpened=${WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened} currentActive=${WebSocketDriftClientAccountSubscriberV2.currentActiveSockets}`
);
}
};
// Support browser/node ws
if (typeof ws.addEventListener === 'function') {
ws.addEventListener('close', onClose);
}
if (typeof ws.on === 'function') {
ws.on('close', onClose);
}
};

if (!subsAny[PATCHED_FLAG]) {
subsAny[PATCHED_FLAG] = true;
const originalConnect = subsAny._connectWebsocket;
if (typeof originalConnect === 'function') {
subsAny._connectWebsocket = (...args: any[]) => {
const prevWs = subsAny._ws;
const result = originalConnect.apply(subsAny, args);
const hook = () => {
try {
const ws = subsAny._ws;
// Only treat as new if object identity changed or not patched yet
if (ws && ws !== prevWs) {
// may not be open yet; still attach listeners and count as opened
attachLoggingToWs(ws);
}
} catch (_e) {
// swallow
}
};
try {
if (result && typeof result.then === 'function') {
result.then(() => hook()).catch(() => hook());
} else {
queueMicrotask(hook);
}
} catch (_e) {
setTimeout(hook, 0);
}
return result;
};
}

// Fallback: if a WS already exists, attach once and treat as opened
if (subsAny._ws && !subsAny._ws.__driftPatched) {
attachLoggingToWs(subsAny._ws);
}
}
} catch (_e) {
// swallow
}
}

public async subscribe(): Promise<boolean> {
try {
// Subscribe call counter
WebSocketDriftClientAccountSubscriberV2.subscribeCalls += 1;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS] WebSocketDriftClientAccountSubscriberV2.subscribe() call #${WebSocketDriftClientAccountSubscriberV2.subscribeCalls}`
);
}
const startTime = performance.now();
if (this.isSubscribed) {
console.log(
Expand Down Expand Up @@ -312,21 +421,6 @@ export class WebSocketDriftClientAccountSubscriberV2
})(),
]);

// const initialPerpMarketDataFromLatestData = new Map(
// Array.from(this.perpMarketAccountLatestData.values()).map((data) => [
// data.data.marketIndex,
// data.data,
// ])
// );
// const initialSpotMarketDataFromLatestData = new Map(
// Array.from(this.spotMarketAccountLatestData.values()).map((data) => [
// data.data.marketIndex,
// data.data,
// ])
// );
// this.initialPerpMarketAccountData = initialPerpMarketDataFromLatestData;
// this.initialSpotMarketAccountData = initialSpotMarketDataFromLatestData;

await this.handleDelistedMarketOracles();

await Promise.all([this.setPerpOracleMap(), this.setSpotOracleMap()]);
Expand Down Expand Up @@ -355,6 +449,17 @@ export class WebSocketDriftClientAccountSubscriberV2
console.error('Subscription failed:', error);
this.isSubscribing = false;
this.subscriptionPromiseResolver(false);
// Best-effort cleanup of any partially created subscriptions to avoid leaks
try {
await Promise.all([
this.stateAccountSubscriber?.unsubscribe(),
this.perpMarketAllAccountsSubscriber?.unsubscribe(),
this.spotMarketAllAccountsSubscriber?.unsubscribe(),
this.unsubscribeFromOracles(),
]);
} catch (_cleanupErr) {
// swallow cleanup errors
}
return false;
}
}
Expand Down Expand Up @@ -495,6 +600,22 @@ export class WebSocketDriftClientAccountSubscriberV2
try {
const oracleId = getOracleId(oracleInfo.publicKey, oracleInfo.source);

// If already subscribed, return
if (this.oracleSubscribers.has(oracleId)) {
return true;
}
// If a subscribe is already in-flight for this oracle, reuse it
const inFlight = this.oracleSubscribePromises.get(oracleId);
if (inFlight) {
return await inFlight;
}

const subscribePromise = (async (): Promise<boolean> => {
// Double-check after we were scheduled
if (this.oracleSubscribers.has(oracleId)) {
return true;
}

const client = this.oracleClientCache.get(
oracleInfo.source,
this.program.provider.connection,
Expand All @@ -517,19 +638,25 @@ export class WebSocketDriftClientAccountSubscriberV2
if (initialOraclePriceData) {
accountSubscriber.setData(initialOraclePriceData);
}
await accountSubscriber.subscribe((data: OraclePriceData) => {
this.eventEmitter.emit(
'oraclePriceUpdate',
oracleInfo.publicKey,
oracleInfo.source,
data
);
this.eventEmitter.emit('update');
});
await accountSubscriber.subscribe((data: OraclePriceData) => {
this.eventEmitter.emit(
'oraclePriceUpdate',
oracleInfo.publicKey,
oracleInfo.source,
data
);
this.eventEmitter.emit('update');
});

this.oracleSubscribers.set(oracleId, accountSubscriber);
// Only after successful subscribe, store in map
this.oracleSubscribers.set(oracleId, accountSubscriber);
return true;
})().finally(() => {
this.oracleSubscribePromises.delete(oracleId);
});

return true;
this.oracleSubscribePromises.set(oracleId, subscribePromise);
return await subscribePromise;
} catch (error) {
console.error(
`Failed to subscribe to oracle ${oracleInfo.publicKey.toString()}:`,
Expand Down Expand Up @@ -611,6 +738,8 @@ export class WebSocketDriftClientAccountSubscriberV2
this.perpOracleStringMap.set(perpMarketIndex, oracleId);
}
await Promise.all(addOraclePromises);
// Debounce oracle reconciliation to run once after both perp/spot map updates
this.scheduleOracleReconcile();
}

async setSpotOracleMap() {
Expand All @@ -636,6 +765,8 @@ export class WebSocketDriftClientAccountSubscriberV2
this.spotOracleStringMap.set(spotMarketIndex, oracleId);
}
await Promise.all(addOraclePromises);
// Debounce oracle reconciliation to run once after both perp/spot map updates
this.scheduleOracleReconcile();
}

async handleDelistedMarketOracles(): Promise<void> {
Expand Down Expand Up @@ -710,6 +841,7 @@ export class WebSocketDriftClientAccountSubscriberV2
public getOraclePriceDataAndSlotForPerpMarket(
marketIndex: number
): DataAndSlot<OraclePriceData> | undefined {
this.assertIsSubscribed();
const perpMarketAccount = this.getMarketAccountAndSlot(marketIndex);
const oracle = this.perpOracleMap.get(marketIndex);
const oracleId = this.perpOracleStringMap.get(marketIndex);
Expand All @@ -718,8 +850,8 @@ export class WebSocketDriftClientAccountSubscriberV2
}

if (!perpMarketAccount.data.amm.oracle.equals(oracle)) {
// If the oracle has changed, we need to update the oracle map in background
this.setPerpOracleMap();
// Schedule a debounced oracle map refresh instead of immediate side-effect
this.schedulePerpOracleMapRefresh();
}

return this.getOraclePriceDataAndSlot(oracleId);
Expand All @@ -728,6 +860,7 @@ export class WebSocketDriftClientAccountSubscriberV2
public getOraclePriceDataAndSlotForSpotMarket(
marketIndex: number
): DataAndSlot<OraclePriceData> | undefined {
this.assertIsSubscribed();
const spotMarketAccount = this.getSpotMarketAccountAndSlot(marketIndex);
const oracle = this.spotOracleMap.get(marketIndex);
const oracleId = this.spotOracleStringMap.get(marketIndex);
Expand All @@ -736,10 +869,90 @@ export class WebSocketDriftClientAccountSubscriberV2
}

if (!spotMarketAccount.data.oracle.equals(oracle)) {
// If the oracle has changed, we need to update the oracle map in background
this.setSpotOracleMap();
// Schedule a debounced oracle map refresh instead of immediate side-effect
this.scheduleSpotOracleMapRefresh();
}

return this.getOraclePriceDataAndSlot(oracleId);
}

/**
* Debounced refreshers to avoid side-effects in getters and reduce races
*/
private schedulePerpOracleMapRefresh(): void {
if (this.perpOracleMapRefreshTimeout) {
clearTimeout(this.perpOracleMapRefreshTimeout);
}
this.perpOracleMapRefreshTimeout = setTimeout(async () => {
try {
await this.setPerpOracleMap();
} catch (_e) {
// swallow
}
}, 50);
}

private scheduleSpotOracleMapRefresh(): void {
if (this.spotOracleMapRefreshTimeout) {
clearTimeout(this.spotOracleMapRefreshTimeout);
}
this.spotOracleMapRefreshTimeout = setTimeout(async () => {
try {
await this.setSpotOracleMap();
} catch (_e) {
// swallow
}
}, 50);
}

private scheduleOracleReconcile(): void {
if (this.oracleReconcileTimeout) {
clearTimeout(this.oracleReconcileTimeout);
}
this.oracleReconcileTimeout = setTimeout(async () => {
try {
await this.reconcileOracleSubscribers();
} catch (_e) {
// swallow
}
}, 75);
}

/**
* Ensure we only keep oracle subscriptions that are currently referenced by any market.
* Subscribe missing ones (if any remained due to race), and unsubscribe extras.
*/
private async reconcileOracleSubscribers(): Promise<void> {
// Build desired set from current maps
const desired = new Set<string>();
for (const id of this.perpOracleStringMap.values()) {
if (id) desired.add(id);
}
for (const id of this.spotOracleStringMap.values()) {
if (id) desired.add(id);
}

// Subscribe to any desired oracles that aren't yet active or in-flight
const subscribePromises: Promise<boolean>[] = [];
for (const id of desired) {
if (!this.oracleSubscribers.has(id) && !this.oracleSubscribePromises.has(id)) {
const { publicKey, source } = getPublicKeyAndSourceFromOracleId(id);
// Skip default oracle id
if (!publicKey.equals(PublicKey.default)) {
subscribePromises.push(this.addOracle({ publicKey, source }));
}
}
}
await Promise.all(subscribePromises);

// Unsubscribe from any oracles that are no longer desired
const unsubscribePromises: Promise<void>[] = [];
for (const [id, sub] of this.oracleSubscribers.entries()) {
if (!desired.has(id)) {
unsubscribePromises.push(sub.unsubscribe());
this.oracleSubscribers.delete(id);
}
}
await Promise.all(unsubscribePromises);
}
}
Loading