From 7cce191b0530a3327fde262ca1069eebd6153468 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Fri, 26 Jun 2026 01:23:08 +0800 Subject: [PATCH] test(infra): isolate matrix outbound queue integration --- .../deliver.queue-integration.test.ts | 57 ++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/src/infra/outbound/deliver.queue-integration.test.ts b/src/infra/outbound/deliver.queue-integration.test.ts index 7d2417c367c..cde71d07f86 100644 --- a/src/infra/outbound/deliver.queue-integration.test.ts +++ b/src/infra/outbound/deliver.queue-integration.test.ts @@ -1,5 +1,12 @@ -import { describe, expect, it, vi, beforeAll, beforeEach } from "vitest"; +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ChannelOutboundAdapter } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { createEmptyPluginRegistry } from "../../plugins/registry.js"; +import { + releasePinnedPluginChannelRegistry, + setActivePluginRegistry, +} from "../../plugins/runtime.js"; +import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js"; import { drainPendingDeliveries, type DeliverFn, loadPendingDeliveries } from "./delivery-queue.js"; import { createRecoveryLog, @@ -8,6 +15,40 @@ import { let deliverOutboundPayloads: typeof import("./deliver.js").deliverOutboundPayloads; +type MatrixSendFn = ( + to: string, + text: string, + options?: Record, +) => Promise<{ messageId: string } & Record>; + +function resolveMatrixSender( + deps: Parameters>[0]["deps"], +): MatrixSendFn { + const sender = deps?.matrix; + if (typeof sender !== "function") { + throw new Error("missing matrix sender"); + } + return sender as MatrixSendFn; +} + +function withMatrixChannel(result: Awaited>) { + return { + channel: "matrix" as const, + ...result, + }; +} + +const matrixOutboundForQueueTest: ChannelOutboundAdapter = { + deliveryMode: "direct", + sendText: async ({ cfg, to, text, accountId, deps }) => + withMatrixChannel( + await resolveMatrixSender(deps)(to, text, { + cfg, + accountId: accountId ?? undefined, + }), + ), +}; + async function drainMatrixReconnect(opts: { deliver: DeliverFn; stateDir: string }): Promise { await drainPendingDeliveries({ drainKey: "matrix:reconnect-test", @@ -51,6 +92,20 @@ describe("deliverOutboundPayloads queue integration: mid-batch failure with send beforeEach(() => { tmpDir = fixtures.tmpDir(); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ id: "matrix", outbound: matrixOutboundForQueueTest }), + }, + ]), + ); + }); + + afterEach(() => { + releasePinnedPluginChannelRegistry(); + setActivePluginRegistry(createEmptyPluginRegistry()); }); it("advances queued entry to unknown_after_send when a later payload fails after an earlier one succeeded", async () => {