mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:30:42 +00:00
fix(agents): observe post-compaction guard live
This commit is contained in:
@@ -29,6 +29,12 @@ export type PostCompactionLoopGuard = {
|
||||
snapshot: () => { armed: boolean; remainingAttempts: number };
|
||||
};
|
||||
|
||||
/**
 * Identifies which run/session a post-compaction loop guard belongs to.
 *
 * Used both when registering a guard and when looking one up for a live
 * tool outcome. Every field is optional; values are trimmed before use and
 * blank values are treated as absent. A scope that carries neither
 * `sessionKey` nor `sessionId` produces no registry keys at all.
 */
export type PostCompactionGuardScope = {
|
||||
// Session key; yields one registry key when non-blank.
sessionKey?: string;
|
||||
// Session id; yields a second, independent registry key when non-blank.
sessionId?: string;
|
||||
// Run id; when non-blank it is appended to every key so the entry is run-specific.
runId?: string;
|
||||
};
|
||||
|
||||
type GuardState = {
|
||||
enabled: boolean;
|
||||
windowSize: number;
|
||||
@@ -36,6 +42,8 @@ type GuardState = {
|
||||
history: PostCompactionGuardObservation[];
|
||||
};
|
||||
|
||||
// Module-level registry of currently registered guards, keyed by the strings
// that scopeKeys() derives from a PostCompactionGuardScope. Entries are added
// by registerPostCompactionLoopGuard(), removed by the disposer it returns,
// and read by observePostCompactionLoopGuard().
const activeGuards = new Map<string, PostCompactionLoopGuard>();
|
||||
|
||||
function asPositiveInt(value: number | undefined, fallback: number): number {
|
||||
if (typeof value !== "number" || !Number.isInteger(value) || value <= 0) {
|
||||
return fallback;
|
||||
@@ -105,6 +113,56 @@ export function createPostCompactionLoopGuard(
|
||||
return { armPostCompaction, observe, snapshot };
|
||||
}
|
||||
|
||||
/**
 * Normalizes one optional scope component.
 *
 * @param value - Raw component; may be undefined or padded with whitespace.
 * @returns The trimmed value, or `undefined` when the input is missing or blank.
 */
function normalizeScopePart(value: string | undefined): string | undefined {
  const trimmed = value?.trim();
  return trimmed ? trimmed : undefined;
}

/**
 * Builds the registry keys under which a guard for `scope` is stored and
 * looked up.
 *
 * One key is produced per identifier that is present (`sessionKey` first,
 * then `sessionId`). A non-blank `runId` narrows every key to that run. A
 * scope with neither identifier yields an empty list.
 *
 * Each key is a JSON-encoded tuple rather than a `:`-joined string. The
 * identifiers are free-form and may themselves contain `:`, so plain joining
 * makes `{ sessionKey: "a:run:b" }` and `{ sessionKey: "a", runId: "b" }`
 * collide on the same key. The tuple encoding keeps them distinct.
 *
 * @param scope - Session/run identifiers to derive keys from.
 * @returns Zero, one, or two opaque registry keys.
 */
function scopeKeys(scope: PostCompactionGuardScope): string[] {
  const runId = normalizeScopePart(scope.runId);
  const keys: string[] = [];
  for (const [kind, id] of [
    ["sessionKey", normalizeScopePart(scope.sessionKey)],
    ["sessionId", normalizeScopePart(scope.sessionId)],
  ] as const) {
    if (!id) {
      continue;
    }
    keys.push(JSON.stringify(runId ? [kind, id, runId] : [kind, id]));
  }
  return keys;
}
|
||||
|
||||
/**
 * Publishes `guard` in the module registry so live observers can reach it.
 *
 * The guard is stored under every key derived from `scope`, replacing any
 * guard previously stored under the same key.
 *
 * @param scope - Session/run identifiers the guard should be reachable by.
 * @param guard - Guard instance to register.
 * @returns A disposer that removes only those entries that still point at
 *   this exact `guard`; entries since overwritten by another registration
 *   are left untouched.
 */
export function registerPostCompactionLoopGuard(
  scope: PostCompactionGuardScope,
  guard: PostCompactionLoopGuard,
): () => void {
  const registeredKeys = scopeKeys(scope);
  registeredKeys.forEach((registeredKey) => {
    activeGuards.set(registeredKey, guard);
  });

  const unregister = (): void => {
    registeredKeys
      .filter((registeredKey) => activeGuards.get(registeredKey) === guard)
      .forEach((registeredKey) => {
        activeGuards.delete(registeredKey);
      });
  };
  return unregister;
}
|
||||
|
||||
/**
 * Forwards one tool-call observation to the guard registered for `scope`.
 *
 * Keys derived from `scope` are tried in order and only the first registered
 * guard found is consulted, so a guard registered under several keys sees
 * the observation once.
 *
 * @param scope - Session/run identifiers of the tool call being reported.
 * @param call - The observation to hand to the guard.
 * @returns The guard's verdict, or `undefined` when no guard is registered
 *   for any key of `scope`.
 */
export function observePostCompactionLoopGuard(
  scope: PostCompactionGuardScope,
  call: PostCompactionGuardObservation,
): PostCompactionGuardVerdict | undefined {
  const matched = scopeKeys(scope)
    .map((candidateKey) => activeGuards.get(candidateKey))
    .find((candidate) => Boolean(candidate));
  return matched ? matched.observe(call) : undefined;
}
|
||||
|
||||
export class PostCompactionLoopPersistedError extends Error {
|
||||
readonly detector: "compaction_loop_persisted";
|
||||
readonly count: number;
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { beforeAll, beforeEach, describe, expect, it } from "vitest";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type {
|
||||
diagnosticSessionStates as DiagnosticSessionStatesType,
|
||||
getDiagnosticSessionState as GetDiagnosticSessionStateType,
|
||||
SessionState,
|
||||
} from "../../logging/diagnostic-session-state.js";
|
||||
import type { wrapToolWithBeforeToolCallHook as WrapToolWithBeforeToolCallHookType } from "../pi-tools.before-tool-call.js";
|
||||
import type {
|
||||
recordToolCall as RecordToolCallType,
|
||||
recordToolCallOutcome as RecordToolCallOutcomeType,
|
||||
@@ -35,6 +36,7 @@ let diagnosticSessionStates: typeof DiagnosticSessionStatesType;
|
||||
let getDiagnosticSessionState: typeof GetDiagnosticSessionStateType;
|
||||
let recordToolCall: typeof RecordToolCallType;
|
||||
let recordToolCallOutcome: typeof RecordToolCallOutcomeType;
|
||||
let wrapToolWithBeforeToolCallHook: typeof WrapToolWithBeforeToolCallHookType;
|
||||
let PostCompactionLoopPersistedError: typeof PostCompactionLoopPersistedErrorType;
|
||||
|
||||
// Mirror the production trim cap (resolveLoopDetectionConfig default
|
||||
@@ -49,7 +51,7 @@ function recordToolOutcome(
|
||||
result: unknown,
|
||||
runId?: string,
|
||||
): void {
|
||||
const toolCallId = `${toolName}-${state.toolOutcomeSeq ?? 0}`;
|
||||
const toolCallId = `${toolName}-${state.toolCallHistory?.length ?? 0}`;
|
||||
const scope = runId ? { runId } : undefined;
|
||||
recordToolCall(state, toolName, toolParams, toolCallId, undefined, scope);
|
||||
const outcome: Parameters<typeof recordToolCallOutcome>[1] = {
|
||||
@@ -64,6 +66,30 @@ function recordToolOutcome(
|
||||
recordToolCallOutcome(state, outcome);
|
||||
}
|
||||
|
||||
let liveToolCallSeq = 0;
|
||||
|
||||
/**
 * Test helper: runs one tool call through `wrapToolWithBeforeToolCallHook`.
 *
 * A stub tool whose `execute` resolves to `result` is wrapped with a context
 * built from `baseParams` (agent id "main"), then executed with a fresh call
 * id of the form `<toolName>-<n>`, where `n` is the incremented module-level
 * `liveToolCallSeq` counter.
 *
 * @param toolName - Name given to the stub tool and used as the call-id prefix.
 * @param toolParams - Arguments passed to the wrapped tool's `execute`.
 * @param result - Value the stub tool resolves with.
 * @param runId - Run id placed in the hook context; defaults to `baseParams.runId`.
 * @returns Whatever the wrapped tool's `execute` returns.
 */
async function executeWrappedToolOutcome(
  toolName: string,
  toolParams: unknown,
  result: unknown,
  runId = baseParams.runId,
): Promise<unknown> {
  const stubTool = {
    name: toolName,
    execute: vi.fn(async () => result),
  } as never;
  const wrappedTool = wrapToolWithBeforeToolCallHook(stubTool, {
    agentId: "main",
    sessionKey: baseParams.sessionKey,
    sessionId: baseParams.sessionId,
    runId,
  });

  liveToolCallSeq += 1;
  const toolCallId = `${toolName}-${liveToolCallSeq}`;
  return wrappedTool.execute(toolCallId, toolParams, undefined, undefined);
}
|
||||
|
||||
describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
beforeAll(async () => {
|
||||
({ runEmbeddedPiAgent } = await loadRunOverflowCompactionHarness());
|
||||
@@ -72,10 +98,12 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
({ diagnosticSessionStates, getDiagnosticSessionState } =
|
||||
await import("../../logging/diagnostic-session-state.js"));
|
||||
({ recordToolCall, recordToolCallOutcome } = await import("../tool-loop-detection.js"));
|
||||
({ wrapToolWithBeforeToolCallHook } = await import("../pi-tools.before-tool-call.js"));
|
||||
({ PostCompactionLoopPersistedError } = await import("./post-compaction-loop-guard.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
liveToolCallSeq = 0;
|
||||
diagnosticSessionStates.clear();
|
||||
mockedRunEmbeddedAttempt.mockReset();
|
||||
mockedCompactDirect.mockReset();
|
||||
@@ -122,29 +150,24 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
|
||||
it("aborts the run with PostCompactionLoopPersistedError when identical (tool, args, result) repeats windowSize times after compaction", async () => {
|
||||
const overflowError = makeOverflowError();
|
||||
const sessionState = getDiagnosticSessionState({
|
||||
sessionKey: baseParams.sessionKey,
|
||||
sessionId: baseParams.sessionId,
|
||||
});
|
||||
let attemptReturned = false;
|
||||
|
||||
// Attempt 1: overflow → triggers compaction.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () =>
|
||||
makeAttemptResult({ promptError: overflowError }),
|
||||
);
|
||||
// Attempt 2: post-compaction. The wrapped tool layer would have
|
||||
// recorded `windowSize` identical (tool, args, result) outcomes during
|
||||
// this single attempt. The runner's after-attempt guard observation
|
||||
// sees all three at once, accumulates matches, and aborts on the third.
|
||||
// Attempt 2: post-compaction. The live wrapped-tool path records each
|
||||
// outcome while the prompt is still running; the third identical result
|
||||
// aborts before the attempt can return.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
|
||||
for (let i = 0; i < 3; i += 1) {
|
||||
recordToolOutcome(
|
||||
sessionState,
|
||||
await executeWrappedToolOutcome(
|
||||
"gateway",
|
||||
{ action: "lookup", path: "x" },
|
||||
"identical-result",
|
||||
baseParams.runId,
|
||||
);
|
||||
}
|
||||
attemptReturned = true;
|
||||
return makeAttemptResult({
|
||||
promptError: null,
|
||||
toolMetas: [{ toolName: "gateway" }, { toolName: "gateway" }, { toolName: "gateway" }],
|
||||
@@ -165,35 +188,25 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
|
||||
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
|
||||
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
|
||||
expect(attemptReturned).toBe(false);
|
||||
});
|
||||
|
||||
it("does not abort when the result hash changes across post-compaction attempts (progress was made)", async () => {
|
||||
const overflowError = makeOverflowError();
|
||||
const sessionState = getDiagnosticSessionState({
|
||||
sessionKey: baseParams.sessionKey,
|
||||
sessionId: baseParams.sessionId,
|
||||
});
|
||||
|
||||
// Attempt 1: overflow → triggers compaction.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () =>
|
||||
makeAttemptResult({ promptError: overflowError }),
|
||||
);
|
||||
// Attempt 2 (post-compaction): identical args, but DIFFERENT result hash
|
||||
// each time. Only one further attempt is needed since the runner exits
|
||||
// on a successful prompt with no further retry trigger.
|
||||
let callCounter = 0;
|
||||
// each time. This fills the window without triggering the persisted-loop
|
||||
// abort because the tool is making progress.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
|
||||
callCounter += 1;
|
||||
recordToolOutcome(
|
||||
sessionState,
|
||||
"gateway",
|
||||
{ action: "lookup", path: "x" },
|
||||
`result-${callCounter}`,
|
||||
baseParams.runId,
|
||||
);
|
||||
for (let i = 0; i < 3; i += 1) {
|
||||
await executeWrappedToolOutcome("gateway", { action: "lookup", path: "x" }, `result-${i}`);
|
||||
}
|
||||
return makeAttemptResult({
|
||||
promptError: null,
|
||||
toolMetas: [{ toolName: "gateway" }],
|
||||
toolMetas: [{ toolName: "gateway" }, { toolName: "gateway" }, { toolName: "gateway" }],
|
||||
});
|
||||
});
|
||||
|
||||
@@ -214,10 +227,6 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
it("disarms after windowSize observations regardless of match, so later identical calls do not abort", async () => {
|
||||
// Use windowSize: 2 so the guard disarms after 2 observations.
|
||||
const overflowError = makeOverflowError();
|
||||
const sessionState = getDiagnosticSessionState({
|
||||
sessionKey: baseParams.sessionKey,
|
||||
sessionId: baseParams.sessionId,
|
||||
});
|
||||
|
||||
// Attempt 1: overflow → triggers compaction.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () =>
|
||||
@@ -227,8 +236,8 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
// guard disarms with no abort. We then append more identical records
|
||||
// afterwards in this test to confirm they are not observed by the guard.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
|
||||
recordToolOutcome(sessionState, "read", { path: "/a" }, "ra", baseParams.runId);
|
||||
recordToolOutcome(sessionState, "write", { path: "/b" }, "rb", baseParams.runId);
|
||||
await executeWrappedToolOutcome("read", { path: "/a" }, "ra");
|
||||
await executeWrappedToolOutcome("write", { path: "/b" }, "rb");
|
||||
return makeAttemptResult({
|
||||
promptError: null,
|
||||
toolMetas: [{ toolName: "read" }, { toolName: "write" }],
|
||||
@@ -259,12 +268,10 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("aborts post-compaction loop even when toolCallHistory is at its trim cap (regression: index-cursor blind spot in long-running sessions)", async () => {
|
||||
it("aborts post-compaction loop from the live tool path even when toolCallHistory is at its trim cap", async () => {
|
||||
// Long-running sessions accumulate up to historySize (default 30) records
|
||||
// in toolCallHistory. Pushing more entries triggers trim, which would
|
||||
// shift records out from under an absolute index cursor and let the
|
||||
// guard silently miss every loop. The seq-based observation must still
|
||||
// see the new records via the tail-slice path.
|
||||
// in toolCallHistory. The live observer must still see the new outcome
|
||||
// before trimming can make any after-attempt cursor ambiguous.
|
||||
const overflowError = makeOverflowError();
|
||||
const sessionState = getDiagnosticSessionState({
|
||||
sessionKey: baseParams.sessionKey,
|
||||
@@ -283,20 +290,15 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () =>
|
||||
makeAttemptResult({ promptError: overflowError }),
|
||||
);
|
||||
// Attempt 2 (post-compaction): three identical records appended while
|
||||
// history is already at the cap. These pushes trigger trim, shifting
|
||||
// older entries out. With the old index-cursor scheme, length never
|
||||
// grew so the observation loop never ran. With the seq-based scheme,
|
||||
// the tail of length-30 history contains the three new records and
|
||||
// the guard aborts on the third match.
|
||||
// Attempt 2 (post-compaction): three identical live tool outcomes while
|
||||
// history is already at the cap. The guard aborts on the third result
|
||||
// before the mocked attempt can return.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
|
||||
for (let i = 0; i < 3; i += 1) {
|
||||
recordToolOutcome(
|
||||
sessionState,
|
||||
await executeWrappedToolOutcome(
|
||||
"gateway",
|
||||
{ action: "lookup", path: "x" },
|
||||
"identical-result",
|
||||
baseParams.runId,
|
||||
);
|
||||
}
|
||||
// History is still capped at HISTORY_TRIM_CAP after the trim.
|
||||
|
||||
@@ -9,7 +9,6 @@ import { emitAgentPlanEvent } from "../../infra/agent-events.js";
|
||||
import { sleepWithAbort } from "../../infra/backoff.js";
|
||||
import { freezeDiagnosticTraceContext } from "../../infra/diagnostic-trace-context.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { getDiagnosticSessionState } from "../../logging/diagnostic-session-state.js";
|
||||
import { buildAgentHookContextChannelFields } from "../../plugins/hook-agent-context.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { resolveProviderAuthProfileId } from "../../plugins/provider-runtime.js";
|
||||
@@ -95,7 +94,7 @@ import { log } from "./logger.js";
|
||||
import { resolveModelAsync } from "./model.js";
|
||||
import {
|
||||
createPostCompactionLoopGuard,
|
||||
PostCompactionLoopPersistedError,
|
||||
registerPostCompactionLoopGuard,
|
||||
} from "./post-compaction-loop-guard.js";
|
||||
import { createEmbeddedRunReplayState, observeReplayMetadata } from "./replay-state.js";
|
||||
import { handleAssistantFailover } from "./run/assistant-failover.js";
|
||||
@@ -788,28 +787,19 @@ export async function runEmbeddedPiAgent(
|
||||
// feeds it the outcome of each attempt.
|
||||
const idleTimeoutBreakerState = createIdleTimeoutBreakerState();
|
||||
// Post-compaction loop guard for #77474. Armed at each compaction-success
|
||||
// site below; observes tool-call outcomes from the diagnostic session
|
||||
// state's toolCallHistory after each attempt. Aborts the run when the
|
||||
// same (tool, args, result) triple repeats windowSize times within the
|
||||
// post-compaction window.
|
||||
// site below; observed from the live tool-outcome path so it can abort
|
||||
// while the post-compaction prompt is still running.
|
||||
const postCompactionGuard = createPostCompactionLoopGuard(
|
||||
params.config?.tools?.loopDetection?.postCompactionGuard,
|
||||
);
|
||||
// Monotonic outcome seq (incremented by recordToolCallOutcome on each
|
||||
// observable push). We use a delta on this counter instead of an
|
||||
// absolute index into state.toolCallHistory, which is trimmed at
|
||||
// historySize and would silently shift records out from under an
|
||||
// index cursor in long-running sessions.
|
||||
let lastObservedToolOutcomeSeq = (() => {
|
||||
if (!params.sessionKey && !params.sessionId) {
|
||||
return 0;
|
||||
}
|
||||
const state = getDiagnosticSessionState({
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
});
|
||||
return state.toolOutcomeSeq ?? 0;
|
||||
})();
|
||||
const unregisterPostCompactionGuard = registerPostCompactionLoopGuard(
|
||||
{
|
||||
sessionKey: params.sessionKey,
|
||||
sessionId: params.sessionId,
|
||||
runId: params.runId,
|
||||
},
|
||||
postCompactionGuard,
|
||||
);
|
||||
let lastRetryFailoverReason: FailoverReason | null = null;
|
||||
let planningOnlyRetryInstruction: string | null = null;
|
||||
let reasoningOnlyRetryInstruction: string | null = null;
|
||||
@@ -1221,68 +1211,6 @@ export async function runEmbeddedPiAgent(
|
||||
});
|
||||
const attempt = normalizeEmbeddedRunAttemptResult(rawAttempt);
|
||||
|
||||
// Post-compaction loop guard observation. Reads any new tool-call
|
||||
// records that completed during this attempt (populated by the
|
||||
// before-tool-call hook's recordToolCallOutcome) and feeds them
|
||||
// into the guard. Disarms automatically once the window expires.
|
||||
//
|
||||
// Cursor scheme: rather than index into state.toolCallHistory
|
||||
// (which trims at historySize and silently drops records on busy
|
||||
// sessions), we read state.toolOutcomeSeq, a monotonic counter
|
||||
// that recordToolCallOutcome increments on every observable push.
|
||||
// The delta currentSeq - lastObservedSeq tells us how many new
|
||||
// records have appeared globally; we then scan that many entries
|
||||
// from the tail of toolCallHistory. The tail-slice is trim-safe:
|
||||
// even if the buffer was full, the most recent N records are the
|
||||
// ones that survive.
|
||||
if (postCompactionGuard.snapshot().armed) {
|
||||
const guardSessionState =
|
||||
params.sessionKey || params.sessionId
|
||||
? getDiagnosticSessionState({
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
})
|
||||
: undefined;
|
||||
const history = guardSessionState?.toolCallHistory ?? [];
|
||||
const currentSeq = guardSessionState?.toolOutcomeSeq ?? 0;
|
||||
const newRecordCount = Math.max(0, currentSeq - lastObservedToolOutcomeSeq);
|
||||
if (newRecordCount > 0) {
|
||||
const startIndex = Math.max(0, history.length - newRecordCount);
|
||||
for (let i = startIndex; i < history.length; i += 1) {
|
||||
const record = history[i];
|
||||
if (!record || typeof record.resultHash !== "string") {
|
||||
continue;
|
||||
}
|
||||
if (params.runId && record.runId && record.runId !== params.runId) {
|
||||
continue;
|
||||
}
|
||||
const verdict = postCompactionGuard.observe({
|
||||
toolName: record.toolName,
|
||||
argsHash: record.argsHash,
|
||||
resultHash: record.resultHash,
|
||||
});
|
||||
if (verdict.shouldAbort) {
|
||||
throw PostCompactionLoopPersistedError.fromVerdict(verdict);
|
||||
}
|
||||
if (!postCompactionGuard.snapshot().armed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
lastObservedToolOutcomeSeq = currentSeq;
|
||||
} else {
|
||||
// Keep cursor aligned with the current global outcome seq so a
|
||||
// freshly-armed window only sees records pushed AFTER arming.
|
||||
const guardSessionState =
|
||||
params.sessionKey || params.sessionId
|
||||
? getDiagnosticSessionState({
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
})
|
||||
: undefined;
|
||||
lastObservedToolOutcomeSeq = guardSessionState?.toolOutcomeSeq ?? 0;
|
||||
}
|
||||
|
||||
const {
|
||||
aborted,
|
||||
externalAbort,
|
||||
@@ -2858,6 +2786,7 @@ export async function runEmbeddedPiAgent(
|
||||
};
|
||||
}
|
||||
} finally {
|
||||
unregisterPostCompactionGuard();
|
||||
forgetPromptBuildDrainCacheForRun(params.runId);
|
||||
stopRuntimeAuthRefreshTimer();
|
||||
await runAgentCleanupStep({
|
||||
|
||||
@@ -26,6 +26,10 @@ import {
|
||||
import { createLazyRuntimeSurface } from "../shared/lazy-runtime.js";
|
||||
import { isPlainObject } from "../utils.js";
|
||||
import { copyChannelAgentToolMeta } from "./channel-tools.js";
|
||||
import {
|
||||
observePostCompactionLoopGuard,
|
||||
PostCompactionLoopPersistedError,
|
||||
} from "./pi-embedded-runner/post-compaction-loop-guard.js";
|
||||
import { normalizeToolName } from "./tool-policy.js";
|
||||
import type { AnyAgentTool } from "./tools/common.js";
|
||||
import { callGatewayTool } from "./tools/gateway.js";
|
||||
@@ -379,9 +383,9 @@ async function recordLoopOutcome(args: {
|
||||
const { getDiagnosticSessionState, recordToolCallOutcome } = await loadBeforeToolCallRuntime();
|
||||
const sessionState = getDiagnosticSessionState({
|
||||
sessionKey: args.ctx.sessionKey,
|
||||
sessionId: args.ctx?.agentId,
|
||||
sessionId: args.ctx.sessionId,
|
||||
});
|
||||
recordToolCallOutcome(sessionState, {
|
||||
const record = recordToolCallOutcome(sessionState, {
|
||||
toolName: args.toolName,
|
||||
toolParams: args.toolParams,
|
||||
toolCallId: args.toolCallId,
|
||||
@@ -390,7 +394,27 @@ async function recordLoopOutcome(args: {
|
||||
config: args.ctx.loopDetection,
|
||||
...(args.ctx.runId && { runId: args.ctx.runId }),
|
||||
});
|
||||
if (record?.resultHash) {
|
||||
const verdict = observePostCompactionLoopGuard(
|
||||
{
|
||||
sessionKey: args.ctx.sessionKey,
|
||||
sessionId: args.ctx.sessionId,
|
||||
runId: args.ctx.runId,
|
||||
},
|
||||
{
|
||||
toolName: record.toolName,
|
||||
argsHash: record.argsHash,
|
||||
resultHash: record.resultHash,
|
||||
},
|
||||
);
|
||||
if (verdict?.shouldAbort) {
|
||||
throw PostCompactionLoopPersistedError.fromVerdict(verdict);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof PostCompactionLoopPersistedError) {
|
||||
throw err;
|
||||
}
|
||||
log.warn(`tool loop outcome tracking failed: tool=${args.toolName} error=${String(err)}`);
|
||||
}
|
||||
}
|
||||
@@ -411,7 +435,7 @@ export async function runBeforeToolCallHook(args: {
|
||||
await loadBeforeToolCallRuntime();
|
||||
const sessionState = getDiagnosticSessionState({
|
||||
sessionKey: args.ctx.sessionKey,
|
||||
sessionId: args.ctx?.agentId,
|
||||
sessionId: args.ctx.sessionId,
|
||||
});
|
||||
|
||||
const loopScope = args.ctx.runId ? { runId: args.ctx.runId } : undefined;
|
||||
@@ -428,7 +452,7 @@ export async function runBeforeToolCallHook(args: {
|
||||
log.error(`Blocking ${toolName} due to critical loop: ${loopResult.message}`);
|
||||
logToolLoopAction({
|
||||
sessionKey: args.ctx.sessionKey,
|
||||
sessionId: args.ctx?.agentId,
|
||||
sessionId: args.ctx.sessionId,
|
||||
toolName,
|
||||
level: "critical",
|
||||
action: "block",
|
||||
@@ -451,7 +475,7 @@ export async function runBeforeToolCallHook(args: {
|
||||
log.warn(`Loop warning for ${toolName}: ${loopResult.message}`);
|
||||
logToolLoopAction({
|
||||
sessionKey: args.ctx.sessionKey,
|
||||
sessionId: args.ctx?.agentId,
|
||||
sessionId: args.ctx.sessionId,
|
||||
toolName,
|
||||
level: "warning",
|
||||
action: "warn",
|
||||
|
||||
@@ -811,41 +811,42 @@ describe("tool-loop-detection", () => {
|
||||
expect(entry?.resultHash?.length).toBe(64);
|
||||
});
|
||||
|
||||
it("increments the outcome sequence when a pre-recorded tool call receives its result", () => {
|
||||
it("returns the recorded call when a pre-recorded tool call receives its result", () => {
|
||||
const state = createState();
|
||||
const params = { action: "lookup", path: "cron.maxConcurrentRuns" };
|
||||
|
||||
recordToolCall(state, "gateway", params, "call-1");
|
||||
expect(state.toolOutcomeSeq).toBeUndefined();
|
||||
|
||||
recordToolCallOutcome(state, {
|
||||
const recorded = recordToolCallOutcome(state, {
|
||||
toolName: "gateway",
|
||||
toolParams: params,
|
||||
toolCallId: "call-1",
|
||||
result: { content: [{ type: "text", text: "same schema" }] },
|
||||
});
|
||||
|
||||
expect(state.toolOutcomeSeq).toBe(1);
|
||||
expect(recorded?.toolCallId).toBe("call-1");
|
||||
expect(state.toolCallHistory).toHaveLength(1);
|
||||
expect(state.toolCallHistory?.[0]?.resultHash).toBeTypeOf("string");
|
||||
});
|
||||
|
||||
it("keeps outcome sequence monotonic while trimming production call/outcome records", () => {
|
||||
it("returns the recorded call while trimming production call/outcome records", () => {
|
||||
const state = createState();
|
||||
let lastRecordedToolCallId: string | undefined;
|
||||
|
||||
for (let i = 0; i < TOOL_CALL_HISTORY_SIZE + 3; i += 1) {
|
||||
const params = { action: "lookup", path: `config.${i}` };
|
||||
const toolCallId = `call-${i}`;
|
||||
recordToolCall(state, "gateway", params, toolCallId);
|
||||
recordToolCallOutcome(state, {
|
||||
const recorded = recordToolCallOutcome(state, {
|
||||
toolName: "gateway",
|
||||
toolParams: params,
|
||||
toolCallId,
|
||||
result: { content: [{ type: "text", text: `schema-${i}` }] },
|
||||
});
|
||||
lastRecordedToolCallId = recorded?.toolCallId;
|
||||
}
|
||||
|
||||
expect(state.toolOutcomeSeq).toBe(TOOL_CALL_HISTORY_SIZE + 3);
|
||||
expect(lastRecordedToolCallId).toBe(`call-${TOOL_CALL_HISTORY_SIZE + 2}`);
|
||||
expect(state.toolCallHistory).toHaveLength(TOOL_CALL_HISTORY_SIZE);
|
||||
expect(state.toolCallHistory?.[0]?.toolCallId).toBe("call-3");
|
||||
});
|
||||
|
||||
@@ -678,13 +678,13 @@ export function recordToolCallOutcome(
|
||||
config?: ToolLoopDetectionConfig;
|
||||
runId?: string;
|
||||
},
|
||||
): void {
|
||||
): ToolCallRecord | undefined {
|
||||
const resolvedConfig = resolveLoopDetectionConfig(params.config);
|
||||
const runId = normalizeRunId(params.runId);
|
||||
const outcome = hashToolOutcome(params.toolName, params.toolParams, params.result, params.error);
|
||||
const resultHash = outcome.resultHash;
|
||||
if (!resultHash) {
|
||||
return;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (!state.toolCallHistory) {
|
||||
@@ -693,7 +693,7 @@ export function recordToolCallOutcome(
|
||||
|
||||
const argsHash = hashToolCall(params.toolName, params.toolParams);
|
||||
let matched = false;
|
||||
let recordedOutcome = false;
|
||||
let recordedOutcome: ToolCallRecord | undefined;
|
||||
for (let i = state.toolCallHistory.length - 1; i >= 0; i -= 1) {
|
||||
const call = state.toolCallHistory[i];
|
||||
if (!call) {
|
||||
@@ -714,12 +714,12 @@ export function recordToolCallOutcome(
|
||||
call.resultHash = resultHash;
|
||||
call.unknownToolName = outcome.unknownToolName;
|
||||
matched = true;
|
||||
recordedOutcome = true;
|
||||
recordedOutcome = call;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!matched) {
|
||||
state.toolCallHistory.push({
|
||||
const record: ToolCallRecord = {
|
||||
toolName: params.toolName,
|
||||
argsHash,
|
||||
toolCallId: params.toolCallId,
|
||||
@@ -727,17 +727,15 @@ export function recordToolCallOutcome(
|
||||
resultHash,
|
||||
unknownToolName: outcome.unknownToolName,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
recordedOutcome = true;
|
||||
}
|
||||
|
||||
if (recordedOutcome) {
|
||||
state.toolOutcomeSeq = (state.toolOutcomeSeq ?? 0) + 1;
|
||||
};
|
||||
state.toolCallHistory.push(record);
|
||||
recordedOutcome = record;
|
||||
}
|
||||
|
||||
if (state.toolCallHistory.length > resolvedConfig.historySize) {
|
||||
state.toolCallHistory.splice(0, state.toolCallHistory.length - resolvedConfig.historySize);
|
||||
}
|
||||
return recordedOutcome;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -9,14 +9,6 @@ export type SessionState = {
|
||||
state: SessionStateValue;
|
||||
queueDepth: number;
|
||||
toolCallHistory?: ToolCallRecord[];
|
||||
/**
|
||||
* Monotonic counter of observable tool outcomes recorded in
|
||||
* toolCallHistory.
|
||||
* Never decremented by trims. Lets observers detect new records via a
|
||||
* delta on the seq instead of an absolute index that breaks once the
|
||||
* history is trimmed at historySize.
|
||||
*/
|
||||
toolOutcomeSeq?: number;
|
||||
toolLoopWarningBuckets?: Map<string, number>;
|
||||
commandPollCounts?: Map<string, { count: number; lastPollAt: number }>;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user