From fecd4fcc5568016dfcea63ca329c770ba79afbb0 Mon Sep 17 00:00:00 2001 From: Bikkies Date: Tue, 14 Apr 2026 18:42:14 +1000 Subject: [PATCH] fix(agents) context-engine: per-iteration ingest and assemble for compaction (#63555) Merged via squash. Prepared head SHA: 0d815fc190f5b2fd976fea3075e64fef043b1e55 Co-authored-by: Bikkies <29473797+Bikkies@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 2 + .../run/attempt.context-engine-helpers.ts | 67 ++-- src/agents/pi-embedded-runner/run/attempt.ts | 61 ++- .../tool-result-context-guard.test.ts | 367 +++++++++++++++++- .../tool-result-context-guard.ts | 101 +++++ 5 files changed, 547 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0da2897b890..3556c55ad58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai - Browser/SSRF: preserve explicit strict browser navigation mode for legacy `browser.ssrfPolicy.allowPrivateNetwork: false` configs by normalizing the legacy alias to the canonical strict marker instead of silently widening those installs to the default non-strict hostname-navigation path. - Agents/subagents: emit the subagent registry lazy-runtime stub on the stable dist path that both source and bundled runtime imports resolve, so the follow-up dist fix no longer still fails with `ERR_MODULE_NOT_FOUND` at runtime. (#66420) Thanks @obviyus. - Browser: keep loopback CDP readiness checks reachable under strict SSRF defaults so OpenClaw can reconnect to locally started managed Chrome. (#66354) Thanks @hxy91819. +- Agents/context engine: compact engine-owned sessions from the first tool-loop delta and preserve ingest fallback when `afterTurn` is absent, so long-running tool loops can stay bounded without dropping engine state. (#63555) Thanks @Bikkies. ## 2026.4.14-beta.1 @@ -330,6 +331,7 @@ Docs: https://docs.openclaw.ai - Agents/inbound metadata: strip NUL bytes from serialized inbound context blocks before they reach backend spawn args, so malformed message metadata cannot crash agent spawn with `ERR_INVALID_ARG_VALUE`. (#65389) Thanks @adminfedres and @vincentkoc. - iMessage: retry transient `watch.subscribe` startup failures before tearing down the monitor, so brief local transport stalls do not immediately bounce the channel. (#65393) Thanks @vincentkoc. - Status/session_status: move shared session status text into a neutral internal status module and keep the tool importing a local runtime shim, so built `session_status` no longer depends on reply command internals or a bundler-opaque runtime import. (#65807) Thanks @dutifulbob. +- QQBot/security: replace raw `fetch()` in the image-size probe with SSRF-guarded `fetchRemoteMedia`, fix `resolveRepoRoot()` to walk up to `.git` instead of hardcoding two parent levels, and refresh the raw-fetch allowlist to match the corrected scan. (#63495) Thanks @dims. ## 2026.4.9 diff --git a/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts b/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts index 94c363a5866..6b89298d904 100644 --- a/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts +++ b/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts @@ -207,50 +207,51 @@ export async function finalizeAttemptContextEngineTurn(params: { let postTurnFinalizationSucceeded = true; if (typeof params.contextEngine.afterTurn === "function") { - try { - await params.contextEngine.afterTurn({ - sessionId: params.sessionIdUsed, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - messages: params.messagesSnapshot, - prePromptMessageCount: params.prePromptMessageCount, - tokenBudget: params.tokenBudget, - runtimeContext: params.runtimeContext, - }); - } catch (afterTurnErr) { - postTurnFinalizationSucceeded = false; - params.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`); - } - } else { - const newMessages = params.messagesSnapshot.slice(params.prePromptMessageCount); - if (newMessages.length > 0) { - if (typeof params.contextEngine.ingestBatch === "function") { - try { - await params.contextEngine.ingestBatch({ - sessionId: params.sessionIdUsed, - sessionKey: params.sessionKey, - messages: newMessages, - }); - } catch (ingestErr) { - postTurnFinalizationSucceeded = false; - params.warn(`context engine ingest failed: ${String(ingestErr)}`); - } - } else { - for (const msg of newMessages) { + try { + await params.contextEngine.afterTurn({ + sessionId: params.sessionIdUsed, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + messages: params.messagesSnapshot, + prePromptMessageCount: params.prePromptMessageCount, + tokenBudget: params.tokenBudget, + runtimeContext: params.runtimeContext, + }); + } catch (afterTurnErr) { + postTurnFinalizationSucceeded = false; + params.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`); + } + } else { + const newMessages = params.messagesSnapshot.slice(params.prePromptMessageCount); + if (newMessages.length > 0) { + if (typeof params.contextEngine.ingestBatch === "function") { try { - await params.contextEngine.ingest?.({ + await params.contextEngine.ingestBatch({ sessionId: params.sessionIdUsed, sessionKey: params.sessionKey, - message: msg, + messages: newMessages, }); } catch (ingestErr) { postTurnFinalizationSucceeded = false; params.warn(`context engine ingest failed: ${String(ingestErr)}`); } + } else { + for (const msg of newMessages) { + try { + await params.contextEngine.ingest?.({ + sessionId: params.sessionIdUsed, + sessionKey: params.sessionKey, + message: msg, + }); + } catch (ingestErr) { + postTurnFinalizationSucceeded = false; + params.warn(`context engine ingest failed: ${String(ingestErr)}`); + } + } } } } - } + if ( !params.promptError && diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 3ad5c5fc65b..38aca027ecf 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -161,7 +161,10 @@ import { } from "../system-prompt.js"; import { dropThinkingBlocks } from "../thinking.js"; import { collectAllowedToolNames } from "../tool-name-allowlist.js"; -import { installToolResultContextGuard } from "../tool-result-context-guard.js"; +import { + installContextEngineLoopHook, + installToolResultContextGuard, +} from "../tool-result-context-guard.js"; import { truncateOversizedToolResultsInSessionManager } from "../tool-result-truncation.js"; import { logProviderToolSchemaDiagnostics, @@ -988,21 +991,35 @@ export async function runEmbeddedAttempt( throw new Error("Embedded agent session missing"); } const activeSession = session; + let prePromptMessageCount = activeSession.messages.length; abortSessionForYield = () => { yieldAbortSettled = Promise.resolve(activeSession.abort()); }; queueYieldInterruptForSession = () => { queueSessionsYieldInterruptMessage(activeSession); }; - removeToolResultContextGuard = installToolResultContextGuard({ - agent: activeSession.agent, - contextWindowTokens: Math.max( - 1, - Math.floor( - params.model.contextWindow ?? params.model.maxTokens ?? DEFAULT_CONTEXT_TOKENS, + if (params.contextEngine?.info?.ownsCompaction !== true) { + removeToolResultContextGuard = installToolResultContextGuard({ + agent: activeSession.agent, + contextWindowTokens: Math.max( + 1, + Math.floor( + params.model.contextWindow ?? params.model.maxTokens ?? DEFAULT_CONTEXT_TOKENS, + ), ), - ), - }); + }); + } else { + removeToolResultContextGuard = installContextEngineLoopHook({ + agent: activeSession.agent, + contextEngine: params.contextEngine, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + tokenBudget: params.contextTokenBudget, + modelId: params.modelId, + getPrePromptMessageCount: () => prePromptMessageCount, + }); + } const cacheTrace = createCacheTrace({ cfg: params.config, env: process.env, @@ -1647,7 +1664,6 @@ export async function runEmbeddedAttempt( let promptError: unknown = null; let preflightRecovery: EmbeddedRunAttemptResult["preflightRecovery"]; let promptErrorSource: "prompt" | "compaction" | "precheck" | null = null; - let prePromptMessageCount = activeSession.messages.length; let skipPromptSubmission = false; try { const promptStartedAt = Date.now(); @@ -1900,13 +1916,24 @@ export async function runEmbeddedAttempt( const reserveTokens = settingsManager.getCompactionReserveTokens(); const contextTokenBudget = params.contextTokenBudget ?? DEFAULT_CONTEXT_TOKENS; - const preemptiveCompaction = shouldPreemptivelyCompactBeforePrompt({ - messages: activeSession.messages, - systemPrompt: systemPromptText, - prompt: effectivePrompt, - contextTokenBudget, - reserveTokens, - }); + const preemptiveCompaction = + params.contextEngine?.info?.ownsCompaction === true + ? { + route: "fits" as const, + shouldCompact: false, + estimatedPromptTokens: 0, + promptBudgetBeforeReserve: 0, + overflowTokens: 0, + toolResultReducibleChars: 0, + effectiveReserveTokens: reserveTokens, + } + : shouldPreemptivelyCompactBeforePrompt({ + messages: activeSession.messages, + systemPrompt: systemPromptText, + prompt: effectivePrompt, + contextTokenBudget, + reserveTokens, + }); if (preemptiveCompaction.route === "truncate_tool_results_only") { const truncationResult = truncateOversizedToolResultsInSessionManager({ sessionManager, diff --git a/src/agents/pi-embedded-runner/tool-result-context-guard.test.ts b/src/agents/pi-embedded-runner/tool-result-context-guard.test.ts index 91fbca21bb3..2c8f610e07a 100644 --- a/src/agents/pi-embedded-runner/tool-result-context-guard.test.ts +++ b/src/agents/pi-embedded-runner/tool-result-context-guard.test.ts @@ -1,9 +1,11 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; +import type { ContextEngine } from "../../context-engine/types.js"; import { castAgentMessage } from "../test-helpers/agent-message-fixtures.js"; import { CONTEXT_LIMIT_TRUNCATION_NOTICE, formatContextLimitTruncationNotice, + installContextEngineLoopHook, installToolResultContextGuard, PREEMPTIVE_CONTEXT_OVERFLOW_MESSAGE, } from "./tool-result-context-guard.js"; @@ -239,3 +241,366 @@ describe("installToolResultContextGuard", () => { expectPiStyleTruncation(getToolResultText(transformed[0])); }); }); + +type MockedEngine = ContextEngine & { + afterTurn: ReturnType; + assemble: ReturnType; + ingest: ReturnType; + ingestBatch?: ReturnType; +}; + +function makeMockEngine( + overrides: { + assemble?: ( + params: Parameters[0], + ) => Promise<{ messages: AgentMessage[]; estimatedTokens: number }>; + afterTurn?: (params: Parameters>[0]) => Promise; + omitAfterTurn?: boolean; + ingest?: (params: Parameters[0]) => Promise<{ ingested: boolean }>; + ingestBatch?: ( + params: Parameters>[0], + ) => Promise<{ ingestedCount: number }>; + omitIngestBatch?: boolean; + } = {}, +): MockedEngine { + const defaultAfterTurn = vi.fn(async () => {}); + const defaultAssemble = vi.fn(async (params: Parameters[0]) => ({ + messages: params.messages, + estimatedTokens: 0, + })); + const defaultIngest = vi.fn(async () => ({ ingested: true })); + const defaultIngestBatch = vi.fn( + async (params: Parameters>[0]) => ({ + ingestedCount: params.messages.length, + }), + ); + const afterTurn = overrides.omitAfterTurn + ? undefined + : overrides.afterTurn + ? vi.fn(overrides.afterTurn) + : defaultAfterTurn; + const assemble = overrides.assemble ? vi.fn(overrides.assemble) : defaultAssemble; + const ingest = overrides.ingest ? vi.fn(overrides.ingest) : defaultIngest; + const ingestBatch = overrides.omitIngestBatch + ? undefined + : overrides.ingestBatch + ? vi.fn(overrides.ingestBatch) + : defaultIngestBatch; + const engine = { + info: { + id: "test-engine", + name: "Test Engine", + version: "0.0.1", + ownsCompaction: true, + }, + ingest, + assemble, + ...(ingestBatch ? { ingestBatch } : {}), + ...(afterTurn ? { afterTurn } : {}), + } as unknown as MockedEngine; + return engine; +} + +async function callTransform( + agent: { transformContext?: (messages: AgentMessage[], signal: AbortSignal) => unknown }, + messages: AgentMessage[], +) { + return await agent.transformContext?.(messages, new AbortController().signal); +} + +describe("installContextEngineLoopHook", () => { + const sessionId = "test-session-id"; + const sessionKey = "agent:main:subagent:test"; + const sessionFile = "/tmp/test-session.jsonl"; + const tokenBudget = 4096; + const modelId = "test-model"; + + function installHook( + agent: ReturnType, + engine: MockedEngine, + prePromptCount?: number, + ): () => void { + return installContextEngineLoopHook({ + agent, + contextEngine: engine, + sessionId, + sessionKey, + sessionFile, + tokenBudget, + modelId, + ...(prePromptCount !== undefined ? { getPrePromptMessageCount: () => prePromptCount } : {}), + }); + } + + it("returns early when the current messages match the pre-prompt baseline", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine, 2); + + const messages = [makeUser("first"), makeToolResult("call_1", "result")]; + const transformed = await callTransform(agent, messages); + + expect(transformed).toBe(messages); + expect(engine.afterTurn).not.toHaveBeenCalled(); + expect(engine.assemble).not.toHaveBeenCalled(); + }); + + it("processes the first call when messages already exceed the pre-prompt baseline", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine, 1); + + const messages = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, messages); + + expect(engine.afterTurn).toHaveBeenCalledTimes(1); + expect(engine.afterTurn.mock.calls[0]?.[0]).toMatchObject({ + prePromptMessageCount: 1, + messages, + }); + expect(engine.assemble).toHaveBeenCalledTimes(1); + }); + + it("calls afterTurn and assemble when new messages are appended after the first call", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + await callTransform(agent, withNew); + + expect(engine.afterTurn).toHaveBeenCalledTimes(1); + expect(engine.afterTurn.mock.calls[0]?.[0]).toMatchObject({ + prePromptMessageCount: 2, + messages: withNew, + }); + expect(engine.assemble).toHaveBeenCalledTimes(1); + }); + + it("advances the fence across multiple iterations", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine); + + const batch0 = [makeUser("h1"), makeToolResult("c1", "r1")]; + await callTransform(agent, batch0); + + const batch1 = [...batch0, makeUser("h2"), makeToolResult("c2", "r2")]; + await callTransform(agent, batch1); + + const batch2 = [...batch1, makeUser("h3"), makeToolResult("c3", "r3")]; + await callTransform(agent, batch2); + + expect(engine.afterTurn).toHaveBeenCalledTimes(2); + expect(engine.afterTurn.mock.calls[0]?.[0]?.prePromptMessageCount).toBe(2); + expect(engine.afterTurn.mock.calls[1]?.[0]?.prePromptMessageCount).toBe(4); + }); + + it("skips afterTurn and assemble when messages have not changed", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine); + + const messages = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, messages); + await callTransform(agent, messages); + await callTransform(agent, messages); + + expect(engine.afterTurn).not.toHaveBeenCalled(); + expect(engine.assemble).not.toHaveBeenCalled(); + }); + + it("returns the assembled view when its length differs from the source", async () => { + const agent = makeGuardableAgent(); + const compactedView = [makeUser("compacted")]; + const engine = makeMockEngine({ + assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), + }); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "r")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + expect(transformed).toBe(compactedView); + }); + + it("returns the assembled view when the engine rewrites content without changing count", async () => { + const agent = makeGuardableAgent(); + const rewrittenView = [makeUser("rewritten-1"), makeUser("rewritten-2")]; + const engine = makeMockEngine({ + assemble: async () => ({ messages: rewrittenView, estimatedTokens: 0 }), + }); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "r")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + // Same count (2) but different array reference — engine's view should be used + expect(transformed).toBe(rewrittenView); + }); + + it("returns the source when the engine returns the same array reference", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + expect(transformed).toBe(withNew); + }); + + it("does not mutate the source messages array", async () => { + const agent = makeGuardableAgent(); + const compactedView = [makeUser("compacted")]; + const engine = makeMockEngine({ + assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), + }); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); + + const sourceMessages = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + const sourceCopy = [...sourceMessages]; + await callTransform(agent, sourceMessages); + + expect(sourceMessages).toEqual(sourceCopy); + }); + + it("ingests new messages in batches when afterTurn is absent", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine({ omitAfterTurn: true }); + installHook(agent, engine); + + const batch0 = [makeUser("first"), makeToolResult("call_1", "r1")]; + await callTransform(agent, batch0); + + const batch1 = [...batch0, makeUser("second"), makeToolResult("call_2", "r2")]; + await callTransform(agent, batch1); + + const batch2 = [...batch1, makeUser("third"), makeToolResult("call_3", "r3")]; + await callTransform(agent, batch2); + + expect(engine.ingestBatch).toHaveBeenCalledTimes(2); + expect(engine.ingestBatch?.mock.calls[0]?.[0]?.messages).toEqual(batch1.slice(2)); + expect(engine.ingestBatch?.mock.calls[1]?.[0]?.messages).toEqual(batch2.slice(4)); + expect(engine.assemble).toHaveBeenCalledTimes(2); + }); + + it("falls back to per-message ingest when ingestBatch is absent", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine({ omitAfterTurn: true, omitIngestBatch: true }); + installHook(agent, engine, 1); + + const messages = [makeUser("first"), makeToolResult("call_1", "r1")]; + await callTransform(agent, messages); + + expect(engine.ingest).toHaveBeenCalledTimes(1); + expect(engine.ingest.mock.calls[0]?.[0]).toMatchObject({ + sessionId, + sessionKey, + message: makeToolResult("call_1", "r1"), + }); + expect(engine.assemble).toHaveBeenCalledTimes(1); + }); + + it("falls through to source messages when engine.afterTurn throws", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine({ + afterTurn: async () => { + throw new Error("engine afterTurn boom"); + }, + }); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + expect(transformed).toBe(withNew); + }); + + it("falls through to source messages when engine.assemble throws", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine({ + assemble: async () => { + throw new Error("engine assemble boom"); + }, + }); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + expect(transformed).toBe(withNew); + }); + + it("invokes any pre-existing transformContext before the engine sees messages", async () => { + const upstream = vi.fn(async (messages: AgentMessage[]) => [...messages, makeUser("appended")]); + const agent = makeGuardableAgent(upstream); + const compactedView = [makeUser("compacted")]; + const engine = makeMockEngine({ + assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), + }); + installHook(agent, engine); + + // First call: upstream runs (1 msg -> 2 msgs), fence set to 2, returns early + await callTransform(agent, [makeUser("first")]); + expect(upstream).toHaveBeenCalledTimes(1); + + // Second call: upstream runs (2 msgs -> 3 msgs), hasNewMessages = true, assemble fires + const transformed = await callTransform(agent, [makeUser("first"), makeUser("second")]); + expect(upstream).toHaveBeenCalledTimes(2); + expect(transformed).toBe(compactedView); + }); + + it("restores the previous transformContext when the returned dispose is called", async () => { + const upstream = vi.fn(async (messages: AgentMessage[]) => messages); + const agent = makeGuardableAgent(upstream); + const engine = makeMockEngine(); + const dispose = installHook(agent, engine); + + dispose(); + + expect(agent.transformContext).toBe(upstream); + }); + + it("returns the cached assembled view on unchanged iterations instead of raw source", async () => { + const agent = makeGuardableAgent(); + const compactedView = [makeUser("compacted")]; + const engine = makeMockEngine({ + assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), + }); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "r")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeToolResult("call_2", "r2")]; + const firstResult = await callTransform(agent, withNew); + expect(firstResult).toBe(compactedView); + + // Retry with same messages: should return cached assembled view, not raw + const retryResult = await callTransform(agent, withNew); + expect(retryResult).toBe(compactedView); + expect(engine.assemble).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/agents/pi-embedded-runner/tool-result-context-guard.ts b/src/agents/pi-embedded-runner/tool-result-context-guard.ts index f4dcc9005bf..601097d58b7 100644 --- a/src/agents/pi-embedded-runner/tool-result-context-guard.ts +++ b/src/agents/pi-embedded-runner/tool-result-context-guard.ts @@ -1,4 +1,5 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import type { ContextEngine } from "../../context-engine/types.js"; import { CHARS_PER_TOKEN_ESTIMATE, TOOL_RESULT_CHARS_PER_TOKEN_ESTIMATE, @@ -183,6 +184,106 @@ function enforceToolResultLimitInPlace(params: { } } +/** + * Per-iteration `afterTurn` + `assemble` wrapper for sessions where + * the context engine owns compaction. Lets the engine compact inside + * a long tool loop instead of only at end of attempt. + */ +export function installContextEngineLoopHook(params: { + agent: GuardableAgent; + contextEngine: ContextEngine; + sessionId: string; + sessionKey?: string; + sessionFile: string; + tokenBudget?: number; + modelId: string; + getPrePromptMessageCount?: () => number; +}): () => void { + const { contextEngine, sessionId, sessionKey, sessionFile, tokenBudget, modelId } = params; + const mutableAgent = params.agent as GuardableAgentRecord; + const originalTransformContext = mutableAgent.transformContext; + let lastSeenLength: number | null = null; + let lastAssembledView: AgentMessage[] | null = null; + + mutableAgent.transformContext = (async (messages: AgentMessage[], signal: AbortSignal) => { + const transformed = originalTransformContext + ? await originalTransformContext.call(mutableAgent, messages, signal) + : messages; + const sourceMessages = Array.isArray(transformed) ? transformed : messages; + + // Seed the loop fence from the attempt's pre-prompt message count when available. + // This keeps the first real post-tool-call iteration eligible for compaction even + // if the hook's first observed call happens after tool results were appended. + const prePromptMessageCount = Math.max( + 0, + Math.min( + sourceMessages.length, + lastSeenLength ?? params.getPrePromptMessageCount?.() ?? sourceMessages.length, + ), + ); + lastSeenLength = prePromptMessageCount; + + const hasNewMessages = sourceMessages.length > prePromptMessageCount; + if (!hasNewMessages) { + return lastAssembledView ?? sourceMessages; + } + + try { + if (typeof contextEngine.afterTurn === "function") { + await contextEngine.afterTurn({ + sessionId, + sessionKey, + sessionFile, + messages: sourceMessages, + prePromptMessageCount, + tokenBudget, + }); + } else { + const newMessages = sourceMessages.slice(prePromptMessageCount); + if (newMessages.length > 0) { + if (typeof contextEngine.ingestBatch === "function") { + await contextEngine.ingestBatch({ + sessionId, + sessionKey, + messages: newMessages, + }); + } else { + for (const message of newMessages) { + await contextEngine.ingest({ + sessionId, + sessionKey, + message, + }); + } + } + } + } + lastSeenLength = sourceMessages.length; + const assembled = await contextEngine.assemble({ + sessionId, + sessionKey, + messages: sourceMessages, + tokenBudget, + model: modelId, + }); + if (assembled && Array.isArray(assembled.messages) && assembled.messages !== sourceMessages) { + lastAssembledView = assembled.messages; + return assembled.messages; + } + lastAssembledView = null; + } catch { + // Best-effort: any engine failure falls through to the raw source + // messages so the tool loop still makes forward progress. + } + + return sourceMessages; + }) as GuardableTransformContext; + + return () => { + mutableAgent.transformContext = originalTransformContext; + }; +} + export function installToolResultContextGuard(params: { agent: GuardableAgent; contextWindowTokens: number;