Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/agent-task-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ export class AgentTaskExecutor implements AgentExecutor {
resolve();
});

// Finalizer: fires exactly once on every terminal path (incl. cancel, which emits no
// done/error). Idempotent cleanup — the seam where worktree.dispose()/slot-release will
// attach later. Publishes no A2A event (preserves the event stream).
runner!.on('settled', () => {
this._activeRunners.delete(taskId);
});

runner!.start();
});
};
Expand Down
33 changes: 28 additions & 5 deletions src/process-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { ChildProcess } from 'node:child_process';
import type { Span } from '@opentelemetry/api';
import { SpanStatusCode } from '@opentelemetry/api';
import type { ProcessAdapter, AgentEvent, AgentStats } from './adapters/base.js';
import type { Runner } from './runner.js';
import type { Runner, TerminalReason } from './runner.js';
import type { Config } from './types.js';
import { tracer, inputTokenCounter, outputTokenCounter, taskDurationHist, taskErrorCounter, context } from './telemetry.js';

Expand All @@ -30,9 +30,11 @@ export declare interface ProcessRunner {
on(event: 'agent-event', listener: (e: AgentEvent) => void): this;
on(event: 'done', listener: (exitCode: number, stderr: string) => void): this;
on(event: 'error', listener: (err: Error) => void): this;
on(event: 'settled', listener: (reason: TerminalReason) => void): this;
emit(event: 'agent-event', e: AgentEvent): boolean;
emit(event: 'done', exitCode: number, stderr: string): boolean;
emit(event: 'error', err: Error): boolean;
emit(event: 'settled', reason: TerminalReason): boolean;
}

/**
Expand All @@ -48,6 +50,7 @@ export class ProcessRunner extends EventEmitter implements Runner {
private _child: ChildProcess | null = null;
private _cancelled = false;
private _exited = false;
private _settled = false;
private _timeoutHandle: ReturnType<typeof setTimeout> | null = null;
private _idleHandle: ReturnType<typeof setTimeout> | null = null;
private _stderrBuffer = '';
Expand Down Expand Up @@ -92,6 +95,7 @@ export class ProcessRunner extends EventEmitter implements Runner {
'error',
new Error(`Failed to spawn "${binary}": ${String(err)}`),
);
this._settle('failed');
return;
}

Expand All @@ -100,6 +104,7 @@ export class ProcessRunner extends EventEmitter implements Runner {
if (child.stdout === null || child.stderr === null) {
this._finalizeOtel(-1);
this.emit('error', new Error(`"${binary}" process has no stdout/stderr`));
this._settle('failed');
return;
}

Expand Down Expand Up @@ -141,6 +146,7 @@ export class ProcessRunner extends EventEmitter implements Runner {
}
taskErrorCounter.add(1, { adapter: adapter.name, error_kind: 'spawn_error' });
this.emit('error', new Error(`Failed to spawn "${binary}": ${err.message}`));
this._settle('failed');
}
});

Expand All @@ -150,9 +156,13 @@ export class ProcessRunner extends EventEmitter implements Runner {
// Always finalize telemetry regardless of whether this is a normal exit,
// a cancellation, or a timeout — _finalizeOtel is idempotent.
this._finalizeOtel(exitCode);
if (this._cancelled || this._exited) return;
this._exited = true;
this.emit('done', exitCode, this._stderrBuffer);
if (!this._cancelled && !this._exited) {
this._exited = true;
this.emit('done', exitCode, this._stderrBuffer);
}
// `settled` fires on every close, including cancellation (which emits no done/error).
// Timeout/idle already settled ('timed-out') so _settle here is a guarded no-op for them.
this._settle(this._cancelled ? 'cancelled' : exitCode === 0 ? 'succeeded' : 'failed');
});

if (config.agentTimeoutMs > 0) {
Expand All @@ -163,6 +173,7 @@ export class ProcessRunner extends EventEmitter implements Runner {
this._exited = true;
this.emit('done', -1, this._stderrBuffer);
}
this._settle('timed-out');
}
}, config.agentTimeoutMs);
}
Expand All @@ -176,7 +187,11 @@ export class ProcessRunner extends EventEmitter implements Runner {
this._cancelled = true;
this._clearTimers();
const child = this._child;
if (child === null) return;
if (child === null) {
// Cancelled before the process started: no close event will fire, so settle here.
this._settle('cancelled');
return;
}
child.kill('SIGTERM');
const kill = setTimeout(() => {
if (!this._exited) child.kill('SIGKILL');
Expand All @@ -189,6 +204,13 @@ export class ProcessRunner extends EventEmitter implements Runner {
this._child?.stdin?.write(answer + '\n');
}

/** Emits `settled` exactly once. Idempotent — subsequent terminal paths are no-ops. */
private _settle(reason: TerminalReason): void {
if (this._settled) return;
this._settled = true;
this.emit('settled', reason);
}

