From cb097004eb5d92bc3d4619e40b839a568e39fa9a Mon Sep 17 00:00:00 2001 From: Eva Date: Fri, 1 May 2026 17:40:09 +0700 Subject: [PATCH] fix(plugin-sdk): harden run context cleanup lifecycle --- docs/plugins/hooks.md | 16 + .../harness/lifecycle-hook-helpers.test.ts | 139 ++++- src/agents/harness/lifecycle-hook-helpers.ts | 74 ++- .../run-context-lifecycle.contract.test.ts | 509 ++++++++++++++++++ src/plugins/hook-types.ts | 5 + .../hooks.before-agent-finalize.test.ts | 37 ++ src/plugins/hooks.ts | 21 +- src/plugins/host-hook-cleanup-timeout.test.ts | 36 ++ src/plugins/host-hook-cleanup-timeout.ts | 30 ++ src/plugins/host-hook-cleanup.ts | 145 +++-- src/plugins/host-hook-runtime.ts | 103 +++- 11 files changed, 1030 insertions(+), 85 deletions(-) create mode 100644 src/plugins/contracts/run-context-lifecycle.contract.test.ts create mode 100644 src/plugins/host-hook-cleanup-timeout.test.ts create mode 100644 src/plugins/host-hook-cleanup-timeout.ts diff --git a/docs/plugins/hooks.md b/docs/plugins/hooks.md index 094cf4d651d..b0e928ea50c 100644 --- a/docs/plugins/hooks.md +++ b/docs/plugins/hooks.md @@ -264,6 +264,22 @@ the harness for one more model pass before finalization, `{ action: Codex native `Stop` hooks are relayed into this hook as OpenClaw `before_agent_finalize` decisions. +When returning `action: "revise"`, plugins can include `retry` metadata to make +the extra model pass bounded and replay-safe: + +```typescript +type BeforeAgentFinalizeRetry = { + instruction: string; + idempotencyKey?: string; + maxAttempts?: number; +}; +``` + +`instruction` is appended to the revision reason sent to the harness. +`idempotencyKey` lets the host count retries for the same plugin request across +equivalent finalize decisions, and `maxAttempts` caps how many extra passes the +host will allow before continuing with the natural final answer. + Non-bundled plugins that need `llm_input`, `llm_output`, `before_agent_finalize`, or `agent_end` must set: diff --git a/src/agents/harness/lifecycle-hook-helpers.test.ts b/src/agents/harness/lifecycle-hook-helpers.test.ts index 1c2e4cdb24c..6b559691e94 100644 --- a/src/agents/harness/lifecycle-hook-helpers.test.ts +++ b/src/agents/harness/lifecycle-hook-helpers.test.ts @@ -1,5 +1,6 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { + clearAgentHarnessFinalizeRetryBudget, runAgentHarnessAgentEndHook, runAgentHarnessBeforeAgentFinalizeHook, runAgentHarnessLlmInputHook, @@ -10,7 +11,24 @@ const legacyHookRunner = { hasHooks: () => true, }; +const EVENT = { + runId: "run-1", + sessionId: "session-1", + sessionKey: "agent:main:session-1", + turnId: "turn-1", + provider: "codex", + model: "gpt-5.4", + cwd: "/repo", + transcriptPath: "/tmp/session.jsonl", + stopHookActive: false, + lastAssistantMessage: "done", +}; + describe("agent harness lifecycle hook helpers", () => { + afterEach(() => { + clearAgentHarnessFinalizeRetryBudget(); + }); + it("ignores legacy hook runners that advertise llm_input without a runner method", () => { expect(() => runAgentHarnessLlmInputHook({ @@ -50,4 +68,123 @@ describe("agent harness lifecycle hook helpers", () => { } as never), ).resolves.toEqual({ action: "continue" }); }); + + it("clears finalize retry budgets by run id", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + retry: { + instruction: "revise once", + idempotencyKey: "stable", + maxAttempts: 1, + }, + }), + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise once" }); + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "continue" }); + + clearAgentHarnessFinalizeRetryBudget({ runId: "run-1" }); + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise once" }); + }); + + it("does not clear finalize retry budgets for runs that only share a prefix", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + retry: { + instruction: "revise child once", + idempotencyKey: "stable", + maxAttempts: 1, + }, + }), + }; + const childEvent = { + ...EVENT, + runId: "run:child", + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: childEvent, + ctx: { runId: "run:child", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise child once" }); + + clearAgentHarnessFinalizeRetryBudget({ runId: "run" }); + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: childEvent, + ctx: { runId: "run:child", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "continue" }); + }); + + it("keys finalize retry budgets by context run id when the event omits run id", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + retry: { + instruction: "revise from context run", + idempotencyKey: "stable", + maxAttempts: 1, + }, + }), + }; + const eventWithoutRunId = { + ...EVENT, + runId: undefined, + sessionId: "shared-session", + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: eventWithoutRunId, + ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise from context run" }); + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: eventWithoutRunId, + ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "continue" }); + + clearAgentHarnessFinalizeRetryBudget({ runId: "run-from-context" }); + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: eventWithoutRunId, + ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise from context run" }); + }); }); diff --git a/src/agents/harness/lifecycle-hook-helpers.ts b/src/agents/harness/lifecycle-hook-helpers.ts index db35af577ef..a6059394f87 100644 --- a/src/agents/harness/lifecycle-hook-helpers.ts +++ b/src/agents/harness/lifecycle-hook-helpers.ts @@ -7,11 +7,53 @@ import type { PluginHookLlmInputEvent, PluginHookLlmOutputEvent, } from "../../plugins/hook-types.js"; +import { resolveGlobalSingleton } from "../../shared/global-singleton.js"; import { buildAgentHookContext, type AgentHarnessHookContext } from "./hook-context.js"; const log = createSubsystemLogger("agents/harness"); +const FINALIZE_RETRY_BUDGET_KEY = Symbol.for("openclaw.pluginFinalizeRetryBudget"); +const FINALIZE_RETRY_BUDGET_MAX_ENTRIES = 2048; type AgentHarnessHookRunner = ReturnType; +type FinalizeRetryBudget = Map>; + +function getFinalizeRetryBudget(): FinalizeRetryBudget { + return resolveGlobalSingleton(FINALIZE_RETRY_BUDGET_KEY, () => new Map()); +} + +function countFinalizeRetryBudgetEntries(budget: FinalizeRetryBudget): number { + let count = 0; + for (const runBudget of budget.values()) { + count += runBudget.size; + } + return count; +} + +function pruneFinalizeRetryBudget(budget: FinalizeRetryBudget): void { + while (countFinalizeRetryBudgetEntries(budget) > FINALIZE_RETRY_BUDGET_MAX_ENTRIES) { + const oldestRunId = budget.keys().next().value; + if (oldestRunId === undefined) { + return; + } + const oldestRunBudget = budget.get(oldestRunId); + const oldestRetryKey = oldestRunBudget?.keys().next().value; + if (oldestRunBudget && oldestRetryKey !== undefined) { + oldestRunBudget.delete(oldestRetryKey); + } + if (!oldestRunBudget || oldestRunBudget.size === 0) { + budget.delete(oldestRunId); + } + } +} + +export function clearAgentHarnessFinalizeRetryBudget(params?: { runId?: string }): void { + const budget = getFinalizeRetryBudget(); + if (!params?.runId) { + budget.clear(); + return; + } + budget.delete(params.runId); +} export function runAgentHarnessLlmInputHook(params: { event: PluginHookLlmInputEvent; @@ -73,8 +115,16 @@ export async function runAgentHarnessBeforeAgentFinalizeHook(params: { return { action: "continue" }; } try { + const eventForNormalization: PluginHookBeforeAgentFinalizeEvent = { + ...params.event, + runId: params.event.runId ?? params.ctx.runId, + }; return normalizeBeforeAgentFinalizeResult( - await hookRunner.runBeforeAgentFinalize(params.event, buildAgentHookContext(params.ctx)), + await hookRunner.runBeforeAgentFinalize( + eventForNormalization, + buildAgentHookContext(params.ctx), + ), + eventForNormalization, ); } catch (error) { log.warn(`before_agent_finalize hook failed: ${String(error)}`); @@ -84,6 +134,7 @@ export async function runAgentHarnessBeforeAgentFinalizeHook(params: { function normalizeBeforeAgentFinalizeResult( result: PluginHookBeforeAgentFinalizeResult | undefined, + event?: PluginHookBeforeAgentFinalizeEvent, ): AgentHarnessBeforeAgentFinalizeOutcome { if (result?.action === "finalize") { return result.reason?.trim() @@ -91,6 +142,27 @@ function normalizeBeforeAgentFinalizeResult( : { action: "finalize" }; } if (result?.action === "revise") { + const retryInstruction = result.retry?.instruction?.trim(); + if (retryInstruction) { + const maxAttempts = + typeof result.retry?.maxAttempts === "number" && Number.isFinite(result.retry.maxAttempts) + ? Math.max(1, Math.floor(result.retry.maxAttempts)) + : 1; + const retryRunId = event?.runId ?? event?.sessionId ?? "unknown-run"; + const retryKey = result.retry?.idempotencyKey?.trim() || retryInstruction.slice(0, 160); + const budget = getFinalizeRetryBudget(); + const runBudget = budget.get(retryRunId) ?? new Map(); + const nextCount = (runBudget.get(retryKey) ?? 0) + 1; + runBudget.delete(retryKey); + runBudget.set(retryKey, nextCount); + budget.delete(retryRunId); + budget.set(retryRunId, runBudget); + pruneFinalizeRetryBudget(budget); + if (nextCount > maxAttempts) { + return { action: "continue" }; + } + return { action: "revise", reason: retryInstruction }; + } const reason = result.reason?.trim(); return reason ? { action: "revise", reason } : { action: "continue" }; } diff --git a/src/plugins/contracts/run-context-lifecycle.contract.test.ts b/src/plugins/contracts/run-context-lifecycle.contract.test.ts new file mode 100644 index 00000000000..f529e36c3b8 --- /dev/null +++ b/src/plugins/contracts/run-context-lifecycle.contract.test.ts @@ -0,0 +1,509 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { + createPluginRegistryFixture, + registerTestPlugin, +} from "openclaw/plugin-sdk/plugin-test-contracts"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { loadSessionStore, updateSessionStore } from "../../config/sessions.js"; +import { withTempConfig } from "../../gateway/test-temp-config.js"; +import { emitAgentEvent, resetAgentEventsForTest } from "../../infra/agent-events.js"; +import { resolvePreferredOpenClawTmpDir } from "../../infra/tmp-openclaw-dir.js"; +import { PLUGIN_HOST_CLEANUP_TIMEOUT_MS } from "../host-hook-cleanup-timeout.js"; +import { runPluginHostCleanup } from "../host-hook-cleanup.js"; +import { + clearPluginHostRuntimeState, + getPluginRunContext, + listPluginSessionSchedulerJobs, + PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS, + dispatchPluginAgentEventSubscriptions, + registerPluginSessionSchedulerJob, + setPluginRunContext, +} from "../host-hook-runtime.js"; +import { createEmptyPluginRegistry } from "../registry-empty.js"; +import { setActivePluginRegistry } from "../runtime.js"; +import { createPluginRecord } from "../status.test-helpers.js"; + +async function waitForPluginEventHandlers(): Promise { + await new Promise((resolve) => { + setTimeout(resolve, 0); + }); +} + +describe("plugin run context lifecycle", () => { + afterEach(() => { + vi.useRealTimers(); + setActivePluginRegistry(createEmptyPluginRegistry()); + clearPluginHostRuntimeState(); + resetAgentEventsForTest(); + }); + + it("does not let delayed non-terminal subscriptions resurrect closed run context", async () => { + let releaseToolHandler: (() => void) | undefined; + let delayedToolHandlerSawContext: unknown; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "delayed-subscription", + name: "Delayed Subscription", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "delayed", + streams: ["tool"], + async handle(_event, ctx) { + ctx.setRunContext("before-terminal", { visible: true }); + await new Promise((resolve) => { + releaseToolHandler = resolve; + }); + delayedToolHandlerSawContext = ctx.getRunContext("before-terminal"); + ctx.setRunContext("late", { resurrected: true }); + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-delayed-subscription", + stream: "tool", + data: { name: "tool" }, + }); + await Promise.resolve(); + + emitAgentEvent({ + runId: "run-delayed-subscription", + stream: "lifecycle", + data: { phase: "end" }, + }); + await Promise.resolve(); + + expect( + getPluginRunContext({ + pluginId: "delayed-subscription", + get: { runId: "run-delayed-subscription", namespace: "before-terminal" }, + }), + ).toEqual({ visible: true }); + + releaseToolHandler?.(); + await waitForPluginEventHandlers(); + + expect(delayedToolHandlerSawContext).toEqual({ visible: true }); + expect( + getPluginRunContext({ + pluginId: "delayed-subscription", + get: { runId: "run-delayed-subscription", namespace: "late" }, + }), + ).toBeUndefined(); + }); + + it("preserves run context until async terminal event subscriptions settle", async () => { + let releaseTerminalHandler: (() => void) | undefined; + let terminalHandlerSawContext: unknown; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "async-terminal-subscription", + name: "Async Terminal Subscription", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "records", + streams: ["tool", "lifecycle"], + async handle(event, ctx) { + if (event.stream === "tool") { + ctx.setRunContext("seen", { runId: event.runId }); + return; + } + if (event.data?.phase !== "end") { + return; + } + await new Promise((resolve) => { + releaseTerminalHandler = resolve; + }); + terminalHandlerSawContext = ctx.getRunContext("seen"); + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-async-terminal", + stream: "tool", + data: { name: "tool" }, + }); + await Promise.resolve(); + + emitAgentEvent({ + runId: "run-async-terminal", + stream: "lifecycle", + data: { phase: "end" }, + }); + await Promise.resolve(); + + expect( + getPluginRunContext({ + pluginId: "async-terminal-subscription", + get: { runId: "run-async-terminal", namespace: "seen" }, + }), + ).toEqual({ runId: "run-async-terminal" }); + + releaseTerminalHandler?.(); + await waitForPluginEventHandlers(); + + expect(terminalHandlerSawContext).toEqual({ runId: "run-async-terminal" }); + expect( + getPluginRunContext({ + pluginId: "async-terminal-subscription", + get: { runId: "run-async-terminal", namespace: "seen" }, + }), + ).toBeUndefined(); + }); + + it("keeps run context until slow terminal event subscriptions settle", async () => { + vi.useFakeTimers(); + let releaseTerminalHandler: (() => void) | undefined; + let terminalHandlerSawContext: unknown; + let terminalHandlerWroteContext: unknown; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "slow-terminal-subscription", + name: "Slow Terminal Subscription", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "slow", + streams: ["tool", "lifecycle"], + async handle(event, ctx) { + if (event.stream === "tool") { + ctx.setRunContext("seen", { runId: event.runId }); + return; + } + if (event.data?.phase === "end") { + await new Promise((resolve) => { + releaseTerminalHandler = resolve; + }); + terminalHandlerSawContext = ctx.getRunContext("seen"); + ctx.setRunContext("terminal", { completed: true }); + terminalHandlerWroteContext = ctx.getRunContext("terminal"); + } + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-slow-terminal", + stream: "tool", + data: { name: "tool" }, + }); + await Promise.resolve(); + + emitAgentEvent({ + runId: "run-slow-terminal", + stream: "lifecycle", + data: { phase: "end" }, + }); + await Promise.resolve(); + + await vi.advanceTimersByTimeAsync(PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS); + expect( + getPluginRunContext({ + pluginId: "slow-terminal-subscription", + get: { runId: "run-slow-terminal", namespace: "seen" }, + }), + ).toEqual({ runId: "run-slow-terminal" }); + + releaseTerminalHandler?.(); + await vi.advanceTimersByTimeAsync(0); + + expect(terminalHandlerSawContext).toEqual({ runId: "run-slow-terminal" }); + expect(terminalHandlerWroteContext).toEqual({ completed: true }); + expect( + getPluginRunContext({ + pluginId: "slow-terminal-subscription", + get: { runId: "run-slow-terminal", namespace: "seen" }, + }), + ).toBeUndefined(); + expect( + getPluginRunContext({ + pluginId: "slow-terminal-subscription", + get: { runId: "run-slow-terminal", namespace: "terminal" }, + }), + ).toBeUndefined(); + }); + + it("preserves scheduler jobs instead of invoking stale cleanup callbacks", async () => { + const cleanup = vi.fn(); + registerPluginSessionSchedulerJob({ + pluginId: "scheduler-plugin", + pluginName: "Scheduler Plugin", + job: { + id: "job-preserved", + sessionKey: "agent:main:main", + kind: "session-turn", + cleanup, + }, + }); + + await expect( + runPluginHostCleanup({ + reason: "disable", + pluginId: "scheduler-plugin", + preserveSchedulerJobIds: new Set(["job-preserved"]), + }), + ).resolves.toMatchObject({ failures: [] }); + expect(cleanup).not.toHaveBeenCalled(); + expect(listPluginSessionSchedulerJobs("scheduler-plugin")).toHaveLength(1); + }); + + it("preserves plugin run context during restart cleanup", async () => { + const registry = createEmptyPluginRegistry(); + expect( + setPluginRunContext({ + pluginId: "restart-context-plugin", + patch: { runId: "run-restart", namespace: "state", value: { keep: true } }, + }), + ).toBe(true); + + await expect( + runPluginHostCleanup({ + registry, + pluginId: "restart-context-plugin", + reason: "restart", + }), + ).resolves.toMatchObject({ failures: [] }); + expect( + getPluginRunContext({ + pluginId: "restart-context-plugin", + get: { runId: "run-restart", namespace: "state" }, + }), + ).toEqual({ keep: true }); + + await expect( + runPluginHostCleanup({ + registry, + pluginId: "restart-context-plugin", + reason: "disable", + }), + ).resolves.toMatchObject({ failures: [] }); + expect( + getPluginRunContext({ + pluginId: "restart-context-plugin", + get: { runId: "run-restart", namespace: "state" }, + }), + ).toBeUndefined(); + }); + + it("preserves durable plugin session state during plugin restart cleanup", async () => { + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "restart-state-fixture", + name: "Restart State Fixture", + }), + register(api) { + api.registerSessionExtension({ + namespace: "workflow", + description: "restart state test", + }); + }, + }); + + const stateDir = await fs.mkdtemp( + path.join(resolvePreferredOpenClawTmpDir(), "openclaw-run-context-restart-state-"), + ); + const storePath = path.join(stateDir, "sessions.json"); + const tempConfig = { + session: { store: storePath }, + }; + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + try { + process.env.OPENCLAW_STATE_DIR = stateDir; + await withTempConfig({ + cfg: tempConfig, + run: async () => { + await updateSessionStore(storePath, (store) => { + store["agent:main:main"] = { + sessionId: "session-1", + updatedAt: Date.now(), + pluginExtensions: { + "restart-state-fixture": { workflow: { state: "waiting" } }, + }, + pluginNextTurnInjections: { + "restart-state-fixture": [ + { + id: "resume", + pluginId: "restart-state-fixture", + text: "resume", + placement: "prepend_context", + createdAt: 1, + }, + ], + }, + }; + return undefined; + }); + + await expect( + runPluginHostCleanup({ + cfg: tempConfig, + registry: registry.registry, + pluginId: "restart-state-fixture", + reason: "restart", + }), + ).resolves.toMatchObject({ failures: [] }); + + const stored = loadSessionStore(storePath, { skipCache: true }); + expect(stored["agent:main:main"]?.pluginExtensions).toEqual({ + "restart-state-fixture": { workflow: { state: "waiting" } }, + }); + expect(stored["agent:main:main"]?.pluginNextTurnInjections).toEqual({ + "restart-state-fixture": [ + { + id: "resume", + pluginId: "restart-state-fixture", + text: "resume", + placement: "prepend_context", + createdAt: 1, + }, + ], + }); + }, + }); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fs.rm(stateDir, { recursive: true, force: true }); + } + }); + + it("rejects hung cleanup hooks with a bounded timeout", async () => { + vi.useFakeTimers(); + const cleanup = vi.fn(async () => { + await new Promise(() => undefined); + }); + registerPluginSessionSchedulerJob({ + pluginId: "hung-cleanup-plugin", + pluginName: "Hung Cleanup Plugin", + job: { + id: "job-hung", + sessionKey: "agent:main:main", + kind: "session-turn", + cleanup, + }, + }); + + const resultPromise = runPluginHostCleanup({ + reason: "disable", + pluginId: "hung-cleanup-plugin", + }); + await vi.advanceTimersByTimeAsync(0); + expect(cleanup).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(5_000); + await expect(resultPromise).resolves.toMatchObject({ + failures: [ + { + pluginId: "hung-cleanup-plugin", + hookId: "scheduler:job-hung", + }, + ], + }); + }); + + it("bounds session, runtime, and scheduler cleanup callbacks so cleanup keeps moving", async () => { + vi.useFakeTimers(); + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "hanging-cleanup-fixture", + name: "Hanging Cleanup Fixture", + }), + register(api) { + api.registerSessionExtension({ + namespace: "state", + description: "hangs during cleanup", + cleanup: () => new Promise(() => undefined), + }); + api.registerRuntimeLifecycle({ + id: "runtime-cleanup", + cleanup: () => new Promise(() => undefined), + }); + api.registerSessionSchedulerJob({ + id: "scheduler-cleanup", + sessionKey: "agent:main:main", + kind: "monitor", + cleanup: () => new Promise(() => undefined), + }); + }, + }); + + const cleanupPromise = runPluginHostCleanup({ + cfg: config, + registry: registry.registry, + pluginId: "hanging-cleanup-fixture", + reason: "delete", + }); + for (let index = 0; index < 3; index += 1) { + await vi.advanceTimersByTimeAsync(PLUGIN_HOST_CLEANUP_TIMEOUT_MS + 1); + } + await expect(cleanupPromise).resolves.toMatchObject({ + failures: [ + expect.objectContaining({ + pluginId: "hanging-cleanup-fixture", + hookId: "session:state", + }), + expect.objectContaining({ + pluginId: "hanging-cleanup-fixture", + hookId: "runtime:runtime-cleanup", + }), + expect.objectContaining({ + pluginId: "hanging-cleanup-fixture", + hookId: "scheduler:scheduler-cleanup", + }), + ], + }); + }); + + it("blocks setting run context after a run is closed", () => { + expect( + setPluginRunContext({ + pluginId: "closed-run-plugin", + patch: { runId: "run-closed", namespace: "state", value: { before: true } }, + }), + ).toBe(true); + dispatchPluginAgentEventSubscriptions({ + registry: createEmptyPluginRegistry(), + event: { + runId: "run-closed", + seq: 1, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "end" }, + }, + }); + + expect( + setPluginRunContext({ + pluginId: "closed-run-plugin", + patch: { runId: "run-closed", namespace: "state", value: { after: true } }, + }), + ).toBe(false); + }); +}); diff --git a/src/plugins/hook-types.ts b/src/plugins/hook-types.ts index ae8894cfc9a..04e41afe370 100644 --- a/src/plugins/hook-types.ts +++ b/src/plugins/hook-types.ts @@ -299,6 +299,11 @@ export type PluginHookBeforeAgentFinalizeResult = { */ action?: "continue" | "revise" | "finalize"; reason?: string; + retry?: { + instruction: string; + idempotencyKey?: string; + maxAttempts?: number; + }; }; export type PluginHookBeforeCompactionEvent = { diff --git a/src/plugins/hooks.before-agent-finalize.test.ts b/src/plugins/hooks.before-agent-finalize.test.ts index 3db857b739e..e6b60f0bae8 100644 --- a/src/plugins/hooks.before-agent-finalize.test.ts +++ b/src/plugins/hooks.before-agent-finalize.test.ts @@ -60,6 +60,43 @@ describe("before_agent_finalize hook runner", () => { }); }); + it("skips empty retry instructions when merging revise decisions", async () => { + const runner = createHookRunner( + createMockPluginRegistry([ + { + hookName: "before_agent_finalize", + handler: vi.fn().mockResolvedValue({ + action: "revise", + reason: "needs a retry but forgot the instruction", + retry: { instruction: " ", idempotencyKey: "empty-retry" }, + }), + }, + { + hookName: "before_agent_finalize", + handler: vi.fn().mockResolvedValue({ + action: "revise", + reason: "rerun the focused tests", + retry: { + instruction: " rerun the focused tests ", + idempotencyKey: "valid-retry", + maxAttempts: 1, + }, + }), + }, + ]), + ); + + await expect(runner.runBeforeAgentFinalize(EVENT, TEST_PLUGIN_AGENT_CTX)).resolves.toEqual({ + action: "revise", + reason: "needs a retry but forgot the instruction\n\nrerun the focused tests", + retry: { + instruction: "rerun the focused tests", + idempotencyKey: "valid-retry", + maxAttempts: 1, + }, + }); + }); + it("lets finalize override earlier revise decisions", async () => { const runner = createHookRunner( createMockPluginRegistry([ diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index 4fb45c2db9f..2c81893fb12 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -319,6 +319,18 @@ export function createHookRunner( acc: PluginHookBeforeAgentFinalizeResult | undefined, next: PluginHookBeforeAgentFinalizeResult, ): PluginHookBeforeAgentFinalizeResult => { + const normalizeRetry = ( + retry: PluginHookBeforeAgentFinalizeResult["retry"] | undefined, + ): PluginHookBeforeAgentFinalizeResult["retry"] | undefined => { + const instruction = retry?.instruction.trim(); + if (!instruction) { + return undefined; + } + return { + ...retry, + instruction, + }; + }; if (acc?.action === "finalize") { return acc; } @@ -326,19 +338,26 @@ export function createHookRunner( return { action: "finalize", reason: next.reason }; } if (acc?.action === "revise" && next.action === "revise") { + const retry = normalizeRetry(acc.retry) ?? normalizeRetry(next.retry); return { action: "revise", reason: concatOptionalTextSegments({ left: acc.reason, right: next.reason, }), + ...(retry ? { retry } : {}), }; } if (acc?.action === "revise") { return acc; } if (next.action === "revise") { - return { action: "revise", reason: next.reason }; + const retry = normalizeRetry(next.retry); + return { + action: "revise", + reason: next.reason, + ...(retry ? { retry } : {}), + }; } return next.action === "continue" ? { action: "continue", reason: next.reason } : (acc ?? next); }; diff --git a/src/plugins/host-hook-cleanup-timeout.test.ts b/src/plugins/host-hook-cleanup-timeout.test.ts new file mode 100644 index 00000000000..671e6bc297e --- /dev/null +++ b/src/plugins/host-hook-cleanup-timeout.test.ts @@ -0,0 +1,36 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + PLUGIN_HOST_CLEANUP_TIMEOUT_MS, + withPluginHostCleanupTimeout, +} from "./host-hook-cleanup-timeout.js"; + +describe("withPluginHostCleanupTimeout", () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("unrefs cleanup timeout timers so pending cleanup does not keep the process alive", async () => { + const originalSetTimeout = globalThis.setTimeout; + const unref = vi.fn(); + + vi.spyOn(globalThis, "setTimeout").mockImplementation((( + callback: () => void, + timeout?: number, + ) => { + const timer = originalSetTimeout(callback, timeout); + vi.spyOn(timer, "unref").mockImplementation(() => { + unref(); + return timer; + }); + return timer; + }) as typeof setTimeout); + + await expect(withPluginHostCleanupTimeout("fast-cleanup", () => "ok")).resolves.toBe("ok"); + + expect(globalThis.setTimeout).toHaveBeenCalledWith( + expect.any(Function), + PLUGIN_HOST_CLEANUP_TIMEOUT_MS, + ); + expect(unref).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/plugins/host-hook-cleanup-timeout.ts b/src/plugins/host-hook-cleanup-timeout.ts new file mode 100644 index 00000000000..7b992664dd5 --- /dev/null +++ b/src/plugins/host-hook-cleanup-timeout.ts @@ -0,0 +1,30 @@ +export const PLUGIN_HOST_CLEANUP_TIMEOUT_MS = 5_000; + +export class PluginHostCleanupTimeoutError extends Error { + constructor(hookId: string) { + super(`plugin host cleanup timed out: ${hookId}`); + this.name = "PluginHostCleanupTimeoutError"; + } +} + +export async function withPluginHostCleanupTimeout( + hookId: string, + cleanup: () => T | Promise, +): Promise { + let timeout: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + Promise.resolve().then(cleanup), + new Promise((_, reject) => { + timeout = setTimeout(() => { + reject(new PluginHostCleanupTimeoutError(hookId)); + }, PLUGIN_HOST_CLEANUP_TIMEOUT_MS); + timeout.unref?.(); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } +} diff --git a/src/plugins/host-hook-cleanup.ts b/src/plugins/host-hook-cleanup.ts index 1ff2aca7036..913fa5e27d5 100644 --- a/src/plugins/host-hook-cleanup.ts +++ b/src/plugins/host-hook-cleanup.ts @@ -1,10 +1,16 @@ import fs from "node:fs"; +import { getRuntimeConfig } from "../config/config.js"; import { updateSessionStore } from "../config/sessions/store.js"; import { resolveAllAgentSessionStoreTargetsSync } from "../config/sessions/targets.js"; import type { SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; -import { cleanupPluginSessionSchedulerJobs, clearPluginRunContext } from "./host-hook-runtime.js"; +import { withPluginHostCleanupTimeout } from "./host-hook-cleanup-timeout.js"; +import { + cleanupPluginSessionSchedulerJobs, + clearPluginRunContext, + makePluginSessionSchedulerJobKey, +} from "./host-hook-runtime.js"; import type { PluginHostCleanupReason } from "./host-hooks.js"; import type { PluginRegistry } from "./registry-types.js"; @@ -101,7 +107,7 @@ async function clearPluginOwnedSessionStores(params: { } export async function runPluginHostCleanup(params: { - cfg: OpenClawConfig; + cfg?: OpenClawConfig; registry?: PluginRegistry | null; pluginId?: string; reason: PluginHostCleanupReason; @@ -113,72 +119,99 @@ export async function runPluginHostCleanup(params: { params.reason === "restart" ? 0 : await clearPluginOwnedSessionStores({ - cfg: params.cfg, + cfg: params.cfg ?? getRuntimeConfig(), pluginId: params.pluginId, sessionKey: params.sessionKey, }); const registry = params.registry; - if (!registry) { - return { cleanupCount: persistentCleanupCount, failures: [] }; - } const failures: PluginHostCleanupFailure[] = []; let cleanupCount = persistentCleanupCount; - for (const registration of registry.sessionExtensions ?? []) { - if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) { - continue; + if (registry) { + for (const registration of registry.sessionExtensions ?? []) { + if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) { + continue; + } + const cleanup = registration.extension.cleanup; + if (!cleanup) { + continue; + } + const hookId = `session:${registration.extension.namespace}`; + try { + await withPluginHostCleanupTimeout(hookId, () => + cleanup({ + reason: params.reason, + sessionKey: params.sessionKey, + }), + ); + cleanupCount += 1; + } catch (error) { + failures.push({ + pluginId: registration.pluginId, + hookId, + error, + }); + } } - const cleanup = registration.extension.cleanup; - if (!cleanup) { - continue; + for (const registration of registry.runtimeLifecycles ?? []) { + if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) { + continue; + } + const cleanup = registration.lifecycle.cleanup; + if (!cleanup) { + continue; + } + const hookId = `runtime:${registration.lifecycle.id}`; + try { + await withPluginHostCleanupTimeout(hookId, () => + cleanup({ + reason: params.reason, + sessionKey: params.sessionKey, + runId: params.runId, + }), + ); + cleanupCount += 1; + } catch (error) { + failures.push({ + pluginId: registration.pluginId, + hookId, + error, + }); + } } - try { - await cleanup({ - reason: params.reason, - sessionKey: params.sessionKey, - }); - cleanupCount += 1; - } catch (error) { - failures.push({ - pluginId: registration.pluginId, - hookId: `session:${registration.extension.namespace}`, - error, - }); + const schedulerFailures = await cleanupPluginSessionSchedulerJobs({ + pluginId: params.pluginId, + reason: params.reason, + sessionKey: params.sessionKey, + records: registry.sessionSchedulerJobs, + preserveJobIds: params.preserveSchedulerJobIds, + }); + for (const failure of schedulerFailures) { + failures.push(failure); } } - for (const registration of registry.runtimeLifecycles ?? []) { - if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) { - continue; - } - const cleanup = registration.lifecycle.cleanup; - if (!cleanup) { - continue; - } - try { - await cleanup({ - reason: params.reason, - sessionKey: params.sessionKey, - runId: params.runId, - }); - cleanupCount += 1; - } catch (error) { - failures.push({ - pluginId: registration.pluginId, - hookId: `runtime:${registration.lifecycle.id}`, - error, - }); + if (params.reason !== "restart") { + const registrySchedulerJobKeys = new Set( + (registry?.sessionSchedulerJobs ?? []) + .filter((record) => !params.pluginId || record.pluginId === params.pluginId) + .map((record) => ({ + pluginId: record.pluginId, + jobId: typeof record.job.id === "string" ? record.job.id.trim() : "", + })) + .filter(({ jobId }) => jobId.length > 0) + .map(({ pluginId, jobId }) => makePluginSessionSchedulerJobKey(pluginId, jobId)), + ); + const runtimeSchedulerFailures = await cleanupPluginSessionSchedulerJobs({ + pluginId: params.pluginId, + reason: params.reason, + sessionKey: params.sessionKey, + preserveJobIds: params.preserveSchedulerJobIds, + excludeJobKeys: registrySchedulerJobKeys, + }); + for (const failure of runtimeSchedulerFailures) { + failures.push(failure); } } - const schedulerFailures = await cleanupPluginSessionSchedulerJobs({ - pluginId: params.pluginId, - reason: params.reason, - sessionKey: params.sessionKey, - records: registry?.sessionSchedulerJobs, - preserveJobIds: params.preserveSchedulerJobIds, - }); - for (const failure of schedulerFailures) { - failures.push(failure); - } - if (params.pluginId || params.runId) { + if ((params.pluginId || params.runId) && (params.reason !== "restart" || params.runId)) { clearPluginRunContext({ pluginId: params.pluginId, runId: params.runId }); } return { cleanupCount, failures }; diff --git a/src/plugins/host-hook-runtime.ts b/src/plugins/host-hook-runtime.ts index 3fca1e4a565..4b8fe5b3bea 100644 --- a/src/plugins/host-hook-runtime.ts +++ b/src/plugins/host-hook-runtime.ts @@ -2,6 +2,7 @@ import type { AgentEventPayload } from "../infra/agent-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { withPluginHostCleanupTimeout } from "./host-hook-cleanup-timeout.js"; import { isPluginJsonValue, type PluginHostCleanupReason, @@ -33,6 +34,7 @@ type PluginHostRuntimeState = { const PLUGIN_HOST_RUNTIME_STATE_KEY = Symbol.for("openclaw.pluginHostRuntimeState"); const CLOSED_RUN_IDS_MAX = 512; +export const PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS = 5_000; const log = createSubsystemLogger("plugins/host-hooks"); function getPluginHostRuntimeState(): PluginHostRuntimeState { @@ -83,6 +85,24 @@ function trackAgentEventHandler(runId: string, pending: Promise): void { }); } +function waitForTerminalEventHandlers(pendingHandlers: Set>): Promise { + if (pendingHandlers.size === 0) { + return Promise.resolve(); + } + let timeout: NodeJS.Timeout | undefined = setTimeout(() => { + log.warn( + `plugin terminal agent event subscriptions still running after ${PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS}ms; preserving run context until they settle`, + ); + }, PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS); + timeout.unref?.(); + return Promise.allSettled(pendingHandlers).then(() => { + if (timeout) { + clearTimeout(timeout); + timeout = undefined; + } + }); +} + function getPluginRunContextNamespaces(params: { runId: string; pluginId: string; @@ -108,13 +128,14 @@ function getPluginRunContextNamespaces(params: { export function setPluginRunContext(params: { pluginId: string; patch: PluginRunContextPatch; + allowClosedRun?: boolean; }): boolean { const runId = normalizeOptionalString(params.patch.runId); const namespace = normalizeNamespace(params.patch.namespace); if (!runId || !namespace) { return false; } - if (isPluginRunClosed(runId)) { + if (!params.allowClosedRun && isPluginRunClosed(runId)) { return false; } // Only an explicit `unset: true` deletes the run-context entry — silently @@ -230,6 +251,7 @@ export function dispatchPluginAgentEventSubscriptions(params: { }): void { const subscriptions = params.registry?.agentEventSubscriptions ?? []; const pendingHandlers: Promise[] = []; + const isTerminalEvent = isTerminalAgentRunEvent(params.event); for (const registration of subscriptions) { const streams = registration.subscription.streams; if (streams && streams.length > 0 && !streams.includes(params.event.stream)) { @@ -237,12 +259,17 @@ export function dispatchPluginAgentEventSubscriptions(params: { } const pluginId = registration.pluginId; const runId = params.event.runId; + let handlerActive = true; const ctx = { // oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Run-context JSON reads are caller-typed by namespace. getRunContext: (namespace: string) => getPluginRunContext({ pluginId, get: { runId, namespace } }), setRunContext: (namespace: string, value: PluginJsonValue) => { - setPluginRunContext({ pluginId, patch: { runId, namespace, value } }); + setPluginRunContext({ + pluginId, + patch: { runId, namespace, value }, + allowClosedRun: isTerminalEvent && handlerActive, + }); }, clearRunContext: (namespace?: string) => { clearPluginRunContext({ pluginId, runId, namespace }); @@ -251,16 +278,21 @@ export function dispatchPluginAgentEventSubscriptions(params: { try { const pending = Promise.resolve( registration.subscription.handle(structuredClone(params.event), ctx), - ).catch((error) => { - logAgentEventSubscriptionFailure({ - pluginId, - subscriptionId: registration.subscription.id, - error, + ) + .catch((error) => { + logAgentEventSubscriptionFailure({ + pluginId, + subscriptionId: registration.subscription.id, + error, + }); + }) + .finally(() => { + handlerActive = false; }); - }); trackAgentEventHandler(runId, pending); pendingHandlers.push(pending); } catch (error) { + handlerActive = false; logAgentEventSubscriptionFailure({ pluginId, subscriptionId: registration.subscription.id, @@ -268,12 +300,12 @@ export function dispatchPluginAgentEventSubscriptions(params: { }); } } - if (isTerminalAgentRunEvent(params.event)) { + if (isTerminalEvent) { markPluginRunClosed(params.event.runId); const pendingForRun = getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(params.event.runId) ?? new Set(pendingHandlers); - void Promise.allSettled(pendingForRun).then(() => { + void waitForTerminalEventHandlers(new Set(pendingForRun)).then(() => { clearPluginRunContext({ runId: params.event.runId }); }); } @@ -360,6 +392,10 @@ export function getPluginSessionSchedulerJobGeneration(params: { return record.generation; } +export function makePluginSessionSchedulerJobKey(pluginId: string, jobId: string): string { + return JSON.stringify([pluginId, jobId]); +} + export async function cleanupPluginSessionSchedulerJobs(params: { pluginId?: string; reason: PluginHostCleanupReason; @@ -371,6 +407,7 @@ export async function cleanupPluginSessionSchedulerJobs(params: { generation?: number; }[]; preserveJobIds?: ReadonlySet; + excludeJobKeys?: ReadonlySet; }): Promise> { const state = getPluginHostRuntimeState(); const failures: Array<{ pluginId: string; hookId: string; error: unknown }> = []; @@ -406,25 +443,30 @@ export async function cleanupPluginSessionSchedulerJobs(params: { continue; } const preserveJob = params.preserveJobIds?.has(jobId) ?? false; - if ( - preserveJob && - (record.generation === undefined || liveGeneration === record.generation) - ) { + if (preserveJob) { + // preserveJobIds means "do not run cleanup at all" — even across + // generation mismatches. The generation-matched deletion below would + // otherwise still call the OLD cleanup callback, which can remove + // external scheduled jobs (e.g. cron.remove) and break the live + // newer-generation registration that took over this jobId. continue; } // A newer generation may already own this id. The old cleanup callback can // still release plugin-owned resources, while deletion below is generation // matched so it cannot remove the newer live record. + const hookId = `scheduler:${jobId}`; try { - await record.job.cleanup?.({ - reason: params.reason, - sessionKey, - jobId, - }); + await withPluginHostCleanupTimeout(hookId, () => + record.job.cleanup?.({ + reason: params.reason, + sessionKey, + jobId, + }), + ); } catch (error) { failures.push({ pluginId: record.pluginId, - hookId: `scheduler:${jobId}`, + hookId, error, }); continue; @@ -448,16 +490,25 @@ export async function cleanupPluginSessionSchedulerJobs(params: { if (params.sessionKey && record.job.sessionKey !== params.sessionKey) { continue; } + if (params.excludeJobKeys?.has(makePluginSessionSchedulerJobKey(pluginId, jobId))) { + continue; + } + if (params.preserveJobIds?.has(jobId)) { + continue; + } + const hookId = `scheduler:${jobId}`; try { - await record.job.cleanup?.({ - reason: params.reason, - sessionKey: record.job.sessionKey, - jobId, - }); + await withPluginHostCleanupTimeout(hookId, () => + record.job.cleanup?.({ + reason: params.reason, + sessionKey: record.job.sessionKey, + jobId, + }), + ); } catch (error) { failures.push({ pluginId, - hookId: `scheduler:${jobId}`, + hookId, error, }); continue;