fix(tasks): harden task-flow restore and maintenance

This commit is contained in:
Vincent Koc
2026-04-02 21:36:47 +09:00
parent 9c22d63669
commit 0f45630d19
6 changed files with 796 additions and 35 deletions

View File

@@ -0,0 +1,141 @@
import { afterEach, describe, expect, it } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { createRunningTaskRun } from "./task-executor.js";
import { listTaskFlowAuditFindings } from "./task-flow-registry.audit.js";
import {
createManagedTaskFlow,
resetTaskFlowRegistryForTests,
setFlowWaiting,
} from "./task-flow-registry.js";
import { configureTaskFlowRegistryRuntime } from "./task-flow-registry.store.js";
import {
resetTaskRegistryDeliveryRuntimeForTests,
resetTaskRegistryForTests,
} from "./task-registry.js";
/** Value of OPENCLAW_STATE_DIR captured at module load; the suite restores it after each test. */
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;

/**
 * Runs `run` with OPENCLAW_STATE_DIR pointing at a fresh temporary directory.
 *
 * The task registry, its delivery runtime and the task-flow registry are reset
 * before and after the callback. The previous OPENCLAW_STATE_DIR value is put
 * back before this helper returns, so the helper cleans up after itself even
 * if a caller forgets the suite-level `afterEach`.
 *
 * @param run - Test body; receives the temporary root directory.
 */
async function withTaskFlowAuditStateDir(run: (root: string) => Promise<void>): Promise<void> {
  await withTempDir({ prefix: "openclaw-task-flow-audit-" }, async (root) => {
    const previousStateDir = process.env.OPENCLAW_STATE_DIR;
    process.env.OPENCLAW_STATE_DIR = root;
    try {
      resetTaskRegistryDeliveryRuntimeForTests();
      resetTaskRegistryForTests();
      resetTaskFlowRegistryForTests();
      await run(root);
    } finally {
      try {
        // Reset while the env var still points at the temporary root.
        resetTaskRegistryDeliveryRuntimeForTests();
        resetTaskRegistryForTests();
        resetTaskFlowRegistryForTests();
      } finally {
        // Always undo the env mutation, even if a reset throws.
        if (previousStateDir === undefined) {
          delete process.env.OPENCLAW_STATE_DIR;
        } else {
          process.env.OPENCLAW_STATE_DIR = previousStateDir;
        }
      }
    }
  });
}
// Suite covering listTaskFlowAuditFindings: restore failures, flows without
// linked tasks, and flows whose blocked task id does not resolve.
describe("task-flow-registry audit", () => {
// Put OPENCLAW_STATE_DIR back to its module-load value and reset all registry
// state so tests do not leak into each other.
afterEach(() => {
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests();
resetTaskFlowRegistryForTests();
});
// A store whose loadSnapshot throws must surface as a single "restore_failed"
// error finding that carries the thrown message.
it("surfaces restore failures as task-flow audit findings", () => {
configureTaskFlowRegistryRuntime({
store: {
loadSnapshot: () => {
throw new Error("boom");
},
saveSnapshot: () => {},
},
});
expect(listTaskFlowAuditFindings()).toEqual([
expect.objectContaining({
severity: "error",
code: "restore_failed",
detail: expect.stringContaining("boom"),
}),
]);
});
it("detects stuck managed flows and missing blocked tasks", async () => {
await withTaskFlowAuditStateDir(async () => {
// Managed running flow with no linked tasks and no wait metadata.
const running = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-flow-audit",
goal: "Inspect queue",
status: "running",
createdAt: 1,
updatedAt: 1,
});
// Second flow is moved to waiting on a task id that is never created.
const blocked = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-flow-audit",
goal: "Wait on child",
status: "running",
createdAt: 1,
updatedAt: 1,
});
setFlowWaiting({
flowId: blocked.flowId,
expectedRevision: blocked.revision,
blockedTaskId: "task-missing",
blockedSummary: "Need follow-up",
updatedAt: 1,
});
// Audit 31 minutes after the flows' timestamps.
const findings = listTaskFlowAuditFindings({ now: 31 * 60_000 });
expect(findings).toEqual(
expect.arrayContaining([
expect.objectContaining({
code: "missing_linked_tasks",
flow: expect.objectContaining({ flowId: running.flowId }),
}),
expect.objectContaining({
code: "blocked_task_missing",
flow: expect.objectContaining({ flowId: blocked.flowId }),
}),
]),
);
});
});
it("does not flag managed flows with active linked tasks as missing", async () => {
await withTaskFlowAuditStateDir(async () => {
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-flow-audit",
goal: "Inspect queue",
status: "running",
createdAt: 1,
updatedAt: 1,
});
// Link a running task to the flow through parentFlowId.
createRunningTaskRun({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
parentFlowId: flow.flowId,
childSessionKey: "agent:main:child",
runId: "task-flow-audit-child",
task: "Inspect PR 1",
startedAt: 1,
lastEventAt: 1,
});
// The flow may still produce other findings; only "missing_linked_tasks"
// must be absent for it.
expect(listTaskFlowAuditFindings({ now: 31 * 60_000 })).not.toEqual(
expect.arrayContaining([
expect.objectContaining({
code: "missing_linked_tasks",
flow: expect.objectContaining({ flowId: flow.flowId }),
}),
]),
);
});
});
});

