mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-29 10:02:04 +00:00
refactor(tasks): rename flow registry modules to task-flow
This commit is contained in:
@@ -1,48 +0,0 @@
|
||||
import { findLatestFlowForOwnerKey, getFlowById, listFlowsForOwnerKey } from "./flow-registry.js";
|
||||
import type { FlowRecord } from "./flow-registry.types.js";
|
||||
|
||||
function normalizeOwnerKey(ownerKey?: string): string | undefined {
|
||||
const trimmed = ownerKey?.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function canOwnerAccessFlow(flow: FlowRecord, callerOwnerKey: string): boolean {
|
||||
return normalizeOwnerKey(flow.ownerKey) === normalizeOwnerKey(callerOwnerKey);
|
||||
}
|
||||
|
||||
export function getFlowByIdForOwner(params: {
|
||||
flowId: string;
|
||||
callerOwnerKey: string;
|
||||
}): FlowRecord | undefined {
|
||||
const flow = getFlowById(params.flowId);
|
||||
return flow && canOwnerAccessFlow(flow, params.callerOwnerKey) ? flow : undefined;
|
||||
}
|
||||
|
||||
export function listFlowsForOwner(params: { callerOwnerKey: string }): FlowRecord[] {
|
||||
const ownerKey = normalizeOwnerKey(params.callerOwnerKey);
|
||||
return ownerKey ? listFlowsForOwnerKey(ownerKey) : [];
|
||||
}
|
||||
|
||||
export function findLatestFlowForOwner(params: { callerOwnerKey: string }): FlowRecord | undefined {
|
||||
const ownerKey = normalizeOwnerKey(params.callerOwnerKey);
|
||||
return ownerKey ? findLatestFlowForOwnerKey(ownerKey) : undefined;
|
||||
}
|
||||
|
||||
export function resolveFlowForLookupTokenForOwner(params: {
|
||||
token: string;
|
||||
callerOwnerKey: string;
|
||||
}): FlowRecord | undefined {
|
||||
const direct = getFlowByIdForOwner({
|
||||
flowId: params.token,
|
||||
callerOwnerKey: params.callerOwnerKey,
|
||||
});
|
||||
if (direct) {
|
||||
return direct;
|
||||
}
|
||||
const normalizedToken = normalizeOwnerKey(params.token);
|
||||
const normalizedCallerOwnerKey = normalizeOwnerKey(params.callerOwnerKey);
|
||||
if (!normalizedToken || normalizedToken !== normalizedCallerOwnerKey) {
|
||||
return undefined;
|
||||
}
|
||||
return findLatestFlowForOwner({ callerOwnerKey: normalizedCallerOwnerKey });
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
import path from "node:path";
|
||||
import { resolveTaskStateDir } from "./task-registry.paths.js";
|
||||
|
||||
export function resolveFlowRegistryDir(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return path.join(resolveTaskStateDir(env), "flows");
|
||||
}
|
||||
|
||||
export function resolveFlowRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return path.join(resolveFlowRegistryDir(env), "registry.sqlite");
|
||||
}
|
||||
@@ -1,78 +0,0 @@
|
||||
import {
|
||||
closeFlowRegistrySqliteStore,
|
||||
deleteFlowRegistryRecordFromSqlite,
|
||||
loadFlowRegistryStateFromSqlite,
|
||||
saveFlowRegistryStateToSqlite,
|
||||
upsertFlowRegistryRecordToSqlite,
|
||||
} from "./flow-registry.store.sqlite.js";
|
||||
import type { FlowRecord } from "./flow-registry.types.js";
|
||||
|
||||
export type FlowRegistryStoreSnapshot = {
|
||||
flows: Map<string, FlowRecord>;
|
||||
};
|
||||
|
||||
export type FlowRegistryStore = {
|
||||
loadSnapshot: () => FlowRegistryStoreSnapshot;
|
||||
saveSnapshot: (snapshot: FlowRegistryStoreSnapshot) => void;
|
||||
upsertFlow?: (flow: FlowRecord) => void;
|
||||
deleteFlow?: (flowId: string) => void;
|
||||
close?: () => void;
|
||||
};
|
||||
|
||||
export type FlowRegistryHookEvent =
|
||||
| {
|
||||
kind: "restored";
|
||||
flows: FlowRecord[];
|
||||
}
|
||||
| {
|
||||
kind: "upserted";
|
||||
flow: FlowRecord;
|
||||
previous?: FlowRecord;
|
||||
}
|
||||
| {
|
||||
kind: "deleted";
|
||||
flowId: string;
|
||||
previous: FlowRecord;
|
||||
};
|
||||
|
||||
export type FlowRegistryHooks = {
|
||||
// Hooks are incremental/observational. Snapshot persistence belongs to FlowRegistryStore.
|
||||
onEvent?: (event: FlowRegistryHookEvent) => void;
|
||||
};
|
||||
|
||||
const defaultFlowRegistryStore: FlowRegistryStore = {
|
||||
loadSnapshot: loadFlowRegistryStateFromSqlite,
|
||||
saveSnapshot: saveFlowRegistryStateToSqlite,
|
||||
upsertFlow: upsertFlowRegistryRecordToSqlite,
|
||||
deleteFlow: deleteFlowRegistryRecordFromSqlite,
|
||||
close: closeFlowRegistrySqliteStore,
|
||||
};
|
||||
|
||||
let configuredFlowRegistryStore: FlowRegistryStore = defaultFlowRegistryStore;
|
||||
let configuredFlowRegistryHooks: FlowRegistryHooks | null = null;
|
||||
|
||||
export function getFlowRegistryStore(): FlowRegistryStore {
|
||||
return configuredFlowRegistryStore;
|
||||
}
|
||||
|
||||
export function getFlowRegistryHooks(): FlowRegistryHooks | null {
|
||||
return configuredFlowRegistryHooks;
|
||||
}
|
||||
|
||||
export function configureFlowRegistryRuntime(params: {
|
||||
store?: FlowRegistryStore;
|
||||
hooks?: FlowRegistryHooks | null;
|
||||
}) {
|
||||
if (params.store) {
|
||||
configuredFlowRegistryStore = params.store;
|
||||
}
|
||||
if ("hooks" in params) {
|
||||
configuredFlowRegistryHooks = params.hooks ?? null;
|
||||
}
|
||||
}
|
||||
|
||||
export function resetFlowRegistryRuntimeForTests() {
|
||||
configuredFlowRegistryStore.close?.();
|
||||
configuredFlowRegistryStore = defaultFlowRegistryStore;
|
||||
configuredFlowRegistryHooks = null;
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
export {
|
||||
createFlowForTask,
|
||||
createFlowRecord,
|
||||
createManagedFlow,
|
||||
deleteFlowRecordById,
|
||||
findLatestFlowForOwnerKey,
|
||||
failFlow,
|
||||
finishFlow,
|
||||
getFlowById,
|
||||
listFlowRecords,
|
||||
listFlowsForOwnerKey,
|
||||
requestFlowCancel,
|
||||
resolveFlowForLookupToken,
|
||||
resetFlowRegistryForTests,
|
||||
resumeFlow,
|
||||
setFlowWaiting,
|
||||
syncFlowFromTask,
|
||||
updateFlowRecordByIdExpectedRevision,
|
||||
} from "./flow-registry.js";
|
||||
|
||||
export type { FlowUpdateResult } from "./flow-registry.js";
|
||||
@@ -1,26 +1,26 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
findLatestFlowForOwner,
|
||||
getFlowByIdForOwner,
|
||||
listFlowsForOwner,
|
||||
resolveFlowForLookupTokenForOwner,
|
||||
} from "./flow-owner-access.js";
|
||||
import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js";
|
||||
findLatestTaskFlowForOwner,
|
||||
getTaskFlowByIdForOwner,
|
||||
listTaskFlowsForOwner,
|
||||
resolveTaskFlowForLookupTokenForOwner,
|
||||
} from "./task-flow-owner-access.js";
|
||||
import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js";
|
||||
|
||||
afterEach(() => {
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
resetTaskFlowRegistryForTests({ persist: false });
|
||||
});
|
||||
|
||||
describe("flow owner access", () => {
|
||||
describe("task flow owner access", () => {
|
||||
it("returns owner-scoped flows for direct and owner-key lookups", () => {
|
||||
const older = createManagedFlow({
|
||||
const older = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/owner-access",
|
||||
goal: "Older flow",
|
||||
createdAt: 100,
|
||||
updatedAt: 100,
|
||||
});
|
||||
const latest = createManagedFlow({
|
||||
const latest = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/owner-access",
|
||||
goal: "Latest flow",
|
||||
@@ -29,56 +29,56 @@ describe("flow owner access", () => {
|
||||
});
|
||||
|
||||
expect(
|
||||
getFlowByIdForOwner({
|
||||
getTaskFlowByIdForOwner({
|
||||
flowId: older.flowId,
|
||||
callerOwnerKey: "agent:main:main",
|
||||
})?.flowId,
|
||||
).toBe(older.flowId);
|
||||
expect(
|
||||
findLatestFlowForOwner({
|
||||
findLatestTaskFlowForOwner({
|
||||
callerOwnerKey: "agent:main:main",
|
||||
})?.flowId,
|
||||
).toBe(latest.flowId);
|
||||
expect(
|
||||
resolveFlowForLookupTokenForOwner({
|
||||
resolveTaskFlowForLookupTokenForOwner({
|
||||
token: "agent:main:main",
|
||||
callerOwnerKey: "agent:main:main",
|
||||
})?.flowId,
|
||||
).toBe(latest.flowId);
|
||||
expect(
|
||||
listFlowsForOwner({
|
||||
listTaskFlowsForOwner({
|
||||
callerOwnerKey: "agent:main:main",
|
||||
}).map((flow) => flow.flowId),
|
||||
).toEqual([latest.flowId, older.flowId]);
|
||||
});
|
||||
|
||||
it("denies cross-owner flow reads", () => {
|
||||
const flow = createManagedFlow({
|
||||
const flow = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/owner-access",
|
||||
goal: "Hidden flow",
|
||||
});
|
||||
|
||||
expect(
|
||||
getFlowByIdForOwner({
|
||||
getTaskFlowByIdForOwner({
|
||||
flowId: flow.flowId,
|
||||
callerOwnerKey: "agent:main:other",
|
||||
}),
|
||||
).toBeUndefined();
|
||||
expect(
|
||||
resolveFlowForLookupTokenForOwner({
|
||||
resolveTaskFlowForLookupTokenForOwner({
|
||||
token: flow.flowId,
|
||||
callerOwnerKey: "agent:main:other",
|
||||
}),
|
||||
).toBeUndefined();
|
||||
expect(
|
||||
resolveFlowForLookupTokenForOwner({
|
||||
resolveTaskFlowForLookupTokenForOwner({
|
||||
token: "agent:main:main",
|
||||
callerOwnerKey: "agent:main:other",
|
||||
}),
|
||||
).toBeUndefined();
|
||||
expect(
|
||||
listFlowsForOwner({
|
||||
listTaskFlowsForOwner({
|
||||
callerOwnerKey: "agent:main:other",
|
||||
}),
|
||||
).toEqual([]);
|
||||
54
src/tasks/task-flow-owner-access.ts
Normal file
54
src/tasks/task-flow-owner-access.ts
Normal file
@@ -0,0 +1,54 @@
|
||||
import {
|
||||
findLatestTaskFlowForOwnerKey,
|
||||
getTaskFlowById,
|
||||
listTaskFlowsForOwnerKey,
|
||||
} from "./task-flow-registry.js";
|
||||
import type { TaskFlowRecord } from "./task-flow-registry.types.js";
|
||||
|
||||
function normalizeOwnerKey(ownerKey?: string): string | undefined {
|
||||
const trimmed = ownerKey?.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function canOwnerAccessFlow(flow: TaskFlowRecord, callerOwnerKey: string): boolean {
|
||||
return normalizeOwnerKey(flow.ownerKey) === normalizeOwnerKey(callerOwnerKey);
|
||||
}
|
||||
|
||||
export function getTaskFlowByIdForOwner(params: {
|
||||
flowId: string;
|
||||
callerOwnerKey: string;
|
||||
}): TaskFlowRecord | undefined {
|
||||
const flow = getTaskFlowById(params.flowId);
|
||||
return flow && canOwnerAccessFlow(flow, params.callerOwnerKey) ? flow : undefined;
|
||||
}
|
||||
|
||||
export function listTaskFlowsForOwner(params: { callerOwnerKey: string }): TaskFlowRecord[] {
|
||||
const ownerKey = normalizeOwnerKey(params.callerOwnerKey);
|
||||
return ownerKey ? listTaskFlowsForOwnerKey(ownerKey) : [];
|
||||
}
|
||||
|
||||
export function findLatestTaskFlowForOwner(params: {
|
||||
callerOwnerKey: string;
|
||||
}): TaskFlowRecord | undefined {
|
||||
const ownerKey = normalizeOwnerKey(params.callerOwnerKey);
|
||||
return ownerKey ? findLatestTaskFlowForOwnerKey(ownerKey) : undefined;
|
||||
}
|
||||
|
||||
export function resolveTaskFlowForLookupTokenForOwner(params: {
|
||||
token: string;
|
||||
callerOwnerKey: string;
|
||||
}): TaskFlowRecord | undefined {
|
||||
const direct = getTaskFlowByIdForOwner({
|
||||
flowId: params.token,
|
||||
callerOwnerKey: params.callerOwnerKey,
|
||||
});
|
||||
if (direct) {
|
||||
return direct;
|
||||
}
|
||||
const normalizedToken = normalizeOwnerKey(params.token);
|
||||
const normalizedCallerOwnerKey = normalizeOwnerKey(params.callerOwnerKey);
|
||||
if (!normalizedToken || normalizedToken !== normalizedCallerOwnerKey) {
|
||||
return undefined;
|
||||
}
|
||||
return findLatestTaskFlowForOwner({ callerOwnerKey: normalizedCallerOwnerKey });
|
||||
}
|
||||
@@ -5,7 +5,10 @@ import { describe, expect, it } from "vitest";
|
||||
const TASK_ROOT = path.resolve(import.meta.dirname);
|
||||
const SRC_ROOT = path.resolve(TASK_ROOT, "..");
|
||||
|
||||
const ALLOWED_IMPORTERS = new Set(["tasks/flow-owner-access.ts", "tasks/flow-runtime-internal.ts"]);
|
||||
const ALLOWED_IMPORTERS = new Set([
|
||||
"tasks/task-flow-owner-access.ts",
|
||||
"tasks/task-flow-runtime-internal.ts",
|
||||
]);
|
||||
|
||||
async function listSourceFiles(root: string): Promise<string[]> {
|
||||
const entries = await fs.readdir(root, { withFileTypes: true });
|
||||
@@ -24,13 +27,13 @@ async function listSourceFiles(root: string): Promise<string[]> {
|
||||
return files;
|
||||
}
|
||||
|
||||
describe("flow registry import boundary", () => {
|
||||
it("keeps direct flow-registry imports behind approved flow access seams", async () => {
|
||||
describe("task flow registry import boundary", () => {
|
||||
it("keeps direct task-flow-registry imports behind approved task-flow access seams", async () => {
|
||||
const importers: string[] = [];
|
||||
for (const file of await listSourceFiles(SRC_ROOT)) {
|
||||
const relative = path.relative(SRC_ROOT, file).replaceAll(path.sep, "/");
|
||||
const source = await fs.readFile(file, "utf8");
|
||||
if (source.includes("flow-registry.js")) {
|
||||
if (source.includes("task-flow-registry.js")) {
|
||||
importers.push(relative);
|
||||
}
|
||||
}
|
||||
10
src/tasks/task-flow-registry.paths.ts
Normal file
10
src/tasks/task-flow-registry.paths.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import path from "node:path";
|
||||
import { resolveTaskStateDir } from "./task-registry.paths.js";
|
||||
|
||||
export function resolveTaskFlowRegistryDir(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return path.join(resolveTaskStateDir(env), "flows");
|
||||
}
|
||||
|
||||
export function resolveTaskFlowRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return path.join(resolveTaskFlowRegistryDir(env), "registry.sqlite");
|
||||
}
|
||||
@@ -2,20 +2,23 @@ import { chmodSync, existsSync, mkdirSync } from "node:fs";
|
||||
import type { DatabaseSync, StatementSync } from "node:sqlite";
|
||||
import { requireNodeSqlite } from "../infra/node-sqlite.js";
|
||||
import type { DeliveryContext } from "../utils/delivery-context.js";
|
||||
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
|
||||
import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js";
|
||||
import type { FlowRecord, FlowSyncMode, JsonValue } from "./flow-registry.types.js";
|
||||
import {
|
||||
resolveTaskFlowRegistryDir,
|
||||
resolveTaskFlowRegistrySqlitePath,
|
||||
} from "./task-flow-registry.paths.js";
|
||||
import type { TaskFlowRegistryStoreSnapshot } from "./task-flow-registry.store.js";
|
||||
import type { TaskFlowRecord, TaskFlowSyncMode, JsonValue } from "./task-flow-registry.types.js";
|
||||
|
||||
type FlowRegistryRow = {
|
||||
flow_id: string;
|
||||
sync_mode: FlowSyncMode | null;
|
||||
sync_mode: TaskFlowSyncMode | null;
|
||||
shape?: string | null;
|
||||
owner_key: string;
|
||||
requester_origin_json: string | null;
|
||||
controller_id: string | null;
|
||||
revision: number | bigint | null;
|
||||
status: FlowRecord["status"];
|
||||
notify_policy: FlowRecord["notifyPolicy"];
|
||||
status: TaskFlowRecord["status"];
|
||||
notify_policy: TaskFlowRecord["notifyPolicy"];
|
||||
goal: string;
|
||||
current_step: string | null;
|
||||
blocked_task_id: string | null;
|
||||
@@ -68,14 +71,14 @@ function parseJsonValue<T>(raw: string | null): T | undefined {
|
||||
}
|
||||
}
|
||||
|
||||
function rowToSyncMode(row: FlowRegistryRow): FlowSyncMode {
|
||||
function rowToSyncMode(row: FlowRegistryRow): TaskFlowSyncMode {
|
||||
if (row.sync_mode === "task_mirrored" || row.sync_mode === "managed") {
|
||||
return row.sync_mode;
|
||||
}
|
||||
return row.shape === "single_task" ? "task_mirrored" : "managed";
|
||||
}
|
||||
|
||||
function rowToFlowRecord(row: FlowRegistryRow): FlowRecord {
|
||||
function rowToFlowRecord(row: FlowRegistryRow): TaskFlowRecord {
|
||||
const endedAt = normalizeNumber(row.ended_at);
|
||||
const cancelRequestedAt = normalizeNumber(row.cancel_requested_at);
|
||||
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
|
||||
@@ -103,7 +106,7 @@ function rowToFlowRecord(row: FlowRegistryRow): FlowRecord {
|
||||
};
|
||||
}
|
||||
|
||||
function bindFlowRecord(record: FlowRecord) {
|
||||
function bindFlowRecord(record: TaskFlowRecord) {
|
||||
return {
|
||||
flow_id: record.flowId,
|
||||
sync_mode: record.syncMode,
|
||||
@@ -313,7 +316,7 @@ function ensureSchema(db: DatabaseSync) {
|
||||
}
|
||||
|
||||
function ensureFlowRegistryPermissions(pathname: string) {
|
||||
const dir = resolveFlowRegistryDir(process.env);
|
||||
const dir = resolveTaskFlowRegistryDir(process.env);
|
||||
mkdirSync(dir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE });
|
||||
chmodSync(dir, FLOW_REGISTRY_DIR_MODE);
|
||||
for (const suffix of FLOW_REGISTRY_SIDECAR_SUFFIXES) {
|
||||
@@ -326,7 +329,7 @@ function ensureFlowRegistryPermissions(pathname: string) {
|
||||
}
|
||||
|
||||
function openFlowRegistryDatabase(): FlowRegistryDatabase {
|
||||
const pathname = resolveFlowRegistrySqlitePath(process.env);
|
||||
const pathname = resolveTaskFlowRegistrySqlitePath(process.env);
|
||||
if (cachedDatabase && cachedDatabase.path === pathname) {
|
||||
return cachedDatabase;
|
||||
}
|
||||
@@ -363,7 +366,7 @@ function withWriteTransaction(write: (statements: FlowRegistryStatements) => voi
|
||||
}
|
||||
}
|
||||
|
||||
export function loadFlowRegistryStateFromSqlite(): FlowRegistryStoreSnapshot {
|
||||
export function loadTaskFlowRegistryStateFromSqlite(): TaskFlowRegistryStoreSnapshot {
|
||||
const { statements } = openFlowRegistryDatabase();
|
||||
const rows = statements.selectAll.all() as FlowRegistryRow[];
|
||||
return {
|
||||
@@ -371,7 +374,7 @@ export function loadFlowRegistryStateFromSqlite(): FlowRegistryStoreSnapshot {
|
||||
};
|
||||
}
|
||||
|
||||
export function saveFlowRegistryStateToSqlite(snapshot: FlowRegistryStoreSnapshot) {
|
||||
export function saveTaskFlowRegistryStateToSqlite(snapshot: TaskFlowRegistryStoreSnapshot) {
|
||||
withWriteTransaction((statements) => {
|
||||
statements.clearRows.run();
|
||||
for (const flow of snapshot.flows.values()) {
|
||||
@@ -380,19 +383,19 @@ export function saveFlowRegistryStateToSqlite(snapshot: FlowRegistryStoreSnapsho
|
||||
});
|
||||
}
|
||||
|
||||
export function upsertFlowRegistryRecordToSqlite(flow: FlowRecord) {
|
||||
export function upsertTaskFlowRegistryRecordToSqlite(flow: TaskFlowRecord) {
|
||||
const store = openFlowRegistryDatabase();
|
||||
store.statements.upsertRow.run(bindFlowRecord(flow));
|
||||
ensureFlowRegistryPermissions(store.path);
|
||||
}
|
||||
|
||||
export function deleteFlowRegistryRecordFromSqlite(flowId: string) {
|
||||
export function deleteTaskFlowRegistryRecordFromSqlite(flowId: string) {
|
||||
const store = openFlowRegistryDatabase();
|
||||
store.statements.deleteRow.run(flowId);
|
||||
ensureFlowRegistryPermissions(store.path);
|
||||
}
|
||||
|
||||
export function closeFlowRegistrySqliteStore() {
|
||||
export function closeTaskFlowRegistrySqliteStore() {
|
||||
if (!cachedDatabase) {
|
||||
return;
|
||||
}
|
||||
@@ -2,17 +2,20 @@ import { statSync } from "node:fs";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import {
|
||||
createManagedFlow,
|
||||
getFlowById,
|
||||
createManagedTaskFlow,
|
||||
getTaskFlowById,
|
||||
requestFlowCancel,
|
||||
resetFlowRegistryForTests,
|
||||
resetTaskFlowRegistryForTests,
|
||||
setFlowWaiting,
|
||||
} from "./flow-registry.js";
|
||||
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
|
||||
import { configureFlowRegistryRuntime } from "./flow-registry.store.js";
|
||||
import type { FlowRecord } from "./flow-registry.types.js";
|
||||
} from "./task-flow-registry.js";
|
||||
import {
|
||||
resolveTaskFlowRegistryDir,
|
||||
resolveTaskFlowRegistrySqlitePath,
|
||||
} from "./task-flow-registry.paths.js";
|
||||
import { configureTaskFlowRegistryRuntime } from "./task-flow-registry.store.js";
|
||||
import type { TaskFlowRecord } from "./task-flow-registry.types.js";
|
||||
|
||||
function createStoredFlow(): FlowRecord {
|
||||
function createStoredFlow(): TaskFlowRecord {
|
||||
return {
|
||||
flowId: "flow-restored",
|
||||
syncMode: "managed",
|
||||
@@ -35,18 +38,18 @@ function createStoredFlow(): FlowRecord {
|
||||
}
|
||||
|
||||
async function withFlowRegistryTempDir<T>(run: (root: string) => Promise<T>): Promise<T> {
|
||||
return await withTempDir({ prefix: "openclaw-flow-store-" }, async (root) => {
|
||||
return await withTempDir({ prefix: "openclaw-task-flow-store-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
try {
|
||||
return await run(root);
|
||||
} finally {
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
describe("flow-registry store runtime", () => {
|
||||
describe("task-flow-registry store runtime", () => {
|
||||
beforeEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
@@ -54,7 +57,7 @@ describe("flow-registry store runtime", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
});
|
||||
|
||||
it("uses the configured flow store for restore and save", () => {
|
||||
@@ -63,14 +66,14 @@ describe("flow-registry store runtime", () => {
|
||||
flows: new Map([[storedFlow.flowId, storedFlow]]),
|
||||
}));
|
||||
const saveSnapshot = vi.fn();
|
||||
configureFlowRegistryRuntime({
|
||||
configureTaskFlowRegistryRuntime({
|
||||
store: {
|
||||
loadSnapshot,
|
||||
saveSnapshot,
|
||||
},
|
||||
});
|
||||
|
||||
expect(getFlowById("flow-restored")).toMatchObject({
|
||||
expect(getTaskFlowById("flow-restored")).toMatchObject({
|
||||
flowId: "flow-restored",
|
||||
syncMode: "managed",
|
||||
controllerId: "tests/restored-controller",
|
||||
@@ -81,7 +84,7 @@ describe("flow-registry store runtime", () => {
|
||||
});
|
||||
expect(loadSnapshot).toHaveBeenCalledTimes(1);
|
||||
|
||||
createManagedFlow({
|
||||
createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/new-flow",
|
||||
goal: "New flow",
|
||||
@@ -91,7 +94,7 @@ describe("flow-registry store runtime", () => {
|
||||
|
||||
expect(saveSnapshot).toHaveBeenCalled();
|
||||
const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as {
|
||||
flows: ReadonlyMap<string, FlowRecord>;
|
||||
flows: ReadonlyMap<string, TaskFlowRecord>;
|
||||
};
|
||||
expect(latestSnapshot.flows.size).toBe(2);
|
||||
expect(latestSnapshot.flows.get("flow-restored")?.goal).toBe("Restored flow");
|
||||
@@ -100,9 +103,9 @@ describe("flow-registry store runtime", () => {
|
||||
it("restores persisted wait-state, revision, and cancel intent from sqlite", async () => {
|
||||
await withFlowRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
|
||||
const created = createManagedFlow({
|
||||
const created = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/persisted-flow",
|
||||
goal: "Persisted flow",
|
||||
@@ -129,9 +132,9 @@ describe("flow-registry store runtime", () => {
|
||||
applied: true,
|
||||
});
|
||||
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
resetTaskFlowRegistryForTests({ persist: false });
|
||||
|
||||
expect(getFlowById(created.flowId)).toMatchObject({
|
||||
expect(getTaskFlowById(created.flowId)).toMatchObject({
|
||||
flowId: created.flowId,
|
||||
syncMode: "managed",
|
||||
controllerId: "tests/persisted-flow",
|
||||
@@ -148,9 +151,9 @@ describe("flow-registry store runtime", () => {
|
||||
it("round-trips explicit json null through sqlite", async () => {
|
||||
await withFlowRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
|
||||
const created = createManagedFlow({
|
||||
const created = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/null-roundtrip",
|
||||
goal: "Persist null payloads",
|
||||
@@ -158,9 +161,9 @@ describe("flow-registry store runtime", () => {
|
||||
waitJson: null,
|
||||
});
|
||||
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
resetTaskFlowRegistryForTests({ persist: false });
|
||||
|
||||
expect(getFlowById(created.flowId)).toMatchObject({
|
||||
expect(getTaskFlowById(created.flowId)).toMatchObject({
|
||||
flowId: created.flowId,
|
||||
stateJson: null,
|
||||
waitJson: null,
|
||||
@@ -174,9 +177,9 @@ describe("flow-registry store runtime", () => {
|
||||
}
|
||||
await withFlowRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
|
||||
createManagedFlow({
|
||||
createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/secured-flow",
|
||||
goal: "Secured flow",
|
||||
@@ -186,8 +189,8 @@ describe("flow-registry store runtime", () => {
|
||||
waitJson: { kind: "task", taskId: "task-secured" },
|
||||
});
|
||||
|
||||
const registryDir = resolveFlowRegistryDir(process.env);
|
||||
const sqlitePath = resolveFlowRegistrySqlitePath(process.env);
|
||||
const registryDir = resolveTaskFlowRegistryDir(process.env);
|
||||
const sqlitePath = resolveTaskFlowRegistrySqlitePath(process.env);
|
||||
expect(statSync(registryDir).mode & 0o777).toBe(0o700);
|
||||
expect(statSync(sqlitePath).mode & 0o777).toBe(0o600);
|
||||
});
|
||||
78
src/tasks/task-flow-registry.store.ts
Normal file
78
src/tasks/task-flow-registry.store.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
import {
|
||||
closeTaskFlowRegistrySqliteStore,
|
||||
deleteTaskFlowRegistryRecordFromSqlite,
|
||||
loadTaskFlowRegistryStateFromSqlite,
|
||||
saveTaskFlowRegistryStateToSqlite,
|
||||
upsertTaskFlowRegistryRecordToSqlite,
|
||||
} from "./task-flow-registry.store.sqlite.js";
|
||||
import type { TaskFlowRecord } from "./task-flow-registry.types.js";
|
||||
|
||||
export type TaskFlowRegistryStoreSnapshot = {
|
||||
flows: Map<string, TaskFlowRecord>;
|
||||
};
|
||||
|
||||
export type TaskFlowRegistryStore = {
|
||||
loadSnapshot: () => TaskFlowRegistryStoreSnapshot;
|
||||
saveSnapshot: (snapshot: TaskFlowRegistryStoreSnapshot) => void;
|
||||
upsertFlow?: (flow: TaskFlowRecord) => void;
|
||||
deleteFlow?: (flowId: string) => void;
|
||||
close?: () => void;
|
||||
};
|
||||
|
||||
export type TaskFlowRegistryHookEvent =
|
||||
| {
|
||||
kind: "restored";
|
||||
flows: TaskFlowRecord[];
|
||||
}
|
||||
| {
|
||||
kind: "upserted";
|
||||
flow: TaskFlowRecord;
|
||||
previous?: TaskFlowRecord;
|
||||
}
|
||||
| {
|
||||
kind: "deleted";
|
||||
flowId: string;
|
||||
previous: TaskFlowRecord;
|
||||
};
|
||||
|
||||
export type TaskFlowRegistryHooks = {
|
||||
// Hooks are incremental/observational. Snapshot persistence belongs to TaskFlowRegistryStore.
|
||||
onEvent?: (event: TaskFlowRegistryHookEvent) => void;
|
||||
};
|
||||
|
||||
const defaultFlowRegistryStore: TaskFlowRegistryStore = {
|
||||
loadSnapshot: loadTaskFlowRegistryStateFromSqlite,
|
||||
saveSnapshot: saveTaskFlowRegistryStateToSqlite,
|
||||
upsertFlow: upsertTaskFlowRegistryRecordToSqlite,
|
||||
deleteFlow: deleteTaskFlowRegistryRecordFromSqlite,
|
||||
close: closeTaskFlowRegistrySqliteStore,
|
||||
};
|
||||
|
||||
let configuredFlowRegistryStore: TaskFlowRegistryStore = defaultFlowRegistryStore;
|
||||
let configuredFlowRegistryHooks: TaskFlowRegistryHooks | null = null;
|
||||
|
||||
export function getTaskFlowRegistryStore(): TaskFlowRegistryStore {
|
||||
return configuredFlowRegistryStore;
|
||||
}
|
||||
|
||||
export function getTaskFlowRegistryHooks(): TaskFlowRegistryHooks | null {
|
||||
return configuredFlowRegistryHooks;
|
||||
}
|
||||
|
||||
export function configureTaskFlowRegistryRuntime(params: {
|
||||
store?: TaskFlowRegistryStore;
|
||||
hooks?: TaskFlowRegistryHooks | null;
|
||||
}) {
|
||||
if (params.store) {
|
||||
configuredFlowRegistryStore = params.store;
|
||||
}
|
||||
if ("hooks" in params) {
|
||||
configuredFlowRegistryHooks = params.hooks ?? null;
|
||||
}
|
||||
}
|
||||
|
||||
export function resetTaskFlowRegistryRuntimeForTests() {
|
||||
configuredFlowRegistryStore.close?.();
|
||||
configuredFlowRegistryStore = defaultFlowRegistryStore;
|
||||
configuredFlowRegistryHooks = null;
|
||||
}
|
||||
@@ -2,36 +2,36 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import {
|
||||
createFlowRecord,
|
||||
createFlowForTask,
|
||||
createManagedFlow,
|
||||
deleteFlowRecordById,
|
||||
createTaskFlowForTask,
|
||||
createManagedTaskFlow,
|
||||
deleteTaskFlowRecordById,
|
||||
failFlow,
|
||||
getFlowById,
|
||||
listFlowRecords,
|
||||
getTaskFlowById,
|
||||
listTaskFlowRecords,
|
||||
requestFlowCancel,
|
||||
resetFlowRegistryForTests,
|
||||
resetTaskFlowRegistryForTests,
|
||||
resumeFlow,
|
||||
setFlowWaiting,
|
||||
syncFlowFromTask,
|
||||
updateFlowRecordByIdExpectedRevision,
|
||||
} from "./flow-registry.js";
|
||||
import { configureFlowRegistryRuntime } from "./flow-registry.store.js";
|
||||
} from "./task-flow-registry.js";
|
||||
import { configureTaskFlowRegistryRuntime } from "./task-flow-registry.store.js";
|
||||
|
||||
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
|
||||
|
||||
async function withFlowRegistryTempDir<T>(run: (root: string) => Promise<T>): Promise<T> {
|
||||
return await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => {
|
||||
return await withTempDir({ prefix: "openclaw-task-flow-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
try {
|
||||
return await run(root);
|
||||
} finally {
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
describe("flow-registry", () => {
|
||||
describe("task-flow-registry", () => {
|
||||
beforeEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
@@ -43,15 +43,15 @@ describe("flow-registry", () => {
|
||||
} else {
|
||||
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
|
||||
}
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
});
|
||||
|
||||
it("creates managed flows and updates them through revision-checked helpers", async () => {
|
||||
await withFlowRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
|
||||
const created = createManagedFlow({
|
||||
const created = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/managed-controller",
|
||||
goal: "Investigate flaky test",
|
||||
@@ -151,7 +151,7 @@ describe("flow-registry", () => {
|
||||
}),
|
||||
});
|
||||
|
||||
expect(listFlowRecords()).toEqual([
|
||||
expect(listTaskFlowRecords()).toEqual([
|
||||
expect.objectContaining({
|
||||
flowId: created.flowId,
|
||||
revision: 4,
|
||||
@@ -159,15 +159,15 @@ describe("flow-registry", () => {
|
||||
}),
|
||||
]);
|
||||
|
||||
expect(deleteFlowRecordById(created.flowId)).toBe(true);
|
||||
expect(getFlowById(created.flowId)).toBeUndefined();
|
||||
expect(deleteTaskFlowRecordById(created.flowId)).toBe(true);
|
||||
expect(getTaskFlowById(created.flowId)).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("requires a controller for managed flows and rejects clearing it later", async () => {
|
||||
await withFlowRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
|
||||
expect(() =>
|
||||
createFlowRecord({
|
||||
@@ -176,7 +176,7 @@ describe("flow-registry", () => {
|
||||
}),
|
||||
).toThrow("Managed flow controllerId is required.");
|
||||
|
||||
const created = createManagedFlow({
|
||||
const created = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/managed-controller",
|
||||
goal: "Protected controller",
|
||||
@@ -196,7 +196,7 @@ describe("flow-registry", () => {
|
||||
|
||||
it("emits restored, upserted, and deleted flow hook events", () => {
|
||||
const onEvent = vi.fn();
|
||||
configureFlowRegistryRuntime({
|
||||
configureTaskFlowRegistryRuntime({
|
||||
store: {
|
||||
loadSnapshot: () => ({
|
||||
flows: new Map(),
|
||||
@@ -208,13 +208,13 @@ describe("flow-registry", () => {
|
||||
},
|
||||
});
|
||||
|
||||
const created = createManagedFlow({
|
||||
const created = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/hooks",
|
||||
goal: "Observe hooks",
|
||||
});
|
||||
|
||||
deleteFlowRecordById(created.flowId);
|
||||
deleteTaskFlowRecordById(created.flowId);
|
||||
|
||||
expect(onEvent).toHaveBeenCalledWith({
|
||||
kind: "restored",
|
||||
@@ -237,7 +237,7 @@ describe("flow-registry", () => {
|
||||
});
|
||||
|
||||
it("normalizes restored managed flows without a controller id", () => {
|
||||
configureFlowRegistryRuntime({
|
||||
configureTaskFlowRegistryRuntime({
|
||||
store: {
|
||||
loadSnapshot: () => ({
|
||||
flows: new Map([
|
||||
@@ -261,7 +261,7 @@ describe("flow-registry", () => {
|
||||
},
|
||||
});
|
||||
|
||||
expect(getFlowById("legacy-managed")).toMatchObject({
|
||||
expect(getTaskFlowById("legacy-managed")).toMatchObject({
|
||||
flowId: "legacy-managed",
|
||||
syncMode: "managed",
|
||||
controllerId: "core/legacy-restored",
|
||||
@@ -271,9 +271,9 @@ describe("flow-registry", () => {
|
||||
it("mirrors one-task flow state from tasks and leaves managed flows alone", async () => {
|
||||
await withFlowRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
|
||||
const mirrored = createFlowForTask({
|
||||
const mirrored = createTaskFlowForTask({
|
||||
task: {
|
||||
ownerKey: "agent:main:main",
|
||||
taskId: "task-running",
|
||||
@@ -306,7 +306,7 @@ describe("flow-registry", () => {
|
||||
blockedSummary: "Writable session required.",
|
||||
});
|
||||
|
||||
const managed = createManagedFlow({
|
||||
const managed = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/managed",
|
||||
goal: "Cluster PRs",
|
||||
@@ -337,9 +337,9 @@ describe("flow-registry", () => {
|
||||
it("preserves explicit json null in state and wait payloads", async () => {
|
||||
await withFlowRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskFlowRegistryForTests();
|
||||
|
||||
const created = createManagedFlow({
|
||||
const created = createManagedTaskFlow({
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/null-state",
|
||||
goal: "Null payloads",
|
||||
@@ -1,20 +1,25 @@
|
||||
import crypto from "node:crypto";
|
||||
import {
|
||||
getFlowRegistryHooks,
|
||||
getFlowRegistryStore,
|
||||
resetFlowRegistryRuntimeForTests,
|
||||
type FlowRegistryHookEvent,
|
||||
} from "./flow-registry.store.js";
|
||||
import type { FlowRecord, FlowStatus, FlowSyncMode, JsonValue } from "./flow-registry.types.js";
|
||||
getTaskFlowRegistryHooks,
|
||||
getTaskFlowRegistryStore,
|
||||
resetTaskFlowRegistryRuntimeForTests,
|
||||
type TaskFlowRegistryHookEvent,
|
||||
} from "./task-flow-registry.store.js";
|
||||
import type {
|
||||
TaskFlowRecord,
|
||||
TaskFlowStatus,
|
||||
TaskFlowSyncMode,
|
||||
JsonValue,
|
||||
} from "./task-flow-registry.types.js";
|
||||
import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js";
|
||||
|
||||
const flows = new Map<string, FlowRecord>();
|
||||
const flows = new Map<string, TaskFlowRecord>();
|
||||
let restoreAttempted = false;
|
||||
|
||||
type FlowRecordPatch = Omit<
|
||||
Partial<
|
||||
Pick<
|
||||
FlowRecord,
|
||||
TaskFlowRecord,
|
||||
| "status"
|
||||
| "notifyPolicy"
|
||||
| "goal"
|
||||
@@ -48,15 +53,15 @@ type FlowRecordPatch = Omit<
|
||||
endedAt?: number | null;
|
||||
};
|
||||
|
||||
export type FlowUpdateResult =
|
||||
export type TaskFlowUpdateResult =
|
||||
| {
|
||||
applied: true;
|
||||
flow: FlowRecord;
|
||||
flow: TaskFlowRecord;
|
||||
}
|
||||
| {
|
||||
applied: false;
|
||||
reason: "not_found" | "revision_conflict";
|
||||
current?: FlowRecord;
|
||||
current?: TaskFlowRecord;
|
||||
};
|
||||
|
||||
function cloneStructuredValue<T>(value: T | undefined): T | undefined {
|
||||
@@ -66,7 +71,7 @@ function cloneStructuredValue<T>(value: T | undefined): T | undefined {
|
||||
return structuredClone(value);
|
||||
}
|
||||
|
||||
function cloneFlowRecord(record: FlowRecord): FlowRecord {
|
||||
function cloneFlowRecord(record: TaskFlowRecord): TaskFlowRecord {
|
||||
return {
|
||||
...record,
|
||||
...(record.requesterOrigin
|
||||
@@ -79,7 +84,7 @@ function cloneFlowRecord(record: FlowRecord): FlowRecord {
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeRestoredFlowRecord(record: FlowRecord): FlowRecord {
|
||||
function normalizeRestoredFlowRecord(record: TaskFlowRecord): TaskFlowRecord {
|
||||
const syncMode = record.syncMode === "task_mirrored" ? "task_mirrored" : "managed";
|
||||
const controllerId =
|
||||
syncMode === "managed"
|
||||
@@ -106,12 +111,12 @@ function normalizeRestoredFlowRecord(record: FlowRecord): FlowRecord {
|
||||
};
|
||||
}
|
||||
|
||||
function snapshotFlowRecords(source: ReadonlyMap<string, FlowRecord>): FlowRecord[] {
|
||||
function snapshotFlowRecords(source: ReadonlyMap<string, TaskFlowRecord>): TaskFlowRecord[] {
|
||||
return [...source.values()].map((record) => cloneFlowRecord(record));
|
||||
}
|
||||
|
||||
function emitFlowRegistryHookEvent(createEvent: () => FlowRegistryHookEvent): void {
|
||||
const hooks = getFlowRegistryHooks();
|
||||
function emitFlowRegistryHookEvent(createEvent: () => TaskFlowRegistryHookEvent): void {
|
||||
const hooks = getTaskFlowRegistryHooks();
|
||||
if (!hooks?.onEvent) {
|
||||
return;
|
||||
}
|
||||
@@ -169,9 +174,9 @@ function resolveFlowBlockedSummary(
|
||||
return task.terminalSummary?.trim() || task.progressSummary?.trim() || undefined;
|
||||
}
|
||||
|
||||
export function deriveFlowStatusFromTask(
|
||||
export function deriveTaskFlowStatusFromTask(
|
||||
task: Pick<TaskRecord, "status" | "terminalOutcome">,
|
||||
): FlowStatus {
|
||||
): TaskFlowStatus {
|
||||
if (task.status === "queued") {
|
||||
return "queued";
|
||||
}
|
||||
@@ -195,7 +200,7 @@ function ensureFlowRegistryReady() {
|
||||
return;
|
||||
}
|
||||
restoreAttempted = true;
|
||||
const restored = getFlowRegistryStore().loadSnapshot();
|
||||
const restored = getTaskFlowRegistryStore().loadSnapshot();
|
||||
flows.clear();
|
||||
for (const [flowId, flow] of restored.flows) {
|
||||
flows.set(flowId, normalizeRestoredFlowRecord(flow));
|
||||
@@ -207,13 +212,13 @@ function ensureFlowRegistryReady() {
|
||||
}
|
||||
|
||||
function persistFlowRegistry() {
|
||||
getFlowRegistryStore().saveSnapshot({
|
||||
getTaskFlowRegistryStore().saveSnapshot({
|
||||
flows: new Map(snapshotFlowRecords(flows).map((flow) => [flow.flowId, flow])),
|
||||
});
|
||||
}
|
||||
|
||||
function persistFlowUpsert(flow: FlowRecord) {
|
||||
const store = getFlowRegistryStore();
|
||||
function persistFlowUpsert(flow: TaskFlowRecord) {
|
||||
const store = getTaskFlowRegistryStore();
|
||||
if (store.upsertFlow) {
|
||||
store.upsertFlow(cloneFlowRecord(flow));
|
||||
return;
|
||||
@@ -222,7 +227,7 @@ function persistFlowUpsert(flow: FlowRecord) {
|
||||
}
|
||||
|
||||
function persistFlowDelete(flowId: string) {
|
||||
const store = getFlowRegistryStore();
|
||||
const store = getTaskFlowRegistryStore();
|
||||
if (store.deleteFlow) {
|
||||
store.deleteFlow(flowId);
|
||||
return;
|
||||
@@ -231,12 +236,12 @@ function persistFlowDelete(flowId: string) {
|
||||
}
|
||||
|
||||
function buildFlowRecord(params: {
|
||||
syncMode?: FlowSyncMode;
|
||||
syncMode?: TaskFlowSyncMode;
|
||||
ownerKey: string;
|
||||
requesterOrigin?: FlowRecord["requesterOrigin"];
|
||||
requesterOrigin?: TaskFlowRecord["requesterOrigin"];
|
||||
controllerId?: string | null;
|
||||
revision?: number;
|
||||
status?: FlowStatus;
|
||||
status?: TaskFlowStatus;
|
||||
notifyPolicy?: TaskNotifyPolicy;
|
||||
goal: string;
|
||||
currentStep?: string | null;
|
||||
@@ -248,7 +253,7 @@ function buildFlowRecord(params: {
|
||||
createdAt?: number;
|
||||
updatedAt?: number;
|
||||
endedAt?: number | null;
|
||||
}): FlowRecord {
|
||||
}): TaskFlowRecord {
|
||||
const now = params.createdAt ?? Date.now();
|
||||
const syncMode = params.syncMode ?? "managed";
|
||||
const controllerId = syncMode === "managed" ? assertControllerId(params.controllerId) : undefined;
|
||||
@@ -280,7 +285,7 @@ function buildFlowRecord(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function applyFlowPatch(current: FlowRecord, patch: FlowRecordPatch): FlowRecord {
|
||||
function applyFlowPatch(current: TaskFlowRecord, patch: FlowRecordPatch): TaskFlowRecord {
|
||||
const controllerId =
|
||||
patch.controllerId === undefined ? current.controllerId : normalizeText(patch.controllerId);
|
||||
if (current.syncMode === "managed") {
|
||||
@@ -315,7 +320,7 @@ function applyFlowPatch(current: FlowRecord, patch: FlowRecordPatch): FlowRecord
|
||||
};
|
||||
}
|
||||
|
||||
function writeFlowRecord(next: FlowRecord, previous?: FlowRecord): FlowRecord {
|
||||
function writeFlowRecord(next: TaskFlowRecord, previous?: TaskFlowRecord): TaskFlowRecord {
|
||||
flows.set(next.flowId, next);
|
||||
persistFlowUpsert(next);
|
||||
emitFlowRegistryHookEvent(() => ({
|
||||
@@ -327,12 +332,12 @@ function writeFlowRecord(next: FlowRecord, previous?: FlowRecord): FlowRecord {
|
||||
}
|
||||
|
||||
export function createFlowRecord(params: {
|
||||
syncMode?: FlowSyncMode;
|
||||
syncMode?: TaskFlowSyncMode;
|
||||
ownerKey: string;
|
||||
requesterOrigin?: FlowRecord["requesterOrigin"];
|
||||
requesterOrigin?: TaskFlowRecord["requesterOrigin"];
|
||||
controllerId?: string | null;
|
||||
revision?: number;
|
||||
status?: FlowStatus;
|
||||
status?: TaskFlowStatus;
|
||||
notifyPolicy?: TaskNotifyPolicy;
|
||||
goal: string;
|
||||
currentStep?: string | null;
|
||||
@@ -344,17 +349,17 @@ export function createFlowRecord(params: {
|
||||
createdAt?: number;
|
||||
updatedAt?: number;
|
||||
endedAt?: number | null;
|
||||
}): FlowRecord {
|
||||
}): TaskFlowRecord {
|
||||
ensureFlowRegistryReady();
|
||||
const record = buildFlowRecord(params);
|
||||
return writeFlowRecord(record);
|
||||
}
|
||||
|
||||
export function createManagedFlow(params: {
|
||||
export function createManagedTaskFlow(params: {
|
||||
ownerKey: string;
|
||||
controllerId: string;
|
||||
requesterOrigin?: FlowRecord["requesterOrigin"];
|
||||
status?: FlowStatus;
|
||||
requesterOrigin?: TaskFlowRecord["requesterOrigin"];
|
||||
status?: TaskFlowStatus;
|
||||
notifyPolicy?: TaskNotifyPolicy;
|
||||
goal: string;
|
||||
currentStep?: string | null;
|
||||
@@ -366,7 +371,7 @@ export function createManagedFlow(params: {
|
||||
createdAt?: number;
|
||||
updatedAt?: number;
|
||||
endedAt?: number | null;
|
||||
}): FlowRecord {
|
||||
}): TaskFlowRecord {
|
||||
return createFlowRecord({
|
||||
...params,
|
||||
syncMode: "managed",
|
||||
@@ -374,7 +379,7 @@ export function createManagedFlow(params: {
|
||||
});
|
||||
}
|
||||
|
||||
export function createFlowForTask(params: {
|
||||
export function createTaskFlowForTask(params: {
|
||||
task: Pick<
|
||||
TaskRecord,
|
||||
| "ownerKey"
|
||||
@@ -390,9 +395,9 @@ export function createFlowForTask(params: {
|
||||
| "terminalSummary"
|
||||
| "progressSummary"
|
||||
>;
|
||||
requesterOrigin?: FlowRecord["requesterOrigin"];
|
||||
}): FlowRecord {
|
||||
const terminalFlowStatus = deriveFlowStatusFromTask(params.task);
|
||||
requesterOrigin?: TaskFlowRecord["requesterOrigin"];
|
||||
}): TaskFlowRecord {
|
||||
const terminalFlowStatus = deriveTaskFlowStatusFromTask(params.task);
|
||||
const isTerminal =
|
||||
terminalFlowStatus === "succeeded" ||
|
||||
terminalFlowStatus === "blocked" ||
|
||||
@@ -418,7 +423,10 @@ export function createFlowForTask(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function updateFlowRecordByIdUnchecked(flowId: string, patch: FlowRecordPatch): FlowRecord | null {
|
||||
function updateFlowRecordByIdUnchecked(
|
||||
flowId: string,
|
||||
patch: FlowRecordPatch,
|
||||
): TaskFlowRecord | null {
|
||||
ensureFlowRegistryReady();
|
||||
const current = flows.get(flowId);
|
||||
if (!current) {
|
||||
@@ -431,7 +439,7 @@ export function updateFlowRecordByIdExpectedRevision(params: {
|
||||
flowId: string;
|
||||
expectedRevision: number;
|
||||
patch: FlowRecordPatch;
|
||||
}): FlowUpdateResult {
|
||||
}): TaskFlowUpdateResult {
|
||||
ensureFlowRegistryReady();
|
||||
const current = flows.get(params.flowId);
|
||||
if (!current) {
|
||||
@@ -462,7 +470,7 @@ export function setFlowWaiting(params: {
|
||||
blockedTaskId?: string | null;
|
||||
blockedSummary?: string | null;
|
||||
updatedAt?: number;
|
||||
}): FlowUpdateResult {
|
||||
}): TaskFlowUpdateResult {
|
||||
return updateFlowRecordByIdExpectedRevision({
|
||||
flowId: params.flowId,
|
||||
expectedRevision: params.expectedRevision,
|
||||
@@ -485,11 +493,11 @@ export function setFlowWaiting(params: {
|
||||
export function resumeFlow(params: {
|
||||
flowId: string;
|
||||
expectedRevision: number;
|
||||
status?: Extract<FlowStatus, "queued" | "running">;
|
||||
status?: Extract<TaskFlowStatus, "queued" | "running">;
|
||||
currentStep?: string | null;
|
||||
stateJson?: JsonValue | null;
|
||||
updatedAt?: number;
|
||||
}): FlowUpdateResult {
|
||||
}): TaskFlowUpdateResult {
|
||||
return updateFlowRecordByIdExpectedRevision({
|
||||
flowId: params.flowId,
|
||||
expectedRevision: params.expectedRevision,
|
||||
@@ -513,7 +521,7 @@ export function finishFlow(params: {
|
||||
stateJson?: JsonValue | null;
|
||||
updatedAt?: number;
|
||||
endedAt?: number;
|
||||
}): FlowUpdateResult {
|
||||
}): TaskFlowUpdateResult {
|
||||
const endedAt = params.endedAt ?? params.updatedAt ?? Date.now();
|
||||
return updateFlowRecordByIdExpectedRevision({
|
||||
flowId: params.flowId,
|
||||
@@ -540,7 +548,7 @@ export function failFlow(params: {
|
||||
blockedSummary?: string | null;
|
||||
updatedAt?: number;
|
||||
endedAt?: number;
|
||||
}): FlowUpdateResult {
|
||||
}): TaskFlowUpdateResult {
|
||||
const endedAt = params.endedAt ?? params.updatedAt ?? Date.now();
|
||||
return updateFlowRecordByIdExpectedRevision({
|
||||
flowId: params.flowId,
|
||||
@@ -563,7 +571,7 @@ export function requestFlowCancel(params: {
|
||||
expectedRevision: number;
|
||||
cancelRequestedAt?: number;
|
||||
updatedAt?: number;
|
||||
}): FlowUpdateResult {
|
||||
}): TaskFlowUpdateResult {
|
||||
return updateFlowRecordByIdExpectedRevision({
|
||||
flowId: params.flowId,
|
||||
expectedRevision: params.expectedRevision,
|
||||
@@ -589,19 +597,19 @@ export function syncFlowFromTask(
|
||||
| "terminalSummary"
|
||||
| "progressSummary"
|
||||
>,
|
||||
): FlowRecord | null {
|
||||
): TaskFlowRecord | null {
|
||||
const flowId = task.parentFlowId?.trim();
|
||||
if (!flowId) {
|
||||
return null;
|
||||
}
|
||||
const flow = getFlowById(flowId);
|
||||
const flow = getTaskFlowById(flowId);
|
||||
if (!flow) {
|
||||
return null;
|
||||
}
|
||||
if (flow.syncMode !== "task_mirrored") {
|
||||
return flow;
|
||||
}
|
||||
const terminalFlowStatus = deriveFlowStatusFromTask(task);
|
||||
const terminalFlowStatus = deriveTaskFlowStatusFromTask(task);
|
||||
const isTerminal =
|
||||
terminalFlowStatus === "succeeded" ||
|
||||
terminalFlowStatus === "blocked" ||
|
||||
@@ -625,13 +633,13 @@ export function syncFlowFromTask(
|
||||
});
|
||||
}
|
||||
|
||||
export function getFlowById(flowId: string): FlowRecord | undefined {
|
||||
export function getTaskFlowById(flowId: string): TaskFlowRecord | undefined {
|
||||
ensureFlowRegistryReady();
|
||||
const flow = flows.get(flowId);
|
||||
return flow ? cloneFlowRecord(flow) : undefined;
|
||||
}
|
||||
|
||||
export function listFlowsForOwnerKey(ownerKey: string): FlowRecord[] {
|
||||
export function listTaskFlowsForOwnerKey(ownerKey: string): TaskFlowRecord[] {
|
||||
ensureFlowRegistryReady();
|
||||
const normalizedOwnerKey = ownerKey.trim();
|
||||
if (!normalizedOwnerKey) {
|
||||
@@ -643,27 +651,27 @@ export function listFlowsForOwnerKey(ownerKey: string): FlowRecord[] {
|
||||
.toSorted((left, right) => right.createdAt - left.createdAt);
|
||||
}
|
||||
|
||||
export function findLatestFlowForOwnerKey(ownerKey: string): FlowRecord | undefined {
|
||||
const flow = listFlowsForOwnerKey(ownerKey)[0];
|
||||
export function findLatestTaskFlowForOwnerKey(ownerKey: string): TaskFlowRecord | undefined {
|
||||
const flow = listTaskFlowsForOwnerKey(ownerKey)[0];
|
||||
return flow ? cloneFlowRecord(flow) : undefined;
|
||||
}
|
||||
|
||||
export function resolveFlowForLookupToken(token: string): FlowRecord | undefined {
|
||||
export function resolveTaskFlowForLookupToken(token: string): TaskFlowRecord | undefined {
|
||||
const lookup = token.trim();
|
||||
if (!lookup) {
|
||||
return undefined;
|
||||
}
|
||||
return getFlowById(lookup) ?? findLatestFlowForOwnerKey(lookup);
|
||||
return getTaskFlowById(lookup) ?? findLatestTaskFlowForOwnerKey(lookup);
|
||||
}
|
||||
|
||||
export function listFlowRecords(): FlowRecord[] {
|
||||
export function listTaskFlowRecords(): TaskFlowRecord[] {
|
||||
ensureFlowRegistryReady();
|
||||
return [...flows.values()]
|
||||
.map((flow) => cloneFlowRecord(flow))
|
||||
.toSorted((left, right) => right.createdAt - left.createdAt);
|
||||
}
|
||||
|
||||
export function deleteFlowRecordById(flowId: string): boolean {
|
||||
export function deleteTaskFlowRecordById(flowId: string): boolean {
|
||||
ensureFlowRegistryReady();
|
||||
const current = flows.get(flowId);
|
||||
if (!current) {
|
||||
@@ -679,12 +687,12 @@ export function deleteFlowRecordById(flowId: string): boolean {
|
||||
return true;
|
||||
}
|
||||
|
||||
export function resetFlowRegistryForTests(opts?: { persist?: boolean }) {
|
||||
export function resetTaskFlowRegistryForTests(opts?: { persist?: boolean }) {
|
||||
flows.clear();
|
||||
restoreAttempted = false;
|
||||
resetFlowRegistryRuntimeForTests();
|
||||
resetTaskFlowRegistryRuntimeForTests();
|
||||
if (opts?.persist !== false) {
|
||||
persistFlowRegistry();
|
||||
getFlowRegistryStore().close?.();
|
||||
getTaskFlowRegistryStore().close?.();
|
||||
}
|
||||
}
|
||||
@@ -9,9 +9,9 @@ export type JsonValue =
|
||||
| JsonValue[]
|
||||
| { [key: string]: JsonValue };
|
||||
|
||||
export type FlowSyncMode = "task_mirrored" | "managed";
|
||||
export type TaskFlowSyncMode = "task_mirrored" | "managed";
|
||||
|
||||
export type FlowStatus =
|
||||
export type TaskFlowStatus =
|
||||
| "queued"
|
||||
| "running"
|
||||
| "waiting"
|
||||
@@ -21,14 +21,14 @@ export type FlowStatus =
|
||||
| "cancelled"
|
||||
| "lost";
|
||||
|
||||
export type FlowRecord = {
|
||||
export type TaskFlowRecord = {
|
||||
flowId: string;
|
||||
syncMode: FlowSyncMode;
|
||||
syncMode: TaskFlowSyncMode;
|
||||
ownerKey: string;
|
||||
requesterOrigin?: DeliveryContext;
|
||||
controllerId?: string;
|
||||
revision: number;
|
||||
status: FlowStatus;
|
||||
status: TaskFlowStatus;
|
||||
notifyPolicy: TaskNotifyPolicy;
|
||||
goal: string;
|
||||
currentStep?: string;
|
||||
21
src/tasks/task-flow-runtime-internal.ts
Normal file
21
src/tasks/task-flow-runtime-internal.ts
Normal file
@@ -0,0 +1,21 @@
|
||||
export {
|
||||
createTaskFlowForTask,
|
||||
createFlowRecord,
|
||||
createManagedTaskFlow,
|
||||
deleteTaskFlowRecordById,
|
||||
findLatestTaskFlowForOwnerKey,
|
||||
failFlow,
|
||||
finishFlow,
|
||||
getTaskFlowById,
|
||||
listTaskFlowRecords,
|
||||
listTaskFlowsForOwnerKey,
|
||||
requestFlowCancel,
|
||||
resolveTaskFlowForLookupToken,
|
||||
resetTaskFlowRegistryForTests,
|
||||
resumeFlow,
|
||||
setFlowWaiting,
|
||||
syncFlowFromTask,
|
||||
updateFlowRecordByIdExpectedRevision,
|
||||
} from "./task-flow-registry.js";
|
||||
|
||||
export type { TaskFlowUpdateResult } from "./task-flow-registry.js";
|
||||
Reference in New Issue
Block a user