Files
openclaw/extensions/workboard/src/sqlite-store.ts
Peter Steinberger 8d6a6e9d03 feat: move workboard to relational sqlite
Move Workboard durable data into a relational SQLite database and add extension doctor migration for .28 plugin-state rows. Preserve attachment lifecycle behavior, SQLite permissions/WAL settings, and scoped plugin migration access.
2026-05-31 00:02:23 +01:00

1386 lines
45 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import { DatabaseSync, type SQLInputValue } from "node:sqlite";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import type {
PersistedWorkboardAttachment,
PersistedWorkboardBoard,
PersistedWorkboardCard,
PersistedWorkboardNotificationSubscription,
WorkboardKeyedStore,
} from "./persistence-types.js";
import type {
WorkboardArtifact,
WorkboardAttachment,
WorkboardCard,
WorkboardComment,
WorkboardDiagnostic,
WorkboardEvent,
WorkboardExecution,
WorkboardLink,
WorkboardMetadata,
WorkboardNotification,
WorkboardProof,
WorkboardRunAttempt,
WorkboardWorkerLog,
} from "./types.js";
const WORKBOARD_DB_RELATIVE_PATH = ["plugins", "workboard", "workboard.sqlite"] as const;
const SCHEMA_VERSION = 1;
const WORKBOARD_SQLITE_BUSY_TIMEOUT_MS = 5000;
const WORKBOARD_SQLITE_DIR_MODE = 0o700;
const WORKBOARD_SQLITE_FILE_MODE = 0o600;
type Row = Record<string, unknown>;
export type WorkboardSqliteStores = {
cards: WorkboardKeyedStore;
boards: WorkboardKeyedStore<PersistedWorkboardBoard>;
subscriptions: WorkboardKeyedStore<PersistedWorkboardNotificationSubscription>;
attachments: WorkboardKeyedStore<PersistedWorkboardAttachment>;
close: () => void;
};
export function resolveWorkboardSqlitePath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveStateDir(env), ...WORKBOARD_DB_RELATIVE_PATH);
}
function jsonValue(value: unknown): string | null {
return value === undefined ? null : JSON.stringify(value);
}
function parseJson(value: unknown): unknown {
if (typeof value !== "string" || !value) {
return undefined;
}
return JSON.parse(value) as unknown;
}
function stringValue(row: Row, key: string): string | undefined {
const value = row[key];
return typeof value === "string" && value.length > 0 ? value : undefined;
}
function numberValue(row: Row, key: string): number | undefined {
const value = row[key];
if (typeof value === "number") {
return Number.isFinite(value) ? value : undefined;
}
if (typeof value === "bigint") {
return Number(value);
}
return undefined;
}
function requiredString(row: Row, key: string): string {
const value = stringValue(row, key);
if (!value) {
throw new Error(`workboard sqlite row missing ${key}`);
}
return value;
}
function requiredNumber(row: Row, key: string): number {
const value = numberValue(row, key);
if (value === undefined) {
throw new Error(`workboard sqlite row missing ${key}`);
}
return value;
}
function optional<T extends object>(value: T): T | undefined {
return Object.keys(value).length > 0 ? value : undefined;
}
function asBlobContent(value: string): Uint8Array {
return Buffer.from(value, "base64");
}
function blobToBase64(value: unknown): string {
if (value instanceof Uint8Array) {
return Buffer.from(value).toString("base64");
}
if (typeof value === "string") {
return Buffer.from(value).toString("base64");
}
return "";
}
function runTransaction<T>(db: DatabaseSync, run: () => T): T {
db.exec("BEGIN IMMEDIATE");
try {
const result = run();
db.exec("COMMIT");
return result;
} catch (error) {
db.exec("ROLLBACK");
throw error;
}
}
function ensureWorkboardSchema(db: DatabaseSync): void {
db.exec(`
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS workboard_schema_migrations (
id TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS workboard_boards (
id TEXT PRIMARY KEY,
name TEXT,
description TEXT,
icon TEXT,
color TEXT,
default_workspace_json TEXT,
orchestration_json TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
archived_at INTEGER
);
CREATE TABLE IF NOT EXISTS workboard_cards (
id TEXT PRIMARY KEY,
board_id TEXT NOT NULL,
title TEXT NOT NULL,
notes TEXT,
status TEXT NOT NULL,
priority TEXT NOT NULL,
agent_id TEXT,
session_key TEXT,
run_id TEXT,
task_id TEXT,
source_url TEXT,
position REAL NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
started_at INTEGER,
completed_at INTEGER,
execution_id TEXT,
execution_kind TEXT,
execution_engine TEXT,
execution_mode TEXT,
execution_status TEXT,
execution_model TEXT,
execution_session_key TEXT,
execution_run_id TEXT,
execution_started_at INTEGER,
execution_updated_at INTEGER,
automation_json TEXT,
claim_json TEXT,
template_id TEXT,
archived_at INTEGER,
stale_json TEXT,
failure_count INTEGER
);
CREATE INDEX IF NOT EXISTS workboard_cards_board_status_idx
ON workboard_cards(board_id, status, position);
CREATE INDEX IF NOT EXISTS workboard_cards_session_idx
ON workboard_cards(session_key, run_id);
CREATE TABLE IF NOT EXISTS workboard_card_labels (
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
label TEXT NOT NULL,
PRIMARY KEY(card_id, ordinal)
);
CREATE TABLE IF NOT EXISTS workboard_card_events (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
kind TEXT NOT NULL,
at INTEGER NOT NULL,
from_status TEXT,
to_status TEXT,
session_key TEXT,
run_id TEXT
);
CREATE TABLE IF NOT EXISTS workboard_card_attempts (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
status TEXT NOT NULL,
started_at INTEGER NOT NULL,
ended_at INTEGER,
engine TEXT,
mode TEXT,
model TEXT,
session_key TEXT,
run_id TEXT,
error TEXT
);
CREATE TABLE IF NOT EXISTS workboard_card_comments (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
body TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER
);
CREATE TABLE IF NOT EXISTS workboard_card_links (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
type TEXT NOT NULL,
target_card_id TEXT,
title TEXT,
url TEXT,
created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS workboard_card_proof (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
status TEXT NOT NULL,
label TEXT,
command TEXT,
url TEXT,
note TEXT,
created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS workboard_card_artifacts (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
label TEXT,
url TEXT,
path TEXT,
mime_type TEXT,
created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS workboard_card_diagnostics (
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
kind TEXT NOT NULL,
severity TEXT NOT NULL,
title TEXT NOT NULL,
detail TEXT NOT NULL,
first_seen_at INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
count INTEGER NOT NULL,
actions_json TEXT NOT NULL,
PRIMARY KEY(card_id, ordinal)
);
CREATE TABLE IF NOT EXISTS workboard_card_notifications (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
kind TEXT NOT NULL,
message TEXT NOT NULL,
created_at INTEGER NOT NULL,
sequence INTEGER,
session_key TEXT,
run_id TEXT
);
CREATE TABLE IF NOT EXISTS workboard_worker_logs (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
created_at INTEGER NOT NULL,
session_key TEXT,
run_id TEXT
);
CREATE TABLE IF NOT EXISTS workboard_worker_protocol (
card_id TEXT PRIMARY KEY REFERENCES workboard_cards(id) ON DELETE CASCADE,
state TEXT NOT NULL,
updated_at INTEGER NOT NULL,
detail TEXT
);
CREATE TABLE IF NOT EXISTS workboard_card_attachments (
id TEXT PRIMARY KEY,
card_id TEXT NOT NULL REFERENCES workboard_cards(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
file_name TEXT NOT NULL,
byte_size INTEGER NOT NULL,
mime_type TEXT,
note TEXT,
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS workboard_card_attachments_card_idx
ON workboard_card_attachments(card_id, ordinal);
CREATE TABLE IF NOT EXISTS workboard_attachment_blobs (
attachment_id TEXT PRIMARY KEY,
content BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS workboard_notification_subscriptions (
id TEXT PRIMARY KEY,
board_id TEXT NOT NULL,
card_id TEXT,
session_key TEXT,
run_id TEXT,
target TEXT,
event_kinds_json TEXT,
last_event_at INTEGER,
last_event_id TEXT,
last_event_sequence INTEGER,
delivered_event_ids_json TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
`);
db.prepare(
"INSERT OR IGNORE INTO workboard_schema_migrations (id, applied_at) VALUES (?, ?)",
).run(`schema-${SCHEMA_VERSION}`, Date.now());
}
function configureWorkboardDatabase(db: DatabaseSync): void {
db.exec(`
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA busy_timeout = ${WORKBOARD_SQLITE_BUSY_TIMEOUT_MS};
PRAGMA foreign_keys = ON;
`);
}
function chmodIfExists(targetPath: string, mode: number): void {
try {
fs.chmodSync(targetPath, mode);
} catch (err) {
if ((err as NodeJS.ErrnoException).code !== "ENOENT") {
throw err;
}
}
}
function hardenWorkboardDatabaseFiles(dbPath: string): void {
fs.chmodSync(path.dirname(dbPath), WORKBOARD_SQLITE_DIR_MODE);
chmodIfExists(dbPath, WORKBOARD_SQLITE_FILE_MODE);
chmodIfExists(`${dbPath}-wal`, WORKBOARD_SQLITE_FILE_MODE);
chmodIfExists(`${dbPath}-shm`, WORKBOARD_SQLITE_FILE_MODE);
}
function createDatabase(dbPath: string): DatabaseSync {
fs.mkdirSync(path.dirname(dbPath), { recursive: true, mode: WORKBOARD_SQLITE_DIR_MODE });
chmodIfExists(path.dirname(dbPath), WORKBOARD_SQLITE_DIR_MODE);
if (!fs.existsSync(dbPath)) {
fs.closeSync(fs.openSync(dbPath, "a", WORKBOARD_SQLITE_FILE_MODE));
}
const db = new DatabaseSync(dbPath);
configureWorkboardDatabase(db);
ensureWorkboardSchema(db);
hardenWorkboardDatabaseFiles(dbPath);
return db;
}
function childRows(db: DatabaseSync, table: string, cardId: string): Row[] {
return db
.prepare(`SELECT * FROM ${table} WHERE card_id = ? ORDER BY ordinal ASC`)
.all(cardId) as Row[];
}
function readLabels(db: DatabaseSync, cardId: string): string[] {
return childRows(db, "workboard_card_labels", cardId).flatMap((row) => {
const label = stringValue(row, "label");
return label ? [label] : [];
});
}
function readEvents(db: DatabaseSync, cardId: string): WorkboardEvent[] | undefined {
const events = childRows(db, "workboard_card_events", cardId).map((row) => {
const event: WorkboardEvent = {
id: requiredString(row, "id"),
kind: requiredString(row, "kind") as WorkboardEvent["kind"],
at: requiredNumber(row, "at"),
};
const fromStatus = stringValue(row, "from_status");
const toStatus = stringValue(row, "to_status");
const sessionKey = stringValue(row, "session_key");
const runId = stringValue(row, "run_id");
if (fromStatus) {
event.fromStatus = fromStatus as WorkboardEvent["fromStatus"];
}
if (toStatus) {
event.toStatus = toStatus as WorkboardEvent["toStatus"];
}
if (sessionKey) {
event.sessionKey = sessionKey;
}
if (runId) {
event.runId = runId;
}
return event;
});
return events.length > 0 ? events : undefined;
}
function readExecution(row: Row): WorkboardExecution | undefined {
const id = stringValue(row, "execution_id");
if (!id) {
return undefined;
}
return {
id,
kind: "agent-session",
engine: requiredString(row, "execution_engine") as WorkboardExecution["engine"],
mode: requiredString(row, "execution_mode") as WorkboardExecution["mode"],
status: requiredString(row, "execution_status") as WorkboardExecution["status"],
model: requiredString(row, "execution_model"),
...(stringValue(row, "execution_session_key")
? { sessionKey: stringValue(row, "execution_session_key") }
: {}),
...(stringValue(row, "execution_run_id")
? { runId: stringValue(row, "execution_run_id") }
: {}),
startedAt: requiredNumber(row, "execution_started_at"),
updatedAt: requiredNumber(row, "execution_updated_at"),
};
}
function readMetadata(db: DatabaseSync, row: Row): WorkboardMetadata | undefined {
const cardId = requiredString(row, "id");
const attempts = childRows(db, "workboard_card_attempts", cardId).map((child) => {
const entry: WorkboardRunAttempt = {
id: requiredString(child, "id"),
status: requiredString(child, "status") as WorkboardRunAttempt["status"],
startedAt: requiredNumber(child, "started_at"),
};
const endedAt = numberValue(child, "ended_at");
const engine = stringValue(child, "engine");
const mode = stringValue(child, "mode");
const model = stringValue(child, "model");
const sessionKey = stringValue(child, "session_key");
const runId = stringValue(child, "run_id");
const error = stringValue(child, "error");
if (endedAt !== undefined) {
entry.endedAt = endedAt;
}
if (engine) {
entry.engine = engine as WorkboardRunAttempt["engine"];
}
if (mode) {
entry.mode = mode as WorkboardRunAttempt["mode"];
}
if (model) {
entry.model = model;
}
if (sessionKey) {
entry.sessionKey = sessionKey;
}
if (runId) {
entry.runId = runId;
}
if (error) {
entry.error = error;
}
return entry;
});
const comments = childRows(db, "workboard_card_comments", cardId).map((child) => {
const entry: WorkboardComment = {
id: requiredString(child, "id"),
body: requiredString(child, "body"),
createdAt: requiredNumber(child, "created_at"),
};
const updatedAt = numberValue(child, "updated_at");
if (updatedAt !== undefined) {
entry.updatedAt = updatedAt;
}
return entry;
});
const links = childRows(db, "workboard_card_links", cardId).map((child) => {
const entry: WorkboardLink = {
id: requiredString(child, "id"),
type: requiredString(child, "type") as WorkboardLink["type"],
createdAt: requiredNumber(child, "created_at"),
};
const targetCardId = stringValue(child, "target_card_id");
const title = stringValue(child, "title");
const url = stringValue(child, "url");
if (targetCardId) {
entry.targetCardId = targetCardId;
}
if (title) {
entry.title = title;
}
if (url) {
entry.url = url;
}
return entry;
});
const proof = childRows(db, "workboard_card_proof", cardId).map((child) => {
const entry: WorkboardProof = {
id: requiredString(child, "id"),
status: requiredString(child, "status") as WorkboardProof["status"],
createdAt: requiredNumber(child, "created_at"),
};
const label = stringValue(child, "label");
const command = stringValue(child, "command");
const url = stringValue(child, "url");
const note = stringValue(child, "note");
if (label) {
entry.label = label;
}
if (command) {
entry.command = command;
}
if (url) {
entry.url = url;
}
if (note) {
entry.note = note;
}
return entry;
});
const artifacts = childRows(db, "workboard_card_artifacts", cardId).map((child) => {
const entry: WorkboardArtifact = {
id: requiredString(child, "id"),
createdAt: requiredNumber(child, "created_at"),
};
const label = stringValue(child, "label");
const url = stringValue(child, "url");
const artifactPath = stringValue(child, "path");
const mimeType = stringValue(child, "mime_type");
if (label) {
entry.label = label;
}
if (url) {
entry.url = url;
}
if (artifactPath) {
entry.path = artifactPath;
}
if (mimeType) {
entry.mimeType = mimeType;
}
return entry;
});
const attachments = childRows(db, "workboard_card_attachments", cardId).map((child) => {
const entry: WorkboardAttachment = {
id: requiredString(child, "id"),
cardId: requiredString(child, "card_id"),
createdAt: requiredNumber(child, "created_at"),
fileName: requiredString(child, "file_name"),
byteSize: requiredNumber(child, "byte_size"),
};
const mimeType = stringValue(child, "mime_type");
const note = stringValue(child, "note");
if (mimeType) {
entry.mimeType = mimeType;
}
if (note) {
entry.note = note;
}
return entry;
});
const workerLogs = childRows(db, "workboard_worker_logs", cardId).map((child) => {
const entry: WorkboardWorkerLog = {
id: requiredString(child, "id"),
createdAt: requiredNumber(child, "created_at"),
level: requiredString(child, "level") as WorkboardWorkerLog["level"],
message: requiredString(child, "message"),
};
const sessionKey = stringValue(child, "session_key");
const runId = stringValue(child, "run_id");
if (sessionKey) {
entry.sessionKey = sessionKey;
}
if (runId) {
entry.runId = runId;
}
return entry;
});
const diagnostics = childRows(db, "workboard_card_diagnostics", cardId).map((child) => ({
kind: requiredString(child, "kind") as WorkboardDiagnostic["kind"],
severity: requiredString(child, "severity") as WorkboardDiagnostic["severity"],
title: requiredString(child, "title"),
detail: requiredString(child, "detail"),
firstSeenAt: requiredNumber(child, "first_seen_at"),
lastSeenAt: requiredNumber(child, "last_seen_at"),
count: requiredNumber(child, "count"),
actions: (parseJson(child.actions_json) as WorkboardDiagnostic["actions"] | undefined) ?? [],
}));
const notifications = childRows(db, "workboard_card_notifications", cardId).map((child) => {
const entry: WorkboardNotification = {
id: requiredString(child, "id"),
kind: requiredString(child, "kind") as WorkboardNotification["kind"],
createdAt: requiredNumber(child, "created_at"),
message: requiredString(child, "message"),
};
const sequence = numberValue(child, "sequence");
const sessionKey = stringValue(child, "session_key");
const runId = stringValue(child, "run_id");
if (sequence !== undefined) {
entry.sequence = sequence;
}
if (sessionKey) {
entry.sessionKey = sessionKey;
}
if (runId) {
entry.runId = runId;
}
return entry;
});
const protocol = db
.prepare("SELECT * FROM workboard_worker_protocol WHERE card_id = ?")
.get(cardId) as Row | undefined;
const automation = parseJson(row.automation_json) as WorkboardMetadata["automation"] | undefined;
const claim = parseJson(row.claim_json) as WorkboardMetadata["claim"] | undefined;
const stale = parseJson(row.stale_json) as WorkboardMetadata["stale"] | undefined;
return optional({
...(attempts.length > 0 ? { attempts } : {}),
...(comments.length > 0 ? { comments } : {}),
...(links.length > 0 ? { links } : {}),
...(proof.length > 0 ? { proof } : {}),
...(artifacts.length > 0 ? { artifacts } : {}),
...(attachments.length > 0 ? { attachments } : {}),
...(workerLogs.length > 0 ? { workerLogs } : {}),
...(protocol
? {
workerProtocol: {
state: requiredString(protocol, "state") as NonNullable<
WorkboardMetadata["workerProtocol"]
>["state"],
updatedAt: requiredNumber(protocol, "updated_at"),
...(stringValue(protocol, "detail") ? { detail: stringValue(protocol, "detail") } : {}),
},
}
: {}),
...(automation ? { automation } : {}),
...(claim ? { claim } : {}),
...(diagnostics.length > 0 ? { diagnostics } : {}),
...(notifications.length > 0 ? { notifications } : {}),
...(stringValue(row, "template_id")
? { templateId: stringValue(row, "template_id") as WorkboardMetadata["templateId"] }
: {}),
...(numberValue(row, "archived_at") !== undefined
? { archivedAt: numberValue(row, "archived_at") }
: {}),
...(stale ? { stale } : {}),
...(numberValue(row, "failure_count") !== undefined
? { failureCount: numberValue(row, "failure_count") }
: {}),
});
}
function readCard(db: DatabaseSync, row: Row): WorkboardCard {
const card: WorkboardCard = {
id: requiredString(row, "id"),
title: requiredString(row, "title"),
status: requiredString(row, "status") as WorkboardCard["status"],
priority: requiredString(row, "priority") as WorkboardCard["priority"],
labels: readLabels(db, requiredString(row, "id")),
position: requiredNumber(row, "position"),
createdAt: requiredNumber(row, "created_at"),
updatedAt: requiredNumber(row, "updated_at"),
};
const metadata = readMetadata(db, row);
return {
...card,
...(stringValue(row, "notes") ? { notes: stringValue(row, "notes") } : {}),
...(stringValue(row, "agent_id") ? { agentId: stringValue(row, "agent_id") } : {}),
...(stringValue(row, "session_key") ? { sessionKey: stringValue(row, "session_key") } : {}),
...(stringValue(row, "run_id") ? { runId: stringValue(row, "run_id") } : {}),
...(stringValue(row, "task_id") ? { taskId: stringValue(row, "task_id") } : {}),
...(stringValue(row, "source_url") ? { sourceUrl: stringValue(row, "source_url") } : {}),
...(readExecution(row) ? { execution: readExecution(row) } : {}),
...(numberValue(row, "started_at") !== undefined
? { startedAt: numberValue(row, "started_at") }
: {}),
...(numberValue(row, "completed_at") !== undefined
? { completedAt: numberValue(row, "completed_at") }
: {}),
...(readEvents(db, card.id) ? { events: readEvents(db, card.id) } : {}),
...(metadata ? { metadata } : {}),
};
}
function cardBoardId(card: WorkboardCard): string {
return card.metadata?.automation?.boardId ?? "default";
}
function bindNull(value: unknown): SQLInputValue {
if (
value === undefined ||
value === null ||
typeof value === "string" ||
typeof value === "number" ||
typeof value === "bigint" ||
value instanceof Uint8Array
) {
return (value ?? null) as SQLInputValue;
}
return JSON.stringify(value);
}
function insertChildren<T>(
db: DatabaseSync,
table: string,
cardId: string,
entries: readonly T[] | undefined,
insert: (entry: T, ordinal: number) => void,
): void {
db.prepare(`DELETE FROM ${table} WHERE card_id = ?`).run(cardId);
entries?.forEach(insert);
}
function insertCard(db: DatabaseSync, card: WorkboardCard): void {
const execution = card.execution;
const metadata = card.metadata;
db.prepare(
`
INSERT INTO workboard_cards (
id, board_id, title, notes, status, priority, agent_id, session_key, run_id, task_id,
source_url, position, created_at, updated_at, started_at, completed_at,
execution_id, execution_kind, execution_engine, execution_mode, execution_status,
execution_model, execution_session_key, execution_run_id, execution_started_at,
execution_updated_at, automation_json, claim_json, template_id, archived_at, stale_json,
failure_count
) VALUES (
@id, @board_id, @title, @notes, @status, @priority, @agent_id, @session_key, @run_id,
@task_id, @source_url, @position, @created_at, @updated_at, @started_at, @completed_at,
@execution_id, @execution_kind, @execution_engine, @execution_mode, @execution_status,
@execution_model, @execution_session_key, @execution_run_id, @execution_started_at,
@execution_updated_at, @automation_json, @claim_json, @template_id, @archived_at,
@stale_json, @failure_count
)
ON CONFLICT(id) DO UPDATE SET
board_id = excluded.board_id,
title = excluded.title,
notes = excluded.notes,
status = excluded.status,
priority = excluded.priority,
agent_id = excluded.agent_id,
session_key = excluded.session_key,
run_id = excluded.run_id,
task_id = excluded.task_id,
source_url = excluded.source_url,
position = excluded.position,
created_at = excluded.created_at,
updated_at = excluded.updated_at,
started_at = excluded.started_at,
completed_at = excluded.completed_at,
execution_id = excluded.execution_id,
execution_kind = excluded.execution_kind,
execution_engine = excluded.execution_engine,
execution_mode = excluded.execution_mode,
execution_status = excluded.execution_status,
execution_model = excluded.execution_model,
execution_session_key = excluded.execution_session_key,
execution_run_id = excluded.execution_run_id,
execution_started_at = excluded.execution_started_at,
execution_updated_at = excluded.execution_updated_at,
automation_json = excluded.automation_json,
claim_json = excluded.claim_json,
template_id = excluded.template_id,
archived_at = excluded.archived_at,
stale_json = excluded.stale_json,
failure_count = excluded.failure_count
`,
).run({
id: card.id,
board_id: cardBoardId(card),
title: card.title,
notes: bindNull(card.notes),
status: card.status,
priority: card.priority,
agent_id: bindNull(card.agentId),
session_key: bindNull(card.sessionKey),
run_id: bindNull(card.runId),
task_id: bindNull(card.taskId),
source_url: bindNull(card.sourceUrl),
position: card.position,
created_at: card.createdAt,
updated_at: card.updatedAt,
started_at: bindNull(card.startedAt),
completed_at: bindNull(card.completedAt),
execution_id: bindNull(execution?.id),
execution_kind: bindNull(execution?.kind),
execution_engine: bindNull(execution?.engine),
execution_mode: bindNull(execution?.mode),
execution_status: bindNull(execution?.status),
execution_model: bindNull(execution?.model),
execution_session_key: bindNull(execution?.sessionKey),
execution_run_id: bindNull(execution?.runId),
execution_started_at: bindNull(execution?.startedAt),
execution_updated_at: bindNull(execution?.updatedAt),
automation_json: jsonValue(metadata?.automation),
claim_json: jsonValue(metadata?.claim),
template_id: bindNull(metadata?.templateId),
archived_at: bindNull(metadata?.archivedAt),
stale_json: jsonValue(metadata?.stale),
failure_count: bindNull(metadata?.failureCount),
});
insertChildren(db, "workboard_card_labels", card.id, card.labels, (label, ordinal) => {
db.prepare("INSERT INTO workboard_card_labels (card_id, ordinal, label) VALUES (?, ?, ?)").run(
card.id,
ordinal,
label,
);
});
insertChildren(db, "workboard_card_events", card.id, card.events, (event, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_events
(id, card_id, ordinal, kind, at, from_status, to_status, session_key, run_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
event.id,
card.id,
ordinal,
event.kind,
event.at,
bindNull(event.fromStatus),
bindNull(event.toStatus),
bindNull(event.sessionKey),
bindNull(event.runId),
);
});
insertChildren(db, "workboard_card_attempts", card.id, metadata?.attempts, (entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_attempts
(id, card_id, ordinal, status, started_at, ended_at, engine, mode, model, session_key, run_id, error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
entry.id,
card.id,
ordinal,
entry.status,
entry.startedAt,
bindNull(entry.endedAt),
bindNull(entry.engine),
bindNull(entry.mode),
bindNull(entry.model),
bindNull(entry.sessionKey),
bindNull(entry.runId),
bindNull(entry.error),
);
});
insertChildren(db, "workboard_card_comments", card.id, metadata?.comments, (entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_comments (id, card_id, ordinal, body, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
`,
).run(entry.id, card.id, ordinal, entry.body, entry.createdAt, bindNull(entry.updatedAt));
});
insertChildren(db, "workboard_card_links", card.id, metadata?.links, (entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_links
(id, card_id, ordinal, type, target_card_id, title, url, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
entry.id,
card.id,
ordinal,
entry.type,
bindNull(entry.targetCardId),
bindNull(entry.title),
bindNull(entry.url),
entry.createdAt,
);
});
insertChildren(db, "workboard_card_proof", card.id, metadata?.proof, (entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_proof
(id, card_id, ordinal, status, label, command, url, note, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
entry.id,
card.id,
ordinal,
entry.status,
bindNull(entry.label),
bindNull(entry.command),
bindNull(entry.url),
bindNull(entry.note),
entry.createdAt,
);
});
insertChildren(db, "workboard_card_artifacts", card.id, metadata?.artifacts, (entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_artifacts
(id, card_id, ordinal, label, url, path, mime_type, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
entry.id,
card.id,
ordinal,
bindNull(entry.label),
bindNull(entry.url),
bindNull(entry.path),
bindNull(entry.mimeType),
entry.createdAt,
);
});
insertChildren(
db,
"workboard_card_attachments",
card.id,
metadata?.attachments,
(entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_attachments
(id, card_id, ordinal, file_name, byte_size, mime_type, note, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
entry.id,
entry.cardId,
ordinal,
entry.fileName,
entry.byteSize,
bindNull(entry.mimeType),
bindNull(entry.note),
entry.createdAt,
);
},
);
insertChildren(
db,
"workboard_card_diagnostics",
card.id,
metadata?.diagnostics,
(entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_diagnostics
(card_id, ordinal, kind, severity, title, detail, first_seen_at, last_seen_at, count, actions_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
card.id,
ordinal,
entry.kind,
entry.severity,
entry.title,
entry.detail,
entry.firstSeenAt,
entry.lastSeenAt,
entry.count,
JSON.stringify(entry.actions),
);
},
);
insertChildren(
db,
"workboard_card_notifications",
card.id,
metadata?.notifications,
(entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_card_notifications
(id, card_id, ordinal, kind, message, created_at, sequence, session_key, run_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
entry.id,
card.id,
ordinal,
entry.kind,
entry.message,
entry.createdAt,
bindNull(entry.sequence),
bindNull(entry.sessionKey),
bindNull(entry.runId),
);
},
);
insertChildren(db, "workboard_worker_logs", card.id, metadata?.workerLogs, (entry, ordinal) => {
db.prepare(
`
INSERT INTO workboard_worker_logs
(id, card_id, ordinal, level, message, created_at, session_key, run_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`,
).run(
entry.id,
card.id,
ordinal,
entry.level,
entry.message,
entry.createdAt,
bindNull(entry.sessionKey),
bindNull(entry.runId),
);
});
db.prepare("DELETE FROM workboard_worker_protocol WHERE card_id = ?").run(card.id);
if (metadata?.workerProtocol) {
db.prepare(
`
INSERT INTO workboard_worker_protocol (card_id, state, updated_at, detail)
VALUES (?, ?, ?, ?)
`,
).run(
card.id,
metadata.workerProtocol.state,
metadata.workerProtocol.updatedAt,
bindNull(metadata.workerProtocol.detail),
);
}
}
class WorkboardSqliteCardStore implements WorkboardKeyedStore {
constructor(private readonly db: DatabaseSync) {}
async register(key: string, value: PersistedWorkboardCard): Promise<void> {
if (value.version !== 1 || value.card.id !== key) {
throw new Error("invalid workboard card payload");
}
runTransaction(this.db, () => insertCard(this.db, value.card));
}
async lookup(key: string): Promise<PersistedWorkboardCard | undefined> {
const row = this.db.prepare("SELECT * FROM workboard_cards WHERE id = ?").get(key) as
| Row
| undefined;
return row ? { version: 1, card: readCard(this.db, row) } : undefined;
}
async delete(key: string): Promise<boolean> {
const result = runTransaction(this.db, () => {
this.db
.prepare(
`
DELETE FROM workboard_attachment_blobs
WHERE attachment_id IN (
SELECT id FROM workboard_card_attachments WHERE card_id = ?
)
`,
)
.run(key);
return this.db.prepare("DELETE FROM workboard_cards WHERE id = ?").run(key);
});
return result.changes > 0;
}
async entries(): Promise<Array<{ key: string; value: PersistedWorkboardCard }>> {
return (
this.db
.prepare("SELECT * FROM workboard_cards ORDER BY created_at ASC, id ASC")
.all() as Row[]
).map((row) => ({
key: requiredString(row, "id"),
value: { version: 1, card: readCard(this.db, row) },
}));
}
}
class WorkboardSqliteBoardStore implements WorkboardKeyedStore<PersistedWorkboardBoard> {
constructor(private readonly db: DatabaseSync) {}
async register(key: string, value: PersistedWorkboardBoard): Promise<void> {
if (value.version !== 1 || value.board.id !== key) {
throw new Error("invalid workboard board payload");
}
const board = value.board;
this.db
.prepare(
`
INSERT INTO workboard_boards (
id, name, description, icon, color, default_workspace_json, orchestration_json,
created_at, updated_at, archived_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
description = excluded.description,
icon = excluded.icon,
color = excluded.color,
default_workspace_json = excluded.default_workspace_json,
orchestration_json = excluded.orchestration_json,
created_at = excluded.created_at,
updated_at = excluded.updated_at,
archived_at = excluded.archived_at
`,
)
.run(
board.id,
bindNull(board.name),
bindNull(board.description),
bindNull(board.icon),
bindNull(board.color),
jsonValue(board.defaultWorkspace),
jsonValue(board.orchestration),
board.createdAt,
board.updatedAt,
bindNull(board.archivedAt),
);
}
async lookup(key: string): Promise<PersistedWorkboardBoard | undefined> {
const row = this.db.prepare("SELECT * FROM workboard_boards WHERE id = ?").get(key) as
| Row
| undefined;
if (!row) {
return undefined;
}
const defaultWorkspace = parseJson(row.default_workspace_json) as
| PersistedWorkboardBoard["board"]["defaultWorkspace"]
| undefined;
const orchestration = parseJson(row.orchestration_json) as
| PersistedWorkboardBoard["board"]["orchestration"]
| undefined;
return {
version: 1,
board: {
id: requiredString(row, "id"),
...(stringValue(row, "name") ? { name: stringValue(row, "name") } : {}),
...(stringValue(row, "description")
? { description: stringValue(row, "description") }
: {}),
...(stringValue(row, "icon") ? { icon: stringValue(row, "icon") } : {}),
...(stringValue(row, "color") ? { color: stringValue(row, "color") } : {}),
...(defaultWorkspace ? { defaultWorkspace } : {}),
...(orchestration ? { orchestration } : {}),
createdAt: requiredNumber(row, "created_at"),
updatedAt: requiredNumber(row, "updated_at"),
...(numberValue(row, "archived_at") !== undefined
? { archivedAt: numberValue(row, "archived_at") }
: {}),
},
};
}
async delete(key: string): Promise<boolean> {
const result = this.db.prepare("DELETE FROM workboard_boards WHERE id = ?").run(key);
return result.changes > 0;
}
async entries(): Promise<Array<{ key: string; value: PersistedWorkboardBoard }>> {
const rows = this.db.prepare("SELECT id FROM workboard_boards ORDER BY id ASC").all() as Row[];
const entries: Array<{ key: string; value: PersistedWorkboardBoard }> = [];
for (const row of rows) {
const key = requiredString(row, "id");
const value = await this.lookup(key);
if (value) {
entries.push({ key, value });
}
}
return entries;
}
}
class WorkboardSqliteSubscriptionStore implements WorkboardKeyedStore<PersistedWorkboardNotificationSubscription> {
constructor(private readonly db: DatabaseSync) {}
async register(key: string, value: PersistedWorkboardNotificationSubscription): Promise<void> {
if (value.version !== 1 || value.subscription.id !== key) {
throw new Error("invalid workboard notification subscription payload");
}
const subscription = value.subscription;
this.db
.prepare(
`
INSERT INTO workboard_notification_subscriptions (
id, board_id, card_id, session_key, run_id, target, event_kinds_json,
last_event_at, last_event_id, last_event_sequence, delivered_event_ids_json,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
board_id = excluded.board_id,
card_id = excluded.card_id,
session_key = excluded.session_key,
run_id = excluded.run_id,
target = excluded.target,
event_kinds_json = excluded.event_kinds_json,
last_event_at = excluded.last_event_at,
last_event_id = excluded.last_event_id,
last_event_sequence = excluded.last_event_sequence,
delivered_event_ids_json = excluded.delivered_event_ids_json,
created_at = excluded.created_at,
updated_at = excluded.updated_at
`,
)
.run(
subscription.id,
subscription.boardId,
bindNull(subscription.cardId),
bindNull(subscription.sessionKey),
bindNull(subscription.runId),
bindNull(subscription.target),
jsonValue(subscription.eventKinds),
bindNull(subscription.lastEventAt),
bindNull(subscription.lastEventId),
bindNull(subscription.lastEventSequence),
jsonValue(subscription.deliveredEventIds),
subscription.createdAt,
subscription.updatedAt,
);
}
async lookup(key: string): Promise<PersistedWorkboardNotificationSubscription | undefined> {
const row = this.db
.prepare("SELECT * FROM workboard_notification_subscriptions WHERE id = ?")
.get(key) as Row | undefined;
if (!row) {
return undefined;
}
const eventKinds = parseJson(row.event_kinds_json) as
| PersistedWorkboardNotificationSubscription["subscription"]["eventKinds"]
| undefined;
const deliveredEventIds = parseJson(row.delivered_event_ids_json) as
| PersistedWorkboardNotificationSubscription["subscription"]["deliveredEventIds"]
| undefined;
return {
version: 1,
subscription: {
id: requiredString(row, "id"),
boardId: requiredString(row, "board_id"),
...(stringValue(row, "card_id") ? { cardId: stringValue(row, "card_id") } : {}),
...(stringValue(row, "session_key") ? { sessionKey: stringValue(row, "session_key") } : {}),
...(stringValue(row, "run_id") ? { runId: stringValue(row, "run_id") } : {}),
...(stringValue(row, "target") ? { target: stringValue(row, "target") } : {}),
...(eventKinds ? { eventKinds } : {}),
...(numberValue(row, "last_event_at") !== undefined
? { lastEventAt: numberValue(row, "last_event_at") }
: {}),
...(stringValue(row, "last_event_id")
? { lastEventId: stringValue(row, "last_event_id") }
: {}),
...(numberValue(row, "last_event_sequence") !== undefined
? { lastEventSequence: numberValue(row, "last_event_sequence") }
: {}),
...(deliveredEventIds ? { deliveredEventIds } : {}),
createdAt: requiredNumber(row, "created_at"),
updatedAt: requiredNumber(row, "updated_at"),
},
};
}
async delete(key: string): Promise<boolean> {
const result = this.db
.prepare("DELETE FROM workboard_notification_subscriptions WHERE id = ?")
.run(key);
return result.changes > 0;
}
async entries(): Promise<
Array<{ key: string; value: PersistedWorkboardNotificationSubscription }>
> {
const rows = this.db
.prepare(
"SELECT id FROM workboard_notification_subscriptions ORDER BY created_at ASC, id ASC",
)
.all() as Row[];
const entries: Array<{ key: string; value: PersistedWorkboardNotificationSubscription }> = [];
for (const row of rows) {
const key = requiredString(row, "id");
const value = await this.lookup(key);
if (value) {
entries.push({ key, value });
}
}
return entries;
}
}
class WorkboardSqliteAttachmentStore implements WorkboardKeyedStore<PersistedWorkboardAttachment> {
constructor(private readonly db: DatabaseSync) {}
async register(key: string, value: PersistedWorkboardAttachment): Promise<void> {
if (value.version !== 1 || value.attachment.id !== key) {
throw new Error("invalid workboard attachment payload");
}
const attachment = value.attachment;
this.db
.prepare(
`
INSERT INTO workboard_attachment_blobs (attachment_id, content)
VALUES (?, ?)
ON CONFLICT(attachment_id) DO UPDATE SET content = excluded.content
`,
)
.run(attachment.id, asBlobContent(value.contentBase64));
}
async lookup(key: string): Promise<PersistedWorkboardAttachment | undefined> {
const row = this.db
.prepare(
`
SELECT a.*, b.content
FROM workboard_card_attachments a
JOIN workboard_attachment_blobs b ON b.attachment_id = a.id
WHERE a.id = ?
`,
)
.get(key) as Row | undefined;
if (!row) {
return undefined;
}
return {
version: 1,
attachment: {
id: requiredString(row, "id"),
cardId: requiredString(row, "card_id"),
createdAt: requiredNumber(row, "created_at"),
fileName: requiredString(row, "file_name"),
byteSize: requiredNumber(row, "byte_size"),
...(stringValue(row, "mime_type") ? { mimeType: stringValue(row, "mime_type") } : {}),
...(stringValue(row, "note") ? { note: stringValue(row, "note") } : {}),
},
contentBase64: blobToBase64(row.content),
};
}
async delete(key: string): Promise<boolean> {
const deleted = runTransaction(this.db, () => {
this.db.prepare("DELETE FROM workboard_attachment_blobs WHERE attachment_id = ?").run(key);
return this.db.prepare("DELETE FROM workboard_card_attachments WHERE id = ?").run(key);
});
return deleted.changes > 0;
}
async entries(): Promise<Array<{ key: string; value: PersistedWorkboardAttachment }>> {
const rows = this.db
.prepare(
`
SELECT a.id
FROM workboard_card_attachments a
JOIN workboard_attachment_blobs b ON b.attachment_id = a.id
ORDER BY a.created_at ASC, a.id ASC
`,
)
.all() as Row[];
const entries: Array<{ key: string; value: PersistedWorkboardAttachment }> = [];
for (const row of rows) {
const key = requiredString(row, "id");
const value = await this.lookup(key);
if (value) {
entries.push({ key, value });
}
}
return entries;
}
}
export function createWorkboardSqliteStores(
options: {
dbPath?: string;
env?: NodeJS.ProcessEnv;
} = {},
): WorkboardSqliteStores {
const db = createDatabase(options.dbPath ?? resolveWorkboardSqlitePath(options.env));
return {
cards: new WorkboardSqliteCardStore(db),
boards: new WorkboardSqliteBoardStore(db),
subscriptions: new WorkboardSqliteSubscriptionStore(db),
attachments: new WorkboardSqliteAttachmentStore(db),
close: () => db.close(),
};
}