diff --git a/CHANGELOG.md b/CHANGELOG.md index aae6a215a75..a3f3c9f38d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai - Channels/Discord: ignore stale route-shaped conversation bindings after a Discord channel is reconfigured to another agent, while preserving explicit focus and subagent bindings. Fixes #73626. Thanks @ramitrkar-hash. - Agents/bootstrap: pass pending BOOTSTRAP.md contents through the first-run user prompt while keeping them out of privileged system context, and show limited bootstrap guidance when workspace file access is unavailable. Fixes #73622. Thanks @mark1010. - ACP/tasks: classify parent-owned ACP sessions as background work regardless of persistent runtime mode, and close terminal stale ACP sessions when no active binding remains, so delegated ACP output reports through the parent task notifier instead of acting like a normal foreground chat session. Refs #73609. Thanks @joerod26. +- Tasks: keep terminal mirrored TaskFlow timestamps pinned to task completion time and let maintenance repair stale mirrors, so ACP terminal delivery updates no longer leave inconsistent flow audits. Refs #73609. Thanks @joerod26. - Gateway/sessions: add conservative stuck-session recovery that releases only stale session lanes while active embedded runs, reply operations, and lane tasks remain serialized, so queued follow-ups can drain without aborting legitimate long-running turns. Refs #73581, #73655, #73652, #73705, #73647, #73602, #73592, and #73601. Thanks @WS-Q0758, @bryangauvin, @spenceryang1996-dot, @bmilne1981, @mattmcintyre, @Vksh07, and @Spolen23. - Plugins: cache unchanged plugin manifest loads by file signature, reducing repeated JSON/JSON5 parsing and manifest normalization in bursty startup and runtime registry paths. Refs #73532 and #73647; carries forward #73678. Thanks @TheDutchRuler. - Agents/model selection: resolve slash-form aliases before provider/model parsing and keep alias-resolved primary models subject to transient provider cooldowns, so cron and persisted sessions do not retry cooled-down raw aliases. Fixes #73573 and #73657. Thanks @akai-shuuichi and @hashslingers. diff --git a/src/tasks/task-flow-registry.maintenance.test.ts b/src/tasks/task-flow-registry.maintenance.test.ts index 158b9f9107f..6c796a0ee79 100644 --- a/src/tasks/task-flow-registry.maintenance.test.ts +++ b/src/tasks/task-flow-registry.maintenance.test.ts @@ -2,6 +2,7 @@ import { afterEach, describe, expect, it } from "vitest"; import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; import { createRunningTaskRun } from "./task-executor.js"; import { + createFlowRecord, createManagedTaskFlow, getTaskFlowById, listTaskFlowRecords, @@ -9,6 +10,7 @@ import { resetTaskFlowRegistryForTests, } from "./task-flow-registry.js"; import { + getInspectableTaskFlowAuditSummary, previewTaskFlowRegistryMaintenance, runTaskFlowRegistryMaintenance, } from "./task-flow-registry.maintenance.js"; @@ -109,6 +111,36 @@ describe("task-flow-registry maintenance", () => { }); }); + it("repairs terminal mirrored flows whose delivery updates outlived endedAt", async () => { + await withTaskFlowMaintenanceStateDir(async () => { + const flow = createFlowRecord({ + syncMode: "task_mirrored", + ownerKey: "agent:main:main", + goal: "Failed ACP task", + status: "failed", + createdAt: 100, + updatedAt: 250, + endedAt: 200, + }); + + expect(getInspectableTaskFlowAuditSummary().byCode.inconsistent_timestamps).toBe(1); + expect(previewTaskFlowRegistryMaintenance()).toEqual({ + reconciled: 1, + pruned: 0, + }); + + expect(await runTaskFlowRegistryMaintenance()).toEqual({ + reconciled: 1, + pruned: 0, + }); + expect(getTaskFlowById(flow.flowId)).toMatchObject({ + endedAt: 200, + updatedAt: 200, + }); + expect(getInspectableTaskFlowAuditSummary().byCode.inconsistent_timestamps).toBe(0); + }); + }); + it("does not finalize cancel-requested flows while a child task is still active", async () => { await withTaskFlowMaintenanceStateDir(async () => { const flow = createManagedTaskFlow({ diff --git a/src/tasks/task-flow-registry.maintenance.ts b/src/tasks/task-flow-registry.maintenance.ts index 7f21543556d..d403d03cfba 100644 --- a/src/tasks/task-flow-registry.maintenance.ts +++ b/src/tasks/task-flow-registry.maintenance.ts @@ -22,6 +22,7 @@ export type TaskFlowRegistryMaintenanceSummary = { function isTerminalFlow(flow: TaskFlowRecord): boolean { return ( flow.status === "succeeded" || + flow.status === "blocked" || flow.status === "failed" || flow.status === "cancelled" || flow.status === "lost" @@ -88,6 +89,40 @@ function finalizeCancelledFlow(flow: TaskFlowRecord, now: number): boolean { return false; } +function shouldRepairTerminalMirroredFlowTimestamp(flow: TaskFlowRecord): boolean { + if (flow.syncMode !== "task_mirrored" || !isTerminalFlow(flow)) { + return false; + } + if (flow.endedAt == null || flow.endedAt < flow.createdAt) { + return false; + } + return flow.updatedAt > flow.endedAt; +} + +function repairTerminalMirroredFlowTimestamp(flow: TaskFlowRecord): boolean { + let current = flow; + for (let attempt = 0; attempt < 2; attempt += 1) { + if (!shouldRepairTerminalMirroredFlowTimestamp(current)) { + return false; + } + const result = updateFlowRecordByIdExpectedRevision({ + flowId: current.flowId, + expectedRevision: current.revision, + patch: { + updatedAt: current.endedAt, + }, + }); + if (result.applied) { + return true; + } + if (result.reason === "not_found" || !result.current) { + return false; + } + current = result.current; + } + return false; +} + export function getInspectableTaskFlowAuditSummary(): TaskFlowAuditSummary { return summarizeTaskFlowAuditFindings(listTaskFlowAuditFindings()); } @@ -97,6 +132,10 @@ export function previewTaskFlowRegistryMaintenance(): TaskFlowRegistryMaintenanc let reconciled = 0; let pruned = 0; for (const flow of listTaskFlowRecords()) { + if (shouldRepairTerminalMirroredFlowTimestamp(flow)) { + reconciled += 1; + continue; + } if (shouldFinalizeCancelledFlow(flow)) { reconciled += 1; continue; @@ -117,6 +156,12 @@ export async function runTaskFlowRegistryMaintenance(): Promise { status: "blocked", blockedTaskId: "task-blocked", blockedSummary: "Writable session required.", + endedAt: 200, + updatedAt: 200, + }); + + const delivered = syncFlowFromTask({ + taskId: "task-blocked", + parentFlowId: mirrored.flowId, + status: "succeeded", + terminalOutcome: "blocked", + notifyPolicy: "done_only", + label: "Fix permissions", + task: "Fix permissions", + lastEventAt: 250, + endedAt: 200, + terminalSummary: "Writable session required.", + }); + expect(delivered).toMatchObject({ + flowId: mirrored.flowId, + status: "blocked", + endedAt: 200, + updatedAt: 200, + }); + + const terminalCreated = createTaskFlowForTask({ + task: { + ownerKey: "agent:main:main", + taskId: "task-failed", + notifyPolicy: "done_only", + status: "failed", + label: "Fail permissions", + task: "Fail permissions", + createdAt: 100, + lastEventAt: 300, + endedAt: 200, + }, + }); + expect(terminalCreated).toMatchObject({ + status: "failed", + endedAt: 200, + updatedAt: 200, }); const managed = createManagedTaskFlow({ diff --git a/src/tasks/task-flow-registry.ts b/src/tasks/task-flow-registry.ts index 98286813d5c..e462afc6995 100644 --- a/src/tasks/task-flow-registry.ts +++ b/src/tasks/task-flow-registry.ts @@ -211,6 +211,27 @@ export function deriveTaskFlowStatusFromTask( return "failed"; } +function isTerminalTaskFlowStatus(status: TaskFlowStatus): boolean { + return ( + status === "succeeded" || + status === "blocked" || + status === "failed" || + status === "cancelled" || + status === "lost" + ); +} + +function resolveTaskMirroredFlowTiming( + task: Pick, + isTerminal: boolean, +): { updatedAt: number; endedAt?: number } { + if (!isTerminal) { + return { updatedAt: task.lastEventAt ?? task.createdAt }; + } + const endedAt = task.endedAt ?? task.lastEventAt ?? task.createdAt; + return { updatedAt: endedAt, endedAt }; +} + function ensureFlowRegistryReady() { if (restoreAttempted) { return; @@ -383,15 +404,10 @@ export function createTaskFlowForTask(params: { requesterOrigin?: TaskFlowRecord["requesterOrigin"]; }): TaskFlowRecord { const terminalFlowStatus = deriveTaskFlowStatusFromTask(params.task); - const isTerminal = - terminalFlowStatus === "succeeded" || - terminalFlowStatus === "blocked" || - terminalFlowStatus === "failed" || - terminalFlowStatus === "cancelled" || - terminalFlowStatus === "lost"; - const endedAt = isTerminal - ? (params.task.endedAt ?? params.task.lastEventAt ?? params.task.createdAt) - : undefined; + const timing = resolveTaskMirroredFlowTiming( + params.task, + isTerminalTaskFlowStatus(terminalFlowStatus), + ); return createFlowRecord({ syncMode: "task_mirrored", ownerKey: params.task.ownerKey, @@ -404,8 +420,8 @@ export function createTaskFlowForTask(params: { terminalFlowStatus === "blocked" ? normalizeOptionalString(params.task.taskId) : undefined, blockedSummary: resolveFlowBlockedSummary(params.task), createdAt: params.task.createdAt, - updatedAt: params.task.lastEventAt ?? params.task.createdAt, - ...(endedAt !== undefined ? { endedAt } : {}), + updatedAt: timing.updatedAt, + ...(timing.endedAt !== undefined ? { endedAt: timing.endedAt } : {}), }); } @@ -597,12 +613,15 @@ export function syncFlowFromTask( return flow; } const terminalFlowStatus = deriveTaskFlowStatusFromTask(task); - const isTerminal = - terminalFlowStatus === "succeeded" || - terminalFlowStatus === "blocked" || - terminalFlowStatus === "failed" || - terminalFlowStatus === "cancelled" || - terminalFlowStatus === "lost"; + const isTerminal = isTerminalTaskFlowStatus(terminalFlowStatus); + const timing = resolveTaskMirroredFlowTiming( + { + createdAt: flow.createdAt, + lastEventAt: task.lastEventAt, + endedAt: task.endedAt, + }, + isTerminal, + ); return updateFlowRecordByIdUnchecked(flowId, { status: terminalFlowStatus, notifyPolicy: task.notifyPolicy, @@ -611,10 +630,10 @@ export function syncFlowFromTask( blockedSummary: terminalFlowStatus === "blocked" ? (resolveFlowBlockedSummary(task) ?? null) : null, waitJson: null, - updatedAt: task.lastEventAt ?? Date.now(), + updatedAt: timing.updatedAt, ...(isTerminal ? { - endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(), + endedAt: timing.endedAt ?? timing.updatedAt, } : { endedAt: null }), });