View File

@@ -0,0 +1,280 @@
import { listTasksForFlowId } from "./runtime-internal.js";
import { getTaskFlowRegistryRestoreFailure, listTaskFlowRecords } from "./task-flow-registry.js";
import type { TaskFlowRecord } from "./task-flow-registry.types.js";
import type { TaskRecord } from "./task-registry.types.js";
/** Severity attached to an audit finding. */
export type TaskFlowAuditSeverity = "warn" | "error";
/** Machine-readable identifier for each kind of audit finding. */
export type TaskFlowAuditCode =
// The registry reported a restore failure message.
| "restore_failed"
// A running flow whose age reached the stale-running threshold.
| "stale_running"
// A waiting flow whose age reached the stale-waiting threshold.
| "stale_waiting"
// A blocked flow whose age reached the stale-blocked threshold.
| "stale_blocked"
// Cancel was requested, the flow is still nonterminal and has no queued/running tasks.
| "cancel_stuck"
// A managed running/waiting/blocked flow with no linked tasks and no blocking metadata.
| "missing_linked_tasks"
// The flow's blockedTaskId is not among its linked tasks.
| "blocked_task_missing"
// updatedAt/endedAt are earlier than a timestamp that should precede them.
| "inconsistent_timestamps";
/** A single audit result. `ageMs` and `flow` are omitted when they do not apply. */
export type TaskFlowAuditFinding = {
severity: TaskFlowAuditSeverity;
code: TaskFlowAuditCode;
detail: string;
ageMs?: number;
flow?: TaskFlowRecord;
};
/** Aggregate counts over a set of findings, in total, per severity and per code. */
export type TaskFlowAuditSummary = {
total: number;
warnings: number;
errors: number;
byCode: Record<TaskFlowAuditCode, number>;
};
/**
 * Inputs for listTaskFlowAuditFindings. Every field is optional: `now`
 * defaults to Date.now(), `flows` to the registry's records, and each
 * threshold to the matching DEFAULT_* constant.
 */
export type TaskFlowAuditOptions = {
now?: number;
flows?: TaskFlowRecord[];
staleRunningMs?: number;
staleWaitingMs?: number;
staleBlockedMs?: number;
cancelStuckMs?: number;
};
// Default audit thresholds in milliseconds: 30 minutes for the stale
// running/waiting/blocked checks and 5 minutes for a stuck cancel request.
const DEFAULT_STALE_RUNNING_MS = 30 * 60_000;
const DEFAULT_STALE_WAITING_MS = 30 * 60_000;
const DEFAULT_STALE_BLOCKED_MS = 30 * 60_000;
const DEFAULT_CANCEL_STUCK_MS = 5 * 60_000;
/**
 * Builds an audit finding from its parts.
 *
 * `ageMs` is copied only when it is a number and `flow` only when it is
 * truthy, so absent values do not appear as explicit `undefined` keys.
 */
function createFinding(params: {
  severity: TaskFlowAuditSeverity;
  code: TaskFlowAuditCode;
  detail: string;
  ageMs?: number;
  flow?: TaskFlowRecord;
}): TaskFlowAuditFinding {
  const { severity, code, detail, ageMs, flow } = params;
  const finding: TaskFlowAuditFinding = { severity, code, detail };
  if (typeof ageMs === "number") {
    finding.ageMs = ageMs;
  }
  if (flow) {
    finding.flow = flow;
  }
  return finding;
}
/** Sort rank for a severity: errors (0) order ahead of everything else (1). */
function severityRank(severity: TaskFlowAuditSeverity): number {
  if (severity === "error") {
    return 0;
  }
  return 1;
}
/**
 * Ordering used when sorting findings: errors before warnings, then larger
 * `ageMs` first (a missing age counts as -1), then smaller flow `createdAt`
 * first (a missing flow counts as 0).
 */
