From ec5877346c28ef403068d676ea0c97c42e7da457 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 28 Mar 2026 04:10:00 +0000 Subject: [PATCH] fix: harden mcp channel bridge smoke --- package.json | 3 +- scripts/e2e/Dockerfile | 1 + scripts/e2e/mcp-channels-docker-client.ts | 584 ++++++++++++++++++ scripts/e2e/mcp-channels-docker.sh | 79 +++ scripts/e2e/mcp-channels-seed.ts | 134 ++++ .../chat.directive-tags.test.ts | 39 ++ src/gateway/server-methods/chat.ts | 76 ++- src/mcp/channel-server.test.ts | 61 ++ src/mcp/channel-server.ts | 11 +- src/shared/chat-message-content.test.ts | 8 + src/shared/chat-message-content.ts | 3 + 11 files changed, 970 insertions(+), 29 deletions(-) create mode 100644 scripts/e2e/mcp-channels-docker-client.ts create mode 100644 scripts/e2e/mcp-channels-docker.sh create mode 100644 scripts/e2e/mcp-channels-seed.ts diff --git a/package.json b/package.json index d72974fcc07..72ee396bdbb 100644 --- a/package.json +++ b/package.json @@ -1089,13 +1089,14 @@ "test:contracts:plugins": "OPENCLAW_TEST_PROFILE=serial pnpm exec vitest run --config vitest.contracts.config.ts src/plugins/contracts", "test:coverage": "vitest run --config vitest.unit.config.ts --coverage", "test:coverage:changed": "vitest run --config vitest.unit.config.ts --coverage --changed origin/main", - "test:docker:all": "pnpm test:docker:live-models && pnpm test:docker:live-gateway && pnpm test:docker:openwebui && pnpm test:docker:onboard && pnpm test:docker:gateway-network && pnpm test:docker:qr && pnpm test:docker:doctor-switch && pnpm test:docker:plugins && pnpm test:docker:cleanup", + "test:docker:all": "pnpm test:docker:live-models && pnpm test:docker:live-gateway && pnpm test:docker:openwebui && pnpm test:docker:onboard && pnpm test:docker:gateway-network && pnpm test:docker:mcp-channels && pnpm test:docker:qr && pnpm test:docker:doctor-switch && pnpm test:docker:plugins && pnpm test:docker:cleanup", "test:docker:cleanup": "bash scripts/test-cleanup-docker.sh", "test:docker:doctor-switch": "bash scripts/e2e/doctor-install-switch-docker.sh", "test:docker:gateway-network": "bash scripts/e2e/gateway-network-docker.sh", "test:docker:live-cli-backend": "bash scripts/test-live-cli-backend-docker.sh", "test:docker:live-gateway": "bash scripts/test-live-gateway-models-docker.sh", "test:docker:live-models": "bash scripts/test-live-models-docker.sh", + "test:docker:mcp-channels": "bash scripts/e2e/mcp-channels-docker.sh", "test:docker:onboard": "bash scripts/e2e/onboard-docker.sh", "test:docker:openwebui": "bash scripts/e2e/openwebui-docker.sh", "test:docker:plugins": "bash scripts/e2e/plugins-docker.sh", diff --git a/scripts/e2e/Dockerfile b/scripts/e2e/Dockerfile index ba8f0d46545..70d67f15b2c 100644 --- a/scripts/e2e/Dockerfile +++ b/scripts/e2e/Dockerfile @@ -31,6 +31,7 @@ COPY --chown=appuser:appuser src ./src COPY --chown=appuser:appuser test ./test COPY --chown=appuser:appuser scripts ./scripts COPY --chown=appuser:appuser docs ./docs +COPY --chown=appuser:appuser packages ./packages COPY --chown=appuser:appuser skills ./skills COPY --chown=appuser:appuser ui ./ui COPY --chown=appuser:appuser extensions ./extensions diff --git a/scripts/e2e/mcp-channels-docker-client.ts b/scripts/e2e/mcp-channels-docker-client.ts new file mode 100644 index 00000000000..20d483f0029 --- /dev/null +++ b/scripts/e2e/mcp-channels-docker-client.ts @@ -0,0 +1,584 @@ +import { randomUUID } from "node:crypto"; +import process from "node:process"; +import { setTimeout as delay } from "node:timers/promises"; +import { Client } from "@modelcontextprotocol/sdk/client/index.js"; +import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; +import { WebSocket } from "ws"; +import { z } from "zod"; +import { PROTOCOL_VERSION } from "../../src/gateway/protocol/index.ts"; +import { rawDataToString } from "../../src/infra/ws.ts"; + +const ClaudeChannelNotificationSchema = z.object({ + method: z.literal("notifications/claude/channel"), + params: z.object({ + content: z.string(), + meta: z.record(z.string(), z.string()), + }), +}); + +const ClaudePermissionNotificationSchema = z.object({ + method: z.literal("notifications/claude/channel/permission"), + params: z.object({ + request_id: z.string(), + behavior: z.enum(["allow", "deny"]), + }), +}); + +type ClaudeChannelNotification = z.infer["params"]; + +type GatewayRpcClient = { + request(method: string, params?: unknown): Promise; + events: Array<{ event: string; payload: Record }>; + close(): Promise; +}; + +type McpClientHandle = { + client: Client; + transport: StdioClientTransport; + rawMessages: unknown[]; +}; + +function assert(condition: unknown, message: string): asserts condition { + if (!condition) { + throw new Error(message); + } +} + +function extractTextFromGatewayPayload( + payload: Record | undefined, +): string | undefined { + const message = payload?.message; + if (!message || typeof message !== "object") { + return undefined; + } + const content = (message as { content?: unknown }).content; + if (typeof content === "string" && content.trim().length > 0) { + return content; + } + if (!Array.isArray(content)) { + return undefined; + } + const first = content[0]; + if (!first || typeof first !== "object") { + return undefined; + } + const text = (first as { text?: unknown }).text; + return typeof text === "string" ? text : undefined; +} + +async function connectGateway(params: { url: string; token: string }): Promise { + const ws = new WebSocket(params.url); + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error("gateway ws open timeout")), 10_000); + timeout.unref?.(); + ws.once("open", () => { + clearTimeout(timeout); + resolve(); + }); + ws.once("error", (error) => { + clearTimeout(timeout); + reject(error); + }); + }); + + const pending = new Map< + string, + { + resolve: (value: unknown) => void; + reject: (error: Error) => void; + } + >(); + const requestedScopes = ["operator.read", "operator.write", "operator.pairing", "operator.admin"]; + const events: Array<{ event: string; payload: Record }> = []; + + ws.on("message", (data) => { + let frame: unknown; + try { + frame = JSON.parse(rawDataToString(data)); + } catch { + return; + } + if (!frame || typeof frame !== "object") { + return; + } + const typed = frame as { + type?: unknown; + event?: unknown; + payload?: unknown; + id?: unknown; + ok?: unknown; + result?: unknown; + error?: { message?: unknown } | null; + }; + if (typed.type === "event" && typeof typed.event === "string") { + events.push({ + event: typed.event, + payload: + typed.payload && typeof typed.payload === "object" + ? (typed.payload as Record) + : {}, + }); + return; + } + if (typed.type !== "res" || typeof typed.id !== "string") { + return; + } + const match = pending.get(typed.id); + if (!match) { + return; + } + pending.delete(typed.id); + if (typed.ok === true) { + match.resolve(typed.result); + return; + } + match.reject( + new Error( + typed.error && typeof typed.error.message === "string" + ? typed.error.message + : "gateway request failed", + ), + ); + }); + + ws.once("close", (code, reason) => { + const error = new Error(`gateway closed (${code}): ${rawDataToString(reason)}`); + for (const entry of pending.values()) { + entry.reject(error); + } + pending.clear(); + }); + + const connectId = randomUUID(); + ws.send( + JSON.stringify({ + type: "req", + id: connectId, + method: "connect", + params: { + minProtocol: PROTOCOL_VERSION, + maxProtocol: PROTOCOL_VERSION, + client: { + id: "openclaw-tui", + displayName: "docker-mcp-channels", + version: "1.0.0", + platform: process.platform, + mode: "ui", + }, + role: "operator", + scopes: requestedScopes, + caps: [], + auth: { token: params.token }, + }, + }), + ); + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + pending.delete(connectId); + reject(new Error("gateway connect timeout")); + }, 10_000); + timeout.unref?.(); + pending.set(connectId, { + resolve: () => { + clearTimeout(timeout); + resolve(); + }, + reject: (error) => { + clearTimeout(timeout); + reject(error); + }, + }); + }); + await new Promise((resolve, reject) => { + const id = randomUUID(); + const timeout = setTimeout(() => { + pending.delete(id); + reject(new Error("gateway sessions.subscribe timeout")); + }, 10_000); + timeout.unref?.(); + pending.set(id, { + resolve: () => { + clearTimeout(timeout); + resolve(); + }, + reject: (error) => { + clearTimeout(timeout); + reject(error); + }, + }); + ws.send( + JSON.stringify({ + type: "req", + id, + method: "sessions.subscribe", + params: {}, + }), + ); + }); + + return { + request(method, requestParams) { + const id = randomUUID(); + ws.send( + JSON.stringify({ + type: "req", + id, + method, + params: requestParams ?? {}, + }), + ); + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + pending.delete(id); + reject(new Error(`gateway request timeout: ${method}`)); + }, 10_000); + timeout.unref?.(); + pending.set(id, { + resolve: (value) => { + clearTimeout(timeout); + resolve(value as T); + }, + reject: (error) => { + clearTimeout(timeout); + reject(error); + }, + }); + }); + }, + events, + async close() { + if (ws.readyState === WebSocket.CLOSED) { + return; + } + await new Promise((resolve) => { + const timeout = setTimeout(resolve, 2_000); + timeout.unref?.(); + ws.once("close", () => { + clearTimeout(timeout); + resolve(); + }); + ws.close(); + }); + }, + }; +} + +async function waitFor( + label: string, + predicate: () => T | undefined, + timeoutMs = 10_000, +): Promise { + const started = Date.now(); + while (Date.now() - started < timeoutMs) { + const value = predicate(); + if (value !== undefined) { + return value; + } + await delay(50); + } + throw new Error(`timeout waiting for ${label}`); +} + +async function connectMcpClient(params: { + gatewayUrl: string; + gatewayToken: string; +}): Promise { + const transport = new StdioClientTransport({ + command: "node", + args: [ + "/app/openclaw.mjs", + "mcp", + "serve", + "--url", + params.gatewayUrl, + "--token", + params.gatewayToken, + "--claude-channel-mode", + "on", + ], + cwd: "/app", + env: { + ...process.env, + OPENCLAW_ALLOW_INSECURE_PRIVATE_WS: "1", + OPENCLAW_STATE_DIR: "/tmp/openclaw-mcp-client", + }, + stderr: "pipe", + }); + transport.stderr?.on("data", (chunk) => { + process.stderr.write(`[openclaw mcp] ${String(chunk)}`); + }); + const rawMessages: unknown[] = []; + transport.addEventListener("message", (event) => { + rawMessages.push(event.data); + }); + + const client = new Client({ name: "docker-mcp-channels", version: "1.0.0" }); + await client.connect(transport); + return { client, transport, rawMessages }; +} + +async function maybeApprovePendingBridgePairing(gateway: GatewayRpcClient): Promise { + let pairingState: + | { + pending?: Array<{ requestId?: string; role?: string }>; + } + | undefined; + try { + pairingState = await gateway.request<{ + pending?: Array<{ requestId?: string; role?: string }>; + }>("device.pair.list", {}); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (message.includes("missing scope: operator.pairing")) { + return false; + } + throw error; + } + if (!pairingState) { + return false; + } + const pendingRequest = pairingState.pending?.find((entry) => entry.role === "operator"); + if (!pendingRequest?.requestId) { + return false; + } + await gateway.request("device.pair.approve", { requestId: pendingRequest.requestId }); + return true; +} + +async function main() { + const gatewayUrl = process.env.GW_URL?.trim(); + const gatewayToken = process.env.GW_TOKEN?.trim(); + assert(gatewayUrl, "missing GW_URL"); + assert(gatewayToken, "missing GW_TOKEN"); + + const gateway = await connectGateway({ url: gatewayUrl, token: gatewayToken }); + let mcpHandle = await connectMcpClient({ + gatewayUrl, + gatewayToken, + }); + let mcp = mcpHandle.client; + + try { + await delay(500); + if (await maybeApprovePendingBridgePairing(gateway)) { + await Promise.allSettled([mcp.close(), mcpHandle.transport.close()]); + mcpHandle = await connectMcpClient({ + gatewayUrl, + gatewayToken, + }); + mcp = mcpHandle.client; + } + + const listed = (await mcp.callTool({ + name: "conversations_list", + arguments: {}, + })) as { + structuredContent?: { conversations?: Array> }; + }; + const conversation = listed.structuredContent?.conversations?.find( + (entry) => entry.sessionKey === "agent:main:main", + ); + assert(conversation, "expected seeded conversation in conversations_list"); + assert(conversation.channel === "imessage", "expected seeded channel"); + assert(conversation.to === "+15551234567", "expected seeded target"); + + const fetched = (await mcp.callTool({ + name: "conversation_get", + arguments: { session_key: "agent:main:main" }, + })) as { + structuredContent?: { conversation?: Record }; + isError?: boolean; + }; + assert(!fetched.isError, "conversation_get should succeed"); + assert( + fetched.structuredContent?.conversation?.sessionKey === "agent:main:main", + "conversation_get returned wrong session", + ); + + const history = (await mcp.callTool({ + name: "messages_read", + arguments: { session_key: "agent:main:main", limit: 10 }, + })) as { + structuredContent?: { messages?: Array> }; + }; + const messages = history.structuredContent?.messages ?? []; + assert(messages.length >= 2, "expected seeded transcript messages"); + const attachmentMessage = messages.find((entry) => { + const raw = entry.__openclaw; + return raw && typeof raw === "object" && (raw as { id?: unknown }).id === "msg-attachment"; + }); + assert(attachmentMessage, "expected seeded attachment message"); + + const attachments = (await mcp.callTool({ + name: "attachments_fetch", + arguments: { session_key: "agent:main:main", message_id: "msg-attachment" }, + })) as { + structuredContent?: { attachments?: Array> }; + isError?: boolean; + }; + assert(!attachments.isError, "attachments_fetch should succeed"); + assert( + (attachments.structuredContent?.attachments?.length ?? 0) === 1, + "expected one seeded attachment", + ); + + const waited = (await Promise.all([ + mcp.callTool({ + name: "events_wait", + arguments: { + session_key: "agent:main:main", + after_cursor: 0, + timeout_ms: 10_000, + }, + }) as Promise<{ + structuredContent?: { event?: Record }; + }>, + gateway.request("chat.inject", { + sessionKey: "agent:main:main", + message: "assistant live event", + }), + ]).then(([result]) => result)) as { + structuredContent?: { event?: Record }; + }; + const assistantEvent = waited.structuredContent?.event; + assert(assistantEvent, "expected events_wait result"); + assert(assistantEvent.type === "message", "expected message event"); + assert(assistantEvent.role === "assistant", "expected assistant event role"); + assert(assistantEvent.text === "assistant live event", "expected assistant event text"); + const assistantCursor = + typeof assistantEvent.cursor === "number" ? assistantEvent.cursor : undefined; + assert(typeof assistantCursor === "number", "expected assistant event cursor"); + + const polled = (await mcp.callTool({ + name: "events_poll", + arguments: { session_key: "agent:main:main", after_cursor: 0, limit: 10 }, + })) as { + structuredContent?: { events?: Array> }; + }; + assert( + (polled.structuredContent?.events ?? []).some( + (entry) => entry.text === "assistant live event", + ), + "expected assistant event in events_poll", + ); + + const channelMessage = `hello from docker ${randomUUID()}`; + const userEvent = (await Promise.all([ + mcp.callTool({ + name: "events_wait", + arguments: { + session_key: "agent:main:main", + after_cursor: assistantCursor, + timeout_ms: 10_000, + }, + }) as Promise<{ + structuredContent?: { event?: Record }; + }>, + gateway.request("chat.send", { + sessionKey: "agent:main:main", + message: channelMessage, + idempotencyKey: randomUUID(), + }), + ]).then(([result]) => result)) as { + structuredContent?: { event?: Record }; + }; + const rawGatewayUserMessage = await waitFor("raw gateway user session.message", () => + gateway.events.find( + (entry) => + entry.event === "session.message" && + entry.payload.sessionKey === "agent:main:main" && + extractTextFromGatewayPayload(entry.payload) === channelMessage, + ), + ); + if (userEvent.structuredContent?.event?.text !== channelMessage) { + throw new Error( + `expected user event after chat.send: ${JSON.stringify( + { + userEvent: userEvent.structuredContent?.event ?? null, + rawGatewayUserMessage: rawGatewayUserMessage ?? null, + recentGatewayEvents: gateway.events.slice(-10).map((entry) => ({ + event: entry.event, + sessionKey: entry.payload.sessionKey, + text: extractTextFromGatewayPayload(entry.payload), + })), + }, + null, + 2, + )}`, + ); + } + assert(rawGatewayUserMessage, "expected raw gateway session.message after chat.send"); + let helpNotification: ClaudeChannelNotification; + try { + helpNotification = await waitFor( + "Claude channel notification", + () => + mcpHandle.rawMessages + .map((entry) => ClaudeChannelNotificationSchema.safeParse(entry)) + .find( + (entry) => + entry.success && + entry.data.params.meta.session_key === "agent:main:main" && + entry.data.params.content === channelMessage, + )?.data.params, + ); + } catch (error) { + throw new Error( + `timeout waiting for Claude channel notification: ${JSON.stringify( + { + rawMessages: mcpHandle.rawMessages.slice(-10), + }, + null, + 2, + )}`, + { cause: error }, + ); + } + assert(helpNotification.content === channelMessage, "expected Claude channel content"); + + await mcp.notification({ + method: "notifications/claude/channel/permission_request", + params: { + request_id: "abcde", + tool_name: "Bash", + description: "run npm test", + input_preview: '{"cmd":"npm test"}', + }, + }); + + await gateway.request("chat.send", { + sessionKey: "agent:main:main", + message: "yes abcde", + idempotencyKey: randomUUID(), + }); + const permission = await waitFor( + "Claude permission notification", + () => + mcpHandle.rawMessages + .map((entry) => ClaudePermissionNotificationSchema.safeParse(entry)) + .find((entry) => entry.success && entry.data.params.request_id === "abcde")?.data.params, + ); + assert(permission.behavior === "allow", "expected allow permission reply"); + + process.stdout.write( + JSON.stringify( + { + ok: true, + sessionKey: "agent:main:main", + rawNotifications: mcpHandle.rawMessages.filter( + (entry) => + ClaudeChannelNotificationSchema.safeParse(entry).success || + ClaudePermissionNotificationSchema.safeParse(entry).success, + ).length, + }, + null, + 2, + ) + "\n", + ); + } finally { + await Promise.allSettled([mcp.close(), mcpHandle.transport.close(), gateway.close()]); + } +} + +await main(); diff --git a/scripts/e2e/mcp-channels-docker.sh b/scripts/e2e/mcp-channels-docker.sh new file mode 100644 index 00000000000..4deb8d081d3 --- /dev/null +++ b/scripts/e2e/mcp-channels-docker.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +IMAGE_NAME="${OPENCLAW_IMAGE:-openclaw-mcp-channels-e2e}" +PORT="18789" +TOKEN="mcp-e2e-$(date +%s)-$$" +CONTAINER_NAME="openclaw-mcp-e2e-$$" +CLIENT_LOG="$(mktemp -t openclaw-mcp-client-log.XXXXXX)" + +cleanup() { + docker rm -f "$CONTAINER_NAME" >/dev/null 2>&1 || true + rm -f "$CLIENT_LOG" +} +trap cleanup EXIT + +echo "Building Docker image..." +docker build -t "$IMAGE_NAME" -f "$ROOT_DIR/scripts/e2e/Dockerfile" "$ROOT_DIR" + +echo "Running in-container gateway + MCP smoke..." +set +e +docker run --rm \ + --name "$CONTAINER_NAME" \ + -e "OPENCLAW_GATEWAY_TOKEN=$TOKEN" \ + -e "OPENCLAW_SKIP_CHANNELS=1" \ + -e "OPENCLAW_SKIP_GMAIL_WATCHER=1" \ + -e "OPENCLAW_SKIP_CRON=1" \ + -e "OPENCLAW_SKIP_CANVAS_HOST=1" \ + -e "OPENCLAW_STATE_DIR=/tmp/openclaw-state" \ + -e "OPENCLAW_CONFIG_PATH=/tmp/openclaw-state/openclaw.json" \ + -e "GW_URL=ws://127.0.0.1:$PORT" \ + -e "GW_TOKEN=$TOKEN" \ + -e "OPENCLAW_ALLOW_INSECURE_PRIVATE_WS=1" \ + "$IMAGE_NAME" \ + bash -lc "set -euo pipefail + entry=dist/index.mjs + [ -f \"\$entry\" ] || entry=dist/index.js + node --import tsx scripts/e2e/mcp-channels-seed.ts >/tmp/mcp-channels-seed.log + node \"\$entry\" gateway --port $PORT --bind loopback --allow-unconfigured >/tmp/mcp-channels-gateway.log 2>&1 & + gateway_pid=\$! + cleanup_inner() { + kill \"\$gateway_pid\" >/dev/null 2>&1 || true + wait \"\$gateway_pid\" >/dev/null 2>&1 || true + } + trap cleanup_inner EXIT + for _ in \$(seq 1 80); do + if node --input-type=module -e ' + import net from \"node:net\"; + const socket = net.createConnection({ host: \"127.0.0.1\", port: $PORT }); + const timeout = setTimeout(() => { + socket.destroy(); + process.exit(1); + }, 400); + socket.on(\"connect\", () => { + clearTimeout(timeout); + socket.end(); + process.exit(0); + }); + socket.on(\"error\", () => { + clearTimeout(timeout); + process.exit(1); + }); + ' >/dev/null 2>&1; then + break + fi + sleep 0.25 + done + node --import tsx scripts/e2e/mcp-channels-docker-client.ts + tail -n 80 /tmp/mcp-channels-gateway.log + " | tee "$CLIENT_LOG" +status=${PIPESTATUS[0]} +set -e + +if [ "$status" -ne 0 ]; then + echo "Docker MCP smoke failed" + exit "$status" +fi + +echo "OK" diff --git a/scripts/e2e/mcp-channels-seed.ts b/scripts/e2e/mcp-channels-seed.ts new file mode 100644 index 00000000000..b6d5dae1c3a --- /dev/null +++ b/scripts/e2e/mcp-channels-seed.ts @@ -0,0 +1,134 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { + applyProviderConfigWithDefaultModelPreset, + type ModelDefinitionConfig, + type OpenClawConfig, +} from "../../src/plugin-sdk/provider-onboard.ts"; + +const DOCKER_OPENAI_MODEL_REF = "openai/gpt-5.4"; +const DOCKER_OPENAI_MODEL: ModelDefinitionConfig = { + id: "gpt-5.4", + name: "gpt-5.4", + api: "openai-responses", + reasoning: true, + input: ["text", "image"], + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + }, + contextWindow: 1_050_000, + maxTokens: 128_000, +}; + +async function main() { + const stateDir = process.env.OPENCLAW_STATE_DIR?.trim() || path.join(os.homedir(), ".openclaw"); + const configPath = + process.env.OPENCLAW_CONFIG_PATH?.trim() || path.join(stateDir, "openclaw.json"); + const sessionsDir = path.join(stateDir, "agents", "main", "sessions"); + const sessionFile = path.join(sessionsDir, "sess-main.jsonl"); + const storePath = path.join(sessionsDir, "sessions.json"); + const now = Date.now(); + + await fs.mkdir(sessionsDir, { recursive: true }); + await fs.mkdir(path.dirname(configPath), { recursive: true }); + + const seededConfig = applyProviderConfigWithDefaultModelPreset( + { + gateway: { + controlUi: { + allowInsecureAuth: true, + enabled: false, + }, + }, + } satisfies OpenClawConfig, + { + providerId: "openai", + api: "openai-responses", + baseUrl: "http://127.0.0.1:9/v1", + defaultModel: DOCKER_OPENAI_MODEL, + defaultModelId: DOCKER_OPENAI_MODEL.id, + aliases: [{ modelRef: DOCKER_OPENAI_MODEL_REF, alias: "GPT" }], + primaryModelRef: DOCKER_OPENAI_MODEL_REF, + }, + ); + const openAiProvider = seededConfig.models?.providers?.openai; + if (!openAiProvider) { + throw new Error("failed to seed OpenAI provider config"); + } + openAiProvider.apiKey = "sk-docker-smoke-test"; + + await fs.writeFile(configPath, JSON.stringify(seededConfig, null, 2), "utf-8"); + + await fs.writeFile( + storePath, + JSON.stringify( + { + "agent:main:main": { + sessionId: "sess-main", + sessionFile, + updatedAt: now, + lastChannel: "imessage", + lastTo: "+15551234567", + lastAccountId: "imessage-default", + lastThreadId: "thread-42", + displayName: "Docker MCP Channel Smoke", + derivedTitle: "Docker MCP Channel Smoke", + lastMessagePreview: "seeded transcript", + }, + }, + null, + 2, + ), + "utf-8", + ); + + await fs.writeFile( + sessionFile, + [ + JSON.stringify({ type: "session", version: 1, id: "sess-main" }), + JSON.stringify({ + id: "msg-1", + message: { + role: "assistant", + content: [{ type: "text", text: "hello from seeded transcript" }], + timestamp: now, + }, + }), + JSON.stringify({ + id: "msg-attachment", + message: { + role: "assistant", + content: [ + { type: "text", text: "seeded image attachment" }, + { + type: "image", + source: { + type: "base64", + media_type: "image/png", + data: "abc", + }, + }, + ], + timestamp: now + 1, + }, + }), + ].join("\n") + "\n", + "utf-8", + ); + + process.stdout.write( + JSON.stringify({ + ok: true, + stateDir, + configPath, + storePath, + sessionFile, + }) + "\n", + ); +} + +await main(); diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index 1607f276fa1..c8c49f7ba53 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -18,6 +18,7 @@ const mockState = vi.hoisted(() => ({ sessionId: "sess-1", mainSessionKey: "main", finalText: "[[reply_to_current]]", + dispatchError: null as Error | null, triggerAgentRunStart: false, agentRunId: "run-agent-1", sessionEntry: {} as Record, @@ -88,6 +89,9 @@ vi.mock("../../auto-reply/dispatch.js", () => ({ }) => { mockState.lastDispatchCtx = params.ctx; mockState.lastDispatchImages = params.replyOptions?.images; + if (mockState.dispatchError) { + throw mockState.dispatchError; + } if (mockState.triggerAgentRunStart) { params.replyOptions?.onAgentRunStart?.(mockState.agentRunId); } @@ -326,6 +330,7 @@ async function runNonStreamingChatSend(params: { describe("chat directive tag stripping for non-streaming final payloads", () => { afterEach(() => { mockState.finalText = "[[reply_to_current]]"; + mockState.dispatchError = null; mockState.mainSessionKey = "main"; mockState.triggerAgentRunStart = false; mockState.agentRunId = "run-agent-1"; @@ -1711,4 +1716,38 @@ describe("chat directive tag stripping for non-streaming final payloads", () => }, }); }); + + it("emits a user transcript update when chat.send fails before an agent run starts", async () => { + createTranscriptFixture("openclaw-chat-send-user-transcript-error-no-run-"); + mockState.dispatchError = new Error("upstream unavailable"); + const respond = vi.fn(); + const context = createChatContext(); + + await runNonStreamingChatSend({ + context, + respond, + idempotencyKey: "idem-user-transcript-error-no-run", + message: "hello from failed dispatch", + expectBroadcast: false, + }); + + await waitForAssertion(() => { + expect(context.dedupe.get("chat:idem-user-transcript-error-no-run")?.ok).toBe(false); + const userUpdate = mockState.emittedTranscriptUpdates.find( + (update) => + typeof update.message === "object" && + update.message !== null && + (update.message as { role?: unknown }).role === "user", + ); + expect(userUpdate).toMatchObject({ + sessionFile: expect.stringMatching(/sess\.jsonl$/), + sessionKey: "main", + message: { + role: "user", + content: "hello from failed dispatch", + timestamp: expect.any(Number), + }, + }); + }); + }); }); diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 4105030eb0a..505996b069e 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -1470,36 +1470,39 @@ export const chatHandlers: GatewayRequestHandlers = { channel: INTERNAL_MESSAGE_CHANNEL, }); const deliveredReplies: Array<{ payload: ReplyPayload; kind: "block" | "final" }> = []; - let userTranscriptUpdateEmitted = false; + let userTranscriptUpdatePromise: Promise | null = null; const emitUserTranscriptUpdate = async () => { - if (userTranscriptUpdateEmitted) { + if (userTranscriptUpdatePromise) { + await userTranscriptUpdatePromise; return; } - const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry(sessionKey); - const resolvedSessionId = latestEntry?.sessionId ?? entry?.sessionId; - if (!resolvedSessionId) { - return; - } - const transcriptPath = resolveTranscriptPath({ - sessionId: resolvedSessionId, - storePath: latestStorePath, - sessionFile: latestEntry?.sessionFile ?? entry?.sessionFile, - agentId, - }); - if (!transcriptPath) { - return; - } - userTranscriptUpdateEmitted = true; - const persistedImages = await persistedImagesPromise; - emitSessionTranscriptUpdate({ - sessionFile: transcriptPath, - sessionKey, - message: buildChatSendTranscriptMessage({ - message: parsedMessage, - savedImages: persistedImages, - timestamp: now, - }), - }); + userTranscriptUpdatePromise = (async () => { + const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry(sessionKey); + const resolvedSessionId = latestEntry?.sessionId ?? entry?.sessionId; + if (!resolvedSessionId) { + return; + } + const transcriptPath = resolveTranscriptPath({ + sessionId: resolvedSessionId, + storePath: latestStorePath, + sessionFile: latestEntry?.sessionFile ?? entry?.sessionFile, + agentId, + }); + if (!transcriptPath) { + return; + } + const persistedImages = await persistedImagesPromise; + emitSessionTranscriptUpdate({ + sessionFile: transcriptPath, + sessionKey, + message: buildChatSendTranscriptMessage({ + message: parsedMessage, + savedImages: persistedImages, + timestamp: now, + }), + }); + })(); + await userTranscriptUpdatePromise; }; let transcriptMediaRewriteDone = false; const rewriteUserTranscriptMedia = async () => { @@ -1541,6 +1544,15 @@ export const chatHandlers: GatewayRequestHandlers = { }, }); + // Surface accepted inbound turns immediately so transcript subscribers + // (gateway watchers, MCP bridges, external channel backends) do not wait + // on model startup, completion, or failure paths before seeing the user turn. + void emitUserTranscriptUpdate().catch((transcriptErr) => { + context.logGateway.warn( + `webchat eager user transcript update failed: ${formatForLog(transcriptErr)}`, + ); + }); + let agentRunStarted = false; void dispatchInboundMessage({ ctx, @@ -1663,6 +1675,16 @@ export const chatHandlers: GatewayRequestHandlers = { }); }) .catch((err) => { + void rewriteUserTranscriptMedia().catch((rewriteErr) => { + context.logGateway.warn( + `webchat transcript media rewrite failed after error: ${formatForLog(rewriteErr)}`, + ); + }); + void emitUserTranscriptUpdate().catch((transcriptErr) => { + context.logGateway.warn( + `webchat user transcript update failed after error: ${formatForLog(transcriptErr)}`, + ); + }); const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); setGatewayDedupeEntry({ dedupe: context.dedupe, diff --git a/src/mcp/channel-server.test.ts b/src/mcp/channel-server.test.ts index 41c3b84b5ce..fc81a52b0f9 100644 --- a/src/mcp/channel-server.test.ts +++ b/src/mcp/channel-server.test.ts @@ -136,6 +136,24 @@ describe("openclaw channel mcp server", () => { timestamp: Date.now(), }, }, + { + id: "msg-attachment", + message: { + role: "assistant", + content: [ + { type: "text", text: "attached image" }, + { + type: "image", + source: { + type: "base64", + media_type: "image/png", + data: "abc", + }, + }, + ], + timestamp: Date.now() + 1, + }, + }, ], }); @@ -175,6 +193,27 @@ describe("openclaw channel mcp server", () => { role: "assistant", content: [{ type: "text", text: "hello from transcript" }], }); + expect(read.structuredContent?.messages?.[1]).toMatchObject({ + __openclaw: { + id: "msg-attachment", + }, + }); + + const attachments = (await mcp.client.callTool({ + name: "attachments_fetch", + arguments: { session_key: sessionKey, message_id: "msg-attachment" }, + })) as { + structuredContent?: { attachments?: Array> }; + isError?: boolean; + }; + expect(attachments.isError).not.toBe(true); + expect(attachments.structuredContent?.attachments).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: "image", + }), + ]), + ); const waitPromise = mcp.client.callTool({ name: "events_wait", @@ -454,6 +493,28 @@ describe("openclaw channel mcp server", () => { request_id: "abcde", behavior: "allow", }); + + emitSessionTranscriptUpdate({ + sessionFile: path.join(path.dirname(storePath), "sess-claude.jsonl"), + sessionKey, + messageId: "msg-user-3", + message: { + role: "user", + content: "plain string user turn", + timestamp: Date.now(), + }, + }); + + await vi.waitFor(() => { + expect(channelNotifications).toHaveLength(2); + }); + expect(channelNotifications[1]).toMatchObject({ + content: "plain string user turn", + meta: expect.objectContaining({ + session_key: sessionKey, + message_id: "msg-user-3", + }), + }); } finally { await mcp?.close(); await harness.close(); diff --git a/src/mcp/channel-server.ts b/src/mcp/channel-server.ts index 012db898177..c669169c3ba 100644 --- a/src/mcp/channel-server.ts +++ b/src/mcp/channel-server.ts @@ -166,6 +166,15 @@ function toText(value: unknown): string | undefined { return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined; } +function resolveMessageId(entry: Record): string | undefined { + return ( + toText(entry.id) ?? + (entry.__openclaw && typeof entry.__openclaw === "object" + ? toText((entry.__openclaw as { id?: unknown }).id) + : undefined) + ); +} + function summarizeResult( label: string, count: number, @@ -832,7 +841,7 @@ export async function createOpenClawChannelMcpServer(opts: OpenClawMcpServeOptio }, async ({ session_key, message_id, limit }) => { const messages = await bridge.readMessages(session_key, limit ?? 100); - const message = messages.find((entry) => toText(entry.id) === message_id); + const message = messages.find((entry) => resolveMessageId(entry) === message_id); if (!message) { return { content: [{ type: "text", text: `message not found: ${message_id}` }], diff --git a/src/shared/chat-message-content.test.ts b/src/shared/chat-message-content.test.ts index 7c35516f903..adf848dafe5 100644 --- a/src/shared/chat-message-content.test.ts +++ b/src/shared/chat-message-content.test.ts @@ -10,6 +10,14 @@ describe("shared/chat-message-content", () => { ).toBe("hello"); }); + it("returns plain string content", () => { + expect( + extractFirstTextBlock({ + content: "hello from string content", + }), + ).toBe("hello from string content"); + }); + it("preserves empty-string text in the first block", () => { expect( extractFirstTextBlock({ diff --git a/src/shared/chat-message-content.ts b/src/shared/chat-message-content.ts index a874715b3a3..b1524932214 100644 --- a/src/shared/chat-message-content.ts +++ b/src/shared/chat-message-content.ts @@ -3,6 +3,9 @@ export function extractFirstTextBlock(message: unknown): string | undefined { return undefined; } const content = (message as { content?: unknown }).content; + if (typeof content === "string") { + return content; + } if (!Array.isArray(content) || content.length === 0) { return undefined; }