From a373468d825224c92051dc2e50b717fbe75c401c Mon Sep 17 00:00:00 2001 From: Kelaw - Keshav's Agent Date: Mon, 4 May 2026 02:27:49 +0530 Subject: [PATCH] fix: recover missing Codex bound threads --- CHANGELOG.md | 1 + .../codex/src/conversation-binding.test.ts | 145 +++++++++++++++++- extensions/codex/src/conversation-binding.ts | 98 ++++++++++-- 3 files changed, 225 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 618574cf045..aba4a24adbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -219,6 +219,7 @@ Docs: https://docs.openclaw.ai - Google Meet: make Twilio setup status require an enabled `voice-call` plugin entry instead of treating a missing entry as ready. Thanks @vincentkoc. - Telegram: render shared interactive reply buttons in reply delivery so plugin approval messages show inline keyboards. (#76238) Thanks @keshavbotagent. - Cron/sessions: keep cron metadata rows without an on-disk transcript non-resumable until a transcript exists, so doctor and `sessions cleanup --fix-missing` no longer report or prune pre-transcript cron rows as broken sessions. Refs #77011. +- OpenAI Codex: recreate missing bound app-server threads once when a stale `/codex bind` sidecar survives a restart, preserving the selected auth profile and turn overrides before retrying the inbound turn. (#76936) Thanks @keshavbotagent. - Agents/cli-runner: drop a saved `claude-cli` resume sessionId at preparation time when its on-disk transcript no longer exists in `~/.claude/projects/`, so a stale binding from a half-installed `update.run` cannot trap follow-up runs (auto-reply / Telegram direct) in a `claude --resume` timeout loop; the run starts fresh and the new sessionId is written back through the existing post-run flow. (#77030; refs #77011) Thanks @openperf. - Release validation: install the cross-OS TypeScript harness through Windows-safe Node/npm shims so native Windows package checks reach the OpenClaw smoke suites instead of exiting before artifact capture. Thanks @vincentkoc. - Release validation: let Windows packaged-upgrade checks continue after the shipped 2026.5.2 updater hits its native-module swap cleanup fallback, verifying the fallback-installed candidate through package metadata and downstream smoke instead of crashing on the immediate update-status probe. Thanks @vincentkoc. diff --git a/extensions/codex/src/conversation-binding.test.ts b/extensions/codex/src/conversation-binding.test.ts index 675d3463050..5ded64df0dd 100644 --- a/extensions/codex/src/conversation-binding.test.ts +++ b/extensions/codex/src/conversation-binding.test.ts @@ -48,7 +48,10 @@ describe("codex conversation binding", () => { }); beforeEach(() => { - agentRuntimeMocks.ensureAuthProfileStore.mockReturnValue({ version: 1, profiles: {} }); + agentRuntimeMocks.ensureAuthProfileStore.mockReturnValue({ + version: 1, + profiles: {}, + }); agentRuntimeMocks.resolveAuthProfileOrder.mockReturnValue([]); agentRuntimeMocks.resolveOpenClawAgentDir.mockReturnValue("/agent"); agentRuntimeMocks.resolveProviderIdForAuth.mockImplementation((provider: string) => provider); @@ -56,7 +59,9 @@ describe("codex conversation binding", () => { it("uses the default Codex auth profile and omits the public OpenAI provider for new binds", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); - const config = { auth: { order: { "openai-codex": ["openai-codex:default"] } } }; + const config = { + auth: { order: { "openai-codex": ["openai-codex:default"] } }, + }; const requests: Array<{ method: string; params: Record }> = []; agentRuntimeMocks.ensureAuthProfileStore.mockReturnValue({ version: 1, @@ -220,6 +225,142 @@ describe("codex conversation binding", () => { expect(result).toEqual({ handled: true }); }); + it("recreates a missing bound thread and preserves auth plus turn overrides", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + agentRuntimeMocks.ensureAuthProfileStore.mockReturnValue({ + version: 1, + profiles: { + work: { + type: "oauth", + provider: "openai-codex", + access: "access-token", + }, + }, + }); + await fs.writeFile( + `${sessionFile}.codex-app-server.json`, + JSON.stringify({ + schemaVersion: 1, + threadId: "thread-old", + cwd: tempDir, + authProfileId: "work", + model: "gpt-5.4-mini", + modelProvider: "openai", + approvalPolicy: "on-request", + sandbox: "workspace-write", + serviceTier: "fast", + }), + ); + const requests: Array<{ method: string; params: Record }> = []; + const notificationHandlers: Array<(notification: Record) => void> = []; + sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({ + request: vi.fn(async (method: string, requestParams: Record) => { + requests.push({ method, params: requestParams }); + if (method === "turn/start" && requestParams.threadId === "thread-old") { + throw new Error("thread not found: thread-old"); + } + if (method === "thread/start") { + return { + thread: { id: "thread-new", cwd: tempDir }, + model: "gpt-5.4-mini", + }; + } + if (method === "turn/start" && requestParams.threadId === "thread-new") { + setImmediate(() => { + for (const handler of notificationHandlers) { + handler({ + method: "turn/completed", + params: { + threadId: "thread-new", + turn: { + id: "turn-new", + status: "completed", + items: [ + { + id: "assistant-1", + type: "agentMessage", + text: "Recovered", + }, + ], + }, + }, + }); + } + }); + return { turn: { id: "turn-new" } }; + } + throw new Error(`unexpected method: ${method}`); + }), + addNotificationHandler: vi.fn((handler) => { + notificationHandlers.push(handler); + return () => undefined; + }), + addRequestHandler: vi.fn(() => () => undefined), + }); + + const result = await handleCodexConversationInboundClaim( + { + content: "hi again", + bodyForAgent: "hi again", + channel: "telegram", + isGroup: false, + commandAuthorized: true, + }, + { + channelId: "telegram", + pluginBinding: { + bindingId: "binding-1", + pluginId: "codex", + pluginRoot: tempDir, + channel: "telegram", + accountId: "default", + conversationId: "5185575566", + boundAt: Date.now(), + data: { + kind: "codex-app-server-session", + version: 1, + sessionFile, + workspaceDir: tempDir, + }, + }, + }, + { timeoutMs: 500 }, + ); + + expect(result).toEqual({ handled: true, reply: { text: "Recovered" } }); + expect(requests.map((request) => request.method)).toEqual([ + "turn/start", + "thread/start", + "turn/start", + ]); + expect(sharedClientMocks.getSharedCodexAppServerClient).toHaveBeenCalledWith( + expect.objectContaining({ authProfileId: "work" }), + ); + expect(requests[1]?.params).toMatchObject({ + model: "gpt-5.4-mini", + approvalPolicy: "on-request", + sandbox: "workspace-write", + serviceTier: "fast", + }); + expect(requests[1]?.params).not.toHaveProperty("modelProvider"); + expect(requests[2]?.params).toMatchObject({ + threadId: "thread-new", + approvalPolicy: "on-request", + serviceTier: "fast", + }); + const savedBinding = JSON.parse( + await fs.readFile(`${sessionFile}.codex-app-server.json`, "utf8"), + ); + expect(savedBinding).toMatchObject({ + threadId: "thread-new", + authProfileId: "work", + approvalPolicy: "on-request", + sandbox: "workspace-write", + serviceTier: "fast", + }); + expect(savedBinding).not.toHaveProperty("modelProvider"); + }); + it("returns a clean failure reply when app-server turn start rejects", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); await fs.writeFile( diff --git a/extensions/codex/src/conversation-binding.ts b/extensions/codex/src/conversation-binding.ts index c8919e8f1b0..f1ee5b4802f 100644 --- a/extensions/codex/src/conversation-binding.ts +++ b/extensions/codex/src/conversation-binding.ts @@ -10,8 +10,11 @@ import { CODEX_CONTROL_METHODS } from "./app-server/capabilities.js"; import { codexSandboxPolicyForTurn, resolveCodexAppServerRuntimeOptions, + type CodexAppServerApprovalPolicy, + type CodexAppServerSandboxMode, } from "./app-server/config.js"; import { + type CodexServiceTier, type CodexThreadResumeResponse, type CodexThreadStartResponse, type CodexTurnStartResponse, @@ -59,6 +62,9 @@ type CodexConversationStartParams = { model?: string; modelProvider?: string; authProfileId?: string; + approvalPolicy?: CodexAppServerApprovalPolicy; + sandbox?: CodexAppServerSandboxMode; + serviceTier?: CodexServiceTier; }; type BoundTurnResult = { @@ -100,6 +106,9 @@ export async function startCodexConversationThread( model: params.model, modelProvider: params.modelProvider, authProfileId, + approvalPolicy: params.approvalPolicy, + sandbox: params.sandbox, + serviceTier: params.serviceTier, config: params.config, }); } else { @@ -110,6 +119,9 @@ export async function startCodexConversationThread( model: params.model, modelProvider: params.modelProvider, authProfileId, + approvalPolicy: params.approvalPolicy, + sandbox: params.sandbox, + serviceTier: params.serviceTier, config: params.config, }); } @@ -137,7 +149,7 @@ export async function handleCodexConversationInboundClaim( } try { const result = await enqueueBoundTurn(data.sessionFile, () => - runBoundTurn({ + runBoundTurnWithMissingThreadRecovery({ data, prompt, event, @@ -177,9 +189,14 @@ async function attachExistingThread(params: { model?: string; modelProvider?: string; authProfileId?: string; + approvalPolicy?: CodexAppServerApprovalPolicy; + sandbox?: CodexAppServerSandboxMode; + serviceTier?: CodexServiceTier; config?: CodexAppServerAuthProfileLookup["config"]; }): Promise { - const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig }); + const runtime = resolveCodexAppServerRuntimeOptions({ + pluginConfig: params.pluginConfig, + }); const modelProvider = resolveThreadRequestModelProvider({ authProfileId: params.authProfileId, modelProvider: params.modelProvider, @@ -196,10 +213,12 @@ async function attachExistingThread(params: { threadId: params.threadId, ...(params.model ? { model: params.model } : {}), ...(modelProvider ? { modelProvider } : {}), - approvalPolicy: runtime.approvalPolicy, + approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, approvalsReviewer: runtime.approvalsReviewer, - sandbox: runtime.sandbox, - ...(runtime.serviceTier ? { serviceTier: runtime.serviceTier } : {}), + sandbox: params.sandbox ?? runtime.sandbox, + ...((params.serviceTier ?? runtime.serviceTier) + ? { serviceTier: params.serviceTier ?? runtime.serviceTier } + : {}), persistExtendedHistory: true, }, { timeoutMs: runtime.requestTimeoutMs }, @@ -217,9 +236,9 @@ async function attachExistingThread(params: { authProfileId: params.authProfileId, modelProvider: response.modelProvider ?? params.modelProvider, }), - approvalPolicy: runtime.approvalPolicy, - sandbox: runtime.sandbox, - serviceTier: runtime.serviceTier, + approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, + sandbox: params.sandbox ?? runtime.sandbox, + serviceTier: params.serviceTier ?? runtime.serviceTier, }, { config: params.config, @@ -234,9 +253,14 @@ async function createThread(params: { model?: string; modelProvider?: string; authProfileId?: string; + approvalPolicy?: CodexAppServerApprovalPolicy; + sandbox?: CodexAppServerSandboxMode; + serviceTier?: CodexServiceTier; config?: CodexAppServerAuthProfileLookup["config"]; }): Promise { - const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig }); + const runtime = resolveCodexAppServerRuntimeOptions({ + pluginConfig: params.pluginConfig, + }); const modelProvider = resolveThreadRequestModelProvider({ authProfileId: params.authProfileId, modelProvider: params.modelProvider, @@ -253,10 +277,12 @@ async function createThread(params: { cwd: params.workspaceDir, ...(params.model ? { model: params.model } : {}), ...(modelProvider ? { modelProvider } : {}), - approvalPolicy: runtime.approvalPolicy, + approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, approvalsReviewer: runtime.approvalsReviewer, - sandbox: runtime.sandbox, - ...(runtime.serviceTier ? { serviceTier: runtime.serviceTier } : {}), + sandbox: params.sandbox ?? runtime.sandbox, + ...((params.serviceTier ?? runtime.serviceTier) + ? { serviceTier: params.serviceTier ?? runtime.serviceTier } + : {}), developerInstructions: "This Codex thread is bound to an OpenClaw conversation. Answer normally; OpenClaw will deliver your final response back to the conversation.", experimentalRawEvents: true, @@ -276,9 +302,9 @@ async function createThread(params: { authProfileId: params.authProfileId, modelProvider: response.modelProvider ?? params.modelProvider, }), - approvalPolicy: runtime.approvalPolicy, - sandbox: runtime.sandbox, - serviceTier: runtime.serviceTier, + approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy, + sandbox: params.sandbox ?? runtime.sandbox, + serviceTier: params.serviceTier ?? runtime.serviceTier, }, { config: params.config, @@ -293,7 +319,9 @@ async function runBoundTurn(params: { pluginConfig?: unknown; timeoutMs?: number; }): Promise { - const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig }); + const runtime = resolveCodexAppServerRuntimeOptions({ + pluginConfig: params.pluginConfig, + }); const binding = await readCodexAppServerBinding(params.data.sessionFile); const threadId = binding?.threadId; if (!threadId) { @@ -350,7 +378,10 @@ async function runBoundTurn(params: { "turn/start", { threadId, - input: buildCodexConversationTurnInput({ prompt: params.prompt, event: params.event }), + input: buildCodexConversationTurnInput({ + prompt: params.prompt, + event: params.event, + }), cwd: binding.cwd || params.data.workspaceDir, approvalPolicy: binding.approvalPolicy ?? runtime.approvalPolicy, approvalsReviewer: runtime.approvalsReviewer, @@ -389,6 +420,39 @@ async function runBoundTurn(params: { } } +async function runBoundTurnWithMissingThreadRecovery(params: { + data: CodexConversationBindingData; + prompt: string; + event: PluginHookInboundClaimEvent; + pluginConfig?: unknown; + timeoutMs?: number; +}): Promise { + try { + return await runBoundTurn(params); + } catch (error) { + if (!isCodexThreadNotFoundError(error)) { + throw error; + } + const binding = await readCodexAppServerBinding(params.data.sessionFile); + await startCodexConversationThread({ + pluginConfig: params.pluginConfig, + sessionFile: params.data.sessionFile, + workspaceDir: binding?.cwd || params.data.workspaceDir, + model: binding?.model, + modelProvider: binding?.modelProvider, + authProfileId: binding?.authProfileId, + approvalPolicy: binding?.approvalPolicy, + sandbox: binding?.sandbox, + serviceTier: binding?.serviceTier, + }); + return await runBoundTurn(params); + } +} + +function isCodexThreadNotFoundError(error: unknown): boolean { + return /\bthread not found:/iu.test(formatErrorMessage(error)); +} + function enqueueBoundTurn(key: string, run: () => Promise): Promise { const state = getGlobalState(); const previous = state.queues.get(key) ?? Promise.resolve();