fix: recover missing Codex bound threads

This commit is contained in:
Kelaw - Keshav's Agent
2026-05-04 02:27:49 +05:30
committed by Peter Steinberger
parent 761e668acf
commit a373468d82
3 changed files with 225 additions and 19 deletions

View File

@@ -219,6 +219,7 @@ Docs: https://docs.openclaw.ai
- Google Meet: make Twilio setup status require an enabled `voice-call` plugin entry instead of treating a missing entry as ready. Thanks @vincentkoc.
- Telegram: render shared interactive reply buttons in reply delivery so plugin approval messages show inline keyboards. (#76238) Thanks @keshavbotagent.
- Cron/sessions: keep cron metadata rows without an on-disk transcript non-resumable until a transcript exists, so doctor and `sessions cleanup --fix-missing` no longer report or prune pre-transcript cron rows as broken sessions. Refs #77011.
- OpenAI Codex: recreate missing bound app-server threads once when a stale `/codex bind` sidecar survives a restart, preserving the selected auth profile and turn overrides before retrying the inbound turn. (#76936) Thanks @keshavbotagent.
- Agents/cli-runner: drop a saved `claude-cli` resume sessionId at preparation time when its on-disk transcript no longer exists in `~/.claude/projects/`, so a stale binding from a half-installed `update.run` cannot trap follow-up runs (auto-reply / Telegram direct) in a `claude --resume` timeout loop; the run starts fresh and the new sessionId is written back through the existing post-run flow. (#77030; refs #77011) Thanks @openperf.
- Release validation: install the cross-OS TypeScript harness through Windows-safe Node/npm shims so native Windows package checks reach the OpenClaw smoke suites instead of exiting before artifact capture. Thanks @vincentkoc.
- Release validation: let Windows packaged-upgrade checks continue after the shipped 2026.5.2 updater hits its native-module swap cleanup fallback, verifying the fallback-installed candidate through package metadata and downstream smoke instead of crashing on the immediate update-status probe. Thanks @vincentkoc.

View File

@@ -48,7 +48,10 @@ describe("codex conversation binding", () => {
});
beforeEach(() => {
agentRuntimeMocks.ensureAuthProfileStore.mockReturnValue({ version: 1, profiles: {} });
agentRuntimeMocks.ensureAuthProfileStore.mockReturnValue({
version: 1,
profiles: {},
});
agentRuntimeMocks.resolveAuthProfileOrder.mockReturnValue([]);
agentRuntimeMocks.resolveOpenClawAgentDir.mockReturnValue("/agent");
agentRuntimeMocks.resolveProviderIdForAuth.mockImplementation((provider: string) => provider);
@@ -56,7 +59,9 @@ describe("codex conversation binding", () => {
it("uses the default Codex auth profile and omits the public OpenAI provider for new binds", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const config = { auth: { order: { "openai-codex": ["openai-codex:default"] } } };
const config = {
auth: { order: { "openai-codex": ["openai-codex:default"] } },
};
const requests: Array<{ method: string; params: Record<string, unknown> }> = [];
agentRuntimeMocks.ensureAuthProfileStore.mockReturnValue({
version: 1,
@@ -220,6 +225,142 @@ describe("codex conversation binding", () => {
expect(result).toEqual({ handled: true });
});
it("recreates a missing bound thread and preserves auth plus turn overrides", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
agentRuntimeMocks.ensureAuthProfileStore.mockReturnValue({
version: 1,
profiles: {
work: {
type: "oauth",
provider: "openai-codex",
access: "access-token",
},
},
});
await fs.writeFile(
`${sessionFile}.codex-app-server.json`,
JSON.stringify({
schemaVersion: 1,
threadId: "thread-old",
cwd: tempDir,
authProfileId: "work",
model: "gpt-5.4-mini",
modelProvider: "openai",
approvalPolicy: "on-request",
sandbox: "workspace-write",
serviceTier: "fast",
}),
);
const requests: Array<{ method: string; params: Record<string, unknown> }> = [];
const notificationHandlers: Array<(notification: Record<string, unknown>) => void> = [];
sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({
request: vi.fn(async (method: string, requestParams: Record<string, unknown>) => {
requests.push({ method, params: requestParams });
if (method === "turn/start" && requestParams.threadId === "thread-old") {
throw new Error("thread not found: thread-old");
}
if (method === "thread/start") {
return {
thread: { id: "thread-new", cwd: tempDir },
model: "gpt-5.4-mini",
};
}
if (method === "turn/start" && requestParams.threadId === "thread-new") {
setImmediate(() => {
for (const handler of notificationHandlers) {
handler({
method: "turn/completed",
params: {
threadId: "thread-new",
turn: {
id: "turn-new",
status: "completed",
items: [
{
id: "assistant-1",
type: "agentMessage",
text: "Recovered",
},
],
},
},
});
}
});
return { turn: { id: "turn-new" } };
}
throw new Error(`unexpected method: ${method}`);
}),
addNotificationHandler: vi.fn((handler) => {
notificationHandlers.push(handler);
return () => undefined;
}),
addRequestHandler: vi.fn(() => () => undefined),
});
const result = await handleCodexConversationInboundClaim(
{
content: "hi again",
bodyForAgent: "hi again",
channel: "telegram",
isGroup: false,
commandAuthorized: true,
},
{
channelId: "telegram",
pluginBinding: {
bindingId: "binding-1",
pluginId: "codex",
pluginRoot: tempDir,
channel: "telegram",
accountId: "default",
conversationId: "5185575566",
boundAt: Date.now(),
data: {
kind: "codex-app-server-session",
version: 1,
sessionFile,
workspaceDir: tempDir,
},
},
},
{ timeoutMs: 500 },
);
expect(result).toEqual({ handled: true, reply: { text: "Recovered" } });
expect(requests.map((request) => request.method)).toEqual([
"turn/start",
"thread/start",
"turn/start",
]);
expect(sharedClientMocks.getSharedCodexAppServerClient).toHaveBeenCalledWith(
expect.objectContaining({ authProfileId: "work" }),
);
expect(requests[1]?.params).toMatchObject({
model: "gpt-5.4-mini",
approvalPolicy: "on-request",
sandbox: "workspace-write",
serviceTier: "fast",
});
expect(requests[1]?.params).not.toHaveProperty("modelProvider");
expect(requests[2]?.params).toMatchObject({
threadId: "thread-new",
approvalPolicy: "on-request",
serviceTier: "fast",
});
const savedBinding = JSON.parse(
await fs.readFile(`${sessionFile}.codex-app-server.json`, "utf8"),
);
expect(savedBinding).toMatchObject({
threadId: "thread-new",
authProfileId: "work",
approvalPolicy: "on-request",
sandbox: "workspace-write",
serviceTier: "fast",
});
expect(savedBinding).not.toHaveProperty("modelProvider");
});
it("returns a clean failure reply when app-server turn start rejects", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
await fs.writeFile(

View File

@@ -10,8 +10,11 @@ import { CODEX_CONTROL_METHODS } from "./app-server/capabilities.js";
import {
codexSandboxPolicyForTurn,
resolveCodexAppServerRuntimeOptions,
type CodexAppServerApprovalPolicy,
type CodexAppServerSandboxMode,
} from "./app-server/config.js";
import {
type CodexServiceTier,
type CodexThreadResumeResponse,
type CodexThreadStartResponse,
type CodexTurnStartResponse,
@@ -59,6 +62,9 @@ type CodexConversationStartParams = {
model?: string;
modelProvider?: string;
authProfileId?: string;
approvalPolicy?: CodexAppServerApprovalPolicy;
sandbox?: CodexAppServerSandboxMode;
serviceTier?: CodexServiceTier;
};
type BoundTurnResult = {
@@ -100,6 +106,9 @@ export async function startCodexConversationThread(
model: params.model,
modelProvider: params.modelProvider,
authProfileId,
approvalPolicy: params.approvalPolicy,
sandbox: params.sandbox,
serviceTier: params.serviceTier,
config: params.config,
});
} else {
@@ -110,6 +119,9 @@ export async function startCodexConversationThread(
model: params.model,
modelProvider: params.modelProvider,
authProfileId,
approvalPolicy: params.approvalPolicy,
sandbox: params.sandbox,
serviceTier: params.serviceTier,
config: params.config,
});
}
@@ -137,7 +149,7 @@ export async function handleCodexConversationInboundClaim(
}
try {
const result = await enqueueBoundTurn(data.sessionFile, () =>
runBoundTurn({
runBoundTurnWithMissingThreadRecovery({
data,
prompt,
event,
@@ -177,9 +189,14 @@ async function attachExistingThread(params: {
model?: string;
modelProvider?: string;
authProfileId?: string;
approvalPolicy?: CodexAppServerApprovalPolicy;
sandbox?: CodexAppServerSandboxMode;
serviceTier?: CodexServiceTier;
config?: CodexAppServerAuthProfileLookup["config"];
}): Promise<void> {
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig });
const runtime = resolveCodexAppServerRuntimeOptions({
pluginConfig: params.pluginConfig,
});
const modelProvider = resolveThreadRequestModelProvider({
authProfileId: params.authProfileId,
modelProvider: params.modelProvider,
@@ -196,10 +213,12 @@ async function attachExistingThread(params: {
threadId: params.threadId,
...(params.model ? { model: params.model } : {}),
...(modelProvider ? { modelProvider } : {}),
approvalPolicy: runtime.approvalPolicy,
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
approvalsReviewer: runtime.approvalsReviewer,
sandbox: runtime.sandbox,
...(runtime.serviceTier ? { serviceTier: runtime.serviceTier } : {}),
sandbox: params.sandbox ?? runtime.sandbox,
...((params.serviceTier ?? runtime.serviceTier)
? { serviceTier: params.serviceTier ?? runtime.serviceTier }
: {}),
persistExtendedHistory: true,
},
{ timeoutMs: runtime.requestTimeoutMs },
@@ -217,9 +236,9 @@ async function attachExistingThread(params: {
authProfileId: params.authProfileId,
modelProvider: response.modelProvider ?? params.modelProvider,
}),
approvalPolicy: runtime.approvalPolicy,
sandbox: runtime.sandbox,
serviceTier: runtime.serviceTier,
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
sandbox: params.sandbox ?? runtime.sandbox,
serviceTier: params.serviceTier ?? runtime.serviceTier,
},
{
config: params.config,
@@ -234,9 +253,14 @@ async function createThread(params: {
model?: string;
modelProvider?: string;
authProfileId?: string;
approvalPolicy?: CodexAppServerApprovalPolicy;
sandbox?: CodexAppServerSandboxMode;
serviceTier?: CodexServiceTier;
config?: CodexAppServerAuthProfileLookup["config"];
}): Promise<void> {
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig });
const runtime = resolveCodexAppServerRuntimeOptions({
pluginConfig: params.pluginConfig,
});
const modelProvider = resolveThreadRequestModelProvider({
authProfileId: params.authProfileId,
modelProvider: params.modelProvider,
@@ -253,10 +277,12 @@ async function createThread(params: {
cwd: params.workspaceDir,
...(params.model ? { model: params.model } : {}),
...(modelProvider ? { modelProvider } : {}),
approvalPolicy: runtime.approvalPolicy,
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
approvalsReviewer: runtime.approvalsReviewer,
sandbox: runtime.sandbox,
...(runtime.serviceTier ? { serviceTier: runtime.serviceTier } : {}),
sandbox: params.sandbox ?? runtime.sandbox,
...((params.serviceTier ?? runtime.serviceTier)
? { serviceTier: params.serviceTier ?? runtime.serviceTier }
: {}),
developerInstructions:
"This Codex thread is bound to an OpenClaw conversation. Answer normally; OpenClaw will deliver your final response back to the conversation.",
experimentalRawEvents: true,
@@ -276,9 +302,9 @@ async function createThread(params: {
authProfileId: params.authProfileId,
modelProvider: response.modelProvider ?? params.modelProvider,
}),
approvalPolicy: runtime.approvalPolicy,
sandbox: runtime.sandbox,
serviceTier: runtime.serviceTier,
approvalPolicy: params.approvalPolicy ?? runtime.approvalPolicy,
sandbox: params.sandbox ?? runtime.sandbox,
serviceTier: params.serviceTier ?? runtime.serviceTier,
},
{
config: params.config,
@@ -293,7 +319,9 @@ async function runBoundTurn(params: {
pluginConfig?: unknown;
timeoutMs?: number;
}): Promise<BoundTurnResult> {
const runtime = resolveCodexAppServerRuntimeOptions({ pluginConfig: params.pluginConfig });
const runtime = resolveCodexAppServerRuntimeOptions({
pluginConfig: params.pluginConfig,
});
const binding = await readCodexAppServerBinding(params.data.sessionFile);
const threadId = binding?.threadId;
if (!threadId) {
@@ -350,7 +378,10 @@ async function runBoundTurn(params: {
"turn/start",
{
threadId,
input: buildCodexConversationTurnInput({ prompt: params.prompt, event: params.event }),
input: buildCodexConversationTurnInput({
prompt: params.prompt,
event: params.event,
}),
cwd: binding.cwd || params.data.workspaceDir,
approvalPolicy: binding.approvalPolicy ?? runtime.approvalPolicy,
approvalsReviewer: runtime.approvalsReviewer,
@@ -389,6 +420,39 @@ async function runBoundTurn(params: {
}
}
async function runBoundTurnWithMissingThreadRecovery(params: {
data: CodexConversationBindingData;
prompt: string;
event: PluginHookInboundClaimEvent;
pluginConfig?: unknown;
timeoutMs?: number;
}): Promise<BoundTurnResult> {
try {
return await runBoundTurn(params);
} catch (error) {
if (!isCodexThreadNotFoundError(error)) {
throw error;
}
const binding = await readCodexAppServerBinding(params.data.sessionFile);
await startCodexConversationThread({
pluginConfig: params.pluginConfig,
sessionFile: params.data.sessionFile,
workspaceDir: binding?.cwd || params.data.workspaceDir,
model: binding?.model,
modelProvider: binding?.modelProvider,
authProfileId: binding?.authProfileId,
approvalPolicy: binding?.approvalPolicy,
sandbox: binding?.sandbox,
serviceTier: binding?.serviceTier,
});
return await runBoundTurn(params);
}
}
function isCodexThreadNotFoundError(error: unknown): boolean {
return /\bthread not found:/iu.test(formatErrorMessage(error));
}
function enqueueBoundTurn<T>(key: string, run: () => Promise<T>): Promise<T> {
const state = getGlobalState();
const previous = state.queues.get(key) ?? Promise.resolve();