function compareFindings(left: TaskFlowAuditFinding, right: TaskFlowAuditFinding): number {
  const rankGap = severityRank(left.severity) - severityRank(right.severity);
  if (rankGap === 0) {
    const leftAge = left.ageMs ?? -1;
    const rightAge = right.ageMs ?? -1;
    if (leftAge === rightAge) {
      // Same severity and age: fall back to flow creation time, oldest first.
      return (left.flow?.createdAt ?? 0) - (right.flow?.createdAt ?? 0);
    }
    return rightAge - leftAge;
  }
  return rankGap;
}
/** Timestamp a flow's age is measured from: `updatedAt`, or `createdAt` when that is nullish. */
function getReferenceAt(flow: TaskFlowRecord): number {
  const { updatedAt, createdAt } = flow;
  return updatedAt ?? createdAt;
}
/** Returns the task records that listTasksForFlowId reports for the given flow id. */
function getLinkedTasks(flowId: string): TaskRecord[] {
return listTasksForFlowId(flowId);
}
/**
 * True when the flow records why it is waiting: a non-blank `blockedTaskId`,
 * a non-blank `blockedSummary`, or a non-nullish `waitJson`.
 */
function hasBlockingMetadata(flow: TaskFlowRecord): boolean {
  if (flow.blockedTaskId?.trim()) {
    return true;
  }
  if (flow.blockedSummary?.trim()) {
    return true;
  }
  return flow.waitJson != null;
}
/**
 * Returns a warning when a flow's timestamps are out of order, or null.
 *
 * Checks run in this order and the first hit wins:
 * `updatedAt < createdAt`, `endedAt < createdAt`, `endedAt < updatedAt`.
 *
 * `endedAt` is tested with `!= null` rather than by truthiness, so an
 * `endedAt` of 0 is still compared instead of being treated as absent.
 */
function findTimestampInconsistency(flow: TaskFlowRecord): TaskFlowAuditFinding | null {
  if (flow.updatedAt < flow.createdAt) {
    return createFinding({
      severity: "warn",
      code: "inconsistent_timestamps",
      flow,
      detail: "updatedAt is earlier than createdAt",
    });
  }
  if (flow.endedAt != null && flow.endedAt < flow.createdAt) {
    return createFinding({
      severity: "warn",
      code: "inconsistent_timestamps",
      flow,
      detail: "endedAt is earlier than createdAt",
    });
  }
  if (flow.endedAt != null && flow.endedAt < flow.updatedAt) {
    return createFinding({
      severity: "warn",
      code: "inconsistent_timestamps",
      flow,
      detail: "endedAt is earlier than updatedAt",
    });
  }
  return null;
}
/** Creates a fresh summary with every counter, including one per audit code, set to zero. */
export function createEmptyTaskFlowAuditSummary(): TaskFlowAuditSummary {
  const byCode: Record<TaskFlowAuditCode, number> = {
    restore_failed: 0,
    stale_running: 0,
    stale_waiting: 0,
    stale_blocked: 0,
    cancel_stuck: 0,
    missing_linked_tasks: 0,
    blocked_task_missing: 0,
    inconsistent_timestamps: 0,
  };
  return { total: 0, warnings: 0, errors: 0, byCode };
}
/**
 * Audits task flows and returns the findings sorted with compareFindings.
 *
 * A registry restore failure, if one is reported, becomes a single
 * "restore_failed" error. Each flow is then checked, in this order, for:
 * staleness in its current status, a stuck cancel request, missing linked
 * tasks, a blocked task id that is not linked, and inconsistent timestamps.
 *
 * @param options - Optional clock, flow list and thresholds; anything left
 *   out falls back to Date.now(), the registry's records and the DEFAULT_*
 *   thresholds.
 * @returns A new, sorted array of findings.
 */
