From dd26e8c44d4e809832f4834272879aac40ee7463 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 10 Apr 2026 13:49:54 +0100 Subject: [PATCH] feat: add Codex app-server harness extension --- .github/labeler.yml | 4 + .../codex/app-server/approval-bridge.test.ts | 132 ++++ .../codex/app-server/approval-bridge.ts | 390 +++++++++++ extensions/codex/app-server/client.test.ts | 191 ++++++ extensions/codex/app-server/client.ts | 503 +++++++++++++++ extensions/codex/app-server/compact.test.ts | 140 ++++ extensions/codex/app-server/compact.ts | 227 +++++++ .../codex/app-server/dynamic-tools.test.ts | 141 ++++ extensions/codex/app-server/dynamic-tools.ts | 217 +++++++ .../codex/app-server/event-projector.test.ts | 200 ++++++ .../codex/app-server/event-projector.ts | 604 ++++++++++++++++++ extensions/codex/app-server/protocol.ts | 185 ++++++ .../codex/app-server/run-attempt.test.ts | 229 +++++++ extensions/codex/app-server/run-attempt.ts | 577 +++++++++++++++++ .../codex/app-server/session-binding.test.ts | 52 ++ .../codex/app-server/session-binding.ts | 98 +++ .../app-server/transcript-mirror.test.ts | 98 +++ .../codex/app-server/transcript-mirror.ts | 83 +++ extensions/codex/harness.ts | 48 ++ extensions/codex/index.test.ts | 29 + extensions/codex/index.ts | 13 + extensions/codex/openclaw.plugin.json | 44 ++ extensions/codex/package.json | 17 + extensions/codex/provider.test.ts | 109 ++++ extensions/codex/provider.ts | 222 +++++++ extensions/codex/tsconfig.json | 8 + scripts/check-codex-app-server-protocol.ts | 78 +++ 27 files changed, 4639 insertions(+) create mode 100644 extensions/codex/app-server/approval-bridge.test.ts create mode 100644 extensions/codex/app-server/approval-bridge.ts create mode 100644 extensions/codex/app-server/client.test.ts create mode 100644 extensions/codex/app-server/client.ts create mode 100644 extensions/codex/app-server/compact.test.ts create mode 100644 extensions/codex/app-server/compact.ts create mode 100644 extensions/codex/app-server/dynamic-tools.test.ts create mode 100644 extensions/codex/app-server/dynamic-tools.ts create mode 100644 extensions/codex/app-server/event-projector.test.ts create mode 100644 extensions/codex/app-server/event-projector.ts create mode 100644 extensions/codex/app-server/protocol.ts create mode 100644 extensions/codex/app-server/run-attempt.test.ts create mode 100644 extensions/codex/app-server/run-attempt.ts create mode 100644 extensions/codex/app-server/session-binding.test.ts create mode 100644 extensions/codex/app-server/session-binding.ts create mode 100644 extensions/codex/app-server/transcript-mirror.test.ts create mode 100644 extensions/codex/app-server/transcript-mirror.ts create mode 100644 extensions/codex/harness.ts create mode 100644 extensions/codex/index.test.ts create mode 100644 extensions/codex/index.ts create mode 100644 extensions/codex/openclaw.plugin.json create mode 100644 extensions/codex/package.json create mode 100644 extensions/codex/provider.test.ts create mode 100644 extensions/codex/provider.ts create mode 100644 extensions/codex/tsconfig.json create mode 100644 scripts/check-codex-app-server-protocol.ts diff --git a/.github/labeler.yml b/.github/labeler.yml index 57e1857664e..b7efd8d401a 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -297,6 +297,10 @@ - changed-files: - any-glob-to-any-file: - "extensions/openai/**" +"extensions: codex": + - changed-files: + - any-glob-to-any-file: + - "extensions/codex/**" "extensions: kimi-coding": - changed-files: - any-glob-to-any-file: diff --git a/extensions/codex/app-server/approval-bridge.test.ts b/extensions/codex/app-server/approval-bridge.test.ts new file mode 100644 index 00000000000..2f1aa8a2462 --- /dev/null +++ b/extensions/codex/app-server/approval-bridge.test.ts @@ -0,0 +1,132 @@ +import { callGatewayTool, type EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { buildApprovalResponse, handleCodexAppServerApprovalRequest } from "./approval-bridge.js"; + +vi.mock("openclaw/plugin-sdk/agent-harness", async (importOriginal) => ({ + ...(await importOriginal()), + callGatewayTool: vi.fn(), +})); + +const mockCallGatewayTool = vi.mocked(callGatewayTool); + +function createParams(): EmbeddedRunAttemptParams { + return { + sessionKey: "agent:main:session-1", + agentId: "main", + messageChannel: "telegram", + currentChannelId: "chat-1", + agentAccountId: "default", + currentThreadTs: "thread-ts", + onAgentEvent: vi.fn(), + } as unknown as EmbeddedRunAttemptParams; +} + +describe("Codex app-server approval bridge", () => { + beforeEach(() => { + mockCallGatewayTool.mockReset(); + }); + + it("routes command approvals through plugin approvals and accepts allowed commands", async () => { + const params = createParams(); + mockCallGatewayTool + .mockResolvedValueOnce({ id: "plugin:approval-1", status: "accepted" }) + .mockResolvedValueOnce({ id: "plugin:approval-1", decision: "allow-once" }); + + const result = await handleCodexAppServerApprovalRequest({ + method: "item/commandExecution/requestApproval", + requestParams: { + threadId: "thread-1", + turnId: "turn-1", + itemId: "cmd-1", + command: "pnpm test extensions/codex/app-server", + }, + paramsForRun: params, + threadId: "thread-1", + turnId: "turn-1", + }); + + expect(result).toEqual({ decision: "accept" }); + expect(mockCallGatewayTool.mock.calls.map(([method]) => method)).toEqual([ + "plugin.approval.request", + "plugin.approval.waitDecision", + ]); + expect(mockCallGatewayTool).toHaveBeenCalledWith( + "plugin.approval.request", + expect.any(Object), + expect.objectContaining({ + pluginId: "openclaw-codex-app-server", + title: "Codex app-server command approval", + twoPhase: true, + turnSourceChannel: "telegram", + turnSourceTo: "chat-1", + }), + { expectFinal: false }, + ); + expect(params.onAgentEvent).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "approval", + data: expect.objectContaining({ status: "pending", approvalId: "plugin:approval-1" }), + }), + ); + expect(params.onAgentEvent).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "approval", + data: expect.objectContaining({ status: "approved", approvalId: "plugin:approval-1" }), + }), + ); + }); + + it("fails closed when no approval route is available", async () => { + const params = createParams(); + mockCallGatewayTool.mockResolvedValueOnce({ + id: "plugin:approval-2", + decision: null, + }); + + const result = await handleCodexAppServerApprovalRequest({ + method: "item/fileChange/requestApproval", + requestParams: { + threadId: "thread-1", + turnId: "turn-1", + itemId: "patch-1", + reason: "needs write access", + }, + paramsForRun: params, + threadId: "thread-1", + turnId: "turn-1", + }); + + expect(result).toEqual({ decision: "decline" }); + expect(mockCallGatewayTool).toHaveBeenCalledTimes(1); + expect(params.onAgentEvent).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "approval", + data: expect.objectContaining({ status: "unavailable", reason: "needs write access" }), + }), + ); + }); + + it("maps app-server approval response families separately", () => { + expect(buildApprovalResponse("execCommandApproval", undefined, "approved-session")).toEqual({ + decision: "approved_for_session", + }); + expect(buildApprovalResponse("applyPatchApproval", undefined, "denied")).toEqual({ + decision: "denied", + }); + expect( + buildApprovalResponse( + "item/permissions/requestApproval", + { + permissions: { + network: { allowHosts: ["example.com"] }, + fileSystem: null, + }, + }, + "approved-once", + ), + ).toEqual({ + permissions: { network: { allowHosts: ["example.com"] } }, + scope: "turn", + }); + }); +}); diff --git a/extensions/codex/app-server/approval-bridge.ts b/extensions/codex/app-server/approval-bridge.ts new file mode 100644 index 00000000000..6d524521877 --- /dev/null +++ b/extensions/codex/app-server/approval-bridge.ts @@ -0,0 +1,390 @@ +import { + callGatewayTool, + type AgentApprovalEventData, + type EmbeddedRunAttemptParams, + type ExecApprovalDecision, +} from "openclaw/plugin-sdk/agent-harness"; +import { isJsonObject, type JsonObject, type JsonValue } from "./protocol.js"; + +const DEFAULT_CODEX_APPROVAL_TIMEOUT_MS = 120_000; + +export type AppServerApprovalOutcome = + | "approved-once" + | "approved-session" + | "denied" + | "unavailable" + | "cancelled"; + +type ApprovalRequestResult = { + id?: string; + status?: string; + decision?: ExecApprovalDecision | null; +}; + +type ApprovalWaitResult = { + id?: string; + decision?: ExecApprovalDecision | null; +}; + +export async function handleCodexAppServerApprovalRequest(params: { + method: string; + requestParams: JsonValue | undefined; + paramsForRun: EmbeddedRunAttemptParams; + threadId: string; + turnId: string; + signal?: AbortSignal; +}): Promise { + const requestParams = isJsonObject(params.requestParams) ? params.requestParams : undefined; + if (!matchesCurrentTurn(requestParams, params.threadId, params.turnId)) { + return undefined; + } + + const context = buildApprovalContext({ + method: params.method, + requestParams, + paramsForRun: params.paramsForRun, + }); + + try { + const timeoutMs = DEFAULT_CODEX_APPROVAL_TIMEOUT_MS; + const requestResult = await callGatewayTool( + "plugin.approval.request", + { timeoutMs: timeoutMs + 10_000 }, + { + pluginId: "openclaw-codex-app-server", + title: context.title, + description: context.description, + severity: context.severity, + toolName: context.kind === "exec" ? "codex_command_approval" : "codex_file_approval", + toolCallId: context.itemId, + agentId: params.paramsForRun.agentId, + sessionKey: params.paramsForRun.sessionKey, + turnSourceChannel: + params.paramsForRun.messageChannel ?? params.paramsForRun.messageProvider, + turnSourceTo: params.paramsForRun.currentChannelId, + turnSourceAccountId: params.paramsForRun.agentAccountId, + turnSourceThreadId: params.paramsForRun.currentThreadTs, + timeoutMs, + twoPhase: true, + }, + { expectFinal: false }, + ); + + const approvalId = requestResult?.id; + if (!approvalId) { + emitApprovalEvent(params.paramsForRun, { + phase: "resolved", + kind: context.kind, + status: "unavailable", + title: context.title, + ...context.eventDetails, + message: "Codex app-server approval route unavailable.", + }); + return buildApprovalResponse(params.method, context.requestParams, "denied"); + } + + emitApprovalEvent(params.paramsForRun, { + phase: "requested", + kind: context.kind, + status: "pending", + title: context.title, + approvalId, + approvalSlug: approvalId, + ...context.eventDetails, + message: "Codex app-server approval requested.", + }); + + const decision = Object.prototype.hasOwnProperty.call(requestResult, "decision") + ? requestResult.decision + : await waitForApprovalDecision({ + approvalId, + timeoutMs, + signal: params.signal, + }); + const outcome = mapExecDecisionToOutcome(decision); + + emitApprovalEvent(params.paramsForRun, { + phase: "resolved", + kind: context.kind, + status: + outcome === "denied" + ? "denied" + : outcome === "unavailable" + ? "unavailable" + : outcome === "cancelled" + ? "failed" + : "approved", + title: context.title, + approvalId, + approvalSlug: approvalId, + ...context.eventDetails, + message: approvalResolutionMessage(outcome), + }); + return buildApprovalResponse(params.method, context.requestParams, outcome); + } catch (error) { + const cancelled = params.signal?.aborted === true; + emitApprovalEvent(params.paramsForRun, { + phase: "resolved", + kind: context.kind, + status: cancelled ? "failed" : "unavailable", + title: context.title, + ...context.eventDetails, + message: cancelled + ? "Codex app-server approval cancelled because the run stopped." + : `Codex app-server approval route failed: ${formatErrorMessage(error)}`, + }); + return buildApprovalResponse( + params.method, + context.requestParams, + cancelled ? "cancelled" : "denied", + ); + } +} + +export function buildApprovalResponse( + method: string, + requestParams: JsonObject | undefined, + outcome: AppServerApprovalOutcome, +): JsonValue { + if (method === "item/commandExecution/requestApproval") { + return { decision: commandApprovalDecision(requestParams, outcome) }; + } + if (method === "item/fileChange/requestApproval") { + return { decision: fileChangeApprovalDecision(outcome) }; + } + if (method === "item/permissions/requestApproval") { + if (outcome === "approved-session" || outcome === "approved-once") { + return { + permissions: requestedPermissions(requestParams), + scope: outcome === "approved-session" ? "session" : "turn", + }; + } + return { permissions: {}, scope: "turn" }; + } + if (method === "execCommandApproval" || method === "applyPatchApproval") { + return { decision: legacyReviewDecision(outcome) }; + } + return { + decision: outcome === "approved-once" || outcome === "approved-session" ? "accept" : "decline", + }; +} + +function matchesCurrentTurn( + requestParams: JsonObject | undefined, + threadId: string, + turnId: string, +): boolean { + if (!requestParams) { + return true; + } + const requestThreadId = + readString(requestParams, "threadId") ?? readString(requestParams, "conversationId"); + const requestTurnId = readString(requestParams, "turnId"); + if (requestThreadId && requestThreadId !== threadId) { + return false; + } + if (requestTurnId && requestTurnId !== turnId) { + return false; + } + return true; +} + +function buildApprovalContext(params: { + method: string; + requestParams: JsonObject | undefined; + paramsForRun: EmbeddedRunAttemptParams; +}) { + const itemId = + readString(params.requestParams, "itemId") ?? + readString(params.requestParams, "callId") ?? + readString(params.requestParams, "approvalId"); + const command = readCommand(params.requestParams); + const reason = readString(params.requestParams, "reason"); + const kind = approvalKindForMethod(params.method); + const title = + kind === "exec" + ? "Codex app-server command approval" + : kind === "plugin" + ? "Codex app-server file approval" + : "Codex app-server approval"; + const subject = command + ? `Command: ${truncate(command, 180)}` + : reason + ? `Reason: ${truncate(reason, 180)}` + : `Request method: ${params.method}`; + const description = [ + subject, + params.paramsForRun.sessionKey && `Session: ${params.paramsForRun.sessionKey}`, + ] + .filter(Boolean) + .join("\n"); + return { + kind, + title, + description, + severity: kind === "exec" ? ("warning" as const) : ("info" as const), + itemId, + requestParams: params.requestParams, + eventDetails: { + ...(itemId ? { itemId } : {}), + ...(command ? { command } : {}), + ...(reason ? { reason } : {}), + }, + }; +} + +async function waitForApprovalDecision(params: { + approvalId: string; + timeoutMs: number; + signal?: AbortSignal; +}): Promise { + const waitPromise = callGatewayTool( + "plugin.approval.waitDecision", + { timeoutMs: params.timeoutMs + 10_000 }, + { id: params.approvalId }, + ); + if (!params.signal) { + return (await waitPromise)?.decision; + } + let onAbort: (() => void) | undefined; + const abortPromise = new Promise((_, reject) => { + if (params.signal!.aborted) { + reject(params.signal!.reason); + return; + } + onAbort = () => reject(params.signal!.reason); + params.signal!.addEventListener("abort", onAbort, { once: true }); + }); + try { + return (await Promise.race([waitPromise, abortPromise]))?.decision; + } finally { + if (onAbort) { + params.signal.removeEventListener("abort", onAbort); + } + } +} + +function commandApprovalDecision( + requestParams: JsonObject | undefined, + outcome: AppServerApprovalOutcome, +): JsonValue { + if (outcome === "cancelled") { + return "cancel"; + } + if (outcome === "denied" || outcome === "unavailable") { + return "decline"; + } + if (outcome === "approved-session" && hasAvailableDecision(requestParams, "acceptForSession")) { + return "acceptForSession"; + } + return "accept"; +} + +function fileChangeApprovalDecision(outcome: AppServerApprovalOutcome): JsonValue { + if (outcome === "cancelled") { + return "cancel"; + } + if (outcome === "denied" || outcome === "unavailable") { + return "decline"; + } + return outcome === "approved-session" ? "acceptForSession" : "accept"; +} + +function legacyReviewDecision(outcome: AppServerApprovalOutcome): JsonValue { + if (outcome === "cancelled") { + return "abort"; + } + if (outcome === "denied" || outcome === "unavailable") { + return "denied"; + } + return outcome === "approved-session" ? "approved_for_session" : "approved"; +} + +function requestedPermissions(requestParams: JsonObject | undefined): JsonObject { + const permissions = isJsonObject(requestParams?.permissions) ? requestParams.permissions : {}; + const granted: JsonObject = {}; + if (isJsonObject(permissions.network)) { + granted.network = permissions.network; + } + if (isJsonObject(permissions.fileSystem)) { + granted.fileSystem = permissions.fileSystem; + } + return granted; +} + +function hasAvailableDecision(requestParams: JsonObject | undefined, decision: string): boolean { + const available = requestParams?.availableDecisions; + if (!Array.isArray(available)) { + return true; + } + return available.includes(decision); +} + +function mapExecDecisionToOutcome( + decision: ExecApprovalDecision | null | undefined, +): AppServerApprovalOutcome { + if (decision === "allow-once") { + return "approved-once"; + } + if (decision === "allow-always") { + return "approved-session"; + } + if (decision === null || decision === undefined) { + return "unavailable"; + } + return "denied"; +} + +function approvalResolutionMessage(outcome: AppServerApprovalOutcome): string { + if (outcome === "approved-session") { + return "Codex app-server approval granted for the session."; + } + if (outcome === "approved-once") { + return "Codex app-server approval granted once."; + } + if (outcome === "cancelled") { + return "Codex app-server approval cancelled."; + } + if (outcome === "unavailable") { + return "Codex app-server approval unavailable."; + } + return "Codex app-server approval denied."; +} + +function approvalKindForMethod(method: string): AgentApprovalEventData["kind"] { + if (method.includes("commandExecution") || method.includes("execCommand")) { + return "exec"; + } + if (method.includes("fileChange") || method.includes("Patch") || method.includes("permissions")) { + return "plugin"; + } + return "unknown"; +} + +function emitApprovalEvent(params: EmbeddedRunAttemptParams, data: AgentApprovalEventData): void { + params.onAgentEvent?.({ stream: "approval", data: data as unknown as Record }); +} + +function readCommand(record: JsonObject | undefined): string | undefined { + const command = record?.command; + if (typeof command === "string") { + return command; + } + if (Array.isArray(command) && command.every((part) => typeof part === "string")) { + return command.join(" "); + } + return undefined; +} + +function readString(record: JsonObject | undefined, key: string): string | undefined { + const value = record?.[key]; + return typeof value === "string" ? value : undefined; +} + +function truncate(value: string, maxLength: number): string { + return value.length <= maxLength ? value : `${value.slice(0, Math.max(0, maxLength - 3))}...`; +} + +function formatErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/extensions/codex/app-server/client.test.ts b/extensions/codex/app-server/client.test.ts new file mode 100644 index 00000000000..88afac2a036 --- /dev/null +++ b/extensions/codex/app-server/client.test.ts @@ -0,0 +1,191 @@ +import { EventEmitter } from "node:events"; +import { PassThrough, Writable } from "node:stream"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + CodexAppServerClient, + listCodexAppServerModels, + resetSharedCodexAppServerClientForTests, +} from "./client.js"; + +function createClientHarness() { + const stdout = new PassThrough(); + const stderr = new PassThrough(); + const writes: string[] = []; + const stdin = new Writable({ + write(chunk, _encoding, callback) { + writes.push(chunk.toString()); + callback(); + }, + }); + const process = Object.assign(new EventEmitter(), { + stdin, + stdout, + stderr, + killed: false, + kill: vi.fn(() => { + process.killed = true; + }), + }); + const client = CodexAppServerClient.fromTransportForTests(process); + return { + client, + writes, + send(message: unknown) { + stdout.write(`${JSON.stringify(message)}\n`); + }, + }; +} + +describe("CodexAppServerClient", () => { + const clients: CodexAppServerClient[] = []; + + afterEach(() => { + resetSharedCodexAppServerClientForTests(); + vi.restoreAllMocks(); + for (const client of clients) { + client.close(); + } + clients.length = 0; + }); + + it("routes request responses by id", async () => { + const harness = createClientHarness(); + clients.push(harness.client); + + const request = harness.client.request("model/list", {}); + const outbound = JSON.parse(harness.writes[0] ?? "{}") as { id?: number; method?: string }; + harness.send({ id: outbound.id, result: { models: [] } }); + + await expect(request).resolves.toEqual({ models: [] }); + expect(outbound.method).toBe("model/list"); + }); + + it("initializes with the required client version", async () => { + const harness = createClientHarness(); + clients.push(harness.client); + + const initializing = harness.client.initialize(); + const outbound = JSON.parse(harness.writes[0] ?? "{}") as { + id?: number; + method?: string; + params?: { clientInfo?: { name?: string; title?: string; version?: string } }; + }; + harness.send({ id: outbound.id, result: {} }); + + await expect(initializing).resolves.toBeUndefined(); + expect(outbound).toMatchObject({ + method: "initialize", + params: { + clientInfo: { + name: "openclaw", + title: "OpenClaw", + version: expect.any(String), + }, + }, + }); + expect(outbound.params?.clientInfo?.version).not.toBe(""); + expect(JSON.parse(harness.writes[1] ?? "{}")).toEqual({ method: "initialized" }); + }); + + it("answers server-initiated requests with the registered handler result", async () => { + const harness = createClientHarness(); + clients.push(harness.client); + harness.client.addRequestHandler((request) => { + if (request.method === "item/tool/call") { + return { contentItems: [{ type: "inputText", text: "ok" }], success: true }; + } + return undefined; + }); + + harness.send({ id: "srv-1", method: "item/tool/call", params: { tool: "message" } }); + await vi.waitFor(() => expect(harness.writes.length).toBe(1)); + + expect(JSON.parse(harness.writes[0] ?? "{}")).toEqual({ + id: "srv-1", + result: { contentItems: [{ type: "inputText", text: "ok" }], success: true }, + }); + }); + + it("fails closed for unhandled native app-server approvals", async () => { + const harness = createClientHarness(); + clients.push(harness.client); + + harness.send({ + id: "approval-1", + method: "item/commandExecution/requestApproval", + params: { threadId: "thread-1", turnId: "turn-1", itemId: "cmd-1", command: "pnpm test" }, + }); + await vi.waitFor(() => expect(harness.writes.length).toBe(1)); + + expect(JSON.parse(harness.writes[0] ?? "{}")).toEqual({ + id: "approval-1", + result: { decision: "decline" }, + }); + }); + + it("fails closed with legacy review decisions for legacy app-server approvals", async () => { + const harness = createClientHarness(); + clients.push(harness.client); + + harness.send({ + id: "approval-legacy", + method: "execCommandApproval", + params: { conversationId: "thread-1", callId: "cmd-1", command: ["pnpm", "test"] }, + }); + await vi.waitFor(() => expect(harness.writes.length).toBe(1)); + + expect(JSON.parse(harness.writes[0] ?? "{}")).toEqual({ + id: "approval-legacy", + result: { decision: "denied" }, + }); + }); + + it("lists app-server models through the typed helper", async () => { + const harness = createClientHarness(); + clients.push(harness.client); + const startSpy = vi.spyOn(CodexAppServerClient, "start").mockReturnValue(harness.client); + + const listPromise = listCodexAppServerModels({ limit: 12, timeoutMs: 1000 }); + const initialize = JSON.parse(harness.writes[0] ?? "{}") as { id?: number }; + harness.send({ id: initialize.id, result: {} }); + await vi.waitFor(() => expect(harness.writes.length).toBeGreaterThanOrEqual(3)); + const list = JSON.parse(harness.writes[2] ?? "{}") as { id?: number; method?: string }; + expect(list.method).toBe("model/list"); + + harness.send({ + id: list.id, + result: { + data: [ + { + id: "gpt-5.4", + model: "gpt-5.4", + displayName: "gpt-5.4", + inputModalities: ["text", "image"], + supportedReasoningEfforts: [ + { reasoningEffort: "low", description: "fast" }, + { reasoningEffort: "xhigh", description: "deep" }, + ], + defaultReasoningEffort: "medium", + isDefault: true, + }, + ], + nextCursor: null, + }, + }); + + await expect(listPromise).resolves.toEqual({ + models: [ + { + id: "gpt-5.4", + model: "gpt-5.4", + displayName: "gpt-5.4", + inputModalities: ["text", "image"], + supportedReasoningEfforts: ["low", "xhigh"], + defaultReasoningEffort: "medium", + isDefault: true, + }, + ], + }); + startSpy.mockRestore(); + }); +}); diff --git a/extensions/codex/app-server/client.ts b/extensions/codex/app-server/client.ts new file mode 100644 index 00000000000..5982116d3bb --- /dev/null +++ b/extensions/codex/app-server/client.ts @@ -0,0 +1,503 @@ +import { spawn } from "node:child_process"; +import { createInterface, type Interface as ReadlineInterface } from "node:readline"; +import { embeddedAgentLog, OPENCLAW_VERSION } from "openclaw/plugin-sdk/agent-harness"; +import { + isRpcResponse, + type CodexServerNotification, + type JsonObject, + type JsonValue, + type RpcMessage, + type RpcRequest, + type RpcResponse, +} from "./protocol.js"; + +type PendingRequest = { + method: string; + resolve: (value: unknown) => void; + reject: (error: Error) => void; +}; + +type CodexAppServerTransport = { + stdin: { write: (data: string) => unknown }; + stdout: NodeJS.ReadableStream; + stderr: NodeJS.ReadableStream; + killed?: boolean; + kill?: () => unknown; + once: (event: string, listener: (...args: unknown[]) => void) => unknown; +}; + +export type CodexServerRequestHandler = ( + request: Required> & { params?: JsonValue }, +) => Promise | JsonValue | undefined; + +export type CodexServerNotificationHandler = ( + notification: CodexServerNotification, +) => Promise | void; + +export type CodexAppServerModel = { + id: string; + model: string; + displayName?: string; + description?: string; + hidden?: boolean; + isDefault?: boolean; + inputModalities: string[]; + supportedReasoningEfforts: string[]; + defaultReasoningEffort?: string; +}; + +export type CodexAppServerModelListResult = { + models: CodexAppServerModel[]; + nextCursor?: string; +}; + +export type CodexAppServerListModelsOptions = { + limit?: number; + cursor?: string; + includeHidden?: boolean; + timeoutMs?: number; +}; + +export class CodexAppServerClient { + private readonly child: CodexAppServerTransport; + private readonly lines: ReadlineInterface; + private readonly pending = new Map(); + private readonly requestHandlers = new Set(); + private readonly notificationHandlers = new Set(); + private nextId = 1; + private initialized = false; + private closed = false; + + private constructor(child: CodexAppServerTransport) { + this.child = child; + this.lines = createInterface({ input: child.stdout }); + this.lines.on("line", (line) => this.handleLine(line)); + child.stderr.on("data", (chunk: Buffer | string) => { + const text = chunk.toString("utf8").trim(); + if (text) { + embeddedAgentLog.debug(`codex app-server stderr: ${text}`); + } + }); + child.once("error", (error) => + this.closeWithError(error instanceof Error ? error : new Error(String(error))), + ); + child.once("exit", (code, signal) => { + this.closeWithError( + new Error( + `codex app-server exited: code=${formatExitValue(code)} signal=${formatExitValue(signal)}`, + ), + ); + }); + } + + static start(): CodexAppServerClient { + const bin = process.env.OPENCLAW_CODEX_APP_SERVER_BIN?.trim() || "codex"; + const extraArgs = splitShellWords(process.env.OPENCLAW_CODEX_APP_SERVER_ARGS ?? ""); + const args = extraArgs.length > 0 ? extraArgs : ["app-server", "--listen", "stdio://"]; + const child = spawn(bin, args, { + env: process.env, + stdio: ["pipe", "pipe", "pipe"], + }); + return new CodexAppServerClient(child); + } + + static fromTransportForTests(child: CodexAppServerTransport): CodexAppServerClient { + return new CodexAppServerClient(child); + } + + async initialize(): Promise { + if (this.initialized) { + return; + } + await this.request("initialize", { + clientInfo: { + name: "openclaw", + title: "OpenClaw", + version: OPENCLAW_VERSION, + }, + capabilities: { + experimentalApi: true, + }, + }); + this.notify("initialized"); + this.initialized = true; + } + + request(method: string, params?: JsonValue): Promise { + if (this.closed) { + return Promise.reject(new Error("codex app-server client is closed")); + } + const id = this.nextId++; + const message: RpcRequest = { id, method, params }; + return new Promise((resolve, reject) => { + this.pending.set(id, { + method, + resolve: (value) => resolve(value as T), + reject, + }); + this.writeMessage(message); + }); + } + + notify(method: string, params?: JsonValue): void { + this.writeMessage({ method, params }); + } + + addRequestHandler(handler: CodexServerRequestHandler): () => void { + this.requestHandlers.add(handler); + return () => this.requestHandlers.delete(handler); + } + + addNotificationHandler(handler: CodexServerNotificationHandler): () => void { + this.notificationHandlers.add(handler); + return () => this.notificationHandlers.delete(handler); + } + + close(): void { + this.closed = true; + this.lines.close(); + if (!this.child.killed) { + this.child.kill?.(); + } + } + + private writeMessage(message: RpcRequest | RpcResponse): void { + this.child.stdin.write(`${JSON.stringify(message)}\n`); + } + + private handleLine(line: string): void { + const trimmed = line.trim(); + if (!trimmed) { + return; + } + let parsed: unknown; + try { + parsed = JSON.parse(trimmed); + } catch (error) { + embeddedAgentLog.warn("failed to parse codex app-server message", { error }); + return; + } + if (!parsed || typeof parsed !== "object") { + return; + } + const message = parsed as RpcMessage; + if (isRpcResponse(message)) { + this.handleResponse(message); + return; + } + if (!("method" in message)) { + return; + } + if ("id" in message && message.id !== undefined) { + void this.handleServerRequest({ + id: message.id, + method: message.method, + params: message.params, + }); + return; + } + this.handleNotification({ + method: message.method, + params: message.params, + }); + } + + private handleResponse(response: RpcResponse): void { + const pending = this.pending.get(response.id); + if (!pending) { + return; + } + this.pending.delete(response.id); + if (response.error) { + pending.reject(new Error(response.error.message || `${pending.method} failed`)); + return; + } + pending.resolve(response.result); + } + + private async handleServerRequest( + request: Required> & { params?: JsonValue }, + ): Promise { + try { + for (const handler of this.requestHandlers) { + const result = await handler(request); + if (result !== undefined) { + this.writeMessage({ id: request.id, result }); + return; + } + } + this.writeMessage({ id: request.id, result: defaultServerRequestResponse(request) }); + } catch (error) { + this.writeMessage({ + id: request.id, + error: { + message: error instanceof Error ? error.message : String(error), + }, + }); + } + } + + private handleNotification(notification: CodexServerNotification): void { + for (const handler of this.notificationHandlers) { + Promise.resolve(handler(notification)).catch((error: unknown) => { + embeddedAgentLog.warn("codex app-server notification handler failed", { error }); + }); + } + } + + private closeWithError(error: Error): void { + if (this.closed) { + return; + } + this.closed = true; + for (const pending of this.pending.values()) { + pending.reject(error); + } + this.pending.clear(); + clearSharedClientIfCurrent(this); + } +} + +let sharedClient: CodexAppServerClient | undefined; +let sharedClientPromise: Promise | undefined; + +export async function getSharedCodexAppServerClient(): Promise { + sharedClientPromise ??= (async () => { + const client = CodexAppServerClient.start(); + sharedClient = client; + await client.initialize(); + return client; + })(); + try { + return await sharedClientPromise; + } catch (error) { + sharedClient = undefined; + sharedClientPromise = undefined; + throw error; + } +} + +export function resetSharedCodexAppServerClientForTests(): void { + sharedClient = undefined; + sharedClientPromise = undefined; +} + +function clearSharedClientIfCurrent(client: CodexAppServerClient): void { + if (sharedClient !== client) { + return; + } + sharedClient = undefined; + sharedClientPromise = undefined; +} + +export async function listCodexAppServerModels( + options: CodexAppServerListModelsOptions = {}, +): Promise { + const timeoutMs = options.timeoutMs ?? 2500; + return await withTimeout( + (async () => { + const client = await getSharedCodexAppServerClient(); + const response = await client.request("model/list", { + limit: options.limit ?? null, + cursor: options.cursor ?? null, + includeHidden: options.includeHidden ?? null, + }); + return readModelListResult(response); + })(), + timeoutMs, + "codex app-server model/list timed out", + ); +} + +export function defaultServerRequestResponse( + request: Required> & { params?: JsonValue }, +): JsonValue { + if (request.method === "item/tool/call") { + return { + contentItems: [ + { + type: "inputText", + text: "OpenClaw did not register a handler for this app-server tool call.", + }, + ], + success: false, + }; + } + if ( + request.method === "item/commandExecution/requestApproval" || + request.method === "item/fileChange/requestApproval" + ) { + return { decision: "decline" }; + } + if (request.method === "execCommandApproval" || request.method === "applyPatchApproval") { + return { decision: "denied" }; + } + if (request.method === "item/permissions/requestApproval") { + return { permissions: {}, scope: "turn" }; + } + if (isCodexAppServerApprovalRequest(request.method)) { + return { + decision: "decline", + reason: "OpenClaw codex app-server bridge does not grant native approvals yet.", + }; + } + if (request.method === "item/tool/requestUserInput") { + return { + answers: {}, + }; + } + if (request.method === "mcpServer/elicitation/request") { + return { + action: "decline", + }; + } + return {}; +} + +function readModelListResult(value: JsonValue | undefined): CodexAppServerModelListResult { + if (!isJsonObjectValue(value) || !Array.isArray(value.data)) { + return { models: [] }; + } + const models = value.data + .map((entry) => readCodexModel(entry)) + .filter((entry): entry is CodexAppServerModel => entry !== undefined); + const nextCursor = typeof value.nextCursor === "string" ? value.nextCursor : undefined; + return { models, ...(nextCursor ? { nextCursor } : {}) }; +} + +function readCodexModel(value: unknown): CodexAppServerModel | undefined { + if (!isJsonObjectValue(value)) { + return undefined; + } + const id = readNonEmptyString(value.id); + const model = readNonEmptyString(value.model) ?? id; + if (!id || !model) { + return undefined; + } + return { + id, + model, + ...(readNonEmptyString(value.displayName) + ? { displayName: readNonEmptyString(value.displayName) } + : {}), + ...(readNonEmptyString(value.description) + ? { description: readNonEmptyString(value.description) } + : {}), + ...(typeof value.hidden === "boolean" ? { hidden: value.hidden } : {}), + ...(typeof value.isDefault === "boolean" ? { isDefault: value.isDefault } : {}), + inputModalities: readStringArray(value.inputModalities), + supportedReasoningEfforts: readReasoningEfforts(value.supportedReasoningEfforts), + ...(readNonEmptyString(value.defaultReasoningEffort) + ? { defaultReasoningEffort: readNonEmptyString(value.defaultReasoningEffort) } + : {}), + }; +} + +function readReasoningEfforts(value: unknown): string[] { + if (!Array.isArray(value)) { + return []; + } + const efforts = value + .map((entry) => { + if (!isJsonObjectValue(entry)) { + return undefined; + } + return readNonEmptyString(entry.reasoningEffort); + }) + .filter((entry): entry is string => entry !== undefined); + return [...new Set(efforts)]; +} + +function readStringArray(value: unknown): string[] { + if (!Array.isArray(value)) { + return []; + } + return [ + ...new Set( + value + .map((entry) => readNonEmptyString(entry)) + .filter((entry): entry is string => entry !== undefined), + ), + ]; +} + +function readNonEmptyString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed || undefined; +} + +function isJsonObjectValue(value: unknown): value is JsonObject { + return Boolean(value && typeof value === "object" && !Array.isArray(value)); +} + +async function withTimeout( + promise: Promise, + timeoutMs: number, + timeoutMessage: string, +): Promise { + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + return await promise; + } + let timeout: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeout = setTimeout(() => reject(new Error(timeoutMessage)), Math.max(1, timeoutMs)); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } +} + +export function isCodexAppServerApprovalRequest(method: string): boolean { + return method.includes("requestApproval") || method.includes("Approval"); +} + +function splitShellWords(value: string): string[] { + const words: string[] = []; + let current = ""; + let quote: '"' | "'" | null = null; + for (const char of value) { + if (quote) { + if (char === quote) { + quote = null; + } else { + current += char; + } + continue; + } + if (char === '"' || char === "'") { + quote = char; + continue; + } + if (/\s/.test(char)) { + if (current) { + words.push(current); + current = ""; + } + continue; + } + current += char; + } + if (current) { + words.push(current); + } + return words; +} + +function formatExitValue(value: unknown): string { + if (value === null || value === undefined) { + return "null"; + } + if (typeof value === "string" || typeof value === "number") { + return String(value); + } + return "unknown"; +} diff --git a/extensions/codex/app-server/compact.test.ts b/extensions/codex/app-server/compact.test.ts new file mode 100644 index 00000000000..5a55323ae69 --- /dev/null +++ b/extensions/codex/app-server/compact.test.ts @@ -0,0 +1,140 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CodexAppServerClient } from "./client.js"; +import { maybeCompactCodexAppServerSession, __testing } from "./compact.js"; +import type { CodexServerNotification } from "./protocol.js"; +import { writeCodexAppServerBinding } from "./session-binding.js"; + +const OLD_RUNTIME = process.env.OPENCLAW_AGENT_RUNTIME; + +let tempDir: string; + +describe("maybeCompactCodexAppServerSession", () => { + beforeEach(async () => { + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-compact-")); + process.env.OPENCLAW_AGENT_RUNTIME = "codex-app-server"; + }); + + afterEach(async () => { + __testing.resetCodexAppServerClientFactoryForTests(); + if (OLD_RUNTIME === undefined) { + delete process.env.OPENCLAW_AGENT_RUNTIME; + } else { + process.env.OPENCLAW_AGENT_RUNTIME = OLD_RUNTIME; + } + await fs.rm(tempDir, { recursive: true, force: true }); + }); + + it("waits for native app-server compaction before reporting success", async () => { + const fake = createFakeCodexClient(); + __testing.setCodexAppServerClientFactoryForTests(async () => fake.client); + const sessionFile = path.join(tempDir, "session.jsonl"); + await writeCodexAppServerBinding(sessionFile, { + threadId: "thread-1", + cwd: tempDir, + }); + + const pendingResult = maybeCompactCodexAppServerSession({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile, + workspaceDir: tempDir, + currentTokenCount: 123, + }); + await vi.waitFor(() => { + expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" }); + }); + + let settled = false; + void pendingResult.then(() => { + settled = true; + }); + await Promise.resolve(); + expect(settled).toBe(false); + + fake.emit({ + method: "thread/compacted", + params: { threadId: "thread-1", turnId: "turn-1" }, + }); + const result = await pendingResult; + + expect(result).toMatchObject({ + ok: true, + compacted: true, + result: { + tokensBefore: 123, + details: { + backend: "codex-app-server", + threadId: "thread-1", + signal: "thread/compacted", + turnId: "turn-1", + }, + }, + }); + }); + + it("accepts native context-compaction item completion as success", async () => { + const fake = createFakeCodexClient(); + __testing.setCodexAppServerClientFactoryForTests(async () => fake.client); + const sessionFile = path.join(tempDir, "session.jsonl"); + await writeCodexAppServerBinding(sessionFile, { + threadId: "thread-1", + cwd: tempDir, + }); + + const pendingResult = maybeCompactCodexAppServerSession({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile, + workspaceDir: tempDir, + }); + await vi.waitFor(() => { + expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" }); + }); + fake.emit({ + method: "item/completed", + params: { + threadId: "thread-1", + turnId: "turn-1", + item: { type: "contextCompaction", id: "compact-1" }, + }, + }); + + await expect(pendingResult).resolves.toMatchObject({ + ok: true, + compacted: true, + result: { + details: { + signal: "item/completed", + itemId: "compact-1", + }, + }, + }); + }); +}); + +function createFakeCodexClient(): { + client: CodexAppServerClient; + request: ReturnType; + emit: (notification: CodexServerNotification) => void; +} { + const handlers = new Set<(notification: CodexServerNotification) => void>(); + const request = vi.fn(async () => ({})); + return { + client: { + request, + addNotificationHandler(handler: (notification: CodexServerNotification) => void) { + handlers.add(handler); + return () => handlers.delete(handler); + }, + } as unknown as CodexAppServerClient, + request, + emit(notification: CodexServerNotification): void { + for (const handler of handlers) { + handler(notification); + } + }, + }; +} diff --git a/extensions/codex/app-server/compact.ts b/extensions/codex/app-server/compact.ts new file mode 100644 index 00000000000..a60255538b1 --- /dev/null +++ b/extensions/codex/app-server/compact.ts @@ -0,0 +1,227 @@ +import { + embeddedAgentLog, + resolveEmbeddedAgentRuntime, + type CompactEmbeddedPiSessionParams, + type EmbeddedPiCompactResult, +} from "openclaw/plugin-sdk/agent-harness"; +import { + getSharedCodexAppServerClient, + type CodexAppServerClient, + type CodexServerNotificationHandler, +} from "./client.js"; +import { isJsonObject, type CodexServerNotification, type JsonObject } from "./protocol.js"; +import { readCodexAppServerBinding } from "./session-binding.js"; + +type CodexAppServerClientFactory = () => Promise; +type CodexNativeCompactionCompletion = { + signal: "thread/compacted" | "item/completed"; + turnId?: string; + itemId?: string; +}; +type CodexNativeCompactionWaiter = { + promise: Promise; + startTimeout: () => void; + cancel: () => void; +}; + +const DEFAULT_CODEX_COMPACTION_WAIT_TIMEOUT_MS = 5 * 60 * 1000; + +let clientFactory: CodexAppServerClientFactory = getSharedCodexAppServerClient; + +export async function maybeCompactCodexAppServerSession( + params: CompactEmbeddedPiSessionParams, +): Promise { + const runtime = resolveEmbeddedAgentRuntime(); + const provider = params.provider?.trim().toLowerCase(); + const shouldUseCodex = + runtime === "codex" || + (runtime === "auto" && (provider === "codex" || provider === "openai-codex")); + if (!shouldUseCodex) { + return undefined; + } + + const binding = await readCodexAppServerBinding(params.sessionFile); + if (!binding?.threadId) { + if (runtime === "codex") { + return { ok: false, compacted: false, reason: "no codex app-server thread binding" }; + } + return undefined; + } + + const client = await clientFactory(); + const waiter = createCodexNativeCompactionWaiter(client, binding.threadId); + let completion: CodexNativeCompactionCompletion; + try { + await client.request("thread/compact/start", { + threadId: binding.threadId, + }); + embeddedAgentLog.info("started codex app-server compaction", { + sessionId: params.sessionId, + threadId: binding.threadId, + }); + waiter.startTimeout(); + completion = await waiter.promise; + } catch (error) { + waiter.cancel(); + return { + ok: false, + compacted: false, + reason: formatCompactionError(error), + }; + } + embeddedAgentLog.info("completed codex app-server compaction", { + sessionId: params.sessionId, + threadId: binding.threadId, + signal: completion.signal, + turnId: completion.turnId, + itemId: completion.itemId, + }); + return { + ok: true, + compacted: true, + result: { + summary: "", + firstKeptEntryId: "", + tokensBefore: params.currentTokenCount ?? 0, + details: { + backend: "codex-app-server", + threadId: binding.threadId, + signal: completion.signal, + turnId: completion.turnId, + itemId: completion.itemId, + }, + }, + }; +} + +function createCodexNativeCompactionWaiter( + client: CodexAppServerClient, + threadId: string, +): CodexNativeCompactionWaiter { + let settled = false; + let removeHandler: () => void = () => {}; + let timeout: ReturnType | undefined; + let failWaiter: (error: Error) => void = () => {}; + + const promise = new Promise((resolve, reject) => { + const cleanup = (): void => { + removeHandler(); + if (timeout) { + clearTimeout(timeout); + } + }; + const complete = (completion: CodexNativeCompactionCompletion): void => { + if (settled) { + return; + } + settled = true; + cleanup(); + resolve(completion); + }; + const fail = (error: Error): void => { + if (settled) { + return; + } + settled = true; + cleanup(); + reject(error); + }; + failWaiter = fail; + const handler: CodexServerNotificationHandler = (notification) => { + const completion = readNativeCompactionCompletion(notification, threadId); + if (completion) { + complete(completion); + } + }; + removeHandler = client.addNotificationHandler(handler); + }); + + return { + promise, + startTimeout(): void { + if (settled || timeout) { + return; + } + timeout = setTimeout(() => { + failWaiter(new Error(`timed out waiting for codex app-server compaction for ${threadId}`)); + }, resolveCompactionWaitTimeoutMs()); + timeout.unref?.(); + }, + cancel(): void { + if (settled) { + return; + } + settled = true; + removeHandler(); + if (timeout) { + clearTimeout(timeout); + } + }, + }; +} + +function readNativeCompactionCompletion( + notification: CodexServerNotification, + threadId: string, +): CodexNativeCompactionCompletion | undefined { + const params = notification.params; + if (!isJsonObject(params) || readString(params, "threadId", "thread_id") !== threadId) { + return undefined; + } + if (notification.method === "thread/compacted") { + return { + signal: "thread/compacted", + turnId: readString(params, "turnId", "turn_id"), + }; + } + if (notification.method !== "item/completed") { + return undefined; + } + const item = isJsonObject(params.item) ? params.item : undefined; + if (readString(item, "type") !== "contextCompaction") { + return undefined; + } + return { + signal: "item/completed", + turnId: readString(params, "turnId", "turn_id"), + itemId: readString(item, "id") ?? readString(params, "itemId", "item_id", "id"), + }; +} + +function resolveCompactionWaitTimeoutMs(): number { + const raw = process.env.OPENCLAW_CODEX_COMPACTION_WAIT_TIMEOUT_MS?.trim(); + const parsed = raw ? Number.parseInt(raw, 10) : NaN; + if (Number.isFinite(parsed) && parsed > 0) { + return parsed; + } + return DEFAULT_CODEX_COMPACTION_WAIT_TIMEOUT_MS; +} + +function readString(params: JsonObject | undefined, ...keys: string[]): string | undefined { + if (!params) { + return undefined; + } + for (const key of keys) { + const value = params[key]; + if (typeof value === "string") { + return value; + } + } + return undefined; +} + +function formatCompactionError(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + return String(error); +} + +export const __testing = { + setCodexAppServerClientFactoryForTests(factory: CodexAppServerClientFactory): void { + clientFactory = factory; + }, + resetCodexAppServerClientFactoryForTests(): void { + clientFactory = getSharedCodexAppServerClient; + }, +} as const; diff --git a/extensions/codex/app-server/dynamic-tools.test.ts b/extensions/codex/app-server/dynamic-tools.test.ts new file mode 100644 index 00000000000..923781157c0 --- /dev/null +++ b/extensions/codex/app-server/dynamic-tools.test.ts @@ -0,0 +1,141 @@ +import type { AgentToolResult } from "@mariozechner/pi-agent-core"; +import type { AnyAgentTool } from "openclaw/plugin-sdk/agent-harness"; +import { describe, expect, it, vi } from "vitest"; +import { createCodexDynamicToolBridge } from "./dynamic-tools.js"; + +function createTool(overrides: Partial): AnyAgentTool { + return { + name: "tts", + description: "Convert text to speech.", + parameters: { type: "object", properties: {} }, + execute: vi.fn(), + ...overrides, + } as unknown as AnyAgentTool; +} + +describe("createCodexDynamicToolBridge", () => { + it.each([ + { toolName: "tts", mediaUrl: "/tmp/reply.opus", audioAsVoice: true }, + { toolName: "image_generate", mediaUrl: "/tmp/generated.png" }, + { toolName: "video_generate", mediaUrl: "https://media.example/video.mp4" }, + { toolName: "music_generate", mediaUrl: "https://media.example/music.wav" }, + ])( + "preserves structured media artifacts from $toolName tool results", + async ({ toolName, mediaUrl, audioAsVoice }) => { + const toolResult = { + content: [{ type: "text", text: "Generated media reply." }], + details: { + media: { + mediaUrl, + ...(audioAsVoice === true ? { audioAsVoice: true } : {}), + }, + }, + } satisfies AgentToolResult; + const tool = createTool({ + name: toolName, + execute: vi.fn(async () => toolResult), + }); + const bridge = createCodexDynamicToolBridge({ + tools: [tool], + signal: new AbortController().signal, + }); + + const result = await bridge.handleToolCall({ + threadId: "thread-1", + turnId: "turn-1", + callId: "call-1", + tool: toolName, + arguments: { prompt: "hello" }, + }); + + expect(result).toEqual({ + success: true, + contentItems: [{ type: "inputText", text: "Generated media reply." }], + }); + expect(bridge.telemetry.toolMediaUrls).toEqual([mediaUrl]); + expect(bridge.telemetry.toolAudioAsVoice).toBe(audioAsVoice === true); + }, + ); + + it("preserves audio-as-voice metadata from tts results", async () => { + const toolResult = { + content: [{ type: "text", text: "Generated audio reply." }], + details: { + media: { + mediaUrl: "/tmp/reply.opus", + audioAsVoice: true, + }, + }, + } satisfies AgentToolResult; + const tool = createTool({ + execute: vi.fn(async () => toolResult), + }); + const bridge = createCodexDynamicToolBridge({ + tools: [tool], + signal: new AbortController().signal, + }); + + const result = await bridge.handleToolCall({ + threadId: "thread-1", + turnId: "turn-1", + callId: "call-1", + tool: "tts", + arguments: { text: "hello" }, + }); + + expect(result).toEqual({ + success: true, + contentItems: [{ type: "inputText", text: "Generated audio reply." }], + }); + expect(bridge.telemetry.toolMediaUrls).toEqual(["/tmp/reply.opus"]); + expect(bridge.telemetry.toolAudioAsVoice).toBe(true); + }); + + it("records messaging tool side effects while returning concise text to app-server", async () => { + const toolResult = { + content: [{ type: "text", text: "Sent." }], + details: { messageId: "message-1" }, + } satisfies AgentToolResult; + const tool = createTool({ + name: "message", + execute: vi.fn(async () => toolResult), + }); + const bridge = createCodexDynamicToolBridge({ + tools: [tool], + signal: new AbortController().signal, + }); + + const result = await bridge.handleToolCall({ + threadId: "thread-1", + turnId: "turn-1", + callId: "call-1", + tool: "message", + arguments: { + action: "send", + text: "hello from Codex", + mediaUrl: "/tmp/reply.png", + provider: "telegram", + to: "chat-1", + threadId: "thread-ts-1", + }, + }); + + expect(result).toEqual({ + success: true, + contentItems: [{ type: "inputText", text: "Sent." }], + }); + expect(bridge.telemetry).toMatchObject({ + didSendViaMessagingTool: true, + messagingToolSentTexts: ["hello from Codex"], + messagingToolSentMediaUrls: ["/tmp/reply.png"], + messagingToolSentTargets: [ + { + tool: "message", + provider: "telegram", + to: "chat-1", + threadId: "thread-ts-1", + }, + ], + }); + }); +}); diff --git a/extensions/codex/app-server/dynamic-tools.ts b/extensions/codex/app-server/dynamic-tools.ts new file mode 100644 index 00000000000..5e56dc65375 --- /dev/null +++ b/extensions/codex/app-server/dynamic-tools.ts @@ -0,0 +1,217 @@ +import type { AgentToolResult } from "@mariozechner/pi-agent-core"; +import type { ImageContent, TextContent } from "@mariozechner/pi-ai"; +import { + extractToolResultMediaArtifact, + filterToolResultMediaUrls, + isMessagingTool, + isMessagingToolSendAction, + type AnyAgentTool, + type MessagingToolSend, +} from "openclaw/plugin-sdk/agent-harness"; +import { + type CodexDynamicToolCallOutputContentItem, + type CodexDynamicToolCallParams, + type CodexDynamicToolCallResponse, + type CodexDynamicToolSpec, + type JsonValue, +} from "./protocol.js"; + +export type CodexDynamicToolBridge = { + specs: CodexDynamicToolSpec[]; + handleToolCall: (params: CodexDynamicToolCallParams) => Promise; + telemetry: { + didSendViaMessagingTool: boolean; + messagingToolSentTexts: string[]; + messagingToolSentMediaUrls: string[]; + messagingToolSentTargets: MessagingToolSend[]; + toolMediaUrls: string[]; + toolAudioAsVoice: boolean; + successfulCronAdds?: number; + }; +}; + +export function createCodexDynamicToolBridge(params: { + tools: AnyAgentTool[]; + signal: AbortSignal; +}): CodexDynamicToolBridge { + const toolMap = new Map(params.tools.map((tool) => [tool.name, tool])); + const telemetry: CodexDynamicToolBridge["telemetry"] = { + didSendViaMessagingTool: false, + messagingToolSentTexts: [], + messagingToolSentMediaUrls: [], + messagingToolSentTargets: [], + toolMediaUrls: [], + toolAudioAsVoice: false, + }; + + return { + specs: params.tools.map((tool) => ({ + name: tool.name, + description: tool.description, + inputSchema: toJsonValue(tool.parameters), + })), + telemetry, + handleToolCall: async (call) => { + const tool = toolMap.get(call.tool); + if (!tool) { + return { + contentItems: [{ type: "inputText", text: `Unknown OpenClaw tool: ${call.tool}` }], + success: false, + }; + } + const args = jsonObjectToRecord(call.arguments); + try { + const preparedArgs = tool.prepareArguments ? tool.prepareArguments(args) : args; + const result = await tool.execute(call.callId, preparedArgs, params.signal); + collectToolTelemetry({ + toolName: tool.name, + args, + result, + telemetry, + isError: false, + }); + return { + contentItems: result.content.flatMap(convertToolContent), + success: true, + }; + } catch (error) { + collectToolTelemetry({ + toolName: tool.name, + args, + result: undefined, + telemetry, + isError: true, + }); + return { + contentItems: [ + { + type: "inputText", + text: error instanceof Error ? error.message : String(error), + }, + ], + success: false, + }; + } + }, + }; +} + +function collectToolTelemetry(params: { + toolName: string; + args: Record; + result: AgentToolResult | undefined; + telemetry: CodexDynamicToolBridge["telemetry"]; + isError: boolean; +}): void { + if (!params.isError && params.toolName === "cron" && isCronAddAction(params.args)) { + params.telemetry.successfulCronAdds = (params.telemetry.successfulCronAdds ?? 0) + 1; + } + if (!params.isError && params.result) { + const media = extractToolResultMediaArtifact(params.result); + if (media) { + const mediaUrls = filterToolResultMediaUrls(params.toolName, media.mediaUrls, params.result); + const seen = new Set(params.telemetry.toolMediaUrls); + for (const mediaUrl of mediaUrls) { + if (!seen.has(mediaUrl)) { + seen.add(mediaUrl); + params.telemetry.toolMediaUrls.push(mediaUrl); + } + } + if (media.audioAsVoice) { + params.telemetry.toolAudioAsVoice = true; + } + } + } + if ( + !isMessagingTool(params.toolName) || + !isMessagingToolSendAction(params.toolName, params.args) + ) { + return; + } + params.telemetry.didSendViaMessagingTool = true; + const text = readFirstString(params.args, ["text", "message", "body", "content"]); + if (text) { + params.telemetry.messagingToolSentTexts.push(text); + } + params.telemetry.messagingToolSentMediaUrls.push(...collectMediaUrls(params.args)); + params.telemetry.messagingToolSentTargets.push({ + tool: params.toolName, + provider: readFirstString(params.args, ["provider", "channel"]) ?? params.toolName, + accountId: readFirstString(params.args, ["accountId", "account_id"]), + to: readFirstString(params.args, ["to", "target", "recipient"]), + threadId: readFirstString(params.args, ["threadId", "thread_id", "messageThreadId"]), + }); +} + +function convertToolContent( + content: TextContent | ImageContent, +): CodexDynamicToolCallOutputContentItem[] { + if (content.type === "text") { + return [{ type: "inputText", text: content.text }]; + } + return [ + { + type: "inputImage", + imageUrl: `data:${content.mimeType};base64,${content.data}`, + }, + ]; +} + +function toJsonValue(value: unknown): JsonValue { + try { + const text = JSON.stringify(value); + if (!text) { + return {}; + } + return JSON.parse(text) as JsonValue; + } catch { + return {}; + } +} + +function jsonObjectToRecord(value: JsonValue | undefined): Record { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return {}; + } + return value as Record; +} + +function readFirstString(record: Record, keys: string[]): string | undefined { + for (const key of keys) { + const value = record[key]; + if (typeof value === "string" && value.trim()) { + return value.trim(); + } + if (typeof value === "number" && Number.isFinite(value)) { + return String(value); + } + } + return undefined; +} + +function collectMediaUrls(record: Record): string[] { + const urls: string[] = []; + for (const key of ["mediaUrl", "media_url", "imageUrl", "image_url"]) { + const value = record[key]; + if (typeof value === "string" && value.trim()) { + urls.push(value.trim()); + } + } + for (const key of ["mediaUrls", "media_urls", "imageUrls", "image_urls"]) { + const value = record[key]; + if (!Array.isArray(value)) { + continue; + } + for (const entry of value) { + if (typeof entry === "string" && entry.trim()) { + urls.push(entry.trim()); + } + } + } + return urls; +} + +function isCronAddAction(args: Record): boolean { + const action = args.action; + return typeof action === "string" && action.trim().toLowerCase() === "add"; +} diff --git a/extensions/codex/app-server/event-projector.test.ts b/extensions/codex/app-server/event-projector.test.ts new file mode 100644 index 00000000000..4d66c40c1b5 --- /dev/null +++ b/extensions/codex/app-server/event-projector.test.ts @@ -0,0 +1,200 @@ +import type { Api, Model } from "@mariozechner/pi-ai"; +import type { EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness"; +import { describe, expect, it, vi } from "vitest"; +import { CodexAppServerEventProjector } from "./event-projector.js"; + +function createParams(): EmbeddedRunAttemptParams { + return { + prompt: "hello", + sessionId: "session-1", + provider: "openai-codex", + modelId: "gpt-5.4-codex", + model: { + id: "gpt-5.4-codex", + name: "gpt-5.4-codex", + provider: "openai-codex", + api: "openai-codex-responses", + input: ["text"], + reasoning: true, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 128_000, + maxTokens: 8_000, + } as Model, + thinkLevel: "medium", + } as unknown as EmbeddedRunAttemptParams; +} + +describe("CodexAppServerEventProjector", () => { + it("projects assistant deltas and usage into embedded attempt results", async () => { + const onAssistantMessageStart = vi.fn(); + const onPartialReply = vi.fn(); + const params = { + ...createParams(), + onAssistantMessageStart, + onPartialReply, + }; + const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + + await projector.handleNotification({ + method: "item/agentMessage/delta", + params: { threadId: "thread-1", turnId: "turn-1", itemId: "msg-1", delta: "hel" }, + }); + await projector.handleNotification({ + method: "item/agentMessage/delta", + params: { threadId: "thread-1", turnId: "turn-1", itemId: "msg-1", delta: "lo" }, + }); + await projector.handleNotification({ + method: "thread/tokenUsage/updated", + params: { + threadId: "thread-1", + turnId: "turn-1", + tokenUsage: { + total: { + totalTokens: 12, + inputTokens: 5, + cachedInputTokens: 2, + outputTokens: 7, + }, + }, + }, + }); + await projector.handleNotification({ + method: "turn/completed", + params: { + threadId: "thread-1", + turnId: "turn-1", + turn: { + id: "turn-1", + status: "completed", + items: [{ type: "agentMessage", id: "msg-1", text: "hello" }], + }, + }, + }); + + const result = projector.buildResult({ + didSendViaMessagingTool: false, + messagingToolSentTexts: [], + messagingToolSentMediaUrls: [], + messagingToolSentTargets: [], + }); + + expect(onAssistantMessageStart).toHaveBeenCalledTimes(1); + expect(onPartialReply).toHaveBeenLastCalledWith({ text: "hello" }); + expect(result.assistantTexts).toEqual(["hello"]); + expect(result.lastAssistant?.content).toEqual([{ type: "text", text: "hello" }]); + expect(result.attemptUsage).toMatchObject({ input: 5, output: 7, cacheRead: 2, total: 12 }); + expect(result.replayMetadata.replaySafe).toBe(true); + }); + + it("ignores notifications for other turns", async () => { + const params = createParams(); + const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + + await projector.handleNotification({ + method: "item/agentMessage/delta", + params: { threadId: "thread-1", turnId: "turn-2", itemId: "msg-1", delta: "wrong" }, + }); + + const result = projector.buildResult({ + didSendViaMessagingTool: false, + messagingToolSentTexts: [], + messagingToolSentMediaUrls: [], + messagingToolSentTargets: [], + }); + expect(result.assistantTexts).toEqual([]); + }); + + it("projects reasoning end, plan updates, compaction state, and tool metadata", async () => { + const onReasoningStream = vi.fn(); + const onReasoningEnd = vi.fn(); + const onAgentEvent = vi.fn(); + const params = { + ...createParams(), + onReasoningStream, + onReasoningEnd, + onAgentEvent, + }; + const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + + await projector.handleNotification({ + method: "item/reasoning/textDelta", + params: { threadId: "thread-1", turnId: "turn-1", itemId: "reason-1", delta: "thinking" }, + }); + await projector.handleNotification({ + method: "item/plan/delta", + params: { threadId: "thread-1", turnId: "turn-1", itemId: "plan-1", delta: "- inspect\n" }, + }); + await projector.handleNotification({ + method: "turn/plan/updated", + params: { + threadId: "thread-1", + turnId: "turn-1", + explanation: "next", + plan: [{ step: "patch", status: "in_progress" }], + }, + }); + await projector.handleNotification({ + method: "item/started", + params: { + threadId: "thread-1", + turnId: "turn-1", + item: { type: "contextCompaction", id: "compact-1" }, + }, + }); + expect(projector.isCompacting()).toBe(true); + await projector.handleNotification({ + method: "item/completed", + params: { + threadId: "thread-1", + turnId: "turn-1", + item: { type: "contextCompaction", id: "compact-1" }, + }, + }); + expect(projector.isCompacting()).toBe(false); + await projector.handleNotification({ + method: "item/completed", + params: { + threadId: "thread-1", + turnId: "turn-1", + item: { + type: "dynamicToolCall", + id: "tool-1", + tool: "sessions_send", + status: "completed", + }, + }, + }); + await projector.handleNotification({ + method: "turn/completed", + params: { + threadId: "thread-1", + turnId: "turn-1", + turn: { id: "turn-1", status: "completed", items: [] }, + }, + }); + + const result = projector.buildResult({ + didSendViaMessagingTool: false, + messagingToolSentTexts: [], + messagingToolSentMediaUrls: [], + messagingToolSentTargets: [], + }); + + expect(onReasoningStream).toHaveBeenCalledWith({ text: "thinking" }); + expect(onReasoningEnd).toHaveBeenCalledTimes(1); + expect(onAgentEvent).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "plan", + data: expect.objectContaining({ steps: ["patch (in_progress)"] }), + }), + ); + expect(onAgentEvent).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "compaction", + data: expect.objectContaining({ phase: "start", itemId: "compact-1" }), + }), + ); + expect(result.toolMetas).toEqual([{ toolName: "sessions_send", meta: "completed" }]); + expect(result.itemLifecycle).toMatchObject({ compactionCount: 1 }); + }); +}); diff --git a/extensions/codex/app-server/event-projector.ts b/extensions/codex/app-server/event-projector.ts new file mode 100644 index 00000000000..71a5b9c9b2a --- /dev/null +++ b/extensions/codex/app-server/event-projector.ts @@ -0,0 +1,604 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import type { AssistantMessage, Usage } from "@mariozechner/pi-ai"; +import { + formatErrorMessage, + normalizeUsage, + type NormalizedUsage, + type EmbeddedRunAttemptParams, + type EmbeddedRunAttemptResult, + type MessagingToolSend, +} from "openclaw/plugin-sdk/agent-harness"; +import { + isJsonObject, + type CodexServerNotification, + type CodexThreadItem, + type CodexTurn, + type JsonObject, + type JsonValue, +} from "./protocol.js"; + +export type CodexAppServerToolTelemetry = { + didSendViaMessagingTool: boolean; + messagingToolSentTexts: string[]; + messagingToolSentMediaUrls: string[]; + messagingToolSentTargets: MessagingToolSend[]; + toolMediaUrls?: string[]; + toolAudioAsVoice?: boolean; + successfulCronAdds?: number; +}; + +const ZERO_USAGE: Usage = { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, +}; + +export class CodexAppServerEventProjector { + private readonly assistantTextByItem = new Map(); + private readonly reasoningTextByItem = new Map(); + private readonly planTextByItem = new Map(); + private readonly activeItemIds = new Set(); + private readonly completedItemIds = new Set(); + private readonly activeCompactionItemIds = new Set(); + private readonly toolMetas = new Map(); + private assistantStarted = false; + private reasoningStarted = false; + private reasoningEnded = false; + private completedTurn: CodexTurn | undefined; + private promptError: unknown; + private promptErrorSource: EmbeddedRunAttemptResult["promptErrorSource"] = null; + private aborted = false; + private tokenUsage: NormalizedUsage | undefined; + private guardianReviewCount = 0; + private completedCompactionCount = 0; + + constructor( + private readonly params: EmbeddedRunAttemptParams, + private readonly threadId: string, + private readonly turnId: string, + ) {} + + async handleNotification(notification: CodexServerNotification): Promise { + const params = isJsonObject(notification.params) ? notification.params : undefined; + if (!params || !this.isNotificationForTurn(params)) { + return; + } + + switch (notification.method) { + case "item/agentMessage/delta": + await this.handleAssistantDelta(params); + break; + case "item/reasoning/summaryTextDelta": + case "item/reasoning/textDelta": + await this.handleReasoningDelta(params); + break; + case "item/plan/delta": + this.handlePlanDelta(params); + break; + case "turn/plan/updated": + this.handleTurnPlanUpdated(params); + break; + case "item/started": + this.handleItemStarted(params); + break; + case "item/completed": + this.handleItemCompleted(params); + break; + case "item/autoApprovalReview/started": + case "item/autoApprovalReview/completed": + this.guardianReviewCount += 1; + this.params.onAgentEvent?.({ + stream: "codex_app_server.guardian", + data: { method: notification.method }, + }); + break; + case "thread/tokenUsage/updated": + this.handleTokenUsage(params); + break; + case "turn/completed": + await this.handleTurnCompleted(params); + break; + case "error": + this.promptError = readString(params, "message") ?? "codex app-server error"; + this.promptErrorSource = "prompt"; + break; + default: + break; + } + } + + buildResult(toolTelemetry: CodexAppServerToolTelemetry): EmbeddedRunAttemptResult { + const assistantTexts = this.collectAssistantTexts(); + const lastAssistant = + assistantTexts.length > 0 + ? this.createAssistantMessage(assistantTexts.join("\n\n")) + : undefined; + const messagesSnapshot: AgentMessage[] = [ + { + role: "user", + content: this.params.prompt, + timestamp: Date.now(), + }, + ]; + if (lastAssistant) { + messagesSnapshot.push(lastAssistant); + } + const turnFailed = this.completedTurn?.status === "failed"; + const turnInterrupted = this.completedTurn?.status === "interrupted"; + const promptError = + this.promptError ?? + (turnFailed ? (this.completedTurn?.error?.message ?? "codex app-server turn failed") : null); + return { + aborted: this.aborted || turnInterrupted, + timedOut: false, + idleTimedOut: false, + timedOutDuringCompaction: false, + promptError, + promptErrorSource: promptError ? this.promptErrorSource || "prompt" : null, + sessionIdUsed: this.params.sessionId, + bootstrapPromptWarningSignaturesSeen: this.params.bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature: this.params.bootstrapPromptWarningSignature, + messagesSnapshot, + assistantTexts, + toolMetas: [...this.toolMetas.values()], + lastAssistant, + didSendViaMessagingTool: toolTelemetry.didSendViaMessagingTool, + messagingToolSentTexts: toolTelemetry.messagingToolSentTexts, + messagingToolSentMediaUrls: toolTelemetry.messagingToolSentMediaUrls, + messagingToolSentTargets: toolTelemetry.messagingToolSentTargets, + toolMediaUrls: toolTelemetry.toolMediaUrls, + toolAudioAsVoice: toolTelemetry.toolAudioAsVoice, + successfulCronAdds: toolTelemetry.successfulCronAdds, + cloudCodeAssistFormatError: false, + attemptUsage: this.tokenUsage, + replayMetadata: { + hadPotentialSideEffects: toolTelemetry.didSendViaMessagingTool, + replaySafe: !toolTelemetry.didSendViaMessagingTool, + }, + itemLifecycle: { + startedCount: this.activeItemIds.size + this.completedItemIds.size, + completedCount: this.completedItemIds.size, + activeCount: this.activeItemIds.size, + ...(this.completedCompactionCount > 0 + ? { compactionCount: this.completedCompactionCount } + : {}), + }, + yieldDetected: false, + didSendDeterministicApprovalPrompt: this.guardianReviewCount > 0 ? false : undefined, + }; + } + + markTimedOut(): void { + this.aborted = true; + this.promptError = "codex app-server attempt timed out"; + this.promptErrorSource = "prompt"; + } + + isCompacting(): boolean { + return this.activeCompactionItemIds.size > 0; + } + + private async handleAssistantDelta(params: JsonObject): Promise { + const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "assistant"; + const delta = readString(params, "delta") ?? ""; + if (!delta) { + return; + } + if (!this.assistantStarted) { + this.assistantStarted = true; + await this.params.onAssistantMessageStart?.(); + } + const text = `${this.assistantTextByItem.get(itemId) ?? ""}${delta}`; + this.assistantTextByItem.set(itemId, text); + await this.params.onPartialReply?.({ text }); + } + + private async handleReasoningDelta(params: JsonObject): Promise { + const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "reasoning"; + const delta = readString(params, "delta") ?? ""; + if (!delta) { + return; + } + this.reasoningStarted = true; + this.reasoningTextByItem.set(itemId, `${this.reasoningTextByItem.get(itemId) ?? ""}${delta}`); + await this.params.onReasoningStream?.({ text: delta }); + } + + private handlePlanDelta(params: JsonObject): void { + const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "plan"; + const delta = readString(params, "delta") ?? ""; + if (!delta) { + return; + } + const text = `${this.planTextByItem.get(itemId) ?? ""}${delta}`; + this.planTextByItem.set(itemId, text); + this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(text) }); + } + + private handleTurnPlanUpdated(params: JsonObject): void { + const plan = Array.isArray(params.plan) + ? params.plan.flatMap((entry) => { + if (!isJsonObject(entry)) { + return []; + } + const step = readString(entry, "step"); + const status = readString(entry, "status"); + if (!step) { + return []; + } + return status ? [`${step} (${status})`] : [step]; + }) + : undefined; + this.emitPlanUpdate({ + explanation: readNullableString(params, "explanation"), + steps: plan, + }); + } + + private handleItemStarted(params: JsonObject): void { + const item = readItem(params.item); + const itemId = item?.id ?? readString(params, "itemId") ?? readString(params, "id"); + if (itemId) { + this.activeItemIds.add(itemId); + } + if (item?.type === "contextCompaction" && itemId) { + this.activeCompactionItemIds.add(itemId); + this.params.onAgentEvent?.({ + stream: "compaction", + data: { + phase: "start", + backend: "codex-app-server", + threadId: this.threadId, + turnId: this.turnId, + itemId, + }, + }); + } + this.emitStandardItemEvent({ phase: "start", item }); + this.params.onAgentEvent?.({ + stream: "codex_app_server.item", + data: { phase: "started", itemId, type: item?.type }, + }); + } + + private handleItemCompleted(params: JsonObject): void { + const item = readItem(params.item); + const itemId = item?.id ?? readString(params, "itemId") ?? readString(params, "id"); + if (itemId) { + this.activeItemIds.delete(itemId); + this.completedItemIds.add(itemId); + } + if (item?.type === "agentMessage" && typeof item.text === "string" && item.text) { + this.assistantTextByItem.set(item.id, item.text); + } + if (item?.type === "plan" && typeof item.text === "string" && item.text) { + this.planTextByItem.set(item.id, item.text); + this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(item.text) }); + } + if (item?.type === "contextCompaction" && itemId) { + this.activeCompactionItemIds.delete(itemId); + this.completedCompactionCount += 1; + this.params.onAgentEvent?.({ + stream: "compaction", + data: { + phase: "end", + backend: "codex-app-server", + threadId: this.threadId, + turnId: this.turnId, + itemId, + }, + }); + } + this.recordToolMeta(item); + this.emitStandardItemEvent({ phase: "end", item }); + this.params.onAgentEvent?.({ + stream: "codex_app_server.item", + data: { phase: "completed", itemId, type: item?.type }, + }); + } + + private handleTokenUsage(params: JsonObject): void { + const tokenUsage = isJsonObject(params.tokenUsage) ? params.tokenUsage : undefined; + const total = tokenUsage && isJsonObject(tokenUsage.total) ? tokenUsage.total : undefined; + if (!total) { + return; + } + this.tokenUsage = normalizeUsage({ + input: readNumber(total, "inputTokens"), + output: readNumber(total, "outputTokens"), + cacheRead: readNumber(total, "cachedInputTokens"), + total: readNumber(total, "totalTokens"), + }); + } + + private async handleTurnCompleted(params: JsonObject): Promise { + const turn = readTurn(params.turn); + if (!turn || turn.id !== this.turnId) { + return; + } + this.completedTurn = turn; + if (turn.status === "interrupted") { + this.aborted = true; + } + if (turn.status === "failed") { + this.promptError = turn.error?.message ?? "codex app-server turn failed"; + this.promptErrorSource = "prompt"; + } + for (const item of turn.items ?? []) { + if (item.type === "agentMessage" && typeof item.text === "string" && item.text) { + this.assistantTextByItem.set(item.id, item.text); + } + if (item.type === "plan" && typeof item.text === "string" && item.text) { + this.planTextByItem.set(item.id, item.text); + this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(item.text) }); + } + this.recordToolMeta(item); + } + this.activeCompactionItemIds.clear(); + await this.maybeEndReasoning(); + } + + private async maybeEndReasoning(): Promise { + if (!this.reasoningStarted || this.reasoningEnded) { + return; + } + this.reasoningEnded = true; + await this.params.onReasoningEnd?.(); + } + + private emitPlanUpdate(params: { explanation?: string | null; steps?: string[] }): void { + if (!params.explanation && (!params.steps || params.steps.length === 0)) { + return; + } + this.params.onAgentEvent?.({ + stream: "plan", + data: { + phase: "update", + title: "Plan updated", + source: "codex-app-server", + ...(params.explanation ? { explanation: params.explanation } : {}), + ...(params.steps && params.steps.length > 0 ? { steps: params.steps } : {}), + }, + }); + } + + private emitStandardItemEvent(params: { + phase: "start" | "end"; + item: CodexThreadItem | undefined; + }): void { + const { item } = params; + if (!item) { + return; + } + const kind = itemKind(item); + if (!kind) { + return; + } + this.params.onAgentEvent?.({ + stream: "item", + data: { + itemId: item.id, + phase: params.phase, + kind, + title: itemTitle(item), + status: params.phase === "start" ? "running" : itemStatus(item), + ...(itemName(item) ? { name: itemName(item) } : {}), + ...(itemMeta(item) ? { meta: itemMeta(item) } : {}), + }, + }); + } + + private recordToolMeta(item: CodexThreadItem | undefined): void { + if (!item) { + return; + } + const toolName = itemName(item); + if (!toolName) { + return; + } + this.toolMetas.set(item.id, { + toolName, + ...(itemMeta(item) ? { meta: itemMeta(item) } : {}), + }); + } + + private collectAssistantTexts(): string[] { + return [...this.assistantTextByItem.values()].filter((text) => text.trim().length > 0); + } + + private createAssistantMessage(text: string): AssistantMessage { + const usage: Usage = this.tokenUsage + ? { + input: this.tokenUsage.input ?? 0, + output: this.tokenUsage.output ?? 0, + cacheRead: this.tokenUsage.cacheRead ?? 0, + cacheWrite: this.tokenUsage.cacheWrite ?? 0, + totalTokens: + this.tokenUsage.total ?? + (this.tokenUsage.input ?? 0) + + (this.tokenUsage.output ?? 0) + + (this.tokenUsage.cacheRead ?? 0) + + (this.tokenUsage.cacheWrite ?? 0), + cost: ZERO_USAGE.cost, + } + : ZERO_USAGE; + return { + role: "assistant", + content: [{ type: "text", text }], + api: this.params.model.api ?? "openai-codex-responses", + provider: this.params.provider, + model: this.params.modelId, + usage, + stopReason: this.aborted ? "aborted" : this.promptError ? "error" : "stop", + errorMessage: this.promptError ? formatErrorMessage(this.promptError) : undefined, + timestamp: Date.now(), + }; + } + + private isNotificationForTurn(params: JsonObject): boolean { + const threadId = readString(params, "threadId"); + const turnId = readString(params, "turnId"); + return (!threadId || threadId === this.threadId) && (!turnId || turnId === this.turnId); + } +} + +function readString(record: JsonObject, key: string): string | undefined { + const value = record[key]; + return typeof value === "string" ? value : undefined; +} + +function readNullableString(record: JsonObject, key: string): string | null | undefined { + const value = record[key]; + if (value === null) { + return null; + } + return typeof value === "string" ? value : undefined; +} + +function readNumber(record: JsonObject, key: string): number | undefined { + const value = record[key]; + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function splitPlanText(text: string): string[] { + return text + .split(/\r?\n/) + .map((line) => line.trim().replace(/^[-*]\s+/, "")) + .filter((line) => line.length > 0); +} + +function itemKind( + item: CodexThreadItem, +): "tool" | "command" | "patch" | "search" | "analysis" | undefined { + switch (item.type) { + case "dynamicToolCall": + case "mcpToolCall": + return "tool"; + case "commandExecution": + return "command"; + case "fileChange": + return "patch"; + case "webSearch": + return "search"; + case "reasoning": + case "contextCompaction": + return "analysis"; + default: + return undefined; + } +} + +function itemTitle(item: CodexThreadItem): string { + switch (item.type) { + case "commandExecution": + return "Command"; + case "fileChange": + return "File change"; + case "mcpToolCall": + return "MCP tool"; + case "dynamicToolCall": + return "Tool"; + case "webSearch": + return "Web search"; + case "contextCompaction": + return "Context compaction"; + case "reasoning": + return "Reasoning"; + default: + return item.type; + } +} + +function itemStatus(item: CodexThreadItem): "completed" | "failed" | "running" { + const status = readItemString(item, "status"); + if (status === "failed") { + return "failed"; + } + if (status === "inProgress" || status === "running") { + return "running"; + } + return "completed"; +} + +function itemName(item: CodexThreadItem): string | undefined { + if (item.type === "dynamicToolCall" && typeof item.tool === "string") { + return item.tool; + } + if (item.type === "mcpToolCall" && typeof item.tool === "string") { + const server = typeof item.server === "string" ? item.server : undefined; + return server ? `${server}.${item.tool}` : item.tool; + } + if (item.type === "commandExecution") { + return "bash"; + } + if (item.type === "fileChange") { + return "apply_patch"; + } + if (item.type === "webSearch") { + return "web_search"; + } + return undefined; +} + +function itemMeta(item: CodexThreadItem): string | undefined { + if (item.type === "commandExecution" && typeof item.command === "string") { + return item.command; + } + if (item.type === "webSearch" && typeof item.query === "string") { + return item.query; + } + return readItemString(item, "status"); +} + +function readItemString(item: CodexThreadItem, key: string): string | undefined { + const value = (item as Record)[key]; + return typeof value === "string" ? value : undefined; +} + +function readItem(value: JsonValue | undefined): CodexThreadItem | undefined { + if (!isJsonObject(value)) { + return undefined; + } + const type = typeof value.type === "string" ? value.type : undefined; + const id = typeof value.id === "string" ? value.id : undefined; + if (!type || !id) { + return undefined; + } + return value as CodexThreadItem; +} + +function readTurn(value: JsonValue | undefined): CodexTurn | undefined { + if (!isJsonObject(value)) { + return undefined; + } + const id = typeof value.id === "string" ? value.id : undefined; + const status = typeof value.status === "string" ? value.status : undefined; + if (!id || !status) { + return undefined; + } + const items = Array.isArray(value.items) + ? value.items.flatMap((item) => { + const parsed = readItem(item); + return parsed ? [parsed] : []; + }) + : undefined; + return { + id, + status: status as CodexTurn["status"], + error: isJsonObject(value.error) + ? { + message: typeof value.error.message === "string" ? value.error.message : undefined, + } + : null, + items, + }; +} diff --git a/extensions/codex/app-server/protocol.ts b/extensions/codex/app-server/protocol.ts new file mode 100644 index 00000000000..fd85987ec79 --- /dev/null +++ b/extensions/codex/app-server/protocol.ts @@ -0,0 +1,185 @@ +export type JsonPrimitive = null | boolean | number | string; +export type JsonValue = JsonPrimitive | JsonValue[] | { [key: string]: JsonValue }; +export type JsonObject = { [key: string]: JsonValue }; + +export type RpcRequest = { + id?: number | string; + method: string; + params?: JsonValue; +}; + +export type RpcResponse = { + id: number | string; + result?: JsonValue; + error?: { + code?: number; + message: string; + data?: JsonValue; + }; +}; + +export type RpcMessage = RpcRequest | RpcResponse; + +export type CodexUserInput = + | { + type: "text"; + text: string; + } + | { + type: "image"; + url: string; + } + | { + type: "localImage"; + path: string; + }; + +export type CodexDynamicToolSpec = { + name: string; + description: string; + inputSchema: JsonValue; + deferLoading?: boolean; +}; + +export type CodexThreadStartParams = { + model?: string | null; + modelProvider?: string | null; + cwd?: string | null; + approvalPolicy?: "never" | "on-request" | "on-failure" | "untrusted"; + approvalsReviewer?: "user" | "guardian_subagent"; + sandbox?: "read-only" | "workspace-write" | "danger-full-access"; + config?: JsonObject | null; + serviceName?: string | null; + baseInstructions?: string | null; + developerInstructions?: string | null; + ephemeral?: boolean | null; + dynamicTools?: CodexDynamicToolSpec[] | null; + experimentalRawEvents: boolean; + persistExtendedHistory: boolean; +}; + +export type CodexThreadResumeParams = { + threadId: string; +}; + +export type CodexThreadStartResponse = { + thread: CodexThread; + model?: string | null; + modelProvider?: string | null; +}; + +export type CodexThreadResumeResponse = CodexThreadStartResponse; + +export type CodexTurnStartParams = { + threadId: string; + input: CodexUserInput[]; + cwd?: string | null; + approvalPolicy?: "never" | "on-request" | "on-failure" | "untrusted"; + approvalsReviewer?: "user" | "guardian_subagent"; + model?: string | null; + effort?: "minimal" | "low" | "medium" | "high" | "xhigh" | null; +}; + +export type CodexTurnSteerParams = { + threadId: string; + expectedTurnId: string; + input: CodexUserInput[]; +}; + +export type CodexTurnInterruptParams = { + threadId: string; + turnId: string; +}; + +export type CodexTurnStartResponse = { + turn: CodexTurn; +}; + +export type CodexThread = { + id: string; + status?: string; + cwd?: string | null; + turns?: CodexTurn[]; +}; + +export type CodexTurn = { + id: string; + status: "completed" | "interrupted" | "failed" | "inProgress"; + error?: { + message?: string; + } | null; + items?: CodexThreadItem[]; +}; + +export type CodexThreadItem = + | { + type: "agentMessage"; + id: string; + text?: string; + } + | { + type: "reasoning"; + id: string; + summary?: string[]; + content?: string[]; + } + | { + type: "plan"; + id: string; + text?: string; + } + | { + type: "dynamicToolCall"; + id: string; + tool?: string; + status?: string; + } + | { + type: string; + id: string; + status?: string; + [key: string]: JsonValue | undefined; + }; + +export type CodexServerNotification = { + method: string; + params?: JsonValue; +}; + +export type CodexDynamicToolCallParams = { + threadId: string; + turnId: string; + callId: string; + tool: string; + arguments?: JsonValue; +}; + +export type CodexDynamicToolCallResponse = { + contentItems: CodexDynamicToolCallOutputContentItem[]; + success: boolean; +}; + +export type CodexDynamicToolCallOutputContentItem = + | { + type: "inputText"; + text: string; + } + | { + type: "inputImage"; + imageUrl: string; + }; + +export function isJsonObject(value: JsonValue | undefined): value is JsonObject { + return Boolean(value && typeof value === "object" && !Array.isArray(value)); +} + +export function isRpcResponse(message: RpcMessage): message is RpcResponse { + return "id" in message && !("method" in message); +} + +export function coerceJsonObject(value: unknown): JsonObject | undefined { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return undefined; + } + return value as JsonObject; +} diff --git a/extensions/codex/app-server/run-attempt.test.ts b/extensions/codex/app-server/run-attempt.test.ts new file mode 100644 index 00000000000..a4168d8aaa9 --- /dev/null +++ b/extensions/codex/app-server/run-attempt.test.ts @@ -0,0 +1,229 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import type { Api, Model } from "@mariozechner/pi-ai"; +import { + abortAgentHarnessRun, + queueAgentHarnessMessage, + type EmbeddedRunAttemptParams, +} from "openclaw/plugin-sdk/agent-harness"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CodexServerNotification } from "./protocol.js"; +import { runCodexAppServerAttempt, __testing } from "./run-attempt.js"; + +let tempDir: string; + +function createParams(sessionFile: string, workspaceDir: string): EmbeddedRunAttemptParams { + return { + prompt: "hello", + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile, + workspaceDir, + runId: "run-1", + provider: "codex", + modelId: "gpt-5.4-codex", + model: { + id: "gpt-5.4-codex", + name: "gpt-5.4-codex", + provider: "codex", + api: "openai-codex-responses", + input: ["text"], + reasoning: true, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 128_000, + maxTokens: 8_000, + } as Model, + thinkLevel: "medium", + disableTools: true, + timeoutMs: 5_000, + authStorage: {} as never, + modelRegistry: {} as never, + } as EmbeddedRunAttemptParams; +} + +describe("runCodexAppServerAttempt", () => { + beforeEach(async () => { + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-run-")); + }); + + afterEach(async () => { + __testing.resetCodexAppServerClientFactoryForTests(); + vi.restoreAllMocks(); + await fs.rm(tempDir, { recursive: true, force: true }); + }); + + it("forwards queued user input and aborts the active app-server turn", async () => { + const requests: Array<{ method: string; params: unknown }> = []; + const request = vi.fn(async (method: string, params?: unknown) => { + requests.push({ method, params }); + if (method === "thread/start") { + return { thread: { id: "thread-1" }, model: "gpt-5.4-codex", modelProvider: "openai" }; + } + if (method === "turn/start") { + return { turn: { id: "turn-1", status: "inProgress" } }; + } + return {}; + }); + __testing.setCodexAppServerClientFactoryForTests( + async () => + ({ + request, + addNotificationHandler: () => () => undefined, + addRequestHandler: () => () => undefined, + }) as never, + ); + + const run = runCodexAppServerAttempt( + createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")), + ); + await vi.waitFor(() => + expect(requests.some((entry) => entry.method === "turn/start")).toBe(true), + ); + + expect(queueAgentHarnessMessage("session-1", "more context")).toBe(true); + await vi.waitFor(() => + expect(requests.some((entry) => entry.method === "turn/steer")).toBe(true), + ); + expect(abortAgentHarnessRun("session-1")).toBe(true); + await vi.waitFor(() => + expect(requests.some((entry) => entry.method === "turn/interrupt")).toBe(true), + ); + + const result = await run; + expect(result.aborted).toBe(true); + expect(requests).toEqual( + expect.arrayContaining([ + { + method: "thread/start", + params: expect.objectContaining({ + model: "gpt-5.4-codex", + modelProvider: "openai", + }), + }, + { + method: "turn/steer", + params: { + threadId: "thread-1", + expectedTurnId: "turn-1", + input: [{ type: "text", text: "more context" }], + }, + }, + { + method: "turn/interrupt", + params: { threadId: "thread-1", turnId: "turn-1" }, + }, + ]), + ); + }); + + it("forwards image attachments to the app-server turn input", async () => { + const requests: Array<{ method: string; params: unknown }> = []; + let notify: (notification: CodexServerNotification) => Promise = async () => undefined; + const request = vi.fn(async (method: string, params?: unknown) => { + requests.push({ method, params }); + if (method === "thread/start") { + return { thread: { id: "thread-1" }, model: "gpt-5.4-codex", modelProvider: "openai" }; + } + if (method === "turn/start") { + return { turn: { id: "turn-1", status: "inProgress" } }; + } + return {}; + }); + __testing.setCodexAppServerClientFactoryForTests( + async () => + ({ + request, + addNotificationHandler: (handler: typeof notify) => { + notify = handler; + return () => undefined; + }, + addRequestHandler: () => () => undefined, + }) as never, + ); + const params = createParams( + path.join(tempDir, "session.jsonl"), + path.join(tempDir, "workspace"), + ); + params.model = { + ...params.model, + input: ["text", "image"], + } as Model; + params.images = [ + { + type: "image", + mimeType: "image/png", + data: "aW1hZ2UtYnl0ZXM=", + }, + ]; + + const run = runCodexAppServerAttempt(params); + await vi.waitFor(() => + expect(requests.some((entry) => entry.method === "turn/start")).toBe(true), + ); + await notify({ + method: "turn/completed", + params: { + threadId: "thread-1", + turnId: "turn-1", + turn: { id: "turn-1", status: "completed" }, + }, + }); + await run; + + expect(requests).toEqual( + expect.arrayContaining([ + { + method: "turn/start", + params: expect.objectContaining({ + input: [ + { type: "text", text: "hello" }, + { type: "image", url: "data:image/png;base64,aW1hZ2UtYnl0ZXM=" }, + ], + }), + }, + ]), + ); + }); + + it("does not drop turn completion notifications emitted while turn/start is in flight", async () => { + let notify: (notification: CodexServerNotification) => Promise = async () => undefined; + const request = vi.fn(async (method: string) => { + if (method === "thread/start") { + return { thread: { id: "thread-1" }, model: "gpt-5.4-codex", modelProvider: "openai" }; + } + if (method === "turn/start") { + await notify({ + method: "turn/completed", + params: { + threadId: "thread-1", + turnId: "turn-1", + turn: { id: "turn-1", status: "completed" }, + }, + }); + return { turn: { id: "turn-1", status: "completed" } }; + } + return {}; + }); + __testing.setCodexAppServerClientFactoryForTests( + async () => + ({ + request, + addNotificationHandler: (handler: typeof notify) => { + notify = handler; + return () => undefined; + }, + addRequestHandler: () => () => undefined, + }) as never, + ); + + await expect( + runCodexAppServerAttempt( + createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")), + ), + ).resolves.toMatchObject({ + aborted: false, + timedOut: false, + }); + }); +}); diff --git a/extensions/codex/app-server/run-attempt.ts b/extensions/codex/app-server/run-attempt.ts new file mode 100644 index 00000000000..3c1206ceb6f --- /dev/null +++ b/extensions/codex/app-server/run-attempt.ts @@ -0,0 +1,577 @@ +import fs from "node:fs/promises"; +import { + buildEmbeddedAttemptToolRunContext, + clearActiveEmbeddedRun, + createOpenClawCodingTools, + embeddedAgentLog, + isSubagentSessionKey, + normalizeProviderToolSchemas, + resolveAttemptSpawnWorkspaceDir, + resolveModelAuthMode, + resolveOpenClawAgentDir, + resolveSandboxContext, + resolveSessionAgentIds, + resolveUserPath, + setActiveEmbeddedRun, + supportsModelTools, + type EmbeddedRunAttemptParams, + type EmbeddedRunAttemptResult, +} from "openclaw/plugin-sdk/agent-harness"; +import { handleCodexAppServerApprovalRequest } from "./approval-bridge.js"; +import { + getSharedCodexAppServerClient, + isCodexAppServerApprovalRequest, + type CodexAppServerClient, +} from "./client.js"; +import { createCodexDynamicToolBridge } from "./dynamic-tools.js"; +import { CodexAppServerEventProjector } from "./event-projector.js"; +import { + isJsonObject, + type CodexServerNotification, + type CodexDynamicToolCallParams, + type CodexThreadResumeResponse, + type CodexThreadStartResponse, + type CodexTurnStartResponse, + type CodexUserInput, + type JsonObject, + type JsonValue, +} from "./protocol.js"; +import { + clearCodexAppServerBinding, + readCodexAppServerBinding, + writeCodexAppServerBinding, + type CodexAppServerThreadBinding, +} from "./session-binding.js"; +import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js"; + +type CodexAppServerClientFactory = () => Promise; + +let clientFactory: CodexAppServerClientFactory = getSharedCodexAppServerClient; + +export async function runCodexAppServerAttempt( + params: EmbeddedRunAttemptParams, +): Promise { + const resolvedWorkspace = resolveUserPath(params.workspaceDir); + await fs.mkdir(resolvedWorkspace, { recursive: true }); + const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId; + const sandbox = await resolveSandboxContext({ + config: params.config, + sessionKey: sandboxSessionKey, + workspaceDir: resolvedWorkspace, + }); + const effectiveWorkspace = sandbox?.enabled + ? sandbox.workspaceAccess === "rw" + ? resolvedWorkspace + : sandbox.workspaceDir + : resolvedWorkspace; + await fs.mkdir(effectiveWorkspace, { recursive: true }); + + const runAbortController = new AbortController(); + const abortFromUpstream = () => { + runAbortController.abort(params.abortSignal?.reason ?? "upstream_abort"); + }; + if (params.abortSignal?.aborted) { + abortFromUpstream(); + } else { + params.abortSignal?.addEventListener("abort", abortFromUpstream, { once: true }); + } + + const { sessionAgentId } = resolveSessionAgentIds({ + sessionKey: params.sessionKey, + config: params.config, + agentId: params.agentId, + }); + const tools = await buildDynamicTools({ + params, + resolvedWorkspace, + effectiveWorkspace, + sandboxSessionKey, + sandbox, + runAbortController, + sessionAgentId, + }); + const toolBridge = createCodexDynamicToolBridge({ + tools, + signal: runAbortController.signal, + }); + const client = await clientFactory(); + const thread = await startOrResumeThread({ + client, + params, + cwd: effectiveWorkspace, + dynamicTools: toolBridge.specs, + }); + + let projector: CodexAppServerEventProjector | undefined; + let turnId: string | undefined; + const pendingNotifications: CodexServerNotification[] = []; + let completed = false; + let timedOut = false; + let resolveCompletion: (() => void) | undefined; + const completion = new Promise((resolve) => { + resolveCompletion = resolve; + }); + + const handleNotification = async (notification: CodexServerNotification) => { + if (!projector || !turnId) { + pendingNotifications.push(notification); + return; + } + await projector.handleNotification(notification); + if ( + notification.method === "turn/completed" && + isTurnNotification(notification.params, turnId) + ) { + completed = true; + resolveCompletion?.(); + } + }; + + const notificationCleanup = client.addNotificationHandler(handleNotification); + const requestCleanup = client.addRequestHandler(async (request) => { + if (!turnId) { + return undefined; + } + if (request.method !== "item/tool/call") { + if (isCodexAppServerApprovalRequest(request.method)) { + return handleApprovalRequest({ + method: request.method, + params: request.params, + paramsForRun: params, + threadId: thread.threadId, + turnId, + signal: runAbortController.signal, + }); + } + return undefined; + } + const call = readDynamicToolCallParams(request.params); + if (!call || call.threadId !== thread.threadId || call.turnId !== turnId) { + return undefined; + } + return toolBridge.handleToolCall(call) as Promise; + }); + + let turn: CodexTurnStartResponse; + try { + turn = await client.request("turn/start", { + threadId: thread.threadId, + input: buildUserInput(params), + cwd: effectiveWorkspace, + approvalPolicy: resolveAppServerApprovalPolicy(), + approvalsReviewer: resolveApprovalsReviewer(), + model: params.modelId, + effort: resolveReasoningEffort(params.thinkLevel), + }); + } catch (error) { + notificationCleanup(); + requestCleanup(); + params.abortSignal?.removeEventListener("abort", abortFromUpstream); + throw error; + } + turnId = turn.turn.id; + projector = new CodexAppServerEventProjector(params, thread.threadId, turnId); + for (const notification of pendingNotifications.splice(0)) { + await handleNotification(notification); + } + const activeTurnId = turnId; + const activeProjector = projector; + + const handle = { + kind: "embedded" as const, + queueMessage: async (text: string) => { + await client.request("turn/steer", { + threadId: thread.threadId, + expectedTurnId: activeTurnId, + input: [{ type: "text", text }], + }); + }, + isStreaming: () => !completed, + isCompacting: () => projector?.isCompacting() ?? false, + cancel: () => runAbortController.abort("cancelled"), + abort: () => runAbortController.abort("aborted"), + }; + setActiveEmbeddedRun(params.sessionId, handle, params.sessionKey); + + const timeout = setTimeout( + () => { + timedOut = true; + projector?.markTimedOut(); + runAbortController.abort("timeout"); + }, + Math.max(100, params.timeoutMs), + ); + + const abortListener = () => { + void client.request("turn/interrupt", { + threadId: thread.threadId, + turnId: activeTurnId, + }); + resolveCompletion?.(); + }; + runAbortController.signal.addEventListener("abort", abortListener, { once: true }); + if (runAbortController.signal.aborted) { + abortListener(); + } + + try { + await completion; + const result = activeProjector.buildResult(toolBridge.telemetry); + await mirrorTranscriptBestEffort({ + params, + result, + threadId: thread.threadId, + turnId: activeTurnId, + }); + return { + ...result, + timedOut, + aborted: result.aborted || runAbortController.signal.aborted, + promptError: timedOut ? "codex app-server attempt timed out" : result.promptError, + promptErrorSource: timedOut ? "prompt" : result.promptErrorSource, + }; + } finally { + clearTimeout(timeout); + notificationCleanup(); + requestCleanup(); + runAbortController.signal.removeEventListener("abort", abortListener); + params.abortSignal?.removeEventListener("abort", abortFromUpstream); + clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey); + } +} + +type DynamicToolBuildParams = { + params: EmbeddedRunAttemptParams; + resolvedWorkspace: string; + effectiveWorkspace: string; + sandboxSessionKey: string; + sandbox: Awaited>; + runAbortController: AbortController; + sessionAgentId: string | undefined; +}; + +async function buildDynamicTools(input: DynamicToolBuildParams) { + const { params } = input; + if (params.disableTools || !supportsModelTools(params.model)) { + return []; + } + const modelHasVision = params.model.input?.includes("image") ?? false; + const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); + const allTools = createOpenClawCodingTools({ + agentId: input.sessionAgentId, + ...buildEmbeddedAttemptToolRunContext(params), + exec: { + ...params.execOverrides, + elevated: params.bashElevated, + }, + sandbox: input.sandbox, + messageProvider: params.messageChannel ?? params.messageProvider, + agentAccountId: params.agentAccountId, + messageTo: params.messageTo, + messageThreadId: params.messageThreadId, + groupId: params.groupId, + groupChannel: params.groupChannel, + groupSpace: params.groupSpace, + spawnedBy: params.spawnedBy, + senderId: params.senderId, + senderName: params.senderName, + senderUsername: params.senderUsername, + senderE164: params.senderE164, + senderIsOwner: params.senderIsOwner, + allowGatewaySubagentBinding: params.allowGatewaySubagentBinding, + sessionKey: input.sandboxSessionKey, + sessionId: params.sessionId, + runId: params.runId, + agentDir, + workspaceDir: input.effectiveWorkspace, + spawnWorkspaceDir: resolveAttemptSpawnWorkspaceDir({ + sandbox: input.sandbox, + resolvedWorkspace: input.resolvedWorkspace, + }), + config: params.config, + abortSignal: input.runAbortController.signal, + modelProvider: params.model.provider, + modelId: params.modelId, + modelCompat: params.model.compat, + modelApi: params.model.api, + modelContextWindowTokens: params.model.contextWindow, + modelAuthMode: resolveModelAuthMode(params.model.provider, params.config), + currentChannelId: params.currentChannelId, + currentThreadTs: params.currentThreadTs, + currentMessageId: params.currentMessageId, + replyToMode: params.replyToMode, + hasRepliedRef: params.hasRepliedRef, + modelHasVision, + requireExplicitMessageTarget: + params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey), + disableMessageTool: params.disableMessageTool, + onYield: (message) => { + params.onAgentEvent?.({ + stream: "codex_app_server.tool", + data: { name: "sessions_yield", message }, + }); + input.runAbortController.abort("sessions_yield"); + }, + }); + const filteredTools = + params.toolsAllow && params.toolsAllow.length > 0 + ? allTools.filter((tool) => params.toolsAllow?.includes(tool.name)) + : allTools; + return normalizeProviderToolSchemas({ + tools: filteredTools, + provider: params.provider, + config: params.config, + workspaceDir: input.effectiveWorkspace, + env: process.env, + modelId: params.modelId, + modelApi: params.model.api, + model: params.model, + }); +} + +async function startOrResumeThread(params: { + client: CodexAppServerClient; + params: EmbeddedRunAttemptParams; + cwd: string; + dynamicTools: JsonValue[]; +}): Promise { + const dynamicToolsFingerprint = fingerprintDynamicTools(params.dynamicTools); + const binding = await readCodexAppServerBinding(params.params.sessionFile); + if (binding?.threadId) { + if (binding.dynamicToolsFingerprint !== dynamicToolsFingerprint) { + embeddedAgentLog.debug( + "codex app-server dynamic tool catalog changed; starting a new thread", + { + threadId: binding.threadId, + }, + ); + await clearCodexAppServerBinding(params.params.sessionFile); + } else { + try { + const response = await params.client.request("thread/resume", { + threadId: binding.threadId, + }); + await writeCodexAppServerBinding(params.params.sessionFile, { + threadId: response.thread.id, + cwd: params.cwd, + model: params.params.modelId, + modelProvider: response.modelProvider ?? normalizeModelProvider(params.params.provider), + dynamicToolsFingerprint, + createdAt: binding.createdAt, + }); + return { + ...binding, + threadId: response.thread.id, + cwd: params.cwd, + model: params.params.modelId, + modelProvider: response.modelProvider ?? normalizeModelProvider(params.params.provider), + dynamicToolsFingerprint, + }; + } catch (error) { + embeddedAgentLog.warn("codex app-server thread resume failed; starting a new thread", { + error, + }); + await clearCodexAppServerBinding(params.params.sessionFile); + } + } + } + + const response = await params.client.request("thread/start", { + model: params.params.modelId, + modelProvider: normalizeModelProvider(params.params.provider), + cwd: params.cwd, + approvalPolicy: resolveAppServerApprovalPolicy(), + approvalsReviewer: resolveApprovalsReviewer(), + sandbox: resolveAppServerSandbox(), + serviceName: "OpenClaw", + developerInstructions: buildDeveloperInstructions(params.params), + dynamicTools: params.dynamicTools, + experimentalRawEvents: true, + persistExtendedHistory: true, + }); + const createdAt = new Date().toISOString(); + await writeCodexAppServerBinding(params.params.sessionFile, { + threadId: response.thread.id, + cwd: params.cwd, + model: response.model ?? params.params.modelId, + modelProvider: response.modelProvider ?? normalizeModelProvider(params.params.provider), + dynamicToolsFingerprint, + createdAt, + }); + return { + schemaVersion: 1, + threadId: response.thread.id, + sessionFile: params.params.sessionFile, + cwd: params.cwd, + model: response.model ?? params.params.modelId, + modelProvider: response.modelProvider ?? normalizeModelProvider(params.params.provider), + dynamicToolsFingerprint, + createdAt, + updatedAt: createdAt, + }; +} + +function fingerprintDynamicTools(dynamicTools: JsonValue[]): string { + return JSON.stringify(dynamicTools.map(stabilizeJsonValue)); +} + +function stabilizeJsonValue(value: JsonValue): JsonValue { + if (Array.isArray(value)) { + return value.map(stabilizeJsonValue); + } + if (!isJsonObject(value)) { + return value; + } + const stable: JsonObject = {}; + for (const [key, child] of Object.entries(value).toSorted(([left], [right]) => + left.localeCompare(right), + )) { + stable[key] = stabilizeJsonValue(child); + } + return stable; +} + +function buildDeveloperInstructions(params: EmbeddedRunAttemptParams): string { + const sections = [ + "You are running inside OpenClaw. Use OpenClaw dynamic tools for messaging, cron, sessions, and host actions when available.", + "Preserve the user's existing channel/session context. If sending a channel reply, use the OpenClaw messaging tool instead of describing that you would reply.", + params.extraSystemPrompt, + params.skillsSnapshot?.prompt, + ]; + return sections.filter((section) => typeof section === "string" && section.trim()).join("\n\n"); +} + +function buildUserInput(params: EmbeddedRunAttemptParams): CodexUserInput[] { + return [ + { type: "text", text: params.prompt }, + ...(params.images ?? []).map( + (image): CodexUserInput => ({ + type: "image", + url: `data:${image.mimeType};base64,${image.data}`, + }), + ), + ]; +} + +function normalizeModelProvider(provider: string): string { + return provider === "codex" || provider === "openai-codex" ? "openai" : provider; +} + +function resolveAppServerApprovalPolicy(): "never" | "on-request" | "on-failure" | "untrusted" { + const raw = process.env.OPENCLAW_CODEX_APP_SERVER_APPROVAL_POLICY?.trim(); + if (raw === "on-request" || raw === "on-failure" || raw === "untrusted") { + return raw; + } + return "never"; +} + +function resolveAppServerSandbox(): "read-only" | "workspace-write" | "danger-full-access" { + const raw = process.env.OPENCLAW_CODEX_APP_SERVER_SANDBOX?.trim(); + if (raw === "read-only" || raw === "danger-full-access") { + return raw; + } + return "workspace-write"; +} + +function resolveApprovalsReviewer(): "user" | "guardian_subagent" { + return process.env.OPENCLAW_CODEX_APP_SERVER_GUARDIAN === "1" ? "guardian_subagent" : "user"; +} + +function resolveReasoningEffort( + thinkLevel: EmbeddedRunAttemptParams["thinkLevel"], +): "minimal" | "low" | "medium" | "high" | "xhigh" | null { + if ( + thinkLevel === "minimal" || + thinkLevel === "low" || + thinkLevel === "medium" || + thinkLevel === "high" || + thinkLevel === "xhigh" + ) { + return thinkLevel; + } + return null; +} + +function readDynamicToolCallParams( + value: JsonValue | undefined, +): CodexDynamicToolCallParams | undefined { + if (!isJsonObject(value)) { + return undefined; + } + const threadId = readString(value, "threadId"); + const turnId = readString(value, "turnId"); + const callId = readString(value, "callId"); + const tool = readString(value, "tool"); + if (!threadId || !turnId || !callId || !tool) { + return undefined; + } + return { + threadId, + turnId, + callId, + tool, + arguments: value.arguments, + }; +} + +function isTurnNotification(value: JsonValue | undefined, turnId: string): boolean { + if (!isJsonObject(value)) { + return false; + } + const directTurnId = readString(value, "turnId"); + if (directTurnId === turnId) { + return true; + } + const turn = isJsonObject(value.turn) ? value.turn : undefined; + return readString(turn ?? {}, "id") === turnId; +} + +function readString(record: JsonObject, key: string): string | undefined { + const value = record[key]; + return typeof value === "string" ? value : undefined; +} + +async function mirrorTranscriptBestEffort(params: { + params: EmbeddedRunAttemptParams; + result: EmbeddedRunAttemptResult; + threadId: string; + turnId: string; +}): Promise { + try { + await mirrorCodexAppServerTranscript({ + sessionFile: params.params.sessionFile, + sessionKey: params.params.sessionKey, + messages: params.result.messagesSnapshot, + idempotencyScope: `codex-app-server:${params.threadId}:${params.turnId}`, + }); + } catch (error) { + embeddedAgentLog.warn("failed to mirror codex app-server transcript", { error }); + } +} + +function handleApprovalRequest(params: { + method: string; + params: JsonValue | undefined; + paramsForRun: EmbeddedRunAttemptParams; + threadId: string; + turnId: string; + signal?: AbortSignal; +}): Promise { + return handleCodexAppServerApprovalRequest({ + method: params.method, + requestParams: params.params, + paramsForRun: params.paramsForRun, + threadId: params.threadId, + turnId: params.turnId, + signal: params.signal, + }); +} + +export const __testing = { + setCodexAppServerClientFactoryForTests(factory: CodexAppServerClientFactory): void { + clientFactory = factory; + }, + resetCodexAppServerClientFactoryForTests(): void { + clientFactory = getSharedCodexAppServerClient; + }, +} as const; diff --git a/extensions/codex/app-server/session-binding.test.ts b/extensions/codex/app-server/session-binding.test.ts new file mode 100644 index 00000000000..49c7abe2874 --- /dev/null +++ b/extensions/codex/app-server/session-binding.test.ts @@ -0,0 +1,52 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { + clearCodexAppServerBinding, + readCodexAppServerBinding, + resolveCodexAppServerBindingPath, + writeCodexAppServerBinding, +} from "./session-binding.js"; + +let tempDir: string; + +describe("codex app-server session binding", () => { + beforeEach(async () => { + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-binding-")); + }); + + afterEach(async () => { + await fs.rm(tempDir, { recursive: true, force: true }); + }); + + it("round-trips the thread binding beside the PI session file", async () => { + const sessionFile = path.join(tempDir, "session.json"); + await writeCodexAppServerBinding(sessionFile, { + threadId: "thread-123", + cwd: tempDir, + model: "gpt-5.4-codex", + modelProvider: "openai", + dynamicToolsFingerprint: "tools-v1", + }); + + const binding = await readCodexAppServerBinding(sessionFile); + + expect(binding).toMatchObject({ + schemaVersion: 1, + threadId: "thread-123", + sessionFile, + cwd: tempDir, + model: "gpt-5.4-codex", + modelProvider: "openai", + dynamicToolsFingerprint: "tools-v1", + }); + await expect(fs.stat(resolveCodexAppServerBindingPath(sessionFile))).resolves.toBeTruthy(); + }); + + it("clears missing bindings without throwing", async () => { + const sessionFile = path.join(tempDir, "missing.json"); + await clearCodexAppServerBinding(sessionFile); + await expect(readCodexAppServerBinding(sessionFile)).resolves.toBeUndefined(); + }); +}); diff --git a/extensions/codex/app-server/session-binding.ts b/extensions/codex/app-server/session-binding.ts new file mode 100644 index 00000000000..7a69ef4f8a7 --- /dev/null +++ b/extensions/codex/app-server/session-binding.ts @@ -0,0 +1,98 @@ +import fs from "node:fs/promises"; +import { embeddedAgentLog } from "openclaw/plugin-sdk/agent-harness"; + +export type CodexAppServerThreadBinding = { + schemaVersion: 1; + threadId: string; + sessionFile: string; + cwd: string; + model?: string; + modelProvider?: string; + dynamicToolsFingerprint?: string; + createdAt: string; + updatedAt: string; +}; + +export function resolveCodexAppServerBindingPath(sessionFile: string): string { + return `${sessionFile}.codex-app-server.json`; +} + +export async function readCodexAppServerBinding( + sessionFile: string, +): Promise { + const path = resolveCodexAppServerBindingPath(sessionFile); + let raw: string; + try { + raw = await fs.readFile(path, "utf8"); + } catch (error) { + if (isNotFound(error)) { + return undefined; + } + embeddedAgentLog.warn("failed to read codex app-server binding", { path, error }); + return undefined; + } + try { + const parsed = JSON.parse(raw) as Partial; + if (parsed.schemaVersion !== 1 || typeof parsed.threadId !== "string") { + return undefined; + } + return { + schemaVersion: 1, + threadId: parsed.threadId, + sessionFile, + cwd: typeof parsed.cwd === "string" ? parsed.cwd : "", + model: typeof parsed.model === "string" ? parsed.model : undefined, + modelProvider: typeof parsed.modelProvider === "string" ? parsed.modelProvider : undefined, + dynamicToolsFingerprint: + typeof parsed.dynamicToolsFingerprint === "string" + ? parsed.dynamicToolsFingerprint + : undefined, + createdAt: typeof parsed.createdAt === "string" ? parsed.createdAt : new Date().toISOString(), + updatedAt: typeof parsed.updatedAt === "string" ? parsed.updatedAt : new Date().toISOString(), + }; + } catch (error) { + embeddedAgentLog.warn("failed to parse codex app-server binding", { path, error }); + return undefined; + } +} + +export async function writeCodexAppServerBinding( + sessionFile: string, + binding: Omit< + CodexAppServerThreadBinding, + "schemaVersion" | "sessionFile" | "createdAt" | "updatedAt" + > & { + createdAt?: string; + }, +): Promise { + const now = new Date().toISOString(); + const payload: CodexAppServerThreadBinding = { + schemaVersion: 1, + sessionFile, + threadId: binding.threadId, + cwd: binding.cwd, + model: binding.model, + modelProvider: binding.modelProvider, + dynamicToolsFingerprint: binding.dynamicToolsFingerprint, + createdAt: binding.createdAt ?? now, + updatedAt: now, + }; + await fs.writeFile( + resolveCodexAppServerBindingPath(sessionFile), + `${JSON.stringify(payload, null, 2)}\n`, + ); +} + +export async function clearCodexAppServerBinding(sessionFile: string): Promise { + try { + await fs.unlink(resolveCodexAppServerBindingPath(sessionFile)); + } catch (error) { + if (!isNotFound(error)) { + embeddedAgentLog.warn("failed to clear codex app-server binding", { sessionFile, error }); + } + } +} + +function isNotFound(error: unknown): boolean { + return Boolean(error && typeof error === "object" && "code" in error && error.code === "ENOENT"); +} diff --git a/extensions/codex/app-server/transcript-mirror.test.ts b/extensions/codex/app-server/transcript-mirror.test.ts new file mode 100644 index 00000000000..9d7e461724b --- /dev/null +++ b/extensions/codex/app-server/transcript-mirror.test.ts @@ -0,0 +1,98 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js"; + +let tempDir: string; + +describe("mirrorCodexAppServerTranscript", () => { + beforeEach(async () => { + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-transcript-")); + }); + + afterEach(async () => { + await fs.rm(tempDir, { recursive: true, force: true }); + }); + + it("mirrors user and assistant messages into the PI transcript", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "agent:main:session-1", + messages: [ + { role: "user", content: "hello", timestamp: 1 }, + { + role: "assistant", + content: [{ type: "text", text: "hi" }], + api: "openai-codex-responses", + provider: "openai-codex", + model: "gpt-5.4-codex", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + timestamp: 2, + }, + ], + }); + + const records = (await fs.readFile(sessionFile, "utf8")) + .trim() + .split("\n") + .map((line) => JSON.parse(line) as { type?: string; message?: { role?: string } }); + expect(records[0]?.type).toBe("session"); + expect(records.slice(1).map((record) => record.message?.role)).toEqual(["user", "assistant"]); + }); + + it("deduplicates app-server turn mirrors by idempotency scope", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const messages = [ + { role: "user" as const, content: "hello", timestamp: 1 }, + { + role: "assistant" as const, + content: [{ type: "text" as const, text: "hi" }], + api: "openai-codex-responses", + provider: "openai-codex", + model: "gpt-5.4-codex", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop" as const, + timestamp: 2, + }, + ]; + + await mirrorCodexAppServerTranscript({ + sessionFile, + messages, + idempotencyScope: "codex-app-server:thread-1:turn-1", + }); + await mirrorCodexAppServerTranscript({ + sessionFile, + messages, + idempotencyScope: "codex-app-server:thread-1:turn-1", + }); + + const records = (await fs.readFile(sessionFile, "utf8")) + .trim() + .split("\n") + .map((line) => JSON.parse(line) as { message?: { role?: string; idempotencyKey?: string } }); + expect(records.slice(1).map((record) => record.message?.role)).toEqual(["user", "assistant"]); + expect(records.slice(1).map((record) => record.message?.idempotencyKey)).toEqual([ + "codex-app-server:thread-1:turn-1:user:0", + "codex-app-server:thread-1:turn-1:assistant:1", + ]); + }); +}); diff --git a/extensions/codex/app-server/transcript-mirror.ts b/extensions/codex/app-server/transcript-mirror.ts new file mode 100644 index 00000000000..0f8138dd324 --- /dev/null +++ b/extensions/codex/app-server/transcript-mirror.ts @@ -0,0 +1,83 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { + acquireSessionWriteLock, + emitSessionTranscriptUpdate, +} from "openclaw/plugin-sdk/agent-harness"; + +export async function mirrorCodexAppServerTranscript(params: { + sessionFile: string; + sessionKey?: string; + messages: AgentMessage[]; + idempotencyScope?: string; +}): Promise { + const messages = params.messages.filter( + (message) => message.role === "user" || message.role === "assistant", + ); + if (messages.length === 0) { + return; + } + + await fs.mkdir(path.dirname(params.sessionFile), { recursive: true }); + const lock = await acquireSessionWriteLock({ + sessionFile: params.sessionFile, + timeoutMs: 10_000, + }); + try { + const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile); + const sessionManager = SessionManager.open(params.sessionFile); + for (const [index, message] of messages.entries()) { + const idempotencyKey = params.idempotencyScope + ? `${params.idempotencyScope}:${message.role}:${index}` + : undefined; + if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) { + continue; + } + const transcriptMessage = { + ...message, + ...(idempotencyKey ? { idempotencyKey } : {}), + } as Parameters[0]; + sessionManager.appendMessage(transcriptMessage); + if (idempotencyKey) { + existingIdempotencyKeys.add(idempotencyKey); + } + } + } finally { + await lock.release(); + } + + if (params.sessionKey) { + emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey }); + } else { + emitSessionTranscriptUpdate(params.sessionFile); + } +} + +async function readTranscriptIdempotencyKeys(sessionFile: string): Promise> { + const keys = new Set(); + let raw: string; + try { + raw = await fs.readFile(sessionFile, "utf8"); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + throw error; + } + return keys; + } + for (const line of raw.split(/\r?\n/)) { + if (!line.trim()) { + continue; + } + try { + const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } }; + if (typeof parsed.message?.idempotencyKey === "string") { + keys.add(parsed.message.idempotencyKey); + } + } catch { + continue; + } + } + return keys; +} diff --git a/extensions/codex/harness.ts b/extensions/codex/harness.ts new file mode 100644 index 00000000000..ba4e0141acc --- /dev/null +++ b/extensions/codex/harness.ts @@ -0,0 +1,48 @@ +import type { AgentHarness } from "openclaw/plugin-sdk/agent-harness"; +import { listCodexAppServerModels } from "./app-server/client.js"; +import type { + CodexAppServerListModelsOptions, + CodexAppServerModel, + CodexAppServerModelListResult, +} from "./app-server/client.js"; +import { maybeCompactCodexAppServerSession } from "./app-server/compact.js"; +import { runCodexAppServerAttempt } from "./app-server/run-attempt.js"; +import { clearCodexAppServerBinding } from "./app-server/session-binding.js"; + +const DEFAULT_CODEX_HARNESS_PROVIDER_IDS = new Set(["codex", "openai-codex"]); + +export type { CodexAppServerListModelsOptions, CodexAppServerModel, CodexAppServerModelListResult }; +export { listCodexAppServerModels }; + +export function createCodexAppServerAgentHarness(options?: { + id?: string; + label?: string; + providerIds?: Iterable; +}): AgentHarness { + const providerIds = new Set( + [...(options?.providerIds ?? DEFAULT_CODEX_HARNESS_PROVIDER_IDS)].map((id) => + id.trim().toLowerCase(), + ), + ); + return { + id: options?.id ?? "codex", + label: options?.label ?? "Codex agent harness", + supports: (ctx) => { + const provider = ctx.provider.trim().toLowerCase(); + if (providerIds.has(provider)) { + return { supported: true, priority: 100 }; + } + return { + supported: false, + reason: `provider is not one of: ${[...providerIds].toSorted().join(", ")}`, + }; + }, + runAttempt: runCodexAppServerAttempt, + compact: maybeCompactCodexAppServerSession, + reset: async (params) => { + if (params.sessionFile) { + await clearCodexAppServerBinding(params.sessionFile); + } + }, + }; +} diff --git a/extensions/codex/index.test.ts b/extensions/codex/index.test.ts new file mode 100644 index 00000000000..db9d029a1bc --- /dev/null +++ b/extensions/codex/index.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, it, vi } from "vitest"; +import { createTestPluginApi } from "../../test/helpers/plugins/plugin-api.js"; +import plugin from "./index.js"; + +describe("codex plugin", () => { + it("registers the codex provider and agent harness", () => { + const registerAgentHarness = vi.fn(); + const registerProvider = vi.fn(); + + plugin.register( + createTestPluginApi({ + id: "codex", + name: "Codex", + source: "test", + config: {}, + pluginConfig: {}, + runtime: {} as never, + registerAgentHarness, + registerProvider, + }), + ); + + expect(registerProvider.mock.calls[0]?.[0]).toMatchObject({ id: "codex", label: "Codex" }); + expect(registerAgentHarness.mock.calls[0]?.[0]).toMatchObject({ + id: "codex", + label: "Codex agent harness", + }); + }); +}); diff --git a/extensions/codex/index.ts b/extensions/codex/index.ts new file mode 100644 index 00000000000..f23e1e18f88 --- /dev/null +++ b/extensions/codex/index.ts @@ -0,0 +1,13 @@ +import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry"; +import { createCodexAppServerAgentHarness } from "./harness.js"; +import { buildCodexProvider } from "./provider.js"; + +export default definePluginEntry({ + id: "codex", + name: "Codex", + description: "Codex app-server harness and Codex-managed GPT model catalog.", + register(api) { + api.registerAgentHarness(createCodexAppServerAgentHarness()); + api.registerProvider(buildCodexProvider({ pluginConfig: api.pluginConfig })); + }, +}); diff --git a/extensions/codex/openclaw.plugin.json b/extensions/codex/openclaw.plugin.json new file mode 100644 index 00000000000..7c52147d19e --- /dev/null +++ b/extensions/codex/openclaw.plugin.json @@ -0,0 +1,44 @@ +{ + "id": "codex", + "enabledByDefault": true, + "name": "Codex", + "description": "Codex app-server harness and Codex-managed GPT model catalog.", + "providers": ["codex"], + "modelSupport": { + "providerIds": ["codex"], + "modelPrefixes": ["gpt-", "o1", "o3", "o4", "arcanine"] + }, + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": { + "discovery": { + "type": "object", + "additionalProperties": false, + "properties": { + "enabled": { "type": "boolean" }, + "timeoutMs": { + "type": "number", + "minimum": 1, + "default": 2500 + } + } + } + } + }, + "uiHints": { + "discovery": { + "label": "Model Discovery", + "help": "Plugin-owned controls for discovering Codex app-server models." + }, + "discovery.enabled": { + "label": "Enable Discovery", + "help": "When false, OpenClaw keeps the Codex harness available but uses the bundled fallback model list." + }, + "discovery.timeoutMs": { + "label": "Discovery Timeout", + "help": "Maximum time to wait for Codex app-server model discovery before falling back to the bundled model list.", + "advanced": true + } + } +} diff --git a/extensions/codex/package.json b/extensions/codex/package.json new file mode 100644 index 00000000000..06a41b09cc6 --- /dev/null +++ b/extensions/codex/package.json @@ -0,0 +1,17 @@ +{ + "name": "@openclaw/codex", + "version": "2026.4.9", + "description": "OpenClaw Codex harness and model provider plugin", + "type": "module", + "dependencies": { + "@mariozechner/pi-coding-agent": "0.65.2" + }, + "devDependencies": { + "@openclaw/plugin-sdk": "workspace:*" + }, + "openclaw": { + "extensions": [ + "./index.ts" + ] + } +} diff --git a/extensions/codex/provider.test.ts b/extensions/codex/provider.test.ts new file mode 100644 index 00000000000..2a8ef20e34c --- /dev/null +++ b/extensions/codex/provider.test.ts @@ -0,0 +1,109 @@ +import { describe, expect, it, vi } from "vitest"; +import { buildCodexProvider, buildCodexProviderCatalog } from "./provider.js"; + +describe("codex provider", () => { + it("maps Codex app-server models to a Codex provider catalog", async () => { + const listModels = vi.fn(async () => ({ + models: [ + { + id: "gpt-5.4", + model: "gpt-5.4", + displayName: "gpt-5.4", + hidden: false, + inputModalities: ["text", "image"], + supportedReasoningEfforts: ["low", "medium", "high", "xhigh"], + }, + { + id: "hidden-model", + model: "hidden-model", + hidden: true, + inputModalities: ["text"], + supportedReasoningEfforts: [], + }, + ], + })); + + const result = await buildCodexProviderCatalog({ + env: {}, + listModels, + pluginConfig: { discovery: { timeoutMs: 1234 } }, + }); + + expect(listModels).toHaveBeenCalledWith({ limit: 100, timeoutMs: 1234 }); + expect(result.provider).toMatchObject({ + auth: "token", + api: "openai-codex-responses", + models: [ + { + id: "gpt-5.4", + name: "gpt-5.4", + reasoning: true, + input: ["text", "image"], + compat: { supportsReasoningEffort: true }, + }, + ], + }); + }); + + it("keeps a static fallback catalog when discovery is disabled", async () => { + const listModels = vi.fn(); + + const result = await buildCodexProviderCatalog({ + env: {}, + listModels, + pluginConfig: { discovery: { enabled: false } }, + }); + + expect(listModels).not.toHaveBeenCalled(); + expect(result.provider.models.map((model) => model.id)).toEqual([ + "gpt-5.4", + "gpt-5.4-mini", + "gpt-5.2", + ]); + }); + + it("resolves arbitrary Codex app-server model ids through the codex provider", () => { + const provider = buildCodexProvider(); + + const model = provider.resolveDynamicModel?.({ + provider: "codex", + modelId: " arcanine ", + modelRegistry: { find: () => null }, + } as never); + + expect(model).toMatchObject({ + id: "arcanine", + provider: "codex", + api: "openai-codex-responses", + baseUrl: "https://chatgpt.com/backend-api", + input: ["text", "image"], + }); + }); + + it("treats o4 ids as reasoning-capable Codex models", () => { + const provider = buildCodexProvider(); + + const model = provider.resolveDynamicModel?.({ + provider: "codex", + modelId: "o4-mini", + modelRegistry: { find: () => null }, + } as never); + + expect(model).toMatchObject({ + id: "o4-mini", + reasoning: true, + compat: { supportsReasoningEffort: true }, + }); + expect(provider.supportsXHighThinking?.({ provider: "codex", modelId: "o4-mini" })).toBe(true); + }); + + it("declares synthetic auth because the harness owns Codex credentials", () => { + const provider = buildCodexProvider(); + + expect(provider.resolveSyntheticAuth?.({ provider: "codex" })).toEqual({ + apiKey: "codex-app-server", + source: "codex-app-server", + mode: "token", + }); + }); +}); diff --git a/extensions/codex/provider.ts b/extensions/codex/provider.ts new file mode 100644 index 00000000000..0b26a84cd1a --- /dev/null +++ b/extensions/codex/provider.ts @@ -0,0 +1,222 @@ +import type { ProviderRuntimeModel } from "openclaw/plugin-sdk/plugin-entry"; +import { + normalizeModelCompat, + type ModelDefinitionConfig, + type ModelProviderConfig, + type ProviderPlugin, +} from "openclaw/plugin-sdk/provider-model-shared"; +import { + listCodexAppServerModels, + type CodexAppServerModel, + type CodexAppServerModelListResult, +} from "./harness.js"; + +const PROVIDER_ID = "codex"; +const CODEX_BASE_URL = "https://chatgpt.com/backend-api"; +const DEFAULT_CONTEXT_WINDOW = 272_000; +const DEFAULT_MAX_TOKENS = 128_000; +const DEFAULT_DISCOVERY_TIMEOUT_MS = 2500; +const LIVE_DISCOVERY_ENV = "OPENCLAW_CODEX_DISCOVERY_LIVE"; + +type CodexPluginConfig = { + discovery?: { + enabled?: boolean; + timeoutMs?: number; + }; +}; + +type CodexModelLister = (options: { + timeoutMs: number; + limit?: number; +}) => Promise; + +type BuildCodexProviderOptions = { + pluginConfig?: unknown; + listModels?: CodexModelLister; +}; + +type BuildCatalogOptions = { + env?: NodeJS.ProcessEnv; + pluginConfig?: unknown; + listModels?: CodexModelLister; +}; + +const FALLBACK_CODEX_MODELS = [ + { + id: "gpt-5.4", + model: "gpt-5.4", + displayName: "gpt-5.4", + description: "Latest frontier agentic coding model.", + isDefault: true, + inputModalities: ["text", "image"], + supportedReasoningEfforts: ["low", "medium", "high", "xhigh"], + }, + { + id: "gpt-5.4-mini", + model: "gpt-5.4-mini", + displayName: "GPT-5.4-Mini", + description: "Smaller frontier agentic coding model.", + inputModalities: ["text", "image"], + supportedReasoningEfforts: ["low", "medium", "high", "xhigh"], + }, + { + id: "gpt-5.2", + model: "gpt-5.2", + displayName: "gpt-5.2", + inputModalities: ["text", "image"], + supportedReasoningEfforts: ["low", "medium", "high", "xhigh"], + }, +] satisfies CodexAppServerModel[]; + +export function buildCodexProvider(options: BuildCodexProviderOptions = {}): ProviderPlugin { + return { + id: PROVIDER_ID, + label: "Codex", + docsPath: "/providers/models", + auth: [], + catalog: { + order: "late", + run: async (ctx) => + buildCodexProviderCatalog({ + env: ctx.env, + pluginConfig: options.pluginConfig, + listModels: options.listModels, + }), + }, + resolveDynamicModel: (ctx) => resolveCodexDynamicModel(ctx.modelId), + resolveSyntheticAuth: () => ({ + apiKey: "codex-app-server", + source: "codex-app-server", + mode: "token", + }), + supportsXHighThinking: ({ modelId }) => isKnownXHighCodexModel(modelId), + isModernModelRef: ({ modelId }) => isModernCodexModel(modelId), + }; +} + +export async function buildCodexProviderCatalog( + options: BuildCatalogOptions = {}, +): Promise<{ provider: ModelProviderConfig }> { + const config = readCodexPluginConfig(options.pluginConfig); + const timeoutMs = normalizeTimeoutMs(config.discovery?.timeoutMs); + const discovered = + config.discovery?.enabled === false || shouldSkipLiveDiscovery(options.env) + ? [] + : await listModelsBestEffort({ + listModels: options.listModels ?? listCodexAppServerModels, + timeoutMs, + }); + const models = (discovered.length > 0 ? discovered : FALLBACK_CODEX_MODELS).map( + codexModelToDefinition, + ); + return { + provider: { + baseUrl: CODEX_BASE_URL, + auth: "token", + api: "openai-codex-responses", + models, + }, + }; +} + +function resolveCodexDynamicModel(modelId: string): ProviderRuntimeModel | undefined { + const id = modelId.trim(); + if (!id) { + return undefined; + } + return normalizeModelCompat({ + ...buildModelDefinition({ + id, + model: id, + inputModalities: ["text", "image"], + supportedReasoningEfforts: shouldDefaultToReasoningModel(id) ? ["medium"] : [], + }), + provider: PROVIDER_ID, + baseUrl: CODEX_BASE_URL, + } as ProviderRuntimeModel); +} + +function codexModelToDefinition(model: CodexAppServerModel): ModelDefinitionConfig { + return buildModelDefinition(model); +} + +function buildModelDefinition(model: { + id: string; + model: string; + displayName?: string; + inputModalities: string[]; + supportedReasoningEfforts: string[]; +}): ModelDefinitionConfig { + const id = model.id.trim() || model.model.trim(); + return { + id, + name: model.displayName?.trim() || id, + api: "openai-codex-responses", + reasoning: model.supportedReasoningEfforts.length > 0 || shouldDefaultToReasoningModel(id), + input: model.inputModalities.includes("image") ? ["text", "image"] : ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: DEFAULT_CONTEXT_WINDOW, + maxTokens: DEFAULT_MAX_TOKENS, + compat: { + supportsReasoningEffort: model.supportedReasoningEfforts.length > 0, + supportsUsageInStreaming: true, + }, + }; +} + +async function listModelsBestEffort(params: { + listModels: CodexModelLister; + timeoutMs: number; +}): Promise { + try { + const result = await params.listModels({ + timeoutMs: params.timeoutMs, + limit: 100, + }); + return result.models.filter((model) => !model.hidden); + } catch { + return []; + } +} + +function readCodexPluginConfig(value: unknown): CodexPluginConfig { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return {}; + } + return value as CodexPluginConfig; +} + +function normalizeTimeoutMs(value: unknown): number { + return typeof value === "number" && Number.isFinite(value) && value > 0 + ? value + : DEFAULT_DISCOVERY_TIMEOUT_MS; +} + +function shouldSkipLiveDiscovery(env: NodeJS.ProcessEnv = process.env): boolean { + return Boolean(env.VITEST) && env[LIVE_DISCOVERY_ENV] !== "1"; +} + +function shouldDefaultToReasoningModel(modelId: string): boolean { + const lower = modelId.toLowerCase(); + return ( + lower.startsWith("gpt-5") || + lower.startsWith("o1") || + lower.startsWith("o3") || + lower.startsWith("o4") + ); +} + +function isKnownXHighCodexModel(modelId: string): boolean { + const lower = modelId.trim().toLowerCase(); + return ( + lower.startsWith("gpt-5") || + lower.startsWith("o3") || + lower.startsWith("o4") || + lower.includes("codex") + ); +} + +function isModernCodexModel(modelId: string): boolean { + const lower = modelId.trim().toLowerCase(); + return lower === "gpt-5.4" || lower === "gpt-5.4-mini" || lower === "gpt-5.2"; +} diff --git a/extensions/codex/tsconfig.json b/extensions/codex/tsconfig.json new file mode 100644 index 00000000000..530d7fda72f --- /dev/null +++ b/extensions/codex/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../tsconfig.package-boundary.base.json", + "compilerOptions": { + "rootDir": "." + }, + "include": ["./*.ts", "./app-server/*.ts"], + "exclude": ["./**/*.test.ts", "./dist/**", "./node_modules/**"] +} diff --git a/scripts/check-codex-app-server-protocol.ts b/scripts/check-codex-app-server-protocol.ts new file mode 100644 index 00000000000..9b4b16ac03a --- /dev/null +++ b/scripts/check-codex-app-server-protocol.ts @@ -0,0 +1,78 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +const codexRepo = process.env.OPENCLAW_CODEX_REPO + ? path.resolve(process.env.OPENCLAW_CODEX_REPO) + : path.resolve(process.cwd(), "../codex"); +const schemaRoot = path.join(codexRepo, "codex-rs/app-server-protocol/schema/typescript"); + +const checks: Array<{ file: string; snippets: string[] }> = [ + { + file: "ServerRequest.ts", + snippets: [ + '"item/commandExecution/requestApproval"', + '"item/fileChange/requestApproval"', + '"item/permissions/requestApproval"', + '"item/tool/call"', + ], + }, + { + file: "v2/ThreadItem.ts", + snippets: [ + '"type": "contextCompaction"', + '"type": "dynamicToolCall"', + '"type": "commandExecution"', + '"type": "mcpToolCall"', + ], + }, + { + file: "v2/DynamicToolSpec.ts", + snippets: ["name: string", "description: string", "inputSchema: JsonValue"], + }, + { + file: "v2/CommandExecutionApprovalDecision.ts", + snippets: ['"accept"', '"acceptForSession"', '"decline"', '"cancel"'], + }, + { + file: "ReviewDecision.ts", + snippets: ['"approved"', '"approved_for_session"', '"denied"', '"abort"'], + }, + { + file: "v2/PlanDeltaNotification.ts", + snippets: ["itemId: string", "delta: string"], + }, + { + file: "v2/TurnPlanUpdatedNotification.ts", + snippets: ["explanation: string | null", "plan: Array"], + }, +]; + +const failures: string[] = []; + +for (const check of checks) { + const filePath = path.join(schemaRoot, check.file); + let text: string; + try { + text = await fs.readFile(filePath, "utf8"); + } catch (error) { + failures.push(`${check.file}: missing (${String(error)})`); + continue; + } + for (const snippet of check.snippets) { + if (!text.includes(snippet)) { + failures.push(`${check.file}: missing ${snippet}`); + } + } +} + +if (failures.length > 0) { + console.error("Codex app-server generated protocol drift:"); + for (const failure of failures) { + console.error(`- ${failure}`); + } + process.exit(1); +} + +console.log( + `Codex app-server generated protocol matches OpenClaw bridge assumptions: ${schemaRoot}`, +);