mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-01 05:50:20 +00:00
Telegram: coalesce forwarded text+media bursts into one inbound turn (#19476)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 09e0b4e9bd
Co-authored-by: napetrov <18015221+napetrov@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
@@ -36,17 +36,27 @@ export function resolveInboundDebounceMs(params: {
|
||||
type DebounceBuffer<T> = {
|
||||
items: T[];
|
||||
timeout: ReturnType<typeof setTimeout> | null;
|
||||
debounceMs: number;
|
||||
};
|
||||
|
||||
export function createInboundDebouncer<T>(params: {
|
||||
debounceMs: number;
|
||||
buildKey: (item: T) => string | null | undefined;
|
||||
shouldDebounce?: (item: T) => boolean;
|
||||
resolveDebounceMs?: (item: T) => number | undefined;
|
||||
onFlush: (items: T[]) => Promise<void>;
|
||||
onError?: (err: unknown, items: T[]) => void;
|
||||
}) {
|
||||
const buffers = new Map<string, DebounceBuffer<T>>();
|
||||
const debounceMs = Math.max(0, Math.trunc(params.debounceMs));
|
||||
const defaultDebounceMs = Math.max(0, Math.trunc(params.debounceMs));
|
||||
|
||||
const resolveDebounceMs = (item: T) => {
|
||||
const resolved = params.resolveDebounceMs?.(item);
|
||||
if (typeof resolved !== "number" || !Number.isFinite(resolved)) {
|
||||
return defaultDebounceMs;
|
||||
}
|
||||
return Math.max(0, Math.trunc(resolved));
|
||||
};
|
||||
|
||||
const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
|
||||
buffers.delete(key);
|
||||
@@ -78,12 +88,13 @@ export function createInboundDebouncer<T>(params: {
|
||||
}
|
||||
buffer.timeout = setTimeout(() => {
|
||||
void flushBuffer(key, buffer);
|
||||
}, debounceMs);
|
||||
}, buffer.debounceMs);
|
||||
buffer.timeout.unref?.();
|
||||
};
|
||||
|
||||
const enqueue = async (item: T) => {
|
||||
const key = params.buildKey(item);
|
||||
const debounceMs = resolveDebounceMs(item);
|
||||
const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true);
|
||||
|
||||
if (!canDebounce || !key) {
|
||||
@@ -97,11 +108,12 @@ export function createInboundDebouncer<T>(params: {
|
||||
const existing = buffers.get(key);
|
||||
if (existing) {
|
||||
existing.items.push(item);
|
||||
existing.debounceMs = debounceMs;
|
||||
scheduleFlush(key, existing);
|
||||
return;
|
||||
}
|
||||
|
||||
const buffer: DebounceBuffer<T> = { items: [item], timeout: null };
|
||||
const buffer: DebounceBuffer<T> = { items: [item], timeout: null, debounceMs };
|
||||
buffers.set(key, buffer);
|
||||
scheduleFlush(key, buffer);
|
||||
};
|
||||
|
||||
@@ -256,6 +256,29 @@ describe("createInboundDebouncer", () => {
|
||||
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("supports per-item debounce windows when default debounce is disabled", async () => {
|
||||
vi.useFakeTimers();
|
||||
const calls: Array<string[]> = [];
|
||||
|
||||
const debouncer = createInboundDebouncer<{ key: string; id: string; windowMs: number }>({
|
||||
debounceMs: 0,
|
||||
buildKey: (item) => item.key,
|
||||
resolveDebounceMs: (item) => item.windowMs,
|
||||
onFlush: async (items) => {
|
||||
calls.push(items.map((entry) => entry.id));
|
||||
},
|
||||
});
|
||||
|
||||
await debouncer.enqueue({ key: "forward", id: "1", windowMs: 30 });
|
||||
await debouncer.enqueue({ key: "forward", id: "2", windowMs: 30 });
|
||||
|
||||
expect(calls).toEqual([]);
|
||||
await vi.advanceTimersByTimeAsync(30);
|
||||
expect(calls).toEqual([["1", "2"]]);
|
||||
|
||||
vi.useRealTimers();
|
||||
});
|
||||
});
|
||||
|
||||
describe("initSessionState BodyStripped", () => {
|
||||
|
||||
Reference in New Issue
Block a user