mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-28 16:33:34 +00:00
test(infra): isolate matrix outbound queue integration
This commit is contained in:
@@ -1,5 +1,12 @@
|
||||
import { describe, expect, it, vi, beforeAll, beforeEach } from "vitest";
|
||||
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { ChannelOutboundAdapter } from "../../channels/plugins/types.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { createEmptyPluginRegistry } from "../../plugins/registry.js";
|
||||
import {
|
||||
releasePinnedPluginChannelRegistry,
|
||||
setActivePluginRegistry,
|
||||
} from "../../plugins/runtime.js";
|
||||
import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js";
|
||||
import { drainPendingDeliveries, type DeliverFn, loadPendingDeliveries } from "./delivery-queue.js";
|
||||
import {
|
||||
createRecoveryLog,
|
||||
@@ -8,6 +15,40 @@ import {
|
||||
|
||||
let deliverOutboundPayloads: typeof import("./deliver.js").deliverOutboundPayloads;
|
||||
|
||||
type MatrixSendFn = (
|
||||
to: string,
|
||||
text: string,
|
||||
options?: Record<string, unknown>,
|
||||
) => Promise<{ messageId: string } & Record<string, unknown>>;
|
||||
|
||||
function resolveMatrixSender(
|
||||
deps: Parameters<NonNullable<ChannelOutboundAdapter["sendText"]>>[0]["deps"],
|
||||
): MatrixSendFn {
|
||||
const sender = deps?.matrix;
|
||||
if (typeof sender !== "function") {
|
||||
throw new Error("missing matrix sender");
|
||||
}
|
||||
return sender as MatrixSendFn;
|
||||
}
|
||||
|
||||
function withMatrixChannel(result: Awaited<ReturnType<MatrixSendFn>>) {
|
||||
return {
|
||||
channel: "matrix" as const,
|
||||
...result,
|
||||
};
|
||||
}
|
||||
|
||||
const matrixOutboundForQueueTest: ChannelOutboundAdapter = {
|
||||
deliveryMode: "direct",
|
||||
sendText: async ({ cfg, to, text, accountId, deps }) =>
|
||||
withMatrixChannel(
|
||||
await resolveMatrixSender(deps)(to, text, {
|
||||
cfg,
|
||||
accountId: accountId ?? undefined,
|
||||
}),
|
||||
),
|
||||
};
|
||||
|
||||
async function drainMatrixReconnect(opts: { deliver: DeliverFn; stateDir: string }): Promise<void> {
|
||||
await drainPendingDeliveries({
|
||||
drainKey: "matrix:reconnect-test",
|
||||
@@ -51,6 +92,20 @@ describe("deliverOutboundPayloads queue integration: mid-batch failure with send
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = fixtures.tmpDir();
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "matrix",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({ id: "matrix", outbound: matrixOutboundForQueueTest }),
|
||||
},
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
releasePinnedPluginChannelRegistry();
|
||||
setActivePluginRegistry(createEmptyPluginRegistry());
|
||||
});
|
||||
|
||||
it("advances queued entry to unknown_after_send when a later payload fails after an earlier one succeeded", async () => {
|
||||
|
||||
Reference in New Issue
Block a user