diff --git a/extensions/codex/harness.ts b/extensions/codex/harness.ts index bd9304cf0dc..5879e8d1089 100644 --- a/extensions/codex/harness.ts +++ b/extensions/codex/harness.ts @@ -1,11 +1,11 @@ import type { AgentHarness } from "openclaw/plugin-sdk/agent-harness"; -import { listCodexAppServerModels } from "./src/app-server/client.js"; +import { maybeCompactCodexAppServerSession } from "./src/app-server/compact.js"; +import { listCodexAppServerModels } from "./src/app-server/models.js"; import type { CodexAppServerListModelsOptions, CodexAppServerModel, CodexAppServerModelListResult, -} from "./src/app-server/client.js"; -import { maybeCompactCodexAppServerSession } from "./src/app-server/compact.js"; +} from "./src/app-server/models.js"; import { runCodexAppServerAttempt } from "./src/app-server/run-attempt.js"; import { clearCodexAppServerBinding } from "./src/app-server/session-binding.js"; diff --git a/extensions/codex/src/app-server/capabilities.ts b/extensions/codex/src/app-server/capabilities.ts new file mode 100644 index 00000000000..27bae16408d --- /dev/null +++ b/extensions/codex/src/app-server/capabilities.ts @@ -0,0 +1,21 @@ +export const CODEX_CONTROL_METHODS = { + account: "account/read", + compact: "thread/compact/start", + listMcpServers: "mcpServerStatus/list", + listSkills: "skills/list", + listThreads: "thread/list", + rateLimits: "account/rateLimits/read", + resumeThread: "thread/resume", + review: "review/start", +} as const; + +export type CodexControlName = keyof typeof CODEX_CONTROL_METHODS; +export type CodexControlMethod = (typeof CODEX_CONTROL_METHODS)[CodexControlName]; + +export function describeControlFailure(error: string): string { + return isUnsupportedControlError(error) ? "unsupported by this Codex app-server" : error; +} + +function isUnsupportedControlError(error: string): boolean { + return /method not found|unknown method|unsupported method|-32601/i.test(error); +} diff --git a/extensions/codex/src/app-server/client.test.ts b/extensions/codex/src/app-server/client.test.ts index 39e13c79db8..7b9d2a6c46d 100644 --- a/extensions/codex/src/app-server/client.test.ts +++ b/extensions/codex/src/app-server/client.test.ts @@ -4,11 +4,11 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { WebSocketServer, type RawData } from "ws"; import { CodexAppServerClient, - listCodexAppServerModels, MIN_CODEX_APP_SERVER_VERSION, readCodexVersionFromUserAgent, - resetSharedCodexAppServerClientForTests, } from "./client.js"; +import { listCodexAppServerModels } from "./models.js"; +import { resetSharedCodexAppServerClientForTests } from "./shared-client.js"; function createClientHarness() { const stdout = new PassThrough(); diff --git a/extensions/codex/src/app-server/client.ts b/extensions/codex/src/app-server/client.ts index 30ecd009dd9..220e7ffdadf 100644 --- a/extensions/codex/src/app-server/client.ts +++ b/extensions/codex/src/app-server/client.ts @@ -1,24 +1,18 @@ -import { spawn } from "node:child_process"; -import { EventEmitter } from "node:events"; import { createInterface, type Interface as ReadlineInterface } from "node:readline"; -import { PassThrough, Writable } from "node:stream"; import { embeddedAgentLog, OPENCLAW_VERSION } from "openclaw/plugin-sdk/agent-harness"; -import WebSocket, { type RawData } from "ws"; -import { - codexAppServerStartOptionsKey, - resolveCodexAppServerRuntimeOptions, - type CodexAppServerStartOptions, -} from "./config.js"; +import { resolveCodexAppServerRuntimeOptions, type CodexAppServerStartOptions } from "./config.js"; import { type CodexInitializeResponse, isRpcResponse, type CodexServerNotification, - type JsonObject, type JsonValue, type RpcMessage, type RpcRequest, type RpcResponse, } from "./protocol.js"; +import { createStdioTransport } from "./transport-stdio.js"; +import { createWebSocketTransport } from "./transport-websocket.js"; +import type { CodexAppServerTransport } from "./transport.js"; export const MIN_CODEX_APP_SERVER_VERSION = "0.118.0"; @@ -28,15 +22,6 @@ type PendingRequest = { reject: (error: Error) => void; }; -type CodexAppServerTransport = { - stdin: { write: (data: string) => unknown }; - stdout: NodeJS.ReadableStream; - stderr: NodeJS.ReadableStream; - killed?: boolean; - kill?: () => unknown; - once: (event: string, listener: (...args: unknown[]) => void) => unknown; -}; - export type CodexServerRequestHandler = ( request: Required> & { params?: JsonValue }, ) => Promise | JsonValue | undefined; @@ -45,37 +30,13 @@ export type CodexServerNotificationHandler = ( notification: CodexServerNotification, ) => Promise | void; -export type CodexAppServerModel = { - id: string; - model: string; - displayName?: string; - description?: string; - hidden?: boolean; - isDefault?: boolean; - inputModalities: string[]; - supportedReasoningEfforts: string[]; - defaultReasoningEffort?: string; -}; - -export type CodexAppServerModelListResult = { - models: CodexAppServerModel[]; - nextCursor?: string; -}; - -export type CodexAppServerListModelsOptions = { - limit?: number; - cursor?: string; - includeHidden?: boolean; - timeoutMs?: number; - startOptions?: CodexAppServerStartOptions; -}; - export class CodexAppServerClient { private readonly child: CodexAppServerTransport; private readonly lines: ReadlineInterface; private readonly pending = new Map(); private readonly requestHandlers = new Set(); private readonly notificationHandlers = new Set(); + private readonly closeHandlers = new Set<(client: CodexAppServerClient) => void>(); private nextId = 1; private initialized = false; private closed = false; @@ -112,11 +73,7 @@ export class CodexAppServerClient { if (startOptions.transport === "websocket") { return new CodexAppServerClient(createWebSocketTransport(startOptions)); } - const child = spawn(startOptions.command, startOptions.args, { - env: process.env, - stdio: ["pipe", "pipe", "pipe"], - }); - return new CodexAppServerClient(child); + return new CodexAppServerClient(createStdioTransport(startOptions)); } static fromTransportForTests(child: CodexAppServerTransport): CodexAppServerClient { @@ -174,6 +131,11 @@ export class CodexAppServerClient { return () => this.notificationHandlers.delete(handler); } + addCloseHandler(handler: (client: CodexAppServerClient) => void): () => void { + this.closeHandlers.add(handler); + return () => this.closeHandlers.delete(handler); + } + close(): void { this.closed = true; this.lines.close(); @@ -275,105 +237,12 @@ export class CodexAppServerClient { pending.reject(error); } this.pending.clear(); - clearSharedClientIfCurrent(this); - } -} - -let sharedClient: CodexAppServerClient | undefined; -let sharedClientPromise: Promise | undefined; -let sharedClientKey: string | undefined; - -export async function getSharedCodexAppServerClient(options?: { - startOptions?: CodexAppServerStartOptions; -}): Promise { - const startOptions = options?.startOptions ?? resolveCodexAppServerRuntimeOptions().start; - const key = codexAppServerStartOptionsKey(startOptions); - if (sharedClientKey && sharedClientKey !== key) { - clearSharedCodexAppServerClient(); - } - sharedClientKey = key; - sharedClientPromise ??= (async () => { - const client = CodexAppServerClient.start(startOptions); - sharedClient = client; - try { - await client.initialize(); - return client; - } catch (error) { - // Startup failures happen before callers own the shared client, so close - // the child here instead of leaving a rejected daemon attached to stdio. - client.close(); - throw error; + for (const handler of this.closeHandlers) { + handler(this); } - })(); - try { - return await sharedClientPromise; - } catch (error) { - sharedClient = undefined; - sharedClientPromise = undefined; - sharedClientKey = undefined; - throw error; } } -export function resetSharedCodexAppServerClientForTests(): void { - sharedClient = undefined; - sharedClientPromise = undefined; - sharedClientKey = undefined; -} - -export function clearSharedCodexAppServerClient(): void { - const client = sharedClient; - sharedClient = undefined; - sharedClientPromise = undefined; - sharedClientKey = undefined; - client?.close(); -} - -function clearSharedClientIfCurrent(client: CodexAppServerClient): void { - if (sharedClient !== client) { - return; - } - sharedClient = undefined; - sharedClientPromise = undefined; - sharedClientKey = undefined; -} - -export async function listCodexAppServerModels( - options: CodexAppServerListModelsOptions = {}, -): Promise { - const timeoutMs = options.timeoutMs ?? 2500; - return await withTimeout( - (async () => { - const client = await getSharedCodexAppServerClient({ startOptions: options.startOptions }); - const response = await client.request("model/list", { - limit: options.limit ?? null, - cursor: options.cursor ?? null, - includeHidden: options.includeHidden ?? null, - }); - return readModelListResult(response); - })(), - timeoutMs, - "codex app-server model/list timed out", - ); -} - -export async function requestCodexAppServerJson(params: { - method: string; - requestParams?: JsonValue; - timeoutMs?: number; - startOptions?: CodexAppServerStartOptions; -}): Promise { - const timeoutMs = params.timeoutMs ?? 60_000; - return await withTimeout( - (async () => { - const client = await getSharedCodexAppServerClient({ startOptions: params.startOptions }); - return await client.request(params.method, params.requestParams); - })(), - timeoutMs, - `codex app-server ${params.method} timed out`, - ); -} - export function defaultServerRequestResponse( request: Required> & { params?: JsonValue }, ): JsonValue { @@ -416,85 +285,6 @@ export function defaultServerRequestResponse( return {}; } -function readModelListResult(value: JsonValue | undefined): CodexAppServerModelListResult { - if (!isJsonObjectValue(value) || !Array.isArray(value.data)) { - return { models: [] }; - } - const models = value.data - .map((entry) => readCodexModel(entry)) - .filter((entry): entry is CodexAppServerModel => entry !== undefined); - const nextCursor = typeof value.nextCursor === "string" ? value.nextCursor : undefined; - return { models, ...(nextCursor ? { nextCursor } : {}) }; -} - -function readCodexModel(value: unknown): CodexAppServerModel | undefined { - if (!isJsonObjectValue(value)) { - return undefined; - } - const id = readNonEmptyString(value.id); - const model = readNonEmptyString(value.model) ?? id; - if (!id || !model) { - return undefined; - } - return { - id, - model, - ...(readNonEmptyString(value.displayName) - ? { displayName: readNonEmptyString(value.displayName) } - : {}), - ...(readNonEmptyString(value.description) - ? { description: readNonEmptyString(value.description) } - : {}), - ...(typeof value.hidden === "boolean" ? { hidden: value.hidden } : {}), - ...(typeof value.isDefault === "boolean" ? { isDefault: value.isDefault } : {}), - inputModalities: readStringArray(value.inputModalities), - supportedReasoningEfforts: readReasoningEfforts(value.supportedReasoningEfforts), - ...(readNonEmptyString(value.defaultReasoningEffort) - ? { defaultReasoningEffort: readNonEmptyString(value.defaultReasoningEffort) } - : {}), - }; -} - -function readReasoningEfforts(value: unknown): string[] { - if (!Array.isArray(value)) { - return []; - } - const efforts = value - .map((entry) => { - if (!isJsonObjectValue(entry)) { - return undefined; - } - return readNonEmptyString(entry.reasoningEffort); - }) - .filter((entry): entry is string => entry !== undefined); - return [...new Set(efforts)]; -} - -function readStringArray(value: unknown): string[] { - if (!Array.isArray(value)) { - return []; - } - return [ - ...new Set( - value - .map((entry) => readNonEmptyString(entry)) - .filter((entry): entry is string => entry !== undefined), - ), - ]; -} - -function readNonEmptyString(value: unknown): string | undefined { - if (typeof value !== "string") { - return undefined; - } - const trimmed = value.trim(); - return trimmed || undefined; -} - -function isJsonObjectValue(value: unknown): value is JsonObject { - return Boolean(value && typeof value === "object" && !Array.isArray(value)); -} - function assertSupportedCodexAppServerVersion(response: CodexInitializeResponse): void { const detectedVersion = readCodexVersionFromUserAgent(response.userAgent); if (!detectedVersion) { @@ -539,29 +329,6 @@ function numericVersionParts(version: string): number[] { .map((part) => (Number.isFinite(part) ? part : 0)); } -async function withTimeout( - promise: Promise, - timeoutMs: number, - timeoutMessage: string, -): Promise { - if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { - return await promise; - } - let timeout: NodeJS.Timeout | undefined; - try { - return await Promise.race([ - promise, - new Promise((_, reject) => { - timeout = setTimeout(() => reject(new Error(timeoutMessage)), Math.max(1, timeoutMs)); - }), - ]); - } finally { - if (timeout) { - clearTimeout(timeout); - } - } -} - export function isCodexAppServerApprovalRequest(method: string): boolean { return method.includes("requestApproval") || method.includes("Approval"); } @@ -575,86 +342,3 @@ function formatExitValue(value: unknown): string { } return "unknown"; } - -function createWebSocketTransport(options: CodexAppServerStartOptions): CodexAppServerTransport { - if (!options.url) { - throw new Error( - "codex app-server websocket transport requires plugins.entries.codex.config.appServer.url", - ); - } - const events = new EventEmitter(); - const stdout = new PassThrough(); - const stderr = new PassThrough(); - const headers = { - ...options.headers, - ...(options.authToken ? { Authorization: `Bearer ${options.authToken}` } : {}), - }; - const socket = new WebSocket(options.url, { headers }); - const pendingFrames: string[] = []; - let killed = false; - - const sendFrame = (frame: string) => { - const trimmed = frame.trim(); - if (!trimmed) { - return; - } - if (socket.readyState === WebSocket.OPEN) { - socket.send(trimmed); - return; - } - pendingFrames.push(trimmed); - }; - - // `initialize` can be written before the WebSocket open event fires. Buffer - // whole JSON-RPC frames so stdio and websocket transports share call timing. - socket.once("open", () => { - for (const frame of pendingFrames.splice(0)) { - socket.send(frame); - } - }); - socket.once("error", (error) => events.emit("error", error)); - socket.once("close", (code, reason) => { - killed = true; - events.emit("exit", code, reason.toString("utf8")); - }); - socket.on("message", (data) => { - const text = websocketFrameToText(data); - stdout.write(text.endsWith("\n") ? text : `${text}\n`); - }); - - const stdin = new Writable({ - write(chunk, _encoding, callback) { - for (const frame of chunk.toString("utf8").split("\n")) { - sendFrame(frame); - } - callback(); - }, - }); - - return { - stdin, - stdout, - stderr, - get killed() { - return killed; - }, - kill: () => { - killed = true; - socket.close(); - }, - once: (event, listener) => events.once(event, listener), - }; -} - -function websocketFrameToText(data: RawData): string { - if (typeof data === "string") { - return data; - } - if (Buffer.isBuffer(data)) { - return data.toString("utf8"); - } - if (Array.isArray(data)) { - return Buffer.concat(data).toString("utf8"); - } - return Buffer.from(data).toString("utf8"); -} diff --git a/extensions/codex/src/app-server/compact.ts b/extensions/codex/src/app-server/compact.ts index a1e9b871e3f..d2951dde196 100644 --- a/extensions/codex/src/app-server/compact.ts +++ b/extensions/codex/src/app-server/compact.ts @@ -4,14 +4,11 @@ import { type CompactEmbeddedPiSessionParams, type EmbeddedPiCompactResult, } from "openclaw/plugin-sdk/agent-harness"; -import { - getSharedCodexAppServerClient, - type CodexAppServerClient, - type CodexServerNotificationHandler, -} from "./client.js"; +import type { CodexAppServerClient, CodexServerNotificationHandler } from "./client.js"; import { resolveCodexAppServerRuntimeOptions, type CodexAppServerStartOptions } from "./config.js"; import { isJsonObject, type CodexServerNotification, type JsonObject } from "./protocol.js"; import { readCodexAppServerBinding } from "./session-binding.js"; +import { getSharedCodexAppServerClient } from "./shared-client.js"; type CodexAppServerClientFactory = ( startOptions?: CodexAppServerStartOptions, diff --git a/extensions/codex/src/app-server/models.ts b/extensions/codex/src/app-server/models.ts new file mode 100644 index 00000000000..cd5b5578f67 --- /dev/null +++ b/extensions/codex/src/app-server/models.ts @@ -0,0 +1,127 @@ +import type { CodexAppServerStartOptions } from "./config.js"; +import { type JsonObject, type JsonValue } from "./protocol.js"; +import { getSharedCodexAppServerClient } from "./shared-client.js"; +import { withTimeout } from "./timeout.js"; + +export type CodexAppServerModel = { + id: string; + model: string; + displayName?: string; + description?: string; + hidden?: boolean; + isDefault?: boolean; + inputModalities: string[]; + supportedReasoningEfforts: string[]; + defaultReasoningEffort?: string; +}; + +export type CodexAppServerModelListResult = { + models: CodexAppServerModel[]; + nextCursor?: string; +}; + +export type CodexAppServerListModelsOptions = { + limit?: number; + cursor?: string; + includeHidden?: boolean; + timeoutMs?: number; + startOptions?: CodexAppServerStartOptions; +}; + +export async function listCodexAppServerModels( + options: CodexAppServerListModelsOptions = {}, +): Promise { + const timeoutMs = options.timeoutMs ?? 2500; + return await withTimeout( + (async () => { + const client = await getSharedCodexAppServerClient({ startOptions: options.startOptions }); + const response = await client.request("model/list", { + limit: options.limit ?? null, + cursor: options.cursor ?? null, + includeHidden: options.includeHidden ?? null, + }); + return readModelListResult(response); + })(), + timeoutMs, + "codex app-server model/list timed out", + ); +} + +function readModelListResult(value: JsonValue | undefined): CodexAppServerModelListResult { + if (!isJsonObjectValue(value) || !Array.isArray(value.data)) { + return { models: [] }; + } + const models = value.data + .map((entry) => readCodexModel(entry)) + .filter((entry): entry is CodexAppServerModel => entry !== undefined); + const nextCursor = typeof value.nextCursor === "string" ? value.nextCursor : undefined; + return { models, ...(nextCursor ? { nextCursor } : {}) }; +} + +function readCodexModel(value: unknown): CodexAppServerModel | undefined { + if (!isJsonObjectValue(value)) { + return undefined; + } + const id = readNonEmptyString(value.id); + const model = readNonEmptyString(value.model) ?? id; + if (!id || !model) { + return undefined; + } + return { + id, + model, + ...(readNonEmptyString(value.displayName) + ? { displayName: readNonEmptyString(value.displayName) } + : {}), + ...(readNonEmptyString(value.description) + ? { description: readNonEmptyString(value.description) } + : {}), + ...(typeof value.hidden === "boolean" ? { hidden: value.hidden } : {}), + ...(typeof value.isDefault === "boolean" ? { isDefault: value.isDefault } : {}), + inputModalities: readStringArray(value.inputModalities), + supportedReasoningEfforts: readReasoningEfforts(value.supportedReasoningEfforts), + ...(readNonEmptyString(value.defaultReasoningEffort) + ? { defaultReasoningEffort: readNonEmptyString(value.defaultReasoningEffort) } + : {}), + }; +} + +function readReasoningEfforts(value: unknown): string[] { + if (!Array.isArray(value)) { + return []; + } + const efforts = value + .map((entry) => { + if (!isJsonObjectValue(entry)) { + return undefined; + } + return readNonEmptyString(entry.reasoningEffort); + }) + .filter((entry): entry is string => entry !== undefined); + return [...new Set(efforts)]; +} + +function readStringArray(value: unknown): string[] { + if (!Array.isArray(value)) { + return []; + } + return [ + ...new Set( + value + .map((entry) => readNonEmptyString(entry)) + .filter((entry): entry is string => entry !== undefined), + ), + ]; +} + +function readNonEmptyString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed || undefined; +} + +function isJsonObjectValue(value: unknown): value is JsonObject { + return Boolean(value && typeof value === "object" && !Array.isArray(value)); +} diff --git a/extensions/codex/src/app-server/request.ts b/extensions/codex/src/app-server/request.ts new file mode 100644 index 00000000000..dfa805e7543 --- /dev/null +++ b/extensions/codex/src/app-server/request.ts @@ -0,0 +1,21 @@ +import type { CodexAppServerStartOptions } from "./config.js"; +import type { JsonValue } from "./protocol.js"; +import { getSharedCodexAppServerClient } from "./shared-client.js"; +import { withTimeout } from "./timeout.js"; + +export async function requestCodexAppServerJson(params: { + method: string; + requestParams?: JsonValue; + timeoutMs?: number; + startOptions?: CodexAppServerStartOptions; +}): Promise { + const timeoutMs = params.timeoutMs ?? 60_000; + return await withTimeout( + (async () => { + const client = await getSharedCodexAppServerClient({ startOptions: params.startOptions }); + return await client.request(params.method, params.requestParams); + })(), + timeoutMs, + `codex app-server ${params.method} timed out`, + ); +} diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index 67123ae750c..9ecc678dcdc 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -9,7 +9,12 @@ import { } from "openclaw/plugin-sdk/agent-harness"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { CodexServerNotification } from "./protocol.js"; -import { runCodexAppServerAttempt, __testing } from "./run-attempt.js"; +import { + buildThreadResumeParams, + buildTurnStartParams, + runCodexAppServerAttempt, + __testing, +} from "./run-attempt.js"; import { writeCodexAppServerBinding } from "./session-binding.js"; let tempDir: string; @@ -391,4 +396,44 @@ describe("runCodexAppServerAttempt", () => { ]), ); }); + + it("builds resume and turn params from the currently selected OpenClaw model", () => { + const params = createParams("/tmp/session.jsonl", "/tmp/workspace"); + const appServer = { + start: { + transport: "stdio" as const, + command: "codex", + args: ["app-server", "--listen", "stdio://"], + headers: {}, + }, + requestTimeoutMs: 60_000, + approvalPolicy: "on-request" as const, + approvalsReviewer: "guardian_subagent" as const, + sandbox: "danger-full-access" as const, + serviceTier: "priority", + }; + + expect(buildThreadResumeParams(params, { threadId: "thread-1", appServer })).toEqual({ + threadId: "thread-1", + model: "gpt-5.4-codex", + modelProvider: "openai", + approvalPolicy: "on-request", + approvalsReviewer: "guardian_subagent", + sandbox: "danger-full-access", + serviceTier: "priority", + persistExtendedHistory: true, + }); + expect( + buildTurnStartParams(params, { threadId: "thread-1", cwd: "/tmp/workspace", appServer }), + ).toEqual( + expect.objectContaining({ + threadId: "thread-1", + cwd: "/tmp/workspace", + model: "gpt-5.4-codex", + approvalPolicy: "on-request", + approvalsReviewer: "guardian_subagent", + serviceTier: "priority", + }), + ); + }); }); diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index f00e3136979..1ab35228d3d 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -18,12 +18,7 @@ import { type EmbeddedRunAttemptResult, } from "openclaw/plugin-sdk/agent-harness"; import { handleCodexAppServerApprovalRequest } from "./approval-bridge.js"; -import { - clearSharedCodexAppServerClient, - getSharedCodexAppServerClient, - isCodexAppServerApprovalRequest, - type CodexAppServerClient, -} from "./client.js"; +import { isCodexAppServerApprovalRequest, type CodexAppServerClient } from "./client.js"; import { resolveCodexAppServerRuntimeOptions, type CodexAppServerRuntimeOptions, @@ -35,8 +30,10 @@ import { isJsonObject, type CodexServerNotification, type CodexDynamicToolCallParams, + type CodexThreadResumeParams, type CodexThreadResumeResponse, type CodexThreadStartResponse, + type CodexTurnStartParams, type CodexTurnStartResponse, type CodexUserInput, type JsonObject, @@ -48,6 +45,7 @@ import { writeCodexAppServerBinding, type CodexAppServerThreadBinding, } from "./session-binding.js"; +import { clearSharedCodexAppServerClient, getSharedCodexAppServerClient } from "./shared-client.js"; import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js"; type CodexAppServerClientFactory = ( @@ -185,16 +183,14 @@ export async function runCodexAppServerAttempt( let turn: CodexTurnStartResponse; try { - turn = await client.request("turn/start", { - threadId: thread.threadId, - input: buildUserInput(params), - cwd: effectiveWorkspace, - approvalPolicy: appServer.approvalPolicy, - approvalsReviewer: appServer.approvalsReviewer, - model: params.modelId, - ...(appServer.serviceTier ? { serviceTier: appServer.serviceTier } : {}), - effort: resolveReasoningEffort(params.thinkLevel), - }); + turn = await client.request( + "turn/start", + buildTurnStartParams(params, { + threadId: thread.threadId, + cwd: effectiveWorkspace, + appServer, + }), + ); } catch (error) { notificationCleanup(); requestCleanup(); @@ -426,16 +422,13 @@ async function startOrResumeThread(params: { await clearCodexAppServerBinding(params.params.sessionFile); } else { try { - const response = await params.client.request("thread/resume", { - threadId: binding.threadId, - model: params.params.modelId, - modelProvider: normalizeModelProvider(params.params.provider), - approvalPolicy: params.appServer.approvalPolicy, - approvalsReviewer: params.appServer.approvalsReviewer, - sandbox: params.appServer.sandbox, - ...(params.appServer.serviceTier ? { serviceTier: params.appServer.serviceTier } : {}), - persistExtendedHistory: true, - }); + const response = await params.client.request( + "thread/resume", + buildThreadResumeParams(params.params, { + threadId: binding.threadId, + appServer: params.appServer, + }), + ); await writeCodexAppServerBinding(params.params.sessionFile, { threadId: response.thread.id, cwd: params.cwd, @@ -497,6 +490,45 @@ async function startOrResumeThread(params: { }; } +export function buildThreadResumeParams( + params: EmbeddedRunAttemptParams, + options: { + threadId: string; + appServer: CodexAppServerRuntimeOptions; + }, +): CodexThreadResumeParams { + return { + threadId: options.threadId, + model: params.modelId, + modelProvider: normalizeModelProvider(params.provider), + approvalPolicy: options.appServer.approvalPolicy, + approvalsReviewer: options.appServer.approvalsReviewer, + sandbox: options.appServer.sandbox, + ...(options.appServer.serviceTier ? { serviceTier: options.appServer.serviceTier } : {}), + persistExtendedHistory: true, + }; +} + +export function buildTurnStartParams( + params: EmbeddedRunAttemptParams, + options: { + threadId: string; + cwd: string; + appServer: CodexAppServerRuntimeOptions; + }, +): CodexTurnStartParams { + return { + threadId: options.threadId, + input: buildUserInput(params), + cwd: options.cwd, + approvalPolicy: options.appServer.approvalPolicy, + approvalsReviewer: options.appServer.approvalsReviewer, + model: params.modelId, + ...(options.appServer.serviceTier ? { serviceTier: options.appServer.serviceTier } : {}), + effort: resolveReasoningEffort(params.thinkLevel), + }; +} + function fingerprintDynamicTools(dynamicTools: JsonValue[]): string { return JSON.stringify(dynamicTools.map(stabilizeJsonValue)); } diff --git a/extensions/codex/src/app-server/shared-client.ts b/extensions/codex/src/app-server/shared-client.ts new file mode 100644 index 00000000000..90f1978de3c --- /dev/null +++ b/extensions/codex/src/app-server/shared-client.ts @@ -0,0 +1,66 @@ +import { CodexAppServerClient } from "./client.js"; +import { + codexAppServerStartOptionsKey, + resolveCodexAppServerRuntimeOptions, + type CodexAppServerStartOptions, +} from "./config.js"; + +let sharedClient: CodexAppServerClient | undefined; +let sharedClientPromise: Promise | undefined; +let sharedClientKey: string | undefined; + +export async function getSharedCodexAppServerClient(options?: { + startOptions?: CodexAppServerStartOptions; +}): Promise { + const startOptions = options?.startOptions ?? resolveCodexAppServerRuntimeOptions().start; + const key = codexAppServerStartOptionsKey(startOptions); + if (sharedClientKey && sharedClientKey !== key) { + clearSharedCodexAppServerClient(); + } + sharedClientKey = key; + sharedClientPromise ??= (async () => { + const client = CodexAppServerClient.start(startOptions); + sharedClient = client; + client.addCloseHandler(clearSharedClientIfCurrent); + try { + await client.initialize(); + return client; + } catch (error) { + // Startup failures happen before callers own the shared client, so close + // the child here instead of leaving a rejected daemon attached to stdio. + client.close(); + throw error; + } + })(); + try { + return await sharedClientPromise; + } catch (error) { + sharedClient = undefined; + sharedClientPromise = undefined; + sharedClientKey = undefined; + throw error; + } +} + +export function resetSharedCodexAppServerClientForTests(): void { + sharedClient = undefined; + sharedClientPromise = undefined; + sharedClientKey = undefined; +} + +export function clearSharedCodexAppServerClient(): void { + const client = sharedClient; + sharedClient = undefined; + sharedClientPromise = undefined; + sharedClientKey = undefined; + client?.close(); +} + +function clearSharedClientIfCurrent(client: CodexAppServerClient): void { + if (sharedClient !== client) { + return; + } + sharedClient = undefined; + sharedClientPromise = undefined; + sharedClientKey = undefined; +} diff --git a/extensions/codex/src/app-server/timeout.ts b/extensions/codex/src/app-server/timeout.ts new file mode 100644 index 00000000000..f87c4695a68 --- /dev/null +++ b/extensions/codex/src/app-server/timeout.ts @@ -0,0 +1,22 @@ +export async function withTimeout( + promise: Promise, + timeoutMs: number, + timeoutMessage: string, +): Promise { + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + return await promise; + } + let timeout: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeout = setTimeout(() => reject(new Error(timeoutMessage)), Math.max(1, timeoutMs)); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } +} diff --git a/extensions/codex/src/app-server/transport-stdio.ts b/extensions/codex/src/app-server/transport-stdio.ts new file mode 100644 index 00000000000..c9c5feb72b0 --- /dev/null +++ b/extensions/codex/src/app-server/transport-stdio.ts @@ -0,0 +1,10 @@ +import { spawn } from "node:child_process"; +import type { CodexAppServerStartOptions } from "./config.js"; +import type { CodexAppServerTransport } from "./transport.js"; + +export function createStdioTransport(options: CodexAppServerStartOptions): CodexAppServerTransport { + return spawn(options.command, options.args, { + env: process.env, + stdio: ["pipe", "pipe", "pipe"], + }); +} diff --git a/extensions/codex/src/app-server/transport-websocket.ts b/extensions/codex/src/app-server/transport-websocket.ts new file mode 100644 index 00000000000..041cfbb8974 --- /dev/null +++ b/extensions/codex/src/app-server/transport-websocket.ts @@ -0,0 +1,90 @@ +import { EventEmitter } from "node:events"; +import { PassThrough, Writable } from "node:stream"; +import WebSocket, { type RawData } from "ws"; +import type { CodexAppServerStartOptions } from "./config.js"; +import type { CodexAppServerTransport } from "./transport.js"; + +export function createWebSocketTransport( + options: CodexAppServerStartOptions, +): CodexAppServerTransport { + if (!options.url) { + throw new Error( + "codex app-server websocket transport requires plugins.entries.codex.config.appServer.url", + ); + } + const events = new EventEmitter(); + const stdout = new PassThrough(); + const stderr = new PassThrough(); + const headers = { + ...options.headers, + ...(options.authToken ? { Authorization: `Bearer ${options.authToken}` } : {}), + }; + const socket = new WebSocket(options.url, { headers }); + const pendingFrames: string[] = []; + let killed = false; + + const sendFrame = (frame: string) => { + const trimmed = frame.trim(); + if (!trimmed) { + return; + } + if (socket.readyState === WebSocket.OPEN) { + socket.send(trimmed); + return; + } + pendingFrames.push(trimmed); + }; + + // `initialize` can be written before the WebSocket open event fires. Buffer + // whole JSON-RPC frames so stdio and websocket transports share call timing. + socket.once("open", () => { + for (const frame of pendingFrames.splice(0)) { + socket.send(frame); + } + }); + socket.once("error", (error) => events.emit("error", error)); + socket.once("close", (code, reason) => { + killed = true; + events.emit("exit", code, reason.toString("utf8")); + }); + socket.on("message", (data) => { + const text = websocketFrameToText(data); + stdout.write(text.endsWith("\n") ? text : `${text}\n`); + }); + + const stdin = new Writable({ + write(chunk, _encoding, callback) { + for (const frame of chunk.toString("utf8").split("\n")) { + sendFrame(frame); + } + callback(); + }, + }); + + return { + stdin, + stdout, + stderr, + get killed() { + return killed; + }, + kill: () => { + killed = true; + socket.close(); + }, + once: (event, listener) => events.once(event, listener), + }; +} + +function websocketFrameToText(data: RawData): string { + if (typeof data === "string") { + return data; + } + if (Buffer.isBuffer(data)) { + return data.toString("utf8"); + } + if (Array.isArray(data)) { + return Buffer.concat(data).toString("utf8"); + } + return Buffer.from(data).toString("utf8"); +} diff --git a/extensions/codex/src/app-server/transport.ts b/extensions/codex/src/app-server/transport.ts new file mode 100644 index 00000000000..d7d14187bf2 --- /dev/null +++ b/extensions/codex/src/app-server/transport.ts @@ -0,0 +1,8 @@ +export type CodexAppServerTransport = { + stdin: { write: (data: string) => unknown }; + stdout: NodeJS.ReadableStream; + stderr: NodeJS.ReadableStream; + killed?: boolean; + kill?: () => unknown; + once: (event: string, listener: (...args: unknown[]) => void) => unknown; +}; diff --git a/extensions/codex/src/command-formatters.ts b/extensions/codex/src/command-formatters.ts new file mode 100644 index 00000000000..6cd39b2b404 --- /dev/null +++ b/extensions/codex/src/command-formatters.ts @@ -0,0 +1,157 @@ +import type { CodexAppServerModelListResult } from "./app-server/models.js"; +import { isJsonObject, type JsonObject, type JsonValue } from "./app-server/protocol.js"; +import type { SafeValue } from "./command-rpc.js"; + +export type CodexStatusProbes = { + models: SafeValue; + account: SafeValue; + limits: SafeValue; + mcps: SafeValue; + skills: SafeValue; +}; + +export function formatCodexStatus(probes: CodexStatusProbes): string { + const connected = + probes.models.ok || probes.account.ok || probes.limits.ok || probes.mcps.ok || probes.skills.ok; + const lines = [`Codex app-server: ${connected ? "connected" : "unavailable"}`]; + if (probes.models.ok) { + lines.push( + `Models: ${ + probes.models.value.models + .map((model) => model.id) + .slice(0, 8) + .join(", ") || "none" + }`, + ); + } else { + lines.push(`Models: ${probes.models.error}`); + } + lines.push( + `Account: ${probes.account.ok ? summarizeAccount(probes.account.value) : probes.account.error}`, + ); + lines.push( + `Rate limits: ${probes.limits.ok ? summarizeArrayLike(probes.limits.value) : probes.limits.error}`, + ); + lines.push( + `MCP servers: ${probes.mcps.ok ? summarizeArrayLike(probes.mcps.value) : probes.mcps.error}`, + ); + lines.push( + `Skills: ${probes.skills.ok ? summarizeArrayLike(probes.skills.value) : probes.skills.error}`, + ); + return lines.join("\n"); +} + +export function formatModels(result: CodexAppServerModelListResult): string { + if (result.models.length === 0) { + return "No Codex app-server models returned."; + } + return [ + "Codex models:", + ...result.models.map((model) => `- ${model.id}${model.isDefault ? " (default)" : ""}`), + ].join("\n"); +} + +export function formatThreads(response: JsonValue | undefined): string { + const threads = extractArray(response); + if (threads.length === 0) { + return "No Codex threads returned."; + } + return [ + "Codex threads:", + ...threads.slice(0, 10).map((thread) => { + const record = isJsonObject(thread) ? thread : {}; + const id = readString(record, "threadId") ?? readString(record, "id") ?? ""; + const title = + readString(record, "title") ?? readString(record, "name") ?? readString(record, "summary"); + const details = [ + readString(record, "model"), + readString(record, "cwd"), + readString(record, "updatedAt") ?? readString(record, "lastUpdatedAt"), + ].filter(Boolean); + return `- ${id}${title ? ` - ${title}` : ""}${ + details.length > 0 ? ` (${details.join(", ")})` : "" + }\n Resume: /codex resume ${id}`; + }), + ].join("\n"); +} + +export function formatAccount( + account: SafeValue, + limits: SafeValue, +): string { + return [ + `Account: ${account.ok ? summarizeAccount(account.value) : account.error}`, + `Rate limits: ${limits.ok ? summarizeArrayLike(limits.value) : limits.error}`, + ].join("\n"); +} + +export function formatList(response: JsonValue | undefined, label: string): string { + const entries = extractArray(response); + if (entries.length === 0) { + return `${label}: none returned.`; + } + return [ + `${label}:`, + ...entries.slice(0, 25).map((entry) => { + const record = isJsonObject(entry) ? entry : {}; + return `- ${readString(record, "name") ?? readString(record, "id") ?? JSON.stringify(entry)}`; + }), + ].join("\n"); +} + +export function buildHelp(): string { + return [ + "Codex commands:", + "- /codex status", + "- /codex models", + "- /codex threads [filter]", + "- /codex resume ", + "- /codex compact", + "- /codex review", + "- /codex account", + "- /codex mcp", + "- /codex skills", + ].join("\n"); +} + +function summarizeAccount(value: JsonValue | undefined): string { + if (!isJsonObject(value)) { + return "unavailable"; + } + return ( + readString(value, "email") ?? + readString(value, "accountEmail") ?? + readString(value, "planType") ?? + readString(value, "id") ?? + "available" + ); +} + +function summarizeArrayLike(value: JsonValue | undefined): string { + const entries = extractArray(value); + if (entries.length === 0) { + return "none returned"; + } + return `${entries.length}`; +} + +function extractArray(value: JsonValue | undefined): JsonValue[] { + if (Array.isArray(value)) { + return value; + } + if (!isJsonObject(value)) { + return []; + } + for (const key of ["data", "items", "threads", "models", "skills", "servers", "rateLimits"]) { + const child = value[key]; + if (Array.isArray(child)) { + return child; + } + } + return []; +} + +export function readString(record: JsonObject, key: string): string | undefined { + const value = record[key]; + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} diff --git a/extensions/codex/src/command-handlers.ts b/extensions/codex/src/command-handlers.ts new file mode 100644 index 00000000000..51463c2eba7 --- /dev/null +++ b/extensions/codex/src/command-handlers.ts @@ -0,0 +1,150 @@ +import type { PluginCommandContext } from "openclaw/plugin-sdk/plugin-entry"; +import { CODEX_CONTROL_METHODS } from "./app-server/capabilities.js"; +import { listCodexAppServerModels } from "./app-server/models.js"; +import { isJsonObject } from "./app-server/protocol.js"; +import { + readCodexAppServerBinding, + writeCodexAppServerBinding, +} from "./app-server/session-binding.js"; +import { + buildHelp, + formatAccount, + formatCodexStatus, + formatList, + formatModels, + formatThreads, + readString, +} from "./command-formatters.js"; +import { + codexControlRequest, + readCodexStatusProbes, + requestOptions, + safeCodexControlRequest, +} from "./command-rpc.js"; + +export async function handleCodexSubcommand( + ctx: PluginCommandContext, + options: { pluginConfig?: unknown }, +): Promise<{ text: string }> { + const [subcommand = "status", ...rest] = splitArgs(ctx.args); + const normalized = subcommand.toLowerCase(); + if (normalized === "help") { + return { text: buildHelp() }; + } + if (normalized === "status") { + return { text: formatCodexStatus(await readCodexStatusProbes(options.pluginConfig)) }; + } + if (normalized === "models") { + return { + text: formatModels(await listCodexAppServerModels(requestOptions(options.pluginConfig, 100))), + }; + } + if (normalized === "threads") { + return { text: await buildThreads(options.pluginConfig, rest.join(" ")) }; + } + if (normalized === "resume") { + return { text: await resumeThread(ctx, options.pluginConfig, rest[0]) }; + } + if (normalized === "compact") { + return { + text: await startThreadAction( + ctx, + options.pluginConfig, + CODEX_CONTROL_METHODS.compact, + "compaction", + ), + }; + } + if (normalized === "review") { + return { + text: await startThreadAction( + ctx, + options.pluginConfig, + CODEX_CONTROL_METHODS.review, + "review", + ), + }; + } + if (normalized === "mcp") { + return { + text: formatList( + await codexControlRequest(options.pluginConfig, CODEX_CONTROL_METHODS.listMcpServers, { + limit: 100, + }), + "MCP servers", + ), + }; + } + if (normalized === "skills") { + return { + text: formatList( + await codexControlRequest(options.pluginConfig, CODEX_CONTROL_METHODS.listSkills, {}), + "Codex skills", + ), + }; + } + if (normalized === "account") { + const [account, limits] = await Promise.all([ + safeCodexControlRequest(options.pluginConfig, CODEX_CONTROL_METHODS.account, {}), + safeCodexControlRequest(options.pluginConfig, CODEX_CONTROL_METHODS.rateLimits, {}), + ]); + return { text: formatAccount(account, limits) }; + } + return { text: `Unknown Codex command: ${subcommand}\n\n${buildHelp()}` }; +} + +async function buildThreads(pluginConfig: unknown, filter: string): Promise { + const response = await codexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.listThreads, { + limit: 10, + ...(filter.trim() ? { filter: filter.trim() } : {}), + }); + return formatThreads(response); +} + +async function resumeThread( + ctx: PluginCommandContext, + pluginConfig: unknown, + threadId: string | undefined, +): Promise { + const normalizedThreadId = threadId?.trim(); + if (!normalizedThreadId) { + return "Usage: /codex resume "; + } + if (!ctx.sessionFile) { + return "Cannot attach a Codex thread because this command did not include an OpenClaw session file."; + } + const response = await codexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.resumeThread, { + threadId: normalizedThreadId, + persistExtendedHistory: true, + }); + const thread = isJsonObject(response) && isJsonObject(response.thread) ? response.thread : {}; + const effectiveThreadId = readString(thread, "id") ?? normalizedThreadId; + await writeCodexAppServerBinding(ctx.sessionFile, { + threadId: effectiveThreadId, + cwd: readString(thread, "cwd") ?? "", + model: isJsonObject(response) ? readString(response, "model") : undefined, + modelProvider: isJsonObject(response) ? readString(response, "modelProvider") : undefined, + }); + return `Attached this OpenClaw session to Codex thread ${effectiveThreadId}.`; +} + +async function startThreadAction( + ctx: PluginCommandContext, + pluginConfig: unknown, + method: typeof CODEX_CONTROL_METHODS.compact | typeof CODEX_CONTROL_METHODS.review, + label: string, +): Promise { + if (!ctx.sessionFile) { + return `Cannot start Codex ${label} because this command did not include an OpenClaw session file.`; + } + const binding = await readCodexAppServerBinding(ctx.sessionFile); + if (!binding?.threadId) { + return `No Codex thread is attached to this OpenClaw session yet.`; + } + await codexControlRequest(pluginConfig, method, { threadId: binding.threadId }); + return `Started Codex ${label} for thread ${binding.threadId}.`; +} + +function splitArgs(value: string | undefined): string[] { + return (value ?? "").trim().split(/\s+/).filter(Boolean); +} diff --git a/extensions/codex/src/command-rpc.ts b/extensions/codex/src/command-rpc.ts new file mode 100644 index 00000000000..f04ccaa912a --- /dev/null +++ b/extensions/codex/src/command-rpc.ts @@ -0,0 +1,74 @@ +import { + CODEX_CONTROL_METHODS, + describeControlFailure, + type CodexControlMethod, +} from "./app-server/capabilities.js"; +import { resolveCodexAppServerRuntimeOptions } from "./app-server/config.js"; +import { listCodexAppServerModels } from "./app-server/models.js"; +import type { JsonValue } from "./app-server/protocol.js"; +import { requestCodexAppServerJson } from "./app-server/request.js"; + +export type SafeValue = { ok: true; value: T } | { ok: false; error: string }; + +export function requestOptions(pluginConfig: unknown, limit: number) { + const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig }); + return { + limit, + timeoutMs: runtime.requestTimeoutMs, + startOptions: runtime.start, + }; +} + +export async function codexControlRequest( + pluginConfig: unknown, + method: CodexControlMethod, + requestParams?: JsonValue, +): Promise { + const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig }); + return await requestCodexAppServerJson({ + method, + requestParams, + timeoutMs: runtime.requestTimeoutMs, + startOptions: runtime.start, + }); +} + +export async function safeCodexControlRequest( + pluginConfig: unknown, + method: CodexControlMethod, + requestParams?: JsonValue, +): Promise> { + return await safeValue( + async () => await codexControlRequest(pluginConfig, method, requestParams), + ); +} + +export async function safeCodexModelList(pluginConfig: unknown, limit: number) { + return await safeValue( + async () => await listCodexAppServerModels(requestOptions(pluginConfig, limit)), + ); +} + +export async function readCodexStatusProbes(pluginConfig: unknown) { + const [models, account, limits, mcps, skills] = await Promise.all([ + safeCodexModelList(pluginConfig, 20), + safeCodexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.account, {}), + safeCodexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.rateLimits, {}), + safeCodexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.listMcpServers, { limit: 100 }), + safeCodexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.listSkills, {}), + ]); + + return { models, account, limits, mcps, skills }; +} + +export async function safeValue(read: () => Promise): Promise> { + try { + return { ok: true, value: await read() }; + } catch (error) { + return { ok: false, error: describeControlFailure(formatError(error)) }; + } +} + +function formatError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/extensions/codex/src/commands.test.ts b/extensions/codex/src/commands.test.ts index 39b61b8c65a..c7d28fabd51 100644 --- a/extensions/codex/src/commands.test.ts +++ b/extensions/codex/src/commands.test.ts @@ -3,9 +3,25 @@ import os from "node:os"; import path from "node:path"; import type { PluginCommandContext } from "openclaw/plugin-sdk/plugin-entry"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { resetSharedCodexAppServerClientForTests } from "./app-server/client.js"; +import { CODEX_CONTROL_METHODS } from "./app-server/capabilities.js"; +import { resetSharedCodexAppServerClientForTests } from "./app-server/shared-client.js"; import { handleCodexCommand } from "./commands.js"; +const commandRpcMocks = vi.hoisted(() => ({ + codexControlRequest: vi.fn(), + readCodexStatusProbes: vi.fn(), + requestOptions: vi.fn((_pluginConfig: unknown, limit: number) => ({ limit })), + safeCodexControlRequest: vi.fn(), + safeCodexModelList: vi.fn(), +})); + +const modelMocks = vi.hoisted(() => ({ + listCodexAppServerModels: vi.fn(), +})); + +vi.mock("./command-rpc.js", () => commandRpcMocks); +vi.mock("./app-server/models.js", () => modelMocks); + let tempDir: string; function createContext(args: string, sessionFile?: string): PluginCommandContext { @@ -25,28 +41,35 @@ function createContext(args: string, sessionFile?: string): PluginCommandContext describe("codex command", () => { beforeEach(async () => { tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-command-")); + commandRpcMocks.codexControlRequest.mockReset(); + commandRpcMocks.readCodexStatusProbes.mockReset(); + commandRpcMocks.requestOptions.mockReset(); + commandRpcMocks.requestOptions.mockImplementation((_pluginConfig: unknown, limit: number) => ({ + limit, + })); + commandRpcMocks.safeCodexControlRequest.mockReset(); + commandRpcMocks.safeCodexModelList.mockReset(); + modelMocks.listCodexAppServerModels.mockReset(); }); afterEach(async () => { resetSharedCodexAppServerClientForTests(); - vi.restoreAllMocks(); await fs.rm(tempDir, { recursive: true, force: true }); }); it("attaches the current session to an existing Codex thread", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); const requests: Array<{ method: string; params: unknown }> = []; - vi.spyOn( - await import("./app-server/client.js"), - "requestCodexAppServerJson", - ).mockImplementation(async ({ method, requestParams }) => { - requests.push({ method, params: requestParams }); - return { - thread: { id: "thread-123", cwd: "/repo" }, - model: "gpt-5.4", - modelProvider: "openai", - }; - }); + commandRpcMocks.codexControlRequest.mockImplementation( + async (_pluginConfig: unknown, method: string, requestParams: unknown) => { + requests.push({ method: String(method), params: requestParams }); + return { + thread: { id: "thread-123", cwd: "/repo" }, + model: "gpt-5.4", + modelProvider: "openai", + }; + }, + ); await expect( handleCodexCommand(createContext("resume thread-123", sessionFile)), @@ -66,7 +89,7 @@ describe("codex command", () => { }); it("shows model ids from Codex app-server", async () => { - vi.spyOn(await import("./app-server/client.js"), "listCodexAppServerModels").mockResolvedValue({ + modelMocks.listCodexAppServerModels.mockResolvedValue({ models: [ { id: "gpt-5.4", @@ -81,4 +104,76 @@ describe("codex command", () => { text: "Codex models:\n- gpt-5.4", }); }); + + it("reports status unavailable when every Codex probe fails", async () => { + const offline = { ok: false as const, error: "offline" }; + commandRpcMocks.readCodexStatusProbes.mockResolvedValue({ + models: offline, + account: offline, + limits: offline, + mcps: offline, + skills: offline, + }); + + await expect(handleCodexCommand(createContext("status"))).resolves.toEqual({ + text: [ + "Codex app-server: unavailable", + "Models: offline", + "Account: offline", + "Rate limits: offline", + "MCP servers: offline", + "Skills: offline", + ].join("\n"), + }); + }); + + it("starts compaction for the attached Codex thread", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + await fs.writeFile( + `${sessionFile}.codex-app-server.json`, + JSON.stringify({ schemaVersion: 1, threadId: "thread-123", cwd: "/repo" }), + ); + commandRpcMocks.codexControlRequest.mockResolvedValue({}); + + await expect(handleCodexCommand(createContext("compact", sessionFile))).resolves.toEqual({ + text: "Started Codex compaction for thread thread-123.", + }); + expect(commandRpcMocks.codexControlRequest).toHaveBeenCalledWith( + undefined, + CODEX_CONTROL_METHODS.compact, + { + threadId: "thread-123", + }, + ); + }); + + it("explains compaction when no Codex thread is attached", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + + await expect(handleCodexCommand(createContext("compact", sessionFile))).resolves.toEqual({ + text: "No Codex thread is attached to this OpenClaw session yet.", + }); + }); + + it("passes filters to Codex thread listing", async () => { + commandRpcMocks.codexControlRequest.mockResolvedValue({ + data: [{ id: "thread-123", title: "Fix the thing", model: "gpt-5.4", cwd: "/repo" }], + }); + + await expect(handleCodexCommand(createContext("threads fix"))).resolves.toEqual({ + text: [ + "Codex threads:", + "- thread-123 - Fix the thing (gpt-5.4, /repo)", + " Resume: /codex resume thread-123", + ].join("\n"), + }); + expect(commandRpcMocks.codexControlRequest).toHaveBeenCalledWith( + undefined, + CODEX_CONTROL_METHODS.listThreads, + { + limit: 10, + filter: "fix", + }, + ); + }); }); diff --git a/extensions/codex/src/commands.ts b/extensions/codex/src/commands.ts index 2ae805bee88..ad4d20a8a82 100644 --- a/extensions/codex/src/commands.ts +++ b/extensions/codex/src/commands.ts @@ -2,13 +2,7 @@ import type { OpenClawPluginCommandDefinition, PluginCommandContext, } from "openclaw/plugin-sdk/plugin-entry"; -import { listCodexAppServerModels, requestCodexAppServerJson } from "./app-server/client.js"; -import { resolveCodexAppServerRuntimeOptions } from "./app-server/config.js"; -import { isJsonObject, type JsonObject, type JsonValue } from "./app-server/protocol.js"; -import { - readCodexAppServerBinding, - writeCodexAppServerBinding, -} from "./app-server/session-binding.js"; +import { handleCodexSubcommand } from "./command-handlers.js"; export function createCodexCommand(options: { pluginConfig?: unknown; @@ -26,274 +20,5 @@ export async function handleCodexCommand( ctx: PluginCommandContext, options: { pluginConfig?: unknown } = {}, ): Promise<{ text: string }> { - const [subcommand = "status", ...rest] = splitArgs(ctx.args); - const normalized = subcommand.toLowerCase(); - if (normalized === "help") { - return { text: buildHelp() }; - } - if (normalized === "status") { - return { text: await buildStatus(options.pluginConfig) }; - } - if (normalized === "models") { - return { text: await buildModels(options.pluginConfig) }; - } - if (normalized === "threads") { - return { text: await buildThreads(options.pluginConfig, rest.join(" ")) }; - } - if (normalized === "resume") { - return { text: await resumeThread(ctx, options.pluginConfig, rest[0]) }; - } - if (normalized === "compact") { - return { - text: await startThreadAction( - ctx, - options.pluginConfig, - "thread/compact/start", - "compaction", - ), - }; - } - if (normalized === "review") { - return { text: await startThreadAction(ctx, options.pluginConfig, "review/start", "review") }; - } - if (normalized === "mcp") { - return { text: await buildList(options.pluginConfig, "mcpServerStatus/list", "MCP servers") }; - } - if (normalized === "skills") { - return { text: await buildList(options.pluginConfig, "skills/list", "Codex skills") }; - } - if (normalized === "account") { - return { text: await buildAccount(options.pluginConfig) }; - } - return { text: `Unknown Codex command: ${subcommand}\n\n${buildHelp()}` }; -} - -async function buildStatus(pluginConfig: unknown): Promise { - const [models, account, limits, mcps, skills] = await Promise.all([ - safeValue(() => listCodexAppServerModels(requestOptions(pluginConfig, 20))), - safeValue(() => codexRequest(pluginConfig, "account/read", {})), - safeValue(() => codexRequest(pluginConfig, "account/rateLimits/read", {})), - safeValue(() => codexRequest(pluginConfig, "mcpServerStatus/list", { limit: 100 })), - safeValue(() => codexRequest(pluginConfig, "skills/list", {})), - ]); - - const connected = models.ok || account.ok || limits.ok || mcps.ok || skills.ok; - const lines = [`Codex app-server: ${connected ? "connected" : "unavailable"}`]; - if (models.ok) { - lines.push( - `Models: ${ - models.value.models - .map((model) => model.id) - .slice(0, 8) - .join(", ") || "none" - }`, - ); - } else { - lines.push(`Models: ${models.error}`); - } - lines.push(`Account: ${account.ok ? summarizeAccount(account.value) : account.error}`); - lines.push(`Rate limits: ${limits.ok ? summarizeArrayLike(limits.value) : limits.error}`); - lines.push(`MCP servers: ${mcps.ok ? summarizeArrayLike(mcps.value) : mcps.error}`); - lines.push(`Skills: ${skills.ok ? summarizeArrayLike(skills.value) : skills.error}`); - return lines.join("\n"); -} - -async function buildModels(pluginConfig: unknown): Promise { - const result = await listCodexAppServerModels(requestOptions(pluginConfig, 100)); - if (result.models.length === 0) { - return "No Codex app-server models returned."; - } - return [ - "Codex models:", - ...result.models.map((model) => `- ${model.id}${model.isDefault ? " (default)" : ""}`), - ].join("\n"); -} - -async function buildThreads(pluginConfig: unknown, filter: string): Promise { - const response = await codexRequest(pluginConfig, "thread/list", { - limit: 10, - ...(filter.trim() ? { filter: filter.trim() } : {}), - }); - const threads = extractArray(response); - if (threads.length === 0) { - return "No Codex threads returned."; - } - return [ - "Codex threads:", - ...threads.slice(0, 10).map((thread) => { - const record = isJsonObject(thread) ? thread : {}; - const id = readString(record, "threadId") ?? readString(record, "id") ?? ""; - const title = - readString(record, "title") ?? readString(record, "name") ?? readString(record, "summary"); - return `- ${id}${title ? ` - ${title}` : ""}`; - }), - ].join("\n"); -} - -async function buildAccount(pluginConfig: unknown): Promise { - const [account, limits] = await Promise.all([ - safeValue(() => codexRequest(pluginConfig, "account/read", {})), - safeValue(() => codexRequest(pluginConfig, "account/rateLimits/read", {})), - ]); - return [ - `Account: ${account.ok ? summarizeAccount(account.value) : account.error}`, - `Rate limits: ${limits.ok ? summarizeArrayLike(limits.value) : limits.error}`, - ].join("\n"); -} - -async function buildList(pluginConfig: unknown, method: string, label: string): Promise { - const response = await codexRequest(pluginConfig, method, { limit: 100 }); - const entries = extractArray(response); - if (entries.length === 0) { - return `${label}: none returned.`; - } - return [ - `${label}:`, - ...entries.slice(0, 25).map((entry) => { - const record = isJsonObject(entry) ? entry : {}; - return `- ${readString(record, "name") ?? readString(record, "id") ?? JSON.stringify(entry)}`; - }), - ].join("\n"); -} - -async function resumeThread( - ctx: PluginCommandContext, - pluginConfig: unknown, - threadId: string | undefined, -): Promise { - const normalizedThreadId = threadId?.trim(); - if (!normalizedThreadId) { - return "Usage: /codex resume "; - } - if (!ctx.sessionFile) { - return "Cannot attach a Codex thread because this command did not include an OpenClaw session file."; - } - const response = await codexRequest(pluginConfig, "thread/resume", { - threadId: normalizedThreadId, - persistExtendedHistory: true, - }); - const thread = isJsonObject(response) && isJsonObject(response.thread) ? response.thread : {}; - const effectiveThreadId = readString(thread, "id") ?? normalizedThreadId; - await writeCodexAppServerBinding(ctx.sessionFile, { - threadId: effectiveThreadId, - cwd: readString(thread, "cwd") ?? "", - model: isJsonObject(response) ? readString(response, "model") : undefined, - modelProvider: isJsonObject(response) ? readString(response, "modelProvider") : undefined, - }); - return `Attached this OpenClaw session to Codex thread ${effectiveThreadId}.`; -} - -async function startThreadAction( - ctx: PluginCommandContext, - pluginConfig: unknown, - method: string, - label: string, -): Promise { - if (!ctx.sessionFile) { - return `Cannot start Codex ${label} because this command did not include an OpenClaw session file.`; - } - const binding = await readCodexAppServerBinding(ctx.sessionFile); - if (!binding?.threadId) { - return `No Codex thread is attached to this OpenClaw session yet.`; - } - await codexRequest(pluginConfig, method, { threadId: binding.threadId }); - return `Started Codex ${label} for thread ${binding.threadId}.`; -} - -async function codexRequest( - pluginConfig: unknown, - method: string, - requestParams?: JsonValue, -): Promise { - const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig }); - return await requestCodexAppServerJson({ - method, - requestParams, - timeoutMs: runtime.requestTimeoutMs, - startOptions: runtime.start, - }); -} - -function requestOptions(pluginConfig: unknown, limit: number) { - const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig }); - return { - limit, - timeoutMs: runtime.requestTimeoutMs, - startOptions: runtime.start, - }; -} - -async function safeValue( - read: () => Promise, -): Promise<{ ok: true; value: T } | { ok: false; error: string }> { - try { - return { ok: true, value: await read() }; - } catch (error) { - return { ok: false, error: formatError(error) }; - } -} - -function summarizeAccount(value: JsonValue | undefined): string { - if (!isJsonObject(value)) { - return "unavailable"; - } - return ( - readString(value, "email") ?? - readString(value, "accountEmail") ?? - readString(value, "planType") ?? - readString(value, "id") ?? - "available" - ); -} - -function summarizeArrayLike(value: JsonValue | undefined): string { - const entries = extractArray(value); - if (entries.length === 0) { - return "none returned"; - } - return `${entries.length}`; -} - -function extractArray(value: JsonValue | undefined): JsonValue[] { - if (Array.isArray(value)) { - return value; - } - if (!isJsonObject(value)) { - return []; - } - for (const key of ["data", "items", "threads", "models", "skills", "servers", "rateLimits"]) { - const child = value[key]; - if (Array.isArray(child)) { - return child; - } - } - return []; -} - -function readString(record: JsonObject, key: string): string | undefined { - const value = record[key]; - return typeof value === "string" && value.trim() ? value.trim() : undefined; -} - -function splitArgs(value: string | undefined): string[] { - return (value ?? "").trim().split(/\s+/).filter(Boolean); -} - -function formatError(error: unknown): string { - return error instanceof Error ? error.message : String(error); -} - -function buildHelp(): string { - return [ - "Codex commands:", - "- /codex status", - "- /codex models", - "- /codex threads [filter]", - "- /codex resume ", - "- /codex compact", - "- /codex review", - "- /codex account", - "- /codex mcp", - "- /codex skills", - ].join("\n"); + return await handleCodexSubcommand(ctx, options); }