mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 08:50:43 +00:00
fix(cron): persist startup state sidecar repairs
This commit is contained in:
@@ -69,6 +69,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Control UI/Gateway: avoid full session-list reloads for locally applied message-phase session updates, carry known session keys through transcript-file update events, and defer media provider listing when explicit generation model config is present. Refs #76236, #76203, #76188, #76107, and #76166. Thanks @BunsDev.
|
||||
- Install/update: prune the obsolete `plugin-runtime-deps` state directory during packaged postinstall so upgrades from pre-2026.5.2 releases reclaim old bundled-plugin dependency caches without touching external plugin installs.
|
||||
- Auto-reply/queue: treat reset-triggered `/new` and `/reset` turns as interrupt runs across active-run queue handling, so steer/followup modes cannot delay a fresh session behind existing work. Fixes #74093. (#74144) Thanks @ruji9527 and @yelog.
|
||||
- Cron: persist repaired startup runtime state back to `jobs-state.json` so a valid future `nextRunAtMs` with missing `updatedAtMs` no longer triggers repeated external health-check repairs after Gateway restart. Fixes #76461. Thanks @vincentkoc.
|
||||
- Cron: preserve manual `cron.run` IDs in `cron.runs` history so manual run acknowledgements can be correlated with finished run records. Fixes #76276.
|
||||
- CLI/devices: request `operator.admin` for `openclaw devices approve <requestId>` only when the exact pending device request would mint or inherit admin-scoped operator access, while keeping lower-scope approvals on the pairing scope.
|
||||
- Memory/embedding: broaden the embedding reindex retry classifier to include transient socket-layer errors (`fetch failed`, `ECONNRESET`, `socket hang up`, `UND_ERR_*`, `closed`) so memory reindex survives provider network hiccups instead of aborting mid-run. Related #56815, #44166. (#76311) Thanks @buyitsydney.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
|
||||
@@ -182,6 +183,83 @@ describe("cron service ops seam coverage", () => {
|
||||
stop(state);
|
||||
});
|
||||
|
||||
it("start persists load-time updatedAtMs repairs to the state sidecar only", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.parse("2026-04-09T08:00:00.000Z");
|
||||
const createdAtMs = now - 86_400_000;
|
||||
const nextRunAtMs = Date.parse("2026-04-10T09:00:00.000Z");
|
||||
const jobId = "future-sidecar-repair";
|
||||
const statePath = storePath.replace(/\.json$/, "-state.json");
|
||||
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: jobId,
|
||||
name: "future sidecar repair",
|
||||
enabled: true,
|
||||
createdAtMs,
|
||||
schedule: { kind: "cron", expr: "0 9 * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "daily" },
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
await fs.writeFile(
|
||||
statePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: {
|
||||
[jobId]: {
|
||||
state: { nextRunAtMs },
|
||||
},
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
const configBefore = await fs.readFile(storePath, "utf-8");
|
||||
|
||||
const state = createCronServiceState({
|
||||
storePath,
|
||||
cronEnabled: true,
|
||||
log: logger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeat: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
||||
});
|
||||
|
||||
try {
|
||||
await start(state);
|
||||
|
||||
const configAfter = await fs.readFile(storePath, "utf-8");
|
||||
const persistedState = JSON.parse(await fs.readFile(statePath, "utf-8")) as {
|
||||
jobs: Record<string, { updatedAtMs?: unknown; state?: { nextRunAtMs?: unknown } }>;
|
||||
};
|
||||
|
||||
expect(configAfter).toBe(configBefore);
|
||||
expect(persistedState.jobs[jobId]?.updatedAtMs).toBe(createdAtMs);
|
||||
expect(persistedState.jobs[jobId]?.state?.nextRunAtMs).toBe(nextRunAtMs);
|
||||
} finally {
|
||||
stop(state);
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps manual acknowledgement IDs separate from recoverable task run IDs", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.parse("2026-03-23T12:00:00.000Z");
|
||||
|
||||
@@ -160,8 +160,8 @@ export async function start(state: CronServiceState) {
|
||||
markedAnyInterruptedRun = true;
|
||||
}
|
||||
}
|
||||
if (markedAnyInterruptedRun) {
|
||||
await persist(state);
|
||||
if (markedAnyInterruptedRun || jobs.length > 0) {
|
||||
await persist(state, markedAnyInterruptedRun ? undefined : { stateOnly: true });
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -149,7 +149,10 @@ export function warnIfDisabled(state: CronServiceState, action: string) {
|
||||
);
|
||||
}
|
||||
|
||||
export async function persist(state: CronServiceState, opts?: { skipBackup?: boolean }) {
|
||||
export async function persist(
|
||||
state: CronServiceState,
|
||||
opts?: { skipBackup?: boolean; stateOnly?: boolean },
|
||||
) {
|
||||
if (!state.store) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -321,6 +321,7 @@ export function loadCronStoreSync(storePath: string): CronStoreFile {
|
||||
|
||||
type SaveCronStoreOptions = {
|
||||
skipBackup?: boolean;
|
||||
stateOnly?: boolean;
|
||||
};
|
||||
|
||||
async function setSecureFileMode(filePath: string): Promise<void> {
|
||||
@@ -361,6 +362,7 @@ export async function saveCronStore(
|
||||
store: CronStoreFile,
|
||||
opts?: SaveCronStoreOptions,
|
||||
) {
|
||||
const stateOnly = opts?.stateOnly === true;
|
||||
const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2);
|
||||
const stateFile = extractStateFile(store);
|
||||
const stateJson = JSON.stringify(stateFile, null, 2);
|
||||
@@ -368,13 +370,17 @@ export async function saveCronStore(
|
||||
const statePath = resolveStatePath(storePath);
|
||||
const cache = serializedStoreCache.get(storePath);
|
||||
|
||||
const configChanged = cache?.configJson !== configJson;
|
||||
const configChanged = !stateOnly && cache?.configJson !== configJson;
|
||||
const stateChanged = cache?.stateJson !== stateJson;
|
||||
const migrating = cache?.needsSplitMigration === true;
|
||||
const configNeedsWrite = await serializedFileNeedsWrite(storePath, configJson, configChanged);
|
||||
const configNeedsWrite = stateOnly
|
||||
? false
|
||||
: await serializedFileNeedsWrite(storePath, configJson, configChanged);
|
||||
const stateNeedsWrite = await serializedFileNeedsWrite(statePath, stateJson, stateChanged);
|
||||
|
||||
if (!configNeedsWrite && !stateNeedsWrite && !migrating) {
|
||||
if (
|
||||
stateOnly ? !stateNeedsWrite && !migrating : !configNeedsWrite && !stateNeedsWrite && !migrating
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -386,7 +392,7 @@ export async function saveCronStore(
|
||||
updatedCache.stateJson = stateJson;
|
||||
}
|
||||
|
||||
if (configNeedsWrite || migrating) {
|
||||
if (!stateOnly && (configNeedsWrite || migrating)) {
|
||||
// Determine backup need: only when config actually changed (not migration-only).
|
||||
const skipBackup = opts?.skipBackup === true || !configChanged;
|
||||
if (!skipBackup) {
|
||||
@@ -401,7 +407,7 @@ export async function saveCronStore(
|
||||
await atomicWrite(storePath, configJson);
|
||||
updatedCache.configJson = configJson;
|
||||
}
|
||||
updatedCache.needsSplitMigration = false;
|
||||
updatedCache.needsSplitMigration = stateOnly && migrating;
|
||||
}
|
||||
|
||||
const RENAME_MAX_RETRIES = 3;
|
||||
|
||||
Reference in New Issue
Block a user