refactor: inject subagent announce test seams

This commit is contained in:
Peter Steinberger
2026-03-30 00:40:32 +01:00
parent f914cd598a
commit dd8d66fc44
8 changed files with 267 additions and 132 deletions

View File

@@ -33,13 +33,27 @@ import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-que
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 90_000;
const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
const DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS = FAST_TEST_MODE
? ([8, 16, 32] as const)
: ([5_000, 10_000, 20_000] as const);
type SubagentAnnounceDeliveryDeps = {
callGateway: typeof callGateway;
loadConfig: typeof loadConfig;
};
const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
callGateway,
loadConfig,
};
let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps =
defaultSubagentAnnounceDeliveryDeps;
function resolveDirectAnnounceTransientRetryDelaysMs() {
return process.env.OPENCLAW_TEST_FAST === "1"
? ([8, 16, 32] as const)
: ([5_000, 10_000, 20_000] as const);
}
type DeliveryContextSource = Parameters<typeof deliveryContextFromSession>[0];
@@ -136,6 +150,7 @@ export async function runAnnounceDeliveryWithRetry<T>(params: {
signal?: AbortSignal;
run: () => Promise<T>;
}): Promise<T> {
const retryDelaysMs = resolveDirectAnnounceTransientRetryDelaysMs();
let retryIndex = 0;
for (;;) {
if (params.signal?.aborted) {
@@ -144,12 +159,12 @@ export async function runAnnounceDeliveryWithRetry<T>(params: {
try {
return await params.run();
} catch (err) {
const delayMs = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS[retryIndex];
const delayMs = retryDelaysMs[retryIndex];
if (delayMs == null || !isTransientAnnounceDeliveryError(err) || params.signal?.aborted) {
throw err;
}
const nextAttempt = retryIndex + 2;
const maxAttempts = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS.length + 1;
const maxAttempts = retryDelaysMs.length + 1;
defaultRuntime.log(
`[warn] Subagent announce ${params.operation} transient failure, retrying ${nextAttempt}/${maxAttempts} in ${Math.round(delayMs / 1000)}s: ${summarizeDeliveryError(err)}`,
);
@@ -272,7 +287,7 @@ export async function resolveSubagentCompletionOrigin(params: {
}
async function sendAnnounce(item: AnnounceQueueItem) {
const cfg = loadConfig();
const cfg = subagentAnnounceDeliveryDeps.loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const requesterIsSubagent = isInternalAnnounceRequesterSession(item.sessionKey);
const origin = item.origin;
@@ -285,7 +300,7 @@ async function sendAnnounce(item: AnnounceQueueItem) {
enqueuedAt: item.enqueuedAt,
}),
);
await callGateway({
await subagentAnnounceDeliveryDeps.callGateway({
method: "agent",
params: {
sessionKey: item.sessionKey,
@@ -331,7 +346,7 @@ export function resolveRequesterStoreKey(
}
export function loadRequesterSessionEntry(requesterSessionKey: string) {
const cfg = loadConfig();
const cfg = subagentAnnounceDeliveryDeps.loadConfig();
const canonicalKey = resolveRequesterStoreKey(cfg, requesterSessionKey);
const agentId = resolveAgentIdFromSessionKey(canonicalKey);
const storePath = resolveStorePath(cfg.session?.store, { agentId });
@@ -341,7 +356,7 @@ export function loadRequesterSessionEntry(requesterSessionKey: string) {
}
export function loadSessionEntryByKey(sessionKey: string) {
const cfg = loadConfig();
const cfg = subagentAnnounceDeliveryDeps.loadConfig();
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const storePath = resolveStorePath(cfg.session?.store, { agentId });
const store = loadSessionStore(storePath);
@@ -445,7 +460,7 @@ async function sendSubagentAnnounceDirectly(params: {
path: "none",
};
}
const cfg = loadConfig();
const cfg = subagentAnnounceDeliveryDeps.loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
cfg,
@@ -486,7 +501,7 @@ async function sendSubagentAnnounceDirectly(params: {
: "direct announce agent call",
signal: params.signal,
run: async () =>
await callGateway({
await subagentAnnounceDeliveryDeps.callGateway({
method: "agent",
params: {
sessionKey: canonicalRequesterSessionKey,
@@ -579,3 +594,14 @@ export async function deliverSubagentAnnouncement(params: {
}),
});
}
export const __testing = {
setDepsForTest(overrides?: Partial<SubagentAnnounceDeliveryDeps>) {
subagentAnnounceDeliveryDeps = overrides
? {
...defaultSubagentAnnounceDeliveryDeps,
...overrides,
}
: defaultSubagentAnnounceDeliveryDeps;
},
};

View File

@@ -11,9 +11,24 @@ import { readLatestAssistantReply } from "./tools/agent-step.js";
import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js";
import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
const FAST_TEST_RETRY_INTERVAL_MS = 8;
type SubagentAnnounceOutputDeps = {
callGateway: typeof callGateway;
loadConfig: typeof loadConfig;
};
const defaultSubagentAnnounceOutputDeps: SubagentAnnounceOutputDeps = {
callGateway,
loadConfig,
};
let subagentAnnounceOutputDeps: SubagentAnnounceOutputDeps = defaultSubagentAnnounceOutputDeps;
function isFastTestMode() {
return process.env.OPENCLAW_TEST_FAST === "1";
}
type ToolResultMessage = {
role?: unknown;
content?: unknown;
@@ -230,7 +245,7 @@ export async function readSubagentOutput(
sessionKey: string,
outcome?: SubagentRunOutcome,
): Promise<string | undefined> {
const history = await callGateway<{ messages?: Array<unknown> }>({
const history = await subagentAnnounceOutputDeps.callGateway<{ messages?: Array<unknown> }>({
method: "chat.history",
params: { sessionKey, limit: 100 },
});
@@ -248,15 +263,22 @@ export async function readLatestSubagentOutputWithRetry(params: {
maxWaitMs: number;
outcome?: SubagentRunOutcome;
}): Promise<string | undefined> {
const retryIntervalMs = FAST_TEST_MODE ? FAST_TEST_RETRY_INTERVAL_MS : 100;
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000));
const retryIntervalMs = isFastTestMode() ? FAST_TEST_RETRY_INTERVAL_MS : 100;
const maxWaitMs = Math.max(0, Math.min(params.maxWaitMs, 15_000));
let waitedMs = 0;
let result: string | undefined;
while (Date.now() < deadline) {
while (waitedMs < maxWaitMs) {
result = await readSubagentOutput(params.sessionKey, params.outcome);
if (result?.trim()) {
return result;
}
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
const remainingMs = maxWaitMs - waitedMs;
if (remainingMs <= 0) {
break;
}
const sleepMs = Math.min(retryIntervalMs, remainingMs);
await new Promise((resolve) => setTimeout(resolve, sleepMs));
waitedMs += sleepMs;
}
return result;
}
@@ -266,7 +288,7 @@ export async function waitForSubagentRunOutcome(
timeoutMs: number,
): Promise<AgentWaitResult> {
const waitMs = Math.max(0, Math.floor(timeoutMs));
return await callGateway<AgentWaitResult>({
return await subagentAnnounceOutputDeps.callGateway<AgentWaitResult>({
method: "agent.wait",
params: {
runId,
@@ -313,7 +335,7 @@ export async function captureSubagentCompletionReply(
}
return await readLatestSubagentOutputWithRetry({
sessionKey,
maxWaitMs: FAST_TEST_MODE ? 50 : 1_500,
maxWaitMs: isFastTestMode() ? 50 : 1_500,
});
}
@@ -478,11 +500,11 @@ export async function buildCompactAnnounceStatsLine(params: {
startedAt?: number;
endedAt?: number;
}) {
const cfg = loadConfig();
const cfg = subagentAnnounceOutputDeps.loadConfig();
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
const storePath = resolveStorePath(cfg.session?.store, { agentId });
let entry = loadSessionStore(storePath)[params.sessionKey];
const tokenWaitAttempts = FAST_TEST_MODE ? 1 : 3;
const tokenWaitAttempts = isFastTestMode() ? 1 : 3;
for (let attempt = 0; attempt < tokenWaitAttempts; attempt += 1) {
const hasTokenData =
typeof entry?.inputTokens === "number" ||
@@ -491,7 +513,7 @@ export async function buildCompactAnnounceStatsLine(params: {
if (hasTokenData) {
break;
}
if (!FAST_TEST_MODE) {
if (!isFastTestMode()) {
await new Promise((resolve) => setTimeout(resolve, 150));
}
entry = loadSessionStore(storePath)[params.sessionKey];
@@ -515,3 +537,14 @@ export async function buildCompactAnnounceStatsLine(params: {
}
return `Stats: ${parts.join(" • ")}`;
}
export const __testing = {
setDepsForTest(overrides?: Partial<SubagentAnnounceOutputDeps>) {
subagentAnnounceOutputDeps = overrides
? {
...defaultSubagentAnnounceOutputDeps,
...overrides,
}
: defaultSubagentAnnounceOutputDeps;
},
};

View File

@@ -40,16 +40,21 @@ vi.mock("../config/config.js", async (importOriginal) => {
return {
...actual,
loadConfig: () => mockConfig,
resolveGatewayPort: () => 18789,
};
});
vi.mock("../config/sessions.js", () => ({
loadSessionStore: (storePath: string) => loadSessionStoreMock(storePath),
resolveAgentIdFromSessionKey: (sessionKey: string) =>
resolveAgentIdFromSessionKeyMock(sessionKey),
resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg),
resolveStorePath: (store: unknown, options: unknown) => resolveStorePathMock(store, options),
}));
vi.mock("../config/sessions.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/sessions.js")>();
return {
...actual,
loadSessionStore: (storePath: string) => loadSessionStoreMock(storePath),
resolveAgentIdFromSessionKey: (sessionKey: string) =>
resolveAgentIdFromSessionKeyMock(sessionKey),
resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg),
resolveStorePath: (store: unknown, options: unknown) => resolveStorePathMock(store, options),
};
});
vi.mock("../gateway/call.js", () => ({
callGateway: (request: unknown) => callGatewayMock(request),
@@ -89,12 +94,10 @@ vi.mock("./subagent-registry-runtime.js", async (importOriginal) => {
...subagentRegistryRuntimeMock,
};
});
import { runSubagentAnnounceFlow } from "./subagent-announce.js";
describe("subagent announce seam flow", () => {
let runSubagentAnnounceFlow: (typeof import("./subagent-announce.js"))["runSubagentAnnounceFlow"];
beforeEach(() => {
vi.resetModules();
agentSpy.mockClear();
sessionsDeleteSpy.mockClear();
callGatewayMock.mockReset().mockImplementation(async (req: unknown) => {
@@ -150,7 +153,6 @@ describe("subagent announce seam flow", () => {
});
it("suppresses ANNOUNCE_SKIP delivery while still deleting the child session", async () => {
({ runSubagentAnnounceFlow } = await import("./subagent-announce.js"));
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-skip-whitespace",
@@ -181,7 +183,6 @@ describe("subagent announce seam flow", () => {
});
it("keeps lifecycle hooks enabled when deleting a completed session-mode child session", async () => {
({ runSubagentAnnounceFlow } = await import("./subagent-announce.js"));
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-session-delete-cleanup",
@@ -236,7 +237,6 @@ describe("subagent announce seam flow", () => {
isEmbeddedPiRunActiveMock.mockReturnValue(true);
queueEmbeddedPiMessageMock.mockReturnValue(true);
({ runSubagentAnnounceFlow } = await import("./subagent-announce.js"));
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-origin-provider-steer",

View File

@@ -43,16 +43,6 @@ function createGatewayCallModuleMock() {
};
}
async function createConfigModuleMock(
importOriginal: () => Promise<typeof import("../config/config.js")>,
) {
const actual = await importOriginal();
return {
...actual,
loadConfig: () => configOverride,
};
}
function createSessionsModuleMock() {
return {
loadSessionStore: vi.fn(() => sessionStore),
@@ -68,33 +58,6 @@ function createSubagentDepthModuleMock() {
};
}
async function createPiEmbeddedModuleMock(
importOriginal: () => Promise<typeof import("./pi-embedded.js")>,
) {
const actual = await importOriginal();
return {
...actual,
isEmbeddedPiRunActive: () => false,
queueEmbeddedPiMessage: () => false,
waitForEmbeddedPiRunEnd: async () => true,
};
}
async function createSubagentRegistryModuleMock(
importOriginal: () => Promise<typeof import("./subagent-registry.js")>,
) {
const actual = await importOriginal();
return {
...actual,
countActiveDescendantRuns: () => 0,
countPendingDescendantRuns: () => pendingDescendantRuns,
listSubagentRunsForRequester: () => [],
isSubagentSessionRunActive: () => subagentSessionRunActive,
shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion,
resolveRequesterForChildSession: () => fallbackRequesterResolution,
};
}
function createTimeoutHistoryWithNoReply() {
return [
{ role: "user", content: "do something" },
@@ -114,28 +77,65 @@ function createTimeoutHistoryWithNoReply() {
}
vi.mock("../gateway/call.js", createGatewayCallModuleMock);
vi.mock("../config/config.js", createConfigModuleMock);
vi.mock("../config/sessions.js", createSessionsModuleMock);
vi.mock("../config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/config.js")>();
return {
...actual,
loadConfig: () => configOverride,
resolveGatewayPort: () => 18789,
};
});
vi.mock("../config/sessions.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/sessions.js")>();
return {
...actual,
...createSessionsModuleMock(),
};
});
vi.mock("./subagent-depth.js", createSubagentDepthModuleMock);
vi.mock("./pi-embedded.js", createPiEmbeddedModuleMock);
vi.mock("./subagent-registry.js", createSubagentRegistryModuleMock);
let runSubagentAnnounceFlow: typeof import("./subagent-announce.js").runSubagentAnnounceFlow;
vi.mock("./pi-embedded.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./pi-embedded.js")>();
return {
...actual,
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
queueEmbeddedPiMessage: (_sessionId: string, _text: string) => false,
waitForEmbeddedPiRunEnd: (sessionId: string, timeoutMs?: number) =>
waitForEmbeddedPiRunEndMock(sessionId, timeoutMs),
};
});
vi.mock("./subagent-registry.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./subagent-registry.js")>();
return {
...actual,
countActiveDescendantRuns: () => 0,
countPendingDescendantRuns: () => pendingDescendantRuns,
countPendingDescendantRunsExcludingRun: () => 0,
listSubagentRunsForRequester: () => [],
isSubagentSessionRunActive: () => subagentSessionRunActive,
shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion,
replaceSubagentRunAfterSteer: () => true,
resolveRequesterForChildSession: () => fallbackRequesterResolution,
};
});
vi.mock("./subagent-registry-runtime.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./subagent-registry-runtime.js")>();
return {
...actual,
countActiveDescendantRuns: () => 0,
countPendingDescendantRuns: () => pendingDescendantRuns,
countPendingDescendantRunsExcludingRun: () => 0,
listSubagentRunsForRequester: () => [],
isSubagentSessionRunActive: () => subagentSessionRunActive,
shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion,
replaceSubagentRunAfterSteer: () => true,
resolveRequesterForChildSession: () => fallbackRequesterResolution,
};
});
import { runSubagentAnnounceFlow } from "./subagent-announce.js";
type AnnounceFlowParams = Parameters<
typeof import("./subagent-announce.js").runSubagentAnnounceFlow
>[0];
async function loadFreshSubagentAnnounceFlowForTest() {
vi.resetModules();
vi.doMock("../gateway/call.js", createGatewayCallModuleMock);
vi.doMock("../config/config.js", createConfigModuleMock);
vi.doMock("../config/sessions.js", createSessionsModuleMock);
vi.doMock("./subagent-depth.js", createSubagentDepthModuleMock);
vi.doMock("./pi-embedded.js", createPiEmbeddedModuleMock);
vi.doMock("./subagent-registry.js", createSubagentRegistryModuleMock);
({ runSubagentAnnounceFlow } = await import("./subagent-announce.js"));
}
const defaultSessionConfig = {
mainKey: "main",
scope: "per-sender",
@@ -217,10 +217,6 @@ describe("subagent announce timeout config", () => {
fallbackRequesterResolution = null;
});
beforeEach(async () => {
await loadFreshSubagentAnnounceFlowForTest();
});
it("uses 90s timeout by default for direct announce agent call", async () => {
await runAnnounceFlowForTest("run-default-timeout");
@@ -257,8 +253,8 @@ describe("subagent announce timeout config", () => {
});
it("retries gateway timeout for externally delivered completion announces before giving up", async () => {
vi.useFakeTimers();
try {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
callGatewayImpl = async (request) => {
if (request.method === "chat.history") {
return { messages: [] };
@@ -273,7 +269,6 @@ describe("subagent announce timeout config", () => {
},
expectsCompletionMessage: true,
});
await vi.runAllTimersAsync();
await expect(announcePromise).resolves.toBe(false);
const directAgentCalls = gatewayCalls.filter(
@@ -281,7 +276,7 @@ describe("subagent announce timeout config", () => {
);
expect(directAgentCalls).toHaveLength(4);
} finally {
vi.useRealTimers();
vi.unstubAllEnvs();
}
});

View File

@@ -35,6 +35,19 @@ import {
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
type SubagentAnnounceDeps = {
callGateway: typeof callGateway;
loadConfig: typeof loadConfig;
};
const defaultSubagentAnnounceDeps: SubagentAnnounceDeps = {
callGateway,
loadConfig,
};
let subagentAnnounceDeps: SubagentAnnounceDeps = defaultSubagentAnnounceDeps;
let subagentRegistryRuntimePromise: Promise<
typeof import("./subagent-registry-runtime.js")
> | null = null;
@@ -235,7 +248,7 @@ async function wakeSubagentRunAfterDescendants(params: {
return false;
}
const cfg = loadConfig();
const cfg = subagentAnnounceDeps.loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const wakeMessage = buildDescendantWakeMessage({
findings: params.findings,
@@ -248,7 +261,7 @@ async function wakeSubagentRunAfterDescendants(params: {
operation: "descendant wake agent call",
signal: params.signal,
run: async () =>
await callGateway({
await subagentAnnounceDeps.callGateway({
method: "agent",
params: {
sessionKey: params.childSessionKey,
@@ -612,7 +625,7 @@ export async function runSubagentAnnounceFlow(params: {
// Patch label after all writes complete
if (params.label) {
try {
await callGateway({
await subagentAnnounceDeps.callGateway({
method: "sessions.patch",
params: { key: params.childSessionKey, label: params.label },
timeoutMs: 10_000,
@@ -623,7 +636,7 @@ export async function runSubagentAnnounceFlow(params: {
}
if (shouldDeleteChildSession) {
try {
await callGateway({
await subagentAnnounceDeps.callGateway({
method: "sessions.delete",
params: {
key: params.childSessionKey,
@@ -639,3 +652,14 @@ export async function runSubagentAnnounceFlow(params: {
}
return didAnnounce;
}
export const __testing = {
setDepsForTest(overrides?: Partial<SubagentAnnounceDeps>) {
subagentAnnounceDeps = overrides
? {
...defaultSubagentAnnounceDeps,
...overrides,
}
: defaultSubagentAnnounceDeps;
},
};

View File

@@ -54,6 +54,8 @@ export function createSubagentRegistryLifecycleController(params: {
workspaceDir?: string;
}): Promise<void>;
resumeSubagentRun(runId: string): void;
captureSubagentCompletionReply: typeof captureSubagentCompletionReply;
runSubagentAnnounceFlow: typeof runSubagentAnnounceFlow;
warn(message: string, meta?: Record<string, unknown>): void;
}) {
const freezeRunResultAtCompletion = async (entry: SubagentRunRecord): Promise<boolean> => {
@@ -61,7 +63,7 @@ export function createSubagentRegistryLifecycleController(params: {
return false;
}
try {
const captured = await captureSubagentCompletionReply(entry.childSessionKey);
const captured = await params.captureSubagentCompletionReply(entry.childSessionKey);
entry.frozenResultText = captured?.trim() ? capFrozenResultText(captured) : null;
} catch {
entry.frozenResultText = null;
@@ -375,7 +377,7 @@ export function createSubagentRegistryLifecycleController(params: {
});
};
void runSubagentAnnounceFlow({
void params.runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey,
childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey,

View File

@@ -33,6 +33,9 @@ export function createSubagentRunManager(params: {
resumedRuns: Set<string>;
endedHookInFlightRunIds: Set<string>;
persist(): void;
callGateway: typeof callGateway;
loadConfig: typeof loadConfig;
ensureRuntimePluginsLoaded: typeof ensureRuntimePluginsLoaded;
ensureListener(): void;
startSweeper(): void;
stopSweeper(): void;
@@ -66,7 +69,7 @@ export function createSubagentRunManager(params: {
const waitForSubagentCompletion = async (runId: string, waitTimeoutMs: number) => {
try {
const timeoutMs = Math.max(1, Math.floor(waitTimeoutMs));
const wait = await callGateway<{
const wait = await params.callGateway<{
status?: string;
startedAt?: number;
endedAt?: number;
@@ -200,7 +203,7 @@ export function createSubagentRunManager(params: {
}
const now = Date.now();
const cfg = loadConfig();
const cfg = params.loadConfig();
const archiveAfterMs = resolveArchiveAfterMs(cfg);
const spawnMode = source.spawnMode === "session" ? "session" : "run";
const archiveAtMs =
@@ -277,7 +280,7 @@ export function createSubagentRunManager(params: {
retainAttachmentsOnKeep?: boolean;
}) => {
const now = Date.now();
const cfg = loadConfig();
const cfg = params.loadConfig();
const archiveAfterMs = resolveArchiveAfterMs(cfg);
const spawnMode = registerParams.spawnMode === "session" ? "session" : "run";
const archiveAtMs =
@@ -433,8 +436,8 @@ export function createSubagentRunManager(params: {
cleanup: entry.cleanup,
completedAt: now,
});
const cfg = loadConfig();
ensureRuntimePluginsLoaded({
const cfg = params.loadConfig();
params.ensureRuntimePluginsLoaded({
config: cfg,
workspaceDir: entry.workspaceDir,
allowGatewaySubagentBinding: true,

View File

@@ -5,10 +5,10 @@ import type { SubagentEndReason } from "../context-engine/types.js";
import { callGateway } from "../gateway/call.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { resetTaskRegistryForTests } from "../tasks/task-registry.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
import { ensureRuntimePluginsLoaded } from "./runtime-plugins.js";
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
import * as subagentAnnounceModule from "./subagent-announce.js";
import type { SubagentRunOutcome } from "./subagent-announce.js";
import {
SUBAGENT_ENDED_REASON_COMPLETE,
@@ -61,6 +61,39 @@ export {
} from "./subagent-registry-helpers.js";
const log = createSubsystemLogger("agents/subagent-registry");
type SubagentRegistryDeps = {
callGateway: typeof callGateway;
captureSubagentCompletionReply: typeof subagentAnnounceModule.captureSubagentCompletionReply;
ensureContextEnginesInitialized: typeof ensureContextEnginesInitialized;
ensureRuntimePluginsLoaded: typeof ensureRuntimePluginsLoaded;
getSubagentRunsSnapshotForRead: typeof getSubagentRunsSnapshotForRead;
loadConfig: typeof loadConfig;
onAgentEvent: typeof onAgentEvent;
persistSubagentRunsToDisk: typeof persistSubagentRunsToDisk;
resolveAgentTimeoutMs: typeof resolveAgentTimeoutMs;
resolveContextEngine: typeof resolveContextEngine;
restoreSubagentRunsFromDisk: typeof restoreSubagentRunsFromDisk;
runSubagentAnnounceFlow: typeof subagentAnnounceModule.runSubagentAnnounceFlow;
};
const defaultSubagentRegistryDeps: SubagentRegistryDeps = {
callGateway,
captureSubagentCompletionReply: (sessionKey) =>
subagentAnnounceModule.captureSubagentCompletionReply(sessionKey),
ensureContextEnginesInitialized,
ensureRuntimePluginsLoaded,
getSubagentRunsSnapshotForRead,
loadConfig,
onAgentEvent,
persistSubagentRunsToDisk,
resolveAgentTimeoutMs,
resolveContextEngine,
restoreSubagentRunsFromDisk,
runSubagentAnnounceFlow: (params) => subagentAnnounceModule.runSubagentAnnounceFlow(params),
};
let subagentRegistryDeps: SubagentRegistryDeps = defaultSubagentRegistryDeps;
let sweeper: NodeJS.Timeout | null = null;
let listenerStarted = false;
let listenerStop: (() => void) | null = null;
@@ -75,7 +108,7 @@ const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
function persistSubagentRuns() {
persistSubagentRunsToDisk(subagentRuns);
subagentRegistryDeps.persistSubagentRunsToDisk(subagentRuns);
}
const resumedRuns = new Set<string>();
@@ -147,14 +180,14 @@ async function notifyContextEngineSubagentEnded(params: {
workspaceDir?: string;
}) {
try {
const cfg = loadConfig();
ensureRuntimePluginsLoaded({
const cfg = subagentRegistryDeps.loadConfig();
subagentRegistryDeps.ensureRuntimePluginsLoaded({
config: cfg,
workspaceDir: params.workspaceDir,
allowGatewaySubagentBinding: true,
});
ensureContextEnginesInitialized();
const engine = await resolveContextEngine(cfg);
subagentRegistryDeps.ensureContextEnginesInitialized();
const engine = await subagentRegistryDeps.resolveContextEngine(cfg);
if (!engine.onSubagentEnded) {
return;
}
@@ -191,8 +224,8 @@ async function emitSubagentEndedHookForRun(params: {
sendFarewell?: boolean;
accountId?: string;
}) {
const cfg = loadConfig();
ensureRuntimePluginsLoaded({
const cfg = subagentRegistryDeps.loadConfig();
subagentRegistryDeps.ensureRuntimePluginsLoaded({
config: cfg,
workspaceDir: params.entry.workspaceDir,
allowGatewaySubagentBinding: true,
@@ -224,6 +257,9 @@ const subagentLifecycleController = createSubagentRegistryLifecycleController({
emitSubagentEndedHookForRun,
notifyContextEngineSubagentEnded,
resumeSubagentRun,
captureSubagentCompletionReply: (sessionKey) =>
subagentRegistryDeps.captureSubagentCompletionReply(sessionKey),
runSubagentAnnounceFlow: (params) => subagentRegistryDeps.runSubagentAnnounceFlow(params),
warn: (message, meta) => log.warn(message, meta),
});
@@ -314,7 +350,7 @@ function resumeSubagentRun(runId: string) {
}
// Wait for completion again after restart.
const cfg = loadConfig();
const cfg = subagentRegistryDeps.loadConfig();
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, entry.runTimeoutSeconds);
void subagentRunManager.waitForSubagentCompletion(runId, waitTimeoutMs);
resumedRuns.add(runId);
@@ -326,7 +362,7 @@ function restoreSubagentRunsOnce() {
}
restoreAttempted = true;
try {
const restoredCount = restoreSubagentRunsFromDisk({
const restoredCount = subagentRegistryDeps.restoreSubagentRunsFromDisk({
runs: subagentRuns,
mergeOnly: true,
});
@@ -374,7 +410,10 @@ function resolveSubagentWaitTimeoutMs(
cfg: ReturnType<typeof loadConfig>,
runTimeoutSeconds?: number,
) {
return resolveAgentTimeoutMs({ cfg, overrideSeconds: runTimeoutSeconds ?? 0 });
return subagentRegistryDeps.resolveAgentTimeoutMs({
cfg,
overrideSeconds: runTimeoutSeconds ?? 0,
});
}
function startSweeper() {
@@ -413,7 +452,7 @@ async function sweepSubagentRuns() {
// Archive/purge is terminal for the run record; remove any retained attachments too.
await safeRemoveAttachmentsDir(entry);
try {
await callGateway({
await subagentRegistryDeps.callGateway({
method: "sessions.delete",
params: {
key: entry.childSessionKey,
@@ -439,7 +478,7 @@ function ensureListener() {
return;
}
listenerStarted = true;
listenerStop = onAgentEvent((evt) => {
listenerStop = subagentRegistryDeps.onAgentEvent((evt) => {
void (async () => {
if (!evt || evt.stream !== "lifecycle") {
return;
@@ -499,6 +538,9 @@ const subagentRunManager = createSubagentRunManager({
resumedRuns,
endedHookInFlightRunIds,
persist: persistSubagentRuns,
callGateway: (request) => subagentRegistryDeps.callGateway(request),
loadConfig: () => subagentRegistryDeps.loadConfig(),
ensureRuntimePluginsLoaded,
ensureListener,
startSweeper,
stopSweeper,
@@ -556,7 +598,6 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
endedHookInFlightRunIds.clear();
clearAllPendingLifecycleErrors();
resetAnnounceQueuesForTests();
resetTaskRegistryForTests({ persist: opts?.persist });
stopSweeper();
restoreAttempted = false;
if (listenerStop) {
@@ -569,6 +610,17 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
}
}
export const __testing = {
setDepsForTest(overrides?: Partial<SubagentRegistryDeps>) {
subagentRegistryDeps = overrides
? {
...defaultSubagentRegistryDeps,
...overrides,
}
: defaultSubagentRegistryDeps;
},
} as const;
export function addSubagentRunForTests(entry: SubagentRunRecord) {
subagentRuns.set(entry.runId, entry);
}
@@ -586,7 +638,7 @@ export function resolveRequesterForChildSession(childSessionKey: string): {
requesterOrigin?: DeliveryContext;
} | null {
const resolved = resolveRequesterForChildSessionFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns),
childSessionKey,
);
if (!resolved) {
@@ -615,7 +667,7 @@ export function isSubagentSessionRunActive(childSessionKey: string): boolean {
export function shouldIgnorePostCompletionAnnounceForSession(childSessionKey: string): boolean {
return shouldIgnorePostCompletionAnnounceForSessionFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns),
childSessionKey,
);
}
@@ -637,28 +689,28 @@ export function listSubagentRunsForRequester(
export function listSubagentRunsForController(controllerSessionKey: string): SubagentRunRecord[] {
return listRunsForControllerFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns),
controllerSessionKey,
);
}
export function countActiveRunsForSession(requesterSessionKey: string): number {
return countActiveRunsForSessionFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns),
requesterSessionKey,
);
}
export function countActiveDescendantRuns(rootSessionKey: string): number {
return countActiveDescendantRunsFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns),
rootSessionKey,
);
}
export function countPendingDescendantRuns(rootSessionKey: string): number {
return countPendingDescendantRunsFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns),
rootSessionKey,
);
}
@@ -668,7 +720,7 @@ export function countPendingDescendantRunsExcludingRun(
excludeRunId: string,
): number {
return countPendingDescendantRunsExcludingRunFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns),
rootSessionKey,
excludeRunId,
);
@@ -676,7 +728,7 @@ export function countPendingDescendantRunsExcludingRun(
export function listDescendantRunsForRequester(rootSessionKey: string): SubagentRunRecord[] {
return listDescendantRunsForRequesterFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns),
rootSessionKey,
);
}
@@ -689,7 +741,7 @@ export function getSubagentRunByChildSessionKey(childSessionKey: string): Subage
let latestActive: SubagentRunRecord | null = null;
let latestEnded: SubagentRunRecord | null = null;
for (const entry of getSubagentRunsSnapshotForRead(subagentRuns).values()) {
for (const entry of subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns).values()) {
if (entry.childSessionKey !== key) {
continue;
}
@@ -716,7 +768,7 @@ export function getLatestSubagentRunByChildSessionKey(
}
let latest: SubagentRunRecord | null = null;
for (const entry of getSubagentRunsSnapshotForRead(subagentRuns).values()) {
for (const entry of subagentRegistryDeps.getSubagentRunsSnapshotForRead(subagentRuns).values()) {
if (entry.childSessionKey !== key) {
continue;
}