From 1fae716a04e54754e9cb13a7d0c9616a92bfe1e1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 26 Apr 2026 07:20:44 +0100 Subject: [PATCH] fix: recover stale cron task records --- CHANGELOG.md | 1 + docs/automation/cron-jobs.md | 2 +- docs/automation/tasks.md | 20 +- docs/cli/tasks.md | 4 + src/commands/status.summary.test.ts | 1 + src/commands/status.summary.ts | 4 + src/commands/tasks.ts | 13 +- src/cron/run-log.test.ts | 31 ++ src/cron/run-log.ts | 18 ++ src/cron/store.test.ts | 15 +- src/cron/store.ts | 87 ++++++ src/gateway/server-startup-early.ts | 10 +- ...k-registry.maintenance.issue-60299.test.ts | 136 +++++++++ src/tasks/task-registry.maintenance.ts | 270 +++++++++++++++++- src/tasks/task-registry.test.ts | 12 + 15 files changed, 609 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 409d43213ec..d5496b79e0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ Docs: https://docs.openclaw.ai - Installer: load nvm before Node.js detection so `curl | bash` installs respect nvm-managed Node instead of stale system Node. Fixes #49556. Thanks @heavenlxj. - CLI/Volta: respawn raw `openclaw` CLI runs through the named `node` shim when the current Node executable resolves to `volta-shim`, avoiding direct shim execution failures in non-interactive shells. Fixes #68672. Thanks @sanchezm86. - Installer: warn when multiple npm global roots contain OpenClaw installs, showing active Node/npm/openclaw plus each install path and version so stale version-manager installs are visible. Fixes #40839. Thanks @zhixianio. +- Cron/tasks: recover completed cron task ledger records from durable run logs and job state before marking them `lost`, reducing false `backing session missing` audit errors for isolated cron runs and keeping offline CLI audit from treating its empty local cron active-job set as authoritative. Fixes #71963. 
- Docker: copy patched dependency files into runtime images so downstream `pnpm install` layers keep working. Fixes #69224. Thanks @gucasbrg. - Agents/runtime: submit heartbeat, cron, and exec wakeups as transient runtime context instead of visible user prompts, keeping synthetic system work out of chat transcripts. Fixes #66496 and #66814. Thanks @jeades and @mandomaker. - Telegram: include native quote excerpts automatically for threaded replies and reply tags when the original Telegram text is available, without adding another config knob. Fixes #6975. Thanks @rex05ai. diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 289ebd1105c..c8f072aa42c 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -51,7 +51,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t -Task reconciliation for cron is runtime-owned: an active cron task stays live while the cron runtime still tracks that job as running, even if an old child session row still exists. Once the runtime stops owning the job and the 5-minute grace window expires, maintenance can mark the task `lost`. +Task reconciliation for cron is runtime-owned first, durable-history-backed second: an active cron task stays live while the cron runtime still tracks that job as running, even if an old child session row still exists. Once the runtime stops owning the job and the 5-minute grace window expires, maintenance checks persisted run logs and job state for the matching `cron:<jobId>:<startedAt>` run. If that durable history shows a terminal result, the task ledger is finalized from it; otherwise Gateway-owned maintenance can mark the task `lost`. Offline CLI audit can recover from durable history, but it does not treat its own empty in-process active-job set as proof that a Gateway-owned cron run is gone. 
## Schedule types diff --git a/docs/automation/tasks.md b/docs/automation/tasks.md index 3c226f6b190..f97cd4a2196 100644 --- a/docs/automation/tasks.md +++ b/docs/automation/tasks.md @@ -25,8 +25,12 @@ Not every agent run creates a task. Heartbeat turns and normal interactive chat - Tasks are **records**, not schedulers — cron and heartbeat decide _when_ work runs, tasks track _what happened_. - ACP, subagents, all cron jobs, and CLI operations create tasks. Heartbeat turns do not. - Each task moves through `queued → running → terminal` (succeeded, failed, timed_out, cancelled, or lost). -- Cron tasks stay live while the cron runtime still owns the job; chat-backed CLI tasks stay live only while their owning run context is still active. -- Completion is push-driven: detached work can notify directly or wake the requester session/heartbeat when it finishes, so status polling loops are usually the wrong shape. +- Cron tasks stay live while the cron runtime still owns the job; if the + in-memory runtime state is gone, task maintenance first checks durable cron + run history before marking a task lost. +- Completion is push-driven: detached work can notify directly or wake the + requester session/heartbeat when it finishes, so status polling loops are + usually the wrong shape. - Isolated cron runs and subagent completions best-effort clean up tracked browser tabs/processes for their child session before final cleanup bookkeeping. - Isolated cron delivery suppresses stale interim parent replies while descendant subagent work is still draining, and it prefers final descendant output when that arrives before delivery. - Completion notifications are delivered directly to a channel or queued for the next heartbeat. @@ -143,8 +147,14 @@ Agent run completion is authoritative for active task records. A successful deta - ACP tasks: backing ACP child session metadata disappeared. - Subagent tasks: backing child session disappeared from the target agent store. 
-- Cron tasks: the cron runtime no longer tracks the job as active. -- CLI tasks: isolated child-session tasks use the child session; chat-backed CLI tasks use the live run context instead, so lingering channel/group/direct session rows do not keep them alive. Gateway-backed `openclaw agent` runs also finalize from their run result, so completed runs do not sit active until the sweeper marks them `lost`. +- Cron tasks: the cron runtime no longer tracks the job as active and durable + cron run history does not show a terminal result for that run. Offline CLI + audit does not treat its own empty in-process cron runtime state as authority. +- CLI tasks: isolated child-session tasks use the child session; chat-backed + CLI tasks use the live run context instead, so lingering + channel/group/direct session rows do not keep them alive. Gateway-backed + `openclaw agent` runs also finalize from their run result, so completed runs + do not sit active until the sweeper marks them `lost`. ## Delivery and notifications @@ -236,7 +246,7 @@ openclaw tasks notify state_changes Reconciliation is runtime-aware: - ACP/subagent tasks check their backing child session. - - Cron tasks check whether the cron runtime still owns the job. + - Cron tasks check whether the cron runtime still owns the job, then recover terminal status from persisted cron run logs/job state before falling back to `lost`. Only the Gateway process is authoritative for the in-memory cron active-job set; offline CLI audit uses durable history but does not mark a cron task lost solely because that local Set is empty. - Chat-backed CLI tasks check the owning live run context, not just the chat session row. 
Completion cleanup is also runtime-aware: diff --git a/docs/cli/tasks.md b/docs/cli/tasks.md index 58ea07f9730..ca49545a557 100644 --- a/docs/cli/tasks.md +++ b/docs/cli/tasks.md @@ -84,6 +84,10 @@ openclaw tasks maintenance [--apply] [--json] ``` Previews or applies task and Task Flow reconciliation, cleanup stamping, and pruning. +For cron tasks, reconciliation uses persisted run logs/job state before marking an +old active task `lost`, so completed cron runs do not become false audit errors +just because the in-memory Gateway runtime state is gone. Offline CLI audit is +not authoritative for the Gateway's process-local cron active-job set. ### `flow` diff --git a/src/commands/status.summary.test.ts b/src/commands/status.summary.test.ts index c7b48a1b5b3..ed09d643fbf 100644 --- a/src/commands/status.summary.test.ts +++ b/src/commands/status.summary.test.ts @@ -58,6 +58,7 @@ vi.mock("../infra/system-events.js", () => ({ })); vi.mock("../tasks/task-registry.maintenance.js", () => ({ + configureTaskRegistryMaintenance: vi.fn(), getInspectableTaskRegistrySummary: vi.fn(() => ({ total: 0, active: 0, diff --git a/src/commands/status.summary.ts b/src/commands/status.summary.ts index e0f46ba6187..dcfb48a3468 100644 --- a/src/commands/status.summary.ts +++ b/src/commands/status.summary.ts @@ -4,6 +4,7 @@ import { resolveStorePath } from "../config/sessions/paths.js"; import { readSessionStoreReadOnly } from "../config/sessions/store-read.js"; import { resolveSessionTotalTokens, type SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.js"; +import { resolveCronStorePath } from "../cron/store.js"; import { listGatewayAgentsBasic } from "../gateway/agent-list.js"; import { resolveHeartbeatSummaryForAgent } from "../infra/heartbeat-summary.js"; import { peekSystemEvents } from "../infra/system-events.js"; @@ -151,6 +152,9 @@ export async function getStatusSummary( const mainSessionKey = resolveMainSessionKey(cfg); const 
queuedSystemEvents = peekSystemEvents(mainSessionKey); const taskMaintenanceModule = await loadTaskRegistryMaintenanceModule(); + taskMaintenanceModule.configureTaskRegistryMaintenance({ + cronStorePath: resolveCronStorePath(cfg.cron?.store), + }); const tasks = taskMaintenanceModule.getInspectableTaskRegistrySummary(); const taskAudit = taskMaintenanceModule.getInspectableTaskAuditSummary(); diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index 65dc46778d7..b8e4be919ed 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -1,3 +1,5 @@ +import { loadConfig } from "../config/config.js"; +import { resolveCronStorePath } from "../cron/store.js"; import type { RuntimeEnv } from "../runtime.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js"; @@ -24,6 +26,7 @@ import { compareTaskAuditFindingSortKeys } from "../tasks/task-registry.audit.sh import { getInspectableTaskAuditSummary, getInspectableTaskRegistrySummary, + configureTaskRegistryMaintenance, previewTaskRegistryMaintenance, runTaskRegistryMaintenance, } from "../tasks/task-registry.maintenance.js"; @@ -44,10 +47,16 @@ const RUN_PAD = 10; const info = theme.info; async function loadTaskCancelConfig() { - const { loadConfig } = await import("../config/config.js"); return loadConfig(); } +function configureTaskMaintenanceFromConfig(): void { + const cfg = loadConfig(); + configureTaskRegistryMaintenance({ + cronStorePath: resolveCronStorePath(cfg.cron?.store), + }); +} + function truncate(value: string, maxChars: number) { if (value.length <= maxChars) { return value; @@ -417,6 +426,7 @@ export async function tasksAuditCommand( }, runtime: RuntimeEnv, ) { + configureTaskMaintenanceFromConfig(); const severityFilter = opts.severity?.trim() as TaskSystemAuditSeverity | undefined; const codeFilter = opts.code?.trim() as TaskSystemAuditCode | undefined; const { allFindings, 
filteredFindings, taskFindings, summary } = toSystemAuditFindings({ @@ -491,6 +501,7 @@ export async function tasksMaintenanceCommand( opts: { json?: boolean; apply?: boolean }, runtime: RuntimeEnv, ) { + configureTaskMaintenanceFromConfig(); const auditBefore = getInspectableTaskAuditSummary(); const flowAuditBefore = getInspectableTaskFlowAuditSummary(); const taskMaintenance = opts.apply diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts index 6fb6505e9d8..cc0758f0242 100644 --- a/src/cron/run-log.test.ts +++ b/src/cron/run-log.test.ts @@ -9,6 +9,7 @@ import { getPendingCronRunLogWriteCountForTests, readCronRunLogEntries, readCronRunLogEntriesPage, + readCronRunLogEntriesSync, resolveCronRunLogPruneOptions, resolveCronRunLogPath, } from "./run-log.js"; @@ -96,6 +97,36 @@ describe("cron run log", () => { }); }); + it("reads run-log entries synchronously for task reconciliation", async () => { + await withRunLogDir("openclaw-cron-log-sync-", async (dir) => { + const logPath = path.join(dir, "runs", "job-1.jsonl"); + await appendCronRunLog(logPath, { + ts: 1000, + jobId: "job-1", + action: "finished", + status: "ok", + runAtMs: 900, + durationMs: 100, + }); + await appendCronRunLog(logPath, { + ts: 2000, + jobId: "job-2", + action: "finished", + status: "error", + }); + + expect(readCronRunLogEntriesSync(logPath, { jobId: "job-1" })).toEqual([ + expect.objectContaining({ + jobId: "job-1", + status: "ok", + runAtMs: 900, + durationMs: 100, + }), + ]); + expect(readCronRunLogEntriesSync(path.join(dir, "runs", "missing.jsonl"))).toEqual([]); + }); + }); + it.skipIf(process.platform === "win32")( "writes run log files with secure permissions", async () => { diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index ea45719659f..95ff8864e37 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -1,4 +1,5 @@ import { randomBytes } from "node:crypto"; +import fsSync from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; 
import { parseByteSize } from "../cli/parse-bytes.js"; @@ -198,6 +199,23 @@ export async function readCronRunLogEntries( return page.entries.toReversed(); } +export function readCronRunLogEntriesSync( + filePath: string, + opts?: { limit?: number; jobId?: string }, +): CronRunLogEntry[] { + const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200))); + let raw: string; + try { + raw = fsSync.readFileSync(path.resolve(filePath), "utf-8"); + } catch (error) { + if (typeof error === "object" && error !== null && "code" in error && error.code === "ENOENT") { + return []; + } + throw error; + } + return parseAllRunLogEntries(raw, { jobId: opts?.jobId }).slice(-limit); +} + function normalizeRunStatusFilter(status?: string): CronRunLogStatusFilter { if (status === "ok" || status === "error" || status === "skipped" || status === "all") { return status; diff --git a/src/cron/store.test.ts b/src/cron/store.test.ts index 4e7724d0291..82172eeb148 100644 --- a/src/cron/store.test.ts +++ b/src/cron/store.test.ts @@ -3,7 +3,7 @@ import os from "node:os"; import path from "node:path"; import { setTimeout as scheduleNativeTimeout } from "node:timers"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { loadCronStore, resolveCronStorePath, saveCronStore } from "./store.js"; +import { loadCronStore, loadCronStoreSync, resolveCronStorePath, saveCronStore } from "./store.js"; import type { CronStoreFile } from "./types.js"; let fixtureRoot = ""; @@ -125,6 +125,19 @@ describe("cron store", () => { }); }); + it("loads split cron state synchronously for task reconciliation", async () => { + const { storePath } = await makeStorePath(); + await saveCronStore(storePath, makeStore("job-sync", true)); + + const loaded = loadCronStoreSync(storePath); + + expect(loaded.jobs[0]).toMatchObject({ + id: "job-sync", + state: expect.any(Object), + updatedAtMs: expect.any(Number), + }); + }); + it("does not create a backup file when 
saving unchanged content", async () => { const store = await makeStorePath(); const payload = makeStore("job-1", true); diff --git a/src/cron/store.ts b/src/cron/store.ts index 847b09a8d73..83c1729d449 100644 --- a/src/cron/store.ts +++ b/src/cron/store.ts @@ -114,6 +114,39 @@ async function loadStateFile(statePath: string): Promise { } } +function loadStateFileSync(statePath: string): CronStateFile | null { + let raw: string; + try { + raw = fs.readFileSync(statePath, "utf-8"); + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return null; + } + throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, { + cause: err, + }); + } + + try { + const parsed = parseJsonWithJson5Fallback(raw); + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return null; + } + const record = parsed as Record; + if ( + record.version !== 1 || + typeof record.jobs !== "object" || + record.jobs === null || + Array.isArray(record.jobs) + ) { + return null; + } + return { version: 1, jobs: record.jobs as Record }; + } catch { + return null; + } +} + function hasInlineState(jobs: Array | null | undefined>): boolean { return jobs.some( (job) => @@ -219,6 +252,60 @@ export async function loadCronStore(storePath: string): Promise { } } +export function loadCronStoreSync(storePath: string): CronStoreFile { + try { + const raw = fs.readFileSync(storePath, "utf-8"); + let parsed: unknown; + try { + parsed = parseJsonWithJson5Fallback(raw); + } catch (err) { + throw new Error(`Failed to parse cron store at ${storePath}: ${String(err)}`, { + cause: err, + }); + } + const parsedRecord = + parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? (parsed as Record) + : {}; + const jobs = Array.isArray(parsedRecord.jobs) ? 
(parsedRecord.jobs as never[]) : []; + const store = { + version: 1 as const, + jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"], + }; + + const stateFile = loadStateFileSync(resolveStatePath(storePath)); + const hasLegacyInlineState = + !stateFile && hasInlineState(jobs as unknown as Array>); + + if (stateFile) { + for (const job of store.jobs) { + const entry = stateFile.jobs[job.id]; + if (entry) { + job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs); + job.state = (entry.state ?? {}) as never; + } else { + backfillMissingRuntimeFields(job); + } + } + } else if (!hasLegacyInlineState) { + for (const job of store.jobs) { + backfillMissingRuntimeFields(job); + } + } + + for (const job of store.jobs) { + ensureJobStateObject(job); + } + + return store; + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return { version: 1, jobs: [] }; + } + throw err; + } +} + type SaveCronStoreOptions = { skipBackup?: boolean; }; diff --git a/src/gateway/server-startup-early.ts b/src/gateway/server-startup-early.ts index a00ba32bdbd..0d9cb54ef88 100644 --- a/src/gateway/server-startup-early.ts +++ b/src/gateway/server-startup-early.ts @@ -1,6 +1,7 @@ import { registerSkillsChangeListener } from "../agents/skills/refresh.js"; import type { GatewayTailscaleMode } from "../config/types.gateway.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { resolveCronStorePath } from "../cron/store.js"; import { getMachineDisplayName } from "../infra/machine-name.js"; import { primeRemoteSkillsCache, @@ -8,7 +9,10 @@ import { setSkillsRemoteRegistry, } from "../infra/skills-remote.js"; import type { PluginRegistry } from "../plugins/registry-types.js"; -import { startTaskRegistryMaintenance } from "../tasks/task-registry.maintenance.js"; +import { + configureTaskRegistryMaintenance, + startTaskRegistryMaintenance, +} from "../tasks/task-registry.maintenance.js"; import { startGatewayDiscovery } from 
"./server-discovery-runtime.js"; import { startGatewayMaintenanceTimers } from "./server-maintenance.js"; @@ -77,6 +81,10 @@ export async function startGatewayEarlyRuntime(params: { if (!params.minimalTestGateway) { setSkillsRemoteRegistry(params.nodeRegistry); void primeRemoteSkillsCache(); + configureTaskRegistryMaintenance({ + cronStorePath: resolveCronStorePath(params.cfgAtStart.cron?.store), + cronRuntimeAuthoritative: true, + }); startTaskRegistryMaintenance(); } diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index 2b9adbb5295..8a266dea1b7 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -1,6 +1,8 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js"; import type { SessionEntry } from "../config/sessions.js"; +import type { CronRunLogEntry } from "../cron/run-log.js"; +import type { CronStoreFile } from "../cron/types.js"; import type { ParsedAgentSessionKey } from "../routing/session-key.js"; import { resetDetachedTaskLifecycleRuntimeForTests, @@ -9,6 +11,7 @@ import { } from "./detached-task-runtime.js"; import { previewTaskRegistryMaintenance, + reconcileInspectableTasks, resetTaskRegistryMaintenanceRuntimeForTests, runTaskRegistryMaintenance, setTaskRegistryMaintenanceRuntimeForTests, @@ -53,11 +56,15 @@ function createTaskRegistryMaintenanceHarness(params: { acpEntry?: AcpSessionStoreEntry["entry"]; activeCronJobIds?: string[]; activeRunIds?: string[]; + cronStore?: CronStoreFile; + cronRunLogEntries?: Record; + cronRuntimeAuthoritative?: boolean; }) { const sessionStore = params.sessionStore ?? {}; const acpEntry = params.acpEntry; const activeCronJobIds = new Set(params.activeCronJobIds ?? []); const activeRunIds = new Set(params.activeRunIds ?? []); + const cronRunLogEntries = params.cronRunLogEntries ?? 
{}; const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }])); const runtime: TaskRegistryMaintenanceRuntime = { @@ -113,6 +120,24 @@ function createTaskRegistryMaintenanceHarness(params: { currentTasks.set(patch.taskId, next); return next; }, + markTaskTerminalById: (patch) => { + const current = currentTasks.get(patch.taskId); + if (!current) { + return null; + } + const next = { + ...current, + status: patch.status, + endedAt: patch.endedAt, + lastEventAt: patch.lastEventAt ?? patch.endedAt, + ...(patch.error !== undefined ? { error: patch.error } : {}), + ...(patch.terminalSummary !== undefined + ? { terminalSummary: patch.terminalSummary ?? undefined } + : {}), + } satisfies TaskRecord; + currentTasks.set(patch.taskId, next); + return next; + }, maybeDeliverTaskTerminalUpdate: async () => null, resolveTaskForLookupToken: () => undefined, setTaskCleanupAfterById: (patch) => { @@ -124,6 +149,11 @@ function createTaskRegistryMaintenanceHarness(params: { currentTasks.set(patch.taskId, next); return next; }, + isCronRuntimeAuthoritative: () => params.cronRuntimeAuthoritative ?? true, + resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json", + loadCronStoreSync: () => params.cronStore ?? { version: 1, jobs: [] }, + resolveCronRunLogPath: ({ jobId }) => jobId, + readCronRunLogEntriesSync: (jobId) => cronRunLogEntries[jobId] ?? 
[], }; setTaskRegistryMaintenanceRuntimeForTests(runtime); @@ -164,6 +194,112 @@ describe("task-registry maintenance issue #60299", () => { expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); }); + it("does not mark cron tasks lost when the current process is not the cron runtime authority", async () => { + const task = makeStaleTask({ + runtime: "cron", + sourceId: "cron-job-offline-audit", + childSessionKey: undefined, + }); + + const { currentTasks } = createTaskRegistryMaintenanceHarness({ + tasks: [task], + cronRuntimeAuthoritative: false, + }); + + expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 }); + expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); + }); + + it("recovers finished cron tasks from durable run logs before marking them lost", async () => { + const startedAt = Date.now() - GRACE_EXPIRED_MS; + const task = makeStaleTask({ + runtime: "cron", + sourceId: "cron-job-run-log-ok", + runId: `cron:cron-job-run-log-ok:${startedAt}`, + startedAt, + lastEventAt: startedAt, + }); + + const { currentTasks } = createTaskRegistryMaintenanceHarness({ + tasks: [task], + cronRunLogEntries: { + "cron-job-run-log-ok": [ + { + ts: startedAt + 1250, + jobId: "cron-job-run-log-ok", + action: "finished", + status: "ok", + summary: "done", + runAtMs: startedAt, + durationMs: 1250, + }, + ], + }, + }); + + expect(reconcileInspectableTasks()).toEqual([ + expect.objectContaining({ + taskId: task.taskId, + status: "succeeded", + endedAt: startedAt + 1250, + terminalSummary: "done", + }), + ]); + expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 0, recovered: 1 }); + expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0, recovered: 1 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ + status: "succeeded", + endedAt: startedAt + 1250, + terminalSummary: "done", + }); + }); + + 
it("recovers interrupted cron tasks from durable cron job state when run logs are absent", async () => { + const startedAt = Date.now() - GRACE_EXPIRED_MS; + const task = makeStaleTask({ + runtime: "cron", + sourceId: "cron-job-state-error", + runId: `cron:cron-job-state-error:${startedAt}`, + startedAt, + lastEventAt: startedAt, + }); + + const { currentTasks } = createTaskRegistryMaintenanceHarness({ + tasks: [task], + cronStore: { + version: 1, + jobs: [ + { + id: "cron-job-state-error", + name: "state error", + enabled: true, + createdAtMs: startedAt - 60_000, + updatedAtMs: startedAt, + schedule: { kind: "every", everyMs: 60_000, anchorMs: startedAt - 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "work" }, + state: { + lastRunAtMs: startedAt, + lastRunStatus: "error", + lastError: "cron: job interrupted by gateway restart", + lastDurationMs: 5000, + }, + }, + ], + }, + }); + + expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 0, recovered: 1 }); + expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0, recovered: 1 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ + status: "failed", + endedAt: startedAt + 5000, + error: "cron: job interrupted by gateway restart", + }); + }); + it("marks chat-backed cli tasks lost after the owning run context disappears", async () => { const channelKey = "agent:main:workspace:channel:C1234567890"; const task = makeStaleTask({ diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index eb1257f4ceb..fc60cf988df 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -1,6 +1,10 @@ import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import { isCronJobActive } from "../cron/active-jobs.js"; +import { readCronRunLogEntriesSync, resolveCronRunLogPath 
} from "../cron/run-log.js"; +import type { CronRunLogEntry } from "../cron/run-log.js"; +import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js"; +import type { CronJob, CronStoreFile } from "../cron/types.js"; import { getAgentRunContext } from "../infra/agent-events.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { deriveSessionChatType } from "../sessions/session-chat-type.js"; @@ -12,6 +16,7 @@ import { getTaskById, listTaskRecords, markTaskLostById, + markTaskTerminalById, maybeDeliverTaskTerminalUpdate, resolveTaskForLookupToken, setTaskCleanupAfterById, @@ -23,7 +28,7 @@ import { } from "./task-registry.audit.js"; import type { TaskAuditSummary } from "./task-registry.audit.js"; import { summarizeTaskRecords } from "./task-registry.summary.js"; -import type { TaskRecord, TaskRegistrySummary } from "./task-registry.types.js"; +import type { TaskRecord, TaskRegistrySummary, TaskStatus } from "./task-registry.types.js"; const TASK_RECONCILE_GRACE_MS = 5 * 60_000; const TASK_RETENTION_MS = 7 * 24 * 60 * 60_000; @@ -38,6 +43,8 @@ const SWEEP_YIELD_BATCH_SIZE = 25; let sweeper: NodeJS.Timeout | null = null; let deferredSweep: NodeJS.Timeout | null = null; let sweepInProgress = false; +let configuredCronStorePath: string | undefined; +let configuredCronRuntimeAuthoritative = false; type TaskRegistryMaintenanceRuntime = { readAcpSessionEntry: typeof readAcpSessionEntry; @@ -51,9 +58,15 @@ type TaskRegistryMaintenanceRuntime = { getTaskById: typeof getTaskById; listTaskRecords: typeof listTaskRecords; markTaskLostById: typeof markTaskLostById; + markTaskTerminalById: typeof markTaskTerminalById; maybeDeliverTaskTerminalUpdate: typeof maybeDeliverTaskTerminalUpdate; resolveTaskForLookupToken: typeof resolveTaskForLookupToken; setTaskCleanupAfterById: typeof setTaskCleanupAfterById; + isCronRuntimeAuthoritative: () => boolean; + resolveCronStorePath: typeof resolveCronStorePath; + loadCronStoreSync: typeof 
loadCronStoreSync; + resolveCronRunLogPath: typeof resolveCronRunLogPath; + readCronRunLogEntriesSync: typeof readCronRunLogEntriesSync; }; const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = { @@ -68,9 +81,15 @@ const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = { getTaskById, listTaskRecords, markTaskLostById, + markTaskTerminalById, maybeDeliverTaskTerminalUpdate, resolveTaskForLookupToken, setTaskCleanupAfterById, + isCronRuntimeAuthoritative: () => configuredCronRuntimeAuthoritative, + resolveCronStorePath: () => configuredCronStorePath ?? resolveCronStorePath(), + loadCronStoreSync, + resolveCronRunLogPath, + readCronRunLogEntriesSync, }; let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = @@ -83,6 +102,32 @@ export type TaskRegistryMaintenanceSummary = { pruned: number; }; +type CronExecutionId = { + jobId: string; + startedAt: number; +}; + +type CronTerminalRecovery = { + status: Extract; + endedAt: number; + lastEventAt: number; + error?: string; + terminalSummary?: string; +}; + +type CronRecoveryContext = { + storePath: string; + store?: CronStoreFile | null; + runLogsByJobId: Map; +}; + +function createCronRecoveryContext(): CronRecoveryContext { + return { + storePath: taskRegistryMaintenanceRuntime.resolveCronStorePath(), + runLogsByJobId: new Map(), + }; +} + function findSessionEntryByKey(store: Record, sessionKey: string): unknown { const direct = store[sessionKey]; if (direct) { @@ -110,6 +155,142 @@ function hasLostGraceExpired(task: TaskRecord, now: number): boolean { return now - referenceAt >= TASK_RECONCILE_GRACE_MS; } +function parseCronExecutionId(task: TaskRecord): CronExecutionId | undefined { + const runId = task.runId?.trim(); + if (!runId?.startsWith("cron:")) { + return undefined; + } + const separator = runId.lastIndexOf(":"); + if (separator <= "cron:".length) { + return undefined; + } + const startedAt = Number(runId.slice(separator + 1)); + if 
(!Number.isFinite(startedAt)) { + return undefined; + } + const jobId = runId.slice("cron:".length, separator).trim(); + if (!jobId || (task.sourceId?.trim() && task.sourceId.trim() !== jobId)) { + return undefined; + } + return { jobId, startedAt }; +} + +function isTimeoutCronError(error: string | undefined): boolean { + return error === "cron: job execution timed out"; +} + +function mapCronTerminalStatus(status: unknown, error?: string): CronTerminalRecovery["status"] { + if (status === "ok" || status === "skipped") { + return "succeeded"; + } + return isTimeoutCronError(error) ? "timed_out" : "failed"; +} + +function getCronRunLogEntries(context: CronRecoveryContext, jobId: string): CronRunLogEntry[] { + const cached = context.runLogsByJobId.get(jobId); + if (cached) { + return cached; + } + let entries: CronRunLogEntry[] = []; + try { + const logPath = taskRegistryMaintenanceRuntime.resolveCronRunLogPath({ + storePath: context.storePath, + jobId, + }); + entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync(logPath, { + jobId, + limit: 5000, + }); + } catch { + entries = []; + } + context.runLogsByJobId.set(jobId, entries); + return entries; +} + +function getCronStore(context: CronRecoveryContext): CronStoreFile | null { + if (context.store !== undefined) { + return context.store; + } + try { + context.store = taskRegistryMaintenanceRuntime.loadCronStoreSync(context.storePath); + } catch { + context.store = null; + } + return context.store; +} + +function resolveCronRunLogRecovery( + execution: CronExecutionId, + context: CronRecoveryContext, +): CronTerminalRecovery | undefined { + const entries = getCronRunLogEntries(context, execution.jobId); + const entry = entries.findLast( + (candidate) => + candidate.jobId === execution.jobId && + candidate.action === "finished" && + candidate.runAtMs === execution.startedAt && + (candidate.status === "ok" || candidate.status === "skipped" || candidate.status === "error"), + ); + if (!entry) { + return 
undefined; + } + const durationMs = + typeof entry.durationMs === "number" && Number.isFinite(entry.durationMs) + ? Math.max(0, entry.durationMs) + : undefined; + const endedAt = durationMs === undefined ? entry.ts : execution.startedAt + durationMs; + return { + status: mapCronTerminalStatus(entry.status, entry.error), + endedAt, + lastEventAt: endedAt, + ...(entry.error !== undefined ? { error: entry.error } : {}), + ...(entry.summary !== undefined ? { terminalSummary: entry.summary } : {}), + }; +} + +function resolveCronJobStateRecovery( + execution: CronExecutionId, + context: CronRecoveryContext, +): CronTerminalRecovery | undefined { + const store = getCronStore(context); + const job: CronJob | undefined = store?.jobs.find((entry) => entry.id === execution.jobId); + if (!job || job.state.lastRunAtMs !== execution.startedAt) { + return undefined; + } + const status = job.state.lastRunStatus ?? job.state.lastStatus; + if (status !== "ok" && status !== "skipped" && status !== "error") { + return undefined; + } + const durationMs = + typeof job.state.lastDurationMs === "number" && Number.isFinite(job.state.lastDurationMs) + ? Math.max(0, job.state.lastDurationMs) + : 0; + const endedAt = execution.startedAt + durationMs; + return { + status: mapCronTerminalStatus(status, job.state.lastError), + endedAt, + lastEventAt: endedAt, + ...(job.state.lastError !== undefined ? { error: job.state.lastError } : {}), + }; +} + +function resolveDurableCronTaskRecovery( + task: TaskRecord, + context: CronRecoveryContext, +): CronTerminalRecovery | undefined { + if (task.runtime !== "cron" || !isActiveTask(task)) { + return undefined; + } + const execution = parseCronExecutionId(task); + if (!execution) { + return undefined; + } + return ( + resolveCronRunLogRecovery(execution, context) ?? 
resolveCronJobStateRecovery(execution, context) + ); +} + function hasActiveCliRun(task: TaskRecord): boolean { const candidateRunIds = [task.sourceId, task.runId]; for (const candidate of candidateRunIds) { @@ -123,6 +304,9 @@ function hasActiveCliRun(task: TaskRecord): boolean { function hasBackingSession(task: TaskRecord): boolean { if (task.runtime === "cron") { + if (!taskRegistryMaintenanceRuntime.isCronRuntimeAuthoritative()) { + return true; + } const jobId = task.sourceId?.trim(); return jobId ? taskRegistryMaintenanceRuntime.isCronJobActive(jobId) : false; } @@ -204,6 +388,41 @@ function markTaskLost(task: TaskRecord, now: number): TaskRecord { return updated; } +function markTaskRecovered(task: TaskRecord, recovery: CronTerminalRecovery): TaskRecord { + const updated = + taskRegistryMaintenanceRuntime.markTaskTerminalById({ + taskId: task.taskId, + status: recovery.status, + endedAt: recovery.endedAt, + lastEventAt: recovery.lastEventAt, + ...(recovery.error !== undefined ? { error: recovery.error } : {}), + ...(recovery.terminalSummary !== undefined + ? { terminalSummary: recovery.terminalSummary } + : {}), + }) ?? projectTaskRecovered(task, recovery); + void taskRegistryMaintenanceRuntime.maybeDeliverTaskTerminalUpdate(updated.taskId); + return updated; +} + +function projectTaskRecovered(task: TaskRecord, recovery: CronTerminalRecovery): TaskRecord { + const projected: TaskRecord = { + ...task, + status: recovery.status, + endedAt: recovery.endedAt, + lastEventAt: recovery.lastEventAt, + ...(recovery.error !== undefined ? { error: recovery.error } : {}), + ...(recovery.terminalSummary !== undefined + ? { terminalSummary: recovery.terminalSummary } + : {}), + }; + return { + ...projected, + ...(typeof projected.cleanupAfter === "number" + ? 
{} + : { cleanupAfter: resolveCleanupAfter(projected) }), + }; +} + function projectTaskLost(task: TaskRecord, now: number): TaskRecord { const projected: TaskRecord = { ...task, @@ -220,7 +439,14 @@ function projectTaskLost(task: TaskRecord, now: number): TaskRecord { }; } -export function reconcileTaskRecordForOperatorInspection(task: TaskRecord): TaskRecord { +export function reconcileTaskRecordForOperatorInspection( + task: TaskRecord, + context: CronRecoveryContext = createCronRecoveryContext(), +): TaskRecord { + const cronRecovery = resolveDurableCronTaskRecovery(task, context); + if (cronRecovery) { + return projectTaskRecovered(task, cronRecovery); + } const now = Date.now(); if (!shouldMarkLost(task, now)) { return task; @@ -230,9 +456,10 @@ export function reconcileTaskRecordForOperatorInspection(task: TaskRecord): Task export function reconcileInspectableTasks(): TaskRecord[] { taskRegistryMaintenanceRuntime.ensureTaskRegistryReady(); + const cronRecoveryContext = createCronRecoveryContext(); return taskRegistryMaintenanceRuntime .listTaskRecords() - .map((task) => reconcileTaskRecordForOperatorInspection(task)); + .map((task) => reconcileTaskRecordForOperatorInspection(task, cronRecoveryContext)); } configureTaskAuditTaskProvider(reconcileInspectableTasks); @@ -253,15 +480,21 @@ export function reconcileTaskLookupToken(token: string): TaskRecord | undefined } // Preview is synchronous and cannot call the async detached-task recovery hook, -// so recovered tasks are counted under reconciled here. The real sweep -// in runTaskRegistryMaintenance splits them into reconciled vs recovered. +// so hook-recovered tasks are counted under reconciled here. Durable cron +// recovery is synchronous and can be previewed exactly. 
export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary { taskRegistryMaintenanceRuntime.ensureTaskRegistryReady(); const now = Date.now(); let reconciled = 0; + let recovered = 0; let cleanupStamped = 0; let pruned = 0; + const cronRecoveryContext = createCronRecoveryContext(); for (const task of taskRegistryMaintenanceRuntime.listTaskRecords()) { + if (resolveDurableCronTaskRecovery(task, cronRecoveryContext)) { + recovered += 1; + continue; + } if (shouldMarkLost(task, now)) { reconciled += 1; continue; @@ -274,7 +507,7 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary cleanupStamped += 1; } } - return { reconciled, recovered: 0, cleanupStamped, pruned }; + return { reconciled, recovered, cleanupStamped, pruned }; } /** @@ -305,12 +538,25 @@ export async function runTaskRegistryMaintenance(): Promise null, maybeDeliverTaskTerminalUpdate: async () => null, resolveTaskForLookupToken: () => undefined, setTaskCleanupAfterById: (patch: { taskId: string; cleanupAfter: number }) => { @@ -143,6 +144,11 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: { params.currentTasks.set(patch.taskId, next); return next; }, + isCronRuntimeAuthoritative: () => true, + resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json", + loadCronStoreSync: () => ({ version: 1, jobs: [] }), + resolveCronRunLogPath: ({ jobId }) => jobId, + readCronRunLogEntriesSync: () => [], }); } @@ -1625,9 +1631,15 @@ describe("task-registry", () => { throw new Error("maintenance boom"); }, markTaskLostById: () => null, + markTaskTerminalById: () => null, maybeDeliverTaskTerminalUpdate: async () => null, resolveTaskForLookupToken: () => undefined, setTaskCleanupAfterById: () => null, + isCronRuntimeAuthoritative: () => true, + resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json", + loadCronStoreSync: () => ({ version: 1, jobs: [] }), + resolveCronRunLogPath: ({ jobId }) => jobId, + readCronRunLogEntriesSync: () => 
[], }); try {