mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 14:14:08 +00:00
fix(codex): preserve shared app-server after startup app errors (#87399)
* fix(codex): preserve shared app-server after startup app errors * fix(codex): align startup cleanup tests with current types * test(config): isolate installed plugin ledger cache
This commit is contained in:
committed by
GitHub
parent
87944c0d80
commit
7f7eca1ad2
203
extensions/codex/src/app-server/attempt-startup.test.ts
Normal file
203
extensions/codex/src/app-server/attempt-startup.test.ts
Normal file
@@ -0,0 +1,203 @@
|
||||
import type {
|
||||
CodexBundleMcpThreadConfig,
|
||||
EmbeddedRunAttemptParams,
|
||||
} from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { startCodexAttemptThread } from "./attempt-startup.js";
|
||||
import { defaultLeasedCodexAppServerClientFactory } from "./client-factory.js";
|
||||
import { CodexAppServerClient } from "./client.js";
|
||||
import { type CodexPluginConfig, resolveCodexAppServerRuntimeOptions } from "./config.js";
|
||||
import { clearSharedCodexAppServerClient } from "./shared-client.js";
|
||||
import { createClientHarness, createCodexTestModel } from "./test-support.js";
|
||||
|
||||
type ClientHarness = ReturnType<typeof createClientHarness>;
|
||||
|
||||
function createAttemptParams(): EmbeddedRunAttemptParams {
|
||||
return {
|
||||
prompt: "hello",
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
workspaceDir: "/tmp",
|
||||
runId: "run-1",
|
||||
provider: "codex",
|
||||
modelId: "gpt-5.4-codex",
|
||||
model: createCodexTestModel("codex"),
|
||||
thinkLevel: "medium",
|
||||
disableTools: true,
|
||||
timeoutMs: 5_000,
|
||||
authStorage: {} as never,
|
||||
authProfileStore: { version: 1, profiles: {} },
|
||||
modelRegistry: {} as never,
|
||||
} as EmbeddedRunAttemptParams;
|
||||
}
|
||||
|
||||
const pluginConfig: CodexPluginConfig = {
|
||||
appServer: { command: "codex" },
|
||||
};
|
||||
|
||||
const bundleMcpThreadConfig = {
|
||||
configPatch: undefined,
|
||||
diagnostics: [],
|
||||
evaluated: false,
|
||||
fingerprint: undefined,
|
||||
} satisfies CodexBundleMcpThreadConfig;
|
||||
|
||||
function readHarnessMessages(writes: string[]): Array<{ id?: number; method?: string }> {
|
||||
return writes.map((write) => JSON.parse(write) as { id?: number; method?: string });
|
||||
}
|
||||
|
||||
function startThreadWithHarness(
|
||||
startupTimeoutMs: number,
|
||||
signal = new AbortController().signal,
|
||||
overrides?: { pluginConfig?: CodexPluginConfig },
|
||||
) {
|
||||
const harness = createClientHarness();
|
||||
vi.spyOn(CodexAppServerClient, "start").mockReturnValue(harness.client);
|
||||
const effectivePluginConfig = overrides?.pluginConfig ?? pluginConfig;
|
||||
|
||||
const run = startCodexAttemptThread({
|
||||
attemptClientFactory: defaultLeasedCodexAppServerClientFactory,
|
||||
appServer: resolveCodexAppServerRuntimeOptions({ pluginConfig: effectivePluginConfig }),
|
||||
pluginConfig: effectivePluginConfig,
|
||||
computerUseConfig: effectivePluginConfig.computerUse ?? { enabled: false },
|
||||
startupAuthProfileId: undefined,
|
||||
startupAuthAccountCacheKey: undefined,
|
||||
startupEnvApiKeyCacheKey: undefined,
|
||||
agentDir: "/tmp/agent",
|
||||
config: undefined,
|
||||
buildAttemptParams: createAttemptParams,
|
||||
sessionAgentId: "agent-1",
|
||||
effectiveWorkspace: "/tmp",
|
||||
dynamicTools: [],
|
||||
developerInstructions: undefined,
|
||||
finalConfigPatch: undefined,
|
||||
bundleMcpThreadConfig,
|
||||
nativeToolSurfaceEnabled: true,
|
||||
sandboxExecServerEnabled: false,
|
||||
sandbox: null,
|
||||
contextEngineProjection: undefined,
|
||||
startupTimeoutMs,
|
||||
signal,
|
||||
onStartupTimeout: vi.fn(),
|
||||
spawnedBy: undefined,
|
||||
});
|
||||
|
||||
return { harness, run };
|
||||
}
|
||||
|
||||
async function answerInitialize(harness: ClientHarness): Promise<void> {
|
||||
await vi.waitFor(() => expect(harness.writes.length).toBeGreaterThanOrEqual(1), {
|
||||
interval: 1,
|
||||
timeout: 5_000,
|
||||
});
|
||||
const initialize = JSON.parse(harness.writes[0] ?? "{}") as { id?: number };
|
||||
harness.send({ id: initialize.id, result: { userAgent: "openclaw/0.125.0 (macOS; test)" } });
|
||||
}
|
||||
|
||||
async function waitForRequest(
|
||||
harness: ClientHarness,
|
||||
method: string,
|
||||
): Promise<{ id?: number; method?: string }> {
|
||||
await vi.waitFor(
|
||||
() =>
|
||||
expect(readHarnessMessages(harness.writes).some((write) => write.method === method)).toBe(
|
||||
true,
|
||||
),
|
||||
{ interval: 1, timeout: 5_000 },
|
||||
);
|
||||
const request = readHarnessMessages(harness.writes).find((write) => write.method === method);
|
||||
if (!request) {
|
||||
throw new Error(`${method} request was not written`);
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
async function waitForThreadStart(harness: ClientHarness): Promise<{ id?: number }> {
|
||||
return waitForRequest(harness, "thread/start");
|
||||
}
|
||||
|
||||
describe("startCodexAttemptThread", () => {
|
||||
beforeEach(() => {
|
||||
vi.stubEnv("CODEX_API_KEY", "");
|
||||
vi.stubEnv("OPENAI_API_KEY", "");
|
||||
clearSharedCodexAppServerClient();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearSharedCodexAppServerClient();
|
||||
vi.restoreAllMocks();
|
||||
vi.unstubAllEnvs();
|
||||
});
|
||||
|
||||
it("keeps the shared app-server when thread startup fails with an app error", async () => {
|
||||
const { harness, run } = startThreadWithHarness(5_000);
|
||||
await answerInitialize(harness);
|
||||
const threadStart = await waitForThreadStart(harness);
|
||||
harness.send({
|
||||
id: threadStart.id,
|
||||
error: { code: -32000, message: "401 authentication_error: Invalid bearer token" },
|
||||
});
|
||||
|
||||
await expect(run).rejects.toThrow("Invalid bearer token");
|
||||
expect(harness.process.stdin.destroyed).toBe(false);
|
||||
|
||||
clearSharedCodexAppServerClient();
|
||||
expect(harness.process.stdin.destroyed).toBe(true);
|
||||
});
|
||||
|
||||
it("clears the shared app-server when startup abandons an in-flight thread request", async () => {
|
||||
const { harness, run } = startThreadWithHarness(2_000);
|
||||
const runError = run.then(
|
||||
() => undefined,
|
||||
(error: unknown) => error,
|
||||
);
|
||||
await answerInitialize(harness);
|
||||
await waitForThreadStart(harness);
|
||||
|
||||
const error = await runError;
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect((error as Error).message).toBe("codex app-server startup timed out");
|
||||
expect(harness.process.stdin.destroyed).toBe(true);
|
||||
});
|
||||
|
||||
it("clears the shared app-server when cancellation abandons an in-flight thread request", async () => {
|
||||
const abortController = new AbortController();
|
||||
const { harness, run } = startThreadWithHarness(5_000, abortController.signal);
|
||||
const runError = run.then(
|
||||
() => undefined,
|
||||
(error: unknown) => error,
|
||||
);
|
||||
await answerInitialize(harness);
|
||||
await waitForThreadStart(harness);
|
||||
|
||||
abortController.abort();
|
||||
|
||||
const error = await runError;
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect((error as Error).message).toBe("codex app-server startup aborted");
|
||||
expect(harness.process.stdin.destroyed).toBe(true);
|
||||
});
|
||||
|
||||
it("clears the shared app-server when a startup RPC times out", async () => {
|
||||
const perRpcTimeoutPluginConfig = {
|
||||
...pluginConfig,
|
||||
appServer: { command: "codex", requestTimeoutMs: 100 },
|
||||
computerUse: { enabled: true, marketplaceDiscoveryTimeoutMs: 1 },
|
||||
} satisfies CodexPluginConfig;
|
||||
const { harness, run } = startThreadWithHarness(5_000, new AbortController().signal, {
|
||||
pluginConfig: perRpcTimeoutPluginConfig,
|
||||
});
|
||||
const runError = run.then(
|
||||
() => undefined,
|
||||
(error: unknown) => error,
|
||||
);
|
||||
await answerInitialize(harness);
|
||||
await waitForRequest(harness, "plugin/list");
|
||||
|
||||
const error = await runError;
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect((error as Error).message).toBe("plugin/list timed out");
|
||||
expect(harness.process.stdin.destroyed).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -49,7 +49,6 @@ import {
|
||||
} from "./shared-client.js";
|
||||
import {
|
||||
startOrResumeThread,
|
||||
isCodexThreadStartRequestError,
|
||||
type CodexAppServerThreadLifecycleBinding,
|
||||
type CodexContextEngineThreadBootstrapProjection,
|
||||
} from "./thread-lifecycle.js";
|
||||
@@ -100,7 +99,7 @@ export async function startCodexAttemptThread(params: {
|
||||
}): Promise<StartCodexAttemptThreadResult> {
|
||||
let pluginAppServer = params.appServer;
|
||||
let releaseSharedClientLease: (() => void) | undefined;
|
||||
let startupClientForCleanup: CodexAppServerClient | undefined;
|
||||
let startupClientForAbandonedRequestCleanup: CodexAppServerClient | undefined;
|
||||
let releaseStartupResourcesOnTimeout: (() => Promise<void>) | undefined;
|
||||
try {
|
||||
const startupResult = await withCodexStartupTimeout({
|
||||
@@ -185,7 +184,7 @@ export async function startCodexAttemptThread(params: {
|
||||
};
|
||||
releaseSharedClientLease = startupClientLease;
|
||||
attemptedClient = startupClient;
|
||||
startupClientForCleanup = startupClient;
|
||||
startupClientForAbandonedRequestCleanup = startupClient;
|
||||
await ensureCodexComputerUse({
|
||||
client: startupClient,
|
||||
pluginConfig: params.pluginConfig,
|
||||
@@ -334,8 +333,8 @@ export async function startCodexAttemptThread(params: {
|
||||
}
|
||||
const failedClient = attemptedClient;
|
||||
const clearedSharedClient = clearSharedCodexAppServerClientIfCurrent(failedClient);
|
||||
if (startupClientForCleanup === failedClient) {
|
||||
startupClientForCleanup = undefined;
|
||||
if (startupClientForAbandonedRequestCleanup === failedClient) {
|
||||
startupClientForAbandonedRequestCleanup = undefined;
|
||||
}
|
||||
attemptedClient = undefined;
|
||||
if (attempt >= CODEX_APP_SERVER_STARTUP_CONNECTION_CLOSE_MAX_ATTEMPTS) {
|
||||
@@ -365,7 +364,7 @@ export async function startCodexAttemptThread(params: {
|
||||
throw new Error("codex app-server startup retry loop exited unexpectedly");
|
||||
},
|
||||
});
|
||||
startupClientForCleanup = undefined;
|
||||
startupClientForAbandonedRequestCleanup = undefined;
|
||||
if (!releaseSharedClientLease) {
|
||||
throw new Error("codex app-server startup succeeded without a shared client lease");
|
||||
}
|
||||
@@ -375,12 +374,18 @@ export async function startCodexAttemptThread(params: {
|
||||
releaseSharedClientLease,
|
||||
};
|
||||
} catch (error) {
|
||||
const transportPoisoned = isCodexAppServerConnectionClosedError(error);
|
||||
const preserveSharedClientForSpawnedThreadStartError =
|
||||
Boolean(params.spawnedBy?.trim()) && isCodexThreadStartRequestError(error);
|
||||
if (transportPoisoned || !preserveSharedClientForSpawnedThreadStartError) {
|
||||
clearSharedCodexAppServerClientIfCurrent(startupClientForCleanup);
|
||||
if (params.signal.aborted || shouldClearSharedClientAfterStartupRace(error)) {
|
||||
clearSharedCodexAppServerClientIfCurrent(startupClientForAbandonedRequestCleanup);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
function shouldClearSharedClientAfterStartupRace(error: unknown): boolean {
|
||||
return (
|
||||
error instanceof Error &&
|
||||
(error.message === "codex app-server startup timed out" ||
|
||||
error.message === "codex app-server startup aborted" ||
|
||||
error.message.endsWith(" timed out"))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -834,6 +834,7 @@ describe("config plugin validation", () => {
|
||||
it("uses persisted installed-plugin records as stale channel evidence", async () => {
|
||||
const installedPluginIndexPath = path.join(suiteHome, ".openclaw", "plugins", "installs.json");
|
||||
await mkdirSafe(path.dirname(installedPluginIndexPath));
|
||||
clearLoadInstalledPluginIndexInstallRecordsCache();
|
||||
await fs.writeFile(
|
||||
installedPluginIndexPath,
|
||||
JSON.stringify(
|
||||
@@ -872,6 +873,7 @@ describe("config plugin validation", () => {
|
||||
});
|
||||
} finally {
|
||||
await fs.rm(installedPluginIndexPath, { force: true });
|
||||
clearLoadInstalledPluginIndexInstallRecordsCache();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user