From f01c41b27a13391e5d8cabbc0dcbb0db982b9a15 Mon Sep 17 00:00:00 2001 From: David Rudduck <47308254+davidrudduck@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:19:20 +1000 Subject: [PATCH] fix(context-engine): guard compact() throw + fire hooks for ownsCompaction engines (#41361) Merged via squash. Prepared head SHA: 0957b32dc63b16d710403565953b77bfbd2bd987 Co-authored-by: davidrudduck <47308254+davidrudduck@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 2 + .../pi-embedded-runner/compact.hooks.test.ts | 134 +++++++++++++++++- src/agents/pi-embedded-runner/compact.ts | 54 +++++++ .../run.overflow-compaction.fixture.ts | 15 +- .../run.overflow-compaction.loop.test.ts | 20 ++- .../run.overflow-compaction.mocks.shared.ts | 52 ++++++- .../run.overflow-compaction.shared-test.ts | 8 +- .../run.overflow-compaction.test.ts | 85 ++++++++++- src/agents/pi-embedded-runner/run.ts | 109 ++++++++++---- 9 files changed, 429 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30b08625751..dc132c837cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -121,6 +121,7 @@ Docs: https://docs.openclaw.ai - Gateway/config errors: surface up to three validation issues in top-level `config.set`, `config.patch`, and `config.apply` error messages while preserving structured issue details. (#42664) Thanks @huntharo. - Hooks/plugin context parity followup: pass `trigger` and `channelId` through embedded `llm_input`, `agent_end`, and `llm_output` hook contexts so plugins receive the same agent metadata across hook phases. (#42362) Thanks @zhoulf1006. - ACP/main session aliases: canonicalize `main` before ACP session lookup so restarted ACP main sessions rehydrate instead of failing closed with `Session is not ACP-enabled: main`. (#43285, fixes #25692) +- Agents/context-engine compaction: guard thrown engine-owned overflow compaction attempts and fire compaction hooks for `ownsCompaction` engines so overflow recovery no longer crashes and plugin subscribers still observe compact runs. (#41361) thanks @davidrudduck. ## 2026.3.8 @@ -4036,6 +4037,7 @@ Thanks @AlexMikhalev, @CoreyH, @John-Rood, @KrauseFx, @MaudeBot, @Nachx639, @Nic - Gateway/Daemon/Doctor: atomic config writes; repair gateway service entrypoint + install switches; non-interactive legacy migrations; systemd unit alignment + KillMode=process; node bridge keepalive/pings; Launch at Login persistence; bundle MoltbotKit resources + Swift 6.2 compat dylib; relay version check + remove smoke test; regen Swift GatewayModels + keep agent provider string; cron jobId alias + channel alias migration + main session key normalization; heartbeat Telegram accountId resolution; avoid WhatsApp fallback for internal runs; gateway listener error wording; serveBaseUrl param; honor gateway --dev; fix wide-area discovery updates; align agents.defaults schema; provider account metadata in daemon status; refresh Carbon patch for gateway fixes; restore doctor prompter initialValue handling. - Control UI/TUI: persist per-session verbose off + hide tool cards; logs tab opens at bottom; relative asset paths + landing cleanup; session labels lookup/persistence; stop pinning main session in recents; start logs at bottom; TUI status bar refresh + timeout handling + hide reasoning label when off. - Onboarding/Configure: QuickStart single-select provider picker; avoid Codex CLI false-expiry warnings; clarify WhatsApp owner prompt; fix Minimax hosted onboarding (agents.defaults + msteams heartbeat target); remove configure Control UI prompt; honor gateway --dev flag. +- Agent loop: guard overflow compaction throws and restore compaction hooks for engine-owned context engines. (#41361) — thanks @davidrudduck ### Maintenance diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 9ef2a3efe76..dc1511a5e05 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -7,6 +7,7 @@ const { sessionCompactImpl, triggerInternalHook, sanitizeSessionHistoryMock, + contextEngineCompactMock, } = vi.hoisted(() => ({ hookRunner: { hasHooks: vi.fn(), @@ -28,6 +29,14 @@ const { })), triggerInternalHook: vi.fn(), sanitizeSessionHistoryMock: vi.fn(async (params: { messages: unknown[] }) => params.messages), + contextEngineCompactMock: vi.fn(async () => ({ + ok: true as boolean, + compacted: true as boolean, + reason: undefined as string | undefined, + result: { summary: "engine-summary", tokensAfter: 50 } as + | { summary: string; tokensAfter: number } + | undefined, + })), })); vi.mock("../../plugins/hook-runner-global.js", () => ({ @@ -123,6 +132,27 @@ vi.mock("../session-write-lock.js", () => ({ resolveSessionLockMaxHoldFromTimeout: vi.fn(() => 0), })); +vi.mock("../../context-engine/index.js", () => ({ + ensureContextEnginesInitialized: vi.fn(), + resolveContextEngine: vi.fn(async () => ({ + info: { ownsCompaction: true }, + compact: contextEngineCompactMock, + })), +})); + +vi.mock("../../process/command-queue.js", () => ({ + enqueueCommandInLane: vi.fn((_lane: unknown, task: () => unknown) => task()), +})); + +vi.mock("./lanes.js", () => ({ + resolveSessionLane: vi.fn(() => "test-session-lane"), + resolveGlobalLane: vi.fn(() => "test-global-lane"), +})); + +vi.mock("../context-window-guard.js", () => ({ + resolveContextWindowInfo: vi.fn(() => ({ tokens: 128_000 })), +})); + vi.mock("../bootstrap-files.js", () => ({ makeBootstrapWarn: vi.fn(() => () => {}), resolveBootstrapContextForRun: vi.fn(async () => ({ contextFiles: [] })), @@ -160,7 +190,7 @@ vi.mock("../transcript-policy.js", () => ({ })); vi.mock("./extensions.js", () => ({ - buildEmbeddedExtensionFactories: vi.fn(() => []), + buildEmbeddedExtensionFactories: vi.fn(() => ({ factories: [] })), })); vi.mock("./history.js", () => ({ @@ -251,7 +281,7 @@ vi.mock("./utils.js", () => ({ import { getApiProvider, unregisterApiProviders } from "@mariozechner/pi-ai"; import { getCustomApiRegistrySourceId } from "../custom-api-registry.js"; -import { compactEmbeddedPiSessionDirect } from "./compact.js"; +import { compactEmbeddedPiSessionDirect, compactEmbeddedPiSession } from "./compact.js"; const sessionHook = (action: string) => triggerInternalHook.mock.calls.find( @@ -436,3 +466,103 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { expect(result.ok).toBe(true); }); }); + +describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { + beforeEach(() => { + hookRunner.hasHooks.mockReset(); + hookRunner.runBeforeCompaction.mockReset(); + hookRunner.runAfterCompaction.mockReset(); + contextEngineCompactMock.mockReset(); + contextEngineCompactMock.mockResolvedValue({ + ok: true, + compacted: true, + reason: undefined, + result: { summary: "engine-summary", tokensAfter: 50 }, + }); + resolveModelMock.mockReset(); + resolveModelMock.mockReturnValue({ + model: { provider: "openai", api: "responses", id: "fake", input: [] }, + error: null, + authStorage: { setRuntimeApiKey: vi.fn() }, + modelRegistry: {}, + }); + }); + + it("fires before_compaction with sentinel -1 and after_compaction on success", async () => { + hookRunner.hasHooks.mockReturnValue(true); + + const result = await compactEmbeddedPiSession({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + messageChannel: "telegram", + customInstructions: "focus on decisions", + enqueue: (task) => task(), + }); + + expect(result.ok).toBe(true); + expect(result.compacted).toBe(true); + + expect(hookRunner.runBeforeCompaction).toHaveBeenCalledWith( + { messageCount: -1, sessionFile: "/tmp/session.jsonl" }, + expect.objectContaining({ + sessionKey: "agent:main:session-1", + messageProvider: "telegram", + }), + ); + expect(hookRunner.runAfterCompaction).toHaveBeenCalledWith( + { + messageCount: -1, + compactedCount: -1, + tokenCount: 50, + sessionFile: "/tmp/session.jsonl", + }, + expect.objectContaining({ + sessionKey: "agent:main:session-1", + messageProvider: "telegram", + }), + ); + }); + + it("does not fire after_compaction when compaction fails", async () => { + hookRunner.hasHooks.mockReturnValue(true); + contextEngineCompactMock.mockResolvedValue({ + ok: false, + compacted: false, + reason: "nothing to compact", + result: undefined, + }); + + const result = await compactEmbeddedPiSession({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + enqueue: (task) => task(), + }); + + expect(result.ok).toBe(false); + expect(hookRunner.runBeforeCompaction).toHaveBeenCalled(); + expect(hookRunner.runAfterCompaction).not.toHaveBeenCalled(); + }); + + it("catches and logs hook exceptions without aborting compaction", async () => { + hookRunner.hasHooks.mockReturnValue(true); + hookRunner.runBeforeCompaction.mockRejectedValue(new Error("hook boom")); + + const result = await compactEmbeddedPiSession({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + enqueue: (task) => task(), + }); + + expect(result.ok).toBe(true); + expect(result.compacted).toBe(true); + expect(contextEngineCompactMock).toHaveBeenCalled(); + }); +}); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 91f99571db4..feba0f81493 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -936,6 +936,43 @@ export async function compactEmbeddedPiSession( modelContextWindow: ceModel?.contextWindow, defaultTokens: DEFAULT_CONTEXT_TOKENS, }); + // When the context engine owns compaction, its compact() implementation + // bypasses compactEmbeddedPiSessionDirect (which fires the hooks internally). + // Fire before_compaction / after_compaction hooks here so plugin subscribers + // are notified regardless of which engine is active. + const engineOwnsCompaction = contextEngine.info.ownsCompaction === true; + const hookRunner = engineOwnsCompaction ? getGlobalHookRunner() : null; + const hookSessionKey = params.sessionKey?.trim() || params.sessionId; + const { sessionAgentId } = resolveSessionAgentIds({ + sessionKey: params.sessionKey, + config: params.config, + }); + const resolvedMessageProvider = params.messageChannel ?? params.messageProvider; + const hookCtx = { + sessionId: params.sessionId, + agentId: sessionAgentId, + sessionKey: hookSessionKey, + workspaceDir: resolveUserPath(params.workspaceDir), + messageProvider: resolvedMessageProvider, + }; + // Engine-owned compaction doesn't load the transcript at this level, so + // message counts are unavailable. We pass sessionFile so hook subscribers + // can read the transcript themselves if they need exact counts. + if (hookRunner?.hasHooks("before_compaction")) { + try { + await hookRunner.runBeforeCompaction( + { + messageCount: -1, + sessionFile: params.sessionFile, + }, + hookCtx, + ); + } catch (err) { + log.warn("before_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + }); + } + } const result = await contextEngine.compact({ sessionId: params.sessionId, sessionFile: params.sessionFile, @@ -944,6 +981,23 @@ export async function compactEmbeddedPiSession( force: params.trigger === "manual", runtimeContext: params as Record, }); + if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) { + try { + await hookRunner.runAfterCompaction( + { + messageCount: -1, + compactedCount: -1, + tokenCount: result.result?.tokensAfter, + sessionFile: params.sessionFile, + }, + hookCtx, + ); + } catch (err) { + log.warn("after_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + }); + } + } return { ok: result.ok, compacted: result.compacted, diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts index 8c7afc834d2..8c320f765be 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts @@ -9,16 +9,18 @@ export function makeOverflowError(message: string = DEFAULT_OVERFLOW_ERROR_MESSA export function makeCompactionSuccess(params: { summary: string; - firstKeptEntryId: string; - tokensBefore: number; + firstKeptEntryId?: string; + tokensBefore?: number; + tokensAfter?: number; }) { return { ok: true as const, compacted: true as const, result: { summary: params.summary, - firstKeptEntryId: params.firstKeptEntryId, - tokensBefore: params.tokensBefore, + ...(params.firstKeptEntryId ? { firstKeptEntryId: params.firstKeptEntryId } : {}), + ...(params.tokensBefore !== undefined ? { tokensBefore: params.tokensBefore } : {}), + ...(params.tokensAfter !== undefined ? { tokensAfter: params.tokensAfter } : {}), }, }; } @@ -55,8 +57,9 @@ type MockCompactDirect = { compacted: true; result: { summary: string; - firstKeptEntryId: string; - tokensBefore: number; + firstKeptEntryId?: string; + tokensBefore?: number; + tokensAfter?: number; }; }) => unknown; }; diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.loop.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.loop.test.ts index 5980170be62..7a2550ba1e9 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.loop.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.loop.test.ts @@ -2,9 +2,13 @@ import "./run.overflow-compaction.mocks.shared.js"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { isCompactionFailureError, isLikelyContextOverflowError } from "../pi-embedded-helpers.js"; -vi.mock("../../utils.js", () => ({ - resolveUserPath: vi.fn((p: string) => p), -})); +vi.mock(import("../../utils.js"), async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveUserPath: vi.fn((p: string) => p), + }; +}); import { log } from "./logger.js"; import { runEmbeddedPiAgent } from "./run.js"; @@ -16,6 +20,7 @@ import { queueOverflowAttemptWithOversizedToolOutput, } from "./run.overflow-compaction.fixture.js"; import { + mockedContextEngine, mockedCompactDirect, mockedRunEmbeddedAttempt, mockedSessionLikelyHasOversizedToolResults, @@ -30,6 +35,11 @@ const mockedIsLikelyContextOverflowError = vi.mocked(isLikelyContextOverflowErro describe("overflow compaction in run loop", () => { beforeEach(() => { vi.clearAllMocks(); + mockedRunEmbeddedAttempt.mockReset(); + mockedCompactDirect.mockReset(); + mockedSessionLikelyHasOversizedToolResults.mockReset(); + mockedTruncateOversizedToolResultsInSession.mockReset(); + mockedContextEngine.info.ownsCompaction = false; mockedIsCompactionFailureError.mockImplementation((msg?: string) => { if (!msg) { return false; @@ -72,7 +82,9 @@ describe("overflow compaction in run loop", () => { expect(mockedCompactDirect).toHaveBeenCalledTimes(1); expect(mockedCompactDirect).toHaveBeenCalledWith( - expect.objectContaining({ authProfileId: "test-profile" }), + expect.objectContaining({ + runtimeContext: expect.objectContaining({ authProfileId: "test-profile" }), + }), ); expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2); expect(log.warn).toHaveBeenCalledWith( diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.mocks.shared.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.mocks.shared.ts index 22dee7b49cd..51f711508b1 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.mocks.shared.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.mocks.shared.ts @@ -6,6 +6,25 @@ import type { PluginHookBeforePromptBuildResult, } from "../../plugins/types.js"; +type MockCompactionResult = + | { + ok: true; + compacted: true; + result: { + summary: string; + firstKeptEntryId?: string; + tokensBefore?: number; + tokensAfter?: number; + }; + reason?: string; + } + | { + ok: false; + compacted: false; + reason: string; + result?: undefined; + }; + export const mockedGlobalHookRunner = { hasHooks: vi.fn((_hookName: string) => false), runBeforeAgentStart: vi.fn( @@ -26,12 +45,35 @@ export const mockedGlobalHookRunner = { _ctx: PluginHookAgentContext, ): Promise => undefined, ), + runBeforeCompaction: vi.fn(async () => undefined), + runAfterCompaction: vi.fn(async () => undefined), }; +export const mockedContextEngine = { + info: { ownsCompaction: false as boolean }, + compact: vi.fn<(params: unknown) => Promise>(async () => ({ + ok: false as const, + compacted: false as const, + reason: "nothing to compact", + })), +}; + +export const mockedContextEngineCompact = vi.mocked(mockedContextEngine.compact); +export const mockedEnsureRuntimePluginsLoaded: (...args: unknown[]) => void = vi.fn(); + vi.mock("../../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: vi.fn(() => mockedGlobalHookRunner), })); +vi.mock("../../context-engine/index.js", () => ({ + ensureContextEnginesInitialized: vi.fn(), + resolveContextEngine: vi.fn(async () => mockedContextEngine), +})); + +vi.mock("../runtime-plugins.js", () => ({ + ensureRuntimePluginsLoaded: mockedEnsureRuntimePluginsLoaded, +})); + vi.mock("../auth-profiles.js", () => ({ isProfileInCooldown: vi.fn(() => false), markAuthProfileFailure: vi.fn(async () => {}), @@ -141,9 +183,13 @@ vi.mock("../../process/command-queue.js", () => ({ enqueueCommandInLane: vi.fn((_lane: string, task: () => unknown) => task()), })); -vi.mock("../../utils/message-channel.js", () => ({ - isMarkdownCapableMessageChannel: vi.fn(() => true), -})); +vi.mock(import("../../utils/message-channel.js"), async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + isMarkdownCapableMessageChannel: vi.fn(() => true), + }; +}); vi.mock("../agent-paths.js", () => ({ resolveOpenClawAgentDir: vi.fn(() => "/tmp/agent-dir"), diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.shared-test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.shared-test.ts index 45bab82e1b8..c697ac9526a 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.shared-test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.shared-test.ts @@ -1,5 +1,8 @@ import { vi } from "vitest"; -import { compactEmbeddedPiSessionDirect } from "./compact.js"; +import { + mockedContextEngine, + mockedContextEngineCompact, +} from "./run.overflow-compaction.mocks.shared.js"; import { runEmbeddedAttempt } from "./run/attempt.js"; import { sessionLikelyHasOversizedToolResults, @@ -7,13 +10,14 @@ import { } from "./tool-result-truncation.js"; export const mockedRunEmbeddedAttempt = vi.mocked(runEmbeddedAttempt); -export const mockedCompactDirect = vi.mocked(compactEmbeddedPiSessionDirect); +export const mockedCompactDirect = mockedContextEngineCompact; export const mockedSessionLikelyHasOversizedToolResults = vi.mocked( sessionLikelyHasOversizedToolResults, ); export const mockedTruncateOversizedToolResultsInSession = vi.mocked( truncateOversizedToolResultsInSession, ); +export { mockedContextEngine }; export const overflowBaseRunParams = { sessionId: "test-session", diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index 19b4a81d279..b29394eedfd 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -11,6 +11,7 @@ import { } from "./run.overflow-compaction.fixture.js"; import { mockedGlobalHookRunner } from "./run.overflow-compaction.mocks.shared.js"; import { + mockedContextEngine, mockedCompactDirect, mockedRunEmbeddedAttempt, mockedSessionLikelyHasOversizedToolResults, @@ -22,6 +23,25 @@ const mockedPickFallbackThinkingLevel = vi.mocked(pickFallbackThinkingLevel); describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { beforeEach(() => { vi.clearAllMocks(); + mockedRunEmbeddedAttempt.mockReset(); + mockedCompactDirect.mockReset(); + mockedSessionLikelyHasOversizedToolResults.mockReset(); + mockedTruncateOversizedToolResultsInSession.mockReset(); + mockedGlobalHookRunner.runBeforeAgentStart.mockReset(); + mockedGlobalHookRunner.runBeforeCompaction.mockReset(); + mockedGlobalHookRunner.runAfterCompaction.mockReset(); + mockedContextEngine.info.ownsCompaction = false; + mockedCompactDirect.mockResolvedValue({ + ok: false, + compacted: false, + reason: "nothing to compact", + }); + mockedSessionLikelyHasOversizedToolResults.mockReturnValue(false); + mockedTruncateOversizedToolResultsInSession.mockResolvedValue({ + truncated: false, + truncatedCount: 0, + reason: "no oversized tool results", + }); mockedGlobalHookRunner.hasHooks.mockImplementation(() => false); }); @@ -81,8 +101,12 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { expect(mockedCompactDirect).toHaveBeenCalledTimes(1); expect(mockedCompactDirect).toHaveBeenCalledWith( expect.objectContaining({ - trigger: "overflow", - authProfileId: "test-profile", + sessionId: "test-session", + sessionFile: "/tmp/session.json", + runtimeContext: expect.objectContaining({ + trigger: "overflow", + authProfileId: "test-profile", + }), }), ); }); @@ -132,6 +156,63 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { expect(result.meta.error?.kind).toBe("context_overflow"); }); + it("fires compaction hooks during overflow recovery for ownsCompaction engines", async () => { + mockedContextEngine.info.ownsCompaction = true; + mockedGlobalHookRunner.hasHooks.mockImplementation( + (hookName) => hookName === "before_compaction" || hookName === "after_compaction", + ); + mockedRunEmbeddedAttempt + .mockResolvedValueOnce(makeAttemptResult({ promptError: makeOverflowError() })) + .mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + mockedCompactDirect.mockResolvedValueOnce({ + ok: true, + compacted: true, + result: { + summary: "engine-owned compaction", + tokensAfter: 50, + }, + }); + + await runEmbeddedPiAgent(overflowBaseRunParams); + + expect(mockedGlobalHookRunner.runBeforeCompaction).toHaveBeenCalledWith( + { messageCount: -1, sessionFile: "/tmp/session.json" }, + expect.objectContaining({ + sessionKey: "test-key", + }), + ); + expect(mockedGlobalHookRunner.runAfterCompaction).toHaveBeenCalledWith( + { + messageCount: -1, + compactedCount: -1, + tokenCount: 50, + sessionFile: "/tmp/session.json", + }, + expect.objectContaining({ + sessionKey: "test-key", + }), + ); + }); + + it("guards thrown engine-owned overflow compaction attempts", async () => { + mockedContextEngine.info.ownsCompaction = true; + mockedGlobalHookRunner.hasHooks.mockImplementation( + (hookName) => hookName === "before_compaction" || hookName === "after_compaction", + ); + mockedRunEmbeddedAttempt.mockResolvedValueOnce( + makeAttemptResult({ promptError: makeOverflowError() }), + ); + mockedCompactDirect.mockRejectedValueOnce(new Error("engine boom")); + + const result = await runEmbeddedPiAgent(overflowBaseRunParams); + + expect(mockedCompactDirect).toHaveBeenCalledTimes(1); + expect(mockedGlobalHookRunner.runBeforeCompaction).toHaveBeenCalledTimes(1); + expect(mockedGlobalHookRunner.runAfterCompaction).not.toHaveBeenCalled(); + expect(result.meta.error?.kind).toBe("context_overflow"); + expect(result.payloads?.[0]?.isError).toBe(true); + }); + it("returns retry_limit when repeated retries never converge", async () => { mockedRunEmbeddedAttempt.mockClear(); mockedCompactDirect.mockClear(); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index a28d74bf71e..09d5adda724 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1028,37 +1028,84 @@ export async function runEmbeddedPiAgent( log.warn( `context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`, ); - const compactResult = await contextEngine.compact({ - sessionId: params.sessionId, - sessionFile: params.sessionFile, - tokenBudget: ctxInfo.tokens, - force: true, - compactionTarget: "budget", - runtimeContext: { - sessionKey: params.sessionKey, - messageChannel: params.messageChannel, - messageProvider: params.messageProvider, - agentAccountId: params.agentAccountId, - authProfileId: lastProfileId, - workspaceDir: resolvedWorkspace, - agentDir, - config: params.config, - skillsSnapshot: params.skillsSnapshot, - senderIsOwner: params.senderIsOwner, - provider, - model: modelId, - runId: params.runId, - thinkLevel, - reasoningLevel: params.reasoningLevel, - bashElevated: params.bashElevated, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, - trigger: "overflow", - diagId: overflowDiagId, - attempt: overflowCompactionAttempts, - maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, - }, - }); + let compactResult: Awaited>; + // When the engine owns compaction, hooks are not fired inside + // compactEmbeddedPiSessionDirect (which is bypassed). Fire them + // here so subscribers (memory extensions, usage trackers) are + // notified even on overflow-recovery compactions. + const overflowEngineOwnsCompaction = contextEngine.info.ownsCompaction === true; + const overflowHookRunner = overflowEngineOwnsCompaction ? hookRunner : null; + if (overflowHookRunner?.hasHooks("before_compaction")) { + try { + await overflowHookRunner.runBeforeCompaction( + { messageCount: -1, sessionFile: params.sessionFile }, + hookCtx, + ); + } catch (hookErr) { + log.warn( + `before_compaction hook failed during overflow recovery: ${String(hookErr)}`, + ); + } + } + try { + compactResult = await contextEngine.compact({ + sessionId: params.sessionId, + sessionFile: params.sessionFile, + tokenBudget: ctxInfo.tokens, + force: true, + compactionTarget: "budget", + runtimeContext: { + sessionKey: params.sessionKey, + messageChannel: params.messageChannel, + messageProvider: params.messageProvider, + agentAccountId: params.agentAccountId, + authProfileId: lastProfileId, + workspaceDir: resolvedWorkspace, + agentDir, + config: params.config, + skillsSnapshot: params.skillsSnapshot, + senderIsOwner: params.senderIsOwner, + provider, + model: modelId, + runId: params.runId, + thinkLevel, + reasoningLevel: params.reasoningLevel, + bashElevated: params.bashElevated, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + trigger: "overflow", + diagId: overflowDiagId, + attempt: overflowCompactionAttempts, + maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, + }, + }); + } catch (compactErr) { + log.warn( + `contextEngine.compact() threw during overflow recovery for ${provider}/${modelId}: ${String(compactErr)}`, + ); + compactResult = { ok: false, compacted: false, reason: String(compactErr) }; + } + if ( + compactResult.ok && + compactResult.compacted && + overflowHookRunner?.hasHooks("after_compaction") + ) { + try { + await overflowHookRunner.runAfterCompaction( + { + messageCount: -1, + compactedCount: -1, + tokenCount: compactResult.result?.tokensAfter, + sessionFile: params.sessionFile, + }, + hookCtx, + ); + } catch (hookErr) { + log.warn( + `after_compaction hook failed during overflow recovery: ${String(hookErr)}`, + ); + } + } if (compactResult.compacted) { autoCompactionCount += 1; log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);