Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 123 additions & 69 deletions pi-extension/subagents/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,48 @@ interface RunningSubagent {
/** All currently running subagents, keyed by id. */
const runningSubagents = new Map<string, RunningSubagent>();

let subagentShutdownInProgress = false;

function resetSubagentShutdownStateForSession() {
subagentShutdownInProgress = false;
const moduleAbort = (globalThis as any)[POLL_ABORT_KEY] as AbortController | undefined;
if (!moduleAbort || moduleAbort.signal.aborted) {
(globalThis as any)[POLL_ABORT_KEY] = new AbortController();
}
}

function closeRunningSubagentSurface(
running: Pick<RunningSubagent, "id" | "surface">,
closeFn: (surface: string) => void = closeSurface,
) {
try {
closeFn(running.surface);
} catch {}
runningSubagents.delete(running.id);
}

function abortAndCloseRunningSubagents(closeFn: (surface: string) => void = closeSurface): number {
subagentShutdownInProgress = true;
const agents = Array.from(runningSubagents.values());
for (const agent of agents) {
closeRunningSubagentSurface(agent, closeFn);
try {
agent.abortController?.abort();
} catch {}
}
runningSubagents.clear();
return agents.length;
}

function ensureSubagentLaunchCanContinue(
running: Pick<RunningSubagent, "id" | "surface">,
closeFn: (surface: string) => void = closeSurface,
) {
if (!subagentShutdownInProgress) return;
closeRunningSubagentSurface(running, closeFn);
throw new Error("Session is shutting down");
}

// ── Widget management ──

/** Latest ExtensionContext from session_start, used for widget updates. */
Expand Down Expand Up @@ -854,6 +896,10 @@ export const __test__ = {
requestSubagentInterrupt,
handleSubagentInterrupt,
resolveResultPresentation,
abortAndCloseRunningSubagents,
ensureSubagentLaunchCanContinue,
resetSubagentShutdownStateForSession,
isSubagentShutdownInProgress: () => subagentShutdownInProgress,
runningSubagents,
};

Expand Down Expand Up @@ -908,12 +954,42 @@ async function launchSubagent(
].join("-");
const subagentSessionFile = join(sessionDir, `${timestamp}_${uuid}.jsonl`);

if (subagentShutdownInProgress) {
throw new Error("Session is shutting down");
}

// Use pre-created surface (parallel mode) or create a new one.
// For new surfaces, pause briefly so the shell is ready before sending the command.
const surfacePreCreated = !!options?.surface;
const surface = options?.surface ?? createSurface(params.name);
if (!surfacePreCreated) {
await new Promise<void>((resolve) => setTimeout(resolve, getShellReadyDelayMs()));
const running: RunningSubagent = {
id,
name: params.name,
task: params.task,
agent: params.agent,
surface,
startTime,
sessionFile: subagentSessionFile,
cli: agentDefs?.cli,
interactive: effectiveInteractive,
statusState: createStatusState({
source: agentDefs?.cli === "claude" ? "claude" : "pi",
startTimeMs: startTime,
}),
};
runningSubagents.set(id, running);

try {
ensureSubagentLaunchCanContinue(running);
if (!surfacePreCreated) {
await new Promise<void>((resolve) => setTimeout(resolve, getShellReadyDelayMs()));
}
ensureSubagentLaunchCanContinue(running);
} catch (error) {
if (!subagentShutdownInProgress) {
closeRunningSubagentSurface(running);
}
throw error;
}

const launchBehavior = resolveLaunchBehavior(params, agentDefs);
Expand Down Expand Up @@ -990,6 +1066,10 @@ async function launchSubagent(
.replace(/^-|-$/g, "") || "subagent"}-${id}.sh`;
const launchScriptFile = join(artifactDir, "subagent-scripts", launchScriptName);

running.launchScriptFile = launchScriptFile;
running.cli = "claude";
running.sentinelFile = sentinelFile;
ensureSubagentLaunchCanContinue(running);
sendLongCommand(surface, command, {
scriptPath: launchScriptFile,
scriptPreamble: [
Expand All @@ -999,25 +1079,6 @@ async function launchSubagent(
].join("\n"),
});

const running: RunningSubagent = {
id,
name: params.name,
task: params.task,
agent: params.agent,
surface,
startTime,
sessionFile: subagentSessionFile,
launchScriptFile,
cli: "claude",
sentinelFile,
interactive: effectiveInteractive,
statusState: createStatusState({
source: "claude",
startTimeMs: startTime,
}),
};

runningSubagents.set(id, running);
return running;
}

Expand Down Expand Up @@ -1134,6 +1195,9 @@ async function launchSubagent(
.replace(/-+/g, "-")
.replace(/^-|-$/g, "") || "subagent"}-${id}.sh`;
const launchScriptFile = join(artifactDir, "subagent-scripts", launchScriptName);
running.launchScriptFile = launchScriptFile;
running.activityFile = activityFile;
ensureSubagentLaunchCanContinue(running);
sendLongCommand(surface, command, {
scriptPath: launchScriptFile,
scriptPreamble: [
Expand All @@ -1144,24 +1208,6 @@ async function launchSubagent(
].join("\n"),
});

const running: RunningSubagent = {
id,
name: params.name,
task: params.task,
agent: params.agent,
surface,
startTime,
sessionFile: subagentSessionFile,
launchScriptFile,
activityFile,
interactive: effectiveInteractive,
statusState: createStatusState({
source: "pi",
startTimeMs: startTime,
}),
};

runningSubagents.set(id, running);
return running;
}

Expand Down Expand Up @@ -1239,8 +1285,7 @@ async function watchSubagent(
try { unlinkSync(running.sentinelFile + ".transcript"); } catch {}
}

closeSurface(surface);
runningSubagents.delete(running.id);
closeRunningSubagentSurface(running);

return { name, task, summary, exitCode: result.exitCode, elapsed, ...(sessionId ? { claudeSessionId: sessionId } : {}) };
}
Expand All @@ -1261,8 +1306,7 @@ async function watchSubagent(
: "Sub-agent exited without output";
}

closeSurface(surface);
runningSubagents.delete(running.id);
closeRunningSubagentSurface(running);

return {
name,
Expand All @@ -1274,10 +1318,7 @@ async function watchSubagent(
ping: result.ping,
};
} catch (err: any) {
try {
closeSurface(surface);
} catch {}
runningSubagents.delete(running.id);
closeRunningSubagentSurface(running);

if (signal.aborted) {
return {
Expand All @@ -1304,6 +1345,7 @@ async function watchSubagent(
export default function subagentsExtension(pi: ExtensionAPI) {
// Capture the UI context for widget updates
pi.on("session_start", (_event, ctx) => {
resetSubagentShutdownStateForSession();
latestCtx = ctx;
});

Expand All @@ -1321,10 +1363,8 @@ export default function subagentsExtension(pi: ExtensionAPI) {
}
const moduleAbort = (globalThis as any)[POLL_ABORT_KEY] as AbortController | undefined;
if (moduleAbort) moduleAbort.abort();
for (const [_id, agent] of runningSubagents) {
agent.abortController?.abort();
}
runningSubagents.clear();
abortAndCloseRunningSubagents();
latestCtx = null;
});

// Tools denied via PI_DENY_TOOLS env var (set by parent agent based on frontmatter)
Expand Down Expand Up @@ -1723,8 +1763,36 @@ export default function subagentsExtension(pi: ExtensionAPI) {
// Record entry count before resuming so we can extract new messages
const entryCountBefore = getNewEntries(params.sessionPath, 0).length;

if (subagentShutdownInProgress) {
throw new Error("Session is shutting down");
}

const surface = createSurface(name);
await new Promise<void>((resolve) => setTimeout(resolve, getShellReadyDelayMs()));
const running: RunningSubagent = {
id,
name,
task: params.message ?? "resumed session",
surface,
startTime,
sessionFile: params.sessionPath,
interactive: true,
statusState: createStatusState({
source: "pi",
startTimeMs: startTime,
}),
};
runningSubagents.set(id, running);

try {
ensureSubagentLaunchCanContinue(running);
await new Promise<void>((resolve) => setTimeout(resolve, getShellReadyDelayMs()));
ensureSubagentLaunchCanContinue(running);
} catch (error) {
if (!subagentShutdownInProgress) {
closeRunningSubagentSurface(running);
}
throw error;
}

// Build pi resume command
const parts = ["pi", "--session", shellEscape(params.sessionPath)];
Expand Down Expand Up @@ -1781,6 +1849,9 @@ export default function subagentsExtension(pi: ExtensionAPI) {
.replace(/-+/g, "-")
.replace(/^-|-$/g, "") || "resume"}-resume-${Date.now()}.sh`,
);
running.launchScriptFile = launchScriptFile;
running.activityFile = activityFile;
ensureSubagentLaunchCanContinue(running);
sendLongCommand(surface, command, {
scriptPath: launchScriptFile,
scriptPreamble: [
Expand All @@ -1792,23 +1863,6 @@ export default function subagentsExtension(pi: ExtensionAPI) {
].join("\n"),
});

// Register as a running subagent for widget tracking
const running: RunningSubagent = {
id,
name,
task: params.message ?? "resumed session",
surface,
startTime,
sessionFile: params.sessionPath,
launchScriptFile,
activityFile,
interactive: true,
statusState: createStatusState({
source: "pi",
startTimeMs: startTime,
}),
};
runningSubagents.set(id, running);
startWidgetRefresh();
startStatusRefresh(pi);

Expand Down
69 changes: 69 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,75 @@ describe("subagent interruption", () => {
};
}

it("closes every tracked subagent on shutdown and tolerates close failures", () => {
const testApi = (subagentsModule as any).__test__;
const runningMap = testApi.runningSubagents as Map<string, any>;
const closed: string[] = [];
const aborted: string[] = [];
runningMap.clear();
testApi.resetSubagentShutdownStateForSession();

try {
runningMap.set("a1", makeRunning({
id: "a1",
name: "One",
surface: "pane-1",
abortController: { abort: () => aborted.push("a1") },
}));
runningMap.set("b2", makeRunning({
id: "b2",
name: "Two",
surface: "pane-2",
abortController: { abort: () => aborted.push("b2") },
}));
runningMap.set("c3", makeRunning({
id: "c3",
name: "Three",
surface: "pane-3",
abortController: { abort: () => aborted.push("c3") },
}));

const count = testApi.abortAndCloseRunningSubagents((surface: string) => {
closed.push(surface);
if (surface === "pane-2") throw new Error("already closed");
});

assert.equal(count, 3);
assert.deepEqual(closed, ["pane-1", "pane-2", "pane-3"]);
assert.deepEqual(aborted, ["a1", "b2", "c3"]);
assert.equal(runningMap.size, 0);
assert.equal(testApi.isSubagentShutdownInProgress(), true);
} finally {
runningMap.clear();
testApi.resetSubagentShutdownStateForSession();
}
});

it("closes and rejects launch surfaces that resume after shutdown starts", () => {
const testApi = (subagentsModule as any).__test__;
const runningMap = testApi.runningSubagents as Map<string, any>;
const closed: string[] = [];
runningMap.clear();
testApi.resetSubagentShutdownStateForSession();

const running = makeRunning({ id: "launch", surface: "pane-launch" });

try {
testApi.abortAndCloseRunningSubagents(() => {});
runningMap.set(running.id, running);

assert.throws(
() => testApi.ensureSubagentLaunchCanContinue(running, (surface: string) => closed.push(surface)),
/Session is shutting down/,
);
assert.deepEqual(closed, ["pane-launch"]);
assert.equal(runningMap.has("launch"), false);
} finally {
runningMap.clear();
testApi.resetSubagentShutdownStateForSession();
}
});

it("registers subagent_interrupt in the main session extension", () => {
const { api, registeredTools } = createMockExtensionApi();

Expand Down