refactor(tasks): move task-flow ownership under tasks

This commit is contained in:
Vincent Koc
2026-04-02 21:36:56 +09:00
parent 0f45630d19
commit 6f91f87f3b
8 changed files with 139 additions and 44 deletions

View File

@@ -338,6 +338,27 @@ describe("registerStatusHealthSessionsCommands", () => {
);
});
it("routes tasks flow commands through the TaskFlow handlers", async () => {
await runCli(["tasks", "flow", "list", "--json", "--status", "blocked"]);
expect(flowsListCommand).toHaveBeenCalledWith(expect.any(Object), runtime);
await runCli(["tasks", "flow", "show", "flow-123", "--json"]);
expect(flowsShowCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "flow-123",
}),
runtime,
);
await runCli(["tasks", "flow", "cancel", "flow-123"]);
expect(flowsCancelCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "flow-123",
}),
runtime,
);
});
it("runs tasks notify subcommand with lookup and policy forwarding", async () => {
await runCli(["tasks", "notify", "run-123", "state_changes"]);

View File

@@ -225,7 +225,7 @@ export function registerStatusHealthSessionsCommands(program: Command) {
const tasksCmd = program
.command("tasks")
.description("Inspect durable background task state")
.description("Inspect durable background tasks and TaskFlow state")
.option("--json", "Output as JSON", false)
.option("--runtime <name>", "Filter by kind (subagent, acp, cron, cli)")
.option(
@@ -277,12 +277,12 @@ export function registerStatusHealthSessionsCommands(program: Command) {
tasksCmd
.command("audit")
.description("Show stale or broken background task runs")
.description("Show stale or broken background tasks and TaskFlows")
.option("--json", "Output as JSON", false)
.option("--severity <level>", "Filter by severity (warn, error)")
.option(
"--code <name>",
"Filter by finding code (stale_queued, stale_running, lost, delivery_failed, missing_cleanup, inconsistent_timestamps)",
"Filter by finding code (stale_queued, stale_running, lost, delivery_failed, missing_cleanup, inconsistent_timestamps, restore_failed, stale_waiting, stale_blocked, cancel_stuck, missing_linked_tasks, blocked_task_missing)",
)
.option("--limit <n>", "Limit displayed findings")
.action(async (opts, command) => {
@@ -299,6 +299,12 @@ export function registerStatusHealthSessionsCommands(program: Command) {
| "delivery_failed"
| "missing_cleanup"
| "inconsistent_timestamps"
| "restore_failed"
| "stale_waiting"
| "stale_blocked"
| "cancel_stuck"
| "missing_linked_tasks"
| "blocked_task_missing"
| undefined,
limit: parsePositiveIntOrUndefined(opts.limit),
},
@@ -309,7 +315,7 @@ export function registerStatusHealthSessionsCommands(program: Command) {
tasksCmd
.command("maintenance")
.description("Preview or apply task ledger maintenance")
.description("Preview or apply tasks and TaskFlow maintenance")
.option("--json", "Output as JSON", false)
.option("--apply", "Apply reconciliation, cleanup stamping, and pruning", false)
.action(async (opts, command) => {
@@ -375,9 +381,65 @@ export function registerStatusHealthSessionsCommands(program: Command) {
});
});
const tasksFlowCmd = tasksCmd
.command("flow")
.description("Inspect durable TaskFlow state under tasks");
tasksFlowCmd
.command("list")
.description("List tracked TaskFlows")
.option("--json", "Output as JSON", false)
.option(
"--status <name>",
"Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)",
)
.action(async (opts) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsListCommand(
{
json: Boolean(opts.json),
status: opts.status as string | undefined,
},
defaultRuntime,
);
});
});
tasksFlowCmd
.command("show")
.description("Show one TaskFlow by flow id or owner key")
.argument("<lookup>", "Flow id or owner key")
.option("--json", "Output as JSON", false)
.action(async (lookup, opts) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsShowCommand(
{
lookup,
json: Boolean(opts.json),
},
defaultRuntime,
);
});
});
tasksFlowCmd
.command("cancel")
.description("Cancel a running TaskFlow")
.argument("<lookup>", "Flow id or owner key")
.action(async (lookup) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsCancelCommand(
{
lookup,
},
defaultRuntime,
);
});
});
const flowsCmd = program
.command("flows")
.description("Inspect durable background flow state")
.description("Inspect durable TaskFlow state (alias for `openclaw tasks flow`)")
.option("--json", "Output as JSON", false)
.option(
"--status <name>",

View File

@@ -200,7 +200,7 @@ describe("noteWorkspaceStatus", () => {
const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "TaskFlow recovery");
expect(recoveryCalls).toHaveLength(1);
expect(String(recoveryCalls[0]?.[0])).toContain("flow-123");
expect(String(recoveryCalls[0]?.[0])).toContain("openclaw flows show <flow-id>");
expect(String(recoveryCalls[0]?.[0])).toContain("openclaw tasks flow show <flow-id>");
} finally {
noteSpy.mockRestore();
}

View File

@@ -40,8 +40,8 @@ function noteFlowRecoveryHints() {
[
...suspicious.slice(0, 5),
suspicious.length > 5 ? `...and ${suspicious.length - 5} more.` : null,
`Inspect: ${formatCliCommand("openclaw flows show <flow-id>")}`,
`Cancel: ${formatCliCommand("openclaw flows cancel <flow-id>")}`,
`Inspect: ${formatCliCommand("openclaw tasks flow show <flow-id>")}`,
`Cancel: ${formatCliCommand("openclaw tasks flow cancel <flow-id>")}`,
]
.filter((line): line is string => Boolean(line))
.join("\n"),

View File

@@ -190,12 +190,13 @@ describe("plugin runtime command execution", () => {
},
},
{
name: "exposes runtime.taskFlow binding helpers",
name: "exposes runtime.tasks.flow as the canonical TaskFlow runtime and keeps runtime.taskFlow as an alias",
assert: (runtime: ReturnType<typeof createPluginRuntime>) => {
expectFunctionKeys(runtime.taskFlow as Record<string, unknown>, [
expectFunctionKeys(runtime.tasks.flow as Record<string, unknown>, [
"bindSession",
"fromToolContext",
]);
expect(runtime.taskFlow).toBe(runtime.tasks.flow);
},
},
{

View File

@@ -184,6 +184,7 @@ export type CreatePluginRuntimeOptions = {
export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): PluginRuntime {
const mediaUnderstanding = createRuntimeMediaUnderstandingFacade();
const taskFlow = createRuntimeTaskFlow();
const runtime = {
// Sourced from the shared OpenClaw version resolver (#52899) so plugins
// always see the same version the CLI reports, avoiding API-version drift.
@@ -204,7 +205,10 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}):
events: createRuntimeEvents(),
logging: createRuntimeLogging(),
state: { resolveStateDir },
taskFlow: createRuntimeTaskFlow(),
tasks: {
flow: taskFlow,
},
taskFlow,
} satisfies Omit<
PluginRuntime,
"tts" | "mediaUnderstanding" | "stt" | "modelAuth" | "imageGeneration"

View File

@@ -103,6 +103,9 @@ export type PluginRuntimeCore = {
state: {
resolveStateDir: typeof import("../../config/paths.js").resolveStateDir;
};
tasks: {
flow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow;
};
taskFlow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow;
modelAuth: {
/** Resolve auth for a model. Only provider/model and optional cfg are used. */

View File

@@ -34,6 +34,40 @@ function mergeDeep<T>(base: T, overrides: DeepPartial<T>): T {
}
export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> = {}): PluginRuntime {
const taskFlow = {
bindSession: vi.fn(() => ({
sessionKey: "agent:main:main",
createManaged: vi.fn(),
get: vi.fn(),
list: vi.fn(() => []),
findLatest: vi.fn(),
resolve: vi.fn(),
getTaskSummary: vi.fn(),
setWaiting: vi.fn(),
resume: vi.fn(),
finish: vi.fn(),
fail: vi.fn(),
requestCancel: vi.fn(),
cancel: vi.fn(),
runTask: vi.fn(),
})) as unknown as PluginRuntime["taskFlow"]["bindSession"],
fromToolContext: vi.fn(() => ({
sessionKey: "agent:main:main",
createManaged: vi.fn(),
get: vi.fn(),
list: vi.fn(() => []),
findLatest: vi.fn(),
resolve: vi.fn(),
getTaskSummary: vi.fn(),
setWaiting: vi.fn(),
resume: vi.fn(),
finish: vi.fn(),
fail: vi.fn(),
requestCancel: vi.fn(),
cancel: vi.fn(),
runTask: vi.fn(),
})) as unknown as PluginRuntime["taskFlow"]["fromToolContext"],
};
const base: PluginRuntime = {
version: "1.0.0-test",
config: {
@@ -324,40 +358,10 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
state: {
resolveStateDir: vi.fn(() => "/tmp/openclaw"),
},
taskFlow: {
bindSession: vi.fn(() => ({
sessionKey: "agent:main:main",
createManaged: vi.fn(),
get: vi.fn(),
list: vi.fn(() => []),
findLatest: vi.fn(),
resolve: vi.fn(),
getTaskSummary: vi.fn(),
setWaiting: vi.fn(),
resume: vi.fn(),
finish: vi.fn(),
fail: vi.fn(),
requestCancel: vi.fn(),
cancel: vi.fn(),
runTask: vi.fn(),
})) as unknown as PluginRuntime["taskFlow"]["bindSession"],
fromToolContext: vi.fn(() => ({
sessionKey: "agent:main:main",
createManaged: vi.fn(),
get: vi.fn(),
list: vi.fn(() => []),
findLatest: vi.fn(),
resolve: vi.fn(),
getTaskSummary: vi.fn(),
setWaiting: vi.fn(),
resume: vi.fn(),
finish: vi.fn(),
fail: vi.fn(),
requestCancel: vi.fn(),
cancel: vi.fn(),
runTask: vi.fn(),
})) as unknown as PluginRuntime["taskFlow"]["fromToolContext"],
tasks: {
flow: taskFlow,
},
taskFlow,
modelAuth: {
getApiKeyForModel: vi.fn() as unknown as PluginRuntime["modelAuth"]["getApiKeyForModel"],
resolveApiKeyForProvider: