diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 242068c3765..f713f38c312 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1711,6 +1711,11 @@ export async function runEmbeddedAttempt( onPartialReply: params.onPartialReply, onAssistantMessageStart: params.onAssistantMessageStart, onAgentEvent: params.onAgentEvent, + onBeforeLifecycleTerminal: () => { + // Clear embedded-run activity before emitting terminal lifecycle events so + // post-completion cleanup does not observe a logically finished run as active. + clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); + }, enforceFinalTag: params.enforceFinalTag, silentExpected: params.silentExpected, config: params.config, diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts index 96ec1d0c09b..75a2ee1e06d 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts @@ -11,6 +11,7 @@ function createContext( lastAssistant: unknown, overrides?: { onAgentEvent?: (event: unknown) => void; + onBeforeLifecycleTerminal?: () => void | Promise; onBlockReplyFlush?: () => void | Promise; }, ): EmbeddedPiSubscribeContext { @@ -21,6 +22,7 @@ function createContext( config: {}, sessionKey: "agent:main:main", onAgentEvent: overrides?.onAgentEvent, + onBeforeLifecycleTerminal: overrides?.onBeforeLifecycleTerminal, onBlockReply, onBlockReplyFlush: overrides?.onBlockReplyFlush, }, @@ -380,6 +382,90 @@ describe("handleAgentEnd", () => { await endPromise; }); + it("runs the before-lifecycle callback before the lifecycle end event", async () => { + const order: string[] = []; + const onAgentEvent = vi.fn(() => { + order.push("event"); + }); + const onBeforeLifecycleTerminal = vi.fn(() => { + order.push("before"); + }); + const ctx = createContext(undefined, { + onAgentEvent, + onBeforeLifecycleTerminal, + }); + + await handleAgentEnd(ctx); + + expect(order).toEqual(["before", "event"]); + expect(onBeforeLifecycleTerminal).toHaveBeenCalledTimes(1); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "lifecycle", + data: { phase: "end" }, + }); + }); + + it("runs an async before-lifecycle callback before the lifecycle end event", async () => { + const order: string[] = []; + const onAgentEvent = vi.fn(() => { + order.push("event"); + }); + const onBeforeLifecycleTerminal = vi.fn(() => + Promise.resolve().then(() => { + order.push("before"); + }), + ); + const ctx = createContext(undefined, { + onAgentEvent, + onBeforeLifecycleTerminal, + }); + + await handleAgentEnd(ctx); + + expect(order).toEqual(["before", "event"]); + expect(onBeforeLifecycleTerminal).toHaveBeenCalledTimes(1); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "lifecycle", + data: { phase: "end" }, + }); + }); + + it("still emits lifecycle terminal when sync before-lifecycle callback throws", async () => { + const onAgentEvent = vi.fn(); + const onBeforeLifecycleTerminal = vi.fn(() => { + throw new Error("hook exploded"); + }); + const ctx = createContext(undefined, { + onAgentEvent, + onBeforeLifecycleTerminal, + }); + + await handleAgentEnd(ctx); + + expect(onBeforeLifecycleTerminal).toHaveBeenCalledTimes(1); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "lifecycle", + data: { phase: "end" }, + }); + }); + + it("still emits lifecycle terminal when async before-lifecycle callback rejects", async () => { + const onAgentEvent = vi.fn(); + const onBeforeLifecycleTerminal = vi.fn(() => Promise.reject(new Error("hook failed"))); + const ctx = createContext(undefined, { + onAgentEvent, + onBeforeLifecycleTerminal, + }); + + await handleAgentEnd(ctx); + + expect(onBeforeLifecycleTerminal).toHaveBeenCalledTimes(1); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "lifecycle", + data: { phase: "end" }, + }); + }); + it("emits lifecycle end after async channel flush completes", async () => { let resolveChannelFlush: (() => void) | undefined; const onAgentEvent = vi.fn(); diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index d9cf3173753..b81e42ea8d1 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -191,11 +191,26 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< }; let lifecycleTerminalEmitted = false; - const emitLifecycleTerminalOnce = () => { + const emitLifecycleTerminalOnce = (): void | Promise => { if (lifecycleTerminalEmitted) { return; } lifecycleTerminalEmitted = true; + let beforeLifecycleTerminal: void | Promise = undefined; + try { + beforeLifecycleTerminal = ctx.params.onBeforeLifecycleTerminal?.(); + } catch (err) { + ctx.log.debug(`before lifecycle terminal failed: ${String(err)}`); + } + if (isPromiseLike(beforeLifecycleTerminal)) { + return Promise.resolve(beforeLifecycleTerminal) + .catch((err) => { + ctx.log.debug(`before lifecycle terminal failed: ${String(err)}`); + }) + .then(() => { + emitLifecycleTerminal(); + }); + } emitLifecycleTerminal(); }; @@ -207,15 +222,28 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< : flushPendingMediaAndChannel(); if (isPromiseLike(flushPendingMediaAndChannelResult)) { - return Promise.resolve(flushPendingMediaAndChannelResult).finally(() => { - emitLifecycleTerminalOnce(); - }); + return Promise.resolve(flushPendingMediaAndChannelResult).then( + () => emitLifecycleTerminalOnce(), + (error) => { + const emitted = emitLifecycleTerminalOnce(); + if (isPromiseLike(emitted)) { + return Promise.resolve(emitted).then(() => { + throw error; + }); + } + throw error; + }, + ); } } catch (error) { - emitLifecycleTerminalOnce(); + const emitted = emitLifecycleTerminalOnce(); + if (isPromiseLike(emitted)) { + return Promise.resolve(emitted).then(() => { + throw error; + }); + } throw error; } - emitLifecycleTerminalOnce(); - return undefined; + return emitLifecycleTerminalOnce(); } diff --git a/src/agents/pi-embedded-subscribe.types.ts b/src/agents/pi-embedded-subscribe.types.ts index 2f49a377592..44064e0be5c 100644 --- a/src/agents/pi-embedded-subscribe.types.ts +++ b/src/agents/pi-embedded-subscribe.types.ts @@ -31,6 +31,8 @@ export type SubscribeEmbeddedPiSessionParams = { onPartialReply?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; onAssistantMessageStart?: () => void | Promise; onAgentEvent?: (evt: { stream: string; data: Record }) => void | Promise; + /** Best-effort hook invoked immediately before the terminal lifecycle event is emitted. */ + onBeforeLifecycleTerminal?: () => void | Promise; enforceFinalTag?: boolean; silentExpected?: boolean; config?: OpenClawConfig;