export function listTaskFlowAuditFindings(
  options: TaskFlowAuditOptions = {},
): TaskFlowAuditFinding[] {
  const flows = options.flows ?? listTaskFlowRecords();
  const now = options.now ?? Date.now();
  const runningLimitMs = options.staleRunningMs ?? DEFAULT_STALE_RUNNING_MS;
  const waitingLimitMs = options.staleWaitingMs ?? DEFAULT_STALE_WAITING_MS;
  const blockedLimitMs = options.staleBlockedMs ?? DEFAULT_STALE_BLOCKED_MS;
  const cancelLimitMs = options.cancelStuckMs ?? DEFAULT_CANCEL_STUCK_MS;
  const collected: TaskFlowAuditFinding[] = [];

  const restoreFailure = getTaskFlowRegistryRestoreFailure();
  if (restoreFailure) {
    collected.push(
      createFinding({
        severity: "error",
        code: "restore_failed",
        detail: `task-flow registry restore failed: ${restoreFailure}`,
      }),
    );
  }

  for (const flow of flows) {
    const ageMs = Math.max(0, now - getReferenceAt(flow));
    const linkedTasks = getLinkedTasks(flow.flowId);
    const hasActiveTask = linkedTasks.some(
      (task) => task.status === "queued" || task.status === "running",
    );
    const { status } = flow;

    // A flow has exactly one status, so at most one staleness check can fire.
    switch (status) {
      case "running":
        if (ageMs >= runningLimitMs) {
          collected.push(
            createFinding({
              severity: "error",
              code: "stale_running",
              flow,
              ageMs,
              detail: "running TaskFlow has not advanced recently",
            }),
          );
        }
        break;
      case "waiting":
        if (ageMs >= waitingLimitMs) {
          collected.push(
            createFinding({
              severity: "warn",
              code: "stale_waiting",
              flow,
              ageMs,
              detail: "waiting TaskFlow has not advanced recently",
            }),
          );
        }
        break;
      case "blocked":
        if (ageMs >= blockedLimitMs) {
          collected.push(
            createFinding({
              severity: "warn",
              code: "stale_blocked",
              flow,
              ageMs,
              detail: "blocked TaskFlow has not advanced recently",
            }),
          );
        }
        break;
      default:
        break;
    }

    const isTerminal =
      status === "cancelled" || status === "failed" || status === "succeeded" || status === "lost";
    if (flow.cancelRequestedAt != null && !isTerminal && !hasActiveTask) {
      // Age here is measured from the cancel request, not from the last update.
      const sinceCancelMs = now - flow.cancelRequestedAt;
      if (sinceCancelMs >= cancelLimitMs) {
        collected.push(
          createFinding({
            severity: "warn",
            code: "cancel_stuck",
            flow,
            ageMs: Math.max(0, sinceCancelMs),
            detail: "cancel-requested TaskFlow has no active child tasks but is still nonterminal",
          }),
        );
      }
    }

    const isLive = status === "running" || status === "waiting" || status === "blocked";
    if (
      flow.syncMode === "managed" &&
      isLive &&
      linkedTasks.length === 0 &&
      !hasBlockingMetadata(flow)
    ) {
      collected.push(
        createFinding({
          severity: status === "running" ? "error" : "warn",
          code: "missing_linked_tasks",
          flow,
          ageMs,
          detail: "managed TaskFlow has no linked tasks or wait state",
        }),
      );
    }

    const blockedTaskId = flow.blockedTaskId?.trim();
    if (blockedTaskId && !linkedTasks.some((task) => task.taskId === blockedTaskId)) {
      collected.push(
        createFinding({
          severity: "warn",
          code: "blocked_task_missing",
          flow,
          ageMs,
          detail: `blocked TaskFlow points at missing task ${blockedTaskId}`,
        }),
      );
    }

    const inconsistency = findTimestampInconsistency(flow);
    if (inconsistency) {
      collected.push(inconsistency);
    }
  }

  return collected.toSorted(compareFindings);
}
/**
 * Counts findings in total, per severity and per code.
 *
 * @param findings - Any iterable of findings; it is consumed once.
 * @returns A new summary. Severities other than "error" count as warnings.
 */
export function summarizeTaskFlowAuditFindings(
  findings: Iterable<TaskFlowAuditFinding>,
): TaskFlowAuditSummary {
  const summary = createEmptyTaskFlowAuditSummary();
  for (const { code, severity } of findings) {
    summary.total += 1;
    summary.byCode[code] += 1;
    const bucket = severity === "error" ? "errors" : "warnings";
    summary[bucket] += 1;
  }
  return summary;
}

