refactor(agents): thread post-compaction guard observer

This commit is contained in:
Peter Steinberger
2026-05-05 00:28:28 +01:00
parent e0fafdcc1d
commit 1af6855bb0
7 changed files with 71 additions and 101 deletions

View File

@@ -29,12 +29,6 @@ export type PostCompactionLoopGuard = {
snapshot: () => { armed: boolean; remainingAttempts: number };
};
/**
 * Identifiers used to look up a registered post-compaction loop guard.
 *
 * Every field is optional. `scopeKeys` derives one lookup key for each of
 * `sessionKey` and `sessionId` that is present (after trimming), and appends
 * the `runId` to those keys when it is present.
 */
export type PostCompactionGuardScope = {
/** Session key; contributes a `sessionKey:`-prefixed lookup key when non-blank. */
sessionKey?: string;
/** Session id; contributes a `sessionId:`-prefixed lookup key when non-blank. */
sessionId?: string;
/** Run id; when non-blank it is appended to each lookup key as a `:run:` suffix. */
runId?: string;
};
type GuardState = {
enabled: boolean;
windowSize: number;
@@ -42,8 +36,6 @@ type GuardState = {
history: PostCompactionGuardObservation[];
};
const activeGuards = new Map<string, PostCompactionLoopGuard>();
function asPositiveInt(value: number | undefined, fallback: number): number {
if (typeof value !== "number" || !Number.isInteger(value) || value <= 0) {
return fallback;
@@ -113,56 +105,6 @@ export function createPostCompactionLoopGuard(
return { armPostCompaction, observe, snapshot };
}
/**
 * Trims a scope identifier and collapses "missing" and "blank" into one case.
 *
 * @param value Raw identifier, possibly absent or whitespace-only.
 * @returns The trimmed identifier, or `undefined` when nothing is left after
 *   trimming (or when no value was supplied).
 */
function normalizeScopePart(value: string | undefined): string | undefined {
  if (value == null) {
    return undefined;
  }
  const stripped = value.trim();
  if (stripped.length === 0) {
    return undefined;
  }
  return stripped;
}
/**
 * Builds the lookup keys for a guard scope.
 *
 * One key is produced for the session key and one for the session id, in that
 * order, skipping whichever is missing or blank. When a non-blank run id is
 * present, each key carries a `:run:<runId>` suffix.
 *
 * @param scope Identifiers describing the session/run.
 * @returns Zero, one, or two keys.
 */
function scopeKeys(scope: PostCompactionGuardScope): string[] {
  const run = normalizeScopePart(scope.runId);
  const sessionKey = normalizeScopePart(scope.sessionKey);
  const sessionId = normalizeScopePart(scope.sessionId);

  // Run-qualified keys extend the plain key with a `:run:` segment.
  const format = (kind: string, id: string): string =>
    run ? `${kind}:${id}:run:${run}` : `${kind}:${id}`;

  const result: string[] = [];
  if (sessionKey) {
    result.push(format("sessionKey", sessionKey));
  }
  if (sessionId) {
    result.push(format("sessionId", sessionId));
  }
  return result;
}
/**
 * Registers `guard` in the module-level `activeGuards` map under every key
 * derived from `scope`, replacing whatever was stored under those keys.
 *
 * @param scope Identifiers used to derive the registration keys.
 * @param guard Guard instance to register.
 * @returns A disposer that removes the registration. It only deletes keys
 *   that still map to this exact `guard`, so an entry overwritten by a later
 *   registration is left in place.
 */
export function registerPostCompactionLoopGuard(
  scope: PostCompactionGuardScope,
  guard: PostCompactionLoopGuard,
): () => void {
  const registeredKeys = scopeKeys(scope);
  registeredKeys.forEach((key) => {
    activeGuards.set(key, guard);
  });

  return function unregister(): void {
    registeredKeys
      .filter((key) => activeGuards.get(key) === guard)
      .forEach((key) => {
        activeGuards.delete(key);
      });
  };
}
/**
 * Forwards a tool-call observation to the first guard registered for `scope`.
 *
 * Keys are tried in the order produced by `scopeKeys`; the first key that has
 * a registered guard wins and no further guards are consulted.
 *
 * @param scope Identifiers used to locate the guard.
 * @param call Observation to hand to the guard.
 * @returns The guard's verdict, or `undefined` when no guard is registered
 *   under any of the scope's keys.
 */
export function observePostCompactionLoopGuard(
  scope: PostCompactionGuardScope,
  call: PostCompactionGuardObservation,
): PostCompactionGuardVerdict | undefined {
  const matched = scopeKeys(scope)
    .map((key) => activeGuards.get(key))
    .find((candidate) => Boolean(candidate));
  return matched ? matched.observe(call) : undefined;
}
export class PostCompactionLoopPersistedError extends Error {
readonly detector: "compaction_loop_persisted";
readonly count: number;

View File

@@ -4,7 +4,10 @@ import type {
getDiagnosticSessionState as GetDiagnosticSessionStateType,
SessionState,
} from "../../logging/diagnostic-session-state.js";
import type { wrapToolWithBeforeToolCallHook as WrapToolWithBeforeToolCallHookType } from "../pi-tools.before-tool-call.js";
import type {
ToolOutcomeObserver,
wrapToolWithBeforeToolCallHook as WrapToolWithBeforeToolCallHookType,
} from "../pi-tools.before-tool-call.js";
import type {
recordToolCall as RecordToolCallType,
recordToolCallOutcome as RecordToolCallOutcomeType,
@@ -72,6 +75,7 @@ async function executeWrappedToolOutcome(
toolName: string,
toolParams: unknown,
result: unknown,
onToolOutcome?: ToolOutcomeObserver,
runId = baseParams.runId,
): Promise<unknown> {
const tool = wrapToolWithBeforeToolCallHook(
@@ -84,6 +88,7 @@ async function executeWrappedToolOutcome(
sessionKey: baseParams.sessionKey,
sessionId: baseParams.sessionId,
runId,
onToolOutcome,
},
);
liveToolCallSeq += 1;
@@ -159,12 +164,15 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
// Attempt 2: post-compaction. The live wrapped-tool path records each
// outcome while the prompt is still running; the third identical result
// aborts before the attempt can return.
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => {
const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver })
.onToolOutcome;
for (let i = 0; i < 3; i += 1) {
await executeWrappedToolOutcome(
"gateway",
{ action: "lookup", path: "x" },
"identical-result",
onToolOutcome,
);
}
attemptReturned = true;
@@ -200,9 +208,16 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
// Attempt 2 (post-compaction): identical args, but DIFFERENT result hash
// each time. This fills the window without triggering the persisted-loop
// abort because the tool is making progress.
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => {
const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver })
.onToolOutcome;
for (let i = 0; i < 3; i += 1) {
await executeWrappedToolOutcome("gateway", { action: "lookup", path: "x" }, `result-${i}`);
await executeWrappedToolOutcome(
"gateway",
{ action: "lookup", path: "x" },
`result-${i}`,
onToolOutcome,
);
}
return makeAttemptResult({
promptError: null,
@@ -235,9 +250,11 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
// Attempt 2 (post-compaction): two distinct records → window full,
// guard disarms with no abort. We then append more identical records
// afterwards in this test to confirm they are not observed by the guard.
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
await executeWrappedToolOutcome("read", { path: "/a" }, "ra");
await executeWrappedToolOutcome("write", { path: "/b" }, "rb");
mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => {
const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver })
.onToolOutcome;
await executeWrappedToolOutcome("read", { path: "/a" }, "ra", onToolOutcome);
await executeWrappedToolOutcome("write", { path: "/b" }, "rb", onToolOutcome);
return makeAttemptResult({
promptError: null,
toolMetas: [{ toolName: "read" }, { toolName: "write" }],
@@ -293,12 +310,15 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
// Attempt 2 (post-compaction): three identical live tool outcomes while
// history is already at the cap. The guard aborts on the third result
// before the mocked attempt can return.
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => {
const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver })
.onToolOutcome;
for (let i = 0; i < 3; i += 1) {
await executeWrappedToolOutcome(
"gateway",
{ action: "lookup", path: "x" },
"identical-result",
onToolOutcome,
);
}
// History is still capped at HISTORY_TRIM_CAP after the trim.

View File

@@ -94,7 +94,8 @@ import { log } from "./logger.js";
import { resolveModelAsync } from "./model.js";
import {
createPostCompactionLoopGuard,
registerPostCompactionLoopGuard,
PostCompactionLoopPersistedError,
type PostCompactionGuardObservation,
} from "./post-compaction-loop-guard.js";
import { createEmbeddedRunReplayState, observeReplayMetadata } from "./replay-state.js";
import { handleAssistantFailover } from "./run/assistant-failover.js";
@@ -792,14 +793,14 @@ export async function runEmbeddedPiAgent(
const postCompactionGuard = createPostCompactionLoopGuard(
params.config?.tools?.loopDetection?.postCompactionGuard,
);
const unregisterPostCompactionGuard = registerPostCompactionLoopGuard(
{
sessionKey: params.sessionKey,
sessionId: params.sessionId,
runId: params.runId,
},
postCompactionGuard,
);
// Feeds each tool outcome to this run's post-compaction guard and throws a
// PostCompactionLoopPersistedError built from the verdict when the guard
// says the run should abort.
const observePostCompactionToolOutcome = (
  observation: PostCompactionGuardObservation,
): void => {
  const guardVerdict = postCompactionGuard.observe(observation);
  if (!guardVerdict.shouldAbort) {
    return;
  }
  throw PostCompactionLoopPersistedError.fromVerdict(guardVerdict);
};
let lastRetryFailoverReason: FailoverReason | null = null;
let planningOnlyRetryInstruction: string | null = null;
let reasoningOnlyRetryInstruction: string | null = null;
@@ -1160,6 +1161,7 @@ export async function runEmbeddedPiAgent(
agentId: workspaceResolution.agentId,
legacyBeforeAgentStartResult,
thinkLevel,
onToolOutcome: observePostCompactionToolOutcome,
fastMode: params.fastMode,
verboseLevel: params.verboseLevel,
reasoningLevel: params.reasoningLevel,
@@ -2786,7 +2788,6 @@ export async function runEmbeddedPiAgent(
};
}
} finally {
unregisterPostCompactionGuard();
forgetPromptBuildDrainCacheForRun(params.runId);
stopRuntimeAuthRefreshTimer();
await runAgentCleanupStep({

View File

@@ -928,6 +928,7 @@ export async function runEmbeddedAttempt(
forceHeartbeatTool: params.forceHeartbeatTool,
authProfileStore: params.authProfileStore,
recordToolPrepStage: (name) => corePluginToolStages.mark(name),
onToolOutcome: params.onToolOutcome,
onYield: (message) => {
yieldDetected = true;
yieldMessage = message;
@@ -1650,6 +1651,7 @@ export async function runEmbeddedAttempt(
sessionId: params.sessionId,
runId: params.runId,
loopDetection: clientToolLoopDetection,
onToolOutcome: params.onToolOutcome,
},
)
: [];

View File

@@ -9,6 +9,7 @@ import type { DiagnosticTraceContext } from "../../../infra/diagnostic-trace-con
import type { PluginHookBeforeAgentStartResult } from "../../../plugins/hook-before-agent-start.types.js";
import type { AuthProfileStore } from "../../auth-profiles/types.js";
import type { MessagingToolSend } from "../../pi-embedded-messaging.types.js";
import type { ToolOutcomeObserver } from "../../pi-tools.before-tool-call.js";
import type { AgentRuntimePlan } from "../../runtime-plan/types.js";
import type { ToolErrorSummary } from "../../tool-error-summary.js";
import type { NormalizedUsage } from "../../usage.js";
@@ -40,6 +41,8 @@ export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & {
agentHarnessId?: string;
/** OpenClaw-owned runtime policy prepared by the orchestrator for this attempt. */
runtimePlan?: AgentRuntimePlan;
/** Live observer called after wrapped tool outcomes are recorded. */
onToolOutcome?: ToolOutcomeObserver;
model: Model<Api>;
authStorage: AuthStorage;
/** Auth profile store already resolved during startup for this attempt. */

View File

@@ -26,14 +26,18 @@ import {
import { createLazyRuntimeSurface } from "../shared/lazy-runtime.js";
import { isPlainObject } from "../utils.js";
import { copyChannelAgentToolMeta } from "./channel-tools.js";
import {
observePostCompactionLoopGuard,
PostCompactionLoopPersistedError,
} from "./pi-embedded-runner/post-compaction-loop-guard.js";
import { normalizeToolName } from "./tool-policy.js";
import type { AnyAgentTool } from "./tools/common.js";
import { callGatewayTool } from "./tools/gateway.js";
/**
 * Summary of one recorded tool outcome handed to a {@link ToolOutcomeObserver}.
 *
 * `recordLoopOutcome` fills this from the record returned by
 * `recordToolCallOutcome`, and only when that record carries a `resultHash`.
 */
export type ToolOutcomeObservation = {
/** Tool name as stored on the recorded outcome. */
toolName: string;
/** Hash of the call arguments, copied from the recorded outcome. */
argsHash: string;
/** Hash of the call result, copied from the recorded outcome. */
resultHash: string;
};
/**
 * Callback notified with each recorded tool outcome.
 *
 * NOTE(review): `recordLoopOutcome` appears to invoke this after its
 * try/catch, so an error thrown here would propagate to the caller rather
 * than being logged as a tracking failure — confirm against the full file.
 */
export type ToolOutcomeObserver = (observation: ToolOutcomeObservation) => void;
export type HookContext = {
agentId?: string;
config?: OpenClawConfig;
@@ -43,6 +47,7 @@ export type HookContext = {
runId?: string;
trace?: DiagnosticTraceContext;
loopDetection?: ToolLoopDetectionConfig;
onToolOutcome?: ToolOutcomeObserver;
};
type HookBlockedKind = "veto" | "failure";
@@ -376,9 +381,10 @@ async function recordLoopOutcome(args: {
result?: unknown;
error?: unknown;
}): Promise<void> {
if (!args.ctx?.sessionKey) {
if (!args.ctx?.sessionKey && !args.ctx?.sessionId) {
return;
}
let recordedOutcome: ToolOutcomeObservation | undefined;
try {
const { getDiagnosticSessionState, recordToolCallOutcome } = await loadBeforeToolCallRuntime();
const sessionState = getDiagnosticSessionState({
@@ -394,29 +400,19 @@ async function recordLoopOutcome(args: {
config: args.ctx.loopDetection,
...(args.ctx.runId && { runId: args.ctx.runId }),
});
if (record?.resultHash) {
const verdict = observePostCompactionLoopGuard(
{
sessionKey: args.ctx.sessionKey,
sessionId: args.ctx.sessionId,
runId: args.ctx.runId,
},
{
toolName: record.toolName,
argsHash: record.argsHash,
resultHash: record.resultHash,
},
);
if (verdict?.shouldAbort) {
throw PostCompactionLoopPersistedError.fromVerdict(verdict);
}
if (record?.resultHash && args.ctx.onToolOutcome) {
recordedOutcome = {
toolName: record.toolName,
argsHash: record.argsHash,
resultHash: record.resultHash,
};
}
} catch (err) {
if (err instanceof PostCompactionLoopPersistedError) {
throw err;
}
log.warn(`tool loop outcome tracking failed: tool=${args.toolName} error=${String(err)}`);
}
if (recordedOutcome) {
args.ctx.onToolOutcome?.(recordedOutcome);
}
}
export async function runBeforeToolCallHook(args: {

View File

@@ -27,7 +27,10 @@ import type { ModelAuthMode } from "./model-auth.js";
import { resolveOpenClawPluginToolsForOptions } from "./openclaw-plugin-tools.js";
import { createOpenClawTools } from "./openclaw-tools.js";
import { wrapToolWithAbortSignal } from "./pi-tools.abort.js";
import { wrapToolWithBeforeToolCallHook } from "./pi-tools.before-tool-call.js";
import {
type ToolOutcomeObserver,
wrapToolWithBeforeToolCallHook,
} from "./pi-tools.before-tool-call.js";
import { applyDeferredFollowupToolDescriptions } from "./pi-tools.deferred-followup.js";
import { filterToolsByMessageProvider } from "./pi-tools.message-provider-policy.js";
import {
@@ -378,6 +381,8 @@ export function createOpenClawCodingTools(options?: {
onYield?: (message: string) => Promise<void> | void;
/** Optional instrumentation callback for tool preparation stage timing. */
recordToolPrepStage?: (name: string) => void;
/** Live observer called after wrapped tool outcomes are recorded. */
onToolOutcome?: ToolOutcomeObserver;
}): AnyAgentTool[] {
const execToolName = "exec";
const sandbox = options?.sandbox?.enabled ? options.sandbox : undefined;
@@ -838,6 +843,7 @@ export function createOpenClawCodingTools(options?: {
runId: options?.runId,
...(options?.trace ? { trace: options.trace } : {}),
loopDetection: resolveToolLoopDetectionConfig({ cfg: options?.config, agentId }),
onToolOutcome: options?.onToolOutcome,
}),
);
options?.recordToolPrepStage?.("tool-hooks");