refactor(tasks): migrate task runtime callsites to task-flow

This commit is contained in:
Vincent Koc
2026-04-02 20:36:30 +09:00
parent a51c976d27
commit a7909d46d2
5 changed files with 80 additions and 80 deletions

View File

@@ -1,11 +1,5 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createManagedFlow,
getFlowById,
listFlowRecords,
resetFlowRegistryForTests,
} from "./flow-registry.js";
import {
cancelFlowById,
cancelFlowByIdForOwner,
@@ -21,6 +15,12 @@ import {
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
} from "./task-executor.js";
import {
createManagedTaskFlow,
getTaskFlowById,
listTaskFlowRecords,
resetTaskFlowRegistryForTests,
} from "./task-flow-registry.js";
import {
getTaskById,
findLatestTaskForFlowId,
@@ -60,13 +60,13 @@ async function withTaskExecutorStateDir(run: (root: string) => Promise<void>): P
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetTaskFlowRegistryForTests();
try {
await run(root);
} finally {
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetTaskFlowRegistryForTests();
}
});
}
@@ -80,7 +80,7 @@ describe("task-executor", () => {
}
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetTaskFlowRegistryForTests();
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
@@ -178,7 +178,7 @@ describe("task-executor", () => {
});
expect(created.parentFlowId).toEqual(expect.any(String));
expect(getFlowById(created.parentFlowId!)).toMatchObject({
expect(getTaskFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
ownerKey: "agent:main:main",
status: "running",
@@ -193,7 +193,7 @@ describe("task-executor", () => {
terminalSummary: "Done.",
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
expect(getTaskFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "succeeded",
endedAt: 40,
@@ -217,7 +217,7 @@ describe("task-executor", () => {
});
expect(created.parentFlowId).toBeUndefined();
expect(listFlowRecords()).toEqual([]);
expect(listTaskFlowRecords()).toEqual([]);
});
});
@@ -252,7 +252,7 @@ describe("task-executor", () => {
terminalOutcome: "blocked",
terminalSummary: "Writable session required.",
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
expect(getTaskFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "blocked",
blockedTaskId: created.taskId,
@@ -279,7 +279,7 @@ describe("task-executor", () => {
runId: "run-executor-retry",
}),
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
expect(getTaskFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "queued",
});
@@ -299,7 +299,7 @@ describe("task-executor", () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Inspect PR batch",
@@ -329,7 +329,7 @@ describe("task-executor", () => {
taskId: child.taskId,
status: "cancelled",
});
expect(getFlowById(flow.flowId)).toMatchObject({
expect(getTaskFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "cancelled",
});
@@ -338,7 +338,7 @@ describe("task-executor", () => {
it("runs child tasks under managed TaskFlows", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Inspect PR batch",
@@ -380,7 +380,7 @@ describe("task-executor", () => {
it("refuses to add child tasks once cancellation is requested on a managed TaskFlow", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Protected flow",
@@ -416,7 +416,7 @@ describe("task-executor", () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockRejectedValue(new Error("still shutting down"));
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Long running batch",
@@ -460,7 +460,7 @@ describe("task-executor", () => {
taskId: child.taskId,
status: "cancelled",
});
expect(getFlowById(flow.flowId)).toMatchObject({
expect(getTaskFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
cancelRequestedAt: expect.any(Number),
status: "cancelled",
@@ -471,7 +471,7 @@ describe("task-executor", () => {
it("denies cross-owner flow cancellation through the owner-scoped wrapper", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Protected flow",
@@ -488,7 +488,7 @@ describe("task-executor", () => {
cancelled: false,
reason: "Flow not found.",
});
expect(getFlowById(flow.flowId)).toMatchObject({
expect(getTaskFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "queued",
});
@@ -497,7 +497,7 @@ describe("task-executor", () => {
it("denies cross-owner managed TaskFlow child spawning through the owner-scoped wrapper", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Protected flow",

View File

@@ -1,14 +1,5 @@
import type { OpenClawConfig } from "../config/config.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { getFlowByIdForOwner } from "./flow-owner-access.js";
import type { FlowRecord } from "./flow-registry.types.js";
import {
createFlowForTask,
deleteFlowRecordById,
getFlowById,
requestFlowCancel,
updateFlowRecordByIdExpectedRevision,
} from "./flow-runtime-internal.js";
import {
cancelTaskById,
createTaskRecord,
@@ -22,6 +13,15 @@ import {
recordTaskProgressByRunId,
setTaskRunDeliveryStatusByRunId,
} from "./runtime-internal.js";
import { getTaskFlowByIdForOwner } from "./task-flow-owner-access.js";
import type { TaskFlowRecord } from "./task-flow-registry.types.js";
import {
createTaskFlowForTask,
deleteTaskFlowRecordById,
getTaskFlowById,
requestFlowCancel,
updateFlowRecordByIdExpectedRevision,
} from "./task-flow-runtime-internal.js";
import { summarizeTaskRecords } from "./task-registry.summary.js";
import type {
TaskDeliveryState,
@@ -55,7 +55,7 @@ function ensureSingleTaskFlow(params: {
return params.task;
}
try {
const flow = createFlowForTask({
const flow = createTaskFlowForTask({
task: params.task,
requesterOrigin: params.requesterOrigin,
});
@@ -64,11 +64,11 @@ function ensureSingleTaskFlow(params: {
flowId: flow.flowId,
});
if (!linked) {
deleteFlowRecordById(flow.flowId);
deleteTaskFlowRecordById(flow.flowId);
return params.task;
}
if (linked.parentFlowId !== flow.flowId) {
deleteFlowRecordById(flow.flowId);
deleteTaskFlowRecordById(flow.flowId);
return linked;
}
return linked;
@@ -266,7 +266,7 @@ function resolveRetryableBlockedFlowTask(flowId: string): {
latestTask?: TaskRecord;
reason?: string;
} {
const flow = getFlowById(flowId);
const flow = getTaskFlowById(flowId);
if (!flow) {
return {
flowFound: false,
@@ -314,7 +314,7 @@ function retryBlockedFlowTask(params: RetryBlockedFlowParams): RetryBlockedFlowR
reason: resolved.reason,
};
}
const flow = getFlowById(params.flowId);
const flow = getTaskFlowById(params.flowId);
if (!flow) {
return {
found: false,
@@ -374,7 +374,7 @@ type CancelFlowResult = {
found: boolean;
cancelled: boolean;
reason?: string;
flow?: FlowRecord;
flow?: TaskFlowRecord;
tasks?: TaskRecord[];
};
@@ -382,7 +382,7 @@ type RunTaskInFlowResult = {
found: boolean;
created: boolean;
reason?: string;
flow?: FlowRecord;
flow?: TaskFlowRecord;
task?: TaskRecord;
};
@@ -390,13 +390,13 @@ function isActiveTaskStatus(status: TaskStatus): boolean {
return status === "queued" || status === "running";
}
function isTerminalFlowStatus(status: FlowRecord["status"]): boolean {
function isTerminalFlowStatus(status: TaskFlowRecord["status"]): boolean {
return (
status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost"
);
}
function markFlowCancelRequested(flow: FlowRecord): FlowRecord | FlowUpdateFailure {
function markFlowCancelRequested(flow: TaskFlowRecord): TaskFlowRecord | FlowUpdateFailure {
if (flow.cancelRequestedAt != null) {
return flow;
}
@@ -412,19 +412,19 @@ function markFlowCancelRequested(flow: FlowRecord): FlowRecord | FlowUpdateFailu
result.reason === "revision_conflict"
? "Flow changed while cancellation was in progress."
: "Flow not found.",
flow: result.current ?? getFlowById(flow.flowId),
flow: result.current ?? getTaskFlowById(flow.flowId),
};
}
type FlowUpdateFailure = {
reason: string;
flow?: FlowRecord;
flow?: TaskFlowRecord;
};
function cancelManagedFlowAfterChildrenSettle(
flow: FlowRecord,
flow: TaskFlowRecord,
endedAt: number,
): FlowRecord | FlowUpdateFailure {
): TaskFlowRecord | FlowUpdateFailure {
const result = updateFlowRecordByIdExpectedRevision({
flowId: flow.flowId,
expectedRevision: flow.revision,
@@ -445,7 +445,7 @@ function cancelManagedFlowAfterChildrenSettle(
result.reason === "revision_conflict"
? "Flow changed while cancellation was in progress."
: "Flow not found.",
flow: result.current ?? getFlowById(flow.flowId),
flow: result.current ?? getTaskFlowById(flow.flowId),
};
}
@@ -453,7 +453,7 @@ function mapRunTaskInFlowCreateError(params: {
error: unknown;
flowId: string;
}): RunTaskInFlowResult {
const flow = getFlowById(params.flowId);
const flow = getTaskFlowById(params.flowId);
if (isParentFlowLinkError(params.error)) {
if (params.error.code === "cancel_requested") {
return {
@@ -501,7 +501,7 @@ export function runTaskInFlow(params: {
lastEventAt?: number;
progressSummary?: string | null;
}): RunTaskInFlowResult {
const flow = getFlowById(params.flowId);
const flow = getTaskFlowById(params.flowId);
if (!flow) {
return {
found: false,
@@ -572,7 +572,7 @@ export function runTaskInFlow(params: {
return {
found: true,
created: true,
flow: getFlowById(flow.flowId) ?? flow,
flow: getTaskFlowById(flow.flowId) ?? flow,
task,
};
}
@@ -596,7 +596,7 @@ export function runTaskInFlowForOwner(params: {
lastEventAt?: number;
progressSummary?: string | null;
}): RunTaskInFlowResult {
const flow = getFlowByIdForOwner({
const flow = getTaskFlowByIdForOwner({
flowId: params.flowId,
callerOwnerKey: params.callerOwnerKey,
});
@@ -631,7 +631,7 @@ export async function cancelFlowById(params: {
cfg: OpenClawConfig;
flowId: string;
}): Promise<CancelFlowResult> {
const flow = getFlowById(params.flowId);
const flow = getTaskFlowById(params.flowId);
if (!flow) {
return {
found: false,
@@ -673,12 +673,12 @@ export async function cancelFlowById(params: {
found: true,
cancelled: false,
reason: "One or more child tasks are still active.",
flow: getFlowById(flow.flowId) ?? cancelRequestedFlow,
flow: getTaskFlowById(flow.flowId) ?? cancelRequestedFlow,
tasks: refreshedTasks,
};
}
const now = Date.now();
const refreshedFlow = getFlowById(flow.flowId) ?? cancelRequestedFlow;
const refreshedFlow = getTaskFlowById(flow.flowId) ?? cancelRequestedFlow;
if (isTerminalFlowStatus(refreshedFlow.status)) {
return {
found: true,
@@ -714,7 +714,7 @@ export async function cancelFlowByIdForOwner(params: {
flowId: string;
callerOwnerKey: string;
}): Promise<CancelFlowResult> {
const flow = getFlowByIdForOwner({
const flow = getTaskFlowByIdForOwner({
flowId: params.flowId,
callerOwnerKey: params.callerOwnerKey,
});

View File

@@ -3,7 +3,7 @@ import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js";
import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js";
import {
createTaskRecord,
deleteTaskRecordById,
@@ -38,7 +38,7 @@ describe("task-registry store runtime", () => {
afterEach(() => {
delete process.env.OPENCLAW_STATE_DIR;
resetTaskRegistryForTests();
resetFlowRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
});
it("uses the configured task store for restore and save", () => {
@@ -197,7 +197,7 @@ describe("task-registry store runtime", () => {
});
it("persists parentFlowId with task rows", () => {
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-store-parent-flow",
goal: "Persist linked tasks",

View File

@@ -11,7 +11,7 @@ import {
} from "../infra/heartbeat-wake.js";
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js";
import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js";
import {
createTaskRecord,
findLatestTaskForOwnerKey,
@@ -193,7 +193,7 @@ describe("task-registry", () => {
resetHeartbeatWakeStateForTests();
resetAgentRunContextForTest();
resetTaskRegistryForTests({ persist: false });
resetFlowRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
@@ -301,9 +301,9 @@ describe("task-registry", () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetTaskFlowRegistryForTests();
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-registry",
goal: "Owner main flow",
@@ -326,9 +326,9 @@ describe("task-registry", () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetTaskFlowRegistryForTests();
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-registry",
goal: "Owner main flow",
@@ -352,7 +352,7 @@ describe("task-registry", () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetTaskFlowRegistryForTests();
const task = createTaskRecord({
runtime: "acp",
@@ -361,7 +361,7 @@ describe("task-registry", () => {
runId: "owner-main-task",
task: "Safe task",
});
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:other",
controllerId: "tests/task-registry",
goal: "Other owner flow",
@@ -384,9 +384,9 @@ describe("task-registry", () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetTaskFlowRegistryForTests();
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-registry",
goal: "Cancelling flow",
@@ -417,9 +417,9 @@ describe("task-registry", () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetTaskFlowRegistryForTests();
const flow = createManagedFlow({
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-registry",
goal: "Completed flow",

View File

@@ -9,12 +9,6 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import type { FlowRecord } from "./flow-registry.types.js";
import {
getFlowById,
syncFlowFromTask,
updateFlowRecordByIdExpectedRevision,
} from "./flow-runtime-internal.js";
import {
formatTaskBlockedFollowupMessage,
formatTaskStateChangeMessage,
@@ -24,6 +18,12 @@ import {
shouldAutoDeliverTaskTerminalUpdate,
shouldSuppressDuplicateTerminalDelivery,
} from "./task-executor-policy.js";
import type { TaskFlowRecord } from "./task-flow-registry.types.js";
import {
getTaskFlowById,
syncFlowFromTask,
updateFlowRecordByIdExpectedRevision,
} from "./task-flow-runtime-internal.js";
import {
getTaskRegistryHooks,
getTaskRegistryStore,
@@ -81,7 +81,7 @@ export class ParentFlowLinkError extends Error {
message: string,
public readonly details?: {
flowId?: string;
status?: FlowRecord["status"];
status?: TaskFlowRecord["status"];
},
) {
super(message);
@@ -97,7 +97,7 @@ function isActiveTaskStatus(status: TaskStatus): boolean {
return status === "queued" || status === "running";
}
function isTerminalFlowStatus(status: FlowRecord["status"]): boolean {
function isTerminalFlowStatus(status: TaskFlowRecord["status"]): boolean {
return (
status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost"
);
@@ -131,7 +131,7 @@ function assertParentFlowLinkAllowed(params: {
{ flowId },
);
}
const flow = getFlowById(flowId);
const flow = getTaskFlowById(flowId);
if (!flow) {
throw new ParentFlowLinkError("parent_flow_not_found", `Parent flow not found: ${flowId}`, {
flowId,
@@ -727,7 +727,7 @@ function getLinkedFlowForDelivery(task: TaskRecord) {
if (!flowId || task.scopeKind !== "session") {
return undefined;
}
const flow = getFlowById(flowId);
const flow = getTaskFlowById(flowId);
if (!flow) {
return undefined;
}
@@ -762,7 +762,7 @@ function syncManagedFlowCancellationFromTask(task: TaskRecord): void {
if (!flowId) {
return;
}
let flow = getFlowById(flowId);
let flow = getTaskFlowById(flowId);
if (
!flow ||
flow.syncMode !== "managed" ||