mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:50:43 +00:00
fix: clear embedded runs before lifecycle end (#70187)
* fix: clear embedded runs before lifecycle end * fix: guard onBeforeLifecycleTerminal against synchronous throws Wrap the hook invocation in try/catch so a synchronous exception cannot skip emitLifecycleTerminal() after lifecycleTerminalEmitted is already set to true. This preserves the best-effort contract documented in the JSDoc.
This commit is contained in:
@@ -1711,6 +1711,11 @@ export async function runEmbeddedAttempt(
|
||||
onPartialReply: params.onPartialReply,
|
||||
onAssistantMessageStart: params.onAssistantMessageStart,
|
||||
onAgentEvent: params.onAgentEvent,
|
||||
onBeforeLifecycleTerminal: () => {
|
||||
// Clear embedded-run activity before emitting terminal lifecycle events so
|
||||
// post-completion cleanup does not observe a logically finished run as active.
|
||||
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
|
||||
},
|
||||
enforceFinalTag: params.enforceFinalTag,
|
||||
silentExpected: params.silentExpected,
|
||||
config: params.config,
|
||||
|
||||
@@ -11,6 +11,7 @@ function createContext(
|
||||
lastAssistant: unknown,
|
||||
overrides?: {
|
||||
onAgentEvent?: (event: unknown) => void;
|
||||
onBeforeLifecycleTerminal?: () => void | Promise<void>;
|
||||
onBlockReplyFlush?: () => void | Promise<void>;
|
||||
},
|
||||
): EmbeddedPiSubscribeContext {
|
||||
@@ -21,6 +22,7 @@ function createContext(
|
||||
config: {},
|
||||
sessionKey: "agent:main:main",
|
||||
onAgentEvent: overrides?.onAgentEvent,
|
||||
onBeforeLifecycleTerminal: overrides?.onBeforeLifecycleTerminal,
|
||||
onBlockReply,
|
||||
onBlockReplyFlush: overrides?.onBlockReplyFlush,
|
||||
},
|
||||
@@ -380,6 +382,90 @@ describe("handleAgentEnd", () => {
|
||||
await endPromise;
|
||||
});
|
||||
|
||||
it("runs the before-lifecycle callback before the lifecycle end event", async () => {
|
||||
const order: string[] = [];
|
||||
const onAgentEvent = vi.fn(() => {
|
||||
order.push("event");
|
||||
});
|
||||
const onBeforeLifecycleTerminal = vi.fn(() => {
|
||||
order.push("before");
|
||||
});
|
||||
const ctx = createContext(undefined, {
|
||||
onAgentEvent,
|
||||
onBeforeLifecycleTerminal,
|
||||
});
|
||||
|
||||
await handleAgentEnd(ctx);
|
||||
|
||||
expect(order).toEqual(["before", "event"]);
|
||||
expect(onBeforeLifecycleTerminal).toHaveBeenCalledTimes(1);
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
});
|
||||
|
||||
it("runs an async before-lifecycle callback before the lifecycle end event", async () => {
|
||||
const order: string[] = [];
|
||||
const onAgentEvent = vi.fn(() => {
|
||||
order.push("event");
|
||||
});
|
||||
const onBeforeLifecycleTerminal = vi.fn(() =>
|
||||
Promise.resolve().then(() => {
|
||||
order.push("before");
|
||||
}),
|
||||
);
|
||||
const ctx = createContext(undefined, {
|
||||
onAgentEvent,
|
||||
onBeforeLifecycleTerminal,
|
||||
});
|
||||
|
||||
await handleAgentEnd(ctx);
|
||||
|
||||
expect(order).toEqual(["before", "event"]);
|
||||
expect(onBeforeLifecycleTerminal).toHaveBeenCalledTimes(1);
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
});
|
||||
|
||||
it("still emits lifecycle terminal when sync before-lifecycle callback throws", async () => {
|
||||
const onAgentEvent = vi.fn();
|
||||
const onBeforeLifecycleTerminal = vi.fn(() => {
|
||||
throw new Error("hook exploded");
|
||||
});
|
||||
const ctx = createContext(undefined, {
|
||||
onAgentEvent,
|
||||
onBeforeLifecycleTerminal,
|
||||
});
|
||||
|
||||
await handleAgentEnd(ctx);
|
||||
|
||||
expect(onBeforeLifecycleTerminal).toHaveBeenCalledTimes(1);
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
});
|
||||
|
||||
it("still emits lifecycle terminal when async before-lifecycle callback rejects", async () => {
|
||||
const onAgentEvent = vi.fn();
|
||||
const onBeforeLifecycleTerminal = vi.fn(() => Promise.reject(new Error("hook failed")));
|
||||
const ctx = createContext(undefined, {
|
||||
onAgentEvent,
|
||||
onBeforeLifecycleTerminal,
|
||||
});
|
||||
|
||||
await handleAgentEnd(ctx);
|
||||
|
||||
expect(onBeforeLifecycleTerminal).toHaveBeenCalledTimes(1);
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
});
|
||||
|
||||
it("emits lifecycle end after async channel flush completes", async () => {
|
||||
let resolveChannelFlush: (() => void) | undefined;
|
||||
const onAgentEvent = vi.fn();
|
||||
|
||||
@@ -191,11 +191,26 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
|
||||
};
|
||||
|
||||
let lifecycleTerminalEmitted = false;
|
||||
const emitLifecycleTerminalOnce = () => {
|
||||
const emitLifecycleTerminalOnce = (): void | Promise<void> => {
|
||||
if (lifecycleTerminalEmitted) {
|
||||
return;
|
||||
}
|
||||
lifecycleTerminalEmitted = true;
|
||||
let beforeLifecycleTerminal: void | Promise<void> = undefined;
|
||||
try {
|
||||
beforeLifecycleTerminal = ctx.params.onBeforeLifecycleTerminal?.();
|
||||
} catch (err) {
|
||||
ctx.log.debug(`before lifecycle terminal failed: ${String(err)}`);
|
||||
}
|
||||
if (isPromiseLike<void>(beforeLifecycleTerminal)) {
|
||||
return Promise.resolve(beforeLifecycleTerminal)
|
||||
.catch((err) => {
|
||||
ctx.log.debug(`before lifecycle terminal failed: ${String(err)}`);
|
||||
})
|
||||
.then(() => {
|
||||
emitLifecycleTerminal();
|
||||
});
|
||||
}
|
||||
emitLifecycleTerminal();
|
||||
};
|
||||
|
||||
@@ -207,15 +222,28 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
|
||||
: flushPendingMediaAndChannel();
|
||||
|
||||
if (isPromiseLike<void>(flushPendingMediaAndChannelResult)) {
|
||||
return Promise.resolve(flushPendingMediaAndChannelResult).finally(() => {
|
||||
emitLifecycleTerminalOnce();
|
||||
});
|
||||
return Promise.resolve(flushPendingMediaAndChannelResult).then(
|
||||
() => emitLifecycleTerminalOnce(),
|
||||
(error) => {
|
||||
const emitted = emitLifecycleTerminalOnce();
|
||||
if (isPromiseLike<void>(emitted)) {
|
||||
return Promise.resolve(emitted).then(() => {
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
throw error;
|
||||
},
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
emitLifecycleTerminalOnce();
|
||||
const emitted = emitLifecycleTerminalOnce();
|
||||
if (isPromiseLike<void>(emitted)) {
|
||||
return Promise.resolve(emitted).then(() => {
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
emitLifecycleTerminalOnce();
|
||||
return undefined;
|
||||
return emitLifecycleTerminalOnce();
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@ export type SubscribeEmbeddedPiSessionParams = {
|
||||
onPartialReply?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
|
||||
onAssistantMessageStart?: () => void | Promise<void>;
|
||||
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void | Promise<void>;
|
||||
/** Best-effort hook invoked immediately before the terminal lifecycle event is emitted. */
|
||||
onBeforeLifecycleTerminal?: () => void | Promise<void>;
|
||||
enforceFinalTag?: boolean;
|
||||
silentExpected?: boolean;
|
||||
config?: OpenClawConfig;
|
||||
|
||||
Reference in New Issue
Block a user