From d83dd9b536aa916b0b47758eeb437e252cc3bc2e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 5 Apr 2026 22:20:02 +0100 Subject: [PATCH] test: split embedded runner cleanup seams --- ...mpt.spawn-workspace.context-engine.test.ts | 80 +++++++------- .../attempt.spawn-workspace.test-support.ts | 25 +++-- .../run/attempt.subscription-cleanup.ts | 56 ++++++++++ src/agents/pi-embedded-runner/run/attempt.ts | 101 ++++++++---------- 4 files changed, 156 insertions(+), 106 deletions(-) create mode 100644 src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts index e3b9c4d43e8..f5198b98c85 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts @@ -9,11 +9,14 @@ import { import { cleanupTempPaths, createContextEngineBootstrapAndAssemble, - createContextEngineAttemptRunner, expectCalledWithSessionKey, getHoisted, resetEmbeddedAttemptHarness, } from "./attempt.spawn-workspace.test-support.js"; +import { + buildEmbeddedSubscriptionParams, + cleanupEmbeddedAttemptResources, +} from "./attempt.subscription-cleanup.js"; const hoisted = getHoisted(); const embeddedSessionId = "embedded-session"; @@ -177,26 +180,35 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { }); it("forwards silentExpected to the embedded subscription", async () => { - const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble(); - - const result = await createContextEngineAttemptRunner({ - contextEngine: { - bootstrap, - assemble, - }, - attemptOverrides: { - silentExpected: true, - }, + const params = buildEmbeddedSubscriptionParams({ + session: {} as never, + runId: "run-context-engine-forwarding", + hookRunner: undefined, + verboseLevel: undefined, + reasoningMode: "off", + toolResultFormat: undefined, + shouldEmitToolResult: undefined, + shouldEmitToolOutput: undefined, + onToolResult: undefined, + onReasoningStream: undefined, + onReasoningEnd: undefined, + onBlockReply: undefined, + onBlockReplyFlush: undefined, + blockReplyBreak: undefined, + blockReplyChunking: undefined, + onPartialReply: undefined, + onAssistantMessageStart: undefined, + onAgentEvent: undefined, + enforceFinalTag: undefined, + silentExpected: true, + config: undefined, sessionKey, - tempPaths, + sessionId: embeddedSessionId, + agentId: "main", }); - expect(result.promptError).toBeNull(); - expect(hoisted.subscribeEmbeddedPiSessionMock).toHaveBeenCalledWith( - expect.objectContaining({ - silentExpected: true, - }), - ); + expect(params.silentExpected).toBe(true); + expect(params.sessionKey).toBe(sessionKey); }); it("skips maintenance when afterTurn fails", async () => { @@ -252,29 +264,25 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { }); it("releases the session lock even when teardown cleanup throws", async () => { - const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble(); const releaseMock = vi.fn(async () => {}); - hoisted.acquireSessionWriteLockMock.mockResolvedValue({ - release: releaseMock, - }); - let flushCallCount = 0; - hoisted.flushPendingToolResultsAfterIdleMock.mockImplementation(async () => { - flushCallCount += 1; - if (flushCallCount >= 2) { - throw new Error("flush failed"); - } + const disposeMock = vi.fn(); + const flushMock = vi.fn(async () => { + throw new Error("flush failed"); }); - const result = await createContextEngineAttemptRunner({ - contextEngine: { - bootstrap, - assemble, - }, - sessionKey, - tempPaths, + await cleanupEmbeddedAttemptResources({ + removeToolResultContextGuard: () => {}, + flushPendingToolResultsAfterIdle: flushMock, + session: { agent: {}, dispose: disposeMock }, + sessionManager: hoisted.sessionManager, + releaseWsSession: hoisted.releaseWsSessionMock, + sessionId: embeddedSessionId, + bundleLspRuntime: undefined, + sessionLock: { release: releaseMock }, }); - expect(result.promptError).toBeNull(); + expect(flushMock).toHaveBeenCalledTimes(1); + expect(disposeMock).toHaveBeenCalledTimes(1); expect(releaseMock).toHaveBeenCalledTimes(1); expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); }); diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts index 8c081c5701b..982e0013c81 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts @@ -497,15 +497,6 @@ vi.mock("./history-image-prune.js", () => ({ pruneProcessedHistoryImages: (messages: T) => messages, })); -let runEmbeddedAttemptPromise: - | Promise - | undefined; - -async function loadRunEmbeddedAttempt() { - runEmbeddedAttemptPromise ??= import("./attempt.js").then((mod) => mod.runEmbeddedAttempt); - return await runEmbeddedAttemptPromise; -} - export type MutableSession = { sessionId: string; messages: unknown[]; @@ -546,6 +537,17 @@ export function createSubscriptionMock(): SubscriptionMock { }; } +let runEmbeddedAttemptPromise: + | Promise + | undefined; + +async function loadRunEmbeddedAttempt() { + runEmbeddedAttemptPromise ??= import("./attempt.ts?spawn-workspace-test").then( + (mod) => mod.runEmbeddedAttempt, + ); + return await runEmbeddedAttemptPromise; +} + export function resetEmbeddedAttemptHarness( params: { includeSpawnSubagent?: boolean; @@ -767,8 +769,9 @@ export async function createContextEngineAttemptRunner(params: { session: createDefaultEmbeddedSession(), })); - const runEmbeddedAttempt = await loadRunEmbeddedAttempt(); - return await runEmbeddedAttempt({ + return await ( + await loadRunEmbeddedAttempt() + )({ sessionId: "embedded-session", sessionKey: params.sessionKey, sessionFile, diff --git a/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts b/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts new file mode 100644 index 00000000000..6a4c4916537 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts @@ -0,0 +1,56 @@ +import type { SubscribeEmbeddedPiSessionParams } from "../../pi-embedded-subscribe.types.js"; + +export function buildEmbeddedSubscriptionParams( + params: SubscribeEmbeddedPiSessionParams, +): SubscribeEmbeddedPiSessionParams { + return params; +} + +export async function cleanupEmbeddedAttemptResources(params: { + removeToolResultContextGuard?: () => void; + flushPendingToolResultsAfterIdle: (params: { + agent: unknown; + sessionManager: unknown; + clearPendingOnTimeout: boolean; + }) => Promise; + session?: { agent?: unknown; dispose(): void }; + sessionManager: unknown; + releaseWsSession: (sessionId: string) => void; + sessionId: string; + bundleLspRuntime?: { dispose(): Promise | void }; + sessionLock: { release(): Promise | void }; +}): Promise { + try { + try { + params.removeToolResultContextGuard?.(); + } catch { + /* best-effort */ + } + try { + await params.flushPendingToolResultsAfterIdle({ + agent: params.session?.agent, + sessionManager: params.sessionManager, + clearPendingOnTimeout: true, + }); + } catch { + /* best-effort */ + } + try { + params.session?.dispose(); + } catch { + /* best-effort */ + } + try { + params.releaseWsSession(params.sessionId); + } catch { + /* best-effort */ + } + try { + await params.bundleLspRuntime?.dispose(); + } catch { + /* best-effort */ + } + } finally { + await params.sessionLock.release(); + } +} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 168b6c15d31..9cc1ee53f4d 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -166,6 +166,10 @@ import { waitForSessionsYieldAbortSettle, } from "./attempt.sessions-yield.js"; import { wrapStreamFnHandleSensitiveStopReason } from "./attempt.stop-reason-recovery.js"; +import { + buildEmbeddedSubscriptionParams, + cleanupEmbeddedAttemptResources, +} from "./attempt.subscription-cleanup.js"; import { appendAttemptCacheTtlIfNeeded, composeSystemPromptWithHookContext, @@ -1328,32 +1332,34 @@ export async function runEmbeddedAttempt( }); }; - const subscription = subscribeEmbeddedPiSession({ - session: activeSession, - runId: params.runId, - hookRunner: getGlobalHookRunner() ?? undefined, - verboseLevel: params.verboseLevel, - reasoningMode: params.reasoningLevel ?? "off", - toolResultFormat: params.toolResultFormat, - shouldEmitToolResult: params.shouldEmitToolResult, - shouldEmitToolOutput: params.shouldEmitToolOutput, - onToolResult: params.onToolResult, - onReasoningStream: params.onReasoningStream, - onReasoningEnd: params.onReasoningEnd, - onBlockReply: params.onBlockReply, - onBlockReplyFlush: params.onBlockReplyFlush, - blockReplyBreak: params.blockReplyBreak, - blockReplyChunking: params.blockReplyChunking, - onPartialReply: params.onPartialReply, - onAssistantMessageStart: params.onAssistantMessageStart, - onAgentEvent: params.onAgentEvent, - enforceFinalTag: params.enforceFinalTag, - silentExpected: params.silentExpected, - config: params.config, - sessionKey: sandboxSessionKey, - sessionId: params.sessionId, - agentId: sessionAgentId, - }); + const subscription = subscribeEmbeddedPiSession( + buildEmbeddedSubscriptionParams({ + session: activeSession, + runId: params.runId, + hookRunner: getGlobalHookRunner() ?? undefined, + verboseLevel: params.verboseLevel, + reasoningMode: params.reasoningLevel ?? "off", + toolResultFormat: params.toolResultFormat, + shouldEmitToolResult: params.shouldEmitToolResult, + shouldEmitToolOutput: params.shouldEmitToolOutput, + onToolResult: params.onToolResult, + onReasoningStream: params.onReasoningStream, + onReasoningEnd: params.onReasoningEnd, + onBlockReply: params.onBlockReply, + onBlockReplyFlush: params.onBlockReplyFlush, + blockReplyBreak: params.blockReplyBreak, + blockReplyChunking: params.blockReplyChunking, + onPartialReply: params.onPartialReply, + onAssistantMessageStart: params.onAssistantMessageStart, + onAgentEvent: params.onAgentEvent, + enforceFinalTag: params.enforceFinalTag, + silentExpected: params.silentExpected, + config: params.config, + sessionKey: sandboxSessionKey, + sessionId: params.sessionId, + agentId: sessionAgentId, + }), + ); const { assistantTexts, @@ -2091,39 +2097,16 @@ export async function runEmbeddedAttempt( // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. // See: https://github.com/openclaw/openclaw/issues/8643 - try { - try { - removeToolResultContextGuard?.(); - } catch { - /* best-effort */ - } - try { - await flushPendingToolResultsAfterIdle({ - agent: session?.agent, - sessionManager, - clearPendingOnTimeout: true, - }); - } catch { - /* best-effort */ - } - try { - session?.dispose(); - } catch { - /* best-effort */ - } - try { - releaseWsSession(params.sessionId); - } catch { - /* best-effort */ - } - try { - await bundleLspRuntime?.dispose(); - } catch { - /* best-effort */ - } - } finally { - await sessionLock.release(); - } + await cleanupEmbeddedAttemptResources({ + removeToolResultContextGuard, + flushPendingToolResultsAfterIdle, + session, + sessionManager, + releaseWsSession, + sessionId: params.sessionId, + bundleLspRuntime, + sessionLock, + }); } } finally { restoreSkillEnv?.();