diff --git a/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts b/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts index 94c363a5866..b3d941ba9be 100644 --- a/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts +++ b/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts @@ -199,6 +199,9 @@ export async function finalizeAttemptContextEngineTurn(params: { }) => Promise; sessionManager: unknown; warn: (message: string) => void; + /** When true, skip the afterTurn/ingest calls because the loop hook already + * handled per-iteration ingestion during the tool loop. Maintenance still runs. */ + skipAfterTurn?: boolean; }) { if (!params.contextEngine) { return { postTurnFinalizationSucceeded: true }; @@ -206,47 +209,49 @@ export async function finalizeAttemptContextEngineTurn(params: { let postTurnFinalizationSucceeded = true; - if (typeof params.contextEngine.afterTurn === "function") { - try { - await params.contextEngine.afterTurn({ - sessionId: params.sessionIdUsed, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - messages: params.messagesSnapshot, - prePromptMessageCount: params.prePromptMessageCount, - tokenBudget: params.tokenBudget, - runtimeContext: params.runtimeContext, - }); - } catch (afterTurnErr) { - postTurnFinalizationSucceeded = false; - params.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`); - } - } else { - const newMessages = params.messagesSnapshot.slice(params.prePromptMessageCount); - if (newMessages.length > 0) { - if (typeof params.contextEngine.ingestBatch === "function") { - try { - await params.contextEngine.ingestBatch({ - sessionId: params.sessionIdUsed, - sessionKey: params.sessionKey, - messages: newMessages, - }); - } catch (ingestErr) { - postTurnFinalizationSucceeded = false; - params.warn(`context engine ingest failed: ${String(ingestErr)}`); - } - } else { - for (const msg of newMessages) { + if (!params.skipAfterTurn) { + if (typeof params.contextEngine.afterTurn === "function") { + try { + await params.contextEngine.afterTurn({ + sessionId: params.sessionIdUsed, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + messages: params.messagesSnapshot, + prePromptMessageCount: params.prePromptMessageCount, + tokenBudget: params.tokenBudget, + runtimeContext: params.runtimeContext, + }); + } catch (afterTurnErr) { + postTurnFinalizationSucceeded = false; + params.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`); + } + } else { + const newMessages = params.messagesSnapshot.slice(params.prePromptMessageCount); + if (newMessages.length > 0) { + if (typeof params.contextEngine.ingestBatch === "function") { try { - await params.contextEngine.ingest?.({ + await params.contextEngine.ingestBatch({ sessionId: params.sessionIdUsed, sessionKey: params.sessionKey, - message: msg, + messages: newMessages, }); } catch (ingestErr) { postTurnFinalizationSucceeded = false; params.warn(`context engine ingest failed: ${String(ingestErr)}`); } + } else { + for (const msg of newMessages) { + try { + await params.contextEngine.ingest?.({ + sessionId: params.sessionIdUsed, + sessionKey: params.sessionKey, + message: msg, + }); + } catch (ingestErr) { + postTurnFinalizationSucceeded = false; + params.warn(`context engine ingest failed: ${String(ingestErr)}`); + } + } } } } diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index dc93e3b6691..59252a3ca5e 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -997,11 +997,7 @@ export async function runEmbeddedAttempt( queueYieldInterruptForSession = () => { queueSessionsYieldInterruptMessage(activeSession); }; - // Hoisted so installContextEngineLoopHook can read the same fence value - // that finalizeAttemptContextEngineTurn uses at end-of-attempt. The - // initial value is reassigned in the prompt-build phase below, after - // any heartbeat message filtering. - let prePromptMessageCount = 0; + let loopHookActive = false; if (params.contextEngine?.info?.ownsCompaction !== true) { removeToolResultContextGuard = installToolResultContextGuard({ agent: activeSession.agent, @@ -1021,8 +1017,8 @@ export async function runEmbeddedAttempt( sessionFile: params.sessionFile, tokenBudget: params.contextTokenBudget, modelId: params.modelId, - getPrePromptMessageCount: () => prePromptMessageCount, }); + loopHookActive = true; } const cacheTrace = createCacheTrace({ cfg: params.config, @@ -1668,7 +1664,7 @@ export async function runEmbeddedAttempt( let promptError: unknown = null; let preflightRecovery: EmbeddedRunAttemptResult["preflightRecovery"]; let promptErrorSource: "prompt" | "compaction" | "precheck" | null = null; - prePromptMessageCount = activeSession.messages.length; + let prePromptMessageCount = activeSession.messages.length; let skipPromptSubmission = false; try { const promptStartedAt = Date.now(); @@ -2222,6 +2218,7 @@ export async function runEmbeddedAttempt( prePromptMessageCount, tokenBudget: params.contextTokenBudget, runtimeContext: afterTurnRuntimeContext, + skipAfterTurn: loopHookActive, runMaintenance: async (contextParams) => await runContextEngineMaintenance({ contextEngine: contextParams.contextEngine as never, diff --git a/src/agents/pi-embedded-runner/tool-result-context-guard.test.ts b/src/agents/pi-embedded-runner/tool-result-context-guard.test.ts index 598430b1e9f..48f25c9b61b 100644 --- a/src/agents/pi-embedded-runner/tool-result-context-guard.test.ts +++ b/src/agents/pi-embedded-runner/tool-result-context-guard.test.ts @@ -295,270 +295,204 @@ describe("installContextEngineLoopHook", () => { const tokenBudget = 4096; const modelId = "test-model"; - it("forwards new messages to engine.afterTurn with prePromptMessageCount=0 on first call", async () => { + function installHook( + agent: ReturnType, + engine: MockedEngine, + ): () => void { + return installContextEngineLoopHook({ + agent, + contextEngine: engine, + sessionId, + sessionKey, + sessionFile, + tokenBudget, + modelId, + }); + } + + it("returns early on the first call without calling afterTurn or assemble", async () => { const agent = makeGuardableAgent(); const engine = makeMockEngine(); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const messages = [makeUser("first"), makeToolResult("call_1", "result")]; - await callTransform(agent, messages); - - expect(engine.afterTurn).toHaveBeenCalledTimes(1); - expect(engine.afterTurn.mock.calls[0]?.[0]).toMatchObject({ - sessionId, - sessionKey, - sessionFile, - messages, - prePromptMessageCount: 0, - tokenBudget, - }); - }); - - it("advances prePromptMessageCount on subsequent calls based on the previous high-water mark", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const firstBatch = [makeUser("first"), makeToolResult("call_1", "result")]; - await callTransform(agent, firstBatch); - - const secondBatch = [ - makeUser("first"), - makeToolResult("call_1", "result"), - makeUser("second"), - makeToolResult("call_2", "result two"), - ]; - await callTransform(agent, secondBatch); - - expect(engine.afterTurn).toHaveBeenCalledTimes(2); - expect(engine.afterTurn.mock.calls[1]?.[0]).toMatchObject({ - prePromptMessageCount: 2, - messages: secondBatch, - }); - }); - - it("does not call engine.afterTurn when no new messages have been appended", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const messages = [makeUser("first"), makeToolResult("call_1", "result")]; - await callTransform(agent, messages); - await callTransform(agent, messages); - - expect(engine.afterTurn).toHaveBeenCalledTimes(1); - }); - - it("returns the engine's assembled view when its length differs from the source", async () => { - const agent = makeGuardableAgent(); - const compactedView = [makeUser("compacted")]; - const engine = makeMockEngine({ - assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), - }); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const sourceMessages = [ - makeUser("first"), - makeToolResult("call_1", "result"), - makeToolResult("call_2", "result two"), - ]; - const transformed = await callTransform(agent, sourceMessages); - - expect(transformed).toBe(compactedView); - expect(transformed).not.toBe(sourceMessages); - }); - - it("returns the source messages when the engine's assembled view has the same length", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const sourceMessages = [makeUser("first"), makeToolResult("call_1", "result")]; - const transformed = await callTransform(agent, sourceMessages); - - expect(transformed).toBe(sourceMessages); - }); - - it("does not mutate the source messages array even when the engine returns a different view", async () => { - const agent = makeGuardableAgent(); - const compactedView = [makeUser("compacted")]; - const engine = makeMockEngine({ - assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), - }); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const sourceMessages = [makeUser("first"), makeToolResult("call_1", "result")]; - const sourceCopy = [...sourceMessages]; - await callTransform(agent, sourceMessages); - - expect(sourceMessages).toEqual(sourceCopy); - expect(sourceMessages).toHaveLength(2); - }); - - it("skips the afterTurn call when the engine does not implement it", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine({ omitAfterTurn: true }); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); + installHook(agent, engine); const messages = [makeUser("first"), makeToolResult("call_1", "result")]; const transformed = await callTransform(agent, messages); expect(transformed).toBe(messages); + expect(engine.afterTurn).not.toHaveBeenCalled(); + expect(engine.assemble).not.toHaveBeenCalled(); + }); + + it("calls afterTurn and assemble when new messages are appended after the first call", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + await callTransform(agent, withNew); + + expect(engine.afterTurn).toHaveBeenCalledTimes(1); + expect(engine.afterTurn.mock.calls[0]?.[0]).toMatchObject({ + prePromptMessageCount: 2, + messages: withNew, + }); expect(engine.assemble).toHaveBeenCalledTimes(1); }); - it("keeps calling assemble on subsequent iterations when engine lacks afterTurn but new messages arrive", async () => { + it("advances the fence across multiple iterations", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine); + + const batch0 = [makeUser("h1"), makeToolResult("c1", "r1")]; + await callTransform(agent, batch0); + + const batch1 = [...batch0, makeUser("h2"), makeToolResult("c2", "r2")]; + await callTransform(agent, batch1); + + const batch2 = [...batch1, makeUser("h3"), makeToolResult("c3", "r3")]; + await callTransform(agent, batch2); + + expect(engine.afterTurn).toHaveBeenCalledTimes(2); + expect(engine.afterTurn.mock.calls[0]?.[0]?.prePromptMessageCount).toBe(2); + expect(engine.afterTurn.mock.calls[1]?.[0]?.prePromptMessageCount).toBe(4); + }); + + it("skips afterTurn and assemble when messages have not changed", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine); + + const messages = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, messages); + await callTransform(agent, messages); + await callTransform(agent, messages); + + expect(engine.afterTurn).not.toHaveBeenCalled(); + expect(engine.assemble).not.toHaveBeenCalled(); + }); + + it("returns the assembled view when its length differs from the source", async () => { + const agent = makeGuardableAgent(); + const compactedView = [makeUser("compacted")]; + const engine = makeMockEngine({ + assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), + }); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "r")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + expect(transformed).toBe(compactedView); + }); + + it("returns the source messages when the assembled view has the same length", async () => { + const agent = makeGuardableAgent(); + const engine = makeMockEngine(); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); + + const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + expect(transformed).toBe(withNew); + }); + + it("does not mutate the source messages array", async () => { + const agent = makeGuardableAgent(); + const compactedView = [makeUser("compacted")]; + const engine = makeMockEngine({ + assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), + }); + installHook(agent, engine); + + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); + + const sourceMessages = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + const sourceCopy = [...sourceMessages]; + await callTransform(agent, sourceMessages); + + expect(sourceMessages).toEqual(sourceCopy); + }); + + it("still calls assemble when engine lacks afterTurn but new messages arrive", async () => { const agent = makeGuardableAgent(); const engine = makeMockEngine({ omitAfterTurn: true }); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); + installHook(agent, engine); - const batch1 = [makeUser("first"), makeToolResult("call_1", "r1")]; + const batch0 = [makeUser("first"), makeToolResult("call_1", "r1")]; + await callTransform(agent, batch0); + + const batch1 = [...batch0, makeUser("second"), makeToolResult("call_2", "r2")]; await callTransform(agent, batch1); - expect(engine.assemble).toHaveBeenCalledTimes(1); - const batch2 = [...batch1, makeUser("second"), makeToolResult("call_2", "r2")]; + const batch2 = [...batch1, makeUser("third"), makeToolResult("call_3", "r3")]; await callTransform(agent, batch2); - expect(engine.assemble).toHaveBeenCalledTimes(2); - const batch3 = [...batch2, makeUser("third"), makeToolResult("call_3", "r3")]; - await callTransform(agent, batch3); - expect(engine.assemble).toHaveBeenCalledTimes(3); + expect(engine.assemble).toHaveBeenCalledTimes(2); }); - it("falls through to the source messages when engine.afterTurn throws", async () => { + it("falls through to source messages when engine.afterTurn throws", async () => { const agent = makeGuardableAgent(); const engine = makeMockEngine({ afterTurn: async () => { throw new Error("engine afterTurn boom"); }, }); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); + installHook(agent, engine); - const messages = [makeUser("first"), makeToolResult("call_1", "result")]; - const transformed = await callTransform(agent, messages); + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); - expect(transformed).toBe(messages); + const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + expect(transformed).toBe(withNew); }); - it("falls through to the source messages when engine.assemble throws", async () => { + it("falls through to source messages when engine.assemble throws", async () => { const agent = makeGuardableAgent(); const engine = makeMockEngine({ assemble: async () => { throw new Error("engine assemble boom"); }, }); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); + installHook(agent, engine); - const messages = [makeUser("first"), makeToolResult("call_1", "result")]; - const transformed = await callTransform(agent, messages); + const initial = [makeUser("first"), makeToolResult("call_1", "result")]; + await callTransform(agent, initial); - expect(transformed).toBe(messages); + const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")]; + const transformed = await callTransform(agent, withNew); + + expect(transformed).toBe(withNew); }); - it("invokes any pre-existing transformContext before passing the result to the engine", async () => { + it("invokes any pre-existing transformContext before the engine sees messages", async () => { const upstream = vi.fn(async (messages: AgentMessage[]) => [...messages, makeUser("appended")]); const agent = makeGuardableAgent(upstream); const compactedView = [makeUser("compacted")]; const engine = makeMockEngine({ - assemble: async (params) => { - expect(params.messages).toHaveLength(2); - return { messages: compactedView, estimatedTokens: 0 }; - }, - }); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, + assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), }); + installHook(agent, engine); - const sourceMessages = [makeUser("first")]; - const transformed = await callTransform(agent, sourceMessages); - + // First call: upstream runs (1 msg -> 2 msgs), fence set to 2, returns early + await callTransform(agent, [makeUser("first")]); expect(upstream).toHaveBeenCalledTimes(1); + + // Second call: upstream runs (2 msgs -> 3 msgs), hasNewMessages = true, assemble fires + const transformed = await callTransform(agent, [makeUser("first"), makeUser("second")]); + expect(upstream).toHaveBeenCalledTimes(2); expect(transformed).toBe(compactedView); }); @@ -566,178 +500,31 @@ describe("installContextEngineLoopHook", () => { const upstream = vi.fn(async (messages: AgentMessage[]) => messages); const agent = makeGuardableAgent(upstream); const engine = makeMockEngine(); - const dispose = installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); + const dispose = installHook(agent, engine); dispose(); expect(agent.transformContext).toBe(upstream); }); - it("uses getPrePromptMessageCount as the initial fence so pre-attempt history is not reported as new", async () => { + it("returns the cached assembled view on unchanged iterations instead of raw source", async () => { const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - let prePromptMessageCount = 5; - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - getPrePromptMessageCount: () => prePromptMessageCount, + const compactedView = [makeUser("compacted")]; + const engine = makeMockEngine({ + assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }), }); + installHook(agent, engine); - const history = [ - makeUser("h1"), - makeUser("h2"), - makeUser("h3"), - makeUser("h4"), - makeUser("h5"), - ]; - const firstCallMessages = [...history, makeUser("user prompt"), makeToolResult("call_1", "r1")]; - await callTransform(agent, firstCallMessages); + const initial = [makeUser("first"), makeToolResult("call_1", "r")]; + await callTransform(agent, initial); - expect(engine.afterTurn).toHaveBeenCalledTimes(1); - expect(engine.afterTurn.mock.calls[0]?.[0]).toMatchObject({ - prePromptMessageCount: 5, - messages: firstCallMessages, - }); - }); + const withNew = [...initial, makeToolResult("call_2", "r2")]; + const firstResult = await callTransform(agent, withNew); + expect(firstResult).toBe(compactedView); - it("evaluates getPrePromptMessageCount lazily on the first call, not at install time", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - let prePromptMessageCount = 0; - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - getPrePromptMessageCount: () => prePromptMessageCount, - }); - // Caller updates the value between install and first transformContext call - // (mirrors the runtime where prePromptMessageCount is reassigned in the - // prompt-build phase after heartbeat filtering). - prePromptMessageCount = 3; - - const messages = [ - makeUser("h1"), - makeUser("h2"), - makeUser("h3"), - makeUser("user prompt"), - makeToolResult("call_1", "r1"), - ]; - await callTransform(agent, messages); - - expect(engine.afterTurn).toHaveBeenCalledTimes(1); - expect(engine.afterTurn.mock.calls[0]?.[0]?.prePromptMessageCount).toBe(3); - }); - - it("falls back to fence=0 when no getPrePromptMessageCount is supplied", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const messages = [makeUser("first"), makeToolResult("call_1", "result")]; - await callTransform(agent, messages); - - expect(engine.afterTurn).toHaveBeenCalledTimes(1); - expect(engine.afterTurn.mock.calls[0]?.[0]?.prePromptMessageCount).toBe(0); - }); - - it("calls assemble on the first iteration even when no afterTurn ingest happened", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - // Fence equals current message count so afterTurn is skipped on first call - getPrePromptMessageCount: () => 2, - }); - - const messages = [makeUser("h1"), makeUser("h2")]; - await callTransform(agent, messages); - - expect(engine.afterTurn).not.toHaveBeenCalled(); + // Retry with same messages: should return cached assembled view, not raw + const retryResult = await callTransform(agent, withNew); + expect(retryResult).toBe(compactedView); expect(engine.assemble).toHaveBeenCalledTimes(1); }); - - it("skips assemble on subsequent iterations when no new messages have arrived", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const messages = [makeUser("first"), makeToolResult("call_1", "result")]; - await callTransform(agent, messages); - await callTransform(agent, messages); - await callTransform(agent, messages); - - // First call: afterTurn ingests delta and assemble runs. - // Second + third: nothing new, assemble must NOT be called again. - expect(engine.afterTurn).toHaveBeenCalledTimes(1); - expect(engine.assemble).toHaveBeenCalledTimes(1); - }); - - it("calls assemble again after a fresh afterTurn ingest on a later iteration", async () => { - const agent = makeGuardableAgent(); - const engine = makeMockEngine(); - installContextEngineLoopHook({ - agent, - contextEngine: engine, - sessionId, - sessionKey, - sessionFile, - tokenBudget, - modelId, - }); - - const firstBatch = [makeUser("first"), makeToolResult("call_1", "result")]; - await callTransform(agent, firstBatch); - await callTransform(agent, firstBatch); - expect(engine.assemble).toHaveBeenCalledTimes(1); - - const secondBatch = [ - makeUser("first"), - makeToolResult("call_1", "result"), - makeUser("second"), - makeToolResult("call_2", "result two"), - ]; - await callTransform(agent, secondBatch); - - expect(engine.afterTurn).toHaveBeenCalledTimes(2); - expect(engine.assemble).toHaveBeenCalledTimes(2); - }); }); diff --git a/src/agents/pi-embedded-runner/tool-result-context-guard.ts b/src/agents/pi-embedded-runner/tool-result-context-guard.ts index 620a9629239..2624bfc8a6c 100644 --- a/src/agents/pi-embedded-runner/tool-result-context-guard.ts +++ b/src/agents/pi-embedded-runner/tool-result-context-guard.ts @@ -197,20 +197,12 @@ export function installContextEngineLoopHook(params: { sessionFile: string; tokenBudget?: number; modelId: string; - /** - * Returns the message count that should be used as the initial - * `prePromptMessageCount` fence on the first `transformContext` call. - * Should match the value `finalizeAttemptContextEngineTurn` will eventually - * pass to `afterTurn` so the loop hook does not report pre-attempt history - * as new on its first invocation. - */ - getPrePromptMessageCount?: () => number; }): () => void { const { contextEngine, sessionId, sessionKey, sessionFile, tokenBudget, modelId } = params; const mutableAgent = params.agent as GuardableAgentRecord; const originalTransformContext = mutableAgent.transformContext; let lastSeenLength: number | null = null; - let hasAssembledBefore = false; + let lastAssembledView: AgentMessage[] | null = null; mutableAgent.transformContext = (async (messages: AgentMessage[], signal: AbortSignal) => { const transformed = originalTransformContext @@ -219,12 +211,16 @@ export function installContextEngineLoopHook(params: { const sourceMessages = Array.isArray(transformed) ? transformed : messages; if (lastSeenLength === null) { - lastSeenLength = params.getPrePromptMessageCount?.() ?? 0; + lastSeenLength = sourceMessages.length; } const hasNewMessages = sourceMessages.length > lastSeenLength; + if (!hasNewMessages) { + return lastAssembledView ?? sourceMessages; + } + try { - if (hasNewMessages && typeof contextEngine.afterTurn === "function") { + if (typeof contextEngine.afterTurn === "function") { const prePromptCount = lastSeenLength; await contextEngine.afterTurn({ sessionId, @@ -235,29 +231,23 @@ export function installContextEngineLoopHook(params: { tokenBudget, }); } - if (hasNewMessages) { - lastSeenLength = sourceMessages.length; - } - // Skip assemble when nothing has changed since the last call AND we - // already returned an assembled view at least once. The engine's view - // cannot have changed without new messages arriving. - if (hasNewMessages || !hasAssembledBefore) { - const assembled = await contextEngine.assemble({ - sessionId, - sessionKey, - messages: sourceMessages, - tokenBudget, - model: modelId, - }); - hasAssembledBefore = true; - if ( - assembled && - Array.isArray(assembled.messages) && - assembled.messages.length !== sourceMessages.length - ) { - return assembled.messages; - } + lastSeenLength = sourceMessages.length; + const assembled = await contextEngine.assemble({ + sessionId, + sessionKey, + messages: sourceMessages, + tokenBudget, + model: modelId, + }); + if ( + assembled && + Array.isArray(assembled.messages) && + assembled.messages.length !== sourceMessages.length + ) { + lastAssembledView = assembled.messages; + return assembled.messages; } + lastAssembledView = null; } catch { // Best-effort: any engine failure falls through to the raw source // messages so the tool loop still makes forward progress.