feat: add Codex app-server harness extension

This commit is contained in:
Peter Steinberger
2026-04-10 13:49:54 +01:00
parent 44ec4d05de
commit dd26e8c44d
27 changed files with 4639 additions and 0 deletions

4
.github/labeler.yml vendored
View File

@@ -297,6 +297,10 @@
- changed-files:
- any-glob-to-any-file:
- "extensions/openai/**"
"extensions: codex":
- changed-files:
- any-glob-to-any-file:
- "extensions/codex/**"
"extensions: kimi-coding":
- changed-files:
- any-glob-to-any-file:

View File

@@ -0,0 +1,132 @@
import { callGatewayTool, type EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { buildApprovalResponse, handleCodexAppServerApprovalRequest } from "./approval-bridge.js";
vi.mock("openclaw/plugin-sdk/agent-harness", async (importOriginal) => ({
...(await importOriginal<typeof import("openclaw/plugin-sdk/agent-harness")>()),
callGatewayTool: vi.fn(),
}));
const mockCallGatewayTool = vi.mocked(callGatewayTool);
function createParams(): EmbeddedRunAttemptParams {
return {
sessionKey: "agent:main:session-1",
agentId: "main",
messageChannel: "telegram",
currentChannelId: "chat-1",
agentAccountId: "default",
currentThreadTs: "thread-ts",
onAgentEvent: vi.fn(),
} as unknown as EmbeddedRunAttemptParams;
}
describe("Codex app-server approval bridge", () => {
beforeEach(() => {
mockCallGatewayTool.mockReset();
});
it("routes command approvals through plugin approvals and accepts allowed commands", async () => {
const params = createParams();
mockCallGatewayTool
.mockResolvedValueOnce({ id: "plugin:approval-1", status: "accepted" })
.mockResolvedValueOnce({ id: "plugin:approval-1", decision: "allow-once" });
const result = await handleCodexAppServerApprovalRequest({
method: "item/commandExecution/requestApproval",
requestParams: {
threadId: "thread-1",
turnId: "turn-1",
itemId: "cmd-1",
command: "pnpm test extensions/codex/app-server",
},
paramsForRun: params,
threadId: "thread-1",
turnId: "turn-1",
});
expect(result).toEqual({ decision: "accept" });
expect(mockCallGatewayTool.mock.calls.map(([method]) => method)).toEqual([
"plugin.approval.request",
"plugin.approval.waitDecision",
]);
expect(mockCallGatewayTool).toHaveBeenCalledWith(
"plugin.approval.request",
expect.any(Object),
expect.objectContaining({
pluginId: "openclaw-codex-app-server",
title: "Codex app-server command approval",
twoPhase: true,
turnSourceChannel: "telegram",
turnSourceTo: "chat-1",
}),
{ expectFinal: false },
);
expect(params.onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "approval",
data: expect.objectContaining({ status: "pending", approvalId: "plugin:approval-1" }),
}),
);
expect(params.onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "approval",
data: expect.objectContaining({ status: "approved", approvalId: "plugin:approval-1" }),
}),
);
});
it("fails closed when no approval route is available", async () => {
const params = createParams();
mockCallGatewayTool.mockResolvedValueOnce({
id: "plugin:approval-2",
decision: null,
});
const result = await handleCodexAppServerApprovalRequest({
method: "item/fileChange/requestApproval",
requestParams: {
threadId: "thread-1",
turnId: "turn-1",
itemId: "patch-1",
reason: "needs write access",
},
paramsForRun: params,
threadId: "thread-1",
turnId: "turn-1",
});
expect(result).toEqual({ decision: "decline" });
expect(mockCallGatewayTool).toHaveBeenCalledTimes(1);
expect(params.onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "approval",
data: expect.objectContaining({ status: "unavailable", reason: "needs write access" }),
}),
);
});
it("maps app-server approval response families separately", () => {
expect(buildApprovalResponse("execCommandApproval", undefined, "approved-session")).toEqual({
decision: "approved_for_session",
});
expect(buildApprovalResponse("applyPatchApproval", undefined, "denied")).toEqual({
decision: "denied",
});
expect(
buildApprovalResponse(
"item/permissions/requestApproval",
{
permissions: {
network: { allowHosts: ["example.com"] },
fileSystem: null,
},
},
"approved-once",
),
).toEqual({
permissions: { network: { allowHosts: ["example.com"] } },
scope: "turn",
});
});
});

View File

@@ -0,0 +1,390 @@
import {
callGatewayTool,
type AgentApprovalEventData,
type EmbeddedRunAttemptParams,
type ExecApprovalDecision,
} from "openclaw/plugin-sdk/agent-harness";
import { isJsonObject, type JsonObject, type JsonValue } from "./protocol.js";
const DEFAULT_CODEX_APPROVAL_TIMEOUT_MS = 120_000;
export type AppServerApprovalOutcome =
| "approved-once"
| "approved-session"
| "denied"
| "unavailable"
| "cancelled";
type ApprovalRequestResult = {
id?: string;
status?: string;
decision?: ExecApprovalDecision | null;
};
type ApprovalWaitResult = {
id?: string;
decision?: ExecApprovalDecision | null;
};
export async function handleCodexAppServerApprovalRequest(params: {
method: string;
requestParams: JsonValue | undefined;
paramsForRun: EmbeddedRunAttemptParams;
threadId: string;
turnId: string;
signal?: AbortSignal;
}): Promise<JsonValue | undefined> {
const requestParams = isJsonObject(params.requestParams) ? params.requestParams : undefined;
if (!matchesCurrentTurn(requestParams, params.threadId, params.turnId)) {
return undefined;
}
const context = buildApprovalContext({
method: params.method,
requestParams,
paramsForRun: params.paramsForRun,
});
try {
const timeoutMs = DEFAULT_CODEX_APPROVAL_TIMEOUT_MS;
const requestResult = await callGatewayTool<ApprovalRequestResult>(
"plugin.approval.request",
{ timeoutMs: timeoutMs + 10_000 },
{
pluginId: "openclaw-codex-app-server",
title: context.title,
description: context.description,
severity: context.severity,
toolName: context.kind === "exec" ? "codex_command_approval" : "codex_file_approval",
toolCallId: context.itemId,
agentId: params.paramsForRun.agentId,
sessionKey: params.paramsForRun.sessionKey,
turnSourceChannel:
params.paramsForRun.messageChannel ?? params.paramsForRun.messageProvider,
turnSourceTo: params.paramsForRun.currentChannelId,
turnSourceAccountId: params.paramsForRun.agentAccountId,
turnSourceThreadId: params.paramsForRun.currentThreadTs,
timeoutMs,
twoPhase: true,
},
{ expectFinal: false },
);
const approvalId = requestResult?.id;
if (!approvalId) {
emitApprovalEvent(params.paramsForRun, {
phase: "resolved",
kind: context.kind,
status: "unavailable",
title: context.title,
...context.eventDetails,
message: "Codex app-server approval route unavailable.",
});
return buildApprovalResponse(params.method, context.requestParams, "denied");
}
emitApprovalEvent(params.paramsForRun, {
phase: "requested",
kind: context.kind,
status: "pending",
title: context.title,
approvalId,
approvalSlug: approvalId,
...context.eventDetails,
message: "Codex app-server approval requested.",
});
const decision = Object.prototype.hasOwnProperty.call(requestResult, "decision")
? requestResult.decision
: await waitForApprovalDecision({
approvalId,
timeoutMs,
signal: params.signal,
});
const outcome = mapExecDecisionToOutcome(decision);
emitApprovalEvent(params.paramsForRun, {
phase: "resolved",
kind: context.kind,
status:
outcome === "denied"
? "denied"
: outcome === "unavailable"
? "unavailable"
: outcome === "cancelled"
? "failed"
: "approved",
title: context.title,
approvalId,
approvalSlug: approvalId,
...context.eventDetails,
message: approvalResolutionMessage(outcome),
});
return buildApprovalResponse(params.method, context.requestParams, outcome);
} catch (error) {
const cancelled = params.signal?.aborted === true;
emitApprovalEvent(params.paramsForRun, {
phase: "resolved",
kind: context.kind,
status: cancelled ? "failed" : "unavailable",
title: context.title,
...context.eventDetails,
message: cancelled
? "Codex app-server approval cancelled because the run stopped."
: `Codex app-server approval route failed: ${formatErrorMessage(error)}`,
});
return buildApprovalResponse(
params.method,
context.requestParams,
cancelled ? "cancelled" : "denied",
);
}
}
export function buildApprovalResponse(
method: string,
requestParams: JsonObject | undefined,
outcome: AppServerApprovalOutcome,
): JsonValue {
if (method === "item/commandExecution/requestApproval") {
return { decision: commandApprovalDecision(requestParams, outcome) };
}
if (method === "item/fileChange/requestApproval") {
return { decision: fileChangeApprovalDecision(outcome) };
}
if (method === "item/permissions/requestApproval") {
if (outcome === "approved-session" || outcome === "approved-once") {
return {
permissions: requestedPermissions(requestParams),
scope: outcome === "approved-session" ? "session" : "turn",
};
}
return { permissions: {}, scope: "turn" };
}
if (method === "execCommandApproval" || method === "applyPatchApproval") {
return { decision: legacyReviewDecision(outcome) };
}
return {
decision: outcome === "approved-once" || outcome === "approved-session" ? "accept" : "decline",
};
}
function matchesCurrentTurn(
requestParams: JsonObject | undefined,
threadId: string,
turnId: string,
): boolean {
if (!requestParams) {
return true;
}
const requestThreadId =
readString(requestParams, "threadId") ?? readString(requestParams, "conversationId");
const requestTurnId = readString(requestParams, "turnId");
if (requestThreadId && requestThreadId !== threadId) {
return false;
}
if (requestTurnId && requestTurnId !== turnId) {
return false;
}
return true;
}
function buildApprovalContext(params: {
method: string;
requestParams: JsonObject | undefined;
paramsForRun: EmbeddedRunAttemptParams;
}) {
const itemId =
readString(params.requestParams, "itemId") ??
readString(params.requestParams, "callId") ??
readString(params.requestParams, "approvalId");
const command = readCommand(params.requestParams);
const reason = readString(params.requestParams, "reason");
const kind = approvalKindForMethod(params.method);
const title =
kind === "exec"
? "Codex app-server command approval"
: kind === "plugin"
? "Codex app-server file approval"
: "Codex app-server approval";
const subject = command
? `Command: ${truncate(command, 180)}`
: reason
? `Reason: ${truncate(reason, 180)}`
: `Request method: ${params.method}`;
const description = [
subject,
params.paramsForRun.sessionKey && `Session: ${params.paramsForRun.sessionKey}`,
]
.filter(Boolean)
.join("\n");
return {
kind,
title,
description,
severity: kind === "exec" ? ("warning" as const) : ("info" as const),
itemId,
requestParams: params.requestParams,
eventDetails: {
...(itemId ? { itemId } : {}),
...(command ? { command } : {}),
...(reason ? { reason } : {}),
},
};
}
async function waitForApprovalDecision(params: {
approvalId: string;
timeoutMs: number;
signal?: AbortSignal;
}): Promise<ExecApprovalDecision | null | undefined> {
const waitPromise = callGatewayTool<ApprovalWaitResult>(
"plugin.approval.waitDecision",
{ timeoutMs: params.timeoutMs + 10_000 },
{ id: params.approvalId },
);
if (!params.signal) {
return (await waitPromise)?.decision;
}
let onAbort: (() => void) | undefined;
const abortPromise = new Promise<never>((_, reject) => {
if (params.signal!.aborted) {
reject(params.signal!.reason);
return;
}
onAbort = () => reject(params.signal!.reason);
params.signal!.addEventListener("abort", onAbort, { once: true });
});
try {
return (await Promise.race([waitPromise, abortPromise]))?.decision;
} finally {
if (onAbort) {
params.signal.removeEventListener("abort", onAbort);
}
}
}
function commandApprovalDecision(
requestParams: JsonObject | undefined,
outcome: AppServerApprovalOutcome,
): JsonValue {
if (outcome === "cancelled") {
return "cancel";
}
if (outcome === "denied" || outcome === "unavailable") {
return "decline";
}
if (outcome === "approved-session" && hasAvailableDecision(requestParams, "acceptForSession")) {
return "acceptForSession";
}
return "accept";
}
function fileChangeApprovalDecision(outcome: AppServerApprovalOutcome): JsonValue {
if (outcome === "cancelled") {
return "cancel";
}
if (outcome === "denied" || outcome === "unavailable") {
return "decline";
}
return outcome === "approved-session" ? "acceptForSession" : "accept";
}
function legacyReviewDecision(outcome: AppServerApprovalOutcome): JsonValue {
if (outcome === "cancelled") {
return "abort";
}
if (outcome === "denied" || outcome === "unavailable") {
return "denied";
}
return outcome === "approved-session" ? "approved_for_session" : "approved";
}
function requestedPermissions(requestParams: JsonObject | undefined): JsonObject {
const permissions = isJsonObject(requestParams?.permissions) ? requestParams.permissions : {};
const granted: JsonObject = {};
if (isJsonObject(permissions.network)) {
granted.network = permissions.network;
}
if (isJsonObject(permissions.fileSystem)) {
granted.fileSystem = permissions.fileSystem;
}
return granted;
}
function hasAvailableDecision(requestParams: JsonObject | undefined, decision: string): boolean {
const available = requestParams?.availableDecisions;
if (!Array.isArray(available)) {
return true;
}
return available.includes(decision);
}
function mapExecDecisionToOutcome(
decision: ExecApprovalDecision | null | undefined,
): AppServerApprovalOutcome {
if (decision === "allow-once") {
return "approved-once";
}
if (decision === "allow-always") {
return "approved-session";
}
if (decision === null || decision === undefined) {
return "unavailable";
}
return "denied";
}
function approvalResolutionMessage(outcome: AppServerApprovalOutcome): string {
if (outcome === "approved-session") {
return "Codex app-server approval granted for the session.";
}
if (outcome === "approved-once") {
return "Codex app-server approval granted once.";
}
if (outcome === "cancelled") {
return "Codex app-server approval cancelled.";
}
if (outcome === "unavailable") {
return "Codex app-server approval unavailable.";
}
return "Codex app-server approval denied.";
}
function approvalKindForMethod(method: string): AgentApprovalEventData["kind"] {
if (method.includes("commandExecution") || method.includes("execCommand")) {
return "exec";
}
if (method.includes("fileChange") || method.includes("Patch") || method.includes("permissions")) {
return "plugin";
}
return "unknown";
}
function emitApprovalEvent(params: EmbeddedRunAttemptParams, data: AgentApprovalEventData): void {
params.onAgentEvent?.({ stream: "approval", data: data as unknown as Record<string, unknown> });
}
function readCommand(record: JsonObject | undefined): string | undefined {
const command = record?.command;
if (typeof command === "string") {
return command;
}
if (Array.isArray(command) && command.every((part) => typeof part === "string")) {
return command.join(" ");
}
return undefined;
}
function readString(record: JsonObject | undefined, key: string): string | undefined {
const value = record?.[key];
return typeof value === "string" ? value : undefined;
}
function truncate(value: string, maxLength: number): string {
return value.length <= maxLength ? value : `${value.slice(0, Math.max(0, maxLength - 3))}...`;
}
function formatErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

View File

