diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index 0bb59fb12ad..9fa2abfdedf 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -338,6 +338,27 @@ describe("registerStatusHealthSessionsCommands", () => { ); }); + it("routes tasks flow commands through the TaskFlow handlers", async () => { + await runCli(["tasks", "flow", "list", "--json", "--status", "blocked"]); + expect(flowsListCommand).toHaveBeenCalledWith(expect.any(Object), runtime); + + await runCli(["tasks", "flow", "show", "flow-123", "--json"]); + expect(flowsShowCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "flow-123", + }), + runtime, + ); + + await runCli(["tasks", "flow", "cancel", "flow-123"]); + expect(flowsCancelCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "flow-123", + }), + runtime, + ); + }); + it("runs tasks notify subcommand with lookup and policy forwarding", async () => { await runCli(["tasks", "notify", "run-123", "state_changes"]); diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index 75ab6bd8418..fd93720c603 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -225,7 +225,7 @@ export function registerStatusHealthSessionsCommands(program: Command) { const tasksCmd = program .command("tasks") - .description("Inspect durable background task state") + .description("Inspect durable background tasks and TaskFlow state") .option("--json", "Output as JSON", false) .option("--runtime ", "Filter by kind (subagent, acp, cron, cli)") .option( @@ -277,12 +277,12 @@ export function registerStatusHealthSessionsCommands(program: Command) { tasksCmd .command("audit") - .description("Show stale or broken background task runs") + .description("Show stale or broken background tasks and TaskFlows") .option("--json", "Output as JSON", false) .option("--severity ", "Filter by severity (warn, error)") .option( "--code ", - "Filter by finding code (stale_queued, stale_running, lost, delivery_failed, missing_cleanup, inconsistent_timestamps)", + "Filter by finding code (stale_queued, stale_running, lost, delivery_failed, missing_cleanup, inconsistent_timestamps, restore_failed, stale_waiting, stale_blocked, cancel_stuck, missing_linked_tasks, blocked_task_missing)", ) .option("--limit ", "Limit displayed findings") .action(async (opts, command) => { @@ -299,6 +299,12 @@ export function registerStatusHealthSessionsCommands(program: Command) { | "delivery_failed" | "missing_cleanup" | "inconsistent_timestamps" + | "restore_failed" + | "stale_waiting" + | "stale_blocked" + | "cancel_stuck" + | "missing_linked_tasks" + | "blocked_task_missing" | undefined, limit: parsePositiveIntOrUndefined(opts.limit), }, @@ -309,7 +315,7 @@ export function registerStatusHealthSessionsCommands(program: Command) { tasksCmd .command("maintenance") - .description("Preview or apply task ledger maintenance") + .description("Preview or apply tasks and TaskFlow maintenance") .option("--json", "Output as JSON", false) .option("--apply", "Apply reconciliation, cleanup stamping, and pruning", false) .action(async (opts, command) => { @@ -375,9 +381,65 @@ export function registerStatusHealthSessionsCommands(program: Command) { }); }); + const tasksFlowCmd = tasksCmd + .command("flow") + .description("Inspect durable TaskFlow state under tasks"); + + tasksFlowCmd + .command("list") + .description("List tracked TaskFlows") + .option("--json", "Output as JSON", false) + .option( + "--status ", + "Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)", + ) + .action(async (opts) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsListCommand( + { + json: Boolean(opts.json), + status: opts.status as string | undefined, + }, + defaultRuntime, + ); + }); + }); + + tasksFlowCmd + .command("show") + .description("Show one TaskFlow by flow id or owner key") + .argument("", "Flow id or owner key") + .option("--json", "Output as JSON", false) + .action(async (lookup, opts) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsShowCommand( + { + lookup, + json: Boolean(opts.json), + }, + defaultRuntime, + ); + }); + }); + + tasksFlowCmd + .command("cancel") + .description("Cancel a running TaskFlow") + .argument("", "Flow id or owner key") + .action(async (lookup) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsCancelCommand( + { + lookup, + }, + defaultRuntime, + ); + }); + }); + const flowsCmd = program .command("flows") - .description("Inspect durable background flow state") + .description("Inspect durable TaskFlow state (alias for `openclaw tasks flow`)") .option("--json", "Output as JSON", false) .option( "--status ", diff --git a/src/commands/doctor-workspace-status.test.ts b/src/commands/doctor-workspace-status.test.ts index c933572fc6c..6886d8ee8cb 100644 --- a/src/commands/doctor-workspace-status.test.ts +++ b/src/commands/doctor-workspace-status.test.ts @@ -200,7 +200,7 @@ describe("noteWorkspaceStatus", () => { const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "TaskFlow recovery"); expect(recoveryCalls).toHaveLength(1); expect(String(recoveryCalls[0]?.[0])).toContain("flow-123"); - expect(String(recoveryCalls[0]?.[0])).toContain("openclaw flows show "); + expect(String(recoveryCalls[0]?.[0])).toContain("openclaw tasks flow show "); } finally { noteSpy.mockRestore(); } diff --git a/src/commands/doctor-workspace-status.ts b/src/commands/doctor-workspace-status.ts index 250a7742a40..44f267a0b82 100644 --- a/src/commands/doctor-workspace-status.ts +++ b/src/commands/doctor-workspace-status.ts @@ -40,8 +40,8 @@ function noteFlowRecoveryHints() { [ ...suspicious.slice(0, 5), suspicious.length > 5 ? `...and ${suspicious.length - 5} more.` : null, - `Inspect: ${formatCliCommand("openclaw flows show ")}`, - `Cancel: ${formatCliCommand("openclaw flows cancel ")}`, + `Inspect: ${formatCliCommand("openclaw tasks flow show ")}`, + `Cancel: ${formatCliCommand("openclaw tasks flow cancel ")}`, ] .filter((line): line is string => Boolean(line)) .join("\n"), diff --git a/src/plugins/runtime/index.test.ts b/src/plugins/runtime/index.test.ts index 470e7086e89..cd5494037e5 100644 --- a/src/plugins/runtime/index.test.ts +++ b/src/plugins/runtime/index.test.ts @@ -190,12 +190,13 @@ describe("plugin runtime command execution", () => { }, }, { - name: "exposes runtime.taskFlow binding helpers", + name: "exposes runtime.tasks.flow as the canonical TaskFlow runtime and keeps runtime.taskFlow as an alias", assert: (runtime: ReturnType) => { - expectFunctionKeys(runtime.taskFlow as Record, [ + expectFunctionKeys(runtime.tasks.flow as Record, [ "bindSession", "fromToolContext", ]); + expect(runtime.taskFlow).toBe(runtime.tasks.flow); }, }, { diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index 850a2adf1f1..52585ece13a 100644 --- a/src/plugins/runtime/index.ts +++ b/src/plugins/runtime/index.ts @@ -184,6 +184,7 @@ export type CreatePluginRuntimeOptions = { export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): PluginRuntime { const mediaUnderstanding = createRuntimeMediaUnderstandingFacade(); + const taskFlow = createRuntimeTaskFlow(); const runtime = { // Sourced from the shared OpenClaw version resolver (#52899) so plugins // always see the same version the CLI reports, avoiding API-version drift. @@ -204,7 +205,10 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): events: createRuntimeEvents(), logging: createRuntimeLogging(), state: { resolveStateDir }, - taskFlow: createRuntimeTaskFlow(), + tasks: { + flow: taskFlow, + }, + taskFlow, } satisfies Omit< PluginRuntime, "tts" | "mediaUnderstanding" | "stt" | "modelAuth" | "imageGeneration" diff --git a/src/plugins/runtime/types-core.ts b/src/plugins/runtime/types-core.ts index 388d9cdb837..25a654693a9 100644 --- a/src/plugins/runtime/types-core.ts +++ b/src/plugins/runtime/types-core.ts @@ -103,6 +103,9 @@ export type PluginRuntimeCore = { state: { resolveStateDir: typeof import("../../config/paths.js").resolveStateDir; }; + tasks: { + flow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow; + }; taskFlow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow; modelAuth: { /** Resolve auth for a model. Only provider/model and optional cfg are used. */ diff --git a/test/helpers/plugins/plugin-runtime-mock.ts b/test/helpers/plugins/plugin-runtime-mock.ts index 6c57dd2534d..c0e2c0ce30e 100644 --- a/test/helpers/plugins/plugin-runtime-mock.ts +++ b/test/helpers/plugins/plugin-runtime-mock.ts @@ -34,6 +34,40 @@ function mergeDeep(base: T, overrides: DeepPartial): T { } export function createPluginRuntimeMock(overrides: DeepPartial = {}): PluginRuntime { + const taskFlow = { + bindSession: vi.fn(() => ({ + sessionKey: "agent:main:main", + createManaged: vi.fn(), + get: vi.fn(), + list: vi.fn(() => []), + findLatest: vi.fn(), + resolve: vi.fn(), + getTaskSummary: vi.fn(), + setWaiting: vi.fn(), + resume: vi.fn(), + finish: vi.fn(), + fail: vi.fn(), + requestCancel: vi.fn(), + cancel: vi.fn(), + runTask: vi.fn(), + })) as unknown as PluginRuntime["taskFlow"]["bindSession"], + fromToolContext: vi.fn(() => ({ + sessionKey: "agent:main:main", + createManaged: vi.fn(), + get: vi.fn(), + list: vi.fn(() => []), + findLatest: vi.fn(), + resolve: vi.fn(), + getTaskSummary: vi.fn(), + setWaiting: vi.fn(), + resume: vi.fn(), + finish: vi.fn(), + fail: vi.fn(), + requestCancel: vi.fn(), + cancel: vi.fn(), + runTask: vi.fn(), + })) as unknown as PluginRuntime["taskFlow"]["fromToolContext"], + }; const base: PluginRuntime = { version: "1.0.0-test", config: { @@ -324,40 +358,10 @@ export function createPluginRuntimeMock(overrides: DeepPartial = state: { resolveStateDir: vi.fn(() => "/tmp/openclaw"), }, - taskFlow: { - bindSession: vi.fn(() => ({ - sessionKey: "agent:main:main", - createManaged: vi.fn(), - get: vi.fn(), - list: vi.fn(() => []), - findLatest: vi.fn(), - resolve: vi.fn(), - getTaskSummary: vi.fn(), - setWaiting: vi.fn(), - resume: vi.fn(), - finish: vi.fn(), - fail: vi.fn(), - requestCancel: vi.fn(), - cancel: vi.fn(), - runTask: vi.fn(), - })) as unknown as PluginRuntime["taskFlow"]["bindSession"], - fromToolContext: vi.fn(() => ({ - sessionKey: "agent:main:main", - createManaged: vi.fn(), - get: vi.fn(), - list: vi.fn(() => []), - findLatest: vi.fn(), - resolve: vi.fn(), - getTaskSummary: vi.fn(), - setWaiting: vi.fn(), - resume: vi.fn(), - finish: vi.fn(), - fail: vi.fn(), - requestCancel: vi.fn(), - cancel: vi.fn(), - runTask: vi.fn(), - })) as unknown as PluginRuntime["taskFlow"]["fromToolContext"], + tasks: { + flow: taskFlow, }, + taskFlow, modelAuth: { getApiKeyForModel: vi.fn() as unknown as PluginRuntime["modelAuth"]["getApiKeyForModel"], resolveApiKeyForProvider: