diff --git a/src/agents/pi-embedded-runner/post-compaction-loop-guard.ts b/src/agents/pi-embedded-runner/post-compaction-loop-guard.ts index 5e739315b88..b5f7f52ae2f 100644 --- a/src/agents/pi-embedded-runner/post-compaction-loop-guard.ts +++ b/src/agents/pi-embedded-runner/post-compaction-loop-guard.ts @@ -29,12 +29,6 @@ export type PostCompactionLoopGuard = { snapshot: () => { armed: boolean; remainingAttempts: number }; }; -export type PostCompactionGuardScope = { - sessionKey?: string; - sessionId?: string; - runId?: string; -}; - type GuardState = { enabled: boolean; windowSize: number; @@ -42,8 +36,6 @@ type GuardState = { history: PostCompactionGuardObservation[]; }; -const activeGuards = new Map(); - function asPositiveInt(value: number | undefined, fallback: number): number { if (typeof value !== "number" || !Number.isInteger(value) || value <= 0) { return fallback; @@ -113,56 +105,6 @@ export function createPostCompactionLoopGuard( return { armPostCompaction, observe, snapshot }; } -function normalizeScopePart(value: string | undefined): string | undefined { - const trimmed = value?.trim(); - return trimmed ? trimmed : undefined; -} - -function scopeKeys(scope: PostCompactionGuardScope): string[] { - const runId = normalizeScopePart(scope.runId); - const keys: string[] = []; - for (const [kind, id] of [ - ["sessionKey", normalizeScopePart(scope.sessionKey)], - ["sessionId", normalizeScopePart(scope.sessionId)], - ] as const) { - if (!id) { - continue; - } - keys.push(runId ? `${kind}:${id}:run:${runId}` : `${kind}:${id}`); - } - return keys; -} - -export function registerPostCompactionLoopGuard( - scope: PostCompactionGuardScope, - guard: PostCompactionLoopGuard, -): () => void { - const keys = scopeKeys(scope); - for (const key of keys) { - activeGuards.set(key, guard); - } - return () => { - for (const key of keys) { - if (activeGuards.get(key) === guard) { - activeGuards.delete(key); - } - } - }; -} - -export function observePostCompactionLoopGuard( - scope: PostCompactionGuardScope, - call: PostCompactionGuardObservation, -): PostCompactionGuardVerdict | undefined { - for (const key of scopeKeys(scope)) { - const guard = activeGuards.get(key); - if (guard) { - return guard.observe(call); - } - } - return undefined; -} - export class PostCompactionLoopPersistedError extends Error { readonly detector: "compaction_loop_persisted"; readonly count: number; diff --git a/src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts b/src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts index 6d9b2067551..4bfc0c373d8 100644 --- a/src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts +++ b/src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts @@ -4,7 +4,10 @@ import type { getDiagnosticSessionState as GetDiagnosticSessionStateType, SessionState, } from "../../logging/diagnostic-session-state.js"; -import type { wrapToolWithBeforeToolCallHook as WrapToolWithBeforeToolCallHookType } from "../pi-tools.before-tool-call.js"; +import type { + ToolOutcomeObserver, + wrapToolWithBeforeToolCallHook as WrapToolWithBeforeToolCallHookType, +} from "../pi-tools.before-tool-call.js"; import type { recordToolCall as RecordToolCallType, recordToolCallOutcome as RecordToolCallOutcomeType, @@ -72,6 +75,7 @@ async function executeWrappedToolOutcome( toolName: string, toolParams: unknown, result: unknown, + onToolOutcome?: ToolOutcomeObserver, runId = baseParams.runId, ): Promise { const tool = wrapToolWithBeforeToolCallHook( @@ -84,6 +88,7 @@ async function executeWrappedToolOutcome( sessionKey: baseParams.sessionKey, sessionId: baseParams.sessionId, runId, + onToolOutcome, }, ); liveToolCallSeq += 1; @@ -159,12 +164,15 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => { // Attempt 2: post-compaction. The live wrapped-tool path records each // outcome while the prompt is still running; the third identical result // aborts before the attempt can return. - mockedRunEmbeddedAttempt.mockImplementationOnce(async () => { + mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => { + const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver }) + .onToolOutcome; for (let i = 0; i < 3; i += 1) { await executeWrappedToolOutcome( "gateway", { action: "lookup", path: "x" }, "identical-result", + onToolOutcome, ); } attemptReturned = true; @@ -200,9 +208,16 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => { // Attempt 2 (post-compaction): identical args, but DIFFERENT result hash // each time. This fills the window without triggering the persisted-loop // abort because the tool is making progress. - mockedRunEmbeddedAttempt.mockImplementationOnce(async () => { + mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => { + const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver }) + .onToolOutcome; for (let i = 0; i < 3; i += 1) { - await executeWrappedToolOutcome("gateway", { action: "lookup", path: "x" }, `result-${i}`); + await executeWrappedToolOutcome( + "gateway", + { action: "lookup", path: "x" }, + `result-${i}`, + onToolOutcome, + ); } return makeAttemptResult({ promptError: null, @@ -235,9 +250,11 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => { // Attempt 2 (post-compaction): two distinct records → window full, // guard disarms with no abort. We then append more identical records // afterwards in this test to confirm they are not observed by the guard. - mockedRunEmbeddedAttempt.mockImplementationOnce(async () => { - await executeWrappedToolOutcome("read", { path: "/a" }, "ra"); - await executeWrappedToolOutcome("write", { path: "/b" }, "rb"); + mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => { + const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver }) + .onToolOutcome; + await executeWrappedToolOutcome("read", { path: "/a" }, "ra", onToolOutcome); + await executeWrappedToolOutcome("write", { path: "/b" }, "rb", onToolOutcome); return makeAttemptResult({ promptError: null, toolMetas: [{ toolName: "read" }, { toolName: "write" }], @@ -293,12 +310,15 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => { // Attempt 2 (post-compaction): three identical live tool outcomes while // history is already at the cap. The guard aborts on the third result // before the mocked attempt can return. - mockedRunEmbeddedAttempt.mockImplementationOnce(async () => { + mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => { + const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver }) + .onToolOutcome; for (let i = 0; i < 3; i += 1) { await executeWrappedToolOutcome( "gateway", { action: "lookup", path: "x" }, "identical-result", + onToolOutcome, ); } // History is still capped at HISTORY_TRIM_CAP after the trim. diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index c5d7495230a..314b2dc9abf 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -94,7 +94,8 @@ import { log } from "./logger.js"; import { resolveModelAsync } from "./model.js"; import { createPostCompactionLoopGuard, - registerPostCompactionLoopGuard, + PostCompactionLoopPersistedError, + type PostCompactionGuardObservation, } from "./post-compaction-loop-guard.js"; import { createEmbeddedRunReplayState, observeReplayMetadata } from "./replay-state.js"; import { handleAssistantFailover } from "./run/assistant-failover.js"; @@ -792,14 +793,14 @@ export async function runEmbeddedPiAgent( const postCompactionGuard = createPostCompactionLoopGuard( params.config?.tools?.loopDetection?.postCompactionGuard, ); - const unregisterPostCompactionGuard = registerPostCompactionLoopGuard( - { - sessionKey: params.sessionKey, - sessionId: params.sessionId, - runId: params.runId, - }, - postCompactionGuard, - ); + const observePostCompactionToolOutcome = ( + observation: PostCompactionGuardObservation, + ): void => { + const verdict = postCompactionGuard.observe(observation); + if (verdict.shouldAbort) { + throw PostCompactionLoopPersistedError.fromVerdict(verdict); + } + }; let lastRetryFailoverReason: FailoverReason | null = null; let planningOnlyRetryInstruction: string | null = null; let reasoningOnlyRetryInstruction: string | null = null; @@ -1160,6 +1161,7 @@ export async function runEmbeddedPiAgent( agentId: workspaceResolution.agentId, legacyBeforeAgentStartResult, thinkLevel, + onToolOutcome: observePostCompactionToolOutcome, fastMode: params.fastMode, verboseLevel: params.verboseLevel, reasoningLevel: params.reasoningLevel, @@ -2786,7 +2788,6 @@ export async function runEmbeddedPiAgent( }; } } finally { - unregisterPostCompactionGuard(); forgetPromptBuildDrainCacheForRun(params.runId); stopRuntimeAuthRefreshTimer(); await runAgentCleanupStep({ diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 372ad4dd00c..28ec416c98b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -928,6 +928,7 @@ export async function runEmbeddedAttempt( forceHeartbeatTool: params.forceHeartbeatTool, authProfileStore: params.authProfileStore, recordToolPrepStage: (name) => corePluginToolStages.mark(name), + onToolOutcome: params.onToolOutcome, onYield: (message) => { yieldDetected = true; yieldMessage = message; @@ -1650,6 +1651,7 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, runId: params.runId, loopDetection: clientToolLoopDetection, + onToolOutcome: params.onToolOutcome, }, ) : []; diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index c5f514b74ea..621b3a02a8b 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -9,6 +9,7 @@ import type { DiagnosticTraceContext } from "../../../infra/diagnostic-trace-con import type { PluginHookBeforeAgentStartResult } from "../../../plugins/hook-before-agent-start.types.js"; import type { AuthProfileStore } from "../../auth-profiles/types.js"; import type { MessagingToolSend } from "../../pi-embedded-messaging.types.js"; +import type { ToolOutcomeObserver } from "../../pi-tools.before-tool-call.js"; import type { AgentRuntimePlan } from "../../runtime-plan/types.js"; import type { ToolErrorSummary } from "../../tool-error-summary.js"; import type { NormalizedUsage } from "../../usage.js"; @@ -40,6 +41,8 @@ export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & { agentHarnessId?: string; /** OpenClaw-owned runtime policy prepared by the orchestrator for this attempt. */ runtimePlan?: AgentRuntimePlan; + /** Live observer called after wrapped tool outcomes are recorded. */ + onToolOutcome?: ToolOutcomeObserver; model: Model; authStorage: AuthStorage; /** Auth profile store already resolved during startup for this attempt. */ diff --git a/src/agents/pi-tools.before-tool-call.ts b/src/agents/pi-tools.before-tool-call.ts index c5d8c95164a..cf30b79ad58 100644 --- a/src/agents/pi-tools.before-tool-call.ts +++ b/src/agents/pi-tools.before-tool-call.ts @@ -26,14 +26,18 @@ import { import { createLazyRuntimeSurface } from "../shared/lazy-runtime.js"; import { isPlainObject } from "../utils.js"; import { copyChannelAgentToolMeta } from "./channel-tools.js"; -import { - observePostCompactionLoopGuard, - PostCompactionLoopPersistedError, -} from "./pi-embedded-runner/post-compaction-loop-guard.js"; import { normalizeToolName } from "./tool-policy.js"; import type { AnyAgentTool } from "./tools/common.js"; import { callGatewayTool } from "./tools/gateway.js"; +export type ToolOutcomeObservation = { + toolName: string; + argsHash: string; + resultHash: string; +}; + +export type ToolOutcomeObserver = (observation: ToolOutcomeObservation) => void; + export type HookContext = { agentId?: string; config?: OpenClawConfig; @@ -43,6 +47,7 @@ export type HookContext = { runId?: string; trace?: DiagnosticTraceContext; loopDetection?: ToolLoopDetectionConfig; + onToolOutcome?: ToolOutcomeObserver; }; type HookBlockedKind = "veto" | "failure"; @@ -376,9 +381,10 @@ async function recordLoopOutcome(args: { result?: unknown; error?: unknown; }): Promise { - if (!args.ctx?.sessionKey) { + if (!args.ctx?.sessionKey && !args.ctx?.sessionId) { return; } + let recordedOutcome: ToolOutcomeObservation | undefined; try { const { getDiagnosticSessionState, recordToolCallOutcome } = await loadBeforeToolCallRuntime(); const sessionState = getDiagnosticSessionState({ @@ -394,29 +400,19 @@ async function recordLoopOutcome(args: { config: args.ctx.loopDetection, ...(args.ctx.runId && { runId: args.ctx.runId }), }); - if (record?.resultHash) { - const verdict = observePostCompactionLoopGuard( - { - sessionKey: args.ctx.sessionKey, - sessionId: args.ctx.sessionId, - runId: args.ctx.runId, - }, - { - toolName: record.toolName, - argsHash: record.argsHash, - resultHash: record.resultHash, - }, - ); - if (verdict?.shouldAbort) { - throw PostCompactionLoopPersistedError.fromVerdict(verdict); - } + if (record?.resultHash && args.ctx.onToolOutcome) { + recordedOutcome = { + toolName: record.toolName, + argsHash: record.argsHash, + resultHash: record.resultHash, + }; } } catch (err) { - if (err instanceof PostCompactionLoopPersistedError) { - throw err; - } log.warn(`tool loop outcome tracking failed: tool=${args.toolName} error=${String(err)}`); } + if (recordedOutcome) { + args.ctx.onToolOutcome?.(recordedOutcome); + } } export async function runBeforeToolCallHook(args: { diff --git a/src/agents/pi-tools.ts b/src/agents/pi-tools.ts index 69f109cfe38..80d78201aa6 100644 --- a/src/agents/pi-tools.ts +++ b/src/agents/pi-tools.ts @@ -27,7 +27,10 @@ import type { ModelAuthMode } from "./model-auth.js"; import { resolveOpenClawPluginToolsForOptions } from "./openclaw-plugin-tools.js"; import { createOpenClawTools } from "./openclaw-tools.js"; import { wrapToolWithAbortSignal } from "./pi-tools.abort.js"; -import { wrapToolWithBeforeToolCallHook } from "./pi-tools.before-tool-call.js"; +import { + type ToolOutcomeObserver, + wrapToolWithBeforeToolCallHook, +} from "./pi-tools.before-tool-call.js"; import { applyDeferredFollowupToolDescriptions } from "./pi-tools.deferred-followup.js"; import { filterToolsByMessageProvider } from "./pi-tools.message-provider-policy.js"; import { @@ -378,6 +381,8 @@ export function createOpenClawCodingTools(options?: { onYield?: (message: string) => Promise | void; /** Optional instrumentation callback for tool preparation stage timing. */ recordToolPrepStage?: (name: string) => void; + /** Live observer called after wrapped tool outcomes are recorded. */ + onToolOutcome?: ToolOutcomeObserver; }): AnyAgentTool[] { const execToolName = "exec"; const sandbox = options?.sandbox?.enabled ? options.sandbox : undefined; @@ -838,6 +843,7 @@ export function createOpenClawCodingTools(options?: { runId: options?.runId, ...(options?.trace ? { trace: options.trace } : {}), loopDetection: resolveToolLoopDetectionConfig({ cfg: options?.config, agentId }), + onToolOutcome: options?.onToolOutcome, }), ); options?.recordToolPrepStage?.("tool-hooks");