@@ -0,0 +1,191 @@
import { EventEmitter } from "node:events";
import { PassThrough, Writable } from "node:stream";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
CodexAppServerClient,
listCodexAppServerModels,
resetSharedCodexAppServerClientForTests,
} from "./client.js";
function createClientHarness() {
const stdout = new PassThrough();
const stderr = new PassThrough();
const writes: string[] = [];
const stdin = new Writable({
write(chunk, _encoding, callback) {
writes.push(chunk.toString());
callback();
},
});
const process = Object.assign(new EventEmitter(), {
stdin,
stdout,
stderr,
killed: false,
kill: vi.fn(() => {
process.killed = true;
}),
});
const client = CodexAppServerClient.fromTransportForTests(process);
return {
client,
writes,
send(message: unknown) {
stdout.write(`${JSON.stringify(message)}\n`);
},
};
}
describe("CodexAppServerClient", () => {
const clients: CodexAppServerClient[] = [];
afterEach(() => {
resetSharedCodexAppServerClientForTests();
vi.restoreAllMocks();
for (const client of clients) {
client.close();
}
clients.length = 0;
});
it("routes request responses by id", async () => {
const harness = createClientHarness();
clients.push(harness.client);
const request = harness.client.request("model/list", {});
const outbound = JSON.parse(harness.writes[0] ?? "{}") as { id?: number; method?: string };
harness.send({ id: outbound.id, result: { models: [] } });
await expect(request).resolves.toEqual({ models: [] });
expect(outbound.method).toBe("model/list");
});
it("initializes with the required client version", async () => {
const harness = createClientHarness();
clients.push(harness.client);
const initializing = harness.client.initialize();
const outbound = JSON.parse(harness.writes[0] ?? "{}") as {
id?: number;
method?: string;
params?: { clientInfo?: { name?: string; title?: string; version?: string } };
};
harness.send({ id: outbound.id, result: {} });
await expect(initializing).resolves.toBeUndefined();
expect(outbound).toMatchObject({
method: "initialize",
params: {
clientInfo: {
name: "openclaw",
title: "OpenClaw",
version: expect.any(String),
},
},
});
expect(outbound.params?.clientInfo?.version).not.toBe("");
expect(JSON.parse(harness.writes[1] ?? "{}")).toEqual({ method: "initialized" });
});
it("answers server-initiated requests with the registered handler result", async () => {
const harness = createClientHarness();
clients.push(harness.client);
harness.client.addRequestHandler((request) => {
if (request.method === "item/tool/call") {
return { contentItems: [{ type: "inputText", text: "ok" }], success: true };
}
return undefined;
});
harness.send({ id: "srv-1", method: "item/tool/call", params: { tool: "message" } });
await vi.waitFor(() => expect(harness.writes.length).toBe(1));
expect(JSON.parse(harness.writes[0] ?? "{}")).toEqual({
id: "srv-1",
result: { contentItems: [{ type: "inputText", text: "ok" }], success: true },
});
});
it("fails closed for unhandled native app-server approvals", async () => {
const harness = createClientHarness();
clients.push(harness.client);
harness.send({
id: "approval-1",
method: "item/commandExecution/requestApproval",
params: { threadId: "thread-1", turnId: "turn-1", itemId: "cmd-1", command: "pnpm test" },
});
await vi.waitFor(() => expect(harness.writes.length).toBe(1));
expect(JSON.parse(harness.writes[0] ?? "{}")).toEqual({
id: "approval-1",
result: { decision: "decline" },
});
});
it("fails closed with legacy review decisions for legacy app-server approvals", async () => {
const harness = createClientHarness();
clients.push(harness.client);
harness.send({
id: "approval-legacy",
method: "execCommandApproval",
params: { conversationId: "thread-1", callId: "cmd-1", command: ["pnpm", "test"] },
});
await vi.waitFor(() => expect(harness.writes.length).toBe(1));
expect(JSON.parse(harness.writes[0] ?? "{}")).toEqual({
id: "approval-legacy",
result: { decision: "denied" },
});
});
it("lists app-server models through the typed helper", async () => {
const harness = createClientHarness();
clients.push(harness.client);
const startSpy = vi.spyOn(CodexAppServerClient, "start").mockReturnValue(harness.client);
const listPromise = listCodexAppServerModels({ limit: 12, timeoutMs: 1000 });
const initialize = JSON.parse(harness.writes[0] ?? "{}") as { id?: number };
harness.send({ id: initialize.id, result: {} });
await vi.waitFor(() => expect(harness.writes.length).toBeGreaterThanOrEqual(3));
const list = JSON.parse(harness.writes[2] ?? "{}") as { id?: number; method?: string };
expect(list.method).toBe("model/list");
harness.send({
id: list.id,
result: {
data: [
{
id: "gpt-5.4",
model: "gpt-5.4",
displayName: "gpt-5.4",
inputModalities: ["text", "image"],
supportedReasoningEfforts: [
{ reasoningEffort: "low", description: "fast" },
{ reasoningEffort: "xhigh", description: "deep" },
],
defaultReasoningEffort: "medium",
isDefault: true,
},
],
nextCursor: null,
},
});
await expect(listPromise).resolves.toEqual({
models: [
{
id: "gpt-5.4",
model: "gpt-5.4",
displayName: "gpt-5.4",
inputModalities: ["text", "image"],
supportedReasoningEfforts: ["low", "xhigh"],
defaultReasoningEffort: "medium",
isDefault: true,
},
],
});
startSpy.mockRestore();
});
});

View File

