refactor(test): share signal receive harness

This commit is contained in:
Peter Steinberger
2026-02-15 15:14:23 +00:00
parent 89dccc79a7
commit e2f73650d4

View File

@@ -25,12 +25,40 @@ const {
waitForTransportReadyMock, waitForTransportReadyMock,
} = getSignalToolResultTestMocks(); } = getSignalToolResultTestMocks();
const SIGNAL_BASE_URL = "http://127.0.0.1:8080";
async function runMonitorWithMocks( async function runMonitorWithMocks(
opts: Parameters<(typeof import("./monitor.js"))["monitorSignalProvider"]>[0], opts: Parameters<(typeof import("./monitor.js"))["monitorSignalProvider"]>[0],
) { ) {
const { monitorSignalProvider } = await import("./monitor.js"); const { monitorSignalProvider } = await import("./monitor.js");
return monitorSignalProvider(opts); return monitorSignalProvider(opts);
} }
async function receiveSignalPayloads(params: {
payloads: unknown[];
opts?: Partial<Parameters<(typeof import("./monitor.js"))["monitorSignalProvider"]>[0]>;
}) {
const abortController = new AbortController();
streamMock.mockImplementation(async ({ onEvent }) => {
for (const payload of params.payloads) {
await onEvent({
event: "receive",
data: JSON.stringify(payload),
});
}
abortController.abort();
});
await runMonitorWithMocks({
autoStart: false,
baseUrl: SIGNAL_BASE_URL,
abortSignal: abortController.signal,
...params.opts,
});
await flush();
}
describe("monitorSignalProvider tool results", () => { describe("monitorSignalProvider tool results", () => {
it("uses bounded readiness checks when auto-starting the daemon", async () => { it("uses bounded readiness checks when auto-starting the daemon", async () => {
const runtime = { const runtime = {
@@ -54,7 +82,7 @@ describe("monitorSignalProvider tool results", () => {
}); });
await runMonitorWithMocks({ await runMonitorWithMocks({
autoStart: true, autoStart: true,
baseUrl: "http://127.0.0.1:8080", baseUrl: SIGNAL_BASE_URL,
abortSignal: abortController.signal, abortSignal: abortController.signal,
runtime, runtime,
}); });
@@ -101,7 +129,7 @@ describe("monitorSignalProvider tool results", () => {
await runMonitorWithMocks({ await runMonitorWithMocks({
autoStart: true, autoStart: true,
baseUrl: "http://127.0.0.1:8080", baseUrl: SIGNAL_BASE_URL,
abortSignal: abortController.signal, abortSignal: abortController.signal,
runtime, runtime,
startupTimeoutMs: 90_000, startupTimeoutMs: 90_000,
@@ -143,7 +171,7 @@ describe("monitorSignalProvider tool results", () => {
await runMonitorWithMocks({ await runMonitorWithMocks({
autoStart: true, autoStart: true,
baseUrl: "http://127.0.0.1:8080", baseUrl: SIGNAL_BASE_URL,
abortSignal: abortController.signal, abortSignal: abortController.signal,
runtime, runtime,
}); });
@@ -157,35 +185,23 @@ describe("monitorSignalProvider tool results", () => {
}); });
it("skips tool summaries with responsePrefix", async () => { it("skips tool summaries with responsePrefix", async () => {
const abortController = new AbortController();
replyMock.mockResolvedValue({ text: "final reply" }); replyMock.mockResolvedValue({ text: "final reply" });
streamMock.mockImplementation(async ({ onEvent }) => { await receiveSignalPayloads({
const payload = { payloads: [
envelope: { {
sourceNumber: "+15550001111", envelope: {
sourceName: "Ada", sourceNumber: "+15550001111",
timestamp: 1, sourceName: "Ada",
dataMessage: { timestamp: 1,
message: "hello", dataMessage: {
message: "hello",
},
}, },
}, },
}; ],
await onEvent({
event: "receive",
data: JSON.stringify(payload),
});
abortController.abort();
}); });
await runMonitorWithMocks({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
abortSignal: abortController.signal,
});
await flush();
expect(sendMock).toHaveBeenCalledTimes(1); expect(sendMock).toHaveBeenCalledTimes(1);
expect(sendMock.mock.calls[0][1]).toBe("PFX final reply"); expect(sendMock.mock.calls[0][1]).toBe("PFX final reply");
}); });
@@ -203,34 +219,21 @@ describe("monitorSignalProvider tool results", () => {
}, },
}, },
}); });
const abortController = new AbortController(); await receiveSignalPayloads({
payloads: [
streamMock.mockImplementation(async ({ onEvent }) => { {
const payload = { envelope: {
envelope: { sourceNumber: "+15550001111",
sourceNumber: "+15550001111", sourceName: "Ada",
sourceName: "Ada", timestamp: 1,
timestamp: 1, dataMessage: {
dataMessage: { message: "hello",
message: "hello", },
}, },
}, },
}; ],
await onEvent({
event: "receive",
data: JSON.stringify(payload),
});
abortController.abort();
}); });
await runMonitorWithMocks({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
abortSignal: abortController.signal,
});
await flush();
expect(replyMock).not.toHaveBeenCalled(); expect(replyMock).not.toHaveBeenCalled();
expect(upsertPairingRequestMock).toHaveBeenCalled(); expect(upsertPairingRequestMock).toHaveBeenCalled();
expect(sendMock).toHaveBeenCalledTimes(1); expect(sendMock).toHaveBeenCalledTimes(1);
@@ -239,75 +242,49 @@ describe("monitorSignalProvider tool results", () => {
}); });
it("ignores reaction-only messages", async () => { it("ignores reaction-only messages", async () => {
const abortController = new AbortController(); await receiveSignalPayloads({
payloads: [
streamMock.mockImplementation(async ({ onEvent }) => { {
const payload = { envelope: {
envelope: { sourceNumber: "+15550001111",
sourceNumber: "+15550001111", sourceName: "Ada",
sourceName: "Ada", timestamp: 1,
timestamp: 1, reactionMessage: {
reactionMessage: { emoji: "👍",
emoji: "👍", targetAuthor: "+15550002222",
targetAuthor: "+15550002222", targetSentTimestamp: 2,
targetSentTimestamp: 2, },
}, },
}, },
}; ],
await onEvent({
event: "receive",
data: JSON.stringify(payload),
});
abortController.abort();
}); });
await runMonitorWithMocks({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
abortSignal: abortController.signal,
});
await flush();
expect(replyMock).not.toHaveBeenCalled(); expect(replyMock).not.toHaveBeenCalled();
expect(sendMock).not.toHaveBeenCalled(); expect(sendMock).not.toHaveBeenCalled();
expect(updateLastRouteMock).not.toHaveBeenCalled(); expect(updateLastRouteMock).not.toHaveBeenCalled();
}); });
it("ignores reaction-only dataMessage.reaction events (dont treat as broken attachments)", async () => { it("ignores reaction-only dataMessage.reaction events (dont treat as broken attachments)", async () => {
const abortController = new AbortController(); await receiveSignalPayloads({
payloads: [
streamMock.mockImplementation(async ({ onEvent }) => { {
const payload = { envelope: {
envelope: { sourceNumber: "+15550001111",
sourceNumber: "+15550001111", sourceName: "Ada",
sourceName: "Ada", timestamp: 1,
timestamp: 1, dataMessage: {
dataMessage: { reaction: {
reaction: { emoji: "👍",
emoji: "👍", targetAuthor: "+15550002222",
targetAuthor: "+15550002222", targetSentTimestamp: 2,
targetSentTimestamp: 2, },
attachments: [{}],
}, },
attachments: [{}],
}, },
}, },
}; ],
await onEvent({
event: "receive",
data: JSON.stringify(payload),
});
abortController.abort();
}); });
await runMonitorWithMocks({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
abortSignal: abortController.signal,
});
await flush();
expect(replyMock).not.toHaveBeenCalled(); expect(replyMock).not.toHaveBeenCalled();
expect(sendMock).not.toHaveBeenCalled(); expect(sendMock).not.toHaveBeenCalled();
expect(updateLastRouteMock).not.toHaveBeenCalled(); expect(updateLastRouteMock).not.toHaveBeenCalled();
@@ -327,36 +304,23 @@ describe("monitorSignalProvider tool results", () => {
}, },
}, },
}); });
const abortController = new AbortController(); await receiveSignalPayloads({
payloads: [
streamMock.mockImplementation(async ({ onEvent }) => { {
const payload = { envelope: {
envelope: { sourceNumber: "+15550001111",
sourceNumber: "+15550001111", sourceName: "Ada",
sourceName: "Ada", timestamp: 1,
timestamp: 1, reactionMessage: {
reactionMessage: { emoji: "✅",
emoji: "✅", targetAuthor: "+15550002222",
targetAuthor: "+15550002222", targetSentTimestamp: 2,
targetSentTimestamp: 2, },
}, },
}, },
}; ],
await onEvent({
event: "receive",
data: JSON.stringify(payload),
});
abortController.abort();
}); });
await runMonitorWithMocks({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
abortSignal: abortController.signal,
});
await flush();
const route = resolveAgentRoute({ const route = resolveAgentRoute({
cfg: config as OpenClawConfig, cfg: config as OpenClawConfig,
channel: "signal", channel: "signal",
@@ -382,37 +346,24 @@ describe("monitorSignalProvider tool results", () => {
}, },
}, },
}); });
const abortController = new AbortController(); await receiveSignalPayloads({
payloads: [
streamMock.mockImplementation(async ({ onEvent }) => { {
const payload = { envelope: {
envelope: { sourceNumber: "+15550001111",
sourceNumber: "+15550001111", sourceName: "Ada",
sourceName: "Ada", timestamp: 1,
timestamp: 1, reactionMessage: {
reactionMessage: { emoji: "✅",
emoji: "✅", targetAuthor: "+15550002222",
targetAuthor: "+15550002222", targetAuthorUuid: "123e4567-e89b-12d3-a456-426614174000",
targetAuthorUuid: "123e4567-e89b-12d3-a456-426614174000", targetSentTimestamp: 2,
targetSentTimestamp: 2, },
}, },
}, },
}; ],
await onEvent({
event: "receive",
data: JSON.stringify(payload),
});
abortController.abort();
}); });
await runMonitorWithMocks({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
abortSignal: abortController.signal,
});
await flush();
const route = resolveAgentRoute({ const route = resolveAgentRoute({
cfg: config as OpenClawConfig, cfg: config as OpenClawConfig,
channel: "signal", channel: "signal",
@@ -424,40 +375,28 @@ describe("monitorSignalProvider tool results", () => {
}); });
it("processes messages when reaction metadata is present", async () => { it("processes messages when reaction metadata is present", async () => {
const abortController = new AbortController();
replyMock.mockResolvedValue({ text: "pong" }); replyMock.mockResolvedValue({ text: "pong" });
streamMock.mockImplementation(async ({ onEvent }) => { await receiveSignalPayloads({
const payload = { payloads: [
envelope: { {
sourceNumber: "+15550001111", envelope: {
sourceName: "Ada", sourceNumber: "+15550001111",
timestamp: 1, sourceName: "Ada",
reactionMessage: { timestamp: 1,
emoji: "👍", reactionMessage: {
targetAuthor: "+15550002222", emoji: "👍",
targetSentTimestamp: 2, targetAuthor: "+15550002222",
}, targetSentTimestamp: 2,
dataMessage: { },
message: "ping", dataMessage: {
message: "ping",
},
}, },
}, },
}; ],
await onEvent({
event: "receive",
data: JSON.stringify(payload),
});
abortController.abort();
}); });
await runMonitorWithMocks({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
abortSignal: abortController.signal,
});
await flush();
expect(sendMock).toHaveBeenCalledTimes(1); expect(sendMock).toHaveBeenCalledTimes(1);
expect(updateLastRouteMock).toHaveBeenCalled(); expect(updateLastRouteMock).toHaveBeenCalled();
}); });
@@ -475,44 +414,30 @@ describe("monitorSignalProvider tool results", () => {
}, },
}, },
}); });
const abortController = new AbortController();
upsertPairingRequestMock upsertPairingRequestMock
.mockResolvedValueOnce({ code: "PAIRCODE", created: true }) .mockResolvedValueOnce({ code: "PAIRCODE", created: true })
.mockResolvedValueOnce({ code: "PAIRCODE", created: false }); .mockResolvedValueOnce({ code: "PAIRCODE", created: false });
streamMock.mockImplementation(async ({ onEvent }) => { const payload = {
const payload = { envelope: {
envelope: { sourceNumber: "+15550001111",
sourceNumber: "+15550001111", sourceName: "Ada",
sourceName: "Ada", timestamp: 1,
timestamp: 1, dataMessage: {
dataMessage: { message: "hello",
message: "hello",
},
}, },
}; },
await onEvent({ };
event: "receive", await receiveSignalPayloads({
data: JSON.stringify(payload), payloads: [
}); payload,
await onEvent({ {
event: "receive",
data: JSON.stringify({
...payload, ...payload,
envelope: { ...payload.envelope, timestamp: 2 }, envelope: { ...payload.envelope, timestamp: 2 },
}), },
}); ],
abortController.abort();
}); });
await runMonitorWithMocks({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
abortSignal: abortController.signal,
});
await flush();
expect(sendMock).toHaveBeenCalledTimes(1); expect(sendMock).toHaveBeenCalledTimes(1);
}); });
}); });