refactor: simplify sqlite cron persistence

This commit is contained in:
Peter Steinberger
2026-05-30 22:11:17 +01:00
committed by GitHub
parent 76b300babc
commit a825b5576b
17 changed files with 723 additions and 698 deletions

View File

@@ -0,0 +1,238 @@
import fs from "node:fs/promises";
import path from "node:path";
import { tryCronScheduleIdentity } from "../cron/schedule-identity.js";
import type {
CronConfigJobRuntimeEntry,
LoadedCronStore,
QuarantinedCronConfigJob,
} from "../cron/store.js";
import type { CronStoreFile } from "../cron/types.js";
import { isRecord } from "../shared/record-coerce.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { parseJsonWithJson5Fallback } from "../utils/parse-json-compat.js";
const LEGACY_CRON_ARCHIVE_SUFFIX = ".migrated";
function resolveLegacyCronStatePath(storePath: string): string {
if (storePath.endsWith(".json")) {
return storePath.replace(/\.json$/, "-state.json");
}
return `${storePath}-state.json`;
}
async function legacyCronFileExists(filePath: string): Promise<boolean> {
return fs
.access(filePath)
.then(() => true)
.catch(() => false);
}
async function archiveLegacyCronFile(filePath: string): Promise<void> {
if (!(await legacyCronFileExists(filePath))) {
return;
}
const archivePath = `${filePath}${LEGACY_CRON_ARCHIVE_SUFFIX}`;
if (await legacyCronFileExists(archivePath)) {
return;
}
await fs.rename(filePath, archivePath).catch(() => undefined);
}
function parseCronStateFile(raw: string): {
version: 1;
jobs: Record<string, CronConfigJobRuntimeEntry>;
} | null {
try {
const parsed = parseJsonWithJson5Fallback(raw);
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return null;
}
const record = parsed as Record<string, unknown>;
if (
record.version !== 1 ||
typeof record.jobs !== "object" ||
record.jobs === null ||
Array.isArray(record.jobs)
) {
return null;
}
return {
version: 1,
jobs: record.jobs as Record<string, CronConfigJobRuntimeEntry>,
};
} catch {
return null;
}
}
function getRawCronJobs(parsed: unknown): unknown[] {
return Array.isArray(parsed)
? parsed
: isRecord(parsed) && Array.isArray(parsed.jobs)
? parsed.jobs
: [];
}
function cloneConfigJobs(jobs: Array<Record<string, unknown>>): Array<Record<string, unknown>> {
return jobs.map((job) => structuredClone(job));
}
async function loadStateFile(
statePath: string,
): Promise<{ version: 1; jobs: Record<string, CronConfigJobRuntimeEntry> } | null> {
let raw: string;
try {
raw = await fs.readFile(statePath, "utf-8");
} catch (err) {
if ((err as { code?: unknown })?.code === "ENOENT") {
return null;
}
throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, {
cause: err,
});
}
return parseCronStateFile(raw);
}
function hasInlineState(jobs: Array<Record<string, unknown> | null | undefined>): boolean {
return jobs.some(
(job) => job != null && isRecord(job.state) && Object.keys(job.state).length > 0,
);
}
function ensureJobStateObject(job: CronStoreFile["jobs"][number]): void {
if (!isRecord(job.state)) {
job.state = {} as never;
}
}
function backfillMissingRuntimeFields(job: CronStoreFile["jobs"][number]): void {
ensureJobStateObject(job);
if (typeof job.updatedAtMs !== "number") {
job.updatedAtMs = typeof job.createdAtMs === "number" ? job.createdAtMs : Date.now();
}
}
function resolveUpdatedAtMs(job: CronStoreFile["jobs"][number], updatedAtMs: unknown): number {
if (typeof updatedAtMs === "number" && Number.isFinite(updatedAtMs)) {
return updatedAtMs;
}
if (typeof job.updatedAtMs === "number" && Number.isFinite(job.updatedAtMs)) {
return job.updatedAtMs;
}
return typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs)
? job.createdAtMs
: Date.now();
}
function mergeStateFileEntry(job: CronStoreFile["jobs"][number], entry: unknown): void {
if (!isRecord(entry)) {
backfillMissingRuntimeFields(job);
return;
}
job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs);
job.state = isRecord(entry.state) ? (entry.state as never) : ({} as never);
if (
typeof entry.scheduleIdentity === "string" &&
entry.scheduleIdentity !== tryCronScheduleIdentity(job as unknown as Record<string, unknown>)
) {
ensureJobStateObject(job);
job.state.nextRunAtMs = undefined;
}
}
function resolveCronStateId(job: Record<string, unknown>): string | undefined {
return normalizeOptionalString(job.id) ?? normalizeOptionalString(job.jobId);
}
export async function legacyCronStoreFilesExist(storePath: string): Promise<boolean> {
const resolvedStorePath = path.resolve(storePath);
return (
(await legacyCronFileExists(resolvedStorePath)) ||
(await legacyCronFileExists(resolveLegacyCronStatePath(resolvedStorePath)))
);
}
export async function archiveLegacyCronStoreForMigration(storePath: string): Promise<void> {
const resolvedStorePath = path.resolve(storePath);
await Promise.all([
archiveLegacyCronFile(resolvedStorePath),
archiveLegacyCronFile(resolveLegacyCronStatePath(resolvedStorePath)),
]);
}
export async function loadLegacyCronStoreForMigration(storePath: string): Promise<LoadedCronStore> {
const resolvedStorePath = path.resolve(storePath);
try {
const raw = await fs.readFile(resolvedStorePath, "utf-8");
let parsed: unknown;
try {
parsed = parseJsonWithJson5Fallback(raw);
} catch (err) {
throw new Error(`Failed to parse cron store at ${resolvedStorePath}: ${String(err)}`, {
cause: err,
});
}
const rawJobs = getRawCronJobs(parsed);
const configJobIndexes: number[] = [];
const configRows: Array<Record<string, unknown>> = [];
const configJobRuntimeEntries: CronConfigJobRuntimeEntry[] = [];
const invalidConfigRows: QuarantinedCronConfigJob[] = [];
for (const [index, row] of rawJobs.entries()) {
if (isRecord(row)) {
configJobIndexes.push(index);
configRows.push(row);
} else {
invalidConfigRows.push({
sourceIndex: index,
reason: "non-object-row",
raw: structuredClone(row),
});
}
}
const store: CronStoreFile = {
version: 1,
jobs: configRows as never as CronStoreFile["jobs"],
};
const jobs = store.jobs as unknown as Array<Record<string, unknown>>;
const configJobs = cloneConfigJobs(configRows);
const stateFile = await loadStateFile(resolveLegacyCronStatePath(resolvedStorePath));
const hasLegacyInlineState = !stateFile && hasInlineState(jobs);
if (stateFile) {
for (const job of store.jobs) {
const stateId = resolveCronStateId(job as unknown as Record<string, unknown>);
const entry = stateId ? stateFile.jobs[stateId] : undefined;
configJobRuntimeEntries.push(isRecord(entry) ? structuredClone(entry) : {});
if (entry) {
mergeStateFileEntry(job, entry);
} else {
backfillMissingRuntimeFields(job);
}
}
} else if (!hasLegacyInlineState) {
for (const job of store.jobs) {
backfillMissingRuntimeFields(job);
}
}
for (const job of store.jobs) {
ensureJobStateObject(job);
}
return { store, configJobs, configJobIndexes, configJobRuntimeEntries, invalidConfigRows };
} catch (err) {
if ((err as { code?: unknown })?.code === "ENOENT") {
return {
store: { version: 1, jobs: [] },
configJobs: [],
configJobIndexes: [],
configJobRuntimeEntries: [],
invalidConfigRows: [],
};
}
throw err;
}
}

View File

@@ -438,7 +438,7 @@ describe("maybeRepairLegacyCronStore", () => {
prompter: makePrompter(true),
});
const entries = readCronRunLogEntriesSync(runLogPath);
const entries = readCronRunLogEntriesSync({ storePath, jobId: "sqlite-job" });
expect(entries).toHaveLength(1);
expect(entries[0]?.jobId).toBe("sqlite-job");
expect(entries[0]?.summary).toBe("done");

View File

@@ -6,13 +6,10 @@ import { resolveAgentModelPrimaryValue } from "../config/model-input.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { legacyCronRunLogFilesExist, migrateLegacyCronRunLogsToSqlite } from "../cron/run-log.js";
import {
archiveLegacyCronStoreForMigration,
legacyCronStoreFilesExist,
loadLegacyCronStoreForMigration,
loadCronQuarantineFile,
loadCronStore,
resolveCronQuarantinePath,
resolveCronStorePath,
loadCronStore,
saveCronStore,
} from "../cron/store.js";
import type { CronJob } from "../cron/types.js";
@@ -25,6 +22,11 @@ import {
countStaleDreamingJobs,
migrateLegacyDreamingPayloadShape,
} from "./doctor-cron-dreaming-payload-migration.js";
import {
archiveLegacyCronStoreForMigration,
legacyCronStoreFilesExist,
loadLegacyCronStoreForMigration,
} from "./doctor-cron-legacy-store-migration.js";
import { normalizeStoredCronJobs } from "./doctor-cron-store-migration.js";
import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js";

