From 7d8ffdcfe528bc454caddfa69cfc5a17a5e30c40 Mon Sep 17 00:00:00 2001 From: Eva Date: Fri, 1 May 2026 20:02:06 +0700 Subject: [PATCH] fix: bound finalize lifecycle cleanup --- .../harness/lifecycle-hook-helpers.test.ts | 26 +++++++++ src/agents/harness/lifecycle-hook-helpers.ts | 7 ++- .../run-context-lifecycle.contract.test.ts | 8 +-- src/plugins/host-hook-runtime.ts | 55 +++++++++++++++---- 4 files changed, 81 insertions(+), 15 deletions(-) diff --git a/src/agents/harness/lifecycle-hook-helpers.test.ts b/src/agents/harness/lifecycle-hook-helpers.test.ts index 6b559691e94..8b27996baf9 100644 --- a/src/agents/harness/lifecycle-hook-helpers.test.ts +++ b/src/agents/harness/lifecycle-hook-helpers.test.ts @@ -187,4 +187,30 @@ describe("agent harness lifecycle hook helpers", () => { }), ).resolves.toEqual({ action: "revise", reason: "revise from context run" }); }); + + it("preserves merged revise reasons when retry metadata is present", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + reason: "fix generated baseline\n\nrerun the focused tests", + retry: { + instruction: "rerun the focused tests", + idempotencyKey: "merged-reason", + maxAttempts: 1, + }, + }), + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ + action: "revise", + reason: "fix generated baseline\n\nrerun the focused tests", + }); + }); }); diff --git a/src/agents/harness/lifecycle-hook-helpers.ts b/src/agents/harness/lifecycle-hook-helpers.ts index a6059394f87..12e9b0f2845 100644 --- a/src/agents/harness/lifecycle-hook-helpers.ts +++ b/src/agents/harness/lifecycle-hook-helpers.ts @@ -161,7 +161,12 @@ function normalizeBeforeAgentFinalizeResult( if (nextCount > maxAttempts) { return { action: "continue" }; } - return { action: "revise", reason: retryInstruction }; + const reason = result.reason?.trim(); + const revisedReason = + reason && reason.includes(retryInstruction) + ? reason + : [reason, retryInstruction].filter(Boolean).join("\n\n"); + return { action: "revise", reason: revisedReason }; } const reason = result.reason?.trim(); return reason ? { action: "revise", reason } : { action: "continue" }; diff --git a/src/plugins/contracts/run-context-lifecycle.contract.test.ts b/src/plugins/contracts/run-context-lifecycle.contract.test.ts index f529e36c3b8..16670ea3c18 100644 --- a/src/plugins/contracts/run-context-lifecycle.contract.test.ts +++ b/src/plugins/contracts/run-context-lifecycle.contract.test.ts @@ -165,7 +165,7 @@ describe("plugin run context lifecycle", () => { ).toBeUndefined(); }); - it("keeps run context until slow terminal event subscriptions settle", async () => { + it("clears run context after the terminal subscription grace period", async () => { vi.useFakeTimers(); let releaseTerminalHandler: (() => void) | undefined; let terminalHandlerSawContext: unknown; @@ -221,13 +221,13 @@ describe("plugin run context lifecycle", () => { pluginId: "slow-terminal-subscription", get: { runId: "run-slow-terminal", namespace: "seen" }, }), - ).toEqual({ runId: "run-slow-terminal" }); + ).toBeUndefined(); releaseTerminalHandler?.(); await vi.advanceTimersByTimeAsync(0); - expect(terminalHandlerSawContext).toEqual({ runId: "run-slow-terminal" }); - expect(terminalHandlerWroteContext).toEqual({ completed: true }); + expect(terminalHandlerSawContext).toBeUndefined(); + expect(terminalHandlerWroteContext).toBeUndefined(); expect( getPluginRunContext({ pluginId: "slow-terminal-subscription", diff --git a/src/plugins/host-hook-runtime.ts b/src/plugins/host-hook-runtime.ts index 4b8fe5b3bea..7005cf9423d 100644 --- a/src/plugins/host-hook-runtime.ts +++ b/src/plugins/host-hook-runtime.ts @@ -30,6 +30,7 @@ type PluginHostRuntimeState = { nextSchedulerJobGeneration: number; pendingAgentEventHandlersByRunId: Map>>; closedRunIds: Set; + terminalEventCleanupExpiredRunIds: Set; }; const PLUGIN_HOST_RUNTIME_STATE_KEY = Symbol.for("openclaw.pluginHostRuntimeState"); @@ -44,6 +45,7 @@ function getPluginHostRuntimeState(): PluginHostRuntimeState { nextSchedulerJobGeneration: 1, pendingAgentEventHandlersByRunId: new Map(), closedRunIds: new Set(), + terminalEventCleanupExpiredRunIds: new Set(), })); } @@ -57,6 +59,7 @@ function copyJsonValue(value: PluginJsonValue): PluginJsonValue { function markPluginRunClosed(runId: string): void { const state = getPluginHostRuntimeState(); + state.terminalEventCleanupExpiredRunIds.delete(runId); state.closedRunIds.delete(runId); state.closedRunIds.add(runId); while (state.closedRunIds.size > CLOSED_RUN_IDS_MAX) { @@ -72,6 +75,23 @@ function isPluginRunClosed(runId: string): boolean { return getPluginHostRuntimeState().closedRunIds.has(runId); } +function markTerminalEventCleanupExpired(runId: string): void { + const state = getPluginHostRuntimeState(); + state.terminalEventCleanupExpiredRunIds.delete(runId); + state.terminalEventCleanupExpiredRunIds.add(runId); + while (state.terminalEventCleanupExpiredRunIds.size > CLOSED_RUN_IDS_MAX) { + const oldest = state.terminalEventCleanupExpiredRunIds.values().next().value; + if (oldest === undefined) { + break; + } + state.terminalEventCleanupExpiredRunIds.delete(oldest); + } +} + +function isTerminalEventCleanupExpired(runId: string): boolean { + return getPluginHostRuntimeState().terminalEventCleanupExpiredRunIds.has(runId); +} + function trackAgentEventHandler(runId: string, pending: Promise): void { const state = getPluginHostRuntimeState(); const handlers = state.pendingAgentEventHandlersByRunId.get(runId) ?? new Set(); @@ -85,17 +105,28 @@ function trackAgentEventHandler(runId: string, pending: Promise): void { }); } -function waitForTerminalEventHandlers(pendingHandlers: Set>): Promise { +function waitForTerminalEventHandlers(params: { + runId: string; + pendingHandlers: Set>; +}): Promise { + const { pendingHandlers, runId } = params; if (pendingHandlers.size === 0) { return Promise.resolve(); } - let timeout: NodeJS.Timeout | undefined = setTimeout(() => { - log.warn( - `plugin terminal agent event subscriptions still running after ${PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS}ms; preserving run context until they settle`, - ); - }, PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS); - timeout.unref?.(); - return Promise.allSettled(pendingHandlers).then(() => { + let timeout: NodeJS.Timeout | undefined; + const settled = Promise.allSettled(pendingHandlers).then(() => "settled" as const); + const timedOut = new Promise<"timeout">((resolve) => { + timeout = setTimeout(() => { + markTerminalEventCleanupExpired(runId); + getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.delete(runId); + log.warn( + `plugin terminal agent event subscriptions still running after ${PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS}ms; clearing run context without waiting for them to settle`, + ); + resolve("timeout"); + }, PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS); + }); + timeout?.unref?.(); + return Promise.race([settled, timedOut]).then(() => { if (timeout) { clearTimeout(timeout); timeout = undefined; @@ -268,7 +299,7 @@ export function dispatchPluginAgentEventSubscriptions(params: { setPluginRunContext({ pluginId, patch: { runId, namespace, value }, - allowClosedRun: isTerminalEvent && handlerActive, + allowClosedRun: isTerminalEvent && handlerActive && !isTerminalEventCleanupExpired(runId), }); }, clearRunContext: (namespace?: string) => { @@ -305,7 +336,10 @@ export function dispatchPluginAgentEventSubscriptions(params: { const pendingForRun = getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(params.event.runId) ?? new Set(pendingHandlers); - void waitForTerminalEventHandlers(new Set(pendingForRun)).then(() => { + void waitForTerminalEventHandlers({ + runId: params.event.runId, + pendingHandlers: new Set(pendingForRun), + }).then(() => { clearPluginRunContext({ runId: params.event.runId }); }); } @@ -531,6 +565,7 @@ export function clearPluginHostRuntimeState(params?: { pluginId?: string; runId? state.schedulerJobsByPlugin.clear(); state.pendingAgentEventHandlersByRunId.clear(); state.closedRunIds.clear(); + state.terminalEventCleanupExpiredRunIds.clear(); } }