diff --git a/CHANGELOG.md b/CHANGELOG.md index 53dbaa6649f..078929b05a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ Docs: https://docs.openclaw.ai - Telegram/pairing: ignore self-authored DM `message` updates so bot-pinned status cards and similar service updates do not trigger bogus pairing requests or re-enter inbound dispatch. (#54530) thanks @huntharo - Mattermost/replies: keep pairing replies, slash-command fallback replies, and model-picker messages on the resolved config path so `exec:` SecretRef bot tokens work across all outbound reply branches. (#48347) thanks @mathiasnagler. - Microsoft Teams/config: accept the existing `welcomeCard`, `groupWelcomeCard`, `promptStarters`, and feedback/reflection keys in strict config validation so already-supported Teams runtime settings stop failing schema checks. (#54679) Thanks @gumclaw. +- MCP/channels: add a Gateway-backed channel MCP bridge with Codex/Claude-facing conversation tools, Claude channel notifications, and safer stdio bridge lifecycle handling for reconnects and routed session discovery. - Plugins/SDK: thread `moduleUrl` through plugin-sdk alias resolution so user-installed plugins outside the openclaw directory (e.g. `~/.openclaw/extensions/`) correctly resolve `openclaw/plugin-sdk/*` subpath imports, and gate `plugin-sdk:check-exports` in `release:check`. (#54283) Thanks @xieyongliang. - Config/web fetch: allow the documented `tools.web.fetch.maxResponseBytes` setting in runtime schema validation so valid configs no longer fail with unrecognized-key errors. (#53401) Thanks @erhhung. - Message tool/buttons: keep the shared `buttons` schema optional in merged tool definitions so plain `action=send` calls stop failing validation when no buttons are provided. (#54418) Thanks @adzendo. diff --git a/docs/cli/acp.md b/docs/cli/acp.md index d30deb49eca..17c23527594 100644 --- a/docs/cli/acp.md +++ b/docs/cli/acp.md @@ -17,6 +17,10 @@ over WebSocket. It keeps ACP sessions mapped to Gateway session keys. runtime. It focuses on session routing, prompt delivery, and basic streaming updates. +If you want an external MCP client to talk directly to OpenClaw channel +conversations instead of hosting an ACP harness session, use +[`openclaw mcp serve`](/cli/mcp) instead. + ## Compatibility Matrix | ACP area | Status | Notes | diff --git a/docs/cli/index.md b/docs/cli/index.md index 7c5b6201b15..830b05450d3 100644 --- a/docs/cli/index.md +++ b/docs/cli/index.md @@ -27,6 +27,7 @@ This page describes the current CLI behavior. If commands change, update this do - [`agent`](/cli/agent) - [`agents`](/cli/agents) - [`acp`](/cli/acp) +- [`mcp`](/cli/mcp) - [`status`](/cli/status) - [`health`](/cli/health) - [`sessions`](/cli/sessions) @@ -155,6 +156,7 @@ openclaw [--dev] [--profile ] add delete acp + mcp status health sessions diff --git a/docs/cli/mcp.md b/docs/cli/mcp.md new file mode 100644 index 00000000000..2a418bfaedd --- /dev/null +++ b/docs/cli/mcp.md @@ -0,0 +1,251 @@ +--- +summary: "Expose OpenClaw channel conversations over MCP and manage saved MCP server definitions" +read_when: + - Connecting Codex, Claude Code, or another MCP client to OpenClaw-backed channels + - Running `openclaw mcp serve` + - Managing OpenClaw-saved MCP server definitions +title: "mcp" +--- + +# mcp + +`openclaw mcp` has two jobs: + +- run a Gateway-backed MCP bridge with `openclaw mcp serve` +- manage OpenClaw-saved MCP server definitions with `list`, `show`, `set`, and `unset` + +Use `openclaw mcp serve` when an external MCP client should talk directly to +OpenClaw channel conversations. + +Use [`openclaw acp`](/cli/acp) when OpenClaw should host a coding harness +session itself and route that runtime through ACP. + +## What `serve` does + +`openclaw mcp serve` starts a stdio MCP server that connects to a local or +remote OpenClaw Gateway over WebSocket. + +The bridge uses existing Gateway session route metadata to expose channel-backed +conversations. In practice, that means a conversation appears when OpenClaw has +session state with a known channel route such as: + +- `channel` +- `to` +- optional `accountId` +- optional `threadId` + +This gives MCP clients one place to: + +- list recent routed conversations +- read recent transcript history +- wait for new inbound events +- send a reply back through the same route +- see approval requests that arrive while the bridge is connected + +## Usage + +```bash +# Local Gateway +openclaw mcp serve + +# Remote Gateway +openclaw mcp serve --url wss://gateway-host:18789 --token-file ~/.openclaw/gateway.token + +# Remote Gateway with password auth +openclaw mcp serve --url wss://gateway-host:18789 --password-file ~/.openclaw/gateway.password + +# Enable verbose bridge logs +openclaw mcp serve --verbose + +# Disable Claude-specific push notifications +openclaw mcp serve --claude-channel-mode off +``` + +## Bridge tools + +The current bridge exposes these MCP tools: + +- `conversations_list` +- `conversation_get` +- `messages_read` +- `attachments_fetch` +- `events_poll` +- `events_wait` +- `messages_send` +- `permissions_list_open` +- `permissions_respond` + +### `conversations_list` + +Lists recent session-backed conversations that already have route metadata in +Gateway session state. + +Useful filters: + +- `limit` +- `search` +- `channel` +- `includeDerivedTitles` +- `includeLastMessage` + +### `conversation_get` + +Returns one conversation by `session_key`. + +### `messages_read` + +Reads recent transcript messages for one session-backed conversation. + +### `attachments_fetch` + +Extracts non-text message content blocks from one transcript message. This is a +metadata view over transcript content, not a standalone durable attachment blob +store. + +### `events_poll` + +Reads queued live events since a numeric cursor. + +### `events_wait` + +Long-polls until the next matching queued event arrives or a timeout expires. + +### `messages_send` + +Sends text back through the same route already recorded on the session. + +Current behavior: + +- requires an existing conversation route +- uses the session's channel, recipient, account id, and thread id +- sends text only + +### `permissions_list_open` + +Lists pending exec/plugin approval requests the bridge has observed since it +connected to the Gateway. + +### `permissions_respond` + +Resolves one pending exec/plugin approval request with: + +- `allow-once` +- `allow-always` +- `deny` + +## Event model + +The bridge keeps an in-memory event queue while it is connected. + +Current event types: + +- `message` +- `exec_approval_requested` +- `exec_approval_resolved` +- `plugin_approval_requested` +- `plugin_approval_resolved` +- `claude_permission_request` + +Important limits: + +- the queue is live-only; it starts when the MCP bridge starts +- `events_poll` and `events_wait` do not replay older Gateway history by + themselves +- durable backlog should be read with `messages_read` + +## Claude channel notifications + +The bridge can also expose Claude-specific channel notifications. + +Flags: + +- `--claude-channel-mode off`: standard MCP tools only +- `--claude-channel-mode on`: enable Claude channel notifications +- `--claude-channel-mode auto`: current default; same bridge behavior as `on` + +When Claude channel mode is enabled, the server advertises Claude experimental +capabilities and can emit: + +- `notifications/claude/channel` +- `notifications/claude/channel/permission` + +Current bridge behavior: + +- inbound `user` transcript messages are forwarded as + `notifications/claude/channel` +- Claude permission requests received over MCP are tracked in-memory +- if the linked conversation later sends `yes abcde` or `no abcde`, the bridge + converts that to `notifications/claude/channel/permission` + +This is intentionally client-specific. Generic MCP clients should rely on the +standard polling tools. + +## MCP client config + +Example stdio client config: + +```json +{ + "mcpServers": { + "openclaw": { + "command": "openclaw", + "args": [ + "mcp", + "serve", + "--url", + "wss://gateway-host:18789", + "--token-file", + "/path/to/gateway.token" + ] + } + } +} +``` + +## Options + +`openclaw mcp serve` supports: + +- `--url `: Gateway WebSocket URL +- `--token `: Gateway token +- `--token-file `: read token from file +- `--password `: Gateway password +- `--password-file `: read password from file +- `--claude-channel-mode `: Claude notification mode +- `-v`, `--verbose`: verbose logs on stderr + +## Saved MCP server definitions + +OpenClaw also stores a lightweight MCP server registry in config for surfaces +that want OpenClaw-managed MCP definitions. + +Commands: + +- `openclaw mcp list` +- `openclaw mcp show [name]` +- `openclaw mcp set ` +- `openclaw mcp unset ` + +Examples: + +```bash +openclaw mcp list +openclaw mcp show context7 --json +openclaw mcp set context7 '{"command":"uvx","args":["context7-mcp"]}' +openclaw mcp unset context7 +``` + +These commands manage saved config only. They do not start the channel bridge. + +## Current limits + +This page documents the bridge as shipped today. + +Current limits: + +- conversation discovery depends on existing Gateway session route metadata +- no generic push protocol beyond the Claude-specific adapter +- no message edit or react tools yet +- no dedicated HTTP MCP transport yet +- `permissions_list_open` only includes approvals observed while the bridge is + connected diff --git a/docs/docs.json b/docs/docs.json index d281d193cb3..d5c3f9ae86b 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -1431,7 +1431,14 @@ }, { "group": "Utility", - "pages": ["cli/acp", "cli/clawbot", "cli/completion", "cli/dns", "cli/docs"] + "pages": [ + "cli/acp", + "cli/clawbot", + "cli/completion", + "cli/dns", + "cli/docs", + "cli/mcp" + ] } ] }, diff --git a/docs/tools/acp-agents.md b/docs/tools/acp-agents.md index 918cade6dc5..23b9f7b388f 100644 --- a/docs/tools/acp-agents.md +++ b/docs/tools/acp-agents.md @@ -15,6 +15,10 @@ title: "ACP Agents" If you ask OpenClaw in plain language to "run this in Codex" or "start Claude Code in a thread", OpenClaw should route that request to the ACP runtime (not the native sub-agent runtime). +If you want Codex or Claude Code to connect as an external MCP client directly +to existing OpenClaw channel conversations, use [`openclaw mcp serve`](/cli/mcp) +instead of ACP. + ## Fast operator flow Use this when you want a practical `/acp` runbook: diff --git a/src/cli/mcp-cli.test.ts b/src/cli/mcp-cli.test.ts index 57e9f16e0d7..6531636bc37 100644 --- a/src/cli/mcp-cli.test.ts +++ b/src/cli/mcp-cli.test.ts @@ -9,11 +9,16 @@ import { createCliRuntimeCapture } from "./test-runtime-capture.js"; const { defaultRuntime, resetRuntimeCapture } = createCliRuntimeCapture(); const mockLog = defaultRuntime.log; const mockError = defaultRuntime.error; +const serveOpenClawChannelMcp = vi.fn(); vi.mock("../runtime.js", () => ({ defaultRuntime, })); +vi.mock("../mcp/channel-server.js", () => ({ + serveOpenClawChannelMcp, +})); + const tempDirs: string[] = []; async function createWorkspace(): Promise { @@ -74,4 +79,33 @@ describe("mcp cli", () => { ); }); }); + + it("starts the channel bridge with parsed serve options", async () => { + await withTempHome("openclaw-cli-mcp-home-", async () => { + const workspaceDir = await createWorkspace(); + const tokenFile = path.join(workspaceDir, "gateway.token"); + vi.spyOn(process, "cwd").mockReturnValue(workspaceDir); + await fs.writeFile(tokenFile, "secret-token\n", "utf-8"); + + await runMcpCommand([ + "mcp", + "serve", + "--url", + "ws://127.0.0.1:18789", + "--token-file", + tokenFile, + "--claude-channel-mode", + "on", + "--verbose", + ]); + + expect(serveOpenClawChannelMcp).toHaveBeenCalledWith({ + gatewayUrl: "ws://127.0.0.1:18789", + gatewayToken: "secret-token", + gatewayPassword: undefined, + claudeChannelMode: "on", + verbose: true, + }); + }); + }); }); diff --git a/src/cli/mcp-cli.ts b/src/cli/mcp-cli.ts index 677b3db28a9..f013e973b7a 100644 --- a/src/cli/mcp-cli.ts +++ b/src/cli/mcp-cli.ts @@ -1,10 +1,12 @@ import { Command } from "commander"; +import { readSecretFromFile } from "../acp/secret-file.js"; import { parseConfigValue } from "../auto-reply/reply/config-value.js"; import { listConfiguredMcpServers, setConfiguredMcpServer, unsetConfiguredMcpServer, } from "../config/mcp-config.js"; +import { serveOpenClawChannelMcp } from "../mcp/channel-server.js"; import { defaultRuntime } from "../runtime.js"; function fail(message: string): never { @@ -17,8 +19,91 @@ function printJson(value: unknown): void { defaultRuntime.writeJson(value); } +function resolveSecretOption(params: { + direct?: string; + file?: string; + directFlag: string; + fileFlag: string; + label: string; +}) { + const direct = params.direct?.trim(); + const file = params.file?.trim(); + if (direct && file) { + throw new Error(`Use either ${params.directFlag} or ${params.fileFlag} for ${params.label}.`); + } + if (file) { + return readSecretFromFile(file, params.label); + } + return direct || undefined; +} + +function warnSecretCliFlag(flag: "--token" | "--password") { + defaultRuntime.error( + `Warning: ${flag} can be exposed via process listings. Prefer ${flag}-file or environment variables.`, + ); +} + export function registerMcpCli(program: Command) { - const mcp = program.command("mcp").description("Manage OpenClaw MCP server config"); + const mcp = program.command("mcp").description("Manage OpenClaw MCP config and channel bridge"); + + mcp + .command("serve") + .description("Expose OpenClaw channels over MCP stdio") + .option("--url ", "Gateway WebSocket URL (defaults to gateway.remote.url when configured)") + .option("--token ", "Gateway token (if required)") + .option("--token-file ", "Read gateway token from file") + .option("--password ", "Gateway password (if required)") + .option("--password-file ", "Read gateway password from file") + .option( + "--claude-channel-mode ", + "Claude channel notification mode: auto, on, or off", + "auto", + ) + .option("-v, --verbose", "Verbose logging to stderr", false) + .action(async (opts) => { + try { + const gatewayToken = resolveSecretOption({ + direct: opts.token as string | undefined, + file: opts.tokenFile as string | undefined, + directFlag: "--token", + fileFlag: "--token-file", + label: "Gateway token", + }); + const gatewayPassword = resolveSecretOption({ + direct: opts.password as string | undefined, + file: opts.passwordFile as string | undefined, + directFlag: "--password", + fileFlag: "--password-file", + label: "Gateway password", + }); + if (opts.token) { + warnSecretCliFlag("--token"); + } + if (opts.password) { + warnSecretCliFlag("--password"); + } + const claudeChannelMode = String(opts.claudeChannelMode ?? "auto") + .trim() + .toLowerCase(); + if ( + claudeChannelMode !== "auto" && + claudeChannelMode !== "on" && + claudeChannelMode !== "off" + ) { + throw new Error("Invalid --claude-channel-mode value. Use auto, on, or off."); + } + await serveOpenClawChannelMcp({ + gatewayUrl: opts.url as string | undefined, + gatewayToken, + gatewayPassword, + claudeChannelMode, + verbose: Boolean(opts.verbose), + }); + } catch (err) { + defaultRuntime.error(String(err)); + defaultRuntime.exit(1); + } + }); mcp .command("list") diff --git a/src/cli/program/command-registry.ts b/src/cli/program/command-registry.ts index 43b57f638ce..11980d3e3d9 100644 --- a/src/cli/program/command-registry.ts +++ b/src/cli/program/command-registry.ts @@ -151,7 +151,7 @@ const coreEntries: CoreCliEntry[] = [ commands: [ { name: "mcp", - description: "Manage embedded Pi MCP servers", + description: "Manage OpenClaw MCP config and channel bridge", hasSubcommands: true, }, ], diff --git a/src/mcp/channel-server.test.ts b/src/mcp/channel-server.test.ts new file mode 100644 index 00000000000..41c3b84b5ce --- /dev/null +++ b/src/mcp/channel-server.test.ts @@ -0,0 +1,462 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { Client } from "@modelcontextprotocol/sdk/client/index.js"; +import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js"; +import { afterEach, describe, expect, test, vi } from "vitest"; +import { z } from "zod"; +import { testState } from "../gateway/test-helpers.mocks.js"; +import { + createGatewaySuiteHarness, + installGatewayTestHooks, + writeSessionStore, +} from "../gateway/test-helpers.server.js"; +import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import { createOpenClawChannelMcpServer, OpenClawChannelBridge } from "./channel-server.js"; + +installGatewayTestHooks(); + +const ClaudeChannelNotificationSchema = z.object({ + method: z.literal("notifications/claude/channel"), + params: z.object({ + content: z.string(), + meta: z.record(z.string(), z.string()), + }), +}); + +const ClaudePermissionNotificationSchema = z.object({ + method: z.literal("notifications/claude/channel/permission"), + params: z.object({ + request_id: z.string(), + behavior: z.enum(["allow", "deny"]), + }), +}); + +const cleanupDirs: string[] = []; + +afterEach(async () => { + await Promise.all( + cleanupDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })), + ); +}); + +async function createSessionStoreFile(): Promise { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mcp-channel-")); + cleanupDirs.push(dir); + const storePath = path.join(dir, "sessions.json"); + testState.sessionStorePath = storePath; + return storePath; +} + +async function seedSession(params: { + storePath: string; + sessionKey: string; + sessionId: string; + route: { + channel: string; + to: string; + accountId?: string; + threadId?: string | number; + }; + entryOverrides?: Record; + transcriptMessages: Array<{ id: string; message: Record }>; +}) { + const transcriptPath = path.join(path.dirname(params.storePath), `${params.sessionId}.jsonl`); + await writeSessionStore({ + entries: { + [params.sessionKey.split(":").at(-1) ?? "main"]: { + sessionId: params.sessionId, + sessionFile: transcriptPath, + updatedAt: Date.now(), + lastChannel: params.route.channel, + lastTo: params.route.to, + lastAccountId: params.route.accountId, + lastThreadId: params.route.threadId, + ...params.entryOverrides, + }, + }, + storePath: params.storePath, + }); + await fs.writeFile( + transcriptPath, + [ + JSON.stringify({ type: "session", version: 1, id: params.sessionId }), + ...params.transcriptMessages.map((entry) => JSON.stringify(entry)), + ].join("\n"), + "utf-8", + ); + return transcriptPath; +} + +async function connectMcp(params: { + gatewayUrl: string; + gatewayToken: string; + claudeChannelMode?: "auto" | "on" | "off"; +}) { + const serverHarness = await createOpenClawChannelMcpServer({ + gatewayUrl: params.gatewayUrl, + gatewayToken: params.gatewayToken, + claudeChannelMode: params.claudeChannelMode ?? "auto", + }); + const client = new Client({ name: "mcp-test-client", version: "1.0.0" }); + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + await serverHarness.server.connect(serverTransport); + await client.connect(clientTransport); + await serverHarness.start(); + return { + client, + bridge: serverHarness.bridge, + close: async () => { + await client.close(); + await serverHarness.close(); + }, + }; +} + +describe("openclaw channel mcp server", () => { + test("lists conversations, reads messages, and waits for events", async () => { + const storePath = await createSessionStoreFile(); + const sessionKey = "agent:main:main"; + await seedSession({ + storePath, + sessionKey, + sessionId: "sess-main", + route: { + channel: "telegram", + to: "-100123", + accountId: "acct-1", + threadId: 42, + }, + transcriptMessages: [ + { + id: "msg-1", + message: { + role: "assistant", + content: [{ type: "text", text: "hello from transcript" }], + timestamp: Date.now(), + }, + }, + ], + }); + + const harness = await createGatewaySuiteHarness(); + let mcp: Awaited> | null = null; + try { + mcp = await connectMcp({ + gatewayUrl: `ws://127.0.0.1:${harness.port}`, + gatewayToken: "test-gateway-token-1234567890", + }); + + const listed = (await mcp.client.callTool({ + name: "conversations_list", + arguments: {}, + })) as { + structuredContent?: { conversations?: Array> }; + }; + expect(listed.structuredContent?.conversations).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + sessionKey, + channel: "telegram", + to: "-100123", + accountId: "acct-1", + threadId: 42, + }), + ]), + ); + + const read = (await mcp.client.callTool({ + name: "messages_read", + arguments: { session_key: sessionKey, limit: 5 }, + })) as { + structuredContent?: { messages?: Array> }; + }; + expect(read.structuredContent?.messages?.[0]).toMatchObject({ + role: "assistant", + content: [{ type: "text", text: "hello from transcript" }], + }); + + const waitPromise = mcp.client.callTool({ + name: "events_wait", + arguments: { session_key: sessionKey, after_cursor: 0, timeout_ms: 5_000 }, + }) as Promise<{ + structuredContent?: { event?: Record }; + }>; + + emitSessionTranscriptUpdate({ + sessionFile: path.join(path.dirname(storePath), "sess-main.jsonl"), + sessionKey, + messageId: "msg-2", + message: { + role: "user", + content: [{ type: "text", text: "inbound live message" }], + timestamp: Date.now(), + }, + }); + + const waited = await waitPromise; + expect(waited.structuredContent?.event).toMatchObject({ + type: "message", + sessionKey, + messageId: "msg-2", + role: "user", + text: "inbound live message", + }); + } finally { + await mcp?.close(); + await harness.close(); + } + }); + + test("sendMessage normalizes route metadata for gateway send", async () => { + const bridge = new OpenClawChannelBridge({} as never, { + claudeChannelMode: "off", + verbose: false, + }); + const gatewayRequest = vi.fn().mockResolvedValue({ ok: true, channel: "telegram" }); + + ( + bridge as unknown as { + gateway: { request: typeof gatewayRequest; stopAndWait: () => Promise }; + readySettled: boolean; + resolveReady: () => void; + } + ).gateway = { + request: gatewayRequest, + stopAndWait: async () => {}, + }; + ( + bridge as unknown as { + readySettled: boolean; + resolveReady: () => void; + } + ).readySettled = true; + ( + bridge as unknown as { + resolveReady: () => void; + } + ).resolveReady(); + + vi.spyOn(bridge, "getConversation").mockResolvedValue({ + sessionKey: "agent:main:main", + channel: "telegram", + to: "-100123", + accountId: "acct-1", + threadId: 42, + }); + + await bridge.sendMessage({ + sessionKey: "agent:main:main", + text: "reply from mcp", + }); + + expect(gatewayRequest).toHaveBeenCalledWith( + "send", + expect.objectContaining({ + to: "-100123", + channel: "telegram", + accountId: "acct-1", + threadId: "42", + sessionKey: "agent:main:main", + message: "reply from mcp", + }), + ); + }); + + test("lists routed sessions that only expose modern channel fields", async () => { + const bridge = new OpenClawChannelBridge({} as never, { + claudeChannelMode: "off", + verbose: false, + }); + const gatewayRequest = vi.fn().mockResolvedValue({ + sessions: [ + { + key: "agent:main:channel-field", + channel: "telegram", + deliveryContext: { + to: "-100111", + }, + }, + { + key: "agent:main:origin-field", + origin: { + provider: "imessage", + accountId: "imessage-default", + threadId: "thread-7", + }, + deliveryContext: { + to: "+15551230000", + }, + }, + ], + }); + + ( + bridge as unknown as { + gateway: { request: typeof gatewayRequest; stopAndWait: () => Promise }; + readySettled: boolean; + resolveReady: () => void; + } + ).gateway = { + request: gatewayRequest, + stopAndWait: async () => {}, + }; + ( + bridge as unknown as { + readySettled: boolean; + resolveReady: () => void; + } + ).readySettled = true; + ( + bridge as unknown as { + resolveReady: () => void; + } + ).resolveReady(); + + await expect(bridge.listConversations()).resolves.toEqual([ + expect.objectContaining({ + sessionKey: "agent:main:channel-field", + channel: "telegram", + to: "-100111", + }), + expect.objectContaining({ + sessionKey: "agent:main:origin-field", + channel: "imessage", + to: "+15551230000", + accountId: "imessage-default", + threadId: "thread-7", + }), + ]); + }); + + test("swallows notification send errors after channel replies are matched", async () => { + const bridge = new OpenClawChannelBridge({} as never, { + claudeChannelMode: "on", + verbose: false, + }); + + ( + bridge as unknown as { + pendingClaudePermissions: Map>; + server: { server: { notification: ReturnType } }; + } + ).pendingClaudePermissions.set("abcde", { + toolName: "Bash", + description: "run npm test", + inputPreview: '{"cmd":"npm test"}', + }); + ( + bridge as unknown as { + server: { server: { notification: ReturnType } }; + } + ).server = { + server: { + notification: vi.fn().mockRejectedValue(new Error("Not connected")), + }, + }; + + await expect( + ( + bridge as unknown as { + handleSessionMessageEvent: (payload: Record) => Promise; + } + ).handleSessionMessageEvent({ + sessionKey: "agent:main:main", + message: { + role: "user", + content: [{ type: "text", text: "yes abcde" }], + }, + }), + ).resolves.toBeUndefined(); + }); + + test("emits Claude channel and permission notifications", async () => { + const storePath = await createSessionStoreFile(); + const sessionKey = "agent:main:main"; + await seedSession({ + storePath, + sessionKey, + sessionId: "sess-claude", + route: { + channel: "imessage", + to: "+15551234567", + }, + transcriptMessages: [], + }); + + const harness = await createGatewaySuiteHarness(); + let mcp: Awaited> | null = null; + try { + const channelNotifications: Array<{ content: string; meta: Record }> = []; + const permissionNotifications: Array<{ request_id: string; behavior: "allow" | "deny" }> = []; + + mcp = await connectMcp({ + gatewayUrl: `ws://127.0.0.1:${harness.port}`, + gatewayToken: "test-gateway-token-1234567890", + claudeChannelMode: "on", + }); + mcp.client.setNotificationHandler(ClaudeChannelNotificationSchema, ({ params }) => { + channelNotifications.push(params); + }); + mcp.client.setNotificationHandler(ClaudePermissionNotificationSchema, ({ params }) => { + permissionNotifications.push(params); + }); + + emitSessionTranscriptUpdate({ + sessionFile: path.join(path.dirname(storePath), "sess-claude.jsonl"), + sessionKey, + messageId: "msg-user-1", + message: { + role: "user", + content: [{ type: "text", text: "hello Claude" }], + timestamp: Date.now(), + }, + }); + + await vi.waitFor(() => { + expect(channelNotifications).toHaveLength(1); + }); + expect(channelNotifications[0]).toMatchObject({ + content: "hello Claude", + meta: expect.objectContaining({ + session_key: sessionKey, + channel: "imessage", + to: "+15551234567", + message_id: "msg-user-1", + }), + }); + + await mcp.client.notification({ + method: "notifications/claude/channel/permission_request", + params: { + request_id: "abcde", + tool_name: "Bash", + description: "run npm test", + input_preview: '{"cmd":"npm test"}', + }, + }); + + emitSessionTranscriptUpdate({ + sessionFile: path.join(path.dirname(storePath), "sess-claude.jsonl"), + sessionKey, + messageId: "msg-user-2", + message: { + role: "user", + content: [{ type: "text", text: "yes abcde" }], + timestamp: Date.now(), + }, + }); + + await vi.waitFor(() => { + expect(permissionNotifications).toHaveLength(1); + }); + expect(permissionNotifications[0]).toEqual({ + request_id: "abcde", + behavior: "allow", + }); + } finally { + await mcp?.close(); + await harness.close(); + } + }); +}); diff --git a/src/mcp/channel-server.ts b/src/mcp/channel-server.ts new file mode 100644 index 00000000000..012db898177 --- /dev/null +++ b/src/mcp/channel-server.ts @@ -0,0 +1,986 @@ +import { randomUUID } from "node:crypto"; +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; +import { z } from "zod"; +import { loadConfig, type OpenClawConfig } from "../config/config.js"; +import { buildGatewayConnectionDetails } from "../gateway/call.js"; +import { GatewayClient } from "../gateway/client.js"; +import { resolveGatewayConnectionAuth } from "../gateway/connection-auth.js"; +import { APPROVALS_SCOPE, READ_SCOPE, WRITE_SCOPE } from "../gateway/method-scopes.js"; +import type { EventFrame } from "../gateway/protocol/index.js"; +import { extractFirstTextBlock } from "../shared/chat-message-content.js"; +import { + GATEWAY_CLIENT_MODES, + GATEWAY_CLIENT_NAMES, + normalizeMessageChannel, +} from "../utils/message-channel.js"; +import { VERSION } from "../version.js"; + +type ClaudeChannelMode = "off" | "on" | "auto"; + +export type OpenClawMcpServeOptions = { + gatewayUrl?: string; + gatewayToken?: string; + gatewayPassword?: string; + config?: OpenClawConfig; + claudeChannelMode?: ClaudeChannelMode; + verbose?: boolean; +}; + +type ConversationDescriptor = { + sessionKey: string; + channel: string; + to: string; + accountId?: string; + threadId?: string | number; + label?: string; + displayName?: string; + derivedTitle?: string; + lastMessagePreview?: string; + updatedAt?: number | null; +}; + +type SessionRow = { + key: string; + channel?: string; + lastChannel?: string; + lastTo?: string; + lastAccountId?: string; + lastThreadId?: string | number; + deliveryContext?: { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; + }; + origin?: { + provider?: string; + accountId?: string; + threadId?: string | number; + }; + label?: string; + displayName?: string; + derivedTitle?: string; + lastMessagePreview?: string; + updatedAt?: number | null; +}; + +type SessionListResult = { + sessions?: SessionRow[]; +}; + +type ChatHistoryResult = { + messages?: Array<{ id?: string; role?: string; content?: unknown; [key: string]: unknown }>; +}; + +type SessionMessagePayload = { + sessionKey?: string; + messageId?: string; + messageSeq?: number; + message?: { role?: string; content?: unknown; [key: string]: unknown }; + lastChannel?: string; + lastTo?: string; + lastAccountId?: string; + lastThreadId?: string | number; + [key: string]: unknown; +}; + +type ApprovalKind = "exec" | "plugin"; +type ApprovalDecision = "allow-once" | "allow-always" | "deny"; + +type PendingApproval = { + kind: ApprovalKind; + id: string; + request?: Record; + createdAtMs?: number; + expiresAtMs?: number; +}; + +type QueueEvent = + | { + cursor: number; + type: "message"; + sessionKey: string; + conversation?: ConversationDescriptor; + messageId?: string; + messageSeq?: number; + role?: string; + text?: string; + raw: SessionMessagePayload; + } + | { + cursor: number; + type: "claude_permission_request"; + requestId: string; + toolName: string; + description: string; + inputPreview: string; + } + | { + cursor: number; + type: "exec_approval_requested" | "exec_approval_resolved"; + raw: Record; + } + | { + cursor: number; + type: "plugin_approval_requested" | "plugin_approval_resolved"; + raw: Record; + }; + +type ClaudePermissionRequest = { + toolName: string; + description: string; + inputPreview: string; +}; + +type ServerNotification = { + method: string; + params?: Record; +}; + +type WaitFilter = { + afterCursor: number; + sessionKey?: string; +}; + +type PendingWaiter = { + filter: WaitFilter; + resolve: (value: QueueEvent | null) => void; + timeout: NodeJS.Timeout | null; +}; + +const CLAUDE_PERMISSION_REPLY_RE = /^(yes|no)\s+([a-km-z]{5})$/i; +const QUEUE_LIMIT = 1_000; + +const ClaudePermissionRequestSchema = z.object({ + method: z.literal("notifications/claude/channel/permission_request"), + params: z.object({ + request_id: z.string(), + tool_name: z.string(), + description: z.string(), + input_preview: z.string(), + }), +}); + +function toText(value: unknown): string | undefined { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined; +} + +function summarizeResult( + label: string, + count: number, +): { content: Array<{ type: "text"; text: string }> } { + return { + content: [{ type: "text", text: `${label}: ${count}` }], + }; +} + +function resolveConversationChannel(row: SessionRow): string | undefined { + return normalizeMessageChannel( + toText(row.deliveryContext?.channel) ?? + toText(row.lastChannel) ?? + toText(row.channel) ?? + toText(row.origin?.provider), + ); +} + +function toConversation(row: SessionRow): ConversationDescriptor | null { + const channel = resolveConversationChannel(row); + const to = toText(row.deliveryContext?.to) ?? toText(row.lastTo); + if (!channel || !to) { + return null; + } + return { + sessionKey: row.key, + channel, + to, + accountId: + toText(row.deliveryContext?.accountId) ?? + toText(row.lastAccountId) ?? + toText(row.origin?.accountId), + threadId: row.deliveryContext?.threadId ?? row.lastThreadId ?? row.origin?.threadId, + label: toText(row.label), + displayName: toText(row.displayName), + derivedTitle: toText(row.derivedTitle), + lastMessagePreview: toText(row.lastMessagePreview), + updatedAt: typeof row.updatedAt === "number" ? row.updatedAt : null, + }; +} + +function matchEventFilter(event: QueueEvent, filter: WaitFilter): boolean { + if (event.cursor <= filter.afterCursor) { + return false; + } + if (!filter.sessionKey) { + return true; + } + return "sessionKey" in event && event.sessionKey === filter.sessionKey; +} + +function extractAttachmentsFromMessage(message: unknown): unknown[] { + if (!message || typeof message !== "object") { + return []; + } + const content = (message as { content?: unknown }).content; + if (!Array.isArray(content)) { + return []; + } + return content.filter((entry) => { + if (!entry || typeof entry !== "object") { + return false; + } + return toText((entry as { type?: unknown }).type) !== "text"; + }); +} + +function normalizeApprovalId(value: unknown): string | undefined { + const id = toText(value); + return id ? id.trim() : undefined; +} + +export class OpenClawChannelBridge { + private gateway: GatewayClient | null = null; + private readonly verbose: boolean; + private readonly claudeChannelMode: ClaudeChannelMode; + private readonly queue: QueueEvent[] = []; + private readonly pendingWaiters = new Set(); + private readonly pendingClaudePermissions = new Map(); + private readonly pendingApprovals = new Map(); + private server: McpServer | null = null; + private cursor = 0; + private closed = false; + private ready = false; + private started = false; + private readonly readyPromise: Promise; + private resolveReady!: () => void; + private rejectReady!: (error: Error) => void; + private readySettled = false; + + constructor( + private readonly cfg: OpenClawConfig, + private readonly params: { + gatewayUrl?: string; + gatewayToken?: string; + gatewayPassword?: string; + claudeChannelMode: ClaudeChannelMode; + verbose: boolean; + }, + ) { + this.verbose = params.verbose; + this.claudeChannelMode = params.claudeChannelMode; + this.readyPromise = new Promise((resolve, reject) => { + this.resolveReady = resolve; + this.rejectReady = reject; + }); + } + + setServer(server: McpServer): void { + this.server = server; + } + + async start(): Promise { + if (this.started) { + await this.readyPromise; + return; + } + this.started = true; + const connection = buildGatewayConnectionDetails({ + config: this.cfg, + url: this.params.gatewayUrl, + }); + const gatewayUrlOverrideSource = + connection.urlSource === "cli --url" + ? "cli" + : connection.urlSource === "env OPENCLAW_GATEWAY_URL" + ? "env" + : undefined; + const creds = await resolveGatewayConnectionAuth({ + config: this.cfg, + explicitAuth: { + token: this.params.gatewayToken, + password: this.params.gatewayPassword, + }, + env: process.env, + urlOverride: gatewayUrlOverrideSource ? connection.url : undefined, + urlOverrideSource: gatewayUrlOverrideSource, + }); + if (this.closed) { + this.resolveReadyOnce(); + return; + } + + this.gateway = new GatewayClient({ + url: connection.url, + token: creds.token, + password: creds.password, + clientName: GATEWAY_CLIENT_NAMES.CLI, + clientDisplayName: "OpenClaw MCP", + clientVersion: VERSION, + mode: GATEWAY_CLIENT_MODES.CLI, + scopes: [READ_SCOPE, WRITE_SCOPE, APPROVALS_SCOPE], + onEvent: (event) => { + void this.handleGatewayEvent(event); + }, + onHelloOk: () => { + void this.handleHelloOk(); + }, + onConnectError: (error) => { + this.rejectReadyOnce(error instanceof Error ? error : new Error(String(error))); + }, + onClose: (code, reason) => { + if (!this.ready && !this.closed) { + this.rejectReadyOnce(new Error(`gateway closed before ready (${code}): ${reason}`)); + } + }, + }); + this.gateway.start(); + await this.readyPromise; + } + + async waitUntilReady(): Promise { + await this.readyPromise; + } + + async close(): Promise { + if (this.closed) { + return; + } + this.closed = true; + this.resolveReadyOnce(); + for (const waiter of this.pendingWaiters) { + if (waiter.timeout) { + clearTimeout(waiter.timeout); + } + waiter.resolve(null); + } + this.pendingWaiters.clear(); + const gateway = this.gateway; + this.gateway = null; + await gateway?.stopAndWait().catch(() => undefined); + } + + async listConversations(params?: { + limit?: number; + search?: string; + channel?: string; + includeDerivedTitles?: boolean; + includeLastMessage?: boolean; + }): Promise { + await this.waitUntilReady(); + const response = await this.requestGateway("sessions.list", { + limit: params?.limit ?? 50, + search: params?.search, + includeDerivedTitles: params?.includeDerivedTitles ?? true, + includeLastMessage: params?.includeLastMessage ?? true, + }); + const requestedChannel = toText(params?.channel)?.toLowerCase(); + return (response.sessions ?? []) + .map(toConversation) + .filter((conversation): conversation is ConversationDescriptor => Boolean(conversation)) + .filter((conversation) => + requestedChannel ? conversation.channel.toLowerCase() === requestedChannel : true, + ); + } + + async getConversation(sessionKey: string): Promise { + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedSessionKey) { + return null; + } + const conversations = await this.listConversations({ limit: 500, includeLastMessage: true }); + return ( + conversations.find((conversation) => conversation.sessionKey === normalizedSessionKey) ?? null + ); + } + + async readMessages( + sessionKey: string, + limit = 20, + ): Promise> { + await this.waitUntilReady(); + const response = await this.requestGateway("chat.history", { + sessionKey, + limit, + }); + return response.messages ?? []; + } + + async sendMessage(params: { + sessionKey: string; + text: string; + }): Promise> { + const conversation = await this.getConversation(params.sessionKey); + if (!conversation) { + throw new Error(`Conversation not found for session ${params.sessionKey}`); + } + return await this.requestGateway("send", { + to: conversation.to, + channel: conversation.channel, + accountId: conversation.accountId, + threadId: conversation.threadId == null ? undefined : String(conversation.threadId), + message: params.text, + sessionKey: conversation.sessionKey, + idempotencyKey: randomUUID(), + }); + } + + listPendingApprovals(): PendingApproval[] { + return [...this.pendingApprovals.values()].toSorted((a, b) => { + return (a.createdAtMs ?? 0) - (b.createdAtMs ?? 0); + }); + } + + async respondToApproval(params: { + kind: ApprovalKind; + id: string; + decision: ApprovalDecision; + }): Promise> { + if (params.kind === "exec") { + return await this.requestGateway("exec.approval.resolve", { + id: params.id, + decision: params.decision, + }); + } + return await this.requestGateway("plugin.approval.resolve", { + id: params.id, + decision: params.decision, + }); + } + + pollEvents(filter: WaitFilter, limit = 20): { events: QueueEvent[]; nextCursor: number } { + const events = this.queue.filter((event) => matchEventFilter(event, filter)).slice(0, limit); + const nextCursor = events.at(-1)?.cursor ?? filter.afterCursor; + return { events, nextCursor }; + } + + async waitForEvent(filter: WaitFilter, timeoutMs = 30_000): Promise { + const existing = this.queue.find((event) => matchEventFilter(event, filter)); + if (existing) { + return existing; + } + return await new Promise((resolve) => { + const waiter: PendingWaiter = { + filter, + resolve: (value) => { + this.pendingWaiters.delete(waiter); + resolve(value); + }, + timeout: null, + }; + if (timeoutMs > 0) { + waiter.timeout = setTimeout(() => { + waiter.resolve(null); + }, timeoutMs); + } + this.pendingWaiters.add(waiter); + }); + } + + async handleClaudePermissionRequest(params: { + requestId: string; + toolName: string; + description: string; + inputPreview: string; + }): Promise { + this.pendingClaudePermissions.set(params.requestId, { + toolName: params.toolName, + description: params.description, + inputPreview: params.inputPreview, + }); + this.enqueue({ + cursor: this.nextCursor(), + type: "claude_permission_request", + requestId: params.requestId, + toolName: params.toolName, + description: params.description, + inputPreview: params.inputPreview, + }); + if (this.verbose) { + process.stderr.write(`openclaw mcp: pending Claude permission ${params.requestId}\n`); + } + } + + private async requestGateway>( + method: string, + params: Record, + ): Promise { + if (!this.gateway) { + throw new Error("Gateway client is not ready"); + } + return await this.gateway.request(method, params); + } + + private async requestNoThrow(method: string, params: Record): Promise { + try { + await this.requestGateway(method, params); + } catch (error) { + if (this.verbose) { + process.stderr.write(`openclaw mcp: ${method} failed: ${String(error)}\n`); + } + } + } + + private async sendNotification(notification: ServerNotification): Promise { + if (!this.server || this.closed) { + return; + } + try { + await this.server.server.notification(notification); + } catch (error) { + if (this.verbose && !this.closed) { + process.stderr.write( + `openclaw mcp: notification ${notification.method} failed: ${String(error)}\n`, + ); + } + } + } + + private async handleHelloOk(): Promise { + try { + await this.requestGateway("sessions.subscribe", {}); + this.ready = true; + this.resolveReadyOnce(); + } catch (error) { + this.rejectReadyOnce(error instanceof Error ? error : new Error(String(error))); + } + } + + private resolveReadyOnce(): void { + if (this.readySettled) { + return; + } + this.readySettled = true; + this.resolveReady(); + } + + private rejectReadyOnce(error: Error): void { + if (this.readySettled) { + return; + } + this.readySettled = true; + this.rejectReady(error); + } + + private nextCursor(): number { + this.cursor += 1; + return this.cursor; + } + + private enqueue(event: QueueEvent): void { + this.queue.push(event); + while (this.queue.length > QUEUE_LIMIT) { + this.queue.shift(); + } + for (const waiter of this.pendingWaiters) { + if (!matchEventFilter(event, waiter.filter)) { + continue; + } + if (waiter.timeout) { + clearTimeout(waiter.timeout); + } + waiter.resolve(event); + } + } + + private trackApproval(kind: ApprovalKind, payload: Record): void { + const id = normalizeApprovalId(payload.id); + if (!id) { + return; + } + this.pendingApprovals.set(id, { + kind, + id, + request: + payload.request && typeof payload.request === "object" + ? (payload.request as Record) + : undefined, + createdAtMs: typeof payload.createdAtMs === "number" ? payload.createdAtMs : undefined, + expiresAtMs: typeof payload.expiresAtMs === "number" ? payload.expiresAtMs : undefined, + }); + } + + private resolveTrackedApproval(payload: Record): void { + const id = normalizeApprovalId(payload.id); + if (id) { + this.pendingApprovals.delete(id); + } + } + + private async handleGatewayEvent(event: EventFrame): Promise { + if (event.event === "session.message") { + await this.handleSessionMessageEvent(event.payload as SessionMessagePayload); + return; + } + if (event.event === "exec.approval.requested") { + const raw = (event.payload ?? {}) as Record; + this.trackApproval("exec", raw); + this.enqueue({ + cursor: this.nextCursor(), + type: "exec_approval_requested", + raw, + }); + return; + } + if (event.event === "exec.approval.resolved") { + const raw = (event.payload ?? {}) as Record; + this.resolveTrackedApproval(raw); + this.enqueue({ + cursor: this.nextCursor(), + type: "exec_approval_resolved", + raw, + }); + return; + } + if (event.event === "plugin.approval.requested") { + const raw = (event.payload ?? {}) as Record; + this.trackApproval("plugin", raw); + this.enqueue({ + cursor: this.nextCursor(), + type: "plugin_approval_requested", + raw, + }); + return; + } + if (event.event === "plugin.approval.resolved") { + const raw = (event.payload ?? {}) as Record; + this.resolveTrackedApproval(raw); + this.enqueue({ + cursor: this.nextCursor(), + type: "plugin_approval_resolved", + raw, + }); + } + } + + private async handleSessionMessageEvent(payload: SessionMessagePayload): Promise { + const sessionKey = toText(payload.sessionKey); + if (!sessionKey) { + return; + } + const conversation = + toConversation({ + key: sessionKey, + lastChannel: toText(payload.lastChannel), + lastTo: toText(payload.lastTo), + lastAccountId: toText(payload.lastAccountId), + lastThreadId: payload.lastThreadId, + }) ?? undefined; + const role = toText(payload.message?.role); + const text = extractFirstTextBlock(payload.message); + const permissionMatch = text ? CLAUDE_PERMISSION_REPLY_RE.exec(text) : null; + if (permissionMatch) { + const requestId = permissionMatch[2]?.toLowerCase(); + if (requestId && this.pendingClaudePermissions.has(requestId)) { + this.pendingClaudePermissions.delete(requestId); + await this.sendNotification({ + method: "notifications/claude/channel/permission", + params: { + request_id: requestId, + behavior: permissionMatch[1]?.toLowerCase().startsWith("y") ? "allow" : "deny", + }, + }); + return; + } + } + + this.enqueue({ + cursor: this.nextCursor(), + type: "message", + sessionKey, + conversation, + messageId: toText(payload.messageId), + messageSeq: typeof payload.messageSeq === "number" ? payload.messageSeq : undefined, + role, + text, + raw: payload, + }); + + if (!this.shouldEmitClaudeChannel(role, conversation)) { + return; + } + await this.sendNotification({ + method: "notifications/claude/channel", + params: { + content: text ?? "[non-text message]", + meta: { + session_key: sessionKey, + channel: conversation?.channel ?? "", + to: conversation?.to ?? "", + account_id: conversation?.accountId ?? "", + thread_id: conversation?.threadId == null ? "" : String(conversation.threadId), + message_id: toText(payload.messageId) ?? "", + }, + }, + }); + } + + private shouldEmitClaudeChannel( + role: string | undefined, + conversation: ConversationDescriptor | undefined, + ): boolean { + if (this.claudeChannelMode === "off") { + return false; + } + if (role !== "user") { + return false; + } + return Boolean(conversation); + } +} + +export async function createOpenClawChannelMcpServer(opts: OpenClawMcpServeOptions = {}): Promise<{ + server: McpServer; + bridge: OpenClawChannelBridge; + start: () => Promise; + close: () => Promise; +}> { + const cfg = opts.config ?? loadConfig(); + const claudeChannelMode = opts.claudeChannelMode ?? "auto"; + const capabilities = + claudeChannelMode === "off" + ? undefined + : { + experimental: { + "claude/channel": {}, + "claude/channel/permission": {}, + }, + }; + + const server = new McpServer( + { name: "openclaw", version: VERSION }, + capabilities ? { capabilities } : undefined, + ); + const bridge = new OpenClawChannelBridge(cfg, { + gatewayUrl: opts.gatewayUrl, + gatewayToken: opts.gatewayToken, + gatewayPassword: opts.gatewayPassword, + claudeChannelMode, + verbose: opts.verbose ?? false, + }); + bridge.setServer(server); + + server.server.setNotificationHandler(ClaudePermissionRequestSchema, async ({ params }) => { + await bridge.handleClaudePermissionRequest({ + requestId: params.request_id, + toolName: params.tool_name, + description: params.description, + inputPreview: params.input_preview, + }); + }); + + server.tool( + "conversations_list", + "List OpenClaw channel-backed conversations available through session routes.", + { + limit: z.number().int().min(1).max(500).optional(), + search: z.string().optional(), + channel: z.string().optional(), + includeDerivedTitles: z.boolean().optional(), + includeLastMessage: z.boolean().optional(), + }, + async (args) => { + const conversations = await bridge.listConversations(args); + return { + ...summarizeResult("conversations", conversations.length), + structuredContent: { conversations }, + }; + }, + ); + + server.tool( + "conversation_get", + "Get one OpenClaw conversation by session key.", + { session_key: z.string().min(1) }, + async ({ session_key }) => { + const conversation = await bridge.getConversation(session_key); + if (!conversation) { + return { + content: [{ type: "text", text: `conversation not found: ${session_key}` }], + isError: true, + }; + } + return { + content: [{ type: "text", text: `conversation ${conversation.sessionKey}` }], + structuredContent: { conversation }, + }; + }, + ); + + server.tool( + "messages_read", + "Read recent messages for one OpenClaw conversation.", + { + session_key: z.string().min(1), + limit: z.number().int().min(1).max(200).optional(), + }, + async ({ session_key, limit }) => { + const messages = await bridge.readMessages(session_key, limit ?? 20); + return { + ...summarizeResult("messages", messages.length), + structuredContent: { messages }, + }; + }, + ); + + server.tool( + "attachments_fetch", + "List non-text attachments for a message in one OpenClaw conversation.", + { + session_key: z.string().min(1), + message_id: z.string().min(1), + limit: z.number().int().min(1).max(200).optional(), + }, + async ({ session_key, message_id, limit }) => { + const messages = await bridge.readMessages(session_key, limit ?? 100); + const message = messages.find((entry) => toText(entry.id) === message_id); + if (!message) { + return { + content: [{ type: "text", text: `message not found: ${message_id}` }], + isError: true, + }; + } + const attachments = extractAttachmentsFromMessage(message); + return { + ...summarizeResult("attachments", attachments.length), + structuredContent: { attachments, message }, + }; + }, + ); + + server.tool( + "events_poll", + "Poll queued OpenClaw conversation events since a cursor.", + { + after_cursor: z.number().int().min(0).optional(), + session_key: z.string().optional(), + limit: z.number().int().min(1).max(200).optional(), + }, + async ({ after_cursor, session_key, limit }) => { + const { events, nextCursor } = bridge.pollEvents( + { afterCursor: after_cursor ?? 0, sessionKey: toText(session_key) }, + limit ?? 20, + ); + return { + ...summarizeResult("events", events.length), + structuredContent: { events, next_cursor: nextCursor }, + }; + }, + ); + + server.tool( + "events_wait", + "Wait for the next queued OpenClaw conversation event.", + { + after_cursor: z.number().int().min(0).optional(), + session_key: z.string().optional(), + timeout_ms: z.number().int().min(1).max(300_000).optional(), + }, + async ({ after_cursor, session_key, timeout_ms }) => { + const event = await bridge.waitForEvent( + { afterCursor: after_cursor ?? 0, sessionKey: toText(session_key) }, + timeout_ms ?? 30_000, + ); + return { + content: [{ type: "text", text: event ? `event ${event.cursor}` : "timeout" }], + structuredContent: { event }, + }; + }, + ); + + server.tool( + "messages_send", + "Send a message back through the same OpenClaw conversation route.", + { + session_key: z.string().min(1), + text: z.string().min(1), + }, + async ({ session_key, text }) => { + const result = await bridge.sendMessage({ sessionKey: session_key, text }); + return { + content: [{ type: "text", text: "sent" }], + structuredContent: { result }, + }; + }, + ); + + server.tool( + "permissions_list_open", + "List open OpenClaw exec or plugin approval requests visible through the Gateway.", + {}, + async () => { + const approvals = bridge.listPendingApprovals(); + return { + ...summarizeResult("approvals", approvals.length), + structuredContent: { approvals }, + }; + }, + ); + + server.tool( + "permissions_respond", + "Allow or deny one pending OpenClaw exec or plugin approval request.", + { + kind: z.enum(["exec", "plugin"]), + id: z.string().min(1), + decision: z.enum(["allow-once", "allow-always", "deny"]), + }, + async ({ kind, id, decision }) => { + const result = await bridge.respondToApproval({ kind, id, decision }); + return { + content: [{ type: "text", text: "approval resolved" }], + structuredContent: { result }, + }; + }, + ); + + return { + server, + bridge, + start: async () => { + await bridge.start(); + }, + close: async () => { + await bridge.close(); + await server.close(); + }, + }; +} + +export async function serveOpenClawChannelMcp(opts: OpenClawMcpServeOptions = {}): Promise { + const { server, start, close } = await createOpenClawChannelMcpServer(opts); + const transport = new StdioServerTransport(); + + let shuttingDown = false; + let resolveClosed!: () => void; + const closed = new Promise((resolve) => { + resolveClosed = resolve; + }); + + const shutdown = () => { + if (shuttingDown) { + return; + } + shuttingDown = true; + process.stdin.off("end", shutdown); + process.stdin.off("close", shutdown); + process.off("SIGINT", shutdown); + process.off("SIGTERM", shutdown); + transport["onclose"] = undefined; + void close().finally(resolveClosed); + }; + + transport["onclose"] = shutdown; + process.stdin.once("end", shutdown); + process.stdin.once("close", shutdown); + process.once("SIGINT", shutdown); + process.once("SIGTERM", shutdown); + + try { + await server.connect(transport); + await start(); + await closed; + } finally { + shutdown(); + await closed; + } +}