diff --git a/CHANGELOG.md b/CHANGELOG.md index c5a916fcec2..f14d18534f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai - Telegram/exec approvals: route topic-aware exec approval followups through Telegram-owned threading and approval-target parsing, so forum-topic approvals stay in the originating topic instead of falling back to the root chat. (#58783) - Agents/failover: unify structured and raw provider error classification so provider-specific `400`/`422` payloads no longer get forced into generic format failures before retry, billing, or compaction logic can inspect them. (#58856) Thanks @aaron-he-zhu. - Auth profiles/store: coerce misplaced SecretRef objects out of plaintext `key` and `token` fields during store load so agents without ACP runtime stop crashing on `.trim()` after upgrade. (#58923) Thanks @openperf. +- ACPX/runtime: repair `queue owner unavailable` session recovery by replacing dead named sessions and resuming the backend session when ACPX exposes a stable session id, so the first ACP prompt no longer inherits a dead handle. (#58669) Thanks @neeravmakwana ## 2026.3.31 diff --git a/extensions/acpx/src/runtime.test.ts b/extensions/acpx/src/runtime.test.ts index 0094f121805..c647a8cfa95 100644 --- a/extensions/acpx/src/runtime.test.ts +++ b/extensions/acpx/src/runtime.test.ts @@ -47,6 +47,7 @@ async function expectSessionEnsureFallback(params: { env?: Record; expectNewAfterStatus: boolean; expectedRecordId?: string; + expectedResumeSessionId?: string | null; }) { const previousEnv = new Map(); for (const [key, value] of Object.entries(params.env ?? {})) { @@ -70,13 +71,26 @@ async function expectSessionEnsureFallback(params: { const logs = await readMockRuntimeLogEntries(logPath); const ensureIndex = logs.findIndex((entry) => entry.kind === "ensure"); const statusIndex = logs.findIndex((entry) => entry.kind === "status"); - const newIndex = logs.findIndex((entry) => entry.kind === "new"); + const newEntries = logs.filter((entry) => entry.kind === "new"); + const newEntry = newEntries[0] ?? null; + const newIndex = newEntry ? logs.indexOf(newEntry) : -1; expect(ensureIndex).toBeGreaterThanOrEqual(0); expect(statusIndex).toBeGreaterThan(ensureIndex); if (params.expectNewAfterStatus) { + expect(newEntries).toHaveLength(1); expect(newIndex).toBeGreaterThan(statusIndex); } else { - expect(newIndex).toBe(-1); + expect(newEntries).toHaveLength(0); + } + const newArgs = ((newEntry?.args as string[]) ?? []).slice(); + const resumeFlagIndex = newArgs.indexOf("--resume-session"); + if (params.expectedResumeSessionId === undefined) { + // No assertion requested for resume behavior. + } else if (params.expectedResumeSessionId === null) { + expect(resumeFlagIndex).toBe(-1); + } else { + expect(resumeFlagIndex).toBeGreaterThanOrEqual(0); + expect(newArgs[resumeFlagIndex + 1]).toBe(params.expectedResumeSessionId); } } finally { for (const [key, value] of previousEnv.entries()) { @@ -241,14 +255,15 @@ describe("AcpxRuntime", () => { expect(resumeArgs[resumeFlagIndex + 1]).toBe(resumeSessionId); }); - it("retains dead named sessions when status only reports queue owner unavailable", async () => { + it("repairs dead named sessions when status only reports queue owner unavailable", async () => { await expectSessionEnsureFallback({ sessionKey: "agent:codex:acp:dead-session", env: { MOCK_ACPX_STATUS_STATUS: "dead", MOCK_ACPX_STATUS_SUMMARY: "queue owner unavailable", }, - expectNewAfterStatus: false, + expectNewAfterStatus: true, + expectedResumeSessionId: "sid-agent:codex:acp:dead-session", }); }); @@ -275,7 +290,7 @@ describe("AcpxRuntime", () => { }); }); - it("retains the named session after ensure failure when status only reports queue owner unavailable", async () => { + it("repairs the named session after ensure failure when status only reports queue owner unavailable", async () => { await expectSessionEnsureFallback({ sessionKey: "agent:codex:acp:ensure-fallback-dead", env: { @@ -283,8 +298,22 @@ describe("AcpxRuntime", () => { MOCK_ACPX_STATUS_STATUS: "dead", MOCK_ACPX_STATUS_SUMMARY: "queue owner unavailable", }, - expectNewAfterStatus: false, + expectNewAfterStatus: true, expectedRecordId: "rec-agent:codex:acp:ensure-fallback-dead", + expectedResumeSessionId: "sid-agent:codex:acp:ensure-fallback-dead", + }); + }); + + it("falls back to a fresh named session when queue owner recovery has no resumable id", async () => { + await expectSessionEnsureFallback({ + sessionKey: "agent:codex:acp:dead-session-no-ids", + env: { + MOCK_ACPX_STATUS_STATUS: "dead", + MOCK_ACPX_STATUS_SUMMARY: "queue owner unavailable", + MOCK_ACPX_STATUS_NO_IDS: "1", + }, + expectNewAfterStatus: true, + expectedResumeSessionId: null, }); }); diff --git a/extensions/acpx/src/runtime.ts b/extensions/acpx/src/runtime.ts index 1df6089e468..53bf85eb1a9 100644 --- a/extensions/acpx/src/runtime.ts +++ b/extensions/acpx/src/runtime.ts @@ -74,6 +74,11 @@ type AcpxHealthCheckResult = }; }; +type EnsureFailureRecoveryResult = { + events: AcpxJsonObject[]; + skipPostEnsureReplacement: boolean; +}; + function formatPermissionModeGuidance(): string { return "Configure plugins.entries.acpx.config.permissionMode to one of: approve-reads, approve-all, deny-all."; } @@ -129,6 +134,10 @@ function shouldRetainNamedSessionForDeadStatus(detail: AcpxJsonObject | undefine return summary?.includes("queue owner unavailable") ?? false; } +function resolveResumeSessionIdFromDetail(detail: AcpxJsonObject | undefined): string | undefined { + return asOptionalString(detail?.acpxSessionId) ?? asOptionalString(detail?.agentSessionId); +} + function formatAcpxControlErrorMessage(params: { code?: string; message: string; @@ -350,11 +359,32 @@ export class AcpxRuntime implements AcpRuntime { }); } + private async replaceDeadNamedSession(params: { + detail: AcpxJsonObject | undefined; + sessionName: string; + agent: string; + cwd: string; + logContext: string; + }): Promise { + const resumeSessionId = resolveResumeSessionIdFromDetail(params.detail); + this.logger?.warn?.( + resumeSessionId + ? `acpx ensureSession repairing dead named session by resuming backend session: session=${params.sessionName} cwd=${params.cwd} resumeSessionId=${resumeSessionId} ${params.logContext}` + : `acpx ensureSession repairing dead named session with fresh session owner: session=${params.sessionName} cwd=${params.cwd} ${params.logContext}`, + ); + return await this.createNamedSession({ + agent: params.agent, + cwd: params.cwd, + sessionName: params.sessionName, + ...(resumeSessionId ? { resumeSessionId } : {}), + }); + } + private async shouldReplaceEnsuredSession(params: { sessionName: string; agent: string; cwd: string; - }): Promise { + }): Promise<{ replace: boolean; replacementEvents?: AcpxJsonObject[] }> { const args = await this.buildVerbArgs({ agent: params.agent, cwd: params.cwd, @@ -372,7 +402,7 @@ export class AcpxRuntime implements AcpRuntime { this.logger?.warn?.( `acpx ensureSession status probe failed: session=${params.sessionName} cwd=${params.cwd} error=${summarizeLogText(error instanceof Error ? error.message : String(error)) || ""}`, ); - return false; + return { replace: false }; } const noSession = events.some((event) => toAcpxErrorEvent(event)?.code === "NO_SESSION"); @@ -380,7 +410,7 @@ export class AcpxRuntime implements AcpRuntime { this.logger?.warn?.( `acpx ensureSession replacing missing named session: session=${params.sessionName} cwd=${params.cwd}`, ); - return true; + return { replace: true }; } const detail = events.find((event) => !toAcpxErrorEvent(event)); @@ -388,18 +418,24 @@ export class AcpxRuntime implements AcpRuntime { if (status === "dead") { const summary = summarizeLogText(asOptionalString(detail?.summary) ?? ""); if (shouldRetainNamedSessionForDeadStatus(detail)) { - this.logger?.warn?.( - `acpx ensureSession retaining dead named session with recoverable status: session=${params.sessionName} cwd=${params.cwd} status=${status} summary=${summary || ""}`, - ); - return false; + return { + replace: true, + replacementEvents: await this.replaceDeadNamedSession({ + detail, + sessionName: params.sessionName, + agent: params.agent, + cwd: params.cwd, + logContext: `status=${status} summary=${summary || ""}`, + }), + }; } this.logger?.warn?.( `acpx ensureSession replacing dead named session: session=${params.sessionName} cwd=${params.cwd} status=${status} summary=${summary || ""}`, ); - return true; + return { replace: true }; } - return false; + return { replace: false }; } private async recoverEnsureFailure(params: { @@ -407,7 +443,7 @@ export class AcpxRuntime implements AcpRuntime { agent: string; cwd: string; error: unknown; - }): Promise { + }): Promise { const errorMessage = summarizeLogText( params.error instanceof Error ? params.error.message : String(params.error), ); @@ -439,11 +475,14 @@ export class AcpxRuntime implements AcpRuntime { this.logger?.warn?.( `acpx ensureSession creating named session after ensure failure and missing status: session=${params.sessionName} cwd=${params.cwd}`, ); - return await this.createNamedSession({ - agent: params.agent, - cwd: params.cwd, - sessionName: params.sessionName, - }); + return { + events: await this.createNamedSession({ + agent: params.agent, + cwd: params.cwd, + sessionName: params.sessionName, + }), + skipPostEnsureReplacement: true, + }; } const detail = events.find((event) => !toAcpxErrorEvent(event)); @@ -451,26 +490,38 @@ export class AcpxRuntime implements AcpRuntime { if (status === "dead") { const summary = summarizeLogText(asOptionalString(detail?.summary) ?? ""); if (shouldRetainNamedSessionForDeadStatus(detail)) { - this.logger?.warn?.( - `acpx ensureSession retaining dead named session after ensure failure with recoverable status: session=${params.sessionName} cwd=${params.cwd} status=${status} summary=${summary || ""}`, - ); - return events; + return { + events: await this.replaceDeadNamedSession({ + detail, + sessionName: params.sessionName, + agent: params.agent, + cwd: params.cwd, + logContext: `status=${status} summary=${summary || ""}`, + }), + skipPostEnsureReplacement: true, + }; } this.logger?.warn?.( `acpx ensureSession replacing dead named session after ensure failure: session=${params.sessionName} cwd=${params.cwd}`, ); - return await this.createNamedSession({ - agent: params.agent, - cwd: params.cwd, - sessionName: params.sessionName, - }); + return { + events: await this.createNamedSession({ + agent: params.agent, + cwd: params.cwd, + sessionName: params.sessionName, + }), + skipPostEnsureReplacement: true, + }; } if (status === "alive" || findSessionIdentifierEvent(events)) { this.logger?.warn?.( `acpx ensureSession reusing live named session after ensure failure: session=${params.sessionName} cwd=${params.cwd} status=${status || "unknown"}`, ); - return events; + return { + events, + skipPostEnsureReplacement: false, + }; } return null; @@ -489,6 +540,7 @@ export class AcpxRuntime implements AcpRuntime { const mode = input.mode; const resumeSessionId = asTrimmedString(input.resumeSessionId); let events: AcpxJsonObject[]; + let skipPostEnsureReplacement = false; if (resumeSessionId) { events = await this.createNamedSession({ agent, @@ -517,7 +569,8 @@ export class AcpxRuntime implements AcpRuntime { if (!recovered) { throw error; } - events = recovered; + events = recovered.events; + skipPostEnsureReplacement = recovered.skipPostEnsureReplacement; } } if (events.length === 0) { @@ -527,26 +580,27 @@ export class AcpxRuntime implements AcpRuntime { } let ensuredEvent = findSessionIdentifierEvent(events); - if ( - ensuredEvent && - !resumeSessionId && - (await this.shouldReplaceEnsuredSession({ + if (ensuredEvent && !resumeSessionId && !skipPostEnsureReplacement) { + const replacement = await this.shouldReplaceEnsuredSession({ sessionName, agent, cwd, - })) - ) { - events = await this.createNamedSession({ - agent, - cwd, - sessionName, }); - if (events.length === 0) { - this.logger?.warn?.( - `acpx ensureSession returned no events after replacing dead session: session=${sessionName} agent=${agent} cwd=${cwd}`, - ); + if (replacement.replace) { + events = + replacement.replacementEvents ?? + (await this.createNamedSession({ + agent, + cwd, + sessionName, + })); + if (events.length === 0) { + this.logger?.warn?.( + `acpx ensureSession returned no events after replacing dead session: session=${sessionName} agent=${agent} cwd=${cwd}`, + ); + } + ensuredEvent = findSessionIdentifierEvent(events); } - ensuredEvent = findSessionIdentifierEvent(events); } if (!ensuredEvent && !resumeSessionId) { diff --git a/extensions/acpx/src/test-utils/runtime-fixtures.ts b/extensions/acpx/src/test-utils/runtime-fixtures.ts index 1195e13e7a5..976047b5d77 100644 --- a/extensions/acpx/src/test-utils/runtime-fixtures.ts +++ b/extensions/acpx/src/test-utils/runtime-fixtures.ts @@ -201,10 +201,11 @@ if (command === "status") { } const status = process.env.MOCK_ACPX_STATUS_STATUS || (sessionFromOption ? "alive" : "no-session"); const summary = process.env.MOCK_ACPX_STATUS_SUMMARY || ""; + const omitStatusIds = process.env.MOCK_ACPX_STATUS_NO_IDS === "1"; emitJson({ - acpxRecordId: sessionFromOption ? "rec-" + sessionFromOption : null, - acpxSessionId: sessionFromOption ? "sid-" + sessionFromOption : null, - agentSessionId: sessionFromOption ? "inner-" + sessionFromOption : null, + acpxRecordId: sessionFromOption && !omitStatusIds ? "rec-" + sessionFromOption : null, + acpxSessionId: sessionFromOption && !omitStatusIds ? "sid-" + sessionFromOption : null, + agentSessionId: sessionFromOption && !omitStatusIds ? "inner-" + sessionFromOption : null, status, ...(summary ? { summary } : {}), pid: 4242, @@ -426,6 +427,7 @@ export async function cleanupMockRuntimeFixtures(): Promise { delete process.env.MOCK_ACPX_ENSURE_EXIT_1; delete process.env.MOCK_ACPX_ENSURE_STDERR; delete process.env.MOCK_ACPX_STATUS_STATUS; + delete process.env.MOCK_ACPX_STATUS_NO_IDS; delete process.env.MOCK_ACPX_STATUS_SUMMARY; sharedMockCliScriptPath = null; logFileSequence = 0;