Plugins: add bound TaskFlow runtime (#59622)

Merged via squash.

Prepared head SHA: b4649f3238
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Mariano
2026-04-02 13:17:09 +02:00
committed by GitHub
parent 474409deb5
commit bbf9800a8e
9 changed files with 703 additions and 0 deletions

View File

@@ -189,6 +189,15 @@ describe("plugin runtime command execution", () => {
]);
},
},
{
name: "exposes runtime.taskFlow binding helpers",
assert: (runtime: ReturnType<typeof createPluginRuntime>) => {
expectFunctionKeys(runtime.taskFlow as Record<string, unknown>, [
"bindSession",
"fromToolContext",
]);
},
},
{
name: "exposes runtime.agent host helpers",
assert: (runtime: ReturnType<typeof createPluginRuntime>) => {

View File

@@ -16,6 +16,7 @@ import { createRuntimeEvents } from "./runtime-events.js";
import { createRuntimeLogging } from "./runtime-logging.js";
import { createRuntimeMedia } from "./runtime-media.js";
import { createRuntimeSystem } from "./runtime-system.js";
import { createRuntimeTaskFlow } from "./runtime-taskflow.js";
import type { PluginRuntime } from "./types.js";
const loadTtsRuntime = createLazyRuntimeModule(() => import("./runtime-tts.runtime.js"));
@@ -203,6 +204,7 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}):
events: createRuntimeEvents(),
logging: createRuntimeLogging(),
state: { resolveStateDir },
taskFlow: createRuntimeTaskFlow(),
} satisfies Omit<
PluginRuntime,
"tts" | "mediaUnderstanding" | "stt" | "modelAuth" | "imageGeneration"

View File

