From 6d3ce088da933f7aef88b5cf57a8351ecd1b1fbb Mon Sep 17 00:00:00 2001 From: Agustin Rivera <31522568+eleqtrizit@users.noreply.github.com> Date: Mon, 20 Apr 2026 14:24:34 -0700 Subject: [PATCH] fix(gateway): require read scope for chat websocket broadcasts (#69373) * fix(gateway): guard chat-class websocket broadcasts * fix(gateway): harden broadcast event scope guards * fix(gateway): keep websocket seq per recipient * fix(gateway): let nodes receive voicewake broadcasts * fix(gateway): preserve seq gaps for dropped broadcasts * fix(gateway): drop USER.md worklog from PR * fix(gateway): add scope guard docstring for pairing exclusion * fix(gateway): allow plugin.* broadcast events for write/admin scopes - Plugin-defined gateway broadcast events (plugin.* namespace) are now delivered to operator.write and operator.admin scoped clients - This preserves the ability for plugins to broadcast custom events through context.broadcast() without requiring explicit enumeration - Explicit plugin.* entries in EVENT_SCOPE_GUARDS take precedence (e.g., plugin.approval.* uses APPROVALS_SCOPE) * docs(changelog): note chat broadcast read-scope gating (#69373) --------- Co-authored-by: Devin Robison --- CHANGELOG.md | 1 + src/gateway/gateway-misc.test.ts | 328 +++++++++++++++++++++++++++++++ src/gateway/server-broadcast.ts | 63 ++++-- 3 files changed, 381 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b3f05a74534..6c36fa96551 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai - Security/dotenv: block all `OPENCLAW_*` keys from untrusted workspace `.env` files so workspace-local env loading fails closed for new runtime-control variables instead of silently inheriting them. (#473) - Gateway/device pairing: restrict non-admin paired-device sessions (device-token auth) to their own pairing list, approve, and reject actions so a paired device cannot enumerate other devices or approve/reject pairing requests authored by another device. Admin and shared-secret operator sessions retain full visibility. (#69375) Thanks @eleqtrizit. - Agents/gateway tool: extend the agent-facing `gateway` tool's config mutation guard so model-driven `config.patch` and `config.apply` cannot rewrite operator-trusted paths (sandbox, plugin trust, gateway auth/TLS, hook routing and tokens, SSRF policy, MCP servers, workspace filesystem hardening) and cannot bypass the guard by editing per-agent sandbox, tools, or embedded-Pi overrides in place under `agents.list[]`. (#69377) Thanks @eleqtrizit. +- Gateway/websocket broadcasts: require `operator.read` (or higher) for chat, agent, and tool-result event frames so pairing-scoped and node-role sessions no longer passively receive session chat content, and scope-gate unknown broadcast events by default. Plugin-defined `plugin.*` broadcasts are scoped to operator.write/admin, and status/transport events (`heartbeat`, `presence`, `tick`, etc.) remain unrestricted. Per-client sequence numbers preserve per-connection monotonicity. (#69373) Thanks @eleqtrizit. ## 2026.4.20 diff --git a/src/gateway/gateway-misc.test.ts b/src/gateway/gateway-misc.test.ts index ad8f480fb93..381556bf8ea 100644 --- a/src/gateway/gateway-misc.test.ts +++ b/src/gateway/gateway-misc.test.ts @@ -167,6 +167,29 @@ type TestSocket = { close: (code: number, reason: string) => void; }; +type EventFrame = { + type: "event"; + event: string; + payload?: unknown; + seq?: number; +}; + +type RecordingSocket = TestSocket & { + sent: EventFrame[]; +}; + +function makeRecordingSocket(): RecordingSocket { + const sent: EventFrame[] = []; + return { + bufferedAmount: 0, + send: vi.fn((payload: string) => { + sent.push(JSON.parse(payload) as EventFrame); + }), + close: vi.fn(), + sent, + }; +} + describe("gateway broadcaster", () => { it("filters approval and pairing events by scope", () => { const approvalsSocket: TestSocket = { @@ -220,6 +243,311 @@ describe("gateway broadcaster", () => { expect(approvalsSocket.send).toHaveBeenCalledTimes(1); expect(pairingSocket.send).toHaveBeenCalledTimes(1); }); + + it("requires operator.read for chat-class broadcast events", () => { + const pairingSocket = makeRecordingSocket(); + const nodeSocket = makeRecordingSocket(); + const readSocket = makeRecordingSocket(); + const writeSocket = makeRecordingSocket(); + const adminSocket = makeRecordingSocket(); + + const clients = new Set([ + { + socket: pairingSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"], + connId: "c-pairing", + usesSharedGatewayAuth: false, + }, + { + socket: nodeSocket as unknown as GatewayWsClient["socket"], + connect: { role: "node", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-node", + usesSharedGatewayAuth: false, + }, + { + socket: readSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-read", + usesSharedGatewayAuth: false, + }, + { + socket: writeSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.write"] } as GatewayWsClient["connect"], + connId: "c-write", + usesSharedGatewayAuth: false, + }, + { + socket: adminSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.admin"] } as GatewayWsClient["connect"], + connId: "c-admin", + usesSharedGatewayAuth: false, + }, + ]); + + const { broadcast } = createGatewayBroadcaster({ clients }); + + broadcast("chat", { sessionKey: "agent:main:main", message: "secret" }); + broadcast("agent", { type: "status", sessionKey: "agent:main:main" }); + broadcast("chat.side_result", { sessionKey: "agent:main:main", text: "tool output" }); + + expect(pairingSocket.send).not.toHaveBeenCalled(); + expect(nodeSocket.send).not.toHaveBeenCalled(); + expect(readSocket.send).toHaveBeenCalledTimes(3); + expect(writeSocket.send).toHaveBeenCalledTimes(3); + expect(adminSocket.send).toHaveBeenCalledTimes(3); + expect(readSocket.sent.map((frame) => frame.event)).toEqual([ + "chat", + "agent", + "chat.side_result", + ]); + expect(writeSocket.sent.map((frame) => frame.event)).toEqual([ + "chat", + "agent", + "chat.side_result", + ]); + expect(adminSocket.sent.map((frame) => frame.event)).toEqual([ + "chat", + "agent", + "chat.side_result", + ]); + }); + + it("allows plugin.* broadcast events for operator.write and operator.admin", () => { + const pairingSocket = makeRecordingSocket(); + const nodeSocket = makeRecordingSocket(); + const readSocket = makeRecordingSocket(); + const writeSocket = makeRecordingSocket(); + const adminSocket = makeRecordingSocket(); + + const clients = new Set([ + { + socket: pairingSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"], + connId: "c-pairing", + usesSharedGatewayAuth: false, + }, + { + socket: nodeSocket as unknown as GatewayWsClient["socket"], + connect: { role: "node", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-node", + usesSharedGatewayAuth: false, + }, + { + socket: readSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-read", + usesSharedGatewayAuth: false, + }, + { + socket: writeSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.write"] } as GatewayWsClient["connect"], + connId: "c-write", + usesSharedGatewayAuth: false, + }, + { + socket: adminSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.admin"] } as GatewayWsClient["connect"], + connId: "c-admin", + usesSharedGatewayAuth: false, + }, + ]); + + const { broadcast } = createGatewayBroadcaster({ clients }); + + broadcast("plugin.myplugin.custom", { data: "test" }); + broadcast("plugin.otherplugin.state", { state: "updated" }); + + expect(pairingSocket.send).not.toHaveBeenCalled(); + expect(nodeSocket.send).not.toHaveBeenCalled(); + expect(readSocket.send).not.toHaveBeenCalled(); + expect(writeSocket.send).toHaveBeenCalledTimes(2); + expect(adminSocket.send).toHaveBeenCalledTimes(2); + expect(writeSocket.sent.map((frame) => frame.event)).toEqual([ + "plugin.myplugin.custom", + "plugin.otherplugin.state", + ]); + expect(adminSocket.sent.map((frame) => frame.event)).toEqual([ + "plugin.myplugin.custom", + "plugin.otherplugin.state", + ]); + }); + + it("defaults unknown events to deny and classifies remaining gateway broadcast events", () => { + const pairingSocket = makeRecordingSocket(); + const nodeSocket = makeRecordingSocket(); + const readSocket = makeRecordingSocket(); + const writeSocket = makeRecordingSocket(); + const adminSocket = makeRecordingSocket(); + + const clients = new Set([ + { + socket: pairingSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"], + connId: "c-pairing", + usesSharedGatewayAuth: false, + }, + { + socket: nodeSocket as unknown as GatewayWsClient["socket"], + connect: { role: "node", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-node", + usesSharedGatewayAuth: false, + }, + { + socket: readSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-read", + usesSharedGatewayAuth: false, + }, + { + socket: writeSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.write"] } as GatewayWsClient["connect"], + connId: "c-write", + usesSharedGatewayAuth: false, + }, + { + socket: adminSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.admin"] } as GatewayWsClient["connect"], + connId: "c-admin", + usesSharedGatewayAuth: false, + }, + ]); + + const { broadcast } = createGatewayBroadcaster({ clients }); + + broadcast("cron", { jobId: "job-1" }); + broadcast("talk.mode", { enabled: true }); + broadcast("voicewake.changed", { triggers: ["hello"] }); + broadcast("heartbeat", { ts: 1 }); + broadcast("presence", { presence: [] }); + broadcast("health", { ok: true }); + broadcast("tick", { ts: 2 }); + broadcast("shutdown", { reason: "restart" }); + broadcast("update.available", { updateAvailable: { version: "2026.4.20" } }); + broadcast("unknown.future.event", { hidden: true }); + + expect(pairingSocket.sent.map((frame) => frame.event)).toEqual([ + "heartbeat", + "presence", + "health", + "tick", + "shutdown", + "update.available", + ]); + expect(nodeSocket.sent.map((frame) => frame.event)).toEqual([ + "voicewake.changed", + "heartbeat", + "presence", + "health", + "tick", + "shutdown", + "update.available", + ]); + expect(readSocket.sent.map((frame) => frame.event)).toEqual([ + "cron", + "voicewake.changed", + "heartbeat", + "presence", + "health", + "tick", + "shutdown", + "update.available", + ]); + expect(writeSocket.sent.map((frame) => frame.event)).toEqual([ + "cron", + "talk.mode", + "voicewake.changed", + "heartbeat", + "presence", + "health", + "tick", + "shutdown", + "update.available", + ]); + expect(adminSocket.sent.map((frame) => frame.event)).toEqual([ + "cron", + "talk.mode", + "voicewake.changed", + "heartbeat", + "presence", + "health", + "tick", + "shutdown", + "update.available", + ]); + }); + + it("keeps event seq contiguous per receiving client when scoped events are filtered", () => { + const pairingSocket = makeRecordingSocket(); + const readSocket = makeRecordingSocket(); + + const clients = new Set([ + { + socket: pairingSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"], + connId: "c-pairing", + usesSharedGatewayAuth: false, + }, + { + socket: readSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-read", + usesSharedGatewayAuth: false, + }, + ]); + + const { broadcast } = createGatewayBroadcaster({ clients }); + + broadcast("chat", { sessionKey: "agent:main:main", message: "secret" }); + broadcast("heartbeat", { ts: 1 }); + broadcast("chat.side_result", { sessionKey: "agent:main:main", text: "tool output" }); + broadcast("tick", { ts: 2 }); + + expect(pairingSocket.sent.map((frame) => [frame.event, frame.seq])).toEqual([ + ["heartbeat", 1], + ["tick", 2], + ]); + expect(readSocket.sent.map((frame) => [frame.event, frame.seq])).toEqual([ + ["chat", 1], + ["heartbeat", 2], + ["chat.side_result", 3], + ["tick", 4], + ]); + }); + + it("preserves seq gaps when dropIfSlow skips an eligible broadcast", () => { + const slowReadSocket = makeRecordingSocket(); + slowReadSocket.bufferedAmount = Number.MAX_SAFE_INTEGER; + const readSocket = makeRecordingSocket(); + + const clients = new Set([ + { + socket: slowReadSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-slow-read", + usesSharedGatewayAuth: false, + }, + { + socket: readSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-read", + usesSharedGatewayAuth: false, + }, + ]); + + const { broadcast } = createGatewayBroadcaster({ clients }); + + broadcast("chat", { sessionKey: "agent:main:main", message: "secret" }, { dropIfSlow: true }); + slowReadSocket.bufferedAmount = 0; + broadcast("heartbeat", { ts: 1 }); + + expect(slowReadSocket.sent.map((frame) => [frame.event, frame.seq])).toEqual([ + ["heartbeat", 2], + ]); + expect(readSocket.sent.map((frame) => [frame.event, frame.seq])).toEqual([ + ["chat", 1], + ["heartbeat", 2], + ]); + }); }); describe("chat run registry", () => { diff --git a/src/gateway/server-broadcast.ts b/src/gateway/server-broadcast.ts index c286575dac9..2b1fa05f842 100644 --- a/src/gateway/server-broadcast.ts +++ b/src/gateway/server-broadcast.ts @@ -15,11 +15,26 @@ import { MAX_BUFFERED_BYTES } from "./server-constants.js"; import type { GatewayWsClient } from "./server/ws-types.js"; import { logWs, shouldLogWs, summarizeAgentEventForWsLog } from "./ws-log.js"; +// Pairing scope is for device-pairing handshakes only; chat transcript events +// require operator-level session access. Pairing-scoped and node-role clients +// must not passively receive chat-class broadcasts. const EVENT_SCOPE_GUARDS: Record = { + agent: [READ_SCOPE], + chat: [READ_SCOPE], + "chat.side_result": [READ_SCOPE], + cron: [READ_SCOPE], + health: [], "exec.approval.requested": [APPROVALS_SCOPE], "exec.approval.resolved": [APPROVALS_SCOPE], + heartbeat: [], "plugin.approval.requested": [APPROVALS_SCOPE], "plugin.approval.resolved": [APPROVALS_SCOPE], + presence: [], + shutdown: [], + tick: [], + "talk.mode": [WRITE_SCOPE], + "update.available": [], + "voicewake.changed": [READ_SCOPE], "device.pair.requested": [PAIRING_SCOPE], "device.pair.resolved": [PAIRING_SCOPE], "node.pair.requested": [PAIRING_SCOPE], @@ -29,6 +44,11 @@ const EVENT_SCOPE_GUARDS: Record = { "session.tool": [READ_SCOPE], }; +// Events that node-role sessions must receive even when the event's operator +// scope would otherwise reject non-operator roles. Nodes act on these updates +// (e.g. reconfiguring wake-word triggers). +const NODE_ALLOWED_EVENTS = new Set(["voicewake.changed"]); + export type { GatewayBroadcastFn, GatewayBroadcastOpts, @@ -38,12 +58,26 @@ export type { function hasEventScope(client: GatewayWsClient, event: string): boolean { const required = EVENT_SCOPE_GUARDS[event]; + // Plugin-defined gateway broadcast events (plugin.* namespace) are allowed + // for operator.write and operator.admin scopes. Explicit plugin.* entries + // in EVENT_SCOPE_GUARDS take precedence (e.g., plugin.approval.*). + if (!required && event.startsWith("plugin.")) { + const role = client.connect.role ?? "operator"; + if (role !== "operator") { + return false; + } + const scopes = Array.isArray(client.connect.scopes) ? client.connect.scopes : []; + return scopes.includes(WRITE_SCOPE) || scopes.includes(ADMIN_SCOPE); + } if (!required) { + return false; + } + if (required.length === 0) { return true; } const role = client.connect.role ?? "operator"; if (role !== "operator") { - return false; + return role === "node" && NODE_ALLOWED_EVENTS.has(event); } const scopes = Array.isArray(client.connect.scopes) ? client.connect.scopes : []; if (scopes.includes(ADMIN_SCOPE)) { @@ -56,7 +90,7 @@ function hasEventScope(client: GatewayWsClient, event: string): boolean { } export function createGatewayBroadcaster(params: { clients: Set }) { - let seq = 0; + const clientSeq = new WeakMap(); const broadcastInternal = ( event: string, @@ -68,18 +102,10 @@ export function createGatewayBroadcaster(params: { clients: Set return; } const isTargeted = Boolean(targetConnIds); - const eventSeq = isTargeted ? undefined : ++seq; - const frame = JSON.stringify({ - type: "event", - event, - payload, - seq: eventSeq, - stateVersion: opts?.stateVersion, - }); if (shouldLogWs()) { const logMeta: Record = { event, - seq: eventSeq ?? "targeted", + seq: isTargeted ? "targeted" : "per-client", clients: params.clients.size, targets: targetConnIds ? targetConnIds.size : undefined, dropIfSlow: opts?.dropIfSlow, @@ -98,8 +124,12 @@ export function createGatewayBroadcaster(params: { clients: Set if (!hasEventScope(c, event)) { continue; } + const nextSeq = (clientSeq.get(c) ?? 0) + 1; const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES; if (slow && opts?.dropIfSlow) { + if (!isTargeted) { + clientSeq.set(c, nextSeq); + } continue; } if (slow) { @@ -111,6 +141,17 @@ export function createGatewayBroadcaster(params: { clients: Set continue; } try { + const eventSeq = isTargeted ? undefined : nextSeq; + if (!isTargeted) { + clientSeq.set(c, nextSeq); + } + const frame = JSON.stringify({ + type: "event", + event, + payload, + seq: eventSeq, + stateVersion: opts?.stateVersion, + }); c.socket.send(frame); } catch { /* ignore */