From f5f0235bb18a202ce10519f4267283ab5f4a2b46 Mon Sep 17 00:00:00 2001 From: Marcus Castro <7562095+mcaxtr@users.noreply.github.com> Date: Thu, 23 Apr 2026 01:19:47 -0300 Subject: [PATCH] feat(whatsapp): adopt replyToMode quoting (#62305) * fix(core): align auto-reply threading behavior * fix(core): propagate reply threading through outbound and gateway * fix(whatsapp): use cached metadata for native quoted replies * feat(whatsapp): add configurable native reply quoting --- CHANGELOG.md | 1 + .../OpenClawProtocol/GatewayModels.swift | 4 + .../OpenClawProtocol/GatewayModels.swift | 4 + extensions/whatsapp/src/accounts.ts | 4 +- .../src/auto-reply/deliver-reply.test.ts | 119 ++++++- .../whatsapp/src/auto-reply/deliver-reply.ts | 85 +++-- .../monitor/inbound-dispatch.test.ts | 15 + .../auto-reply/monitor/inbound-dispatch.ts | 11 +- .../src/auto-reply/monitor/process-message.ts | 6 + extensions/whatsapp/src/channel.ts | 6 + extensions/whatsapp/src/inbound/monitor.ts | 50 ++- .../whatsapp/src/inbound/send-api.test.ts | 25 ++ extensions/whatsapp/src/inbound/send-api.ts | 24 +- extensions/whatsapp/src/inbound/types.ts | 14 +- extensions/whatsapp/src/outbound-base.test.ts | 297 ++++++++++++++++++ extensions/whatsapp/src/outbound-base.ts | 71 ++++- .../whatsapp/src/quoted-message.test.ts | 83 +++++ extensions/whatsapp/src/quoted-message.ts | 192 +++++++++++ extensions/whatsapp/src/send.ts | 10 +- .../reply/agent-runner-execution.ts | 2 + .../reply/agent-runner.media-paths.test.ts | 2 +- src/auto-reply/reply/agent-runner.ts | 8 +- .../reply/commands-core.send-policy.test.ts | 38 ++- src/auto-reply/reply/commands-core.ts | 18 +- .../reply/get-reply-run.media-only.test.ts | 12 + src/auto-reply/reply/get-reply-run.ts | 9 + src/auto-reply/reply/reply-delivery.test.ts | 44 ++- src/auto-reply/reply/reply-delivery.ts | 12 +- src/auto-reply/reply/reply-directives.ts | 4 +- src/auto-reply/reply/reply-threading.test.ts | 33 ++ src/auto-reply/reply/reply-threading.ts | 14 +- src/auto-reply/reply/streaming-directives.ts | 3 +- ...ndled-channel-config-metadata.generated.ts | 40 +++ src/config/types.whatsapp.ts | 3 + src/config/zod-schema.providers-whatsapp.ts | 2 + src/gateway/protocol/schema/agent.ts | 2 + src/gateway/server-methods/send.test.ts | 36 +++ src/gateway/server-methods/send.ts | 4 + .../message-action-runner.threading.test.ts | 144 +++++++++ src/infra/outbound/message-action-runner.ts | 6 +- .../message-action-threading.test-helpers.ts | 75 +++++ .../outbound/message-action-threading.ts | 73 +++++ src/infra/outbound/message.channels.test.ts | 11 + src/infra/outbound/message.ts | 1 + src/infra/outbound/payloads.test.ts | 28 +- 45 files changed, 1565 insertions(+), 80 deletions(-) create mode 100644 extensions/whatsapp/src/quoted-message.test.ts create mode 100644 extensions/whatsapp/src/quoted-message.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d654da1037..62fd06a5e69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai ### Changes +- WhatsApp: add configurable native reply quoting with replyToMode for WhatsApp conversations. Thanks @mcaxtr. - Providers/Amazon Bedrock Mantle: add Claude Opus 4.7 through Mantle's Anthropic Messages route with provider-owned bearer-auth streaming, so the model is actually callable without treating AWS bearer tokens like Anthropic API keys. Thanks @wirjo. - Providers/OpenAI Codex: remove the Codex CLI auth import path from onboarding and provider discovery so OpenClaw no longer copies `~/.codex` OAuth material into agent auth stores; use browser login or device pairing instead. (#70390) Thanks @pashpashpash. - Providers/OpenAI-compatible: mark known local backends such as vLLM, SGLang, llama.cpp, LM Studio, LocalAI, Jan, TabbyAPI, and text-generation-webui as streaming-usage compatible, so their token accounting no longer degrades to unknown/stale totals. (#68711) Thanks @gaineyllc. diff --git a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift index ca3709c02a5..e89d26be834 100644 --- a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift @@ -464,6 +464,7 @@ public struct SendParams: Codable, Sendable { public let channel: String? public let accountid: String? public let agentid: String? + public let replytoid: String? public let threadid: String? public let sessionkey: String? public let idempotencykey: String @@ -477,6 +478,7 @@ public struct SendParams: Codable, Sendable { channel: String?, accountid: String?, agentid: String?, + replytoid: String?, threadid: String?, sessionkey: String?, idempotencykey: String) @@ -489,6 +491,7 @@ public struct SendParams: Codable, Sendable { self.channel = channel self.accountid = accountid self.agentid = agentid + self.replytoid = replytoid self.threadid = threadid self.sessionkey = sessionkey self.idempotencykey = idempotencykey @@ -503,6 +506,7 @@ public struct SendParams: Codable, Sendable { case channel case accountid = "accountId" case agentid = "agentId" + case replytoid = "replyToId" case threadid = "threadId" case sessionkey = "sessionKey" case idempotencykey = "idempotencyKey" diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index ca3709c02a5..e89d26be834 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -464,6 +464,7 @@ public struct SendParams: Codable, Sendable { public let channel: String? public let accountid: String? public let agentid: String? + public let replytoid: String? public let threadid: String? public let sessionkey: String? public let idempotencykey: String @@ -477,6 +478,7 @@ public struct SendParams: Codable, Sendable { channel: String?, accountid: String?, agentid: String?, + replytoid: String?, threadid: String?, sessionkey: String?, idempotencykey: String) @@ -489,6 +491,7 @@ public struct SendParams: Codable, Sendable { self.channel = channel self.accountid = accountid self.agentid = agentid + self.replytoid = replytoid self.threadid = threadid self.sessionkey = sessionkey self.idempotencykey = idempotencykey @@ -503,6 +506,7 @@ public struct SendParams: Codable, Sendable { case channel case accountid = "accountId" case agentid = "agentId" + case replytoid = "replyToId" case threadid = "threadId" case sessionkey = "sessionKey" case idempotencykey = "idempotencyKey" diff --git a/extensions/whatsapp/src/accounts.ts b/extensions/whatsapp/src/accounts.ts index 024cc09e6df..97df8c8487b 100644 --- a/extensions/whatsapp/src/accounts.ts +++ b/extensions/whatsapp/src/accounts.ts @@ -7,7 +7,7 @@ import { resolveUserPath, type OpenClawConfig, } from "openclaw/plugin-sdk/account-core"; -import type { DmPolicy, GroupPolicy } from "openclaw/plugin-sdk/config-runtime"; +import type { DmPolicy, GroupPolicy, ReplyToMode } from "openclaw/plugin-sdk/config-runtime"; import { resolveOAuthDir } from "openclaw/plugin-sdk/state-paths"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; import { resolveMergedWhatsAppAccountConfig } from "./account-config.js"; @@ -38,6 +38,7 @@ export type ResolvedWhatsAppAccount = { groups?: WhatsAppAccountConfig["groups"]; direct?: WhatsAppAccountConfig["direct"]; debounceMs?: number; + replyToMode?: ReplyToMode; }; export const DEFAULT_WHATSAPP_MEDIA_MAX_MB = 50; @@ -153,6 +154,7 @@ export function resolveWhatsAppAccount(params: { groups: merged.groups, direct: merged.direct, debounceMs: merged.debounceMs, + replyToMode: merged.replyToMode, }; } diff --git a/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts b/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts index 179fa5b2b29..959cd17de3f 100644 --- a/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts +++ b/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts @@ -2,6 +2,7 @@ import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { sleep } from "openclaw/plugin-sdk/text-runtime"; import { beforeAll, describe, expect, it, vi } from "vitest"; import { loadWebMedia } from "../media.js"; +import { cacheInboundMessageMeta } from "../quoted-message.js"; import type { WebInboundMsg } from "./types.js"; vi.mock("openclaw/plugin-sdk/runtime-env", async () => { @@ -35,7 +36,12 @@ function makeMsg(): WebInboundMsg { return { from: "+10000000000", to: "+20000000000", + accountId: "work", + chatId: "15551234567@s.whatsapp.net", + chatType: "group", id: "msg-1", + body: "latest batch body", + senderJid: "222@s.whatsapp.net", reply: vi.fn(async () => undefined), sendMedia: vi.fn(async () => undefined), } as unknown as WebInboundMsg; @@ -120,6 +126,7 @@ describe("deliverWebReply", () => { expect(msg.reply).toHaveBeenCalledTimes(1); expect(msg.reply).toHaveBeenCalledWith( "Intro line\nReasoning: appears in content but is not a prefix", + undefined, ); }); @@ -136,11 +143,59 @@ describe("deliverWebReply", () => { }); expect(msg.reply).toHaveBeenCalledTimes(2); - expect(msg.reply).toHaveBeenNthCalledWith(1, "aaa"); - expect(msg.reply).toHaveBeenNthCalledWith(2, "aaa"); + expect(msg.reply).toHaveBeenNthCalledWith(1, "aaa", undefined); + expect(msg.reply).toHaveBeenNthCalledWith(2, "aaa", undefined); expect(replyLogger.info).toHaveBeenCalledWith(expect.any(Object), "auto-reply sent (text)"); }); + it("keeps quote threading on every text chunk for a threaded reply", async () => { + const msg = makeMsg(); + cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-1", { + participant: "111@s.whatsapp.net", + body: "quoted body", + fromMe: true, + }); + + await deliverWebReply({ + replyResult: { text: "aaaaaa", replyToId: "reply-1" }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 3, + replyLogger, + skipLog: true, + }); + + expect(msg.reply).toHaveBeenCalledTimes(2); + expect(msg.reply).toHaveBeenNthCalledWith( + 1, + "aaa", + expect.objectContaining({ + quoted: expect.objectContaining({ + key: expect.objectContaining({ + id: "reply-1", + fromMe: true, + participant: "111@s.whatsapp.net", + }), + message: { conversation: "quoted body" }, + }), + }), + ); + expect(msg.reply).toHaveBeenNthCalledWith( + 2, + "aaa", + expect.objectContaining({ + quoted: expect.objectContaining({ + key: expect.objectContaining({ + id: "reply-1", + fromMe: true, + participant: "111@s.whatsapp.net", + }), + message: { conversation: "quoted body" }, + }), + }), + ); + }); + it.each(["connection closed", "operation timed out"])( "retries text send on transient failure: %s", async (errorMessage) => { @@ -188,12 +243,67 @@ describe("deliverWebReply", () => { caption: "aaa", mimetype: "image/jpeg", }), + undefined, ); - expect(msg.reply).toHaveBeenCalledWith("aaa"); + expect(msg.reply).toHaveBeenCalledWith("aaa", undefined); expect(replyLogger.info).toHaveBeenCalledWith(expect.any(Object), "auto-reply sent (media)"); expect(logVerbose).toHaveBeenCalled(); }); + it("keeps quote threading on media and trailing text chunks for a threaded reply", async () => { + const msg = makeMsg(); + mockLoadedImageMedia(); + cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-2", { + participant: "111@s.whatsapp.net", + body: "quoted media body", + fromMe: true, + }); + + await deliverWebReply({ + replyResult: { + text: "captiontrail", + mediaUrl: "http://example.com/img.jpg", + replyToId: "reply-2", + }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 7, + replyLogger, + skipLog: true, + }); + + expect(msg.sendMedia).toHaveBeenCalledWith( + expect.objectContaining({ + image: expect.any(Buffer), + caption: "caption", + mimetype: "image/jpeg", + }), + expect.objectContaining({ + quoted: expect.objectContaining({ + key: expect.objectContaining({ + id: "reply-2", + fromMe: true, + participant: "111@s.whatsapp.net", + }), + message: { conversation: "quoted media body" }, + }), + }), + ); + expect(msg.reply).toHaveBeenCalledWith( + "trail", + expect.objectContaining({ + quoted: expect.objectContaining({ + key: expect.objectContaining({ + id: "reply-2", + fromMe: true, + participant: "111@s.whatsapp.net", + }), + message: { conversation: "quoted media body" }, + }), + }), + ); + }); + it("retries media send on transient failure", async () => { const msg = makeMsg(); mockLoadedImageMedia(); @@ -265,6 +375,7 @@ describe("deliverWebReply", () => { mimetype: "audio/ogg", caption: "cap", }), + undefined, ); }); @@ -293,6 +404,7 @@ describe("deliverWebReply", () => { caption: "cap", mimetype: "video/mp4", }), + undefined, ); }); @@ -323,6 +435,7 @@ describe("deliverWebReply", () => { caption: "cap", mimetype: "application/octet-stream", }), + undefined, ); }); }); diff --git a/extensions/whatsapp/src/auto-reply/deliver-reply.ts b/extensions/whatsapp/src/auto-reply/deliver-reply.ts index 363a003a21b..a74308a799f 100644 --- a/extensions/whatsapp/src/auto-reply/deliver-reply.ts +++ b/extensions/whatsapp/src/auto-reply/deliver-reply.ts @@ -8,6 +8,7 @@ import { } from "openclaw/plugin-sdk/reply-payload"; import { logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env"; import { loadWebMedia } from "../media.js"; +import { buildQuotedMessageOptions, lookupInboundMessageMeta } from "../quoted-message.js"; import { newConnectionId } from "../reconnect.js"; import { formatError } from "../session.js"; import { convertMarkdownTables, sleep } from "../text-runtime.js"; @@ -45,6 +46,23 @@ export async function deliverWebReply(params: { const textChunks = chunkMarkdownTextWithMode(convertedText, textLimit, chunkMode); const mediaList = resolveOutboundMediaUrls(replyResult); + const getQuote = () => { + if (!replyResult.replyToId) { + return undefined; + } + // Use replyToId (not msg.id) so batched payloads quote the correct + // per-message target. Look up cached metadata for the specific + // message being quoted — msg.body may be a combined batch body. + const cached = lookupInboundMessageMeta(msg.accountId, msg.chatId, replyResult.replyToId); + return buildQuotedMessageOptions({ + messageId: replyResult.replyToId, + remoteJid: msg.chatId, + fromMe: cached?.fromMe ?? false, + participant: cached?.participant ?? (msg.chatType === "group" ? msg.senderJid : undefined), + messageText: cached?.body ?? "", + }); + }; + const sendWithRetry = async (fn: () => Promise, label: string, maxAttempts = 3) => { let lastErr: unknown; for (let attempt = 1; attempt <= maxAttempts; attempt++) { @@ -73,7 +91,8 @@ export async function deliverWebReply(params: { const totalChunks = textChunks.length; for (const [index, chunk] of textChunks.entries()) { const chunkStarted = Date.now(); - await sendWithRetry(() => msg.reply(chunk), "text"); + const quote = getQuote(); + await sendWithRetry(() => msg.reply(chunk, quote), "text"); if (!skipLog) { const durationMs = Date.now() - chunkStarted; whatsappOutboundLog.debug( @@ -117,47 +136,63 @@ export async function deliverWebReply(params: { logVerbose(`Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`); } if (media.kind === "image") { + const quote = getQuote(); await sendWithRetry( () => - msg.sendMedia({ - image: media.buffer, - caption, - mimetype: media.contentType, - }), + msg.sendMedia( + { + image: media.buffer, + caption, + mimetype: media.contentType, + }, + quote, + ), "media:image", ); } else if (media.kind === "audio") { + const quote = getQuote(); await sendWithRetry( () => - msg.sendMedia({ - audio: media.buffer, - ptt: true, - mimetype: media.contentType, - caption, - }), + msg.sendMedia( + { + audio: media.buffer, + ptt: true, + mimetype: media.contentType, + caption, + }, + quote, + ), "media:audio", ); } else if (media.kind === "video") { + const quote = getQuote(); await sendWithRetry( () => - msg.sendMedia({ - video: media.buffer, - caption, - mimetype: media.contentType, - }), + msg.sendMedia( + { + video: media.buffer, + caption, + mimetype: media.contentType, + }, + quote, + ), "media:video", ); } else { const fileName = media.fileName ?? mediaUrl.split("/").pop() ?? "file"; const mimetype = media.contentType ?? "application/octet-stream"; + const quote = getQuote(); await sendWithRetry( () => - msg.sendMedia({ - document: media.buffer, - fileName, - caption, - mimetype, - }), + msg.sendMedia( + { + document: media.buffer, + fileName, + caption, + mimetype, + }, + quote, + ), "media:document", ); } @@ -193,12 +228,12 @@ export async function deliverWebReply(params: { return; } whatsappOutboundLog.warn(`Media skipped; sent text-only to ${msg.from}`); - await msg.reply(fallbackText); + await msg.reply(fallbackText, getQuote()); }, }); // Remaining text chunks after media for (const chunk of remainingText) { - await msg.reply(chunk); + await msg.reply(chunk, getQuote()); } } diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts index 7f49da45f2e..3d7fd3bf4e1 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts @@ -232,6 +232,21 @@ describe("whatsapp inbound dispatch", () => { expect(ctx.GroupSystemPrompt).toBeUndefined(); }); + it("preserves reply threading policy in the inbound context", () => { + const ctx = buildWhatsAppInboundContext({ + combinedBody: "hi", + conversationId: "+1000", + msg: makeMsg(), + route: makeRoute(), + sender: { + e164: "+1000", + }, + replyThreading: { implicitCurrentMessage: "allow" }, + }); + + expect(ctx.ReplyThreading).toEqual({ implicitCurrentMessage: "allow" }); + }); + it("defaults responsePrefix to identity name in self-chats when unset", () => { const responsePrefix = resolveWhatsAppResponsePrefix({ cfg: { diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts index 7c98365a23c..0c82f569636 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts @@ -40,6 +40,10 @@ type VisibleReplyTarget = { } | null; }; +type ReplyThreadingContext = { + implicitCurrentMessage?: "default" | "allow" | "deny"; +}; + type SenderContext = { id?: string; name?: string; @@ -91,6 +95,7 @@ export function buildWhatsAppInboundContext(params: { msg: WebInboundMsg; route: ReturnType; sender: SenderContext; + replyThreading?: ReplyThreadingContext; visibleReplyTo?: VisibleReplyTarget; }) { const inboundHistory = @@ -102,7 +107,7 @@ export function buildWhatsAppInboundContext(params: { })) : undefined; - return finalizeInboundContext({ + const result = finalizeInboundContext({ Body: params.combinedBody, BodyForAgent: params.msg.body, InboundHistory: inboundHistory, @@ -132,6 +137,7 @@ export function buildWhatsAppInboundContext(params: { SenderId: params.sender.id ?? params.sender.e164, SenderE164: params.sender.e164, CommandAuthorized: params.commandAuthorized, + ReplyThreading: params.replyThreading, WasMentioned: params.msg.wasMentioned, GroupSystemPrompt: params.groupSystemPrompt, ...(params.msg.location ? toLocationContext(params.msg.location) : {}), @@ -140,6 +146,7 @@ export function buildWhatsAppInboundContext(params: { OriginatingChannel: "whatsapp", OriginatingTo: params.msg.from, }); + return result; } export function resolveWhatsAppDmRouteTarget(params: { @@ -238,7 +245,7 @@ export async function dispatchWhatsAppBufferedReply(params: { maxMediaBytes: number; maxMediaTextChunkLimit?: number; msg: WebInboundMsg; - onModelSelected?: ChannelReplyOnModelSelected | undefined; + onModelSelected?: ChannelReplyOnModelSelected; rememberSentText: ( text: string | undefined, opts: { diff --git a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts index 81a64a5a6fe..b3165195d7f 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts @@ -1,3 +1,4 @@ +import { resolveBatchedReplyThreadingPolicy } from "openclaw/plugin-sdk/reply-reference"; import { getPrimaryIdentityId, getSelfIdentity, getSenderIdentity } from "../../identity.js"; import { resolveWhatsAppCommandAuthorized, @@ -230,6 +231,10 @@ export async function processMessage(params: { isSelfChat: params.msg.chatType !== "group" && inboundPolicy.isSelfChat, pipelineResponsePrefix: replyPipeline.responsePrefix, }); + const replyThreading = resolveBatchedReplyThreadingPolicy( + account.replyToMode ?? "off", + params.msg.isBatched === true, + ); // Resolve combined conversation system prompt using the group or direct surface. const conversationSystemPrompt = @@ -257,6 +262,7 @@ export async function processMessage(params: { name: sender.name ?? undefined, e164: sender.e164 ?? undefined, }, + replyThreading, visibleReplyTo: visibleReplyTo ?? undefined, }); diff --git a/extensions/whatsapp/src/channel.ts b/extensions/whatsapp/src/channel.ts index 6508de908f3..42cd2059d19 100644 --- a/extensions/whatsapp/src/channel.ts +++ b/extensions/whatsapp/src/channel.ts @@ -67,6 +67,12 @@ export const whatsappPlugin: ChannelPlugin = idLabel: "whatsappSenderId", }, outbound: whatsappChannelOutbound, + threading: { + scopedAccountReplyToMode: { + resolveAccount: (cfg, accountId) => resolveWhatsAppAccount({ cfg, accountId }), + resolveReplyToMode: (account) => account.replyToMode, + }, + }, base: { ...createWhatsAppPluginBase({ groups: { diff --git a/extensions/whatsapp/src/inbound/monitor.ts b/extensions/whatsapp/src/inbound/monitor.ts index bbc6ac6c89c..3ea6654cfd1 100644 --- a/extensions/whatsapp/src/inbound/monitor.ts +++ b/extensions/whatsapp/src/inbound/monitor.ts @@ -1,4 +1,10 @@ -import type { AnyMessageContent, proto, WAMessage, WASocket } from "@whiskeysockets/baileys"; +import type { + AnyMessageContent, + MiscMessageGenerationOptions, + proto, + WAMessage, + WASocket, +} from "@whiskeysockets/baileys"; import { createInboundDebouncer, formatLocationText } from "openclaw/plugin-sdk/channel-inbound"; import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime"; import { defaultRuntime } from "openclaw/plugin-sdk/runtime-env"; @@ -6,6 +12,7 @@ import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env"; import { getChildLogger } from "openclaw/plugin-sdk/text-runtime"; import { readWebSelfIdentityForDecision, WhatsAppAuthUnstableError } from "../auth-store.js"; import { getPrimaryIdentityId, resolveComparableIdentity } from "../identity.js"; +import { cacheInboundMessageMeta } from "../quoted-message.js"; import { DEFAULT_RECONNECT_POLICY, computeBackoff, sleepWithAbort } from "../reconnect.js"; import type { OpenClawConfig } from "../runtime-api.js"; import { createWaSocket, formatError, getStatusCode, waitForWaConnection } from "../session.js"; @@ -209,6 +216,7 @@ export async function attachWebInboxToSocket( body: combinedBody, mentions: mentioned.size > 0 ? Array.from(mentioned) : undefined, mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined, + isBatched: true, }; await options.onMessage(combinedMessage); await finalizeInboundDedupe(entries); @@ -247,13 +255,19 @@ export async function attachWebInboxToSocket( }); }; - const sendTrackedMessage = async (jid: string, content: AnyMessageContent) => { + const sendTrackedMessage = async ( + jid: string, + content: AnyMessageContent, + sendOptions?: MiscMessageGenerationOptions, + ) => { let lastErr: unknown = new Error(RECONNECT_IN_PROGRESS_ERROR); for (let attempt = 1; ; attempt++) { const currentSock = getCurrentSock(); if (currentSock) { try { - const result = await currentSock.sendMessage(jid, content); + const result = sendOptions + ? await currentSock.sendMessage(jid, content, sendOptions) + : await currentSock.sendMessage(jid, content); rememberOutboundMessage(jid, result); return result; } catch (err) { @@ -398,7 +412,9 @@ export async function attachWebInboxToSocket( messageTimestampMs, connectedAtMs, verbose: options.verbose, - sock: { sendMessage: (jid, content) => sendTrackedMessage(jid, content) }, + sock: { + sendMessage: (jid: string, content: AnyMessageContent) => sendTrackedMessage(jid, content), + }, remoteJid, }); if (!access.allowed) { @@ -515,11 +531,14 @@ export async function attachWebInboxToSocket( logWhatsAppVerbose(options.verbose, `Presence update failed: ${String(err)}`); } }; - const reply = async (text: string) => { - await sendTrackedMessage(chatJid, { text }); + const reply = async (text: string, options?: MiscMessageGenerationOptions) => { + await sendTrackedMessage(chatJid, { text }, options); }; - const sendMedia = async (payload: AnyMessageContent) => { - await sendTrackedMessage(chatJid, payload); + const sendMedia = async ( + payload: AnyMessageContent, + options?: MiscMessageGenerationOptions, + ) => { + await sendTrackedMessage(chatJid, payload, options); }; const timestamp = inbound.messageTimestampMs; const mentionedJids = extractMentionedJids(msg.message as proto.IMessage | undefined); @@ -580,6 +599,15 @@ export async function attachWebInboxToSocket( mediaFileName: enriched.mediaFileName, dedupeKey: inbound.id ? `${options.accountId}:${inbound.remoteJid}:${inbound.id}` : undefined, }; + if (inboundMessage.id) { + cacheInboundMessageMeta(inboundMessage.accountId, inboundMessage.chatId, inboundMessage.id, { + participant: inboundMessage.senderJid, + participantE164: + inboundMessage.chatType === "direct" ? inboundMessage.senderE164 : undefined, + body: inboundMessage.body, + fromMe: inboundMessage.fromMe, + }); + } try { const task = Promise.resolve(debouncer.enqueue(inboundMessage)); void task.catch((err) => { @@ -692,7 +720,11 @@ export async function attachWebInboxToSocket( const sendApi = createWebSendApi({ sock: { - sendMessage: (jid: string, content: AnyMessageContent) => sendTrackedMessage(jid, content), + sendMessage: ( + jid: string, + content: AnyMessageContent, + options?: MiscMessageGenerationOptions, + ) => sendTrackedMessage(jid, content, options), sendPresenceUpdate: async (presence, jid?: string) => { const currentSock = getCurrentSock(); if (!currentSock) { diff --git a/extensions/whatsapp/src/inbound/send-api.test.ts b/extensions/whatsapp/src/inbound/send-api.test.ts index e1919151990..7dec794a14c 100644 --- a/extensions/whatsapp/src/inbound/send-api.test.ts +++ b/extensions/whatsapp/src/inbound/send-api.test.ts @@ -218,4 +218,29 @@ describe("createWebSendApi", () => { expect(sendMessage).toHaveBeenCalledWith("123@s.whatsapp.net", { text: "hello" }); }); + + it("preserves the quoted remoteJid provided by the outbound adapter", async () => { + await api.sendMessage("+1555", "hello", undefined, undefined, { + quotedMessageKey: { + id: "quoted-1", + remoteJid: "277038292303944@lid", + fromMe: false, + participant: "1234@s.whatsapp.net", + messageText: "quoted body", + }, + }); + + expect(sendMessage).toHaveBeenCalledWith( + "1555@s.whatsapp.net", + { text: "hello" }, + expect.objectContaining({ + quoted: expect.objectContaining({ + key: expect.objectContaining({ + remoteJid: "277038292303944@lid", + id: "quoted-1", + }), + }), + }), + ); + }); }); diff --git a/extensions/whatsapp/src/inbound/send-api.ts b/extensions/whatsapp/src/inbound/send-api.ts index 2b043c3f266..92ccef288ba 100644 --- a/extensions/whatsapp/src/inbound/send-api.ts +++ b/extensions/whatsapp/src/inbound/send-api.ts @@ -1,5 +1,10 @@ -import type { AnyMessageContent, WAPresence } from "@whiskeysockets/baileys"; +import type { + AnyMessageContent, + MiscMessageGenerationOptions, + WAPresence, +} from "@whiskeysockets/baileys"; import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime"; +import { buildQuotedMessageOptions } from "../quoted-message.js"; import { toWhatsappJid } from "../text-runtime.js"; import type { ActiveWebSendOptions } from "./types.js"; @@ -19,7 +24,11 @@ function resolveOutboundMessageId(result: unknown): string { export function createWebSendApi(params: { sock: { - sendMessage: (jid: string, content: AnyMessageContent) => Promise; + sendMessage: ( + jid: string, + content: AnyMessageContent, + options?: MiscMessageGenerationOptions, + ) => Promise; sendPresenceUpdate: (presence: WAPresence, jid?: string) => Promise; }; defaultAccountId: string; @@ -66,7 +75,16 @@ export function createWebSendApi(params: { } else { payload = { text }; } - const result = await params.sock.sendMessage(jid, payload); + const quotedOpts = buildQuotedMessageOptions({ + messageId: sendOptions?.quotedMessageKey?.id, + remoteJid: sendOptions?.quotedMessageKey?.remoteJid, + fromMe: sendOptions?.quotedMessageKey?.fromMe, + participant: sendOptions?.quotedMessageKey?.participant, + messageText: sendOptions?.quotedMessageKey?.messageText, + }); + const result = quotedOpts + ? await params.sock.sendMessage(jid, payload, quotedOpts) + : await params.sock.sendMessage(jid, payload); const accountId = sendOptions?.accountId ?? params.defaultAccountId; recordWhatsAppOutbound(accountId); const messageId = resolveOutboundMessageId(result); diff --git a/extensions/whatsapp/src/inbound/types.ts b/extensions/whatsapp/src/inbound/types.ts index 7e83c6fa5cd..5da397fd119 100644 --- a/extensions/whatsapp/src/inbound/types.ts +++ b/extensions/whatsapp/src/inbound/types.ts @@ -1,4 +1,4 @@ -import type { AnyMessageContent } from "@whiskeysockets/baileys"; +import type { AnyMessageContent, MiscMessageGenerationOptions } from "@whiskeysockets/baileys"; import type { NormalizedLocation } from "openclaw/plugin-sdk/channel-inbound"; import type { PollInput } from "openclaw/plugin-sdk/media-runtime"; import type { WhatsAppIdentity, WhatsAppReplyContext, WhatsAppSelfIdentity } from "../identity.js"; @@ -10,6 +10,13 @@ export type WebListenerCloseReason = { }; export type ActiveWebSendOptions = { + quotedMessageKey?: { + id: string; + remoteJid: string; + fromMe: boolean; + participant?: string; + messageText?: string; + }; gifPlayback?: boolean; accountId?: string; fileName?: string; @@ -67,11 +74,12 @@ export type WebInboundMessage = { fromMe?: boolean; location?: NormalizedLocation; sendComposing: () => Promise; - reply: (text: string) => Promise; - sendMedia: (payload: AnyMessageContent) => Promise; + reply: (text: string, options?: MiscMessageGenerationOptions) => Promise; + sendMedia: (payload: AnyMessageContent, options?: MiscMessageGenerationOptions) => Promise; mediaPath?: string; mediaType?: string; mediaFileName?: string; mediaUrl?: string; wasMentioned?: boolean; + isBatched?: boolean; }; diff --git a/extensions/whatsapp/src/outbound-base.test.ts b/extensions/whatsapp/src/outbound-base.test.ts index 0509e88412a..e58aa8bf38c 100644 --- a/extensions/whatsapp/src/outbound-base.test.ts +++ b/extensions/whatsapp/src/outbound-base.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from "vitest"; import { createWhatsAppOutboundBase } from "./outbound-base.js"; import { createWhatsAppPollFixture } from "./outbound-test-support.js"; +import { cacheInboundMessageMeta } from "./quoted-message.js"; describe("createWhatsAppOutboundBase", () => { it("exposes the provided chunker", () => { @@ -54,6 +55,302 @@ describe("createWhatsAppOutboundBase", () => { expect(result).toMatchObject({ channel: "whatsapp", messageId: "msg-1" }); }); + it("uses the configured default account for quote metadata lookup when accountId is omitted", async () => { + cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-1", { + participant: "111@s.whatsapp.net", + body: "quoted body", + }); + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-1", + toJid: "15551234567@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await outbound.sendText!({ + cfg: { + channels: { + whatsapp: { + defaultAccount: "work", + accounts: { + work: {}, + }, + }, + }, + } as never, + to: "whatsapp:+15551234567", + text: "reply", + deps: { sendWhatsApp: sendMessageWhatsApp }, + replyToId: "reply-1", + }); + + expect(sendMessageWhatsApp).toHaveBeenCalledWith( + "whatsapp:+15551234567", + "reply", + expect.objectContaining({ + quotedMessageKey: { + id: "reply-1", + remoteJid: "15551234567@s.whatsapp.net", + fromMe: false, + participant: "111@s.whatsapp.net", + messageText: "quoted body", + }, + }), + ); + }); + + it("normalizes mixed-case defaultAccount before quote metadata lookup", async () => { + cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-case", { + participant: "333@s.whatsapp.net", + body: "case-normalized body", + }); + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-case", + toJid: "15551234567@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await outbound.sendText!({ + cfg: { + channels: { + whatsapp: { + defaultAccount: "Work", + accounts: { + work: {}, + other: {}, + }, + }, + }, + } as never, + to: "whatsapp:+15551234567", + text: "reply", + deps: { sendWhatsApp: sendMessageWhatsApp }, + replyToId: "reply-case", + }); + + expect(sendMessageWhatsApp).toHaveBeenCalledWith( + "whatsapp:+15551234567", + "reply", + expect.objectContaining({ + quotedMessageKey: { + id: "reply-case", + remoteJid: "15551234567@s.whatsapp.net", + fromMe: false, + participant: "333@s.whatsapp.net", + messageText: "case-normalized body", + }, + }), + ); + }); + + it("matches sorted default-account fallback for quote metadata lookup when defaultAccount is unset", async () => { + cacheInboundMessageMeta("alpha", "15551234567@s.whatsapp.net", "reply-2", { + participant: "222@s.whatsapp.net", + body: "sorted default body", + }); + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-2", + toJid: "15551234567@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await outbound.sendText!({ + cfg: { + channels: { + whatsapp: { + accounts: { + zeta: {}, + alpha: {}, + }, + }, + }, + } as never, + to: "whatsapp:+15551234567", + text: "reply", + deps: { sendWhatsApp: sendMessageWhatsApp }, + replyToId: "reply-2", + }); + + expect(sendMessageWhatsApp).toHaveBeenCalledWith( + "whatsapp:+15551234567", + "reply", + expect.objectContaining({ + quotedMessageKey: { + id: "reply-2", + remoteJid: "15551234567@s.whatsapp.net", + fromMe: false, + participant: "222@s.whatsapp.net", + messageText: "sorted default body", + }, + }), + ); + }); + + it("reuses the cached inbound remoteJid when the outbound target normalizes differently", async () => { + cacheInboundMessageMeta("default", "277038292303944@lid", "reply-lid", { + participant: "5511976136970@s.whatsapp.net", + body: "quoted from lid chat", + fromMe: true, + }); + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-lid", + toJid: "5511976136970@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await outbound.sendText!({ + cfg: { + channels: { + whatsapp: { + accounts: { + default: {}, + }, + }, + }, + } as never, + to: "whatsapp:+5511976136970", + text: "reply", + accountId: "default", + deps: { sendWhatsApp: sendMessageWhatsApp }, + replyToId: "reply-lid", + }); + + expect(sendMessageWhatsApp).toHaveBeenCalledWith( + "whatsapp:+5511976136970", + "reply", + expect.objectContaining({ + quotedMessageKey: { + id: "reply-lid", + remoteJid: "277038292303944@lid", + fromMe: true, + participant: "5511976136970@s.whatsapp.net", + messageText: "quoted from lid chat", + }, + }), + ); + }); + + it("normalizes explicit accountId before quote metadata lookup", async () => { + cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-explicit", { + participant: "333@s.whatsapp.net", + body: "explicit account body", + }); + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-explicit", + toJid: "15551234567@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await outbound.sendText!({ + cfg: { + channels: { + whatsapp: { + accounts: { + work: {}, + }, + }, + }, + } as never, + to: "whatsapp:+15551234567", + text: "reply", + accountId: "Work", + deps: { sendWhatsApp: sendMessageWhatsApp }, + replyToId: "reply-explicit", + }); + + expect(sendMessageWhatsApp).toHaveBeenCalledWith( + "whatsapp:+15551234567", + "reply", + expect.objectContaining({ + quotedMessageKey: { + id: "reply-explicit", + remoteJid: "15551234567@s.whatsapp.net", + fromMe: false, + participant: "333@s.whatsapp.net", + messageText: "explicit account body", + }, + }), + ); + }); + + it("falls back to the target JID when quote metadata only exists in a different conversation", async () => { + cacheInboundMessageMeta("default", "120363400000000000@g.us", "reply-group", { + participant: "5511976136970@s.whatsapp.net", + body: "group-only body", + }); + const sendMessageWhatsApp = vi.fn(async () => ({ + messageId: "msg-group-miss", + toJid: "5511976136970@s.whatsapp.net", + })); + const outbound = createWhatsAppOutboundBase({ + chunker: (text) => [text], + sendMessageWhatsApp, + sendPollWhatsApp: vi.fn(), + shouldLogVerbose: () => false, + resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }), + }); + + await outbound.sendText!({ + cfg: { + channels: { + whatsapp: { + accounts: { + default: {}, + }, + }, + }, + } as never, + to: "whatsapp:+5511976136970", + text: "reply", + accountId: "default", + deps: { sendWhatsApp: sendMessageWhatsApp }, + replyToId: "reply-group", + }); + + expect(sendMessageWhatsApp).toHaveBeenCalledWith( + "whatsapp:+5511976136970", + "reply", + expect.objectContaining({ + quotedMessageKey: { + id: "reply-group", + remoteJid: "5511976136970@s.whatsapp.net", + fromMe: false, + participant: undefined, + messageText: undefined, + }, + }), + ); + }); + it("threads cfg into sendPollWhatsApp call", async () => { const sendPollWhatsApp = vi.fn(async () => ({ messageId: "wa-poll-1", diff --git a/extensions/whatsapp/src/outbound-base.ts b/extensions/whatsapp/src/outbound-base.ts index 45acef604be..72ef2a64caa 100644 --- a/extensions/whatsapp/src/outbound-base.ts +++ b/extensions/whatsapp/src/outbound-base.ts @@ -1,3 +1,9 @@ +import { + DEFAULT_ACCOUNT_ID, + listCombinedAccountIds, + normalizeOptionalAccountId, + resolveListedDefaultAccountId, +} from "openclaw/plugin-sdk/account-core"; import { createAttachedChannelResultAdapter, type ChannelOutboundAdapter, @@ -5,6 +11,8 @@ import { import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { resolveOutboundSendDep, sanitizeForPlainText } from "openclaw/plugin-sdk/infra-runtime"; import { WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS } from "./outbound-send-deps.js"; +import { lookupInboundMessageMetaForTarget } from "./quoted-message.js"; +import { toWhatsappJid } from "./text-runtime.js"; type WhatsAppChunker = NonNullable; type WhatsAppSendTextOptions = { @@ -19,6 +27,13 @@ type WhatsAppSendTextOptions = { mediaReadFile?: (filePath: string) => Promise; gifPlayback?: boolean; accountId?: string; + quotedMessageKey?: { + id: string; + remoteJid: string; + fromMe: boolean; + participant?: string; + messageText?: string; + }; }; type WhatsAppSendMessage = ( to: string, @@ -41,6 +56,25 @@ type CreateWhatsAppOutboundBaseParams = { skipEmptyText?: boolean; }; +function resolveQuoteLookupAccountId(cfg?: OpenClawConfig, accountId?: string | null): string { + const explicitAccountId = normalizeOptionalAccountId(accountId); + if (explicitAccountId) { + return explicitAccountId; + } + const channelCfg = cfg?.channels?.whatsapp; + const configuredIds = listCombinedAccountIds({ + configuredAccountIds: + channelCfg?.accounts && typeof channelCfg.accounts === "object" + ? Object.keys(channelCfg.accounts).filter(Boolean) + : [], + fallbackAccountIdWhenEmpty: DEFAULT_ACCOUNT_ID, + }); + return resolveListedDefaultAccountId({ + accountIds: configuredIds, + configuredDefaultAccountId: normalizeOptionalAccountId(channelCfg?.defaultAccount), + }); +} + export function createWhatsAppOutboundBase({ chunker, sendMessageWhatsApp, @@ -62,6 +96,26 @@ export function createWhatsAppOutboundBase({ | "sendMedia" | "sendPoll" > { + const resolveQuotedMessageKey = (params: { + accountId: string; + to: string; + replyToId?: string | null; + }) => { + const replyToId = params.replyToId?.trim(); + if (!replyToId) { + return undefined; + } + const targetJid = toWhatsappJid(params.to); + const cachedMeta = lookupInboundMessageMetaForTarget(params.accountId, targetJid, replyToId); + return { + id: replyToId, + remoteJid: cachedMeta?.remoteJid ?? targetJid, + fromMe: cachedMeta?.fromMe ?? false, + participant: cachedMeta?.participant, + messageText: cachedMeta?.body, + }; + }; + return { deliveryMode: "gateway", chunker, @@ -72,7 +126,7 @@ export function createWhatsAppOutboundBase({ resolveTarget, ...createAttachedChannelResultAdapter({ channel: "whatsapp", - sendText: async ({ cfg, to, text, accountId, deps, gifPlayback }) => { + sendText: async ({ cfg, to, text, accountId, deps, gifPlayback, replyToId }) => { const normalizedText = normalizeText(text); if (skipEmptyText && !normalizedText) { return { messageId: "" }; @@ -81,11 +135,18 @@ export function createWhatsAppOutboundBase({ resolveOutboundSendDep(deps, "whatsapp", { legacyKeys: WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS, }) ?? sendMessageWhatsApp; + const lookupAccountId = resolveQuoteLookupAccountId(cfg, accountId); + const quotedMessageKey = resolveQuotedMessageKey({ + accountId: lookupAccountId, + to, + replyToId, + }); return await send(to, normalizedText, { verbose: false, cfg, accountId: accountId ?? undefined, gifPlayback, + quotedMessageKey, }); }, sendMedia: async ({ @@ -99,11 +160,18 @@ export function createWhatsAppOutboundBase({ accountId, deps, gifPlayback, + replyToId, }) => { const send = resolveOutboundSendDep(deps, "whatsapp", { legacyKeys: WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS, }) ?? sendMessageWhatsApp; + const lookupAccountId = resolveQuoteLookupAccountId(cfg, accountId); + const quotedMessageKey = resolveQuotedMessageKey({ + accountId: lookupAccountId, + to, + replyToId, + }); return await send(to, normalizeText(text), { verbose: false, cfg, @@ -113,6 +181,7 @@ export function createWhatsAppOutboundBase({ mediaReadFile, accountId: accountId ?? undefined, gifPlayback, + quotedMessageKey, }); }, sendPoll: async ({ cfg, to, poll, accountId }) => diff --git a/extensions/whatsapp/src/quoted-message.test.ts b/extensions/whatsapp/src/quoted-message.test.ts new file mode 100644 index 00000000000..7905be00e05 --- /dev/null +++ b/extensions/whatsapp/src/quoted-message.test.ts @@ -0,0 +1,83 @@ +import { describe, expect, it } from "vitest"; +import { + cacheInboundMessageMeta, + lookupInboundMessageMeta, + lookupInboundMessageMetaForTarget, +} from "./quoted-message.js"; + +describe("quoted message metadata cache", () => { + it("scopes cached metadata by account id", () => { + cacheInboundMessageMeta("account-a", "1555@s.whatsapp.net", "msg-1", { + participant: "111@s.whatsapp.net", + body: "hello from a", + fromMe: true, + }); + cacheInboundMessageMeta("account-b", "1555@s.whatsapp.net", "msg-1", { + participant: "222@s.whatsapp.net", + body: "hello from b", + fromMe: false, + }); + + expect(lookupInboundMessageMeta("account-a", "1555@s.whatsapp.net", "msg-1")).toEqual({ + participant: "111@s.whatsapp.net", + body: "hello from a", + fromMe: true, + }); + expect(lookupInboundMessageMeta("account-b", "1555@s.whatsapp.net", "msg-1")).toEqual({ + participant: "222@s.whatsapp.net", + body: "hello from b", + fromMe: false, + }); + }); + + it("can recover the original remoteJid for a matching direct-chat target", () => { + cacheInboundMessageMeta("account-c", "277038292303944@lid", "msg-2", { + participant: "5511976136970@s.whatsapp.net", + body: "hello from lid chat", + fromMe: true, + }); + + expect( + lookupInboundMessageMetaForTarget("account-c", "5511976136970@s.whatsapp.net", "msg-2"), + ).toEqual({ + remoteJid: "277038292303944@lid", + participant: "5511976136970@s.whatsapp.net", + body: "hello from lid chat", + fromMe: true, + }); + expect( + lookupInboundMessageMetaForTarget("account-c", "99999999999@s.whatsapp.net", "msg-2"), + ).toBeUndefined(); + expect( + lookupInboundMessageMetaForTarget("missing", "5511976136970@s.whatsapp.net", "msg-2"), + ).toBeUndefined(); + }); + + it("can recover a direct-chat remoteJid when only sender E164 was cached", () => { + cacheInboundMessageMeta("account-e", "277038292303944@lid", "msg-4", { + participantE164: "+5511976136970", + body: "hello from e164 participant", + }); + + expect( + lookupInboundMessageMetaForTarget("account-e", "5511976136970@s.whatsapp.net", "msg-4"), + ).toEqual({ + remoteJid: "277038292303944@lid", + participant: undefined, + participantE164: "+5511976136970", + body: "hello from e164 participant", + fromMe: undefined, + }); + }); + + it("does not recover metadata from another chat when the target conversation differs", () => { + cacheInboundMessageMeta("account-d", "120363400000000000@g.us", "msg-3", { + participant: "111@s.whatsapp.net", + body: "group secret", + }); + + expect( + lookupInboundMessageMetaForTarget("account-d", "222@s.whatsapp.net", "msg-3"), + ).toBeUndefined(); + }); +}); diff --git a/extensions/whatsapp/src/quoted-message.ts b/extensions/whatsapp/src/quoted-message.ts new file mode 100644 index 00000000000..4b369986115 --- /dev/null +++ b/extensions/whatsapp/src/quoted-message.ts @@ -0,0 +1,192 @@ +import type { MiscMessageGenerationOptions } from "@whiskeysockets/baileys"; +import { jidToE164 } from "./text-runtime.js"; + +export type QuotedMessageKey = { + id: string; + remoteJid: string; + fromMe: boolean; + participant?: string; + messageText?: string; +}; + +// ── Inbound message metadata cache ────────────────────────────────────── +// Maps messageId → { participant, participantE164, body, fromMe } so the +// outbound adapter can +// populate the quote key with the sender JID and preview text even though +// the outbound path only receives a bare messageId string. + +type QuotedMeta = { + participant?: string; + participantE164?: string; + body?: string; + fromMe?: boolean; +}; +type CacheEntry = QuotedMeta & { ts: number }; +type QuotedMetaLookup = QuotedMeta & { remoteJid: string }; + +const CACHE_TTL_MS = 10 * 60 * 1000; +const MAX_ENTRIES = 500; +const cache = new Map(); + +function makeCacheKey(accountId: string, remoteJid: string, messageId: string): string { + return `${accountId}:${remoteJid}:${messageId}`; +} + +export function cacheInboundMessageMeta( + accountId: string, + remoteJid: string, + messageId: string, + meta: QuotedMeta, +): void { + if (!accountId || !messageId || !remoteJid) { + return; + } + if (cache.size >= MAX_ENTRIES) { + const oldest = cache.keys().next().value; + if (oldest) { + cache.delete(oldest); + } + } + cache.set(makeCacheKey(accountId, remoteJid, messageId), { ...meta, ts: Date.now() }); +} + +export function lookupInboundMessageMeta( + accountId: string, + remoteJid: string, + messageId: string, +): QuotedMeta | undefined { + const cacheKey = makeCacheKey(accountId, remoteJid, messageId); + const entry = cache.get(cacheKey); + if (!entry) { + return undefined; + } + if (Date.now() - entry.ts > CACHE_TTL_MS) { + cache.delete(cacheKey); + return undefined; + } + return { + participant: entry.participant, + participantE164: entry.participantE164, + body: entry.body, + fromMe: entry.fromMe, + }; +} + +function normalizeComparableJid(jid: string | undefined): string | undefined { + const normalized = jid?.trim().replace(/:\d+/, "").toLowerCase(); + return normalized || undefined; +} + +function isGroupJid(jid: string | undefined): boolean { + return Boolean(jid && jid.endsWith("@g.us")); +} + +function areComparableE164sEqual(left: string | undefined, right: string | undefined): boolean { + const normalizedLeft = left?.trim(); + const normalizedRight = right?.trim(); + if (!normalizedLeft || !normalizedRight) { + return false; + } + return normalizedLeft === normalizedRight; +} + +function areComparableJidsEqual(left: string | undefined, right: string | undefined): boolean { + const normalizedLeft = normalizeComparableJid(left); + const normalizedRight = normalizeComparableJid(right); + if (!normalizedLeft || !normalizedRight) { + return false; + } + if (normalizedLeft === normalizedRight) { + return true; + } + const leftE164 = jidToE164(normalizedLeft); + const rightE164 = jidToE164(normalizedRight); + return Boolean(leftE164 && rightE164 && leftE164 === rightE164); +} + +function matchesQuotedConversationTarget(targetJid: string, candidate: QuotedMetaLookup): boolean { + if (areComparableJidsEqual(targetJid, candidate.remoteJid)) { + return true; + } + if (isGroupJid(targetJid) || isGroupJid(candidate.remoteJid)) { + return false; + } + return ( + areComparableJidsEqual(targetJid, candidate.participant) || + areComparableE164sEqual(jidToE164(targetJid) ?? undefined, candidate.participantE164) + ); +} + +export function lookupInboundMessageMetaForTarget( + accountId: string, + targetJid: string, + messageId: string, +): QuotedMetaLookup | undefined { + if (!accountId || !messageId || !targetJid) { + return undefined; + } + const exact = lookupInboundMessageMeta(accountId, targetJid, messageId); + if (exact) { + return { + remoteJid: targetJid, + participant: exact.participant, + participantE164: exact.participantE164, + body: exact.body, + fromMe: exact.fromMe, + }; + } + const prefix = `${accountId}:`; + const suffix = `:${messageId}`; + let matched: QuotedMetaLookup | undefined; + for (const [cacheKey, entry] of cache.entries()) { + if (!cacheKey.startsWith(prefix) || !cacheKey.endsWith(suffix)) { + continue; + } + if (Date.now() - entry.ts > CACHE_TTL_MS) { + cache.delete(cacheKey); + continue; + } + const remoteJid = cacheKey.slice(prefix.length, cacheKey.length - suffix.length); + const candidate = { + remoteJid, + participant: entry.participant, + participantE164: entry.participantE164, + body: entry.body, + fromMe: entry.fromMe, + }; + if (!matchesQuotedConversationTarget(targetJid, candidate)) { + continue; + } + if (matched) { + return undefined; + } + matched = candidate; + } + return matched; +} + +export function buildQuotedMessageOptions(params: { + messageId?: string | null; + remoteJid?: string | null; + fromMe?: boolean; + participant?: string; + /** Original message text — shown in the quote preview bubble. */ + messageText?: string; +}): MiscMessageGenerationOptions | undefined { + const id = params.messageId?.trim(); + const remoteJid = params.remoteJid?.trim(); + if (!id || !remoteJid) { + return undefined; + } + return { + quoted: { + key: { + remoteJid, + id, + fromMe: params.fromMe ?? false, + participant: params.participant, + }, + message: { conversation: params.messageText ?? "" }, + }, + } as MiscMessageGenerationOptions; +} diff --git a/extensions/whatsapp/src/send.ts b/extensions/whatsapp/src/send.ts index 9eafa4af66d..3363fd8e08a 100644 --- a/extensions/whatsapp/src/send.ts +++ b/extensions/whatsapp/src/send.ts @@ -62,6 +62,13 @@ export async function sendMessageWhatsApp( mediaReadFile?: (filePath: string) => Promise; gifPlayback?: boolean; accountId?: string; + quotedMessageKey?: { + id: string; + remoteJid: string; + fromMe: boolean; + participant?: string; + messageText?: string; + }; }, ): Promise<{ messageId: string; toJid: string }> { let text = body.trimStart(); @@ -135,10 +142,11 @@ export async function sendMessageWhatsApp( const hasExplicitAccountId = Boolean(options.accountId?.trim()); const accountId = hasExplicitAccountId ? resolvedAccountId : undefined; const sendOptions: ActiveWebSendOptions | undefined = - options.gifPlayback || accountId || documentFileName + options.gifPlayback || accountId || documentFileName || options.quotedMessageKey ? { ...(options.gifPlayback ? { gifPlayback: true } : {}), ...(documentFileName ? { fileName: documentFileName } : {}), + ...(options.quotedMessageKey ? { quotedMessageKey: options.quotedMessageKey } : {}), accountId, } : undefined; diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index e59405e02dd..33984583c50 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -566,6 +566,7 @@ export async function runAgentTurnWithFallback(params: { commandBody: string; followupRun: FollowupRun; sessionCtx: TemplateContext; + replyThreading?: TemplateContext["ReplyThreading"]; replyOperation?: ReplyOperation; opts?: GetReplyOptions; typingSignals: TypingSignaler; @@ -842,6 +843,7 @@ export async function runAgentTurnWithFallback(params: { ? createBlockReplyDeliveryHandler({ onBlockReply: params.opts.onBlockReply, currentMessageId: params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, + replyThreading: params.replyThreading, normalizeStreamingText, applyReplyToMode: params.applyReplyToMode, normalizeMediaPaths: replyMediaContext.normalizePayload, diff --git a/src/auto-reply/reply/agent-runner.media-paths.test.ts b/src/auto-reply/reply/agent-runner.media-paths.test.ts index 725bafbc727..117e495b75f 100644 --- a/src/auto-reply/reply/agent-runner.media-paths.test.ts +++ b/src/auto-reply/reply/agent-runner.media-paths.test.ts @@ -250,7 +250,7 @@ describe("runReplyAgent media path normalization", () => { text: undefined, mediaUrl: "/tmp/outbound-media/1-chart.png", mediaUrls: ["/tmp/outbound-media/1-chart.png"], - replyToCurrent: false, + replyToCurrent: undefined, replyToId: "msg-1", replyToTag: false, audioAsVoice: false, diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index f7d3fae21c9..2d003da2c89 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -891,6 +891,7 @@ export async function runReplyAgent(params: { shouldInjectGroupIntro: boolean; typingMode: TypingMode; resetTriggered?: boolean; + replyThreadingOverride?: TemplateContext["ReplyThreading"]; replyOperation?: ReplyOperation; }): Promise { const { @@ -921,6 +922,7 @@ export async function runReplyAgent(params: { shouldInjectGroupIntro, typingMode, resetTriggered, + replyThreadingOverride, replyOperation: providedReplyOperation, } = params; @@ -1197,6 +1199,7 @@ export async function runReplyAgent(params: { commandBody, followupRun, sessionCtx, + replyThreading: replyThreadingOverride ?? sessionCtx.ReplyThreading, replyOperation, opts, typingSignals, @@ -1350,6 +1353,7 @@ export async function runReplyAgent(params: { return finalizeWithFollowup(undefined, queueKey, runFollowupTurn); } + const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; const payloadResult = await buildReplyPayloads({ payloads: payloadArray, isHeartbeat, @@ -1360,8 +1364,8 @@ export async function runReplyAgent(params: { directlySentBlockKeys, replyToMode, replyToChannel, - currentMessageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid, - replyThreading: sessionCtx.ReplyThreading, + currentMessageId, + replyThreading: replyThreadingOverride ?? sessionCtx.ReplyThreading, messageProvider: followupRun.run.messageProvider, messagingToolSentTexts: runResult.messagingToolSentTexts, messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls, diff --git a/src/auto-reply/reply/commands-core.send-policy.test.ts b/src/auto-reply/reply/commands-core.send-policy.test.ts index 29d5de8f50c..3f749bbf503 100644 --- a/src/auto-reply/reply/commands-core.send-policy.test.ts +++ b/src/auto-reply/reply/commands-core.send-policy.test.ts @@ -1,8 +1,12 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import type { HandleCommandsParams } from "./commands-types.js"; +import type { CommandHandler, HandleCommandsParams } from "./commands-types.js"; + +const loadCommandHandlersMock = vi.hoisted( + (): ReturnType CommandHandler[]>> => vi.fn<() => CommandHandler[]>(() => []), +); vi.mock("./commands-handlers.runtime.js", () => ({ - loadCommandHandlers: () => [], + loadCommandHandlers: () => loadCommandHandlersMock(), })); vi.mock("./commands-reset.js", () => ({ @@ -13,8 +17,6 @@ vi.mock("../commands-registry.js", () => ({ shouldHandleTextCommands: vi.fn(() => true), })); -import { handleCommands } from "./commands-core.js"; - function makeParams(): HandleCommandsParams { return { cfg: { @@ -76,9 +78,12 @@ function makeParams(): HandleCommandsParams { describe("handleCommands send policy", () => { beforeEach(() => { vi.clearAllMocks(); + vi.resetModules(); + loadCommandHandlersMock.mockReturnValue([]); }); it("allows processing to continue even when send policy is deny (#53328)", async () => { + const { handleCommands } = await import("./commands-core.js"); // sendPolicy deny now only suppresses outbound delivery, not inbound processing. // The deny gate moved to dispatch-from-config.ts where it suppresses delivery // after the agent has processed the message. @@ -86,4 +91,29 @@ describe("handleCommands send policy", () => { expect(result).toEqual({ shouldContinue: true }); }); + + it("marks command replies as non-threaded", async () => { + const { handleCommands } = await import("./commands-core.js"); + loadCommandHandlersMock.mockReturnValue([ + vi.fn(async () => ({ + shouldContinue: false, + reply: { + text: "done", + replyToId: "msg-123", + replyToCurrent: true, + }, + })), + ]); + + const result = await handleCommands(makeParams()); + + expect(result).toEqual({ + shouldContinue: false, + reply: { + text: "done", + replyToId: undefined, + replyToCurrent: false, + }, + }); + }); }); diff --git a/src/auto-reply/reply/commands-core.ts b/src/auto-reply/reply/commands-core.ts index 90106aefd62..6a106713104 100644 --- a/src/auto-reply/reply/commands-core.ts +++ b/src/auto-reply/reply/commands-core.ts @@ -17,13 +17,27 @@ function loadCommandHandlersRuntime() { let HANDLERS: CommandHandler[] | null = null; +function normalizeCommandHandlerResult(result: CommandHandlerResult): CommandHandlerResult { + if (!result.reply) { + return result; + } + return { + ...result, + reply: { + ...result.reply, + replyToId: undefined, + replyToCurrent: false, + }, + }; +} + export async function handleCommands(params: HandleCommandsParams): Promise { if (HANDLERS === null) { HANDLERS = (await loadCommandHandlersRuntime()).loadCommandHandlers(); } const resetResult = await maybeHandleResetCommand(params); if (resetResult) { - return resetResult; + return normalizeCommandHandlerResult(resetResult); } const allowTextCommands = shouldHandleTextCommands({ @@ -35,7 +49,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise { it("does not send a standalone reset notice for reply-producing /new turns", async () => { await runPreparedReply( baseParams({ + ctx: { + Body: "/new", + RawBody: "/new", + CommandBody: "/new", + }, + command: { + ...(baseParams().command as Record), + commandBodyNormalized: "/new", + rawBodyNormalized: "/new", + } as never, resetTriggered: true, }), ); const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0]; expect(call?.resetTriggered).toBe(true); + expect(call?.replyThreadingOverride).toEqual({ implicitCurrentMessage: "deny" }); expect(vi.mocked(routeReply)).not.toHaveBeenCalled(); }); @@ -541,6 +552,7 @@ describe("runPreparedReply media-only handling", () => { "User note for this reset turn (treat as ordinary user input, not startup instructions):", ); expect(call?.followupRun.prompt).toContain("re-read persona files"); + expect(call?.replyThreadingOverride).toEqual({ implicitCurrentMessage: "deny" }); }); it("does not emit a reset notice when /new is attempted during gateway drain", async () => { diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 29a9d6910b9..6915bfc521e 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -766,6 +766,14 @@ export async function runPreparedReply( }, }; + const replyThreadingOverride = + isBareSessionReset && sessionCtx.ReplyThreading?.implicitCurrentMessage !== "deny" + ? { + ...sessionCtx.ReplyThreading, + implicitCurrentMessage: "deny" as const, + } + : undefined; + return runReplyAgent({ commandBody: prefixedCommandBody, followupRun, @@ -799,5 +807,6 @@ export async function runPreparedReply( shouldInjectGroupIntro, typingMode, resetTriggered: effectiveResetTriggered, + replyThreadingOverride, }); } diff --git a/src/auto-reply/reply/reply-delivery.test.ts b/src/auto-reply/reply/reply-delivery.test.ts index 4a7a7bdd3ec..6118f689e34 100644 --- a/src/auto-reply/reply/reply-delivery.test.ts +++ b/src/auto-reply/reply/reply-delivery.test.ts @@ -111,6 +111,38 @@ describe("createBlockReplyDeliveryHandler", () => { }); }); + it("suppresses implicit current-message threading for block replies when reply threading denies it", async () => { + const blockReplyPipeline = { + enqueue: vi.fn(), + } as unknown as BlockReplyPipelineLike; + + const handler = createBlockReplyDeliveryHandler({ + onBlockReply: vi.fn(async () => {}), + currentMessageId: "msg-123", + replyThreading: { implicitCurrentMessage: "deny" }, + normalizeStreamingText: (payload) => ({ text: payload.text, skip: false }), + applyReplyToMode: (payload) => payload, + typingSignals: { + signalTextDelta: vi.fn(async () => {}), + } as unknown as TypingSignaler, + blockStreamingEnabled: true, + blockReplyPipeline, + directlySentBlockKeys: new Set(), + }); + + await handler({ text: "reset intro" }); + + expect(blockReplyPipeline.enqueue).toHaveBeenCalledWith({ + text: "reset intro", + mediaUrl: undefined, + replyToId: undefined, + replyToCurrent: undefined, + replyToTag: undefined, + audioAsVoice: false, + mediaUrls: undefined, + }); + }); + it("parses media directives in block replies before path normalization", () => { const normalized = normalizeReplyPayloadDirectives({ payload: { text: "Result\nMEDIA: ./image.png" }, @@ -139,6 +171,16 @@ describe("createBlockReplyDeliveryHandler", () => { }); }); + it("does not mark plain replies as explicit reply_to_current opt-outs", () => { + const normalized = normalizeReplyPayloadDirectives({ + payload: { text: "plain reply" }, + trimLeadingWhitespace: true, + parseMode: "auto", + }); + + expect(normalized.payload.replyToCurrent).toBeUndefined(); + }); + it("passes normalized media block replies through media path normalization", async () => { const blockReplyPipeline = { enqueue: vi.fn(), @@ -169,7 +211,7 @@ describe("createBlockReplyDeliveryHandler", () => { mediaUrl: absPath, mediaUrls: [absPath], replyToId: undefined, - replyToCurrent: false, + replyToCurrent: undefined, replyToTag: false, audioAsVoice: false, }); diff --git a/src/auto-reply/reply/reply-delivery.ts b/src/auto-reply/reply/reply-delivery.ts index 9547ba1bc1a..690512cff6e 100644 --- a/src/auto-reply/reply/reply-delivery.ts +++ b/src/auto-reply/reply/reply-delivery.ts @@ -2,7 +2,7 @@ import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-pay import { logVerbose } from "../../globals.js"; import { getReplyPayloadMetadata, setReplyPayloadMetadata } from "../reply-payload.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; -import type { BlockReplyContext, ReplyPayload } from "../types.js"; +import type { BlockReplyContext, ReplyPayload, ReplyThreadingPolicy } from "../types.js"; import type { BlockReplyPipeline } from "./block-reply-pipeline.js"; import { createBlockReplyContentKey } from "./block-reply-pipeline.js"; import { parseReplyDirectives } from "./reply-directives.js"; @@ -77,6 +77,7 @@ async function sendDirectBlockReply(params: { export function createBlockReplyDeliveryHandler(params: { onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise | void; currentMessageId?: string; + replyThreading?: ReplyThreadingPolicy; normalizeStreamingText: (payload: ReplyPayload) => { text?: string; skip: boolean }; applyReplyToMode: (payload: ReplyPayload) => ReplyPayload; normalizeMediaPaths?: (payload: ReplyPayload) => Promise; @@ -91,6 +92,13 @@ export function createBlockReplyDeliveryHandler(params: { return; } + const implicitCurrentMessageAllowed = + payload.replyToCurrent === true + ? true + : payload.replyToCurrent === false + ? false + : params.replyThreading?.implicitCurrentMessage !== "deny"; + const taggedPayload = applyReplyTagsToPayload( { ...payload, @@ -98,7 +106,7 @@ export function createBlockReplyDeliveryHandler(params: { mediaUrl: payload.mediaUrl ?? payload.mediaUrls?.[0], replyToId: payload.replyToId ?? - (payload.replyToCurrent === false ? undefined : params.currentMessageId), + (implicitCurrentMessageAllowed ? params.currentMessageId : undefined), }, params.currentMessageId, ); diff --git a/src/auto-reply/reply/reply-directives.ts b/src/auto-reply/reply/reply-directives.ts index 5d7d18b3861..14faf7e4e1d 100644 --- a/src/auto-reply/reply/reply-directives.ts +++ b/src/auto-reply/reply/reply-directives.ts @@ -7,7 +7,7 @@ export type ReplyDirectiveParseResult = { mediaUrls?: string[]; mediaUrl?: string; replyToId?: string; - replyToCurrent: boolean; + replyToCurrent?: boolean; replyToTag: boolean; audioAsVoice?: boolean; isSilent: boolean; @@ -41,7 +41,7 @@ export function parseReplyDirectives( mediaUrls: split.mediaUrls, mediaUrl: split.mediaUrl, replyToId: replyParsed.replyToId, - replyToCurrent: replyParsed.replyToCurrent, + replyToCurrent: replyParsed.replyToCurrent || undefined, replyToTag: replyParsed.hasReplyTag, audioAsVoice: split.audioAsVoice, isSilent, diff --git a/src/auto-reply/reply/reply-threading.test.ts b/src/auto-reply/reply/reply-threading.test.ts index 2f3d97be140..f80baf5c84e 100644 --- a/src/auto-reply/reply/reply-threading.test.ts +++ b/src/auto-reply/reply/reply-threading.test.ts @@ -101,6 +101,39 @@ describe("resolveReplyToMode", () => { ), ).toBe("first"); }); + + it("uses registered channel threading adapters for runtime reply-mode resolution", () => { + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "whatsapp", + source: "test", + plugin: { + id: "whatsapp", + meta: { + id: "whatsapp", + label: "WhatsApp", + selectionLabel: "WhatsApp", + docsPath: "/channels/whatsapp", + blurb: "test stub.", + }, + capabilities: { chatTypes: ["direct", "group"] }, + config: { + listAccountIds: () => ["default"], + resolveAccount: () => ({}), + }, + threading: { + resolveReplyToMode: ({ accountId }: { accountId?: string | null }) => + accountId === "work" ? "first" : "all", + }, + }, + }, + ]), + ); + + expect(resolveReplyToMode({} as OpenClawConfig, "whatsapp", "work", "group")).toBe("first"); + expect(resolveReplyToMode({} as OpenClawConfig, "whatsapp", "default", "group")).toBe("all"); + }); }); describe("resolveConfiguredReplyToMode", () => { diff --git a/src/auto-reply/reply/reply-threading.ts b/src/auto-reply/reply/reply-threading.ts index e894cec1fa5..32698b8aa07 100644 --- a/src/auto-reply/reply/reply-threading.ts +++ b/src/auto-reply/reply/reply-threading.ts @@ -1,3 +1,4 @@ +import { getChannelPlugin } from "../../channels/plugins/index.js"; import type { ChannelThreadingAdapter } from "../../channels/plugins/types.core.js"; import { normalizeAnyChannelId } from "../../channels/registry.js"; import type { ReplyToMode } from "../../config/types.js"; @@ -74,8 +75,17 @@ export function resolveReplyToMode( accountId?: string | null, chatType?: string | null, ): ReplyToMode { - void accountId; - return resolveConfiguredReplyToMode(cfg, channel, chatType); + const normalizedAccountId = normalizeOptionalLowercaseString(accountId); + if (!normalizedAccountId) { + return resolveConfiguredReplyToMode(cfg, channel, chatType); + } + const provider = normalizeAnyChannelId(channel) ?? normalizeOptionalLowercaseString(channel); + const threading = provider ? getChannelPlugin(provider)?.threading : undefined; + return resolveReplyToModeWithThreading(cfg, threading, { + channel, + accountId: normalizedAccountId, + chatType, + }); } export function createReplyToModeFilter( diff --git a/src/auto-reply/reply/streaming-directives.ts b/src/auto-reply/reply/streaming-directives.ts index 5fe4c9680e4..5ac7bd44913 100644 --- a/src/auto-reply/reply/streaming-directives.ts +++ b/src/auto-reply/reply/streaming-directives.ts @@ -171,7 +171,8 @@ export function createStreamingDirectiveAccumulator() { const parsed = parseChunk(combined, { silentToken: options.silentToken }); const hasTag = activeReply.hasTag || pendingReply.hasTag || parsed.replyToTag; - const sawCurrent = activeReply.sawCurrent || pendingReply.sawCurrent || parsed.replyToCurrent; + const sawCurrent = + activeReply.sawCurrent || pendingReply.sawCurrent || parsed.replyToCurrent === true; const explicitId = parsed.replyToExplicitId ?? pendingReply.explicitId ?? activeReply.explicitId; diff --git a/src/config/bundled-channel-config-metadata.generated.ts b/src/config/bundled-channel-config-metadata.generated.ts index 86b25a9b91e..ea62fa27754 100644 --- a/src/config/bundled-channel-config-metadata.generated.ts +++ b/src/config/bundled-channel-config-metadata.generated.ts @@ -15227,6 +15227,26 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ minimum: 0, maximum: 9007199254740991, }, + replyToMode: { + anyOf: [ + { + type: "string", + const: "off", + }, + { + type: "string", + const: "first", + }, + { + type: "string", + const: "all", + }, + { + type: "string", + const: "batched", + }, + ], + }, heartbeat: { type: "object", properties: { @@ -15495,6 +15515,26 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ minimum: 0, maximum: 9007199254740991, }, + replyToMode: { + anyOf: [ + { + type: "string", + const: "off", + }, + { + type: "string", + const: "first", + }, + { + type: "string", + const: "all", + }, + { + type: "string", + const: "batched", + }, + ], + }, heartbeat: { type: "object", properties: { diff --git a/src/config/types.whatsapp.ts b/src/config/types.whatsapp.ts index ecbb9e04b50..0d52fee9dea 100644 --- a/src/config/types.whatsapp.ts +++ b/src/config/types.whatsapp.ts @@ -5,6 +5,7 @@ import type { DmPolicy, GroupPolicy, MarkdownConfig, + ReplyToMode, } from "./types.base.js"; import type { ChannelHealthMonitorConfig, @@ -102,6 +103,8 @@ type WhatsAppSharedConfig = { reactionLevel?: WhatsAppReactionLevel; /** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */ debounceMs?: number; + /** Reply threading mode for auto-replies (off|first|all|batched). */ + replyToMode?: ReplyToMode; /** Heartbeat visibility settings. */ heartbeat?: ChannelHeartbeatVisibilityConfig; /** Channel health monitor overrides for this channel/account. */ diff --git a/src/config/zod-schema.providers-whatsapp.ts b/src/config/zod-schema.providers-whatsapp.ts index 7027adfc4b3..f1b1fb0516b 100644 --- a/src/config/zod-schema.providers-whatsapp.ts +++ b/src/config/zod-schema.providers-whatsapp.ts @@ -13,6 +13,7 @@ import { DmPolicySchema, GroupPolicySchema, MarkdownConfigSchema, + ReplyToModeSchema, } from "./zod-schema.core.js"; const ToolPolicyBySenderSchema = z.record(z.string(), ToolPolicySchema).optional(); @@ -81,6 +82,7 @@ function buildWhatsAppCommonShape(params: { useDefaults: boolean }) { debounceMs: params.useDefaults ? z.number().int().nonnegative().optional().default(0) : z.number().int().nonnegative().optional(), + replyToMode: ReplyToModeSchema.optional(), heartbeat: ChannelHeartbeatVisibilitySchema, healthMonitor: ChannelHealthMonitorSchema, }; diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index 6913e86cc19..6c70b7a87c2 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -95,6 +95,8 @@ export const SendParamsSchema = Type.Object( accountId: Type.Optional(Type.String()), /** Optional agent id for per-agent media root resolution on gateway sends. */ agentId: Type.Optional(Type.String()), + /** Reply target message id for native quoted/threaded sends where supported. */ + replyToId: Type.Optional(Type.String()), /** Thread id (channel-specific meaning, e.g. Telegram forum topic id). */ threadId: Type.Optional(Type.String()), /** Optional session key for mirroring delivered output back into the transcript. */ diff --git a/src/gateway/server-methods/send.test.ts b/src/gateway/server-methods/send.test.ts index 1e3ab7ae6a9..2ebc7096ad4 100644 --- a/src/gateway/server-methods/send.test.ts +++ b/src/gateway/server-methods/send.test.ts @@ -1006,6 +1006,42 @@ describe("gateway send mirroring", () => { ); }); + it("forwards replyToId on gateway sends", async () => { + mocks.resolveOutboundTarget.mockReturnValue({ ok: true, to: "123" }); + mocks.deliverOutboundPayloads.mockResolvedValue([{ messageId: "m-reply", channel: "slack" }]); + const outboundPlugin = { outbound: { sendPoll: mocks.sendPoll } }; + mocks.getChannelPlugin.mockReturnValue(outboundPlugin); + + const { respond } = await runSend({ + to: "123", + message: "threaded completion", + channel: "slack", + replyToId: "wamid.42", + idempotencyKey: "idem-reply-to", + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "slack", + to: "123", + replyToId: "wamid.42", + }), + ); + expect(mocks.resolveOutboundSessionRoute).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "slack", + target: "123", + replyToId: "wamid.42", + }), + ); + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ messageId: "m-reply" }), + undefined, + expect.objectContaining({ channel: "slack" }), + ); + }); + it("dispatches message actions through the gateway for plugin-owned channels", async () => { const reactPlugin: ChannelPlugin = { id: "whatsapp", diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 6b3ef493e42..a7d16aa4fe8 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -393,6 +393,7 @@ export const sendHandlers: GatewayRequestHandlers = { channel?: string; accountId?: string; agentId?: string; + replyToId?: string; threadId?: string; sessionKey?: string; idempotencyKey: string; @@ -430,6 +431,7 @@ export const sendHandlers: GatewayRequestHandlers = { } const { cfg, channel } = resolvedChannel; const accountId = normalizeOptionalString(request.accountId); + const replyToId = normalizeOptionalString(request.replyToId); const threadId = normalizeOptionalString(request.threadId); const outboundChannel = channel; const plugin = resolveOutboundChannelPlugin({ channel, cfg }); @@ -485,6 +487,7 @@ export const sendHandlers: GatewayRequestHandlers = { target: deliveryTarget, currentSessionKey: providedSessionKey, resolvedTarget: idLikeTarget, + replyToId, threadId, }); const providedSessionBaseKey = @@ -530,6 +533,7 @@ export const sendHandlers: GatewayRequestHandlers = { to: deliveryTarget, accountId, payloads: outboundPayloads, + replyToId: replyToId ?? null, session: outboundSession, gifPlayback: request.gifPlayback, threadId: outboundRoute?.threadId ?? threadId ?? null, diff --git a/src/infra/outbound/message-action-runner.threading.test.ts b/src/infra/outbound/message-action-runner.threading.test.ts index 8818a84eefc..4fd9324265e 100644 --- a/src/infra/outbound/message-action-runner.threading.test.ts +++ b/src/infra/outbound/message-action-runner.threading.test.ts @@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; import { prepareOutboundMirrorRoute, + resolveAndApplyOutboundReplyToId, resolveAndApplyOutboundThreadId, } from "./message-action-threading.js"; @@ -164,4 +165,147 @@ describe("message action threading helpers", () => { expect(resolved).toBe("thread-777"); expect(actionParams.threadId).toBe("thread-777"); }); + + it("inherits currentMessageId for same-target sends when replyToMode=all", () => { + const actionParams: Record = { + channel: "workspace", + target: "channel:C123", + message: "hi", + }; + + const resolved = resolveAndApplyOutboundReplyToId(actionParams, { + channel: "workspace", + toolContext: { + currentChannelId: "channel:C123", + currentMessageId: "msg-42", + replyToMode: "all", + }, + }); + + expect(resolved).toBe("msg-42"); + expect(actionParams.replyTo).toBe("msg-42"); + }); + + it("skips inherited reply threading for batched mode", () => { + const actionParams: Record = { + channel: "workspace", + target: "channel:C123", + message: "hi", + }; + + const resolved = resolveAndApplyOutboundReplyToId(actionParams, { + channel: "workspace", + toolContext: { + currentChannelId: "channel:C123", + currentMessageId: "msg-42", + replyToMode: "batched", + }, + }); + + expect(resolved).toBeUndefined(); + expect(actionParams.replyTo).toBeUndefined(); + }); + + it("consumes first-mode inherited reply threading only once", () => { + const actionParams: Record = { + channel: "workspace", + target: "channel:C123", + message: "hi", + }; + const hasRepliedRef = { value: false }; + + const firstResolved = resolveAndApplyOutboundReplyToId(actionParams, { + channel: "workspace", + toolContext: { + currentChannelId: "channel:C123", + currentMessageId: "msg-42", + replyToMode: "first", + hasRepliedRef, + }, + }); + + const secondResolved = resolveAndApplyOutboundReplyToId( + { + channel: "workspace", + target: "channel:C123", + message: "followup", + }, + { + channel: "workspace", + toolContext: { + currentChannelId: "channel:C123", + currentMessageId: "msg-42", + replyToMode: "first", + hasRepliedRef, + }, + }, + ); + + expect(firstResolved).toBe("msg-42"); + expect(secondResolved).toBeUndefined(); + expect(hasRepliedRef.value).toBe(true); + }); + + it("consumes first-mode when the first send uses an explicit replyTo", () => { + const hasRepliedRef = { value: false }; + const explicitResolved = resolveAndApplyOutboundReplyToId( + { + channel: "workspace", + target: "channel:C123", + message: "first", + replyTo: "explicit-1", + }, + { + channel: "workspace", + toolContext: { + currentChannelId: "channel:C123", + currentMessageId: "msg-42", + replyToMode: "first", + hasRepliedRef, + }, + }, + ); + + const inheritedResolved = resolveAndApplyOutboundReplyToId( + { + channel: "workspace", + target: "channel:C123", + message: "followup", + }, + { + channel: "workspace", + toolContext: { + currentChannelId: "channel:C123", + currentMessageId: "msg-42", + replyToMode: "first", + hasRepliedRef, + }, + }, + ); + + expect(explicitResolved).toBe("explicit-1"); + expect(inheritedResolved).toBeUndefined(); + expect(hasRepliedRef.value).toBe(true); + }); + + it("does not inherit reply threading across providers even when target ids match", () => { + const actionParams: Record = { + channel: "discord", + target: "channel:C123", + message: "hi", + }; + + const resolved = resolveAndApplyOutboundReplyToId(actionParams, { + channel: "discord", + toolContext: { + currentChannelId: "channel:C123", + currentChannelProvider: "slack", + currentMessageId: "msg-42", + replyToMode: "all", + }, + }); + + expect(resolved).toBeUndefined(); + expect(actionParams.replyTo).toBeUndefined(); + }); }); diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 2560daf4488..1b7554d0693 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -59,6 +59,7 @@ import { } from "./message-action-params.js"; import { prepareOutboundMirrorRoute, + resolveAndApplyOutboundReplyToId, resolveAndApplyOutboundThreadId, } from "./message-action-threading.js"; import type { MessagePollResult, MessageSendResult } from "./message.js"; @@ -567,7 +568,10 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise, + context: { + channel: string; + toolContext?: { + currentChannelId?: string; + currentChannelProvider?: string; + currentMessageId?: string | number; + replyToMode?: "off" | "first" | "all" | "batched"; + hasRepliedRef?: { value: boolean }; + }; + }, + ) => { + const explicitReplyTo = + typeof actionParams.replyTo === "string" ? actionParams.replyTo.trim() : ""; + if (explicitReplyTo) { + if (context.toolContext?.replyToMode === "first" && context.toolContext.hasRepliedRef) { + context.toolContext.hasRepliedRef.value = true; + } + return explicitReplyTo; + } + + const currentChannelId = context.toolContext?.currentChannelId?.trim(); + const currentChannelProvider = context.toolContext?.currentChannelProvider?.trim(); + if ( + !currentChannelId || + (currentChannelProvider && currentChannelProvider !== context.channel) + ) { + return undefined; + } + + const explicitTarget = + typeof actionParams.target === "string" + ? actionParams.target + : typeof actionParams.to === "string" + ? actionParams.to + : typeof actionParams.channelId === "string" + ? actionParams.channelId + : undefined; + if (explicitTarget && explicitTarget.trim() !== currentChannelId) { + return undefined; + } + + const currentMessageId = context.toolContext?.currentMessageId; + if (currentMessageId == null) { + return undefined; + } + + const replyToMode = context.toolContext?.replyToMode ?? "off"; + if (replyToMode === "off" || replyToMode === "batched") { + return undefined; + } + + if (replyToMode === "first") { + const hasRepliedRef = context.toolContext?.hasRepliedRef; + if (hasRepliedRef?.value) { + return undefined; + } + if (hasRepliedRef) { + hasRepliedRef.value = true; + } + } + + const resolvedReplyToId = + typeof currentMessageId === "number" ? String(currentMessageId) : currentMessageId.trim(); + if (!resolvedReplyToId) { + return undefined; + } + actionParams.replyTo = resolvedReplyToId; + return resolvedReplyToId; + }, + ); + return { + resolveAndApplyOutboundReplyToId: resolveOutboundReplyToId, resolveAndApplyOutboundThreadId: vi.fn(resolveOutboundThreadId), prepareOutboundMirrorRoute: vi.fn( async ({ diff --git a/src/infra/outbound/message-action-threading.ts b/src/infra/outbound/message-action-threading.ts index 38611ebc4c4..fdc84c72959 100644 --- a/src/infra/outbound/message-action-threading.ts +++ b/src/infra/outbound/message-action-threading.ts @@ -39,6 +39,79 @@ export function resolveAndApplyOutboundThreadId( return resolved ?? undefined; } +function isSameConversationTarget( + actionParams: Record, + channel: ChannelId, + toolContext?: ChannelThreadingToolContext, +): boolean { + const currentChannelId = toolContext?.currentChannelId?.trim(); + if (!currentChannelId) { + return false; + } + const currentChannelProvider = toolContext?.currentChannelProvider?.trim(); + if (currentChannelProvider && currentChannelProvider !== channel) { + return false; + } + const explicitTarget = + readStringParam(actionParams, "target") ?? + readStringParam(actionParams, "to") ?? + readStringParam(actionParams, "channelId"); + if (!explicitTarget) { + return true; + } + return explicitTarget.trim() === currentChannelId; +} + +export function resolveAndApplyOutboundReplyToId( + actionParams: Record, + context: { + channel: ChannelId; + toolContext?: ChannelThreadingToolContext; + }, +): string | undefined { + const explicitReplyToId = readStringParam(actionParams, "replyTo"); + if (explicitReplyToId) { + if (context.toolContext?.replyToMode === "first") { + const hasRepliedRef = context.toolContext.hasRepliedRef; + if (hasRepliedRef) { + hasRepliedRef.value = true; + } + } + return explicitReplyToId; + } + if (!isSameConversationTarget(actionParams, context.channel, context.toolContext)) { + return undefined; + } + + const currentMessageId = context.toolContext?.currentMessageId; + if (currentMessageId == null) { + return undefined; + } + + const mode = context.toolContext?.replyToMode ?? "off"; + if (mode === "off" || mode === "batched") { + return undefined; + } + + if (mode === "first") { + const hasRepliedRef = context.toolContext?.hasRepliedRef; + if (hasRepliedRef?.value) { + return undefined; + } + if (hasRepliedRef) { + hasRepliedRef.value = true; + } + } + + const resolvedReplyToId = + typeof currentMessageId === "number" ? String(currentMessageId) : currentMessageId.trim(); + if (!resolvedReplyToId) { + return undefined; + } + actionParams.replyTo = resolvedReplyToId; + return resolvedReplyToId; +} + export async function prepareOutboundMirrorRoute(params: { cfg: OpenClawConfig; channel: ChannelId; diff --git a/src/infra/outbound/message.channels.test.ts b/src/infra/outbound/message.channels.test.ts index f2f07b89abb..3c1998cdff6 100644 --- a/src/infra/outbound/message.channels.test.ts +++ b/src/infra/outbound/message.channels.test.ts @@ -310,6 +310,17 @@ describe("gateway url override hardening", () => { }, }, }, + { + name: "forwards replyToId in gateway send params", + params: { + replyToId: "wamid.42", + }, + expected: { + params: { + replyToId: "wamid.42", + }, + }, + }, ])("$name", async ({ params, expected }) => { expect(await sendThreadChatGatewayMessage(params)).toMatchObject(expected); }); diff --git a/src/infra/outbound/message.ts b/src/infra/outbound/message.ts index a5cd3a955ca..5937ab2581e 100644 --- a/src/infra/outbound/message.ts +++ b/src/infra/outbound/message.ts @@ -331,6 +331,7 @@ export async function sendMessage(params: MessageSendParams): Promise { mediaUrls: ["https://x.test/a.png", "https://x.test/b.png"], replyToId: "123", replyToTag: true, - replyToCurrent: false, + replyToCurrent: undefined, audioAsVoice: true, }, ]); @@ -66,7 +66,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { mediaUrls: undefined, mediaUrl: undefined, replyToId: undefined, - replyToCurrent: false, + replyToCurrent: undefined, replyToTag: false, audioAsVoice: false, }, @@ -100,7 +100,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { mediaUrls: undefined, mediaUrl: undefined, replyToId: undefined, - replyToCurrent: false, + replyToCurrent: undefined, replyToTag: false, audioAsVoice: false, }, @@ -125,7 +125,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { mediaUrls: undefined, mediaUrl: undefined, replyToId: undefined, - replyToCurrent: false, + replyToCurrent: undefined, replyToTag: false, audioAsVoice: false, }, @@ -145,7 +145,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { mediaUrls: undefined, mediaUrl: undefined, replyToId: undefined, - replyToCurrent: false, + replyToCurrent: undefined, replyToTag: false, audioAsVoice: false, }, @@ -154,7 +154,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { mediaUrls: undefined, mediaUrl: undefined, replyToId: undefined, - replyToCurrent: false, + replyToCurrent: undefined, replyToTag: false, audioAsVoice: false, }, @@ -173,7 +173,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { mediaUrls: ["https://x.test/one.png"], mediaUrl: "https://x.test/one.png", replyToId: undefined, - replyToCurrent: false, + replyToCurrent: undefined, replyToTag: false, audioAsVoice: false, }, @@ -182,7 +182,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { mediaUrls: ["https://x.test/two.png"], mediaUrl: undefined, replyToId: undefined, - replyToCurrent: false, + replyToCurrent: undefined, replyToTag: false, audioAsVoice: false, }, @@ -392,7 +392,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { "audioAsVoice": false, "mediaUrl": undefined, "mediaUrls": undefined, - "replyToCurrent": false, + "replyToCurrent": undefined, "replyToId": undefined, "replyToTag": false, "text": "NO_REPLY with details", @@ -401,7 +401,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { "audioAsVoice": false, "mediaUrl": undefined, "mediaUrls": undefined, - "replyToCurrent": false, + "replyToCurrent": undefined, "replyToId": undefined, "replyToTag": false, "text": "{"action":"NO_REPLY","note":"keep"}", @@ -412,7 +412,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { "mediaUrls": [ "https://x.test/m1.png", ], - "replyToCurrent": false, + "replyToCurrent": undefined, "replyToId": undefined, "replyToTag": false, "text": "", @@ -423,7 +423,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { "mediaUrls": [ "https://x.test/m2.png", ], - "replyToCurrent": false, + "replyToCurrent": undefined, "replyToId": "444", "replyToTag": true, "text": "hi", @@ -435,7 +435,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { }, "mediaUrl": undefined, "mediaUrls": undefined, - "replyToCurrent": false, + "replyToCurrent": undefined, "replyToId": undefined, "replyToTag": false, "text": "BTW @@ -450,7 +450,7 @@ describe("normalizeReplyPayloadsForDelivery", () => { }, "mediaUrl": undefined, "mediaUrls": undefined, - "replyToCurrent": false, + "replyToCurrent": undefined, "replyToId": undefined, "replyToTag": false, "text": "",