From 4a30ae182b6ab86933df98e6ac42fb9dffbcfcee Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 26 Apr 2026 23:50:53 -0700 Subject: [PATCH] fix(gateway): defer embedded runner imports --- CHANGELOG.md | 1 + src/agents/pi-embedded-runner/run-state.ts | 68 +++++++++++++++++ src/agents/pi-embedded-runner/runs.ts | 86 +++++----------------- src/gateway/server-reload-handlers.ts | 2 +- src/gateway/server.impl.ts | 2 +- src/gateway/server.reload.test.ts | 10 +++ 6 files changed, 99 insertions(+), 70 deletions(-) create mode 100644 src/agents/pi-embedded-runner/run-state.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3213358100b..b7c9b9b3752 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai - Gateway/startup: lazy-load plugin HTTP route dispatch when active plugin routes exist so no-plugin Gateway boot skips plugin route runtime scope setup. Thanks @vincentkoc. - Gateway/startup: move chat run/subscriber registries onto a lightweight state module and defer chat/session event projection until the first event so Gateway boot skips session IO imports. Thanks @vincentkoc. - Gateway/startup: keep node session runtime on a lightweight JSON parser instead of importing gateway method validation helpers during boot. Thanks @vincentkoc. +- Gateway/startup: read embedded-run activity from a lightweight shared state module so restart deferral no longer imports the embedded runner during Gateway boot. Thanks @vincentkoc. - CLI/Gateway: use a parse-only config snapshot for plain `gateway status` reads and reuse same-path service config context so status no longer spends tens of seconds in full config validation before printing. Thanks @vincentkoc. - Lobster/Gateway: memoize repeated Ajv schema compilation before loading the embedded Lobster runtime so scheduled workflows and `llm.invoke` loops stop growing gateway heap on content-identical schemas. Fixes #71148. Thanks @cmi525, @vsolaz, and @vincentkoc. - Codex harness: normalize cached input tokens before session/context accounting so prompt cache reads are not double-counted in `/status`, `session_status`, or persisted `sessionEntry.totalTokens`. Fixes #69298. Thanks @richardmqq. diff --git a/src/agents/pi-embedded-runner/run-state.ts b/src/agents/pi-embedded-runner/run-state.ts new file mode 100644 index 00000000000..9858b88b651 --- /dev/null +++ b/src/agents/pi-embedded-runner/run-state.ts @@ -0,0 +1,68 @@ +import { + getActiveReplyRunCount, + listActiveReplyRunSessionIds, +} from "../../auto-reply/reply/reply-run-registry.js"; +import { resolveGlobalSingleton } from "../../shared/global-singleton.js"; + +export type EmbeddedPiQueueHandle = { + kind?: "embedded"; + queueMessage: (text: string) => Promise; + isStreaming: () => boolean; + isCompacting: () => boolean; + cancel?: (reason?: "user_abort" | "restart" | "superseded") => void; + abort: () => void; +}; + +export type ActiveEmbeddedRunSnapshot = { + transcriptLeafId: string | null; + messages?: unknown[]; + inFlightPrompt?: string; +}; + +export type EmbeddedRunModelSwitchRequest = { + provider: string; + model: string; + authProfileId?: string; + authProfileIdSource?: "auto" | "user"; +}; + +export type EmbeddedRunWaiter = { + resolve: (ended: boolean) => void; + timer: NodeJS.Timeout; +}; + +const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState"); + +const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({ + activeRuns: new Map(), + snapshots: new Map(), + sessionIdsByKey: new Map(), + waiters: new Map>(), + modelSwitchRequests: new Map(), +})); + +export const ACTIVE_EMBEDDED_RUNS = + embeddedRunState.activeRuns ?? + (embeddedRunState.activeRuns = new Map()); +export const ACTIVE_EMBEDDED_RUN_SNAPSHOTS = + embeddedRunState.snapshots ?? + (embeddedRunState.snapshots = new Map()); +export const ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY = + embeddedRunState.sessionIdsByKey ?? + (embeddedRunState.sessionIdsByKey = new Map()); +export const EMBEDDED_RUN_WAITERS = + embeddedRunState.waiters ?? + (embeddedRunState.waiters = new Map>()); +export const EMBEDDED_RUN_MODEL_SWITCH_REQUESTS = + embeddedRunState.modelSwitchRequests ?? + (embeddedRunState.modelSwitchRequests = new Map()); + +export function getActiveEmbeddedRunCount(): number { + let activeCount = ACTIVE_EMBEDDED_RUNS.size; + for (const sessionId of listActiveReplyRunSessionIds()) { + if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) { + activeCount += 1; + } + } + return Math.max(activeCount, getActiveReplyRunCount()); +} diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index 4931c77e7a7..5965c3ef070 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -1,10 +1,8 @@ import { abortActiveReplyRuns, abortReplyRunBySessionId, - getActiveReplyRunCount, isReplyRunActiveForSessionId, isReplyRunStreamingForSessionId, - listActiveReplyRunSessionIds, queueReplyRunMessage, resolveActiveReplyRunSessionId, waitForReplyRunEndBySessionId, @@ -14,64 +12,26 @@ import { logMessageQueued, logSessionStateChange, } from "../../logging/diagnostic.js"; -import { resolveGlobalSingleton } from "../../shared/global-singleton.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; +import { + ACTIVE_EMBEDDED_RUNS, + ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY, + ACTIVE_EMBEDDED_RUN_SNAPSHOTS, + EMBEDDED_RUN_MODEL_SWITCH_REQUESTS, + EMBEDDED_RUN_WAITERS, + getActiveEmbeddedRunCount, + type ActiveEmbeddedRunSnapshot, + type EmbeddedPiQueueHandle, + type EmbeddedRunModelSwitchRequest, + type EmbeddedRunWaiter, +} from "./run-state.js"; -export type EmbeddedPiQueueHandle = { - kind?: "embedded"; - queueMessage: (text: string) => Promise; - isStreaming: () => boolean; - isCompacting: () => boolean; - cancel?: (reason?: "user_abort" | "restart" | "superseded") => void; - abort: () => void; -}; - -export type ActiveEmbeddedRunSnapshot = { - transcriptLeafId: string | null; - messages?: unknown[]; - inFlightPrompt?: string; -}; - -type EmbeddedRunWaiter = { - resolve: (ended: boolean) => void; - timer: NodeJS.Timeout; -}; - -export type EmbeddedRunModelSwitchRequest = { - provider: string; - model: string; - authProfileId?: string; - authProfileIdSource?: "auto" | "user"; -}; - -/** - * Use global singleton state so busy/streaming checks stay consistent even - * when the bundler emits multiple copies of this module into separate chunks. - */ -const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState"); - -const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({ - activeRuns: new Map(), - snapshots: new Map(), - sessionIdsByKey: new Map(), - waiters: new Map>(), - modelSwitchRequests: new Map(), -})); -const ACTIVE_EMBEDDED_RUNS = - embeddedRunState.activeRuns ?? - (embeddedRunState.activeRuns = new Map()); -const ACTIVE_EMBEDDED_RUN_SNAPSHOTS = - embeddedRunState.snapshots ?? - (embeddedRunState.snapshots = new Map()); -const ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY = - embeddedRunState.sessionIdsByKey ?? - (embeddedRunState.sessionIdsByKey = new Map()); -const EMBEDDED_RUN_WAITERS = - embeddedRunState.waiters ?? - (embeddedRunState.waiters = new Map>()); -const EMBEDDED_RUN_MODEL_SWITCH_REQUESTS = - embeddedRunState.modelSwitchRequests ?? - (embeddedRunState.modelSwitchRequests = new Map()); +export { + getActiveEmbeddedRunCount, + type ActiveEmbeddedRunSnapshot, + type EmbeddedPiQueueHandle, + type EmbeddedRunModelSwitchRequest, +} from "./run-state.js"; function setActiveRunSessionKey(sessionKey: string | undefined, sessionId: string): void { const normalizedSessionKey = sessionKey?.trim(); @@ -216,16 +176,6 @@ export function resolveActiveEmbeddedRunSessionId(sessionKey: string): string | ); } -export function getActiveEmbeddedRunCount(): number { - let activeCount = ACTIVE_EMBEDDED_RUNS.size; - for (const sessionId of listActiveReplyRunSessionIds()) { - if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) { - activeCount += 1; - } - } - return Math.max(activeCount, getActiveReplyRunCount()); -} - export function getActiveEmbeddedRunSnapshot( sessionId: string, ): ActiveEmbeddedRunSnapshot | undefined { diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index 1126dad61d0..e308eca6c7e 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -1,6 +1,6 @@ import { resetModelCatalogCache } from "../agents/model-catalog.js"; import { disposeAllSessionMcpRuntimes } from "../agents/pi-bundle-mcp-tools.js"; -import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js"; +import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js"; import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js"; import type { CliDeps } from "../cli/deps.types.js"; import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js"; diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index f5ada858852..a4190d53460 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -1,4 +1,4 @@ -import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js"; +import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js"; import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js"; import type { CanvasHostServer } from "../canvas-host/server.js"; import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; diff --git a/src/gateway/server.reload.test.ts b/src/gateway/server.reload.test.ts index 561a01f8bd6..05f68017290 100644 --- a/src/gateway/server.reload.test.ts +++ b/src/gateway/server.reload.test.ts @@ -216,6 +216,16 @@ vi.mock("../agents/pi-embedded-runner/runs.js", async () => { }; }); +vi.mock("../agents/pi-embedded-runner/run-state.js", async () => { + const actual = await vi.importActual( + "../agents/pi-embedded-runner/run-state.js", + ); + return { + ...actual, + getActiveEmbeddedRunCount: () => hoisted.activeEmbeddedRunCount.value, + }; +}); + vi.mock("../auto-reply/reply/dispatcher-registry.js", async () => { const actual = await vi.importActual( "../auto-reply/reply/dispatcher-registry.js",