mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:00:43 +00:00
feat(agents): wire post-compaction loop guard into pi-embedded-runner
Arms the guard at each of the three compaction-success points in run.ts and observes tool-call outcomes from the diagnostic session state's toolCallHistory after each attempt. Aborts with PostCompactionLoopPersistedError when the same (tool, args, result) triple repeats windowSize times within the post-compaction window. Refs #77474
This commit is contained in:
committed by
Peter Steinberger
parent
5b863c719e
commit
2a702f927f
@@ -67,6 +67,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Telegram/media: derive no-caption inbound media placeholders from saved MIME metadata instead of the Telegram `photo` shape, so non-image and mixed attachments no longer reach the model as `<media:image>`. Fixes #69793. Thanks @aspalagin.
|
||||
- Agents/cache: keep per-turn runtime context out of ordinary chat system prompts while still delivering hidden current-turn context, restoring prompt-cache reuse on chat continuations. Fixes #77431. Thanks @Udjin79.
|
||||
- Gateway/startup: include resolved thinking and fast-mode defaults in the `agent model` startup log line, defaulting unset startup thinking to `medium` without mixing in reasoning visibility.
|
||||
- Agents/Tools: add post-compaction loop guard in `pi-embedded-runner` that arms after auto-compaction-retry and aborts the run with `compaction_loop_persisted` when the agent emits the same `(tool, args, result)` triple `windowSize` times (default 3) within that window. Configurable via `tools.loopDetection.postCompactionGuard.{enabled,windowSize}`. Targets the failure mode where context-overflow + compaction does not break a tool-call loop. Refs #77474; carries forward #21597. Thanks @efpiva.
|
||||
- Gateway/watch: suppress sync-I/O trace output during `pnpm gateway:watch --benchmark` unless explicitly requested, so CPU profiling no longer floods the terminal with stack traces.
|
||||
- Gateway/watch: when benchmark sync-I/O tracing is explicitly enabled, tee trace blocks to the benchmark output log and filter them from the terminal pane while keeping normal Gateway logs visible.
|
||||
- Plugins/runtime-deps: include `json5` in the memory-core plugin runtime dependency set so packaged `memory_search` sandboxes can resolve generated OpenClaw runtime chunks that parse JSON5 config. Fixes #77461.
|
||||
|
||||
@@ -86,6 +86,30 @@ When a run id is available, recent tool-call history is evaluated only within th
|
||||
- disable only the detector causing issues
|
||||
- reduce `historySize` for less strict historical context
|
||||
|
||||
## Post-compaction guard
|
||||
|
||||
When the runner retries a prompt after a successful compaction — context-overflow auto-compaction, timeout-triggered compaction, or a compaction that interrupted the visible final answer — it arms a short-window guard that watches the next few tool calls. If the agent emits the _same_ `(toolName, args, result)` triple `windowSize` times within that window, the guard concludes that compaction did not break the loop and aborts the run with a `compaction_loop_persisted` error.
|
||||
|
||||
This is a separate code path from the global `tools.loopDetection` detectors. It is independently configurable:
|
||||
|
||||
```json5
|
||||
{
|
||||
tools: {
|
||||
loopDetection: {
|
||||
postCompactionGuard: {
|
||||
enabled: true, // default: true
|
||||
windowSize: 3, // default: 3
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
- `enabled`: master switch for the guard.
|
||||
- `windowSize`: number of post-compaction tool calls during which the guard stays armed _and_ the count of identical (tool, args, result) triples that triggers an abort.
|
||||
|
||||
The guard never aborts when results are changing; it fires only when the tool name, argument hash, and result hash are all identical across the window. It is intentionally narrow: it fires only in the immediate aftermath of a compaction-retry.
|
||||
|
||||
## Logs and expected behavior
|
||||
|
||||
When a loop is detected, OpenClaw reports a loop event and blocks or dampens the next tool-cycle depending on severity.
|
||||
|
||||
@@ -2,7 +2,6 @@ import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
createPostCompactionLoopGuard,
|
||||
PostCompactionLoopPersistedError,
|
||||
type PostCompactionLoopGuard,
|
||||
} from "./post-compaction-loop-guard.js";
|
||||
|
||||
function callOutcome(toolName: string, args: unknown, result: string) {
|
||||
|
||||
249
src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts
Normal file
249
src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts
Normal file
@@ -0,0 +1,249 @@
|
||||
import { beforeAll, beforeEach, describe, expect, it } from "vitest";
|
||||
import type {
|
||||
diagnosticSessionStates as DiagnosticSessionStatesType,
|
||||
getDiagnosticSessionState as GetDiagnosticSessionStateType,
|
||||
SessionState,
|
||||
} from "../../logging/diagnostic-session-state.js";
|
||||
import type { hashToolCall as HashToolCallType } from "../tool-loop-detection.js";
|
||||
import type { PostCompactionLoopPersistedError as PostCompactionLoopPersistedErrorType } from "./post-compaction-loop-guard.js";
|
||||
import {
|
||||
makeAttemptResult,
|
||||
makeCompactionSuccess,
|
||||
makeOverflowError,
|
||||
} from "./run.overflow-compaction.fixture.js";
|
||||
import {
|
||||
loadRunOverflowCompactionHarness,
|
||||
mockedCompactDirect,
|
||||
mockedContextEngine,
|
||||
mockedIsCompactionFailureError,
|
||||
mockedIsLikelyContextOverflowError,
|
||||
mockedLog,
|
||||
mockedRunEmbeddedAttempt,
|
||||
mockedSessionLikelyHasOversizedToolResults,
|
||||
mockedTruncateOversizedToolResultsInSession,
|
||||
overflowBaseRunParams as baseParams,
|
||||
} from "./run.overflow-compaction.harness.js";
|
||||
|
||||
// Late-bound module bindings. loadRunOverflowCompactionHarness() calls
// vi.resetModules(), which invalidates anything imported before it, so these
// are declared here with types only and assigned in beforeAll once the
// harness has loaded. That way the test and the (re-imported) runner share
// the same module instances.
let PostCompactionLoopPersistedError: typeof PostCompactionLoopPersistedErrorType;
let hashToolCall: typeof HashToolCallType;
let getDiagnosticSessionState: typeof GetDiagnosticSessionStateType;
let diagnosticSessionStates: typeof DiagnosticSessionStatesType;
let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent;
|
||||
|
||||
/**
 * Test helper: appends one completed tool-call record to the diagnostic
 * session state's `toolCallHistory`, creating the array on first use.
 *
 * @param state - Session state whose history receives the record.
 * @param toolName - Name of the tool that was called.
 * @param toolParams - Tool arguments; stored as `hashToolCall(toolName, toolParams)`.
 * @param resultHash - Result hash stored verbatim on the record.
 * @param runId - Optional run id; the `runId` key is only present when truthy.
 */
function recordToolOutcome(
  state: SessionState,
  toolName: string,
  toolParams: unknown,
  resultHash: string,
  runId?: string,
): void {
  const history = state.toolCallHistory ? state.toolCallHistory : (state.toolCallHistory = []);
  const entry = {
    toolName,
    argsHash: hashToolCall(toolName, toolParams),
    resultHash,
    timestamp: Date.now(),
  };
  // Omit the key entirely (rather than storing `undefined`) when no run id is given.
  history.push(runId ? { ...entry, runId } : entry);
}
|
||||
|
||||
describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
  // Mirrors the documented default of
  // `tools.loopDetection.postCompactionGuard.windowSize`.
  const DEFAULT_WINDOW_SIZE = 3;

  /** Returns the diagnostic session state for the shared base run params. */
  function resolveSessionState(): SessionState {
    return getDiagnosticSessionState({
      sessionKey: baseParams.sessionKey,
      sessionId: baseParams.sessionId,
    });
  }

  /**
   * Queues attempt 1 (a context-overflow prompt error) plus one successful
   * compaction result. Each test then queues its own post-compaction
   * attempt 2 on `mockedRunEmbeddedAttempt`.
   */
  function queueOverflowThenCompaction(): void {
    const overflowError = makeOverflowError();
    mockedRunEmbeddedAttempt.mockImplementationOnce(async () =>
      makeAttemptResult({ promptError: overflowError }),
    );
    mockedCompactDirect.mockResolvedValueOnce(
      makeCompactionSuccess({
        summary: "Compacted session",
        firstKeptEntryId: "entry-5",
        tokensBefore: 150000,
      }),
    );
  }

  beforeAll(async () => {
    ({ runEmbeddedPiAgent } = await loadRunOverflowCompactionHarness());
    // Re-import after the harness reset so we share module instances with
    // the runner. The runner imports both modules through its own graph.
    ({ diagnosticSessionStates, getDiagnosticSessionState } =
      await import("../../logging/diagnostic-session-state.js"));
    ({ hashToolCall } = await import("../tool-loop-detection.js"));
    ({ PostCompactionLoopPersistedError } = await import("./post-compaction-loop-guard.js"));
  });

  beforeEach(() => {
    // Start every test from an empty diagnostic history and pristine mocks.
    diagnosticSessionStates.clear();
    mockedRunEmbeddedAttempt.mockReset();
    mockedCompactDirect.mockReset();
    mockedSessionLikelyHasOversizedToolResults.mockReset();
    mockedTruncateOversizedToolResultsInSession.mockReset();
    mockedContextEngine.info.ownsCompaction = false;
    mockedLog.debug.mockReset();
    mockedLog.info.mockReset();
    mockedLog.warn.mockReset();
    mockedLog.error.mockReset();
    mockedLog.isEnabled.mockReset();
    mockedLog.isEnabled.mockReturnValue(false);
    mockedIsCompactionFailureError.mockImplementation((msg?: string) => {
      if (!msg) {
        return false;
      }
      const lower = msg.toLowerCase();
      return lower.includes("request_too_large") && lower.includes("summarization failed");
    });
    mockedIsLikelyContextOverflowError.mockImplementation((msg?: string) => {
      if (!msg) {
        return false;
      }
      const lower = msg.toLowerCase();
      return (
        lower.includes("request_too_large") ||
        lower.includes("request size exceeds") ||
        lower.includes("context window exceeded") ||
        lower.includes("prompt too large")
      );
    });
    // Default: compaction does nothing. Tests that need a successful
    // compaction queue one explicitly via queueOverflowThenCompaction().
    mockedCompactDirect.mockResolvedValue({
      ok: false,
      compacted: false,
      reason: "nothing to compact",
    });
    mockedSessionLikelyHasOversizedToolResults.mockReturnValue(false);
    mockedTruncateOversizedToolResultsInSession.mockResolvedValue({
      truncated: false,
      truncatedCount: 0,
      reason: "no oversized tool results",
    });
  });

  it("aborts the run with PostCompactionLoopPersistedError when identical (tool, args, result) repeats windowSize times after compaction", async () => {
    const sessionState = resolveSessionState();

    // Attempt 1: overflow → triggers compaction.
    queueOverflowThenCompaction();
    // Attempt 2: post-compaction. The wrapped tool layer would have
    // recorded `windowSize` identical (tool, args, result) outcomes during
    // this single attempt. The runner's after-attempt guard observation
    // sees all of them at once, accumulates matches, and aborts on the last.
    mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
      for (let i = 0; i < DEFAULT_WINDOW_SIZE; i += 1) {
        recordToolOutcome(
          sessionState,
          "gateway",
          { action: "lookup", path: "x" },
          "identical-result",
          baseParams.runId,
        );
      }
      return makeAttemptResult({
        promptError: null,
        toolMetas: Array.from({ length: DEFAULT_WINDOW_SIZE }, () => ({ toolName: "gateway" })),
      });
    });

    await expect(runEmbeddedPiAgent(baseParams)).rejects.toBeInstanceOf(
      PostCompactionLoopPersistedError,
    );

    expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
    expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
  });

  it("does not abort when the result hash changes across post-compaction tool calls (progress was made)", async () => {
    const sessionState = resolveSessionState();

    // Attempt 1: overflow → triggers compaction.
    queueOverflowThenCompaction();
    // Attempt 2 (post-compaction): a full window of calls with identical
    // tool + args but a DIFFERENT result hash each time. Recording only one
    // call would pass trivially, since a single observation can never reach
    // the abort threshold. Only one further attempt is needed since the
    // runner exits on a successful prompt with no further retry trigger.
    mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
      for (let call = 1; call <= DEFAULT_WINDOW_SIZE; call += 1) {
        recordToolOutcome(
          sessionState,
          "gateway",
          { action: "lookup", path: "x" },
          `result-${call}`,
          baseParams.runId,
        );
      }
      return makeAttemptResult({
        promptError: null,
        toolMetas: Array.from({ length: DEFAULT_WINDOW_SIZE }, () => ({ toolName: "gateway" })),
      });
    });

    const result = await runEmbeddedPiAgent(baseParams);
    expect(result.meta.error).toBeUndefined();
    expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
    expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
  });

  it("disarms after windowSize observations regardless of match, so later identical calls do not abort", async () => {
    // Use windowSize: 2 so the guard disarms after 2 observations.
    const windowSize = 2;
    const sessionState = resolveSessionState();

    // Attempt 1: overflow → triggers compaction.
    queueOverflowThenCompaction();
    // Attempt 2 (post-compaction): two distinct records fill the window, so
    // the guard disarms with no abort. The identical records appended after
    // them fall outside the window and must not be observed by the guard.
    mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
      recordToolOutcome(sessionState, "read", { path: "/a" }, "ra", baseParams.runId);
      recordToolOutcome(sessionState, "write", { path: "/b" }, "rb", baseParams.runId);
      for (let i = 0; i < windowSize; i += 1) {
        recordToolOutcome(
          sessionState,
          "gateway",
          { action: "lookup", path: "x" },
          "identical-result",
          baseParams.runId,
        );
      }
      return makeAttemptResult({
        promptError: null,
        toolMetas: [
          { toolName: "read" },
          { toolName: "write" },
          { toolName: "gateway" },
          { toolName: "gateway" },
        ],
      });
    });

    const result = await runEmbeddedPiAgent({
      ...baseParams,
      // Partial config fixture: only the guard settings matter here, so the
      // cast sidesteps building a complete config object.
      config: {
        tools: {
          loopDetection: {
            postCompactionGuard: { enabled: true, windowSize },
          },
        },
      } as never,
    });

    expect(result.meta.error).toBeUndefined();
    expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
    expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
  });
});
|
||||
@@ -9,6 +9,7 @@ import { emitAgentPlanEvent } from "../../infra/agent-events.js";
|
||||
import { sleepWithAbort } from "../../infra/backoff.js";
|
||||
import { freezeDiagnosticTraceContext } from "../../infra/diagnostic-trace-context.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { getDiagnosticSessionState } from "../../logging/diagnostic-session-state.js";
|
||||
import { buildAgentHookContextChannelFields } from "../../plugins/hook-agent-context.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { resolveProviderAuthProfileId } from "../../plugins/provider-runtime.js";
|
||||
@@ -92,6 +93,10 @@ import { resolveEmbeddedRunFailureSignal } from "./failure-signal.js";
|
||||
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
import { resolveModelAsync } from "./model.js";
|
||||
import {
|
||||
createPostCompactionLoopGuard,
|
||||
PostCompactionLoopPersistedError,
|
||||
} from "./post-compaction-loop-guard.js";
|
||||
import { createEmbeddedRunReplayState, observeReplayMetadata } from "./replay-state.js";
|
||||
import { handleAssistantFailover } from "./run/assistant-failover.js";
|
||||
import {
|
||||
@@ -782,6 +787,24 @@ export async function runEmbeddedPiAgent(
|
||||
// unit-tested in run/idle-timeout-breaker.test.ts; the run loop just
|
||||
// feeds it the outcome of each attempt.
|
||||
const idleTimeoutBreakerState = createIdleTimeoutBreakerState();
|
||||
// Post-compaction loop guard for #77474. Armed at each compaction-success
|
||||
// site below; observes tool-call outcomes from the diagnostic session
|
||||
// state's toolCallHistory after each attempt. Aborts the run when the
|
||||
// same (tool, args, result) triple repeats windowSize times within the
|
||||
// post-compaction window.
|
||||
const postCompactionGuard = createPostCompactionLoopGuard(
|
||||
params.config?.tools?.loopDetection?.postCompactionGuard,
|
||||
);
|
||||
let lastObservedToolCallHistoryIndex = (() => {
|
||||
if (!params.sessionKey && !params.sessionId) {
|
||||
return 0;
|
||||
}
|
||||
const state = getDiagnosticSessionState({
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
});
|
||||
return state.toolCallHistory?.length ?? 0;
|
||||
})();
|
||||
let lastRetryFailoverReason: FailoverReason | null = null;
|
||||
let planningOnlyRetryInstruction: string | null = null;
|
||||
let reasoningOnlyRetryInstruction: string | null = null;
|
||||
@@ -1193,6 +1216,53 @@ export async function runEmbeddedPiAgent(
|
||||
});
|
||||
const attempt = normalizeEmbeddedRunAttemptResult(rawAttempt);
|
||||
|
||||
// Post-compaction loop guard observation. Reads any new tool-call
|
||||
// records that completed during this attempt (populated by the
|
||||
// before-tool-call hook's recordToolCallOutcome) and feeds them
|
||||
// into the guard. Disarms automatically once the window expires.
|
||||
if (postCompactionGuard.snapshot().armed) {
|
||||
const guardSessionState =
|
||||
params.sessionKey || params.sessionId
|
||||
? getDiagnosticSessionState({
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
})
|
||||
: undefined;
|
||||
const history = guardSessionState?.toolCallHistory ?? [];
|
||||
for (let i = lastObservedToolCallHistoryIndex; i < history.length; i += 1) {
|
||||
const record = history[i];
|
||||
if (!record || !record.resultHash) {
|
||||
continue;
|
||||
}
|
||||
if (params.runId && record.runId && record.runId !== params.runId) {
|
||||
continue;
|
||||
}
|
||||
const verdict = postCompactionGuard.observe({
|
||||
toolName: record.toolName,
|
||||
argsHash: record.argsHash,
|
||||
resultHash: record.resultHash,
|
||||
});
|
||||
if (verdict.shouldAbort) {
|
||||
throw PostCompactionLoopPersistedError.fromVerdict(verdict);
|
||||
}
|
||||
if (!postCompactionGuard.snapshot().armed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
lastObservedToolCallHistoryIndex = history.length;
|
||||
} else {
|
||||
// Keep index aligned with current history length so freshly armed
|
||||
// windows only see records from the post-compaction-retry attempt.
|
||||
const guardSessionState =
|
||||
params.sessionKey || params.sessionId
|
||||
? getDiagnosticSessionState({
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
})
|
||||
: undefined;
|
||||
lastObservedToolCallHistoryIndex = guardSessionState?.toolCallHistory?.length ?? 0;
|
||||
}
|
||||
|
||||
const {
|
||||
aborted,
|
||||
externalAbort,
|
||||
@@ -1461,6 +1531,7 @@ export async function runEmbeddedPiAgent(
|
||||
log.info(
|
||||
`[timeout-compaction] compaction succeeded for ${provider}/${modelId}; retrying prompt`,
|
||||
);
|
||||
postCompactionGuard.armPostCompaction();
|
||||
continue;
|
||||
} else {
|
||||
log.warn(
|
||||
@@ -1650,6 +1721,7 @@ export async function runEmbeddedPiAgent(
|
||||
}
|
||||
autoCompactionCount += 1;
|
||||
log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);
|
||||
postCompactionGuard.armPostCompaction();
|
||||
if (preflightRecovery?.source === "mid-turn") {
|
||||
continueFromCurrentTranscript();
|
||||
} else if (
|
||||
@@ -2425,6 +2497,7 @@ export async function runEmbeddedPiAgent(
|
||||
`compaction interrupted visible final answer: runId=${params.runId} sessionId=${params.sessionId} ` +
|
||||
`compactions=${attemptCompactionCount} — retrying ${compactionContinuationRetryAttempts}/1 with compacted-transcript continuation`,
|
||||
);
|
||||
postCompactionGuard.armPostCompaction();
|
||||
continue;
|
||||
}
|
||||
compactionContinuationRetryInstruction = null;
|
||||
|
||||
@@ -28972,6 +28972,14 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
help: "Plugin-defined configuration payload interpreted by that plugin's own schema and validation rules. Use only documented fields from the plugin to prevent ignored or invalid settings.",
|
||||
tags: ["advanced"],
|
||||
},
|
||||
"tools.loopDetection.postCompactionGuard.enabled": {
|
||||
help: "Enable the post-compaction loop guard that aborts the run when the agent repeats the same (tool, args, result) triple windowSize times immediately after auto-compaction-retry (default: true).",
|
||||
tags: ["tools"],
|
||||
},
|
||||
"tools.loopDetection.postCompactionGuard.windowSize": {
|
||||
help: "Number of post-compaction attempts during which the guard stays armed (default: 3). Lower values are stricter; higher values give the agent more attempts before abort.",
|
||||
tags: ["tools"],
|
||||
},
|
||||
"models.providers.*.headers.*": {
|
||||
sensitive: true,
|
||||
tags: ["security", "models"],
|
||||
|
||||
Reference in New Issue
Block a user