private _processLine(line: string): void {
const { adapter, config } = this._options;
if (adapter.isApprovalPrompt(line)) {
Expand Down Expand Up @@ -247,6 +269,7 @@ export class ProcessRunner extends EventEmitter implements Runner {
this._exited = true;
this.emit('done', 0, this._stderrBuffer);
}
this._settle('timed-out');
}
}, idleMs);
}
Expand Down
14 changes: 13 additions & 1 deletion src/runner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import type { AgentEvent } from './adapters/base.js';

/**
* Why a run reached a terminal state. Emitted with the `settled` event so a finalizer can
* run once per run regardless of which path ended it.
*/
export type TerminalReason = 'succeeded' | 'failed' | 'cancelled' | 'timed-out';

/**
* Transport-agnostic execution port.
*
Expand All @@ -22,8 +28,14 @@ export interface Runner {

/** Emitted for each parsed {@link AgentEvent}. */
on(event: 'agent-event', listener: (e: AgentEvent) => void): this;
/** Emitted once when execution finishes (exit code and accumulated stderr). */
/** Emitted once when execution finishes normally (exit code and accumulated stderr). */
on(event: 'done', listener: (exitCode: number, stderr: string) => void): this;
/** Emitted if execution cannot start or fails unexpectedly. */
on(event: 'error', listener: (err: Error) => void): this;
/**
* Emitted **exactly once per run, on every terminal path** — including cancellation, which
* emits neither `done` nor `error`. The single hook a finalizer subscribes to for
* guaranteed cleanup. `done`/`error` (when they occur) fire before `settled`.
*/
on(event: 'settled', listener: (reason: TerminalReason) => void): this;
}
18 changes: 18 additions & 0 deletions tests/unit/agent-task-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,21 @@ describe('AgentTaskExecutor routing', () => {
expect(seen).toEqual([undefined, undefined]);
});
});

describe('AgentTaskExecutor settled finalizer', () => {
it('removes the runner on settled so the next execute starts fresh', () => {
const executor = new AgentTaskExecutor(baseConfig, mockAdapter);
const ctx = makeContext({ taskId: 'task-s' });

void executor.execute(ctx, makeBus() as never);
expect(vi.mocked(ProcessRunner)).toHaveBeenCalledTimes(1);

// Finalizer fires — should evict the runner from the active map.
getMockRunner().emit('settled', 'succeeded');

// Same taskId again must build a NEW runner (not hit the resume path).
void executor.execute(ctx, makeBus() as never);
expect(vi.mocked(ProcessRunner)).toHaveBeenCalledTimes(2);
expect(getMockRunner().resume).not.toHaveBeenCalled();
});
});
48 changes: 48 additions & 0 deletions tests/unit/process-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,3 +597,51 @@ describe('ProcessRunner', () => {
});
});
});

describe('settled event', () => {
it('emits settled("succeeded") after done on a clean exit', async () => {
const runner = new ProcessRunner({ task: 'p', adapter: makeMockAdapter(), config: baseConfig });
const order: string[] = [];
const settled = new Promise<string>((resolve) => {
runner.on('done', () => order.push('done'));
runner.on('settled', (reason) => { order.push('settled'); resolve(reason); });
});
runner.start();
exitChild(0);
expect(await settled).toBe('succeeded');
expect(order).toEqual(['done', 'settled']); // done precedes settled
});

it('emits settled("failed") on a non-zero exit', async () => {
const runner = new ProcessRunner({ task: 'p', adapter: makeMockAdapter(), config: baseConfig });
const settled = new Promise<string>((resolve) => runner.on('settled', resolve));
runner.start();
exitChild(1);
expect(await settled).toBe('failed');
});

it('emits settled("cancelled") on cancel — the path that emits no done/error', async () => {
const runner = new ProcessRunner({ task: 'p', adapter: makeMockAdapter(), config: baseConfig });
let doneOrError = false;
const settled = new Promise<string>((resolve) => {
runner.on('done', () => { doneOrError = true; });
runner.on('error', () => { doneOrError = true; });
runner.on('settled', resolve);
});
runner.start();
runner.cancel();
exitChild(143); // SIGTERM close after cancel
expect(await settled).toBe('cancelled');
expect(doneOrError).toBe(false);
});

it('emits settled only once even if close arrives twice', async () => {
const runner = new ProcessRunner({ task: 'p', adapter: makeMockAdapter(), config: baseConfig });
const reasons: string[] = [];
runner.on('settled', (r) => reasons.push(r));
runner.start();
exitChild(0);
exitChild(0);
expect(reasons).toEqual(['succeeded']);
});
});
Loading