fix: notify chat when main session recovery fails

This commit is contained in:
Paul Frederiksen
2026-05-23 19:21:41 +00:00
committed by Peter Steinberger
parent d4e42d61c9
commit cf61b876ec
3 changed files with 112 additions and 0 deletions

View File

@@ -574,4 +574,54 @@ describe("main-session-restart-recovery", () => {
expect(store["agent:main:main"]?.status).toBe("failed");
expect(store["agent:main:main"]?.abortedLastRun).toBe(true);
});
it("sends a visible notice before failing an unresumable chat-bound main session", async () => {
const sessionsDir = await makeSessionsDir();
await writeStore(sessionsDir, {
"agent:main:telegram:group:12345": {
sessionId: "main-session",
updatedAt: Date.now() - 10_000,
status: "running",
abortedLastRun: true,
deliveryContext: {
channel: "telegram",
to: "group:12345",
accountId: "default",
threadId: "topic-1",
},
},
});
await writeTranscript(sessionsDir, "main-session", [
{ role: "user", content: "do the thing" },
{ role: "assistant", content: "partial answer" },
]);
const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
expect(result).toEqual({ recovered: 0, failed: 1, skipped: 0 });
expect(callGateway).toHaveBeenCalledOnce();
const gatewayCall = vi.mocked(callGateway).mock.calls[0]?.[0] as
| { method?: string; params?: Record<string, unknown> }
| undefined;
expect(gatewayCall?.method).toBe("message.action");
expect(gatewayCall?.params).toMatchObject({
channel: "telegram",
action: "send",
accountId: "default",
sessionKey: "agent:main:telegram:group:12345",
sessionId: "main-session",
});
expect(gatewayCall?.params?.params).toMatchObject({
to: "group:12345",
threadId: "topic-1",
bestEffort: true,
});
expect(String((gatewayCall?.params?.params as Record<string, unknown>)?.message)).toContain(
"couldn't safely resume",
);
const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
expect(store["agent:main:telegram:group:12345"]?.status).toBe("failed");
expect(store["agent:main:telegram:group:12345"]?.abortedLastRun).toBe(true);
});
});

View File

@@ -22,6 +22,8 @@ import { resolveGatewaySessionStoreTarget } from "../gateway/session-utils.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { CommandLane } from "../process/lanes.js";
import { isAcpSessionKey, isCronSessionKey, isSubagentSessionKey } from "../routing/session-key.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { deliveryContextFromSession } from "../utils/delivery-context.shared.js";
import { resolveAgentSessionDirs } from "./session-dirs.js";
import type { SessionLockInspection } from "./session-write-lock.js";
@@ -30,6 +32,9 @@ const log = createSubsystemLogger("main-session-restart-recovery");
const DEFAULT_RECOVERY_DELAY_MS = 5_000;
const MAX_RECOVERY_RETRIES = 3;
const RETRY_BACKOFF_MULTIPLIER = 2;
const UNRESUMABLE_SESSION_NOTICE =
"I was interrupted by a gateway restart and couldn't safely resume the previous turn. " +
"Please send that last request again and I'll pick it up cleanly.";
function shouldSkipMainRecovery(entry: SessionEntry, sessionKey: string): boolean {
if (typeof entry.spawnDepth === "number" && entry.spawnDepth > 0) {
@@ -274,6 +279,57 @@ async function markSessionFailed(params: {
log.warn(`marked interrupted main session failed: ${params.sessionKey} (${params.reason})`);
}
async function sendUnresumableSessionNotice(params: {
entry: SessionEntry;
reason: string;
sessionKey: string;
}): Promise<boolean> {
const deliveryContext = deliveryContextFromSession(params.entry);
const channel = normalizeOptionalString(deliveryContext?.channel);
const to = normalizeOptionalString(deliveryContext?.to);
if (!channel || !to) {
return false;
}
const messageParams: Record<string, unknown> = {
to,
message: UNRESUMABLE_SESSION_NOTICE,
bestEffort: true,
};
if (deliveryContext?.threadId != null) {
messageParams.threadId = deliveryContext.threadId;
}
const actionParams: Record<string, unknown> = {
channel,
action: "send",
sessionKey: params.sessionKey,
sessionId: params.entry.sessionId,
idempotencyKey: `main-session-restart-recovery:${params.entry.sessionId}:failed-notice`,
params: messageParams,
};
const accountId = normalizeOptionalString(deliveryContext?.accountId);
if (accountId) {
actionParams.accountId = accountId;
}
try {
await callGateway({
method: "message.action",
params: actionParams,
timeoutMs: 10_000,
});
log.info(
`sent interrupted main session recovery notice: ${params.sessionKey} (${params.reason})`,
);
return true;
} catch (err) {
log.warn(
`failed to send interrupted main session recovery notice ${params.sessionKey}: ${String(err)}`,
);
return false;
}
}
async function resumeMainSession(params: {
storePath: string;
sessionKey: string;
@@ -432,6 +488,11 @@ async function recoverStore(params: {
const resumeBlockReason = resolveMainSessionResumeBlockReason(messages);
if (resumeBlockReason) {
await sendUnresumableSessionNotice({
entry,
sessionKey,
reason: resumeBlockReason,
});
await markSessionFailed({
storePath: params.storePath,
sessionKey,

View File

@@ -333,6 +333,7 @@ describe("gateway hot reload", () => {
prevSkipProviders = process.env.OPENCLAW_SKIP_PROVIDERS;
prevOpenAiApiKey = process.env.OPENAI_API_KEY;
process.env.OPENCLAW_SKIP_CHANNELS = "0";
process.env.OPENAI_API_KEY = "sk-test-reload"; // pragma: allowlist secret
delete process.env.OPENCLAW_SKIP_GMAIL_WATCHER;
delete process.env.OPENCLAW_SKIP_PROVIDERS;
hoisted.cronInstances.length = 0;