diff --git a/pi-extension/subagents/index.ts b/pi-extension/subagents/index.ts index 1320d4e..d757dcd 100644 --- a/pi-extension/subagents/index.ts +++ b/pi-extension/subagents/index.ts @@ -494,6 +494,48 @@ interface RunningSubagent { /** All currently running subagents, keyed by id. */ const runningSubagents = new Map(); +let subagentShutdownInProgress = false; + +function resetSubagentShutdownStateForSession() { + subagentShutdownInProgress = false; + const moduleAbort = (globalThis as any)[POLL_ABORT_KEY] as AbortController | undefined; + if (!moduleAbort || moduleAbort.signal.aborted) { + (globalThis as any)[POLL_ABORT_KEY] = new AbortController(); + } +} + +function closeRunningSubagentSurface( + running: Pick, + closeFn: (surface: string) => void = closeSurface, +) { + try { + closeFn(running.surface); + } catch {} + runningSubagents.delete(running.id); +} + +function abortAndCloseRunningSubagents(closeFn: (surface: string) => void = closeSurface): number { + subagentShutdownInProgress = true; + const agents = Array.from(runningSubagents.values()); + for (const agent of agents) { + closeRunningSubagentSurface(agent, closeFn); + try { + agent.abortController?.abort(); + } catch {} + } + runningSubagents.clear(); + return agents.length; +} + +function ensureSubagentLaunchCanContinue( + running: Pick, + closeFn: (surface: string) => void = closeSurface, +) { + if (!subagentShutdownInProgress) return; + closeRunningSubagentSurface(running, closeFn); + throw new Error("Session is shutting down"); +} + // ── Widget management ── /** Latest ExtensionContext from session_start, used for widget updates. */ @@ -854,6 +896,10 @@ export const __test__ = { requestSubagentInterrupt, handleSubagentInterrupt, resolveResultPresentation, + abortAndCloseRunningSubagents, + ensureSubagentLaunchCanContinue, + resetSubagentShutdownStateForSession, + isSubagentShutdownInProgress: () => subagentShutdownInProgress, runningSubagents, }; @@ -908,12 +954,42 @@ async function launchSubagent( ].join("-"); const subagentSessionFile = join(sessionDir, `${timestamp}_${uuid}.jsonl`); + if (subagentShutdownInProgress) { + throw new Error("Session is shutting down"); + } + // Use pre-created surface (parallel mode) or create a new one. // For new surfaces, pause briefly so the shell is ready before sending the command. const surfacePreCreated = !!options?.surface; const surface = options?.surface ?? createSurface(params.name); - if (!surfacePreCreated) { - await new Promise((resolve) => setTimeout(resolve, getShellReadyDelayMs())); + const running: RunningSubagent = { + id, + name: params.name, + task: params.task, + agent: params.agent, + surface, + startTime, + sessionFile: subagentSessionFile, + cli: agentDefs?.cli, + interactive: effectiveInteractive, + statusState: createStatusState({ + source: agentDefs?.cli === "claude" ? "claude" : "pi", + startTimeMs: startTime, + }), + }; + runningSubagents.set(id, running); + + try { + ensureSubagentLaunchCanContinue(running); + if (!surfacePreCreated) { + await new Promise((resolve) => setTimeout(resolve, getShellReadyDelayMs())); + } + ensureSubagentLaunchCanContinue(running); + } catch (error) { + if (!subagentShutdownInProgress) { + closeRunningSubagentSurface(running); + } + throw error; } const launchBehavior = resolveLaunchBehavior(params, agentDefs); @@ -990,6 +1066,10 @@ async function launchSubagent( .replace(/^-|-$/g, "") || "subagent"}-${id}.sh`; const launchScriptFile = join(artifactDir, "subagent-scripts", launchScriptName); + running.launchScriptFile = launchScriptFile; + running.cli = "claude"; + running.sentinelFile = sentinelFile; + ensureSubagentLaunchCanContinue(running); sendLongCommand(surface, command, { scriptPath: launchScriptFile, scriptPreamble: [ @@ -999,25 +1079,6 @@ async function launchSubagent( ].join("\n"), }); - const running: RunningSubagent = { - id, - name: params.name, - task: params.task, - agent: params.agent, - surface, - startTime, - sessionFile: subagentSessionFile, - launchScriptFile, - cli: "claude", - sentinelFile, - interactive: effectiveInteractive, - statusState: createStatusState({ - source: "claude", - startTimeMs: startTime, - }), - }; - - runningSubagents.set(id, running); return running; } @@ -1134,6 +1195,9 @@ async function launchSubagent( .replace(/-+/g, "-") .replace(/^-|-$/g, "") || "subagent"}-${id}.sh`; const launchScriptFile = join(artifactDir, "subagent-scripts", launchScriptName); + running.launchScriptFile = launchScriptFile; + running.activityFile = activityFile; + ensureSubagentLaunchCanContinue(running); sendLongCommand(surface, command, { scriptPath: launchScriptFile, scriptPreamble: [ @@ -1144,24 +1208,6 @@ async function launchSubagent( ].join("\n"), }); - const running: RunningSubagent = { - id, - name: params.name, - task: params.task, - agent: params.agent, - surface, - startTime, - sessionFile: subagentSessionFile, - launchScriptFile, - activityFile, - interactive: effectiveInteractive, - statusState: createStatusState({ - source: "pi", - startTimeMs: startTime, - }), - }; - - runningSubagents.set(id, running); return running; } @@ -1239,8 +1285,7 @@ async function watchSubagent( try { unlinkSync(running.sentinelFile + ".transcript"); } catch {} } - closeSurface(surface); - runningSubagents.delete(running.id); + closeRunningSubagentSurface(running); return { name, task, summary, exitCode: result.exitCode, elapsed, ...(sessionId ? { claudeSessionId: sessionId } : {}) }; } @@ -1261,8 +1306,7 @@ async function watchSubagent( : "Sub-agent exited without output"; } - closeSurface(surface); - runningSubagents.delete(running.id); + closeRunningSubagentSurface(running); return { name, @@ -1274,10 +1318,7 @@ async function watchSubagent( ping: result.ping, }; } catch (err: any) { - try { - closeSurface(surface); - } catch {} - runningSubagents.delete(running.id); + closeRunningSubagentSurface(running); if (signal.aborted) { return { @@ -1304,6 +1345,7 @@ async function watchSubagent( export default function subagentsExtension(pi: ExtensionAPI) { // Capture the UI context for widget updates pi.on("session_start", (_event, ctx) => { + resetSubagentShutdownStateForSession(); latestCtx = ctx; }); @@ -1321,10 +1363,8 @@ export default function subagentsExtension(pi: ExtensionAPI) { } const moduleAbort = (globalThis as any)[POLL_ABORT_KEY] as AbortController | undefined; if (moduleAbort) moduleAbort.abort(); - for (const [_id, agent] of runningSubagents) { - agent.abortController?.abort(); - } - runningSubagents.clear(); + abortAndCloseRunningSubagents(); + latestCtx = null; }); // Tools denied via PI_DENY_TOOLS env var (set by parent agent based on frontmatter) @@ -1723,8 +1763,36 @@ export default function subagentsExtension(pi: ExtensionAPI) { // Record entry count before resuming so we can extract new messages const entryCountBefore = getNewEntries(params.sessionPath, 0).length; + if (subagentShutdownInProgress) { + throw new Error("Session is shutting down"); + } + const surface = createSurface(name); - await new Promise((resolve) => setTimeout(resolve, getShellReadyDelayMs())); + const running: RunningSubagent = { + id, + name, + task: params.message ?? "resumed session", + surface, + startTime, + sessionFile: params.sessionPath, + interactive: true, + statusState: createStatusState({ + source: "pi", + startTimeMs: startTime, + }), + }; + runningSubagents.set(id, running); + + try { + ensureSubagentLaunchCanContinue(running); + await new Promise((resolve) => setTimeout(resolve, getShellReadyDelayMs())); + ensureSubagentLaunchCanContinue(running); + } catch (error) { + if (!subagentShutdownInProgress) { + closeRunningSubagentSurface(running); + } + throw error; + } // Build pi resume command const parts = ["pi", "--session", shellEscape(params.sessionPath)]; @@ -1781,6 +1849,9 @@ export default function subagentsExtension(pi: ExtensionAPI) { .replace(/-+/g, "-") .replace(/^-|-$/g, "") || "resume"}-resume-${Date.now()}.sh`, ); + running.launchScriptFile = launchScriptFile; + running.activityFile = activityFile; + ensureSubagentLaunchCanContinue(running); sendLongCommand(surface, command, { scriptPath: launchScriptFile, scriptPreamble: [ @@ -1792,23 +1863,6 @@ export default function subagentsExtension(pi: ExtensionAPI) { ].join("\n"), }); - // Register as a running subagent for widget tracking - const running: RunningSubagent = { - id, - name, - task: params.message ?? "resumed session", - surface, - startTime, - sessionFile: params.sessionPath, - launchScriptFile, - activityFile, - interactive: true, - statusState: createStatusState({ - source: "pi", - startTimeMs: startTime, - }), - }; - runningSubagents.set(id, running); startWidgetRefresh(); startStatusRefresh(pi); diff --git a/test/test.ts b/test/test.ts index 2da1195..201ae30 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1398,6 +1398,75 @@ describe("subagent interruption", () => { }; } + it("closes every tracked subagent on shutdown and tolerates close failures", () => { + const testApi = (subagentsModule as any).__test__; + const runningMap = testApi.runningSubagents as Map; + const closed: string[] = []; + const aborted: string[] = []; + runningMap.clear(); + testApi.resetSubagentShutdownStateForSession(); + + try { + runningMap.set("a1", makeRunning({ + id: "a1", + name: "One", + surface: "pane-1", + abortController: { abort: () => aborted.push("a1") }, + })); + runningMap.set("b2", makeRunning({ + id: "b2", + name: "Two", + surface: "pane-2", + abortController: { abort: () => aborted.push("b2") }, + })); + runningMap.set("c3", makeRunning({ + id: "c3", + name: "Three", + surface: "pane-3", + abortController: { abort: () => aborted.push("c3") }, + })); + + const count = testApi.abortAndCloseRunningSubagents((surface: string) => { + closed.push(surface); + if (surface === "pane-2") throw new Error("already closed"); + }); + + assert.equal(count, 3); + assert.deepEqual(closed, ["pane-1", "pane-2", "pane-3"]); + assert.deepEqual(aborted, ["a1", "b2", "c3"]); + assert.equal(runningMap.size, 0); + assert.equal(testApi.isSubagentShutdownInProgress(), true); + } finally { + runningMap.clear(); + testApi.resetSubagentShutdownStateForSession(); + } + }); + + it("closes and rejects launch surfaces that resume after shutdown starts", () => { + const testApi = (subagentsModule as any).__test__; + const runningMap = testApi.runningSubagents as Map; + const closed: string[] = []; + runningMap.clear(); + testApi.resetSubagentShutdownStateForSession(); + + const running = makeRunning({ id: "launch", surface: "pane-launch" }); + + try { + testApi.abortAndCloseRunningSubagents(() => {}); + runningMap.set(running.id, running); + + assert.throws( + () => testApi.ensureSubagentLaunchCanContinue(running, (surface: string) => closed.push(surface)), + /Session is shutting down/, + ); + assert.deepEqual(closed, ["pane-launch"]); + assert.equal(runningMap.has("launch"), false); + } finally { + runningMap.clear(); + testApi.resetSubagentShutdownStateForSession(); + } + }); + it("registers subagent_interrupt in the main session extension", () => { const { api, registeredTools } = createMockExtensionApi();