diff --git a/packages/core/src/RPCMessages/RPCConnect.ts b/packages/core/src/RPCMessages/RPCConnect.ts index a47889df5..227a02274 100644 --- a/packages/core/src/RPCMessages/RPCConnect.ts +++ b/packages/core/src/RPCMessages/RPCConnect.ts @@ -19,12 +19,14 @@ export const DEFAULT_CONNECT_VERSION = { major: 3, minor: 0, revision: 0, + string: 'bkw', } export const UNIFIED_CONNECT_VERSION = { major: 4, minor: 0, revision: 0, + string: 'bkw', } export const RPCConnect = (params: RPCConnectParams) => { diff --git a/packages/js/package.json b/packages/js/package.json index ae7d9dedf..be9823985 100644 --- a/packages/js/package.json +++ b/packages/js/package.json @@ -3,7 +3,7 @@ "description": "SignalWire JS SDK", "author": "SignalWire Team ", "license": "MIT", - "version": "3.29.1", + "version": "3.29.1-bkw", "main": "dist/index.js", "module": "dist/index.esm.js", "unpkg": "dist/index.umd.js", diff --git a/packages/js/src/fabric/WSClient.ts b/packages/js/src/fabric/WSClient.ts index 7199fc960..c445e15cc 100644 --- a/packages/js/src/fabric/WSClient.ts +++ b/packages/js/src/fabric/WSClient.ts @@ -32,6 +32,7 @@ import { PREVIOUS_CALLID_STORAGE_KEY } from './utils/constants' export class WSClient extends BaseClient<{}> implements WSClientContract { private _incomingCallManager: IncomingCallManager private _disconnected: boolean = false + private _eventCleanup: Array<() => void> = [] constructor(private wsClientOptions: WSClientOptions) { const client = createWSClient(wsClientOptions) @@ -305,10 +306,24 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { this._disconnected = true }) + // Clean up all event listeners + this._cleanupEventListeners() + super.disconnect() }) } + private _cleanupEventListeners() { + this._eventCleanup.forEach(cleanup => { + try { + cleanup() + } catch (error) { + this.logger.error('Error cleaning up event listener:', error) + } + }) + this._eventCleanup = [] + } + public async dial(params: DialParams) { return new Promise(async (resolve, reject) => { try { diff --git a/packages/js/src/fabric/workers/callSegmentWorker.ts b/packages/js/src/fabric/workers/callSegmentWorker.ts index ad81dbd53..59b139438 100644 --- a/packages/js/src/fabric/workers/callSegmentWorker.ts +++ b/packages/js/src/fabric/workers/callSegmentWorker.ts @@ -64,6 +64,9 @@ export const callSegmentWorker = function* ( case 'call.room': cfRoomSession.emit(type, payload) break + case 'call.state': + cfRoomSession.emit(type, payload) + break /** * The Core module includes a generic worker, {@link memberPositionWorker}, @@ -120,17 +123,23 @@ export const callSegmentWorker = function* ( } const isSegmentEvent = (action: SDKActions) => { - const { type, payload } = action as FabricAction - const shouldWatch = - type.startsWith('call.') || - type.startsWith('member.') || - type.startsWith('layout.') + const { payload } = action as FabricAction + + // Check both direct payload fields and nested params (for call.state events) const hasSegmentCallId = - 'call_id' in payload && segmentCallId === payload.call_id + ('call_id' in payload && segmentCallId === payload.call_id) || + ('params' in payload && + 'call_id' in (payload as any).params && + (payload as any).params.call_id === segmentCallId) + const hasSegmentRoomSessionId = - segmentRooSessionId === payload.room_session_id + (segmentRooSessionId === payload.room_session_id) || + ('params' in payload && + 'room_session_id' in (payload as any).params && + (payload as any).params.room_session_id === segmentRooSessionId) - if (shouldWatch && (hasSegmentCallId || hasSegmentRoomSessionId)) { + // Allow ALL events that belong to this segment (no type filtering) + if (hasSegmentCallId || hasSegmentRoomSessionId) { return true } diff --git a/packages/webrtc/src/RTCPeer.ts b/packages/webrtc/src/RTCPeer.ts index bcff1f100..3aa038581 100644 --- a/packages/webrtc/src/RTCPeer.ts +++ b/packages/webrtc/src/RTCPeer.ts @@ -40,6 +40,7 @@ export default class RTCPeer { private _processingLocalSDP = false private _waitNegotiation: Promise = Promise.resolve() private _waitNegotiationCompleter: () => void + private _eventCleanup: Array<() => void> = [] /** * Both of these properties are used to have granular * control over when to `resolve` and when `reject` the @@ -466,7 +467,7 @@ export default class RTCPeer { } this.instance.removeEventListener('icecandidate', this._onIce) - this.instance.addEventListener('icecandidate', this._onIce) + this._addEventListener(this.instance, 'icecandidate', this._onIce as EventListener) if (this.isOffer) { this.logger.debug('Trying to generate offer') const offerOptions: RTCOfferOptions = { @@ -584,6 +585,18 @@ export default class RTCPeer { this.logger.info( 'Using pre-warmed connection from session pool with ICE candidates ready' ) + + // Debug the pooled connection state + this.logger.debug('Pooled connection state:', pooledConnection.connectionState) + this.logger.debug('Pooled connection signaling state:', pooledConnection.signalingState) + this.logger.debug('Pooled connection ICE state:', pooledConnection.iceConnectionState) + + const transceivers = pooledConnection.getTransceivers() + this.logger.debug(`Pooled connection has ${transceivers.length} transceivers`) + transceivers.forEach((t, i) => { + this.logger.debug(` Transceiver ${i}: mid=${t.mid}, direction=${t.direction}`) + }) + this.instance = pooledConnection // The connection is already clean: @@ -653,18 +666,28 @@ export default class RTCPeer { t.receiver.track?.kind === 'audio' || (!t.sender.track && !t.receiver.track && - t.mid?.includes('audio')) + (t.mid === null || t.mid?.includes('audio'))) ) + this.logger.debug( + `Found ${existingAudioTransceivers.length} existing audio transceivers for ${audioTracks.length} audio tracks` + ) + audioTracks.forEach((track, index) => { if (index < existingAudioTransceivers.length) { // Reuse existing transceiver const transceiver = existingAudioTransceivers[index] this.logger.debug( - 'Reusing existing audio transceiver', - transceiver.mid + `Reusing existing audio transceiver with mid=${transceiver.mid}, direction=${transceiver.direction}` ) - transceiver.sender.replaceTrack(track) + + try { + transceiver.sender.replaceTrack(track) + this.logger.debug(`Successfully replaced track on audio transceiver`) + } catch (error) { + this.logger.error('Failed to replace track on audio transceiver:', error) + } + transceiver.direction = audioTransceiverParams.direction || 'sendrecv' // Add stream association @@ -704,7 +727,7 @@ export default class RTCPeer { t.receiver.track?.kind === 'video' || (!t.sender.track && !t.receiver.track && - t.mid?.includes('video')) + (t.mid === null || t.mid?.includes('video'))) ) videoTracks.forEach((track, index) => { @@ -844,6 +867,26 @@ export default class RTCPeer { } stop() { + // Reset negotiation flag to prevent further negotiation attempts + this._negotiating = false + this._processingLocalSDP = false + this._processingRemoteSDP = false + + // Clear any active timers + this.clearTimers() + this.clearResumeTimer() + this.clearConnectionStateTimer() + + // Clean up all event listeners + this._eventCleanup.forEach(cleanup => { + try { + cleanup() + } catch (error) { + this.logger.error('Error cleaning up event listener:', error) + } + }) + this._eventCleanup = [] + // Do not use `stopTrack` util to not dispatch the `ended` event this._localStream?.getTracks().forEach((track) => track.stop()) this._remoteStream?.getTracks().forEach((track) => track.stop()) @@ -867,7 +910,7 @@ export default class RTCPeer { .find( (t) => t.receiver.track?.kind === kind || - (!t.sender.track && !t.receiver.track && t.mid?.includes(kind)) + (!t.sender.track && !t.receiver.track && (t.mid === null || t.mid?.includes(kind))) ) if (existingTransceiver) { @@ -898,6 +941,14 @@ export default class RTCPeer { return } + // Check if the peer connection is still valid before processing + if (!this.instance || this.instance.connectionState === 'closed' || + this.instance.connectionState === 'failed') { + this.logger.warn('Peer connection is closed or failed in _sdpReady, skipping') + this._negotiating = false + return + } + this._processingLocalSDP = true clearTimeout(this._iceTimeout) @@ -1062,6 +1113,13 @@ export default class RTCPeer { } private _retryWithMoreCandidates() { + // Skip renegotiation for answer-type connections (incoming calls) + // This prevents verto.modify spam on Windows where ICE gathering is delayed + if (this.type === 'answer') { + this.logger.info('Skipping renegotiation for answer-type connection (incoming call)') + return; + } + // Check if we have better candidates now than when we first sent SDP const hasMoreCandidates = this._hasMoreCandidates() @@ -1153,8 +1211,15 @@ export default class RTCPeer { return getUserMedia(constraints) } + private _addEventListener(target: EventTarget, event: string, handler: EventListenerOrEventListenerObject, options?: boolean | AddEventListenerOptions) { + target.addEventListener(event, handler, options) + this._eventCleanup.push(() => { + target.removeEventListener(event, handler, options) + }) + } + private _attachListeners() { - this.instance.addEventListener('signalingstatechange', () => { + this._addEventListener(this.instance, 'signalingstatechange', () => { this.logger.debug('signalingState:', this.instance.signalingState) switch (this.instance.signalingState) { @@ -1165,6 +1230,13 @@ export default class RTCPeer { this._restartingIce = false this.resetNeedResume() + // For answer types, check if we need to send the SDP after becoming stable + if (this.isAnswer && this.instance.iceGatheringState === 'complete' && + this.instance.localDescription && !this._processingLocalSDP) { + this.logger.debug('Answer type stable with complete ICE - sending SDP') + this._sdpReady() + } + if (this.instance.connectionState === 'connected') { // An ice restart won't change the connectionState so we emit the same event in here // since the signalingState is "stable" again. @@ -1178,7 +1250,11 @@ export default class RTCPeer { } break } - // case 'have-remote-offer': {} + case 'have-remote-offer': + // We have a remote offer, need to create an answer + // Don't set _negotiating here as it will be set when we actually start negotiation + this.logger.debug('Have remote offer state - ready to create answer') + break case 'closed': // @ts-ignore delete this.instance @@ -1188,20 +1264,26 @@ export default class RTCPeer { } }) - this.instance.addEventListener('connectionstatechange', () => { + this._addEventListener(this.instance, 'connectionstatechange', () => { this.logger.debug('connectionState:', this.instance.connectionState) switch (this.instance.connectionState) { // case 'new': // break case 'connecting': + // Use longer timeout for answer-type connections (incoming calls) + // These need more time on Windows due to network complexity + const timeout = this.type === 'answer' + ? 10000 // 10 seconds for incoming calls + : this.options.maxConnectionStateTimeout; // 3 seconds for outgoing + this._connectionStateTimer = setTimeout(() => { - this.logger.warn('connectionState timed out') + this.logger.warn(`connectionState timed out after ${timeout}ms (type: ${this.type})`) if (this._hasMoreCandidates()) { this._retryWithMoreCandidates() } else { this.restartIceWithRelayOnly() } - }, this.options.maxConnectionStateTimeout) + }, timeout) break case 'connected': this.clearConnectionStateTimer() @@ -1213,22 +1295,56 @@ export default class RTCPeer { this.logger.debug('[test] Prevent reattach!') break case 'failed': { + this.logger.error('RTCPeerConnection entered failed state') + this.logger.error('Connection details:') + this.logger.error(' - connectionState:', this.instance.connectionState) + this.logger.error(' - iceConnectionState:', this.instance.iceConnectionState) + this.logger.error(' - signalingState:', this.instance.signalingState) + + // Debug ICE candidates + this.instance.getStats().then(stats => { + let candidatePairs = 0 + stats.forEach(stat => { + if (stat.type === 'candidate-pair') { + candidatePairs++ + this.logger.error(` - Candidate pair: state=${stat.state}, nominated=${stat.nominated}`) + } + }) + this.logger.error(` - Total candidate pairs: ${candidatePairs}`) + }).catch(err => { + this.logger.error('Failed to get stats:', err) + }) + this.triggerResume() break } } }) - this.instance.addEventListener('negotiationneeded', () => { - this.logger.debug('Negotiation needed event') - this.startNegotiation() + this._addEventListener(this.instance, 'negotiationneeded', () => { + this.logger.debug('Negotiation needed event, signaling state:', this.instance.signalingState) + + // For answer types (incoming calls), only negotiate if we're in specific states + if (this.isAnswer) { + // Only start negotiation if we haven't answered yet or have a new remote offer + if (!this.instance.remoteDescription || + this.instance.signalingState === 'have-remote-offer') { + this.logger.debug('Answer type: proceeding with negotiation') + this.startNegotiation() + } else { + this.logger.debug('Answer type: skipping negotiation - already answered or in stable state') + } + } else { + // For offer types (outgoing calls), always negotiate + this.startNegotiation() + } }) - this.instance.addEventListener('iceconnectionstatechange', () => { + this._addEventListener(this.instance, 'iceconnectionstatechange', () => { this.logger.debug('iceConnectionState:', this.instance.iceConnectionState) }) - this.instance.addEventListener('icegatheringstatechange', () => { + this._addEventListener(this.instance, 'icegatheringstatechange', () => { this.logger.debug('iceGatheringState:', this.instance.iceGatheringState) if (this.instance.iceGatheringState === 'complete') { this.logger.debug('ICE gathering complete') @@ -1240,7 +1356,7 @@ export default class RTCPeer { // this.logger.warn('IceCandidate Error:', event) // }) - this.instance.addEventListener('track', (event: RTCTrackEvent) => { + this._addEventListener(this.instance, 'track', ((event: RTCTrackEvent) => { this.logger.debug('Track event:', event, event.track.kind) // @ts-expect-error this.call.emit('track', event) @@ -1250,10 +1366,10 @@ export default class RTCPeer { // this.call._dispatchNotification(notification) } this._remoteStream = event.streams[0] - }) + }) as EventListener) // @ts-ignore - this.instance.addEventListener('addstream', (event: MediaStreamEvent) => { + this._addEventListener(this.instance, 'addstream', (event: MediaStreamEvent) => { if (event.stream) { this._remoteStream = event.stream } @@ -1298,13 +1414,13 @@ export default class RTCPeer { public _attachAudioTrackListener() { this.localStream?.getAudioTracks().forEach((track) => { - track.addEventListener('ended', this._onEndedTrackHandler) + this._addEventListener(track, 'ended', this._onEndedTrackHandler) }) } public _attachVideoTrackListener() { this.localStream?.getVideoTracks().forEach((track) => { - track.addEventListener('ended', this._onEndedTrackHandler) + this._addEventListener(track, 'ended', this._onEndedTrackHandler) }) } diff --git a/packages/webrtc/src/RTCPeerConnectionManager.ts b/packages/webrtc/src/RTCPeerConnectionManager.ts index 6995fbf1c..e93cb4e85 100644 --- a/packages/webrtc/src/RTCPeerConnectionManager.ts +++ b/packages/webrtc/src/RTCPeerConnectionManager.ts @@ -7,8 +7,24 @@ import { cleanupMockVideoTrack, } from './utils/mockTracks' -const maxPoolSize = 4 +const DEFAULT_POOL_SIZE = 3 +const MAX_POOL_SIZE = 10 const maxIceCandidatePoolSize = 20 +const DEFAULT_ICE_GATHERING_TIMEOUT = 30000 // 30 seconds +const MAX_ICE_GATHERING_RETRIES = 3 +const CIRCUIT_BREAKER_THRESHOLD = 5 // failures before opening circuit +const CIRCUIT_BREAKER_RESET_TIME = 60000 // 1 minute + +class PoolError extends Error { + constructor( + message: string, + public readonly retryable: boolean, + public readonly code: string + ) { + super(message) + this.name = 'PoolError' + } +} interface PooledConnection { id: string @@ -20,19 +36,60 @@ interface PooledConnection { audio?: MediaStreamTrack video?: MediaStreamTrack } - senders: RTCRtpSender[] + transceivers: RTCRtpTransceiver[] + eventListeners: Map +} + +interface PoolMetrics { + hits: number + misses: number + returns: number + failures: number + currentPoolSize: number + inUseCount: number + totalCreated: number + avgConnectionAge: number + activeTracksCount: number + cleanupFailures: number } export class RTCPeerConnectionManager { private pool: Map = new Map() + private inUseConnections: Map = new Map() private config: RTCConfiguration private poolSize: number private forceRefresh: boolean private turnRefreshInterval: number = 240000 // 4 minutes private refreshTimer?: ReturnType + private iceGatheringTimeout: number + private maxIceRetries: number private logger = getLogger() - - constructor(config: RTCConfiguration, poolSize = 3, forceRefresh = false) { + private metrics: PoolMetrics = { + hits: 0, + misses: 0, + returns: 0, + failures: 0, + currentPoolSize: 0, + inUseCount: 0, + totalCreated: 0, + avgConnectionAge: 0, + activeTracksCount: 0, + cleanupFailures: 0, + } + private circuitBreakerState: 'closed' | 'open' | 'half-open' = 'closed' + private circuitBreakerFailures = 0 + private circuitBreakerResetTimer?: ReturnType + + constructor( + config: RTCConfiguration, + poolSize = DEFAULT_POOL_SIZE, + enableAutoRefresh = true, // Renamed from forceRefresh for clarity + options: { + iceGatheringTimeout?: number + maxIceRetries?: number + turnRefreshInterval?: number + } = {} + ) { const iceCandidatePoolSize = config.iceCandidatePoolSize || 10 this.config = { ...config, @@ -42,8 +99,11 @@ export class RTCPeerConnectionManager { ? maxIceCandidatePoolSize : iceCandidatePoolSize, } - this.poolSize = poolSize > maxPoolSize ? maxPoolSize : poolSize - this.forceRefresh = forceRefresh + this.poolSize = poolSize > MAX_POOL_SIZE ? MAX_POOL_SIZE : poolSize + this.forceRefresh = enableAutoRefresh + this.turnRefreshInterval = options.turnRefreshInterval || 240000 // 4 minutes default + this.iceGatheringTimeout = options.iceGatheringTimeout || DEFAULT_ICE_GATHERING_TIMEOUT + this.maxIceRetries = options.maxIceRetries || MAX_ICE_GATHERING_RETRIES } /** @@ -70,10 +130,14 @@ export class RTCPeerConnectionManager { if (this.forceRefresh) { this.logger.info( - 'Manual force refresh mode enabled, TURN allocations will not be refreshed by this manager.' + `Auto-refresh enabled: TURN allocations will be refreshed every ${this.turnRefreshInterval / 1000}s` ) - // Start maintenance worker for TURN refresh + // Start maintenance worker for automatic TURN refresh this.startMaintenanceWorker() + } else { + this.logger.info( + 'Auto-refresh disabled: TURN allocations will not be automatically refreshed' + ) } } @@ -87,30 +151,134 @@ export class RTCPeerConnectionManager { this.logger.debug(`Connection state: ${conn.pc.connectionState}`) this.logger.debug(`Signaling state: ${conn.pc.signalingState}`) this.logger.debug(`ICE connection state: ${conn.pc.iceConnectionState}`) + this.logger.debug(`ICE gathering state: ${conn.pc.iceGatheringState}`) + + // Debug transceivers before cleanup + const transceivers = conn.pc.getTransceivers() + this.logger.debug(`Connection has ${transceivers.length} transceivers before cleanup:`) + transceivers.forEach((t, i) => { + const senderTrack = t.sender.track ? `${t.sender.track.kind}(${t.sender.track.id.substring(0, 8)})` : 'null' + const receiverTrack = t.receiver.track ? `${t.receiver.track.kind}(${t.receiver.track.id.substring(0, 8)})` : 'null' + this.logger.debug(` Transceiver ${i}: mid=${t.mid}, direction=${t.direction}, sender.track=${senderTrack}, receiver.track=${receiverTrack}`) + }) + if (this.isConnectionValid(conn)) { this.logger.info(`Providing pooled connection ${id}`) - // Remove from pool + // Update metrics + this.metrics.hits++ + + // Remove from pool and track as in-use this.pool.delete(id) + this.inUseConnections.set(conn.pc, id) // Clean up mock tracks completely before returning this.cleanupMockTracks(conn) + // Debug transceivers after cleanup + const transceiversAfter = conn.pc.getTransceivers() + this.logger.debug(`After cleanup, connection has ${transceiversAfter.length} transceivers:`) + transceiversAfter.forEach((t, i) => { + const senderTrack = t.sender.track ? `${t.sender.track.kind}(${t.sender.track.id.substring(0, 8)})` : 'null' + const receiverTrack = t.receiver.track ? `${t.receiver.track.kind}(${t.receiver.track.id.substring(0, 8)})` : 'null' + this.logger.debug(` Transceiver ${i}: mid=${t.mid}, direction=${t.direction}, sender.track=${senderTrack}, receiver.track=${receiverTrack}`) + }) + // Replenish pool in background this.replenishPool().catch((err) => { this.logger.error('Failed to replenish pool:', err) }) + // Update current counts + this.updateMetricCounts() + // Return clean RTCPeerConnection ready for use return conn.pc } } this.logger.warn('No valid pooled connections available') + this.metrics.misses++ + this.updateMetricCounts() return null } + /** + * Return a connection to the pool for reuse + */ + returnConnection(pc: RTCPeerConnection): void { + const connectionId = this.inUseConnections.get(pc) + + if (!connectionId) { + this.logger.debug('Connection not tracked, cannot return to pool') + return + } + + this.inUseConnections.delete(pc) + + // Check if connection is still reusable + if ( + pc.connectionState === 'closed' || + pc.connectionState === 'failed' || + pc.signalingState === 'closed' + ) { + this.logger.info(`Connection ${connectionId} is not reusable, closing`) + try { + pc.close() + } catch (e) { + // Ignore close errors + } + // Replenish pool since we lost a connection + this.replenishPool().catch((err) => { + this.logger.error('Failed to replenish after return:', err) + }) + return + } + + // Reset the connection for reuse + try { + // Remove any remaining tracks + pc.getSenders().forEach(sender => { + if (sender.track) { + pc.removeTrack(sender) + } + }) + + // Clear any data channels + // Note: There's no direct API to close all data channels, + // they should be closed by the application before returning + + // Re-add to pool + const pooledConnection: PooledConnection = { + id: connectionId, + pc, + createdAt: Date.now(), + lastRefreshed: Date.now(), + iceGatheringComplete: true, + mockTracks: {}, + transceivers: [], + eventListeners: new Map(), + } + + this.pool.set(connectionId, pooledConnection) + this.logger.info(`Returned connection ${connectionId} to pool`) + this.metrics.returns++ + this.updateMetricCounts() + } catch (error) { + this.logger.error(`Failed to return connection ${connectionId}:`, error) + try { + pc.close() + } catch (e) { + // Ignore close errors + } + // Replenish pool since we failed to return + this.replenishPool().catch((err) => { + this.logger.error('Failed to replenish after return error:', err) + }) + } + } + /** * Clean up the manager and all connections */ @@ -123,18 +291,39 @@ export class RTCPeerConnectionManager { this.refreshTimer = undefined } - // Close all connections + // Close all pooled connections for (const [, conn] of this.pool.entries()) { this.closeConnection(conn) } + // Close all in-use connections + for (const [pc, id] of this.inUseConnections.entries()) { + this.logger.debug(`Closing in-use connection ${id}`) + try { + pc.close() + } catch (e) { + // Ignore close errors + } + } + this.pool.clear() + this.inUseConnections.clear() } /** * Create a new pooled connection with pre-gathered ICE candidates */ - private async createPooledConnection(): Promise { + private async createPooledConnection(retryCount = 0): Promise { + // Check circuit breaker + if (this.circuitBreakerState === 'open') { + this.logger.warn('Circuit breaker is open, skipping connection creation') + throw new PoolError( + 'Circuit breaker open - too many failures', + false, + 'CIRCUIT_OPEN' + ) + } + try { const pc = RTCPeerConnection(this.config) const id = `conn_${Date.now()}_${Math.random() @@ -147,17 +336,22 @@ export class RTCPeerConnectionManager { const audioTrack = createMockAudioTrack() const videoTrack = createMockVideoTrack() // May be null on Safari - const senders: RTCRtpSender[] = [] + const transceivers: RTCRtpTransceiver[] = [] - // Add audio track - const audioSender = pc.addTrack(audioTrack) - senders.push(audioSender) + // Use addTransceiver for audio to create proper transceiver + // Note: We pass the track to set up the sender, direction controls send/receive capability + const audioTransceiver = pc.addTransceiver(audioTrack, { + direction: 'sendrecv', + }) + transceivers.push(audioTransceiver) - // Add video track if available (not Safari) - let videoSender: RTCRtpSender | null = null + // Add video transceiver if video track is available (not Safari) + let videoTransceiver: RTCRtpTransceiver | null = null if (videoTrack) { - videoSender = pc.addTrack(videoTrack) - senders.push(videoSender) + videoTransceiver = pc.addTransceiver(videoTrack, { + direction: 'sendrecv', + }) + transceivers.push(videoTransceiver) } // Create offer to start ICE gathering @@ -170,6 +364,12 @@ export class RTCPeerConnectionManager { // Wait for ICE gathering to complete await this.waitForIceGathering(pc) + // IMPORTANT: Do NOT rollback! We need to keep the local description + // with the ICE candidates. The connection will be in have-local-offer state + // but that's fine - when we use it for a real call, we'll create a new offer + // which will replace this one while preserving the gathered ICE candidates. + this.logger.debug(`Connection ${id} has ICE candidates gathered in offer`) + // Create pooled connection object const pooledConnection: PooledConnection = { id, @@ -181,21 +381,112 @@ export class RTCPeerConnectionManager { audio: audioTrack, video: videoTrack || undefined, }, - senders, + transceivers, + eventListeners: new Map(), } this.logger.debug(`Pooled connection ${id} created successfully`) + this.logger.debug(`Connection state: ${pc.connectionState}`) + this.logger.debug(`Signaling state: ${pc.signalingState}`) + this.logger.debug(`ICE gathering state: ${pc.iceGatheringState}`) this.logger.debug( - `ICE candidates gathered for connection ${id}:`, - pc.localDescription?.sdp + `ICE candidates gathered: ${pc.localDescription ? 'YES' : 'NO'}` ) + + // Update metrics + this.metrics.totalCreated++ + + // Reset circuit breaker on success + if (this.circuitBreakerState === 'half-open') { + this.logger.info('Circuit breaker reset to closed after successful creation') + this.circuitBreakerState = 'closed' + this.circuitBreakerFailures = 0 + } + return pooledConnection } catch (error) { - this.logger.error('Failed to create pooled connection:', error) + this.logger.error(`Failed to create pooled connection (attempt ${retryCount + 1}):`, error) + + // Retry with exponential backoff + if (retryCount < this.maxIceRetries - 1) { + const delay = Math.min(1000 * Math.pow(2, retryCount), 5000) // Max 5s delay + this.logger.info(`Retrying connection creation in ${delay}ms...`) + await new Promise(resolve => setTimeout(resolve, delay)) + return this.createPooledConnection(retryCount + 1) + } + + // Track final failure + this.metrics.failures++ + + // Update circuit breaker + this.handleCircuitBreakerFailure() + return null } } + /** + * Handle circuit breaker failure + */ + private handleCircuitBreakerFailure(): void { + this.circuitBreakerFailures++ + + if (this.circuitBreakerFailures >= CIRCUIT_BREAKER_THRESHOLD) { + this.logger.error('Circuit breaker opening due to excessive failures') + this.circuitBreakerState = 'open' + + // Schedule circuit breaker reset + if (this.circuitBreakerResetTimer) { + clearTimeout(this.circuitBreakerResetTimer) + } + + this.circuitBreakerResetTimer = setTimeout(() => { + this.logger.info('Circuit breaker entering half-open state') + this.circuitBreakerState = 'half-open' + this.circuitBreakerResetTimer = undefined + }, CIRCUIT_BREAKER_RESET_TIME) + } + } + + /** + * Update current metric counts + */ + private updateMetricCounts(): void { + this.metrics.currentPoolSize = this.pool.size + this.metrics.inUseCount = this.inUseConnections.size + + // Calculate average connection age + if (this.pool.size > 0) { + const now = Date.now() + let totalAge = 0 + for (const conn of this.pool.values()) { + totalAge += now - conn.createdAt + } + this.metrics.avgConnectionAge = totalAge / this.pool.size + } + } + + /** + * Get current pool metrics + */ + getMetrics(): PoolMetrics { + this.updateMetricCounts() + return { ...this.metrics } + } + + /** + * Log current metrics + */ + logMetrics(): void { + const metrics = this.getMetrics() + this.logger.info('Pool Metrics:', { + hitRate: metrics.hits > 0 ? + `${((metrics.hits / (metrics.hits + metrics.misses)) * 100).toFixed(2)}%` : '0%', + ...metrics, + avgConnectionAgeSeconds: (metrics.avgConnectionAge / 1000).toFixed(2), + }) + } + /** * Wait for ICE gathering to complete or timeout */ @@ -220,15 +511,22 @@ export class RTCPeerConnectionManager { pc.addEventListener('icegatheringstatechange', onGatheringComplete) - // Timeout after 10 seconds + // Timeout after configured duration const timer = setTimeout(() => { - this.logger.warn('ICE gathering timeout, proceeding anyway') + this.logger.warn(`ICE gathering timeout after ${this.iceGatheringTimeout}ms, proceeding anyway`) cleanup() resolve() - }, 10000) + }, this.iceGatheringTimeout) }) } + /** + * Verify that a track is properly stopped + */ + private verifyTrackStopped(track: MediaStreamTrack): boolean { + return track.readyState === 'ended' + } + /** * Clean up mock tracks and event listeners before handing off connection */ @@ -238,27 +536,76 @@ export class RTCPeerConnectionManager { // Stop mock tracks first (important for cleanup) if (conn.mockTracks.audio) { cleanupMockAudioTrack(conn.mockTracks.audio) + + // Verify cleanup + if (!this.verifyTrackStopped(conn.mockTracks.audio)) { + this.logger.warn(`Audio track for ${conn.id} may not be properly stopped`) + this.metrics.cleanupFailures++ + try { + conn.mockTracks.audio.stop() + } catch (e) { + // Ignore if already stopped + } + } } if (conn.mockTracks.video) { cleanupMockVideoTrack(conn.mockTracks.video) + + // Verify cleanup + if (!this.verifyTrackStopped(conn.mockTracks.video)) { + this.logger.warn(`Video track for ${conn.id} may not be properly stopped`) + this.metrics.cleanupFailures++ + try { + conn.mockTracks.video.stop() + } catch (e) { + // Ignore if already stopped + } + } } - // Remove senders from peer connection - // Note: removeTrack sets sender.track to null but keeps sender in getSenders() - conn.senders.forEach((sender) => { - try { - conn.pc.removeTrack(sender) - } catch (error) { - this.logger.warn('Error removing track:', error) + // Clean up transceivers by stopping and removing mock tracks + // Note: replaceTrack is async, but we need this to be synchronous + // So we'll use a Promise to handle it properly + const transceivers = conn.pc.getTransceivers() + this.logger.debug(`Cleaning up ${transceivers.length} transceivers`) + + const cleanupPromises: Promise[] = [] + + for (let i = 0; i < transceivers.length; i++) { + const transceiver = transceivers[i] + if (transceiver.sender.track) { + const trackKind = transceiver.sender.track.kind + const trackId = transceiver.sender.track.id + this.logger.debug(`Stopping ${trackKind} track (${trackId.substring(0, 8)}) on transceiver ${i}`) + + // First stop the track to release resources + transceiver.sender.track.stop() + + // Schedule async replacement + const promise = transceiver.sender.replaceTrack(null) + .then(() => { + this.logger.debug(`Successfully nullified track on transceiver ${i}`) + }) + .catch((error) => { + this.logger.warn(`Error replacing track with null on transceiver ${i}:`, error) + }) + + cleanupPromises.push(promise) } + } + + // Fire and forget - the cleanup will happen async + // The connection is still usable even with tracks being cleaned up + Promise.all(cleanupPromises).then(() => { + this.logger.debug(`All transceivers cleaned for connection ${conn.id}`) }) // CRITICAL: Remove ALL event listeners to prevent conflicts with RTCPeer - this.cleanupEventListeners(conn.pc) + this.cleanupEventListeners(conn) // Clear references to prevent memory leaks conn.mockTracks = {} - conn.senders = [] + // Note: transceivers remain with the connection, just without tracks // The peer connection is now ready with: // - No active tracks @@ -270,8 +617,16 @@ export class RTCPeerConnectionManager { /** * Remove all event listeners from RTCPeerConnection */ - private cleanupEventListeners(pc: RTCPeerConnection): void { - // Remove all possible event listeners by setting handlers to null + private cleanupEventListeners(conn: PooledConnection): void { + // Properly remove all tracked event listeners + conn.eventListeners.forEach((listeners, eventName) => { + listeners.forEach(listener => { + conn.pc.removeEventListener(eventName, listener) + }) + }) + conn.eventListeners.clear() + + // Also clear any inline event handlers const events = [ 'icecandidate', 'icegatheringstatechange', @@ -286,9 +641,8 @@ export class RTCPeerConnectionManager { ] events.forEach((event) => { - // This removes all listeners for each event type // @ts-ignore - setting to null is valid - pc[`on${event}`] = null + conn.pc[`on${event}`] = null }) } @@ -307,8 +661,8 @@ export class RTCPeerConnectionManager { return false } - // Check signaling state (must be stable for reuse) - if (conn.pc.signalingState === 'closed') { + // Check signaling state (should be stable after rollback) + if (conn.pc.signalingState !== 'stable') { this.logger.debug( `Pooled connection ${conn.id} signalingState is not valid: ${conn.pc.signalingState}` ) @@ -316,10 +670,9 @@ export class RTCPeerConnectionManager { } // Check if ICE connection is still valid - if ( - conn.pc.iceConnectionState === 'failed' || - conn.pc.iceConnectionState === 'disconnected' - ) { + // Note: 'disconnected' state is temporary and connections can recover + // Only reject 'failed' state which is permanent + if (conn.pc.iceConnectionState === 'failed') { this.logger.debug( `Pooled connection ${conn.id} iceConnectionState is not valid: ${conn.pc.iceConnectionState}` ) @@ -348,12 +701,21 @@ export class RTCPeerConnectionManager { this.logger.debug(`Replenishing pool with ${needed} connections`) - for (let i = 0; i < needed; i++) { - const conn = await this.createPooledConnection() + // Create all needed connections in parallel + const promises = Array(needed) + .fill(null) + .map(() => this.createPooledConnection()) + + const connections = await Promise.all(promises) + + // Add successful connections to the pool + connections.forEach((conn) => { if (conn) { this.pool.set(conn.id, conn) } - } + }) + + this.logger.debug(`Replenished pool with ${connections.filter(c => c !== null).length} connections`) } /** @@ -371,6 +733,14 @@ export class RTCPeerConnectionManager { }, this.turnRefreshInterval) } + /** + * Manually refresh all connections (public method) + */ + async refreshAllConnections(): Promise { + this.logger.info('Manually refreshing all pooled connections') + await this.refreshTurnAllocations() + } + /** * Refresh TURN allocations for all pooled connections */ diff --git a/packages/webrtc/src/connectionPoolManager.ts b/packages/webrtc/src/connectionPoolManager.ts index db69a2cb2..dad1d22a9 100644 --- a/packages/webrtc/src/connectionPoolManager.ts +++ b/packages/webrtc/src/connectionPoolManager.ts @@ -27,7 +27,7 @@ class ConnectionPoolManagerSingleton { this.manager = new RTCPeerConnectionManager( rtcConfig, - options.poolSize ?? 2 + options.poolSize ?? 3 // Increased default from 2 to 3 ) await this.manager.initializePool() @@ -41,6 +41,14 @@ class ConnectionPoolManagerSingleton { return this.manager.getConnection() } + returnConnection(pc: RTCPeerConnection): void { + if (!this.manager) { + this.logger.warn('Connection pool not initialized, cannot return connection') + return + } + this.manager.returnConnection(pc) + } + cleanup(): void { if (this.manager) { this.logger.info('Cleaning up connection pool')