TaskFlow: add managed child task execution (#59610)

Merged via squash.

Prepared head SHA: e6cdde6c21
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Mariano
2026-04-02 12:45:03 +02:00
committed by GitHub
parent f65da8711a
commit 8bdca2323d
7 changed files with 631 additions and 37 deletions

View File

@@ -19,7 +19,7 @@ function noteFlowRecoveryHints() {
flow.waitJson === undefined
) {
findings.push(
`${flow.flowId}: running managed flow has no linked tasks or wait state; inspect or cancel it manually.`,
`${flow.flowId}: running managed TaskFlow has no linked tasks or wait state; inspect or cancel it manually.`,
);
}
if (
@@ -28,7 +28,7 @@ function noteFlowRecoveryHints() {
!tasks.some((task) => task.taskId === flow.blockedTaskId)
) {
findings.push(
`${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`,
`${flow.flowId}: blocked TaskFlow points at missing task ${flow.blockedTaskId}; inspect before retrying.`,
);
}
return findings;

View File

@@ -23,6 +23,7 @@ export {
recordTaskProgressByRunId,
resolveTaskForLookupToken,
resetTaskRegistryForTests,
isParentFlowLinkError,
setTaskCleanupAfterById,
setTaskProgressById,
setTaskRunDeliveryStatusByRunId,

View File

@@ -16,6 +16,8 @@ import {
failTaskRunByRunId,
recordTaskRunProgressByRunId,
retryBlockedFlowAsQueuedTaskRun,
runTaskInFlow,
runTaskInFlowForOwner,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
} from "./task-executor.js";
@@ -289,7 +291,7 @@ describe("task-executor", () => {
});
});
it("cancels active tasks linked to a managed flow", async () => {
it("cancels active tasks linked to a managed TaskFlow", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
@@ -330,6 +332,139 @@ describe("task-executor", () => {
});
});
it("runs child tasks under managed TaskFlows", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Inspect PR batch",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
});
const created = runTaskInFlow({
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-child",
label: "Inspect a PR",
task: "Inspect a PR",
status: "running",
startedAt: 10,
lastEventAt: 10,
});
expect(created).toMatchObject({
found: true,
created: true,
task: expect.objectContaining({
parentFlowId: flow.flowId,
ownerKey: "agent:main:main",
status: "running",
runId: "run-flow-child",
}),
});
expect(getTaskById(created.task!.taskId)).toMatchObject({
parentFlowId: flow.flowId,
ownerKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
});
});
});
it("refuses to add child tasks once cancellation is requested on a managed TaskFlow", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Protected flow",
});
const cancelled = await cancelFlowById({
cfg: {} as never,
flowId: flow.flowId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: true,
});
const created = runTaskInFlow({
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-after-cancel",
task: "Should be denied",
});
expect(created).toMatchObject({
found: true,
created: false,
reason: "Flow cancellation has already been requested.",
});
});
});
it("sets cancel intent before child tasks settle and finalizes later", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockRejectedValue(new Error("still shutting down"));
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Long running batch",
});
const child = runTaskInFlow({
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-sticky-cancel",
task: "Inspect a PR",
status: "running",
startedAt: 10,
lastEventAt: 10,
}).task!;
const cancelled = await cancelFlowById({
cfg: {} as never,
flowId: flow.flowId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: false,
reason: "One or more child tasks are still active.",
flow: expect.objectContaining({
flowId: flow.flowId,
cancelRequestedAt: expect.any(Number),
status: "queued",
}),
});
failTaskRunByRunId({
runId: "run-flow-sticky-cancel",
endedAt: 50,
lastEventAt: 50,
error: "cancel completed later",
status: "cancelled",
});
expect(getTaskById(child.taskId)).toMatchObject({
taskId: child.taskId,
status: "cancelled",
});
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
cancelRequestedAt: expect.any(Number),
status: "cancelled",
endedAt: 50,
});
});
});
it("denies cross-owner flow cancellation through the owner-scoped wrapper", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
@@ -356,6 +491,32 @@ describe("task-executor", () => {
});
});
it("denies cross-owner managed TaskFlow child spawning through the owner-scoped wrapper", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Protected flow",
});
const created = runTaskInFlowForOwner({
flowId: flow.flowId,
callerOwnerKey: "agent:main:other",
runtime: "acp",
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-cross-owner",
task: "Should be denied",
});
expect(created).toMatchObject({
found: false,
created: false,
reason: "Flow not found.",
});
expect(findLatestTaskForFlowId(flow.flowId)).toBeUndefined();
});
});
it("cancels active ACP child tasks", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);

View File

