fix: repair queue owner session recovery (#58669) (thanks @neeravmakwana)

* fix(acpx): repair queue owner session recovery

* fix(acpx): avoid duplicate queue owner recovery

* fix: repair queue owner session recovery (#58669) (thanks @neeravmakwana)

---------

Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
Neerav Makwana
2026-04-01 08:50:23 -04:00
committed by GitHub
parent cd07ebef99
commit ed482b1ce7
4 changed files with 136 additions and 50 deletions

View File

@@ -47,6 +47,7 @@ async function expectSessionEnsureFallback(params: {
env?: Record<string, string>;
expectNewAfterStatus: boolean;
expectedRecordId?: string;
expectedResumeSessionId?: string | null;
}) {
const previousEnv = new Map<string, string | undefined>();
for (const [key, value] of Object.entries(params.env ?? {})) {
@@ -70,13 +71,26 @@ async function expectSessionEnsureFallback(params: {
const logs = await readMockRuntimeLogEntries(logPath);
const ensureIndex = logs.findIndex((entry) => entry.kind === "ensure");
const statusIndex = logs.findIndex((entry) => entry.kind === "status");
const newIndex = logs.findIndex((entry) => entry.kind === "new");
const newEntries = logs.filter((entry) => entry.kind === "new");
const newEntry = newEntries[0] ?? null;
const newIndex = newEntry ? logs.indexOf(newEntry) : -1;
expect(ensureIndex).toBeGreaterThanOrEqual(0);
expect(statusIndex).toBeGreaterThan(ensureIndex);
if (params.expectNewAfterStatus) {
expect(newEntries).toHaveLength(1);
expect(newIndex).toBeGreaterThan(statusIndex);
} else {
expect(newIndex).toBe(-1);
expect(newEntries).toHaveLength(0);
}
const newArgs = ((newEntry?.args as string[]) ?? []).slice();
const resumeFlagIndex = newArgs.indexOf("--resume-session");
if (params.expectedResumeSessionId === undefined) {
// No assertion requested for resume behavior.
} else if (params.expectedResumeSessionId === null) {
expect(resumeFlagIndex).toBe(-1);
} else {
expect(resumeFlagIndex).toBeGreaterThanOrEqual(0);
expect(newArgs[resumeFlagIndex + 1]).toBe(params.expectedResumeSessionId);
}
} finally {
for (const [key, value] of previousEnv.entries()) {
@@ -241,14 +255,15 @@ describe("AcpxRuntime", () => {
expect(resumeArgs[resumeFlagIndex + 1]).toBe(resumeSessionId);
});
it("retains dead named sessions when status only reports queue owner unavailable", async () => {
it("repairs dead named sessions when status only reports queue owner unavailable", async () => {
await expectSessionEnsureFallback({
sessionKey: "agent:codex:acp:dead-session",
env: {
MOCK_ACPX_STATUS_STATUS: "dead",
MOCK_ACPX_STATUS_SUMMARY: "queue owner unavailable",
},
expectNewAfterStatus: false,
expectNewAfterStatus: true,
expectedResumeSessionId: "sid-agent:codex:acp:dead-session",
});
});
@@ -275,7 +290,7 @@ describe("AcpxRuntime", () => {
});
});
it("retains the named session after ensure failure when status only reports queue owner unavailable", async () => {
it("repairs the named session after ensure failure when status only reports queue owner unavailable", async () => {
await expectSessionEnsureFallback({
sessionKey: "agent:codex:acp:ensure-fallback-dead",
env: {
@@ -283,8 +298,22 @@ describe("AcpxRuntime", () => {
MOCK_ACPX_STATUS_STATUS: "dead",
MOCK_ACPX_STATUS_SUMMARY: "queue owner unavailable",
},
expectNewAfterStatus: false,
expectNewAfterStatus: true,
expectedRecordId: "rec-agent:codex:acp:ensure-fallback-dead",
expectedResumeSessionId: "sid-agent:codex:acp:ensure-fallback-dead",
});
});
it("falls back to a fresh named session when queue owner recovery has no resumable id", async () => {
await expectSessionEnsureFallback({
sessionKey: "agent:codex:acp:dead-session-no-ids",
env: {
MOCK_ACPX_STATUS_STATUS: "dead",
MOCK_ACPX_STATUS_SUMMARY: "queue owner unavailable",
MOCK_ACPX_STATUS_NO_IDS: "1",
},
expectNewAfterStatus: true,
expectedResumeSessionId: null,
});
});

View File

@@ -74,6 +74,11 @@ type AcpxHealthCheckResult =
};
};
type EnsureFailureRecoveryResult = {
events: AcpxJsonObject[];
skipPostEnsureReplacement: boolean;
};
function formatPermissionModeGuidance(): string {
return "Configure plugins.entries.acpx.config.permissionMode to one of: approve-reads, approve-all, deny-all.";
}
@@ -129,6 +134,10 @@ function shouldRetainNamedSessionForDeadStatus(detail: AcpxJsonObject | undefine
return summary?.includes("queue owner unavailable") ?? false;
}
function resolveResumeSessionIdFromDetail(detail: AcpxJsonObject | undefined): string | undefined {
return asOptionalString(detail?.acpxSessionId) ?? asOptionalString(detail?.agentSessionId);
}
function formatAcpxControlErrorMessage(params: {
code?: string;
message: string;
@@ -350,11 +359,32 @@ export class AcpxRuntime implements AcpRuntime {
});
}
private async replaceDeadNamedSession(params: {
detail: AcpxJsonObject | undefined;
sessionName: string;
agent: string;
cwd: string;
logContext: string;
}): Promise<AcpxJsonObject[]> {
const resumeSessionId = resolveResumeSessionIdFromDetail(params.detail);
this.logger?.warn?.(
resumeSessionId
? `acpx ensureSession repairing dead named session by resuming backend session: session=${params.sessionName} cwd=${params.cwd} resumeSessionId=${resumeSessionId} ${params.logContext}`
: `acpx ensureSession repairing dead named session with fresh session owner: session=${params.sessionName} cwd=${params.cwd} ${params.logContext}`,
);
return await this.createNamedSession({
agent: params.agent,
cwd: params.cwd,
sessionName: params.sessionName,
...(resumeSessionId ? { resumeSessionId } : {}),
});
}
private async shouldReplaceEnsuredSession(params: {
sessionName: string;
agent: string;
cwd: string;
}): Promise<boolean> {
}): Promise<{ replace: boolean; replacementEvents?: AcpxJsonObject[] }> {
const args = await this.buildVerbArgs({
agent: params.agent,
cwd: params.cwd,
@@ -372,7 +402,7 @@ export class AcpxRuntime implements AcpRuntime {
this.logger?.warn?.(
`acpx ensureSession status probe failed: session=${params.sessionName} cwd=${params.cwd} error=${summarizeLogText(error instanceof Error ? error.message : String(error)) || "<empty>"}`,
);
return false;
return { replace: false };
}
const noSession = events.some((event) => toAcpxErrorEvent(event)?.code === "NO_SESSION");
@@ -380,7 +410,7 @@ export class AcpxRuntime implements AcpRuntime {
this.logger?.warn?.(
`acpx ensureSession replacing missing named session: session=${params.sessionName} cwd=${params.cwd}`,
);
return true;
return { replace: true };
}
const detail = events.find((event) => !toAcpxErrorEvent(event));
@@ -388,18 +418,24 @@ export class AcpxRuntime implements AcpRuntime {
if (status === "dead") {
const summary = summarizeLogText(asOptionalString(detail?.summary) ?? "");
if (shouldRetainNamedSessionForDeadStatus(detail)) {
this.logger?.warn?.(
`acpx ensureSession retaining dead named session with recoverable status: session=${params.sessionName} cwd=${params.cwd} status=${status} summary=${summary || "<empty>"}`,
);
return false;
return {
replace: true,
replacementEvents: await this.replaceDeadNamedSession({
detail,
sessionName: params.sessionName,
agent: params.agent,
cwd: params.cwd,
logContext: `status=${status} summary=${summary || "<empty>"}`,
}),
};
}
this.logger?.warn?.(
`acpx ensureSession replacing dead named session: session=${params.sessionName} cwd=${params.cwd} status=${status} summary=${summary || "<empty>"}`,
);
return true;
return { replace: true };
}
return false;
return { replace: false };
}
private async recoverEnsureFailure(params: {
@@ -407,7 +443,7 @@ export class AcpxRuntime implements AcpRuntime {
agent: string;
cwd: string;
error: unknown;
}): Promise<AcpxJsonObject[] | null> {
}): Promise<EnsureFailureRecoveryResult | null> {
const errorMessage = summarizeLogText(
params.error instanceof Error ? params.error.message : String(params.error),
);
@@ -439,11 +475,14 @@ export class AcpxRuntime implements AcpRuntime {
this.logger?.warn?.(
`acpx ensureSession creating named session after ensure failure and missing status: session=${params.sessionName} cwd=${params.cwd}`,
);
return await this.createNamedSession({
agent: params.agent,
cwd: params.cwd,
sessionName: params.sessionName,
});
return {
events: await this.createNamedSession({
agent: params.agent,
cwd: params.cwd,
sessionName: params.sessionName,
}),
skipPostEnsureReplacement: true,
};
}
const detail = events.find((event) => !toAcpxErrorEvent(event));
@@ -451,26 +490,38 @@ export class AcpxRuntime implements AcpRuntime {
if (status === "dead") {
const summary = summarizeLogText(asOptionalString(detail?.summary) ?? "");
if (shouldRetainNamedSessionForDeadStatus(detail)) {
this.logger?.warn?.(
`acpx ensureSession retaining dead named session after ensure failure with recoverable status: session=${params.sessionName} cwd=${params.cwd} status=${status} summary=${summary || "<empty>"}`,
);
return events;
return {
events: await this.replaceDeadNamedSession({
detail,
sessionName: params.sessionName,
agent: params.agent,
cwd: params.cwd,
logContext: `status=${status} summary=${summary || "<empty>"}`,
}),
skipPostEnsureReplacement: true,
};
}
this.logger?.warn?.(
`acpx ensureSession replacing dead named session after ensure failure: session=${params.sessionName} cwd=${params.cwd}`,
);
return await this.createNamedSession({
agent: params.agent,
cwd: params.cwd,
sessionName: params.sessionName,
});
return {
events: await this.createNamedSession({
agent: params.agent,
cwd: params.cwd,
sessionName: params.sessionName,
}),
skipPostEnsureReplacement: true,
};
}
if (status === "alive" || findSessionIdentifierEvent(events)) {
this.logger?.warn?.(
`acpx ensureSession reusing live named session after ensure failure: session=${params.sessionName} cwd=${params.cwd} status=${status || "unknown"}`,
);
return events;
return {
events,
skipPostEnsureReplacement: false,
};
}
return null;
@@ -489,6 +540,7 @@ export class AcpxRuntime implements AcpRuntime {
const mode = input.mode;
const resumeSessionId = asTrimmedString(input.resumeSessionId);
let events: AcpxJsonObject[];
let skipPostEnsureReplacement = false;
if (resumeSessionId) {
events = await this.createNamedSession({
agent,
@@ -517,7 +569,8 @@ export class AcpxRuntime implements AcpRuntime {
if (!recovered) {
throw error;
}
events = recovered;
events = recovered.events;
skipPostEnsureReplacement = recovered.skipPostEnsureReplacement;
}
}
if (events.length === 0) {
@@ -527,26 +580,27 @@ export class AcpxRuntime implements AcpRuntime {
}
let ensuredEvent = findSessionIdentifierEvent(events);
if (
ensuredEvent &&
!resumeSessionId &&
(await this.shouldReplaceEnsuredSession({
if (ensuredEvent && !resumeSessionId && !skipPostEnsureReplacement) {
const replacement = await this.shouldReplaceEnsuredSession({
sessionName,
agent,
cwd,
}))
) {
events = await this.createNamedSession({
agent,
cwd,
sessionName,
});
if (events.length === 0) {
this.logger?.warn?.(
`acpx ensureSession returned no events after replacing dead session: session=${sessionName} agent=${agent} cwd=${cwd}`,
);
if (replacement.replace) {
events =
replacement.replacementEvents ??
(await this.createNamedSession({
agent,
cwd,
sessionName,
}));
if (events.length === 0) {
this.logger?.warn?.(
`acpx ensureSession returned no events after replacing dead session: session=${sessionName} agent=${agent} cwd=${cwd}`,
);
}
ensuredEvent = findSessionIdentifierEvent(events);
}
ensuredEvent = findSessionIdentifierEvent(events);
}
if (!ensuredEvent && !resumeSessionId) {

View File

@@ -201,10 +201,11 @@ if (command === "status") {
}
const status = process.env.MOCK_ACPX_STATUS_STATUS || (sessionFromOption ? "alive" : "no-session");
const summary = process.env.MOCK_ACPX_STATUS_SUMMARY || "";
const omitStatusIds = process.env.MOCK_ACPX_STATUS_NO_IDS === "1";
emitJson({
acpxRecordId: sessionFromOption ? "rec-" + sessionFromOption : null,
acpxSessionId: sessionFromOption ? "sid-" + sessionFromOption : null,
agentSessionId: sessionFromOption ? "inner-" + sessionFromOption : null,
acpxRecordId: sessionFromOption && !omitStatusIds ? "rec-" + sessionFromOption : null,
acpxSessionId: sessionFromOption && !omitStatusIds ? "sid-" + sessionFromOption : null,
agentSessionId: sessionFromOption && !omitStatusIds ? "inner-" + sessionFromOption : null,
status,
...(summary ? { summary } : {}),
pid: 4242,
@@ -426,6 +427,7 @@ export async function cleanupMockRuntimeFixtures(): Promise<void> {
delete process.env.MOCK_ACPX_ENSURE_EXIT_1;
delete process.env.MOCK_ACPX_ENSURE_STDERR;
delete process.env.MOCK_ACPX_STATUS_STATUS;
delete process.env.MOCK_ACPX_STATUS_NO_IDS;
delete process.env.MOCK_ACPX_STATUS_SUMMARY;
sharedMockCliScriptPath = null;
logFileSequence = 0;