perf: narrow subagent test runtime seams

This commit is contained in:
Peter Steinberger
2026-04-18 18:32:59 +01:00
parent 4180e7cd59
commit e45a50c828
7 changed files with 126 additions and 86 deletions

View File

@@ -9,25 +9,21 @@ vi.mock("../gateway/call.js", () => ({
callGateway: (opts: unknown) => callGatewayMock(opts),
}));
vi.mock("../config/config.js", async () => {
const actual = await vi.importActual<typeof import("../config/config.js")>("../config/config.js");
return {
...actual,
loadConfig: () => ({
session: {
mainKey: "main",
scope: "per-sender",
agentToAgent: { maxPingPongTurns: 2 },
},
tools: {
// Keep sessions tools permissive in this suite; dedicated visibility tests cover defaults.
sessions: { visibility: "all" },
agentToAgent: { enabled: true },
},
}),
resolveGatewayPort: () => 18789,
};
});
// Replace the config module with a fixed, synchronous factory: the mock exposes only
// `loadConfig` (returning the static config below) and `resolveGatewayPort` (always 18789).
// NOTE(review): the factory does not spread the real module, so any other export of
// config.js is absent from the mock — confirm nothing loaded by this suite reads one.
vi.mock("../config/config.js", () => ({
loadConfig: () => ({
session: {
mainKey: "main",
scope: "per-sender",
agentToAgent: { maxPingPongTurns: 2 },
},
tools: {
// Keep sessions tools permissive in this suite; dedicated visibility tests cover defaults.
sessions: { visibility: "all" },
agentToAgent: { enabled: true },
},
}),
resolveGatewayPort: () => 18789,
}));
import "./test-helpers/fast-openclaw-tools-sessions.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";

View File

@@ -39,17 +39,6 @@ const hookRunnerMocks = vi.hoisted(() => ({
runSubagentEnded: vi.fn(async () => {}),
}));
vi.mock("./pi-embedded.js", async () => {
const actual = await vi.importActual<typeof import("./pi-embedded.js")>("./pi-embedded.js");
return {
...actual,
isEmbeddedPiRunActive: () => false,
isEmbeddedPiRunStreaming: () => false,
queueEmbeddedPiMessage: () => false,
waitForEmbeddedPiRunEnd: async () => true,
};
});
vi.mock("./tools/agent-step.js", () => ({
readLatestAssistantReply: async () => "done",
}));
@@ -76,7 +65,7 @@ const waitFor = async (label: string, predicate: () => boolean, timeoutMs = 30_0
() => {
expect(predicate(), label).toBe(true);
},
{ timeout: timeoutMs, interval: 20 },
{ timeout: timeoutMs, interval: 1 },
);
};

View File

@@ -0,0 +1,2 @@
// Re-exports the two runtime helpers (`clearSessionQueues`, `abortEmbeddedPiRun`) from a
// single module so a consumer can obtain both with one import.
// NOTE(review): presumably this is the module loaded lazily via
// `import("./subagent-control.runtime.js")` — confirm against the file name.
export { clearSessionQueues } from "../auto-reply/reply/queue.js";
export { abortEmbeddedPiRun } from "./pi-embedded-runner/runs.js";

View File

@@ -1,7 +1,7 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import * as sessions from "../config/sessions.js";
import type { CallGatewayOptions } from "../gateway/call.js";
@@ -19,6 +19,20 @@ import {
resetSubagentRegistryForTests,
} from "./subagent-registry.js";
/**
 * Installs subagent-control test dependencies through `__testing.setDepsForTest`.
 *
 * Two inert stand-ins are always supplied: `abortEmbeddedPiRun` reports `false` and
 * `clearSessionQueues` reports zero cleared items with no keys. Anything passed in
 * `overrides` is layered on top and wins over those stand-ins.
 *
 * @param overrides Optional dependency overrides for the current test.
 */
