mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:50:42 +00:00
fix: cache assembled view for retries, skip duplicate finalize afterTurn
- Return the last assembled view on unchanged iterations instead of falling back to raw source messages, so API retries use the compacted context - Add skipAfterTurn param to finalizeAttemptContextEngineTurn so the end-of-attempt path skips ingestion when the loop hook already handled it during the tool loop - Set loopHookActive flag at install site and pass it through to finalize - Add test verifying cached view is returned on retry
This commit is contained in:
@@ -199,6 +199,9 @@ export async function finalizeAttemptContextEngineTurn(params: {
|
||||
}) => Promise<unknown>;
|
||||
sessionManager: unknown;
|
||||
warn: (message: string) => void;
|
||||
/** When true, skip the afterTurn/ingest calls because the loop hook already
|
||||
* handled per-iteration ingestion during the tool loop. Maintenance still runs. */
|
||||
skipAfterTurn?: boolean;
|
||||
}) {
|
||||
if (!params.contextEngine) {
|
||||
return { postTurnFinalizationSucceeded: true };
|
||||
@@ -206,47 +209,49 @@ export async function finalizeAttemptContextEngineTurn(params: {
|
||||
|
||||
let postTurnFinalizationSucceeded = true;
|
||||
|
||||
if (typeof params.contextEngine.afterTurn === "function") {
|
||||
try {
|
||||
await params.contextEngine.afterTurn({
|
||||
sessionId: params.sessionIdUsed,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
messages: params.messagesSnapshot,
|
||||
prePromptMessageCount: params.prePromptMessageCount,
|
||||
tokenBudget: params.tokenBudget,
|
||||
runtimeContext: params.runtimeContext,
|
||||
});
|
||||
} catch (afterTurnErr) {
|
||||
postTurnFinalizationSucceeded = false;
|
||||
params.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`);
|
||||
}
|
||||
} else {
|
||||
const newMessages = params.messagesSnapshot.slice(params.prePromptMessageCount);
|
||||
if (newMessages.length > 0) {
|
||||
if (typeof params.contextEngine.ingestBatch === "function") {
|
||||
try {
|
||||
await params.contextEngine.ingestBatch({
|
||||
sessionId: params.sessionIdUsed,
|
||||
sessionKey: params.sessionKey,
|
||||
messages: newMessages,
|
||||
});
|
||||
} catch (ingestErr) {
|
||||
postTurnFinalizationSucceeded = false;
|
||||
params.warn(`context engine ingest failed: ${String(ingestErr)}`);
|
||||
}
|
||||
} else {
|
||||
for (const msg of newMessages) {
|
||||
if (!params.skipAfterTurn) {
|
||||
if (typeof params.contextEngine.afterTurn === "function") {
|
||||
try {
|
||||
await params.contextEngine.afterTurn({
|
||||
sessionId: params.sessionIdUsed,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
messages: params.messagesSnapshot,
|
||||
prePromptMessageCount: params.prePromptMessageCount,
|
||||
tokenBudget: params.tokenBudget,
|
||||
runtimeContext: params.runtimeContext,
|
||||
});
|
||||
} catch (afterTurnErr) {
|
||||
postTurnFinalizationSucceeded = false;
|
||||
params.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`);
|
||||
}
|
||||
} else {
|
||||
const newMessages = params.messagesSnapshot.slice(params.prePromptMessageCount);
|
||||
if (newMessages.length > 0) {
|
||||
if (typeof params.contextEngine.ingestBatch === "function") {
|
||||
try {
|
||||
await params.contextEngine.ingest?.({
|
||||
await params.contextEngine.ingestBatch({
|
||||
sessionId: params.sessionIdUsed,
|
||||
sessionKey: params.sessionKey,
|
||||
message: msg,
|
||||
messages: newMessages,
|
||||
});
|
||||
} catch (ingestErr) {
|
||||
postTurnFinalizationSucceeded = false;
|
||||
params.warn(`context engine ingest failed: ${String(ingestErr)}`);
|
||||
}
|
||||
} else {
|
||||
for (const msg of newMessages) {
|
||||
try {
|
||||
await params.contextEngine.ingest?.({
|
||||
sessionId: params.sessionIdUsed,
|
||||
sessionKey: params.sessionKey,
|
||||
message: msg,
|
||||
});
|
||||
} catch (ingestErr) {
|
||||
postTurnFinalizationSucceeded = false;
|
||||
params.warn(`context engine ingest failed: ${String(ingestErr)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -997,11 +997,7 @@ export async function runEmbeddedAttempt(
|
||||
queueYieldInterruptForSession = () => {
|
||||
queueSessionsYieldInterruptMessage(activeSession);
|
||||
};
|
||||
// Hoisted so installContextEngineLoopHook can read the same fence value
|
||||
// that finalizeAttemptContextEngineTurn uses at end-of-attempt. The
|
||||
// initial value is reassigned in the prompt-build phase below, after
|
||||
// any heartbeat message filtering.
|
||||
let prePromptMessageCount = 0;
|
||||
let loopHookActive = false;
|
||||
if (params.contextEngine?.info?.ownsCompaction !== true) {
|
||||
removeToolResultContextGuard = installToolResultContextGuard({
|
||||
agent: activeSession.agent,
|
||||
@@ -1021,8 +1017,8 @@ export async function runEmbeddedAttempt(
|
||||
sessionFile: params.sessionFile,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
modelId: params.modelId,
|
||||
getPrePromptMessageCount: () => prePromptMessageCount,
|
||||
});
|
||||
loopHookActive = true;
|
||||
}
|
||||
const cacheTrace = createCacheTrace({
|
||||
cfg: params.config,
|
||||
@@ -1668,7 +1664,7 @@ export async function runEmbeddedAttempt(
|
||||
let promptError: unknown = null;
|
||||
let preflightRecovery: EmbeddedRunAttemptResult["preflightRecovery"];
|
||||
let promptErrorSource: "prompt" | "compaction" | "precheck" | null = null;
|
||||
prePromptMessageCount = activeSession.messages.length;
|
||||
let prePromptMessageCount = activeSession.messages.length;
|
||||
let skipPromptSubmission = false;
|
||||
try {
|
||||
const promptStartedAt = Date.now();
|
||||
@@ -2222,6 +2218,7 @@ export async function runEmbeddedAttempt(
|
||||
prePromptMessageCount,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
runtimeContext: afterTurnRuntimeContext,
|
||||
skipAfterTurn: loopHookActive,
|
||||
runMaintenance: async (contextParams) =>
|
||||
await runContextEngineMaintenance({
|
||||
contextEngine: contextParams.contextEngine as never,
|
||||
|
||||
@@ -295,270 +295,204 @@ describe("installContextEngineLoopHook", () => {
|
||||
const tokenBudget = 4096;
|
||||
const modelId = "test-model";
|
||||
|
||||
it("forwards new messages to engine.afterTurn with prePromptMessageCount=0 on first call", async () => {
|
||||
function installHook(
|
||||
agent: ReturnType<typeof makeGuardableAgent>,
|
||||
engine: MockedEngine,
|
||||
): () => void {
|
||||
return installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
}
|
||||
|
||||
it("returns early on the first call without calling afterTurn or assemble", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const messages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, messages);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(1);
|
||||
expect(engine.afterTurn.mock.calls[0]?.[0]).toMatchObject({
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
messages,
|
||||
prePromptMessageCount: 0,
|
||||
tokenBudget,
|
||||
});
|
||||
});
|
||||
|
||||
it("advances prePromptMessageCount on subsequent calls based on the previous high-water mark", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const firstBatch = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, firstBatch);
|
||||
|
||||
const secondBatch = [
|
||||
makeUser("first"),
|
||||
makeToolResult("call_1", "result"),
|
||||
makeUser("second"),
|
||||
makeToolResult("call_2", "result two"),
|
||||
];
|
||||
await callTransform(agent, secondBatch);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(2);
|
||||
expect(engine.afterTurn.mock.calls[1]?.[0]).toMatchObject({
|
||||
prePromptMessageCount: 2,
|
||||
messages: secondBatch,
|
||||
});
|
||||
});
|
||||
|
||||
it("does not call engine.afterTurn when no new messages have been appended", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const messages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, messages);
|
||||
await callTransform(agent, messages);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("returns the engine's assembled view when its length differs from the source", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const compactedView = [makeUser("compacted")];
|
||||
const engine = makeMockEngine({
|
||||
assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }),
|
||||
});
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const sourceMessages = [
|
||||
makeUser("first"),
|
||||
makeToolResult("call_1", "result"),
|
||||
makeToolResult("call_2", "result two"),
|
||||
];
|
||||
const transformed = await callTransform(agent, sourceMessages);
|
||||
|
||||
expect(transformed).toBe(compactedView);
|
||||
expect(transformed).not.toBe(sourceMessages);
|
||||
});
|
||||
|
||||
it("returns the source messages when the engine's assembled view has the same length", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const sourceMessages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
const transformed = await callTransform(agent, sourceMessages);
|
||||
|
||||
expect(transformed).toBe(sourceMessages);
|
||||
});
|
||||
|
||||
it("does not mutate the source messages array even when the engine returns a different view", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const compactedView = [makeUser("compacted")];
|
||||
const engine = makeMockEngine({
|
||||
assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }),
|
||||
});
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const sourceMessages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
const sourceCopy = [...sourceMessages];
|
||||
await callTransform(agent, sourceMessages);
|
||||
|
||||
expect(sourceMessages).toEqual(sourceCopy);
|
||||
expect(sourceMessages).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("skips the afterTurn call when the engine does not implement it", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine({ omitAfterTurn: true });
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
installHook(agent, engine);
|
||||
|
||||
const messages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
const transformed = await callTransform(agent, messages);
|
||||
|
||||
expect(transformed).toBe(messages);
|
||||
expect(engine.afterTurn).not.toHaveBeenCalled();
|
||||
expect(engine.assemble).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("calls afterTurn and assemble when new messages are appended after the first call", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installHook(agent, engine);
|
||||
|
||||
const initial = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, initial);
|
||||
|
||||
const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")];
|
||||
await callTransform(agent, withNew);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(1);
|
||||
expect(engine.afterTurn.mock.calls[0]?.[0]).toMatchObject({
|
||||
prePromptMessageCount: 2,
|
||||
messages: withNew,
|
||||
});
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("keeps calling assemble on subsequent iterations when engine lacks afterTurn but new messages arrive", async () => {
|
||||
it("advances the fence across multiple iterations", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installHook(agent, engine);
|
||||
|
||||
const batch0 = [makeUser("h1"), makeToolResult("c1", "r1")];
|
||||
await callTransform(agent, batch0);
|
||||
|
||||
const batch1 = [...batch0, makeUser("h2"), makeToolResult("c2", "r2")];
|
||||
await callTransform(agent, batch1);
|
||||
|
||||
const batch2 = [...batch1, makeUser("h3"), makeToolResult("c3", "r3")];
|
||||
await callTransform(agent, batch2);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(2);
|
||||
expect(engine.afterTurn.mock.calls[0]?.[0]?.prePromptMessageCount).toBe(2);
|
||||
expect(engine.afterTurn.mock.calls[1]?.[0]?.prePromptMessageCount).toBe(4);
|
||||
});
|
||||
|
||||
it("skips afterTurn and assemble when messages have not changed", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installHook(agent, engine);
|
||||
|
||||
const messages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, messages);
|
||||
await callTransform(agent, messages);
|
||||
await callTransform(agent, messages);
|
||||
|
||||
expect(engine.afterTurn).not.toHaveBeenCalled();
|
||||
expect(engine.assemble).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns the assembled view when its length differs from the source", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const compactedView = [makeUser("compacted")];
|
||||
const engine = makeMockEngine({
|
||||
assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }),
|
||||
});
|
||||
installHook(agent, engine);
|
||||
|
||||
const initial = [makeUser("first"), makeToolResult("call_1", "r")];
|
||||
await callTransform(agent, initial);
|
||||
|
||||
const withNew = [...initial, makeToolResult("call_2", "r2")];
|
||||
const transformed = await callTransform(agent, withNew);
|
||||
|
||||
expect(transformed).toBe(compactedView);
|
||||
});
|
||||
|
||||
it("returns the source messages when the assembled view has the same length", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installHook(agent, engine);
|
||||
|
||||
const initial = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, initial);
|
||||
|
||||
const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")];
|
||||
const transformed = await callTransform(agent, withNew);
|
||||
|
||||
expect(transformed).toBe(withNew);
|
||||
});
|
||||
|
||||
it("does not mutate the source messages array", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const compactedView = [makeUser("compacted")];
|
||||
const engine = makeMockEngine({
|
||||
assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }),
|
||||
});
|
||||
installHook(agent, engine);
|
||||
|
||||
const initial = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, initial);
|
||||
|
||||
const sourceMessages = [...initial, makeUser("second"), makeToolResult("call_2", "r2")];
|
||||
const sourceCopy = [...sourceMessages];
|
||||
await callTransform(agent, sourceMessages);
|
||||
|
||||
expect(sourceMessages).toEqual(sourceCopy);
|
||||
});
|
||||
|
||||
it("still calls assemble when engine lacks afterTurn but new messages arrive", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine({ omitAfterTurn: true });
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
installHook(agent, engine);
|
||||
|
||||
const batch1 = [makeUser("first"), makeToolResult("call_1", "r1")];
|
||||
const batch0 = [makeUser("first"), makeToolResult("call_1", "r1")];
|
||||
await callTransform(agent, batch0);
|
||||
|
||||
const batch1 = [...batch0, makeUser("second"), makeToolResult("call_2", "r2")];
|
||||
await callTransform(agent, batch1);
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(1);
|
||||
|
||||
const batch2 = [...batch1, makeUser("second"), makeToolResult("call_2", "r2")];
|
||||
const batch2 = [...batch1, makeUser("third"), makeToolResult("call_3", "r3")];
|
||||
await callTransform(agent, batch2);
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(2);
|
||||
|
||||
const batch3 = [...batch2, makeUser("third"), makeToolResult("call_3", "r3")];
|
||||
await callTransform(agent, batch3);
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(3);
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("falls through to the source messages when engine.afterTurn throws", async () => {
|
||||
it("falls through to source messages when engine.afterTurn throws", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine({
|
||||
afterTurn: async () => {
|
||||
throw new Error("engine afterTurn boom");
|
||||
},
|
||||
});
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
installHook(agent, engine);
|
||||
|
||||
const messages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
const transformed = await callTransform(agent, messages);
|
||||
const initial = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, initial);
|
||||
|
||||
expect(transformed).toBe(messages);
|
||||
const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")];
|
||||
const transformed = await callTransform(agent, withNew);
|
||||
|
||||
expect(transformed).toBe(withNew);
|
||||
});
|
||||
|
||||
it("falls through to the source messages when engine.assemble throws", async () => {
|
||||
it("falls through to source messages when engine.assemble throws", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine({
|
||||
assemble: async () => {
|
||||
throw new Error("engine assemble boom");
|
||||
},
|
||||
});
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
installHook(agent, engine);
|
||||
|
||||
const messages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
const transformed = await callTransform(agent, messages);
|
||||
const initial = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, initial);
|
||||
|
||||
expect(transformed).toBe(messages);
|
||||
const withNew = [...initial, makeUser("second"), makeToolResult("call_2", "r2")];
|
||||
const transformed = await callTransform(agent, withNew);
|
||||
|
||||
expect(transformed).toBe(withNew);
|
||||
});
|
||||
|
||||
it("invokes any pre-existing transformContext before passing the result to the engine", async () => {
|
||||
it("invokes any pre-existing transformContext before the engine sees messages", async () => {
|
||||
const upstream = vi.fn(async (messages: AgentMessage[]) => [...messages, makeUser("appended")]);
|
||||
const agent = makeGuardableAgent(upstream);
|
||||
const compactedView = [makeUser("compacted")];
|
||||
const engine = makeMockEngine({
|
||||
assemble: async (params) => {
|
||||
expect(params.messages).toHaveLength(2);
|
||||
return { messages: compactedView, estimatedTokens: 0 };
|
||||
},
|
||||
});
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }),
|
||||
});
|
||||
installHook(agent, engine);
|
||||
|
||||
const sourceMessages = [makeUser("first")];
|
||||
const transformed = await callTransform(agent, sourceMessages);
|
||||
|
||||
// First call: upstream runs (1 msg -> 2 msgs), fence set to 2, returns early
|
||||
await callTransform(agent, [makeUser("first")]);
|
||||
expect(upstream).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Second call: upstream runs (2 msgs -> 3 msgs), hasNewMessages = true, assemble fires
|
||||
const transformed = await callTransform(agent, [makeUser("first"), makeUser("second")]);
|
||||
expect(upstream).toHaveBeenCalledTimes(2);
|
||||
expect(transformed).toBe(compactedView);
|
||||
});
|
||||
|
||||
@@ -566,178 +500,31 @@ describe("installContextEngineLoopHook", () => {
|
||||
const upstream = vi.fn(async (messages: AgentMessage[]) => messages);
|
||||
const agent = makeGuardableAgent(upstream);
|
||||
const engine = makeMockEngine();
|
||||
const dispose = installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
const dispose = installHook(agent, engine);
|
||||
|
||||
dispose();
|
||||
|
||||
expect(agent.transformContext).toBe(upstream);
|
||||
});
|
||||
|
||||
it("uses getPrePromptMessageCount as the initial fence so pre-attempt history is not reported as new", async () => {
|
||||
it("returns the cached assembled view on unchanged iterations instead of raw source", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
let prePromptMessageCount = 5;
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
getPrePromptMessageCount: () => prePromptMessageCount,
|
||||
const compactedView = [makeUser("compacted")];
|
||||
const engine = makeMockEngine({
|
||||
assemble: async () => ({ messages: compactedView, estimatedTokens: 0 }),
|
||||
});
|
||||
installHook(agent, engine);
|
||||
|
||||
const history = [
|
||||
makeUser("h1"),
|
||||
makeUser("h2"),
|
||||
makeUser("h3"),
|
||||
makeUser("h4"),
|
||||
makeUser("h5"),
|
||||
];
|
||||
const firstCallMessages = [...history, makeUser("user prompt"), makeToolResult("call_1", "r1")];
|
||||
await callTransform(agent, firstCallMessages);
|
||||
const initial = [makeUser("first"), makeToolResult("call_1", "r")];
|
||||
await callTransform(agent, initial);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(1);
|
||||
expect(engine.afterTurn.mock.calls[0]?.[0]).toMatchObject({
|
||||
prePromptMessageCount: 5,
|
||||
messages: firstCallMessages,
|
||||
});
|
||||
});
|
||||
const withNew = [...initial, makeToolResult("call_2", "r2")];
|
||||
const firstResult = await callTransform(agent, withNew);
|
||||
expect(firstResult).toBe(compactedView);
|
||||
|
||||
it("evaluates getPrePromptMessageCount lazily on the first call, not at install time", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
let prePromptMessageCount = 0;
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
getPrePromptMessageCount: () => prePromptMessageCount,
|
||||
});
|
||||
// Caller updates the value between install and first transformContext call
|
||||
// (mirrors the runtime where prePromptMessageCount is reassigned in the
|
||||
// prompt-build phase after heartbeat filtering).
|
||||
prePromptMessageCount = 3;
|
||||
|
||||
const messages = [
|
||||
makeUser("h1"),
|
||||
makeUser("h2"),
|
||||
makeUser("h3"),
|
||||
makeUser("user prompt"),
|
||||
makeToolResult("call_1", "r1"),
|
||||
];
|
||||
await callTransform(agent, messages);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(1);
|
||||
expect(engine.afterTurn.mock.calls[0]?.[0]?.prePromptMessageCount).toBe(3);
|
||||
});
|
||||
|
||||
it("falls back to fence=0 when no getPrePromptMessageCount is supplied", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const messages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, messages);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(1);
|
||||
expect(engine.afterTurn.mock.calls[0]?.[0]?.prePromptMessageCount).toBe(0);
|
||||
});
|
||||
|
||||
it("calls assemble on the first iteration even when no afterTurn ingest happened", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
// Fence equals current message count so afterTurn is skipped on first call
|
||||
getPrePromptMessageCount: () => 2,
|
||||
});
|
||||
|
||||
const messages = [makeUser("h1"), makeUser("h2")];
|
||||
await callTransform(agent, messages);
|
||||
|
||||
expect(engine.afterTurn).not.toHaveBeenCalled();
|
||||
// Retry with same messages: should return cached assembled view, not raw
|
||||
const retryResult = await callTransform(agent, withNew);
|
||||
expect(retryResult).toBe(compactedView);
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("skips assemble on subsequent iterations when no new messages have arrived", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const messages = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, messages);
|
||||
await callTransform(agent, messages);
|
||||
await callTransform(agent, messages);
|
||||
|
||||
// First call: afterTurn ingests delta and assemble runs.
|
||||
// Second + third: nothing new, assemble must NOT be called again.
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(1);
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("calls assemble again after a fresh afterTurn ingest on a later iteration", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
const engine = makeMockEngine();
|
||||
installContextEngineLoopHook({
|
||||
agent,
|
||||
contextEngine: engine,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
tokenBudget,
|
||||
modelId,
|
||||
});
|
||||
|
||||
const firstBatch = [makeUser("first"), makeToolResult("call_1", "result")];
|
||||
await callTransform(agent, firstBatch);
|
||||
await callTransform(agent, firstBatch);
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(1);
|
||||
|
||||
const secondBatch = [
|
||||
makeUser("first"),
|
||||
makeToolResult("call_1", "result"),
|
||||
makeUser("second"),
|
||||
makeToolResult("call_2", "result two"),
|
||||
];
|
||||
await callTransform(agent, secondBatch);
|
||||
|
||||
expect(engine.afterTurn).toHaveBeenCalledTimes(2);
|
||||
expect(engine.assemble).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -197,20 +197,12 @@ export function installContextEngineLoopHook(params: {
|
||||
sessionFile: string;
|
||||
tokenBudget?: number;
|
||||
modelId: string;
|
||||
/**
|
||||
* Returns the message count that should be used as the initial
|
||||
* `prePromptMessageCount` fence on the first `transformContext` call.
|
||||
* Should match the value `finalizeAttemptContextEngineTurn` will eventually
|
||||
* pass to `afterTurn` so the loop hook does not report pre-attempt history
|
||||
* as new on its first invocation.
|
||||
*/
|
||||
getPrePromptMessageCount?: () => number;
|
||||
}): () => void {
|
||||
const { contextEngine, sessionId, sessionKey, sessionFile, tokenBudget, modelId } = params;
|
||||
const mutableAgent = params.agent as GuardableAgentRecord;
|
||||
const originalTransformContext = mutableAgent.transformContext;
|
||||
let lastSeenLength: number | null = null;
|
||||
let hasAssembledBefore = false;
|
||||
let lastAssembledView: AgentMessage[] | null = null;
|
||||
|
||||
mutableAgent.transformContext = (async (messages: AgentMessage[], signal: AbortSignal) => {
|
||||
const transformed = originalTransformContext
|
||||
@@ -219,12 +211,16 @@ export function installContextEngineLoopHook(params: {
|
||||
const sourceMessages = Array.isArray(transformed) ? transformed : messages;
|
||||
|
||||
if (lastSeenLength === null) {
|
||||
lastSeenLength = params.getPrePromptMessageCount?.() ?? 0;
|
||||
lastSeenLength = sourceMessages.length;
|
||||
}
|
||||
|
||||
const hasNewMessages = sourceMessages.length > lastSeenLength;
|
||||
if (!hasNewMessages) {
|
||||
return lastAssembledView ?? sourceMessages;
|
||||
}
|
||||
|
||||
try {
|
||||
if (hasNewMessages && typeof contextEngine.afterTurn === "function") {
|
||||
if (typeof contextEngine.afterTurn === "function") {
|
||||
const prePromptCount = lastSeenLength;
|
||||
await contextEngine.afterTurn({
|
||||
sessionId,
|
||||
@@ -235,29 +231,23 @@ export function installContextEngineLoopHook(params: {
|
||||
tokenBudget,
|
||||
});
|
||||
}
|
||||
if (hasNewMessages) {
|
||||
lastSeenLength = sourceMessages.length;
|
||||
}
|
||||
// Skip assemble when nothing has changed since the last call AND we
|
||||
// already returned an assembled view at least once. The engine's view
|
||||
// cannot have changed without new messages arriving.
|
||||
if (hasNewMessages || !hasAssembledBefore) {
|
||||
const assembled = await contextEngine.assemble({
|
||||
sessionId,
|
||||
sessionKey,
|
||||
messages: sourceMessages,
|
||||
tokenBudget,
|
||||
model: modelId,
|
||||
});
|
||||
hasAssembledBefore = true;
|
||||
if (
|
||||
assembled &&
|
||||
Array.isArray(assembled.messages) &&
|
||||
assembled.messages.length !== sourceMessages.length
|
||||
) {
|
||||
return assembled.messages;
|
||||
}
|
||||
lastSeenLength = sourceMessages.length;
|
||||
const assembled = await contextEngine.assemble({
|
||||
sessionId,
|
||||
sessionKey,
|
||||
messages: sourceMessages,
|
||||
tokenBudget,
|
||||
model: modelId,
|
||||
});
|
||||
if (
|
||||
assembled &&
|
||||
Array.isArray(assembled.messages) &&
|
||||
assembled.messages.length !== sourceMessages.length
|
||||
) {
|
||||
lastAssembledView = assembled.messages;
|
||||
return assembled.messages;
|
||||
}
|
||||
lastAssembledView = null;
|
||||
} catch {
|
||||
// Best-effort: any engine failure falls through to the raw source
|
||||
// messages so the tool loop still makes forward progress.
|
||||
|
||||
Reference in New Issue
Block a user