Send a visible notice before failing an unresumable chat-bound main session

When restart recovery finds an interrupted main session for which
resolveMainSessionResumeBlockReason() reports a block reason, it now calls
sendUnresumableSessionNotice() before markSessionFailed(). The notice is a
"message.action" gateway call addressed to the session's delivery context.

- src/agents/main-session-restart-recovery.ts: adds UNRESUMABLE_SESSION_NOTICE
  and sendUnresumableSessionNotice(), and calls it ahead of markSessionFailed().
- src/agents/main-session-restart-recovery.test.ts: covers a telegram group
  session (asserts the gateway payload, then status "failed").
- src/gateway/server.reload.test.ts: sets a placeholder OPENAI_API_KEY next to
  the other env setup of the "gateway hot reload" suite.

Review notes:
- Generic type arguments (Promise<boolean>, Record<string, unknown>) were
  restored from usage; bare `Promise` / `Record` do not type-check.
- Doc comments were added as "+" lines and the hunk headers of
  main-session-restart-recovery.ts were recomputed, so the post-image blob id
  on that file's "index" line no longer describes the patched file.
- Indentation was rebuilt from formatter conventions. The nesting depth of the
  recoverStore hunk is an assumption; apply with whitespace-insensitive
  matching if the context does not line up.

diff --git a/src/agents/main-session-restart-recovery.test.ts b/src/agents/main-session-restart-recovery.test.ts
index 8c89813459c..0d83cea4e98 100644
--- a/src/agents/main-session-restart-recovery.test.ts
+++ b/src/agents/main-session-restart-recovery.test.ts
@@ -574,4 +574,54 @@ describe("main-session-restart-recovery", () => {
     expect(store["agent:main:main"]?.status).toBe("failed");
     expect(store["agent:main:main"]?.abortedLastRun).toBe(true);
   });
+
+  it("sends a visible notice before failing an unresumable chat-bound main session", async () => {
+    const sessionsDir = await makeSessionsDir();
+    await writeStore(sessionsDir, {
+      "agent:main:telegram:group:12345": {
+        sessionId: "main-session",
+        updatedAt: Date.now() - 10_000,
+        status: "running",
+        abortedLastRun: true,
+        deliveryContext: {
+          channel: "telegram",
+          to: "group:12345",
+          accountId: "default",
+          threadId: "topic-1",
+        },
+      },
+    });
+    await writeTranscript(sessionsDir, "main-session", [
+      { role: "user", content: "do the thing" },
+      { role: "assistant", content: "partial answer" },
+    ]);
+
+    const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
+
+    expect(result).toEqual({ recovered: 0, failed: 1, skipped: 0 });
+    expect(callGateway).toHaveBeenCalledOnce();
+    const gatewayCall = vi.mocked(callGateway).mock.calls[0]?.[0] as
+      | { method?: string; params?: Record<string, unknown> }
+      | undefined;
+    expect(gatewayCall?.method).toBe("message.action");
+    expect(gatewayCall?.params).toMatchObject({
+      channel: "telegram",
+      action: "send",
+      accountId: "default",
+      sessionKey: "agent:main:telegram:group:12345",
+      sessionId: "main-session",
+    });
+    expect(gatewayCall?.params?.params).toMatchObject({
+      to: "group:12345",
+      threadId: "topic-1",
+      bestEffort: true,
+    });
+    expect(String((gatewayCall?.params?.params as Record<string, unknown>)?.message)).toContain(
+      "couldn't safely resume",
+    );
+
+    const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
+    expect(store["agent:main:telegram:group:12345"]?.status).toBe("failed");
+    expect(store["agent:main:telegram:group:12345"]?.abortedLastRun).toBe(true);
+  });
 });
diff --git a/src/agents/main-session-restart-recovery.ts b/src/agents/main-session-restart-recovery.ts
index a56016f61e1..b3811d486e8 100644
--- a/src/agents/main-session-restart-recovery.ts
+++ b/src/agents/main-session-restart-recovery.ts
@@ -22,6 +22,8 @@ import { resolveGatewaySessionStoreTarget } from "../gateway/session-utils.js";
 import { createSubsystemLogger } from "../logging/subsystem.js";
 import { CommandLane } from "../process/lanes.js";
 import { isAcpSessionKey, isCronSessionKey, isSubagentSessionKey } from "../routing/session-key.js";
+import { normalizeOptionalString } from "../shared/string-coerce.js";
+import { deliveryContextFromSession } from "../utils/delivery-context.shared.js";
 import { resolveAgentSessionDirs } from "./session-dirs.js";
 import type { SessionLockInspection } from "./session-write-lock.js";
 
