diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index ef390114ef0..baca779f231 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -11,12 +11,33 @@ import { resolveMergedAssistantText, shouldSuppressAssistantEventForLiveChat, } from "./live-chat-projector.js"; +import type { + ChatRunState, + SessionEventSubscriberRegistry, + ToolEventRecipientRegistry, +} from "./server-chat-state.js"; import { loadGatewaySessionRow } from "./server-chat.load-gateway-session-row.runtime.js"; import { persistGatewaySessionLifecycleEvent } from "./server-chat.persist-session-lifecycle.runtime.js"; import { deriveGatewaySessionLifecycleSnapshot } from "./session-lifecycle-state.js"; import { loadSessionEntry } from "./session-utils.js"; import { formatForLog } from "./ws-log.js"; +export { + createChatRunRegistry, + createChatRunState, + createSessionEventSubscriberRegistry, + createSessionMessageSubscriberRegistry, + createToolEventRecipientRegistry, +} from "./server-chat-state.js"; +export type { + ChatRunEntry, + ChatRunRegistry, + ChatRunState, + SessionEventSubscriberRegistry, + SessionMessageSubscriberRegistry, + ToolEventRecipientRegistry, +} from "./server-chat-state.js"; + function resolveHeartbeatAckMaxChars(): number { try { const cfg = getRuntimeConfig(); @@ -84,307 +105,12 @@ function normalizeHeartbeatChatFinalText(params: { return { suppress: false, text: stripped.text }; } -export type ChatRunEntry = { - sessionKey: string; - clientRunId: string; -}; - -export type ChatRunRegistry = { - add: (sessionId: string, entry: ChatRunEntry) => void; - peek: (sessionId: string) => ChatRunEntry | undefined; - shift: (sessionId: string) => ChatRunEntry | undefined; - remove: (sessionId: string, clientRunId: string, sessionKey?: string) => ChatRunEntry | undefined; - clear: () => void; -}; - -export function createChatRunRegistry(): ChatRunRegistry { - const chatRunSessions = new Map(); - - const add = (sessionId: string, entry: ChatRunEntry) => { - const queue = chatRunSessions.get(sessionId); - if (queue) { - queue.push(entry); - } else { - chatRunSessions.set(sessionId, [entry]); - } - }; - - const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0]; - - const shift = (sessionId: string) => { - const queue = chatRunSessions.get(sessionId); - if (!queue || queue.length === 0) { - return undefined; - } - const entry = queue.shift(); - if (!queue.length) { - chatRunSessions.delete(sessionId); - } - return entry; - }; - - const remove = (sessionId: string, clientRunId: string, sessionKey?: string) => { - const queue = chatRunSessions.get(sessionId); - if (!queue || queue.length === 0) { - return undefined; - } - const idx = queue.findIndex( - (entry) => - entry.clientRunId === clientRunId && (sessionKey ? entry.sessionKey === sessionKey : true), - ); - if (idx < 0) { - return undefined; - } - const [entry] = queue.splice(idx, 1); - if (!queue.length) { - chatRunSessions.delete(sessionId); - } - return entry; - }; - - const clear = () => { - chatRunSessions.clear(); - }; - - return { add, peek, shift, remove, clear }; -} - -export type ChatRunState = { - registry: ChatRunRegistry; - rawBuffers: Map; - buffers: Map; - deltaSentAt: Map; - /** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */ - deltaLastBroadcastLen: Map; - abortedRuns: Map; - clear: () => void; -}; - -export function createChatRunState(): ChatRunState { - const registry = createChatRunRegistry(); - const rawBuffers = new Map(); - const buffers = new Map(); - const deltaSentAt = new Map(); - const deltaLastBroadcastLen = new Map(); - const abortedRuns = new Map(); - - const clear = () => { - registry.clear(); - rawBuffers.clear(); - buffers.clear(); - deltaSentAt.clear(); - deltaLastBroadcastLen.clear(); - abortedRuns.clear(); - }; - - return { - registry, - rawBuffers, - buffers, - deltaSentAt, - deltaLastBroadcastLen, - abortedRuns, - clear, - }; -} - -export type ToolEventRecipientRegistry = { - add: (runId: string, connId: string) => void; - get: (runId: string) => ReadonlySet | undefined; - markFinal: (runId: string) => void; -}; - -export type SessionEventSubscriberRegistry = { - subscribe: (connId: string) => void; - unsubscribe: (connId: string) => void; - getAll: () => ReadonlySet; - clear: () => void; -}; - -export type SessionMessageSubscriberRegistry = { - subscribe: (connId: string, sessionKey: string) => void; - unsubscribe: (connId: string, sessionKey: string) => void; - unsubscribeAll: (connId: string) => void; - get: (sessionKey: string) => ReadonlySet; - clear: () => void; -}; - -type ToolRecipientEntry = { - connIds: Set; - updatedAt: number; - finalizedAt?: number; -}; - -const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000; -const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000; /** * Keep this aligned with the agent.wait lifecycle-error grace so chat surfaces * do not finalize a run before fallback or retry reuses the same runId. */ const AGENT_LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000; -export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry { - const connIds = new Set(); - const empty = new Set(); - - return { - subscribe: (connId: string) => { - const normalized = connId.trim(); - if (!normalized) { - return; - } - connIds.add(normalized); - }, - unsubscribe: (connId: string) => { - const normalized = connId.trim(); - if (!normalized) { - return; - } - connIds.delete(normalized); - }, - getAll: () => (connIds.size > 0 ? connIds : empty), - clear: () => { - connIds.clear(); - }, - }; -} - -export function createSessionMessageSubscriberRegistry(): SessionMessageSubscriberRegistry { - const sessionToConnIds = new Map>(); - const connToSessionKeys = new Map>(); - const empty = new Set(); - - const normalize = (value: string): string => value.trim(); - - return { - subscribe: (connId: string, sessionKey: string) => { - const normalizedConnId = normalize(connId); - const normalizedSessionKey = normalize(sessionKey); - if (!normalizedConnId || !normalizedSessionKey) { - return; - } - const connIds = sessionToConnIds.get(normalizedSessionKey) ?? new Set(); - connIds.add(normalizedConnId); - sessionToConnIds.set(normalizedSessionKey, connIds); - - const sessionKeys = connToSessionKeys.get(normalizedConnId) ?? new Set(); - sessionKeys.add(normalizedSessionKey); - connToSessionKeys.set(normalizedConnId, sessionKeys); - }, - unsubscribe: (connId: string, sessionKey: string) => { - const normalizedConnId = normalize(connId); - const normalizedSessionKey = normalize(sessionKey); - if (!normalizedConnId || !normalizedSessionKey) { - return; - } - const connIds = sessionToConnIds.get(normalizedSessionKey); - if (connIds) { - connIds.delete(normalizedConnId); - if (connIds.size === 0) { - sessionToConnIds.delete(normalizedSessionKey); - } - } - const sessionKeys = connToSessionKeys.get(normalizedConnId); - if (sessionKeys) { - sessionKeys.delete(normalizedSessionKey); - if (sessionKeys.size === 0) { - connToSessionKeys.delete(normalizedConnId); - } - } - }, - unsubscribeAll: (connId: string) => { - const normalizedConnId = normalize(connId); - if (!normalizedConnId) { - return; - } - const sessionKeys = connToSessionKeys.get(normalizedConnId); - if (!sessionKeys) { - return; - } - for (const sessionKey of sessionKeys) { - const connIds = sessionToConnIds.get(sessionKey); - if (!connIds) { - continue; - } - connIds.delete(normalizedConnId); - if (connIds.size === 0) { - sessionToConnIds.delete(sessionKey); - } - } - connToSessionKeys.delete(normalizedConnId); - }, - get: (sessionKey: string) => { - const normalizedSessionKey = normalize(sessionKey); - if (!normalizedSessionKey) { - return empty; - } - return sessionToConnIds.get(normalizedSessionKey) ?? empty; - }, - clear: () => { - sessionToConnIds.clear(); - connToSessionKeys.clear(); - }, - }; -} - -export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry { - const recipients = new Map(); - - const prune = () => { - if (recipients.size === 0) { - return; - } - const now = Date.now(); - for (const [runId, entry] of recipients) { - const cutoff = entry.finalizedAt - ? entry.finalizedAt + TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS - : entry.updatedAt + TOOL_EVENT_RECIPIENT_TTL_MS; - if (now >= cutoff) { - recipients.delete(runId); - } - } - }; - - const add = (runId: string, connId: string) => { - if (!runId || !connId) { - return; - } - const now = Date.now(); - const existing = recipients.get(runId); - if (existing) { - existing.connIds.add(connId); - existing.updatedAt = now; - } else { - recipients.set(runId, { - connIds: new Set([connId]), - updatedAt: now, - }); - } - prune(); - }; - - const get = (runId: string) => { - const entry = recipients.get(runId); - if (!entry) { - return undefined; - } - entry.updatedAt = Date.now(); - prune(); - return entry.connIds; - }; - - const markFinal = (runId: string) => { - const entry = recipients.get(runId); - if (!entry) { - return; - } - entry.finalizedAt = Date.now(); - prune(); - }; - - return { add, get, markFinal }; -} - export type ChatEventBroadcast = ( event: string, payload: unknown,