refactor: split Codex app-server modules

This commit is contained in:
Peter Steinberger
2026-04-10 22:43:06 +01:00
parent e9684c22c1
commit 8d72aafdbb
19 changed files with 981 additions and 657 deletions

View File

@@ -1,11 +1,11 @@
import type { AgentHarness } from "openclaw/plugin-sdk/agent-harness";
import { listCodexAppServerModels } from "./src/app-server/client.js";
import { maybeCompactCodexAppServerSession } from "./src/app-server/compact.js";
import { listCodexAppServerModels } from "./src/app-server/models.js";
import type {
CodexAppServerListModelsOptions,
CodexAppServerModel,
CodexAppServerModelListResult,
} from "./src/app-server/client.js";
import { maybeCompactCodexAppServerSession } from "./src/app-server/compact.js";
} from "./src/app-server/models.js";
import { runCodexAppServerAttempt } from "./src/app-server/run-attempt.js";
import { clearCodexAppServerBinding } from "./src/app-server/session-binding.js";

View File

@@ -0,0 +1,21 @@
export const CODEX_CONTROL_METHODS = {
account: "account/read",
compact: "thread/compact/start",
listMcpServers: "mcpServerStatus/list",
listSkills: "skills/list",
listThreads: "thread/list",
rateLimits: "account/rateLimits/read",
resumeThread: "thread/resume",
review: "review/start",
} as const;
export type CodexControlName = keyof typeof CODEX_CONTROL_METHODS;
export type CodexControlMethod = (typeof CODEX_CONTROL_METHODS)[CodexControlName];
export function describeControlFailure(error: string): string {
return isUnsupportedControlError(error) ? "unsupported by this Codex app-server" : error;
}
function isUnsupportedControlError(error: string): boolean {
return /method not found|unknown method|unsupported method|-32601/i.test(error);
}

View File

