feat(codex): add llm lifecycle hooks (#70312)

* feat(codex): add llm lifecycle hooks

* fix(codex): close llm hook lifecycle gaps

* fix(codex): dedupe llm hook context

* fix(codex): preserve abort and error hook state
This commit is contained in:
Vincent Koc
2026-04-22 16:19:59 -07:00
committed by GitHub
parent a5128777ee
commit 34e45ecfcc
6 changed files with 374 additions and 14 deletions

View File

@@ -114,6 +114,9 @@ function createAppServerHarness(
},
});
},
async notify(notification: CodexServerNotification) {
await notify(notification);
},
};
}
@@ -238,6 +241,209 @@ describe("runCodexAppServerAttempt", () => {
);
});
it("fires llm_input, llm_output, and agent_end hooks for codex turns", async () => {
const llmInput = vi.fn();
const llmOutput = vi.fn();
const agentEnd = vi.fn();
initializeGlobalHookRunner(
createMockPluginRegistry([
{ hookName: "llm_input", handler: llmInput },
{ hookName: "llm_output", handler: llmOutput },
{ hookName: "agent_end", handler: agentEnd },
]),
);
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage(assistantMessage("existing context", Date.now()));
const harness = createStartedThreadHarness();
const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir));
await harness.waitForMethod("turn/start");
await vi.waitFor(() => expect(llmInput).toHaveBeenCalledTimes(1), { interval: 1 });
expect(llmInput).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-1",
sessionId: "session-1",
provider: "codex",
model: "gpt-5.4-codex",
prompt: "hello",
imagesCount: 0,
historyMessages: [expect.objectContaining({ role: "assistant" })],
systemPrompt: expect.stringContaining(CODEX_GPT5_BEHAVIOR_CONTRACT),
}),
expect.objectContaining({
runId: "run-1",
sessionId: "session-1",
sessionKey: "agent:main:session-1",
}),
);
await harness.notify({
method: "item/agentMessage/delta",
params: {
threadId: "thread-1",
turnId: "turn-1",
itemId: "msg-1",
delta: "hello back",
},
});
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
const result = await run;
expect(result.assistantTexts).toEqual(["hello back"]);
await vi.waitFor(() => expect(llmOutput).toHaveBeenCalledTimes(1), { interval: 1 });
await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 });
expect(llmOutput).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-1",
sessionId: "session-1",
provider: "codex",
model: "gpt-5.4-codex",
assistantTexts: ["hello back"],
lastAssistant: expect.objectContaining({
role: "assistant",
}),
}),
expect.objectContaining({
runId: "run-1",
sessionId: "session-1",
}),
);
expect(agentEnd).toHaveBeenCalledWith(
expect.objectContaining({
success: true,
messages: expect.arrayContaining([
expect.objectContaining({ role: "user" }),
expect.objectContaining({ role: "assistant" }),
]),
}),
expect.objectContaining({
runId: "run-1",
sessionId: "session-1",
}),
);
});
it("fires agent_end with failure metadata when the codex turn fails", async () => {
const agentEnd = vi.fn();
initializeGlobalHookRunner(
createMockPluginRegistry([{ hookName: "agent_end", handler: agentEnd }]),
);
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
const harness = createStartedThreadHarness();
const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir));
await harness.waitForMethod("turn/start");
await harness.notify({
method: "turn/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
turn: {
id: "turn-1",
status: "failed",
error: { message: "codex exploded" },
},
},
});
const result = await run;
expect(result.promptError).toBe("codex exploded");
await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 });
expect(agentEnd).toHaveBeenCalledWith(
expect.objectContaining({
success: false,
error: "codex exploded",
}),
expect.objectContaining({
runId: "run-1",
sessionId: "session-1",
}),
);
});
it("fires llm_output and agent_end when turn/start fails", async () => {
const llmInput = vi.fn();
const llmOutput = vi.fn();
const agentEnd = vi.fn();
initializeGlobalHookRunner(
createMockPluginRegistry([
{ hookName: "llm_input", handler: llmInput },
{ hookName: "llm_output", handler: llmOutput },
{ hookName: "agent_end", handler: agentEnd },
]),
);
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
SessionManager.open(sessionFile).appendMessage(
assistantMessage("existing context", Date.now()),
);
createStartedThreadHarness(async (method) => {
if (method === "turn/start") {
throw new Error("turn start exploded");
}
return undefined;
});
await expect(runCodexAppServerAttempt(createParams(sessionFile, workspaceDir))).rejects.toThrow(
"turn start exploded",
);
await vi.waitFor(() => expect(llmInput).toHaveBeenCalledTimes(1), { interval: 1 });
await vi.waitFor(() => expect(llmOutput).toHaveBeenCalledTimes(1), { interval: 1 });
await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 });
expect(llmOutput).toHaveBeenCalledWith(
expect.objectContaining({
assistantTexts: [],
model: "gpt-5.4-codex",
provider: "codex",
runId: "run-1",
sessionId: "session-1",
}),
expect.any(Object),
);
expect(agentEnd).toHaveBeenCalledWith(
expect.objectContaining({
success: false,
error: "turn start exploded",
messages: expect.arrayContaining([
expect.objectContaining({ role: "assistant" }),
expect.objectContaining({ role: "user" }),
]),
}),
expect.any(Object),
);
});
it("fires agent_end with success false when the codex turn is aborted", async () => {
const agentEnd = vi.fn();
initializeGlobalHookRunner(
createMockPluginRegistry([{ hookName: "agent_end", handler: agentEnd }]),
);
const { waitForMethod } = createStartedThreadHarness();
const run = runCodexAppServerAttempt(
createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
);
await waitForMethod("turn/start");
expect(abortAgentHarnessRun("session-1")).toBe(true);
const result = await run;
expect(result.aborted).toBe(true);
await vi.waitFor(() => expect(agentEnd).toHaveBeenCalledTimes(1), { interval: 1 });
expect(agentEnd).toHaveBeenCalledWith(
expect.objectContaining({
success: false,
}),
expect.any(Object),
);
});
it("forwards queued user input and aborts the active app-server turn", async () => {
const { requests, waitForMethod } = createStartedThreadHarness();

View File

@@ -5,6 +5,7 @@ import {
clearActiveEmbeddedRun,
createOpenClawCodingTools,
embeddedAgentLog,
formatErrorMessage,
isSubagentSessionKey,
normalizeProviderToolSchemas,
resolveAttemptSpawnWorkspaceDir,
@@ -14,6 +15,9 @@ import {
resolveSessionAgentIds,
resolveUserPath,
resolveAgentHarnessBeforePromptBuildResult,
runAgentHarnessAgentEndHook,
runAgentHarnessLlmInputHook,
runAgentHarnessLlmOutputHook,
setActiveEmbeddedRun,
supportsModelTools,
type EmbeddedRunAttemptParams,
@@ -51,6 +55,7 @@ export async function runCodexAppServerAttempt(
params: EmbeddedRunAttemptParams,
options: { pluginConfig?: unknown; startupTimeoutFloorMs?: number } = {},
): Promise<EmbeddedRunAttemptResult> {
const attemptStartedAt = Date.now();
const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig: options.pluginConfig });
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
await fs.mkdir(resolvedWorkspace, { recursive: true });
@@ -108,20 +113,21 @@ export async function runCodexAppServerAttempt(
},
});
const historyMessages = readMirroredSessionHistoryMessages(params.sessionFile);
const hookContext = {
runId: params.runId,
agentId: sessionAgentId,
sessionKey: sandboxSessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
trigger: params.trigger,
channelId: params.messageChannel ?? params.messageProvider ?? undefined,
};
const promptBuild = await resolveAgentHarnessBeforePromptBuildResult({
prompt: params.prompt,
developerInstructions: buildDeveloperInstructions(params),
messages: historyMessages,
ctx: {
runId: params.runId,
agentId: sessionAgentId,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
trigger: params.trigger,
channelId: params.messageChannel ?? params.messageProvider ?? undefined,
},
ctx: hookContext,
});
let client: CodexAppServerClient;
let thread: CodexAppServerThreadBinding;
@@ -217,8 +223,30 @@ export async function runCodexAppServerAttempt(
return toolBridge.handleToolCall(call) as Promise<JsonValue>;
});
const llmInputEvent = {
runId: params.runId,
sessionId: params.sessionId,
provider: params.provider,
model: params.modelId,
systemPrompt: promptBuild.developerInstructions,
prompt: promptBuild.prompt,
historyMessages,
imagesCount: params.images?.length ?? 0,
};
const turnStartFailureMessages = [
...historyMessages,
{
role: "user",
content: [{ type: "text", text: promptBuild.prompt }],
},
];
let turn: CodexTurnStartResponse;
try {
runAgentHarnessLlmInputHook({
event: llmInputEvent,
ctx: hookContext,
});
turn = await client.request<CodexTurnStartResponse>(
"turn/start",
buildTurnStartParams(params, {
@@ -230,6 +258,25 @@ export async function runCodexAppServerAttempt(
{ timeoutMs: params.timeoutMs, signal: runAbortController.signal },
);
} catch (error) {
runAgentHarnessLlmOutputHook({
event: {
runId: params.runId,
sessionId: params.sessionId,
provider: params.provider,
model: params.modelId,
assistantTexts: [],
},
ctx: hookContext,
});
runAgentHarnessAgentEndHook({
event: {
messages: turnStartFailureMessages,
success: false,
error: formatErrorMessage(error),
durationMs: Date.now() - attemptStartedAt,
},
ctx: hookContext,
});
notificationCleanup();
requestCleanup();
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
@@ -283,6 +330,9 @@ export async function runCodexAppServerAttempt(
try {
await completion;
const result = activeProjector.buildResult(toolBridge.telemetry, { yieldDetected });
const finalAborted = result.aborted || runAbortController.signal.aborted;
const finalPromptError = timedOut ? "codex app-server attempt timed out" : result.promptError;
const finalPromptErrorSource = timedOut ? "prompt" : result.promptErrorSource;
await mirrorTranscriptBestEffort({
params,
agentId: sessionAgentId,
@@ -291,12 +341,33 @@ export async function runCodexAppServerAttempt(
threadId: thread.threadId,
turnId: activeTurnId,
});
runAgentHarnessLlmOutputHook({
event: {
runId: params.runId,
sessionId: params.sessionId,
provider: params.provider,
model: params.modelId,
assistantTexts: result.assistantTexts,
...(result.lastAssistant ? { lastAssistant: result.lastAssistant } : {}),
...(result.attemptUsage ? { usage: result.attemptUsage } : {}),
},
ctx: hookContext,
});
runAgentHarnessAgentEndHook({
event: {
messages: result.messagesSnapshot,
success: !finalAborted && !finalPromptError,
...(finalPromptError ? { error: formatErrorMessage(finalPromptError) } : {}),
durationMs: Date.now() - attemptStartedAt,
},
ctx: hookContext,
});
return {
...result,
timedOut,
aborted: result.aborted || runAbortController.signal.aborted,
promptError: timedOut ? "codex app-server attempt timed out" : result.promptError,
promptErrorSource: timedOut ? "prompt" : result.promptErrorSource,
aborted: finalAborted,
promptError: finalPromptError,
promptErrorSource: finalPromptErrorSource,
};
} finally {
clearTimeout(timeout);
@@ -512,7 +583,7 @@ function readMirroredSessionHistoryMessages(sessionFile: string): unknown[] {
try {
return SessionManager.open(sessionFile).buildSessionContext().messages;
} catch (error) {
embeddedAgentLog.warn("failed to read mirrored session history for codex prompt hooks", {
embeddedAgentLog.warn("failed to read mirrored session history for codex harness hooks", {
error,
sessionFile,
});