function setSubagentControlDepsForTest(
  overrides: Parameters<typeof __testing.setDepsForTest>[0] = {},
) {
  const abortEmbeddedPiRun = () => false;
  const clearSessionQueues = () => ({ followupCleared: 0, laneCleared: 0, keys: [] });
  __testing.setDepsForTest({ abortEmbeddedPiRun, clearSessionQueues, ...overrides });
}
// Before every test, install the helper's inert defaults (no overrides). Tests that need a
// custom dependency call the helper again with their own overrides.
beforeEach(() => {
setSubagentControlDepsForTest();
});
describe("sendControlledSubagentMessage", () => {
afterEach(() => {
resetSubagentRegistryForTests({ persist: false });
@@ -71,7 +85,7 @@ describe("sendControlledSubagentMessage", () => {
startedAt: Date.now() - 4_000,
});
__testing.setDepsForTest({
setSubagentControlDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "agent") {
throw new Error("gateway unavailable");
@@ -170,7 +184,7 @@ describe("sendControlledSubagentMessage", () => {
outcome: { status: "ok" },
});
__testing.setDepsForTest({
setSubagentControlDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "chat.history") {
return { messages: [] } as T;
@@ -245,7 +259,7 @@ describe("sendControlledSubagentMessage", () => {
outcome: { status: "ok" },
});
__testing.setDepsForTest({
setSubagentControlDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "chat.history") {
return { messages: [] } as T;
@@ -314,7 +328,7 @@ describe("sendControlledSubagentMessage", () => {
content: [{ type: "text", text: "older reply from a previous run" }],
};
__testing.setDepsForTest({
setSubagentControlDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "chat.history") {
historyCalls += 1;
@@ -1101,7 +1115,7 @@ describe("steerControlledSubagentRun", () => {
.spyOn(await import("./subagent-registry.js"), "replaceSubagentRunAfterSteer")
.mockReturnValue(false);
__testing.setDepsForTest({
setSubagentControlDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "agent.wait") {
return {} as T;
@@ -1153,7 +1167,7 @@ describe("steerControlledSubagentRun", () => {
});
it("rejects steering runs that are no longer tracked in the registry", async () => {
__testing.setDepsForTest({
setSubagentControlDepsForTest({
callGateway: async () => {
throw new Error("gateway should not be called");
},
@@ -1227,7 +1241,7 @@ describe("steerControlledSubagentRun", () => {
startedAt: Date.now() - 500,
});
__testing.setDepsForTest({
setSubagentControlDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "agent.wait") {
return {} as T;

View File

@@ -1,5 +1,5 @@
import crypto from "node:crypto";
import { clearSessionQueues } from "../auto-reply/reply/queue.js";
import type { ClearSessionQueueResult } from "../auto-reply/reply/queue.js";
import {
resolveSubagentLabel,
resolveSubagentTargetFromRuns,
@@ -15,7 +15,6 @@ import { formatErrorMessage } from "../infra/errors.js";
import { isSubagentSessionKey, parseAgentSessionKey } from "../routing/session-key.js";
import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js";
import { AGENT_LANE_SUBAGENT } from "./lanes.js";
import { abortEmbeddedPiRun } from "./pi-embedded-runner/runs.js";
import {
readLatestAssistantReplySnapshot,
waitForAgentRunAndReadUpdatedAssistantReply,
@@ -57,6 +56,8 @@ const SUBAGENT_REPLY_HISTORY_LIMIT = 50;
const steerRateLimit = new Map<string, number>();
type GatewayCaller = typeof callGateway;
type AbortEmbeddedPiRun = (sessionId: string) => boolean;
type ClearSessionQueues = (keys: Array<string | undefined>) => ClearSessionQueueResult;
const defaultSubagentControlDeps = {
callGateway,
@@ -64,8 +65,35 @@ const defaultSubagentControlDeps = {
let subagentControlDeps: {
callGateway: GatewayCaller;
abortEmbeddedPiRun?: AbortEmbeddedPiRun;
clearSessionQueues?: ClearSessionQueues;
} = defaultSubagentControlDeps;
/** Cached promise for the lazily imported runtime module; `null` until first requested. */
let subagentControlRuntimePromise: Promise<typeof import("./subagent-control.runtime.js")> | null =
  null;

/**
 * Returns the promise for the runtime module, starting the dynamic import on first use.
 *
 * The promise is stored as-is and handed back on every later call, so the import is
 * started at most once (a rejected promise is reused as well).
 */
function loadSubagentControlRuntime() {
  if (subagentControlRuntimePromise === null) {
    subagentControlRuntimePromise = import("./subagent-control.runtime.js");
  }
  return subagentControlRuntimePromise;
}
/**
 * Resolves the abort/clear-queue helpers used by the control paths.
 *
 * When both helpers are present on `subagentControlDeps` they are returned directly and
 * the runtime module is never loaded. Otherwise the runtime module is awaited and used
 * to fill in whichever helper is missing; an injected helper still takes precedence.
 *
 * @returns The pair of helpers to call.
 */
async function resolveSubagentControlRuntime(): Promise<{
  abortEmbeddedPiRun: AbortEmbeddedPiRun;
  clearSessionQueues: ClearSessionQueues;
}> {
  const injected = subagentControlDeps;
  if (injected.abortEmbeddedPiRun && injected.clearSessionQueues) {
    return {
      abortEmbeddedPiRun: injected.abortEmbeddedPiRun,
      clearSessionQueues: injected.clearSessionQueues,
    };
  }

  const lazyRuntime = await loadSubagentControlRuntime();
  // Re-read the deps after the await so the values current at this point are used.
  const current = subagentControlDeps;
  const abortEmbeddedPiRun = current.abortEmbeddedPiRun ?? lazyRuntime.abortEmbeddedPiRun;
  const clearSessionQueues = current.clearSessionQueues ?? lazyRuntime.clearSessionQueues;
  return { abortEmbeddedPiRun, clearSessionQueues };
}
export type ResolvedSubagentController = {
controllerSessionKey: string;
callerSessionKey: string;
@@ -152,8 +180,9 @@ async function killSubagentRun(params: {
cache: params.cache,
});
const sessionId = resolved.entry?.sessionId;
const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false;
const cleared = clearSessionQueues([childSessionKey, sessionId]);
const runtime = await resolveSubagentControlRuntime();
const aborted = sessionId ? runtime.abortEmbeddedPiRun(sessionId) : false;
const cleared = runtime.clearSessionQueues([childSessionKey, sessionId]);
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
logVerbose(
`subagents control kill: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
@@ -512,9 +541,11 @@ export async function steerControlledSubagentRun(params: {
: undefined;
if (sessionId) {
abortEmbeddedPiRun(sessionId);
const runtime = await resolveSubagentControlRuntime();
runtime.abortEmbeddedPiRun(sessionId);
}
const cleared = clearSessionQueues([params.entry.childSessionKey, sessionId]);
const runtime = await resolveSubagentControlRuntime();
const cleared = runtime.clearSessionQueues([params.entry.childSessionKey, sessionId]);
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
logVerbose(
`subagents control steer: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
@@ -709,7 +740,13 @@ export function resolveControlledSubagentTarget(
}
export const __testing = {
setDepsForTest(overrides?: Partial<{ callGateway: GatewayCaller }>) {
setDepsForTest(
overrides?: Partial<{
callGateway: GatewayCaller;
abortEmbeddedPiRun: AbortEmbeddedPiRun;
clearSessionQueues: ClearSessionQueues;
}>,
) {
subagentControlDeps = overrides
? {
...defaultSubagentControlDeps,

View File

@@ -118,6 +118,9 @@ describe("subagent registry steer restarts", () => {
/** Waits until a callback scheduled with `setImmediate` has run, then resolves. */
async function flushAnnounce(): Promise<void> {
  await new Promise<void>((done) => {
    setImmediate(done);
  });
}
/**
 * Polls `assertion` via `vi.waitFor` every 1 ms, for up to 1 second, until it stops
 * throwing. Resolves with no value.
 */
async function waitForRegistrySideEffect(assertion: () => void): Promise<void> {
  const polling = { interval: 1, timeout: 1_000 };
  await vi.waitFor(assertion, polling);
}
const withPendingAgentWait = async <T>(run: () => Promise<T>): Promise<T> => {
const callGateway = vi.mocked((await import("../gateway/call.js")).callGateway);
@@ -277,10 +280,10 @@ describe("subagent registry steer restarts", () => {
emitLifecycleEnd("run-new");
await vi.waitFor(() => {
await waitForRegistrySideEffect(() => {
expect(announceSpy).toHaveBeenCalledTimes(1);
});
await vi.waitFor(() => {
await waitForRegistrySideEffect(() => {
const matchingCalls = runSubagentEndedHookMock.mock.calls.filter((call) => {
const ctx = call[1] as { runId?: string } | undefined;
return ctx?.runId === "run-new";
@@ -318,7 +321,7 @@ describe("subagent registry steer restarts", () => {
expect(runSubagentEndedHookMock).not.toHaveBeenCalled();
resolveAnnounce(true);
await vi.waitFor(() => {
await waitForRegistrySideEffect(() => {
expect(runSubagentEndedHookMock).toHaveBeenCalledTimes(1);
});
expect(runSubagentEndedHookMock).toHaveBeenCalledWith(
@@ -413,7 +416,7 @@ describe("subagent registry steer restarts", () => {
emitLifecycleEnd("run-terminal-state-new");
await vi.waitFor(() => {
await waitForRegistrySideEffect(() => {
expect(runSubagentEndedHookMock).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-terminal-state-new",
@@ -571,26 +574,25 @@ describe("subagent registry steer restarts", () => {
expect(run?.outcome).toEqual({ status: "error", error: "manual kill" });
expect(run?.cleanupHandled).toBe(true);
expect(typeof run?.cleanupCompletedAt).toBe("number");
await vi.waitFor(() => {
expect(runSubagentEndedHookMock).toHaveBeenCalledWith(
{
targetSessionKey: childSessionKey,
targetKind: "subagent",
reason: "subagent-killed",
sendFarewell: true,
accountId: undefined,
runId: "run-killed",
endedAt: expect.any(Number),
outcome: "killed",
error: "manual kill",
},
{
runId: "run-killed",
childSessionKey,
requesterSessionKey: MAIN_REQUESTER_SESSION_KEY,
},
);
});
await flushAnnounce();
expect(runSubagentEndedHookMock).toHaveBeenCalledWith(
{
targetSessionKey: childSessionKey,
targetKind: "subagent",
reason: "subagent-killed",
sendFarewell: true,
accountId: undefined,
runId: "run-killed",
endedAt: expect.any(Number),
outcome: "killed",
error: "manual kill",
},
{
runId: "run-killed",
childSessionKey,
requesterSessionKey: MAIN_REQUESTER_SESSION_KEY,
},
);
});
it("treats a child session as inactive when only a stale older row is still unended", async () => {
@@ -679,7 +681,7 @@ describe("subagent registry steer restarts", () => {
});
emitLifecycleEnd("run-parent");
await vi.waitFor(() => {
await waitForRegistrySideEffect(() => {
const childRunIds = announceSpy.mock.calls.map(
(call) => ((call[0] ?? {}) as { childRunId?: string }).childRunId,
);
@@ -687,7 +689,7 @@ describe("subagent registry steer restarts", () => {
});
emitLifecycleEnd("run-child");
await vi.waitFor(() => {
await waitForRegistrySideEffect(() => {
const childRunIds = announceSpy.mock.calls.map(
(call) => ((call[0] ?? {}) as { childRunId?: string }).childRunId,
);

View File

@@ -4,6 +4,8 @@ import path from "node:path";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
const noop = () => {};
/**
 * `vi.waitFor` with a 1 ms polling interval and a 1 second timeout.
 *
 * @param callback Assertion or async check to retry.
 * @returns Whatever `vi.waitFor` returns for `callback`.
 */
function waitForFast<T>(callback: () => T | Promise<T>) {
  const fastPolling = { timeout: 1_000, interval: 1 };
  return vi.waitFor(callback, fastPolling);
}
const mocks = vi.hoisted(() => ({
callGateway: vi.fn(),
@@ -44,10 +46,8 @@ vi.mock("../infra/agent-events.js", () => ({
onAgentEvent: mocks.onAgentEvent,
}));
vi.mock("../config/config.js", async () => {
const actual = await vi.importActual<typeof import("../config/config.js")>("../config/config.js");
vi.mock("../config/config.js", () => {
return {
...actual,
loadConfig: mocks.loadConfig,
};
});
@@ -171,7 +171,7 @@ describe("subagent registry seam flow", () => {
cleanup: "delete",
});
await vi.waitFor(() => {
await waitForFast(() => {
expect(mocks.runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
});
@@ -295,10 +295,10 @@ describe("subagent registry seam flow", () => {
await Promise.resolve();
expect(mocks.runSubagentAnnounceFlow).not.toHaveBeenCalled();
await vi.waitFor(() => {
await waitForFast(() => {
expect(mocks.runSubagentEnded).toHaveBeenCalledTimes(1);
});
await vi.waitFor(() => {
await waitForFast(() => {
expect(mocks.onSubagentEnded).toHaveBeenCalledWith({
childSessionKey: "agent:main:subagent:child",
reason: "deleted",
@@ -347,7 +347,7 @@ describe("subagent registry seam flow", () => {
cleanup: "keep",
});
await vi.waitFor(() => {
await waitForFast(() => {
expect(
mod
.listSubagentRunsForRequester("agent:main:main")
@@ -361,7 +361,7 @@ describe("subagent registry seam flow", () => {
childRunId: "run-child-finished",
}),
);
await vi.waitFor(() => {
await waitForFast(() => {
expect(mocks.onSubagentEnded).toHaveBeenCalledWith({
childSessionKey: "agent:main:subagent:parent",
reason: "deleted",
@@ -397,7 +397,7 @@ describe("subagent registry seam flow", () => {
});
expect(updated).toBe(1);
await vi.waitFor(() => {
await waitForFast(() => {
expect(mocks.ensureRuntimePluginsLoaded).toHaveBeenCalledWith({
config: {
agents: { defaults: { subagents: { archiveAfterMinutes: 0 } } },
@@ -446,7 +446,7 @@ describe("subagent registry seam flow", () => {
.listSubagentRunsForRequester("agent:main:main")
.find((entry) => entry.runId === "run-killed-delete"),
).toBeUndefined();
await vi.waitFor(() => {
await waitForFast(() => {
expect(mocks.onSubagentEnded).toHaveBeenCalledWith({
childSessionKey: "agent:main:subagent:killed-delete",
reason: "deleted",
@@ -480,7 +480,7 @@ describe("subagent registry seam flow", () => {
});
expect(updated).toBe(1);
await vi.waitFor(async () => {
await waitForFast(async () => {
await expect(fs.access(attachmentsDir)).rejects.toMatchObject({ code: "ENOENT" });
});
});
@@ -515,10 +515,10 @@ describe("subagent registry seam flow", () => {
mod.releaseSubagentRun("run-release-delete");
await vi.waitFor(async () => {
await waitForFast(async () => {
await expect(fs.access(attachmentsDir)).rejects.toMatchObject({ code: "ENOENT" });
});
await vi.waitFor(() => {
await waitForFast(() => {
expect(mocks.onSubagentEnded).toHaveBeenCalledWith({
childSessionKey: "agent:main:subagent:release-delete",
reason: "released",
@@ -549,7 +549,7 @@ describe("subagent registry seam flow", () => {
mod.releaseSubagentRun("run-release-context-engine");
await vi.waitFor(() => {
await waitForFast(() => {
expect(mocks.onSubagentEnded).toHaveBeenCalledWith({
childSessionKey: "agent:main:session:child",
reason: "released",