From 34e45ecfcc794bf966aa2eac2bead9a9ed1a209b Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 22 Apr 2026 16:19:59 -0700 Subject: [PATCH] feat(codex): add llm lifecycle hooks (#70312) * feat(codex): add llm lifecycle hooks * fix(codex): close llm hook lifecycle gaps * fix(codex): dedupe llm hook context * fix(codex): preserve abort and error hook state --- CHANGELOG.md | 1 + docs/plugins/codex-harness.md | 4 + .../codex/src/app-server/run-attempt.test.ts | 206 ++++++++++++++++++ .../codex/src/app-server/run-attempt.ts | 99 +++++++-- src/agents/harness/lifecycle-hook-helpers.ts | 73 +++++++ src/plugin-sdk/agent-harness.ts | 5 + 6 files changed, 374 insertions(+), 14 deletions(-) create mode 100644 src/agents/harness/lifecycle-hook-helpers.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ca5e709afe8..ca3fc298290 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai - Tokenjuice: add bundled native OpenClaw support for tokenjuice as an opt-in plugin that compacts noisy `exec` and `bash` tool results in Pi embedded runs. (#69946) Thanks @vincentkoc. - Codex harness/hooks: route native Codex app-server turns through `before_prompt_build` and emit `before_compaction` / `after_compaction` for native compaction items so prompt and compaction hooks stop drifting from Pi. Thanks @vincentkoc. - Codex harness/plugins: add a bundled-plugin Codex app-server extension seam for async `tool_result` middleware, fire `after_tool_call` for Codex tool runs, and route mirrored Codex transcript writes through `before_message_write` so tool integrations stop diverging from Pi. Thanks @vincentkoc. +- Codex harness/hooks: fire `llm_input`, `llm_output`, and `agent_end` for native Codex app-server turns so lifecycle hooks stop drifting from Pi. Thanks @vincentkoc. - Providers/Tencent: add the bundled Tencent Cloud provider plugin with TokenHub and Token Plan onboarding, docs, `hy3-preview` model catalog entries, and tiered Hy3 pricing metadata. 
(#68460) Thanks @JuniperSling. - TUI: add local embedded mode for running terminal chats without a Gateway while keeping plugin approval gates enforced. (#66767) Thanks @fuller-stack-dev. - CLI/Claude: default `claude-cli` runs to warm stdio sessions, including custom configs that omit transport fields, and resume from the stored Claude session after Gateway restarts or idle exits. (#69679) Thanks @obviyus. diff --git a/docs/plugins/codex-harness.md b/docs/plugins/codex-harness.md index ffbce0f9698..04b6113c529 100644 --- a/docs/plugins/codex-harness.md +++ b/docs/plugins/codex-harness.md @@ -20,6 +20,7 @@ approvals, media delivery, and the visible transcript mirror. -Native Codex turns also respect the shared `before_prompt_build`, -`before_compaction`, and `after_compaction` plugin hooks, so prompt shims and -compaction-aware automation can stay aligned with the PI harness. +Native Codex turns also respect the shared `before_prompt_build`, +`before_compaction`, `after_compaction`, `llm_input`, `llm_output`, and +`agent_end` plugin hooks, so prompt shims, compaction-aware automation, and +lifecycle observers can stay aligned with the PI harness. The harness is off by default. 
It is selected only when the `codex` plugin is enabled and the resolved model is a `codex/*` model, or when you explicitly diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index 9c3f993250f..9b5988ff1d3 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -114,6 +114,9 @@ function createAppServerHarness( }, }); }, + async notify(notification: CodexServerNotification) { + await notify(notification); + }, }; } @@ -238,6 +241,209 @@ describe("runCodexAppServerAttempt", () => { ); }); + it("fires llm_input, llm_output, and agent_end hooks for codex turns", async () => { + const llmInput = vi.fn(); + const llmOutput = vi.fn(); + const agentEnd = vi.fn(); + initializeGlobalHookRunner( + createMockPluginRegistry([ + { hookName: "llm_input", handler: llmInput }, + { hookName: "llm_output", handler: llmOutput }, + { hookName: "agent_end", handler: agentEnd }, + ]), + ); + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const sessionManager = SessionManager.open(sessionFile); + sessionManager.appendMessage(assistantMessage("existing context", Date.now())); + const harness = createStartedThreadHarness(); + + const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir)); + await harness.waitForMethod("turn/start"); + await vi.waitFor(() => expect(llmInput).toHaveBeenCalledTimes(1), { interval: 1 }); + + expect(llmInput).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "run-1", + sessionId: "session-1", + provider: "codex", + model: "gpt-5.4-codex", + prompt: "hello", + imagesCount: 0, + historyMessages: [expect.objectContaining({ role: "assistant" })], + systemPrompt: expect.stringContaining(CODEX_GPT5_BEHAVIOR_CONTRACT), + }), + expect.objectContaining({ + runId: "run-1", + sessionId: "session-1", + sessionKey: "agent:main:session-1", + }), + 
); + + await harness.notify({ + method: "item/agentMessage/delta", + params: { + threadId: "thread-1", + turnId: "turn-1", + itemId: "msg-1", + delta: "hello back", + }, + }); + await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" }); + const result = await run; + + expect(result.assistantTexts).toEqual(["hello back"]); + await vi.waitFor(() => expect(llmOutput).toHaveBeenCalledTimes(1), { interval: 1 }); + await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 }); + + expect(llmOutput).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "run-1", + sessionId: "session-1", + provider: "codex", + model: "gpt-5.4-codex", + assistantTexts: ["hello back"], + lastAssistant: expect.objectContaining({ + role: "assistant", + }), + }), + expect.objectContaining({ + runId: "run-1", + sessionId: "session-1", + }), + ); + expect(agentEnd).toHaveBeenCalledWith( + expect.objectContaining({ + success: true, + messages: expect.arrayContaining([ + expect.objectContaining({ role: "user" }), + expect.objectContaining({ role: "assistant" }), + ]), + }), + expect.objectContaining({ + runId: "run-1", + sessionId: "session-1", + }), + ); + }); + + it("fires agent_end with failure metadata when the codex turn fails", async () => { + const agentEnd = vi.fn(); + initializeGlobalHookRunner( + createMockPluginRegistry([{ hookName: "agent_end", handler: agentEnd }]), + ); + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const harness = createStartedThreadHarness(); + + const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir)); + await harness.waitForMethod("turn/start"); + await harness.notify({ + method: "turn/completed", + params: { + threadId: "thread-1", + turnId: "turn-1", + turn: { + id: "turn-1", + status: "failed", + error: { message: "codex exploded" }, + }, + }, + }); + + const result = await run; + + expect(result.promptError).toBe("codex exploded"); 
+ await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 }); + expect(agentEnd).toHaveBeenCalledWith( + expect.objectContaining({ + success: false, + error: "codex exploded", + }), + expect.objectContaining({ + runId: "run-1", + sessionId: "session-1", + }), + ); + }); + + it("fires llm_output and agent_end when turn/start fails", async () => { + const llmInput = vi.fn(); + const llmOutput = vi.fn(); + const agentEnd = vi.fn(); + initializeGlobalHookRunner( + createMockPluginRegistry([ + { hookName: "llm_input", handler: llmInput }, + { hookName: "llm_output", handler: llmOutput }, + { hookName: "agent_end", handler: agentEnd }, + ]), + ); + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + SessionManager.open(sessionFile).appendMessage( + assistantMessage("existing context", Date.now()), + ); + createStartedThreadHarness(async (method) => { + if (method === "turn/start") { + throw new Error("turn start exploded"); + } + return undefined; + }); + + await expect(runCodexAppServerAttempt(createParams(sessionFile, workspaceDir))).rejects.toThrow( + "turn start exploded", + ); + + await vi.waitFor(() => expect(llmInput).toHaveBeenCalledTimes(1), { interval: 1 }); + await vi.waitFor(() => expect(llmOutput).toHaveBeenCalledTimes(1), { interval: 1 }); + await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 }); + expect(llmOutput).toHaveBeenCalledWith( + expect.objectContaining({ + assistantTexts: [], + model: "gpt-5.4-codex", + provider: "codex", + runId: "run-1", + sessionId: "session-1", + }), + expect.any(Object), + ); + expect(agentEnd).toHaveBeenCalledWith( + expect.objectContaining({ + success: false, + error: "turn start exploded", + messages: expect.arrayContaining([ + expect.objectContaining({ role: "assistant" }), + expect.objectContaining({ role: "user" }), + ]), + }), + expect.any(Object), + ); + }); + + it("fires agent_end with success false 
when the codex turn is aborted", async () => { + const agentEnd = vi.fn(); + initializeGlobalHookRunner( + createMockPluginRegistry([{ hookName: "agent_end", handler: agentEnd }]), + ); + const { waitForMethod } = createStartedThreadHarness(); + const run = runCodexAppServerAttempt( + createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")), + ); + + await waitForMethod("turn/start"); + expect(abortAgentHarnessRun("session-1")).toBe(true); + + const result = await run; + expect(result.aborted).toBe(true); + await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 }); + expect(agentEnd).toHaveBeenCalledWith( + expect.objectContaining({ + success: false, + }), + expect.any(Object), + ); + }); + it("forwards queued user input and aborts the active app-server turn", async () => { const { requests, waitForMethod } = createStartedThreadHarness(); diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 32981efd324..83dc3b2cf7a 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -5,6 +5,7 @@ import { clearActiveEmbeddedRun, createOpenClawCodingTools, embeddedAgentLog, + formatErrorMessage, isSubagentSessionKey, normalizeProviderToolSchemas, resolveAttemptSpawnWorkspaceDir, @@ -14,6 +15,9 @@ import { resolveSessionAgentIds, resolveUserPath, resolveAgentHarnessBeforePromptBuildResult, + runAgentHarnessAgentEndHook, + runAgentHarnessLlmInputHook, + runAgentHarnessLlmOutputHook, setActiveEmbeddedRun, supportsModelTools, type EmbeddedRunAttemptParams, @@ -51,6 +55,7 @@ export async function runCodexAppServerAttempt( params: EmbeddedRunAttemptParams, options: { pluginConfig?: unknown; startupTimeoutFloorMs?: number } = {}, ): Promise { + const attemptStartedAt = Date.now(); const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig: options.pluginConfig }); const resolvedWorkspace = 
resolveUserPath(params.workspaceDir); await fs.mkdir(resolvedWorkspace, { recursive: true }); @@ -108,20 +113,21 @@ export async function runCodexAppServerAttempt( }, }); const historyMessages = readMirroredSessionHistoryMessages(params.sessionFile); + const hookContext = { + runId: params.runId, + agentId: sessionAgentId, + sessionKey: sandboxSessionKey, + sessionId: params.sessionId, + workspaceDir: params.workspaceDir, + messageProvider: params.messageProvider ?? undefined, + trigger: params.trigger, + channelId: params.messageChannel ?? params.messageProvider ?? undefined, + }; const promptBuild = await resolveAgentHarnessBeforePromptBuildResult({ prompt: params.prompt, developerInstructions: buildDeveloperInstructions(params), messages: historyMessages, - ctx: { - runId: params.runId, - agentId: sessionAgentId, - sessionKey: params.sessionKey, - sessionId: params.sessionId, - workspaceDir: params.workspaceDir, - messageProvider: params.messageProvider ?? undefined, - trigger: params.trigger, - channelId: params.messageChannel ?? params.messageProvider ?? undefined, - }, + ctx: hookContext, }); let client: CodexAppServerClient; let thread: CodexAppServerThreadBinding; @@ -217,8 +223,30 @@ export async function runCodexAppServerAttempt( return toolBridge.handleToolCall(call) as Promise; }); + const llmInputEvent = { + runId: params.runId, + sessionId: params.sessionId, + provider: params.provider, + model: params.modelId, + systemPrompt: promptBuild.developerInstructions, + prompt: promptBuild.prompt, + historyMessages, + imagesCount: params.images?.length ?? 
0, + }; + const turnStartFailureMessages = [ + ...historyMessages, + { + role: "user", + content: [{ type: "text", text: promptBuild.prompt }], + }, + ]; + let turn: CodexTurnStartResponse; try { + runAgentHarnessLlmInputHook({ + event: llmInputEvent, + ctx: hookContext, + }); turn = await client.request( "turn/start", buildTurnStartParams(params, { @@ -230,6 +258,25 @@ export async function runCodexAppServerAttempt( { timeoutMs: params.timeoutMs, signal: runAbortController.signal }, ); } catch (error) { + runAgentHarnessLlmOutputHook({ + event: { + runId: params.runId, + sessionId: params.sessionId, + provider: params.provider, + model: params.modelId, + assistantTexts: [], + }, + ctx: hookContext, + }); + runAgentHarnessAgentEndHook({ + event: { + messages: turnStartFailureMessages, + success: false, + error: formatErrorMessage(error), + durationMs: Date.now() - attemptStartedAt, + }, + ctx: hookContext, + }); notificationCleanup(); requestCleanup(); params.abortSignal?.removeEventListener("abort", abortFromUpstream); @@ -283,6 +330,9 @@ export async function runCodexAppServerAttempt( try { await completion; const result = activeProjector.buildResult(toolBridge.telemetry, { yieldDetected }); + const finalAborted = result.aborted || runAbortController.signal.aborted; + const finalPromptError = timedOut ? "codex app-server attempt timed out" : result.promptError; + const finalPromptErrorSource = timedOut ? "prompt" : result.promptErrorSource; await mirrorTranscriptBestEffort({ params, agentId: sessionAgentId, @@ -291,12 +341,33 @@ export async function runCodexAppServerAttempt( threadId: thread.threadId, turnId: activeTurnId, }); + runAgentHarnessLlmOutputHook({ + event: { + runId: params.runId, + sessionId: params.sessionId, + provider: params.provider, + model: params.modelId, + assistantTexts: result.assistantTexts, + ...(result.lastAssistant ? { lastAssistant: result.lastAssistant } : {}), + ...(result.attemptUsage ? 
{ usage: result.attemptUsage } : {}), + }, + ctx: hookContext, + }); + runAgentHarnessAgentEndHook({ + event: { + messages: result.messagesSnapshot, + success: !finalAborted && !finalPromptError, + ...(finalPromptError ? { error: formatErrorMessage(finalPromptError) } : {}), + durationMs: Date.now() - attemptStartedAt, + }, + ctx: hookContext, + }); return { ...result, timedOut, - aborted: result.aborted || runAbortController.signal.aborted, - promptError: timedOut ? "codex app-server attempt timed out" : result.promptError, - promptErrorSource: timedOut ? "prompt" : result.promptErrorSource, + aborted: finalAborted, + promptError: finalPromptError, + promptErrorSource: finalPromptErrorSource, }; } finally { clearTimeout(timeout); @@ -512,7 +583,7 @@ function readMirroredSessionHistoryMessages(sessionFile: string): unknown[] { try { return SessionManager.open(sessionFile).buildSessionContext().messages; } catch (error) { - embeddedAgentLog.warn("failed to read mirrored session history for codex prompt hooks", { + embeddedAgentLog.warn("failed to read mirrored session history for codex harness hooks", { error, sessionFile, }); diff --git a/src/agents/harness/lifecycle-hook-helpers.ts b/src/agents/harness/lifecycle-hook-helpers.ts new file mode 100644 index 00000000000..792c29151b7 --- /dev/null +++ b/src/agents/harness/lifecycle-hook-helpers.ts @@ -0,0 +1,73 @@ +import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; +import type { + PluginHookAgentEndEvent, + PluginHookAgentContext, + PluginHookLlmInputEvent, + PluginHookLlmOutputEvent, +} from "../../plugins/hook-types.js"; + +const log = createSubsystemLogger("agents/harness"); + +type AgentHarnessHookContext = { + runId: string; + agentId?: string; + sessionKey?: string; + sessionId?: string; + workspaceDir?: string; + messageProvider?: string; + trigger?: string; + channelId?: string; +}; + +function 
buildAgentHookContext(params: AgentHarnessHookContext): PluginHookAgentContext { + return { + runId: params.runId, + ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + ...(params.sessionId ? { sessionId: params.sessionId } : {}), + ...(params.workspaceDir ? { workspaceDir: params.workspaceDir } : {}), + ...(params.messageProvider ? { messageProvider: params.messageProvider } : {}), + ...(params.trigger ? { trigger: params.trigger } : {}), + ...(params.channelId ? { channelId: params.channelId } : {}), + }; +} + +export function runAgentHarnessLlmInputHook(params: { + event: PluginHookLlmInputEvent; + ctx: AgentHarnessHookContext; +}): void { + const hookRunner = getGlobalHookRunner(); + if (!hookRunner?.hasHooks("llm_input")) { + return; + } + void hookRunner.runLlmInput(params.event, buildAgentHookContext(params.ctx)).catch((error) => { + log.warn(`llm_input hook failed: ${String(error)}`); + }); +} + +export function runAgentHarnessLlmOutputHook(params: { + event: PluginHookLlmOutputEvent; + ctx: AgentHarnessHookContext; +}): void { + const hookRunner = getGlobalHookRunner(); + if (!hookRunner?.hasHooks("llm_output")) { + return; + } + void hookRunner.runLlmOutput(params.event, buildAgentHookContext(params.ctx)).catch((error) => { + log.warn(`llm_output hook failed: ${String(error)}`); + }); +} + +export function runAgentHarnessAgentEndHook(params: { + event: PluginHookAgentEndEvent; + ctx: AgentHarnessHookContext; +}): void { + const hookRunner = getGlobalHookRunner(); + if (!hookRunner?.hasHooks("agent_end")) { + return; + } + void hookRunner.runAgentEnd(params.event, buildAgentHookContext(params.ctx)).catch((error) => { + log.warn(`agent_end hook failed: ${String(error)}`); + }); +} diff --git a/src/plugin-sdk/agent-harness.ts b/src/plugin-sdk/agent-harness.ts index 032b811b6b1..ed732a28d1b 100644 --- a/src/plugin-sdk/agent-harness.ts +++ b/src/plugin-sdk/agent-harness.ts @@ -71,3 +71,8 
@@ export { runAgentHarnessAfterToolCallHook, runAgentHarnessBeforeMessageWriteHook, } from "../agents/harness/hook-helpers.js"; +export { + runAgentHarnessAgentEndHook, + runAgentHarnessLlmInputHook, + runAgentHarnessLlmOutputHook, +} from "../agents/harness/lifecycle-hook-helpers.js";