mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:00:43 +00:00
fix: stabilize channel MCP Docker smoke
This commit is contained in:
@@ -33,16 +33,18 @@ async function main() {
|
||||
});
|
||||
mcp = mcpHandle.client;
|
||||
}
|
||||
const callTool = <T>(params: Parameters<typeof mcp.callTool>[0]) =>
|
||||
mcp.callTool(params, undefined, { timeout: 240_000 }) as Promise<T>;
|
||||
|
||||
const conversation = await waitFor(
|
||||
"seeded conversation in conversations_list",
|
||||
async () => {
|
||||
const listed = (await mcp.callTool({
|
||||
const listed = await callTool<{
|
||||
structuredContent?: { conversations?: Array<Record<string, unknown>> };
|
||||
}>({
|
||||
name: "conversations_list",
|
||||
arguments: {},
|
||||
})) as {
|
||||
structuredContent?: { conversations?: Array<Record<string, unknown>> };
|
||||
};
|
||||
});
|
||||
return listed.structuredContent?.conversations?.find(
|
||||
(entry) => entry.sessionKey === "agent:main:main",
|
||||
);
|
||||
@@ -52,33 +54,40 @@ async function main() {
|
||||
assert(conversation.channel === "imessage", "expected seeded channel");
|
||||
assert(conversation.to === "+15551234567", "expected seeded target");
|
||||
|
||||
const fetched = (await mcp.callTool({
|
||||
name: "conversation_get",
|
||||
arguments: { session_key: "agent:main:main" },
|
||||
})) as {
|
||||
const fetched = await callTool<{
|
||||
structuredContent?: { conversation?: Record<string, unknown> };
|
||||
isError?: boolean;
|
||||
};
|
||||
}>({
|
||||
name: "conversation_get",
|
||||
arguments: { session_key: "agent:main:main" },
|
||||
});
|
||||
assert(!fetched.isError, "conversation_get should succeed");
|
||||
assert(
|
||||
fetched.structuredContent?.conversation?.sessionKey === "agent:main:main",
|
||||
"conversation_get returned wrong session",
|
||||
);
|
||||
|
||||
let lastHistory: unknown;
|
||||
const messages = await waitFor(
|
||||
"seeded transcript messages",
|
||||
async () => {
|
||||
const history = (await mcp.callTool({
|
||||
const history = await callTool<{
|
||||
structuredContent?: { messages?: Array<Record<string, unknown>> };
|
||||
}>({
|
||||
name: "messages_read",
|
||||
arguments: { session_key: "agent:main:main", limit: 10 },
|
||||
})) as {
|
||||
structuredContent?: { messages?: Array<Record<string, unknown>> };
|
||||
};
|
||||
});
|
||||
lastHistory = history;
|
||||
const currentMessages = history.structuredContent?.messages ?? [];
|
||||
return currentMessages.length >= 2 ? currentMessages : undefined;
|
||||
},
|
||||
240_000,
|
||||
);
|
||||
).catch((error) => {
|
||||
throw new Error(
|
||||
`timeout waiting for seeded transcript messages: ${JSON.stringify(lastHistory, null, 2)}`,
|
||||
{ cause: error },
|
||||
);
|
||||
});
|
||||
await waitFor(
|
||||
"seeded attachment message",
|
||||
() =>
|
||||
@@ -91,13 +100,13 @@ async function main() {
|
||||
240_000,
|
||||
);
|
||||
|
||||
const attachments = (await mcp.callTool({
|
||||
name: "attachments_fetch",
|
||||
arguments: { session_key: "agent:main:main", message_id: "msg-attachment" },
|
||||
})) as {
|
||||
const attachments = await callTool<{
|
||||
structuredContent?: { attachments?: Array<Record<string, unknown>> };
|
||||
isError?: boolean;
|
||||
};
|
||||
}>({
|
||||
name: "attachments_fetch",
|
||||
arguments: { session_key: "agent:main:main", message_id: "msg-attachment" },
|
||||
});
|
||||
assert(!attachments.isError, "attachments_fetch should succeed");
|
||||
assert(
|
||||
(attachments.structuredContent?.attachments?.length ?? 0) === 1,
|
||||
@@ -105,16 +114,16 @@ async function main() {
|
||||
);
|
||||
|
||||
const waited = (await Promise.all([
|
||||
mcp.callTool({
|
||||
callTool<{
|
||||
structuredContent?: { event?: Record<string, unknown> };
|
||||
}>({
|
||||
name: "events_wait",
|
||||
arguments: {
|
||||
session_key: "agent:main:main",
|
||||
after_cursor: 0,
|
||||
timeout_ms: 10_000,
|
||||
},
|
||||
}) as Promise<{
|
||||
structuredContent?: { event?: Record<string, unknown> };
|
||||
}>,
|
||||
}),
|
||||
gateway.request("chat.inject", {
|
||||
sessionKey: "agent:main:main",
|
||||
message: "assistant live event",
|
||||
@@ -129,12 +138,12 @@ async function main() {
|
||||
assert(assistantEvent.text === "assistant live event", "expected assistant event text");
|
||||
const assistantCursor = typeof assistantEvent.cursor === "number" ? assistantEvent.cursor : 0;
|
||||
|
||||
const polled = (await mcp.callTool({
|
||||
const polled = await callTool<{
|
||||
structuredContent?: { events?: Array<Record<string, unknown>> };
|
||||
}>({
|
||||
name: "events_poll",
|
||||
arguments: { session_key: "agent:main:main", after_cursor: 0, limit: 10 },
|
||||
})) as {
|
||||
structuredContent?: { events?: Array<Record<string, unknown>> };
|
||||
};
|
||||
});
|
||||
assert(
|
||||
(polled.structuredContent?.events ?? []).some(
|
||||
(entry) => entry.text === "assistant live event",
|
||||
@@ -144,16 +153,16 @@ async function main() {
|
||||
|
||||
const channelMessage = `hello from docker ${randomUUID()}`;
|
||||
const userEvent = (await Promise.all([
|
||||
mcp.callTool({
|
||||
callTool<{
|
||||
structuredContent?: { event?: Record<string, unknown> };
|
||||
}>({
|
||||
name: "events_wait",
|
||||
arguments: {
|
||||
session_key: "agent:main:main",
|
||||
after_cursor: assistantCursor,
|
||||
timeout_ms: 10_000,
|
||||
},
|
||||
}) as Promise<{
|
||||
structuredContent?: { event?: Record<string, unknown> };
|
||||
}>,
|
||||
}),
|
||||
gateway.request("chat.send", {
|
||||
sessionKey: "agent:main:main",
|
||||
message: channelMessage,
|
||||
|
||||
@@ -26,7 +26,8 @@ docker run --rm \
|
||||
-e "OPENCLAW_SKIP_GMAIL_WATCHER=1" \
|
||||
-e "OPENCLAW_SKIP_CRON=1" \
|
||||
-e "OPENCLAW_SKIP_CANVAS_HOST=1" \
|
||||
-e "OPENCLAW_ACPX_RUNTIME_STARTUP_PROBE=1" \
|
||||
-e "OPENCLAW_SKIP_ACPX_RUNTIME=1" \
|
||||
-e "OPENCLAW_SKIP_ACPX_RUNTIME_PROBE=1" \
|
||||
-e "OPENCLAW_STATE_DIR=/tmp/openclaw-state" \
|
||||
-e "OPENCLAW_CONFIG_PATH=/tmp/openclaw-state/openclaw.json" \
|
||||
-e "GW_URL=ws://127.0.0.1:$PORT" \
|
||||
@@ -50,11 +51,22 @@ docker run --rm \
|
||||
node --import tsx scripts/e2e/mcp-channels-seed.ts >/tmp/mcp-channels-seed.log
|
||||
node \"\$entry\" gateway --port $PORT --bind loopback --allow-unconfigured >/tmp/mcp-channels-gateway.log 2>&1 &
|
||||
gateway_pid=\$!
|
||||
stop_process() {
|
||||
pid=\"\$1\"
|
||||
kill \"\$pid\" >/dev/null 2>&1 || true
|
||||
for _ in \$(seq 1 40); do
|
||||
if ! kill -0 \"\$pid\" >/dev/null 2>&1; then
|
||||
wait \"\$pid\" >/dev/null 2>&1 || true
|
||||
return
|
||||
fi
|
||||
sleep 0.25
|
||||
done
|
||||
kill -9 \"\$pid\" >/dev/null 2>&1 || true
|
||||
wait \"\$pid\" >/dev/null 2>&1 || true
|
||||
}
|
||||
cleanup_inner() {
|
||||
kill \"\$gateway_pid\" >/dev/null 2>&1 || true
|
||||
wait \"\$gateway_pid\" >/dev/null 2>&1 || true
|
||||
kill \"\$mock_pid\" >/dev/null 2>&1 || true
|
||||
wait \"\$mock_pid\" >/dev/null 2>&1 || true
|
||||
stop_process \"\$gateway_pid\"
|
||||
stop_process \"\$mock_pid\"
|
||||
}
|
||||
dump_gateway_log_on_error() {
|
||||
status=\$?
|
||||
@@ -79,19 +91,6 @@ docker run --rm \
|
||||
tail -n 120 /tmp/mcp-channels-gateway.log 2>/dev/null || true
|
||||
exit 1
|
||||
fi
|
||||
acpx_ready=0
|
||||
for _ in \$(seq 1 2400); do
|
||||
if grep -q '\[plugins\] embedded acpx runtime backend ready' /tmp/mcp-channels-gateway.log 2>/dev/null; then
|
||||
acpx_ready=1
|
||||
break
|
||||
fi
|
||||
sleep 0.25
|
||||
done
|
||||
if [ \"\$acpx_ready\" -ne 1 ]; then
|
||||
echo \"Embedded ACPX runtime did not become ready\"
|
||||
tail -n 120 /tmp/mcp-channels-gateway.log 2>/dev/null || true
|
||||
exit 1
|
||||
fi
|
||||
node --import tsx scripts/e2e/mcp-channels-docker-client.ts
|
||||
" >"$CLIENT_LOG" 2>&1
|
||||
status=${PIPESTATUS[0]}
|
||||
|
||||
@@ -388,7 +388,10 @@ export async function maybeApprovePendingBridgePairing(
|
||||
}>("device.pair.list", {});
|
||||
} catch (error) {
|
||||
const message = formatErrorMessage(error);
|
||||
if (message.includes("missing scope: operator.pairing")) {
|
||||
if (
|
||||
message.includes("missing scope: operator.pairing") ||
|
||||
message.includes("device.pair.list")
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
throw error;
|
||||
|
||||
@@ -23,6 +23,16 @@ async function main() {
|
||||
enabled: false,
|
||||
},
|
||||
},
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: {
|
||||
every: "0m",
|
||||
},
|
||||
},
|
||||
},
|
||||
plugins: {
|
||||
enabled: false,
|
||||
},
|
||||
} satisfies OpenClawConfig,
|
||||
"sk-docker-smoke-test",
|
||||
);
|
||||
|
||||
@@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto";
|
||||
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveGatewayClientBootstrap } from "../gateway/client-bootstrap.js";
|
||||
import { GatewayClient } from "../gateway/client.js";
|
||||
import { GatewayClient, GatewayClientRequestError } from "../gateway/client.js";
|
||||
import { APPROVALS_SCOPE, READ_SCOPE, WRITE_SCOPE } from "../gateway/method-scopes.js";
|
||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/client-info.js";
|
||||
import type { EventFrame } from "../gateway/protocol/index.js";
|
||||
@@ -54,6 +54,7 @@ export class OpenClawChannelBridge {
|
||||
private closed = false;
|
||||
private ready = false;
|
||||
private started = false;
|
||||
private retryingInitialConnect = false;
|
||||
private readonly readyPromise: Promise<void>;
|
||||
private resolveReady!: () => void;
|
||||
private rejectReady!: (error: Error) => void;
|
||||
@@ -110,19 +111,27 @@ export class OpenClawChannelBridge {
|
||||
clientVersion: VERSION,
|
||||
mode: GATEWAY_CLIENT_MODES.CLI,
|
||||
scopes: [READ_SCOPE, WRITE_SCOPE, APPROVALS_SCOPE],
|
||||
requestTimeoutMs: 180_000,
|
||||
onEvent: (event) => {
|
||||
void this.handleGatewayEvent(event);
|
||||
},
|
||||
onHelloOk: () => {
|
||||
this.retryingInitialConnect = false;
|
||||
void this.handleHelloOk();
|
||||
},
|
||||
onConnectError: (error) => {
|
||||
this.rejectReadyOnce(error instanceof Error ? error : new Error(String(error)));
|
||||
const normalizedError = error instanceof Error ? error : new Error(String(error));
|
||||
if (shouldRetryInitialMcpGatewayConnect(normalizedError)) {
|
||||
this.retryingInitialConnect = true;
|
||||
return;
|
||||
}
|
||||
this.rejectReadyOnce(normalizedError);
|
||||
},
|
||||
onClose: (code, reason) => {
|
||||
if (!this.ready && !this.closed) {
|
||||
if (!this.ready && !this.closed && !this.retryingInitialConnect) {
|
||||
this.rejectReadyOnce(new Error(`gateway closed before ready (${code}): ${reason}`));
|
||||
}
|
||||
this.retryingInitialConnect = false;
|
||||
},
|
||||
});
|
||||
this.gateway.start();
|
||||
@@ -192,8 +201,8 @@ export class OpenClawChannelBridge {
|
||||
limit = 20,
|
||||
): Promise<NonNullable<ChatHistoryResult["messages"]>> {
|
||||
await this.waitUntilReady();
|
||||
const response: ChatHistoryResult = await this.requestGateway("chat.history", {
|
||||
sessionKey,
|
||||
const response: ChatHistoryResult = await this.requestGateway("sessions.get", {
|
||||
key: sessionKey,
|
||||
limit,
|
||||
});
|
||||
return response.messages ?? [];
|
||||
@@ -514,3 +523,14 @@ export class OpenClawChannelBridge {
|
||||
return Boolean(conversation);
|
||||
}
|
||||
}
|
||||
|
||||
export function shouldRetryInitialMcpGatewayConnect(error: Error): boolean {
|
||||
if (error instanceof GatewayClientRequestError) {
|
||||
return error.retryable;
|
||||
}
|
||||
const message = error.message.toLowerCase();
|
||||
return (
|
||||
message.includes("gateway request timeout for connect") ||
|
||||
message.includes("gateway connect challenge timeout")
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ import { Client } from "@modelcontextprotocol/sdk/client/index.js";
|
||||
import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js";
|
||||
import { describe, expect, test, vi } from "vitest";
|
||||
import { z } from "zod";
|
||||
import { GatewayClientRequestError } from "../gateway/client.js";
|
||||
import { shouldRetryInitialMcpGatewayConnect } from "./channel-bridge.js";
|
||||
import { createOpenClawChannelMcpServer, OpenClawChannelBridge } from "./channel-server.js";
|
||||
import { extractAttachmentsFromMessage } from "./channel-shared.js";
|
||||
|
||||
@@ -73,6 +75,30 @@ async function flushMcpNotifications() {
|
||||
}
|
||||
|
||||
describe("openclaw channel mcp server", () => {
|
||||
test("keeps initial MCP gateway connection alive through transient connect errors", () => {
|
||||
expect(
|
||||
shouldRetryInitialMcpGatewayConnect(new Error("gateway request timeout for connect")),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldRetryInitialMcpGatewayConnect(
|
||||
new GatewayClientRequestError({
|
||||
code: "BUSY",
|
||||
message: "gateway busy",
|
||||
retryable: true,
|
||||
}),
|
||||
),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldRetryInitialMcpGatewayConnect(
|
||||
new GatewayClientRequestError({
|
||||
code: "UNAUTHORIZED",
|
||||
message: "auth failed",
|
||||
retryable: false,
|
||||
}),
|
||||
),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
describe("gateway-backed flows", () => {
|
||||
describe("gateway integration", () => {
|
||||
test("lists conversations and reads messages", async () => {
|
||||
@@ -93,7 +119,7 @@ describe("openclaw channel mcp server", () => {
|
||||
],
|
||||
};
|
||||
}
|
||||
if (method === "chat.history") {
|
||||
if (method === "sessions.get") {
|
||||
return {
|
||||
messages: [
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user