mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-26 02:09:33 +00:00
fix main session startup recovery (#91566)
This commit is contained in:
@@ -16,6 +16,8 @@ import {
|
||||
import {
|
||||
markRestartAbortedMainSessions,
|
||||
markRestartAbortedMainSessionsFromLocks,
|
||||
markStartupOrphanedMainSessionsForRecovery,
|
||||
recoverStartupOrphanedMainSessions,
|
||||
recoverRestartAbortedMainSessions,
|
||||
} from "./main-session-restart-recovery.js";
|
||||
import type { SessionLockInspection } from "./session-write-lock.js";
|
||||
@@ -637,6 +639,37 @@ describe("main-session-restart-recovery", () => {
|
||||
expect(store["agent:main:main"]?.pendingFinalDeliveryText).toBe("The final answer is 42.");
|
||||
});
|
||||
|
||||
it("resumes pending final delivery even when the transcript tail is assistant output", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
"agent:main:main": {
|
||||
sessionId: "main-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
abortedLastRun: true,
|
||||
pendingFinalDelivery: true,
|
||||
pendingFinalDeliveryText: "assistant final was already captured",
|
||||
pendingFinalDeliveryCreatedAt: Date.now() - 5_000,
|
||||
},
|
||||
});
|
||||
await writeTranscript(sessionsDir, "main-session", [
|
||||
{ role: "user", content: "finish" },
|
||||
{ role: "assistant", content: "assistant final was already captured" },
|
||||
]);
|
||||
|
||||
const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
|
||||
|
||||
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
|
||||
expect(callGateway).toHaveBeenCalledOnce();
|
||||
expect(firstGatewayParams().message).toContain("assistant final was already captured");
|
||||
const store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json"));
|
||||
expect(store["agent:main:main"]?.status).toBe("running");
|
||||
expect(store["agent:main:main"]?.pendingFinalDelivery).toBe(true);
|
||||
expect(store["agent:main:main"]?.pendingFinalDeliveryText).toBe(
|
||||
"assistant final was already captured",
|
||||
);
|
||||
});
|
||||
|
||||
it("does not scan ordinary running sessions without the restart-aborted marker", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
@@ -657,6 +690,207 @@ describe("main-session-restart-recovery", () => {
|
||||
expect(callGateway).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("skips restart-aborted sessions that a current process owns", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
"agent:main:active-key": {
|
||||
sessionId: "active-key-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
abortedLastRun: true,
|
||||
},
|
||||
"agent:main:active-id": {
|
||||
sessionId: "active-id-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
abortedLastRun: true,
|
||||
},
|
||||
"agent:main:recoverable": {
|
||||
sessionId: "recoverable-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
abortedLastRun: true,
|
||||
},
|
||||
});
|
||||
await writeTranscript(sessionsDir, "active-key-session", [
|
||||
{ role: "user", content: "new run owns this key" },
|
||||
{ role: "toolResult", content: "done" },
|
||||
]);
|
||||
await writeTranscript(sessionsDir, "active-id-session", [
|
||||
{ role: "user", content: "new run owns this id" },
|
||||
{ role: "toolResult", content: "done" },
|
||||
]);
|
||||
await writeTranscript(sessionsDir, "recoverable-session", [
|
||||
{ role: "user", content: "recover this one" },
|
||||
{ role: "toolResult", content: "done" },
|
||||
]);
|
||||
|
||||
const result = await recoverRestartAbortedMainSessions({
|
||||
stateDir: tmpDir,
|
||||
activeSessionKeys: ["agent:main:active-key"],
|
||||
activeSessionIds: ["active-key-session", "active-id-session"],
|
||||
});
|
||||
|
||||
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 2 });
|
||||
expect(callGateway).toHaveBeenCalledOnce();
|
||||
const store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json"));
|
||||
expect(store["agent:main:active-key"]?.abortedLastRun).toBe(true);
|
||||
expect(store["agent:main:active-id"]?.abortedLastRun).toBe(true);
|
||||
expect(store["agent:main:recoverable"]?.abortedLastRun).toBe(false);
|
||||
});
|
||||
|
||||
it("recovers duplicate-key restart-aborted rows when the active run owns a different session id", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
"agent:main:main": {
|
||||
sessionId: "stale-session",
|
||||
updatedAt: Date.now() - 10_000,
|
||||
status: "running",
|
||||
abortedLastRun: true,
|
||||
},
|
||||
});
|
||||
await writeTranscript(sessionsDir, "stale-session", [
|
||||
{ role: "user", content: "recover the stale duplicate" },
|
||||
{ role: "toolResult", content: "done" },
|
||||
]);
|
||||
|
||||
const result = await recoverRestartAbortedMainSessions({
|
||||
stateDir: tmpDir,
|
||||
activeSessionKeys: ["agent:main:main"],
|
||||
activeSessionIds: ["new-current-session"],
|
||||
});
|
||||
|
||||
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
|
||||
expect(callGateway).toHaveBeenCalledOnce();
|
||||
const store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json"));
|
||||
expect(store["agent:main:main"]?.abortedLastRun).toBe(false);
|
||||
});
|
||||
|
||||
it("marks startup-orphaned running main sessions before recovery", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
const cutoff = Date.now();
|
||||
await writeStore(sessionsDir, {
|
||||
"agent:main:main": {
|
||||
sessionId: "main-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
"agent:main:active-key": {
|
||||
sessionId: "active-key-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
"agent:main:active-id": {
|
||||
sessionId: "active-id-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
"agent:main:fresh": {
|
||||
sessionId: "fresh-session",
|
||||
updatedAt: cutoff + 1,
|
||||
status: "running",
|
||||
},
|
||||
"agent:main:subagent:child": {
|
||||
sessionId: "child-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "running",
|
||||
spawnDepth: 1,
|
||||
},
|
||||
"agent:main:cron:nightly": {
|
||||
sessionId: "cron-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
"agent:main:completed": {
|
||||
sessionId: "completed-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "done",
|
||||
},
|
||||
"agent:main:already-marked": {
|
||||
sessionId: "already-marked-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "running",
|
||||
abortedLastRun: true,
|
||||
},
|
||||
});
|
||||
await writeTranscript(sessionsDir, "main-session", [
|
||||
{ role: "user", content: "run the tool" },
|
||||
{ role: "toolResult", content: "done" },
|
||||
]);
|
||||
await writeTranscript(sessionsDir, "already-marked-session", [
|
||||
{ role: "user", content: "already interrupted" },
|
||||
{ role: "toolResult", content: "done" },
|
||||
]);
|
||||
|
||||
const marked = await markStartupOrphanedMainSessionsForRecovery({
|
||||
stateDir: tmpDir,
|
||||
activeSessionKeys: ["agent:main:active-key"],
|
||||
activeSessionIds: ["active-key-session", "active-id-session"],
|
||||
updatedBeforeMs: cutoff,
|
||||
});
|
||||
|
||||
expect(marked).toEqual({ marked: 1, skipped: 2 });
|
||||
let store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json"));
|
||||
expect(store["agent:main:main"]?.abortedLastRun).toBe(true);
|
||||
expect(store["agent:main:active-key"]?.abortedLastRun).toBeUndefined();
|
||||
expect(store["agent:main:active-id"]?.abortedLastRun).toBeUndefined();
|
||||
expect(store["agent:main:fresh"]?.abortedLastRun).toBeUndefined();
|
||||
expect(store["agent:main:subagent:child"]?.abortedLastRun).toBeUndefined();
|
||||
expect(store["agent:main:cron:nightly"]?.abortedLastRun).toBeUndefined();
|
||||
expect(store["agent:main:completed"]?.abortedLastRun).toBeUndefined();
|
||||
expect(store["agent:main:already-marked"]?.abortedLastRun).toBe(true);
|
||||
|
||||
const recovered = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
|
||||
|
||||
expect(recovered).toEqual({ recovered: 2, failed: 0, skipped: 0 });
|
||||
expect(callGateway).toHaveBeenCalledTimes(2);
|
||||
store = readSessionStoreForTest(path.join(sessionsDir, "sessions.json"));
|
||||
expect(store["agent:main:main"]?.abortedLastRun).toBe(false);
|
||||
expect(store["agent:main:already-marked"]?.abortedLastRun).toBe(false);
|
||||
});
|
||||
|
||||
it("recovers only the configured store for duplicate startup-orphaned session keys", async () => {
|
||||
const cutoff = Date.now();
|
||||
const defaultSessionsDir = await makeSessionsDir();
|
||||
await writeStore(defaultSessionsDir, {
|
||||
"agent:main:main": {
|
||||
sessionId: "default-main-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
});
|
||||
await writeTranscript(defaultSessionsDir, "default-main-session", [
|
||||
{ role: "user", content: "continue default" },
|
||||
{ role: "toolResult", content: "default result" },
|
||||
]);
|
||||
|
||||
const customStorePath = path.join(tmpDir, "custom-startup-duplicate", "sessions.json");
|
||||
await writeSessionStoreForTestAsync(customStorePath, {
|
||||
"agent:main:main": {
|
||||
sessionId: "custom-main-session",
|
||||
updatedAt: cutoff - 10_000,
|
||||
status: "running",
|
||||
},
|
||||
});
|
||||
await writeTranscript(path.dirname(customStorePath), "custom-main-session", [
|
||||
{ role: "user", content: "continue custom" },
|
||||
{ role: "toolResult", content: "custom result" },
|
||||
]);
|
||||
|
||||
const result = await recoverStartupOrphanedMainSessions({
|
||||
cfg: { session: { store: customStorePath } },
|
||||
stateDir: tmpDir,
|
||||
updatedBeforeMs: cutoff,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ marked: 2, recovered: 1, failed: 0, skipped: 1 });
|
||||
expect(callGateway).toHaveBeenCalledOnce();
|
||||
const defaultStore = readSessionStoreForTest(path.join(defaultSessionsDir, "sessions.json"));
|
||||
const customStore = readSessionStoreForTest(customStorePath);
|
||||
expect(defaultStore["agent:main:main"]?.abortedLastRun).toBe(true);
|
||||
expect(customStore["agent:main:main"]?.abortedLastRun).toBe(false);
|
||||
});
|
||||
|
||||
it("fails marked sessions whose transcript tail cannot be resumed", async () => {
|
||||
const sessionsDir = await makeSessionsDir();
|
||||
await writeStore(sessionsDir, {
|
||||
|
||||
@@ -30,6 +30,10 @@ import {
|
||||
type DeliveryContext,
|
||||
} from "../utils/delivery-context.shared.js";
|
||||
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
|
||||
import {
|
||||
listActiveEmbeddedRunSessionIds,
|
||||
listActiveEmbeddedRunSessionKeys,
|
||||
} from "./embedded-agent-runner/run-state.js";
|
||||
import { resolveAgentSessionDirs } from "./session-dirs.js";
|
||||
import type { SessionLockInspection } from "./session-write-lock.js";
|
||||
|
||||
@@ -65,6 +69,22 @@ function normalizeStringSet(values: Iterable<string> | undefined): Set<string> {
|
||||
return normalized;
|
||||
}
|
||||
|
||||
/**
 * Narrows an arbitrary value to a usable timestamp.
 *
 * @param value Any value, typically a persisted `updatedAt` or a caller-supplied cutoff.
 * @returns `value` itself when it is a finite number; `undefined` for every
 *   other input (non-numbers, `NaN`, `Infinity`, `-Infinity`).
 */
function normalizeFiniteTimestamp(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }
  if (!Number.isFinite(value)) {
    return undefined;
  }
  return value;
}
|
||||
|
||||
/**
 * Reports whether a session row belongs to a run that is listed as active.
 *
 * A match on `entry.sessionId` always counts as ownership. The session key is
 * consulted only when the active session-id set is empty; when at least one
 * active id is known and none matches, the row is treated as unowned even if
 * its key appears in `activeSessionKeys`.
 *
 * @param params.activeSessionIds Session ids of active runs.
 * @param params.activeSessionKeys Session keys of active runs.
 * @param params.entry Store row being examined.
 * @param params.sessionKey Key under which `entry` is stored.
 * @returns `true` when the row is owned by an active run.
 */
function hasCurrentProcessOwner(params: {
  activeSessionIds: Set<string>;
  activeSessionKeys: Set<string>;
  entry: SessionEntry;
  sessionKey: string;
}): boolean {
  const knownIds = params.activeSessionIds;
  if (knownIds.has(params.entry.sessionId)) {
    return true;
  }
  if (knownIds.size !== 0) {
    // Ids are authoritative once any are known; do not fall back to the key.
    return false;
  }
  return params.activeSessionKeys.has(params.sessionKey);
}
|
||||
|
||||
function normalizeTranscriptLockPath(lockPath: string): string | undefined {
|
||||
const trimmed = lockPath.trim();
|
||||
if (!path.basename(trimmed).endsWith(".jsonl.lock")) {
|
||||
@@ -197,6 +217,72 @@ export async function markRestartAbortedMainSessions(params: {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
 * Marks `running` session rows as restart-aborted (`abortedLastRun = true`)
 * in every store returned by `resolveRestartRecoveryStorePaths`.
 *
 * Per row, in this order:
 * - rows that are missing, not `running`, or already marked are ignored;
 * - rows rejected by `shouldSkipMainRecovery` are counted as skipped;
 * - rows whose finite `updatedAt` is later than the cutoff are ignored;
 * - rows owned by an active run (see `hasCurrentProcessOwner`) are ignored;
 * - every remaining row is marked, has `updatedAt` set to `Date.now()`, and is
 *   counted as marked.
 *
 * @param params.cfg Forwarded to `resolveRestartRecoveryStorePaths`.
 * @param params.stateDir Forwarded to `resolveRestartRecoveryStorePaths`.
 * @param params.activeSessionIds Active session ids. When omitted, they are
 *   read from `listActiveEmbeddedRunSessionIds()` each time a row reaches the
 *   ownership check.
 * @param params.activeSessionKeys Active session keys. When omitted, they are
 *   read from `listActiveEmbeddedRunSessionKeys()` in the same way.
 * @param params.updatedBeforeMs Optional cutoff in milliseconds; it is ignored
 *   unless it is a finite number.
 * @returns How many rows were marked and how many were skipped.
 */
export async function markStartupOrphanedMainSessionsForRecovery(params: {
  cfg?: OpenClawConfig;
  stateDir?: string;
  activeSessionIds?: Iterable<string>;
  activeSessionKeys?: Iterable<string>;
  updatedBeforeMs?: number;
}): Promise<{ marked: number; skipped: number }> {
  const counts = { marked: 0, skipped: 0 };

  const callerSessionIds =
    params.activeSessionIds !== undefined ? normalizeStringSet(params.activeSessionIds) : undefined;
  const callerSessionKeys =
    params.activeSessionKeys !== undefined
      ? normalizeStringSet(params.activeSessionKeys)
      : undefined;
  const cutoffMs = normalizeFiniteTimestamp(params.updatedBeforeMs);

  // True only when both the cutoff and the row timestamp are finite and the
  // row was written after the cutoff.
  const isUpdatedAfterCutoff = (entry: SessionEntry): boolean => {
    const updatedAt = normalizeFiniteTimestamp(entry.updatedAt);
    return cutoffMs !== undefined && updatedAt !== undefined && updatedAt > cutoffMs;
  };

  // Caller-supplied sets win; otherwise the active-run lists are re-read on
  // every call.
  const isOwnedByActiveRun = (entry: SessionEntry, sessionKey: string): boolean =>
    hasCurrentProcessOwner({
      activeSessionIds: callerSessionIds ?? normalizeStringSet(listActiveEmbeddedRunSessionIds()),
      activeSessionKeys:
        callerSessionKeys ?? normalizeStringSet(listActiveEmbeddedRunSessionKeys()),
      entry,
      sessionKey,
    });

  const storePaths = await resolveRestartRecoveryStorePaths(params);
  for (const storePath of storePaths) {
    await updateSessionStore(
      storePath,
      (store) => {
        for (const [sessionKey, entry] of Object.entries(store)) {
          if (!entry) {
            continue;
          }
          if (entry.status !== "running") {
            continue;
          }
          if (entry.abortedLastRun === true) {
            continue;
          }
          if (shouldSkipMainRecovery(entry, sessionKey)) {
            counts.skipped += 1;
            continue;
          }
          if (isUpdatedAfterCutoff(entry)) {
            continue;
          }
          if (isOwnedByActiveRun(entry, sessionKey)) {
            continue;
          }
          entry.abortedLastRun = true;
          entry.updatedAt = Date.now();
          store[sessionKey] = entry;
          counts.marked += 1;
        }
      },
      { skipMaintenance: true },
    );
  }

  if (counts.marked > 0) {
    log.warn(`marked ${counts.marked} startup-orphaned main session(s) for restart recovery`);
  }
  return counts;
}
|
||||
|
||||
function getMessageRole(message: unknown): string | undefined {
|
||||
if (!message || typeof message !== "object") {
|
||||
return undefined;
|
||||
@@ -504,12 +590,48 @@ export async function markRestartAbortedMainSessionsFromLocks(params: {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
 * Decides whether `storePath` is the store that `sessionKey` resolves to under
 * the given configuration.
 *
 * Returns `true` without any lookup when no config is supplied or when the
 * config has no truthy `session.store` value. Otherwise the key is resolved
 * through `resolveGatewaySessionStoreTarget` and the two paths are compared
 * after `path.resolve`.
 *
 * @param params.cfg Optional configuration.
 * @param params.sessionKey Session key to resolve.
 * @param params.storePath Path of the store currently being recovered.
 * @returns `true` when the store is routable for this key; `false` when the
 *   resolved store differs or resolution throws (the error is logged).
 */
function isRoutableRecoveryStore(params: {
  cfg?: OpenClawConfig;
  sessionKey: string;
  storePath: string;
}): boolean {
  const { cfg, sessionKey, storePath } = params;
  if (!cfg?.session?.store) {
    return true;
  }
  try {
    const { storePath: resolvedStorePath } = resolveGatewaySessionStoreTarget({
      cfg,
      key: sessionKey,
      scanLegacyKeys: true,
    });
    const resolvedTarget = path.resolve(resolvedStorePath);
    const resolvedCandidate = path.resolve(storePath);
    return resolvedTarget === resolvedCandidate;
  } catch (err) {
    log.warn(`failed to resolve recovery store for ${sessionKey}: ${String(err)}`);
    return false;
  }
}
|
||||
|
||||
async function recoverStore(params: {
|
||||
cfg?: OpenClawConfig;
|
||||
storePath: string;
|
||||
resumedSessionKeys: Set<string>;
|
||||
activeSessionIds?: Iterable<string>;
|
||||
activeSessionKeys?: Iterable<string>;
|
||||
}): Promise<{ recovered: number; failed: number; skipped: number }> {
|
||||
const result = { recovered: 0, failed: 0, skipped: 0 };
|
||||
const providedActiveSessionIds =
|
||||
params.activeSessionIds === undefined ? undefined : normalizeStringSet(params.activeSessionIds);
|
||||
const providedActiveSessionKeys =
|
||||
params.activeSessionKeys === undefined
|
||||
? undefined
|
||||
: normalizeStringSet(params.activeSessionKeys);
|
||||
const resolveActiveSessionIds = () =>
|
||||
providedActiveSessionIds ?? normalizeStringSet(listActiveEmbeddedRunSessionIds());
|
||||
const resolveActiveSessionKeys = () =>
|
||||
providedActiveSessionKeys ?? normalizeStringSet(listActiveEmbeddedRunSessionKeys());
|
||||
let store: Record<string, SessionEntry>;
|
||||
try {
|
||||
store = loadSessionStore(params.storePath);
|
||||
@@ -529,10 +651,49 @@ async function recoverStore(params: {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
if (params.resumedSessionKeys.has(sessionKey)) {
|
||||
if (
|
||||
!isRoutableRecoveryStore({
|
||||
cfg: params.cfg,
|
||||
sessionKey,
|
||||
storePath: params.storePath,
|
||||
})
|
||||
) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
hasCurrentProcessOwner({
|
||||
activeSessionIds: resolveActiveSessionIds(),
|
||||
activeSessionKeys: resolveActiveSessionKeys(),
|
||||
entry,
|
||||
sessionKey,
|
||||
})
|
||||
) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
const resumeDedupeKey = sessionKey;
|
||||
if (params.resumedSessionKeys.has(resumeDedupeKey)) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry.pendingFinalDelivery === true && entry.pendingFinalDeliveryText) {
|
||||
const resumed = await resumeMainSession({
|
||||
cfg: params.cfg,
|
||||
entry,
|
||||
storePath: params.storePath,
|
||||
sessionKey,
|
||||
pendingFinalDeliveryText: entry.pendingFinalDeliveryText,
|
||||
});
|
||||
if (resumed) {
|
||||
params.resumedSessionKeys.add(resumeDedupeKey);
|
||||
result.recovered++;
|
||||
} else {
|
||||
result.failed++;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let messages: unknown[];
|
||||
try {
|
||||
@@ -577,7 +738,7 @@ async function recoverStore(params: {
|
||||
pendingFinalDeliveryText: entry.pendingFinalDeliveryText,
|
||||
});
|
||||
if (resumed) {
|
||||
params.resumedSessionKeys.add(sessionKey);
|
||||
params.resumedSessionKeys.add(resumeDedupeKey);
|
||||
result.recovered++;
|
||||
} else {
|
||||
result.failed++;
|
||||
@@ -610,6 +771,8 @@ export async function recoverRestartAbortedMainSessions(
|
||||
cfg?: OpenClawConfig;
|
||||
stateDir?: string;
|
||||
resumedSessionKeys?: Set<string>;
|
||||
activeSessionIds?: Iterable<string>;
|
||||
activeSessionKeys?: Iterable<string>;
|
||||
} = {},
|
||||
): Promise<{ recovered: number; failed: number; skipped: number }> {
|
||||
const result = { recovered: 0, failed: 0, skipped: 0 };
|
||||
@@ -620,6 +783,8 @@ export async function recoverRestartAbortedMainSessions(
|
||||
cfg: params.cfg,
|
||||
storePath,
|
||||
resumedSessionKeys,
|
||||
activeSessionIds: params.activeSessionIds,
|
||||
activeSessionKeys: params.activeSessionKeys,
|
||||
});
|
||||
result.recovered += storeResult.recovered;
|
||||
result.failed += storeResult.failed;
|
||||
@@ -634,6 +799,39 @@ export async function recoverRestartAbortedMainSessions(
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
 * Runs the two startup recovery phases back to back: first
 * `markStartupOrphanedMainSessionsForRecovery`, then
 * `recoverRestartAbortedMainSessions`, and merges their counters.
 *
 * @param params.cfg Forwarded to both phases.
 * @param params.stateDir Forwarded to both phases.
 * @param params.activeSessionIds Forwarded to both phases.
 * @param params.activeSessionKeys Forwarded to both phases.
 * @param params.updatedBeforeMs Cutoff for the marking phase; defaults to
 *   `Date.now()` at call time when `null`/`undefined`.
 * @param params.resumedSessionKeys Forwarded to the recovery phase only.
 * @returns `marked` from the marking phase, `recovered` and `failed` from the
 *   recovery phase, and `skipped` as the sum of both phases.
 */
export async function recoverStartupOrphanedMainSessions(
  params: {
    cfg?: OpenClawConfig;
    stateDir?: string;
    activeSessionIds?: Iterable<string>;
    activeSessionKeys?: Iterable<string>;
    updatedBeforeMs?: number;
    resumedSessionKeys?: Set<string>;
  } = {},
): Promise<{ marked: number; recovered: number; failed: number; skipped: number }> {
  const { cfg, stateDir, activeSessionIds, activeSessionKeys, resumedSessionKeys } = params;
  const updatedBeforeMs = params.updatedBeforeMs ?? Date.now();

  const markPhase = await markStartupOrphanedMainSessionsForRecovery({
    cfg,
    stateDir,
    activeSessionIds,
    activeSessionKeys,
    updatedBeforeMs,
  });
  const recoveryPhase = await recoverRestartAbortedMainSessions({
    cfg,
    stateDir,
    resumedSessionKeys,
    activeSessionIds,
    activeSessionKeys,
  });

  const skipped = markPhase.skipped + recoveryPhase.skipped;
  return {
    marked: markPhase.marked,
    recovered: recoveryPhase.recovered,
    failed: recoveryPhase.failed,
    skipped,
  };
}
|
||||
|
||||
export function scheduleRestartAbortedMainSessionRecovery(
|
||||
params: {
|
||||
cfg?: OpenClawConfig;
|
||||
@@ -645,29 +843,41 @@ export function scheduleRestartAbortedMainSessionRecovery(
|
||||
const initialDelay = params.delayMs ?? DEFAULT_RECOVERY_DELAY_MS;
|
||||
const maxRetries = params.maxRetries ?? MAX_RECOVERY_RETRIES;
|
||||
const resumedSessionKeys = new Set<string>();
|
||||
// Only reconcile rows that existed before this startup recovery was scheduled.
|
||||
// Fresh runs started by this gateway are protected again by the active-run check.
|
||||
const startupRecoveryCutoffMs = Date.now();
|
||||
|
||||
const attemptRecovery = (attempt: number, delay: number) => {
|
||||
setTimeout(() => {
|
||||
void recoverRestartAbortedMainSessions({
|
||||
cfg: params.cfg,
|
||||
stateDir: params.stateDir,
|
||||
resumedSessionKeys,
|
||||
const runRecoveryAttempt = (attempt: number, delay: number) => {
|
||||
void recoverStartupOrphanedMainSessions({
|
||||
cfg: params.cfg,
|
||||
stateDir: params.stateDir,
|
||||
resumedSessionKeys,
|
||||
updatedBeforeMs: startupRecoveryCutoffMs,
|
||||
})
|
||||
.then((result) => {
|
||||
if (result.failed > 0 && attempt < maxRetries) {
|
||||
scheduleAttempt(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER);
|
||||
}
|
||||
})
|
||||
.then((result) => {
|
||||
if (result.failed > 0 && attempt < maxRetries) {
|
||||
attemptRecovery(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER);
|
||||
}
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
if (attempt < maxRetries) {
|
||||
log.warn(`main-session restart recovery failed: ${String(err)}`);
|
||||
attemptRecovery(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER);
|
||||
} else {
|
||||
log.warn(`main-session restart recovery gave up: ${String(err)}`);
|
||||
}
|
||||
});
|
||||
.catch((err: unknown) => {
|
||||
if (attempt < maxRetries) {
|
||||
log.warn(`main-session restart recovery failed: ${String(err)}`);
|
||||
scheduleAttempt(attempt + 1, delay * RETRY_BACKOFF_MULTIPLIER);
|
||||
} else {
|
||||
log.warn(`main-session restart recovery gave up: ${String(err)}`);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const scheduleAttempt = (attempt: number, delay: number) => {
|
||||
if (delay <= 0) {
|
||||
runRecoveryAttempt(attempt, delay);
|
||||
return;
|
||||
}
|
||||
setTimeout(() => {
|
||||
runRecoveryAttempt(attempt, delay);
|
||||
}, delay).unref?.();
|
||||
};
|
||||
|
||||
attemptRecovery(1, initialDelay);
|
||||
scheduleAttempt(1, initialDelay);
|
||||
}
|
||||
|
||||
@@ -26,6 +26,18 @@ const hoisted = vi.hoisted(() => {
|
||||
const startGatewayTailscaleExposure = vi.fn(async () => null);
|
||||
const logGatewayStartup = vi.fn();
|
||||
const scheduleSubagentOrphanRecovery = vi.fn();
|
||||
const markRestartAbortedMainSessionsFromLocks = vi.fn(async () => {});
|
||||
const markStartupOrphanedMainSessionsForRecovery = vi.fn(async () => ({
|
||||
marked: 0,
|
||||
skipped: 0,
|
||||
}));
|
||||
const recoverStartupOrphanedMainSessions = vi.fn(async () => ({
|
||||
marked: 0,
|
||||
recovered: 0,
|
||||
failed: 0,
|
||||
skipped: 0,
|
||||
}));
|
||||
const scheduleRestartAbortedMainSessionRecovery = vi.fn();
|
||||
const shouldWakeFromRestartSentinel = vi.fn(() => false);
|
||||
const scheduleRestartSentinelWake = vi.fn();
|
||||
const refreshLatestUpdateRestartSentinel = vi.fn<
|
||||
@@ -75,6 +87,10 @@ const hoisted = vi.hoisted(() => {
|
||||
startGatewayTailscaleExposure,
|
||||
logGatewayStartup,
|
||||
scheduleSubagentOrphanRecovery,
|
||||
markRestartAbortedMainSessionsFromLocks,
|
||||
markStartupOrphanedMainSessionsForRecovery,
|
||||
recoverStartupOrphanedMainSessions,
|
||||
scheduleRestartAbortedMainSessionRecovery,
|
||||
shouldWakeFromRestartSentinel,
|
||||
scheduleRestartSentinelWake,
|
||||
refreshLatestUpdateRestartSentinel,
|
||||
@@ -107,6 +123,13 @@ vi.mock("../agents/subagent-registry.js", () => ({
|
||||
scheduleSubagentOrphanRecovery: hoisted.scheduleSubagentOrphanRecovery,
|
||||
}));
|
||||
|
||||
vi.mock("../agents/main-session-restart-recovery.js", () => ({
|
||||
markRestartAbortedMainSessionsFromLocks: hoisted.markRestartAbortedMainSessionsFromLocks,
|
||||
markStartupOrphanedMainSessionsForRecovery: hoisted.markStartupOrphanedMainSessionsForRecovery,
|
||||
recoverStartupOrphanedMainSessions: hoisted.recoverStartupOrphanedMainSessions,
|
||||
scheduleRestartAbortedMainSessionRecovery: hoisted.scheduleRestartAbortedMainSessionRecovery,
|
||||
}));
|
||||
|
||||
vi.mock("../config/paths.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../config/paths.js")>("../config/paths.js");
|
||||
return {
|
||||
@@ -290,6 +313,20 @@ describe("startGatewayPostAttachRuntime", () => {
|
||||
hoisted.startGatewayTailscaleExposure.mockClear();
|
||||
hoisted.logGatewayStartup.mockClear();
|
||||
hoisted.scheduleSubagentOrphanRecovery.mockClear();
|
||||
hoisted.markRestartAbortedMainSessionsFromLocks.mockClear();
|
||||
hoisted.markStartupOrphanedMainSessionsForRecovery.mockReset();
|
||||
hoisted.markStartupOrphanedMainSessionsForRecovery.mockResolvedValue({
|
||||
marked: 0,
|
||||
skipped: 0,
|
||||
});
|
||||
hoisted.recoverStartupOrphanedMainSessions.mockReset();
|
||||
hoisted.recoverStartupOrphanedMainSessions.mockResolvedValue({
|
||||
marked: 0,
|
||||
recovered: 0,
|
||||
failed: 0,
|
||||
skipped: 0,
|
||||
});
|
||||
hoisted.scheduleRestartAbortedMainSessionRecovery.mockClear();
|
||||
hoisted.shouldWakeFromRestartSentinel.mockReturnValue(false);
|
||||
hoisted.scheduleRestartSentinelWake.mockClear();
|
||||
hoisted.getAcpRuntimeBackend.mockReset();
|
||||
@@ -348,6 +385,9 @@ describe("startGatewayPostAttachRuntime", () => {
|
||||
expect(hoisted.logGatewayStartup).toHaveBeenCalledTimes(1);
|
||||
expect(firstStartupLog().loadedPluginIds).toEqual(["beta", "alpha"]);
|
||||
expect(log.info).toHaveBeenCalledWith("gateway ready");
|
||||
expect(hoisted.scheduleRestartAbortedMainSessionRecovery).toHaveBeenCalledWith({
|
||||
cfg: { hooks: { internal: { enabled: false } } },
|
||||
});
|
||||
expect(hoisted.startGatewayMemoryBackend).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -1483,6 +1523,89 @@ describe("startGatewayPostAttachRuntime", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("marks startup main-session orphans before channel startup", async () => {
|
||||
const events: string[] = [];
|
||||
let releaseMarking: (() => void) | undefined;
|
||||
const startChannels = vi.fn(async () => {
|
||||
events.push("channels");
|
||||
});
|
||||
hoisted.markStartupOrphanedMainSessionsForRecovery.mockImplementationOnce(
|
||||
async () =>
|
||||
await new Promise<{ marked: number; skipped: number }>((resolve) => {
|
||||
events.push("main-session-mark:start");
|
||||
releaseMarking = () => {
|
||||
events.push("main-session-mark:done");
|
||||
resolve({ marked: 1, skipped: 0 });
|
||||
};
|
||||
}),
|
||||
);
|
||||
|
||||
const sidecars = startGatewaySidecars({
|
||||
cfg: { hooks: { internal: { enabled: false } } } as never,
|
||||
pluginRegistry: createPostAttachParams().pluginRegistry,
|
||||
defaultWorkspaceDir: "/tmp/openclaw-workspace",
|
||||
deps: {} as never,
|
||||
startChannels,
|
||||
log: { warn: vi.fn() },
|
||||
logHooks: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
logChannels: {
|
||||
info: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(events).toEqual(["main-session-mark:start"]);
|
||||
});
|
||||
expect(startChannels).not.toHaveBeenCalled();
|
||||
|
||||
if (!releaseMarking) {
|
||||
throw new Error("Expected marker release callback to be initialized");
|
||||
}
|
||||
releaseMarking();
|
||||
await sidecars;
|
||||
|
||||
expect(events).toEqual(["main-session-mark:start", "main-session-mark:done", "channels"]);
|
||||
expect(startChannels).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.scheduleRestartAbortedMainSessionRecovery).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("logs startup main-session marker failures and still starts channels", async () => {
|
||||
const log = { warn: vi.fn() };
|
||||
const startChannels = vi.fn(async () => {});
|
||||
hoisted.markStartupOrphanedMainSessionsForRecovery.mockRejectedValueOnce(
|
||||
new Error("store unreadable"),
|
||||
);
|
||||
|
||||
await startGatewaySidecars({
|
||||
cfg: { hooks: { internal: { enabled: false } } } as never,
|
||||
pluginRegistry: createPostAttachParams().pluginRegistry,
|
||||
defaultWorkspaceDir: "/tmp/openclaw-workspace",
|
||||
deps: {} as never,
|
||||
startChannels,
|
||||
log,
|
||||
logHooks: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
logChannels: {
|
||||
info: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
});
|
||||
|
||||
expect(log.warn).toHaveBeenCalledWith(
|
||||
"main-session startup orphan marking failed before channel startup: Error: store unreadable",
|
||||
);
|
||||
expect(hoisted.scheduleRestartAbortedMainSessionRecovery).not.toHaveBeenCalled();
|
||||
expect(startChannels).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("emits a sidecar readiness summary in startup trace details", async () => {
|
||||
const trace = createStartupTraceRecorder();
|
||||
|
||||
|
||||
@@ -781,6 +781,17 @@ export async function startGatewaySidecars(params: {
|
||||
const skipChannels =
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) ||
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS);
|
||||
await measureStartup(params.startupTrace, "sidecars.main-session-recovery", async () => {
|
||||
try {
|
||||
const { markStartupOrphanedMainSessionsForRecovery } =
|
||||
await loadMainSessionRestartRecoveryModule();
|
||||
await markStartupOrphanedMainSessionsForRecovery({ cfg: params.cfg });
|
||||
} catch (err) {
|
||||
params.log.warn(
|
||||
`main-session startup orphan marking failed before channel startup: ${String(err)}`,
|
||||
);
|
||||
}
|
||||
});
|
||||
await measureStartup(params.startupTrace, "sidecars.channels", async () => {
|
||||
if (!skipChannels) {
|
||||
try {
|
||||
@@ -947,17 +958,6 @@ export async function startGatewaySidecars(params: {
|
||||
},
|
||||
});
|
||||
|
||||
schedulePostReadySidecarTask({
|
||||
startupTrace: params.startupTrace,
|
||||
name: "sidecars.main-session-recovery",
|
||||
log: params.log,
|
||||
run: async () => {
|
||||
const { scheduleRestartAbortedMainSessionRecovery } =
|
||||
await loadMainSessionRestartRecoveryModule();
|
||||
scheduleRestartAbortedMainSessionRecovery({ cfg: params.cfg });
|
||||
},
|
||||
});
|
||||
|
||||
if (params.cfg.hooks?.enabled && params.cfg.hooks.gmail?.account) {
|
||||
postReadySidecars.push(
|
||||
schedulePostReadySidecarTask({
|
||||
@@ -1333,6 +1333,13 @@ export async function startGatewayPostAttachRuntime(
|
||||
for (const method of STARTUP_UNAVAILABLE_GATEWAY_METHODS) {
|
||||
params.unavailableGatewayMethods.delete(method);
|
||||
}
|
||||
try {
|
||||
const { scheduleRestartAbortedMainSessionRecovery } =
|
||||
await loadMainSessionRestartRecoveryModule();
|
||||
scheduleRestartAbortedMainSessionRecovery({ cfg: params.cfgAtStart });
|
||||
} catch (err) {
|
||||
params.log.warn(`main-session restart recovery failed to schedule: ${String(err)}`);
|
||||
}
|
||||
if (!pluginServicesReported) {
|
||||
reportPluginServices(result.pluginServices);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user