diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index 84661ecc76b..86fb77eea58 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -7,6 +7,7 @@ const healthCommand = vi.fn(); const sessionsCommand = vi.fn(); const sessionsCleanupCommand = vi.fn(); const tasksListCommand = vi.fn(); +const tasksAuditCommand = vi.fn(); const tasksShowCommand = vi.fn(); const tasksNotifyCommand = vi.fn(); const tasksCancelCommand = vi.fn(); @@ -32,6 +33,7 @@ vi.mock("../../commands/sessions-cleanup.js", () => ({ vi.mock("../../commands/tasks.js", () => ({ tasksListCommand, + tasksAuditCommand, tasksShowCommand, tasksNotifyCommand, tasksCancelCommand, @@ -67,6 +69,7 @@ describe("registerStatusHealthSessionsCommands", () => { sessionsCommand.mockResolvedValue(undefined); sessionsCleanupCommand.mockResolvedValue(undefined); tasksListCommand.mockResolvedValue(undefined); + tasksAuditCommand.mockResolvedValue(undefined); tasksShowCommand.mockResolvedValue(undefined); tasksNotifyCommand.mockResolvedValue(undefined); tasksCancelCommand.mockResolvedValue(undefined); @@ -242,6 +245,30 @@ describe("registerStatusHealthSessionsCommands", () => { ); }); + it("runs tasks audit subcommand with filters", async () => { + await runCli([ + "tasks", + "--json", + "audit", + "--severity", + "error", + "--code", + "stale_running", + "--limit", + "5", + ]); + + expect(tasksAuditCommand).toHaveBeenCalledWith( + expect.objectContaining({ + json: true, + severity: "error", + code: "stale_running", + limit: 5, + }), + runtime, + ); + }); + it("runs tasks notify subcommand with lookup and policy forwarding", async () => { await runCli(["tasks", "notify", "run-123", "state_changes"]); diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index cbb3069591d..3cc9b8024a5 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -4,6 +4,7 @@ import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js"; import { sessionsCommand } from "../../commands/sessions.js"; import { statusCommand } from "../../commands/status.js"; import { + tasksAuditCommand, tasksCancelCommand, tasksListCommand, tasksNotifyCommand, @@ -272,6 +273,38 @@ export function registerStatusHealthSessionsCommands(program: Command) { }); }); + tasksCmd + .command("audit") + .description("Show stale or broken background task runs") + .option("--json", "Output as JSON", false) + .option("--severity ", "Filter by severity (warn, error)") + .option( + "--code ", + "Filter by finding code (stale_queued, stale_running, lost, delivery_failed, missing_cleanup, inconsistent_timestamps)", + ) + .option("--limit ", "Limit displayed findings") + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + await tasksAuditCommand( + { + json: Boolean(opts.json || parentOpts?.json), + severity: opts.severity as "warn" | "error" | undefined, + code: opts.code as + | "stale_queued" + | "stale_running" + | "lost" + | "delivery_failed" + | "missing_cleanup" + | "inconsistent_timestamps" + | undefined, + limit: parsePositiveIntOrUndefined(opts.limit), + }, + defaultRuntime, + ); + }); + }); + tasksCmd .command("show") .description("Show one background task by task id, run id, or session key") diff --git a/src/commands/tasks.test.ts b/src/commands/tasks.test.ts index fbaea63a1d0..6ff8fcac761 100644 --- a/src/commands/tasks.test.ts +++ b/src/commands/tasks.test.ts @@ -3,6 +3,8 @@ import { createCliRuntimeCapture } from "../cli/test-runtime-capture.js"; const reconcileInspectableTasksMock = vi.fn(); const reconcileTaskLookupTokenMock = vi.fn(); +const listTaskAuditFindingsMock = vi.fn(); +const summarizeTaskAuditFindingsMock = vi.fn(); const updateTaskNotifyPolicyByIdMock = vi.fn(); const cancelTaskByIdMock = vi.fn(); const getTaskByIdMock = vi.fn(); @@ -13,6 +15,11 @@ vi.mock("../tasks/task-registry.reconcile.js", () => ({ reconcileTaskLookupToken: (...args: unknown[]) => reconcileTaskLookupTokenMock(...args), })); +vi.mock("../tasks/task-registry.audit.js", () => ({ + listTaskAuditFindings: (...args: unknown[]) => listTaskAuditFindingsMock(...args), + summarizeTaskAuditFindings: (...args: unknown[]) => summarizeTaskAuditFindingsMock(...args), +})); + vi.mock("../tasks/task-registry.js", () => ({ updateTaskNotifyPolicyById: (...args: unknown[]) => updateTaskNotifyPolicyByIdMock(...args), cancelTaskById: (...args: unknown[]) => cancelTaskByIdMock(...args), @@ -34,6 +41,7 @@ let tasksListCommand: typeof import("./tasks.js").tasksListCommand; let tasksShowCommand: typeof import("./tasks.js").tasksShowCommand; let tasksNotifyCommand: typeof import("./tasks.js").tasksNotifyCommand; let tasksCancelCommand: typeof import("./tasks.js").tasksCancelCommand; +let tasksAuditCommand: typeof import("./tasks.js").tasksAuditCommand; const taskFixture = { taskId: "task-12345678", @@ -59,8 +67,13 @@ const taskFixture = { } as const; beforeAll(async () => { - ({ tasksListCommand, tasksShowCommand, tasksNotifyCommand, tasksCancelCommand } = - await import("./tasks.js")); + ({ + tasksListCommand, + tasksShowCommand, + tasksNotifyCommand, + tasksCancelCommand, + tasksAuditCommand, + } = await import("./tasks.js")); }); describe("tasks commands", () => { @@ -69,6 +82,20 @@ describe("tasks commands", () => { resetRuntimeCapture(); reconcileInspectableTasksMock.mockReturnValue([]); reconcileTaskLookupTokenMock.mockReturnValue(undefined); + listTaskAuditFindingsMock.mockReturnValue([]); + summarizeTaskAuditFindingsMock.mockReturnValue({ + total: 0, + warnings: 0, + errors: 0, + byCode: { + stale_queued: 0, + stale_running: 0, + lost: 0, + delivery_failed: 0, + missing_cleanup: 0, + inconsistent_timestamps: 0, + }, + }); updateTaskNotifyPolicyByIdMock.mockReturnValue(undefined); cancelTaskByIdMock.mockResolvedValue({ found: false, cancelled: false, reason: "missing" }); getTaskByIdMock.mockReturnValue(undefined); @@ -137,4 +164,50 @@ describe("tasks commands", () => { expect(runtimeLogs[0]).toContain("Cancelled task-12345678 (acp) run run-12345678."); expect(runtimeErrors).toEqual([]); }); + + it("shows task audit findings with filters", async () => { + const findings = [ + { + severity: "error", + code: "stale_running", + task: taskFixture, + ageMs: 45 * 60_000, + detail: "running task appears stuck", + }, + { + severity: "warn", + code: "delivery_failed", + task: { + ...taskFixture, + taskId: "task-87654321", + status: "failed", + }, + ageMs: 10 * 60_000, + detail: "terminal update delivery failed", + }, + ]; + listTaskAuditFindingsMock.mockReturnValue(findings); + summarizeTaskAuditFindingsMock.mockReturnValue({ + total: 2, + warnings: 1, + errors: 1, + byCode: { + stale_queued: 0, + stale_running: 1, + lost: 0, + delivery_failed: 1, + missing_cleanup: 0, + inconsistent_timestamps: 0, + }, + }); + + await tasksAuditCommand({ severity: "error", code: "stale_running", limit: 1 }, runtime); + + expect(summarizeTaskAuditFindingsMock).toHaveBeenCalledWith(findings); + expect(runtimeLogs[0]).toContain("Task audit: 2 findings · 1 errors · 1 warnings"); + expect(runtimeLogs[1]).toContain("Showing 1 matching findings."); + expect(runtimeLogs.join("\n")).toContain("stale_running"); + expect(runtimeLogs.join("\n")).toContain("running task appears stuck"); + expect(runtimeLogs.join("\n")).not.toContain("delivery_failed"); + }); }); diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index 69d93c5e540..daeaedd479c 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -1,6 +1,13 @@ import { loadConfig } from "../config/config.js"; import { info } from "../globals.js"; import type { RuntimeEnv } from "../runtime.js"; +import { + listTaskAuditFindings, + summarizeTaskAuditFindings, + type TaskAuditCode, + type TaskAuditFinding, + type TaskAuditSeverity, +} from "../tasks/task-registry.audit.js"; import { cancelTaskById, getTaskById, updateTaskNotifyPolicyById } from "../tasks/task-registry.js"; import { reconcileInspectableTasks, @@ -89,6 +96,60 @@ function formatTaskListSummary(tasks: TaskRecord[]) { return `${summary.byStatus.queued} queued · ${summary.byStatus.running} running · ${summary.failures} issues`; } +function formatAgeMs(ageMs: number | undefined): string { + if (typeof ageMs !== "number" || ageMs < 1000) { + return "fresh"; + } + const totalSeconds = Math.floor(ageMs / 1000); + const days = Math.floor(totalSeconds / 86_400); + const hours = Math.floor((totalSeconds % 86_400) / 3600); + const minutes = Math.floor((totalSeconds % 3600) / 60); + if (days > 0) { + return `${days}d${hours}h`; + } + if (hours > 0) { + return `${hours}h${minutes}m`; + } + if (minutes > 0) { + return `${minutes}m`; + } + return `${totalSeconds}s`; +} + +function formatAuditRows(findings: TaskAuditFinding[], rich: boolean) { + const header = [ + "Severity".padEnd(8), + "Code".padEnd(22), + "Task".padEnd(ID_PAD), + "Status".padEnd(STATUS_PAD), + "Age".padEnd(8), + "Detail", + ].join(" "); + const lines = [rich ? theme.heading(header) : header]; + for (const finding of findings) { + const severity = finding.severity.padEnd(8); + const status = formatTaskStatusCell(finding.task.status, rich); + const severityCell = !rich + ? severity + : finding.severity === "error" + ? theme.error(severity) + : theme.warn(severity); + lines.push( + [ + severityCell, + finding.code.padEnd(22), + shortToken(finding.task.taskId).padEnd(ID_PAD), + status, + formatAgeMs(finding.ageMs).padEnd(8), + truncate(finding.detail, 88), + ] + .join(" ") + .trimEnd(), + ); + } + return lines; +} + export async function tasksListCommand( opts: { json?: boolean; runtime?: string; status?: string }, runtime: RuntimeEnv, @@ -241,3 +302,77 @@ export async function tasksCancelCommand(opts: { lookup: string }, runtime: Runt `Cancelled ${updated?.taskId ?? task.taskId} (${updated?.runtime ?? task.runtime})${updated?.runId ? ` run ${updated.runId}` : ""}.`, ); } + +export async function tasksAuditCommand( + opts: { + json?: boolean; + severity?: TaskAuditSeverity; + code?: TaskAuditCode; + limit?: number; + }, + runtime: RuntimeEnv, +) { + const severityFilter = opts.severity?.trim() as TaskAuditSeverity | undefined; + const codeFilter = opts.code?.trim() as TaskAuditCode | undefined; + const allFindings = listTaskAuditFindings(); + const findings = allFindings.filter((finding) => { + if (severityFilter && finding.severity !== severityFilter) { + return false; + } + if (codeFilter && finding.code !== codeFilter) { + return false; + } + return true; + }); + const limit = typeof opts.limit === "number" && opts.limit > 0 ? opts.limit : undefined; + const displayed = limit ? findings.slice(0, limit) : findings; + const summary = summarizeTaskAuditFindings(allFindings); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + count: allFindings.length, + filteredCount: findings.length, + displayed: displayed.length, + filters: { + severity: severityFilter ?? null, + code: codeFilter ?? null, + limit: limit ?? null, + }, + summary, + findings: displayed, + }, + null, + 2, + ), + ); + return; + } + + runtime.log( + info( + `Task audit: ${summary.total} findings · ${summary.errors} errors · ${summary.warnings} warnings`, + ), + ); + if (severityFilter || codeFilter) { + runtime.log(info(`Showing ${findings.length} matching findings.`)); + } + if (severityFilter) { + runtime.log(info(`Severity filter: ${severityFilter}`)); + } + if (codeFilter) { + runtime.log(info(`Code filter: ${codeFilter}`)); + } + if (limit) { + runtime.log(info(`Limit: ${limit}`)); + } + if (displayed.length === 0) { + runtime.log("No task audit findings."); + return; + } + const rich = isRich(); + for (const line of formatAuditRows(displayed, rich)) { + runtime.log(line); + } +} diff --git a/src/infra/node-sqlite.ts b/src/infra/node-sqlite.ts new file mode 100644 index 00000000000..f433bd055f2 --- /dev/null +++ b/src/infra/node-sqlite.ts @@ -0,0 +1,17 @@ +import { createRequire } from "node:module"; +import { installProcessWarningFilter } from "./warning-filter.js"; + +const require = createRequire(import.meta.url); + +export function requireNodeSqlite(): typeof import("node:sqlite") { + installProcessWarningFilter(); + try { + return require("node:sqlite") as typeof import("node:sqlite"); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + throw new Error( + `SQLite support is unavailable in this Node runtime (missing node:sqlite). ${message}`, + { cause: err }, + ); + } +} diff --git a/src/tasks/task-registry.audit.test.ts b/src/tasks/task-registry.audit.test.ts new file mode 100644 index 00000000000..e2d8ddb41ad --- /dev/null +++ b/src/tasks/task-registry.audit.test.ts @@ -0,0 +1,100 @@ +import { describe, expect, it } from "vitest"; +import { listTaskAuditFindings, summarizeTaskAuditFindings } from "./task-registry.audit.js"; +import type { TaskRecord } from "./task-registry.types.js"; + +function createTask(partial: Partial): TaskRecord { + return { + taskId: partial.taskId ?? "task-1", + runtime: partial.runtime ?? "acp", + requesterSessionKey: partial.requesterSessionKey ?? "agent:main:main", + task: partial.task ?? "Background task", + status: partial.status ?? "queued", + deliveryStatus: partial.deliveryStatus ?? "pending", + notifyPolicy: partial.notifyPolicy ?? "done_only", + createdAt: partial.createdAt ?? Date.parse("2026-03-30T00:00:00.000Z"), + ...partial, + }; +} + +describe("task-registry audit", () => { + it("flags stale running, lost, and missing cleanup tasks", () => { + const now = Date.parse("2026-03-30T01:00:00.000Z"); + const findings = listTaskAuditFindings({ + now, + tasks: [ + createTask({ + taskId: "stale-running", + status: "running", + startedAt: now - 40 * 60_000, + lastEventAt: now - 40 * 60_000, + }), + createTask({ + taskId: "lost-task", + status: "lost", + error: "backing session missing", + endedAt: now - 5 * 60_000, + }), + createTask({ + taskId: "missing-cleanup", + status: "failed", + endedAt: now - 60_000, + cleanupAfter: undefined, + }), + ], + }); + + expect(findings.map((finding) => [finding.code, finding.task.taskId])).toEqual([ + ["lost", "lost-task"], + ["stale_running", "stale-running"], + ["missing_cleanup", "missing-cleanup"], + ]); + }); + + it("summarizes findings by severity and code", () => { + const summary = summarizeTaskAuditFindings([ + { + severity: "error", + code: "stale_running", + task: createTask({ taskId: "a", status: "running" }), + detail: "running task appears stuck", + }, + { + severity: "warn", + code: "delivery_failed", + task: createTask({ taskId: "b", status: "failed" }), + detail: "terminal update delivery failed", + }, + ]); + + expect(summary).toEqual({ + total: 2, + warnings: 1, + errors: 1, + byCode: { + stale_queued: 0, + stale_running: 1, + lost: 0, + delivery_failed: 1, + missing_cleanup: 0, + inconsistent_timestamps: 0, + }, + }); + }); + + it("does not double-report lost tasks as missing cleanup", () => { + const now = Date.parse("2026-03-30T01:00:00.000Z"); + const findings = listTaskAuditFindings({ + now, + tasks: [ + createTask({ + taskId: "lost-projected", + status: "lost", + endedAt: now - 60_000, + cleanupAfter: undefined, + }), + ], + }); + + expect(findings.map((finding) => finding.code)).toEqual(["lost"]); + }); +}); diff --git a/src/tasks/task-registry.audit.ts b/src/tasks/task-registry.audit.ts new file mode 100644 index 00000000000..0ff3b2504d2 --- /dev/null +++ b/src/tasks/task-registry.audit.ts @@ -0,0 +1,213 @@ +import { reconcileInspectableTasks } from "./task-registry.reconcile.js"; +import type { TaskRecord } from "./task-registry.types.js"; + +export type TaskAuditSeverity = "warn" | "error"; +export type TaskAuditCode = + | "stale_queued" + | "stale_running" + | "lost" + | "delivery_failed" + | "missing_cleanup" + | "inconsistent_timestamps"; + +export type TaskAuditFinding = { + severity: TaskAuditSeverity; + code: TaskAuditCode; + task: TaskRecord; + ageMs?: number; + detail: string; +}; + +export type TaskAuditSummary = { + total: number; + warnings: number; + errors: number; + byCode: Record; +}; + +export type TaskAuditOptions = { + now?: number; + tasks?: TaskRecord[]; + staleQueuedMs?: number; + staleRunningMs?: number; +}; + +const DEFAULT_STALE_QUEUED_MS = 10 * 60_000; +const DEFAULT_STALE_RUNNING_MS = 30 * 60_000; + +function createEmptyTaskAuditSummary(): TaskAuditSummary { + return { + total: 0, + warnings: 0, + errors: 0, + byCode: { + stale_queued: 0, + stale_running: 0, + lost: 0, + delivery_failed: 0, + missing_cleanup: 0, + inconsistent_timestamps: 0, + }, + }; +} + +function createFinding(params: { + severity: TaskAuditSeverity; + code: TaskAuditCode; + task: TaskRecord; + detail: string; + ageMs?: number; +}): TaskAuditFinding { + return { + severity: params.severity, + code: params.code, + task: params.task, + detail: params.detail, + ...(typeof params.ageMs === "number" ? { ageMs: params.ageMs } : {}), + }; +} + +function taskReferenceAt(task: TaskRecord): number { + return task.lastEventAt ?? task.startedAt ?? task.createdAt; +} + +function findTimestampInconsistency(task: TaskRecord): TaskAuditFinding | null { + if (task.startedAt && task.startedAt < task.createdAt) { + return createFinding({ + severity: "warn", + code: "inconsistent_timestamps", + task, + detail: "startedAt is earlier than createdAt", + }); + } + if (task.endedAt && task.startedAt && task.endedAt < task.startedAt) { + return createFinding({ + severity: "warn", + code: "inconsistent_timestamps", + task, + detail: "endedAt is earlier than startedAt", + }); + } + if ((task.status === "queued" || task.status === "running") && task.endedAt) { + return createFinding({ + severity: "warn", + code: "inconsistent_timestamps", + task, + detail: `${task.status} task should not already have endedAt`, + }); + } + return null; +} + +function compareFindings(left: TaskAuditFinding, right: TaskAuditFinding): number { + const severityRank = (severity: TaskAuditSeverity) => (severity === "error" ? 0 : 1); + const severityDiff = severityRank(left.severity) - severityRank(right.severity); + if (severityDiff !== 0) { + return severityDiff; + } + const leftAge = left.ageMs ?? -1; + const rightAge = right.ageMs ?? -1; + if (leftAge !== rightAge) { + return rightAge - leftAge; + } + return left.task.createdAt - right.task.createdAt; +} + +export function listTaskAuditFindings(options: TaskAuditOptions = {}): TaskAuditFinding[] { + const tasks = options.tasks ?? reconcileInspectableTasks(); + const now = options.now ?? Date.now(); + const staleQueuedMs = options.staleQueuedMs ?? DEFAULT_STALE_QUEUED_MS; + const staleRunningMs = options.staleRunningMs ?? DEFAULT_STALE_RUNNING_MS; + const findings: TaskAuditFinding[] = []; + + for (const task of tasks) { + const referenceAt = taskReferenceAt(task); + const ageMs = Math.max(0, now - referenceAt); + + if (task.status === "queued" && ageMs >= staleQueuedMs) { + findings.push( + createFinding({ + severity: "warn", + code: "stale_queued", + task, + ageMs, + detail: "queued task has not advanced recently", + }), + ); + } + + if (task.status === "running" && ageMs >= staleRunningMs) { + findings.push( + createFinding({ + severity: "error", + code: "stale_running", + task, + ageMs, + detail: "running task appears stuck", + }), + ); + } + + if (task.status === "lost") { + findings.push( + createFinding({ + severity: "error", + code: "lost", + task, + ageMs, + detail: task.error?.trim() || "task lost its backing session", + }), + ); + } + + if (task.deliveryStatus === "failed" && task.notifyPolicy !== "silent") { + findings.push( + createFinding({ + severity: "warn", + code: "delivery_failed", + task, + ageMs, + detail: "terminal update delivery failed", + }), + ); + } + + if ( + task.status !== "lost" && + task.status !== "queued" && + task.status !== "running" && + typeof task.cleanupAfter !== "number" + ) { + findings.push( + createFinding({ + severity: "warn", + code: "missing_cleanup", + task, + ageMs, + detail: "terminal task is missing cleanupAfter", + }), + ); + } + + const inconsistency = findTimestampInconsistency(task); + if (inconsistency) { + findings.push(inconsistency); + } + } + + return findings.toSorted(compareFindings); +} + +export function summarizeTaskAuditFindings(findings: Iterable): TaskAuditSummary { + const summary = createEmptyTaskAuditSummary(); + for (const finding of findings) { + summary.total += 1; + summary.byCode[finding.code] += 1; + if (finding.severity === "error") { + summary.errors += 1; + } else { + summary.warnings += 1; + } + } + return summary; +} diff --git a/src/tasks/task-registry.paths.ts b/src/tasks/task-registry.paths.ts new file mode 100644 index 00000000000..35488f76b54 --- /dev/null +++ b/src/tasks/task-registry.paths.ts @@ -0,0 +1,22 @@ +import os from "node:os"; +import path from "node:path"; +import { resolveStateDir } from "../config/paths.js"; + +export function resolveTaskStateDir(env: NodeJS.ProcessEnv = process.env): string { + const explicit = env.OPENCLAW_STATE_DIR?.trim(); + if (explicit) { + return resolveStateDir(env); + } + if (env.VITEST || env.NODE_ENV === "test") { + return path.join(os.tmpdir(), "openclaw-test-state", String(process.pid)); + } + return resolveStateDir(env); +} + +export function resolveTaskRegistryDir(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveTaskStateDir(env), "tasks"); +} + +export function resolveTaskRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveTaskRegistryDir(env), "runs.sqlite"); +} diff --git a/src/tasks/task-registry.store.json.ts b/src/tasks/task-registry.store.json.ts deleted file mode 100644 index 9180e8b8b87..00000000000 --- a/src/tasks/task-registry.store.json.ts +++ /dev/null @@ -1,67 +0,0 @@ -import os from "node:os"; -import path from "node:path"; -import { resolveStateDir } from "../config/paths.js"; -import { loadJsonFile, saveJsonFile } from "../infra/json-file.js"; -import type { TaskRecord } from "./task-registry.types.js"; - -type PersistedTaskRegistry = { - version: 1; - tasks: Record; -}; - -const TASK_REGISTRY_VERSION = 1 as const; - -function resolveTaskStateDir(env: NodeJS.ProcessEnv = process.env): string { - const explicit = env.OPENCLAW_STATE_DIR?.trim(); - if (explicit) { - return resolveStateDir(env); - } - if (env.VITEST || env.NODE_ENV === "test") { - return path.join(os.tmpdir(), "openclaw-test-state", String(process.pid)); - } - return resolveStateDir(env); -} - -export function resolveTaskRegistryPath(): string { - return path.join(resolveTaskStateDir(process.env), "tasks", "runs.json"); -} - -export function loadTaskRegistrySnapshotFromJson(): Map { - const pathname = resolveTaskRegistryPath(); - const raw = loadJsonFile(pathname); - if (!raw || typeof raw !== "object") { - return new Map(); - } - const record = raw as Partial; - if (record.version !== TASK_REGISTRY_VERSION) { - return new Map(); - } - const tasksRaw = record.tasks; - if (!tasksRaw || typeof tasksRaw !== "object") { - return new Map(); - } - const out = new Map(); - for (const [taskId, entry] of Object.entries(tasksRaw)) { - if (!entry || typeof entry !== "object") { - continue; - } - if (!entry.taskId || typeof entry.taskId !== "string") { - continue; - } - out.set(taskId, entry); - } - return out; -} - -export function saveTaskRegistrySnapshotToJson(tasks: ReadonlyMap) { - const pathname = resolveTaskRegistryPath(); - const serialized: Record = {}; - for (const [taskId, entry] of tasks.entries()) { - serialized[taskId] = entry; - } - const out: PersistedTaskRegistry = { - version: TASK_REGISTRY_VERSION, - tasks: serialized, - }; - saveJsonFile(pathname, out); -} diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts new file mode 100644 index 00000000000..42962655225 --- /dev/null +++ b/src/tasks/task-registry.store.sqlite.ts @@ -0,0 +1,359 @@ +import { chmodSync, existsSync, mkdirSync } from "node:fs"; +import type { DatabaseSync, StatementSync } from "node:sqlite"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; +import type { DeliveryContext } from "../utils/delivery-context.js"; +import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js"; +import type { TaskEventRecord, TaskRecord } from "./task-registry.types.js"; + +type TaskRegistryRow = { + task_id: string; + runtime: TaskRecord["runtime"]; + source_id: string | null; + requester_session_key: string; + requester_origin_json: string | null; + child_session_key: string | null; + parent_task_id: string | null; + agent_id: string | null; + run_id: string | null; + label: string | null; + task: string; + status: TaskRecord["status"]; + delivery_status: TaskRecord["deliveryStatus"]; + notify_policy: TaskRecord["notifyPolicy"]; + created_at: number | bigint; + started_at: number | bigint | null; + ended_at: number | bigint | null; + last_event_at: number | bigint | null; + cleanup_after: number | bigint | null; + error: string | null; + progress_summary: string | null; + terminal_summary: string | null; + terminal_outcome: TaskRecord["terminalOutcome"] | null; + recent_events_json: string | null; + last_notified_event_at: number | bigint | null; +}; + +type TaskRegistryStatements = { + selectAll: StatementSync; + replaceRow: StatementSync; + deleteRow: StatementSync; + clearRows: StatementSync; +}; + +type TaskRegistryDatabase = { + db: DatabaseSync; + path: string; + statements: TaskRegistryStatements; +}; + +let cachedDatabase: TaskRegistryDatabase | null = null; +const TASK_REGISTRY_DIR_MODE = 0o700; +const TASK_REGISTRY_FILE_MODE = 0o600; +const TASK_REGISTRY_SIDEcar_SUFFIXES = ["", "-shm", "-wal"] as const; + +function normalizeNumber(value: number | bigint | null): number | undefined { + if (typeof value === "bigint") { + return Number(value); + } + return typeof value === "number" ? value : undefined; +} + +function serializeJson(value: unknown): string | null { + return value == null ? null : JSON.stringify(value); +} + +function parseJsonValue(raw: string | null): T | undefined { + if (!raw?.trim()) { + return undefined; + } + try { + return JSON.parse(raw) as T; + } catch { + return undefined; + } +} + +function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { + const requesterOrigin = parseJsonValue(row.requester_origin_json); + const recentEvents = parseJsonValue(row.recent_events_json); + const startedAt = normalizeNumber(row.started_at); + const endedAt = normalizeNumber(row.ended_at); + const lastEventAt = normalizeNumber(row.last_event_at); + const cleanupAfter = normalizeNumber(row.cleanup_after); + const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at); + return { + taskId: row.task_id, + runtime: row.runtime, + ...(row.source_id ? { sourceId: row.source_id } : {}), + requesterSessionKey: row.requester_session_key, + ...(requesterOrigin ? { requesterOrigin } : {}), + ...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}), + ...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}), + ...(row.agent_id ? { agentId: row.agent_id } : {}), + ...(row.run_id ? { runId: row.run_id } : {}), + ...(row.label ? { label: row.label } : {}), + task: row.task, + status: row.status, + deliveryStatus: row.delivery_status, + notifyPolicy: row.notify_policy, + createdAt: normalizeNumber(row.created_at) ?? 0, + ...(startedAt != null ? { startedAt } : {}), + ...(endedAt != null ? { endedAt } : {}), + ...(lastEventAt != null ? { lastEventAt } : {}), + ...(cleanupAfter != null ? { cleanupAfter } : {}), + ...(row.error ? { error: row.error } : {}), + ...(row.progress_summary ? { progressSummary: row.progress_summary } : {}), + ...(row.terminal_summary ? { terminalSummary: row.terminal_summary } : {}), + ...(row.terminal_outcome ? { terminalOutcome: row.terminal_outcome } : {}), + ...(recentEvents?.length ? { recentEvents } : {}), + ...(lastNotifiedEventAt != null ? { lastNotifiedEventAt } : {}), + }; +} + +function bindTaskRecord(record: TaskRecord) { + return { + task_id: record.taskId, + runtime: record.runtime, + source_id: record.sourceId ?? null, + requester_session_key: record.requesterSessionKey, + requester_origin_json: serializeJson(record.requesterOrigin), + child_session_key: record.childSessionKey ?? null, + parent_task_id: record.parentTaskId ?? null, + agent_id: record.agentId ?? null, + run_id: record.runId ?? null, + label: record.label ?? null, + task: record.task, + status: record.status, + delivery_status: record.deliveryStatus, + notify_policy: record.notifyPolicy, + created_at: record.createdAt, + started_at: record.startedAt ?? null, + ended_at: record.endedAt ?? null, + last_event_at: record.lastEventAt ?? null, + cleanup_after: record.cleanupAfter ?? null, + error: record.error ?? null, + progress_summary: record.progressSummary ?? null, + terminal_summary: record.terminalSummary ?? null, + terminal_outcome: record.terminalOutcome ?? null, + recent_events_json: serializeJson(record.recentEvents), + last_notified_event_at: record.lastNotifiedEventAt ?? null, + }; +} + +function createStatements(db: DatabaseSync): TaskRegistryStatements { + return { + selectAll: db.prepare(` + SELECT + task_id, + runtime, + source_id, + requester_session_key, + requester_origin_json, + child_session_key, + parent_task_id, + agent_id, + run_id, + label, + task, + status, + delivery_status, + notify_policy, + created_at, + started_at, + ended_at, + last_event_at, + cleanup_after, + error, + progress_summary, + terminal_summary, + terminal_outcome, + recent_events_json, + last_notified_event_at + FROM task_runs + ORDER BY created_at ASC, task_id ASC + `), + replaceRow: db.prepare(` + INSERT OR REPLACE INTO task_runs ( + task_id, + runtime, + source_id, + requester_session_key, + requester_origin_json, + child_session_key, + parent_task_id, + agent_id, + run_id, + label, + task, + status, + delivery_status, + notify_policy, + created_at, + started_at, + ended_at, + last_event_at, + cleanup_after, + error, + progress_summary, + terminal_summary, + terminal_outcome, + recent_events_json, + last_notified_event_at + ) VALUES ( + @task_id, + @runtime, + @source_id, + @requester_session_key, + @requester_origin_json, + @child_session_key, + @parent_task_id, + @agent_id, + @run_id, + @label, + @task, + @status, + @delivery_status, + @notify_policy, + @created_at, + @started_at, + @ended_at, + @last_event_at, + @cleanup_after, + @error, + @progress_summary, + @terminal_summary, + @terminal_outcome, + @recent_events_json, + @last_notified_event_at + ) + `), + deleteRow: db.prepare(`DELETE FROM task_runs WHERE task_id = ?`), + clearRows: db.prepare(`DELETE FROM task_runs`), + }; +} + +function ensureSchema(db: DatabaseSync) { + db.exec(` + CREATE TABLE IF NOT EXISTS task_runs ( + task_id TEXT PRIMARY KEY, + runtime TEXT NOT NULL, + source_id TEXT, + requester_session_key TEXT NOT NULL, + requester_origin_json TEXT, + child_session_key TEXT, + parent_task_id TEXT, + agent_id TEXT, + run_id TEXT, + label TEXT, + task TEXT NOT NULL, + status TEXT NOT NULL, + delivery_status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + created_at INTEGER NOT NULL, + started_at INTEGER, + ended_at INTEGER, + last_event_at INTEGER, + cleanup_after INTEGER, + error TEXT, + progress_summary TEXT, + terminal_summary TEXT, + terminal_outcome TEXT, + recent_events_json TEXT, + last_notified_event_at INTEGER + ); + `); + db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`); + db.exec( + `CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`, + ); +} + +function ensureTaskRegistryPermissions(pathname: string) { + const dir = resolveTaskRegistryDir(process.env); + mkdirSync(dir, { recursive: true, mode: TASK_REGISTRY_DIR_MODE }); + chmodSync(dir, TASK_REGISTRY_DIR_MODE); + for (const suffix of TASK_REGISTRY_SIDEcar_SUFFIXES) { + const candidate = `${pathname}${suffix}`; + if (!existsSync(candidate)) { + continue; + } + chmodSync(candidate, TASK_REGISTRY_FILE_MODE); + } +} + +function openTaskRegistryDatabase(): TaskRegistryDatabase { + const pathname = resolveTaskRegistrySqlitePath(process.env); + if (cachedDatabase && cachedDatabase.path === pathname) { + return cachedDatabase; + } + if (cachedDatabase) { + cachedDatabase.db.close(); + cachedDatabase = null; + } + ensureTaskRegistryPermissions(pathname); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(pathname); + db.exec(`PRAGMA journal_mode = WAL;`); + db.exec(`PRAGMA synchronous = NORMAL;`); + db.exec(`PRAGMA busy_timeout = 5000;`); + ensureSchema(db); + ensureTaskRegistryPermissions(pathname); + cachedDatabase = { + db, + path: pathname, + statements: createStatements(db), + }; + return cachedDatabase; +} + +function withWriteTransaction(write: (statements: TaskRegistryStatements) => void) { + const { db, path, statements } = openTaskRegistryDatabase(); + db.exec("BEGIN IMMEDIATE"); + try { + write(statements); + db.exec("COMMIT"); + ensureTaskRegistryPermissions(path); + } catch (error) { + db.exec("ROLLBACK"); + throw error; + } +} + +export function loadTaskRegistrySnapshotFromSqlite(): Map { + const { statements } = openTaskRegistryDatabase(); + const rows = statements.selectAll.all() as TaskRegistryRow[]; + return new Map(rows.map((row) => [row.task_id, rowToTaskRecord(row)])); +} + +export function saveTaskRegistrySnapshotToSqlite(tasks: ReadonlyMap) { + withWriteTransaction((statements) => { + statements.clearRows.run(); + for (const task of tasks.values()) { + statements.replaceRow.run(bindTaskRecord(task)); + } + }); +} + +export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) { + const store = openTaskRegistryDatabase(); + store.statements.replaceRow.run(bindTaskRecord(task)); + ensureTaskRegistryPermissions(store.path); +} + +export function deleteTaskRegistryRecordFromSqlite(taskId: string) { + const store = openTaskRegistryDatabase(); + store.statements.deleteRow.run(taskId); + ensureTaskRegistryPermissions(store.path); +} + +export function closeTaskRegistrySqliteStore() { + if (!cachedDatabase) { + return; + } + cachedDatabase.db.close(); + cachedDatabase = null; +} diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index edb98a204c2..5414e5d40d6 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -1,3 +1,6 @@ +import { mkdtempSync, rmSync, statSync } from "node:fs"; +import os from "node:os"; +import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; import { createTaskRecord, @@ -5,6 +8,7 @@ import { findTaskByRunId, resetTaskRegistryForTests, } from "./task-registry.js"; +import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js"; import { configureTaskRegistryRuntime, type TaskRegistryHookEvent } from "./task-registry.store.js"; import type { TaskRecord } from "./task-registry.types.js"; @@ -27,7 +31,8 @@ function createStoredTask(): TaskRecord { describe("task-registry store runtime", () => { afterEach(() => { - resetTaskRegistryForTests({ persist: false }); + delete process.env.OPENCLAW_STATE_DIR; + resetTaskRegistryForTests(); }); it("uses the configured task store for restore and save", () => { @@ -103,4 +108,52 @@ describe("task-registry store runtime", () => { taskId: created.taskId, }); }); + + it("restores persisted tasks from the default sqlite store", () => { + const created = createTaskRecord({ + runtime: "cron", + requesterSessionKey: "agent:main:main", + sourceId: "job-123", + runId: "run-sqlite", + task: "Run nightly cron", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + }); + + resetTaskRegistryForTests({ persist: false }); + + expect(findTaskByRunId("run-sqlite")).toMatchObject({ + taskId: created.taskId, + sourceId: "job-123", + task: "Run nightly cron", + }); + }); + + it("hardens the sqlite task store directory and file modes", () => { + if (process.platform === "win32") { + return; + } + const stateDir = mkdtempSync(path.join(os.tmpdir(), "openclaw-task-store-")); + process.env.OPENCLAW_STATE_DIR = stateDir; + + createTaskRecord({ + runtime: "cron", + requesterSessionKey: "agent:main:main", + sourceId: "job-456", + runId: "run-perms", + task: "Run secured cron", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + }); + + const registryDir = resolveTaskRegistryDir(process.env); + const sqlitePath = resolveTaskRegistrySqlitePath(process.env); + expect(statSync(registryDir).mode & 0o777).toBe(0o700); + expect(statSync(sqlitePath).mode & 0o777).toBe(0o600); + + resetTaskRegistryForTests(); + rmSync(stateDir, { recursive: true, force: true }); + }); }); diff --git a/src/tasks/task-registry.store.ts b/src/tasks/task-registry.store.ts index c8b16910e35..5580231b62c 100644 --- a/src/tasks/task-registry.store.ts +++ b/src/tasks/task-registry.store.ts @@ -1,12 +1,18 @@ import { - loadTaskRegistrySnapshotFromJson, - saveTaskRegistrySnapshotToJson, -} from "./task-registry.store.json.js"; + closeTaskRegistrySqliteStore, + deleteTaskRegistryRecordFromSqlite, + loadTaskRegistrySnapshotFromSqlite, + saveTaskRegistrySnapshotToSqlite, + upsertTaskRegistryRecordToSqlite, +} from "./task-registry.store.sqlite.js"; import type { TaskRecord } from "./task-registry.types.js"; export type TaskRegistryStore = { loadSnapshot: () => Map; saveSnapshot: (tasks: ReadonlyMap) => void; + upsertTask?: (task: TaskRecord) => void; + deleteTask?: (taskId: string) => void; + close?: () => void; }; export type TaskRegistryHookEvent = @@ -31,8 +37,11 @@ export type TaskRegistryHooks = { }; const defaultTaskRegistryStore: TaskRegistryStore = { - loadSnapshot: loadTaskRegistrySnapshotFromJson, - saveSnapshot: saveTaskRegistrySnapshotToJson, + loadSnapshot: loadTaskRegistrySnapshotFromSqlite, + saveSnapshot: saveTaskRegistrySnapshotToSqlite, + upsertTask: upsertTaskRegistryRecordToSqlite, + deleteTask: deleteTaskRegistryRecordFromSqlite, + close: closeTaskRegistrySqliteStore, }; let configuredTaskRegistryStore: TaskRegistryStore = defaultTaskRegistryStore; @@ -59,6 +68,7 @@ export function configureTaskRegistryRuntime(params: { } export function resetTaskRegistryRuntimeForTests() { + configuredTaskRegistryStore.close?.(); configuredTaskRegistryStore = defaultTaskRegistryStore; configuredTaskRegistryHooks = null; } diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 43f395b8206..0c621e1dc9d 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -74,6 +74,24 @@ function persistTaskRegistry() { getTaskRegistryStore().saveSnapshot(tasks); } +function persistTaskUpsert(task: TaskRecord) { + const store = getTaskRegistryStore(); + if (store.upsertTask) { + store.upsertTask(task); + return; + } + store.saveSnapshot(tasks); +} + +function persistTaskDelete(taskId: string) { + const store = getTaskRegistryStore(); + if (store.deleteTask) { + store.deleteTask(taskId); + return; + } + store.saveSnapshot(tasks); +} + function ensureDeliveryStatus(requesterSessionKey: string): TaskDeliveryStatus { return requesterSessionKey.trim() ? "pending" : "parent_missing"; } @@ -354,7 +372,7 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu if (patch.runId && patch.runId !== current.runId) { rebuildRunIdIndex(); } - persistTaskRegistry(); + persistTaskUpsert(next); emitTaskRegistryHookEvent(() => ({ kind: "upserted", task: cloneTaskRecord(next), @@ -838,7 +856,7 @@ export function createTaskRecord(params: { } tasks.set(taskId, record); addRunIdIndex(taskId, record.runId); - persistTaskRegistry(); + persistTaskUpsert(record); emitTaskRegistryHookEvent(() => ({ kind: "upserted", task: cloneTaskRecord(record), @@ -1100,7 +1118,7 @@ export function deleteTaskRecordById(taskId: string): boolean { } tasks.delete(taskId); rebuildRunIdIndex(); - persistTaskRegistry(); + persistTaskDelete(taskId); emitTaskRegistryHookEvent(() => ({ kind: "deleted", taskId: current.taskId,