From db958463f6587c3d2c58351760a0f3da7869106b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 25 Apr 2026 01:09:11 +0100 Subject: [PATCH] fix(codex): emit app-server final chat events (#71293) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix live webchat finalization for Codex app-server runs by emitting standard assistant and lifecycle completion events on the global agent event bus, instead of relying on a message-less chat.final fallback. Replaces #70815. Closes #71183. Co-authored-by: Lēsa <260982214+lesaai@users.noreply.github.com> --- CHANGELOG.md | 1 + .../src/app-server/event-projector.test.ts | 8 +- .../codex/src/app-server/event-projector.ts | 20 ++++- .../codex/src/app-server/run-attempt.test.ts | 88 ++++++++++++++++++- .../codex/src/app-server/run-attempt.ts | 71 ++++++++++++++- .../chat.directive-tags.test.ts | 4 + src/plugin-sdk/agent-harness-runtime.ts | 1 + 7 files changed, 186 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fa847853d7..1572023b777 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai - Slack/exec approvals: resolve native approval button clicks over the Gateway instead of delivering `/approve ...` as plain agent text, preserving retry buttons if Gateway resolution fails. Fixes #71023. (#71025) Thanks @marusan03. - Slack/files: return non-image `download-file` results as local file paths instead of image payloads, and include Slack file IDs in inbound file placeholders so agents can call `download-file`. Fixes #71212. Thanks @teamrazo. - Browser control: scope standalone loopback auth to the resolved active gateway credential and fail closed when password mode lacks a resolved password, so inactive tokens or passwords no longer authorize browser routes. Fixes #65626. (#65639) Thanks @coygeek. +- Control UI/Codex harness: emit native Codex app-server assistant and lifecycle completion events so live webchat runs stop spinning without needing a transcript reload fallback. (#70815) Thanks @lesaai. - Discord/replies: run `message_sending` plugin hooks for Discord reply delivery, including DM targets, so plugins can transform or cancel outbound Discord replies consistently with other channels. Fixes #59350. (#71094) Thanks @wei840222. - Control UI/commands: carry provider-owned thinking option ids/labels in session rows and defaults so fresh sessions show and accept dynamic modes such as `adaptive`, `xhigh`, and `max`. Fixes #71269. Thanks @Young-Khalil. - Image generation: make explicit `model=` overrides exact-only so failed `openai/gpt-image-2` requests no longer fall through to Gemini or other configured providers, and update `image_generate list` to mention OpenAI Codex OAuth as valid auth for `openai/gpt-image-2`. Fixes #71290 and #71231. Thanks @Young-Khalil and @steipete. diff --git a/extensions/codex/src/app-server/event-projector.test.ts b/extensions/codex/src/app-server/event-projector.test.ts index 89d2e31d9b3..7454c188e3c 100644 --- a/extensions/codex/src/app-server/event-projector.test.ts +++ b/extensions/codex/src/app-server/event-projector.test.ts @@ -3,7 +3,8 @@ import os from "node:os"; import path from "node:path"; import { SessionManager } from "@mariozechner/pi-coding-agent"; import type { EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness"; -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { resetAgentEventsForTest } from "../../../../src/infra/agent-events.js"; import { initializeGlobalHookRunner, resetGlobalHookRunner, @@ -78,7 +79,12 @@ async function createProjectorWithAssistantHooks() { return { onAssistantMessageStart, onPartialReply, projector }; } +beforeEach(() => { + resetAgentEventsForTest(); +}); + afterEach(async () => { + resetAgentEventsForTest(); resetGlobalHookRunner(); vi.restoreAllMocks(); for (const tempDir of tempDirs) { diff --git a/extensions/codex/src/app-server/event-projector.ts b/extensions/codex/src/app-server/event-projector.ts index 8fee9b1932b..2cd8ce36afa 100644 --- a/extensions/codex/src/app-server/event-projector.ts +++ b/extensions/codex/src/app-server/event-projector.ts @@ -2,6 +2,8 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { AssistantMessage, Usage } from "@mariozechner/pi-ai"; import { SessionManager } from "@mariozechner/pi-coding-agent"; import { + embeddedAgentLog, + emitAgentEvent as emitGlobalAgentEvent, formatErrorMessage, formatToolProgressOutput, inferToolMetaFromArgs, @@ -724,9 +726,23 @@ export class CodexAppServerEventProjector { event: Parameters>[0], ): void { try { - this.params.onAgentEvent?.(event); - } catch { + emitGlobalAgentEvent({ + runId: this.params.runId, + stream: event.stream, + data: event.data, + ...(this.params.sessionKey ? { sessionKey: this.params.sessionKey } : {}), + }); + } catch (error) { + embeddedAgentLog.debug("codex app-server global agent event emit failed", { error }); + } + try { + const maybePromise = this.params.onAgentEvent?.(event); + void Promise.resolve(maybePromise).catch((error: unknown) => { + embeddedAgentLog.debug("codex app-server agent event handler rejected", { error }); + }); + } catch (error) { // Downstream event consumers must not corrupt the canonical Codex turn projection. + embeddedAgentLog.debug("codex app-server agent event handler threw", { error }); } } diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index ea2ebaa1398..6ae3d95f80b 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -9,6 +9,11 @@ import { } from "openclaw/plugin-sdk/agent-harness"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { __testing as nativeHookRelayTesting } from "../../../../src/agents/harness/native-hook-relay.js"; +import { + onAgentEvent, + resetAgentEventsForTest, + type AgentEventPayload, +} from "../../../../src/infra/agent-events.js"; import { initializeGlobalHookRunner, resetGlobalHookRunner, @@ -276,12 +281,14 @@ function extractRelayIdFromThreadRequest(params: unknown): string { describe("runCodexAppServerAttempt", () => { beforeEach(async () => { + resetAgentEventsForTest(); tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-run-")); }); afterEach(async () => { __testing.resetCodexAppServerClientFactoryForTests(); nativeHookRelayTesting.clearNativeHookRelaysForTests(); + resetAgentEventsForTest(); resetGlobalHookRunner(); vi.restoreAllMocks(); await fs.rm(tempDir, { recursive: true, force: true }); @@ -341,6 +348,9 @@ describe("runCodexAppServerAttempt", () => { const llmInput = vi.fn(); const llmOutput = vi.fn(); const agentEnd = vi.fn(); + const onRunAgentEvent = vi.fn(); + const globalAgentEvents: AgentEventPayload[] = []; + onAgentEvent((event) => globalAgentEvents.push(event)); initializeGlobalHookRunner( createMockPluginRegistry([ { hookName: "llm_input", handler: llmInput }, @@ -354,7 +364,9 @@ describe("runCodexAppServerAttempt", () => { sessionManager.appendMessage(assistantMessage("existing context", Date.now())); const harness = createStartedThreadHarness(); - const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir)); + const params = createParams(sessionFile, workspaceDir); + params.onAgentEvent = onRunAgentEvent; + const run = runCodexAppServerAttempt(params); await harness.waitForMethod("turn/start"); await vi.waitFor(() => expect(llmInput).toHaveBeenCalledTimes(1), { interval: 1 }); @@ -391,6 +403,56 @@ describe("runCodexAppServerAttempt", () => { expect(result.assistantTexts).toEqual(["hello back"]); await vi.waitFor(() => expect(llmOutput).toHaveBeenCalledTimes(1), { interval: 1 }); await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 }); + const agentEvents = onRunAgentEvent.mock.calls.map(([event]) => event); + expect(agentEvents).toEqual( + expect.arrayContaining([ + { + stream: "lifecycle", + data: expect.objectContaining({ + phase: "start", + startedAt: expect.any(Number), + }), + }, + { + stream: "assistant", + data: { text: "hello back" }, + }, + { + stream: "lifecycle", + data: expect.objectContaining({ + phase: "end", + startedAt: expect.any(Number), + endedAt: expect.any(Number), + }), + }, + ]), + ); + const startIndex = agentEvents.findIndex( + (event) => event.stream === "lifecycle" && event.data.phase === "start", + ); + const assistantIndex = agentEvents.findIndex((event) => event.stream === "assistant"); + const endIndex = agentEvents.findIndex( + (event) => event.stream === "lifecycle" && event.data.phase === "end", + ); + expect(startIndex).toBeGreaterThanOrEqual(0); + expect(assistantIndex).toBeGreaterThan(startIndex); + expect(endIndex).toBeGreaterThan(assistantIndex); + expect(globalAgentEvents).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + runId: "run-1", + sessionKey: "agent:main:session-1", + stream: "assistant", + data: { text: "hello back" }, + }), + expect.objectContaining({ + runId: "run-1", + sessionKey: "agent:main:session-1", + stream: "lifecycle", + data: expect.objectContaining({ phase: "end" }), + }), + ]), + ); expect(llmOutput).toHaveBeenCalledWith( expect.objectContaining({ @@ -530,6 +592,7 @@ describe("runCodexAppServerAttempt", () => { it("fires agent_end with failure metadata when the codex turn fails", async () => { const agentEnd = vi.fn(); + const onRunAgentEvent = vi.fn(); initializeGlobalHookRunner( createMockPluginRegistry([{ hookName: "agent_end", handler: agentEnd }]), ); @@ -537,7 +600,9 @@ describe("runCodexAppServerAttempt", () => { const workspaceDir = path.join(tempDir, "workspace"); const harness = createStartedThreadHarness(); - const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir)); + const params = createParams(sessionFile, workspaceDir); + params.onAgentEvent = onRunAgentEvent; + const run = runCodexAppServerAttempt(params); await harness.waitForMethod("turn/start"); await harness.notify({ method: "turn/completed", @@ -556,6 +621,25 @@ describe("runCodexAppServerAttempt", () => { expect(result.promptError).toBe("codex exploded"); await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 }); + const agentEvents = onRunAgentEvent.mock.calls.map(([event]) => event); + expect(agentEvents).toEqual( + expect.arrayContaining([ + { + stream: "lifecycle", + data: expect.objectContaining({ phase: "start", startedAt: expect.any(Number) }), + }, + { + stream: "lifecycle", + data: expect.objectContaining({ + phase: "error", + startedAt: expect.any(Number), + endedAt: expect.any(Number), + error: "codex exploded", + }), + }, + ]), + ); + expect(agentEvents.some((event) => event.stream === "assistant")).toBe(false); expect(agentEnd).toHaveBeenCalledWith( expect.objectContaining({ success: false, diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 03116521112..b3ce8139fc4 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -9,6 +9,7 @@ import { buildEmbeddedAttemptToolRunContext, clearActiveEmbeddedRun, embeddedAgentLog, + emitAgentEvent as emitGlobalAgentEvent, finalizeHarnessContextEngineTurn, formatErrorMessage, isActiveHarnessContextEngine, @@ -90,13 +91,31 @@ function emitCodexAppServerEvent( event: Parameters>[0], ): void { try { - params.onAgentEvent?.(event); - } catch { + emitGlobalAgentEvent({ + runId: params.runId, + stream: event.stream, + data: event.data, + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + }); + } catch (error) { + embeddedAgentLog.debug("codex app-server global agent event emit failed", { error }); + } + try { + const maybePromise = params.onAgentEvent?.(event); + void Promise.resolve(maybePromise).catch((error: unknown) => { + embeddedAgentLog.debug("codex app-server agent event handler rejected", { error }); + }); + } catch (error) { // Event consumers are observational; they must not abort or strand the // canonical app-server turn lifecycle. + embeddedAgentLog.debug("codex app-server agent event handler threw", { error }); } } +function collectTerminalAssistantText(result: EmbeddedRunAttemptResult): string { + return result.assistantTexts.join("\n\n").trim(); +} + export async function runCodexAppServerAttempt( params: EmbeddedRunAttemptParams, options: { @@ -335,12 +354,37 @@ export async function runCodexAppServerAttempt( let userInputBridge: ReturnType | undefined; let completed = false; let timedOut = false; + let lifecycleStarted = false; + let lifecycleTerminalEmitted = false; let resolveCompletion: (() => void) | undefined; const completion = new Promise((resolve) => { resolveCompletion = resolve; }); let notificationQueue: Promise = Promise.resolve(); + const emitLifecycleStart = () => { + emitCodexAppServerEvent(params, { + stream: "lifecycle", + data: { phase: "start", startedAt: attemptStartedAt }, + }); + lifecycleStarted = true; + }; + + const emitLifecycleTerminal = (data: Record & { phase: "end" | "error" }) => { + if (!lifecycleStarted || lifecycleTerminalEmitted) { + return; + } + emitCodexAppServerEvent(params, { + stream: "lifecycle", + data: { + startedAt: attemptStartedAt, + endedAt: Date.now(), + ...data, + }, + }); + lifecycleTerminalEmitted = true; + }; + const handleNotification = async (notification: CodexServerNotification) => { userInputBridge?.handleNotification(notification); if (!projector || !turnId) { @@ -536,6 +580,7 @@ export async function runCodexAppServerAttempt( imagesCount: params.images?.length ?? 0, }); projector = new CodexAppServerEventProjector(params, thread.threadId, activeTurnId); + emitLifecycleStart(); const activeProjector = projector; for (const notification of pendingNotifications.splice(0)) { await enqueueNotification(notification); @@ -622,6 +667,24 @@ export async function runCodexAppServerAttempt( threadId: thread.threadId, turnId: activeTurnId, }); + const terminalAssistantText = collectTerminalAssistantText(result); + if (terminalAssistantText && !finalAborted && !finalPromptError) { + emitCodexAppServerEvent(params, { + stream: "assistant", + data: { text: terminalAssistantText }, + }); + } + if (finalPromptError) { + emitLifecycleTerminal({ + phase: "error", + error: formatErrorMessage(finalPromptError), + }); + } else { + emitLifecycleTerminal({ + phase: "end", + ...(finalAborted ? { aborted: true } : {}), + }); + } if (activeContextEngine) { const finalMessages = readMirroredSessionHistoryMessages(params.sessionFile) ?? @@ -684,6 +747,10 @@ export async function runCodexAppServerAttempt( promptErrorSource: finalPromptErrorSource, }; } finally { + emitLifecycleTerminal({ + phase: "error", + error: "codex app-server run completed without lifecycle terminal event", + }); if (trajectoryRecorder && !trajectoryEndRecorded) { trajectoryRecorder.recordEvent("session.ended", { status: timedOut || runAbortController.signal.aborted ? "interrupted" : "cleanup", diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index b6af0120a4d..7fd3f213c23 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -1676,6 +1676,10 @@ describe("chat directive tag stripping for non-streaming final payloads", () => timestamp: expect.any(Number), }, }); + const finalBroadcast = ( + context.broadcast as unknown as ReturnType + ).mock.calls.find((call) => call[0] === "chat" && call[1]?.state === "final")?.[1]; + expect(finalBroadcast).toBeUndefined(); }); it("adds persisted media paths to the user transcript update", async () => { diff --git a/src/plugin-sdk/agent-harness-runtime.ts b/src/plugin-sdk/agent-harness-runtime.ts index e90c78c4c25..e39ca9f6a13 100644 --- a/src/plugin-sdk/agent-harness-runtime.ts +++ b/src/plugin-sdk/agent-harness-runtime.ts @@ -55,6 +55,7 @@ export type { export { VERSION as OPENCLAW_VERSION } from "../version.js"; export { formatErrorMessage } from "../infra/errors.js"; +export { emitAgentEvent } from "../infra/agent-events.js"; export { log as embeddedAgentLog } from "../agents/pi-embedded-runner/logger.js"; export { resolveEmbeddedAgentRuntime } from "../agents/pi-embedded-runner/runtime.js"; export { resolveUserPath } from "../utils.js";