Add node-hosted MCP session plumbing

This commit is contained in:
pashpashpash
2026-04-29 20:49:08 -04:00
parent 027ea5f08b
commit b7aa7cdb69
28 changed files with 1656 additions and 3 deletions

View File

@@ -29,6 +29,7 @@ public struct ConnectParams: Codable, Sendable {
public let caps: [String]?
public let commands: [String]?
public let permissions: [String: AnyCodable]?
public let mcpservers: [NodeMcpServerDescriptor]?
public let pathenv: String?
public let role: String?
public let scopes: [String]?
@@ -44,6 +45,7 @@ public struct ConnectParams: Codable, Sendable {
caps: [String]?,
commands: [String]?,
permissions: [String: AnyCodable]?,
mcpservers: [NodeMcpServerDescriptor]?,
pathenv: String?,
role: String?,
scopes: [String]?,
@@ -58,6 +60,7 @@ public struct ConnectParams: Codable, Sendable {
self.caps = caps
self.commands = commands
self.permissions = permissions
self.mcpservers = mcpservers
self.pathenv = pathenv
self.role = role
self.scopes = scopes
@@ -74,6 +77,7 @@ public struct ConnectParams: Codable, Sendable {
case caps
case commands
case permissions
case mcpservers = "mcpServers"
case pathenv = "pathEnv"
case role
case scopes
@@ -833,6 +837,7 @@ public struct NodePairRequestParams: Codable, Sendable {
public let modelidentifier: String?
public let caps: [String]?
public let commands: [String]?
public let mcpservers: [NodeMcpServerDescriptor]?
public let remoteip: String?
public let silent: Bool?
@@ -847,6 +852,7 @@ public struct NodePairRequestParams: Codable, Sendable {
modelidentifier: String?,
caps: [String]?,
commands: [String]?,
mcpservers: [NodeMcpServerDescriptor]?,
remoteip: String?,
silent: Bool?)
{
@@ -860,6 +866,7 @@ public struct NodePairRequestParams: Codable, Sendable {
self.modelidentifier = modelidentifier
self.caps = caps
self.commands = commands
self.mcpservers = mcpservers
self.remoteip = remoteip
self.silent = silent
}
@@ -875,6 +882,7 @@ public struct NodePairRequestParams: Codable, Sendable {
case modelidentifier = "modelIdentifier"
case caps
case commands
case mcpservers = "mcpServers"
case remoteip = "remoteIp"
case silent
}
@@ -1102,6 +1110,220 @@ public struct NodeEventResult: Codable, Sendable {
}
}
public struct NodeMcpServerDescriptor: Codable, Sendable {
public let id: String
public let displayname: String?
public let provider: String?
public let transport: String?
public let source: String?
public let status: String?
public let requiredpermissions: [String]?
public let metadata: [String: AnyCodable]?
public init(
id: String,
displayname: String?,
provider: String?,
transport: String?,
source: String?,
status: String?,
requiredpermissions: [String]?,
metadata: [String: AnyCodable]?)
{
self.id = id
self.displayname = displayname
self.provider = provider
self.transport = transport
self.source = source
self.status = status
self.requiredpermissions = requiredpermissions
self.metadata = metadata
}
private enum CodingKeys: String, CodingKey {
case id
case displayname = "displayName"
case provider
case transport
case source
case status
case requiredpermissions = "requiredPermissions"
case metadata
}
}
public struct NodeMcpSessionOpenEvent: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let serverid: String
public let timeoutms: Int?
public init(
sessionid: String,
nodeid: String,
serverid: String,
timeoutms: Int?)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.serverid = serverid
self.timeoutms = timeoutms
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case serverid = "serverId"
case timeoutms = "timeoutMs"
}
}
public struct NodeMcpSessionOpenResultParams: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let serverid: String
public let ok: Bool
public let pid: Int?
public let error: [String: AnyCodable]?
public init(
sessionid: String,
nodeid: String,
serverid: String,
ok: Bool,
pid: Int?,
error: [String: AnyCodable]?)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.serverid = serverid
self.ok = ok
self.pid = pid
self.error = error
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case serverid = "serverId"
case ok
case pid
case error
}
}
public struct NodeMcpSessionInputEvent: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let seq: Int
public let database64: String
public init(
sessionid: String,
nodeid: String,
seq: Int,
database64: String)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.seq = seq
self.database64 = database64
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case seq
case database64 = "dataBase64"
}
}
public struct NodeMcpSessionOutputParams: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let seq: Int
public let stream: String
public let database64: String
public init(
sessionid: String,
nodeid: String,
seq: Int,
stream: String,
database64: String)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.seq = seq
self.stream = stream
self.database64 = database64
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case seq
case stream
case database64 = "dataBase64"
}
}
public struct NodeMcpSessionCloseEvent: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let reason: String?
public init(
sessionid: String,
nodeid: String,
reason: String?)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.reason = reason
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case reason
}
}
public struct NodeMcpSessionClosedParams: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let ok: Bool
public let exitcode: AnyCodable?
public let signal: AnyCodable?
public let error: [String: AnyCodable]?
public init(
sessionid: String,
nodeid: String,
ok: Bool,
exitcode: AnyCodable?,
signal: AnyCodable?,
error: [String: AnyCodable]?)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.ok = ok
self.exitcode = exitcode
self.signal = signal
self.error = error
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case ok
case exitcode = "exitCode"
case signal
case error
}
}
public struct NodePresenceAlivePayload: Codable, Sendable {
public let trigger: NodePresenceAliveReason
public let sentatms: Int?

View File

@@ -29,6 +29,7 @@ public struct ConnectParams: Codable, Sendable {
public let caps: [String]?
public let commands: [String]?
public let permissions: [String: AnyCodable]?
public let mcpservers: [NodeMcpServerDescriptor]?
public let pathenv: String?
public let role: String?
public let scopes: [String]?
@@ -44,6 +45,7 @@ public struct ConnectParams: Codable, Sendable {
caps: [String]?,
commands: [String]?,
permissions: [String: AnyCodable]?,
mcpservers: [NodeMcpServerDescriptor]?,
pathenv: String?,
role: String?,
scopes: [String]?,
@@ -58,6 +60,7 @@ public struct ConnectParams: Codable, Sendable {
self.caps = caps
self.commands = commands
self.permissions = permissions
self.mcpservers = mcpservers
self.pathenv = pathenv
self.role = role
self.scopes = scopes
@@ -74,6 +77,7 @@ public struct ConnectParams: Codable, Sendable {
case caps
case commands
case permissions
case mcpservers = "mcpServers"
case pathenv = "pathEnv"
case role
case scopes
@@ -833,6 +837,7 @@ public struct NodePairRequestParams: Codable, Sendable {
public let modelidentifier: String?
public let caps: [String]?
public let commands: [String]?
public let mcpservers: [NodeMcpServerDescriptor]?
public let remoteip: String?
public let silent: Bool?
@@ -847,6 +852,7 @@ public struct NodePairRequestParams: Codable, Sendable {
modelidentifier: String?,
caps: [String]?,
commands: [String]?,
mcpservers: [NodeMcpServerDescriptor]?,
remoteip: String?,
silent: Bool?)
{
@@ -860,6 +866,7 @@ public struct NodePairRequestParams: Codable, Sendable {
self.modelidentifier = modelidentifier
self.caps = caps
self.commands = commands
self.mcpservers = mcpservers
self.remoteip = remoteip
self.silent = silent
}
@@ -875,6 +882,7 @@ public struct NodePairRequestParams: Codable, Sendable {
case modelidentifier = "modelIdentifier"
case caps
case commands
case mcpservers = "mcpServers"
case remoteip = "remoteIp"
case silent
}
@@ -1102,6 +1110,220 @@ public struct NodeEventResult: Codable, Sendable {
}
}
public struct NodeMcpServerDescriptor: Codable, Sendable {
public let id: String
public let displayname: String?
public let provider: String?
public let transport: String?
public let source: String?
public let status: String?
public let requiredpermissions: [String]?
public let metadata: [String: AnyCodable]?
public init(
id: String,
displayname: String?,
provider: String?,
transport: String?,
source: String?,
status: String?,
requiredpermissions: [String]?,
metadata: [String: AnyCodable]?)
{
self.id = id
self.displayname = displayname
self.provider = provider
self.transport = transport
self.source = source
self.status = status
self.requiredpermissions = requiredpermissions
self.metadata = metadata
}
private enum CodingKeys: String, CodingKey {
case id
case displayname = "displayName"
case provider
case transport
case source
case status
case requiredpermissions = "requiredPermissions"
case metadata
}
}
public struct NodeMcpSessionOpenEvent: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let serverid: String
public let timeoutms: Int?
public init(
sessionid: String,
nodeid: String,
serverid: String,
timeoutms: Int?)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.serverid = serverid
self.timeoutms = timeoutms
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case serverid = "serverId"
case timeoutms = "timeoutMs"
}
}
public struct NodeMcpSessionOpenResultParams: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let serverid: String
public let ok: Bool
public let pid: Int?
public let error: [String: AnyCodable]?
public init(
sessionid: String,
nodeid: String,
serverid: String,
ok: Bool,
pid: Int?,
error: [String: AnyCodable]?)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.serverid = serverid
self.ok = ok
self.pid = pid
self.error = error
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case serverid = "serverId"
case ok
case pid
case error
}
}
public struct NodeMcpSessionInputEvent: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let seq: Int
public let database64: String
public init(
sessionid: String,
nodeid: String,
seq: Int,
database64: String)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.seq = seq
self.database64 = database64
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case seq
case database64 = "dataBase64"
}
}
public struct NodeMcpSessionOutputParams: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let seq: Int
public let stream: String
public let database64: String
public init(
sessionid: String,
nodeid: String,
seq: Int,
stream: String,
database64: String)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.seq = seq
self.stream = stream
self.database64 = database64
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case seq
case stream
case database64 = "dataBase64"
}
}
public struct NodeMcpSessionCloseEvent: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let reason: String?
public init(
sessionid: String,
nodeid: String,
reason: String?)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.reason = reason
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case reason
}
}
public struct NodeMcpSessionClosedParams: Codable, Sendable {
public let sessionid: String
public let nodeid: String
public let ok: Bool
public let exitcode: AnyCodable?
public let signal: AnyCodable?
public let error: [String: AnyCodable]?
public init(
sessionid: String,
nodeid: String,
ok: Bool,
exitcode: AnyCodable?,
signal: AnyCodable?,
error: [String: AnyCodable]?)
{
self.sessionid = sessionid
self.nodeid = nodeid
self.ok = ok
self.exitcode = exitcode
self.signal = signal
self.error = error
}
private enum CodingKeys: String, CodingKey {
case sessionid = "sessionId"
case nodeid = "nodeId"
case ok
case exitcode = "exitCode"
case signal
case error
}
}
public struct NodePresenceAlivePayload: Codable, Sendable {
public let trigger: NodePresenceAliveReason
public let sentatms: Int?

View File

@@ -252,9 +252,20 @@ Nodes declare capability claims at connect time:
- `caps`: high-level capability categories.
- `commands`: command allowlist for invoke.
- `mcpServers`: named node-hosted MCP servers the node is willing to run.
- `permissions`: granular toggles (e.g. `screen.record`, `camera.capture`).
The Gateway treats these as **claims** and enforces server-side allowlists.
Nodes that advertise MCP servers should also include the `mcpHost` cap. Adding
a new node-hosted MCP server is treated as a privileged surface and requires
node pairing approval.
Node-hosted MCP sessions use a long-lived stream instead of `node.invoke`.
The Gateway opens a named server with `node.mcp.session.open`, sends MCP stdio
bytes with `node.mcp.session.input`, receives stdout/stderr chunks through
`node.mcp.session.output`, and closes with `node.mcp.session.close` /
`node.mcp.session.closed`. The Gateway only sends the named `serverId`; the node
owns the executable path, bundle checks, permissions, and process lifecycle.
## Presence
@@ -424,6 +435,7 @@ enumeration of `src/gateway/server-methods/*.ts`.
- `node.rename` updates a paired node label.
- `node.invoke` forwards a command to a connected node.
- `node.invoke.result` returns the result for an invoke request.
- `node.mcp.session.open`, `node.mcp.session.input`, `node.mcp.session.close`, `node.mcp.session.open.result`, `node.mcp.session.output`, and `node.mcp.session.closed` carry node-hosted MCP stdio streams.
- `node.event` carries node-originated events back into the gateway.
- `node.canvas.capability.refresh` refreshes scoped canvas-capability tokens.
- `node.pending.pull` and `node.pending.ack` are the connected-node queue APIs.

View File

@@ -58,6 +58,13 @@ The macOS app presents itself as a node. Common commands:
The node reports a `permissions` map so agents can decide whats allowed.
For permission-sensitive MCP tools such as Computer Use, the intended model is
for the Mac app to advertise an `mcpHost` capability with named `mcpServers`.
The Gateway may open one of those named servers and proxy MCP stdio bytes over
the node WebSocket, but the Mac app owns the executable mapping, signing checks,
TCC prompts, and child-process lifetime. This keeps CLI-hatched and remote
Gateways working while preserving the app as the macOS permission boundary.
Node service + app IPC:
- When the headless node host service is running (remote mode), it connects to the Gateway WS as a node.

View File

@@ -31,6 +31,9 @@ export const CLI_DEFAULT_OPERATOR_SCOPES: OperatorScope[] = [
const NODE_ROLE_METHODS = new Set([
"node.invoke.result",
"node.mcp.session.open.result",
"node.mcp.session.output",
"node.mcp.session.closed",
"node.event",
"node.pending.drain",
"node.canvas.capability.refresh",

View File

@@ -1,6 +1,7 @@
import { hasEffectivePairedDeviceRole, type PairedDevice } from "../infra/device-pairing.js";
import type { NodePairingPairedNode } from "../infra/node-pairing.js";
import type { NodeListNode } from "../shared/node-list-types.js";
import { normalizeNodeMcpServerDescriptors } from "../shared/node-mcp-types.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import type { NodeSession } from "./node-registry.js";
@@ -28,6 +29,7 @@ export type KnownNodeApprovedSource = {
modelIdentifier?: string;
caps: string[];
commands: string[];
mcpServers?: NonNullable<NodeListNode["mcpServers"]>;
permissions?: Record<string, boolean>;
approvedAtMs?: number;
lastConnectedAtMs?: number;
@@ -90,6 +92,7 @@ function buildApprovedNodeSource(entry: NodePairingPairedNode): KnownNodeApprove
modelIdentifier: entry.modelIdentifier,
caps: entry.caps ?? [],
commands: entry.commands ?? [],
mcpServers: normalizeNodeMcpServerDescriptors(entry.mcpServers),
permissions: entry.permissions,
approvedAtMs: entry.approvedAtMs,
lastConnectedAtMs: entry.lastConnectedAtMs,
@@ -149,6 +152,7 @@ function buildEffectiveKnownNode(entry: {
commands: live
? uniqueSortedStrings(live.commands)
: uniqueSortedStrings(nodePairing?.commands),
mcpServers: live?.mcpServers ?? nodePairing?.mcpServers,
pathEnv: live?.pathEnv,
permissions: live?.permissions ?? nodePairing?.permissions,
connectedAtMs: live?.connectedAtMs,

View File

@@ -0,0 +1,74 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { NodePairingRequestInput } from "../infra/node-pairing.js";
import { reconcileNodePairingOnConnect } from "./node-connect-reconcile.js";
import type { ConnectParams } from "./protocol/index.js";
function connectParams(patch: Partial<ConnectParams> = {}): ConnectParams {
return {
minProtocol: 1,
maxProtocol: 1,
client: {
id: "node-host",
displayName: "Mac",
version: "dev",
platform: "macOS",
mode: "node",
},
role: "node",
scopes: [],
caps: ["mcpHost"],
commands: [],
device: {
id: "mac-node",
publicKey: "public-key",
signature: "signature",
signedAt: 1,
nonce: "nonce",
},
...patch,
};
}
describe("reconcileNodePairingOnConnect", () => {
it("requires a new pairing request before exposing newly declared MCP servers", async () => {
const requestPairing = vi.fn(async (input: NodePairingRequestInput) => ({
status: "pending" as const,
request: {
...input,
requestId: "pair-1",
ts: 1,
},
created: true,
}));
const result = await reconcileNodePairingOnConnect({
cfg: {} as OpenClawConfig,
connectParams: connectParams({
mcpServers: [
{ id: "computer-use", displayName: "Computer Use", status: "missing_permissions" },
],
}),
pairedNode: {
nodeId: "mac-node",
token: "token",
caps: ["mcpHost"],
commands: [],
mcpServers: [],
createdAtMs: 1,
approvedAtMs: 1,
},
requestPairing,
});
expect(result.effectiveMcpServers).toEqual([]);
expect(result.pendingPairing?.created).toBe(true);
expect(requestPairing).toHaveBeenCalledWith(
expect.objectContaining({
mcpServers: [
{ id: "computer-use", displayName: "Computer Use", status: "missing_permissions" },
],
}),
);
});
});

View File

@@ -4,6 +4,11 @@ import type {
NodePairingPendingRequest,
NodePairingRequestInput,
} from "../infra/node-pairing.js";
import {
normalizeNodeMcpServerDescriptors,
normalizeNodeMcpServerIds,
type NodeMcpServerDescriptor,
} from "../shared/node-mcp-types.js";
import {
normalizeDeclaredNodeCommands,
resolveNodeCommandAllowlist,
@@ -19,6 +24,7 @@ type PendingNodePairingResult = {
export type NodeConnectPairingReconcileResult = {
nodeId: string;
effectiveCommands: string[];
effectiveMcpServers?: NodeMcpServerDescriptor[];
pendingPairing?: PendingNodePairingResult;
};
@@ -47,6 +53,7 @@ function buildNodePairingRequestInput(params: {
modelIdentifier: params.connectParams.client.modelIdentifier,
caps: params.connectParams.caps,
commands: params.commands,
mcpServers: normalizeNodeMcpServerDescriptors(params.connectParams.mcpServers),
remoteIp: params.remoteIp,
};
}
@@ -69,6 +76,7 @@ export async function reconcileNodePairingOnConnect(params: {
: [],
allowlist,
});
const declaredMcpServers = normalizeNodeMcpServerDescriptors(params.connectParams.mcpServers);
if (!params.pairedNode) {
const pendingPairing = await params.requestPairing(
@@ -82,6 +90,7 @@ export async function reconcileNodePairingOnConnect(params: {
return {
nodeId,
effectiveCommands: declared,
effectiveMcpServers: undefined,
pendingPairing,
};
}
@@ -91,8 +100,16 @@ export async function reconcileNodePairingOnConnect(params: {
allowlist,
});
const hasCommandUpgrade = declared.some((command) => !approvedCommands.includes(command));
const approvedMcpServerIds = normalizeNodeMcpServerIds(params.pairedNode.mcpServers);
const declaredMcpServerIds = normalizeNodeMcpServerIds(declaredMcpServers);
const hasMcpServerUpgrade = declaredMcpServerIds.some(
(serverId) => !approvedMcpServerIds.includes(serverId),
);
const effectiveMcpServers = declaredMcpServers?.filter((descriptor) =>
approvedMcpServerIds.includes(descriptor.id),
);
if (hasCommandUpgrade) {
if (hasCommandUpgrade || hasMcpServerUpgrade) {
const pendingPairing = await params.requestPairing(
buildNodePairingRequestInput({
nodeId,
@@ -104,6 +121,7 @@ export async function reconcileNodePairingOnConnect(params: {
return {
nodeId,
effectiveCommands: approvedCommands,
effectiveMcpServers,
pendingPairing,
};
}
@@ -111,5 +129,6 @@ export async function reconcileNodePairingOnConnect(params: {
return {
nodeId,
effectiveCommands: declared,
effectiveMcpServers: declaredMcpServers,
};
}

View File

@@ -0,0 +1,215 @@
import { Buffer } from "node:buffer";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { describe, expect, it, vi } from "vitest";
import type { WebSocket } from "ws";
import { NodeMcpClientTransport } from "./node-mcp-client-transport.js";
import { NodeRegistry } from "./node-registry.js";
import type { ConnectParams } from "./protocol/index.js";
import type { GatewayWsClient } from "./server/ws-types.js";
function createNodeClient(params: { nodeId?: string; caps?: string[]; mcpServers?: unknown[] }): {
client: GatewayWsClient;
sent: Array<{ event: string; payload: unknown }>;
} {
const sent: Array<{ event: string; payload: unknown }> = [];
const nodeId = params.nodeId ?? "mac-node";
const connect: ConnectParams = {
minProtocol: 1,
maxProtocol: 1,
client: {
id: "node-host",
displayName: "Mac",
version: "dev",
platform: "macOS",
mode: "node",
},
role: "node",
scopes: [],
caps: params.caps ?? ["mcpHost"],
commands: [],
mcpServers: params.mcpServers as ConnectParams["mcpServers"],
device: {
id: nodeId,
publicKey: "public-key",
signature: "signature",
signedAt: 1,
nonce: "nonce",
},
};
const socket = {
send(data: string) {
const frame = JSON.parse(data) as { event?: string; payload?: unknown };
if (frame.event) {
sent.push({ event: frame.event, payload: frame.payload });
}
},
} as unknown as WebSocket;
return {
client: {
socket,
connect,
connId: `${nodeId}-conn`,
usesSharedGatewayAuth: false,
},
sent,
};
}
function assignTransportHandlers(
transport: NodeMcpClientTransport,
handlers: Partial<Transport>,
): void {
Object.assign(transport, handlers);
}
describe("NodeMcpClientTransport", () => {
it("opens a declared node-hosted MCP server and forwards JSON-RPC over stdout", async () => {
const registry = new NodeRegistry();
const { client, sent } = createNodeClient({
mcpServers: [{ id: "computer-use", displayName: "Computer Use", status: "ready" }],
});
registry.register(client, {});
const transport = new NodeMcpClientTransport(registry, {
nodeId: "mac-node",
serverId: "computer-use",
sessionId: "session-1",
openTimeoutMs: 1000,
});
const messages: unknown[] = [];
const onclose = vi.fn();
assignTransportHandlers(transport, {
onmessage: (message) => messages.push(message),
onclose,
});
const start = transport.start();
expect(sent).toEqual([
{
event: "node.mcp.session.open",
payload: {
sessionId: "session-1",
nodeId: "mac-node",
serverId: "computer-use",
timeoutMs: 1000,
},
},
]);
expect(
registry.handleMcpSessionOpenResult({
sessionId: "session-1",
nodeId: "mac-node",
serverId: "computer-use",
ok: true,
pid: 42,
}),
).toBe(true);
await start;
await transport.send({ jsonrpc: "2.0", id: 1, method: "tools/list" });
expect(sent.at(-1)?.event).toBe("node.mcp.session.input");
const inputPayload = sent.at(-1)?.payload as { dataBase64?: string };
expect(Buffer.from(inputPayload.dataBase64 ?? "", "base64").toString("utf8")).toBe(
'{"jsonrpc":"2.0","id":1,"method":"tools/list"}\n',
);
expect(
registry.handleMcpSessionOutput({
sessionId: "session-1",
nodeId: "mac-node",
seq: 0,
stream: "stdout",
dataBase64: Buffer.from('{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}\n').toString(
"base64",
),
}),
).toBe(true);
expect(messages).toEqual([{ jsonrpc: "2.0", id: 1, result: { tools: [] } }]);
await transport.close();
expect(sent.at(-1)).toEqual({
event: "node.mcp.session.close",
payload: {
sessionId: "session-1",
nodeId: "mac-node",
reason: "client_close",
},
});
expect(
registry.handleMcpSessionOutput({
sessionId: "session-1",
nodeId: "mac-node",
seq: 1,
stream: "stdout",
dataBase64: Buffer.from('{"jsonrpc":"2.0","method":"stale"}\n').toString("base64"),
}),
).toBe(false);
expect(onclose).toHaveBeenCalledTimes(1);
});
it("rejects sessions for undeclared MCP servers before sending to the node", async () => {
const registry = new NodeRegistry();
const { client, sent } = createNodeClient({
mcpServers: [{ id: "computer-use" }],
});
registry.register(client, {});
const transport = new NodeMcpClientTransport(registry, {
nodeId: "mac-node",
serverId: "not-advertised",
sessionId: "session-2",
openTimeoutMs: 1,
});
await expect(transport.start()).rejects.toThrow("node did not advertise MCP server");
expect(sent).toEqual([]);
});
it("rejects advertised MCP servers that are not ready", async () => {
const registry = new NodeRegistry();
const { client, sent } = createNodeClient({
mcpServers: [{ id: "computer-use", status: "missing_permissions" }],
});
registry.register(client, {});
const transport = new NodeMcpClientTransport(registry, {
nodeId: "mac-node",
serverId: "computer-use",
sessionId: "session-not-ready",
openTimeoutMs: 1,
});
await expect(transport.start()).rejects.toThrow("node MCP server is missing_permissions");
expect(sent).toEqual([]);
});
it("closes active sessions when the node disconnects", async () => {
const registry = new NodeRegistry();
const { client } = createNodeClient({
mcpServers: [{ id: "computer-use" }],
});
registry.register(client, {});
const transport = new NodeMcpClientTransport(registry, {
nodeId: "mac-node",
serverId: "computer-use",
sessionId: "session-3",
openTimeoutMs: 1000,
});
const onclose = vi.fn();
const onerror = vi.fn();
assignTransportHandlers(transport, { onclose, onerror });
const start = transport.start();
registry.handleMcpSessionOpenResult({
sessionId: "session-3",
nodeId: "mac-node",
serverId: "computer-use",
ok: true,
});
await start;
expect(registry.unregister(client.connId)).toBe("mac-node");
expect(onerror).toHaveBeenCalledWith(expect.objectContaining({ message: "node disconnected" }));
expect(onclose).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,119 @@
import { randomUUID } from "node:crypto";
import { ReadBuffer, serializeMessage } from "@modelcontextprotocol/sdk/shared/stdio.js";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
import type { NodeMcpClosedResult, NodeMcpOutputChunk, NodeRegistry } from "./node-registry.js";
export type NodeMcpClientTransportOptions = {
nodeId: string;
serverId: string;
sessionId?: string;
openTimeoutMs?: number;
};
export class NodeMcpClientTransport implements Transport {
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
private readonly readBuffer = new ReadBuffer();
private readonly nodeMcpSessionId: string;
private started = false;
private closed = false;
private seq = 0;
constructor(
private readonly registry: NodeRegistry,
private readonly options: NodeMcpClientTransportOptions,
) {
this.nodeMcpSessionId = options.sessionId ?? randomUUID();
}
async start(): Promise<void> {
if (this.started) {
throw new Error("NodeMcpClientTransport already started");
}
if (this.closed) {
throw new Error("NodeMcpClientTransport is closed");
}
const result = await this.registry.openMcpSession({
nodeId: this.options.nodeId,
serverId: this.options.serverId,
sessionId: this.nodeMcpSessionId,
timeoutMs: this.options.openTimeoutMs,
onOutput: (chunk) => this.handleOutput(chunk),
onClosed: (closed) => this.handleClosed(closed),
});
if (!result.ok) {
const message = result.error?.message ?? "failed to open node MCP session";
throw new Error(message);
}
this.started = true;
}
async send(message: JSONRPCMessage): Promise<void> {
if (!this.started || this.closed) {
throw new Error("Not connected");
}
const payload = Buffer.from(serializeMessage(message), "utf8").toString("base64");
const sent = this.registry.sendMcpInput({
nodeId: this.options.nodeId,
sessionId: this.nodeMcpSessionId,
seq: this.seq++,
dataBase64: payload,
});
if (!sent) {
throw new Error("failed to send node MCP input");
}
}
async close(): Promise<void> {
if (this.closed) {
return;
}
this.closed = true;
this.readBuffer.clear();
if (this.started) {
this.registry.closeMcpSession({
nodeId: this.options.nodeId,
sessionId: this.nodeMcpSessionId,
reason: "client_close",
});
}
this.onclose?.();
}
private handleOutput(chunk: NodeMcpOutputChunk) {
if (this.closed) {
return;
}
if (chunk.stream !== "stdout") {
return;
}
try {
this.readBuffer.append(Buffer.from(chunk.dataBase64, "base64"));
while (true) {
const message = this.readBuffer.readMessage();
if (message === null) {
break;
}
this.onmessage?.(message);
}
} catch (error) {
this.onerror?.(error instanceof Error ? error : new Error(String(error)));
}
}
private handleClosed(result: NodeMcpClosedResult) {
if (this.closed) {
return;
}
this.closed = true;
this.readBuffer.clear();
if (!result.ok) {
const message = result.error?.message ?? "node MCP session closed";
this.onerror?.(new Error(message));
}
this.onclose?.();
}
}

View File

@@ -1,6 +1,36 @@
import { randomUUID } from "node:crypto";
import {
normalizeNodeMcpServerDescriptors,
type NodeMcpServerDescriptor,
} from "../shared/node-mcp-types.js";
import type { GatewayWsClient } from "./server/ws-types.js";
export type NodeMcpOutputChunk = {
sessionId: string;
nodeId: string;
seq: number;
stream: "stdout" | "stderr";
dataBase64: string;
};
export type NodeMcpClosedResult = {
sessionId: string;
nodeId: string;
ok: boolean;
exitCode?: number | null;
signal?: string | null;
error?: { code?: string; message?: string } | null;
};
export type NodeMcpOpenResult = {
sessionId: string;
nodeId: string;
serverId: string;
ok: boolean;
pid?: number;
error?: { code?: string; message?: string } | null;
};
export type NodeSession = {
nodeId: string;
connId: string;
@@ -17,6 +47,7 @@ export type NodeSession = {
remoteIp?: string;
caps: string[];
commands: string[];
mcpServers?: NodeMcpServerDescriptor[];
permissions?: Record<string, boolean>;
pathEnv?: string;
connectedAtMs: number;
@@ -30,6 +61,20 @@ type PendingInvoke = {
timer: ReturnType<typeof setTimeout>;
};
type PendingMcpOpen = {
nodeId: string;
serverId: string;
resolve: (value: NodeMcpOpenResult) => void;
timer: ReturnType<typeof setTimeout>;
};
type ActiveMcpSession = {
nodeId: string;
serverId: string;
onOutput?: (chunk: NodeMcpOutputChunk) => void;
onClosed?: (result: NodeMcpClosedResult) => void;
};
export type NodeInvokeResult = {
ok: boolean;
payload?: unknown;
@@ -41,6 +86,8 @@ export class NodeRegistry {
private nodesById = new Map<string, NodeSession>();
private nodesByConn = new Map<string, string>();
private pendingInvokes = new Map<string, PendingInvoke>();
private pendingMcpOpens = new Map<string, PendingMcpOpen>();
private activeMcpSessions = new Map<string, ActiveMcpSession>();
register(client: GatewayWsClient, opts: { remoteIp?: string | undefined }) {
const connect = client.connect;
@@ -57,6 +104,9 @@ export class NodeRegistry {
typeof (connect as { pathEnv?: string }).pathEnv === "string"
? (connect as { pathEnv?: string }).pathEnv
: undefined;
const mcpServers = normalizeNodeMcpServerDescriptors(
(connect as { mcpServers?: unknown }).mcpServers,
);
const session: NodeSession = {
nodeId,
connId: client.connId,
@@ -73,6 +123,7 @@ export class NodeRegistry {
remoteIp: opts.remoteIp,
caps,
commands,
mcpServers,
permissions,
pathEnv,
connectedAtMs: Date.now(),
@@ -97,6 +148,32 @@ export class NodeRegistry {
pending.reject(new Error(`node disconnected (${pending.command})`));
this.pendingInvokes.delete(id);
}
for (const [sessionId, pending] of this.pendingMcpOpens.entries()) {
if (pending.nodeId !== nodeId) {
continue;
}
clearTimeout(pending.timer);
pending.resolve({
sessionId,
nodeId,
serverId: pending.serverId,
ok: false,
error: { code: "NODE_DISCONNECTED", message: "node disconnected" },
});
this.pendingMcpOpens.delete(sessionId);
}
for (const [sessionId, session] of this.activeMcpSessions.entries()) {
if (session.nodeId !== nodeId) {
continue;
}
this.activeMcpSessions.delete(sessionId);
session.onClosed?.({
sessionId,
nodeId,
ok: false,
error: { code: "NODE_DISCONNECTED", message: "node disconnected" },
});
}
return nodeId;
}
@@ -192,6 +269,166 @@ export class NodeRegistry {
return this.sendEventToSession(node, event, payload);
}
async openMcpSession(params: {
nodeId: string;
serverId: string;
sessionId?: string;
timeoutMs?: number;
onOutput?: (chunk: NodeMcpOutputChunk) => void;
onClosed?: (result: NodeMcpClosedResult) => void;
}): Promise<NodeMcpOpenResult> {
const node = this.nodesById.get(params.nodeId);
const sessionId = params.sessionId ?? randomUUID();
if (!node) {
return {
sessionId,
nodeId: params.nodeId,
serverId: params.serverId,
ok: false,
error: { code: "NOT_CONNECTED", message: "node not connected" },
};
}
if (!node.caps.includes("mcpHost")) {
return {
sessionId,
nodeId: params.nodeId,
serverId: params.serverId,
ok: false,
error: { code: "MCP_HOST_UNAVAILABLE", message: "node did not advertise mcpHost" },
};
}
const descriptors = node.mcpServers ?? [];
const descriptor = descriptors.find((entry) => entry.id === params.serverId);
if (!descriptor) {
return {
sessionId,
nodeId: params.nodeId,
serverId: params.serverId,
ok: false,
error: { code: "MCP_SERVER_NOT_DECLARED", message: "node did not advertise MCP server" },
};
}
if (descriptor.status && descriptor.status !== "ready") {
return {
sessionId,
nodeId: params.nodeId,
serverId: params.serverId,
ok: false,
error: {
code: "MCP_SERVER_NOT_READY",
message: `node MCP server is ${descriptor.status}`,
},
};
}
this.activeMcpSessions.set(sessionId, {
nodeId: params.nodeId,
serverId: params.serverId,
onOutput: params.onOutput,
onClosed: params.onClosed,
});
const ok = this.sendEventToSession(node, "node.mcp.session.open", {
sessionId,
nodeId: params.nodeId,
serverId: params.serverId,
timeoutMs: params.timeoutMs,
});
if (!ok) {
this.activeMcpSessions.delete(sessionId);
return {
sessionId,
nodeId: params.nodeId,
serverId: params.serverId,
ok: false,
error: { code: "UNAVAILABLE", message: "failed to send MCP session open to node" },
};
}
const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : 30_000;
return await new Promise<NodeMcpOpenResult>((resolve) => {
const timer = setTimeout(() => {
this.pendingMcpOpens.delete(sessionId);
this.activeMcpSessions.delete(sessionId);
resolve({
sessionId,
nodeId: params.nodeId,
serverId: params.serverId,
ok: false,
error: { code: "TIMEOUT", message: "node MCP session open timed out" },
});
}, timeoutMs);
this.pendingMcpOpens.set(sessionId, {
nodeId: params.nodeId,
serverId: params.serverId,
resolve,
timer,
});
});
}
handleMcpSessionOpenResult(params: NodeMcpOpenResult): boolean {
const pending = this.pendingMcpOpens.get(params.sessionId);
if (!pending) {
return false;
}
if (pending.nodeId !== params.nodeId || pending.serverId !== params.serverId) {
return false;
}
clearTimeout(pending.timer);
this.pendingMcpOpens.delete(params.sessionId);
if (!params.ok) {
this.activeMcpSessions.delete(params.sessionId);
}
pending.resolve({
sessionId: params.sessionId,
nodeId: params.nodeId,
serverId: params.serverId,
ok: params.ok,
pid: params.pid,
error: params.error ?? null,
});
return true;
}
sendMcpInput(params: {
sessionId: string;
nodeId: string;
seq: number;
dataBase64: string;
}): boolean {
const active = this.activeMcpSessions.get(params.sessionId);
if (!active || active.nodeId !== params.nodeId) {
return false;
}
return this.sendEvent(params.nodeId, "node.mcp.session.input", params);
}
closeMcpSession(params: { sessionId: string; nodeId: string; reason?: string }): boolean {
const active = this.activeMcpSessions.get(params.sessionId);
if (!active || active.nodeId !== params.nodeId) {
return false;
}
this.activeMcpSessions.delete(params.sessionId);
return this.sendEvent(params.nodeId, "node.mcp.session.close", params);
}
handleMcpSessionOutput(params: NodeMcpOutputChunk): boolean {
const active = this.activeMcpSessions.get(params.sessionId);
if (!active || active.nodeId !== params.nodeId) {
return false;
}
active.onOutput?.(params);
return true;
}
handleMcpSessionClosed(params: NodeMcpClosedResult): boolean {
const active = this.activeMcpSessions.get(params.sessionId);
if (!active || active.nodeId !== params.nodeId) {
return false;
}
this.activeMcpSessions.delete(params.sessionId);
active.onClosed?.(params);
return true;
}
private sendEventInternal(node: NodeSession, event: string, payload: unknown): boolean {
try {
node.client.socket.send(

View File

@@ -5,6 +5,10 @@ import {
formatValidationErrors,
validateModelsListParams,
validateNodeEventResult,
validateNodeMcpServerDescriptor,
validateNodeMcpSessionInputEvent,
validateNodeMcpSessionOpenResultParams,
validateNodeMcpSessionOutputParams,
validateNodePresenceAlivePayload,
validateTalkConfigResult,
validateTalkRealtimeSessionParams,
@@ -232,3 +236,56 @@ describe("validateNodeEventResult", () => {
).toBe(true);
});
});
describe("node MCP protocol validators", () => {
it("accepts declared node-hosted MCP servers", () => {
expect(
validateNodeMcpServerDescriptor({
id: "computer-use",
displayName: "Computer Use",
provider: "openclaw",
transport: "stdio",
status: "missing_permissions",
requiredPermissions: ["accessibility", "screenRecording"],
}),
).toBe(true);
});
it("rejects unknown node-hosted MCP server statuses", () => {
expect(
validateNodeMcpServerDescriptor({
id: "computer-use",
status: "maybe",
}),
).toBe(false);
});
it("accepts MCP session lifecycle frames", () => {
expect(
validateNodeMcpSessionOpenResultParams({
sessionId: "session-1",
nodeId: "mac-node",
serverId: "computer-use",
ok: true,
pid: 123,
}),
).toBe(true);
expect(
validateNodeMcpSessionInputEvent({
sessionId: "session-1",
nodeId: "mac-node",
seq: 0,
dataBase64: "e30K",
}),
).toBe(true);
expect(
validateNodeMcpSessionOutputParams({
sessionId: "session-1",
nodeId: "mac-node",
seq: 0,
stream: "stdout",
dataBase64: "e30K",
}),
).toBe(true);
});
});

View File

@@ -179,6 +179,20 @@ import {
NodeEventParamsSchema,
type NodeEventResult,
NodeEventResultSchema,
type NodeMcpServerDescriptor,
NodeMcpServerDescriptorSchema,
type NodeMcpSessionClosedParams,
NodeMcpSessionClosedParamsSchema,
type NodeMcpSessionCloseEvent,
NodeMcpSessionCloseEventSchema,
type NodeMcpSessionInputEvent,
NodeMcpSessionInputEventSchema,
type NodeMcpSessionOpenEvent,
NodeMcpSessionOpenEventSchema,
type NodeMcpSessionOpenResultParams,
NodeMcpSessionOpenResultParamsSchema,
type NodeMcpSessionOutputParams,
NodeMcpSessionOutputParamsSchema,
type NodePendingDrainParams,
NodePendingDrainParamsSchema,
type NodePendingDrainResult,
@@ -395,6 +409,27 @@ export const validateNodeInvokeResultParams = ajv.compile<NodeInvokeResultParams
);
export const validateNodeEventParams = ajv.compile<NodeEventParams>(NodeEventParamsSchema);
export const validateNodeEventResult = ajv.compile<NodeEventResult>(NodeEventResultSchema);
export const validateNodeMcpServerDescriptor = ajv.compile<NodeMcpServerDescriptor>(
NodeMcpServerDescriptorSchema,
);
export const validateNodeMcpSessionOpenEvent = ajv.compile<NodeMcpSessionOpenEvent>(
NodeMcpSessionOpenEventSchema,
);
export const validateNodeMcpSessionOpenResultParams = ajv.compile<NodeMcpSessionOpenResultParams>(
NodeMcpSessionOpenResultParamsSchema,
);
export const validateNodeMcpSessionInputEvent = ajv.compile<NodeMcpSessionInputEvent>(
NodeMcpSessionInputEventSchema,
);
export const validateNodeMcpSessionOutputParams = ajv.compile<NodeMcpSessionOutputParams>(
NodeMcpSessionOutputParamsSchema,
);
export const validateNodeMcpSessionCloseEvent = ajv.compile<NodeMcpSessionCloseEvent>(
NodeMcpSessionCloseEventSchema,
);
export const validateNodeMcpSessionClosedParams = ajv.compile<NodeMcpSessionClosedParams>(
NodeMcpSessionClosedParamsSchema,
);
export const validateNodePresenceAlivePayload = ajv.compile<NodePresenceAlivePayload>(
NodePresenceAlivePayloadSchema,
);

View File

@@ -1,4 +1,5 @@
import { Type } from "typebox";
import { NodeMcpServerDescriptorSchema } from "./nodes.js";
import { GatewayClientIdSchema, GatewayClientModeSchema, NonEmptyString } from "./primitives.js";
import { SnapshotSchema, StateVersionSchema } from "./snapshot.js";
@@ -37,6 +38,7 @@ export const ConnectParamsSchema = Type.Object(
caps: Type.Optional(Type.Array(NonEmptyString, { default: [] })),
commands: Type.Optional(Type.Array(NonEmptyString)),
permissions: Type.Optional(Type.Record(NonEmptyString, Type.Boolean())),
mcpServers: Type.Optional(Type.Array(NodeMcpServerDescriptorSchema)),
pathEnv: Type.Optional(Type.String()),
role: Type.Optional(NonEmptyString),
scopes: Type.Optional(Type.Array(NonEmptyString)),

View File

@@ -1,4 +1,5 @@
import { Type } from "typebox";
import { NODE_MCP_SERVER_STATUSES } from "../../../shared/node-mcp-types.js";
import { NonEmptyString } from "./primitives.js";
const NodePendingWorkTypeSchema = Type.String({
@@ -44,6 +45,20 @@ export const NodeEventResultSchema = Type.Object(
{ additionalProperties: false },
);
export const NodeMcpServerDescriptorSchema = Type.Object(
{
id: NonEmptyString,
displayName: Type.Optional(NonEmptyString),
provider: Type.Optional(NonEmptyString),
transport: Type.Optional(Type.Literal("stdio")),
source: Type.Optional(NonEmptyString),
status: Type.Optional(Type.String({ enum: [...NODE_MCP_SERVER_STATUSES] })),
requiredPermissions: Type.Optional(Type.Array(NonEmptyString)),
metadata: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
},
{ additionalProperties: false },
);
export const NodePairRequestParamsSchema = Type.Object(
{
nodeId: NonEmptyString,
@@ -56,6 +71,7 @@ export const NodePairRequestParamsSchema = Type.Object(
modelIdentifier: Type.Optional(NonEmptyString),
caps: Type.Optional(Type.Array(NonEmptyString)),
commands: Type.Optional(Type.Array(NonEmptyString)),
mcpServers: Type.Optional(Type.Array(NodeMcpServerDescriptorSchema)),
remoteIp: Type.Optional(NonEmptyString),
silent: Type.Optional(Type.Boolean()),
},
@@ -204,3 +220,75 @@ export const NodeInvokeRequestEventSchema = Type.Object(
},
{ additionalProperties: false },
);
const NodeMcpSessionErrorSchema = Type.Object(
{
code: Type.Optional(NonEmptyString),
message: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
);
export const NodeMcpSessionOpenEventSchema = Type.Object(
{
sessionId: NonEmptyString,
nodeId: NonEmptyString,
serverId: NonEmptyString,
timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
export const NodeMcpSessionOpenResultParamsSchema = Type.Object(
{
sessionId: NonEmptyString,
nodeId: NonEmptyString,
serverId: NonEmptyString,
ok: Type.Boolean(),
pid: Type.Optional(Type.Integer({ minimum: 0 })),
error: Type.Optional(NodeMcpSessionErrorSchema),
},
{ additionalProperties: false },
);
export const NodeMcpSessionInputEventSchema = Type.Object(
{
sessionId: NonEmptyString,
nodeId: NonEmptyString,
seq: Type.Integer({ minimum: 0 }),
dataBase64: NonEmptyString,
},
{ additionalProperties: false },
);
export const NodeMcpSessionOutputParamsSchema = Type.Object(
{
sessionId: NonEmptyString,
nodeId: NonEmptyString,
seq: Type.Integer({ minimum: 0 }),
stream: Type.String({ enum: ["stdout", "stderr"] }),
dataBase64: NonEmptyString,
},
{ additionalProperties: false },
);
export const NodeMcpSessionCloseEventSchema = Type.Object(
{
sessionId: NonEmptyString,
nodeId: NonEmptyString,
reason: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
);
export const NodeMcpSessionClosedParamsSchema = Type.Object(
{
sessionId: NonEmptyString,
nodeId: NonEmptyString,
ok: Type.Boolean(),
exitCode: Type.Optional(Type.Union([Type.Integer(), Type.Null()])),
signal: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
error: Type.Optional(NodeMcpSessionErrorSchema),
},
{ additionalProperties: false },
);

View File

@@ -141,6 +141,13 @@ import {
NodeDescribeParamsSchema,
NodeEventParamsSchema,
NodeEventResultSchema,
NodeMcpServerDescriptorSchema,
NodeMcpSessionClosedParamsSchema,
NodeMcpSessionCloseEventSchema,
NodeMcpSessionInputEventSchema,
NodeMcpSessionOpenEventSchema,
NodeMcpSessionOpenResultParamsSchema,
NodeMcpSessionOutputParamsSchema,
NodePendingDrainParamsSchema,
NodePendingDrainResultSchema,
NodePendingEnqueueParamsSchema,
@@ -248,6 +255,13 @@ export const ProtocolSchemas = {
NodeInvokeResultParams: NodeInvokeResultParamsSchema,
NodeEventParams: NodeEventParamsSchema,
NodeEventResult: NodeEventResultSchema,
NodeMcpServerDescriptor: NodeMcpServerDescriptorSchema,
NodeMcpSessionOpenEvent: NodeMcpSessionOpenEventSchema,
NodeMcpSessionOpenResultParams: NodeMcpSessionOpenResultParamsSchema,
NodeMcpSessionInputEvent: NodeMcpSessionInputEventSchema,
NodeMcpSessionOutputParams: NodeMcpSessionOutputParamsSchema,
NodeMcpSessionCloseEvent: NodeMcpSessionCloseEventSchema,
NodeMcpSessionClosedParams: NodeMcpSessionClosedParamsSchema,
NodePresenceAlivePayload: NodePresenceAlivePayloadSchema,
NodePresenceAliveReason: NodePresenceAliveReasonSchema,
NodePendingDrainParams: NodePendingDrainParamsSchema,

View File

@@ -35,6 +35,13 @@ export type NodeInvokeParams = SchemaType<"NodeInvokeParams">;
export type NodeInvokeResultParams = SchemaType<"NodeInvokeResultParams">;
export type NodeEventParams = SchemaType<"NodeEventParams">;
export type NodeEventResult = SchemaType<"NodeEventResult">;
export type NodeMcpServerDescriptor = SchemaType<"NodeMcpServerDescriptor">;
export type NodeMcpSessionOpenEvent = SchemaType<"NodeMcpSessionOpenEvent">;
export type NodeMcpSessionOpenResultParams = SchemaType<"NodeMcpSessionOpenResultParams">;
export type NodeMcpSessionInputEvent = SchemaType<"NodeMcpSessionInputEvent">;
export type NodeMcpSessionOutputParams = SchemaType<"NodeMcpSessionOutputParams">;
export type NodeMcpSessionCloseEvent = SchemaType<"NodeMcpSessionCloseEvent">;
export type NodeMcpSessionClosedParams = SchemaType<"NodeMcpSessionClosedParams">;
export type NodePresenceAlivePayload = SchemaType<"NodePresenceAlivePayload">;
export type NodePresenceAliveReason = SchemaType<"NodePresenceAliveReason">;
export type NodePendingDrainParams = SchemaType<"NodePendingDrainParams">;

View File

@@ -21,10 +21,12 @@ describe("gateway role policy", () => {
test("authorizes roles against node vs operator methods", () => {
expect(isRoleAuthorizedForMethod("node", "node.event")).toBe(true);
expect(isRoleAuthorizedForMethod("node", "node.mcp.session.output")).toBe(true);
expect(isRoleAuthorizedForMethod("node", "node.pending.drain")).toBe(true);
expect(isRoleAuthorizedForMethod("node", "status")).toBe(false);
expect(isRoleAuthorizedForMethod("operator", "status")).toBe(true);
expect(isRoleAuthorizedForMethod("operator", "node.pending.drain")).toBe(false);
expect(isRoleAuthorizedForMethod("operator", "node.event")).toBe(false);
expect(isRoleAuthorizedForMethod("operator", "node.mcp.session.output")).toBe(false);
});
});

View File

@@ -127,6 +127,9 @@ const BASE_METHODS = [
"node.pending.pull",
"node.pending.ack",
"node.invoke.result",
"node.mcp.session.open.result",
"node.mcp.session.output",
"node.mcp.session.closed",
"node.event",
"node.canvas.capability.refresh",
"cron.list",
@@ -172,6 +175,9 @@ export const GATEWAY_EVENTS = [
"node.pair.requested",
"node.pair.resolved",
"node.invoke.request",
"node.mcp.session.open",
"node.mcp.session.input",
"node.mcp.session.close",
"device.pair.requested",
"device.pair.resolved",
"voicewake.changed",

View File

@@ -0,0 +1,73 @@
import { describe, expect, it, vi } from "vitest";
const mocks = vi.hoisted(() => ({
requestNodePairing: vi.fn(),
}));
vi.mock("../../infra/node-pairing.js", async (importOriginal) => ({
...(await importOriginal<typeof import("../../infra/node-pairing.js")>()),
requestNodePairing: mocks.requestNodePairing,
}));
import { nodeHandlers } from "./nodes.js";
describe("node.pair.request", () => {
it("forwards declared MCP servers into the pairing request", async () => {
mocks.requestNodePairing.mockResolvedValue({
status: "pending",
created: true,
request: {
id: "pair-1",
nodeId: "mac-node",
},
});
const respond = vi.fn();
const broadcast = vi.fn();
await nodeHandlers["node.pair.request"]({
req: { type: "req", id: "req-pair", method: "node.pair.request" },
params: {
nodeId: "mac-node",
displayName: "Mac",
platform: "macOS",
caps: ["mcpHost"],
mcpServers: [
{
id: "computer-use",
displayName: "Computer Use",
status: "ready",
transport: "stdio",
},
],
},
respond,
context: { broadcast } as never,
client: null,
isWebchatConnect: () => false,
});
expect(mocks.requestNodePairing).toHaveBeenCalledWith(
expect.objectContaining({
nodeId: "mac-node",
mcpServers: [
{
id: "computer-use",
displayName: "Computer Use",
status: "ready",
transport: "stdio",
},
],
}),
);
expect(broadcast).toHaveBeenCalledWith(
"node.pair.requested",
expect.objectContaining({ nodeId: "mac-node" }),
{ dropIfSlow: true },
);
expect(respond).toHaveBeenCalledWith(
true,
expect.objectContaining({ status: "pending" }),
undefined,
);
});
});

View File

@@ -42,6 +42,9 @@ import {
validateNodeEventParams,
validateNodeInvokeParams,
validateNodeListParams,
validateNodeMcpSessionClosedParams,
validateNodeMcpSessionOpenResultParams,
validateNodeMcpSessionOutputParams,
validateNodePendingAckParams,
validateNodePairApproveParams,
validateNodePairListParams,
@@ -544,6 +547,7 @@ export const nodeHandlers: GatewayRequestHandlers = {
modelIdentifier: p.modelIdentifier,
caps: p.caps,
commands: p.commands,
mcpServers: p.mcpServers,
permissions: p.permissions,
remoteIp: p.remoteIp,
silent: p.silent,
@@ -1166,6 +1170,94 @@ export const nodeHandlers: GatewayRequestHandlers = {
});
},
"node.invoke.result": handleNodeInvokeResult,
"node.mcp.session.open.result": async ({ params, respond, context, client }) => {
if (!validateNodeMcpSessionOpenResultParams(params)) {
respondInvalidParams({
respond,
method: "node.mcp.session.open.result",
validator: validateNodeMcpSessionOpenResultParams,
});
return;
}
const p = params as {
sessionId: string;
nodeId: string;
serverId: string;
ok: boolean;
pid?: number;
error?: { code?: string; message?: string } | null;
};
const callerNodeId = client?.connect?.device?.id ?? client?.connect?.client?.id;
if (callerNodeId && callerNodeId !== p.nodeId) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "nodeId mismatch"));
return;
}
const handled = context.nodeRegistry.handleMcpSessionOpenResult({
sessionId: p.sessionId,
nodeId: p.nodeId,
serverId: p.serverId,
ok: p.ok,
pid: p.pid,
error: p.error ?? null,
});
respond(true, { ok: true, ignored: !handled }, undefined);
},
"node.mcp.session.output": async ({ params, respond, context, client }) => {
if (!validateNodeMcpSessionOutputParams(params)) {
respondInvalidParams({
respond,
method: "node.mcp.session.output",
validator: validateNodeMcpSessionOutputParams,
});
return;
}
const p = params as {
sessionId: string;
nodeId: string;
seq: number;
stream: "stdout" | "stderr";
dataBase64: string;
};
const callerNodeId = client?.connect?.device?.id ?? client?.connect?.client?.id;
if (callerNodeId && callerNodeId !== p.nodeId) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "nodeId mismatch"));
return;
}
const handled = context.nodeRegistry.handleMcpSessionOutput(p);
respond(true, { ok: true, ignored: !handled }, undefined);
},
"node.mcp.session.closed": async ({ params, respond, context, client }) => {
if (!validateNodeMcpSessionClosedParams(params)) {
respondInvalidParams({
respond,
method: "node.mcp.session.closed",
validator: validateNodeMcpSessionClosedParams,
});
return;
}
const p = params as {
sessionId: string;
nodeId: string;
ok: boolean;
exitCode?: number | null;
signal?: string | null;
error?: { code?: string; message?: string } | null;
};
const callerNodeId = client?.connect?.device?.id ?? client?.connect?.client?.id;
if (callerNodeId && callerNodeId !== p.nodeId) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "nodeId mismatch"));
return;
}
const handled = context.nodeRegistry.handleMcpSessionClosed({
sessionId: p.sessionId,
nodeId: p.nodeId,
ok: p.ok,
exitCode: p.exitCode,
signal: p.signal,
error: p.error ?? null,
});
respond(true, { ok: true, ignored: !handled }, undefined);
},
"node.event": async ({ params, respond, context, client }) => {
if (!validateNodeEventParams(params)) {
respondInvalidParams({

View File

@@ -1,3 +1,7 @@
import {
NodeMcpClientTransport,
type NodeMcpClientTransportOptions,
} from "./node-mcp-client-transport.js";
import { NodeRegistry } from "./node-registry.js";
import {
createSessionEventSubscriberRegistry,
@@ -27,6 +31,8 @@ export function createGatewayNodeSessionRuntime(params: {
params.broadcast("voicewake.changed", { triggers }, { dropIfSlow: true });
};
const hasMobileNodeConnected = () => hasConnectedMobileNode(nodeRegistry);
const createNodeMcpClientTransport = (options: NodeMcpClientTransportOptions) =>
new NodeMcpClientTransport(nodeRegistry, options);
return {
nodeRegistry,
@@ -39,6 +45,7 @@ export function createGatewayNodeSessionRuntime(params: {
nodeUnsubscribe: nodeSubscriptions.unsubscribe,
nodeUnsubscribeAll: nodeSubscriptions.unsubscribeAll,
broadcastVoiceWakeChanged,
createNodeMcpClientTransport,
hasMobileNodeConnected,
};
}

View File

@@ -1282,6 +1282,7 @@ export function attachGatewayWsMessageHandler(params: {
});
}
connectParams.commands = reconciliation.effectiveCommands;
connectParams.mcpServers = reconciliation.effectiveMcpServers;
}
const shouldTrackPresence = !isGatewayCliClient(connectParams.client);

View File

@@ -6,11 +6,16 @@ export const OPERATOR_PAIRING_SCOPE: NodeApprovalScope = "operator.pairing";
export const OPERATOR_WRITE_SCOPE: NodeApprovalScope = "operator.write";
export const OPERATOR_ADMIN_SCOPE: NodeApprovalScope = "operator.admin";
export function resolveNodePairApprovalScopes(commands: unknown): NodeApprovalScope[] {
export function resolveNodePairApprovalScopes(
commands: unknown,
opts?: { mcpServers?: unknown },
): NodeApprovalScope[] {
const normalized = Array.isArray(commands)
? commands.filter((command): command is string => typeof command === "string")
: [];
const hasMcpServers = Array.isArray(opts?.mcpServers) && opts.mcpServers.length > 0;
if (
hasMcpServers ||
normalized.some((command) => NODE_SYSTEM_RUN_COMMANDS.some((allowed) => allowed === command))
) {
return [OPERATOR_PAIRING_SCOPE, OPERATOR_ADMIN_SCOPE];

View File

@@ -238,6 +238,27 @@ describe("node pairing tokens", () => {
});
await expect(getPairedNode("node-1", baseDir)).resolves.toBeNull();
const mcpHostRequest = await requestNodePairing(
{
nodeId: "node-mcp",
platform: "darwin",
mcpServers: [{ id: "computer-use", displayName: "Computer Use" }],
},
baseDir,
);
await expect(
approveNodePairing(
mcpHostRequest.request.requestId,
{ callerScopes: ["operator.pairing"] },
baseDir,
),
).resolves.toEqual({
status: "forbidden",
missingScope: "operator.admin",
});
await expect(getPairedNode("node-mcp", baseDir)).resolves.toBeNull();
const commandlessRequest = await requestNodePairing(
{
nodeId: "node-2",

View File

@@ -1,4 +1,8 @@
import { randomUUID } from "node:crypto";
import {
normalizeNodeMcpServerDescriptors,
type NodeMcpServerDescriptor,
} from "../shared/node-mcp-types.js";
import { resolveMissingRequestedScope } from "../shared/operator-scope-compat.js";
import { normalizeArrayBackedTrimmedStringList } from "../shared/string-normalization.js";
import { type NodeApprovalScope, resolveNodePairApprovalScopes } from "./node-pairing-authz.js";
@@ -25,6 +29,7 @@ export type NodeDeclaredSurface = {
modelIdentifier?: string;
caps?: string[];
commands?: string[];
mcpServers?: NodeMcpServerDescriptor[];
permissions?: Record<string, boolean>;
remoteIp?: string;
};
@@ -86,6 +91,7 @@ function buildPendingNodePairingRequest(params: {
modelIdentifier: params.req.modelIdentifier,
caps: normalizeArrayBackedTrimmedStringList(params.req.caps),
commands: normalizeArrayBackedTrimmedStringList(params.req.commands),
mcpServers: normalizeNodeMcpServerDescriptors(params.req.mcpServers),
permissions: params.req.permissions,
remoteIp: params.req.remoteIp,
silent: params.req.silent,
@@ -108,6 +114,7 @@ function refreshPendingNodePairingRequest(
modelIdentifier: incoming.modelIdentifier ?? existing.modelIdentifier,
caps: normalizeArrayBackedTrimmedStringList(incoming.caps) ?? existing.caps,
commands: normalizeArrayBackedTrimmedStringList(incoming.commands) ?? existing.commands,
mcpServers: normalizeNodeMcpServerDescriptors(incoming.mcpServers) ?? existing.mcpServers,
permissions: incoming.permissions ?? existing.permissions,
remoteIp: incoming.remoteIp ?? existing.remoteIp,
// Preserve interactive visibility if either request needs attention.
@@ -120,7 +127,7 @@ function resolveNodeApprovalRequiredScopes(
pending: NodePairingPendingRequest,
): NodeApprovalScope[] {
const commands = Array.isArray(pending.commands) ? pending.commands : [];
return resolveNodePairApprovalScopes(commands);
return resolveNodePairApprovalScopes(commands, { mcpServers: pending.mcpServers });
}
function toPendingNodePairingEntry(pending: NodePairingPendingRequest): NodePairingPendingEntry {
@@ -258,6 +265,7 @@ export async function approveNodePairing(
modelIdentifier: pending.modelIdentifier,
caps: pending.caps,
commands: pending.commands,
mcpServers: pending.mcpServers,
permissions: pending.permissions,
remoteIp: pending.remoteIp,
createdAtMs: existing?.createdAtMs ?? now,
@@ -345,6 +353,7 @@ export async function updatePairedNodeMetadata(
remoteIp: patch.remoteIp ?? existing.remoteIp,
caps: patch.caps ?? existing.caps,
commands: patch.commands ?? existing.commands,
mcpServers: patch.mcpServers ?? existing.mcpServers,
bins: patch.bins ?? existing.bins,
permissions: patch.permissions ?? existing.permissions,
lastConnectedAtMs: patch.lastConnectedAtMs ?? existing.lastConnectedAtMs,

View File

@@ -1,3 +1,5 @@
import type { NodeMcpServerDescriptor } from "./node-mcp-types.js";
export type NodeListNode = {
nodeId: string;
displayName?: string;
@@ -13,6 +15,7 @@ export type NodeListNode = {
pathEnv?: string;
caps?: string[];
commands?: string[];
mcpServers?: NodeMcpServerDescriptor[];
permissions?: Record<string, boolean>;
paired?: boolean;
connected?: boolean;
@@ -33,6 +36,7 @@ export type PendingRequest = {
remoteIp?: string;
ts: number;
commands?: string[];
mcpServers?: NodeMcpServerDescriptor[];
requiredApproveScopes?: Array<"operator.pairing" | "operator.write" | "operator.admin">;
};
@@ -46,6 +50,7 @@ export type PairedNode = {
uiVersion?: string;
remoteIp?: string;
permissions?: Record<string, boolean>;
mcpServers?: NodeMcpServerDescriptor[];
createdAtMs?: number;
approvedAtMs?: number;
lastConnectedAtMs?: number;

View File

@@ -0,0 +1,95 @@
export const NODE_MCP_SERVER_STATUSES = [
"ready",
"disabled",
"missing_permissions",
"missing_backend",
"unsupported",
"error",
] as const;
export type NodeMcpServerStatus = (typeof NODE_MCP_SERVER_STATUSES)[number];
export type NodeMcpServerDescriptor = {
id: string;
displayName?: string;
provider?: string;
transport?: "stdio";
source?: string;
status?: NodeMcpServerStatus;
requiredPermissions?: string[];
metadata?: Record<string, unknown>;
};
function nonEmptyTrimmedString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
function normalizeStringList(value: unknown): string[] | undefined {
if (!Array.isArray(value)) {
return undefined;
}
const values = new Set<string>();
for (const item of value) {
const trimmed = nonEmptyTrimmedString(item);
if (trimmed) {
values.add(trimmed);
}
}
return [...values].toSorted((left, right) => left.localeCompare(right));
}
function isNodeMcpServerStatus(value: unknown): value is NodeMcpServerStatus {
return (
typeof value === "string" && (NODE_MCP_SERVER_STATUSES as readonly string[]).includes(value)
);
}
export function normalizeNodeMcpServerDescriptors(
value: unknown,
): NodeMcpServerDescriptor[] | undefined {
if (!Array.isArray(value)) {
return undefined;
}
const descriptors: NodeMcpServerDescriptor[] = [];
const seen = new Set<string>();
for (const item of value) {
if (!item || typeof item !== "object") {
continue;
}
const raw = item as Record<string, unknown>;
const id = nonEmptyTrimmedString(raw.id);
if (!id || seen.has(id)) {
continue;
}
seen.add(id);
const displayName = nonEmptyTrimmedString(raw.displayName);
const provider = nonEmptyTrimmedString(raw.provider);
const source = nonEmptyTrimmedString(raw.source);
const requiredPermissions = normalizeStringList(raw.requiredPermissions);
const transport = raw.transport === "stdio" ? "stdio" : undefined;
const status = isNodeMcpServerStatus(raw.status) ? raw.status : undefined;
const metadata =
raw.metadata && typeof raw.metadata === "object" && !Array.isArray(raw.metadata)
? (raw.metadata as Record<string, unknown>)
: undefined;
descriptors.push({
id,
...(displayName ? { displayName } : {}),
...(provider ? { provider } : {}),
...(transport ? { transport } : {}),
...(source ? { source } : {}),
...(status ? { status } : {}),
...(requiredPermissions ? { requiredPermissions } : {}),
...(metadata ? { metadata } : {}),
});
}
return descriptors.length > 0 ? descriptors : undefined;
}
export function normalizeNodeMcpServerIds(value: unknown): string[] {
return (normalizeNodeMcpServerDescriptors(value) ?? []).map((descriptor) => descriptor.id);
}