fix(gateway): defer chat event imports

This commit is contained in:
Vincent Koc
2026-04-26 23:06:44 -07:00
parent 6175309c01
commit 8440f67935
5 changed files with 366 additions and 43 deletions

View File

@@ -37,6 +37,7 @@ Docs: https://docs.openclaw.ai
- Gateway/startup: keep CLI outbound channel send dependencies as lazy request-time senders so Gateway boot no longer imports channel plugin registration just to construct default deps. Thanks @vincentkoc.
- Gateway/startup: split lightweight HTTP auth helpers away from model-override helpers so Gateway bind no longer imports model catalog selection while wiring base HTTP routes. Thanks @vincentkoc.
- Gateway/startup: lazy-load plugin HTTP route dispatch when active plugin routes exist so no-plugin Gateway boot skips plugin route runtime scope setup. Thanks @vincentkoc.
- Gateway/startup: move chat run/subscriber registries onto a lightweight state module and defer chat/session event projection until the first event so Gateway boot skips session IO imports. Thanks @vincentkoc.
- CLI/Gateway: use a parse-only config snapshot for plain `gateway status` reads and reuse same-path service config context so status no longer spends tens of seconds in full config validation before printing. Thanks @vincentkoc.
- Lobster/Gateway: memoize repeated Ajv schema compilation before loading the embedded Lobster runtime so scheduled workflows and `llm.invoke` loops stop growing gateway heap on content-identical schemas. Fixes #71148. Thanks @cmi525, @vsolaz, and @vincentkoc.
- Codex harness: normalize cached input tokens before session/context accounting so prompt cache reads are not double-counted in `/status`, `session_status`, or persisted `sessionEntry.totalTokens`. Fixes #69298. Thanks @richardmqq.

View File

