mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:30:42 +00:00
fix(agents): make post-compaction guard config valid + observation trim-resilient
Two correctness fixes from code review. 1. Zod schema (src/config/zod-schema.agent-runtime.ts) was strict and rejected tools.loopDetection.postCompactionGuard.* keys at validation time, making the guard's documented configurability inaccessible at gateway startup. Adds ToolLoopPostCompactionGuardSchema with both optional fields and wires it into ToolLoopDetectionSchema. 2. The runner observation cursor in pi-embedded-runner/run.ts used absolute indices into state.toolCallHistory, but that array is trimmed at historySize (default 30). Once the buffer was full, new records shifted out from under the cursor and the guard silently missed every loop in long-running sessions. Replaces the index cursor with a monotonic toolOutcomeSeq on SessionState that recordToolCallOutcome bumps on each observable push (unmatched branch only, mirroring the prior cursor's effective semantics). The runner now reads the most recent (currentSeq - lastSeq) entries from the tail of toolCallHistory, which is trim-resilient. Adds zod parse tests for the new config keys (valid, empty, unknown key, non-positive, non-integer) and a runner regression test that seeds toolCallHistory at the trim cap before triggering a post-compaction loop, asserting the abort still fires. Refs #77474
This commit is contained in:
committed by
Peter Steinberger
parent
2a702f927f
commit
4c4825679b
@@ -1,4 +1,4 @@
|
||||
7526a2ec39c5bbe905f8347f900d812f28dd65caaf3f0172d1e9692fb186dcd3 config-baseline.json
|
||||
baa4a12a48a03755cab302629a1f6b65b2e826c091a987d68de5989d39e8ed35 config-baseline.core.json
|
||||
a963f7242f75fbc137a7891b8056c199b42eeb8f4887aeaaa079bff6394feb89 config-baseline.json
|
||||
51f525cfffa40659d6273e56705e91f3cb1255b217dadab59c32f77e0f770b37 config-baseline.core.json
|
||||
cd7c0c7fb1435bc7e59099e9ac334462d5ad444016e9ab4512aae63a238f78dc config-baseline.channel.json
|
||||
9832b30a696930a3da7efccf38073137571e1b66cae84e54d747b733fdafcc54 config-baseline.plugin.json
|
||||
|
||||
@@ -33,6 +33,11 @@ let getDiagnosticSessionState: typeof GetDiagnosticSessionStateType;
|
||||
let hashToolCall: typeof HashToolCallType;
|
||||
let PostCompactionLoopPersistedError: typeof PostCompactionLoopPersistedErrorType;
|
||||
|
||||
// Mirror the production trim cap (resolveLoopDetectionConfig default
|
||||
// historySize = 30). The trim is what makes the seq-based observation
|
||||
// non-trivially better than an absolute index cursor.
|
||||
const HISTORY_TRIM_CAP = 30;
|
||||
|
||||
function recordToolOutcome(
|
||||
state: SessionState,
|
||||
toolName: string,
|
||||
@@ -50,6 +55,13 @@ function recordToolOutcome(
|
||||
timestamp: Date.now(),
|
||||
...(runId ? { runId } : {}),
|
||||
});
|
||||
if (state.toolCallHistory.length > HISTORY_TRIM_CAP) {
|
||||
state.toolCallHistory.splice(0, state.toolCallHistory.length - HISTORY_TRIM_CAP);
|
||||
}
|
||||
// Mirror recordToolCallOutcome's unmatched-push branch: bump the monotonic
|
||||
// outcome seq the runner uses to detect new records without an absolute
|
||||
// index into the (trim-prone) toolCallHistory array.
|
||||
state.toolOutcomeSeq = (state.toolOutcomeSeq ?? 0) + 1;
|
||||
}
|
||||
|
||||
describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
@@ -246,4 +258,68 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
|
||||
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
|
||||
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("aborts post-compaction loop even when toolCallHistory is at its trim cap (regression: index-cursor blind spot in long-running sessions)", async () => {
|
||||
// Long-running sessions accumulate up to historySize (default 30) records
|
||||
// in toolCallHistory. Pushing more entries triggers trim, which would
|
||||
// shift records out from under an absolute index cursor and let the
|
||||
// guard silently miss every loop. The seq-based observation must still
|
||||
// see the new records via the tail-slice path.
|
||||
const overflowError = makeOverflowError();
|
||||
const sessionState = getDiagnosticSessionState({
|
||||
sessionKey: baseParams.sessionKey,
|
||||
sessionId: baseParams.sessionId,
|
||||
});
|
||||
|
||||
// Pre-fill history to the default trim cap with distinct entries that
|
||||
// pre-date the run. This puts the guard's cursor right at the trim
|
||||
// boundary before the post-compaction window opens.
|
||||
for (let i = 0; i < HISTORY_TRIM_CAP; i += 1) {
|
||||
recordToolOutcome(sessionState, "seed", { iter: i }, `seed-result-${i}`, baseParams.runId);
|
||||
}
|
||||
expect(sessionState.toolCallHistory?.length).toBe(HISTORY_TRIM_CAP);
|
||||
|
||||
// Attempt 1: overflow -> triggers compaction.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () =>
|
||||
makeAttemptResult({ promptError: overflowError }),
|
||||
);
|
||||
// Attempt 2 (post-compaction): three identical records appended while
|
||||
// history is already at the cap. These pushes trigger trim, shifting
|
||||
// older entries out. With the old index-cursor scheme, length never
|
||||
// grew so the observation loop never ran. With the seq-based scheme,
|
||||
// the tail of length-30 history contains the three new records and
|
||||
// the guard aborts on the third match.
|
||||
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
|
||||
for (let i = 0; i < 3; i += 1) {
|
||||
recordToolOutcome(
|
||||
sessionState,
|
||||
"gateway",
|
||||
{ action: "lookup", path: "x" },
|
||||
"identical-result",
|
||||
baseParams.runId,
|
||||
);
|
||||
}
|
||||
// History is still capped at HISTORY_TRIM_CAP after the trim.
|
||||
expect(sessionState.toolCallHistory?.length).toBe(HISTORY_TRIM_CAP);
|
||||
return makeAttemptResult({
|
||||
promptError: null,
|
||||
toolMetas: [{ toolName: "gateway" }, { toolName: "gateway" }, { toolName: "gateway" }],
|
||||
});
|
||||
});
|
||||
|
||||
mockedCompactDirect.mockResolvedValueOnce(
|
||||
makeCompactionSuccess({
|
||||
summary: "Compacted session",
|
||||
firstKeptEntryId: "entry-5",
|
||||
tokensBefore: 150000,
|
||||
}),
|
||||
);
|
||||
|
||||
await expect(runEmbeddedPiAgent(baseParams)).rejects.toBeInstanceOf(
|
||||
PostCompactionLoopPersistedError,
|
||||
);
|
||||
|
||||
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
|
||||
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -795,7 +795,12 @@ export async function runEmbeddedPiAgent(
|
||||
const postCompactionGuard = createPostCompactionLoopGuard(
|
||||
params.config?.tools?.loopDetection?.postCompactionGuard,
|
||||
);
|
||||
let lastObservedToolCallHistoryIndex = (() => {
|
||||
// Monotonic outcome seq (incremented by recordToolCallOutcome on each
|
||||
// observable push). We use a delta on this counter instead of an
|
||||
// absolute index into state.toolCallHistory, which is trimmed at
|
||||
// historySize and would silently shift records out from under an
|
||||
// index cursor in long-running sessions.
|
||||
let lastObservedToolOutcomeSeq = (() => {
|
||||
if (!params.sessionKey && !params.sessionId) {
|
||||
return 0;
|
||||
}
|
||||
@@ -803,7 +808,7 @@ export async function runEmbeddedPiAgent(
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
});
|
||||
return state.toolCallHistory?.length ?? 0;
|
||||
return state.toolOutcomeSeq ?? 0;
|
||||
})();
|
||||
let lastRetryFailoverReason: FailoverReason | null = null;
|
||||
let planningOnlyRetryInstruction: string | null = null;
|
||||
@@ -1220,6 +1225,16 @@ export async function runEmbeddedPiAgent(
|
||||
// records that completed during this attempt (populated by the
|
||||
// before-tool-call hook's recordToolCallOutcome) and feeds them
|
||||
// into the guard. Disarms automatically once the window expires.
|
||||
//
|
||||
// Cursor scheme: rather than index into state.toolCallHistory
|
||||
// (which trims at historySize and silently drops records on busy
|
||||
// sessions), we read state.toolOutcomeSeq, a monotonic counter
|
||||
// that recordToolCallOutcome increments on every observable push.
|
||||
// The delta currentSeq - lastObservedSeq tells us how many new
|
||||
// records have appeared globally; we then scan that many entries
|
||||
// from the tail of toolCallHistory. The tail-slice is trim-safe:
|
||||
// even if the buffer was full, the most recent N records are the
|
||||
// ones that survive.
|
||||
if (postCompactionGuard.snapshot().armed) {
|
||||
const guardSessionState =
|
||||
params.sessionKey || params.sessionId
|
||||
@@ -1229,30 +1244,35 @@ export async function runEmbeddedPiAgent(
|
||||
})
|
||||
: undefined;
|
||||
const history = guardSessionState?.toolCallHistory ?? [];
|
||||
for (let i = lastObservedToolCallHistoryIndex; i < history.length; i += 1) {
|
||||
const record = history[i];
|
||||
if (!record || !record.resultHash) {
|
||||
continue;
|
||||
}
|
||||
if (params.runId && record.runId && record.runId !== params.runId) {
|
||||
continue;
|
||||
}
|
||||
const verdict = postCompactionGuard.observe({
|
||||
toolName: record.toolName,
|
||||
argsHash: record.argsHash,
|
||||
resultHash: record.resultHash,
|
||||
});
|
||||
if (verdict.shouldAbort) {
|
||||
throw PostCompactionLoopPersistedError.fromVerdict(verdict);
|
||||
}
|
||||
if (!postCompactionGuard.snapshot().armed) {
|
||||
break;
|
||||
const currentSeq = guardSessionState?.toolOutcomeSeq ?? 0;
|
||||
const newRecordCount = Math.max(0, currentSeq - lastObservedToolOutcomeSeq);
|
||||
if (newRecordCount > 0) {
|
||||
const startIndex = Math.max(0, history.length - newRecordCount);
|
||||
for (let i = startIndex; i < history.length; i += 1) {
|
||||
const record = history[i];
|
||||
if (!record || typeof record.resultHash !== "string") {
|
||||
continue;
|
||||
}
|
||||
if (params.runId && record.runId && record.runId !== params.runId) {
|
||||
continue;
|
||||
}
|
||||
const verdict = postCompactionGuard.observe({
|
||||
toolName: record.toolName,
|
||||
argsHash: record.argsHash,
|
||||
resultHash: record.resultHash,
|
||||
});
|
||||
if (verdict.shouldAbort) {
|
||||
throw PostCompactionLoopPersistedError.fromVerdict(verdict);
|
||||
}
|
||||
if (!postCompactionGuard.snapshot().armed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
lastObservedToolCallHistoryIndex = history.length;
|
||||
lastObservedToolOutcomeSeq = currentSeq;
|
||||
} else {
|
||||
// Keep index aligned with current history length so freshly armed
|
||||
// windows only see records from the post-compaction-retry attempt.
|
||||
// Keep cursor aligned with the current global outcome seq so a
|
||||
// freshly-armed window only sees records pushed AFTER arming.
|
||||
const guardSessionState =
|
||||
params.sessionKey || params.sessionId
|
||||
? getDiagnosticSessionState({
|
||||
@@ -1260,7 +1280,7 @@ export async function runEmbeddedPiAgent(
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
})
|
||||
: undefined;
|
||||
lastObservedToolCallHistoryIndex = guardSessionState?.toolCallHistory?.length ?? 0;
|
||||
lastObservedToolOutcomeSeq = guardSessionState?.toolOutcomeSeq ?? 0;
|
||||
}
|
||||
|
||||
const {
|
||||
|
||||
@@ -726,6 +726,12 @@ export function recordToolCallOutcome(
|
||||
unknownToolName: outcome.unknownToolName,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
// Bump the monotonic outcome counter only when an observable record (one
|
||||
// with resultHash already populated) is appended. Matched updates set
|
||||
// resultHash on a record that the post-compaction guard's cursor has
|
||||
// already passed, so they were never observable under the prior index
|
||||
// scheme either. See logging/diagnostic-session-state.ts for the field.
|
||||
state.toolOutcomeSeq = (state.toolOutcomeSeq ?? 0) + 1;
|
||||
}
|
||||
|
||||
if (state.toolCallHistory.length > resolvedConfig.historySize) {
|
||||
|
||||
@@ -8331,6 +8331,20 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
postCompactionGuard: {
|
||||
type: "object",
|
||||
properties: {
|
||||
enabled: {
|
||||
type: "boolean",
|
||||
},
|
||||
windowSize: {
|
||||
type: "integer",
|
||||
exclusiveMinimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
},
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
@@ -18255,6 +18269,24 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
postCompactionGuard: {
|
||||
type: "object",
|
||||
properties: {
|
||||
enabled: {
|
||||
type: "boolean",
|
||||
description:
|
||||
"Enable the post-compaction loop guard that aborts the run when the agent repeats the same (tool, args, result) triple windowSize times immediately after auto-compaction-retry (default: true).",
|
||||
},
|
||||
windowSize: {
|
||||
type: "integer",
|
||||
exclusiveMinimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
description:
|
||||
"Number of post-compaction attempts during which the guard stays armed (default: 3). Lower values are stricter; higher values give the agent more attempts before abort.",
|
||||
},
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
|
||||
@@ -502,6 +502,14 @@ const ToolLoopDetectionDetectorSchema = z
|
||||
.strict()
|
||||
.optional();
|
||||
|
||||
const ToolLoopPostCompactionGuardSchema = z
|
||||
.object({
|
||||
enabled: z.boolean().optional(),
|
||||
windowSize: z.number().int().positive().optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional();
|
||||
|
||||
const ToolLoopDetectionSchema = z
|
||||
.object({
|
||||
enabled: z.boolean().optional(),
|
||||
@@ -511,6 +519,7 @@ const ToolLoopDetectionSchema = z
|
||||
criticalThreshold: z.number().int().positive().optional(),
|
||||
globalCircuitBreakerThreshold: z.number().int().positive().optional(),
|
||||
detectors: ToolLoopDetectionDetectorSchema,
|
||||
postCompactionGuard: ToolLoopPostCompactionGuardSchema,
|
||||
})
|
||||
.strict()
|
||||
.superRefine((value, ctx) => {
|
||||
|
||||
81
src/config/zod-schema.post-compaction-guard.test.ts
Normal file
81
src/config/zod-schema.post-compaction-guard.test.ts
Normal file
@@ -0,0 +1,81 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { ToolsSchema } from "./zod-schema.agent-runtime.js";
|
||||
import { OpenClawSchema } from "./zod-schema.js";
|
||||
|
||||
describe("OpenClawSchema tools.loopDetection.postCompactionGuard validation", () => {
|
||||
it("accepts tools.loopDetection.postCompactionGuard configuration", () => {
|
||||
const result = OpenClawSchema.safeParse({
|
||||
tools: {
|
||||
loopDetection: {
|
||||
enabled: true,
|
||||
postCompactionGuard: {
|
||||
enabled: false,
|
||||
windowSize: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it("accepts an empty postCompactionGuard object", () => {
|
||||
const result = OpenClawSchema.safeParse({
|
||||
tools: {
|
||||
loopDetection: {
|
||||
postCompactionGuard: {},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects unknown keys under tools.loopDetection.postCompactionGuard", () => {
|
||||
const result = OpenClawSchema.safeParse({
|
||||
tools: {
|
||||
loopDetection: {
|
||||
postCompactionGuard: {
|
||||
enabled: true,
|
||||
windowSize: 3,
|
||||
bogus: "key",
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
});
|
||||
|
||||
it("rejects non-positive windowSize", () => {
|
||||
const result = OpenClawSchema.safeParse({
|
||||
tools: {
|
||||
loopDetection: {
|
||||
postCompactionGuard: {
|
||||
windowSize: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
});
|
||||
|
||||
it("rejects non-integer windowSize", () => {
|
||||
const result = OpenClawSchema.safeParse({
|
||||
tools: {
|
||||
loopDetection: {
|
||||
postCompactionGuard: {
|
||||
windowSize: 2.5,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
});
|
||||
|
||||
it("validates via ToolsSchema directly", () => {
|
||||
const result = ToolsSchema.safeParse({
|
||||
loopDetection: {
|
||||
postCompactionGuard: { enabled: true, windowSize: 4 },
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -9,6 +9,14 @@ export type SessionState = {
|
||||
state: SessionStateValue;
|
||||
queueDepth: number;
|
||||
toolCallHistory?: ToolCallRecord[];
|
||||
/**
|
||||
* Monotonic counter of observable tool-outcome records ever pushed to
|
||||
* toolCallHistory (i.e. unmatched-push branch of recordToolCallOutcome).
|
||||
* Never decremented by trims. Lets observers detect new records via a
|
||||
* delta on the seq instead of an absolute index that breaks once the
|
||||
* history is trimmed at historySize.
|
||||
*/
|
||||
toolOutcomeSeq?: number;
|
||||
toolLoopWarningBuckets?: Map<string, number>;
|
||||
commandPollCounts?: Map<string, { count: number; lastPollAt: number }>;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user