From a9a308becd5c73ac2930d2f9de8e148c62c97980 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 25 Apr 2026 02:25:48 +0100 Subject: [PATCH] fix: recover restart-aborted main sessions --- CHANGELOG.md | 1 + .../main-session-restart-recovery.test.ts | 169 ++++++++++ src/agents/main-session-restart-recovery.ts | 307 ++++++++++++++++++ src/gateway/server-startup-post-attach.ts | 16 +- 4 files changed, 492 insertions(+), 1 deletion(-) create mode 100644 src/agents/main-session-restart-recovery.test.ts create mode 100644 src/agents/main-session-restart-recovery.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d7621ac192..d945162e408 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Gateway/sessions: recover main-agent turns interrupted by a gateway restart from stale transcript-lock evidence, avoiding stuck `status: "running"` sessions without broad post-boot transcript scans. Fixes #70555. Thanks @bitloi. - Plugins/Google Meet: include live Chrome-node readiness in `googlemeet setup` and document the Parallels recovery checks, so stale node tokens or disconnected VM browsers are visible before an agent opens a meeting. Thanks @steipete. - Codex approvals: compact home-directory permission paths to `~` without repeating them as a separate high-risk warning, while preserving filesystem root and wildcard host warnings. Thanks @steipete. - Plugins/runtime deps: isolate the internal npm cache used for bundled plugin runtime-dependency repair and let package updates refresh/verify already-current installs, so failed update or sudo doctor runs can be repaired by rerunning `openclaw update`. Thanks @steipete. 
diff --git a/src/agents/main-session-restart-recovery.test.ts b/src/agents/main-session-restart-recovery.test.ts
new file mode 100644
index 00000000000..0fbfe5b466c
--- /dev/null
+++ b/src/agents/main-session-restart-recovery.test.ts
@@ -0,0 +1,173 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { loadSessionStore, type SessionEntry } from "../config/sessions.js";
+import { callGateway } from "../gateway/call.js";
+import {
+  markRestartAbortedMainSessionsFromLocks,
+  recoverRestartAbortedMainSessions,
+} from "./main-session-restart-recovery.js";
+import type { SessionLockInspection } from "./session-write-lock.js";
+
+vi.mock("../gateway/call.js", () => ({
+  callGateway: vi.fn(async () => ({ runId: "run-resumed" })),
+}));
+
+let tmpDir: string;
+
+beforeEach(async () => {
+  vi.clearAllMocks();
+  tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-main-restart-recovery-"));
+});
+
+afterEach(async () => {
+  await fs.rm(tmpDir, { recursive: true, force: true });
+});
+
+// Creates <tmpDir>/agents/<agentId>/sessions and returns its path.
+async function makeSessionsDir(agentId = "main"): Promise<string> {
+  const sessionsDir = path.join(tmpDir, "agents", agentId, "sessions");
+  await fs.mkdir(sessionsDir, { recursive: true });
+  return sessionsDir;
+}
+
+// Writes the given entries as the directory's sessions.json store.
+async function writeStore(sessionsDir: string, store: Record<string, SessionEntry>): Promise<void> {
+  await fs.writeFile(path.join(sessionsDir, "sessions.json"), JSON.stringify(store, null, 2));
+}
+
+// Writes one {"message": ...} JSON line per message to <sessionId>.jsonl.
+async function writeTranscript(
+  sessionsDir: string,
+  sessionId: string,
+  messages: unknown[],
+): Promise<void> {
+  const lines = messages.map((message) => JSON.stringify({ message })).join("\n");
+  await fs.writeFile(path.join(sessionsDir, `${sessionId}.jsonl`), `${lines}\n`);
+}
+
+// Builds the inspection of a stale, dead-pid transcript lock that has already been removed.
+function cleanedLock(sessionsDir: string, sessionId: string): SessionLockInspection {
+  return {
+    lockPath: path.join(sessionsDir, `${sessionId}.jsonl.lock`),
+    pid: 999_999,
+    pidAlive: false,
+    createdAt: new Date(Date.now() - 1_000).toISOString(),
+    ageMs: 1_000,
+    stale: true,
+    staleReasons: ["dead-pid"],
+    removed: true,
+  };
+}
+
+describe("main-session-restart-recovery", () => {
+  it("marks only main running sessions whose transcript lock was cleaned", async () => {
+    const sessionsDir = await makeSessionsDir();
+    await writeStore(sessionsDir, {
+      "agent:main:main": {
+        sessionId: "main-session",
+        updatedAt: Date.now() - 10_000,
+        status: "running",
+      },
+      "agent:main:subagent:child": {
+        sessionId: "child-session",
+        updatedAt: Date.now() - 10_000,
+        status: "running",
+        spawnDepth: 1,
+      },
+      "agent:main:other": {
+        sessionId: "other-session",
+        updatedAt: Date.now() - 10_000,
+        status: "running",
+      },
+    });
+
+    const result = await markRestartAbortedMainSessionsFromLocks({
+      sessionsDir,
+      cleanedLocks: [
+        cleanedLock(sessionsDir, "main-session"),
+        cleanedLock(sessionsDir, "child-session"),
+      ],
+    });
+
+    const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
+    expect(result).toEqual({ marked: 1, skipped: 1 });
+    expect(store["agent:main:main"]?.abortedLastRun).toBe(true);
+    expect(store["agent:main:subagent:child"]?.abortedLastRun).toBeUndefined();
+    expect(store["agent:main:other"]?.abortedLastRun).toBeUndefined();
+  });
+
+  it("resumes marked sessions with a tool-result transcript tail", async () => {
+    const sessionsDir = await makeSessionsDir();
+    await writeStore(sessionsDir, {
+      "agent:main:main": {
+        sessionId: "main-session",
+        updatedAt: Date.now() - 10_000,
+        status: "running",
+        abortedLastRun: true,
+      },
+    });
+    await writeTranscript(sessionsDir, "main-session", [
+      { role: "user", content: "run the tool" },
+      { role: "assistant", content: [{ type: "toolCall", id: "call-1", name: "exec" }] },
+      { role: "toolResult", content: "done" },
+    ]);
+
+    const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
+
+    expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
+    expect(callGateway).toHaveBeenCalledOnce();
+    expect(vi.mocked(callGateway).mock.calls[0]?.[0].params).toMatchObject({
+      sessionKey: "agent:main:main",
+      deliver: false,
+      lane: "main",
+    });
+    const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
+    expect(store["agent:main:main"]?.abortedLastRun).toBe(false);
+  });
+
+  it("does not scan ordinary running sessions without the restart-aborted marker", async () => {
+    const sessionsDir = await makeSessionsDir();
+    await writeStore(sessionsDir, {
+      "agent:main:main": {
+        sessionId: "main-session",
+        updatedAt: Date.now() - 10_000,
+        status: "running",
+      },
+    });
+    await writeTranscript(sessionsDir, "main-session", [
+      { role: "user", content: "current process owns this" },
+      { role: "toolResult", content: "done" },
+    ]);
+
+    const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
+
+    expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
+    expect(callGateway).not.toHaveBeenCalled();
+  });
+
+  it("fails marked sessions whose transcript tail cannot be resumed", async () => {
+    const sessionsDir = await makeSessionsDir();
+    await writeStore(sessionsDir, {
+      "agent:main:main": {
+        sessionId: "main-session",
+        updatedAt: Date.now() - 10_000,
+        status: "running",
+        abortedLastRun: true,
+      },
+    });
+    await writeTranscript(sessionsDir, "main-session", [
+      { role: "user", content: "hello" },
+      { role: "assistant", content: "partial answer" },
+    ]);
+
+    const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
+
+    expect(result).toEqual({ recovered: 0, failed: 1, skipped: 0 });
+    expect(callGateway).not.toHaveBeenCalled();
+    const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
+    expect(store["agent:main:main"]?.status).toBe("failed");
+    expect(store["agent:main:main"]?.abortedLastRun).toBe(true);
+  });
+});
diff --git a/src/agents/main-session-restart-recovery.ts b/src/agents/main-session-restart-recovery.ts new file mode 100644 index 
00000000000..a976b110aa6
--- /dev/null
+++ b/src/agents/main-session-restart-recovery.ts
@@ -0,0 +1,352 @@
+/**
+ * Post-restart recovery for main sessions interrupted while holding a transcript lock.
+ */
+
+import crypto from "node:crypto";
+import path from "node:path";
+import { resolveStateDir } from "../config/paths.js";
+import { type SessionEntry, loadSessionStore, updateSessionStore } from "../config/sessions.js";
+import { callGateway } from "../gateway/call.js";
+import { readSessionMessages } from "../gateway/session-utils.fs.js";
+import { createSubsystemLogger } from "../logging/subsystem.js";
+import { CommandLane } from "../process/lanes.js";
+import { isAcpSessionKey, isCronSessionKey, isSubagentSessionKey } from "../routing/session-key.js";
+import { resolveAgentSessionDirs } from "./session-dirs.js";
+import type { SessionLockInspection } from "./session-write-lock.js";
+
+const log = createSubsystemLogger("main-session-restart-recovery");
+
+const DEFAULT_RECOVERY_DELAY_MS = 5_000;
+const MAX_RECOVERY_RETRIES = 3;
+const RETRY_BACKOFF_MULTIPLIER = 2;
+
+/** Returns true for entries recovery must leave alone: spawned/subagent, cron, and ACP sessions. */
+function shouldSkipMainRecovery(entry: SessionEntry, sessionKey: string): boolean {
+  if (typeof entry.spawnDepth === "number" && entry.spawnDepth > 0) {
+    return true;
+  }
+  if (entry.subagentRole != null) {
+    return true;
+  }
+  return (
+    isSubagentSessionKey(sessionKey) || isCronSessionKey(sessionKey) || isAcpSessionKey(sessionKey)
+  );
+}
+
+/** Extracts the session id from a `<sessionId>.jsonl.lock` path; undefined if it does not match. */
+function sessionIdFromLockPath(lockPath: string): string | undefined {
+  const fileName = path.basename(lockPath);
+  if (!fileName.endsWith(".jsonl.lock")) {
+    return undefined;
+  }
+  const sessionId = fileName.slice(0, -".jsonl.lock".length).trim();
+  return sessionId || undefined;
+}
+
+/** Reads a string `role` off an arbitrary transcript message; undefined when there is none. */
+function getMessageRole(message: unknown): string | undefined {
+  if (!message || typeof message !== "object") {
+    return undefined;
+  }
+  const role = (message as { role?: unknown }).role;
+  return typeof role === "string" ? role : undefined;
+}
+
+/** A message counts as a meaningful tail when it has a role other than "system". */
+function isMeaningfulTailMessage(message: unknown): boolean {
+  const role = getMessageRole(message);
+  if (!role || role === "system") {
+    return false;
+  }
+  return true;
+}
+
+/** A tail with role "user", "tool", or "toolResult" can be resumed. */
+function isResumableTailMessage(message: unknown): boolean {
+  const role = getMessageRole(message);
+  return role === "user" || role === "tool" || role === "toolResult";
+}
+
+/** A transcript is resumable when its last meaningful message has a resumable role. */
+function isMainSessionResumable(messages: unknown[]): boolean {
+  const lastMeaningful = messages.findLast(isMeaningfulTailMessage);
+  return lastMeaningful ? isResumableTailMessage(lastMeaningful) : false;
+}
+
+/** Text of the synthetic message sent to the agent when resuming an interrupted turn. */
+function buildResumeMessage(): string {
+  return (
+    "[System] Your previous turn was interrupted by a gateway restart while " +
+    "OpenClaw was waiting on tool/model work. Continue from the existing " +
+    "transcript and finish the interrupted response."
+  );
+}
+
+/** Marks a still-running entry as failed, leaving the `abortedLastRun` marker set. */
+async function markSessionFailed(params: {
+  storePath: string;
+  sessionKey: string;
+  reason: string;
+}): Promise<void> {
+  await updateSessionStore(
+    params.storePath,
+    (store) => {
+      const entry = store[params.sessionKey];
+      if (!entry || entry.status !== "running") {
+        return;
+      }
+      entry.status = "failed";
+      entry.abortedLastRun = true;
+      entry.endedAt = Date.now();
+      entry.updatedAt = entry.endedAt;
+      store[params.sessionKey] = entry;
+    },
+    { skipMaintenance: true },
+  );
+  log.warn(`marked interrupted main session failed: ${params.sessionKey} (${params.reason})`);
+}
+
+/**
+ * Sends the resume message for one session, then clears its `abortedLastRun` marker.
+ *
+ * @returns true once the gateway call succeeded; false if the gateway call failed.
+ */
+async function resumeMainSession(params: {
+  storePath: string;
+  sessionKey: string;
+}): Promise<boolean> {
+  try {
+    await callGateway<{ runId: string }>({
+      method: "agent",
+      params: {
+        message: buildResumeMessage(),
+        sessionKey: params.sessionKey,
+        idempotencyKey: crypto.randomUUID(),
+        deliver: false,
+        lane: CommandLane.Main,
+      },
+      timeoutMs: 10_000,
+    });
+  } catch (err) {
+    log.warn(`failed to resume interrupted main session ${params.sessionKey}: ${String(err)}`);
+    return false;
+  }
+
+  // The resume turn is already dispatched here. A store-write failure must not be reported as a
+  // failed resume: the scheduler would retry and dispatch a duplicate turn with a fresh key.
+  try {
+    await updateSessionStore(
+      params.storePath,
+      (store) => {
+        const entry = store[params.sessionKey];
+        if (!entry) {
+          return;
+        }
+        entry.abortedLastRun = false;
+        entry.updatedAt = Date.now();
+        store[params.sessionKey] = entry;
+      },
+      { skipMaintenance: true },
+    );
+  } catch (err) {
+    log.warn(
+      `resumed interrupted main session ${params.sessionKey} but failed to clear abortedLastRun: ${String(err)}`,
+    );
+  }
+  log.info(`resumed interrupted main session: ${params.sessionKey}`);
+  return true;
+}
+
+/**
+ * Flags running main sessions whose transcript lock was cleaned up as restart-aborted.
+ *
+ * @param params.sessionsDir Directory that contains the `sessions.json` store to update.
+ * @param params.cleanedLocks Lock inspections; session ids are derived from each `lockPath`.
+ * @returns `marked`: entries flagged; `skipped`: running entries excluded from main recovery.
+ */
+export async function markRestartAbortedMainSessionsFromLocks(params: {
+  sessionsDir: string;
+  cleanedLocks: SessionLockInspection[];
+}): Promise<{ marked: number; skipped: number }> {
+  const result = { marked: 0, skipped: 0 };
+  const interruptedSessionIds = new Set(
+    params.cleanedLocks
+      .map((lock) => sessionIdFromLockPath(lock.lockPath))
+      .filter((sessionId): sessionId is string => Boolean(sessionId)),
+  );
+  if (interruptedSessionIds.size === 0) {
+    return result;
+  }
+
+  const storePath = path.join(path.resolve(params.sessionsDir), "sessions.json");
+  await updateSessionStore(
+    storePath,
+    (store) => {
+      for (const [sessionKey, entry] of Object.entries(store)) {
+        if (!entry || entry.status !== "running") {
+          continue;
+        }
+        if (shouldSkipMainRecovery(entry, sessionKey)) {
+          result.skipped++;
+          continue;
+        }
+        if (!interruptedSessionIds.has(entry.sessionId)) {
+          continue;
+        }
+        entry.abortedLastRun = true;
+        store[sessionKey] = entry;
+        result.marked++;
+      }
+    },
+    { skipMaintenance: true },
+  );
+
+  if (result.marked > 0) {
+    log.warn(`marked ${result.marked} interrupted main session(s) from stale transcript locks`);
+  }
+  return result;
+}
+
+/** Resumes or fails every running, restart-aborted main session found in one session store. */
+async function recoverStore(params: {
+  storePath: string;
+  resumedSessionKeys: Set<string>;
+}): Promise<{ recovered: number; failed: number; skipped: number }> {
+  const result = { recovered: 0, failed: 0, skipped: 0 };
+  let store: Record<string, SessionEntry>;
+  try {
+    store = loadSessionStore(params.storePath);
+  } catch (err) {
+    log.warn(`failed to load session store ${params.storePath}: ${String(err)}`);
+    result.failed++;
+    return result;
+  }
+
+  for (const [sessionKey, entry] of Object.entries(store).toSorted(([a], [b]) =>
+    a.localeCompare(b),
+  )) {
+    if (!entry || entry.status !== "running" || entry.abortedLastRun !== true) {
+      continue;
+    }
+    if (shouldSkipMainRecovery(entry, sessionKey)) {
+      result.skipped++;
+      continue;
+    }
+    if (params.resumedSessionKeys.has(sessionKey)) {
+      result.skipped++;
+      continue;
+    }
+
+    let messages: unknown[];
+    try {
+      messages = readSessionMessages(entry.sessionId, params.storePath, entry.sessionFile);
+    } catch (err) {
+      log.warn(`failed to read transcript for ${sessionKey}: ${String(err)}`);
+      result.failed++;
+      continue;
+    }
+
+    if (!isMainSessionResumable(messages)) {
+      await markSessionFailed({
+        storePath: params.storePath,
+        sessionKey,
+        reason: "transcript tail is not resumable",
+      });
+      result.failed++;
+      continue;
+    }
+
+    const resumed = await resumeMainSession({
+      storePath: params.storePath,
+      sessionKey,
+    });
+    if (resumed) {
+      params.resumedSessionKeys.add(sessionKey);
+      result.recovered++;
+    } else {
+      result.failed++;
+    }
+  }
+
+  return result;
+}
+
+/**
+ * Runs one recovery pass over every agent session store under the state directory.
+ *
+ * @param params.stateDir State directory; defaults to `resolveStateDir(process.env)`.
+ * @param params.resumedSessionKeys Keys resumed by earlier passes; these are skipped, and keys
+ *   resumed by this pass are added to the set.
+ * @returns Recovered/failed/skipped totals summed across all stores.
+ */
+export async function recoverRestartAbortedMainSessions(
+  params: {
+    stateDir?: string;
+    resumedSessionKeys?: Set<string>;
+  } = {},
+): Promise<{ recovered: number; failed: number; skipped: number }> {
+  const result = { recovered: 0, failed: 0, skipped: 0 };
+  const resumedSessionKeys = params.resumedSessionKeys ?? new Set<string>();
+  const stateDir = params.stateDir ?? resolveStateDir(process.env);
+  const sessionDirs = await resolveAgentSessionDirs(stateDir);
+
+  for (const sessionsDir of sessionDirs) {
+    const storeResult = await recoverStore({
+      storePath: path.join(sessionsDir, "sessions.json"),
+      resumedSessionKeys,
+    });
+    result.recovered += storeResult.recovered;
+    result.failed += storeResult.failed;
+    result.skipped += storeResult.skipped;
+  }
+
+  if (result.recovered > 0 || result.failed > 0) {
+    log.info(
+      `main-session restart recovery complete: recovered=${result.recovered} failed=${result.failed} skipped=${result.skipped}`,
+    );
+  }
+  return result;
+}
+
+/**
+ * Schedules recovery after a delay, retrying with a doubled delay while a pass fails or throws.
+ *
+ * @param params.delayMs Delay before the first attempt; defaults to 5000 ms.
+ * @param params.maxRetries Maximum number of attempts; defaults to 3.
+ * @param params.stateDir Forwarded to `recoverRestartAbortedMainSessions`.
+ */
+export function scheduleRestartAbortedMainSessionRecovery(
+  params: {
+    delayMs?: number;
+    maxRetries?: number;
+    stateDir?: string;
+  } = {},
+): void {
+  const initialDelay = params.delayMs ?? DEFAULT_RECOVERY_DELAY_MS;
+  const maxRetries = params.maxRetries ?? MAX_RECOVERY_RETRIES;
+  const resumedSessionKeys = new Set<string>();
+
+  const attemptRecovery = (attempt: number, delay: number) => {
+    setTimeout(() => {
+      void recoverRestartAbortedMainSessions({
+        stateDir: params.stateDir,
+        resumedSessionKeys,
+      })
+        .then((result) => {
+          if (result.failed > 0 && attempt < maxRetries) {
+            attemptRecovery(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER);
+          }
+        })
+        .catch((err) => {
+          if (attempt < maxRetries) {
+            log.warn(`main-session restart recovery failed: ${String(err)}`);
+            attemptRecovery(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER);
+          } else {
+            log.warn(`main-session restart recovery gave up: ${String(err)}`);
+          }
+        });
+    }, delay).unref?.();
+  };
+
+  attemptRecovery(1, initialDelay);
+}
diff --git a/src/gateway/server-startup-post-attach.ts b/src/gateway/server-startup-post-attach.ts index 31fb20debe1..81e2d67b92c 100644 --- a/src/gateway/server-startup-post-attach.ts +++ b/src/gateway/server-startup-post-attach.ts @@ -157,12 +157,20 @@ export async function startGatewaySidecars(params: { const stateDir = resolveStateDir(process.env); const sessionDirs = await 
resolveAgentSessionDirs(stateDir); for (const sessionsDir of sessionDirs) { - await cleanStaleLockFiles({ + const result = await cleanStaleLockFiles({ sessionsDir, staleMs: SESSION_LOCK_STALE_MS, removeStale: true, log: { warn: (message) => params.log.warn(message) }, }); + if (result.cleaned.length > 0) { + const { markRestartAbortedMainSessionsFromLocks } = + await import("../agents/main-session-restart-recovery.js"); + await markRestartAbortedMainSessionsFromLocks({ + sessionsDir, + cleanedLocks: result.cleaned, + }); + } } } catch (err) { params.log.warn(`session lock cleanup failed on startup: ${String(err)}`); @@ -354,6 +362,12 @@ export async function startGatewaySidecars(params: { scheduleSubagentOrphanRecovery(); }); + await measureStartup(params.startupTrace, "sidecars.main-session-recovery", async () => { + const { scheduleRestartAbortedMainSessionRecovery } = + await import("../agents/main-session-restart-recovery.js"); + scheduleRestartAbortedMainSessionRecovery(); + }); + return { pluginServices }; }