@@ -0,0 +1,503 @@
import { spawn } from "node:child_process";
import { createInterface, type Interface as ReadlineInterface } from "node:readline";
import { embeddedAgentLog, OPENCLAW_VERSION } from "openclaw/plugin-sdk/agent-harness";
import {
isRpcResponse,
type CodexServerNotification,
type JsonObject,
type JsonValue,
type RpcMessage,
type RpcRequest,
type RpcResponse,
} from "./protocol.js";
type PendingRequest = {
method: string;
resolve: (value: unknown) => void;
reject: (error: Error) => void;
};
type CodexAppServerTransport = {
stdin: { write: (data: string) => unknown };
stdout: NodeJS.ReadableStream;
stderr: NodeJS.ReadableStream;
killed?: boolean;
kill?: () => unknown;
once: (event: string, listener: (...args: unknown[]) => void) => unknown;
};
export type CodexServerRequestHandler = (
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
) => Promise<JsonValue | undefined> | JsonValue | undefined;
export type CodexServerNotificationHandler = (
notification: CodexServerNotification,
) => Promise<void> | void;
export type CodexAppServerModel = {
id: string;
model: string;
displayName?: string;
description?: string;
hidden?: boolean;
isDefault?: boolean;
inputModalities: string[];
supportedReasoningEfforts: string[];
defaultReasoningEffort?: string;
};
export type CodexAppServerModelListResult = {
models: CodexAppServerModel[];
nextCursor?: string;
};
export type CodexAppServerListModelsOptions = {
limit?: number;
cursor?: string;
includeHidden?: boolean;
timeoutMs?: number;
};
export class CodexAppServerClient {
private readonly child: CodexAppServerTransport;
private readonly lines: ReadlineInterface;
private readonly pending = new Map<number | string, PendingRequest>();
private readonly requestHandlers = new Set<CodexServerRequestHandler>();
private readonly notificationHandlers = new Set<CodexServerNotificationHandler>();
private nextId = 1;
private initialized = false;
private closed = false;
private constructor(child: CodexAppServerTransport) {
this.child = child;
this.lines = createInterface({ input: child.stdout });
this.lines.on("line", (line) => this.handleLine(line));
child.stderr.on("data", (chunk: Buffer | string) => {
const text = chunk.toString("utf8").trim();
if (text) {
embeddedAgentLog.debug(`codex app-server stderr: ${text}`);
}
});
child.once("error", (error) =>
this.closeWithError(error instanceof Error ? error : new Error(String(error))),
);
child.once("exit", (code, signal) => {
this.closeWithError(
new Error(
`codex app-server exited: code=${formatExitValue(code)} signal=${formatExitValue(signal)}`,
),
);
});
}
static start(): CodexAppServerClient {
const bin = process.env.OPENCLAW_CODEX_APP_SERVER_BIN?.trim() || "codex";
const extraArgs = splitShellWords(process.env.OPENCLAW_CODEX_APP_SERVER_ARGS ?? "");
const args = extraArgs.length > 0 ? extraArgs : ["app-server", "--listen", "stdio://"];
const child = spawn(bin, args, {
env: process.env,
stdio: ["pipe", "pipe", "pipe"],
});
return new CodexAppServerClient(child);
}
static fromTransportForTests(child: CodexAppServerTransport): CodexAppServerClient {
return new CodexAppServerClient(child);
}
async initialize(): Promise<void> {
if (this.initialized) {
return;
}
await this.request("initialize", {
clientInfo: {
name: "openclaw",
title: "OpenClaw",
version: OPENCLAW_VERSION,
},
capabilities: {
experimentalApi: true,
},
});
this.notify("initialized");
this.initialized = true;
}
request<T = JsonValue | undefined>(method: string, params?: JsonValue): Promise<T> {
if (this.closed) {
return Promise.reject(new Error("codex app-server client is closed"));
}
const id = this.nextId++;
const message: RpcRequest = { id, method, params };
return new Promise<T>((resolve, reject) => {
this.pending.set(id, {
method,
resolve: (value) => resolve(value as T),
reject,
});
this.writeMessage(message);
});
}
notify(method: string, params?: JsonValue): void {
this.writeMessage({ method, params });
}
addRequestHandler(handler: CodexServerRequestHandler): () => void {
this.requestHandlers.add(handler);
return () => this.requestHandlers.delete(handler);
}
addNotificationHandler(handler: CodexServerNotificationHandler): () => void {
this.notificationHandlers.add(handler);
return () => this.notificationHandlers.delete(handler);
}
close(): void {
this.closed = true;
this.lines.close();
if (!this.child.killed) {
this.child.kill?.();
}
}
private writeMessage(message: RpcRequest | RpcResponse): void {
this.child.stdin.write(`${JSON.stringify(message)}\n`);
}
private handleLine(line: string): void {
const trimmed = line.trim();
if (!trimmed) {
return;
}
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch (error) {
embeddedAgentLog.warn("failed to parse codex app-server message", { error });
return;
}
if (!parsed || typeof parsed !== "object") {
return;
}
const message = parsed as RpcMessage;
if (isRpcResponse(message)) {
this.handleResponse(message);
return;
}
if (!("method" in message)) {
return;
}
if ("id" in message && message.id !== undefined) {
void this.handleServerRequest({
id: message.id,
method: message.method,
params: message.params,
});
return;
}
this.handleNotification({
method: message.method,
params: message.params,
});
}
private handleResponse(response: RpcResponse): void {
const pending = this.pending.get(response.id);
if (!pending) {
return;
}
this.pending.delete(response.id);
if (response.error) {
pending.reject(new Error(response.error.message || `${pending.method} failed`));
return;
}
pending.resolve(response.result);
}
private async handleServerRequest(
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
): Promise<void> {
try {
for (const handler of this.requestHandlers) {
const result = await handler(request);
if (result !== undefined) {
this.writeMessage({ id: request.id, result });
return;
}
}
this.writeMessage({ id: request.id, result: defaultServerRequestResponse(request) });
} catch (error) {
this.writeMessage({
id: request.id,
error: {
message: error instanceof Error ? error.message : String(error),
},
});
}
}
private handleNotification(notification: CodexServerNotification): void {
for (const handler of this.notificationHandlers) {
Promise.resolve(handler(notification)).catch((error: unknown) => {
embeddedAgentLog.warn("codex app-server notification handler failed", { error });
});
}
}
private closeWithError(error: Error): void {
if (this.closed) {
return;
}
this.closed = true;
for (const pending of this.pending.values()) {
pending.reject(error);
}
this.pending.clear();
clearSharedClientIfCurrent(this);
}
}
let sharedClient: CodexAppServerClient | undefined;
let sharedClientPromise: Promise<CodexAppServerClient> | undefined;
export async function getSharedCodexAppServerClient(): Promise<CodexAppServerClient> {
sharedClientPromise ??= (async () => {
const client = CodexAppServerClient.start();
sharedClient = client;
await client.initialize();
return client;
})();
try {
return await sharedClientPromise;
} catch (error) {
sharedClient = undefined;
sharedClientPromise = undefined;
throw error;
}
}
export function resetSharedCodexAppServerClientForTests(): void {
sharedClient = undefined;
sharedClientPromise = undefined;
}
function clearSharedClientIfCurrent(client: CodexAppServerClient): void {
if (sharedClient !== client) {
return;
}
sharedClient = undefined;
sharedClientPromise = undefined;
}
export async function listCodexAppServerModels(
options: CodexAppServerListModelsOptions = {},
): Promise<CodexAppServerModelListResult> {
const timeoutMs = options.timeoutMs ?? 2500;
return await withTimeout(
(async () => {
const client = await getSharedCodexAppServerClient();
const response = await client.request<JsonObject>("model/list", {
limit: options.limit ?? null,
cursor: options.cursor ?? null,
includeHidden: options.includeHidden ?? null,
});
return readModelListResult(response);
})(),
timeoutMs,
"codex app-server model/list timed out",
);
}
export function defaultServerRequestResponse(
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
): JsonValue {
if (request.method === "item/tool/call") {
return {
contentItems: [
{
type: "inputText",
text: "OpenClaw did not register a handler for this app-server tool call.",
},
],
success: false,
};
}
if (
request.method === "item/commandExecution/requestApproval" ||
request.method === "item/fileChange/requestApproval"
) {
return { decision: "decline" };
}
if (request.method === "execCommandApproval" || request.method === "applyPatchApproval") {
return { decision: "denied" };
}
if (request.method === "item/permissions/requestApproval") {
return { permissions: {}, scope: "turn" };
}
if (isCodexAppServerApprovalRequest(request.method)) {
return {
decision: "decline",
reason: "OpenClaw codex app-server bridge does not grant native approvals yet.",
};
}
if (request.method === "item/tool/requestUserInput") {
return {
answers: {},
};
}
if (request.method === "mcpServer/elicitation/request") {
return {
action: "decline",
};
}
return {};
}
function readModelListResult(value: JsonValue | undefined): CodexAppServerModelListResult {
if (!isJsonObjectValue(value) || !Array.isArray(value.data)) {
return { models: [] };
}
const models = value.data
.map((entry) => readCodexModel(entry))
.filter((entry): entry is CodexAppServerModel => entry !== undefined);
const nextCursor = typeof value.nextCursor === "string" ? value.nextCursor : undefined;
return { models, ...(nextCursor ? { nextCursor } : {}) };
}
function readCodexModel(value: unknown): CodexAppServerModel | undefined {
if (!isJsonObjectValue(value)) {
return undefined;
}
const id = readNonEmptyString(value.id);
const model = readNonEmptyString(value.model) ?? id;
if (!id || !model) {
return undefined;
}
return {
id,
model,
...(readNonEmptyString(value.displayName)
? { displayName: readNonEmptyString(value.displayName) }
: {}),
...(readNonEmptyString(value.description)
? { description: readNonEmptyString(value.description) }
: {}),
...(typeof value.hidden === "boolean" ? { hidden: value.hidden } : {}),
...(typeof value.isDefault === "boolean" ? { isDefault: value.isDefault } : {}),
inputModalities: readStringArray(value.inputModalities),
supportedReasoningEfforts: readReasoningEfforts(value.supportedReasoningEfforts),
...(readNonEmptyString(value.defaultReasoningEffort)
? { defaultReasoningEffort: readNonEmptyString(value.defaultReasoningEffort) }
: {}),
};
}
function readReasoningEfforts(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
const efforts = value
.map((entry) => {
if (!isJsonObjectValue(entry)) {
return undefined;
}
return readNonEmptyString(entry.reasoningEffort);
})
.filter((entry): entry is string => entry !== undefined);
return [...new Set(efforts)];
}
function readStringArray(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
return [
...new Set(
value
.map((entry) => readNonEmptyString(entry))
.filter((entry): entry is string => entry !== undefined),
),
];
}
function readNonEmptyString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function isJsonObjectValue(value: unknown): value is JsonObject {
return Boolean(value && typeof value === "object" && !Array.isArray(value));
}
async function withTimeout<T>(
promise: Promise<T>,
timeoutMs: number,
timeoutMessage: string,
): Promise<T> {
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return await promise;
}
let timeout: NodeJS.Timeout | undefined;
try {
return await Promise.race([
promise,
new Promise<never>((_, reject) => {
timeout = setTimeout(() => reject(new Error(timeoutMessage)), Math.max(1, timeoutMs));
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}
export function isCodexAppServerApprovalRequest(method: string): boolean {
return method.includes("requestApproval") || method.includes("Approval");
}
function splitShellWords(value: string): string[] {
const words: string[] = [];
let current = "";
let quote: '"' | "'" | null = null;
for (const char of value) {
if (quote) {
if (char === quote) {
quote = null;
} else {
current += char;
}
continue;
}
if (char === '"' || char === "'") {
quote = char;
continue;
}
if (/\s/.test(char)) {
if (current) {
words.push(current);
current = "";
}
continue;
}
current += char;
}
if (current) {
words.push(current);
}
return words;
}
function formatExitValue(value: unknown): string {
if (value === null || value === undefined) {
return "null";
}
if (typeof value === "string" || typeof value === "number") {
return String(value);
}
return "unknown";
}

View File

@@ -0,0 +1,140 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CodexAppServerClient } from "./client.js";
import { maybeCompactCodexAppServerSession, __testing } from "./compact.js";
import type { CodexServerNotification } from "./protocol.js";
import { writeCodexAppServerBinding } from "./session-binding.js";
const OLD_RUNTIME = process.env.OPENCLAW_AGENT_RUNTIME;
let tempDir: string;
describe("maybeCompactCodexAppServerSession", () => {
beforeEach(async () => {
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-compact-"));
process.env.OPENCLAW_AGENT_RUNTIME = "codex-app-server";
});
afterEach(async () => {
__testing.resetCodexAppServerClientFactoryForTests();
if (OLD_RUNTIME === undefined) {
delete process.env.OPENCLAW_AGENT_RUNTIME;
} else {
process.env.OPENCLAW_AGENT_RUNTIME = OLD_RUNTIME;
}
await fs.rm(tempDir, { recursive: true, force: true });
});
it("waits for native app-server compaction before reporting success", async () => {
const fake = createFakeCodexClient();
__testing.setCodexAppServerClientFactoryForTests(async () => fake.client);
const sessionFile = path.join(tempDir, "session.jsonl");
await writeCodexAppServerBinding(sessionFile, {
threadId: "thread-1",
cwd: tempDir,
});
const pendingResult = maybeCompactCodexAppServerSession({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile,
workspaceDir: tempDir,
currentTokenCount: 123,
});
await vi.waitFor(() => {
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
});
let settled = false;
void pendingResult.then(() => {
settled = true;
});
await Promise.resolve();
expect(settled).toBe(false);
fake.emit({
method: "thread/compacted",
params: { threadId: "thread-1", turnId: "turn-1" },
});
const result = await pendingResult;
expect(result).toMatchObject({
ok: true,
compacted: true,
result: {
tokensBefore: 123,
details: {
backend: "codex-app-server",
threadId: "thread-1",
signal: "thread/compacted",
turnId: "turn-1",
},
},
});
});
it("accepts native context-compaction item completion as success", async () => {
const fake = createFakeCodexClient();
__testing.setCodexAppServerClientFactoryForTests(async () => fake.client);
const sessionFile = path.join(tempDir, "session.jsonl");
await writeCodexAppServerBinding(sessionFile, {
threadId: "thread-1",
cwd: tempDir,
});
const pendingResult = maybeCompactCodexAppServerSession({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile,
workspaceDir: tempDir,
});
await vi.waitFor(() => {
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
});
fake.emit({
method: "item/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
item: { type: "contextCompaction", id: "compact-1" },
},
});
await expect(pendingResult).resolves.toMatchObject({
ok: true,
compacted: true,
result: {
details: {
signal: "item/completed",
itemId: "compact-1",
},
},
});
});
});
function createFakeCodexClient(): {
client: CodexAppServerClient;
request: ReturnType<typeof vi.fn>;
emit: (notification: CodexServerNotification) => void;
} {
const handlers = new Set<(notification: CodexServerNotification) => void>();
const request = vi.fn(async () => ({}));
return {
client: {
request,
addNotificationHandler(handler: (notification: CodexServerNotification) => void) {
handlers.add(handler);
return () => handlers.delete(handler);
},
} as unknown as CodexAppServerClient,
request,
emit(notification: CodexServerNotification): void {
for (const handler of handlers) {
handler(notification);
}
},
};
}

View File

@@ -0,0 +1,227 @@
import {
embeddedAgentLog,
resolveEmbeddedAgentRuntime,
type CompactEmbeddedPiSessionParams,
type EmbeddedPiCompactResult,
} from "openclaw/plugin-sdk/agent-harness";
import {
getSharedCodexAppServerClient,
type CodexAppServerClient,
type CodexServerNotificationHandler,
} from "./client.js";
import { isJsonObject, type CodexServerNotification, type JsonObject } from "./protocol.js";
import { readCodexAppServerBinding } from "./session-binding.js";
type CodexAppServerClientFactory = () => Promise<CodexAppServerClient>;
type CodexNativeCompactionCompletion = {
signal: "thread/compacted" | "item/completed";
turnId?: string;
itemId?: string;
};
type CodexNativeCompactionWaiter = {
promise: Promise<CodexNativeCompactionCompletion>;
startTimeout: () => void;
cancel: () => void;
};
const DEFAULT_CODEX_COMPACTION_WAIT_TIMEOUT_MS = 5 * 60 * 1000;
let clientFactory: CodexAppServerClientFactory = getSharedCodexAppServerClient;
export async function maybeCompactCodexAppServerSession(
params: CompactEmbeddedPiSessionParams,
): Promise<EmbeddedPiCompactResult | undefined> {
const runtime = resolveEmbeddedAgentRuntime();
const provider = params.provider?.trim().toLowerCase();
const shouldUseCodex =
runtime === "codex" ||
(runtime === "auto" && (provider === "codex" || provider === "openai-codex"));
if (!shouldUseCodex) {
return undefined;
}
const binding = await readCodexAppServerBinding(params.sessionFile);
if (!binding?.threadId) {
if (runtime === "codex") {
return { ok: false, compacted: false, reason: "no codex app-server thread binding" };
}
return undefined;
}
const client = await clientFactory();
const waiter = createCodexNativeCompactionWaiter(client, binding.threadId);
let completion: CodexNativeCompactionCompletion;
try {
await client.request("thread/compact/start", {
threadId: binding.threadId,
});
embeddedAgentLog.info("started codex app-server compaction", {
sessionId: params.sessionId,
threadId: binding.threadId,
});
waiter.startTimeout();
completion = await waiter.promise;
} catch (error) {
waiter.cancel();
return {
ok: false,
compacted: false,
reason: formatCompactionError(error),
};
}
embeddedAgentLog.info("completed codex app-server compaction", {
sessionId: params.sessionId,
threadId: binding.threadId,
signal: completion.signal,
turnId: completion.turnId,
itemId: completion.itemId,
});
return {
ok: true,
compacted: true,
result: {
summary: "",
firstKeptEntryId: "",
tokensBefore: params.currentTokenCount ?? 0,
details: {
backend: "codex-app-server",
threadId: binding.threadId,
signal: completion.signal,
turnId: completion.turnId,
itemId: completion.itemId,
},
},
};
}
function createCodexNativeCompactionWaiter(
client: CodexAppServerClient,
threadId: string,
): CodexNativeCompactionWaiter {
let settled = false;
let removeHandler: () => void = () => {};
let timeout: ReturnType<typeof setTimeout> | undefined;
let failWaiter: (error: Error) => void = () => {};
const promise = new Promise<CodexNativeCompactionCompletion>((resolve, reject) => {
const cleanup = (): void => {
removeHandler();
if (timeout) {
clearTimeout(timeout);
}
};
const complete = (completion: CodexNativeCompactionCompletion): void => {
if (settled) {
return;
}
settled = true;
cleanup();
resolve(completion);
};
const fail = (error: Error): void => {
if (settled) {
return;
}
settled = true;
cleanup();
reject(error);
};
failWaiter = fail;
const handler: CodexServerNotificationHandler = (notification) => {
const completion = readNativeCompactionCompletion(notification, threadId);
if (completion) {
complete(completion);
}
};
removeHandler = client.addNotificationHandler(handler);
});
return {
promise,
startTimeout(): void {
if (settled || timeout) {
return;
}
timeout = setTimeout(() => {
failWaiter(new Error(`timed out waiting for codex app-server compaction for ${threadId}`));
}, resolveCompactionWaitTimeoutMs());
timeout.unref?.();
},
cancel(): void {
if (settled) {
return;
}
settled = true;
removeHandler();
if (timeout) {
clearTimeout(timeout);
}
},
};
}
function readNativeCompactionCompletion(
notification: CodexServerNotification,
threadId: string,
): CodexNativeCompactionCompletion | undefined {
const params = notification.params;
if (!isJsonObject(params) || readString(params, "threadId", "thread_id") !== threadId) {
return undefined;
}
if (notification.method === "thread/compacted") {
return {
signal: "thread/compacted",
turnId: readString(params, "turnId", "turn_id"),
};
}
if (notification.method !== "item/completed") {
return undefined;
}
const item = isJsonObject(params.item) ? params.item : undefined;
if (readString(item, "type") !== "contextCompaction") {
return undefined;
}
return {
signal: "item/completed",
turnId: readString(params, "turnId", "turn_id"),
itemId: readString(item, "id") ?? readString(params, "itemId", "item_id", "id"),
};
}
function resolveCompactionWaitTimeoutMs(): number {
const raw = process.env.OPENCLAW_CODEX_COMPACTION_WAIT_TIMEOUT_MS?.trim();
const parsed = raw ? Number.parseInt(raw, 10) : NaN;
if (Number.isFinite(parsed) && parsed > 0) {
return parsed;
}
return DEFAULT_CODEX_COMPACTION_WAIT_TIMEOUT_MS;
}
function readString(params: JsonObject | undefined, ...keys: string[]): string | undefined {
if (!params) {
return undefined;
}
for (const key of keys) {
const value = params[key];
if (typeof value === "string") {
return value;
}
}
return undefined;
}
function formatCompactionError(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
export const __testing = {
setCodexAppServerClientFactoryForTests(factory: CodexAppServerClientFactory): void {
clientFactory = factory;
},
resetCodexAppServerClientFactoryForTests(): void {
clientFactory = getSharedCodexAppServerClient;
},
} as const;

View File

@@ -0,0 +1,141 @@
import type { AgentToolResult } from "@mariozechner/pi-agent-core";
import type { AnyAgentTool } from "openclaw/plugin-sdk/agent-harness";
import { describe, expect, it, vi } from "vitest";
import { createCodexDynamicToolBridge } from "./dynamic-tools.js";
function createTool(overrides: Partial<AnyAgentTool>): AnyAgentTool {
return {
name: "tts",
description: "Convert text to speech.",
parameters: { type: "object", properties: {} },
execute: vi.fn(),
...overrides,
} as unknown as AnyAgentTool;
}
describe("createCodexDynamicToolBridge", () => {
it.each([
{ toolName: "tts", mediaUrl: "/tmp/reply.opus", audioAsVoice: true },
{ toolName: "image_generate", mediaUrl: "/tmp/generated.png" },
{ toolName: "video_generate", mediaUrl: "https://media.example/video.mp4" },
{ toolName: "music_generate", mediaUrl: "https://media.example/music.wav" },
])(
"preserves structured media artifacts from $toolName tool results",
async ({ toolName, mediaUrl, audioAsVoice }) => {
const toolResult = {
content: [{ type: "text", text: "Generated media reply." }],
details: {
media: {
mediaUrl,
...(audioAsVoice === true ? { audioAsVoice: true } : {}),
},
},
} satisfies AgentToolResult<unknown>;
const tool = createTool({
name: toolName,
execute: vi.fn(async () => toolResult),
});
const bridge = createCodexDynamicToolBridge({
tools: [tool],
signal: new AbortController().signal,
});
const result = await bridge.handleToolCall({
threadId: "thread-1",
turnId: "turn-1",
callId: "call-1",
tool: toolName,
arguments: { prompt: "hello" },
});
expect(result).toEqual({
success: true,
contentItems: [{ type: "inputText", text: "Generated media reply." }],
});
expect(bridge.telemetry.toolMediaUrls).toEqual([mediaUrl]);
expect(bridge.telemetry.toolAudioAsVoice).toBe(audioAsVoice === true);
},
);
it("preserves audio-as-voice metadata from tts results", async () => {
const toolResult = {
content: [{ type: "text", text: "Generated audio reply." }],
details: {
media: {
mediaUrl: "/tmp/reply.opus",
audioAsVoice: true,
},
},
} satisfies AgentToolResult<unknown>;
const tool = createTool({
execute: vi.fn(async () => toolResult),
});
const bridge = createCodexDynamicToolBridge({
tools: [tool],
signal: new AbortController().signal,
});
const result = await bridge.handleToolCall({
threadId: "thread-1",
turnId: "turn-1",
callId: "call-1",
tool: "tts",
arguments: { text: "hello" },
});
expect(result).toEqual({
success: true,
contentItems: [{ type: "inputText", text: "Generated audio reply." }],
});
expect(bridge.telemetry.toolMediaUrls).toEqual(["/tmp/reply.opus"]);
expect(bridge.telemetry.toolAudioAsVoice).toBe(true);
});
it("records messaging tool side effects while returning concise text to app-server", async () => {
const toolResult = {
content: [{ type: "text", text: "Sent." }],
details: { messageId: "message-1" },
} satisfies AgentToolResult<unknown>;
const tool = createTool({
name: "message",
execute: vi.fn(async () => toolResult),
});
const bridge = createCodexDynamicToolBridge({
tools: [tool],
signal: new AbortController().signal,
});
const result = await bridge.handleToolCall({
threadId: "thread-1",
turnId: "turn-1",
callId: "call-1",
tool: "message",
arguments: {
action: "send",
text: "hello from Codex",
mediaUrl: "/tmp/reply.png",
provider: "telegram",
to: "chat-1",
threadId: "thread-ts-1",
},
});
expect(result).toEqual({
success: true,
contentItems: [{ type: "inputText", text: "Sent." }],
});
expect(bridge.telemetry).toMatchObject({
didSendViaMessagingTool: true,
messagingToolSentTexts: ["hello from Codex"],
messagingToolSentMediaUrls: ["/tmp/reply.png"],
messagingToolSentTargets: [
{
tool: "message",
provider: "telegram",
to: "chat-1",
threadId: "thread-ts-1",
},
],
});
});
});

View File

@@ -0,0 +1,217 @@
import type { AgentToolResult } from "@mariozechner/pi-agent-core";
import type { ImageContent, TextContent } from "@mariozechner/pi-ai";
import {
extractToolResultMediaArtifact,
filterToolResultMediaUrls,
isMessagingTool,
isMessagingToolSendAction,
type AnyAgentTool,
type MessagingToolSend,
} from "openclaw/plugin-sdk/agent-harness";
import {
type CodexDynamicToolCallOutputContentItem,
type CodexDynamicToolCallParams,
type CodexDynamicToolCallResponse,
type CodexDynamicToolSpec,
type JsonValue,
} from "./protocol.js";
export type CodexDynamicToolBridge = {
specs: CodexDynamicToolSpec[];
handleToolCall: (params: CodexDynamicToolCallParams) => Promise<CodexDynamicToolCallResponse>;
telemetry: {
didSendViaMessagingTool: boolean;
messagingToolSentTexts: string[];
messagingToolSentMediaUrls: string[];
messagingToolSentTargets: MessagingToolSend[];
toolMediaUrls: string[];
toolAudioAsVoice: boolean;
successfulCronAdds?: number;
};
};
export function createCodexDynamicToolBridge(params: {
tools: AnyAgentTool[];
signal: AbortSignal;
}): CodexDynamicToolBridge {
const toolMap = new Map(params.tools.map((tool) => [tool.name, tool]));
const telemetry: CodexDynamicToolBridge["telemetry"] = {
didSendViaMessagingTool: false,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],
messagingToolSentTargets: [],
toolMediaUrls: [],
toolAudioAsVoice: false,
};
return {
specs: params.tools.map((tool) => ({
name: tool.name,
description: tool.description,
inputSchema: toJsonValue(tool.parameters),
})),
telemetry,
handleToolCall: async (call) => {
const tool = toolMap.get(call.tool);
if (!tool) {
return {
contentItems: [{ type: "inputText", text: `Unknown OpenClaw tool: ${call.tool}` }],
success: false,
};
}
const args = jsonObjectToRecord(call.arguments);
try {
const preparedArgs = tool.prepareArguments ? tool.prepareArguments(args) : args;
const result = await tool.execute(call.callId, preparedArgs, params.signal);
collectToolTelemetry({
toolName: tool.name,
args,
result,
telemetry,
isError: false,
});
return {
contentItems: result.content.flatMap(convertToolContent),
success: true,
};
} catch (error) {
collectToolTelemetry({
toolName: tool.name,
args,
result: undefined,
telemetry,
isError: true,
});
return {
contentItems: [
{
type: "inputText",
text: error instanceof Error ? error.message : String(error),
},
],
success: false,
};
}
},
};
}
function collectToolTelemetry(params: {
toolName: string;
args: Record<string, unknown>;
result: AgentToolResult<unknown> | undefined;
telemetry: CodexDynamicToolBridge["telemetry"];
isError: boolean;
}): void {
if (!params.isError && params.toolName === "cron" && isCronAddAction(params.args)) {
params.telemetry.successfulCronAdds = (params.telemetry.successfulCronAdds ?? 0) + 1;
}
if (!params.isError && params.result) {
const media = extractToolResultMediaArtifact(params.result);
if (media) {
const mediaUrls = filterToolResultMediaUrls(params.toolName, media.mediaUrls, params.result);
const seen = new Set(params.telemetry.toolMediaUrls);
for (const mediaUrl of mediaUrls) {
if (!seen.has(mediaUrl)) {
seen.add(mediaUrl);
params.telemetry.toolMediaUrls.push(mediaUrl);
}
}
if (media.audioAsVoice) {
params.telemetry.toolAudioAsVoice = true;
}
}
}
if (
!isMessagingTool(params.toolName) ||
!isMessagingToolSendAction(params.toolName, params.args)
) {
return;
}
params.telemetry.didSendViaMessagingTool = true;
const text = readFirstString(params.args, ["text", "message", "body", "content"]);
if (text) {
params.telemetry.messagingToolSentTexts.push(text);
}
params.telemetry.messagingToolSentMediaUrls.push(...collectMediaUrls(params.args));
params.telemetry.messagingToolSentTargets.push({
tool: params.toolName,
provider: readFirstString(params.args, ["provider", "channel"]) ?? params.toolName,
accountId: readFirstString(params.args, ["accountId", "account_id"]),
to: readFirstString(params.args, ["to", "target", "recipient"]),
threadId: readFirstString(params.args, ["threadId", "thread_id", "messageThreadId"]),
});
}
function convertToolContent(
content: TextContent | ImageContent,
): CodexDynamicToolCallOutputContentItem[] {
if (content.type === "text") {
return [{ type: "inputText", text: content.text }];
}
return [
{
type: "inputImage",
imageUrl: `data:${content.mimeType};base64,${content.data}`,
},
];
}
function toJsonValue(value: unknown): JsonValue {
try {
const text = JSON.stringify(value);
if (!text) {
return {};
}
return JSON.parse(text) as JsonValue;
} catch {
return {};
}
}
function jsonObjectToRecord(value: JsonValue | undefined): Record<string, unknown> {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return {};
}
return value as Record<string, unknown>;
}
function readFirstString(record: Record<string, unknown>, keys: string[]): string | undefined {
for (const key of keys) {
const value = record[key];
if (typeof value === "string" && value.trim()) {
return value.trim();
}
if (typeof value === "number" && Number.isFinite(value)) {
return String(value);
}
}
return undefined;
}
function collectMediaUrls(record: Record<string, unknown>): string[] {
const urls: string[] = [];
for (const key of ["mediaUrl", "media_url", "imageUrl", "image_url"]) {
const value = record[key];
if (typeof value === "string" && value.trim()) {
urls.push(value.trim());
}
}
for (const key of ["mediaUrls", "media_urls", "imageUrls", "image_urls"]) {
const value = record[key];
if (!Array.isArray(value)) {
continue;
}
for (const entry of value) {
if (typeof entry === "string" && entry.trim()) {
urls.push(entry.trim());
}
}
}
return urls;
}
function isCronAddAction(args: Record<string, unknown>): boolean {
const action = args.action;
return typeof action === "string" && action.trim().toLowerCase() === "add";
}

View File

@@ -0,0 +1,200 @@
import type { Api, Model } from "@mariozechner/pi-ai";
import type { EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness";
import { describe, expect, it, vi } from "vitest";
import { CodexAppServerEventProjector } from "./event-projector.js";
function createParams(): EmbeddedRunAttemptParams {
return {
prompt: "hello",
sessionId: "session-1",
provider: "openai-codex",
modelId: "gpt-5.4-codex",
model: {
id: "gpt-5.4-codex",
name: "gpt-5.4-codex",
provider: "openai-codex",
api: "openai-codex-responses",
input: ["text"],
reasoning: true,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 128_000,
maxTokens: 8_000,
} as Model<Api>,
thinkLevel: "medium",
} as unknown as EmbeddedRunAttemptParams;
}
describe("CodexAppServerEventProjector", () => {
it("projects assistant deltas and usage into embedded attempt results", async () => {
const onAssistantMessageStart = vi.fn();
const onPartialReply = vi.fn();
const params = {
...createParams(),
onAssistantMessageStart,
onPartialReply,
};
const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1");
await projector.handleNotification({
method: "item/agentMessage/delta",
params: { threadId: "thread-1", turnId: "turn-1", itemId: "msg-1", delta: "hel" },
});
await projector.handleNotification({
method: "item/agentMessage/delta",
params: { threadId: "thread-1", turnId: "turn-1", itemId: "msg-1", delta: "lo" },
});
await projector.handleNotification({
method: "thread/tokenUsage/updated",
params: {
threadId: "thread-1",
turnId: "turn-1",
tokenUsage: {
total: {
totalTokens: 12,
inputTokens: 5,
cachedInputTokens: 2,
outputTokens: 7,
},
},
},
});
await projector.handleNotification({
method: "turn/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
turn: {
id: "turn-1",
status: "completed",
items: [{ type: "agentMessage", id: "msg-1", text: "hello" }],
},
},
});
const result = projector.buildResult({
didSendViaMessagingTool: false,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],
messagingToolSentTargets: [],
});
expect(onAssistantMessageStart).toHaveBeenCalledTimes(1);
expect(onPartialReply).toHaveBeenLastCalledWith({ text: "hello" });
expect(result.assistantTexts).toEqual(["hello"]);
expect(result.lastAssistant?.content).toEqual([{ type: "text", text: "hello" }]);
expect(result.attemptUsage).toMatchObject({ input: 5, output: 7, cacheRead: 2, total: 12 });
expect(result.replayMetadata.replaySafe).toBe(true);
});
it("ignores notifications for other turns", async () => {
const params = createParams();
const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1");
await projector.handleNotification({
method: "item/agentMessage/delta",
params: { threadId: "thread-1", turnId: "turn-2", itemId: "msg-1", delta: "wrong" },
});
const result = projector.buildResult({
didSendViaMessagingTool: false,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],
messagingToolSentTargets: [],
});
expect(result.assistantTexts).toEqual([]);
});
it("projects reasoning end, plan updates, compaction state, and tool metadata", async () => {
const onReasoningStream = vi.fn();
const onReasoningEnd = vi.fn();
const onAgentEvent = vi.fn();
const params = {
...createParams(),
onReasoningStream,
onReasoningEnd,
onAgentEvent,
};
const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1");
await projector.handleNotification({
method: "item/reasoning/textDelta",
params: { threadId: "thread-1", turnId: "turn-1", itemId: "reason-1", delta: "thinking" },
});
await projector.handleNotification({
method: "item/plan/delta",
params: { threadId: "thread-1", turnId: "turn-1", itemId: "plan-1", delta: "- inspect\n" },
});
await projector.handleNotification({
method: "turn/plan/updated",
params: {
threadId: "thread-1",
turnId: "turn-1",
explanation: "next",
plan: [{ step: "patch", status: "in_progress" }],
},
});
await projector.handleNotification({
method: "item/started",
params: {
threadId: "thread-1",
turnId: "turn-1",
item: { type: "contextCompaction", id: "compact-1" },
},
});
expect(projector.isCompacting()).toBe(true);
await projector.handleNotification({
method: "item/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
item: { type: "contextCompaction", id: "compact-1" },
},
});
expect(projector.isCompacting()).toBe(false);
await projector.handleNotification({
method: "item/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
item: {
type: "dynamicToolCall",
id: "tool-1",
tool: "sessions_send",
status: "completed",
},
},
});
await projector.handleNotification({
method: "turn/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
turn: { id: "turn-1", status: "completed", items: [] },
},
});
const result = projector.buildResult({
didSendViaMessagingTool: false,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],
messagingToolSentTargets: [],
});
expect(onReasoningStream).toHaveBeenCalledWith({ text: "thinking" });
expect(onReasoningEnd).toHaveBeenCalledTimes(1);
expect(onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "plan",
data: expect.objectContaining({ steps: ["patch (in_progress)"] }),
}),
);
expect(onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "compaction",
data: expect.objectContaining({ phase: "start", itemId: "compact-1" }),
}),
);
expect(result.toolMetas).toEqual([{ toolName: "sessions_send", meta: "completed" }]);
expect(result.itemLifecycle).toMatchObject({ compactionCount: 1 });
});
});

