fix(plugin-sdk): harden run context cleanup lifecycle

This commit is contained in:
Eva
2026-05-01 17:40:09 +07:00
committed by Josh Lehman
parent 042d7b8823
commit cb097004eb
11 changed files with 1030 additions and 85 deletions

View File

@@ -264,6 +264,22 @@ the harness for one more model pass before finalization, `{ action:
Codex native `Stop` hooks are relayed into this hook as OpenClaw
`before_agent_finalize` decisions.
When returning `action: "revise"`, plugins can include `retry` metadata to make
the extra model pass bounded and replay-safe:
```typescript
type BeforeAgentFinalizeRetry = {
instruction: string;
idempotencyKey?: string;
maxAttempts?: number;
};
```
`instruction` is appended to the revision reason sent to the harness.
`idempotencyKey` lets the host count retries for the same plugin request across
equivalent finalize decisions, and `maxAttempts` caps how many extra passes the
host will allow before continuing with the natural final answer.
Non-bundled plugins that need `llm_input`, `llm_output`,
`before_agent_finalize`, or `agent_end` must set:

View File

@@ -1,5 +1,6 @@
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
clearAgentHarnessFinalizeRetryBudget,
runAgentHarnessAgentEndHook,
runAgentHarnessBeforeAgentFinalizeHook,
runAgentHarnessLlmInputHook,
@@ -10,7 +11,24 @@ const legacyHookRunner = {
hasHooks: () => true,
};
const EVENT = {
runId: "run-1",
sessionId: "session-1",
sessionKey: "agent:main:session-1",
turnId: "turn-1",
provider: "codex",
model: "gpt-5.4",
cwd: "/repo",
transcriptPath: "/tmp/session.jsonl",
stopHookActive: false,
lastAssistantMessage: "done",
};
describe("agent harness lifecycle hook helpers", () => {
afterEach(() => {
clearAgentHarnessFinalizeRetryBudget();
});
it("ignores legacy hook runners that advertise llm_input without a runner method", () => {
expect(() =>
runAgentHarnessLlmInputHook({
@@ -50,4 +68,123 @@ describe("agent harness lifecycle hook helpers", () => {
} as never),
).resolves.toEqual({ action: "continue" });
});
it("clears finalize retry budgets by run id", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
retry: {
instruction: "revise once",
idempotencyKey: "stable",
maxAttempts: 1,
},
}),
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise once" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "continue" });
clearAgentHarnessFinalizeRetryBudget({ runId: "run-1" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise once" });
});
it("does not clear finalize retry budgets for runs that only share a prefix", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
retry: {
instruction: "revise child once",
idempotencyKey: "stable",
maxAttempts: 1,
},
}),
};
const childEvent = {
...EVENT,
runId: "run:child",
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: childEvent,
ctx: { runId: "run:child", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise child once" });
clearAgentHarnessFinalizeRetryBudget({ runId: "run" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: childEvent,
ctx: { runId: "run:child", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "continue" });
});
it("keys finalize retry budgets by context run id when the event omits run id", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
retry: {
instruction: "revise from context run",
idempotencyKey: "stable",
maxAttempts: 1,
},
}),
};
const eventWithoutRunId = {
...EVENT,
runId: undefined,
sessionId: "shared-session",
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: eventWithoutRunId,
ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise from context run" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: eventWithoutRunId,
ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "continue" });
clearAgentHarnessFinalizeRetryBudget({ runId: "run-from-context" });
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: eventWithoutRunId,
ctx: { runId: "run-from-context", sessionKey: "agent:main:shared-session" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({ action: "revise", reason: "revise from context run" });
});
});

View File

@@ -7,11 +7,53 @@ import type {
PluginHookLlmInputEvent,
PluginHookLlmOutputEvent,
} from "../../plugins/hook-types.js";
import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
import { buildAgentHookContext, type AgentHarnessHookContext } from "./hook-context.js";
const log = createSubsystemLogger("agents/harness");
const FINALIZE_RETRY_BUDGET_KEY = Symbol.for("openclaw.pluginFinalizeRetryBudget");
const FINALIZE_RETRY_BUDGET_MAX_ENTRIES = 2048;
type AgentHarnessHookRunner = ReturnType<typeof getGlobalHookRunner>;
type FinalizeRetryBudget = Map<string, Map<string, number>>;
function getFinalizeRetryBudget(): FinalizeRetryBudget {
return resolveGlobalSingleton<FinalizeRetryBudget>(FINALIZE_RETRY_BUDGET_KEY, () => new Map());
}
function countFinalizeRetryBudgetEntries(budget: FinalizeRetryBudget): number {
let count = 0;
for (const runBudget of budget.values()) {
count += runBudget.size;
}
return count;
}
function pruneFinalizeRetryBudget(budget: FinalizeRetryBudget): void {
while (countFinalizeRetryBudgetEntries(budget) > FINALIZE_RETRY_BUDGET_MAX_ENTRIES) {
const oldestRunId = budget.keys().next().value;
if (oldestRunId === undefined) {
return;
}
const oldestRunBudget = budget.get(oldestRunId);
const oldestRetryKey = oldestRunBudget?.keys().next().value;
if (oldestRunBudget && oldestRetryKey !== undefined) {
oldestRunBudget.delete(oldestRetryKey);
}
if (!oldestRunBudget || oldestRunBudget.size === 0) {
budget.delete(oldestRunId);
}
}
}
export function clearAgentHarnessFinalizeRetryBudget(params?: { runId?: string }): void {
const budget = getFinalizeRetryBudget();
if (!params?.runId) {
budget.clear();
return;
}
budget.delete(params.runId);
}
export function runAgentHarnessLlmInputHook(params: {
event: PluginHookLlmInputEvent;
@@ -73,8 +115,16 @@ export async function runAgentHarnessBeforeAgentFinalizeHook(params: {
return { action: "continue" };
}
try {
const eventForNormalization: PluginHookBeforeAgentFinalizeEvent = {
...params.event,
runId: params.event.runId ?? params.ctx.runId,
};
return normalizeBeforeAgentFinalizeResult(
await hookRunner.runBeforeAgentFinalize(params.event, buildAgentHookContext(params.ctx)),
await hookRunner.runBeforeAgentFinalize(
eventForNormalization,
buildAgentHookContext(params.ctx),
),
eventForNormalization,
);
} catch (error) {
log.warn(`before_agent_finalize hook failed: ${String(error)}`);
@@ -84,6 +134,7 @@ export async function runAgentHarnessBeforeAgentFinalizeHook(params: {
function normalizeBeforeAgentFinalizeResult(
result: PluginHookBeforeAgentFinalizeResult | undefined,
event?: PluginHookBeforeAgentFinalizeEvent,
): AgentHarnessBeforeAgentFinalizeOutcome {
if (result?.action === "finalize") {
return result.reason?.trim()
@@ -91,6 +142,27 @@ function normalizeBeforeAgentFinalizeResult(
: { action: "finalize" };
}
if (result?.action === "revise") {
const retryInstruction = result.retry?.instruction?.trim();
if (retryInstruction) {
const maxAttempts =
typeof result.retry?.maxAttempts === "number" && Number.isFinite(result.retry.maxAttempts)
? Math.max(1, Math.floor(result.retry.maxAttempts))
: 1;
const retryRunId = event?.runId ?? event?.sessionId ?? "unknown-run";
const retryKey = result.retry?.idempotencyKey?.trim() || retryInstruction.slice(0, 160);
const budget = getFinalizeRetryBudget();
const runBudget = budget.get(retryRunId) ?? new Map<string, number>();
const nextCount = (runBudget.get(retryKey) ?? 0) + 1;
runBudget.delete(retryKey);
runBudget.set(retryKey, nextCount);
budget.delete(retryRunId);
budget.set(retryRunId, runBudget);
pruneFinalizeRetryBudget(budget);
if (nextCount > maxAttempts) {
return { action: "continue" };
}
return { action: "revise", reason: retryInstruction };
}
const reason = result.reason?.trim();
return reason ? { action: "revise", reason } : { action: "continue" };
}

View File

@@ -0,0 +1,509 @@
import fs from "node:fs/promises";
import path from "node:path";
import {
createPluginRegistryFixture,
registerTestPlugin,
} from "openclaw/plugin-sdk/plugin-test-contracts";
import { afterEach, describe, expect, it, vi } from "vitest";
import { loadSessionStore, updateSessionStore } from "../../config/sessions.js";
import { withTempConfig } from "../../gateway/test-temp-config.js";
import { emitAgentEvent, resetAgentEventsForTest } from "../../infra/agent-events.js";
import { resolvePreferredOpenClawTmpDir } from "../../infra/tmp-openclaw-dir.js";
import { PLUGIN_HOST_CLEANUP_TIMEOUT_MS } from "../host-hook-cleanup-timeout.js";
import { runPluginHostCleanup } from "../host-hook-cleanup.js";
import {
clearPluginHostRuntimeState,
getPluginRunContext,
listPluginSessionSchedulerJobs,
PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS,
dispatchPluginAgentEventSubscriptions,
registerPluginSessionSchedulerJob,
setPluginRunContext,
} from "../host-hook-runtime.js";
import { createEmptyPluginRegistry } from "../registry-empty.js";
import { setActivePluginRegistry } from "../runtime.js";
import { createPluginRecord } from "../status.test-helpers.js";
async function waitForPluginEventHandlers(): Promise<void> {
await new Promise<void>((resolve) => {
setTimeout(resolve, 0);
});
}
describe("plugin run context lifecycle", () => {
afterEach(() => {
vi.useRealTimers();
setActivePluginRegistry(createEmptyPluginRegistry());
clearPluginHostRuntimeState();
resetAgentEventsForTest();
});
it("does not let delayed non-terminal subscriptions resurrect closed run context", async () => {
let releaseToolHandler: (() => void) | undefined;
let delayedToolHandlerSawContext: unknown;
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "delayed-subscription",
name: "Delayed Subscription",
}),
register(api) {
api.registerAgentEventSubscription({
id: "delayed",
streams: ["tool"],
async handle(_event, ctx) {
ctx.setRunContext("before-terminal", { visible: true });
await new Promise<void>((resolve) => {
releaseToolHandler = resolve;
});
delayedToolHandlerSawContext = ctx.getRunContext("before-terminal");
ctx.setRunContext("late", { resurrected: true });
},
});
},
});
setActivePluginRegistry(registry.registry);
emitAgentEvent({
runId: "run-delayed-subscription",
stream: "tool",
data: { name: "tool" },
});
await Promise.resolve();
emitAgentEvent({
runId: "run-delayed-subscription",
stream: "lifecycle",
data: { phase: "end" },
});
await Promise.resolve();
expect(
getPluginRunContext({
pluginId: "delayed-subscription",
get: { runId: "run-delayed-subscription", namespace: "before-terminal" },
}),
).toEqual({ visible: true });
releaseToolHandler?.();
await waitForPluginEventHandlers();
expect(delayedToolHandlerSawContext).toEqual({ visible: true });
expect(
getPluginRunContext({
pluginId: "delayed-subscription",
get: { runId: "run-delayed-subscription", namespace: "late" },
}),
).toBeUndefined();
});
it("preserves run context until async terminal event subscriptions settle", async () => {
let releaseTerminalHandler: (() => void) | undefined;
let terminalHandlerSawContext: unknown;
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "async-terminal-subscription",
name: "Async Terminal Subscription",
}),
register(api) {
api.registerAgentEventSubscription({
id: "records",
streams: ["tool", "lifecycle"],
async handle(event, ctx) {
if (event.stream === "tool") {
ctx.setRunContext("seen", { runId: event.runId });
return;
}
if (event.data?.phase !== "end") {
return;
}
await new Promise<void>((resolve) => {
releaseTerminalHandler = resolve;
});
terminalHandlerSawContext = ctx.getRunContext("seen");
},
});
},
});
setActivePluginRegistry(registry.registry);
emitAgentEvent({
runId: "run-async-terminal",
stream: "tool",
data: { name: "tool" },
});
await Promise.resolve();
emitAgentEvent({
runId: "run-async-terminal",
stream: "lifecycle",
data: { phase: "end" },
});
await Promise.resolve();
expect(
getPluginRunContext({
pluginId: "async-terminal-subscription",
get: { runId: "run-async-terminal", namespace: "seen" },
}),
).toEqual({ runId: "run-async-terminal" });
releaseTerminalHandler?.();
await waitForPluginEventHandlers();
expect(terminalHandlerSawContext).toEqual({ runId: "run-async-terminal" });
expect(
getPluginRunContext({
pluginId: "async-terminal-subscription",
get: { runId: "run-async-terminal", namespace: "seen" },
}),
).toBeUndefined();
});
it("keeps run context until slow terminal event subscriptions settle", async () => {
vi.useFakeTimers();
let releaseTerminalHandler: (() => void) | undefined;
let terminalHandlerSawContext: unknown;
let terminalHandlerWroteContext: unknown;
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "slow-terminal-subscription",
name: "Slow Terminal Subscription",
}),
register(api) {
api.registerAgentEventSubscription({
id: "slow",
streams: ["tool", "lifecycle"],
async handle(event, ctx) {
if (event.stream === "tool") {
ctx.setRunContext("seen", { runId: event.runId });
return;
}
if (event.data?.phase === "end") {
await new Promise<void>((resolve) => {
releaseTerminalHandler = resolve;
});
terminalHandlerSawContext = ctx.getRunContext("seen");
ctx.setRunContext("terminal", { completed: true });
terminalHandlerWroteContext = ctx.getRunContext("terminal");
}
},
});
},
});
setActivePluginRegistry(registry.registry);
emitAgentEvent({
runId: "run-slow-terminal",
stream: "tool",
data: { name: "tool" },
});
await Promise.resolve();
emitAgentEvent({
runId: "run-slow-terminal",
stream: "lifecycle",
data: { phase: "end" },
});
await Promise.resolve();
await vi.advanceTimersByTimeAsync(PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS);
expect(
getPluginRunContext({
pluginId: "slow-terminal-subscription",
get: { runId: "run-slow-terminal", namespace: "seen" },
}),
).toEqual({ runId: "run-slow-terminal" });
releaseTerminalHandler?.();
await vi.advanceTimersByTimeAsync(0);
expect(terminalHandlerSawContext).toEqual({ runId: "run-slow-terminal" });
expect(terminalHandlerWroteContext).toEqual({ completed: true });
expect(
getPluginRunContext({
pluginId: "slow-terminal-subscription",
get: { runId: "run-slow-terminal", namespace: "seen" },
}),
).toBeUndefined();
expect(
getPluginRunContext({
pluginId: "slow-terminal-subscription",
get: { runId: "run-slow-terminal", namespace: "terminal" },
}),
).toBeUndefined();
});
it("preserves scheduler jobs instead of invoking stale cleanup callbacks", async () => {
const cleanup = vi.fn();
registerPluginSessionSchedulerJob({
pluginId: "scheduler-plugin",
pluginName: "Scheduler Plugin",
job: {
id: "job-preserved",
sessionKey: "agent:main:main",
kind: "session-turn",
cleanup,
},
});
await expect(
runPluginHostCleanup({
reason: "disable",
pluginId: "scheduler-plugin",
preserveSchedulerJobIds: new Set(["job-preserved"]),
}),
).resolves.toMatchObject({ failures: [] });
expect(cleanup).not.toHaveBeenCalled();
expect(listPluginSessionSchedulerJobs("scheduler-plugin")).toHaveLength(1);
});
it("preserves plugin run context during restart cleanup", async () => {
const registry = createEmptyPluginRegistry();
expect(
setPluginRunContext({
pluginId: "restart-context-plugin",
patch: { runId: "run-restart", namespace: "state", value: { keep: true } },
}),
).toBe(true);
await expect(
runPluginHostCleanup({
registry,
pluginId: "restart-context-plugin",
reason: "restart",
}),
).resolves.toMatchObject({ failures: [] });
expect(
getPluginRunContext({
pluginId: "restart-context-plugin",
get: { runId: "run-restart", namespace: "state" },
}),
).toEqual({ keep: true });
await expect(
runPluginHostCleanup({
registry,
pluginId: "restart-context-plugin",
reason: "disable",
}),
).resolves.toMatchObject({ failures: [] });
expect(
getPluginRunContext({
pluginId: "restart-context-plugin",
get: { runId: "run-restart", namespace: "state" },
}),
).toBeUndefined();
});
it("preserves durable plugin session state during plugin restart cleanup", async () => {
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "restart-state-fixture",
name: "Restart State Fixture",
}),
register(api) {
api.registerSessionExtension({
namespace: "workflow",
description: "restart state test",
});
},
});
const stateDir = await fs.mkdtemp(
path.join(resolvePreferredOpenClawTmpDir(), "openclaw-run-context-restart-state-"),
);
const storePath = path.join(stateDir, "sessions.json");
const tempConfig = {
session: { store: storePath },
};
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
try {
process.env.OPENCLAW_STATE_DIR = stateDir;
await withTempConfig({
cfg: tempConfig,
run: async () => {
await updateSessionStore(storePath, (store) => {
store["agent:main:main"] = {
sessionId: "session-1",
updatedAt: Date.now(),
pluginExtensions: {
"restart-state-fixture": { workflow: { state: "waiting" } },
},
pluginNextTurnInjections: {
"restart-state-fixture": [
{
id: "resume",
pluginId: "restart-state-fixture",
text: "resume",
placement: "prepend_context",
createdAt: 1,
},
],
},
};
return undefined;
});
await expect(
runPluginHostCleanup({
cfg: tempConfig,
registry: registry.registry,
pluginId: "restart-state-fixture",
reason: "restart",
}),
).resolves.toMatchObject({ failures: [] });
const stored = loadSessionStore(storePath, { skipCache: true });
expect(stored["agent:main:main"]?.pluginExtensions).toEqual({
"restart-state-fixture": { workflow: { state: "waiting" } },
});
expect(stored["agent:main:main"]?.pluginNextTurnInjections).toEqual({
"restart-state-fixture": [
{
id: "resume",
pluginId: "restart-state-fixture",
text: "resume",
placement: "prepend_context",
createdAt: 1,
},
],
});
},
});
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
await fs.rm(stateDir, { recursive: true, force: true });
}
});
it("rejects hung cleanup hooks with a bounded timeout", async () => {
vi.useFakeTimers();
const cleanup = vi.fn(async () => {
await new Promise(() => undefined);
});
registerPluginSessionSchedulerJob({
pluginId: "hung-cleanup-plugin",
pluginName: "Hung Cleanup Plugin",
job: {
id: "job-hung",
sessionKey: "agent:main:main",
kind: "session-turn",
cleanup,
},
});
const resultPromise = runPluginHostCleanup({
reason: "disable",
pluginId: "hung-cleanup-plugin",
});
await vi.advanceTimersByTimeAsync(0);
expect(cleanup).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(5_000);
await expect(resultPromise).resolves.toMatchObject({
failures: [
{
pluginId: "hung-cleanup-plugin",
hookId: "scheduler:job-hung",
},
],
});
});
it("bounds session, runtime, and scheduler cleanup callbacks so cleanup keeps moving", async () => {
vi.useFakeTimers();
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "hanging-cleanup-fixture",
name: "Hanging Cleanup Fixture",
}),
register(api) {
api.registerSessionExtension({
namespace: "state",
description: "hangs during cleanup",
cleanup: () => new Promise(() => undefined),
});
api.registerRuntimeLifecycle({
id: "runtime-cleanup",
cleanup: () => new Promise(() => undefined),
});
api.registerSessionSchedulerJob({
id: "scheduler-cleanup",
sessionKey: "agent:main:main",
kind: "monitor",
cleanup: () => new Promise(() => undefined),
});
},
});
const cleanupPromise = runPluginHostCleanup({
cfg: config,
registry: registry.registry,
pluginId: "hanging-cleanup-fixture",
reason: "delete",
});
for (let index = 0; index < 3; index += 1) {
await vi.advanceTimersByTimeAsync(PLUGIN_HOST_CLEANUP_TIMEOUT_MS + 1);
}
await expect(cleanupPromise).resolves.toMatchObject({
failures: [
expect.objectContaining({
pluginId: "hanging-cleanup-fixture",
hookId: "session:state",
}),
expect.objectContaining({
pluginId: "hanging-cleanup-fixture",
hookId: "runtime:runtime-cleanup",
}),
expect.objectContaining({
pluginId: "hanging-cleanup-fixture",
hookId: "scheduler:scheduler-cleanup",
}),
],
});
});
it("blocks setting run context after a run is closed", () => {
expect(
setPluginRunContext({
pluginId: "closed-run-plugin",
patch: { runId: "run-closed", namespace: "state", value: { before: true } },
}),
).toBe(true);
dispatchPluginAgentEventSubscriptions({
registry: createEmptyPluginRegistry(),
event: {
runId: "run-closed",
seq: 1,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "end" },
},
});
expect(
setPluginRunContext({
pluginId: "closed-run-plugin",
patch: { runId: "run-closed", namespace: "state", value: { after: true } },
}),
).toBe(false);
});
});

View File

@@ -299,6 +299,11 @@ export type PluginHookBeforeAgentFinalizeResult = {
*/
action?: "continue" | "revise" | "finalize";
reason?: string;
retry?: {
instruction: string;
idempotencyKey?: string;
maxAttempts?: number;
};
};
export type PluginHookBeforeCompactionEvent = {

View File

@@ -60,6 +60,43 @@ describe("before_agent_finalize hook runner", () => {
});
});
it("skips empty retry instructions when merging revise decisions", async () => {
const runner = createHookRunner(
createMockPluginRegistry([
{
hookName: "before_agent_finalize",
handler: vi.fn().mockResolvedValue({
action: "revise",
reason: "needs a retry but forgot the instruction",
retry: { instruction: " ", idempotencyKey: "empty-retry" },
}),
},
{
hookName: "before_agent_finalize",
handler: vi.fn().mockResolvedValue({
action: "revise",
reason: "rerun the focused tests",
retry: {
instruction: " rerun the focused tests ",
idempotencyKey: "valid-retry",
maxAttempts: 1,
},
}),
},
]),
);
await expect(runner.runBeforeAgentFinalize(EVENT, TEST_PLUGIN_AGENT_CTX)).resolves.toEqual({
action: "revise",
reason: "needs a retry but forgot the instruction\n\nrerun the focused tests",
retry: {
instruction: "rerun the focused tests",
idempotencyKey: "valid-retry",
maxAttempts: 1,
},
});
});
it("lets finalize override earlier revise decisions", async () => {
const runner = createHookRunner(
createMockPluginRegistry([

View File

@@ -319,6 +319,18 @@ export function createHookRunner(
acc: PluginHookBeforeAgentFinalizeResult | undefined,
next: PluginHookBeforeAgentFinalizeResult,
): PluginHookBeforeAgentFinalizeResult => {
const normalizeRetry = (
retry: PluginHookBeforeAgentFinalizeResult["retry"] | undefined,
): PluginHookBeforeAgentFinalizeResult["retry"] | undefined => {
const instruction = retry?.instruction.trim();
if (!instruction) {
return undefined;
}
return {
...retry,
instruction,
};
};
if (acc?.action === "finalize") {
return acc;
}
@@ -326,19 +338,26 @@ export function createHookRunner(
return { action: "finalize", reason: next.reason };
}
if (acc?.action === "revise" && next.action === "revise") {
const retry = normalizeRetry(acc.retry) ?? normalizeRetry(next.retry);
return {
action: "revise",
reason: concatOptionalTextSegments({
left: acc.reason,
right: next.reason,
}),
...(retry ? { retry } : {}),
};
}
if (acc?.action === "revise") {
return acc;
}
if (next.action === "revise") {
return { action: "revise", reason: next.reason };
const retry = normalizeRetry(next.retry);
return {
action: "revise",
reason: next.reason,
...(retry ? { retry } : {}),
};
}
return next.action === "continue" ? { action: "continue", reason: next.reason } : (acc ?? next);
};

View File

@@ -0,0 +1,36 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
PLUGIN_HOST_CLEANUP_TIMEOUT_MS,
withPluginHostCleanupTimeout,
} from "./host-hook-cleanup-timeout.js";
describe("withPluginHostCleanupTimeout", () => {
afterEach(() => {
vi.restoreAllMocks();
});
it("unrefs cleanup timeout timers so pending cleanup does not keep the process alive", async () => {
const originalSetTimeout = globalThis.setTimeout;
const unref = vi.fn();
vi.spyOn(globalThis, "setTimeout").mockImplementation(((
callback: () => void,
timeout?: number,
) => {
const timer = originalSetTimeout(callback, timeout);
vi.spyOn(timer, "unref").mockImplementation(() => {
unref();
return timer;
});
return timer;
}) as typeof setTimeout);
await expect(withPluginHostCleanupTimeout("fast-cleanup", () => "ok")).resolves.toBe("ok");
expect(globalThis.setTimeout).toHaveBeenCalledWith(
expect.any(Function),
PLUGIN_HOST_CLEANUP_TIMEOUT_MS,
);
expect(unref).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,30 @@
export const PLUGIN_HOST_CLEANUP_TIMEOUT_MS = 5_000;
export class PluginHostCleanupTimeoutError extends Error {
constructor(hookId: string) {
super(`plugin host cleanup timed out: ${hookId}`);
this.name = "PluginHostCleanupTimeoutError";
}
}
export async function withPluginHostCleanupTimeout<T>(
hookId: string,
cleanup: () => T | Promise<T>,
): Promise<T> {
let timeout: NodeJS.Timeout | undefined;
try {
return await Promise.race([
Promise.resolve().then(cleanup),
new Promise<never>((_, reject) => {
timeout = setTimeout(() => {
reject(new PluginHostCleanupTimeoutError(hookId));
}, PLUGIN_HOST_CLEANUP_TIMEOUT_MS);
timeout.unref?.();
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}

View File

@@ -1,10 +1,16 @@
import fs from "node:fs";
import { getRuntimeConfig } from "../config/config.js";
import { updateSessionStore } from "../config/sessions/store.js";
import { resolveAllAgentSessionStoreTargetsSync } from "../config/sessions/targets.js";
import type { SessionEntry } from "../config/sessions/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { cleanupPluginSessionSchedulerJobs, clearPluginRunContext } from "./host-hook-runtime.js";
import { withPluginHostCleanupTimeout } from "./host-hook-cleanup-timeout.js";
import {
cleanupPluginSessionSchedulerJobs,
clearPluginRunContext,
makePluginSessionSchedulerJobKey,
} from "./host-hook-runtime.js";
import type { PluginHostCleanupReason } from "./host-hooks.js";
import type { PluginRegistry } from "./registry-types.js";
@@ -101,7 +107,7 @@ async function clearPluginOwnedSessionStores(params: {
}
export async function runPluginHostCleanup(params: {
cfg: OpenClawConfig;
cfg?: OpenClawConfig;
registry?: PluginRegistry | null;
pluginId?: string;
reason: PluginHostCleanupReason;
@@ -113,72 +119,99 @@ export async function runPluginHostCleanup(params: {
params.reason === "restart"
? 0
: await clearPluginOwnedSessionStores({
cfg: params.cfg,
cfg: params.cfg ?? getRuntimeConfig(),
pluginId: params.pluginId,
sessionKey: params.sessionKey,
});
const registry = params.registry;
if (!registry) {
return { cleanupCount: persistentCleanupCount, failures: [] };
}
const failures: PluginHostCleanupFailure[] = [];
let cleanupCount = persistentCleanupCount;
for (const registration of registry.sessionExtensions ?? []) {
if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) {
continue;
if (registry) {
for (const registration of registry.sessionExtensions ?? []) {
if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) {
continue;
}
const cleanup = registration.extension.cleanup;
if (!cleanup) {
continue;
}
const hookId = `session:${registration.extension.namespace}`;
try {
await withPluginHostCleanupTimeout(hookId, () =>
cleanup({
reason: params.reason,
sessionKey: params.sessionKey,
}),
);
cleanupCount += 1;
} catch (error) {
failures.push({
pluginId: registration.pluginId,
hookId,
error,
});
}
}
const cleanup = registration.extension.cleanup;
if (!cleanup) {
continue;
for (const registration of registry.runtimeLifecycles ?? []) {
if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) {
continue;
}
const cleanup = registration.lifecycle.cleanup;
if (!cleanup) {
continue;
}
const hookId = `runtime:${registration.lifecycle.id}`;
try {
await withPluginHostCleanupTimeout(hookId, () =>
cleanup({
reason: params.reason,
sessionKey: params.sessionKey,
runId: params.runId,
}),
);
cleanupCount += 1;
} catch (error) {
failures.push({
pluginId: registration.pluginId,
hookId,
error,
});
}
}
try {
await cleanup({
reason: params.reason,
sessionKey: params.sessionKey,
});
cleanupCount += 1;
} catch (error) {
failures.push({
pluginId: registration.pluginId,
hookId: `session:${registration.extension.namespace}`,
error,
});
const schedulerFailures = await cleanupPluginSessionSchedulerJobs({
pluginId: params.pluginId,
reason: params.reason,
sessionKey: params.sessionKey,
records: registry.sessionSchedulerJobs,
preserveJobIds: params.preserveSchedulerJobIds,
});
for (const failure of schedulerFailures) {
failures.push(failure);
}
}
for (const registration of registry.runtimeLifecycles ?? []) {
if (!shouldCleanPlugin(registration.pluginId, params.pluginId)) {
continue;
}
const cleanup = registration.lifecycle.cleanup;
if (!cleanup) {
continue;
}
try {
await cleanup({
reason: params.reason,
sessionKey: params.sessionKey,
runId: params.runId,
});
cleanupCount += 1;
} catch (error) {
failures.push({
pluginId: registration.pluginId,
hookId: `runtime:${registration.lifecycle.id}`,
error,
});
if (params.reason !== "restart") {
const registrySchedulerJobKeys = new Set(
(registry?.sessionSchedulerJobs ?? [])
.filter((record) => !params.pluginId || record.pluginId === params.pluginId)
.map((record) => ({
pluginId: record.pluginId,
jobId: typeof record.job.id === "string" ? record.job.id.trim() : "",
}))
.filter(({ jobId }) => jobId.length > 0)
.map(({ pluginId, jobId }) => makePluginSessionSchedulerJobKey(pluginId, jobId)),
);
const runtimeSchedulerFailures = await cleanupPluginSessionSchedulerJobs({
pluginId: params.pluginId,
reason: params.reason,
sessionKey: params.sessionKey,
preserveJobIds: params.preserveSchedulerJobIds,
excludeJobKeys: registrySchedulerJobKeys,
});
for (const failure of runtimeSchedulerFailures) {
failures.push(failure);
}
}
const schedulerFailures = await cleanupPluginSessionSchedulerJobs({
pluginId: params.pluginId,
reason: params.reason,
sessionKey: params.sessionKey,
records: registry?.sessionSchedulerJobs,
preserveJobIds: params.preserveSchedulerJobIds,
});
for (const failure of schedulerFailures) {
failures.push(failure);
}
if (params.pluginId || params.runId) {
if ((params.pluginId || params.runId) && (params.reason !== "restart" || params.runId)) {
clearPluginRunContext({ pluginId: params.pluginId, runId: params.runId });
}
return { cleanupCount, failures };

View File

@@ -2,6 +2,7 @@ import type { AgentEventPayload } from "../infra/agent-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { withPluginHostCleanupTimeout } from "./host-hook-cleanup-timeout.js";
import {
isPluginJsonValue,
type PluginHostCleanupReason,
@@ -33,6 +34,7 @@ type PluginHostRuntimeState = {
const PLUGIN_HOST_RUNTIME_STATE_KEY = Symbol.for("openclaw.pluginHostRuntimeState");
const CLOSED_RUN_IDS_MAX = 512;
export const PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS = 5_000;
const log = createSubsystemLogger("plugins/host-hooks");
function getPluginHostRuntimeState(): PluginHostRuntimeState {
@@ -83,6 +85,24 @@ function trackAgentEventHandler(runId: string, pending: Promise<void>): void {
});
}
function waitForTerminalEventHandlers(pendingHandlers: Set<Promise<void>>): Promise<void> {
if (pendingHandlers.size === 0) {
return Promise.resolve();
}
let timeout: NodeJS.Timeout | undefined = setTimeout(() => {
log.warn(
`plugin terminal agent event subscriptions still running after ${PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS}ms; preserving run context until they settle`,
);
}, PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS);
timeout.unref?.();
return Promise.allSettled(pendingHandlers).then(() => {
if (timeout) {
clearTimeout(timeout);
timeout = undefined;
}
});
}
function getPluginRunContextNamespaces(params: {
runId: string;
pluginId: string;
@@ -108,13 +128,14 @@ function getPluginRunContextNamespaces(params: {
export function setPluginRunContext(params: {
pluginId: string;
patch: PluginRunContextPatch;
allowClosedRun?: boolean;
}): boolean {
const runId = normalizeOptionalString(params.patch.runId);
const namespace = normalizeNamespace(params.patch.namespace);
if (!runId || !namespace) {
return false;
}
if (isPluginRunClosed(runId)) {
if (!params.allowClosedRun && isPluginRunClosed(runId)) {
return false;
}
// Only an explicit `unset: true` deletes the run-context entry — silently
@@ -230,6 +251,7 @@ export function dispatchPluginAgentEventSubscriptions(params: {
}): void {
const subscriptions = params.registry?.agentEventSubscriptions ?? [];
const pendingHandlers: Promise<void>[] = [];
const isTerminalEvent = isTerminalAgentRunEvent(params.event);
for (const registration of subscriptions) {
const streams = registration.subscription.streams;
if (streams && streams.length > 0 && !streams.includes(params.event.stream)) {
@@ -237,12 +259,17 @@ export function dispatchPluginAgentEventSubscriptions(params: {
}
const pluginId = registration.pluginId;
const runId = params.event.runId;
let handlerActive = true;
const ctx = {
// oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Run-context JSON reads are caller-typed by namespace.
getRunContext: <T extends PluginJsonValue = PluginJsonValue>(namespace: string) =>
getPluginRunContext<T>({ pluginId, get: { runId, namespace } }),
setRunContext: (namespace: string, value: PluginJsonValue) => {
setPluginRunContext({ pluginId, patch: { runId, namespace, value } });
setPluginRunContext({
pluginId,
patch: { runId, namespace, value },
allowClosedRun: isTerminalEvent && handlerActive,
});
},
clearRunContext: (namespace?: string) => {
clearPluginRunContext({ pluginId, runId, namespace });
@@ -251,16 +278,21 @@ export function dispatchPluginAgentEventSubscriptions(params: {
try {
const pending = Promise.resolve(
registration.subscription.handle(structuredClone(params.event), ctx),
).catch((error) => {
logAgentEventSubscriptionFailure({
pluginId,
subscriptionId: registration.subscription.id,
error,
)
.catch((error) => {
logAgentEventSubscriptionFailure({
pluginId,
subscriptionId: registration.subscription.id,
error,
});
})
.finally(() => {
handlerActive = false;
});
});
trackAgentEventHandler(runId, pending);
pendingHandlers.push(pending);
} catch (error) {
handlerActive = false;
logAgentEventSubscriptionFailure({
pluginId,
subscriptionId: registration.subscription.id,
@@ -268,12 +300,12 @@ export function dispatchPluginAgentEventSubscriptions(params: {
});
}
}
if (isTerminalAgentRunEvent(params.event)) {
if (isTerminalEvent) {
markPluginRunClosed(params.event.runId);
const pendingForRun =
getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(params.event.runId) ??
new Set(pendingHandlers);
void Promise.allSettled(pendingForRun).then(() => {
void waitForTerminalEventHandlers(new Set(pendingForRun)).then(() => {
clearPluginRunContext({ runId: params.event.runId });
});
}
@@ -360,6 +392,10 @@ export function getPluginSessionSchedulerJobGeneration(params: {
return record.generation;
}
export function makePluginSessionSchedulerJobKey(pluginId: string, jobId: string): string {
return JSON.stringify([pluginId, jobId]);
}
export async function cleanupPluginSessionSchedulerJobs(params: {
pluginId?: string;
reason: PluginHostCleanupReason;
@@ -371,6 +407,7 @@ export async function cleanupPluginSessionSchedulerJobs(params: {
generation?: number;
}[];
preserveJobIds?: ReadonlySet<string>;
excludeJobKeys?: ReadonlySet<string>;
}): Promise<Array<{ pluginId: string; hookId: string; error: unknown }>> {
const state = getPluginHostRuntimeState();
const failures: Array<{ pluginId: string; hookId: string; error: unknown }> = [];
@@ -406,25 +443,30 @@ export async function cleanupPluginSessionSchedulerJobs(params: {
continue;
}
const preserveJob = params.preserveJobIds?.has(jobId) ?? false;
if (
preserveJob &&
(record.generation === undefined || liveGeneration === record.generation)
) {
if (preserveJob) {
// preserveJobIds means "do not run cleanup at all" — even across
// generation mismatches. The generation-matched deletion below would
// otherwise still call the OLD cleanup callback, which can remove
// external scheduled jobs (e.g. cron.remove) and break the live
// newer-generation registration that took over this jobId.
continue;
}
// A newer generation may already own this id. The old cleanup callback can
// still release plugin-owned resources, while deletion below is generation
// matched so it cannot remove the newer live record.
const hookId = `scheduler:${jobId}`;
try {
await record.job.cleanup?.({
reason: params.reason,
sessionKey,
jobId,
});
await withPluginHostCleanupTimeout(hookId, () =>
record.job.cleanup?.({
reason: params.reason,
sessionKey,
jobId,
}),
);
} catch (error) {
failures.push({
pluginId: record.pluginId,
hookId: `scheduler:${jobId}`,
hookId,
error,
});
continue;
@@ -448,16 +490,25 @@ export async function cleanupPluginSessionSchedulerJobs(params: {
if (params.sessionKey && record.job.sessionKey !== params.sessionKey) {
continue;
}
if (params.excludeJobKeys?.has(makePluginSessionSchedulerJobKey(pluginId, jobId))) {
continue;
}
if (params.preserveJobIds?.has(jobId)) {
continue;
}
const hookId = `scheduler:${jobId}`;
try {
await record.job.cleanup?.({
reason: params.reason,
sessionKey: record.job.sessionKey,
jobId,
});
await withPluginHostCleanupTimeout(hookId, () =>
record.job.cleanup?.({
reason: params.reason,
sessionKey: record.job.sessionKey,
jobId,
}),
);
} catch (error) {
failures.push({
pluginId,
hookId: `scheduler:${jobId}`,
hookId,
error,
});
continue;