test(agents): isolate shared subagent state

This commit is contained in:
Peter Steinberger
2026-04-14 22:48:56 +01:00
parent e7dfc88bfa
commit 54cf4cd857
9 changed files with 128 additions and 49 deletions

View File

@@ -24,6 +24,7 @@ type SessionsSpawnGatewayMockOptions = {
const hoisted = vi.hoisted(() => {
const callGatewayMock = vi.fn();
const sessionStore: Record<string, { sessionId: string; updatedAt: number }> = {};
let nextRunId = 0;
const defaultConfigOverride = {
session: {
@@ -94,6 +95,7 @@ const hoisted = vi.hoisted(() => {
nextRunId += 1;
return `run-${nextRunId}`;
},
sessionStore,
state,
};
});
@@ -197,6 +199,12 @@ export function setupSessionsSpawnGatewayMock(setupOpts: SessionsSpawnGatewayMoc
if (params?.lane === "subagent") {
childRunId = runId;
childSessionKey = params.sessionKey ?? "";
if (childSessionKey) {
hoisted.sessionStore[childSessionKey] = {
sessionId: `sess-${childSessionKey}`,
updatedAt: Date.now(),
};
}
setupOpts.onAgentSubagentSpawn?.(params);
}
return {
@@ -268,6 +276,22 @@ vi.mock("../config/config.js", async () => {
};
});
vi.mock("../config/sessions.js", async () => {
const actual =
await vi.importActual<typeof import("../config/sessions.js")>("../config/sessions.js");
return {
...actual,
loadSessionStore: () => hoisted.sessionStore,
resolveStorePath: () => "/tmp/openclaw-sessions-spawn-test-store.json",
updateSessionStore: async (
_storePath: string,
mutator: (store: typeof hoisted.sessionStore) => void | Promise<void>,
) => {
await mutator(hoisted.sessionStore);
},
};
});
// Same module, different specifier (used by tools under src/agents/tools/*).
vi.mock("../../config/config.js", async () => {
const actual = await vi.importActual<typeof import("../config/config.js")>("../config/config.js");

View File

@@ -10,6 +10,14 @@ import { resolveEmbeddedRunSkillEntries } from "./skills-runtime.js";
const tempDirs: string[] = [];
const originalBundledDir = process.env.OPENCLAW_BUNDLED_PLUGINS_DIR;
function restoreBundledPluginsDir() {
if (originalBundledDir === undefined) {
delete process.env.OPENCLAW_BUNDLED_PLUGINS_DIR;
return;
}
process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = originalBundledDir;
}
async function createTempDir(prefix: string) {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
tempDirs.push(dir);
@@ -40,7 +48,7 @@ async function resolveBundledDiffsSkillEntries(config?: OpenClawConfig) {
}
afterEach(async () => {
process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = originalBundledDir;
restoreBundledPluginsDir();
clearPluginManifestRegistryCache();
await Promise.all(
tempDirs.splice(0, tempDirs.length).map((dir) => fs.rm(dir, { recursive: true, force: true })),

View File

@@ -1,25 +1,25 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
import { beforeAll, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import type { ExecApprovalsResolved } from "../infra/exec-approvals.js";
import type { SafeBinProfileFixture } from "../infra/exec-safe-bin-policy.js";
import { captureEnv } from "../test-utils/env.js";
import { withEnvAsync } from "../test-utils/env.js";
const bundledPluginsDirSnapshot = captureEnv(["OPENCLAW_BUNDLED_PLUGINS_DIR"]);
let createOpenClawCodingTools: typeof import("./pi-tools.js").createOpenClawCodingTools;
beforeAll(() => {
process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = path.join(
os.tmpdir(),
"openclaw-test-no-bundled-extensions",
beforeAll(async () => {
await withEnvAsync(
{
OPENCLAW_BUNDLED_PLUGINS_DIR: path.join(os.tmpdir(), "openclaw-test-no-bundled-extensions"),
},
async () => {
({ createOpenClawCodingTools } = await import("./pi-tools.js"));
},
);
});
afterAll(() => {
bundledPluginsDirSnapshot.restore();
});
vi.mock("../infra/shell-env.js", async () => {
const mod =
await vi.importActual<typeof import("../infra/shell-env.js")>("../infra/shell-env.js");
@@ -77,8 +77,6 @@ vi.mock("../infra/exec-approvals.js", async () => {
return { ...mod, resolveExecApprovals: () => approvals };
});
const { createOpenClawCodingTools } = await import("./pi-tools.js");
type ExecToolResult = {
content: Array<{ type: string; text?: string }>;
details?: { status?: string };

View File

@@ -65,6 +65,27 @@ export function createSubagentRegistryLifecycleController(params: {
runSubagentAnnounceFlow: typeof runSubagentAnnounceFlow;
warn(message: string, meta?: Record<string, unknown>): void;
}) {
const scheduledResumeTimers = new Set<ReturnType<typeof setTimeout>>();
const scheduleResumeSubagentRun = (runId: string, entry: SubagentRunRecord, delayMs: number) => {
const timer = setTimeout(() => {
scheduledResumeTimers.delete(timer);
if (params.runs.get(runId) !== entry) {
return;
}
params.resumeSubagentRun(runId);
}, delayMs);
timer.unref?.();
scheduledResumeTimers.add(timer);
};
const clearScheduledResumeTimers = () => {
for (const timer of scheduledResumeTimers) {
clearTimeout(timer);
}
scheduledResumeTimers.clear();
};
const maskRunId = (runId: string): string => {
const trimmed = runId.trim();
if (!trimmed) {
@@ -422,9 +443,7 @@ export function createSubagentRegistryLifecycleController(params: {
entry.cleanupHandled = false;
params.resumedRuns.delete(runId);
params.persist();
setTimeout(() => {
params.resumeSubagentRun(runId);
}, deferredDecision.delayMs).unref?.();
scheduleResumeSubagentRun(runId, entry, deferredDecision.delayMs);
return;
}
@@ -466,9 +485,7 @@ export function createSubagentRegistryLifecycleController(params: {
if (deferredDecision.resumeDelayMs == null) {
return;
}
setTimeout(() => {
params.resumeSubagentRun(runId);
}, deferredDecision.resumeDelayMs).unref?.();
scheduleResumeSubagentRun(runId, entry, deferredDecision.resumeDelayMs);
};
const startSubagentAnnounceCleanupFlow = (runId: string, entry: SubagentRunRecord): boolean => {
@@ -645,6 +662,7 @@ export function createSubagentRegistryLifecycleController(params: {
};
return {
clearScheduledResumeTimers,
completeCleanupBookkeeping,
completeSubagentRun,
finalizeResumedAnnounceGiveUp,

View File

@@ -72,7 +72,11 @@ export function createSubagentRunManager(params: {
triggerCleanup: boolean;
}): Promise<void>;
}) {
const waitForSubagentCompletion = async (runId: string, waitTimeoutMs: number) => {
const waitForSubagentCompletion = async (
runId: string,
waitTimeoutMs: number,
expectedEntry?: SubagentRunRecord,
) => {
try {
const wait = await waitForAgentRun({
runId,
@@ -80,7 +84,7 @@ export function createSubagentRunManager(params: {
callGateway: params.callGateway,
});
const entry = params.runs.get(runId);
if (!entry) {
if (!entry || (expectedEntry && entry !== expectedEntry)) {
return;
}
if (wait.status === "pending") {
@@ -253,7 +257,7 @@ export function createSubagentRunManager(params: {
params.persist();
// Always start sweeper — session-mode runs (no archiveAtMs) also need TTL cleanup.
params.startSweeper();
void waitForSubagentCompletion(nextRunId, waitTimeoutMs);
void waitForSubagentCompletion(nextRunId, waitTimeoutMs, next);
return true;
};
@@ -296,7 +300,7 @@ export function createSubagentRunManager(params: {
const runTimeoutSeconds = registerParams.runTimeoutSeconds ?? 0;
const waitTimeoutMs = params.resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
const requesterOrigin = normalizeDeliveryContext(registerParams.requesterOrigin);
params.runs.set(runId, {
const entry: SubagentRunRecord = {
runId,
childSessionKey,
controllerSessionKey,
@@ -322,7 +326,8 @@ export function createSubagentRunManager(params: {
attachmentsDir: registerParams.attachmentsDir,
attachmentsRootDir: registerParams.attachmentsRootDir,
retainAttachmentsOnKeep: registerParams.retainAttachmentsOnKeep,
});
};
params.runs.set(runId, entry);
try {
createRunningTaskRun({
runtime: "subagent",
@@ -351,7 +356,7 @@ export function createSubagentRunManager(params: {
params.startSweeper();
// Wait for subagent completion via gateway RPC (cross-process).
// The in-process lifecycle listener is a fallback for embedded runs.
void waitForSubagentCompletion(runId, waitTimeoutMs);
void waitForSubagentCompletion(runId, waitTimeoutMs, entry);
};
const releaseSubagentRun = (runId: string) => {

View File

@@ -120,6 +120,7 @@ let contextEngineRegistryPromise: Promise<ContextEngineRegistryModule> | null =
let runtimePluginsPromise: Promise<RuntimePluginsModule> | null = null;
let sweeper: NodeJS.Timeout | null = null;
const resumeRetryTimers = new Set<ReturnType<typeof setTimeout>>();
let sweepInProgress = false;
let listenerStarted = false;
let listenerStop: (() => void) | null = null;
@@ -369,6 +370,7 @@ const subagentLifecycleController = createSubagentRegistryLifecycleController({
});
const {
clearScheduledResumeTimers,
completeCleanupBookkeeping,
completeSubagentRun,
finalizeResumedAnnounceGiveUp,
@@ -384,22 +386,6 @@ function resumeSubagentRun(runId: string) {
if (!entry) {
return;
}
const orphanReason = resolveSubagentRunOrphanReason({ entry });
if (orphanReason) {
if (
reconcileOrphanedRun({
runId,
entry,
reason: orphanReason,
source: "resume",
runs: subagentRuns,
resumedRuns,
})
) {
persistSubagentRuns();
}
return;
}
if (entry.cleanupCompletedAt) {
return;
}
@@ -434,15 +420,38 @@ function resumeSubagentRun(runId: string) {
now < earliestRetryAt
) {
const waitMs = Math.max(1, earliestRetryAt - now);
setTimeout(() => {
const scheduledEntry = entry;
const timer = setTimeout(() => {
resumeRetryTimers.delete(timer);
if (subagentRuns.get(runId) !== scheduledEntry) {
return;
}
resumedRuns.delete(runId);
resumeSubagentRun(runId);
}, waitMs).unref?.();
}, waitMs);
timer.unref?.();
resumeRetryTimers.add(timer);
resumedRuns.add(runId);
return;
}
if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
const orphanReason = resolveSubagentRunOrphanReason({ entry });
if (orphanReason) {
if (
reconcileOrphanedRun({
runId,
entry,
reason: orphanReason,
source: "resume",
runs: subagentRuns,
resumedRuns,
})
) {
persistSubagentRuns();
}
return;
}
if (suppressAnnounceForSteerRestart(entry)) {
resumedRuns.add(runId);
return;
@@ -457,7 +466,7 @@ function resumeSubagentRun(runId: string) {
// Wait for completion again after restart.
const cfg = subagentRegistryDeps.loadConfig();
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, entry.runTimeoutSeconds);
void subagentRunManager.waitForSubagentCompletion(runId, waitTimeoutMs);
void subagentRunManager.waitForSubagentCompletion(runId, waitTimeoutMs, entry);
resumedRuns.add(runId);
}
@@ -737,6 +746,11 @@ export function registerSubagentRun(params: {
}
export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
clearScheduledResumeTimers();
for (const timer of resumeRetryTimers) {
clearTimeout(timer);
}
resumeRetryTimers.clear();
subagentRuns.clear();
resumedRuns.clear();
endedHookInFlightRunIds.clear();

View File

@@ -1,6 +1,7 @@
import { normalizeProviderId } from "../agents/provider-id.js";
import type { ModelProviderConfig } from "../config/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveBundledPluginsDir } from "./bundled-dir.js";
import type {
ProviderApplyConfigDefaultsContext,
ProviderNormalizeConfigContext,
@@ -20,6 +21,11 @@ export type BundledProviderPolicySurface = {
const bundledProviderPolicySurfaceCache = new Map<string, BundledProviderPolicySurface | null>();
function buildProviderPolicySurfaceCacheKey(providerId: string): string {
const bundledPluginsDir = resolveBundledPluginsDir();
return `${providerId}::${bundledPluginsDir ?? "<default>"}`;
}
function hasProviderPolicyHook(
mod: Record<string, unknown>,
): mod is Record<string, unknown> & BundledProviderPolicySurface {
@@ -66,16 +72,17 @@ export function resolveBundledProviderPolicySurface(
if (!normalizedProviderId) {
return null;
}
if (bundledProviderPolicySurfaceCache.has(normalizedProviderId)) {
return bundledProviderPolicySurfaceCache.get(normalizedProviderId) ?? null;
const cacheKey = buildProviderPolicySurfaceCacheKey(normalizedProviderId);
if (bundledProviderPolicySurfaceCache.has(cacheKey)) {
return bundledProviderPolicySurfaceCache.get(cacheKey) ?? null;
}
const surface = tryLoadBundledProviderPolicySurface(normalizedProviderId);
if (surface) {
bundledProviderPolicySurfaceCache.set(normalizedProviderId, surface);
bundledProviderPolicySurfaceCache.set(cacheKey, surface);
return surface;
}
bundledProviderPolicySurfaceCache.set(normalizedProviderId, null);
bundledProviderPolicySurfaceCache.set(cacheKey, null);
return null;
}

View File

@@ -4,6 +4,7 @@ export function createAgentsVitestConfig(env?: Record<string, string | undefined
return createScopedVitestConfig(["src/agents/**/*.test.ts"], {
dir: "src/agents",
env,
fileParallelism: false,
name: "agents",
});
}

View File

@@ -141,6 +141,7 @@ export function createScopedVitestConfig(
includeOpenClawRuntimeSetup?: boolean;
isolate?: boolean;
name?: string;
fileParallelism?: boolean;
pool?: "forks" | "threads";
passWithNoTests?: boolean;
setupFiles?: string[];
@@ -184,6 +185,9 @@ export function createScopedVitestConfig(
include: relativizeScopedPatterns(includeFromEnv ?? cliInclude ?? include, scopedDir),
exclude,
...(options?.pool ? { pool: options.pool } : {}),
...(options?.fileParallelism === undefined
? {}
: { fileParallelism: options.fileParallelism }),
...(scopedGroupOrder === undefined
? {}
: {