test: split embedded runner cleanup seams

This commit is contained in:
Peter Steinberger
2026-04-05 22:20:02 +01:00
parent d3e67a0de7
commit d83dd9b536
4 changed files with 156 additions and 106 deletions

View File

@@ -9,11 +9,14 @@ import {
import {
cleanupTempPaths,
createContextEngineBootstrapAndAssemble,
createContextEngineAttemptRunner,
expectCalledWithSessionKey,
getHoisted,
resetEmbeddedAttemptHarness,
} from "./attempt.spawn-workspace.test-support.js";
import {
buildEmbeddedSubscriptionParams,
cleanupEmbeddedAttemptResources,
} from "./attempt.subscription-cleanup.js";
const hoisted = getHoisted();
const embeddedSessionId = "embedded-session";
@@ -177,26 +180,35 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
});
it("forwards silentExpected to the embedded subscription", async () => {
const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble();
const result = await createContextEngineAttemptRunner({
contextEngine: {
bootstrap,
assemble,
},
attemptOverrides: {
silentExpected: true,
},
const params = buildEmbeddedSubscriptionParams({
session: {} as never,
runId: "run-context-engine-forwarding",
hookRunner: undefined,
verboseLevel: undefined,
reasoningMode: "off",
toolResultFormat: undefined,
shouldEmitToolResult: undefined,
shouldEmitToolOutput: undefined,
onToolResult: undefined,
onReasoningStream: undefined,
onReasoningEnd: undefined,
onBlockReply: undefined,
onBlockReplyFlush: undefined,
blockReplyBreak: undefined,
blockReplyChunking: undefined,
onPartialReply: undefined,
onAssistantMessageStart: undefined,
onAgentEvent: undefined,
enforceFinalTag: undefined,
silentExpected: true,
config: undefined,
sessionKey,
tempPaths,
sessionId: embeddedSessionId,
agentId: "main",
});
expect(result.promptError).toBeNull();
expect(hoisted.subscribeEmbeddedPiSessionMock).toHaveBeenCalledWith(
expect.objectContaining({
silentExpected: true,
}),
);
expect(params.silentExpected).toBe(true);
expect(params.sessionKey).toBe(sessionKey);
});
it("skips maintenance when afterTurn fails", async () => {
@@ -252,29 +264,25 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
});
it("releases the session lock even when teardown cleanup throws", async () => {
const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble();
const releaseMock = vi.fn(async () => {});
hoisted.acquireSessionWriteLockMock.mockResolvedValue({
release: releaseMock,
});
let flushCallCount = 0;
hoisted.flushPendingToolResultsAfterIdleMock.mockImplementation(async () => {
flushCallCount += 1;
if (flushCallCount >= 2) {
throw new Error("flush failed");
}
const disposeMock = vi.fn();
const flushMock = vi.fn(async () => {
throw new Error("flush failed");
});
const result = await createContextEngineAttemptRunner({
contextEngine: {
bootstrap,
assemble,
},
sessionKey,
tempPaths,
await cleanupEmbeddedAttemptResources({
removeToolResultContextGuard: () => {},
flushPendingToolResultsAfterIdle: flushMock,
session: { agent: {}, dispose: disposeMock },
sessionManager: hoisted.sessionManager,
releaseWsSession: hoisted.releaseWsSessionMock,
sessionId: embeddedSessionId,
bundleLspRuntime: undefined,
sessionLock: { release: releaseMock },
});
expect(result.promptError).toBeNull();
expect(flushMock).toHaveBeenCalledTimes(1);
expect(disposeMock).toHaveBeenCalledTimes(1);
expect(releaseMock).toHaveBeenCalledTimes(1);
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session");
});

View File

@@ -497,15 +497,6 @@ vi.mock("./history-image-prune.js", () => ({
pruneProcessedHistoryImages: <T>(messages: T) => messages,
}));
let runEmbeddedAttemptPromise:
| Promise<typeof import("./attempt.js").runEmbeddedAttempt>
| undefined;
async function loadRunEmbeddedAttempt() {
runEmbeddedAttemptPromise ??= import("./attempt.js").then((mod) => mod.runEmbeddedAttempt);
return await runEmbeddedAttemptPromise;
}
export type MutableSession = {
sessionId: string;
messages: unknown[];
@@ -546,6 +537,17 @@ export function createSubscriptionMock(): SubscriptionMock {
};
}
let runEmbeddedAttemptPromise:
| Promise<typeof import("./attempt.js").runEmbeddedAttempt>
| undefined;
async function loadRunEmbeddedAttempt() {
runEmbeddedAttemptPromise ??= import("./attempt.ts?spawn-workspace-test").then(
(mod) => mod.runEmbeddedAttempt,
);
return await runEmbeddedAttemptPromise;
}
export function resetEmbeddedAttemptHarness(
params: {
includeSpawnSubagent?: boolean;
@@ -767,8 +769,9 @@ export async function createContextEngineAttemptRunner(params: {
session: createDefaultEmbeddedSession(),
}));
const runEmbeddedAttempt = await loadRunEmbeddedAttempt();
return await runEmbeddedAttempt({
return await (
await loadRunEmbeddedAttempt()
)({
sessionId: "embedded-session",
sessionKey: params.sessionKey,
sessionFile,

View File

@@ -0,0 +1,56 @@
import type { SubscribeEmbeddedPiSessionParams } from "../../pi-embedded-subscribe.types.js";
export function buildEmbeddedSubscriptionParams(
params: SubscribeEmbeddedPiSessionParams,
): SubscribeEmbeddedPiSessionParams {
return params;
}
export async function cleanupEmbeddedAttemptResources(params: {
removeToolResultContextGuard?: () => void;
flushPendingToolResultsAfterIdle: (params: {
agent: unknown;
sessionManager: unknown;
clearPendingOnTimeout: boolean;
}) => Promise<void>;
session?: { agent?: unknown; dispose(): void };
sessionManager: unknown;
releaseWsSession: (sessionId: string) => void;
sessionId: string;
bundleLspRuntime?: { dispose(): Promise<void> | void };
sessionLock: { release(): Promise<void> | void };
}): Promise<void> {
try {
try {
params.removeToolResultContextGuard?.();
} catch {
/* best-effort */
}
try {
await params.flushPendingToolResultsAfterIdle({
agent: params.session?.agent,
sessionManager: params.sessionManager,
clearPendingOnTimeout: true,
});
} catch {
/* best-effort */
}
try {
params.session?.dispose();
} catch {
/* best-effort */
}
try {
params.releaseWsSession(params.sessionId);
} catch {
/* best-effort */
}
try {
await params.bundleLspRuntime?.dispose();
} catch {
/* best-effort */
}
} finally {
await params.sessionLock.release();
}
}

View File

@@ -166,6 +166,10 @@ import {
waitForSessionsYieldAbortSettle,
} from "./attempt.sessions-yield.js";
import { wrapStreamFnHandleSensitiveStopReason } from "./attempt.stop-reason-recovery.js";
import {
buildEmbeddedSubscriptionParams,
cleanupEmbeddedAttemptResources,
} from "./attempt.subscription-cleanup.js";
import {
appendAttemptCacheTtlIfNeeded,
composeSystemPromptWithHookContext,
@@ -1328,32 +1332,34 @@ export async function runEmbeddedAttempt(
});
};
const subscription = subscribeEmbeddedPiSession({
session: activeSession,
runId: params.runId,
hookRunner: getGlobalHookRunner() ?? undefined,
verboseLevel: params.verboseLevel,
reasoningMode: params.reasoningLevel ?? "off",
toolResultFormat: params.toolResultFormat,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
onToolResult: params.onToolResult,
onReasoningStream: params.onReasoningStream,
onReasoningEnd: params.onReasoningEnd,
onBlockReply: params.onBlockReply,
onBlockReplyFlush: params.onBlockReplyFlush,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAssistantMessageStart: params.onAssistantMessageStart,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
silentExpected: params.silentExpected,
config: params.config,
sessionKey: sandboxSessionKey,
sessionId: params.sessionId,
agentId: sessionAgentId,
});
const subscription = subscribeEmbeddedPiSession(
buildEmbeddedSubscriptionParams({
session: activeSession,
runId: params.runId,
hookRunner: getGlobalHookRunner() ?? undefined,
verboseLevel: params.verboseLevel,
reasoningMode: params.reasoningLevel ?? "off",
toolResultFormat: params.toolResultFormat,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
onToolResult: params.onToolResult,
onReasoningStream: params.onReasoningStream,
onReasoningEnd: params.onReasoningEnd,
onBlockReply: params.onBlockReply,
onBlockReplyFlush: params.onBlockReplyFlush,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAssistantMessageStart: params.onAssistantMessageStart,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
silentExpected: params.silentExpected,
config: params.config,
sessionKey: sandboxSessionKey,
sessionId: params.sessionId,
agentId: sessionAgentId,
}),
);
const {
assistantTexts,
@@ -2091,39 +2097,16 @@ export async function runEmbeddedAttempt(
// flushPendingToolResults() fires while tools are still executing, inserting
// synthetic "missing tool result" errors and causing silent agent failures.
// See: https://github.com/openclaw/openclaw/issues/8643
try {
try {
removeToolResultContextGuard?.();
} catch {
/* best-effort */
}
try {
await flushPendingToolResultsAfterIdle({
agent: session?.agent,
sessionManager,
clearPendingOnTimeout: true,
});
} catch {
/* best-effort */
}
try {
session?.dispose();
} catch {
/* best-effort */
}
try {
releaseWsSession(params.sessionId);
} catch {
/* best-effort */
}
try {
await bundleLspRuntime?.dispose();
} catch {
/* best-effort */
}
} finally {
await sessionLock.release();
}
await cleanupEmbeddedAttemptResources({
removeToolResultContextGuard,
flushPendingToolResultsAfterIdle,
session,
sessionManager,
releaseWsSession,
sessionId: params.sessionId,
bundleLspRuntime,
sessionLock,
});
}
} finally {
restoreSkillEnv?.();