mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:40:43 +00:00
refactor(gateway): reuse chat state registries
This commit is contained in:
@@ -11,12 +11,33 @@ import {
|
||||
resolveMergedAssistantText,
|
||||
shouldSuppressAssistantEventForLiveChat,
|
||||
} from "./live-chat-projector.js";
|
||||
import type {
|
||||
ChatRunState,
|
||||
SessionEventSubscriberRegistry,
|
||||
ToolEventRecipientRegistry,
|
||||
} from "./server-chat-state.js";
|
||||
import { loadGatewaySessionRow } from "./server-chat.load-gateway-session-row.runtime.js";
|
||||
import { persistGatewaySessionLifecycleEvent } from "./server-chat.persist-session-lifecycle.runtime.js";
|
||||
import { deriveGatewaySessionLifecycleSnapshot } from "./session-lifecycle-state.js";
|
||||
import { loadSessionEntry } from "./session-utils.js";
|
||||
import { formatForLog } from "./ws-log.js";
|
||||
|
||||
export {
|
||||
createChatRunRegistry,
|
||||
createChatRunState,
|
||||
createSessionEventSubscriberRegistry,
|
||||
createSessionMessageSubscriberRegistry,
|
||||
createToolEventRecipientRegistry,
|
||||
} from "./server-chat-state.js";
|
||||
export type {
|
||||
ChatRunEntry,
|
||||
ChatRunRegistry,
|
||||
ChatRunState,
|
||||
SessionEventSubscriberRegistry,
|
||||
SessionMessageSubscriberRegistry,
|
||||
ToolEventRecipientRegistry,
|
||||
} from "./server-chat-state.js";
|
||||
|
||||
function resolveHeartbeatAckMaxChars(): number {
|
||||
try {
|
||||
const cfg = getRuntimeConfig();
|
||||
@@ -84,307 +105,12 @@ function normalizeHeartbeatChatFinalText(params: {
|
||||
return { suppress: false, text: stripped.text };
|
||||
}
|
||||
|
||||
export type ChatRunEntry = {
|
||||
sessionKey: string;
|
||||
clientRunId: string;
|
||||
};
|
||||
|
||||
export type ChatRunRegistry = {
|
||||
add: (sessionId: string, entry: ChatRunEntry) => void;
|
||||
peek: (sessionId: string) => ChatRunEntry | undefined;
|
||||
shift: (sessionId: string) => ChatRunEntry | undefined;
|
||||
remove: (sessionId: string, clientRunId: string, sessionKey?: string) => ChatRunEntry | undefined;
|
||||
clear: () => void;
|
||||
};
|
||||
|
||||
export function createChatRunRegistry(): ChatRunRegistry {
|
||||
const chatRunSessions = new Map<string, ChatRunEntry[]>();
|
||||
|
||||
const add = (sessionId: string, entry: ChatRunEntry) => {
|
||||
const queue = chatRunSessions.get(sessionId);
|
||||
if (queue) {
|
||||
queue.push(entry);
|
||||
} else {
|
||||
chatRunSessions.set(sessionId, [entry]);
|
||||
}
|
||||
};
|
||||
|
||||
const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0];
|
||||
|
||||
const shift = (sessionId: string) => {
|
||||
const queue = chatRunSessions.get(sessionId);
|
||||
if (!queue || queue.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
const entry = queue.shift();
|
||||
if (!queue.length) {
|
||||
chatRunSessions.delete(sessionId);
|
||||
}
|
||||
return entry;
|
||||
};
|
||||
|
||||
const remove = (sessionId: string, clientRunId: string, sessionKey?: string) => {
|
||||
const queue = chatRunSessions.get(sessionId);
|
||||
if (!queue || queue.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
const idx = queue.findIndex(
|
||||
(entry) =>
|
||||
entry.clientRunId === clientRunId && (sessionKey ? entry.sessionKey === sessionKey : true),
|
||||
);
|
||||
if (idx < 0) {
|
||||
return undefined;
|
||||
}
|
||||
const [entry] = queue.splice(idx, 1);
|
||||
if (!queue.length) {
|
||||
chatRunSessions.delete(sessionId);
|
||||
}
|
||||
return entry;
|
||||
};
|
||||
|
||||
const clear = () => {
|
||||
chatRunSessions.clear();
|
||||
};
|
||||
|
||||
return { add, peek, shift, remove, clear };
|
||||
}
|
||||
|
||||
export type ChatRunState = {
|
||||
registry: ChatRunRegistry;
|
||||
rawBuffers: Map<string, string>;
|
||||
buffers: Map<string, string>;
|
||||
deltaSentAt: Map<string, number>;
|
||||
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
|
||||
deltaLastBroadcastLen: Map<string, number>;
|
||||
abortedRuns: Map<string, number>;
|
||||
clear: () => void;
|
||||
};
|
||||
|
||||
export function createChatRunState(): ChatRunState {
|
||||
const registry = createChatRunRegistry();
|
||||
const rawBuffers = new Map<string, string>();
|
||||
const buffers = new Map<string, string>();
|
||||
const deltaSentAt = new Map<string, number>();
|
||||
const deltaLastBroadcastLen = new Map<string, number>();
|
||||
const abortedRuns = new Map<string, number>();
|
||||
|
||||
const clear = () => {
|
||||
registry.clear();
|
||||
rawBuffers.clear();
|
||||
buffers.clear();
|
||||
deltaSentAt.clear();
|
||||
deltaLastBroadcastLen.clear();
|
||||
abortedRuns.clear();
|
||||
};
|
||||
|
||||
return {
|
||||
registry,
|
||||
rawBuffers,
|
||||
buffers,
|
||||
deltaSentAt,
|
||||
deltaLastBroadcastLen,
|
||||
abortedRuns,
|
||||
clear,
|
||||
};
|
||||
}
|
||||
|
||||
export type ToolEventRecipientRegistry = {
|
||||
add: (runId: string, connId: string) => void;
|
||||
get: (runId: string) => ReadonlySet<string> | undefined;
|
||||
markFinal: (runId: string) => void;
|
||||
};
|
||||
|
||||
export type SessionEventSubscriberRegistry = {
|
||||
subscribe: (connId: string) => void;
|
||||
unsubscribe: (connId: string) => void;
|
||||
getAll: () => ReadonlySet<string>;
|
||||
clear: () => void;
|
||||
};
|
||||
|
||||
export type SessionMessageSubscriberRegistry = {
|
||||
subscribe: (connId: string, sessionKey: string) => void;
|
||||
unsubscribe: (connId: string, sessionKey: string) => void;
|
||||
unsubscribeAll: (connId: string) => void;
|
||||
get: (sessionKey: string) => ReadonlySet<string>;
|
||||
clear: () => void;
|
||||
};
|
||||
|
||||
type ToolRecipientEntry = {
|
||||
connIds: Set<string>;
|
||||
updatedAt: number;
|
||||
finalizedAt?: number;
|
||||
};
|
||||
|
||||
const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000;
|
||||
const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000;
|
||||
/**
|
||||
* Keep this aligned with the agent.wait lifecycle-error grace so chat surfaces
|
||||
* do not finalize a run before fallback or retry reuses the same runId.
|
||||
*/
|
||||
const AGENT_LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
|
||||
|
||||
export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry {
|
||||
const connIds = new Set<string>();
|
||||
const empty = new Set<string>();
|
||||
|
||||
return {
|
||||
subscribe: (connId: string) => {
|
||||
const normalized = connId.trim();
|
||||
if (!normalized) {
|
||||
return;
|
||||
}
|
||||
connIds.add(normalized);
|
||||
},
|
||||
unsubscribe: (connId: string) => {
|
||||
const normalized = connId.trim();
|
||||
if (!normalized) {
|
||||
return;
|
||||
}
|
||||
connIds.delete(normalized);
|
||||
},
|
||||
getAll: () => (connIds.size > 0 ? connIds : empty),
|
||||
clear: () => {
|
||||
connIds.clear();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function createSessionMessageSubscriberRegistry(): SessionMessageSubscriberRegistry {
|
||||
const sessionToConnIds = new Map<string, Set<string>>();
|
||||
const connToSessionKeys = new Map<string, Set<string>>();
|
||||
const empty = new Set<string>();
|
||||
|
||||
const normalize = (value: string): string => value.trim();
|
||||
|
||||
return {
|
||||
subscribe: (connId: string, sessionKey: string) => {
|
||||
const normalizedConnId = normalize(connId);
|
||||
const normalizedSessionKey = normalize(sessionKey);
|
||||
if (!normalizedConnId || !normalizedSessionKey) {
|
||||
return;
|
||||
}
|
||||
const connIds = sessionToConnIds.get(normalizedSessionKey) ?? new Set<string>();
|
||||
connIds.add(normalizedConnId);
|
||||
sessionToConnIds.set(normalizedSessionKey, connIds);
|
||||
|
||||
const sessionKeys = connToSessionKeys.get(normalizedConnId) ?? new Set<string>();
|
||||
sessionKeys.add(normalizedSessionKey);
|
||||
connToSessionKeys.set(normalizedConnId, sessionKeys);
|
||||
},
|
||||
unsubscribe: (connId: string, sessionKey: string) => {
|
||||
const normalizedConnId = normalize(connId);
|
||||
const normalizedSessionKey = normalize(sessionKey);
|
||||
if (!normalizedConnId || !normalizedSessionKey) {
|
||||
return;
|
||||
}
|
||||
const connIds = sessionToConnIds.get(normalizedSessionKey);
|
||||
if (connIds) {
|
||||
connIds.delete(normalizedConnId);
|
||||
if (connIds.size === 0) {
|
||||
sessionToConnIds.delete(normalizedSessionKey);
|
||||
}
|
||||
}
|
||||
const sessionKeys = connToSessionKeys.get(normalizedConnId);
|
||||
if (sessionKeys) {
|
||||
sessionKeys.delete(normalizedSessionKey);
|
||||
if (sessionKeys.size === 0) {
|
||||
connToSessionKeys.delete(normalizedConnId);
|
||||
}
|
||||
}
|
||||
},
|
||||
unsubscribeAll: (connId: string) => {
|
||||
const normalizedConnId = normalize(connId);
|
||||
if (!normalizedConnId) {
|
||||
return;
|
||||
}
|
||||
const sessionKeys = connToSessionKeys.get(normalizedConnId);
|
||||
if (!sessionKeys) {
|
||||
return;
|
||||
}
|
||||
for (const sessionKey of sessionKeys) {
|
||||
const connIds = sessionToConnIds.get(sessionKey);
|
||||
if (!connIds) {
|
||||
continue;
|
||||
}
|
||||
connIds.delete(normalizedConnId);
|
||||
if (connIds.size === 0) {
|
||||
sessionToConnIds.delete(sessionKey);
|
||||
}
|
||||
}
|
||||
connToSessionKeys.delete(normalizedConnId);
|
||||
},
|
||||
get: (sessionKey: string) => {
|
||||
const normalizedSessionKey = normalize(sessionKey);
|
||||
if (!normalizedSessionKey) {
|
||||
return empty;
|
||||
}
|
||||
return sessionToConnIds.get(normalizedSessionKey) ?? empty;
|
||||
},
|
||||
clear: () => {
|
||||
sessionToConnIds.clear();
|
||||
connToSessionKeys.clear();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry {
|
||||
const recipients = new Map<string, ToolRecipientEntry>();
|
||||
|
||||
const prune = () => {
|
||||
if (recipients.size === 0) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
for (const [runId, entry] of recipients) {
|
||||
const cutoff = entry.finalizedAt
|
||||
? entry.finalizedAt + TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS
|
||||
: entry.updatedAt + TOOL_EVENT_RECIPIENT_TTL_MS;
|
||||
if (now >= cutoff) {
|
||||
recipients.delete(runId);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const add = (runId: string, connId: string) => {
|
||||
if (!runId || !connId) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
const existing = recipients.get(runId);
|
||||
if (existing) {
|
||||
existing.connIds.add(connId);
|
||||
existing.updatedAt = now;
|
||||
} else {
|
||||
recipients.set(runId, {
|
||||
connIds: new Set([connId]),
|
||||
updatedAt: now,
|
||||
});
|
||||
}
|
||||
prune();
|
||||
};
|
||||
|
||||
const get = (runId: string) => {
|
||||
const entry = recipients.get(runId);
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
entry.updatedAt = Date.now();
|
||||
prune();
|
||||
return entry.connIds;
|
||||
};
|
||||
|
||||
const markFinal = (runId: string) => {
|
||||
const entry = recipients.get(runId);
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
entry.finalizedAt = Date.now();
|
||||
prune();
|
||||
};
|
||||
|
||||
return { add, get, markFinal };
|
||||
}
|
||||
|
||||
export type ChatEventBroadcast = (
|
||||
event: string,
|
||||
payload: unknown,
|
||||
|
||||
Reference in New Issue
Block a user