mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:30:42 +00:00
fix(tasks): terminalize gateway agent run ledger
Terminalize Gateway-backed async task records from the run result while preserving aborted, failed, cancelled, and lost outcomes. Thanks @likewen-tech.
This commit is contained in:
@@ -84,6 +84,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Feishu: transcribe inbound voice-note audio with the shared media audio path
|
||||
before agent dispatch and keep raw Feishu `file_key` payloads out of message
|
||||
text. Fixes #67120 and #61876.
|
||||
- Tasks: terminalize async Gateway agent task records from the Gateway run result while preserving aborted, failed, and cancelled outcomes instead of leaving completed runs stuck as active or lost. (#71905) Thanks @likewen-tech.
|
||||
- ACP: send subagent and async-task completion wakes to external ACP harnesses as
|
||||
plain prompts instead of OpenClaw internal runtime-context envelopes, while
|
||||
keeping those envelopes out of ACP transcripts.
|
||||
|
||||
@@ -115,12 +115,23 @@ stateDiagram-v2
|
||||
|
||||
Transitions happen automatically — when the associated agent run ends, the task status updates to match.
|
||||
|
||||
Agent run completion is authoritative for active task records. A successful
|
||||
detached run finalizes as `succeeded`, ordinary run errors finalize as
|
||||
`failed`, and timeout or abort outcomes finalize as `timed_out`. If an operator
|
||||
already cancelled the task, or the runtime already recorded a stronger terminal
|
||||
state such as `failed`, `timed_out`, or `lost`, a later success signal does not
|
||||
downgrade that terminal status.
|
||||
|
||||
`lost` is runtime-aware:
|
||||
|
||||
- ACP tasks: backing ACP child session metadata disappeared.
|
||||
- Subagent tasks: backing child session disappeared from the target agent store.
|
||||
- Cron tasks: the cron runtime no longer tracks the job as active.
|
||||
- CLI tasks: isolated child-session tasks use the child session; chat-backed CLI tasks use the live run context instead, so lingering channel/group/direct session rows do not keep them alive.
|
||||
- CLI tasks: isolated child-session tasks use the child session; chat-backed
|
||||
CLI tasks use the live run context instead, so lingering
|
||||
channel/group/direct session rows do not keep them alive. Gateway-backed
|
||||
`openclaw agent` runs also finalize from their run result, so completed runs
|
||||
do not sit active until the sweeper marks them `lost`.
|
||||
|
||||
## Delivery and notifications
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import { onTimer } from "../../cron/service/timer.js";
|
||||
import { loadCronStore } from "../../cron/store.js";
|
||||
import type { CronJob } from "../../cron/types.js";
|
||||
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
|
||||
import { resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
|
||||
const { logger, makeStorePath } = setupCronServiceSuite({
|
||||
prefix: "cron-service-timer-seam",
|
||||
@@ -74,6 +74,12 @@ describe("cron service timer seam coverage", () => {
|
||||
expect(job?.state.lastStatus).toBe("ok");
|
||||
expect(job?.state.runningAtMs).toBeUndefined();
|
||||
expect(job?.state.nextRunAtMs).toBe(now + 60_000);
|
||||
expect(findTaskByRunId(`cron:main-heartbeat-job:${now}`)).toMatchObject({
|
||||
runtime: "cron",
|
||||
status: "succeeded",
|
||||
endedAt: now,
|
||||
cleanupAfter: expect.any(Number),
|
||||
});
|
||||
|
||||
const delays = timeoutSpy.mock.calls
|
||||
.map(([, delay]) => delay)
|
||||
|
||||
@@ -6,7 +6,11 @@ import {
|
||||
resetDetachedTaskLifecycleRuntimeForTests,
|
||||
setDetachedTaskLifecycleRuntime,
|
||||
} from "../../tasks/detached-task-runtime.js";
|
||||
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
import {
|
||||
findTaskByRunId,
|
||||
markTaskTerminalById,
|
||||
resetTaskRegistryForTests,
|
||||
} from "../../tasks/task-registry.js";
|
||||
import { withTempDir } from "../../test-helpers/temp-dir.js";
|
||||
import { agentHandlers } from "./agent.js";
|
||||
import { chatHandlers } from "./chat.js";
|
||||
@@ -994,7 +998,7 @@ describe("gateway agent handler", () => {
|
||||
expect(callArgs.runContext?.messageChannel).toBe("webchat");
|
||||
});
|
||||
|
||||
it("tracks async gateway agent runs in the shared task registry", async () => {
|
||||
it("terminalizes successful async gateway agent runs in the shared task registry", async () => {
|
||||
await withTempDir({ prefix: "openclaw-gateway-agent-task-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
@@ -1009,10 +1013,148 @@ describe("gateway agent handler", () => {
|
||||
{ reqId: "task-registry-agent-run" },
|
||||
);
|
||||
|
||||
expect(findTaskByRunId("task-registry-agent-run")).toMatchObject({
|
||||
runtime: "cli",
|
||||
childSessionKey: "agent:main:main",
|
||||
status: "running",
|
||||
await waitForAssertion(() => {
|
||||
expect(findTaskByRunId("task-registry-agent-run")).toMatchObject({
|
||||
runtime: "cli",
|
||||
childSessionKey: "agent:main:main",
|
||||
status: "succeeded",
|
||||
terminalSummary: "completed",
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("terminalizes failed async gateway agent runs in the shared task registry", async () => {
|
||||
await withTempDir({ prefix: "openclaw-gateway-agent-task-error-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
primeMainAgentRun();
|
||||
mocks.agentCommand.mockRejectedValueOnce(new Error("agent unavailable"));
|
||||
|
||||
await invokeAgent(
|
||||
{
|
||||
message: "background cli task",
|
||||
sessionKey: "agent:main:main",
|
||||
idempotencyKey: "task-registry-agent-run-error",
|
||||
},
|
||||
{ reqId: "task-registry-agent-run-error" },
|
||||
);
|
||||
|
||||
await waitForAssertion(() => {
|
||||
expect(findTaskByRunId("task-registry-agent-run-error")).toMatchObject({
|
||||
runtime: "cli",
|
||||
childSessionKey: "agent:main:main",
|
||||
status: "failed",
|
||||
error: "Error: agent unavailable",
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves aborted async gateway agent runs as timed out", async () => {
|
||||
await withTempDir({ prefix: "openclaw-gateway-agent-task-aborted-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
primeMainAgentRun();
|
||||
mocks.agentCommand.mockResolvedValueOnce({
|
||||
payloads: [],
|
||||
meta: { durationMs: 100, aborted: true },
|
||||
});
|
||||
|
||||
await invokeAgent(
|
||||
{
|
||||
message: "background cli task",
|
||||
sessionKey: "agent:main:main",
|
||||
idempotencyKey: "task-registry-agent-run-aborted",
|
||||
},
|
||||
{ reqId: "task-registry-agent-run-aborted" },
|
||||
);
|
||||
|
||||
await waitForAssertion(() => {
|
||||
expect(findTaskByRunId("task-registry-agent-run-aborted")).toMatchObject({
|
||||
runtime: "cli",
|
||||
childSessionKey: "agent:main:main",
|
||||
status: "timed_out",
|
||||
terminalSummary: "aborted",
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("classifies aborted async gateway agent rejections as timed out", async () => {
|
||||
await withTempDir({ prefix: "openclaw-gateway-agent-task-abort-error-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
primeMainAgentRun();
|
||||
const abortError = new Error("This operation was aborted");
|
||||
abortError.name = "AbortError";
|
||||
mocks.agentCommand.mockRejectedValueOnce(abortError);
|
||||
|
||||
await invokeAgent(
|
||||
{
|
||||
message: "background cli task",
|
||||
sessionKey: "agent:main:main",
|
||||
idempotencyKey: "task-registry-agent-run-abort-error",
|
||||
},
|
||||
{ reqId: "task-registry-agent-run-abort-error" },
|
||||
);
|
||||
|
||||
await waitForAssertion(() => {
|
||||
expect(findTaskByRunId("task-registry-agent-run-abort-error")).toMatchObject({
|
||||
runtime: "cli",
|
||||
childSessionKey: "agent:main:main",
|
||||
status: "timed_out",
|
||||
error: "AbortError: This operation was aborted",
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("does not overwrite operator-cancelled async gateway agent tasks after late completion", async () => {
|
||||
await withTempDir({ prefix: "openclaw-gateway-agent-task-cancelled-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
primeMainAgentRun();
|
||||
let resolveRun: (value: {
|
||||
payloads: Array<{ text: string }>;
|
||||
meta: { durationMs: number };
|
||||
}) => void;
|
||||
const pending = new Promise<{
|
||||
payloads: Array<{ text: string }>;
|
||||
meta: { durationMs: number };
|
||||
}>((resolve) => {
|
||||
resolveRun = resolve;
|
||||
});
|
||||
mocks.agentCommand.mockReturnValueOnce(pending);
|
||||
|
||||
await invokeAgent(
|
||||
{
|
||||
message: "background cli task",
|
||||
sessionKey: "agent:main:main",
|
||||
idempotencyKey: "task-registry-agent-run-cancelled",
|
||||
},
|
||||
{ reqId: "task-registry-agent-run-cancelled" },
|
||||
);
|
||||
|
||||
const task = findTaskByRunId("task-registry-agent-run-cancelled");
|
||||
expect(task).toMatchObject({ status: "running" });
|
||||
const cancelledAt = (task?.startedAt ?? Date.now()) + 1;
|
||||
markTaskTerminalById({
|
||||
taskId: task!.taskId,
|
||||
status: "cancelled",
|
||||
endedAt: cancelledAt,
|
||||
lastEventAt: cancelledAt,
|
||||
terminalSummary: "Cancelled by operator.",
|
||||
});
|
||||
|
||||
resolveRun!({ payloads: [{ text: "ok" }], meta: { durationMs: 100 } });
|
||||
|
||||
await waitForAssertion(() => {
|
||||
expect(findTaskByRunId("task-registry-agent-run-cancelled")).toMatchObject({
|
||||
status: "cancelled",
|
||||
endedAt: cancelledAt,
|
||||
terminalSummary: "Cancelled by operator.",
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1250,10 +1392,15 @@ describe("gateway agent handler", () => {
|
||||
(...args: Parameters<typeof defaultRuntime.createRunningTaskRun>) =>
|
||||
defaultRuntime.createRunningTaskRun(...args),
|
||||
);
|
||||
const finalizeTaskRunByRunIdSpy = vi.fn(
|
||||
(...args: Parameters<NonNullable<typeof defaultRuntime.finalizeTaskRunByRunId>>) =>
|
||||
defaultRuntime.finalizeTaskRunByRunId!(...args),
|
||||
);
|
||||
|
||||
setDetachedTaskLifecycleRuntime({
|
||||
...defaultRuntime,
|
||||
createRunningTaskRun: createRunningTaskRunSpy,
|
||||
finalizeTaskRunByRunId: finalizeTaskRunByRunIdSpy,
|
||||
});
|
||||
|
||||
await invokeAgent(
|
||||
@@ -1274,10 +1421,19 @@ describe("gateway agent handler", () => {
|
||||
task: expect.stringContaining("background cli seam task"),
|
||||
}),
|
||||
);
|
||||
expect(finalizeTaskRunByRunIdSpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
runtime: "cli",
|
||||
runId: "task-registry-agent-seam",
|
||||
status: "succeeded",
|
||||
terminalSummary: "completed",
|
||||
}),
|
||||
);
|
||||
expect(findTaskByRunId("task-registry-agent-seam")).toMatchObject({
|
||||
runtime: "cli",
|
||||
childSessionKey: "agent:main:main",
|
||||
status: "running",
|
||||
status: "succeeded",
|
||||
terminalSummary: "completed",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { listAgentIds, resolveAgentWorkspaceDir } from "../../agents/agent-scope.js";
|
||||
import { isTimeoutError } from "../../agents/failover-error.js";
|
||||
import {
|
||||
resolveAgentAvatar,
|
||||
resolvePublicAgentAvatarSource,
|
||||
@@ -41,6 +42,7 @@ import {
|
||||
} from "../../infra/outbound/agent-delivery.js";
|
||||
import { shouldDowngradeDeliveryToSessionOnly } from "../../infra/outbound/best-effort-delivery.js";
|
||||
import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js";
|
||||
import { isAbortError } from "../../infra/unhandled-rejections.js";
|
||||
import type { PromptImageOrderEntry } from "../../media/prompt-image-order.js";
|
||||
import {
|
||||
classifySessionKeyShape,
|
||||
@@ -55,7 +57,8 @@ import {
|
||||
normalizeOptionalLowercaseString,
|
||||
normalizeOptionalString,
|
||||
} from "../../shared/string-coerce.js";
|
||||
import { createRunningTaskRun } from "../../tasks/detached-task-runtime.js";
|
||||
import { createRunningTaskRun, finalizeTaskRunByRunId } from "../../tasks/detached-task-runtime.js";
|
||||
import type { TaskStatus } from "../../tasks/task-registry.types.js";
|
||||
import {
|
||||
mergeDeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
@@ -237,6 +240,35 @@ function emitSessionsChanged(
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Terminal task statuses that a Gateway-dispatched agent run can be finalized
 * with in this module: the subset of `TaskStatus` limited to `succeeded`,
 * `failed`, `timed_out`, and `cancelled`.
 */
type GatewayAgentTaskTerminalStatus = Extract<
  TaskStatus,
  "succeeded" | "failed" | "timed_out" | "cancelled"
>;
|
||||
|
||||
/**
 * Chooses the terminal status recorded for a tracked task whose agent run
 * rejected.
 *
 * @param error - The rejection value from the agent run.
 * @returns `"timed_out"` when the error is recognized by `isAbortError` or
 *   `isTimeoutError`; `"failed"` for every other rejection.
 */
function resolveFailedTrackedAgentTaskStatus(error: unknown): GatewayAgentTaskTerminalStatus {
  // Abort is checked first; the timeout check only runs when it does not match.
  if (isAbortError(error) || isTimeoutError(error)) {
    return "timed_out";
  }
  return "failed";
}
|
||||
|
||||
/**
 * Best-effort finalization of the task record tracked for a Gateway agent run.
 *
 * Forwards the given terminal status to `finalizeTaskRunByRunId` with the
 * runtime fixed to `"cli"` and `endedAt` set to the current time. `error` and
 * `terminalSummary` are only included when the caller supplied them, so
 * `undefined` values are never written as explicit keys.
 *
 * Any exception thrown while finalizing is swallowed on purpose: task
 * bookkeeping must never interfere with the agent run itself.
 *
 * @param params.runId - Run identifier of the tracked task.
 * @param params.status - Terminal status to record.
 * @param params.error - Optional error text to store on the record.
 * @param params.terminalSummary - Optional terminal summary to store.
 */
function tryFinalizeTrackedAgentTask(params: {
  runId: string;
  status: GatewayAgentTaskTerminalStatus;
  error?: string;
  terminalSummary?: string;
}): void {
  try {
    const finalizeParams: Parameters<typeof finalizeTaskRunByRunId>[0] = {
      runId: params.runId,
      runtime: "cli",
      status: params.status,
      endedAt: Date.now(),
    };
    // Attach optional fields only when present so absent values stay absent.
    if (params.error !== undefined) {
      finalizeParams.error = params.error;
    }
    if (params.terminalSummary !== undefined) {
      finalizeParams.terminalSummary = params.terminalSummary;
    }
    finalizeTaskRunByRunId(finalizeParams);
  } catch {
    // Best-effort only: background task tracking must not block agent runs.
  }
}
|
||||
|
||||
function dispatchAgentRunFromGateway(params: {
|
||||
ingressOpts: Parameters<typeof agentCommandFromIngress>[0];
|
||||
runId: string;
|
||||
@@ -278,6 +310,14 @@ function dispatchAgentRunFromGateway(params: {
|
||||
}
|
||||
void agentCommandFromIngress(params.ingressOpts, defaultRuntime, params.context.deps)
|
||||
.then((result) => {
|
||||
if (shouldTrackTask) {
|
||||
const aborted = result.meta.aborted === true;
|
||||
tryFinalizeTrackedAgentTask({
|
||||
runId: params.runId,
|
||||
status: aborted ? "timed_out" : "succeeded",
|
||||
terminalSummary: aborted ? "aborted" : "completed",
|
||||
});
|
||||
}
|
||||
const payload = {
|
||||
runId: params.runId,
|
||||
status: "ok" as const,
|
||||
@@ -298,6 +338,15 @@ function dispatchAgentRunFromGateway(params: {
|
||||
params.respond(true, payload, undefined, { runId: params.runId });
|
||||
})
|
||||
.catch((err) => {
|
||||
if (shouldTrackTask) {
|
||||
const error = String(err);
|
||||
tryFinalizeTrackedAgentTask({
|
||||
runId: params.runId,
|
||||
status: resolveFailedTrackedAgentTaskStatus(err),
|
||||
error,
|
||||
terminalSummary: error,
|
||||
});
|
||||
}
|
||||
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
|
||||
const payload = {
|
||||
runId: params.runId,
|
||||
|
||||
@@ -99,6 +99,7 @@ function createDetachedTaskRuntimeStub(id: string): DetachedTaskLifecycleRuntime
|
||||
createRunningTaskRun: () => fail("createRunningTaskRun"),
|
||||
startTaskRunByRunId: () => fail("startTaskRunByRunId"),
|
||||
recordTaskRunProgressByRunId: () => fail("recordTaskRunProgressByRunId"),
|
||||
finalizeTaskRunByRunId: () => fail("finalizeTaskRunByRunId"),
|
||||
completeTaskRunByRunId: () => fail("completeTaskRunByRunId"),
|
||||
failTaskRunByRunId: () => fail("failTaskRunByRunId"),
|
||||
setDetachedTaskDeliveryStatusByRunId: () => fail("setDetachedTaskDeliveryStatusByRunId"),
|
||||
@@ -3301,6 +3302,7 @@ module.exports = { id: "throws-after-import", register() {} };`,
|
||||
createRunningTaskRun() { throw new Error("snapshot createRunningTaskRun should not run"); },
|
||||
startTaskRunByRunId() { throw new Error("snapshot startTaskRunByRunId should not run"); },
|
||||
recordTaskRunProgressByRunId() { throw new Error("snapshot recordTaskRunProgressByRunId should not run"); },
|
||||
finalizeTaskRunByRunId() { throw new Error("snapshot finalizeTaskRunByRunId should not run"); },
|
||||
completeTaskRunByRunId() { throw new Error("snapshot completeTaskRunByRunId should not run"); },
|
||||
failTaskRunByRunId() { throw new Error("snapshot failTaskRunByRunId should not run"); },
|
||||
setDetachedTaskDeliveryStatusByRunId() { throw new Error("snapshot setDetachedTaskDeliveryStatusByRunId should not run"); },
|
||||
@@ -3345,6 +3347,7 @@ module.exports = { id: "throws-after-import", register() {} };`,
|
||||
createRunningTaskRun() { throw new Error("failing createRunningTaskRun should not run"); },
|
||||
startTaskRunByRunId() { throw new Error("failing startTaskRunByRunId should not run"); },
|
||||
recordTaskRunProgressByRunId() { throw new Error("failing recordTaskRunProgressByRunId should not run"); },
|
||||
finalizeTaskRunByRunId() { throw new Error("failing finalizeTaskRunByRunId should not run"); },
|
||||
completeTaskRunByRunId() { throw new Error("failing completeTaskRunByRunId should not run"); },
|
||||
failTaskRunByRunId() { throw new Error("failing failTaskRunByRunId should not run"); },
|
||||
setDetachedTaskDeliveryStatusByRunId() { throw new Error("failing setDetachedTaskDeliveryStatusByRunId should not run"); },
|
||||
@@ -3386,6 +3389,7 @@ module.exports = { id: "throws-after-import", register() {} };`,
|
||||
createRunningTaskRun() { throw new Error("cached createRunningTaskRun should not run"); },
|
||||
startTaskRunByRunId() { throw new Error("cached startTaskRunByRunId should not run"); },
|
||||
recordTaskRunProgressByRunId() { throw new Error("cached recordTaskRunProgressByRunId should not run"); },
|
||||
finalizeTaskRunByRunId() { throw new Error("cached finalizeTaskRunByRunId should not run"); },
|
||||
completeTaskRunByRunId() { throw new Error("cached completeTaskRunByRunId should not run"); },
|
||||
failTaskRunByRunId() { throw new Error("cached failTaskRunByRunId should not run"); },
|
||||
setDetachedTaskDeliveryStatusByRunId() { throw new Error("cached setDetachedTaskDeliveryStatusByRunId should not run"); },
|
||||
|
||||
@@ -78,6 +78,19 @@ export type DetachedTaskFailParams = {
|
||||
terminalSummary?: string | null;
|
||||
};
|
||||
|
||||
/**
 * Parameters accepted by the detached-task `finalizeTaskRunByRunId` hook.
 *
 * Unlike the separate complete/fail parameter shapes, the terminal `status`
 * is always explicit and is limited to `succeeded`, `failed`, `timed_out`,
 * or `cancelled`.
 */
export type DetachedTaskFinalizeParams = {
  runId: string;
  runtime?: TaskRuntime;
  sessionKey?: string;
  /** Terminal status to record for the run. */
  status: Extract<TaskStatus, "succeeded" | "failed" | "timed_out" | "cancelled">;
  endedAt: number;
  lastEventAt?: number;
  error?: string;
  progressSummary?: string | null;
  terminalSummary?: string | null;
  terminalOutcome?: TaskTerminalOutcome | null;
};
|
||||
|
||||
export type DetachedTaskDeliveryStatusParams = {
|
||||
runId: string;
|
||||
runtime?: TaskRuntime;
|
||||
@@ -113,6 +126,7 @@ export type DetachedTaskLifecycleRuntime = {
|
||||
createRunningTaskRun: (params: DetachedRunningTaskCreateParams) => TaskRecord;
|
||||
startTaskRunByRunId: (params: DetachedTaskStartParams) => TaskRecord[];
|
||||
recordTaskRunProgressByRunId: (params: DetachedTaskProgressParams) => TaskRecord[];
|
||||
finalizeTaskRunByRunId?: (params: DetachedTaskFinalizeParams) => TaskRecord[];
|
||||
completeTaskRunByRunId: (params: DetachedTaskCompleteParams) => TaskRecord[];
|
||||
failTaskRunByRunId: (params: DetachedTaskFailParams) => TaskRecord[];
|
||||
setDetachedTaskDeliveryStatusByRunId: (params: DetachedTaskDeliveryStatusParams) => TaskRecord[];
|
||||
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
createQueuedTaskRun,
|
||||
createRunningTaskRun,
|
||||
failTaskRunByRunId,
|
||||
finalizeTaskRunByRunId,
|
||||
getDetachedTaskLifecycleRuntime,
|
||||
getDetachedTaskLifecycleRuntimeRegistration,
|
||||
registerDetachedTaskRuntime,
|
||||
@@ -76,6 +77,7 @@ describe("detached-task-runtime", () => {
|
||||
createRunningTaskRun: vi.fn(() => runningTask),
|
||||
startTaskRunByRunId: vi.fn(() => updatedTasks),
|
||||
recordTaskRunProgressByRunId: vi.fn(() => updatedTasks),
|
||||
finalizeTaskRunByRunId: vi.fn(() => updatedTasks),
|
||||
completeTaskRunByRunId: vi.fn(() => updatedTasks),
|
||||
failTaskRunByRunId: vi.fn(() => updatedTasks),
|
||||
setDetachedTaskDeliveryStatusByRunId: vi.fn(() => updatedTasks),
|
||||
@@ -111,6 +113,7 @@ describe("detached-task-runtime", () => {
|
||||
|
||||
startTaskRunByRunId({ runId: "run-running", startedAt: 10 });
|
||||
recordTaskRunProgressByRunId({ runId: "run-running", lastEventAt: 20 });
|
||||
finalizeTaskRunByRunId({ runId: "run-running", status: "succeeded", endedAt: 25 });
|
||||
completeTaskRunByRunId({ runId: "run-running", endedAt: 30 });
|
||||
failTaskRunByRunId({ runId: "run-running", endedAt: 40 });
|
||||
setDetachedTaskDeliveryStatusByRunId({
|
||||
@@ -134,6 +137,9 @@ describe("detached-task-runtime", () => {
|
||||
expect(fakeRuntime.recordTaskRunProgressByRunId).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ runId: "run-running", lastEventAt: 20 }),
|
||||
);
|
||||
expect(fakeRuntime.finalizeTaskRunByRunId).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ runId: "run-running", status: "succeeded", endedAt: 25 }),
|
||||
);
|
||||
expect(fakeRuntime.completeTaskRunByRunId).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ runId: "run-running", endedAt: 30 }),
|
||||
);
|
||||
@@ -166,6 +172,30 @@ describe("detached-task-runtime", () => {
|
||||
expect(getDetachedTaskLifecycleRuntime()).toBe(runtime);
|
||||
});
|
||||
|
||||
it("falls back to legacy complete and fail hooks when a runtime has no finalizer", () => {
|
||||
const defaultRuntime = getDetachedTaskLifecycleRuntime();
|
||||
const completeTaskRunByRunIdSpy = vi.fn(() => []);
|
||||
const failTaskRunByRunIdSpy = vi.fn(() => []);
|
||||
const legacyRuntime = {
|
||||
...defaultRuntime,
|
||||
completeTaskRunByRunId: completeTaskRunByRunIdSpy,
|
||||
failTaskRunByRunId: failTaskRunByRunIdSpy,
|
||||
};
|
||||
delete legacyRuntime.finalizeTaskRunByRunId;
|
||||
|
||||
setDetachedTaskLifecycleRuntime(legacyRuntime);
|
||||
|
||||
finalizeTaskRunByRunId({ runId: "legacy-ok", status: "succeeded", endedAt: 10 });
|
||||
finalizeTaskRunByRunId({ runId: "legacy-timeout", status: "timed_out", endedAt: 20 });
|
||||
|
||||
expect(completeTaskRunByRunIdSpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ runId: "legacy-ok", status: "succeeded", endedAt: 10 }),
|
||||
);
|
||||
expect(failTaskRunByRunIdSpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ runId: "legacy-timeout", status: "timed_out", endedAt: 20 }),
|
||||
);
|
||||
});
|
||||
|
||||
describe("tryRecoverTaskBeforeMarkLost", () => {
|
||||
it("returns recovered when hook returns recovered true", async () => {
|
||||
const task = createFakeTaskRecord({ taskId: "task-recover", runtime: "subagent" });
|
||||
|
||||
@@ -2,6 +2,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import type {
|
||||
DetachedTaskRecoveryAttemptParams,
|
||||
DetachedTaskRecoveryAttemptResult,
|
||||
DetachedTaskFinalizeParams,
|
||||
DetachedTaskLifecycleRuntime,
|
||||
DetachedTaskLifecycleRuntimeRegistration,
|
||||
} from "./detached-task-runtime-contract.js";
|
||||
@@ -17,10 +18,12 @@ import {
|
||||
createQueuedTaskRun as createQueuedTaskRunFromExecutor,
|
||||
createRunningTaskRun as createRunningTaskRunFromExecutor,
|
||||
failTaskRunByRunId as failTaskRunByRunIdFromExecutor,
|
||||
finalizeTaskRunByRunId as finalizeTaskRunByRunIdFromExecutor,
|
||||
recordTaskRunProgressByRunId as recordTaskRunProgressByRunIdFromExecutor,
|
||||
setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdFromExecutor,
|
||||
startTaskRunByRunId as startTaskRunByRunIdFromExecutor,
|
||||
} from "./task-executor.js";
|
||||
import type { TaskRecord } from "./task-registry.types.js";
|
||||
|
||||
const log = createSubsystemLogger("tasks/detached-runtime");
|
||||
const DETACHED_TASK_RECOVERY_WARN_MS = 5_000;
|
||||
@@ -32,6 +35,7 @@ const DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME: DetachedTaskLifecycleRuntime = {
|
||||
createRunningTaskRun: createRunningTaskRunFromExecutor,
|
||||
startTaskRunByRunId: startTaskRunByRunIdFromExecutor,
|
||||
recordTaskRunProgressByRunId: recordTaskRunProgressByRunIdFromExecutor,
|
||||
finalizeTaskRunByRunId: finalizeTaskRunByRunIdFromExecutor,
|
||||
completeTaskRunByRunId: completeTaskRunByRunIdFromExecutor,
|
||||
failTaskRunByRunId: failTaskRunByRunIdFromExecutor,
|
||||
setDetachedTaskDeliveryStatusByRunId: setDetachedTaskDeliveryStatusByRunIdFromExecutor,
|
||||
@@ -87,6 +91,20 @@ export function recordTaskRunProgressByRunId(
|
||||
return getDetachedTaskLifecycleRuntime().recordTaskRunProgressByRunId(...args);
|
||||
}
|
||||
|
||||
/**
 * Finalizes the task record(s) for a run through the active detached-task
 * lifecycle runtime.
 *
 * When the runtime provides a `finalizeTaskRunByRunId` hook, the call is
 * delegated to it unchanged. Runtimes without that hook are handled through
 * the older hooks: `completeTaskRunByRunId` for a `succeeded` status and
 * `failTaskRunByRunId` for every other status.
 *
 * @param params - Run identity, terminal status, and terminal metadata.
 * @returns Whatever the selected runtime hook returns.
 */
export function finalizeTaskRunByRunId(params: DetachedTaskFinalizeParams): TaskRecord[] {
  const runtime = getDetachedTaskLifecycleRuntime();

  if (!runtime.finalizeTaskRunByRunId) {
    // Legacy runtime: route by status to the complete/fail hooks.
    if (params.status === "succeeded") {
      return runtime.completeTaskRunByRunId(params);
    }
    // NOTE(review): status is restated after the spread, presumably so the
    // narrowed (non-"succeeded") type reaches the fail hook — confirm against
    // DetachedTaskFailParams.
    const failureStatus = params.status;
    return runtime.failTaskRunByRunId({
      ...params,
      status: failureStatus,
    });
  }

  return runtime.finalizeTaskRunByRunId(params);
}
|
||||
|
||||
export function completeTaskRunByRunId(
|
||||
...args: Parameters<DetachedTaskLifecycleRuntime["completeTaskRunByRunId"]>
|
||||
): ReturnType<DetachedTaskLifecycleRuntime["completeTaskRunByRunId"]> {
|
||||
|
||||
@@ -8,6 +8,7 @@ export {
|
||||
findLatestTaskForFlowId,
|
||||
findLatestTaskForRelatedSessionKey,
|
||||
findTaskByRunId,
|
||||
finalizeTaskRunByRunId,
|
||||
getTaskById,
|
||||
getTaskRegistrySnapshot,
|
||||
getTaskRegistrySummary,
|
||||
|
||||
@@ -3,6 +3,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import type {
|
||||
DetachedRunningTaskCreateParams,
|
||||
DetachedTaskCreateParams,
|
||||
DetachedTaskFinalizeParams,
|
||||
} from "./detached-task-runtime-contract.js";
|
||||
import { getRegisteredDetachedTaskLifecycleRuntime } from "./detached-task-runtime-state.js";
|
||||
import {
|
||||
@@ -15,7 +16,7 @@ import {
|
||||
listTasksForFlowId,
|
||||
markTaskLostById,
|
||||
markTaskRunningByRunId,
|
||||
markTaskTerminalByRunId,
|
||||
finalizeTaskRunByRunId as finalizeTaskRunByRunIdInRegistry,
|
||||
recordTaskProgressByRunId,
|
||||
setTaskRunDeliveryStatusByRunId,
|
||||
} from "./runtime-internal.js";
|
||||
@@ -168,19 +169,16 @@ export function completeTaskRunByRunId(params: {
|
||||
terminalSummary?: string | null;
|
||||
terminalOutcome?: TaskTerminalOutcome | null;
|
||||
}) {
|
||||
return markTaskTerminalByRunId({
|
||||
runId: params.runId,
|
||||
runtime: params.runtime,
|
||||
sessionKey: params.sessionKey,
|
||||
return finalizeTaskRunByRunId({
|
||||
...params,
|
||||
status: "succeeded",
|
||||
endedAt: params.endedAt,
|
||||
lastEventAt: params.lastEventAt,
|
||||
progressSummary: params.progressSummary,
|
||||
terminalSummary: params.terminalSummary,
|
||||
terminalOutcome: params.terminalOutcome,
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Executor-level entry point for finalizing a run's task record(s) with an
 * explicit terminal status. Delegates directly to the registry implementation
 * imported as `finalizeTaskRunByRunIdInRegistry`.
 *
 * @param params - Run identity, terminal status, and terminal metadata.
 * @returns The registry implementation's result, unchanged.
 */
export function finalizeTaskRunByRunId(params: DetachedTaskFinalizeParams) {
  const finalizedRecords = finalizeTaskRunByRunIdInRegistry(params);
  return finalizedRecords;
}
|
||||
|
||||
export function failTaskRunByRunId(params: {
|
||||
runId: string;
|
||||
runtime?: TaskRuntime;
|
||||
@@ -192,16 +190,9 @@ export function failTaskRunByRunId(params: {
|
||||
progressSummary?: string | null;
|
||||
terminalSummary?: string | null;
|
||||
}) {
|
||||
return markTaskTerminalByRunId({
|
||||
runId: params.runId,
|
||||
runtime: params.runtime,
|
||||
sessionKey: params.sessionKey,
|
||||
return finalizeTaskRunByRunId({
|
||||
...params,
|
||||
status: params.status ?? "failed",
|
||||
endedAt: params.endedAt,
|
||||
lastEventAt: params.lastEventAt,
|
||||
error: params.error,
|
||||
progressSummary: params.progressSummary,
|
||||
terminalSummary: params.terminalSummary,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ import {
|
||||
maybeDeliverTaskTerminalUpdate,
|
||||
markTaskRunningByRunId,
|
||||
markTaskTerminalById,
|
||||
markTaskTerminalByRunId,
|
||||
recordTaskProgressByRunId,
|
||||
resetTaskRegistryControlRuntimeForTests,
|
||||
resetTaskRegistryDeliveryRuntimeForTests,
|
||||
@@ -332,6 +333,128 @@ describe("task-registry", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps stronger run-scoped terminal states when a late success arrives", async () => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
|
||||
createTaskRecord({
|
||||
runtime: "cli",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
childSessionKey: "agent:main:main",
|
||||
runId: "run-timeout-then-success",
|
||||
task: "Do the thing",
|
||||
status: "running",
|
||||
deliveryStatus: "not_applicable",
|
||||
startedAt: 100,
|
||||
});
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-timeout-then-success",
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
endedAt: 200,
|
||||
aborted: true,
|
||||
},
|
||||
});
|
||||
markTaskTerminalByRunId({
|
||||
runId: "run-timeout-then-success",
|
||||
runtime: "cli",
|
||||
status: "succeeded",
|
||||
endedAt: 300,
|
||||
terminalSummary: "completed",
|
||||
});
|
||||
|
||||
expect(findTaskByRunId("run-timeout-then-success")).toMatchObject({
|
||||
status: "timed_out",
|
||||
endedAt: 200,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("does not downgrade failed run-scoped tasks when a late success arrives", async () => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
|
||||
createTaskRecord({
|
||||
runtime: "cli",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
childSessionKey: "agent:main:main",
|
||||
runId: "run-fail-then-success",
|
||||
task: "Deliver result",
|
||||
status: "running",
|
||||
deliveryStatus: "not_applicable",
|
||||
startedAt: 100,
|
||||
});
|
||||
|
||||
markTaskTerminalByRunId({
|
||||
runId: "run-fail-then-success",
|
||||
runtime: "cli",
|
||||
status: "failed",
|
||||
endedAt: 200,
|
||||
error: "delivery failed",
|
||||
});
|
||||
markTaskTerminalByRunId({
|
||||
runId: "run-fail-then-success",
|
||||
runtime: "cli",
|
||||
status: "succeeded",
|
||||
endedAt: 300,
|
||||
terminalSummary: "completed",
|
||||
});
|
||||
|
||||
expect(findTaskByRunId("run-fail-then-success")).toMatchObject({
|
||||
status: "failed",
|
||||
endedAt: 200,
|
||||
error: "delivery failed",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("lets delivery failure upgrade a lifecycle success", async () => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
|
||||
createTaskRecord({
|
||||
runtime: "cli",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
childSessionKey: "agent:main:main",
|
||||
runId: "run-success-then-fail",
|
||||
task: "Deliver result",
|
||||
status: "running",
|
||||
deliveryStatus: "not_applicable",
|
||||
startedAt: 100,
|
||||
});
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-success-then-fail",
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
endedAt: 200,
|
||||
},
|
||||
});
|
||||
markTaskTerminalByRunId({
|
||||
runId: "run-success-then-fail",
|
||||
runtime: "cli",
|
||||
status: "failed",
|
||||
endedAt: 300,
|
||||
error: "delivery failed",
|
||||
});
|
||||
|
||||
expect(findTaskByRunId("run-success-then-fail")).toMatchObject({
|
||||
status: "failed",
|
||||
endedAt: 300,
|
||||
error: "delivery failed",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("summarizes task pressure by status and runtime", async () => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
|
||||
@@ -399,6 +399,22 @@ function normalizeTaskTerminalOutcome(
|
||||
return value === "succeeded" || value === "blocked" ? value : undefined;
|
||||
}
|
||||
|
||||
/**
 * Decides whether a run-scoped status update may replace a task's current
 * status.
 *
 * Rules, in order:
 * 1. An update to the same status is always allowed.
 * 2. Any update is allowed while the current status is not terminal.
 * 3. A terminal status is never replaced by a non-terminal one.
 * 4. Between two different terminal statuses, only a current `succeeded` may
 *    be replaced, and only by a status other than `lost`.
 *
 * @param params.currentStatus - Status currently stored on the task record.
 * @param params.nextStatus - Status the update wants to apply.
 * @returns `true` when the update should be applied, `false` to skip it.
 */
function shouldApplyRunScopedStatusUpdate(params: {
  currentStatus: TaskStatus;
  nextStatus: TaskStatus;
}): boolean {
  const { currentStatus, nextStatus } = params;

  // Same-status updates and updates to still-active records always apply.
  if (currentStatus === nextStatus || !isTerminalTaskStatus(currentStatus)) {
    return true;
  }

  // The record is already terminal: never move it back to an active status.
  if (!isTerminalTaskStatus(nextStatus)) {
    return false;
  }

  // Terminal -> different terminal: only `succeeded` can be replaced, and
  // never by `lost`.
  return currentStatus === "succeeded" && nextStatus !== "lost";
}
|
||||
|
||||
function resolveTaskTerminalOutcome(params: {
|
||||
status: TaskStatus;
|
||||
terminalOutcome?: TaskTerminalOutcome | null;
|
||||
@@ -1584,6 +1600,15 @@ function updateTaskStateByRunId(params: {
|
||||
for (const current of matches) {
|
||||
const patch: Partial<TaskRecord> = {};
|
||||
const nextStatus = params.status ? normalizeTaskStatus(params.status) : current.status;
|
||||
if (
|
||||
params.status &&
|
||||
!shouldApplyRunScopedStatusUpdate({
|
||||
currentStatus: current.status,
|
||||
nextStatus,
|
||||
})
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
const eventAt = params.lastEventAt ?? params.endedAt ?? Date.now();
|
||||
if (params.status) {
|
||||
patch.status = normalizeTaskStatus(params.status);
|
||||
@@ -1710,6 +1735,22 @@ export function markTaskTerminalByRunId(params: {
|
||||
progressSummary?: string | null;
|
||||
terminalSummary?: string | null;
|
||||
terminalOutcome?: TaskTerminalOutcome | null;
|
||||
}) {
|
||||
return finalizeTaskRunByRunId(params);
|
||||
}
|
||||
|
||||
export function finalizeTaskRunByRunId(params: {
|
||||
runId: string;
|
||||
runtime?: TaskRuntime;
|
||||
sessionKey?: string;
|
||||
status: Extract<TaskStatus, "succeeded" | "failed" | "timed_out" | "cancelled">;
|
||||
startedAt?: number;
|
||||
endedAt: number;
|
||||
lastEventAt?: number;
|
||||
error?: string;
|
||||
progressSummary?: string | null;
|
||||
terminalSummary?: string | null;
|
||||
terminalOutcome?: TaskTerminalOutcome | null;
|
||||
}) {
|
||||
return updateTaskStateByRunId({
|
||||
runId: params.runId,
|
||||
|
||||
Reference in New Issue
Block a user