View File

@@ -0,0 +1,103 @@
import { afterEach, describe, expect, it } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createManagedTaskFlow,
getTaskFlowById,
resetTaskFlowRegistryForTests,
} from "./task-flow-registry.js";
import {
previewTaskFlowRegistryMaintenance,
runTaskFlowRegistryMaintenance,
} from "./task-flow-registry.maintenance.js";
import {
resetTaskRegistryDeliveryRuntimeForTests,
resetTaskRegistryForTests,
} from "./task-registry.js";
/** Value of OPENCLAW_STATE_DIR captured at module load; the suite restores it after each test. */
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;

/**
 * Runs `run` with OPENCLAW_STATE_DIR pointing at a fresh temporary directory.
 *
 * The task registry, its delivery runtime and the task-flow registry are reset
 * before and after the callback. The previous OPENCLAW_STATE_DIR value is put
 * back before this helper returns, so the helper cleans up after itself even
 * if a caller forgets the suite-level `afterEach`.
 *
 * @param run - Test body; receives the temporary root directory.
 */
async function withTaskFlowMaintenanceStateDir(
  run: (root: string) => Promise<void>,
): Promise<void> {
  await withTempDir({ prefix: "openclaw-task-flow-maintenance-" }, async (root) => {
    const previousStateDir = process.env.OPENCLAW_STATE_DIR;
    process.env.OPENCLAW_STATE_DIR = root;
    try {
      resetTaskRegistryDeliveryRuntimeForTests();
      resetTaskRegistryForTests();
      resetTaskFlowRegistryForTests();
      await run(root);
    } finally {
      try {
        // Reset while the env var still points at the temporary root.
        resetTaskRegistryDeliveryRuntimeForTests();
        resetTaskRegistryForTests();
        resetTaskFlowRegistryForTests();
      } finally {
        // Always undo the env mutation, even if a reset throws.
        if (previousStateDir === undefined) {
          delete process.env.OPENCLAW_STATE_DIR;
        } else {
          process.env.OPENCLAW_STATE_DIR = previousStateDir;
        }
      }
    }
  });
}
// Suite covering the maintenance pass: finalizing cancel-requested flows and
// pruning terminal flows that are past retention.
describe("task-flow-registry maintenance", () => {
// Put OPENCLAW_STATE_DIR back to its module-load value and reset all registry
// state so tests do not leak into each other.
afterEach(() => {
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests();
resetTaskFlowRegistryForTests();
});
it("finalizes cancel-requested managed flows once no child tasks remain active", async () => {
await withTaskFlowMaintenanceStateDir(async () => {
// Running managed flow with a cancel request and no linked tasks.
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-flow-maintenance",
goal: "Cancel work",
status: "running",
cancelRequestedAt: 100,
createdAt: 1,
updatedAt: 100,
});
// The preview must report the same counts that the real run then produces.
expect(previewTaskFlowRegistryMaintenance()).toEqual({
reconciled: 1,
pruned: 0,
});
expect(await runTaskFlowRegistryMaintenance()).toEqual({
reconciled: 1,
pruned: 0,
});
// The flow ends up cancelled and keeps its original cancelRequestedAt.
expect(getTaskFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "cancelled",
cancelRequestedAt: 100,
});
});
});
it("prunes old terminal flows", async () => {
await withTaskFlowMaintenanceStateDir(async () => {
const now = Date.now();
// Succeeded flow that ended 8 days ago.
const oldFlow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-flow-maintenance",
goal: "Old terminal flow",
status: "succeeded",
createdAt: now - 8 * 24 * 60 * 60_000,
updatedAt: now - 8 * 24 * 60 * 60_000,
endedAt: now - 8 * 24 * 60 * 60_000,
});
expect(previewTaskFlowRegistryMaintenance()).toEqual({
reconciled: 0,
pruned: 1,
});
expect(await runTaskFlowRegistryMaintenance()).toEqual({
reconciled: 0,
pruned: 1,
});
// After the run the record can no longer be looked up.
expect(getTaskFlowById(oldFlow.flowId)).toBeUndefined();
});
});
});

View File

