Files
openclaw/src/gateway/server-chat-state.ts
samzong bb8aa0cfe2 [Fix] Throttle agent event fanout (#80335)
Merged via squash.

Prepared head SHA: 5dddb405ad
Co-authored-by: samzong <13782141+samzong@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
2026-05-13 22:21:46 -07:00

315 lines
8.8 KiB
TypeScript

import type { AgentEventPayload } from "../infra/agent-events.js";
export type ChatRunEntry = {
sessionKey: string;
clientRunId: string;
};
export type BufferedAgentEvent = {
sessionKey?: string;
payload: AgentEventPayload & { spawnedBy?: string };
};
export type ChatRunRegistry = {
add: (sessionId: string, entry: ChatRunEntry) => void;
peek: (sessionId: string) => ChatRunEntry | undefined;
shift: (sessionId: string) => ChatRunEntry | undefined;
remove: (sessionId: string, clientRunId: string, sessionKey?: string) => ChatRunEntry | undefined;
clear: () => void;
};
export function createChatRunRegistry(): ChatRunRegistry {
const chatRunSessions = new Map<string, ChatRunEntry[]>();
const add = (sessionId: string, entry: ChatRunEntry) => {
const queue = chatRunSessions.get(sessionId);
if (queue) {
queue.push(entry);
} else {
chatRunSessions.set(sessionId, [entry]);
}
};
const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0];
const shift = (sessionId: string) => {
const queue = chatRunSessions.get(sessionId);
if (!queue || queue.length === 0) {
return undefined;
}
const entry = queue.shift();
if (!queue.length) {
chatRunSessions.delete(sessionId);
}
return entry;
};
const remove = (sessionId: string, clientRunId: string, sessionKey?: string) => {
const queue = chatRunSessions.get(sessionId);
if (!queue || queue.length === 0) {
return undefined;
}
const idx = queue.findIndex(
(entry) =>
entry.clientRunId === clientRunId && (sessionKey ? entry.sessionKey === sessionKey : true),
);
if (idx < 0) {
return undefined;
}
const [entry] = queue.splice(idx, 1);
if (!queue.length) {
chatRunSessions.delete(sessionId);
}
return entry;
};
const clear = () => {
chatRunSessions.clear();
};
return { add, peek, shift, remove, clear };
}
export type ChatRunState = {
registry: ChatRunRegistry;
rawBuffers: Map<string, string>;
buffers: Map<string, string>;
deltaSentAt: Map<string, number>;
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
deltaLastBroadcastLen: Map<string, number>;
deltaLastBroadcastText: Map<string, string>;
agentDeltaSentAt: Map<string, number>;
bufferedAgentEvents: Map<string, BufferedAgentEvent>;
abortedRuns: Map<string, number>;
clear: () => void;
};
export function createChatRunState(): ChatRunState {
const registry = createChatRunRegistry();
const rawBuffers = new Map<string, string>();
const buffers = new Map<string, string>();
const deltaSentAt = new Map<string, number>();
const deltaLastBroadcastLen = new Map<string, number>();
const deltaLastBroadcastText = new Map<string, string>();
const agentDeltaSentAt = new Map<string, number>();
const bufferedAgentEvents = new Map<string, BufferedAgentEvent>();
const abortedRuns = new Map<string, number>();
const clear = () => {
registry.clear();
rawBuffers.clear();
buffers.clear();
deltaSentAt.clear();
deltaLastBroadcastLen.clear();
deltaLastBroadcastText.clear();
agentDeltaSentAt.clear();
bufferedAgentEvents.clear();
abortedRuns.clear();
};
return {
registry,
rawBuffers,
buffers,
deltaSentAt,
deltaLastBroadcastLen,
deltaLastBroadcastText,
agentDeltaSentAt,
bufferedAgentEvents,
abortedRuns,
clear,
};
}
export type ToolEventRecipientRegistry = {
add: (runId: string, connId: string) => void;
get: (runId: string) => ReadonlySet<string> | undefined;
markFinal: (runId: string) => void;
};
export type SessionEventSubscriberRegistry = {
subscribe: (connId: string) => void;
unsubscribe: (connId: string) => void;
getAll: () => ReadonlySet<string>;
clear: () => void;
};
export type SessionMessageSubscriberRegistry = {
subscribe: (connId: string, sessionKey: string) => void;
unsubscribe: (connId: string, sessionKey: string) => void;
unsubscribeAll: (connId: string) => void;
get: (sessionKey: string) => ReadonlySet<string>;
clear: () => void;
};
type ToolRecipientEntry = {
connIds: Set<string>;
updatedAt: number;
finalizedAt?: number;
};
const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000;
const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000;
export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry {
const connIds = new Set<string>();
const empty = new Set<string>();
return {
subscribe: (connId: string) => {
const normalized = connId.trim();
if (!normalized) {
return;
}
connIds.add(normalized);
},
unsubscribe: (connId: string) => {
const normalized = connId.trim();
if (!normalized) {
return;
}
connIds.delete(normalized);
},
getAll: () => (connIds.size > 0 ? connIds : empty),
clear: () => {
connIds.clear();
},
};
}
export function createSessionMessageSubscriberRegistry(): SessionMessageSubscriberRegistry {
const sessionToConnIds = new Map<string, Set<string>>();
const connToSessionKeys = new Map<string, Set<string>>();
const empty = new Set<string>();
const normalize = (value: string): string => value.trim();
return {
subscribe: (connId: string, sessionKey: string) => {
const normalizedConnId = normalize(connId);
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedConnId || !normalizedSessionKey) {
return;
}
const connIds = sessionToConnIds.get(normalizedSessionKey) ?? new Set<string>();
connIds.add(normalizedConnId);
sessionToConnIds.set(normalizedSessionKey, connIds);
const sessionKeys = connToSessionKeys.get(normalizedConnId) ?? new Set<string>();
sessionKeys.add(normalizedSessionKey);
connToSessionKeys.set(normalizedConnId, sessionKeys);
},
unsubscribe: (connId: string, sessionKey: string) => {
const normalizedConnId = normalize(connId);
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedConnId || !normalizedSessionKey) {
return;
}
const connIds = sessionToConnIds.get(normalizedSessionKey);
if (connIds) {
connIds.delete(normalizedConnId);
if (connIds.size === 0) {
sessionToConnIds.delete(normalizedSessionKey);
}
}
const sessionKeys = connToSessionKeys.get(normalizedConnId);
if (sessionKeys) {
sessionKeys.delete(normalizedSessionKey);
if (sessionKeys.size === 0) {
connToSessionKeys.delete(normalizedConnId);
}
}
},
unsubscribeAll: (connId: string) => {
const normalizedConnId = normalize(connId);
if (!normalizedConnId) {
return;
}
const sessionKeys = connToSessionKeys.get(normalizedConnId);
if (!sessionKeys) {
return;
}
for (const sessionKey of sessionKeys) {
const connIds = sessionToConnIds.get(sessionKey);
if (!connIds) {
continue;
}
connIds.delete(normalizedConnId);
if (connIds.size === 0) {
sessionToConnIds.delete(sessionKey);
}
}
connToSessionKeys.delete(normalizedConnId);
},
get: (sessionKey: string) => {
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedSessionKey) {
return empty;
}
return sessionToConnIds.get(normalizedSessionKey) ?? empty;
},
clear: () => {
sessionToConnIds.clear();
connToSessionKeys.clear();
},
};
}
export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry {
const recipients = new Map<string, ToolRecipientEntry>();
const prune = () => {
if (recipients.size === 0) {
return;
}
const now = Date.now();
for (const [runId, entry] of recipients) {
const cutoff = entry.finalizedAt
? entry.finalizedAt + TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS
: entry.updatedAt + TOOL_EVENT_RECIPIENT_TTL_MS;
if (now >= cutoff) {
recipients.delete(runId);
}
}
};
const add = (runId: string, connId: string) => {
if (!runId || !connId) {
return;
}
const now = Date.now();
const existing = recipients.get(runId);
if (existing) {
existing.connIds.add(connId);
existing.updatedAt = now;
} else {
recipients.set(runId, {
connIds: new Set([connId]),
updatedAt: now,
});
}
prune();
};
const get = (runId: string) => {
const entry = recipients.get(runId);
if (!entry) {
return undefined;
}
entry.updatedAt = Date.now();
prune();
return entry.connIds;
};
const markFinal = (runId: string) => {
const entry = recipients.get(runId);
if (!entry) {
return;
}
entry.finalizedAt = Date.now();
prune();
};
return { add, get, markFinal };
}