import type { DatabaseSync } from "node:sqlite"; import type { Insertable, Selectable } from "kysely"; import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js"; import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js"; import { closeOpenClawStateDatabase, openOpenClawStateDatabase, runOpenClawStateWriteTransaction, } from "../state/openclaw-state-db.js"; import type { TaskFlowRegistryStoreSnapshot } from "./task-flow-registry.store.types.js"; import { parseOptionalTaskFlowSyncMode, parseTaskFlowStatus, type JsonValue, type TaskFlowRecord, type TaskFlowSyncMode, } from "./task-flow-registry.types.js"; import { parseDeliveryContextJson } from "./task-registry.sqlite.shared.js"; import { parseTaskNotifyPolicy } from "./task-registry.types.js"; type FlowRunsTable = OpenClawStateKyselyDatabase["flow_runs"]; type FlowRegistryStoreDatabase = Pick; type FlowRegistryRow = Selectable & { sync_mode: string | null; status: string; notify_policy: string; }; type FlowRegistryDatabase = { db: DatabaseSync; path: string; }; let cachedDatabase: FlowRegistryDatabase | null = null; function normalizeNumber(value: number | bigint | null): number | undefined { if (typeof value === "bigint") { return Number(value); } return typeof value === "number" ? value : undefined; } function serializeJson(value: unknown): string | null { return value === undefined ? null : JSON.stringify(value); } function parseJsonValue(raw: string | null): JsonValue | undefined { if (!raw?.trim()) { return undefined; } try { return JSON.parse(raw) as JsonValue; } catch { return undefined; } } function rowToSyncMode(row: FlowRegistryRow): TaskFlowSyncMode { const syncMode = parseOptionalTaskFlowSyncMode(row.sync_mode); if (syncMode) { return syncMode; } return row.shape === "single_task" ? "task_mirrored" : "managed"; } function rowToFlowRecord(row: FlowRegistryRow): TaskFlowRecord { const endedAt = normalizeNumber(row.ended_at); const cancelRequestedAt = normalizeNumber(row.cancel_requested_at); const requesterOrigin = parseDeliveryContextJson(row.requester_origin_json); const stateJson = parseJsonValue(row.state_json); const waitJson = parseJsonValue(row.wait_json); return { flowId: row.flow_id, syncMode: rowToSyncMode(row), ownerKey: row.owner_key, ...(requesterOrigin ? { requesterOrigin } : {}), ...(row.controller_id ? { controllerId: row.controller_id } : {}), revision: normalizeNumber(row.revision) ?? 0, status: parseTaskFlowStatus(row.status), notifyPolicy: parseTaskNotifyPolicy(row.notify_policy), goal: row.goal, ...(row.current_step ? { currentStep: row.current_step } : {}), ...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}), ...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}), ...(stateJson !== undefined ? { stateJson } : {}), ...(waitJson !== undefined ? { waitJson } : {}), ...(cancelRequestedAt != null ? { cancelRequestedAt } : {}), createdAt: normalizeNumber(row.created_at) ?? 0, updatedAt: normalizeNumber(row.updated_at) ?? 0, ...(endedAt != null ? { endedAt } : {}), }; } function bindFlowRecord(record: TaskFlowRecord): Insertable { return { flow_id: record.flowId, sync_mode: record.syncMode, shape: null, owner_key: record.ownerKey, requester_origin_json: serializeJson(record.requesterOrigin), controller_id: record.controllerId ?? null, revision: record.revision, status: record.status, notify_policy: record.notifyPolicy, goal: record.goal, current_step: record.currentStep ?? null, blocked_task_id: record.blockedTaskId ?? null, blocked_summary: record.blockedSummary ?? null, state_json: serializeJson(record.stateJson), wait_json: serializeJson(record.waitJson), cancel_requested_at: record.cancelRequestedAt ?? null, created_at: record.createdAt, updated_at: record.updatedAt, ended_at: record.endedAt ?? null, }; } function getFlowRegistryKysely(db: DatabaseSync) { return getNodeSqliteKysely(db); } function pruneFlowsNotInSnapshot(params: { db: DatabaseSync; ids: readonly string[] }) { const tempTableName = "openclaw_live_flow_ids"; params.db.exec(`CREATE TEMP TABLE IF NOT EXISTS ${tempTableName} (id TEXT PRIMARY KEY)`); params.db.exec(`DELETE FROM ${tempTableName}`); const insert = params.db.prepare(`INSERT OR IGNORE INTO ${tempTableName} (id) VALUES (?)`); for (const id of params.ids) { insert.run(id); } params.db.exec(` DELETE FROM flow_runs WHERE NOT EXISTS ( SELECT 1 FROM ${tempTableName} WHERE ${tempTableName}.id = flow_runs.flow_id ) `); params.db.exec(`DELETE FROM ${tempTableName}`); } function selectFlowRows(db: DatabaseSync): FlowRegistryRow[] { const query = getFlowRegistryKysely(db) .selectFrom("flow_runs") .select([ "flow_id", "sync_mode", "shape", "owner_key", "requester_origin_json", "controller_id", "revision", "status", "notify_policy", "goal", "current_step", "blocked_task_id", "blocked_summary", "state_json", "wait_json", "cancel_requested_at", "created_at", "updated_at", "ended_at", ]) .orderBy("created_at", "asc") .orderBy("flow_id", "asc"); return executeSqliteQuerySync(db, query).rows; } function upsertFlowRow(db: DatabaseSync, row: Insertable): void { executeSqliteQuerySync( db, getFlowRegistryKysely(db) .insertInto("flow_runs") .values(row) .onConflict((conflict) => conflict.column("flow_id").doUpdateSet({ sync_mode: (eb) => eb.ref("excluded.sync_mode"), owner_key: (eb) => eb.ref("excluded.owner_key"), requester_origin_json: (eb) => eb.ref("excluded.requester_origin_json"), controller_id: (eb) => eb.ref("excluded.controller_id"), revision: (eb) => eb.ref("excluded.revision"), status: (eb) => eb.ref("excluded.status"), notify_policy: (eb) => eb.ref("excluded.notify_policy"), goal: (eb) => eb.ref("excluded.goal"), current_step: (eb) => eb.ref("excluded.current_step"), blocked_task_id: (eb) => eb.ref("excluded.blocked_task_id"), blocked_summary: (eb) => eb.ref("excluded.blocked_summary"), state_json: (eb) => eb.ref("excluded.state_json"), wait_json: (eb) => eb.ref("excluded.wait_json"), cancel_requested_at: (eb) => eb.ref("excluded.cancel_requested_at"), created_at: (eb) => eb.ref("excluded.created_at"), updated_at: (eb) => eb.ref("excluded.updated_at"), ended_at: (eb) => eb.ref("excluded.ended_at"), }), ), ); } function openFlowRegistryDatabase(): FlowRegistryDatabase { const database = openOpenClawStateDatabase(); const pathname = database.path; if (cachedDatabase && cachedDatabase.path === pathname && cachedDatabase.db.isOpen) { return cachedDatabase; } if (cachedDatabase && !cachedDatabase.db.isOpen) { cachedDatabase = null; } cachedDatabase = { db: database.db, path: pathname, }; return cachedDatabase; } function withWriteTransaction(write: (database: FlowRegistryDatabase) => void) { const database = openFlowRegistryDatabase(); runOpenClawStateWriteTransaction(() => { write(database); }); } export function loadTaskFlowRegistryStateFromSqlite(): TaskFlowRegistryStoreSnapshot { const { db } = openFlowRegistryDatabase(); const rows = selectFlowRows(db); return { flows: new Map(rows.map((row) => [row.flow_id, rowToFlowRecord(row)])), }; } export function saveTaskFlowRegistryStateToSqlite(snapshot: TaskFlowRegistryStoreSnapshot) { withWriteTransaction(({ db }) => { const kysely = getFlowRegistryKysely(db); const flowIds = [...snapshot.flows.keys()]; if (flowIds.length === 0) { executeSqliteQuerySync(db, kysely.deleteFrom("flow_runs")); return; } pruneFlowsNotInSnapshot({ db, ids: flowIds }); for (const flow of snapshot.flows.values()) { upsertFlowRow(db, bindFlowRecord(flow)); } }); } export function upsertTaskFlowRegistryRecordToSqlite(flow: TaskFlowRecord) { withWriteTransaction(({ db }) => { upsertFlowRow(db, bindFlowRecord(flow)); }); } export function deleteTaskFlowRegistryRecordFromSqlite(flowId: string) { withWriteTransaction(({ db }) => { executeSqliteQuerySync( db, getFlowRegistryKysely(db).deleteFrom("flow_runs").where("flow_id", "=", flowId), ); }); } export function closeTaskFlowRegistryDatabase() { cachedDatabase = null; closeOpenClawStateDatabase(); }