View File

@@ -21,6 +21,10 @@ async function writeLegacyRunLogAndMigrate(
return file;
}
function fileToStorePath(file: string): string {
return path.join(path.dirname(path.dirname(file)), "jobs.json");
}
describe("cron run log errorReason", () => {
it("backfills errorReason from timeout error text for older entries", async () => {
const file = await writeLegacyRunLogAndMigrate([
@@ -33,13 +37,17 @@ describe("cron run log errorReason", () => {
},
]);
const page = await readCronRunLogEntriesPage(file, { limit: 10 });
const page = await readCronRunLogEntriesPage({
storePath: fileToStorePath(file),
jobId: "job-1",
limit: 10,
});
expect(page.entries[0]?.errorReason).toBe("timeout");
});
it("validates persisted errorReason against the full failover reason set", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "cron-run-log-"));
const file = path.join(dir, "job.jsonl");
const storePath = path.join(dir, "jobs.json");
const reasons = [
"auth",
"auth_permanent",
@@ -57,16 +65,24 @@ describe("cron run log errorReason", () => {
"unknown",
] satisfies Array<NonNullable<CronRunLogEntry["errorReason"]>>;
for (const [index, errorReason] of reasons.entries()) {
await appendCronRunLog(file, {
ts: index + 1,
jobId: "job-1",
action: "finished",
status: "error",
errorReason,
await appendCronRunLog({
storePath,
entry: {
ts: index + 1,
jobId: "job-1",
action: "finished",
status: "error",
errorReason,
},
});
}
const page = await readCronRunLogEntriesPage(file, { limit: 50, sortDir: "asc" });
const page = await readCronRunLogEntriesPage({
storePath,
jobId: "job-1",
limit: 50,
sortDir: "asc",
});
expect(page.entries.map((entry) => entry.errorReason)).toEqual(reasons);
});
@@ -82,38 +98,53 @@ describe("cron run log errorReason", () => {
},
]);
const page = await readCronRunLogEntriesPage(file, { limit: 10 });
const page = await readCronRunLogEntriesPage({
storePath: fileToStorePath(file),
jobId: "job-1",
limit: 10,
});
expect(page.entries[0]?.errorReason).toBe("overloaded");
});
it("uses provider context when deriving persisted run-log reasons", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "cron-run-log-"));
const file = path.join(dir, "job.jsonl");
await appendCronRunLog(file, {
ts: 1,
jobId: "job-1",
action: "finished",
status: "error",
error: "403 Key limit exceeded (monthly limit)",
provider: "openrouter",
const storePath = path.join(dir, "jobs.json");
await appendCronRunLog({
storePath,
entry: {
ts: 1,
jobId: "job-1",
action: "finished",
status: "error",
error: "403 Key limit exceeded (monthly limit)",
provider: "openrouter",
},
});
const page = await readCronRunLogEntriesPage(file, { limit: 10 });
const page = await readCronRunLogEntriesPage({ storePath, jobId: "job-1", limit: 10 });
expect(page.entries[0]?.errorReason).toBe("billing");
});
it("includes derived errorReason values in run-log search", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "cron-run-log-"));
const file = path.join(dir, "job.jsonl");
await appendCronRunLog(file, {
ts: 1,
jobId: "job-1",
action: "finished",
status: "error",
error: "cron: job execution timed out",
const storePath = path.join(dir, "jobs.json");
await appendCronRunLog({
storePath,
entry: {
ts: 1,
jobId: "job-1",
action: "finished",
status: "error",
error: "cron: job execution timed out",
},
});
const page = await readCronRunLogEntriesPage(file, { limit: 10, query: "timeout" });
const page = await readCronRunLogEntriesPage({
storePath,
jobId: "job-1",
limit: 10,
query: "timeout",
});
expect(page.entries).toHaveLength(1);
expect(page.entries[0]?.errorReason).toBe("timeout");
});

View File