@@ -0,0 +1,157 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { getFlowById, resetFlowRegistryForTests } from "../../tasks/flow-registry.js";
import { getTaskById, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { createRuntimeTaskFlow } from "./runtime-taskflow.js";
const hoisted = vi.hoisted(() => {
const sendMessageMock = vi.fn();
const cancelSessionMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
return {
sendMessageMock,
cancelSessionMock,
killSubagentRunAdminMock,
};
});
vi.mock("../../tasks/task-registry-delivery-runtime.js", () => ({
sendMessage: hoisted.sendMessageMock,
}));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.mock("../../agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
afterEach(() => {
resetTaskRegistryForTests();
resetFlowRegistryForTests({ persist: false });
vi.clearAllMocks();
});
describe("runtime TaskFlow", () => {
it("binds managed TaskFlow operations to a session key", () => {
const runtime = createRuntimeTaskFlow();
const taskFlow = runtime.bindSession({
sessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
});
const created = taskFlow.createManaged({
controllerId: "tests/runtime-taskflow",
goal: "Triage inbox",
currentStep: "classify",
stateJson: { lane: "inbox" },
});
expect(created).toMatchObject({
syncMode: "managed",
ownerKey: "agent:main:main",
controllerId: "tests/runtime-taskflow",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
goal: "Triage inbox",
});
expect(taskFlow.get(created.flowId)?.flowId).toBe(created.flowId);
expect(taskFlow.findLatest()?.flowId).toBe(created.flowId);
expect(taskFlow.resolve("agent:main:main")?.flowId).toBe(created.flowId);
});
it("binds TaskFlows from trusted tool context", () => {
const runtime = createRuntimeTaskFlow();
const taskFlow = runtime.fromToolContext({
sessionKey: "agent:main:main",
deliveryContext: {
channel: "discord",
to: "channel:123",
threadId: "thread:456",
},
});
const created = taskFlow.createManaged({
controllerId: "tests/runtime-taskflow",
goal: "Review queue",
});
expect(created.requesterOrigin).toMatchObject({
channel: "discord",
to: "channel:123",
threadId: "thread:456",
});
});
it("rejects tool contexts without a bound session key", () => {
const runtime = createRuntimeTaskFlow();
expect(() =>
runtime.fromToolContext({
sessionKey: undefined,
deliveryContext: undefined,
}),
).toThrow("TaskFlow runtime requires tool context with a sessionKey.");
});
it("keeps TaskFlow reads owner-scoped and runs child tasks under the bound TaskFlow", () => {
const runtime = createRuntimeTaskFlow();
const ownerTaskFlow = runtime.bindSession({
sessionKey: "agent:main:main",
});
const otherTaskFlow = runtime.bindSession({
sessionKey: "agent:main:other",
});
const created = ownerTaskFlow.createManaged({
controllerId: "tests/runtime-taskflow",
goal: "Inspect PR batch",
});
expect(otherTaskFlow.get(created.flowId)).toBeUndefined();
expect(otherTaskFlow.list()).toEqual([]);
const child = ownerTaskFlow.runTask({
flowId: created.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "runtime-taskflow-child",
task: "Inspect PR 1",
status: "running",
startedAt: 10,
lastEventAt: 10,
});
expect(child).toMatchObject({
created: true,
flow: expect.objectContaining({
flowId: created.flowId,
}),
task: expect.objectContaining({
parentFlowId: created.flowId,
ownerKey: "agent:main:main",
runId: "runtime-taskflow-child",
}),
});
if (!child.created) {
throw new Error("expected child task creation to succeed");
}
expect(getTaskById(child.task.taskId)).toMatchObject({
parentFlowId: created.flowId,
ownerKey: "agent:main:main",
});
expect(getFlowById(created.flowId)).toMatchObject({
flowId: created.flowId,
});
expect(ownerTaskFlow.getTaskSummary(created.flowId)).toMatchObject({
total: 1,
active: 1,
});
});
});

View File

@@ -0,0 +1,461 @@
import type { OpenClawConfig } from "../../config/config.js";
import {
findLatestFlowForOwner,
getFlowByIdForOwner,
listFlowsForOwner,
resolveFlowForLookupTokenForOwner,
} from "../../tasks/flow-owner-access.js";
import type { FlowRecord, JsonValue } from "../../tasks/flow-registry.types.js";
import {
createManagedFlow,
failFlow,
finishFlow,
type FlowUpdateResult,
requestFlowCancel,
resumeFlow,
setFlowWaiting,
} from "../../tasks/flow-runtime-internal.js";
import {
cancelFlowByIdForOwner,
getFlowTaskSummary,
runTaskInFlowForOwner,
} from "../../tasks/task-executor.js";
import type {
TaskDeliveryStatus,
TaskDeliveryState,
TaskNotifyPolicy,
TaskRecord,
TaskRegistrySummary,
TaskRuntime,
} from "../../tasks/task-registry.types.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
import type { OpenClawPluginToolContext } from "../types.js";
export type ManagedTaskFlowRecord = FlowRecord & {
syncMode: "managed";
controllerId: string;
};
export type ManagedTaskFlowMutationErrorCode = "not_found" | "not_managed" | "revision_conflict";
export type ManagedTaskFlowMutationResult =
| {
applied: true;
flow: ManagedTaskFlowRecord;
}
| {
applied: false;
code: ManagedTaskFlowMutationErrorCode;
current?: FlowRecord;
};
export type BoundTaskFlowTaskRunResult =
| {
created: true;
flow: ManagedTaskFlowRecord;
task: TaskRecord;
}
| {
created: false;
reason: string;
found: boolean;
flow?: FlowRecord;
};
export type BoundTaskFlowCancelResult = Awaited<ReturnType<typeof cancelFlowByIdForOwner>>;
export type BoundTaskFlowRuntime = {
readonly sessionKey: string;
readonly requesterOrigin?: TaskDeliveryState["requesterOrigin"];
createManaged: (params: {
controllerId: string;
goal: string;
status?: ManagedTaskFlowRecord["status"];
notifyPolicy?: TaskNotifyPolicy;
currentStep?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
cancelRequestedAt?: number | null;
createdAt?: number;
updatedAt?: number;
endedAt?: number | null;
}) => ManagedTaskFlowRecord;
get: (flowId: string) => FlowRecord | undefined;
list: () => FlowRecord[];
findLatest: () => FlowRecord | undefined;
resolve: (token: string) => FlowRecord | undefined;
getTaskSummary: (flowId: string) => TaskRegistrySummary | undefined;
setWaiting: (params: {
flowId: string;
expectedRevision: number;
currentStep?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
updatedAt?: number;
}) => ManagedTaskFlowMutationResult;
resume: (params: {
flowId: string;
expectedRevision: number;
status?: Extract<ManagedTaskFlowRecord["status"], "queued" | "running">;
currentStep?: string | null;
stateJson?: JsonValue | null;
updatedAt?: number;
}) => ManagedTaskFlowMutationResult;
finish: (params: {
flowId: string;
expectedRevision: number;
stateJson?: JsonValue | null;
updatedAt?: number;
endedAt?: number;
}) => ManagedTaskFlowMutationResult;
fail: (params: {
flowId: string;
expectedRevision: number;
stateJson?: JsonValue | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
updatedAt?: number;
endedAt?: number;
}) => ManagedTaskFlowMutationResult;
requestCancel: (params: {
flowId: string;
expectedRevision: number;
cancelRequestedAt?: number;
}) => ManagedTaskFlowMutationResult;
cancel: (params: { flowId: string; cfg: OpenClawConfig }) => Promise<BoundTaskFlowCancelResult>;
runTask: (params: {
flowId: string;
runtime: TaskRuntime;
sourceId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
status?: "queued" | "running";
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
}) => BoundTaskFlowTaskRunResult;
};
export type PluginRuntimeTaskFlow = {
bindSession: (params: {
sessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
}) => BoundTaskFlowRuntime;
fromToolContext: (
ctx: Pick<OpenClawPluginToolContext, "sessionKey" | "deliveryContext">,
) => BoundTaskFlowRuntime;
};
function assertSessionKey(sessionKey: string | undefined, errorMessage: string): string {
const normalized = sessionKey?.trim();
if (!normalized) {
throw new Error(errorMessage);
}
return normalized;
}
function asManagedTaskFlowRecord(flow: FlowRecord | undefined): ManagedTaskFlowRecord | undefined {
if (!flow || flow.syncMode !== "managed" || !flow.controllerId) {
return undefined;
}
return flow as ManagedTaskFlowRecord;
}
function resolveManagedFlowForOwner(params: {
flowId: string;
ownerKey: string;
}):
| { ok: true; flow: ManagedTaskFlowRecord }
| { ok: false; code: "not_found" | "not_managed"; current?: FlowRecord } {
const flow = getFlowByIdForOwner({
flowId: params.flowId,
callerOwnerKey: params.ownerKey,
});
if (!flow) {
return { ok: false, code: "not_found" };
}
const managed = asManagedTaskFlowRecord(flow);
if (!managed) {
return { ok: false, code: "not_managed", current: flow };
}
return { ok: true, flow: managed };
}
function mapFlowUpdateResult(result: FlowUpdateResult): ManagedTaskFlowMutationResult {
if (result.applied) {
const managed = asManagedTaskFlowRecord(result.flow);
if (!managed) {
return {
applied: false,
code: "not_managed",
current: result.flow,
};
}
return {
applied: true,
flow: managed,
};
}
return {
applied: false,
code: result.reason,
...(result.current ? { current: result.current } : {}),
};
}
function createBoundTaskFlowRuntime(params: {
sessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
}): BoundTaskFlowRuntime {
const ownerKey = assertSessionKey(
params.sessionKey,
"TaskFlow runtime requires a bound sessionKey.",
);
const requesterOrigin = params.requesterOrigin
? normalizeDeliveryContext(params.requesterOrigin)
: undefined;
return {
sessionKey: ownerKey,
...(requesterOrigin ? { requesterOrigin } : {}),
createManaged: (input) =>
createManagedFlow({
ownerKey,
controllerId: input.controllerId,
requesterOrigin,
status: input.status,
notifyPolicy: input.notifyPolicy,
goal: input.goal,
currentStep: input.currentStep,
stateJson: input.stateJson,
waitJson: input.waitJson,
cancelRequestedAt: input.cancelRequestedAt,
createdAt: input.createdAt,
updatedAt: input.updatedAt,
endedAt: input.endedAt,
}) as ManagedTaskFlowRecord,
get: (flowId) =>
getFlowByIdForOwner({
flowId,
callerOwnerKey: ownerKey,
}),
list: () =>
listFlowsForOwner({
callerOwnerKey: ownerKey,
}),
findLatest: () =>
findLatestFlowForOwner({
callerOwnerKey: ownerKey,
}),
resolve: (token) =>
resolveFlowForLookupTokenForOwner({
token,
callerOwnerKey: ownerKey,
}),
getTaskSummary: (flowId) => {
const flow = getFlowByIdForOwner({
flowId,
callerOwnerKey: ownerKey,
});
return flow ? getFlowTaskSummary(flow.flowId) : undefined;
},
setWaiting: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
setFlowWaiting({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
currentStep: input.currentStep,
stateJson: input.stateJson,
waitJson: input.waitJson,
blockedTaskId: input.blockedTaskId,
blockedSummary: input.blockedSummary,
updatedAt: input.updatedAt,
}),
);
},
resume: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
resumeFlow({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
status: input.status,
currentStep: input.currentStep,
stateJson: input.stateJson,
updatedAt: input.updatedAt,
}),
);
},
finish: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
finishFlow({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
stateJson: input.stateJson,
updatedAt: input.updatedAt,
endedAt: input.endedAt,
}),
);
},
fail: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
failFlow({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
stateJson: input.stateJson,
blockedTaskId: input.blockedTaskId,
blockedSummary: input.blockedSummary,
updatedAt: input.updatedAt,
endedAt: input.endedAt,
}),
);
},
requestCancel: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
requestFlowCancel({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
cancelRequestedAt: input.cancelRequestedAt,
}),
);
},
cancel: ({ flowId, cfg }) =>
cancelFlowByIdForOwner({
cfg,
flowId,
callerOwnerKey: ownerKey,
}),
runTask: (input) => {
const created = runTaskInFlowForOwner({
flowId: input.flowId,
callerOwnerKey: ownerKey,
runtime: input.runtime,
sourceId: input.sourceId,
childSessionKey: input.childSessionKey,
parentTaskId: input.parentTaskId,
agentId: input.agentId,
runId: input.runId,
label: input.label,
task: input.task,
preferMetadata: input.preferMetadata,
notifyPolicy: input.notifyPolicy,
deliveryStatus: input.deliveryStatus,
status: input.status,
startedAt: input.startedAt,
lastEventAt: input.lastEventAt,
progressSummary: input.progressSummary,
});
if (!created.created) {
return {
created: false,
found: created.found,
reason: created.reason ?? "Task was not created.",
...(created.flow ? { flow: created.flow } : {}),
};
}
const managed = asManagedTaskFlowRecord(created.flow);
if (!managed) {
return {
created: false,
found: true,
reason: "TaskFlow does not accept managed child tasks.",
flow: created.flow,
};
}
if (!created.task) {
return {
created: false,
found: true,
reason: "Task was not created.",
flow: created.flow,
};
}
return {
created: true,
flow: managed,
task: created.task,
};
},
};
}
export function createRuntimeTaskFlow(): PluginRuntimeTaskFlow {
return {
bindSession: (params) =>
createBoundTaskFlowRuntime({
sessionKey: params.sessionKey,
requesterOrigin: params.requesterOrigin,
}),
fromToolContext: (ctx) =>
createBoundTaskFlowRuntime({
sessionKey: assertSessionKey(
ctx.sessionKey,
"TaskFlow runtime requires tool context with a sessionKey.",
),
requesterOrigin: ctx.deliveryContext,
}),
};
}

View File

@@ -103,6 +103,7 @@ export type PluginRuntimeCore = {
state: {
resolveStateDir: typeof import("../../config/paths.js").resolveStateDir;
};
taskFlow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow;
modelAuth: {
/** Resolve auth for a model. Only provider/model and optional cfg are used. */
getApiKeyForModel: (params: {

View File

@@ -4,6 +4,8 @@ export {
createManagedFlow,
deleteFlowRecordById,
findLatestFlowForOwnerKey,
failFlow,
finishFlow,
getFlowById,
listFlowRecords,
listFlowsForOwnerKey,
@@ -15,3 +17,5 @@ export {
syncFlowFromTask,
updateFlowRecordByIdExpectedRevision,
} from "./flow-registry.js";
export type { FlowUpdateResult } from "./flow-registry.js";