diff --git a/CHANGELOG.md b/CHANGELOG.md index a2dd4091892..df5ca355bea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai - WhatsApp/reactions: add `reactionLevel` guidance for agent reactions. Thanks @mcaxtr. - Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01. - Tasks/TaskFlow: restore the core TaskFlow substrate with managed-vs-mirrored sync modes, durable flow state/revision tracking, and `openclaw flows` inspection/recovery primitives so background orchestration can persist and be operated separately from plugin authoring layers. (#58930) Thanks @mbelinky. +- Tasks/TaskFlow: add managed child task spawning plus sticky cancel intent, so external orchestrators can stop scheduling immediately and let parent TaskFlows settle to `cancelled` once active child tasks finish. (#59610) Thanks @mbelinky. ### Fixes diff --git a/src/commands/doctor-workspace-status.ts b/src/commands/doctor-workspace-status.ts index 3a7a14949bd..eb5f440c964 100644 --- a/src/commands/doctor-workspace-status.ts +++ b/src/commands/doctor-workspace-status.ts @@ -19,7 +19,7 @@ function noteFlowRecoveryHints() { flow.waitJson === undefined ) { findings.push( - `${flow.flowId}: running managed flow has no linked tasks or wait state; inspect or cancel it manually.`, + `${flow.flowId}: running managed TaskFlow has no linked tasks or wait state; inspect or cancel it manually.`, ); } if ( @@ -28,7 +28,7 @@ function noteFlowRecoveryHints() { !tasks.some((task) => task.taskId === flow.blockedTaskId) ) { findings.push( - `${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`, + `${flow.flowId}: blocked TaskFlow points at missing task ${flow.blockedTaskId}; inspect before retrying.`, ); } return findings; diff --git a/src/tasks/runtime-internal.ts b/src/tasks/runtime-internal.ts index 53566c7232b..8a9f41d7764 100644 --- a/src/tasks/runtime-internal.ts +++ b/src/tasks/runtime-internal.ts @@ -23,6 +23,7 @@ export { recordTaskProgressByRunId, resolveTaskForLookupToken, resetTaskRegistryForTests, + isParentFlowLinkError, setTaskCleanupAfterById, setTaskProgressById, setTaskRunDeliveryStatusByRunId, diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index 86cf925a123..e9db6feb840 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -16,6 +16,8 @@ import { failTaskRunByRunId, recordTaskRunProgressByRunId, retryBlockedFlowAsQueuedTaskRun, + runTaskInFlow, + runTaskInFlowForOwner, setDetachedTaskDeliveryStatusByRunId, startTaskRunByRunId, } from "./task-executor.js"; @@ -289,7 +291,7 @@ describe("task-executor", () => { }); }); - it("cancels active tasks linked to a managed flow", async () => { + it("cancels active tasks linked to a managed TaskFlow", async () => { await withTaskExecutorStateDir(async () => { hoisted.cancelSessionMock.mockResolvedValue(undefined); @@ -330,6 +332,139 @@ describe("task-executor", () => { }); }); + it("runs child tasks under managed TaskFlows", async () => { + await withTaskExecutorStateDir(async () => { + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed-flow", + goal: "Inspect PR batch", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + }); + + const created = runTaskInFlow({ + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-child", + label: "Inspect a PR", + task: "Inspect a PR", + status: "running", + startedAt: 10, + lastEventAt: 10, + }); + + expect(created).toMatchObject({ + found: true, + created: true, + task: expect.objectContaining({ + parentFlowId: flow.flowId, + ownerKey: "agent:main:main", + status: "running", + runId: "run-flow-child", + }), + }); + expect(getTaskById(created.task!.taskId)).toMatchObject({ + parentFlowId: flow.flowId, + ownerKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child", + }); + }); + }); + + it("refuses to add child tasks once cancellation is requested on a managed TaskFlow", async () => { + await withTaskExecutorStateDir(async () => { + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed-flow", + goal: "Protected flow", + }); + + const cancelled = await cancelFlowById({ + cfg: {} as never, + flowId: flow.flowId, + }); + + expect(cancelled).toMatchObject({ + found: true, + cancelled: true, + }); + + const created = runTaskInFlow({ + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-after-cancel", + task: "Should be denied", + }); + + expect(created).toMatchObject({ + found: true, + created: false, + reason: "Flow cancellation has already been requested.", + }); + }); + }); + + it("sets cancel intent before child tasks settle and finalizes later", async () => { + await withTaskExecutorStateDir(async () => { + hoisted.cancelSessionMock.mockRejectedValue(new Error("still shutting down")); + + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed-flow", + goal: "Long running batch", + }); + const child = runTaskInFlow({ + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-sticky-cancel", + task: "Inspect a PR", + status: "running", + startedAt: 10, + lastEventAt: 10, + }).task!; + + const cancelled = await cancelFlowById({ + cfg: {} as never, + flowId: flow.flowId, + }); + + expect(cancelled).toMatchObject({ + found: true, + cancelled: false, + reason: "One or more child tasks are still active.", + flow: expect.objectContaining({ + flowId: flow.flowId, + cancelRequestedAt: expect.any(Number), + status: "queued", + }), + }); + + failTaskRunByRunId({ + runId: "run-flow-sticky-cancel", + endedAt: 50, + lastEventAt: 50, + error: "cancel completed later", + status: "cancelled", + }); + + expect(getTaskById(child.taskId)).toMatchObject({ + taskId: child.taskId, + status: "cancelled", + }); + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + cancelRequestedAt: expect.any(Number), + status: "cancelled", + endedAt: 50, + }); + }); + }); + it("denies cross-owner flow cancellation through the owner-scoped wrapper", async () => { await withTaskExecutorStateDir(async () => { const flow = createManagedFlow({ @@ -356,6 +491,32 @@ describe("task-executor", () => { }); }); + it("denies cross-owner managed TaskFlow child spawning through the owner-scoped wrapper", async () => { + await withTaskExecutorStateDir(async () => { + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/managed-flow", + goal: "Protected flow", + }); + + const created = runTaskInFlowForOwner({ + flowId: flow.flowId, + callerOwnerKey: "agent:main:other", + runtime: "acp", + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-cross-owner", + task: "Should be denied", + }); + + expect(created).toMatchObject({ + found: false, + created: false, + reason: "Flow not found.", + }); + expect(findLatestTaskForFlowId(flow.flowId)).toBeUndefined(); + }); + }); + it("cancels active ACP child tasks", async () => { await withTaskExecutorStateDir(async () => { hoisted.cancelSessionMock.mockResolvedValue(undefined); diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index 4c7503c7e75..f959a6495f0 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -6,12 +6,14 @@ import { createFlowForTask, deleteFlowRecordById, getFlowById, + requestFlowCancel, updateFlowRecordByIdExpectedRevision, } from "./flow-runtime-internal.js"; import { cancelTaskById, createTaskRecord, findLatestTaskForFlowId, + isParentFlowLinkError, linkTaskToFlowById, listTasksForFlowId, markTaskLostById, @@ -293,7 +295,7 @@ function resolveRetryableBlockedFlowTask(flowId: string): { flowFound: true, retryable: false, latestTask, - reason: "Latest flow task is not blocked.", + reason: "Latest TaskFlow task is not blocked.", }; } return { @@ -376,6 +378,14 @@ type CancelFlowResult = { tasks?: TaskRecord[]; }; +type RunTaskInFlowResult = { + found: boolean; + created: boolean; + reason?: string; + flow?: FlowRecord; + task?: TaskRecord; +}; + function isActiveTaskStatus(status: TaskStatus): boolean { return status === "queued" || status === "running"; } @@ -386,6 +396,237 @@ function isTerminalFlowStatus(status: FlowRecord["status"]): boolean { ); } +function markFlowCancelRequested(flow: FlowRecord): FlowRecord | FlowUpdateFailure { + if (flow.cancelRequestedAt != null) { + return flow; + } + const result = requestFlowCancel({ + flowId: flow.flowId, + expectedRevision: flow.revision, + }); + if (result.applied) { + return result.flow; + } + return { + reason: + result.reason === "revision_conflict" + ? "Flow changed while cancellation was in progress." + : "Flow not found.", + flow: result.current ?? getFlowById(flow.flowId), + }; +} + +type FlowUpdateFailure = { + reason: string; + flow?: FlowRecord; +}; + +function cancelManagedFlowAfterChildrenSettle( + flow: FlowRecord, + endedAt: number, +): FlowRecord | FlowUpdateFailure { + const result = updateFlowRecordByIdExpectedRevision({ + flowId: flow.flowId, + expectedRevision: flow.revision, + patch: { + status: "cancelled", + blockedTaskId: null, + blockedSummary: null, + waitJson: null, + endedAt, + updatedAt: endedAt, + }, + }); + if (result.applied) { + return result.flow; + } + return { + reason: + result.reason === "revision_conflict" + ? "Flow changed while cancellation was in progress." + : "Flow not found.", + flow: result.current ?? getFlowById(flow.flowId), + }; +} + +function mapRunTaskInFlowCreateError(params: { + error: unknown; + flowId: string; +}): RunTaskInFlowResult { + const flow = getFlowById(params.flowId); + if (isParentFlowLinkError(params.error)) { + if (params.error.code === "cancel_requested") { + return { + found: true, + created: false, + reason: "Flow cancellation has already been requested.", + ...(flow ? { flow } : {}), + }; + } + if (params.error.code === "terminal") { + const terminalStatus = flow?.status ?? params.error.details?.status ?? "terminal"; + return { + found: true, + created: false, + reason: `Flow is already ${terminalStatus}.`, + ...(flow ? { flow } : {}), + }; + } + if (params.error.code === "parent_flow_not_found") { + return { + found: false, + created: false, + reason: "Flow not found.", + }; + } + } + throw params.error; +} + +export function runTaskInFlow(params: { + flowId: string; + runtime: TaskRuntime; + sourceId?: string; + childSessionKey?: string; + parentTaskId?: string; + agentId?: string; + runId?: string; + label?: string; + task: string; + preferMetadata?: boolean; + notifyPolicy?: TaskNotifyPolicy; + deliveryStatus?: TaskDeliveryStatus; + status?: "queued" | "running"; + startedAt?: number; + lastEventAt?: number; + progressSummary?: string | null; +}): RunTaskInFlowResult { + const flow = getFlowById(params.flowId); + if (!flow) { + return { + found: false, + created: false, + reason: "Flow not found.", + }; + } + if (flow.syncMode !== "managed") { + return { + found: true, + created: false, + reason: "Flow does not accept managed child tasks.", + flow, + }; + } + if (flow.cancelRequestedAt != null) { + return { + found: true, + created: false, + reason: "Flow cancellation has already been requested.", + flow, + }; + } + if (isTerminalFlowStatus(flow.status)) { + return { + found: true, + created: false, + reason: `Flow is already ${flow.status}.`, + flow, + }; + } + + const common = { + runtime: params.runtime, + sourceId: params.sourceId, + ownerKey: flow.ownerKey, + scopeKind: "session" as const, + requesterOrigin: flow.requesterOrigin, + parentFlowId: flow.flowId, + childSessionKey: params.childSessionKey, + parentTaskId: params.parentTaskId, + agentId: params.agentId, + runId: params.runId, + label: params.label, + task: params.task, + preferMetadata: params.preferMetadata, + notifyPolicy: params.notifyPolicy, + deliveryStatus: params.deliveryStatus ?? "pending", + }; + let task: TaskRecord; + try { + task = + params.status === "running" + ? createRunningTaskRun({ + ...common, + startedAt: params.startedAt, + lastEventAt: params.lastEventAt, + progressSummary: params.progressSummary, + }) + : createQueuedTaskRun(common); + } catch (error) { + return mapRunTaskInFlowCreateError({ + error, + flowId: flow.flowId, + }); + } + + return { + found: true, + created: true, + flow: getFlowById(flow.flowId) ?? flow, + task, + }; +} + +export function runTaskInFlowForOwner(params: { + flowId: string; + callerOwnerKey: string; + runtime: TaskRuntime; + sourceId?: string; + childSessionKey?: string; + parentTaskId?: string; + agentId?: string; + runId?: string; + label?: string; + task: string; + preferMetadata?: boolean; + notifyPolicy?: TaskNotifyPolicy; + deliveryStatus?: TaskDeliveryStatus; + status?: "queued" | "running"; + startedAt?: number; + lastEventAt?: number; + progressSummary?: string | null; +}): RunTaskInFlowResult { + const flow = getFlowByIdForOwner({ + flowId: params.flowId, + callerOwnerKey: params.callerOwnerKey, + }); + if (!flow) { + return { + found: false, + created: false, + reason: "Flow not found.", + }; + } + return runTaskInFlow({ + flowId: flow.flowId, + runtime: params.runtime, + sourceId: params.sourceId, + childSessionKey: params.childSessionKey, + parentTaskId: params.parentTaskId, + agentId: params.agentId, + runId: params.runId, + label: params.label, + task: params.task, + preferMetadata: params.preferMetadata, + notifyPolicy: params.notifyPolicy, + deliveryStatus: params.deliveryStatus, + status: params.status, + startedAt: params.startedAt, + lastEventAt: params.lastEventAt, + progressSummary: params.progressSummary, + }); +} + export async function cancelFlowById(params: { cfg: OpenClawConfig; flowId: string; @@ -398,6 +639,25 @@ export async function cancelFlowById(params: { reason: "Flow not found.", }; } + if (isTerminalFlowStatus(flow.status)) { + return { + found: true, + cancelled: false, + reason: `Flow is already ${flow.status}.`, + flow, + tasks: listTasksForFlowId(flow.flowId), + }; + } + const cancelRequestedFlow = markFlowCancelRequested(flow); + if ("reason" in cancelRequestedFlow) { + return { + found: true, + cancelled: false, + reason: cancelRequestedFlow.reason, + flow: cancelRequestedFlow.flow, + tasks: listTasksForFlowId(flow.flowId), + }; + } const linkedTasks = listTasksForFlowId(flow.flowId); const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status)); for (const task of activeTasks) { @@ -413,48 +673,38 @@ export async function cancelFlowById(params: { found: true, cancelled: false, reason: "One or more child tasks are still active.", - flow: getFlowById(flow.flowId), - tasks: refreshedTasks, - }; - } - if (isTerminalFlowStatus(flow.status)) { - return { - found: true, - cancelled: false, - reason: `Flow is already ${flow.status}.`, - flow, + flow: getFlowById(flow.flowId) ?? cancelRequestedFlow, tasks: refreshedTasks, }; } const now = Date.now(); - const refreshedFlow = getFlowById(flow.flowId) ?? flow; - const updatedFlowResult = updateFlowRecordByIdExpectedRevision({ - flowId: refreshedFlow.flowId, - expectedRevision: refreshedFlow.revision, - patch: { - status: "cancelled", - blockedTaskId: null, - blockedSummary: null, - endedAt: now, - updatedAt: now, - }, - }); - if (!updatedFlowResult.applied) { + const refreshedFlow = getFlowById(flow.flowId) ?? cancelRequestedFlow; + if (isTerminalFlowStatus(refreshedFlow.status)) { + return { + found: true, + cancelled: refreshedFlow.status === "cancelled", + reason: + refreshedFlow.status === "cancelled" + ? undefined + : `Flow is already ${refreshedFlow.status}.`, + flow: refreshedFlow, + tasks: refreshedTasks, + }; + } + const updatedFlow = cancelManagedFlowAfterChildrenSettle(refreshedFlow, now); + if ("reason" in updatedFlow) { return { found: true, cancelled: false, - reason: - updatedFlowResult.reason === "revision_conflict" - ? "Flow changed while cancellation was in progress." - : "Flow not found.", - flow: updatedFlowResult.current ?? getFlowById(flow.flowId), + reason: updatedFlow.reason, + flow: updatedFlow.flow, tasks: refreshedTasks, }; } return { found: true, cancelled: true, - flow: updatedFlowResult.flow, + flow: updatedFlow, tasks: refreshedTasks, }; } diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 8b246013a02..810f2a0c676 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -19,6 +19,7 @@ import { findTaskByRunId, getTaskById, getTaskRegistrySummary, + isParentFlowLinkError, listTasksForOwnerKey, listTaskRecords, linkTaskToFlowById, @@ -379,6 +380,65 @@ describe("task-registry", () => { }); }); + it("rejects parent flow links once cancellation has been requested", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/task-registry", + goal: "Cancelling flow", + cancelRequestedAt: 42, + }); + + try { + createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + parentFlowId: flow.flowId, + runId: "cancel-requested-link", + task: "Should be denied", + }); + throw new Error("Expected createTaskRecord to throw."); + } catch (error) { + expect(isParentFlowLinkError(error)).toBe(true); + expect(error).toMatchObject({ + code: "cancel_requested", + message: "Parent flow cancellation has already been requested.", + }); + } + }); + }); + + it("rejects parent flow links for terminal flows", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + + const flow = createManagedFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/task-registry", + goal: "Completed flow", + status: "cancelled", + }); + + expect(() => + createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + parentFlowId: flow.flowId, + runId: "terminal-flow-link", + task: "Should be denied", + }), + ).toThrow("Parent flow is already cancelled."); + }); + }); + it("delivers ACP completion to the requester channel when a delivery origin exists", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 7c087454fb5..c8d032159a0 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -9,7 +9,12 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; -import { getFlowById, syncFlowFromTask } from "./flow-runtime-internal.js"; +import type { FlowRecord } from "./flow-registry.types.js"; +import { + getFlowById, + syncFlowFromTask, + updateFlowRecordByIdExpectedRevision, +} from "./flow-runtime-internal.js"; import { formatTaskBlockedFollowupMessage, formatTaskStateChangeMessage, @@ -63,6 +68,41 @@ type TaskDeliveryOwner = { flowId?: string; }; +export type ParentFlowLinkErrorCode = + | "scope_kind_not_session" + | "parent_flow_not_found" + | "owner_key_mismatch" + | "cancel_requested" + | "terminal"; + +export class ParentFlowLinkError extends Error { + constructor( + public readonly code: ParentFlowLinkErrorCode, + message: string, + public readonly details?: { + flowId?: string; + status?: FlowRecord["status"]; + }, + ) { + super(message); + this.name = "ParentFlowLinkError"; + } +} + +export function isParentFlowLinkError(error: unknown): error is ParentFlowLinkError { + return error instanceof ParentFlowLinkError; +} + +function isActiveTaskStatus(status: TaskStatus): boolean { + return status === "queued" || status === "running"; +} + +function isTerminalFlowStatus(status: FlowRecord["status"]): boolean { + return ( + status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost" + ); +} + function assertTaskOwner(params: { ownerKey: string; scopeKind: TaskScopeKind }) { const ownerKey = params.ownerKey.trim(); if (!ownerKey && params.scopeKind !== "system") { @@ -85,14 +125,37 @@ function assertParentFlowLinkAllowed(params: { return; } if (params.scopeKind !== "session") { - throw new Error("Only session-scoped tasks can link to flows."); + throw new ParentFlowLinkError( + "scope_kind_not_session", + "Only session-scoped tasks can link to flows.", + { flowId }, + ); } const flow = getFlowById(flowId); if (!flow) { - throw new Error(`Parent flow not found: ${flowId}`); + throw new ParentFlowLinkError("parent_flow_not_found", `Parent flow not found: ${flowId}`, { + flowId, + }); } if (normalizeOwnerKey(flow.ownerKey) !== normalizeOwnerKey(params.ownerKey)) { - throw new Error("Task ownerKey must match parent flow ownerKey."); + throw new ParentFlowLinkError( + "owner_key_mismatch", + "Task ownerKey must match parent flow ownerKey.", + { flowId }, + ); + } + if (flow.cancelRequestedAt != null) { + throw new ParentFlowLinkError( + "cancel_requested", + "Parent flow cancellation has already been requested.", + { flowId, status: flow.status }, + ); + } + if (isTerminalFlowStatus(flow.status)) { + throw new ParentFlowLinkError("terminal", `Parent flow is already ${flow.status}.`, { + flowId, + status: flow.status, + }); } } @@ -694,6 +757,55 @@ function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner { }; } +function syncManagedFlowCancellationFromTask(task: TaskRecord): void { + const flowId = task.parentFlowId?.trim(); + if (!flowId) { + return; + } + let flow = getFlowById(flowId); + if ( + !flow || + flow.syncMode !== "managed" || + flow.cancelRequestedAt == null || + isTerminalFlowStatus(flow.status) + ) { + return; + } + if (listTasksForFlowId(flowId).some((candidate) => isActiveTaskStatus(candidate.status))) { + return; + } + const endedAt = task.endedAt ?? task.lastEventAt ?? Date.now(); + for (let attempt = 0; attempt < 2; attempt += 1) { + const result = updateFlowRecordByIdExpectedRevision({ + flowId, + expectedRevision: flow.revision, + patch: { + status: "cancelled", + blockedTaskId: null, + blockedSummary: null, + waitJson: null, + endedAt, + updatedAt: endedAt, + }, + }); + if (result.applied || result.reason === "not_found") { + return; + } + flow = result.current; + if ( + !flow || + flow.syncMode !== "managed" || + flow.cancelRequestedAt == null || + isTerminalFlowStatus(flow.status) + ) { + return; + } + if (listTasksForFlowId(flowId).some((candidate) => isActiveTaskStatus(candidate.status))) { + return; + } + } +} + function restoreTaskRegistryOnce() { if (restoreAttempted) { return; @@ -767,6 +879,15 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu error, }); } + try { + syncManagedFlowCancellationFromTask(next); + } catch (error) { + log.warn("Failed to finalize managed flow cancellation from task update", { + taskId, + flowId: next.parentFlowId, + error, + }); + } emitTaskRegistryHookEvent(() => ({ kind: "upserted", task: cloneTaskRecord(next),