From 8afc9ef73cb5ad1353151d04ef2d6191219c8f53 Mon Sep 17 00:00:00 2001 From: Eva Date: Mon, 4 May 2026 21:04:22 +0700 Subject: [PATCH] [plugin sdk] Harden finalize retry and run context cleanup (#75600) Merged via squash. Prepared head SHA: ec58a6212b0aa239bf23d339d825eeff0777b611 Co-authored-by: 100yenadmin <239388517+100yenadmin@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 5 + .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/plugins/hooks.md | 16 + .../matrix/monitor/handler.test-helpers.ts | 3 +- .../matrix/src/matrix/monitor/handler.test.ts | 44 + .../matrix/src/matrix/monitor/handler.ts | 27 +- .../harness/lifecycle-hook-helpers.test.ts | 292 +++++- src/agents/harness/lifecycle-hook-helpers.ts | 128 ++- src/commands/onboard-non-interactive.ts | 8 +- .../run-context-lifecycle.contract.test.ts | 855 ++++++++++++++++++ src/plugins/hook-types.ts | 5 + .../hooks.before-agent-finalize.test.ts | 121 +++ src/plugins/hooks.ts | 74 +- src/plugins/host-hook-cleanup-timeout.test.ts | 36 + src/plugins/host-hook-cleanup-timeout.ts | 30 + src/plugins/host-hook-cleanup.config.test.ts | 54 ++ src/plugins/host-hook-cleanup.ts | 194 ++-- src/plugins/host-hook-runtime.ts | 173 +++- src/plugins/registry-lifecycle.ts | 19 + src/plugins/registry.ts | 54 +- src/plugins/runtime.test.ts | 81 ++ src/plugins/runtime.ts | 17 +- 22 files changed, 2112 insertions(+), 128 deletions(-) create mode 100644 src/plugins/contracts/run-context-lifecycle.contract.test.ts create mode 100644 src/plugins/host-hook-cleanup-timeout.test.ts create mode 100644 src/plugins/host-hook-cleanup-timeout.ts create mode 100644 src/plugins/host-hook-cleanup.config.test.ts create mode 100644 src/plugins/registry-lifecycle.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 84e0cc6bc9f..9d4a2b28a43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,8 @@ Docs: https://docs.openclaw.ai - Plugins/update: clean stale bundled load paths for already-externalized pinned npm and ClawHub plugin installs, so release-channel sync does not leave removed bundled paths ahead of the installed external package. Thanks @vincentkoc. - Telegram: accept plugin-owned numeric forum-topic targets in the agent message tool and keep reply-dispatch provider chunks behind a real stable runtime alias during in-place package updates. Fixes #77137. Thanks @richardmqq. - Google Meet: preserve `realtime.introMessage: ""` so realtime Chrome joins can stay silent instead of restoring the default spoken intro. Thanks @vincentkoc. +- Plugins/SDK: add bounded `before_agent_finalize` retry instructions so workflow plugins can request one more model pass. Thanks @100yenadmin. +- Discord/status: add degraded Discord transport and gateway event-loop starvation signals to `openclaw channels status`, `openclaw status --deep`, and fetch-timeout logs so intermittent socket resets do not look like a healthy running channel. (#76327) Thanks @joshavant. - Providers/OpenRouter: add opt-in response caching params that send OpenRouter's `X-OpenRouter-Cache`, `X-OpenRouter-Cache-TTL`, and cache-clear headers only on verified OpenRouter routes. Thanks @vincentkoc. - Providers/OpenRouter: expand app-attribution categories so OpenClaw advertises coding, programming, writing, chat, and personal-agent usage on verified OpenRouter routes. Thanks @vincentkoc. - Plugins/runtime state: add `registerIfAbsent` for atomic keyed-store dedupe claims that return whether a plugin successfully claimed a key without overwriting an existing live value. Thanks @amknight. @@ -465,6 +467,9 @@ Docs: https://docs.openclaw.ai - Gateway/CLI: make `openclaw gateway start` repair stale managed service definitions that point at old OpenClaw versions, missing binaries, or temporary installer paths before starting. - Heartbeat/scheduler: make heartbeat phase scheduling active-hours-aware so the scheduler seeks forward to the first in-window phase slot instead of arming timers for quiet-hours slots and relying solely on the runtime guard. Non-UTC `activeHours.timezone` values (e.g. `Asia/Shanghai`) now correctly influence when the next heartbeat timer fires, avoiding wasted quiet-hours ticks and long dormant gaps after gateway restarts. Fixes #75487. Thanks @amknight. - Providers/Arcee AI: mark Trinity Large Thinking as tool-incompatible so main-session runs use the same text-only request shape that made subagent runs recover, avoiding the remaining main-session response-shape mismatch after the #62848 transport failover fix. Fixes #62851 and #62847; carries forward #62848. Thanks @Adam-Researchh. +- Plugins/SDK: harden run-scoped plugin context cleanup so finalized workflow runs do not leak per-run state. Thanks @100yenadmin. +- Plugins/SDK: keep stale async registry cleanup from clearing restored plugin run context and scheduler state after a plugin registry is reactivated. (#75600) Thanks @100yenadmin. +- Plugins/SDK: preserve restored plugin scheduler state when earlier delayed replacement cleanup finishes after reactivation. Thanks @100yenadmin. - Status: show the `openai-codex` OAuth profile for `openai/gpt-*` sessions running through the native Codex runtime instead of reporting auth as unknown. (#76197) Thanks @mbelinky. - Gateway: avoid repeated plugin tool descriptor config hashing so large runtime configs do not block reply startup and trigger reconnect/timeouts. (#75944) Thanks @joshavant. - Plugins/externalization: keep diagnostics ClawHub packages and persisted bundled-plugin relocation on npm-first install metadata for launch, and omit Discord from the core package now that its external package is published. Thanks @vincentkoc. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 3e3904a90db..a4cba731222 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -c38441e2e18aa519c5dc22c2b593694444869673447740327c87f16f3d4a0f8d plugin-sdk-api-baseline.json -5711948923b5a4f89ac04a182266ee0fb57275369a3a8112433f3758a7d38c86 plugin-sdk-api-baseline.jsonl +3c0423e26e758e7a5f5febcbaacd6a7ceb8584a8eecd0224f7ce98e6bcb9e9c0 plugin-sdk-api-baseline.json +952ba44c63a9f2107fc10aead1d0cc77ef06ac9a9befcac3ca9e4b0f4427cdfc plugin-sdk-api-baseline.jsonl diff --git a/docs/plugins/hooks.md b/docs/plugins/hooks.md index 094cf4d651d..b0e928ea50c 100644 --- a/docs/plugins/hooks.md +++ b/docs/plugins/hooks.md @@ -264,6 +264,22 @@ the harness for one more model pass before finalization, `{ action: Codex native `Stop` hooks are relayed into this hook as OpenClaw `before_agent_finalize` decisions. +When returning `action: "revise"`, plugins can include `retry` metadata to make +the extra model pass bounded and replay-safe: + +```typescript +type BeforeAgentFinalizeRetry = { + instruction: string; + idempotencyKey?: string; + maxAttempts?: number; +}; +``` + +`instruction` is appended to the revision reason sent to the harness. +`idempotencyKey` lets the host count retries for the same plugin request across +equivalent finalize decisions, and `maxAttempts` caps how many extra passes the +host will allow before continuing with the natural final answer. + Non-bundled plugins that need `llm_input`, `llm_output`, `before_agent_finalize`, or `agent_end` must set: diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index b0d486e090f..b75ad343ed3 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -23,6 +23,7 @@ type MatrixHandlerTestHarnessOptions = { accountId?: string; accountConfig?: MatrixConfig; cfg?: unknown; + liveCfg?: unknown; client?: Partial; runtime?: RuntimeEnv; logger?: RuntimeLogger; @@ -192,7 +193,7 @@ export function createMatrixHandlerTestHarness( } as never, core: { config: { - current: () => cfgForHandler, + current: () => options.liveCfg ?? cfgForHandler, }, channel: { pairing: { diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index a8278f4ce8c..d9036d85935 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -228,6 +228,50 @@ describe("matrix monitor handler pairing account scope", () => { ); }); + it("uses live dmScope when deciding whether to pin main DM route updates", async () => { + const startupCfg = { + session: { dmScope: "main" }, + channels: { + matrix: { + dm: { allowFrom: ["@owner:example.org"] }, + }, + }, + }; + const liveCfg = { + session: { dmScope: "per-channel-peer" }, + channels: { + matrix: { + dm: { allowFrom: ["@owner:example.org"] }, + }, + }, + }; + const { handler, recordInboundSession } = createMatrixHandlerTestHarness({ + cfg: startupCfg, + liveCfg, + dmPolicy: "allowlist", + allowFrom: ["@owner:example.org"], + allowFromResolvedEntries: [{ input: "@owner:example.org", id: "@owner:example.org" }], + isDirectMessage: true, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$owner-dm-live-scope", + sender: "@owner:example.org", + body: "hello", + }), + ); + + expect(recordInboundSession).toHaveBeenCalledWith( + expect.objectContaining({ + updateLastRoute: expect.objectContaining({ + mainDmOwnerPin: undefined, + }), + }), + ); + }); + it("sends pairing reminders for pending requests with cooldown", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-03-01T10:00:00.000Z")); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 472fa2d6097..6d0441946d6 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1198,7 +1198,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam triggerSnapshot, threadRootId: _threadRootId, thread, - effectiveAllowFrom, effectiveGroupAllowFrom, effectiveRoomUsers, } = resolvedIngressResult; @@ -1940,11 +1939,27 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam onIdle: typingCallbacks.onIdle, }); const pinnedMainDmOwner = isDirectMessage - ? resolvePinnedMainDmOwnerFromAllowlist({ - dmScope: cfg.session?.dmScope, - allowFrom: effectiveAllowFrom, - normalizeEntry: normalizeMatrixUserId, - }) + ? await (async () => { + const livePinnedCfg = core.config.current() as CoreConfig; + const livePinnedAllowlists = resolveMatrixAccountAllowlistConfig({ + cfg: livePinnedCfg, + accountId, + }); + const livePinnedDmAllowFrom = await resolveCachedLiveAllowlist({ + cfg: livePinnedCfg, + entries: livePinnedAllowlists.dmAllowFrom, + startupResolvedEntries: allowFromResolvedEntries, + cache: liveDmAllowlistCache, + updateCache: (next) => { + liveDmAllowlistCache = next; + }, + }); + return resolvePinnedMainDmOwnerFromAllowlist({ + dmScope: livePinnedCfg.session?.dmScope, + allowFrom: livePinnedDmAllowFrom, + normalizeEntry: normalizeMatrixUserId, + }); + })() : null; const turnResult = await core.channel.turn.run({ diff --git a/src/agents/harness/lifecycle-hook-helpers.test.ts b/src/agents/harness/lifecycle-hook-helpers.test.ts index 1c2e4cdb24c..6f1ec3ff7e1 100644 --- a/src/agents/harness/lifecycle-hook-helpers.test.ts +++ b/src/agents/harness/lifecycle-hook-helpers.test.ts @@ -1,5 +1,6 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { + clearAgentHarnessFinalizeRetryBudget, runAgentHarnessAgentEndHook, runAgentHarnessBeforeAgentFinalizeHook, runAgentHarnessLlmInputHook, @@ -10,7 +11,24 @@ const legacyHookRunner = { hasHooks: () => true, }; +const EVENT = { + runId: "run-1", + sessionId: "session-1", + sessionKey: "agent:main:session-1", + turnId: "turn-1", + provider: "codex", + model: "gpt-5.4", + cwd: "/repo", + transcriptPath: "/tmp/session.jsonl", + stopHookActive: false, + lastAssistantMessage: "done", +}; + describe("agent harness lifecycle hook helpers", () => { + afterEach(() => { + clearAgentHarnessFinalizeRetryBudget(); + }); + it("ignores legacy hook runners that advertise llm_input without a runner method", () => { expect(() => runAgentHarnessLlmInputHook({ @@ -50,4 +68,276 @@ describe("agent harness lifecycle hook helpers", () => { } as never), ).resolves.toEqual({ action: "continue" }); }); + + it("clears finalize retry budgets by run id", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + retry: { + instruction: "revise once", + idempotencyKey: "stable", + maxAttempts: 1, + }, + }), + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise once" }); + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "continue" }); + + clearAgentHarnessFinalizeRetryBudget({ runId: "run-1" }); + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise once" }); + }); + + it("does not clear finalize retry budgets for runs that only share a prefix", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + retry: { + instruction: "revise child once", + idempotencyKey: "stable", + maxAttempts: 1, + }, + }), + }; + const childEvent = { + ...EVENT, + runId: "run:child", + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: childEvent, + ctx: { runId: "run:child", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise child once" }); + + clearAgentHarnessFinalizeRetryBudget({ runId: "run" }); + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: childEvent, + ctx: { runId: "run:child", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "continue" }); + }); + + it("keys finalize retry budgets by context run id when the event omits run id", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + retry: { + instruction: "revise from context run", + idempotencyKey: "stable", + maxAttempts: 1, + }, + }), + }; + const eventWithoutRunId = { + ...EVENT, + runId: undefined, + sessionId: "shared-session", + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: eventWithoutRunId, + ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise from context run" }); + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: eventWithoutRunId, + ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "continue" }); + + clearAgentHarnessFinalizeRetryBudget({ runId: "run-from-context" }); + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: eventWithoutRunId, + ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "revise", reason: "revise from context run" }); + }); + + it("preserves merged revise reasons when retry metadata is present", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + reason: "fix generated baseline\n\nrerun the focused tests", + retry: { + instruction: "rerun the focused tests", + idempotencyKey: "merged-reason", + maxAttempts: 1, + }, + }), + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ + action: "revise", + reason: "fix generated baseline\n\nrerun the focused tests", + }); + }); + + it("honors a later finalize retry candidate after an earlier candidate is spent", async () => { + const firstRetry = { + instruction: "regenerate artifacts", + idempotencyKey: "artifacts", + maxAttempts: 1, + }; + const secondRetry = { + instruction: "rerun focused tests", + idempotencyKey: "tests", + maxAttempts: 1, + }; + const result = { + action: "revise", + reason: "retry generated artifacts\n\nretry focused tests", + retry: firstRetry, + }; + Object.defineProperty(result, "retryCandidates", { + enumerable: false, + value: [firstRetry, secondRetry], + }); + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue(result), + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ + action: "revise", + reason: "retry generated artifacts\n\nretry focused tests\n\nregenerate artifacts", + }); + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ + action: "revise", + reason: "retry generated artifacts\n\nretry focused tests\n\nrerun focused tests", + }); + }); + + it("falls back to retry instruction keys when retry idempotency keys are malformed", async () => { + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi.fn().mockResolvedValue({ + action: "revise", + retry: { + instruction: "retry with a safe key", + idempotencyKey: { invalid: true }, + maxAttempts: 1, + } as never, + }), + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ + action: "revise", + reason: "retry with a safe key", + }); + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ action: "continue" }); + }); + + it("does not collide fallback retry keys for long instructions with shared prefixes", async () => { + const sharedPrefix = "x".repeat(180); + const firstInstruction = `${sharedPrefix} first`; + const secondInstruction = `${sharedPrefix} second`; + const hookRunner = { + hasHooks: () => true, + runBeforeAgentFinalize: vi + .fn() + .mockResolvedValueOnce({ + action: "revise", + retry: { + instruction: firstInstruction, + idempotencyKey: { invalid: true }, + maxAttempts: 1, + }, + }) + .mockResolvedValueOnce({ + action: "revise", + retry: { + instruction: secondInstruction, + idempotencyKey: { invalid: true }, + maxAttempts: 1, + }, + }), + }; + + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ + action: "revise", + reason: firstInstruction, + }); + await expect( + runAgentHarnessBeforeAgentFinalizeHook({ + event: EVENT, + ctx: { runId: "run-1", sessionKey: "agent:main:session-1" }, + hookRunner: hookRunner as never, + }), + ).resolves.toEqual({ + action: "revise", + reason: secondInstruction, + }); + }); }); diff --git a/src/agents/harness/lifecycle-hook-helpers.ts b/src/agents/harness/lifecycle-hook-helpers.ts index db35af577ef..cbad11d9aa4 100644 --- a/src/agents/harness/lifecycle-hook-helpers.ts +++ b/src/agents/harness/lifecycle-hook-helpers.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import type { @@ -7,11 +8,57 @@ import type { PluginHookLlmInputEvent, PluginHookLlmOutputEvent, } from "../../plugins/hook-types.js"; +import { resolveGlobalSingleton } from "../../shared/global-singleton.js"; import { buildAgentHookContext, type AgentHarnessHookContext } from "./hook-context.js"; const log = createSubsystemLogger("agents/harness"); +const FINALIZE_RETRY_BUDGET_KEY = Symbol.for("openclaw.pluginFinalizeRetryBudget"); +const FINALIZE_RETRY_BUDGET_MAX_ENTRIES = 2048; type AgentHarnessHookRunner = ReturnType; +type FinalizeRetryBudget = Map>; + +function getFinalizeRetryBudget(): FinalizeRetryBudget { + return resolveGlobalSingleton(FINALIZE_RETRY_BUDGET_KEY, () => new Map()); +} + +function countFinalizeRetryBudgetEntries(budget: FinalizeRetryBudget): number { + let count = 0; + for (const runBudget of budget.values()) { + count += runBudget.size; + } + return count; +} + +function pruneFinalizeRetryBudget(budget: FinalizeRetryBudget): void { + while (countFinalizeRetryBudgetEntries(budget) > FINALIZE_RETRY_BUDGET_MAX_ENTRIES) { + const oldestRunId = budget.keys().next().value; + if (oldestRunId === undefined) { + return; + } + const oldestRunBudget = budget.get(oldestRunId); + const oldestRetryKey = oldestRunBudget?.keys().next().value; + if (oldestRunBudget && oldestRetryKey !== undefined) { + oldestRunBudget.delete(oldestRetryKey); + } + if (!oldestRunBudget || oldestRunBudget.size === 0) { + budget.delete(oldestRunId); + } + } +} + +function buildFinalizeRetryInstructionKey(instruction: string): string { + return `instruction:${createHash("sha256").update(instruction).digest("hex")}`; +} + +export function clearAgentHarnessFinalizeRetryBudget(params?: { runId?: string }): void { + const budget = getFinalizeRetryBudget(); + if (!params?.runId) { + budget.clear(); + return; + } + budget.delete(params.runId); +} export function runAgentHarnessLlmInputHook(params: { event: PluginHookLlmInputEvent; @@ -73,8 +120,16 @@ export async function runAgentHarnessBeforeAgentFinalizeHook(params: { return { action: "continue" }; } try { + const eventForNormalization: PluginHookBeforeAgentFinalizeEvent = { + ...params.event, + runId: params.event.runId ?? params.ctx.runId, + }; return normalizeBeforeAgentFinalizeResult( - await hookRunner.runBeforeAgentFinalize(params.event, buildAgentHookContext(params.ctx)), + await hookRunner.runBeforeAgentFinalize( + eventForNormalization, + buildAgentHookContext(params.ctx), + ), + eventForNormalization, ); } catch (error) { log.warn(`before_agent_finalize hook failed: ${String(error)}`); @@ -84,15 +139,78 @@ export async function runAgentHarnessBeforeAgentFinalizeHook(params: { function normalizeBeforeAgentFinalizeResult( result: PluginHookBeforeAgentFinalizeResult | undefined, + event?: PluginHookBeforeAgentFinalizeEvent, ): AgentHarnessBeforeAgentFinalizeOutcome { if (result?.action === "finalize") { - return result.reason?.trim() - ? { action: "finalize", reason: result.reason.trim() } - : { action: "finalize" }; + const reason = normalizeTrimmedString(result.reason); + return reason ? { action: "finalize", reason } : { action: "finalize" }; } if (result?.action === "revise") { - const reason = result.reason?.trim(); + const retryCandidates = readBeforeAgentFinalizeRetryCandidates(result); + if (retryCandidates.length > 0) { + const reason = normalizeTrimmedString(result.reason); + for (const retry of retryCandidates) { + const retryInstruction = normalizeTrimmedString(retry.instruction); + if (!retryInstruction) { + continue; + } + const maxAttempts = + typeof retry.maxAttempts === "number" && Number.isFinite(retry.maxAttempts) + ? Math.max(1, Math.floor(retry.maxAttempts)) + : 1; + const retryRunId = event?.runId ?? event?.sessionId ?? "unknown-run"; + const retryKey = + normalizeTrimmedString(retry.idempotencyKey) || + buildFinalizeRetryInstructionKey(retryInstruction); + const budget = getFinalizeRetryBudget(); + const runBudget = budget.get(retryRunId) ?? new Map(); + const nextCount = (runBudget.get(retryKey) ?? 0) + 1; + runBudget.delete(retryKey); + runBudget.set(retryKey, nextCount); + budget.delete(retryRunId); + budget.set(retryRunId, runBudget); + pruneFinalizeRetryBudget(budget); + if (nextCount > maxAttempts) { + continue; + } + const revisedReason = + reason && reason.includes(retryInstruction) + ? reason + : [reason, retryInstruction].filter(Boolean).join("\n\n"); + return { action: "revise", reason: revisedReason }; + } + return { action: "continue" }; + } + const reason = normalizeTrimmedString(result.reason); return reason ? { action: "revise", reason } : { action: "continue" }; } return { action: "continue" }; } + +function readBeforeAgentFinalizeRetryCandidates( + result: PluginHookBeforeAgentFinalizeResult, +): NonNullable[] { + const candidateList = ( + result as { + retryCandidates?: unknown; + } + ).retryCandidates; + if (Array.isArray(candidateList) && candidateList.length > 0) { + return candidateList.filter(isBeforeAgentFinalizeRetry); + } + return isBeforeAgentFinalizeRetry(result.retry) ? [result.retry] : []; +} + +function isBeforeAgentFinalizeRetry( + value: unknown, +): value is NonNullable { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} + +function normalizeTrimmedString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} diff --git a/src/commands/onboard-non-interactive.ts b/src/commands/onboard-non-interactive.ts index e77a23f2b3f..ecf2d550917 100644 --- a/src/commands/onboard-non-interactive.ts +++ b/src/commands/onboard-non-interactive.ts @@ -27,16 +27,16 @@ function createNonInteractiveMigrationPrompter(runtime: RuntimeEnv): WizardPromp async note(message, title) { runtime.log(title ? `${title}\n${message}` : message); }, - select(params) { + async select(params) { return unavailable(params.message); }, - multiselect(params) { + async multiselect(params) { return unavailable(params.message); }, - text(params) { + async text(params) { return unavailable(params.message); }, - confirm(params) { + async confirm(params) { return unavailable(params.message); }, progress(label) { diff --git a/src/plugins/contracts/run-context-lifecycle.contract.test.ts b/src/plugins/contracts/run-context-lifecycle.contract.test.ts new file mode 100644 index 00000000000..b1c9a77f86d --- /dev/null +++ b/src/plugins/contracts/run-context-lifecycle.contract.test.ts @@ -0,0 +1,855 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { + createPluginRegistryFixture, + registerTestPlugin, +} from "openclaw/plugin-sdk/plugin-test-contracts"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { loadSessionStore, updateSessionStore } from "../../config/sessions.js"; +import { withTempConfig } from "../../gateway/test-temp-config.js"; +import { emitAgentEvent, resetAgentEventsForTest } from "../../infra/agent-events.js"; +import { resolvePreferredOpenClawTmpDir } from "../../infra/tmp-openclaw-dir.js"; +import { PLUGIN_HOST_CLEANUP_TIMEOUT_MS } from "../host-hook-cleanup-timeout.js"; +import { runPluginHostCleanup } from "../host-hook-cleanup.js"; +import { + clearPluginHostRuntimeState, + getPluginRunContext, + listPluginSessionSchedulerJobs, + PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS, + dispatchPluginAgentEventSubscriptions, + registerPluginSessionSchedulerJob, + setPluginRunContext, +} from "../host-hook-runtime.js"; +import { createEmptyPluginRegistry } from "../registry-empty.js"; +import { setActivePluginRegistry } from "../runtime.js"; +import { createPluginRecord } from "../status.test-helpers.js"; +import type { OpenClawPluginApi } from "../types.js"; + +async function waitForPluginEventHandlers(): Promise { + await new Promise((resolve) => { + setTimeout(resolve, 0); + }); +} + +describe("plugin run context lifecycle", () => { + afterEach(() => { + vi.useRealTimers(); + setActivePluginRegistry(createEmptyPluginRegistry()); + clearPluginHostRuntimeState(); + resetAgentEventsForTest(); + }); + + it("blocks stale plugin API run-context mutations after registry replacement", () => { + const { config, registry } = createPluginRegistryFixture(); + let capturedApi: OpenClawPluginApi | undefined; + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "stale-run-context-plugin", + name: "Stale Run Context Plugin", + }), + register(api) { + capturedApi = api; + }, + }); + setActivePluginRegistry(registry.registry); + setActivePluginRegistry(createEmptyPluginRegistry()); + + expect( + capturedApi?.setRunContext({ + runId: "stale-run", + namespace: "state", + value: { stale: true }, + }), + ).toBe(false); + expect( + getPluginRunContext({ + pluginId: "stale-run-context-plugin", + get: { runId: "stale-run", namespace: "state" }, + }), + ).toBeUndefined(); + + expect( + setPluginRunContext({ + pluginId: "stale-run-context-plugin", + patch: { runId: "stale-run", namespace: "state", value: { live: true } }, + }), + ).toBe(true); + capturedApi?.clearRunContext({ runId: "stale-run", namespace: "state" }); + expect( + getPluginRunContext({ + pluginId: "stale-run-context-plugin", + get: { runId: "stale-run", namespace: "state" }, + }), + ).toEqual({ live: true }); + }); + + it("allows run-context mutations after a previous registry is restored active", () => { + const { config, registry } = createPluginRegistryFixture(); + let capturedApi: OpenClawPluginApi | undefined; + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "restored-run-context-plugin", + name: "Restored Run Context Plugin", + }), + register(api) { + capturedApi = api; + }, + }); + setActivePluginRegistry(registry.registry); + setActivePluginRegistry(createEmptyPluginRegistry()); + setActivePluginRegistry(registry.registry); + + expect( + capturedApi?.setRunContext({ + runId: "restored-run", + namespace: "state", + value: { restored: true }, + }), + ).toBe(true); + expect( + getPluginRunContext({ + pluginId: "restored-run-context-plugin", + get: { runId: "restored-run", namespace: "state" }, + }), + ).toEqual({ restored: true }); + }); + + it("allows run-context initialization during activating plugin registration", () => { + const { config, registry } = createPluginRegistryFixture(); + const api = registry.createApi( + createPluginRecord({ + id: "registration-run-context-plugin", + name: "Registration Run Context Plugin", + }), + { config }, + ); + + expect( + api.setRunContext({ + runId: "run-registration", + namespace: "state", + value: { initialized: true }, + }), + ).toBe(true); + expect( + getPluginRunContext({ + pluginId: "registration-run-context-plugin", + get: { runId: "run-registration", namespace: "state" }, + }), + ).toEqual({ initialized: true }); + + api.clearRunContext({ runId: "run-registration", namespace: "state" }); + expect( + getPluginRunContext({ + pluginId: "registration-run-context-plugin", + get: { runId: "run-registration", namespace: "state" }, + }), + ).toBeUndefined(); + }); + + it("keeps restored active registry state after stale async cleanup finishes", async () => { + let releaseCleanup: (() => void) | undefined; + let markCleanupStarted: (() => void) | undefined; + let capturedApi: OpenClawPluginApi | undefined; + const cleanupStarted = new Promise((resolve) => { + markCleanupStarted = resolve; + }); + const cleanupRelease = new Promise((resolve) => { + releaseCleanup = resolve; + }); + const schedulerCleanup = vi.fn(); + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "delayed-restored-registry-plugin", + name: "Delayed Restored Registry Plugin", + }), + register(api) { + capturedApi = api; + api.registerRuntimeLifecycle({ + id: "delayed-cleanup", + async cleanup() { + markCleanupStarted?.(); + await cleanupRelease; + }, + }); + api.registerSessionSchedulerJob({ + id: "live-job", + sessionKey: "agent:main:main", + kind: "session-turn", + cleanup: schedulerCleanup, + }); + }, + }); + setActivePluginRegistry(registry.registry); + setActivePluginRegistry(createEmptyPluginRegistry()); + await cleanupStarted; + setActivePluginRegistry(registry.registry); + + expect( + capturedApi?.setRunContext({ + runId: "restored-after-cleanup-started", + namespace: "state", + value: { restored: true }, + }), + ).toBe(true); + + releaseCleanup?.(); + await waitForPluginEventHandlers(); + await waitForPluginEventHandlers(); + + expect( + getPluginRunContext({ + pluginId: "delayed-restored-registry-plugin", + get: { runId: "restored-after-cleanup-started", namespace: "state" }, + }), + ).toEqual({ restored: true }); + expect(schedulerCleanup).not.toHaveBeenCalled(); + expect(listPluginSessionSchedulerJobs("delayed-restored-registry-plugin")).toEqual([ + { + id: "live-job", + pluginId: "delayed-restored-registry-plugin", + sessionKey: "agent:main:main", + kind: "session-turn", + }, + ]); + }); + + it("does not let delayed non-terminal subscriptions resurrect closed run context", async () => { + let releaseToolHandler: (() => void) | undefined; + let delayedToolHandlerSawContext: unknown; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "delayed-subscription", + name: "Delayed Subscription", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "delayed", + streams: ["tool"], + async handle(_event, ctx) { + ctx.setRunContext("before-terminal", { visible: true }); + await new Promise((resolve) => { + releaseToolHandler = resolve; + }); + delayedToolHandlerSawContext = ctx.getRunContext("before-terminal"); + ctx.setRunContext("late", { resurrected: true }); + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-delayed-subscription", + stream: "tool", + data: { name: "tool" }, + }); + await Promise.resolve(); + + emitAgentEvent({ + runId: "run-delayed-subscription", + stream: "lifecycle", + data: { phase: "end" }, + }); + await Promise.resolve(); + + expect( + getPluginRunContext({ + pluginId: "delayed-subscription", + get: { runId: "run-delayed-subscription", namespace: "before-terminal" }, + }), + ).toEqual({ visible: true }); + + releaseToolHandler?.(); + await waitForPluginEventHandlers(); + + expect(delayedToolHandlerSawContext).toEqual({ visible: true }); + expect( + getPluginRunContext({ + pluginId: "delayed-subscription", + get: { runId: "run-delayed-subscription", namespace: "late" }, + }), + ).toBeUndefined(); + }); + + it("preserves run context until async terminal event subscriptions settle", async () => { + let releaseTerminalHandler: (() => void) | undefined; + let terminalHandlerSawContext: unknown; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "async-terminal-subscription", + name: "Async Terminal Subscription", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "records", + streams: ["tool", "lifecycle"], + async handle(event, ctx) { + if (event.stream === "tool") { + ctx.setRunContext("seen", { runId: event.runId }); + return; + } + if (event.data?.phase !== "end") { + return; + } + await new Promise((resolve) => { + releaseTerminalHandler = resolve; + }); + terminalHandlerSawContext = ctx.getRunContext("seen"); + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-async-terminal", + stream: "tool", + data: { name: "tool" }, + }); + await Promise.resolve(); + + emitAgentEvent({ + runId: "run-async-terminal", + stream: "lifecycle", + data: { phase: "end" }, + }); + await Promise.resolve(); + + expect( + getPluginRunContext({ + pluginId: "async-terminal-subscription", + get: { runId: "run-async-terminal", namespace: "seen" }, + }), + ).toEqual({ runId: "run-async-terminal" }); + + releaseTerminalHandler?.(); + await waitForPluginEventHandlers(); + + expect(terminalHandlerSawContext).toEqual({ runId: "run-async-terminal" }); + expect( + getPluginRunContext({ + pluginId: "async-terminal-subscription", + get: { runId: "run-async-terminal", namespace: "seen" }, + }), + ).toBeUndefined(); + }); + + it("waits for terminal handlers added after the first terminal cleanup waiter starts", async () => { + let releaseFirstTerminalHandler: (() => void) | undefined; + let releaseSecondTerminalHandler: (() => void) | undefined; + let firstTerminalHandlerSawContext: unknown; + let secondTerminalHandlerSawContext: unknown; + let terminalEventsSeen = 0; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "repeated-terminal-live-wait", + name: "Repeated Terminal Live Wait", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "records", + streams: ["tool", "lifecycle"], + async handle(event, ctx) { + if (event.stream === "tool") { + ctx.setRunContext("seen", { runId: event.runId }); + return; + } + if (event.data?.phase !== "end") { + return; + } + terminalEventsSeen += 1; + if (terminalEventsSeen === 1) { + await new Promise((resolve) => { + releaseFirstTerminalHandler = resolve; + }); + firstTerminalHandlerSawContext = ctx.getRunContext("seen"); + return; + } + await new Promise((resolve) => { + releaseSecondTerminalHandler = resolve; + }); + secondTerminalHandlerSawContext = ctx.getRunContext("seen"); + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-repeated-terminal-live-wait", + stream: "tool", + data: { name: "tool" }, + }); + await waitForPluginEventHandlers(); + + emitAgentEvent({ + runId: "run-repeated-terminal-live-wait", + stream: "lifecycle", + data: { phase: "end" }, + }); + await waitForPluginEventHandlers(); + + emitAgentEvent({ + runId: "run-repeated-terminal-live-wait", + stream: "lifecycle", + data: { phase: "end" }, + }); + await waitForPluginEventHandlers(); + + releaseFirstTerminalHandler?.(); + await waitForPluginEventHandlers(); + expect(firstTerminalHandlerSawContext).toEqual({ runId: "run-repeated-terminal-live-wait" }); + expect( + getPluginRunContext({ + pluginId: "repeated-terminal-live-wait", + get: { runId: "run-repeated-terminal-live-wait", namespace: "seen" }, + }), + ).toEqual({ runId: "run-repeated-terminal-live-wait" }); + + releaseSecondTerminalHandler?.(); + await waitForPluginEventHandlers(); + await waitForPluginEventHandlers(); + + expect(secondTerminalHandlerSawContext).toEqual({ runId: "run-repeated-terminal-live-wait" }); + expect( + getPluginRunContext({ + pluginId: "repeated-terminal-live-wait", + get: { runId: "run-repeated-terminal-live-wait", namespace: "seen" }, + }), + ).toBeUndefined(); + }); + + it("clears run context after the terminal subscription grace period", async () => { + vi.useFakeTimers(); + let releaseTerminalHandler: (() => void) | undefined; + let terminalHandlerSawContext: unknown; + let terminalHandlerWroteContext: unknown; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "slow-terminal-subscription", + name: "Slow Terminal Subscription", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "slow", + streams: ["tool", "lifecycle"], + async handle(event, ctx) { + if (event.stream === "tool") { + ctx.setRunContext("seen", { runId: event.runId }); + return; + } + if (event.data?.phase === "end") { + await new Promise((resolve) => { + releaseTerminalHandler = resolve; + }); + terminalHandlerSawContext = ctx.getRunContext("seen"); + ctx.setRunContext("terminal", { completed: true }); + terminalHandlerWroteContext = ctx.getRunContext("terminal"); + } + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-slow-terminal", + stream: "tool", + data: { name: "tool" }, + }); + await Promise.resolve(); + + emitAgentEvent({ + runId: "run-slow-terminal", + stream: "lifecycle", + data: { phase: "end" }, + }); + await Promise.resolve(); + + await vi.advanceTimersByTimeAsync(PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS); + expect( + getPluginRunContext({ + pluginId: "slow-terminal-subscription", + get: { runId: "run-slow-terminal", namespace: "seen" }, + }), + ).toBeUndefined(); + + releaseTerminalHandler?.(); + await vi.advanceTimersByTimeAsync(0); + + expect(terminalHandlerSawContext).toBeUndefined(); + expect(terminalHandlerWroteContext).toBeUndefined(); + expect( + getPluginRunContext({ + pluginId: "slow-terminal-subscription", + get: { runId: "run-slow-terminal", namespace: "seen" }, + }), + ).toBeUndefined(); + expect( + getPluginRunContext({ + pluginId: "slow-terminal-subscription", + get: { runId: "run-slow-terminal", namespace: "terminal" }, + }), + ).toBeUndefined(); + }); + + it("keeps the expired terminal marker across repeated terminal events", async () => { + vi.useFakeTimers(); + let releaseFirstTerminalHandler: (() => void) | undefined; + let firstTerminalHandlerWroteContext: unknown; + let secondTerminalHandlerWroteContext: unknown; + let terminalEventsSeen = 0; + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "repeated-terminal-subscription", + name: "Repeated Terminal Subscription", + }), + register(api) { + api.registerAgentEventSubscription({ + id: "repeat-terminal", + streams: ["lifecycle"], + async handle(event, ctx) { + if (event.data?.phase !== "end") { + return; + } + terminalEventsSeen += 1; + if (terminalEventsSeen === 1) { + await new Promise((resolve) => { + releaseFirstTerminalHandler = resolve; + }); + ctx.setRunContext("terminal", { from: "first" }); + firstTerminalHandlerWroteContext = ctx.getRunContext("terminal"); + return; + } + ctx.setRunContext("terminal", { from: "second" }); + secondTerminalHandlerWroteContext = ctx.getRunContext("terminal"); + }, + }); + }, + }); + setActivePluginRegistry(registry.registry); + + emitAgentEvent({ + runId: "run-repeat-terminal", + stream: "lifecycle", + data: { phase: "end" }, + }); + await Promise.resolve(); + + await vi.advanceTimersByTimeAsync(PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS); + + emitAgentEvent({ + runId: "run-repeat-terminal", + stream: "lifecycle", + data: { phase: "end" }, + }); + await vi.advanceTimersByTimeAsync(0); + + expect(secondTerminalHandlerWroteContext).toBeUndefined(); + expect( + getPluginRunContext({ + pluginId: "repeated-terminal-subscription", + get: { runId: "run-repeat-terminal", namespace: "terminal" }, + }), + ).toBeUndefined(); + + releaseFirstTerminalHandler?.(); + await vi.advanceTimersByTimeAsync(0); + + expect(firstTerminalHandlerWroteContext).toBeUndefined(); + expect( + getPluginRunContext({ + pluginId: "repeated-terminal-subscription", + get: { runId: "run-repeat-terminal", namespace: "terminal" }, + }), + ).toBeUndefined(); + }); + + it("preserves scheduler jobs instead of invoking stale cleanup callbacks", async () => { + const cleanup = vi.fn(); + registerPluginSessionSchedulerJob({ + pluginId: "scheduler-plugin", + pluginName: "Scheduler Plugin", + job: { + id: "job-preserved", + sessionKey: "agent:main:main", + kind: "session-turn", + cleanup, + }, + }); + + await expect( + runPluginHostCleanup({ + reason: "disable", + pluginId: "scheduler-plugin", + preserveSchedulerJobIds: new Set(["job-preserved"]), + }), + ).resolves.toMatchObject({ failures: [] }); + expect(cleanup).not.toHaveBeenCalled(); + expect(listPluginSessionSchedulerJobs("scheduler-plugin")).toHaveLength(1); + }); + + it("preserves plugin run context during restart cleanup", async () => { + const registry = createEmptyPluginRegistry(); + expect( + setPluginRunContext({ + pluginId: "restart-context-plugin", + patch: { runId: "run-restart", namespace: "state", value: { keep: true } }, + }), + ).toBe(true); + + await expect( + runPluginHostCleanup({ + registry, + pluginId: "restart-context-plugin", + reason: "restart", + }), + ).resolves.toMatchObject({ failures: [] }); + expect( + getPluginRunContext({ + pluginId: "restart-context-plugin", + get: { runId: "run-restart", namespace: "state" }, + }), + ).toEqual({ keep: true }); + + await expect( + runPluginHostCleanup({ + registry, + pluginId: "restart-context-plugin", + reason: "disable", + }), + ).resolves.toMatchObject({ failures: [] }); + expect( + getPluginRunContext({ + pluginId: "restart-context-plugin", + get: { runId: "run-restart", namespace: "state" }, + }), + ).toBeUndefined(); + }); + + it("preserves durable plugin session state during plugin restart cleanup", async () => { + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "restart-state-fixture", + name: "Restart State Fixture", + }), + register(api) { + api.registerSessionExtension({ + namespace: "workflow", + description: "restart state test", + }); + }, + }); + + const stateDir = await fs.mkdtemp( + path.join(resolvePreferredOpenClawTmpDir(), "openclaw-run-context-restart-state-"), + ); + const storePath = path.join(stateDir, "sessions.json"); + const tempConfig = { + session: { store: storePath }, + }; + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + try { + process.env.OPENCLAW_STATE_DIR = stateDir; + await withTempConfig({ + cfg: tempConfig, + run: async () => { + await updateSessionStore(storePath, (store) => { + store["agent:main:main"] = { + sessionId: "session-1", + updatedAt: Date.now(), + pluginExtensions: { + "restart-state-fixture": { workflow: { state: "waiting" } }, + }, + pluginNextTurnInjections: { + "restart-state-fixture": [ + { + id: "resume", + pluginId: "restart-state-fixture", + text: "resume", + placement: "prepend_context", + createdAt: 1, + }, + ], + }, + }; + return undefined; + }); + + await expect( + runPluginHostCleanup({ + cfg: tempConfig, + registry: registry.registry, + pluginId: "restart-state-fixture", + reason: "restart", + }), + ).resolves.toMatchObject({ failures: [] }); + + const stored = loadSessionStore(storePath, { skipCache: true }); + expect(stored["agent:main:main"]?.pluginExtensions).toEqual({ + "restart-state-fixture": { workflow: { state: "waiting" } }, + }); + expect(stored["agent:main:main"]?.pluginNextTurnInjections).toEqual({ + "restart-state-fixture": [ + { + id: "resume", + pluginId: "restart-state-fixture", + text: "resume", + placement: "prepend_context", + createdAt: 1, + }, + ], + }); + }, + }); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fs.rm(stateDir, { recursive: true, force: true }); + } + }); + + it("rejects hung cleanup hooks with a bounded timeout", async () => { + vi.useFakeTimers(); + const cleanup = vi.fn(async () => { + await new Promise(() => undefined); + }); + registerPluginSessionSchedulerJob({ + pluginId: "hung-cleanup-plugin", + pluginName: "Hung Cleanup Plugin", + job: { + id: "job-hung", + sessionKey: "agent:main:main", + kind: "session-turn", + cleanup, + }, + }); + + const resultPromise = runPluginHostCleanup({ + reason: "disable", + pluginId: "hung-cleanup-plugin", + }); + await vi.advanceTimersByTimeAsync(0); + expect(cleanup).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(PLUGIN_HOST_CLEANUP_TIMEOUT_MS); + await expect(resultPromise).resolves.toMatchObject({ + failures: [ + { + pluginId: "hung-cleanup-plugin", + hookId: "scheduler:job-hung", + }, + ], + }); + }); + + it("bounds session, runtime, and scheduler cleanup callbacks so cleanup keeps moving", async () => { + vi.useFakeTimers(); + const { config, registry } = createPluginRegistryFixture(); + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "hanging-cleanup-fixture", + name: "Hanging Cleanup Fixture", + }), + register(api) { + api.registerSessionExtension({ + namespace: "state", + description: "hangs during cleanup", + cleanup: () => new Promise(() => undefined), + }); + api.registerRuntimeLifecycle({ + id: "runtime-cleanup", + cleanup: () => new Promise(() => undefined), + }); + api.registerSessionSchedulerJob({ + id: "scheduler-cleanup", + sessionKey: "agent:main:main", + kind: "monitor", + cleanup: () => new Promise(() => undefined), + }); + }, + }); + + const cleanupPromise = runPluginHostCleanup({ + cfg: config, + registry: registry.registry, + pluginId: "hanging-cleanup-fixture", + reason: "delete", + }); + for (let index = 0; index < 3; index += 1) { + await vi.advanceTimersByTimeAsync(PLUGIN_HOST_CLEANUP_TIMEOUT_MS + 1); + } + await expect(cleanupPromise).resolves.toMatchObject({ + failures: [ + expect.objectContaining({ + pluginId: "hanging-cleanup-fixture", + hookId: "session:state", + }), + expect.objectContaining({ + pluginId: "hanging-cleanup-fixture", + hookId: "runtime:runtime-cleanup", + }), + expect.objectContaining({ + pluginId: "hanging-cleanup-fixture", + hookId: "scheduler:scheduler-cleanup", + }), + ], + }); + }); + + it("blocks setting run context after a run is closed", () => { + expect( + setPluginRunContext({ + pluginId: "closed-run-plugin", + patch: { runId: "run-closed", namespace: "state", value: { before: true } }, + }), + ).toBe(true); + dispatchPluginAgentEventSubscriptions({ + registry: createEmptyPluginRegistry(), + event: { + runId: "run-closed", + seq: 1, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "end" }, + }, + }); + + expect( + setPluginRunContext({ + pluginId: "closed-run-plugin", + patch: { runId: "run-closed", namespace: "state", value: { after: true } }, + }), + ).toBe(false); + }); +}); diff --git a/src/plugins/hook-types.ts b/src/plugins/hook-types.ts index ae8894cfc9a..04e41afe370 100644 --- a/src/plugins/hook-types.ts +++ b/src/plugins/hook-types.ts @@ -299,6 +299,11 @@ export type PluginHookBeforeAgentFinalizeResult = { */ action?: "continue" | "revise" | "finalize"; reason?: string; + retry?: { + instruction: string; + idempotencyKey?: string; + maxAttempts?: number; + }; }; export type PluginHookBeforeCompactionEvent = { diff --git a/src/plugins/hooks.before-agent-finalize.test.ts b/src/plugins/hooks.before-agent-finalize.test.ts index 3db857b739e..fad46d1884b 100644 --- a/src/plugins/hooks.before-agent-finalize.test.ts +++ b/src/plugins/hooks.before-agent-finalize.test.ts @@ -60,6 +60,127 @@ describe("before_agent_finalize hook runner", () => { }); }); + it("skips empty retry instructions when merging revise decisions", async () => { + const runner = createHookRunner( + createMockPluginRegistry([ + { + hookName: "before_agent_finalize", + handler: vi.fn().mockResolvedValue({ + action: "revise", + reason: "needs a retry but forgot the instruction", + retry: { instruction: " ", idempotencyKey: "empty-retry" }, + }), + }, + { + hookName: "before_agent_finalize", + handler: vi.fn().mockResolvedValue({ + action: "revise", + reason: "rerun the focused tests", + retry: { + instruction: " rerun the focused tests ", + idempotencyKey: "valid-retry", + maxAttempts: 1, + }, + }), + }, + ]), + ); + + await expect(runner.runBeforeAgentFinalize(EVENT, TEST_PLUGIN_AGENT_CTX)).resolves.toEqual({ + action: "revise", + reason: "needs a retry but forgot the instruction\n\nrerun the focused tests", + retry: { + instruction: "rerun the focused tests", + idempotencyKey: "valid-retry", + maxAttempts: 1, + }, + }); + }); + + it("skips malformed retry instructions when merging revise decisions", async () => { + const runner = createHookRunner( + createMockPluginRegistry([ + { + hookName: "before_agent_finalize", + handler: vi.fn().mockResolvedValue({ + action: "revise", + reason: "malformed retry payload should not crash", + retry: { instruction: 123, idempotencyKey: "bad-retry" } as never, + }), + }, + { + hookName: "before_agent_finalize", + handler: vi.fn().mockResolvedValue({ + action: "revise", + reason: "valid retry still applies", + retry: { + instruction: " rerun the focused tests ", + idempotencyKey: "valid-retry", + }, + }), + }, + ]), + ); + + await expect(runner.runBeforeAgentFinalize(EVENT, TEST_PLUGIN_AGENT_CTX)).resolves.toEqual({ + action: "revise", + reason: "malformed retry payload should not crash\n\nvalid retry still applies", + retry: { + instruction: "rerun the focused tests", + idempotencyKey: "valid-retry", + }, + }); + }); + + it("preserves multiple valid retry candidates for budget evaluation", async () => { + const runner = createHookRunner( + createMockPluginRegistry([ + { + hookName: "before_agent_finalize", + handler: vi.fn().mockResolvedValue({ + action: "revise", + reason: "retry generated artifacts", + retry: { + instruction: "regenerate artifacts", + idempotencyKey: "artifacts", + maxAttempts: 1, + }, + }), + }, + { + hookName: "before_agent_finalize", + handler: vi.fn().mockResolvedValue({ + action: "revise", + reason: "retry focused tests", + retry: { + instruction: "rerun focused tests", + idempotencyKey: "tests", + maxAttempts: 1, + }, + }), + }, + ]), + ); + + const result = await runner.runBeforeAgentFinalize(EVENT, TEST_PLUGIN_AGENT_CTX); + + expect(result).toEqual({ + action: "revise", + reason: "retry generated artifacts\n\nretry focused tests", + retry: { + instruction: "regenerate artifacts", + idempotencyKey: "artifacts", + maxAttempts: 1, + }, + }); + expect(Object.getOwnPropertyDescriptor(result, "retryCandidates")?.enumerable).toBe(false); + expect( + (Object.getOwnPropertyDescriptor(result, "retryCandidates")?.value as unknown[])?.map( + (retry) => (retry as { idempotencyKey?: string }).idempotencyKey, + ), + ).toEqual(["artifacts", "tests"]); + }); + it("lets finalize override earlier revise decisions", async () => { const runner = createHookRunner( createMockPluginRegistry([ diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index 4fb45c2db9f..fe3035e08ab 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -154,6 +154,11 @@ export type HookRunnerLogger = { export type HookFailurePolicy = "fail-open" | "fail-closed"; +type BeforeAgentFinalizeRetry = NonNullable; +type BeforeAgentFinalizeResultWithRetryCandidates = PluginHookBeforeAgentFinalizeResult & { + retryCandidates?: BeforeAgentFinalizeRetry[]; +}; + export type HookRunnerOptions = { logger?: HookRunnerLogger; /** If true, errors in hooks will be caught and logged instead of thrown */ @@ -319,6 +324,48 @@ export function createHookRunner( acc: PluginHookBeforeAgentFinalizeResult | undefined, next: PluginHookBeforeAgentFinalizeResult, ): PluginHookBeforeAgentFinalizeResult => { + const normalizeRetry = ( + retry: PluginHookBeforeAgentFinalizeResult["retry"] | undefined, + ): BeforeAgentFinalizeRetry | undefined => { + const instruction = typeof retry?.instruction === "string" ? retry.instruction.trim() : ""; + if (!instruction) { + return undefined; + } + return { + ...retry, + instruction, + }; + }; + const readRetryCandidates = ( + result: PluginHookBeforeAgentFinalizeResult | undefined, + ): BeforeAgentFinalizeRetry[] => { + if (!result || result.action !== "revise") { + return []; + } + const candidateList = (result as BeforeAgentFinalizeResultWithRetryCandidates) + .retryCandidates; + if (Array.isArray(candidateList) && candidateList.length > 0) { + return candidateList + .map((retry) => normalizeRetry(retry)) + .filter((retry): retry is BeforeAgentFinalizeRetry => retry !== undefined); + } + const retry = normalizeRetry(result.retry); + return retry ? [retry] : []; + }; + const attachRetryCandidates = ( + result: PluginHookBeforeAgentFinalizeResult, + candidates: BeforeAgentFinalizeRetry[], + ): PluginHookBeforeAgentFinalizeResult => { + if (result.action !== "revise" || candidates.length <= 1) { + return result; + } + Object.defineProperty(result, "retryCandidates", { + configurable: true, + enumerable: false, + value: candidates, + }); + return result; + }; if (acc?.action === "finalize") { return acc; } @@ -326,19 +373,30 @@ export function createHookRunner( return { action: "finalize", reason: next.reason }; } if (acc?.action === "revise" && next.action === "revise") { - return { - action: "revise", - reason: concatOptionalTextSegments({ - left: acc.reason, - right: next.reason, - }), - }; + const retryCandidates = [...readRetryCandidates(acc), ...readRetryCandidates(next)]; + const retry = retryCandidates[0]; + return attachRetryCandidates( + { + action: "revise", + reason: concatOptionalTextSegments({ + left: acc.reason, + right: next.reason, + }), + ...(retry ? { retry } : {}), + }, + retryCandidates, + ); } if (acc?.action === "revise") { return acc; } if (next.action === "revise") { - return { action: "revise", reason: next.reason }; + const retry = normalizeRetry(next.retry); + return { + action: "revise", + reason: next.reason, + ...(retry ? { retry } : {}), + }; } return next.action === "continue" ? { action: "continue", reason: next.reason } : (acc ?? next); }; diff --git a/src/plugins/host-hook-cleanup-timeout.test.ts b/src/plugins/host-hook-cleanup-timeout.test.ts new file mode 100644 index 00000000000..671e6bc297e --- /dev/null +++ b/src/plugins/host-hook-cleanup-timeout.test.ts @@ -0,0 +1,36 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + PLUGIN_HOST_CLEANUP_TIMEOUT_MS, + withPluginHostCleanupTimeout, +} from "./host-hook-cleanup-timeout.js"; + +describe("withPluginHostCleanupTimeout", () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("unrefs cleanup timeout timers so pending cleanup does not keep the process alive", async () => { + const originalSetTimeout = globalThis.setTimeout; + const unref = vi.fn(); + + vi.spyOn(globalThis, "setTimeout").mockImplementation((( + callback: () => void, + timeout?: number, + ) => { + const timer = originalSetTimeout(callback, timeout); + vi.spyOn(timer, "unref").mockImplementation(() => { + unref(); + return timer; + }); + return timer; + }) as typeof setTimeout); + + await expect(withPluginHostCleanupTimeout("fast-cleanup", () => "ok")).resolves.toBe("ok"); + + expect(globalThis.setTimeout).toHaveBeenCalledWith( + expect.any(Function), + PLUGIN_HOST_CLEANUP_TIMEOUT_MS, + ); + expect(unref).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/plugins/host-hook-cleanup-timeout.ts b/src/plugins/host-hook-cleanup-timeout.ts new file mode 100644 index 00000000000..7b992664dd5 --- /dev/null +++ b/src/plugins/host-hook-cleanup-timeout.ts @@ -0,0 +1,30 @@ +export const PLUGIN_HOST_CLEANUP_TIMEOUT_MS = 5_000; + +export class PluginHostCleanupTimeoutError extends Error { + constructor(hookId: string) { + super(`plugin host cleanup timed out: ${hookId}`); + this.name = "PluginHostCleanupTimeoutError"; + } +} + +export async function withPluginHostCleanupTimeout( + hookId: string, + cleanup: () => T | Promise, +): Promise { + let timeout: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + Promise.resolve().then(cleanup), + new Promise((_, reject) => { + timeout = setTimeout(() => { + reject(new PluginHostCleanupTimeoutError(hookId)); + }, PLUGIN_HOST_CLEANUP_TIMEOUT_MS); + timeout.unref?.(); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } +} diff --git a/src/plugins/host-hook-cleanup.config.test.ts b/src/plugins/host-hook-cleanup.config.test.ts new file mode 100644 index 00000000000..9194a3bcc58 --- /dev/null +++ b/src/plugins/host-hook-cleanup.config.test.ts @@ -0,0 +1,54 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { runPluginHostCleanup } from "./host-hook-cleanup.js"; +import { createEmptyPluginRegistry } from "./registry-empty.js"; + +const mocks = vi.hoisted(() => ({ + getRuntimeConfig: vi.fn(), +})); + +vi.mock("../config/config.js", () => ({ + getRuntimeConfig: mocks.getRuntimeConfig, +})); + +describe("plugin host cleanup config fallback", () => { + afterEach(() => { + mocks.getRuntimeConfig.mockReset(); + }); + + it("records session store config failures while continuing runtime cleanup", async () => { + const registry = createEmptyPluginRegistry(); + const cleanup = vi.fn(); + registry.runtimeLifecycles ??= []; + registry.runtimeLifecycles.push({ + pluginId: "cleanup-plugin", + pluginName: "Cleanup Plugin", + source: "test", + lifecycle: { + id: "runtime-cleanup", + cleanup, + }, + }); + mocks.getRuntimeConfig.mockImplementation(() => { + throw new Error("invalid config"); + }); + + const result = await runPluginHostCleanup({ + registry, + pluginId: "cleanup-plugin", + reason: "disable", + }); + + expect(cleanup).toHaveBeenCalledWith( + expect.objectContaining({ + reason: "disable", + }), + ); + expect(result.cleanupCount).toBe(1); + expect(result.failures).toEqual([ + expect.objectContaining({ + pluginId: "cleanup-plugin", + hookId: "session-store", + }), + ]); + }); +}); diff --git a/src/plugins/host-hook-cleanup.ts b/src/plugins/host-hook-cleanup.ts index 1ff2aca7036..1ee90dac7f0 100644 --- a/src/plugins/host-hook-cleanup.ts +++ b/src/plugins/host-hook-cleanup.ts @@ -1,10 +1,16 @@ import fs from "node:fs"; +import { getRuntimeConfig } from "../config/config.js"; import { updateSessionStore } from "../config/sessions/store.js"; import { resolveAllAgentSessionStoreTargetsSync } from "../config/sessions/targets.js"; import type { SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; -import { cleanupPluginSessionSchedulerJobs, clearPluginRunContext } from "./host-hook-runtime.js"; +import { withPluginHostCleanupTimeout } from "./host-hook-cleanup-timeout.js"; +import { + cleanupPluginSessionSchedulerJobs, + clearPluginRunContext, + makePluginSessionSchedulerJobKey, +} from "./host-hook-runtime.js"; import type { PluginHostCleanupReason } from "./host-hooks.js"; import type { PluginRegistry } from "./registry-types.js"; @@ -101,84 +107,136 @@ async function clearPluginOwnedSessionStores(params: { } export async function runPluginHostCleanup(params: { - cfg: OpenClawConfig; + cfg?: OpenClawConfig; registry?: PluginRegistry | null; pluginId?: string; reason: PluginHostCleanupReason; sessionKey?: string; runId?: string; preserveSchedulerJobIds?: ReadonlySet; + shouldCleanup?: () => boolean; }): Promise { - const persistentCleanupCount = - params.reason === "restart" - ? 0 - : await clearPluginOwnedSessionStores({ - cfg: params.cfg, - pluginId: params.pluginId, - sessionKey: params.sessionKey, - }); - const registry = params.registry; - if (!registry) { - return { cleanupCount: persistentCleanupCount, failures: [] }; - } const failures: PluginHostCleanupFailure[] = []; + const shouldCleanup = params.shouldCleanup ?? (() => true); + if (!shouldCleanup()) { + return { cleanupCount: 0, failures }; + } + let persistentCleanupCount = 0; + if (params.reason !== "restart" && shouldCleanup()) { + try { + persistentCleanupCount = await clearPluginOwnedSessionStores({ + cfg: params.cfg ?? getRuntimeConfig(), + pluginId: params.pluginId, + sessionKey: params.sessionKey, + }); + } catch (error) { + failures.push({ + pluginId: params.pluginId ?? "plugin-host", + hookId: "session-store", + error, + }); + } + } + const registry = params.registry; let cleanupCount = persistentCleanupCount; - for (const registration of registry.sessionExtensions ?? []) { - if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) { - continue; + if (registry) { + for (const registration of registry.sessionExtensions ?? []) { + if (!shouldCleanup()) { + return { cleanupCount, failures }; + } + if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) { + continue; + } + const cleanup = registration.extension.cleanup; + if (!cleanup) { + continue; + } + const hookId = `session:${registration.extension.namespace}`; + try { + await withPluginHostCleanupTimeout(hookId, () => + cleanup({ + reason: params.reason, + sessionKey: params.sessionKey, + }), + ); + cleanupCount += 1; + } catch (error) { + failures.push({ + pluginId: registration.pluginId, + hookId, + error, + }); + } } - const cleanup = registration.extension.cleanup; - if (!cleanup) { - continue; + for (const registration of registry.runtimeLifecycles ?? []) { + if (!shouldCleanup()) { + return { cleanupCount, failures }; + } + if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) { + continue; + } + const cleanup = registration.lifecycle.cleanup; + if (!cleanup) { + continue; + } + const hookId = `runtime:${registration.lifecycle.id}`; + try { + await withPluginHostCleanupTimeout(hookId, () => + cleanup({ + reason: params.reason, + sessionKey: params.sessionKey, + runId: params.runId, + }), + ); + cleanupCount += 1; + } catch (error) { + failures.push({ + pluginId: registration.pluginId, + hookId, + error, + }); + } } - try { - await cleanup({ - reason: params.reason, - sessionKey: params.sessionKey, - }); - cleanupCount += 1; - } catch (error) { - failures.push({ - pluginId: registration.pluginId, - hookId: `session:${registration.extension.namespace}`, - error, - }); + const schedulerFailures = await cleanupPluginSessionSchedulerJobs({ + pluginId: params.pluginId, + reason: params.reason, + sessionKey: params.sessionKey, + records: registry.sessionSchedulerJobs, + preserveJobIds: params.preserveSchedulerJobIds, + shouldCleanup, + }); + for (const failure of schedulerFailures) { + failures.push(failure); } } - for (const registration of registry.runtimeLifecycles ?? []) { - if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) { - continue; - } - const cleanup = registration.lifecycle.cleanup; - if (!cleanup) { - continue; - } - try { - await cleanup({ - reason: params.reason, - sessionKey: params.sessionKey, - runId: params.runId, - }); - cleanupCount += 1; - } catch (error) { - failures.push({ - pluginId: registration.pluginId, - hookId: `runtime:${registration.lifecycle.id}`, - error, - }); + if (params.reason !== "restart" && shouldCleanup()) { + const registrySchedulerJobKeys = new Set( + (registry?.sessionSchedulerJobs ?? []) + .filter((record) => !params.pluginId || record.pluginId === params.pluginId) + .map((record) => ({ + pluginId: record.pluginId, + jobId: typeof record.job.id === "string" ? record.job.id.trim() : "", + })) + .filter(({ jobId }) => jobId.length > 0) + .map(({ pluginId, jobId }) => makePluginSessionSchedulerJobKey(pluginId, jobId)), + ); + const runtimeSchedulerFailures = await cleanupPluginSessionSchedulerJobs({ + pluginId: params.pluginId, + reason: params.reason, + sessionKey: params.sessionKey, + preserveJobIds: params.preserveSchedulerJobIds, + excludeJobKeys: registrySchedulerJobKeys, + shouldCleanup, + }); + for (const failure of runtimeSchedulerFailures) { + failures.push(failure); } } - const schedulerFailures = await cleanupPluginSessionSchedulerJobs({ - pluginId: params.pluginId, - reason: params.reason, - sessionKey: params.sessionKey, - records: registry?.sessionSchedulerJobs, - preserveJobIds: params.preserveSchedulerJobIds, - }); - for (const failure of schedulerFailures) { - failures.push(failure); - } - if (params.pluginId || params.runId) { + if ( + shouldCleanup() && + (params.pluginId || params.runId) && + (params.reason !== "restart" || params.runId) + ) { clearPluginRunContext({ pluginId: params.pluginId, runId: params.runId }); } return { cleanupCount, failures }; @@ -225,9 +283,11 @@ export async function cleanupReplacedPluginHostRegistry(params: { cfg: OpenClawConfig; previousRegistry?: PluginRegistry | null; nextRegistry?: PluginRegistry | null; + shouldCleanup?: () => boolean; }): Promise { const previousRegistry = params.previousRegistry; - if (!previousRegistry || previousRegistry === params.nextRegistry) { + const shouldCleanup = params.shouldCleanup ?? (() => true); + if (!previousRegistry || previousRegistry === params.nextRegistry || !shouldCleanup()) { return { cleanupCount: 0, failures: [] }; } const nextPluginIds = params.nextRegistry @@ -240,6 +300,9 @@ export async function cleanupReplacedPluginHostRegistry(params: { const failures: PluginHostCleanupFailure[] = []; let cleanupCount = 0; for (const pluginId of previousPluginIds) { + if (!shouldCleanup()) { + break; + } const restarted = nextPluginIds.has(pluginId); const result = await runPluginHostCleanup({ cfg: params.cfg, @@ -249,6 +312,7 @@ export async function cleanupReplacedPluginHostRegistry(params: { preserveSchedulerJobIds: restarted ? collectSchedulerJobIds(params.nextRegistry, pluginId) : undefined, + shouldCleanup, }); cleanupCount += result.cleanupCount; failures.push(...result.failures); diff --git a/src/plugins/host-hook-runtime.ts b/src/plugins/host-hook-runtime.ts index 3fca1e4a565..0a14c290b51 100644 --- a/src/plugins/host-hook-runtime.ts +++ b/src/plugins/host-hook-runtime.ts @@ -2,6 +2,7 @@ import type { AgentEventPayload } from "../infra/agent-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { withPluginHostCleanupTimeout } from "./host-hook-cleanup-timeout.js"; import { isPluginJsonValue, type PluginHostCleanupReason, @@ -29,10 +30,12 @@ type PluginHostRuntimeState = { nextSchedulerJobGeneration: number; pendingAgentEventHandlersByRunId: Map>>; closedRunIds: Set; + terminalEventCleanupExpiredRunIds: Set; }; const PLUGIN_HOST_RUNTIME_STATE_KEY = Symbol.for("openclaw.pluginHostRuntimeState"); const CLOSED_RUN_IDS_MAX = 512; +export const PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS = 5_000; const log = createSubsystemLogger("plugins/host-hooks"); function getPluginHostRuntimeState(): PluginHostRuntimeState { @@ -42,6 +45,7 @@ function getPluginHostRuntimeState(): PluginHostRuntimeState { nextSchedulerJobGeneration: 1, pendingAgentEventHandlersByRunId: new Map(), closedRunIds: new Set(), + terminalEventCleanupExpiredRunIds: new Set(), })); } @@ -70,6 +74,23 @@ function isPluginRunClosed(runId: string): boolean { return getPluginHostRuntimeState().closedRunIds.has(runId); } +function markTerminalEventCleanupExpired(runId: string): void { + const state = getPluginHostRuntimeState(); + state.terminalEventCleanupExpiredRunIds.delete(runId); + state.terminalEventCleanupExpiredRunIds.add(runId); + while (state.terminalEventCleanupExpiredRunIds.size > CLOSED_RUN_IDS_MAX) { + const oldest = state.terminalEventCleanupExpiredRunIds.values().next().value; + if (oldest === undefined) { + break; + } + state.terminalEventCleanupExpiredRunIds.delete(oldest); + } +} + +function isTerminalEventCleanupExpired(runId: string): boolean { + return getPluginHostRuntimeState().terminalEventCleanupExpiredRunIds.has(runId); +} + function trackAgentEventHandler(runId: string, pending: Promise): void { const state = getPluginHostRuntimeState(); const handlers = state.pendingAgentEventHandlersByRunId.get(runId) ?? new Set(); @@ -77,12 +98,53 @@ function trackAgentEventHandler(runId: string, pending: Promise): void { state.pendingAgentEventHandlersByRunId.set(runId, handlers); void pending.finally(() => { handlers.delete(pending); - if (handlers.size === 0) { + if ( + handlers.size === 0 && + getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(runId) === handlers + ) { state.pendingAgentEventHandlersByRunId.delete(runId); } }); } +async function waitForLiveTerminalEventHandlers(runId: string): Promise<"settled"> { + for (;;) { + const pendingHandlers = getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(runId); + if (!pendingHandlers || pendingHandlers.size === 0) { + return "settled"; + } + await Promise.allSettled(pendingHandlers); + } +} + +function waitForTerminalEventHandlers(params: { runId: string }): Promise { + const { runId } = params; + let timeout: NodeJS.Timeout | undefined; + const settled = waitForLiveTerminalEventHandlers(runId); + // Promise.race bounds the host wait; JavaScript cannot cancel the plugin + // promises themselves, so timeout also marks the run expired to block late + // run-context resurrection by handlers that eventually settle. + const timedOut = new Promise<"timeout">((resolve) => { + timeout = setTimeout(() => { + markTerminalEventCleanupExpired(runId); + getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.delete(runId); + log.warn( + `plugin terminal agent event subscriptions still running after ${PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS}ms; clearing run context without waiting for them to settle`, + ); + resolve("timeout"); + }, PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS); + }); + if (timeout) { + timeout.unref?.(); + } + return Promise.race([settled, timedOut]).then(() => { + if (timeout) { + clearTimeout(timeout); + timeout = undefined; + } + }); +} + function getPluginRunContextNamespaces(params: { runId: string; pluginId: string; @@ -108,13 +170,14 @@ function getPluginRunContextNamespaces(params: { export function setPluginRunContext(params: { pluginId: string; patch: PluginRunContextPatch; + allowClosedRun?: boolean; }): boolean { const runId = normalizeOptionalString(params.patch.runId); const namespace = normalizeNamespace(params.patch.namespace); if (!runId || !namespace) { return false; } - if (isPluginRunClosed(runId)) { + if (!params.allowClosedRun && isPluginRunClosed(runId)) { return false; } // Only an explicit `unset: true` deletes the run-context entry — silently @@ -230,6 +293,7 @@ export function dispatchPluginAgentEventSubscriptions(params: { }): void { const subscriptions = params.registry?.agentEventSubscriptions ?? []; const pendingHandlers: Promise[] = []; + const isTerminalEvent = isTerminalAgentRunEvent(params.event); for (const registration of subscriptions) { const streams = registration.subscription.streams; if (streams && streams.length > 0 && !streams.includes(params.event.stream)) { @@ -237,12 +301,17 @@ export function dispatchPluginAgentEventSubscriptions(params: { } const pluginId = registration.pluginId; const runId = params.event.runId; + let handlerActive = true; const ctx = { // oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Run-context JSON reads are caller-typed by namespace. getRunContext: (namespace: string) => getPluginRunContext({ pluginId, get: { runId, namespace } }), setRunContext: (namespace: string, value: PluginJsonValue) => { - setPluginRunContext({ pluginId, patch: { runId, namespace, value } }); + setPluginRunContext({ + pluginId, + patch: { runId, namespace, value }, + allowClosedRun: isTerminalEvent && handlerActive && !isTerminalEventCleanupExpired(runId), + }); }, clearRunContext: (namespace?: string) => { clearPluginRunContext({ pluginId, runId, namespace }); @@ -251,16 +320,21 @@ export function dispatchPluginAgentEventSubscriptions(params: { try { const pending = Promise.resolve( registration.subscription.handle(structuredClone(params.event), ctx), - ).catch((error) => { - logAgentEventSubscriptionFailure({ - pluginId, - subscriptionId: registration.subscription.id, - error, + ) + .catch((error) => { + logAgentEventSubscriptionFailure({ + pluginId, + subscriptionId: registration.subscription.id, + error, + }); + }) + .finally(() => { + handlerActive = false; }); - }); trackAgentEventHandler(runId, pending); pendingHandlers.push(pending); } catch (error) { + handlerActive = false; logAgentEventSubscriptionFailure({ pluginId, subscriptionId: registration.subscription.id, @@ -268,12 +342,11 @@ export function dispatchPluginAgentEventSubscriptions(params: { }); } } - if (isTerminalAgentRunEvent(params.event)) { + if (isTerminalEvent) { markPluginRunClosed(params.event.runId); - const pendingForRun = - getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(params.event.runId) ?? - new Set(pendingHandlers); - void Promise.allSettled(pendingForRun).then(() => { + void waitForTerminalEventHandlers({ + runId: params.event.runId, + }).then(() => { clearPluginRunContext({ runId: params.event.runId }); }); } @@ -360,6 +433,10 @@ export function getPluginSessionSchedulerJobGeneration(params: { return record.generation; } +export function makePluginSessionSchedulerJobKey(pluginId: string, jobId: string): string { + return JSON.stringify([pluginId, jobId]); +} + export async function cleanupPluginSessionSchedulerJobs(params: { pluginId?: string; reason: PluginHostCleanupReason; @@ -371,11 +448,20 @@ export async function cleanupPluginSessionSchedulerJobs(params: { generation?: number; }[]; preserveJobIds?: ReadonlySet; + excludeJobKeys?: ReadonlySet; + shouldCleanup?: () => boolean; }): Promise> { const state = getPluginHostRuntimeState(); const failures: Array<{ pluginId: string; hookId: string; error: unknown }> = []; + const shouldCleanup = params.shouldCleanup ?? (() => true); + if (!shouldCleanup()) { + return failures; + } if (params.records) { for (const record of params.records) { + if (!shouldCleanup()) { + return failures; + } if (params.pluginId && record.pluginId !== params.pluginId) { continue; } @@ -406,29 +492,37 @@ export async function cleanupPluginSessionSchedulerJobs(params: { continue; } const preserveJob = params.preserveJobIds?.has(jobId) ?? false; - if ( - preserveJob && - (record.generation === undefined || liveGeneration === record.generation) - ) { + if (preserveJob) { + // preserveJobIds means "do not run cleanup at all" — even across + // generation mismatches. The generation-matched deletion below would + // otherwise still call the OLD cleanup callback, which can remove + // external scheduled jobs (e.g. cron.remove) and break the live + // newer-generation registration that took over this jobId. continue; } // A newer generation may already own this id. The old cleanup callback can // still release plugin-owned resources, while deletion below is generation // matched so it cannot remove the newer live record. + const hookId = `scheduler:${jobId}`; try { - await record.job.cleanup?.({ - reason: params.reason, - sessionKey, - jobId, - }); + await withPluginHostCleanupTimeout(hookId, () => + record.job.cleanup?.({ + reason: params.reason, + sessionKey, + jobId, + }), + ); } catch (error) { failures.push({ pluginId: record.pluginId, - hookId: `scheduler:${jobId}`, + hookId, error, }); continue; } + if (!shouldCleanup()) { + continue; + } deletePluginSessionSchedulerJob({ pluginId: record.pluginId, jobId, @@ -440,28 +534,46 @@ export async function cleanupPluginSessionSchedulerJobs(params: { } const pluginIds = params.pluginId ? [params.pluginId] : [...state.schedulerJobsByPlugin.keys()]; for (const pluginId of pluginIds) { + if (!shouldCleanup()) { + return failures; + } const jobs = state.schedulerJobsByPlugin.get(pluginId); if (!jobs) { continue; } for (const [jobId, record] of jobs.entries()) { + if (!shouldCleanup()) { + return failures; + } if (params.sessionKey && record.job.sessionKey !== params.sessionKey) { continue; } + if (params.excludeJobKeys?.has(makePluginSessionSchedulerJobKey(pluginId, jobId))) { + continue; + } + if (params.preserveJobIds?.has(jobId)) { + continue; + } + const hookId = `scheduler:${jobId}`; try { - await record.job.cleanup?.({ - reason: params.reason, - sessionKey: record.job.sessionKey, - jobId, - }); + await withPluginHostCleanupTimeout(hookId, () => + record.job.cleanup?.({ + reason: params.reason, + sessionKey: record.job.sessionKey, + jobId, + }), + ); } catch (error) { failures.push({ pluginId, - hookId: `scheduler:${jobId}`, + hookId, error, }); continue; } + if (!shouldCleanup()) { + continue; + } jobs.delete(jobId); } if (jobs.size === 0) { @@ -480,6 +592,7 @@ export function clearPluginHostRuntimeState(params?: { pluginId?: string; runId? state.schedulerJobsByPlugin.clear(); state.pendingAgentEventHandlersByRunId.clear(); state.closedRunIds.clear(); + state.terminalEventCleanupExpiredRunIds.clear(); } } diff --git a/src/plugins/registry-lifecycle.ts b/src/plugins/registry-lifecycle.ts new file mode 100644 index 00000000000..18dbdcbf48d --- /dev/null +++ b/src/plugins/registry-lifecycle.ts @@ -0,0 +1,19 @@ +import type { PluginRegistry } from "./registry-types.js"; + +const retiredRegistries = new WeakSet(); + +export function markPluginRegistryRetired(registry: PluginRegistry | null | undefined): void { + if (registry) { + retiredRegistries.add(registry); + } +} + +export function markPluginRegistryActive(registry: PluginRegistry | null | undefined): void { + if (registry) { + retiredRegistries.delete(registry); + } +} + +export function isPluginRegistryRetired(registry: PluginRegistry): boolean { + return retiredRegistries.has(registry); +} diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index 839e71501ad..6ce09bb6f9b 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -99,6 +99,7 @@ import { } from "./memory-state.js"; import { normalizeRegisteredProvider } from "./provider-validation.js"; import { createEmptyPluginRegistry } from "./registry-empty.js"; +import { isPluginRegistryRetired } from "./registry-lifecycle.js"; import type { PluginCliBackendRegistration, PluginCliRegistration, @@ -303,6 +304,9 @@ const activePluginHookRegistrations = resolveGlobalSingleton< type HookRegistration = { event: string; handler: Parameters[1] }; type HookRollbackEntry = { name: string; previousRegistrations: HookRegistration[] }; +type PluginSideEffectGuard = { + active: boolean; +}; type PluginRegistrationCapabilities = { /** Broad registry writes that discovery and live activation both need. */ @@ -338,6 +342,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { const coreGatewayMethods = new Set(coreGatewayMethodNames); const pluginHookRollback = new Map(); const pluginsWithChannelRegistrationConflict = new Set(); + const pluginSideEffectGuards = new Map>(); const pushDiagnostic = (diag: PluginDiagnostic) => { registry.diagnostics.push(diag); @@ -354,6 +359,25 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { return value; }; + const createPluginSideEffectGuard = (pluginId: string): PluginSideEffectGuard => { + const guard = { active: true }; + const guards = pluginSideEffectGuards.get(pluginId) ?? new Set(); + guards.add(guard); + pluginSideEffectGuards.set(pluginId, guards); + return guard; + }; + + const deactivatePluginSideEffectGuards = (pluginId: string): void => { + const guards = pluginSideEffectGuards.get(pluginId); + if (!guards) { + return; + } + for (const guard of guards) { + guard.active = false; + } + pluginSideEffectGuards.delete(pluginId); + }; + const registerCodexAppServerExtensionFactory = ( record: PluginRecord, factory: Parameters[0], @@ -2169,6 +2193,18 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { const registrationMode = params.registrationMode ?? "full"; const registrationCapabilities = resolvePluginRegistrationCapabilities(registrationMode); pluginRuntimeRecordById.set(record.id, record); + const sideEffectGuard = createPluginSideEffectGuard(record.id); + const isLoadedRecordInRegistry = () => + registry.plugins.some((plugin) => plugin.id === record.id && plugin.status === "loaded"); + const isActivatingLoadedRecord = () => + registryParams.activateGlobalSideEffects !== false && + record.enabled && + record.status === "loaded" && + !registry.plugins.some((plugin) => plugin.id === record.id); + const shouldCommitWorkflowSideEffect = () => + sideEffectGuard.active && + !isPluginRegistryRetired(registry) && + (isLoadedRecordInRegistry() || isActivatingLoadedRecord()); return buildPluginApi({ id: record.id, name: record.name, @@ -2378,14 +2414,25 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { registerRuntimeLifecycle: (lifecycle) => registerRuntimeLifecycle(record, lifecycle), registerAgentEventSubscription: (subscription) => registerAgentEventSubscription(record, subscription), - setRunContext: (patch) => setPluginRunContext({ pluginId: record.id, patch }), + setRunContext: (patch) => + registryParams.activateGlobalSideEffects !== false && + shouldCommitWorkflowSideEffect() + ? setPluginRunContext({ pluginId: record.id, patch }) + : false, getRunContext: (get) => getPluginRunContext({ pluginId: record.id, get }), - clearRunContext: (params) => + clearRunContext: (params) => { + if ( + registryParams.activateGlobalSideEffects === false || + !shouldCommitWorkflowSideEffect() + ) { + return; + } clearPluginRunContext({ pluginId: record.id, runId: params.runId, namespace: params.namespace, - }), + }); + }, registerSessionSchedulerJob: (job) => registerSessionSchedulerJob(record, job), registerMemoryCapability: (capability) => { if (!hasKind(record.kind, "memory")) { @@ -2548,6 +2595,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { }; const rollbackPluginGlobalSideEffects = (pluginId: string) => { + deactivatePluginSideEffectGuards(pluginId); if (registryParams.activateGlobalSideEffects === false) { return; } diff --git a/src/plugins/runtime.test.ts b/src/plugins/runtime.test.ts index a2541fb60c5..13232e39cb2 100644 --- a/src/plugins/runtime.test.ts +++ b/src/plugins/runtime.test.ts @@ -68,6 +68,15 @@ function expectRouteRegistryState(params: { setup: () => void; assert: () => voi params.assert(); } +async function waitForCleanupSignal(signal: Promise, label: string): Promise { + await Promise.race([ + signal, + new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Timed out waiting for ${label}`)), 500); + }), + ]); +} + describe("plugin runtime route registry", () => { afterEach(() => { releasePinnedPluginHttpRouteRegistry(); @@ -209,6 +218,78 @@ describe("setActivePluginRegistry", () => { expect(listImportedRuntimePluginIds()).toEqual(["runtime-plugin"]); }); + it.each([ + { + name: "same active registry is refreshed", + refresh(nextRegistry: ReturnType) { + setActivePluginRegistry(nextRegistry); + }, + }, + { + name: "active registry advances again", + refresh() { + setActivePluginRegistry(createEmptyPluginRegistry()); + }, + }, + ] as const)("continues cleanup when the $name", async ({ refresh }) => { + let releaseFirstCleanup: (() => void) | undefined; + let markFirstCleanupStarted!: () => void; + let markSecondCleanupCalled!: () => void; + const firstCleanupStarted = new Promise((resolve) => { + markFirstCleanupStarted = resolve; + }); + const secondCleanupCalled = new Promise((resolve) => { + markSecondCleanupCalled = resolve; + }); + const previous = createEmptyPluginRegistry(); + previous.plugins.push( + createPluginRecord({ + id: "cleanup-refresh-race", + name: "Cleanup Refresh Race", + status: "loaded", + }), + ); + previous.runtimeLifecycles = [ + { + pluginId: "cleanup-refresh-race", + pluginName: "Cleanup Refresh Race", + lifecycle: { + id: "first-cleanup", + async cleanup() { + markFirstCleanupStarted(); + await new Promise((resolve) => { + releaseFirstCleanup = resolve; + }); + }, + }, + source: "/virtual/cleanup-refresh-race/index.ts", + rootDir: "/virtual/cleanup-refresh-race", + }, + { + pluginId: "cleanup-refresh-race", + pluginName: "Cleanup Refresh Race", + lifecycle: { + id: "second-cleanup", + cleanup() { + markSecondCleanupCalled(); + }, + }, + source: "/virtual/cleanup-refresh-race/index.ts", + rootDir: "/virtual/cleanup-refresh-race", + }, + ]; + const next = createEmptyPluginRegistry(); + + setActivePluginRegistry(previous); + setActivePluginRegistry(next); + await waitForCleanupSignal(firstCleanupStarted, "first cleanup start"); + + refresh(next); + releaseFirstCleanup?.(); + + await waitForCleanupSignal(secondCleanupCalled, "second cleanup"); + }); + it("includes plugin ids imported before registration failed", () => { recordImportedPluginId("broken-plugin"); diff --git a/src/plugins/runtime.ts b/src/plugins/runtime.ts index d831a21eaed..cc93ee54d6e 100644 --- a/src/plugins/runtime.ts +++ b/src/plugins/runtime.ts @@ -5,6 +5,7 @@ import { dispatchPluginAgentEventSubscriptions, } from "./host-hook-runtime.js"; import { createEmptyPluginRegistry } from "./registry-empty.js"; +import { markPluginRegistryActive, markPluginRegistryRetired } from "./registry-lifecycle.js"; import type { PluginRegistry } from "./registry-types.js"; import { PLUGIN_REGISTRY_STATE, @@ -64,16 +65,23 @@ function registryHasPluginHostCleanupWork(registry: PluginRegistry | null): bool async function cleanupPreviousPluginHostRegistry(params: { previousRegistry: PluginRegistry; - nextRegistry: PluginRegistry; }): Promise { const [{ getRuntimeConfig }, { cleanupReplacedPluginHostRegistry }] = await Promise.all([ import("../config/config.js"), import("./host-hook-cleanup.js"), ]); + const nextRegistry = asPluginRegistry(state.activeRegistry); + if (!nextRegistry || nextRegistry === params.previousRegistry) { + return; + } + // Async cleanup must not clear state for a registry that has been restored + // active, but later swaps should not strand cleanup for the retiring registry. + const shouldCleanup = () => state.activeRegistry !== params.previousRegistry; await cleanupReplacedPluginHostRegistry({ cfg: getRuntimeConfig(), previousRegistry: params.previousRegistry, - nextRegistry: params.nextRegistry, + nextRegistry, + shouldCleanup, }); } @@ -129,6 +137,10 @@ export function setActivePluginRegistry( workspaceDir?: string, ) { const previousRegistry = asPluginRegistry(state.activeRegistry); + if (previousRegistry && previousRegistry !== registry) { + markPluginRegistryRetired(previousRegistry); + } + markPluginRegistryActive(registry); state.activeRegistry = registry; state.activeVersion += 1; syncTrackedSurface(state.httpRoute, registry, true); @@ -146,7 +158,6 @@ export function setActivePluginRegistry( } void cleanupPreviousPluginHostRegistry({ previousRegistry, - nextRegistry: registry, }).catch((error) => { log.warn(`plugin host registry cleanup failed: ${String(error)}`); });