test: stabilize cron and pairing shard hangs

This commit is contained in:
Peter Steinberger
2026-05-06 03:36:41 +01:00
parent 0bdba47a3e
commit ea391c6df2
2 changed files with 25 additions and 183 deletions

View File

@@ -1,5 +1,4 @@
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import {
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
@@ -7,184 +6,17 @@ import {
} from "../infra/heartbeat-wake.js";
import type { CronEvent, CronServiceDeps } from "./service.js";
import { CronService } from "./service.js";
import { createDeferred, createNoopLogger, installCronTestHooks } from "./service.test-harness.js";
import {
createCronStoreHarness,
createDeferred,
createNoopLogger,
installCronTestHooks,
} from "./service.test-harness.js";
const noopLogger = createNoopLogger();
installCronTestHooks({ logger: noopLogger });
type FakeFsEntry =
| { kind: "file"; content: string; mtimeMs: number }
| { kind: "dir"; mtimeMs: number };
const fsState = vi.hoisted(() => ({
entries: new Map<string, FakeFsEntry>(),
nowMs: 0,
fixtureCount: 0,
}));
const abs = (p: string) => path.resolve(p);
const fixturesRoot = abs(path.join("__openclaw_vitest__", "cron", "runs-one-shot"));
const isFixturePath = (p: string) => {
const resolved = abs(p);
const rootPrefix = `${fixturesRoot}${path.sep}`;
return resolved === fixturesRoot || resolved.startsWith(rootPrefix);
};
function bumpMtimeMs() {
fsState.nowMs += 1;
return fsState.nowMs;
}
function ensureDir(dirPath: string) {
let current = abs(dirPath);
while (true) {
if (!fsState.entries.has(current)) {
fsState.entries.set(current, { kind: "dir", mtimeMs: bumpMtimeMs() });
}
const parent = path.dirname(current);
if (parent === current) {
break;
}
current = parent;
}
}
function setFile(filePath: string, content: string) {
const resolved = abs(filePath);
ensureDir(path.dirname(resolved));
fsState.entries.set(resolved, { kind: "file", content, mtimeMs: bumpMtimeMs() });
}
async function makeStorePath() {
const dir = path.join(fixturesRoot, `case-${fsState.fixtureCount++}`);
ensureDir(dir);
const storePath = path.join(dir, "cron", "jobs.json");
ensureDir(path.dirname(storePath));
return { storePath, cleanup: async () => {} };
}
vi.mock("node:fs", async () => {
const actual = await vi.importActual<typeof import("node:fs")>("node:fs");
const pathMod = await import("node:path");
const absInMock = (p: string) => pathMod.resolve(p);
const isFixtureInMock = (p: string) => {
const resolved = absInMock(p);
const rootPrefix = `${absInMock(fixturesRoot)}${pathMod.sep}`;
return resolved === absInMock(fixturesRoot) || resolved.startsWith(rootPrefix);
};
const mkErr = (code: string, message: string) => Object.assign(new Error(message), { code });
const promises = {
...actual.promises,
mkdir: async (p: string) => {
if (!isFixtureInMock(p)) {
return await actual.promises.mkdir(p, { recursive: true });
}
ensureDir(p);
return undefined;
},
readFile: async (p: string) => {
if (!isFixtureInMock(p)) {
return await actual.promises.readFile(p, "utf-8");
}
const entry = fsState.entries.get(absInMock(p));
if (!entry || entry.kind !== "file") {
throw mkErr("ENOENT", `ENOENT: no such file or directory, open '${p}'`);
}
return entry.content;
},
writeFile: async (p: string, data: string | Uint8Array) => {
if (!isFixtureInMock(p)) {
return await actual.promises.writeFile(p, data, "utf-8");
}
const content = typeof data === "string" ? data : Buffer.from(data).toString("utf-8");
setFile(p, content);
},
rename: async (from: string, to: string) => {
if (!isFixtureInMock(from) || !isFixtureInMock(to)) {
return await actual.promises.rename(from, to);
}
const fromAbs = absInMock(from);
const toAbs = absInMock(to);
const entry = fsState.entries.get(fromAbs);
if (!entry || entry.kind !== "file") {
throw mkErr("ENOENT", `ENOENT: no such file or directory, rename '${from}' -> '${to}'`);
}
ensureDir(pathMod.dirname(toAbs));
fsState.entries.delete(fromAbs);
fsState.entries.set(toAbs, { ...entry, mtimeMs: bumpMtimeMs() });
},
copyFile: async (from: string, to: string) => {
if (!isFixtureInMock(from) || !isFixtureInMock(to)) {
return await actual.promises.copyFile(from, to);
}
const entry = fsState.entries.get(absInMock(from));
if (!entry || entry.kind !== "file") {
throw mkErr("ENOENT", `ENOENT: no such file or directory, copyfile '${from}' -> '${to}'`);
}
setFile(to, entry.content);
},
stat: async (p: string) => {
if (!isFixtureInMock(p)) {
return await actual.promises.stat(p);
}
const entry = fsState.entries.get(absInMock(p));
if (!entry) {
throw mkErr("ENOENT", `ENOENT: no such file or directory, stat '${p}'`);
}
return {
mtimeMs: entry.mtimeMs,
isDirectory: () => entry.kind === "dir",
isFile: () => entry.kind === "file",
};
},
access: async (p: string) => {
if (!isFixtureInMock(p)) {
return await actual.promises.access(p);
}
const entry = fsState.entries.get(absInMock(p));
if (!entry) {
throw mkErr("ENOENT", `ENOENT: no such file or directory, access '${p}'`);
}
},
unlink: async (p: string) => {
if (!isFixtureInMock(p)) {
return await actual.promises.unlink(p);
}
fsState.entries.delete(absInMock(p));
},
} as unknown as typeof actual.promises;
const wrapped = { ...actual, promises };
return { ...wrapped, default: wrapped };
});
vi.mock("node:fs/promises", async () => {
const actual = await vi.importActual<typeof import("node:fs/promises")>("node:fs/promises");
const wrapped = {
...actual,
mkdir: async (p: string, _opts?: unknown) => {
if (!isFixturePath(p)) {
return await actual.mkdir(p, { recursive: true });
}
ensureDir(p);
return undefined;
},
writeFile: async (p: string, data: string, _enc?: unknown) => {
if (!isFixturePath(p)) {
return await actual.writeFile(p, data, "utf-8");
}
setFile(p, data);
},
};
return { ...wrapped, default: wrapped };
});
beforeEach(() => {
fsState.entries.clear();
fsState.nowMs = 0;
ensureDir(fixturesRoot);
const { makeStorePath } = createCronStoreHarness({
prefix: "openclaw-cron-runs-one-shot-",
});
function createCronEventHarness() {
@@ -229,7 +61,6 @@ type CronHarnessOptions = {
};
async function createCronHarness(options: CronHarnessOptions = {}) {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeat = vi.fn();
@@ -377,6 +208,7 @@ function expectMainSystemEventPosted(enqueueSystemEvent: unknown, text: string)
}
async function stopCronAndCleanup(cron: CronService, store: { cleanup: () => Promise<void> }) {
await cron.status();
cron.stop();
await store.cleanup();
}
@@ -678,7 +510,6 @@ describe("CronService", () => {
});
it("rejects unsupported session/payload combinations", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const cron = createStartedCronService(
@@ -712,7 +543,6 @@ describe("CronService", () => {
}),
).rejects.toThrow(/isolated.*cron jobs require/);
cron.stop();
await store.cleanup();
await stopCronAndCleanup(cron, store);
});
});

View File

@@ -211,8 +211,20 @@ describe("device pairing tokens", () => {
},
baseDir,
);
const originalTs = first.request.ts;
await new Promise((resolve) => setTimeout(resolve, 20));
const originalTs = first.request.ts - 1_000;
const paths = resolvePairingPaths(baseDir, "devices");
const pendingById = JSON.parse(await readFile(paths.pendingPath, "utf8")) as Record<
string,
{ ts: number }
>;
const pending = pendingById[first.request.requestId];
expect(pending).toBeDefined();
if (!pending) {
throw new Error("expected pending pairing request");
}
pending.ts = originalTs;
await writeFile(paths.pendingPath, JSON.stringify(pendingById, null, 2));
const second = await requestDevicePairing(
{
deviceId: "device-1",