fix(tasks): normalize task timestamps and retained lost audit

Normalize task lifecycle timestamps on create, update, and restore so startedAt/lastEventAt/endedAt cannot precede createdAt in audit-visible records.

Downgrade retained lost tasks with future cleanupAfter from audit errors to warnings while keeping expired or unstamped lost tasks as errors.

Verification: pnpm exec oxfmt --write --threads=1 src/tasks/task-registry.ts src/tasks/task-registry.test.ts src/tasks/task-registry.audit.ts src/tasks/task-registry.audit.test.ts

Verification: node scripts/test-projects.mjs src/tasks/task-registry.test.ts src/tasks/task-registry.audit.test.ts (task-registry.audit.test.ts 4 passed; task-registry.test.ts 45 passed)
This commit is contained in:
Likewen
2026-04-26 00:17:43 +08:00
committed by Peter Steinberger
parent d955bf0ff8
commit de5b173546
4 changed files with 188 additions and 6 deletions

View File

@@ -83,6 +83,38 @@ describe("task-registry audit", () => {
});
});
it("downgrades retained lost tasks with future cleanupAfter to warnings", () => {
const now = Date.parse("2026-03-30T01:00:00.000Z");
const findings = listTaskAuditFindings({
now,
tasks: [
createTask({
taskId: "lost-retained",
status: "lost",
error: "backing session missing",
endedAt: now - 60_000,
lastEventAt: now - 60_000,
cleanupAfter: now + 60_000,
}),
createTask({
taskId: "lost-expired",
status: "lost",
error: "backing session missing",
endedAt: now - 120_000,
lastEventAt: now - 120_000,
cleanupAfter: now - 1,
}),
],
});
expect(
findings.map((finding) => [finding.task.taskId, finding.code, finding.severity]),
).toEqual([
["lost-expired", "lost", "error"],
["lost-retained", "lost", "warn"],
]);
});
it("does not double-report lost tasks as missing cleanup", () => {
const now = Date.parse("2026-03-30T01:00:00.000Z");
const findings = listTaskAuditFindings({

View File

@@ -125,13 +125,17 @@ export function listTaskAuditFindings(options: TaskAuditOptions = {}): TaskAudit
}
if (task.status === "lost") {
const retainedUntilCleanup = typeof task.cleanupAfter === "number" && task.cleanupAfter > now;
findings.push(
createFinding({
severity: "error",
severity: retainedUntilCleanup ? "warn" : "error",
code: "lost",
task,
ageMs,
detail: task.error?.trim() || "task lost its backing session",
detail: retainedUntilCleanup
? task.error?.trim() ||
"task lost its backing session and is retained until cleanupAfter"
: task.error?.trim() || "task lost its backing session",
}),
);
}

View File

@@ -1590,6 +1590,108 @@ describe("task-registry", () => {
});
});
it("backdates createdAt when a task is created with an earlier startedAt", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_700_000_000_000);
const task = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
runId: "run-backdated-create",
task: "Backdated create",
status: "running",
deliveryStatus: "pending",
startedAt: 1_699_999_999_000,
});
nowSpy.mockRestore();
expect(task).toMatchObject({
createdAt: 1_699_999_999_000,
startedAt: 1_699_999_999_000,
lastEventAt: 1_699_999_999_000,
});
expect(getInspectableTaskAuditSummary().byCode.inconsistent_timestamps).toBe(0);
});
});
it("keeps timestamps monotonic when an update supplies an earlier startedAt", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_700_000_000_000);
const task = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
runId: "run-backdated-update",
task: "Backdated update",
status: "queued",
deliveryStatus: "pending",
});
nowSpy.mockReturnValue(1_700_000_001_000);
setTaskTimingById({
taskId: task.taskId,
startedAt: 1_699_999_998_000,
lastEventAt: 1_699_999_998_500,
});
nowSpy.mockRestore();
expect(getTaskById(task.taskId)).toMatchObject({
createdAt: 1_699_999_998_000,
startedAt: 1_699_999_998_000,
lastEventAt: 1_699_999_998_500,
});
expect(getInspectableTaskAuditSummary().byCode.inconsistent_timestamps).toBe(0);
});
});
it("normalizes restored task timestamps before exposing them", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
configureTaskRegistryRuntime({
store: {
loadSnapshot: () => ({
tasks: new Map([
[
"task-restored-bad-timestamps",
{
taskId: "task-restored-bad-timestamps",
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
runId: "run-restored-bad-timestamps",
task: "Restored task with old start time",
status: "running",
deliveryStatus: "pending",
notifyPolicy: "done_only",
createdAt: 200,
startedAt: 100,
lastEventAt: 150,
},
],
]),
deliveryStates: new Map(),
}),
saveSnapshot: () => {},
},
});
expect(findTaskByRunId("run-restored-bad-timestamps")).toMatchObject({
createdAt: 100,
startedAt: 100,
lastEventAt: 150,
});
});
});
it("summarizes inspectable task audit findings", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;

View File

@@ -180,6 +180,50 @@ function cloneTaskRecord(record: TaskRecord): TaskRecord {
return { ...record };
}
function normalizeTaskTimestamps(task: TaskRecord): TaskRecord {
let createdAt = task.createdAt;
for (const candidate of [task.startedAt, task.lastEventAt, task.endedAt]) {
if (typeof candidate === "number" && candidate < createdAt) {
createdAt = candidate;
}
}
const startedAt =
typeof task.startedAt === "number" ? Math.max(task.startedAt, createdAt) : task.startedAt;
const lastEventAt =
typeof task.lastEventAt === "number"
? Math.max(task.lastEventAt, startedAt ?? createdAt)
: task.lastEventAt;
const endedAt =
typeof task.endedAt === "number"
? Math.max(task.endedAt, startedAt ?? createdAt)
: task.endedAt;
if (
createdAt === task.createdAt &&
startedAt === task.startedAt &&
lastEventAt === task.lastEventAt &&
endedAt === task.endedAt
) {
return task;
}
const normalized: TaskRecord = {
...task,
createdAt,
};
if (typeof startedAt === "number") {
normalized.startedAt = startedAt;
}
if (typeof lastEventAt === "number") {
normalized.lastEventAt = lastEventAt;
}
if (typeof endedAt === "number") {
normalized.endedAt = endedAt;
}
return normalized;
}
function cloneTaskDeliveryState(state: TaskDeliveryState): TaskDeliveryState {
return {
...state,
@@ -861,7 +905,7 @@ function restoreTaskRegistryOnce() {
return;
}
for (const [taskId, task] of restored.tasks.entries()) {
tasks.set(taskId, task);
tasks.set(taskId, normalizeTaskTimestamps(task));
}
for (const [taskId, state] of restored.deliveryStates.entries()) {
taskDeliveryStates.set(taskId, state);
@@ -889,7 +933,7 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
if (!current) {
return null;
}
const next = { ...current, ...patch };
const next = normalizeTaskTimestamps({ ...current, ...patch });
if (isTerminalTaskStatus(next.status) && typeof next.cleanupAfter !== "number") {
const terminalAt = next.endedAt ?? next.lastEventAt ?? Date.now();
next.cleanupAfter = terminalAt + DEFAULT_TASK_RETENTION_MS;
@@ -1453,7 +1497,7 @@ export function createTaskRecord(params: {
scopeKind,
});
const lastEventAt = params.lastEventAt ?? params.startedAt ?? now;
const record: TaskRecord = {
const record: TaskRecord = normalizeTaskTimestamps({
taskId,
runtime: params.runtime,
taskKind: normalizeOptionalString(params.taskKind),
@@ -1481,7 +1525,7 @@ export function createTaskRecord(params: {
status,
terminalOutcome: params.terminalOutcome,
}),
};
});
if (isTerminalTaskStatus(record.status) && typeof record.cleanupAfter !== "number") {
record.cleanupAfter =
(record.endedAt ?? record.lastEventAt ?? record.createdAt) + DEFAULT_TASK_RETENTION_MS;