fix: bound finalize lifecycle cleanup

This commit is contained in:
Eva
2026-05-01 20:02:06 +07:00
committed by Josh Lehman
parent cb097004eb
commit 7d8ffdcfe5
4 changed files with 81 additions and 15 deletions

View File

@@ -187,4 +187,30 @@ describe("agent harness lifecycle hook helpers", () => {
}),
).resolves.toEqual({ action: "revise", reason: "revise from context run" });
});
it("preserves merged revise reasons when retry metadata is present", async () => {
const hookRunner = {
hasHooks: () => true,
runBeforeAgentFinalize: vi.fn().mockResolvedValue({
action: "revise",
reason: "fix generated baseline\n\nrerun the focused tests",
retry: {
instruction: "rerun the focused tests",
idempotencyKey: "merged-reason",
maxAttempts: 1,
},
}),
};
await expect(
runAgentHarnessBeforeAgentFinalizeHook({
event: EVENT,
ctx: { runId: "run-1", sessionKey: "agent:main:session-1" },
hookRunner: hookRunner as never,
}),
).resolves.toEqual({
action: "revise",
reason: "fix generated baseline\n\nrerun the focused tests",
});
});
});

View File

@@ -161,7 +161,12 @@ function normalizeBeforeAgentFinalizeResult(
if (nextCount > maxAttempts) {
return { action: "continue" };
}
return { action: "revise", reason: retryInstruction };
const reason = result.reason?.trim();
const revisedReason =
reason && reason.includes(retryInstruction)
? reason
: [reason, retryInstruction].filter(Boolean).join("\n\n");
return { action: "revise", reason: revisedReason };
}
const reason = result.reason?.trim();
return reason ? { action: "revise", reason } : { action: "continue" };

View File

