diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index 9fa2abfdedf..947a2017fbc 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -381,4 +381,21 @@ describe("registerStatusHealthSessionsCommands", () => { runtime, ); }); + + it("uses TaskFlow wording for the alias command help", () => { + const program = new Command(); + registerStatusHealthSessionsCommands(program); + + const flowsCommand = program.commands.find((command) => command.name() === "flows"); + expect(flowsCommand?.description()).toContain("TaskFlow"); + expect(flowsCommand?.commands.find((command) => command.name() === "list")?.description()).toBe( + "List tracked TaskFlows", + ); + expect(flowsCommand?.commands.find((command) => command.name() === "show")?.description()).toBe( + "Show one TaskFlow by flow id or owner key", + ); + expect( + flowsCommand?.commands.find((command) => command.name() === "cancel")?.description(), + ).toBe("Cancel a running TaskFlow"); + }); }); diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index fd93720c603..214c86945d7 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -460,7 +460,7 @@ export function registerStatusHealthSessionsCommands(program: Command) { flowsCmd .command("list") - .description("List tracked background flows") + .description("List tracked TaskFlows") .option("--json", "Output as JSON", false) .option( "--status ", @@ -481,7 +481,7 @@ export function registerStatusHealthSessionsCommands(program: Command) { flowsCmd .command("show") - .description("Show one background flow by flow id or owner key") + .description("Show one TaskFlow by flow id or owner key") .argument("", "Flow id or owner key") .option("--json", "Output as JSON", false) .action(async (lookup, opts, command) => { @@ -499,7 +499,7 @@ export function registerStatusHealthSessionsCommands(program: Command) { flowsCmd .command("cancel") - .description("Cancel a running background flow") + .description("Cancel a running TaskFlow") .argument("", "Flow id or owner key") .action(async (lookup) => { await runCommandWithRuntime(defaultRuntime, async () => { diff --git a/src/commands/flows.test.ts b/src/commands/flows.test.ts new file mode 100644 index 00000000000..219be77ab2b --- /dev/null +++ b/src/commands/flows.test.ts @@ -0,0 +1,183 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { RuntimeEnv } from "../runtime.js"; +import { createRunningTaskRun } from "../tasks/task-executor.js"; +import { + createManagedTaskFlow, + resetTaskFlowRegistryForTests, +} from "../tasks/task-flow-registry.js"; +import { + resetTaskRegistryDeliveryRuntimeForTests, + resetTaskRegistryForTests, +} from "../tasks/task-registry.js"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "./flows.js"; + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: vi.fn(() => ({})), + }; +}); + +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; + +function createRuntime(): RuntimeEnv { + return { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + } as unknown as RuntimeEnv; +} + +async function withTaskFlowCommandStateDir(run: (root: string) => Promise): Promise { + await withTempDir({ prefix: "openclaw-flows-command-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryDeliveryRuntimeForTests(); + resetTaskRegistryForTests(); + resetTaskFlowRegistryForTests(); + try { + await run(root); + } finally { + resetTaskRegistryDeliveryRuntimeForTests(); + resetTaskRegistryForTests(); + resetTaskFlowRegistryForTests(); + } + }); +} + +describe("flows commands", () => { + afterEach(() => { + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetTaskRegistryDeliveryRuntimeForTests(); + resetTaskRegistryForTests(); + resetTaskFlowRegistryForTests(); + }); + + it("lists TaskFlows as JSON with linked tasks and summaries", async () => { + await withTaskFlowCommandStateDir(async () => { + const flow = createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/flows-command", + goal: "Inspect a PR cluster", + status: "blocked", + blockedSummary: "Waiting on child task", + createdAt: 100, + updatedAt: 100, + }); + + createRunningTaskRun({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + parentFlowId: flow.flowId, + childSessionKey: "agent:main:child", + runId: "run-child-1", + label: "Inspect PR 123", + task: "Inspect PR 123", + startedAt: 100, + lastEventAt: 100, + }); + + const runtime = createRuntime(); + await flowsListCommand({ json: true, status: "blocked" }, runtime); + + const payload = JSON.parse(String(vi.mocked(runtime.log).mock.calls[0]?.[0])) as { + count: number; + status: string | null; + flows: Array<{ + flowId: string; + tasks: Array<{ runId?: string; label?: string }>; + taskSummary: { total: number; active: number }; + }>; + }; + + expect(payload).toMatchObject({ + count: 1, + status: "blocked", + flows: [ + { + flowId: flow.flowId, + taskSummary: { + total: 1, + active: 1, + }, + tasks: [ + { + runId: "run-child-1", + label: "Inspect PR 123", + }, + ], + }, + ], + }); + }); + }); + + it("shows one TaskFlow with linked task details in text mode", async () => { + await withTaskFlowCommandStateDir(async () => { + const flow = createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/flows-command", + goal: "Investigate a flaky queue", + status: "running", + currentStep: "spawn_child", + createdAt: 100, + updatedAt: 100, + }); + + createRunningTaskRun({ + runtime: "subagent", + ownerKey: "agent:main:main", + scopeKind: "session", + parentFlowId: flow.flowId, + childSessionKey: "agent:main:child", + runId: "run-child-2", + label: "Collect logs", + task: "Collect logs", + startedAt: 100, + lastEventAt: 100, + }); + + const runtime = createRuntime(); + await flowsShowCommand({ lookup: flow.flowId, json: false }, runtime); + + const output = vi + .mocked(runtime.log) + .mock.calls.map(([line]) => String(line)) + .join("\n"); + expect(output).toContain("TaskFlow:"); + expect(output).toContain(`flowId: ${flow.flowId}`); + expect(output).toContain("currentStep: spawn_child"); + expect(output).toContain("Linked tasks:"); + expect(output).toContain("run-child-2"); + expect(output).toContain("Collect logs"); + }); + }); + + it("cancels a managed TaskFlow with no active children", async () => { + await withTaskFlowCommandStateDir(async () => { + const flow = createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/flows-command", + goal: "Stop detached work", + status: "running", + createdAt: 100, + updatedAt: 100, + }); + + const runtime = createRuntime(); + await flowsCancelCommand({ lookup: flow.flowId }, runtime); + + expect(vi.mocked(runtime.error)).not.toHaveBeenCalled(); + expect(vi.mocked(runtime.exit)).not.toHaveBeenCalled(); + expect(String(vi.mocked(runtime.log).mock.calls[0]?.[0])).toContain("Cancelled"); + expect(String(vi.mocked(runtime.log).mock.calls[0]?.[0])).toContain(flow.flowId); + expect(String(vi.mocked(runtime.log).mock.calls[0]?.[0])).toContain("cancelled"); + }); + }); +}); diff --git a/src/commands/tasks.test.ts b/src/commands/tasks.test.ts new file mode 100644 index 00000000000..f741faf6463 --- /dev/null +++ b/src/commands/tasks.test.ts @@ -0,0 +1,176 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { RuntimeEnv } from "../runtime.js"; +import { createRunningTaskRun } from "../tasks/task-executor.js"; +import { + createManagedTaskFlow, + resetTaskFlowRegistryForTests, +} from "../tasks/task-flow-registry.js"; +import { + resetTaskRegistryDeliveryRuntimeForTests, + resetTaskRegistryForTests, +} from "../tasks/task-registry.js"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { tasksAuditCommand, tasksMaintenanceCommand } from "./tasks.js"; + +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; + +function createRuntime(): RuntimeEnv { + return { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + } as unknown as RuntimeEnv; +} + +async function withTaskCommandStateDir(run: () => Promise): Promise { + await withTempDir({ prefix: "openclaw-tasks-command-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryDeliveryRuntimeForTests(); + resetTaskRegistryForTests(); + resetTaskFlowRegistryForTests(); + try { + await run(); + } finally { + resetTaskRegistryDeliveryRuntimeForTests(); + resetTaskRegistryForTests(); + resetTaskFlowRegistryForTests(); + } + }); +} + +describe("tasks commands", () => { + beforeEach(() => { + vi.useRealTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetTaskRegistryDeliveryRuntimeForTests(); + resetTaskRegistryForTests(); + resetTaskFlowRegistryForTests(); + }); + + it("keeps tasks audit JSON stable while adding TaskFlow summary fields", async () => { + await withTaskCommandStateDir(async () => { + const now = Date.now(); + vi.useFakeTimers(); + vi.setSystemTime(now - 40 * 60_000); + createRunningTaskRun({ + runtime: "cli", + ownerKey: "agent:main:main", + scopeKind: "session", + runId: "task-stale-queued", + task: "Inspect issue backlog", + }); + vi.setSystemTime(now); + createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/tasks-command", + goal: "Inspect issue backlog", + status: "waiting", + createdAt: now - 40 * 60_000, + updatedAt: now - 40 * 60_000, + }); + + const runtime = createRuntime(); + await tasksAuditCommand({ json: true }, runtime); + + const payload = JSON.parse(String(vi.mocked(runtime.log).mock.calls[0]?.[0])) as { + summary: { + total: number; + errors: number; + warnings: number; + byCode: Record; + taskFlows: { total: number; byCode: Record }; + combined: { total: number; errors: number; warnings: number }; + }; + }; + + expect(payload.summary.byCode.stale_running).toBe(1); + expect(payload.summary.taskFlows.byCode.stale_waiting).toBe(1); + expect(payload.summary.taskFlows.byCode.missing_linked_tasks).toBe(1); + expect(payload.summary.combined.total).toBe(3); + }); + }); + + it("sorts combined audit findings before applying the limit", async () => { + await withTaskCommandStateDir(async () => { + const now = Date.now(); + vi.useFakeTimers(); + vi.setSystemTime(now - 40 * 60_000); + createRunningTaskRun({ + runtime: "cli", + ownerKey: "agent:main:main", + scopeKind: "session", + runId: "task-stale-queued", + task: "Queue audit", + }); + vi.setSystemTime(now); + const runningFlow = createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/tasks-command", + goal: "Running flow", + status: "running", + createdAt: now - 45 * 60_000, + updatedAt: now - 45 * 60_000, + }); + + const runtime = createRuntime(); + await tasksAuditCommand({ json: true, limit: 1 }, runtime); + + const payload = JSON.parse(String(vi.mocked(runtime.log).mock.calls[0]?.[0])) as { + findings: Array<{ kind: string; code: string; token?: string }>; + }; + + expect(payload.findings).toHaveLength(1); + expect(payload.findings[0]).toMatchObject({ + kind: "task_flow", + code: "stale_running", + token: runningFlow.flowId, + }); + }); + }); + + it("keeps tasks maintenance JSON additive for TaskFlow state", async () => { + await withTaskCommandStateDir(async () => { + const now = Date.now(); + createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/tasks-command", + goal: "Old terminal flow", + status: "succeeded", + createdAt: now - 8 * 24 * 60 * 60_000, + updatedAt: now - 8 * 24 * 60 * 60_000, + endedAt: now - 8 * 24 * 60 * 60_000, + }); + + const runtime = createRuntime(); + await tasksMaintenanceCommand({ json: true, apply: false }, runtime); + + const payload = JSON.parse(String(vi.mocked(runtime.log).mock.calls[0]?.[0])) as { + mode: string; + maintenance: { taskFlows: { pruned: number } }; + auditBefore: { + byCode: Record; + taskFlows: { byCode: Record }; + }; + auditAfter: { + byCode: Record; + taskFlows: { byCode: Record }; + }; + }; + + expect(payload.mode).toBe("preview"); + expect(payload.maintenance.taskFlows.pruned).toBe(1); + expect(payload.auditBefore.byCode).toBeDefined(); + expect(payload.auditBefore.taskFlows.byCode.stale_running).toBe(0); + expect(payload.auditAfter.byCode).toBeDefined(); + expect(payload.auditAfter.taskFlows.byCode.stale_running).toBe(0); + }); + }); +}); diff --git a/src/tasks/task-flow-registry.audit.test.ts b/src/tasks/task-flow-registry.audit.test.ts index 8c0224136cc..d9d88ef6ad6 100644 --- a/src/tasks/task-flow-registry.audit.test.ts +++ b/src/tasks/task-flow-registry.audit.test.ts @@ -62,6 +62,35 @@ describe("task-flow-registry audit", () => { ]); }); + it("clears restore-failed findings after a clean reset and restore", () => { + configureTaskFlowRegistryRuntime({ + store: { + loadSnapshot: () => { + throw new Error("boom"); + }, + saveSnapshot: () => {}, + }, + }); + + expect(listTaskFlowAuditFindings()).toEqual([ + expect.objectContaining({ + code: "restore_failed", + }), + ]); + + resetTaskFlowRegistryForTests({ persist: false }); + configureTaskFlowRegistryRuntime({ + store: { + loadSnapshot: () => ({ + flows: new Map(), + }), + saveSnapshot: () => {}, + }, + }); + + expect(listTaskFlowAuditFindings()).toEqual([]); + }); + it("detects stuck managed flows and missing blocked tasks", async () => { await withTaskFlowAuditStateDir(async () => { const running = createManagedTaskFlow({ @@ -138,4 +167,56 @@ describe("task-flow-registry audit", () => { ); }); }); + + it("does not flag missing linked tasks before the flow is stale", async () => { + await withTaskFlowAuditStateDir(async () => { + const now = Date.now(); + const flow = createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/task-flow-audit", + goal: "Fresh managed flow", + status: "running", + createdAt: now - 5 * 60_000, + updatedAt: now - 5 * 60_000, + }); + + expect( + listTaskFlowAuditFindings({ now }).find( + (finding) => finding.code === "missing_linked_tasks", + ), + ).toBeUndefined(); + + expect(listTaskFlowAuditFindings({ now: now + 26 * 60_000 })).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + code: "missing_linked_tasks", + flow: expect.objectContaining({ flowId: flow.flowId }), + }), + ]), + ); + }); + }); + + it("reports cancel-stuck before maintenance finalizes the flow", async () => { + await withTaskFlowAuditStateDir(async () => { + const flow = createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/task-flow-audit", + goal: "Cancel work", + status: "running", + cancelRequestedAt: 100, + createdAt: 1, + updatedAt: 100, + }); + + expect(listTaskFlowAuditFindings({ now: 6 * 60_000 })).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + code: "cancel_stuck", + flow: expect.objectContaining({ flowId: flow.flowId }), + }), + ]), + ); + }); + }); }); diff --git a/src/tasks/task-flow-registry.maintenance.test.ts b/src/tasks/task-flow-registry.maintenance.test.ts index 21ac3877674..d45fb637eac 100644 --- a/src/tasks/task-flow-registry.maintenance.test.ts +++ b/src/tasks/task-flow-registry.maintenance.test.ts @@ -1,8 +1,11 @@ import { afterEach, describe, expect, it } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; +import { createRunningTaskRun } from "./task-executor.js"; import { createManagedTaskFlow, getTaskFlowById, + listTaskFlowRecords, + requestFlowCancel, resetTaskFlowRegistryForTests, } from "./task-flow-registry.js"; import { @@ -100,4 +103,110 @@ describe("task-flow-registry maintenance", () => { expect(getTaskFlowById(oldFlow.flowId)).toBeUndefined(); }); }); + + it("does not finalize cancel-requested flows while a child task is still active", async () => { + await withTaskFlowMaintenanceStateDir(async () => { + const flow = createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/task-flow-maintenance", + goal: "Wait for child cancel", + status: "running", + createdAt: 1, + updatedAt: 100, + }); + + const child = createRunningTaskRun({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + parentFlowId: flow.flowId, + childSessionKey: "agent:main:child", + runId: "run-active-child", + task: "Inspect repo", + startedAt: 100, + lastEventAt: 100, + }); + + expect( + requestFlowCancel({ + flowId: flow.flowId, + expectedRevision: flow.revision, + cancelRequestedAt: 100, + updatedAt: 100, + }), + ).toMatchObject({ + applied: true, + flow: expect.objectContaining({ + flowId: flow.flowId, + cancelRequestedAt: 100, + }), + }); + + expect(previewTaskFlowRegistryMaintenance()).toEqual({ + reconciled: 0, + pruned: 0, + }); + + expect(await runTaskFlowRegistryMaintenance()).toEqual({ + reconciled: 0, + pruned: 0, + }); + expect(getTaskFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + status: "running", + cancelRequestedAt: 100, + }); + expect(child.parentFlowId).toBe(flow.flowId); + }); + }); + + it("prunes many old terminal flows while keeping fresh and active ones", async () => { + await withTaskFlowMaintenanceStateDir(async () => { + const now = Date.now(); + + for (let index = 0; index < 25; index += 1) { + createManagedTaskFlow({ + ownerKey: `agent:main:${index}`, + controllerId: "tests/task-flow-maintenance", + goal: `Old terminal flow ${index}`, + status: "succeeded", + createdAt: now - 8 * 24 * 60 * 60_000 - index, + updatedAt: now - 8 * 24 * 60 * 60_000 - index, + endedAt: now - 8 * 24 * 60 * 60_000 - index, + }); + } + + const fresh = createManagedTaskFlow({ + ownerKey: "agent:main:fresh", + controllerId: "tests/task-flow-maintenance", + goal: "Fresh terminal flow", + status: "succeeded", + createdAt: now - 2 * 24 * 60 * 60_000, + updatedAt: now - 2 * 24 * 60 * 60_000, + endedAt: now - 2 * 24 * 60 * 60_000, + }); + + const running = createManagedTaskFlow({ + ownerKey: "agent:main:running", + controllerId: "tests/task-flow-maintenance", + goal: "Active flow", + status: "running", + createdAt: now - 60_000, + updatedAt: now - 60_000, + }); + + expect(previewTaskFlowRegistryMaintenance()).toEqual({ + reconciled: 0, + pruned: 25, + }); + + expect(await runTaskFlowRegistryMaintenance()).toEqual({ + reconciled: 0, + pruned: 25, + }); + + const remainingFlowIds = new Set(listTaskFlowRecords().map((flow) => flow.flowId)); + expect(remainingFlowIds).toEqual(new Set([fresh.flowId, running.flowId])); + }); + }); });