From 8440f67935610ee54ce3862a9ffe6d3da83a0670 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 26 Apr 2026 23:06:44 -0700 Subject: [PATCH] fix(gateway): defer chat event imports --- CHANGELOG.md | 1 + src/gateway/server-chat-state.ts | 295 ++++++++++++++++++++ src/gateway/server-node-session-runtime.ts | 2 +- src/gateway/server-runtime-state.ts | 2 +- src/gateway/server-runtime-subscriptions.ts | 109 +++++--- 5 files changed, 366 insertions(+), 43 deletions(-) create mode 100644 src/gateway/server-chat-state.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 2889b597383..421fe064c50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ Docs: https://docs.openclaw.ai - Gateway/startup: keep CLI outbound channel send dependencies as lazy request-time senders so Gateway boot no longer imports channel plugin registration just to construct default deps. Thanks @vincentkoc. - Gateway/startup: split lightweight HTTP auth helpers away from model-override helpers so Gateway bind no longer imports model catalog selection while wiring base HTTP routes. Thanks @vincentkoc. - Gateway/startup: lazy-load plugin HTTP route dispatch when active plugin routes exist so no-plugin Gateway boot skips plugin route runtime scope setup. Thanks @vincentkoc. +- Gateway/startup: move chat run/subscriber registries onto a lightweight state module and defer chat/session event projection until the first event so Gateway boot skips session IO imports. Thanks @vincentkoc. - CLI/Gateway: use a parse-only config snapshot for plain `gateway status` reads and reuse same-path service config context so status no longer spends tens of seconds in full config validation before printing. Thanks @vincentkoc. - Lobster/Gateway: memoize repeated Ajv schema compilation before loading the embedded Lobster runtime so scheduled workflows and `llm.invoke` loops stop growing gateway heap on content-identical schemas. Fixes #71148. Thanks @cmi525, @vsolaz, and @vincentkoc. - Codex harness: normalize cached input tokens before session/context accounting so prompt cache reads are not double-counted in `/status`, `session_status`, or persisted `sessionEntry.totalTokens`. Fixes #69298. Thanks @richardmqq. diff --git a/src/gateway/server-chat-state.ts b/src/gateway/server-chat-state.ts new file mode 100644 index 00000000000..51c719abcb4 --- /dev/null +++ b/src/gateway/server-chat-state.ts @@ -0,0 +1,295 @@ +export type ChatRunEntry = { + sessionKey: string; + clientRunId: string; +}; + +export type ChatRunRegistry = { + add: (sessionId: string, entry: ChatRunEntry) => void; + peek: (sessionId: string) => ChatRunEntry | undefined; + shift: (sessionId: string) => ChatRunEntry | undefined; + remove: (sessionId: string, clientRunId: string, sessionKey?: string) => ChatRunEntry | undefined; + clear: () => void; +}; + +export function createChatRunRegistry(): ChatRunRegistry { + const chatRunSessions = new Map(); + + const add = (sessionId: string, entry: ChatRunEntry) => { + const queue = chatRunSessions.get(sessionId); + if (queue) { + queue.push(entry); + } else { + chatRunSessions.set(sessionId, [entry]); + } + }; + + const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0]; + + const shift = (sessionId: string) => { + const queue = chatRunSessions.get(sessionId); + if (!queue || queue.length === 0) { + return undefined; + } + const entry = queue.shift(); + if (!queue.length) { + chatRunSessions.delete(sessionId); + } + return entry; + }; + + const remove = (sessionId: string, clientRunId: string, sessionKey?: string) => { + const queue = chatRunSessions.get(sessionId); + if (!queue || queue.length === 0) { + return undefined; + } + const idx = queue.findIndex( + (entry) => + entry.clientRunId === clientRunId && (sessionKey ? entry.sessionKey === sessionKey : true), + ); + if (idx < 0) { + return undefined; + } + const [entry] = queue.splice(idx, 1); + if (!queue.length) { + chatRunSessions.delete(sessionId); + } + return entry; + }; + + const clear = () => { + chatRunSessions.clear(); + }; + + return { add, peek, shift, remove, clear }; +} + +export type ChatRunState = { + registry: ChatRunRegistry; + rawBuffers: Map; + buffers: Map; + deltaSentAt: Map; + /** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */ + deltaLastBroadcastLen: Map; + abortedRuns: Map; + clear: () => void; +}; + +export function createChatRunState(): ChatRunState { + const registry = createChatRunRegistry(); + const rawBuffers = new Map(); + const buffers = new Map(); + const deltaSentAt = new Map(); + const deltaLastBroadcastLen = new Map(); + const abortedRuns = new Map(); + + const clear = () => { + registry.clear(); + rawBuffers.clear(); + buffers.clear(); + deltaSentAt.clear(); + deltaLastBroadcastLen.clear(); + abortedRuns.clear(); + }; + + return { + registry, + rawBuffers, + buffers, + deltaSentAt, + deltaLastBroadcastLen, + abortedRuns, + clear, + }; +} + +export type ToolEventRecipientRegistry = { + add: (runId: string, connId: string) => void; + get: (runId: string) => ReadonlySet | undefined; + markFinal: (runId: string) => void; +}; + +export type SessionEventSubscriberRegistry = { + subscribe: (connId: string) => void; + unsubscribe: (connId: string) => void; + getAll: () => ReadonlySet; + clear: () => void; +}; + +export type SessionMessageSubscriberRegistry = { + subscribe: (connId: string, sessionKey: string) => void; + unsubscribe: (connId: string, sessionKey: string) => void; + unsubscribeAll: (connId: string) => void; + get: (sessionKey: string) => ReadonlySet; + clear: () => void; +}; + +type ToolRecipientEntry = { + connIds: Set; + updatedAt: number; + finalizedAt?: number; +}; + +const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000; +const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000; + +export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry { + const connIds = new Set(); + const empty = new Set(); + + return { + subscribe: (connId: string) => { + const normalized = connId.trim(); + if (!normalized) { + return; + } + connIds.add(normalized); + }, + unsubscribe: (connId: string) => { + const normalized = connId.trim(); + if (!normalized) { + return; + } + connIds.delete(normalized); + }, + getAll: () => (connIds.size > 0 ? connIds : empty), + clear: () => { + connIds.clear(); + }, + }; +} + +export function createSessionMessageSubscriberRegistry(): SessionMessageSubscriberRegistry { + const sessionToConnIds = new Map>(); + const connToSessionKeys = new Map>(); + const empty = new Set(); + + const normalize = (value: string): string => value.trim(); + + return { + subscribe: (connId: string, sessionKey: string) => { + const normalizedConnId = normalize(connId); + const normalizedSessionKey = normalize(sessionKey); + if (!normalizedConnId || !normalizedSessionKey) { + return; + } + const connIds = sessionToConnIds.get(normalizedSessionKey) ?? new Set(); + connIds.add(normalizedConnId); + sessionToConnIds.set(normalizedSessionKey, connIds); + + const sessionKeys = connToSessionKeys.get(normalizedConnId) ?? new Set(); + sessionKeys.add(normalizedSessionKey); + connToSessionKeys.set(normalizedConnId, sessionKeys); + }, + unsubscribe: (connId: string, sessionKey: string) => { + const normalizedConnId = normalize(connId); + const normalizedSessionKey = normalize(sessionKey); + if (!normalizedConnId || !normalizedSessionKey) { + return; + } + const connIds = sessionToConnIds.get(normalizedSessionKey); + if (connIds) { + connIds.delete(normalizedConnId); + if (connIds.size === 0) { + sessionToConnIds.delete(normalizedSessionKey); + } + } + const sessionKeys = connToSessionKeys.get(normalizedConnId); + if (sessionKeys) { + sessionKeys.delete(normalizedSessionKey); + if (sessionKeys.size === 0) { + connToSessionKeys.delete(normalizedConnId); + } + } + }, + unsubscribeAll: (connId: string) => { + const normalizedConnId = normalize(connId); + if (!normalizedConnId) { + return; + } + const sessionKeys = connToSessionKeys.get(normalizedConnId); + if (!sessionKeys) { + return; + } + for (const sessionKey of sessionKeys) { + const connIds = sessionToConnIds.get(sessionKey); + if (!connIds) { + continue; + } + connIds.delete(normalizedConnId); + if (connIds.size === 0) { + sessionToConnIds.delete(sessionKey); + } + } + connToSessionKeys.delete(normalizedConnId); + }, + get: (sessionKey: string) => { + const normalizedSessionKey = normalize(sessionKey); + if (!normalizedSessionKey) { + return empty; + } + return sessionToConnIds.get(normalizedSessionKey) ?? empty; + }, + clear: () => { + sessionToConnIds.clear(); + connToSessionKeys.clear(); + }, + }; +} + +export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry { + const recipients = new Map(); + + const prune = () => { + if (recipients.size === 0) { + return; + } + const now = Date.now(); + for (const [runId, entry] of recipients) { + const cutoff = entry.finalizedAt + ? entry.finalizedAt + TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS + : entry.updatedAt + TOOL_EVENT_RECIPIENT_TTL_MS; + if (now >= cutoff) { + recipients.delete(runId); + } + } + }; + + const add = (runId: string, connId: string) => { + if (!runId || !connId) { + return; + } + const now = Date.now(); + const existing = recipients.get(runId); + if (existing) { + existing.connIds.add(connId); + existing.updatedAt = now; + } else { + recipients.set(runId, { + connIds: new Set([connId]), + updatedAt: now, + }); + } + prune(); + }; + + const get = (runId: string) => { + const entry = recipients.get(runId); + if (!entry) { + return undefined; + } + entry.updatedAt = Date.now(); + prune(); + return entry.connIds; + }; + + const markFinal = (runId: string) => { + const entry = recipients.get(runId); + if (!entry) { + return; + } + entry.finalizedAt = Date.now(); + prune(); + }; + + return { add, get, markFinal }; +} diff --git a/src/gateway/server-node-session-runtime.ts b/src/gateway/server-node-session-runtime.ts index ef1b55a8b2e..2b88ff5812b 100644 --- a/src/gateway/server-node-session-runtime.ts +++ b/src/gateway/server-node-session-runtime.ts @@ -2,7 +2,7 @@ import { NodeRegistry } from "./node-registry.js"; import { createSessionEventSubscriberRegistry, createSessionMessageSubscriberRegistry, -} from "./server-chat.js"; +} from "./server-chat-state.js"; import { safeParseJson } from "./server-methods/nodes.helpers.js"; import { hasConnectedMobileNode } from "./server-mobile-nodes.js"; import { createNodeSubscriptionManager } from "./server-node-subscriptions.js"; diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index ccb359332de..563c19f8672 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -26,7 +26,7 @@ import { type ChatRunEntry, createChatRunState, createToolEventRecipientRegistry, -} from "./server-chat.js"; +} from "./server-chat-state.js"; import { MAX_PREAUTH_PAYLOAD_BYTES } from "./server-constants.js"; import { attachGatewayUpgradeHandler, diff --git a/src/gateway/server-runtime-subscriptions.ts b/src/gateway/server-runtime-subscriptions.ts index 59335c13dcb..eaeb74d0b42 100644 --- a/src/gateway/server-runtime-subscriptions.ts +++ b/src/gateway/server-runtime-subscriptions.ts @@ -3,17 +3,12 @@ import { onHeartbeatEvent } from "../infra/heartbeat-events.js"; import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import type { ChatAbortControllerEntry } from "./chat-abort.js"; -import { - createAgentEventHandler, - type ChatRunState, - type SessionEventSubscriberRegistry, - type SessionMessageSubscriberRegistry, - type ToolEventRecipientRegistry, -} from "./server-chat.js"; -import { - createLifecycleEventBroadcastHandler, - createTranscriptUpdateBroadcastHandler, -} from "./server-session-events.js"; +import type { + ChatRunState, + SessionEventSubscriberRegistry, + SessionMessageSubscriberRegistry, + ToolEventRecipientRegistry, +} from "./server-chat-state.js"; export function startGatewayEventSubscriptions(params: { broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void; @@ -33,42 +28,74 @@ export function startGatewayEventSubscriptions(params: { sessionMessageSubscribers: SessionMessageSubscriberRegistry; chatAbortControllers: Map; }) { - const agentUnsub = onAgentEvent( - createAgentEventHandler({ - broadcast: params.broadcast, - broadcastToConnIds: params.broadcastToConnIds, - nodeSendToSession: params.nodeSendToSession, - agentRunSeq: params.agentRunSeq, - chatRunState: params.chatRunState, - resolveSessionKeyForRun: params.resolveSessionKeyForRun, - clearAgentRunContext: params.clearAgentRunContext, - toolEventRecipients: params.toolEventRecipients, - sessionEventSubscribers: params.sessionEventSubscribers, - isChatSendRunActive: (runId) => { - const entry = params.chatAbortControllers.get(runId); - return entry !== undefined && entry.kind !== "agent"; - }, - }), - ); + let agentEventHandlerPromise: Promise< + ReturnType + > | null = null; + const getAgentEventHandler = () => { + agentEventHandlerPromise ??= import("./server-chat.js").then(({ createAgentEventHandler }) => + createAgentEventHandler({ + broadcast: params.broadcast, + broadcastToConnIds: params.broadcastToConnIds, + nodeSendToSession: params.nodeSendToSession, + agentRunSeq: params.agentRunSeq, + chatRunState: params.chatRunState, + resolveSessionKeyForRun: params.resolveSessionKeyForRun, + clearAgentRunContext: params.clearAgentRunContext, + toolEventRecipients: params.toolEventRecipients, + sessionEventSubscribers: params.sessionEventSubscribers, + isChatSendRunActive: (runId) => { + const entry = params.chatAbortControllers.get(runId); + return entry !== undefined && entry.kind !== "agent"; + }, + }), + ); + return agentEventHandlerPromise; + }; + + let transcriptUpdateHandlerPromise: Promise< + ReturnType + > | null = null; + const getTranscriptUpdateHandler = () => { + transcriptUpdateHandlerPromise ??= import("./server-session-events.js").then( + ({ createTranscriptUpdateBroadcastHandler }) => + createTranscriptUpdateBroadcastHandler({ + broadcastToConnIds: params.broadcastToConnIds, + sessionEventSubscribers: params.sessionEventSubscribers, + sessionMessageSubscribers: params.sessionMessageSubscribers, + }), + ); + return transcriptUpdateHandlerPromise; + }; + + let lifecycleEventHandlerPromise: Promise< + ReturnType + > | null = null; + const getLifecycleEventHandler = () => { + lifecycleEventHandlerPromise ??= import("./server-session-events.js").then( + ({ createLifecycleEventBroadcastHandler }) => + createLifecycleEventBroadcastHandler({ + broadcastToConnIds: params.broadcastToConnIds, + sessionEventSubscribers: params.sessionEventSubscribers, + }), + ); + return lifecycleEventHandlerPromise; + }; + + const agentUnsub = onAgentEvent((evt) => { + void getAgentEventHandler().then((handler) => handler(evt)); + }); const heartbeatUnsub = onHeartbeatEvent((evt) => { params.broadcast("heartbeat", evt, { dropIfSlow: true }); }); - const transcriptUnsub = onSessionTranscriptUpdate( - createTranscriptUpdateBroadcastHandler({ - broadcastToConnIds: params.broadcastToConnIds, - sessionEventSubscribers: params.sessionEventSubscribers, - sessionMessageSubscribers: params.sessionMessageSubscribers, - }), - ); + const transcriptUnsub = onSessionTranscriptUpdate((evt) => { + void getTranscriptUpdateHandler().then((handler) => handler(evt)); + }); - const lifecycleUnsub = onSessionLifecycleEvent( - createLifecycleEventBroadcastHandler({ - broadcastToConnIds: params.broadcastToConnIds, - sessionEventSubscribers: params.sessionEventSubscribers, - }), - ); + const lifecycleUnsub = onSessionLifecycleEvent((evt) => { + void getLifecycleEventHandler().then((handler) => handler(evt)); + }); return { agentUnsub,