mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-08 07:41:08 +00:00
refactor: lazy load subagent registry runtime hooks
This commit is contained in:
committed by
Peter Steinberger
parent
08560c1f48
commit
18891b1806
@@ -3,7 +3,7 @@ import * as sessions from "../config/sessions.js";
|
||||
import * as gateway from "../gateway/call.js";
|
||||
import * as sessionUtils from "../gateway/session-utils.fs.js";
|
||||
import { recoverOrphanedSubagentSessions } from "./subagent-orphan-recovery.js";
|
||||
import * as subagentRegistry from "./subagent-registry.js";
|
||||
import * as subagentRegistryRuntime from "./subagent-registry-runtime.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
|
||||
// Mock dependencies before importing the module under test
|
||||
@@ -28,7 +28,7 @@ vi.mock("../gateway/session-utils.fs.js", () => ({
|
||||
readSessionMessages: vi.fn(() => []),
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-registry.js", () => ({
|
||||
vi.mock("./subagent-registry-runtime.js", () => ({
|
||||
replaceSubagentRunAfterSteer: vi.fn(() => true),
|
||||
}));
|
||||
|
||||
@@ -103,7 +103,7 @@ describe("subagent-orphan-recovery", () => {
|
||||
expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
|
||||
expect(params.message).toContain("gateway reload");
|
||||
expect(params.message).toContain("Test task: implement feature X");
|
||||
expect(subagentRegistry.replaceSubagentRunAfterSteer).toHaveBeenCalledWith(
|
||||
expect(subagentRegistryRuntime.replaceSubagentRunAfterSteer).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
previousRunId: "run-1",
|
||||
nextRunId: "test-run-id",
|
||||
@@ -379,7 +379,7 @@ describe("subagent-orphan-recovery", () => {
|
||||
|
||||
it("does not retry a session after the gateway accepted resume but run remap failed", async () => {
|
||||
vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never);
|
||||
vi.mocked(subagentRegistry.replaceSubagentRunAfterSteer).mockReturnValue(false);
|
||||
vi.mocked(subagentRegistryRuntime.replaceSubagentRunAfterSteer).mockReturnValue(false);
|
||||
|
||||
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
||||
"agent:main:subagent:test-session-1": {
|
||||
|
||||
@@ -21,7 +21,7 @@ import {
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { readSessionMessages } from "../gateway/session-utils.fs.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { replaceSubagentRunAfterSteer } from "./subagent-registry.js";
|
||||
import { replaceSubagentRunAfterSteer } from "./subagent-registry-runtime.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
|
||||
const log = createSubsystemLogger("subagent-orphan-recovery");
|
||||
|
||||
@@ -3,7 +3,7 @@ import { callGateway } from "../gateway/call.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { createRunningTaskRun } from "../tasks/task-executor.js";
|
||||
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import { ensureRuntimePluginsLoaded } from "./runtime-plugins.js";
|
||||
import type { ensureRuntimePluginsLoaded as ensureRuntimePluginsLoadedFn } from "./runtime-plugins.js";
|
||||
import type { SubagentRunOutcome } from "./subagent-announce.js";
|
||||
import {
|
||||
SUBAGENT_ENDED_OUTCOME_KILLED,
|
||||
@@ -35,7 +35,13 @@ export function createSubagentRunManager(params: {
|
||||
persist(): void;
|
||||
callGateway: typeof callGateway;
|
||||
loadConfig: typeof loadConfig;
|
||||
ensureRuntimePluginsLoaded: typeof ensureRuntimePluginsLoaded;
|
||||
ensureRuntimePluginsLoaded:
|
||||
| typeof ensureRuntimePluginsLoadedFn
|
||||
| ((args: {
|
||||
config: ReturnType<typeof loadConfig>;
|
||||
workspaceDir?: string;
|
||||
allowGatewaySubagentBinding?: boolean;
|
||||
}) => void | Promise<void>);
|
||||
ensureListener(): void;
|
||||
startSweeper(): void;
|
||||
stopSweeper(): void;
|
||||
@@ -437,23 +443,28 @@ export function createSubagentRunManager(params: {
|
||||
completedAt: now,
|
||||
});
|
||||
const cfg = params.loadConfig();
|
||||
params.ensureRuntimePluginsLoaded({
|
||||
config: cfg,
|
||||
workspaceDir: entry.workspaceDir,
|
||||
allowGatewaySubagentBinding: true,
|
||||
});
|
||||
void emitSubagentEndedHookOnce({
|
||||
entry,
|
||||
reason: SUBAGENT_ENDED_REASON_KILLED,
|
||||
sendFarewell: true,
|
||||
accountId: entry.requesterOrigin?.accountId,
|
||||
outcome: SUBAGENT_ENDED_OUTCOME_KILLED,
|
||||
error: reason,
|
||||
inFlightRunIds: params.endedHookInFlightRunIds,
|
||||
persist: () => params.persist(),
|
||||
}).catch(() => {
|
||||
// Hook failures should not break termination flow.
|
||||
});
|
||||
void Promise.resolve(
|
||||
params.ensureRuntimePluginsLoaded({
|
||||
config: cfg,
|
||||
workspaceDir: entry.workspaceDir,
|
||||
allowGatewaySubagentBinding: true,
|
||||
}),
|
||||
)
|
||||
.then(() =>
|
||||
emitSubagentEndedHookOnce({
|
||||
entry,
|
||||
reason: SUBAGENT_ENDED_REASON_KILLED,
|
||||
sendFarewell: true,
|
||||
accountId: entry.requesterOrigin?.accountId,
|
||||
outcome: SUBAGENT_ENDED_OUTCOME_KILLED,
|
||||
error: reason,
|
||||
inFlightRunIds: params.endedHookInFlightRunIds,
|
||||
persist: () => params.persist(),
|
||||
}),
|
||||
)
|
||||
.catch(() => {
|
||||
// Hook failures should not break termination flow.
|
||||
});
|
||||
}
|
||||
}
|
||||
return updated;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import * as registry from "./subagent-registry.js";
|
||||
import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
|
||||
/**
|
||||
* Regression test for #18264: Gateway announcement delivery loop.
|
||||
@@ -72,6 +71,13 @@ vi.mock("./timeout.js", () => ({
|
||||
}));
|
||||
|
||||
describe("announce loop guard (#18264)", () => {
|
||||
let registry: typeof import("./subagent-registry.js");
|
||||
|
||||
beforeAll(async () => {
|
||||
vi.resetModules();
|
||||
registry = await import("./subagent-registry.js");
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
mocks.callGateway.mockClear();
|
||||
|
||||
3
src/agents/subagent-registry.runtime.ts
Normal file
3
src/agents/subagent-registry.runtime.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
export { ensureContextEnginesInitialized } from "../context-engine/init.js";
|
||||
export { resolveContextEngine } from "../context-engine/registry.js";
|
||||
export { ensureRuntimePluginsLoaded } from "./runtime-plugins.js";
|
||||
@@ -277,7 +277,9 @@ describe("subagent registry seam flow", () => {
|
||||
await Promise.resolve();
|
||||
|
||||
expect(mocks.runSubagentAnnounceFlow).not.toHaveBeenCalled();
|
||||
expect(mocks.runSubagentEnded).toHaveBeenCalledTimes(1);
|
||||
await vi.waitFor(() => {
|
||||
expect(mocks.runSubagentEnded).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(mocks.onSubagentEnded).toHaveBeenCalledWith({
|
||||
childSessionKey: "agent:main:subagent:child",
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { ensureContextEnginesInitialized } from "../context-engine/init.js";
|
||||
import { resolveContextEngine } from "../context-engine/registry.js";
|
||||
import type { ensureContextEnginesInitialized as ensureContextEnginesInitializedFn } from "../context-engine/init.js";
|
||||
import type { resolveContextEngine as resolveContextEngineFn } from "../context-engine/registry.js";
|
||||
import type { SubagentEndReason } from "../context-engine/types.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import { ensureRuntimePluginsLoaded } from "./runtime-plugins.js";
|
||||
import type { ensureRuntimePluginsLoaded as ensureRuntimePluginsLoadedFn } from "./runtime-plugins.js";
|
||||
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
|
||||
import * as subagentAnnounceModule from "./subagent-announce.js";
|
||||
import type { SubagentRunOutcome } from "./subagent-announce.js";
|
||||
@@ -64,35 +64,35 @@ const log = createSubsystemLogger("agents/subagent-registry");
|
||||
type SubagentRegistryDeps = {
|
||||
callGateway: typeof callGateway;
|
||||
captureSubagentCompletionReply: typeof subagentAnnounceModule.captureSubagentCompletionReply;
|
||||
ensureContextEnginesInitialized: typeof ensureContextEnginesInitialized;
|
||||
ensureRuntimePluginsLoaded: typeof ensureRuntimePluginsLoaded;
|
||||
getSubagentRunsSnapshotForRead: typeof getSubagentRunsSnapshotForRead;
|
||||
loadConfig: typeof loadConfig;
|
||||
onAgentEvent: typeof onAgentEvent;
|
||||
persistSubagentRunsToDisk: typeof persistSubagentRunsToDisk;
|
||||
resolveAgentTimeoutMs: typeof resolveAgentTimeoutMs;
|
||||
resolveContextEngine: typeof resolveContextEngine;
|
||||
restoreSubagentRunsFromDisk: typeof restoreSubagentRunsFromDisk;
|
||||
runSubagentAnnounceFlow: typeof subagentAnnounceModule.runSubagentAnnounceFlow;
|
||||
ensureContextEnginesInitialized?: typeof ensureContextEnginesInitializedFn;
|
||||
ensureRuntimePluginsLoaded?: typeof ensureRuntimePluginsLoadedFn;
|
||||
resolveContextEngine?: typeof resolveContextEngineFn;
|
||||
};
|
||||
|
||||
const defaultSubagentRegistryDeps: SubagentRegistryDeps = {
|
||||
callGateway,
|
||||
captureSubagentCompletionReply: (sessionKey) =>
|
||||
subagentAnnounceModule.captureSubagentCompletionReply(sessionKey),
|
||||
ensureContextEnginesInitialized,
|
||||
ensureRuntimePluginsLoaded,
|
||||
getSubagentRunsSnapshotForRead,
|
||||
loadConfig,
|
||||
onAgentEvent,
|
||||
persistSubagentRunsToDisk,
|
||||
resolveAgentTimeoutMs,
|
||||
resolveContextEngine,
|
||||
restoreSubagentRunsFromDisk,
|
||||
runSubagentAnnounceFlow: (params) => subagentAnnounceModule.runSubagentAnnounceFlow(params),
|
||||
};
|
||||
|
||||
let subagentRegistryDeps: SubagentRegistryDeps = defaultSubagentRegistryDeps;
|
||||
let subagentRegistryRuntimePromise: Promise<
|
||||
typeof import("./subagent-registry.runtime.js")
|
||||
> | null = null;
|
||||
|
||||
let sweeper: NodeJS.Timeout | null = null;
|
||||
let listenerStarted = false;
|
||||
@@ -107,6 +107,35 @@ const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
|
||||
*/
|
||||
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
|
||||
|
||||
function loadSubagentRegistryRuntime() {
|
||||
subagentRegistryRuntimePromise ??= import("./subagent-registry.runtime.js");
|
||||
return subagentRegistryRuntimePromise;
|
||||
}
|
||||
|
||||
async function ensureSubagentRegistryPluginRuntimeLoaded(params: {
|
||||
config: ReturnType<typeof loadConfig>;
|
||||
workspaceDir?: string;
|
||||
allowGatewaySubagentBinding?: boolean;
|
||||
}) {
|
||||
const ensureRuntimePluginsLoaded = subagentRegistryDeps.ensureRuntimePluginsLoaded;
|
||||
if (ensureRuntimePluginsLoaded) {
|
||||
ensureRuntimePluginsLoaded(params);
|
||||
return;
|
||||
}
|
||||
const runtime = await loadSubagentRegistryRuntime();
|
||||
runtime.ensureRuntimePluginsLoaded(params);
|
||||
}
|
||||
|
||||
async function resolveSubagentRegistryContextEngine(cfg: ReturnType<typeof loadConfig>) {
|
||||
const runtime = await loadSubagentRegistryRuntime();
|
||||
const ensureContextEnginesInitialized =
|
||||
subagentRegistryDeps.ensureContextEnginesInitialized ?? runtime.ensureContextEnginesInitialized;
|
||||
const resolveContextEngine =
|
||||
subagentRegistryDeps.resolveContextEngine ?? runtime.resolveContextEngine;
|
||||
ensureContextEnginesInitialized();
|
||||
return await resolveContextEngine(cfg);
|
||||
}
|
||||
|
||||
function persistSubagentRuns() {
|
||||
subagentRegistryDeps.persistSubagentRunsToDisk(subagentRuns);
|
||||
}
|
||||
@@ -181,13 +210,12 @@ async function notifyContextEngineSubagentEnded(params: {
|
||||
}) {
|
||||
try {
|
||||
const cfg = subagentRegistryDeps.loadConfig();
|
||||
subagentRegistryDeps.ensureRuntimePluginsLoaded({
|
||||
await ensureSubagentRegistryPluginRuntimeLoaded({
|
||||
config: cfg,
|
||||
workspaceDir: params.workspaceDir,
|
||||
allowGatewaySubagentBinding: true,
|
||||
});
|
||||
subagentRegistryDeps.ensureContextEnginesInitialized();
|
||||
const engine = await subagentRegistryDeps.resolveContextEngine(cfg);
|
||||
const engine = await resolveSubagentRegistryContextEngine(cfg);
|
||||
if (!engine.onSubagentEnded) {
|
||||
return;
|
||||
}
|
||||
@@ -225,7 +253,7 @@ async function emitSubagentEndedHookForRun(params: {
|
||||
accountId?: string;
|
||||
}) {
|
||||
const cfg = subagentRegistryDeps.loadConfig();
|
||||
subagentRegistryDeps.ensureRuntimePluginsLoaded({
|
||||
await ensureSubagentRegistryPluginRuntimeLoaded({
|
||||
config: cfg,
|
||||
workspaceDir: params.entry.workspaceDir,
|
||||
allowGatewaySubagentBinding: true,
|
||||
@@ -540,7 +568,11 @@ const subagentRunManager = createSubagentRunManager({
|
||||
persist: persistSubagentRuns,
|
||||
callGateway: (request) => subagentRegistryDeps.callGateway(request),
|
||||
loadConfig: () => subagentRegistryDeps.loadConfig(),
|
||||
ensureRuntimePluginsLoaded,
|
||||
ensureRuntimePluginsLoaded: (args: {
|
||||
config: ReturnType<typeof loadConfig>;
|
||||
workspaceDir?: string;
|
||||
allowGatewaySubagentBinding?: boolean;
|
||||
}) => ensureSubagentRegistryPluginRuntimeLoaded(args),
|
||||
ensureListener,
|
||||
startSweeper,
|
||||
stopSweeper,
|
||||
@@ -597,6 +629,7 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
|
||||
resumedRuns.clear();
|
||||
endedHookInFlightRunIds.clear();
|
||||
clearAllPendingLifecycleErrors();
|
||||
subagentRegistryRuntimePromise = null;
|
||||
resetAnnounceQueuesForTests();
|
||||
stopSweeper();
|
||||
restoreAttempted = false;
|
||||
|
||||
Reference in New Issue
Block a user