diff --git a/src/agents/pi-embedded-runner.e2e.test.ts b/src/agents/pi-embedded-runner.e2e.test.ts index 9f6059c3449..08024ea29d2 100644 --- a/src/agents/pi-embedded-runner.e2e.test.ts +++ b/src/agents/pi-embedded-runner.e2e.test.ts @@ -73,6 +73,9 @@ vi.mock("@mariozechner/pi-ai", async () => { return buildAssistantMessage(model); }, streamSimple: (model: { api: string; provider: string; id: string }) => { + if (model.id === "mock-throw") { + throw new Error("transport failed"); + } const stream = new actual.AssistantMessageEventStream(); queueMicrotask(() => { stream.push({ @@ -182,20 +185,21 @@ const textFromContent = (content: unknown) => { return undefined; }; -const readSessionMessages = async (sessionFile: string) => { +const readSessionEntries = async (sessionFile: string) => { const raw = await fs.readFile(sessionFile, "utf-8"); return raw .split(/\r?\n/) .filter(Boolean) - .map( - (line) => - JSON.parse(line) as { - type?: string; - message?: { role?: string; content?: unknown }; - }, - ) + .map((line) => JSON.parse(line) as { type?: string; customType?: string; data?: unknown }); +}; + +const readSessionMessages = async (sessionFile: string) => { + const entries = await readSessionEntries(sessionFile); + return entries .filter((entry) => entry.type === "message") - .map((entry) => entry.message as { role?: string; content?: unknown }); + .map( + (entry) => (entry as { message?: { role?: string; content?: unknown } }).message, + ) as Array<{ role?: string; content?: unknown }>; }; const runDefaultEmbeddedTurn = async (sessionFile: string, prompt: string) => { @@ -373,6 +377,35 @@ describe("runEmbeddedPiAgent", () => { expect(userIndex).toBeGreaterThanOrEqual(0); }); + it("persists prompt transport errors as transcript entries", async () => { + const sessionFile = nextSessionFile(); + const cfg = makeOpenAiConfig(["mock-throw"]); + await ensureModels(cfg); + + const result = await runEmbeddedPiAgent({ + sessionId: "session:test", + sessionKey: testSessionKey, + sessionFile, + workspaceDir, + config: cfg, + prompt: "transport error", + provider: "openai", + model: "mock-throw", + timeoutMs: 5_000, + agentDir, + enqueue: immediateEnqueue, + }); + expect(result.payloads[0]?.isError).toBe(true); + + const entries = await readSessionEntries(sessionFile); + const promptErrorEntry = entries.find( + (entry) => entry.type === "custom" && entry.customType === "openclaw:prompt-error", + ) as { data?: { error?: string } } | undefined; + + expect(promptErrorEntry).toBeTruthy(); + expect(promptErrorEntry?.data?.error).toContain("transport failed"); + }); + it( "appends new user + assistant after existing transcript entries", { timeout: 90_000 }, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 6fa11039573..80e088201ff 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -848,6 +848,7 @@ export async function runEmbeddedAttempt( }).sessionAgentId; let promptError: unknown = null; + let promptErrorSource: "prompt" | "compaction" | null = null; try { const promptStartedAt = Date.now(); @@ -1000,6 +1001,7 @@ export async function runEmbeddedAttempt( } } catch (err) { promptError = err; + promptErrorSource = "prompt"; } finally { log.debug( `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`, @@ -1022,6 +1024,7 @@ export async function runEmbeddedAttempt( if (isRunnerAbortError(err)) { if (!promptError) { promptError = err; + promptErrorSource = "compaction"; } if (!isProbeSession) { log.debug( @@ -1070,6 +1073,23 @@ export async function runEmbeddedAttempt( } messagesSnapshot = snapshotSelection.messagesSnapshot; sessionIdUsed = snapshotSelection.sessionIdUsed; + + if (promptError && promptErrorSource === "prompt") { + try { + sessionManager.appendCustomEntry("openclaw:prompt-error", { + timestamp: Date.now(), + runId: params.runId, + sessionId: params.sessionId, + provider: params.provider, + model: params.modelId, + api: params.model.api, + error: describeUnknownError(promptError), + }); + } catch (entryErr) { + log.warn(`failed to persist prompt error entry: ${String(entryErr)}`); + } + } + cacheTrace?.recordStage("session:after", { messages: messagesSnapshot, note: timedOutDuringCompaction diff --git a/src/agents/session-write-lock.e2e.test.ts b/src/agents/session-write-lock.e2e.test.ts index bbe26cb7096..9d829ba5cb9 100644 --- a/src/agents/session-write-lock.e2e.test.ts +++ b/src/agents/session-write-lock.e2e.test.ts @@ -1,8 +1,8 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { describe, expect, it } from "vitest"; -import { __testing, acquireSessionWriteLock } from "./session-write-lock.js"; +import { describe, expect, it, vi } from "vitest"; +import { __testing, acquireSessionWriteLock, cleanStaleLockFiles } from "./session-write-lock.js"; describe("acquireSessionWriteLock", () => { it("reuses locks across symlinked session paths", async () => { @@ -72,6 +72,95 @@ describe("acquireSessionWriteLock", () => { } }); + it("watchdog releases stale in-process locks", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + try { + const sessionFile = path.join(root, "session.jsonl"); + const lockPath = `${sessionFile}.lock`; + const lockA = await acquireSessionWriteLock({ + sessionFile, + timeoutMs: 500, + maxHoldMs: 1, + }); + + const released = await __testing.runLockWatchdogCheck(Date.now() + 1000); + expect(released).toBeGreaterThanOrEqual(1); + await expect(fs.access(lockPath)).rejects.toThrow(); + + const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); + await expect(fs.access(lockPath)).resolves.toBeUndefined(); + + // Old release handle must not affect the new lock. + await lockA.release(); + await expect(fs.access(lockPath)).resolves.toBeUndefined(); + + await lockB.release(); + await expect(fs.access(lockPath)).rejects.toThrow(); + } finally { + warnSpy.mockRestore(); + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("cleans stale .jsonl lock files in sessions directories", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); + const sessionsDir = path.join(root, "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + + const nowMs = Date.now(); + const staleDeadLock = path.join(sessionsDir, "dead.jsonl.lock"); + const staleAliveLock = path.join(sessionsDir, "old-live.jsonl.lock"); + const freshAliveLock = path.join(sessionsDir, "fresh-live.jsonl.lock"); + + try { + await fs.writeFile( + staleDeadLock, + JSON.stringify({ + pid: 999_999, + createdAt: new Date(nowMs - 120_000).toISOString(), + }), + "utf8", + ); + await fs.writeFile( + staleAliveLock, + JSON.stringify({ + pid: process.pid, + createdAt: new Date(nowMs - 120_000).toISOString(), + }), + "utf8", + ); + await fs.writeFile( + freshAliveLock, + JSON.stringify({ + pid: process.pid, + createdAt: new Date(nowMs - 1_000).toISOString(), + }), + "utf8", + ); + + const result = await cleanStaleLockFiles({ + sessionsDir, + staleMs: 30_000, + nowMs, + removeStale: true, + }); + + expect(result.locks).toHaveLength(3); + expect(result.cleaned).toHaveLength(2); + expect(result.cleaned.map((entry) => path.basename(entry.lockPath)).toSorted()).toEqual([ + "dead.jsonl.lock", + "old-live.jsonl.lock", + ]); + + await expect(fs.access(staleDeadLock)).rejects.toThrow(); + await expect(fs.access(staleAliveLock)).rejects.toThrow(); + await expect(fs.access(freshAliveLock)).resolves.toBeUndefined(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + it("removes held locks on termination signals", async () => { const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const; for (const signal of signals) { diff --git a/src/agents/session-write-lock.ts b/src/agents/session-write-lock.ts index 94d43d5ac8d..ca0b0c6f7d2 100644 --- a/src/agents/session-write-lock.ts +++ b/src/agents/session-write-lock.ts @@ -4,26 +4,51 @@ import path from "node:path"; import { isPidAlive } from "../shared/pid-alive.js"; type LockFilePayload = { - pid: number; - createdAt: string; + pid?: number; + createdAt?: string; }; type HeldLock = { count: number; handle: fs.FileHandle; lockPath: string; + acquiredAt: number; + maxHoldMs: number; + releasePromise?: Promise; +}; + +export type SessionLockInspection = { + lockPath: string; + pid: number | null; + pidAlive: boolean; + createdAt: string | null; + ageMs: number | null; + stale: boolean; + staleReasons: string[]; + removed: boolean; }; const CLEANUP_SIGNALS = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const; type CleanupSignal = (typeof CLEANUP_SIGNALS)[number]; const CLEANUP_STATE_KEY = Symbol.for("openclaw.sessionWriteLockCleanupState"); const HELD_LOCKS_KEY = Symbol.for("openclaw.sessionWriteLockHeldLocks"); +const WATCHDOG_STATE_KEY = Symbol.for("openclaw.sessionWriteLockWatchdogState"); + +const DEFAULT_STALE_MS = 30 * 60 * 1000; +const DEFAULT_MAX_HOLD_MS = 5 * 60 * 1000; +const DEFAULT_WATCHDOG_INTERVAL_MS = 60_000; type CleanupState = { registered: boolean; cleanupHandlers: Map void>; }; +type WatchdogState = { + started: boolean; + intervalMs: number; + timer?: NodeJS.Timeout; +}; + function resolveHeldLocks(): Map { const proc = process as NodeJS.Process & { [HELD_LOCKS_KEY]?: Map; @@ -49,6 +74,82 @@ function resolveCleanupState(): CleanupState { return proc[CLEANUP_STATE_KEY]; } +function resolveWatchdogState(): WatchdogState { + const proc = process as NodeJS.Process & { + [WATCHDOG_STATE_KEY]?: WatchdogState; + }; + if (!proc[WATCHDOG_STATE_KEY]) { + proc[WATCHDOG_STATE_KEY] = { + started: false, + intervalMs: DEFAULT_WATCHDOG_INTERVAL_MS, + }; + } + return proc[WATCHDOG_STATE_KEY]; +} + +function resolvePositiveMs( + value: number | undefined, + fallback: number, + opts: { allowInfinity?: boolean } = {}, +): number { + if (typeof value !== "number" || Number.isNaN(value) || value <= 0) { + return fallback; + } + if (value === Number.POSITIVE_INFINITY) { + return opts.allowInfinity ? value : fallback; + } + if (!Number.isFinite(value)) { + return fallback; + } + return value; +} + +async function releaseHeldLock( + normalizedSessionFile: string, + held: HeldLock, + opts: { force?: boolean } = {}, +): Promise { + const current = HELD_LOCKS.get(normalizedSessionFile); + if (current !== held) { + return false; + } + + if (opts.force) { + held.count = 0; + } else { + held.count -= 1; + if (held.count > 0) { + return false; + } + } + + if (held.releasePromise) { + await held.releasePromise.catch(() => undefined); + return true; + } + + HELD_LOCKS.delete(normalizedSessionFile); + held.releasePromise = (async () => { + try { + await held.handle.close(); + } catch { + // Ignore errors during cleanup - best effort. + } + try { + await fs.rm(held.lockPath, { force: true }); + } catch { + // Ignore errors during cleanup - best effort. + } + })(); + + try { + await held.releasePromise; + return true; + } finally { + held.releasePromise = undefined; + } +} + /** * Synchronously release all held locks. * Used during process exit when async operations aren't reliable. @@ -71,6 +172,42 @@ function releaseAllLocksSync(): void { } } +async function runLockWatchdogCheck(nowMs = Date.now()): Promise { + let released = 0; + for (const [sessionFile, held] of HELD_LOCKS.entries()) { + const heldForMs = nowMs - held.acquiredAt; + if (heldForMs <= held.maxHoldMs) { + continue; + } + + // eslint-disable-next-line no-console + console.warn( + `[session-write-lock] releasing lock held for ${heldForMs}ms (max=${held.maxHoldMs}ms): ${held.lockPath}`, + ); + + const didRelease = await releaseHeldLock(sessionFile, held, { force: true }); + if (didRelease) { + released += 1; + } + } + return released; +} + +function ensureWatchdogStarted(intervalMs: number): void { + const watchdogState = resolveWatchdogState(); + if (watchdogState.started) { + return; + } + watchdogState.started = true; + watchdogState.intervalMs = intervalMs; + watchdogState.timer = setInterval(() => { + void runLockWatchdogCheck().catch(() => { + // Ignore watchdog errors - best effort cleanup only. + }); + }, intervalMs); + watchdogState.timer.unref?.(); +} + function handleTerminationSignal(signal: CleanupSignal): void { releaseAllLocksSync(); const cleanupState = resolveCleanupState(); @@ -99,6 +236,8 @@ function registerCleanupHandlers(): void { }); } + ensureWatchdogStarted(DEFAULT_WATCHDOG_INTERVAL_MS); + // Handle termination signals for (const signal of CLEANUP_SIGNALS) { if (cleanupState.cleanupHandlers.has(signal)) { @@ -117,29 +256,125 @@ function registerCleanupHandlers(): void { async function readLockPayload(lockPath: string): Promise { try { const raw = await fs.readFile(lockPath, "utf8"); - const parsed = JSON.parse(raw) as Partial; - if (typeof parsed.pid !== "number") { - return null; + const parsed = JSON.parse(raw) as Record; + const payload: LockFilePayload = {}; + if (typeof parsed.pid === "number") { + payload.pid = parsed.pid; } - if (typeof parsed.createdAt !== "string") { - return null; + if (typeof parsed.createdAt === "string") { + payload.createdAt = parsed.createdAt; } - return { pid: parsed.pid, createdAt: parsed.createdAt }; + return payload; } catch { return null; } } +function inspectLockPayload( + payload: LockFilePayload | null, + staleMs: number, + nowMs: number, +): Pick< + SessionLockInspection, + "pid" | "pidAlive" | "createdAt" | "ageMs" | "stale" | "staleReasons" +> { + const pid = typeof payload?.pid === "number" ? payload.pid : null; + const pidAlive = pid !== null ? isPidAlive(pid) : false; + const createdAt = typeof payload?.createdAt === "string" ? payload.createdAt : null; + const createdAtMs = createdAt ? Date.parse(createdAt) : Number.NaN; + const ageMs = Number.isFinite(createdAtMs) ? Math.max(0, nowMs - createdAtMs) : null; + + const staleReasons: string[] = []; + if (pid === null) { + staleReasons.push("missing-pid"); + } else if (!pidAlive) { + staleReasons.push("dead-pid"); + } + if (ageMs === null) { + staleReasons.push("invalid-createdAt"); + } else if (ageMs > staleMs) { + staleReasons.push("too-old"); + } + + return { + pid, + pidAlive, + createdAt, + ageMs, + stale: staleReasons.length > 0, + staleReasons, + }; +} + +export async function cleanStaleLockFiles(params: { + sessionsDir: string; + staleMs?: number; + removeStale?: boolean; + nowMs?: number; + log?: { + warn?: (message: string) => void; + info?: (message: string) => void; + }; +}): Promise<{ locks: SessionLockInspection[]; cleaned: SessionLockInspection[] }> { + const sessionsDir = path.resolve(params.sessionsDir); + const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS); + const removeStale = params.removeStale !== false; + const nowMs = params.nowMs ?? Date.now(); + + let entries: fsSync.Dirent[] = []; + try { + entries = await fs.readdir(sessionsDir, { withFileTypes: true }); + } catch (err) { + const code = (err as { code?: string }).code; + if (code === "ENOENT") { + return { locks: [], cleaned: [] }; + } + throw err; + } + + const locks: SessionLockInspection[] = []; + const cleaned: SessionLockInspection[] = []; + const lockEntries = entries + .filter((entry) => entry.name.endsWith(".jsonl.lock")) + .toSorted((a, b) => a.name.localeCompare(b.name)); + + for (const entry of lockEntries) { + const lockPath = path.join(sessionsDir, entry.name); + const payload = await readLockPayload(lockPath); + const inspected = inspectLockPayload(payload, staleMs, nowMs); + const lockInfo: SessionLockInspection = { + lockPath, + ...inspected, + removed: false, + }; + + if (lockInfo.stale && removeStale) { + await fs.rm(lockPath, { force: true }); + lockInfo.removed = true; + cleaned.push(lockInfo); + params.log?.warn?.( + `removed stale session lock: ${lockPath} (${lockInfo.staleReasons.join(", ") || "unknown"})`, + ); + } + + locks.push(lockInfo); + } + + return { locks, cleaned }; +} + export async function acquireSessionWriteLock(params: { sessionFile: string; timeoutMs?: number; staleMs?: number; + maxHoldMs?: number; }): Promise<{ release: () => Promise; }> { registerCleanupHandlers(); - const timeoutMs = params.timeoutMs ?? 10_000; - const staleMs = params.staleMs ?? 30 * 60 * 1000; + const timeoutMs = resolvePositiveMs(params.timeoutMs, 10_000, { allowInfinity: true }); + const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS); + const maxHoldMs = resolvePositiveMs(params.maxHoldMs, DEFAULT_MAX_HOLD_MS); const sessionFile = path.resolve(params.sessionFile); const sessionDir = path.dirname(sessionFile); await fs.mkdir(sessionDir, { recursive: true }); @@ -151,25 +386,14 @@ export async function acquireSessionWriteLock(params: { } const normalizedSessionFile = path.join(normalizedDir, path.basename(sessionFile)); const lockPath = `${normalizedSessionFile}.lock`; - const release = async () => { - const current = HELD_LOCKS.get(normalizedSessionFile); - if (!current) { - return; - } - current.count -= 1; - if (current.count > 0) { - return; - } - HELD_LOCKS.delete(normalizedSessionFile); - await current.handle.close(); - await fs.rm(current.lockPath, { force: true }); - }; const held = HELD_LOCKS.get(normalizedSessionFile); if (held) { held.count += 1; return { - release, + release: async () => { + await releaseHeldLock(normalizedSessionFile, held); + }, }; } @@ -179,13 +403,20 @@ export async function acquireSessionWriteLock(params: { attempt += 1; try { const handle = await fs.open(lockPath, "wx"); - await handle.writeFile( - JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), - "utf8", - ); - HELD_LOCKS.set(normalizedSessionFile, { count: 1, handle, lockPath }); + const createdAt = new Date().toISOString(); + await handle.writeFile(JSON.stringify({ pid: process.pid, createdAt }, null, 2), "utf8"); + const createdHeld: HeldLock = { + count: 1, + handle, + lockPath, + acquiredAt: Date.now(), + maxHoldMs, + }; + HELD_LOCKS.set(normalizedSessionFile, createdHeld); return { - release, + release: async () => { + await releaseHeldLock(normalizedSessionFile, createdHeld); + }, }; } catch (err) { const code = (err as { code?: unknown }).code; @@ -193,10 +424,8 @@ export async function acquireSessionWriteLock(params: { throw err; } const payload = await readLockPayload(lockPath); - const createdAt = payload?.createdAt ? Date.parse(payload.createdAt) : NaN; - const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs; - const alive = payload?.pid ? isPidAlive(payload.pid) : false; - if (stale || !alive) { + const inspected = inspectLockPayload(payload, staleMs, Date.now()); + if (inspected.stale) { await fs.rm(lockPath, { force: true }); continue; } @@ -207,7 +436,7 @@ export async function acquireSessionWriteLock(params: { } const payload = await readLockPayload(lockPath); - const owner = payload?.pid ? `pid=${payload.pid}` : "unknown"; + const owner = typeof payload?.pid === "number" ? `pid=${payload.pid}` : "unknown"; throw new Error(`session file locked (timeout ${timeoutMs}ms): ${owner} ${lockPath}`); } @@ -215,4 +444,5 @@ export const __testing = { cleanupSignals: [...CLEANUP_SIGNALS], handleTerminationSignal, releaseAllLocksSync, + runLockWatchdogCheck, }; diff --git a/src/commands/doctor-session-locks.test.ts b/src/commands/doctor-session-locks.test.ts new file mode 100644 index 00000000000..eb5a656a833 --- /dev/null +++ b/src/commands/doctor-session-locks.test.ts @@ -0,0 +1,83 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const note = vi.hoisted(() => vi.fn()); + +vi.mock("../terminal/note.js", () => ({ + note, +})); + +import { noteSessionLockHealth } from "./doctor-session-locks.js"; + +describe("noteSessionLockHealth", () => { + let root: string; + let prevStateDir: string | undefined; + + beforeEach(async () => { + note.mockReset(); + prevStateDir = process.env.OPENCLAW_STATE_DIR; + root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-doctor-locks-")); + process.env.OPENCLAW_STATE_DIR = root; + }); + + afterEach(async () => { + if (prevStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = prevStateDir; + } + await fs.rm(root, { recursive: true, force: true }); + }); + + it("reports existing lock files with pid status and age", async () => { + const sessionsDir = path.join(root, "agents", "main", "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + const lockPath = path.join(sessionsDir, "active.jsonl.lock"); + await fs.writeFile( + lockPath, + JSON.stringify({ pid: process.pid, createdAt: new Date(Date.now() - 1500).toISOString() }), + "utf8", + ); + + await noteSessionLockHealth({ shouldRepair: false, staleMs: 60_000 }); + + expect(note).toHaveBeenCalledTimes(1); + const [message, title] = note.mock.calls[0] as [string, string]; + expect(title).toBe("Session locks"); + expect(message).toContain("Found 1 session lock file"); + expect(message).toContain(`pid=${process.pid} (alive)`); + expect(message).toContain("stale=no"); + await expect(fs.access(lockPath)).resolves.toBeUndefined(); + }); + + it("removes stale locks in repair mode", async () => { + const sessionsDir = path.join(root, "agents", "main", "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + + const staleLock = path.join(sessionsDir, "stale.jsonl.lock"); + const freshLock = path.join(sessionsDir, "fresh.jsonl.lock"); + + await fs.writeFile( + staleLock, + JSON.stringify({ pid: -1, createdAt: new Date(Date.now() - 120_000).toISOString() }), + "utf8", + ); + await fs.writeFile( + freshLock, + JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }), + "utf8", + ); + + await noteSessionLockHealth({ shouldRepair: true, staleMs: 30_000 }); + + expect(note).toHaveBeenCalledTimes(1); + const [message] = note.mock.calls[0] as [string, string]; + expect(message).toContain("[removed]"); + expect(message).toContain("Removed 1 stale session lock file"); + + await expect(fs.access(staleLock)).rejects.toThrow(); + await expect(fs.access(freshLock)).resolves.toBeUndefined(); + }); +}); diff --git a/src/commands/doctor-session-locks.ts b/src/commands/doctor-session-locks.ts new file mode 100644 index 00000000000..d516484a5ee --- /dev/null +++ b/src/commands/doctor-session-locks.ts @@ -0,0 +1,106 @@ +import type { Dirent } from "node:fs"; +import fs from "node:fs/promises"; +import path from "node:path"; +import { cleanStaleLockFiles, type SessionLockInspection } from "../agents/session-write-lock.js"; +import { resolveStateDir } from "../config/paths.js"; +import { note } from "../terminal/note.js"; +import { shortenHomePath } from "../utils.js"; + +const DEFAULT_STALE_MS = 30 * 60 * 1000; + +async function resolveAgentSessionDirs(stateDir: string): Promise { + const agentsDir = path.join(stateDir, "agents"); + let entries: Dirent[] = []; + try { + entries = await fs.readdir(agentsDir, { withFileTypes: true }); + } catch (err) { + const code = (err as { code?: string }).code; + if (code === "ENOENT") { + return []; + } + throw err; + } + + return entries + .filter((entry) => entry.isDirectory()) + .map((entry) => path.join(agentsDir, entry.name, "sessions")) + .toSorted((a, b) => a.localeCompare(b)); +} + +function formatAge(ageMs: number | null): string { + if (ageMs === null) { + return "unknown"; + } + const seconds = Math.floor(ageMs / 1000); + if (seconds < 60) { + return `${seconds}s`; + } + const minutes = Math.floor(seconds / 60); + const remainingSeconds = seconds % 60; + if (minutes < 60) { + return `${minutes}m${remainingSeconds}s`; + } + const hours = Math.floor(minutes / 60); + const remainingMinutes = minutes % 60; + return `${hours}h${remainingMinutes}m`; +} + +function formatLockLine(lock: SessionLockInspection): string { + const pidStatus = + lock.pid === null ? "pid=missing" : `pid=${lock.pid} (${lock.pidAlive ? "alive" : "dead"})`; + const ageStatus = `age=${formatAge(lock.ageMs)}`; + const staleStatus = lock.stale + ? `stale=yes (${lock.staleReasons.join(", ") || "unknown"})` + : "stale=no"; + const removedStatus = lock.removed ? " [removed]" : ""; + return `- ${shortenHomePath(lock.lockPath)} ${pidStatus} ${ageStatus} ${staleStatus}${removedStatus}`; +} + +export async function noteSessionLockHealth(params?: { shouldRepair?: boolean; staleMs?: number }) { + const shouldRepair = params?.shouldRepair === true; + const staleMs = params?.staleMs ?? DEFAULT_STALE_MS; + let sessionDirs: string[] = []; + try { + sessionDirs = await resolveAgentSessionDirs(resolveStateDir(process.env)); + } catch (err) { + note(`- Failed to inspect session lock files: ${String(err)}`, "Session locks"); + return; + } + + if (sessionDirs.length === 0) { + return; + } + + const allLocks: SessionLockInspection[] = []; + for (const sessionsDir of sessionDirs) { + const result = await cleanStaleLockFiles({ + sessionsDir, + staleMs, + removeStale: shouldRepair, + }); + allLocks.push(...result.locks); + } + + if (allLocks.length === 0) { + return; + } + + const staleCount = allLocks.filter((lock) => lock.stale).length; + const removedCount = allLocks.filter((lock) => lock.removed).length; + const lines: string[] = [ + `- Found ${allLocks.length} session lock file${allLocks.length === 1 ? "" : "s"}.`, + ...allLocks.toSorted((a, b) => a.lockPath.localeCompare(b.lockPath)).map(formatLockLine), + ]; + + if (staleCount > 0 && !shouldRepair) { + lines.push(`- ${staleCount} lock file${staleCount === 1 ? " is" : "s are"} stale.`); + lines.push('- Run "openclaw doctor --fix" to remove stale lock files automatically.'); + } + if (shouldRepair && removedCount > 0) { + lines.push( + `- Removed ${removedCount} stale session lock file${removedCount === 1 ? "" : "s"}.`, + ); + } + + note(lines.join("\n"), "Session locks"); +} diff --git a/src/commands/doctor.ts b/src/commands/doctor.ts index ac2d77b0ced..a29f756f890 100644 --- a/src/commands/doctor.ts +++ b/src/commands/doctor.ts @@ -44,6 +44,7 @@ import { import { createDoctorPrompter, type DoctorOptions } from "./doctor-prompter.js"; import { maybeRepairSandboxImages, noteSandboxScopeWarnings } from "./doctor-sandbox.js"; import { noteSecurityWarnings } from "./doctor-security.js"; +import { noteSessionLockHealth } from "./doctor-session-locks.js"; import { noteStateIntegrity, noteWorkspaceBackupTip } from "./doctor-state-integrity.js"; import { detectLegacyStateMigrations, @@ -188,6 +189,7 @@ export async function doctorCommand( } await noteStateIntegrity(cfg, prompter, configResult.path ?? CONFIG_PATH); + await noteSessionLockHealth({ shouldRepair: prompter.shouldRepair }); cfg = await maybeRepairSandboxImages(cfg, runtime, prompter); noteSandboxScopeWarnings(cfg); diff --git a/src/gateway/server-startup.ts b/src/gateway/server-startup.ts index e9267d855ec..89e2e38cf60 100644 --- a/src/gateway/server-startup.ts +++ b/src/gateway/server-startup.ts @@ -1,3 +1,6 @@ +import type { Dirent } from "node:fs"; +import fs from "node:fs/promises"; +import path from "node:path"; import type { CliDeps } from "../cli/deps.js"; import type { loadConfig } from "../config/config.js"; import type { loadOpenClawPlugins } from "../plugins/loader.js"; @@ -8,6 +11,8 @@ import { resolveConfiguredModelRef, resolveHooksGmailModel, } from "../agents/model-selection.js"; +import { cleanStaleLockFiles } from "../agents/session-write-lock.js"; +import { resolveStateDir } from "../config/paths.js"; import { startGmailWatcher } from "../hooks/gmail-watcher.js"; import { clearInternalHooks, @@ -24,6 +29,27 @@ import { } from "./server-restart-sentinel.js"; import { startGatewayMemoryBackend } from "./server-startup-memory.js"; +const SESSION_LOCK_STALE_MS = 30 * 60 * 1000; + +async function resolveAgentSessionDirs(stateDir: string): Promise { + const agentsDir = path.join(stateDir, "agents"); + let entries: Dirent[] = []; + try { + entries = await fs.readdir(agentsDir, { withFileTypes: true }); + } catch (err) { + const code = (err as { code?: string }).code; + if (code === "ENOENT") { + return []; + } + throw err; + } + + return entries + .filter((entry) => entry.isDirectory()) + .map((entry) => path.join(agentsDir, entry.name, "sessions")) + .toSorted((a, b) => a.localeCompare(b)); +} + export async function startGatewaySidecars(params: { cfg: ReturnType; pluginRegistry: ReturnType; @@ -39,6 +65,21 @@ export async function startGatewaySidecars(params: { logChannels: { info: (msg: string) => void; error: (msg: string) => void }; logBrowser: { error: (msg: string) => void }; }) { + try { + const stateDir = resolveStateDir(process.env); + const sessionDirs = await resolveAgentSessionDirs(stateDir); + for (const sessionsDir of sessionDirs) { + await cleanStaleLockFiles({ + sessionsDir, + staleMs: SESSION_LOCK_STALE_MS, + removeStale: true, + log: { warn: (message) => params.log.warn(message) }, + }); + } + } catch (err) { + params.log.warn(`session lock cleanup failed on startup: ${String(err)}`); + } + // Start OpenClaw browser control server (unless disabled via config). let browserControl: Awaited> = null; try {