feat(tasks): move task ledger to sqlite and add audit CLI (#57361)

* feat(tasks): move task ledger to sqlite

* feat(tasks): add task run audit command

* style(tasks): normalize audit command formatting

* fix(tasks): address audit summary and sqlite perms

* fix(tasks): avoid duplicate lost audit findings
This commit is contained in:
Vincent Koc
2026-03-29 19:34:51 -07:00
committed by GitHub
parent 6f09a68ae7
commit e57b3618fc
13 changed files with 1071 additions and 78 deletions

View File

@@ -0,0 +1,100 @@
import { describe, expect, it } from "vitest";
import { listTaskAuditFindings, summarizeTaskAuditFindings } from "./task-registry.audit.js";
import type { TaskRecord } from "./task-registry.types.js";
function createTask(partial: Partial<TaskRecord>): TaskRecord {
return {
taskId: partial.taskId ?? "task-1",
runtime: partial.runtime ?? "acp",
requesterSessionKey: partial.requesterSessionKey ?? "agent:main:main",
task: partial.task ?? "Background task",
status: partial.status ?? "queued",
deliveryStatus: partial.deliveryStatus ?? "pending",
notifyPolicy: partial.notifyPolicy ?? "done_only",
createdAt: partial.createdAt ?? Date.parse("2026-03-30T00:00:00.000Z"),
...partial,
};
}
describe("task-registry audit", () => {
it("flags stale running, lost, and missing cleanup tasks", () => {
const now = Date.parse("2026-03-30T01:00:00.000Z");
const findings = listTaskAuditFindings({
now,
tasks: [
createTask({
taskId: "stale-running",
status: "running",
startedAt: now - 40 * 60_000,
lastEventAt: now - 40 * 60_000,
}),
createTask({
taskId: "lost-task",
status: "lost",
error: "backing session missing",
endedAt: now - 5 * 60_000,
}),
createTask({
taskId: "missing-cleanup",
status: "failed",
endedAt: now - 60_000,
cleanupAfter: undefined,
}),
],
});
expect(findings.map((finding) => [finding.code, finding.task.taskId])).toEqual([
["lost", "lost-task"],
["stale_running", "stale-running"],
["missing_cleanup", "missing-cleanup"],
]);
});
it("summarizes findings by severity and code", () => {
const summary = summarizeTaskAuditFindings([
{
severity: "error",
code: "stale_running",
task: createTask({ taskId: "a", status: "running" }),
detail: "running task appears stuck",
},
{
severity: "warn",
code: "delivery_failed",
task: createTask({ taskId: "b", status: "failed" }),
detail: "terminal update delivery failed",
},
]);
expect(summary).toEqual({
total: 2,
warnings: 1,
errors: 1,
byCode: {
stale_queued: 0,
stale_running: 1,
lost: 0,
delivery_failed: 1,
missing_cleanup: 0,
inconsistent_timestamps: 0,
},
});
});
it("does not double-report lost tasks as missing cleanup", () => {
const now = Date.parse("2026-03-30T01:00:00.000Z");
const findings = listTaskAuditFindings({
now,
tasks: [
createTask({
taskId: "lost-projected",
status: "lost",
endedAt: now - 60_000,
cleanupAfter: undefined,
}),
],
});
expect(findings.map((finding) => finding.code)).toEqual(["lost"]);
});
});

View File

@@ -0,0 +1,213 @@
import { reconcileInspectableTasks } from "./task-registry.reconcile.js";
import type { TaskRecord } from "./task-registry.types.js";
export type TaskAuditSeverity = "warn" | "error";
export type TaskAuditCode =
| "stale_queued"
| "stale_running"
| "lost"
| "delivery_failed"
| "missing_cleanup"
| "inconsistent_timestamps";
export type TaskAuditFinding = {
severity: TaskAuditSeverity;
code: TaskAuditCode;
task: TaskRecord;
ageMs?: number;
detail: string;
};
export type TaskAuditSummary = {
total: number;
warnings: number;
errors: number;
byCode: Record<TaskAuditCode, number>;
};
export type TaskAuditOptions = {
now?: number;
tasks?: TaskRecord[];
staleQueuedMs?: number;
staleRunningMs?: number;
};
const DEFAULT_STALE_QUEUED_MS = 10 * 60_000;
const DEFAULT_STALE_RUNNING_MS = 30 * 60_000;
function createEmptyTaskAuditSummary(): TaskAuditSummary {
return {
total: 0,
warnings: 0,
errors: 0,
byCode: {
stale_queued: 0,
stale_running: 0,
lost: 0,
delivery_failed: 0,
missing_cleanup: 0,
inconsistent_timestamps: 0,
},
};
}
function createFinding(params: {
severity: TaskAuditSeverity;
code: TaskAuditCode;
task: TaskRecord;
detail: string;
ageMs?: number;
}): TaskAuditFinding {
return {
severity: params.severity,
code: params.code,
task: params.task,
detail: params.detail,
...(typeof params.ageMs === "number" ? { ageMs: params.ageMs } : {}),
};
}
function taskReferenceAt(task: TaskRecord): number {
return task.lastEventAt ?? task.startedAt ?? task.createdAt;
}
function findTimestampInconsistency(task: TaskRecord): TaskAuditFinding | null {
if (task.startedAt && task.startedAt < task.createdAt) {
return createFinding({
severity: "warn",
code: "inconsistent_timestamps",
task,
detail: "startedAt is earlier than createdAt",
});
}
if (task.endedAt && task.startedAt && task.endedAt < task.startedAt) {
return createFinding({
severity: "warn",
code: "inconsistent_timestamps",
task,
detail: "endedAt is earlier than startedAt",
});
}
if ((task.status === "queued" || task.status === "running") && task.endedAt) {
return createFinding({
severity: "warn",
code: "inconsistent_timestamps",
task,
detail: `${task.status} task should not already have endedAt`,
});
}
return null;
}
function compareFindings(left: TaskAuditFinding, right: TaskAuditFinding): number {
const severityRank = (severity: TaskAuditSeverity) => (severity === "error" ? 0 : 1);
const severityDiff = severityRank(left.severity) - severityRank(right.severity);
if (severityDiff !== 0) {
return severityDiff;
}
const leftAge = left.ageMs ?? -1;
const rightAge = right.ageMs ?? -1;
if (leftAge !== rightAge) {
return rightAge - leftAge;
}
return left.task.createdAt - right.task.createdAt;
}
export function listTaskAuditFindings(options: TaskAuditOptions = {}): TaskAuditFinding[] {
const tasks = options.tasks ?? reconcileInspectableTasks();
const now = options.now ?? Date.now();
const staleQueuedMs = options.staleQueuedMs ?? DEFAULT_STALE_QUEUED_MS;
const staleRunningMs = options.staleRunningMs ?? DEFAULT_STALE_RUNNING_MS;
const findings: TaskAuditFinding[] = [];
for (const task of tasks) {
const referenceAt = taskReferenceAt(task);
const ageMs = Math.max(0, now - referenceAt);
if (task.status === "queued" && ageMs >= staleQueuedMs) {
findings.push(
createFinding({
severity: "warn",
code: "stale_queued",
task,
ageMs,
detail: "queued task has not advanced recently",
}),
);
}
if (task.status === "running" && ageMs >= staleRunningMs) {
findings.push(
createFinding({
severity: "error",
code: "stale_running",
task,
ageMs,
detail: "running task appears stuck",
}),
);
}
if (task.status === "lost") {
findings.push(
createFinding({
severity: "error",
code: "lost",
task,
ageMs,
detail: task.error?.trim() || "task lost its backing session",
}),
);
}
if (task.deliveryStatus === "failed" && task.notifyPolicy !== "silent") {
findings.push(
createFinding({
severity: "warn",
code: "delivery_failed",
task,
ageMs,
detail: "terminal update delivery failed",
}),
);
}
if (
task.status !== "lost" &&
task.status !== "queued" &&
task.status !== "running" &&
typeof task.cleanupAfter !== "number"
) {
findings.push(
createFinding({
severity: "warn",
code: "missing_cleanup",
task,
ageMs,
detail: "terminal task is missing cleanupAfter",
}),
);
}
const inconsistency = findTimestampInconsistency(task);
if (inconsistency) {
findings.push(inconsistency);
}
}
return findings.toSorted(compareFindings);
}
export function summarizeTaskAuditFindings(findings: Iterable<TaskAuditFinding>): TaskAuditSummary {
const summary = createEmptyTaskAuditSummary();
for (const finding of findings) {
summary.total += 1;
summary.byCode[finding.code] += 1;
if (finding.severity === "error") {
summary.errors += 1;
} else {
summary.warnings += 1;
}
}
return summary;
}

View File

@@ -0,0 +1,22 @@
import os from "node:os";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
export function resolveTaskStateDir(env: NodeJS.ProcessEnv = process.env): string {
const explicit = env.OPENCLAW_STATE_DIR?.trim();
if (explicit) {
return resolveStateDir(env);
}
if (env.VITEST || env.NODE_ENV === "test") {
return path.join(os.tmpdir(), "openclaw-test-state", String(process.pid));
}
return resolveStateDir(env);
}
export function resolveTaskRegistryDir(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveTaskStateDir(env), "tasks");
}
export function resolveTaskRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveTaskRegistryDir(env), "runs.sqlite");
}

