diff --git a/src/commands/agent-via-gateway.test.ts b/src/commands/agent-via-gateway.test.ts index 8173c40176f..83922ffdc94 100644 --- a/src/commands/agent-via-gateway.test.ts +++ b/src/commands/agent-via-gateway.test.ts @@ -5,7 +5,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import { loggingState } from "../logging/state.js"; import type { RuntimeEnv } from "../runtime.js"; -import { agentCliCommand } from "./agent-via-gateway.js"; +import { __testing as agentViaGatewayTesting, agentCliCommand } from "./agent-via-gateway.js"; import type { agentCommand as AgentCommand } from "./agent.js"; const loadConfig = vi.hoisted(() => vi.fn()); @@ -21,6 +21,7 @@ const isGatewayTransportError = vi.hoisted(() => ); const agentCommand = vi.hoisted(() => vi.fn()); const agentModuleLoadCount = vi.hoisted(() => vi.fn()); +const loadAgentSessionModuleMock = vi.hoisted(() => vi.fn()); const runtime: RuntimeEnv = { log: vi.fn(), @@ -147,6 +148,17 @@ async function waitForAgentCommandCall(expectedCalls = 1) { expect(agentCommand).toHaveBeenCalledTimes(expectedCalls); } +async function waitForGatewayCall(expectedCalls = 1) { + for ( + let attempt = 0; + attempt < 50 && callGateway.mock.calls.length < expectedCalls; + attempt += 1 + ) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + expect(callGateway).toHaveBeenCalledTimes(expectedCalls); +} + function mockMessages(mock: unknown): string[] { const calls = (mock as { mock?: { calls?: unknown[][] } }).mock?.calls ?? []; return calls.map(([message]) => String(message)); @@ -209,6 +221,9 @@ let originalForceConsoleToStderr = false; beforeEach(() => { vi.clearAllMocks(); + agentViaGatewayTesting.resetLazyImportsForTests(); + loadAgentSessionModuleMock.mockImplementation(async () => await import("./agent/session.js")); + agentViaGatewayTesting.setAgentSessionModuleLoaderForTests(loadAgentSessionModuleMock); originalForceConsoleToStderr = loggingState.forceConsoleToStderr; loggingState.forceConsoleToStderr = false; }); @@ -270,6 +285,7 @@ describe("agentCliCommand", () => { expect(params.sessionId).toBeUndefined(); expect(params.to).toBeUndefined(); expect(agentCommand).not.toHaveBeenCalled(); + expect(loadAgentSessionModuleMock).not.toHaveBeenCalled(); }); }); @@ -529,7 +545,7 @@ describe("agentCliCommand", () => { const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, { process: signals.processLike, }); - await Promise.resolve(); + await waitForGatewayCall(); signals.emit(signalName); expect(signals.listenerCount("SIGTERM")).toBe(0); expect(signals.listenerCount("SIGINT")).toBe(0); @@ -596,7 +612,7 @@ describe("agentCliCommand", () => { process: signals.processLike, }, ); - await Promise.resolve(); + await waitForGatewayCall(); signals.emit("SIGTERM"); await run; @@ -638,7 +654,7 @@ describe("agentCliCommand", () => { const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, { process: signals.processLike, }); - await Promise.resolve(); + await waitForGatewayCall(); signals.emit("SIGTERM"); await run; @@ -702,7 +718,7 @@ describe("agentCliCommand", () => { process: signals.processLike, }, ); - await Promise.resolve(); + await waitForGatewayCall(); signals.emit("SIGTERM"); await run; @@ -778,7 +794,7 @@ describe("agentCliCommand", () => { process: signals.processLike, }, ); - await Promise.resolve(); + await waitForGatewayCall(); signals.emit("SIGTERM"); await run; @@ -856,7 +872,7 @@ describe("agentCliCommand", () => { process: signals.processLike, }, ); - await Promise.resolve(); + await waitForGatewayCall(); signals.emit("SIGTERM"); await run; @@ -932,7 +948,7 @@ describe("agentCliCommand", () => { process: signals.processLike, }, ); - await Promise.resolve(); + await waitForGatewayCall(); signals.emit("SIGTERM"); await run; diff --git a/src/commands/agent-via-gateway.ts b/src/commands/agent-via-gateway.ts index 238d2159855..2346a3f9274 100644 --- a/src/commands/agent-via-gateway.ts +++ b/src/commands/agent-via-gateway.ts @@ -29,7 +29,6 @@ import { import { type RuntimeEnv, writeRuntimeJson } from "../runtime.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; import { normalizeMessageChannel } from "../utils/message-channel.js"; -import { buildExplicitSessionIdSessionKey, resolveSessionKeyForRequest } from "./agent/session.js"; type AgentGatewayResult = { payloads?: Array<{ @@ -93,6 +92,8 @@ type AgentGatewayCallIdentity = Pick< "clientName" | "mode" | "scopes" >; type EmbeddedAgentCommandModule = typeof import("./agent.js"); +type AgentSessionModule = typeof import("./agent/session.js"); +type AgentSessionModuleLoader = () => Promise; const AGENT_CLI_SIGNALS: readonly AgentCliSignal[] = ["SIGINT", "SIGTERM"]; const GATEWAY_ABORT_RETRY_DELAYS_MS = [50, 150, 300, 600] as const; @@ -103,12 +104,33 @@ const AGENT_CLI_SIGNAL_EXIT_CODES: Record = { }; let embeddedAgentCommandPromise: Promise | undefined; +let agentSessionModulePromise: Promise | undefined; +const defaultAgentSessionModuleLoader: AgentSessionModuleLoader = () => + import("./agent/session.js"); +let agentSessionModuleLoader: AgentSessionModuleLoader = defaultAgentSessionModuleLoader; function loadEmbeddedAgentCommand(): Promise { embeddedAgentCommandPromise ??= import("./agent.js").then((module) => module.agentCommand); return embeddedAgentCommandPromise; } +function loadAgentSessionModule(): Promise { + agentSessionModulePromise ??= agentSessionModuleLoader(); + return agentSessionModulePromise; +} + +export const __testing = { + resetLazyImportsForTests(): void { + embeddedAgentCommandPromise = undefined; + agentSessionModulePromise = undefined; + agentSessionModuleLoader = defaultAgentSessionModuleLoader; + }, + setAgentSessionModuleLoaderForTests(loader: AgentSessionModuleLoader): void { + agentSessionModulePromise = undefined; + agentSessionModuleLoader = loader; + }, +}; + function protectJsonStdout(opts: Pick): void { if (opts.json === true) { routeLogsToStderr(); @@ -461,11 +483,13 @@ function createGatewayTimeoutFallbackSession(agentId?: string): { const sessionId = createGatewayTimeoutFallbackSessionId(); return { sessionId, - sessionKey: buildExplicitSessionIdSessionKey({ sessionId, agentId }), + sessionKey: `agent:${normalizeAgentId(agentId)}:explicit:${sessionId.trim()}`, }; } -function resolveAgentIdForGatewayTimeoutFallback(opts: AgentCliOpts): string | undefined { +async function resolveAgentIdForGatewayTimeoutFallback( + opts: AgentCliOpts, +): Promise { const explicitSessionKey = opts.sessionKey?.trim(); if (classifySessionKeyShape(explicitSessionKey) === "agent") { return resolveAgentIdFromSessionKey(explicitSessionKey); @@ -483,6 +507,7 @@ function resolveAgentIdForGatewayTimeoutFallback(opts: AgentCliOpts): string | u return undefined; } const cfg = getRuntimeConfig(); + const { resolveSessionKeyForRequest } = await loadAgentSessionModule(); const resolvedSessionKey = resolveSessionKeyForRequest({ cfg, to: opts.to, @@ -550,13 +575,16 @@ async function agentViaGatewayCommand( ? NO_GATEWAY_TIMEOUT_MS // no timeout (timer-safe max) : Math.max(10_000, (timeoutSeconds + 30) * 1000); - const sessionKey = resolveSessionKeyForRequest({ - cfg, - agentId, - to: opts.to, - sessionId: opts.sessionId, - sessionKey: explicitSessionKey, - }).sessionKey; + const sessionKey = + classifySessionKeyShape(explicitSessionKey) === "agent" + ? explicitSessionKey + : (await loadAgentSessionModule()).resolveSessionKeyForRequest({ + cfg, + agentId, + to: opts.to, + sessionId: opts.sessionId, + sessionKey: explicitSessionKey, + }).sessionKey; const channel = normalizeMessageChannel(opts.channel); const idempotencyKey = normalizeOptionalString(opts.runId) || randomIdempotencyKey(); @@ -750,7 +778,7 @@ export async function agentCliCommand( if (isControlCommandThatMustNotFallback(dispatchOpts)) { throw err; } - const fallbackAgentId = resolveAgentIdForGatewayTimeoutFallback(dispatchOpts); + const fallbackAgentId = await resolveAgentIdForGatewayTimeoutFallback(dispatchOpts); const fallbackSession = createGatewayTimeoutFallbackSession(fallbackAgentId); runtime.error?.( `EMBEDDED FALLBACK: Gateway agent timed out; running embedded agent with fresh session ${fallbackSession.sessionId}: ${String(err)}`,