diff --git a/extensions/codex/src/app-server/client-factory.ts b/extensions/codex/src/app-server/client-factory.ts index e3885c47bb7..8b042d8d4bf 100644 --- a/extensions/codex/src/app-server/client-factory.ts +++ b/extensions/codex/src/app-server/client-factory.ts @@ -22,3 +22,13 @@ export const defaultCodexAppServerClientFactory: CodexAppServerClientFactory = ( import("./shared-client.js").then(({ getSharedCodexAppServerClient }) => getSharedCodexAppServerClient({ startOptions, authProfileId, agentDir, config }), ); + +export const defaultLeasedCodexAppServerClientFactory: CodexAppServerClientFactory = ( + startOptions, + authProfileId, + agentDir, + config, +) => + import("./shared-client.js").then(({ getLeasedSharedCodexAppServerClient }) => + getLeasedSharedCodexAppServerClient({ startOptions, authProfileId, agentDir, config }), + ); diff --git a/extensions/codex/src/app-server/models.ts b/extensions/codex/src/app-server/models.ts index 337caeb27e9..2836b4c740c 100644 --- a/extensions/codex/src/app-server/models.ts +++ b/extensions/codex/src/app-server/models.ts @@ -74,10 +74,13 @@ async function withCodexAppServerModelClient( ): Promise { const timeoutMs = options.timeoutMs ?? 2500; const useSharedClient = options.sharedClient !== false; - const { createIsolatedCodexAppServerClient, getSharedCodexAppServerClient } = - await import("./shared-client.js"); + const { + createIsolatedCodexAppServerClient, + getLeasedSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient, + } = await import("./shared-client.js"); const client = useSharedClient - ? await getSharedCodexAppServerClient({ + ? await getLeasedSharedCodexAppServerClient({ startOptions: options.startOptions, timeoutMs, authProfileId: options.authProfileId, @@ -94,7 +97,9 @@ async function withCodexAppServerModelClient( try { return await run({ client, timeoutMs }); } finally { - if (!useSharedClient) { + if (useSharedClient) { + releaseLeasedSharedCodexAppServerClient(client); + } else { client.close(); } } diff --git a/extensions/codex/src/app-server/request.test.ts b/extensions/codex/src/app-server/request.test.ts index 73ebde1bbd1..41497747c8f 100644 --- a/extensions/codex/src/app-server/request.test.ts +++ b/extensions/codex/src/app-server/request.test.ts @@ -5,7 +5,11 @@ const sharedClientMocks = vi.hoisted(() => ({ getSharedCodexAppServerClient: vi.fn(), })); -vi.mock("./shared-client.js", () => sharedClientMocks); +vi.mock("./shared-client.js", () => ({ + ...sharedClientMocks, + getLeasedSharedCodexAppServerClient: sharedClientMocks.getSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient: vi.fn(), +})); const { requestCodexAppServerJson } = await import("./request.js"); diff --git a/extensions/codex/src/app-server/request.ts b/extensions/codex/src/app-server/request.ts index d07db846d67..7a0a892bd12 100644 --- a/extensions/codex/src/app-server/request.ts +++ b/extensions/codex/src/app-server/request.ts @@ -9,7 +9,8 @@ import type { import { resolveCodexAppServerDirectSandboxBypassBlock } from "./sandbox-guard.js"; import { createIsolatedCodexAppServerClient, - getSharedCodexAppServerClient, + getLeasedSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient, } from "./shared-client.js"; import { withTimeout } from "./timeout.js"; @@ -63,7 +64,7 @@ export async function requestCodexAppServerJson(param return await withTimeout( (async () => { const client = await ( - params.isolated ? createIsolatedCodexAppServerClient : getSharedCodexAppServerClient + params.isolated ? createIsolatedCodexAppServerClient : getLeasedSharedCodexAppServerClient )({ startOptions: params.startOptions, timeoutMs, @@ -81,6 +82,8 @@ export async function requestCodexAppServerJson(param // underlying codex binary, so the unref'd close() path can leave // the child running and keep the parent's event loop alive. await client.closeAndWait({ exitTimeoutMs: 2_000, forceKillDelayMs: 250 }); + } else { + releaseLeasedSharedCodexAppServerClient(client); } } })(), diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index e230e781886..d220a512193 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -4160,7 +4160,7 @@ describe("runCodexAppServerAttempt", () => { }); }); - it("interrupts but keeps the app-server client alive when a turn timeout fires", async () => { + it("unsubscribes and closes the app-server client when the active turn goes idle past the attempt timeout", async () => { const close = vi.fn(); const request = vi.fn(async (method: string) => { if (method === "thread/start") { @@ -4204,7 +4204,14 @@ describe("runCodexAppServerAttempt", () => { }, { timeoutMs: 5_000 }, ); - expect(close).not.toHaveBeenCalled(); + expect(request).toHaveBeenCalledWith( + "thread/unsubscribe", + { + threadId: "thread-1", + }, + { timeoutMs: 5_000 }, + ); + expect(close).toHaveBeenCalledTimes(1); expect(queueActiveRunMessageForTest("session-1", "after timeout")).toBe(false); }); @@ -5562,9 +5569,13 @@ describe("runCodexAppServerAttempt", () => { ), { interval: 1 }, ); - expect(warn).not.toHaveBeenCalledWith( + expect(warn).toHaveBeenCalledWith( "codex app-server client retired after timed-out turn", - expect.anything(), + expect.objectContaining({ + reason: "turn_completion_idle_timeout", + threadId: "thread-1", + turnId: "turn-1", + }), ); }); diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index e99540f079a..5ff6a20b2f6 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -73,7 +73,7 @@ import { } from "./auth-bridge.js"; import { CODEX_CONTROL_METHODS } from "./capabilities.js"; import { - defaultCodexAppServerClientFactory, + defaultLeasedCodexAppServerClientFactory, type CodexAppServerClientFactory, } from "./client-factory.js"; import { @@ -170,7 +170,11 @@ import { type CodexAppServerThreadBinding, } from "./session-binding.js"; import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; -import { clearSharedCodexAppServerClientIfCurrent } from "./shared-client.js"; +import { + clearSharedCodexAppServerClientIfCurrent, + releaseLeasedSharedCodexAppServerClient, + retireSharedCodexAppServerClientIfCurrent, +} from "./shared-client.js"; import { areCodexDynamicToolFingerprintsCompatible, buildDeveloperInstructions, @@ -1054,7 +1058,7 @@ export async function runCodexAppServerAttempt( const preDynamicStartupStages = createCodexDynamicToolBuildStageTracker({ enabled: profilerEnabled, }); - const attemptClientFactory = options.clientFactory ?? defaultCodexAppServerClientFactory; + const attemptClientFactory = options.clientFactory ?? defaultLeasedCodexAppServerClientFactory; const pluginConfig = readCodexPluginConfig(options.pluginConfig); const computerUseConfig = resolveCodexComputerUseConfig({ pluginConfig }); const configuredAppServer = resolveCodexAppServerRuntimeOptions({ pluginConfig }); @@ -1492,6 +1496,7 @@ export async function runCodexAppServerAttempt( let thread: CodexAppServerThreadLifecycleBinding; let trajectoryEndRecorded = false; let nativeHookRelay: NativeHookRelayRegistrationHandle | undefined; + let releaseSharedClientLease: (() => void) | undefined; let startupClientForCleanup: CodexAppServerClient | undefined; let sandboxExecEnvironmentAcquired = false; const releaseSandboxExecEnvironment = async () => { @@ -1605,134 +1610,150 @@ export async function runCodexAppServerAttempt( operation: async () => { let attemptedClient: CodexAppServerClient | undefined; const startupAttempt = async () => { - const startupClient = await attemptClientFactory( - appServer.start, - startupAuthProfileId, - agentDir, - params.config, - ); - attemptedClient = startupClient; - startupClientForCleanup = startupClient; - await ensureCodexComputerUse({ - client: startupClient, - pluginConfig, - timeoutMs: appServer.requestTimeoutMs, - signal: runAbortController.signal, - }); - let startupSandboxEnvironment: CodexSandboxExecEnvironment | undefined; - let startupSandboxEnvironmentAcquired = false; - const releaseStartupSandboxEnvironment = async () => { - if (startupSandboxEnvironmentAcquired) { - startupSandboxEnvironmentAcquired = false; - await releaseCodexSandboxExecServerEnvironment(sandbox); - } - }; - releaseStartupResourcesOnTimeout = releaseStartupSandboxEnvironment; + let startupClientLease: (() => void) | undefined; + let startupAttemptSucceeded = false; try { - startupSandboxEnvironment = shouldRequireCodexSandboxExecServerEnvironment({ - sandbox, - nativeToolSurfaceEnabled, - sandboxExecServerEnabled, - }) - ? await ensureCodexSandboxExecServerEnvironment({ - client: startupClient, - sandbox: sandbox ?? null, - appServerStartOptions: appServer.start, - timeoutMs: appServer.requestTimeoutMs, - signal: runAbortController.signal, - }) - : undefined; - startupSandboxEnvironmentAcquired = Boolean(startupSandboxEnvironment); - if (runAbortController.signal.aborted) { - await releaseStartupSandboxEnvironment(); - throw new Error("codex app-server startup aborted"); - } - if ( - sandbox?.enabled && - nativeToolSurfaceEnabled && - sandboxExecServerEnabled && - !startupSandboxEnvironment - ) { - throw new Error( - "Codex app-server did not register an OpenClaw sandbox exec-server environment.", - ); - } - } catch (error) { - await releaseStartupSandboxEnvironment(); - throw error; - } - const startupEnvironmentSelection = resolveCodexSandboxEnvironmentSelection( - startupSandboxEnvironment, - nativeToolSurfaceEnabled, - ); - const startupExecutionCwd = resolveCodexAppServerExecutionCwd({ - effectiveWorkspace, - environment: startupSandboxEnvironment, - nativeToolSurfaceEnabled, - }); - const startupSandboxPolicy = startupSandboxEnvironment - ? resolveCodexExternalSandboxPolicyForOpenClawSandbox(sandbox) - : undefined; - const buildThreadLifecycleParams = () => - ({ - client: startupClient, - params: buildActiveRunAttemptParams(), - agentId: sessionAgentId, - cwd: startupExecutionCwd, - dynamicTools: toolBridge.specs, - appServer: pluginAppServer, - developerInstructions: promptBuild.developerInstructions, - config: threadConfig, - finalConfigPatch: nativeHookRelayConfig, - nativeCodeModeEnabled: nativeToolSurfaceEnabled, - nativeCodeModeOnlyEnabled: appServer.codeModeOnly, - userMcpServersEnabled: nativeToolSurfaceEnabled, - mcpServersFingerprint: bundleMcpThreadConfig.fingerprint, - mcpServersFingerprintEvaluated: bundleMcpThreadConfig.evaluated, - environmentSelection: startupEnvironmentSelection, - contextEngineProjection, - pluginThreadConfig: pluginThreadConfigRequired - ? { - enabled: true, - inputFingerprint: pluginThreadConfigInputFingerprint, - enabledPluginConfigKeys, - build: () => - buildCodexPluginThreadConfig({ - pluginConfig: pluginThreadConfigPluginConfig, - request: (method, requestParams) => - startupClient.request(method, requestParams, { - timeoutMs: appServer.requestTimeoutMs, - signal: runAbortController.signal, - }), - appCache: defaultCodexAppInventoryCache, - appCacheKey: pluginAppCacheKey, - }), - } - : undefined, - }) satisfies Parameters[0]; - try { - restartContextEngineCodexThread = () => - startOrResumeThread(buildThreadLifecycleParams()); - const startupThread = await startOrResumeThread(buildThreadLifecycleParams()); - if (runAbortController.signal.aborted) { - await releaseStartupSandboxEnvironment(); - throw new Error("codex app-server startup aborted"); - } - startupSandboxEnvironmentAcquired = false; - return { - client: startupClient, - thread: startupThread, - sandboxEnvironment: startupSandboxEnvironment, - environmentSelection: startupEnvironmentSelection, - executionCwd: startupExecutionCwd, - sandboxPolicy: startupSandboxPolicy, + const startupClient = await attemptClientFactory( + appServer.start, + startupAuthProfileId, + agentDir, + params.config, + ); + startupClientLease = () => { + releaseLeasedSharedCodexAppServerClient(startupClient); }; - } catch (error) { - await releaseStartupSandboxEnvironment(); - throw error; + releaseSharedClientLease = startupClientLease; + attemptedClient = startupClient; + startupClientForCleanup = startupClient; + await ensureCodexComputerUse({ + client: startupClient, + pluginConfig, + timeoutMs: appServer.requestTimeoutMs, + signal: runAbortController.signal, + }); + let startupSandboxEnvironment: CodexSandboxExecEnvironment | undefined; + let startupSandboxEnvironmentAcquired = false; + const releaseStartupSandboxEnvironment = async () => { + if (startupSandboxEnvironmentAcquired) { + startupSandboxEnvironmentAcquired = false; + await releaseCodexSandboxExecServerEnvironment(sandbox); + } + }; + releaseStartupResourcesOnTimeout = releaseStartupSandboxEnvironment; + try { + startupSandboxEnvironment = shouldRequireCodexSandboxExecServerEnvironment({ + sandbox, + nativeToolSurfaceEnabled, + sandboxExecServerEnabled, + }) + ? await ensureCodexSandboxExecServerEnvironment({ + client: startupClient, + sandbox: sandbox ?? null, + appServerStartOptions: appServer.start, + timeoutMs: appServer.requestTimeoutMs, + signal: runAbortController.signal, + }) + : undefined; + startupSandboxEnvironmentAcquired = Boolean(startupSandboxEnvironment); + if (runAbortController.signal.aborted) { + await releaseStartupSandboxEnvironment(); + throw new Error("codex app-server startup aborted"); + } + if ( + sandbox?.enabled && + nativeToolSurfaceEnabled && + sandboxExecServerEnabled && + !startupSandboxEnvironment + ) { + throw new Error( + "Codex app-server did not register an OpenClaw sandbox exec-server environment.", + ); + } + } catch (error) { + await releaseStartupSandboxEnvironment(); + throw error; + } + const startupEnvironmentSelection = resolveCodexSandboxEnvironmentSelection( + startupSandboxEnvironment, + nativeToolSurfaceEnabled, + ); + const startupExecutionCwd = resolveCodexAppServerExecutionCwd({ + effectiveWorkspace, + environment: startupSandboxEnvironment, + nativeToolSurfaceEnabled, + }); + const startupSandboxPolicy = startupSandboxEnvironment + ? resolveCodexExternalSandboxPolicyForOpenClawSandbox(sandbox) + : undefined; + const buildThreadLifecycleParams = () => + ({ + client: startupClient, + params: buildActiveRunAttemptParams(), + agentId: sessionAgentId, + cwd: startupExecutionCwd, + dynamicTools: toolBridge.specs, + appServer: pluginAppServer, + developerInstructions: promptBuild.developerInstructions, + config: threadConfig, + finalConfigPatch: nativeHookRelayConfig, + nativeCodeModeEnabled: nativeToolSurfaceEnabled, + nativeCodeModeOnlyEnabled: appServer.codeModeOnly, + userMcpServersEnabled: nativeToolSurfaceEnabled, + mcpServersFingerprint: bundleMcpThreadConfig.fingerprint, + mcpServersFingerprintEvaluated: bundleMcpThreadConfig.evaluated, + environmentSelection: startupEnvironmentSelection, + contextEngineProjection, + pluginThreadConfig: pluginThreadConfigRequired + ? { + enabled: true, + inputFingerprint: pluginThreadConfigInputFingerprint, + enabledPluginConfigKeys, + build: () => + buildCodexPluginThreadConfig({ + pluginConfig: pluginThreadConfigPluginConfig, + request: (method, requestParams) => + startupClient.request(method, requestParams, { + timeoutMs: appServer.requestTimeoutMs, + signal: runAbortController.signal, + }), + appCache: defaultCodexAppInventoryCache, + appCacheKey: pluginAppCacheKey, + }), + } + : undefined, + }) satisfies Parameters[0]; + try { + restartContextEngineCodexThread = () => + startOrResumeThread(buildThreadLifecycleParams()); + const startupThread = await startOrResumeThread(buildThreadLifecycleParams()); + if (runAbortController.signal.aborted) { + await releaseStartupSandboxEnvironment(); + throw new Error("codex app-server startup aborted"); + } + startupSandboxEnvironmentAcquired = false; + startupAttemptSucceeded = true; + return { + client: startupClient, + thread: startupThread, + sandboxEnvironment: startupSandboxEnvironment, + environmentSelection: startupEnvironmentSelection, + executionCwd: startupExecutionCwd, + sandboxPolicy: startupSandboxPolicy, + }; + } catch (error) { + await releaseStartupSandboxEnvironment(); + throw error; + } finally { + if (releaseStartupResourcesOnTimeout === releaseStartupSandboxEnvironment) { + releaseStartupResourcesOnTimeout = undefined; + } + } } finally { - if (releaseStartupResourcesOnTimeout === releaseStartupSandboxEnvironment) { - releaseStartupResourcesOnTimeout = undefined; + if (!startupAttemptSucceeded) { + if (releaseSharedClientLease === startupClientLease) { + releaseSharedClientLease = undefined; + } + startupClientLease?.(); } } }; @@ -3104,6 +3125,8 @@ export async function runCodexAppServerAttempt( }, }); params.abortSignal?.removeEventListener("abort", abortFromUpstream); + releaseSharedClientLease?.(); + releaseSharedClientLease = undefined; if (usageLimitError) { await markCodexAuthProfileBlockedFromRateLimits({ params, @@ -3123,6 +3146,8 @@ export async function runCodexAppServerAttempt( } } if (!turn) { + releaseSharedClientLease?.(); + releaseSharedClientLease = undefined; throw new Error("codex app-server turn/start failed without an error"); } turnId = turn.turn.id; @@ -3229,10 +3254,20 @@ export async function runCodexAppServerAttempt( touchTurnCompletionActivity("turn:start", { attemptProgress: true }); const abortListener = () => { + const shouldRetireClient = timedOut; + if (shouldRetireClient) { + void retireCodexAppServerClientAfterTimedOutTurn(client, { + threadId: thread.threadId, + turnId: activeTurnId, + reason: String(runAbortController.signal.reason ?? "timeout"), + }).finally(() => { + resolveCompletion?.(); + }); + return; + } interruptCodexTurnBestEffort(client, { threadId: thread.threadId, turnId: activeTurnId, - timeoutMs: timedOut ? CODEX_APP_SERVER_INTERRUPT_TIMEOUT_MS : undefined, }); resolveCompletion?.(); }; @@ -3479,6 +3514,7 @@ export async function runCodexAppServerAttempt( notificationCleanup(); requestCleanup(); closeCleanup?.(); + releaseSharedClientLease?.(); if (nativeHookRelay) { if (shouldDelayNativeHookRelayUnregister) { // Codex hook subprocesses can outlive a completed app-server turn by a @@ -4049,6 +4085,51 @@ async function unsubscribeCodexThreadBestEffort( } } +async function retireCodexAppServerClientAfterTimedOutTurn( + client: CodexAppServerClient, + params: { + threadId: string; + turnId: string; + reason: string; + }, +): Promise { + const retiredSharedClient = retireSharedCodexAppServerClientIfCurrent(client); + const detachedSharedClient = Boolean(retiredSharedClient); + interruptCodexTurnBestEffort(client, { + threadId: params.threadId, + turnId: params.turnId, + timeoutMs: CODEX_APP_SERVER_INTERRUPT_TIMEOUT_MS, + }); + await unsubscribeCodexThreadBestEffort(client, { + threadId: params.threadId, + timeoutMs: CODEX_APP_SERVER_UNSUBSCRIBE_TIMEOUT_MS, + }); + let closedClient = retiredSharedClient?.closed ?? false; + if (!detachedSharedClient) { + const close = (client as { close?: () => void }).close; + if (typeof close === "function") { + try { + close.call(client); + closedClient = true; + } catch (error) { + embeddedAgentLog.debug("codex app-server client close failed during timeout cleanup", { + threadId: params.threadId, + turnId: params.turnId, + error, + }); + } + } + } + embeddedAgentLog.warn("codex app-server client retired after timed-out turn", { + threadId: params.threadId, + turnId: params.turnId, + reason: params.reason, + detachedSharedClient, + closedClient, + activeSharedClientLeases: retiredSharedClient?.activeLeases ?? 0, + }); +} + type DynamicToolBuildParams = { params: EmbeddedRunAttemptParams; resolvedWorkspace: string; diff --git a/extensions/codex/src/app-server/shared-client.test.ts b/extensions/codex/src/app-server/shared-client.test.ts index 4c1b86294f7..c8464b38b7b 100644 --- a/extensions/codex/src/app-server/shared-client.test.ts +++ b/extensions/codex/src/app-server/shared-client.test.ts @@ -43,7 +43,12 @@ let clearSharedCodexAppServerClient: typeof import("./shared-client.js").clearSh let clearSharedCodexAppServerClientIfCurrent: typeof import("./shared-client.js").clearSharedCodexAppServerClientIfCurrent; let clearSharedCodexAppServerClientIfCurrentAndWait: typeof import("./shared-client.js").clearSharedCodexAppServerClientIfCurrentAndWait; let createIsolatedCodexAppServerClient: typeof import("./shared-client.js").createIsolatedCodexAppServerClient; +let detachSharedCodexAppServerClientIfCurrent: typeof import("./shared-client.js").detachSharedCodexAppServerClientIfCurrent; +let getLeasedSharedCodexAppServerClient: typeof import("./shared-client.js").getLeasedSharedCodexAppServerClient; let getSharedCodexAppServerClient: typeof import("./shared-client.js").getSharedCodexAppServerClient; +let retainSharedCodexAppServerClientIfCurrent: typeof import("./shared-client.js").retainSharedCodexAppServerClientIfCurrent; +let releaseLeasedSharedCodexAppServerClient: typeof import("./shared-client.js").releaseLeasedSharedCodexAppServerClient; +let retireSharedCodexAppServerClientIfCurrent: typeof import("./shared-client.js").retireSharedCodexAppServerClientIfCurrent; let resetSharedCodexAppServerClientForTests: typeof import("./shared-client.js").resetSharedCodexAppServerClientForTests; async function sendInitializeResult( @@ -116,7 +121,12 @@ describe("shared Codex app-server client", () => { clearSharedCodexAppServerClientIfCurrent, clearSharedCodexAppServerClientIfCurrentAndWait, createIsolatedCodexAppServerClient, + detachSharedCodexAppServerClientIfCurrent, + getLeasedSharedCodexAppServerClient, getSharedCodexAppServerClient, + retainSharedCodexAppServerClientIfCurrent, + releaseLeasedSharedCodexAppServerClient, + retireSharedCodexAppServerClientIfCurrent, resetSharedCodexAppServerClientForTests, } = await import("./shared-client.js")); }); @@ -316,6 +326,39 @@ describe("shared Codex app-server client", () => { expect(startSpy).toHaveBeenCalledTimes(1); }); + it("preserves keyed shared-client state when adding lease metadata", async () => { + const legacy = createClientHarness(); + const startOptions = { + transport: "websocket" as const, + command: "codex", + args: [], + url: "ws://127.0.0.1:39176", + authToken: "tok-keyed", + headers: {}, + }; + const key = codexAppServerStartOptionsKey(startOptions, { + agentDir: "/tmp/openclaw-agent", + }); + const globalState = globalThis as typeof globalThis & { + [key: symbol]: unknown; + }; + globalState[Symbol.for("openclaw.codexAppServerClientState")] = { + clients: new Map([[key, { client: legacy.client, promise: Promise.resolve(legacy.client) }]]), + }; + + await expect(getLeasedSharedCodexAppServerClient({ startOptions })).resolves.toBe( + legacy.client, + ); + expect(retireSharedCodexAppServerClientIfCurrent(legacy.client)).toEqual({ + activeLeases: 1, + closed: false, + }); + expect(legacy.process.stdin.destroyed).toBe(false); + + expect(releaseLeasedSharedCodexAppServerClient(legacy.client)).toBe(true); + expect(legacy.process.stdin.destroyed).toBe(true); + }); + it("keeps an active shared client alive when another agent dir uses a different key", async () => { const first = createClientHarness(); const second = createClientHarness(); @@ -508,6 +551,101 @@ describe("shared Codex app-server client", () => { expect(second.process.stdin.destroyed).toBe(true); }); + it("can detach the current shared client without closing it", async () => { + const first = createClientHarness(); + const second = createClientHarness(); + vi.spyOn(CodexAppServerClient, "start") + .mockReturnValueOnce(first.client) + .mockReturnValueOnce(second.client); + + const firstList = listCodexAppServerModels({ timeoutMs: 1000 }); + await sendInitializeResult(first, "openclaw/0.125.0 (macOS; test)"); + await sendEmptyModelList(first); + await expect(firstList).resolves.toEqual({ models: [] }); + + expect(detachSharedCodexAppServerClientIfCurrent(first.client)).toBe(true); + expect(first.process.stdin.destroyed).toBe(false); + + const secondList = listCodexAppServerModels({ timeoutMs: 1000 }); + await sendInitializeResult(second, "openclaw/0.125.0 (macOS; test)"); + await sendEmptyModelList(second); + await expect(secondList).resolves.toEqual({ models: [] }); + + expect(detachSharedCodexAppServerClientIfCurrent(first.client)).toBe(false); + first.client.close(); + expect(first.process.stdin.destroyed).toBe(true); + expect(second.process.kill).not.toHaveBeenCalled(); + expect(detachSharedCodexAppServerClientIfCurrent(second.client)).toBe(true); + second.client.close(); + expect(second.process.stdin.destroyed).toBe(true); + }); + + it("closes a retired shared app-server after all active leases release", async () => { + const first = createClientHarness(); + const second = createClientHarness(); + vi.spyOn(CodexAppServerClient, "start") + .mockReturnValueOnce(first.client) + .mockReturnValueOnce(second.client); + + const firstList = listCodexAppServerModels({ timeoutMs: 1000 }); + await sendInitializeResult(first, "openclaw/0.125.0 (macOS; test)"); + await sendEmptyModelList(first); + await expect(firstList).resolves.toEqual({ models: [] }); + + const releaseFirst = retainSharedCodexAppServerClientIfCurrent(first.client); + const releaseSecond = retainSharedCodexAppServerClientIfCurrent(first.client); + expect(releaseFirst).toBeTypeOf("function"); + expect(releaseSecond).toBeTypeOf("function"); + expect(retireSharedCodexAppServerClientIfCurrent(first.client)).toEqual({ + activeLeases: 2, + closed: false, + }); + expect(first.process.stdin.destroyed).toBe(false); + + const secondList = listCodexAppServerModels({ timeoutMs: 1000 }); + await sendInitializeResult(second, "openclaw/0.125.0 (macOS; test)"); + await sendEmptyModelList(second); + await expect(secondList).resolves.toEqual({ models: [] }); + + releaseFirst?.(); + expect(first.process.stdin.destroyed).toBe(false); + releaseSecond?.(); + expect(first.process.stdin.destroyed).toBe(true); + expect(second.process.kill).not.toHaveBeenCalled(); + expect(retireSharedCodexAppServerClientIfCurrent(second.client)).toEqual({ + activeLeases: 0, + closed: true, + }); + expect(second.process.stdin.destroyed).toBe(true); + }); + + it("leases shared app-server clients before returning concurrent acquirers", async () => { + const first = createClientHarness(); + vi.spyOn(CodexAppServerClient, "start").mockReturnValueOnce(first.client); + + const firstLease = getLeasedSharedCodexAppServerClient({ timeoutMs: 1000 }); + const secondLease = getLeasedSharedCodexAppServerClient({ timeoutMs: 1000 }); + await sendInitializeResult(first, "openclaw/0.125.0 (macOS; test)"); + await expect(firstLease).resolves.toBe(first.client); + await expect(secondLease).resolves.toBe(first.client); + + expect(retireSharedCodexAppServerClientIfCurrent(first.client)).toEqual({ + activeLeases: 2, + closed: false, + }); + expect(retireSharedCodexAppServerClientIfCurrent(first.client)).toEqual({ + activeLeases: 2, + closed: false, + }); + expect(first.process.stdin.destroyed).toBe(false); + + expect(releaseLeasedSharedCodexAppServerClient(first.client)).toBe(true); + expect(first.process.stdin.destroyed).toBe(false); + expect(releaseLeasedSharedCodexAppServerClient(first.client)).toBe(true); + expect(first.process.stdin.destroyed).toBe(true); + expect(releaseLeasedSharedCodexAppServerClient(first.client)).toBe(false); + }); + it("waits only for the shared client that is still current", async () => { const first = createClientHarness(); const second = createClientHarness(); diff --git a/extensions/codex/src/app-server/shared-client.ts b/extensions/codex/src/app-server/shared-client.ts index 7260492f7c4..85a8c2f4788 100644 --- a/extensions/codex/src/app-server/shared-client.ts +++ b/extensions/codex/src/app-server/shared-client.ts @@ -17,10 +17,13 @@ import { withTimeout } from "./timeout.js"; type SharedCodexAppServerClientEntry = { client?: CodexAppServerClient; promise?: Promise; + activeLeases: number; + closeWhenIdle: boolean; }; type SharedCodexAppServerClientState = { clients: Map; + leasedReleases: WeakMap void>>; }; type LegacySharedCodexAppServerClientState = Partial & { @@ -28,6 +31,11 @@ type LegacySharedCodexAppServerClientState = Partial>; + leasedReleases?: unknown; +}; + const SHARED_CODEX_APP_SERVER_CLIENT_STATE = Symbol.for("openclaw.codexAppServerClientState"); function getSharedCodexAppServerClientState(): SharedCodexAppServerClientState { @@ -35,31 +43,48 @@ function getSharedCodexAppServerClientState(): SharedCodexAppServerClientState { [SHARED_CODEX_APP_SERVER_CLIENT_STATE]?: unknown; }; const state = globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE]; - if (isSharedCodexAppServerClientState(state)) { - return state; + const keyedState = readKeyedSharedCodexAppServerClientState(state); + if (keyedState) { + const clients = keyedState.clients as Map; + for (const entry of clients.values()) { + entry.activeLeases ??= 0; + entry.closeWhenIdle ??= false; + } + const nextState: SharedCodexAppServerClientState = { + clients, + leasedReleases: + keyedState.leasedReleases instanceof WeakMap ? keyedState.leasedReleases : new WeakMap(), + }; + globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE] = nextState; + return nextState; } const legacyState = readLegacySharedCodexAppServerClientState(state); const clients = new Map(); if (legacyState?.key && (legacyState.client || legacyState.promise)) { const legacyKey = legacyState.key; - clients.set(legacyKey, { client: legacyState.client, promise: legacyState.promise }); + clients.set(legacyKey, { + client: legacyState.client, + promise: legacyState.promise, + activeLeases: 0, + closeWhenIdle: false, + }); legacyState.client?.addCloseHandler((closedClient) => clearSharedClientEntryIfCurrent(legacyKey, closedClient), ); } - const nextState: SharedCodexAppServerClientState = { clients }; + const nextState: SharedCodexAppServerClientState = { clients, leasedReleases: new WeakMap() }; globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE] = nextState; return nextState; } -function isSharedCodexAppServerClientState( +function readKeyedSharedCodexAppServerClientState( value: unknown, -): value is SharedCodexAppServerClientState { - return ( - value !== null && +): KeyedSharedCodexAppServerClientState | undefined { + return value !== null && typeof value === "object" && (value as { clients?: unknown }).clients instanceof Map - ); + ? (value as KeyedSharedCodexAppServerClientState) + : undefined; } function readLegacySharedCodexAppServerClientState( @@ -71,13 +96,59 @@ function readLegacySharedCodexAppServerClientState( return value as LegacySharedCodexAppServerClientState; } -export async function getSharedCodexAppServerClient(options?: { +type SharedCodexAppServerClientOptions = { startOptions?: CodexAppServerStartOptions; timeoutMs?: number; authProfileId?: string | null; agentDir?: string; config?: Parameters[0]["config"]; -}): Promise { +}; + +export async function getSharedCodexAppServerClient( + options?: SharedCodexAppServerClientOptions, +): Promise { + return (await acquireSharedCodexAppServerClient(options)).client; +} + +export async function getLeasedSharedCodexAppServerClient( + options?: SharedCodexAppServerClientOptions, +): Promise { + const acquired = await acquireSharedCodexAppServerClient(options, { leased: true }); + const state = getSharedCodexAppServerClientState(); + const releases = state.leasedReleases.get(acquired.client) ?? []; + releases.push(acquired.release); + state.leasedReleases.set(acquired.client, releases); + return acquired.client; +} + +export function releaseLeasedSharedCodexAppServerClient(client: CodexAppServerClient): boolean { + const state = getSharedCodexAppServerClientState(); + const releases = state.leasedReleases.get(client); + if (!releases) { + return false; + } + const release = releases.pop(); + if (!release) { + return false; + } + if (releases.length === 0) { + state.leasedReleases.delete(client); + } + release(); + return true; +} + +async function acquireSharedCodexAppServerClient( + options?: SharedCodexAppServerClientOptions, +): Promise<{ client: CodexAppServerClient }>; +async function acquireSharedCodexAppServerClient( + options: SharedCodexAppServerClientOptions | undefined, + leaseOptions: { leased: true }, +): Promise<{ client: CodexAppServerClient; release: () => void }>; +async function acquireSharedCodexAppServerClient( + options?: SharedCodexAppServerClientOptions, + leaseOptions?: { leased: true }, +): Promise<{ client: CodexAppServerClient; release?: () => void }> { const agentDir = options?.agentDir ?? resolveDefaultAgentDir(options?.config ?? {}); const usesNativeAuth = options?.authProfileId === null; const requestedAuthProfileId = @@ -132,11 +203,13 @@ export async function getSharedCodexAppServerClient(options?: { } })()); try { - return await withTimeout( + const client = await withTimeout( sharedPromise, options?.timeoutMs ?? 0, "codex app-server initialize timed out", ); + const release = leaseOptions?.leased ? retainSharedClientEntry(entry) : undefined; + return release ? { client, release } : { client }; } catch (error) { const currentEntry = state.clients.get(key); if (currentEntry?.promise === sharedPromise) { @@ -223,6 +296,59 @@ export function clearSharedCodexAppServerClientIfCurrent( return false; } +export function detachSharedCodexAppServerClientIfCurrent( + client: CodexAppServerClient | undefined, +): boolean { + if (!client) { + return false; + } + const state = getSharedCodexAppServerClientState(); + for (const [key, entry] of state.clients) { + if (entry.client === client) { + state.clients.delete(key); + return true; + } + } + return false; +} + +export function retainSharedCodexAppServerClientIfCurrent( + client: CodexAppServerClient | undefined, +): (() => void) | undefined { + if (!client) { + return undefined; + } + const state = getSharedCodexAppServerClientState(); + for (const entry of state.clients.values()) { + if (entry.client === client) { + return retainSharedClientEntry(entry); + } + } + return undefined; +} + +export function retireSharedCodexAppServerClientIfCurrent( + client: CodexAppServerClient | undefined, +): { activeLeases: number; closed: boolean } | undefined { + if (!client) { + return undefined; + } + const state = getSharedCodexAppServerClientState(); + for (const [key, entry] of state.clients) { + if (entry.client === client) { + state.clients.delete(key); + entry.closeWhenIdle = true; + const closed = closeRetiredSharedClientEntryIfIdle(entry); + return { activeLeases: entry.activeLeases, closed }; + } + } + const activeLeases = state.leasedReleases.get(client)?.length ?? 0; + if (activeLeases > 0) { + return { activeLeases, closed: false }; + } + return undefined; +} + export async function clearSharedCodexAppServerClientIfCurrentAndWait( client: CodexAppServerClient | undefined, options?: { @@ -260,7 +386,7 @@ function getOrCreateSharedClientEntry( ): SharedCodexAppServerClientEntry { let entry = state.clients.get(key); if (!entry) { - entry = {}; + entry = { activeLeases: 0, closeWhenIdle: false }; state.clients.set(key, entry); } return entry; @@ -283,6 +409,30 @@ function clearSharedClientEntryIfCurrent(key: string, client: CodexAppServerClie } } +function retainSharedClientEntry(entry: SharedCodexAppServerClientEntry): () => void { + let released = false; + entry.activeLeases += 1; + return () => { + if (released) { + return; + } + released = true; + entry.activeLeases = Math.max(0, entry.activeLeases - 1); + closeRetiredSharedClientEntryIfIdle(entry); + }; +} + +function closeRetiredSharedClientEntryIfIdle(entry: SharedCodexAppServerClientEntry): boolean { + if (!entry.closeWhenIdle || entry.activeLeases > 0 || !entry.client) { + return false; + } + const client = entry.client; + entry.closeWhenIdle = false; + entry.client = undefined; + client.close(); + return true; +} + function collectSharedClients(state: SharedCodexAppServerClientState): CodexAppServerClient[] { return [ ...new Set( diff --git a/extensions/codex/src/app-server/side-question.test.ts b/extensions/codex/src/app-server/side-question.test.ts index 86d14be556e..a66a8f28563 100644 --- a/extensions/codex/src/app-server/side-question.test.ts +++ b/extensions/codex/src/app-server/side-question.test.ts @@ -30,6 +30,9 @@ vi.mock("./session-binding.js", () => ({ vi.mock("./shared-client.js", () => ({ getSharedCodexAppServerClient: (...args: unknown[]) => getSharedCodexAppServerClientMock(...args), + getLeasedSharedCodexAppServerClient: (...args: unknown[]) => + getSharedCodexAppServerClientMock(...args), + releaseLeasedSharedCodexAppServerClient: vi.fn(), })); vi.mock("./auth-bridge.js", () => ({ diff --git a/extensions/codex/src/app-server/side-question.ts b/extensions/codex/src/app-server/side-question.ts index 2fb65c41ffe..22b960eed24 100644 --- a/extensions/codex/src/app-server/side-question.ts +++ b/extensions/codex/src/app-server/side-question.ts @@ -66,7 +66,10 @@ import { rememberCodexRateLimits, readRecentCodexRateLimits } from "./rate-limit import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js"; import { resolveCodexNativeExecutionBlock } from "./sandbox-guard.js"; import { readCodexAppServerBinding } from "./session-binding.js"; -import { getSharedCodexAppServerClient } from "./shared-client.js"; +import { + getLeasedSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient, +} from "./shared-client.js"; import { buildCodexRuntimeThreadConfig, CODEX_NATIVE_PERSONALITY_NONE, @@ -145,7 +148,7 @@ export async function runCodexAppServerSideQuestion( const pluginConfig = readCodexPluginConfig(options.pluginConfig); const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig }); const authProfileId = params.authProfileId ?? binding.authProfileId; - const client = await getSharedCodexAppServerClient({ + const client = await getLeasedSharedCodexAppServerClient({ startOptions: appServer.start, timeoutMs: appServer.requestTimeoutMs, authProfileId, @@ -403,6 +406,7 @@ export async function runCodexAppServerSideQuestion( timeoutMs: appServer.requestTimeoutMs, }); } finally { + releaseLeasedSharedCodexAppServerClient(client); nativeHookRelay?.unregister(); } } diff --git a/extensions/codex/src/conversation-binding.test.ts b/extensions/codex/src/conversation-binding.test.ts index d8da7de101d..6246c36f2cb 100644 --- a/extensions/codex/src/conversation-binding.test.ts +++ b/extensions/codex/src/conversation-binding.test.ts @@ -18,7 +18,11 @@ const agentRuntimeMocks = vi.hoisted(() => ({ saveAuthProfileStore: vi.fn(), })); -vi.mock("./app-server/shared-client.js", () => sharedClientMocks); +vi.mock("./app-server/shared-client.js", () => ({ + ...sharedClientMocks, + getLeasedSharedCodexAppServerClient: sharedClientMocks.getSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient: vi.fn(), +})); vi.mock("openclaw/plugin-sdk/agent-runtime", () => agentRuntimeMocks); import { diff --git a/extensions/codex/src/conversation-binding.ts b/extensions/codex/src/conversation-binding.ts index e2616f1d1ad..b7208b64a73 100644 --- a/extensions/codex/src/conversation-binding.ts +++ b/extensions/codex/src/conversation-binding.ts @@ -33,7 +33,10 @@ import { writeCodexAppServerBinding, type CodexAppServerAuthProfileLookup, } from "./app-server/session-binding.js"; -import { getSharedCodexAppServerClient } from "./app-server/shared-client.js"; +import { + getLeasedSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient, +} from "./app-server/shared-client.js"; import { CODEX_NATIVE_PERSONALITY_NONE } from "./app-server/thread-lifecycle.js"; import { formatCodexDisplayText } from "./command-formatters.js"; import { @@ -270,52 +273,56 @@ async function attachExistingThread(params: { modelProvider: params.modelProvider, ...agentLookup, }); - const client = await getSharedCodexAppServerClient({ + const client = await getLeasedSharedCodexAppServerClient({ startOptions: runtime.start, timeoutMs: runtime.requestTimeoutMs, authProfileId: params.authProfileId, ...agentLookup, }); - const response: CodexThreadResumeResponse = await client.request( - CODEX_CONTROL_METHODS.resumeThread, - { - threadId: params.threadId, - ...(params.model ? { model: params.model } : {}), - ...(modelProvider ? { modelProvider } : {}), - personality: CODEX_NATIVE_PERSONALITY_NONE, - approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, - approvalsReviewer: runtime.approvalsReviewer, - sandbox: params.sandbox ?? runtime.sandbox, - ...((params.serviceTier ?? runtime.serviceTier) - ? { serviceTier: params.serviceTier ?? runtime.serviceTier } - : {}), - persistExtendedHistory: true, - }, - { timeoutMs: runtime.requestTimeoutMs }, - ); - const thread = response.thread; - const runtimeApprovalPolicy = - typeof runtime.approvalPolicy === "string" ? runtime.approvalPolicy : undefined; - await writeCodexAppServerBinding( - params.sessionFile, - { - threadId: thread.id, - cwd: thread.cwd ?? params.workspaceDir, - authProfileId: params.authProfileId, - model: response.model ?? params.model, - modelProvider: normalizeCodexAppServerBindingModelProvider({ + try { + const response: CodexThreadResumeResponse = await client.request( + CODEX_CONTROL_METHODS.resumeThread, + { + threadId: params.threadId, + ...(params.model ? { model: params.model } : {}), + ...(modelProvider ? { modelProvider } : {}), + personality: CODEX_NATIVE_PERSONALITY_NONE, + approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, + approvalsReviewer: runtime.approvalsReviewer, + sandbox: params.sandbox ?? runtime.sandbox, + ...((params.serviceTier ?? runtime.serviceTier) + ? { serviceTier: params.serviceTier ?? runtime.serviceTier } + : {}), + persistExtendedHistory: true, + }, + { timeoutMs: runtime.requestTimeoutMs }, + ); + const thread = response.thread; + const runtimeApprovalPolicy = + typeof runtime.approvalPolicy === "string" ? runtime.approvalPolicy : undefined; + await writeCodexAppServerBinding( + params.sessionFile, + { + threadId: thread.id, + cwd: thread.cwd ?? params.workspaceDir, authProfileId: params.authProfileId, - modelProvider: response.modelProvider ?? params.modelProvider, + model: response.model ?? params.model, + modelProvider: normalizeCodexAppServerBindingModelProvider({ + authProfileId: params.authProfileId, + modelProvider: response.modelProvider ?? params.modelProvider, + ...agentLookup, + }), + approvalPolicy: params.approvalPolicy ?? runtimeApprovalPolicy, + sandbox: params.sandbox ?? runtime.sandbox, + serviceTier: params.serviceTier ?? runtime.serviceTier, + }, + { ...agentLookup, - }), - approvalPolicy: params.approvalPolicy ?? runtimeApprovalPolicy, - sandbox: params.sandbox ?? runtime.sandbox, - serviceTier: params.serviceTier ?? runtime.serviceTier, - }, - { - ...agentLookup, - }, - ); + }, + ); + } finally { + releaseLeasedSharedCodexAppServerClient(client); + } } async function createThread(params: { @@ -340,54 +347,58 @@ async function createThread(params: { modelProvider: params.modelProvider, ...agentLookup, }); - const client = await getSharedCodexAppServerClient({ + const client = await getLeasedSharedCodexAppServerClient({ startOptions: runtime.start, timeoutMs: runtime.requestTimeoutMs, authProfileId: params.authProfileId, ...agentLookup, }); - const response: CodexThreadStartResponse = await client.request( - "thread/start", - { - cwd: params.workspaceDir, - ...(params.model ? { model: params.model } : {}), - ...(modelProvider ? { modelProvider } : {}), - personality: CODEX_NATIVE_PERSONALITY_NONE, - approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, - approvalsReviewer: runtime.approvalsReviewer, - sandbox: params.sandbox ?? runtime.sandbox, - ...((params.serviceTier ?? runtime.serviceTier) - ? { serviceTier: params.serviceTier ?? runtime.serviceTier } - : {}), - developerInstructions: - "This Codex thread is bound to an OpenClaw conversation. Answer normally; OpenClaw will deliver your final response back to the conversation.", - experimentalRawEvents: true, - persistExtendedHistory: true, - }, - { timeoutMs: runtime.requestTimeoutMs }, - ); - const runtimeApprovalPolicy = - typeof runtime.approvalPolicy === "string" ? runtime.approvalPolicy : undefined; - await writeCodexAppServerBinding( - params.sessionFile, - { - threadId: response.thread.id, - cwd: response.thread.cwd ?? params.workspaceDir, - authProfileId: params.authProfileId, - model: response.model ?? params.model, - modelProvider: normalizeCodexAppServerBindingModelProvider({ + try { + const response: CodexThreadStartResponse = await client.request( + "thread/start", + { + cwd: params.workspaceDir, + ...(params.model ? { model: params.model } : {}), + ...(modelProvider ? { modelProvider } : {}), + personality: CODEX_NATIVE_PERSONALITY_NONE, + approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, + approvalsReviewer: runtime.approvalsReviewer, + sandbox: params.sandbox ?? runtime.sandbox, + ...((params.serviceTier ?? runtime.serviceTier) + ? { serviceTier: params.serviceTier ?? runtime.serviceTier } + : {}), + developerInstructions: + "This Codex thread is bound to an OpenClaw conversation. Answer normally; OpenClaw will deliver your final response back to the conversation.", + experimentalRawEvents: true, + persistExtendedHistory: true, + }, + { timeoutMs: runtime.requestTimeoutMs }, + ); + const runtimeApprovalPolicy = + typeof runtime.approvalPolicy === "string" ? runtime.approvalPolicy : undefined; + await writeCodexAppServerBinding( + params.sessionFile, + { + threadId: response.thread.id, + cwd: response.thread.cwd ?? params.workspaceDir, authProfileId: params.authProfileId, - modelProvider: response.modelProvider ?? params.modelProvider, + model: response.model ?? params.model, + modelProvider: normalizeCodexAppServerBindingModelProvider({ + authProfileId: params.authProfileId, + modelProvider: response.modelProvider ?? params.modelProvider, + ...agentLookup, + }), + approvalPolicy: params.approvalPolicy ?? runtimeApprovalPolicy, + sandbox: params.sandbox ?? runtime.sandbox, + serviceTier: params.serviceTier ?? runtime.serviceTier, + }, + { ...agentLookup, - }), - approvalPolicy: params.approvalPolicy ?? runtimeApprovalPolicy, - sandbox: params.sandbox ?? runtime.sandbox, - serviceTier: params.serviceTier ?? runtime.serviceTier, - }, - { - ...agentLookup, - }, - ); + }, + ); + } finally { + releaseLeasedSharedCodexAppServerClient(client); + } } async function runBoundTurn(params: { @@ -407,7 +418,7 @@ async function runBoundTurn(params: { throw new Error("bound Codex conversation has no thread binding"); } - const client = await getSharedCodexAppServerClient({ + const client = await getLeasedSharedCodexAppServerClient({ startOptions: runtime.start, timeoutMs: runtime.requestTimeoutMs, authProfileId: binding.authProfileId, @@ -498,6 +509,7 @@ async function runBoundTurn(params: { } finally { notificationCleanup(); requestCleanup(); + releaseLeasedSharedCodexAppServerClient(client); } } diff --git a/extensions/codex/src/conversation-control.test.ts b/extensions/codex/src/conversation-control.test.ts index 2a377c682f2..902bbe85619 100644 --- a/extensions/codex/src/conversation-control.test.ts +++ b/extensions/codex/src/conversation-control.test.ts @@ -20,7 +20,11 @@ const sharedClientMocks = vi.hoisted(() => ({ getSharedCodexAppServerClient: vi.fn(), })); -vi.mock("./app-server/shared-client.js", () => sharedClientMocks); +vi.mock("./app-server/shared-client.js", () => ({ + ...sharedClientMocks, + getLeasedSharedCodexAppServerClient: sharedClientMocks.getSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient: vi.fn(), +})); describe("codex conversation controls", () => { beforeEach(async () => { diff --git a/extensions/codex/src/conversation-control.ts b/extensions/codex/src/conversation-control.ts index 0e309f1d8fc..126a0beae5a 100644 --- a/extensions/codex/src/conversation-control.ts +++ b/extensions/codex/src/conversation-control.ts @@ -10,7 +10,10 @@ import { readCodexAppServerBinding, writeCodexAppServerBinding, } from "./app-server/session-binding.js"; -import { getSharedCodexAppServerClient } from "./app-server/shared-client.js"; +import { + getLeasedSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient, +} from "./app-server/shared-client.js"; import { formatCodexDisplayText } from "./command-formatters.js"; type ActiveTurn = { @@ -61,20 +64,24 @@ export async function stopCodexConversationTurn(params: { const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig }); const lookup = buildBindingLookup(params); const binding = await readCodexAppServerBinding(params.sessionFile, lookup); - const client = await getSharedCodexAppServerClient({ + const client = await getLeasedSharedCodexAppServerClient({ startOptions: runtime.start, timeoutMs: runtime.requestTimeoutMs, authProfileId: binding?.authProfileId, ...lookup, }); - await client.request( - "turn/interrupt", - { - threadId: active.threadId, - turnId: active.turnId, - }, - { timeoutMs: runtime.requestTimeoutMs }, - ); + try { + await client.request( + "turn/interrupt", + { + threadId: active.threadId, + turnId: active.turnId, + }, + { timeoutMs: runtime.requestTimeoutMs }, + ); + } finally { + releaseLeasedSharedCodexAppServerClient(client); + } return { stopped: true, message: "Codex stop requested." }; } @@ -96,21 +103,25 @@ export async function steerCodexConversationTurn(params: { const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig }); const lookup = buildBindingLookup(params); const binding = await readCodexAppServerBinding(params.sessionFile, lookup); - const client = await getSharedCodexAppServerClient({ + const client = await getLeasedSharedCodexAppServerClient({ startOptions: runtime.start, timeoutMs: runtime.requestTimeoutMs, authProfileId: binding?.authProfileId, ...lookup, }); - await client.request( - "turn/steer", - { - threadId: active.threadId, - expectedTurnId: active.turnId, - input: [{ type: "text", text, text_elements: [] }], - }, - { timeoutMs: runtime.requestTimeoutMs }, - ); + try { + await client.request( + "turn/steer", + { + threadId: active.threadId, + expectedTurnId: active.turnId, + input: [{ type: "text", text, text_elements: [] }], + }, + { timeoutMs: runtime.requestTimeoutMs }, + ); + } finally { + releaseLeasedSharedCodexAppServerClient(client); + } return { steered: true, message: "Sent steer message to Codex." }; } @@ -261,25 +272,29 @@ async function resumeThreadWithOverrides(params: { serviceTier?: CodexServiceTier; }): Promise { const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig }); - const client = await getSharedCodexAppServerClient({ + const client = await getLeasedSharedCodexAppServerClient({ startOptions: runtime.start, timeoutMs: runtime.requestTimeoutMs, authProfileId: params.authProfileId, ...buildBindingLookup(params), }); - return await client.request( - CODEX_CONTROL_METHODS.resumeThread, - { - threadId: params.threadId, - ...(params.model ? { model: params.model } : {}), - approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, - sandbox: params.sandbox ?? runtime.sandbox, - approvalsReviewer: runtime.approvalsReviewer, - ...(params.serviceTier ? { serviceTier: params.serviceTier } : {}), - persistExtendedHistory: true, - }, - { timeoutMs: runtime.requestTimeoutMs }, - ); + try { + return await client.request( + CODEX_CONTROL_METHODS.resumeThread, + { + threadId: params.threadId, + ...(params.model ? { model: params.model } : {}), + approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, + sandbox: params.sandbox ?? runtime.sandbox, + approvalsReviewer: runtime.approvalsReviewer, + ...(params.serviceTier ? { serviceTier: params.serviceTier } : {}), + persistExtendedHistory: true, + }, + { timeoutMs: runtime.requestTimeoutMs }, + ); + } finally { + releaseLeasedSharedCodexAppServerClient(client); + } } function buildBindingLookup(params: { diff --git a/extensions/codex/src/migration/apply.ts b/extensions/codex/src/migration/apply.ts index 4f05ca3cf93..a2f283db476 100644 --- a/extensions/codex/src/migration/apply.ts +++ b/extensions/codex/src/migration/apply.ts @@ -42,7 +42,8 @@ import type { v2 } from "../app-server/protocol.js"; import { requestCodexAppServerJson } from "../app-server/request.js"; import { clearSharedCodexAppServerClientIfCurrentAndWait, - getSharedCodexAppServerClient, + getLeasedSharedCodexAppServerClient, + releaseLeasedSharedCodexAppServerClient, } from "../app-server/shared-client.js"; import { applyCodexAuthItem, buildCodexAuthConfigPatchItems } from "./auth.js"; import { buildCodexMigrationPlan } from "./plan.js"; @@ -86,8 +87,8 @@ export function prepareTargetCodexAppServer( ): CodexMigrationTargetAppServerPreparation { const appServer = resolveTargetCodexAppServer(ctx); const targets = resolveCodexMigrationTargets(ctx); - let warmedClient: Awaited> | undefined; - const ready = getSharedCodexAppServerClient({ + let warmedClient: Awaited> | undefined; + const ready = getLeasedSharedCodexAppServerClient({ startOptions: appServer.start, timeoutMs: 60_000, agentDir: targets.agentDir, @@ -101,6 +102,9 @@ export function prepareTargetCodexAppServer( return { async dispose() { await ready; + if (warmedClient) { + releaseLeasedSharedCodexAppServerClient(warmedClient); + } await clearSharedCodexAppServerClientIfCurrentAndWait(warmedClient, { exitTimeoutMs: 2_000, forceKillDelayMs: 250, diff --git a/src/agents/pi-embedded-runner/run/failover-policy.test.ts b/src/agents/pi-embedded-runner/run/failover-policy.test.ts index 9ec967e407e..42361ec692c 100644 --- a/src/agents/pi-embedded-runner/run/failover-policy.test.ts +++ b/src/agents/pi-embedded-runner/run/failover-policy.test.ts @@ -319,6 +319,27 @@ describe("resolveRunFailoverDecision", () => { }); }); + it("does not rotate harness-owned assistant errors classified as timeout", () => { + expect( + resolveRunFailoverDecision({ + stage: "assistant", + aborted: false, + externalAbort: false, + fallbackConfigured: true, + failoverFailure: true, + failoverReason: "timeout", + timedOut: false, + idleTimedOut: false, + timedOutDuringCompaction: false, + timedOutDuringToolExecution: false, + harnessOwnsTransport: true, + profileRotated: false, + }), + ).toEqual({ + action: "continue_normal", + }); + }); + it("rotates concrete assistant failover failures that accompany harness-owned timeouts", () => { expect( resolveRunFailoverDecision({ diff --git a/src/agents/pi-embedded-runner/run/failover-policy.ts b/src/agents/pi-embedded-runner/run/failover-policy.ts index 56baf0177f7..2159947f117 100644 --- a/src/agents/pi-embedded-runner/run/failover-policy.ts +++ b/src/agents/pi-embedded-runner/run/failover-policy.ts @@ -116,11 +116,9 @@ function shouldRotateAssistant(params: AssistantDecisionParams): boolean { return false; } const timeoutFailure = isAssistantTimeoutFailure(params); - if ( - timeoutFailure && - params.harnessOwnsTransport && - !isConcreteNonTimeoutAssistantFailure(params) - ) { + const harnessOwnedTimeout = + params.harnessOwnsTransport && (timeoutFailure || params.failoverReason === "timeout"); + if (harnessOwnedTimeout && !isConcreteNonTimeoutAssistantFailure(params)) { return false; } return (!params.aborted && params.failoverFailure) || timeoutFailure;