refactor(plugin-sdk): add task domain runtime surfaces (#59805)

* refactor(plugin-sdk): add task domain runtime views

* chore(plugin-sdk): refresh api baseline

* fix(plugin-sdk): preserve task runtime owner isolation
This commit is contained in:
Vincent Koc
2026-04-03 02:11:21 +09:00
committed by GitHub
parent f30b4bc717
commit 774beb8e5c
12 changed files with 969 additions and 20 deletions

View File

@@ -190,8 +190,16 @@ describe("plugin runtime command execution", () => {
},
},
{
name: "exposes runtime.tasks.flow as the canonical TaskFlow runtime and keeps runtime.taskFlow as an alias",
name: "exposes canonical runtime.tasks.runs and runtime.tasks.flows while keeping legacy TaskFlow aliases",
assert: (runtime: ReturnType<typeof createPluginRuntime>) => {
expectFunctionKeys(runtime.tasks.runs as Record<string, unknown>, [
"bindSession",
"fromToolContext",
]);
expectFunctionKeys(runtime.tasks.flows as Record<string, unknown>, [
"bindSession",
"fromToolContext",
]);
expectFunctionKeys(runtime.tasks.flow as Record<string, unknown>, [
"bindSession",
"fromToolContext",

View File

@@ -17,6 +17,7 @@ import { createRuntimeLogging } from "./runtime-logging.js";
import { createRuntimeMedia } from "./runtime-media.js";
import { createRuntimeSystem } from "./runtime-system.js";
import { createRuntimeTaskFlow } from "./runtime-taskflow.js";
import { createRuntimeTasks } from "./runtime-tasks.js";
import type { PluginRuntime } from "./types.js";
const loadTtsRuntime = createLazyRuntimeModule(() => import("./runtime-tts.runtime.js"));
@@ -185,6 +186,9 @@ export type CreatePluginRuntimeOptions = {
export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): PluginRuntime {
const mediaUnderstanding = createRuntimeMediaUnderstandingFacade();
const taskFlow = createRuntimeTaskFlow();
const tasks = createRuntimeTasks({
legacyTaskFlow: taskFlow,
});
const runtime = {
// Sourced from the shared OpenClaw version resolver (#52899) so plugins
// always see the same version the CLI reports, avoiding API-version drift.
@@ -205,9 +209,7 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}):
events: createRuntimeEvents(),
logging: createRuntimeLogging(),
state: { resolveStateDir },
tasks: {
flow: taskFlow,
},
tasks,
taskFlow,
} satisfies Omit<
PluginRuntime,

View File

@@ -0,0 +1,236 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { resetTaskFlowRegistryForTests } from "../../tasks/task-flow-registry.js";
import { resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { createRuntimeTaskFlow } from "./runtime-taskflow.js";
import { createRuntimeTaskFlows, createRuntimeTaskRuns } from "./runtime-tasks.js";
const hoisted = vi.hoisted(() => {
const sendMessageMock = vi.fn();
const cancelSessionMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
return {
sendMessageMock,
cancelSessionMock,
killSubagentRunAdminMock,
};
});
vi.mock("../../tasks/task-registry-delivery-runtime.js", () => ({
sendMessage: hoisted.sendMessageMock,
}));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.mock("../../agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
afterEach(() => {
resetTaskRegistryForTests();
resetTaskFlowRegistryForTests({ persist: false });
vi.clearAllMocks();
});
describe("runtime tasks", () => {
it("exposes canonical task and TaskFlow DTOs without leaking raw registry fields", () => {
const legacyTaskFlow = createRuntimeTaskFlow().bindSession({
sessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
});
const taskFlows = createRuntimeTaskFlows().bindSession({
sessionKey: "agent:main:main",
});
const taskRuns = createRuntimeTaskRuns().bindSession({
sessionKey: "agent:main:main",
});
const otherTaskFlows = createRuntimeTaskFlows().bindSession({
sessionKey: "agent:main:other",
});
const otherTaskRuns = createRuntimeTaskRuns().bindSession({
sessionKey: "agent:main:other",
});
const created = legacyTaskFlow.createManaged({
controllerId: "tests/runtime-tasks",
goal: "Review inbox",
currentStep: "triage",
stateJson: { lane: "priority" },
});
const child = legacyTaskFlow.runTask({
flowId: created.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "runtime-task-run",
label: "Inbox triage",
task: "Review PR 1",
status: "running",
startedAt: 10,
lastEventAt: 11,
progressSummary: "Inspecting",
});
if (!child.created) {
throw new Error("expected child task creation to succeed");
}
expect(taskFlows.list()).toEqual(
expect.arrayContaining([
expect.objectContaining({
id: created.flowId,
ownerKey: "agent:main:main",
goal: "Review inbox",
currentStep: "triage",
}),
]),
);
expect(taskFlows.get(created.flowId)).toMatchObject({
id: created.flowId,
ownerKey: "agent:main:main",
goal: "Review inbox",
currentStep: "triage",
state: { lane: "priority" },
taskSummary: {
total: 1,
active: 1,
},
tasks: [
expect.objectContaining({
id: child.task.taskId,
flowId: created.flowId,
title: "Review PR 1",
label: "Inbox triage",
runId: "runtime-task-run",
}),
],
});
expect(taskRuns.list()).toEqual(
expect.arrayContaining([
expect.objectContaining({
id: child.task.taskId,
flowId: created.flowId,
sessionKey: "agent:main:main",
title: "Review PR 1",
status: "running",
}),
]),
);
expect(taskRuns.get(child.task.taskId)).toMatchObject({
id: child.task.taskId,
flowId: created.flowId,
title: "Review PR 1",
progressSummary: "Inspecting",
});
expect(taskRuns.findLatest()?.id).toBe(child.task.taskId);
expect(taskRuns.resolve("runtime-task-run")?.id).toBe(child.task.taskId);
expect(taskFlows.getTaskSummary(created.flowId)).toMatchObject({
total: 1,
active: 1,
});
expect(otherTaskFlows.get(created.flowId)).toBeUndefined();
expect(otherTaskRuns.get(child.task.taskId)).toBeUndefined();
const flowDetail = taskFlows.get(created.flowId);
expect(flowDetail).not.toHaveProperty("revision");
expect(flowDetail).not.toHaveProperty("controllerId");
expect(flowDetail).not.toHaveProperty("syncMode");
const taskDetail = taskRuns.get(child.task.taskId);
expect(taskDetail).not.toHaveProperty("taskId");
expect(taskDetail).not.toHaveProperty("requesterSessionKey");
expect(taskDetail).not.toHaveProperty("scopeKind");
});
it("maps task cancellation results onto canonical task DTOs", async () => {
const legacyTaskFlow = createRuntimeTaskFlow().bindSession({
sessionKey: "agent:main:main",
});
const taskRuns = createRuntimeTaskRuns().bindSession({
sessionKey: "agent:main:main",
});
const created = legacyTaskFlow.createManaged({
controllerId: "tests/runtime-tasks",
goal: "Cancel active task",
});
const child = legacyTaskFlow.runTask({
flowId: created.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "runtime-task-cancel",
task: "Cancel me",
status: "running",
startedAt: 20,
lastEventAt: 21,
});
if (!child.created) {
throw new Error("expected child task creation to succeed");
}
const result = await taskRuns.cancel({
taskId: child.task.taskId,
cfg: {} as never,
});
expect(hoisted.cancelSessionMock).toHaveBeenCalledWith({
cfg: {},
sessionKey: "agent:main:subagent:child",
reason: "task-cancel",
});
expect(result).toMatchObject({
found: true,
cancelled: true,
task: {
id: child.task.taskId,
title: "Cancel me",
status: "cancelled",
},
});
});
it("does not allow cross-owner task cancellation or leak task details", async () => {
const legacyTaskFlow = createRuntimeTaskFlow().bindSession({
sessionKey: "agent:main:main",
});
const otherTaskRuns = createRuntimeTaskRuns().bindSession({
sessionKey: "agent:main:other",
});
const created = legacyTaskFlow.createManaged({
controllerId: "tests/runtime-tasks",
goal: "Keep owner isolation",
});
const child = legacyTaskFlow.runTask({
flowId: created.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "runtime-task-isolation",
task: "Do not cancel me",
status: "running",
startedAt: 30,
lastEventAt: 31,
});
if (!child.created) {
throw new Error("expected child task creation to succeed");
}
const result = await otherTaskRuns.cancel({
taskId: child.task.taskId,
cfg: {} as never,
});
expect(hoisted.cancelSessionMock).not.toHaveBeenCalled();
expect(result).toEqual({
found: false,
cancelled: false,
reason: "Task not found.",
});
expect(otherTaskRuns.get(child.task.taskId)).toBeUndefined();
});
});

View File

@@ -0,0 +1,263 @@
import type { OpenClawConfig } from "../../config/config.js";
import { cancelTaskById, listTasksForFlowId } from "../../tasks/runtime-internal.js";
import {
mapTaskFlowDetail,
mapTaskFlowView,
mapTaskRunAggregateSummary,
mapTaskRunDetail,
mapTaskRunView,
} from "../../tasks/task-domain-views.js";
import { getFlowTaskSummary } from "../../tasks/task-executor.js";
import {
getTaskFlowByIdForOwner,
listTaskFlowsForOwner,
findLatestTaskFlowForOwner,
resolveTaskFlowForLookupTokenForOwner,
} from "../../tasks/task-flow-owner-access.js";
import {
findLatestTaskForRelatedSessionKeyForOwner,
getTaskByIdForOwner,
listTasksForRelatedSessionKeyForOwner,
resolveTaskForLookupTokenForOwner,
} from "../../tasks/task-owner-access.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
import type { OpenClawPluginToolContext } from "../types.js";
import type { PluginRuntimeTaskFlow } from "./runtime-taskflow.js";
import type {
TaskFlowDetail,
TaskFlowView,
TaskRunAggregateSummary,
TaskRunCancelResult,
TaskRunDetail,
TaskRunView,
} from "./task-domain-types.js";
function assertSessionKey(sessionKey: string | undefined, errorMessage: string): string {
const normalized = sessionKey?.trim();
if (!normalized) {
throw new Error(errorMessage);
}
return normalized;
}
function mapCancelledTaskResult(
result: Awaited<ReturnType<typeof cancelTaskById>>,
): TaskRunCancelResult {
return {
found: result.found,
cancelled: result.cancelled,
...(result.reason ? { reason: result.reason } : {}),
...(result.task ? { task: mapTaskRunDetail(result.task) } : {}),
};
}
export type BoundTaskRunsRuntime = {
readonly sessionKey: string;
readonly requesterOrigin?: ReturnType<typeof normalizeDeliveryContext>;
get: (taskId: string) => TaskRunDetail | undefined;
list: () => TaskRunView[];
findLatest: () => TaskRunDetail | undefined;
resolve: (token: string) => TaskRunDetail | undefined;
cancel: (params: { taskId: string; cfg: OpenClawConfig }) => Promise<TaskRunCancelResult>;
};
export type PluginRuntimeTaskRuns = {
bindSession: (params: {
sessionKey: string;
requesterOrigin?: import("../../tasks/task-registry.types.js").TaskDeliveryState["requesterOrigin"];
}) => BoundTaskRunsRuntime;
fromToolContext: (
ctx: Pick<OpenClawPluginToolContext, "sessionKey" | "deliveryContext">,
) => BoundTaskRunsRuntime;
};
export type BoundTaskFlowsRuntime = {
readonly sessionKey: string;
readonly requesterOrigin?: ReturnType<typeof normalizeDeliveryContext>;
get: (flowId: string) => TaskFlowDetail | undefined;
list: () => TaskFlowView[];
findLatest: () => TaskFlowDetail | undefined;
resolve: (token: string) => TaskFlowDetail | undefined;
getTaskSummary: (flowId: string) => TaskRunAggregateSummary | undefined;
};
export type PluginRuntimeTaskFlows = {
bindSession: (params: {
sessionKey: string;
requesterOrigin?: import("../../tasks/task-registry.types.js").TaskDeliveryState["requesterOrigin"];
}) => BoundTaskFlowsRuntime;
fromToolContext: (
ctx: Pick<OpenClawPluginToolContext, "sessionKey" | "deliveryContext">,
) => BoundTaskFlowsRuntime;
};
export type PluginRuntimeTasks = {
runs: PluginRuntimeTaskRuns;
flows: PluginRuntimeTaskFlows;
/** @deprecated Use runtime.tasks.flows for DTO-based TaskFlow access. */
flow: PluginRuntimeTaskFlow;
};
function createBoundTaskRunsRuntime(params: {
sessionKey: string;
requesterOrigin?: import("../../tasks/task-registry.types.js").TaskDeliveryState["requesterOrigin"];
}): BoundTaskRunsRuntime {
const ownerKey = assertSessionKey(
params.sessionKey,
"Tasks runtime requires a bound sessionKey.",
);
const requesterOrigin = params.requesterOrigin
? normalizeDeliveryContext(params.requesterOrigin)
: undefined;
return {
sessionKey: ownerKey,
...(requesterOrigin ? { requesterOrigin } : {}),
get: (taskId) => {
const task = getTaskByIdForOwner({ taskId, callerOwnerKey: ownerKey });
return task ? mapTaskRunDetail(task) : undefined;
},
list: () =>
listTasksForRelatedSessionKeyForOwner({
relatedSessionKey: ownerKey,
callerOwnerKey: ownerKey,
}).map((task) => mapTaskRunView(task)),
findLatest: () => {
const task = findLatestTaskForRelatedSessionKeyForOwner({
relatedSessionKey: ownerKey,
callerOwnerKey: ownerKey,
});
return task ? mapTaskRunDetail(task) : undefined;
},
resolve: (token) => {
const task = resolveTaskForLookupTokenForOwner({
token,
callerOwnerKey: ownerKey,
});
return task ? mapTaskRunDetail(task) : undefined;
},
cancel: async ({ taskId, cfg }) => {
const task = getTaskByIdForOwner({
taskId,
callerOwnerKey: ownerKey,
});
if (!task) {
return {
found: false,
cancelled: false,
reason: "Task not found.",
};
}
return mapCancelledTaskResult(
await cancelTaskById({
cfg,
taskId: task.taskId,
}),
);
},
};
}
function createBoundTaskFlowsRuntime(params: {
sessionKey: string;
requesterOrigin?: import("../../tasks/task-registry.types.js").TaskDeliveryState["requesterOrigin"];
}): BoundTaskFlowsRuntime {
const ownerKey = assertSessionKey(
params.sessionKey,
"TaskFlow runtime requires a bound sessionKey.",
);
const requesterOrigin = params.requesterOrigin
? normalizeDeliveryContext(params.requesterOrigin)
: undefined;
const getDetail = (flowId: string): TaskFlowDetail | undefined => {
const flow = getTaskFlowByIdForOwner({
flowId,
callerOwnerKey: ownerKey,
});
if (!flow) {
return undefined;
}
const tasks = listTasksForFlowId(flow.flowId);
return mapTaskFlowDetail({
flow,
tasks,
summary: getFlowTaskSummary(flow.flowId),
});
};
return {
sessionKey: ownerKey,
...(requesterOrigin ? { requesterOrigin } : {}),
get: (flowId) => getDetail(flowId),
list: () =>
listTaskFlowsForOwner({
callerOwnerKey: ownerKey,
}).map((flow) => mapTaskFlowView(flow)),
findLatest: () => {
const flow = findLatestTaskFlowForOwner({
callerOwnerKey: ownerKey,
});
return flow ? getDetail(flow.flowId) : undefined;
},
resolve: (token) => {
const flow = resolveTaskFlowForLookupTokenForOwner({
token,
callerOwnerKey: ownerKey,
});
return flow ? getDetail(flow.flowId) : undefined;
},
getTaskSummary: (flowId) => {
const flow = getTaskFlowByIdForOwner({
flowId,
callerOwnerKey: ownerKey,
});
return flow ? mapTaskRunAggregateSummary(getFlowTaskSummary(flow.flowId)) : undefined;
},
};
}
export function createRuntimeTaskRuns(): PluginRuntimeTaskRuns {
return {
bindSession: (params) =>
createBoundTaskRunsRuntime({
sessionKey: params.sessionKey,
requesterOrigin: params.requesterOrigin,
}),
fromToolContext: (ctx) =>
createBoundTaskRunsRuntime({
sessionKey: assertSessionKey(
ctx.sessionKey,
"Tasks runtime requires tool context with a sessionKey.",
),
requesterOrigin: ctx.deliveryContext,
}),
};
}
export function createRuntimeTaskFlows(): PluginRuntimeTaskFlows {
return {
bindSession: (params) =>
createBoundTaskFlowsRuntime({
sessionKey: params.sessionKey,
requesterOrigin: params.requesterOrigin,
}),
fromToolContext: (ctx) =>
createBoundTaskFlowsRuntime({
sessionKey: assertSessionKey(
ctx.sessionKey,
"TaskFlow runtime requires tool context with a sessionKey.",
),
requesterOrigin: ctx.deliveryContext,
}),
};
}
export function createRuntimeTasks(params: {
legacyTaskFlow: PluginRuntimeTaskFlow;
}): PluginRuntimeTasks {
return {
runs: createRuntimeTaskRuns(),
flows: createRuntimeTaskFlows(),
flow: params.legacyTaskFlow,
};
}

View File

@@ -0,0 +1,83 @@
import type { JsonValue } from "../../tasks/task-flow-registry.types.js";
import type {
TaskDeliveryStatus,
TaskNotifyPolicy,
TaskRuntime,
TaskScopeKind,
TaskRuntimeCounts,
TaskStatus,
TaskStatusCounts,
TaskTerminalOutcome,
} from "../../tasks/task-registry.types.js";
import type { DeliveryContext } from "../../utils/delivery-context.js";
export type TaskRunAggregateSummary = {
total: number;
active: number;
terminal: number;
failures: number;
byStatus: TaskStatusCounts;
byRuntime: TaskRuntimeCounts;
};
export type TaskRunView = {
id: string;
runtime: TaskRuntime;
sourceId?: string;
sessionKey: string;
ownerKey: string;
scope: TaskScopeKind;
childSessionKey?: string;
flowId?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
title: string;
status: TaskStatus;
deliveryStatus: TaskDeliveryStatus;
notifyPolicy: TaskNotifyPolicy;
createdAt: number;
startedAt?: number;
endedAt?: number;
lastEventAt?: number;
cleanupAfter?: number;
error?: string;
progressSummary?: string;
terminalSummary?: string;
terminalOutcome?: TaskTerminalOutcome;
};
export type TaskRunDetail = TaskRunView;
export type TaskRunCancelResult = {
found: boolean;
cancelled: boolean;
reason?: string;
task?: TaskRunDetail;
};
export type TaskFlowView = {
id: string;
ownerKey: string;
requesterOrigin?: DeliveryContext;
status: import("../../tasks/task-flow-registry.types.js").TaskFlowStatus;
notifyPolicy: TaskNotifyPolicy;
goal: string;
currentStep?: string;
cancelRequestedAt?: number;
createdAt: number;
updatedAt: number;
endedAt?: number;
};
export type TaskFlowDetail = TaskFlowView & {
state?: JsonValue;
wait?: JsonValue;
blocked?: {
taskId?: string;
summary?: string;
};
tasks: TaskRunView[];
taskSummary: TaskRunAggregateSummary;
};

View File

@@ -104,8 +104,12 @@ export type PluginRuntimeCore = {
resolveStateDir: typeof import("../../config/paths.js").resolveStateDir;
};
tasks: {
runs: import("./runtime-tasks.js").PluginRuntimeTaskRuns;
flows: import("./runtime-tasks.js").PluginRuntimeTaskFlows;
/** @deprecated Use runtime.tasks.flows for DTO-based TaskFlow access. */
flow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow;
};
/** @deprecated Use runtime.tasks.flows for DTO-based TaskFlow access. */
taskFlow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow;
modelAuth: {
/** Resolve auth for a model. Only provider/model and optional cfg are used. */