View File

@@ -1,67 +0,0 @@
import os from "node:os";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
import type { TaskRecord } from "./task-registry.types.js";
type PersistedTaskRegistry = {
version: 1;
tasks: Record<string, TaskRecord>;
};
const TASK_REGISTRY_VERSION = 1 as const;
function resolveTaskStateDir(env: NodeJS.ProcessEnv = process.env): string {
const explicit = env.OPENCLAW_STATE_DIR?.trim();
if (explicit) {
return resolveStateDir(env);
}
if (env.VITEST || env.NODE_ENV === "test") {
return path.join(os.tmpdir(), "openclaw-test-state", String(process.pid));
}
return resolveStateDir(env);
}
export function resolveTaskRegistryPath(): string {
return path.join(resolveTaskStateDir(process.env), "tasks", "runs.json");
}
export function loadTaskRegistrySnapshotFromJson(): Map<string, TaskRecord> {
const pathname = resolveTaskRegistryPath();
const raw = loadJsonFile(pathname);
if (!raw || typeof raw !== "object") {
return new Map();
}
const record = raw as Partial<PersistedTaskRegistry>;
if (record.version !== TASK_REGISTRY_VERSION) {
return new Map();
}
const tasksRaw = record.tasks;
if (!tasksRaw || typeof tasksRaw !== "object") {
return new Map();
}
const out = new Map<string, TaskRecord>();
for (const [taskId, entry] of Object.entries(tasksRaw)) {
if (!entry || typeof entry !== "object") {
continue;
}
if (!entry.taskId || typeof entry.taskId !== "string") {
continue;
}
out.set(taskId, entry);
}
return out;
}
export function saveTaskRegistrySnapshotToJson(tasks: ReadonlyMap<string, TaskRecord>) {
const pathname = resolveTaskRegistryPath();
const serialized: Record<string, TaskRecord> = {};
for (const [taskId, entry] of tasks.entries()) {
serialized[taskId] = entry;
}
const out: PersistedTaskRegistry = {
version: TASK_REGISTRY_VERSION,
tasks: serialized,
};
saveJsonFile(pathname, out);
}

