mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 01:31:08 +00:00
fix: dispose codex app-server harnesses
This commit is contained in:
@@ -8,6 +8,7 @@ import type {
|
||||
} from "./src/app-server/models.js";
|
||||
import { runCodexAppServerAttempt } from "./src/app-server/run-attempt.js";
|
||||
import { clearCodexAppServerBinding } from "./src/app-server/session-binding.js";
|
||||
import { clearSharedCodexAppServerClient } from "./src/app-server/shared-client.js";
|
||||
|
||||
const DEFAULT_CODEX_HARNESS_PROVIDER_IDS = new Set(["codex", "openai-codex"]);
|
||||
|
||||
@@ -47,5 +48,8 @@ export function createCodexAppServerAgentHarness(options?: {
|
||||
await clearCodexAppServerBinding(params.sessionFile);
|
||||
}
|
||||
},
|
||||
dispose: () => {
|
||||
clearSharedCodexAppServerClient();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ describe("codex plugin", () => {
|
||||
expect(registerAgentHarness.mock.calls[0]?.[0]).toMatchObject({
|
||||
id: "codex",
|
||||
label: "Codex agent harness",
|
||||
dispose: expect.any(Function),
|
||||
});
|
||||
expect(registerCommand.mock.calls[0]?.[0]).toMatchObject({
|
||||
name: "codex",
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { buildCodexProvider, buildCodexProviderCatalog } from "./provider.js";
|
||||
import { CodexAppServerClient } from "./src/app-server/client.js";
|
||||
import { resetSharedCodexAppServerClientForTests } from "./src/app-server/shared-client.js";
|
||||
|
||||
afterEach(() => {
|
||||
resetSharedCodexAppServerClientForTests();
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe("codex provider", () => {
|
||||
it("maps Codex app-server models to a Codex provider catalog", async () => {
|
||||
@@ -64,17 +71,49 @@ describe("codex provider", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("keeps a static fallback catalog when live discovery is explicitly disabled by env", async () => {
|
||||
const listModels = vi.fn();
|
||||
|
||||
const result = await buildCodexProviderCatalog({
|
||||
env: { OPENCLAW_CODEX_DISCOVERY_LIVE: "0" },
|
||||
listModels,
|
||||
});
|
||||
|
||||
expect(listModels).not.toHaveBeenCalled();
|
||||
expect(result.provider.models.map((model) => model.id)).toEqual([
|
||||
"gpt-5.4",
|
||||
"gpt-5.4-mini",
|
||||
"gpt-5.2",
|
||||
]);
|
||||
});
|
||||
|
||||
it("closes the transient app-server client after live discovery", async () => {
|
||||
const client = {
|
||||
initialize: vi.fn(async () => undefined),
|
||||
request: vi.fn(async () => ({ data: [] })),
|
||||
addCloseHandler: vi.fn(() => () => undefined),
|
||||
close: vi.fn(),
|
||||
} as unknown as CodexAppServerClient;
|
||||
vi.spyOn(CodexAppServerClient, "start").mockReturnValue(client);
|
||||
|
||||
await buildCodexProviderCatalog({
|
||||
env: { OPENCLAW_CODEX_DISCOVERY_LIVE: "1" },
|
||||
});
|
||||
|
||||
expect(client.close).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("resolves arbitrary Codex app-server model ids through the codex provider", () => {
|
||||
const provider = buildCodexProvider();
|
||||
|
||||
const model = provider.resolveDynamicModel?.({
|
||||
provider: "codex",
|
||||
modelId: " arcanine ",
|
||||
modelId: " custom-model ",
|
||||
modelRegistry: { find: () => null },
|
||||
} as never);
|
||||
|
||||
expect(model).toMatchObject({
|
||||
id: "arcanine",
|
||||
id: "custom-model",
|
||||
provider: "codex",
|
||||
api: "openai-codex-responses",
|
||||
baseUrl: "https://chatgpt.com/backend-api",
|
||||
|
||||
@@ -15,6 +15,7 @@ import {
|
||||
readCodexPluginConfig,
|
||||
resolveCodexAppServerRuntimeOptions,
|
||||
} from "./src/app-server/config.js";
|
||||
import { clearSharedCodexAppServerClient } from "./src/app-server/shared-client.js";
|
||||
|
||||
const PROVIDER_ID = "codex";
|
||||
const CODEX_BASE_URL = "https://chatgpt.com/backend-api";
|
||||
@@ -99,14 +100,18 @@ export async function buildCodexProviderCatalog(
|
||||
const config = readCodexPluginConfig(options.pluginConfig);
|
||||
const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig: options.pluginConfig });
|
||||
const timeoutMs = normalizeTimeoutMs(config.discovery?.timeoutMs);
|
||||
const discovered =
|
||||
config.discovery?.enabled === false || shouldSkipLiveDiscovery(options.env)
|
||||
? []
|
||||
: await listModelsBestEffort({
|
||||
listModels: options.listModels ?? listCodexAppServerModels,
|
||||
timeoutMs,
|
||||
startOptions: appServer.start,
|
||||
});
|
||||
let discovered: CodexAppServerModel[] = [];
|
||||
if (config.discovery?.enabled !== false && !shouldSkipLiveDiscovery(options.env)) {
|
||||
try {
|
||||
discovered = await listModelsBestEffort({
|
||||
listModels: options.listModels ?? listCodexAppServerModels,
|
||||
timeoutMs,
|
||||
startOptions: appServer.start,
|
||||
});
|
||||
} finally {
|
||||
clearSharedCodexAppServerClient();
|
||||
}
|
||||
}
|
||||
const models = (discovered.length > 0 ? discovered : FALLBACK_CODEX_MODELS).map(
|
||||
codexModelToDefinition,
|
||||
);
|
||||
@@ -189,7 +194,11 @@ function normalizeTimeoutMs(value: unknown): number {
|
||||
}
|
||||
|
||||
function shouldSkipLiveDiscovery(env: NodeJS.ProcessEnv = process.env): boolean {
|
||||
return Boolean(env.VITEST) && env[LIVE_DISCOVERY_ENV] !== "1";
|
||||
const override = env[LIVE_DISCOVERY_ENV]?.trim().toLowerCase();
|
||||
if (override === "0" || override === "false") {
|
||||
return true;
|
||||
}
|
||||
return Boolean(env.VITEST) && override !== "1";
|
||||
}
|
||||
|
||||
function shouldDefaultToReasoningModel(modelId: string): boolean {
|
||||
|
||||
@@ -2,6 +2,7 @@ import { EventEmitter } from "node:events";
|
||||
import { PassThrough, Writable } from "node:stream";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
__testing,
|
||||
CodexAppServerClient,
|
||||
CodexAppServerRpcError,
|
||||
MIN_CODEX_APP_SERVER_VERSION,
|
||||
@@ -47,6 +48,7 @@ describe("CodexAppServerClient", () => {
|
||||
afterEach(() => {
|
||||
resetSharedCodexAppServerClientForTests();
|
||||
vi.restoreAllMocks();
|
||||
vi.useRealTimers();
|
||||
for (const client of clients) {
|
||||
client.close();
|
||||
}
|
||||
@@ -141,6 +143,30 @@ describe("CodexAppServerClient", () => {
|
||||
expect(harness.writes).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("force-stops app-server transports that ignore the graceful signal", async () => {
|
||||
vi.useFakeTimers();
|
||||
const process = Object.assign(new EventEmitter(), {
|
||||
stdin: {
|
||||
write: vi.fn(),
|
||||
end: vi.fn(),
|
||||
destroy: vi.fn(),
|
||||
unref: vi.fn(),
|
||||
},
|
||||
stdout: Object.assign(new PassThrough(), { unref: vi.fn() }),
|
||||
stderr: Object.assign(new PassThrough(), { unref: vi.fn() }),
|
||||
exitCode: null,
|
||||
signalCode: null,
|
||||
kill: vi.fn(),
|
||||
unref: vi.fn(),
|
||||
});
|
||||
|
||||
__testing.closeCodexAppServerTransport(process, { forceKillDelayMs: 25 });
|
||||
|
||||
expect(process.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
expect(process.kill).toHaveBeenCalledWith("SIGKILL");
|
||||
expect(process.unref).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
it("reads the Codex version from the app-server user agent", () => {
|
||||
expect(readCodexVersionFromUserAgent("openclaw/0.118.0 (macOS; test)")).toBe("0.118.0");
|
||||
expect(readCodexVersionFromUserAgent("codex_cli_rs/0.118.1-dev (linux; test)")).toBe(
|
||||
|
||||
@@ -12,7 +12,7 @@ import {
|
||||
} from "./protocol.js";
|
||||
import { createStdioTransport } from "./transport-stdio.js";
|
||||
import { createWebSocketTransport } from "./transport-websocket.js";
|
||||
import type { CodexAppServerTransport } from "./transport.js";
|
||||
import { closeCodexAppServerTransport, type CodexAppServerTransport } from "./transport.js";
|
||||
|
||||
export const MIN_CODEX_APP_SERVER_VERSION = "0.118.0";
|
||||
|
||||
@@ -149,11 +149,13 @@ export class CodexAppServerClient {
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.closed = true;
|
||||
this.lines.close();
|
||||
if (!this.child.killed) {
|
||||
this.child.kill?.();
|
||||
}
|
||||
this.rejectPendingRequests(new Error("codex app-server client is closed"));
|
||||
closeCodexAppServerTransport(this.child);
|
||||
}
|
||||
|
||||
private writeMessage(message: RpcRequest | RpcResponse): void {
|
||||
@@ -245,6 +247,10 @@ export class CodexAppServerClient {
|
||||
return;
|
||||
}
|
||||
this.closed = true;
|
||||
this.rejectPendingRequests(error);
|
||||
}
|
||||
|
||||
private rejectPendingRequests(error: Error): void {
|
||||
for (const pending of this.pending.values()) {
|
||||
pending.reject(error);
|
||||
}
|
||||
@@ -354,3 +360,7 @@ function formatExitValue(value: unknown): string {
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
closeCodexAppServerTransport,
|
||||
} as const;
|
||||
|
||||
@@ -5,22 +5,35 @@ import {
|
||||
type CodexAppServerStartOptions,
|
||||
} from "./config.js";
|
||||
|
||||
let sharedClient: CodexAppServerClient | undefined;
|
||||
let sharedClientPromise: Promise<CodexAppServerClient> | undefined;
|
||||
let sharedClientKey: string | undefined;
|
||||
type SharedCodexAppServerClientState = {
|
||||
client?: CodexAppServerClient;
|
||||
promise?: Promise<CodexAppServerClient>;
|
||||
key?: string;
|
||||
};
|
||||
|
||||
const SHARED_CODEX_APP_SERVER_CLIENT_STATE = Symbol.for("openclaw.codexAppServerClientState");
|
||||
|
||||
function getSharedCodexAppServerClientState(): SharedCodexAppServerClientState {
|
||||
const globalState = globalThis as typeof globalThis & {
|
||||
[SHARED_CODEX_APP_SERVER_CLIENT_STATE]?: SharedCodexAppServerClientState;
|
||||
};
|
||||
globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE] ??= {};
|
||||
return globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE];
|
||||
}
|
||||
|
||||
export async function getSharedCodexAppServerClient(options?: {
|
||||
startOptions?: CodexAppServerStartOptions;
|
||||
}): Promise<CodexAppServerClient> {
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
const startOptions = options?.startOptions ?? resolveCodexAppServerRuntimeOptions().start;
|
||||
const key = codexAppServerStartOptionsKey(startOptions);
|
||||
if (sharedClientKey && sharedClientKey !== key) {
|
||||
if (state.key && state.key !== key) {
|
||||
clearSharedCodexAppServerClient();
|
||||
}
|
||||
sharedClientKey = key;
|
||||
sharedClientPromise ??= (async () => {
|
||||
state.key = key;
|
||||
state.promise ??= (async () => {
|
||||
const client = CodexAppServerClient.start(startOptions);
|
||||
sharedClient = client;
|
||||
state.client = client;
|
||||
client.addCloseHandler(clearSharedClientIfCurrent);
|
||||
try {
|
||||
await client.initialize();
|
||||
@@ -33,34 +46,37 @@ export async function getSharedCodexAppServerClient(options?: {
|
||||
}
|
||||
})();
|
||||
try {
|
||||
return await sharedClientPromise;
|
||||
return await state.promise;
|
||||
} catch (error) {
|
||||
sharedClient = undefined;
|
||||
sharedClientPromise = undefined;
|
||||
sharedClientKey = undefined;
|
||||
state.client = undefined;
|
||||
state.promise = undefined;
|
||||
state.key = undefined;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export function resetSharedCodexAppServerClientForTests(): void {
|
||||
sharedClient = undefined;
|
||||
sharedClientPromise = undefined;
|
||||
sharedClientKey = undefined;
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
state.client = undefined;
|
||||
state.promise = undefined;
|
||||
state.key = undefined;
|
||||
}
|
||||
|
||||
export function clearSharedCodexAppServerClient(): void {
|
||||
const client = sharedClient;
|
||||
sharedClient = undefined;
|
||||
sharedClientPromise = undefined;
|
||||
sharedClientKey = undefined;
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
const client = state.client;
|
||||
state.client = undefined;
|
||||
state.promise = undefined;
|
||||
state.key = undefined;
|
||||
client?.close();
|
||||
}
|
||||
|
||||
function clearSharedClientIfCurrent(client: CodexAppServerClient): void {
|
||||
if (sharedClient !== client) {
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
if (state.client !== client) {
|
||||
return;
|
||||
}
|
||||
sharedClient = undefined;
|
||||
sharedClientPromise = undefined;
|
||||
sharedClientKey = undefined;
|
||||
state.client = undefined;
|
||||
state.promise = undefined;
|
||||
state.key = undefined;
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import type { CodexAppServerTransport } from "./transport.js";
|
||||
export function createStdioTransport(options: CodexAppServerStartOptions): CodexAppServerTransport {
|
||||
return spawn(options.command, options.args, {
|
||||
env: process.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,8 +1,72 @@
|
||||
export type CodexAppServerTransport = {
|
||||
stdin: { write: (data: string) => unknown };
|
||||
stdout: NodeJS.ReadableStream;
|
||||
stderr: NodeJS.ReadableStream;
|
||||
stdin: {
|
||||
write: (data: string) => unknown;
|
||||
end?: () => unknown;
|
||||
destroy?: () => unknown;
|
||||
unref?: () => unknown;
|
||||
};
|
||||
stdout: NodeJS.ReadableStream & {
|
||||
destroy?: () => unknown;
|
||||
unref?: () => unknown;
|
||||
};
|
||||
stderr: NodeJS.ReadableStream & {
|
||||
destroy?: () => unknown;
|
||||
unref?: () => unknown;
|
||||
};
|
||||
pid?: number;
|
||||
exitCode?: number | null;
|
||||
signalCode?: string | null;
|
||||
killed?: boolean;
|
||||
kill?: () => unknown;
|
||||
kill?: (signal?: NodeJS.Signals) => unknown;
|
||||
unref?: () => unknown;
|
||||
once: (event: string, listener: (...args: unknown[]) => void) => unknown;
|
||||
};
|
||||
|
||||
export function closeCodexAppServerTransport(
|
||||
child: CodexAppServerTransport,
|
||||
options: { forceKillDelayMs?: number } = {},
|
||||
): void {
|
||||
child.stdout.destroy?.();
|
||||
child.stderr.destroy?.();
|
||||
child.stdin.end?.();
|
||||
child.stdin.destroy?.();
|
||||
signalCodexAppServerTransport(child, "SIGTERM");
|
||||
const forceKillDelayMs = options.forceKillDelayMs ?? 1_000;
|
||||
const forceKill = setTimeout(
|
||||
() => {
|
||||
if (hasCodexAppServerTransportExited(child)) {
|
||||
return;
|
||||
}
|
||||
signalCodexAppServerTransport(child, "SIGKILL");
|
||||
},
|
||||
Math.max(1, forceKillDelayMs),
|
||||
);
|
||||
forceKill.unref?.();
|
||||
child.once("exit", () => clearTimeout(forceKill));
|
||||
child.unref?.();
|
||||
child.stdout.unref?.();
|
||||
child.stderr.unref?.();
|
||||
child.stdin.unref?.();
|
||||
}
|
||||
|
||||
function hasCodexAppServerTransportExited(child: CodexAppServerTransport): boolean {
|
||||
return child.exitCode !== null && child.exitCode !== undefined
|
||||
? true
|
||||
: child.signalCode !== null && child.signalCode !== undefined;
|
||||
}
|
||||
|
||||
function signalCodexAppServerTransport(
|
||||
child: CodexAppServerTransport,
|
||||
signal: NodeJS.Signals,
|
||||
): void {
|
||||
if (child.pid && process.platform !== "win32") {
|
||||
try {
|
||||
process.kill(-child.pid, signal);
|
||||
return;
|
||||
} catch {
|
||||
// Fall back to the child handle. The process may already be gone or not
|
||||
// be a process-group leader on older call sites.
|
||||
}
|
||||
}
|
||||
child.kill?.(signal);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
export {
|
||||
clearAgentHarnesses,
|
||||
disposeRegisteredAgentHarnesses,
|
||||
getAgentHarness,
|
||||
getRegisteredAgentHarness,
|
||||
listAgentHarnessIds,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
clearAgentHarnesses,
|
||||
disposeRegisteredAgentHarnesses,
|
||||
getAgentHarness,
|
||||
getRegisteredAgentHarness,
|
||||
listAgentHarnessIds,
|
||||
@@ -96,6 +97,18 @@ describe("agent harness registry", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("disposes registered harness runtime state", async () => {
|
||||
const dispose = vi.fn(async () => undefined);
|
||||
registerAgentHarness({
|
||||
...makeHarness("custom"),
|
||||
dispose,
|
||||
});
|
||||
|
||||
await disposeRegisteredAgentHarnesses();
|
||||
|
||||
expect(dispose).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("keeps model-specific harnesses behind plugin registration in auto mode", () => {
|
||||
process.env.OPENCLAW_AGENT_RUNTIME = "auto";
|
||||
|
||||
|
||||
@@ -80,3 +80,21 @@ export async function resetRegisteredAgentHarnessSessions(
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
export async function disposeRegisteredAgentHarnesses(): Promise<void> {
|
||||
await Promise.all(
|
||||
listRegisteredAgentHarnesses().map(async (entry) => {
|
||||
if (!entry.harness.dispose) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await entry.harness.dispose();
|
||||
} catch (error) {
|
||||
log.warn(`${entry.harness.label} dispose hook failed`, {
|
||||
harnessId: entry.harness.id,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const mocks = {
|
||||
logWarn: vi.fn(),
|
||||
disposeAgentHarnesses: vi.fn(async () => undefined),
|
||||
};
|
||||
const WEBSOCKET_CLOSE_GRACE_MS = 1_000;
|
||||
const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
|
||||
@@ -14,6 +15,10 @@ vi.mock("../hooks/gmail-watcher.js", () => ({
|
||||
stopGmailWatcher: vi.fn(async () => undefined),
|
||||
}));
|
||||
|
||||
vi.mock("../agents/harness/registry.js", () => ({
|
||||
disposeRegisteredAgentHarnesses: mocks.disposeAgentHarnesses,
|
||||
}));
|
||||
|
||||
vi.mock("../logging/subsystem.js", () => ({
|
||||
createSubsystemLogger: vi.fn(() => ({
|
||||
warn: mocks.logWarn,
|
||||
@@ -26,6 +31,7 @@ describe("createGatewayCloseHandler", () => {
|
||||
beforeEach(() => {
|
||||
vi.useRealTimers();
|
||||
mocks.logWarn.mockClear();
|
||||
mocks.disposeAgentHarnesses.mockClear();
|
||||
});
|
||||
|
||||
it("unsubscribes lifecycle listeners during shutdown", async () => {
|
||||
@@ -66,6 +72,7 @@ describe("createGatewayCloseHandler", () => {
|
||||
|
||||
expect(lifecycleUnsub).toHaveBeenCalledTimes(1);
|
||||
expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.disposeAgentHarnesses).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("terminates lingering websocket clients when websocket close exceeds the grace window", async () => {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { Server as HttpServer } from "node:http";
|
||||
import type { WebSocketServer } from "ws";
|
||||
import { disposeRegisteredAgentHarnesses } from "../agents/harness/registry.js";
|
||||
import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js";
|
||||
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
||||
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
||||
@@ -98,6 +99,7 @@ export function createGatewayCloseHandler(params: {
|
||||
for (const plugin of listChannelPlugins()) {
|
||||
await params.stopChannel(plugin.id);
|
||||
}
|
||||
await disposeRegisteredAgentHarnesses();
|
||||
if (params.pluginServices) {
|
||||
await params.pluginServices.stop().catch(() => {});
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ export {
|
||||
queueEmbeddedPiMessage as queueAgentHarnessMessage,
|
||||
setActiveEmbeddedRun,
|
||||
} from "../agents/pi-embedded-runner/runs.js";
|
||||
export { disposeRegisteredAgentHarnesses } from "../agents/harness/registry.js";
|
||||
export { normalizeProviderToolSchemas } from "../agents/pi-embedded-runner/tool-schema-runtime.js";
|
||||
export { createOpenClawCodingTools } from "../agents/pi-tools.js";
|
||||
export { resolveSandboxContext } from "../agents/sandbox.js";
|
||||
|
||||
Reference in New Issue
Block a user