tasks: extract detached task lifecycle runtime (#68886)

* tasks: extract detached task lifecycle runtime

* tests: relax gateway seam expectation

---------

Co-authored-by: Mariano Belinky <mariano@mb-server-643.local>
This commit is contained in:
Mariano
2026-04-19 10:56:31 +02:00
committed by GitHub
parent 2ecea9395b
commit 0787266637
22 changed files with 273 additions and 22 deletions

View File

@@ -10,7 +10,7 @@ import {
completeTaskRunByRunId,
failTaskRunByRunId,
startTaskRunByRunId,
} from "../../tasks/task-executor.js";
} from "../../tasks/detached-task-runtime.js";
import type { DeliveryContext } from "../../utils/delivery-context.js";
import {
AcpRuntimeError,

View File

@@ -8,7 +8,7 @@ import { enqueueSystemEvent } from "../infra/system-events.js";
import { scopedHeartbeatWakeOptions } from "../routing/session-key.js";
import { normalizeAssistantPhase } from "../shared/chat-message-content.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { recordTaskRunProgressByRunId } from "../tasks/task-executor.js";
import { recordTaskRunProgressByRunId } from "../tasks/detached-task-runtime.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
const DEFAULT_STREAM_FLUSH_MS = 2_500;

View File

@@ -122,7 +122,7 @@ vi.mock("../infra/heartbeat-wake.js", () => ({
areHeartbeatsEnabled: hoisted.areHeartbeatsEnabledMock,
}));
vi.mock("../tasks/task-executor.js", () => ({
vi.mock("../tasks/detached-task-runtime.js", () => ({
createRunningTaskRun: hoisted.createRunningTaskRunMock,
}));

View File

@@ -59,7 +59,7 @@ import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
} from "../shared/string-coerce.js";
import { createRunningTaskRun } from "../tasks/task-executor.js";
import { createRunningTaskRun } from "../tasks/detached-task-runtime.js";
import {
deliveryContextFromSession,
formatConversationTarget,

View File

@@ -307,7 +307,7 @@ vi.mock("../config/sessions.js", () => ({
},
}));
vi.mock("../tasks/task-executor.js", () => ({
vi.mock("../tasks/detached-task-runtime.js", () => ({
completeTaskRunByRunId: vi.fn(),
createRunningTaskRun: vi.fn(),
failTaskRunByRunId: vi.fn(),

View File

@@ -15,7 +15,7 @@ import {
recordTaskRunProgressByRunId,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
} from "../../tasks/task-executor.js";
} from "../../tasks/detached-task-runtime.js";
import {
cancelTaskByIdForOwner,
findTaskByRunIdForOwner,

View File

@@ -27,7 +27,7 @@ const browserLifecycleCleanupMocks = vi.hoisted(() => ({
cleanupBrowserSessionsForLifecycleEnd: vi.fn(async () => {}),
}));
vi.mock("../tasks/task-executor.js", () => ({
vi.mock("../tasks/detached-task-runtime.js", () => ({
completeTaskRunByRunId: taskExecutorMocks.completeTaskRunByRunId,
failTaskRunByRunId: taskExecutorMocks.failTaskRunByRunId,
setDetachedTaskDeliveryStatusByRunId: taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId,

View File

@@ -7,7 +7,7 @@ import {
completeTaskRunByRunId,
failTaskRunByRunId,
setDetachedTaskDeliveryStatusByRunId,
} from "../tasks/task-executor.js";
} from "../tasks/detached-task-runtime.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import {
captureSubagentCompletionReply,

View File

@@ -3,7 +3,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
import { callGateway } from "../gateway/call.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { createRunningTaskRun } from "../tasks/task-executor.js";
import { createRunningTaskRun } from "../tasks/detached-task-runtime.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
import { waitForAgentRun } from "./run-wait.js";
import type { ensureRuntimePluginsLoaded as ensureRuntimePluginsLoadedFn } from "./runtime-plugins.js";

View File

@@ -10,7 +10,7 @@ import {
createRunningTaskRun,
failTaskRunByRunId,
recordTaskRunProgressByRunId,
} from "../../tasks/task-executor.js";
} from "../../tasks/detached-task-runtime.js";
import { sendMessage } from "../../tasks/task-registry-delivery-runtime.js";
import type { DeliveryContext } from "../../utils/delivery-context.js";
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";

View File

@@ -12,7 +12,7 @@ import {
taskExecutorMocks,
} from "./media-generate-background.test-support.js";
vi.mock("../../tasks/task-executor.js", () => taskExecutorMocks);
vi.mock("../../tasks/detached-task-runtime.js", () => taskExecutorMocks);
vi.mock("../../tasks/task-registry-delivery-runtime.js", () => taskDeliveryRuntimeMocks);
vi.mock("../subagent-announce-delivery.js", () => announceDeliveryMocks);

View File

@@ -98,7 +98,7 @@ vi.mock("../../media/web-media.js", () => ({
vi.mock("../../music-generation/runtime.js", () => musicGenerationRuntimeMocks);
vi.mock("./music-generate-background.js", () => musicGenerateBackgroundMocks);
vi.mock("../../tasks/runtime-internal.js", () => taskRuntimeInternalMocks);
vi.mock("../../tasks/task-executor.js", () => taskExecutorMocks);
vi.mock("../../tasks/detached-task-runtime.js", () => taskExecutorMocks);
function asConfig(value: unknown): OpenClawConfig {
return value as OpenClawConfig;

View File

@@ -12,7 +12,7 @@ import {
taskExecutorMocks,
} from "./media-generate-background.test-support.js";
vi.mock("../../tasks/task-executor.js", () => taskExecutorMocks);
vi.mock("../../tasks/detached-task-runtime.js", () => taskExecutorMocks);
vi.mock("../../tasks/task-registry-delivery-runtime.js", () => taskDeliveryRuntimeMocks);
vi.mock("../subagent-announce-delivery.js", () => announceDeliveryMocks);

View File

@@ -17,7 +17,7 @@ const taskExecutorMocks = vi.hoisted(() => ({
}));
vi.mock("../../tasks/runtime-internal.js", () => taskRuntimeInternalMocks);
vi.mock("../../tasks/task-executor.js", () => taskExecutorMocks);
vi.mock("../../tasks/detached-task-runtime.js", () => taskExecutorMocks);
function asConfig(value: unknown): OpenClawConfig {
return value as OpenClawConfig;

View File

@@ -1,7 +1,7 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import * as taskExecutor from "../../tasks/task-executor.js";
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js";
import type { CronJob } from "../types.js";
@@ -212,7 +212,7 @@ describe("cron service ops seam coverage", () => {
});
const createTaskRecordSpy = vi
.spyOn(taskExecutor, "createRunningTaskRun")
.spyOn(detachedTaskRuntime, "createRunningTaskRun")
.mockImplementation(() => {
throw new Error("disk full");
});
@@ -237,7 +237,7 @@ describe("cron service ops seam coverage", () => {
await writeDueIsolatedJobSnapshot(storePath, now);
const updateTaskRecordSpy = vi
.spyOn(taskExecutor, "completeTaskRunByRunId")
.spyOn(detachedTaskRuntime, "completeTaskRunByRunId")
.mockImplementation(() => {
throw new Error("disk full");
});

View File

@@ -5,7 +5,7 @@ import {
completeTaskRunByRunId,
createRunningTaskRun,
failTaskRunByRunId,
} from "../../tasks/task-executor.js";
} from "../../tasks/detached-task-runtime.js";
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
import {
applyJobPatch,

View File

@@ -4,7 +4,7 @@ import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/servic
import { createCronServiceState } from "../../cron/service/state.js";
import { onTimer } from "../../cron/service/timer.js";
import type { CronJob } from "../../cron/types.js";
import * as taskExecutor from "../../tasks/task-executor.js";
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
import { resetTaskRegistryForTests } from "../../tasks/task-registry.js";
const { logger, makeStorePath } = setupCronServiceSuite({
@@ -96,7 +96,7 @@ describe("cron service timer seam coverage", () => {
});
const createTaskRecordSpy = vi
.spyOn(taskExecutor, "createRunningTaskRun")
.spyOn(detachedTaskRuntime, "createRunningTaskRun")
.mockImplementation(() => {
throw new Error("disk full");
});

View File

@@ -7,7 +7,7 @@ import {
completeTaskRunByRunId,
createRunningTaskRun,
failTaskRunByRunId,
} from "../../tasks/task-executor.js";
} from "../../tasks/detached-task-runtime.js";
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
import { resolveCronDeliveryPlan } from "../delivery-plan.js";
import { sweepCronRunSessions } from "../session-reaper.js";

View File

@@ -1,6 +1,11 @@
import fs from "node:fs/promises";
import { afterEach, describe, expect, it, vi } from "vitest";
import { BARE_SESSION_RESET_PROMPT } from "../../auto-reply/reply/session-reset-prompt.js";
import {
getDetachedTaskLifecycleRuntime,
resetDetachedTaskLifecycleRuntimeForTests,
setDetachedTaskLifecycleRuntime,
} from "../../tasks/detached-task-runtime.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { withTempDir } from "../../test-helpers/temp-dir.js";
import { agentHandlers } from "./agent.js";
@@ -327,6 +332,7 @@ describe("gateway agent handler", () => {
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetDetachedTaskLifecycleRuntimeForTests();
resetTaskRegistryForTests();
mocks.resolveBareResetBootstrapFileAccess.mockReset().mockReturnValue(true);
});
@@ -990,6 +996,49 @@ describe("gateway agent handler", () => {
});
});
it("dispatches async gateway agent task creation through the detached task runtime seam", async () => {
await withTempDir({ prefix: "openclaw-gateway-agent-seam-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
primeMainAgentRun();
const defaultRuntime = getDetachedTaskLifecycleRuntime();
const createRunningTaskRunSpy = vi.fn(
(...args: Parameters<typeof defaultRuntime.createRunningTaskRun>) =>
defaultRuntime.createRunningTaskRun(...args),
);
setDetachedTaskLifecycleRuntime({
...defaultRuntime,
createRunningTaskRun: createRunningTaskRunSpy,
});
await invokeAgent(
{
message: "background cli seam task",
sessionKey: "agent:main:main",
idempotencyKey: "task-registry-agent-seam",
},
{ reqId: "task-registry-agent-seam" },
);
expect(createRunningTaskRunSpy).toHaveBeenCalledWith(
expect.objectContaining({
runtime: "cli",
runId: "task-registry-agent-seam",
childSessionKey: "agent:main:main",
sourceId: "task-registry-agent-seam",
task: expect.stringContaining("background cli seam task"),
}),
);
expect(findTaskByRunId("task-registry-agent-seam")).toMatchObject({
runtime: "cli",
childSessionKey: "agent:main:main",
status: "running",
});
});
});
it("handles missing cliSessionIds gracefully", async () => {
mockMainSessionEntry({});

View File

@@ -45,7 +45,7 @@ import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
} from "../../shared/string-coerce.js";
import { createRunningTaskRun } from "../../tasks/task-executor.js";
import { createRunningTaskRun } from "../../tasks/detached-task-runtime.js";
import {
mergeDeliveryContext,
normalizeDeliveryContext,

View File

@@ -0,0 +1,117 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { TaskRecord } from "./task-registry.types.js";
import {
completeTaskRunByRunId,
createQueuedTaskRun,
createRunningTaskRun,
failTaskRunByRunId,
getDetachedTaskLifecycleRuntime,
recordTaskRunProgressByRunId,
resetDetachedTaskLifecycleRuntimeForTests,
setDetachedTaskLifecycleRuntime,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
} from "./detached-task-runtime.js";
function createFakeTaskRecord(overrides?: Partial<TaskRecord>): TaskRecord {
return {
taskId: "task-fake",
runtime: "cli",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
runId: "run-fake",
task: "Fake task",
status: "running",
deliveryStatus: "pending",
notifyPolicy: "done_only",
createdAt: 1,
...overrides,
};
}
describe("detached-task-runtime", () => {
afterEach(() => {
resetDetachedTaskLifecycleRuntimeForTests();
});
it("dispatches lifecycle operations through the installed runtime", () => {
const defaultRuntime = getDetachedTaskLifecycleRuntime();
const queuedTask = createFakeTaskRecord({
taskId: "task-queued",
runId: "run-queued",
status: "queued",
});
const runningTask = createFakeTaskRecord({
taskId: "task-running",
runId: "run-running",
});
const fakeRuntime = {
createQueuedTaskRun: vi.fn(() => queuedTask),
createRunningTaskRun: vi.fn(() => runningTask),
startTaskRunByRunId: vi.fn(() => undefined),
recordTaskRunProgressByRunId: vi.fn(() => undefined),
completeTaskRunByRunId: vi.fn(() => undefined),
failTaskRunByRunId: vi.fn(() => undefined),
setDetachedTaskDeliveryStatusByRunId: vi.fn(() => undefined),
};
setDetachedTaskLifecycleRuntime(fakeRuntime);
expect(
createQueuedTaskRun({
runtime: "cli",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterSessionKey: "agent:main:main",
runId: "run-queued",
task: "Queue task",
}),
).toBe(queuedTask);
expect(
createRunningTaskRun({
runtime: "cli",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterSessionKey: "agent:main:main",
runId: "run-running",
task: "Run task",
}),
).toBe(runningTask);
startTaskRunByRunId({ runId: "run-running", startedAt: 10 });
recordTaskRunProgressByRunId({ runId: "run-running", lastEventAt: 20 });
completeTaskRunByRunId({ runId: "run-running", endedAt: 30 });
failTaskRunByRunId({ runId: "run-running", endedAt: 40 });
setDetachedTaskDeliveryStatusByRunId({
runId: "run-running",
deliveryStatus: "delivered",
});
expect(fakeRuntime.createQueuedTaskRun).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-queued", task: "Queue task" }),
);
expect(fakeRuntime.createRunningTaskRun).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-running", task: "Run task" }),
);
expect(fakeRuntime.startTaskRunByRunId).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-running", startedAt: 10 }),
);
expect(fakeRuntime.recordTaskRunProgressByRunId).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-running", lastEventAt: 20 }),
);
expect(fakeRuntime.completeTaskRunByRunId).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-running", endedAt: 30 }),
);
expect(fakeRuntime.failTaskRunByRunId).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-running", endedAt: 40 }),
);
expect(fakeRuntime.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-running", deliveryStatus: "delivered" }),
);
resetDetachedTaskLifecycleRuntimeForTests();
expect(getDetachedTaskLifecycleRuntime()).toBe(defaultRuntime);
});
});

View File

@@ -0,0 +1,85 @@
import {
completeTaskRunByRunId as completeTaskRunByRunIdInCore,
createQueuedTaskRun as createQueuedTaskRunInCore,
createRunningTaskRun as createRunningTaskRunInCore,
failTaskRunByRunId as failTaskRunByRunIdInCore,
recordTaskRunProgressByRunId as recordTaskRunProgressByRunIdInCore,
setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdInCore,
startTaskRunByRunId as startTaskRunByRunIdInCore,
} from "./task-executor.js";
export type DetachedTaskLifecycleRuntime = {
createQueuedTaskRun: typeof createQueuedTaskRunInCore;
createRunningTaskRun: typeof createRunningTaskRunInCore;
startTaskRunByRunId: typeof startTaskRunByRunIdInCore;
recordTaskRunProgressByRunId: typeof recordTaskRunProgressByRunIdInCore;
completeTaskRunByRunId: typeof completeTaskRunByRunIdInCore;
failTaskRunByRunId: typeof failTaskRunByRunIdInCore;
setDetachedTaskDeliveryStatusByRunId: typeof setDetachedTaskDeliveryStatusByRunIdInCore;
};
const DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME: DetachedTaskLifecycleRuntime = {
createQueuedTaskRun: createQueuedTaskRunInCore,
createRunningTaskRun: createRunningTaskRunInCore,
startTaskRunByRunId: startTaskRunByRunIdInCore,
recordTaskRunProgressByRunId: recordTaskRunProgressByRunIdInCore,
completeTaskRunByRunId: completeTaskRunByRunIdInCore,
failTaskRunByRunId: failTaskRunByRunIdInCore,
setDetachedTaskDeliveryStatusByRunId: setDetachedTaskDeliveryStatusByRunIdInCore,
};
let detachedTaskLifecycleRuntime = DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME;
export function getDetachedTaskLifecycleRuntime(): DetachedTaskLifecycleRuntime {
return detachedTaskLifecycleRuntime;
}
export function setDetachedTaskLifecycleRuntime(runtime: DetachedTaskLifecycleRuntime): void {
detachedTaskLifecycleRuntime = runtime;
}
export function resetDetachedTaskLifecycleRuntimeForTests(): void {
detachedTaskLifecycleRuntime = DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME;
}
export function createQueuedTaskRun(
...args: Parameters<DetachedTaskLifecycleRuntime["createQueuedTaskRun"]>
): ReturnType<DetachedTaskLifecycleRuntime["createQueuedTaskRun"]> {
return detachedTaskLifecycleRuntime.createQueuedTaskRun(...args);
}
export function createRunningTaskRun(
...args: Parameters<DetachedTaskLifecycleRuntime["createRunningTaskRun"]>
): ReturnType<DetachedTaskLifecycleRuntime["createRunningTaskRun"]> {
return detachedTaskLifecycleRuntime.createRunningTaskRun(...args);
}
export function startTaskRunByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["startTaskRunByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["startTaskRunByRunId"]> {
return detachedTaskLifecycleRuntime.startTaskRunByRunId(...args);
}
export function recordTaskRunProgressByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["recordTaskRunProgressByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["recordTaskRunProgressByRunId"]> {
return detachedTaskLifecycleRuntime.recordTaskRunProgressByRunId(...args);
}
export function completeTaskRunByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["completeTaskRunByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["completeTaskRunByRunId"]> {
return detachedTaskLifecycleRuntime.completeTaskRunByRunId(...args);
}
export function failTaskRunByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["failTaskRunByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["failTaskRunByRunId"]> {
return detachedTaskLifecycleRuntime.failTaskRunByRunId(...args);
}
export function setDetachedTaskDeliveryStatusByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["setDetachedTaskDeliveryStatusByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["setDetachedTaskDeliveryStatusByRunId"]> {
return detachedTaskLifecycleRuntime.setDetachedTaskDeliveryStatusByRunId(...args);
}