fix(signal): handle attachment and SSE regressions

This commit is contained in:
Peter Steinberger
2026-04-30 15:14:11 +01:00
parent 4e168de6d9
commit 3b0ed18b86
5 changed files with 185 additions and 12 deletions

View File

@@ -139,6 +139,65 @@ describe("signalRpcRequest", () => {
).rejects.toThrow("Signal HTTP response exceeded size limit");
});
it("accepts RPC responses larger than the default cap when maxResponseBytes is raised", async () => {
const payload = JSON.stringify({
jsonrpc: "2.0",
result: { data: "y".repeat(1_200_000) },
id: "test-id",
});
const baseUrl = await withSignalServer((_req, res) => {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(payload);
});
const result = await signalRpcRequest<{ data: string }>("getAttachment", undefined, {
baseUrl,
maxResponseBytes: 4_000_000,
});
expect(result.data.length).toBe(1_200_000);
});
it("rejects RPC responses that exceed a custom maxResponseBytes cap", async () => {
const baseUrl = await withSignalServer((_req, res) => {
res.writeHead(200, { "Content-Type": "application/json" });
res.end("x".repeat(8_193));
});
await expect(
signalRpcRequest("getAttachment", undefined, {
baseUrl,
maxResponseBytes: 8_192,
}),
).rejects.toThrow("Signal HTTP response exceeded size limit");
});
it("falls back to the default cap when maxResponseBytes is zero or non-finite", async () => {
const baseUrl = await withSignalServer((_req, res) => {
res.writeHead(200, { "Content-Type": "application/json" });
res.end("x".repeat(1_048_577));
});
await expect(
signalRpcRequest("version", undefined, {
baseUrl,
maxResponseBytes: 0,
}),
).rejects.toThrow("Signal HTTP response exceeded size limit");
const baseUrl2 = await withSignalServer((_req, res) => {
res.writeHead(200, { "Content-Type": "application/json" });
res.end("x".repeat(1_048_577));
});
await expect(
signalRpcRequest("version", undefined, {
baseUrl: baseUrl2,
maxResponseBytes: Number.POSITIVE_INFINITY,
}),
).rejects.toThrow("Signal HTTP response exceeded size limit");
});
it("uses an absolute deadline for slow-drip RPC responses", async () => {
const baseUrl = await withSignalServer((_req, res) => {
res.writeHead(200, { "Content-Type": "application/json" });
@@ -230,6 +289,25 @@ describe("streamSignalEvents", () => {
).rejects.toThrow("Signal SSE connection timed out after 25ms");
});
it("allows idle event streams to wait for abort when the deadline is disabled", async () => {
const baseUrl = await withSignalServer(() => {
// Leave the request open without response headers, matching signal-cli 0.14.3 before
// its first keepalive flush.
});
const abortController = new AbortController();
const abortTimer = setTimeout(() => abortController.abort(), 25);
abortTimer.unref?.();
await expect(
streamSignalEvents({
baseUrl,
timeoutMs: 0,
abortSignal: abortController.signal,
onEvent: () => {},
}),
).rejects.toMatchObject({ name: "AbortError", message: "Signal SSE aborted" });
});
it("rejects oversized SSE line buffers by byte size", async () => {
const baseUrl = await withSignalServer((_req, res) => {
res.writeHead(200, { "Content-Type": "text/event-stream" });

View File

@@ -7,6 +7,7 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
export type SignalRpcOptions = {
baseUrl: string;
timeoutMs?: number;
maxResponseBytes?: number;
};
export type SignalRpcError = {
@@ -29,7 +30,7 @@ export type SignalSseEvent = {
};
const DEFAULT_TIMEOUT_MS = 10_000;
const MAX_SIGNAL_HTTP_RESPONSE_BYTES = 1_048_576;
const DEFAULT_SIGNAL_HTTP_RESPONSE_MAX_BYTES = 1_048_576;
const MAX_SIGNAL_SSE_BUFFER_BYTES = 1_048_576;
const MAX_SIGNAL_SSE_EVENT_DATA_BYTES = 1_048_576;
@@ -94,6 +95,20 @@ function assertSignalHttpProtocol(url: URL, label: string): void {
}
}
function normalizeSignalHttpResponseMaxBytes(value: number | undefined): number {
if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
return DEFAULT_SIGNAL_HTTP_RESPONSE_MAX_BYTES;
}
return Math.floor(value);
}
function normalizeSignalSseTimeoutMs(timeoutMs: number): number | null {
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return null;
}
return timeoutMs;
}
function requestSignalHttpText(
url: URL,
options: {
@@ -101,6 +116,7 @@ function requestSignalHttpText(
headers?: Record<string, string>;
body?: string;
timeoutMs: number;
maxResponseBytes?: number;
},
): Promise<SignalHttpResponse> {
assertSignalHttpProtocol(url, "HTTP");
@@ -132,6 +148,7 @@ function requestSignalHttpText(
cleanup();
resolve(response);
};
const maxResponseBytes = normalizeSignalHttpResponseMaxBytes(options.maxResponseBytes);
request = client.request(
url,
{
@@ -144,7 +161,7 @@ function requestSignalHttpText(
res.on("data", (chunk: Buffer | string) => {
const next = typeof chunk === "string" ? Buffer.from(chunk) : chunk;
totalBytes += next.byteLength;
if (totalBytes > MAX_SIGNAL_HTTP_RESPONSE_BYTES) {
if (totalBytes > maxResponseBytes) {
const error = new Error("Signal HTTP response exceeded size limit");
request?.destroy(error);
res.destroy(error);
@@ -194,6 +211,7 @@ export async function signalRpcRequest<T = unknown>(
},
body,
timeoutMs: opts.timeoutMs ?? DEFAULT_TIMEOUT_MS,
maxResponseBytes: opts.maxResponseBytes,
});
if (res.status === 201) {
return undefined as T;
@@ -248,15 +266,23 @@ function openSignalEventStream(
let response: IncomingMessage | undefined;
let onAbort: () => void = () => {};
let request: ClientRequest;
const headerDeadline = setTimeout(() => {
const error = new Error(`Signal SSE connection timed out after ${timeoutMs}ms`);
response?.destroy(error);
request.destroy(error);
rejectOnce(error);
}, timeoutMs);
headerDeadline.unref?.();
const effectiveTimeoutMs = normalizeSignalSseTimeoutMs(timeoutMs);
const headerDeadline =
effectiveTimeoutMs === null
? undefined
: setTimeout(() => {
const error = new Error(
`Signal SSE connection timed out after ${effectiveTimeoutMs}ms`,
);
response?.destroy(error);
request.destroy(error);
rejectOnce(error);
}, effectiveTimeoutMs);
headerDeadline?.unref?.();
const cleanup = () => {
clearTimeout(headerDeadline);
if (headerDeadline) {
clearTimeout(headerDeadline);
}
abortSignal?.removeEventListener("abort", onAbort);
};
const rejectOnce = (error: unknown) => {
@@ -284,7 +310,9 @@ function openSignalEventStream(
res.destroy();
return;
}
clearTimeout(headerDeadline);
if (headerDeadline) {
clearTimeout(headerDeadline);
}
settled = true;
response = res;
resolve({ response: res, cleanup });

View File

@@ -1,3 +1,4 @@
import { Buffer } from "node:buffer";
import { describe, expect, it, vi } from "vitest";
import {
config,
@@ -10,7 +11,7 @@ import {
installSignalToolResultTestHooks();
const { monitorSignalProvider } = await import("./monitor.js");
const { replyMock, sendMock, streamMock, upsertPairingRequestMock } =
const { replyMock, sendMock, streamMock, signalRpcRequestMock, upsertPairingRequestMock } =
getSignalToolResultTestMocks();
type MonitorSignalProviderOptions = Parameters<typeof monitorSignalProvider>[0];
@@ -109,9 +110,55 @@ describe("monitorSignalProvider tool results", () => {
await monitorPromise;
expect(streamMock).toHaveBeenCalledTimes(2);
expect(streamMock.mock.calls[0]?.[0]).toMatchObject({ timeoutMs: 0 });
expect(streamMock.mock.calls[1]?.[0]).toMatchObject({ timeoutMs: 0 });
} finally {
randomSpy.mockRestore();
vi.useRealTimers();
}
});
it("sizes attachment RPC response caps from mediaMaxMb", async () => {
const abortController = new AbortController();
const maxBytes = 2 * 1024 * 1024;
const expectedMaxResponseBytes = Math.ceil((maxBytes * 4) / 3) + 64 * 1024;
replyMock.mockResolvedValue({ text: "ok" });
signalRpcRequestMock.mockResolvedValue({ data: Buffer.from("hello").toString("base64") });
streamMock.mockImplementation(async ({ onEvent }) => {
await onEvent({
event: "receive",
data: JSON.stringify({
envelope: {
sourceNumber: "+15550001111",
sourceName: "Ada",
timestamp: 1,
dataMessage: {
message: "",
attachments: [{ id: "attachment-1", size: 1_500_000, contentType: "text/plain" }],
},
},
}),
});
abortController.abort();
});
await monitorSignalProvider({
autoStart: false,
baseUrl: "http://127.0.0.1:8080",
mediaMaxMb: 2,
abortSignal: abortController.signal,
});
await flush();
expect(signalRpcRequestMock).toHaveBeenCalledWith(
"getAttachment",
expect.objectContaining({ id: "attachment-1", recipient: "+15550001111" }),
expect.objectContaining({
baseUrl: "http://127.0.0.1:8080",
maxResponseBytes: expectedMaxResponseBytes,
}),
);
});
});

View File

@@ -255,6 +255,20 @@ async function waitForSignalDaemonReady(params: {
});
}
const SIGNAL_ATTACHMENT_RPC_RESPONSE_HEADROOM_BYTES = 64 * 1024;
const SIGNAL_BASE64_OVERHEAD_NUMERATOR = 4;
const SIGNAL_BASE64_OVERHEAD_DENOMINATOR = 3;
function deriveSignalAttachmentRpcMaxResponseBytes(maxBytes: number): number | undefined {
if (!Number.isFinite(maxBytes) || maxBytes <= 0) {
return undefined;
}
const base64Bytes = Math.ceil(
(maxBytes * SIGNAL_BASE64_OVERHEAD_NUMERATOR) / SIGNAL_BASE64_OVERHEAD_DENOMINATOR,
);
return base64Bytes + SIGNAL_ATTACHMENT_RPC_RESPONSE_HEADROOM_BYTES;
}
async function fetchAttachment(params: {
baseUrl: string;
account?: string;
@@ -288,6 +302,7 @@ async function fetchAttachment(params: {
const result = await signalRpcRequest<{ data?: string }>("getAttachment", rpcParams, {
baseUrl: params.baseUrl,
maxResponseBytes: deriveSignalAttachmentRpcMaxResponseBytes(params.maxBytes),
});
if (!result?.data) {
return null;
@@ -489,6 +504,8 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
account,
abortSignal: daemonLifecycle.abortSignal,
runtime,
// signal-cli can keep the SSE event endpoint idle until the next inbound event.
timeoutMs: 0,
policy: opts.reconnectPolicy,
onEvent: (event) => {
void handleEvent(event).catch((err) => {

View File

@@ -21,6 +21,7 @@ type RunSignalSseLoopParams = {
abortSignal?: AbortSignal;
runtime: RuntimeEnv;
onEvent: (event: SignalSseEvent) => void;
timeoutMs?: number;
policy?: Partial<BackoffPolicy>;
};
@@ -30,6 +31,7 @@ export async function runSignalSseLoop({
abortSignal,
runtime,
onEvent,
timeoutMs,
policy,
}: RunSignalSseLoopParams) {
const reconnectPolicy = {
@@ -54,6 +56,7 @@ export async function runSignalSseLoop({
baseUrl,
account,
abortSignal,
timeoutMs,
onEvent: (event) => {
reconnectAttempts = 0;
onEvent(event);