import type { DatabaseSync } from "node:sqlite"; import type { Insertable, Selectable } from "kysely"; import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js"; import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js"; import { closeOpenClawStateDatabase, openOpenClawStateDatabase, runOpenClawStateWriteTransaction, } from "../state/openclaw-state-db.js"; import { parseDeliveryContextJson } from "./task-registry.sqlite.shared.js"; import type { TaskRegistryStoreSnapshot } from "./task-registry.store.types.js"; import { parseOptionalTaskTerminalOutcome, parseTaskDeliveryStatus, parseTaskNotifyPolicy, parseTaskRuntime, parseTaskScopeKind, parseTaskStatus, type TaskDeliveryState, type TaskRecord, } from "./task-registry.types.js"; type TaskRunsTable = OpenClawStateKyselyDatabase["task_runs"]; type TaskDeliveryStateTable = OpenClawStateKyselyDatabase["task_delivery_state"]; type TaskRegistryStoreDatabase = Pick< OpenClawStateKyselyDatabase, "task_delivery_state" | "task_runs" >; type TaskRegistryRow = Selectable & { runtime: string; scope_kind: string; status: string; delivery_status: string; notify_policy: string; terminal_outcome: string | null; }; type TaskDeliveryStateRow = Selectable; type TaskRegistryDatabase = { db: DatabaseSync; path: string; }; const TASK_RUN_SELECT_COLUMNS = [ "task_id", "runtime", "task_kind", "source_id", "requester_session_key", "owner_key", "scope_kind", "child_session_key", "parent_flow_id", "parent_task_id", "agent_id", "run_id", "label", "task", "status", "delivery_status", "notify_policy", "created_at", "started_at", "ended_at", "last_event_at", "cleanup_after", "error", "progress_summary", "terminal_summary", "terminal_outcome", ] as const; let cachedDatabase: TaskRegistryDatabase | null = null; function normalizeNumber(value: number | bigint | null): number | undefined { if (typeof value === "bigint") { return Number(value); } return typeof value === "number" ? value : undefined; } function serializeJson(value: unknown): string | null { return value == null ? null : JSON.stringify(value); } function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { const startedAt = normalizeNumber(row.started_at); const endedAt = normalizeNumber(row.ended_at); const lastEventAt = normalizeNumber(row.last_event_at); const cleanupAfter = normalizeNumber(row.cleanup_after); const scopeKind = parseTaskScopeKind(row.scope_kind); const terminalOutcome = parseOptionalTaskTerminalOutcome(row.terminal_outcome); const requesterSessionKey = scopeKind === "system" ? "" : row.requester_session_key?.trim() || row.owner_key; return { taskId: row.task_id, runtime: parseTaskRuntime(row.runtime), ...(row.task_kind ? { taskKind: row.task_kind } : {}), ...(row.source_id ? { sourceId: row.source_id } : {}), requesterSessionKey, ownerKey: row.owner_key, scopeKind, ...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}), ...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}), ...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}), ...(row.agent_id ? { agentId: row.agent_id } : {}), ...(row.run_id ? { runId: row.run_id } : {}), ...(row.label ? { label: row.label } : {}), task: row.task, status: parseTaskStatus(row.status), deliveryStatus: parseTaskDeliveryStatus(row.delivery_status), notifyPolicy: parseTaskNotifyPolicy(row.notify_policy), createdAt: normalizeNumber(row.created_at) ?? 0, ...(startedAt != null ? { startedAt } : {}), ...(endedAt != null ? { endedAt } : {}), ...(lastEventAt != null ? { lastEventAt } : {}), ...(cleanupAfter != null ? { cleanupAfter } : {}), ...(row.error ? { error: row.error } : {}), ...(row.progress_summary ? { progressSummary: row.progress_summary } : {}), ...(row.terminal_summary ? { terminalSummary: row.terminal_summary } : {}), ...(terminalOutcome ? { terminalOutcome } : {}), }; } function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState { const requesterOrigin = parseDeliveryContextJson(row.requester_origin_json); const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at); return { taskId: row.task_id, ...(requesterOrigin ? { requesterOrigin } : {}), ...(lastNotifiedEventAt != null ? { lastNotifiedEventAt } : {}), }; } function bindTaskRecordBase(record: TaskRecord): Insertable { return { task_id: record.taskId, runtime: record.runtime, task_kind: record.taskKind ?? null, source_id: record.sourceId ?? null, requester_session_key: record.scopeKind === "system" ? "" : record.requesterSessionKey, owner_key: record.ownerKey, scope_kind: record.scopeKind, child_session_key: record.childSessionKey ?? null, parent_flow_id: record.parentFlowId ?? null, parent_task_id: record.parentTaskId ?? null, agent_id: record.agentId ?? null, run_id: record.runId ?? null, label: record.label ?? null, task: record.task, status: record.status, delivery_status: record.deliveryStatus, notify_policy: record.notifyPolicy, created_at: record.createdAt, started_at: record.startedAt ?? null, ended_at: record.endedAt ?? null, last_event_at: record.lastEventAt ?? null, cleanup_after: record.cleanupAfter ?? null, error: record.error ?? null, progress_summary: record.progressSummary ?? null, terminal_summary: record.terminalSummary ?? null, terminal_outcome: record.terminalOutcome ?? null, }; } function bindTaskDeliveryState(state: TaskDeliveryState): Insertable { return { task_id: state.taskId, requester_origin_json: serializeJson(state.requesterOrigin), last_notified_event_at: state.lastNotifiedEventAt ?? null, }; } function getTaskRegistryKysely(db: DatabaseSync) { return getNodeSqliteKysely(db); } function pruneRowsNotInSnapshot(params: { db: DatabaseSync; tableName: "task_delivery_state" | "task_runs"; columnName: "task_id"; tempTableName: string; ids: readonly string[]; }) { params.db.exec(`CREATE TEMP TABLE IF NOT EXISTS ${params.tempTableName} (id TEXT PRIMARY KEY)`); params.db.exec(`DELETE FROM ${params.tempTableName}`); const insert = params.db.prepare(`INSERT OR IGNORE INTO ${params.tempTableName} (id) VALUES (?)`); for (const id of params.ids) { insert.run(id); } params.db.exec(` DELETE FROM ${params.tableName} WHERE NOT EXISTS ( SELECT 1 FROM ${params.tempTableName} WHERE ${params.tempTableName}.id = ${params.tableName}.${params.columnName} ) `); params.db.exec(`DELETE FROM ${params.tempTableName}`); } function selectTaskRows(db: DatabaseSync): TaskRegistryRow[] { const query = getTaskRegistryKysely(db) .selectFrom("task_runs") .select(TASK_RUN_SELECT_COLUMNS) .orderBy("created_at", "asc") .orderBy("task_id", "asc"); return executeSqliteQuerySync(db, query).rows; } function selectTaskDeliveryStateRows(db: DatabaseSync): TaskDeliveryStateRow[] { const query = getTaskRegistryKysely(db) .selectFrom("task_delivery_state") .select(["task_id", "requester_origin_json", "last_notified_event_at"]) .orderBy("task_id", "asc"); return executeSqliteQuerySync(db, query).rows; } function upsertTaskRow(db: DatabaseSync, row: Insertable): void { executeSqliteQuerySync( db, getTaskRegistryKysely(db) .insertInto("task_runs") .values(row) .onConflict((conflict) => conflict.column("task_id").doUpdateSet({ runtime: (eb) => eb.ref("excluded.runtime"), task_kind: (eb) => eb.ref("excluded.task_kind"), source_id: (eb) => eb.ref("excluded.source_id"), requester_session_key: (eb) => eb.ref("excluded.requester_session_key"), owner_key: (eb) => eb.ref("excluded.owner_key"), scope_kind: (eb) => eb.ref("excluded.scope_kind"), child_session_key: (eb) => eb.ref("excluded.child_session_key"), parent_flow_id: (eb) => eb.ref("excluded.parent_flow_id"), parent_task_id: (eb) => eb.ref("excluded.parent_task_id"), agent_id: (eb) => eb.ref("excluded.agent_id"), run_id: (eb) => eb.ref("excluded.run_id"), label: (eb) => eb.ref("excluded.label"), task: (eb) => eb.ref("excluded.task"), status: (eb) => eb.ref("excluded.status"), delivery_status: (eb) => eb.ref("excluded.delivery_status"), notify_policy: (eb) => eb.ref("excluded.notify_policy"), created_at: (eb) => eb.ref("excluded.created_at"), started_at: (eb) => eb.ref("excluded.started_at"), ended_at: (eb) => eb.ref("excluded.ended_at"), last_event_at: (eb) => eb.ref("excluded.last_event_at"), cleanup_after: (eb) => eb.ref("excluded.cleanup_after"), error: (eb) => eb.ref("excluded.error"), progress_summary: (eb) => eb.ref("excluded.progress_summary"), terminal_summary: (eb) => eb.ref("excluded.terminal_summary"), terminal_outcome: (eb) => eb.ref("excluded.terminal_outcome"), }), ), ); } function replaceTaskDeliveryStateRow( db: DatabaseSync, row: Insertable, ): void { executeSqliteQuerySync( db, getTaskRegistryKysely(db) .insertInto("task_delivery_state") .values(row) .onConflict((conflict) => conflict.column("task_id").doUpdateSet({ requester_origin_json: (eb) => eb.ref("excluded.requester_origin_json"), last_notified_event_at: (eb) => eb.ref("excluded.last_notified_event_at"), }), ), ); } function deleteTaskRowsWithDeliveryState(db: DatabaseSync, taskId: string): void { const kysely = getTaskRegistryKysely(db); executeSqliteQuerySync( db, kysely.deleteFrom("task_delivery_state").where("task_id", "=", taskId), ); executeSqliteQuerySync(db, kysely.deleteFrom("task_runs").where("task_id", "=", taskId)); } function openTaskRegistryDatabase(): TaskRegistryDatabase { const database = openOpenClawStateDatabase(); const pathname = database.path; if (cachedDatabase && cachedDatabase.path === pathname && cachedDatabase.db.isOpen) { return cachedDatabase; } if (cachedDatabase && !cachedDatabase.db.isOpen) { cachedDatabase = null; } cachedDatabase = { db: database.db, path: pathname, }; return cachedDatabase; } function withWriteTransaction(write: (database: TaskRegistryDatabase) => void) { const database = openTaskRegistryDatabase(); runOpenClawStateWriteTransaction(() => { write(database); }); } export function loadTaskRegistryStateFromSqlite(): TaskRegistryStoreSnapshot { const { db } = openTaskRegistryDatabase(); const taskRows = selectTaskRows(db); const deliveryRows = selectTaskDeliveryStateRows(db); return { tasks: new Map(taskRows.map((row) => [row.task_id, rowToTaskRecord(row)])), deliveryStates: new Map(deliveryRows.map((row) => [row.task_id, rowToTaskDeliveryState(row)])), }; } export function listTaskRegistryRecordsByOwnerKeyFromSqlite(ownerKey: string): TaskRecord[] { const key = ownerKey.trim(); if (!key) { return []; } const { db } = openTaskRegistryDatabase(); const query = getTaskRegistryKysely(db) .selectFrom("task_runs") .select(TASK_RUN_SELECT_COLUMNS) .where("owner_key", "=", key) .orderBy("created_at", "asc") .orderBy("task_id", "asc"); const rows = executeSqliteQuerySync(db, query).rows; return rows.map(rowToTaskRecord); } export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapshot) { withWriteTransaction(({ db }) => { const kysely = getTaskRegistryKysely(db); const taskIds = [...snapshot.tasks.keys()]; if (taskIds.length === 0) { executeSqliteQuerySync(db, kysely.deleteFrom("task_delivery_state")); executeSqliteQuerySync(db, kysely.deleteFrom("task_runs")); return; } pruneRowsNotInSnapshot({ db, tableName: "task_runs", columnName: "task_id", tempTableName: "openclaw_live_task_run_ids", ids: taskIds, }); const deliveryTaskIds = [...snapshot.deliveryStates.keys()]; if (deliveryTaskIds.length === 0) { executeSqliteQuerySync(db, kysely.deleteFrom("task_delivery_state")); } else { pruneRowsNotInSnapshot({ db, tableName: "task_delivery_state", columnName: "task_id", tempTableName: "openclaw_live_task_delivery_ids", ids: deliveryTaskIds, }); } for (const task of snapshot.tasks.values()) { upsertTaskRow(db, bindTaskRecordBase(task)); } for (const state of snapshot.deliveryStates.values()) { replaceTaskDeliveryStateRow(db, bindTaskDeliveryState(state)); } }); } export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) { withWriteTransaction(({ db }) => { upsertTaskRow(db, bindTaskRecordBase(task)); }); } export function upsertTaskWithDeliveryStateToSqlite(params: { task: TaskRecord; deliveryState?: TaskDeliveryState; }) { withWriteTransaction(({ db }) => { upsertTaskRow(db, bindTaskRecordBase(params.task)); if (params.deliveryState) { replaceTaskDeliveryStateRow(db, bindTaskDeliveryState(params.deliveryState)); } else { executeSqliteQuerySync( db, getTaskRegistryKysely(db) .deleteFrom("task_delivery_state") .where("task_id", "=", params.task.taskId), ); } }); } export function deleteTaskRegistryRecordFromSqlite(taskId: string) { withWriteTransaction(({ db }) => { deleteTaskRowsWithDeliveryState(db, taskId); }); } export function deleteTaskAndDeliveryStateFromSqlite(taskId: string) { withWriteTransaction(({ db }) => { deleteTaskRowsWithDeliveryState(db, taskId); }); } export function upsertTaskDeliveryStateToSqlite(state: TaskDeliveryState) { withWriteTransaction(({ db }) => { replaceTaskDeliveryStateRow(db, bindTaskDeliveryState(state)); }); } export function deleteTaskDeliveryStateFromSqlite(taskId: string) { withWriteTransaction(({ db }) => { executeSqliteQuerySync( db, getTaskRegistryKysely(db).deleteFrom("task_delivery_state").where("task_id", "=", taskId), ); }); } export function closeTaskRegistryDatabase() { cachedDatabase = null; closeOpenClawStateDatabase(); }