diff --git a/src/commands/doctor-cron-legacy-store-migration.ts b/src/commands/doctor-cron-legacy-store-migration.ts new file mode 100644 index 00000000000..ccd8412db4b --- /dev/null +++ b/src/commands/doctor-cron-legacy-store-migration.ts @@ -0,0 +1,238 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { tryCronScheduleIdentity } from "../cron/schedule-identity.js"; +import type { + CronConfigJobRuntimeEntry, + LoadedCronStore, + QuarantinedCronConfigJob, +} from "../cron/store.js"; +import type { CronStoreFile } from "../cron/types.js"; +import { isRecord } from "../shared/record-coerce.js"; +import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { parseJsonWithJson5Fallback } from "../utils/parse-json-compat.js"; + +const LEGACY_CRON_ARCHIVE_SUFFIX = ".migrated"; + +function resolveLegacyCronStatePath(storePath: string): string { + if (storePath.endsWith(".json")) { + return storePath.replace(/\.json$/, "-state.json"); + } + return `${storePath}-state.json`; +} + +async function legacyCronFileExists(filePath: string): Promise { + return fs + .access(filePath) + .then(() => true) + .catch(() => false); +} + +async function archiveLegacyCronFile(filePath: string): Promise { + if (!(await legacyCronFileExists(filePath))) { + return; + } + const archivePath = `${filePath}${LEGACY_CRON_ARCHIVE_SUFFIX}`; + if (await legacyCronFileExists(archivePath)) { + return; + } + await fs.rename(filePath, archivePath).catch(() => undefined); +} + +function parseCronStateFile(raw: string): { + version: 1; + jobs: Record; +} | null { + try { + const parsed = parseJsonWithJson5Fallback(raw); + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return null; + } + const record = parsed as Record; + if ( + record.version !== 1 || + typeof record.jobs !== "object" || + record.jobs === null || + Array.isArray(record.jobs) + ) { + return null; + } + return { + version: 1, + jobs: record.jobs as Record, + }; + } catch { + return null; + } +} + +function getRawCronJobs(parsed: unknown): unknown[] { + return Array.isArray(parsed) + ? parsed + : isRecord(parsed) && Array.isArray(parsed.jobs) + ? parsed.jobs + : []; +} + +function cloneConfigJobs(jobs: Array>): Array> { + return jobs.map((job) => structuredClone(job)); +} + +async function loadStateFile( + statePath: string, +): Promise<{ version: 1; jobs: Record } | null> { + let raw: string; + try { + raw = await fs.readFile(statePath, "utf-8"); + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return null; + } + throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, { + cause: err, + }); + } + + return parseCronStateFile(raw); +} + +function hasInlineState(jobs: Array | null | undefined>): boolean { + return jobs.some( + (job) => job != null && isRecord(job.state) && Object.keys(job.state).length > 0, + ); +} + +function ensureJobStateObject(job: CronStoreFile["jobs"][number]): void { + if (!isRecord(job.state)) { + job.state = {} as never; + } +} + +function backfillMissingRuntimeFields(job: CronStoreFile["jobs"][number]): void { + ensureJobStateObject(job); + if (typeof job.updatedAtMs !== "number") { + job.updatedAtMs = typeof job.createdAtMs === "number" ? job.createdAtMs : Date.now(); + } +} + +function resolveUpdatedAtMs(job: CronStoreFile["jobs"][number], updatedAtMs: unknown): number { + if (typeof updatedAtMs === "number" && Number.isFinite(updatedAtMs)) { + return updatedAtMs; + } + if (typeof job.updatedAtMs === "number" && Number.isFinite(job.updatedAtMs)) { + return job.updatedAtMs; + } + return typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs) + ? job.createdAtMs + : Date.now(); +} + +function mergeStateFileEntry(job: CronStoreFile["jobs"][number], entry: unknown): void { + if (!isRecord(entry)) { + backfillMissingRuntimeFields(job); + return; + } + job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs); + job.state = isRecord(entry.state) ? (entry.state as never) : ({} as never); + if ( + typeof entry.scheduleIdentity === "string" && + entry.scheduleIdentity !== tryCronScheduleIdentity(job as unknown as Record) + ) { + ensureJobStateObject(job); + job.state.nextRunAtMs = undefined; + } +} + +function resolveCronStateId(job: Record): string | undefined { + return normalizeOptionalString(job.id) ?? normalizeOptionalString(job.jobId); +} + +export async function legacyCronStoreFilesExist(storePath: string): Promise { + const resolvedStorePath = path.resolve(storePath); + return ( + (await legacyCronFileExists(resolvedStorePath)) || + (await legacyCronFileExists(resolveLegacyCronStatePath(resolvedStorePath))) + ); +} + +export async function archiveLegacyCronStoreForMigration(storePath: string): Promise { + const resolvedStorePath = path.resolve(storePath); + await Promise.all([ + archiveLegacyCronFile(resolvedStorePath), + archiveLegacyCronFile(resolveLegacyCronStatePath(resolvedStorePath)), + ]); +} + +export async function loadLegacyCronStoreForMigration(storePath: string): Promise { + const resolvedStorePath = path.resolve(storePath); + try { + const raw = await fs.readFile(resolvedStorePath, "utf-8"); + let parsed: unknown; + try { + parsed = parseJsonWithJson5Fallback(raw); + } catch (err) { + throw new Error(`Failed to parse cron store at ${resolvedStorePath}: ${String(err)}`, { + cause: err, + }); + } + const rawJobs = getRawCronJobs(parsed); + const configJobIndexes: number[] = []; + const configRows: Array> = []; + const configJobRuntimeEntries: CronConfigJobRuntimeEntry[] = []; + const invalidConfigRows: QuarantinedCronConfigJob[] = []; + for (const [index, row] of rawJobs.entries()) { + if (isRecord(row)) { + configJobIndexes.push(index); + configRows.push(row); + } else { + invalidConfigRows.push({ + sourceIndex: index, + reason: "non-object-row", + raw: structuredClone(row), + }); + } + } + const store: CronStoreFile = { + version: 1, + jobs: configRows as never as CronStoreFile["jobs"], + }; + const jobs = store.jobs as unknown as Array>; + const configJobs = cloneConfigJobs(configRows); + + const stateFile = await loadStateFile(resolveLegacyCronStatePath(resolvedStorePath)); + const hasLegacyInlineState = !stateFile && hasInlineState(jobs); + + if (stateFile) { + for (const job of store.jobs) { + const stateId = resolveCronStateId(job as unknown as Record); + const entry = stateId ? stateFile.jobs[stateId] : undefined; + configJobRuntimeEntries.push(isRecord(entry) ? structuredClone(entry) : {}); + if (entry) { + mergeStateFileEntry(job, entry); + } else { + backfillMissingRuntimeFields(job); + } + } + } else if (!hasLegacyInlineState) { + for (const job of store.jobs) { + backfillMissingRuntimeFields(job); + } + } + + for (const job of store.jobs) { + ensureJobStateObject(job); + } + + return { store, configJobs, configJobIndexes, configJobRuntimeEntries, invalidConfigRows }; + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return { + store: { version: 1, jobs: [] }, + configJobs: [], + configJobIndexes: [], + configJobRuntimeEntries: [], + invalidConfigRows: [], + }; + } + throw err; + } +} diff --git a/src/commands/doctor-cron.test.ts b/src/commands/doctor-cron.test.ts index b56a5dbc544..8e2ed11c78c 100644 --- a/src/commands/doctor-cron.test.ts +++ b/src/commands/doctor-cron.test.ts @@ -438,7 +438,7 @@ describe("maybeRepairLegacyCronStore", () => { prompter: makePrompter(true), }); - const entries = readCronRunLogEntriesSync(runLogPath); + const entries = readCronRunLogEntriesSync({ storePath, jobId: "sqlite-job" }); expect(entries).toHaveLength(1); expect(entries[0]?.jobId).toBe("sqlite-job"); expect(entries[0]?.summary).toBe("done"); diff --git a/src/commands/doctor-cron.ts b/src/commands/doctor-cron.ts index 7c1070dc193..2752c91531d 100644 --- a/src/commands/doctor-cron.ts +++ b/src/commands/doctor-cron.ts @@ -6,13 +6,10 @@ import { resolveAgentModelPrimaryValue } from "../config/model-input.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { legacyCronRunLogFilesExist, migrateLegacyCronRunLogsToSqlite } from "../cron/run-log.js"; import { - archiveLegacyCronStoreForMigration, - legacyCronStoreFilesExist, - loadLegacyCronStoreForMigration, loadCronQuarantineFile, + loadCronStore, resolveCronQuarantinePath, resolveCronStorePath, - loadCronStore, saveCronStore, } from "../cron/store.js"; import type { CronJob } from "../cron/types.js"; @@ -25,6 +22,11 @@ import { countStaleDreamingJobs, migrateLegacyDreamingPayloadShape, } from "./doctor-cron-dreaming-payload-migration.js"; +import { + archiveLegacyCronStoreForMigration, + legacyCronStoreFilesExist, + loadLegacyCronStoreForMigration, +} from "./doctor-cron-legacy-store-migration.js"; import { normalizeStoredCronJobs } from "./doctor-cron-store-migration.js"; import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js"; diff --git a/src/cron/run-log.error-reason.test.ts b/src/cron/run-log.error-reason.test.ts index 9aab6cc64ad..fef74413f2f 100644 --- a/src/cron/run-log.error-reason.test.ts +++ b/src/cron/run-log.error-reason.test.ts @@ -21,6 +21,10 @@ async function writeLegacyRunLogAndMigrate( return file; } +function fileToStorePath(file: string): string { + return path.join(path.dirname(path.dirname(file)), "jobs.json"); +} + describe("cron run log errorReason", () => { it("backfills errorReason from timeout error text for older entries", async () => { const file = await writeLegacyRunLogAndMigrate([ @@ -33,13 +37,17 @@ describe("cron run log errorReason", () => { }, ]); - const page = await readCronRunLogEntriesPage(file, { limit: 10 }); + const page = await readCronRunLogEntriesPage({ + storePath: fileToStorePath(file), + jobId: "job-1", + limit: 10, + }); expect(page.entries[0]?.errorReason).toBe("timeout"); }); it("validates persisted errorReason against the full failover reason set", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "cron-run-log-")); - const file = path.join(dir, "job.jsonl"); + const storePath = path.join(dir, "jobs.json"); const reasons = [ "auth", "auth_permanent", @@ -57,16 +65,24 @@ describe("cron run log errorReason", () => { "unknown", ] satisfies Array>; for (const [index, errorReason] of reasons.entries()) { - await appendCronRunLog(file, { - ts: index + 1, - jobId: "job-1", - action: "finished", - status: "error", - errorReason, + await appendCronRunLog({ + storePath, + entry: { + ts: index + 1, + jobId: "job-1", + action: "finished", + status: "error", + errorReason, + }, }); } - const page = await readCronRunLogEntriesPage(file, { limit: 50, sortDir: "asc" }); + const page = await readCronRunLogEntriesPage({ + storePath, + jobId: "job-1", + limit: 50, + sortDir: "asc", + }); expect(page.entries.map((entry) => entry.errorReason)).toEqual(reasons); }); @@ -82,38 +98,53 @@ describe("cron run log errorReason", () => { }, ]); - const page = await readCronRunLogEntriesPage(file, { limit: 10 }); + const page = await readCronRunLogEntriesPage({ + storePath: fileToStorePath(file), + jobId: "job-1", + limit: 10, + }); expect(page.entries[0]?.errorReason).toBe("overloaded"); }); it("uses provider context when deriving persisted run-log reasons", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "cron-run-log-")); - const file = path.join(dir, "job.jsonl"); - await appendCronRunLog(file, { - ts: 1, - jobId: "job-1", - action: "finished", - status: "error", - error: "403 Key limit exceeded (monthly limit)", - provider: "openrouter", + const storePath = path.join(dir, "jobs.json"); + await appendCronRunLog({ + storePath, + entry: { + ts: 1, + jobId: "job-1", + action: "finished", + status: "error", + error: "403 Key limit exceeded (monthly limit)", + provider: "openrouter", + }, }); - const page = await readCronRunLogEntriesPage(file, { limit: 10 }); + const page = await readCronRunLogEntriesPage({ storePath, jobId: "job-1", limit: 10 }); expect(page.entries[0]?.errorReason).toBe("billing"); }); it("includes derived errorReason values in run-log search", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "cron-run-log-")); - const file = path.join(dir, "job.jsonl"); - await appendCronRunLog(file, { - ts: 1, - jobId: "job-1", - action: "finished", - status: "error", - error: "cron: job execution timed out", + const storePath = path.join(dir, "jobs.json"); + await appendCronRunLog({ + storePath, + entry: { + ts: 1, + jobId: "job-1", + action: "finished", + status: "error", + error: "cron: job execution timed out", + }, }); - const page = await readCronRunLogEntriesPage(file, { limit: 10, query: "timeout" }); + const page = await readCronRunLogEntriesPage({ + storePath, + jobId: "job-1", + limit: 10, + query: "timeout", + }); expect(page.entries).toHaveLength(1); expect(page.entries[0]?.errorReason).toBe("timeout"); }); diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts index f25d20d93ee..7b5605a7933 100644 --- a/src/cron/run-log.test.ts +++ b/src/cron/run-log.test.ts @@ -12,7 +12,6 @@ import { readCronRunLogEntriesPage, readCronRunLogEntriesSync, resolveCronRunLogPruneOptions, - resolveCronRunLogPath, } from "./run-log.js"; describe("cron run log", () => { @@ -50,74 +49,75 @@ describe("cron run log", () => { } } - it("resolves store path to per-job runs/.jsonl", () => { - const storePath = path.join(os.tmpdir(), "cron", "jobs.json"); - const p = resolveCronRunLogPath({ storePath, jobId: "job-1" }); - expect(p.endsWith(path.join(os.tmpdir(), "cron", "runs", "job-1.jsonl"))).toBe(true); - }); + function storePathForDir(dir: string): string { + return path.join(dir, "jobs.json"); + } - it("rejects unsafe job ids when resolving run log path", () => { + it("rejects unsafe job ids before querying SQLite run logs", async () => { const storePath = path.join(os.tmpdir(), "cron", "jobs.json"); - expect(() => resolveCronRunLogPath({ storePath, jobId: "../job-1" })).toThrow( - /invalid cron run log job id/i, - ); - expect(() => resolveCronRunLogPath({ storePath, jobId: "nested/job-1" })).toThrow( - /invalid cron run log job id/i, - ); - expect(() => resolveCronRunLogPath({ storePath, jobId: "..\\job-1" })).toThrow( - /invalid cron run log job id/i, - ); + for (const jobId of ["../job-1", "nested/job-1", "..\\job-1"]) { + await expect(readCronRunLogEntriesPage({ storePath, jobId })).rejects.toThrow( + /invalid cron run log job id/i, + ); + } }); it("appends SQLite rows and prunes by line count", async () => { await withRunLogDir("openclaw-cron-log-", async (dir) => { - const logPath = path.join(dir, "runs", "job-1.jsonl"); + const storePath = storePathForDir(dir); for (let i = 0; i < 10; i++) { - await appendCronRunLog( - logPath, - { + await appendCronRunLog({ + storePath, + entry: { ts: 1000 + i, jobId: "job-1", action: "finished", status: "ok", durationMs: i, }, - { maxBytes: 1, keepLines: 3 }, - ); + opts: { keepLines: 3 }, + }); } - const entries = readCronRunLogEntriesSync(logPath, { limit: 10 }); + const entries = readCronRunLogEntriesSync({ storePath, jobId: "job-1", limit: 10 }); expect(entries.map((entry) => entry.ts)).toEqual([1007, 1008, 1009]); + const logPath = path.join(dir, "runs", "job-1.jsonl"); await expect(fs.stat(logPath)).rejects.toMatchObject({ code: "ENOENT" }); }); }); it("reads run-log entries synchronously for task reconciliation", async () => { await withRunLogDir("openclaw-cron-log-sync-", async (dir) => { - const logPath = path.join(dir, "runs", "job-1.jsonl"); - await appendCronRunLog(logPath, { - ts: 1000, - jobId: "job-1", - action: "finished", - status: "ok", - runAtMs: 900, - durationMs: 100, + const storePath = storePathForDir(dir); + await appendCronRunLog({ + storePath, + entry: { + ts: 1000, + jobId: "job-1", + action: "finished", + status: "ok", + runAtMs: 900, + durationMs: 100, + }, }); - await appendCronRunLog(logPath, { - ts: 2000, - jobId: "job-2", - action: "finished", - status: "error", + await appendCronRunLog({ + storePath, + entry: { + ts: 2000, + jobId: "job-2", + action: "finished", + status: "error", + }, }); - const jobEntries = readCronRunLogEntriesSync(logPath, { jobId: "job-1" }); + const jobEntries = readCronRunLogEntriesSync({ storePath, jobId: "job-1" }); expect(jobEntries).toHaveLength(1); expect(jobEntries[0]?.jobId).toBe("job-1"); expect(jobEntries[0]?.status).toBe("ok"); expect(jobEntries[0]?.runAtMs).toBe(900); expect(jobEntries[0]?.durationMs).toBe(100); - expect(readCronRunLogEntriesSync(path.join(dir, "runs", "missing.jsonl"))).toStrictEqual([]); + expect(readCronRunLogEntriesSync({ storePath, jobId: "missing" })).toStrictEqual([]); }); }); @@ -125,13 +125,17 @@ describe("cron run log", () => { "does not create legacy run log files for new writes", async () => { await withRunLogDir("openclaw-cron-log-perms-", async (dir) => { + const storePath = storePathForDir(dir); const logPath = path.join(dir, "runs", "job-1.jsonl"); - await appendCronRunLog(logPath, { - ts: 1, - jobId: "job-1", - action: "finished", - status: "ok", + await appendCronRunLog({ + storePath, + entry: { + ts: 1, + jobId: "job-1", + action: "finished", + status: "ok", + }, }); await expect(fs.stat(logPath)).rejects.toMatchObject({ code: "ENOENT" }); @@ -143,16 +147,19 @@ describe("cron run log", () => { "does not mutate legacy run-log directory permissions on SQLite writes", async () => { await withRunLogDir("openclaw-cron-log-dir-perms-", async (dir) => { + const storePath = storePathForDir(dir); const runDir = path.join(dir, "runs"); - const logPath = path.join(runDir, "job-1.jsonl"); await fs.mkdir(runDir, { recursive: true, mode: 0o755 }); await fs.chmod(runDir, 0o755); - await appendCronRunLog(logPath, { - ts: 1, - jobId: "job-1", - action: "finished", - status: "ok", + await appendCronRunLog({ + storePath, + entry: { + ts: 1, + jobId: "job-1", + action: "finished", + status: "ok", + }, }); const runDirMode = (await fs.stat(runDir)).mode & 0o777; @@ -163,80 +170,91 @@ describe("cron run log", () => { it("reads newest entries and filters by jobId", async () => { await withRunLogDir("openclaw-cron-log-read-", async (dir) => { - const logPathA = path.join(dir, "runs", "a.jsonl"); - const logPathB = path.join(dir, "runs", "b.jsonl"); + const storePath = storePathForDir(dir); - await appendCronRunLog(logPathA, { - ts: 1, - jobId: "a", - action: "finished", - status: "ok", + await appendCronRunLog({ + storePath, + entry: { ts: 1, jobId: "a", action: "finished", status: "ok" }, }); - await appendCronRunLog(logPathB, { - ts: 2, - jobId: "b", - action: "finished", - status: "error", - error: "nope", - summary: "oops", + await appendCronRunLog({ + storePath, + entry: { + ts: 2, + jobId: "b", + action: "finished", + status: "error", + error: "nope", + summary: "oops", + }, }); - await appendCronRunLog(logPathA, { - ts: 3, - jobId: "a", - action: "finished", - status: "skipped", - sessionId: "run-123", - sessionKey: "agent:main:cron:a:run:run-123", + await appendCronRunLog({ + storePath, + entry: { + ts: 3, + jobId: "a", + action: "finished", + status: "skipped", + sessionId: "run-123", + sessionKey: "agent:main:cron:a:run:run-123", + }, }); - const allA = await readCronRunLogEntries(logPathA, { limit: 10 }); + const allA = await readCronRunLogEntries({ storePath, jobId: "a", limit: 10 }); expect(allA.map((e) => e.jobId)).toEqual(["a", "a"]); - const onlyA = await readCronRunLogEntries(logPathA, { + const onlyA = await readCronRunLogEntries({ + storePath, limit: 10, jobId: "a", }); expect(onlyA.map((e) => e.ts)).toEqual([1, 3]); - const lastOne = await readCronRunLogEntries(logPathA, { limit: 1 }); + const lastOne = await readCronRunLogEntries({ storePath, jobId: "a", limit: 1 }); expect(lastOne.map((e) => e.ts)).toEqual([3]); expect(lastOne[0]?.sessionId).toBe("run-123"); expect(lastOne[0]?.sessionKey).toBe("agent:main:cron:a:run:run-123"); - const onlyB = await readCronRunLogEntries(logPathB, { + const onlyB = await readCronRunLogEntries({ + storePath, limit: 10, jobId: "b", }); expect(onlyB[0]?.summary).toBe("oops"); - const wrongFilter = await readCronRunLogEntries(logPathA, { - limit: 10, - jobId: "b", - }); - expect(wrongFilter).toStrictEqual([]); + expect(await readCronRunLogEntries({ storePath, limit: 10, jobId: "missing" })).toStrictEqual( + [], + ); }); }); it("filters run-log pages by runId", async () => { await withRunLogDir("openclaw-cron-log-runid-", async (dir) => { - const logPath = path.join(dir, "runs", "job-1.jsonl"); + const storePath = storePathForDir(dir); - await appendCronRunLog(logPath, { - ts: 1, - jobId: "job-1", - action: "finished", - status: "error", - runId: "manual:job-1:1:0", + await appendCronRunLog({ + storePath, + entry: { + ts: 1, + jobId: "job-1", + action: "finished", + status: "error", + runId: "manual:job-1:1:0", + }, }); - await appendCronRunLog(logPath, { - ts: 2, - jobId: "job-1", - action: "finished", - status: "ok", - runId: "manual:job-1:2:0", + await appendCronRunLog({ + storePath, + entry: { + ts: 2, + jobId: "job-1", + action: "finished", + status: "ok", + runId: "manual:job-1:2:0", + }, }); - const page = await readCronRunLogEntriesPage(logPath, { + const page = await readCronRunLogEntriesPage({ + storePath, + jobId: "job-1", runId: "manual:job-1:2:0", limit: 10, }); @@ -280,8 +298,9 @@ describe("cron run log", () => { "utf-8", ); - await migrateLegacyCronRunLogsToSqlite(path.join(dir, "jobs.json")); - const entries = await readCronRunLogEntries(logPath, { limit: 10, jobId: "job-1" }); + const storePath = storePathForDir(dir); + await migrateLegacyCronRunLogsToSqlite(storePath); + const entries = await readCronRunLogEntries({ storePath, limit: 10, jobId: "job-1" }); expect(entries).toHaveLength(1); expect(entries[0]?.ts).toBe(2); expect(entries[0]?.delivered).toBe(true); @@ -322,10 +341,12 @@ describe("cron run log", () => { "utf-8", ); - await migrateLegacyCronRunLogsToSqlite(path.join(dir, "jobs.json")); + const storePath = storePathForDir(dir); + await migrateLegacyCronRunLogsToSqlite(storePath); expect( ( - await readCronRunLogEntriesPage(logPath, { + await readCronRunLogEntriesPage({ + storePath, limit: 10, jobId: "job-1", query: "telegram", @@ -334,7 +355,8 @@ describe("cron run log", () => { ).toHaveLength(1); expect( ( - await readCronRunLogEntriesPage(logPath, { + await readCronRunLogEntriesPage({ + storePath, limit: 10, jobId: "job-1", query: "-100", @@ -346,28 +368,31 @@ describe("cron run log", () => { it("reads and searches run diagnostics", async () => { await withRunLogDir("openclaw-cron-log-diagnostics-", async (dir) => { - const logPath = path.join(dir, "runs", "job-1.jsonl"); + const storePath = storePathForDir(dir); - await appendCronRunLog(logPath, { - ts: 1, - jobId: "job-1", - action: "finished", - status: "error", - diagnostics: { - summary: "exec stderr tail", - entries: [ - { - ts: 1, - source: "exec", - severity: "error", - message: "exec stderr tail", - exitCode: 2, - }, - ], + await appendCronRunLog({ + storePath, + entry: { + ts: 1, + jobId: "job-1", + action: "finished", + status: "error", + diagnostics: { + summary: "exec stderr tail", + entries: [ + { + ts: 1, + source: "exec", + severity: "error", + message: "exec stderr tail", + exitCode: 2, + }, + ], + }, }, }); - const entries = await readCronRunLogEntries(logPath, { limit: 10, jobId: "job-1" }); + const entries = await readCronRunLogEntries({ storePath, limit: 10, jobId: "job-1" }); expect(entries[0]?.diagnostics?.summary).toBe("exec stderr tail"); expect(entries[0]?.diagnostics?.entries).toHaveLength(1); expect(entries[0]?.diagnostics?.entries[0]?.source).toBe("exec"); @@ -376,7 +401,8 @@ describe("cron run log", () => { expect(entries[0]?.diagnostics?.entries[0]?.exitCode).toBe(2); expect( ( - await readCronRunLogEntriesPage(logPath, { + await readCronRunLogEntriesPage({ + storePath, limit: 10, jobId: "job-1", query: "stderr tail", @@ -388,21 +414,25 @@ describe("cron run log", () => { it("reads telemetry fields", async () => { await withRunLogDir("openclaw-cron-log-telemetry-", async (dir) => { + const storePath = storePathForDir(dir); const logPath = path.join(dir, "runs", "job-1.jsonl"); - await appendCronRunLog(logPath, { - ts: 1, - jobId: "job-1", - action: "finished", - status: "ok", - model: "gpt-5.4", - provider: "openai", - usage: { - input_tokens: 10, - output_tokens: 5, - total_tokens: 15, - cache_read_tokens: 2, - cache_write_tokens: 1, + await appendCronRunLog({ + storePath, + entry: { + ts: 1, + jobId: "job-1", + action: "finished", + status: "ok", + model: "gpt-5.4", + provider: "openai", + usage: { + input_tokens: 10, + output_tokens: 5, + total_tokens: 15, + cache_read_tokens: 2, + cache_write_tokens: 1, + }, }, }); @@ -421,8 +451,8 @@ describe("cron run log", () => { "utf-8", ); - await migrateLegacyCronRunLogsToSqlite(path.join(dir, "jobs.json")); - const entries = await readCronRunLogEntries(logPath, { limit: 10, jobId: "job-1" }); + await migrateLegacyCronRunLogsToSqlite(storePath); + const entries = await readCronRunLogEntries({ storePath, limit: 10, jobId: "job-1" }); expect(entries[0]?.model).toBe("gpt-5.4"); expect(entries[0]?.provider).toBe("openai"); expect(entries[0]?.usage).toEqual({ @@ -440,12 +470,14 @@ describe("cron run log", () => { it("cleans up pending-write bookkeeping after appends complete", async () => { await withRunLogDir("openclaw-cron-log-pending-", async (dir) => { - const logPath = path.join(dir, "runs", "job-cleanup.jsonl"); - await appendCronRunLog(logPath, { - ts: 1, - jobId: "job-cleanup", - action: "finished", - status: "ok", + await appendCronRunLog({ + storePath: storePathForDir(dir), + entry: { + ts: 1, + jobId: "job-cleanup", + action: "finished", + status: "ok", + }, }); expect(getPendingCronRunLogWriteCountForTests()).toBe(0); @@ -454,21 +486,24 @@ describe("cron run log", () => { it("read drains pending fire-and-forget writes", async () => { await withRunLogDir("openclaw-cron-log-drain-", async (dir) => { - const logPath = path.join(dir, "runs", "job-drain.jsonl"); + const storePath = storePathForDir(dir); // Fire-and-forget write (simulates the `void appendCronRunLog(...)` pattern // in server-cron.ts). Do NOT await. - const writePromise = appendCronRunLog(logPath, { - ts: 42, - jobId: "job-drain", - action: "finished", - status: "ok", - summary: "drain-test", + const writePromise = appendCronRunLog({ + storePath, + entry: { + ts: 42, + jobId: "job-drain", + action: "finished", + status: "ok", + summary: "drain-test", + }, }); void writePromise.catch(() => undefined); // Read should see the entry because it drains pending writes. - const entries = await readCronRunLogEntries(logPath, { limit: 10 }); + const entries = await readCronRunLogEntries({ storePath, jobId: "job-drain", limit: 10 }); expect(entries).toHaveLength(1); expect(entries[0]?.ts).toBe(42); expect(entries[0]?.summary).toBe("drain-test"); diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index c44ea667018..dad9c3c90ec 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -7,7 +7,6 @@ import type { FailoverReason } from "../agents/embedded-agent-helpers/types.js"; import { resolveFailoverReasonFromError } from "../agents/failover-error.js"; import { parseByteSize } from "../cli/parse-bytes.js"; import type { CronConfig } from "../config/types.cron.js"; -import { isPathInside } from "../infra/fs-safe.js"; import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js"; import { normalizeLowercaseStringOrEmpty, @@ -82,6 +81,10 @@ type ReadCronRunLogAllPageOptions = Omit & { jobNameById?: Record; }; +type AppendCronRunLogOptions = { + keepLines?: number; +}; + type CronRunLogsTable = OpenClawStateKyselyDatabase["cron_run_logs"]; type CronRunLogDatabase = Pick; type CronRunLogRow = Selectable; @@ -105,8 +108,8 @@ const CRON_FAILOVER_REASONS = new Set([ ]); const LEGACY_CRON_RUN_LOG_ARCHIVE_SUFFIX = ".migrated"; +const INVALID_CRON_RUN_LOG_JOB_ID_MESSAGE = "invalid cron run log job id"; type CronRunLogTarget = { storePath: string; jobId: string; strictJobId: boolean }; -const runLogTargetsByPath = new Map(); function normalizeCronRunLogErrorReason(value: unknown): FailoverReason | undefined { return typeof value === "string" && CRON_FAILOVER_REASONS.has(value as FailoverReason) @@ -117,28 +120,19 @@ function normalizeCronRunLogErrorReason(value: unknown): FailoverReason | undefi function assertSafeCronRunLogJobId(jobId: string): string { const trimmed = jobId.trim(); if (!trimmed) { - throw new Error("invalid cron run log job id"); + throw new Error(INVALID_CRON_RUN_LOG_JOB_ID_MESSAGE); } if (trimmed.includes("/") || trimmed.includes("\\") || trimmed.includes("\0")) { - throw new Error("invalid cron run log job id"); + throw new Error(INVALID_CRON_RUN_LOG_JOB_ID_MESSAGE); } return trimmed; } -export function resolveCronRunLogPath(params: { storePath: string; jobId: string }) { - const storePath = path.resolve(params.storePath); - const dir = path.dirname(storePath); - const runsDir = path.resolve(dir, "runs"); - const safeJobId = assertSafeCronRunLogJobId(params.jobId); - const resolvedPath = path.resolve(runsDir, `${safeJobId}.jsonl`); - if (!isPathInside(runsDir, resolvedPath)) { - throw new Error("invalid cron run log job id"); - } - runLogTargetsByPath.set(resolvedPath, { storePath, jobId: safeJobId, strictJobId: true }); - return resolvedPath; +export function isInvalidCronRunLogJobIdError(err: unknown): boolean { + return err instanceof Error && err.message === INVALID_CRON_RUN_LOG_JOB_ID_MESSAGE; } -const writesByPath = new Map>(); +const writesByTarget = new Map>(); export const DEFAULT_CRON_RUN_LOG_MAX_BYTES = 2_000_000; export const DEFAULT_CRON_RUN_LOG_KEEP_LINES = 2_000; @@ -168,15 +162,23 @@ export function resolveCronRunLogPruneOptions(cfg?: CronConfig["runLog"]): { } export function getPendingCronRunLogWriteCountForTests() { - return writesByPath.size; + return writesByTarget.size; } -async function drainPendingWrite(filePath: string): Promise { - const resolved = path.resolve(filePath); - const pending = writesByPath.get(resolved); - if (pending) { - await pending.catch(() => undefined); +function cronRunLogWriteKey(storePath: string, jobId?: string): string { + return `${cronStoreKey(storePath)}\0${jobId ?? ""}`; +} + +async function drainPendingWrite(storePath: string, jobId?: string): Promise { + if (jobId) { + await writesByTarget.get(cronRunLogWriteKey(storePath, jobId))?.catch(() => undefined); + return; } + const storePrefix = `${cronStoreKey(storePath)}\0`; + const pending = [...writesByTarget.entries()] + .filter(([key]) => key.startsWith(storePrefix)) + .map(([, write]) => write.catch(() => undefined)); + await Promise.all(pending); } function cronStoreKey(storePath: string): string { @@ -187,20 +189,6 @@ function getCronRunLogKysely(db: DatabaseSync) { return getNodeSqliteKysely(db); } -function inferCronRunLogTarget(filePath: string, jobId?: string): CronRunLogTarget { - const resolved = path.resolve(filePath); - const known = runLogTargetsByPath.get(resolved); - if (known && (!jobId || known.jobId === jobId)) { - return known; - } - const inferredJobId = assertSafeCronRunLogJobId(jobId ?? path.basename(resolved, ".jsonl")); - const parentDir = path.dirname(resolved); - const isRunsDir = path.basename(parentDir) === "runs"; - const storeDir = isRunsDir ? path.dirname(parentDir) : parentDir; - const storePath = path.resolve(storeDir, "jobs.json"); - return { storePath, jobId: inferredJobId, strictJobId: isRunsDir }; -} - function normalizeNumber(value: number | bigint | null): number | undefined { if (typeof value === "bigint") { return Number(value); @@ -286,6 +274,74 @@ function readCronRunLogRows(db: DatabaseSync, storeKey: string, jobId?: string): return executeSqliteQuerySync(db, query.orderBy("ts", "asc").orderBy("seq", "asc")).rows; } +function buildRunLogWhereClause(params: { + storeKey: string; + jobId?: string; + statuses: CronRunStatus[] | null; + deliveryStatuses: CronDeliveryStatus[] | null; + runId?: string; +}): { whereSql: string; values: Array } { + const clauses = ["store_key = ?"]; + const values: Array = [params.storeKey]; + if (params.jobId) { + clauses.push("job_id = ?"); + values.push(params.jobId); + } + if (params.statuses?.length) { + clauses.push(`status IN (${params.statuses.map(() => "?").join(", ")})`); + values.push(...params.statuses); + } + if (params.deliveryStatuses?.length) { + clauses.push( + `COALESCE(delivery_status, 'not-requested') IN (${params.deliveryStatuses + .map(() => "?") + .join(", ")})`, + ); + values.push(...params.deliveryStatuses); + } + const runId = normalizeOptionalString(params.runId); + if (runId) { + clauses.push("run_id = ?"); + values.push(runId); + } + return { whereSql: clauses.join(" AND "), values }; +} + +function countCronRunLogRows( + db: DatabaseSync, + whereSql: string, + values: Array, +): number { + const row = db + .prepare(`SELECT COUNT(*) AS count FROM cron_run_logs WHERE ${whereSql}`) + .get(...values) as { count?: number | bigint } | undefined; + return normalizeNumber(row?.count ?? null) ?? 0; +} + +function readCronRunLogRowsPage(params: { + db: DatabaseSync; + storeKey: string; + jobId?: string; + statuses: CronRunStatus[] | null; + deliveryStatuses: CronDeliveryStatus[] | null; + runId?: string; + sortDir: CronRunLogSortDir; + offset?: number; + limit?: number; +}): CronRunLogRow[] { + const { whereSql, values } = buildRunLogWhereClause(params); + const order = params.sortDir === "asc" ? "ASC" : "DESC"; + const limitSql = + params.limit === undefined || params.offset === undefined ? "" : " LIMIT ? OFFSET ?"; + const limitValues = + params.limit === undefined || params.offset === undefined ? [] : [params.limit, params.offset]; + return params.db + .prepare( + `SELECT * FROM cron_run_logs WHERE ${whereSql} ORDER BY ts ${order}, seq ${order}${limitSql}`, + ) + .all(...values, ...limitValues) as CronRunLogRow[]; +} + function nextCronRunLogSeq(db: DatabaseSync, storeKey: string, jobId: string): number { const row = db .prepare( @@ -387,45 +443,47 @@ function archiveLegacyCronRunLogSync(filePath: string): void { } } -export async function appendCronRunLog( - filePath: string, - entry: CronRunLogEntry, - opts?: { maxBytes?: number; keepLines?: number }, -) { - const resolved = path.resolve(filePath); - const prev = writesByPath.get(resolved) ?? Promise.resolve(); +export async function appendCronRunLog(params: { + storePath: string; + entry: CronRunLogEntry; + opts?: AppendCronRunLogOptions; +}) { + const storeKey = cronStoreKey(params.storePath); + const writeKey = cronRunLogWriteKey(params.storePath, params.entry.jobId); + const prev = writesByTarget.get(writeKey) ?? Promise.resolve(); const next = prev .catch(() => undefined) .then(async () => { - const target = inferCronRunLogTarget(resolved, entry.jobId); runOpenClawStateWriteTransaction(({ db }) => { - insertCronRunLogEntry(db, cronStoreKey(target.storePath), entry); + insertCronRunLogEntry(db, storeKey, params.entry); pruneCronRunLogRows( db, - cronStoreKey(target.storePath), - entry.jobId, - opts?.keepLines ?? DEFAULT_CRON_RUN_LOG_KEEP_LINES, + storeKey, + params.entry.jobId, + params.opts?.keepLines ?? DEFAULT_CRON_RUN_LOG_KEEP_LINES, ); }); }); - writesByPath.set(resolved, next); + writesByTarget.set(writeKey, next); try { await next; } finally { - if (writesByPath.get(resolved) === next) { - writesByPath.delete(resolved); + if (writesByTarget.get(writeKey) === next) { + writesByTarget.delete(writeKey); } } } -export async function readCronRunLogEntries( - filePath: string, - opts?: { limit?: number; jobId?: string }, -): Promise { - await drainPendingWrite(filePath); - const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200))); - const page = await readCronRunLogEntriesPage(filePath, { - jobId: opts?.jobId, +export async function readCronRunLogEntries(params: { + storePath: string; + jobId?: string; + limit?: number; +}): Promise { + await drainPendingWrite(params.storePath, params.jobId); + const limit = Math.max(1, Math.min(5000, Math.floor(params.limit ?? 200))); + const page = await readCronRunLogEntriesPage({ + storePath: params.storePath, + jobId: params.jobId, limit, offset: 0, status: "all", @@ -434,22 +492,18 @@ export async function readCronRunLogEntries( return page.entries.toReversed(); } -export function readCronRunLogEntriesSync( - filePath: string, - opts?: { limit?: number; jobId?: string }, -): CronRunLogEntry[] { - const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200))); - const resolved = path.resolve(filePath); - const target = inferCronRunLogTarget(resolved); - const rows = readCronRunLogRows( - openOpenClawStateDatabase().db, - cronStoreKey(target.storePath), - target.strictJobId ? target.jobId : undefined, - ); +export function readCronRunLogEntriesSync(params: { + storePath: string; + jobId?: string; + limit?: number; +}): CronRunLogEntry[] { + const limit = Math.max(1, Math.min(5000, Math.floor(params.limit ?? 200))); + const storeKey = cronStoreKey(params.storePath); + const jobId = params.jobId ? assertSafeCronRunLogJobId(params.jobId) : undefined; + const rows = readCronRunLogRows(openOpenClawStateDatabase().db, storeKey, jobId); return rows .map(parseStoredRunLogEntry) .filter((entry): entry is CronRunLogEntry => entry !== null) - .filter((entry) => !opts?.jobId || entry.jobId === opts.jobId) .slice(-limit); } @@ -665,76 +719,76 @@ function filterRunLogEntries( } export async function readCronRunLogEntriesPage( - filePath: string, - opts?: ReadCronRunLogPageOptions, -): Promise { - await drainPendingWrite(filePath); - const limit = Math.max(1, Math.min(200, Math.floor(opts?.limit ?? 50))); - const resolved = path.resolve(filePath); - const target = inferCronRunLogTarget(resolved); - const statuses = normalizeRunStatuses(opts); - const deliveryStatuses = normalizeDeliveryStatuses(opts); - const query = normalizeLowercaseStringOrEmpty(opts?.query); - const sortDir: CronRunLogSortDir = opts?.sortDir === "asc" ? "asc" : "desc"; - const all = readCronRunLogRows( - openOpenClawStateDatabase().db, - cronStoreKey(target.storePath), - target.strictJobId ? target.jobId : undefined, - ) - .map(parseStoredRunLogEntry) - .filter((entry): entry is CronRunLogEntry => entry !== null) - .filter((entry) => !opts?.jobId || entry.jobId === opts.jobId); - const filtered = filterRunLogEntries(all, { - runId: opts?.runId, - statuses, - deliveryStatuses, - query, - queryTextForEntry: (entry) => - [ - entry.summary ?? "", - entry.error ?? "", - entry.errorReason ?? "", - entry.diagnostics?.summary ?? "", - ...(entry.diagnostics?.entries ?? []).map((diagnostic) => diagnostic.message), - entry.jobId, - entry.delivery?.intended?.channel ?? "", - entry.delivery?.resolved?.channel ?? "", - ...(entry.delivery?.messageToolSentTo ?? []).map((target) => target.channel), - ].join(" "), - }); - const sorted = - sortDir === "asc" - ? filtered.toSorted((a, b) => a.ts - b.ts) - : filtered.toSorted((a, b) => b.ts - a.ts); - const total = sorted.length; - const offset = Math.max(0, Math.min(total, Math.floor(opts?.offset ?? 0))); - const entries = sorted.slice(offset, offset + limit); - const nextOffset = offset + entries.length; - return { - entries, - total, - offset, - limit, - hasMore: nextOffset < total, - nextOffset: nextOffset < total ? nextOffset : null, - }; -} - -export async function readCronRunLogEntriesPageAll( - opts: ReadCronRunLogAllPageOptions, + opts: ReadCronRunLogPageOptions & { storePath: string; jobNameById?: Record }, ): Promise { + const jobId = opts.jobId ? assertSafeCronRunLogJobId(opts.jobId) : undefined; + await drainPendingWrite(opts.storePath, jobId); const limit = Math.max(1, Math.min(200, Math.floor(opts.limit ?? 50))); const statuses = normalizeRunStatuses(opts); const deliveryStatuses = normalizeDeliveryStatuses(opts); const query = normalizeLowercaseStringOrEmpty(opts.query); const sortDir: CronRunLogSortDir = opts.sortDir === "asc" ? "asc" : "desc"; - const all = readCronRunLogRows(openOpenClawStateDatabase().db, cronStoreKey(opts.storePath)) + const db = openOpenClawStateDatabase().db; + const storeKey = cronStoreKey(opts.storePath); + const offset = Math.max(0, Math.floor(opts.offset ?? 0)); + + if (!query) { + const { whereSql, values } = buildRunLogWhereClause({ + storeKey, + jobId, + statuses, + deliveryStatuses, + runId: opts.runId, + }); + const total = countCronRunLogRows(db, whereSql, values); + const boundedOffset = Math.min(total, offset); + const entries = readCronRunLogRowsPage({ + db, + storeKey, + jobId, + statuses, + deliveryStatuses, + runId: opts.runId, + sortDir, + offset: boundedOffset, + limit, + }) + .map(parseStoredRunLogEntry) + .filter((entry): entry is CronRunLogEntry => entry !== null); + if (opts.jobNameById) { + for (const entry of entries) { + const jobName = opts.jobNameById[entry.jobId]; + if (jobName) { + (entry as CronRunLogEntry & { jobName?: string }).jobName = jobName; + } + } + } + const nextOffset = boundedOffset + entries.length; + return { + entries, + total, + offset: boundedOffset, + limit, + hasMore: nextOffset < total, + nextOffset: nextOffset < total ? nextOffset : null, + }; + } + + const all = readCronRunLogRowsPage({ + db, + storeKey, + jobId, + statuses, + deliveryStatuses, + runId: opts.runId, + sortDir, + }) .map(parseStoredRunLogEntry) .filter((entry): entry is CronRunLogEntry => entry !== null); const filtered = filterRunLogEntries(all, { runId: opts.runId, - statuses, - deliveryStatuses, + statuses: null, + deliveryStatuses: null, query, queryTextForEntry: (entry) => { const jobName = opts.jobNameById?.[entry.jobId] ?? ""; @@ -757,8 +811,8 @@ export async function readCronRunLogEntriesPageAll( ? filtered.toSorted((a, b) => a.ts - b.ts) : filtered.toSorted((a, b) => b.ts - a.ts); const total = sorted.length; - const offset = Math.max(0, Math.min(total, Math.floor(opts.offset ?? 0))); - const entries = sorted.slice(offset, offset + limit); + const boundedOffset = Math.min(total, offset); + const entries = sorted.slice(boundedOffset, boundedOffset + limit); if (opts.jobNameById) { for (const entry of entries) { const jobName = opts.jobNameById[entry.jobId]; @@ -767,17 +821,23 @@ export async function readCronRunLogEntriesPageAll( } } } - const nextOffset = offset + entries.length; + const nextOffset = boundedOffset + entries.length; return { entries, total, - offset, + offset: boundedOffset, limit, hasMore: nextOffset < total, nextOffset: nextOffset < total ? nextOffset : null, }; } +export async function readCronRunLogEntriesPageAll( + opts: ReadCronRunLogAllPageOptions, +): Promise { + return readCronRunLogEntriesPage(opts); +} + export async function migrateLegacyCronRunLogsToSqlite( storePath: string, ): Promise<{ importedFiles: number }> { @@ -789,7 +849,7 @@ export async function migrateLegacyCronRunLogsToSqlite( for (const file of jsonlFiles) { const jobId = path.basename(file.name, ".jsonl"); const logPath = path.join(runsDir, file.name); - await drainPendingWrite(logPath); + await drainPendingWrite(resolvedStorePath, jobId); await importLegacyCronRunLog(logPath, { storePath: resolvedStorePath, jobId, diff --git a/src/cron/service.test-harness.ts b/src/cron/service.test-harness.ts index ffcd771b75e..7b3af9f2439 100644 --- a/src/cron/service.test-harness.ts +++ b/src/cron/service.test-harness.ts @@ -236,7 +236,6 @@ export function createMockCronStateForJobs(params: { storeLoadedAtMs: nowMs, op: Promise.resolve(), warnedDisabled: false, - warnedMissingSessionTargetJobIds: new Set(), warnedInvalidPersistedJobKeys: new Set(), pendingQuarantineConfigJobs: [], lastQuarantineFailureWarnKey: null, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index f3ca9459750..b6fb8a84779 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -158,12 +158,6 @@ export type CronServiceState = { running: boolean; op: Promise; warnedDisabled: boolean; - /** - * Job ids whose missing `sessionTarget` was defaulted at load and warned - * about. Used to suppress duplicate warns across forceReload ticks so a - * single broken job does not spam the log on every scheduler cycle. - */ - warnedMissingSessionTargetJobIds: Set; /** * Persisted job rows with non-canonical storage shape are skipped in memory * until the runtime can quarantine and sanitize the active store. @@ -182,7 +176,6 @@ export function createCronServiceState(deps: CronServiceDeps): CronServiceState running: false, op: Promise.resolve(), warnedDisabled: false, - warnedMissingSessionTargetJobIds: new Set(), warnedInvalidPersistedJobKeys: new Set(), pendingQuarantineConfigJobs: [], lastQuarantineFailureWarnKey: null, diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index e13afc48c91..d956f43a8b9 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -108,7 +108,7 @@ export async function ensureLoaded( const rawConfigJob = loaded.configJobs[index] ?? structuredClone(raw); const sourceIndex = loaded.configJobIndexes[index] ?? index; const runtimeEntry = loaded.configJobRuntimeEntries[index]; - const { legacyJobIdIssue } = normalizeCronJobIdentityFields(raw); + normalizeCronJobIdentityFields(raw); let normalized: Record | null; try { normalized = normalizeCronJobInput(raw); @@ -149,56 +149,7 @@ export async function ensureLoaded( continue; } jobs.push(hydrated); - if (legacyJobIdIssue) { - const resolvedId = typeof hydrated.id === "string" ? hydrated.id : undefined; - state.deps.log.warn( - { storePath: state.deps.storePath, jobId: resolvedId }, - "cron: job used legacy jobId field; normalized id in memory (run openclaw doctor --fix to persist canonical shape)", - ); - } - // Persisted legacy jobs may predate the required `enabled` field. - // Keep runtime behavior backward-compatible without rewriting the store. - if (typeof hydrated.enabled !== "boolean") { - hydrated.enabled = true; - } invalidateStaleNextRunOnScheduleChange({ previousJobsById, hydrated }); - // Same shape: persisted jobs missing `sessionTarget` crash downstream - // on any code path that dereferences `.startsWith` (e.g. - // `runIsolatedAgentJob` in `src/gateway/server-cron.ts`). Mirror the - // defaulter applied at create time: systemEvent payloads -> "main", - // agentTurn -> "isolated". Use `Object.hasOwn` rather than `in` so a - // poisoned prototype cannot feed a crafted `kind` into the defaulter. - if (typeof hydrated.sessionTarget !== "string") { - const payload = hydrated.payload as unknown; - const payloadKind = - payload && - typeof payload === "object" && - !Array.isArray(payload) && - Object.hasOwn(payload, "kind") - ? (payload as { kind?: unknown }).kind - : undefined; - let defaulted: "main" | "isolated" | undefined; - if (payloadKind === "systemEvent") { - defaulted = "main"; - } else if (payloadKind === "agentTurn") { - defaulted = "isolated"; - } - if (defaulted) { - hydrated.sessionTarget = defaulted; - // `ensureLoaded` is called with `forceReload: true` on every tick; - // warn once per jobId per process to avoid log spam on repeated - // loads of the same still-broken store file. - const jobId = typeof hydrated.id === "string" ? hydrated.id : undefined; - const dedupeKey = jobId ?? ""; - if (!state.warnedMissingSessionTargetJobIds.has(dedupeKey)) { - state.warnedMissingSessionTargetJobIds.add(dedupeKey); - state.deps.log.warn( - { storePath: state.deps.storePath, jobId, defaulted }, - "cron: job missing sessionTarget; defaulted in memory (run openclaw doctor --fix to persist canonical shape)", - ); - } - } - } } state.store = { version: 1, @@ -251,10 +202,7 @@ export function warnIfDisabled(state: CronServiceState, action: string) { ); } -export async function persist( - state: CronServiceState, - opts?: { skipBackup?: boolean; stateOnly?: boolean }, -) { +export async function persist(state: CronServiceState, opts?: { stateOnly?: boolean }) { if (!state.store) { return; } @@ -266,6 +214,9 @@ export async function persist( } flushedPendingQuarantine = true; } - const saveOpts = flushedPendingQuarantine ? { skipBackup: opts?.skipBackup } : opts; - await saveCronStore(state.deps.storePath, state.store, saveOpts); + await saveCronStore( + state.deps.storePath, + state.store, + flushedPendingQuarantine ? undefined : opts, + ); } diff --git a/src/cron/store.test.ts b/src/cron/store.test.ts index e9d8a30edd8..a6729f8f51f 100644 --- a/src/cron/store.test.ts +++ b/src/cron/store.test.ts @@ -2,10 +2,12 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { runOpenClawStateWriteTransaction } from "../state/openclaw-state-db.js"; import { archiveLegacyCronStoreForMigration, loadLegacyCronStoreForMigration, +} from "../commands/doctor-cron-legacy-store-migration.js"; +import { runOpenClawStateWriteTransaction } from "../state/openclaw-state-db.js"; +import { loadCronQuarantineFile, loadCronStore, loadCronStoreSync, diff --git a/src/cron/store.ts b/src/cron/store.ts index dba7f34fe63..11ba9fb8c22 100644 --- a/src/cron/store.ts +++ b/src/cron/store.ts @@ -28,12 +28,6 @@ import type { CronStoreFile, } from "./types.js"; -type SerializedStoreCacheEntry = { - configJson?: string; - stateJson?: string; - needsSplitMigration: boolean; -}; - export type QuarantinedCronConfigJob = { sourceIndex: number; reason: string; @@ -57,8 +51,6 @@ export type LoadedCronStore = { invalidConfigRows: QuarantinedCronConfigJob[]; }; -const serializedStoreCache = new Map(); - function resolveDefaultCronDir(): string { return path.join(resolveConfigDir(), "cron"); } @@ -67,13 +59,6 @@ function resolveDefaultCronStorePath(): string { return path.join(resolveDefaultCronDir(), "jobs.json"); } -function resolveStatePath(storePath: string): string { - if (storePath.endsWith(".json")) { - return storePath.replace(/\.json$/, "-state.json"); - } - return `${storePath}-state.json`; -} - export function resolveCronQuarantinePath(storePath: string): string { if (storePath.endsWith(".json")) { return storePath.replace(/\.json$/, "-quarantine.json"); @@ -89,39 +74,11 @@ type CronStateFileEntry = { export type CronConfigJobRuntimeEntry = CronStateFileEntry; -type CronStateFile = { - version: 1; - jobs: Record; -}; - type CronJobsTable = OpenClawStateKyselyDatabase["cron_jobs"]; type CronStoreDatabase = Pick; type CronJobRow = Selectable; type CronJobInsert = Insertable; -const LEGACY_CRON_ARCHIVE_SUFFIX = ".migrated"; - -function parseCronStateFile(raw: string): CronStateFile | null { - try { - const parsed = parseJsonWithJson5Fallback(raw); - if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { - return null; - } - const record = parsed as Record; - if ( - record.version !== 1 || - typeof record.jobs !== "object" || - record.jobs === null || - Array.isArray(record.jobs) - ) { - return null; - } - return { version: 1, jobs: record.jobs as Record }; - } catch { - return null; - } -} - function cronStoreKey(storePath: string): string { return path.resolve(storePath); } @@ -749,79 +706,11 @@ function loadedCronStoreFromRows(rows: CronJobRow[]): LoadedCronStore { }; } -async function legacyCronFileExists(filePath: string): Promise { - return fs.promises - .access(filePath, fs.constants.F_OK) - .then(() => true) - .catch(() => false); -} - -async function archiveLegacyCronFile(filePath: string): Promise { - if (!(await legacyCronFileExists(filePath))) { - return; - } - const archivePath = `${filePath}${LEGACY_CRON_ARCHIVE_SUFFIX}`; - if (await legacyCronFileExists(archivePath)) { - return; - } - await fs.promises.rename(filePath, archivePath).catch(() => undefined); -} - -async function archiveLegacyCronStoreFiles(storePath: string): Promise { - await Promise.all([ - archiveLegacyCronFile(storePath), - archiveLegacyCronFile(resolveStatePath(storePath)), - ]); -} - -export async function legacyCronStoreFilesExist(storePath: string): Promise { - return ( - (await legacyCronFileExists(path.resolve(storePath))) || - (await legacyCronFileExists(resolveStatePath(path.resolve(storePath)))) - ); -} - -export async function archiveLegacyCronStoreForMigration(storePath: string): Promise { - await archiveLegacyCronStoreFiles(path.resolve(storePath)); -} - -function getRawCronJobs(parsed: unknown): unknown[] { - return Array.isArray(parsed) - ? parsed - : isRecord(parsed) && Array.isArray(parsed.jobs) - ? parsed.jobs - : []; -} - -function cloneConfigJobs(jobs: Array>): Array> { - return jobs.map((job) => structuredClone(job)); -} - function stripJobRuntimeFields(job: CronStoreFile["jobs"][number]): Record { const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job; return { ...rest, state: {} }; } -function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown { - const jobs = store.jobs.map(stripJobRuntimeFields); - return { - version: store.version, - jobs, - }; -} - -function extractStateFile(store: CronStoreFile): CronStateFile { - const jobs: Record = {}; - for (const job of store.jobs) { - jobs[job.id] = { - updatedAtMs: job.updatedAtMs, - scheduleIdentity: tryCronScheduleIdentity(job as unknown as Record), - state: job.state ?? {}, - }; - } - return { version: 1, jobs }; -} - export function resolveCronStorePath(storePath?: string) { if (storePath?.trim()) { const raw = storePath.trim(); @@ -833,166 +722,6 @@ export function resolveCronStorePath(storePath?: string) { return resolveDefaultCronStorePath(); } -async function loadStateFile(statePath: string): Promise { - let raw: string; - try { - raw = await fs.promises.readFile(statePath, "utf-8"); - } catch (err) { - if ((err as { code?: unknown })?.code === "ENOENT") { - return null; - } - throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, { - cause: err, - }); - } - - return parseCronStateFile(raw); -} - -function hasInlineState(jobs: Array | null | undefined>): boolean { - return jobs.some( - (job) => job != null && isRecord(job.state) && Object.keys(job.state).length > 0, - ); -} - -function ensureJobStateObject(job: CronStoreFile["jobs"][number]): void { - if (!isRecord(job.state)) { - job.state = {} as never; - } -} - -function backfillMissingRuntimeFields(job: CronStoreFile["jobs"][number]): void { - ensureJobStateObject(job); - if (typeof job.updatedAtMs !== "number") { - job.updatedAtMs = typeof job.createdAtMs === "number" ? job.createdAtMs : Date.now(); - } -} - -function resolveUpdatedAtMs(job: CronStoreFile["jobs"][number], updatedAtMs: unknown): number { - if (typeof updatedAtMs === "number" && Number.isFinite(updatedAtMs)) { - return updatedAtMs; - } - if (typeof job.updatedAtMs === "number" && Number.isFinite(job.updatedAtMs)) { - return job.updatedAtMs; - } - return typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs) - ? job.createdAtMs - : Date.now(); -} - -function mergeStateFileEntry(job: CronStoreFile["jobs"][number], entry: unknown): void { - if (!isRecord(entry)) { - backfillMissingRuntimeFields(job); - return; - } - job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs); - job.state = isRecord(entry.state) ? (entry.state as never) : ({} as never); - if ( - typeof entry.scheduleIdentity === "string" && - entry.scheduleIdentity !== tryCronScheduleIdentity(job as unknown as Record) - ) { - ensureJobStateObject(job); - job.state.nextRunAtMs = undefined; - } -} - -function resolveCronStateId(job: Record): string | undefined { - return normalizeOptionalString(job.id) ?? normalizeOptionalString(job.jobId); -} - -async function loadLegacyCronStoreWithConfigJobs(storePath: string): Promise { - try { - const raw = await fs.promises.readFile(storePath, "utf-8"); - let parsed: unknown; - try { - parsed = parseJsonWithJson5Fallback(raw); - } catch (err) { - throw new Error(`Failed to parse cron store at ${storePath}: ${String(err)}`, { - cause: err, - }); - } - const rawJobs = getRawCronJobs(parsed); - const configJobIndexes: number[] = []; - const configRows: Array> = []; - const configJobRuntimeEntries: CronConfigJobRuntimeEntry[] = []; - const invalidConfigRows: QuarantinedCronConfigJob[] = []; - for (const [index, row] of rawJobs.entries()) { - if (isRecord(row)) { - configJobIndexes.push(index); - configRows.push(row); - } else { - invalidConfigRows.push({ - sourceIndex: index, - reason: "non-object-row", - raw: structuredClone(row), - }); - } - } - const store: CronStoreFile = { - version: 1, - jobs: configRows as never as CronStoreFile["jobs"], - }; - const jobs = store.jobs as unknown as Array>; - const configJobs = cloneConfigJobs(configRows); - - // Load state file and merge. - const statePath = resolveStatePath(storePath); - const stateFile = await loadStateFile(statePath); - const hasLegacyInlineState = !stateFile && hasInlineState(jobs); - - if (stateFile) { - // State file exists: merge state by job ID. Inline state in jobs.json is ignored. - for (const job of store.jobs) { - const stateId = resolveCronStateId(job as unknown as Record); - const entry = stateId ? stateFile.jobs[stateId] : undefined; - configJobRuntimeEntries.push(isRecord(entry) ? structuredClone(entry) : {}); - if (entry) { - mergeStateFileEntry(job, entry); - } else { - backfillMissingRuntimeFields(job); - } - } - } else if (!hasLegacyInlineState) { - // No state file, no inline state: fresh clone or first run. - for (const job of store.jobs) { - backfillMissingRuntimeFields(job); - } - } - // else: migration mode — no state file but jobs.json has inline state. Use as-is. - - // Ensure every job has a state object (defensive). - for (const job of store.jobs) { - ensureJobStateObject(job); - } - - const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2); - const stateJson = JSON.stringify(extractStateFile(store), null, 2); - serializedStoreCache.set(storePath, { - configJson, - stateJson, - needsSplitMigration: hasLegacyInlineState, - }); - - return { store, configJobs, configJobIndexes, configJobRuntimeEntries, invalidConfigRows }; - } catch (err) { - if ((err as { code?: unknown })?.code === "ENOENT") { - serializedStoreCache.delete(storePath); - return { - store: { version: 1, jobs: [] }, - configJobs: [], - configJobIndexes: [], - configJobRuntimeEntries: [], - invalidConfigRows: [], - }; - } - throw err; - } -} - -export async function loadLegacyCronStoreForMigration(storePath: string): Promise { - return loadLegacyCronStoreWithConfigJobs(path.resolve(storePath)); -} - export async function loadCronStoreWithConfigJobs(storePath: string): Promise { const resolvedStorePath = path.resolve(storePath); const storeKey = cronStoreKey(resolvedStorePath); @@ -1026,7 +755,6 @@ export function loadCronStoreSync(storePath: string): CronStoreFile { } type SaveCronStoreOptions = { - skipBackup?: boolean; stateOnly?: boolean; }; @@ -1047,7 +775,6 @@ export async function saveCronStore( store: CronStoreFile, opts?: SaveCronStoreOptions, ) { - void opts; const resolvedStorePath = path.resolve(storePath); const storeKey = cronStoreKey(resolvedStorePath); if (opts?.stateOnly) { @@ -1060,7 +787,6 @@ export async function saveCronStore( runOpenClawStateWriteTransaction(({ db }) => { replaceCronRows(db, storeKey, store); }); - serializedStoreCache.delete(resolvedStorePath); } export async function loadCronQuarantineFile(path: string): Promise { diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 855017472a0..53dae041d70 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -12,11 +12,7 @@ import { resolveStorePath } from "../config/sessions/paths.js"; import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; -import { - appendCronRunLog, - resolveCronRunLogPath, - resolveCronRunLogPruneOptions, -} from "../cron/run-log.js"; +import { appendCronRunLog, resolveCronRunLogPruneOptions } from "../cron/run-log.js"; import type { CronServiceContract } from "../cron/service-contract.js"; import { CronService } from "../cron/service.js"; import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js"; @@ -471,13 +467,9 @@ export function buildGatewayCronService(params: { warnedLegacyWebhookJobs, }); - const logPath = resolveCronRunLogPath({ + void appendCronRunLog({ storePath, - jobId: evt.jobId, - }); - void appendCronRunLog( - logPath, - { + entry: { ts: Date.now(), jobId: evt.jobId, action: "finished", @@ -500,9 +492,12 @@ export function buildGatewayCronService(params: { provider: evt.provider, usage: evt.usage, }, - runLogPrune, - ).catch((err) => { - cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed"); + opts: { keepLines: runLogPrune.keepLines }, + }).catch((err) => { + cronLogger.warn( + { err: String(err), storePath, jobId: evt.jobId }, + "cron: run log append failed", + ); }); } }, diff --git a/src/gateway/server-methods/cron.ts b/src/gateway/server-methods/cron.ts index 01356e22f83..8a197dd93d0 100644 --- a/src/gateway/server-methods/cron.ts +++ b/src/gateway/server-methods/cron.ts @@ -16,9 +16,9 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { resolveCronDeliveryPreviews } from "../../cron/delivery-preview.js"; import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js"; import { + isInvalidCronRunLogJobIdError, readCronRunLogEntriesPage, readCronRunLogEntriesPageAll, - resolveCronRunLogPath, } from "../../cron/run-log.js"; import { applyJobPatch } from "../../cron/service/jobs.js"; import { isInvalidCronSessionTargetIdError } from "../../cron/session-target.js"; @@ -586,32 +586,30 @@ export const cronHandlers: GatewayRequestHandlers = { respond(true, page, undefined); return; } - let logPath: string; try { - logPath = resolveCronRunLogPath({ + const page = await readCronRunLogEntriesPage({ storePath: context.cronStorePath, jobId: jobId as string, + limit: p.limit, + offset: p.offset, + statuses: p.statuses, + status: p.status, + runId: p.runId, + deliveryStatuses: p.deliveryStatuses, + deliveryStatus: p.deliveryStatus, + query: p.query, + sortDir: p.sortDir, }); - } catch { + respond(true, page, undefined); + } catch (err) { + if (!isInvalidCronRunLogJobIdError(err)) { + throw err; + } respond( false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "invalid cron.runs params: invalid id"), ); - return; } - const page = await readCronRunLogEntriesPage(logPath, { - limit: p.limit, - offset: p.offset, - jobId: jobId as string, - statuses: p.statuses, - status: p.status, - runId: p.runId, - deliveryStatuses: p.deliveryStatuses, - deliveryStatus: p.deliveryStatus, - query: p.query, - sortDir: p.sortDir, - }); - respond(true, page, undefined); }, }; diff --git a/src/state/openclaw-state-db.test.ts b/src/state/openclaw-state-db.test.ts index 06522bed983..8e4abe89766 100644 --- a/src/state/openclaw-state-db.test.ts +++ b/src/state/openclaw-state-db.test.ts @@ -132,7 +132,10 @@ describe("openclaw state database", () => { process.env["OPENCLAW_STATE_DIR"] = stateDir; try { expect( - readCronRunLogEntriesSync(path.join(stateDir, "cron", "runs", "legacy-job.jsonl")), + readCronRunLogEntriesSync({ + storePath: path.join(stateDir, "cron", "jobs.json"), + jobId: "legacy-job", + }), ).toMatchObject([{ action: "finished", jobId: "legacy-job", ts: 12345 }]); } finally { if (previousStateDir === undefined) { diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index 870cbcf806f..a7f2b78e66b 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -170,8 +170,7 @@ function createTaskRegistryMaintenanceHarness(params: { isCronRuntimeAuthoritative: () => params.cronRuntimeAuthoritative ?? true, resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json", loadCronStoreSync: () => params.cronStore ?? { version: 1, jobs: [] }, - resolveCronRunLogPath: ({ jobId }) => jobId, - readCronRunLogEntriesSync: (jobId) => cronRunLogEntries[jobId] ?? [], + readCronRunLogEntriesSync: ({ jobId }) => (jobId ? (cronRunLogEntries[jobId] ?? []) : []), }; setTaskRegistryMaintenanceRuntimeForTests(runtime); diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index c8dd6a8bb5f..3d9e18e87a6 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -12,7 +12,7 @@ import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import type { SessionEntry } from "../config/sessions.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { isCronJobActive } from "../cron/active-jobs.js"; -import { readCronRunLogEntriesSync, resolveCronRunLogPath } from "../cron/run-log.js"; +import { readCronRunLogEntriesSync } from "../cron/run-log.js"; import type { CronRunLogEntry } from "../cron/run-log.js"; import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js"; import type { CronJob, CronStoreFile } from "../cron/types.js"; @@ -114,7 +114,6 @@ type TaskRegistryMaintenanceRuntime = { isCronRuntimeAuthoritative: () => boolean; resolveCronStorePath: typeof resolveCronStorePath; loadCronStoreSync: typeof loadCronStoreSync; - resolveCronRunLogPath: typeof resolveCronRunLogPath; readCronRunLogEntriesSync: typeof readCronRunLogEntriesSync; }; @@ -154,7 +153,6 @@ const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = { isCronRuntimeAuthoritative: () => configuredCronRuntimeAuthoritative, resolveCronStorePath: () => configuredCronStorePath ?? resolveCronStorePath(), loadCronStoreSync, - resolveCronRunLogPath, readCronRunLogEntriesSync, }; @@ -367,12 +365,9 @@ function getCronRunLogEntries(context: CronRecoveryContext, jobId: string): Cron } let entries: CronRunLogEntry[] = []; try { - const logPath = taskRegistryMaintenanceRuntime.resolveCronRunLogPath({ + entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync({ storePath: context.storePath, jobId, - }); - entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync(logPath, { - jobId, limit: 5000, }); } catch { diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 5b3cc29232b..a0385e27342 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -190,7 +190,6 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: { isCronRuntimeAuthoritative: () => true, resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json", loadCronStoreSync: () => ({ version: 1, jobs: [] }), - resolveCronRunLogPath: ({ jobId }) => jobId, readCronRunLogEntriesSync: () => [], }); } @@ -2654,7 +2653,6 @@ describe("task-registry", () => { isCronRuntimeAuthoritative: () => true, resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json", loadCronStoreSync: () => ({ version: 1, jobs: [] }), - resolveCronRunLogPath: ({ jobId }) => jobId, readCronRunLogEntriesSync: () => [], });