[plugin sdk] Harden finalize retry and run context cleanup (#75600)

Merged via squash.

Prepared head SHA: ec58a6212b
Co-authored-by: 100yenadmin <239388517+100yenadmin@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
Author: Eva
Date: 2026-05-04 21:04:22 +07:00
Committed by GitHub
parent 042d7b8823
commit 8afc9ef73c
22 changed files with 2112 additions and 128 deletions


@@ -37,6 +37,8 @@ Docs: https://docs.openclaw.ai
- Plugins/update: clean stale bundled load paths for already-externalized pinned npm and ClawHub plugin installs, so release-channel sync does not leave removed bundled paths ahead of the installed external package. Thanks @vincentkoc.
- Telegram: accept plugin-owned numeric forum-topic targets in the agent message tool and keep reply-dispatch provider chunks behind a real stable runtime alias during in-place package updates. Fixes #77137. Thanks @richardmqq.
- Google Meet: preserve `realtime.introMessage: ""` so realtime Chrome joins can stay silent instead of restoring the default spoken intro. Thanks @vincentkoc.
- Plugins/SDK: add bounded `before_agent_finalize` retry instructions so workflow plugins can request one more model pass. Thanks @100yenadmin.
- Discord/status: add degraded Discord transport and gateway event-loop starvation signals to `openclaw channels status`, `openclaw status --deep`, and fetch-timeout logs so intermittent socket resets do not look like a healthy running channel. (#76327) Thanks @joshavant.
- Providers/OpenRouter: add opt-in response caching params that send OpenRouter's `X-OpenRouter-Cache`, `X-OpenRouter-Cache-TTL`, and cache-clear headers only on verified OpenRouter routes. Thanks @vincentkoc.
- Providers/OpenRouter: expand app-attribution categories so OpenClaw advertises coding, programming, writing, chat, and personal-agent usage on verified OpenRouter routes. Thanks @vincentkoc.
- Plugins/runtime state: add `registerIfAbsent` for atomic keyed-store dedupe claims that return whether a plugin successfully claimed a key without overwriting an existing live value. Thanks @amknight.
@@ -465,6 +467,9 @@ Docs: https://docs.openclaw.ai
- Gateway/CLI: make `openclaw gateway start` repair stale managed service definitions that point at old OpenClaw versions, missing binaries, or temporary installer paths before starting.
- Heartbeat/scheduler: make heartbeat phase scheduling active-hours-aware so the scheduler seeks forward to the first in-window phase slot instead of arming timers for quiet-hours slots and relying solely on the runtime guard. Non-UTC `activeHours.timezone` values (e.g. `Asia/Shanghai`) now correctly influence when the next heartbeat timer fires, avoiding wasted quiet-hours ticks and long dormant gaps after gateway restarts. Fixes #75487. Thanks @amknight.
- Providers/Arcee AI: mark Trinity Large Thinking as tool-incompatible so main-session runs use the same text-only request shape that made subagent runs recover, avoiding the remaining main-session response-shape mismatch after the #62848 transport failover fix. Fixes #62851 and #62847; carries forward #62848. Thanks @Adam-Researchh.
- Plugins/SDK: harden run-scoped plugin context cleanup so finalized workflow runs do not leak per-run state. Thanks @100yenadmin.
- Plugins/SDK: keep stale async registry cleanup from clearing restored plugin run context and scheduler state after a plugin registry is reactivated. (#75600) Thanks @100yenadmin.
- Plugins/SDK: preserve restored plugin scheduler state when earlier delayed replacement cleanup finishes after reactivation. Thanks @100yenadmin.
- Status: show the `openai-codex` OAuth profile for `openai/gpt-*` sessions running through the native Codex runtime instead of reporting auth as unknown. (#76197) Thanks @mbelinky.
- Gateway: avoid repeated plugin tool descriptor config hashing so large runtime configs do not block reply startup and trigger reconnect/timeouts. (#75944) Thanks @joshavant.
- Plugins/externalization: keep diagnostics ClawHub packages and persisted bundled-plugin relocation on npm-first install metadata for launch, and omit Discord from the core package now that its external package is published. Thanks @vincentkoc.


@@ -1,2 +1,2 @@
-c38441e2e18aa519c5dc22c2b593694444869673447740327c87f16f3d4a0f8d plugin-sdk-api-baseline.json
-5711948923b5a4f89ac04a182266ee0fb57275369a3a8112433f3758a7d38c86 plugin-sdk-api-baseline.jsonl
+3c0423e26e758e7a5f5febcbaacd6a7ceb8584a8eecd0224f7ce98e6bcb9e9c0 plugin-sdk-api-baseline.json
+952ba44c63a9f2107fc10aead1d0cc77ef06ac9a9befcac3ca9e4b0f4427cdfc plugin-sdk-api-baseline.jsonl


@@ -264,6 +264,22 @@ the harness for one more model pass before finalization, `{ action:
Codex native `Stop` hooks are relayed into this hook as OpenClaw
`before_agent_finalize` decisions.
When returning `action: "revise"`, plugins can include `retry` metadata to make
the extra model pass bounded and replay-safe:
```typescript
type BeforeAgentFinalizeRetry = {
  instruction: string;
  idempotencyKey?: string;
  maxAttempts?: number;
};
```
`instruction` is appended to the revision reason sent to the harness, unless the
reason already contains it. `idempotencyKey` lets the host count retries for the
same plugin request across equivalent finalize decisions; when it is omitted, the
host keys the count on a hash of `instruction`. `maxAttempts` caps how many extra
passes the host will allow before continuing with the natural final answer, and
defaults to 1.
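As an illustration only, a hook handler might build its result along these
lines. `FinalizeDecision` and `decideFinalize` are hypothetical names for this
sketch, not SDK exports, and the check on the last assistant message is an
assumed example condition; only the `retry` fields come from the type above.

```typescript
// Local copy of the documented retry shape so the sketch is self-contained.
type BeforeAgentFinalizeRetry = {
  instruction: string;
  idempotencyKey?: string;
  maxAttempts?: number;
};

// Hypothetical result shape for illustration; the SDK's exported type may differ.
type FinalizeDecision =
  | { action: "continue" }
  | { action: "revise"; reason?: string; retry?: BeforeAgentFinalizeRetry };

// Hypothetical helper: ask for one more pass when the answer mentions no test run.
function decideFinalize(lastAssistantMessage: string): FinalizeDecision {
  if (/tests? passed/i.test(lastAssistantMessage)) {
    return { action: "continue" };
  }
  return {
    action: "revise",
    reason: "The final answer does not mention a test run.",
    retry: {
      instruction: "Run the focused tests and report the result.",
      // A stable key lets the host recognize repeats of this same request.
      idempotencyKey: "focused-tests",
      // Allow at most one extra model pass for this request.
      maxAttempts: 1,
    },
  };
}
```

With `maxAttempts: 1`, a second identical `revise` decision in the same run
would be expected to fall through to the natural final answer.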
Non-bundled plugins that need `llm_input`, `llm_output`,
`before_agent_finalize`, or `agent_end` must set:


@@ -23,6 +23,7 @@ type MatrixHandlerTestHarnessOptions = {
   accountId?: string;
   accountConfig?: MatrixConfig;
   cfg?: unknown;
+  liveCfg?: unknown;
   client?: Partial<MatrixClient>;
   runtime?: RuntimeEnv;
   logger?: RuntimeLogger;
@@ -192,7 +193,7 @@ export function createMatrixHandlerTestHarness(
   } as never,
   core: {
     config: {
-      current: () => cfgForHandler,
+      current: () => options.liveCfg ?? cfgForHandler,
     },
     channel: {
       pairing: {


@@ -228,6 +228,50 @@ describe("matrix monitor handler pairing account scope", () => {
);
});
it("uses live dmScope when deciding whether to pin main DM route updates", async () => {
const startupCfg = {
session: { dmScope: "main" },
channels: {
matrix: {
dm: { allowFrom: ["@owner:example.org"] },
},
},
};
const liveCfg = {
session: { dmScope: "per-channel-peer" },
channels: {
matrix: {
dm: { allowFrom: ["@owner:example.org"] },
},
},
};
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
cfg: startupCfg,
liveCfg,
dmPolicy: "allowlist",
allowFrom: ["@owner:example.org"],
allowFromResolvedEntries: [{ input: "@owner:example.org", id: "@owner:example.org" }],
isDirectMessage: true,
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$owner-dm-live-scope",
sender: "@owner:example.org",
body: "hello",
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
updateLastRoute: expect.objectContaining({
mainDmOwnerPin: undefined,
}),
}),
);
});
it("sends pairing reminders for pending requests with cooldown", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-01T10:00:00.000Z"));


@@ -1198,7 +1198,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
   triggerSnapshot,
   threadRootId: _threadRootId,
   thread,
-  effectiveAllowFrom,
   effectiveGroupAllowFrom,
   effectiveRoomUsers,
 } = resolvedIngressResult;
@@ -1940,11 +1939,27 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
   onIdle: typingCallbacks.onIdle,
 });
 const pinnedMainDmOwner = isDirectMessage
-  ? resolvePinnedMainDmOwnerFromAllowlist({
-      dmScope: cfg.session?.dmScope,
-      allowFrom: effectiveAllowFrom,
-      normalizeEntry: normalizeMatrixUserId,
-    })
+  ? await (async () => {
+      const livePinnedCfg = core.config.current() as CoreConfig;
+      const livePinnedAllowlists = resolveMatrixAccountAllowlistConfig({
+        cfg: livePinnedCfg,
+        accountId,
+      });
+      const livePinnedDmAllowFrom = await resolveCachedLiveAllowlist({
+        cfg: livePinnedCfg,
+        entries: livePinnedAllowlists.dmAllowFrom,
+        startupResolvedEntries: allowFromResolvedEntries,
+        cache: liveDmAllowlistCache,
+        updateCache: (next) => {
+          liveDmAllowlistCache = next;
+        },
+      });
+      return resolvePinnedMainDmOwnerFromAllowlist({
+        dmScope: livePinnedCfg.session?.dmScope,
+        allowFrom: livePinnedDmAllowFrom,
+        normalizeEntry: normalizeMatrixUserId,
+      });
+    })()
   : null;
 const turnResult = await core.channel.turn.run({


@@ -1,5 +1,6 @@
-import { describe, expect, it } from "vitest";
+import { afterEach, describe, expect, it, vi } from "vitest";
 import {
+  clearAgentHarnessFinalizeRetryBudget,
   runAgentHarnessAgentEndHook,
   runAgentHarnessBeforeAgentFinalizeHook,
   runAgentHarnessLlmInputHook,
@@ -10,7 +11,24 @@ const legacyHookRunner = {
   hasHooks: () => true,
 };
+const EVENT = {
+  runId: "run-1",
+  sessionId: "session-1",
+  sessionKey: "agent:main:session-1",
+  turnId: "turn-1",
+  provider: "codex",
+  model: "gpt-5.4",
+  cwd: "/repo",
+  transcriptPath: "/tmp/session.jsonl",
+  stopHookActive: false,
+  lastAssistantMessage: "done",
+};
 describe("agent harness lifecycle hook helpers", () => {
+  afterEach(() => {
+    clearAgentHarnessFinalizeRetryBudget();
+  });
   it("ignores legacy hook runners that advertise llm_input without a runner method", () => {
     expect(() =>
       runAgentHarnessLlmInputHook({
@@ -50,4 +68,276 @@ describe("agent harness lifecycle hook helpers", () => {
} as never),
).resolves.toEqual({ action: "continue" });
});
it("clears finalize retry budgets by run id", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
retry: {
instruction: "revise once",
idempotencyKey: "stable",
maxAttempts: 1,
},
}),
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise once" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "continue" });
clearAgentHarnessFinalizeRetryBudget({ runId: "run-1" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise once" });
});
it("does not clear finalize retry budgets for runs that only share a prefix", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
retry: {
instruction: "revise child once",
idempotencyKey: "stable",
maxAttempts: 1,
},
}),
};
const childEvent = {
...EVENT,
runId: "run:child",
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: childEvent,
ctx: { runId: "run:child", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise child once" });
clearAgentHarnessFinalizeRetryBudget({ runId: "run" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: childEvent,
ctx: { runId: "run:child", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "continue" });
});
it("keys finalize retry budgets by context run id when the event omits run id", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
retry: {
instruction: "revise from context run",
idempotencyKey: "stable",
maxAttempts: 1,
},
}),
};
const eventWithoutRunId = {
...EVENT,
runId: undefined,
sessionId: "shared-session",
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: eventWithoutRunId,
ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise from context run" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: eventWithoutRunId,
ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "continue" });
clearAgentHarnessFinalizeRetryBudget({ runId: "run-from-context" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: eventWithoutRunId,
ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise from context run" });
});
it("preserves merged revise reasons when retry metadata is present", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
reason: "fix generated baseline\n\nrerun the focused tests",
retry: {
instruction: "rerun the focused tests",
idempotencyKey: "merged-reason",
maxAttempts: 1,
},
}),
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({
action: "revise",
reason: "fix generated baseline\n\nrerun the focused tests",
});
});
it("honors a later finalize retry candidate after an earlier candidate is spent", async () => {
const firstRetry = {
instruction: "regenerate artifacts",
idempotencyKey: "artifacts",
maxAttempts: 1,
};
const secondRetry = {
instruction: "rerun focused tests",
idempotencyKey: "tests",
maxAttempts: 1,
};
const result = {
action: "revise",
reason: "retry generated artifacts\n\nretry focused tests",
retry: firstRetry,
};
Object.defineProperty(result, "retryCandidates", {
enumerable: false,
value: [firstRetry, secondRetry],
});
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue(result),
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({
action: "revise",
reason: "retry generated artifacts\n\nretry focused tests\n\nregenerate artifacts",
});
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({
action: "revise",
reason: "retry generated artifacts\n\nretry focused tests\n\nrerun focused tests",
});
});
it("falls back to retry instruction keys when retry idempotency keys are malformed", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
retry: {
instruction: "retry with a safe key",
idempotencyKey: { invalid: true },
maxAttempts: 1,
} as never,
}),
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({
action: "revise",
reason: "retry with a safe key",
});
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "continue" });
});
it("does not collide fallback retry keys for long instructions with shared prefixes", async () => {
const sharedPrefix = "x".repeat(180);
const firstInstruction = `${sharedPrefix} first`;
const secondInstruction = `${sharedPrefix} second`;
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi
.fn()
.mockResolvedValueOnce({
action: "revise",
retry: {
instruction: firstInstruction,
idempotencyKey: { invalid: true },
maxAttempts: 1,
},
})
.mockResolvedValueOnce({
action: "revise",
retry: {
instruction: secondInstruction,
idempotencyKey: { invalid: true },
maxAttempts: 1,
},
}),
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({
action: "revise",
reason: firstInstruction,
});
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({
action: "revise",
reason: secondInstruction,
});
});
});
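
The last test above relies on the fallback key being derived from the whole instruction rather than from a truncated prefix. A minimal sketch of that property, assuming a SHA-256 fallback like the `buildFinalizeRetryInstructionKey` helper added in this commit (`instructionKey` here is a stand-in name for illustration, not the function in the diff):

```typescript
import { createHash } from "node:crypto";

// Stand-in for the fallback key builder: hash the full instruction text.
function instructionKey(instruction: string): string {
  return `instruction:${createHash("sha256").update(instruction).digest("hex")}`;
}

const sharedPrefix = "x".repeat(180);
const firstKey = instructionKey(`${sharedPrefix} first`);
const secondKey = instructionKey(`${sharedPrefix} second`);

// The suffixes differ, so the keys differ even though 180 leading characters match.
console.log(firstKey !== secondKey);
// A SHA-256 hex digest is 64 characters, so the key length does not grow with the instruction.
console.log(firstKey.length === "instruction:".length + 64);
```

Hashing keeps keys bounded in size for long instructions while still separating instructions that differ only near the end.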


@@ -1,3 +1,4 @@
+import { createHash } from "node:crypto";
 import { createSubsystemLogger } from "../../logging/subsystem.js";
 import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
 import type {
@@ -7,11 +8,57 @@ import type {
   PluginHookLlmInputEvent,
   PluginHookLlmOutputEvent,
 } from "../../plugins/hook-types.js";
+import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
 import { buildAgentHookContext, type AgentHarnessHookContext } from "./hook-context.js";
 const log = createSubsystemLogger("agents/harness");
+const FINALIZE_RETRY_BUDGET_KEY = Symbol.for("openclaw.pluginFinalizeRetryBudget");
+const FINALIZE_RETRY_BUDGET_MAX_ENTRIES = 2048;
 type AgentHarnessHookRunner = ReturnType<typeof getGlobalHookRunner>;
+type FinalizeRetryBudget = Map<string, Map<string, number>>;
+function getFinalizeRetryBudget(): FinalizeRetryBudget {
+  return resolveGlobalSingleton<FinalizeRetryBudget>(FINALIZE_RETRY_BUDGET_KEY, () => new Map());
+}
+function countFinalizeRetryBudgetEntries(budget: FinalizeRetryBudget): number {
+  let count = 0;
+  for (const runBudget of budget.values()) {
+    count += runBudget.size;
+  }
+  return count;
+}
+function pruneFinalizeRetryBudget(budget: FinalizeRetryBudget): void {
+  while (countFinalizeRetryBudgetEntries(budget) > FINALIZE_RETRY_BUDGET_MAX_ENTRIES) {
+    const oldestRunId = budget.keys().next().value;
+    if (oldestRunId === undefined) {
+      return;
+    }
+    const oldestRunBudget = budget.get(oldestRunId);
+    const oldestRetryKey = oldestRunBudget?.keys().next().value;
+    if (oldestRunBudget && oldestRetryKey !== undefined) {
+      oldestRunBudget.delete(oldestRetryKey);
+    }
+    if (!oldestRunBudget || oldestRunBudget.size === 0) {
+      budget.delete(oldestRunId);
+    }
+  }
+}
+function buildFinalizeRetryInstructionKey(instruction: string): string {
+  return `instruction:${createHash("sha256").update(instruction).digest("hex")}`;
+}
+export function clearAgentHarnessFinalizeRetryBudget(params?: { runId?: string }): void {
+  const budget = getFinalizeRetryBudget();
+  if (!params?.runId) {
+    budget.clear();
+    return;
+  }
+  budget.delete(params.runId);
+}
 export function runAgentHarnessLlmInputHook(params: {
   event: PluginHookLlmInputEvent;
@@ -73,8 +120,16 @@ export async function runAgentHarnessBeforeAgentFinalizeHook(params: {
     return { action: "continue" };
   }
   try {
+    const eventForNormalization: PluginHookBeforeAgentFinalizeEvent = {
+      ...params.event,
+      runId: params.event.runId ?? params.ctx.runId,
+    };
     return normalizeBeforeAgentFinalizeResult(
-      await hookRunner.runBeforeAgentFinalize(params.event, buildAgentHookContext(params.ctx)),
+      await hookRunner.runBeforeAgentFinalize(
+        eventForNormalization,
+        buildAgentHookContext(params.ctx),
+      ),
+      eventForNormalization,
     );
   } catch (error) {
     log.warn(`before_agent_finalize hook failed: ${String(error)}`);
@@ -84,15 +139,78 @@ export async function runAgentHarnessBeforeAgentFinalizeHook(params: {
 function normalizeBeforeAgentFinalizeResult(
   result: PluginHookBeforeAgentFinalizeResult | undefined,
+  event?: PluginHookBeforeAgentFinalizeEvent,
 ): AgentHarnessBeforeAgentFinalizeOutcome {
   if (result?.action === "finalize") {
-    return result.reason?.trim()
-      ? { action: "finalize", reason: result.reason.trim() }
-      : { action: "finalize" };
+    const reason = normalizeTrimmedString(result.reason);
+    return reason ? { action: "finalize", reason } : { action: "finalize" };
   }
   if (result?.action === "revise") {
-    const reason = result.reason?.trim();
+    const retryCandidates = readBeforeAgentFinalizeRetryCandidates(result);
+    if (retryCandidates.length > 0) {
+      const reason = normalizeTrimmedString(result.reason);
+      for (const retry of retryCandidates) {
+        const retryInstruction = normalizeTrimmedString(retry.instruction);
+        if (!retryInstruction) {
+          continue;
+        }
+        const maxAttempts =
+          typeof retry.maxAttempts === "number" && Number.isFinite(retry.maxAttempts)
+            ? Math.max(1, Math.floor(retry.maxAttempts))
+            : 1;
+        const retryRunId = event?.runId ?? event?.sessionId ?? "unknown-run";
+        const retryKey =
+          normalizeTrimmedString(retry.idempotencyKey) ||
+          buildFinalizeRetryInstructionKey(retryInstruction);
+        const budget = getFinalizeRetryBudget();
+        const runBudget = budget.get(retryRunId) ?? new Map<string, number>();
+        const nextCount = (runBudget.get(retryKey) ?? 0) + 1;
+        runBudget.delete(retryKey);
+        runBudget.set(retryKey, nextCount);
+        budget.delete(retryRunId);
+        budget.set(retryRunId, runBudget);
+        pruneFinalizeRetryBudget(budget);
+        if (nextCount > maxAttempts) {
+          continue;
+        }
+        const revisedReason =
+          reason && reason.includes(retryInstruction)
+            ? reason
+            : [reason, retryInstruction].filter(Boolean).join("\n\n");
+        return { action: "revise", reason: revisedReason };
+      }
+      return { action: "continue" };
+    }
+    const reason = normalizeTrimmedString(result.reason);
     return reason ? { action: "revise", reason } : { action: "continue" };
   }
   return { action: "continue" };
 }
+function readBeforeAgentFinalizeRetryCandidates(
+  result: PluginHookBeforeAgentFinalizeResult,
+): NonNullable<PluginHookBeforeAgentFinalizeResult["retry"]>[] {
+  const candidateList = (
+    result as {
+      retryCandidates?: unknown;
+    }
+  ).retryCandidates;
+  if (Array.isArray(candidateList) && candidateList.length > 0) {
+    return candidateList.filter(isBeforeAgentFinalizeRetry);
+  }
+  return isBeforeAgentFinalizeRetry(result.retry) ? [result.retry] : [];
+}
+function isBeforeAgentFinalizeRetry(
+  value: unknown,
+): value is NonNullable<PluginHookBeforeAgentFinalizeResult["retry"]> {
+  return Boolean(value) && typeof value === "object" && !Array.isArray(value);
+}
+function normalizeTrimmedString(value: unknown): string | undefined {
+  if (typeof value !== "string") {
+    return undefined;
+  }
+  const trimmed = value.trim();
+  return trimmed ? trimmed : undefined;
+}
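
The budget bookkeeping above can be summarized in a standalone sketch. `consumeRetry` is a hypothetical helper written for this note, not a function in the diff; it assumes the same nested `Map` shape (run id to retry key to attempt count) and the same delete-then-set refresh, which keeps the most recently used entries last in iteration order so that pruning can evict from the front.

```typescript
// Assumed shape, mirroring the diff: run id -> retry key -> attempts used so far.
type RetryBudget = Map<string, Map<string, number>>;

// Hypothetical helper: record one attempt and report whether it stays within the cap.
function consumeRetry(
  budget: RetryBudget,
  runId: string,
  retryKey: string,
  maxAttempts: number,
): boolean {
  const runBudget = budget.get(runId) ?? new Map<string, number>();
  const nextCount = (runBudget.get(retryKey) ?? 0) + 1;
  // Delete before set: a Map iterates in insertion order, so this moves the
  // refreshed entries to the end and leaves the stalest ones at the front.
  runBudget.delete(retryKey);
  runBudget.set(retryKey, nextCount);
  budget.delete(runId);
  budget.set(runId, runBudget);
  return nextCount <= maxAttempts;
}

const budget: RetryBudget = new Map();
const firstPass = consumeRetry(budget, "run-a", "stable", 1); // first attempt for this key
const secondPass = consumeRetry(budget, "run-a", "stable", 1); // second attempt for the same key
consumeRetry(budget, "run-b", "stable", 1);
consumeRetry(budget, "run-a", "other", 1);
// "run-a" was touched last, so "run-b" is now first in iteration order.
const oldestRunId = budget.keys().next().value;
// Deleting one run resets its counters without touching other runs.
budget.delete("run-a");
const afterClear = consumeRetry(budget, "run-a", "stable", 1);
```

Keying the outer map by the exact run id is what lets `clearAgentHarnessFinalizeRetryBudget({ runId })` drop one run without affecting runs whose ids merely share a prefix.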


@@ -27,16 +27,16 @@ function createNonInteractiveMigrationPrompter(runtime: RuntimeEnv): WizardPromp
     async note(message, title) {
       runtime.log(title ? `${title}\n${message}` : message);
     },
-    select(params) {
+    async select(params) {
       return unavailable(params.message);
     },
-    multiselect(params) {
+    async multiselect(params) {
       return unavailable(params.message);
     },
-    text(params) {
+    async text(params) {
       return unavailable(params.message);
     },
-    confirm(params) {
+    async confirm(params) {
       return unavailable(params.message);
     },
     progress(label) {


@@ -0,0 +1,855 @@
import fs from "node:fs/promises";
import path from "node:path";
import {
createPluginRegistryFixture,
registerTestPlugin,
} from "openclaw/plugin-sdk/plugin-test-contracts";
import { afterEach, describe, expect, it, vi } from "vitest";
import { loadSessionStore, updateSessionStore } from "../../config/sessions.js";
import { withTempConfig } from "../../gateway/test-temp-config.js";
import { emitAgentEvent, resetAgentEventsForTest } from "../../infra/agent-events.js";
import { resolvePreferredOpenClawTmpDir } from "../../infra/tmp-openclaw-dir.js";
import { PLUGIN_HOST_CLEANUP_TIMEOUT_MS } from "../host-hook-cleanup-timeout.js";
import { runPluginHostCleanup } from "../host-hook-cleanup.js";
import {
clearPluginHostRuntimeState,
getPluginRunContext,
listPluginSessionSchedulerJobs,
PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS,
dispatchPluginAgentEventSubscriptions,
registerPluginSessionSchedulerJob,
setPluginRunContext,
} from "../host-hook-runtime.js";
import { createEmptyPluginRegistry } from "../registry-empty.js";
import { setActivePluginRegistry } from "../runtime.js";
import { createPluginRecord } from "../status.test-helpers.js";
import type { OpenClawPluginApi } from "../types.js";
async function waitForPluginEventHandlers(): Promise<void> {
await new Promise<void>((resolve) => {
setTimeout(resolve, 0);
});
}
describe("plugin run context lifecycle", () => {
afterEach(() => {
vi.useRealTimers();
setActivePluginRegistry(createEmptyPluginRegistry());
clearPluginHostRuntimeState();
resetAgentEventsForTest();
});
it("blocks stale plugin API run-context mutations after registry replacement", () => {
const { config, registry } = createPluginRegistryFixture();
let capturedApi: OpenClawPluginApi | undefined;
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "stale-run-context-plugin",
name: "Stale Run Context Plugin",
}),
register(api) {
capturedApi = api;
},
});
setActivePluginRegistry(registry.registry);
setActivePluginRegistry(createEmptyPluginRegistry());
expect(
capturedApi?.setRunContext({
runId: "stale-run",
namespace: "state",
value: { stale: true },
}),
).toBe(false);
expect(
getPluginRunContext({
pluginId: "stale-run-context-plugin",
get: { runId: "stale-run", namespace: "state" },
}),
).toBeUndefined();
expect(
setPluginRunContext({
pluginId: "stale-run-context-plugin",
patch: { runId: "stale-run", namespace: "state", value: { live: true } },
}),
).toBe(true);
capturedApi?.clearRunContext({ runId: "stale-run", namespace: "state" });
expect(
getPluginRunContext({
pluginId: "stale-run-context-plugin",
get: { runId: "stale-run", namespace: "state" },
}),
).toEqual({ live: true });
});
it("allows run-context mutations after a previous registry is restored active", () => {
const { config, registry } = createPluginRegistryFixture();
let capturedApi: OpenClawPluginApi | undefined;
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "restored-run-context-plugin",
name: "Restored Run Context Plugin",
}),
register(api) {
capturedApi = api;
},
});
setActivePluginRegistry(registry.registry);
setActivePluginRegistry(createEmptyPluginRegistry());
setActivePluginRegistry(registry.registry);
expect(
capturedApi?.setRunContext({
runId: "restored-run",
namespace: "state",
value: { restored: true },
}),
).toBe(true);
expect(
getPluginRunContext({
pluginId: "restored-run-context-plugin",
get: { runId: "restored-run", namespace: "state" },
}),
).toEqual({ restored: true });
});
it("allows run-context initialization during activating plugin registration", () => {
const { config, registry } = createPluginRegistryFixture();
const api = registry.createApi(
createPluginRecord({
id: "registration-run-context-plugin",
name: "Registration Run Context Plugin",
}),
{ config },
);
expect(
api.setRunContext({
runId: "run-registration",
namespace: "state",
value: { initialized: true },
}),
).toBe(true);
expect(
getPluginRunContext({
pluginId: "registration-run-context-plugin",
get: { runId: "run-registration", namespace: "state" },
}),
).toEqual({ initialized: true });
api.clearRunContext({ runId: "run-registration", namespace: "state" });
expect(
getPluginRunContext({
pluginId: "registration-run-context-plugin",
get: { runId: "run-registration", namespace: "state" },
}),
).toBeUndefined();
});
it("keeps restored active registry state after stale async cleanup finishes", async () => {
let releaseCleanup: (() => void) | undefined;
let markCleanupStarted: (() => void) | undefined;
let capturedApi: OpenClawPluginApi | undefined;
const cleanupStarted = new Promise<void>((resolve) => {
markCleanupStarted = resolve;
});
const cleanupRelease = new Promise<void>((resolve) => {
releaseCleanup = resolve;
});
const schedulerCleanup = vi.fn();
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "delayed-restored-registry-plugin",
name: "Delayed Restored Registry Plugin",
}),
register(api) {
capturedApi = api;
api.registerRuntimeLifecycle({
id: "delayed-cleanup",
async cleanup() {
markCleanupStarted?.();
await cleanupRelease;
},
});
api.registerSessionSchedulerJob({
id: "live-job",
sessionKey: "agent:main:main",
kind: "session-turn",
cleanup: schedulerCleanup,
});
},
});
setActivePluginRegistry(registry.registry);
setActivePluginRegistry(createEmptyPluginRegistry());
await cleanupStarted;
setActivePluginRegistry(registry.registry);
expect(
capturedApi?.setRunContext({
runId: "restored-after-cleanup-started",
namespace: "state",
value: { restored: true },
}),
).toBe(true);
releaseCleanup?.();
await waitForPluginEventHandlers();
await waitForPluginEventHandlers();
expect(
getPluginRunContext({
pluginId: "delayed-restored-registry-plugin",
get: { runId: "restored-after-cleanup-started", namespace: "state" },
}),
).toEqual({ restored: true });
expect(schedulerCleanup).not.toHaveBeenCalled();
expect(listPluginSessionSchedulerJobs("delayed-restored-registry-plugin")).toEqual([
{
id: "live-job",
pluginId: "delayed-restored-registry-plugin",
sessionKey: "agent:main:main",
kind: "session-turn",
},
]);
});
it("does not let delayed non-terminal subscriptions resurrect closed run context", async () => {
let releaseToolHandler: (() => void) | undefined;
let delayedToolHandlerSawContext: unknown;
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "delayed-subscription",
name: "Delayed Subscription",
}),
register(api) {
api.registerAgentEventSubscription({
id: "delayed",
streams: ["tool"],
async handle(_event, ctx) {
ctx.setRunContext("before-terminal", { visible: true });
await new Promise<void>((resolve) => {
releaseToolHandler = resolve;
});
delayedToolHandlerSawContext = ctx.getRunContext("before-terminal");
ctx.setRunContext("late", { resurrected: true });
},
});
},
});
setActivePluginRegistry(registry.registry);
emitAgentEvent({
runId: "run-delayed-subscription",
stream: "tool",
data: { name: "tool" },
});
await Promise.resolve();
emitAgentEvent({
runId: "run-delayed-subscription",
stream: "lifecycle",
data: { phase: "end" },
});
await Promise.resolve();
expect(
getPluginRunContext({
pluginId: "delayed-subscription",
get: { runId: "run-delayed-subscription", namespace: "before-terminal" },
}),
).toEqual({ visible: true });
releaseToolHandler?.();
await waitForPluginEventHandlers();
expect(delayedToolHandlerSawContext).toEqual({ visible: true });
expect(
getPluginRunContext({
pluginId: "delayed-subscription",
get: { runId: "run-delayed-subscription", namespace: "late" },
}),
).toBeUndefined();
});
it("preserves run context until async terminal event subscriptions settle", async () => {
let releaseTerminalHandler: (() => void) | undefined;
let terminalHandlerSawContext: unknown;
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "async-terminal-subscription",
name: "Async Terminal Subscription",
}),
register(api) {
api.registerAgentEventSubscription({
id: "records",
streams: ["tool", "lifecycle"],
async handle(event, ctx) {
if (event.stream === "tool") {
ctx.setRunContext("seen", { runId: event.runId });
return;
}
if (event.data?.phase !== "end") {
return;
}
await new Promise<void>((resolve) => {
releaseTerminalHandler = resolve;
});
terminalHandlerSawContext = ctx.getRunContext("seen");
},
});
},
});
setActivePluginRegistry(registry.registry);
emitAgentEvent({
runId: "run-async-terminal",
stream: "tool",
data: { name: "tool" },
});
await Promise.resolve();
emitAgentEvent({
runId: "run-async-terminal",
stream: "lifecycle",
data: { phase: "end" },
});
await Promise.resolve();
expect(
getPluginRunContext({
pluginId: "async-terminal-subscription",
get: { runId: "run-async-terminal", namespace: "seen" },
}),
).toEqual({ runId: "run-async-terminal" });
releaseTerminalHandler?.();
await waitForPluginEventHandlers();
expect(terminalHandlerSawContext).toEqual({ runId: "run-async-terminal" });
expect(
getPluginRunContext({
pluginId: "async-terminal-subscription",
get: { runId: "run-async-terminal", namespace: "seen" },
}),
).toBeUndefined();
});
it("waits for terminal handlers added after the first terminal cleanup waiter starts", async () => {
let releaseFirstTerminalHandler: (() => void) | undefined;
let releaseSecondTerminalHandler: (() => void) | undefined;
let firstTerminalHandlerSawContext: unknown;
let secondTerminalHandlerSawContext: unknown;
let terminalEventsSeen = 0;
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "repeated-terminal-live-wait",
name: "Repeated Terminal Live Wait",
}),
register(api) {
api.registerAgentEventSubscription({
id: "records",
streams: ["tool", "lifecycle"],
async handle(event, ctx) {
if (event.stream === "tool") {
ctx.setRunContext("seen", { runId: event.runId });
return;
}
if (event.data?.phase !== "end") {
return;
}
terminalEventsSeen += 1;
if (terminalEventsSeen === 1) {
await new Promise<void>((resolve) => {
releaseFirstTerminalHandler = resolve;
});
firstTerminalHandlerSawContext = ctx.getRunContext("seen");
return;
}
await new Promise<void>((resolve) => {
releaseSecondTerminalHandler = resolve;
});
secondTerminalHandlerSawContext = ctx.getRunContext("seen");
},
});
},
});
setActivePluginRegistry(registry.registry);
emitAgentEvent({
runId: "run-repeated-terminal-live-wait",
stream: "tool",
data: { name: "tool" },
});
await waitForPluginEventHandlers();
emitAgentEvent({
runId: "run-repeated-terminal-live-wait",
stream: "lifecycle",
data: { phase: "end" },
});
await waitForPluginEventHandlers();
emitAgentEvent({
runId: "run-repeated-terminal-live-wait",
stream: "lifecycle",
data: { phase: "end" },
});
await waitForPluginEventHandlers();
releaseFirstTerminalHandler?.();
await waitForPluginEventHandlers();
expect(firstTerminalHandlerSawContext).toEqual({ runId: "run-repeated-terminal-live-wait" });
expect(
getPluginRunContext({
pluginId: "repeated-terminal-live-wait",
get: { runId: "run-repeated-terminal-live-wait", namespace: "seen" },
}),
).toEqual({ runId: "run-repeated-terminal-live-wait" });
releaseSecondTerminalHandler?.();
await waitForPluginEventHandlers();
await waitForPluginEventHandlers();
expect(secondTerminalHandlerSawContext).toEqual({ runId: "run-repeated-terminal-live-wait" });
expect(
getPluginRunContext({
pluginId: "repeated-terminal-live-wait",
get: { runId: "run-repeated-terminal-live-wait", namespace: "seen" },
}),
).toBeUndefined();
});
it("clears run context after the terminal subscription grace period", async () => {
vi.useFakeTimers();
let releaseTerminalHandler: (() => void) | undefined;
let terminalHandlerSawContext: unknown;
let terminalHandlerWroteContext: unknown;
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "slow-terminal-subscription",
name: "Slow Terminal Subscription",
}),
register(api) {
api.registerAgentEventSubscription({
id: "slow",
streams: ["tool", "lifecycle"],
async handle(event, ctx) {
if (event.stream === "tool") {
ctx.setRunContext("seen", { runId: event.runId });
return;
}
if (event.data?.phase === "end") {
await new Promise<void>((resolve) => {
releaseTerminalHandler = resolve;
});
terminalHandlerSawContext = ctx.getRunContext("seen");
ctx.setRunContext("terminal", { completed: true });
terminalHandlerWroteContext = ctx.getRunContext("terminal");
}
},
});
},
});
setActivePluginRegistry(registry.registry);
emitAgentEvent({
runId: "run-slow-terminal",
stream: "tool",
data: { name: "tool" },
});
await Promise.resolve();
emitAgentEvent({
runId: "run-slow-terminal",
stream: "lifecycle",
data: { phase: "end" },
});
await Promise.resolve();
await vi.advanceTimersByTimeAsync(PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS);
expect(
getPluginRunContext({
pluginId: "slow-terminal-subscription",
get: { runId: "run-slow-terminal", namespace: "seen" },
}),
).toBeUndefined();
releaseTerminalHandler?.();
await vi.advanceTimersByTimeAsync(0);
expect(terminalHandlerSawContext).toBeUndefined();
expect(terminalHandlerWroteContext).toBeUndefined();
expect(
getPluginRunContext({
pluginId: "slow-terminal-subscription",
get: { runId: "run-slow-terminal", namespace: "seen" },
}),
).toBeUndefined();
expect(
getPluginRunContext({
pluginId: "slow-terminal-subscription",
get: { runId: "run-slow-terminal", namespace: "terminal" },
}),
).toBeUndefined();
});
it("keeps the expired terminal marker across repeated terminal events", async () => {
vi.useFakeTimers();
let releaseFirstTerminalHandler: (() => void) | undefined;
let firstTerminalHandlerWroteContext: unknown;
let secondTerminalHandlerWroteContext: unknown;
let terminalEventsSeen = 0;
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "repeated-terminal-subscription",
name: "Repeated Terminal Subscription",
}),
register(api) {
api.registerAgentEventSubscription({
id: "repeat-terminal",
streams: ["lifecycle"],
async handle(event, ctx) {
if (event.data?.phase !== "end") {
return;
}
terminalEventsSeen += 1;
if (terminalEventsSeen === 1) {
await new Promise<void>((resolve) => {
releaseFirstTerminalHandler = resolve;
});
ctx.setRunContext("terminal", { from: "first" });
firstTerminalHandlerWroteContext = ctx.getRunContext("terminal");
return;
}
ctx.setRunContext("terminal", { from: "second" });
secondTerminalHandlerWroteContext = ctx.getRunContext("terminal");
},
});
},
});
setActivePluginRegistry(registry.registry);
emitAgentEvent({
runId: "run-repeat-terminal",
stream: "lifecycle",
data: { phase: "end" },
});
await Promise.resolve();
await vi.advanceTimersByTimeAsync(PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS);
emitAgentEvent({
runId: "run-repeat-terminal",
stream: "lifecycle",
data: { phase: "end" },
});
await vi.advanceTimersByTimeAsync(0);
expect(secondTerminalHandlerWroteContext).toBeUndefined();
expect(
getPluginRunContext({
pluginId: "repeated-terminal-subscription",
get: { runId: "run-repeat-terminal", namespace: "terminal" },
}),
).toBeUndefined();
releaseFirstTerminalHandler?.();
await vi.advanceTimersByTimeAsync(0);
expect(firstTerminalHandlerWroteContext).toBeUndefined();
expect(
getPluginRunContext({
pluginId: "repeated-terminal-subscription",
get: { runId: "run-repeat-terminal", namespace: "terminal" },
}),
).toBeUndefined();
});
it("preserves scheduler jobs instead of invoking stale cleanup callbacks", async () => {
const cleanup = vi.fn();
registerPluginSessionSchedulerJob({
pluginId: "scheduler-plugin",
pluginName: "Scheduler Plugin",
job: {
id: "job-preserved",
sessionKey: "agent:main:main",
kind: "session-turn",
cleanup,
},
});
await expect(
runPluginHostCleanup({
reason: "disable",
pluginId: "scheduler-plugin",
preserveSchedulerJobIds: new Set(["job-preserved"]),
}),
).resolves.toMatchObject({ failures: [] });
expect(cleanup).not.toHaveBeenCalled();
expect(listPluginSessionSchedulerJobs("scheduler-plugin")).toHaveLength(1);
});
it("preserves plugin run context during restart cleanup", async () => {
const registry = createEmptyPluginRegistry();
expect(
setPluginRunContext({
pluginId: "restart-context-plugin",
patch: { runId: "run-restart", namespace: "state", value: { keep: true } },
}),
).toBe(true);
await expect(
runPluginHostCleanup({
registry,
pluginId: "restart-context-plugin",
reason: "restart",
}),
).resolves.toMatchObject({ failures: [] });
expect(
getPluginRunContext({
pluginId: "restart-context-plugin",
get: { runId: "run-restart", namespace: "state" },
}),
).toEqual({ keep: true });
await expect(
runPluginHostCleanup({
registry,
pluginId: "restart-context-plugin",
reason: "disable",
}),
).resolves.toMatchObject({ failures: [] });
expect(
getPluginRunContext({
pluginId: "restart-context-plugin",
get: { runId: "run-restart", namespace: "state" },
}),
).toBeUndefined();
});
it("preserves durable plugin session state during plugin restart cleanup", async () => {
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "restart-state-fixture",
name: "Restart State Fixture",
}),
register(api) {
api.registerSessionExtension({
namespace: "workflow",
description: "restart state test",
});
},
});
const stateDir = await fs.mkdtemp(
path.join(resolvePreferredOpenClawTmpDir(), "openclaw-run-context-restart-state-"),
);
const storePath = path.join(stateDir, "sessions.json");
const tempConfig = {
session: { store: storePath },
};
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
try {
process.env.OPENCLAW_STATE_DIR = stateDir;
await withTempConfig({
cfg: tempConfig,
run: async () => {
await updateSessionStore(storePath, (store) => {
store["agent:main:main"] = {
sessionId: "session-1",
updatedAt: Date.now(),
pluginExtensions: {
"restart-state-fixture": { workflow: { state: "waiting" } },
},
pluginNextTurnInjections: {
"restart-state-fixture": [
{
id: "resume",
pluginId: "restart-state-fixture",
text: "resume",
placement: "prepend_context",
createdAt: 1,
},
],
},
};
return undefined;
});
await expect(
runPluginHostCleanup({
cfg: tempConfig,
registry: registry.registry,
pluginId: "restart-state-fixture",
reason: "restart",
}),
).resolves.toMatchObject({ failures: [] });
const stored = loadSessionStore(storePath, { skipCache: true });
expect(stored["agent:main:main"]?.pluginExtensions).toEqual({
"restart-state-fixture": { workflow: { state: "waiting" } },
});
expect(stored["agent:main:main"]?.pluginNextTurnInjections).toEqual({
"restart-state-fixture": [
{
id: "resume",
pluginId: "restart-state-fixture",
text: "resume",
placement: "prepend_context",
createdAt: 1,
},
],
});
},
});
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
await fs.rm(stateDir, { recursive: true, force: true });
}
});
it("rejects hung cleanup hooks with a bounded timeout", async () => {
vi.useFakeTimers();
const cleanup = vi.fn(async () => {
await new Promise(() => undefined);
});
registerPluginSessionSchedulerJob({
pluginId: "hung-cleanup-plugin",
pluginName: "Hung Cleanup Plugin",
job: {
id: "job-hung",
sessionKey: "agent:main:main",
kind: "session-turn",
cleanup,
},
});
const resultPromise = runPluginHostCleanup({
reason: "disable",
pluginId: "hung-cleanup-plugin",
});
await vi.advanceTimersByTimeAsync(0);
expect(cleanup).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(PLUGIN_HOST_CLEANUP_TIMEOUT_MS);
await expect(resultPromise).resolves.toMatchObject({
failures: [
{
pluginId: "hung-cleanup-plugin",
hookId: "scheduler:job-hung",
},
],
});
});
it("bounds session, runtime, and scheduler cleanup callbacks so cleanup keeps moving", async () => {
vi.useFakeTimers();
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "hanging-cleanup-fixture",
name: "Hanging Cleanup Fixture",
}),
register(api) {
api.registerSessionExtension({
namespace: "state",
description: "hangs during cleanup",
cleanup: () => new Promise(() => undefined),
});
api.registerRuntimeLifecycle({
id: "runtime-cleanup",
cleanup: () => new Promise(() => undefined),
});
api.registerSessionSchedulerJob({
id: "scheduler-cleanup",
sessionKey: "agent:main:main",
kind: "monitor",
cleanup: () => new Promise(() => undefined),
});
},
});
const cleanupPromise = runPluginHostCleanup({
cfg: config,
registry: registry.registry,
pluginId: "hanging-cleanup-fixture",
reason: "delete",
});
for (let index = 0; index < 3; index += 1) {
await vi.advanceTimersByTimeAsync(PLUGIN_HOST_CLEANUP_TIMEOUT_MS + 1);
}
await expect(cleanupPromise).resolves.toMatchObject({
failures: [
expect.objectContaining({
pluginId: "hanging-cleanup-fixture",
hookId: "session:state",
}),
expect.objectContaining({
pluginId: "hanging-cleanup-fixture",
hookId: "runtime:runtime-cleanup",
}),
expect.objectContaining({
pluginId: "hanging-cleanup-fixture",
hookId: "scheduler:scheduler-cleanup",
}),
],
});
});
it("blocks setting run context after a run is closed", () => {
expect(
setPluginRunContext({
pluginId: "closed-run-plugin",
patch: { runId: "run-closed", namespace: "state", value: { before: true } },
}),
).toBe(true);
dispatchPluginAgentEventSubscriptions({
registry: createEmptyPluginRegistry(),
event: {
runId: "run-closed",
seq: 1,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "end" },
},
});
expect(
setPluginRunContext({
pluginId: "closed-run-plugin",
patch: { runId: "run-closed", namespace: "state", value: { after: true } },
}),
).toBe(false);
});
});

@@ -299,6 +299,11 @@ export type PluginHookBeforeAgentFinalizeResult = {
*/
action?: "continue" | "revise" | "finalize";
reason?: string;
retry?: {
instruction: string;
idempotencyKey?: string;
maxAttempts?: number;
};
};
export type PluginHookBeforeCompactionEvent = {

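Reviewer note: the new `retry` field is easiest to read from the plugin side. The sketch below shows a handler that asks for one more model pass. The result type is a local copy of the shape in the hunk above. `FinalizeEventSketch` and `onBeforeAgentFinalize` are hypothetical names, since the real event type and registration call are not shown in this diff.

```typescript
// Local copy of the result shape from the hunk above; the real type is exported by the SDK.
type BeforeAgentFinalizeResult = {
  action?: "continue" | "revise" | "finalize";
  reason?: string;
  retry?: {
    instruction: string;
    idempotencyKey?: string;
    maxAttempts?: number;
  };
};

// Hypothetical event shape, used only for this illustration.
type FinalizeEventSketch = { testsPassed: boolean };

// Hypothetical handler: request a bounded retry only when the checks failed.
function onBeforeAgentFinalize(event: FinalizeEventSketch): BeforeAgentFinalizeResult {
  if (event.testsPassed) {
    return { action: "continue" };
  }
  return {
    action: "revise",
    reason: "focused tests are failing",
    retry: {
      instruction: "rerun the focused tests and fix any failures",
      idempotencyKey: "focused-tests",
      maxAttempts: 1,
    },
  };
}
```

How the host spends `maxAttempts` and deduplicates on `idempotencyKey` is outside the hunks shown here, so the sketch only covers the value a plugin would return.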
@@ -60,6 +60,127 @@ describe("before_agent_finalize hook runner", () => {
});
});
it("skips empty retry instructions when merging revise decisions", async () => {
const runner = createHookRunner(
createMockPluginRegistry([
{
hookName: "before_agent_finalize",
handler: vi.fn().mockResolvedValue({
action: "revise",
reason: "needs a retry but forgot the instruction",
retry: { instruction: " ", idempotencyKey: "empty-retry" },
}),
},
{
hookName: "before_agent_finalize",
handler: vi.fn().mockResolvedValue({
action: "revise",
reason: "rerun the focused tests",
retry: {
instruction: " rerun the focused tests ",
idempotencyKey: "valid-retry",
maxAttempts: 1,
},
}),
},
]),
);
await expect(runner.runBeforeAgentFinalize(EVENT, TEST_PLUGIN_AGENT_CTX)).resolves.toEqual({
action: "revise",
reason: "needs a retry but forgot the instruction\n\nrerun the focused tests",
retry: {
instruction: "rerun the focused tests",
idempotencyKey: "valid-retry",
maxAttempts: 1,
},
});
});
it("skips malformed retry instructions when merging revise decisions", async () => {
const runner = createHookRunner(
createMockPluginRegistry([
{
hookName: "before_agent_finalize",
handler: vi.fn().mockResolvedValue({
action: "revise",
reason: "malformed retry payload should not crash",
retry: { instruction: 123, idempotencyKey: "bad-retry" } as never,
}),
},
{
hookName: "before_agent_finalize",
handler: vi.fn().mockResolvedValue({
action: "revise",
reason: "valid retry still applies",
retry: {
instruction: " rerun the focused tests ",
idempotencyKey: "valid-retry",
},
}),
},
]),
);
await expect(runner.runBeforeAgentFinalize(EVENT, TEST_PLUGIN_AGENT_CTX)).resolves.toEqual({
action: "revise",
reason: "malformed retry payload should not crash\n\nvalid retry still applies",
retry: {
instruction: "rerun the focused tests",
idempotencyKey: "valid-retry",
},
});
});
it("preserves multiple valid retry candidates for budget evaluation", async () => {
const runner = createHookRunner(
createMockPluginRegistry([
{
hookName: "before_agent_finalize",
handler: vi.fn().mockResolvedValue({
action: "revise",
reason: "retry generated artifacts",
retry: {
instruction: "regenerate artifacts",
idempotencyKey: "artifacts",
maxAttempts: 1,
},
}),
},
{
hookName: "before_agent_finalize",
handler: vi.fn().mockResolvedValue({
action: "revise",
reason: "retry focused tests",
retry: {
instruction: "rerun focused tests",
idempotencyKey: "tests",
maxAttempts: 1,
},
}),
},
]),
);
const result = await runner.runBeforeAgentFinalize(EVENT, TEST_PLUGIN_AGENT_CTX);
expect(result).toEqual({
action: "revise",
reason: "retry generated artifacts\n\nretry focused tests",
retry: {
instruction: "regenerate artifacts",
idempotencyKey: "artifacts",
maxAttempts: 1,
},
});
expect(Object.getOwnPropertyDescriptor(result, "retryCandidates")?.enumerable).toBe(false);
expect(
(Object.getOwnPropertyDescriptor(result, "retryCandidates")?.value as unknown[])?.map(
(retry) => (retry as { idempotencyKey?: string }).idempotencyKey,
),
).toEqual(["artifacts", "tests"]);
});
it("lets finalize override earlier revise decisions", async () => {
const runner = createHookRunner(
createMockPluginRegistry([

@@ -154,6 +154,11 @@ export type HookRunnerLogger = {
export type HookFailurePolicy = "fail-open" | "fail-closed";
type BeforeAgentFinalizeRetry = NonNullable<PluginHookBeforeAgentFinalizeResult["retry"]>;
type BeforeAgentFinalizeResultWithRetryCandidates = PluginHookBeforeAgentFinalizeResult & {
retryCandidates?: BeforeAgentFinalizeRetry[];
};
export type HookRunnerOptions = {
logger?: HookRunnerLogger;
/** If true, errors in hooks will be caught and logged instead of thrown */
@@ -319,6 +324,48 @@ export function createHookRunner(
acc: PluginHookBeforeAgentFinalizeResult | undefined,
next: PluginHookBeforeAgentFinalizeResult,
): PluginHookBeforeAgentFinalizeResult => {
const normalizeRetry = (
retry: PluginHookBeforeAgentFinalizeResult["retry"] | undefined,
): BeforeAgentFinalizeRetry | undefined => {
const instruction = typeof retry?.instruction === "string" ? retry.instruction.trim() : "";
if (!instruction) {
return undefined;
}
return {
...retry,
instruction,
};
};
const readRetryCandidates = (
result: PluginHookBeforeAgentFinalizeResult | undefined,
): BeforeAgentFinalizeRetry[] => {
if (!result || result.action !== "revise") {
return [];
}
const candidateList = (result as BeforeAgentFinalizeResultWithRetryCandidates)
.retryCandidates;
if (Array.isArray(candidateList) && candidateList.length > 0) {
return candidateList
.map((retry) => normalizeRetry(retry))
.filter((retry): retry is BeforeAgentFinalizeRetry => retry !== undefined);
}
const retry = normalizeRetry(result.retry);
return retry ? [retry] : [];
};
const attachRetryCandidates = (
result: PluginHookBeforeAgentFinalizeResult,
candidates: BeforeAgentFinalizeRetry[],
): PluginHookBeforeAgentFinalizeResult => {
if (result.action !== "revise" || candidates.length <= 1) {
return result;
}
Object.defineProperty(result, "retryCandidates", {
configurable: true,
enumerable: false,
value: candidates,
});
return result;
};
if (acc?.action === "finalize") {
return acc;
}
@@ -326,19 +373,30 @@ export function createHookRunner(
return { action: "finalize", reason: next.reason };
}
if (acc?.action === "revise" && next.action === "revise") {
return {
action: "revise",
reason: concatOptionalTextSegments({
left: acc.reason,
right: next.reason,
}),
};
const retryCandidates = [...readRetryCandidates(acc), ...readRetryCandidates(next)];
const retry = retryCandidates[0];
return attachRetryCandidates(
{
action: "revise",
reason: concatOptionalTextSegments({
left: acc.reason,
right: next.reason,
}),
...(retry ? { retry } : {}),
},
retryCandidates,
);
}
if (acc?.action === "revise") {
return acc;
}
if (next.action === "revise") {
return { action: "revise", reason: next.reason };
const retry = normalizeRetry(next.retry);
return {
action: "revise",
reason: next.reason,
...(retry ? { retry } : {}),
};
}
return next.action === "continue" ? { action: "continue", reason: next.reason } : (acc ?? next);
};
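
Reviewer note: the merge rule above can be sketched for the two-result case. This is a simplified stand-in, not the runner itself. `mergeRevise` is a hypothetical name, the reason join replaces `concatOptionalTextSegments` with a plain filter-and-join, and the sketch does not re-read an existing `retryCandidates` list the way `readRetryCandidates` does when more than two hooks are chained.

```typescript
type Retry = { instruction: string; idempotencyKey?: string; maxAttempts?: number };
type FinalizeResult = {
  action?: "continue" | "revise" | "finalize";
  reason?: string;
  retry?: Retry;
};

// Trim the instruction; drop the retry when the instruction is missing, blank, or not a string.
function normalizeRetry(retry: Retry | undefined): Retry | undefined {
  const instruction = typeof retry?.instruction === "string" ? retry.instruction.trim() : "";
  return instruction ? { ...retry, instruction } : undefined;
}

// Hypothetical two-result merge: the first valid retry wins, all valid ones are kept as candidates.
function mergeRevise(a: FinalizeResult, b: FinalizeResult): FinalizeResult {
  const candidates = [a.retry, b.retry]
    .map((retry) => normalizeRetry(retry))
    .filter((retry): retry is Retry => retry !== undefined);
  const merged: FinalizeResult = {
    action: "revise",
    // Simplified stand-in for concatOptionalTextSegments.
    reason: [a.reason, b.reason].filter(Boolean).join("\n\n"),
    ...(candidates[0] ? { retry: candidates[0] } : {}),
  };
  if (candidates.length > 1) {
    // Non-enumerable, so equality checks and JSON output only see `retry`.
    Object.defineProperty(merged, "retryCandidates", {
      configurable: true,
      enumerable: false,
      value: candidates,
    });
  }
  return merged;
}
```

Keeping the candidate list non-enumerable is what lets the `toEqual` assertions in the tests above compare against a plain `{ action, reason, retry }` object while the budget evaluation can still read every candidate.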

@@ -0,0 +1,36 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
PLUGIN_HOST_CLEANUP_TIMEOUT_MS,
withPluginHostCleanupTimeout,
} from "./host-hook-cleanup-timeout.js";
describe("withPluginHostCleanupTimeout", () => {
afterEach(() => {
vi.restoreAllMocks();
});
it("unrefs cleanup timeout timers so pending cleanup does not keep the process alive", async () => {
const originalSetTimeout = globalThis.setTimeout;
const unref = vi.fn();
vi.spyOn(globalThis, "setTimeout").mockImplementation(((
callback: () => void,
timeout?: number,
) => {
const timer = originalSetTimeout(callback, timeout);
vi.spyOn(timer, "unref").mockImplementation(() => {
unref();
return timer;
});
return timer;
}) as typeof setTimeout);
await expect(withPluginHostCleanupTimeout("fast-cleanup", () => "ok")).resolves.toBe("ok");
expect(globalThis.setTimeout).toHaveBeenCalledWith(
expect.any(Function),
PLUGIN_HOST_CLEANUP_TIMEOUT_MS,
);
expect(unref).toHaveBeenCalledTimes(1);
});
});

@@ -0,0 +1,30 @@
export const PLUGIN_HOST_CLEANUP_TIMEOUT_MS = 5_000;
export class PluginHostCleanupTimeoutError extends Error {
constructor(hookId: string) {
super(`plugin host cleanup timed out: ${hookId}`);
this.name = "PluginHostCleanupTimeoutError";
}
}
export async function withPluginHostCleanupTimeout<T>(
hookId: string,
cleanup: () => T | Promise<T>,
): Promise<T> {
let timeout: NodeJS.Timeout | undefined;
try {
return await Promise.race([
Promise.resolve().then(cleanup),
new Promise<never>((_, reject) => {
timeout = setTimeout(() => {
reject(new PluginHostCleanupTimeoutError(hookId));
}, PLUGIN_HOST_CLEANUP_TIMEOUT_MS);
timeout.unref?.();
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}
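
Reviewer note: a minimal sketch of how a caller might use this race-and-clear pattern so that one hung hook is recorded as a failure while later hooks still run. It is an illustration under assumptions, not the host code: `withTimeout`, `runCleanups`, and `CleanupTimeoutError` are hypothetical names, the timeout is a parameter rather than the 5-second constant, and the timer is deliberately not unref'd so a standalone script stays alive until it fires.

```typescript
class CleanupTimeoutError extends Error {
  constructor(hookId: string) {
    super(`cleanup timed out: ${hookId}`);
    this.name = "CleanupTimeoutError";
  }
}

// Race the cleanup against a timer, and always clear the timer afterwards.
async function withTimeout<T>(
  hookId: string,
  ms: number,
  cleanup: () => T | Promise<T>,
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    return await Promise.race([
      Promise.resolve().then(cleanup),
      new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(new CleanupTimeoutError(hookId)), ms);
      }),
    ]);
  } finally {
    if (timer) {
      clearTimeout(timer);
    }
  }
}

type Failure = { hookId: string; error: unknown };

// Run hooks in order; a timeout or throw is recorded and the loop keeps moving.
async function runCleanups(
  hooks: Array<{ id: string; cleanup: () => unknown }>,
  ms: number,
): Promise<{ cleanupCount: number; failures: Failure[] }> {
  const failures: Failure[] = [];
  let cleanupCount = 0;
  for (const hook of hooks) {
    try {
      await withTimeout(hook.id, ms, hook.cleanup);
      cleanupCount += 1;
    } catch (error) {
      failures.push({ hookId: hook.id, error });
    }
  }
  return { cleanupCount, failures };
}
```

The timeout only bounds how long the host waits. The hung promise itself is never cancelled, which is why the runtime changes in this commit also guard against late writes from handlers that settle after the deadline.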

@@ -0,0 +1,54 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { runPluginHostCleanup } from "./host-hook-cleanup.js";
import { createEmptyPluginRegistry } from "./registry-empty.js";
const mocks = vi.hoisted(() => ({
getRuntimeConfig: vi.fn(),
}));
vi.mock("../config/config.js", () => ({
getRuntimeConfig: mocks.getRuntimeConfig,
}));
describe("plugin host cleanup config fallback", () => {
afterEach(() => {
mocks.getRuntimeConfig.mockReset();
});
it("records session store config failures while continuing runtime cleanup", async () => {
const registry = createEmptyPluginRegistry();
const cleanup = vi.fn();
registry.runtimeLifecycles ??= [];
registry.runtimeLifecycles.push({
pluginId: "cleanup-plugin",
pluginName: "Cleanup Plugin",
source: "test",
lifecycle: {
id: "runtime-cleanup",
cleanup,
},
});
mocks.getRuntimeConfig.mockImplementation(() => {
throw new Error("invalid config");
});
const result = await runPluginHostCleanup({
registry,
pluginId: "cleanup-plugin",
reason: "disable",
});
expect(cleanup).toHaveBeenCalledWith(
expect.objectContaining({
reason: "disable",
}),
);
expect(result.cleanupCount).toBe(1);
expect(result.failures).toEqual([
expect.objectContaining({
pluginId: "cleanup-plugin",
hookId: "session-store",
}),
]);
});
});

@@ -1,10 +1,16 @@
import fs from "node:fs";
import { getRuntimeConfig } from "../config/config.js";
import { updateSessionStore } from "../config/sessions/store.js";
import { resolveAllAgentSessionStoreTargetsSync } from "../config/sessions/targets.js";
import type { SessionEntry } from "../config/sessions/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { cleanupPluginSessionSchedulerJobs, clearPluginRunContext } from "./host-hook-runtime.js";
import { withPluginHostCleanupTimeout } from "./host-hook-cleanup-timeout.js";
import {
cleanupPluginSessionSchedulerJobs,
clearPluginRunContext,
makePluginSessionSchedulerJobKey,
} from "./host-hook-runtime.js";
import type { PluginHostCleanupReason } from "./host-hooks.js";
import type { PluginRegistry } from "./registry-types.js";
@@ -101,84 +107,136 @@ async function clearPluginOwnedSessionStores(params: {
}
export async function runPluginHostCleanup(params: {
cfg: OpenClawConfig;
cfg?: OpenClawConfig;
registry?: PluginRegistry | null;
pluginId?: string;
reason: PluginHostCleanupReason;
sessionKey?: string;
runId?: string;
preserveSchedulerJobIds?: ReadonlySet<string>;
shouldCleanup?: () => boolean;
}): Promise<PluginHostCleanupResult> {
const persistentCleanupCount =
params.reason === "restart"
? 0
: await clearPluginOwnedSessionStores({
cfg: params.cfg,
pluginId: params.pluginId,
sessionKey: params.sessionKey,
});
const registry = params.registry;
if (!registry) {
return { cleanupCount: persistentCleanupCount, failures: [] };
}
const failures: PluginHostCleanupFailure[] = [];
const shouldCleanup = params.shouldCleanup ?? (() => true);
if (!shouldCleanup()) {
return { cleanupCount: 0, failures };
}
let persistentCleanupCount = 0;
if (params.reason !== "restart" && shouldCleanup()) {
try {
persistentCleanupCount = await clearPluginOwnedSessionStores({
cfg: params.cfg ?? getRuntimeConfig(),
pluginId: params.pluginId,
sessionKey: params.sessionKey,
});
} catch (error) {
failures.push({
pluginId: params.pluginId ?? "plugin-host",
hookId: "session-store",
error,
});
}
}
const registry = params.registry;
let cleanupCount = persistentCleanupCount;
for (const registration of registry.sessionExtensions ?? []) {
if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) {
continue;
if (registry) {
for (const registration of registry.sessionExtensions ?? []) {
if (!shouldCleanup()) {
return { cleanupCount, failures };
}
if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) {
continue;
}
const cleanup = registration.extension.cleanup;
if (!cleanup) {
continue;
}
const hookId = `session:${registration.extension.namespace}`;
try {
await withPluginHostCleanupTimeout(hookId, () =>
cleanup({
reason: params.reason,
sessionKey: params.sessionKey,
}),
);
cleanupCount += 1;
} catch (error) {
failures.push({
pluginId: registration.pluginId,
hookId,
error,
});
}
}
const cleanup = registration.extension.cleanup;
if (!cleanup) {
continue;
for (const registration of registry.runtimeLifecycles ?? []) {
if (!shouldCleanup()) {
return { cleanupCount, failures };
}
if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) {
continue;
}
const cleanup = registration.lifecycle.cleanup;
if (!cleanup) {
continue;
}
const hookId = `runtime:${registration.lifecycle.id}`;
try {
await withPluginHostCleanupTimeout(hookId, () =>
cleanup({
reason: params.reason,
sessionKey: params.sessionKey,
runId: params.runId,
}),
);
cleanupCount += 1;
} catch (error) {
failures.push({
pluginId: registration.pluginId,
hookId,
error,
});
}
}
try {
await cleanup({
reason: params.reason,
sessionKey: params.sessionKey,
});
cleanupCount += 1;
} catch (error) {
failures.push({
pluginId: registration.pluginId,
hookId: `session:${registration.extension.namespace}`,
error,
});
const schedulerFailures = await cleanupPluginSessionSchedulerJobs({
pluginId: params.pluginId,
reason: params.reason,
sessionKey: params.sessionKey,
records: registry.sessionSchedulerJobs,
preserveJobIds: params.preserveSchedulerJobIds,
shouldCleanup,
});
for (const failure of schedulerFailures) {
failures.push(failure);
}
}
for (const registration of registry.runtimeLifecycles ?? []) {
if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) {
continue;
}
const cleanup = registration.lifecycle.cleanup;
if (!cleanup) {
continue;
}
try {
await cleanup({
reason: params.reason,
sessionKey: params.sessionKey,
runId: params.runId,
});
cleanupCount += 1;
} catch (error) {
failures.push({
pluginId: registration.pluginId,
hookId: `runtime:${registration.lifecycle.id}`,
error,
});
if (params.reason !== "restart" && shouldCleanup()) {
const registrySchedulerJobKeys = new Set(
(registry?.sessionSchedulerJobs ?? [])
.filter((record) => !params.pluginId || record.pluginId === params.pluginId)
.map((record) => ({
pluginId: record.pluginId,
jobId: typeof record.job.id === "string" ? record.job.id.trim() : "",
}))
.filter(({ jobId }) => jobId.length > 0)
.map(({ pluginId, jobId }) => makePluginSessionSchedulerJobKey(pluginId, jobId)),
);
const runtimeSchedulerFailures = await cleanupPluginSessionSchedulerJobs({
pluginId: params.pluginId,
reason: params.reason,
sessionKey: params.sessionKey,
preserveJobIds: params.preserveSchedulerJobIds,
excludeJobKeys: registrySchedulerJobKeys,
shouldCleanup,
});
for (const failure of runtimeSchedulerFailures) {
failures.push(failure);
}
}
const schedulerFailures = await cleanupPluginSessionSchedulerJobs({
pluginId: params.pluginId,
reason: params.reason,
sessionKey: params.sessionKey,
records: registry?.sessionSchedulerJobs,
preserveJobIds: params.preserveSchedulerJobIds,
});
for (const failure of schedulerFailures) {
failures.push(failure);
}
if (params.pluginId || params.runId) {
if (
shouldCleanup() &&
(params.pluginId || params.runId) &&
(params.reason !== "restart" || params.runId)
) {
clearPluginRunContext({ pluginId: params.pluginId, runId: params.runId });
}
return { cleanupCount, failures };
@@ -225,9 +283,11 @@ export async function cleanupReplacedPluginHostRegistry(params: {
cfg: OpenClawConfig;
previousRegistry?: PluginRegistry | null;
nextRegistry?: PluginRegistry | null;
shouldCleanup?: () => boolean;
}): Promise<PluginHostCleanupResult> {
const previousRegistry = params.previousRegistry;
if (!previousRegistry || previousRegistry === params.nextRegistry) {
const shouldCleanup = params.shouldCleanup ?? (() => true);
if (!previousRegistry || previousRegistry === params.nextRegistry || !shouldCleanup()) {
return { cleanupCount: 0, failures: [] };
}
const nextPluginIds = params.nextRegistry
@@ -240,6 +300,9 @@ export async function cleanupReplacedPluginHostRegistry(params: {
const failures: PluginHostCleanupFailure[] = [];
let cleanupCount = 0;
for (const pluginId of previousPluginIds) {
if (!shouldCleanup()) {
break;
}
const restarted = nextPluginIds.has(pluginId);
const result = await runPluginHostCleanup({
cfg: params.cfg,
@@ -249,6 +312,7 @@ export async function cleanupReplacedPluginHostRegistry(params: {
preserveSchedulerJobIds: restarted
? collectSchedulerJobIds(params.nextRegistry, pluginId)
: undefined,
shouldCleanup,
});
cleanupCount += result.cleanupCount;
failures.push(...result.failures);
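
Reviewer note: the final guard in `runPluginHostCleanup` decides whether run context is cleared, and its three conditions are easier to check as a standalone predicate. The sketch below restates that condition. `shouldClearRunContext` is a hypothetical helper, and `CleanupReason` lists only the reasons that appear in this diff's tests, so the real `PluginHostCleanupReason` may include others.

```typescript
// Assumption: only the reasons visible in this diff's tests.
type CleanupReason = "restart" | "disable" | "delete";

// Hypothetical predicate mirroring the guard around clearPluginRunContext.
function shouldClearRunContext(params: {
  reason: CleanupReason;
  pluginId?: string;
  runId?: string;
  shouldCleanup?: () => boolean;
}): boolean {
  const shouldCleanup = params.shouldCleanup ?? (() => true);
  return (
    // A stale cleanup pass must not touch state.
    shouldCleanup() &&
    // Clearing needs a scope: a plugin, a run, or both.
    Boolean(params.pluginId || params.runId) &&
    // A restart keeps plugin-wide context; only an explicit run is cleared.
    (params.reason !== "restart" || Boolean(params.runId))
  );
}
```

Under this reading, a restart scoped only by `pluginId` leaves run context in place, which matches the "preserves plugin run context during restart cleanup" test, while a later `disable` for the same plugin clears it.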

@@ -2,6 +2,7 @@ import type { AgentEventPayload } from "../infra/agent-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { withPluginHostCleanupTimeout } from "./host-hook-cleanup-timeout.js";
import {
isPluginJsonValue,
type PluginHostCleanupReason,
@@ -29,10 +30,12 @@ type PluginHostRuntimeState = {
nextSchedulerJobGeneration: number;
pendingAgentEventHandlersByRunId: Map<string, Set<Promise<void>>>;
closedRunIds: Set<string>;
terminalEventCleanupExpiredRunIds: Set<string>;
};
const PLUGIN_HOST_RUNTIME_STATE_KEY = Symbol.for("openclaw.pluginHostRuntimeState");
const CLOSED_RUN_IDS_MAX = 512;
export const PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS = 5_000;
const log = createSubsystemLogger("plugins/host-hooks");
function getPluginHostRuntimeState(): PluginHostRuntimeState {
@@ -42,6 +45,7 @@ function getPluginHostRuntimeState(): PluginHostRuntimeState {
nextSchedulerJobGeneration: 1,
pendingAgentEventHandlersByRunId: new Map(),
closedRunIds: new Set(),
terminalEventCleanupExpiredRunIds: new Set(),
}));
}
@@ -70,6 +74,23 @@ function isPluginRunClosed(runId: string): boolean {
return getPluginHostRuntimeState().closedRunIds.has(runId);
}
function markTerminalEventCleanupExpired(runId: string): void {
const state = getPluginHostRuntimeState();
state.terminalEventCleanupExpiredRunIds.delete(runId);
state.terminalEventCleanupExpiredRunIds.add(runId);
while (state.terminalEventCleanupExpiredRunIds.size > CLOSED_RUN_IDS_MAX) {
const oldest = state.terminalEventCleanupExpiredRunIds.values().next().value;
if (oldest === undefined) {
break;
}
state.terminalEventCleanupExpiredRunIds.delete(oldest);
}
}
function isTerminalEventCleanupExpired(runId: string): boolean {
return getPluginHostRuntimeState().terminalEventCleanupExpiredRunIds.has(runId);
}
function trackAgentEventHandler(runId: string, pending: Promise<void>): void {
const state = getPluginHostRuntimeState();
const handlers = state.pendingAgentEventHandlersByRunId.get(runId) ?? new Set();
@@ -77,12 +98,53 @@ function trackAgentEventHandler(runId: string, pending: Promise<void>): void {
state.pendingAgentEventHandlersByRunId.set(runId, handlers);
void pending.finally(() => {
handlers.delete(pending);
if (handlers.size === 0) {
if (
handlers.size === 0 &&
getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(runId) === handlers
) {
state.pendingAgentEventHandlersByRunId.delete(runId);
}
});
}
async function waitForLiveTerminalEventHandlers(runId: string): Promise<"settled"> {
for (;;) {
const pendingHandlers = getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(runId);
if (!pendingHandlers || pendingHandlers.size === 0) {
return "settled";
}
await Promise.allSettled(pendingHandlers);
}
}
function waitForTerminalEventHandlers(params: { runId: string }): Promise<void> {
const { runId } = params;
let timeout: NodeJS.Timeout | undefined;
const settled = waitForLiveTerminalEventHandlers(runId);
// Promise.race bounds the host wait; JavaScript cannot cancel the plugin
// promises themselves, so timeout also marks the run expired to block late
// run-context resurrection by handlers that eventually settle.
const timedOut = new Promise<"timeout">((resolve) => {
timeout = setTimeout(() => {
markTerminalEventCleanupExpired(runId);
getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.delete(runId);
log.warn(
`plugin terminal agent event subscriptions still running after ${PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS}ms; clearing run context without waiting for them to settle`,
);
resolve("timeout");
}, PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS);
});
if (timeout) {
timeout.unref?.();
}
return Promise.race([settled, timedOut]).then(() => {
if (timeout) {
clearTimeout(timeout);
timeout = undefined;
}
});
}
function getPluginRunContextNamespaces(params: {
runId: string;
pluginId: string;
@@ -108,13 +170,14 @@ function getPluginRunContextNamespaces(params: {
export function setPluginRunContext(params: {
pluginId: string;
patch: PluginRunContextPatch;
allowClosedRun?: boolean;
}): boolean {
const runId = normalizeOptionalString(params.patch.runId);
const namespace = normalizeNamespace(params.patch.namespace);
if (!runId || !namespace) {
return false;
}
if (isPluginRunClosed(runId)) {
if (!params.allowClosedRun && isPluginRunClosed(runId)) {
return false;
}
// Only an explicit `unset: true` deletes the run-context entry — silently
@@ -230,6 +293,7 @@ export function dispatchPluginAgentEventSubscriptions(params: {
}): void {
const subscriptions = params.registry?.agentEventSubscriptions ?? [];
const pendingHandlers: Promise<void>[] = [];
const isTerminalEvent = isTerminalAgentRunEvent(params.event);
for (const registration of subscriptions) {
const streams = registration.subscription.streams;
if (streams && streams.length > 0 && !streams.includes(params.event.stream)) {
@@ -237,12 +301,17 @@ export function dispatchPluginAgentEventSubscriptions(params: {
}
const pluginId = registration.pluginId;
const runId = params.event.runId;
let handlerActive = true;
const ctx = {
// oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Run-context JSON reads are caller-typed by namespace.
getRunContext: <T extends PluginJsonValue = PluginJsonValue>(namespace: string) =>
getPluginRunContext<T>({ pluginId, get: { runId, namespace } }),
setRunContext: (namespace: string, value: PluginJsonValue) => {
setPluginRunContext({ pluginId, patch: { runId, namespace, value } });
setPluginRunContext({
pluginId,
patch: { runId, namespace, value },
allowClosedRun: isTerminalEvent && handlerActive && !isTerminalEventCleanupExpired(runId),
});
},
clearRunContext: (namespace?: string) => {
clearPluginRunContext({ pluginId, runId, namespace });
@@ -251,16 +320,21 @@ export function dispatchPluginAgentEventSubscriptions(params: {
try {
const pending = Promise.resolve(
registration.subscription.handle(structuredClone(params.event), ctx),
).catch((error) => {
logAgentEventSubscriptionFailure({
pluginId,
subscriptionId: registration.subscription.id,
error,
)
.catch((error) => {
logAgentEventSubscriptionFailure({
pluginId,
subscriptionId: registration.subscription.id,
error,
});
})
.finally(() => {
handlerActive = false;
});
});
trackAgentEventHandler(runId, pending);
pendingHandlers.push(pending);
} catch (error) {
handlerActive = false;
logAgentEventSubscriptionFailure({
pluginId,
subscriptionId: registration.subscription.id,
@@ -268,12 +342,11 @@ export function dispatchPluginAgentEventSubscriptions(params: {
});
}
}
if (isTerminalAgentRunEvent(params.event)) {
if (isTerminalEvent) {
markPluginRunClosed(params.event.runId);
const pendingForRun =
getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(params.event.runId) ??
new Set(pendingHandlers);
void Promise.allSettled(pendingForRun).then(() => {
void waitForTerminalEventHandlers({
runId: params.event.runId,
}).then(() => {
clearPluginRunContext({ runId: params.event.runId });
});
}
@@ -360,6 +433,10 @@ export function getPluginSessionSchedulerJobGeneration(params: {
return record.generation;
}
export function makePluginSessionSchedulerJobKey(pluginId: string, jobId: string): string {
return JSON.stringify([pluginId, jobId]);
}
export async function cleanupPluginSessionSchedulerJobs(params: {
pluginId?: string;
reason: PluginHostCleanupReason;
@@ -371,11 +448,20 @@ export async function cleanupPluginSessionSchedulerJobs(params: {
generation?: number;
}[];
preserveJobIds?: ReadonlySet<string>;
excludeJobKeys?: ReadonlySet<string>;
shouldCleanup?: () => boolean;
}): Promise<Array<{ pluginId: string; hookId: string; error: unknown }>> {
const state = getPluginHostRuntimeState();
const failures: Array<{ pluginId: string; hookId: string; error: unknown }> = [];
const shouldCleanup = params.shouldCleanup ?? (() => true);
if (!shouldCleanup()) {
return failures;
}
if (params.records) {
for (const record of params.records) {
if (!shouldCleanup()) {
return failures;
}
if (params.pluginId && record.pluginId !== params.pluginId) {
continue;
}
@@ -406,29 +492,37 @@ export async function cleanupPluginSessionSchedulerJobs(params: {
continue;
}
const preserveJob = params.preserveJobIds?.has(jobId) ?? false;
if (
preserveJob &&
(record.generation === undefined || liveGeneration === record.generation)
) {
if (preserveJob) {
// preserveJobIds means "do not run cleanup at all" — even across
// generation mismatches. The generation-matched deletion below would
// otherwise still call the OLD cleanup callback, which can remove
// external scheduled jobs (e.g. cron.remove) and break the live
// newer-generation registration that took over this jobId.
continue;
}
// A newer generation may already own this id. The old cleanup callback can
// still release plugin-owned resources, while deletion below is generation
// matched so it cannot remove the newer live record.
const hookId = `scheduler:${jobId}`;
try {
await record.job.cleanup?.({
reason: params.reason,
sessionKey,
jobId,
});
await withPluginHostCleanupTimeout(hookId, () =>
record.job.cleanup?.({
reason: params.reason,
sessionKey,
jobId,
}),
);
} catch (error) {
failures.push({
pluginId: record.pluginId,
hookId: `scheduler:${jobId}`,
hookId,
error,
});
continue;
}
if (!shouldCleanup()) {
continue;
}
deletePluginSessionSchedulerJob({
pluginId: record.pluginId,
jobId,
@@ -440,28 +534,46 @@ export async function cleanupPluginSessionSchedulerJobs(params: {
}
const pluginIds = params.pluginId ? [params.pluginId] : [...state.schedulerJobsByPlugin.keys()];
for (const pluginId of pluginIds) {
if (!shouldCleanup()) {
return failures;
}
const jobs = state.schedulerJobsByPlugin.get(pluginId);
if (!jobs) {
continue;
}
for (const [jobId, record] of jobs.entries()) {
if (!shouldCleanup()) {
return failures;
}
if (params.sessionKey && record.job.sessionKey !== params.sessionKey) {
continue;
}
if (params.excludeJobKeys?.has(makePluginSessionSchedulerJobKey(pluginId, jobId))) {
continue;
}
if (params.preserveJobIds?.has(jobId)) {
continue;
}
const hookId = `scheduler:${jobId}`;
try {
await record.job.cleanup?.({
reason: params.reason,
sessionKey: record.job.sessionKey,
jobId,
});
await withPluginHostCleanupTimeout(hookId, () =>
record.job.cleanup?.({
reason: params.reason,
sessionKey: record.job.sessionKey,
jobId,
}),
);
} catch (error) {
failures.push({
pluginId,
hookId: `scheduler:${jobId}`,
hookId,
error,
});
continue;
}
if (!shouldCleanup()) {
continue;
}
jobs.delete(jobId);
}
if (jobs.size === 0) {
@@ -480,6 +592,7 @@ export function clearPluginHostRuntimeState(params?: { pluginId?: string; runId?
state.schedulerJobsByPlugin.clear();
state.pendingAgentEventHandlersByRunId.clear();
state.closedRunIds.clear();
state.terminalEventCleanupExpiredRunIds.clear();
}
}
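
Review note on the bounded terminal-event wait in the hunks above: a minimal sketch, assuming a hypothetical `waitWithDeadline` helper that is not part of the plugin SDK. It shows the same shape as `waitForTerminalEventHandlers`: race the pending work against a timer, run a timeout callback (where the host would mark the run expired), and clear the timer once the race resolves. The losing promise is not cancelled; it is only no longer awaited.

```typescript
// Hypothetical helper for illustration only; names are assumptions.
type WaitOutcome = "settled" | "timeout";

function waitWithDeadline(
  work: Promise<unknown>,
  timeoutMs: number,
  onTimeout: () => void,
): Promise<WaitOutcome> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  // Treat both fulfilment and rejection as "settled", like Promise.allSettled.
  const settled = work.then(
    (): WaitOutcome => "settled",
    (): WaitOutcome => "settled",
  );
  const timedOut = new Promise<WaitOutcome>((resolve) => {
    timer = setTimeout(() => {
      // The caller records that the deadline passed before we resolve.
      onTimeout();
      resolve("timeout");
    }, timeoutMs);
  });
  return Promise.race([settled, timedOut]).then((outcome) => {
    // Clear the timer so a fast settle does not leave it pending.
    if (timer !== undefined) {
      clearTimeout(timer);
    }
    return outcome;
  });
}

// Usage: a handler that never settles is abandoned after 20ms.
const expiredRunIds = new Set<string>();
const neverSettles = new Promise<void>(() => {});
void waitWithDeadline(neverSettles, 20, () => expiredRunIds.add("run-1")).then((outcome) => {
  console.log(outcome, expiredRunIds.has("run-1"));
});
```

The actual change additionally calls `timeout.unref?.()` so the timer does not keep the process alive, and it loops over the live handler set rather than a single promise.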


@@ -0,0 +1,19 @@
import type { PluginRegistry } from "./registry-types.js";
const retiredRegistries = new WeakSet<PluginRegistry>();
export function markPluginRegistryRetired(registry: PluginRegistry | null | undefined): void {
if (registry) {
retiredRegistries.add(registry);
}
}
export function markPluginRegistryActive(registry: PluginRegistry | null | undefined): void {
if (registry) {
retiredRegistries.delete(registry);
}
}
export function isPluginRegistryRetired(registry: PluginRegistry): boolean {
return retiredRegistries.has(registry);
}
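
Review note on the new lifecycle file above: a small sketch of how the retired marker is expected to move when the active registry is swapped, mirroring the calls added to `setActivePluginRegistry` later in this diff. The `Registry` type and `swapActive` function are stand-ins invented for illustration; only the `WeakSet` add/delete/has behavior is taken from the source.

```typescript
// Stand-in for the real PluginRegistry type; this shape is an assumption.
type Registry = { plugins: string[] };

// WeakSet membership does not keep a dropped registry alive.
const retired = new WeakSet<Registry>();

// Hypothetical swap: retire the outgoing registry, un-retire the incoming one.
function swapActive(current: Registry | null, next: Registry): Registry {
  if (current && current !== next) {
    retired.add(current);
  }
  retired.delete(next);
  return next;
}

const first: Registry = { plugins: ["a"] };
const second: Registry = { plugins: ["b"] };
let active = swapActive(null, first);
active = swapActive(active, second); // first is now retired
active = swapActive(active, first); // restoring first clears its marker
console.log(retired.has(first), retired.has(second));
```

Because restoring a registry deletes its marker, a registry that becomes active again is not treated as retired by later guard checks.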


@@ -99,6 +99,7 @@ import {
} from "./memory-state.js";
import { normalizeRegisteredProvider } from "./provider-validation.js";
import { createEmptyPluginRegistry } from "./registry-empty.js";
import { isPluginRegistryRetired } from "./registry-lifecycle.js";
import type {
PluginCliBackendRegistration,
PluginCliRegistration,
@@ -303,6 +304,9 @@ const activePluginHookRegistrations = resolveGlobalSingleton<
type HookRegistration = { event: string; handler: Parameters<typeof registerInternalHook>[1] };
type HookRollbackEntry = { name: string; previousRegistrations: HookRegistration[] };
type PluginSideEffectGuard = {
active: boolean;
};
type PluginRegistrationCapabilities = {
/** Broad registry writes that discovery and live activation both need. */
@@ -338,6 +342,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
const coreGatewayMethods = new Set(coreGatewayMethodNames);
const pluginHookRollback = new Map<string, HookRollbackEntry[]>();
const pluginsWithChannelRegistrationConflict = new Set<string>();
const pluginSideEffectGuards = new Map<string, Set<PluginSideEffectGuard>>();
const pushDiagnostic = (diag: PluginDiagnostic) => {
registry.diagnostics.push(diag);
@@ -354,6 +359,25 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
return value;
};
const createPluginSideEffectGuard = (pluginId: string): PluginSideEffectGuard => {
const guard = { active: true };
const guards = pluginSideEffectGuards.get(pluginId) ?? new Set<PluginSideEffectGuard>();
guards.add(guard);
pluginSideEffectGuards.set(pluginId, guards);
return guard;
};
const deactivatePluginSideEffectGuards = (pluginId: string): void => {
const guards = pluginSideEffectGuards.get(pluginId);
if (!guards) {
return;
}
for (const guard of guards) {
guard.active = false;
}
pluginSideEffectGuards.delete(pluginId);
};
const registerCodexAppServerExtensionFactory = (
record: PluginRecord,
factory: Parameters<OpenClawPluginApi["registerCodexAppServerExtensionFactory"]>[0],
@@ -2169,6 +2193,18 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
const registrationMode = params.registrationMode ?? "full";
const registrationCapabilities = resolvePluginRegistrationCapabilities(registrationMode);
pluginRuntimeRecordById.set(record.id, record);
const sideEffectGuard = createPluginSideEffectGuard(record.id);
const isLoadedRecordInRegistry = () =>
registry.plugins.some((plugin) => plugin.id === record.id && plugin.status === "loaded");
const isActivatingLoadedRecord = () =>
registryParams.activateGlobalSideEffects !== false &&
record.enabled &&
record.status === "loaded" &&
!registry.plugins.some((plugin) => plugin.id === record.id);
const shouldCommitWorkflowSideEffect = () =>
sideEffectGuard.active &&
!isPluginRegistryRetired(registry) &&
(isLoadedRecordInRegistry() || isActivatingLoadedRecord());
return buildPluginApi({
id: record.id,
name: record.name,
@@ -2378,14 +2414,25 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
registerRuntimeLifecycle: (lifecycle) => registerRuntimeLifecycle(record, lifecycle),
registerAgentEventSubscription: (subscription) =>
registerAgentEventSubscription(record, subscription),
setRunContext: (patch) => setPluginRunContext({ pluginId: record.id, patch }),
setRunContext: (patch) =>
registryParams.activateGlobalSideEffects !== false &&
shouldCommitWorkflowSideEffect()
? setPluginRunContext({ pluginId: record.id, patch })
: false,
getRunContext: (get) => getPluginRunContext({ pluginId: record.id, get }),
clearRunContext: (params) =>
clearRunContext: (params) => {
if (
registryParams.activateGlobalSideEffects === false ||
!shouldCommitWorkflowSideEffect()
) {
return;
}
clearPluginRunContext({
pluginId: record.id,
runId: params.runId,
namespace: params.namespace,
}),
});
},
registerSessionSchedulerJob: (job) => registerSessionSchedulerJob(record, job),
registerMemoryCapability: (capability) => {
if (!hasKind(record.kind, "memory")) {
@@ -2548,6 +2595,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
};
const rollbackPluginGlobalSideEffects = (pluginId: string) => {
deactivatePluginSideEffectGuards(pluginId);
if (registryParams.activateGlobalSideEffects === false) {
return;
}
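
Review note on the side-effect guard added above: a minimal sketch of the pattern, assuming a simplified API with one `set` method and a plain `Map` as the backing store (both hypothetical). Each API object closes over its own guard, so a rollback can disable every API handle issued to a plugin without tracking the handles themselves.

```typescript
type Guard = { active: boolean };

const guardsByPlugin = new Map<string, Set<Guard>>();

function createGuard(pluginId: string): Guard {
  const guard: Guard = { active: true };
  const guards = guardsByPlugin.get(pluginId) ?? new Set<Guard>();
  guards.add(guard);
  guardsByPlugin.set(pluginId, guards);
  return guard;
}

function deactivateGuards(pluginId: string): void {
  const guards = guardsByPlugin.get(pluginId);
  if (!guards) {
    return;
  }
  guards.forEach((guard) => {
    guard.active = false;
  });
  guardsByPlugin.delete(pluginId);
}

// Hypothetical plugin API: writes are dropped once the guard is inactive.
function buildApi(pluginId: string, store: Map<string, string>) {
  const guard = createGuard(pluginId);
  return {
    set(key: string, value: string): boolean {
      if (!guard.active) {
        return false;
      }
      store.set(`${pluginId}:${key}`, value);
      return true;
    },
  };
}

const demoStore = new Map<string, string>();
const demoApi = buildApi("demo-plugin", demoStore);
demoApi.set("k", "v1");
deactivateGuards("demo-plugin"); // e.g. on rollback
console.log(demoApi.set("k", "v2"), demoStore.get("demo-plugin:k"));
```

In the real change the guard is only one of several conditions in `shouldCommitWorkflowSideEffect`; the registry-retired and loaded-record checks are omitted here.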


@@ -68,6 +68,15 @@ function expectRouteRegistryState(params: { setup: () => void; assert: () => voi
params.assert();
}
async function waitForCleanupSignal(signal: Promise<void>, label: string): Promise<void> {
await Promise.race([
signal,
new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error(`Timed out waiting for ${label}`)), 500);
}),
]);
}
describe("plugin runtime route registry", () => {
afterEach(() => {
releasePinnedPluginHttpRouteRegistry();
@@ -209,6 +218,78 @@ describe("setActivePluginRegistry", () => {
expect(listImportedRuntimePluginIds()).toEqual(["runtime-plugin"]);
});
it.each([
{
name: "same active registry is refreshed",
refresh(nextRegistry: ReturnType<typeof createEmptyPluginRegistry>) {
setActivePluginRegistry(nextRegistry);
},
},
{
name: "active registry advances again",
refresh() {
setActivePluginRegistry(createEmptyPluginRegistry());
},
},
] as const)("continues cleanup when the $name", async ({ refresh }) => {
let releaseFirstCleanup: (() => void) | undefined;
let markFirstCleanupStarted!: () => void;
let markSecondCleanupCalled!: () => void;
const firstCleanupStarted = new Promise<void>((resolve) => {
markFirstCleanupStarted = resolve;
});
const secondCleanupCalled = new Promise<void>((resolve) => {
markSecondCleanupCalled = resolve;
});
const previous = createEmptyPluginRegistry();
previous.plugins.push(
createPluginRecord({
id: "cleanup-refresh-race",
name: "Cleanup Refresh Race",
status: "loaded",
}),
);
previous.runtimeLifecycles = [
{
pluginId: "cleanup-refresh-race",
pluginName: "Cleanup Refresh Race",
lifecycle: {
id: "first-cleanup",
async cleanup() {
markFirstCleanupStarted();
await new Promise<void>((resolve) => {
releaseFirstCleanup = resolve;
});
},
},
source: "/virtual/cleanup-refresh-race/index.ts",
rootDir: "/virtual/cleanup-refresh-race",
},
{
pluginId: "cleanup-refresh-race",
pluginName: "Cleanup Refresh Race",
lifecycle: {
id: "second-cleanup",
cleanup() {
markSecondCleanupCalled();
},
},
source: "/virtual/cleanup-refresh-race/index.ts",
rootDir: "/virtual/cleanup-refresh-race",
},
];
const next = createEmptyPluginRegistry();
setActivePluginRegistry(previous);
setActivePluginRegistry(next);
await waitForCleanupSignal(firstCleanupStarted, "first cleanup start");
refresh(next);
releaseFirstCleanup?.();
await waitForCleanupSignal(secondCleanupCalled, "second cleanup");
});
it("includes plugin ids imported before registration failed", () => {
recordImportedPluginId("broken-plugin");


@@ -5,6 +5,7 @@ import {
dispatchPluginAgentEventSubscriptions,
} from "./host-hook-runtime.js";
import { createEmptyPluginRegistry } from "./registry-empty.js";
import { markPluginRegistryActive, markPluginRegistryRetired } from "./registry-lifecycle.js";
import type { PluginRegistry } from "./registry-types.js";
import {
PLUGIN_REGISTRY_STATE,
@@ -64,16 +65,23 @@ function registryHasPluginHostCleanupWork(registry: PluginRegistry | null): bool
async function cleanupPreviousPluginHostRegistry(params: {
previousRegistry: PluginRegistry;
nextRegistry: PluginRegistry;
}): Promise<void> {
const [{ getRuntimeConfig }, { cleanupReplacedPluginHostRegistry }] = await Promise.all([
import("../config/config.js"),
import("./host-hook-cleanup.js"),
]);
const nextRegistry = asPluginRegistry(state.activeRegistry);
if (!nextRegistry || nextRegistry === params.previousRegistry) {
return;
}
// Async cleanup must not clear state for a registry that has been restored
// active, but later swaps should not strand cleanup for the retiring registry.
const shouldCleanup = () => state.activeRegistry !== params.previousRegistry;
await cleanupReplacedPluginHostRegistry({
cfg: getRuntimeConfig(),
previousRegistry: params.previousRegistry,
nextRegistry: params.nextRegistry,
nextRegistry,
shouldCleanup,
});
}
@@ -129,6 +137,10 @@ export function setActivePluginRegistry(
workspaceDir?: string,
) {
const previousRegistry = asPluginRegistry(state.activeRegistry);
if (previousRegistry && previousRegistry !== registry) {
markPluginRegistryRetired(previousRegistry);
}
markPluginRegistryActive(registry);
state.activeRegistry = registry;
state.activeVersion += 1;
syncTrackedSurface(state.httpRoute, registry, true);
@@ -146,7 +158,6 @@ export function setActivePluginRegistry(
}
void cleanupPreviousPluginHostRegistry({
previousRegistry,
nextRegistry: registry,
}).catch((error) => {
log.warn(`plugin host registry cleanup failed: ${String(error)}`);
});
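
Review note on the `shouldCleanup` predicate threaded through this change: a minimal sketch, assuming a hypothetical `cleanupJobs` function over a plain `Map`. It illustrates why the predicate is evaluated both before each job and again after the awaited callback: state can change while a cleanup is in flight, so the record is deleted only if cleanup is still wanted afterwards.

```typescript
// Hypothetical job shape for illustration; not the SDK's scheduler record.
type CleanupJob = { id: string; cleanup: () => Promise<void> | void };

async function cleanupJobs(
  jobs: Map<string, CleanupJob>,
  shouldCleanup: () => boolean,
): Promise<string[]> {
  const failures: string[] = [];
  // Snapshot entries so deleting during iteration is safe.
  for (const [id, job] of Array.from(jobs.entries())) {
    if (!shouldCleanup()) {
      return failures;
    }
    try {
      await job.cleanup();
    } catch {
      failures.push(id);
      continue;
    }
    // The awaited callback may have run while the registry was restored.
    if (!shouldCleanup()) {
      continue;
    }
    jobs.delete(id);
  }
  return failures;
}

// Usage: the predicate flips while the first cleanup runs, so nothing is deleted.
let stillReplaced = true;
const demoJobs = new Map<string, CleanupJob>([
  ["first", { id: "first", cleanup: () => { stillReplaced = false; } }],
  ["second", { id: "second", cleanup: () => {} }],
]);
void cleanupJobs(demoJobs, () => stillReplaced).then((failures) => {
  console.log(failures.length, demoJobs.size);
});
```

The source wraps each callback in `withPluginHostCleanupTimeout` as well; that bound is left out of this sketch.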