mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 04:46:16 +00:00
fix(codex): bound app-server timeout fallout
Retire timed-out Codex app-server clients with lease-aware cleanup and keep harness-owned timeouts out of provider fallback.
This commit is contained in:
committed by
GitHub
parent
9fc71e9076
commit
5a7d5c6def
@@ -22,3 +22,13 @@ export const defaultCodexAppServerClientFactory: CodexAppServerClientFactory = (
|
||||
import("./shared-client.js").then(({ getSharedCodexAppServerClient }) =>
|
||||
getSharedCodexAppServerClient({ startOptions, authProfileId, agentDir, config }),
|
||||
);
|
||||
|
||||
export const defaultLeasedCodexAppServerClientFactory: CodexAppServerClientFactory = (
|
||||
startOptions,
|
||||
authProfileId,
|
||||
agentDir,
|
||||
config,
|
||||
) =>
|
||||
import("./shared-client.js").then(({ getLeasedSharedCodexAppServerClient }) =>
|
||||
getLeasedSharedCodexAppServerClient({ startOptions, authProfileId, agentDir, config }),
|
||||
);
|
||||
|
||||
@@ -74,10 +74,13 @@ async function withCodexAppServerModelClient<T>(
|
||||
): Promise<T> {
|
||||
const timeoutMs = options.timeoutMs ?? 2500;
|
||||
const useSharedClient = options.sharedClient !== false;
|
||||
const { createIsolatedCodexAppServerClient, getSharedCodexAppServerClient } =
|
||||
await import("./shared-client.js");
|
||||
const {
|
||||
createIsolatedCodexAppServerClient,
|
||||
getLeasedSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient,
|
||||
} = await import("./shared-client.js");
|
||||
const client = useSharedClient
|
||||
? await getSharedCodexAppServerClient({
|
||||
? await getLeasedSharedCodexAppServerClient({
|
||||
startOptions: options.startOptions,
|
||||
timeoutMs,
|
||||
authProfileId: options.authProfileId,
|
||||
@@ -94,7 +97,9 @@ async function withCodexAppServerModelClient<T>(
|
||||
try {
|
||||
return await run({ client, timeoutMs });
|
||||
} finally {
|
||||
if (!useSharedClient) {
|
||||
if (useSharedClient) {
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
} else {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,11 @@ const sharedClientMocks = vi.hoisted(() => ({
|
||||
getSharedCodexAppServerClient: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./shared-client.js", () => sharedClientMocks);
|
||||
vi.mock("./shared-client.js", () => ({
|
||||
...sharedClientMocks,
|
||||
getLeasedSharedCodexAppServerClient: sharedClientMocks.getSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient: vi.fn(),
|
||||
}));
|
||||
|
||||
const { requestCodexAppServerJson } = await import("./request.js");
|
||||
|
||||
|
||||
@@ -9,7 +9,8 @@ import type {
|
||||
import { resolveCodexAppServerDirectSandboxBypassBlock } from "./sandbox-guard.js";
|
||||
import {
|
||||
createIsolatedCodexAppServerClient,
|
||||
getSharedCodexAppServerClient,
|
||||
getLeasedSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient,
|
||||
} from "./shared-client.js";
|
||||
import { withTimeout } from "./timeout.js";
|
||||
|
||||
@@ -63,7 +64,7 @@ export async function requestCodexAppServerJson<T = JsonValue | undefined>(param
|
||||
return await withTimeout(
|
||||
(async () => {
|
||||
const client = await (
|
||||
params.isolated ? createIsolatedCodexAppServerClient : getSharedCodexAppServerClient
|
||||
params.isolated ? createIsolatedCodexAppServerClient : getLeasedSharedCodexAppServerClient
|
||||
)({
|
||||
startOptions: params.startOptions,
|
||||
timeoutMs,
|
||||
@@ -81,6 +82,8 @@ export async function requestCodexAppServerJson<T = JsonValue | undefined>(param
|
||||
// underlying codex binary, so the unref'd close() path can leave
|
||||
// the child running and keep the parent's event loop alive.
|
||||
await client.closeAndWait({ exitTimeoutMs: 2_000, forceKillDelayMs: 250 });
|
||||
} else {
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
}
|
||||
}
|
||||
})(),
|
||||
|
||||
@@ -4160,7 +4160,7 @@ describe("runCodexAppServerAttempt", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("interrupts but keeps the app-server client alive when a turn timeout fires", async () => {
|
||||
it("unsubscribes and closes the app-server client when the active turn goes idle past the attempt timeout", async () => {
|
||||
const close = vi.fn();
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "thread/start") {
|
||||
@@ -4204,7 +4204,14 @@ describe("runCodexAppServerAttempt", () => {
|
||||
},
|
||||
{ timeoutMs: 5_000 },
|
||||
);
|
||||
expect(close).not.toHaveBeenCalled();
|
||||
expect(request).toHaveBeenCalledWith(
|
||||
"thread/unsubscribe",
|
||||
{
|
||||
threadId: "thread-1",
|
||||
},
|
||||
{ timeoutMs: 5_000 },
|
||||
);
|
||||
expect(close).toHaveBeenCalledTimes(1);
|
||||
expect(queueActiveRunMessageForTest("session-1", "after timeout")).toBe(false);
|
||||
});
|
||||
|
||||
@@ -5562,9 +5569,13 @@ describe("runCodexAppServerAttempt", () => {
|
||||
),
|
||||
{ interval: 1 },
|
||||
);
|
||||
expect(warn).not.toHaveBeenCalledWith(
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
"codex app-server client retired after timed-out turn",
|
||||
expect.anything(),
|
||||
expect.objectContaining({
|
||||
reason: "turn_completion_idle_timeout",
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ import {
|
||||
} from "./auth-bridge.js";
|
||||
import { CODEX_CONTROL_METHODS } from "./capabilities.js";
|
||||
import {
|
||||
defaultCodexAppServerClientFactory,
|
||||
defaultLeasedCodexAppServerClientFactory,
|
||||
type CodexAppServerClientFactory,
|
||||
} from "./client-factory.js";
|
||||
import {
|
||||
@@ -170,7 +170,11 @@ import {
|
||||
type CodexAppServerThreadBinding,
|
||||
} from "./session-binding.js";
|
||||
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
|
||||
import { clearSharedCodexAppServerClientIfCurrent } from "./shared-client.js";
|
||||
import {
|
||||
clearSharedCodexAppServerClientIfCurrent,
|
||||
releaseLeasedSharedCodexAppServerClient,
|
||||
retireSharedCodexAppServerClientIfCurrent,
|
||||
} from "./shared-client.js";
|
||||
import {
|
||||
areCodexDynamicToolFingerprintsCompatible,
|
||||
buildDeveloperInstructions,
|
||||
@@ -1054,7 +1058,7 @@ export async function runCodexAppServerAttempt(
|
||||
const preDynamicStartupStages = createCodexDynamicToolBuildStageTracker({
|
||||
enabled: profilerEnabled,
|
||||
});
|
||||
const attemptClientFactory = options.clientFactory ?? defaultCodexAppServerClientFactory;
|
||||
const attemptClientFactory = options.clientFactory ?? defaultLeasedCodexAppServerClientFactory;
|
||||
const pluginConfig = readCodexPluginConfig(options.pluginConfig);
|
||||
const computerUseConfig = resolveCodexComputerUseConfig({ pluginConfig });
|
||||
const configuredAppServer = resolveCodexAppServerRuntimeOptions({ pluginConfig });
|
||||
@@ -1492,6 +1496,7 @@ export async function runCodexAppServerAttempt(
|
||||
let thread: CodexAppServerThreadLifecycleBinding;
|
||||
let trajectoryEndRecorded = false;
|
||||
let nativeHookRelay: NativeHookRelayRegistrationHandle | undefined;
|
||||
let releaseSharedClientLease: (() => void) | undefined;
|
||||
let startupClientForCleanup: CodexAppServerClient | undefined;
|
||||
let sandboxExecEnvironmentAcquired = false;
|
||||
const releaseSandboxExecEnvironment = async () => {
|
||||
@@ -1605,134 +1610,150 @@ export async function runCodexAppServerAttempt(
|
||||
operation: async () => {
|
||||
let attemptedClient: CodexAppServerClient | undefined;
|
||||
const startupAttempt = async () => {
|
||||
const startupClient = await attemptClientFactory(
|
||||
appServer.start,
|
||||
startupAuthProfileId,
|
||||
agentDir,
|
||||
params.config,
|
||||
);
|
||||
attemptedClient = startupClient;
|
||||
startupClientForCleanup = startupClient;
|
||||
await ensureCodexComputerUse({
|
||||
client: startupClient,
|
||||
pluginConfig,
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
});
|
||||
let startupSandboxEnvironment: CodexSandboxExecEnvironment | undefined;
|
||||
let startupSandboxEnvironmentAcquired = false;
|
||||
const releaseStartupSandboxEnvironment = async () => {
|
||||
if (startupSandboxEnvironmentAcquired) {
|
||||
startupSandboxEnvironmentAcquired = false;
|
||||
await releaseCodexSandboxExecServerEnvironment(sandbox);
|
||||
}
|
||||
};
|
||||
releaseStartupResourcesOnTimeout = releaseStartupSandboxEnvironment;
|
||||
let startupClientLease: (() => void) | undefined;
|
||||
let startupAttemptSucceeded = false;
|
||||
try {
|
||||
startupSandboxEnvironment = shouldRequireCodexSandboxExecServerEnvironment({
|
||||
sandbox,
|
||||
nativeToolSurfaceEnabled,
|
||||
sandboxExecServerEnabled,
|
||||
})
|
||||
? await ensureCodexSandboxExecServerEnvironment({
|
||||
client: startupClient,
|
||||
sandbox: sandbox ?? null,
|
||||
appServerStartOptions: appServer.start,
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
})
|
||||
: undefined;
|
||||
startupSandboxEnvironmentAcquired = Boolean(startupSandboxEnvironment);
|
||||
if (runAbortController.signal.aborted) {
|
||||
await releaseStartupSandboxEnvironment();
|
||||
throw new Error("codex app-server startup aborted");
|
||||
}
|
||||
if (
|
||||
sandbox?.enabled &&
|
||||
nativeToolSurfaceEnabled &&
|
||||
sandboxExecServerEnabled &&
|
||||
!startupSandboxEnvironment
|
||||
) {
|
||||
throw new Error(
|
||||
"Codex app-server did not register an OpenClaw sandbox exec-server environment.",
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
await releaseStartupSandboxEnvironment();
|
||||
throw error;
|
||||
}
|
||||
const startupEnvironmentSelection = resolveCodexSandboxEnvironmentSelection(
|
||||
startupSandboxEnvironment,
|
||||
nativeToolSurfaceEnabled,
|
||||
);
|
||||
const startupExecutionCwd = resolveCodexAppServerExecutionCwd({
|
||||
effectiveWorkspace,
|
||||
environment: startupSandboxEnvironment,
|
||||
nativeToolSurfaceEnabled,
|
||||
});
|
||||
const startupSandboxPolicy = startupSandboxEnvironment
|
||||
? resolveCodexExternalSandboxPolicyForOpenClawSandbox(sandbox)
|
||||
: undefined;
|
||||
const buildThreadLifecycleParams = () =>
|
||||
({
|
||||
client: startupClient,
|
||||
params: buildActiveRunAttemptParams(),
|
||||
agentId: sessionAgentId,
|
||||
cwd: startupExecutionCwd,
|
||||
dynamicTools: toolBridge.specs,
|
||||
appServer: pluginAppServer,
|
||||
developerInstructions: promptBuild.developerInstructions,
|
||||
config: threadConfig,
|
||||
finalConfigPatch: nativeHookRelayConfig,
|
||||
nativeCodeModeEnabled: nativeToolSurfaceEnabled,
|
||||
nativeCodeModeOnlyEnabled: appServer.codeModeOnly,
|
||||
userMcpServersEnabled: nativeToolSurfaceEnabled,
|
||||
mcpServersFingerprint: bundleMcpThreadConfig.fingerprint,
|
||||
mcpServersFingerprintEvaluated: bundleMcpThreadConfig.evaluated,
|
||||
environmentSelection: startupEnvironmentSelection,
|
||||
contextEngineProjection,
|
||||
pluginThreadConfig: pluginThreadConfigRequired
|
||||
? {
|
||||
enabled: true,
|
||||
inputFingerprint: pluginThreadConfigInputFingerprint,
|
||||
enabledPluginConfigKeys,
|
||||
build: () =>
|
||||
buildCodexPluginThreadConfig({
|
||||
pluginConfig: pluginThreadConfigPluginConfig,
|
||||
request: (method, requestParams) =>
|
||||
startupClient.request(method, requestParams, {
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
}),
|
||||
appCache: defaultCodexAppInventoryCache,
|
||||
appCacheKey: pluginAppCacheKey,
|
||||
}),
|
||||
}
|
||||
: undefined,
|
||||
}) satisfies Parameters<typeof startOrResumeThread>[0];
|
||||
try {
|
||||
restartContextEngineCodexThread = () =>
|
||||
startOrResumeThread(buildThreadLifecycleParams());
|
||||
const startupThread = await startOrResumeThread(buildThreadLifecycleParams());
|
||||
if (runAbortController.signal.aborted) {
|
||||
await releaseStartupSandboxEnvironment();
|
||||
throw new Error("codex app-server startup aborted");
|
||||
}
|
||||
startupSandboxEnvironmentAcquired = false;
|
||||
return {
|
||||
client: startupClient,
|
||||
thread: startupThread,
|
||||
sandboxEnvironment: startupSandboxEnvironment,
|
||||
environmentSelection: startupEnvironmentSelection,
|
||||
executionCwd: startupExecutionCwd,
|
||||
sandboxPolicy: startupSandboxPolicy,
|
||||
const startupClient = await attemptClientFactory(
|
||||
appServer.start,
|
||||
startupAuthProfileId,
|
||||
agentDir,
|
||||
params.config,
|
||||
);
|
||||
startupClientLease = () => {
|
||||
releaseLeasedSharedCodexAppServerClient(startupClient);
|
||||
};
|
||||
} catch (error) {
|
||||
await releaseStartupSandboxEnvironment();
|
||||
throw error;
|
||||
releaseSharedClientLease = startupClientLease;
|
||||
attemptedClient = startupClient;
|
||||
startupClientForCleanup = startupClient;
|
||||
await ensureCodexComputerUse({
|
||||
client: startupClient,
|
||||
pluginConfig,
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
});
|
||||
let startupSandboxEnvironment: CodexSandboxExecEnvironment | undefined;
|
||||
let startupSandboxEnvironmentAcquired = false;
|
||||
const releaseStartupSandboxEnvironment = async () => {
|
||||
if (startupSandboxEnvironmentAcquired) {
|
||||
startupSandboxEnvironmentAcquired = false;
|
||||
await releaseCodexSandboxExecServerEnvironment(sandbox);
|
||||
}
|
||||
};
|
||||
releaseStartupResourcesOnTimeout = releaseStartupSandboxEnvironment;
|
||||
try {
|
||||
startupSandboxEnvironment = shouldRequireCodexSandboxExecServerEnvironment({
|
||||
sandbox,
|
||||
nativeToolSurfaceEnabled,
|
||||
sandboxExecServerEnabled,
|
||||
})
|
||||
? await ensureCodexSandboxExecServerEnvironment({
|
||||
client: startupClient,
|
||||
sandbox: sandbox ?? null,
|
||||
appServerStartOptions: appServer.start,
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
})
|
||||
: undefined;
|
||||
startupSandboxEnvironmentAcquired = Boolean(startupSandboxEnvironment);
|
||||
if (runAbortController.signal.aborted) {
|
||||
await releaseStartupSandboxEnvironment();
|
||||
throw new Error("codex app-server startup aborted");
|
||||
}
|
||||
if (
|
||||
sandbox?.enabled &&
|
||||
nativeToolSurfaceEnabled &&
|
||||
sandboxExecServerEnabled &&
|
||||
!startupSandboxEnvironment
|
||||
) {
|
||||
throw new Error(
|
||||
"Codex app-server did not register an OpenClaw sandbox exec-server environment.",
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
await releaseStartupSandboxEnvironment();
|
||||
throw error;
|
||||
}
|
||||
const startupEnvironmentSelection = resolveCodexSandboxEnvironmentSelection(
|
||||
startupSandboxEnvironment,
|
||||
nativeToolSurfaceEnabled,
|
||||
);
|
||||
const startupExecutionCwd = resolveCodexAppServerExecutionCwd({
|
||||
effectiveWorkspace,
|
||||
environment: startupSandboxEnvironment,
|
||||
nativeToolSurfaceEnabled,
|
||||
});
|
||||
const startupSandboxPolicy = startupSandboxEnvironment
|
||||
? resolveCodexExternalSandboxPolicyForOpenClawSandbox(sandbox)
|
||||
: undefined;
|
||||
const buildThreadLifecycleParams = () =>
|
||||
({
|
||||
client: startupClient,
|
||||
params: buildActiveRunAttemptParams(),
|
||||
agentId: sessionAgentId,
|
||||
cwd: startupExecutionCwd,
|
||||
dynamicTools: toolBridge.specs,
|
||||
appServer: pluginAppServer,
|
||||
developerInstructions: promptBuild.developerInstructions,
|
||||
config: threadConfig,
|
||||
finalConfigPatch: nativeHookRelayConfig,
|
||||
nativeCodeModeEnabled: nativeToolSurfaceEnabled,
|
||||
nativeCodeModeOnlyEnabled: appServer.codeModeOnly,
|
||||
userMcpServersEnabled: nativeToolSurfaceEnabled,
|
||||
mcpServersFingerprint: bundleMcpThreadConfig.fingerprint,
|
||||
mcpServersFingerprintEvaluated: bundleMcpThreadConfig.evaluated,
|
||||
environmentSelection: startupEnvironmentSelection,
|
||||
contextEngineProjection,
|
||||
pluginThreadConfig: pluginThreadConfigRequired
|
||||
? {
|
||||
enabled: true,
|
||||
inputFingerprint: pluginThreadConfigInputFingerprint,
|
||||
enabledPluginConfigKeys,
|
||||
build: () =>
|
||||
buildCodexPluginThreadConfig({
|
||||
pluginConfig: pluginThreadConfigPluginConfig,
|
||||
request: (method, requestParams) =>
|
||||
startupClient.request(method, requestParams, {
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
}),
|
||||
appCache: defaultCodexAppInventoryCache,
|
||||
appCacheKey: pluginAppCacheKey,
|
||||
}),
|
||||
}
|
||||
: undefined,
|
||||
}) satisfies Parameters<typeof startOrResumeThread>[0];
|
||||
try {
|
||||
restartContextEngineCodexThread = () =>
|
||||
startOrResumeThread(buildThreadLifecycleParams());
|
||||
const startupThread = await startOrResumeThread(buildThreadLifecycleParams());
|
||||
if (runAbortController.signal.aborted) {
|
||||
await releaseStartupSandboxEnvironment();
|
||||
throw new Error("codex app-server startup aborted");
|
||||
}
|
||||
startupSandboxEnvironmentAcquired = false;
|
||||
startupAttemptSucceeded = true;
|
||||
return {
|
||||
client: startupClient,
|
||||
thread: startupThread,
|
||||
sandboxEnvironment: startupSandboxEnvironment,
|
||||
environmentSelection: startupEnvironmentSelection,
|
||||
executionCwd: startupExecutionCwd,
|
||||
sandboxPolicy: startupSandboxPolicy,
|
||||
};
|
||||
} catch (error) {
|
||||
await releaseStartupSandboxEnvironment();
|
||||
throw error;
|
||||
} finally {
|
||||
if (releaseStartupResourcesOnTimeout === releaseStartupSandboxEnvironment) {
|
||||
releaseStartupResourcesOnTimeout = undefined;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (releaseStartupResourcesOnTimeout === releaseStartupSandboxEnvironment) {
|
||||
releaseStartupResourcesOnTimeout = undefined;
|
||||
if (!startupAttemptSucceeded) {
|
||||
if (releaseSharedClientLease === startupClientLease) {
|
||||
releaseSharedClientLease = undefined;
|
||||
}
|
||||
startupClientLease?.();
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -3104,6 +3125,8 @@ export async function runCodexAppServerAttempt(
|
||||
},
|
||||
});
|
||||
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
|
||||
releaseSharedClientLease?.();
|
||||
releaseSharedClientLease = undefined;
|
||||
if (usageLimitError) {
|
||||
await markCodexAuthProfileBlockedFromRateLimits({
|
||||
params,
|
||||
@@ -3123,6 +3146,8 @@ export async function runCodexAppServerAttempt(
|
||||
}
|
||||
}
|
||||
if (!turn) {
|
||||
releaseSharedClientLease?.();
|
||||
releaseSharedClientLease = undefined;
|
||||
throw new Error("codex app-server turn/start failed without an error");
|
||||
}
|
||||
turnId = turn.turn.id;
|
||||
@@ -3229,10 +3254,20 @@ export async function runCodexAppServerAttempt(
|
||||
touchTurnCompletionActivity("turn:start", { attemptProgress: true });
|
||||
|
||||
const abortListener = () => {
|
||||
const shouldRetireClient = timedOut;
|
||||
if (shouldRetireClient) {
|
||||
void retireCodexAppServerClientAfterTimedOutTurn(client, {
|
||||
threadId: thread.threadId,
|
||||
turnId: activeTurnId,
|
||||
reason: String(runAbortController.signal.reason ?? "timeout"),
|
||||
}).finally(() => {
|
||||
resolveCompletion?.();
|
||||
});
|
||||
return;
|
||||
}
|
||||
interruptCodexTurnBestEffort(client, {
|
||||
threadId: thread.threadId,
|
||||
turnId: activeTurnId,
|
||||
timeoutMs: timedOut ? CODEX_APP_SERVER_INTERRUPT_TIMEOUT_MS : undefined,
|
||||
});
|
||||
resolveCompletion?.();
|
||||
};
|
||||
@@ -3479,6 +3514,7 @@ export async function runCodexAppServerAttempt(
|
||||
notificationCleanup();
|
||||
requestCleanup();
|
||||
closeCleanup?.();
|
||||
releaseSharedClientLease?.();
|
||||
if (nativeHookRelay) {
|
||||
if (shouldDelayNativeHookRelayUnregister) {
|
||||
// Codex hook subprocesses can outlive a completed app-server turn by a
|
||||
@@ -4049,6 +4085,51 @@ async function unsubscribeCodexThreadBestEffort(
|
||||
}
|
||||
}
|
||||
|
||||
async function retireCodexAppServerClientAfterTimedOutTurn(
|
||||
client: CodexAppServerClient,
|
||||
params: {
|
||||
threadId: string;
|
||||
turnId: string;
|
||||
reason: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
const retiredSharedClient = retireSharedCodexAppServerClientIfCurrent(client);
|
||||
const detachedSharedClient = Boolean(retiredSharedClient);
|
||||
interruptCodexTurnBestEffort(client, {
|
||||
threadId: params.threadId,
|
||||
turnId: params.turnId,
|
||||
timeoutMs: CODEX_APP_SERVER_INTERRUPT_TIMEOUT_MS,
|
||||
});
|
||||
await unsubscribeCodexThreadBestEffort(client, {
|
||||
threadId: params.threadId,
|
||||
timeoutMs: CODEX_APP_SERVER_UNSUBSCRIBE_TIMEOUT_MS,
|
||||
});
|
||||
let closedClient = retiredSharedClient?.closed ?? false;
|
||||
if (!detachedSharedClient) {
|
||||
const close = (client as { close?: () => void }).close;
|
||||
if (typeof close === "function") {
|
||||
try {
|
||||
close.call(client);
|
||||
closedClient = true;
|
||||
} catch (error) {
|
||||
embeddedAgentLog.debug("codex app-server client close failed during timeout cleanup", {
|
||||
threadId: params.threadId,
|
||||
turnId: params.turnId,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
embeddedAgentLog.warn("codex app-server client retired after timed-out turn", {
|
||||
threadId: params.threadId,
|
||||
turnId: params.turnId,
|
||||
reason: params.reason,
|
||||
detachedSharedClient,
|
||||
closedClient,
|
||||
activeSharedClientLeases: retiredSharedClient?.activeLeases ?? 0,
|
||||
});
|
||||
}
|
||||
|
||||
type DynamicToolBuildParams = {
|
||||
params: EmbeddedRunAttemptParams;
|
||||
resolvedWorkspace: string;
|
||||
|
||||
@@ -43,7 +43,12 @@ let clearSharedCodexAppServerClient: typeof import("./shared-client.js").clearSh
|
||||
let clearSharedCodexAppServerClientIfCurrent: typeof import("./shared-client.js").clearSharedCodexAppServerClientIfCurrent;
|
||||
let clearSharedCodexAppServerClientIfCurrentAndWait: typeof import("./shared-client.js").clearSharedCodexAppServerClientIfCurrentAndWait;
|
||||
let createIsolatedCodexAppServerClient: typeof import("./shared-client.js").createIsolatedCodexAppServerClient;
|
||||
let detachSharedCodexAppServerClientIfCurrent: typeof import("./shared-client.js").detachSharedCodexAppServerClientIfCurrent;
|
||||
let getLeasedSharedCodexAppServerClient: typeof import("./shared-client.js").getLeasedSharedCodexAppServerClient;
|
||||
let getSharedCodexAppServerClient: typeof import("./shared-client.js").getSharedCodexAppServerClient;
|
||||
let retainSharedCodexAppServerClientIfCurrent: typeof import("./shared-client.js").retainSharedCodexAppServerClientIfCurrent;
|
||||
let releaseLeasedSharedCodexAppServerClient: typeof import("./shared-client.js").releaseLeasedSharedCodexAppServerClient;
|
||||
let retireSharedCodexAppServerClientIfCurrent: typeof import("./shared-client.js").retireSharedCodexAppServerClientIfCurrent;
|
||||
let resetSharedCodexAppServerClientForTests: typeof import("./shared-client.js").resetSharedCodexAppServerClientForTests;
|
||||
|
||||
async function sendInitializeResult(
|
||||
@@ -116,7 +121,12 @@ describe("shared Codex app-server client", () => {
|
||||
clearSharedCodexAppServerClientIfCurrent,
|
||||
clearSharedCodexAppServerClientIfCurrentAndWait,
|
||||
createIsolatedCodexAppServerClient,
|
||||
detachSharedCodexAppServerClientIfCurrent,
|
||||
getLeasedSharedCodexAppServerClient,
|
||||
getSharedCodexAppServerClient,
|
||||
retainSharedCodexAppServerClientIfCurrent,
|
||||
releaseLeasedSharedCodexAppServerClient,
|
||||
retireSharedCodexAppServerClientIfCurrent,
|
||||
resetSharedCodexAppServerClientForTests,
|
||||
} = await import("./shared-client.js"));
|
||||
});
|
||||
@@ -316,6 +326,39 @@ describe("shared Codex app-server client", () => {
|
||||
expect(startSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("preserves keyed shared-client state when adding lease metadata", async () => {
|
||||
const legacy = createClientHarness();
|
||||
const startOptions = {
|
||||
transport: "websocket" as const,
|
||||
command: "codex",
|
||||
args: [],
|
||||
url: "ws://127.0.0.1:39176",
|
||||
authToken: "tok-keyed",
|
||||
headers: {},
|
||||
};
|
||||
const key = codexAppServerStartOptionsKey(startOptions, {
|
||||
agentDir: "/tmp/openclaw-agent",
|
||||
});
|
||||
const globalState = globalThis as typeof globalThis & {
|
||||
[key: symbol]: unknown;
|
||||
};
|
||||
globalState[Symbol.for("openclaw.codexAppServerClientState")] = {
|
||||
clients: new Map([[key, { client: legacy.client, promise: Promise.resolve(legacy.client) }]]),
|
||||
};
|
||||
|
||||
await expect(getLeasedSharedCodexAppServerClient({ startOptions })).resolves.toBe(
|
||||
legacy.client,
|
||||
);
|
||||
expect(retireSharedCodexAppServerClientIfCurrent(legacy.client)).toEqual({
|
||||
activeLeases: 1,
|
||||
closed: false,
|
||||
});
|
||||
expect(legacy.process.stdin.destroyed).toBe(false);
|
||||
|
||||
expect(releaseLeasedSharedCodexAppServerClient(legacy.client)).toBe(true);
|
||||
expect(legacy.process.stdin.destroyed).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps an active shared client alive when another agent dir uses a different key", async () => {
|
||||
const first = createClientHarness();
|
||||
const second = createClientHarness();
|
||||
@@ -508,6 +551,101 @@ describe("shared Codex app-server client", () => {
|
||||
expect(second.process.stdin.destroyed).toBe(true);
|
||||
});
|
||||
|
||||
it("can detach the current shared client without closing it", async () => {
|
||||
const first = createClientHarness();
|
||||
const second = createClientHarness();
|
||||
vi.spyOn(CodexAppServerClient, "start")
|
||||
.mockReturnValueOnce(first.client)
|
||||
.mockReturnValueOnce(second.client);
|
||||
|
||||
const firstList = listCodexAppServerModels({ timeoutMs: 1000 });
|
||||
await sendInitializeResult(first, "openclaw/0.125.0 (macOS; test)");
|
||||
await sendEmptyModelList(first);
|
||||
await expect(firstList).resolves.toEqual({ models: [] });
|
||||
|
||||
expect(detachSharedCodexAppServerClientIfCurrent(first.client)).toBe(true);
|
||||
expect(first.process.stdin.destroyed).toBe(false);
|
||||
|
||||
const secondList = listCodexAppServerModels({ timeoutMs: 1000 });
|
||||
await sendInitializeResult(second, "openclaw/0.125.0 (macOS; test)");
|
||||
await sendEmptyModelList(second);
|
||||
await expect(secondList).resolves.toEqual({ models: [] });
|
||||
|
||||
expect(detachSharedCodexAppServerClientIfCurrent(first.client)).toBe(false);
|
||||
first.client.close();
|
||||
expect(first.process.stdin.destroyed).toBe(true);
|
||||
expect(second.process.kill).not.toHaveBeenCalled();
|
||||
expect(detachSharedCodexAppServerClientIfCurrent(second.client)).toBe(true);
|
||||
second.client.close();
|
||||
expect(second.process.stdin.destroyed).toBe(true);
|
||||
});
|
||||
|
||||
it("closes a retired shared app-server after all active leases release", async () => {
|
||||
const first = createClientHarness();
|
||||
const second = createClientHarness();
|
||||
vi.spyOn(CodexAppServerClient, "start")
|
||||
.mockReturnValueOnce(first.client)
|
||||
.mockReturnValueOnce(second.client);
|
||||
|
||||
const firstList = listCodexAppServerModels({ timeoutMs: 1000 });
|
||||
await sendInitializeResult(first, "openclaw/0.125.0 (macOS; test)");
|
||||
await sendEmptyModelList(first);
|
||||
await expect(firstList).resolves.toEqual({ models: [] });
|
||||
|
||||
const releaseFirst = retainSharedCodexAppServerClientIfCurrent(first.client);
|
||||
const releaseSecond = retainSharedCodexAppServerClientIfCurrent(first.client);
|
||||
expect(releaseFirst).toBeTypeOf("function");
|
||||
expect(releaseSecond).toBeTypeOf("function");
|
||||
expect(retireSharedCodexAppServerClientIfCurrent(first.client)).toEqual({
|
||||
activeLeases: 2,
|
||||
closed: false,
|
||||
});
|
||||
expect(first.process.stdin.destroyed).toBe(false);
|
||||
|
||||
const secondList = listCodexAppServerModels({ timeoutMs: 1000 });
|
||||
await sendInitializeResult(second, "openclaw/0.125.0 (macOS; test)");
|
||||
await sendEmptyModelList(second);
|
||||
await expect(secondList).resolves.toEqual({ models: [] });
|
||||
|
||||
releaseFirst?.();
|
||||
expect(first.process.stdin.destroyed).toBe(false);
|
||||
releaseSecond?.();
|
||||
expect(first.process.stdin.destroyed).toBe(true);
|
||||
expect(second.process.kill).not.toHaveBeenCalled();
|
||||
expect(retireSharedCodexAppServerClientIfCurrent(second.client)).toEqual({
|
||||
activeLeases: 0,
|
||||
closed: true,
|
||||
});
|
||||
expect(second.process.stdin.destroyed).toBe(true);
|
||||
});
|
||||
|
||||
it("leases shared app-server clients before returning concurrent acquirers", async () => {
|
||||
const first = createClientHarness();
|
||||
vi.spyOn(CodexAppServerClient, "start").mockReturnValueOnce(first.client);
|
||||
|
||||
const firstLease = getLeasedSharedCodexAppServerClient({ timeoutMs: 1000 });
|
||||
const secondLease = getLeasedSharedCodexAppServerClient({ timeoutMs: 1000 });
|
||||
await sendInitializeResult(first, "openclaw/0.125.0 (macOS; test)");
|
||||
await expect(firstLease).resolves.toBe(first.client);
|
||||
await expect(secondLease).resolves.toBe(first.client);
|
||||
|
||||
expect(retireSharedCodexAppServerClientIfCurrent(first.client)).toEqual({
|
||||
activeLeases: 2,
|
||||
closed: false,
|
||||
});
|
||||
expect(retireSharedCodexAppServerClientIfCurrent(first.client)).toEqual({
|
||||
activeLeases: 2,
|
||||
closed: false,
|
||||
});
|
||||
expect(first.process.stdin.destroyed).toBe(false);
|
||||
|
||||
expect(releaseLeasedSharedCodexAppServerClient(first.client)).toBe(true);
|
||||
expect(first.process.stdin.destroyed).toBe(false);
|
||||
expect(releaseLeasedSharedCodexAppServerClient(first.client)).toBe(true);
|
||||
expect(first.process.stdin.destroyed).toBe(true);
|
||||
expect(releaseLeasedSharedCodexAppServerClient(first.client)).toBe(false);
|
||||
});
|
||||
|
||||
it("waits only for the shared client that is still current", async () => {
|
||||
const first = createClientHarness();
|
||||
const second = createClientHarness();
|
||||
|
||||
@@ -17,10 +17,13 @@ import { withTimeout } from "./timeout.js";
|
||||
type SharedCodexAppServerClientEntry = {
|
||||
client?: CodexAppServerClient;
|
||||
promise?: Promise<CodexAppServerClient>;
|
||||
activeLeases: number;
|
||||
closeWhenIdle: boolean;
|
||||
};
|
||||
|
||||
type SharedCodexAppServerClientState = {
|
||||
clients: Map<string, SharedCodexAppServerClientEntry>;
|
||||
leasedReleases: WeakMap<CodexAppServerClient, Array<() => void>>;
|
||||
};
|
||||
|
||||
type LegacySharedCodexAppServerClientState = Partial<SharedCodexAppServerClientEntry> & {
|
||||
@@ -28,6 +31,11 @@ type LegacySharedCodexAppServerClientState = Partial<SharedCodexAppServerClientE
|
||||
clients?: unknown;
|
||||
};
|
||||
|
||||
type KeyedSharedCodexAppServerClientState = {
|
||||
clients: Map<string, Partial<SharedCodexAppServerClientEntry>>;
|
||||
leasedReleases?: unknown;
|
||||
};
|
||||
|
||||
const SHARED_CODEX_APP_SERVER_CLIENT_STATE = Symbol.for("openclaw.codexAppServerClientState");
|
||||
|
||||
function getSharedCodexAppServerClientState(): SharedCodexAppServerClientState {
|
||||
@@ -35,31 +43,48 @@ function getSharedCodexAppServerClientState(): SharedCodexAppServerClientState {
|
||||
[SHARED_CODEX_APP_SERVER_CLIENT_STATE]?: unknown;
|
||||
};
|
||||
const state = globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE];
|
||||
if (isSharedCodexAppServerClientState(state)) {
|
||||
return state;
|
||||
const keyedState = readKeyedSharedCodexAppServerClientState(state);
|
||||
if (keyedState) {
|
||||
const clients = keyedState.clients as Map<string, SharedCodexAppServerClientEntry>;
|
||||
for (const entry of clients.values()) {
|
||||
entry.activeLeases ??= 0;
|
||||
entry.closeWhenIdle ??= false;
|
||||
}
|
||||
const nextState: SharedCodexAppServerClientState = {
|
||||
clients,
|
||||
leasedReleases:
|
||||
keyedState.leasedReleases instanceof WeakMap ? keyedState.leasedReleases : new WeakMap(),
|
||||
};
|
||||
globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE] = nextState;
|
||||
return nextState;
|
||||
}
|
||||
const legacyState = readLegacySharedCodexAppServerClientState(state);
|
||||
const clients = new Map<string, SharedCodexAppServerClientEntry>();
|
||||
if (legacyState?.key && (legacyState.client || legacyState.promise)) {
|
||||
const legacyKey = legacyState.key;
|
||||
clients.set(legacyKey, { client: legacyState.client, promise: legacyState.promise });
|
||||
clients.set(legacyKey, {
|
||||
client: legacyState.client,
|
||||
promise: legacyState.promise,
|
||||
activeLeases: 0,
|
||||
closeWhenIdle: false,
|
||||
});
|
||||
legacyState.client?.addCloseHandler((closedClient) =>
|
||||
clearSharedClientEntryIfCurrent(legacyKey, closedClient),
|
||||
);
|
||||
}
|
||||
const nextState: SharedCodexAppServerClientState = { clients };
|
||||
const nextState: SharedCodexAppServerClientState = { clients, leasedReleases: new WeakMap() };
|
||||
globalState[SHARED_CODEX_APP_SERVER_CLIENT_STATE] = nextState;
|
||||
return nextState;
|
||||
}
|
||||
|
||||
function isSharedCodexAppServerClientState(
|
||||
function readKeyedSharedCodexAppServerClientState(
|
||||
value: unknown,
|
||||
): value is SharedCodexAppServerClientState {
|
||||
return (
|
||||
value !== null &&
|
||||
): KeyedSharedCodexAppServerClientState | undefined {
|
||||
return value !== null &&
|
||||
typeof value === "object" &&
|
||||
(value as { clients?: unknown }).clients instanceof Map
|
||||
);
|
||||
? (value as KeyedSharedCodexAppServerClientState)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
function readLegacySharedCodexAppServerClientState(
|
||||
@@ -71,13 +96,59 @@ function readLegacySharedCodexAppServerClientState(
|
||||
return value as LegacySharedCodexAppServerClientState;
|
||||
}
|
||||
|
||||
export async function getSharedCodexAppServerClient(options?: {
|
||||
type SharedCodexAppServerClientOptions = {
|
||||
startOptions?: CodexAppServerStartOptions;
|
||||
timeoutMs?: number;
|
||||
authProfileId?: string | null;
|
||||
agentDir?: string;
|
||||
config?: Parameters<typeof resolveCodexAppServerAuthProfileIdForAgent>[0]["config"];
|
||||
}): Promise<CodexAppServerClient> {
|
||||
};
|
||||
|
||||
export async function getSharedCodexAppServerClient(
|
||||
options?: SharedCodexAppServerClientOptions,
|
||||
): Promise<CodexAppServerClient> {
|
||||
return (await acquireSharedCodexAppServerClient(options)).client;
|
||||
}
|
||||
|
||||
export async function getLeasedSharedCodexAppServerClient(
|
||||
options?: SharedCodexAppServerClientOptions,
|
||||
): Promise<CodexAppServerClient> {
|
||||
const acquired = await acquireSharedCodexAppServerClient(options, { leased: true });
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
const releases = state.leasedReleases.get(acquired.client) ?? [];
|
||||
releases.push(acquired.release);
|
||||
state.leasedReleases.set(acquired.client, releases);
|
||||
return acquired.client;
|
||||
}
|
||||
|
||||
export function releaseLeasedSharedCodexAppServerClient(client: CodexAppServerClient): boolean {
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
const releases = state.leasedReleases.get(client);
|
||||
if (!releases) {
|
||||
return false;
|
||||
}
|
||||
const release = releases.pop();
|
||||
if (!release) {
|
||||
return false;
|
||||
}
|
||||
if (releases.length === 0) {
|
||||
state.leasedReleases.delete(client);
|
||||
}
|
||||
release();
|
||||
return true;
|
||||
}
|
||||
|
||||
async function acquireSharedCodexAppServerClient(
|
||||
options?: SharedCodexAppServerClientOptions,
|
||||
): Promise<{ client: CodexAppServerClient }>;
|
||||
async function acquireSharedCodexAppServerClient(
|
||||
options: SharedCodexAppServerClientOptions | undefined,
|
||||
leaseOptions: { leased: true },
|
||||
): Promise<{ client: CodexAppServerClient; release: () => void }>;
|
||||
async function acquireSharedCodexAppServerClient(
|
||||
options?: SharedCodexAppServerClientOptions,
|
||||
leaseOptions?: { leased: true },
|
||||
): Promise<{ client: CodexAppServerClient; release?: () => void }> {
|
||||
const agentDir = options?.agentDir ?? resolveDefaultAgentDir(options?.config ?? {});
|
||||
const usesNativeAuth = options?.authProfileId === null;
|
||||
const requestedAuthProfileId =
|
||||
@@ -132,11 +203,13 @@ export async function getSharedCodexAppServerClient(options?: {
|
||||
}
|
||||
})());
|
||||
try {
|
||||
return await withTimeout(
|
||||
const client = await withTimeout(
|
||||
sharedPromise,
|
||||
options?.timeoutMs ?? 0,
|
||||
"codex app-server initialize timed out",
|
||||
);
|
||||
const release = leaseOptions?.leased ? retainSharedClientEntry(entry) : undefined;
|
||||
return release ? { client, release } : { client };
|
||||
} catch (error) {
|
||||
const currentEntry = state.clients.get(key);
|
||||
if (currentEntry?.promise === sharedPromise) {
|
||||
@@ -223,6 +296,59 @@ export function clearSharedCodexAppServerClientIfCurrent(
|
||||
return false;
|
||||
}
|
||||
|
||||
export function detachSharedCodexAppServerClientIfCurrent(
|
||||
client: CodexAppServerClient | undefined,
|
||||
): boolean {
|
||||
if (!client) {
|
||||
return false;
|
||||
}
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
for (const [key, entry] of state.clients) {
|
||||
if (entry.client === client) {
|
||||
state.clients.delete(key);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export function retainSharedCodexAppServerClientIfCurrent(
|
||||
client: CodexAppServerClient | undefined,
|
||||
): (() => void) | undefined {
|
||||
if (!client) {
|
||||
return undefined;
|
||||
}
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
for (const entry of state.clients.values()) {
|
||||
if (entry.client === client) {
|
||||
return retainSharedClientEntry(entry);
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function retireSharedCodexAppServerClientIfCurrent(
|
||||
client: CodexAppServerClient | undefined,
|
||||
): { activeLeases: number; closed: boolean } | undefined {
|
||||
if (!client) {
|
||||
return undefined;
|
||||
}
|
||||
const state = getSharedCodexAppServerClientState();
|
||||
for (const [key, entry] of state.clients) {
|
||||
if (entry.client === client) {
|
||||
state.clients.delete(key);
|
||||
entry.closeWhenIdle = true;
|
||||
const closed = closeRetiredSharedClientEntryIfIdle(entry);
|
||||
return { activeLeases: entry.activeLeases, closed };
|
||||
}
|
||||
}
|
||||
const activeLeases = state.leasedReleases.get(client)?.length ?? 0;
|
||||
if (activeLeases > 0) {
|
||||
return { activeLeases, closed: false };
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export async function clearSharedCodexAppServerClientIfCurrentAndWait(
|
||||
client: CodexAppServerClient | undefined,
|
||||
options?: {
|
||||
@@ -260,7 +386,7 @@ function getOrCreateSharedClientEntry(
|
||||
): SharedCodexAppServerClientEntry {
|
||||
let entry = state.clients.get(key);
|
||||
if (!entry) {
|
||||
entry = {};
|
||||
entry = { activeLeases: 0, closeWhenIdle: false };
|
||||
state.clients.set(key, entry);
|
||||
}
|
||||
return entry;
|
||||
@@ -283,6 +409,30 @@ function clearSharedClientEntryIfCurrent(key: string, client: CodexAppServerClie
|
||||
}
|
||||
}
|
||||
|
||||
function retainSharedClientEntry(entry: SharedCodexAppServerClientEntry): () => void {
|
||||
let released = false;
|
||||
entry.activeLeases += 1;
|
||||
return () => {
|
||||
if (released) {
|
||||
return;
|
||||
}
|
||||
released = true;
|
||||
entry.activeLeases = Math.max(0, entry.activeLeases - 1);
|
||||
closeRetiredSharedClientEntryIfIdle(entry);
|
||||
};
|
||||
}
|
||||
|
||||
function closeRetiredSharedClientEntryIfIdle(entry: SharedCodexAppServerClientEntry): boolean {
|
||||
if (!entry.closeWhenIdle || entry.activeLeases > 0 || !entry.client) {
|
||||
return false;
|
||||
}
|
||||
const client = entry.client;
|
||||
entry.closeWhenIdle = false;
|
||||
entry.client = undefined;
|
||||
client.close();
|
||||
return true;
|
||||
}
|
||||
|
||||
function collectSharedClients(state: SharedCodexAppServerClientState): CodexAppServerClient[] {
|
||||
return [
|
||||
...new Set(
|
||||
|
||||
@@ -30,6 +30,9 @@ vi.mock("./session-binding.js", () => ({
|
||||
|
||||
vi.mock("./shared-client.js", () => ({
|
||||
getSharedCodexAppServerClient: (...args: unknown[]) => getSharedCodexAppServerClientMock(...args),
|
||||
getLeasedSharedCodexAppServerClient: (...args: unknown[]) =>
|
||||
getSharedCodexAppServerClientMock(...args),
|
||||
releaseLeasedSharedCodexAppServerClient: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./auth-bridge.js", () => ({
|
||||
|
||||
@@ -66,7 +66,10 @@ import { rememberCodexRateLimits, readRecentCodexRateLimits } from "./rate-limit
|
||||
import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js";
|
||||
import { resolveCodexNativeExecutionBlock } from "./sandbox-guard.js";
|
||||
import { readCodexAppServerBinding } from "./session-binding.js";
|
||||
import { getSharedCodexAppServerClient } from "./shared-client.js";
|
||||
import {
|
||||
getLeasedSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient,
|
||||
} from "./shared-client.js";
|
||||
import {
|
||||
buildCodexRuntimeThreadConfig,
|
||||
CODEX_NATIVE_PERSONALITY_NONE,
|
||||
@@ -145,7 +148,7 @@ export async function runCodexAppServerSideQuestion(
|
||||
const pluginConfig = readCodexPluginConfig(options.pluginConfig);
|
||||
const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig });
|
||||
const authProfileId = params.authProfileId ?? binding.authProfileId;
|
||||
const client = await getSharedCodexAppServerClient({
|
||||
const client = await getLeasedSharedCodexAppServerClient({
|
||||
startOptions: appServer.start,
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
authProfileId,
|
||||
@@ -403,6 +406,7 @@ export async function runCodexAppServerSideQuestion(
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
});
|
||||
} finally {
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
nativeHookRelay?.unregister();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,11 @@ const agentRuntimeMocks = vi.hoisted(() => ({
|
||||
saveAuthProfileStore: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./app-server/shared-client.js", () => sharedClientMocks);
|
||||
vi.mock("./app-server/shared-client.js", () => ({
|
||||
...sharedClientMocks,
|
||||
getLeasedSharedCodexAppServerClient: sharedClientMocks.getSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient: vi.fn(),
|
||||
}));
|
||||
vi.mock("openclaw/plugin-sdk/agent-runtime", () => agentRuntimeMocks);
|
||||
|
||||
import {
|
||||
|
||||
@@ -33,7 +33,10 @@ import {
|
||||
writeCodexAppServerBinding,
|
||||
type CodexAppServerAuthProfileLookup,
|
||||
} from "./app-server/session-binding.js";
|
||||
import { getSharedCodexAppServerClient } from "./app-server/shared-client.js";
|
||||
import {
|
||||
getLeasedSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient,
|
||||
} from "./app-server/shared-client.js";
|
||||
import { CODEX_NATIVE_PERSONALITY_NONE } from "./app-server/thread-lifecycle.js";
|
||||
import { formatCodexDisplayText } from "./command-formatters.js";
|
||||
import {
|
||||
@@ -270,52 +273,56 @@ async function attachExistingThread(params: {
|
||||
modelProvider: params.modelProvider,
|
||||
...agentLookup,
|
||||
});
|
||||
const client = await getSharedCodexAppServerClient({
|
||||
const client = await getLeasedSharedCodexAppServerClient({
|
||||
startOptions: runtime.start,
|
||||
timeoutMs: runtime.requestTimeoutMs,
|
||||
authProfileId: params.authProfileId,
|
||||
...agentLookup,
|
||||
});
|
||||
const response: CodexThreadResumeResponse = await client.request(
|
||||
CODEX_CONTROL_METHODS.resumeThread,
|
||||
{
|
||||
threadId: params.threadId,
|
||||
...(params.model ? { model: params.model } : {}),
|
||||
...(modelProvider ? { modelProvider } : {}),
|
||||
personality: CODEX_NATIVE_PERSONALITY_NONE,
|
||||
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
|
||||
approvalsReviewer: runtime.approvalsReviewer,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
...((params.serviceTier ?? runtime.serviceTier)
|
||||
? { serviceTier: params.serviceTier ?? runtime.serviceTier }
|
||||
: {}),
|
||||
persistExtendedHistory: true,
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
const thread = response.thread;
|
||||
const runtimeApprovalPolicy =
|
||||
typeof runtime.approvalPolicy === "string" ? runtime.approvalPolicy : undefined;
|
||||
await writeCodexAppServerBinding(
|
||||
params.sessionFile,
|
||||
{
|
||||
threadId: thread.id,
|
||||
cwd: thread.cwd ?? params.workspaceDir,
|
||||
authProfileId: params.authProfileId,
|
||||
model: response.model ?? params.model,
|
||||
modelProvider: normalizeCodexAppServerBindingModelProvider({
|
||||
try {
|
||||
const response: CodexThreadResumeResponse = await client.request(
|
||||
CODEX_CONTROL_METHODS.resumeThread,
|
||||
{
|
||||
threadId: params.threadId,
|
||||
...(params.model ? { model: params.model } : {}),
|
||||
...(modelProvider ? { modelProvider } : {}),
|
||||
personality: CODEX_NATIVE_PERSONALITY_NONE,
|
||||
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
|
||||
approvalsReviewer: runtime.approvalsReviewer,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
...((params.serviceTier ?? runtime.serviceTier)
|
||||
? { serviceTier: params.serviceTier ?? runtime.serviceTier }
|
||||
: {}),
|
||||
persistExtendedHistory: true,
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
const thread = response.thread;
|
||||
const runtimeApprovalPolicy =
|
||||
typeof runtime.approvalPolicy === "string" ? runtime.approvalPolicy : undefined;
|
||||
await writeCodexAppServerBinding(
|
||||
params.sessionFile,
|
||||
{
|
||||
threadId: thread.id,
|
||||
cwd: thread.cwd ?? params.workspaceDir,
|
||||
authProfileId: params.authProfileId,
|
||||
modelProvider: response.modelProvider ?? params.modelProvider,
|
||||
model: response.model ?? params.model,
|
||||
modelProvider: normalizeCodexAppServerBindingModelProvider({
|
||||
authProfileId: params.authProfileId,
|
||||
modelProvider: response.modelProvider ?? params.modelProvider,
|
||||
...agentLookup,
|
||||
}),
|
||||
approvalPolicy: params.approvalPolicy ?? runtimeApprovalPolicy,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
serviceTier: params.serviceTier ?? runtime.serviceTier,
|
||||
},
|
||||
{
|
||||
...agentLookup,
|
||||
}),
|
||||
approvalPolicy: params.approvalPolicy ?? runtimeApprovalPolicy,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
serviceTier: params.serviceTier ?? runtime.serviceTier,
|
||||
},
|
||||
{
|
||||
...agentLookup,
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
} finally {
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
async function createThread(params: {
|
||||
@@ -340,54 +347,58 @@ async function createThread(params: {
|
||||
modelProvider: params.modelProvider,
|
||||
...agentLookup,
|
||||
});
|
||||
const client = await getSharedCodexAppServerClient({
|
||||
const client = await getLeasedSharedCodexAppServerClient({
|
||||
startOptions: runtime.start,
|
||||
timeoutMs: runtime.requestTimeoutMs,
|
||||
authProfileId: params.authProfileId,
|
||||
...agentLookup,
|
||||
});
|
||||
const response: CodexThreadStartResponse = await client.request(
|
||||
"thread/start",
|
||||
{
|
||||
cwd: params.workspaceDir,
|
||||
...(params.model ? { model: params.model } : {}),
|
||||
...(modelProvider ? { modelProvider } : {}),
|
||||
personality: CODEX_NATIVE_PERSONALITY_NONE,
|
||||
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
|
||||
approvalsReviewer: runtime.approvalsReviewer,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
...((params.serviceTier ?? runtime.serviceTier)
|
||||
? { serviceTier: params.serviceTier ?? runtime.serviceTier }
|
||||
: {}),
|
||||
developerInstructions:
|
||||
"This Codex thread is bound to an OpenClaw conversation. Answer normally; OpenClaw will deliver your final response back to the conversation.",
|
||||
experimentalRawEvents: true,
|
||||
persistExtendedHistory: true,
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
const runtimeApprovalPolicy =
|
||||
typeof runtime.approvalPolicy === "string" ? runtime.approvalPolicy : undefined;
|
||||
await writeCodexAppServerBinding(
|
||||
params.sessionFile,
|
||||
{
|
||||
threadId: response.thread.id,
|
||||
cwd: response.thread.cwd ?? params.workspaceDir,
|
||||
authProfileId: params.authProfileId,
|
||||
model: response.model ?? params.model,
|
||||
modelProvider: normalizeCodexAppServerBindingModelProvider({
|
||||
try {
|
||||
const response: CodexThreadStartResponse = await client.request(
|
||||
"thread/start",
|
||||
{
|
||||
cwd: params.workspaceDir,
|
||||
...(params.model ? { model: params.model } : {}),
|
||||
...(modelProvider ? { modelProvider } : {}),
|
||||
personality: CODEX_NATIVE_PERSONALITY_NONE,
|
||||
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
|
||||
approvalsReviewer: runtime.approvalsReviewer,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
...((params.serviceTier ?? runtime.serviceTier)
|
||||
? { serviceTier: params.serviceTier ?? runtime.serviceTier }
|
||||
: {}),
|
||||
developerInstructions:
|
||||
"This Codex thread is bound to an OpenClaw conversation. Answer normally; OpenClaw will deliver your final response back to the conversation.",
|
||||
experimentalRawEvents: true,
|
||||
persistExtendedHistory: true,
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
const runtimeApprovalPolicy =
|
||||
typeof runtime.approvalPolicy === "string" ? runtime.approvalPolicy : undefined;
|
||||
await writeCodexAppServerBinding(
|
||||
params.sessionFile,
|
||||
{
|
||||
threadId: response.thread.id,
|
||||
cwd: response.thread.cwd ?? params.workspaceDir,
|
||||
authProfileId: params.authProfileId,
|
||||
modelProvider: response.modelProvider ?? params.modelProvider,
|
||||
model: response.model ?? params.model,
|
||||
modelProvider: normalizeCodexAppServerBindingModelProvider({
|
||||
authProfileId: params.authProfileId,
|
||||
modelProvider: response.modelProvider ?? params.modelProvider,
|
||||
...agentLookup,
|
||||
}),
|
||||
approvalPolicy: params.approvalPolicy ?? runtimeApprovalPolicy,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
serviceTier: params.serviceTier ?? runtime.serviceTier,
|
||||
},
|
||||
{
|
||||
...agentLookup,
|
||||
}),
|
||||
approvalPolicy: params.approvalPolicy ?? runtimeApprovalPolicy,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
serviceTier: params.serviceTier ?? runtime.serviceTier,
|
||||
},
|
||||
{
|
||||
...agentLookup,
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
} finally {
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
async function runBoundTurn(params: {
|
||||
@@ -407,7 +418,7 @@ async function runBoundTurn(params: {
|
||||
throw new Error("bound Codex conversation has no thread binding");
|
||||
}
|
||||
|
||||
const client = await getSharedCodexAppServerClient({
|
||||
const client = await getLeasedSharedCodexAppServerClient({
|
||||
startOptions: runtime.start,
|
||||
timeoutMs: runtime.requestTimeoutMs,
|
||||
authProfileId: binding.authProfileId,
|
||||
@@ -498,6 +509,7 @@ async function runBoundTurn(params: {
|
||||
} finally {
|
||||
notificationCleanup();
|
||||
requestCleanup();
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,11 @@ const sharedClientMocks = vi.hoisted(() => ({
|
||||
getSharedCodexAppServerClient: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./app-server/shared-client.js", () => sharedClientMocks);
|
||||
vi.mock("./app-server/shared-client.js", () => ({
|
||||
...sharedClientMocks,
|
||||
getLeasedSharedCodexAppServerClient: sharedClientMocks.getSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient: vi.fn(),
|
||||
}));
|
||||
|
||||
describe("codex conversation controls", () => {
|
||||
beforeEach(async () => {
|
||||
|
||||
@@ -10,7 +10,10 @@ import {
|
||||
readCodexAppServerBinding,
|
||||
writeCodexAppServerBinding,
|
||||
} from "./app-server/session-binding.js";
|
||||
import { getSharedCodexAppServerClient } from "./app-server/shared-client.js";
|
||||
import {
|
||||
getLeasedSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient,
|
||||
} from "./app-server/shared-client.js";
|
||||
import { formatCodexDisplayText } from "./command-formatters.js";
|
||||
|
||||
type ActiveTurn = {
|
||||
@@ -61,20 +64,24 @@ export async function stopCodexConversationTurn(params: {
|
||||
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig });
|
||||
const lookup = buildBindingLookup(params);
|
||||
const binding = await readCodexAppServerBinding(params.sessionFile, lookup);
|
||||
const client = await getSharedCodexAppServerClient({
|
||||
const client = await getLeasedSharedCodexAppServerClient({
|
||||
startOptions: runtime.start,
|
||||
timeoutMs: runtime.requestTimeoutMs,
|
||||
authProfileId: binding?.authProfileId,
|
||||
...lookup,
|
||||
});
|
||||
await client.request(
|
||||
"turn/interrupt",
|
||||
{
|
||||
threadId: active.threadId,
|
||||
turnId: active.turnId,
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
try {
|
||||
await client.request(
|
||||
"turn/interrupt",
|
||||
{
|
||||
threadId: active.threadId,
|
||||
turnId: active.turnId,
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
} finally {
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
}
|
||||
return { stopped: true, message: "Codex stop requested." };
|
||||
}
|
||||
|
||||
@@ -96,21 +103,25 @@ export async function steerCodexConversationTurn(params: {
|
||||
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig });
|
||||
const lookup = buildBindingLookup(params);
|
||||
const binding = await readCodexAppServerBinding(params.sessionFile, lookup);
|
||||
const client = await getSharedCodexAppServerClient({
|
||||
const client = await getLeasedSharedCodexAppServerClient({
|
||||
startOptions: runtime.start,
|
||||
timeoutMs: runtime.requestTimeoutMs,
|
||||
authProfileId: binding?.authProfileId,
|
||||
...lookup,
|
||||
});
|
||||
await client.request(
|
||||
"turn/steer",
|
||||
{
|
||||
threadId: active.threadId,
|
||||
expectedTurnId: active.turnId,
|
||||
input: [{ type: "text", text, text_elements: [] }],
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
try {
|
||||
await client.request(
|
||||
"turn/steer",
|
||||
{
|
||||
threadId: active.threadId,
|
||||
expectedTurnId: active.turnId,
|
||||
input: [{ type: "text", text, text_elements: [] }],
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
} finally {
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
}
|
||||
return { steered: true, message: "Sent steer message to Codex." };
|
||||
}
|
||||
|
||||
@@ -261,25 +272,29 @@ async function resumeThreadWithOverrides(params: {
|
||||
serviceTier?: CodexServiceTier;
|
||||
}): Promise<CodexThreadResumeResponse> {
|
||||
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig });
|
||||
const client = await getSharedCodexAppServerClient({
|
||||
const client = await getLeasedSharedCodexAppServerClient({
|
||||
startOptions: runtime.start,
|
||||
timeoutMs: runtime.requestTimeoutMs,
|
||||
authProfileId: params.authProfileId,
|
||||
...buildBindingLookup(params),
|
||||
});
|
||||
return await client.request(
|
||||
CODEX_CONTROL_METHODS.resumeThread,
|
||||
{
|
||||
threadId: params.threadId,
|
||||
...(params.model ? { model: params.model } : {}),
|
||||
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
approvalsReviewer: runtime.approvalsReviewer,
|
||||
...(params.serviceTier ? { serviceTier: params.serviceTier } : {}),
|
||||
persistExtendedHistory: true,
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
try {
|
||||
return await client.request(
|
||||
CODEX_CONTROL_METHODS.resumeThread,
|
||||
{
|
||||
threadId: params.threadId,
|
||||
...(params.model ? { model: params.model } : {}),
|
||||
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
|
||||
sandbox: params.sandbox ?? runtime.sandbox,
|
||||
approvalsReviewer: runtime.approvalsReviewer,
|
||||
...(params.serviceTier ? { serviceTier: params.serviceTier } : {}),
|
||||
persistExtendedHistory: true,
|
||||
},
|
||||
{ timeoutMs: runtime.requestTimeoutMs },
|
||||
);
|
||||
} finally {
|
||||
releaseLeasedSharedCodexAppServerClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
function buildBindingLookup(params: {
|
||||
|
||||
@@ -42,7 +42,8 @@ import type { v2 } from "../app-server/protocol.js";
|
||||
import { requestCodexAppServerJson } from "../app-server/request.js";
|
||||
import {
|
||||
clearSharedCodexAppServerClientIfCurrentAndWait,
|
||||
getSharedCodexAppServerClient,
|
||||
getLeasedSharedCodexAppServerClient,
|
||||
releaseLeasedSharedCodexAppServerClient,
|
||||
} from "../app-server/shared-client.js";
|
||||
import { applyCodexAuthItem, buildCodexAuthConfigPatchItems } from "./auth.js";
|
||||
import { buildCodexMigrationPlan } from "./plan.js";
|
||||
@@ -86,8 +87,8 @@ export function prepareTargetCodexAppServer(
|
||||
): CodexMigrationTargetAppServerPreparation {
|
||||
const appServer = resolveTargetCodexAppServer(ctx);
|
||||
const targets = resolveCodexMigrationTargets(ctx);
|
||||
let warmedClient: Awaited<ReturnType<typeof getSharedCodexAppServerClient>> | undefined;
|
||||
const ready = getSharedCodexAppServerClient({
|
||||
let warmedClient: Awaited<ReturnType<typeof getLeasedSharedCodexAppServerClient>> | undefined;
|
||||
const ready = getLeasedSharedCodexAppServerClient({
|
||||
startOptions: appServer.start,
|
||||
timeoutMs: 60_000,
|
||||
agentDir: targets.agentDir,
|
||||
@@ -101,6 +102,9 @@ export function prepareTargetCodexAppServer(
|
||||
return {
|
||||
async dispose() {
|
||||
await ready;
|
||||
if (warmedClient) {
|
||||
releaseLeasedSharedCodexAppServerClient(warmedClient);
|
||||
}
|
||||
await clearSharedCodexAppServerClientIfCurrentAndWait(warmedClient, {
|
||||
exitTimeoutMs: 2_000,
|
||||
forceKillDelayMs: 250,
|
||||
|
||||
@@ -319,6 +319,27 @@ describe("resolveRunFailoverDecision", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("does not rotate harness-owned assistant errors classified as timeout", () => {
|
||||
expect(
|
||||
resolveRunFailoverDecision({
|
||||
stage: "assistant",
|
||||
aborted: false,
|
||||
externalAbort: false,
|
||||
fallbackConfigured: true,
|
||||
failoverFailure: true,
|
||||
failoverReason: "timeout",
|
||||
timedOut: false,
|
||||
idleTimedOut: false,
|
||||
timedOutDuringCompaction: false,
|
||||
timedOutDuringToolExecution: false,
|
||||
harnessOwnsTransport: true,
|
||||
profileRotated: false,
|
||||
}),
|
||||
).toEqual({
|
||||
action: "continue_normal",
|
||||
});
|
||||
});
|
||||
|
||||
it("rotates concrete assistant failover failures that accompany harness-owned timeouts", () => {
|
||||
expect(
|
||||
resolveRunFailoverDecision({
|
||||
|
||||
@@ -116,11 +116,9 @@ function shouldRotateAssistant(params: AssistantDecisionParams): boolean {
|
||||
return false;
|
||||
}
|
||||
const timeoutFailure = isAssistantTimeoutFailure(params);
|
||||
if (
|
||||
timeoutFailure &&
|
||||
params.harnessOwnsTransport &&
|
||||
!isConcreteNonTimeoutAssistantFailure(params)
|
||||
) {
|
||||
const harnessOwnedTimeout =
|
||||
params.harnessOwnsTransport && (timeoutFailure || params.failoverReason === "timeout");
|
||||
if (harnessOwnedTimeout && !isConcreteNonTimeoutAssistantFailure(params)) {
|
||||
return false;
|
||||
}
|
||||
return (!params.aborted && params.failoverFailure) || timeoutFailure;
|
||||
|
||||
Reference in New Issue
Block a user