From 54cf4cd8575b75f654121aee6ea43883e60b1d18 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 14 Apr 2026 22:48:56 +0100 Subject: [PATCH] test(agents): isolate shared subagent state --- ...s.subagents.sessions-spawn.test-harness.ts | 24 +++++++++ .../skills-runtime.integration.test.ts | 10 +++- src/agents/pi-tools.safe-bins.test.ts | 24 ++++----- src/agents/subagent-registry-lifecycle.ts | 30 ++++++++--- src/agents/subagent-registry-run-manager.ts | 17 +++--- src/agents/subagent-registry.ts | 52 ++++++++++++------- src/plugins/provider-public-artifacts.ts | 15 ++++-- test/vitest/vitest.agents.config.ts | 1 + test/vitest/vitest.scoped-config.ts | 4 ++ 9 files changed, 128 insertions(+), 49 deletions(-) diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn.test-harness.ts b/src/agents/openclaw-tools.subagents.sessions-spawn.test-harness.ts index a80fe857187..1001bfbcd05 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn.test-harness.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn.test-harness.ts @@ -24,6 +24,7 @@ type SessionsSpawnGatewayMockOptions = { const hoisted = vi.hoisted(() => { const callGatewayMock = vi.fn(); + const sessionStore: Record = {}; let nextRunId = 0; const defaultConfigOverride = { session: { @@ -94,6 +95,7 @@ const hoisted = vi.hoisted(() => { nextRunId += 1; return `run-${nextRunId}`; }, + sessionStore, state, }; }); @@ -197,6 +199,12 @@ export function setupSessionsSpawnGatewayMock(setupOpts: SessionsSpawnGatewayMoc if (params?.lane === "subagent") { childRunId = runId; childSessionKey = params.sessionKey ?? ""; + if (childSessionKey) { + hoisted.sessionStore[childSessionKey] = { + sessionId: `sess-${childSessionKey}`, + updatedAt: Date.now(), + }; + } setupOpts.onAgentSubagentSpawn?.(params); } return { @@ -268,6 +276,22 @@ vi.mock("../config/config.js", async () => { }; }); +vi.mock("../config/sessions.js", async () => { + const actual = + await vi.importActual("../config/sessions.js"); + return { + ...actual, + loadSessionStore: () => hoisted.sessionStore, + resolveStorePath: () => "/tmp/openclaw-sessions-spawn-test-store.json", + updateSessionStore: async ( + _storePath: string, + mutator: (store: typeof hoisted.sessionStore) => void | Promise, + ) => { + await mutator(hoisted.sessionStore); + }, + }; +}); + // Same module, different specifier (used by tools under src/agents/tools/*). vi.mock("../../config/config.js", async () => { const actual = await vi.importActual("../config/config.js"); diff --git a/src/agents/pi-embedded-runner/skills-runtime.integration.test.ts b/src/agents/pi-embedded-runner/skills-runtime.integration.test.ts index 437b021cdd7..e35d4fed270 100644 --- a/src/agents/pi-embedded-runner/skills-runtime.integration.test.ts +++ b/src/agents/pi-embedded-runner/skills-runtime.integration.test.ts @@ -10,6 +10,14 @@ import { resolveEmbeddedRunSkillEntries } from "./skills-runtime.js"; const tempDirs: string[] = []; const originalBundledDir = process.env.OPENCLAW_BUNDLED_PLUGINS_DIR; +function restoreBundledPluginsDir() { + if (originalBundledDir === undefined) { + delete process.env.OPENCLAW_BUNDLED_PLUGINS_DIR; + return; + } + process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = originalBundledDir; +} + async function createTempDir(prefix: string) { const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); tempDirs.push(dir); @@ -40,7 +48,7 @@ async function resolveBundledDiffsSkillEntries(config?: OpenClawConfig) { } afterEach(async () => { - process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = originalBundledDir; + restoreBundledPluginsDir(); clearPluginManifestRegistryCache(); await Promise.all( tempDirs.splice(0, tempDirs.length).map((dir) => fs.rm(dir, { recursive: true, force: true })), diff --git a/src/agents/pi-tools.safe-bins.test.ts b/src/agents/pi-tools.safe-bins.test.ts index 568bda4b492..db0c7a934e2 100644 --- a/src/agents/pi-tools.safe-bins.test.ts +++ b/src/agents/pi-tools.safe-bins.test.ts @@ -1,25 +1,25 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import { afterAll, beforeAll, describe, expect, it, vi } from "vitest"; +import { beforeAll, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import type { ExecApprovalsResolved } from "../infra/exec-approvals.js"; import type { SafeBinProfileFixture } from "../infra/exec-safe-bin-policy.js"; -import { captureEnv } from "../test-utils/env.js"; +import { withEnvAsync } from "../test-utils/env.js"; -const bundledPluginsDirSnapshot = captureEnv(["OPENCLAW_BUNDLED_PLUGINS_DIR"]); +let createOpenClawCodingTools: typeof import("./pi-tools.js").createOpenClawCodingTools; -beforeAll(() => { - process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = path.join( - os.tmpdir(), - "openclaw-test-no-bundled-extensions", +beforeAll(async () => { + await withEnvAsync( + { + OPENCLAW_BUNDLED_PLUGINS_DIR: path.join(os.tmpdir(), "openclaw-test-no-bundled-extensions"), + }, + async () => { + ({ createOpenClawCodingTools } = await import("./pi-tools.js")); + }, ); }); -afterAll(() => { - bundledPluginsDirSnapshot.restore(); -}); - vi.mock("../infra/shell-env.js", async () => { const mod = await vi.importActual("../infra/shell-env.js"); @@ -77,8 +77,6 @@ vi.mock("../infra/exec-approvals.js", async () => { return { ...mod, resolveExecApprovals: () => approvals }; }); -const { createOpenClawCodingTools } = await import("./pi-tools.js"); - type ExecToolResult = { content: Array<{ type: string; text?: string }>; details?: { status?: string }; diff --git a/src/agents/subagent-registry-lifecycle.ts b/src/agents/subagent-registry-lifecycle.ts index 73a7bb194e8..58c5beda6c7 100644 --- a/src/agents/subagent-registry-lifecycle.ts +++ b/src/agents/subagent-registry-lifecycle.ts @@ -65,6 +65,27 @@ export function createSubagentRegistryLifecycleController(params: { runSubagentAnnounceFlow: typeof runSubagentAnnounceFlow; warn(message: string, meta?: Record): void; }) { + const scheduledResumeTimers = new Set>(); + + const scheduleResumeSubagentRun = (runId: string, entry: SubagentRunRecord, delayMs: number) => { + const timer = setTimeout(() => { + scheduledResumeTimers.delete(timer); + if (params.runs.get(runId) !== entry) { + return; + } + params.resumeSubagentRun(runId); + }, delayMs); + timer.unref?.(); + scheduledResumeTimers.add(timer); + }; + + const clearScheduledResumeTimers = () => { + for (const timer of scheduledResumeTimers) { + clearTimeout(timer); + } + scheduledResumeTimers.clear(); + }; + const maskRunId = (runId: string): string => { const trimmed = runId.trim(); if (!trimmed) { @@ -422,9 +443,7 @@ export function createSubagentRegistryLifecycleController(params: { entry.cleanupHandled = false; params.resumedRuns.delete(runId); params.persist(); - setTimeout(() => { - params.resumeSubagentRun(runId); - }, deferredDecision.delayMs).unref?.(); + scheduleResumeSubagentRun(runId, entry, deferredDecision.delayMs); return; } @@ -466,9 +485,7 @@ export function createSubagentRegistryLifecycleController(params: { if (deferredDecision.resumeDelayMs == null) { return; } - setTimeout(() => { - params.resumeSubagentRun(runId); - }, deferredDecision.resumeDelayMs).unref?.(); + scheduleResumeSubagentRun(runId, entry, deferredDecision.resumeDelayMs); }; const startSubagentAnnounceCleanupFlow = (runId: string, entry: SubagentRunRecord): boolean => { @@ -645,6 +662,7 @@ export function createSubagentRegistryLifecycleController(params: { }; return { + clearScheduledResumeTimers, completeCleanupBookkeeping, completeSubagentRun, finalizeResumedAnnounceGiveUp, diff --git a/src/agents/subagent-registry-run-manager.ts b/src/agents/subagent-registry-run-manager.ts index b8f805111ce..71573306643 100644 --- a/src/agents/subagent-registry-run-manager.ts +++ b/src/agents/subagent-registry-run-manager.ts @@ -72,7 +72,11 @@ export function createSubagentRunManager(params: { triggerCleanup: boolean; }): Promise; }) { - const waitForSubagentCompletion = async (runId: string, waitTimeoutMs: number) => { + const waitForSubagentCompletion = async ( + runId: string, + waitTimeoutMs: number, + expectedEntry?: SubagentRunRecord, + ) => { try { const wait = await waitForAgentRun({ runId, @@ -80,7 +84,7 @@ export function createSubagentRunManager(params: { callGateway: params.callGateway, }); const entry = params.runs.get(runId); - if (!entry) { + if (!entry || (expectedEntry && entry !== expectedEntry)) { return; } if (wait.status === "pending") { @@ -253,7 +257,7 @@ export function createSubagentRunManager(params: { params.persist(); // Always start sweeper — session-mode runs (no archiveAtMs) also need TTL cleanup. params.startSweeper(); - void waitForSubagentCompletion(nextRunId, waitTimeoutMs); + void waitForSubagentCompletion(nextRunId, waitTimeoutMs, next); return true; }; @@ -296,7 +300,7 @@ export function createSubagentRunManager(params: { const runTimeoutSeconds = registerParams.runTimeoutSeconds ?? 0; const waitTimeoutMs = params.resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds); const requesterOrigin = normalizeDeliveryContext(registerParams.requesterOrigin); - params.runs.set(runId, { + const entry: SubagentRunRecord = { runId, childSessionKey, controllerSessionKey, @@ -322,7 +326,8 @@ export function createSubagentRunManager(params: { attachmentsDir: registerParams.attachmentsDir, attachmentsRootDir: registerParams.attachmentsRootDir, retainAttachmentsOnKeep: registerParams.retainAttachmentsOnKeep, - }); + }; + params.runs.set(runId, entry); try { createRunningTaskRun({ runtime: "subagent", @@ -351,7 +356,7 @@ export function createSubagentRunManager(params: { params.startSweeper(); // Wait for subagent completion via gateway RPC (cross-process). // The in-process lifecycle listener is a fallback for embedded runs. - void waitForSubagentCompletion(runId, waitTimeoutMs); + void waitForSubagentCompletion(runId, waitTimeoutMs, entry); }; const releaseSubagentRun = (runId: string) => { diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 4fe924f9c9a..21ed36cc093 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -120,6 +120,7 @@ let contextEngineRegistryPromise: Promise | null = let runtimePluginsPromise: Promise | null = null; let sweeper: NodeJS.Timeout | null = null; +const resumeRetryTimers = new Set>(); let sweepInProgress = false; let listenerStarted = false; let listenerStop: (() => void) | null = null; @@ -369,6 +370,7 @@ const subagentLifecycleController = createSubagentRegistryLifecycleController({ }); const { + clearScheduledResumeTimers, completeCleanupBookkeeping, completeSubagentRun, finalizeResumedAnnounceGiveUp, @@ -384,22 +386,6 @@ function resumeSubagentRun(runId: string) { if (!entry) { return; } - const orphanReason = resolveSubagentRunOrphanReason({ entry }); - if (orphanReason) { - if ( - reconcileOrphanedRun({ - runId, - entry, - reason: orphanReason, - source: "resume", - runs: subagentRuns, - resumedRuns, - }) - ) { - persistSubagentRuns(); - } - return; - } if (entry.cleanupCompletedAt) { return; } @@ -434,15 +420,38 @@ function resumeSubagentRun(runId: string) { now < earliestRetryAt ) { const waitMs = Math.max(1, earliestRetryAt - now); - setTimeout(() => { + const scheduledEntry = entry; + const timer = setTimeout(() => { + resumeRetryTimers.delete(timer); + if (subagentRuns.get(runId) !== scheduledEntry) { + return; + } resumedRuns.delete(runId); resumeSubagentRun(runId); - }, waitMs).unref?.(); + }, waitMs); + timer.unref?.(); + resumeRetryTimers.add(timer); resumedRuns.add(runId); return; } if (typeof entry.endedAt === "number" && entry.endedAt > 0) { + const orphanReason = resolveSubagentRunOrphanReason({ entry }); + if (orphanReason) { + if ( + reconcileOrphanedRun({ + runId, + entry, + reason: orphanReason, + source: "resume", + runs: subagentRuns, + resumedRuns, + }) + ) { + persistSubagentRuns(); + } + return; + } if (suppressAnnounceForSteerRestart(entry)) { resumedRuns.add(runId); return; @@ -457,7 +466,7 @@ function resumeSubagentRun(runId: string) { // Wait for completion again after restart. const cfg = subagentRegistryDeps.loadConfig(); const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, entry.runTimeoutSeconds); - void subagentRunManager.waitForSubagentCompletion(runId, waitTimeoutMs); + void subagentRunManager.waitForSubagentCompletion(runId, waitTimeoutMs, entry); resumedRuns.add(runId); } @@ -737,6 +746,11 @@ export function registerSubagentRun(params: { } export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) { + clearScheduledResumeTimers(); + for (const timer of resumeRetryTimers) { + clearTimeout(timer); + } + resumeRetryTimers.clear(); subagentRuns.clear(); resumedRuns.clear(); endedHookInFlightRunIds.clear(); diff --git a/src/plugins/provider-public-artifacts.ts b/src/plugins/provider-public-artifacts.ts index 60b555c396f..14cf81bb444 100644 --- a/src/plugins/provider-public-artifacts.ts +++ b/src/plugins/provider-public-artifacts.ts @@ -1,6 +1,7 @@ import { normalizeProviderId } from "../agents/provider-id.js"; import type { ModelProviderConfig } from "../config/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { resolveBundledPluginsDir } from "./bundled-dir.js"; import type { ProviderApplyConfigDefaultsContext, ProviderNormalizeConfigContext, @@ -20,6 +21,11 @@ export type BundledProviderPolicySurface = { const bundledProviderPolicySurfaceCache = new Map(); +function buildProviderPolicySurfaceCacheKey(providerId: string): string { + const bundledPluginsDir = resolveBundledPluginsDir(); + return `${providerId}::${bundledPluginsDir ?? ""}`; +} + function hasProviderPolicyHook( mod: Record, ): mod is Record & BundledProviderPolicySurface { @@ -66,16 +72,17 @@ export function resolveBundledProviderPolicySurface( if (!normalizedProviderId) { return null; } - if (bundledProviderPolicySurfaceCache.has(normalizedProviderId)) { - return bundledProviderPolicySurfaceCache.get(normalizedProviderId) ?? null; + const cacheKey = buildProviderPolicySurfaceCacheKey(normalizedProviderId); + if (bundledProviderPolicySurfaceCache.has(cacheKey)) { + return bundledProviderPolicySurfaceCache.get(cacheKey) ?? null; } const surface = tryLoadBundledProviderPolicySurface(normalizedProviderId); if (surface) { - bundledProviderPolicySurfaceCache.set(normalizedProviderId, surface); + bundledProviderPolicySurfaceCache.set(cacheKey, surface); return surface; } - bundledProviderPolicySurfaceCache.set(normalizedProviderId, null); + bundledProviderPolicySurfaceCache.set(cacheKey, null); return null; } diff --git a/test/vitest/vitest.agents.config.ts b/test/vitest/vitest.agents.config.ts index 30a867c875e..01ca629913b 100644 --- a/test/vitest/vitest.agents.config.ts +++ b/test/vitest/vitest.agents.config.ts @@ -4,6 +4,7 @@ export function createAgentsVitestConfig(env?: Record