@@ -0,0 +1,295 @@
export type ChatRunEntry = {
sessionKey: string;
clientRunId: string;
};
export type ChatRunRegistry = {
add: (sessionId: string, entry: ChatRunEntry) => void;
peek: (sessionId: string) => ChatRunEntry | undefined;
shift: (sessionId: string) => ChatRunEntry | undefined;
remove: (sessionId: string, clientRunId: string, sessionKey?: string) => ChatRunEntry | undefined;
clear: () => void;
};
export function createChatRunRegistry(): ChatRunRegistry {
const chatRunSessions = new Map<string, ChatRunEntry[]>();
const add = (sessionId: string, entry: ChatRunEntry) => {
const queue = chatRunSessions.get(sessionId);
if (queue) {
queue.push(entry);
} else {
chatRunSessions.set(sessionId, [entry]);
}
};
const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0];
const shift = (sessionId: string) => {
const queue = chatRunSessions.get(sessionId);
if (!queue || queue.length === 0) {
return undefined;
}
const entry = queue.shift();
if (!queue.length) {
chatRunSessions.delete(sessionId);
}
return entry;
};
const remove = (sessionId: string, clientRunId: string, sessionKey?: string) => {
const queue = chatRunSessions.get(sessionId);
if (!queue || queue.length === 0) {
return undefined;
}
const idx = queue.findIndex(
(entry) =>
entry.clientRunId === clientRunId && (sessionKey ? entry.sessionKey === sessionKey : true),
);
if (idx < 0) {
return undefined;
}
const [entry] = queue.splice(idx, 1);
if (!queue.length) {
chatRunSessions.delete(sessionId);
}
return entry;
};
const clear = () => {
chatRunSessions.clear();
};
return { add, peek, shift, remove, clear };
}
export type ChatRunState = {
registry: ChatRunRegistry;
rawBuffers: Map<string, string>;
buffers: Map<string, string>;
deltaSentAt: Map<string, number>;
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
deltaLastBroadcastLen: Map<string, number>;
abortedRuns: Map<string, number>;
clear: () => void;
};
export function createChatRunState(): ChatRunState {
const registry = createChatRunRegistry();
const rawBuffers = new Map<string, string>();
const buffers = new Map<string, string>();
const deltaSentAt = new Map<string, number>();
const deltaLastBroadcastLen = new Map<string, number>();
const abortedRuns = new Map<string, number>();
const clear = () => {
registry.clear();
rawBuffers.clear();
buffers.clear();
deltaSentAt.clear();
deltaLastBroadcastLen.clear();
abortedRuns.clear();
};
return {
registry,
rawBuffers,
buffers,
deltaSentAt,
deltaLastBroadcastLen,
abortedRuns,
clear,
};
}
export type ToolEventRecipientRegistry = {
add: (runId: string, connId: string) => void;
get: (runId: string) => ReadonlySet<string> | undefined;
markFinal: (runId: string) => void;
};
export type SessionEventSubscriberRegistry = {
subscribe: (connId: string) => void;
unsubscribe: (connId: string) => void;
getAll: () => ReadonlySet<string>;
clear: () => void;
};
export type SessionMessageSubscriberRegistry = {
subscribe: (connId: string, sessionKey: string) => void;
unsubscribe: (connId: string, sessionKey: string) => void;
unsubscribeAll: (connId: string) => void;
get: (sessionKey: string) => ReadonlySet<string>;
clear: () => void;
};
type ToolRecipientEntry = {
connIds: Set<string>;
updatedAt: number;
finalizedAt?: number;
};
const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000;
const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000;
export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry {
const connIds = new Set<string>();
const empty = new Set<string>();
return {
subscribe: (connId: string) => {
const normalized = connId.trim();
if (!normalized) {
return;
}
connIds.add(normalized);
},
unsubscribe: (connId: string) => {
const normalized = connId.trim();
if (!normalized) {
return;
}
connIds.delete(normalized);
},
getAll: () => (connIds.size > 0 ? connIds : empty),
clear: () => {
connIds.clear();
},
};
}
export function createSessionMessageSubscriberRegistry(): SessionMessageSubscriberRegistry {
const sessionToConnIds = new Map<string, Set<string>>();
const connToSessionKeys = new Map<string, Set<string>>();
const empty = new Set<string>();
const normalize = (value: string): string => value.trim();
return {
subscribe: (connId: string, sessionKey: string) => {
const normalizedConnId = normalize(connId);
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedConnId || !normalizedSessionKey) {
return;
}
const connIds = sessionToConnIds.get(normalizedSessionKey) ?? new Set<string>();
connIds.add(normalizedConnId);
sessionToConnIds.set(normalizedSessionKey, connIds);
const sessionKeys = connToSessionKeys.get(normalizedConnId) ?? new Set<string>();
sessionKeys.add(normalizedSessionKey);
connToSessionKeys.set(normalizedConnId, sessionKeys);
},
unsubscribe: (connId: string, sessionKey: string) => {
const normalizedConnId = normalize(connId);
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedConnId || !normalizedSessionKey) {
return;
}
const connIds = sessionToConnIds.get(normalizedSessionKey);
if (connIds) {
connIds.delete(normalizedConnId);
if (connIds.size === 0) {
sessionToConnIds.delete(normalizedSessionKey);
}
}
const sessionKeys = connToSessionKeys.get(normalizedConnId);
if (sessionKeys) {
sessionKeys.delete(normalizedSessionKey);
if (sessionKeys.size === 0) {
connToSessionKeys.delete(normalizedConnId);
}
}
},
unsubscribeAll: (connId: string) => {
const normalizedConnId = normalize(connId);
if (!normalizedConnId) {
return;
}
const sessionKeys = connToSessionKeys.get(normalizedConnId);
if (!sessionKeys) {
return;
}
for (const sessionKey of sessionKeys) {
const connIds = sessionToConnIds.get(sessionKey);
if (!connIds) {
continue;
}
connIds.delete(normalizedConnId);
if (connIds.size === 0) {
sessionToConnIds.delete(sessionKey);
}
}
connToSessionKeys.delete(normalizedConnId);
},
get: (sessionKey: string) => {
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedSessionKey) {
return empty;
}
return sessionToConnIds.get(normalizedSessionKey) ?? empty;
},
clear: () => {
sessionToConnIds.clear();
connToSessionKeys.clear();
},
};
}
export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry {
const recipients = new Map<string, ToolRecipientEntry>();
const prune = () => {
if (recipients.size === 0) {
return;
}
const now = Date.now();
for (const [runId, entry] of recipients) {
const cutoff = entry.finalizedAt
? entry.finalizedAt + TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS
: entry.updatedAt + TOOL_EVENT_RECIPIENT_TTL_MS;
if (now >= cutoff) {
recipients.delete(runId);
}
}
};
const add = (runId: string, connId: string) => {
if (!runId || !connId) {
return;
}
const now = Date.now();
const existing = recipients.get(runId);
if (existing) {
existing.connIds.add(connId);
existing.updatedAt = now;
} else {
recipients.set(runId, {
connIds: new Set([connId]),
updatedAt: now,
});
}
prune();
};
const get = (runId: string) => {
const entry = recipients.get(runId);
if (!entry) {
return undefined;
}
entry.updatedAt = Date.now();
prune();
return entry.connIds;
};
const markFinal = (runId: string) => {
const entry = recipients.get(runId);
if (!entry) {
return;
}
entry.finalizedAt = Date.now();
prune();
};
return { add, get, markFinal };
}

View File

@@ -2,7 +2,7 @@ import { NodeRegistry } from "./node-registry.js";
import {
createSessionEventSubscriberRegistry,
createSessionMessageSubscriberRegistry,
} from "./server-chat.js";
} from "./server-chat-state.js";
import { safeParseJson } from "./server-methods/nodes.helpers.js";
import { hasConnectedMobileNode } from "./server-mobile-nodes.js";
import { createNodeSubscriptionManager } from "./server-node-subscriptions.js";

View File

