From 0669abdda5504c54a67281f7ab2dc299ce749f58 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Mon, 9 Mar 2026 03:58:31 -0400 Subject: [PATCH] Matrix: remove orphaned send queue --- .../matrix/src/matrix/send-queue.test.ts | 154 ------------------ extensions/matrix/src/matrix/send-queue.ts | 28 ---- 2 files changed, 182 deletions(-) delete mode 100644 extensions/matrix/src/matrix/send-queue.test.ts delete mode 100644 extensions/matrix/src/matrix/send-queue.ts diff --git a/extensions/matrix/src/matrix/send-queue.test.ts b/extensions/matrix/src/matrix/send-queue.test.ts deleted file mode 100644 index aa4765eaab3..00000000000 --- a/extensions/matrix/src/matrix/send-queue.test.ts +++ /dev/null @@ -1,154 +0,0 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { DEFAULT_SEND_GAP_MS, enqueueSend } from "./send-queue.js"; - -function deferred() { - let resolve!: (value: T | PromiseLike) => void; - let reject!: (reason?: unknown) => void; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { promise, resolve, reject }; -} - -describe("enqueueSend", () => { - beforeEach(() => { - vi.useFakeTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - }); - - it("serializes sends per room", async () => { - const gate = deferred(); - const events: string[] = []; - - const first = enqueueSend("!room:example.org", async () => { - events.push("start1"); - await gate.promise; - events.push("end1"); - return "one"; - }); - const second = enqueueSend("!room:example.org", async () => { - events.push("start2"); - events.push("end2"); - return "two"; - }); - - await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); - expect(events).toEqual(["start1"]); - - await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS * 2); - expect(events).toEqual(["start1"]); - - gate.resolve(); - await first; - await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS - 1); - expect(events).toEqual(["start1", "end1"]); - await vi.advanceTimersByTimeAsync(1); - await second; - expect(events).toEqual(["start1", "end1", "start2", "end2"]); - }); - - it("does not serialize across different rooms", async () => { - const events: string[] = []; - - const a = enqueueSend("!a:example.org", async () => { - events.push("a"); - return "a"; - }); - const b = enqueueSend("!b:example.org", async () => { - events.push("b"); - return "b"; - }); - - await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); - await Promise.all([a, b]); - expect(events.sort()).toEqual(["a", "b"]); - }); - - it("continues queue after failures", async () => { - const first = enqueueSend("!room:example.org", async () => { - throw new Error("boom"); - }).then( - () => ({ ok: true as const }), - (error) => ({ ok: false as const, error }), - ); - - await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); - const firstResult = await first; - expect(firstResult.ok).toBe(false); - if (firstResult.ok) { - throw new Error("expected first queue item to fail"); - } - expect(firstResult.error).toBeInstanceOf(Error); - expect(firstResult.error.message).toBe("boom"); - - const second = enqueueSend("!room:example.org", async () => "ok"); - await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); - await expect(second).resolves.toBe("ok"); - }); - - it("continues queued work when the head task fails", async () => { - const gate = deferred(); - const events: string[] = []; - - const first = enqueueSend("!room:example.org", async () => { - events.push("start1"); - await gate.promise; - throw new Error("boom"); - }).then( - () => ({ ok: true as const }), - (error) => ({ ok: false as const, error }), - ); - const second = enqueueSend("!room:example.org", async () => { - events.push("start2"); - return "two"; - }); - - await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); - expect(events).toEqual(["start1"]); - - gate.resolve(); - const firstResult = await first; - expect(firstResult.ok).toBe(false); - if (firstResult.ok) { - throw new Error("expected head queue item to fail"); - } - expect(firstResult.error).toBeInstanceOf(Error); - - await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); - await expect(second).resolves.toBe("two"); - expect(events).toEqual(["start1", "start2"]); - }); - - it("supports custom gap and delay injection", async () => { - const events: string[] = []; - const delayFn = vi.fn(async (_ms: number) => {}); - - const first = enqueueSend( - "!room:example.org", - async () => { - events.push("first"); - return "one"; - }, - { gapMs: 7, delayFn }, - ); - const second = enqueueSend( - "!room:example.org", - async () => { - events.push("second"); - return "two"; - }, - { gapMs: 7, delayFn }, - ); - - await expect(first).resolves.toBe("one"); - await expect(second).resolves.toBe("two"); - expect(events).toEqual(["first", "second"]); - expect(delayFn).toHaveBeenCalledTimes(2); - expect(delayFn).toHaveBeenNthCalledWith(1, 7); - expect(delayFn).toHaveBeenNthCalledWith(2, 7); - }); -}); diff --git a/extensions/matrix/src/matrix/send-queue.ts b/extensions/matrix/src/matrix/send-queue.ts deleted file mode 100644 index 4bad4878f90..00000000000 --- a/extensions/matrix/src/matrix/send-queue.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue"; - -export const DEFAULT_SEND_GAP_MS = 150; - -type MatrixSendQueueOptions = { - gapMs?: number; - delayFn?: (ms: number) => Promise; -}; - -// Serialize sends per room to preserve Matrix delivery order. -const roomQueues = new KeyedAsyncQueue(); - -export function enqueueSend( - roomId: string, - fn: () => Promise, - options?: MatrixSendQueueOptions, -): Promise { - const gapMs = options?.gapMs ?? DEFAULT_SEND_GAP_MS; - const delayFn = options?.delayFn ?? delay; - return roomQueues.enqueue(roomId, async () => { - await delayFn(gapMs); - return await fn(); - }); -} - -function delay(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -}