diff --git a/src/agents/pi-embedded-runner/compact.hooks.harness.ts b/src/agents/pi-embedded-runner/compact.hooks.harness.ts index d3d53a8f00b..84e10a85c98 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.harness.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.harness.ts @@ -334,6 +334,47 @@ export async function loadCompactHooksHarness(): Promise<{ splitSdkTools: vi.fn(() => ({ builtInTools: [], customTools: [] })), })); + vi.doMock("./compaction-safety-timeout.js", () => ({ + compactWithSafetyTimeout: vi.fn( + async ( + compact: () => Promise, + _timeoutMs?: number, + opts?: { abortSignal?: AbortSignal; onCancel?: () => void }, + ) => { + const abortSignal = opts?.abortSignal; + if (!abortSignal) { + return await compact(); + } + const cancelAndCreateError = () => { + opts?.onCancel?.(); + const reason = "reason" in abortSignal ? abortSignal.reason : undefined; + if (reason instanceof Error) { + return reason; + } + const err = new Error("aborted"); + err.name = "AbortError"; + return err; + }; + if (abortSignal.aborted) { + throw cancelAndCreateError(); + } + return await Promise.race([ + compact(), + new Promise((_, reject) => { + abortSignal.addEventListener( + "abort", + () => { + reject(cancelAndCreateError()); + }, + { once: true }, + ); + }), + ]); + }, + ), + resolveCompactionTimeoutMs: vi.fn(() => 30_000), + })); + vi.doMock("./wait-for-idle-before-flush.js", () => ({ flushPendingToolResultsAfterIdle: vi.fn(async () => {}), })); diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 3d37637f8db..55ebe2f51f7 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -77,17 +77,6 @@ function compactionConfig(mode: "await" | "off" | "async") { } as never; } -function directCompactionArgs(overrides: Record = {}) { - return { - sessionId: TEST_SESSION_ID, - sessionKey: TEST_SESSION_KEY, - sessionFile: 
TEST_SESSION_FILE, - workspaceDir: TEST_WORKSPACE_DIR, - customInstructions: TEST_CUSTOM_INSTRUCTIONS, - ...overrides, - }; -} - function wrappedCompactionArgs(overrides: Record = {}) { return { sessionId: TEST_SESSION_ID, @@ -174,14 +163,6 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { unregisterApiProviders(getCustomApiRegistrySourceId("ollama")); }); - async function runDirectCompaction(customInstructions = TEST_CUSTOM_INSTRUCTIONS) { - return await compactEmbeddedPiSessionDirect( - directCompactionArgs({ - customInstructions, - }), - ); - } - it("bootstraps runtime plugins with the resolved workspace", async () => { // This assertion only cares about bootstrap wiring, so stop before the // rest of the compaction pipeline can pull in unrelated runtime surfaces. @@ -230,23 +211,39 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { it("emits internal + plugin compaction hooks with counts", async () => { hookRunner.hasHooks.mockReturnValue(true); - let sanitizedCount = 0; - sanitizeSessionHistoryMock.mockImplementation(async (params: { messages: unknown[] }) => { - const sanitized = params.messages.slice(1); - sanitizedCount = sanitized.length; - return sanitized; + const originalMessages = sessionMessages.slice(1) as AgentMessage[]; + const currentMessages = sessionMessages.slice(1) as AgentMessage[]; + const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({ + originalMessages, + currentMessages, + estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number, }); - - const result = await compactEmbeddedPiSessionDirect({ + const { hookSessionKey, missingSessionKey } = await compactTesting.runBeforeCompactionHooks({ + hookRunner, sessionId: "session-1", sessionKey: "agent:main:session-1", - sessionFile: "/tmp/session.jsonl", + sessionAgentId: "main", workspaceDir: "/tmp", - messageChannel: "telegram", - customInstructions: "focus on decisions", + messageProvider: "telegram", + metrics: beforeMetrics, + }); + await 
compactTesting.runAfterCompactionHooks({ + hookRunner, + sessionId: "session-1", + sessionAgentId: "main", + hookSessionKey, + missingSessionKey, + workspaceDir: "/tmp", + messageProvider: "telegram", + messageCountAfter: 1, + tokensAfter: 10, + compactedCount: 1, + sessionFile: "/tmp/session.jsonl", + summaryLength: "summary".length, + tokensBefore: 120, + firstKeptEntryId: "entry-1", }); - expect(result.ok).toBe(true); expect(sessionHook("compact:before")).toMatchObject({ type: "session", action: "compact:before", @@ -257,8 +254,8 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { expect(beforeContext).toMatchObject({ messageCount: 2, tokenCount: 20, - messageCountOriginal: sanitizedCount, - tokenCountOriginal: sanitizedCount * 10, + messageCountOriginal: 2, + tokenCountOriginal: 20, }); expect(afterContext).toMatchObject({ messageCount: 1, @@ -288,15 +285,33 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { it("uses sessionId as hook session key fallback when sessionKey is missing", async () => { hookRunner.hasHooks.mockReturnValue(true); - - const result = await compactEmbeddedPiSessionDirect({ + const originalMessages = sessionMessages.slice(1) as AgentMessage[]; + const currentMessages = sessionMessages.slice(1) as AgentMessage[]; + const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({ + originalMessages, + currentMessages, + estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number, + }); + const { hookSessionKey, missingSessionKey } = await compactTesting.runBeforeCompactionHooks({ + hookRunner, sessionId: "session-1", - sessionFile: "/tmp/session.jsonl", + sessionAgentId: "main", workspaceDir: "/tmp", - customInstructions: "focus on decisions", + metrics: beforeMetrics, + }); + await compactTesting.runAfterCompactionHooks({ + hookRunner, + sessionId: "session-1", + sessionAgentId: "main", + hookSessionKey, + missingSessionKey, + workspaceDir: "/tmp", + messageCountAfter: 1, + tokensAfter: 10, + 
compactedCount: 1, + sessionFile: "/tmp/session.jsonl", }); - expect(result.ok).toBe(true); expect(sessionHook("compact:before")?.sessionKey).toBe("session-1"); expect(sessionHook("compact:after")?.sessionKey).toBe("session-1"); expect(hookRunner.runBeforeCompaction).toHaveBeenCalledWith( @@ -311,11 +326,20 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { it("applies validated transcript before hooks even when it becomes empty", async () => { hookRunner.hasHooks.mockReturnValue(true); - sanitizeSessionHistoryMock.mockResolvedValue([]); + const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({ + originalMessages: [], + currentMessages: [], + estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number, + }); + await compactTesting.runBeforeCompactionHooks({ + hookRunner, + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionAgentId: "main", + workspaceDir: "/tmp", + metrics: beforeMetrics, + }); - const result = await runDirectCompaction(); - - expect(result.ok).toBe(true); const beforeContext = sessionHook("compact:before")?.context; expect(beforeContext).toMatchObject({ messageCountOriginal: 0, @@ -329,15 +353,11 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { const cleanup = onSessionTranscriptUpdate(listener); try { - const result = await compactEmbeddedPiSessionDirect({ - sessionId: "session-1", + await compactTesting.runPostCompactionSideEffects({ sessionKey: "agent:main:session-1", sessionFile: " /tmp/session.jsonl ", - workspaceDir: "/tmp", - customInstructions: "focus on decisions", }); - expect(result.ok).toBe(true); expect(listener).toHaveBeenCalledTimes(1); expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" }); } finally { @@ -356,24 +376,13 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { } return 5; }); - sessionCompactImpl.mockResolvedValue({ - summary: "summary", - firstKeptEntryId: "entry-1", - tokensBefore: 20, - details: { ok: true }, + const 
tokensAfter = compactTesting.estimateTokensAfterCompaction({ + messagesAfter: [{ role: "user", content: "kept ask" }] as AgentMessage[], + fullSessionTokensBefore: 55, + estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number, }); - const result = await runDirectCompaction(); - - expect(result).toMatchObject({ - ok: true, - compacted: true, - result: { - tokensBefore: 20, - tokensAfter: 30, - }, - }); - expect(sessionHook("compact:after")?.context?.tokenCount).toBe(30); + expect(tokensAfter).toBe(30); }); it("treats pre-compaction token estimation failures as a no-op sanity check", async () => { @@ -387,29 +396,20 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { } return 5; }); - sessionCompactImpl.mockResolvedValue({ - summary: "summary", - firstKeptEntryId: "entry-1", - tokensBefore: 20, - details: { ok: true }, + const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({ + originalMessages: sessionMessages as AgentMessage[], + currentMessages: sessionMessages as AgentMessage[], + estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number, + }); + const tokensAfter = compactTesting.estimateTokensAfterCompaction({ + messagesAfter: [{ role: "user", content: "kept ask" }] as AgentMessage[], + fullSessionTokensBefore: 0, + estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number, }); - const result = await compactEmbeddedPiSessionDirect({ - sessionId: "session-1", - sessionKey: "agent:main:session-1", - sessionFile: "/tmp/session.jsonl", - workspaceDir: "/tmp", - customInstructions: "focus on decisions", - }); - - expect(result).toMatchObject({ - ok: true, - compacted: true, - result: { - tokensAfter: 30, - }, - }); - expect(sessionHook("compact:after")?.context?.tokenCount).toBe(30); + expect(beforeMetrics.tokenCountOriginal).toBeUndefined(); + expect(beforeMetrics.tokenCountBefore).toBeUndefined(); + expect(tokensAfter).toBe(30); }); it("skips sync in await mode when postCompactionForce is 
false", async () => { @@ -424,13 +424,12 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { }, }); - const result = await compactEmbeddedPiSessionDirect( - directCompactionArgs({ - config: compactionConfig("await"), - }), - ); + await compactTesting.runPostCompactionSideEffects({ + config: compactionConfig("await"), + sessionKey: TEST_SESSION_KEY, + sessionFile: TEST_SESSION_FILE, + }); - expect(result.ok).toBe(true); expect(resolveSessionAgentIdMock).toHaveBeenCalledWith({ sessionKey: TEST_SESSION_KEY, config: expect.any(Object), @@ -449,11 +448,11 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); let settled = false; - const resultPromise = compactEmbeddedPiSessionDirect( - directCompactionArgs({ - config: compactionConfig("await"), - }), - ); + const resultPromise = compactTesting.runPostCompactionSideEffects({ + config: compactionConfig("await"), + sessionKey: TEST_SESSION_KEY, + sessionFile: TEST_SESSION_FILE, + }); void resultPromise.then(() => { settled = true; @@ -464,8 +463,7 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { }); expect(settled).toBe(false); syncRelease.resolve(undefined); - const result = await resultPromise; - expect(result.ok).toBe(true); + await resultPromise; expect(settled).toBe(true); }); @@ -473,13 +471,12 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { const sync = vi.fn(async () => {}); getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); - const result = await compactEmbeddedPiSessionDirect( - directCompactionArgs({ - config: compactionConfig("off"), - }), - ); + await compactTesting.runPostCompactionSideEffects({ + config: compactionConfig("off"), + sessionKey: TEST_SESSION_KEY, + sessionFile: TEST_SESSION_FILE, + }); - expect(result.ok).toBe(true); expect(resolveSessionAgentIdMock).not.toHaveBeenCalled(); expect(getMemorySearchManagerMock).not.toHaveBeenCalled(); expect(sync).not.toHaveBeenCalled(); @@ 
-499,11 +496,11 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { }); let settled = false; - const resultPromise = compactEmbeddedPiSessionDirect( - directCompactionArgs({ - config: compactionConfig("async"), - }), - ); + const resultPromise = compactTesting.runPostCompactionSideEffects({ + config: compactionConfig("async"), + sessionKey: TEST_SESSION_KEY, + sessionFile: TEST_SESSION_FILE, + }); await managerRequested.promise; void resultPromise.then(() => { @@ -521,9 +518,7 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { }); it("skips compaction when the transcript only contains boilerplate replies and tool output", async () => { - sessionMessages.splice( - 0, - sessionMessages.length, + const messages = [ { role: "user", content: "HEARTBEAT_OK", timestamp: 1 }, { role: "toolResult", @@ -533,50 +528,22 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { isError: false, timestamp: 2, }, - ); + ] as AgentMessage[]; - const result = await compactEmbeddedPiSessionDirect({ - sessionId: "session-1", - sessionKey: "agent:main:session-1", - sessionFile: "/tmp/session.jsonl", - workspaceDir: "/tmp", - customInstructions: "focus on decisions", - }); - - expect(result).toMatchObject({ - ok: true, - compacted: false, - reason: "no real conversation messages", - }); - expect(sessionCompactImpl).not.toHaveBeenCalled(); + expect(compactTesting.containsRealConversationMessages(messages)).toBe(false); }); it("skips compaction when the transcript only contains heartbeat boilerplate and reasoning blocks", async () => { - sessionMessages.splice( - 0, - sessionMessages.length, + const messages = [ { role: "user", content: "HEARTBEAT_OK", timestamp: 1 }, { role: "assistant", content: [{ type: "thinking", thinking: "checking" }], timestamp: 2, }, - ); + ] as AgentMessage[]; - const result = await compactEmbeddedPiSessionDirect({ - sessionId: "session-1", - sessionKey: "agent:main:session-1", - sessionFile: "/tmp/session.jsonl", - workspaceDir: "/tmp", - 
customInstructions: "focus on decisions", - }); - - expect(result).toMatchObject({ - ok: true, - compacted: false, - reason: "no real conversation messages", - }); - expect(sessionCompactImpl).not.toHaveBeenCalled(); + expect(compactTesting.containsRealConversationMessages(messages)).toBe(false); }); it("does not treat assistant-only tool-call blocks as meaningful conversation", () => { @@ -661,20 +628,30 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { }); it("aborts in-flight compaction when the caller abort signal fires", async () => { + const { compactWithSafetyTimeout } = await vi.importActual< + typeof import("./compaction-safety-timeout.js") + >("./compaction-safety-timeout.js"); const controller = new AbortController(); - sessionCompactImpl.mockImplementationOnce(() => new Promise(() => {})); + const compactStarted = createDeferred(); - const resultPromise = compactEmbeddedPiSessionDirect( - directCompactionArgs({ + const resultPromise = compactWithSafetyTimeout( + async () => { + compactStarted.resolve(undefined); + return await new Promise(() => {}); + }, + 30_000, + { abortSignal: controller.signal, - }), + onCancel: () => { + sessionAbortCompactionMock(); + }, + }, ); + await compactStarted.promise; controller.abort(new Error("request timed out")); - const result = await resultPromise; - expect(result.ok).toBe(false); - expect(result.reason).toContain("request timed out"); + await expect(resultPromise).rejects.toThrow("request timed out"); expect(sessionAbortCompactionMock).toHaveBeenCalledTimes(1); }); }); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 3fe52479a73..df6466cf17c 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -387,6 +387,222 @@ async function runPostCompactionSideEffects(params: { }); } +type CompactionHookRunner = { + hasHooks?: (hookName?: string) => boolean; + runBeforeCompaction?: ( + metrics: { messageCount: 
number; tokenCount?: number }, + context: { + sessionId: string; + agentId: string; + sessionKey: string; + workspaceDir: string; + messageProvider?: string; + }, + ) => Promise | void; + runAfterCompaction?: ( + metrics: { + messageCount: number; + tokenCount?: number; + compactedCount: number; + sessionFile: string; + }, + context: { + sessionId: string; + agentId: string; + sessionKey: string; + workspaceDir: string; + messageProvider?: string; + }, + ) => Promise | void; +}; + +function asCompactionHookRunner( + hookRunner: ReturnType | null | undefined, +): CompactionHookRunner | null { + if (!hookRunner) { + return null; + } + return { + hasHooks: (hookName?: string) => hookRunner.hasHooks?.(hookName as never) ?? false, + runBeforeCompaction: hookRunner.runBeforeCompaction?.bind(hookRunner), + runAfterCompaction: hookRunner.runAfterCompaction?.bind(hookRunner), + }; +} + +function estimateTokenCountSafe( + messages: AgentMessage[], + estimateTokensFn: (message: AgentMessage) => number, +): number | undefined { + try { + let total = 0; + for (const message of messages) { + total += estimateTokensFn(message); + } + return total; + } catch { + return undefined; + } +} + +function buildBeforeCompactionHookMetrics(params: { + originalMessages: AgentMessage[]; + currentMessages: AgentMessage[]; + observedTokenCount?: number; + estimateTokensFn: (message: AgentMessage) => number; +}) { + return { + messageCountOriginal: params.originalMessages.length, + tokenCountOriginal: estimateTokenCountSafe(params.originalMessages, params.estimateTokensFn), + messageCountBefore: params.currentMessages.length, + tokenCountBefore: + params.observedTokenCount ?? 
+ estimateTokenCountSafe(params.currentMessages, params.estimateTokensFn), + }; +} + +async function runBeforeCompactionHooks(params: { + hookRunner?: CompactionHookRunner | null; + sessionId: string; + sessionKey?: string; + sessionAgentId: string; + workspaceDir: string; + messageProvider?: string; + metrics: ReturnType; +}) { + const missingSessionKey = !params.sessionKey || !params.sessionKey.trim(); + const hookSessionKey = params.sessionKey?.trim() || params.sessionId; + try { + const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, { + sessionId: params.sessionId, + missingSessionKey, + messageCount: params.metrics.messageCountBefore, + tokenCount: params.metrics.tokenCountBefore, + messageCountOriginal: params.metrics.messageCountOriginal, + tokenCountOriginal: params.metrics.tokenCountOriginal, + }); + await triggerInternalHook(hookEvent); + } catch (err) { + log.warn("session:compact:before hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + if (params.hookRunner?.hasHooks?.("before_compaction")) { + try { + await params.hookRunner.runBeforeCompaction?.( + { + messageCount: params.metrics.messageCountBefore, + tokenCount: params.metrics.tokenCountBefore, + }, + { + sessionId: params.sessionId, + agentId: params.sessionAgentId, + sessionKey: hookSessionKey, + workspaceDir: params.workspaceDir, + messageProvider: params.messageProvider, + }, + ); + } catch (err) { + log.warn("before_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? 
err.stack : undefined, + }); + } + } + return { + hookSessionKey, + missingSessionKey, + }; +} + +function containsRealConversationMessages(messages: AgentMessage[]): boolean { + return messages.some((message, index, allMessages) => + hasRealConversationContent(message, allMessages, index), + ); +} + +function estimateTokensAfterCompaction(params: { + messagesAfter: AgentMessage[]; + observedTokenCount?: number; + fullSessionTokensBefore: number; + estimateTokensFn: (message: AgentMessage) => number; +}) { + const tokensAfter = estimateTokenCountSafe(params.messagesAfter, params.estimateTokensFn); + if (tokensAfter === undefined) { + return undefined; + } + const sanityCheckBaseline = params.observedTokenCount ?? params.fullSessionTokensBefore; + if ( + sanityCheckBaseline > 0 && + tokensAfter > + (params.observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1) + ) { + return undefined; + } + return tokensAfter; +} + +async function runAfterCompactionHooks(params: { + hookRunner?: CompactionHookRunner | null; + sessionId: string; + sessionAgentId: string; + hookSessionKey: string; + missingSessionKey: boolean; + workspaceDir: string; + messageProvider?: string; + messageCountAfter: number; + tokensAfter?: number; + compactedCount: number; + sessionFile: string; + summaryLength?: number; + tokensBefore?: number; + firstKeptEntryId?: string; +}) { + try { + const hookEvent = createInternalHookEvent("session", "compact:after", params.hookSessionKey, { + sessionId: params.sessionId, + missingSessionKey: params.missingSessionKey, + messageCount: params.messageCountAfter, + tokenCount: params.tokensAfter, + compactedCount: params.compactedCount, + summaryLength: params.summaryLength, + tokensBefore: params.tokensBefore, + tokensAfter: params.tokensAfter, + firstKeptEntryId: params.firstKeptEntryId, + }); + await triggerInternalHook(hookEvent); + } catch (err) { + log.warn("session:compact:after hook failed", { + errorMessage: err instanceof 
Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + if (params.hookRunner?.hasHooks?.("after_compaction")) { + try { + await params.hookRunner.runAfterCompaction?.( + { + messageCount: params.messageCountAfter, + tokenCount: params.tokensAfter, + compactedCount: params.compactedCount, + sessionFile: params.sessionFile, + }, + { + sessionId: params.sessionId, + agentId: params.sessionAgentId, + sessionKey: params.hookSessionKey, + workspaceDir: params.workspaceDir, + messageProvider: params.messageProvider, + }, + ); + } catch (err) { + log.warn("after_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + } +} + /** * Core compaction logic without lane queueing. * Use this when already inside a session/global lane to avoid deadlocks. @@ -886,72 +1102,24 @@ export async function compactEmbeddedPiSessionDirect( if (limited.length > 0) { session.agent.replaceMessages(limited); } - const missingSessionKey = !params.sessionKey || !params.sessionKey.trim(); - const hookSessionKey = params.sessionKey?.trim() || params.sessionId; - const hookRunner = getGlobalHookRunner(); + const hookRunner = asCompactionHookRunner(getGlobalHookRunner()); const observedTokenCount = normalizeObservedTokenCount(params.currentTokenCount); - const messageCountOriginal = originalMessages.length; - let tokenCountOriginal: number | undefined; - try { - tokenCountOriginal = 0; - for (const message of originalMessages) { - tokenCountOriginal += estimateTokens(message); - } - } catch { - tokenCountOriginal = undefined; - } - const messageCountBefore = session.messages.length; - let tokenCountBefore = observedTokenCount; - if (tokenCountBefore === undefined) { - try { - tokenCountBefore = 0; - for (const message of session.messages) { - tokenCountBefore += estimateTokens(message); - } - } catch { - tokenCountBefore = undefined; - } - } - 
// TODO(#7175): Consider exposing full message snapshots or pre-compaction injection - // hooks; current events only report counts/metadata. - try { - const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, { - sessionId: params.sessionId, - missingSessionKey, - messageCount: messageCountBefore, - tokenCount: tokenCountBefore, - messageCountOriginal, - tokenCountOriginal, - }); - await triggerInternalHook(hookEvent); - } catch (err) { - log.warn("session:compact:before hook failed", { - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? err.stack : undefined, - }); - } - if (hookRunner?.hasHooks("before_compaction")) { - try { - await hookRunner.runBeforeCompaction( - { - messageCount: messageCountBefore, - tokenCount: tokenCountBefore, - }, - { - sessionId: params.sessionId, - agentId: sessionAgentId, - sessionKey: hookSessionKey, - workspaceDir: effectiveWorkspace, - messageProvider: resolvedMessageProvider, - }, - ); - } catch (err) { - log.warn("before_compaction hook failed", { - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? err.stack : undefined, - }); - } - } + const beforeHookMetrics = buildBeforeCompactionHookMetrics({ + originalMessages, + currentMessages: session.messages, + observedTokenCount, + estimateTokensFn: estimateTokens, + }); + const { hookSessionKey, missingSessionKey } = await runBeforeCompactionHooks({ + hookRunner, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionAgentId, + workspaceDir: effectiveWorkspace, + messageProvider: resolvedMessageProvider, + metrics: beforeHookMetrics, + }); + const { messageCountOriginal } = beforeHookMetrics; const diagEnabled = log.isEnabled("debug"); const preMetrics = diagEnabled ? 
summarizeCompactionMessages(session.messages) : undefined; if (diagEnabled && preMetrics) { @@ -967,11 +1135,7 @@ export async function compactEmbeddedPiSessionDirect( ); } - if ( - !session.messages.some((message, index, messages) => - hasRealConversationContent(message, messages, index), - ) - ) { + if (!containsRealConversationMessages(session.messages)) { log.info( `[compaction] skipping — no real conversation messages (sessionKey=${params.sessionKey ?? params.sessionId})`, ); @@ -1013,27 +1177,12 @@ export async function compactEmbeddedPiSessionDirect( sessionFile: params.sessionFile, }); // Estimate tokens after compaction by summing token estimates for remaining messages - let tokensAfter: number | undefined; - try { - tokensAfter = 0; - for (const message of session.messages) { - tokensAfter += estimateTokens(message); - } - // Sanity check: compare against the best full-session pre-compaction baseline. - // Prefer the provider-observed live count when available; otherwise use the - // heuristic full-session estimate with a 10% margin for counter jitter. - const sanityCheckBaseline = observedTokenCount ?? fullSessionTokensBefore; - if ( - sanityCheckBaseline > 0 && - tokensAfter > - (observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1) - ) { - tokensAfter = undefined; // Don't trust the estimate - } - } catch { - // If estimation fails, leave tokensAfter undefined - tokensAfter = undefined; - } + const tokensAfter = estimateTokensAfterCompaction({ + messagesAfter: session.messages, + observedTokenCount, + fullSessionTokensBefore, + estimateTokensFn: estimateTokens, + }); const messageCountAfter = session.messages.length; const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter); const postMetrics = diagEnabled ? 
summarizeCompactionMessages(session.messages) : undefined; @@ -1051,51 +1200,22 @@ export async function compactEmbeddedPiSessionDirect( `delta.estTokens=${typeof preMetrics.estTokens === "number" && typeof postMetrics.estTokens === "number" ? postMetrics.estTokens - preMetrics.estTokens : "unknown"}`, ); } - // TODO(#9611): Consider exposing compaction summaries or post-compaction injection; - // current events only report summary metadata. - try { - const hookEvent = createInternalHookEvent("session", "compact:after", hookSessionKey, { - sessionId: params.sessionId, - missingSessionKey, - messageCount: messageCountAfter, - tokenCount: tokensAfter, - compactedCount, - summaryLength: typeof result.summary === "string" ? result.summary.length : undefined, - tokensBefore: result.tokensBefore, - tokensAfter, - firstKeptEntryId: result.firstKeptEntryId, - }); - await triggerInternalHook(hookEvent); - } catch (err) { - log.warn("session:compact:after hook failed", { - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? err.stack : undefined, - }); - } - if (hookRunner?.hasHooks("after_compaction")) { - try { - await hookRunner.runAfterCompaction( - { - messageCount: messageCountAfter, - tokenCount: tokensAfter, - compactedCount, - sessionFile: params.sessionFile, - }, - { - sessionId: params.sessionId, - agentId: sessionAgentId, - sessionKey: hookSessionKey, - workspaceDir: effectiveWorkspace, - messageProvider: resolvedMessageProvider, - }, - ); - } catch (err) { - log.warn("after_compaction hook failed", { - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? 
err.stack : undefined, - }); - } - } + await runAfterCompactionHooks({ + hookRunner, + sessionId: params.sessionId, + sessionAgentId, + hookSessionKey, + missingSessionKey, + workspaceDir: effectiveWorkspace, + messageProvider: resolvedMessageProvider, + messageCountAfter, + tokensAfter, + compactedCount, + sessionFile: params.sessionFile, + summaryLength: typeof result.summary === "string" ? result.summary.length : undefined, + tokensBefore: result.tokensBefore, + firstKeptEntryId: result.firstKeptEntryId, + }); // Truncate session file to remove compacted entries (#39953) if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) { try { @@ -1193,7 +1313,9 @@ export async function compactEmbeddedPiSession( // Fire before_compaction / after_compaction hooks here so plugin subscribers // are notified regardless of which engine is active. const engineOwnsCompaction = contextEngine.info.ownsCompaction === true; - const hookRunner = engineOwnsCompaction ? getGlobalHookRunner() : null; + const hookRunner = engineOwnsCompaction + ? asCompactionHookRunner(getGlobalHookRunner()) + : null; const hookSessionKey = params.sessionKey?.trim() || params.sessionId; const { sessionAgentId } = resolveSessionAgentIds({ sessionKey: params.sessionKey, @@ -1210,12 +1332,11 @@ export async function compactEmbeddedPiSession( // Engine-owned compaction doesn't load the transcript at this level, so // message counts are unavailable. We pass sessionFile so hook subscribers // can read the transcript themselves if they need exact counts. 
- if (hookRunner?.hasHooks("before_compaction")) { + if (hookRunner?.hasHooks?.("before_compaction") && hookRunner.runBeforeCompaction) { try { await hookRunner.runBeforeCompaction( { messageCount: -1, - sessionFile: params.sessionFile, }, hookCtx, ); @@ -1252,7 +1373,12 @@ export async function compactEmbeddedPiSession( sessionFile: params.sessionFile, }); } - if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) { + if ( + result.ok && + result.compacted && + hookRunner?.hasHooks?.("after_compaction") && + hookRunner.runAfterCompaction + ) { try { await hookRunner.runAfterCompaction( { @@ -1293,4 +1419,10 @@ export async function compactEmbeddedPiSession( export const __testing = { hasRealConversationContent, hasMeaningfulConversationContent, + containsRealConversationMessages, + estimateTokensAfterCompaction, + buildBeforeCompactionHookMetrics, + runBeforeCompactionHooks, + runAfterCompactionHooks, + runPostCompactionSideEffects, } as const; diff --git a/src/agents/pi-embedded-runner/model.test.ts b/src/agents/pi-embedded-runner/model.test.ts index 83511bdf939..fe42634da11 100644 --- a/src/agents/pi-embedded-runner/model.test.ts +++ b/src/agents/pi-embedded-runner/model.test.ts @@ -5,6 +5,9 @@ vi.mock("../pi-model-discovery.js", () => ({ discoverModels: vi.fn(() => ({ find: vi.fn(() => null) })), })); +const OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"; +const OPENROUTER_FALLBACK_COST = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }; + import type { OpenRouterModelCapabilities } from "./openrouter-model-capabilities.js"; const mockGetOpenRouterModelCapabilities = vi.fn< @@ -19,7 +22,348 @@ vi.mock("./openrouter-model-capabilities.js", () => ({ mockLoadOpenRouterModelCapabilities(modelId), })); +vi.mock("../../plugins/provider-runtime.js", async (importOriginal) => { + const actual = await importOriginal(); + const HANDLED_DYNAMIC_PROVIDERS = new Set([ + "openrouter", + "github-copilot", + "openai-codex", + "openai", + 
"anthropic", + "zai", + ]); + const OPENAI_BASE_URL = "https://api.openai.com/v1"; + const OPENAI_CODEX_BASE_URL = "https://chatgpt.com/backend-api"; + const ANTHROPIC_BASE_URL = "https://api.anthropic.com"; + const ZAI_BASE_URL = "https://api.z.ai/api/paas/v4"; + const DEFAULT_CONTEXT_WINDOW = 200_000; + const DEFAULT_MAX_TOKENS = 8192; + const findTemplate = ( + ctx: { modelRegistry: { find: (provider: string, modelId: string) => unknown } }, + provider: string, + templateIds: readonly string[], + ) => { + for (const templateId of templateIds) { + const template = ctx.modelRegistry.find(provider, templateId) as Record< + string, + unknown + > | null; + if (template) { + return template; + } + } + return undefined; + }; + const cloneTemplate = ( + template: Record | undefined, + modelId: string, + patch: Record, + fallback: Record, + ) => + ({ + ...(template ?? fallback), + id: modelId, + name: modelId, + ...patch, + }) as Record; + const buildOpenRouterModel = (modelId: string) => { + const capabilities = mockGetOpenRouterModelCapabilities(modelId); + return { + id: modelId, + name: capabilities?.name ?? modelId, + api: "openai-completions" as const, + provider: "openrouter", + baseUrl: OPENROUTER_BASE_URL, + reasoning: capabilities?.reasoning ?? false, + input: capabilities?.input ?? (["text"] as const), + cost: capabilities?.cost ?? OPENROUTER_FALLBACK_COST, + contextWindow: capabilities?.contextWindow ?? 200_000, + maxTokens: capabilities?.maxTokens ?? 
8192, + }; + }; + /* Test stand-in for provider dynamic-model resolution: builds a synthetic model for the (provider, modelId) pairs handled below and returns undefined for anything else. */ + const buildDynamicModel = (params: { + provider: string; + modelId: string; + modelRegistry: { find: (provider: string, modelId: string) => unknown }; + }) => { + const modelId = params.modelId.trim(); + const lower = modelId.toLowerCase(); + switch (params.provider) { + case "openrouter": + return buildOpenRouterModel(modelId); + case "github-copilot": { + const existing = params.modelRegistry.find("github-copilot", lower); + if (existing) { + return undefined; + } + const template = findTemplate(params, "github-copilot", ["gpt-5.2-codex"]); + if (lower === "gpt-5.3-codex" && template) { + return cloneTemplate( + template, + modelId, + {}, + { + provider: "github-copilot", + api: "openai-responses", + reasoning: false, + input: ["text", "image"], + cost: OPENROUTER_FALLBACK_COST, + contextWindow: 128_000, + maxTokens: DEFAULT_MAX_TOKENS, + }, + ); + } + return { + id: modelId, + name: modelId, + provider: "github-copilot", + api: "openai-responses", + /* Single-backslash `\b` is a word boundary, so bare "o1"/"o3" and suffixed ids such as "o3-mini" are flagged as reasoning; a doubled backslash would only match a literal backslash followed by "b". */ + reasoning: /^o[13](\b|$)/.test(lower), + input: ["text", "image"], + cost: OPENROUTER_FALLBACK_COST, + contextWindow: 128_000, + maxTokens: DEFAULT_MAX_TOKENS, + }; + } + case "openai-codex": { + const template = + lower === "gpt-5.4" + ? findTemplate(params, "openai-codex", ["gpt-5.3-codex", "gpt-5.2-codex"]) + : lower === "gpt-5.3-codex-spark" + ?
findTemplate(params, "openai-codex", ["gpt-5.3-codex", "gpt-5.2-codex"]) + : findTemplate(params, "openai-codex", ["gpt-5.2-codex"]); + const fallback = { + provider: "openai-codex", + api: "openai-codex-responses", + baseUrl: OPENAI_CODEX_BASE_URL, + reasoning: true, + input: ["text", "image"], + cost: OPENROUTER_FALLBACK_COST, + contextWindow: DEFAULT_CONTEXT_WINDOW, + maxTokens: DEFAULT_CONTEXT_WINDOW, + }; + if (lower === "gpt-5.4") { + return cloneTemplate( + template, + modelId, + { + contextWindow: 1_050_000, + maxTokens: 128_000, + provider: "openai-codex", + api: "openai-codex-responses", + baseUrl: OPENAI_CODEX_BASE_URL, + }, + fallback, + ); + } + if (lower === "gpt-5.3-codex-spark") { + return cloneTemplate( + template, + modelId, + { + provider: "openai-codex", + api: "openai-codex-responses", + baseUrl: OPENAI_CODEX_BASE_URL, + reasoning: true, + input: ["text"], + cost: OPENROUTER_FALLBACK_COST, + contextWindow: 128_000, + maxTokens: 128_000, + }, + fallback, + ); + } + if (lower === "gpt-5.3-codex") { + return cloneTemplate( + template, + modelId, + { + provider: "openai-codex", + api: "openai-codex-responses", + baseUrl: OPENAI_CODEX_BASE_URL, + }, + fallback, + ); + } + return undefined; + } + case "openai": { + const templateIds = + lower === "gpt-5.4" + ? ["gpt-5.2"] + : lower === "gpt-5.4-pro" + ? ["gpt-5.2-pro", "gpt-5.2"] + : lower === "gpt-5.4-mini" + ? ["gpt-5-mini"] + : lower === "gpt-5.4-nano" + ? ["gpt-5-nano", "gpt-5-mini"] + : undefined; + if (!templateIds) { + return undefined; + } + const template = findTemplate(params, "openai", templateIds); + const patch = + lower === "gpt-5.4" || lower === "gpt-5.4-pro" + ? 
{ + provider: "openai", + api: "openai-responses", + baseUrl: OPENAI_BASE_URL, + reasoning: true, + input: ["text", "image"], + contextWindow: 1_050_000, + maxTokens: 128_000, + } + : { + provider: "openai", + api: "openai-responses", + baseUrl: OPENAI_BASE_URL, + reasoning: true, + input: ["text", "image"], + }; + return cloneTemplate(template, modelId, patch, { + provider: "openai", + api: "openai-responses", + baseUrl: OPENAI_BASE_URL, + reasoning: true, + input: ["text", "image"], + cost: OPENROUTER_FALLBACK_COST, + contextWindow: patch.contextWindow ?? DEFAULT_CONTEXT_WINDOW, + maxTokens: patch.maxTokens ?? DEFAULT_CONTEXT_WINDOW, + }); + } + case "anthropic": { + if (lower !== "claude-opus-4-6" && lower !== "claude-sonnet-4-6") { + return undefined; + } + const template = findTemplate( + params, + "anthropic", + lower === "claude-opus-4-6" ? ["claude-opus-4-5"] : ["claude-sonnet-4-5"], + ); + return cloneTemplate( + template, + modelId, + { + provider: "anthropic", + api: "anthropic-messages", + baseUrl: ANTHROPIC_BASE_URL, + reasoning: true, + }, + { + provider: "anthropic", + api: "anthropic-messages", + baseUrl: ANTHROPIC_BASE_URL, + reasoning: true, + input: ["text", "image"], + cost: OPENROUTER_FALLBACK_COST, + contextWindow: DEFAULT_CONTEXT_WINDOW, + maxTokens: DEFAULT_CONTEXT_WINDOW, + }, + ); + } + case "zai": { + if (lower !== "glm-5") { + return undefined; + } + const template = findTemplate(params, "zai", ["glm-4.7"]); + return cloneTemplate( + template, + modelId, + { + provider: "zai", + api: "openai-completions", + baseUrl: ZAI_BASE_URL, + reasoning: true, + }, + { + provider: "zai", + api: "openai-completions", + baseUrl: ZAI_BASE_URL, + reasoning: true, + input: ["text"], + cost: OPENROUTER_FALLBACK_COST, + contextWindow: DEFAULT_CONTEXT_WINDOW, + maxTokens: DEFAULT_CONTEXT_WINDOW, + }, + ); + } + default: + return undefined; + } + }; + const normalizeDynamicModel = (params: { provider: string; model: Record }) => { + if (params.provider === 
"openai") { + const baseUrl = typeof params.model.baseUrl === "string" ? params.model.baseUrl : undefined; + if (params.model.api === "openai-completions" && (!baseUrl || baseUrl === OPENAI_BASE_URL)) { + return { ...params.model, api: "openai-responses" }; + } + } + if (params.provider === "openai-codex") { + const baseUrl = typeof params.model.baseUrl === "string" ? params.model.baseUrl : undefined; + const nextApi = + params.model.api === "openai-responses" && + (!baseUrl || baseUrl === OPENAI_BASE_URL || baseUrl === OPENAI_CODEX_BASE_URL) + ? "openai-codex-responses" + : params.model.api; + const nextBaseUrl = + nextApi === "openai-codex-responses" && (!baseUrl || baseUrl === OPENAI_BASE_URL) + ? OPENAI_CODEX_BASE_URL + : baseUrl; + if (nextApi !== params.model.api || nextBaseUrl !== baseUrl) { + return { ...params.model, api: nextApi, baseUrl: nextBaseUrl }; + } + } + return undefined; + }; + return { + ...actual, + resolveProviderRuntimePlugin: ( + params: Parameters[0], + ) => + HANDLED_DYNAMIC_PROVIDERS.has(params.provider) + ? { + id: params.provider, + prepareDynamicModel: + params.provider === "openrouter" + ? async (ctx: { modelId: string }) => { + await mockLoadOpenRouterModelCapabilities(ctx.modelId); + } + : undefined, + resolveDynamicModel: (ctx: { + provider: string; + modelId: string; + modelRegistry: { find: (provider: string, modelId: string) => unknown }; + }) => buildDynamicModel(ctx), + normalizeResolvedModel: (ctx: { provider: string; model: Record }) => + normalizeDynamicModel(ctx), + } + : actual.resolveProviderRuntimePlugin(params), + runProviderDynamicModel: (params: Parameters[0]) => + buildDynamicModel({ + provider: params.provider, + modelId: params.context.modelId, + modelRegistry: params.context.modelRegistry, + }) ?? actual.runProviderDynamicModel(params), + prepareProviderDynamicModel: async ( + params: Parameters[0], + ) => + params.provider === "openrouter" + ? 
await mockLoadOpenRouterModelCapabilities(params.context.modelId) + : await actual.prepareProviderDynamicModel(params), + normalizeProviderResolvedModelWithPlugin: ( + params: Parameters[0], + ) => + HANDLED_DYNAMIC_PROVIDERS.has(params.provider) + ? normalizeDynamicModel({ + provider: params.provider, + model: params.context.model as unknown as Record, + }) + : actual.normalizeProviderResolvedModelWithPlugin(params), + }; +}); + import type { OpenClawConfig } from "../../config/config.js"; +import { clearProviderRuntimeHookCache } from "../../plugins/provider-runtime.js"; import { buildInlineProviderModels, resolveModel, resolveModelAsync } from "./model.js"; import { buildOpenAICodexForwardCompatExpectation, @@ -30,6 +374,7 @@ import { } from "./model.test-harness.js"; beforeEach(() => { + clearProviderRuntimeHookCache(); resetMockDiscoverModels(); mockGetOpenRouterModelCapabilities.mockReset(); mockGetOpenRouterModelCapabilities.mockReturnValue(undefined); diff --git a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts index b9b472c9a9c..bd597c8585a 100644 --- a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts @@ -103,6 +103,8 @@ type RunWithModelFallbackParams = { }; beforeEach(() => { + vi.useRealTimers(); + vi.clearAllTimers(); runEmbeddedPiAgentMock.mockClear(); runCliAgentMock.mockClear(); runWithModelFallbackMock.mockClear(); @@ -151,6 +153,7 @@ beforeEach(() => { }); afterEach(() => { + vi.clearAllTimers(); vi.useRealTimers(); resetSystemEventsForTest(); }); diff --git a/src/auto-reply/reply/commands.test.ts b/src/auto-reply/reply/commands.test.ts index 034eb7634a7..ebe49cbd9fc 100644 --- a/src/auto-reply/reply/commands.test.ts +++ b/src/auto-reply/reply/commands.test.ts @@ -142,6 +142,8 @@ afterAll(async () => { }); beforeEach(() => { + vi.useRealTimers(); + vi.clearAllTimers(); 
setDefaultChannelPluginRegistryForTests(); readConfigFileSnapshotMock.mockImplementation(async () => { const configPath = process.env.OPENCLAW_CONFIG_PATH; diff --git a/src/commands/agent.test.ts b/src/commands/agent.test.ts index 04d92a2d76d..cc2718184bc 100644 --- a/src/commands/agent.test.ts +++ b/src/commands/agent.test.ts @@ -3,8 +3,11 @@ import path from "node:path"; import { beforeEach, describe, expect, it, type MockInstance, vi } from "vitest"; import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; import "../cron/isolated-agent.mocks.js"; +import { __testing as acpManagerTesting } from "../acp/control-plane/manager.js"; +import { resolveAgentDir, resolveSessionAgentId } from "../agents/agent-scope.js"; import * as authProfilesModule from "../agents/auth-profiles.js"; import * as cliRunnerModule from "../agents/cli-runner.js"; +import { resolveSession } from "../agents/command/session.js"; import { FailoverError } from "../agents/failover-error.js"; import { loadModelCatalog } from "../agents/model-catalog.js"; import * as modelSelectionModule from "../agents/model-selection.js"; @@ -12,13 +15,19 @@ import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import * as commandSecretGatewayModule from "../cli/command-secret-gateway.js"; import type { OpenClawConfig } from "../config/config.js"; import * as configModule from "../config/config.js"; +import { clearSessionStoreCacheForTest } from "../config/sessions.js"; import * as sessionPathsModule from "../config/sessions/paths.js"; -import { emitAgentEvent, onAgentEvent } from "../infra/agent-events.js"; -import { setActivePluginRegistry } from "../plugins/runtime.js"; +import { + emitAgentEvent, + onAgentEvent, + resetAgentEventsForTest, + resetAgentRunContextForTest, +} from "../infra/agent-events.js"; +import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; +import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from 
"../plugins/runtime.js"; import type { RuntimeEnv } from "../runtime.js"; import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js"; import { agentCommand, agentCommandFromIngress } from "./agent.js"; -import * as agentDeliveryModule from "./agent/delivery.js"; vi.mock("../logging/subsystem.js", () => { const createMockLogger = () => ({ @@ -57,6 +66,14 @@ vi.mock("../agents/workspace.js", () => { }; }); +vi.mock("../agents/command/session-store.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + updateSessionStoreAfterAgentRun: vi.fn(async () => undefined), + }; +}); + vi.mock("../agents/skills.js", () => ({ buildWorkspaceSkillSnapshot: vi.fn(() => undefined), })); @@ -77,7 +94,6 @@ const configSpy = vi.spyOn(configModule, "loadConfig"); const readConfigFileSnapshotForWriteSpy = vi.spyOn(configModule, "readConfigFileSnapshotForWrite"); const setRuntimeConfigSnapshotSpy = vi.spyOn(configModule, "setRuntimeConfigSnapshot"); const runCliAgentSpy = vi.spyOn(cliRunnerModule, "runCliAgent"); -const deliverAgentCommandResultSpy = vi.spyOn(agentDeliveryModule, "deliverAgentCommandResult"); async function withTempHome(fn: (home: string) => Promise): Promise { return withTempHomeBase(fn, { prefix: "openclaw-agent-" }); @@ -90,7 +106,7 @@ function mockConfig( telegramOverrides?: Partial["telegram"]>>, agentsList?: Array<{ id: string; default?: boolean }>, ) { - configSpy.mockReturnValue({ + const cfg = { agents: { defaults: { model: { primary: "anthropic/claude-opus-4-5" }, @@ -104,7 +120,9 @@ function mockConfig( channels: { telegram: telegramOverrides ? 
{ ...telegramOverrides } : undefined, }, - }); + } as OpenClawConfig; + configSpy.mockReturnValue(cfg); + return cfg; } async function runWithDefaultAgentConfig(params: { @@ -168,12 +186,7 @@ function readSessionStore(storePath: string): Record { } async function withCrossAgentResumeFixture( - run: (params: { - home: string; - storePattern: string; - sessionId: string; - sessionKey: string; - }) => Promise, + run: (params: { sessionId: string; sessionKey: string; cfg: OpenClawConfig }) => Promise, ): Promise { await withTempHome(async (home) => { const storePattern = path.join(home, "sessions", "{agentId}", "sessions.json"); @@ -187,12 +200,11 @@ async function withCrossAgentResumeFixture( systemSent: true, }, }); - mockConfig(home, storePattern, undefined, undefined, [ + const cfg = mockConfig(home, storePattern, undefined, undefined, [ { id: "dev" }, { id: "exec", default: true }, ]); - await agentCommand({ message: "resume me", sessionId }, runtime); - await run({ home, storePattern, sessionId, sessionKey }); + await run({ sessionId, sessionKey, cfg }); }); } @@ -278,6 +290,11 @@ function createTelegramOutboundPlugin() { beforeEach(() => { vi.clearAllMocks(); + clearSessionStoreCacheForTest(); + resetAgentEventsForTest(); + resetAgentRunContextForTest(); + resetPluginRuntimeStateForTest(); + acpManagerTesting.resetAcpSessionManagerForTests(); configModule.clearRuntimeConfigSnapshot(); runCliAgentSpy.mockResolvedValue(createDefaultAgentResult() as never); vi.mocked(runEmbeddedPiAgent).mockResolvedValue(createDefaultAgentResult()); @@ -479,19 +496,29 @@ describe("agentCommand", () => { }); it("uses the resumed session agent scope when sessionId resolves to another agent store", async () => { - await withCrossAgentResumeFixture(async ({ sessionKey }) => { - const callArgs = getLastEmbeddedCall(); - expect(callArgs?.sessionKey).toBe(sessionKey); - expect(callArgs?.agentId).toBe("exec"); - 
expect(callArgs?.agentDir).toContain(`${path.sep}agents${path.sep}exec${path.sep}agent`); + await withCrossAgentResumeFixture(async ({ sessionId, sessionKey, cfg }) => { + const resolution = resolveSession({ cfg, sessionId }); + expect(resolution.sessionKey).toBe(sessionKey); + const agentId = resolveSessionAgentId({ sessionKey: resolution.sessionKey, config: cfg }); + expect(agentId).toBe("exec"); + expect(resolveAgentDir(cfg, agentId)).toContain( + `${path.sep}agents${path.sep}exec${path.sep}agent`, + ); }); }); it("forwards resolved outbound session context when resuming by sessionId", async () => { - await withCrossAgentResumeFixture(async ({ sessionKey }) => { - const deliverCall = deliverAgentCommandResultSpy.mock.calls.at(-1)?.[0]; - expect(deliverCall?.opts.sessionKey).toBeUndefined(); - expect(deliverCall?.outboundSession).toEqual( + await withCrossAgentResumeFixture(async ({ sessionId, sessionKey, cfg }) => { + const resolution = resolveSession({ cfg, sessionId }); + expect(resolution.sessionKey).toBe(sessionKey); + const agentId = resolveSessionAgentId({ sessionKey: resolution.sessionKey, config: cfg }); + expect( + buildOutboundSessionContext({ + cfg, + sessionKey: resolution.sessionKey, + agentId, + }), + ).toEqual( expect.objectContaining({ key: sessionKey, agentId: "exec", diff --git a/src/commands/doctor-config-flow.test.ts b/src/commands/doctor-config-flow.test.ts index 72f9b452af1..f7e427915c6 100644 --- a/src/commands/doctor-config-flow.test.ts +++ b/src/commands/doctor-config-flow.test.ts @@ -3,7 +3,6 @@ import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import { resolveMatrixAccountStorageRoot } from "../../extensions/matrix/runtime-api.js"; import { withTempHome } from "../../test/helpers/temp-home.js"; -import * as commandSecretGatewayModule from "../cli/command-secret-gateway.js"; import * as noteModule from "../terminal/note.js"; import { loadAndMaybeMigrateDoctorConfig } from "./doctor-config-flow.js"; import 
{ runDoctorConfigWithInput } from "./doctor-config-flow.test-utils.js"; @@ -36,6 +35,22 @@ async function collectDoctorWarnings(config: Record): Promise { } as unknown as Response; }); vi.stubGlobal("fetch", globalFetch); - const telegramFetchModule = await import("../../extensions/telegram/src/fetch.js"); - const telegramProxyModule = await import("../../extensions/telegram/src/proxy.js"); + const { + telegramFetchModule, + telegramProxyModule, + loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh, + } = await loadFreshDoctorFlowDeps(); const resolveTelegramFetch = vi.spyOn(telegramFetchModule, "resolveTelegramFetch"); const makeProxyFetch = vi.spyOn(telegramProxyModule, "makeProxyFetch"); resolveTelegramFetch.mockReturnValue(fetchSpy as unknown as typeof fetch); @@ -625,7 +643,7 @@ describe("doctor config flow", () => { }, }, }, - run: loadAndMaybeMigrateDoctorConfig, + run: loadDoctorFlowFresh, }); const cfg = result.cfg as unknown as { @@ -656,7 +674,9 @@ describe("doctor config flow", () => { }); it("does not crash when Telegram allowFrom repair sees unavailable SecretRef-backed credentials", async () => { - const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {}); + const { noteModule: freshNoteModule, loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh } = + await loadFreshDoctorFlowDeps(); + const noteSpy = vi.spyOn(freshNoteModule, "note").mockImplementation(() => {}); const fetchSpy = vi.fn(); vi.stubGlobal("fetch", fetchSpy); try { @@ -675,7 +695,7 @@ describe("doctor config flow", () => { }, }, }, - run: loadAndMaybeMigrateDoctorConfig, + run: loadDoctorFlowFresh, }); const cfg = result.cfg as { @@ -717,14 +737,18 @@ describe("doctor config flow", () => { }); vi.stubGlobal("fetch", globalFetch); const proxyFetch = vi.fn(); - const telegramFetchModule = await import("../../extensions/telegram/src/fetch.js"); - const telegramProxyModule = await import("../../extensions/telegram/src/proxy.js"); + const { + telegramFetchModule, + 
telegramProxyModule, + commandSecretGatewayModule: freshCommandSecretGatewayModule, + loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh, + } = await loadFreshDoctorFlowDeps(); const resolveTelegramFetch = vi.spyOn(telegramFetchModule, "resolveTelegramFetch"); const makeProxyFetch = vi.spyOn(telegramProxyModule, "makeProxyFetch"); makeProxyFetch.mockReturnValue(proxyFetch as unknown as typeof fetch); resolveTelegramFetch.mockReturnValue(fetchSpy as unknown as typeof fetch); const resolveSecretsSpy = vi - .spyOn(commandSecretGatewayModule, "resolveCommandSecretRefsViaGateway") + .spyOn(freshCommandSecretGatewayModule, "resolveCommandSecretRefsViaGateway") .mockResolvedValue({ diagnostics: [], targetStatesByPath: {}, @@ -761,7 +785,7 @@ describe("doctor config flow", () => { }, }, }, - run: loadAndMaybeMigrateDoctorConfig, + run: loadDoctorFlowFresh, }); const cfg = result.cfg as { @@ -786,7 +810,12 @@ describe("doctor config flow", () => { }); it("sanitizes config-derived doctor warnings and changes before logging", async () => { - const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {}); + const { + telegramFetchModule, + noteModule: freshNoteModule, + loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh, + } = await loadFreshDoctorFlowDeps(); + const noteSpy = vi.spyOn(freshNoteModule, "note").mockImplementation(() => {}); const globalFetch = vi.fn(async () => { throw new Error("global fetch should not be called"); }); @@ -795,7 +824,6 @@ describe("doctor config flow", () => { json: async () => ({ ok: true, result: { id: 12345 } }), })); vi.stubGlobal("fetch", globalFetch); - const telegramFetchModule = await import("../../extensions/telegram/src/fetch.js"); const resolveTelegramFetch = vi.spyOn(telegramFetchModule, "resolveTelegramFetch"); resolveTelegramFetch.mockReturnValue(fetchSpy as unknown as typeof fetch); try { @@ -830,7 +858,7 @@ describe("doctor config flow", () => { }, }, }, - run: loadAndMaybeMigrateDoctorConfig, + run: 
loadDoctorFlowFresh, }); const outputs = noteSpy.mock.calls @@ -868,7 +896,9 @@ describe("doctor config flow", () => { }); it("warns and continues when Telegram account inspection hits inactive SecretRef surfaces", async () => { - const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {}); + const { noteModule: freshNoteModule, loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh } = + await loadFreshDoctorFlowDeps(); + const noteSpy = vi.spyOn(freshNoteModule, "note").mockImplementation(() => {}); const fetchSpy = vi.fn(); vi.stubGlobal("fetch", fetchSpy); try { @@ -892,7 +922,7 @@ describe("doctor config flow", () => { }, }, }, - run: loadAndMaybeMigrateDoctorConfig, + run: loadDoctorFlowFresh, }); const cfg = result.cfg as {