import type { OpenClawConfig } from "../config/config.js"; import type { InboundDebounceByProvider } from "../config/types.messages.js"; const resolveMs = (value: unknown): number | undefined => { if (typeof value !== "number" || !Number.isFinite(value)) { return undefined; } return Math.max(0, Math.trunc(value)); }; const resolveChannelOverride = (params: { byChannel?: InboundDebounceByProvider; channel: string; }): number | undefined => { if (!params.byChannel) { return undefined; } return resolveMs(params.byChannel[params.channel]); }; export function resolveInboundDebounceMs(params: { cfg: OpenClawConfig; channel: string; overrideMs?: number; }): number { const inbound = params.cfg.messages?.inbound; const override = resolveMs(params.overrideMs); const byChannel = resolveChannelOverride({ byChannel: inbound?.byChannel, channel: params.channel, }); const base = resolveMs(inbound?.debounceMs); return override ?? byChannel ?? base ?? 0; } type DebounceBuffer = { items: T[]; timeout: ReturnType | null; debounceMs: number; releaseReady: () => void; readyReleased: boolean; task: Promise; }; export type InboundDebounceCreateParams = { debounceMs: number; buildKey: (item: T) => string | null | undefined; shouldDebounce?: (item: T) => boolean; resolveDebounceMs?: (item: T) => number | undefined; onFlush: (items: T[]) => Promise; onError?: (err: unknown, items: T[]) => void; }; export function createInboundDebouncer(params: InboundDebounceCreateParams) { const buffers = new Map>(); const keyChains = new Map>(); const defaultDebounceMs = Math.max(0, Math.trunc(params.debounceMs)); const resolveDebounceMs = (item: T) => { const resolved = params.resolveDebounceMs?.(item); if (typeof resolved !== "number" || !Number.isFinite(resolved)) { return defaultDebounceMs; } return Math.max(0, Math.trunc(resolved)); }; const runFlush = async (items: T[]) => { try { await params.onFlush(items); } catch (err) { params.onError?.(err, items); } }; const enqueueKeyTask = (key: string, task: () => Promise) => { const previous = keyChains.get(key) ?? Promise.resolve(); const next = previous.catch(() => undefined).then(task); const settled = next.catch(() => undefined); keyChains.set(key, settled); void settled.finally(() => { if (keyChains.get(key) === settled) { keyChains.delete(key); } }); return next; }; const enqueueReservedKeyTask = (key: string, task: () => Promise) => { let readyReleased = false; let releaseReady!: () => void; const ready = new Promise((resolve) => { releaseReady = resolve; }); return { task: enqueueKeyTask(key, async () => { await ready; await task(); }), release: () => { if (readyReleased) { return; } readyReleased = true; releaseReady(); }, }; }; const releaseBuffer = (buffer: DebounceBuffer) => { if (buffer.readyReleased) { return; } buffer.readyReleased = true; buffer.releaseReady(); }; const flushBuffer = async (key: string, buffer: DebounceBuffer) => { if (buffers.get(key) === buffer) { buffers.delete(key); } if (buffer.timeout) { clearTimeout(buffer.timeout); buffer.timeout = null; } // Reserve each key's execution slot as soon as the first buffered item // arrives, so later same-key work cannot overtake a timer-backed flush. releaseBuffer(buffer); await buffer.task; }; const flushKey = async (key: string) => { const buffer = buffers.get(key); if (!buffer) { return; } await flushBuffer(key, buffer); }; const scheduleFlush = (key: string, buffer: DebounceBuffer) => { if (buffer.timeout) { clearTimeout(buffer.timeout); } buffer.timeout = setTimeout(async () => { await flushBuffer(key, buffer); }, buffer.debounceMs); buffer.timeout.unref?.(); }; const enqueue = async (item: T) => { const key = params.buildKey(item); const debounceMs = resolveDebounceMs(item); const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true); if (!canDebounce || !key) { if (key) { if (!buffers.has(key)) { await enqueueKeyTask(key, async () => { await runFlush([item]); }); return; } // Reserve the keyed immediate slot before forcing the pending buffer // to flush so fire-and-forget callers cannot be overtaken. const reservedTask = enqueueReservedKeyTask(key, async () => { await runFlush([item]); }); try { await flushKey(key); } finally { reservedTask.release(); } await reservedTask.task; } else { await runFlush([item]); } return; } const existing = buffers.get(key); if (existing) { existing.items.push(item); existing.debounceMs = debounceMs; scheduleFlush(key, existing); return; } let buffer!: DebounceBuffer; const reservedTask = enqueueReservedKeyTask(key, async () => { if (buffer.items.length === 0) { return; } await runFlush(buffer.items); }); buffer = { items: [item], timeout: null, debounceMs, releaseReady: reservedTask.release, readyReleased: false, task: reservedTask.task, }; buffers.set(key, buffer); scheduleFlush(key, buffer); }; return { enqueue, flushKey }; }