From e1978cf73cbdc8dd9cb8caee06ed6e6669957f9b Mon Sep 17 00:00:00 2001 From: Josh Avant <830519+joshavant@users.noreply.github.com> Date: Tue, 9 Jun 2026 00:37:16 -0500 Subject: [PATCH] fix main session startup recovery (#91566) --- .../main-session-restart-recovery.test.ts | 234 ++++++++++++++++ src/agents/main-session-restart-recovery.ts | 254 ++++++++++++++++-- .../server-startup-post-attach.test.ts | 123 +++++++++ src/gateway/server-startup-post-attach.ts | 29 +- 4 files changed, 607 insertions(+), 33 deletions(-) diff --git a/src/agents/main-session-restart-recovery.test.ts b/src/agents/main-session-restart-recovery.test.ts index ed2953978c0..d1a96e6f994 100644 --- a/src/agents/main-session-restart-recovery.test.ts +++ b/src/agents/main-session-restart-recovery.test.ts @@ -16,6 +16,8 @@ import { import { markRestartAbortedMainSessions, markRestartAbortedMainSessionsFromLocks, + markStartupOrphanedMainSessionsForRecovery, + recoverStartupOrphanedMainSessions, recoverRestartAbortedMainSessions, } from "./main-session-restart-recovery.js"; import type { SessionLockInspection } from "./session-write-lock.js"; @@ -637,6 +639,37 @@ describe("main-session-restart-recovery", () => { expect(store["agent:main:main"]?.pendingFinalDeliveryText).toBe("The final answer is 42."); }); + it("resumes pending final delivery even when the transcript tail is assistant output", async () => { + const sessionsDir = await makeSessionsDir(); + await writeStore(sessionsDir, { + "agent:main:main": { + sessionId: "main-session", + updatedAt: Date.now() - 10_000, + status: "running", + abortedLastRun: true, + pendingFinalDelivery: true, + pendingFinalDeliveryText: "assistant final was already captured", + pendingFinalDeliveryCreatedAt: Date.now() - 5_000, + }, + }); + await writeTranscript(sessionsDir, "main-session", [ + { role: "user", content: "finish" }, + { role: "assistant", content: "assistant final was already captured" }, + ]); + + const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir }); + + expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 }); + expect(callGateway).toHaveBeenCalledOnce(); + expect(firstGatewayParams().message).toContain("assistant final was already captured"); + const store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json")); + expect(store["agent:main:main"]?.status).toBe("running"); + expect(store["agent:main:main"]?.pendingFinalDelivery).toBe(true); + expect(store["agent:main:main"]?.pendingFinalDeliveryText).toBe( + "assistant final was already captured", + ); + }); + it("does not scan ordinary running sessions without the restart-aborted marker", async () => { const sessionsDir = await makeSessionsDir(); await writeStore(sessionsDir, { @@ -657,6 +690,207 @@ describe("main-session-restart-recovery", () => { expect(callGateway).not.toHaveBeenCalled(); }); + it("skips restart-aborted sessions that a current process owns", async () => { + const sessionsDir = await makeSessionsDir(); + await writeStore(sessionsDir, { + "agent:main:active-key": { + sessionId: "active-key-session", + updatedAt: Date.now() - 10_000, + status: "running", + abortedLastRun: true, + }, + "agent:main:active-id": { + sessionId: "active-id-session", + updatedAt: Date.now() - 10_000, + status: "running", + abortedLastRun: true, + }, + "agent:main:recoverable": { + sessionId: "recoverable-session", + updatedAt: Date.now() - 10_000, + status: "running", + abortedLastRun: true, + }, + }); + await writeTranscript(sessionsDir, "active-key-session", [ + { role: "user", content: "new run owns this key" }, + { role: "toolResult", content: "done" }, + ]); + await writeTranscript(sessionsDir, "active-id-session", [ + { role: "user", content: "new run owns this id" }, + { role: "toolResult", content: "done" }, + ]); + await writeTranscript(sessionsDir, "recoverable-session", [ + { role: "user", content: "recover this one" }, + { role: "toolResult", content: "done" }, + ]); + + const result = await recoverRestartAbortedMainSessions({ + stateDir: tmpDir, + activeSessionKeys: ["agent:main:active-key"], + activeSessionIds: ["active-key-session", "active-id-session"], + }); + + expect(result).toEqual({ recovered: 1, failed: 0, skipped: 2 }); + expect(callGateway).toHaveBeenCalledOnce(); + const store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json")); + expect(store["agent:main:active-key"]?.abortedLastRun).toBe(true); + expect(store["agent:main:active-id"]?.abortedLastRun).toBe(true); + expect(store["agent:main:recoverable"]?.abortedLastRun).toBe(false); + }); + + it("recovers duplicate-key restart-aborted rows when the active run owns a different session id", async () => { + const sessionsDir = await makeSessionsDir(); + await writeStore(sessionsDir, { + "agent:main:main": { + sessionId: "stale-session", + updatedAt: Date.now() - 10_000, + status: "running", + abortedLastRun: true, + }, + }); + await writeTranscript(sessionsDir, "stale-session", [ + { role: "user", content: "recover the stale duplicate" }, + { role: "toolResult", content: "done" }, + ]); + + const result = await recoverRestartAbortedMainSessions({ + stateDir: tmpDir, + activeSessionKeys: ["agent:main:main"], + activeSessionIds: ["new-current-session"], + }); + + expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 }); + expect(callGateway).toHaveBeenCalledOnce(); + const store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json")); + expect(store["agent:main:main"]?.abortedLastRun).toBe(false); + }); + + it("marks startup-orphaned running main sessions before recovery", async () => { + const sessionsDir = await makeSessionsDir(); + const cutoff = Date.now(); + await writeStore(sessionsDir, { + "agent:main:main": { + sessionId: "main-session", + updatedAt: cutoff - 10_000, + status: "running", + }, + "agent:main:active-key": { + sessionId: "active-key-session", + updatedAt: cutoff - 10_000, + status: "running", + }, + "agent:main:active-id": { + sessionId: "active-id-session", + updatedAt: cutoff - 10_000, + status: "running", + }, + "agent:main:fresh": { + sessionId: "fresh-session", + updatedAt: cutoff + 1, + status: "running", + }, + "agent:main:subagent:child": { + sessionId: "child-session", + updatedAt: cutoff - 10_000, + status: "running", + spawnDepth: 1, + }, + "agent:main:cron:nightly": { + sessionId: "cron-session", + updatedAt: cutoff - 10_000, + status: "running", + }, + "agent:main:completed": { + sessionId: "completed-session", + updatedAt: cutoff - 10_000, + status: "done", + }, + "agent:main:already-marked": { + sessionId: "already-marked-session", + updatedAt: cutoff - 10_000, + status: "running", + abortedLastRun: true, + }, + }); + await writeTranscript(sessionsDir, "main-session", [ + { role: "user", content: "run the tool" }, + { role: "toolResult", content: "done" }, + ]); + await writeTranscript(sessionsDir, "already-marked-session", [ + { role: "user", content: "already interrupted" }, + { role: "toolResult", content: "done" }, + ]); + + const marked = await markStartupOrphanedMainSessionsForRecovery({ + stateDir: tmpDir, + activeSessionKeys: ["agent:main:active-key"], + activeSessionIds: ["active-key-session", "active-id-session"], + updatedBeforeMs: cutoff, + }); + + expect(marked).toEqual({ marked: 1, skipped: 2 }); + let store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json")); + expect(store["agent:main:main"]?.abortedLastRun).toBe(true); + expect(store["agent:main:active-key"]?.abortedLastRun).toBeUndefined(); + expect(store["agent:main:active-id"]?.abortedLastRun).toBeUndefined(); + expect(store["agent:main:fresh"]?.abortedLastRun).toBeUndefined(); + expect(store["agent:main:subagent:child"]?.abortedLastRun).toBeUndefined(); + expect(store["agent:main:cron:nightly"]?.abortedLastRun).toBeUndefined(); + expect(store["agent:main:completed"]?.abortedLastRun).toBeUndefined(); + expect(store["agent:main:already-marked"]?.abortedLastRun).toBe(true); + + const recovered = await recoverRestartAbortedMainSessions({ stateDir: tmpDir }); + + expect(recovered).toEqual({ recovered: 2, failed: 0, skipped: 0 }); + expect(callGateway).toHaveBeenCalledTimes(2); + store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json")); + expect(store["agent:main:main"]?.abortedLastRun).toBe(false); + expect(store["agent:main:already-marked"]?.abortedLastRun).toBe(false); + }); + + it("recovers only the configured store for duplicate startup-orphaned session keys", async () => { + const cutoff = Date.now(); + const defaultSessionsDir = await makeSessionsDir(); + await writeStore(defaultSessionsDir, { + "agent:main:main": { + sessionId: "default-main-session", + updatedAt: cutoff - 10_000, + status: "running", + }, + }); + await writeTranscript(defaultSessionsDir, "default-main-session", [ + { role: "user", content: "continue default" }, + { role: "toolResult", content: "default result" }, + ]); + + const customStorePath = path.join(tmpDir, "custom-startup-duplicate", "sessions.json"); + await writeSessionStoreForTestAsync(customStorePath, { + "agent:main:main": { + sessionId: "custom-main-session", + updatedAt: cutoff - 10_000, + status: "running", + }, + }); + await writeTranscript(path.dirname(customStorePath), "custom-main-session", [ + { role: "user", content: "continue custom" }, + { role: "toolResult", content: "custom result" }, + ]); + + const result = await recoverStartupOrphanedMainSessions({ + cfg: { session: { store: customStorePath } }, + stateDir: tmpDir, + updatedBeforeMs: cutoff, + }); + + expect(result).toEqual({ marked: 2, recovered: 1, failed: 0, skipped: 1 }); + expect(callGateway).toHaveBeenCalledOnce(); + const defaultStore = readSessionStoreForTest(path.join(defaultSessionsDir, "sessions.json")); + const customStore = readSessionStoreForTest(customStorePath); + expect(defaultStore["agent:main:main"]?.abortedLastRun).toBe(true); + expect(customStore["agent:main:main"]?.abortedLastRun).toBe(false); + }); + it("fails marked sessions whose transcript tail cannot be resumed", async () => { const sessionsDir = await makeSessionsDir(); await writeStore(sessionsDir, { diff --git a/src/agents/main-session-restart-recovery.ts b/src/agents/main-session-restart-recovery.ts index 7a5f32c2714..3855405e65c 100644 --- a/src/agents/main-session-restart-recovery.ts +++ b/src/agents/main-session-restart-recovery.ts @@ -30,6 +30,10 @@ import { type DeliveryContext, } from "../utils/delivery-context.shared.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; +import { + listActiveEmbeddedRunSessionIds, + listActiveEmbeddedRunSessionKeys, +} from "./embedded-agent-runner/run-state.js"; import { resolveAgentSessionDirs } from "./session-dirs.js"; import type { SessionLockInspection } from "./session-write-lock.js"; @@ -65,6 +69,22 @@ function normalizeStringSet(values: Iterable | undefined): Set { return normalized; } +function normalizeFiniteTimestamp(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function hasCurrentProcessOwner(params: { + activeSessionIds: Set; + activeSessionKeys: Set; + entry: SessionEntry; + sessionKey: string; +}): boolean { + if (params.activeSessionIds.has(params.entry.sessionId)) { + return true; + } + return params.activeSessionIds.size === 0 && params.activeSessionKeys.has(params.sessionKey); +} + function normalizeTranscriptLockPath(lockPath: string): string | undefined { const trimmed = lockPath.trim(); if (!path.basename(trimmed).endsWith(".jsonl.lock")) { @@ -197,6 +217,72 @@ export async function markRestartAbortedMainSessions(params: { return result; } +export async function markStartupOrphanedMainSessionsForRecovery(params: { + cfg?: OpenClawConfig; + stateDir?: string; + activeSessionIds?: Iterable; + activeSessionKeys?: Iterable; + updatedBeforeMs?: number; +}): Promise<{ marked: number; skipped: number }> { + const result = { marked: 0, skipped: 0 }; + const providedActiveSessionIds = + params.activeSessionIds === undefined ? undefined : normalizeStringSet(params.activeSessionIds); + const providedActiveSessionKeys = + params.activeSessionKeys === undefined + ? undefined + : normalizeStringSet(params.activeSessionKeys); + const updatedBeforeMs = normalizeFiniteTimestamp(params.updatedBeforeMs); + const resolveActiveSessionIds = () => + providedActiveSessionIds ?? normalizeStringSet(listActiveEmbeddedRunSessionIds()); + const resolveActiveSessionKeys = () => + providedActiveSessionKeys ?? normalizeStringSet(listActiveEmbeddedRunSessionKeys()); + + for (const storePath of await resolveRestartRecoveryStorePaths(params)) { + await updateSessionStore( + storePath, + (store) => { + for (const [sessionKey, entry] of Object.entries(store)) { + if (!entry || entry.status !== "running" || entry.abortedLastRun === true) { + continue; + } + if (shouldSkipMainRecovery(entry, sessionKey)) { + result.skipped++; + continue; + } + const updatedAt = normalizeFiniteTimestamp(entry.updatedAt); + if ( + updatedBeforeMs !== undefined && + updatedAt !== undefined && + updatedAt > updatedBeforeMs + ) { + continue; + } + if ( + hasCurrentProcessOwner({ + activeSessionIds: resolveActiveSessionIds(), + activeSessionKeys: resolveActiveSessionKeys(), + entry, + sessionKey, + }) + ) { + continue; + } + entry.abortedLastRun = true; + entry.updatedAt = Date.now(); + store[sessionKey] = entry; + result.marked++; + } + }, + { skipMaintenance: true }, + ); + } + + if (result.marked > 0) { + log.warn(`marked ${result.marked} startup-orphaned main session(s) for restart recovery`); + } + return result; +} + function getMessageRole(message: unknown): string | undefined { if (!message || typeof message !== "object") { return undefined; @@ -504,12 +590,48 @@ export async function markRestartAbortedMainSessionsFromLocks(params: { return result; } +function isRoutableRecoveryStore(params: { + cfg?: OpenClawConfig; + sessionKey: string; + storePath: string; +}): boolean { + if (!params.cfg) { + return true; + } + if (!params.cfg.session?.store) { + return true; + } + try { + const target = resolveGatewaySessionStoreTarget({ + cfg: params.cfg, + key: params.sessionKey, + scanLegacyKeys: true, + }); + return path.resolve(target.storePath) === path.resolve(params.storePath); + } catch (err) { + log.warn(`failed to resolve recovery store for ${params.sessionKey}: ${String(err)}`); + return false; + } +} + async function recoverStore(params: { cfg?: OpenClawConfig; storePath: string; resumedSessionKeys: Set; + activeSessionIds?: Iterable; + activeSessionKeys?: Iterable; }): Promise<{ recovered: number; failed: number; skipped: number }> { const result = { recovered: 0, failed: 0, skipped: 0 }; + const providedActiveSessionIds = + params.activeSessionIds === undefined ? undefined : normalizeStringSet(params.activeSessionIds); + const providedActiveSessionKeys = + params.activeSessionKeys === undefined + ? undefined + : normalizeStringSet(params.activeSessionKeys); + const resolveActiveSessionIds = () => + providedActiveSessionIds ?? normalizeStringSet(listActiveEmbeddedRunSessionIds()); + const resolveActiveSessionKeys = () => + providedActiveSessionKeys ?? normalizeStringSet(listActiveEmbeddedRunSessionKeys()); let store: Record; try { store = loadSessionStore(params.storePath); @@ -529,10 +651,49 @@ async function recoverStore(params: { result.skipped++; continue; } - if (params.resumedSessionKeys.has(sessionKey)) { + if ( + !isRoutableRecoveryStore({ + cfg: params.cfg, + sessionKey, + storePath: params.storePath, + }) + ) { result.skipped++; continue; } + if ( + hasCurrentProcessOwner({ + activeSessionIds: resolveActiveSessionIds(), + activeSessionKeys: resolveActiveSessionKeys(), + entry, + sessionKey, + }) + ) { + result.skipped++; + continue; + } + const resumeDedupeKey = sessionKey; + if (params.resumedSessionKeys.has(resumeDedupeKey)) { + result.skipped++; + continue; + } + + if (entry.pendingFinalDelivery === true && entry.pendingFinalDeliveryText) { + const resumed = await resumeMainSession({ + cfg: params.cfg, + entry, + storePath: params.storePath, + sessionKey, + pendingFinalDeliveryText: entry.pendingFinalDeliveryText, + }); + if (resumed) { + params.resumedSessionKeys.add(resumeDedupeKey); + result.recovered++; + } else { + result.failed++; + } + continue; + } let messages: unknown[]; try { @@ -577,7 +738,7 @@ async function recoverStore(params: { pendingFinalDeliveryText: entry.pendingFinalDeliveryText, }); if (resumed) { - params.resumedSessionKeys.add(sessionKey); + params.resumedSessionKeys.add(resumeDedupeKey); result.recovered++; } else { result.failed++; @@ -610,6 +771,8 @@ export async function recoverRestartAbortedMainSessions( cfg?: OpenClawConfig; stateDir?: string; resumedSessionKeys?: Set; + activeSessionIds?: Iterable; + activeSessionKeys?: Iterable; } = {}, ): Promise<{ recovered: number; failed: number; skipped: number }> { const result = { recovered: 0, failed: 0, skipped: 0 }; @@ -620,6 +783,8 @@ export async function recoverRestartAbortedMainSessions( cfg: params.cfg, storePath, resumedSessionKeys, + activeSessionIds: params.activeSessionIds, + activeSessionKeys: params.activeSessionKeys, }); result.recovered += storeResult.recovered; result.failed += storeResult.failed; @@ -634,6 +799,39 @@ export async function recoverRestartAbortedMainSessions( return result; } +export async function recoverStartupOrphanedMainSessions( + params: { + cfg?: OpenClawConfig; + stateDir?: string; + activeSessionIds?: Iterable; + activeSessionKeys?: Iterable; + updatedBeforeMs?: number; + resumedSessionKeys?: Set; + } = {}, +): Promise<{ marked: number; recovered: number; failed: number; skipped: number }> { + const startupRecoveryCutoffMs = params.updatedBeforeMs ?? Date.now(); + const marked = await markStartupOrphanedMainSessionsForRecovery({ + cfg: params.cfg, + stateDir: params.stateDir, + activeSessionIds: params.activeSessionIds, + activeSessionKeys: params.activeSessionKeys, + updatedBeforeMs: startupRecoveryCutoffMs, + }); + const recovered = await recoverRestartAbortedMainSessions({ + cfg: params.cfg, + stateDir: params.stateDir, + resumedSessionKeys: params.resumedSessionKeys, + activeSessionIds: params.activeSessionIds, + activeSessionKeys: params.activeSessionKeys, + }); + return { + marked: marked.marked, + recovered: recovered.recovered, + failed: recovered.failed, + skipped: marked.skipped + recovered.skipped, + }; +} + export function scheduleRestartAbortedMainSessionRecovery( params: { cfg?: OpenClawConfig; @@ -645,29 +843,41 @@ export function scheduleRestartAbortedMainSessionRecovery( const initialDelay = params.delayMs ?? DEFAULT_RECOVERY_DELAY_MS; const maxRetries = params.maxRetries ?? MAX_RECOVERY_RETRIES; const resumedSessionKeys = new Set(); + // Only reconcile rows that existed before this startup recovery was scheduled. + // Fresh runs started by this gateway are protected again by the active-run check. + const startupRecoveryCutoffMs = Date.now(); - const attemptRecovery = (attempt: number, delay: number) => { - setTimeout(() => { - void recoverRestartAbortedMainSessions({ - cfg: params.cfg, - stateDir: params.stateDir, - resumedSessionKeys, + const runRecoveryAttempt = (attempt: number, delay: number) => { + void recoverStartupOrphanedMainSessions({ + cfg: params.cfg, + stateDir: params.stateDir, + resumedSessionKeys, + updatedBeforeMs: startupRecoveryCutoffMs, + }) + .then((result) => { + if (result.failed > 0 && attempt < maxRetries) { + scheduleAttempt(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER); + } }) - .then((result) => { - if (result.failed > 0 && attempt < maxRetries) { - attemptRecovery(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER); - } - }) - .catch((err: unknown) => { - if (attempt < maxRetries) { - log.warn(`main-session restart recovery failed: ${String(err)}`); - attemptRecovery(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER); - } else { - log.warn(`main-session restart recovery gave up: ${String(err)}`); - } - }); + .catch((err: unknown) => { + if (attempt < maxRetries) { + log.warn(`main-session restart recovery failed: ${String(err)}`); + scheduleAttempt(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER); + } else { + log.warn(`main-session restart recovery gave up: ${String(err)}`); + } + }); + }; + + const scheduleAttempt = (attempt: number, delay: number) => { + if (delay <= 0) { + runRecoveryAttempt(attempt, delay); + return; + } + setTimeout(() => { + runRecoveryAttempt(attempt, delay); }, delay).unref?.(); }; - attemptRecovery(1, initialDelay); + scheduleAttempt(1, initialDelay); } diff --git a/src/gateway/server-startup-post-attach.test.ts b/src/gateway/server-startup-post-attach.test.ts index ee717a9db77..063817f2edd 100644 --- a/src/gateway/server-startup-post-attach.test.ts +++ b/src/gateway/server-startup-post-attach.test.ts @@ -26,6 +26,18 @@ const hoisted = vi.hoisted(() => { const startGatewayTailscaleExposure = vi.fn(async () => null); const logGatewayStartup = vi.fn(); const scheduleSubagentOrphanRecovery = vi.fn(); + const markRestartAbortedMainSessionsFromLocks = vi.fn(async () => {}); + const markStartupOrphanedMainSessionsForRecovery = vi.fn(async () => ({ + marked: 0, + skipped: 0, + })); + const recoverStartupOrphanedMainSessions = vi.fn(async () => ({ + marked: 0, + recovered: 0, + failed: 0, + skipped: 0, + })); + const scheduleRestartAbortedMainSessionRecovery = vi.fn(); const shouldWakeFromRestartSentinel = vi.fn(() => false); const scheduleRestartSentinelWake = vi.fn(); const refreshLatestUpdateRestartSentinel = vi.fn< @@ -75,6 +87,10 @@ const hoisted = vi.hoisted(() => { startGatewayTailscaleExposure, logGatewayStartup, scheduleSubagentOrphanRecovery, + markRestartAbortedMainSessionsFromLocks, + markStartupOrphanedMainSessionsForRecovery, + recoverStartupOrphanedMainSessions, + scheduleRestartAbortedMainSessionRecovery, shouldWakeFromRestartSentinel, scheduleRestartSentinelWake, refreshLatestUpdateRestartSentinel, @@ -107,6 +123,13 @@ vi.mock("../agents/subagent-registry.js", () => ({ scheduleSubagentOrphanRecovery: hoisted.scheduleSubagentOrphanRecovery, })); +vi.mock("../agents/main-session-restart-recovery.js", () => ({ + markRestartAbortedMainSessionsFromLocks: hoisted.markRestartAbortedMainSessionsFromLocks, + markStartupOrphanedMainSessionsForRecovery: hoisted.markStartupOrphanedMainSessionsForRecovery, + recoverStartupOrphanedMainSessions: hoisted.recoverStartupOrphanedMainSessions, + scheduleRestartAbortedMainSessionRecovery: hoisted.scheduleRestartAbortedMainSessionRecovery, +})); + vi.mock("../config/paths.js", async () => { const actual = await vi.importActual("../config/paths.js"); return { @@ -290,6 +313,20 @@ describe("startGatewayPostAttachRuntime", () => { hoisted.startGatewayTailscaleExposure.mockClear(); hoisted.logGatewayStartup.mockClear(); hoisted.scheduleSubagentOrphanRecovery.mockClear(); + hoisted.markRestartAbortedMainSessionsFromLocks.mockClear(); + hoisted.markStartupOrphanedMainSessionsForRecovery.mockReset(); + hoisted.markStartupOrphanedMainSessionsForRecovery.mockResolvedValue({ + marked: 0, + skipped: 0, + }); + hoisted.recoverStartupOrphanedMainSessions.mockReset(); + hoisted.recoverStartupOrphanedMainSessions.mockResolvedValue({ + marked: 0, + recovered: 0, + failed: 0, + skipped: 0, + }); + hoisted.scheduleRestartAbortedMainSessionRecovery.mockClear(); hoisted.shouldWakeFromRestartSentinel.mockReturnValue(false); hoisted.scheduleRestartSentinelWake.mockClear(); hoisted.getAcpRuntimeBackend.mockReset(); @@ -348,6 +385,9 @@ describe("startGatewayPostAttachRuntime", () => { expect(hoisted.logGatewayStartup).toHaveBeenCalledTimes(1); expect(firstStartupLog().loadedPluginIds).toEqual(["beta", "alpha"]); expect(log.info).toHaveBeenCalledWith("gateway ready"); + expect(hoisted.scheduleRestartAbortedMainSessionRecovery).toHaveBeenCalledWith({ + cfg: { hooks: { internal: { enabled: false } } }, + }); expect(hoisted.startGatewayMemoryBackend).not.toHaveBeenCalled(); }); @@ -1483,6 +1523,89 @@ describe("startGatewayPostAttachRuntime", () => { ); }); + it("marks startup main-session orphans before channel startup", async () => { + const events: string[] = []; + let releaseMarking: (() => void) | undefined; + const startChannels = vi.fn(async () => { + events.push("channels"); + }); + hoisted.markStartupOrphanedMainSessionsForRecovery.mockImplementationOnce( + async () => + await new Promise<{ marked: number; skipped: number }>((resolve) => { + events.push("main-session-mark:start"); + releaseMarking = () => { + events.push("main-session-mark:done"); + resolve({ marked: 1, skipped: 0 }); + }; + }), + ); + + const sidecars = startGatewaySidecars({ + cfg: { hooks: { internal: { enabled: false } } } as never, + pluginRegistry: createPostAttachParams().pluginRegistry, + defaultWorkspaceDir: "/tmp/openclaw-workspace", + deps: {} as never, + startChannels, + log: { warn: vi.fn() }, + logHooks: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, + logChannels: { + info: vi.fn(), + error: vi.fn(), + }, + }); + + await vi.waitFor(() => { + expect(events).toEqual(["main-session-mark:start"]); + }); + expect(startChannels).not.toHaveBeenCalled(); + + if (!releaseMarking) { + throw new Error("Expected marker release callback to be initialized"); + } + releaseMarking(); + await sidecars; + + expect(events).toEqual(["main-session-mark:start", "main-session-mark:done", "channels"]); + expect(startChannels).toHaveBeenCalledTimes(1); + expect(hoisted.scheduleRestartAbortedMainSessionRecovery).not.toHaveBeenCalled(); + }); + + it("logs startup main-session marker failures and still starts channels", async () => { + const log = { warn: vi.fn() }; + const startChannels = vi.fn(async () => {}); + hoisted.markStartupOrphanedMainSessionsForRecovery.mockRejectedValueOnce( + new Error("store unreadable"), + ); + + await startGatewaySidecars({ + cfg: { hooks: { internal: { enabled: false } } } as never, + pluginRegistry: createPostAttachParams().pluginRegistry, + defaultWorkspaceDir: "/tmp/openclaw-workspace", + deps: {} as never, + startChannels, + log, + logHooks: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, + logChannels: { + info: vi.fn(), + error: vi.fn(), + }, + }); + + expect(log.warn).toHaveBeenCalledWith( + "main-session startup orphan marking failed before channel startup: Error: store unreadable", + ); + expect(hoisted.scheduleRestartAbortedMainSessionRecovery).not.toHaveBeenCalled(); + expect(startChannels).toHaveBeenCalledTimes(1); + }); + it("emits a sidecar readiness summary in startup trace details", async () => { const trace = createStartupTraceRecorder(); diff --git a/src/gateway/server-startup-post-attach.ts b/src/gateway/server-startup-post-attach.ts index 3f2f9d8b251..9cd1d30296c 100644 --- a/src/gateway/server-startup-post-attach.ts +++ b/src/gateway/server-startup-post-attach.ts @@ -781,6 +781,17 @@ export async function startGatewaySidecars(params: { const skipChannels = isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) || isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS); + await measureStartup(params.startupTrace, "sidecars.main-session-recovery", async () => { + try { + const { markStartupOrphanedMainSessionsForRecovery } = + await loadMainSessionRestartRecoveryModule(); + await markStartupOrphanedMainSessionsForRecovery({ cfg: params.cfg }); + } catch (err) { + params.log.warn( + `main-session startup orphan marking failed before channel startup: ${String(err)}`, + ); + } + }); await measureStartup(params.startupTrace, "sidecars.channels", async () => { if (!skipChannels) { try { @@ -947,17 +958,6 @@ export async function startGatewaySidecars(params: { }, }); - schedulePostReadySidecarTask({ - startupTrace: params.startupTrace, - name: "sidecars.main-session-recovery", - log: params.log, - run: async () => { - const { scheduleRestartAbortedMainSessionRecovery } = - await loadMainSessionRestartRecoveryModule(); - scheduleRestartAbortedMainSessionRecovery({ cfg: params.cfg }); - }, - }); - if (params.cfg.hooks?.enabled && params.cfg.hooks.gmail?.account) { postReadySidecars.push( schedulePostReadySidecarTask({ @@ -1333,6 +1333,13 @@ export async function startGatewayPostAttachRuntime( for (const method of STARTUP_UNAVAILABLE_GATEWAY_METHODS) { params.unavailableGatewayMethods.delete(method); } + try { + const { scheduleRestartAbortedMainSessionRecovery } = + await loadMainSessionRestartRecoveryModule(); + scheduleRestartAbortedMainSessionRecovery({ cfg: params.cfgAtStart }); + } catch (err) { + params.log.warn(`main-session restart recovery failed to schedule: ${String(err)}`); + } if (!pluginServicesReported) { reportPluginServices(result.pluginServices); }