From c81b063df096932d0423b0c7cc259ca117f58da5 Mon Sep 17 00:00:00 2001 From: Zeeshan Date: Fri, 24 Oct 2025 16:39:15 +0530 Subject: [PATCH 1/2] Add support for MAX_CONCURRENT_PROCESSING in snapshot processing and enhance processing logic --- src/lib/env.ts | 6 +++-- src/lib/snapshotQueue.ts | 57 ++++++++++++++++++++++++++++++++++------ src/types.ts | 1 + 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/lib/env.ts b/src/lib/env.ts index 6efc67ee..1402aa77 100644 --- a/src/lib/env.ts +++ b/src/lib/env.ts @@ -25,7 +25,8 @@ export default (): Env => { SMART_GIT, SHOW_RENDER_ERRORS, SMARTUI_SSE_URL='https://server-events.lambdatest.com', - LT_SDK_SKIP_EXECUTION_LOGS + LT_SDK_SKIP_EXECUTION_LOGS, + MAX_CONCURRENT_PROCESSING } = process.env return { @@ -52,6 +53,7 @@ export default (): Env => { SMART_GIT: SMART_GIT === 'true', SHOW_RENDER_ERRORS: SHOW_RENDER_ERRORS === 'true', SMARTUI_SSE_URL, - LT_SDK_SKIP_EXECUTION_LOGS: LT_SDK_SKIP_EXECUTION_LOGS === 'true' + LT_SDK_SKIP_EXECUTION_LOGS: LT_SDK_SKIP_EXECUTION_LOGS === 'true', + MAX_CONCURRENT_PROCESSING: MAX_CONCURRENT_PROCESSING ? parseInt(MAX_CONCURRENT_PROCESSING, 10) : 0, } } diff --git a/src/lib/snapshotQueue.ts b/src/lib/snapshotQueue.ts index e36af8cd..0b35c0c0 100644 --- a/src/lib/snapshotQueue.ts +++ b/src/lib/snapshotQueue.ts @@ -14,6 +14,8 @@ export default class Queue { private ctx: Context; private snapshotNames: Array = []; private variants: Array = []; + private activeProcessingCount: number = 0; + private readonly MAX_CONCURRENT_PROCESSING = 5; constructor(ctx: Context) { this.ctx = ctx; @@ -275,15 +277,59 @@ export default class Queue { private async processNext(): Promise { if (!this.isEmpty()) { + const useRemoteDiscovery = this.ctx.env.USE_REMOTE_DISCOVERY || this.ctx.config.useRemoteDiscovery; + + if (useRemoteDiscovery && !this.ctx.config.delayedUpload && !this.ctx.config.allowDuplicateSnapshotNames) { + const maxConcurrentProcessing = this.ctx.env.MAX_CONCURRENT_PROCESSING === 0 ? this.MAX_CONCURRENT_PROCESSING : this.ctx.env.MAX_CONCURRENT_PROCESSING; + const snapshotsToProcess: Array = []; + const maxSnapshots = Math.min(maxConcurrentProcessing - this.activeProcessingCount, this.snapshots.length); + + for (let i = 0; i < maxSnapshots; i++) { + let snapshot; + if (this.ctx.config.delayedUpload) { + snapshot = this.snapshots.pop(); + } else { + snapshot = this.snapshots.shift(); + } + if (snapshot) { + snapshotsToProcess.push(snapshot); + } + } + + if (snapshotsToProcess.length > 0) { + this.activeProcessingCount += snapshotsToProcess.length; + const processingPromises = snapshotsToProcess.map(snapshot => this.processSnapshot(snapshot)); + await Promise.allSettled(processingPromises); + this.activeProcessingCount -= snapshotsToProcess.length; + + if (!this.isEmpty()) { + this.processNext(); + } else { + this.processing = false; + } + return; + } + } + let snapshot; if (this.ctx.config.delayedUpload) { snapshot = this.snapshots.pop(); } else { snapshot = this.snapshots.shift(); } - try { - this.processingSnapshot = snapshot?.name; - let drop = false; + if (snapshot) { + await this.processSnapshot(snapshot); + this.processNext(); + } + } else { + this.processing = false; + } + } + + private async processSnapshot(snapshot: Snapshot): Promise { + try { + this.processingSnapshot = snapshot?.name; + let drop = false; if (this.ctx.isStartExec) { @@ -450,7 +496,6 @@ export default class Queue { if(snapshot?.options?.contextId){ this.ctx.contextToSnapshotMap?.set(snapshot?.options?.contextId,'2'); } - this.processNext(); } else { let approvalThreshold = snapshot?.options?.approvalThreshold || this.ctx.config.approvalThreshold; let rejectionThreshold = snapshot?.options?.rejectionThreshold || this.ctx.config.rejectionThreshold; @@ -487,10 +532,6 @@ export default class Queue { this.ctx.log.debug(`Closed browser context for snapshot ${snapshot.name}`); } } - this.processNext(); - } else { - this.processing = false; - } } isProcessing(): boolean { diff --git a/src/types.ts b/src/types.ts index feb53f86..fc59dc43 100644 --- a/src/types.ts +++ b/src/types.ts @@ -125,6 +125,7 @@ export interface Env { SHOW_RENDER_ERRORS: boolean; SMARTUI_SSE_URL: string; LT_SDK_SKIP_EXECUTION_LOGS: boolean; + MAX_CONCURRENT_PROCESSING: number; } export interface Snapshot { From 5f43e6b1675626fe0000eb1481666cf2b03f4f3a Mon Sep 17 00:00:00 2001 From: Zeeshan Date: Wed, 29 Oct 2025 17:39:02 +0530 Subject: [PATCH 2/2] Enhance snapshot processing by capping MAX_CONCURRENT_PROCESSING to valid range and logging updates --- src/lib/snapshotQueue.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/lib/snapshotQueue.ts b/src/lib/snapshotQueue.ts index 0b35c0c0..af6d61da 100644 --- a/src/lib/snapshotQueue.ts +++ b/src/lib/snapshotQueue.ts @@ -280,7 +280,13 @@ export default class Queue { const useRemoteDiscovery = this.ctx.env.USE_REMOTE_DISCOVERY || this.ctx.config.useRemoteDiscovery; if (useRemoteDiscovery && !this.ctx.config.delayedUpload && !this.ctx.config.allowDuplicateSnapshotNames) { - const maxConcurrentProcessing = this.ctx.env.MAX_CONCURRENT_PROCESSING === 0 ? this.MAX_CONCURRENT_PROCESSING : this.ctx.env.MAX_CONCURRENT_PROCESSING; + let maxConcurrentProcessing = this.ctx.env.MAX_CONCURRENT_PROCESSING === 0 ? this.MAX_CONCURRENT_PROCESSING : this.ctx.env.MAX_CONCURRENT_PROCESSING; + if (maxConcurrentProcessing > 15 || maxConcurrentProcessing < 1) { + this.ctx.log.info(`Larger than 15 concurrent processing. Setting to 5.`); + maxConcurrentProcessing = 5; + } + + this.ctx.log.info(`Max concurrent processing: ${maxConcurrentProcessing}`); const snapshotsToProcess: Array = []; const maxSnapshots = Math.min(maxConcurrentProcessing - this.activeProcessingCount, this.snapshots.length);