test: speed up task registry tests

This commit is contained in:
Peter Steinberger
2026-04-07 10:00:14 +01:00
parent 5a1cf20aee
commit a14aef191b
2 changed files with 185 additions and 156 deletions

View File

@@ -33,6 +33,43 @@ let sweeper: NodeJS.Timeout | null = null;
let deferredSweep: NodeJS.Timeout | null = null;
let sweepInProgress = false;
type TaskRegistryMaintenanceRuntime = {
readAcpSessionEntry: typeof readAcpSessionEntry;
loadSessionStore: typeof loadSessionStore;
resolveStorePath: typeof resolveStorePath;
isCronJobActive: typeof isCronJobActive;
getAgentRunContext: typeof getAgentRunContext;
parseAgentSessionKey: typeof parseAgentSessionKey;
deleteTaskRecordById: typeof deleteTaskRecordById;
ensureTaskRegistryReady: typeof ensureTaskRegistryReady;
getTaskById: typeof getTaskById;
listTaskRecords: typeof listTaskRecords;
markTaskLostById: typeof markTaskLostById;
maybeDeliverTaskTerminalUpdate: typeof maybeDeliverTaskTerminalUpdate;
resolveTaskForLookupToken: typeof resolveTaskForLookupToken;
setTaskCleanupAfterById: typeof setTaskCleanupAfterById;
};
const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
readAcpSessionEntry,
loadSessionStore,
resolveStorePath,
isCronJobActive,
getAgentRunContext,
parseAgentSessionKey,
deleteTaskRecordById,
ensureTaskRegistryReady,
getTaskById,
listTaskRecords,
markTaskLostById,
maybeDeliverTaskTerminalUpdate,
resolveTaskForLookupToken,
setTaskCleanupAfterById,
};
let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime =
defaultTaskRegistryMaintenanceRuntime;
export type TaskRegistryMaintenanceSummary = {
reconciled: number;
cleanupStamped: number;
@@ -70,7 +107,7 @@ function hasActiveCliRun(task: TaskRecord): boolean {
const candidateRunIds = [task.sourceId, task.runId];
for (const candidate of candidateRunIds) {
const runId = candidate?.trim();
if (runId && getAgentRunContext(runId)) {
if (runId && taskRegistryMaintenanceRuntime.getAgentRunContext(runId)) {
return true;
}
}
@@ -80,7 +117,7 @@ function hasActiveCliRun(task: TaskRecord): boolean {
function hasBackingSession(task: TaskRecord): boolean {
if (task.runtime === "cron") {
const jobId = task.sourceId?.trim();
return jobId ? isCronJobActive(jobId) : false;
return jobId ? taskRegistryMaintenanceRuntime.isCronJobActive(jobId) : false;
}
if (task.runtime === "cli" && hasActiveCliRun(task)) {
@@ -92,7 +129,7 @@ function hasBackingSession(task: TaskRecord): boolean {
return true;
}
if (task.runtime === "acp") {
const acpEntry = readAcpSessionEntry({
const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({
sessionKey: childSessionKey,
});
if (!acpEntry || acpEntry.storeReadFailed) {
@@ -107,9 +144,9 @@ function hasBackingSession(task: TaskRecord): boolean {
return false;
}
}
const agentId = parseAgentSessionKey(childSessionKey)?.agentId;
const storePath = resolveStorePath(undefined, { agentId });
const store = loadSessionStore(storePath);
const agentId = taskRegistryMaintenanceRuntime.parseAgentSessionKey(childSessionKey)?.agentId;
const storePath = taskRegistryMaintenanceRuntime.resolveStorePath(undefined, { agentId });
const store = taskRegistryMaintenanceRuntime.loadSessionStore(storePath);
return Boolean(findSessionEntryByKey(store, childSessionKey));
}
@@ -149,14 +186,14 @@ function resolveCleanupAfter(task: TaskRecord): number {
function markTaskLost(task: TaskRecord, now: number): TaskRecord {
const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter;
const updated =
markTaskLostById({
taskRegistryMaintenanceRuntime.markTaskLostById({
taskId: task.taskId,
endedAt: task.endedAt ?? now,
lastEventAt: now,
error: task.error ?? "backing session missing",
cleanupAfter,
}) ?? task;
void maybeDeliverTaskTerminalUpdate(updated.taskId);
void taskRegistryMaintenanceRuntime.maybeDeliverTaskTerminalUpdate(updated.taskId);
return updated;
}
@@ -185,8 +222,10 @@ export function reconcileTaskRecordForOperatorInspection(task: TaskRecord): Task
}
export function reconcileInspectableTasks(): TaskRecord[] {
ensureTaskRegistryReady();
return listTaskRecords().map((task) => reconcileTaskRecordForOperatorInspection(task));
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
return taskRegistryMaintenanceRuntime
.listTaskRecords()
.map((task) => reconcileTaskRecordForOperatorInspection(task));
}
export function getInspectableTaskRegistrySummary(): TaskRegistrySummary {
@@ -199,18 +238,18 @@ export function getInspectableTaskAuditSummary(): TaskAuditSummary {
}
export function reconcileTaskLookupToken(token: string): TaskRecord | undefined {
ensureTaskRegistryReady();
const task = resolveTaskForLookupToken(token);
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
const task = taskRegistryMaintenanceRuntime.resolveTaskForLookupToken(token);
return task ? reconcileTaskRecordForOperatorInspection(task) : undefined;
}
export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary {
ensureTaskRegistryReady();
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
const now = Date.now();
let reconciled = 0;
let cleanupStamped = 0;
let pruned = 0;
for (const task of listTaskRecords()) {
for (const task of taskRegistryMaintenanceRuntime.listTaskRecords()) {
if (shouldMarkLost(task, now)) {
reconciled += 1;
continue;
@@ -246,15 +285,15 @@ function startScheduledSweep() {
}
export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintenanceSummary> {
ensureTaskRegistryReady();
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
const now = Date.now();
let reconciled = 0;
let cleanupStamped = 0;
let pruned = 0;
const tasks = listTaskRecords();
const tasks = taskRegistryMaintenanceRuntime.listTaskRecords();
let processed = 0;
for (const task of tasks) {
const current = getTaskById(task.taskId);
const current = taskRegistryMaintenanceRuntime.getTaskById(task.taskId);
if (!current) {
continue;
}
@@ -269,7 +308,10 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
}
continue;
}
if (shouldPruneTerminalTask(current, now) && deleteTaskRecordById(current.taskId)) {
if (
shouldPruneTerminalTask(current, now) &&
taskRegistryMaintenanceRuntime.deleteTaskRecordById(current.taskId)
) {
pruned += 1;
processed += 1;
if (processed % SWEEP_YIELD_BATCH_SIZE === 0) {
@@ -279,7 +321,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
}
if (
shouldStampCleanupAfter(current) &&
setTaskCleanupAfterById({
taskRegistryMaintenanceRuntime.setTaskCleanupAfterById({
taskId: current.taskId,
cleanupAfter: resolveCleanupAfter(current),
})
@@ -299,7 +341,7 @@ export async function sweepTaskRegistry(): Promise<TaskRegistryMaintenanceSummar
}
export function startTaskRegistryMaintenance() {
ensureTaskRegistryReady();
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
deferredSweep = setTimeout(() => {
deferredSweep = null;
startScheduledSweep();
@@ -326,6 +368,16 @@ export function stopTaskRegistryMaintenance() {
export const stopTaskRegistryMaintenanceForTests = stopTaskRegistryMaintenance;
export function setTaskRegistryMaintenanceRuntimeForTests(
runtime: TaskRegistryMaintenanceRuntime,
): void {
taskRegistryMaintenanceRuntime = runtime;
}
export function resetTaskRegistryMaintenanceRuntimeForTests(): void {
taskRegistryMaintenanceRuntime = defaultTaskRegistryMaintenanceRuntime;
}
export function getReconciledTaskById(taskId: string): TaskRecord | undefined {
const task = getTaskById(taskId);
return task ? reconcileTaskRecordForOperatorInspection(task) : undefined;

View File

@@ -1,4 +1,5 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js";
import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js";
import { resetCronActiveJobsForTests } from "../cron/active-jobs.js";
import {
@@ -11,10 +12,12 @@ import {
resetHeartbeatWakeStateForTests,
} from "../infra/heartbeat-wake.js";
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
import type { ParsedAgentSessionKey } from "../routing/session-key.js";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js";
import { configureTaskFlowRegistryRuntime } from "./task-flow-registry.store.js";
import {
cancelTaskById,
createTaskRecord,
findLatestTaskForOwnerKey,
findLatestTaskForRelatedSessionKey,
@@ -41,8 +44,10 @@ import {
import {
getInspectableTaskAuditSummary,
previewTaskRegistryMaintenance,
resetTaskRegistryMaintenanceRuntimeForTests,
reconcileInspectableTasks,
runTaskRegistryMaintenance,
setTaskRegistryMaintenanceRuntimeForTests,
startTaskRegistryMaintenance,
stopTaskRegistryMaintenanceForTests,
sweepTaskRegistry,
@@ -71,39 +76,23 @@ vi.mock("../agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
async function loadFreshTaskRegistryModulesForControlTest() {
vi.resetModules();
vi.doMock("../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.doMock("../agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
const registry = await import("./task-registry.js");
registry.setTaskRegistryDeliveryRuntimeForTests({
sendMessage: hoisted.sendMessageMock,
});
return registry;
}
async function loadFreshTaskRegistryMaintenanceModuleForTest(params: {
function configureTaskRegistryMaintenanceRuntimeForTest(params: {
currentTasks: Map<string, ReturnType<typeof createTaskRecord>>;
snapshotTasks: ReturnType<typeof createTaskRecord>[];
}) {
vi.resetModules();
vi.doMock("../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: () => ({ entry: undefined, storeReadFailed: false }),
}));
vi.doMock("../config/sessions.js", () => ({
}): void {
const emptyAcpEntry = {
cfg: {} as never,
storePath: "",
sessionKey: "",
storeSessionKey: "",
entry: undefined,
storeReadFailed: false,
} satisfies AcpSessionStoreEntry;
setTaskRegistryMaintenanceRuntimeForTests({
readAcpSessionEntry: () => emptyAcpEntry,
loadSessionStore: () => ({}),
resolveStorePath: () => "",
}));
vi.doMock("../routing/session-key.js", () => ({
parseAgentSessionKey: () => undefined,
}));
vi.doMock("./runtime-internal.js", () => ({
parseAgentSessionKey: () => null as ParsedAgentSessionKey | null,
deleteTaskRecordById: (taskId: string) => params.currentTasks.delete(taskId),
ensureTaskRegistryReady: () => {},
getTaskById: (taskId: string) => params.currentTasks.get(taskId),
@@ -130,7 +119,7 @@ async function loadFreshTaskRegistryMaintenanceModuleForTest(params: {
params.currentTasks.set(patch.taskId, next);
return next;
},
maybeDeliverTaskTerminalUpdate: () => false,
maybeDeliverTaskTerminalUpdate: async () => null,
resolveTaskForLookupToken: () => undefined,
setTaskCleanupAfterById: (patch: { taskId: string; cleanupAfter: number }) => {
const current = params.currentTasks.get(patch.taskId);
@@ -144,8 +133,7 @@ async function loadFreshTaskRegistryMaintenanceModuleForTest(params: {
params.currentTasks.set(patch.taskId, next);
return next;
},
}));
return await import("./task-registry.maintenance.js");
});
}
async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs = 5) {
@@ -229,6 +217,7 @@ describe("task-registry", () => {
resetAgentRunContextForTest();
resetCronActiveJobsForTests();
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryMaintenanceRuntimeForTests();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
hoisted.sendMessageMock.mockReset();
@@ -1424,7 +1413,7 @@ describe("task-registry", () => {
lastEventAt: now,
};
const currentTasks = new Map([[snapshotTask.taskId, currentTask]]);
const { runTaskRegistryMaintenance } = await loadFreshTaskRegistryMaintenanceModuleForTest({
configureTaskRegistryMaintenanceRuntimeForTest({
currentTasks,
snapshotTasks: [staleTask],
});
@@ -1464,7 +1453,7 @@ describe("task-registry", () => {
cleanupAfter: now + 60_000,
};
const currentTasks = new Map([[snapshotTask.taskId, currentTask]]);
const { sweepTaskRegistry } = await loadFreshTaskRegistryMaintenanceModuleForTest({
configureTaskRegistryMaintenanceRuntimeForTest({
currentTasks,
snapshotTasks: [staleTask],
});
@@ -1775,122 +1764,110 @@ describe("task-registry", () => {
});
it("cancels ACP-backed tasks through the ACP session manager", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
const registry = await loadFreshTaskRegistryModulesForControlTest();
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
registry.resetTaskRegistryForTests();
try {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const task = registry.createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
const task = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:codex:acp:child",
runId: "run-cancel-acp",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
});
const result = await cancelTaskById({
cfg: {} as never,
taskId: task.taskId,
});
expect(hoisted.cancelSessionMock).toHaveBeenCalledWith(
expect.objectContaining({
cfg: {},
sessionKey: "agent:codex:acp:child",
reason: "task-cancel",
}),
);
expect(result).toMatchObject({
found: true,
cancelled: true,
task: expect.objectContaining({
taskId: task.taskId,
status: "cancelled",
error: "Cancelled by operator.",
}),
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:codex:acp:child",
runId: "run-cancel-acp",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
});
const result = await registry.cancelTaskById({
cfg: {} as never,
taskId: task.taskId,
});
expect(hoisted.cancelSessionMock).toHaveBeenCalledWith(
expect.objectContaining({
cfg: {},
sessionKey: "agent:codex:acp:child",
reason: "task-cancel",
content: "Background task cancelled: ACP background task (run run-canc).",
}),
);
expect(result).toMatchObject({
found: true,
cancelled: true,
task: expect.objectContaining({
taskId: task.taskId,
status: "cancelled",
error: "Cancelled by operator.",
}),
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:123",
content: "Background task cancelled: ACP background task (run run-canc).",
}),
),
);
} finally {
registry.resetTaskRegistryForTests();
}
),
);
});
});
it("cancels subagent-backed tasks through subagent control", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
const registry = await loadFreshTaskRegistryModulesForControlTest();
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
registry.resetTaskRegistryForTests();
try {
hoisted.killSubagentRunAdminMock.mockResolvedValue({
found: true,
killed: true,
});
hoisted.killSubagentRunAdminMock.mockResolvedValue({
found: true,
killed: true,
});
const task = registry.createTaskRecord({
runtime: "subagent",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
const task = createTaskRecord({
runtime: "subagent",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:worker:subagent:child",
runId: "run-cancel-subagent",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
});
const result = await cancelTaskById({
cfg: {} as never,
taskId: task.taskId,
});
expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith(
expect.objectContaining({
cfg: {},
sessionKey: "agent:worker:subagent:child",
}),
);
expect(result).toMatchObject({
found: true,
cancelled: true,
task: expect.objectContaining({
taskId: task.taskId,
status: "cancelled",
error: "Cancelled by operator.",
}),
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:worker:subagent:child",
runId: "run-cancel-subagent",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
});
const result = await registry.cancelTaskById({
cfg: {} as never,
taskId: task.taskId,
});
expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith(
expect.objectContaining({
cfg: {},
sessionKey: "agent:worker:subagent:child",
content: "Background task cancelled: Subagent task (run run-canc).",
}),
);
expect(result).toMatchObject({
found: true,
cancelled: true,
task: expect.objectContaining({
taskId: task.taskId,
status: "cancelled",
error: "Cancelled by operator.",
}),
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:123",
content: "Background task cancelled: Subagent task (run run-canc).",
}),
),
);
} finally {
registry.resetTaskRegistryForTests();
}
),
);
});
});
});