diff --git a/CHANGELOG.md b/CHANGELOG.md index 8118fb11c69..f65c9999c9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -111,6 +111,11 @@ Docs: https://docs.openclaw.ai - Installer: warn when multiple npm global roots contain OpenClaw installs, showing active Node/npm/openclaw plus each install path and version so stale version-manager installs are visible. Fixes #40839. Thanks @zhixianio. - Cron/tasks: recover completed cron task ledger records from durable run logs and job state before marking them `lost`, reducing false `backing session missing` audit errors for isolated cron runs and keeping offline CLI audit from treating its empty local cron active-job set as authoritative. Fixes #71963. - Docker: copy patched dependency files into runtime images so downstream `pnpm install` layers keep working. Fixes #69224. Thanks @gucasbrg. +- Package: include patched dependency files in the published npm package so downstream installs can resolve `patchedDependencies`. (#69224) Thanks @gucasbrg and @vincentkoc. +- Plugins/channels: treat malformed bundled channel plugin loaders that return `undefined` as unavailable instead of crashing config and help paths. Fixes #69044. Thanks @frankhli843 and @vincentkoc. +- Scripts/watch: show corrupted dependency package-config recovery guidance when `gateway:watch` fails during watcher startup, without double-logging unrelated import failures. (#58780) Thanks @roytong9 and @vincentkoc. +- Signal: read signal-cli RPC, health checks, and SSE events through Node's HTTP client so Node 24/25 fetch regressions do not break Signal sends or inbound events. Fixes #51716 and #53040. Thanks @Barukimang, @minupla, and @vincentkoc. +- Skills/Docker: run npm-backed skill dependency installs with an OpenClaw-managed user prefix so non-root Docker images do not write to `/usr/local`. Fixes #59601. Thanks @chanjarster and @vincentkoc. - Agents/runtime: submit heartbeat, cron, and exec wakeups as transient runtime context instead of visible user prompts, keeping synthetic system work out of chat transcripts. Fixes #66496 and #66814. Thanks @jeades and @mandomaker. - Telegram: include native quote excerpts automatically for threaded replies and reply tags when the original Telegram text is available, without adding another config knob. Fixes #6975. Thanks @rex05ai. - Node/Linux: make `openclaw node install` enable and restart the `openclaw-node` systemd unit instead of the gateway unit on node-only VMs. Fixes #68287. Thanks @dlebee-agent. diff --git a/extensions/signal/src/client.test.ts b/extensions/signal/src/client.test.ts index b24ba47bb60..be81e71c9c3 100644 --- a/extensions/signal/src/client.test.ts +++ b/extensions/signal/src/client.test.ts @@ -1,11 +1,7 @@ -import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; - -const fetchWithTimeoutMock = vi.fn(); -const resolveFetchMock = vi.fn(); - -vi.mock("openclaw/plugin-sdk/fetch-runtime", () => ({ - resolveFetch: (...args: unknown[]) => resolveFetchMock(...args), -})); +import { Buffer } from "node:buffer"; +import { once } from "node:events"; +import http, { type IncomingMessage, type ServerResponse } from "node:http"; +import { afterEach, beforeAll, describe, expect, it, vi } from "vitest"; vi.mock("openclaw/plugin-sdk/core", async () => { const actual = await vi.importActual( @@ -17,47 +13,91 @@ vi.mock("openclaw/plugin-sdk/core", async () => { }; }); -vi.mock("openclaw/plugin-sdk/text-runtime", () => ({ - fetchWithTimeout: (...args: unknown[]) => fetchWithTimeoutMock(...args), -})); - +let signalCheck: typeof import("./client.js").signalCheck; let signalRpcRequest: typeof import("./client.js").signalRpcRequest; +let streamSignalEvents: typeof import("./client.js").streamSignalEvents; -function rpcResponse(body: unknown, status = 200): Response { - if (typeof body === "string") { - return new Response(body, { status }); +const servers: http.Server[] = []; + +async function readRequestBody(req: IncomingMessage): Promise { + const chunks: Buffer[] = []; + for await (const chunk of req as AsyncIterable) { + chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk); } - return new Response(JSON.stringify(body), { status }); + return Buffer.concat(chunks).toString("utf8"); } +async function withSignalServer( + handler: (req: IncomingMessage, res: ServerResponse) => void | Promise, +): Promise { + const server = http.createServer((req, res) => { + void Promise.resolve(handler(req, res)).catch((error: unknown) => { + res.writeHead(500, { "Content-Type": "text/plain" }); + res.end(error instanceof Error ? error.message : String(error)); + }); + }); + servers.push(server); + server.listen(0, "127.0.0.1"); + await once(server, "listening"); + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("missing test server address"); + } + return `http://127.0.0.1:${address.port}`; +} + +beforeAll(async () => { + ({ signalCheck, signalRpcRequest, streamSignalEvents } = await import("./client.js")); +}); + +afterEach(async () => { + await Promise.all( + servers.splice(0).map( + (server) => + new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }), + ), + ); +}); + describe("signalRpcRequest", () => { - beforeAll(async () => { - ({ signalRpcRequest } = await import("./client.js")); - }); - - beforeEach(() => { - vi.clearAllMocks(); - resolveFetchMock.mockReturnValue(vi.fn()); - }); - it("returns parsed RPC result", async () => { - fetchWithTimeoutMock.mockResolvedValueOnce( - rpcResponse({ jsonrpc: "2.0", result: { version: "0.13.22" }, id: "test-id" }), - ); + const baseUrl = await withSignalServer(async (req, res) => { + expect(req.method).toBe("POST"); + expect(req.url).toBe("/api/v1/rpc"); + expect(req.headers["content-type"]).toBe("application/json"); + expect(JSON.parse(await readRequestBody(req))).toEqual({ + jsonrpc: "2.0", + method: "version", + id: "test-id", + }); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ jsonrpc: "2.0", result: { version: "0.13.22" }, id: "test-id" })); + }); const result = await signalRpcRequest<{ version: string }>("version", undefined, { - baseUrl: "http://127.0.0.1:8080", + baseUrl, }); expect(result).toEqual({ version: "0.13.22" }); }); it("throws a wrapped error when RPC response JSON is malformed", async () => { - fetchWithTimeoutMock.mockResolvedValueOnce(rpcResponse("not-json", 502)); + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(502, { "Content-Type": "text/plain" }); + res.end("not-json"); + }); await expect( signalRpcRequest("version", undefined, { - baseUrl: "http://127.0.0.1:8080", + baseUrl, }), ).rejects.toMatchObject({ message: "Signal RPC returned malformed JSON (status 502)", @@ -66,12 +106,159 @@ describe("signalRpcRequest", () => { }); it("throws when RPC response envelope has neither result nor error", async () => { - fetchWithTimeoutMock.mockResolvedValueOnce(rpcResponse({ jsonrpc: "2.0", id: "test-id" })); + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ jsonrpc: "2.0", id: "test-id" })); + }); await expect( signalRpcRequest("version", undefined, { - baseUrl: "http://127.0.0.1:8080", + baseUrl, }), ).rejects.toThrow("Signal RPC returned invalid response envelope (status 200)"); }); + + it("rejects credentialed base URLs", async () => { + await expect( + signalRpcRequest("version", undefined, { + baseUrl: "http://user:pass@127.0.0.1:8080", + }), + ).rejects.toThrow("Signal base URL must not include credentials"); + }); + + it("rejects oversized RPC responses", async () => { + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end("x".repeat(1_048_577)); + }); + + await expect( + signalRpcRequest("version", undefined, { + baseUrl, + }), + ).rejects.toThrow("Signal HTTP response exceeded size limit"); + }); + + it("uses an absolute deadline for slow-drip RPC responses", async () => { + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + const interval = setInterval(() => { + res.write(" "); + }, 5); + res.on("close", () => clearInterval(interval)); + }); + + await expect( + signalRpcRequest("version", undefined, { + baseUrl, + timeoutMs: 25, + }), + ).rejects.toThrow("Signal HTTP exceeded deadline after 25ms"); + }); +}); + +describe("signalCheck", () => { + it("returns ok for a healthy signal-cli check", async () => { + const baseUrl = await withSignalServer((req, res) => { + expect(req.method).toBe("GET"); + expect(req.url).toBe("/api/v1/check"); + res.writeHead(204); + res.end(); + }); + + await expect(signalCheck(baseUrl)).resolves.toEqual({ ok: true, status: 204, error: null }); + }); + + it("returns an HTTP status failure for unhealthy checks", async () => { + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(503); + res.end("down"); + }); + + await expect(signalCheck(baseUrl)).resolves.toEqual({ + ok: false, + status: 503, + error: "HTTP 503", + }); + }); +}); + +describe("streamSignalEvents", () => { + it("streams events through node http instead of fetch", async () => { + const events: Array = []; + const baseUrl = await withSignalServer((req, res) => { + expect(req.url).toBe("/api/v1/events?account=%2B15555550123"); + expect(req.headers.accept).toBe("text/event-stream"); + res.writeHead(200, { "Content-Type": "text/event-stream" }); + res.end('id: 42\nevent: message\ndata: {"group":true}\n\n'); + }); + + await streamSignalEvents({ + baseUrl, + account: "+15555550123", + onEvent: (event) => events.push(event), + }); + + expect(events).toEqual([{ id: "42", event: "message", data: '{"group":true}' }]); + }); + + it("reports HTTP status failures from the event stream", async () => { + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(503, "Unavailable"); + res.end("down"); + }); + + await expect( + streamSignalEvents({ + baseUrl, + onEvent: () => {}, + }), + ).rejects.toThrow("Signal SSE failed (503 Unavailable)"); + }); + + it("rejects event streams that do not send headers before the deadline", async () => { + const baseUrl = await withSignalServer(() => { + // Leave the request open without response headers. + }); + + await expect( + streamSignalEvents({ + baseUrl, + timeoutMs: 25, + onEvent: () => {}, + }), + ).rejects.toThrow("Signal SSE connection timed out after 25ms"); + }); + + it("rejects oversized SSE line buffers by byte size", async () => { + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "text/event-stream" }); + res.end(`data: ${"🙂".repeat(262_145)}`); + }); + + await expect( + streamSignalEvents({ + baseUrl, + onEvent: () => {}, + }), + ).rejects.toThrow("Signal SSE buffer exceeded size limit"); + }); + + it("rejects oversized SSE events split across smaller data lines", async () => { + const baseUrl = await withSignalServer((_req, res) => { + res.writeHead(200, { "Content-Type": "text/event-stream" }); + const line = `data: ${"x".repeat(4096)}\n`; + for (let index = 0; index < 260; index += 1) { + res.write(line); + } + res.end(); + }); + + await expect( + streamSignalEvents({ + baseUrl, + onEvent: () => {}, + }), + ).rejects.toThrow("Signal SSE event data exceeded size limit"); + }); }); diff --git a/extensions/signal/src/client.ts b/extensions/signal/src/client.ts index 2e9d33ef7d8..093c9eead87 100644 --- a/extensions/signal/src/client.ts +++ b/extensions/signal/src/client.ts @@ -1,7 +1,8 @@ +import { Buffer } from "node:buffer"; +import http, { type ClientRequest, type IncomingMessage } from "node:http"; +import https from "node:https"; import { generateSecureUuid } from "openclaw/plugin-sdk/core"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; -import { resolveFetch } from "openclaw/plugin-sdk/fetch-runtime"; -import { fetchWithTimeout } from "openclaw/plugin-sdk/text-runtime"; export type SignalRpcOptions = { baseUrl: string; @@ -28,6 +29,21 @@ export type SignalSseEvent = { }; const DEFAULT_TIMEOUT_MS = 10_000; +const MAX_SIGNAL_HTTP_RESPONSE_BYTES = 1_048_576; +const MAX_SIGNAL_SSE_BUFFER_BYTES = 1_048_576; +const MAX_SIGNAL_SSE_EVENT_DATA_BYTES = 1_048_576; + +type SignalHttpResponse = { + status: number; + statusText: string; + text: string; +}; + +function createSignalSseAbortError(): Error { + const error = new Error("Signal SSE aborted"); + error.name = "AbortError"; + return error; +} function normalizeBaseUrl(url: string): string { const trimmed = url.trim(); @@ -40,12 +56,16 @@ function normalizeBaseUrl(url: string): string { return `http://${trimmed}`.replace(/\/+$/, ""); } -function getRequiredFetch(): typeof fetch { - const fetchImpl = resolveFetch(); - if (!fetchImpl) { - throw new Error("fetch is not available"); +function parseSignalBaseUrl(url: string): URL { + const parsed = new URL(normalizeBaseUrl(url)); + if (parsed.username || parsed.password) { + throw new Error("Signal base URL must not include credentials"); } - return fetchImpl; + return parsed; +} + +function resolveSignalEndpointUrl(baseUrl: string, pathname: string): URL { + return new URL(pathname, parseSignalBaseUrl(baseUrl)); } function parseSignalRpcResponse(text: string, status: number): SignalRpcResponse { @@ -68,12 +88,97 @@ function parseSignalRpcResponse(text: string, status: number): SignalRpcRespo return rpc; } +function assertSignalHttpProtocol(url: URL, label: string): void { + if (url.protocol !== "http:" && url.protocol !== "https:") { + throw new Error(`Signal ${label} unsupported protocol: ${url.protocol}`); + } +} + +function requestSignalHttpText( + url: URL, + options: { + method: "GET" | "POST"; + headers?: Record; + body?: string; + timeoutMs: number; + }, +): Promise { + assertSignalHttpProtocol(url, "HTTP"); + const client = url.protocol === "https:" ? https : http; + return new Promise((resolve, reject) => { + let settled = false; + let request: ClientRequest | undefined; + const deadline = setTimeout(() => { + request?.destroy(new Error(`Signal HTTP exceeded deadline after ${options.timeoutMs}ms`)); + }, options.timeoutMs); + deadline.unref?.(); + const cleanup = () => { + clearTimeout(deadline); + request?.setTimeout(0); + }; + const rejectOnce = (error: unknown) => { + if (settled) { + return; + } + settled = true; + cleanup(); + reject(error); + }; + const resolveOnce = (response: SignalHttpResponse) => { + if (settled) { + return; + } + settled = true; + cleanup(); + resolve(response); + }; + request = client.request( + url, + { + method: options.method, + headers: options.headers, + }, + (res) => { + const chunks: Buffer[] = []; + let totalBytes = 0; + res.on("data", (chunk: Buffer | string) => { + const next = typeof chunk === "string" ? Buffer.from(chunk) : chunk; + totalBytes += next.byteLength; + if (totalBytes > MAX_SIGNAL_HTTP_RESPONSE_BYTES) { + const error = new Error("Signal HTTP response exceeded size limit"); + request?.destroy(error); + res.destroy(error); + rejectOnce(error); + return; + } + chunks.push(next); + }); + res.on("error", rejectOnce); + res.on("end", () => { + resolveOnce({ + status: res.statusCode ?? 0, + statusText: res.statusMessage || "error", + text: Buffer.concat(chunks).toString("utf8"), + }); + }); + }, + ); + request.setTimeout(options.timeoutMs, () => { + request?.destroy(new Error(`Signal HTTP timed out after ${options.timeoutMs}ms`)); + }); + request.on("error", rejectOnce); + if (options.body !== undefined) { + request.write(options.body); + } + request.end(); + }); +} + export async function signalRpcRequest( method: string, params: Record | undefined, opts: SignalRpcOptions, ): Promise { - const baseUrl = normalizeBaseUrl(opts.baseUrl); const id = generateSecureUuid(); const body = JSON.stringify({ jsonrpc: "2.0", @@ -81,24 +186,22 @@ export async function signalRpcRequest( params, id, }); - const res = await fetchWithTimeout( - `${baseUrl}/api/v1/rpc`, - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body, + const res = await requestSignalHttpText(resolveSignalEndpointUrl(opts.baseUrl, "/api/v1/rpc"), { + method: "POST", + headers: { + "Content-Type": "application/json", + "Content-Length": String(Buffer.byteLength(body)), }, - opts.timeoutMs ?? DEFAULT_TIMEOUT_MS, - getRequiredFetch(), - ); + body, + timeoutMs: opts.timeoutMs ?? DEFAULT_TIMEOUT_MS, + }); if (res.status === 201) { return undefined as T; } - const text = await res.text(); - if (!text) { + if (!res.text) { throw new Error(`Signal RPC empty response (status ${res.status})`); } - const parsed = parseSignalRpcResponse(text, res.status); + const parsed = parseSignalRpcResponse(res.text, res.status); if (parsed.error) { const code = parsed.error.code ?? "unknown"; const msg = parsed.error.message ?? "Signal RPC error"; @@ -111,15 +214,12 @@ export async function signalCheck( baseUrl: string, timeoutMs = DEFAULT_TIMEOUT_MS, ): Promise<{ ok: boolean; status?: number | null; error?: string | null }> { - const normalized = normalizeBaseUrl(baseUrl); try { - const res = await fetchWithTimeout( - `${normalized}/api/v1/check`, - { method: "GET" }, + const res = await requestSignalHttpText(resolveSignalEndpointUrl(baseUrl, "/api/v1/check"), { + method: "GET", timeoutMs, - getRequiredFetch(), - ); - if (!res.ok) { + }); + if (res.status < 200 || res.status >= 300) { return { ok: false, status: res.status, error: `HTTP ${res.status}` }; } return { ok: true, status: res.status, error: null }; @@ -132,35 +232,99 @@ export async function signalCheck( } } +function openSignalEventStream( + url: URL, + abortSignal?: AbortSignal, + timeoutMs = DEFAULT_TIMEOUT_MS, +): Promise<{ response: IncomingMessage; cleanup: () => void }> { + assertSignalHttpProtocol(url, "SSE"); + if (abortSignal?.aborted) { + throw createSignalSseAbortError(); + } + + const client = url.protocol === "https:" ? https : http; + return new Promise((resolve, reject) => { + let settled = false; + let response: IncomingMessage | undefined; + let onAbort: () => void = () => {}; + let request: ClientRequest; + const headerDeadline = setTimeout(() => { + const error = new Error(`Signal SSE connection timed out after ${timeoutMs}ms`); + response?.destroy(error); + request.destroy(error); + rejectOnce(error); + }, timeoutMs); + headerDeadline.unref?.(); + const cleanup = () => { + clearTimeout(headerDeadline); + abortSignal?.removeEventListener("abort", onAbort); + }; + const rejectOnce = (error: unknown) => { + if (settled) { + return; + } + settled = true; + cleanup(); + reject(error); + }; + request = client.request( + url, + { + method: "GET", + headers: { Accept: "text/event-stream" }, + }, + (res) => { + const status = res.statusCode ?? 0; + if (status < 200 || status >= 300) { + res.resume(); + rejectOnce(new Error(`Signal SSE failed (${status} ${res.statusMessage || "error"})`)); + return; + } + if (settled) { + res.destroy(); + return; + } + clearTimeout(headerDeadline); + settled = true; + response = res; + resolve({ response: res, cleanup }); + }, + ); + onAbort = () => { + const error = createSignalSseAbortError(); + response?.destroy(error); + request.destroy(error); + rejectOnce(error); + }; + + abortSignal?.addEventListener("abort", onAbort, { once: true }); + request.on("error", rejectOnce); + request.end(); + }); +} + export async function streamSignalEvents(params: { baseUrl: string; account?: string; abortSignal?: AbortSignal; + timeoutMs?: number; onEvent: (event: SignalSseEvent) => void; }): Promise { - const baseUrl = normalizeBaseUrl(params.baseUrl); - const url = new URL(`${baseUrl}/api/v1/events`); + const url = resolveSignalEndpointUrl(params.baseUrl, "/api/v1/events"); if (params.account) { url.searchParams.set("account", params.account); } - const fetchImpl = resolveFetch(); - if (!fetchImpl) { - throw new Error("fetch is not available"); - } - const res = await fetchImpl(url, { - method: "GET", - headers: { Accept: "text/event-stream" }, - signal: params.abortSignal, - }); - if (!res.ok || !res.body) { - throw new Error(`Signal SSE failed (${res.status} ${res.statusText || "error"})`); - } - - const reader = res.body.getReader(); + const { response, cleanup } = await openSignalEventStream( + url, + params.abortSignal, + params.timeoutMs ?? DEFAULT_TIMEOUT_MS, + ); const decoder = new TextDecoder(); let buffer = ""; + let bufferedBytes = 0; let currentEvent: SignalSseEvent = {}; + let currentEventDataBytes = 0; const flushEvent = () => { if (!currentEvent.data && !currentEvent.event && !currentEvent.id) { @@ -172,14 +336,36 @@ export async function streamSignalEvents(params: { id: currentEvent.id, }); currentEvent = {}; + currentEventDataBytes = 0; }; - while (true) { - const { value, done } = await reader.read(); - if (done) { - break; + const processLine = (line: string) => { + if (line === "") { + flushEvent(); + return; } - buffer += decoder.decode(value, { stream: true }); + if (line.startsWith(":")) { + return; + } + const [rawField, ...rest] = line.split(":"); + const field = rawField.trim(); + const rawValue = rest.join(":"); + const value = rawValue.startsWith(" ") ? rawValue.slice(1) : rawValue; + if (field === "event") { + currentEvent.event = value; + } else if (field === "data") { + const segment = currentEvent.data ? `\n${value}` : value; + currentEventDataBytes += Buffer.byteLength(segment, "utf8"); + if (currentEventDataBytes > MAX_SIGNAL_SSE_EVENT_DATA_BYTES) { + throw new Error("Signal SSE event data exceeded size limit"); + } + currentEvent.data = currentEvent.data ? `${currentEvent.data}${segment}` : segment; + } else if (field === "id") { + currentEvent.id = value; + } + }; + + const drainCompleteLines = () => { let lineEnd = buffer.indexOf("\n"); while (lineEnd !== -1) { let line = buffer.slice(0, lineEnd); @@ -187,29 +373,33 @@ export async function streamSignalEvents(params: { if (line.endsWith("\r")) { line = line.slice(0, -1); } - - if (line === "") { - flushEvent(); - lineEnd = buffer.indexOf("\n"); - continue; - } - if (line.startsWith(":")) { - lineEnd = buffer.indexOf("\n"); - continue; - } - const [rawField, ...rest] = line.split(":"); - const field = rawField.trim(); - const rawValue = rest.join(":"); - const value = rawValue.startsWith(" ") ? rawValue.slice(1) : rawValue; - if (field === "event") { - currentEvent.event = value; - } else if (field === "data") { - currentEvent.data = currentEvent.data ? `${currentEvent.data}\n${value}` : value; - } else if (field === "id") { - currentEvent.id = value; - } + processLine(line); lineEnd = buffer.indexOf("\n"); } + bufferedBytes = Buffer.byteLength(buffer, "utf8"); + }; + + try { + for await (const chunk of response as AsyncIterable) { + const value = typeof chunk === "string" ? Buffer.from(chunk) : chunk; + bufferedBytes += value.byteLength; + if (bufferedBytes > MAX_SIGNAL_SSE_BUFFER_BYTES) { + throw new Error("Signal SSE buffer exceeded size limit"); + } + buffer += decoder.decode(value, { stream: true }); + drainCompleteLines(); + } + const tail = decoder.decode(); + if (tail) { + buffer += tail; + bufferedBytes = Buffer.byteLength(buffer, "utf8"); + } + if (bufferedBytes > MAX_SIGNAL_SSE_BUFFER_BYTES) { + throw new Error("Signal SSE buffer exceeded size limit"); + } + drainCompleteLines(); + } finally { + cleanup(); } flushEvent(); diff --git a/package.json b/package.json index ceff5ef7824..1a83d946cfc 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,7 @@ "!dist/qa-runtime-*.js", "docs/", "!docs/.generated/**", + "patches/", "skills/", "scripts/npm-runner.mjs", "scripts/preinstall-package-manager-warning.mjs", diff --git a/scripts/watch-node.d.mts b/scripts/watch-node.d.mts index 362670826a6..88b13600663 100644 --- a/scripts/watch-node.d.mts +++ b/scripts/watch-node.d.mts @@ -17,6 +17,18 @@ export function runWatchMain(params?: { on: (event: "add" | "change" | "unlink" | "error", cb: (arg?: unknown) => void) => void; close?: () => Promise | void; }; + loadChokidar?: () => Promise<{ + watch: ( + paths: string[], + options: { + ignoreInitial: boolean; + ignored: (watchPath: string) => boolean; + }, + ) => { + on: (event: "add" | "change" | "unlink" | "error", cb: (arg?: unknown) => void) => void; + close?: () => Promise | void; + }; + }>; watchPaths?: string[]; process?: NodeJS.Process; cwd?: string; diff --git a/scripts/watch-node.mjs b/scripts/watch-node.mjs index eecda3bf0b0..edefe2e7930 100644 --- a/scripts/watch-node.mjs +++ b/scripts/watch-node.mjs @@ -5,7 +5,6 @@ import fs from "node:fs"; import path from "node:path"; import process from "node:process"; import { pathToFileURL } from "node:url"; -import chokidar from "chokidar"; import { isRestartRelevantRunNodePath, runNodeWatchedPaths } from "./run-node.mjs"; const WATCH_NODE_RUNNER = "scripts/run-node.mjs"; @@ -120,6 +119,39 @@ const logWatcher = (message, deps) => { deps.process.stderr?.write?.(`[openclaw] ${message}\n`); }; +const isInvalidPackageConfigError = (err) => err?.code === "ERR_INVALID_PACKAGE_CONFIG"; + +const extractInvalidPackageConfigPath = (err) => { + const message = String(err?.message ?? ""); + const match = message.match(/Invalid package config (.+?) while importing /); + return match?.[1] ?? null; +}; + +const printFriendlyWatchStartupError = (err) => { + const packageConfigPath = extractInvalidPackageConfigPath(err); + + console.error(""); + console.error( + "[openclaw] gateway:watch could not start because a dependency package config looks corrupted.", + ); + if (packageConfigPath) { + console.error(`[openclaw] Invalid package config: ${packageConfigPath}`); + } + console.error("[openclaw] This usually means a file in node_modules is empty or truncated."); + console.error("[openclaw] Recommended recovery:"); + console.error("[openclaw] rm -rf node_modules"); + console.error("[openclaw] pnpm store prune"); + console.error("[openclaw] pnpm install"); + console.error(""); + console.error("[openclaw] Original error:"); + console.error(err); +}; + +const loadChokidar = async () => { + const mod = await import("chokidar"); + return mod.default ?? mod; +}; + const waitForWatcherRelease = async (lockPath, pid, deps) => { const deadline = deps.now() + WATCH_LOCK_WAIT_MS; while (deps.now() < deadline) { @@ -212,6 +244,19 @@ const releaseWatchLock = (lockHandle) => { * }} [params] */ export async function runWatchMain(params = {}) { + let createWatcher = params.createWatcher; + if (!createWatcher) { + try { + const chokidarModule = await (params.loadChokidar ?? loadChokidar)(); + createWatcher = (watchPaths, options) => chokidarModule.watch(watchPaths, options); + } catch (err) { + if (isInvalidPackageConfigError(err)) { + printFriendlyWatchStartupError(err); + } + throw err; + } + } + const deps = { spawn: params.spawn ?? spawn, process: params.process ?? process, @@ -222,8 +267,7 @@ export async function runWatchMain(params = {}) { sleep: params.sleep ?? sleep, signalProcess: params.signalProcess ?? ((pid, signal) => process.kill(pid, signal)), lockDisabled: params.lockDisabled === true, - createWatcher: - params.createWatcher ?? ((watchPaths, options) => chokidar.watch(watchPaths, options)), + createWatcher, watchPaths: params.watchPaths ?? runNodeWatchedPaths, }; @@ -363,7 +407,9 @@ if (import.meta.url === pathToFileURL(process.argv[1] ?? "").href) { void runWatchMain() .then((code) => process.exit(code)) .catch((err) => { - console.error(err); + if (!isInvalidPackageConfigError(err)) { + console.error(err); + } process.exit(1); }); } diff --git a/src/agents/skills-install.test.ts b/src/agents/skills-install.test.ts index 9db7145a74c..dda90668539 100644 --- a/src/agents/skills-install.test.ts +++ b/src/agents/skills-install.test.ts @@ -122,11 +122,18 @@ async function withWorkspaceCase( describe("installSkill code safety scanning", () => { beforeEach(() => { resetGlobalHookRunner(); - skillsInstallTesting.setDepsForTest({ - loadWorkspaceSkillEntries: loadTestWorkspaceSkillEntries, - }); runCommandWithTimeoutMock.mockClear(); scanDirectoryWithSummaryMock.mockClear(); + skillsInstallTesting.setDepsForTest({ + loadWorkspaceSkillEntries: loadTestWorkspaceSkillEntries, + resolveNodeInstallStateDir: () => { + const stateDir = process.env.OPENCLAW_STATE_DIR; + if (!stateDir) { + throw new Error("OPENCLAW_STATE_DIR missing in skills install test"); + } + return stateDir; + }, + }); runCommandWithTimeoutMock.mockResolvedValue({ code: 0, stdout: "ok", @@ -187,6 +194,60 @@ describe("installSkill code safety scanning", () => { }); }); + it("runs npm node installs with an OpenClaw-managed user prefix", async () => { + await withWorkspaceCase(async ({ workspaceDir, stateDir }) => { + await writeInstallableSkill(workspaceDir, "node-prefix-skill"); + + const result = await installSkill({ + workspaceDir, + skillName: "node-prefix-skill", + installId: "deps", + }); + + expect(result.ok).toBe(true); + const npmPrefix = path.join(stateDir, "tools", "node", "npm"); + const call = runCommandWithTimeoutMock.mock.calls.at(-1); + expect(call?.[0]).toEqual(["npm", "install", "-g", "--ignore-scripts", "example-package"]); + const options = call?.[1] as { env?: NodeJS.ProcessEnv }; + expect(options.env).toMatchObject({ + NPM_CONFIG_PREFIX: npmPrefix, + npm_config_prefix: npmPrefix, + }); + expect(options.env).not.toHaveProperty("PATH"); + const stat = await fs.stat(npmPrefix); + expect(stat.isDirectory()).toBe(true); + }); + }); + + it("keeps the default npm prefix out of env-overridden state paths", () => { + const envSnapshot = captureEnv(["OPENCLAW_STATE_DIR", "OPENCLAW_CONFIG_PATH"]); + try { + process.env.OPENCLAW_STATE_DIR = "/tmp/untrusted-state"; + process.env.OPENCLAW_CONFIG_PATH = "/tmp/untrusted-config/openclaw.json"; + + expect( + skillsInstallTesting.resolveDefaultNodeInstallStateDir({ + getuid: () => 501, + homedir: () => "/Users/tester", + platform: "darwin", + }), + ).toBe("/Users/tester/.openclaw"); + } finally { + envSnapshot.restore(); + } + }); + + it("uses a fixed system state root for root npm installs", () => { + expect( + skillsInstallTesting.resolveDefaultNodeInstallStateDir({ + cwd: "/workspace/openclaw", + getuid: () => 0, + homedir: () => "/root", + platform: "linux", + }), + ).toBe("/var/lib/openclaw"); + }); + it("blocks install when skill scan fails", async () => { await withWorkspaceCase(async ({ workspaceDir }) => { await writeInstallableSkill(workspaceDir, "scanfail-skill"); diff --git a/src/agents/skills-install.ts b/src/agents/skills-install.ts index a55f865a7c6..59dadb3cc0a 100644 --- a/src/agents/skills-install.ts +++ b/src/agents/skills-install.ts @@ -1,4 +1,5 @@ import fs from "node:fs"; +import os from "node:os"; import path from "node:path"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { resolveBrewExecutable as defaultResolveBrewExecutable } from "../infra/brew.js"; @@ -35,6 +36,7 @@ export type { SkillInstallResult } from "./skills-install.types.js"; type SkillsInstallDeps = { hasBinary: (bin: string) => boolean; loadWorkspaceSkillEntries: typeof defaultLoadWorkspaceSkillEntries; + resolveNodeInstallStateDir: () => string; resolveBrewExecutable: () => string | undefined; resolveSkillsInstallPreferences: typeof defaultResolveSkillsInstallPreferences; }; @@ -42,6 +44,7 @@ type SkillsInstallDeps = { const defaultSkillsInstallDeps: SkillsInstallDeps = { hasBinary: defaultHasBinary, loadWorkspaceSkillEntries: defaultLoadWorkspaceSkillEntries, + resolveNodeInstallStateDir: resolveDefaultNodeInstallStateDir, resolveBrewExecutable: defaultResolveBrewExecutable, resolveSkillsInstallPreferences: defaultResolveSkillsInstallPreferences, }; @@ -107,6 +110,37 @@ function buildNodeInstallCommand(packageName: string, prefs: SkillsInstallPrefer } } +function resolveDefaultNodeInstallStateDir({ + cwd = process.cwd(), + getuid = process.getuid?.bind(process), + homedir = os.homedir, + platform = process.platform, +}: { + cwd?: string; + getuid?: () => number; + homedir?: () => string; + platform?: NodeJS.Platform; +} = {}): string { + if (platform !== "win32" && getuid?.() === 0) { + return path.join(path.parse(cwd).root, "var", "lib", "openclaw"); + } + return path.join(homedir(), ".openclaw"); +} + +async function buildNodeInstallEnv(prefs: SkillsInstallPreferences): Promise { + if (prefs.nodeManager !== "npm") { + return {}; + } + + const stateDir = getSkillsInstallDeps().resolveNodeInstallStateDir(); + const prefix = path.join(stateDir, "tools", "node", "npm"); + await fs.promises.mkdir(prefix, { recursive: true, mode: 0o700 }); + return { + NPM_CONFIG_PREFIX: prefix, + npm_config_prefix: prefix, + }; +} + // Strict allowlist patterns to prevent option injection and malicious package names. const SAFE_BREW_FORMULA = /^[a-z0-9][a-z0-9+._@-]*(\/[a-z0-9][a-z0-9+._@-]*){0,2}$/; const SAFE_NODE_PACKAGE = /^(@[a-z0-9._-]+\/)?[a-z0-9._-]+(@[a-z0-9^~>=<.*|-]+)?$/; @@ -524,6 +558,9 @@ export async function installSkill(params: SkillInstallRequest): Promise): void { skillsInstallDeps = { ...defaultSkillsInstallDeps, diff --git a/src/channels/plugins/bundled.shape-guard.test.ts b/src/channels/plugins/bundled.shape-guard.test.ts index 8823666bd72..fcba5bf999d 100644 --- a/src/channels/plugins/bundled.shape-guard.test.ts +++ b/src/channels/plugins/bundled.shape-guard.test.ts @@ -731,6 +731,53 @@ describe("bundled channel entry shape guards", () => { } }); + it("caches undefined bundled plugin loads as unavailable", async () => { + const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-bundled-null-load-")); + const previousBundledPluginsDir = process.env.OPENCLAW_BUNDLED_PLUGINS_DIR; + const pluginDir = path.join(root, "dist", "extensions", "alpha"); + const testGlobal = globalThis as typeof globalThis & { + __bundledPluginUndefinedLoads?: number; + }; + fs.mkdirSync(pluginDir, { recursive: true }); + fs.writeFileSync( + path.join(pluginDir, "index.js"), + [ + "export default {", + " kind: 'bundled-channel-entry',", + " id: 'alpha',", + " name: 'Alpha',", + " description: 'Alpha',", + " register() {},", + " loadChannelPlugin() {", + " globalThis.__bundledPluginUndefinedLoads = (globalThis.__bundledPluginUndefinedLoads ?? 0) + 1;", + " return undefined;", + " },", + "};", + "", + ].join("\n"), + "utf8", + ); + + mockAlphaDistExtensionRuntime(); + + try { + process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = path.join(root, "dist", "extensions"); + + const bundled = await importFreshModule( + import.meta.url, + "./bundled.js?scope=bundled-undefined-load", + ); + + expect(bundled.getBundledChannelPlugin("alpha")).toBeUndefined(); + expect(bundled.getBundledChannelPlugin("alpha")).toBeUndefined(); + expect(testGlobal.__bundledPluginUndefinedLoads).toBe(1); + } finally { + restoreBundledPluginsDir(previousBundledPluginsDir); + fs.rmSync(root, { recursive: true, force: true }); + delete testGlobal.__bundledPluginUndefinedLoads; + } + }); + it("keeps channel entrypoints on the dedicated entry-contract SDK surface", () => { const offenders = collectBundledChannelEntrypointOffenders( bundledPluginRoots, diff --git a/src/channels/plugins/bundled.ts b/src/channels/plugins/bundled.ts index 933a5d0d9f6..80b6eb51b4e 100644 --- a/src/channels/plugins/bundled.ts +++ b/src/channels/plugins/bundled.ts @@ -545,7 +545,11 @@ function getBundledChannelPluginForRoot( cacheContext.pluginLoadInProgressIds.add(id); try { const metadata = resolveBundledChannelMetadata(id, rootScope); - const plugin = entry.loadChannelPlugin(); + const plugin = entry.loadChannelPlugin() as ChannelPlugin | undefined; + if (!plugin) { + cacheContext.lazyPluginsById.set(id, null); + return undefined; + } const normalizedPlugin = { ...plugin, meta: normalizeChannelMeta({ diff --git a/src/channels/plugins/read-only.ts b/src/channels/plugins/read-only.ts index 98149b8bc81..f841a7b6163 100644 --- a/src/channels/plugins/read-only.ts +++ b/src/channels/plugins/read-only.ts @@ -11,7 +11,6 @@ import { getCachedPluginJitiLoader, type PluginJitiLoaderCache, } from "../../plugins/jiti-loader-cache.js"; -import type { loadOpenClawPlugins as loadOpenClawPluginsType } from "../../plugins/loader.js"; import type { PluginManifestRecord } from "../../plugins/manifest-registry.js"; import { loadPluginManifestRegistryForPluginRegistry } from "../../plugins/plugin-registry.js"; import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "../../routing/session-key.js"; @@ -33,7 +32,23 @@ const LOADER_MODULE_CANDIDATES = [ const jitiLoaders: PluginJitiLoaderCache = new Map(); type PluginLoaderModule = { - loadOpenClawPlugins: typeof loadOpenClawPluginsType; + loadOpenClawPlugins: (params: { + config: OpenClawConfig; + activationSourceConfig?: OpenClawConfig; + env?: NodeJS.ProcessEnv; + workspaceDir?: string; + cache?: boolean; + activate?: boolean; + includeSetupOnlyChannelPlugins?: boolean; + forceSetupOnlyChannelPlugins?: boolean; + requireSetupEntryForSetupOnlyChannelPlugins?: boolean; + onlyPluginIds?: readonly string[]; + }) => { + channelSetups: Iterable<{ + pluginId: string; + plugin: ChannelPlugin; + }>; + }; }; let pluginLoaderModule: PluginLoaderModule | undefined; diff --git a/src/commands/doctor-security.test.ts b/src/commands/doctor-security.test.ts index 92af2432c2c..24fd49db866 100644 --- a/src/commands/doctor-security.test.ts +++ b/src/commands/doctor-security.test.ts @@ -12,10 +12,6 @@ vi.mock("../terminal/note.js", () => ({ note, })); -vi.mock("../channels/plugins/index.js", () => ({ - listChannelPlugins: () => pluginRegistry.list, -})); - vi.mock("../channels/plugins/read-only.js", () => ({ listReadOnlyChannelPluginsForConfig: listReadOnlyChannelPluginsForConfigMock, })); diff --git a/src/dockerfile.test.ts b/src/dockerfile.test.ts index 4f97b724a61..b6f0f0e5956 100644 --- a/src/dockerfile.test.ts +++ b/src/dockerfile.test.ts @@ -67,6 +67,9 @@ describe("Dockerfile", () => { expect(dockerfile).toContain( "COPY --from=runtime-assets --chown=node:node /app/node_modules ./node_modules", ); + expect(dockerfile).toContain( + "COPY --from=runtime-assets --chown=node:node /app/patches ./patches", + ); }); it("keeps package manager patch files in runtime images", async () => { diff --git a/src/infra/watch-node.test.ts b/src/infra/watch-node.test.ts index 83adf8a4c9c..47427154afb 100644 --- a/src/infra/watch-node.test.ts +++ b/src/infra/watch-node.test.ts @@ -345,6 +345,69 @@ describe("watch-node script", () => { expect(watcher.close).toHaveBeenCalledTimes(1); }); + it("prints recovery guidance when chokidar fails with invalid package config", async () => { + const error = Object.assign( + new Error( + 'Invalid package config /tmp/openclaw/.pnpm/chokidar/package.json while importing "chokidar" from /tmp/openclaw/scripts/watch-node.mjs.', + ), + { code: "ERR_INVALID_PACKAGE_CONFIG" }, + ); + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + try { + await expect( + runWatch({ + args: ["gateway", "--force"], + cwd: "/tmp/openclaw", + loadChokidar: vi.fn(async () => { + throw error; + }), + process: createFakeProcess(), + }), + ).rejects.toBe(error); + + expect(errorSpy.mock.calls).toEqual([ + [""], + [ + "[openclaw] gateway:watch could not start because a dependency package config looks corrupted.", + ], + ["[openclaw] Invalid package config: /tmp/openclaw/.pnpm/chokidar/package.json"], + ["[openclaw] This usually means a file in node_modules is empty or truncated."], + ["[openclaw] Recommended recovery:"], + ["[openclaw] rm -rf node_modules"], + ["[openclaw] pnpm store prune"], + ["[openclaw] pnpm install"], + [""], + ["[openclaw] Original error:"], + [error], + ]); + } finally { + errorSpy.mockRestore(); + } + }); + + it("does not log non-package-config chokidar import errors before rethrowing", async () => { + const error = Object.assign(new Error("Cannot find package 'chokidar'"), { + code: "ERR_MODULE_NOT_FOUND", + }); + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + try { + await expect( + runWatch({ + loadChokidar: vi.fn(async () => { + throw error; + }), + process: createFakeProcess(), + }), + ).rejects.toBe(error); + + expect(errorSpy).not.toHaveBeenCalled(); + } finally { + errorSpy.mockRestore(); + } + }); + it("replaces an existing watcher lock holder before starting", async () => { const { child, spawn, watcher, createWatcher, fakeProcess } = createWatchHarness(); await withTempDir({ prefix: "openclaw-watch-node-lock-" }, async (cwd) => {