mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:30:43 +00:00
fix(gateway): require read scope for chat websocket broadcasts (#69373)
* fix(gateway): guard chat-class websocket broadcasts * fix(gateway): harden broadcast event scope guards * fix(gateway): keep websocket seq per recipient * fix(gateway): let nodes receive voicewake broadcasts * fix(gateway): preserve seq gaps for dropped broadcasts * fix(gateway): drop USER.md worklog from PR * fix(gateway): add scope guard docstring for pairing exclusion * fix(gateway): allow plugin.* broadcast events for write/admin scopes - Plugin-defined gateway broadcast events (plugin.* namespace) are now delivered to operator.write and operator.admin scoped clients - This preserves the ability for plugins to broadcast custom events through context.broadcast() without requiring explicit enumeration - Explicit plugin.* entries in EVENT_SCOPE_GUARDS take precedence (e.g., plugin.approval.* uses APPROVALS_SCOPE) * docs(changelog): note chat broadcast read-scope gating (#69373) --------- Co-authored-by: Devin Robison <drobison@nvidia.com>
This commit is contained in:
@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Security/dotenv: block all `OPENCLAW_*` keys from untrusted workspace `.env` files so workspace-local env loading fails closed for new runtime-control variables instead of silently inheriting them. (#473)
|
||||
- Gateway/device pairing: restrict non-admin paired-device sessions (device-token auth) to their own pairing list, approve, and reject actions so a paired device cannot enumerate other devices or approve/reject pairing requests authored by another device. Admin and shared-secret operator sessions retain full visibility. (#69375) Thanks @eleqtrizit.
|
||||
- Agents/gateway tool: extend the agent-facing `gateway` tool's config mutation guard so model-driven `config.patch` and `config.apply` cannot rewrite operator-trusted paths (sandbox, plugin trust, gateway auth/TLS, hook routing and tokens, SSRF policy, MCP servers, workspace filesystem hardening) and cannot bypass the guard by editing per-agent sandbox, tools, or embedded-Pi overrides in place under `agents.list[]`. (#69377) Thanks @eleqtrizit.
|
||||
- Gateway/websocket broadcasts: require `operator.read` (or higher) for chat, agent, and tool-result event frames so pairing-scoped and node-role sessions no longer passively receive session chat content, and scope-gate unknown broadcast events by default. Plugin-defined `plugin.*` broadcasts are scoped to operator.write/admin, and status/transport events (`heartbeat`, `presence`, `tick`, etc.) remain unrestricted. Per-client sequence numbers preserve per-connection monotonicity. (#69373) Thanks @eleqtrizit.
|
||||
|
||||
## 2026.4.20
|
||||
|
||||
|
||||
@@ -167,6 +167,29 @@ type TestSocket = {
|
||||
close: (code: number, reason: string) => void;
|
||||
};
|
||||
|
||||
type EventFrame = {
|
||||
type: "event";
|
||||
event: string;
|
||||
payload?: unknown;
|
||||
seq?: number;
|
||||
};
|
||||
|
||||
type RecordingSocket = TestSocket & {
|
||||
sent: EventFrame[];
|
||||
};
|
||||
|
||||
function makeRecordingSocket(): RecordingSocket {
|
||||
const sent: EventFrame[] = [];
|
||||
return {
|
||||
bufferedAmount: 0,
|
||||
send: vi.fn((payload: string) => {
|
||||
sent.push(JSON.parse(payload) as EventFrame);
|
||||
}),
|
||||
close: vi.fn(),
|
||||
sent,
|
||||
};
|
||||
}
|
||||
|
||||
describe("gateway broadcaster", () => {
|
||||
it("filters approval and pairing events by scope", () => {
|
||||
const approvalsSocket: TestSocket = {
|
||||
@@ -220,6 +243,311 @@ describe("gateway broadcaster", () => {
|
||||
expect(approvalsSocket.send).toHaveBeenCalledTimes(1);
|
||||
expect(pairingSocket.send).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("requires operator.read for chat-class broadcast events", () => {
|
||||
const pairingSocket = makeRecordingSocket();
|
||||
const nodeSocket = makeRecordingSocket();
|
||||
const readSocket = makeRecordingSocket();
|
||||
const writeSocket = makeRecordingSocket();
|
||||
const adminSocket = makeRecordingSocket();
|
||||
|
||||
const clients = new Set<GatewayWsClient>([
|
||||
{
|
||||
socket: pairingSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"],
|
||||
connId: "c-pairing",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: nodeSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "node", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-node",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: readSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-read",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: writeSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.write"] } as GatewayWsClient["connect"],
|
||||
connId: "c-write",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: adminSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.admin"] } as GatewayWsClient["connect"],
|
||||
connId: "c-admin",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
]);
|
||||
|
||||
const { broadcast } = createGatewayBroadcaster({ clients });
|
||||
|
||||
broadcast("chat", { sessionKey: "agent:main:main", message: "secret" });
|
||||
broadcast("agent", { type: "status", sessionKey: "agent:main:main" });
|
||||
broadcast("chat.side_result", { sessionKey: "agent:main:main", text: "tool output" });
|
||||
|
||||
expect(pairingSocket.send).not.toHaveBeenCalled();
|
||||
expect(nodeSocket.send).not.toHaveBeenCalled();
|
||||
expect(readSocket.send).toHaveBeenCalledTimes(3);
|
||||
expect(writeSocket.send).toHaveBeenCalledTimes(3);
|
||||
expect(adminSocket.send).toHaveBeenCalledTimes(3);
|
||||
expect(readSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"chat",
|
||||
"agent",
|
||||
"chat.side_result",
|
||||
]);
|
||||
expect(writeSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"chat",
|
||||
"agent",
|
||||
"chat.side_result",
|
||||
]);
|
||||
expect(adminSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"chat",
|
||||
"agent",
|
||||
"chat.side_result",
|
||||
]);
|
||||
});
|
||||
|
||||
it("allows plugin.* broadcast events for operator.write and operator.admin", () => {
|
||||
const pairingSocket = makeRecordingSocket();
|
||||
const nodeSocket = makeRecordingSocket();
|
||||
const readSocket = makeRecordingSocket();
|
||||
const writeSocket = makeRecordingSocket();
|
||||
const adminSocket = makeRecordingSocket();
|
||||
|
||||
const clients = new Set<GatewayWsClient>([
|
||||
{
|
||||
socket: pairingSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"],
|
||||
connId: "c-pairing",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: nodeSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "node", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-node",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: readSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-read",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: writeSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.write"] } as GatewayWsClient["connect"],
|
||||
connId: "c-write",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: adminSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.admin"] } as GatewayWsClient["connect"],
|
||||
connId: "c-admin",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
]);
|
||||
|
||||
const { broadcast } = createGatewayBroadcaster({ clients });
|
||||
|
||||
broadcast("plugin.myplugin.custom", { data: "test" });
|
||||
broadcast("plugin.otherplugin.state", { state: "updated" });
|
||||
|
||||
expect(pairingSocket.send).not.toHaveBeenCalled();
|
||||
expect(nodeSocket.send).not.toHaveBeenCalled();
|
||||
expect(readSocket.send).not.toHaveBeenCalled();
|
||||
expect(writeSocket.send).toHaveBeenCalledTimes(2);
|
||||
expect(adminSocket.send).toHaveBeenCalledTimes(2);
|
||||
expect(writeSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"plugin.myplugin.custom",
|
||||
"plugin.otherplugin.state",
|
||||
]);
|
||||
expect(adminSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"plugin.myplugin.custom",
|
||||
"plugin.otherplugin.state",
|
||||
]);
|
||||
});
|
||||
|
||||
it("defaults unknown events to deny and classifies remaining gateway broadcast events", () => {
|
||||
const pairingSocket = makeRecordingSocket();
|
||||
const nodeSocket = makeRecordingSocket();
|
||||
const readSocket = makeRecordingSocket();
|
||||
const writeSocket = makeRecordingSocket();
|
||||
const adminSocket = makeRecordingSocket();
|
||||
|
||||
const clients = new Set<GatewayWsClient>([
|
||||
{
|
||||
socket: pairingSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"],
|
||||
connId: "c-pairing",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: nodeSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "node", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-node",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: readSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-read",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: writeSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.write"] } as GatewayWsClient["connect"],
|
||||
connId: "c-write",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: adminSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.admin"] } as GatewayWsClient["connect"],
|
||||
connId: "c-admin",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
]);
|
||||
|
||||
const { broadcast } = createGatewayBroadcaster({ clients });
|
||||
|
||||
broadcast("cron", { jobId: "job-1" });
|
||||
broadcast("talk.mode", { enabled: true });
|
||||
broadcast("voicewake.changed", { triggers: ["hello"] });
|
||||
broadcast("heartbeat", { ts: 1 });
|
||||
broadcast("presence", { presence: [] });
|
||||
broadcast("health", { ok: true });
|
||||
broadcast("tick", { ts: 2 });
|
||||
broadcast("shutdown", { reason: "restart" });
|
||||
broadcast("update.available", { updateAvailable: { version: "2026.4.20" } });
|
||||
broadcast("unknown.future.event", { hidden: true });
|
||||
|
||||
expect(pairingSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"heartbeat",
|
||||
"presence",
|
||||
"health",
|
||||
"tick",
|
||||
"shutdown",
|
||||
"update.available",
|
||||
]);
|
||||
expect(nodeSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"voicewake.changed",
|
||||
"heartbeat",
|
||||
"presence",
|
||||
"health",
|
||||
"tick",
|
||||
"shutdown",
|
||||
"update.available",
|
||||
]);
|
||||
expect(readSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"cron",
|
||||
"voicewake.changed",
|
||||
"heartbeat",
|
||||
"presence",
|
||||
"health",
|
||||
"tick",
|
||||
"shutdown",
|
||||
"update.available",
|
||||
]);
|
||||
expect(writeSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"cron",
|
||||
"talk.mode",
|
||||
"voicewake.changed",
|
||||
"heartbeat",
|
||||
"presence",
|
||||
"health",
|
||||
"tick",
|
||||
"shutdown",
|
||||
"update.available",
|
||||
]);
|
||||
expect(adminSocket.sent.map((frame) => frame.event)).toEqual([
|
||||
"cron",
|
||||
"talk.mode",
|
||||
"voicewake.changed",
|
||||
"heartbeat",
|
||||
"presence",
|
||||
"health",
|
||||
"tick",
|
||||
"shutdown",
|
||||
"update.available",
|
||||
]);
|
||||
});
|
||||
|
||||
it("keeps event seq contiguous per receiving client when scoped events are filtered", () => {
|
||||
const pairingSocket = makeRecordingSocket();
|
||||
const readSocket = makeRecordingSocket();
|
||||
|
||||
const clients = new Set<GatewayWsClient>([
|
||||
{
|
||||
socket: pairingSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"],
|
||||
connId: "c-pairing",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: readSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-read",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
]);
|
||||
|
||||
const { broadcast } = createGatewayBroadcaster({ clients });
|
||||
|
||||
broadcast("chat", { sessionKey: "agent:main:main", message: "secret" });
|
||||
broadcast("heartbeat", { ts: 1 });
|
||||
broadcast("chat.side_result", { sessionKey: "agent:main:main", text: "tool output" });
|
||||
broadcast("tick", { ts: 2 });
|
||||
|
||||
expect(pairingSocket.sent.map((frame) => [frame.event, frame.seq])).toEqual([
|
||||
["heartbeat", 1],
|
||||
["tick", 2],
|
||||
]);
|
||||
expect(readSocket.sent.map((frame) => [frame.event, frame.seq])).toEqual([
|
||||
["chat", 1],
|
||||
["heartbeat", 2],
|
||||
["chat.side_result", 3],
|
||||
["tick", 4],
|
||||
]);
|
||||
});
|
||||
|
||||
it("preserves seq gaps when dropIfSlow skips an eligible broadcast", () => {
|
||||
const slowReadSocket = makeRecordingSocket();
|
||||
slowReadSocket.bufferedAmount = Number.MAX_SAFE_INTEGER;
|
||||
const readSocket = makeRecordingSocket();
|
||||
|
||||
const clients = new Set<GatewayWsClient>([
|
||||
{
|
||||
socket: slowReadSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-slow-read",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
{
|
||||
socket: readSocket as unknown as GatewayWsClient["socket"],
|
||||
connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"],
|
||||
connId: "c-read",
|
||||
usesSharedGatewayAuth: false,
|
||||
},
|
||||
]);
|
||||
|
||||
const { broadcast } = createGatewayBroadcaster({ clients });
|
||||
|
||||
broadcast("chat", { sessionKey: "agent:main:main", message: "secret" }, { dropIfSlow: true });
|
||||
slowReadSocket.bufferedAmount = 0;
|
||||
broadcast("heartbeat", { ts: 1 });
|
||||
|
||||
expect(slowReadSocket.sent.map((frame) => [frame.event, frame.seq])).toEqual([
|
||||
["heartbeat", 2],
|
||||
]);
|
||||
expect(readSocket.sent.map((frame) => [frame.event, frame.seq])).toEqual([
|
||||
["chat", 1],
|
||||
["heartbeat", 2],
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("chat run registry", () => {
|
||||
|
||||
@@ -15,11 +15,26 @@ import { MAX_BUFFERED_BYTES } from "./server-constants.js";
|
||||
import type { GatewayWsClient } from "./server/ws-types.js";
|
||||
import { logWs, shouldLogWs, summarizeAgentEventForWsLog } from "./ws-log.js";
|
||||
|
||||
// Pairing scope is for device-pairing handshakes only; chat transcript events
|
||||
// require operator-level session access. Pairing-scoped and node-role clients
|
||||
// must not passively receive chat-class broadcasts.
|
||||
const EVENT_SCOPE_GUARDS: Record<string, string[]> = {
|
||||
agent: [READ_SCOPE],
|
||||
chat: [READ_SCOPE],
|
||||
"chat.side_result": [READ_SCOPE],
|
||||
cron: [READ_SCOPE],
|
||||
health: [],
|
||||
"exec.approval.requested": [APPROVALS_SCOPE],
|
||||
"exec.approval.resolved": [APPROVALS_SCOPE],
|
||||
heartbeat: [],
|
||||
"plugin.approval.requested": [APPROVALS_SCOPE],
|
||||
"plugin.approval.resolved": [APPROVALS_SCOPE],
|
||||
presence: [],
|
||||
shutdown: [],
|
||||
tick: [],
|
||||
"talk.mode": [WRITE_SCOPE],
|
||||
"update.available": [],
|
||||
"voicewake.changed": [READ_SCOPE],
|
||||
"device.pair.requested": [PAIRING_SCOPE],
|
||||
"device.pair.resolved": [PAIRING_SCOPE],
|
||||
"node.pair.requested": [PAIRING_SCOPE],
|
||||
@@ -29,6 +44,11 @@ const EVENT_SCOPE_GUARDS: Record<string, string[]> = {
|
||||
"session.tool": [READ_SCOPE],
|
||||
};
|
||||
|
||||
// Events that node-role sessions must receive even when the event's operator
|
||||
// scope would otherwise reject non-operator roles. Nodes act on these updates
|
||||
// (e.g. reconfiguring wake-word triggers).
|
||||
const NODE_ALLOWED_EVENTS = new Set<string>(["voicewake.changed"]);
|
||||
|
||||
export type {
|
||||
GatewayBroadcastFn,
|
||||
GatewayBroadcastOpts,
|
||||
@@ -38,12 +58,26 @@ export type {
|
||||
|
||||
function hasEventScope(client: GatewayWsClient, event: string): boolean {
|
||||
const required = EVENT_SCOPE_GUARDS[event];
|
||||
// Plugin-defined gateway broadcast events (plugin.* namespace) are allowed
|
||||
// for operator.write and operator.admin scopes. Explicit plugin.* entries
|
||||
// in EVENT_SCOPE_GUARDS take precedence (e.g., plugin.approval.*).
|
||||
if (!required && event.startsWith("plugin.")) {
|
||||
const role = client.connect.role ?? "operator";
|
||||
if (role !== "operator") {
|
||||
return false;
|
||||
}
|
||||
const scopes = Array.isArray(client.connect.scopes) ? client.connect.scopes : [];
|
||||
return scopes.includes(WRITE_SCOPE) || scopes.includes(ADMIN_SCOPE);
|
||||
}
|
||||
if (!required) {
|
||||
return false;
|
||||
}
|
||||
if (required.length === 0) {
|
||||
return true;
|
||||
}
|
||||
const role = client.connect.role ?? "operator";
|
||||
if (role !== "operator") {
|
||||
return false;
|
||||
return role === "node" && NODE_ALLOWED_EVENTS.has(event);
|
||||
}
|
||||
const scopes = Array.isArray(client.connect.scopes) ? client.connect.scopes : [];
|
||||
if (scopes.includes(ADMIN_SCOPE)) {
|
||||
@@ -56,7 +90,7 @@ function hasEventScope(client: GatewayWsClient, event: string): boolean {
|
||||
}
|
||||
|
||||
export function createGatewayBroadcaster(params: { clients: Set<GatewayWsClient> }) {
|
||||
let seq = 0;
|
||||
const clientSeq = new WeakMap<GatewayWsClient, number>();
|
||||
|
||||
const broadcastInternal = (
|
||||
event: string,
|
||||
@@ -68,18 +102,10 @@ export function createGatewayBroadcaster(params: { clients: Set<GatewayWsClient>
|
||||
return;
|
||||
}
|
||||
const isTargeted = Boolean(targetConnIds);
|
||||
const eventSeq = isTargeted ? undefined : ++seq;
|
||||
const frame = JSON.stringify({
|
||||
type: "event",
|
||||
event,
|
||||
payload,
|
||||
seq: eventSeq,
|
||||
stateVersion: opts?.stateVersion,
|
||||
});
|
||||
if (shouldLogWs()) {
|
||||
const logMeta: Record<string, unknown> = {
|
||||
event,
|
||||
seq: eventSeq ?? "targeted",
|
||||
seq: isTargeted ? "targeted" : "per-client",
|
||||
clients: params.clients.size,
|
||||
targets: targetConnIds ? targetConnIds.size : undefined,
|
||||
dropIfSlow: opts?.dropIfSlow,
|
||||
@@ -98,8 +124,12 @@ export function createGatewayBroadcaster(params: { clients: Set<GatewayWsClient>
|
||||
if (!hasEventScope(c, event)) {
|
||||
continue;
|
||||
}
|
||||
const nextSeq = (clientSeq.get(c) ?? 0) + 1;
|
||||
const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES;
|
||||
if (slow && opts?.dropIfSlow) {
|
||||
if (!isTargeted) {
|
||||
clientSeq.set(c, nextSeq);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (slow) {
|
||||
@@ -111,6 +141,17 @@ export function createGatewayBroadcaster(params: { clients: Set<GatewayWsClient>
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const eventSeq = isTargeted ? undefined : nextSeq;
|
||||
if (!isTargeted) {
|
||||
clientSeq.set(c, nextSeq);
|
||||
}
|
||||
const frame = JSON.stringify({
|
||||
type: "event",
|
||||
event,
|
||||
payload,
|
||||
seq: eventSeq,
|
||||
stateVersion: opts?.stateVersion,
|
||||
});
|
||||
c.socket.send(frame);
|
||||
} catch {
|
||||
/* ignore */
|
||||
|
||||
Reference in New Issue
Block a user