View File

@@ -0,0 +1,604 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage, Usage } from "@mariozechner/pi-ai";
import {
formatErrorMessage,
normalizeUsage,
type NormalizedUsage,
type EmbeddedRunAttemptParams,
type EmbeddedRunAttemptResult,
type MessagingToolSend,
} from "openclaw/plugin-sdk/agent-harness";
import {
isJsonObject,
type CodexServerNotification,
type CodexThreadItem,
type CodexTurn,
type JsonObject,
type JsonValue,
} from "./protocol.js";
export type CodexAppServerToolTelemetry = {
didSendViaMessagingTool: boolean;
messagingToolSentTexts: string[];
messagingToolSentMediaUrls: string[];
messagingToolSentTargets: MessagingToolSend[];
toolMediaUrls?: string[];
toolAudioAsVoice?: boolean;
successfulCronAdds?: number;
};
const ZERO_USAGE: Usage = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
},
};
export class CodexAppServerEventProjector {
private readonly assistantTextByItem = new Map<string, string>();
private readonly reasoningTextByItem = new Map<string, string>();
private readonly planTextByItem = new Map<string, string>();
private readonly activeItemIds = new Set<string>();
private readonly completedItemIds = new Set<string>();
private readonly activeCompactionItemIds = new Set<string>();
private readonly toolMetas = new Map<string, { toolName: string; meta?: string }>();
private assistantStarted = false;
private reasoningStarted = false;
private reasoningEnded = false;
private completedTurn: CodexTurn | undefined;
private promptError: unknown;
private promptErrorSource: EmbeddedRunAttemptResult["promptErrorSource"] = null;
private aborted = false;
private tokenUsage: NormalizedUsage | undefined;
private guardianReviewCount = 0;
private completedCompactionCount = 0;
constructor(
private readonly params: EmbeddedRunAttemptParams,
private readonly threadId: string,
private readonly turnId: string,
) {}
async handleNotification(notification: CodexServerNotification): Promise<void> {
const params = isJsonObject(notification.params) ? notification.params : undefined;
if (!params || !this.isNotificationForTurn(params)) {
return;
}
switch (notification.method) {
case "item/agentMessage/delta":
await this.handleAssistantDelta(params);
break;
case "item/reasoning/summaryTextDelta":
case "item/reasoning/textDelta":
await this.handleReasoningDelta(params);
break;
case "item/plan/delta":
this.handlePlanDelta(params);
break;
case "turn/plan/updated":
this.handleTurnPlanUpdated(params);
break;
case "item/started":
this.handleItemStarted(params);
break;
case "item/completed":
this.handleItemCompleted(params);
break;
case "item/autoApprovalReview/started":
case "item/autoApprovalReview/completed":
this.guardianReviewCount += 1;
this.params.onAgentEvent?.({
stream: "codex_app_server.guardian",
data: { method: notification.method },
});
break;
case "thread/tokenUsage/updated":
this.handleTokenUsage(params);
break;
case "turn/completed":
await this.handleTurnCompleted(params);
break;
case "error":
this.promptError = readString(params, "message") ?? "codex app-server error";
this.promptErrorSource = "prompt";
break;
default:
break;
}
}
buildResult(toolTelemetry: CodexAppServerToolTelemetry): EmbeddedRunAttemptResult {
const assistantTexts = this.collectAssistantTexts();
const lastAssistant =
assistantTexts.length > 0
? this.createAssistantMessage(assistantTexts.join("\n\n"))
: undefined;
const messagesSnapshot: AgentMessage[] = [
{
role: "user",
content: this.params.prompt,
timestamp: Date.now(),
},
];
if (lastAssistant) {
messagesSnapshot.push(lastAssistant);
}
const turnFailed = this.completedTurn?.status === "failed";
const turnInterrupted = this.completedTurn?.status === "interrupted";
const promptError =
this.promptError ??
(turnFailed ? (this.completedTurn?.error?.message ?? "codex app-server turn failed") : null);
return {
aborted: this.aborted || turnInterrupted,
timedOut: false,
idleTimedOut: false,
timedOutDuringCompaction: false,
promptError,
promptErrorSource: promptError ? this.promptErrorSource || "prompt" : null,
sessionIdUsed: this.params.sessionId,
bootstrapPromptWarningSignaturesSeen: this.params.bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature: this.params.bootstrapPromptWarningSignature,
messagesSnapshot,
assistantTexts,
toolMetas: [...this.toolMetas.values()],
lastAssistant,
didSendViaMessagingTool: toolTelemetry.didSendViaMessagingTool,
messagingToolSentTexts: toolTelemetry.messagingToolSentTexts,
messagingToolSentMediaUrls: toolTelemetry.messagingToolSentMediaUrls,
messagingToolSentTargets: toolTelemetry.messagingToolSentTargets,
toolMediaUrls: toolTelemetry.toolMediaUrls,
toolAudioAsVoice: toolTelemetry.toolAudioAsVoice,
successfulCronAdds: toolTelemetry.successfulCronAdds,
cloudCodeAssistFormatError: false,
attemptUsage: this.tokenUsage,
replayMetadata: {
hadPotentialSideEffects: toolTelemetry.didSendViaMessagingTool,
replaySafe: !toolTelemetry.didSendViaMessagingTool,
},
itemLifecycle: {
startedCount: this.activeItemIds.size + this.completedItemIds.size,
completedCount: this.completedItemIds.size,
activeCount: this.activeItemIds.size,
...(this.completedCompactionCount > 0
? { compactionCount: this.completedCompactionCount }
: {}),
},
yieldDetected: false,
didSendDeterministicApprovalPrompt: this.guardianReviewCount > 0 ? false : undefined,
};
}
markTimedOut(): void {
this.aborted = true;
this.promptError = "codex app-server attempt timed out";
this.promptErrorSource = "prompt";
}
isCompacting(): boolean {
return this.activeCompactionItemIds.size > 0;
}
private async handleAssistantDelta(params: JsonObject): Promise<void> {
const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "assistant";
const delta = readString(params, "delta") ?? "";
if (!delta) {
return;
}
if (!this.assistantStarted) {
this.assistantStarted = true;
await this.params.onAssistantMessageStart?.();
}
const text = `${this.assistantTextByItem.get(itemId) ?? ""}${delta}`;
this.assistantTextByItem.set(itemId, text);
await this.params.onPartialReply?.({ text });
}
private async handleReasoningDelta(params: JsonObject): Promise<void> {
const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "reasoning";
const delta = readString(params, "delta") ?? "";
if (!delta) {
return;
}
this.reasoningStarted = true;
this.reasoningTextByItem.set(itemId, `${this.reasoningTextByItem.get(itemId) ?? ""}${delta}`);
await this.params.onReasoningStream?.({ text: delta });
}
private handlePlanDelta(params: JsonObject): void {
const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "plan";
const delta = readString(params, "delta") ?? "";
if (!delta) {
return;
}
const text = `${this.planTextByItem.get(itemId) ?? ""}${delta}`;
this.planTextByItem.set(itemId, text);
this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(text) });
}
private handleTurnPlanUpdated(params: JsonObject): void {
const plan = Array.isArray(params.plan)
? params.plan.flatMap((entry) => {
if (!isJsonObject(entry)) {
return [];
}
const step = readString(entry, "step");
const status = readString(entry, "status");
if (!step) {
return [];
}
return status ? [`${step} (${status})`] : [step];
})
: undefined;
this.emitPlanUpdate({
explanation: readNullableString(params, "explanation"),
steps: plan,
});
}
private handleItemStarted(params: JsonObject): void {
const item = readItem(params.item);
const itemId = item?.id ?? readString(params, "itemId") ?? readString(params, "id");
if (itemId) {
this.activeItemIds.add(itemId);
}
if (item?.type === "contextCompaction" && itemId) {
this.activeCompactionItemIds.add(itemId);
this.params.onAgentEvent?.({
stream: "compaction",
data: {
phase: "start",
backend: "codex-app-server",
threadId: this.threadId,
turnId: this.turnId,
itemId,
},
});
}
this.emitStandardItemEvent({ phase: "start", item });
this.params.onAgentEvent?.({
stream: "codex_app_server.item",
data: { phase: "started", itemId, type: item?.type },
});
}
private handleItemCompleted(params: JsonObject): void {
const item = readItem(params.item);
const itemId = item?.id ?? readString(params, "itemId") ?? readString(params, "id");
if (itemId) {
this.activeItemIds.delete(itemId);
this.completedItemIds.add(itemId);
}
if (item?.type === "agentMessage" && typeof item.text === "string" && item.text) {
this.assistantTextByItem.set(item.id, item.text);
}
if (item?.type === "plan" && typeof item.text === "string" && item.text) {
this.planTextByItem.set(item.id, item.text);
this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(item.text) });
}
if (item?.type === "contextCompaction" && itemId) {
this.activeCompactionItemIds.delete(itemId);
this.completedCompactionCount += 1;
this.params.onAgentEvent?.({
stream: "compaction",
data: {
phase: "end",
backend: "codex-app-server",
threadId: this.threadId,
turnId: this.turnId,
itemId,
},
});
}
this.recordToolMeta(item);
this.emitStandardItemEvent({ phase: "end", item });
this.params.onAgentEvent?.({
stream: "codex_app_server.item",
data: { phase: "completed", itemId, type: item?.type },
});
}
private handleTokenUsage(params: JsonObject): void {
const tokenUsage = isJsonObject(params.tokenUsage) ? params.tokenUsage : undefined;
const total = tokenUsage && isJsonObject(tokenUsage.total) ? tokenUsage.total : undefined;
if (!total) {
return;
}
this.tokenUsage = normalizeUsage({
input: readNumber(total, "inputTokens"),
output: readNumber(total, "outputTokens"),
cacheRead: readNumber(total, "cachedInputTokens"),
total: readNumber(total, "totalTokens"),
});
}
private async handleTurnCompleted(params: JsonObject): Promise<void> {
const turn = readTurn(params.turn);
if (!turn || turn.id !== this.turnId) {
return;
}
this.completedTurn = turn;
if (turn.status === "interrupted") {
this.aborted = true;
}
if (turn.status === "failed") {
this.promptError = turn.error?.message ?? "codex app-server turn failed";
this.promptErrorSource = "prompt";
}
for (const item of turn.items ?? []) {
if (item.type === "agentMessage" && typeof item.text === "string" && item.text) {
this.assistantTextByItem.set(item.id, item.text);
}
if (item.type === "plan" && typeof item.text === "string" && item.text) {
this.planTextByItem.set(item.id, item.text);
this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(item.text) });
}
this.recordToolMeta(item);
}
this.activeCompactionItemIds.clear();
await this.maybeEndReasoning();
}
private async maybeEndReasoning(): Promise<void> {
if (!this.reasoningStarted || this.reasoningEnded) {
return;
}
this.reasoningEnded = true;
await this.params.onReasoningEnd?.();
}
private emitPlanUpdate(params: { explanation?: string | null; steps?: string[] }): void {
if (!params.explanation && (!params.steps || params.steps.length === 0)) {
return;
}
this.params.onAgentEvent?.({
stream: "plan",
data: {
phase: "update",
title: "Plan updated",
source: "codex-app-server",
...(params.explanation ? { explanation: params.explanation } : {}),
...(params.steps && params.steps.length > 0 ? { steps: params.steps } : {}),
},
});
}
private emitStandardItemEvent(params: {
phase: "start" | "end";
item: CodexThreadItem | undefined;
}): void {
const { item } = params;
if (!item) {
return;
}
const kind = itemKind(item);
if (!kind) {
return;
}
this.params.onAgentEvent?.({
stream: "item",
data: {
itemId: item.id,
phase: params.phase,
kind,
title: itemTitle(item),
status: params.phase === "start" ? "running" : itemStatus(item),
...(itemName(item) ? { name: itemName(item) } : {}),
...(itemMeta(item) ? { meta: itemMeta(item) } : {}),
},
});
}
private recordToolMeta(item: CodexThreadItem | undefined): void {
if (!item) {
return;
}
const toolName = itemName(item);
if (!toolName) {
return;
}
this.toolMetas.set(item.id, {
toolName,
...(itemMeta(item) ? { meta: itemMeta(item) } : {}),
});
}
private collectAssistantTexts(): string[] {
return [...this.assistantTextByItem.values()].filter((text) => text.trim().length > 0);
}
private createAssistantMessage(text: string): AssistantMessage {
const usage: Usage = this.tokenUsage
? {
input: this.tokenUsage.input ?? 0,
output: this.tokenUsage.output ?? 0,
cacheRead: this.tokenUsage.cacheRead ?? 0,
cacheWrite: this.tokenUsage.cacheWrite ?? 0,
totalTokens:
this.tokenUsage.total ??
(this.tokenUsage.input ?? 0) +
(this.tokenUsage.output ?? 0) +
(this.tokenUsage.cacheRead ?? 0) +
(this.tokenUsage.cacheWrite ?? 0),
cost: ZERO_USAGE.cost,
}
: ZERO_USAGE;
return {
role: "assistant",
content: [{ type: "text", text }],
api: this.params.model.api ?? "openai-codex-responses",
provider: this.params.provider,
model: this.params.modelId,
usage,
stopReason: this.aborted ? "aborted" : this.promptError ? "error" : "stop",
errorMessage: this.promptError ? formatErrorMessage(this.promptError) : undefined,
timestamp: Date.now(),
};
}
private isNotificationForTurn(params: JsonObject): boolean {
const threadId = readString(params, "threadId");
const turnId = readString(params, "turnId");
return (!threadId || threadId === this.threadId) && (!turnId || turnId === this.turnId);
}
}
function readString(record: JsonObject, key: string): string | undefined {
const value = record[key];
return typeof value === "string" ? value : undefined;
}
function readNullableString(record: JsonObject, key: string): string | null | undefined {
const value = record[key];
if (value === null) {
return null;
}
return typeof value === "string" ? value : undefined;
}
function readNumber(record: JsonObject, key: string): number | undefined {
const value = record[key];
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function splitPlanText(text: string): string[] {
return text
.split(/\r?\n/)
.map((line) => line.trim().replace(/^[-*]\s+/, ""))
.filter((line) => line.length > 0);
}
function itemKind(
item: CodexThreadItem,
): "tool" | "command" | "patch" | "search" | "analysis" | undefined {
switch (item.type) {
case "dynamicToolCall":
case "mcpToolCall":
return "tool";
case "commandExecution":
return "command";
case "fileChange":
return "patch";
case "webSearch":
return "search";
case "reasoning":
case "contextCompaction":
return "analysis";
default:
return undefined;
}
}
function itemTitle(item: CodexThreadItem): string {
switch (item.type) {
case "commandExecution":
return "Command";
case "fileChange":
return "File change";
case "mcpToolCall":
return "MCP tool";
case "dynamicToolCall":
return "Tool";
case "webSearch":
return "Web search";
case "contextCompaction":
return "Context compaction";
case "reasoning":
return "Reasoning";
default:
return item.type;
}
}
function itemStatus(item: CodexThreadItem): "completed" | "failed" | "running" {
const status = readItemString(item, "status");
if (status === "failed") {
return "failed";
}
if (status === "inProgress" || status === "running") {
return "running";
}
return "completed";
}
function itemName(item: CodexThreadItem): string | undefined {
if (item.type === "dynamicToolCall" && typeof item.tool === "string") {
return item.tool;
}
if (item.type === "mcpToolCall" && typeof item.tool === "string") {
const server = typeof item.server === "string" ? item.server : undefined;
return server ? `${server}.${item.tool}` : item.tool;
}
if (item.type === "commandExecution") {
return "bash";
}
if (item.type === "fileChange") {
return "apply_patch";
}
if (item.type === "webSearch") {
return "web_search";
}
return undefined;
}
function itemMeta(item: CodexThreadItem): string | undefined {
if (item.type === "commandExecution" && typeof item.command === "string") {
return item.command;
}
if (item.type === "webSearch" && typeof item.query === "string") {
return item.query;
}
return readItemString(item, "status");
}
function readItemString(item: CodexThreadItem, key: string): string | undefined {
const value = (item as Record<string, unknown>)[key];
return typeof value === "string" ? value : undefined;
}
function readItem(value: JsonValue | undefined): CodexThreadItem | undefined {
if (!isJsonObject(value)) {
return undefined;
}
const type = typeof value.type === "string" ? value.type : undefined;
const id = typeof value.id === "string" ? value.id : undefined;
if (!type || !id) {
return undefined;
}
return value as CodexThreadItem;
}
function readTurn(value: JsonValue | undefined): CodexTurn | undefined {
if (!isJsonObject(value)) {
return undefined;
}
const id = typeof value.id === "string" ? value.id : undefined;
const status = typeof value.status === "string" ? value.status : undefined;
if (!id || !status) {
return undefined;
}
const items = Array.isArray(value.items)
? value.items.flatMap((item) => {
const parsed = readItem(item);
return parsed ? [parsed] : [];
})
: undefined;
return {
id,
status: status as CodexTurn["status"],
error: isJsonObject(value.error)
? {
message: typeof value.error.message === "string" ? value.error.message : undefined,
}
: null,
items,
};
}

View File

@@ -0,0 +1,185 @@
export type JsonPrimitive = null | boolean | number | string;
export type JsonValue = JsonPrimitive | JsonValue[] | { [key: string]: JsonValue };
export type JsonObject = { [key: string]: JsonValue };
export type RpcRequest = {
id?: number | string;
method: string;
params?: JsonValue;
};
export type RpcResponse = {
id: number | string;
result?: JsonValue;
error?: {
code?: number;
message: string;
data?: JsonValue;
};
};
export type RpcMessage = RpcRequest | RpcResponse;
export type CodexUserInput =
| {
type: "text";
text: string;
}
| {
type: "image";
url: string;
}
| {
type: "localImage";
path: string;
};
export type CodexDynamicToolSpec = {
name: string;
description: string;
inputSchema: JsonValue;
deferLoading?: boolean;
};
export type CodexThreadStartParams = {
model?: string | null;
modelProvider?: string | null;
cwd?: string | null;
approvalPolicy?: "never" | "on-request" | "on-failure" | "untrusted";
approvalsReviewer?: "user" | "guardian_subagent";
sandbox?: "read-only" | "workspace-write" | "danger-full-access";
config?: JsonObject | null;
serviceName?: string | null;
baseInstructions?: string | null;
developerInstructions?: string | null;
ephemeral?: boolean | null;
dynamicTools?: CodexDynamicToolSpec[] | null;
experimentalRawEvents: boolean;
persistExtendedHistory: boolean;
};
export type CodexThreadResumeParams = {
threadId: string;
};
export type CodexThreadStartResponse = {
thread: CodexThread;
model?: string | null;
modelProvider?: string | null;
};
export type CodexThreadResumeResponse = CodexThreadStartResponse;
export type CodexTurnStartParams = {
threadId: string;
input: CodexUserInput[];
cwd?: string | null;
approvalPolicy?: "never" | "on-request" | "on-failure" | "untrusted";
approvalsReviewer?: "user" | "guardian_subagent";
model?: string | null;
effort?: "minimal" | "low" | "medium" | "high" | "xhigh" | null;
};
export type CodexTurnSteerParams = {
threadId: string;
expectedTurnId: string;
input: CodexUserInput[];
};
export type CodexTurnInterruptParams = {
threadId: string;
turnId: string;
};
export type CodexTurnStartResponse = {
turn: CodexTurn;
};
export type CodexThread = {
id: string;
status?: string;
cwd?: string | null;
turns?: CodexTurn[];
};
export type CodexTurn = {
id: string;
status: "completed" | "interrupted" | "failed" | "inProgress";
error?: {
message?: string;
} | null;
items?: CodexThreadItem[];
};
export type CodexThreadItem =
| {
type: "agentMessage";
id: string;
text?: string;
}
| {
type: "reasoning";
id: string;
summary?: string[];
content?: string[];
}
| {
type: "plan";
id: string;
text?: string;
}
| {
type: "dynamicToolCall";
id: string;
tool?: string;
status?: string;
}
| {
type: string;
id: string;
status?: string;
[key: string]: JsonValue | undefined;
};
export type CodexServerNotification = {
method: string;
params?: JsonValue;
};
export type CodexDynamicToolCallParams = {
threadId: string;
turnId: string;
callId: string;
tool: string;
arguments?: JsonValue;
};
export type CodexDynamicToolCallResponse = {
contentItems: CodexDynamicToolCallOutputContentItem[];
success: boolean;
};
export type CodexDynamicToolCallOutputContentItem =
| {
type: "inputText";
text: string;
}
| {
type: "inputImage";
imageUrl: string;
};
export function isJsonObject(value: JsonValue | undefined): value is JsonObject {
return Boolean(value && typeof value === "object" && !Array.isArray(value));
}
export function isRpcResponse(message: RpcMessage): message is RpcResponse {
return "id" in message && !("method" in message);
}
export function coerceJsonObject(value: unknown): JsonObject | undefined {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return undefined;
}
return value as JsonObject;
}