View File

@@ -0,0 +1,359 @@
import { chmodSync, existsSync, mkdirSync } from "node:fs";
import type { DatabaseSync, StatementSync } from "node:sqlite";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DeliveryContext } from "../utils/delivery-context.js";
import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js";
import type { TaskEventRecord, TaskRecord } from "./task-registry.types.js";
type TaskRegistryRow = {
task_id: string;
runtime: TaskRecord["runtime"];
source_id: string | null;
requester_session_key: string;
requester_origin_json: string | null;
child_session_key: string | null;
parent_task_id: string | null;
agent_id: string | null;
run_id: string | null;
label: string | null;
task: string;
status: TaskRecord["status"];
delivery_status: TaskRecord["deliveryStatus"];
notify_policy: TaskRecord["notifyPolicy"];
created_at: number | bigint;
started_at: number | bigint | null;
ended_at: number | bigint | null;
last_event_at: number | bigint | null;
cleanup_after: number | bigint | null;
error: string | null;
progress_summary: string | null;
terminal_summary: string | null;
terminal_outcome: TaskRecord["terminalOutcome"] | null;
recent_events_json: string | null;
last_notified_event_at: number | bigint | null;
};
type TaskRegistryStatements = {
selectAll: StatementSync;
replaceRow: StatementSync;
deleteRow: StatementSync;
clearRows: StatementSync;
};
type TaskRegistryDatabase = {
db: DatabaseSync;
path: string;
statements: TaskRegistryStatements;
};
let cachedDatabase: TaskRegistryDatabase | null = null;
const TASK_REGISTRY_DIR_MODE = 0o700;
const TASK_REGISTRY_FILE_MODE = 0o600;
const TASK_REGISTRY_SIDEcar_SUFFIXES = ["", "-shm", "-wal"] as const;
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
return Number(value);
}
return typeof value === "number" ? value : undefined;
}
function serializeJson(value: unknown): string | null {
return value == null ? null : JSON.stringify(value);
}
function parseJsonValue<T>(raw: string | null): T | undefined {
if (!raw?.trim()) {
return undefined;
}
try {
return JSON.parse(raw) as T;
} catch {
return undefined;
}
}
function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const recentEvents = parseJsonValue<TaskEventRecord[]>(row.recent_events_json);
const startedAt = normalizeNumber(row.started_at);
const endedAt = normalizeNumber(row.ended_at);
const lastEventAt = normalizeNumber(row.last_event_at);
const cleanupAfter = normalizeNumber(row.cleanup_after);
const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at);
return {
taskId: row.task_id,
runtime: row.runtime,
...(row.source_id ? { sourceId: row.source_id } : {}),
requesterSessionKey: row.requester_session_key,
...(requesterOrigin ? { requesterOrigin } : {}),
...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}),
...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}),
...(row.agent_id ? { agentId: row.agent_id } : {}),
...(row.run_id ? { runId: row.run_id } : {}),
...(row.label ? { label: row.label } : {}),
task: row.task,
status: row.status,
deliveryStatus: row.delivery_status,
notifyPolicy: row.notify_policy,
createdAt: normalizeNumber(row.created_at) ?? 0,
...(startedAt != null ? { startedAt } : {}),
...(endedAt != null ? { endedAt } : {}),
...(lastEventAt != null ? { lastEventAt } : {}),
...(cleanupAfter != null ? { cleanupAfter } : {}),
...(row.error ? { error: row.error } : {}),
...(row.progress_summary ? { progressSummary: row.progress_summary } : {}),
...(row.terminal_summary ? { terminalSummary: row.terminal_summary } : {}),
...(row.terminal_outcome ? { terminalOutcome: row.terminal_outcome } : {}),
...(recentEvents?.length ? { recentEvents } : {}),
...(lastNotifiedEventAt != null ? { lastNotifiedEventAt } : {}),
};
}
function bindTaskRecord(record: TaskRecord) {
return {
task_id: record.taskId,
runtime: record.runtime,
source_id: record.sourceId ?? null,
requester_session_key: record.requesterSessionKey,
requester_origin_json: serializeJson(record.requesterOrigin),
child_session_key: record.childSessionKey ?? null,
parent_task_id: record.parentTaskId ?? null,
agent_id: record.agentId ?? null,
run_id: record.runId ?? null,
label: record.label ?? null,
task: record.task,
status: record.status,
delivery_status: record.deliveryStatus,
notify_policy: record.notifyPolicy,
created_at: record.createdAt,
started_at: record.startedAt ?? null,
ended_at: record.endedAt ?? null,
last_event_at: record.lastEventAt ?? null,
cleanup_after: record.cleanupAfter ?? null,
error: record.error ?? null,
progress_summary: record.progressSummary ?? null,
terminal_summary: record.terminalSummary ?? null,
terminal_outcome: record.terminalOutcome ?? null,
recent_events_json: serializeJson(record.recentEvents),
last_notified_event_at: record.lastNotifiedEventAt ?? null,
};
}
function createStatements(db: DatabaseSync): TaskRegistryStatements {
return {
selectAll: db.prepare(`
SELECT
task_id,
runtime,
source_id,
requester_session_key,
requester_origin_json,
child_session_key,
parent_task_id,
agent_id,
run_id,
label,
task,
status,
delivery_status,
notify_policy,
created_at,
started_at,
ended_at,
last_event_at,
cleanup_after,
error,
progress_summary,
terminal_summary,
terminal_outcome,
recent_events_json,
last_notified_event_at
FROM task_runs
ORDER BY created_at ASC, task_id ASC
`),
replaceRow: db.prepare(`
INSERT OR REPLACE INTO task_runs (
task_id,
runtime,
source_id,
requester_session_key,
requester_origin_json,
child_session_key,
parent_task_id,
agent_id,
run_id,
label,
task,
status,
delivery_status,
notify_policy,
created_at,
started_at,
ended_at,
last_event_at,
cleanup_after,
error,
progress_summary,
terminal_summary,
terminal_outcome,
recent_events_json,
last_notified_event_at
) VALUES (
@task_id,
@runtime,
@source_id,
@requester_session_key,
@requester_origin_json,
@child_session_key,
@parent_task_id,
@agent_id,
@run_id,
@label,
@task,
@status,
@delivery_status,
@notify_policy,
@created_at,
@started_at,
@ended_at,
@last_event_at,
@cleanup_after,
@error,
@progress_summary,
@terminal_summary,
@terminal_outcome,
@recent_events_json,
@last_notified_event_at
)
`),
deleteRow: db.prepare(`DELETE FROM task_runs WHERE task_id = ?`),
clearRows: db.prepare(`DELETE FROM task_runs`),
};
}
function ensureSchema(db: DatabaseSync) {
db.exec(`
CREATE TABLE IF NOT EXISTS task_runs (
task_id TEXT PRIMARY KEY,
runtime TEXT NOT NULL,
source_id TEXT,
requester_session_key TEXT NOT NULL,
requester_origin_json TEXT,
child_session_key TEXT,
parent_task_id TEXT,
agent_id TEXT,
run_id TEXT,
label TEXT,
task TEXT NOT NULL,
status TEXT NOT NULL,
delivery_status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
created_at INTEGER NOT NULL,
started_at INTEGER,
ended_at INTEGER,
last_event_at INTEGER,
cleanup_after INTEGER,
error TEXT,
progress_summary TEXT,
terminal_summary TEXT,
terminal_outcome TEXT,
recent_events_json TEXT,
last_notified_event_at INTEGER
);
`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`);
db.exec(
`CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`,
);
}
function ensureTaskRegistryPermissions(pathname: string) {
const dir = resolveTaskRegistryDir(process.env);
mkdirSync(dir, { recursive: true, mode: TASK_REGISTRY_DIR_MODE });
chmodSync(dir, TASK_REGISTRY_DIR_MODE);
for (const suffix of TASK_REGISTRY_SIDEcar_SUFFIXES) {
const candidate = `${pathname}${suffix}`;
if (!existsSync(candidate)) {
continue;
}
chmodSync(candidate, TASK_REGISTRY_FILE_MODE);
}
}
function openTaskRegistryDatabase(): TaskRegistryDatabase {
const pathname = resolveTaskRegistrySqlitePath(process.env);
if (cachedDatabase && cachedDatabase.path === pathname) {
return cachedDatabase;
}
if (cachedDatabase) {
cachedDatabase.db.close();
cachedDatabase = null;
}
ensureTaskRegistryPermissions(pathname);
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(pathname);
db.exec(`PRAGMA journal_mode = WAL;`);
db.exec(`PRAGMA synchronous = NORMAL;`);
db.exec(`PRAGMA busy_timeout = 5000;`);
ensureSchema(db);
ensureTaskRegistryPermissions(pathname);
cachedDatabase = {
db,
path: pathname,
statements: createStatements(db),
};
return cachedDatabase;
}
function withWriteTransaction(write: (statements: TaskRegistryStatements) => void) {
const { db, path, statements } = openTaskRegistryDatabase();
db.exec("BEGIN IMMEDIATE");
try {
write(statements);
db.exec("COMMIT");
ensureTaskRegistryPermissions(path);
} catch (error) {
db.exec("ROLLBACK");
throw error;
}
}
export function loadTaskRegistrySnapshotFromSqlite(): Map<string, TaskRecord> {
const { statements } = openTaskRegistryDatabase();
const rows = statements.selectAll.all() as TaskRegistryRow[];
return new Map(rows.map((row) => [row.task_id, rowToTaskRecord(row)]));
}
export function saveTaskRegistrySnapshotToSqlite(tasks: ReadonlyMap<string, TaskRecord>) {
withWriteTransaction((statements) => {
statements.clearRows.run();
for (const task of tasks.values()) {
statements.replaceRow.run(bindTaskRecord(task));
}
});
}
export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) {
const store = openTaskRegistryDatabase();
store.statements.replaceRow.run(bindTaskRecord(task));
ensureTaskRegistryPermissions(store.path);
}
export function deleteTaskRegistryRecordFromSqlite(taskId: string) {
const store = openTaskRegistryDatabase();
store.statements.deleteRow.run(taskId);
ensureTaskRegistryPermissions(store.path);
}
export function closeTaskRegistrySqliteStore() {
if (!cachedDatabase) {
return;
}
cachedDatabase.db.close();
cachedDatabase = null;
}

View File

@@ -1,3 +1,6 @@
import { mkdtempSync, rmSync, statSync } from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
createTaskRecord,
@@ -5,6 +8,7 @@ import {
findTaskByRunId,
resetTaskRegistryForTests,
} from "./task-registry.js";
import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js";
import { configureTaskRegistryRuntime, type TaskRegistryHookEvent } from "./task-registry.store.js";
import type { TaskRecord } from "./task-registry.types.js";
@@ -27,7 +31,8 @@ function createStoredTask(): TaskRecord {
describe("task-registry store runtime", () => {
afterEach(() => {
resetTaskRegistryForTests({ persist: false });
delete process.env.OPENCLAW_STATE_DIR;
resetTaskRegistryForTests();
});
it("uses the configured task store for restore and save", () => {
@@ -103,4 +108,52 @@ describe("task-registry store runtime", () => {
taskId: created.taskId,
});
});
it("restores persisted tasks from the default sqlite store", () => {
const created = createTaskRecord({
runtime: "cron",
requesterSessionKey: "agent:main:main",
sourceId: "job-123",
runId: "run-sqlite",
task: "Run nightly cron",
status: "running",
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
});
resetTaskRegistryForTests({ persist: false });
expect(findTaskByRunId("run-sqlite")).toMatchObject({
taskId: created.taskId,
sourceId: "job-123",
task: "Run nightly cron",
});
});
it("hardens the sqlite task store directory and file modes", () => {
if (process.platform === "win32") {
return;
}
const stateDir = mkdtempSync(path.join(os.tmpdir(), "openclaw-task-store-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
createTaskRecord({
runtime: "cron",
requesterSessionKey: "agent:main:main",
sourceId: "job-456",
runId: "run-perms",
task: "Run secured cron",
status: "running",
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
});
const registryDir = resolveTaskRegistryDir(process.env);
const sqlitePath = resolveTaskRegistrySqlitePath(process.env);
expect(statSync(registryDir).mode & 0o777).toBe(0o700);
expect(statSync(sqlitePath).mode & 0o777).toBe(0o600);
resetTaskRegistryForTests();
rmSync(stateDir, { recursive: true, force: true });
});
});

View File

@@ -1,12 +1,18 @@
import {
loadTaskRegistrySnapshotFromJson,
saveTaskRegistrySnapshotToJson,
} from "./task-registry.store.json.js";
closeTaskRegistrySqliteStore,
deleteTaskRegistryRecordFromSqlite,
loadTaskRegistrySnapshotFromSqlite,
saveTaskRegistrySnapshotToSqlite,
upsertTaskRegistryRecordToSqlite,
} from "./task-registry.store.sqlite.js";
import type { TaskRecord } from "./task-registry.types.js";
export type TaskRegistryStore = {
loadSnapshot: () => Map<string, TaskRecord>;
saveSnapshot: (tasks: ReadonlyMap<string, TaskRecord>) => void;
upsertTask?: (task: TaskRecord) => void;
deleteTask?: (taskId: string) => void;
close?: () => void;
};
export type TaskRegistryHookEvent =
@@ -31,8 +37,11 @@ export type TaskRegistryHooks = {
};
const defaultTaskRegistryStore: TaskRegistryStore = {
loadSnapshot: loadTaskRegistrySnapshotFromJson,
saveSnapshot: saveTaskRegistrySnapshotToJson,
loadSnapshot: loadTaskRegistrySnapshotFromSqlite,
saveSnapshot: saveTaskRegistrySnapshotToSqlite,
upsertTask: upsertTaskRegistryRecordToSqlite,
deleteTask: deleteTaskRegistryRecordFromSqlite,
close: closeTaskRegistrySqliteStore,
};
let configuredTaskRegistryStore: TaskRegistryStore = defaultTaskRegistryStore;
@@ -59,6 +68,7 @@ export function configureTaskRegistryRuntime(params: {
}
export function resetTaskRegistryRuntimeForTests() {
configuredTaskRegistryStore.close?.();
configuredTaskRegistryStore = defaultTaskRegistryStore;
configuredTaskRegistryHooks = null;
}

View File

@@ -74,6 +74,24 @@ function persistTaskRegistry() {
getTaskRegistryStore().saveSnapshot(tasks);
}
function persistTaskUpsert(task: TaskRecord) {
const store = getTaskRegistryStore();
if (store.upsertTask) {
store.upsertTask(task);
return;
}
store.saveSnapshot(tasks);
}
function persistTaskDelete(taskId: string) {
const store = getTaskRegistryStore();
if (store.deleteTask) {
store.deleteTask(taskId);
return;
}
store.saveSnapshot(tasks);
}
function ensureDeliveryStatus(requesterSessionKey: string): TaskDeliveryStatus {
return requesterSessionKey.trim() ? "pending" : "parent_missing";
}
@@ -354,7 +372,7 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
if (patch.runId && patch.runId !== current.runId) {
rebuildRunIdIndex();
}
persistTaskRegistry();
persistTaskUpsert(next);
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(next),
@@ -838,7 +856,7 @@ export function createTaskRecord(params: {
}
tasks.set(taskId, record);
addRunIdIndex(taskId, record.runId);
persistTaskRegistry();
persistTaskUpsert(record);
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(record),
@@ -1100,7 +1118,7 @@ export function deleteTaskRecordById(taskId: string): boolean {
}
tasks.delete(taskId);
rebuildRunIdIndex();
persistTaskRegistry();
persistTaskDelete(taskId);
emitTaskRegistryHookEvent(() => ({
kind: "deleted",
taskId: current.taskId,