diff --git a/extensions/codex/harness.ts b/extensions/codex/harness.ts index 5879e8d1089..aa91b3c6821 100644 --- a/extensions/codex/harness.ts +++ b/extensions/codex/harness.ts @@ -8,6 +8,7 @@ import type { } from "./src/app-server/models.js"; import { runCodexAppServerAttempt } from "./src/app-server/run-attempt.js"; import { clearCodexAppServerBinding } from "./src/app-server/session-binding.js"; +import { clearSharedCodexAppServerClient } from "./src/app-server/shared-client.js"; const DEFAULT_CODEX_HARNESS_PROVIDER_IDS = new Set(["codex", "openai-codex"]); @@ -47,5 +48,8 @@ export function createCodexAppServerAgentHarness(options?: { await clearCodexAppServerBinding(params.sessionFile); } }, + dispose: () => { + clearSharedCodexAppServerClient(); + }, }; } diff --git a/extensions/codex/index.test.ts b/extensions/codex/index.test.ts index 07940a30f7a..37c405a91c1 100644 --- a/extensions/codex/index.test.ts +++ b/extensions/codex/index.test.ts @@ -35,6 +35,7 @@ describe("codex plugin", () => { expect(registerAgentHarness.mock.calls[0]?.[0]).toMatchObject({ id: "codex", label: "Codex agent harness", + dispose: expect.any(Function), }); expect(registerCommand.mock.calls[0]?.[0]).toMatchObject({ name: "codex", diff --git a/extensions/codex/provider.test.ts b/extensions/codex/provider.test.ts index f4cd789a7bf..3d1c26b0266 100644 --- a/extensions/codex/provider.test.ts +++ b/extensions/codex/provider.test.ts @@ -1,5 +1,12 @@ -import { describe, expect, it, vi } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { buildCodexProvider, buildCodexProviderCatalog } from "./provider.js"; +import { CodexAppServerClient } from "./src/app-server/client.js"; +import { resetSharedCodexAppServerClientForTests } from "./src/app-server/shared-client.js"; + +afterEach(() => { + resetSharedCodexAppServerClientForTests(); + vi.restoreAllMocks(); +}); describe("codex provider", () => { it("maps Codex app-server models to a Codex provider catalog", async () => { @@ -64,17 +71,49 @@ describe("codex provider", () => { ]); }); + it("keeps a static fallback catalog when live discovery is explicitly disabled by env", async () => { + const listModels = vi.fn(); + + const result = await buildCodexProviderCatalog({ + env: { OPENCLAW_CODEX_DISCOVERY_LIVE: "0" }, + listModels, + }); + + expect(listModels).not.toHaveBeenCalled(); + expect(result.provider.models.map((model) => model.id)).toEqual([ + "gpt-5.4", + "gpt-5.4-mini", + "gpt-5.2", + ]); + }); + + it("closes the transient app-server client after live discovery", async () => { + const client = { + initialize: vi.fn(async () => undefined), + request: vi.fn(async () => ({ data: [] })), + addCloseHandler: vi.fn(() => () => undefined), + close: vi.fn(), + } as unknown as CodexAppServerClient; + vi.spyOn(CodexAppServerClient, "start").mockReturnValue(client); + + await buildCodexProviderCatalog({ + env: { OPENCLAW_CODEX_DISCOVERY_LIVE: "1" }, + }); + + expect(client.close).toHaveBeenCalledTimes(1); + }); + it("resolves arbitrary Codex app-server model ids through the codex provider", () => { const provider = buildCodexProvider(); const model = provider.resolveDynamicModel?.({ provider: "codex", - modelId: " arcanine ", + modelId: " custom-model ", modelRegistry: { find: () => null }, } as never); expect(model).toMatchObject({ - id: "arcanine", + id: "custom-model", provider: "codex", api: "openai-codex-responses", baseUrl: "https://chatgpt.com/backend-api", diff --git a/extensions/codex/provider.ts b/extensions/codex/provider.ts index 4501efc2d55..c737a24ad58 100644 --- a/extensions/codex/provider.ts +++ b/extensions/codex/provider.ts @@ -15,6 +15,7 @@ import { readCodexPluginConfig, resolveCodexAppServerRuntimeOptions, } from "./src/app-server/config.js"; +import { clearSharedCodexAppServerClient } from "./src/app-server/shared-client.js"; const PROVIDER_ID = "codex"; const CODEX_BASE_URL = "https://chatgpt.com/backend-api"; @@ -99,14 +100,18 @@ export async function buildCodexProviderCatalog( const config = readCodexPluginConfig(options.pluginConfig); const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig: options.pluginConfig }); const timeoutMs = normalizeTimeoutMs(config.discovery?.timeoutMs); - const discovered = - config.discovery?.enabled === false || shouldSkipLiveDiscovery(options.env) - ? [] - : await listModelsBestEffort({ - listModels: options.listModels ?? listCodexAppServerModels, - timeoutMs, - startOptions: appServer.start, - }); + let discovered: CodexAppServerModel[] = []; + if (config.discovery?.enabled !== false && !shouldSkipLiveDiscovery(options.env)) { + try { + discovered = await listModelsBestEffort({ + listModels: options.listModels ?? listCodexAppServerModels, + timeoutMs, + startOptions: appServer.start, + }); + } finally { + clearSharedCodexAppServerClient(); + } + } const models = (discovered.length > 0 ? discovered : FALLBACK_CODEX_MODELS).map( codexModelToDefinition, ); @@ -189,7 +194,11 @@ function normalizeTimeoutMs(value: unknown): number { } function shouldSkipLiveDiscovery(env: NodeJS.ProcessEnv = process.env): boolean { - return Boolean(env.VITEST) && env[LIVE_DISCOVERY_ENV] !== "1"; + const override = env[LIVE_DISCOVERY_ENV]?.trim().toLowerCase(); + if (override === "0" || override === "false") { + return true; + } + return Boolean(env.VITEST) && override !== "1"; } function shouldDefaultToReasoningModel(modelId: string): boolean { diff --git a/extensions/codex/src/app-server/client.test.ts b/extensions/codex/src/app-server/client.test.ts index eb7718d7c27..b8dc9b8648a 100644 --- a/extensions/codex/src/app-server/client.test.ts +++ b/extensions/codex/src/app-server/client.test.ts @@ -2,6 +2,7 @@ import { EventEmitter } from "node:events"; import { PassThrough, Writable } from "node:stream"; import { afterEach, describe, expect, it, vi } from "vitest"; import { + __testing, CodexAppServerClient, CodexAppServerRpcError, MIN_CODEX_APP_SERVER_VERSION, @@ -47,6 +48,7 @@ describe("CodexAppServerClient", () => { afterEach(() => { resetSharedCodexAppServerClientForTests(); vi.restoreAllMocks(); + vi.useRealTimers(); for (const client of clients) { client.close(); } @@ -141,6 +143,30 @@ describe("CodexAppServerClient", () => { expect(harness.writes).toHaveLength(1); }); + it("force-stops app-server transports that ignore the graceful signal", async () => { + vi.useFakeTimers(); + const process = Object.assign(new EventEmitter(), { + stdin: { + write: vi.fn(), + end: vi.fn(), + destroy: vi.fn(), + unref: vi.fn(), + }, + stdout: Object.assign(new PassThrough(), { unref: vi.fn() }), + stderr: Object.assign(new PassThrough(), { unref: vi.fn() }), + exitCode: null, + signalCode: null, + kill: vi.fn(), + unref: vi.fn(), + }); + + __testing.closeCodexAppServerTransport(process, { forceKillDelayMs: 25 }); + + expect(process.kill).toHaveBeenCalledWith("SIGTERM"); + await vi.advanceTimersByTimeAsync(25); + expect(process.kill).toHaveBeenCalledWith("SIGKILL"); + expect(process.unref).toHaveBeenCalledTimes(1); + }); it("reads the Codex version from the app-server user agent", () => { expect(readCodexVersionFromUserAgent("openclaw/0.118.0 (macOS; test)")).toBe("0.118.0"); expect(readCodexVersionFromUserAgent("codex_cli_rs/0.118.1-dev (linux; test)")).toBe( diff --git a/extensions/codex/src/app-server/client.ts b/extensions/codex/src/app-server/client.ts index fc3c1b94a6b..3bb1e1de0a0 100644 --- a/extensions/codex/src/app-server/client.ts +++ b/extensions/codex/src/app-server/client.ts @@ -12,7 +12,7 @@ import { } from "./protocol.js"; import { createStdioTransport } from "./transport-stdio.js"; import { createWebSocketTransport } from "./transport-websocket.js"; -import type { CodexAppServerTransport } from "./transport.js"; +import { closeCodexAppServerTransport, type CodexAppServerTransport } from "./transport.js"; export const MIN_CODEX_APP_SERVER_VERSION = "0.118.0"; @@ -149,11 +149,13 @@ export class CodexAppServerClient { } close(): void { + if (this.closed) { + return; + } this.closed = true; this.lines.close(); - if (!this.child.killed) { - this.child.kill?.(); - } + this.rejectPendingRequests(new Error("codex app-server client is closed")); + closeCodexAppServerTransport(this.child); } private writeMessage(message: RpcRequest | RpcResponse): void { @@ -245,6 +247,10 @@ export class CodexAppServerClient { return; } this.closed = true; + this.rejectPendingRequests(error); + } + + private rejectPendingRequests(error: Error): void { for (const pending of this.pending.values()) { pending.reject(error); } @@ -354,3 +360,7 @@ function formatExitValue(value: unknown): string { } return "unknown"; } + +export const __testing = { + closeCodexAppServerTransport, +} as const; diff --git a/extensions/codex/src/app-server/shared-client.ts b/extensions/codex/src/app-server/shared-client.ts index 90f1978de3c..12ca87f12cf 100644 --- a/extensions/codex/src/app-server/shared-client.ts +++ b/extensions/codex/src/app-server/shared-client.ts @@ -5,22 +5,35 @@ import { type CodexAppServerStartOptions, } from "./config.js"; -let sharedClient: CodexAppServerClient | undefined; -let sharedClientPromise: Promise | undefined; -let sharedClientKey: string | undefined; +type SharedCodexAppServerClientState = { + client?: CodexAppServerClient; + promise?: Promise; + key?: string; +}; + +const SHARED_CODEX_APP_SERVER_CLIENT_STATE = Symbol.for("openclaw.codexAppServerClientState"); + +function getSharedCodexAppServerClientState(): SharedCodexAppServerClientState { + const globalState = globalThis as typeof globalThis & { + [SHARED_CODEX_APP_SERVER_CLIENT_STATE]?: SharedCodexAppServerClientState; + }; + globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE] ??= {}; + return globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE]; +} export async function getSharedCodexAppServerClient(options?: { startOptions?: CodexAppServerStartOptions; }): Promise { + const state = getSharedCodexAppServerClientState(); const startOptions = options?.startOptions ?? resolveCodexAppServerRuntimeOptions().start; const key = codexAppServerStartOptionsKey(startOptions); - if (sharedClientKey && sharedClientKey !== key) { + if (state.key && state.key !== key) { clearSharedCodexAppServerClient(); } - sharedClientKey = key; - sharedClientPromise ??= (async () => { + state.key = key; + state.promise ??= (async () => { const client = CodexAppServerClient.start(startOptions); - sharedClient = client; + state.client = client; client.addCloseHandler(clearSharedClientIfCurrent); try { await client.initialize(); @@ -33,34 +46,37 @@ export async function getSharedCodexAppServerClient(options?: { } })(); try { - return await sharedClientPromise; + return await state.promise; } catch (error) { - sharedClient = undefined; - sharedClientPromise = undefined; - sharedClientKey = undefined; + state.client = undefined; + state.promise = undefined; + state.key = undefined; throw error; } } export function resetSharedCodexAppServerClientForTests(): void { - sharedClient = undefined; - sharedClientPromise = undefined; - sharedClientKey = undefined; + const state = getSharedCodexAppServerClientState(); + state.client = undefined; + state.promise = undefined; + state.key = undefined; } export function clearSharedCodexAppServerClient(): void { - const client = sharedClient; - sharedClient = undefined; - sharedClientPromise = undefined; - sharedClientKey = undefined; + const state = getSharedCodexAppServerClientState(); + const client = state.client; + state.client = undefined; + state.promise = undefined; + state.key = undefined; client?.close(); } function clearSharedClientIfCurrent(client: CodexAppServerClient): void { - if (sharedClient !== client) { + const state = getSharedCodexAppServerClientState(); + if (state.client !== client) { return; } - sharedClient = undefined; - sharedClientPromise = undefined; - sharedClientKey = undefined; + state.client = undefined; + state.promise = undefined; + state.key = undefined; } diff --git a/extensions/codex/src/app-server/transport-stdio.ts b/extensions/codex/src/app-server/transport-stdio.ts index c9c5feb72b0..17bc84da453 100644 --- a/extensions/codex/src/app-server/transport-stdio.ts +++ b/extensions/codex/src/app-server/transport-stdio.ts @@ -5,6 +5,7 @@ import type { CodexAppServerTransport } from "./transport.js"; export function createStdioTransport(options: CodexAppServerStartOptions): CodexAppServerTransport { return spawn(options.command, options.args, { env: process.env, + detached: process.platform !== "win32", stdio: ["pipe", "pipe", "pipe"], }); } diff --git a/extensions/codex/src/app-server/transport.ts b/extensions/codex/src/app-server/transport.ts index d7d14187bf2..423d7f0c474 100644 --- a/extensions/codex/src/app-server/transport.ts +++ b/extensions/codex/src/app-server/transport.ts @@ -1,8 +1,72 @@ export type CodexAppServerTransport = { - stdin: { write: (data: string) => unknown }; - stdout: NodeJS.ReadableStream; - stderr: NodeJS.ReadableStream; + stdin: { + write: (data: string) => unknown; + end?: () => unknown; + destroy?: () => unknown; + unref?: () => unknown; + }; + stdout: NodeJS.ReadableStream & { + destroy?: () => unknown; + unref?: () => unknown; + }; + stderr: NodeJS.ReadableStream & { + destroy?: () => unknown; + unref?: () => unknown; + }; + pid?: number; + exitCode?: number | null; + signalCode?: string | null; killed?: boolean; - kill?: () => unknown; + kill?: (signal?: NodeJS.Signals) => unknown; + unref?: () => unknown; once: (event: string, listener: (...args: unknown[]) => void) => unknown; }; + +export function closeCodexAppServerTransport( + child: CodexAppServerTransport, + options: { forceKillDelayMs?: number } = {}, +): void { + child.stdout.destroy?.(); + child.stderr.destroy?.(); + child.stdin.end?.(); + child.stdin.destroy?.(); + signalCodexAppServerTransport(child, "SIGTERM"); + const forceKillDelayMs = options.forceKillDelayMs ?? 1_000; + const forceKill = setTimeout( + () => { + if (hasCodexAppServerTransportExited(child)) { + return; + } + signalCodexAppServerTransport(child, "SIGKILL"); + }, + Math.max(1, forceKillDelayMs), + ); + forceKill.unref?.(); + child.once("exit", () => clearTimeout(forceKill)); + child.unref?.(); + child.stdout.unref?.(); + child.stderr.unref?.(); + child.stdin.unref?.(); +} + +function hasCodexAppServerTransportExited(child: CodexAppServerTransport): boolean { + return child.exitCode !== null && child.exitCode !== undefined + ? true + : child.signalCode !== null && child.signalCode !== undefined; +} + +function signalCodexAppServerTransport( + child: CodexAppServerTransport, + signal: NodeJS.Signals, +): void { + if (child.pid && process.platform !== "win32") { + try { + process.kill(-child.pid, signal); + return; + } catch { + // Fall back to the child handle. The process may already be gone or not + // be a process-group leader on older call sites. + } + } + child.kill?.(signal); +} diff --git a/src/agents/harness/index.ts b/src/agents/harness/index.ts index 45369279449..6a1d5e9e3af 100644 --- a/src/agents/harness/index.ts +++ b/src/agents/harness/index.ts @@ -1,5 +1,6 @@ export { clearAgentHarnesses, + disposeRegisteredAgentHarnesses, getAgentHarness, getRegisteredAgentHarness, listAgentHarnessIds, diff --git a/src/agents/harness/registry.test.ts b/src/agents/harness/registry.test.ts index db2d2c2d014..07b14f09bff 100644 --- a/src/agents/harness/registry.test.ts +++ b/src/agents/harness/registry.test.ts @@ -1,6 +1,7 @@ -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { clearAgentHarnesses, + disposeRegisteredAgentHarnesses, getAgentHarness, getRegisteredAgentHarness, listAgentHarnessIds, @@ -96,6 +97,18 @@ describe("agent harness registry", () => { ]); }); + it("disposes registered harness runtime state", async () => { + const dispose = vi.fn(async () => undefined); + registerAgentHarness({ + ...makeHarness("custom"), + dispose, + }); + + await disposeRegisteredAgentHarnesses(); + + expect(dispose).toHaveBeenCalledTimes(1); + }); + it("keeps model-specific harnesses behind plugin registration in auto mode", () => { process.env.OPENCLAW_AGENT_RUNTIME = "auto"; diff --git a/src/agents/harness/registry.ts b/src/agents/harness/registry.ts index c5541cce7c0..695f6096a37 100644 --- a/src/agents/harness/registry.ts +++ b/src/agents/harness/registry.ts @@ -80,3 +80,21 @@ export async function resetRegisteredAgentHarnessSessions( }), ); } + +export async function disposeRegisteredAgentHarnesses(): Promise { + await Promise.all( + listRegisteredAgentHarnesses().map(async (entry) => { + if (!entry.harness.dispose) { + return; + } + try { + await entry.harness.dispose(); + } catch (error) { + log.warn(`${entry.harness.label} dispose hook failed`, { + harnessId: entry.harness.id, + error, + }); + } + }), + ); +} diff --git a/src/gateway/server-close.test.ts b/src/gateway/server-close.test.ts index b325032a479..823fc1bf047 100644 --- a/src/gateway/server-close.test.ts +++ b/src/gateway/server-close.test.ts @@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const mocks = { logWarn: vi.fn(), + disposeAgentHarnesses: vi.fn(async () => undefined), }; const WEBSOCKET_CLOSE_GRACE_MS = 1_000; const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250; @@ -14,6 +15,10 @@ vi.mock("../hooks/gmail-watcher.js", () => ({ stopGmailWatcher: vi.fn(async () => undefined), })); +vi.mock("../agents/harness/registry.js", () => ({ + disposeRegisteredAgentHarnesses: mocks.disposeAgentHarnesses, +})); + vi.mock("../logging/subsystem.js", () => ({ createSubsystemLogger: vi.fn(() => ({ warn: mocks.logWarn, @@ -26,6 +31,7 @@ describe("createGatewayCloseHandler", () => { beforeEach(() => { vi.useRealTimers(); mocks.logWarn.mockClear(); + mocks.disposeAgentHarnesses.mockClear(); }); it("unsubscribes lifecycle listeners during shutdown", async () => { @@ -66,6 +72,7 @@ describe("createGatewayCloseHandler", () => { expect(lifecycleUnsub).toHaveBeenCalledTimes(1); expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1); + expect(mocks.disposeAgentHarnesses).toHaveBeenCalledTimes(1); }); it("terminates lingering websocket clients when websocket close exceeds the grace window", async () => { diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 32e7c8b4c26..6457bbe567d 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -1,5 +1,6 @@ import type { Server as HttpServer } from "node:http"; import type { WebSocketServer } from "ws"; +import { disposeRegisteredAgentHarnesses } from "../agents/harness/registry.js"; import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js"; import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; import { stopGmailWatcher } from "../hooks/gmail-watcher.js"; @@ -98,6 +99,7 @@ export function createGatewayCloseHandler(params: { for (const plugin of listChannelPlugins()) { await params.stopChannel(plugin.id); } + await disposeRegisteredAgentHarnesses(); if (params.pluginServices) { await params.pluginServices.stop().catch(() => {}); } diff --git a/src/plugin-sdk/agent-harness.ts b/src/plugin-sdk/agent-harness.ts index 491879149df..6503f86643e 100644 --- a/src/plugin-sdk/agent-harness.ts +++ b/src/plugin-sdk/agent-harness.ts @@ -47,6 +47,7 @@ export { queueEmbeddedPiMessage as queueAgentHarnessMessage, setActiveEmbeddedRun, } from "../agents/pi-embedded-runner/runs.js"; +export { disposeRegisteredAgentHarnesses } from "../agents/harness/registry.js"; export { normalizeProviderToolSchemas } from "../agents/pi-embedded-runner/tool-schema-runtime.js"; export { createOpenClawCodingTools } from "../agents/pi-tools.js"; export { resolveSandboxContext } from "../agents/sandbox.js";