View File

@@ -0,0 +1,229 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { Api, Model } from "@mariozechner/pi-ai";
import {
abortAgentHarnessRun,
queueAgentHarnessMessage,
type EmbeddedRunAttemptParams,
} from "openclaw/plugin-sdk/agent-harness";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CodexServerNotification } from "./protocol.js";
import { runCodexAppServerAttempt, __testing } from "./run-attempt.js";
let tempDir: string;
function createParams(sessionFile: string, workspaceDir: string): EmbeddedRunAttemptParams {
return {
prompt: "hello",
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile,
workspaceDir,
runId: "run-1",
provider: "codex",
modelId: "gpt-5.4-codex",
model: {
id: "gpt-5.4-codex",
name: "gpt-5.4-codex",
provider: "codex",
api: "openai-codex-responses",
input: ["text"],
reasoning: true,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 128_000,
maxTokens: 8_000,
} as Model<Api>,
thinkLevel: "medium",
disableTools: true,
timeoutMs: 5_000,
authStorage: {} as never,
modelRegistry: {} as never,
} as EmbeddedRunAttemptParams;
}
describe("runCodexAppServerAttempt", () => {
beforeEach(async () => {
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-run-"));
});
afterEach(async () => {
__testing.resetCodexAppServerClientFactoryForTests();
vi.restoreAllMocks();
await fs.rm(tempDir, { recursive: true, force: true });
});
it("forwards queued user input and aborts the active app-server turn", async () => {
const requests: Array<{ method: string; params: unknown }> = [];
const request = vi.fn(async (method: string, params?: unknown) => {
requests.push({ method, params });
if (method === "thread/start") {
return { thread: { id: "thread-1" }, model: "gpt-5.4-codex", modelProvider: "openai" };
}
if (method === "turn/start") {
return { turn: { id: "turn-1", status: "inProgress" } };
}
return {};
});
__testing.setCodexAppServerClientFactoryForTests(
async () =>
({
request,
addNotificationHandler: () => () => undefined,
addRequestHandler: () => () => undefined,
}) as never,
);
const run = runCodexAppServerAttempt(
createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
);
await vi.waitFor(() =>
expect(requests.some((entry) => entry.method === "turn/start")).toBe(true),
);
expect(queueAgentHarnessMessage("session-1", "more context")).toBe(true);
await vi.waitFor(() =>
expect(requests.some((entry) => entry.method === "turn/steer")).toBe(true),
);
expect(abortAgentHarnessRun("session-1")).toBe(true);
await vi.waitFor(() =>
expect(requests.some((entry) => entry.method === "turn/interrupt")).toBe(true),
);
const result = await run;
expect(result.aborted).toBe(true);
expect(requests).toEqual(
expect.arrayContaining([
{
method: "thread/start",
params: expect.objectContaining({
model: "gpt-5.4-codex",
modelProvider: "openai",
}),
},
{
method: "turn/steer",
params: {
threadId: "thread-1",
expectedTurnId: "turn-1",
input: [{ type: "text", text: "more context" }],
},
},
{
method: "turn/interrupt",
params: { threadId: "thread-1", turnId: "turn-1" },
},
]),
);
});
it("forwards image attachments to the app-server turn input", async () => {
const requests: Array<{ method: string; params: unknown }> = [];
let notify: (notification: CodexServerNotification) => Promise<void> = async () => undefined;
const request = vi.fn(async (method: string, params?: unknown) => {
requests.push({ method, params });
if (method === "thread/start") {
return { thread: { id: "thread-1" }, model: "gpt-5.4-codex", modelProvider: "openai" };
}
if (method === "turn/start") {
return { turn: { id: "turn-1", status: "inProgress" } };
}
return {};
});
__testing.setCodexAppServerClientFactoryForTests(
async () =>
({
request,
addNotificationHandler: (handler: typeof notify) => {
notify = handler;
return () => undefined;
},
addRequestHandler: () => () => undefined,
}) as never,
);
const params = createParams(
path.join(tempDir, "session.jsonl"),
path.join(tempDir, "workspace"),
);
params.model = {
...params.model,
input: ["text", "image"],
} as Model<Api>;
params.images = [
{
type: "image",
mimeType: "image/png",
data: "aW1hZ2UtYnl0ZXM=",
},
];
const run = runCodexAppServerAttempt(params);
await vi.waitFor(() =>
expect(requests.some((entry) => entry.method === "turn/start")).toBe(true),
);
await notify({
method: "turn/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
turn: { id: "turn-1", status: "completed" },
},
});
await run;
expect(requests).toEqual(
expect.arrayContaining([
{
method: "turn/start",
params: expect.objectContaining({
input: [
{ type: "text", text: "hello" },
{ type: "image", url: "data:image/png;base64,aW1hZ2UtYnl0ZXM=" },
],
}),
},
]),
);
});
it("does not drop turn completion notifications emitted while turn/start is in flight", async () => {
let notify: (notification: CodexServerNotification) => Promise<void> = async () => undefined;
const request = vi.fn(async (method: string) => {
if (method === "thread/start") {
return { thread: { id: "thread-1" }, model: "gpt-5.4-codex", modelProvider: "openai" };
}
if (method === "turn/start") {
await notify({
method: "turn/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
turn: { id: "turn-1", status: "completed" },
},
});
return { turn: { id: "turn-1", status: "completed" } };
}
return {};
});
__testing.setCodexAppServerClientFactoryForTests(
async () =>
({
request,
addNotificationHandler: (handler: typeof notify) => {
notify = handler;
return () => undefined;
},
addRequestHandler: () => () => undefined,
}) as never,
);
await expect(
runCodexAppServerAttempt(
createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
),
).resolves.toMatchObject({
aborted: false,
timedOut: false,
});
});
});

