Files
openclaw/src/tasks/task-executor.test.ts

200 lines
5.8 KiB
TypeScript

import { afterEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { getFlowById, listFlowRecords, resetFlowRegistryForTests } from "./flow-registry.js";
import {
completeTaskRunByRunId,
createQueuedTaskRun,
createRunningTaskRun,
failTaskRunByRunId,
recordTaskRunProgressByRunId,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
} from "./task-executor.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
const hoisted = vi.hoisted(() => {
const sendMessageMock = vi.fn();
const cancelSessionMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
return {
sendMessageMock,
cancelSessionMock,
killSubagentRunAdminMock,
};
});
vi.mock("./task-registry-delivery-runtime.js", () => ({
sendMessage: hoisted.sendMessageMock,
}));
vi.mock("../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.mock("../agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
async function withTaskExecutorStateDir(run: (root: string) => Promise<void>): Promise<void> {
await withTempDir({ prefix: "openclaw-task-executor-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
try {
await run(root);
} finally {
resetTaskRegistryForTests();
resetFlowRegistryForTests();
}
});
}
describe("task-executor", () => {
afterEach(() => {
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryForTests();
resetFlowRegistryForTests();
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
});
it("advances a queued run through start and completion", async () => {
await withTaskExecutorStateDir(async () => {
const created = createQueuedTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
runId: "run-executor-queued",
task: "Investigate issue",
});
expect(created.status).toBe("queued");
startTaskRunByRunId({
runId: "run-executor-queued",
startedAt: 100,
lastEventAt: 100,
eventSummary: "Started.",
});
completeTaskRunByRunId({
runId: "run-executor-queued",
endedAt: 250,
lastEventAt: 250,
terminalSummary: "Done.",
});
expect(findTaskByRunId("run-executor-queued")).toMatchObject({
taskId: created.taskId,
status: "succeeded",
startedAt: 100,
endedAt: 250,
terminalSummary: "Done.",
});
});
});
it("records progress, failure, and delivery status through the executor", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:subagent:child",
runId: "run-executor-fail",
task: "Write summary",
startedAt: 10,
});
recordTaskRunProgressByRunId({
runId: "run-executor-fail",
lastEventAt: 20,
progressSummary: "Collecting results",
eventSummary: "Collecting results",
});
failTaskRunByRunId({
runId: "run-executor-fail",
endedAt: 40,
lastEventAt: 40,
error: "tool failed",
});
setDetachedTaskDeliveryStatusByRunId({
runId: "run-executor-fail",
deliveryStatus: "failed",
});
expect(findTaskByRunId("run-executor-fail")).toMatchObject({
taskId: created.taskId,
status: "failed",
progressSummary: "Collecting results",
error: "tool failed",
deliveryStatus: "failed",
});
});
});
it("auto-creates a one-task flow and keeps it synced with task status", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:subagent:child",
runId: "run-executor-flow",
task: "Write summary",
startedAt: 10,
deliveryStatus: "pending",
});
expect(created.parentFlowId).toEqual(expect.any(String));
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
ownerSessionKey: "agent:main:main",
status: "running",
goal: "Write summary",
notifyPolicy: "done_only",
});
completeTaskRunByRunId({
runId: "run-executor-flow",
endedAt: 40,
lastEventAt: 40,
terminalSummary: "Done.",
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "succeeded",
endedAt: 40,
goal: "Write summary",
notifyPolicy: "done_only",
});
});
});
it("does not auto-create one-task flows for non-returning bookkeeping runs", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "cli",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:main",
runId: "run-executor-cli",
task: "Foreground gateway run",
deliveryStatus: "not_applicable",
startedAt: 10,
});
expect(created.parentFlowId).toBeUndefined();
expect(listFlowRecords()).toEqual([]);
});
});
});