fix: harden mcp channel bridge smoke

This commit is contained in:
Peter Steinberger
2026-03-28 04:10:00 +00:00
parent 9b405f88d4
commit ec5877346c
11 changed files with 970 additions and 29 deletions

View File

@@ -18,6 +18,7 @@ const mockState = vi.hoisted(() => ({
sessionId: "sess-1",
mainSessionKey: "main",
finalText: "[[reply_to_current]]",
dispatchError: null as Error | null,
triggerAgentRunStart: false,
agentRunId: "run-agent-1",
sessionEntry: {} as Record<string, unknown>,
@@ -88,6 +89,9 @@ vi.mock("../../auto-reply/dispatch.js", () => ({
}) => {
mockState.lastDispatchCtx = params.ctx;
mockState.lastDispatchImages = params.replyOptions?.images;
if (mockState.dispatchError) {
throw mockState.dispatchError;
}
if (mockState.triggerAgentRunStart) {
params.replyOptions?.onAgentRunStart?.(mockState.agentRunId);
}
@@ -326,6 +330,7 @@ async function runNonStreamingChatSend(params: {
describe("chat directive tag stripping for non-streaming final payloads", () => {
afterEach(() => {
mockState.finalText = "[[reply_to_current]]";
mockState.dispatchError = null;
mockState.mainSessionKey = "main";
mockState.triggerAgentRunStart = false;
mockState.agentRunId = "run-agent-1";
@@ -1711,4 +1716,38 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
},
});
});
it("emits a user transcript update when chat.send fails before an agent run starts", async () => {
createTranscriptFixture("openclaw-chat-send-user-transcript-error-no-run-");
mockState.dispatchError = new Error("upstream unavailable");
const respond = vi.fn();
const context = createChatContext();
await runNonStreamingChatSend({
context,
respond,
idempotencyKey: "idem-user-transcript-error-no-run",
message: "hello from failed dispatch",
expectBroadcast: false,
});
await waitForAssertion(() => {
expect(context.dedupe.get("chat:idem-user-transcript-error-no-run")?.ok).toBe(false);
const userUpdate = mockState.emittedTranscriptUpdates.find(
(update) =>
typeof update.message === "object" &&
update.message !== null &&
(update.message as { role?: unknown }).role === "user",
);
expect(userUpdate).toMatchObject({
sessionFile: expect.stringMatching(/sess\.jsonl$/),
sessionKey: "main",
message: {
role: "user",
content: "hello from failed dispatch",
timestamp: expect.any(Number),
},
});
});
});
});

View File

@@ -1470,36 +1470,39 @@ export const chatHandlers: GatewayRequestHandlers = {
channel: INTERNAL_MESSAGE_CHANNEL,
});
const deliveredReplies: Array<{ payload: ReplyPayload; kind: "block" | "final" }> = [];
let userTranscriptUpdateEmitted = false;
let userTranscriptUpdatePromise: Promise<void> | null = null;
const emitUserTranscriptUpdate = async () => {
if (userTranscriptUpdateEmitted) {
if (userTranscriptUpdatePromise) {
await userTranscriptUpdatePromise;
return;
}
const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry(sessionKey);
const resolvedSessionId = latestEntry?.sessionId ?? entry?.sessionId;
if (!resolvedSessionId) {
return;
}
const transcriptPath = resolveTranscriptPath({
sessionId: resolvedSessionId,
storePath: latestStorePath,
sessionFile: latestEntry?.sessionFile ?? entry?.sessionFile,
agentId,
});
if (!transcriptPath) {
return;
}
userTranscriptUpdateEmitted = true;
const persistedImages = await persistedImagesPromise;
emitSessionTranscriptUpdate({
sessionFile: transcriptPath,
sessionKey,
message: buildChatSendTranscriptMessage({
message: parsedMessage,
savedImages: persistedImages,
timestamp: now,
}),
});
userTranscriptUpdatePromise = (async () => {
const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry(sessionKey);
const resolvedSessionId = latestEntry?.sessionId ?? entry?.sessionId;
if (!resolvedSessionId) {
return;
}
const transcriptPath = resolveTranscriptPath({
sessionId: resolvedSessionId,
storePath: latestStorePath,
sessionFile: latestEntry?.sessionFile ?? entry?.sessionFile,
agentId,
});
if (!transcriptPath) {
return;
}
const persistedImages = await persistedImagesPromise;
emitSessionTranscriptUpdate({
sessionFile: transcriptPath,
sessionKey,
message: buildChatSendTranscriptMessage({
message: parsedMessage,
savedImages: persistedImages,
timestamp: now,
}),
});
})();
await userTranscriptUpdatePromise;
};
let transcriptMediaRewriteDone = false;
const rewriteUserTranscriptMedia = async () => {
@@ -1541,6 +1544,15 @@ export const chatHandlers: GatewayRequestHandlers = {
},
});
// Surface accepted inbound turns immediately so transcript subscribers
// (gateway watchers, MCP bridges, external channel backends) do not wait
// on model startup, completion, or failure paths before seeing the user turn.
void emitUserTranscriptUpdate().catch((transcriptErr) => {
context.logGateway.warn(
`webchat eager user transcript update failed: ${formatForLog(transcriptErr)}`,
);
});
let agentRunStarted = false;
void dispatchInboundMessage({
ctx,
@@ -1663,6 +1675,16 @@ export const chatHandlers: GatewayRequestHandlers = {
});
})
.catch((err) => {
void rewriteUserTranscriptMedia().catch((rewriteErr) => {
context.logGateway.warn(
`webchat transcript media rewrite failed after error: ${formatForLog(rewriteErr)}`,
);
});
void emitUserTranscriptUpdate().catch((transcriptErr) => {
context.logGateway.warn(
`webchat user transcript update failed after error: ${formatForLog(transcriptErr)}`,
);
});
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
setGatewayDedupeEntry({
dedupe: context.dedupe,

View File

@@ -136,6 +136,24 @@ describe("openclaw channel mcp server", () => {
timestamp: Date.now(),
},
},
{
id: "msg-attachment",
message: {
role: "assistant",
content: [
{ type: "text", text: "attached image" },
{
type: "image",
source: {
type: "base64",
media_type: "image/png",
data: "abc",
},
},
],
timestamp: Date.now() + 1,
},
},
],
});
@@ -175,6 +193,27 @@ describe("openclaw channel mcp server", () => {
role: "assistant",
content: [{ type: "text", text: "hello from transcript" }],
});
expect(read.structuredContent?.messages?.[1]).toMatchObject({
__openclaw: {
id: "msg-attachment",
},
});
const attachments = (await mcp.client.callTool({
name: "attachments_fetch",
arguments: { session_key: sessionKey, message_id: "msg-attachment" },
})) as {
structuredContent?: { attachments?: Array<Record<string, unknown>> };
isError?: boolean;
};
expect(attachments.isError).not.toBe(true);
expect(attachments.structuredContent?.attachments).toEqual(
expect.arrayContaining([
expect.objectContaining({
type: "image",
}),
]),
);
const waitPromise = mcp.client.callTool({
name: "events_wait",
@@ -454,6 +493,28 @@ describe("openclaw channel mcp server", () => {
request_id: "abcde",
behavior: "allow",
});
emitSessionTranscriptUpdate({
sessionFile: path.join(path.dirname(storePath), "sess-claude.jsonl"),
sessionKey,
messageId: "msg-user-3",
message: {
role: "user",
content: "plain string user turn",
timestamp: Date.now(),
},
});
await vi.waitFor(() => {
expect(channelNotifications).toHaveLength(2);
});
expect(channelNotifications[1]).toMatchObject({
content: "plain string user turn",
meta: expect.objectContaining({
session_key: sessionKey,
message_id: "msg-user-3",
}),
});
} finally {
await mcp?.close();
await harness.close();

View File

@@ -166,6 +166,15 @@ function toText(value: unknown): string | undefined {
return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined;
}
function resolveMessageId(entry: Record<string, unknown>): string | undefined {
return (
toText(entry.id) ??
(entry.__openclaw && typeof entry.__openclaw === "object"
? toText((entry.__openclaw as { id?: unknown }).id)
: undefined)
);
}
function summarizeResult(
label: string,
count: number,
@@ -832,7 +841,7 @@ export async function createOpenClawChannelMcpServer(opts: OpenClawMcpServeOptio
},
async ({ session_key, message_id, limit }) => {
const messages = await bridge.readMessages(session_key, limit ?? 100);
const message = messages.find((entry) => toText(entry.id) === message_id);
const message = messages.find((entry) => resolveMessageId(entry) === message_id);
if (!message) {
return {
content: [{ type: "text", text: `message not found: ${message_id}` }],

View File

@@ -10,6 +10,14 @@ describe("shared/chat-message-content", () => {
).toBe("hello");
});
it("returns plain string content", () => {
expect(
extractFirstTextBlock({
content: "hello from string content",
}),
).toBe("hello from string content");
});
it("preserves empty-string text in the first block", () => {
expect(
extractFirstTextBlock({

View File

@@ -3,6 +3,9 @@ export function extractFirstTextBlock(message: unknown): string | undefined {
return undefined;
}
const content = (message as { content?: unknown }).content;
if (typeof content === "string") {
return content;
}
if (!Array.isArray(content) || content.length === 0) {
return undefined;
}