View File

@@ -0,0 +1,577 @@
import fs from "node:fs/promises";
import {
buildEmbeddedAttemptToolRunContext,
clearActiveEmbeddedRun,
createOpenClawCodingTools,
embeddedAgentLog,
isSubagentSessionKey,
normalizeProviderToolSchemas,
resolveAttemptSpawnWorkspaceDir,
resolveModelAuthMode,
resolveOpenClawAgentDir,
resolveSandboxContext,
resolveSessionAgentIds,
resolveUserPath,
setActiveEmbeddedRun,
supportsModelTools,
type EmbeddedRunAttemptParams,
type EmbeddedRunAttemptResult,
} from "openclaw/plugin-sdk/agent-harness";
import { handleCodexAppServerApprovalRequest } from "./approval-bridge.js";
import {
getSharedCodexAppServerClient,
isCodexAppServerApprovalRequest,
type CodexAppServerClient,
} from "./client.js";
import { createCodexDynamicToolBridge } from "./dynamic-tools.js";
import { CodexAppServerEventProjector } from "./event-projector.js";
import {
isJsonObject,
type CodexServerNotification,
type CodexDynamicToolCallParams,
type CodexThreadResumeResponse,
type CodexThreadStartResponse,
type CodexTurnStartResponse,
type CodexUserInput,
type JsonObject,
type JsonValue,
} from "./protocol.js";
import {
clearCodexAppServerBinding,
readCodexAppServerBinding,
writeCodexAppServerBinding,
type CodexAppServerThreadBinding,
} from "./session-binding.js";
import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
type CodexAppServerClientFactory = () => Promise<CodexAppServerClient>;
let clientFactory: CodexAppServerClientFactory = getSharedCodexAppServerClient;
export async function runCodexAppServerAttempt(
params: EmbeddedRunAttemptParams,
): Promise<EmbeddedRunAttemptResult> {
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
await fs.mkdir(resolvedWorkspace, { recursive: true });
const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
const sandbox = await resolveSandboxContext({
config: params.config,
sessionKey: sandboxSessionKey,
workspaceDir: resolvedWorkspace,
});
const effectiveWorkspace = sandbox?.enabled
? sandbox.workspaceAccess === "rw"
? resolvedWorkspace
: sandbox.workspaceDir
: resolvedWorkspace;
await fs.mkdir(effectiveWorkspace, { recursive: true });
const runAbortController = new AbortController();
const abortFromUpstream = () => {
runAbortController.abort(params.abortSignal?.reason ?? "upstream_abort");
};
if (params.abortSignal?.aborted) {
abortFromUpstream();
} else {
params.abortSignal?.addEventListener("abort", abortFromUpstream, { once: true });
}
const { sessionAgentId } = resolveSessionAgentIds({
sessionKey: params.sessionKey,
config: params.config,
agentId: params.agentId,
});
const tools = await buildDynamicTools({
params,
resolvedWorkspace,
effectiveWorkspace,
sandboxSessionKey,
sandbox,
runAbortController,
sessionAgentId,
});
const toolBridge = createCodexDynamicToolBridge({
tools,
signal: runAbortController.signal,
});
const client = await clientFactory();
const thread = await startOrResumeThread({
client,
params,
cwd: effectiveWorkspace,
dynamicTools: toolBridge.specs,
});
let projector: CodexAppServerEventProjector | undefined;
let turnId: string | undefined;
const pendingNotifications: CodexServerNotification[] = [];
let completed = false;
let timedOut = false;
let resolveCompletion: (() => void) | undefined;
const completion = new Promise<void>((resolve) => {
resolveCompletion = resolve;
});
const handleNotification = async (notification: CodexServerNotification) => {
if (!projector || !turnId) {
pendingNotifications.push(notification);
return;
}
await projector.handleNotification(notification);
if (
notification.method === "turn/completed" &&
isTurnNotification(notification.params, turnId)
) {
completed = true;
resolveCompletion?.();
}
};
const notificationCleanup = client.addNotificationHandler(handleNotification);
const requestCleanup = client.addRequestHandler(async (request) => {
if (!turnId) {
return undefined;
}
if (request.method !== "item/tool/call") {
if (isCodexAppServerApprovalRequest(request.method)) {
return handleApprovalRequest({
method: request.method,
params: request.params,
paramsForRun: params,
threadId: thread.threadId,
turnId,
signal: runAbortController.signal,
});
}
return undefined;
}
const call = readDynamicToolCallParams(request.params);
if (!call || call.threadId !== thread.threadId || call.turnId !== turnId) {
return undefined;
}
return toolBridge.handleToolCall(call) as Promise<JsonValue>;
});
let turn: CodexTurnStartResponse;
try {
turn = await client.request<CodexTurnStartResponse>("turn/start", {
threadId: thread.threadId,
input: buildUserInput(params),
cwd: effectiveWorkspace,
approvalPolicy: resolveAppServerApprovalPolicy(),
approvalsReviewer: resolveApprovalsReviewer(),
model: params.modelId,
effort: resolveReasoningEffort(params.thinkLevel),
});
} catch (error) {
notificationCleanup();
requestCleanup();
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
throw error;
}
turnId = turn.turn.id;
projector = new CodexAppServerEventProjector(params, thread.threadId, turnId);
for (const notification of pendingNotifications.splice(0)) {
await handleNotification(notification);
}
const activeTurnId = turnId;
const activeProjector = projector;
const handle = {
kind: "embedded" as const,
queueMessage: async (text: string) => {
await client.request("turn/steer", {
threadId: thread.threadId,
expectedTurnId: activeTurnId,
input: [{ type: "text", text }],
});
},
isStreaming: () => !completed,
isCompacting: () => projector?.isCompacting() ?? false,
cancel: () => runAbortController.abort("cancelled"),
abort: () => runAbortController.abort("aborted"),
};
setActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
const timeout = setTimeout(
() => {
timedOut = true;
projector?.markTimedOut();
runAbortController.abort("timeout");
},
Math.max(100, params.timeoutMs),
);
const abortListener = () => {
void client.request("turn/interrupt", {
threadId: thread.threadId,
turnId: activeTurnId,
});
resolveCompletion?.();
};
runAbortController.signal.addEventListener("abort", abortListener, { once: true });
if (runAbortController.signal.aborted) {
abortListener();
}
try {
await completion;
const result = activeProjector.buildResult(toolBridge.telemetry);
await mirrorTranscriptBestEffort({
params,
result,
threadId: thread.threadId,
turnId: activeTurnId,
});
return {
...result,
timedOut,
aborted: result.aborted || runAbortController.signal.aborted,
promptError: timedOut ? "codex app-server attempt timed out" : result.promptError,
promptErrorSource: timedOut ? "prompt" : result.promptErrorSource,
};
} finally {
clearTimeout(timeout);
notificationCleanup();
requestCleanup();
runAbortController.signal.removeEventListener("abort", abortListener);
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
}
}
type DynamicToolBuildParams = {
params: EmbeddedRunAttemptParams;
resolvedWorkspace: string;
effectiveWorkspace: string;
sandboxSessionKey: string;
sandbox: Awaited<ReturnType<typeof resolveSandboxContext>>;
runAbortController: AbortController;
sessionAgentId: string | undefined;
};
async function buildDynamicTools(input: DynamicToolBuildParams) {
const { params } = input;
if (params.disableTools || !supportsModelTools(params.model)) {
return [];
}
const modelHasVision = params.model.input?.includes("image") ?? false;
const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
const allTools = createOpenClawCodingTools({
agentId: input.sessionAgentId,
...buildEmbeddedAttemptToolRunContext(params),
exec: {
...params.execOverrides,
elevated: params.bashElevated,
},
sandbox: input.sandbox,
messageProvider: params.messageChannel ?? params.messageProvider,
agentAccountId: params.agentAccountId,
messageTo: params.messageTo,
messageThreadId: params.messageThreadId,
groupId: params.groupId,
groupChannel: params.groupChannel,
groupSpace: params.groupSpace,
spawnedBy: params.spawnedBy,
senderId: params.senderId,
senderName: params.senderName,
senderUsername: params.senderUsername,
senderE164: params.senderE164,
senderIsOwner: params.senderIsOwner,
allowGatewaySubagentBinding: params.allowGatewaySubagentBinding,
sessionKey: input.sandboxSessionKey,
sessionId: params.sessionId,
runId: params.runId,
agentDir,
workspaceDir: input.effectiveWorkspace,
spawnWorkspaceDir: resolveAttemptSpawnWorkspaceDir({
sandbox: input.sandbox,
resolvedWorkspace: input.resolvedWorkspace,
}),
config: params.config,
abortSignal: input.runAbortController.signal,
modelProvider: params.model.provider,
modelId: params.modelId,
modelCompat: params.model.compat,
modelApi: params.model.api,
modelContextWindowTokens: params.model.contextWindow,
modelAuthMode: resolveModelAuthMode(params.model.provider, params.config),
currentChannelId: params.currentChannelId,
currentThreadTs: params.currentThreadTs,
currentMessageId: params.currentMessageId,
replyToMode: params.replyToMode,
hasRepliedRef: params.hasRepliedRef,
modelHasVision,
requireExplicitMessageTarget:
params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey),
disableMessageTool: params.disableMessageTool,
onYield: (message) => {
params.onAgentEvent?.({
stream: "codex_app_server.tool",
data: { name: "sessions_yield", message },
});
input.runAbortController.abort("sessions_yield");
},
});
const filteredTools =
params.toolsAllow && params.toolsAllow.length > 0
? allTools.filter((tool) => params.toolsAllow?.includes(tool.name))
: allTools;
return normalizeProviderToolSchemas({
tools: filteredTools,
provider: params.provider,
config: params.config,
workspaceDir: input.effectiveWorkspace,
env: process.env,
modelId: params.modelId,
modelApi: params.model.api,
model: params.model,
});
}
async function startOrResumeThread(params: {
client: CodexAppServerClient;
params: EmbeddedRunAttemptParams;
cwd: string;
dynamicTools: JsonValue[];
}): Promise<CodexAppServerThreadBinding> {
const dynamicToolsFingerprint = fingerprintDynamicTools(params.dynamicTools);
const binding = await readCodexAppServerBinding(params.params.sessionFile);
if (binding?.threadId) {
if (binding.dynamicToolsFingerprint !== dynamicToolsFingerprint) {
embeddedAgentLog.debug(
"codex app-server dynamic tool catalog changed; starting a new thread",
{
threadId: binding.threadId,
},
);
await clearCodexAppServerBinding(params.params.sessionFile);
} else {
try {
const response = await params.client.request<CodexThreadResumeResponse>("thread/resume", {
threadId: binding.threadId,
});
await writeCodexAppServerBinding(params.params.sessionFile, {
threadId: response.thread.id,
cwd: params.cwd,
model: params.params.modelId,
modelProvider: response.modelProvider ?? normalizeModelProvider(params.params.provider),
dynamicToolsFingerprint,
createdAt: binding.createdAt,
});
return {
...binding,
threadId: response.thread.id,
cwd: params.cwd,
model: params.params.modelId,
modelProvider: response.modelProvider ?? normalizeModelProvider(params.params.provider),
dynamicToolsFingerprint,
};
} catch (error) {
embeddedAgentLog.warn("codex app-server thread resume failed; starting a new thread", {
error,
});
await clearCodexAppServerBinding(params.params.sessionFile);
}
}
}
const response = await params.client.request<CodexThreadStartResponse>("thread/start", {
model: params.params.modelId,
modelProvider: normalizeModelProvider(params.params.provider),
cwd: params.cwd,
approvalPolicy: resolveAppServerApprovalPolicy(),
approvalsReviewer: resolveApprovalsReviewer(),
sandbox: resolveAppServerSandbox(),
serviceName: "OpenClaw",
developerInstructions: buildDeveloperInstructions(params.params),
dynamicTools: params.dynamicTools,
experimentalRawEvents: true,
persistExtendedHistory: true,
});
const createdAt = new Date().toISOString();
await writeCodexAppServerBinding(params.params.sessionFile, {
threadId: response.thread.id,
cwd: params.cwd,
model: response.model ?? params.params.modelId,
modelProvider: response.modelProvider ?? normalizeModelProvider(params.params.provider),
dynamicToolsFingerprint,
createdAt,
});
return {
schemaVersion: 1,
threadId: response.thread.id,
sessionFile: params.params.sessionFile,
cwd: params.cwd,
model: response.model ?? params.params.modelId,
modelProvider: response.modelProvider ?? normalizeModelProvider(params.params.provider),
dynamicToolsFingerprint,
createdAt,
updatedAt: createdAt,
};
}
function fingerprintDynamicTools(dynamicTools: JsonValue[]): string {
return JSON.stringify(dynamicTools.map(stabilizeJsonValue));
}
function stabilizeJsonValue(value: JsonValue): JsonValue {
if (Array.isArray(value)) {
return value.map(stabilizeJsonValue);
}
if (!isJsonObject(value)) {
return value;
}
const stable: JsonObject = {};
for (const [key, child] of Object.entries(value).toSorted(([left], [right]) =>
left.localeCompare(right),
)) {
stable[key] = stabilizeJsonValue(child);
}
return stable;
}
function buildDeveloperInstructions(params: EmbeddedRunAttemptParams): string {
const sections = [
"You are running inside OpenClaw. Use OpenClaw dynamic tools for messaging, cron, sessions, and host actions when available.",
"Preserve the user's existing channel/session context. If sending a channel reply, use the OpenClaw messaging tool instead of describing that you would reply.",
params.extraSystemPrompt,
params.skillsSnapshot?.prompt,
];
return sections.filter((section) => typeof section === "string" && section.trim()).join("\n\n");
}
function buildUserInput(params: EmbeddedRunAttemptParams): CodexUserInput[] {
return [
{ type: "text", text: params.prompt },
...(params.images ?? []).map(
(image): CodexUserInput => ({
type: "image",
url: `data:${image.mimeType};base64,${image.data}`,
}),
),
];
}
function normalizeModelProvider(provider: string): string {
return provider === "codex" || provider === "openai-codex" ? "openai" : provider;
}
function resolveAppServerApprovalPolicy(): "never" | "on-request" | "on-failure" | "untrusted" {
const raw = process.env.OPENCLAW_CODEX_APP_SERVER_APPROVAL_POLICY?.trim();
if (raw === "on-request" || raw === "on-failure" || raw === "untrusted") {
return raw;
}
return "never";
}
function resolveAppServerSandbox(): "read-only" | "workspace-write" | "danger-full-access" {
const raw = process.env.OPENCLAW_CODEX_APP_SERVER_SANDBOX?.trim();
if (raw === "read-only" || raw === "danger-full-access") {
return raw;
}
return "workspace-write";
}
function resolveApprovalsReviewer(): "user" | "guardian_subagent" {
return process.env.OPENCLAW_CODEX_APP_SERVER_GUARDIAN === "1" ? "guardian_subagent" : "user";
}
function resolveReasoningEffort(
thinkLevel: EmbeddedRunAttemptParams["thinkLevel"],
): "minimal" | "low" | "medium" | "high" | "xhigh" | null {
if (
thinkLevel === "minimal" ||
thinkLevel === "low" ||
thinkLevel === "medium" ||
thinkLevel === "high" ||
thinkLevel === "xhigh"
) {
return thinkLevel;
}
return null;
}
function readDynamicToolCallParams(
value: JsonValue | undefined,
): CodexDynamicToolCallParams | undefined {
if (!isJsonObject(value)) {
return undefined;
}
const threadId = readString(value, "threadId");
const turnId = readString(value, "turnId");
const callId = readString(value, "callId");
const tool = readString(value, "tool");
if (!threadId || !turnId || !callId || !tool) {
return undefined;
}
return {
threadId,
turnId,
callId,
tool,
arguments: value.arguments,
};
}
function isTurnNotification(value: JsonValue | undefined, turnId: string): boolean {
if (!isJsonObject(value)) {
return false;
}
const directTurnId = readString(value, "turnId");
if (directTurnId === turnId) {
return true;
}
const turn = isJsonObject(value.turn) ? value.turn : undefined;
return readString(turn ?? {}, "id") === turnId;
}
function readString(record: JsonObject, key: string): string | undefined {
const value = record[key];
return typeof value === "string" ? value : undefined;
}
async function mirrorTranscriptBestEffort(params: {
params: EmbeddedRunAttemptParams;
result: EmbeddedRunAttemptResult;
threadId: string;
turnId: string;
}): Promise<void> {
try {
await mirrorCodexAppServerTranscript({
sessionFile: params.params.sessionFile,
sessionKey: params.params.sessionKey,
messages: params.result.messagesSnapshot,
idempotencyScope: `codex-app-server:${params.threadId}:${params.turnId}`,
});
} catch (error) {
embeddedAgentLog.warn("failed to mirror codex app-server transcript", { error });
}
}
function handleApprovalRequest(params: {
method: string;
params: JsonValue | undefined;
paramsForRun: EmbeddedRunAttemptParams;
threadId: string;
turnId: string;
signal?: AbortSignal;
}): Promise<JsonValue | undefined> {
return handleCodexAppServerApprovalRequest({
method: params.method,
requestParams: params.params,
paramsForRun: params.paramsForRun,
threadId: params.threadId,
turnId: params.turnId,
signal: params.signal,
});
}
export const __testing = {
setCodexAppServerClientFactoryForTests(factory: CodexAppServerClientFactory): void {
clientFactory = factory;
},
resetCodexAppServerClientFactoryForTests(): void {
clientFactory = getSharedCodexAppServerClient;
},
} as const;