@@ -30,6 +32,10 @@ const log = createSubsystemLogger("main-session-restart-recovery");
 const DEFAULT_RECOVERY_DELAY_MS = 5_000;
 const MAX_RECOVERY_RETRIES = 3;
 const RETRY_BACKOFF_MULTIPLIER = 2;
+/** Text sent to the session's delivery target when an interrupted turn cannot be resumed. */
+const UNRESUMABLE_SESSION_NOTICE =
+  "I was interrupted by a gateway restart and couldn't safely resume the previous turn. " +
+  "Please send that last request again and I'll pick it up cleanly.";
 
 function shouldSkipMainRecovery(entry: SessionEntry, sessionKey: string): boolean {
   if (typeof entry.spawnDepth === "number" && entry.spawnDepth > 0) {
@@ -274,6 +280,71 @@ async function markSessionFailed(params: {
   log.warn(`marked interrupted main session failed: ${params.sessionKey} (${params.reason})`);
 }
 
+/**
+ * Sends the fixed UNRESUMABLE_SESSION_NOTICE text to the delivery target recorded
+ * on a session entry, via a "message.action" gateway call flagged `bestEffort`.
+ *
+ * `threadId` and `accountId` are forwarded only when the delivery context has them.
+ * A rejected gateway call is logged as a warning and reported through the return
+ * value instead of being rethrown.
+ *
+ * @param params.entry - Session entry providing the delivery context and session id.
+ * @param params.reason - Resume-block reason; only used in the success log line.
+ * @param params.sessionKey - Session store key, forwarded to the gateway and logged.
+ * @returns `true` once the gateway call resolves; `false` when the delivery context
+ *   lacks a channel or recipient, or when the call rejects.
+ */
+async function sendUnresumableSessionNotice(params: {
+  entry: SessionEntry;
+  reason: string;
+  sessionKey: string;
+}): Promise<boolean> {
+  const deliveryContext = deliveryContextFromSession(params.entry);
+  const channel = normalizeOptionalString(deliveryContext?.channel);
+  const to = normalizeOptionalString(deliveryContext?.to);
+  if (!channel || !to) {
+    return false;
+  }
+
+  const messageParams: Record<string, unknown> = {
+    to,
+    message: UNRESUMABLE_SESSION_NOTICE,
+    bestEffort: true,
+  };
+  if (deliveryContext?.threadId != null) {
+    messageParams.threadId = deliveryContext.threadId;
+  }
+  const actionParams: Record<string, unknown> = {
+    channel,
+    action: "send",
+    sessionKey: params.sessionKey,
+    sessionId: params.entry.sessionId,
+    idempotencyKey: `main-session-restart-recovery:${params.entry.sessionId}:failed-notice`,
+    params: messageParams,
+  };
+  const accountId = normalizeOptionalString(deliveryContext?.accountId);
+  if (accountId) {
+    actionParams.accountId = accountId;
+  }
+
+  try {
+    await callGateway({
+      method: "message.action",
+      params: actionParams,
+      timeoutMs: 10_000,
+    });
+    log.info(
+      `sent interrupted main session recovery notice: ${params.sessionKey} (${params.reason})`,
+    );
+    return true;
+  } catch (err) {
+    log.warn(
+      `failed to send interrupted main session recovery notice ${params.sessionKey}: ${String(err)}`,
+    );
+    return false;
+  }
+}
+
 async function resumeMainSession(params: {
   storePath: string;
   sessionKey: string;
@@ -432,6 +503,11 @@ async function recoverStore(params: {
 
     const resumeBlockReason = resolveMainSessionResumeBlockReason(messages);
     if (resumeBlockReason) {
+      await sendUnresumableSessionNotice({
+        entry,
+        sessionKey,
+        reason: resumeBlockReason,
+      });
       await markSessionFailed({
         storePath: params.storePath,
         sessionKey,
diff --git a/src/gateway/server.reload.test.ts b/src/gateway/server.reload.test.ts
index 74c2cf62d6f..769f621bcb6 100644
--- a/src/gateway/server.reload.test.ts
+++ b/src/gateway/server.reload.test.ts
@@ -333,6 +333,7 @@ describe("gateway hot reload", () => {
     prevSkipProviders = process.env.OPENCLAW_SKIP_PROVIDERS;
     prevOpenAiApiKey = process.env.OPENAI_API_KEY;
     process.env.OPENCLAW_SKIP_CHANNELS = "0";
+    process.env.OPENAI_API_KEY = "sk-test-reload"; // pragma: allowlist secret
     delete process.env.OPENCLAW_SKIP_GMAIL_WATCHER;
     delete process.env.OPENCLAW_SKIP_PROVIDERS;
     hoisted.cronInstances.length = 0;