diff --git a/src/plugins/contracts/run-context-lifecycle.contract.test.ts b/src/plugins/contracts/run-context-lifecycle.contract.test.ts index b6bf48f8d96..0f2148b563e 100644 --- a/src/plugins/contracts/run-context-lifecycle.contract.test.ts +++ b/src/plugins/contracts/run-context-lifecycle.contract.test.ts @@ -315,6 +315,94 @@ describe("plugin run context lifecycle", () => { ).toBeUndefined(); }); + it("waits for terminal handlers added after the first terminal cleanup waiter starts", async () => { + let releaseFirstTerminalHandler: (() => void) | undefined; + let releaseSecondTerminalHandler: (() => void) | undefined; + let firstTerminalHandlerSawContext: unknown; + let secondTerminalHandlerSawContext: unknown; + let terminalEventsSeen = 0; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "repeated-terminal-live-wait", + name: "Repeated Terminal Live Wait", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "records", + streams: ["tool", "lifecycle"], + async handle(event, ctx) { + if (event.stream === "tool") { + ctx.setRunContext("seen", { runId: event.runId }); + return; + } + if (event.data?.phase !== "end") { + return; + } + terminalEventsSeen += 1; + if (terminalEventsSeen === 1) { + await new Promise((resolve) => { + releaseFirstTerminalHandler = resolve; + }); + firstTerminalHandlerSawContext = ctx.getRunContext("seen"); + return; + } + await new Promise((resolve) => { + releaseSecondTerminalHandler = resolve; + }); + secondTerminalHandlerSawContext = ctx.getRunContext("seen"); + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-repeated-terminal-live-wait", + stream: "tool", + data: { name: "tool" }, + }); + await waitForPluginEventHandlers(); + + emitAgentEvent({ + runId: "run-repeated-terminal-live-wait", + stream: "lifecycle", + data: { phase: "end" }, + }); + await waitForPluginEventHandlers(); + + emitAgentEvent({ + runId: "run-repeated-terminal-live-wait", + stream: "lifecycle", + data: { phase: "end" }, + }); + await waitForPluginEventHandlers(); + + releaseFirstTerminalHandler?.(); + await waitForPluginEventHandlers(); + expect(firstTerminalHandlerSawContext).toEqual({ runId: "run-repeated-terminal-live-wait" }); + expect( + getPluginRunContext({ + pluginId: "repeated-terminal-live-wait", + get: { runId: "run-repeated-terminal-live-wait", namespace: "seen" }, + }), + ).toEqual({ runId: "run-repeated-terminal-live-wait" }); + + releaseSecondTerminalHandler?.(); + await waitForPluginEventHandlers(); + await waitForPluginEventHandlers(); + + expect(secondTerminalHandlerSawContext).toEqual({ runId: "run-repeated-terminal-live-wait" }); + expect( + getPluginRunContext({ + pluginId: "repeated-terminal-live-wait", + get: { runId: "run-repeated-terminal-live-wait", namespace: "seen" }, + }), + ).toBeUndefined(); + }); + it("clears run context after the terminal subscription grace period", async () => { vi.useFakeTimers(); let releaseTerminalHandler: (() => void) | undefined; diff --git a/src/plugins/host-hook-runtime.ts b/src/plugins/host-hook-runtime.ts index fbd78333d89..0a14c290b51 100644 --- a/src/plugins/host-hook-runtime.ts +++ b/src/plugins/host-hook-runtime.ts @@ -107,16 +107,20 @@ function trackAgentEventHandler(runId: string, pending: Promise): void { }); } -function waitForTerminalEventHandlers(params: { - runId: string; - pendingHandlers: Set>; -}): Promise { - const { pendingHandlers, runId } = params; - if (pendingHandlers.size === 0) { - return Promise.resolve(); +async function waitForLiveTerminalEventHandlers(runId: string): Promise<"settled"> { + for (;;) { + const pendingHandlers = getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(runId); + if (!pendingHandlers || pendingHandlers.size === 0) { + return "settled"; + } + await Promise.allSettled(pendingHandlers); } +} + +function waitForTerminalEventHandlers(params: { runId: string }): Promise { + const { runId } = params; let timeout: NodeJS.Timeout | undefined; - const settled = Promise.allSettled(pendingHandlers).then(() => "settled" as const); + const settled = waitForLiveTerminalEventHandlers(runId); // Promise.race bounds the host wait; JavaScript cannot cancel the plugin // promises themselves, so timeout also marks the run expired to block late // run-context resurrection by handlers that eventually settle. @@ -340,12 +344,8 @@ export function dispatchPluginAgentEventSubscriptions(params: { } if (isTerminalEvent) { markPluginRunClosed(params.event.runId); - const pendingForRun = - getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(params.event.runId) ?? - new Set(pendingHandlers); void waitForTerminalEventHandlers({ runId: params.event.runId, - pendingHandlers: new Set(pendingForRun), }).then(() => { clearPluginRunContext({ runId: params.event.runId }); });