View File

@@ -0,0 +1,52 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
clearCodexAppServerBinding,
readCodexAppServerBinding,
resolveCodexAppServerBindingPath,
writeCodexAppServerBinding,
} from "./session-binding.js";
let tempDir: string;
describe("codex app-server session binding", () => {
beforeEach(async () => {
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-binding-"));
});
afterEach(async () => {
await fs.rm(tempDir, { recursive: true, force: true });
});
it("round-trips the thread binding beside the PI session file", async () => {
const sessionFile = path.join(tempDir, "session.json");
await writeCodexAppServerBinding(sessionFile, {
threadId: "thread-123",
cwd: tempDir,
model: "gpt-5.4-codex",
modelProvider: "openai",
dynamicToolsFingerprint: "tools-v1",
});
const binding = await readCodexAppServerBinding(sessionFile);
expect(binding).toMatchObject({
schemaVersion: 1,
threadId: "thread-123",
sessionFile,
cwd: tempDir,
model: "gpt-5.4-codex",
modelProvider: "openai",
dynamicToolsFingerprint: "tools-v1",
});
await expect(fs.stat(resolveCodexAppServerBindingPath(sessionFile))).resolves.toBeTruthy();
});
it("clears missing bindings without throwing", async () => {
const sessionFile = path.join(tempDir, "missing.json");
await clearCodexAppServerBinding(sessionFile);
await expect(readCodexAppServerBinding(sessionFile)).resolves.toBeUndefined();
});
});

View File

@@ -0,0 +1,98 @@
import fs from "node:fs/promises";
import { embeddedAgentLog } from "openclaw/plugin-sdk/agent-harness";
export type CodexAppServerThreadBinding = {
schemaVersion: 1;
threadId: string;
sessionFile: string;
cwd: string;
model?: string;
modelProvider?: string;
dynamicToolsFingerprint?: string;
createdAt: string;
updatedAt: string;
};
export function resolveCodexAppServerBindingPath(sessionFile: string): string {
return `${sessionFile}.codex-app-server.json`;
}
export async function readCodexAppServerBinding(
sessionFile: string,
): Promise<CodexAppServerThreadBinding | undefined> {
const path = resolveCodexAppServerBindingPath(sessionFile);
let raw: string;
try {
raw = await fs.readFile(path, "utf8");
} catch (error) {
if (isNotFound(error)) {
return undefined;
}
embeddedAgentLog.warn("failed to read codex app-server binding", { path, error });
return undefined;
}
try {
const parsed = JSON.parse(raw) as Partial<CodexAppServerThreadBinding>;
if (parsed.schemaVersion !== 1 || typeof parsed.threadId !== "string") {
return undefined;
}
return {
schemaVersion: 1,
threadId: parsed.threadId,
sessionFile,
cwd: typeof parsed.cwd === "string" ? parsed.cwd : "",
model: typeof parsed.model === "string" ? parsed.model : undefined,
modelProvider: typeof parsed.modelProvider === "string" ? parsed.modelProvider : undefined,
dynamicToolsFingerprint:
typeof parsed.dynamicToolsFingerprint === "string"
? parsed.dynamicToolsFingerprint
: undefined,
createdAt: typeof parsed.createdAt === "string" ? parsed.createdAt : new Date().toISOString(),
updatedAt: typeof parsed.updatedAt === "string" ? parsed.updatedAt : new Date().toISOString(),
};
} catch (error) {
embeddedAgentLog.warn("failed to parse codex app-server binding", { path, error });
return undefined;
}
}
export async function writeCodexAppServerBinding(
sessionFile: string,
binding: Omit<
CodexAppServerThreadBinding,
"schemaVersion" | "sessionFile" | "createdAt" | "updatedAt"
> & {
createdAt?: string;
},
): Promise<void> {
const now = new Date().toISOString();
const payload: CodexAppServerThreadBinding = {
schemaVersion: 1,
sessionFile,
threadId: binding.threadId,
cwd: binding.cwd,
model: binding.model,
modelProvider: binding.modelProvider,
dynamicToolsFingerprint: binding.dynamicToolsFingerprint,
createdAt: binding.createdAt ?? now,
updatedAt: now,
};
await fs.writeFile(
resolveCodexAppServerBindingPath(sessionFile),
`${JSON.stringify(payload, null, 2)}\n`,
);
}
export async function clearCodexAppServerBinding(sessionFile: string): Promise<void> {
try {
await fs.unlink(resolveCodexAppServerBindingPath(sessionFile));
} catch (error) {
if (!isNotFound(error)) {
embeddedAgentLog.warn("failed to clear codex app-server binding", { sessionFile, error });
}
}
}
function isNotFound(error: unknown): boolean {
return Boolean(error && typeof error === "object" && "code" in error && error.code === "ENOENT");
}

View File

@@ -0,0 +1,98 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
let tempDir: string;
describe("mirrorCodexAppServerTranscript", () => {
beforeEach(async () => {
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-transcript-"));
});
afterEach(async () => {
await fs.rm(tempDir, { recursive: true, force: true });
});
it("mirrors user and assistant messages into the PI transcript", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
await mirrorCodexAppServerTranscript({
sessionFile,
sessionKey: "agent:main:session-1",
messages: [
{ role: "user", content: "hello", timestamp: 1 },
{
role: "assistant",
content: [{ type: "text", text: "hi" }],
api: "openai-codex-responses",
provider: "openai-codex",
model: "gpt-5.4-codex",
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop",
timestamp: 2,
},
],
});
const records = (await fs.readFile(sessionFile, "utf8"))
.trim()
.split("\n")
.map((line) => JSON.parse(line) as { type?: string; message?: { role?: string } });
expect(records[0]?.type).toBe("session");
expect(records.slice(1).map((record) => record.message?.role)).toEqual(["user", "assistant"]);
});
it("deduplicates app-server turn mirrors by idempotency scope", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const messages = [
{ role: "user" as const, content: "hello", timestamp: 1 },
{
role: "assistant" as const,
content: [{ type: "text" as const, text: "hi" }],
api: "openai-codex-responses",
provider: "openai-codex",
model: "gpt-5.4-codex",
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop" as const,
timestamp: 2,
},
];
await mirrorCodexAppServerTranscript({
sessionFile,
messages,
idempotencyScope: "codex-app-server:thread-1:turn-1",
});
await mirrorCodexAppServerTranscript({
sessionFile,
messages,
idempotencyScope: "codex-app-server:thread-1:turn-1",
});
const records = (await fs.readFile(sessionFile, "utf8"))
.trim()
.split("\n")
.map((line) => JSON.parse(line) as { message?: { role?: string; idempotencyKey?: string } });
expect(records.slice(1).map((record) => record.message?.role)).toEqual(["user", "assistant"]);
expect(records.slice(1).map((record) => record.message?.idempotencyKey)).toEqual([
"codex-app-server:thread-1:turn-1:user:0",
"codex-app-server:thread-1:turn-1:assistant:1",
]);
});
});

View File

@@ -0,0 +1,83 @@
import fs from "node:fs/promises";
import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
acquireSessionWriteLock,
emitSessionTranscriptUpdate,
} from "openclaw/plugin-sdk/agent-harness";
export async function mirrorCodexAppServerTranscript(params: {
sessionFile: string;
sessionKey?: string;
messages: AgentMessage[];
idempotencyScope?: string;
}): Promise<void> {
const messages = params.messages.filter(
(message) => message.role === "user" || message.role === "assistant",
);
if (messages.length === 0) {
return;
}
await fs.mkdir(path.dirname(params.sessionFile), { recursive: true });
const lock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
timeoutMs: 10_000,
});
try {
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
const sessionManager = SessionManager.open(params.sessionFile);
for (const [index, message] of messages.entries()) {
const idempotencyKey = params.idempotencyScope
? `${params.idempotencyScope}:${message.role}:${index}`
: undefined;
if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) {
continue;
}
const transcriptMessage = {
...message,
...(idempotencyKey ? { idempotencyKey } : {}),
} as Parameters<SessionManager["appendMessage"]>[0];
sessionManager.appendMessage(transcriptMessage);
if (idempotencyKey) {
existingIdempotencyKeys.add(idempotencyKey);
}
}
} finally {
await lock.release();
}
if (params.sessionKey) {
emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey });
} else {
emitSessionTranscriptUpdate(params.sessionFile);
}
}
async function readTranscriptIdempotencyKeys(sessionFile: string): Promise<Set<string>> {
const keys = new Set<string>();
let raw: string;
try {
raw = await fs.readFile(sessionFile, "utf8");
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
throw error;
}
return keys;
}
for (const line of raw.split(/\r?\n/)) {
if (!line.trim()) {
continue;
}
try {
const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } };
if (typeof parsed.message?.idempotencyKey === "string") {
keys.add(parsed.message.idempotencyKey);
}
} catch {
continue;
}
}
return keys;
}

View File

@@ -0,0 +1,48 @@
import type { AgentHarness } from "openclaw/plugin-sdk/agent-harness";
import { listCodexAppServerModels } from "./app-server/client.js";
import type {
CodexAppServerListModelsOptions,
CodexAppServerModel,
CodexAppServerModelListResult,
} from "./app-server/client.js";
import { maybeCompactCodexAppServerSession } from "./app-server/compact.js";
import { runCodexAppServerAttempt } from "./app-server/run-attempt.js";
import { clearCodexAppServerBinding } from "./app-server/session-binding.js";
const DEFAULT_CODEX_HARNESS_PROVIDER_IDS = new Set(["codex", "openai-codex"]);
export type { CodexAppServerListModelsOptions, CodexAppServerModel, CodexAppServerModelListResult };
export { listCodexAppServerModels };
export function createCodexAppServerAgentHarness(options?: {
id?: string;
label?: string;
providerIds?: Iterable<string>;
}): AgentHarness {
const providerIds = new Set(
[...(options?.providerIds ?? DEFAULT_CODEX_HARNESS_PROVIDER_IDS)].map((id) =>
id.trim().toLowerCase(),
),
);
return {
id: options?.id ?? "codex",
label: options?.label ?? "Codex agent harness",
supports: (ctx) => {
const provider = ctx.provider.trim().toLowerCase();
if (providerIds.has(provider)) {
return { supported: true, priority: 100 };
}
return {
supported: false,
reason: `provider is not one of: ${[...providerIds].toSorted().join(", ")}`,
};
},
runAttempt: runCodexAppServerAttempt,
compact: maybeCompactCodexAppServerSession,
reset: async (params) => {
if (params.sessionFile) {
await clearCodexAppServerBinding(params.sessionFile);
}
},
};
}