@@ -0,0 +1,131 @@
import { listTasksForFlowId } from "./runtime-internal.js";
import {
listTaskFlowAuditFindings,
summarizeTaskFlowAuditFindings,
type TaskFlowAuditSummary,
} from "./task-flow-registry.audit.js";
import {
deleteTaskFlowRecordById,
getTaskFlowById,
listTaskFlowRecords,
updateFlowRecordByIdExpectedRevision,
} from "./task-flow-registry.js";
import type { TaskFlowRecord } from "./task-flow-registry.types.js";
// Retention window (7 days, in milliseconds) that shouldPruneFlow compares a
// terminal flow's age against.
const TASK_FLOW_RETENTION_MS = 7 * 24 * 60 * 60_000;
/** Counts reported by a maintenance preview or run. */
export type TaskFlowRegistryMaintenanceSummary = {
// Number of cancel-requested flows finalized (or, in a preview, eligible for it).
reconciled: number;
// Number of terminal flows deleted (or, in a preview, eligible for deletion).
pruned: number;
};
/** True when the flow's status is one of: succeeded, failed, cancelled, lost. */
function isTerminalFlow(flow: TaskFlowRecord): boolean {
  switch (flow.status) {
    case "succeeded":
    case "failed":
    case "cancelled":
    case "lost":
      return true;
    default:
      return false;
  }
}
/** True when at least one task linked to the flow is still queued or running. */
function hasActiveLinkedTasks(flowId: string): boolean {
  for (const task of listTasksForFlowId(flowId)) {
    if (task.status === "queued" || task.status === "running") {
      return true;
    }
  }
  return false;
}
/** Picks the first non-nullish timestamp out of `endedAt`, `updatedAt` and `createdAt`. */
function resolveTerminalAt(flow: TaskFlowRecord): number {
  if (flow.endedAt != null) {
    return flow.endedAt;
  }
  if (flow.updatedAt != null) {
    return flow.updatedAt;
  }
  return flow.createdAt;
}
/**
 * A flow is prunable when it is terminal, has no queued/running linked tasks,
 * and its terminal timestamp is at least TASK_FLOW_RETENTION_MS before `now`.
 */
function shouldPruneFlow(flow: TaskFlowRecord, now: number): boolean {
  const settled = isTerminalFlow(flow) && !hasActiveLinkedTasks(flow.flowId);
  return settled && now - resolveTerminalAt(flow) >= TASK_FLOW_RETENTION_MS;
}
/**
 * A flow should be finalized as cancelled when it is managed, has a cancel
 * request recorded, is not yet terminal, and has no queued/running linked tasks.
 */
function shouldFinalizeCancelledFlow(flow: TaskFlowRecord): boolean {
  const awaitingCancel =
    flow.syncMode === "managed" && flow.cancelRequestedAt != null && !isTerminalFlow(flow);
  return awaitingCancel && !hasActiveLinkedTasks(flow.flowId);
}
/**
 * Tries to move a cancel-requested flow to "cancelled" using a
 * revision-checked update, with at most two attempts.
 *
 * @param flow - Record to finalize; its revision is used for the first attempt.
 * @param now - Timestamp used as the lower bound for endedAt/updatedAt.
 * @returns true when an update was applied; false when the flow is gone, no
 *   current record was returned, the refreshed record no longer qualifies, or
 *   both attempts were rejected.
 */
function finalizeCancelledFlow(flow: TaskFlowRecord, now: number): boolean {
let current = flow;
for (let attempt = 0; attempt < 2; attempt += 1) {
// Never let endedAt fall before the record's own updatedAt or cancel request.
const endedAt = Math.max(now, current.updatedAt, current.cancelRequestedAt ?? now);
const result = updateFlowRecordByIdExpectedRevision({
flowId: current.flowId,
expectedRevision: current.revision,
patch: {
status: "cancelled",
// Clear wait/blocking metadata along with the status change.
blockedTaskId: null,
blockedSummary: null,
waitJson: null,
endedAt,
updatedAt: endedAt,
},
});
if (result.applied) {
return true;
}
if (result.reason === "not_found" || !result.current) {
return false;
}
// The update was rejected but a current record came back: retry against it
// only if it still qualifies for finalization.
current = result.current;
if (!shouldFinalizeCancelledFlow(current)) {
return false;
}
}
return false;
}
/** Runs the audit with default options and returns the summarized counts. */
export function getInspectableTaskFlowAuditSummary(): TaskFlowAuditSummary {
  const findings = listTaskFlowAuditFindings();
  return summarizeTaskFlowAuditFindings(findings);
}
/**
 * Counts what a maintenance run would do without changing any record.
 *
 * Each flow is counted at most once: a flow that qualifies for cancel
 * finalization is counted as reconciled and is not also considered for pruning.
 */
