mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:10:45 +00:00
fix(runtime): harden dependency install surfaces (#71997)
* fix(runtime): harden dependency surfaces * fix(runtime): harden dependency install surfaces * fix(runtime): address dependency surface review * fix(runtime): address dependency surface review * fix(channels): avoid read-only plugin loader cycle * fix(channels): allow optional read-only loader workspace * test(commands): refresh current main checks * test(commands): keep provider metadata mock unique * test(commands): keep doctor security read-only mock unique
This commit is contained in:
@@ -1,11 +1,7 @@
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const fetchWithTimeoutMock = vi.fn();
|
||||
const resolveFetchMock = vi.fn();
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/fetch-runtime", () => ({
|
||||
resolveFetch: (...args: unknown[]) => resolveFetchMock(...args),
|
||||
}));
|
||||
import { Buffer } from "node:buffer";
|
||||
import { once } from "node:events";
|
||||
import http, { type IncomingMessage, type ServerResponse } from "node:http";
|
||||
import { afterEach, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/core", async () => {
|
||||
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/core")>(
|
||||
@@ -17,47 +13,91 @@ vi.mock("openclaw/plugin-sdk/core", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/text-runtime", () => ({
|
||||
fetchWithTimeout: (...args: unknown[]) => fetchWithTimeoutMock(...args),
|
||||
}));
|
||||
|
||||
let signalCheck: typeof import("./client.js").signalCheck;
|
||||
let signalRpcRequest: typeof import("./client.js").signalRpcRequest;
|
||||
let streamSignalEvents: typeof import("./client.js").streamSignalEvents;
|
||||
|
||||
function rpcResponse(body: unknown, status = 200): Response {
|
||||
if (typeof body === "string") {
|
||||
return new Response(body, { status });
|
||||
const servers: http.Server[] = [];
|
||||
|
||||
async function readRequestBody(req: IncomingMessage): Promise<string> {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of req as AsyncIterable<Buffer | string>) {
|
||||
chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk);
|
||||
}
|
||||
return new Response(JSON.stringify(body), { status });
|
||||
return Buffer.concat(chunks).toString("utf8");
|
||||
}
|
||||
|
||||
async function withSignalServer(
|
||||
handler: (req: IncomingMessage, res: ServerResponse) => void | Promise<void>,
|
||||
): Promise<string> {
|
||||
const server = http.createServer((req, res) => {
|
||||
void Promise.resolve(handler(req, res)).catch((error: unknown) => {
|
||||
res.writeHead(500, { "Content-Type": "text/plain" });
|
||||
res.end(error instanceof Error ? error.message : String(error));
|
||||
});
|
||||
});
|
||||
servers.push(server);
|
||||
server.listen(0, "127.0.0.1");
|
||||
await once(server, "listening");
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("missing test server address");
|
||||
}
|
||||
return `http://127.0.0.1:${address.port}`;
|
||||
}
|
||||
|
||||
beforeAll(async () => {
|
||||
({ signalCheck, signalRpcRequest, streamSignalEvents } = await import("./client.js"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await Promise.all(
|
||||
servers.splice(0).map(
|
||||
(server) =>
|
||||
new Promise<void>((resolve, reject) => {
|
||||
server.close((error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
return;
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
}),
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
describe("signalRpcRequest", () => {
|
||||
beforeAll(async () => {
|
||||
({ signalRpcRequest } = await import("./client.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
resolveFetchMock.mockReturnValue(vi.fn());
|
||||
});
|
||||
|
||||
it("returns parsed RPC result", async () => {
|
||||
fetchWithTimeoutMock.mockResolvedValueOnce(
|
||||
rpcResponse({ jsonrpc: "2.0", result: { version: "0.13.22" }, id: "test-id" }),
|
||||
);
|
||||
const baseUrl = await withSignalServer(async (req, res) => {
|
||||
expect(req.method).toBe("POST");
|
||||
expect(req.url).toBe("/api/v1/rpc");
|
||||
expect(req.headers["content-type"]).toBe("application/json");
|
||||
expect(JSON.parse(await readRequestBody(req))).toEqual({
|
||||
jsonrpc: "2.0",
|
||||
method: "version",
|
||||
id: "test-id",
|
||||
});
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ jsonrpc: "2.0", result: { version: "0.13.22" }, id: "test-id" }));
|
||||
});
|
||||
|
||||
const result = await signalRpcRequest<{ version: string }>("version", undefined, {
|
||||
baseUrl: "http://127.0.0.1:8080",
|
||||
baseUrl,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ version: "0.13.22" });
|
||||
});
|
||||
|
||||
it("throws a wrapped error when RPC response JSON is malformed", async () => {
|
||||
fetchWithTimeoutMock.mockResolvedValueOnce(rpcResponse("not-json", 502));
|
||||
const baseUrl = await withSignalServer((_req, res) => {
|
||||
res.writeHead(502, { "Content-Type": "text/plain" });
|
||||
res.end("not-json");
|
||||
});
|
||||
|
||||
await expect(
|
||||
signalRpcRequest("version", undefined, {
|
||||
baseUrl: "http://127.0.0.1:8080",
|
||||
baseUrl,
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
message: "Signal RPC returned malformed JSON (status 502)",
|
||||
@@ -66,12 +106,159 @@ describe("signalRpcRequest", () => {
|
||||
});
|
||||
|
||||
it("throws when RPC response envelope has neither result nor error", async () => {
|
||||
fetchWithTimeoutMock.mockResolvedValueOnce(rpcResponse({ jsonrpc: "2.0", id: "test-id" }));
|
||||
const baseUrl = await withSignalServer((_req, res) => {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ jsonrpc: "2.0", id: "test-id" }));
|
||||
});
|
||||
|
||||
await expect(
|
||||
signalRpcRequest("version", undefined, {
|
||||
baseUrl: "http://127.0.0.1:8080",
|
||||
baseUrl,
|
||||
}),
|
||||
).rejects.toThrow("Signal RPC returned invalid response envelope (status 200)");
|
||||
});
|
||||
|
||||
it("rejects credentialed base URLs", async () => {
|
||||
await expect(
|
||||
signalRpcRequest("version", undefined, {
|
||||
baseUrl: "http://user:pass@127.0.0.1:8080",
|
||||
}),
|
||||
).rejects.toThrow("Signal base URL must not include credentials");
|
||||
});
|
||||
|
||||
it("rejects oversized RPC responses", async () => {
|
||||
const baseUrl = await withSignalServer((_req, res) => {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end("x".repeat(1_048_577));
|
||||
});
|
||||
|
||||
await expect(
|
||||
signalRpcRequest("version", undefined, {
|
||||
baseUrl,
|
||||
}),
|
||||
).rejects.toThrow("Signal HTTP response exceeded size limit");
|
||||
});
|
||||
|
||||
it("uses an absolute deadline for slow-drip RPC responses", async () => {
|
||||
const baseUrl = await withSignalServer((_req, res) => {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
const interval = setInterval(() => {
|
||||
res.write(" ");
|
||||
}, 5);
|
||||
res.on("close", () => clearInterval(interval));
|
||||
});
|
||||
|
||||
await expect(
|
||||
signalRpcRequest("version", undefined, {
|
||||
baseUrl,
|
||||
timeoutMs: 25,
|
||||
}),
|
||||
).rejects.toThrow("Signal HTTP exceeded deadline after 25ms");
|
||||
});
|
||||
});
|
||||
|
||||
describe("signalCheck", () => {
|
||||
it("returns ok for a healthy signal-cli check", async () => {
|
||||
const baseUrl = await withSignalServer((req, res) => {
|
||||
expect(req.method).toBe("GET");
|
||||
expect(req.url).toBe("/api/v1/check");
|
||||
res.writeHead(204);
|
||||
res.end();
|
||||
});
|
||||
|
||||
await expect(signalCheck(baseUrl)).resolves.toEqual({ ok: true, status: 204, error: null });
|
||||
});
|
||||
|
||||
it("returns an HTTP status failure for unhealthy checks", async () => {
|
||||
const baseUrl = await withSignalServer((_req, res) => {
|
||||
res.writeHead(503);
|
||||
res.end("down");
|
||||
});
|
||||
|
||||
await expect(signalCheck(baseUrl)).resolves.toEqual({
|
||||
ok: false,
|
||||
status: 503,
|
||||
error: "HTTP 503",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("streamSignalEvents", () => {
|
||||
it("streams events through node http instead of fetch", async () => {
|
||||
const events: Array<import("./client.js").SignalSseEvent> = [];
|
||||
const baseUrl = await withSignalServer((req, res) => {
|
||||
expect(req.url).toBe("/api/v1/events?account=%2B15555550123");
|
||||
expect(req.headers.accept).toBe("text/event-stream");
|
||||
res.writeHead(200, { "Content-Type": "text/event-stream" });
|
||||
res.end('id: 42\nevent: message\ndata: {"group":true}\n\n');
|
||||
});
|
||||
|
||||
await streamSignalEvents({
|
||||
baseUrl,
|
||||
account: "+15555550123",
|
||||
onEvent: (event) => events.push(event),
|
||||
});
|
||||
|
||||
expect(events).toEqual([{ id: "42", event: "message", data: '{"group":true}' }]);
|
||||
});
|
||||
|
||||
it("reports HTTP status failures from the event stream", async () => {
|
||||
const baseUrl = await withSignalServer((_req, res) => {
|
||||
res.writeHead(503, "Unavailable");
|
||||
res.end("down");
|
||||
});
|
||||
|
||||
await expect(
|
||||
streamSignalEvents({
|
||||
baseUrl,
|
||||
onEvent: () => {},
|
||||
}),
|
||||
).rejects.toThrow("Signal SSE failed (503 Unavailable)");
|
||||
});
|
||||
|
||||
it("rejects event streams that do not send headers before the deadline", async () => {
|
||||
const baseUrl = await withSignalServer(() => {
|
||||
// Leave the request open without response headers.
|
||||
});
|
||||
|
||||
await expect(
|
||||
streamSignalEvents({
|
||||
baseUrl,
|
||||
timeoutMs: 25,
|
||||
onEvent: () => {},
|
||||
}),
|
||||
).rejects.toThrow("Signal SSE connection timed out after 25ms");
|
||||
});
|
||||
|
||||
it("rejects oversized SSE line buffers by byte size", async () => {
|
||||
const baseUrl = await withSignalServer((_req, res) => {
|
||||
res.writeHead(200, { "Content-Type": "text/event-stream" });
|
||||
res.end(`data: ${"🙂".repeat(262_145)}`);
|
||||
});
|
||||
|
||||
await expect(
|
||||
streamSignalEvents({
|
||||
baseUrl,
|
||||
onEvent: () => {},
|
||||
}),
|
||||
).rejects.toThrow("Signal SSE buffer exceeded size limit");
|
||||
});
|
||||
|
||||
it("rejects oversized SSE events split across smaller data lines", async () => {
|
||||
const baseUrl = await withSignalServer((_req, res) => {
|
||||
res.writeHead(200, { "Content-Type": "text/event-stream" });
|
||||
const line = `data: ${"x".repeat(4096)}\n`;
|
||||
for (let index = 0; index < 260; index += 1) {
|
||||
res.write(line);
|
||||
}
|
||||
res.end();
|
||||
});
|
||||
|
||||
await expect(
|
||||
streamSignalEvents({
|
||||
baseUrl,
|
||||
onEvent: () => {},
|
||||
}),
|
||||
).rejects.toThrow("Signal SSE event data exceeded size limit");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { Buffer } from "node:buffer";
|
||||
import http, { type ClientRequest, type IncomingMessage } from "node:http";
|
||||
import https from "node:https";
|
||||
import { generateSecureUuid } from "openclaw/plugin-sdk/core";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import { resolveFetch } from "openclaw/plugin-sdk/fetch-runtime";
|
||||
import { fetchWithTimeout } from "openclaw/plugin-sdk/text-runtime";
|
||||
|
||||
export type SignalRpcOptions = {
|
||||
baseUrl: string;
|
||||
@@ -28,6 +29,21 @@ export type SignalSseEvent = {
|
||||
};
|
||||
|
||||
const DEFAULT_TIMEOUT_MS = 10_000;
|
||||
const MAX_SIGNAL_HTTP_RESPONSE_BYTES = 1_048_576;
|
||||
const MAX_SIGNAL_SSE_BUFFER_BYTES = 1_048_576;
|
||||
const MAX_SIGNAL_SSE_EVENT_DATA_BYTES = 1_048_576;
|
||||
|
||||
type SignalHttpResponse = {
|
||||
status: number;
|
||||
statusText: string;
|
||||
text: string;
|
||||
};
|
||||
|
||||
function createSignalSseAbortError(): Error {
|
||||
const error = new Error("Signal SSE aborted");
|
||||
error.name = "AbortError";
|
||||
return error;
|
||||
}
|
||||
|
||||
function normalizeBaseUrl(url: string): string {
|
||||
const trimmed = url.trim();
|
||||
@@ -40,12 +56,16 @@ function normalizeBaseUrl(url: string): string {
|
||||
return `http://${trimmed}`.replace(/\/+$/, "");
|
||||
}
|
||||
|
||||
function getRequiredFetch(): typeof fetch {
|
||||
const fetchImpl = resolveFetch();
|
||||
if (!fetchImpl) {
|
||||
throw new Error("fetch is not available");
|
||||
function parseSignalBaseUrl(url: string): URL {
|
||||
const parsed = new URL(normalizeBaseUrl(url));
|
||||
if (parsed.username || parsed.password) {
|
||||
throw new Error("Signal base URL must not include credentials");
|
||||
}
|
||||
return fetchImpl;
|
||||
return parsed;
|
||||
}
|
||||
|
||||
function resolveSignalEndpointUrl(baseUrl: string, pathname: string): URL {
|
||||
return new URL(pathname, parseSignalBaseUrl(baseUrl));
|
||||
}
|
||||
|
||||
function parseSignalRpcResponse<T>(text: string, status: number): SignalRpcResponse<T> {
|
||||
@@ -68,12 +88,97 @@ function parseSignalRpcResponse<T>(text: string, status: number): SignalRpcRespo
|
||||
return rpc;
|
||||
}
|
||||
|
||||
function assertSignalHttpProtocol(url: URL, label: string): void {
|
||||
if (url.protocol !== "http:" && url.protocol !== "https:") {
|
||||
throw new Error(`Signal ${label} unsupported protocol: ${url.protocol}`);
|
||||
}
|
||||
}
|
||||
|
||||
function requestSignalHttpText(
|
||||
url: URL,
|
||||
options: {
|
||||
method: "GET" | "POST";
|
||||
headers?: Record<string, string>;
|
||||
body?: string;
|
||||
timeoutMs: number;
|
||||
},
|
||||
): Promise<SignalHttpResponse> {
|
||||
assertSignalHttpProtocol(url, "HTTP");
|
||||
const client = url.protocol === "https:" ? https : http;
|
||||
return new Promise((resolve, reject) => {
|
||||
let settled = false;
|
||||
let request: ClientRequest | undefined;
|
||||
const deadline = setTimeout(() => {
|
||||
request?.destroy(new Error(`Signal HTTP exceeded deadline after ${options.timeoutMs}ms`));
|
||||
}, options.timeoutMs);
|
||||
deadline.unref?.();
|
||||
const cleanup = () => {
|
||||
clearTimeout(deadline);
|
||||
request?.setTimeout(0);
|
||||
};
|
||||
const rejectOnce = (error: unknown) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
cleanup();
|
||||
reject(error);
|
||||
};
|
||||
const resolveOnce = (response: SignalHttpResponse) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
cleanup();
|
||||
resolve(response);
|
||||
};
|
||||
request = client.request(
|
||||
url,
|
||||
{
|
||||
method: options.method,
|
||||
headers: options.headers,
|
||||
},
|
||||
(res) => {
|
||||
const chunks: Buffer[] = [];
|
||||
let totalBytes = 0;
|
||||
res.on("data", (chunk: Buffer | string) => {
|
||||
const next = typeof chunk === "string" ? Buffer.from(chunk) : chunk;
|
||||
totalBytes += next.byteLength;
|
||||
if (totalBytes > MAX_SIGNAL_HTTP_RESPONSE_BYTES) {
|
||||
const error = new Error("Signal HTTP response exceeded size limit");
|
||||
request?.destroy(error);
|
||||
res.destroy(error);
|
||||
rejectOnce(error);
|
||||
return;
|
||||
}
|
||||
chunks.push(next);
|
||||
});
|
||||
res.on("error", rejectOnce);
|
||||
res.on("end", () => {
|
||||
resolveOnce({
|
||||
status: res.statusCode ?? 0,
|
||||
statusText: res.statusMessage || "error",
|
||||
text: Buffer.concat(chunks).toString("utf8"),
|
||||
});
|
||||
});
|
||||
},
|
||||
);
|
||||
request.setTimeout(options.timeoutMs, () => {
|
||||
request?.destroy(new Error(`Signal HTTP timed out after ${options.timeoutMs}ms`));
|
||||
});
|
||||
request.on("error", rejectOnce);
|
||||
if (options.body !== undefined) {
|
||||
request.write(options.body);
|
||||
}
|
||||
request.end();
|
||||
});
|
||||
}
|
||||
|
||||
export async function signalRpcRequest<T = unknown>(
|
||||
method: string,
|
||||
params: Record<string, unknown> | undefined,
|
||||
opts: SignalRpcOptions,
|
||||
): Promise<T> {
|
||||
const baseUrl = normalizeBaseUrl(opts.baseUrl);
|
||||
const id = generateSecureUuid();
|
||||
const body = JSON.stringify({
|
||||
jsonrpc: "2.0",
|
||||
@@ -81,24 +186,22 @@ export async function signalRpcRequest<T = unknown>(
|
||||
params,
|
||||
id,
|
||||
});
|
||||
const res = await fetchWithTimeout(
|
||||
`${baseUrl}/api/v1/rpc`,
|
||||
{
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body,
|
||||
const res = await requestSignalHttpText(resolveSignalEndpointUrl(opts.baseUrl, "/api/v1/rpc"), {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"Content-Length": String(Buffer.byteLength(body)),
|
||||
},
|
||||
opts.timeoutMs ?? DEFAULT_TIMEOUT_MS,
|
||||
getRequiredFetch(),
|
||||
);
|
||||
body,
|
||||
timeoutMs: opts.timeoutMs ?? DEFAULT_TIMEOUT_MS,
|
||||
});
|
||||
if (res.status === 201) {
|
||||
return undefined as T;
|
||||
}
|
||||
const text = await res.text();
|
||||
if (!text) {
|
||||
if (!res.text) {
|
||||
throw new Error(`Signal RPC empty response (status ${res.status})`);
|
||||
}
|
||||
const parsed = parseSignalRpcResponse<T>(text, res.status);
|
||||
const parsed = parseSignalRpcResponse<T>(res.text, res.status);
|
||||
if (parsed.error) {
|
||||
const code = parsed.error.code ?? "unknown";
|
||||
const msg = parsed.error.message ?? "Signal RPC error";
|
||||
@@ -111,15 +214,12 @@ export async function signalCheck(
|
||||
baseUrl: string,
|
||||
timeoutMs = DEFAULT_TIMEOUT_MS,
|
||||
): Promise<{ ok: boolean; status?: number | null; error?: string | null }> {
|
||||
const normalized = normalizeBaseUrl(baseUrl);
|
||||
try {
|
||||
const res = await fetchWithTimeout(
|
||||
`${normalized}/api/v1/check`,
|
||||
{ method: "GET" },
|
||||
const res = await requestSignalHttpText(resolveSignalEndpointUrl(baseUrl, "/api/v1/check"), {
|
||||
method: "GET",
|
||||
timeoutMs,
|
||||
getRequiredFetch(),
|
||||
);
|
||||
if (!res.ok) {
|
||||
});
|
||||
if (res.status < 200 || res.status >= 300) {
|
||||
return { ok: false, status: res.status, error: `HTTP ${res.status}` };
|
||||
}
|
||||
return { ok: true, status: res.status, error: null };
|
||||
@@ -132,35 +232,99 @@ export async function signalCheck(
|
||||
}
|
||||
}
|
||||
|
||||
function openSignalEventStream(
|
||||
url: URL,
|
||||
abortSignal?: AbortSignal,
|
||||
timeoutMs = DEFAULT_TIMEOUT_MS,
|
||||
): Promise<{ response: IncomingMessage; cleanup: () => void }> {
|
||||
assertSignalHttpProtocol(url, "SSE");
|
||||
if (abortSignal?.aborted) {
|
||||
throw createSignalSseAbortError();
|
||||
}
|
||||
|
||||
const client = url.protocol === "https:" ? https : http;
|
||||
return new Promise((resolve, reject) => {
|
||||
let settled = false;
|
||||
let response: IncomingMessage | undefined;
|
||||
let onAbort: () => void = () => {};
|
||||
let request: ClientRequest;
|
||||
const headerDeadline = setTimeout(() => {
|
||||
const error = new Error(`Signal SSE connection timed out after ${timeoutMs}ms`);
|
||||
response?.destroy(error);
|
||||
request.destroy(error);
|
||||
rejectOnce(error);
|
||||
}, timeoutMs);
|
||||
headerDeadline.unref?.();
|
||||
const cleanup = () => {
|
||||
clearTimeout(headerDeadline);
|
||||
abortSignal?.removeEventListener("abort", onAbort);
|
||||
};
|
||||
const rejectOnce = (error: unknown) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
cleanup();
|
||||
reject(error);
|
||||
};
|
||||
request = client.request(
|
||||
url,
|
||||
{
|
||||
method: "GET",
|
||||
headers: { Accept: "text/event-stream" },
|
||||
},
|
||||
(res) => {
|
||||
const status = res.statusCode ?? 0;
|
||||
if (status < 200 || status >= 300) {
|
||||
res.resume();
|
||||
rejectOnce(new Error(`Signal SSE failed (${status} ${res.statusMessage || "error"})`));
|
||||
return;
|
||||
}
|
||||
if (settled) {
|
||||
res.destroy();
|
||||
return;
|
||||
}
|
||||
clearTimeout(headerDeadline);
|
||||
settled = true;
|
||||
response = res;
|
||||
resolve({ response: res, cleanup });
|
||||
},
|
||||
);
|
||||
onAbort = () => {
|
||||
const error = createSignalSseAbortError();
|
||||
response?.destroy(error);
|
||||
request.destroy(error);
|
||||
rejectOnce(error);
|
||||
};
|
||||
|
||||
abortSignal?.addEventListener("abort", onAbort, { once: true });
|
||||
request.on("error", rejectOnce);
|
||||
request.end();
|
||||
});
|
||||
}
|
||||
|
||||
export async function streamSignalEvents(params: {
|
||||
baseUrl: string;
|
||||
account?: string;
|
||||
abortSignal?: AbortSignal;
|
||||
timeoutMs?: number;
|
||||
onEvent: (event: SignalSseEvent) => void;
|
||||
}): Promise<void> {
|
||||
const baseUrl = normalizeBaseUrl(params.baseUrl);
|
||||
const url = new URL(`${baseUrl}/api/v1/events`);
|
||||
const url = resolveSignalEndpointUrl(params.baseUrl, "/api/v1/events");
|
||||
if (params.account) {
|
||||
url.searchParams.set("account", params.account);
|
||||
}
|
||||
|
||||
const fetchImpl = resolveFetch();
|
||||
if (!fetchImpl) {
|
||||
throw new Error("fetch is not available");
|
||||
}
|
||||
const res = await fetchImpl(url, {
|
||||
method: "GET",
|
||||
headers: { Accept: "text/event-stream" },
|
||||
signal: params.abortSignal,
|
||||
});
|
||||
if (!res.ok || !res.body) {
|
||||
throw new Error(`Signal SSE failed (${res.status} ${res.statusText || "error"})`);
|
||||
}
|
||||
|
||||
const reader = res.body.getReader();
|
||||
const { response, cleanup } = await openSignalEventStream(
|
||||
url,
|
||||
params.abortSignal,
|
||||
params.timeoutMs ?? DEFAULT_TIMEOUT_MS,
|
||||
);
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
let bufferedBytes = 0;
|
||||
let currentEvent: SignalSseEvent = {};
|
||||
let currentEventDataBytes = 0;
|
||||
|
||||
const flushEvent = () => {
|
||||
if (!currentEvent.data && !currentEvent.event && !currentEvent.id) {
|
||||
@@ -172,14 +336,36 @@ export async function streamSignalEvents(params: {
|
||||
id: currentEvent.id,
|
||||
});
|
||||
currentEvent = {};
|
||||
currentEventDataBytes = 0;
|
||||
};
|
||||
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) {
|
||||
break;
|
||||
const processLine = (line: string) => {
|
||||
if (line === "") {
|
||||
flushEvent();
|
||||
return;
|
||||
}
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
if (line.startsWith(":")) {
|
||||
return;
|
||||
}
|
||||
const [rawField, ...rest] = line.split(":");
|
||||
const field = rawField.trim();
|
||||
const rawValue = rest.join(":");
|
||||
const value = rawValue.startsWith(" ") ? rawValue.slice(1) : rawValue;
|
||||
if (field === "event") {
|
||||
currentEvent.event = value;
|
||||
} else if (field === "data") {
|
||||
const segment = currentEvent.data ? `\n${value}` : value;
|
||||
currentEventDataBytes += Buffer.byteLength(segment, "utf8");
|
||||
if (currentEventDataBytes > MAX_SIGNAL_SSE_EVENT_DATA_BYTES) {
|
||||
throw new Error("Signal SSE event data exceeded size limit");
|
||||
}
|
||||
currentEvent.data = currentEvent.data ? `${currentEvent.data}${segment}` : segment;
|
||||
} else if (field === "id") {
|
||||
currentEvent.id = value;
|
||||
}
|
||||
};
|
||||
|
||||
const drainCompleteLines = () => {
|
||||
let lineEnd = buffer.indexOf("\n");
|
||||
while (lineEnd !== -1) {
|
||||
let line = buffer.slice(0, lineEnd);
|
||||
@@ -187,29 +373,33 @@ export async function streamSignalEvents(params: {
|
||||
if (line.endsWith("\r")) {
|
||||
line = line.slice(0, -1);
|
||||
}
|
||||
|
||||
if (line === "") {
|
||||
flushEvent();
|
||||
lineEnd = buffer.indexOf("\n");
|
||||
continue;
|
||||
}
|
||||
if (line.startsWith(":")) {
|
||||
lineEnd = buffer.indexOf("\n");
|
||||
continue;
|
||||
}
|
||||
const [rawField, ...rest] = line.split(":");
|
||||
const field = rawField.trim();
|
||||
const rawValue = rest.join(":");
|
||||
const value = rawValue.startsWith(" ") ? rawValue.slice(1) : rawValue;
|
||||
if (field === "event") {
|
||||
currentEvent.event = value;
|
||||
} else if (field === "data") {
|
||||
currentEvent.data = currentEvent.data ? `${currentEvent.data}\n${value}` : value;
|
||||
} else if (field === "id") {
|
||||
currentEvent.id = value;
|
||||
}
|
||||
processLine(line);
|
||||
lineEnd = buffer.indexOf("\n");
|
||||
}
|
||||
bufferedBytes = Buffer.byteLength(buffer, "utf8");
|
||||
};
|
||||
|
||||
try {
|
||||
for await (const chunk of response as AsyncIterable<Buffer | string>) {
|
||||
const value = typeof chunk === "string" ? Buffer.from(chunk) : chunk;
|
||||
bufferedBytes += value.byteLength;
|
||||
if (bufferedBytes > MAX_SIGNAL_SSE_BUFFER_BYTES) {
|
||||
throw new Error("Signal SSE buffer exceeded size limit");
|
||||
}
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
drainCompleteLines();
|
||||
}
|
||||
const tail = decoder.decode();
|
||||
if (tail) {
|
||||
buffer += tail;
|
||||
bufferedBytes = Buffer.byteLength(buffer, "utf8");
|
||||
}
|
||||
if (bufferedBytes > MAX_SIGNAL_SSE_BUFFER_BYTES) {
|
||||
throw new Error("Signal SSE buffer exceeded size limit");
|
||||
}
|
||||
drainCompleteLines();
|
||||
} finally {
|
||||
cleanup();
|
||||
}
|
||||
|
||||
flushEvent();
|
||||
|
||||
Reference in New Issue
Block a user