@@ -4,11 +4,11 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import { WebSocketServer, type RawData } from "ws";
import {
CodexAppServerClient,
listCodexAppServerModels,
MIN_CODEX_APP_SERVER_VERSION,
readCodexVersionFromUserAgent,
resetSharedCodexAppServerClientForTests,
} from "./client.js";
import { listCodexAppServerModels } from "./models.js";
import { resetSharedCodexAppServerClientForTests } from "./shared-client.js";
function createClientHarness() {
const stdout = new PassThrough();

View File

@@ -1,24 +1,18 @@
import { spawn } from "node:child_process";
import { EventEmitter } from "node:events";
import { createInterface, type Interface as ReadlineInterface } from "node:readline";
import { PassThrough, Writable } from "node:stream";
import { embeddedAgentLog, OPENCLAW_VERSION } from "openclaw/plugin-sdk/agent-harness";
import WebSocket, { type RawData } from "ws";
import {
codexAppServerStartOptionsKey,
resolveCodexAppServerRuntimeOptions,
type CodexAppServerStartOptions,
} from "./config.js";
import { resolveCodexAppServerRuntimeOptions, type CodexAppServerStartOptions } from "./config.js";
import {
type CodexInitializeResponse,
isRpcResponse,
type CodexServerNotification,
type JsonObject,
type JsonValue,
type RpcMessage,
type RpcRequest,
type RpcResponse,
} from "./protocol.js";
import { createStdioTransport } from "./transport-stdio.js";
import { createWebSocketTransport } from "./transport-websocket.js";
import type { CodexAppServerTransport } from "./transport.js";
export const MIN_CODEX_APP_SERVER_VERSION = "0.118.0";
@@ -28,15 +22,6 @@ type PendingRequest = {
reject: (error: Error) => void;
};
type CodexAppServerTransport = {
stdin: { write: (data: string) => unknown };
stdout: NodeJS.ReadableStream;
stderr: NodeJS.ReadableStream;
killed?: boolean;
kill?: () => unknown;
once: (event: string, listener: (...args: unknown[]) => void) => unknown;
};
export type CodexServerRequestHandler = (
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
) => Promise<JsonValue | undefined> | JsonValue | undefined;
@@ -45,37 +30,13 @@ export type CodexServerNotificationHandler = (
notification: CodexServerNotification,
) => Promise<void> | void;
export type CodexAppServerModel = {
id: string;
model: string;
displayName?: string;
description?: string;
hidden?: boolean;
isDefault?: boolean;
inputModalities: string[];
supportedReasoningEfforts: string[];
defaultReasoningEffort?: string;
};
export type CodexAppServerModelListResult = {
models: CodexAppServerModel[];
nextCursor?: string;
};
export type CodexAppServerListModelsOptions = {
limit?: number;
cursor?: string;
includeHidden?: boolean;
timeoutMs?: number;
startOptions?: CodexAppServerStartOptions;
};
export class CodexAppServerClient {
private readonly child: CodexAppServerTransport;
private readonly lines: ReadlineInterface;
private readonly pending = new Map<number | string, PendingRequest>();
private readonly requestHandlers = new Set<CodexServerRequestHandler>();
private readonly notificationHandlers = new Set<CodexServerNotificationHandler>();
private readonly closeHandlers = new Set<(client: CodexAppServerClient) => void>();
private nextId = 1;
private initialized = false;
private closed = false;
@@ -112,11 +73,7 @@ export class CodexAppServerClient {
if (startOptions.transport === "websocket") {
return new CodexAppServerClient(createWebSocketTransport(startOptions));
}
const child = spawn(startOptions.command, startOptions.args, {
env: process.env,
stdio: ["pipe", "pipe", "pipe"],
});
return new CodexAppServerClient(child);
return new CodexAppServerClient(createStdioTransport(startOptions));
}
static fromTransportForTests(child: CodexAppServerTransport): CodexAppServerClient {
@@ -174,6 +131,11 @@ export class CodexAppServerClient {
return () => this.notificationHandlers.delete(handler);
}
addCloseHandler(handler: (client: CodexAppServerClient) => void): () => void {
this.closeHandlers.add(handler);
return () => this.closeHandlers.delete(handler);
}
close(): void {
this.closed = true;
this.lines.close();
@@ -275,105 +237,12 @@ export class CodexAppServerClient {
pending.reject(error);
}
this.pending.clear();
clearSharedClientIfCurrent(this);
}
}
let sharedClient: CodexAppServerClient | undefined;
let sharedClientPromise: Promise<CodexAppServerClient> | undefined;
let sharedClientKey: string | undefined;
export async function getSharedCodexAppServerClient(options?: {
startOptions?: CodexAppServerStartOptions;
}): Promise<CodexAppServerClient> {
const startOptions = options?.startOptions ?? resolveCodexAppServerRuntimeOptions().start;
const key = codexAppServerStartOptionsKey(startOptions);
if (sharedClientKey && sharedClientKey !== key) {
clearSharedCodexAppServerClient();
}
sharedClientKey = key;
sharedClientPromise ??= (async () => {
const client = CodexAppServerClient.start(startOptions);
sharedClient = client;
try {
await client.initialize();
return client;
} catch (error) {
// Startup failures happen before callers own the shared client, so close
// the child here instead of leaving a rejected daemon attached to stdio.
client.close();
throw error;
for (const handler of this.closeHandlers) {
handler(this);
}
})();
try {
return await sharedClientPromise;
} catch (error) {
sharedClient = undefined;
sharedClientPromise = undefined;
sharedClientKey = undefined;
throw error;
}
}
export function resetSharedCodexAppServerClientForTests(): void {
sharedClient = undefined;
sharedClientPromise = undefined;
sharedClientKey = undefined;
}
export function clearSharedCodexAppServerClient(): void {
const client = sharedClient;
sharedClient = undefined;
sharedClientPromise = undefined;
sharedClientKey = undefined;
client?.close();
}
function clearSharedClientIfCurrent(client: CodexAppServerClient): void {
if (sharedClient !== client) {
return;
}
sharedClient = undefined;
sharedClientPromise = undefined;
sharedClientKey = undefined;
}
export async function listCodexAppServerModels(
options: CodexAppServerListModelsOptions = {},
): Promise<CodexAppServerModelListResult> {
const timeoutMs = options.timeoutMs ?? 2500;
return await withTimeout(
(async () => {
const client = await getSharedCodexAppServerClient({ startOptions: options.startOptions });
const response = await client.request<JsonObject>("model/list", {
limit: options.limit ?? null,
cursor: options.cursor ?? null,
includeHidden: options.includeHidden ?? null,
});
return readModelListResult(response);
})(),
timeoutMs,
"codex app-server model/list timed out",
);
}
export async function requestCodexAppServerJson<T = JsonValue | undefined>(params: {
method: string;
requestParams?: JsonValue;
timeoutMs?: number;
startOptions?: CodexAppServerStartOptions;
}): Promise<T> {
const timeoutMs = params.timeoutMs ?? 60_000;
return await withTimeout(
(async () => {
const client = await getSharedCodexAppServerClient({ startOptions: params.startOptions });
return await client.request<T>(params.method, params.requestParams);
})(),
timeoutMs,
`codex app-server ${params.method} timed out`,
);
}
export function defaultServerRequestResponse(
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
): JsonValue {
@@ -416,85 +285,6 @@ export function defaultServerRequestResponse(
return {};
}
function readModelListResult(value: JsonValue | undefined): CodexAppServerModelListResult {
if (!isJsonObjectValue(value) || !Array.isArray(value.data)) {
return { models: [] };
}
const models = value.data
.map((entry) => readCodexModel(entry))
.filter((entry): entry is CodexAppServerModel => entry !== undefined);
const nextCursor = typeof value.nextCursor === "string" ? value.nextCursor : undefined;
return { models, ...(nextCursor ? { nextCursor } : {}) };
}
function readCodexModel(value: unknown): CodexAppServerModel | undefined {
if (!isJsonObjectValue(value)) {
return undefined;
}
const id = readNonEmptyString(value.id);
const model = readNonEmptyString(value.model) ?? id;
if (!id || !model) {
return undefined;
}
return {
id,
model,
...(readNonEmptyString(value.displayName)
? { displayName: readNonEmptyString(value.displayName) }
: {}),
...(readNonEmptyString(value.description)
? { description: readNonEmptyString(value.description) }
: {}),
...(typeof value.hidden === "boolean" ? { hidden: value.hidden } : {}),
...(typeof value.isDefault === "boolean" ? { isDefault: value.isDefault } : {}),
inputModalities: readStringArray(value.inputModalities),
supportedReasoningEfforts: readReasoningEfforts(value.supportedReasoningEfforts),
...(readNonEmptyString(value.defaultReasoningEffort)
? { defaultReasoningEffort: readNonEmptyString(value.defaultReasoningEffort) }
: {}),
};
}
function readReasoningEfforts(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
const efforts = value
.map((entry) => {
if (!isJsonObjectValue(entry)) {
return undefined;
}
return readNonEmptyString(entry.reasoningEffort);
})
.filter((entry): entry is string => entry !== undefined);
return [...new Set(efforts)];
}
function readStringArray(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
return [
...new Set(
value
.map((entry) => readNonEmptyString(entry))
.filter((entry): entry is string => entry !== undefined),
),
];
}
function readNonEmptyString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function isJsonObjectValue(value: unknown): value is JsonObject {
return Boolean(value && typeof value === "object" && !Array.isArray(value));
}
function assertSupportedCodexAppServerVersion(response: CodexInitializeResponse): void {
const detectedVersion = readCodexVersionFromUserAgent(response.userAgent);
if (!detectedVersion) {
@@ -539,29 +329,6 @@ function numericVersionParts(version: string): number[] {
.map((part) => (Number.isFinite(part) ? part : 0));
}
async function withTimeout<T>(
promise: Promise<T>,
timeoutMs: number,
timeoutMessage: string,
): Promise<T> {
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return await promise;
}
let timeout: NodeJS.Timeout | undefined;
try {
return await Promise.race([
promise,
new Promise<never>((_, reject) => {
timeout = setTimeout(() => reject(new Error(timeoutMessage)), Math.max(1, timeoutMs));
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}
export function isCodexAppServerApprovalRequest(method: string): boolean {
return method.includes("requestApproval") || method.includes("Approval");
}
@@ -575,86 +342,3 @@ function formatExitValue(value: unknown): string {
}
return "unknown";
}
function createWebSocketTransport(options: CodexAppServerStartOptions): CodexAppServerTransport {
if (!options.url) {
throw new Error(
"codex app-server websocket transport requires plugins.entries.codex.config.appServer.url",
);
}
const events = new EventEmitter();
const stdout = new PassThrough();
const stderr = new PassThrough();
const headers = {
...options.headers,
...(options.authToken ? { Authorization: `Bearer ${options.authToken}` } : {}),
};
const socket = new WebSocket(options.url, { headers });
const pendingFrames: string[] = [];
let killed = false;
const sendFrame = (frame: string) => {
const trimmed = frame.trim();
if (!trimmed) {
return;
}
if (socket.readyState === WebSocket.OPEN) {
socket.send(trimmed);
return;
}
pendingFrames.push(trimmed);
};
// `initialize` can be written before the WebSocket open event fires. Buffer
// whole JSON-RPC frames so stdio and websocket transports share call timing.
socket.once("open", () => {
for (const frame of pendingFrames.splice(0)) {
socket.send(frame);
}
});
socket.once("error", (error) => events.emit("error", error));
socket.once("close", (code, reason) => {
killed = true;
events.emit("exit", code, reason.toString("utf8"));
});
socket.on("message", (data) => {
const text = websocketFrameToText(data);
stdout.write(text.endsWith("\n") ? text : `${text}\n`);
});
const stdin = new Writable({
write(chunk, _encoding, callback) {
for (const frame of chunk.toString("utf8").split("\n")) {
sendFrame(frame);
}
callback();
},
});
return {
stdin,
stdout,
stderr,
get killed() {
return killed;
},
kill: () => {
killed = true;
socket.close();
},
once: (event, listener) => events.once(event, listener),
};
}
function websocketFrameToText(data: RawData): string {
if (typeof data === "string") {
return data;
}
if (Buffer.isBuffer(data)) {
return data.toString("utf8");
}
if (Array.isArray(data)) {
return Buffer.concat(data).toString("utf8");
}
return Buffer.from(data).toString("utf8");
}

View File

@@ -4,14 +4,11 @@ import {
type CompactEmbeddedPiSessionParams,
type EmbeddedPiCompactResult,
} from "openclaw/plugin-sdk/agent-harness";
import {
getSharedCodexAppServerClient,
type CodexAppServerClient,
type CodexServerNotificationHandler,
} from "./client.js";
import type { CodexAppServerClient, CodexServerNotificationHandler } from "./client.js";
import { resolveCodexAppServerRuntimeOptions, type CodexAppServerStartOptions } from "./config.js";
import { isJsonObject, type CodexServerNotification, type JsonObject } from "./protocol.js";
import { readCodexAppServerBinding } from "./session-binding.js";
import { getSharedCodexAppServerClient } from "./shared-client.js";
type CodexAppServerClientFactory = (
startOptions?: CodexAppServerStartOptions,

View File

@@ -0,0 +1,127 @@
import type { CodexAppServerStartOptions } from "./config.js";
import { type JsonObject, type JsonValue } from "./protocol.js";
import { getSharedCodexAppServerClient } from "./shared-client.js";
import { withTimeout } from "./timeout.js";
export type CodexAppServerModel = {
id: string;
model: string;
displayName?: string;
description?: string;
hidden?: boolean;
isDefault?: boolean;
inputModalities: string[];
supportedReasoningEfforts: string[];
defaultReasoningEffort?: string;
};
export type CodexAppServerModelListResult = {
models: CodexAppServerModel[];
nextCursor?: string;
};
export type CodexAppServerListModelsOptions = {
limit?: number;
cursor?: string;
includeHidden?: boolean;
timeoutMs?: number;
startOptions?: CodexAppServerStartOptions;
};
export async function listCodexAppServerModels(
options: CodexAppServerListModelsOptions = {},
): Promise<CodexAppServerModelListResult> {
const timeoutMs = options.timeoutMs ?? 2500;
return await withTimeout(
(async () => {
const client = await getSharedCodexAppServerClient({ startOptions: options.startOptions });
const response = await client.request<JsonObject>("model/list", {
limit: options.limit ?? null,
cursor: options.cursor ?? null,
includeHidden: options.includeHidden ?? null,
});
return readModelListResult(response);
})(),
timeoutMs,
"codex app-server model/list timed out",
);
}
function readModelListResult(value: JsonValue | undefined): CodexAppServerModelListResult {
if (!isJsonObjectValue(value) || !Array.isArray(value.data)) {
return { models: [] };
}
const models = value.data
.map((entry) => readCodexModel(entry))
.filter((entry): entry is CodexAppServerModel => entry !== undefined);
const nextCursor = typeof value.nextCursor === "string" ? value.nextCursor : undefined;
return { models, ...(nextCursor ? { nextCursor } : {}) };
}
function readCodexModel(value: unknown): CodexAppServerModel | undefined {
if (!isJsonObjectValue(value)) {
return undefined;
}
const id = readNonEmptyString(value.id);
const model = readNonEmptyString(value.model) ?? id;
if (!id || !model) {
return undefined;
}
return {
id,
model,
...(readNonEmptyString(value.displayName)
? { displayName: readNonEmptyString(value.displayName) }
: {}),
...(readNonEmptyString(value.description)
? { description: readNonEmptyString(value.description) }
: {}),
...(typeof value.hidden === "boolean" ? { hidden: value.hidden } : {}),
...(typeof value.isDefault === "boolean" ? { isDefault: value.isDefault } : {}),
inputModalities: readStringArray(value.inputModalities),
supportedReasoningEfforts: readReasoningEfforts(value.supportedReasoningEfforts),
...(readNonEmptyString(value.defaultReasoningEffort)
? { defaultReasoningEffort: readNonEmptyString(value.defaultReasoningEffort) }
: {}),
};
}
function readReasoningEfforts(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
const efforts = value
.map((entry) => {
if (!isJsonObjectValue(entry)) {
return undefined;
}
return readNonEmptyString(entry.reasoningEffort);
})
.filter((entry): entry is string => entry !== undefined);
return [...new Set(efforts)];
}
function readStringArray(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
return [
...new Set(
value
.map((entry) => readNonEmptyString(entry))
.filter((entry): entry is string => entry !== undefined),
),
];
}
function readNonEmptyString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function isJsonObjectValue(value: unknown): value is JsonObject {
return Boolean(value && typeof value === "object" && !Array.isArray(value));
}

View File

@@ -0,0 +1,21 @@
import type { CodexAppServerStartOptions } from "./config.js";
import type { JsonValue } from "./protocol.js";
import { getSharedCodexAppServerClient } from "./shared-client.js";
import { withTimeout } from "./timeout.js";
export async function requestCodexAppServerJson<T = JsonValue | undefined>(params: {
method: string;
requestParams?: JsonValue;
timeoutMs?: number;
startOptions?: CodexAppServerStartOptions;
}): Promise<T> {
const timeoutMs = params.timeoutMs ?? 60_000;
return await withTimeout(
(async () => {
const client = await getSharedCodexAppServerClient({ startOptions: params.startOptions });
return await client.request<T>(params.method, params.requestParams);
})(),
timeoutMs,
`codex app-server ${params.method} timed out`,
);
}

View File

@@ -9,7 +9,12 @@ import {
} from "openclaw/plugin-sdk/agent-harness";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CodexServerNotification } from "./protocol.js";
import { runCodexAppServerAttempt, __testing } from "./run-attempt.js";
import {
buildThreadResumeParams,
buildTurnStartParams,
runCodexAppServerAttempt,
__testing,
} from "./run-attempt.js";
import { writeCodexAppServerBinding } from "./session-binding.js";
let tempDir: string;
@@ -391,4 +396,44 @@ describe("runCodexAppServerAttempt", () => {
]),
);
});
it("builds resume and turn params from the currently selected OpenClaw model", () => {
const params = createParams("/tmp/session.jsonl", "/tmp/workspace");
const appServer = {
start: {
transport: "stdio" as const,
command: "codex",
args: ["app-server", "--listen", "stdio://"],
headers: {},
},
requestTimeoutMs: 60_000,
approvalPolicy: "on-request" as const,
approvalsReviewer: "guardian_subagent" as const,
sandbox: "danger-full-access" as const,
serviceTier: "priority",
};
expect(buildThreadResumeParams(params, { threadId: "thread-1", appServer })).toEqual({
threadId: "thread-1",
model: "gpt-5.4-codex",
modelProvider: "openai",
approvalPolicy: "on-request",
approvalsReviewer: "guardian_subagent",
sandbox: "danger-full-access",
serviceTier: "priority",
persistExtendedHistory: true,
});
expect(
buildTurnStartParams(params, { threadId: "thread-1", cwd: "/tmp/workspace", appServer }),
).toEqual(
expect.objectContaining({
threadId: "thread-1",
cwd: "/tmp/workspace",
model: "gpt-5.4-codex",
approvalPolicy: "on-request",
approvalsReviewer: "guardian_subagent",
serviceTier: "priority",
}),
);
});
});

View File

@@ -18,12 +18,7 @@ import {
type EmbeddedRunAttemptResult,
} from "openclaw/plugin-sdk/agent-harness";
import { handleCodexAppServerApprovalRequest } from "./approval-bridge.js";
import {
clearSharedCodexAppServerClient,
getSharedCodexAppServerClient,
isCodexAppServerApprovalRequest,
type CodexAppServerClient,
} from "./client.js";
import { isCodexAppServerApprovalRequest, type CodexAppServerClient } from "./client.js";
import {
resolveCodexAppServerRuntimeOptions,
type CodexAppServerRuntimeOptions,
@@ -35,8 +30,10 @@ import {
isJsonObject,
type CodexServerNotification,
type CodexDynamicToolCallParams,
type CodexThreadResumeParams,
type CodexThreadResumeResponse,
type CodexThreadStartResponse,
type CodexTurnStartParams,
type CodexTurnStartResponse,
type CodexUserInput,
type JsonObject,
@@ -48,6 +45,7 @@ import {
writeCodexAppServerBinding,
type CodexAppServerThreadBinding,
} from "./session-binding.js";
import { clearSharedCodexAppServerClient, getSharedCodexAppServerClient } from "./shared-client.js";
import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
type CodexAppServerClientFactory = (
@@ -185,16 +183,14 @@ export async function runCodexAppServerAttempt(
let turn: CodexTurnStartResponse;
try {
turn = await client.request<CodexTurnStartResponse>("turn/start", {
threadId: thread.threadId,
input: buildUserInput(params),
cwd: effectiveWorkspace,
approvalPolicy: appServer.approvalPolicy,
approvalsReviewer: appServer.approvalsReviewer,
model: params.modelId,
...(appServer.serviceTier ? { serviceTier: appServer.serviceTier } : {}),
effort: resolveReasoningEffort(params.thinkLevel),
});
turn = await client.request<CodexTurnStartResponse>(
"turn/start",
buildTurnStartParams(params, {
threadId: thread.threadId,
cwd: effectiveWorkspace,
appServer,
}),
);
} catch (error) {
notificationCleanup();
requestCleanup();
@@ -426,16 +422,13 @@ async function startOrResumeThread(params: {
await clearCodexAppServerBinding(params.params.sessionFile);
} else {
try {
const response = await params.client.request<CodexThreadResumeResponse>("thread/resume", {
threadId: binding.threadId,
model: params.params.modelId,
modelProvider: normalizeModelProvider(params.params.provider),
approvalPolicy: params.appServer.approvalPolicy,
approvalsReviewer: params.appServer.approvalsReviewer,
sandbox: params.appServer.sandbox,
...(params.appServer.serviceTier ? { serviceTier: params.appServer.serviceTier } : {}),
persistExtendedHistory: true,
});
const response = await params.client.request<CodexThreadResumeResponse>(
"thread/resume",
buildThreadResumeParams(params.params, {
threadId: binding.threadId,
appServer: params.appServer,
}),
);
await writeCodexAppServerBinding(params.params.sessionFile, {
threadId: response.thread.id,
cwd: params.cwd,
@@ -497,6 +490,45 @@ async function startOrResumeThread(params: {
};
}
export function buildThreadResumeParams(
params: EmbeddedRunAttemptParams,
options: {
threadId: string;
appServer: CodexAppServerRuntimeOptions;
},
): CodexThreadResumeParams {
return {
threadId: options.threadId,
model: params.modelId,
modelProvider: normalizeModelProvider(params.provider),
approvalPolicy: options.appServer.approvalPolicy,
approvalsReviewer: options.appServer.approvalsReviewer,
sandbox: options.appServer.sandbox,
...(options.appServer.serviceTier ? { serviceTier: options.appServer.serviceTier } : {}),
persistExtendedHistory: true,
};
}
export function buildTurnStartParams(
params: EmbeddedRunAttemptParams,
options: {
threadId: string;
cwd: string;
appServer: CodexAppServerRuntimeOptions;
},
): CodexTurnStartParams {
return {
threadId: options.threadId,
input: buildUserInput(params),
cwd: options.cwd,
approvalPolicy: options.appServer.approvalPolicy,
approvalsReviewer: options.appServer.approvalsReviewer,
model: params.modelId,
...(options.appServer.serviceTier ? { serviceTier: options.appServer.serviceTier } : {}),
effort: resolveReasoningEffort(params.thinkLevel),
};
}
function fingerprintDynamicTools(dynamicTools: JsonValue[]): string {
return JSON.stringify(dynamicTools.map(stabilizeJsonValue));
}

View File

@@ -0,0 +1,66 @@
import { CodexAppServerClient } from "./client.js";
import {
codexAppServerStartOptionsKey,
resolveCodexAppServerRuntimeOptions,
type CodexAppServerStartOptions,
} from "./config.js";
let sharedClient: CodexAppServerClient | undefined;
let sharedClientPromise: Promise<CodexAppServerClient> | undefined;
let sharedClientKey: string | undefined;
export async function getSharedCodexAppServerClient(options?: {
startOptions?: CodexAppServerStartOptions;
}): Promise<CodexAppServerClient> {
const startOptions = options?.startOptions ?? resolveCodexAppServerRuntimeOptions().start;
const key = codexAppServerStartOptionsKey(startOptions);
if (sharedClientKey && sharedClientKey !== key) {
clearSharedCodexAppServerClient();
}
sharedClientKey = key;
sharedClientPromise ??= (async () => {
const client = CodexAppServerClient.start(startOptions);
sharedClient = client;
client.addCloseHandler(clearSharedClientIfCurrent);
try {
await client.initialize();
return client;
} catch (error) {
// Startup failures happen before callers own the shared client, so close
// the child here instead of leaving a rejected daemon attached to stdio.
client.close();
throw error;
}
})();
try {
return await sharedClientPromise;
} catch (error) {
sharedClient = undefined;
sharedClientPromise = undefined;
sharedClientKey = undefined;
throw error;
}
}
export function resetSharedCodexAppServerClientForTests(): void {
sharedClient = undefined;
sharedClientPromise = undefined;
sharedClientKey = undefined;
}
export function clearSharedCodexAppServerClient(): void {
const client = sharedClient;
sharedClient = undefined;
sharedClientPromise = undefined;
sharedClientKey = undefined;
client?.close();
}
function clearSharedClientIfCurrent(client: CodexAppServerClient): void {
if (sharedClient !== client) {
return;
}
sharedClient = undefined;
sharedClientPromise = undefined;
sharedClientKey = undefined;
}

View File

@@ -0,0 +1,22 @@
export async function withTimeout<T>(
promise: Promise<T>,
timeoutMs: number,
timeoutMessage: string,
): Promise<T> {
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return await promise;
}
let timeout: NodeJS.Timeout | undefined;
try {
return await Promise.race([
promise,
new Promise<never>((_, reject) => {
timeout = setTimeout(() => reject(new Error(timeoutMessage)), Math.max(1, timeoutMs));
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}

View File

@@ -0,0 +1,10 @@
import { spawn } from "node:child_process";
import type { CodexAppServerStartOptions } from "./config.js";
import type { CodexAppServerTransport } from "./transport.js";
export function createStdioTransport(options: CodexAppServerStartOptions): CodexAppServerTransport {
return spawn(options.command, options.args, {
env: process.env,
stdio: ["pipe", "pipe", "pipe"],
});
}

View File

@@ -0,0 +1,90 @@
import { EventEmitter } from "node:events";
import { PassThrough, Writable } from "node:stream";
import WebSocket, { type RawData } from "ws";
import type { CodexAppServerStartOptions } from "./config.js";
import type { CodexAppServerTransport } from "./transport.js";
export function createWebSocketTransport(
options: CodexAppServerStartOptions,
): CodexAppServerTransport {
if (!options.url) {
throw new Error(
"codex app-server websocket transport requires plugins.entries.codex.config.appServer.url",
);
}
const events = new EventEmitter();
const stdout = new PassThrough();
const stderr = new PassThrough();
const headers = {
...options.headers,
...(options.authToken ? { Authorization: `Bearer ${options.authToken}` } : {}),
};
const socket = new WebSocket(options.url, { headers });
const pendingFrames: string[] = [];
let killed = false;
const sendFrame = (frame: string) => {
const trimmed = frame.trim();
if (!trimmed) {
return;
}
if (socket.readyState === WebSocket.OPEN) {
socket.send(trimmed);
return;
}
pendingFrames.push(trimmed);
};
// `initialize` can be written before the WebSocket open event fires. Buffer
// whole JSON-RPC frames so stdio and websocket transports share call timing.
socket.once("open", () => {
for (const frame of pendingFrames.splice(0)) {
socket.send(frame);
}
});
socket.once("error", (error) => events.emit("error", error));
socket.once("close", (code, reason) => {
killed = true;
events.emit("exit", code, reason.toString("utf8"));
});
socket.on("message", (data) => {
const text = websocketFrameToText(data);
stdout.write(text.endsWith("\n") ? text : `${text}\n`);
});
const stdin = new Writable({
write(chunk, _encoding, callback) {
for (const frame of chunk.toString("utf8").split("\n")) {
sendFrame(frame);
}
callback();
},
});
return {
stdin,
stdout,
stderr,
get killed() {
return killed;
},
kill: () => {
killed = true;
socket.close();
},
once: (event, listener) => events.once(event, listener),
};
}
function websocketFrameToText(data: RawData): string {
if (typeof data === "string") {
return data;
}
if (Buffer.isBuffer(data)) {
return data.toString("utf8");
}
if (Array.isArray(data)) {
return Buffer.concat(data).toString("utf8");
}
return Buffer.from(data).toString("utf8");
}

View File

@@ -0,0 +1,8 @@
export type CodexAppServerTransport = {
stdin: { write: (data: string) => unknown };
stdout: NodeJS.ReadableStream;
stderr: NodeJS.ReadableStream;
killed?: boolean;
kill?: () => unknown;
once: (event: string, listener: (...args: unknown[]) => void) => unknown;
};

View File

@@ -0,0 +1,157 @@
import type { CodexAppServerModelListResult } from "./app-server/models.js";
import { isJsonObject, type JsonObject, type JsonValue } from "./app-server/protocol.js";
import type { SafeValue } from "./command-rpc.js";
export type CodexStatusProbes = {
models: SafeValue<CodexAppServerModelListResult>;
account: SafeValue<JsonValue | undefined>;
limits: SafeValue<JsonValue | undefined>;
mcps: SafeValue<JsonValue | undefined>;
skills: SafeValue<JsonValue | undefined>;
};
export function formatCodexStatus(probes: CodexStatusProbes): string {
const connected =
probes.models.ok || probes.account.ok || probes.limits.ok || probes.mcps.ok || probes.skills.ok;
const lines = [`Codex app-server: ${connected ? "connected" : "unavailable"}`];
if (probes.models.ok) {
lines.push(
`Models: ${
probes.models.value.models
.map((model) => model.id)
.slice(0, 8)
.join(", ") || "none"
}`,
);
} else {
lines.push(`Models: ${probes.models.error}`);
}
lines.push(
`Account: ${probes.account.ok ? summarizeAccount(probes.account.value) : probes.account.error}`,
);
lines.push(
`Rate limits: ${probes.limits.ok ? summarizeArrayLike(probes.limits.value) : probes.limits.error}`,
);
lines.push(
`MCP servers: ${probes.mcps.ok ? summarizeArrayLike(probes.mcps.value) : probes.mcps.error}`,
);
lines.push(
`Skills: ${probes.skills.ok ? summarizeArrayLike(probes.skills.value) : probes.skills.error}`,
);
return lines.join("\n");
}
export function formatModels(result: CodexAppServerModelListResult): string {
if (result.models.length === 0) {
return "No Codex app-server models returned.";
}
return [
"Codex models:",
...result.models.map((model) => `- ${model.id}${model.isDefault ? " (default)" : ""}`),
].join("\n");
}
export function formatThreads(response: JsonValue | undefined): string {
const threads = extractArray(response);
if (threads.length === 0) {
return "No Codex threads returned.";
}
return [
"Codex threads:",
...threads.slice(0, 10).map((thread) => {
const record = isJsonObject(thread) ? thread : {};
const id = readString(record, "threadId") ?? readString(record, "id") ?? "<unknown>";
const title =
readString(record, "title") ?? readString(record, "name") ?? readString(record, "summary");
const details = [
readString(record, "model"),
readString(record, "cwd"),
readString(record, "updatedAt") ?? readString(record, "lastUpdatedAt"),
].filter(Boolean);
return `- ${id}${title ? ` - ${title}` : ""}${
details.length > 0 ? ` (${details.join(", ")})` : ""
}\n Resume: /codex resume ${id}`;
}),
].join("\n");
}
export function formatAccount(
account: SafeValue<JsonValue | undefined>,
limits: SafeValue<JsonValue | undefined>,
): string {
return [
`Account: ${account.ok ? summarizeAccount(account.value) : account.error}`,
`Rate limits: ${limits.ok ? summarizeArrayLike(limits.value) : limits.error}`,
].join("\n");
}
export function formatList(response: JsonValue | undefined, label: string): string {
const entries = extractArray(response);
if (entries.length === 0) {
return `${label}: none returned.`;
}
return [
`${label}:`,
...entries.slice(0, 25).map((entry) => {
const record = isJsonObject(entry) ? entry : {};
return `- ${readString(record, "name") ?? readString(record, "id") ?? JSON.stringify(entry)}`;
}),
].join("\n");
}
export function buildHelp(): string {
return [
"Codex commands:",
"- /codex status",
"- /codex models",
"- /codex threads [filter]",
"- /codex resume <thread-id>",
"- /codex compact",
"- /codex review",
"- /codex account",
"- /codex mcp",
"- /codex skills",
].join("\n");
}
function summarizeAccount(value: JsonValue | undefined): string {
if (!isJsonObject(value)) {
return "unavailable";
}
return (
readString(value, "email") ??
readString(value, "accountEmail") ??
readString(value, "planType") ??
readString(value, "id") ??
"available"
);
}
function summarizeArrayLike(value: JsonValue | undefined): string {
const entries = extractArray(value);
if (entries.length === 0) {
return "none returned";
}
return `${entries.length}`;
}
function extractArray(value: JsonValue | undefined): JsonValue[] {
if (Array.isArray(value)) {
return value;
}
if (!isJsonObject(value)) {
return [];
}
for (const key of ["data", "items", "threads", "models", "skills", "servers", "rateLimits"]) {
const child = value[key];
if (Array.isArray(child)) {
return child;
}
}
return [];
}
export function readString(record: JsonObject, key: string): string | undefined {
const value = record[key];
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}

View File

@@ -0,0 +1,150 @@
import type { PluginCommandContext } from "openclaw/plugin-sdk/plugin-entry";
import { CODEX_CONTROL_METHODS } from "./app-server/capabilities.js";
import { listCodexAppServerModels } from "./app-server/models.js";
import { isJsonObject } from "./app-server/protocol.js";
import {
readCodexAppServerBinding,
writeCodexAppServerBinding,
} from "./app-server/session-binding.js";
import {
buildHelp,
formatAccount,
formatCodexStatus,
formatList,
formatModels,
formatThreads,
readString,
} from "./command-formatters.js";
import {
codexControlRequest,
readCodexStatusProbes,
requestOptions,
safeCodexControlRequest,
} from "./command-rpc.js";
export async function handleCodexSubcommand(
ctx: PluginCommandContext,
options: { pluginConfig?: unknown },
): Promise<{ text: string }> {
const [subcommand = "status", ...rest] = splitArgs(ctx.args);
const normalized = subcommand.toLowerCase();
if (normalized === "help") {
return { text: buildHelp() };
}
if (normalized === "status") {
return { text: formatCodexStatus(await readCodexStatusProbes(options.pluginConfig)) };
}
if (normalized === "models") {
return {
text: formatModels(await listCodexAppServerModels(requestOptions(options.pluginConfig, 100))),
};
}
if (normalized === "threads") {
return { text: await buildThreads(options.pluginConfig, rest.join(" ")) };
}
if (normalized === "resume") {
return { text: await resumeThread(ctx, options.pluginConfig, rest[0]) };
}
if (normalized === "compact") {
return {
text: await startThreadAction(
ctx,
options.pluginConfig,
CODEX_CONTROL_METHODS.compact,
"compaction",
),
};
}
if (normalized === "review") {
return {
text: await startThreadAction(
ctx,
options.pluginConfig,
CODEX_CONTROL_METHODS.review,
"review",
),
};
}
if (normalized === "mcp") {
return {
text: formatList(
await codexControlRequest(options.pluginConfig, CODEX_CONTROL_METHODS.listMcpServers, {
limit: 100,
}),
"MCP servers",
),
};
}
if (normalized === "skills") {
return {
text: formatList(
await codexControlRequest(options.pluginConfig, CODEX_CONTROL_METHODS.listSkills, {}),
"Codex skills",
),
};
}
if (normalized === "account") {
const [account, limits] = await Promise.all([
safeCodexControlRequest(options.pluginConfig, CODEX_CONTROL_METHODS.account, {}),
safeCodexControlRequest(options.pluginConfig, CODEX_CONTROL_METHODS.rateLimits, {}),
]);
return { text: formatAccount(account, limits) };
}
return { text: `Unknown Codex command: ${subcommand}\n\n${buildHelp()}` };
}
async function buildThreads(pluginConfig: unknown, filter: string): Promise<string> {
const response = await codexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.listThreads, {
limit: 10,
...(filter.trim() ? { filter: filter.trim() } : {}),
});
return formatThreads(response);
}
async function resumeThread(
ctx: PluginCommandContext,
pluginConfig: unknown,
threadId: string | undefined,
): Promise<string> {
const normalizedThreadId = threadId?.trim();
if (!normalizedThreadId) {
return "Usage: /codex resume <thread-id>";
}
if (!ctx.sessionFile) {
return "Cannot attach a Codex thread because this command did not include an OpenClaw session file.";
}
const response = await codexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.resumeThread, {
threadId: normalizedThreadId,
persistExtendedHistory: true,
});
const thread = isJsonObject(response) && isJsonObject(response.thread) ? response.thread : {};
const effectiveThreadId = readString(thread, "id") ?? normalizedThreadId;
await writeCodexAppServerBinding(ctx.sessionFile, {
threadId: effectiveThreadId,
cwd: readString(thread, "cwd") ?? "",
model: isJsonObject(response) ? readString(response, "model") : undefined,
modelProvider: isJsonObject(response) ? readString(response, "modelProvider") : undefined,
});
return `Attached this OpenClaw session to Codex thread ${effectiveThreadId}.`;
}
async function startThreadAction(
ctx: PluginCommandContext,
pluginConfig: unknown,
method: typeof CODEX_CONTROL_METHODS.compact | typeof CODEX_CONTROL_METHODS.review,
label: string,
): Promise<string> {
if (!ctx.sessionFile) {
return `Cannot start Codex ${label} because this command did not include an OpenClaw session file.`;
}
const binding = await readCodexAppServerBinding(ctx.sessionFile);
if (!binding?.threadId) {
return `No Codex thread is attached to this OpenClaw session yet.`;
}
await codexControlRequest(pluginConfig, method, { threadId: binding.threadId });
return `Started Codex ${label} for thread ${binding.threadId}.`;
}
function splitArgs(value: string | undefined): string[] {
return (value ?? "").trim().split(/\s+/).filter(Boolean);
}

View File

@@ -0,0 +1,74 @@
import {
CODEX_CONTROL_METHODS,
describeControlFailure,
type CodexControlMethod,
} from "./app-server/capabilities.js";
import { resolveCodexAppServerRuntimeOptions } from "./app-server/config.js";
import { listCodexAppServerModels } from "./app-server/models.js";
import type { JsonValue } from "./app-server/protocol.js";
import { requestCodexAppServerJson } from "./app-server/request.js";
export type SafeValue<T> = { ok: true; value: T } | { ok: false; error: string };
export function requestOptions(pluginConfig: unknown, limit: number) {
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig });
return {
limit,
timeoutMs: runtime.requestTimeoutMs,
startOptions: runtime.start,
};
}
export async function codexControlRequest(
pluginConfig: unknown,
method: CodexControlMethod,
requestParams?: JsonValue,
): Promise<JsonValue | undefined> {
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig });
return await requestCodexAppServerJson({
method,
requestParams,
timeoutMs: runtime.requestTimeoutMs,
startOptions: runtime.start,
});
}
export async function safeCodexControlRequest(
pluginConfig: unknown,
method: CodexControlMethod,
requestParams?: JsonValue,
): Promise<SafeValue<JsonValue | undefined>> {
return await safeValue(
async () => await codexControlRequest(pluginConfig, method, requestParams),
);
}
export async function safeCodexModelList(pluginConfig: unknown, limit: number) {
return await safeValue(
async () => await listCodexAppServerModels(requestOptions(pluginConfig, limit)),
);
}
export async function readCodexStatusProbes(pluginConfig: unknown) {
const [models, account, limits, mcps, skills] = await Promise.all([
safeCodexModelList(pluginConfig, 20),
safeCodexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.account, {}),
safeCodexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.rateLimits, {}),
safeCodexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.listMcpServers, { limit: 100 }),
safeCodexControlRequest(pluginConfig, CODEX_CONTROL_METHODS.listSkills, {}),
]);
return { models, account, limits, mcps, skills };
}
export async function safeValue<T>(read: () => Promise<T>): Promise<SafeValue<T>> {
try {
return { ok: true, value: await read() };
} catch (error) {
return { ok: false, error: describeControlFailure(formatError(error)) };
}
}
function formatError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

View File

@@ -3,9 +3,25 @@ import os from "node:os";
import path from "node:path";
import type { PluginCommandContext } from "openclaw/plugin-sdk/plugin-entry";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { resetSharedCodexAppServerClientForTests } from "./app-server/client.js";
import { CODEX_CONTROL_METHODS } from "./app-server/capabilities.js";
import { resetSharedCodexAppServerClientForTests } from "./app-server/shared-client.js";
import { handleCodexCommand } from "./commands.js";
const commandRpcMocks = vi.hoisted(() => ({
codexControlRequest: vi.fn(),
readCodexStatusProbes: vi.fn(),
requestOptions: vi.fn((_pluginConfig: unknown, limit: number) => ({ limit })),
safeCodexControlRequest: vi.fn(),
safeCodexModelList: vi.fn(),
}));
const modelMocks = vi.hoisted(() => ({
listCodexAppServerModels: vi.fn(),
}));
vi.mock("./command-rpc.js", () => commandRpcMocks);
vi.mock("./app-server/models.js", () => modelMocks);
let tempDir: string;
function createContext(args: string, sessionFile?: string): PluginCommandContext {
@@ -25,28 +41,35 @@ function createContext(args: string, sessionFile?: string): PluginCommandContext
describe("codex command", () => {
beforeEach(async () => {
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-command-"));
commandRpcMocks.codexControlRequest.mockReset();
commandRpcMocks.readCodexStatusProbes.mockReset();
commandRpcMocks.requestOptions.mockReset();
commandRpcMocks.requestOptions.mockImplementation((_pluginConfig: unknown, limit: number) => ({
limit,
}));
commandRpcMocks.safeCodexControlRequest.mockReset();
commandRpcMocks.safeCodexModelList.mockReset();
modelMocks.listCodexAppServerModels.mockReset();
});
afterEach(async () => {
resetSharedCodexAppServerClientForTests();
vi.restoreAllMocks();
await fs.rm(tempDir, { recursive: true, force: true });
});
it("attaches the current session to an existing Codex thread", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const requests: Array<{ method: string; params: unknown }> = [];
vi.spyOn(
await import("./app-server/client.js"),
"requestCodexAppServerJson",
).mockImplementation(async ({ method, requestParams }) => {
requests.push({ method, params: requestParams });
return {
thread: { id: "thread-123", cwd: "/repo" },
model: "gpt-5.4",
modelProvider: "openai",
};
});
commandRpcMocks.codexControlRequest.mockImplementation(
async (_pluginConfig: unknown, method: string, requestParams: unknown) => {
requests.push({ method: String(method), params: requestParams });
return {
thread: { id: "thread-123", cwd: "/repo" },
model: "gpt-5.4",
modelProvider: "openai",
};
},
);
await expect(
handleCodexCommand(createContext("resume thread-123", sessionFile)),
@@ -66,7 +89,7 @@ describe("codex command", () => {
});
it("shows model ids from Codex app-server", async () => {
vi.spyOn(await import("./app-server/client.js"), "listCodexAppServerModels").mockResolvedValue({
modelMocks.listCodexAppServerModels.mockResolvedValue({
models: [
{
id: "gpt-5.4",
@@ -81,4 +104,76 @@ describe("codex command", () => {
text: "Codex models:\n- gpt-5.4",
});
});
it("reports status unavailable when every Codex probe fails", async () => {
const offline = { ok: false as const, error: "offline" };
commandRpcMocks.readCodexStatusProbes.mockResolvedValue({
models: offline,
account: offline,
limits: offline,
mcps: offline,
skills: offline,
});
await expect(handleCodexCommand(createContext("status"))).resolves.toEqual({
text: [
"Codex app-server: unavailable",
"Models: offline",
"Account: offline",
"Rate limits: offline",
"MCP servers: offline",
"Skills: offline",
].join("\n"),
});
});
it("starts compaction for the attached Codex thread", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
await fs.writeFile(
`${sessionFile}.codex-app-server.json`,
JSON.stringify({ schemaVersion: 1, threadId: "thread-123", cwd: "/repo" }),
);
commandRpcMocks.codexControlRequest.mockResolvedValue({});
await expect(handleCodexCommand(createContext("compact", sessionFile))).resolves.toEqual({
text: "Started Codex compaction for thread thread-123.",
});
expect(commandRpcMocks.codexControlRequest).toHaveBeenCalledWith(
undefined,
CODEX_CONTROL_METHODS.compact,
{
threadId: "thread-123",
},
);
});
it("explains compaction when no Codex thread is attached", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
await expect(handleCodexCommand(createContext("compact", sessionFile))).resolves.toEqual({
text: "No Codex thread is attached to this OpenClaw session yet.",
});
});
it("passes filters to Codex thread listing", async () => {
commandRpcMocks.codexControlRequest.mockResolvedValue({
data: [{ id: "thread-123", title: "Fix the thing", model: "gpt-5.4", cwd: "/repo" }],
});
await expect(handleCodexCommand(createContext("threads fix"))).resolves.toEqual({
text: [
"Codex threads:",
"- thread-123 - Fix the thing (gpt-5.4, /repo)",
" Resume: /codex resume thread-123",
].join("\n"),
});
expect(commandRpcMocks.codexControlRequest).toHaveBeenCalledWith(
undefined,
CODEX_CONTROL_METHODS.listThreads,
{
limit: 10,
filter: "fix",
},
);
});
});

View File

@@ -2,13 +2,7 @@ import type {
OpenClawPluginCommandDefinition,
PluginCommandContext,
} from "openclaw/plugin-sdk/plugin-entry";
import { listCodexAppServerModels, requestCodexAppServerJson } from "./app-server/client.js";
import { resolveCodexAppServerRuntimeOptions } from "./app-server/config.js";
import { isJsonObject, type JsonObject, type JsonValue } from "./app-server/protocol.js";
import {
readCodexAppServerBinding,
writeCodexAppServerBinding,
} from "./app-server/session-binding.js";
import { handleCodexSubcommand } from "./command-handlers.js";
export function createCodexCommand(options: {
pluginConfig?: unknown;
@@ -26,274 +20,5 @@ export async function handleCodexCommand(
ctx: PluginCommandContext,
options: { pluginConfig?: unknown } = {},
): Promise<{ text: string }> {
const [subcommand = "status", ...rest] = splitArgs(ctx.args);
const normalized = subcommand.toLowerCase();
if (normalized === "help") {
return { text: buildHelp() };
}
if (normalized === "status") {
return { text: await buildStatus(options.pluginConfig) };
}
if (normalized === "models") {
return { text: await buildModels(options.pluginConfig) };
}
if (normalized === "threads") {
return { text: await buildThreads(options.pluginConfig, rest.join(" ")) };
}
if (normalized === "resume") {
return { text: await resumeThread(ctx, options.pluginConfig, rest[0]) };
}
if (normalized === "compact") {
return {
text: await startThreadAction(
ctx,
options.pluginConfig,
"thread/compact/start",
"compaction",
),
};
}
if (normalized === "review") {
return { text: await startThreadAction(ctx, options.pluginConfig, "review/start", "review") };
}
if (normalized === "mcp") {
return { text: await buildList(options.pluginConfig, "mcpServerStatus/list", "MCP servers") };
}
if (normalized === "skills") {
return { text: await buildList(options.pluginConfig, "skills/list", "Codex skills") };
}
if (normalized === "account") {
return { text: await buildAccount(options.pluginConfig) };
}
return { text: `Unknown Codex command: ${subcommand}\n\n${buildHelp()}` };
}
async function buildStatus(pluginConfig: unknown): Promise<string> {
const [models, account, limits, mcps, skills] = await Promise.all([
safeValue(() => listCodexAppServerModels(requestOptions(pluginConfig, 20))),
safeValue(() => codexRequest(pluginConfig, "account/read", {})),
safeValue(() => codexRequest(pluginConfig, "account/rateLimits/read", {})),
safeValue(() => codexRequest(pluginConfig, "mcpServerStatus/list", { limit: 100 })),
safeValue(() => codexRequest(pluginConfig, "skills/list", {})),
]);
const connected = models.ok || account.ok || limits.ok || mcps.ok || skills.ok;
const lines = [`Codex app-server: ${connected ? "connected" : "unavailable"}`];
if (models.ok) {
lines.push(
`Models: ${
models.value.models
.map((model) => model.id)
.slice(0, 8)
.join(", ") || "none"
}`,
);
} else {
lines.push(`Models: ${models.error}`);
}
lines.push(`Account: ${account.ok ? summarizeAccount(account.value) : account.error}`);
lines.push(`Rate limits: ${limits.ok ? summarizeArrayLike(limits.value) : limits.error}`);
lines.push(`MCP servers: ${mcps.ok ? summarizeArrayLike(mcps.value) : mcps.error}`);
lines.push(`Skills: ${skills.ok ? summarizeArrayLike(skills.value) : skills.error}`);
return lines.join("\n");
}
async function buildModels(pluginConfig: unknown): Promise<string> {
const result = await listCodexAppServerModels(requestOptions(pluginConfig, 100));
if (result.models.length === 0) {
return "No Codex app-server models returned.";
}
return [
"Codex models:",
...result.models.map((model) => `- ${model.id}${model.isDefault ? " (default)" : ""}`),
].join("\n");
}
async function buildThreads(pluginConfig: unknown, filter: string): Promise<string> {
const response = await codexRequest(pluginConfig, "thread/list", {
limit: 10,
...(filter.trim() ? { filter: filter.trim() } : {}),
});
const threads = extractArray(response);
if (threads.length === 0) {
return "No Codex threads returned.";
}
return [
"Codex threads:",
...threads.slice(0, 10).map((thread) => {
const record = isJsonObject(thread) ? thread : {};
const id = readString(record, "threadId") ?? readString(record, "id") ?? "<unknown>";
const title =
readString(record, "title") ?? readString(record, "name") ?? readString(record, "summary");
return `- ${id}${title ? ` - ${title}` : ""}`;
}),
].join("\n");
}
async function buildAccount(pluginConfig: unknown): Promise<string> {
const [account, limits] = await Promise.all([
safeValue(() => codexRequest(pluginConfig, "account/read", {})),
safeValue(() => codexRequest(pluginConfig, "account/rateLimits/read", {})),
]);
return [
`Account: ${account.ok ? summarizeAccount(account.value) : account.error}`,
`Rate limits: ${limits.ok ? summarizeArrayLike(limits.value) : limits.error}`,
].join("\n");
}
async function buildList(pluginConfig: unknown, method: string, label: string): Promise<string> {
const response = await codexRequest(pluginConfig, method, { limit: 100 });
const entries = extractArray(response);
if (entries.length === 0) {
return `${label}: none returned.`;
}
return [
`${label}:`,
...entries.slice(0, 25).map((entry) => {
const record = isJsonObject(entry) ? entry : {};
return `- ${readString(record, "name") ?? readString(record, "id") ?? JSON.stringify(entry)}`;
}),
].join("\n");
}
async function resumeThread(
ctx: PluginCommandContext,
pluginConfig: unknown,
threadId: string | undefined,
): Promise<string> {
const normalizedThreadId = threadId?.trim();
if (!normalizedThreadId) {
return "Usage: /codex resume <thread-id>";
}
if (!ctx.sessionFile) {
return "Cannot attach a Codex thread because this command did not include an OpenClaw session file.";
}
const response = await codexRequest(pluginConfig, "thread/resume", {
threadId: normalizedThreadId,
persistExtendedHistory: true,
});
const thread = isJsonObject(response) && isJsonObject(response.thread) ? response.thread : {};
const effectiveThreadId = readString(thread, "id") ?? normalizedThreadId;
await writeCodexAppServerBinding(ctx.sessionFile, {
threadId: effectiveThreadId,
cwd: readString(thread, "cwd") ?? "",
model: isJsonObject(response) ? readString(response, "model") : undefined,
modelProvider: isJsonObject(response) ? readString(response, "modelProvider") : undefined,
});
return `Attached this OpenClaw session to Codex thread ${effectiveThreadId}.`;
}
async function startThreadAction(
ctx: PluginCommandContext,
pluginConfig: unknown,
method: string,
label: string,
): Promise<string> {
if (!ctx.sessionFile) {
return `Cannot start Codex ${label} because this command did not include an OpenClaw session file.`;
}
const binding = await readCodexAppServerBinding(ctx.sessionFile);
if (!binding?.threadId) {
return `No Codex thread is attached to this OpenClaw session yet.`;
}
await codexRequest(pluginConfig, method, { threadId: binding.threadId });
return `Started Codex ${label} for thread ${binding.threadId}.`;
}
async function codexRequest(
pluginConfig: unknown,
method: string,
requestParams?: JsonValue,
): Promise<JsonValue | undefined> {
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig });
return await requestCodexAppServerJson({
method,
requestParams,
timeoutMs: runtime.requestTimeoutMs,
startOptions: runtime.start,
});
}
function requestOptions(pluginConfig: unknown, limit: number) {
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig });
return {
limit,
timeoutMs: runtime.requestTimeoutMs,
startOptions: runtime.start,
};
}
async function safeValue<T>(
read: () => Promise<T>,
): Promise<{ ok: true; value: T } | { ok: false; error: string }> {
try {
return { ok: true, value: await read() };
} catch (error) {
return { ok: false, error: formatError(error) };
}
}
function summarizeAccount(value: JsonValue | undefined): string {
if (!isJsonObject(value)) {
return "unavailable";
}
return (
readString(value, "email") ??
readString(value, "accountEmail") ??
readString(value, "planType") ??
readString(value, "id") ??
"available"
);
}
function summarizeArrayLike(value: JsonValue | undefined): string {
const entries = extractArray(value);
if (entries.length === 0) {
return "none returned";
}
return `${entries.length}`;
}
function extractArray(value: JsonValue | undefined): JsonValue[] {
if (Array.isArray(value)) {
return value;
}
if (!isJsonObject(value)) {
return [];
}
for (const key of ["data", "items", "threads", "models", "skills", "servers", "rateLimits"]) {
const child = value[key];
if (Array.isArray(child)) {
return child;
}
}
return [];
}
function readString(record: JsonObject, key: string): string | undefined {
const value = record[key];
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function splitArgs(value: string | undefined): string[] {
return (value ?? "").trim().split(/\s+/).filter(Boolean);
}
function formatError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function buildHelp(): string {
return [
"Codex commands:",
"- /codex status",
"- /codex models",
"- /codex threads [filter]",
"- /codex resume <thread-id>",
"- /codex compact",
"- /codex review",
"- /codex account",
"- /codex mcp",
"- /codex skills",
].join("\n");
return await handleCodexSubcommand(ctx, options);
}