@@ -26,7 +26,7 @@ import {
type ChatRunEntry,
createChatRunState,
createToolEventRecipientRegistry,
} from "./server-chat.js";
} from "./server-chat-state.js";
import { MAX_PREAUTH_PAYLOAD_BYTES } from "./server-constants.js";
import {
attachGatewayUpgradeHandler,

View File

@@ -3,17 +3,12 @@ import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import type { ChatAbortControllerEntry } from "./chat-abort.js";
import {
createAgentEventHandler,
type ChatRunState,
type SessionEventSubscriberRegistry,
type SessionMessageSubscriberRegistry,
type ToolEventRecipientRegistry,
} from "./server-chat.js";
import {
createLifecycleEventBroadcastHandler,
createTranscriptUpdateBroadcastHandler,
} from "./server-session-events.js";
import type {
ChatRunState,
SessionEventSubscriberRegistry,
SessionMessageSubscriberRegistry,
ToolEventRecipientRegistry,
} from "./server-chat-state.js";
export function startGatewayEventSubscriptions(params: {
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
@@ -33,42 +28,74 @@ export function startGatewayEventSubscriptions(params: {
sessionMessageSubscribers: SessionMessageSubscriberRegistry;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
}) {
const agentUnsub = onAgentEvent(
createAgentEventHandler({
broadcast: params.broadcast,
broadcastToConnIds: params.broadcastToConnIds,
nodeSendToSession: params.nodeSendToSession,
agentRunSeq: params.agentRunSeq,
chatRunState: params.chatRunState,
resolveSessionKeyForRun: params.resolveSessionKeyForRun,
clearAgentRunContext: params.clearAgentRunContext,
toolEventRecipients: params.toolEventRecipients,
sessionEventSubscribers: params.sessionEventSubscribers,
isChatSendRunActive: (runId) => {
const entry = params.chatAbortControllers.get(runId);
return entry !== undefined && entry.kind !== "agent";
},
}),
);
let agentEventHandlerPromise: Promise<
ReturnType<typeof import("./server-chat.js").createAgentEventHandler>
> | null = null;
const getAgentEventHandler = () => {
agentEventHandlerPromise ??= import("./server-chat.js").then(({ createAgentEventHandler }) =>
createAgentEventHandler({
broadcast: params.broadcast,
broadcastToConnIds: params.broadcastToConnIds,
nodeSendToSession: params.nodeSendToSession,
agentRunSeq: params.agentRunSeq,
chatRunState: params.chatRunState,
resolveSessionKeyForRun: params.resolveSessionKeyForRun,
clearAgentRunContext: params.clearAgentRunContext,
toolEventRecipients: params.toolEventRecipients,
sessionEventSubscribers: params.sessionEventSubscribers,
isChatSendRunActive: (runId) => {
const entry = params.chatAbortControllers.get(runId);
return entry !== undefined && entry.kind !== "agent";
},
}),
);
return agentEventHandlerPromise;
};
let transcriptUpdateHandlerPromise: Promise<
ReturnType<typeof import("./server-session-events.js").createTranscriptUpdateBroadcastHandler>
> | null = null;
const getTranscriptUpdateHandler = () => {
transcriptUpdateHandlerPromise ??= import("./server-session-events.js").then(
({ createTranscriptUpdateBroadcastHandler }) =>
createTranscriptUpdateBroadcastHandler({
broadcastToConnIds: params.broadcastToConnIds,
sessionEventSubscribers: params.sessionEventSubscribers,
sessionMessageSubscribers: params.sessionMessageSubscribers,
}),
);
return transcriptUpdateHandlerPromise;
};
let lifecycleEventHandlerPromise: Promise<
ReturnType<typeof import("./server-session-events.js").createLifecycleEventBroadcastHandler>
> | null = null;
const getLifecycleEventHandler = () => {
lifecycleEventHandlerPromise ??= import("./server-session-events.js").then(
({ createLifecycleEventBroadcastHandler }) =>
createLifecycleEventBroadcastHandler({
broadcastToConnIds: params.broadcastToConnIds,
sessionEventSubscribers: params.sessionEventSubscribers,
}),
);
return lifecycleEventHandlerPromise;
};
const agentUnsub = onAgentEvent((evt) => {
void getAgentEventHandler().then((handler) => handler(evt));
});
const heartbeatUnsub = onHeartbeatEvent((evt) => {
params.broadcast("heartbeat", evt, { dropIfSlow: true });
});
const transcriptUnsub = onSessionTranscriptUpdate(
createTranscriptUpdateBroadcastHandler({
broadcastToConnIds: params.broadcastToConnIds,
sessionEventSubscribers: params.sessionEventSubscribers,
sessionMessageSubscribers: params.sessionMessageSubscribers,
}),
);
const transcriptUnsub = onSessionTranscriptUpdate((evt) => {
void getTranscriptUpdateHandler().then((handler) => handler(evt));
});
const lifecycleUnsub = onSessionLifecycleEvent(
createLifecycleEventBroadcastHandler({
broadcastToConnIds: params.broadcastToConnIds,
sessionEventSubscribers: params.sessionEventSubscribers,
}),
);
const lifecycleUnsub = onSessionLifecycleEvent((evt) => {
void getLifecycleEventHandler().then((handler) => handler(evt));
});
return {
agentUnsub,