From 86328585fac2dfe150fc8036e9d65547a20c3381 Mon Sep 17 00:00:00 2001 From: likewen-tech <314317936@qq.com> Date: Sun, 26 Apr 2026 12:06:33 +0800 Subject: [PATCH] fix(tasks): terminalize gateway agent run ledger Terminalize Gateway-backed async task records from the run result while preserving aborted, failed, cancelled, and lost outcomes.\n\nThanks @likewen-tech. --- CHANGELOG.md | 1 + docs/automation/tasks.md | 13 +- src/cron/service/timer.test.ts | 8 +- src/gateway/server-methods/agent.test.ts | 170 +++++++++++++++++++- src/gateway/server-methods/agent.ts | 51 +++++- src/plugins/loader.test.ts | 4 + src/tasks/detached-task-runtime-contract.ts | 14 ++ src/tasks/detached-task-runtime.test.ts | 30 ++++ src/tasks/detached-task-runtime.ts | 18 +++ src/tasks/runtime-internal.ts | 1 + src/tasks/task-executor.ts | 29 ++-- src/tasks/task-registry.test.ts | 123 ++++++++++++++ src/tasks/task-registry.ts | 41 +++++ 13 files changed, 474 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a392147633..4126ebae4b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ Docs: https://docs.openclaw.ai - Feishu: transcribe inbound voice-note audio with the shared media audio path before agent dispatch and keep raw Feishu `file_key` payloads out of message text. Fixes #67120 and #61876. +- Tasks: terminalize async Gateway agent task records from the Gateway run result while preserving aborted, failed, and cancelled outcomes instead of leaving completed runs stuck as active or lost. (#71905) Thanks @likewen-tech. - ACP: send subagent and async-task completion wakes to external ACP harnesses as plain prompts instead of OpenClaw internal runtime-context envelopes, while keeping those envelopes out of ACP transcripts. diff --git a/docs/automation/tasks.md b/docs/automation/tasks.md index 686cfc057ab..8ed002df224 100644 --- a/docs/automation/tasks.md +++ b/docs/automation/tasks.md @@ -115,12 +115,23 @@ stateDiagram-v2 Transitions happen automatically — when the associated agent run ends, the task status updates to match. +Agent run completion is authoritative for active task records. A successful +detached run finalizes as `succeeded`, ordinary run errors finalize as +`failed`, and timeout or abort outcomes finalize as `timed_out`. If an operator +already cancelled the task, or the runtime already recorded a stronger terminal +state such as `failed`, `timed_out`, or `lost`, a later success signal does not +downgrade that terminal status. + `lost` is runtime-aware: - ACP tasks: backing ACP child session metadata disappeared. - Subagent tasks: backing child session disappeared from the target agent store. - Cron tasks: the cron runtime no longer tracks the job as active. -- CLI tasks: isolated child-session tasks use the child session; chat-backed CLI tasks use the live run context instead, so lingering channel/group/direct session rows do not keep them alive. +- CLI tasks: isolated child-session tasks use the child session; chat-backed + CLI tasks use the live run context instead, so lingering + channel/group/direct session rows do not keep them alive. Gateway-backed + `openclaw agent` runs also finalize from their run result, so completed runs + do not sit active until the sweeper marks them `lost`. ## Delivery and notifications diff --git a/src/cron/service/timer.test.ts b/src/cron/service/timer.test.ts index d38a1dc754c..63ff5ff78f0 100644 --- a/src/cron/service/timer.test.ts +++ b/src/cron/service/timer.test.ts @@ -5,7 +5,7 @@ import { onTimer } from "../../cron/service/timer.js"; import { loadCronStore } from "../../cron/store.js"; import type { CronJob } from "../../cron/types.js"; import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js"; -import { resetTaskRegistryForTests } from "../../tasks/task-registry.js"; +import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; const { logger, makeStorePath } = setupCronServiceSuite({ prefix: "cron-service-timer-seam", @@ -74,6 +74,12 @@ describe("cron service timer seam coverage", () => { expect(job?.state.lastStatus).toBe("ok"); expect(job?.state.runningAtMs).toBeUndefined(); expect(job?.state.nextRunAtMs).toBe(now + 60_000); + expect(findTaskByRunId(`cron:main-heartbeat-job:${now}`)).toMatchObject({ + runtime: "cron", + status: "succeeded", + endedAt: now, + cleanupAfter: expect.any(Number), + }); const delays = timeoutSpy.mock.calls .map(([, delay]) => delay) diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 8c3e667efca..fac604e6fbb 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -6,7 +6,11 @@ import { resetDetachedTaskLifecycleRuntimeForTests, setDetachedTaskLifecycleRuntime, } from "../../tasks/detached-task-runtime.js"; -import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; +import { + findTaskByRunId, + markTaskTerminalById, + resetTaskRegistryForTests, +} from "../../tasks/task-registry.js"; import { withTempDir } from "../../test-helpers/temp-dir.js"; import { agentHandlers } from "./agent.js"; import { chatHandlers } from "./chat.js"; @@ -994,7 +998,7 @@ describe("gateway agent handler", () => { expect(callArgs.runContext?.messageChannel).toBe("webchat"); }); - it("tracks async gateway agent runs in the shared task registry", async () => { + it("terminalizes successful async gateway agent runs in the shared task registry", async () => { await withTempDir({ prefix: "openclaw-gateway-agent-task-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); @@ -1009,10 +1013,148 @@ describe("gateway agent handler", () => { { reqId: "task-registry-agent-run" }, ); - expect(findTaskByRunId("task-registry-agent-run")).toMatchObject({ - runtime: "cli", - childSessionKey: "agent:main:main", - status: "running", + await waitForAssertion(() => { + expect(findTaskByRunId("task-registry-agent-run")).toMatchObject({ + runtime: "cli", + childSessionKey: "agent:main:main", + status: "succeeded", + terminalSummary: "completed", + }); + }); + }); + }); + + it("terminalizes failed async gateway agent runs in the shared task registry", async () => { + await withTempDir({ prefix: "openclaw-gateway-agent-task-error-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + primeMainAgentRun(); + mocks.agentCommand.mockRejectedValueOnce(new Error("agent unavailable")); + + await invokeAgent( + { + message: "background cli task", + sessionKey: "agent:main:main", + idempotencyKey: "task-registry-agent-run-error", + }, + { reqId: "task-registry-agent-run-error" }, + ); + + await waitForAssertion(() => { + expect(findTaskByRunId("task-registry-agent-run-error")).toMatchObject({ + runtime: "cli", + childSessionKey: "agent:main:main", + status: "failed", + error: "Error: agent unavailable", + }); + }); + }); + }); + + it("preserves aborted async gateway agent runs as timed out", async () => { + await withTempDir({ prefix: "openclaw-gateway-agent-task-aborted-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + primeMainAgentRun(); + mocks.agentCommand.mockResolvedValueOnce({ + payloads: [], + meta: { durationMs: 100, aborted: true }, + }); + + await invokeAgent( + { + message: "background cli task", + sessionKey: "agent:main:main", + idempotencyKey: "task-registry-agent-run-aborted", + }, + { reqId: "task-registry-agent-run-aborted" }, + ); + + await waitForAssertion(() => { + expect(findTaskByRunId("task-registry-agent-run-aborted")).toMatchObject({ + runtime: "cli", + childSessionKey: "agent:main:main", + status: "timed_out", + terminalSummary: "aborted", + }); + }); + }); + }); + + it("classifies aborted async gateway agent rejections as timed out", async () => { + await withTempDir({ prefix: "openclaw-gateway-agent-task-abort-error-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + primeMainAgentRun(); + const abortError = new Error("This operation was aborted"); + abortError.name = "AbortError"; + mocks.agentCommand.mockRejectedValueOnce(abortError); + + await invokeAgent( + { + message: "background cli task", + sessionKey: "agent:main:main", + idempotencyKey: "task-registry-agent-run-abort-error", + }, + { reqId: "task-registry-agent-run-abort-error" }, + ); + + await waitForAssertion(() => { + expect(findTaskByRunId("task-registry-agent-run-abort-error")).toMatchObject({ + runtime: "cli", + childSessionKey: "agent:main:main", + status: "timed_out", + error: "AbortError: This operation was aborted", + }); + }); + }); + }); + + it("does not overwrite operator-cancelled async gateway agent tasks after late completion", async () => { + await withTempDir({ prefix: "openclaw-gateway-agent-task-cancelled-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + primeMainAgentRun(); + let resolveRun: (value: { + payloads: Array<{ text: string }>; + meta: { durationMs: number }; + }) => void; + const pending = new Promise<{ + payloads: Array<{ text: string }>; + meta: { durationMs: number }; + }>((resolve) => { + resolveRun = resolve; + }); + mocks.agentCommand.mockReturnValueOnce(pending); + + await invokeAgent( + { + message: "background cli task", + sessionKey: "agent:main:main", + idempotencyKey: "task-registry-agent-run-cancelled", + }, + { reqId: "task-registry-agent-run-cancelled" }, + ); + + const task = findTaskByRunId("task-registry-agent-run-cancelled"); + expect(task).toMatchObject({ status: "running" }); + const cancelledAt = (task?.startedAt ?? Date.now()) + 1; + markTaskTerminalById({ + taskId: task!.taskId, + status: "cancelled", + endedAt: cancelledAt, + lastEventAt: cancelledAt, + terminalSummary: "Cancelled by operator.", + }); + + resolveRun!({ payloads: [{ text: "ok" }], meta: { durationMs: 100 } }); + + await waitForAssertion(() => { + expect(findTaskByRunId("task-registry-agent-run-cancelled")).toMatchObject({ + status: "cancelled", + endedAt: cancelledAt, + terminalSummary: "Cancelled by operator.", + }); }); }); }); @@ -1250,10 +1392,15 @@ describe("gateway agent handler", () => { (...args: Parameters) => defaultRuntime.createRunningTaskRun(...args), ); + const finalizeTaskRunByRunIdSpy = vi.fn( + (...args: Parameters>) => + defaultRuntime.finalizeTaskRunByRunId!(...args), + ); setDetachedTaskLifecycleRuntime({ ...defaultRuntime, createRunningTaskRun: createRunningTaskRunSpy, + finalizeTaskRunByRunId: finalizeTaskRunByRunIdSpy, }); await invokeAgent( @@ -1274,10 +1421,19 @@ describe("gateway agent handler", () => { task: expect.stringContaining("background cli seam task"), }), ); + expect(finalizeTaskRunByRunIdSpy).toHaveBeenCalledWith( + expect.objectContaining({ + runtime: "cli", + runId: "task-registry-agent-seam", + status: "succeeded", + terminalSummary: "completed", + }), + ); expect(findTaskByRunId("task-registry-agent-seam")).toMatchObject({ runtime: "cli", childSessionKey: "agent:main:main", - status: "running", + status: "succeeded", + terminalSummary: "completed", }); }); }); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index a56819cc5a6..4a338db5221 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import { listAgentIds, resolveAgentWorkspaceDir } from "../../agents/agent-scope.js"; +import { isTimeoutError } from "../../agents/failover-error.js"; import { resolveAgentAvatar, resolvePublicAgentAvatarSource, @@ -41,6 +42,7 @@ import { } from "../../infra/outbound/agent-delivery.js"; import { shouldDowngradeDeliveryToSessionOnly } from "../../infra/outbound/best-effort-delivery.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; +import { isAbortError } from "../../infra/unhandled-rejections.js"; import type { PromptImageOrderEntry } from "../../media/prompt-image-order.js"; import { classifySessionKeyShape, @@ -55,7 +57,8 @@ import { normalizeOptionalLowercaseString, normalizeOptionalString, } from "../../shared/string-coerce.js"; -import { createRunningTaskRun } from "../../tasks/detached-task-runtime.js"; +import { createRunningTaskRun, finalizeTaskRunByRunId } from "../../tasks/detached-task-runtime.js"; +import type { TaskStatus } from "../../tasks/task-registry.types.js"; import { mergeDeliveryContext, normalizeDeliveryContext, @@ -237,6 +240,35 @@ function emitSessionsChanged( ); } +type GatewayAgentTaskTerminalStatus = Extract< + TaskStatus, + "succeeded" | "failed" | "timed_out" | "cancelled" +>; + +function resolveFailedTrackedAgentTaskStatus(error: unknown): GatewayAgentTaskTerminalStatus { + return isAbortError(error) || isTimeoutError(error) ? "timed_out" : "failed"; +} + +function tryFinalizeTrackedAgentTask(params: { + runId: string; + status: GatewayAgentTaskTerminalStatus; + error?: string; + terminalSummary?: string; +}): void { + try { + finalizeTaskRunByRunId({ + runId: params.runId, + runtime: "cli", + status: params.status, + endedAt: Date.now(), + ...(params.error !== undefined ? { error: params.error } : {}), + ...(params.terminalSummary !== undefined ? { terminalSummary: params.terminalSummary } : {}), + }); + } catch { + // Best-effort only: background task tracking must not block agent runs. + } +} + function dispatchAgentRunFromGateway(params: { ingressOpts: Parameters[0]; runId: string; @@ -278,6 +310,14 @@ function dispatchAgentRunFromGateway(params: { } void agentCommandFromIngress(params.ingressOpts, defaultRuntime, params.context.deps) .then((result) => { + if (shouldTrackTask) { + const aborted = result.meta.aborted === true; + tryFinalizeTrackedAgentTask({ + runId: params.runId, + status: aborted ? "timed_out" : "succeeded", + terminalSummary: aborted ? "aborted" : "completed", + }); + } const payload = { runId: params.runId, status: "ok" as const, @@ -298,6 +338,15 @@ function dispatchAgentRunFromGateway(params: { params.respond(true, payload, undefined, { runId: params.runId }); }) .catch((err) => { + if (shouldTrackTask) { + const error = String(err); + tryFinalizeTrackedAgentTask({ + runId: params.runId, + status: resolveFailedTrackedAgentTaskStatus(err), + error, + terminalSummary: error, + }); + } const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); const payload = { runId: params.runId, diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index 855556658d9..17a1ebff203 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -99,6 +99,7 @@ function createDetachedTaskRuntimeStub(id: string): DetachedTaskLifecycleRuntime createRunningTaskRun: () => fail("createRunningTaskRun"), startTaskRunByRunId: () => fail("startTaskRunByRunId"), recordTaskRunProgressByRunId: () => fail("recordTaskRunProgressByRunId"), + finalizeTaskRunByRunId: () => fail("finalizeTaskRunByRunId"), completeTaskRunByRunId: () => fail("completeTaskRunByRunId"), failTaskRunByRunId: () => fail("failTaskRunByRunId"), setDetachedTaskDeliveryStatusByRunId: () => fail("setDetachedTaskDeliveryStatusByRunId"), @@ -3301,6 +3302,7 @@ module.exports = { id: "throws-after-import", register() {} };`, createRunningTaskRun() { throw new Error("snapshot createRunningTaskRun should not run"); }, startTaskRunByRunId() { throw new Error("snapshot startTaskRunByRunId should not run"); }, recordTaskRunProgressByRunId() { throw new Error("snapshot recordTaskRunProgressByRunId should not run"); }, + finalizeTaskRunByRunId() { throw new Error("snapshot finalizeTaskRunByRunId should not run"); }, completeTaskRunByRunId() { throw new Error("snapshot completeTaskRunByRunId should not run"); }, failTaskRunByRunId() { throw new Error("snapshot failTaskRunByRunId should not run"); }, setDetachedTaskDeliveryStatusByRunId() { throw new Error("snapshot setDetachedTaskDeliveryStatusByRunId should not run"); }, @@ -3345,6 +3347,7 @@ module.exports = { id: "throws-after-import", register() {} };`, createRunningTaskRun() { throw new Error("failing createRunningTaskRun should not run"); }, startTaskRunByRunId() { throw new Error("failing startTaskRunByRunId should not run"); }, recordTaskRunProgressByRunId() { throw new Error("failing recordTaskRunProgressByRunId should not run"); }, + finalizeTaskRunByRunId() { throw new Error("failing finalizeTaskRunByRunId should not run"); }, completeTaskRunByRunId() { throw new Error("failing completeTaskRunByRunId should not run"); }, failTaskRunByRunId() { throw new Error("failing failTaskRunByRunId should not run"); }, setDetachedTaskDeliveryStatusByRunId() { throw new Error("failing setDetachedTaskDeliveryStatusByRunId should not run"); }, @@ -3386,6 +3389,7 @@ module.exports = { id: "throws-after-import", register() {} };`, createRunningTaskRun() { throw new Error("cached createRunningTaskRun should not run"); }, startTaskRunByRunId() { throw new Error("cached startTaskRunByRunId should not run"); }, recordTaskRunProgressByRunId() { throw new Error("cached recordTaskRunProgressByRunId should not run"); }, + finalizeTaskRunByRunId() { throw new Error("cached finalizeTaskRunByRunId should not run"); }, completeTaskRunByRunId() { throw new Error("cached completeTaskRunByRunId should not run"); }, failTaskRunByRunId() { throw new Error("cached failTaskRunByRunId should not run"); }, setDetachedTaskDeliveryStatusByRunId() { throw new Error("cached setDetachedTaskDeliveryStatusByRunId should not run"); }, diff --git a/src/tasks/detached-task-runtime-contract.ts b/src/tasks/detached-task-runtime-contract.ts index 018f8115a24..82455277e3e 100644 --- a/src/tasks/detached-task-runtime-contract.ts +++ b/src/tasks/detached-task-runtime-contract.ts @@ -78,6 +78,19 @@ export type DetachedTaskFailParams = { terminalSummary?: string | null; }; +export type DetachedTaskFinalizeParams = { + runId: string; + runtime?: TaskRuntime; + sessionKey?: string; + status: Extract; + endedAt: number; + lastEventAt?: number; + error?: string; + progressSummary?: string | null; + terminalSummary?: string | null; + terminalOutcome?: TaskTerminalOutcome | null; +}; + export type DetachedTaskDeliveryStatusParams = { runId: string; runtime?: TaskRuntime; @@ -113,6 +126,7 @@ export type DetachedTaskLifecycleRuntime = { createRunningTaskRun: (params: DetachedRunningTaskCreateParams) => TaskRecord; startTaskRunByRunId: (params: DetachedTaskStartParams) => TaskRecord[]; recordTaskRunProgressByRunId: (params: DetachedTaskProgressParams) => TaskRecord[]; + finalizeTaskRunByRunId?: (params: DetachedTaskFinalizeParams) => TaskRecord[]; completeTaskRunByRunId: (params: DetachedTaskCompleteParams) => TaskRecord[]; failTaskRunByRunId: (params: DetachedTaskFailParams) => TaskRecord[]; setDetachedTaskDeliveryStatusByRunId: (params: DetachedTaskDeliveryStatusParams) => TaskRecord[]; diff --git a/src/tasks/detached-task-runtime.test.ts b/src/tasks/detached-task-runtime.test.ts index aadf64048a4..5285253e6ed 100644 --- a/src/tasks/detached-task-runtime.test.ts +++ b/src/tasks/detached-task-runtime.test.ts @@ -5,6 +5,7 @@ import { createQueuedTaskRun, createRunningTaskRun, failTaskRunByRunId, + finalizeTaskRunByRunId, getDetachedTaskLifecycleRuntime, getDetachedTaskLifecycleRuntimeRegistration, registerDetachedTaskRuntime, @@ -76,6 +77,7 @@ describe("detached-task-runtime", () => { createRunningTaskRun: vi.fn(() => runningTask), startTaskRunByRunId: vi.fn(() => updatedTasks), recordTaskRunProgressByRunId: vi.fn(() => updatedTasks), + finalizeTaskRunByRunId: vi.fn(() => updatedTasks), completeTaskRunByRunId: vi.fn(() => updatedTasks), failTaskRunByRunId: vi.fn(() => updatedTasks), setDetachedTaskDeliveryStatusByRunId: vi.fn(() => updatedTasks), @@ -111,6 +113,7 @@ describe("detached-task-runtime", () => { startTaskRunByRunId({ runId: "run-running", startedAt: 10 }); recordTaskRunProgressByRunId({ runId: "run-running", lastEventAt: 20 }); + finalizeTaskRunByRunId({ runId: "run-running", status: "succeeded", endedAt: 25 }); completeTaskRunByRunId({ runId: "run-running", endedAt: 30 }); failTaskRunByRunId({ runId: "run-running", endedAt: 40 }); setDetachedTaskDeliveryStatusByRunId({ @@ -134,6 +137,9 @@ describe("detached-task-runtime", () => { expect(fakeRuntime.recordTaskRunProgressByRunId).toHaveBeenCalledWith( expect.objectContaining({ runId: "run-running", lastEventAt: 20 }), ); + expect(fakeRuntime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ runId: "run-running", status: "succeeded", endedAt: 25 }), + ); expect(fakeRuntime.completeTaskRunByRunId).toHaveBeenCalledWith( expect.objectContaining({ runId: "run-running", endedAt: 30 }), ); @@ -166,6 +172,30 @@ describe("detached-task-runtime", () => { expect(getDetachedTaskLifecycleRuntime()).toBe(runtime); }); + it("falls back to legacy complete and fail hooks when a runtime has no finalizer", () => { + const defaultRuntime = getDetachedTaskLifecycleRuntime(); + const completeTaskRunByRunIdSpy = vi.fn(() => []); + const failTaskRunByRunIdSpy = vi.fn(() => []); + const legacyRuntime = { + ...defaultRuntime, + completeTaskRunByRunId: completeTaskRunByRunIdSpy, + failTaskRunByRunId: failTaskRunByRunIdSpy, + }; + delete legacyRuntime.finalizeTaskRunByRunId; + + setDetachedTaskLifecycleRuntime(legacyRuntime); + + finalizeTaskRunByRunId({ runId: "legacy-ok", status: "succeeded", endedAt: 10 }); + finalizeTaskRunByRunId({ runId: "legacy-timeout", status: "timed_out", endedAt: 20 }); + + expect(completeTaskRunByRunIdSpy).toHaveBeenCalledWith( + expect.objectContaining({ runId: "legacy-ok", status: "succeeded", endedAt: 10 }), + ); + expect(failTaskRunByRunIdSpy).toHaveBeenCalledWith( + expect.objectContaining({ runId: "legacy-timeout", status: "timed_out", endedAt: 20 }), + ); + }); + describe("tryRecoverTaskBeforeMarkLost", () => { it("returns recovered when hook returns recovered true", async () => { const task = createFakeTaskRecord({ taskId: "task-recover", runtime: "subagent" }); diff --git a/src/tasks/detached-task-runtime.ts b/src/tasks/detached-task-runtime.ts index a62042177c8..254130b0d74 100644 --- a/src/tasks/detached-task-runtime.ts +++ b/src/tasks/detached-task-runtime.ts @@ -2,6 +2,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import type { DetachedTaskRecoveryAttemptParams, DetachedTaskRecoveryAttemptResult, + DetachedTaskFinalizeParams, DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration, } from "./detached-task-runtime-contract.js"; @@ -17,10 +18,12 @@ import { createQueuedTaskRun as createQueuedTaskRunFromExecutor, createRunningTaskRun as createRunningTaskRunFromExecutor, failTaskRunByRunId as failTaskRunByRunIdFromExecutor, + finalizeTaskRunByRunId as finalizeTaskRunByRunIdFromExecutor, recordTaskRunProgressByRunId as recordTaskRunProgressByRunIdFromExecutor, setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdFromExecutor, startTaskRunByRunId as startTaskRunByRunIdFromExecutor, } from "./task-executor.js"; +import type { TaskRecord } from "./task-registry.types.js"; const log = createSubsystemLogger("tasks/detached-runtime"); const DETACHED_TASK_RECOVERY_WARN_MS = 5_000; @@ -32,6 +35,7 @@ const DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME: DetachedTaskLifecycleRuntime = { createRunningTaskRun: createRunningTaskRunFromExecutor, startTaskRunByRunId: startTaskRunByRunIdFromExecutor, recordTaskRunProgressByRunId: recordTaskRunProgressByRunIdFromExecutor, + finalizeTaskRunByRunId: finalizeTaskRunByRunIdFromExecutor, completeTaskRunByRunId: completeTaskRunByRunIdFromExecutor, failTaskRunByRunId: failTaskRunByRunIdFromExecutor, setDetachedTaskDeliveryStatusByRunId: setDetachedTaskDeliveryStatusByRunIdFromExecutor, @@ -87,6 +91,20 @@ export function recordTaskRunProgressByRunId( return getDetachedTaskLifecycleRuntime().recordTaskRunProgressByRunId(...args); } +export function finalizeTaskRunByRunId(params: DetachedTaskFinalizeParams): TaskRecord[] { + const runtime = getDetachedTaskLifecycleRuntime(); + if (runtime.finalizeTaskRunByRunId) { + return runtime.finalizeTaskRunByRunId(params); + } + if (params.status === "succeeded") { + return runtime.completeTaskRunByRunId(params); + } + return runtime.failTaskRunByRunId({ + ...params, + status: params.status, + }); +} + export function completeTaskRunByRunId( ...args: Parameters ): ReturnType { diff --git a/src/tasks/runtime-internal.ts b/src/tasks/runtime-internal.ts index c2b245002f8..02027db6c8f 100644 --- a/src/tasks/runtime-internal.ts +++ b/src/tasks/runtime-internal.ts @@ -8,6 +8,7 @@ export { findLatestTaskForFlowId, findLatestTaskForRelatedSessionKey, findTaskByRunId, + finalizeTaskRunByRunId, getTaskById, getTaskRegistrySnapshot, getTaskRegistrySummary, diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index f8c60450682..ce09f698eda 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -3,6 +3,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import type { DetachedRunningTaskCreateParams, DetachedTaskCreateParams, + DetachedTaskFinalizeParams, } from "./detached-task-runtime-contract.js"; import { getRegisteredDetachedTaskLifecycleRuntime } from "./detached-task-runtime-state.js"; import { @@ -15,7 +16,7 @@ import { listTasksForFlowId, markTaskLostById, markTaskRunningByRunId, - markTaskTerminalByRunId, + finalizeTaskRunByRunId as finalizeTaskRunByRunIdInRegistry, recordTaskProgressByRunId, setTaskRunDeliveryStatusByRunId, } from "./runtime-internal.js"; @@ -168,19 +169,16 @@ export function completeTaskRunByRunId(params: { terminalSummary?: string | null; terminalOutcome?: TaskTerminalOutcome | null; }) { - return markTaskTerminalByRunId({ - runId: params.runId, - runtime: params.runtime, - sessionKey: params.sessionKey, + return finalizeTaskRunByRunId({ + ...params, status: "succeeded", - endedAt: params.endedAt, - lastEventAt: params.lastEventAt, - progressSummary: params.progressSummary, - terminalSummary: params.terminalSummary, - terminalOutcome: params.terminalOutcome, }); } +export function finalizeTaskRunByRunId(params: DetachedTaskFinalizeParams) { + return finalizeTaskRunByRunIdInRegistry(params); +} + export function failTaskRunByRunId(params: { runId: string; runtime?: TaskRuntime; @@ -192,16 +190,9 @@ export function failTaskRunByRunId(params: { progressSummary?: string | null; terminalSummary?: string | null; }) { - return markTaskTerminalByRunId({ - runId: params.runId, - runtime: params.runtime, - sessionKey: params.sessionKey, + return finalizeTaskRunByRunId({ + ...params, status: params.status ?? "failed", - endedAt: params.endedAt, - lastEventAt: params.lastEventAt, - error: params.error, - progressSummary: params.progressSummary, - terminalSummary: params.terminalSummary, }); } diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 51af5da44a9..b4dfb9c3edc 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -32,6 +32,7 @@ import { maybeDeliverTaskTerminalUpdate, markTaskRunningByRunId, markTaskTerminalById, + markTaskTerminalByRunId, recordTaskProgressByRunId, resetTaskRegistryControlRuntimeForTests, resetTaskRegistryDeliveryRuntimeForTests, @@ -332,6 +333,128 @@ describe("task-registry", () => { }); }); + it("keeps stronger run-scoped terminal states when a late success arrives", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + createTaskRecord({ + runtime: "cli", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:main:main", + runId: "run-timeout-then-success", + task: "Do the thing", + status: "running", + deliveryStatus: "not_applicable", + startedAt: 100, + }); + + emitAgentEvent({ + runId: "run-timeout-then-success", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 200, + aborted: true, + }, + }); + markTaskTerminalByRunId({ + runId: "run-timeout-then-success", + runtime: "cli", + status: "succeeded", + endedAt: 300, + terminalSummary: "completed", + }); + + expect(findTaskByRunId("run-timeout-then-success")).toMatchObject({ + status: "timed_out", + endedAt: 200, + }); + }); + }); + + it("does not downgrade failed run-scoped tasks when a late success arrives", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + createTaskRecord({ + runtime: "cli", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:main:main", + runId: "run-fail-then-success", + task: "Deliver result", + status: "running", + deliveryStatus: "not_applicable", + startedAt: 100, + }); + + markTaskTerminalByRunId({ + runId: "run-fail-then-success", + runtime: "cli", + status: "failed", + endedAt: 200, + error: "delivery failed", + }); + markTaskTerminalByRunId({ + runId: "run-fail-then-success", + runtime: "cli", + status: "succeeded", + endedAt: 300, + terminalSummary: "completed", + }); + + expect(findTaskByRunId("run-fail-then-success")).toMatchObject({ + status: "failed", + endedAt: 200, + error: "delivery failed", + }); + }); + }); + + it("lets delivery failure upgrade a lifecycle success", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + createTaskRecord({ + runtime: "cli", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:main:main", + runId: "run-success-then-fail", + task: "Deliver result", + status: "running", + deliveryStatus: "not_applicable", + startedAt: 100, + }); + + emitAgentEvent({ + runId: "run-success-then-fail", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 200, + }, + }); + markTaskTerminalByRunId({ + runId: "run-success-then-fail", + runtime: "cli", + status: "failed", + endedAt: 300, + error: "delivery failed", + }); + + expect(findTaskByRunId("run-success-then-fail")).toMatchObject({ + status: "failed", + endedAt: 300, + error: "delivery failed", + }); + }); + }); + it("summarizes task pressure by status and runtime", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index fcab081ed73..f2a08464bcb 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -399,6 +399,22 @@ function normalizeTaskTerminalOutcome( return value === "succeeded" || value === "blocked" ? value : undefined; } +function shouldApplyRunScopedStatusUpdate(params: { + currentStatus: TaskStatus; + nextStatus: TaskStatus; +}): boolean { + if (params.currentStatus === params.nextStatus) { + return true; + } + if (!isTerminalTaskStatus(params.currentStatus)) { + return true; + } + if (!isTerminalTaskStatus(params.nextStatus)) { + return false; + } + return params.currentStatus === "succeeded" && params.nextStatus !== "lost"; +} + function resolveTaskTerminalOutcome(params: { status: TaskStatus; terminalOutcome?: TaskTerminalOutcome | null; @@ -1584,6 +1600,15 @@ function updateTaskStateByRunId(params: { for (const current of matches) { const patch: Partial = {}; const nextStatus = params.status ? normalizeTaskStatus(params.status) : current.status; + if ( + params.status && + !shouldApplyRunScopedStatusUpdate({ + currentStatus: current.status, + nextStatus, + }) + ) { + continue; + } const eventAt = params.lastEventAt ?? params.endedAt ?? Date.now(); if (params.status) { patch.status = normalizeTaskStatus(params.status); @@ -1710,6 +1735,22 @@ export function markTaskTerminalByRunId(params: { progressSummary?: string | null; terminalSummary?: string | null; terminalOutcome?: TaskTerminalOutcome | null; +}) { + return finalizeTaskRunByRunId(params); +} + +export function finalizeTaskRunByRunId(params: { + runId: string; + runtime?: TaskRuntime; + sessionKey?: string; + status: Extract; + startedAt?: number; + endedAt: number; + lastEventAt?: number; + error?: string; + progressSummary?: string | null; + terminalSummary?: string | null; + terminalOutcome?: TaskTerminalOutcome | null; }) { return updateTaskStateByRunId({ runId: params.runId,