@@ -12,7 +12,6 @@ import {
readCronRunLogEntriesPage,
readCronRunLogEntriesSync,
resolveCronRunLogPruneOptions,
resolveCronRunLogPath,
} from "./run-log.js";
describe("cron run log", () => {
@@ -50,74 +49,75 @@ describe("cron run log", () => {
}
}
it("resolves store path to per-job runs/<jobId>.jsonl", () => {
const storePath = path.join(os.tmpdir(), "cron", "jobs.json");
const p = resolveCronRunLogPath({ storePath, jobId: "job-1" });
expect(p.endsWith(path.join(os.tmpdir(), "cron", "runs", "job-1.jsonl"))).toBe(true);
});
function storePathForDir(dir: string): string {
return path.join(dir, "jobs.json");
}
it("rejects unsafe job ids when resolving run log path", () => {
it("rejects unsafe job ids before querying SQLite run logs", async () => {
const storePath = path.join(os.tmpdir(), "cron", "jobs.json");
expect(() => resolveCronRunLogPath({ storePath, jobId: "../job-1" })).toThrow(
/invalid cron run log job id/i,
);
expect(() => resolveCronRunLogPath({ storePath, jobId: "nested/job-1" })).toThrow(
/invalid cron run log job id/i,
);
expect(() => resolveCronRunLogPath({ storePath, jobId: "..\\job-1" })).toThrow(
/invalid cron run log job id/i,
);
for (const jobId of ["../job-1", "nested/job-1", "..\\job-1"]) {
await expect(readCronRunLogEntriesPage({ storePath, jobId })).rejects.toThrow(
/invalid cron run log job id/i,
);
}
});
it("appends SQLite rows and prunes by line count", async () => {
await withRunLogDir("openclaw-cron-log-", async (dir) => {
const logPath = path.join(dir, "runs", "job-1.jsonl");
const storePath = storePathForDir(dir);
for (let i = 0; i < 10; i++) {
await appendCronRunLog(
logPath,
{
await appendCronRunLog({
storePath,
entry: {
ts: 1000 + i,
jobId: "job-1",
action: "finished",
status: "ok",
durationMs: i,
},
{ maxBytes: 1, keepLines: 3 },
);
opts: { keepLines: 3 },
});
}
const entries = readCronRunLogEntriesSync(logPath, { limit: 10 });
const entries = readCronRunLogEntriesSync({ storePath, jobId: "job-1", limit: 10 });
expect(entries.map((entry) => entry.ts)).toEqual([1007, 1008, 1009]);
const logPath = path.join(dir, "runs", "job-1.jsonl");
await expect(fs.stat(logPath)).rejects.toMatchObject({ code: "ENOENT" });
});
});
it("reads run-log entries synchronously for task reconciliation", async () => {
await withRunLogDir("openclaw-cron-log-sync-", async (dir) => {
const logPath = path.join(dir, "runs", "job-1.jsonl");
await appendCronRunLog(logPath, {
ts: 1000,
jobId: "job-1",
action: "finished",
status: "ok",
runAtMs: 900,
durationMs: 100,
const storePath = storePathForDir(dir);
await appendCronRunLog({
storePath,
entry: {
ts: 1000,
jobId: "job-1",
action: "finished",
status: "ok",
runAtMs: 900,
durationMs: 100,
},
});
await appendCronRunLog(logPath, {
ts: 2000,
jobId: "job-2",
action: "finished",
status: "error",
await appendCronRunLog({
storePath,
entry: {
ts: 2000,
jobId: "job-2",
action: "finished",
status: "error",
},
});
const jobEntries = readCronRunLogEntriesSync(logPath, { jobId: "job-1" });
const jobEntries = readCronRunLogEntriesSync({ storePath, jobId: "job-1" });
expect(jobEntries).toHaveLength(1);
expect(jobEntries[0]?.jobId).toBe("job-1");
expect(jobEntries[0]?.status).toBe("ok");
expect(jobEntries[0]?.runAtMs).toBe(900);
expect(jobEntries[0]?.durationMs).toBe(100);
expect(readCronRunLogEntriesSync(path.join(dir, "runs", "missing.jsonl"))).toStrictEqual([]);
expect(readCronRunLogEntriesSync({ storePath, jobId: "missing" })).toStrictEqual([]);
});
});
@@ -125,13 +125,17 @@ describe("cron run log", () => {
"does not create legacy run log files for new writes",
async () => {
await withRunLogDir("openclaw-cron-log-perms-", async (dir) => {
const storePath = storePathForDir(dir);
const logPath = path.join(dir, "runs", "job-1.jsonl");
await appendCronRunLog(logPath, {
ts: 1,
jobId: "job-1",
action: "finished",
status: "ok",
await appendCronRunLog({
storePath,
entry: {
ts: 1,
jobId: "job-1",
action: "finished",
status: "ok",
},
});
await expect(fs.stat(logPath)).rejects.toMatchObject({ code: "ENOENT" });
@@ -143,16 +147,19 @@ describe("cron run log", () => {
"does not mutate legacy run-log directory permissions on SQLite writes",
async () => {
await withRunLogDir("openclaw-cron-log-dir-perms-", async (dir) => {
const storePath = storePathForDir(dir);
const runDir = path.join(dir, "runs");
const logPath = path.join(runDir, "job-1.jsonl");
await fs.mkdir(runDir, { recursive: true, mode: 0o755 });
await fs.chmod(runDir, 0o755);
await appendCronRunLog(logPath, {
ts: 1,
jobId: "job-1",
action: "finished",
status: "ok",
await appendCronRunLog({
storePath,
entry: {
ts: 1,
jobId: "job-1",
action: "finished",
status: "ok",
},
});
const runDirMode = (await fs.stat(runDir)).mode & 0o777;
@@ -163,80 +170,91 @@ describe("cron run log", () => {
it("reads newest entries and filters by jobId", async () => {
await withRunLogDir("openclaw-cron-log-read-", async (dir) => {
const logPathA = path.join(dir, "runs", "a.jsonl");
const logPathB = path.join(dir, "runs", "b.jsonl");
const storePath = storePathForDir(dir);
await appendCronRunLog(logPathA, {
ts: 1,
jobId: "a",
action: "finished",
status: "ok",
await appendCronRunLog({
storePath,
entry: { ts: 1, jobId: "a", action: "finished", status: "ok" },
});
await appendCronRunLog(logPathB, {
ts: 2,
jobId: "b",
action: "finished",
status: "error",
error: "nope",
summary: "oops",
await appendCronRunLog({
storePath,
entry: {
ts: 2,
jobId: "b",
action: "finished",
status: "error",
error: "nope",
summary: "oops",
},
});
await appendCronRunLog(logPathA, {
ts: 3,
jobId: "a",
action: "finished",
status: "skipped",
sessionId: "run-123",
sessionKey: "agent:main:cron:a:run:run-123",
await appendCronRunLog({
storePath,
entry: {
ts: 3,
jobId: "a",
action: "finished",
status: "skipped",
sessionId: "run-123",
sessionKey: "agent:main:cron:a:run:run-123",
},
});
const allA = await readCronRunLogEntries(logPathA, { limit: 10 });
const allA = await readCronRunLogEntries({ storePath, jobId: "a", limit: 10 });
expect(allA.map((e) => e.jobId)).toEqual(["a", "a"]);
const onlyA = await readCronRunLogEntries(logPathA, {
const onlyA = await readCronRunLogEntries({
storePath,
limit: 10,
jobId: "a",
});
expect(onlyA.map((e) => e.ts)).toEqual([1, 3]);
const lastOne = await readCronRunLogEntries(logPathA, { limit: 1 });
const lastOne = await readCronRunLogEntries({ storePath, jobId: "a", limit: 1 });
expect(lastOne.map((e) => e.ts)).toEqual([3]);
expect(lastOne[0]?.sessionId).toBe("run-123");
expect(lastOne[0]?.sessionKey).toBe("agent:main:cron:a:run:run-123");
const onlyB = await readCronRunLogEntries(logPathB, {
const onlyB = await readCronRunLogEntries({
storePath,
limit: 10,
jobId: "b",
});
expect(onlyB[0]?.summary).toBe("oops");
const wrongFilter = await readCronRunLogEntries(logPathA, {
limit: 10,
jobId: "b",
});
expect(wrongFilter).toStrictEqual([]);
expect(await readCronRunLogEntries({ storePath, limit: 10, jobId: "missing" })).toStrictEqual(
[],
);
});
});
it("filters run-log pages by runId", async () => {
await withRunLogDir("openclaw-cron-log-runid-", async (dir) => {
const logPath = path.join(dir, "runs", "job-1.jsonl");
const storePath = storePathForDir(dir);
await appendCronRunLog(logPath, {
ts: 1,
jobId: "job-1",
action: "finished",
status: "error",
runId: "manual:job-1:1:0",
await appendCronRunLog({
storePath,
entry: {
ts: 1,
jobId: "job-1",
action: "finished",
status: "error",
runId: "manual:job-1:1:0",
},
});
await appendCronRunLog(logPath, {
ts: 2,
jobId: "job-1",
action: "finished",
status: "ok",
runId: "manual:job-1:2:0",
await appendCronRunLog({
storePath,
entry: {
ts: 2,
jobId: "job-1",
action: "finished",
status: "ok",
runId: "manual:job-1:2:0",
},
});
const page = await readCronRunLogEntriesPage(logPath, {
const page = await readCronRunLogEntriesPage({
storePath,
jobId: "job-1",
runId: "manual:job-1:2:0",
limit: 10,
});
@@ -280,8 +298,9 @@ describe("cron run log", () => {
"utf-8",
);
await migrateLegacyCronRunLogsToSqlite(path.join(dir, "jobs.json"));
const entries = await readCronRunLogEntries(logPath, { limit: 10, jobId: "job-1" });
const storePath = storePathForDir(dir);
await migrateLegacyCronRunLogsToSqlite(storePath);
const entries = await readCronRunLogEntries({ storePath, limit: 10, jobId: "job-1" });
expect(entries).toHaveLength(1);
expect(entries[0]?.ts).toBe(2);
expect(entries[0]?.delivered).toBe(true);
@@ -322,10 +341,12 @@ describe("cron run log", () => {
"utf-8",
);
await migrateLegacyCronRunLogsToSqlite(path.join(dir, "jobs.json"));
const storePath = storePathForDir(dir);
await migrateLegacyCronRunLogsToSqlite(storePath);
expect(
(
await readCronRunLogEntriesPage(logPath, {
await readCronRunLogEntriesPage({
storePath,
limit: 10,
jobId: "job-1",
query: "telegram",
@@ -334,7 +355,8 @@ describe("cron run log", () => {
).toHaveLength(1);
expect(
(
await readCronRunLogEntriesPage(logPath, {
await readCronRunLogEntriesPage({
storePath,
limit: 10,
jobId: "job-1",
query: "-100",
@@ -346,28 +368,31 @@ describe("cron run log", () => {
it("reads and searches run diagnostics", async () => {
await withRunLogDir("openclaw-cron-log-diagnostics-", async (dir) => {
const logPath = path.join(dir, "runs", "job-1.jsonl");
const storePath = storePathForDir(dir);
await appendCronRunLog(logPath, {
ts: 1,
jobId: "job-1",
action: "finished",
status: "error",
diagnostics: {
summary: "exec stderr tail",
entries: [
{
ts: 1,
source: "exec",
severity: "error",
message: "exec stderr tail",
exitCode: 2,
},
],
await appendCronRunLog({
storePath,
entry: {
ts: 1,
jobId: "job-1",
action: "finished",
status: "error",
diagnostics: {
summary: "exec stderr tail",
entries: [
{
ts: 1,
source: "exec",
severity: "error",
message: "exec stderr tail",
exitCode: 2,
},
],
},
},
});
const entries = await readCronRunLogEntries(logPath, { limit: 10, jobId: "job-1" });
const entries = await readCronRunLogEntries({ storePath, limit: 10, jobId: "job-1" });
expect(entries[0]?.diagnostics?.summary).toBe("exec stderr tail");
expect(entries[0]?.diagnostics?.entries).toHaveLength(1);
expect(entries[0]?.diagnostics?.entries[0]?.source).toBe("exec");
@@ -376,7 +401,8 @@ describe("cron run log", () => {
expect(entries[0]?.diagnostics?.entries[0]?.exitCode).toBe(2);
expect(
(
await readCronRunLogEntriesPage(logPath, {
await readCronRunLogEntriesPage({
storePath,
limit: 10,
jobId: "job-1",
query: "stderr tail",
@@ -388,21 +414,25 @@ describe("cron run log", () => {
it("reads telemetry fields", async () => {
await withRunLogDir("openclaw-cron-log-telemetry-", async (dir) => {
const storePath = storePathForDir(dir);
const logPath = path.join(dir, "runs", "job-1.jsonl");
await appendCronRunLog(logPath, {
ts: 1,
jobId: "job-1",
action: "finished",
status: "ok",
model: "gpt-5.4",
provider: "openai",
usage: {
input_tokens: 10,
output_tokens: 5,
total_tokens: 15,
cache_read_tokens: 2,
cache_write_tokens: 1,
await appendCronRunLog({
storePath,
entry: {
ts: 1,
jobId: "job-1",
action: "finished",
status: "ok",
model: "gpt-5.4",
provider: "openai",
usage: {
input_tokens: 10,
output_tokens: 5,
total_tokens: 15,
cache_read_tokens: 2,
cache_write_tokens: 1,
},
},
});
@@ -421,8 +451,8 @@ describe("cron run log", () => {
"utf-8",
);
await migrateLegacyCronRunLogsToSqlite(path.join(dir, "jobs.json"));
const entries = await readCronRunLogEntries(logPath, { limit: 10, jobId: "job-1" });
await migrateLegacyCronRunLogsToSqlite(storePath);
const entries = await readCronRunLogEntries({ storePath, limit: 10, jobId: "job-1" });
expect(entries[0]?.model).toBe("gpt-5.4");
expect(entries[0]?.provider).toBe("openai");
expect(entries[0]?.usage).toEqual({
@@ -440,12 +470,14 @@ describe("cron run log", () => {
it("cleans up pending-write bookkeeping after appends complete", async () => {
await withRunLogDir("openclaw-cron-log-pending-", async (dir) => {
const logPath = path.join(dir, "runs", "job-cleanup.jsonl");
await appendCronRunLog(logPath, {
ts: 1,
jobId: "job-cleanup",
action: "finished",
status: "ok",
await appendCronRunLog({
storePath: storePathForDir(dir),
entry: {
ts: 1,
jobId: "job-cleanup",
action: "finished",
status: "ok",
},
});
expect(getPendingCronRunLogWriteCountForTests()).toBe(0);
@@ -454,21 +486,24 @@ describe("cron run log", () => {
it("read drains pending fire-and-forget writes", async () => {
await withRunLogDir("openclaw-cron-log-drain-", async (dir) => {
const logPath = path.join(dir, "runs", "job-drain.jsonl");
const storePath = storePathForDir(dir);
// Fire-and-forget write (simulates the `void appendCronRunLog(...)` pattern
// in server-cron.ts). Do NOT await.
const writePromise = appendCronRunLog(logPath, {
ts: 42,
jobId: "job-drain",
action: "finished",
status: "ok",
summary: "drain-test",
const writePromise = appendCronRunLog({
storePath,
entry: {
ts: 42,
jobId: "job-drain",
action: "finished",
status: "ok",
summary: "drain-test",
},
});
void writePromise.catch(() => undefined);
// Read should see the entry because it drains pending writes.
const entries = await readCronRunLogEntries(logPath, { limit: 10 });
const entries = await readCronRunLogEntries({ storePath, jobId: "job-drain", limit: 10 });
expect(entries).toHaveLength(1);
expect(entries[0]?.ts).toBe(42);
expect(entries[0]?.summary).toBe("drain-test");

View File

@@ -7,7 +7,6 @@ import type { FailoverReason } from "../agents/embedded-agent-helpers/types.js";
import { resolveFailoverReasonFromError } from "../agents/failover-error.js";
import { parseByteSize } from "../cli/parse-bytes.js";
import type { CronConfig } from "../config/types.cron.js";
import { isPathInside } from "../infra/fs-safe.js";
import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js";
import {
normalizeLowercaseStringOrEmpty,
@@ -82,6 +81,10 @@ type ReadCronRunLogAllPageOptions = Omit<ReadCronRunLogPageOptions, "jobId"> & {
jobNameById?: Record<string, string>;
};
type AppendCronRunLogOptions = {
keepLines?: number;
};
type CronRunLogsTable = OpenClawStateKyselyDatabase["cron_run_logs"];
type CronRunLogDatabase = Pick<OpenClawStateKyselyDatabase, "cron_run_logs">;
type CronRunLogRow = Selectable<CronRunLogsTable>;
@@ -105,8 +108,8 @@ const CRON_FAILOVER_REASONS = new Set<FailoverReason>([
]);
const LEGACY_CRON_RUN_LOG_ARCHIVE_SUFFIX = ".migrated";
const INVALID_CRON_RUN_LOG_JOB_ID_MESSAGE = "invalid cron run log job id";
type CronRunLogTarget = { storePath: string; jobId: string; strictJobId: boolean };
const runLogTargetsByPath = new Map<string, CronRunLogTarget>();
function normalizeCronRunLogErrorReason(value: unknown): FailoverReason | undefined {
return typeof value === "string" && CRON_FAILOVER_REASONS.has(value as FailoverReason)
@@ -117,28 +120,19 @@ function normalizeCronRunLogErrorReason(value: unknown): FailoverReason | undefi
function assertSafeCronRunLogJobId(jobId: string): string {
const trimmed = jobId.trim();
if (!trimmed) {
throw new Error("invalid cron run log job id");
throw new Error(INVALID_CRON_RUN_LOG_JOB_ID_MESSAGE);
}
if (trimmed.includes("/") || trimmed.includes("\\") || trimmed.includes("\0")) {
throw new Error("invalid cron run log job id");
throw new Error(INVALID_CRON_RUN_LOG_JOB_ID_MESSAGE);
}
return trimmed;
}
export function resolveCronRunLogPath(params: { storePath: string; jobId: string }) {
const storePath = path.resolve(params.storePath);
const dir = path.dirname(storePath);
const runsDir = path.resolve(dir, "runs");
const safeJobId = assertSafeCronRunLogJobId(params.jobId);
const resolvedPath = path.resolve(runsDir, `${safeJobId}.jsonl`);
if (!isPathInside(runsDir, resolvedPath)) {
throw new Error("invalid cron run log job id");
}
runLogTargetsByPath.set(resolvedPath, { storePath, jobId: safeJobId, strictJobId: true });
return resolvedPath;
export function isInvalidCronRunLogJobIdError(err: unknown): boolean {
return err instanceof Error && err.message === INVALID_CRON_RUN_LOG_JOB_ID_MESSAGE;
}
const writesByPath = new Map<string, Promise<void>>();
const writesByTarget = new Map<string, Promise<void>>();
export const DEFAULT_CRON_RUN_LOG_MAX_BYTES = 2_000_000;
export const DEFAULT_CRON_RUN_LOG_KEEP_LINES = 2_000;
@@ -168,15 +162,23 @@ export function resolveCronRunLogPruneOptions(cfg?: CronConfig["runLog"]): {
}
export function getPendingCronRunLogWriteCountForTests() {
return writesByPath.size;
return writesByTarget.size;
}
async function drainPendingWrite(filePath: string): Promise<void> {
const resolved = path.resolve(filePath);
const pending = writesByPath.get(resolved);
if (pending) {
await pending.catch(() => undefined);
function cronRunLogWriteKey(storePath: string, jobId?: string): string {
return `${cronStoreKey(storePath)}\0${jobId ?? ""}`;
}
async function drainPendingWrite(storePath: string, jobId?: string): Promise<void> {
if (jobId) {
await writesByTarget.get(cronRunLogWriteKey(storePath, jobId))?.catch(() => undefined);
return;
}
const storePrefix = `${cronStoreKey(storePath)}\0`;
const pending = [...writesByTarget.entries()]
.filter(([key]) => key.startsWith(storePrefix))
.map(([, write]) => write.catch(() => undefined));
await Promise.all(pending);
}
function cronStoreKey(storePath: string): string {
@@ -187,20 +189,6 @@ function getCronRunLogKysely(db: DatabaseSync) {
return getNodeSqliteKysely<CronRunLogDatabase>(db);
}
function inferCronRunLogTarget(filePath: string, jobId?: string): CronRunLogTarget {
const resolved = path.resolve(filePath);
const known = runLogTargetsByPath.get(resolved);
if (known && (!jobId || known.jobId === jobId)) {
return known;
}
const inferredJobId = assertSafeCronRunLogJobId(jobId ?? path.basename(resolved, ".jsonl"));
const parentDir = path.dirname(resolved);
const isRunsDir = path.basename(parentDir) === "runs";
const storeDir = isRunsDir ? path.dirname(parentDir) : parentDir;
const storePath = path.resolve(storeDir, "jobs.json");
return { storePath, jobId: inferredJobId, strictJobId: isRunsDir };
}
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
return Number(value);
@@ -286,6 +274,74 @@ function readCronRunLogRows(db: DatabaseSync, storeKey: string, jobId?: string):
return executeSqliteQuerySync(db, query.orderBy("ts", "asc").orderBy("seq", "asc")).rows;
}
function buildRunLogWhereClause(params: {
storeKey: string;
jobId?: string;
statuses: CronRunStatus[] | null;
deliveryStatuses: CronDeliveryStatus[] | null;
runId?: string;
}): { whereSql: string; values: Array<string | number> } {
const clauses = ["store_key = ?"];
const values: Array<string | number> = [params.storeKey];
if (params.jobId) {
clauses.push("job_id = ?");
values.push(params.jobId);
}
if (params.statuses?.length) {
clauses.push(`status IN (${params.statuses.map(() => "?").join(", ")})`);
values.push(...params.statuses);
}
if (params.deliveryStatuses?.length) {
clauses.push(
`COALESCE(delivery_status, 'not-requested') IN (${params.deliveryStatuses
.map(() => "?")
.join(", ")})`,
);
values.push(...params.deliveryStatuses);
}
const runId = normalizeOptionalString(params.runId);
if (runId) {
clauses.push("run_id = ?");
values.push(runId);
}
return { whereSql: clauses.join(" AND "), values };
}
function countCronRunLogRows(
db: DatabaseSync,
whereSql: string,
values: Array<string | number>,
): number {
const row = db
.prepare(`SELECT COUNT(*) AS count FROM cron_run_logs WHERE ${whereSql}`)
.get(...values) as { count?: number | bigint } | undefined;
return normalizeNumber(row?.count ?? null) ?? 0;
}
function readCronRunLogRowsPage(params: {
db: DatabaseSync;
storeKey: string;
jobId?: string;
statuses: CronRunStatus[] | null;
deliveryStatuses: CronDeliveryStatus[] | null;
runId?: string;
sortDir: CronRunLogSortDir;
offset?: number;
limit?: number;
}): CronRunLogRow[] {
const { whereSql, values } = buildRunLogWhereClause(params);
const order = params.sortDir === "asc" ? "ASC" : "DESC";
const limitSql =
params.limit === undefined || params.offset === undefined ? "" : " LIMIT ? OFFSET ?";
const limitValues =
params.limit === undefined || params.offset === undefined ? [] : [params.limit, params.offset];
return params.db
.prepare(
`SELECT * FROM cron_run_logs WHERE ${whereSql} ORDER BY ts ${order}, seq ${order}${limitSql}`,
)
.all(...values, ...limitValues) as CronRunLogRow[];
}
function nextCronRunLogSeq(db: DatabaseSync, storeKey: string, jobId: string): number {
const row = db
.prepare(
@@ -387,45 +443,47 @@ function archiveLegacyCronRunLogSync(filePath: string): void {
}
}
export async function appendCronRunLog(
filePath: string,
entry: CronRunLogEntry,
opts?: { maxBytes?: number; keepLines?: number },
) {
const resolved = path.resolve(filePath);
const prev = writesByPath.get(resolved) ?? Promise.resolve();
export async function appendCronRunLog(params: {
storePath: string;
entry: CronRunLogEntry;
opts?: AppendCronRunLogOptions;
}) {
const storeKey = cronStoreKey(params.storePath);
const writeKey = cronRunLogWriteKey(params.storePath, params.entry.jobId);
const prev = writesByTarget.get(writeKey) ?? Promise.resolve();
const next = prev
.catch(() => undefined)
.then(async () => {
const target = inferCronRunLogTarget(resolved, entry.jobId);
runOpenClawStateWriteTransaction(({ db }) => {
insertCronRunLogEntry(db, cronStoreKey(target.storePath), entry);
insertCronRunLogEntry(db, storeKey, params.entry);
pruneCronRunLogRows(
db,
cronStoreKey(target.storePath),
entry.jobId,
opts?.keepLines ?? DEFAULT_CRON_RUN_LOG_KEEP_LINES,
storeKey,
params.entry.jobId,
params.opts?.keepLines ?? DEFAULT_CRON_RUN_LOG_KEEP_LINES,
);
});
});
writesByPath.set(resolved, next);
writesByTarget.set(writeKey, next);
try {
await next;
} finally {
if (writesByPath.get(resolved) === next) {
writesByPath.delete(resolved);
if (writesByTarget.get(writeKey) === next) {
writesByTarget.delete(writeKey);
}
}
}
export async function readCronRunLogEntries(
filePath: string,
opts?: { limit?: number; jobId?: string },
): Promise<CronRunLogEntry[]> {
await drainPendingWrite(filePath);
const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200)));
const page = await readCronRunLogEntriesPage(filePath, {
jobId: opts?.jobId,
export async function readCronRunLogEntries(params: {
storePath: string;
jobId?: string;
limit?: number;
}): Promise<CronRunLogEntry[]> {
await drainPendingWrite(params.storePath, params.jobId);
const limit = Math.max(1, Math.min(5000, Math.floor(params.limit ?? 200)));
const page = await readCronRunLogEntriesPage({
storePath: params.storePath,
jobId: params.jobId,
limit,
offset: 0,
status: "all",
@@ -434,22 +492,18 @@ export async function readCronRunLogEntries(
return page.entries.toReversed();
}
export function readCronRunLogEntriesSync(
filePath: string,
opts?: { limit?: number; jobId?: string },
): CronRunLogEntry[] {
const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200)));
const resolved = path.resolve(filePath);
const target = inferCronRunLogTarget(resolved);
const rows = readCronRunLogRows(
openOpenClawStateDatabase().db,
cronStoreKey(target.storePath),
target.strictJobId ? target.jobId : undefined,
);
export function readCronRunLogEntriesSync(params: {
storePath: string;
jobId?: string;
limit?: number;
}): CronRunLogEntry[] {
const limit = Math.max(1, Math.min(5000, Math.floor(params.limit ?? 200)));
const storeKey = cronStoreKey(params.storePath);
const jobId = params.jobId ? assertSafeCronRunLogJobId(params.jobId) : undefined;
const rows = readCronRunLogRows(openOpenClawStateDatabase().db, storeKey, jobId);
return rows
.map(parseStoredRunLogEntry)
.filter((entry): entry is CronRunLogEntry => entry !== null)
.filter((entry) => !opts?.jobId || entry.jobId === opts.jobId)
.slice(-limit);
}
@@ -665,76 +719,76 @@ function filterRunLogEntries(
}
export async function readCronRunLogEntriesPage(
filePath: string,
opts?: ReadCronRunLogPageOptions,
): Promise<CronRunLogPageResult> {
await drainPendingWrite(filePath);
const limit = Math.max(1, Math.min(200, Math.floor(opts?.limit ?? 50)));
const resolved = path.resolve(filePath);
const target = inferCronRunLogTarget(resolved);
const statuses = normalizeRunStatuses(opts);
const deliveryStatuses = normalizeDeliveryStatuses(opts);
const query = normalizeLowercaseStringOrEmpty(opts?.query);
const sortDir: CronRunLogSortDir = opts?.sortDir === "asc" ? "asc" : "desc";
const all = readCronRunLogRows(
openOpenClawStateDatabase().db,
cronStoreKey(target.storePath),
target.strictJobId ? target.jobId : undefined,
)
.map(parseStoredRunLogEntry)
.filter((entry): entry is CronRunLogEntry => entry !== null)
.filter((entry) => !opts?.jobId || entry.jobId === opts.jobId);
const filtered = filterRunLogEntries(all, {
runId: opts?.runId,
statuses,
deliveryStatuses,
query,
queryTextForEntry: (entry) =>
[
entry.summary ?? "",
entry.error ?? "",
entry.errorReason ?? "",
entry.diagnostics?.summary ?? "",
...(entry.diagnostics?.entries ?? []).map((diagnostic) => diagnostic.message),
entry.jobId,
entry.delivery?.intended?.channel ?? "",
entry.delivery?.resolved?.channel ?? "",
...(entry.delivery?.messageToolSentTo ?? []).map((target) => target.channel),
].join(" "),
});
const sorted =
sortDir === "asc"
? filtered.toSorted((a, b) => a.ts - b.ts)
: filtered.toSorted((a, b) => b.ts - a.ts);
const total = sorted.length;
const offset = Math.max(0, Math.min(total, Math.floor(opts?.offset ?? 0)));
const entries = sorted.slice(offset, offset + limit);
const nextOffset = offset + entries.length;
return {
entries,
total,
offset,
limit,
hasMore: nextOffset < total,
nextOffset: nextOffset < total ? nextOffset : null,
};
}
export async function readCronRunLogEntriesPageAll(
opts: ReadCronRunLogAllPageOptions,
opts: ReadCronRunLogPageOptions & { storePath: string; jobNameById?: Record<string, string> },
): Promise<CronRunLogPageResult> {
const jobId = opts.jobId ? assertSafeCronRunLogJobId(opts.jobId) : undefined;
await drainPendingWrite(opts.storePath, jobId);
const limit = Math.max(1, Math.min(200, Math.floor(opts.limit ?? 50)));
const statuses = normalizeRunStatuses(opts);
const deliveryStatuses = normalizeDeliveryStatuses(opts);
const query = normalizeLowercaseStringOrEmpty(opts.query);
const sortDir: CronRunLogSortDir = opts.sortDir === "asc" ? "asc" : "desc";
const all = readCronRunLogRows(openOpenClawStateDatabase().db, cronStoreKey(opts.storePath))
const db = openOpenClawStateDatabase().db;
const storeKey = cronStoreKey(opts.storePath);
const offset = Math.max(0, Math.floor(opts.offset ?? 0));
if (!query) {
const { whereSql, values } = buildRunLogWhereClause({
storeKey,
jobId,
statuses,
deliveryStatuses,
runId: opts.runId,
});
const total = countCronRunLogRows(db, whereSql, values);
const boundedOffset = Math.min(total, offset);
const entries = readCronRunLogRowsPage({
db,
storeKey,
jobId,
statuses,
deliveryStatuses,
runId: opts.runId,
sortDir,
offset: boundedOffset,
limit,
})
.map(parseStoredRunLogEntry)
.filter((entry): entry is CronRunLogEntry => entry !== null);
if (opts.jobNameById) {
for (const entry of entries) {
const jobName = opts.jobNameById[entry.jobId];
if (jobName) {
(entry as CronRunLogEntry & { jobName?: string }).jobName = jobName;
}
}
}
const nextOffset = boundedOffset + entries.length;
return {
entries,
total,
offset: boundedOffset,
limit,
hasMore: nextOffset < total,
nextOffset: nextOffset < total ? nextOffset : null,
};
}
const all = readCronRunLogRowsPage({
db,
storeKey,
jobId,
statuses,
deliveryStatuses,
runId: opts.runId,
sortDir,
})
.map(parseStoredRunLogEntry)
.filter((entry): entry is CronRunLogEntry => entry !== null);
const filtered = filterRunLogEntries(all, {
runId: opts.runId,
statuses,
deliveryStatuses,
statuses: null,
deliveryStatuses: null,
query,
queryTextForEntry: (entry) => {
const jobName = opts.jobNameById?.[entry.jobId] ?? "";
@@ -757,8 +811,8 @@ export async function readCronRunLogEntriesPageAll(
? filtered.toSorted((a, b) => a.ts - b.ts)
: filtered.toSorted((a, b) => b.ts - a.ts);
const total = sorted.length;
const offset = Math.max(0, Math.min(total, Math.floor(opts.offset ?? 0)));
const entries = sorted.slice(offset, offset + limit);
const boundedOffset = Math.min(total, offset);
const entries = sorted.slice(boundedOffset, boundedOffset + limit);
if (opts.jobNameById) {
for (const entry of entries) {
const jobName = opts.jobNameById[entry.jobId];
@@ -767,17 +821,23 @@ export async function readCronRunLogEntriesPageAll(
}
}
}
const nextOffset = offset + entries.length;
const nextOffset = boundedOffset + entries.length;
return {
entries,
total,
offset,
offset: boundedOffset,
limit,
hasMore: nextOffset < total,
nextOffset: nextOffset < total ? nextOffset : null,
};
}
export async function readCronRunLogEntriesPageAll(
opts: ReadCronRunLogAllPageOptions,
): Promise<CronRunLogPageResult> {
return readCronRunLogEntriesPage(opts);
}
export async function migrateLegacyCronRunLogsToSqlite(
storePath: string,
): Promise<{ importedFiles: number }> {
@@ -789,7 +849,7 @@ export async function migrateLegacyCronRunLogsToSqlite(
for (const file of jsonlFiles) {
const jobId = path.basename(file.name, ".jsonl");
const logPath = path.join(runsDir, file.name);
await drainPendingWrite(logPath);
await drainPendingWrite(resolvedStorePath, jobId);
await importLegacyCronRunLog(logPath, {
storePath: resolvedStorePath,
jobId,

View File

@@ -236,7 +236,6 @@ export function createMockCronStateForJobs(params: {
storeLoadedAtMs: nowMs,
op: Promise.resolve(),
warnedDisabled: false,
warnedMissingSessionTargetJobIds: new Set<string>(),
warnedInvalidPersistedJobKeys: new Set<string>(),
pendingQuarantineConfigJobs: [],
lastQuarantineFailureWarnKey: null,

View File

@@ -158,12 +158,6 @@ export type CronServiceState = {
running: boolean;
op: Promise<unknown>;
warnedDisabled: boolean;
/**
* Job ids whose missing `sessionTarget` was defaulted at load and warned
* about. Used to suppress duplicate warns across forceReload ticks so a
* single broken job does not spam the log on every scheduler cycle.
*/
warnedMissingSessionTargetJobIds: Set<string>;
/**
* Persisted job rows with non-canonical storage shape are skipped in memory
* until the runtime can quarantine and sanitize the active store.
@@ -182,7 +176,6 @@ export function createCronServiceState(deps: CronServiceDeps): CronServiceState
running: false,
op: Promise.resolve(),
warnedDisabled: false,
warnedMissingSessionTargetJobIds: new Set<string>(),
warnedInvalidPersistedJobKeys: new Set<string>(),
pendingQuarantineConfigJobs: [],
lastQuarantineFailureWarnKey: null,

View File

@@ -108,7 +108,7 @@ export async function ensureLoaded(
const rawConfigJob = loaded.configJobs[index] ?? structuredClone(raw);
const sourceIndex = loaded.configJobIndexes[index] ?? index;
const runtimeEntry = loaded.configJobRuntimeEntries[index];
const { legacyJobIdIssue } = normalizeCronJobIdentityFields(raw);
normalizeCronJobIdentityFields(raw);
let normalized: Record<string, unknown> | null;
try {
normalized = normalizeCronJobInput(raw);
@@ -149,56 +149,7 @@ export async function ensureLoaded(
continue;
}
jobs.push(hydrated);
if (legacyJobIdIssue) {
const resolvedId = typeof hydrated.id === "string" ? hydrated.id : undefined;
state.deps.log.warn(
{ storePath: state.deps.storePath, jobId: resolvedId },
"cron: job used legacy jobId field; normalized id in memory (run openclaw doctor --fix to persist canonical shape)",
);
}
// Persisted legacy jobs may predate the required `enabled` field.
// Keep runtime behavior backward-compatible without rewriting the store.
if (typeof hydrated.enabled !== "boolean") {
hydrated.enabled = true;
}
invalidateStaleNextRunOnScheduleChange({ previousJobsById, hydrated });
// Same shape: persisted jobs missing `sessionTarget` crash downstream
// on any code path that dereferences `.startsWith` (e.g.
// `runIsolatedAgentJob` in `src/gateway/server-cron.ts`). Mirror the
// defaulter applied at create time: systemEvent payloads -> "main",
// agentTurn -> "isolated". Use `Object.hasOwn` rather than `in` so a
// poisoned prototype cannot feed a crafted `kind` into the defaulter.
if (typeof hydrated.sessionTarget !== "string") {
const payload = hydrated.payload as unknown;
const payloadKind =
payload &&
typeof payload === "object" &&
!Array.isArray(payload) &&
Object.hasOwn(payload, "kind")
? (payload as { kind?: unknown }).kind
: undefined;
let defaulted: "main" | "isolated" | undefined;
if (payloadKind === "systemEvent") {
defaulted = "main";
} else if (payloadKind === "agentTurn") {
defaulted = "isolated";
}
if (defaulted) {
hydrated.sessionTarget = defaulted;
// `ensureLoaded` is called with `forceReload: true` on every tick;
// warn once per jobId per process to avoid log spam on repeated
// loads of the same still-broken store file.
const jobId = typeof hydrated.id === "string" ? hydrated.id : undefined;
const dedupeKey = jobId ?? "<unknown>";
if (!state.warnedMissingSessionTargetJobIds.has(dedupeKey)) {
state.warnedMissingSessionTargetJobIds.add(dedupeKey);
state.deps.log.warn(
{ storePath: state.deps.storePath, jobId, defaulted },
"cron: job missing sessionTarget; defaulted in memory (run openclaw doctor --fix to persist canonical shape)",
);
}
}
}
}
state.store = {
version: 1,
@@ -251,10 +202,7 @@ export function warnIfDisabled(state: CronServiceState, action: string) {
);
}
export async function persist(
state: CronServiceState,
opts?: { skipBackup?: boolean; stateOnly?: boolean },
) {
export async function persist(state: CronServiceState, opts?: { stateOnly?: boolean }) {
if (!state.store) {
return;
}
@@ -266,6 +214,9 @@ export async function persist(
}
flushedPendingQuarantine = true;
}
const saveOpts = flushedPendingQuarantine ? { skipBackup: opts?.skipBackup } : opts;
await saveCronStore(state.deps.storePath, state.store, saveOpts);
await saveCronStore(
state.deps.storePath,
state.store,
flushedPendingQuarantine ? undefined : opts,
);
}

View File

@@ -2,10 +2,12 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { runOpenClawStateWriteTransaction } from "../state/openclaw-state-db.js";
import {
archiveLegacyCronStoreForMigration,
loadLegacyCronStoreForMigration,
} from "../commands/doctor-cron-legacy-store-migration.js";
import { runOpenClawStateWriteTransaction } from "../state/openclaw-state-db.js";
import {
loadCronQuarantineFile,
loadCronStore,
loadCronStoreSync,

View File

@@ -28,12 +28,6 @@ import type {
CronStoreFile,
} from "./types.js";
type SerializedStoreCacheEntry = {
configJson?: string;
stateJson?: string;
needsSplitMigration: boolean;
};
export type QuarantinedCronConfigJob = {
sourceIndex: number;
reason: string;
@@ -57,8 +51,6 @@ export type LoadedCronStore = {
invalidConfigRows: QuarantinedCronConfigJob[];
};
const serializedStoreCache = new Map<string, SerializedStoreCacheEntry>();
function resolveDefaultCronDir(): string {
return path.join(resolveConfigDir(), "cron");
}
@@ -67,13 +59,6 @@ function resolveDefaultCronStorePath(): string {
return path.join(resolveDefaultCronDir(), "jobs.json");
}
function resolveStatePath(storePath: string): string {
if (storePath.endsWith(".json")) {
return storePath.replace(/\.json$/, "-state.json");
}
return `${storePath}-state.json`;
}
export function resolveCronQuarantinePath(storePath: string): string {
if (storePath.endsWith(".json")) {
return storePath.replace(/\.json$/, "-quarantine.json");
@@ -89,39 +74,11 @@ type CronStateFileEntry = {
export type CronConfigJobRuntimeEntry = CronStateFileEntry;
type CronStateFile = {
version: 1;
jobs: Record<string, CronStateFileEntry>;
};
type CronJobsTable = OpenClawStateKyselyDatabase["cron_jobs"];
type CronStoreDatabase = Pick<OpenClawStateKyselyDatabase, "cron_jobs">;
type CronJobRow = Selectable<CronJobsTable>;
type CronJobInsert = Insertable<CronJobsTable>;
const LEGACY_CRON_ARCHIVE_SUFFIX = ".migrated";
function parseCronStateFile(raw: string): CronStateFile | null {
try {
const parsed = parseJsonWithJson5Fallback(raw);
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return null;
}
const record = parsed as Record<string, unknown>;
if (
record.version !== 1 ||
typeof record.jobs !== "object" ||
record.jobs === null ||
Array.isArray(record.jobs)
) {
return null;
}
return { version: 1, jobs: record.jobs as Record<string, CronStateFileEntry> };
} catch {
return null;
}
}
function cronStoreKey(storePath: string): string {
return path.resolve(storePath);
}
@@ -749,79 +706,11 @@ function loadedCronStoreFromRows(rows: CronJobRow[]): LoadedCronStore {
};
}
async function legacyCronFileExists(filePath: string): Promise<boolean> {
return fs.promises
.access(filePath, fs.constants.F_OK)
.then(() => true)
.catch(() => false);
}
async function archiveLegacyCronFile(filePath: string): Promise<void> {
if (!(await legacyCronFileExists(filePath))) {
return;
}
const archivePath = `${filePath}${LEGACY_CRON_ARCHIVE_SUFFIX}`;
if (await legacyCronFileExists(archivePath)) {
return;
}
await fs.promises.rename(filePath, archivePath).catch(() => undefined);
}
async function archiveLegacyCronStoreFiles(storePath: string): Promise<void> {
await Promise.all([
archiveLegacyCronFile(storePath),
archiveLegacyCronFile(resolveStatePath(storePath)),
]);
}
export async function legacyCronStoreFilesExist(storePath: string): Promise<boolean> {
return (
(await legacyCronFileExists(path.resolve(storePath))) ||
(await legacyCronFileExists(resolveStatePath(path.resolve(storePath))))
);
}
export async function archiveLegacyCronStoreForMigration(storePath: string): Promise<void> {
await archiveLegacyCronStoreFiles(path.resolve(storePath));
}
function getRawCronJobs(parsed: unknown): unknown[] {
return Array.isArray(parsed)
? parsed
: isRecord(parsed) && Array.isArray(parsed.jobs)
? parsed.jobs
: [];
}
function cloneConfigJobs(jobs: Array<Record<string, unknown>>): Array<Record<string, unknown>> {
return jobs.map((job) => structuredClone(job));
}
function stripJobRuntimeFields(job: CronStoreFile["jobs"][number]): Record<string, unknown> {
const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job;
return { ...rest, state: {} };
}
function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown {
const jobs = store.jobs.map(stripJobRuntimeFields);
return {
version: store.version,
jobs,
};
}
function extractStateFile(store: CronStoreFile): CronStateFile {
const jobs: Record<string, CronStateFileEntry> = {};
for (const job of store.jobs) {
jobs[job.id] = {
updatedAtMs: job.updatedAtMs,
scheduleIdentity: tryCronScheduleIdentity(job as unknown as Record<string, unknown>),
state: job.state ?? {},
};
}
return { version: 1, jobs };
}
export function resolveCronStorePath(storePath?: string) {
if (storePath?.trim()) {
const raw = storePath.trim();
@@ -833,166 +722,6 @@ export function resolveCronStorePath(storePath?: string) {
return resolveDefaultCronStorePath();
}
async function loadStateFile(statePath: string): Promise<CronStateFile | null> {
let raw: string;
try {
raw = await fs.promises.readFile(statePath, "utf-8");
} catch (err) {
if ((err as { code?: unknown })?.code === "ENOENT") {
return null;
}
throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, {
cause: err,
});
}
return parseCronStateFile(raw);
}
function hasInlineState(jobs: Array<Record<string, unknown> | null | undefined>): boolean {
return jobs.some(
(job) => job != null && isRecord(job.state) && Object.keys(job.state).length > 0,
);
}
function ensureJobStateObject(job: CronStoreFile["jobs"][number]): void {
if (!isRecord(job.state)) {
job.state = {} as never;
}
}
function backfillMissingRuntimeFields(job: CronStoreFile["jobs"][number]): void {
ensureJobStateObject(job);
if (typeof job.updatedAtMs !== "number") {
job.updatedAtMs = typeof job.createdAtMs === "number" ? job.createdAtMs : Date.now();
}
}
function resolveUpdatedAtMs(job: CronStoreFile["jobs"][number], updatedAtMs: unknown): number {
if (typeof updatedAtMs === "number" && Number.isFinite(updatedAtMs)) {
return updatedAtMs;
}
if (typeof job.updatedAtMs === "number" && Number.isFinite(job.updatedAtMs)) {
return job.updatedAtMs;
}
return typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs)
? job.createdAtMs
: Date.now();
}
function mergeStateFileEntry(job: CronStoreFile["jobs"][number], entry: unknown): void {
if (!isRecord(entry)) {
backfillMissingRuntimeFields(job);
return;
}
job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs);
job.state = isRecord(entry.state) ? (entry.state as never) : ({} as never);
if (
typeof entry.scheduleIdentity === "string" &&
entry.scheduleIdentity !== tryCronScheduleIdentity(job as unknown as Record<string, unknown>)
) {
ensureJobStateObject(job);
job.state.nextRunAtMs = undefined;
}
}
function resolveCronStateId(job: Record<string, unknown>): string | undefined {
return normalizeOptionalString(job.id) ?? normalizeOptionalString(job.jobId);
}
async function loadLegacyCronStoreWithConfigJobs(storePath: string): Promise<LoadedCronStore> {
try {
const raw = await fs.promises.readFile(storePath, "utf-8");
let parsed: unknown;
try {
parsed = parseJsonWithJson5Fallback(raw);
} catch (err) {
throw new Error(`Failed to parse cron store at ${storePath}: ${String(err)}`, {
cause: err,
});
}
const rawJobs = getRawCronJobs(parsed);
const configJobIndexes: number[] = [];
const configRows: Array<Record<string, unknown>> = [];
const configJobRuntimeEntries: CronConfigJobRuntimeEntry[] = [];
const invalidConfigRows: QuarantinedCronConfigJob[] = [];
for (const [index, row] of rawJobs.entries()) {
if (isRecord(row)) {
configJobIndexes.push(index);
configRows.push(row);
} else {
invalidConfigRows.push({
sourceIndex: index,
reason: "non-object-row",
raw: structuredClone(row),
});
}
}
const store: CronStoreFile = {
version: 1,
jobs: configRows as never as CronStoreFile["jobs"],
};
const jobs = store.jobs as unknown as Array<Record<string, unknown>>;
const configJobs = cloneConfigJobs(configRows);
// Load state file and merge.
const statePath = resolveStatePath(storePath);
const stateFile = await loadStateFile(statePath);
const hasLegacyInlineState = !stateFile && hasInlineState(jobs);
if (stateFile) {
// State file exists: merge state by job ID. Inline state in jobs.json is ignored.
for (const job of store.jobs) {
const stateId = resolveCronStateId(job as unknown as Record<string, unknown>);
const entry = stateId ? stateFile.jobs[stateId] : undefined;
configJobRuntimeEntries.push(isRecord(entry) ? structuredClone(entry) : {});
if (entry) {
mergeStateFileEntry(job, entry);
} else {
backfillMissingRuntimeFields(job);
}
}
} else if (!hasLegacyInlineState) {
// No state file, no inline state: fresh clone or first run.
for (const job of store.jobs) {
backfillMissingRuntimeFields(job);
}
}
// else: migration mode — no state file but jobs.json has inline state. Use as-is.
// Ensure every job has a state object (defensive).
for (const job of store.jobs) {
ensureJobStateObject(job);
}
const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2);
const stateJson = JSON.stringify(extractStateFile(store), null, 2);
serializedStoreCache.set(storePath, {
configJson,
stateJson,
needsSplitMigration: hasLegacyInlineState,
});
return { store, configJobs, configJobIndexes, configJobRuntimeEntries, invalidConfigRows };
} catch (err) {
if ((err as { code?: unknown })?.code === "ENOENT") {
serializedStoreCache.delete(storePath);
return {
store: { version: 1, jobs: [] },
configJobs: [],
configJobIndexes: [],
configJobRuntimeEntries: [],
invalidConfigRows: [],
};
}
throw err;
}
}
export async function loadLegacyCronStoreForMigration(storePath: string): Promise<LoadedCronStore> {
return loadLegacyCronStoreWithConfigJobs(path.resolve(storePath));
}
export async function loadCronStoreWithConfigJobs(storePath: string): Promise<LoadedCronStore> {
const resolvedStorePath = path.resolve(storePath);
const storeKey = cronStoreKey(resolvedStorePath);
@@ -1026,7 +755,6 @@ export function loadCronStoreSync(storePath: string): CronStoreFile {
}
type SaveCronStoreOptions = {
skipBackup?: boolean;
stateOnly?: boolean;
};
@@ -1047,7 +775,6 @@ export async function saveCronStore(
store: CronStoreFile,
opts?: SaveCronStoreOptions,
) {
void opts;
const resolvedStorePath = path.resolve(storePath);
const storeKey = cronStoreKey(resolvedStorePath);
if (opts?.stateOnly) {
@@ -1060,7 +787,6 @@ export async function saveCronStore(
runOpenClawStateWriteTransaction(({ db }) => {
replaceCronRows(db, storeKey, store);
});
serializedStoreCache.delete(resolvedStorePath);
}
export async function loadCronQuarantineFile(path: string): Promise<CronQuarantineFile> {

View File

@@ -12,11 +12,7 @@ import { resolveStorePath } from "../config/sessions/paths.js";
import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import {
appendCronRunLog,
resolveCronRunLogPath,
resolveCronRunLogPruneOptions,
} from "../cron/run-log.js";
import { appendCronRunLog, resolveCronRunLogPruneOptions } from "../cron/run-log.js";
import type { CronServiceContract } from "../cron/service-contract.js";
import { CronService } from "../cron/service.js";
import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js";
@@ -471,13 +467,9 @@ export function buildGatewayCronService(params: {
warnedLegacyWebhookJobs,
});
const logPath = resolveCronRunLogPath({
void appendCronRunLog({
storePath,
jobId: evt.jobId,
});
void appendCronRunLog(
logPath,
{
entry: {
ts: Date.now(),
jobId: evt.jobId,
action: "finished",
@@ -500,9 +492,12 @@ export function buildGatewayCronService(params: {
provider: evt.provider,
usage: evt.usage,
},
runLogPrune,
).catch((err) => {
cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed");
opts: { keepLines: runLogPrune.keepLines },
}).catch((err) => {
cronLogger.warn(
{ err: String(err), storePath, jobId: evt.jobId },
"cron: run log append failed",
);
});
}
},

View File

@@ -16,9 +16,9 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveCronDeliveryPreviews } from "../../cron/delivery-preview.js";
import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js";
import {
isInvalidCronRunLogJobIdError,
readCronRunLogEntriesPage,
readCronRunLogEntriesPageAll,
resolveCronRunLogPath,
} from "../../cron/run-log.js";
import { applyJobPatch } from "../../cron/service/jobs.js";
import { isInvalidCronSessionTargetIdError } from "../../cron/session-target.js";
@@ -586,32 +586,30 @@ export const cronHandlers: GatewayRequestHandlers = {
respond(true, page, undefined);
return;
}
let logPath: string;
try {
logPath = resolveCronRunLogPath({
const page = await readCronRunLogEntriesPage({
storePath: context.cronStorePath,
jobId: jobId as string,
limit: p.limit,
offset: p.offset,
statuses: p.statuses,
status: p.status,
runId: p.runId,
deliveryStatuses: p.deliveryStatuses,
deliveryStatus: p.deliveryStatus,
query: p.query,
sortDir: p.sortDir,
});
} catch {
respond(true, page, undefined);
} catch (err) {
if (!isInvalidCronRunLogJobIdError(err)) {
throw err;
}
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "invalid cron.runs params: invalid id"),
);
return;
}
const page = await readCronRunLogEntriesPage(logPath, {
limit: p.limit,
offset: p.offset,
jobId: jobId as string,
statuses: p.statuses,
status: p.status,
runId: p.runId,
deliveryStatuses: p.deliveryStatuses,
deliveryStatus: p.deliveryStatus,
query: p.query,
sortDir: p.sortDir,
});
respond(true, page, undefined);
},
};

View File

@@ -132,7 +132,10 @@ describe("openclaw state database", () => {
process.env["OPENCLAW_STATE_DIR"] = stateDir;
try {
expect(
readCronRunLogEntriesSync(path.join(stateDir, "cron", "runs", "legacy-job.jsonl")),
readCronRunLogEntriesSync({
storePath: path.join(stateDir, "cron", "jobs.json"),
jobId: "legacy-job",
}),
).toMatchObject([{ action: "finished", jobId: "legacy-job", ts: 12345 }]);
} finally {
if (previousStateDir === undefined) {

View File

@@ -170,8 +170,7 @@ function createTaskRegistryMaintenanceHarness(params: {
isCronRuntimeAuthoritative: () => params.cronRuntimeAuthoritative ?? true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => params.cronStore ?? { version: 1, jobs: [] },
resolveCronRunLogPath: ({ jobId }) => jobId,
readCronRunLogEntriesSync: (jobId) => cronRunLogEntries[jobId] ?? [],
readCronRunLogEntriesSync: ({ jobId }) => (jobId ? (cronRunLogEntries[jobId] ?? []) : []),
};
setTaskRegistryMaintenanceRuntimeForTests(runtime);

View File

@@ -12,7 +12,7 @@ import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import type { SessionEntry } from "../config/sessions.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { isCronJobActive } from "../cron/active-jobs.js";
import { readCronRunLogEntriesSync, resolveCronRunLogPath } from "../cron/run-log.js";
import { readCronRunLogEntriesSync } from "../cron/run-log.js";
import type { CronRunLogEntry } from "../cron/run-log.js";
import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js";
import type { CronJob, CronStoreFile } from "../cron/types.js";
@@ -114,7 +114,6 @@ type TaskRegistryMaintenanceRuntime = {
isCronRuntimeAuthoritative: () => boolean;
resolveCronStorePath: typeof resolveCronStorePath;
loadCronStoreSync: typeof loadCronStoreSync;
resolveCronRunLogPath: typeof resolveCronRunLogPath;
readCronRunLogEntriesSync: typeof readCronRunLogEntriesSync;
};
@@ -154,7 +153,6 @@ const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
isCronRuntimeAuthoritative: () => configuredCronRuntimeAuthoritative,
resolveCronStorePath: () => configuredCronStorePath ?? resolveCronStorePath(),
loadCronStoreSync,
resolveCronRunLogPath,
readCronRunLogEntriesSync,
};
@@ -367,12 +365,9 @@ function getCronRunLogEntries(context: CronRecoveryContext, jobId: string): Cron
}
let entries: CronRunLogEntry[] = [];
try {
const logPath = taskRegistryMaintenanceRuntime.resolveCronRunLogPath({
entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync({
storePath: context.storePath,
jobId,
});
entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync(logPath, {
jobId,
limit: 5000,
});
} catch {

View File

@@ -190,7 +190,6 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
isCronRuntimeAuthoritative: () => true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => ({ version: 1, jobs: [] }),
resolveCronRunLogPath: ({ jobId }) => jobId,
readCronRunLogEntriesSync: () => [],
});
}
@@ -2654,7 +2653,6 @@ describe("task-registry", () => {
isCronRuntimeAuthoritative: () => true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => ({ version: 1, jobs: [] }),
resolveCronRunLogPath: ({ jobId }) => jobId,
readCronRunLogEntriesSync: () => [],
});