export function previewTaskFlowRegistryMaintenance(): TaskFlowRegistryMaintenanceSummary {
  const now = Date.now();
  const summary: TaskFlowRegistryMaintenanceSummary = { reconciled: 0, pruned: 0 };
  for (const flow of listTaskFlowRecords()) {
    if (shouldFinalizeCancelledFlow(flow)) {
      summary.reconciled += 1;
    } else if (shouldPruneFlow(flow, now)) {
      summary.pruned += 1;
    }
  }
  return summary;
}
/**
 * Performs one maintenance pass over the registry.
 *
 * Each listed flow is looked up again by id before it is acted on, and flows
 * that can no longer be found are skipped. Cancel-requested flows that qualify
 * are finalized; otherwise prunable terminal flows are deleted.
 *
 * @returns How many flows were actually finalized and deleted.
 */
export async function runTaskFlowRegistryMaintenance(): Promise<TaskFlowRegistryMaintenanceSummary> {
  const now = Date.now();
  const summary: TaskFlowRegistryMaintenanceSummary = { reconciled: 0, pruned: 0 };
  for (const { flowId } of listTaskFlowRecords()) {
    const current = getTaskFlowById(flowId);
    if (!current) {
      continue;
    }
    if (shouldFinalizeCancelledFlow(current)) {
      // A flow that qualifies for finalization is never pruned in the same
      // pass, even when the finalize attempt does not apply.
      if (finalizeCancelledFlow(current, now)) {
        summary.reconciled += 1;
      }
    } else if (shouldPruneFlow(current, now) && deleteTaskFlowRecordById(current.flowId)) {
      summary.pruned += 1;
    }
  }
  return summary;
}

View File

@@ -1,4 +1,5 @@
import crypto from "node:crypto";
import { createSubsystemLogger } from "../logging/subsystem.js";
import {
getTaskFlowRegistryHooks,
getTaskFlowRegistryStore,
@@ -13,8 +14,10 @@ import type {
} from "./task-flow-registry.types.js";
import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js";
const log = createSubsystemLogger("tasks/task-flow-registry");
const flows = new Map<string, TaskFlowRecord>();
let restoreAttempted = false;
let restoreFailureMessage: string | null = null;
type FlowRecordPatch = Omit<
Partial<
@@ -200,10 +203,18 @@ function ensureFlowRegistryReady() {
return;
}
restoreAttempted = true;
const restored = getTaskFlowRegistryStore().loadSnapshot();
flows.clear();
for (const [flowId, flow] of restored.flows) {
flows.set(flowId, normalizeRestoredFlowRecord(flow));
try {
const restored = getTaskFlowRegistryStore().loadSnapshot();
flows.clear();
for (const [flowId, flow] of restored.flows) {
flows.set(flowId, normalizeRestoredFlowRecord(flow));
}
restoreFailureMessage = null;
} catch (error) {
flows.clear();
restoreFailureMessage = error instanceof Error ? error.message : String(error);
log.warn("Failed to restore task-flow registry", { error });
return;
}
emitFlowRegistryHookEvent(() => ({
kind: "restored",
@@ -211,6 +222,11 @@ function ensureFlowRegistryReady() {
}));
}
/**
 * Returns the message recorded for a failed registry restore, or null when
 * none is recorded. Calls ensureFlowRegistryReady() first, so a restore that
 * has not been attempted yet is attempted before the value is read.
 */
export function getTaskFlowRegistryRestoreFailure(): string | null {
ensureFlowRegistryReady();
return restoreFailureMessage;
}
function persistFlowRegistry() {
getTaskFlowRegistryStore().saveSnapshot({
flows: new Map(snapshotFlowRecords(flows).map((flow) => [flow.flowId, flow])),
@@ -690,6 +706,7 @@ export function deleteTaskFlowRecordById(flowId: string): boolean {
export function resetTaskFlowRegistryForTests(opts?: { persist?: boolean }) {
flows.clear();
restoreAttempted = false;
restoreFailureMessage = null;
resetTaskFlowRegistryRuntimeForTests();
if (opts?.persist !== false) {
persistFlowRegistry();