@@ -6,12 +6,14 @@ import {
createFlowForTask,
deleteFlowRecordById,
getFlowById,
requestFlowCancel,
updateFlowRecordByIdExpectedRevision,
} from "./flow-runtime-internal.js";
import {
cancelTaskById,
createTaskRecord,
findLatestTaskForFlowId,
isParentFlowLinkError,
linkTaskToFlowById,
listTasksForFlowId,
markTaskLostById,
@@ -293,7 +295,7 @@ function resolveRetryableBlockedFlowTask(flowId: string): {
flowFound: true,
retryable: false,
latestTask,
reason: "Latest flow task is not blocked.",
reason: "Latest TaskFlow task is not blocked.",
};
}
return {
@@ -376,6 +378,14 @@ type CancelFlowResult = {
tasks?: TaskRecord[];
};
type RunTaskInFlowResult = {
found: boolean;
created: boolean;
reason?: string;
flow?: FlowRecord;
task?: TaskRecord;
};
function isActiveTaskStatus(status: TaskStatus): boolean {
return status === "queued" || status === "running";
}
@@ -386,6 +396,237 @@ function isTerminalFlowStatus(status: FlowRecord["status"]): boolean {
);
}
function markFlowCancelRequested(flow: FlowRecord): FlowRecord | FlowUpdateFailure {
if (flow.cancelRequestedAt != null) {
return flow;
}
const result = requestFlowCancel({
flowId: flow.flowId,
expectedRevision: flow.revision,
});
if (result.applied) {
return result.flow;
}
return {
reason:
result.reason === "revision_conflict"
? "Flow changed while cancellation was in progress."
: "Flow not found.",
flow: result.current ?? getFlowById(flow.flowId),
};
}
type FlowUpdateFailure = {
reason: string;
flow?: FlowRecord;
};
function cancelManagedFlowAfterChildrenSettle(
flow: FlowRecord,
endedAt: number,
): FlowRecord | FlowUpdateFailure {
const result = updateFlowRecordByIdExpectedRevision({
flowId: flow.flowId,
expectedRevision: flow.revision,
patch: {
status: "cancelled",
blockedTaskId: null,
blockedSummary: null,
waitJson: null,
endedAt,
updatedAt: endedAt,
},
});
if (result.applied) {
return result.flow;
}
return {
reason:
result.reason === "revision_conflict"
? "Flow changed while cancellation was in progress."
: "Flow not found.",
flow: result.current ?? getFlowById(flow.flowId),
};
}
function mapRunTaskInFlowCreateError(params: {
error: unknown;
flowId: string;
}): RunTaskInFlowResult {
const flow = getFlowById(params.flowId);
if (isParentFlowLinkError(params.error)) {
if (params.error.code === "cancel_requested") {
return {
found: true,
created: false,
reason: "Flow cancellation has already been requested.",
...(flow ? { flow } : {}),
};
}
if (params.error.code === "terminal") {
const terminalStatus = flow?.status ?? params.error.details?.status ?? "terminal";
return {
found: true,
created: false,
reason: `Flow is already ${terminalStatus}.`,
...(flow ? { flow } : {}),
};
}
if (params.error.code === "parent_flow_not_found") {
return {
found: false,
created: false,
reason: "Flow not found.",
};
}
}
throw params.error;
}
export function runTaskInFlow(params: {
flowId: string;
runtime: TaskRuntime;
sourceId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
status?: "queued" | "running";
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
}): RunTaskInFlowResult {
const flow = getFlowById(params.flowId);
if (!flow) {
return {
found: false,
created: false,
reason: "Flow not found.",
};
}
if (flow.syncMode !== "managed") {
return {
found: true,
created: false,
reason: "Flow does not accept managed child tasks.",
flow,
};
}
if (flow.cancelRequestedAt != null) {
return {
found: true,
created: false,
reason: "Flow cancellation has already been requested.",
flow,
};
}
if (isTerminalFlowStatus(flow.status)) {
return {
found: true,
created: false,
reason: `Flow is already ${flow.status}.`,
flow,
};
}
const common = {
runtime: params.runtime,
sourceId: params.sourceId,
ownerKey: flow.ownerKey,
scopeKind: "session" as const,
requesterOrigin: flow.requesterOrigin,
parentFlowId: flow.flowId,
childSessionKey: params.childSessionKey,
parentTaskId: params.parentTaskId,
agentId: params.agentId,
runId: params.runId,
label: params.label,
task: params.task,
preferMetadata: params.preferMetadata,
notifyPolicy: params.notifyPolicy,
deliveryStatus: params.deliveryStatus ?? "pending",
};
let task: TaskRecord;
try {
task =
params.status === "running"
? createRunningTaskRun({
...common,
startedAt: params.startedAt,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
})
: createQueuedTaskRun(common);
} catch (error) {
return mapRunTaskInFlowCreateError({
error,
flowId: flow.flowId,
});
}
return {
found: true,
created: true,
flow: getFlowById(flow.flowId) ?? flow,
task,
};
}
export function runTaskInFlowForOwner(params: {
flowId: string;
callerOwnerKey: string;
runtime: TaskRuntime;
sourceId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
status?: "queued" | "running";
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
}): RunTaskInFlowResult {
const flow = getFlowByIdForOwner({
flowId: params.flowId,
callerOwnerKey: params.callerOwnerKey,
});
if (!flow) {
return {
found: false,
created: false,
reason: "Flow not found.",
};
}
return runTaskInFlow({
flowId: flow.flowId,
runtime: params.runtime,
sourceId: params.sourceId,
childSessionKey: params.childSessionKey,
parentTaskId: params.parentTaskId,
agentId: params.agentId,
runId: params.runId,
label: params.label,
task: params.task,
preferMetadata: params.preferMetadata,
notifyPolicy: params.notifyPolicy,
deliveryStatus: params.deliveryStatus,
status: params.status,
startedAt: params.startedAt,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
});
}
export async function cancelFlowById(params: {
cfg: OpenClawConfig;
flowId: string;
@@ -398,6 +639,25 @@ export async function cancelFlowById(params: {
reason: "Flow not found.",
};
}
if (isTerminalFlowStatus(flow.status)) {
return {
found: true,
cancelled: false,
reason: `Flow is already ${flow.status}.`,
flow,
tasks: listTasksForFlowId(flow.flowId),
};
}
const cancelRequestedFlow = markFlowCancelRequested(flow);
if ("reason" in cancelRequestedFlow) {
return {
found: true,
cancelled: false,
reason: cancelRequestedFlow.reason,
flow: cancelRequestedFlow.flow,
tasks: listTasksForFlowId(flow.flowId),
};
}
const linkedTasks = listTasksForFlowId(flow.flowId);
const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status));
for (const task of activeTasks) {
@@ -413,48 +673,38 @@ export async function cancelFlowById(params: {
found: true,
cancelled: false,
reason: "One or more child tasks are still active.",
flow: getFlowById(flow.flowId),
tasks: refreshedTasks,
};
}
if (isTerminalFlowStatus(flow.status)) {
return {
found: true,
cancelled: false,
reason: `Flow is already ${flow.status}.`,
flow,
flow: getFlowById(flow.flowId) ?? cancelRequestedFlow,
tasks: refreshedTasks,
};
}
const now = Date.now();
const refreshedFlow = getFlowById(flow.flowId) ?? flow;
const updatedFlowResult = updateFlowRecordByIdExpectedRevision({
flowId: refreshedFlow.flowId,
expectedRevision: refreshedFlow.revision,
patch: {
status: "cancelled",
blockedTaskId: null,
blockedSummary: null,
endedAt: now,
updatedAt: now,
},
});
if (!updatedFlowResult.applied) {
const refreshedFlow = getFlowById(flow.flowId) ?? cancelRequestedFlow;
if (isTerminalFlowStatus(refreshedFlow.status)) {
return {
found: true,
cancelled: refreshedFlow.status === "cancelled",
reason:
refreshedFlow.status === "cancelled"
? undefined
: `Flow is already ${refreshedFlow.status}.`,
flow: refreshedFlow,
tasks: refreshedTasks,
};
}
const updatedFlow = cancelManagedFlowAfterChildrenSettle(refreshedFlow, now);
if ("reason" in updatedFlow) {
return {
found: true,
cancelled: false,
reason:
updatedFlowResult.reason === "revision_conflict"
? "Flow changed while cancellation was in progress."
: "Flow not found.",
flow: updatedFlowResult.current ?? getFlowById(flow.flowId),
reason: updatedFlow.reason,
flow: updatedFlow.flow,
tasks: refreshedTasks,
};
}
return {
found: true,
cancelled: true,
flow: updatedFlowResult.flow,
flow: updatedFlow,
tasks: refreshedTasks,
};
}

View File

@@ -19,6 +19,7 @@ import {
findTaskByRunId,
getTaskById,
getTaskRegistrySummary,
isParentFlowLinkError,
listTasksForOwnerKey,
listTaskRecords,
linkTaskToFlowById,
@@ -379,6 +380,65 @@ describe("task-registry", () => {
});
});
it("rejects parent flow links once cancellation has been requested", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-registry",
goal: "Cancelling flow",
cancelRequestedAt: 42,
});
try {
createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
parentFlowId: flow.flowId,
runId: "cancel-requested-link",
task: "Should be denied",
});
throw new Error("Expected createTaskRecord to throw.");
} catch (error) {
expect(isParentFlowLinkError(error)).toBe(true);
expect(error).toMatchObject({
code: "cancel_requested",
message: "Parent flow cancellation has already been requested.",
});
}
});
});
it("rejects parent flow links for terminal flows", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-registry",
goal: "Completed flow",
status: "cancelled",
});
expect(() =>
createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
parentFlowId: flow.flowId,
runId: "terminal-flow-link",
task: "Should be denied",
}),
).toThrow("Parent flow is already cancelled.");
});
});
it("delivers ACP completion to the requester channel when a delivery origin exists", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;

View File

@@ -9,7 +9,12 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { getFlowById, syncFlowFromTask } from "./flow-runtime-internal.js";
import type { FlowRecord } from "./flow-registry.types.js";
import {
getFlowById,
syncFlowFromTask,
updateFlowRecordByIdExpectedRevision,
} from "./flow-runtime-internal.js";
import {
formatTaskBlockedFollowupMessage,
formatTaskStateChangeMessage,
@@ -63,6 +68,41 @@ type TaskDeliveryOwner = {
flowId?: string;
};
export type ParentFlowLinkErrorCode =
| "scope_kind_not_session"
| "parent_flow_not_found"
| "owner_key_mismatch"
| "cancel_requested"
| "terminal";
export class ParentFlowLinkError extends Error {
constructor(
public readonly code: ParentFlowLinkErrorCode,
message: string,
public readonly details?: {
flowId?: string;
status?: FlowRecord["status"];
},
) {
super(message);
this.name = "ParentFlowLinkError";
}
}
export function isParentFlowLinkError(error: unknown): error is ParentFlowLinkError {
return error instanceof ParentFlowLinkError;
}
function isActiveTaskStatus(status: TaskStatus): boolean {
return status === "queued" || status === "running";
}
function isTerminalFlowStatus(status: FlowRecord["status"]): boolean {
return (
status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost"
);
}
function assertTaskOwner(params: { ownerKey: string; scopeKind: TaskScopeKind }) {
const ownerKey = params.ownerKey.trim();
if (!ownerKey && params.scopeKind !== "system") {
@@ -85,14 +125,37 @@ function assertParentFlowLinkAllowed(params: {
return;
}
if (params.scopeKind !== "session") {
throw new Error("Only session-scoped tasks can link to flows.");
throw new ParentFlowLinkError(
"scope_kind_not_session",
"Only session-scoped tasks can link to flows.",
{ flowId },
);
}
const flow = getFlowById(flowId);
if (!flow) {
throw new Error(`Parent flow not found: ${flowId}`);
throw new ParentFlowLinkError("parent_flow_not_found", `Parent flow not found: ${flowId}`, {
flowId,
});
}
if (normalizeOwnerKey(flow.ownerKey) !== normalizeOwnerKey(params.ownerKey)) {
throw new Error("Task ownerKey must match parent flow ownerKey.");
throw new ParentFlowLinkError(
"owner_key_mismatch",
"Task ownerKey must match parent flow ownerKey.",
{ flowId },
);
}
if (flow.cancelRequestedAt != null) {
throw new ParentFlowLinkError(
"cancel_requested",
"Parent flow cancellation has already been requested.",
{ flowId, status: flow.status },
);
}
if (isTerminalFlowStatus(flow.status)) {
throw new ParentFlowLinkError("terminal", `Parent flow is already ${flow.status}.`, {
flowId,
status: flow.status,
});
}
}
@@ -694,6 +757,55 @@ function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner {
};
}
function syncManagedFlowCancellationFromTask(task: TaskRecord): void {
const flowId = task.parentFlowId?.trim();
if (!flowId) {
return;
}
let flow = getFlowById(flowId);
if (
!flow ||
flow.syncMode !== "managed" ||
flow.cancelRequestedAt == null ||
isTerminalFlowStatus(flow.status)
) {
return;
}
if (listTasksForFlowId(flowId).some((candidate) => isActiveTaskStatus(candidate.status))) {
return;
}
const endedAt = task.endedAt ?? task.lastEventAt ?? Date.now();
for (let attempt = 0; attempt < 2; attempt += 1) {
const result = updateFlowRecordByIdExpectedRevision({
flowId,
expectedRevision: flow.revision,
patch: {
status: "cancelled",
blockedTaskId: null,
blockedSummary: null,
waitJson: null,
endedAt,
updatedAt: endedAt,
},
});
if (result.applied || result.reason === "not_found") {
return;
}
flow = result.current;
if (
!flow ||
flow.syncMode !== "managed" ||
flow.cancelRequestedAt == null ||
isTerminalFlowStatus(flow.status)
) {
return;
}
if (listTasksForFlowId(flowId).some((candidate) => isActiveTaskStatus(candidate.status))) {
return;
}
}
}
function restoreTaskRegistryOnce() {
if (restoreAttempted) {
return;
@@ -767,6 +879,15 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
error,
});
}
try {
syncManagedFlowCancellationFromTask(next);
} catch (error) {
log.warn("Failed to finalize managed flow cancellation from task update", {
taskId,
flowId: next.parentFlowId,
error,
});
}
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(next),