feat(codex): add prompt and compaction hooks (#70313)

* feat(codex): add prompt and compaction hooks

* fix(codex): clean prompt and compaction hook tests
This commit is contained in:
Vincent Koc
2026-04-22 15:56:08 -07:00
committed by GitHub
parent ac8495adaa
commit e8b56a9928
9 changed files with 475 additions and 37 deletions

View File

@@ -1,5 +1,14 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness";
import { describe, expect, it, vi } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
initializeGlobalHookRunner,
resetGlobalHookRunner,
} from "../../../../src/plugins/hook-runner-global.js";
import { createMockPluginRegistry } from "../../../../src/plugins/hooks.test-helpers.js";
import {
CodexAppServerEventProjector,
type CodexAppServerToolTelemetry,
@@ -8,36 +17,87 @@ import { createCodexTestModel } from "./test-support.js";
const THREAD_ID = "thread-1";
const TURN_ID = "turn-1";
const tempDirs = new Set<string>();
type ProjectorNotification = Parameters<CodexAppServerEventProjector["handleNotification"]>[0];
function createParams(): EmbeddedRunAttemptParams {
/**
 * Builds an assistant-role message fixture carrying a single text part.
 *
 * All usage counters and cost fields are zero; the api/provider/model fields
 * are fixed Codex test identifiers and the stop reason is always "stop".
 *
 * @param text - Text placed in the message's only content part.
 * @param timestamp - Value stored verbatim in the `timestamp` field.
 * @returns A freshly allocated message object (nothing is shared between calls).
 */
function assistantMessage(text: string, timestamp: number) {
  const textPart = { type: "text" as const, text };
  const zeroCost = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 };
  const zeroUsage = {
    input: 0,
    output: 0,
    cacheRead: 0,
    cacheWrite: 0,
    totalTokens: 0,
    cost: zeroCost,
  };
  return {
    role: "assistant" as const,
    content: [textPart],
    api: "openai-codex-responses",
    provider: "openai-codex",
    model: "gpt-5.4-codex",
    usage: zeroUsage,
    stopReason: "stop" as const,
    timestamp,
  };
}
async function createParams(): Promise<EmbeddedRunAttemptParams> {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-projector-"));
tempDirs.add(tempDir);
const sessionFile = path.join(tempDir, "session.jsonl");
SessionManager.open(sessionFile).appendMessage(assistantMessage("history", Date.now()));
return {
prompt: "hello",
sessionId: "session-1",
sessionFile,
workspaceDir: tempDir,
runId: "run-1",
provider: "openai-codex",
modelId: "gpt-5.4-codex",
model: createCodexTestModel(),
thinkLevel: "medium",
} as unknown as EmbeddedRunAttemptParams;
} as EmbeddedRunAttemptParams;
}
function createProjector(params = createParams()): CodexAppServerEventProjector {
return new CodexAppServerEventProjector(params, THREAD_ID, TURN_ID);
/**
 * Creates a projector bound to the fixed test thread and turn ids.
 *
 * @param params - Attempt params to use; when omitted, a fresh set is obtained
 *   from `createParams()`.
 * @returns The constructed projector.
 */
async function createProjector(
  params?: EmbeddedRunAttemptParams,
): Promise<CodexAppServerEventProjector> {
  if (params != null) {
    return new CodexAppServerEventProjector(params, THREAD_ID, TURN_ID);
  }
  // No params supplied: fall back to the default fixture params.
  const fallbackParams = await createParams();
  return new CodexAppServerEventProjector(fallbackParams, THREAD_ID, TURN_ID);
}
function createProjectorWithAssistantHooks() {
async function createProjectorWithAssistantHooks() {
const onAssistantMessageStart = vi.fn();
const onPartialReply = vi.fn();
return {
const params = await createParams();
const projector = await createProjector({
...params,
onAssistantMessageStart,
onPartialReply,
projector: createProjector({
...createParams(),
onAssistantMessageStart,
onPartialReply,
}),
};
});
return { onAssistantMessageStart, onPartialReply, projector };
}
// Per-test teardown: reset the global hook runner, restore vitest mocks, and
// delete every temp directory recorded in `tempDirs` before emptying the set.
afterEach(async () => {
  resetGlobalHookRunner();
  vi.restoreAllMocks();
  const removeOptions = { recursive: true, force: true };
  for (const dir of tempDirs) {
    // Directories are removed one at a time, in insertion order.
    await fs.rm(dir, removeOptions);
  }
  tempDirs.clear();
});
/**
 * Installs mock `before_compaction` / `after_compaction` handlers on the
 * global hook runner, then creates a projector with default params.
 *
 * @returns The projector together with both mock handlers so tests can assert
 *   on how they were called.
 */
async function createProjectorWithHooks() {
  const hooks = { beforeCompaction: vi.fn(), afterCompaction: vi.fn() };
  const registry = createMockPluginRegistry([
    { hookName: "before_compaction", handler: hooks.beforeCompaction },
    { hookName: "after_compaction", handler: hooks.afterCompaction },
  ]);
  // The runner is initialized before the projector is created, matching the
  // order the test expects hooks to be available in.
  initializeGlobalHookRunner(registry);
  const projector = await createProjector();
  return { projector, ...hooks };
}
function buildEmptyToolTelemetry(): CodexAppServerToolTelemetry {
@@ -72,7 +132,7 @@ function turnCompleted(items: unknown[] = []): ProjectorNotification {
describe("CodexAppServerEventProjector", () => {
it("projects assistant deltas and usage into embedded attempt results", async () => {
const { onAssistantMessageStart, onPartialReply, projector } =
createProjectorWithAssistantHooks();
await createProjectorWithAssistantHooks();
await projector.handleNotification(agentMessageDelta("hel"));
await projector.handleNotification(agentMessageDelta("lo"));
@@ -116,7 +176,7 @@ describe("CodexAppServerEventProjector", () => {
});
it("does not treat cumulative-only token usage as fresh context usage", async () => {
const projector = createProjector();
const projector = await createProjector();
await projector.handleNotification(agentMessageDelta("done"));
await projector.handleNotification(
@@ -145,7 +205,7 @@ describe("CodexAppServerEventProjector", () => {
});
it("normalizes snake_case current token usage fields", async () => {
const projector = createProjector();
const projector = await createProjector();
await projector.handleNotification(agentMessageDelta("done"));
await projector.handleNotification(
@@ -175,7 +235,7 @@ describe("CodexAppServerEventProjector", () => {
it("keeps intermediate agentMessage items out of the final visible reply", async () => {
const { onAssistantMessageStart, onPartialReply, projector } =
createProjectorWithAssistantHooks();
await createProjectorWithAssistantHooks();
await projector.handleNotification(
agentMessageDelta(
@@ -221,7 +281,7 @@ describe("CodexAppServerEventProjector", () => {
});
it("ignores notifications for other turns", async () => {
const projector = createProjector();
const projector = await createProjector();
await projector.handleNotification({
method: "item/agentMessage/delta",
@@ -233,7 +293,21 @@ describe("CodexAppServerEventProjector", () => {
});
it("preserves sessions_yield detection in attempt results", () => {
const projector = createProjector();
const projector = new CodexAppServerEventProjector(
{
prompt: "hello",
sessionId: "session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
runId: "run-1",
provider: "openai-codex",
modelId: "gpt-5.4-codex",
model: createCodexTestModel(),
thinkLevel: "medium",
} as EmbeddedRunAttemptParams,
THREAD_ID,
TURN_ID,
);
const result = projector.buildResult(buildEmptyToolTelemetry(), { yieldDetected: true });
@@ -245,12 +319,12 @@ describe("CodexAppServerEventProjector", () => {
const onReasoningEnd = vi.fn();
const onAgentEvent = vi.fn();
const params = {
...createParams(),
...(await createParams()),
onReasoningStream,
onReasoningEnd,
onAgentEvent,
};
const projector = createProjector(params);
const projector = await createProjector(params);
await projector.handleNotification(
forCurrentTurn("item/reasoning/textDelta", { itemId: "reason-1", delta: "thinking" }),
@@ -319,8 +393,8 @@ describe("CodexAppServerEventProjector", () => {
const onAgentEvent = vi.fn(() => {
throw new Error("consumer failed");
});
const projector = createProjector({
...createParams(),
const projector = await createProjector({
...(await createParams()),
onAgentEvent,
});
@@ -344,4 +418,42 @@ describe("CodexAppServerEventProjector", () => {
expect(result.assistantTexts).toEqual(["final answer"]);
expect(JSON.stringify(result.messagesSnapshot)).toContain("Codex plan");
});
it("fires before_compaction and after_compaction hooks for codex compaction items", async () => {
  const { projector, beforeCompaction, afterCompaction } = await createProjectorWithHooks();

  // Drive one compaction item through its started -> completed lifecycle.
  await projector.handleNotification(
    forCurrentTurn("item/started", { item: { type: "contextCompaction", id: "compact-1" } }),
  );
  await projector.handleNotification(
    forCurrentTurn("item/completed", { item: { type: "contextCompaction", id: "compact-1" } }),
  );

  // Both hooks are expected to receive the same run context and session file.
  const expectedRunContext = expect.objectContaining({
    runId: "run-1",
    sessionId: "session-1",
  });
  const expectedSessionFile = expect.stringContaining("session.jsonl");

  expect(beforeCompaction).toHaveBeenCalledWith(
    expect.objectContaining({
      messageCount: 1,
      sessionFile: expectedSessionFile,
      messages: [expect.objectContaining({ role: "assistant" })],
    }),
    expectedRunContext,
  );
  expect(afterCompaction).toHaveBeenCalledWith(
    expect.objectContaining({
      messageCount: 1,
      compactedCount: -1,
      sessionFile: expectedSessionFile,
    }),
    expectedRunContext,
  );
});
});

View File

@@ -1,9 +1,11 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage, Usage } from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
formatErrorMessage,
normalizeUsage,
type NormalizedUsage,
runAgentHarnessAfterCompactionHook,
runAgentHarnessBeforeCompactionHook,
type EmbeddedRunAttemptParams,
type EmbeddedRunAttemptResult,
type MessagingToolSend,
@@ -67,7 +69,7 @@ export class CodexAppServerEventProjector {
private promptError: unknown;
private promptErrorSource: EmbeddedRunAttemptResult["promptErrorSource"] = null;
private aborted = false;
private tokenUsage: NormalizedUsage | undefined;
private tokenUsage: ReturnType<typeof normalizeUsage>;
private guardianReviewCount = 0;
private completedCompactionCount = 0;
@@ -98,10 +100,10 @@ export class CodexAppServerEventProjector {
this.handleTurnPlanUpdated(params);
break;
case "item/started":
this.handleItemStarted(params);
await this.handleItemStarted(params);
break;
case "item/completed":
this.handleItemCompleted(params);
await this.handleItemCompleted(params);
break;
case "item/autoApprovalReview/started":
case "item/autoApprovalReview/completed":
@@ -271,7 +273,7 @@ export class CodexAppServerEventProjector {
});
}
private handleItemStarted(params: JsonObject): void {
private async handleItemStarted(params: JsonObject): Promise<void> {
const item = readItem(params.item);
const itemId = item?.id ?? readString(params, "itemId") ?? readString(params, "id");
if (itemId) {
@@ -279,6 +281,20 @@ export class CodexAppServerEventProjector {
}
if (item?.type === "contextCompaction" && itemId) {
this.activeCompactionItemIds.add(itemId);
await runAgentHarnessBeforeCompactionHook({
sessionFile: this.params.sessionFile,
messages: this.readMirroredSessionMessages(),
ctx: {
runId: this.params.runId,
agentId: this.params.agentId,
sessionKey: this.params.sessionKey,
sessionId: this.params.sessionId,
workspaceDir: this.params.workspaceDir,
messageProvider: this.params.messageProvider ?? undefined,
trigger: this.params.trigger,
channelId: this.params.messageChannel ?? this.params.messageProvider ?? undefined,
},
});
this.emitAgentEvent({
stream: "compaction",
data: {
@@ -297,7 +313,7 @@ export class CodexAppServerEventProjector {
});
}
private handleItemCompleted(params: JsonObject): void {
private async handleItemCompleted(params: JsonObject): Promise<void> {
const item = readItem(params.item);
const itemId = item?.id ?? readString(params, "itemId") ?? readString(params, "id");
if (itemId) {
@@ -315,6 +331,21 @@ export class CodexAppServerEventProjector {
if (item?.type === "contextCompaction" && itemId) {
this.activeCompactionItemIds.delete(itemId);
this.completedCompactionCount += 1;
await runAgentHarnessAfterCompactionHook({
sessionFile: this.params.sessionFile,
messages: this.readMirroredSessionMessages(),
compactedCount: -1,
ctx: {
runId: this.params.runId,
agentId: this.params.agentId,
sessionKey: this.params.sessionKey,
sessionId: this.params.sessionId,
workspaceDir: this.params.workspaceDir,
messageProvider: this.params.messageProvider ?? undefined,
trigger: this.params.trigger,
channelId: this.params.messageChannel ?? this.params.messageProvider ?? undefined,
},
});
this.emitAgentEvent({
stream: "compaction",
data: {
@@ -476,6 +507,14 @@ export class CodexAppServerEventProjector {
this.assistantItemOrder.push(itemId);
}
/**
 * Reads the messages of the session context built from `params.sessionFile`.
 *
 * Best effort: any error thrown while opening the session or building its
 * context is swallowed without logging and an empty list is returned.
 * NOTE(review): the comparable helper in run-attempt.ts logs a warning on
 * failure; confirm the silent fallback here is intentional.
 */
private readMirroredSessionMessages(): AgentMessage[] {
  let mirrored: AgentMessage[] = [];
  try {
    const session = SessionManager.open(this.params.sessionFile);
    mirrored = session.buildSessionContext().messages;
  } catch {
    // Keep the empty default when the session cannot be read.
  }
  return mirrored;
}
private createAssistantMessage(text: string): AssistantMessage {
const usage: Usage = this.tokenUsage
? {
@@ -563,7 +602,7 @@ function readNumberAlias(record: JsonObject, keys: readonly string[]): number |
return undefined;
}
function normalizeCodexTokenUsage(record: JsonObject): NormalizedUsage | undefined {
function normalizeCodexTokenUsage(record: JsonObject): ReturnType<typeof normalizeUsage> {
return normalizeUsage({
input: readNumberAlias(record, ["inputTokens", "input_tokens", "input", "promptTokens"]),
output: readNumberAlias(record, ["outputTokens", "output_tokens", "output"]),

View File

@@ -1,12 +1,18 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
abortAgentHarnessRun,
queueAgentHarnessMessage,
type EmbeddedRunAttemptParams,
} from "openclaw/plugin-sdk/agent-harness";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
initializeGlobalHookRunner,
resetGlobalHookRunner,
} from "../../../../src/plugins/hook-runner-global.js";
import { createMockPluginRegistry } from "../../../../src/plugins/hooks.test-helpers.js";
import { CODEX_GPT5_BEHAVIOR_CONTRACT } from "../../prompt-overlay.js";
import type { CodexServerNotification } from "./protocol.js";
import { runCodexAppServerAttempt, __testing } from "./run-attempt.js";
@@ -47,6 +53,26 @@ function turnStartResult(turnId = "turn-1", status = "inProgress") {
return { turn: { id: turnId, status } };
}
/**
 * Test fixture: returns an assistant message with one text content part,
 * zeroed usage and cost counters, fixed Codex api/provider/model identifiers
 * and a "stop" stop reason.
 *
 * @param text - Body of the single text content part.
 * @param timestamp - Stored unchanged as the message timestamp.
 * @returns A new message object on every call.
 */
function assistantMessage(text: string, timestamp: number) {
  const content = [{ type: "text" as const, text }];
  const usage = {
    input: 0,
    output: 0,
    cacheRead: 0,
    cacheWrite: 0,
    totalTokens: 0,
    cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
  };
  const message = {
    role: "assistant" as const,
    content,
    api: "openai-codex-responses",
    provider: "openai-codex",
    model: "gpt-5.4-codex",
    usage,
    stopReason: "stop" as const,
    timestamp,
  };
  return message;
}
function createAppServerHarness(
requestImpl: (method: string, params: unknown) => Promise<unknown>,
options: { onStart?: (authProfileId: string | undefined) => void } = {},
@@ -157,10 +183,61 @@ describe("runCodexAppServerAttempt", () => {
afterEach(async () => {
__testing.resetCodexAppServerClientFactoryForTests();
resetGlobalHookRunner();
vi.restoreAllMocks();
await fs.rm(tempDir, { recursive: true, force: true });
});
// Verifies that a before_prompt_build hook result is reflected both in the
// developer instructions sent with thread/start and in the turn/start input.
it("applies before_prompt_build to Codex developer instructions and turn input", async () => {
// Hook stub returning a replacement system prompt, system-context
// prefix/suffix, and a context string to prepend to the prompt.
const beforePromptBuild = vi.fn(async () => ({
systemPrompt: "custom codex system",
prependSystemContext: "pre system",
appendSystemContext: "post system",
prependContext: "queued context",
}));
initializeGlobalHookRunner(
createMockPluginRegistry([{ hookName: "before_prompt_build", handler: beforePromptBuild }]),
);
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
// Seed the session file with one assistant message so the hook is handed
// non-empty history.
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage(assistantMessage("previous turn", Date.now()));
const harness = createStartedThreadHarness();
// Start the attempt without awaiting it; it is completed via the harness below.
const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir));
await harness.waitForMethod("turn/start");
// Yield once to the event loop before completing the turn — presumably so
// pending work after turn/start can settle (TODO confirm).
await new Promise<void>((resolve) => setImmediate(resolve));
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
// The hook must receive the prompt plus the seeded assistant history, along
// with a context carrying the run and session ids.
expect(beforePromptBuild).toHaveBeenCalledWith(
{
prompt: "hello",
messages: [expect.objectContaining({ role: "assistant" })],
},
expect.objectContaining({
runId: "run-1",
sessionId: "session-1",
}),
);
// The hook output must show up in the outgoing requests: the system prefix
// directly before the custom system prompt in developerInstructions, and the
// prepended context before the prompt in the turn input text.
expect(harness.requests).toEqual(
expect.arrayContaining([
{
method: "thread/start",
params: expect.objectContaining({
developerInstructions: expect.stringContaining("pre system\n\ncustom codex system"),
}),
},
{
method: "turn/start",
params: expect.objectContaining({
input: [{ type: "text", text: "queued context\n\nhello" }],
}),
},
]),
);
});
it("forwards queued user input and aborts the active app-server turn", async () => {
const { requests, waitForMethod } = createStartedThreadHarness();

View File

@@ -1,4 +1,5 @@
import fs from "node:fs/promises";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
buildEmbeddedAttemptToolRunContext,
clearActiveEmbeddedRun,
@@ -12,6 +13,7 @@ import {
resolveSandboxContext,
resolveSessionAgentIds,
resolveUserPath,
resolveAgentHarnessBeforePromptBuildResult,
setActiveEmbeddedRun,
supportsModelTools,
type EmbeddedRunAttemptParams,
@@ -36,7 +38,11 @@ import {
} from "./protocol.js";
import { readCodexAppServerBinding, type CodexAppServerThreadBinding } from "./session-binding.js";
import { clearSharedCodexAppServerClient } from "./shared-client.js";
import { buildTurnStartParams, startOrResumeThread } from "./thread-lifecycle.js";
import {
buildDeveloperInstructions,
buildTurnStartParams,
startOrResumeThread,
} from "./thread-lifecycle.js";
import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
let clientFactory = defaultCodexAppServerClientFactory;
@@ -95,6 +101,22 @@ export async function runCodexAppServerAttempt(
tools,
signal: runAbortController.signal,
});
const historyMessages = readMirroredSessionHistoryMessages(params.sessionFile);
const promptBuild = await resolveAgentHarnessBeforePromptBuildResult({
prompt: params.prompt,
developerInstructions: buildDeveloperInstructions(params),
messages: historyMessages,
ctx: {
runId: params.runId,
agentId: sessionAgentId,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
trigger: params.trigger,
channelId: params.messageChannel ?? params.messageProvider ?? undefined,
},
});
let client: CodexAppServerClient;
let thread: CodexAppServerThreadBinding;
try {
@@ -110,6 +132,7 @@ export async function runCodexAppServerAttempt(
cwd: effectiveWorkspace,
dynamicTools: toolBridge.specs,
appServer,
developerInstructions: promptBuild.developerInstructions,
});
return { client: startupClient, thread: startupThread };
},
@@ -196,6 +219,7 @@ export async function runCodexAppServerAttempt(
threadId: thread.threadId,
cwd: effectiveWorkspace,
appServer,
promptText: promptBuild.prompt,
}),
{ timeoutMs: params.timeoutMs, signal: runAbortController.signal },
);
@@ -476,6 +500,18 @@ function readString(record: JsonObject, key: string): string | undefined {
return typeof value === "string" ? value : undefined;
}
/**
 * Loads the messages of the session context built from `sessionFile`, for use
 * as history input to the Codex prompt hooks.
 *
 * Best effort: if opening the session or building its context throws, a
 * warning (with the error and file path) is logged and an empty list is
 * returned instead.
 *
 * @param sessionFile - Path of the session file to open.
 * @returns The session context messages, or `[]` when they cannot be read.
 */
function readMirroredSessionHistoryMessages(sessionFile: string): unknown[] {
  let history: unknown[] = [];
  try {
    history = SessionManager.open(sessionFile).buildSessionContext().messages;
  } catch (error) {
    embeddedAgentLog.warn("failed to read mirrored session history for codex prompt hooks", {
      error,
      sessionFile,
    });
  }
  return history;
}
async function mirrorTranscriptBestEffort(params: {
params: EmbeddedRunAttemptParams;
result: EmbeddedRunAttemptResult;

View File

@@ -25,6 +25,7 @@ export async function startOrResumeThread(params: {
cwd: string;
dynamicTools: JsonValue[];
appServer: CodexAppServerRuntimeOptions;
developerInstructions?: string;
}): Promise<CodexAppServerThreadBinding> {
const dynamicToolsFingerprint = fingerprintDynamicTools(params.dynamicTools);
const binding = await readCodexAppServerBinding(params.params.sessionFile);
@@ -49,6 +50,7 @@ export async function startOrResumeThread(params: {
buildThreadResumeParams(params.params, {
threadId: binding.threadId,
appServer: params.appServer,
developerInstructions: params.developerInstructions,
}),
);
const boundAuthProfileId = params.params.authProfileId ?? binding.authProfileId;
@@ -88,7 +90,8 @@ export async function startOrResumeThread(params: {
sandbox: params.appServer.sandbox,
...(params.appServer.serviceTier ? { serviceTier: params.appServer.serviceTier } : {}),
serviceName: "OpenClaw",
developerInstructions: buildDeveloperInstructions(params.params),
developerInstructions:
params.developerInstructions ?? buildDeveloperInstructions(params.params),
dynamicTools: params.dynamicTools,
experimentalRawEvents: true,
persistExtendedHistory: true,
@@ -122,6 +125,7 @@ export function buildThreadResumeParams(
options: {
threadId: string;
appServer: CodexAppServerRuntimeOptions;
developerInstructions?: string;
},
): CodexThreadResumeParams {
return {
@@ -132,7 +136,7 @@ export function buildThreadResumeParams(
approvalsReviewer: options.appServer.approvalsReviewer,
sandbox: options.appServer.sandbox,
...(options.appServer.serviceTier ? { serviceTier: options.appServer.serviceTier } : {}),
developerInstructions: buildDeveloperInstructions(params),
developerInstructions: options.developerInstructions ?? buildDeveloperInstructions(params),
persistExtendedHistory: true,
};
}
@@ -143,11 +147,12 @@ export function buildTurnStartParams(
threadId: string;
cwd: string;
appServer: CodexAppServerRuntimeOptions;
promptText?: string;
},
): CodexTurnStartParams {
return {
threadId: options.threadId,
input: buildUserInput(params),
input: buildUserInput(params, options.promptText),
cwd: options.cwd,
approvalPolicy: options.appServer.approvalPolicy,
approvalsReviewer: options.appServer.approvalsReviewer,
@@ -177,7 +182,7 @@ function stabilizeJsonValue(value: JsonValue): JsonValue {
return stable;
}
function buildDeveloperInstructions(params: EmbeddedRunAttemptParams): string {
export function buildDeveloperInstructions(params: EmbeddedRunAttemptParams): string {
const sections = [
"You are running inside OpenClaw. Use OpenClaw dynamic tools for messaging, cron, sessions, and host actions when available.",
"Preserve the user's existing channel/session context. If sending a channel reply, use the OpenClaw messaging tool instead of describing that you would reply.",
@@ -188,9 +193,12 @@ function buildDeveloperInstructions(params: EmbeddedRunAttemptParams): string {
return sections.filter((section) => typeof section === "string" && section.trim()).join("\n\n");
}
function buildUserInput(params: EmbeddedRunAttemptParams): CodexUserInput[] {
function buildUserInput(
params: EmbeddedRunAttemptParams,
promptText: string = params.prompt,
): CodexUserInput[] {
return [
{ type: "text", text: params.prompt },
{ type: "text", text: promptText },
...(params.images ?? []).map(
(image): CodexUserInput => ({
type: "image",