mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:50:44 +00:00
fix: wait for live terminal handlers
This commit is contained in:
@@ -315,6 +315,94 @@ describe("plugin run context lifecycle", () => {
|
||||
).toBeUndefined();
|
||||
});
|
||||
|
||||
it("waits for terminal handlers added after the first terminal cleanup waiter starts", async () => {
|
||||
let releaseFirstTerminalHandler: (() => void) | undefined;
|
||||
let releaseSecondTerminalHandler: (() => void) | undefined;
|
||||
let firstTerminalHandlerSawContext: unknown;
|
||||
let secondTerminalHandlerSawContext: unknown;
|
||||
let terminalEventsSeen = 0;
|
||||
const { config, registry } = createPluginRegistryFixture();
|
||||
registerTestPlugin({
|
||||
registry,
|
||||
config,
|
||||
record: createPluginRecord({
|
||||
id: "repeated-terminal-live-wait",
|
||||
name: "Repeated Terminal Live Wait",
|
||||
}),
|
||||
register(api) {
|
||||
api.registerAgentEventSubscription({
|
||||
id: "records",
|
||||
streams: ["tool", "lifecycle"],
|
||||
async handle(event, ctx) {
|
||||
if (event.stream === "tool") {
|
||||
ctx.setRunContext("seen", { runId: event.runId });
|
||||
return;
|
||||
}
|
||||
if (event.data?.phase !== "end") {
|
||||
return;
|
||||
}
|
||||
terminalEventsSeen += 1;
|
||||
if (terminalEventsSeen === 1) {
|
||||
await new Promise<void>((resolve) => {
|
||||
releaseFirstTerminalHandler = resolve;
|
||||
});
|
||||
firstTerminalHandlerSawContext = ctx.getRunContext("seen");
|
||||
return;
|
||||
}
|
||||
await new Promise<void>((resolve) => {
|
||||
releaseSecondTerminalHandler = resolve;
|
||||
});
|
||||
secondTerminalHandlerSawContext = ctx.getRunContext("seen");
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
setActivePluginRegistry(registry.registry);
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-repeated-terminal-live-wait",
|
||||
stream: "tool",
|
||||
data: { name: "tool" },
|
||||
});
|
||||
await waitForPluginEventHandlers();
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-repeated-terminal-live-wait",
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
await waitForPluginEventHandlers();
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-repeated-terminal-live-wait",
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
await waitForPluginEventHandlers();
|
||||
|
||||
releaseFirstTerminalHandler?.();
|
||||
await waitForPluginEventHandlers();
|
||||
expect(firstTerminalHandlerSawContext).toEqual({ runId: "run-repeated-terminal-live-wait" });
|
||||
expect(
|
||||
getPluginRunContext({
|
||||
pluginId: "repeated-terminal-live-wait",
|
||||
get: { runId: "run-repeated-terminal-live-wait", namespace: "seen" },
|
||||
}),
|
||||
).toEqual({ runId: "run-repeated-terminal-live-wait" });
|
||||
|
||||
releaseSecondTerminalHandler?.();
|
||||
await waitForPluginEventHandlers();
|
||||
await waitForPluginEventHandlers();
|
||||
|
||||
expect(secondTerminalHandlerSawContext).toEqual({ runId: "run-repeated-terminal-live-wait" });
|
||||
expect(
|
||||
getPluginRunContext({
|
||||
pluginId: "repeated-terminal-live-wait",
|
||||
get: { runId: "run-repeated-terminal-live-wait", namespace: "seen" },
|
||||
}),
|
||||
).toBeUndefined();
|
||||
});
|
||||
|
||||
it("clears run context after the terminal subscription grace period", async () => {
|
||||
vi.useFakeTimers();
|
||||
let releaseTerminalHandler: (() => void) | undefined;
|
||||
|
||||
@@ -107,16 +107,20 @@ function trackAgentEventHandler(runId: string, pending: Promise<void>): void {
|
||||
});
|
||||
}
|
||||
|
||||
function waitForTerminalEventHandlers(params: {
|
||||
runId: string;
|
||||
pendingHandlers: Set<Promise<void>>;
|
||||
}): Promise<void> {
|
||||
const { pendingHandlers, runId } = params;
|
||||
if (pendingHandlers.size === 0) {
|
||||
return Promise.resolve();
|
||||
async function waitForLiveTerminalEventHandlers(runId: string): Promise<"settled"> {
|
||||
for (;;) {
|
||||
const pendingHandlers = getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(runId);
|
||||
if (!pendingHandlers || pendingHandlers.size === 0) {
|
||||
return "settled";
|
||||
}
|
||||
await Promise.allSettled(pendingHandlers);
|
||||
}
|
||||
}
|
||||
|
||||
function waitForTerminalEventHandlers(params: { runId: string }): Promise<void> {
|
||||
const { runId } = params;
|
||||
let timeout: NodeJS.Timeout | undefined;
|
||||
const settled = Promise.allSettled(pendingHandlers).then(() => "settled" as const);
|
||||
const settled = waitForLiveTerminalEventHandlers(runId);
|
||||
// Promise.race bounds the host wait; JavaScript cannot cancel the plugin
|
||||
// promises themselves, so timeout also marks the run expired to block late
|
||||
// run-context resurrection by handlers that eventually settle.
|
||||
@@ -340,12 +344,8 @@ export function dispatchPluginAgentEventSubscriptions(params: {
|
||||
}
|
||||
if (isTerminalEvent) {
|
||||
markPluginRunClosed(params.event.runId);
|
||||
const pendingForRun =
|
||||
getPluginHostRuntimeState().pendingAgentEventHandlersByRunId.get(params.event.runId) ??
|
||||
new Set(pendingHandlers);
|
||||
void waitForTerminalEventHandlers({
|
||||
runId: params.event.runId,
|
||||
pendingHandlers: new Set(pendingForRun),
|
||||
}).then(() => {
|
||||
clearPluginRunContext({ runId: params.event.runId });
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user