diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index aa41126734e..02263c02d7d 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -1,11 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; -import { - createManagedFlow, - getFlowById, - listFlowRecords, - resetFlowRegistryForTests, -} from "./flow-registry.js"; import { cancelFlowById, cancelFlowByIdForOwner, @@ -21,6 +15,12 @@ import { setDetachedTaskDeliveryStatusByRunId, startTaskRunByRunId, } from "./task-executor.js"; +import { + createManagedTaskFlow, + getTaskFlowById, + listTaskFlowRecords, + resetTaskFlowRegistryForTests, +} from "./task-flow-registry.js"; import { getTaskById, findLatestTaskForFlowId, @@ -60,13 +60,13 @@ async function withTaskExecutorStateDir(run: (root: string) => Promise): P process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryDeliveryRuntimeForTests(); resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskFlowRegistryForTests(); try { await run(root); } finally { resetTaskRegistryDeliveryRuntimeForTests(); resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskFlowRegistryForTests(); } }); } @@ -80,7 +80,7 @@ describe("task-executor", () => { } resetTaskRegistryDeliveryRuntimeForTests(); resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskFlowRegistryForTests(); hoisted.sendMessageMock.mockReset(); hoisted.cancelSessionMock.mockReset(); hoisted.killSubagentRunAdminMock.mockReset(); @@ -178,7 +178,7 @@ describe("task-executor", () => { }); expect(created.parentFlowId).toEqual(expect.any(String)); - expect(getFlowById(created.parentFlowId!)).toMatchObject({ + expect(getTaskFlowById(created.parentFlowId!)).toMatchObject({ flowId: created.parentFlowId, ownerKey: "agent:main:main", status: "running", @@ -193,7 +193,7 @@ describe("task-executor", () => { terminalSummary: "Done.", }); - expect(getFlowById(created.parentFlowId!)).toMatchObject({ + expect(getTaskFlowById(created.parentFlowId!)).toMatchObject({ flowId: created.parentFlowId, status: "succeeded", endedAt: 40, @@ -217,7 +217,7 @@ describe("task-executor", () => { }); expect(created.parentFlowId).toBeUndefined(); - expect(listFlowRecords()).toEqual([]); + expect(listTaskFlowRecords()).toEqual([]); }); }); @@ -252,7 +252,7 @@ describe("task-executor", () => { terminalOutcome: "blocked", terminalSummary: "Writable session required.", }); - expect(getFlowById(created.parentFlowId!)).toMatchObject({ + expect(getTaskFlowById(created.parentFlowId!)).toMatchObject({ flowId: created.parentFlowId, status: "blocked", blockedTaskId: created.taskId, @@ -279,7 +279,7 @@ describe("task-executor", () => { runId: "run-executor-retry", }), }); - expect(getFlowById(created.parentFlowId!)).toMatchObject({ + expect(getTaskFlowById(created.parentFlowId!)).toMatchObject({ flowId: created.parentFlowId, status: "queued", }); @@ -299,7 +299,7 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { hoisted.cancelSessionMock.mockResolvedValue(undefined); - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/managed-flow", goal: "Inspect PR batch", @@ -329,7 +329,7 @@ describe("task-executor", () => { taskId: child.taskId, status: "cancelled", }); - expect(getFlowById(flow.flowId)).toMatchObject({ + expect(getTaskFlowById(flow.flowId)).toMatchObject({ flowId: flow.flowId, status: "cancelled", }); @@ -338,7 +338,7 @@ describe("task-executor", () => { it("runs child tasks under managed TaskFlows", async () => { await withTaskExecutorStateDir(async () => { - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/managed-flow", goal: "Inspect PR batch", @@ -380,7 +380,7 @@ describe("task-executor", () => { it("refuses to add child tasks once cancellation is requested on a managed TaskFlow", async () => { await withTaskExecutorStateDir(async () => { - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/managed-flow", goal: "Protected flow", @@ -416,7 +416,7 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { hoisted.cancelSessionMock.mockRejectedValue(new Error("still shutting down")); - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/managed-flow", goal: "Long running batch", @@ -460,7 +460,7 @@ describe("task-executor", () => { taskId: child.taskId, status: "cancelled", }); - expect(getFlowById(flow.flowId)).toMatchObject({ + expect(getTaskFlowById(flow.flowId)).toMatchObject({ flowId: flow.flowId, cancelRequestedAt: expect.any(Number), status: "cancelled", @@ -471,7 +471,7 @@ describe("task-executor", () => { it("denies cross-owner flow cancellation through the owner-scoped wrapper", async () => { await withTaskExecutorStateDir(async () => { - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/managed-flow", goal: "Protected flow", @@ -488,7 +488,7 @@ describe("task-executor", () => { cancelled: false, reason: "Flow not found.", }); - expect(getFlowById(flow.flowId)).toMatchObject({ + expect(getTaskFlowById(flow.flowId)).toMatchObject({ flowId: flow.flowId, status: "queued", }); @@ -497,7 +497,7 @@ describe("task-executor", () => { it("denies cross-owner managed TaskFlow child spawning through the owner-scoped wrapper", async () => { await withTaskExecutorStateDir(async () => { - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/managed-flow", goal: "Protected flow", diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index f959a6495f0..bd1a534fab1 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -1,14 +1,5 @@ import type { OpenClawConfig } from "../config/config.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { getFlowByIdForOwner } from "./flow-owner-access.js"; -import type { FlowRecord } from "./flow-registry.types.js"; -import { - createFlowForTask, - deleteFlowRecordById, - getFlowById, - requestFlowCancel, - updateFlowRecordByIdExpectedRevision, -} from "./flow-runtime-internal.js"; import { cancelTaskById, createTaskRecord, @@ -22,6 +13,15 @@ import { recordTaskProgressByRunId, setTaskRunDeliveryStatusByRunId, } from "./runtime-internal.js"; +import { getTaskFlowByIdForOwner } from "./task-flow-owner-access.js"; +import type { TaskFlowRecord } from "./task-flow-registry.types.js"; +import { + createTaskFlowForTask, + deleteTaskFlowRecordById, + getTaskFlowById, + requestFlowCancel, + updateFlowRecordByIdExpectedRevision, +} from "./task-flow-runtime-internal.js"; import { summarizeTaskRecords } from "./task-registry.summary.js"; import type { TaskDeliveryState, @@ -55,7 +55,7 @@ function ensureSingleTaskFlow(params: { return params.task; } try { - const flow = createFlowForTask({ + const flow = createTaskFlowForTask({ task: params.task, requesterOrigin: params.requesterOrigin, }); @@ -64,11 +64,11 @@ function ensureSingleTaskFlow(params: { flowId: flow.flowId, }); if (!linked) { - deleteFlowRecordById(flow.flowId); + deleteTaskFlowRecordById(flow.flowId); return params.task; } if (linked.parentFlowId !== flow.flowId) { - deleteFlowRecordById(flow.flowId); + deleteTaskFlowRecordById(flow.flowId); return linked; } return linked; @@ -266,7 +266,7 @@ function resolveRetryableBlockedFlowTask(flowId: string): { latestTask?: TaskRecord; reason?: string; } { - const flow = getFlowById(flowId); + const flow = getTaskFlowById(flowId); if (!flow) { return { flowFound: false, @@ -314,7 +314,7 @@ function retryBlockedFlowTask(params: RetryBlockedFlowParams): RetryBlockedFlowR reason: resolved.reason, }; } - const flow = getFlowById(params.flowId); + const flow = getTaskFlowById(params.flowId); if (!flow) { return { found: false, @@ -374,7 +374,7 @@ type CancelFlowResult = { found: boolean; cancelled: boolean; reason?: string; - flow?: FlowRecord; + flow?: TaskFlowRecord; tasks?: TaskRecord[]; }; @@ -382,7 +382,7 @@ type RunTaskInFlowResult = { found: boolean; created: boolean; reason?: string; - flow?: FlowRecord; + flow?: TaskFlowRecord; task?: TaskRecord; }; @@ -390,13 +390,13 @@ function isActiveTaskStatus(status: TaskStatus): boolean { return status === "queued" || status === "running"; } -function isTerminalFlowStatus(status: FlowRecord["status"]): boolean { +function isTerminalFlowStatus(status: TaskFlowRecord["status"]): boolean { return ( status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost" ); } -function markFlowCancelRequested(flow: FlowRecord): FlowRecord | FlowUpdateFailure { +function markFlowCancelRequested(flow: TaskFlowRecord): TaskFlowRecord | FlowUpdateFailure { if (flow.cancelRequestedAt != null) { return flow; } @@ -412,19 +412,19 @@ function markFlowCancelRequested(flow: FlowRecord): FlowRecord | FlowUpdateFailu result.reason === "revision_conflict" ? "Flow changed while cancellation was in progress." : "Flow not found.", - flow: result.current ?? getFlowById(flow.flowId), + flow: result.current ?? getTaskFlowById(flow.flowId), }; } type FlowUpdateFailure = { reason: string; - flow?: FlowRecord; + flow?: TaskFlowRecord; }; function cancelManagedFlowAfterChildrenSettle( - flow: FlowRecord, + flow: TaskFlowRecord, endedAt: number, -): FlowRecord | FlowUpdateFailure { +): TaskFlowRecord | FlowUpdateFailure { const result = updateFlowRecordByIdExpectedRevision({ flowId: flow.flowId, expectedRevision: flow.revision, @@ -445,7 +445,7 @@ function cancelManagedFlowAfterChildrenSettle( result.reason === "revision_conflict" ? "Flow changed while cancellation was in progress." : "Flow not found.", - flow: result.current ?? getFlowById(flow.flowId), + flow: result.current ?? getTaskFlowById(flow.flowId), }; } @@ -453,7 +453,7 @@ function mapRunTaskInFlowCreateError(params: { error: unknown; flowId: string; }): RunTaskInFlowResult { - const flow = getFlowById(params.flowId); + const flow = getTaskFlowById(params.flowId); if (isParentFlowLinkError(params.error)) { if (params.error.code === "cancel_requested") { return { @@ -501,7 +501,7 @@ export function runTaskInFlow(params: { lastEventAt?: number; progressSummary?: string | null; }): RunTaskInFlowResult { - const flow = getFlowById(params.flowId); + const flow = getTaskFlowById(params.flowId); if (!flow) { return { found: false, @@ -572,7 +572,7 @@ export function runTaskInFlow(params: { return { found: true, created: true, - flow: getFlowById(flow.flowId) ?? flow, + flow: getTaskFlowById(flow.flowId) ?? flow, task, }; } @@ -596,7 +596,7 @@ export function runTaskInFlowForOwner(params: { lastEventAt?: number; progressSummary?: string | null; }): RunTaskInFlowResult { - const flow = getFlowByIdForOwner({ + const flow = getTaskFlowByIdForOwner({ flowId: params.flowId, callerOwnerKey: params.callerOwnerKey, }); @@ -631,7 +631,7 @@ export async function cancelFlowById(params: { cfg: OpenClawConfig; flowId: string; }): Promise { - const flow = getFlowById(params.flowId); + const flow = getTaskFlowById(params.flowId); if (!flow) { return { found: false, @@ -673,12 +673,12 @@ export async function cancelFlowById(params: { found: true, cancelled: false, reason: "One or more child tasks are still active.", - flow: getFlowById(flow.flowId) ?? cancelRequestedFlow, + flow: getTaskFlowById(flow.flowId) ?? cancelRequestedFlow, tasks: refreshedTasks, }; } const now = Date.now(); - const refreshedFlow = getFlowById(flow.flowId) ?? cancelRequestedFlow; + const refreshedFlow = getTaskFlowById(flow.flowId) ?? cancelRequestedFlow; if (isTerminalFlowStatus(refreshedFlow.status)) { return { found: true, @@ -714,7 +714,7 @@ export async function cancelFlowByIdForOwner(params: { flowId: string; callerOwnerKey: string; }): Promise { - const flow = getFlowByIdForOwner({ + const flow = getTaskFlowByIdForOwner({ flowId: params.flowId, callerOwnerKey: params.callerOwnerKey, }); diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index f6a415186df..e7950d71e97 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -3,7 +3,7 @@ import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; -import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js"; +import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js"; import { createTaskRecord, deleteTaskRecordById, @@ -38,7 +38,7 @@ describe("task-registry store runtime", () => { afterEach(() => { delete process.env.OPENCLAW_STATE_DIR; resetTaskRegistryForTests(); - resetFlowRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); }); it("uses the configured task store for restore and save", () => { @@ -197,7 +197,7 @@ describe("task-registry store runtime", () => { }); it("persists parentFlowId with task rows", () => { - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/task-store-parent-flow", goal: "Persist linked tasks", diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 810f2a0c676..da8ffe4b62c 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -11,7 +11,7 @@ import { } from "../infra/heartbeat-wake.js"; import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; -import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js"; +import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js"; import { createTaskRecord, findLatestTaskForOwnerKey, @@ -193,7 +193,7 @@ describe("task-registry", () => { resetHeartbeatWakeStateForTests(); resetAgentRunContextForTest(); resetTaskRegistryForTests({ persist: false }); - resetFlowRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); hoisted.sendMessageMock.mockReset(); hoisted.cancelSessionMock.mockReset(); hoisted.killSubagentRunAdminMock.mockReset(); @@ -301,9 +301,9 @@ describe("task-registry", () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskFlowRegistryForTests(); - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/task-registry", goal: "Owner main flow", @@ -326,9 +326,9 @@ describe("task-registry", () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskFlowRegistryForTests(); - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/task-registry", goal: "Owner main flow", @@ -352,7 +352,7 @@ describe("task-registry", () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskFlowRegistryForTests(); const task = createTaskRecord({ runtime: "acp", @@ -361,7 +361,7 @@ describe("task-registry", () => { runId: "owner-main-task", task: "Safe task", }); - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:other", controllerId: "tests/task-registry", goal: "Other owner flow", @@ -384,9 +384,9 @@ describe("task-registry", () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskFlowRegistryForTests(); - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/task-registry", goal: "Cancelling flow", @@ -417,9 +417,9 @@ describe("task-registry", () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskFlowRegistryForTests(); - const flow = createManagedFlow({ + const flow = createManagedTaskFlow({ ownerKey: "agent:main:main", controllerId: "tests/task-registry", goal: "Completed flow", diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 560ad714674..45877ad44ec 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -9,12 +9,6 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; -import type { FlowRecord } from "./flow-registry.types.js"; -import { - getFlowById, - syncFlowFromTask, - updateFlowRecordByIdExpectedRevision, -} from "./flow-runtime-internal.js"; import { formatTaskBlockedFollowupMessage, formatTaskStateChangeMessage, @@ -24,6 +18,12 @@ import { shouldAutoDeliverTaskTerminalUpdate, shouldSuppressDuplicateTerminalDelivery, } from "./task-executor-policy.js"; +import type { TaskFlowRecord } from "./task-flow-registry.types.js"; +import { + getTaskFlowById, + syncFlowFromTask, + updateFlowRecordByIdExpectedRevision, +} from "./task-flow-runtime-internal.js"; import { getTaskRegistryHooks, getTaskRegistryStore, @@ -81,7 +81,7 @@ export class ParentFlowLinkError extends Error { message: string, public readonly details?: { flowId?: string; - status?: FlowRecord["status"]; + status?: TaskFlowRecord["status"]; }, ) { super(message); @@ -97,7 +97,7 @@ function isActiveTaskStatus(status: TaskStatus): boolean { return status === "queued" || status === "running"; } -function isTerminalFlowStatus(status: FlowRecord["status"]): boolean { +function isTerminalFlowStatus(status: TaskFlowRecord["status"]): boolean { return ( status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost" ); @@ -131,7 +131,7 @@ function assertParentFlowLinkAllowed(params: { { flowId }, ); } - const flow = getFlowById(flowId); + const flow = getTaskFlowById(flowId); if (!flow) { throw new ParentFlowLinkError("parent_flow_not_found", `Parent flow not found: ${flowId}`, { flowId, @@ -727,7 +727,7 @@ function getLinkedFlowForDelivery(task: TaskRecord) { if (!flowId || task.scopeKind !== "session") { return undefined; } - const flow = getFlowById(flowId); + const flow = getTaskFlowById(flowId); if (!flow) { return undefined; } @@ -762,7 +762,7 @@ function syncManagedFlowCancellationFromTask(task: TaskRecord): void { if (!flowId) { return; } - let flow = getFlowById(flowId); + let flow = getTaskFlowById(flowId); if ( !flow || flow.syncMode !== "managed" ||