mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-05 04:30:24 +00:00
413 lines
14 KiB
TypeScript
413 lines
14 KiB
TypeScript
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
|
import * as sessions from "../config/sessions.js";
|
|
import * as gateway from "../gateway/call.js";
|
|
import * as sessionUtils from "../gateway/session-utils.fs.js";
|
|
import { recoverOrphanedSubagentSessions } from "./subagent-orphan-recovery.js";
|
|
import * as subagentRegistry from "./subagent-registry.js";
|
|
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
|
|
|
// Mock dependencies before importing the module under test
|
|
vi.mock("../config/config.js", () => ({
|
|
loadConfig: vi.fn(() => ({
|
|
session: { store: undefined },
|
|
})),
|
|
}));
|
|
|
|
vi.mock("../config/sessions.js", () => ({
|
|
loadSessionStore: vi.fn(() => ({})),
|
|
resolveAgentIdFromSessionKey: vi.fn(() => "main"),
|
|
resolveStorePath: vi.fn(() => "/tmp/test-sessions.json"),
|
|
updateSessionStore: vi.fn(async () => {}),
|
|
}));
|
|
|
|
vi.mock("../gateway/call.js", () => ({
|
|
callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
|
|
}));
|
|
|
|
vi.mock("../gateway/session-utils.fs.js", () => ({
|
|
readSessionMessages: vi.fn(() => []),
|
|
}));
|
|
|
|
vi.mock("./subagent-registry.js", () => ({
|
|
replaceSubagentRunAfterSteer: vi.fn(() => true),
|
|
}));
|
|
|
|
function createTestRunRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
|
|
return {
|
|
runId: "run-1",
|
|
childSessionKey: "agent:main:subagent:test-session-1",
|
|
requesterSessionKey: "agent:main:signal:direct:+1234567890",
|
|
requesterDisplayKey: "main",
|
|
task: "Test task: implement feature X",
|
|
cleanup: "delete",
|
|
createdAt: Date.now() - 60_000,
|
|
startedAt: Date.now() - 55_000,
|
|
...overrides,
|
|
};
|
|
}
|
|
|
|
function createActiveRuns(...runs: SubagentRunRecord[]) {
|
|
return new Map(runs.map((run) => [run.runId, run] satisfies [string, SubagentRunRecord]));
|
|
}
|
|
|
|
async function expectSkippedRecovery(store: ReturnType<typeof sessions.loadSessionStore>) {
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue(store);
|
|
|
|
const result = await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => createActiveRuns(createTestRunRecord()),
|
|
});
|
|
|
|
expect(result.recovered).toBe(0);
|
|
expect(result.skipped).toBe(1);
|
|
expect(gateway.callGateway).not.toHaveBeenCalled();
|
|
}
|
|
|
|
describe("subagent-orphan-recovery", () => {
|
|
beforeEach(() => {
|
|
vi.clearAllMocks();
|
|
});
|
|
|
|
afterEach(() => {
|
|
vi.restoreAllMocks();
|
|
});
|
|
|
|
it("recovers orphaned sessions with abortedLastRun=true", async () => {
|
|
const sessionEntry = {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
};
|
|
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:test-session-1": sessionEntry,
|
|
});
|
|
|
|
const run = createTestRunRecord();
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set("run-1", run);
|
|
|
|
const result = await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => activeRuns,
|
|
});
|
|
|
|
expect(result.recovered).toBe(1);
|
|
expect(result.failed).toBe(0);
|
|
expect(result.skipped).toBe(0);
|
|
|
|
// Should have called callGateway to resume the session
|
|
expect(gateway.callGateway).toHaveBeenCalledOnce();
|
|
const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
|
|
const opts = callArgs[0];
|
|
expect(opts.method).toBe("agent");
|
|
const params = opts.params as Record<string, unknown>;
|
|
expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
|
|
expect(params.message).toContain("gateway reload");
|
|
expect(params.message).toContain("Test task: implement feature X");
|
|
expect(subagentRegistry.replaceSubagentRunAfterSteer).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
previousRunId: "run-1",
|
|
nextRunId: "test-run-id",
|
|
fallback: run,
|
|
}),
|
|
);
|
|
});
|
|
|
|
it("skips sessions that are not aborted", async () => {
|
|
await expectSkippedRecovery({
|
|
"agent:main:subagent:test-session-1": {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: false,
|
|
},
|
|
});
|
|
});
|
|
|
|
it("skips runs that have already ended", async () => {
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set(
|
|
"run-1",
|
|
createTestRunRecord({
|
|
endedAt: Date.now() - 1000,
|
|
}),
|
|
);
|
|
|
|
const result = await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => activeRuns,
|
|
});
|
|
|
|
expect(result.recovered).toBe(0);
|
|
expect(gateway.callGateway).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("handles multiple orphaned sessions", async () => {
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:session-a": {
|
|
sessionId: "id-a",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
},
|
|
"agent:main:subagent:session-b": {
|
|
sessionId: "id-b",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
},
|
|
"agent:main:subagent:session-c": {
|
|
sessionId: "id-c",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: false,
|
|
},
|
|
});
|
|
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set(
|
|
"run-a",
|
|
createTestRunRecord({
|
|
runId: "run-a",
|
|
childSessionKey: "agent:main:subagent:session-a",
|
|
task: "Task A",
|
|
}),
|
|
);
|
|
activeRuns.set(
|
|
"run-b",
|
|
createTestRunRecord({
|
|
runId: "run-b",
|
|
childSessionKey: "agent:main:subagent:session-b",
|
|
task: "Task B",
|
|
}),
|
|
);
|
|
activeRuns.set(
|
|
"run-c",
|
|
createTestRunRecord({
|
|
runId: "run-c",
|
|
childSessionKey: "agent:main:subagent:session-c",
|
|
task: "Task C",
|
|
}),
|
|
);
|
|
|
|
const result = await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => activeRuns,
|
|
});
|
|
|
|
expect(result.recovered).toBe(2);
|
|
expect(result.skipped).toBe(1);
|
|
expect(gateway.callGateway).toHaveBeenCalledTimes(2);
|
|
});
|
|
|
|
it("handles callGateway failure gracefully and preserves abortedLastRun flag", async () => {
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:test-session-1": {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
},
|
|
});
|
|
|
|
vi.mocked(gateway.callGateway).mockRejectedValue(new Error("gateway unavailable"));
|
|
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set("run-1", createTestRunRecord());
|
|
|
|
const result = await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => activeRuns,
|
|
});
|
|
|
|
expect(result.recovered).toBe(0);
|
|
expect(result.failed).toBe(1);
|
|
|
|
// abortedLastRun flag should NOT be cleared on failure,
|
|
// so the next restart can retry the recovery
|
|
expect(sessions.updateSessionStore).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("returns empty results when no active runs exist", async () => {
|
|
const result = await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => new Map(),
|
|
});
|
|
|
|
expect(result.recovered).toBe(0);
|
|
expect(result.failed).toBe(0);
|
|
expect(result.skipped).toBe(0);
|
|
});
|
|
|
|
it("skips sessions with missing session entry in store", async () => {
|
|
await expectSkippedRecovery({});
|
|
});
|
|
|
|
it("clears abortedLastRun flag after successful resume", async () => {
|
|
// Ensure callGateway succeeds for this test
|
|
vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "resumed-run" } as never);
|
|
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:test-session-1": {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
},
|
|
});
|
|
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set("run-1", createTestRunRecord());
|
|
|
|
await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => activeRuns,
|
|
});
|
|
|
|
// updateSessionStore should have been called AFTER successful resume to clear the flag
|
|
expect(sessions.updateSessionStore).toHaveBeenCalledOnce();
|
|
const calls = vi.mocked(sessions.updateSessionStore).mock.calls;
|
|
const [storePath, updater] = calls[0];
|
|
expect(storePath).toBe("/tmp/test-sessions.json");
|
|
|
|
// Simulate the updater to verify it clears abortedLastRun
|
|
const mockStore: Record<string, { abortedLastRun?: boolean; updatedAt?: number }> = {
|
|
"agent:main:subagent:test-session-1": {
|
|
abortedLastRun: true,
|
|
updatedAt: 0,
|
|
},
|
|
};
|
|
(updater as (store: Record<string, unknown>) => void)(mockStore);
|
|
expect(mockStore["agent:main:subagent:test-session-1"]?.abortedLastRun).toBe(false);
|
|
});
|
|
|
|
it("truncates long task descriptions in resume message", async () => {
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:test-session-1": {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
},
|
|
});
|
|
|
|
const longTask = "x".repeat(5000);
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set("run-1", createTestRunRecord({ task: longTask }));
|
|
|
|
await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => activeRuns,
|
|
});
|
|
|
|
const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
|
|
const opts = callArgs[0];
|
|
const params = opts.params as Record<string, unknown>;
|
|
const message = params.message as string;
|
|
// Message should contain truncated task (2000 chars + "...")
|
|
expect(message.length).toBeLessThan(5000);
|
|
expect(message).toContain("...");
|
|
});
|
|
|
|
it("includes last human message in resume when available", async () => {
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:test-session-1": {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
sessionFile: "session-abc.jsonl",
|
|
},
|
|
});
|
|
|
|
vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
|
|
{ role: "user", content: [{ type: "text", text: "Please build feature Y" }] },
|
|
{ role: "assistant", content: [{ type: "text", text: "Working on it..." }] },
|
|
{ role: "user", content: [{ type: "text", text: "Also add tests for it" }] },
|
|
{ role: "assistant", content: [{ type: "text", text: "Sure, adding tests now." }] },
|
|
]);
|
|
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set("run-1", createTestRunRecord());
|
|
|
|
await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
|
|
|
|
const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
|
|
const params = callArgs[0].params as Record<string, unknown>;
|
|
const message = params.message as string;
|
|
expect(message).toContain("Also add tests for it");
|
|
expect(message).toContain("last message from the user");
|
|
});
|
|
|
|
it("adds config change hint when assistant messages reference config modifications", async () => {
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:test-session-1": {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
},
|
|
});
|
|
|
|
vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
|
|
{ role: "user", content: "Update the config" },
|
|
{ role: "assistant", content: "I've modified openclaw.json to add the new setting." },
|
|
]);
|
|
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set("run-1", createTestRunRecord());
|
|
|
|
await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
|
|
|
|
const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
|
|
const params = callArgs[0].params as Record<string, unknown>;
|
|
const message = params.message as string;
|
|
expect(message).toContain("config changes from your previous run were already applied");
|
|
});
|
|
|
|
it("prevents duplicate resume when updateSessionStore fails", async () => {
|
|
vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never);
|
|
vi.mocked(sessions.updateSessionStore).mockRejectedValue(new Error("write failed"));
|
|
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:test-session-1": {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
},
|
|
});
|
|
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set("run-1", createTestRunRecord());
|
|
activeRuns.set(
|
|
"run-2",
|
|
createTestRunRecord({
|
|
runId: "run-2",
|
|
}),
|
|
);
|
|
|
|
const result = await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
|
|
|
|
expect(result.recovered).toBe(1);
|
|
expect(result.skipped).toBe(1);
|
|
expect(gateway.callGateway).toHaveBeenCalledOnce();
|
|
});
|
|
|
|
it("does not retry a session after the gateway accepted resume but run remap failed", async () => {
|
|
vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never);
|
|
vi.mocked(subagentRegistry.replaceSubagentRunAfterSteer).mockReturnValue(false);
|
|
|
|
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
|
"agent:main:subagent:test-session-1": {
|
|
sessionId: "session-abc",
|
|
updatedAt: Date.now(),
|
|
abortedLastRun: true,
|
|
},
|
|
});
|
|
|
|
const activeRuns = new Map<string, SubagentRunRecord>();
|
|
activeRuns.set("run-1", createTestRunRecord());
|
|
const resumedSessionKeys = new Set<string>();
|
|
|
|
const first = await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => activeRuns,
|
|
resumedSessionKeys,
|
|
});
|
|
const second = await recoverOrphanedSubagentSessions({
|
|
getActiveRuns: () => activeRuns,
|
|
resumedSessionKeys,
|
|
});
|
|
|
|
expect(first.recovered).toBe(1);
|
|
expect(first.failed).toBe(0);
|
|
expect(second.recovered).toBe(0);
|
|
expect(second.skipped).toBe(1);
|
|
expect(gateway.callGateway).toHaveBeenCalledOnce();
|
|
expect(sessions.updateSessionStore).toHaveBeenCalledOnce();
|
|
});
|
|
});
|