diff --git a/extensions/signal/src/client.test.ts b/extensions/signal/src/client.test.ts index be81e71c9c3..48d2dd0ab5d 100644 --- a/extensions/signal/src/client.test.ts +++ b/extensions/signal/src/client.test.ts @@ -139,6 +139,65 @@ describe("signalRpcRequest", () => { ).rejects.toThrow("Signal HTTP response exceeded size limit"); }); + it("accepts RPC responses larger than the default cap when maxResponseBytes is raised", async () => { + const payload = JSON.stringify({ + jsonrpc: "2.0", + result: { data: "y".repeat(1_200_000) }, + id: "test-id", + }); + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(payload); + }); + + const result = await signalRpcRequest<{ data: string }>("getAttachment", undefined, { + baseUrl, + maxResponseBytes: 4_000_000, + }); + + expect(result.data.length).toBe(1_200_000); + }); + + it("rejects RPC responses that exceed a custom maxResponseBytes cap", async () => { + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end("x".repeat(8_193)); + }); + + await expect( + signalRpcRequest("getAttachment", undefined, { + baseUrl, + maxResponseBytes: 8_192, + }), + ).rejects.toThrow("Signal HTTP response exceeded size limit"); + }); + + it("falls back to the default cap when maxResponseBytes is zero or non-finite", async () => { + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end("x".repeat(1_048_577)); + }); + + await expect( + signalRpcRequest("version", undefined, { + baseUrl, + maxResponseBytes: 0, + }), + ).rejects.toThrow("Signal HTTP response exceeded size limit"); + + const baseUrl2 = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end("x".repeat(1_048_577)); + }); + + await expect( + signalRpcRequest("version", undefined, { + baseUrl: baseUrl2, + maxResponseBytes: Number.POSITIVE_INFINITY, + }), + ).rejects.toThrow("Signal HTTP response exceeded size limit"); + }); + it("uses an absolute deadline for slow-drip RPC responses", async () => { const baseUrl = await withSignalServer((_req, res) => { res.writeHead(200, { "Content-Type": "application/json" }); @@ -230,6 +289,25 @@ describe("streamSignalEvents", () => { ).rejects.toThrow("Signal SSE connection timed out after 25ms"); }); + it("allows idle event streams to wait for abort when the deadline is disabled", async () => { + const baseUrl = await withSignalServer(() => { + // Leave the request open without response headers, matching signal-cli 0.14.3 before + // its first keepalive flush. + }); + const abortController = new AbortController(); + const abortTimer = setTimeout(() => abortController.abort(), 25); + abortTimer.unref?.(); + + await expect( + streamSignalEvents({ + baseUrl, + timeoutMs: 0, + abortSignal: abortController.signal, + onEvent: () => {}, + }), + ).rejects.toMatchObject({ name: "AbortError", message: "Signal SSE aborted" }); + }); + it("rejects oversized SSE line buffers by byte size", async () => { const baseUrl = await withSignalServer((_req, res) => { res.writeHead(200, { "Content-Type": "text/event-stream" }); diff --git a/extensions/signal/src/client.ts b/extensions/signal/src/client.ts index 093c9eead87..c7751194a29 100644 --- a/extensions/signal/src/client.ts +++ b/extensions/signal/src/client.ts @@ -7,6 +7,7 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; export type SignalRpcOptions = { baseUrl: string; timeoutMs?: number; + maxResponseBytes?: number; }; export type SignalRpcError = { @@ -29,7 +30,7 @@ export type SignalSseEvent = { }; const DEFAULT_TIMEOUT_MS = 10_000; -const MAX_SIGNAL_HTTP_RESPONSE_BYTES = 1_048_576; +const DEFAULT_SIGNAL_HTTP_RESPONSE_MAX_BYTES = 1_048_576; const MAX_SIGNAL_SSE_BUFFER_BYTES = 1_048_576; const MAX_SIGNAL_SSE_EVENT_DATA_BYTES = 1_048_576; @@ -94,6 +95,20 @@ function assertSignalHttpProtocol(url: URL, label: string): void { } } +function normalizeSignalHttpResponseMaxBytes(value: number | undefined): number { + if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) { + return DEFAULT_SIGNAL_HTTP_RESPONSE_MAX_BYTES; + } + return Math.floor(value); +} + +function normalizeSignalSseTimeoutMs(timeoutMs: number): number | null { + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + return null; + } + return timeoutMs; +} + function requestSignalHttpText( url: URL, options: { @@ -101,6 +116,7 @@ function requestSignalHttpText( headers?: Record; body?: string; timeoutMs: number; + maxResponseBytes?: number; }, ): Promise { assertSignalHttpProtocol(url, "HTTP"); @@ -132,6 +148,7 @@ function requestSignalHttpText( cleanup(); resolve(response); }; + const maxResponseBytes = normalizeSignalHttpResponseMaxBytes(options.maxResponseBytes); request = client.request( url, { @@ -144,7 +161,7 @@ function requestSignalHttpText( res.on("data", (chunk: Buffer | string) => { const next = typeof chunk === "string" ? Buffer.from(chunk) : chunk; totalBytes += next.byteLength; - if (totalBytes > MAX_SIGNAL_HTTP_RESPONSE_BYTES) { + if (totalBytes > maxResponseBytes) { const error = new Error("Signal HTTP response exceeded size limit"); request?.destroy(error); res.destroy(error); @@ -194,6 +211,7 @@ export async function signalRpcRequest( }, body, timeoutMs: opts.timeoutMs ?? DEFAULT_TIMEOUT_MS, + maxResponseBytes: opts.maxResponseBytes, }); if (res.status === 201) { return undefined as T; @@ -248,15 +266,23 @@ function openSignalEventStream( let response: IncomingMessage | undefined; let onAbort: () => void = () => {}; let request: ClientRequest; - const headerDeadline = setTimeout(() => { - const error = new Error(`Signal SSE connection timed out after ${timeoutMs}ms`); - response?.destroy(error); - request.destroy(error); - rejectOnce(error); - }, timeoutMs); - headerDeadline.unref?.(); + const effectiveTimeoutMs = normalizeSignalSseTimeoutMs(timeoutMs); + const headerDeadline = + effectiveTimeoutMs === null + ? undefined + : setTimeout(() => { + const error = new Error( + `Signal SSE connection timed out after ${effectiveTimeoutMs}ms`, + ); + response?.destroy(error); + request.destroy(error); + rejectOnce(error); + }, effectiveTimeoutMs); + headerDeadline?.unref?.(); const cleanup = () => { - clearTimeout(headerDeadline); + if (headerDeadline) { + clearTimeout(headerDeadline); + } abortSignal?.removeEventListener("abort", onAbort); }; const rejectOnce = (error: unknown) => { @@ -284,7 +310,9 @@ function openSignalEventStream( res.destroy(); return; } - clearTimeout(headerDeadline); + if (headerDeadline) { + clearTimeout(headerDeadline); + } settled = true; response = res; resolve({ response: res, cleanup }); diff --git a/extensions/signal/src/monitor.tool-result.pairs-uuid-only-senders-uuid-allowlist-entry.test.ts b/extensions/signal/src/monitor.tool-result.pairs-uuid-only-senders-uuid-allowlist-entry.test.ts index fcd9033990d..07d3df3a924 100644 --- a/extensions/signal/src/monitor.tool-result.pairs-uuid-only-senders-uuid-allowlist-entry.test.ts +++ b/extensions/signal/src/monitor.tool-result.pairs-uuid-only-senders-uuid-allowlist-entry.test.ts @@ -1,3 +1,4 @@ +import { Buffer } from "node:buffer"; import { describe, expect, it, vi } from "vitest"; import { config, @@ -10,7 +11,7 @@ import { installSignalToolResultTestHooks(); const { monitorSignalProvider } = await import("./monitor.js"); -const { replyMock, sendMock, streamMock, upsertPairingRequestMock } = +const { replyMock, sendMock, streamMock, signalRpcRequestMock, upsertPairingRequestMock } = getSignalToolResultTestMocks(); type MonitorSignalProviderOptions = Parameters[0]; @@ -109,9 +110,55 @@ describe("monitorSignalProvider tool results", () => { await monitorPromise; expect(streamMock).toHaveBeenCalledTimes(2); + expect(streamMock.mock.calls[0]?.[0]).toMatchObject({ timeoutMs: 0 }); + expect(streamMock.mock.calls[1]?.[0]).toMatchObject({ timeoutMs: 0 }); } finally { randomSpy.mockRestore(); vi.useRealTimers(); } }); + + it("sizes attachment RPC response caps from mediaMaxMb", async () => { + const abortController = new AbortController(); + const maxBytes = 2 * 1024 * 1024; + const expectedMaxResponseBytes = Math.ceil((maxBytes * 4) / 3) + 64 * 1024; + + replyMock.mockResolvedValue({ text: "ok" }); + signalRpcRequestMock.mockResolvedValue({ data: Buffer.from("hello").toString("base64") }); + streamMock.mockImplementation(async ({ onEvent }) => { + await onEvent({ + event: "receive", + data: JSON.stringify({ + envelope: { + sourceNumber: "+15550001111", + sourceName: "Ada", + timestamp: 1, + dataMessage: { + message: "", + attachments: [{ id: "attachment-1", size: 1_500_000, contentType: "text/plain" }], + }, + }, + }), + }); + abortController.abort(); + }); + + await monitorSignalProvider({ + autoStart: false, + baseUrl: "http://127.0.0.1:8080", + mediaMaxMb: 2, + abortSignal: abortController.signal, + }); + + await flush(); + + expect(signalRpcRequestMock).toHaveBeenCalledWith( + "getAttachment", + expect.objectContaining({ id: "attachment-1", recipient: "+15550001111" }), + expect.objectContaining({ + baseUrl: "http://127.0.0.1:8080", + maxResponseBytes: expectedMaxResponseBytes, + }), + ); + }); }); diff --git a/extensions/signal/src/monitor.ts b/extensions/signal/src/monitor.ts index 985e370c7e3..af5abe994b6 100644 --- a/extensions/signal/src/monitor.ts +++ b/extensions/signal/src/monitor.ts @@ -255,6 +255,20 @@ async function waitForSignalDaemonReady(params: { }); } +const SIGNAL_ATTACHMENT_RPC_RESPONSE_HEADROOM_BYTES = 64 * 1024; +const SIGNAL_BASE64_OVERHEAD_NUMERATOR = 4; +const SIGNAL_BASE64_OVERHEAD_DENOMINATOR = 3; + +function deriveSignalAttachmentRpcMaxResponseBytes(maxBytes: number): number | undefined { + if (!Number.isFinite(maxBytes) || maxBytes <= 0) { + return undefined; + } + const base64Bytes = Math.ceil( + (maxBytes * SIGNAL_BASE64_OVERHEAD_NUMERATOR) / SIGNAL_BASE64_OVERHEAD_DENOMINATOR, + ); + return base64Bytes + SIGNAL_ATTACHMENT_RPC_RESPONSE_HEADROOM_BYTES; +} + async function fetchAttachment(params: { baseUrl: string; account?: string; @@ -288,6 +302,7 @@ async function fetchAttachment(params: { const result = await signalRpcRequest<{ data?: string }>("getAttachment", rpcParams, { baseUrl: params.baseUrl, + maxResponseBytes: deriveSignalAttachmentRpcMaxResponseBytes(params.maxBytes), }); if (!result?.data) { return null; @@ -489,6 +504,8 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi account, abortSignal: daemonLifecycle.abortSignal, runtime, + // signal-cli can keep the SSE event endpoint idle until the next inbound event. + timeoutMs: 0, policy: opts.reconnectPolicy, onEvent: (event) => { void handleEvent(event).catch((err) => { diff --git a/extensions/signal/src/sse-reconnect.ts b/extensions/signal/src/sse-reconnect.ts index 75bfe480a40..5270b9dd304 100644 --- a/extensions/signal/src/sse-reconnect.ts +++ b/extensions/signal/src/sse-reconnect.ts @@ -21,6 +21,7 @@ type RunSignalSseLoopParams = { abortSignal?: AbortSignal; runtime: RuntimeEnv; onEvent: (event: SignalSseEvent) => void; + timeoutMs?: number; policy?: Partial; }; @@ -30,6 +31,7 @@ export async function runSignalSseLoop({ abortSignal, runtime, onEvent, + timeoutMs, policy, }: RunSignalSseLoopParams) { const reconnectPolicy = { @@ -54,6 +56,7 @@ export async function runSignalSseLoop({ baseUrl, account, abortSignal, + timeoutMs, onEvent: (event) => { reconnectAttempts = 0; onEvent(event);