mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-25 16:12:13 +00:00
test(agents): split subagent persistence restart case
This commit is contained in:
179
src/agents/subagent-registry.persistence.resume.test.ts
Normal file
179
src/agents/subagent-registry.persistence.resume.test.ts
Normal file
@@ -0,0 +1,179 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import "./subagent-registry.mocks.shared.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
drainSessionStoreLockQueuesForTest,
|
||||
} from "../config/sessions/store.js";
|
||||
import { captureEnv } from "../test-utils/env.js";
|
||||
|
||||
const { announceSpy } = vi.hoisted(() => ({
|
||||
announceSpy: vi.fn(async () => true),
|
||||
}));
|
||||
vi.mock("./subagent-announce.js", () => ({
|
||||
runSubagentAnnounceFlow: announceSpy,
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-orphan-recovery.js", () => ({
|
||||
scheduleOrphanRecovery: vi.fn(),
|
||||
}));
|
||||
|
||||
let initSubagentRegistry: typeof import("./subagent-registry.js").initSubagentRegistry;
|
||||
let listSubagentRunsForRequester: typeof import("./subagent-registry.js").listSubagentRunsForRequester;
|
||||
let registerSubagentRun: typeof import("./subagent-registry.js").registerSubagentRun;
|
||||
let resetSubagentRegistryForTests: typeof import("./subagent-registry.js").resetSubagentRegistryForTests;
|
||||
|
||||
async function loadSubagentRegistryModules(): Promise<void> {
|
||||
vi.resetModules();
|
||||
({
|
||||
initSubagentRegistry,
|
||||
listSubagentRunsForRequester,
|
||||
registerSubagentRun,
|
||||
resetSubagentRegistryForTests,
|
||||
} = await import("./subagent-registry.js"));
|
||||
}
|
||||
|
||||
describe("subagent registry persistence resume", () => {
|
||||
const envSnapshot = captureEnv(["OPENCLAW_STATE_DIR"]);
|
||||
let tempStateDir: string | null = null;
|
||||
|
||||
const resolveSessionStorePath = (stateDir: string, agentId: string) =>
|
||||
path.join(stateDir, "agents", agentId, "sessions", "sessions.json");
|
||||
|
||||
const readSessionStore = async (storePath: string) => {
|
||||
try {
|
||||
const raw = await fs.readFile(storePath, "utf8");
|
||||
const parsed = JSON.parse(raw) as unknown;
|
||||
if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
|
||||
return parsed as Record<string, Record<string, unknown>>;
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
return {} as Record<string, Record<string, unknown>>;
|
||||
};
|
||||
|
||||
const writeChildSessionEntry = async (params: {
|
||||
sessionKey: string;
|
||||
sessionId?: string;
|
||||
updatedAt?: number;
|
||||
}) => {
|
||||
if (!tempStateDir) {
|
||||
throw new Error("tempStateDir not initialized");
|
||||
}
|
||||
const storePath = resolveSessionStorePath(tempStateDir, "main");
|
||||
const store = await readSessionStore(storePath);
|
||||
store[params.sessionKey] = {
|
||||
...store[params.sessionKey],
|
||||
sessionId: params.sessionId ?? `sess-${Date.now()}`,
|
||||
updatedAt: params.updatedAt ?? Date.now(),
|
||||
};
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, `${JSON.stringify(store)}\n`, "utf8");
|
||||
return storePath;
|
||||
};
|
||||
|
||||
const flushQueuedRegistryWork = async () => {
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await new Promise((resolve) => setTimeout(resolve, 25));
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
await loadSubagentRegistryModules();
|
||||
const { callGateway } = await import("../gateway/call.js");
|
||||
const { onAgentEvent } = await import("../infra/agent-events.js");
|
||||
vi.mocked(callGateway).mockReset();
|
||||
vi.mocked(callGateway).mockResolvedValue({
|
||||
status: "ok",
|
||||
startedAt: 111,
|
||||
endedAt: 222,
|
||||
});
|
||||
vi.mocked(onAgentEvent).mockReset();
|
||||
vi.mocked(onAgentEvent).mockReturnValue(() => undefined);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
announceSpy.mockClear();
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
await drainSessionStoreLockQueuesForTest();
|
||||
clearSessionStoreCacheForTest();
|
||||
if (tempStateDir) {
|
||||
await fs.rm(tempStateDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });
|
||||
tempStateDir = null;
|
||||
}
|
||||
envSnapshot.restore();
|
||||
});
|
||||
|
||||
it("persists runs to disk and resumes after restart", async () => {
|
||||
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-"));
|
||||
process.env.OPENCLAW_STATE_DIR = tempStateDir;
|
||||
|
||||
const { callGateway } = await import("../gateway/call.js");
|
||||
let releaseInitialWait:
|
||||
| ((value: { status: "ok"; startedAt: number; endedAt: number }) => void)
|
||||
| undefined;
|
||||
vi.mocked(callGateway)
|
||||
.mockImplementationOnce(
|
||||
async () =>
|
||||
await new Promise((resolve) => {
|
||||
releaseInitialWait = resolve as typeof releaseInitialWait;
|
||||
}),
|
||||
)
|
||||
.mockResolvedValueOnce({
|
||||
status: "ok",
|
||||
startedAt: 111,
|
||||
endedAt: 222,
|
||||
});
|
||||
|
||||
registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionKey: "agent:main:subagent:test",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterOrigin: { channel: " whatsapp ", accountId: " acct-main " },
|
||||
requesterDisplayKey: "main",
|
||||
task: "do the thing",
|
||||
cleanup: "keep",
|
||||
});
|
||||
await writeChildSessionEntry({
|
||||
sessionKey: "agent:main:subagent:test",
|
||||
sessionId: "sess-test",
|
||||
});
|
||||
|
||||
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
|
||||
const raw = await fs.readFile(registryPath, "utf8");
|
||||
const parsed = JSON.parse(raw) as { runs?: Record<string, unknown> };
|
||||
expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1");
|
||||
const run = parsed.runs?.["run-1"] as
|
||||
| {
|
||||
requesterOrigin?: { channel?: string; accountId?: string };
|
||||
}
|
||||
| undefined;
|
||||
expect(run).toBeDefined();
|
||||
if (run) {
|
||||
expect("requesterAccountId" in run).toBe(false);
|
||||
expect("requesterChannel" in run).toBe(false);
|
||||
}
|
||||
expect(run?.requesterOrigin?.channel).toBe("whatsapp");
|
||||
expect(run?.requesterOrigin?.accountId).toBe("acct-main");
|
||||
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
initSubagentRegistry();
|
||||
releaseInitialWait?.({
|
||||
status: "ok",
|
||||
startedAt: 111,
|
||||
endedAt: 222,
|
||||
});
|
||||
|
||||
await flushQueuedRegistryWork();
|
||||
|
||||
expect(announceSpy).not.toHaveBeenCalled();
|
||||
|
||||
const restored = listSubagentRunsForRequester("agent:main:main")[0];
|
||||
expect(restored?.childSessionKey).toBe("agent:main:subagent:test");
|
||||
expect(restored?.requesterOrigin?.channel).toBe("whatsapp");
|
||||
expect(restored?.requesterOrigin?.accountId).toBe("acct-main");
|
||||
});
|
||||
});
|
||||
@@ -16,13 +16,16 @@ vi.mock("./subagent-announce.js", () => ({
|
||||
runSubagentAnnounceFlow: announceSpy,
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-orphan-recovery.js", () => ({
|
||||
scheduleOrphanRecovery: vi.fn(),
|
||||
}));
|
||||
|
||||
let addSubagentRunForTests: typeof import("./subagent-registry.js").addSubagentRunForTests;
|
||||
let clearSubagentRunSteerRestart: typeof import("./subagent-registry.js").clearSubagentRunSteerRestart;
|
||||
let getSubagentRunByChildSessionKey: typeof import("./subagent-registry.js").getSubagentRunByChildSessionKey;
|
||||
let getLatestSubagentRunByChildSessionKey: typeof import("./subagent-registry.js").getLatestSubagentRunByChildSessionKey;
|
||||
let initSubagentRegistry: typeof import("./subagent-registry.js").initSubagentRegistry;
|
||||
let listSubagentRunsForRequester: typeof import("./subagent-registry.js").listSubagentRunsForRequester;
|
||||
let registerSubagentRun: typeof import("./subagent-registry.js").registerSubagentRun;
|
||||
let resetSubagentRegistryForTests: typeof import("./subagent-registry.js").resetSubagentRegistryForTests;
|
||||
let loadSubagentRegistryFromDisk: typeof import("./subagent-registry.store.js").loadSubagentRegistryFromDisk;
|
||||
|
||||
@@ -35,7 +38,6 @@ async function loadSubagentRegistryModules(): Promise<void> {
|
||||
getSubagentRunByChildSessionKey,
|
||||
initSubagentRegistry,
|
||||
listSubagentRunsForRequester,
|
||||
registerSubagentRun,
|
||||
resetSubagentRegistryForTests,
|
||||
} = await import("./subagent-registry.js"));
|
||||
({ loadSubagentRegistryFromDisk } = await import("./subagent-registry.store.js"));
|
||||
@@ -208,79 +210,6 @@ describe("subagent registry persistence", () => {
|
||||
envSnapshot.restore();
|
||||
});
|
||||
|
||||
it("persists runs to disk and resumes after restart", async () => {
|
||||
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-"));
|
||||
process.env.OPENCLAW_STATE_DIR = tempStateDir;
|
||||
|
||||
const { callGateway } = await import("../gateway/call.js");
|
||||
let releaseInitialWait:
|
||||
| ((value: { status: "ok"; startedAt: number; endedAt: number }) => void)
|
||||
| undefined;
|
||||
vi.mocked(callGateway)
|
||||
.mockImplementationOnce(
|
||||
async () =>
|
||||
await new Promise((resolve) => {
|
||||
releaseInitialWait = resolve as typeof releaseInitialWait;
|
||||
}),
|
||||
)
|
||||
.mockResolvedValueOnce({
|
||||
status: "ok",
|
||||
startedAt: 111,
|
||||
endedAt: 222,
|
||||
});
|
||||
|
||||
registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionKey: "agent:main:subagent:test",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterOrigin: { channel: " whatsapp ", accountId: " acct-main " },
|
||||
requesterDisplayKey: "main",
|
||||
task: "do the thing",
|
||||
cleanup: "keep",
|
||||
});
|
||||
await writeChildSessionEntry({
|
||||
sessionKey: "agent:main:subagent:test",
|
||||
sessionId: "sess-test",
|
||||
});
|
||||
|
||||
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
|
||||
const raw = await fs.readFile(registryPath, "utf8");
|
||||
const parsed = JSON.parse(raw) as { runs?: Record<string, unknown> };
|
||||
expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1");
|
||||
const run = parsed.runs?.["run-1"] as
|
||||
| {
|
||||
requesterOrigin?: { channel?: string; accountId?: string };
|
||||
}
|
||||
| undefined;
|
||||
expect(run).toBeDefined();
|
||||
if (run) {
|
||||
expect("requesterAccountId" in run).toBe(false);
|
||||
expect("requesterChannel" in run).toBe(false);
|
||||
}
|
||||
expect(run?.requesterOrigin?.channel).toBe("whatsapp");
|
||||
expect(run?.requesterOrigin?.accountId).toBe("acct-main");
|
||||
|
||||
// Simulate a process restart: module re-import should load persisted runs
|
||||
// and preserve requester origin for non-completion-message runs.
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
initSubagentRegistry();
|
||||
releaseInitialWait?.({
|
||||
status: "ok",
|
||||
startedAt: 111,
|
||||
endedAt: 222,
|
||||
});
|
||||
|
||||
// allow queued async wait/cleanup to execute
|
||||
await flushQueuedRegistryWork();
|
||||
|
||||
expect(announceSpy).not.toHaveBeenCalled();
|
||||
|
||||
const restored = listSubagentRunsForRequester("agent:main:main")[0];
|
||||
expect(restored?.childSessionKey).toBe("agent:main:subagent:test");
|
||||
expect(restored?.requesterOrigin?.channel).toBe("whatsapp");
|
||||
expect(restored?.requesterOrigin?.accountId).toBe("acct-main");
|
||||
});
|
||||
|
||||
it("persists completed subagent timing into the child session entry", async () => {
|
||||
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-"));
|
||||
process.env.OPENCLAW_STATE_DIR = tempStateDir;
|
||||
|
||||
Reference in New Issue
Block a user