View File

@@ -0,0 +1,29 @@
import { describe, expect, it, vi } from "vitest";
import { createTestPluginApi } from "../../test/helpers/plugins/plugin-api.js";
import plugin from "./index.js";
describe("codex plugin", () => {
it("registers the codex provider and agent harness", () => {
const registerAgentHarness = vi.fn();
const registerProvider = vi.fn();
plugin.register(
createTestPluginApi({
id: "codex",
name: "Codex",
source: "test",
config: {},
pluginConfig: {},
runtime: {} as never,
registerAgentHarness,
registerProvider,
}),
);
expect(registerProvider.mock.calls[0]?.[0]).toMatchObject({ id: "codex", label: "Codex" });
expect(registerAgentHarness.mock.calls[0]?.[0]).toMatchObject({
id: "codex",
label: "Codex agent harness",
});
});
});

13
extensions/codex/index.ts Normal file
View File

@@ -0,0 +1,13 @@
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
import { createCodexAppServerAgentHarness } from "./harness.js";
import { buildCodexProvider } from "./provider.js";
export default definePluginEntry({
id: "codex",
name: "Codex",
description: "Codex app-server harness and Codex-managed GPT model catalog.",
register(api) {
api.registerAgentHarness(createCodexAppServerAgentHarness());
api.registerProvider(buildCodexProvider({ pluginConfig: api.pluginConfig }));
},
});

View File

@@ -0,0 +1,44 @@
{
"id": "codex",
"enabledByDefault": true,
"name": "Codex",
"description": "Codex app-server harness and Codex-managed GPT model catalog.",
"providers": ["codex"],
"modelSupport": {
"providerIds": ["codex"],
"modelPrefixes": ["gpt-", "o1", "o3", "o4", "arcanine"]
},
"configSchema": {
"type": "object",
"additionalProperties": false,
"properties": {
"discovery": {
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": { "type": "boolean" },
"timeoutMs": {
"type": "number",
"minimum": 1,
"default": 2500
}
}
}
}
},
"uiHints": {
"discovery": {
"label": "Model Discovery",
"help": "Plugin-owned controls for discovering Codex app-server models."
},
"discovery.enabled": {
"label": "Enable Discovery",
"help": "When false, OpenClaw keeps the Codex harness available but uses the bundled fallback model list."
},
"discovery.timeoutMs": {
"label": "Discovery Timeout",
"help": "Maximum time to wait for Codex app-server model discovery before falling back to the bundled model list.",
"advanced": true
}
}
}

View File

@@ -0,0 +1,17 @@
{
"name": "@openclaw/codex",
"version": "2026.4.9",
"description": "OpenClaw Codex harness and model provider plugin",
"type": "module",
"dependencies": {
"@mariozechner/pi-coding-agent": "0.65.2"
},
"devDependencies": {
"@openclaw/plugin-sdk": "workspace:*"
},
"openclaw": {
"extensions": [
"./index.ts"
]
}
}

View File

@@ -0,0 +1,109 @@
import { describe, expect, it, vi } from "vitest";
import { buildCodexProvider, buildCodexProviderCatalog } from "./provider.js";
describe("codex provider", () => {
it("maps Codex app-server models to a Codex provider catalog", async () => {
const listModels = vi.fn(async () => ({
models: [
{
id: "gpt-5.4",
model: "gpt-5.4",
displayName: "gpt-5.4",
hidden: false,
inputModalities: ["text", "image"],
supportedReasoningEfforts: ["low", "medium", "high", "xhigh"],
},
{
id: "hidden-model",
model: "hidden-model",
hidden: true,
inputModalities: ["text"],
supportedReasoningEfforts: [],
},
],
}));
const result = await buildCodexProviderCatalog({
env: {},
listModels,
pluginConfig: { discovery: { timeoutMs: 1234 } },
});
expect(listModels).toHaveBeenCalledWith({ limit: 100, timeoutMs: 1234 });
expect(result.provider).toMatchObject({
auth: "token",
api: "openai-codex-responses",
models: [
{
id: "gpt-5.4",
name: "gpt-5.4",
reasoning: true,
input: ["text", "image"],
compat: { supportsReasoningEffort: true },
},
],
});
});
it("keeps a static fallback catalog when discovery is disabled", async () => {
const listModels = vi.fn();
const result = await buildCodexProviderCatalog({
env: {},
listModels,
pluginConfig: { discovery: { enabled: false } },
});
expect(listModels).not.toHaveBeenCalled();
expect(result.provider.models.map((model) => model.id)).toEqual([
"gpt-5.4",
"gpt-5.4-mini",
"gpt-5.2",
]);
});
it("resolves arbitrary Codex app-server model ids through the codex provider", () => {
const provider = buildCodexProvider();
const model = provider.resolveDynamicModel?.({
provider: "codex",
modelId: " arcanine ",
modelRegistry: { find: () => null },
} as never);
expect(model).toMatchObject({
id: "arcanine",
provider: "codex",
api: "openai-codex-responses",
baseUrl: "https://chatgpt.com/backend-api",
input: ["text", "image"],
});
});
it("treats o4 ids as reasoning-capable Codex models", () => {
const provider = buildCodexProvider();
const model = provider.resolveDynamicModel?.({
provider: "codex",
modelId: "o4-mini",
modelRegistry: { find: () => null },
} as never);
expect(model).toMatchObject({
id: "o4-mini",
reasoning: true,
compat: { supportsReasoningEffort: true },
});
expect(provider.supportsXHighThinking?.({ provider: "codex", modelId: "o4-mini" })).toBe(true);
});
it("declares synthetic auth because the harness owns Codex credentials", () => {
const provider = buildCodexProvider();
expect(provider.resolveSyntheticAuth?.({ provider: "codex" })).toEqual({
apiKey: "codex-app-server",
source: "codex-app-server",
mode: "token",
});
});
});

View File

@@ -0,0 +1,222 @@
import type { ProviderRuntimeModel } from "openclaw/plugin-sdk/plugin-entry";
import {
normalizeModelCompat,
type ModelDefinitionConfig,
type ModelProviderConfig,
type ProviderPlugin,
} from "openclaw/plugin-sdk/provider-model-shared";
import {
listCodexAppServerModels,
type CodexAppServerModel,
type CodexAppServerModelListResult,
} from "./harness.js";
const PROVIDER_ID = "codex";
const CODEX_BASE_URL = "https://chatgpt.com/backend-api";
const DEFAULT_CONTEXT_WINDOW = 272_000;
const DEFAULT_MAX_TOKENS = 128_000;
const DEFAULT_DISCOVERY_TIMEOUT_MS = 2500;
const LIVE_DISCOVERY_ENV = "OPENCLAW_CODEX_DISCOVERY_LIVE";
type CodexPluginConfig = {
discovery?: {
enabled?: boolean;
timeoutMs?: number;
};
};
type CodexModelLister = (options: {
timeoutMs: number;
limit?: number;
}) => Promise<CodexAppServerModelListResult>;
type BuildCodexProviderOptions = {
pluginConfig?: unknown;
listModels?: CodexModelLister;
};
type BuildCatalogOptions = {
env?: NodeJS.ProcessEnv;
pluginConfig?: unknown;
listModels?: CodexModelLister;
};
const FALLBACK_CODEX_MODELS = [
{
id: "gpt-5.4",
model: "gpt-5.4",
displayName: "gpt-5.4",
description: "Latest frontier agentic coding model.",
isDefault: true,
inputModalities: ["text", "image"],
supportedReasoningEfforts: ["low", "medium", "high", "xhigh"],
},
{
id: "gpt-5.4-mini",
model: "gpt-5.4-mini",
displayName: "GPT-5.4-Mini",
description: "Smaller frontier agentic coding model.",
inputModalities: ["text", "image"],
supportedReasoningEfforts: ["low", "medium", "high", "xhigh"],
},
{
id: "gpt-5.2",
model: "gpt-5.2",
displayName: "gpt-5.2",
inputModalities: ["text", "image"],
supportedReasoningEfforts: ["low", "medium", "high", "xhigh"],
},
] satisfies CodexAppServerModel[];
export function buildCodexProvider(options: BuildCodexProviderOptions = {}): ProviderPlugin {
return {
id: PROVIDER_ID,
label: "Codex",
docsPath: "/providers/models",
auth: [],
catalog: {
order: "late",
run: async (ctx) =>
buildCodexProviderCatalog({
env: ctx.env,
pluginConfig: options.pluginConfig,
listModels: options.listModels,
}),
},
resolveDynamicModel: (ctx) => resolveCodexDynamicModel(ctx.modelId),
resolveSyntheticAuth: () => ({
apiKey: "codex-app-server",
source: "codex-app-server",
mode: "token",
}),
supportsXHighThinking: ({ modelId }) => isKnownXHighCodexModel(modelId),
isModernModelRef: ({ modelId }) => isModernCodexModel(modelId),
};
}
export async function buildCodexProviderCatalog(
options: BuildCatalogOptions = {},
): Promise<{ provider: ModelProviderConfig }> {
const config = readCodexPluginConfig(options.pluginConfig);
const timeoutMs = normalizeTimeoutMs(config.discovery?.timeoutMs);
const discovered =
config.discovery?.enabled === false || shouldSkipLiveDiscovery(options.env)
? []
: await listModelsBestEffort({
listModels: options.listModels ?? listCodexAppServerModels,
timeoutMs,
});
const models = (discovered.length > 0 ? discovered : FALLBACK_CODEX_MODELS).map(
codexModelToDefinition,
);
return {
provider: {
baseUrl: CODEX_BASE_URL,
auth: "token",
api: "openai-codex-responses",
models,
},
};
}
function resolveCodexDynamicModel(modelId: string): ProviderRuntimeModel | undefined {
const id = modelId.trim();
if (!id) {
return undefined;
}
return normalizeModelCompat({
...buildModelDefinition({
id,
model: id,
inputModalities: ["text", "image"],
supportedReasoningEfforts: shouldDefaultToReasoningModel(id) ? ["medium"] : [],
}),
provider: PROVIDER_ID,
baseUrl: CODEX_BASE_URL,
} as ProviderRuntimeModel);
}
function codexModelToDefinition(model: CodexAppServerModel): ModelDefinitionConfig {
return buildModelDefinition(model);
}
function buildModelDefinition(model: {
id: string;
model: string;
displayName?: string;
inputModalities: string[];
supportedReasoningEfforts: string[];
}): ModelDefinitionConfig {
const id = model.id.trim() || model.model.trim();
return {
id,
name: model.displayName?.trim() || id,
api: "openai-codex-responses",
reasoning: model.supportedReasoningEfforts.length > 0 || shouldDefaultToReasoningModel(id),
input: model.inputModalities.includes("image") ? ["text", "image"] : ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: DEFAULT_CONTEXT_WINDOW,
maxTokens: DEFAULT_MAX_TOKENS,
compat: {
supportsReasoningEffort: model.supportedReasoningEfforts.length > 0,
supportsUsageInStreaming: true,
},
};
}
async function listModelsBestEffort(params: {
listModels: CodexModelLister;
timeoutMs: number;
}): Promise<CodexAppServerModel[]> {
try {
const result = await params.listModels({
timeoutMs: params.timeoutMs,
limit: 100,
});
return result.models.filter((model) => !model.hidden);
} catch {
return [];
}
}
function readCodexPluginConfig(value: unknown): CodexPluginConfig {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return {};
}
return value as CodexPluginConfig;
}
function normalizeTimeoutMs(value: unknown): number {
return typeof value === "number" && Number.isFinite(value) && value > 0
? value
: DEFAULT_DISCOVERY_TIMEOUT_MS;
}
function shouldSkipLiveDiscovery(env: NodeJS.ProcessEnv = process.env): boolean {
return Boolean(env.VITEST) && env[LIVE_DISCOVERY_ENV] !== "1";
}
function shouldDefaultToReasoningModel(modelId: string): boolean {
const lower = modelId.toLowerCase();
return (
lower.startsWith("gpt-5") ||
lower.startsWith("o1") ||
lower.startsWith("o3") ||
lower.startsWith("o4")
);
}
function isKnownXHighCodexModel(modelId: string): boolean {
const lower = modelId.trim().toLowerCase();
return (
lower.startsWith("gpt-5") ||
lower.startsWith("o3") ||
lower.startsWith("o4") ||
lower.includes("codex")
);
}
function isModernCodexModel(modelId: string): boolean {
const lower = modelId.trim().toLowerCase();
return lower === "gpt-5.4" || lower === "gpt-5.4-mini" || lower === "gpt-5.2";
}

View File

@@ -0,0 +1,8 @@
{
"extends": "../tsconfig.package-boundary.base.json",
"compilerOptions": {
"rootDir": "."
},
"include": ["./*.ts", "./app-server/*.ts"],
"exclude": ["./**/*.test.ts", "./dist/**", "./node_modules/**"]
}

View File

@@ -0,0 +1,78 @@
import fs from "node:fs/promises";
import path from "node:path";
const codexRepo = process.env.OPENCLAW_CODEX_REPO
? path.resolve(process.env.OPENCLAW_CODEX_REPO)
: path.resolve(process.cwd(), "../codex");
const schemaRoot = path.join(codexRepo, "codex-rs/app-server-protocol/schema/typescript");
const checks: Array<{ file: string; snippets: string[] }> = [
{
file: "ServerRequest.ts",
snippets: [
'"item/commandExecution/requestApproval"',
'"item/fileChange/requestApproval"',
'"item/permissions/requestApproval"',
'"item/tool/call"',
],
},
{
file: "v2/ThreadItem.ts",
snippets: [
'"type": "contextCompaction"',
'"type": "dynamicToolCall"',
'"type": "commandExecution"',
'"type": "mcpToolCall"',
],
},
{
file: "v2/DynamicToolSpec.ts",
snippets: ["name: string", "description: string", "inputSchema: JsonValue"],
},
{
file: "v2/CommandExecutionApprovalDecision.ts",
snippets: ['"accept"', '"acceptForSession"', '"decline"', '"cancel"'],
},
{
file: "ReviewDecision.ts",
snippets: ['"approved"', '"approved_for_session"', '"denied"', '"abort"'],
},
{
file: "v2/PlanDeltaNotification.ts",
snippets: ["itemId: string", "delta: string"],
},
{
file: "v2/TurnPlanUpdatedNotification.ts",
snippets: ["explanation: string | null", "plan: Array<TurnPlanStep>"],
},
];
const failures: string[] = [];
for (const check of checks) {
const filePath = path.join(schemaRoot, check.file);
let text: string;
try {
text = await fs.readFile(filePath, "utf8");
} catch (error) {
failures.push(`${check.file}: missing (${String(error)})`);
continue;
}
for (const snippet of check.snippets) {
if (!text.includes(snippet)) {
failures.push(`${check.file}: missing ${snippet}`);
}
}
}
if (failures.length > 0) {
console.error("Codex app-server generated protocol drift:");
for (const failure of failures) {
console.error(`- ${failure}`);
}
process.exit(1);
}
console.log(
`Codex app-server generated protocol matches OpenClaw bridge assumptions: ${schemaRoot}`,
);