From 18c98316f7ace3d2f9e10e71d01b15ba8136c1b1 Mon Sep 17 00:00:00 2001 From: Marcus Castro <7562095+mcaxtr@users.noreply.github.com> Date: Fri, 24 Apr 2026 01:04:28 -0300 Subject: [PATCH] fix(whatsapp): canonicalize outbound media delivery (#69813) * fix(whatsapp): normalize outbound media payloads * fix(embedded-runner): preserve final media directives * fix(auto-reply): keep non-streaming media on final path * fix(auto-reply): warn when reply media is dropped * fix(whatsapp): align auto-reply media delivery * docs(changelog): note whatsapp media normalization --- CHANGELOG.md | 1 + ...compresses-common-formats-jpeg-cap.test.ts | 2 +- .../src/auto-reply/deliver-reply.test.ts | 202 +++++++++++++++++- .../whatsapp/src/auto-reply/deliver-reply.ts | 69 +++--- .../whatsapp/src/channel-outbound.test.ts | 74 +++++++ extensions/whatsapp/src/channel-outbound.ts | 16 +- .../src/outbound-adapter.sendpayload.test.ts | 25 +++ extensions/whatsapp/src/outbound-adapter.ts | 104 ++------- extensions/whatsapp/src/outbound-base.test.ts | 140 ++++++++++++ extensions/whatsapp/src/outbound-base.ts | 49 ++++- .../whatsapp/src/outbound-media-contract.ts | 147 +++++++++++++ .../src/outbound-payload.contract.test.ts | 29 ++- extensions/whatsapp/src/send.test.ts | 51 +++++ extensions/whatsapp/src/send.ts | 47 ++-- .../pi-embedded-runner/run/payloads.test.ts | 55 +++++ src/agents/pi-embedded-runner/run/payloads.ts | 43 +++- .../reply/agent-runner-payloads.test.ts | 51 ++++- src/auto-reply/reply/agent-runner-payloads.ts | 43 ++-- .../reply/agent-runner.media-paths.test.ts | 12 +- src/auto-reply/reply/reply-delivery.test.ts | 32 ++- src/auto-reply/reply/reply-delivery.ts | 9 +- .../reply/reply-media-paths.test.ts | 40 ++++ src/auto-reply/reply/reply-media-paths.ts | 8 + src/auto-reply/reply/reply-payloads-dedupe.ts | 7 +- 24 files changed, 1056 insertions(+), 200 deletions(-) create mode 100644 extensions/whatsapp/src/channel-outbound.test.ts create mode 100644 extensions/whatsapp/src/outbound-media-contract.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 9de38895272..3c1d9ddcdba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai - Agents/replay: stop OpenAI/Codex transcript replay from synthesizing missing tool results while still preserving synthetic repair on Anthropic, Gemini, and Bedrock transport-owned sessions. (#61556) Thanks @VictorJeon and @vincentkoc. - Telegram/media replies: parse remote markdown image syntax into outbound media payloads on the final reply path, so Telegram group chats stop falling back to plain-text image URLs when the model or a tool emits `![...](...)` instead of a `MEDIA:` token. (#66191) Thanks @apezam and @vincentkoc. - Agents/WebChat: surface non-retryable provider failures such as billing, auth, and rate-limit errors from the embedded runner instead of logging `surface_error` and leaving webchat with no rendered error. Fixes #70124. (#70848) Thanks @truffle-dev. +- WhatsApp: unify outbound media normalization across direct sends and auto-replies. Thanks @mcaxtr. - Memory/CLI: declare the built-in `local` embedding provider in the memory-core manifest, so standalone `openclaw memory status`, `index`, and `search` can resolve local embeddings just like the gateway runtime. Fixes #70836. (#70873) Thanks @mattznojassist. - Gateway/WebChat: preserve image attachments for text-only primary models by offloading them as media refs instead of dropping them, so configured image tools can still inspect the original file. Fixes #68513, #44276, #51656, #70212. - Plugins/Google Meet: hang up delegated Twilio calls on leave, clean up Chrome realtime audio bridges when launch fails, and use a flat provider-safe tool schema. diff --git a/extensions/whatsapp/src/auto-reply.web-auto-reply.compresses-common-formats-jpeg-cap.test.ts b/extensions/whatsapp/src/auto-reply.web-auto-reply.compresses-common-formats-jpeg-cap.test.ts index ffc80772560..22159af7556 100644 --- a/extensions/whatsapp/src/auto-reply.web-auto-reply.compresses-common-formats-jpeg-cap.test.ts +++ b/extensions/whatsapp/src/auto-reply.web-auto-reply.compresses-common-formats-jpeg-cap.test.ts @@ -379,7 +379,7 @@ describe("web auto-reply", () => { const fallback = reply.mock.calls[0]?.[0] as string; expect(fallback).toContain("caption"); expect(fallback).toContain("Media failed"); - expect(fallback).toContain("404"); + expect(fallback).not.toContain("404"); fetchMock.mockRestore(); }); diff --git a/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts b/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts index 959cd17de3f..36a13745206 100644 --- a/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts +++ b/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts @@ -31,6 +31,7 @@ vi.mock("../media.js", () => ({ })); let deliverWebReply: typeof import("./deliver-reply.js").deliverWebReply; +let whatsappOutbound: typeof import("../outbound-adapter.js").whatsappOutbound; function makeMsg(): WebInboundMsg { return { @@ -69,6 +70,12 @@ function mockFirstReplyFailure(msg: WebInboundMsg, message: string) { ); } +function mockFirstReplyFailureWithWrappedError(msg: WebInboundMsg, message: string) { + (msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce({ + error: { message }, + }); +} + function mockSecondReplySuccess(msg: WebInboundMsg) { (msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce( undefined, @@ -97,6 +104,7 @@ async function expectReplySuppressed(replyResult: { text: string; isReasoning?: describe("deliverWebReply", () => { beforeAll(async () => { ({ deliverWebReply } = await import("./deliver-reply.js")); + ({ whatsappOutbound } = await import("../outbound-adapter.js")); }); it("suppresses payloads flagged as reasoning", async () => { @@ -217,6 +225,24 @@ describe("deliverWebReply", () => { }, ); + it("retries text send on wrapped transient failure", async () => { + const msg = makeMsg(); + mockFirstReplyFailureWithWrappedError(msg, "connection closed"); + mockSecondReplySuccess(msg); + + await deliverWebReply({ + replyResult: { text: "hi" }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 200, + replyLogger, + skipLog: true, + }); + + expect(msg.reply).toHaveBeenCalledTimes(2); + expect(sleep).toHaveBeenCalledWith(500); + }); + it("sends image media with caption and then remaining text", async () => { const msg = makeMsg(); const mediaLocalRoots = ["/tmp/workspace-work"]; @@ -250,6 +276,22 @@ describe("deliverWebReply", () => { expect(logVerbose).toHaveBeenCalled(); }); + it("preserves leading indentation after trimming only leading blank lines", async () => { + const msg = makeMsg(); + + await deliverWebReply({ + replyResult: { text: "\n \n indented block" }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 200, + replyLogger, + skipLog: true, + }); + + expect(msg.reply).toHaveBeenCalledTimes(1); + expect(msg.reply).toHaveBeenCalledWith(" indented block", undefined); + }); + it("keeps quote threading on media and trailing text chunks for a threaded reply", async () => { const msg = makeMsg(); mockLoadedImageMedia(); @@ -343,12 +385,137 @@ describe("deliverWebReply", () => { expect( String((msg.reply as unknown as { mock: { calls: unknown[][] } }).mock.calls[0]?.[0]), ).toContain("⚠️ Media failed"); + expect( + String((msg.reply as unknown as { mock: { calls: unknown[][] } }).mock.calls[0]?.[0]), + ).not.toContain("boom"); expect(replyLogger.warn).toHaveBeenCalledWith( expect.objectContaining({ mediaUrl: "http://example.com/img.jpg" }), "failed to send web media reply", ); }); + it("still attempts later media after the first media fails", async () => { + vi.clearAllMocks(); + const msg = makeMsg(); + ( + loadWebMedia as unknown as { mockResolvedValueOnce: (v: unknown) => void } + ).mockResolvedValueOnce({ + buffer: Buffer.from("bad"), + contentType: "image/jpeg", + kind: "image", + }); + ( + loadWebMedia as unknown as { mockResolvedValueOnce: (v: unknown) => void } + ).mockResolvedValueOnce({ + buffer: Buffer.from("good"), + contentType: "application/pdf", + kind: "file", + fileName: "good.pdf", + }); + mockFirstSendMediaFailure(msg, "boom"); + ( + msg.sendMedia as unknown as { mockResolvedValueOnce: (v: unknown) => void } + ).mockResolvedValueOnce(undefined); + + await deliverWebReply({ + replyResult: { + text: "caption", + mediaUrls: ["http://example.com/bad.jpg", "http://example.com/good.pdf"], + }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 200, + replyLogger, + skipLog: true, + }); + + expect(loadWebMedia).toHaveBeenNthCalledWith(1, "http://example.com/bad.jpg", { + maxBytes: 1024 * 1024, + localRoots: undefined, + }); + expect(loadWebMedia).toHaveBeenNthCalledWith(2, "http://example.com/good.pdf", { + maxBytes: 1024 * 1024, + localRoots: undefined, + }); + expect(msg.sendMedia).toHaveBeenCalledTimes(2); + expect(msg.sendMedia).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + document: expect.any(Buffer), + fileName: "good.pdf", + caption: undefined, + mimetype: "application/pdf", + }), + undefined, + ); + expect(msg.reply).toHaveBeenCalledTimes(1); + expect( + String((msg.reply as unknown as { mock: { calls: unknown[][] } }).mock.calls[0]?.[0]), + ).toContain("⚠️ Media failed"); + expect( + String((msg.reply as unknown as { mock: { calls: unknown[][] } }).mock.calls[0]?.[0]), + ).not.toContain("boom"); + }); + + it("keeps payload and auto-reply media normalization in parity", async () => { + const payload = { + text: "\n\ncaption", + mediaUrls: [" ", " /tmp/voice.ogg "], + }; + const sendWhatsApp = vi.fn(async () => ({ messageId: "wa-1", toJid: "jid" })); + + await whatsappOutbound.sendPayload!({ + cfg: {}, + to: "5511999999999@c.us", + text: "", + payload, + deps: { sendWhatsApp }, + }); + + const msg = makeMsg(); + ( + loadWebMedia as unknown as { mockResolvedValueOnce: (v: unknown) => void } + ).mockResolvedValueOnce({ + buffer: Buffer.from("aud"), + contentType: "audio/ogg", + kind: "audio", + }); + + await deliverWebReply({ + replyResult: payload, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 200, + replyLogger, + skipLog: true, + }); + + expect(sendWhatsApp).toHaveBeenCalledTimes(1); + expect(sendWhatsApp).toHaveBeenCalledWith("5511999999999@c.us", "caption", { + verbose: false, + cfg: {}, + mediaUrl: "/tmp/voice.ogg", + mediaLocalRoots: undefined, + accountId: undefined, + gifPlayback: undefined, + }); + expect(loadWebMedia).toHaveBeenCalledWith("/tmp/voice.ogg", { + maxBytes: 1024 * 1024, + localRoots: undefined, + }); + expect(msg.sendMedia).toHaveBeenCalledTimes(1); + expect(msg.sendMedia).toHaveBeenCalledWith( + expect.objectContaining({ + audio: expect.any(Buffer), + ptt: true, + mimetype: "audio/ogg; codecs=opus", + caption: "caption", + }), + undefined, + ); + expect(msg.reply).not.toHaveBeenCalled(); + }); + it("sends audio media as ptt voice note", async () => { const msg = makeMsg(); ( @@ -372,7 +539,7 @@ describe("deliverWebReply", () => { expect.objectContaining({ audio: expect.any(Buffer), ptt: true, - mimetype: "audio/ogg", + mimetype: "audio/ogg; codecs=opus", caption: "cap", }), undefined, @@ -438,4 +605,37 @@ describe("deliverWebReply", () => { undefined, ); }); + + it("strips URL query and fragment data from derived document file names", async () => { + const msg = makeMsg(); + ( + loadWebMedia as unknown as { mockResolvedValueOnce: (v: unknown) => void } + ).mockResolvedValueOnce({ + buffer: Buffer.from("pdf"), + contentType: "application/pdf", + kind: "file", + }); + + await deliverWebReply({ + replyResult: { + text: "cap", + mediaUrl: "https://example.com/report.pdf?X-Amz-Signature=secret#frag", + }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 200, + replyLogger, + skipLog: true, + }); + + expect(msg.sendMedia).toHaveBeenCalledWith( + expect.objectContaining({ + document: expect.any(Buffer), + fileName: "report.pdf", + caption: "cap", + mimetype: "application/pdf", + }), + undefined, + ); + }); }); diff --git a/extensions/whatsapp/src/auto-reply/deliver-reply.ts b/extensions/whatsapp/src/auto-reply/deliver-reply.ts index a74308a799f..59d671ce183 100644 --- a/extensions/whatsapp/src/auto-reply/deliver-reply.ts +++ b/extensions/whatsapp/src/auto-reply/deliver-reply.ts @@ -3,15 +3,20 @@ import { chunkMarkdownTextWithMode, type ChunkMode } from "openclaw/plugin-sdk/r import type { ReplyPayload } from "openclaw/plugin-sdk/reply-chunking"; import { isReasoningReplyPayload, - resolveOutboundMediaUrls, sendMediaWithLeadingCaption, } from "openclaw/plugin-sdk/reply-payload"; import { logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env"; import { loadWebMedia } from "../media.js"; +import { + normalizeWhatsAppLoadedMedia, + normalizeWhatsAppOutboundPayload, + normalizeWhatsAppPayloadTextPreservingIndentation, + sendWhatsAppOutboundWithRetry, +} from "../outbound-media-contract.js"; import { buildQuotedMessageOptions, lookupInboundMessageMeta } from "../quoted-message.js"; import { newConnectionId } from "../reconnect.js"; import { formatError } from "../session.js"; -import { convertMarkdownTables, sleep } from "../text-runtime.js"; +import { convertMarkdownTables } from "../text-runtime.js"; import { markdownToWhatsApp } from "../text-runtime.js"; import { whatsappOutboundLog } from "./loggers.js"; import type { WebInboundMsg } from "./types.js"; @@ -40,11 +45,12 @@ export async function deliverWebReply(params: { } const tableMode = params.tableMode ?? "code"; const chunkMode = params.chunkMode ?? "length"; - const convertedText = markdownToWhatsApp( - convertMarkdownTables(replyResult.text || "", tableMode), - ); + const normalizedReply = normalizeWhatsAppOutboundPayload(replyResult, { + normalizeText: normalizeWhatsAppPayloadTextPreservingIndentation, + }); + const convertedText = markdownToWhatsApp(convertMarkdownTables(normalizedReply.text, tableMode)); const textChunks = chunkMarkdownTextWithMode(convertedText, textLimit, chunkMode); - const mediaList = resolveOutboundMediaUrls(replyResult); + const mediaList = normalizedReply.mediaUrls ?? []; const getQuote = () => { if (!replyResult.replyToId) { @@ -64,26 +70,15 @@ export async function deliverWebReply(params: { }; const sendWithRetry = async (fn: () => Promise, label: string, maxAttempts = 3) => { - let lastErr: unknown; - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - try { - return await fn(); - } catch (err) { - lastErr = err; - const errText = formatError(err); - const isLast = attempt === maxAttempts; - const shouldRetry = /closed|reset|timed\s*out|disconnect/i.test(errText); - if (!shouldRetry || isLast) { - throw err; - } - const backoffMs = 500 * attempt; + return await sendWhatsAppOutboundWithRetry({ + send: fn, + maxAttempts, + onRetry: ({ attempt, maxAttempts: retryMaxAttempts, backoffMs, errorText }) => { logVerbose( - `Retrying ${label} to ${msg.from} after failure (${attempt}/${maxAttempts - 1}) in ${backoffMs}ms: ${errText}`, + `Retrying ${label} to ${msg.from} after failure (${attempt}/${retryMaxAttempts - 1}) in ${backoffMs}ms: ${errorText}`, ); - await sleep(backoffMs); - } - } - throw lastErr; + }, + }); }; // Text-only replies @@ -125,10 +120,13 @@ export async function deliverWebReply(params: { mediaUrls: mediaList, caption: leadingCaption, send: async ({ mediaUrl, caption }) => { - const media = await loadWebMedia(mediaUrl, { - maxBytes: maxMediaBytes, - localRoots: params.mediaLocalRoots, - }); + const media = normalizeWhatsAppLoadedMedia( + await loadWebMedia(mediaUrl, { + maxBytes: maxMediaBytes, + localRoots: params.mediaLocalRoots, + }), + mediaUrl, + ); if (shouldLogVerbose()) { logVerbose( `Web auto-reply media size: ${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB`, @@ -143,7 +141,7 @@ export async function deliverWebReply(params: { { image: media.buffer, caption, - mimetype: media.contentType, + mimetype: media.mimetype, }, quote, ), @@ -157,7 +155,7 @@ export async function deliverWebReply(params: { { audio: media.buffer, ptt: true, - mimetype: media.contentType, + mimetype: media.mimetype, caption, }, quote, @@ -172,24 +170,22 @@ export async function deliverWebReply(params: { { video: media.buffer, caption, - mimetype: media.contentType, + mimetype: media.mimetype, }, quote, ), "media:video", ); } else { - const fileName = media.fileName ?? mediaUrl.split("/").pop() ?? "file"; - const mimetype = media.contentType ?? "application/octet-stream"; const quote = getQuote(); await sendWithRetry( () => msg.sendMedia( { document: media.buffer, - fileName, + fileName: media.fileName, caption, - mimetype, + mimetype: media.mimetype, }, quote, ), @@ -220,8 +216,7 @@ export async function deliverWebReply(params: { if (!isFirst) { return; } - const warning = - error instanceof Error ? `⚠️ Media failed: ${error.message}` : "⚠️ Media failed."; + const warning = "⚠️ Media failed."; const fallbackTextParts = [remainingText.shift() ?? caption ?? "", warning].filter(Boolean); const fallbackText = fallbackTextParts.join("\n"); if (!fallbackText) { diff --git a/extensions/whatsapp/src/channel-outbound.test.ts b/extensions/whatsapp/src/channel-outbound.test.ts new file mode 100644 index 00000000000..a6392a01b41 --- /dev/null +++ b/extensions/whatsapp/src/channel-outbound.test.ts @@ -0,0 +1,74 @@ +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; + +const hoisted = vi.hoisted(() => ({ + sendMessageWhatsApp: vi.fn(async () => ({ messageId: "wa-1", toJid: "jid" })), + sendPollWhatsApp: vi.fn(async () => ({ messageId: "poll-1", toJid: "jid" })), +})); + +vi.mock("./send.js", () => ({ + sendMessageWhatsApp: hoisted.sendMessageWhatsApp, + sendPollWhatsApp: hoisted.sendPollWhatsApp, +})); + +vi.mock("./runtime.js", () => ({ + getWhatsAppRuntime: () => ({ + logging: { + shouldLogVerbose: () => false, + }, + }), +})); + +let whatsappChannelOutbound: typeof import("./channel-outbound.js").whatsappChannelOutbound; + +describe("whatsappChannelOutbound", () => { + beforeAll(async () => { + ({ whatsappChannelOutbound } = await import("./channel-outbound.js")); + }); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("drops leading blank lines but preserves intentional indentation", () => { + expect( + whatsappChannelOutbound.normalizePayload?.({ + payload: { text: "\n \n indented" }, + }), + ).toEqual({ + text: " indented", + }); + }); + + it("preserves indentation for live text sends", async () => { + await whatsappChannelOutbound.sendText!({ + cfg: {}, + to: "5511999999999@c.us", + text: "\n \n indented", + }); + + expect(hoisted.sendMessageWhatsApp).toHaveBeenCalledWith("5511999999999@c.us", " indented", { + verbose: false, + cfg: {}, + accountId: undefined, + gifPlayback: undefined, + preserveLeadingWhitespace: true, + }); + }); + + it("preserves indentation for payload delivery", async () => { + await whatsappChannelOutbound.sendPayload!({ + cfg: {}, + to: "5511999999999@c.us", + text: "", + payload: { text: "\n \n indented" }, + }); + + expect(hoisted.sendMessageWhatsApp).toHaveBeenCalledWith("5511999999999@c.us", " indented", { + verbose: false, + cfg: {}, + accountId: undefined, + gifPlayback: undefined, + preserveLeadingWhitespace: true, + }); + }); +}); diff --git a/extensions/whatsapp/src/channel-outbound.ts b/extensions/whatsapp/src/channel-outbound.ts index 0c7b88ffe55..ca80f2a29ac 100644 --- a/extensions/whatsapp/src/channel-outbound.ts +++ b/extensions/whatsapp/src/channel-outbound.ts @@ -4,21 +4,31 @@ import { resolveWhatsAppOutboundTarget } from "./resolve-outbound-target.js"; import { getWhatsAppRuntime } from "./runtime.js"; import { sendMessageWhatsApp, sendPollWhatsApp } from "./send.js"; -export function normalizeWhatsAppPayloadText(text: string | undefined): string { +export function normalizeWhatsAppChannelPayloadText(text: string | undefined): string { return (text ?? "").replace(/^(?:[ \t]*\r?\n)+/, ""); } +function normalizeWhatsAppChannelSendText(text: string | undefined): string { + const normalized = normalizeWhatsAppChannelPayloadText(text); + return normalized.trim() ? normalized : ""; +} + export const whatsappChannelOutbound = { ...createWhatsAppOutboundBase({ chunker: chunkText, - sendMessageWhatsApp, + sendMessageWhatsApp: async (to, text, options) => + await sendMessageWhatsApp(to, text, { + ...options, + preserveLeadingWhitespace: true, + }), sendPollWhatsApp, shouldLogVerbose: () => getWhatsAppRuntime().logging.shouldLogVerbose(), resolveTarget: ({ to, allowFrom, mode }) => resolveWhatsAppOutboundTarget({ to, allowFrom, mode }), + normalizeText: normalizeWhatsAppChannelSendText, }), normalizePayload: ({ payload }: { payload: { text?: string } }) => ({ ...payload, - text: normalizeWhatsAppPayloadText(payload.text), + text: normalizeWhatsAppChannelPayloadText(payload.text), }), }; diff --git a/extensions/whatsapp/src/outbound-adapter.sendpayload.test.ts b/extensions/whatsapp/src/outbound-adapter.sendpayload.test.ts index 96a23a19f7d..f2ebf3ae7c3 100644 --- a/extensions/whatsapp/src/outbound-adapter.sendpayload.test.ts +++ b/extensions/whatsapp/src/outbound-adapter.sendpayload.test.ts @@ -75,6 +75,31 @@ describe("whatsappOutbound sendPayload", () => { }); }); + it("drops blank mediaUrls before sending payload media", async () => { + const sendWhatsApp = vi.fn(async () => ({ messageId: "wa-1", toJid: "jid" })); + + await whatsappOutbound.sendPayload!({ + cfg: {}, + to: "5511999999999@c.us", + text: "", + payload: { + text: "\n\ncaption", + mediaUrls: [" ", " /tmp/voice.ogg "], + }, + deps: { sendWhatsApp }, + }); + + expect(sendWhatsApp).toHaveBeenCalledTimes(1); + expect(sendWhatsApp).toHaveBeenCalledWith("5511999999999@c.us", "caption", { + verbose: false, + cfg: {}, + mediaUrl: "/tmp/voice.ogg", + mediaLocalRoots: undefined, + accountId: undefined, + gifPlayback: undefined, + }); + }); + it("skips whitespace-only text payloads", async () => { const sendWhatsApp = vi.fn(); diff --git a/extensions/whatsapp/src/outbound-adapter.ts b/extensions/whatsapp/src/outbound-adapter.ts index 5045e715594..7d0993936e9 100644 --- a/extensions/whatsapp/src/outbound-adapter.ts +++ b/extensions/whatsapp/src/outbound-adapter.ts @@ -1,16 +1,7 @@ -import { - type ChannelOutboundAdapter, - createAttachedChannelResultAdapter, - createEmptyChannelResult, -} from "openclaw/plugin-sdk/channel-send-result"; -import { resolveOutboundSendDep, sanitizeForPlainText } from "openclaw/plugin-sdk/outbound-runtime"; -import { - resolveSendableOutboundReplyParts, - sendTextMediaPayload, -} from "openclaw/plugin-sdk/reply-payload"; +import { type ChannelOutboundAdapter } from "openclaw/plugin-sdk/channel-send-result"; import { chunkText } from "openclaw/plugin-sdk/reply-runtime"; import { shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env"; -import { WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS } from "./outbound-send-deps.js"; +import { createWhatsAppOutboundBase } from "./outbound-base.js"; import { resolveWhatsAppOutboundTarget } from "./resolve-outbound-target.js"; type WhatsAppSendModule = typeof import("./send.js"); @@ -26,84 +17,19 @@ function trimLeadingWhitespace(text: string | undefined): string { return text?.trimStart() ?? ""; } -export const whatsappOutbound: ChannelOutboundAdapter = { - deliveryMode: "gateway", +export const whatsappOutbound: ChannelOutboundAdapter = createWhatsAppOutboundBase({ chunker: chunkText, - chunkerMode: "text", - textChunkLimit: 4000, - sanitizeText: ({ text }) => sanitizeForPlainText(text), - pollMaxOptions: 12, + sendMessageWhatsApp: async (to, text, options) => + await ( + await loadWhatsAppSendModule() + ).sendMessageWhatsApp(to, trimLeadingWhitespace(text), { + ...options, + }), + sendPollWhatsApp: async (to, poll, options) => + await (await loadWhatsAppSendModule()).sendPollWhatsApp(to, poll, options), + shouldLogVerbose: () => shouldLogVerbose(), resolveTarget: ({ to, allowFrom, mode }) => resolveWhatsAppOutboundTarget({ to, allowFrom, mode }), - sendPayload: async (ctx) => { - const text = trimLeadingWhitespace(ctx.payload.text); - const hasMedia = resolveSendableOutboundReplyParts(ctx.payload).hasMedia; - if (!text && !hasMedia) { - return createEmptyChannelResult("whatsapp"); - } - return await sendTextMediaPayload({ - channel: "whatsapp", - ctx: { - ...ctx, - payload: { - ...ctx.payload, - text, - }, - }, - adapter: whatsappOutbound, - }); - }, - ...createAttachedChannelResultAdapter({ - channel: "whatsapp", - sendText: async ({ cfg, to, text, accountId, deps, gifPlayback }) => { - const normalizedText = trimLeadingWhitespace(text); - if (!normalizedText) { - return createEmptyChannelResult("whatsapp"); - } - const send = - resolveOutboundSendDep(deps, "whatsapp", { - legacyKeys: WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS, - }) ?? (await loadWhatsAppSendModule()).sendMessageWhatsApp; - return await send(to, normalizedText, { - verbose: false, - cfg, - accountId: accountId ?? undefined, - gifPlayback, - }); - }, - sendMedia: async ({ - cfg, - to, - text, - mediaUrl, - mediaLocalRoots, - mediaReadFile, - accountId, - deps, - gifPlayback, - }) => { - const normalizedText = trimLeadingWhitespace(text); - const send = - resolveOutboundSendDep(deps, "whatsapp", { - legacyKeys: WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS, - }) ?? (await loadWhatsAppSendModule()).sendMessageWhatsApp; - return await send(to, normalizedText, { - verbose: false, - cfg, - mediaUrl, - mediaLocalRoots, - mediaReadFile, - accountId: accountId ?? undefined, - gifPlayback, - }); - }, - sendPoll: async ({ cfg, to, poll, accountId }) => - await ( - await loadWhatsAppSendModule() - ).sendPollWhatsApp(to, poll, { - verbose: shouldLogVerbose(), - accountId: accountId ?? undefined, - cfg, - }), - }), -}; + normalizeText: trimLeadingWhitespace, + skipEmptyText: true, +}); diff --git a/extensions/whatsapp/src/outbound-base.test.ts b/extensions/whatsapp/src/outbound-base.test.ts index e58aa8bf38c..9e735ca394b 100644 --- a/extensions/whatsapp/src/outbound-base.test.ts +++ b/extensions/whatsapp/src/outbound-base.test.ts @@ -351,6 +351,146 @@ describe("createWhatsAppOutboundBase", () => { ); }); + it("normalizes mediaUrls before payload delivery", async () => { + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-1", + toJid: "15551234567@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await outbound.sendPayload!({ + cfg: {} as never, + to: "whatsapp:+15551234567", + text: "", + payload: { + text: "\n\ncaption", + mediaUrls: [" ", " /tmp/voice.ogg "], + }, + deps: { sendWhatsApp: sendMessageWhatsApp }, + }); + + expect(sendMessageWhatsApp).toHaveBeenCalledTimes(1); + expect(sendMessageWhatsApp).toHaveBeenCalledWith( + "whatsapp:+15551234567", + "caption", + expect.objectContaining({ + verbose: false, + mediaUrl: "/tmp/voice.ogg", + }), + ); + }); + + it("keeps explicit mediaUrl first when payload also includes mediaUrls", async () => { + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-1", + toJid: "15551234567@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await outbound.sendPayload!({ + cfg: {} as never, + to: "whatsapp:+15551234567", + text: "", + payload: { + text: "\n\ncaption", + mediaUrl: "/tmp/primary.ogg", + mediaUrls: [" /tmp/secondary.ogg "], + }, + deps: { sendWhatsApp: sendMessageWhatsApp }, + }); + + expect(sendMessageWhatsApp).toHaveBeenNthCalledWith( + 1, + "whatsapp:+15551234567", + "caption", + expect.objectContaining({ + mediaUrl: "/tmp/primary.ogg", + }), + ); + expect(sendMessageWhatsApp).toHaveBeenNthCalledWith( + 2, + "whatsapp:+15551234567", + "", + expect.objectContaining({ + mediaUrl: "/tmp/secondary.ogg", + }), + ); + }); + + it("uses the caller-provided text normalization for payload delivery", async () => { + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-1", + toJid: "15551234567@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + normalizeText: (text) => (text ?? "").replace(/^(?:[ \t]*\r?\n)+/, ""), + }); + + await outbound.sendPayload!({ + cfg: {} as never, + to: "whatsapp:+15551234567", + text: "", + payload: { + text: "\n \n indented", + }, + deps: { sendWhatsApp: sendMessageWhatsApp }, + }); + + expect(sendMessageWhatsApp).toHaveBeenCalledWith( + "whatsapp:+15551234567", + " indented", + expect.objectContaining({ + verbose: false, + }), + ); + }); + + it("rejects structured-only payloads instead of reporting an empty successful send", async () => { + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-1", + toJid: "15551234567@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await expect( + outbound.sendPayload!({ + cfg: {} as never, + to: "whatsapp:+15551234567", + text: "", + payload: { + channelData: { kind: "structured-only" }, + }, + deps: { sendWhatsApp: sendMessageWhatsApp }, + }), + ).rejects.toThrow( + "WhatsApp sendPayload does not support structured-only payloads without text or media.", + ); + expect(sendMessageWhatsApp).not.toHaveBeenCalled(); + }); + it("threads cfg into sendPollWhatsApp call", async () => { const sendPollWhatsApp = vi.fn(async () => ({ messageId: "wa-poll-1", diff --git a/extensions/whatsapp/src/outbound-base.ts b/extensions/whatsapp/src/outbound-base.ts index 72ef2a64caa..74742dfce92 100644 --- a/extensions/whatsapp/src/outbound-base.ts +++ b/extensions/whatsapp/src/outbound-base.ts @@ -10,6 +10,11 @@ import { } from "openclaw/plugin-sdk/channel-send-result"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { resolveOutboundSendDep, sanitizeForPlainText } from "openclaw/plugin-sdk/infra-runtime"; +import { sendTextMediaPayload } from "openclaw/plugin-sdk/reply-payload"; +import { + normalizeWhatsAppOutboundPayload, + normalizeWhatsAppPayloadText, +} from "./outbound-media-contract.js"; import { WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS } from "./outbound-send-deps.js"; import { lookupInboundMessageMetaForTarget } from "./quoted-message.js"; import { toWhatsappJid } from "./text-runtime.js"; @@ -34,6 +39,7 @@ type WhatsAppSendTextOptions = { participant?: string; messageText?: string; }; + preserveLeadingWhitespace?: boolean; }; type WhatsAppSendMessage = ( to: string, @@ -75,14 +81,28 @@ function resolveQuoteLookupAccountId(cfg?: OpenClawConfig, accountId?: string | }); } +type WhatsAppOutboundBaseCore = Pick< + ChannelOutboundAdapter, + | "deliveryMode" + | "chunker" + | "chunkerMode" + | "textChunkLimit" + | "sanitizeText" + | "pollMaxOptions" + | "resolveTarget" + | "sendText" + | "sendMedia" + | "sendPoll" +>; + export function createWhatsAppOutboundBase({ chunker, sendMessageWhatsApp, sendPollWhatsApp, shouldLogVerbose, resolveTarget, - normalizeText = (text) => text ?? "", - skipEmptyText = false, + normalizeText = normalizeWhatsAppPayloadText, + skipEmptyText = true, }: CreateWhatsAppOutboundBaseParams): Pick< ChannelOutboundAdapter, | "deliveryMode" @@ -92,6 +112,7 @@ export function createWhatsAppOutboundBase({ | "sanitizeText" | "pollMaxOptions" | "resolveTarget" + | "sendPayload" | "sendText" | "sendMedia" | "sendPoll" @@ -116,7 +137,7 @@ export function createWhatsAppOutboundBase({ }; }; - return { + const outbound: WhatsAppOutboundBaseCore = { deliveryMode: "gateway", chunker, chunkerMode: "text", @@ -192,4 +213,26 @@ export function createWhatsAppOutboundBase({ }), }), }; + return { + ...outbound, + sendPayload: async (ctx) => { + const payload = normalizeWhatsAppOutboundPayload(ctx.payload, { normalizeText }); + if (!payload.text && !(payload.mediaUrl || payload.mediaUrls?.length)) { + if (ctx.payload.interactive || ctx.payload.presentation || ctx.payload.channelData) { + throw new Error( + "WhatsApp sendPayload does not support structured-only payloads without text or media.", + ); + } + return { channel: "whatsapp", messageId: "" }; + } + return await sendTextMediaPayload({ + channel: "whatsapp", + ctx: { + ...ctx, + payload, + }, + adapter: outbound, + }); + }, + }; } diff --git a/extensions/whatsapp/src/outbound-media-contract.ts b/extensions/whatsapp/src/outbound-media-contract.ts new file mode 100644 index 00000000000..8ec45c7ec4b --- /dev/null +++ b/extensions/whatsapp/src/outbound-media-contract.ts @@ -0,0 +1,147 @@ +import path from "node:path"; +import { formatError } from "./session-errors.js"; +import { sleep } from "./text-runtime.js"; + +type WhatsAppOutboundPayloadLike = { + text?: string; + mediaUrl?: string; + mediaUrls?: readonly string[]; +}; + +type WhatsAppLoadedMediaLike = { + buffer: Buffer; + contentType?: string; + kind?: string; + fileName?: string; +}; + +export type CanonicalWhatsAppLoadedMedia = { + buffer: Buffer; + kind: "image" | "audio" | "video" | "document"; + mimetype: string; + fileName?: string; +}; + +export function normalizeWhatsAppPayloadText(text: string | undefined): string { + return text?.trimStart() ?? ""; +} + +export function normalizeWhatsAppPayloadTextPreservingIndentation( + text: string | undefined, +): string { + return (text ?? "").replace(/^(?:[ \t]*\r?\n)+/, ""); +} + +export function resolveWhatsAppOutboundMediaUrls( + payload: Pick, +): string[] { + const primaryMediaUrl = payload.mediaUrl?.trim(); + const mediaUrls = (payload.mediaUrls ? [...payload.mediaUrls] : []) + .map((entry) => entry.trim()) + .filter((entry): entry is string => Boolean(entry)); + const orderedMediaUrls = [primaryMediaUrl, ...mediaUrls].filter((entry): entry is string => + Boolean(entry), + ); + return Array.from(new Set(orderedMediaUrls)); +} + +// Keep new WhatsApp outbound-media behavior in this helper so payload, gateway, and auto-reply paths stay aligned. +export function normalizeWhatsAppOutboundPayload( + payload: T, + options?: { + normalizeText?: (text: string | undefined) => string; + }, +): Omit & { + text: string; + mediaUrl?: string; + mediaUrls?: string[]; +} { + const mediaUrls = resolveWhatsAppOutboundMediaUrls(payload); + const normalizeText = options?.normalizeText ?? normalizeWhatsAppPayloadText; + return { + ...payload, + text: normalizeText(payload.text), + mediaUrl: mediaUrls[0], + mediaUrls: mediaUrls.length > 0 ? mediaUrls : undefined, + }; +} + +export function normalizeWhatsAppLoadedMedia( + media: WhatsAppLoadedMediaLike, + mediaUrl?: string, +): CanonicalWhatsAppLoadedMedia { + const kind = + media.kind === "image" || media.kind === "audio" || media.kind === "video" + ? media.kind + : "document"; + const mimetype = + kind === "audio" && media.contentType === "audio/ogg" + ? "audio/ogg; codecs=opus" + : (media.contentType ?? "application/octet-stream"); + const fileName = + kind === "document" + ? (media.fileName ?? deriveWhatsAppDocumentFileName(mediaUrl) ?? "file") + : undefined; + return { + buffer: media.buffer, + kind, + mimetype, + ...(fileName ? { fileName } : {}), + }; +} + +function deriveWhatsAppDocumentFileName(mediaUrl: string | undefined): string | undefined { + if (!mediaUrl) { + return undefined; + } + try { + const parsed = new URL(mediaUrl); + const fileName = path.posix.basename(parsed.pathname); + return fileName ? decodeURIComponent(fileName) : undefined; + } catch { + const withoutQueryOrFragment = mediaUrl.split(/[?#]/, 1)[0] ?? ""; + const fileName = withoutQueryOrFragment.split(/[\\/]/).pop(); + return fileName || undefined; + } +} + +export function isRetryableWhatsAppOutboundError(error: unknown): boolean { + return /closed|reset|timed\s*out|disconnect/i.test(formatError(error)); +} + +export async function sendWhatsAppOutboundWithRetry(params: { + send: () => Promise; + onRetry?: (params: { + attempt: number; + maxAttempts: number; + backoffMs: number; + error: unknown; + errorText: string; + }) => Promise | void; + maxAttempts?: number; +}): Promise { + const maxAttempts = params.maxAttempts ?? 3; + let lastError: unknown; + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { + try { + return await params.send(); + } catch (error) { + lastError = error; + const errorText = formatError(error); + const isLastAttempt = attempt === maxAttempts; + if (!isRetryableWhatsAppOutboundError(error) || isLastAttempt) { + throw error; + } + const backoffMs = 500 * attempt; + await params.onRetry?.({ + attempt, + maxAttempts, + backoffMs, + error, + errorText, + }); + await sleep(backoffMs); + } + } + throw lastError; +} diff --git a/extensions/whatsapp/src/outbound-payload.contract.test.ts b/extensions/whatsapp/src/outbound-payload.contract.test.ts index 7f961066415..559a1fd14e1 100644 --- a/extensions/whatsapp/src/outbound-payload.contract.test.ts +++ b/extensions/whatsapp/src/outbound-payload.contract.test.ts @@ -3,7 +3,7 @@ import { primeChannelOutboundSendMock, type OutboundPayloadHarnessParams, } from "openclaw/plugin-sdk/testing"; -import { describe, vi } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { whatsappOutbound } from "./outbound-adapter.js"; function createWhatsAppHarness(params: OutboundPayloadHarnessParams) { @@ -31,4 +31,31 @@ describe("WhatsApp outbound payload contract", () => { chunking: { mode: "split", longTextLength: 5000, maxChunkLength: 4000 }, createHarness: createWhatsAppHarness, }); + + it("normalizes blank mediaUrls before contract delivery", async () => { + const sendWhatsApp = vi.fn(); + primeChannelOutboundSendMock(sendWhatsApp, { messageId: "wa-1" }); + + await whatsappOutbound.sendPayload!({ + cfg: {}, + to: "5511999999999@c.us", + text: "", + payload: { + text: "\n\ncaption", + mediaUrls: [" ", " /tmp/voice.ogg "], + }, + deps: { + whatsapp: sendWhatsApp, + }, + }); + + expect(sendWhatsApp).toHaveBeenCalledTimes(1); + expect(sendWhatsApp).toHaveBeenCalledWith( + "5511999999999@c.us", + "caption", + expect.objectContaining({ + mediaUrl: "/tmp/voice.ogg", + }), + ); + }); }); diff --git a/extensions/whatsapp/src/send.test.ts b/extensions/whatsapp/src/send.test.ts index ff8f086a2a2..fcccf8ce55a 100644 --- a/extensions/whatsapp/src/send.test.ts +++ b/extensions/whatsapp/src/send.test.ts @@ -49,6 +49,14 @@ vi.mock("./outbound-media.runtime.js", async () => { }; }); +vi.mock("./text-runtime.js", async () => { + const actual = await vi.importActual("./text-runtime.js"); + return { + ...actual, + sleep: vi.fn(async () => {}), + }; +}); + describe("web outbound", () => { const sendComposingTo = vi.fn(async () => {}); const sendMessage = vi.fn(async () => ({ messageId: "msg123" })); @@ -161,6 +169,16 @@ describe("web outbound", () => { expect(sendMessage).toHaveBeenLastCalledWith("+1555", "caption", buf, "image/jpeg"); }); + it("preserves intentional indentation when the caller opts out of transport trimming", async () => { + await sendMessageWhatsApp("+1555", " indented", { + verbose: false, + cfg: WHATSAPP_TEST_CFG, + preserveLeadingWhitespace: true, + }); + + expect(sendMessage).toHaveBeenLastCalledWith("+1555", " indented", undefined, undefined); + }); + it("skips whitespace-only text sends without media", async () => { const result = await sendMessageWhatsApp("+1555", "\n \t", { verbose: false, @@ -268,6 +286,39 @@ describe("web outbound", () => { expect(sendMessage).toHaveBeenLastCalledWith("+1555", "pic", buf, "image/jpeg"); }); + it("does not retry transient outbound send failures to avoid duplicate sends", async () => { + sendMessage.mockRejectedValueOnce({ error: { message: "connection closed" } }); + + await expect( + sendMessageWhatsApp("+1555", "hi", { verbose: false, cfg: WHATSAPP_TEST_CFG }), + ).rejects.toEqual({ error: { message: "connection closed" } }); + expect(sendMessage).toHaveBeenCalledTimes(1); + }); + + it("prefers explicit mediaUrl over mediaUrls when both are present", async () => { + const buf = Buffer.from("img"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "image/jpeg", + kind: "image", + }); + + await sendMessageWhatsApp("+1555", "pic", { + verbose: false, + cfg: WHATSAPP_TEST_CFG, + mediaUrl: "/tmp/primary.jpg", + mediaUrls: [" /tmp/secondary.jpg "], + }); + + expect(loadWebMediaMock).toHaveBeenCalledWith( + "/tmp/primary.jpg", + expect.objectContaining({ + hostReadCapability: false, + }), + ); + expect(sendMessage).toHaveBeenLastCalledWith("+1555", "pic", buf, "image/jpeg"); + }); + it("falls back to the first mediaUrls entry when mediaUrl is omitted", async () => { const buf = Buffer.from("img"); loadWebMediaMock.mockResolvedValueOnce({ diff --git a/extensions/whatsapp/src/send.ts b/extensions/whatsapp/src/send.ts index 3363fd8e08a..12da0318adc 100644 --- a/extensions/whatsapp/src/send.ts +++ b/extensions/whatsapp/src/send.ts @@ -14,6 +14,11 @@ import { } from "./accounts.js"; import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js"; import type { ActiveWebListener, ActiveWebSendOptions } from "./inbound/types.js"; +import { + normalizeWhatsAppLoadedMedia, + normalizeWhatsAppPayloadText, + resolveWhatsAppOutboundMediaUrls, +} from "./outbound-media-contract.js"; import { loadOutboundMediaFromUrl } from "./outbound-media.runtime.js"; import { markdownToWhatsApp, toWhatsappJid } from "./text-runtime.js"; @@ -69,16 +74,13 @@ export async function sendMessageWhatsApp( participant?: string; messageText?: string; }; + preserveLeadingWhitespace?: boolean; }, ): Promise<{ messageId: string; toJid: string }> { - let text = body.trimStart(); + let text = options.preserveLeadingWhitespace ? body : normalizeWhatsAppPayloadText(body); const jid = toWhatsappJid(to); - const mediaUrls = Array.isArray(options.mediaUrls) - ? options.mediaUrls - .map((entry) => (typeof entry === "string" ? entry.trim() : "")) - .filter(Boolean) - : []; - const primaryMediaUrl = options.mediaUrl?.trim() || mediaUrls[0]; + const mediaUrls = resolveWhatsAppOutboundMediaUrls(options); + const primaryMediaUrl = mediaUrls[0]; if (!text && !primaryMediaUrl) { return { messageId: "", toJid: jid }; } @@ -112,28 +114,23 @@ export async function sendMessageWhatsApp( let mediaType: string | undefined; let documentFileName: string | undefined; if (primaryMediaUrl) { - const media = await loadOutboundMediaFromUrl(primaryMediaUrl, { - maxBytes: resolveWhatsAppMediaMaxBytes(account), - mediaAccess: options.mediaAccess, - mediaLocalRoots: options.mediaLocalRoots, - mediaReadFile: options.mediaReadFile, - }); + const media = normalizeWhatsAppLoadedMedia( + await loadOutboundMediaFromUrl(primaryMediaUrl, { + maxBytes: resolveWhatsAppMediaMaxBytes(account), + mediaAccess: options.mediaAccess, + mediaLocalRoots: options.mediaLocalRoots, + mediaReadFile: options.mediaReadFile, + }), + primaryMediaUrl, + ); const caption = text || undefined; mediaBuffer = media.buffer; - mediaType = media.contentType ?? "application/octet-stream"; - if (media.kind === "audio") { - // WhatsApp expects explicit opus codec for PTT voice notes. - mediaType = - media.contentType === "audio/ogg" - ? "audio/ogg; codecs=opus" - : (media.contentType ?? "application/octet-stream"); - } else if (media.kind === "video") { - text = caption ?? ""; - } else if (media.kind === "image") { - text = caption ?? ""; - } else { + mediaType = media.mimetype; + if (media.kind === "document") { text = caption ?? ""; documentFileName = media.fileName; + } else { + text = caption ?? ""; } } outboundLog.info(`Sending message -> ${redactedJid}${primaryMediaUrl ? " (media)" : ""}`); diff --git a/src/agents/pi-embedded-runner/run/payloads.test.ts b/src/agents/pi-embedded-runner/run/payloads.test.ts index c2db4ac95be..8faf106189d 100644 --- a/src/agents/pi-embedded-runner/run/payloads.test.ts +++ b/src/agents/pi-embedded-runner/run/payloads.test.ts @@ -199,4 +199,59 @@ describe("buildEmbeddedRunPayloads tool-error warnings", () => { assistantTexts: ['{"action":"NO_REPLY"}'], }); }); + + it("preserves media directives when stored assistant text was reduced to visible text only", () => { + const payloads = buildPayloads({ + assistantTexts: ["Attached image"], + lastAssistant: { + role: "assistant", + stopReason: "stop", + content: [ + { + type: "text", + text: "MEDIA:/tmp/reply-image.png\nAttached image", + textSignature: JSON.stringify({ + v: 1, + id: "item_final", + phase: "final_answer", + }), + }, + ], + } as AssistantMessage, + }); + + expect(payloads).toHaveLength(1); + expect(payloads[0]).toMatchObject({ + text: "Attached image", + mediaUrl: "/tmp/reply-image.png", + mediaUrls: ["/tmp/reply-image.png"], + }); + }); + + it("uses raw final assistant text when visible-text extraction removed a media-only directive line", () => { + const payloads = buildPayloads({ + lastAssistant: { + role: "assistant", + stopReason: "stop", + content: [ + { + type: "text", + text: "MEDIA:/tmp/reply-image.png\nAttached image", + textSignature: JSON.stringify({ + v: 1, + id: "item_final", + phase: "final_answer", + }), + }, + ], + } as AssistantMessage, + }); + + expect(payloads).toHaveLength(1); + expect(payloads[0]).toMatchObject({ + text: "Attached image", + mediaUrl: "/tmp/reply-image.png", + mediaUrls: ["/tmp/reply-image.png"], + }); + }); }); diff --git a/src/agents/pi-embedded-runner/run/payloads.ts b/src/agents/pi-embedded-runner/run/payloads.ts index 6a32bd03129..0a94c58cf43 100644 --- a/src/agents/pi-embedded-runner/run/payloads.ts +++ b/src/agents/pi-embedded-runner/run/payloads.ts @@ -6,6 +6,7 @@ import { isSilentReplyPayloadText, SILENT_REPLY_TOKEN } from "../../../auto-repl import { formatToolAggregate } from "../../../auto-reply/tool-meta.js"; import type { OpenClawConfig } from "../../../config/types.openclaw.js"; import { isCronSessionKey } from "../../../routing/session-key.js"; +import { extractAssistantTextForPhase } from "../../../shared/chat-message-content.js"; import { normalizeOptionalLowercaseString, normalizeOptionalString, @@ -52,6 +53,18 @@ function isVerboseToolDetailEnabled(level?: VerboseLevel): boolean { return level === "on" || level === "full"; } +function resolveRawAssistantAnswerText(lastAssistant: AssistantMessage | undefined): string { + if (!lastAssistant) { + return ""; + } + return ( + normalizeOptionalString( + extractAssistantTextForPhase(lastAssistant, { phase: "final_answer" }) ?? + extractAssistantTextForPhase(lastAssistant), + ) ?? "" + ); +} + function shouldIncludeToolErrorDetails(params: { lastToolError: ToolErrorSummary; isCronTrigger?: boolean; @@ -220,6 +233,7 @@ export function buildEmbeddedRunPayloads(params: { const fallbackAnswerText = params.lastAssistant ? extractAssistantVisibleText(params.lastAssistant) : ""; + const fallbackRawAnswerText = resolveRawAssistantAnswerText(params.lastAssistant); const shouldSuppressRawErrorText = (text: string) => { if (!lastAssistantErrored) { return false; @@ -270,13 +284,32 @@ export function buildEmbeddedRunPayloads(params: { } return isRawApiErrorPayload(trimmed); }; + const rawAnswerDirectiveState = fallbackRawAnswerText + ? parseReplyDirectives(fallbackRawAnswerText) + : null; + const rawAnswerHasMedia = + (rawAnswerDirectiveState?.mediaUrls?.length ?? 0) > 0 || rawAnswerDirectiveState?.audioAsVoice; + const assistantTextsHaveMedia = params.assistantTexts.some((text) => { + const parsed = parseReplyDirectives(text); + return (parsed.mediaUrls?.length ?? 0) > 0 || parsed.audioAsVoice; + }); + const normalizedAssistantTexts = normalizeTextForComparison(params.assistantTexts.join("\n\n")); + const normalizedRawAnswerText = normalizeTextForComparison(rawAnswerDirectiveState?.text ?? ""); + const shouldPreferRawAnswerText = + rawAnswerHasMedia && + (!params.assistantTexts.length || + (!assistantTextsHaveMedia && + normalizedAssistantTexts.length > 0 && + normalizedAssistantTexts === normalizedRawAnswerText)); const answerTexts = suppressAssistantArtifacts ? [] - : (params.assistantTexts.length - ? params.assistantTexts - : fallbackAnswerText - ? [fallbackAnswerText] - : [] + : (shouldPreferRawAnswerText && fallbackRawAnswerText + ? [fallbackRawAnswerText] + : params.assistantTexts.length + ? params.assistantTexts + : fallbackAnswerText + ? [fallbackAnswerText] + : [] ).filter((text) => !shouldSuppressRawErrorText(text)); let hasUserFacingAssistantReply = false; diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts index cd783b149b9..79cd98d6bea 100644 --- a/src/auto-reply/reply/agent-runner-payloads.test.ts +++ b/src/auto-reply/reply/agent-runner-payloads.test.ts @@ -103,7 +103,7 @@ describe("buildReplyPayloads media filter integration", () => { }); }); - it("applies media filter after text filter", async () => { + it("drops duplicate caption text after matching media is stripped", async () => { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!", mediaUrl: "file:///tmp/photo.jpg" }], @@ -111,10 +111,24 @@ describe("buildReplyPayloads media filter integration", () => { messagingToolSentMediaUrls: ["file:///tmp/photo.jpg"], }); - // Text filter removes the payload entirely (text matched), so nothing remains. expect(replyPayloads).toHaveLength(0); }); + it("keeps captioned media when only the caption matches a messaging tool send", async () => { + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "hello world!", mediaUrl: "file:///tmp/photo.jpg" }], + messagingToolSentTexts: ["hello world!"], + messagingToolSentMediaUrls: ["file:///tmp/other.jpg"], + }); + + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]).toMatchObject({ + text: "hello world!", + mediaUrl: "file:///tmp/photo.jpg", + }); + }); + it("does not dedupe text for cross-target messaging sends", async () => { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, @@ -246,6 +260,20 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads).toHaveLength(0); }); + it("suppresses warning text when silent media payloads fail normalization", async () => { + const normalizeMediaPaths = async () => { + throw new Error("file not found"); + }; + + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "NO_REPLY\nMEDIA: ./missing.png" }], + normalizeMediaPaths, + }); + + expect(replyPayloads).toHaveLength(0); + }); + it("extracts markdown image replies into final payload media urls", async () => { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, @@ -310,6 +338,25 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads).toHaveLength(0); }); + it("deduplicates final payloads against directly sent block keys when streaming is enabled without a pipeline", async () => { + const { createBlockReplyContentKey } = await import("./block-reply-pipeline.js"); + const directlySentBlockKeys = new Set(); + directlySentBlockKeys.add( + createBlockReplyContentKey({ text: "response", replyToId: "post-1" }), + ); + + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + blockStreamingEnabled: true, + blockReplyPipeline: null, + directlySentBlockKeys, + replyToMode: "off", + payloads: [{ text: "response" }], + }); + + expect(replyPayloads).toHaveLength(0); + }); + it("does not suppress same-target replies when accountId differs", async () => { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index 444d58f0d69..e1777261af3 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -148,11 +148,18 @@ export async function buildReplyPayloads(params: { currentMessageId: params.currentMessageId, silentToken: SILENT_REPLY_TOKEN, parseMode: "always", - }).payload; - return await normalizeReplyPayloadMedia({ - payload: parsed, + }); + const mediaNormalizedPayload = await normalizeReplyPayloadMedia({ + payload: parsed.payload, normalizeMediaPaths: params.normalizeMediaPaths, }); + if ( + parsed.isSilent && + !resolveSendableOutboundReplyParts(mediaNormalizedPayload).hasMedia + ) { + mediaNormalizedPayload.text = undefined; + } + return mediaNormalizedPayload; }), ) ).filter(isRenderablePayload); @@ -199,32 +206,36 @@ export async function buildReplyPayloads(params: { normalizeMediaPaths: params.normalizeMediaPaths, }) : (params.messagingToolSentMediaUrls ?? []); - const dedupedPayloads = dedupeMessagingToolPayloads - ? (dedupeRuntime ?? (await loadReplyPayloadsDedupeRuntime())).filterMessagingToolDuplicates({ - payloads: silentFilteredPayloads, - sentTexts: messagingToolSentTexts, - }) - : silentFilteredPayloads; const mediaFilteredPayloads = dedupeMessagingToolPayloads ? ( dedupeRuntime ?? (await loadReplyPayloadsDedupeRuntime()) ).filterMessagingToolMediaDuplicates({ - payloads: dedupedPayloads, + payloads: silentFilteredPayloads, sentMediaUrls: messagingToolSentMediaUrls, }) - : dedupedPayloads; + : silentFilteredPayloads; + const dedupedPayloads = dedupeMessagingToolPayloads + ? (dedupeRuntime ?? (await loadReplyPayloadsDedupeRuntime())).filterMessagingToolDuplicates({ + payloads: mediaFilteredPayloads, + sentTexts: messagingToolSentTexts, + }) + : mediaFilteredPayloads; + const isDirectlySentBlockPayload = (payload: ReplyPayload) => + Boolean(params.directlySentBlockKeys?.has(createBlockReplyContentKey(payload))); // Filter out payloads already sent via pipeline or directly during tool flush. const filteredPayloads = shouldDropFinalPayloads - ? mediaFilteredPayloads.filter((payload) => payload.isError) + ? dedupedPayloads.filter((payload) => payload.isError) : params.blockStreamingEnabled - ? mediaFilteredPayloads.filter( - (payload) => !params.blockReplyPipeline?.hasSentPayload(payload), + ? dedupedPayloads.filter( + (payload) => + !params.blockReplyPipeline?.hasSentPayload(payload) && + !isDirectlySentBlockPayload(payload), ) : params.directlySentBlockKeys?.size - ? mediaFilteredPayloads.filter( + ? dedupedPayloads.filter( (payload) => !params.directlySentBlockKeys!.has(createBlockReplyContentKey(payload)), ) - : mediaFilteredPayloads; + : dedupedPayloads; const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads; return { diff --git a/src/auto-reply/reply/agent-runner.media-paths.test.ts b/src/auto-reply/reply/agent-runner.media-paths.test.ts index bc3d9848949..557bd993cb7 100644 --- a/src/auto-reply/reply/agent-runner.media-paths.test.ts +++ b/src/auto-reply/reply/agent-runner.media-paths.test.ts @@ -196,7 +196,7 @@ describe("runReplyAgent media path normalization", () => { ); }); - it("shares one media cache between direct block media and final payload filtering", async () => { + it("shares one media cache between block accumulation and final payload delivery", async () => { let stagedIndex = 0; resolveOutboundAttachmentFromUrlMock.mockImplementation(async (mediaUrl: string) => { stagedIndex += 1; @@ -233,18 +233,16 @@ describe("runReplyAgent media path normalization", () => { }), ); - expect(result).toBeUndefined(); - expect(resolveOutboundAttachmentFromUrlMock).toHaveBeenCalledTimes(1); - expect(onBlockReply).toHaveBeenCalledTimes(1); - expect(onBlockReply).toHaveBeenCalledWith({ - text: undefined, + expect(result).toMatchObject({ + text: "here is the chart", mediaUrl: "/tmp/outbound-media/1-chart.png", mediaUrls: ["/tmp/outbound-media/1-chart.png"], - replyToCurrent: undefined, replyToId: "msg-1", replyToTag: false, audioAsVoice: false, }); + expect(resolveOutboundAttachmentFromUrlMock).toHaveBeenCalledTimes(1); + expect(onBlockReply).not.toHaveBeenCalled(); }); it("does not create a second media context inside runAgentTurnWithFallback when onBlockReply is provided", async () => { diff --git a/src/auto-reply/reply/reply-delivery.test.ts b/src/auto-reply/reply/reply-delivery.test.ts index 6118f689e34..f9811a15855 100644 --- a/src/auto-reply/reply/reply-delivery.test.ts +++ b/src/auto-reply/reply/reply-delivery.test.ts @@ -13,7 +13,7 @@ type BlockReplyPipelineLike = NonNullable< >; describe("createBlockReplyDeliveryHandler", () => { - it("sends media-bearing block replies even when block streaming is disabled", async () => { + it("keeps captioned media-bearing block replies buffered when block streaming is disabled", async () => { const onBlockReply = vi.fn(async () => {}); const normalizeStreamingText = vi.fn((payload: { text?: string }) => ({ text: payload.text, @@ -40,25 +40,49 @@ describe("createBlockReplyDeliveryHandler", () => { replyToCurrent: true, }); + expect(onBlockReply).not.toHaveBeenCalled(); + expect(directlySentBlockKeys).toEqual(new Set()); + expect(typingSignals.signalTextDelta).toHaveBeenCalledWith("here's the vibe"); + }); + + it("sends media-only block replies when block streaming is disabled", async () => { + const onBlockReply = vi.fn(async () => {}); + const directlySentBlockKeys = new Set(); + + const handler = createBlockReplyDeliveryHandler({ + onBlockReply, + normalizeStreamingText: (payload) => ({ text: payload.text, skip: false }), + applyReplyToMode: (payload) => payload, + typingSignals: { + signalTextDelta: vi.fn(async () => {}), + } as unknown as TypingSignaler, + blockStreamingEnabled: false, + blockReplyPipeline: null, + directlySentBlockKeys, + }); + + await handler({ + mediaUrls: ["/tmp/generated.png"], + replyToCurrent: true, + }); + expect(onBlockReply).toHaveBeenCalledWith({ - text: undefined, mediaUrl: "/tmp/generated.png", mediaUrls: ["/tmp/generated.png"], replyToCurrent: true, replyToId: undefined, replyToTag: undefined, audioAsVoice: false, + text: undefined, }); expect(directlySentBlockKeys).toEqual( new Set([ createBlockReplyContentKey({ - text: "here's the vibe", mediaUrls: ["/tmp/generated.png"], replyToCurrent: true, }), ]), ); - expect(typingSignals.signalTextDelta).toHaveBeenCalledWith("here's the vibe"); }); it("keeps text-only block replies buffered when block streaming is disabled", async () => { diff --git a/src/auto-reply/reply/reply-delivery.ts b/src/auto-reply/reply/reply-delivery.ts index 690512cff6e..c20c6ec873d 100644 --- a/src/auto-reply/reply/reply-delivery.ts +++ b/src/auto-reply/reply/reply-delivery.ts @@ -159,15 +159,14 @@ export function createBlockReplyDeliveryHandler(params: { trackingPayload: blockPayload, payload: blockPayload, }); - } else if (blockHasMedia) { - // When block streaming is disabled, text-only block replies are accumulated into the - // final response. Media cannot be reconstructed later, so send it immediately and let - // the assistant's final text arrive through the normal final-reply path. + } else if (blockHasMedia && !blockPayload.text) { + // Media-only block replies (for example orphaned tool attachments) are not reconstructible + // from the assistant's final text, so they still need a direct fallback when streaming is off. await sendDirectBlockReply({ onBlockReply: params.onBlockReply, directlySentBlockKeys: params.directlySentBlockKeys, trackingPayload: blockPayload, - payload: { ...blockPayload, text: undefined }, + payload: blockPayload, }); } // When streaming is disabled entirely, text-only blocks are accumulated in final text. diff --git a/src/auto-reply/reply/reply-media-paths.test.ts b/src/auto-reply/reply/reply-media-paths.test.ts index d7aca2428fa..b6724e641d9 100644 --- a/src/auto-reply/reply/reply-media-paths.test.ts +++ b/src/auto-reply/reply/reply-media-paths.test.ts @@ -120,6 +120,7 @@ describe("createReplyMediaPathNormalizer", () => { 5 * 1024 * 1024, expect.any(Object), ); + expect(result.text).toBe("⚠️ Media failed."); }); it("drops host file URLs when no sandbox mapping applies", async () => { @@ -319,6 +320,45 @@ describe("createReplyMediaPathNormalizer", () => { }); }); + it("keeps reply text and appends a warning when all reply media is dropped", async () => { + resolveOutboundAttachmentFromUrl.mockRejectedValueOnce(new Error("file not found")); + const normalize = createReplyMediaPathNormalizer({ + cfg: {}, + sessionKey: "session-key", + workspaceDir: "/tmp/agent-workspace", + }); + + const result = await normalize({ + text: "WA_MEDIA_DM_07", + mediaUrls: ["./out/missing.png"], + }); + + expect(result).toMatchObject({ + text: "WA_MEDIA_DM_07\n⚠️ Media failed.", + mediaUrl: undefined, + mediaUrls: undefined, + }); + }); + + it("returns a warning-only text reply when media-only output is dropped upstream", async () => { + resolveOutboundAttachmentFromUrl.mockRejectedValueOnce(new Error("file not found")); + const normalize = createReplyMediaPathNormalizer({ + cfg: {}, + sessionKey: "session-key", + workspaceDir: "/tmp/agent-workspace", + }); + + const result = await normalize({ + mediaUrls: ["./out/missing.png"], + }); + + expect(result).toMatchObject({ + text: "⚠️ Media failed.", + mediaUrl: undefined, + mediaUrls: undefined, + }); + }); + it("threads requester context into shared outbound media access", async () => { const normalize = createReplyMediaPathNormalizer({ cfg: {}, diff --git a/src/auto-reply/reply/reply-media-paths.ts b/src/auto-reply/reply/reply-media-paths.ts index f0a9ede1dc5..1574c257ee1 100644 --- a/src/auto-reply/reply/reply-media-paths.ts +++ b/src/auto-reply/reply/reply-media-paths.ts @@ -60,6 +60,10 @@ function resolveReplyMediaMaxBytes(params: { : MEDIA_MAX_BYTES; } +function formatBlockedReplyMediaWarning(): string { + return "⚠️ Media failed."; +} + export function createReplyMediaPathNormalizer(params: { cfg: OpenClawConfig; sessionKey?: string; @@ -206,11 +210,13 @@ export function createReplyMediaPathNormalizer(params: { const normalizedMedia: string[] = []; const seen = new Set(); + let firstMediaDropError: unknown; for (const media of mediaList) { let normalized: string; try { normalized = await normalizeMediaSource(media); } catch (err) { + firstMediaDropError ??= err; logVerbose(`dropping blocked reply media ${media}: ${String(err)}`); continue; } @@ -222,8 +228,10 @@ export function createReplyMediaPathNormalizer(params: { } if (normalizedMedia.length === 0) { + const warning = firstMediaDropError ? formatBlockedReplyMediaWarning() : undefined; return { ...payload, + text: warning ? (payload.text ? `${payload.text}\n${warning}` : warning) : payload.text, mediaUrl: undefined, mediaUrls: undefined, }; diff --git a/src/auto-reply/reply/reply-payloads-dedupe.ts b/src/auto-reply/reply/reply-payloads-dedupe.ts index 7ad80888bc4..69fbfe7d3c2 100644 --- a/src/auto-reply/reply/reply-payloads-dedupe.ts +++ b/src/auto-reply/reply/reply-payloads-dedupe.ts @@ -18,7 +18,12 @@ export function filterMessagingToolDuplicates(params: { if (sentTexts.length === 0) { return payloads; } - return payloads.filter((payload) => !isMessagingToolDuplicate(payload.text ?? "", sentTexts)); + return payloads.filter((payload) => { + if (payload.mediaUrl || payload.mediaUrls?.length) { + return true; + } + return !isMessagingToolDuplicate(payload.text ?? "", sentTexts); + }); } export function filterMessagingToolMediaDuplicates(params: {