mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 13:11:40 +00:00
* Gateway: harden node event trust boundaries * Gateway: preserve trusted summary prefixes * Gateway: prefix multiline channel summaries
163 lines
4.4 KiB
TypeScript
163 lines
4.4 KiB
TypeScript
// Lightweight in-memory queue for human-readable system events that should be
|
|
// prefixed to the next prompt. We intentionally avoid persistence to keep
|
|
// events ephemeral. Events are session-scoped and require an explicit key.
|
|
|
|
import { resolveGlobalMap } from "../shared/global-singleton.js";
|
|
import {
|
|
mergeDeliveryContext,
|
|
normalizeDeliveryContext,
|
|
type DeliveryContext,
|
|
} from "../utils/delivery-context.js";
|
|
|
|
export type SystemEvent = {
|
|
text: string;
|
|
ts: number;
|
|
contextKey?: string | null;
|
|
deliveryContext?: DeliveryContext;
|
|
trusted?: boolean;
|
|
};
|
|
|
|
const MAX_EVENTS = 20;
|
|
|
|
type SessionQueue = {
|
|
queue: SystemEvent[];
|
|
lastText: string | null;
|
|
lastContextKey: string | null;
|
|
};
|
|
|
|
const SYSTEM_EVENT_QUEUES_KEY = Symbol.for("openclaw.systemEvents.queues");
|
|
|
|
const queues = resolveGlobalMap<string, SessionQueue>(SYSTEM_EVENT_QUEUES_KEY);
|
|
|
|
type SystemEventOptions = {
|
|
sessionKey: string;
|
|
contextKey?: string | null;
|
|
deliveryContext?: DeliveryContext;
|
|
trusted?: boolean;
|
|
};
|
|
|
|
function requireSessionKey(key?: string | null): string {
|
|
const trimmed = typeof key === "string" ? key.trim() : "";
|
|
if (!trimmed) {
|
|
throw new Error("system events require a sessionKey");
|
|
}
|
|
return trimmed;
|
|
}
|
|
|
|
function normalizeContextKey(key?: string | null): string | null {
|
|
if (!key) {
|
|
return null;
|
|
}
|
|
const trimmed = key.trim();
|
|
if (!trimmed) {
|
|
return null;
|
|
}
|
|
return trimmed.toLowerCase();
|
|
}
|
|
|
|
function getSessionQueue(sessionKey: string): SessionQueue | undefined {
|
|
return queues.get(requireSessionKey(sessionKey));
|
|
}
|
|
|
|
function getOrCreateSessionQueue(sessionKey: string): SessionQueue {
|
|
const key = requireSessionKey(sessionKey);
|
|
const existing = queues.get(key);
|
|
if (existing) {
|
|
return existing;
|
|
}
|
|
const created: SessionQueue = {
|
|
queue: [],
|
|
lastText: null,
|
|
lastContextKey: null,
|
|
};
|
|
queues.set(key, created);
|
|
return created;
|
|
}
|
|
|
|
function cloneSystemEvent(event: SystemEvent): SystemEvent {
|
|
return {
|
|
...event,
|
|
...(event.deliveryContext ? { deliveryContext: { ...event.deliveryContext } } : {}),
|
|
};
|
|
}
|
|
|
|
export function isSystemEventContextChanged(
|
|
sessionKey: string,
|
|
contextKey?: string | null,
|
|
): boolean {
|
|
const existing = getSessionQueue(sessionKey);
|
|
const normalized = normalizeContextKey(contextKey);
|
|
return normalized !== (existing?.lastContextKey ?? null);
|
|
}
|
|
|
|
export function enqueueSystemEvent(text: string, options: SystemEventOptions) {
|
|
const key = requireSessionKey(options?.sessionKey);
|
|
const entry = getOrCreateSessionQueue(key);
|
|
const cleaned = text.trim();
|
|
if (!cleaned) {
|
|
return false;
|
|
}
|
|
const normalizedContextKey = normalizeContextKey(options?.contextKey);
|
|
const normalizedDeliveryContext = normalizeDeliveryContext(options?.deliveryContext);
|
|
entry.lastContextKey = normalizedContextKey;
|
|
if (entry.lastText === cleaned) {
|
|
return false;
|
|
} // skip consecutive duplicates
|
|
entry.lastText = cleaned;
|
|
entry.queue.push({
|
|
text: cleaned,
|
|
ts: Date.now(),
|
|
contextKey: normalizedContextKey,
|
|
deliveryContext: normalizedDeliveryContext,
|
|
trusted: options.trusted !== false,
|
|
});
|
|
if (entry.queue.length > MAX_EVENTS) {
|
|
entry.queue.shift();
|
|
}
|
|
return true;
|
|
}
|
|
|
|
export function drainSystemEventEntries(sessionKey: string): SystemEvent[] {
|
|
const key = requireSessionKey(sessionKey);
|
|
const entry = getSessionQueue(key);
|
|
if (!entry || entry.queue.length === 0) {
|
|
return [];
|
|
}
|
|
const out = entry.queue.map(cloneSystemEvent);
|
|
entry.queue.length = 0;
|
|
entry.lastText = null;
|
|
entry.lastContextKey = null;
|
|
queues.delete(key);
|
|
return out;
|
|
}
|
|
|
|
export function drainSystemEvents(sessionKey: string): string[] {
|
|
return drainSystemEventEntries(sessionKey).map((event) => event.text);
|
|
}
|
|
|
|
export function peekSystemEventEntries(sessionKey: string): SystemEvent[] {
|
|
return getSessionQueue(sessionKey)?.queue.map(cloneSystemEvent) ?? [];
|
|
}
|
|
|
|
export function peekSystemEvents(sessionKey: string): string[] {
|
|
return peekSystemEventEntries(sessionKey).map((event) => event.text);
|
|
}
|
|
|
|
export function hasSystemEvents(sessionKey: string) {
|
|
return (getSessionQueue(sessionKey)?.queue.length ?? 0) > 0;
|
|
}
|
|
|
|
export function resolveSystemEventDeliveryContext(
|
|
events: readonly SystemEvent[],
|
|
): DeliveryContext | undefined {
|
|
let resolved: DeliveryContext | undefined;
|
|
for (const event of events) {
|
|
resolved = mergeDeliveryContext(event.deliveryContext, resolved);
|
|
}
|
|
return resolved;
|
|
}
|
|
|
|
export function resetSystemEventsForTest() {
|
|
queues.clear();
|
|
}
|