diff --git a/CHANGELOG.md b/CHANGELOG.md index 7287de1711b..be684087773 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ Docs: https://docs.openclaw.ai - Providers/runtime: add provider-owned replay hook surfaces for transcript policy, replay cleanup, and reasoning-mode dispatch. (#59143) Thanks @jalehman. - Diffs: add plugin-owned `viewerBaseUrl` so viewer links can use a stable proxy/public origin without passing `baseUrl` on every tool call. (#59341) Related #59227. Thanks @gumadeiras. - Matrix/plugin: emit spec-compliant `m.mentions` metadata across text sends, media captions, edits, poll fallback text, and action-driven edits so Matrix mentions notify reliably in clients like Element. (#59323) Thanks @gumadeiras. +- Agents/compaction: resolve `agents.defaults.compaction.model` consistently for manual `/compact` and other context-engine compaction paths, so engine-owned compaction uses the configured override model across runtime entrypoints. (#56710) Thanks @oliviareid-svg +- Channels/session routing: move provider-specific session conversation grammar into plugin-owned session-key surfaces, preserving Telegram topic routing and Feishu scoped inheritance across bootstrap, model override, restart, and tool-policy paths. +- WhatsApp/reactions: add `reactionLevel` guidance for agent reactions. Thanks @mcaxtr. +- Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01. +- Tasks/TaskFlow: restore the core TaskFlow substrate with managed-vs-mirrored sync modes, durable flow state/revision tracking, and `openclaw flows` inspection/recovery primitives so background orchestration can persist and be operated separately from plugin authoring layers. (#58930) Thanks @mbelinky. ### Fixes diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index f5cc4495ab4..0bb59fb12ad 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -13,6 +13,9 @@ const mocks = vi.hoisted(() => ({ tasksShowCommand: vi.fn(), tasksNotifyCommand: vi.fn(), tasksCancelCommand: vi.fn(), + flowsListCommand: vi.fn(), + flowsShowCommand: vi.fn(), + flowsCancelCommand: vi.fn(), setVerbose: vi.fn(), runtime: { log: vi.fn(), @@ -31,6 +34,9 @@ const tasksMaintenanceCommand = mocks.tasksMaintenanceCommand; const tasksShowCommand = mocks.tasksShowCommand; const tasksNotifyCommand = mocks.tasksNotifyCommand; const tasksCancelCommand = mocks.tasksCancelCommand; +const flowsListCommand = mocks.flowsListCommand; +const flowsShowCommand = mocks.flowsShowCommand; +const flowsCancelCommand = mocks.flowsCancelCommand; const setVerbose = mocks.setVerbose; const runtime = mocks.runtime; @@ -59,6 +65,12 @@ vi.mock("../../commands/tasks.js", () => ({ tasksCancelCommand: mocks.tasksCancelCommand, })); +vi.mock("../../commands/flows.js", () => ({ + flowsListCommand: mocks.flowsListCommand, + flowsShowCommand: mocks.flowsShowCommand, + flowsCancelCommand: mocks.flowsCancelCommand, +})); + vi.mock("../../globals.js", () => ({ setVerbose: mocks.setVerbose, })); @@ -87,6 +99,9 @@ describe("registerStatusHealthSessionsCommands", () => { tasksShowCommand.mockResolvedValue(undefined); tasksNotifyCommand.mockResolvedValue(undefined); tasksCancelCommand.mockResolvedValue(undefined); + flowsListCommand.mockResolvedValue(undefined); + flowsShowCommand.mockResolvedValue(undefined); + flowsCancelCommand.mockResolvedValue(undefined); }); it("runs status command with timeout and debug-derived verbose", async () => { @@ -223,6 +238,34 @@ describe("registerStatusHealthSessionsCommands", () => { ); }); + it("runs flows subcommands with forwarded options", async () => { + await runCli(["flows", "list", "--json", "--status", "blocked"]); + expect(flowsListCommand).toHaveBeenCalledWith( + expect.objectContaining({ + json: true, + status: "blocked", + }), + runtime, + ); + + await runCli(["flows", "show", "flow-123", "--json"]); + expect(flowsShowCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "flow-123", + json: true, + }), + runtime, + ); + + await runCli(["flows", "cancel", "flow-123"]); + expect(flowsCancelCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "flow-123", + }), + runtime, + ); + }); + it("forwards parent-level all-agents to cleanup subcommand", async () => { await runCli(["sessions", "--all-agents", "cleanup", "--dry-run"]); diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index 1467be8fe72..75ab6bd8418 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -1,4 +1,5 @@ import type { Command } from "commander"; +import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "../../commands/flows.js"; import { healthCommand } from "../../commands/health.js"; import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js"; import { sessionsCommand } from "../../commands/sessions.js"; @@ -373,4 +374,79 @@ export function registerStatusHealthSessionsCommands(program: Command) { ); }); }); + + const flowsCmd = program + .command("flows") + .description("Inspect durable background flow state") + .option("--json", "Output as JSON", false) + .option( + "--status ", + "Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)", + ) + .action(async (opts) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsListCommand( + { + json: Boolean(opts.json), + status: opts.status as string | undefined, + }, + defaultRuntime, + ); + }); + }); + flowsCmd.enablePositionalOptions(); + + flowsCmd + .command("list") + .description("List tracked background flows") + .option("--json", "Output as JSON", false) + .option( + "--status ", + "Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)", + ) + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as { json?: boolean; status?: string } | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsListCommand( + { + json: Boolean(opts.json || parentOpts?.json), + status: (opts.status as string | undefined) ?? parentOpts?.status, + }, + defaultRuntime, + ); + }); + }); + + flowsCmd + .command("show") + .description("Show one background flow by flow id or owner key") + .argument("", "Flow id or owner key") + .option("--json", "Output as JSON", false) + .action(async (lookup, opts, command) => { + const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsShowCommand( + { + lookup, + json: Boolean(opts.json || parentOpts?.json), + }, + defaultRuntime, + ); + }); + }); + + flowsCmd + .command("cancel") + .description("Cancel a running background flow") + .argument("", "Flow id or owner key") + .action(async (lookup) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsCancelCommand( + { + lookup, + }, + defaultRuntime, + ); + }); + }); } diff --git a/src/commands/doctor-workspace-status.test.ts b/src/commands/doctor-workspace-status.test.ts index d3803b22ccf..0072309d7a9 100644 --- a/src/commands/doctor-workspace-status.test.ts +++ b/src/commands/doctor-workspace-status.test.ts @@ -13,6 +13,8 @@ const mocks = vi.hoisted(() => ({ buildWorkspaceSkillStatus: vi.fn(), buildPluginStatusReport: vi.fn(), buildPluginCompatibilityWarnings: vi.fn(), + listFlowRecords: vi.fn<() => unknown[]>(() => []), + listTasksForFlowId: vi.fn<(flowId: string) => unknown[]>((_flowId: string) => []), })); vi.mock("../agents/agent-scope.js", () => ({ @@ -30,9 +32,21 @@ vi.mock("../plugins/status.js", () => ({ mocks.buildPluginCompatibilityWarnings(...args), })); +vi.mock("../tasks/flow-runtime-internal.js", () => ({ + listFlowRecords: () => mocks.listFlowRecords(), +})); + +vi.mock("../tasks/runtime-internal.js", () => ({ + listTasksForFlowId: (flowId: string) => mocks.listTasksForFlowId(flowId), +})); + async function runNoteWorkspaceStatusForTest( loadResult: ReturnType, compatibilityWarnings: string[] = [], + opts?: { + flows?: unknown[]; + tasksByFlowId?: (flowId: string) => unknown[]; + }, ) { mocks.resolveDefaultAgentId.mockReturnValue("default"); mocks.resolveAgentWorkspaceDir.mockReturnValue("/workspace"); @@ -44,6 +58,10 @@ async function runNoteWorkspaceStatusForTest( ...loadResult, }); mocks.buildPluginCompatibilityWarnings.mockReturnValue(compatibilityWarnings); + mocks.listFlowRecords.mockReturnValue(opts?.flows ?? []); + mocks.listTasksForFlowId.mockImplementation((flowId: string) => + opts?.tasksByFlowId ? opts.tasksByFlowId(flowId) : [], + ); const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {}); noteWorkspaceStatus({}); @@ -159,4 +177,32 @@ describe("noteWorkspaceStatus", () => { noteSpy.mockRestore(); } }); + + it("adds TaskFlow recovery hints for broken blocked flows", async () => { + const noteSpy = await runNoteWorkspaceStatusForTest(createPluginLoadResult(), [], { + flows: [ + { + flowId: "flow-123", + syncMode: "managed", + ownerKey: "agent:main:main", + revision: 0, + status: "blocked", + notifyPolicy: "done_only", + goal: "Investigate PR batch", + blockedTaskId: "task-missing", + createdAt: 100, + updatedAt: 100, + }, + ], + tasksByFlowId: () => [], + }); + try { + const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "TaskFlow recovery"); + expect(recoveryCalls).toHaveLength(1); + expect(String(recoveryCalls[0]?.[0])).toContain("flow-123"); + expect(String(recoveryCalls[0]?.[0])).toContain("openclaw flows show "); + } finally { + noteSpy.mockRestore(); + } + }); }); diff --git a/src/commands/doctor-workspace-status.ts b/src/commands/doctor-workspace-status.ts index c4c8d0a4fea..3a7a14949bd 100644 --- a/src/commands/doctor-workspace-status.ts +++ b/src/commands/doctor-workspace-status.ts @@ -1,10 +1,54 @@ import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { buildWorkspaceSkillStatus } from "../agents/skills-status.js"; +import { formatCliCommand } from "../cli/command-format.js"; import type { OpenClawConfig } from "../config/config.js"; import { buildPluginCompatibilityWarnings, buildPluginStatusReport } from "../plugins/status.js"; +import { listFlowRecords } from "../tasks/flow-runtime-internal.js"; +import { listTasksForFlowId } from "../tasks/runtime-internal.js"; import { note } from "../terminal/note.js"; import { detectLegacyWorkspaceDirs, formatLegacyWorkspaceWarning } from "./doctor-workspace.js"; +function noteFlowRecoveryHints() { + const suspicious = listFlowRecords().flatMap((flow) => { + const tasks = listTasksForFlowId(flow.flowId); + const findings: string[] = []; + if ( + flow.syncMode === "managed" && + flow.status === "running" && + tasks.length === 0 && + flow.waitJson === undefined + ) { + findings.push( + `${flow.flowId}: running managed flow has no linked tasks or wait state; inspect or cancel it manually.`, + ); + } + if ( + flow.status === "blocked" && + flow.blockedTaskId && + !tasks.some((task) => task.taskId === flow.blockedTaskId) + ) { + findings.push( + `${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`, + ); + } + return findings; + }); + if (suspicious.length === 0) { + return; + } + note( + [ + ...suspicious.slice(0, 5), + suspicious.length > 5 ? `...and ${suspicious.length - 5} more.` : null, + `Inspect: ${formatCliCommand("openclaw flows show ")}`, + `Cancel: ${formatCliCommand("openclaw flows cancel ")}`, + ] + .filter((line): line is string => Boolean(line)) + .join("\n"), + "TaskFlow recovery", + ); +} + export function noteWorkspaceStatus(cfg: OpenClawConfig) { const workspaceDir = resolveAgentWorkspaceDir(cfg, resolveDefaultAgentId(cfg)); const legacyWorkspace = detectLegacyWorkspaceDirs({ workspaceDir }); @@ -74,5 +118,7 @@ export function noteWorkspaceStatus(cfg: OpenClawConfig) { note(lines.join("\n"), "Plugin diagnostics"); } + noteFlowRecoveryHints(); + return { workspaceDir }; } diff --git a/src/commands/flows.ts b/src/commands/flows.ts new file mode 100644 index 00000000000..eb1ee0efb28 --- /dev/null +++ b/src/commands/flows.ts @@ -0,0 +1,247 @@ +import { loadConfig } from "../config/config.js"; +import { info } from "../globals.js"; +import type { RuntimeEnv } from "../runtime.js"; +import type { FlowRecord, FlowStatus } from "../tasks/flow-registry.types.js"; +import { + getFlowById, + listFlowRecords, + resolveFlowForLookupToken, +} from "../tasks/flow-runtime-internal.js"; +import { listTasksForFlowId } from "../tasks/runtime-internal.js"; +import { cancelFlowById, getFlowTaskSummary } from "../tasks/task-executor.js"; +import { isRich, theme } from "../terminal/theme.js"; + +const ID_PAD = 10; +const STATUS_PAD = 10; +const MODE_PAD = 14; +const REV_PAD = 6; +const CTRL_PAD = 20; + +function truncate(value: string, maxChars: number) { + if (value.length <= maxChars) { + return value; + } + if (maxChars <= 1) { + return value.slice(0, maxChars); + } + return `${value.slice(0, maxChars - 1)}…`; +} + +function shortToken(value: string | undefined, maxChars = ID_PAD): string { + const trimmed = value?.trim(); + if (!trimmed) { + return "n/a"; + } + return truncate(trimmed, maxChars); +} + +function formatFlowStatusCell(status: FlowStatus, rich: boolean) { + const padded = status.padEnd(STATUS_PAD); + if (!rich) { + return padded; + } + if (status === "succeeded") { + return theme.success(padded); + } + if (status === "failed" || status === "lost") { + return theme.error(padded); + } + if (status === "running") { + return theme.accentBright(padded); + } + if (status === "blocked") { + return theme.warn(padded); + } + return theme.muted(padded); +} + +function formatFlowRows(flows: FlowRecord[], rich: boolean) { + const header = [ + "TaskFlow".padEnd(ID_PAD), + "Mode".padEnd(MODE_PAD), + "Status".padEnd(STATUS_PAD), + "Rev".padEnd(REV_PAD), + "Controller".padEnd(CTRL_PAD), + "Tasks".padEnd(14), + "Goal", + ].join(" "); + const lines = [rich ? theme.heading(header) : header]; + for (const flow of flows) { + const taskSummary = getFlowTaskSummary(flow.flowId); + const counts = `${taskSummary.active} active/${taskSummary.total} total`; + lines.push( + [ + shortToken(flow.flowId).padEnd(ID_PAD), + flow.syncMode.padEnd(MODE_PAD), + formatFlowStatusCell(flow.status, rich), + String(flow.revision).padEnd(REV_PAD), + truncate(flow.controllerId ?? "n/a", CTRL_PAD).padEnd(CTRL_PAD), + counts.padEnd(14), + truncate(flow.goal, 80), + ].join(" "), + ); + } + return lines; +} + +function formatFlowListSummary(flows: FlowRecord[]) { + const active = flows.filter( + (flow) => flow.status === "queued" || flow.status === "running", + ).length; + const blocked = flows.filter((flow) => flow.status === "blocked").length; + const cancelRequested = flows.filter((flow) => flow.cancelRequestedAt != null).length; + return `${active} active · ${blocked} blocked · ${cancelRequested} cancel-requested · ${flows.length} total`; +} + +function summarizeWait(flow: FlowRecord): string { + if (flow.waitJson == null) { + return "n/a"; + } + if ( + typeof flow.waitJson === "string" || + typeof flow.waitJson === "number" || + typeof flow.waitJson === "boolean" + ) { + return String(flow.waitJson); + } + if (Array.isArray(flow.waitJson)) { + return `array(${flow.waitJson.length})`; + } + return Object.keys(flow.waitJson).toSorted().join(", ") || "object"; +} + +export async function flowsListCommand( + opts: { json?: boolean; status?: string }, + runtime: RuntimeEnv, +) { + const statusFilter = opts.status?.trim(); + const flows = listFlowRecords().filter((flow) => { + if (statusFilter && flow.status !== statusFilter) { + return false; + } + return true; + }); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + count: flows.length, + status: statusFilter ?? null, + flows: flows.map((flow) => ({ + ...flow, + tasks: listTasksForFlowId(flow.flowId), + taskSummary: getFlowTaskSummary(flow.flowId), + })), + }, + null, + 2, + ), + ); + return; + } + + runtime.log(info(`TaskFlows: ${flows.length}`)); + runtime.log(info(`TaskFlow pressure: ${formatFlowListSummary(flows)}`)); + if (statusFilter) { + runtime.log(info(`Status filter: ${statusFilter}`)); + } + if (flows.length === 0) { + runtime.log("No TaskFlows found."); + return; + } + const rich = isRich(); + for (const line of formatFlowRows(flows, rich)) { + runtime.log(line); + } +} + +export async function flowsShowCommand( + opts: { json?: boolean; lookup: string }, + runtime: RuntimeEnv, +) { + const flow = resolveFlowForLookupToken(opts.lookup); + if (!flow) { + runtime.error(`TaskFlow not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + const tasks = listTasksForFlowId(flow.flowId); + const taskSummary = getFlowTaskSummary(flow.flowId); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + ...flow, + tasks, + taskSummary, + }, + null, + 2, + ), + ); + return; + } + + const lines = [ + "TaskFlow:", + `flowId: ${flow.flowId}`, + `syncMode: ${flow.syncMode}`, + `status: ${flow.status}`, + `notify: ${flow.notifyPolicy}`, + `ownerKey: ${flow.ownerKey}`, + `controllerId: ${flow.controllerId ?? "n/a"}`, + `revision: ${flow.revision}`, + `goal: ${flow.goal}`, + `currentStep: ${flow.currentStep ?? "n/a"}`, + `blockedTaskId: ${flow.blockedTaskId ?? "n/a"}`, + `blockedSummary: ${flow.blockedSummary ?? "n/a"}`, + `wait: ${summarizeWait(flow)}`, + `cancelRequestedAt: ${ + flow.cancelRequestedAt ? new Date(flow.cancelRequestedAt).toISOString() : "n/a" + }`, + `createdAt: ${new Date(flow.createdAt).toISOString()}`, + `updatedAt: ${new Date(flow.updatedAt).toISOString()}`, + `endedAt: ${flow.endedAt ? new Date(flow.endedAt).toISOString() : "n/a"}`, + `tasks: ${taskSummary.total} total · ${taskSummary.active} active · ${taskSummary.failures} issues`, + ]; + for (const line of lines) { + runtime.log(line); + } + if (tasks.length === 0) { + runtime.log("Linked tasks: none"); + return; + } + runtime.log("Linked tasks:"); + for (const task of tasks) { + runtime.log( + `- ${task.taskId} ${task.status} ${task.runId ?? "n/a"} ${task.label ?? task.task}`, + ); + } +} + +export async function flowsCancelCommand(opts: { lookup: string }, runtime: RuntimeEnv) { + const flow = resolveFlowForLookupToken(opts.lookup); + if (!flow) { + runtime.error(`Flow not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + const result = await cancelFlowById({ + cfg: loadConfig(), + flowId: flow.flowId, + }); + if (!result.found) { + runtime.error(result.reason ?? `Flow not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + if (!result.cancelled) { + runtime.error(result.reason ?? `Could not cancel TaskFlow: ${opts.lookup}`); + runtime.exit(1); + return; + } + const updated = getFlowById(flow.flowId) ?? result.flow ?? flow; + runtime.log(`Cancelled ${updated.flowId} (${updated.syncMode}) with status ${updated.status}.`); +} diff --git a/src/tasks/flow-owner-access.test.ts b/src/tasks/flow-owner-access.test.ts new file mode 100644 index 00000000000..6f0e90f916b --- /dev/null +++ b/src/tasks/flow-owner-access.test.ts @@ -0,0 +1,86 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { + findLatestFlowForOwner, + getFlowByIdForOwner, + listFlowsForOwner, + resolveFlowForLookupTokenForOwner, +} from "./flow-owner-access.js"; +import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js"; + +afterEach(() => { + resetFlowRegistryForTests({ persist: false }); +}); + +describe("flow owner access", () => { + it("returns owner-scoped flows for direct and owner-key lookups", () => { + const older = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/owner-access", + goal: "Older flow", + createdAt: 100, + updatedAt: 100, + }); + const latest = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/owner-access", + goal: "Latest flow", + createdAt: 200, + updatedAt: 200, + }); + + expect( + getFlowByIdForOwner({ + flowId: older.flowId, + callerOwnerKey: "agent:main:main", + })?.flowId, + ).toBe(older.flowId); + expect( + findLatestFlowForOwner({ + callerOwnerKey: "agent:main:main", + })?.flowId, + ).toBe(latest.flowId); + expect( + resolveFlowForLookupTokenForOwner({ + token: "agent:main:main", + callerOwnerKey: "agent:main:main", + })?.flowId, + ).toBe(latest.flowId); + expect( + listFlowsForOwner({ + callerOwnerKey: "agent:main:main", + }).map((flow) => flow.flowId), + ).toEqual([latest.flowId, older.flowId]); + }); + + it("denies cross-owner flow reads", () => { + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/owner-access", + goal: "Hidden flow", + }); + + expect( + getFlowByIdForOwner({ + flowId: flow.flowId, + callerOwnerKey: "agent:main:other", + }), + ).toBeUndefined(); + expect( + resolveFlowForLookupTokenForOwner({ + token: flow.flowId, + callerOwnerKey: "agent:main:other", + }), + ).toBeUndefined(); + expect( + resolveFlowForLookupTokenForOwner({ + token: "agent:main:main", + callerOwnerKey: "agent:main:other", + }), + ).toBeUndefined(); + expect( + listFlowsForOwner({ + callerOwnerKey: "agent:main:other", + }), + ).toEqual([]); + }); +}); diff --git a/src/tasks/flow-owner-access.ts b/src/tasks/flow-owner-access.ts new file mode 100644 index 00000000000..36bf2f156c0 --- /dev/null +++ b/src/tasks/flow-owner-access.ts @@ -0,0 +1,48 @@ +import { findLatestFlowForOwnerKey, getFlowById, listFlowsForOwnerKey } from "./flow-registry.js"; +import type { FlowRecord } from "./flow-registry.types.js"; + +function normalizeOwnerKey(ownerKey?: string): string | undefined { + const trimmed = ownerKey?.trim(); + return trimmed ? trimmed : undefined; +} + +function canOwnerAccessFlow(flow: FlowRecord, callerOwnerKey: string): boolean { + return normalizeOwnerKey(flow.ownerKey) === normalizeOwnerKey(callerOwnerKey); +} + +export function getFlowByIdForOwner(params: { + flowId: string; + callerOwnerKey: string; +}): FlowRecord | undefined { + const flow = getFlowById(params.flowId); + return flow && canOwnerAccessFlow(flow, params.callerOwnerKey) ? flow : undefined; +} + +export function listFlowsForOwner(params: { callerOwnerKey: string }): FlowRecord[] { + const ownerKey = normalizeOwnerKey(params.callerOwnerKey); + return ownerKey ? listFlowsForOwnerKey(ownerKey) : []; +} + +export function findLatestFlowForOwner(params: { callerOwnerKey: string }): FlowRecord | undefined { + const ownerKey = normalizeOwnerKey(params.callerOwnerKey); + return ownerKey ? findLatestFlowForOwnerKey(ownerKey) : undefined; +} + +export function resolveFlowForLookupTokenForOwner(params: { + token: string; + callerOwnerKey: string; +}): FlowRecord | undefined { + const direct = getFlowByIdForOwner({ + flowId: params.token, + callerOwnerKey: params.callerOwnerKey, + }); + if (direct) { + return direct; + } + const normalizedToken = normalizeOwnerKey(params.token); + const normalizedCallerOwnerKey = normalizeOwnerKey(params.callerOwnerKey); + if (!normalizedToken || normalizedToken !== normalizedCallerOwnerKey) { + return undefined; + } + return findLatestFlowForOwner({ callerOwnerKey: normalizedCallerOwnerKey }); +} diff --git a/src/tasks/flow-registry-import-boundary.test.ts b/src/tasks/flow-registry-import-boundary.test.ts new file mode 100644 index 00000000000..1004164f272 --- /dev/null +++ b/src/tasks/flow-registry-import-boundary.test.ts @@ -0,0 +1,39 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; + +const TASK_ROOT = path.resolve(import.meta.dirname); +const SRC_ROOT = path.resolve(TASK_ROOT, ".."); + +const ALLOWED_IMPORTERS = new Set(["tasks/flow-owner-access.ts", "tasks/flow-runtime-internal.ts"]); + +async function listSourceFiles(root: string): Promise { + const entries = await fs.readdir(root, { withFileTypes: true }); + const files: string[] = []; + for (const entry of entries) { + const fullPath = path.join(root, entry.name); + if (entry.isDirectory()) { + files.push(...(await listSourceFiles(fullPath))); + continue; + } + if (!entry.isFile() || !entry.name.endsWith(".ts") || entry.name.endsWith(".test.ts")) { + continue; + } + files.push(fullPath); + } + return files; +} + +describe("flow registry import boundary", () => { + it("keeps direct flow-registry imports behind approved flow access seams", async () => { + const importers: string[] = []; + for (const file of await listSourceFiles(SRC_ROOT)) { + const relative = path.relative(SRC_ROOT, file).replaceAll(path.sep, "/"); + const source = await fs.readFile(file, "utf8"); + if (source.includes("flow-registry.js")) { + importers.push(relative); + } + } + expect(importers.toSorted()).toEqual([...ALLOWED_IMPORTERS].toSorted()); + }); +}); diff --git a/src/tasks/flow-registry.paths.ts b/src/tasks/flow-registry.paths.ts new file mode 100644 index 00000000000..8d35c657434 --- /dev/null +++ b/src/tasks/flow-registry.paths.ts @@ -0,0 +1,10 @@ +import path from "node:path"; +import { resolveTaskStateDir } from "./task-registry.paths.js"; + +export function resolveFlowRegistryDir(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveTaskStateDir(env), "flows"); +} + +export function resolveFlowRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveFlowRegistryDir(env), "registry.sqlite"); +} diff --git a/src/tasks/flow-registry.store.sqlite.ts b/src/tasks/flow-registry.store.sqlite.ts new file mode 100644 index 00000000000..0b836939841 --- /dev/null +++ b/src/tasks/flow-registry.store.sqlite.ts @@ -0,0 +1,401 @@ +import { chmodSync, existsSync, mkdirSync } from "node:fs"; +import type { DatabaseSync, StatementSync } from "node:sqlite"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; +import type { DeliveryContext } from "../utils/delivery-context.js"; +import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js"; +import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js"; +import type { FlowRecord, FlowSyncMode, JsonValue } from "./flow-registry.types.js"; + +type FlowRegistryRow = { + flow_id: string; + sync_mode: FlowSyncMode | null; + shape?: string | null; + owner_key: string; + requester_origin_json: string | null; + controller_id: string | null; + revision: number | bigint | null; + status: FlowRecord["status"]; + notify_policy: FlowRecord["notifyPolicy"]; + goal: string; + current_step: string | null; + blocked_task_id: string | null; + blocked_summary: string | null; + state_json: string | null; + wait_json: string | null; + cancel_requested_at: number | bigint | null; + created_at: number | bigint; + updated_at: number | bigint; + ended_at: number | bigint | null; +}; + +type FlowRegistryStatements = { + selectAll: StatementSync; + upsertRow: StatementSync; + deleteRow: StatementSync; + clearRows: StatementSync; +}; + +type FlowRegistryDatabase = { + db: DatabaseSync; + path: string; + statements: FlowRegistryStatements; +}; + +let cachedDatabase: FlowRegistryDatabase | null = null; +const FLOW_REGISTRY_DIR_MODE = 0o700; +const FLOW_REGISTRY_FILE_MODE = 0o600; +const FLOW_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; + +function normalizeNumber(value: number | bigint | null): number | undefined { + if (typeof value === "bigint") { + return Number(value); + } + return typeof value === "number" ? value : undefined; +} + +function serializeJson(value: unknown): string | null { + return value === undefined ? null : JSON.stringify(value); +} + +function parseJsonValue(raw: string | null): T | undefined { + if (!raw?.trim()) { + return undefined; + } + try { + return JSON.parse(raw) as T; + } catch { + return undefined; + } +} + +function rowToSyncMode(row: FlowRegistryRow): FlowSyncMode { + if (row.sync_mode === "task_mirrored" || row.sync_mode === "managed") { + return row.sync_mode; + } + return row.shape === "single_task" ? "task_mirrored" : "managed"; +} + +function rowToFlowRecord(row: FlowRegistryRow): FlowRecord { + const endedAt = normalizeNumber(row.ended_at); + const cancelRequestedAt = normalizeNumber(row.cancel_requested_at); + const requesterOrigin = parseJsonValue(row.requester_origin_json); + const stateJson = parseJsonValue(row.state_json); + const waitJson = parseJsonValue(row.wait_json); + return { + flowId: row.flow_id, + syncMode: rowToSyncMode(row), + ownerKey: row.owner_key, + ...(requesterOrigin ? { requesterOrigin } : {}), + ...(row.controller_id ? { controllerId: row.controller_id } : {}), + revision: normalizeNumber(row.revision) ?? 0, + status: row.status, + notifyPolicy: row.notify_policy, + goal: row.goal, + ...(row.current_step ? { currentStep: row.current_step } : {}), + ...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}), + ...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}), + ...(stateJson !== undefined ? { stateJson } : {}), + ...(waitJson !== undefined ? { waitJson } : {}), + ...(cancelRequestedAt != null ? { cancelRequestedAt } : {}), + createdAt: normalizeNumber(row.created_at) ?? 0, + updatedAt: normalizeNumber(row.updated_at) ?? 0, + ...(endedAt != null ? { endedAt } : {}), + }; +} + +function bindFlowRecord(record: FlowRecord) { + return { + flow_id: record.flowId, + sync_mode: record.syncMode, + owner_key: record.ownerKey, + requester_origin_json: serializeJson(record.requesterOrigin), + controller_id: record.controllerId ?? null, + revision: record.revision, + status: record.status, + notify_policy: record.notifyPolicy, + goal: record.goal, + current_step: record.currentStep ?? null, + blocked_task_id: record.blockedTaskId ?? null, + blocked_summary: record.blockedSummary ?? null, + state_json: serializeJson(record.stateJson), + wait_json: serializeJson(record.waitJson), + cancel_requested_at: record.cancelRequestedAt ?? null, + created_at: record.createdAt, + updated_at: record.updatedAt, + ended_at: record.endedAt ?? null, + }; +} + +function createStatements(db: DatabaseSync): FlowRegistryStatements { + return { + selectAll: db.prepare(` + SELECT + flow_id, + sync_mode, + shape, + owner_key, + requester_origin_json, + controller_id, + revision, + status, + notify_policy, + goal, + current_step, + blocked_task_id, + blocked_summary, + state_json, + wait_json, + cancel_requested_at, + created_at, + updated_at, + ended_at + FROM flow_runs + ORDER BY created_at ASC, flow_id ASC + `), + upsertRow: db.prepare(` + INSERT INTO flow_runs ( + flow_id, + sync_mode, + owner_key, + requester_origin_json, + controller_id, + revision, + status, + notify_policy, + goal, + current_step, + blocked_task_id, + blocked_summary, + state_json, + wait_json, + cancel_requested_at, + created_at, + updated_at, + ended_at + ) VALUES ( + @flow_id, + @sync_mode, + @owner_key, + @requester_origin_json, + @controller_id, + @revision, + @status, + @notify_policy, + @goal, + @current_step, + @blocked_task_id, + @blocked_summary, + @state_json, + @wait_json, + @cancel_requested_at, + @created_at, + @updated_at, + @ended_at + ) + ON CONFLICT(flow_id) DO UPDATE SET + sync_mode = excluded.sync_mode, + owner_key = excluded.owner_key, + requester_origin_json = excluded.requester_origin_json, + controller_id = excluded.controller_id, + revision = excluded.revision, + status = excluded.status, + notify_policy = excluded.notify_policy, + goal = excluded.goal, + current_step = excluded.current_step, + blocked_task_id = excluded.blocked_task_id, + blocked_summary = excluded.blocked_summary, + state_json = excluded.state_json, + wait_json = excluded.wait_json, + cancel_requested_at = excluded.cancel_requested_at, + created_at = excluded.created_at, + updated_at = excluded.updated_at, + ended_at = excluded.ended_at + `), + deleteRow: db.prepare(`DELETE FROM flow_runs WHERE flow_id = ?`), + clearRows: db.prepare(`DELETE FROM flow_runs`), + }; +} + +function hasFlowRunsColumn(db: DatabaseSync, columnName: string): boolean { + const rows = db.prepare(`PRAGMA table_info(flow_runs)`).all() as Array<{ name?: string }>; + return rows.some((row) => row.name === columnName); +} + +function ensureSchema(db: DatabaseSync) { + db.exec(` + CREATE TABLE IF NOT EXISTS flow_runs ( + flow_id TEXT PRIMARY KEY, + shape TEXT, + sync_mode TEXT NOT NULL DEFAULT 'managed', + owner_key TEXT NOT NULL, + requester_origin_json TEXT, + controller_id TEXT, + revision INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + goal TEXT NOT NULL, + current_step TEXT, + blocked_task_id TEXT, + blocked_summary TEXT, + state_json TEXT, + wait_json TEXT, + cancel_requested_at INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + ended_at INTEGER + ); + `); + if (!hasFlowRunsColumn(db, "owner_key") && hasFlowRunsColumn(db, "owner_session_key")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN owner_key TEXT;`); + db.exec(` + UPDATE flow_runs + SET owner_key = owner_session_key + WHERE owner_key IS NULL + `); + } + if (!hasFlowRunsColumn(db, "shape")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN shape TEXT;`); + } + if (!hasFlowRunsColumn(db, "sync_mode")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN sync_mode TEXT;`); + if (hasFlowRunsColumn(db, "shape")) { + db.exec(` + UPDATE flow_runs + SET sync_mode = CASE + WHEN shape = 'single_task' THEN 'task_mirrored' + ELSE 'managed' + END + WHERE sync_mode IS NULL + `); + } else { + db.exec(` + UPDATE flow_runs + SET sync_mode = 'managed' + WHERE sync_mode IS NULL + `); + } + } + if (!hasFlowRunsColumn(db, "controller_id")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN controller_id TEXT;`); + } + db.exec(` + UPDATE flow_runs + SET controller_id = 'core/legacy-restored' + WHERE sync_mode = 'managed' + AND (controller_id IS NULL OR trim(controller_id) = '') + `); + if (!hasFlowRunsColumn(db, "revision")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN revision INTEGER;`); + db.exec(` + UPDATE flow_runs + SET revision = 0 + WHERE revision IS NULL + `); + } + if (!hasFlowRunsColumn(db, "blocked_task_id")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_task_id TEXT;`); + } + if (!hasFlowRunsColumn(db, "blocked_summary")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_summary TEXT;`); + } + if (!hasFlowRunsColumn(db, "state_json")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN state_json TEXT;`); + } + if (!hasFlowRunsColumn(db, "wait_json")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN wait_json TEXT;`); + } + if (!hasFlowRunsColumn(db, "cancel_requested_at")) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN cancel_requested_at INTEGER;`); + } + db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_key ON flow_runs(owner_key);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`); +} + +function ensureFlowRegistryPermissions(pathname: string) { + const dir = resolveFlowRegistryDir(process.env); + mkdirSync(dir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE }); + chmodSync(dir, FLOW_REGISTRY_DIR_MODE); + for (const suffix of FLOW_REGISTRY_SIDECAR_SUFFIXES) { + const candidate = `${pathname}${suffix}`; + if (!existsSync(candidate)) { + continue; + } + chmodSync(candidate, FLOW_REGISTRY_FILE_MODE); + } +} + +function openFlowRegistryDatabase(): FlowRegistryDatabase { + const pathname = resolveFlowRegistrySqlitePath(process.env); + if (cachedDatabase && cachedDatabase.path === pathname) { + return cachedDatabase; + } + if (cachedDatabase) { + cachedDatabase.db.close(); + cachedDatabase = null; + } + ensureFlowRegistryPermissions(pathname); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(pathname); + db.exec(`PRAGMA journal_mode = WAL;`); + db.exec(`PRAGMA synchronous = NORMAL;`); + db.exec(`PRAGMA busy_timeout = 5000;`); + ensureSchema(db); + ensureFlowRegistryPermissions(pathname); + cachedDatabase = { + db, + path: pathname, + statements: createStatements(db), + }; + return cachedDatabase; +} + +function withWriteTransaction(write: (statements: FlowRegistryStatements) => void) { + const { db, path, statements } = openFlowRegistryDatabase(); + db.exec("BEGIN IMMEDIATE"); + try { + write(statements); + db.exec("COMMIT"); + ensureFlowRegistryPermissions(path); + } catch (error) { + db.exec("ROLLBACK"); + throw error; + } +} + +export function loadFlowRegistryStateFromSqlite(): FlowRegistryStoreSnapshot { + const { statements } = openFlowRegistryDatabase(); + const rows = statements.selectAll.all() as FlowRegistryRow[]; + return { + flows: new Map(rows.map((row) => [row.flow_id, rowToFlowRecord(row)])), + }; +} + +export function saveFlowRegistryStateToSqlite(snapshot: FlowRegistryStoreSnapshot) { + withWriteTransaction((statements) => { + statements.clearRows.run(); + for (const flow of snapshot.flows.values()) { + statements.upsertRow.run(bindFlowRecord(flow)); + } + }); +} + +export function upsertFlowRegistryRecordToSqlite(flow: FlowRecord) { + const store = openFlowRegistryDatabase(); + store.statements.upsertRow.run(bindFlowRecord(flow)); + ensureFlowRegistryPermissions(store.path); +} + +export function deleteFlowRegistryRecordFromSqlite(flowId: string) { + const store = openFlowRegistryDatabase(); + store.statements.deleteRow.run(flowId); + ensureFlowRegistryPermissions(store.path); +} + +export function closeFlowRegistrySqliteStore() { + if (!cachedDatabase) { + return; + } + cachedDatabase.db.close(); + cachedDatabase = null; +} diff --git a/src/tasks/flow-registry.store.test.ts b/src/tasks/flow-registry.store.test.ts new file mode 100644 index 00000000000..57960ea52d0 --- /dev/null +++ b/src/tasks/flow-registry.store.test.ts @@ -0,0 +1,195 @@ +import { statSync } from "node:fs"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { + createManagedFlow, + getFlowById, + requestFlowCancel, + resetFlowRegistryForTests, + setFlowWaiting, +} from "./flow-registry.js"; +import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js"; +import { configureFlowRegistryRuntime } from "./flow-registry.store.js"; +import type { FlowRecord } from "./flow-registry.types.js"; + +function createStoredFlow(): FlowRecord { + return { + flowId: "flow-restored", + syncMode: "managed", + ownerKey: "agent:main:main", + controllerId: "tests/restored-controller", + revision: 4, + status: "blocked", + notifyPolicy: "done_only", + goal: "Restored flow", + currentStep: "spawn_task", + blockedTaskId: "task-restored", + blockedSummary: "Writable session required.", + stateJson: { lane: "triage", done: 3 }, + waitJson: { kind: "task", taskId: "task-restored" }, + cancelRequestedAt: 115, + createdAt: 100, + updatedAt: 120, + endedAt: 120, + }; +} + +async function withFlowRegistryTempDir(run: (root: string) => Promise): Promise { + return await withTempDir({ prefix: "openclaw-flow-store-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + try { + return await run(root); + } finally { + resetFlowRegistryForTests(); + } + }); +} + +describe("flow-registry store runtime", () => { + beforeEach(() => { + vi.useRealTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + delete process.env.OPENCLAW_STATE_DIR; + resetFlowRegistryForTests(); + }); + + it("uses the configured flow store for restore and save", () => { + const storedFlow = createStoredFlow(); + const loadSnapshot = vi.fn(() => ({ + flows: new Map([[storedFlow.flowId, storedFlow]]), + })); + const saveSnapshot = vi.fn(); + configureFlowRegistryRuntime({ + store: { + loadSnapshot, + saveSnapshot, + }, + }); + + expect(getFlowById("flow-restored")).toMatchObject({ + flowId: "flow-restored", + syncMode: "managed", + controllerId: "tests/restored-controller", + revision: 4, + stateJson: { lane: "triage", done: 3 }, + waitJson: { kind: "task", taskId: "task-restored" }, + cancelRequestedAt: 115, + }); + expect(loadSnapshot).toHaveBeenCalledTimes(1); + + createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/new-flow", + goal: "New flow", + status: "running", + currentStep: "wait_for", + }); + + expect(saveSnapshot).toHaveBeenCalled(); + const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as { + flows: ReadonlyMap; + }; + expect(latestSnapshot.flows.size).toBe(2); + expect(latestSnapshot.flows.get("flow-restored")?.goal).toBe("Restored flow"); + }); + + it("restores persisted wait-state, revision, and cancel intent from sqlite", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/persisted-flow", + goal: "Persisted flow", + status: "running", + currentStep: "spawn_task", + stateJson: { phase: "spawn" }, + }); + const waiting = setFlowWaiting({ + flowId: created.flowId, + expectedRevision: created.revision, + currentStep: "ask_user", + stateJson: { phase: "ask_user" }, + waitJson: { kind: "external_event", topic: "telegram" }, + }); + expect(waiting).toMatchObject({ + applied: true, + }); + const cancelRequested = requestFlowCancel({ + flowId: created.flowId, + expectedRevision: waiting.applied ? waiting.flow.revision : -1, + cancelRequestedAt: 444, + }); + expect(cancelRequested).toMatchObject({ + applied: true, + }); + + resetFlowRegistryForTests({ persist: false }); + + expect(getFlowById(created.flowId)).toMatchObject({ + flowId: created.flowId, + syncMode: "managed", + controllerId: "tests/persisted-flow", + revision: 2, + status: "waiting", + currentStep: "ask_user", + stateJson: { phase: "ask_user" }, + waitJson: { kind: "external_event", topic: "telegram" }, + cancelRequestedAt: 444, + }); + }); + }); + + it("round-trips explicit json null through sqlite", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/null-roundtrip", + goal: "Persist null payloads", + stateJson: null, + waitJson: null, + }); + + resetFlowRegistryForTests({ persist: false }); + + expect(getFlowById(created.flowId)).toMatchObject({ + flowId: created.flowId, + stateJson: null, + waitJson: null, + }); + }); + }); + + it("hardens the sqlite flow store directory and file modes", async () => { + if (process.platform === "win32") { + return; + } + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/secured-flow", + goal: "Secured flow", + status: "blocked", + blockedTaskId: "task-secured", + blockedSummary: "Need auth.", + waitJson: { kind: "task", taskId: "task-secured" }, + }); + + const registryDir = resolveFlowRegistryDir(process.env); + const sqlitePath = resolveFlowRegistrySqlitePath(process.env); + expect(statSync(registryDir).mode & 0o777).toBe(0o700); + expect(statSync(sqlitePath).mode & 0o777).toBe(0o600); + }); + }); +}); diff --git a/src/tasks/flow-registry.store.ts b/src/tasks/flow-registry.store.ts new file mode 100644 index 00000000000..8cde01da367 --- /dev/null +++ b/src/tasks/flow-registry.store.ts @@ -0,0 +1,78 @@ +import { + closeFlowRegistrySqliteStore, + deleteFlowRegistryRecordFromSqlite, + loadFlowRegistryStateFromSqlite, + saveFlowRegistryStateToSqlite, + upsertFlowRegistryRecordToSqlite, +} from "./flow-registry.store.sqlite.js"; +import type { FlowRecord } from "./flow-registry.types.js"; + +export type FlowRegistryStoreSnapshot = { + flows: Map; +}; + +export type FlowRegistryStore = { + loadSnapshot: () => FlowRegistryStoreSnapshot; + saveSnapshot: (snapshot: FlowRegistryStoreSnapshot) => void; + upsertFlow?: (flow: FlowRecord) => void; + deleteFlow?: (flowId: string) => void; + close?: () => void; +}; + +export type FlowRegistryHookEvent = + | { + kind: "restored"; + flows: FlowRecord[]; + } + | { + kind: "upserted"; + flow: FlowRecord; + previous?: FlowRecord; + } + | { + kind: "deleted"; + flowId: string; + previous: FlowRecord; + }; + +export type FlowRegistryHooks = { + // Hooks are incremental/observational. Snapshot persistence belongs to FlowRegistryStore. + onEvent?: (event: FlowRegistryHookEvent) => void; +}; + +const defaultFlowRegistryStore: FlowRegistryStore = { + loadSnapshot: loadFlowRegistryStateFromSqlite, + saveSnapshot: saveFlowRegistryStateToSqlite, + upsertFlow: upsertFlowRegistryRecordToSqlite, + deleteFlow: deleteFlowRegistryRecordFromSqlite, + close: closeFlowRegistrySqliteStore, +}; + +let configuredFlowRegistryStore: FlowRegistryStore = defaultFlowRegistryStore; +let configuredFlowRegistryHooks: FlowRegistryHooks | null = null; + +export function getFlowRegistryStore(): FlowRegistryStore { + return configuredFlowRegistryStore; +} + +export function getFlowRegistryHooks(): FlowRegistryHooks | null { + return configuredFlowRegistryHooks; +} + +export function configureFlowRegistryRuntime(params: { + store?: FlowRegistryStore; + hooks?: FlowRegistryHooks | null; +}) { + if (params.store) { + configuredFlowRegistryStore = params.store; + } + if ("hooks" in params) { + configuredFlowRegistryHooks = params.hooks ?? null; + } +} + +export function resetFlowRegistryRuntimeForTests() { + configuredFlowRegistryStore.close?.(); + configuredFlowRegistryStore = defaultFlowRegistryStore; + configuredFlowRegistryHooks = null; +} diff --git a/src/tasks/flow-registry.test.ts b/src/tasks/flow-registry.test.ts new file mode 100644 index 00000000000..7e214c58d8d --- /dev/null +++ b/src/tasks/flow-registry.test.ts @@ -0,0 +1,371 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { + createFlowRecord, + createFlowForTask, + createManagedFlow, + deleteFlowRecordById, + failFlow, + getFlowById, + listFlowRecords, + requestFlowCancel, + resetFlowRegistryForTests, + resumeFlow, + setFlowWaiting, + syncFlowFromTask, + updateFlowRecordByIdExpectedRevision, +} from "./flow-registry.js"; +import { configureFlowRegistryRuntime } from "./flow-registry.store.js"; + +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; + +async function withFlowRegistryTempDir(run: (root: string) => Promise): Promise { + return await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + try { + return await run(root); + } finally { + resetFlowRegistryForTests(); + } + }); +} + +describe("flow-registry", () => { + beforeEach(() => { + vi.useRealTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetFlowRegistryForTests(); + }); + + it("creates managed flows and updates them through revision-checked helpers", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed-controller", + goal: "Investigate flaky test", + currentStep: "spawn_task", + stateJson: { phase: "spawn" }, + }); + + expect(created).toMatchObject({ + flowId: created.flowId, + syncMode: "managed", + controllerId: "tests/managed-controller", + revision: 0, + status: "queued", + currentStep: "spawn_task", + stateJson: { phase: "spawn" }, + }); + + const waiting = setFlowWaiting({ + flowId: created.flowId, + expectedRevision: created.revision, + currentStep: "await_review", + stateJson: { phase: "await_review" }, + waitJson: { kind: "task", taskId: "task-123" }, + }); + expect(waiting).toMatchObject({ + applied: true, + flow: expect.objectContaining({ + flowId: created.flowId, + revision: 1, + status: "waiting", + currentStep: "await_review", + waitJson: { kind: "task", taskId: "task-123" }, + }), + }); + + const conflict = updateFlowRecordByIdExpectedRevision({ + flowId: created.flowId, + expectedRevision: 0, + patch: { + currentStep: "stale", + }, + }); + expect(conflict).toMatchObject({ + applied: false, + reason: "revision_conflict", + current: expect.objectContaining({ + flowId: created.flowId, + revision: 1, + }), + }); + + const resumed = resumeFlow({ + flowId: created.flowId, + expectedRevision: 1, + status: "running", + currentStep: "resume_work", + }); + expect(resumed).toMatchObject({ + applied: true, + flow: expect.objectContaining({ + flowId: created.flowId, + revision: 2, + status: "running", + currentStep: "resume_work", + waitJson: null, + }), + }); + + const cancelRequested = requestFlowCancel({ + flowId: created.flowId, + expectedRevision: 2, + cancelRequestedAt: 400, + }); + expect(cancelRequested).toMatchObject({ + applied: true, + flow: expect.objectContaining({ + flowId: created.flowId, + revision: 3, + cancelRequestedAt: 400, + }), + }); + + const failed = failFlow({ + flowId: created.flowId, + expectedRevision: 3, + blockedSummary: "Task runner failed.", + endedAt: 500, + }); + expect(failed).toMatchObject({ + applied: true, + flow: expect.objectContaining({ + flowId: created.flowId, + revision: 4, + status: "failed", + blockedSummary: "Task runner failed.", + endedAt: 500, + }), + }); + + expect(listFlowRecords()).toEqual([ + expect.objectContaining({ + flowId: created.flowId, + revision: 4, + cancelRequestedAt: 400, + }), + ]); + + expect(deleteFlowRecordById(created.flowId)).toBe(true); + expect(getFlowById(created.flowId)).toBeUndefined(); + }); + }); + + it("requires a controller for managed flows and rejects clearing it later", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + expect(() => + createFlowRecord({ + ownerKey: "agent:main:main", + goal: "Missing controller", + }), + ).toThrow("Managed flow controllerId is required."); + + const created = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed-controller", + goal: "Protected controller", + }); + + expect(() => + updateFlowRecordByIdExpectedRevision({ + flowId: created.flowId, + expectedRevision: created.revision, + patch: { + controllerId: null, + }, + }), + ).toThrow("Managed flow controllerId is required."); + }); + }); + + it("emits restored, upserted, and deleted flow hook events", () => { + const onEvent = vi.fn(); + configureFlowRegistryRuntime({ + store: { + loadSnapshot: () => ({ + flows: new Map(), + }), + saveSnapshot: () => {}, + }, + hooks: { + onEvent, + }, + }); + + const created = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/hooks", + goal: "Observe hooks", + }); + + deleteFlowRecordById(created.flowId); + + expect(onEvent).toHaveBeenCalledWith({ + kind: "restored", + flows: [], + }); + expect(onEvent).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "upserted", + flow: expect.objectContaining({ + flowId: created.flowId, + }), + }), + ); + expect(onEvent).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "deleted", + flowId: created.flowId, + }), + ); + }); + + it("normalizes restored managed flows without a controller id", () => { + configureFlowRegistryRuntime({ + store: { + loadSnapshot: () => ({ + flows: new Map([ + [ + "legacy-managed", + { + flowId: "legacy-managed", + syncMode: "managed", + ownerKey: "agent:main:main", + revision: 0, + status: "queued", + notifyPolicy: "done_only", + goal: "Legacy managed flow", + createdAt: 10, + updatedAt: 10, + }, + ], + ]), + }), + saveSnapshot: () => {}, + }, + }); + + expect(getFlowById("legacy-managed")).toMatchObject({ + flowId: "legacy-managed", + syncMode: "managed", + controllerId: "core/legacy-restored", + }); + }); + + it("mirrors one-task flow state from tasks and leaves managed flows alone", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const mirrored = createFlowForTask({ + task: { + ownerKey: "agent:main:main", + taskId: "task-running", + notifyPolicy: "done_only", + status: "running", + label: "Fix permissions", + task: "Fix permissions", + createdAt: 100, + lastEventAt: 100, + }, + }); + + const blocked = syncFlowFromTask({ + taskId: "task-blocked", + parentFlowId: mirrored.flowId, + status: "succeeded", + terminalOutcome: "blocked", + notifyPolicy: "done_only", + label: "Fix permissions", + task: "Fix permissions", + lastEventAt: 200, + endedAt: 200, + terminalSummary: "Writable session required.", + }); + expect(blocked).toMatchObject({ + flowId: mirrored.flowId, + syncMode: "task_mirrored", + status: "blocked", + blockedTaskId: "task-blocked", + blockedSummary: "Writable session required.", + }); + + const managed = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed", + goal: "Cluster PRs", + currentStep: "wait_for", + status: "waiting", + waitJson: { kind: "external_event" }, + }); + const syncedManaged = syncFlowFromTask({ + taskId: "task-child", + parentFlowId: managed.flowId, + status: "running", + notifyPolicy: "done_only", + label: "Child task", + task: "Child task", + lastEventAt: 250, + progressSummary: "Running child task", + }); + expect(syncedManaged).toMatchObject({ + flowId: managed.flowId, + syncMode: "managed", + status: "waiting", + currentStep: "wait_for", + waitJson: { kind: "external_event" }, + }); + }); + }); + + it("preserves explicit json null in state and wait payloads", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/null-state", + goal: "Null payloads", + stateJson: null, + waitJson: null, + }); + + expect(created).toMatchObject({ + flowId: created.flowId, + stateJson: null, + waitJson: null, + }); + + const resumed = resumeFlow({ + flowId: created.flowId, + expectedRevision: created.revision, + stateJson: null, + }); + + expect(resumed).toMatchObject({ + applied: true, + flow: expect.objectContaining({ + flowId: created.flowId, + stateJson: null, + }), + }); + }); + }); +}); diff --git a/src/tasks/flow-registry.ts b/src/tasks/flow-registry.ts new file mode 100644 index 00000000000..baa51bf74d2 --- /dev/null +++ b/src/tasks/flow-registry.ts @@ -0,0 +1,690 @@ +import crypto from "node:crypto"; +import { + getFlowRegistryHooks, + getFlowRegistryStore, + resetFlowRegistryRuntimeForTests, + type FlowRegistryHookEvent, +} from "./flow-registry.store.js"; +import type { FlowRecord, FlowStatus, FlowSyncMode, JsonValue } from "./flow-registry.types.js"; +import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js"; + +const flows = new Map(); +let restoreAttempted = false; + +type FlowRecordPatch = Omit< + Partial< + Pick< + FlowRecord, + | "status" + | "notifyPolicy" + | "goal" + | "currentStep" + | "blockedTaskId" + | "blockedSummary" + | "controllerId" + | "stateJson" + | "waitJson" + | "cancelRequestedAt" + | "updatedAt" + | "endedAt" + > + >, + | "currentStep" + | "blockedTaskId" + | "blockedSummary" + | "controllerId" + | "stateJson" + | "waitJson" + | "cancelRequestedAt" + | "endedAt" +> & { + currentStep?: string | null; + blockedTaskId?: string | null; + blockedSummary?: string | null; + controllerId?: string | null; + stateJson?: JsonValue | null; + waitJson?: JsonValue | null; + cancelRequestedAt?: number | null; + endedAt?: number | null; +}; + +export type FlowUpdateResult = + | { + applied: true; + flow: FlowRecord; + } + | { + applied: false; + reason: "not_found" | "revision_conflict"; + current?: FlowRecord; + }; + +function cloneStructuredValue(value: T | undefined): T | undefined { + if (value === undefined) { + return undefined; + } + return structuredClone(value); +} + +function cloneFlowRecord(record: FlowRecord): FlowRecord { + return { + ...record, + ...(record.requesterOrigin + ? { requesterOrigin: cloneStructuredValue(record.requesterOrigin)! } + : {}), + ...(record.stateJson !== undefined + ? { stateJson: cloneStructuredValue(record.stateJson)! } + : {}), + ...(record.waitJson !== undefined ? { waitJson: cloneStructuredValue(record.waitJson)! } : {}), + }; +} + +function normalizeRestoredFlowRecord(record: FlowRecord): FlowRecord { + const syncMode = record.syncMode === "task_mirrored" ? "task_mirrored" : "managed"; + const controllerId = + syncMode === "managed" + ? (normalizeText(record.controllerId) ?? "core/legacy-restored") + : undefined; + return { + ...record, + syncMode, + ownerKey: assertFlowOwnerKey(record.ownerKey), + ...(record.requesterOrigin + ? { requesterOrigin: cloneStructuredValue(record.requesterOrigin)! } + : {}), + ...(controllerId ? { controllerId } : {}), + currentStep: normalizeText(record.currentStep), + blockedTaskId: normalizeText(record.blockedTaskId), + blockedSummary: normalizeText(record.blockedSummary), + ...(record.stateJson !== undefined + ? { stateJson: cloneStructuredValue(record.stateJson)! } + : {}), + ...(record.waitJson !== undefined ? { waitJson: cloneStructuredValue(record.waitJson)! } : {}), + revision: Math.max(0, record.revision), + cancelRequestedAt: record.cancelRequestedAt ?? undefined, + endedAt: record.endedAt ?? undefined, + }; +} + +function snapshotFlowRecords(source: ReadonlyMap): FlowRecord[] { + return [...source.values()].map((record) => cloneFlowRecord(record)); +} + +function emitFlowRegistryHookEvent(createEvent: () => FlowRegistryHookEvent): void { + const hooks = getFlowRegistryHooks(); + if (!hooks?.onEvent) { + return; + } + try { + hooks.onEvent(createEvent()); + } catch { + // Flow hooks are observational. They must not break registry writes. + } +} + +function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy { + return notifyPolicy ?? "done_only"; +} + +function normalizeOwnerKey(ownerKey?: string): string | undefined { + const trimmed = ownerKey?.trim(); + return trimmed ? trimmed : undefined; +} + +function normalizeText(value?: string | null): string | undefined { + const trimmed = value?.trim(); + return trimmed ? trimmed : undefined; +} + +function normalizeJsonBlob(value: JsonValue | null | undefined): JsonValue | undefined { + return value === undefined ? undefined : cloneStructuredValue(value); +} + +function assertFlowOwnerKey(ownerKey: string): string { + const normalized = normalizeOwnerKey(ownerKey); + if (!normalized) { + throw new Error("Flow ownerKey is required."); + } + return normalized; +} + +function assertControllerId(controllerId?: string | null): string { + const normalized = normalizeText(controllerId); + if (!normalized) { + throw new Error("Managed flow controllerId is required."); + } + return normalized; +} + +function resolveFlowGoal(task: Pick): string { + return task.label?.trim() || task.task.trim() || "Background task"; +} + +function resolveFlowBlockedSummary( + task: Pick, +): string | undefined { + if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") { + return undefined; + } + return task.terminalSummary?.trim() || task.progressSummary?.trim() || undefined; +} + +export function deriveFlowStatusFromTask( + task: Pick, +): FlowStatus { + if (task.status === "queued") { + return "queued"; + } + if (task.status === "running") { + return "running"; + } + if (task.status === "succeeded") { + return task.terminalOutcome === "blocked" ? "blocked" : "succeeded"; + } + if (task.status === "cancelled") { + return "cancelled"; + } + if (task.status === "lost") { + return "lost"; + } + return "failed"; +} + +function ensureFlowRegistryReady() { + if (restoreAttempted) { + return; + } + restoreAttempted = true; + const restored = getFlowRegistryStore().loadSnapshot(); + flows.clear(); + for (const [flowId, flow] of restored.flows) { + flows.set(flowId, normalizeRestoredFlowRecord(flow)); + } + emitFlowRegistryHookEvent(() => ({ + kind: "restored", + flows: snapshotFlowRecords(flows), + })); +} + +function persistFlowRegistry() { + getFlowRegistryStore().saveSnapshot({ + flows: new Map(snapshotFlowRecords(flows).map((flow) => [flow.flowId, flow])), + }); +} + +function persistFlowUpsert(flow: FlowRecord) { + const store = getFlowRegistryStore(); + if (store.upsertFlow) { + store.upsertFlow(cloneFlowRecord(flow)); + return; + } + persistFlowRegistry(); +} + +function persistFlowDelete(flowId: string) { + const store = getFlowRegistryStore(); + if (store.deleteFlow) { + store.deleteFlow(flowId); + return; + } + persistFlowRegistry(); +} + +function buildFlowRecord(params: { + syncMode?: FlowSyncMode; + ownerKey: string; + requesterOrigin?: FlowRecord["requesterOrigin"]; + controllerId?: string | null; + revision?: number; + status?: FlowStatus; + notifyPolicy?: TaskNotifyPolicy; + goal: string; + currentStep?: string | null; + blockedTaskId?: string | null; + blockedSummary?: string | null; + stateJson?: JsonValue | null; + waitJson?: JsonValue | null; + cancelRequestedAt?: number | null; + createdAt?: number; + updatedAt?: number; + endedAt?: number | null; +}): FlowRecord { + const now = params.createdAt ?? Date.now(); + const syncMode = params.syncMode ?? "managed"; + const controllerId = syncMode === "managed" ? assertControllerId(params.controllerId) : undefined; + return { + flowId: crypto.randomUUID(), + syncMode, + ownerKey: assertFlowOwnerKey(params.ownerKey), + ...(params.requesterOrigin + ? { requesterOrigin: cloneStructuredValue(params.requesterOrigin)! } + : {}), + ...(controllerId ? { controllerId } : {}), + revision: Math.max(0, params.revision ?? 0), + status: params.status ?? "queued", + notifyPolicy: ensureNotifyPolicy(params.notifyPolicy), + goal: params.goal, + currentStep: normalizeText(params.currentStep), + blockedTaskId: normalizeText(params.blockedTaskId), + blockedSummary: normalizeText(params.blockedSummary), + ...(normalizeJsonBlob(params.stateJson) !== undefined + ? { stateJson: normalizeJsonBlob(params.stateJson)! } + : {}), + ...(normalizeJsonBlob(params.waitJson) !== undefined + ? { waitJson: normalizeJsonBlob(params.waitJson)! } + : {}), + ...(params.cancelRequestedAt != null ? { cancelRequestedAt: params.cancelRequestedAt } : {}), + createdAt: now, + updatedAt: params.updatedAt ?? now, + ...(params.endedAt != null ? { endedAt: params.endedAt } : {}), + }; +} + +function applyFlowPatch(current: FlowRecord, patch: FlowRecordPatch): FlowRecord { + const controllerId = + patch.controllerId === undefined ? current.controllerId : normalizeText(patch.controllerId); + if (current.syncMode === "managed") { + assertControllerId(controllerId); + } + return { + ...current, + ...(patch.status ? { status: patch.status } : {}), + ...(patch.notifyPolicy ? { notifyPolicy: patch.notifyPolicy } : {}), + ...(patch.goal ? { goal: patch.goal } : {}), + controllerId, + currentStep: + patch.currentStep === undefined ? current.currentStep : normalizeText(patch.currentStep), + blockedTaskId: + patch.blockedTaskId === undefined + ? current.blockedTaskId + : normalizeText(patch.blockedTaskId), + blockedSummary: + patch.blockedSummary === undefined + ? current.blockedSummary + : normalizeText(patch.blockedSummary), + stateJson: + patch.stateJson === undefined ? current.stateJson : normalizeJsonBlob(patch.stateJson), + waitJson: patch.waitJson === undefined ? current.waitJson : normalizeJsonBlob(patch.waitJson), + cancelRequestedAt: + patch.cancelRequestedAt === undefined + ? current.cancelRequestedAt + : (patch.cancelRequestedAt ?? undefined), + revision: current.revision + 1, + updatedAt: patch.updatedAt ?? Date.now(), + endedAt: patch.endedAt === undefined ? current.endedAt : (patch.endedAt ?? undefined), + }; +} + +function writeFlowRecord(next: FlowRecord, previous?: FlowRecord): FlowRecord { + flows.set(next.flowId, next); + persistFlowUpsert(next); + emitFlowRegistryHookEvent(() => ({ + kind: "upserted", + flow: cloneFlowRecord(next), + ...(previous ? { previous: cloneFlowRecord(previous) } : {}), + })); + return cloneFlowRecord(next); +} + +export function createFlowRecord(params: { + syncMode?: FlowSyncMode; + ownerKey: string; + requesterOrigin?: FlowRecord["requesterOrigin"]; + controllerId?: string | null; + revision?: number; + status?: FlowStatus; + notifyPolicy?: TaskNotifyPolicy; + goal: string; + currentStep?: string | null; + blockedTaskId?: string | null; + blockedSummary?: string | null; + stateJson?: JsonValue | null; + waitJson?: JsonValue | null; + cancelRequestedAt?: number | null; + createdAt?: number; + updatedAt?: number; + endedAt?: number | null; +}): FlowRecord { + ensureFlowRegistryReady(); + const record = buildFlowRecord(params); + return writeFlowRecord(record); +} + +export function createManagedFlow(params: { + ownerKey: string; + controllerId: string; + requesterOrigin?: FlowRecord["requesterOrigin"]; + status?: FlowStatus; + notifyPolicy?: TaskNotifyPolicy; + goal: string; + currentStep?: string | null; + blockedTaskId?: string | null; + blockedSummary?: string | null; + stateJson?: JsonValue | null; + waitJson?: JsonValue | null; + cancelRequestedAt?: number | null; + createdAt?: number; + updatedAt?: number; + endedAt?: number | null; +}): FlowRecord { + return createFlowRecord({ + ...params, + syncMode: "managed", + controllerId: assertControllerId(params.controllerId), + }); +} + +export function createFlowForTask(params: { + task: Pick< + TaskRecord, + | "ownerKey" + | "taskId" + | "notifyPolicy" + | "status" + | "terminalOutcome" + | "label" + | "task" + | "createdAt" + | "lastEventAt" + | "endedAt" + | "terminalSummary" + | "progressSummary" + >; + requesterOrigin?: FlowRecord["requesterOrigin"]; +}): FlowRecord { + const terminalFlowStatus = deriveFlowStatusFromTask(params.task); + const isTerminal = + terminalFlowStatus === "succeeded" || + terminalFlowStatus === "blocked" || + terminalFlowStatus === "failed" || + terminalFlowStatus === "cancelled" || + terminalFlowStatus === "lost"; + const endedAt = isTerminal + ? (params.task.endedAt ?? params.task.lastEventAt ?? params.task.createdAt) + : undefined; + return createFlowRecord({ + syncMode: "task_mirrored", + ownerKey: params.task.ownerKey, + requesterOrigin: params.requesterOrigin, + status: terminalFlowStatus, + notifyPolicy: params.task.notifyPolicy, + goal: resolveFlowGoal(params.task), + blockedTaskId: + terminalFlowStatus === "blocked" ? params.task.taskId.trim() || undefined : undefined, + blockedSummary: resolveFlowBlockedSummary(params.task), + createdAt: params.task.createdAt, + updatedAt: params.task.lastEventAt ?? params.task.createdAt, + ...(endedAt !== undefined ? { endedAt } : {}), + }); +} + +function updateFlowRecordByIdUnchecked(flowId: string, patch: FlowRecordPatch): FlowRecord | null { + ensureFlowRegistryReady(); + const current = flows.get(flowId); + if (!current) { + return null; + } + return writeFlowRecord(applyFlowPatch(current, patch), current); +} + +export function updateFlowRecordByIdExpectedRevision(params: { + flowId: string; + expectedRevision: number; + patch: FlowRecordPatch; +}): FlowUpdateResult { + ensureFlowRegistryReady(); + const current = flows.get(params.flowId); + if (!current) { + return { + applied: false, + reason: "not_found", + }; + } + if (current.revision !== params.expectedRevision) { + return { + applied: false, + reason: "revision_conflict", + current: cloneFlowRecord(current), + }; + } + return { + applied: true, + flow: writeFlowRecord(applyFlowPatch(current, params.patch), current), + }; +} + +export function setFlowWaiting(params: { + flowId: string; + expectedRevision: number; + currentStep?: string | null; + stateJson?: JsonValue | null; + waitJson?: JsonValue | null; + blockedTaskId?: string | null; + blockedSummary?: string | null; + updatedAt?: number; +}): FlowUpdateResult { + return updateFlowRecordByIdExpectedRevision({ + flowId: params.flowId, + expectedRevision: params.expectedRevision, + patch: { + status: + normalizeText(params.blockedTaskId) || normalizeText(params.blockedSummary) + ? "blocked" + : "waiting", + currentStep: params.currentStep, + stateJson: params.stateJson, + waitJson: params.waitJson, + blockedTaskId: params.blockedTaskId, + blockedSummary: params.blockedSummary, + endedAt: null, + updatedAt: params.updatedAt, + }, + }); +} + +export function resumeFlow(params: { + flowId: string; + expectedRevision: number; + status?: Extract; + currentStep?: string | null; + stateJson?: JsonValue | null; + updatedAt?: number; +}): FlowUpdateResult { + return updateFlowRecordByIdExpectedRevision({ + flowId: params.flowId, + expectedRevision: params.expectedRevision, + patch: { + status: params.status ?? "queued", + currentStep: params.currentStep, + stateJson: params.stateJson, + waitJson: null, + blockedTaskId: null, + blockedSummary: null, + endedAt: null, + updatedAt: params.updatedAt, + }, + }); +} + +export function finishFlow(params: { + flowId: string; + expectedRevision: number; + currentStep?: string | null; + stateJson?: JsonValue | null; + updatedAt?: number; + endedAt?: number; +}): FlowUpdateResult { + const endedAt = params.endedAt ?? params.updatedAt ?? Date.now(); + return updateFlowRecordByIdExpectedRevision({ + flowId: params.flowId, + expectedRevision: params.expectedRevision, + patch: { + status: "succeeded", + currentStep: params.currentStep, + stateJson: params.stateJson, + waitJson: null, + blockedTaskId: null, + blockedSummary: null, + endedAt, + updatedAt: params.updatedAt ?? endedAt, + }, + }); +} + +export function failFlow(params: { + flowId: string; + expectedRevision: number; + currentStep?: string | null; + stateJson?: JsonValue | null; + blockedTaskId?: string | null; + blockedSummary?: string | null; + updatedAt?: number; + endedAt?: number; +}): FlowUpdateResult { + const endedAt = params.endedAt ?? params.updatedAt ?? Date.now(); + return updateFlowRecordByIdExpectedRevision({ + flowId: params.flowId, + expectedRevision: params.expectedRevision, + patch: { + status: "failed", + currentStep: params.currentStep, + stateJson: params.stateJson, + waitJson: null, + blockedTaskId: params.blockedTaskId, + blockedSummary: params.blockedSummary, + endedAt, + updatedAt: params.updatedAt ?? endedAt, + }, + }); +} + +export function requestFlowCancel(params: { + flowId: string; + expectedRevision: number; + cancelRequestedAt?: number; + updatedAt?: number; +}): FlowUpdateResult { + return updateFlowRecordByIdExpectedRevision({ + flowId: params.flowId, + expectedRevision: params.expectedRevision, + patch: { + cancelRequestedAt: params.cancelRequestedAt ?? params.updatedAt ?? Date.now(), + updatedAt: params.updatedAt, + }, + }); +} + +export function syncFlowFromTask( + task: Pick< + TaskRecord, + | "parentFlowId" + | "status" + | "terminalOutcome" + | "notifyPolicy" + | "label" + | "task" + | "lastEventAt" + | "endedAt" + | "taskId" + | "terminalSummary" + | "progressSummary" + >, +): FlowRecord | null { + const flowId = task.parentFlowId?.trim(); + if (!flowId) { + return null; + } + const flow = getFlowById(flowId); + if (!flow) { + return null; + } + if (flow.syncMode !== "task_mirrored") { + return flow; + } + const terminalFlowStatus = deriveFlowStatusFromTask(task); + const isTerminal = + terminalFlowStatus === "succeeded" || + terminalFlowStatus === "blocked" || + terminalFlowStatus === "failed" || + terminalFlowStatus === "cancelled" || + terminalFlowStatus === "lost"; + return updateFlowRecordByIdUnchecked(flowId, { + status: terminalFlowStatus, + notifyPolicy: task.notifyPolicy, + goal: resolveFlowGoal(task), + blockedTaskId: terminalFlowStatus === "blocked" ? task.taskId.trim() || null : null, + blockedSummary: + terminalFlowStatus === "blocked" ? (resolveFlowBlockedSummary(task) ?? null) : null, + waitJson: null, + updatedAt: task.lastEventAt ?? Date.now(), + ...(isTerminal + ? { + endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(), + } + : { endedAt: null }), + }); +} + +export function getFlowById(flowId: string): FlowRecord | undefined { + ensureFlowRegistryReady(); + const flow = flows.get(flowId); + return flow ? cloneFlowRecord(flow) : undefined; +} + +export function listFlowsForOwnerKey(ownerKey: string): FlowRecord[] { + ensureFlowRegistryReady(); + const normalizedOwnerKey = ownerKey.trim(); + if (!normalizedOwnerKey) { + return []; + } + return [...flows.values()] + .filter((flow) => flow.ownerKey.trim() === normalizedOwnerKey) + .map((flow) => cloneFlowRecord(flow)) + .toSorted((left, right) => right.createdAt - left.createdAt); +} + +export function findLatestFlowForOwnerKey(ownerKey: string): FlowRecord | undefined { + const flow = listFlowsForOwnerKey(ownerKey)[0]; + return flow ? cloneFlowRecord(flow) : undefined; +} + +export function resolveFlowForLookupToken(token: string): FlowRecord | undefined { + const lookup = token.trim(); + if (!lookup) { + return undefined; + } + return getFlowById(lookup) ?? findLatestFlowForOwnerKey(lookup); +} + +export function listFlowRecords(): FlowRecord[] { + ensureFlowRegistryReady(); + return [...flows.values()] + .map((flow) => cloneFlowRecord(flow)) + .toSorted((left, right) => right.createdAt - left.createdAt); +} + +export function deleteFlowRecordById(flowId: string): boolean { + ensureFlowRegistryReady(); + const current = flows.get(flowId); + if (!current) { + return false; + } + flows.delete(flowId); + persistFlowDelete(flowId); + emitFlowRegistryHookEvent(() => ({ + kind: "deleted", + flowId, + previous: cloneFlowRecord(current), + })); + return true; +} + +export function resetFlowRegistryForTests(opts?: { persist?: boolean }) { + flows.clear(); + restoreAttempted = false; + resetFlowRegistryRuntimeForTests(); + if (opts?.persist !== false) { + persistFlowRegistry(); + getFlowRegistryStore().close?.(); + } +} diff --git a/src/tasks/flow-registry.types.ts b/src/tasks/flow-registry.types.ts new file mode 100644 index 00000000000..a328e15a683 --- /dev/null +++ b/src/tasks/flow-registry.types.ts @@ -0,0 +1,43 @@ +import type { DeliveryContext } from "../utils/delivery-context.js"; +import type { TaskNotifyPolicy } from "./task-registry.types.js"; + +export type JsonValue = + | null + | boolean + | number + | string + | JsonValue[] + | { [key: string]: JsonValue }; + +export type FlowSyncMode = "task_mirrored" | "managed"; + +export type FlowStatus = + | "queued" + | "running" + | "waiting" + | "blocked" + | "succeeded" + | "failed" + | "cancelled" + | "lost"; + +export type FlowRecord = { + flowId: string; + syncMode: FlowSyncMode; + ownerKey: string; + requesterOrigin?: DeliveryContext; + controllerId?: string; + revision: number; + status: FlowStatus; + notifyPolicy: TaskNotifyPolicy; + goal: string; + currentStep?: string; + blockedTaskId?: string; + blockedSummary?: string; + stateJson?: JsonValue; + waitJson?: JsonValue; + cancelRequestedAt?: number; + createdAt: number; + updatedAt: number; + endedAt?: number; +}; diff --git a/src/tasks/flow-runtime-internal.ts b/src/tasks/flow-runtime-internal.ts new file mode 100644 index 00000000000..b085424a94a --- /dev/null +++ b/src/tasks/flow-runtime-internal.ts @@ -0,0 +1,17 @@ +export { + createFlowForTask, + createFlowRecord, + createManagedFlow, + deleteFlowRecordById, + findLatestFlowForOwnerKey, + getFlowById, + listFlowRecords, + listFlowsForOwnerKey, + requestFlowCancel, + resolveFlowForLookupToken, + resetFlowRegistryForTests, + resumeFlow, + setFlowWaiting, + syncFlowFromTask, + updateFlowRecordByIdExpectedRevision, +} from "./flow-registry.js"; diff --git a/src/tasks/runtime-internal.ts b/src/tasks/runtime-internal.ts index bbd3353c33c..53566c7232b 100644 --- a/src/tasks/runtime-internal.ts +++ b/src/tasks/runtime-internal.ts @@ -4,14 +4,17 @@ export { deleteTaskRecordById, ensureTaskRegistryReady, findLatestTaskForOwnerKey, + findLatestTaskForFlowId, findLatestTaskForRelatedSessionKey, findTaskByRunId, getTaskById, getTaskRegistrySnapshot, getTaskRegistrySummary, listTaskRecords, + listTasksForFlowId, listTasksForOwnerKey, listTasksForRelatedSessionKey, + linkTaskToFlowById, markTaskLostById, markTaskRunningByRunId, markTaskTerminalById, diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index f88cc014517..86cf925a123 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -1,16 +1,30 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { + createManagedFlow, + getFlowById, + listFlowRecords, + resetFlowRegistryForTests, +} from "./flow-registry.js"; +import { + cancelFlowById, + cancelFlowByIdForOwner, cancelDetachedTaskRunById, completeTaskRunByRunId, createQueuedTaskRun, createRunningTaskRun, failTaskRunByRunId, recordTaskRunProgressByRunId, + retryBlockedFlowAsQueuedTaskRun, setDetachedTaskDeliveryStatusByRunId, startTaskRunByRunId, } from "./task-executor.js"; -import { getTaskById, resetTaskRegistryForTests } from "./task-registry.js"; +import { + getTaskById, + findLatestTaskForFlowId, + findTaskByRunId, + resetTaskRegistryForTests, +} from "./task-registry.js"; const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; const hoisted = vi.hoisted(() => { @@ -42,10 +56,12 @@ async function withTaskExecutorStateDir(run: (root: string) => Promise): P await withTempDir({ prefix: "openclaw-task-executor-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); + resetFlowRegistryForTests(); try { await run(root); } finally { resetTaskRegistryForTests(); + resetFlowRegistryForTests(); } }); } @@ -58,6 +74,7 @@ describe("task-executor", () => { process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; } resetTaskRegistryForTests(); + resetFlowRegistryForTests(); hoisted.sendMessageMock.mockReset(); hoisted.cancelSessionMock.mockReset(); hoisted.killSubagentRunAdminMock.mockReset(); @@ -141,7 +158,64 @@ describe("task-executor", () => { }); }); - it("records blocked task outcomes without wrapping them in a separate flow model", async () => { + it("auto-creates a one-task flow and keeps it synced with task status", async () => { + await withTaskExecutorStateDir(async () => { + const created = createRunningTaskRun({ + runtime: "subagent", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:codex:subagent:child", + runId: "run-executor-flow", + task: "Write summary", + startedAt: 10, + deliveryStatus: "pending", + }); + + expect(created.parentFlowId).toEqual(expect.any(String)); + expect(getFlowById(created.parentFlowId!)).toMatchObject({ + flowId: created.parentFlowId, + ownerKey: "agent:main:main", + status: "running", + goal: "Write summary", + notifyPolicy: "done_only", + }); + + completeTaskRunByRunId({ + runId: "run-executor-flow", + endedAt: 40, + lastEventAt: 40, + terminalSummary: "Done.", + }); + + expect(getFlowById(created.parentFlowId!)).toMatchObject({ + flowId: created.parentFlowId, + status: "succeeded", + endedAt: 40, + goal: "Write summary", + notifyPolicy: "done_only", + }); + }); + }); + + it("does not auto-create one-task flows for non-returning bookkeeping runs", async () => { + await withTaskExecutorStateDir(async () => { + const created = createRunningTaskRun({ + runtime: "cli", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:main:main", + runId: "run-executor-cli", + task: "Foreground gateway run", + deliveryStatus: "not_applicable", + startedAt: 10, + }); + + expect(created.parentFlowId).toBeUndefined(); + expect(listFlowRecords()).toEqual([]); + }); + }); + + it("records blocked metadata on one-task flows and reuses the same flow for queued retries", async () => { await withTaskExecutorStateDir(async () => { const created = createRunningTaskRun({ runtime: "acp", @@ -156,7 +230,6 @@ describe("task-executor", () => { task: "Patch file", startedAt: 10, deliveryStatus: "pending", - notifyPolicy: "silent", }); completeTaskRunByRunId({ @@ -173,6 +246,113 @@ describe("task-executor", () => { terminalOutcome: "blocked", terminalSummary: "Writable session required.", }); + expect(getFlowById(created.parentFlowId!)).toMatchObject({ + flowId: created.parentFlowId, + status: "blocked", + blockedTaskId: created.taskId, + blockedSummary: "Writable session required.", + endedAt: 40, + }); + + const retried = retryBlockedFlowAsQueuedTaskRun({ + flowId: created.parentFlowId!, + runId: "run-executor-retry", + childSessionKey: "agent:codex:acp:retry-child", + }); + + expect(retried).toMatchObject({ + found: true, + retried: true, + previousTask: expect.objectContaining({ + taskId: created.taskId, + }), + task: expect.objectContaining({ + parentFlowId: created.parentFlowId, + parentTaskId: created.taskId, + status: "queued", + runId: "run-executor-retry", + }), + }); + expect(getFlowById(created.parentFlowId!)).toMatchObject({ + flowId: created.parentFlowId, + status: "queued", + }); + expect(findLatestTaskForFlowId(created.parentFlowId!)).toMatchObject({ + runId: "run-executor-retry", + }); + expect(findTaskByRunId("run-executor-blocked")).toMatchObject({ + taskId: created.taskId, + status: "succeeded", + terminalOutcome: "blocked", + terminalSummary: "Writable session required.", + }); + }); + }); + + it("cancels active tasks linked to a managed flow", async () => { + await withTaskExecutorStateDir(async () => { + hoisted.cancelSessionMock.mockResolvedValue(undefined); + + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed-flow", + goal: "Inspect PR batch", + }); + const child = createRunningTaskRun({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + parentFlowId: flow.flowId, + childSessionKey: "agent:codex:acp:child", + runId: "run-linear-cancel", + task: "Inspect a PR", + startedAt: 10, + deliveryStatus: "pending", + }); + + const cancelled = await cancelFlowById({ + cfg: {} as never, + flowId: flow.flowId, + }); + + expect(cancelled).toMatchObject({ + found: true, + cancelled: true, + }); + expect(findTaskByRunId("run-linear-cancel")).toMatchObject({ + taskId: child.taskId, + status: "cancelled", + }); + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + status: "cancelled", + }); + }); + }); + + it("denies cross-owner flow cancellation through the owner-scoped wrapper", async () => { + await withTaskExecutorStateDir(async () => { + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed-flow", + goal: "Protected flow", + }); + + const cancelled = await cancelFlowByIdForOwner({ + cfg: {} as never, + flowId: flow.flowId, + callerOwnerKey: "agent:main:other", + }); + + expect(cancelled).toMatchObject({ + found: false, + cancelled: false, + reason: "Flow not found.", + }); + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + status: "queued", + }); }); }); diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index f20ee9722b8..4c7503c7e75 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -1,24 +1,85 @@ import type { OpenClawConfig } from "../config/config.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { getFlowByIdForOwner } from "./flow-owner-access.js"; +import type { FlowRecord } from "./flow-registry.types.js"; +import { + createFlowForTask, + deleteFlowRecordById, + getFlowById, + updateFlowRecordByIdExpectedRevision, +} from "./flow-runtime-internal.js"; import { cancelTaskById, createTaskRecord, + findLatestTaskForFlowId, + linkTaskToFlowById, + listTasksForFlowId, markTaskLostById, markTaskRunningByRunId, markTaskTerminalByRunId, recordTaskProgressByRunId, setTaskRunDeliveryStatusByRunId, } from "./runtime-internal.js"; +import { summarizeTaskRecords } from "./task-registry.summary.js"; import type { TaskDeliveryState, TaskDeliveryStatus, TaskNotifyPolicy, TaskRecord, + TaskRegistrySummary, TaskRuntime, TaskScopeKind, TaskStatus, TaskTerminalOutcome, } from "./task-registry.types.js"; +const log = createSubsystemLogger("tasks/executor"); + +function isOneTaskFlowEligible(task: TaskRecord): boolean { + if (task.parentFlowId?.trim() || task.scopeKind !== "session") { + return false; + } + if (task.deliveryStatus === "not_applicable") { + return false; + } + return task.runtime === "acp" || task.runtime === "subagent"; +} + +function ensureSingleTaskFlow(params: { + task: TaskRecord; + requesterOrigin?: TaskDeliveryState["requesterOrigin"]; +}): TaskRecord { + if (!isOneTaskFlowEligible(params.task)) { + return params.task; + } + try { + const flow = createFlowForTask({ + task: params.task, + requesterOrigin: params.requesterOrigin, + }); + const linked = linkTaskToFlowById({ + taskId: params.task.taskId, + flowId: flow.flowId, + }); + if (!linked) { + deleteFlowRecordById(flow.flowId); + return params.task; + } + if (linked.parentFlowId !== flow.flowId) { + deleteFlowRecordById(flow.flowId); + return linked; + } + return linked; + } catch (error) { + log.warn("Failed to create one-task flow for detached run", { + taskId: params.task.taskId, + runId: params.task.runId, + error, + }); + return params.task; + } +} + export function createQueuedTaskRun(params: { runtime: TaskRuntime; sourceId?: string; @@ -26,6 +87,7 @@ export function createQueuedTaskRun(params: { ownerKey?: string; scopeKind?: TaskScopeKind; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string; @@ -36,10 +98,18 @@ export function createQueuedTaskRun(params: { notifyPolicy?: TaskNotifyPolicy; deliveryStatus?: TaskDeliveryStatus; }): TaskRecord { - return createTaskRecord({ + const task = createTaskRecord({ ...params, status: "queued", }); + return ensureSingleTaskFlow({ + task, + requesterOrigin: params.requesterOrigin, + }); +} + +export function getFlowTaskSummary(flowId: string): TaskRegistrySummary { + return summarizeTaskRecords(listTasksForFlowId(flowId)); } export function createRunningTaskRun(params: { @@ -49,6 +119,7 @@ export function createRunningTaskRun(params: { ownerKey?: string; scopeKind?: TaskScopeKind; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string; @@ -62,10 +133,14 @@ export function createRunningTaskRun(params: { lastEventAt?: number; progressSummary?: string | null; }): TaskRecord { - return createTaskRecord({ + const task = createTaskRecord({ ...params, status: "running", }); + return ensureSingleTaskFlow({ + task, + requesterOrigin: params.requesterOrigin, + }); } export function startTaskRunByRunId(params: { @@ -157,6 +232,255 @@ export function setDetachedTaskDeliveryStatusByRunId(params: { return setTaskRunDeliveryStatusByRunId(params); } +type RetryBlockedFlowResult = { + found: boolean; + retried: boolean; + reason?: string; + previousTask?: TaskRecord; + task?: TaskRecord; +}; + +type RetryBlockedFlowParams = { + flowId: string; + sourceId?: string; + requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + childSessionKey?: string; + agentId?: string; + runId?: string; + label?: string; + task?: string; + preferMetadata?: boolean; + notifyPolicy?: TaskNotifyPolicy; + deliveryStatus?: TaskDeliveryStatus; + status: "queued" | "running"; + startedAt?: number; + lastEventAt?: number; + progressSummary?: string | null; +}; + +function resolveRetryableBlockedFlowTask(flowId: string): { + flowFound: boolean; + retryable: boolean; + latestTask?: TaskRecord; + reason?: string; +} { + const flow = getFlowById(flowId); + if (!flow) { + return { + flowFound: false, + retryable: false, + reason: "Flow not found.", + }; + } + const latestTask = findLatestTaskForFlowId(flowId); + if (!latestTask) { + return { + flowFound: true, + retryable: false, + reason: "Flow has no retryable task.", + }; + } + if (flow.status !== "blocked") { + return { + flowFound: true, + retryable: false, + latestTask, + reason: "Flow is not blocked.", + }; + } + if (latestTask.status !== "succeeded" || latestTask.terminalOutcome !== "blocked") { + return { + flowFound: true, + retryable: false, + latestTask, + reason: "Latest flow task is not blocked.", + }; + } + return { + flowFound: true, + retryable: true, + latestTask, + }; +} + +function retryBlockedFlowTask(params: RetryBlockedFlowParams): RetryBlockedFlowResult { + const resolved = resolveRetryableBlockedFlowTask(params.flowId); + if (!resolved.retryable || !resolved.latestTask) { + return { + found: resolved.flowFound, + retried: false, + reason: resolved.reason, + }; + } + const flow = getFlowById(params.flowId); + if (!flow) { + return { + found: false, + retried: false, + reason: "Flow not found.", + previousTask: resolved.latestTask, + }; + } + const task = createTaskRecord({ + runtime: resolved.latestTask.runtime, + sourceId: params.sourceId ?? resolved.latestTask.sourceId, + ownerKey: flow.ownerKey, + scopeKind: "session", + requesterOrigin: params.requesterOrigin ?? flow.requesterOrigin, + parentFlowId: flow.flowId, + childSessionKey: params.childSessionKey, + parentTaskId: resolved.latestTask.taskId, + agentId: params.agentId ?? resolved.latestTask.agentId, + runId: params.runId, + label: params.label ?? resolved.latestTask.label, + task: params.task ?? resolved.latestTask.task, + preferMetadata: params.preferMetadata, + notifyPolicy: params.notifyPolicy ?? resolved.latestTask.notifyPolicy, + deliveryStatus: params.deliveryStatus ?? "pending", + status: params.status, + startedAt: params.startedAt, + lastEventAt: params.lastEventAt, + progressSummary: params.progressSummary, + }); + return { + found: true, + retried: true, + previousTask: resolved.latestTask, + task, + }; +} + +export function retryBlockedFlowAsQueuedTaskRun( + params: Omit, +): RetryBlockedFlowResult { + return retryBlockedFlowTask({ + ...params, + status: "queued", + }); +} + +export function retryBlockedFlowAsRunningTaskRun( + params: Omit, +): RetryBlockedFlowResult { + return retryBlockedFlowTask({ + ...params, + status: "running", + }); +} + +type CancelFlowResult = { + found: boolean; + cancelled: boolean; + reason?: string; + flow?: FlowRecord; + tasks?: TaskRecord[]; +}; + +function isActiveTaskStatus(status: TaskStatus): boolean { + return status === "queued" || status === "running"; +} + +function isTerminalFlowStatus(status: FlowRecord["status"]): boolean { + return ( + status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost" + ); +} + +export async function cancelFlowById(params: { + cfg: OpenClawConfig; + flowId: string; +}): Promise { + const flow = getFlowById(params.flowId); + if (!flow) { + return { + found: false, + cancelled: false, + reason: "Flow not found.", + }; + } + const linkedTasks = listTasksForFlowId(flow.flowId); + const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status)); + for (const task of activeTasks) { + await cancelTaskById({ + cfg: params.cfg, + taskId: task.taskId, + }); + } + const refreshedTasks = listTasksForFlowId(flow.flowId); + const remainingActive = refreshedTasks.filter((task) => isActiveTaskStatus(task.status)); + if (remainingActive.length > 0) { + return { + found: true, + cancelled: false, + reason: "One or more child tasks are still active.", + flow: getFlowById(flow.flowId), + tasks: refreshedTasks, + }; + } + if (isTerminalFlowStatus(flow.status)) { + return { + found: true, + cancelled: false, + reason: `Flow is already ${flow.status}.`, + flow, + tasks: refreshedTasks, + }; + } + const now = Date.now(); + const refreshedFlow = getFlowById(flow.flowId) ?? flow; + const updatedFlowResult = updateFlowRecordByIdExpectedRevision({ + flowId: refreshedFlow.flowId, + expectedRevision: refreshedFlow.revision, + patch: { + status: "cancelled", + blockedTaskId: null, + blockedSummary: null, + endedAt: now, + updatedAt: now, + }, + }); + if (!updatedFlowResult.applied) { + return { + found: true, + cancelled: false, + reason: + updatedFlowResult.reason === "revision_conflict" + ? "Flow changed while cancellation was in progress." + : "Flow not found.", + flow: updatedFlowResult.current ?? getFlowById(flow.flowId), + tasks: refreshedTasks, + }; + } + return { + found: true, + cancelled: true, + flow: updatedFlowResult.flow, + tasks: refreshedTasks, + }; +} + +export async function cancelFlowByIdForOwner(params: { + cfg: OpenClawConfig; + flowId: string; + callerOwnerKey: string; +}): Promise { + const flow = getFlowByIdForOwner({ + flowId: params.flowId, + callerOwnerKey: params.callerOwnerKey, + }); + if (!flow) { + return { + found: false, + cancelled: false, + reason: "Flow not found.", + }; + } + return cancelFlowById({ + cfg: params.cfg, + flowId: flow.flowId, + }); +} + export async function cancelDetachedTaskRunById(params: { cfg: OpenClawConfig; taskId: string }) { return cancelTaskById(params); } diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index f7aa1980454..dbd0ea729ba 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -13,6 +13,7 @@ type TaskRegistryRow = { owner_key: string; scope_kind: TaskRecord["scopeKind"]; child_session_key: string | null; + parent_flow_id: string | null; parent_task_id: string | null; agent_id: string | null; run_id: string | null; @@ -99,6 +100,7 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { ownerKey: row.owner_key, scopeKind: row.scope_kind, ...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}), + ...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}), ...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}), ...(row.agent_id ? { agentId: row.agent_id } : {}), ...(row.run_id ? { runId: row.run_id } : {}), @@ -137,6 +139,7 @@ function bindTaskRecord(record: TaskRecord) { owner_key: record.ownerKey, scope_kind: record.scopeKind, child_session_key: record.childSessionKey ?? null, + parent_flow_id: record.parentFlowId ?? null, parent_task_id: record.parentTaskId ?? null, agent_id: record.agentId ?? null, run_id: record.runId ?? null, @@ -175,6 +178,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { owner_key, scope_kind, child_session_key, + parent_flow_id, parent_task_id, agent_id, run_id, @@ -211,6 +215,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { owner_key, scope_kind, child_session_key, + parent_flow_id, parent_task_id, agent_id, run_id, @@ -235,6 +240,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { @owner_key, @scope_kind, @child_session_key, + @parent_flow_id, @parent_task_id, @agent_id, @run_id, @@ -259,6 +265,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { owner_key = excluded.owner_key, scope_kind = excluded.scope_kind, child_session_key = excluded.child_session_key, + parent_flow_id = excluded.parent_flow_id, parent_task_id = excluded.parent_task_id, agent_id = excluded.agent_id, run_id = excluded.run_id, @@ -340,6 +347,7 @@ function ensureSchema(db: DatabaseSync) { owner_key TEXT NOT NULL, scope_kind TEXT NOT NULL, child_session_key TEXT, + parent_flow_id TEXT, parent_task_id TEXT, agent_id TEXT, run_id TEXT, @@ -360,6 +368,9 @@ function ensureSchema(db: DatabaseSync) { ); `); migrateLegacyOwnerColumns(db); + if (!hasTaskRunsColumn(db, "parent_flow_id")) { + db.exec(`ALTER TABLE task_runs ADD COLUMN parent_flow_id TEXT;`); + } db.exec(` CREATE TABLE IF NOT EXISTS task_delivery_state ( task_id TEXT PRIMARY KEY, @@ -373,6 +384,7 @@ function ensureSchema(db: DatabaseSync) { db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_owner_key ON task_runs(owner_key);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`); db.exec( `CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`, ); diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index 61653e30201..7b9019cdc3c 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -194,6 +194,27 @@ describe("task-registry store runtime", () => { }); }); + it("persists parentFlowId with task rows", () => { + const created = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + parentFlowId: "flow-123", + childSessionKey: "agent:codex:acp:new", + runId: "run-flow-linked", + task: "Linked task", + status: "running", + deliveryStatus: "pending", + }); + + resetTaskRegistryForTests({ persist: false }); + + expect(findTaskByRunId("run-flow-linked")).toMatchObject({ + taskId: created.taskId, + parentFlowId: "flow-123", + }); + }); + it("hardens the sqlite task store directory and file modes", () => { if (process.platform === "win32") { return; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 82543715f41..8b246013a02 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -11,6 +11,7 @@ import { } from "../infra/heartbeat-wake.js"; import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; +import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js"; import { createTaskRecord, findLatestTaskForOwnerKey, @@ -20,6 +21,7 @@ import { getTaskRegistrySummary, listTasksForOwnerKey, listTaskRecords, + linkTaskToFlowById, maybeDeliverTaskStateChangeUpdate, maybeDeliverTaskTerminalUpdate, markTaskRunningByRunId, @@ -190,6 +192,7 @@ describe("task-registry", () => { resetHeartbeatWakeStateForTests(); resetAgentRunContextForTest(); resetTaskRegistryForTests({ persist: false }); + resetFlowRegistryForTests({ persist: false }); hoisted.sendMessageMock.mockReset(); hoisted.cancelSessionMock.mockReset(); hoisted.killSubagentRunAdminMock.mockReset(); @@ -293,6 +296,89 @@ describe("task-registry", () => { }); }); + it("rejects cross-owner parent flow links during task creation", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/task-registry", + goal: "Owner main flow", + }); + + expect(() => + createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:other", + scopeKind: "session", + parentFlowId: flow.flowId, + runId: "cross-owner-run", + task: "Attempt hijack", + }), + ).toThrow("Task ownerKey must match parent flow ownerKey."); + }); + }); + + it("rejects system-scoped parent flow links during task creation", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/task-registry", + goal: "Owner main flow", + }); + + expect(() => + createTaskRecord({ + runtime: "cron", + ownerKey: "agent:main:main", + scopeKind: "system", + parentFlowId: flow.flowId, + runId: "system-link-run", + task: "System task", + deliveryStatus: "not_applicable", + }), + ).toThrow("Only session-scoped tasks can link to flows."); + }); + }); + + it("rejects cross-owner flow links for existing tasks", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + + const task = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + runId: "owner-main-task", + task: "Safe task", + }); + const flow = createManagedFlow({ + ownerKey: "agent:main:other", + controllerId: "tests/task-registry", + goal: "Other owner flow", + }); + + expect(() => + linkTaskToFlowById({ + taskId: task.taskId, + flowId: flow.flowId, + }), + ).toThrow("Task ownerKey must match parent flow ownerKey."); + expect(getTaskById(task.taskId)).toMatchObject({ + taskId: task.taskId, + parentFlowId: undefined, + }); + }); + }); + it("delivers ACP completion to the requester channel when a delivery origin exists", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index c3e82b443cd..7c087454fb5 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -9,6 +9,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; +import { getFlowById, syncFlowFromTask } from "./flow-runtime-internal.js"; import { formatTaskBlockedFollowupMessage, formatTaskStateChangeMessage, @@ -47,6 +48,7 @@ const tasks = new Map(); const taskDeliveryStates = new Map(); const taskIdsByRunId = new Map>(); const taskIdsByOwnerKey = new Map>(); +const taskIdsByParentFlowId = new Map>(); const taskIdsByRelatedSessionKey = new Map>(); const tasksWithPendingDelivery = new Set(); let listenerStarted = false; @@ -58,6 +60,7 @@ let deliveryRuntimePromise: Promise) deleteIndexedKey(taskIdsByOwnerKey, key, taskId); } +function addParentFlowIdIndex(taskId: string, task: Pick) { + const key = task.parentFlowId?.trim(); + if (!key) { + return; + } + addIndexedKey(taskIdsByParentFlowId, key, taskId); +} + +function deleteParentFlowIdIndex(taskId: string, task: Pick) { + const key = task.parentFlowId?.trim(); + if (!key) { + return; + } + deleteIndexedKey(taskIdsByParentFlowId, key, taskId); +} + function addRelatedSessionKeyIndex( taskId: string, task: Pick, @@ -370,6 +415,13 @@ function rebuildOwnerKeyIndex() { } } +function rebuildParentFlowIdIndex() { + taskIdsByParentFlowId.clear(); + for (const [taskId, task] of tasks.entries()) { + addParentFlowIdIndex(taskId, task); + } +} + function rebuildRelatedSessionKeyIndex() { taskIdsByRelatedSessionKey.clear(); for (const [taskId, task] of tasks.entries()) { @@ -473,6 +525,7 @@ function findExistingTaskForCreate(params: { ownerKey: string; scopeKind: TaskScopeKind; childSessionKey?: string; + parentFlowId?: string; runId?: string; label?: string; task: string; @@ -485,7 +538,9 @@ function findExistingTaskForCreate(params: { task.scopeKind === params.scopeKind && normalizeComparableText(task.ownerKey) === normalizeComparableText(params.ownerKey) && normalizeComparableText(task.childSessionKey) === - normalizeComparableText(params.childSessionKey), + normalizeComparableText(params.childSessionKey) && + normalizeComparableText(task.parentFlowId) === + normalizeComparableText(params.parentFlowId), ) : []; const exact = runId @@ -512,6 +567,7 @@ function mergeExistingTaskForCreate( params: { requesterOrigin?: TaskDeliveryState["requesterOrigin"]; sourceId?: string; + parentFlowId?: string; parentTaskId?: string; agentId?: string; label?: string; @@ -534,6 +590,14 @@ function mergeExistingTaskForCreate( if (params.sourceId?.trim() && !existing.sourceId?.trim()) { patch.sourceId = params.sourceId.trim(); } + if (params.parentFlowId?.trim() && !existing.parentFlowId?.trim()) { + assertParentFlowLinkAllowed({ + ownerKey: existing.ownerKey, + scopeKind: existing.scopeKind, + parentFlowId: params.parentFlowId, + }); + patch.parentFlowId = params.parentFlowId.trim(); + } if (params.parentTaskId?.trim() && !existing.parentTaskId?.trim()) { patch.parentTaskId = params.parentTaskId.trim(); } @@ -580,14 +644,47 @@ function resolveTaskStateChangeIdempotencyKey(params: { latestEvent: TaskEventRecord; owner: TaskDeliveryOwner; }): string { + if (params.owner.flowId) { + return `flow-event:${params.owner.flowId}:${params.task.taskId}:${params.latestEvent.at}:${params.latestEvent.kind}`; + } return `task-event:${params.task.taskId}:${params.latestEvent.at}:${params.latestEvent.kind}`; } function resolveTaskTerminalIdempotencyKey(task: TaskRecord): string { + const owner = resolveTaskDeliveryOwner(task); + if (owner.flowId) { + const outcome = task.status === "succeeded" ? (task.terminalOutcome ?? "default") : "default"; + return `flow-terminal:${owner.flowId}:${task.taskId}:${task.status}:${outcome}`; + } return taskTerminalDeliveryIdempotencyKey(task); } +function getLinkedFlowForDelivery(task: TaskRecord) { + const flowId = task.parentFlowId?.trim(); + if (!flowId || task.scopeKind !== "session") { + return undefined; + } + const flow = getFlowById(flowId); + if (!flow) { + return undefined; + } + if (normalizeOwnerKey(flow.ownerKey) !== normalizeOwnerKey(task.ownerKey)) { + return undefined; + } + return flow; +} + function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner { + const flow = getLinkedFlowForDelivery(task); + if (flow) { + return { + sessionKey: flow.ownerKey.trim(), + requesterOrigin: normalizeDeliveryContext( + flow.requesterOrigin ?? taskDeliveryStates.get(task.taskId)?.requesterOrigin, + ), + flowId: flow.flowId, + }; + } if (task.scopeKind !== "session") { return {}; } @@ -615,6 +712,7 @@ function restoreTaskRegistryOnce() { } rebuildRunIdIndex(); rebuildOwnerKeyIndex(); + rebuildParentFlowIdIndex(); rebuildRelatedSessionKeyIndex(); emitTaskRegistryHookEvent(() => ({ kind: "restored", @@ -644,6 +742,7 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu normalizeSessionIndexKey(current.ownerKey) !== normalizeSessionIndexKey(next.ownerKey) || normalizeSessionIndexKey(current.childSessionKey) !== normalizeSessionIndexKey(next.childSessionKey); + const parentFlowIndexChanged = current.parentFlowId?.trim() !== next.parentFlowId?.trim(); tasks.set(taskId, next); if (patch.runId && patch.runId !== current.runId) { rebuildRunIdIndex(); @@ -654,7 +753,20 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu deleteRelatedSessionKeyIndex(taskId, current); addRelatedSessionKeyIndex(taskId, next); } + if (parentFlowIndexChanged) { + deleteParentFlowIdIndex(taskId, current); + addParentFlowIdIndex(taskId, next); + } persistTaskUpsert(next); + try { + syncFlowFromTask(next); + } catch (error) { + log.warn("Failed to sync parent flow from task update", { + taskId, + flowId: next.parentFlowId, + error, + }); + } emitTaskRegistryHookEvent(() => ({ kind: "upserted", task: cloneTaskRecord(next), @@ -1107,6 +1219,7 @@ export function createTaskRecord(params: { scopeKind?: TaskScopeKind; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; childSessionKey?: string; + parentFlowId?: string; parentTaskId?: string; agentId?: string; runId?: string; @@ -1137,11 +1250,17 @@ export function createTaskRecord(params: { ownerKey, scopeKind, }); + assertParentFlowLinkAllowed({ + ownerKey, + scopeKind, + parentFlowId: params.parentFlowId, + }); const existing = findExistingTaskForCreate({ runtime: params.runtime, ownerKey, scopeKind, childSessionKey: params.childSessionKey, + parentFlowId: params.parentFlowId, runId: params.runId, label: params.label, task: params.task, @@ -1173,6 +1292,7 @@ export function createTaskRecord(params: { ownerKey, scopeKind, childSessionKey: params.childSessionKey, + parentFlowId: params.parentFlowId?.trim() || undefined, parentTaskId: params.parentTaskId?.trim() || undefined, agentId: params.agentId?.trim() || undefined, runId: params.runId?.trim() || undefined, @@ -1203,8 +1323,18 @@ export function createTaskRecord(params: { }); addRunIdIndex(taskId, record.runId); addOwnerKeyIndex(taskId, record); + addParentFlowIdIndex(taskId, record); addRelatedSessionKeyIndex(taskId, record); persistTaskUpsert(record); + try { + syncFlowFromTask(record); + } catch (error) { + log.warn("Failed to sync parent flow from task create", { + taskId: record.taskId, + flowId: record.parentFlowId, + error, + }); + } emitTaskRegistryHookEvent(() => ({ kind: "upserted", task: cloneTaskRecord(record), @@ -1400,6 +1530,29 @@ export function updateTaskNotifyPolicyById(params: { }); } +export function linkTaskToFlowById(params: { taskId: string; flowId: string }): TaskRecord | null { + ensureTaskRegistryReady(); + const flowId = params.flowId.trim(); + if (!flowId) { + return null; + } + const current = tasks.get(params.taskId); + if (!current) { + return null; + } + if (current.parentFlowId?.trim()) { + return cloneTaskRecord(current); + } + assertParentFlowLinkAllowed({ + ownerKey: current.ownerKey, + scopeKind: current.scopeKind, + parentFlowId: flowId, + }); + return updateTask(params.taskId, { + parentFlowId: flowId, + }); +} + export async function cancelTaskById(params: { cfg: OpenClawConfig; taskId: string; @@ -1567,6 +1720,11 @@ export function findLatestTaskForOwnerKey(ownerKey: string): TaskRecord | undefi return task ? cloneTaskRecord(task) : undefined; } +export function findLatestTaskForFlowId(flowId: string): TaskRecord | undefined { + const task = listTasksForFlowId(flowId)[0]; + return task ? cloneTaskRecord(task) : undefined; +} + export function listTasksForOwnerKey(ownerKey: string): TaskRecord[] { ensureTaskRegistryReady(); const key = normalizeSessionIndexKey(ownerKey); @@ -1576,6 +1734,15 @@ export function listTasksForOwnerKey(ownerKey: string): TaskRecord[] { return listTasksFromIndex(taskIdsByOwnerKey, key); } +export function listTasksForFlowId(flowId: string): TaskRecord[] { + ensureTaskRegistryReady(); + const key = flowId.trim(); + if (!key) { + return []; + } + return listTasksFromIndex(taskIdsByParentFlowId, key); +} + export function findLatestTaskForRelatedSessionKey(sessionKey: string): TaskRecord | undefined { const task = listTasksForRelatedSessionKey(sessionKey)[0]; return task ? cloneTaskRecord(task) : undefined; @@ -1607,6 +1774,7 @@ export function deleteTaskRecordById(taskId: string): boolean { return false; } deleteOwnerKeyIndex(taskId, current); + deleteParentFlowIdIndex(taskId, current); deleteRelatedSessionKeyIndex(taskId, current); tasks.delete(taskId); taskDeliveryStates.delete(taskId); @@ -1626,6 +1794,7 @@ export function resetTaskRegistryForTests(opts?: { persist?: boolean }) { taskDeliveryStates.clear(); taskIdsByRunId.clear(); taskIdsByOwnerKey.clear(); + taskIdsByParentFlowId.clear(); taskIdsByRelatedSessionKey.clear(); tasksWithPendingDelivery.clear(); restoreAttempted = false; diff --git a/src/tasks/task-registry.types.ts b/src/tasks/task-registry.types.ts index d31e3d2c884..2f5502375b3 100644 --- a/src/tasks/task-registry.types.ts +++ b/src/tasks/task-registry.types.ts @@ -58,6 +58,7 @@ export type TaskRecord = { ownerKey: string; scopeKind: TaskScopeKind; childSessionKey?: string; + parentFlowId?: string; parentTaskId?: string; agentId?: string; runId?: string;