mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:40:43 +00:00
fix: recover restart-aborted main sessions
This commit is contained in:
@@ -63,6 +63,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Gateway/sessions: recover main-agent turns interrupted by a gateway restart from stale transcript-lock evidence, avoiding stuck `status: "running"` sessions without broad post-boot transcript scans. Fixes #70555. Thanks @bitloi.
|
||||
- Plugins/Google Meet: include live Chrome-node readiness in `googlemeet setup` and document the Parallels recovery checks, so stale node tokens or disconnected VM browsers are visible before an agent opens a meeting. Thanks @steipete.
|
||||
- Codex approvals: compact home-directory permission paths to `~` without repeating them as a separate high-risk warning, while preserving filesystem root and wildcard host warnings. Thanks @steipete.
|
||||
- Plugins/runtime deps: isolate the internal npm cache used for bundled plugin runtime-dependency repair and let package updates refresh/verify already-current installs, so failed update or sudo doctor runs can be repaired by rerunning `openclaw update`. Thanks @steipete.
|
||||
|
||||
169
src/agents/main-session-restart-recovery.test.ts
Normal file
169
src/agents/main-session-restart-recovery.test.ts
Normal file
@@ -0,0 +1,169 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { loadSessionStore, type SessionEntry } from "../config/sessions.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import {
|
||||
markRestartAbortedMainSessionsFromLocks,
|
||||
recoverRestartAbortedMainSessions,
|
||||
} from "./main-session-restart-recovery.js";
|
||||
import type { SessionLockInspection } from "./session-write-lock.js";
|
||||
|
||||
vi.mock("../gateway/call.js", () => ({
|
||||
callGateway: vi.fn(async () => ({ runId: "run-resumed" })),
|
||||
}));
|
||||
|
||||
let tmpDir: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
vi.clearAllMocks();
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-main-restart-recovery-"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
async function makeSessionsDir(agentId = "main"): Promise<string> {
|
||||
const sessionsDir = path.join(tmpDir, "agents", agentId, "sessions");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
return sessionsDir;
|
||||
}
|
||||
|
||||
async function writeStore(sessionsDir: string, store: Record<string, SessionEntry>): Promise<void> {
|
||||
await fs.writeFile(path.join(sessionsDir, "sessions.json"), JSON.stringify(store, null, 2));
|
||||
}
|
||||
|
||||
async function writeTranscript(
|
||||
sessionsDir: string,
|
||||
sessionId: string,
|
||||
messages: unknown[],
|
||||
): Promise<void> {
|
||||
const lines = messages.map((message) => JSON.stringify({ message })).join("\n");
|
||||
await fs.writeFile(path.join(sessionsDir, `${sessionId}.jsonl`), `${lines}\n`);
|
||||
}
|
||||
|
||||
function cleanedLock(sessionsDir: string, sessionId: string): SessionLockInspection {
|
||||
return {
|
||||
lockPath: path.join(sessionsDir, `${sessionId}.jsonl.lock`),
|
||||
pid: 999_999,
|
||||
pidAlive: false,
|
||||
createdAt: new Date(Date.now() - 1_000).toISOString(),
|
||||
ageMs: 1_000,
|
||||
stale: true,
|
||||
staleReasons: ["dead-pid"],
|
||||
removed: true,
|
||||
};
|
||||
}
|
||||
|
||||
describe("main-session-restart-recovery", () => {
|
||||
it("marks only main running sessions whose transcript lock was cleaned", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
"agent:main:main": {
|
||||
sessionId: "main-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
"agent:main:subagent:child": {
|
||||
sessionId: "child-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
spawnDepth: 1,
|
||||
},
|
||||
"agent:main:other": {
|
||||
sessionId: "other-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
});
|
||||
|
||||
const result = await markRestartAbortedMainSessionsFromLocks({
|
||||
sessionsDir,
|
||||
cleanedLocks: [
|
||||
cleanedLock(sessionsDir, "main-session"),
|
||||
cleanedLock(sessionsDir, "child-session"),
|
||||
],
|
||||
});
|
||||
|
||||
const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
|
||||
expect(result).toEqual({ marked: 1, skipped: 1 });
|
||||
expect(store["agent:main:main"]?.abortedLastRun).toBe(true);
|
||||
expect(store["agent:main:subagent:child"]?.abortedLastRun).toBeUndefined();
|
||||
expect(store["agent:main:other"]?.abortedLastRun).toBeUndefined();
|
||||
});
|
||||
|
||||
it("resumes marked sessions with a tool-result transcript tail", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
"agent:main:main": {
|
||||
sessionId: "main-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
abortedLastRun: true,
|
||||
},
|
||||
});
|
||||
await writeTranscript(sessionsDir, "main-session", [
|
||||
{ role: "user", content: "run the tool" },
|
||||
{ role: "assistant", content: [{ type: "toolCall", id: "call-1", name: "exec" }] },
|
||||
{ role: "toolResult", content: "done" },
|
||||
]);
|
||||
|
||||
const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
|
||||
|
||||
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
|
||||
expect(callGateway).toHaveBeenCalledOnce();
|
||||
expect(vi.mocked(callGateway).mock.calls[0]?.[0].params).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
deliver: false,
|
||||
lane: "main",
|
||||
});
|
||||
const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
|
||||
expect(store["agent:main:main"]?.abortedLastRun).toBe(false);
|
||||
});
|
||||
|
||||
it("does not scan ordinary running sessions without the restart-aborted marker", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
"agent:main:main": {
|
||||
sessionId: "main-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
});
|
||||
await writeTranscript(sessionsDir, "main-session", [
|
||||
{ role: "user", content: "current process owns this" },
|
||||
{ role: "toolResult", content: "done" },
|
||||
]);
|
||||
|
||||
const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
|
||||
|
||||
expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
|
||||
expect(callGateway).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("fails marked sessions whose transcript tail cannot be resumed", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
"agent:main:main": {
|
||||
sessionId: "main-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
abortedLastRun: true,
|
||||
},
|
||||
});
|
||||
await writeTranscript(sessionsDir, "main-session", [
|
||||
{ role: "user", content: "hello" },
|
||||
{ role: "assistant", content: "partial answer" },
|
||||
]);
|
||||
|
||||
const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
|
||||
|
||||
expect(result).toEqual({ recovered: 0, failed: 1, skipped: 0 });
|
||||
expect(callGateway).not.toHaveBeenCalled();
|
||||
const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
|
||||
expect(store["agent:main:main"]?.status).toBe("failed");
|
||||
expect(store["agent:main:main"]?.abortedLastRun).toBe(true);
|
||||
});
|
||||
});
|
||||
307
src/agents/main-session-restart-recovery.ts
Normal file
307
src/agents/main-session-restart-recovery.ts
Normal file
@@ -0,0 +1,307 @@
|
||||
/**
|
||||
* Post-restart recovery for main sessions interrupted while holding a transcript lock.
|
||||
*/
|
||||
|
||||
import crypto from "node:crypto";
|
||||
import path from "node:path";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import { type SessionEntry, loadSessionStore, updateSessionStore } from "../config/sessions.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { readSessionMessages } from "../gateway/session-utils.fs.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
import { isAcpSessionKey, isCronSessionKey, isSubagentSessionKey } from "../routing/session-key.js";
|
||||
import { resolveAgentSessionDirs } from "./session-dirs.js";
|
||||
import type { SessionLockInspection } from "./session-write-lock.js";
|
||||
|
||||
const log = createSubsystemLogger("main-session-restart-recovery");
|
||||
|
||||
const DEFAULT_RECOVERY_DELAY_MS = 5_000;
|
||||
const MAX_RECOVERY_RETRIES = 3;
|
||||
const RETRY_BACKOFF_MULTIPLIER = 2;
|
||||
|
||||
function shouldSkipMainRecovery(entry: SessionEntry, sessionKey: string): boolean {
|
||||
if (typeof entry.spawnDepth === "number" && entry.spawnDepth > 0) {
|
||||
return true;
|
||||
}
|
||||
if (entry.subagentRole != null) {
|
||||
return true;
|
||||
}
|
||||
return (
|
||||
isSubagentSessionKey(sessionKey) || isCronSessionKey(sessionKey) || isAcpSessionKey(sessionKey)
|
||||
);
|
||||
}
|
||||
|
||||
function sessionIdFromLockPath(lockPath: string): string | undefined {
|
||||
const fileName = path.basename(lockPath);
|
||||
if (!fileName.endsWith(".jsonl.lock")) {
|
||||
return undefined;
|
||||
}
|
||||
const sessionId = fileName.slice(0, -".jsonl.lock".length).trim();
|
||||
return sessionId || undefined;
|
||||
}
|
||||
|
||||
function getMessageRole(message: unknown): string | undefined {
|
||||
if (!message || typeof message !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
const role = (message as { role?: unknown }).role;
|
||||
return typeof role === "string" ? role : undefined;
|
||||
}
|
||||
|
||||
function isMeaningfulTailMessage(message: unknown): boolean {
|
||||
const role = getMessageRole(message);
|
||||
if (!role || role === "system") {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
function isResumableTailMessage(message: unknown): boolean {
|
||||
const role = getMessageRole(message);
|
||||
return role === "user" || role === "tool" || role === "toolResult";
|
||||
}
|
||||
|
||||
function isMainSessionResumable(messages: unknown[]): boolean {
|
||||
const lastMeaningful = messages.toReversed().find(isMeaningfulTailMessage);
|
||||
return lastMeaningful ? isResumableTailMessage(lastMeaningful) : false;
|
||||
}
|
||||
|
||||
function buildResumeMessage(): string {
|
||||
return (
|
||||
"[System] Your previous turn was interrupted by a gateway restart while " +
|
||||
"OpenClaw was waiting on tool/model work. Continue from the existing " +
|
||||
"transcript and finish the interrupted response."
|
||||
);
|
||||
}
|
||||
|
||||
async function markSessionFailed(params: {
|
||||
storePath: string;
|
||||
sessionKey: string;
|
||||
reason: string;
|
||||
}): Promise<void> {
|
||||
await updateSessionStore(
|
||||
params.storePath,
|
||||
(store) => {
|
||||
const entry = store[params.sessionKey];
|
||||
if (!entry || entry.status !== "running") {
|
||||
return;
|
||||
}
|
||||
entry.status = "failed";
|
||||
entry.abortedLastRun = true;
|
||||
entry.endedAt = Date.now();
|
||||
entry.updatedAt = entry.endedAt;
|
||||
store[params.sessionKey] = entry;
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
log.warn(`marked interrupted main session failed: ${params.sessionKey} (${params.reason})`);
|
||||
}
|
||||
|
||||
async function resumeMainSession(params: {
|
||||
storePath: string;
|
||||
sessionKey: string;
|
||||
}): Promise<boolean> {
|
||||
try {
|
||||
await callGateway<{ runId: string }>({
|
||||
method: "agent",
|
||||
params: {
|
||||
message: buildResumeMessage(),
|
||||
sessionKey: params.sessionKey,
|
||||
idempotencyKey: crypto.randomUUID(),
|
||||
deliver: false,
|
||||
lane: CommandLane.Main,
|
||||
},
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
await updateSessionStore(
|
||||
params.storePath,
|
||||
(store) => {
|
||||
const entry = store[params.sessionKey];
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
entry.abortedLastRun = false;
|
||||
entry.updatedAt = Date.now();
|
||||
store[params.sessionKey] = entry;
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
log.info(`resumed interrupted main session: ${params.sessionKey}`);
|
||||
return true;
|
||||
} catch (err) {
|
||||
log.warn(`failed to resume interrupted main session ${params.sessionKey}: ${String(err)}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export async function markRestartAbortedMainSessionsFromLocks(params: {
|
||||
sessionsDir: string;
|
||||
cleanedLocks: SessionLockInspection[];
|
||||
}): Promise<{ marked: number; skipped: number }> {
|
||||
const result = { marked: 0, skipped: 0 };
|
||||
const interruptedSessionIds = new Set(
|
||||
params.cleanedLocks
|
||||
.map((lock) => sessionIdFromLockPath(lock.lockPath))
|
||||
.filter((sessionId): sessionId is string => Boolean(sessionId)),
|
||||
);
|
||||
if (interruptedSessionIds.size === 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
const storePath = path.join(path.resolve(params.sessionsDir), "sessions.json");
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
for (const [sessionKey, entry] of Object.entries(store)) {
|
||||
if (!entry || entry.status !== "running") {
|
||||
continue;
|
||||
}
|
||||
if (shouldSkipMainRecovery(entry, sessionKey)) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
if (!interruptedSessionIds.has(entry.sessionId)) {
|
||||
continue;
|
||||
}
|
||||
entry.abortedLastRun = true;
|
||||
store[sessionKey] = entry;
|
||||
result.marked++;
|
||||
}
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
|
||||
if (result.marked > 0) {
|
||||
log.warn(`marked ${result.marked} interrupted main session(s) from stale transcript locks`);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async function recoverStore(params: {
|
||||
storePath: string;
|
||||
resumedSessionKeys: Set<string>;
|
||||
}): Promise<{ recovered: number; failed: number; skipped: number }> {
|
||||
const result = { recovered: 0, failed: 0, skipped: 0 };
|
||||
let store: Record<string, SessionEntry>;
|
||||
try {
|
||||
store = loadSessionStore(params.storePath);
|
||||
} catch (err) {
|
||||
log.warn(`failed to load session store ${params.storePath}: ${String(err)}`);
|
||||
result.failed++;
|
||||
return result;
|
||||
}
|
||||
|
||||
for (const [sessionKey, entry] of Object.entries(store).toSorted(([a], [b]) =>
|
||||
a.localeCompare(b),
|
||||
)) {
|
||||
if (!entry || entry.status !== "running" || entry.abortedLastRun !== true) {
|
||||
continue;
|
||||
}
|
||||
if (shouldSkipMainRecovery(entry, sessionKey)) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
if (params.resumedSessionKeys.has(sessionKey)) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
let messages: unknown[];
|
||||
try {
|
||||
messages = readSessionMessages(entry.sessionId, params.storePath, entry.sessionFile);
|
||||
} catch (err) {
|
||||
log.warn(`failed to read transcript for ${sessionKey}: ${String(err)}`);
|
||||
result.failed++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!isMainSessionResumable(messages)) {
|
||||
await markSessionFailed({
|
||||
storePath: params.storePath,
|
||||
sessionKey,
|
||||
reason: "transcript tail is not resumable",
|
||||
});
|
||||
result.failed++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const resumed = await resumeMainSession({
|
||||
storePath: params.storePath,
|
||||
sessionKey,
|
||||
});
|
||||
if (resumed) {
|
||||
params.resumedSessionKeys.add(sessionKey);
|
||||
result.recovered++;
|
||||
} else {
|
||||
result.failed++;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
export async function recoverRestartAbortedMainSessions(
|
||||
params: {
|
||||
stateDir?: string;
|
||||
resumedSessionKeys?: Set<string>;
|
||||
} = {},
|
||||
): Promise<{ recovered: number; failed: number; skipped: number }> {
|
||||
const result = { recovered: 0, failed: 0, skipped: 0 };
|
||||
const resumedSessionKeys = params.resumedSessionKeys ?? new Set<string>();
|
||||
const stateDir = params.stateDir ?? resolveStateDir(process.env);
|
||||
const sessionDirs = await resolveAgentSessionDirs(stateDir);
|
||||
|
||||
for (const sessionsDir of sessionDirs) {
|
||||
const storeResult = await recoverStore({
|
||||
storePath: path.join(sessionsDir, "sessions.json"),
|
||||
resumedSessionKeys,
|
||||
});
|
||||
result.recovered += storeResult.recovered;
|
||||
result.failed += storeResult.failed;
|
||||
result.skipped += storeResult.skipped;
|
||||
}
|
||||
|
||||
if (result.recovered > 0 || result.failed > 0) {
|
||||
log.info(
|
||||
`main-session restart recovery complete: recovered=${result.recovered} failed=${result.failed} skipped=${result.skipped}`,
|
||||
);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
export function scheduleRestartAbortedMainSessionRecovery(
|
||||
params: {
|
||||
delayMs?: number;
|
||||
maxRetries?: number;
|
||||
stateDir?: string;
|
||||
} = {},
|
||||
): void {
|
||||
const initialDelay = params.delayMs ?? DEFAULT_RECOVERY_DELAY_MS;
|
||||
const maxRetries = params.maxRetries ?? MAX_RECOVERY_RETRIES;
|
||||
const resumedSessionKeys = new Set<string>();
|
||||
|
||||
const attemptRecovery = (attempt: number, delay: number) => {
|
||||
setTimeout(() => {
|
||||
void recoverRestartAbortedMainSessions({
|
||||
stateDir: params.stateDir,
|
||||
resumedSessionKeys,
|
||||
})
|
||||
.then((result) => {
|
||||
if (result.failed > 0 && attempt < maxRetries) {
|
||||
attemptRecovery(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER);
|
||||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
if (attempt < maxRetries) {
|
||||
log.warn(`main-session restart recovery failed: ${String(err)}`);
|
||||
attemptRecovery(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER);
|
||||
} else {
|
||||
log.warn(`main-session restart recovery gave up: ${String(err)}`);
|
||||
}
|
||||
});
|
||||
}, delay).unref?.();
|
||||
};
|
||||
|
||||
attemptRecovery(1, initialDelay);
|
||||
}
|
||||
@@ -157,12 +157,20 @@ export async function startGatewaySidecars(params: {
|
||||
const stateDir = resolveStateDir(process.env);
|
||||
const sessionDirs = await resolveAgentSessionDirs(stateDir);
|
||||
for (const sessionsDir of sessionDirs) {
|
||||
await cleanStaleLockFiles({
|
||||
const result = await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
staleMs: SESSION_LOCK_STALE_MS,
|
||||
removeStale: true,
|
||||
log: { warn: (message) => params.log.warn(message) },
|
||||
});
|
||||
if (result.cleaned.length > 0) {
|
||||
const { markRestartAbortedMainSessionsFromLocks } =
|
||||
await import("../agents/main-session-restart-recovery.js");
|
||||
await markRestartAbortedMainSessionsFromLocks({
|
||||
sessionsDir,
|
||||
cleanedLocks: result.cleaned,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
params.log.warn(`session lock cleanup failed on startup: ${String(err)}`);
|
||||
@@ -354,6 +362,12 @@ export async function startGatewaySidecars(params: {
|
||||
scheduleSubagentOrphanRecovery();
|
||||
});
|
||||
|
||||
await measureStartup(params.startupTrace, "sidecars.main-session-recovery", async () => {
|
||||
const { scheduleRestartAbortedMainSessionRecovery } =
|
||||
await import("../agents/main-session-restart-recovery.js");
|
||||
scheduleRestartAbortedMainSessionRecovery();
|
||||
});
|
||||
|
||||
return { pluginServices };
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user