diff --git a/src/agent-task-executor.ts b/src/agent-task-executor.ts index 602e340..b29e5f0 100644 --- a/src/agent-task-executor.ts +++ b/src/agent-task-executor.ts @@ -98,6 +98,13 @@ export class AgentTaskExecutor implements AgentExecutor { resolve(); }); + // Finalizer: fires exactly once on every terminal path (incl. cancel, which emits no + // done/error). Idempotent cleanup — the seam where worktree.dispose()/slot-release will + // attach later. Publishes no A2A event (preserves the event stream). + runner!.on('settled', () => { + this._activeRunners.delete(taskId); + }); + runner!.start(); }); }; diff --git a/src/process-runner.ts b/src/process-runner.ts index aaaf7af..cad2405 100644 --- a/src/process-runner.ts +++ b/src/process-runner.ts @@ -4,7 +4,7 @@ import type { ChildProcess } from 'node:child_process'; import type { Span } from '@opentelemetry/api'; import { SpanStatusCode } from '@opentelemetry/api'; import type { ProcessAdapter, AgentEvent, AgentStats } from './adapters/base.js'; -import type { Runner } from './runner.js'; +import type { Runner, TerminalReason } from './runner.js'; import type { Config } from './types.js'; import { tracer, inputTokenCounter, outputTokenCounter, taskDurationHist, taskErrorCounter, context } from './telemetry.js'; @@ -30,9 +30,11 @@ export declare interface ProcessRunner { on(event: 'agent-event', listener: (e: AgentEvent) => void): this; on(event: 'done', listener: (exitCode: number, stderr: string) => void): this; on(event: 'error', listener: (err: Error) => void): this; + on(event: 'settled', listener: (reason: TerminalReason) => void): this; emit(event: 'agent-event', e: AgentEvent): boolean; emit(event: 'done', exitCode: number, stderr: string): boolean; emit(event: 'error', err: Error): boolean; + emit(event: 'settled', reason: TerminalReason): boolean; } /** @@ -48,6 +50,7 @@ export class ProcessRunner extends EventEmitter implements Runner { private _child: ChildProcess | null = null; private _cancelled = 
false; private _exited = false; + private _settled = false; private _timeoutHandle: ReturnType | null = null; private _idleHandle: ReturnType | null = null; private _stderrBuffer = ''; @@ -92,6 +95,7 @@ export class ProcessRunner extends EventEmitter implements Runner { 'error', new Error(`Failed to spawn "${binary}": ${String(err)}`), ); + this._settle('failed'); return; } @@ -100,6 +104,7 @@ export class ProcessRunner extends EventEmitter implements Runner { if (child.stdout === null || child.stderr === null) { this._finalizeOtel(-1); this.emit('error', new Error(`"${binary}" process has no stdout/stderr`)); + this._settle('failed'); return; } @@ -141,6 +146,7 @@ export class ProcessRunner extends EventEmitter implements Runner { } taskErrorCounter.add(1, { adapter: adapter.name, error_kind: 'spawn_error' }); this.emit('error', new Error(`Failed to spawn "${binary}": ${err.message}`)); + this._settle('failed'); } }); @@ -150,9 +156,13 @@ export class ProcessRunner extends EventEmitter implements Runner { // Always finalize telemetry regardless of whether this is a normal exit, // a cancellation, or a timeout — _finalizeOtel is idempotent. this._finalizeOtel(exitCode); - if (this._cancelled || this._exited) return; - this._exited = true; - this.emit('done', exitCode, this._stderrBuffer); + if (!this._cancelled && !this._exited) { + this._exited = true; + this.emit('done', exitCode, this._stderrBuffer); + } + // `settled` fires on every close, including cancellation (which emits no done/error). + // Timeout/idle already settled ('timed-out') so _settle here is a guarded no-op for them. + this._settle(this._cancelled ? 'cancelled' : exitCode === 0 ? 
'succeeded' : 'failed'); }); if (config.agentTimeoutMs > 0) { @@ -163,6 +173,7 @@ export class ProcessRunner extends EventEmitter implements Runner { this._exited = true; this.emit('done', -1, this._stderrBuffer); } + this._settle('timed-out'); } }, config.agentTimeoutMs); } @@ -176,7 +187,11 @@ export class ProcessRunner extends EventEmitter implements Runner { this._cancelled = true; this._clearTimers(); const child = this._child; - if (child === null) return; + if (child === null) { + // Cancelled before the process started: no close event will fire, so settle here. + this._settle('cancelled'); + return; + } child.kill('SIGTERM'); const kill = setTimeout(() => { if (!this._exited) child.kill('SIGKILL'); @@ -189,6 +204,13 @@ export class ProcessRunner extends EventEmitter implements Runner { this._child?.stdin?.write(answer + '\n'); } + /** Emits `settled` exactly once. Idempotent — subsequent terminal paths are no-ops. */ + private _settle(reason: TerminalReason): void { + if (this._settled) return; + this._settled = true; + this.emit('settled', reason); + } + private _processLine(line: string): void { const { adapter, config } = this._options; if (adapter.isApprovalPrompt(line)) { @@ -247,6 +269,7 @@ export class ProcessRunner extends EventEmitter implements Runner { this._exited = true; this.emit('done', 0, this._stderrBuffer); } + this._settle('timed-out'); } }, idleMs); } diff --git a/src/runner.ts b/src/runner.ts index 38bb3a5..88b3e82 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -1,5 +1,11 @@ import type { AgentEvent } from './adapters/base.js'; +/** + * Why a run reached a terminal state. Emitted with the `settled` event so a finalizer can + * run once per run regardless of which path ended it. + */ +export type TerminalReason = 'succeeded' | 'failed' | 'cancelled' | 'timed-out'; + /** * Transport-agnostic execution port. * @@ -22,8 +28,14 @@ export interface Runner { /** Emitted for each parsed {@link AgentEvent}. 
*/ on(event: 'agent-event', listener: (e: AgentEvent) => void): this; - /** Emitted once when execution finishes (exit code and accumulated stderr). */ + /** Emitted once when execution ends without being cancelled — zero or non-zero exit alike (exit code and accumulated stderr). */ on(event: 'done', listener: (exitCode: number, stderr: string) => void): this; /** Emitted if execution cannot start or fails unexpectedly. */ on(event: 'error', listener: (err: Error) => void): this; + /** + * Emitted **exactly once per run, on every terminal path** — including cancellation, which + * emits neither `done` nor `error`. The single hook a finalizer subscribes to for + * guaranteed cleanup. `done`/`error` (when they occur) fire before `settled`. + */ + on(event: 'settled', listener: (reason: TerminalReason) => void): this; } diff --git a/tests/unit/agent-task-executor.test.ts b/tests/unit/agent-task-executor.test.ts index 6af63a3..023fdec 100644 --- a/tests/unit/agent-task-executor.test.ts +++ b/tests/unit/agent-task-executor.test.ts @@ -380,3 +380,21 @@ describe('AgentTaskExecutor routing', () => { expect(seen).toEqual([undefined, undefined]); }); }); + +describe('AgentTaskExecutor settled finalizer', () => { + it('removes the runner on settled so the next execute starts fresh', () => { + const executor = new AgentTaskExecutor(baseConfig, mockAdapter); + const ctx = makeContext({ taskId: 'task-s' }); + + void executor.execute(ctx, makeBus() as never); + expect(vi.mocked(ProcessRunner)).toHaveBeenCalledTimes(1); + + // Finalizer fires — should evict the runner from the active map. + getMockRunner().emit('settled', 'succeeded'); + + // Same taskId again must build a NEW runner (not hit the resume path). 
+ void executor.execute(ctx, makeBus() as never); + expect(vi.mocked(ProcessRunner)).toHaveBeenCalledTimes(2); + expect(getMockRunner().resume).not.toHaveBeenCalled(); + }); +}); diff --git a/tests/unit/process-runner.test.ts b/tests/unit/process-runner.test.ts index 1051c3f..ac3fb30 100644 --- a/tests/unit/process-runner.test.ts +++ b/tests/unit/process-runner.test.ts @@ -597,3 +597,51 @@ describe('ProcessRunner', () => { }); }); }); + + describe('settled event', () => { + it('emits settled("succeeded") after done on a clean exit', async () => { + const runner = new ProcessRunner({ task: 'p', adapter: makeMockAdapter(), config: baseConfig }); + const order: string[] = []; + const settled = new Promise((resolve) => { + runner.on('done', () => order.push('done')); + runner.on('settled', (reason) => { order.push('settled'); resolve(reason); }); + }); + runner.start(); + exitChild(0); + expect(await settled).toBe('succeeded'); + expect(order).toEqual(['done', 'settled']); // done precedes settled + }); + + it('emits settled("failed") on a non-zero exit', async () => { + const runner = new ProcessRunner({ task: 'p', adapter: makeMockAdapter(), config: baseConfig }); + const settled = new Promise((resolve) => runner.on('settled', resolve)); + runner.start(); + exitChild(1); + expect(await settled).toBe('failed'); + }); + + it('emits settled("cancelled") on cancel — the path that emits no done/error', async () => { + const runner = new ProcessRunner({ task: 'p', adapter: makeMockAdapter(), config: baseConfig }); + let doneOrError = false; + const settled = new Promise((resolve) => { + runner.on('done', () => { doneOrError = true; }); + runner.on('error', () => { doneOrError = true; }); + runner.on('settled', resolve); + }); + runner.start(); + runner.cancel(); + exitChild(143); // SIGTERM close after cancel + expect(await settled).toBe('cancelled'); + expect(doneOrError).toBe(false); + }); + + it('emits settled only once even if close arrives twice', async () => { + 
const runner = new ProcessRunner({ task: 'p', adapter: makeMockAdapter(), config: baseConfig }); + const reasons: string[] = []; + runner.on('settled', (r) => reasons.push(r)); + runner.start(); + exitChild(0); + exitChild(0); + expect(reasons).toEqual(['succeeded']); + }); + });