mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-04 08:50:21 +00:00
Revert "refactor: move tasks into bundled plugin"
This reverts commit c75f4695b7.
This commit is contained in:
@@ -1,183 +0,0 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import { defaultTaskOperationsRuntime } from "./operations-runtime.js";
|
||||
import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js";
|
||||
|
||||
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
|
||||
|
||||
async function withTaskStateDir(run: () => Promise<void>): Promise<void> {
|
||||
await withTempDir({ prefix: "openclaw-task-operations-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
try {
|
||||
await run();
|
||||
} finally {
|
||||
resetTaskRegistryForTests();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
describe("task operations runtime", () => {
|
||||
afterEach(() => {
|
||||
if (ORIGINAL_STATE_DIR === undefined) {
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
} else {
|
||||
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
|
||||
}
|
||||
resetTaskRegistryForTests();
|
||||
});
|
||||
|
||||
it("creates and transitions task records through the generic operations runtime", async () => {
|
||||
await withTaskStateDir(async () => {
|
||||
const created = await defaultTaskOperationsRuntime.dispatch({
|
||||
type: "create",
|
||||
namespace: "tasks",
|
||||
kind: "cli",
|
||||
status: "queued",
|
||||
requesterSessionKey: "agent:test:main",
|
||||
childSessionKey: "agent:test:child",
|
||||
runId: "run-ops-create",
|
||||
title: "Task title",
|
||||
description: "Do the thing",
|
||||
});
|
||||
|
||||
expect(created.matched).toBe(true);
|
||||
expect(created.created).toBe(true);
|
||||
expect(created.record).toMatchObject({
|
||||
namespace: "tasks",
|
||||
kind: "cli",
|
||||
status: "queued",
|
||||
title: "Task title",
|
||||
description: "Do the thing",
|
||||
runId: "run-ops-create",
|
||||
});
|
||||
|
||||
const progressed = await defaultTaskOperationsRuntime.dispatch({
|
||||
type: "transition",
|
||||
runId: "run-ops-create",
|
||||
status: "running",
|
||||
at: 100,
|
||||
startedAt: 100,
|
||||
progressSummary: "Started work",
|
||||
});
|
||||
|
||||
expect(progressed.record).toMatchObject({
|
||||
status: "running",
|
||||
progressSummary: "Started work",
|
||||
});
|
||||
|
||||
const completed = await defaultTaskOperationsRuntime.dispatch({
|
||||
type: "transition",
|
||||
runId: "run-ops-create",
|
||||
status: "succeeded",
|
||||
at: 200,
|
||||
endedAt: 200,
|
||||
terminalSummary: "All done",
|
||||
});
|
||||
|
||||
expect(completed.record).toMatchObject({
|
||||
status: "succeeded",
|
||||
terminalSummary: "All done",
|
||||
});
|
||||
expect(findTaskByRunId("run-ops-create")).toMatchObject({
|
||||
status: "succeeded",
|
||||
terminalSummary: "All done",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("lists and summarizes task-backed operations", async () => {
|
||||
await withTaskStateDir(async () => {
|
||||
await defaultTaskOperationsRuntime.dispatch({
|
||||
type: "create",
|
||||
namespace: "tasks",
|
||||
kind: "acp",
|
||||
status: "running",
|
||||
requesterSessionKey: "agent:test:main",
|
||||
runId: "run-ops-list-1",
|
||||
description: "One",
|
||||
startedAt: 10,
|
||||
});
|
||||
await defaultTaskOperationsRuntime.dispatch({
|
||||
type: "create",
|
||||
namespace: "tasks",
|
||||
kind: "cron",
|
||||
status: "failed",
|
||||
requesterSessionKey: "agent:test:main",
|
||||
runId: "run-ops-list-2",
|
||||
description: "Two",
|
||||
endedAt: 20,
|
||||
terminalSummary: "Failed",
|
||||
});
|
||||
|
||||
const listed = await defaultTaskOperationsRuntime.list({
|
||||
namespace: "tasks",
|
||||
});
|
||||
const summary = await defaultTaskOperationsRuntime.summarize({
|
||||
namespace: "tasks",
|
||||
});
|
||||
|
||||
expect(listed).toHaveLength(2);
|
||||
expect(summary).toEqual({
|
||||
total: 2,
|
||||
active: 1,
|
||||
terminal: 1,
|
||||
failures: 1,
|
||||
byNamespace: { tasks: 2 },
|
||||
byKind: { acp: 1, cron: 1 },
|
||||
byStatus: { failed: 1, running: 1 },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("patches notify policy and exposes audit plus maintenance", async () => {
|
||||
await withTaskStateDir(async () => {
|
||||
const created = await defaultTaskOperationsRuntime.dispatch({
|
||||
type: "create",
|
||||
namespace: "tasks",
|
||||
kind: "cli",
|
||||
status: "running",
|
||||
requesterSessionKey: "agent:test:main",
|
||||
runId: "run-ops-patch",
|
||||
description: "Patch me",
|
||||
startedAt: Date.now() - 31 * 60_000,
|
||||
});
|
||||
|
||||
expect(created.record?.metadata?.notifyPolicy).toBe("done_only");
|
||||
|
||||
const findings = await defaultTaskOperationsRuntime.audit({
|
||||
namespace: "tasks",
|
||||
severity: "error",
|
||||
code: "stale_running",
|
||||
});
|
||||
|
||||
const patched = await defaultTaskOperationsRuntime.dispatch({
|
||||
type: "patch",
|
||||
operationId: created.record?.operationId,
|
||||
metadataPatch: {
|
||||
notifyPolicy: "silent",
|
||||
},
|
||||
});
|
||||
|
||||
expect(patched.record?.metadata?.notifyPolicy).toBe("silent");
|
||||
|
||||
const preview = await defaultTaskOperationsRuntime.maintenance({
|
||||
namespace: "tasks",
|
||||
});
|
||||
|
||||
expect(findings).toHaveLength(1);
|
||||
expect(findings[0]).toMatchObject({
|
||||
severity: "error",
|
||||
code: "stale_running",
|
||||
operation: {
|
||||
operationId: created.record?.operationId,
|
||||
},
|
||||
});
|
||||
expect(preview).toEqual({
|
||||
reconciled: 0,
|
||||
cleanupStamped: 0,
|
||||
pruned: 0,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,389 +0,0 @@
|
||||
import type {
|
||||
PluginOperationAuditFinding,
|
||||
PluginOperationAuditQuery,
|
||||
PluginOperationDispatchEvent,
|
||||
PluginOperationDispatchResult,
|
||||
PluginOperationListQuery,
|
||||
PluginOperationMaintenanceQuery,
|
||||
PluginOperationMaintenanceSummary,
|
||||
PluginOperationRecord,
|
||||
PluginOperationSummary,
|
||||
PluginOperationsCancelResult,
|
||||
PluginOperationsRuntime,
|
||||
} from "../plugins/operations-state.js";
|
||||
import { summarizeOperationRecords } from "../plugins/operations-state.js";
|
||||
import {
|
||||
listTaskAuditFindings,
|
||||
type TaskAuditFinding,
|
||||
type TaskAuditSeverity,
|
||||
} from "./task-registry.audit.js";
|
||||
import {
|
||||
cancelTaskById,
|
||||
createTaskRecord,
|
||||
findTaskByRunId,
|
||||
getTaskById,
|
||||
listTaskRecords,
|
||||
listTasksForSessionKey,
|
||||
markTaskLostById,
|
||||
markTaskRunningByRunId,
|
||||
markTaskTerminalByRunId,
|
||||
recordTaskProgressByRunId,
|
||||
updateTaskNotifyPolicyById,
|
||||
} from "./task-registry.js";
|
||||
import {
|
||||
previewTaskRegistryMaintenance,
|
||||
runTaskRegistryMaintenance,
|
||||
} from "./task-registry.maintenance.js";
|
||||
import type {
|
||||
TaskRecord,
|
||||
TaskRuntime,
|
||||
TaskStatus,
|
||||
TaskTerminalOutcome,
|
||||
} from "./task-registry.types.js";
|
||||
|
||||
const TASK_NAMESPACE = "tasks";
|
||||
|
||||
function isTaskNamespace(namespace: string | undefined): boolean {
|
||||
const trimmed = namespace?.trim().toLowerCase();
|
||||
return !trimmed || trimmed === "task" || trimmed === TASK_NAMESPACE;
|
||||
}
|
||||
|
||||
function normalizeTaskRuntime(kind: string): TaskRuntime {
|
||||
const trimmed = kind.trim();
|
||||
if (trimmed === "acp" || trimmed === "subagent" || trimmed === "cli" || trimmed === "cron") {
|
||||
return trimmed;
|
||||
}
|
||||
throw new Error(`Unsupported task operation kind: ${kind}`);
|
||||
}
|
||||
|
||||
function normalizeTaskStatus(status: string | undefined): TaskStatus {
|
||||
const trimmed = status?.trim();
|
||||
if (
|
||||
trimmed === "queued" ||
|
||||
trimmed === "running" ||
|
||||
trimmed === "succeeded" ||
|
||||
trimmed === "failed" ||
|
||||
trimmed === "timed_out" ||
|
||||
trimmed === "cancelled" ||
|
||||
trimmed === "lost"
|
||||
) {
|
||||
return trimmed;
|
||||
}
|
||||
return "queued";
|
||||
}
|
||||
|
||||
function normalizeTaskTerminalOutcome(status: TaskStatus): TaskTerminalOutcome | undefined {
|
||||
return status === "succeeded" ? "succeeded" : undefined;
|
||||
}
|
||||
|
||||
function toOperationRecord(task: TaskRecord): PluginOperationRecord {
|
||||
const metadata: Record<string, unknown> = {
|
||||
deliveryStatus: task.deliveryStatus,
|
||||
notifyPolicy: task.notifyPolicy,
|
||||
};
|
||||
if (typeof task.cleanupAfter === "number") {
|
||||
metadata.cleanupAfter = task.cleanupAfter;
|
||||
}
|
||||
if (task.terminalOutcome) {
|
||||
metadata.terminalOutcome = task.terminalOutcome;
|
||||
}
|
||||
return {
|
||||
operationId: task.taskId,
|
||||
namespace: TASK_NAMESPACE,
|
||||
kind: task.runtime,
|
||||
status: task.status,
|
||||
sourceId: task.sourceId,
|
||||
requesterSessionKey: task.requesterSessionKey,
|
||||
childSessionKey: task.childSessionKey,
|
||||
parentOperationId: task.parentTaskId,
|
||||
agentId: task.agentId,
|
||||
runId: task.runId,
|
||||
title: task.label,
|
||||
description: task.task,
|
||||
createdAt: task.createdAt,
|
||||
startedAt: task.startedAt,
|
||||
endedAt: task.endedAt,
|
||||
updatedAt: task.lastEventAt ?? task.endedAt ?? task.startedAt ?? task.createdAt,
|
||||
error: task.error,
|
||||
progressSummary: task.progressSummary,
|
||||
terminalSummary: task.terminalSummary,
|
||||
metadata,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveTaskRecordForTransition(event: {
|
||||
operationId?: string;
|
||||
runId?: string;
|
||||
}): TaskRecord | undefined {
|
||||
const operationId = event.operationId?.trim();
|
||||
if (operationId) {
|
||||
return getTaskById(operationId);
|
||||
}
|
||||
const runId = event.runId?.trim();
|
||||
if (runId) {
|
||||
return findTaskByRunId(runId);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function filterOperationRecord(
|
||||
record: PluginOperationRecord,
|
||||
query: PluginOperationListQuery,
|
||||
): boolean {
|
||||
if (query.namespace && !isTaskNamespace(query.namespace)) {
|
||||
return false;
|
||||
}
|
||||
if (query.kind && record.kind !== query.kind) {
|
||||
return false;
|
||||
}
|
||||
if (query.status && record.status !== query.status) {
|
||||
return false;
|
||||
}
|
||||
if (query.runId && record.runId !== query.runId) {
|
||||
return false;
|
||||
}
|
||||
if (query.sourceId && record.sourceId !== query.sourceId) {
|
||||
return false;
|
||||
}
|
||||
if (query.parentOperationId && record.parentOperationId !== query.parentOperationId) {
|
||||
return false;
|
||||
}
|
||||
if (
|
||||
query.sessionKey &&
|
||||
record.requesterSessionKey !== query.sessionKey &&
|
||||
record.childSessionKey !== query.sessionKey
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
async function dispatchTaskOperation(
|
||||
event: PluginOperationDispatchEvent,
|
||||
): Promise<PluginOperationDispatchResult> {
|
||||
if (event.type === "create") {
|
||||
if (!isTaskNamespace(event.namespace)) {
|
||||
throw new Error(
|
||||
`Default operations runtime only supports the "${TASK_NAMESPACE}" namespace.`,
|
||||
);
|
||||
}
|
||||
const status = normalizeTaskStatus(event.status);
|
||||
const record = createTaskRecord({
|
||||
runtime: normalizeTaskRuntime(event.kind),
|
||||
sourceId: event.sourceId,
|
||||
requesterSessionKey: event.requesterSessionKey?.trim() || "",
|
||||
childSessionKey: event.childSessionKey,
|
||||
parentTaskId: event.parentOperationId,
|
||||
agentId: event.agentId,
|
||||
runId: event.runId,
|
||||
label: event.title,
|
||||
task: event.description,
|
||||
status,
|
||||
startedAt: event.startedAt,
|
||||
lastEventAt: event.updatedAt ?? event.startedAt ?? event.createdAt,
|
||||
progressSummary: event.progressSummary,
|
||||
terminalSummary: event.terminalSummary,
|
||||
terminalOutcome: normalizeTaskTerminalOutcome(status),
|
||||
});
|
||||
return {
|
||||
matched: true,
|
||||
created: true,
|
||||
record: toOperationRecord(record),
|
||||
};
|
||||
}
|
||||
|
||||
if (event.type === "patch") {
|
||||
const current = resolveTaskRecordForTransition(event);
|
||||
if (!current) {
|
||||
return {
|
||||
matched: false,
|
||||
record: null,
|
||||
};
|
||||
}
|
||||
const nextNotifyPolicy = event.metadataPatch?.notifyPolicy;
|
||||
const next =
|
||||
nextNotifyPolicy === "done_only" ||
|
||||
nextNotifyPolicy === "state_changes" ||
|
||||
nextNotifyPolicy === "silent"
|
||||
? (updateTaskNotifyPolicyById({
|
||||
taskId: current.taskId,
|
||||
notifyPolicy: nextNotifyPolicy,
|
||||
}) ?? current)
|
||||
: current;
|
||||
return {
|
||||
matched: true,
|
||||
record: toOperationRecord(next),
|
||||
};
|
||||
}
|
||||
|
||||
const current = resolveTaskRecordForTransition(event);
|
||||
if (!current) {
|
||||
return {
|
||||
matched: false,
|
||||
record: null,
|
||||
};
|
||||
}
|
||||
|
||||
const at = event.at ?? event.endedAt ?? event.startedAt ?? Date.now();
|
||||
const runId = event.runId?.trim() || current.runId?.trim();
|
||||
const status = normalizeTaskStatus(event.status);
|
||||
let next: TaskRecord | null | undefined;
|
||||
|
||||
if (status === "running") {
|
||||
if (!runId) {
|
||||
throw new Error("Task transition to running requires a runId.");
|
||||
}
|
||||
next = markTaskRunningByRunId({
|
||||
runId,
|
||||
startedAt: event.startedAt,
|
||||
lastEventAt: at,
|
||||
progressSummary: event.progressSummary,
|
||||
eventSummary: event.progressSummary,
|
||||
})[0];
|
||||
} else if (status === "queued") {
|
||||
if (!runId) {
|
||||
throw new Error("Task transition to queued requires a runId.");
|
||||
}
|
||||
next = recordTaskProgressByRunId({
|
||||
runId,
|
||||
lastEventAt: at,
|
||||
progressSummary: event.progressSummary,
|
||||
eventSummary: event.progressSummary,
|
||||
})[0];
|
||||
} else if (
|
||||
status === "succeeded" ||
|
||||
status === "failed" ||
|
||||
status === "timed_out" ||
|
||||
status === "cancelled"
|
||||
) {
|
||||
if (!runId) {
|
||||
throw new Error(`Task transition to ${status} requires a runId.`);
|
||||
}
|
||||
next = markTaskTerminalByRunId({
|
||||
runId,
|
||||
status,
|
||||
startedAt: event.startedAt,
|
||||
endedAt: event.endedAt ?? at,
|
||||
lastEventAt: at,
|
||||
error: event.error ?? undefined,
|
||||
progressSummary: event.progressSummary,
|
||||
terminalSummary: event.terminalSummary,
|
||||
terminalOutcome: status === "succeeded" ? "succeeded" : undefined,
|
||||
})[0];
|
||||
} else if (status === "lost") {
|
||||
next = markTaskLostById({
|
||||
taskId: current.taskId,
|
||||
endedAt: event.endedAt ?? at,
|
||||
lastEventAt: at,
|
||||
error: event.error ?? undefined,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
matched: true,
|
||||
record: next ? toOperationRecord(next) : toOperationRecord(current),
|
||||
};
|
||||
}
|
||||
|
||||
async function getTaskOperationList(
|
||||
query: PluginOperationListQuery = {},
|
||||
): Promise<PluginOperationRecord[]> {
|
||||
if (query.namespace && !isTaskNamespace(query.namespace)) {
|
||||
return [];
|
||||
}
|
||||
const records = (
|
||||
query.sessionKey ? listTasksForSessionKey(query.sessionKey) : listTaskRecords()
|
||||
).map(toOperationRecord);
|
||||
const filtered = records.filter((record) => filterOperationRecord(record, query));
|
||||
const limit =
|
||||
typeof query.limit === "number" && Number.isFinite(query.limit) && query.limit > 0
|
||||
? Math.floor(query.limit)
|
||||
: undefined;
|
||||
return typeof limit === "number" ? filtered.slice(0, limit) : filtered;
|
||||
}
|
||||
|
||||
function isMatchingTaskAuditSeverity(
|
||||
actual: TaskAuditSeverity,
|
||||
requested: PluginOperationAuditQuery["severity"],
|
||||
): boolean {
|
||||
return !requested || actual === requested;
|
||||
}
|
||||
|
||||
function toOperationAuditFinding(finding: TaskAuditFinding): PluginOperationAuditFinding {
|
||||
return {
|
||||
severity: finding.severity,
|
||||
code: finding.code,
|
||||
operation: toOperationRecord(finding.task),
|
||||
detail: finding.detail,
|
||||
...(typeof finding.ageMs === "number" ? { ageMs: finding.ageMs } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
async function auditTaskOperations(
|
||||
query: PluginOperationAuditQuery = {},
|
||||
): Promise<PluginOperationAuditFinding[]> {
|
||||
if (query.namespace && !isTaskNamespace(query.namespace)) {
|
||||
return [];
|
||||
}
|
||||
return listTaskAuditFindings()
|
||||
.filter((finding) => {
|
||||
if (!isMatchingTaskAuditSeverity(finding.severity, query.severity)) {
|
||||
return false;
|
||||
}
|
||||
if (query.code && finding.code !== query.code) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
})
|
||||
.map(toOperationAuditFinding);
|
||||
}
|
||||
|
||||
async function maintainTaskOperations(
|
||||
query: PluginOperationMaintenanceQuery = {},
|
||||
): Promise<PluginOperationMaintenanceSummary> {
|
||||
if (query.namespace && !isTaskNamespace(query.namespace)) {
|
||||
return {
|
||||
reconciled: 0,
|
||||
cleanupStamped: 0,
|
||||
pruned: 0,
|
||||
};
|
||||
}
|
||||
return query.apply ? runTaskRegistryMaintenance() : previewTaskRegistryMaintenance();
|
||||
}
|
||||
|
||||
export const defaultTaskOperationsRuntime: PluginOperationsRuntime = {
|
||||
dispatch: dispatchTaskOperation,
|
||||
async getById(operationId: string) {
|
||||
const record = getTaskById(operationId.trim());
|
||||
return record ? toOperationRecord(record) : null;
|
||||
},
|
||||
async findByRunId(runId: string) {
|
||||
const record = findTaskByRunId(runId.trim());
|
||||
return record ? toOperationRecord(record) : null;
|
||||
},
|
||||
list: getTaskOperationList,
|
||||
async summarize(query) {
|
||||
const records = await getTaskOperationList(query);
|
||||
return summarizeOperationRecords(records);
|
||||
},
|
||||
audit: auditTaskOperations,
|
||||
maintenance: maintainTaskOperations,
|
||||
async cancel(params): Promise<PluginOperationsCancelResult> {
|
||||
const result = await cancelTaskById({
|
||||
cfg: params.cfg,
|
||||
taskId: params.operationId,
|
||||
});
|
||||
return {
|
||||
found: result.found,
|
||||
cancelled: result.cancelled,
|
||||
reason: result.reason,
|
||||
record: result.task ? toOperationRecord(result.task) : null,
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
export async function summarizeTaskOperations(
|
||||
query: PluginOperationListQuery = {},
|
||||
): Promise<PluginOperationSummary> {
|
||||
return defaultTaskOperationsRuntime.summarize(query);
|
||||
}
|
||||
@@ -14,7 +14,6 @@ const RAW_TASK_MUTATORS = [
|
||||
] as const;
|
||||
|
||||
const ALLOWED_CALLERS = new Set([
|
||||
"tasks/operations-runtime.ts",
|
||||
"tasks/task-executor.ts",
|
||||
"tasks/task-registry.ts",
|
||||
"tasks/task-registry.maintenance.ts",
|
||||
|
||||
@@ -11,8 +11,8 @@ const ALLOWED_IMPORTERS = new Set([
|
||||
"auto-reply/reply/commands-subagents/action-info.ts",
|
||||
"commands/doctor-workspace-status.ts",
|
||||
"commands/flows.ts",
|
||||
"commands/tasks.ts",
|
||||
"tasks/flow-runtime.ts",
|
||||
"tasks/operations-runtime.ts",
|
||||
"tasks/task-executor.ts",
|
||||
"tasks/task-registry.maintenance.ts",
|
||||
]);
|
||||
|
||||
Reference in New Issue
Block a user