diff --git a/src/agents/openclaw-tools.sessions.test.ts b/src/agents/openclaw-tools.sessions.test.ts index b8b63bec238..097e2049bc1 100644 --- a/src/agents/openclaw-tools.sessions.test.ts +++ b/src/agents/openclaw-tools.sessions.test.ts @@ -9,25 +9,21 @@ vi.mock("../gateway/call.js", () => ({ callGateway: (opts: unknown) => callGatewayMock(opts), })); -vi.mock("../config/config.js", async () => { - const actual = await vi.importActual("../config/config.js"); - return { - ...actual, - loadConfig: () => ({ - session: { - mainKey: "main", - scope: "per-sender", - agentToAgent: { maxPingPongTurns: 2 }, - }, - tools: { - // Keep sessions tools permissive in this suite; dedicated visibility tests cover defaults. - sessions: { visibility: "all" }, - agentToAgent: { enabled: true }, - }, - }), - resolveGatewayPort: () => 18789, - }; -}); +vi.mock("../config/config.js", () => ({ + loadConfig: () => ({ + session: { + mainKey: "main", + scope: "per-sender", + agentToAgent: { maxPingPongTurns: 2 }, + }, + tools: { + // Keep sessions tools permissive in this suite; dedicated visibility tests cover defaults. + sessions: { visibility: "all" }, + agentToAgent: { enabled: true }, + }, + }), + resolveGatewayPort: () => 18789, +})); import "./test-helpers/fast-openclaw-tools-sessions.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.test.ts index 754744800d5..b28ccadac31 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.test.ts @@ -39,17 +39,6 @@ const hookRunnerMocks = vi.hoisted(() => ({ runSubagentEnded: vi.fn(async () => {}), })); -vi.mock("./pi-embedded.js", async () => { - const actual = await vi.importActual("./pi-embedded.js"); - return { - ...actual, - isEmbeddedPiRunActive: () => false, - isEmbeddedPiRunStreaming: () => false, - queueEmbeddedPiMessage: () => false, - waitForEmbeddedPiRunEnd: async () => true, - }; -}); - vi.mock("./tools/agent-step.js", () => ({ readLatestAssistantReply: async () => "done", })); @@ -76,7 +65,7 @@ const waitFor = async (label: string, predicate: () => boolean, timeoutMs = 30_0 () => { expect(predicate(), label).toBe(true); }, - { timeout: timeoutMs, interval: 20 }, + { timeout: timeoutMs, interval: 1 }, ); }; diff --git a/src/agents/subagent-control.runtime.ts b/src/agents/subagent-control.runtime.ts new file mode 100644 index 00000000000..bffb3c46024 --- /dev/null +++ b/src/agents/subagent-control.runtime.ts @@ -0,0 +1,2 @@ +export { clearSessionQueues } from "../auto-reply/reply/queue.js"; +export { abortEmbeddedPiRun } from "./pi-embedded-runner/runs.js"; diff --git a/src/agents/subagent-control.test.ts b/src/agents/subagent-control.test.ts index 558e4d96f40..4e42309beb2 100644 --- a/src/agents/subagent-control.test.ts +++ b/src/agents/subagent-control.test.ts @@ -1,7 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import * as sessions from "../config/sessions.js"; import type { CallGatewayOptions } from "../gateway/call.js"; @@ -19,6 +19,20 @@ import { resetSubagentRegistryForTests, } from "./subagent-registry.js"; +function setSubagentControlDepsForTest( + overrides: Parameters[0] = {}, +) { + __testing.setDepsForTest({ + abortEmbeddedPiRun: () => false, + clearSessionQueues: () => ({ followupCleared: 0, laneCleared: 0, keys: [] }), + ...overrides, + }); +} + +beforeEach(() => { + setSubagentControlDepsForTest(); +}); + describe("sendControlledSubagentMessage", () => { afterEach(() => { resetSubagentRegistryForTests({ persist: false }); @@ -71,7 +85,7 @@ describe("sendControlledSubagentMessage", () => { startedAt: Date.now() - 4_000, }); - __testing.setDepsForTest({ + setSubagentControlDepsForTest({ callGateway: async >(request: CallGatewayOptions) => { if (request.method === "agent") { throw new Error("gateway unavailable"); @@ -170,7 +184,7 @@ describe("sendControlledSubagentMessage", () => { outcome: { status: "ok" }, }); - __testing.setDepsForTest({ + setSubagentControlDepsForTest({ callGateway: async >(request: CallGatewayOptions) => { if (request.method === "chat.history") { return { messages: [] } as T; @@ -245,7 +259,7 @@ describe("sendControlledSubagentMessage", () => { outcome: { status: "ok" }, }); - __testing.setDepsForTest({ + setSubagentControlDepsForTest({ callGateway: async >(request: CallGatewayOptions) => { if (request.method === "chat.history") { return { messages: [] } as T; @@ -314,7 +328,7 @@ describe("sendControlledSubagentMessage", () => { content: [{ type: "text", text: "older reply from a previous run" }], }; - __testing.setDepsForTest({ + setSubagentControlDepsForTest({ callGateway: async >(request: CallGatewayOptions) => { if (request.method === "chat.history") { historyCalls += 1; @@ -1101,7 +1115,7 @@ describe("steerControlledSubagentRun", () => { .spyOn(await import("./subagent-registry.js"), "replaceSubagentRunAfterSteer") .mockReturnValue(false); - __testing.setDepsForTest({ + setSubagentControlDepsForTest({ callGateway: async >(request: CallGatewayOptions) => { if (request.method === "agent.wait") { return {} as T; @@ -1153,7 +1167,7 @@ describe("steerControlledSubagentRun", () => { }); it("rejects steering runs that are no longer tracked in the registry", async () => { - __testing.setDepsForTest({ + setSubagentControlDepsForTest({ callGateway: async () => { throw new Error("gateway should not be called"); }, @@ -1227,7 +1241,7 @@ describe("steerControlledSubagentRun", () => { startedAt: Date.now() - 500, }); - __testing.setDepsForTest({ + setSubagentControlDepsForTest({ callGateway: async >(request: CallGatewayOptions) => { if (request.method === "agent.wait") { return {} as T; diff --git a/src/agents/subagent-control.ts b/src/agents/subagent-control.ts index 62bd74b79d5..48b4e499484 100644 --- a/src/agents/subagent-control.ts +++ b/src/agents/subagent-control.ts @@ -1,5 +1,5 @@ import crypto from "node:crypto"; -import { clearSessionQueues } from "../auto-reply/reply/queue.js"; +import type { ClearSessionQueueResult } from "../auto-reply/reply/queue.js"; import { resolveSubagentLabel, resolveSubagentTargetFromRuns, @@ -15,7 +15,6 @@ import { formatErrorMessage } from "../infra/errors.js"; import { isSubagentSessionKey, parseAgentSessionKey } from "../routing/session-key.js"; import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js"; import { AGENT_LANE_SUBAGENT } from "./lanes.js"; -import { abortEmbeddedPiRun } from "./pi-embedded-runner/runs.js"; import { readLatestAssistantReplySnapshot, waitForAgentRunAndReadUpdatedAssistantReply, @@ -57,6 +56,8 @@ const SUBAGENT_REPLY_HISTORY_LIMIT = 50; const steerRateLimit = new Map(); type GatewayCaller = typeof callGateway; +type AbortEmbeddedPiRun = (sessionId: string) => boolean; +type ClearSessionQueues = (keys: Array) => ClearSessionQueueResult; const defaultSubagentControlDeps = { callGateway, @@ -64,8 +65,35 @@ const defaultSubagentControlDeps = { let subagentControlDeps: { callGateway: GatewayCaller; + abortEmbeddedPiRun?: AbortEmbeddedPiRun; + clearSessionQueues?: ClearSessionQueues; } = defaultSubagentControlDeps; +let subagentControlRuntimePromise: Promise | null = + null; + +function loadSubagentControlRuntime() { + subagentControlRuntimePromise ??= import("./subagent-control.runtime.js"); + return subagentControlRuntimePromise; +} + +async function resolveSubagentControlRuntime(): Promise<{ + abortEmbeddedPiRun: AbortEmbeddedPiRun; + clearSessionQueues: ClearSessionQueues; +}> { + if (subagentControlDeps.abortEmbeddedPiRun && subagentControlDeps.clearSessionQueues) { + return { + abortEmbeddedPiRun: subagentControlDeps.abortEmbeddedPiRun, + clearSessionQueues: subagentControlDeps.clearSessionQueues, + }; + } + const runtime = await loadSubagentControlRuntime(); + return { + abortEmbeddedPiRun: subagentControlDeps.abortEmbeddedPiRun ?? runtime.abortEmbeddedPiRun, + clearSessionQueues: subagentControlDeps.clearSessionQueues ?? runtime.clearSessionQueues, + }; +} + export type ResolvedSubagentController = { controllerSessionKey: string; callerSessionKey: string; @@ -152,8 +180,9 @@ async function killSubagentRun(params: { cache: params.cache, }); const sessionId = resolved.entry?.sessionId; - const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false; - const cleared = clearSessionQueues([childSessionKey, sessionId]); + const runtime = await resolveSubagentControlRuntime(); + const aborted = sessionId ? runtime.abortEmbeddedPiRun(sessionId) : false; + const cleared = runtime.clearSessionQueues([childSessionKey, sessionId]); if (cleared.followupCleared > 0 || cleared.laneCleared > 0) { logVerbose( `subagents control kill: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`, @@ -512,9 +541,11 @@ export async function steerControlledSubagentRun(params: { : undefined; if (sessionId) { - abortEmbeddedPiRun(sessionId); + const runtime = await resolveSubagentControlRuntime(); + runtime.abortEmbeddedPiRun(sessionId); } - const cleared = clearSessionQueues([params.entry.childSessionKey, sessionId]); + const runtime = await resolveSubagentControlRuntime(); + const cleared = runtime.clearSessionQueues([params.entry.childSessionKey, sessionId]); if (cleared.followupCleared > 0 || cleared.laneCleared > 0) { logVerbose( `subagents control steer: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`, @@ -709,7 +740,13 @@ export function resolveControlledSubagentTarget( } export const __testing = { - setDepsForTest(overrides?: Partial<{ callGateway: GatewayCaller }>) { + setDepsForTest( + overrides?: Partial<{ + callGateway: GatewayCaller; + abortEmbeddedPiRun: AbortEmbeddedPiRun; + clearSessionQueues: ClearSessionQueues; + }>, + ) { subagentControlDeps = overrides ? { ...defaultSubagentControlDeps, diff --git a/src/agents/subagent-registry.steer-restart.test.ts b/src/agents/subagent-registry.steer-restart.test.ts index 106501ea262..2da4d01a878 100644 --- a/src/agents/subagent-registry.steer-restart.test.ts +++ b/src/agents/subagent-registry.steer-restart.test.ts @@ -118,6 +118,9 @@ describe("subagent registry steer restarts", () => { const flushAnnounce = async () => { await new Promise((resolve) => setImmediate(resolve)); }; + const waitForRegistrySideEffect = async (assertion: () => void) => { + await vi.waitFor(assertion, { interval: 1, timeout: 1_000 }); + }; const withPendingAgentWait = async (run: () => Promise): Promise => { const callGateway = vi.mocked((await import("../gateway/call.js")).callGateway); @@ -277,10 +280,10 @@ describe("subagent registry steer restarts", () => { emitLifecycleEnd("run-new"); - await vi.waitFor(() => { + await waitForRegistrySideEffect(() => { expect(announceSpy).toHaveBeenCalledTimes(1); }); - await vi.waitFor(() => { + await waitForRegistrySideEffect(() => { const matchingCalls = runSubagentEndedHookMock.mock.calls.filter((call) => { const ctx = call[1] as { runId?: string } | undefined; return ctx?.runId === "run-new"; @@ -318,7 +321,7 @@ describe("subagent registry steer restarts", () => { expect(runSubagentEndedHookMock).not.toHaveBeenCalled(); resolveAnnounce(true); - await vi.waitFor(() => { + await waitForRegistrySideEffect(() => { expect(runSubagentEndedHookMock).toHaveBeenCalledTimes(1); }); expect(runSubagentEndedHookMock).toHaveBeenCalledWith( @@ -413,7 +416,7 @@ describe("subagent registry steer restarts", () => { emitLifecycleEnd("run-terminal-state-new"); - await vi.waitFor(() => { + await waitForRegistrySideEffect(() => { expect(runSubagentEndedHookMock).toHaveBeenCalledWith( expect.objectContaining({ runId: "run-terminal-state-new", @@ -571,26 +574,25 @@ describe("subagent registry steer restarts", () => { expect(run?.outcome).toEqual({ status: "error", error: "manual kill" }); expect(run?.cleanupHandled).toBe(true); expect(typeof run?.cleanupCompletedAt).toBe("number"); - await vi.waitFor(() => { - expect(runSubagentEndedHookMock).toHaveBeenCalledWith( - { - targetSessionKey: childSessionKey, - targetKind: "subagent", - reason: "subagent-killed", - sendFarewell: true, - accountId: undefined, - runId: "run-killed", - endedAt: expect.any(Number), - outcome: "killed", - error: "manual kill", - }, - { - runId: "run-killed", - childSessionKey, - requesterSessionKey: MAIN_REQUESTER_SESSION_KEY, - }, - ); - }); + await flushAnnounce(); + expect(runSubagentEndedHookMock).toHaveBeenCalledWith( + { + targetSessionKey: childSessionKey, + targetKind: "subagent", + reason: "subagent-killed", + sendFarewell: true, + accountId: undefined, + runId: "run-killed", + endedAt: expect.any(Number), + outcome: "killed", + error: "manual kill", + }, + { + runId: "run-killed", + childSessionKey, + requesterSessionKey: MAIN_REQUESTER_SESSION_KEY, + }, + ); }); it("treats a child session as inactive when only a stale older row is still unended", async () => { @@ -679,7 +681,7 @@ describe("subagent registry steer restarts", () => { }); emitLifecycleEnd("run-parent"); - await vi.waitFor(() => { + await waitForRegistrySideEffect(() => { const childRunIds = announceSpy.mock.calls.map( (call) => ((call[0] ?? {}) as { childRunId?: string }).childRunId, ); @@ -687,7 +689,7 @@ describe("subagent registry steer restarts", () => { }); emitLifecycleEnd("run-child"); - await vi.waitFor(() => { + await waitForRegistrySideEffect(() => { const childRunIds = announceSpy.mock.calls.map( (call) => ((call[0] ?? {}) as { childRunId?: string }).childRunId, ); diff --git a/src/agents/subagent-registry.test.ts b/src/agents/subagent-registry.test.ts index 80936c15592..2c8b2de16f0 100644 --- a/src/agents/subagent-registry.test.ts +++ b/src/agents/subagent-registry.test.ts @@ -4,6 +4,8 @@ import path from "node:path"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; const noop = () => {}; +const waitForFast = (callback: () => T | Promise) => + vi.waitFor(callback, { timeout: 1_000, interval: 1 }); const mocks = vi.hoisted(() => ({ callGateway: vi.fn(), @@ -44,10 +46,8 @@ vi.mock("../infra/agent-events.js", () => ({ onAgentEvent: mocks.onAgentEvent, })); -vi.mock("../config/config.js", async () => { - const actual = await vi.importActual("../config/config.js"); +vi.mock("../config/config.js", () => { return { - ...actual, loadConfig: mocks.loadConfig, }; }); @@ -171,7 +171,7 @@ describe("subagent registry seam flow", () => { cleanup: "delete", }); - await vi.waitFor(() => { + await waitForFast(() => { expect(mocks.runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); }); @@ -295,10 +295,10 @@ describe("subagent registry seam flow", () => { await Promise.resolve(); expect(mocks.runSubagentAnnounceFlow).not.toHaveBeenCalled(); - await vi.waitFor(() => { + await waitForFast(() => { expect(mocks.runSubagentEnded).toHaveBeenCalledTimes(1); }); - await vi.waitFor(() => { + await waitForFast(() => { expect(mocks.onSubagentEnded).toHaveBeenCalledWith({ childSessionKey: "agent:main:subagent:child", reason: "deleted", @@ -347,7 +347,7 @@ describe("subagent registry seam flow", () => { cleanup: "keep", }); - await vi.waitFor(() => { + await waitForFast(() => { expect( mod .listSubagentRunsForRequester("agent:main:main") @@ -361,7 +361,7 @@ describe("subagent registry seam flow", () => { childRunId: "run-child-finished", }), ); - await vi.waitFor(() => { + await waitForFast(() => { expect(mocks.onSubagentEnded).toHaveBeenCalledWith({ childSessionKey: "agent:main:subagent:parent", reason: "deleted", @@ -397,7 +397,7 @@ describe("subagent registry seam flow", () => { }); expect(updated).toBe(1); - await vi.waitFor(() => { + await waitForFast(() => { expect(mocks.ensureRuntimePluginsLoaded).toHaveBeenCalledWith({ config: { agents: { defaults: { subagents: { archiveAfterMinutes: 0 } } }, @@ -446,7 +446,7 @@ describe("subagent registry seam flow", () => { .listSubagentRunsForRequester("agent:main:main") .find((entry) => entry.runId === "run-killed-delete"), ).toBeUndefined(); - await vi.waitFor(() => { + await waitForFast(() => { expect(mocks.onSubagentEnded).toHaveBeenCalledWith({ childSessionKey: "agent:main:subagent:killed-delete", reason: "deleted", @@ -480,7 +480,7 @@ describe("subagent registry seam flow", () => { }); expect(updated).toBe(1); - await vi.waitFor(async () => { + await waitForFast(async () => { await expect(fs.access(attachmentsDir)).rejects.toMatchObject({ code: "ENOENT" }); }); }); @@ -515,10 +515,10 @@ describe("subagent registry seam flow", () => { mod.releaseSubagentRun("run-release-delete"); - await vi.waitFor(async () => { + await waitForFast(async () => { await expect(fs.access(attachmentsDir)).rejects.toMatchObject({ code: "ENOENT" }); }); - await vi.waitFor(() => { + await waitForFast(() => { expect(mocks.onSubagentEnded).toHaveBeenCalledWith({ childSessionKey: "agent:main:subagent:release-delete", reason: "released", @@ -549,7 +549,7 @@ describe("subagent registry seam flow", () => { mod.releaseSubagentRun("run-release-context-engine"); - await vi.waitFor(() => { + await waitForFast(() => { expect(mocks.onSubagentEnded).toHaveBeenCalledWith({ childSessionKey: "agent:main:session:child", reason: "released",