perf(agent): defer session resolver for scoped gateway turns

This commit is contained in:
Peter Steinberger
2026-05-29 15:39:48 +01:00
parent fca7f220a7
commit ec0d3752ca
2 changed files with 63 additions and 19 deletions

View File

@@ -5,7 +5,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { loggingState } from "../logging/state.js";
import type { RuntimeEnv } from "../runtime.js";
import { agentCliCommand } from "./agent-via-gateway.js";
import { __testing as agentViaGatewayTesting, agentCliCommand } from "./agent-via-gateway.js";
import type { agentCommand as AgentCommand } from "./agent.js";
const loadConfig = vi.hoisted(() => vi.fn());
@@ -21,6 +21,7 @@ const isGatewayTransportError = vi.hoisted(() =>
);
const agentCommand = vi.hoisted(() => vi.fn());
const agentModuleLoadCount = vi.hoisted(() => vi.fn());
const loadAgentSessionModuleMock = vi.hoisted(() => vi.fn());
const runtime: RuntimeEnv = {
log: vi.fn(),
@@ -147,6 +148,17 @@ async function waitForAgentCommandCall(expectedCalls = 1) {
expect(agentCommand).toHaveBeenCalledTimes(expectedCalls);
}
async function waitForGatewayCall(expectedCalls = 1) {
for (
let attempt = 0;
attempt < 50 && callGateway.mock.calls.length < expectedCalls;
attempt += 1
) {
await new Promise<void>((resolve) => setTimeout(resolve, 0));
}
expect(callGateway).toHaveBeenCalledTimes(expectedCalls);
}
function mockMessages(mock: unknown): string[] {
const calls = (mock as { mock?: { calls?: unknown[][] } }).mock?.calls ?? [];
return calls.map(([message]) => String(message));
@@ -209,6 +221,9 @@ let originalForceConsoleToStderr = false;
beforeEach(() => {
vi.clearAllMocks();
agentViaGatewayTesting.resetLazyImportsForTests();
loadAgentSessionModuleMock.mockImplementation(async () => await import("./agent/session.js"));
agentViaGatewayTesting.setAgentSessionModuleLoaderForTests(loadAgentSessionModuleMock);
originalForceConsoleToStderr = loggingState.forceConsoleToStderr;
loggingState.forceConsoleToStderr = false;
});
@@ -270,6 +285,7 @@ describe("agentCliCommand", () => {
expect(params.sessionId).toBeUndefined();
expect(params.to).toBeUndefined();
expect(agentCommand).not.toHaveBeenCalled();
expect(loadAgentSessionModuleMock).not.toHaveBeenCalled();
});
});
@@ -529,7 +545,7 @@ describe("agentCliCommand", () => {
const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, {
process: signals.processLike,
});
await Promise.resolve();
await waitForGatewayCall();
signals.emit(signalName);
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
@@ -596,7 +612,7 @@ describe("agentCliCommand", () => {
process: signals.processLike,
},
);
await Promise.resolve();
await waitForGatewayCall();
signals.emit("SIGTERM");
await run;
@@ -638,7 +654,7 @@ describe("agentCliCommand", () => {
const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, {
process: signals.processLike,
});
await Promise.resolve();
await waitForGatewayCall();
signals.emit("SIGTERM");
await run;
@@ -702,7 +718,7 @@ describe("agentCliCommand", () => {
process: signals.processLike,
},
);
await Promise.resolve();
await waitForGatewayCall();
signals.emit("SIGTERM");
await run;
@@ -778,7 +794,7 @@ describe("agentCliCommand", () => {
process: signals.processLike,
},
);
await Promise.resolve();
await waitForGatewayCall();
signals.emit("SIGTERM");
await run;
@@ -856,7 +872,7 @@ describe("agentCliCommand", () => {
process: signals.processLike,
},
);
await Promise.resolve();
await waitForGatewayCall();
signals.emit("SIGTERM");
await run;
@@ -932,7 +948,7 @@ describe("agentCliCommand", () => {
process: signals.processLike,
},
);
await Promise.resolve();
await waitForGatewayCall();
signals.emit("SIGTERM");
await run;

View File

@@ -29,7 +29,6 @@ import {
import { type RuntimeEnv, writeRuntimeJson } from "../runtime.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { normalizeMessageChannel } from "../utils/message-channel.js";
import { buildExplicitSessionIdSessionKey, resolveSessionKeyForRequest } from "./agent/session.js";
type AgentGatewayResult = {
payloads?: Array<{
@@ -93,6 +92,8 @@ type AgentGatewayCallIdentity = Pick<
"clientName" | "mode" | "scopes"
>;
type EmbeddedAgentCommandModule = typeof import("./agent.js");
type AgentSessionModule = typeof import("./agent/session.js");
type AgentSessionModuleLoader = () => Promise<AgentSessionModule>;
const AGENT_CLI_SIGNALS: readonly AgentCliSignal[] = ["SIGINT", "SIGTERM"];
const GATEWAY_ABORT_RETRY_DELAYS_MS = [50, 150, 300, 600] as const;
@@ -103,12 +104,33 @@ const AGENT_CLI_SIGNAL_EXIT_CODES: Record<AgentCliSignal, number> = {
};
let embeddedAgentCommandPromise: Promise<EmbeddedAgentCommandModule["agentCommand"]> | undefined;
let agentSessionModulePromise: Promise<AgentSessionModule> | undefined;
const defaultAgentSessionModuleLoader: AgentSessionModuleLoader = () =>
import("./agent/session.js");
let agentSessionModuleLoader: AgentSessionModuleLoader = defaultAgentSessionModuleLoader;
function loadEmbeddedAgentCommand(): Promise<EmbeddedAgentCommandModule["agentCommand"]> {
embeddedAgentCommandPromise ??= import("./agent.js").then((module) => module.agentCommand);
return embeddedAgentCommandPromise;
}
function loadAgentSessionModule(): Promise<AgentSessionModule> {
agentSessionModulePromise ??= agentSessionModuleLoader();
return agentSessionModulePromise;
}
export const __testing = {
resetLazyImportsForTests(): void {
embeddedAgentCommandPromise = undefined;
agentSessionModulePromise = undefined;
agentSessionModuleLoader = defaultAgentSessionModuleLoader;
},
setAgentSessionModuleLoaderForTests(loader: AgentSessionModuleLoader): void {
agentSessionModulePromise = undefined;
agentSessionModuleLoader = loader;
},
};
function protectJsonStdout(opts: Pick<AgentCliOpts, "json">): void {
if (opts.json === true) {
routeLogsToStderr();
@@ -461,11 +483,13 @@ function createGatewayTimeoutFallbackSession(agentId?: string): {
const sessionId = createGatewayTimeoutFallbackSessionId();
return {
sessionId,
sessionKey: buildExplicitSessionIdSessionKey({ sessionId, agentId }),
sessionKey: `agent:${normalizeAgentId(agentId)}:explicit:${sessionId.trim()}`,
};
}
function resolveAgentIdForGatewayTimeoutFallback(opts: AgentCliOpts): string | undefined {
async function resolveAgentIdForGatewayTimeoutFallback(
opts: AgentCliOpts,
): Promise<string | undefined> {
const explicitSessionKey = opts.sessionKey?.trim();
if (classifySessionKeyShape(explicitSessionKey) === "agent") {
return resolveAgentIdFromSessionKey(explicitSessionKey);
@@ -483,6 +507,7 @@ function resolveAgentIdForGatewayTimeoutFallback(opts: AgentCliOpts): string | u
return undefined;
}
const cfg = getRuntimeConfig();
const { resolveSessionKeyForRequest } = await loadAgentSessionModule();
const resolvedSessionKey = resolveSessionKeyForRequest({
cfg,
to: opts.to,
@@ -550,13 +575,16 @@ async function agentViaGatewayCommand(
? NO_GATEWAY_TIMEOUT_MS // no timeout (timer-safe max)
: Math.max(10_000, (timeoutSeconds + 30) * 1000);
const sessionKey = resolveSessionKeyForRequest({
cfg,
agentId,
to: opts.to,
sessionId: opts.sessionId,
sessionKey: explicitSessionKey,
}).sessionKey;
const sessionKey =
classifySessionKeyShape(explicitSessionKey) === "agent"
? explicitSessionKey
: (await loadAgentSessionModule()).resolveSessionKeyForRequest({
cfg,
agentId,
to: opts.to,
sessionId: opts.sessionId,
sessionKey: explicitSessionKey,
}).sessionKey;
const channel = normalizeMessageChannel(opts.channel);
const idempotencyKey = normalizeOptionalString(opts.runId) || randomIdempotencyKey();
@@ -750,7 +778,7 @@ export async function agentCliCommand(
if (isControlCommandThatMustNotFallback(dispatchOpts)) {
throw err;
}
const fallbackAgentId = resolveAgentIdForGatewayTimeoutFallback(dispatchOpts);
const fallbackAgentId = await resolveAgentIdForGatewayTimeoutFallback(dispatchOpts);
const fallbackSession = createGatewayTimeoutFallbackSession(fallbackAgentId);
runtime.error?.(
`EMBEDDED FALLBACK: Gateway agent timed out; running embedded agent with fresh session ${fallbackSession.sessionId}: ${String(err)}`,