fix(codex): emit app-server final chat events (#71293)

Fix live webchat finalization for Codex app-server runs by emitting standard assistant and lifecycle completion events on the global agent event bus, instead of relying on a message-less chat.final fallback.

Replaces #70815. Closes #71183.

Co-authored-by: Lēsa <260982214+lesaai@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-04-25 01:09:11 +01:00
committed by GitHub
parent f4add8047b
commit db958463f6
7 changed files with 186 additions and 7 deletions

View File

@@ -3,7 +3,8 @@ import os from "node:os";
import path from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { resetAgentEventsForTest } from "../../../../src/infra/agent-events.js";
import {
initializeGlobalHookRunner,
resetGlobalHookRunner,
@@ -78,7 +79,12 @@ async function createProjectorWithAssistantHooks() {
return { onAssistantMessageStart, onPartialReply, projector };
}
beforeEach(() => {
resetAgentEventsForTest();
});
afterEach(async () => {
resetAgentEventsForTest();
resetGlobalHookRunner();
vi.restoreAllMocks();
for (const tempDir of tempDirs) {

View File

@@ -2,6 +2,8 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage, Usage } from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
embeddedAgentLog,
emitAgentEvent as emitGlobalAgentEvent,
formatErrorMessage,
formatToolProgressOutput,
inferToolMetaFromArgs,
@@ -724,9 +726,23 @@ export class CodexAppServerEventProjector {
event: Parameters<NonNullable<EmbeddedRunAttemptParams["onAgentEvent"]>>[0],
): void {
try {
this.params.onAgentEvent?.(event);
} catch {
emitGlobalAgentEvent({
runId: this.params.runId,
stream: event.stream,
data: event.data,
...(this.params.sessionKey ? { sessionKey: this.params.sessionKey } : {}),
});
} catch (error) {
embeddedAgentLog.debug("codex app-server global agent event emit failed", { error });
}
try {
const maybePromise = this.params.onAgentEvent?.(event);
void Promise.resolve(maybePromise).catch((error: unknown) => {
embeddedAgentLog.debug("codex app-server agent event handler rejected", { error });
});
} catch (error) {
// Downstream event consumers must not corrupt the canonical Codex turn projection.
embeddedAgentLog.debug("codex app-server agent event handler threw", { error });
}
}

View File

@@ -9,6 +9,11 @@ import {
} from "openclaw/plugin-sdk/agent-harness";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { __testing as nativeHookRelayTesting } from "../../../../src/agents/harness/native-hook-relay.js";
import {
onAgentEvent,
resetAgentEventsForTest,
type AgentEventPayload,
} from "../../../../src/infra/agent-events.js";
import {
initializeGlobalHookRunner,
resetGlobalHookRunner,
@@ -276,12 +281,14 @@ function extractRelayIdFromThreadRequest(params: unknown): string {
describe("runCodexAppServerAttempt", () => {
beforeEach(async () => {
resetAgentEventsForTest();
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-run-"));
});
afterEach(async () => {
__testing.resetCodexAppServerClientFactoryForTests();
nativeHookRelayTesting.clearNativeHookRelaysForTests();
resetAgentEventsForTest();
resetGlobalHookRunner();
vi.restoreAllMocks();
await fs.rm(tempDir, { recursive: true, force: true });
@@ -341,6 +348,9 @@ describe("runCodexAppServerAttempt", () => {
const llmInput = vi.fn();
const llmOutput = vi.fn();
const agentEnd = vi.fn();
const onRunAgentEvent = vi.fn();
const globalAgentEvents: AgentEventPayload[] = [];
onAgentEvent((event) => globalAgentEvents.push(event));
initializeGlobalHookRunner(
createMockPluginRegistry([
{ hookName: "llm_input", handler: llmInput },
@@ -354,7 +364,9 @@ describe("runCodexAppServerAttempt", () => {
sessionManager.appendMessage(assistantMessage("existing context", Date.now()));
const harness = createStartedThreadHarness();
const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir));
const params = createParams(sessionFile, workspaceDir);
params.onAgentEvent = onRunAgentEvent;
const run = runCodexAppServerAttempt(params);
await harness.waitForMethod("turn/start");
await vi.waitFor(() => expect(llmInput).toHaveBeenCalledTimes(1), { interval: 1 });
@@ -391,6 +403,56 @@ describe("runCodexAppServerAttempt", () => {
expect(result.assistantTexts).toEqual(["hello back"]);
await vi.waitFor(() => expect(llmOutput).toHaveBeenCalledTimes(1), { interval: 1 });
await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 });
const agentEvents = onRunAgentEvent.mock.calls.map(([event]) => event);
expect(agentEvents).toEqual(
expect.arrayContaining([
{
stream: "lifecycle",
data: expect.objectContaining({
phase: "start",
startedAt: expect.any(Number),
}),
},
{
stream: "assistant",
data: { text: "hello back" },
},
{
stream: "lifecycle",
data: expect.objectContaining({
phase: "end",
startedAt: expect.any(Number),
endedAt: expect.any(Number),
}),
},
]),
);
const startIndex = agentEvents.findIndex(
(event) => event.stream === "lifecycle" && event.data.phase === "start",
);
const assistantIndex = agentEvents.findIndex((event) => event.stream === "assistant");
const endIndex = agentEvents.findIndex(
(event) => event.stream === "lifecycle" && event.data.phase === "end",
);
expect(startIndex).toBeGreaterThanOrEqual(0);
expect(assistantIndex).toBeGreaterThan(startIndex);
expect(endIndex).toBeGreaterThan(assistantIndex);
expect(globalAgentEvents).toEqual(
expect.arrayContaining([
expect.objectContaining({
runId: "run-1",
sessionKey: "agent:main:session-1",
stream: "assistant",
data: { text: "hello back" },
}),
expect.objectContaining({
runId: "run-1",
sessionKey: "agent:main:session-1",
stream: "lifecycle",
data: expect.objectContaining({ phase: "end" }),
}),
]),
);
expect(llmOutput).toHaveBeenCalledWith(
expect.objectContaining({
@@ -530,6 +592,7 @@ describe("runCodexAppServerAttempt", () => {
it("fires agent_end with failure metadata when the codex turn fails", async () => {
const agentEnd = vi.fn();
const onRunAgentEvent = vi.fn();
initializeGlobalHookRunner(
createMockPluginRegistry([{ hookName: "agent_end", handler: agentEnd }]),
);
@@ -537,7 +600,9 @@ describe("runCodexAppServerAttempt", () => {
const workspaceDir = path.join(tempDir, "workspace");
const harness = createStartedThreadHarness();
const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir));
const params = createParams(sessionFile, workspaceDir);
params.onAgentEvent = onRunAgentEvent;
const run = runCodexAppServerAttempt(params);
await harness.waitForMethod("turn/start");
await harness.notify({
method: "turn/completed",
@@ -556,6 +621,25 @@ describe("runCodexAppServerAttempt", () => {
expect(result.promptError).toBe("codex exploded");
await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 });
const agentEvents = onRunAgentEvent.mock.calls.map(([event]) => event);
expect(agentEvents).toEqual(
expect.arrayContaining([
{
stream: "lifecycle",
data: expect.objectContaining({ phase: "start", startedAt: expect.any(Number) }),
},
{
stream: "lifecycle",
data: expect.objectContaining({
phase: "error",
startedAt: expect.any(Number),
endedAt: expect.any(Number),
error: "codex exploded",
}),
},
]),
);
expect(agentEvents.some((event) => event.stream === "assistant")).toBe(false);
expect(agentEnd).toHaveBeenCalledWith(
expect.objectContaining({
success: false,

View File

@@ -9,6 +9,7 @@ import {
buildEmbeddedAttemptToolRunContext,
clearActiveEmbeddedRun,
embeddedAgentLog,
emitAgentEvent as emitGlobalAgentEvent,
finalizeHarnessContextEngineTurn,
formatErrorMessage,
isActiveHarnessContextEngine,
@@ -90,13 +91,31 @@ function emitCodexAppServerEvent(
event: Parameters<NonNullable<EmbeddedRunAttemptParams["onAgentEvent"]>>[0],
): void {
try {
params.onAgentEvent?.(event);
} catch {
emitGlobalAgentEvent({
runId: params.runId,
stream: event.stream,
data: event.data,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
});
} catch (error) {
embeddedAgentLog.debug("codex app-server global agent event emit failed", { error });
}
try {
const maybePromise = params.onAgentEvent?.(event);
void Promise.resolve(maybePromise).catch((error: unknown) => {
embeddedAgentLog.debug("codex app-server agent event handler rejected", { error });
});
} catch (error) {
// Event consumers are observational; they must not abort or strand the
// canonical app-server turn lifecycle.
embeddedAgentLog.debug("codex app-server agent event handler threw", { error });
}
}
function collectTerminalAssistantText(result: EmbeddedRunAttemptResult): string {
return result.assistantTexts.join("\n\n").trim();
}
export async function runCodexAppServerAttempt(
params: EmbeddedRunAttemptParams,
options: {
@@ -335,12 +354,37 @@ export async function runCodexAppServerAttempt(
let userInputBridge: ReturnType<typeof createCodexUserInputBridge> | undefined;
let completed = false;
let timedOut = false;
let lifecycleStarted = false;
let lifecycleTerminalEmitted = false;
let resolveCompletion: (() => void) | undefined;
const completion = new Promise<void>((resolve) => {
resolveCompletion = resolve;
});
let notificationQueue: Promise<void> = Promise.resolve();
const emitLifecycleStart = () => {
emitCodexAppServerEvent(params, {
stream: "lifecycle",
data: { phase: "start", startedAt: attemptStartedAt },
});
lifecycleStarted = true;
};
const emitLifecycleTerminal = (data: Record<string, unknown> & { phase: "end" | "error" }) => {
if (!lifecycleStarted || lifecycleTerminalEmitted) {
return;
}
emitCodexAppServerEvent(params, {
stream: "lifecycle",
data: {
startedAt: attemptStartedAt,
endedAt: Date.now(),
...data,
},
});
lifecycleTerminalEmitted = true;
};
const handleNotification = async (notification: CodexServerNotification) => {
userInputBridge?.handleNotification(notification);
if (!projector || !turnId) {
@@ -536,6 +580,7 @@ export async function runCodexAppServerAttempt(
imagesCount: params.images?.length ?? 0,
});
projector = new CodexAppServerEventProjector(params, thread.threadId, activeTurnId);
emitLifecycleStart();
const activeProjector = projector;
for (const notification of pendingNotifications.splice(0)) {
await enqueueNotification(notification);
@@ -622,6 +667,24 @@ export async function runCodexAppServerAttempt(
threadId: thread.threadId,
turnId: activeTurnId,
});
const terminalAssistantText = collectTerminalAssistantText(result);
if (terminalAssistantText && !finalAborted && !finalPromptError) {
emitCodexAppServerEvent(params, {
stream: "assistant",
data: { text: terminalAssistantText },
});
}
if (finalPromptError) {
emitLifecycleTerminal({
phase: "error",
error: formatErrorMessage(finalPromptError),
});
} else {
emitLifecycleTerminal({
phase: "end",
...(finalAborted ? { aborted: true } : {}),
});
}
if (activeContextEngine) {
const finalMessages =
readMirroredSessionHistoryMessages(params.sessionFile) ??
@@ -684,6 +747,10 @@ export async function runCodexAppServerAttempt(
promptErrorSource: finalPromptErrorSource,
};
} finally {
emitLifecycleTerminal({
phase: "error",
error: "codex app-server run completed without lifecycle terminal event",
});
if (trajectoryRecorder && !trajectoryEndRecorded) {
trajectoryRecorder.recordEvent("session.ended", {
status: timedOut || runAbortController.signal.aborted ? "interrupted" : "cleanup",