From 8bf8baef87b19df9d0299e300db7c9d5806f1029 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 1 Apr 2026 01:28:18 +0900 Subject: [PATCH] Revert "refactor: move tasks into bundled plugin" This reverts commit c75f4695b71bbbca557ade25543a2b134ef6ed67. --- docs/plugins/sdk-overview.md | 17 +- docs/plugins/sdk-runtime.md | 34 -- extensions/lobster/src/lobster-tool.test.ts | 1 - package.json | 4 - scripts/lib/plugin-sdk-entrypoints.json | 1 - .../register.status-health-sessions.ts | 7 +- src/plugin-sdk/operations-default.ts | 14 - src/plugin-sdk/plugin-entry.ts | 30 -- src/plugins/api-builder.ts | 3 - src/plugins/captured-registration.test.ts | 1 - src/plugins/loader.test.ts | 179 -------- src/plugins/loader.ts | 26 -- src/plugins/operations-state.test.ts | 134 ------ src/plugins/operations-state.ts | 277 ------------- src/plugins/registry.ts | 15 - src/plugins/runtime/index.test.ts | 14 - src/plugins/runtime/index.ts | 17 - src/plugins/runtime/types-core.ts | 27 -- src/plugins/types.ts | 3 - src/tasks/operations-runtime.test.ts | 183 -------- src/tasks/operations-runtime.ts | 389 ------------------ src/tasks/task-executor-boundary.test.ts | 1 - .../task-registry-import-boundary.test.ts | 2 +- test/helpers/plugins/plugin-api.ts | 1 - test/helpers/plugins/plugin-runtime-mock.ts | 30 -- 25 files changed, 10 insertions(+), 1400 deletions(-) delete mode 100644 src/plugin-sdk/operations-default.ts delete mode 100644 src/plugins/operations-state.test.ts delete mode 100644 src/plugins/operations-state.ts delete mode 100644 src/tasks/operations-runtime.test.ts delete mode 100644 src/tasks/operations-runtime.ts diff --git a/docs/plugins/sdk-overview.md b/docs/plugins/sdk-overview.md index 69c9be1455e..4e05ef942cf 100644 --- a/docs/plugins/sdk-overview.md +++ b/docs/plugins/sdk-overview.md @@ -140,15 +140,14 @@ methods: ### Infrastructure -| Method | What it registers | -| ---------------------------------------------- | -------------------------- | -| `api.registerHook(events, handler, opts?)` | Event hook | -| `api.registerHttpRoute(params)` | Gateway HTTP endpoint | -| `api.registerGatewayMethod(name, handler)` | Gateway RPC method | -| `api.registerCli(registrar, opts?)` | CLI subcommand | -| `api.registerService(service)` | Background service | -| `api.registerInteractiveHandler(registration)` | Interactive handler | -| `api.registerOperationsRuntime(runtime)` | Durable operations runtime | +| Method | What it registers | +| ---------------------------------------------- | --------------------- | +| `api.registerHook(events, handler, opts?)` | Event hook | +| `api.registerHttpRoute(params)` | Gateway HTTP endpoint | +| `api.registerGatewayMethod(name, handler)` | Gateway RPC method | +| `api.registerCli(registrar, opts?)` | CLI subcommand | +| `api.registerService(service)` | Background service | +| `api.registerInteractiveHandler(registration)` | Interactive handler | ### CLI registration metadata diff --git a/docs/plugins/sdk-runtime.md b/docs/plugins/sdk-runtime.md index ab77fba0750..607b6d74101 100644 --- a/docs/plugins/sdk-runtime.md +++ b/docs/plugins/sdk-runtime.md @@ -115,40 +115,6 @@ await api.runtime.subagent.deleteSession({ Untrusted plugins can still run subagents, but override requests are rejected. -### `api.runtime.operations` - -Dispatch and query durable operation records behind a plugin-owned operations -runtime. - -```typescript -const created = await api.runtime.operations.dispatch({ - type: "create", - namespace: "imports", - kind: "csv", - status: "queued", - description: "Import contacts.csv", - runId: "import-1", -}); - -const progressed = await api.runtime.operations.dispatch({ - type: "transition", - runId: "import-1", - status: "running", - progressSummary: "Parsing rows", -}); - -const record = await api.runtime.operations.findByRunId("import-1"); -const list = await api.runtime.operations.list({ namespace: "imports" }); -const summary = await api.runtime.operations.summarize({ namespace: "imports" }); -``` - -Notes: - -- `api.registerOperationsRuntime(...)` installs the active runtime. -- Core exposes the facade; plugins own the operation semantics and storage. -- The built-in default runtime maps the existing background task ledger into the - generic operations shape until a plugin overrides it. - ### `api.runtime.tts` Text-to-speech synthesis. diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index 205402402d0..de3f2cc2cae 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -60,7 +60,6 @@ function fakeApi(overrides: Partial = {}): OpenClawPluginApi registerMemoryFlushPlan() {}, registerMemoryRuntime() {}, registerMemoryEmbeddingProvider() {}, - registerOperationsRuntime() {}, on() {}, resolvePath: (p) => p, ...overrides, diff --git a/package.json b/package.json index cd3cc31c00a..c8f62dd3ba4 100644 --- a/package.json +++ b/package.json @@ -185,10 +185,6 @@ "types": "./dist/plugin-sdk/plugin-runtime.d.ts", "default": "./dist/plugin-sdk/plugin-runtime.js" }, - "./plugin-sdk/operations-default": { - "types": "./dist/plugin-sdk/operations-default.d.ts", - "default": "./dist/plugin-sdk/operations-default.js" - }, "./plugin-sdk/security-runtime": { "types": "./dist/plugin-sdk/security-runtime.d.ts", "default": "./dist/plugin-sdk/security-runtime.js" diff --git a/scripts/lib/plugin-sdk-entrypoints.json b/scripts/lib/plugin-sdk-entrypoints.json index b103e7c7d9e..1085535796d 100644 --- a/scripts/lib/plugin-sdk-entrypoints.json +++ b/scripts/lib/plugin-sdk-entrypoints.json @@ -36,7 +36,6 @@ "speech-runtime", "speech-core", "plugin-runtime", - "operations-default", "security-runtime", "gateway-runtime", "github-copilot-login", diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index 992a9663f2b..4affcdc933c 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -25,12 +25,7 @@ function resolveVerbose(opts: { verbose?: boolean; debug?: boolean }): boolean { } function parseTimeoutMs(timeout: unknown): number | null | undefined { - const parsedRaw = - typeof timeout === "string" && timeout.trim() ? Number.parseInt(timeout, 10) : undefined; - const parsed = - typeof parsedRaw === "number" && Number.isFinite(parsedRaw) && parsedRaw > 0 - ? parsedRaw - : undefined; + const parsed = parsePositiveIntOrUndefined(timeout); if (timeout !== undefined && parsed === undefined) { defaultRuntime.error("--timeout must be a positive integer (milliseconds)"); defaultRuntime.exit(1); diff --git a/src/plugin-sdk/operations-default.ts b/src/plugin-sdk/operations-default.ts deleted file mode 100644 index ae928552702..00000000000 --- a/src/plugin-sdk/operations-default.ts +++ /dev/null @@ -1,14 +0,0 @@ -import type { OpenClawPluginService } from "../plugins/types.js"; -import { defaultTaskOperationsRuntime } from "../tasks/operations-runtime.js"; -import { startTaskRegistryMaintenance } from "../tasks/task-registry.maintenance.js"; - -export const defaultOperationsRuntime = defaultTaskOperationsRuntime; - -export function createDefaultOperationsMaintenanceService(): OpenClawPluginService { - return { - id: "default-operations-maintenance", - start() { - startTaskRegistryMaintenance(); - }, - }; -} diff --git a/src/plugin-sdk/plugin-entry.ts b/src/plugin-sdk/plugin-entry.ts index 3ad358d7cbc..f88041ae3b3 100644 --- a/src/plugin-sdk/plugin-entry.ts +++ b/src/plugin-sdk/plugin-entry.ts @@ -1,21 +1,5 @@ import type { OpenClawConfig } from "../config/config.js"; import { emptyPluginConfigSchema } from "../plugins/config-schema.js"; -import type { - PluginOperationAuditFinding, - PluginOperationAuditQuery, - PluginOperationAuditSeverity, - PluginOperationAuditSummary, - PluginOperationDispatchEvent, - PluginOperationDispatchResult, - PluginOperationListQuery, - PluginOperationMaintenanceQuery, - PluginOperationMaintenanceSummary, - PluginOperationPatchEvent, - PluginOperationRecord, - PluginOperationSummary, - PluginOperationsCancelResult, - PluginOperationsRuntime, -} from "../plugins/operations-state.js"; import type { AnyAgentTool, MediaUnderstandingProviderPlugin, @@ -112,20 +96,6 @@ export type { OpenClawPluginDefinition, PluginLogger, PluginInteractiveTelegramHandlerContext, - PluginOperationAuditFinding, - PluginOperationAuditQuery, - PluginOperationAuditSeverity, - PluginOperationAuditSummary, - PluginOperationDispatchEvent, - PluginOperationDispatchResult, - PluginOperationListQuery, - PluginOperationMaintenanceQuery, - PluginOperationMaintenanceSummary, - PluginOperationPatchEvent, - PluginOperationRecord, - PluginOperationSummary, - PluginOperationsCancelResult, - PluginOperationsRuntime, }; export type { OpenClawConfig }; diff --git a/src/plugins/api-builder.ts b/src/plugins/api-builder.ts index ecccf444d75..e743c1a9c12 100644 --- a/src/plugins/api-builder.ts +++ b/src/plugins/api-builder.ts @@ -39,7 +39,6 @@ export type BuildPluginApiParams = { | "registerMemoryFlushPlan" | "registerMemoryRuntime" | "registerMemoryEmbeddingProvider" - | "registerOperationsRuntime" | "on" > >; @@ -70,7 +69,6 @@ const noopRegisterMemoryFlushPlan: OpenClawPluginApi["registerMemoryFlushPlan"] const noopRegisterMemoryRuntime: OpenClawPluginApi["registerMemoryRuntime"] = () => {}; const noopRegisterMemoryEmbeddingProvider: OpenClawPluginApi["registerMemoryEmbeddingProvider"] = () => {}; -const noopRegisterOperationsRuntime: OpenClawPluginApi["registerOperationsRuntime"] = () => {}; const noopOn: OpenClawPluginApi["on"] = () => {}; export function buildPluginApi(params: BuildPluginApiParams): OpenClawPluginApi { @@ -114,7 +112,6 @@ export function buildPluginApi(params: BuildPluginApiParams): OpenClawPluginApi registerMemoryRuntime: handlers.registerMemoryRuntime ?? noopRegisterMemoryRuntime, registerMemoryEmbeddingProvider: handlers.registerMemoryEmbeddingProvider ?? noopRegisterMemoryEmbeddingProvider, - registerOperationsRuntime: handlers.registerOperationsRuntime ?? noopRegisterOperationsRuntime, resolvePath: params.resolvePath, on: handlers.on ?? noopOn, }; diff --git a/src/plugins/captured-registration.test.ts b/src/plugins/captured-registration.test.ts index 295e09661c6..844be720a3e 100644 --- a/src/plugins/captured-registration.test.ts +++ b/src/plugins/captured-registration.test.ts @@ -48,6 +48,5 @@ describe("captured plugin registration", () => { expect(captured.tools.map((tool) => tool.name)).toEqual(["captured-tool"]); expect(captured.providers.map((provider) => provider.id)).toEqual(["captured-provider"]); expect(captured.api.registerMemoryEmbeddingProvider).toBeTypeOf("function"); - expect(captured.api.registerOperationsRuntime).toBeTypeOf("function"); }); }); diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index bca4620629f..c2e107f1933 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -30,10 +30,6 @@ import { registerMemoryRuntime, resolveMemoryFlushPlan, } from "./memory-state.js"; -import { - getRegisteredOperationsRuntime, - registerOperationsRuntimeForOwner, -} from "./operations-state.js"; import { createEmptyPluginRegistry } from "./registry.js"; import { getActivePluginRegistry, @@ -1465,181 +1461,6 @@ module.exports = { id: "skipped-scoped-only", register() { throw new Error("skip expect(listMemoryEmbeddingProviders()).toEqual([]); }); - it("restores the active operations runtime during snapshot loads", () => { - const activeRuntime = { - async dispatch() { - return { matched: true, created: true, record: null }; - }, - async getById() { - return null; - }, - async findByRunId() { - return null; - }, - async list() { - return []; - }, - async summarize() { - return { - total: 0, - active: 0, - terminal: 0, - failures: 0, - byNamespace: { active: 0 }, - byKind: {}, - byStatus: {}, - }; - }, - async audit() { - return []; - }, - async maintenance() { - return { - reconciled: 0, - cleanupStamped: 0, - pruned: 0, - }; - }, - async cancel() { - return { found: false, cancelled: false, reason: "active" }; - }, - }; - registerOperationsRuntimeForOwner(activeRuntime, "active-operations"); - const plugin = writePlugin({ - id: "snapshot-operations", - filename: "snapshot-operations.cjs", - body: `module.exports = { - id: "snapshot-operations", - register(api) { - api.registerOperationsRuntime({ - async dispatch() { - return { matched: true, created: true, record: null }; - }, - async getById() { - return null; - }, - async findByRunId() { - return null; - }, - async list() { - return []; - }, - async summarize() { - return { - total: 1, - active: 1, - terminal: 0, - failures: 0, - byNamespace: { snapshot: 1 }, - byKind: { snapshot: 1 }, - byStatus: { queued: 1 }, - }; - }, - async audit() { - return []; - }, - async maintenance() { - return { - reconciled: 0, - cleanupStamped: 0, - pruned: 0, - }; - }, - async cancel() { - return { found: false, cancelled: false, reason: "snapshot" }; - }, - }); - }, - };`, - }); - - const scoped = loadOpenClawPlugins({ - cache: false, - activate: false, - workspaceDir: plugin.dir, - config: { - plugins: { - load: { paths: [plugin.file] }, - allow: ["snapshot-operations"], - }, - }, - onlyPluginIds: ["snapshot-operations"], - }); - - expect(scoped.plugins.find((entry) => entry.id === "snapshot-operations")?.status).toBe( - "loaded", - ); - expect(getRegisteredOperationsRuntime()).toBe(activeRuntime); - }); - - it("clears newly-registered operations runtime when plugin register fails", () => { - const plugin = writePlugin({ - id: "failing-operations", - filename: "failing-operations.cjs", - body: `module.exports = { - id: "failing-operations", - register(api) { - api.registerOperationsRuntime({ - async dispatch() { - return { matched: true, created: true, record: null }; - }, - async getById() { - return null; - }, - async findByRunId() { - return null; - }, - async list() { - return []; - }, - async summarize() { - return { - total: 1, - active: 1, - terminal: 0, - failures: 0, - byNamespace: { failing: 1 }, - byKind: { failing: 1 }, - byStatus: { queued: 1 }, - }; - }, - async audit() { - return []; - }, - async maintenance() { - return { - reconciled: 0, - cleanupStamped: 0, - pruned: 0, - }; - }, - async cancel() { - return { found: false, cancelled: false, reason: "failing" }; - }, - }); - throw new Error("operations register failed"); - }, - };`, - }); - - const registry = loadOpenClawPlugins({ - cache: false, - workspaceDir: plugin.dir, - config: { - plugins: { - load: { paths: [plugin.file] }, - allow: ["failing-operations"], - }, - }, - onlyPluginIds: ["failing-operations"], - }); - - expect(registry.plugins.find((entry) => entry.id === "failing-operations")?.status).toBe( - "error", - ); - expect(getRegisteredOperationsRuntime()).toBeUndefined(); - }); - it("throws when activate:false is used without cache:false", () => { expect(() => loadOpenClawPlugins({ activate: false })).toThrow( "activate:false requires cache:false", diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index 601ae4fd971..f608c8b213a 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -35,12 +35,6 @@ import { getMemoryRuntime, restoreMemoryPluginState, } from "./memory-state.js"; -import { - clearOperationsRuntimeState, - getRegisteredOperationsRuntime, - getRegisteredOperationsRuntimeOwner, - restoreOperationsRuntimeState, -} from "./operations-state.js"; import { isPathInside, safeStatSync } from "./path-safety.js"; import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js"; import { resolvePluginCacheInputs } from "./roots.js"; @@ -122,8 +116,6 @@ type CachedPluginState = { memoryFlushPlanResolver: ReturnType; memoryPromptBuilder: ReturnType; memoryRuntime: ReturnType; - operationsRuntime: ReturnType; - operationsRuntimeOwner: ReturnType; }; const MAX_PLUGIN_REGISTRY_CACHE_ENTRIES = 128; @@ -144,7 +136,6 @@ const LAZY_RUNTIME_REFLECTION_KEYS = [ "logging", "state", "modelAuth", - "operations", ] as const satisfies readonly (keyof PluginRuntime)[]; export function clearPluginLoaderCache(): void { @@ -152,7 +143,6 @@ export function clearPluginLoaderCache(): void { openAllowlistWarningCache.clear(); clearMemoryEmbeddingProviders(); clearMemoryPluginState(); - clearOperationsRuntimeState(); } const defaultLogger = () => createSubsystemLogger("plugins"); @@ -853,10 +843,6 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi flushPlanResolver: cached.memoryFlushPlanResolver, runtime: cached.memoryRuntime, }); - restoreOperationsRuntimeState({ - runtime: cached.operationsRuntime, - ownerPluginId: cached.operationsRuntimeOwner, - }); if (shouldActivate) { activatePluginRegistry(cached.registry, cacheKey, runtimeSubagentMode); } @@ -1350,8 +1336,6 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi const previousMemoryFlushPlanResolver = getMemoryFlushPlanResolver(); const previousMemoryPromptBuilder = getMemoryPromptSectionBuilder(); const previousMemoryRuntime = getMemoryRuntime(); - const previousOperationsRuntime = getRegisteredOperationsRuntime(); - const previousOperationsRuntimeOwner = getRegisteredOperationsRuntimeOwner(); try { const result = register(api); @@ -1371,10 +1355,6 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi flushPlanResolver: previousMemoryFlushPlanResolver, runtime: previousMemoryRuntime, }); - restoreOperationsRuntimeState({ - runtime: previousOperationsRuntime, - ownerPluginId: previousOperationsRuntimeOwner, - }); } registry.plugins.push(record); seenIds.set(pluginId, candidate.origin); @@ -1385,10 +1365,6 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi flushPlanResolver: previousMemoryFlushPlanResolver, runtime: previousMemoryRuntime, }); - restoreOperationsRuntimeState({ - runtime: previousOperationsRuntime, - ownerPluginId: previousOperationsRuntimeOwner, - }); recordPluginError({ logger, registry, @@ -1428,8 +1404,6 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi memoryFlushPlanResolver: getMemoryFlushPlanResolver(), memoryPromptBuilder: getMemoryPromptSectionBuilder(), memoryRuntime: getMemoryRuntime(), - operationsRuntime: getRegisteredOperationsRuntime(), - operationsRuntimeOwner: getRegisteredOperationsRuntimeOwner(), }); } if (shouldActivate) { diff --git a/src/plugins/operations-state.test.ts b/src/plugins/operations-state.test.ts deleted file mode 100644 index cc1b4bad640..00000000000 --- a/src/plugins/operations-state.test.ts +++ /dev/null @@ -1,134 +0,0 @@ -import { describe, expect, it } from "vitest"; -import { - clearOperationsRuntimeState, - getRegisteredOperationsRuntime, - getRegisteredOperationsRuntimeOwner, - registerOperationsRuntimeForOwner, - restoreOperationsRuntimeState, - summarizeOperationRecords, - type PluginOperationsRuntime, -} from "./operations-state.js"; - -function createRuntime(label: string): PluginOperationsRuntime { - return { - async dispatch() { - return { matched: true, created: true, record: null }; - }, - async getById() { - return null; - }, - async findByRunId() { - return null; - }, - async list() { - return []; - }, - async summarize() { - return { - total: 0, - active: 0, - terminal: 0, - failures: 0, - byNamespace: { [label]: 0 }, - byKind: {}, - byStatus: {}, - }; - }, - async audit() { - return []; - }, - async maintenance() { - return { - reconciled: 0, - cleanupStamped: 0, - pruned: 0, - }; - }, - async cancel() { - return { found: false, cancelled: false, reason: label }; - }, - }; -} - -describe("operations-state", () => { - it("registers an operations runtime and tracks the owner", () => { - clearOperationsRuntimeState(); - const runtime = createRuntime("one"); - expect(registerOperationsRuntimeForOwner(runtime, "plugin-one")).toEqual({ ok: true }); - expect(getRegisteredOperationsRuntime()).toBe(runtime); - expect(getRegisteredOperationsRuntimeOwner()).toBe("plugin-one"); - }); - - it("rejects a second owner and allows same-owner refresh", () => { - clearOperationsRuntimeState(); - const first = createRuntime("one"); - const second = createRuntime("two"); - const replacement = createRuntime("three"); - expect(registerOperationsRuntimeForOwner(first, "plugin-one")).toEqual({ ok: true }); - expect(registerOperationsRuntimeForOwner(second, "plugin-two")).toEqual({ - ok: false, - existingOwner: "plugin-one", - }); - expect( - registerOperationsRuntimeForOwner(replacement, "plugin-one", { - allowSameOwnerRefresh: true, - }), - ).toEqual({ ok: true }); - expect(getRegisteredOperationsRuntime()).toBe(replacement); - }); - - it("restores and clears runtime state", () => { - clearOperationsRuntimeState(); - const runtime = createRuntime("restore"); - restoreOperationsRuntimeState({ - runtime, - ownerPluginId: "plugin-restore", - }); - expect(getRegisteredOperationsRuntime()).toBe(runtime); - expect(getRegisteredOperationsRuntimeOwner()).toBe("plugin-restore"); - clearOperationsRuntimeState(); - expect(getRegisteredOperationsRuntime()).toBeUndefined(); - expect(getRegisteredOperationsRuntimeOwner()).toBeUndefined(); - }); - - it("summarizes generic operation records", () => { - const summary = summarizeOperationRecords([ - { - operationId: "op-1", - namespace: "tasks", - kind: "cli", - status: "queued", - description: "Queued task", - createdAt: 1, - updatedAt: 1, - }, - { - operationId: "op-2", - namespace: "imports", - kind: "csv", - status: "failed", - description: "Failed import", - createdAt: 2, - updatedAt: 2, - }, - ]); - expect(summary).toEqual({ - total: 2, - active: 1, - terminal: 1, - failures: 1, - byNamespace: { - imports: 1, - tasks: 1, - }, - byKind: { - cli: 1, - csv: 1, - }, - byStatus: { - failed: 1, - queued: 1, - }, - }); - }); -}); diff --git a/src/plugins/operations-state.ts b/src/plugins/operations-state.ts deleted file mode 100644 index 5e9e68d6954..00000000000 --- a/src/plugins/operations-state.ts +++ /dev/null @@ -1,277 +0,0 @@ -import type { OpenClawConfig } from "../config/config.js"; - -export type PluginOperationRecord = { - operationId: string; - namespace: string; - kind: string; - status: string; - sourceId?: string; - requesterSessionKey?: string; - childSessionKey?: string; - parentOperationId?: string; - agentId?: string; - runId?: string; - title?: string; - description: string; - createdAt: number; - startedAt?: number; - endedAt?: number; - updatedAt: number; - error?: string; - progressSummary?: string; - terminalSummary?: string; - metadata?: Record; -}; - -export type PluginOperationListQuery = { - namespace?: string; - kind?: string; - status?: string; - sessionKey?: string; - runId?: string; - sourceId?: string; - parentOperationId?: string; - limit?: number; -}; - -export type PluginOperationSummary = { - total: number; - active: number; - terminal: number; - failures: number; - byNamespace: Record; - byKind: Record; - byStatus: Record; -}; - -export type PluginOperationCreateEvent = { - type: "create"; - namespace: string; - kind: string; - status?: string; - sourceId?: string; - requesterSessionKey?: string; - childSessionKey?: string; - parentOperationId?: string; - agentId?: string; - runId?: string; - title?: string; - description: string; - createdAt?: number; - startedAt?: number; - endedAt?: number; - updatedAt?: number; - error?: string; - progressSummary?: string | null; - terminalSummary?: string | null; - metadata?: Record; -}; - -export type PluginOperationTransitionEvent = { - type: "transition"; - operationId?: string; - runId?: string; - status: string; - at?: number; - startedAt?: number; - endedAt?: number; - error?: string | null; - progressSummary?: string | null; - terminalSummary?: string | null; - metadataPatch?: Record; -}; - -export type PluginOperationPatchEvent = { - type: "patch"; - operationId?: string; - runId?: string; - at?: number; - title?: string | null; - description?: string | null; - error?: string | null; - progressSummary?: string | null; - terminalSummary?: string | null; - metadataPatch?: Record; -}; - -export type PluginOperationDispatchEvent = - | PluginOperationCreateEvent - | PluginOperationTransitionEvent - | PluginOperationPatchEvent; - -export type PluginOperationDispatchResult = { - matched: boolean; - created?: boolean; - record: PluginOperationRecord | null; -}; - -export type PluginOperationsCancelResult = { - found: boolean; - cancelled: boolean; - reason?: string; - record?: PluginOperationRecord | null; -}; - -export type PluginOperationAuditSeverity = "warn" | "error"; - -export type PluginOperationAuditFinding = { - severity: PluginOperationAuditSeverity; - code: string; - operation: PluginOperationRecord; - detail: string; - ageMs?: number; -}; - -export type PluginOperationAuditSummary = { - total: number; - warnings: number; - errors: number; - byCode: Record; -}; - -export type PluginOperationAuditQuery = { - namespace?: string; - severity?: PluginOperationAuditSeverity; - code?: string; -}; - -export type PluginOperationMaintenanceQuery = { - namespace?: string; - apply?: boolean; -}; - -export type PluginOperationMaintenanceSummary = { - reconciled: number; - cleanupStamped: number; - pruned: number; -}; - -export type PluginOperationsRuntime = { - dispatch(event: PluginOperationDispatchEvent): Promise; - getById(operationId: string): Promise; - findByRunId(runId: string): Promise; - list(query?: PluginOperationListQuery): Promise; - summarize(query?: PluginOperationListQuery): Promise; - audit(query?: PluginOperationAuditQuery): Promise; - maintenance(query?: PluginOperationMaintenanceQuery): Promise; - cancel(params: { - cfg: OpenClawConfig; - operationId: string; - }): Promise; -}; - -type OperationsRuntimeState = { - runtime?: PluginOperationsRuntime; - ownerPluginId?: string; -}; - -type RegisterOperationsRuntimeResult = { ok: true } | { ok: false; existingOwner?: string }; - -const operationsRuntimeState: OperationsRuntimeState = {}; - -function normalizeOwnedPluginId(ownerPluginId: string): string { - return ownerPluginId.trim(); -} - -export function registerOperationsRuntimeForOwner( - runtime: PluginOperationsRuntime, - ownerPluginId: string, - opts?: { allowSameOwnerRefresh?: boolean }, -): RegisterOperationsRuntimeResult { - const nextOwner = normalizeOwnedPluginId(ownerPluginId); - const existingOwner = operationsRuntimeState.ownerPluginId?.trim(); - if ( - operationsRuntimeState.runtime && - existingOwner && - existingOwner !== nextOwner && - !(opts?.allowSameOwnerRefresh === true && existingOwner === nextOwner) - ) { - return { - ok: false, - existingOwner, - }; - } - operationsRuntimeState.runtime = runtime; - operationsRuntimeState.ownerPluginId = nextOwner; - return { ok: true }; -} - -export function getRegisteredOperationsRuntime(): PluginOperationsRuntime | undefined { - return operationsRuntimeState.runtime; -} - -export function getRegisteredOperationsRuntimeOwner(): string | undefined { - return operationsRuntimeState.ownerPluginId; -} - -export function hasRegisteredOperationsRuntime(): boolean { - return operationsRuntimeState.runtime !== undefined; -} - -export function restoreOperationsRuntimeState(state: OperationsRuntimeState): void { - operationsRuntimeState.runtime = state.runtime; - operationsRuntimeState.ownerPluginId = state.ownerPluginId?.trim() || undefined; -} - -export function clearOperationsRuntimeState(): void { - operationsRuntimeState.runtime = undefined; - operationsRuntimeState.ownerPluginId = undefined; -} - -export function isActiveOperationStatus(status: string): boolean { - return status === "queued" || status === "running"; -} - -export function isFailureOperationStatus(status: string): boolean { - return status === "failed" || status === "timed_out" || status === "lost"; -} - -export function summarizeOperationRecords( - records: Iterable, -): PluginOperationSummary { - const summary: PluginOperationSummary = { - total: 0, - active: 0, - terminal: 0, - failures: 0, - byNamespace: {}, - byKind: {}, - byStatus: {}, - }; - for (const record of records) { - summary.total += 1; - summary.byNamespace[record.namespace] = (summary.byNamespace[record.namespace] ?? 0) + 1; - summary.byKind[record.kind] = (summary.byKind[record.kind] ?? 0) + 1; - summary.byStatus[record.status] = (summary.byStatus[record.status] ?? 0) + 1; - if (isActiveOperationStatus(record.status)) { - summary.active += 1; - } else { - summary.terminal += 1; - } - if (isFailureOperationStatus(record.status)) { - summary.failures += 1; - } - } - return summary; -} - -export function summarizeOperationAuditFindings( - findings: Iterable, -): PluginOperationAuditSummary { - const summary: PluginOperationAuditSummary = { - total: 0, - warnings: 0, - errors: 0, - byCode: {}, - }; - for (const finding of findings) { - summary.total += 1; - summary.byCode[finding.code] = (summary.byCode[finding.code] ?? 0) + 1; - if (finding.severity === "error") { - summary.errors += 1; - continue; - } - summary.warnings += 1; - } - return summary; -} diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index 3919b20ff06..a35aaa8834b 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -24,7 +24,6 @@ import { registerMemoryPromptSection, registerMemoryRuntime, } from "./memory-state.js"; -import { registerOperationsRuntimeForOwner } from "./operations-state.js"; import { normalizeRegisteredProvider } from "./provider-validation.js"; import { createEmptyPluginRegistry } from "./registry-empty.js"; import { withPluginRuntimePluginIdScope } from "./runtime/gateway-request-scope.js"; @@ -1154,20 +1153,6 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { ownerPluginId: record.id, }); }, - registerOperationsRuntime: (runtime) => { - const result = registerOperationsRuntimeForOwner(runtime, record.id, { - allowSameOwnerRefresh: true, - }); - if (!result.ok) { - const ownerDetail = result.existingOwner ? ` (${result.existingOwner})` : ""; - pushDiagnostic({ - level: "error", - pluginId: record.id, - source: record.source, - message: `operations runtime already registered${ownerDetail}`, - }); - } - }, on: (hookName, handler, opts) => registerTypedHook(record, hookName, handler, opts, params.hookPolicy), } diff --git a/src/plugins/runtime/index.test.ts b/src/plugins/runtime/index.test.ts index 47b38c48663..e3fc27f0632 100644 --- a/src/plugins/runtime/index.test.ts +++ b/src/plugins/runtime/index.test.ts @@ -215,20 +215,6 @@ describe("plugin runtime command execution", () => { ]); }, }, - { - name: "exposes runtime.operations helpers", - assert: (runtime: ReturnType) => { - expect(runtime.operations).toBeDefined(); - expectFunctionKeys(runtime.operations as Record, [ - "dispatch", - "getById", - "findByRunId", - "list", - "summarize", - "cancel", - ]); - }, - }, ] as const)("$name", ({ assert }) => { expectRuntimeShape(assert); }); diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index f72954cb915..c9ad7f44b4a 100644 --- a/src/plugins/runtime/index.ts +++ b/src/plugins/runtime/index.ts @@ -6,10 +6,8 @@ import { createLazyRuntimeMethodBinder, createLazyRuntimeModule, } from "../../shared/lazy-runtime.js"; -import { defaultTaskOperationsRuntime } from "../../tasks/operations-runtime.js"; import { VERSION } from "../../version.js"; import { listWebSearchProviders, runWebSearch } from "../../web-search/runtime.js"; -import { getRegisteredOperationsRuntime } from "../operations-state.js"; import { createRuntimeAgent } from "./runtime-agent.js"; import { defineCachedValue } from "./runtime-cache.js"; import { createRuntimeChannel } from "./runtime-channel.js"; @@ -98,20 +96,6 @@ function createRuntimeModelAuth(): PluginRuntime["modelAuth"] { }; } -function createRuntimeOperations(): PluginRuntime["operations"] { - const resolveRuntime = () => getRegisteredOperationsRuntime() ?? defaultTaskOperationsRuntime; - return { - dispatch: (event) => resolveRuntime().dispatch(event), - getById: (operationId) => resolveRuntime().getById(operationId), - findByRunId: (runId) => resolveRuntime().findByRunId(runId), - list: (query) => resolveRuntime().list(query), - summarize: (query) => resolveRuntime().summarize(query), - audit: (query) => resolveRuntime().audit(query), - maintenance: (query) => resolveRuntime().maintenance(query), - cancel: (params) => resolveRuntime().cancel(params), - }; -} - function createUnavailableSubagentRuntime(): PluginRuntime["subagent"] { const unavailable = () => { throw new Error("Plugin runtime subagent methods are only available during a gateway request."); @@ -219,7 +203,6 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): events: createRuntimeEvents(), logging: createRuntimeLogging(), state: { resolveStateDir }, - operations: createRuntimeOperations(), } satisfies Omit< PluginRuntime, "tts" | "mediaUnderstanding" | "stt" | "modelAuth" | "imageGeneration" diff --git a/src/plugins/runtime/types-core.ts b/src/plugins/runtime/types-core.ts index 55718444494..9109b694d56 100644 --- a/src/plugins/runtime/types-core.ts +++ b/src/plugins/runtime/types-core.ts @@ -1,17 +1,5 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { LogLevel } from "../../logging/levels.js"; -import type { - PluginOperationAuditFinding, - PluginOperationAuditQuery, - PluginOperationDispatchEvent, - PluginOperationDispatchResult, - PluginOperationListQuery, - PluginOperationMaintenanceQuery, - PluginOperationMaintenanceSummary, - PluginOperationRecord, - PluginOperationSummary, - PluginOperationsCancelResult, -} from "../operations-state.js"; export type { HeartbeatRunResult }; @@ -127,19 +115,4 @@ export type PluginRuntimeCore = { cfg?: import("../../config/config.js").OpenClawConfig; }) => Promise; }; - operations: { - dispatch: (event: PluginOperationDispatchEvent) => Promise; - getById: (operationId: string) => Promise; - findByRunId: (runId: string) => Promise; - list: (query?: PluginOperationListQuery) => Promise; - summarize: (query?: PluginOperationListQuery) => Promise; - audit: (query?: PluginOperationAuditQuery) => Promise; - maintenance: ( - query?: PluginOperationMaintenanceQuery, - ) => Promise; - cancel: (params: { - cfg: import("../../config/config.js").OpenClawConfig; - operationId: string; - }) => Promise; - }; }; diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 032517f488c..1bbc1ab0369 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -54,7 +54,6 @@ import type { } from "../tts/provider-types.js"; import type { DeliveryContext } from "../utils/delivery-context.js"; import type { WizardPrompter } from "../wizard/prompts.js"; -import type { PluginOperationsRuntime } from "./operations-state.js"; import type { SecretInputMode } from "./provider-auth-types.js"; import type { createVpsAwareOAuthHandlers } from "./provider-oauth-flow.js"; import type { PluginRuntime } from "./runtime/types.js"; @@ -1768,8 +1767,6 @@ export type OpenClawPluginApi = { registerMemoryEmbeddingProvider: ( adapter: import("./memory-embedding-providers.js").MemoryEmbeddingProviderAdapter, ) => void; - /** Register the active operations runtime adapter (exclusive slot — only one active at a time). */ - registerOperationsRuntime: (runtime: PluginOperationsRuntime) => void; resolvePath: (input: string) => string; /** Register a lifecycle hook handler */ on: ( diff --git a/src/tasks/operations-runtime.test.ts b/src/tasks/operations-runtime.test.ts deleted file mode 100644 index 88fb8e007ce..00000000000 --- a/src/tasks/operations-runtime.test.ts +++ /dev/null @@ -1,183 +0,0 @@ -import { afterEach, describe, expect, it } from "vitest"; -import { withTempDir } from "../test-helpers/temp-dir.js"; -import { defaultTaskOperationsRuntime } from "./operations-runtime.js"; -import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js"; - -const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; - -async function withTaskStateDir(run: () => Promise): Promise { - await withTempDir({ prefix: "openclaw-task-operations-" }, async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetTaskRegistryForTests(); - try { - await run(); - } finally { - resetTaskRegistryForTests(); - } - }); -} - -describe("task operations runtime", () => { - afterEach(() => { - if (ORIGINAL_STATE_DIR === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; - } - resetTaskRegistryForTests(); - }); - - it("creates and transitions task records through the generic operations runtime", async () => { - await withTaskStateDir(async () => { - const created = await defaultTaskOperationsRuntime.dispatch({ - type: "create", - namespace: "tasks", - kind: "cli", - status: "queued", - requesterSessionKey: "agent:test:main", - childSessionKey: "agent:test:child", - runId: "run-ops-create", - title: "Task title", - description: "Do the thing", - }); - - expect(created.matched).toBe(true); - expect(created.created).toBe(true); - expect(created.record).toMatchObject({ - namespace: "tasks", - kind: "cli", - status: "queued", - title: "Task title", - description: "Do the thing", - runId: "run-ops-create", - }); - - const progressed = await defaultTaskOperationsRuntime.dispatch({ - type: "transition", - runId: "run-ops-create", - status: "running", - at: 100, - startedAt: 100, - progressSummary: "Started work", - }); - - expect(progressed.record).toMatchObject({ - status: "running", - progressSummary: "Started work", - }); - - const completed = await defaultTaskOperationsRuntime.dispatch({ - type: "transition", - runId: "run-ops-create", - status: "succeeded", - at: 200, - endedAt: 200, - terminalSummary: "All done", - }); - - expect(completed.record).toMatchObject({ - status: "succeeded", - terminalSummary: "All done", - }); - expect(findTaskByRunId("run-ops-create")).toMatchObject({ - status: "succeeded", - terminalSummary: "All done", - }); - }); - }); - - it("lists and summarizes task-backed operations", async () => { - await withTaskStateDir(async () => { - await defaultTaskOperationsRuntime.dispatch({ - type: "create", - namespace: "tasks", - kind: "acp", - status: "running", - requesterSessionKey: "agent:test:main", - runId: "run-ops-list-1", - description: "One", - startedAt: 10, - }); - await defaultTaskOperationsRuntime.dispatch({ - type: "create", - namespace: "tasks", - kind: "cron", - status: "failed", - requesterSessionKey: "agent:test:main", - runId: "run-ops-list-2", - description: "Two", - endedAt: 20, - terminalSummary: "Failed", - }); - - const listed = await defaultTaskOperationsRuntime.list({ - namespace: "tasks", - }); - const summary = await defaultTaskOperationsRuntime.summarize({ - namespace: "tasks", - }); - - expect(listed).toHaveLength(2); - expect(summary).toEqual({ - total: 2, - active: 1, - terminal: 1, - failures: 1, - byNamespace: { tasks: 2 }, - byKind: { acp: 1, cron: 1 }, - byStatus: { failed: 1, running: 1 }, - }); - }); - }); - - it("patches notify policy and exposes audit plus maintenance", async () => { - await withTaskStateDir(async () => { - const created = await defaultTaskOperationsRuntime.dispatch({ - type: "create", - namespace: "tasks", - kind: "cli", - status: "running", - requesterSessionKey: "agent:test:main", - runId: "run-ops-patch", - description: "Patch me", - startedAt: Date.now() - 31 * 60_000, - }); - - expect(created.record?.metadata?.notifyPolicy).toBe("done_only"); - - const findings = await defaultTaskOperationsRuntime.audit({ - namespace: "tasks", - severity: "error", - code: "stale_running", - }); - - const patched = await defaultTaskOperationsRuntime.dispatch({ - type: "patch", - operationId: created.record?.operationId, - metadataPatch: { - notifyPolicy: "silent", - }, - }); - - expect(patched.record?.metadata?.notifyPolicy).toBe("silent"); - - const preview = await defaultTaskOperationsRuntime.maintenance({ - namespace: "tasks", - }); - - expect(findings).toHaveLength(1); - expect(findings[0]).toMatchObject({ - severity: "error", - code: "stale_running", - operation: { - operationId: created.record?.operationId, - }, - }); - expect(preview).toEqual({ - reconciled: 0, - cleanupStamped: 0, - pruned: 0, - }); - }); - }); -}); diff --git a/src/tasks/operations-runtime.ts b/src/tasks/operations-runtime.ts deleted file mode 100644 index ba207b540eb..00000000000 --- a/src/tasks/operations-runtime.ts +++ /dev/null @@ -1,389 +0,0 @@ -import type { - PluginOperationAuditFinding, - PluginOperationAuditQuery, - PluginOperationDispatchEvent, - PluginOperationDispatchResult, - PluginOperationListQuery, - PluginOperationMaintenanceQuery, - PluginOperationMaintenanceSummary, - PluginOperationRecord, - PluginOperationSummary, - PluginOperationsCancelResult, - PluginOperationsRuntime, -} from "../plugins/operations-state.js"; -import { summarizeOperationRecords } from "../plugins/operations-state.js"; -import { - listTaskAuditFindings, - type TaskAuditFinding, - type TaskAuditSeverity, -} from "./task-registry.audit.js"; -import { - cancelTaskById, - createTaskRecord, - findTaskByRunId, - getTaskById, - listTaskRecords, - listTasksForSessionKey, - markTaskLostById, - markTaskRunningByRunId, - markTaskTerminalByRunId, - recordTaskProgressByRunId, - updateTaskNotifyPolicyById, -} from "./task-registry.js"; -import { - previewTaskRegistryMaintenance, - runTaskRegistryMaintenance, -} from "./task-registry.maintenance.js"; -import type { - TaskRecord, - TaskRuntime, - TaskStatus, - TaskTerminalOutcome, -} from "./task-registry.types.js"; - -const TASK_NAMESPACE = "tasks"; - -function isTaskNamespace(namespace: string | undefined): boolean { - const trimmed = namespace?.trim().toLowerCase(); - return !trimmed || trimmed === "task" || trimmed === TASK_NAMESPACE; -} - -function normalizeTaskRuntime(kind: string): TaskRuntime { - const trimmed = kind.trim(); - if (trimmed === "acp" || trimmed === "subagent" || trimmed === "cli" || trimmed === "cron") { - return trimmed; - } - throw new Error(`Unsupported task operation kind: ${kind}`); -} - -function normalizeTaskStatus(status: string | undefined): TaskStatus { - const trimmed = status?.trim(); - if ( - trimmed === "queued" || - trimmed === "running" || - trimmed === "succeeded" || - trimmed === "failed" || - trimmed === "timed_out" || - trimmed === "cancelled" || - trimmed === "lost" - ) { - return trimmed; - } - return "queued"; -} - -function normalizeTaskTerminalOutcome(status: TaskStatus): TaskTerminalOutcome | undefined { - return status === "succeeded" ? "succeeded" : undefined; -} - -function toOperationRecord(task: TaskRecord): PluginOperationRecord { - const metadata: Record = { - deliveryStatus: task.deliveryStatus, - notifyPolicy: task.notifyPolicy, - }; - if (typeof task.cleanupAfter === "number") { - metadata.cleanupAfter = task.cleanupAfter; - } - if (task.terminalOutcome) { - metadata.terminalOutcome = task.terminalOutcome; - } - return { - operationId: task.taskId, - namespace: TASK_NAMESPACE, - kind: task.runtime, - status: task.status, - sourceId: task.sourceId, - requesterSessionKey: task.requesterSessionKey, - childSessionKey: task.childSessionKey, - parentOperationId: task.parentTaskId, - agentId: task.agentId, - runId: task.runId, - title: task.label, - description: task.task, - createdAt: task.createdAt, - startedAt: task.startedAt, - endedAt: task.endedAt, - updatedAt: task.lastEventAt ?? task.endedAt ?? task.startedAt ?? task.createdAt, - error: task.error, - progressSummary: task.progressSummary, - terminalSummary: task.terminalSummary, - metadata, - }; -} - -function resolveTaskRecordForTransition(event: { - operationId?: string; - runId?: string; -}): TaskRecord | undefined { - const operationId = event.operationId?.trim(); - if (operationId) { - return getTaskById(operationId); - } - const runId = event.runId?.trim(); - if (runId) { - return findTaskByRunId(runId); - } - return undefined; -} - -function filterOperationRecord( - record: PluginOperationRecord, - query: PluginOperationListQuery, -): boolean { - if (query.namespace && !isTaskNamespace(query.namespace)) { - return false; - } - if (query.kind && record.kind !== query.kind) { - return false; - } - if (query.status && record.status !== query.status) { - return false; - } - if (query.runId && record.runId !== query.runId) { - return false; - } - if (query.sourceId && record.sourceId !== query.sourceId) { - return false; - } - if (query.parentOperationId && record.parentOperationId !== query.parentOperationId) { - return false; - } - if ( - query.sessionKey && - record.requesterSessionKey !== query.sessionKey && - record.childSessionKey !== query.sessionKey - ) { - return false; - } - return true; -} - -async function dispatchTaskOperation( - event: PluginOperationDispatchEvent, -): Promise { - if (event.type === "create") { - if (!isTaskNamespace(event.namespace)) { - throw new Error( - `Default operations runtime only supports the "${TASK_NAMESPACE}" namespace.`, - ); - } - const status = normalizeTaskStatus(event.status); - const record = createTaskRecord({ - runtime: normalizeTaskRuntime(event.kind), - sourceId: event.sourceId, - requesterSessionKey: event.requesterSessionKey?.trim() || "", - childSessionKey: event.childSessionKey, - parentTaskId: event.parentOperationId, - agentId: event.agentId, - runId: event.runId, - label: event.title, - task: event.description, - status, - startedAt: event.startedAt, - lastEventAt: event.updatedAt ?? event.startedAt ?? event.createdAt, - progressSummary: event.progressSummary, - terminalSummary: event.terminalSummary, - terminalOutcome: normalizeTaskTerminalOutcome(status), - }); - return { - matched: true, - created: true, - record: toOperationRecord(record), - }; - } - - if (event.type === "patch") { - const current = resolveTaskRecordForTransition(event); - if (!current) { - return { - matched: false, - record: null, - }; - } - const nextNotifyPolicy = event.metadataPatch?.notifyPolicy; - const next = - nextNotifyPolicy === "done_only" || - nextNotifyPolicy === "state_changes" || - nextNotifyPolicy === "silent" - ? (updateTaskNotifyPolicyById({ - taskId: current.taskId, - notifyPolicy: nextNotifyPolicy, - }) ?? current) - : current; - return { - matched: true, - record: toOperationRecord(next), - }; - } - - const current = resolveTaskRecordForTransition(event); - if (!current) { - return { - matched: false, - record: null, - }; - } - - const at = event.at ?? event.endedAt ?? event.startedAt ?? Date.now(); - const runId = event.runId?.trim() || current.runId?.trim(); - const status = normalizeTaskStatus(event.status); - let next: TaskRecord | null | undefined; - - if (status === "running") { - if (!runId) { - throw new Error("Task transition to running requires a runId."); - } - next = markTaskRunningByRunId({ - runId, - startedAt: event.startedAt, - lastEventAt: at, - progressSummary: event.progressSummary, - eventSummary: event.progressSummary, - })[0]; - } else if (status === "queued") { - if (!runId) { - throw new Error("Task transition to queued requires a runId."); - } - next = recordTaskProgressByRunId({ - runId, - lastEventAt: at, - progressSummary: event.progressSummary, - eventSummary: event.progressSummary, - })[0]; - } else if ( - status === "succeeded" || - status === "failed" || - status === "timed_out" || - status === "cancelled" - ) { - if (!runId) { - throw new Error(`Task transition to ${status} requires a runId.`); - } - next = markTaskTerminalByRunId({ - runId, - status, - startedAt: event.startedAt, - endedAt: event.endedAt ?? at, - lastEventAt: at, - error: event.error ?? undefined, - progressSummary: event.progressSummary, - terminalSummary: event.terminalSummary, - terminalOutcome: status === "succeeded" ? "succeeded" : undefined, - })[0]; - } else if (status === "lost") { - next = markTaskLostById({ - taskId: current.taskId, - endedAt: event.endedAt ?? at, - lastEventAt: at, - error: event.error ?? undefined, - }); - } - - return { - matched: true, - record: next ? toOperationRecord(next) : toOperationRecord(current), - }; -} - -async function getTaskOperationList( - query: PluginOperationListQuery = {}, -): Promise { - if (query.namespace && !isTaskNamespace(query.namespace)) { - return []; - } - const records = ( - query.sessionKey ? listTasksForSessionKey(query.sessionKey) : listTaskRecords() - ).map(toOperationRecord); - const filtered = records.filter((record) => filterOperationRecord(record, query)); - const limit = - typeof query.limit === "number" && Number.isFinite(query.limit) && query.limit > 0 - ? Math.floor(query.limit) - : undefined; - return typeof limit === "number" ? filtered.slice(0, limit) : filtered; -} - -function isMatchingTaskAuditSeverity( - actual: TaskAuditSeverity, - requested: PluginOperationAuditQuery["severity"], -): boolean { - return !requested || actual === requested; -} - -function toOperationAuditFinding(finding: TaskAuditFinding): PluginOperationAuditFinding { - return { - severity: finding.severity, - code: finding.code, - operation: toOperationRecord(finding.task), - detail: finding.detail, - ...(typeof finding.ageMs === "number" ? { ageMs: finding.ageMs } : {}), - }; -} - -async function auditTaskOperations( - query: PluginOperationAuditQuery = {}, -): Promise { - if (query.namespace && !isTaskNamespace(query.namespace)) { - return []; - } - return listTaskAuditFindings() - .filter((finding) => { - if (!isMatchingTaskAuditSeverity(finding.severity, query.severity)) { - return false; - } - if (query.code && finding.code !== query.code) { - return false; - } - return true; - }) - .map(toOperationAuditFinding); -} - -async function maintainTaskOperations( - query: PluginOperationMaintenanceQuery = {}, -): Promise { - if (query.namespace && !isTaskNamespace(query.namespace)) { - return { - reconciled: 0, - cleanupStamped: 0, - pruned: 0, - }; - } - return query.apply ? runTaskRegistryMaintenance() : previewTaskRegistryMaintenance(); -} - -export const defaultTaskOperationsRuntime: PluginOperationsRuntime = { - dispatch: dispatchTaskOperation, - async getById(operationId: string) { - const record = getTaskById(operationId.trim()); - return record ? toOperationRecord(record) : null; - }, - async findByRunId(runId: string) { - const record = findTaskByRunId(runId.trim()); - return record ? toOperationRecord(record) : null; - }, - list: getTaskOperationList, - async summarize(query) { - const records = await getTaskOperationList(query); - return summarizeOperationRecords(records); - }, - audit: auditTaskOperations, - maintenance: maintainTaskOperations, - async cancel(params): Promise { - const result = await cancelTaskById({ - cfg: params.cfg, - taskId: params.operationId, - }); - return { - found: result.found, - cancelled: result.cancelled, - reason: result.reason, - record: result.task ? toOperationRecord(result.task) : null, - }; - }, -}; - -export async function summarizeTaskOperations( - query: PluginOperationListQuery = {}, -): Promise { - return defaultTaskOperationsRuntime.summarize(query); -} diff --git a/src/tasks/task-executor-boundary.test.ts b/src/tasks/task-executor-boundary.test.ts index b2c5af37892..b0c140f0487 100644 --- a/src/tasks/task-executor-boundary.test.ts +++ b/src/tasks/task-executor-boundary.test.ts @@ -14,7 +14,6 @@ const RAW_TASK_MUTATORS = [ ] as const; const ALLOWED_CALLERS = new Set([ - "tasks/operations-runtime.ts", "tasks/task-executor.ts", "tasks/task-registry.ts", "tasks/task-registry.maintenance.ts", diff --git a/src/tasks/task-registry-import-boundary.test.ts b/src/tasks/task-registry-import-boundary.test.ts index 8be3a7bce1f..d6c5091f9de 100644 --- a/src/tasks/task-registry-import-boundary.test.ts +++ b/src/tasks/task-registry-import-boundary.test.ts @@ -11,8 +11,8 @@ const ALLOWED_IMPORTERS = new Set([ "auto-reply/reply/commands-subagents/action-info.ts", "commands/doctor-workspace-status.ts", "commands/flows.ts", + "commands/tasks.ts", "tasks/flow-runtime.ts", - "tasks/operations-runtime.ts", "tasks/task-executor.ts", "tasks/task-registry.maintenance.ts", ]); diff --git a/test/helpers/plugins/plugin-api.ts b/test/helpers/plugins/plugin-api.ts index d015903ce21..d89aca74b01 100644 --- a/test/helpers/plugins/plugin-api.ts +++ b/test/helpers/plugins/plugin-api.ts @@ -31,7 +31,6 @@ export function createTestPluginApi(api: TestPluginApiInput): OpenClawPluginApi registerMemoryFlushPlan() {}, registerMemoryRuntime() {}, registerMemoryEmbeddingProvider() {}, - registerOperationsRuntime() {}, resolvePath(input: string) { return input; }, diff --git a/test/helpers/plugins/plugin-runtime-mock.ts b/test/helpers/plugins/plugin-runtime-mock.ts index 99fa42f831b..a3e94e871b4 100644 --- a/test/helpers/plugins/plugin-runtime-mock.ts +++ b/test/helpers/plugins/plugin-runtime-mock.ts @@ -132,36 +132,6 @@ export function createPluginRuntimeMock(overrides: DeepPartial = stt: { transcribeAudioFile: vi.fn() as unknown as PluginRuntime["stt"]["transcribeAudioFile"], }, - operations: { - dispatch: vi.fn().mockResolvedValue({ - matched: false, - record: null, - }) as unknown as PluginRuntime["operations"]["dispatch"], - getById: vi.fn().mockResolvedValue(null) as unknown as PluginRuntime["operations"]["getById"], - findByRunId: vi - .fn() - .mockResolvedValue(null) as unknown as PluginRuntime["operations"]["findByRunId"], - list: vi.fn().mockResolvedValue([]) as unknown as PluginRuntime["operations"]["list"], - summarize: vi.fn().mockResolvedValue({ - total: 0, - active: 0, - terminal: 0, - failures: 0, - byNamespace: {}, - byKind: {}, - byStatus: {}, - }) as unknown as PluginRuntime["operations"]["summarize"], - audit: vi.fn().mockResolvedValue([]) as unknown as PluginRuntime["operations"]["audit"], - maintenance: vi.fn().mockResolvedValue({ - reconciled: 0, - cleanupStamped: 0, - pruned: 0, - }) as unknown as PluginRuntime["operations"]["maintenance"], - cancel: vi.fn().mockResolvedValue({ - found: false, - cancelled: false, - }) as unknown as PluginRuntime["operations"]["cancel"], - }, channel: { text: { chunkByNewline: vi.fn((text: string) => (text ? [text] : [])),