@@ -165,7 +165,7 @@ describe("plugin run context lifecycle", () => {
).toBeUndefined();
});
it("keeps run context until slow terminal event subscriptions settle", async () => {
it("clears run context after the terminal subscription grace period", async () => {
vi.useFakeTimers();
let releaseTerminalHandler: (() => void) | undefined;
let terminalHandlerSawContext: unknown;
@@ -221,13 +221,13 @@ describe("plugin run context lifecycle", () => {
pluginId: "slow-terminal-subscription",
get: { runId: "run-slow-terminal", namespace: "seen" },
}),
).toEqual({ runId: "run-slow-terminal" });
).toBeUndefined();
releaseTerminalHandler?.();
await vi.advanceTimersByTimeAsync(0);
expect(terminalHandlerSawContext).toEqual({ runId: "run-slow-terminal" });
expect(terminalHandlerWroteContext).toEqual({ completed: true });
expect(terminalHandlerSawContext).toBeUndefined();
expect(terminalHandlerWroteContext).toBeUndefined();
expect(
getPluginRunContext({
pluginId: "slow-terminal-subscription",

View File

@@ -30,6 +30,7 @@ type PluginHostRuntimeState = {
nextSchedulerJobGeneration: number;
pendingAgentEventHandlersByRunId: Map<string, Set<Promise<void>>>;
closedRunIds: Set<string>;
terminalEventCleanupExpiredRunIds: Set<string>;
};
const PLUGIN_HOST_RUNTIME_STATE_KEY = Symbol.for("openclaw.pluginHostRuntimeState");
@@ -44,6 +45,7 @@ function getPluginHostRuntimeState(): PluginHostRuntimeState {
nextSchedulerJobGeneration: 1,
pendingAgentEventHandlersByRunId: new Map(),
closedRunIds: new Set(),
terminalEventCleanupExpiredRunIds: new Set(),
}));
}
@@ -57,6 +59,7 @@ function copyJsonValue(value: PluginJsonValue): PluginJsonValue {
function markPluginRunClosed(runId: string): void {
const state = getPluginHostRuntimeState();
state.terminalEventCleanupExpiredRunIds.delete(runId);
state.closedRunIds.delete(runId);
state.closedRunIds.add(runId);
while (state.closedRunIds.size > CLOSED_RUN_IDS_MAX) {
@@ -72,6 +75,23 @@ function isPluginRunClosed(runId: string): boolean {
return getPluginHostRuntimeState().closedRunIds.has(runId);
}
function markTerminalEventCleanupExpired(runId: string): void {
const state = getPluginHostRuntimeState();
state.terminalEventCleanupExpiredRunIds.delete(runId);
state.terminalEventCleanupExpiredRunIds.add(runId);
while (state.terminalEventCleanupExpiredRunIds.size > CLOSED_RUN_IDS_MAX) {
const oldest = state.terminalEventCleanupExpiredRunIds.values().next().value;
if (oldest === undefined) {
break;
}
state.terminalEventCleanupExpiredRunIds.delete(oldest);
}
}
function isTerminalEventCleanupExpired(runId: string): boolean {
return getPluginHostRuntimeState().terminalEventCleanupExpiredRunIds.has(runId);
}
function trackAgentEventHandler(runId: string, pending: Promise<void>): void {
const state = getPluginHostRuntimeState();
const handlers = state.pendingAgentEventHandlersByRunId.get(runId) ?? new Set();
@@ -85,17 +105,28 @@ function trackAgentEventHandler(runId: string, pending: Promise<void>): void {
});
}
function waitForTerminalEventHandlers(pendingHandlers: Set<Promise<void>>): Promise<void> {
function waitForTerminalEventHandlers(params: {
runId: string;
pendingHandlers: Set<Promise<void>>;
}): Promise<void> {
const { pendingHandlers, runId } = params;
if (pendingHandlers.size === 0) {
return Promise.resolve();
}
let timeout: NodeJS.Timeout | undefined = setTimeout(() => {
log.warn(
`plugin terminal agent event subscriptions still running after ${PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS}ms; preserving run context until they settle`,
);
}, PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS);
timeout.unref?.();
return Promise.allSettled(pendingHandlers).then(() => {
let timeout: NodeJS.Timeout | undefined;
const settled = Promise.allSettled(pendingHandlers).then(() => "settled" as const);
const timedOut = new Promise<"timeout">((resolve) => {
timeout = setTimeout(() => {
markTerminalEventCleanupExpired(runId);
getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.delete(runId);
log.warn(
`plugin terminal agent event subscriptions still running after ${PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS}ms; clearing run context without waiting for them to settle`,
);
resolve("timeout");
}, PLUGIN_TERMINAL_EVENT_CLEANUP_WAIT_MS);
});
timeout?.unref?.();
return Promise.race([settled, timedOut]).then(() => {
if (timeout) {
clearTimeout(timeout);
timeout = undefined;
@@ -268,7 +299,7 @@ export function dispatchPluginAgentEventSubscriptions(params: {
setPluginRunContext({
pluginId,
patch: { runId, namespace, value },
allowClosedRun: isTerminalEvent && handlerActive,
allowClosedRun: isTerminalEvent && handlerActive && !isTerminalEventCleanupExpired(runId),
});
},
clearRunContext: (namespace?: string) => {
@@ -305,7 +336,10 @@ export function dispatchPluginAgentEventSubscriptions(params: {
const pendingForRun =
getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(params.event.runId) ??
new Set(pendingHandlers);
void waitForTerminalEventHandlers(new Set(pendingForRun)).then(() => {
void waitForTerminalEventHandlers({
runId: params.event.runId,
pendingHandlers: new Set(pendingForRun),
}).then(() => {
clearPluginRunContext({ runId: params.event.runId });
});
}
@@ -531,6 +565,7 @@ export function clearPluginHostRuntimeState(params?: { pluginId?: string; runId?
state.schedulerJobsByPlugin.clear();
state.pendingAgentEventHandlersByRunId.clear();
state.closedRunIds.clear();
state.terminalEventCleanupExpiredRunIds.clear();
}
}