feat(whatsapp): adopt replyToMode quoting (#62305)

* fix(core): align auto-reply threading behavior

* fix(core): propagate reply threading through outbound and gateway

* fix(whatsapp): use cached metadata for native quoted replies

* feat(whatsapp): add configurable native reply quoting
This commit is contained in:
Marcus Castro
2026-04-23 01:19:47 -03:00
committed by GitHub
parent 728c644e4b
commit f5f0235bb1
45 changed files with 1565 additions and 80 deletions

View File

@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
### Changes
- WhatsApp: add configurable native reply quoting with replyToMode for WhatsApp conversations. Thanks @mcaxtr.
- Providers/Amazon Bedrock Mantle: add Claude Opus 4.7 through Mantle's Anthropic Messages route with provider-owned bearer-auth streaming, so the model is actually callable without treating AWS bearer tokens like Anthropic API keys. Thanks @wirjo.
- Providers/OpenAI Codex: remove the Codex CLI auth import path from onboarding and provider discovery so OpenClaw no longer copies `~/.codex` OAuth material into agent auth stores; use browser login or device pairing instead. (#70390) Thanks @pashpashpash.
- Providers/OpenAI-compatible: mark known local backends such as vLLM, SGLang, llama.cpp, LM Studio, LocalAI, Jan, TabbyAPI, and text-generation-webui as streaming-usage compatible, so their token accounting no longer degrades to unknown/stale totals. (#68711) Thanks @gaineyllc.

View File

@@ -464,6 +464,7 @@ public struct SendParams: Codable, Sendable {
public let channel: String?
public let accountid: String?
public let agentid: String?
public let replytoid: String?
public let threadid: String?
public let sessionkey: String?
public let idempotencykey: String
@@ -477,6 +478,7 @@ public struct SendParams: Codable, Sendable {
channel: String?,
accountid: String?,
agentid: String?,
replytoid: String?,
threadid: String?,
sessionkey: String?,
idempotencykey: String)
@@ -489,6 +491,7 @@ public struct SendParams: Codable, Sendable {
self.channel = channel
self.accountid = accountid
self.agentid = agentid
self.replytoid = replytoid
self.threadid = threadid
self.sessionkey = sessionkey
self.idempotencykey = idempotencykey
@@ -503,6 +506,7 @@ public struct SendParams: Codable, Sendable {
case channel
case accountid = "accountId"
case agentid = "agentId"
case replytoid = "replyToId"
case threadid = "threadId"
case sessionkey = "sessionKey"
case idempotencykey = "idempotencyKey"

View File

@@ -464,6 +464,7 @@ public struct SendParams: Codable, Sendable {
public let channel: String?
public let accountid: String?
public let agentid: String?
public let replytoid: String?
public let threadid: String?
public let sessionkey: String?
public let idempotencykey: String
@@ -477,6 +478,7 @@ public struct SendParams: Codable, Sendable {
channel: String?,
accountid: String?,
agentid: String?,
replytoid: String?,
threadid: String?,
sessionkey: String?,
idempotencykey: String)
@@ -489,6 +491,7 @@ public struct SendParams: Codable, Sendable {
self.channel = channel
self.accountid = accountid
self.agentid = agentid
self.replytoid = replytoid
self.threadid = threadid
self.sessionkey = sessionkey
self.idempotencykey = idempotencykey
@@ -503,6 +506,7 @@ public struct SendParams: Codable, Sendable {
case channel
case accountid = "accountId"
case agentid = "agentId"
case replytoid = "replyToId"
case threadid = "threadId"
case sessionkey = "sessionKey"
case idempotencykey = "idempotencyKey"

View File

@@ -7,7 +7,7 @@ import {
resolveUserPath,
type OpenClawConfig,
} from "openclaw/plugin-sdk/account-core";
import type { DmPolicy, GroupPolicy } from "openclaw/plugin-sdk/config-runtime";
import type { DmPolicy, GroupPolicy, ReplyToMode } from "openclaw/plugin-sdk/config-runtime";
import { resolveOAuthDir } from "openclaw/plugin-sdk/state-paths";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import { resolveMergedWhatsAppAccountConfig } from "./account-config.js";
@@ -38,6 +38,7 @@ export type ResolvedWhatsAppAccount = {
groups?: WhatsAppAccountConfig["groups"];
direct?: WhatsAppAccountConfig["direct"];
debounceMs?: number;
replyToMode?: ReplyToMode;
};
export const DEFAULT_WHATSAPP_MEDIA_MAX_MB = 50;
@@ -153,6 +154,7 @@ export function resolveWhatsAppAccount(params: {
groups: merged.groups,
direct: merged.direct,
debounceMs: merged.debounceMs,
replyToMode: merged.replyToMode,
};
}

View File

@@ -2,6 +2,7 @@ import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { sleep } from "openclaw/plugin-sdk/text-runtime";
import { beforeAll, describe, expect, it, vi } from "vitest";
import { loadWebMedia } from "../media.js";
import { cacheInboundMessageMeta } from "../quoted-message.js";
import type { WebInboundMsg } from "./types.js";
vi.mock("openclaw/plugin-sdk/runtime-env", async () => {
@@ -35,7 +36,12 @@ function makeMsg(): WebInboundMsg {
return {
from: "+10000000000",
to: "+20000000000",
accountId: "work",
chatId: "15551234567@s.whatsapp.net",
chatType: "group",
id: "msg-1",
body: "latest batch body",
senderJid: "222@s.whatsapp.net",
reply: vi.fn(async () => undefined),
sendMedia: vi.fn(async () => undefined),
} as unknown as WebInboundMsg;
@@ -120,6 +126,7 @@ describe("deliverWebReply", () => {
expect(msg.reply).toHaveBeenCalledTimes(1);
expect(msg.reply).toHaveBeenCalledWith(
"Intro line\nReasoning: appears in content but is not a prefix",
undefined,
);
});
@@ -136,11 +143,59 @@ describe("deliverWebReply", () => {
});
expect(msg.reply).toHaveBeenCalledTimes(2);
expect(msg.reply).toHaveBeenNthCalledWith(1, "aaa");
expect(msg.reply).toHaveBeenNthCalledWith(2, "aaa");
expect(msg.reply).toHaveBeenNthCalledWith(1, "aaa", undefined);
expect(msg.reply).toHaveBeenNthCalledWith(2, "aaa", undefined);
expect(replyLogger.info).toHaveBeenCalledWith(expect.any(Object), "auto-reply sent (text)");
});
it("keeps quote threading on every text chunk for a threaded reply", async () => {
const msg = makeMsg();
cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-1", {
participant: "111@s.whatsapp.net",
body: "quoted body",
fromMe: true,
});
await deliverWebReply({
replyResult: { text: "aaaaaa", replyToId: "reply-1" },
msg,
maxMediaBytes: 1024 * 1024,
textLimit: 3,
replyLogger,
skipLog: true,
});
expect(msg.reply).toHaveBeenCalledTimes(2);
expect(msg.reply).toHaveBeenNthCalledWith(
1,
"aaa",
expect.objectContaining({
quoted: expect.objectContaining({
key: expect.objectContaining({
id: "reply-1",
fromMe: true,
participant: "111@s.whatsapp.net",
}),
message: { conversation: "quoted body" },
}),
}),
);
expect(msg.reply).toHaveBeenNthCalledWith(
2,
"aaa",
expect.objectContaining({
quoted: expect.objectContaining({
key: expect.objectContaining({
id: "reply-1",
fromMe: true,
participant: "111@s.whatsapp.net",
}),
message: { conversation: "quoted body" },
}),
}),
);
});
it.each(["connection closed", "operation timed out"])(
"retries text send on transient failure: %s",
async (errorMessage) => {
@@ -188,12 +243,67 @@ describe("deliverWebReply", () => {
caption: "aaa",
mimetype: "image/jpeg",
}),
undefined,
);
expect(msg.reply).toHaveBeenCalledWith("aaa");
expect(msg.reply).toHaveBeenCalledWith("aaa", undefined);
expect(replyLogger.info).toHaveBeenCalledWith(expect.any(Object), "auto-reply sent (media)");
expect(logVerbose).toHaveBeenCalled();
});
it("keeps quote threading on media and trailing text chunks for a threaded reply", async () => {
const msg = makeMsg();
mockLoadedImageMedia();
cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-2", {
participant: "111@s.whatsapp.net",
body: "quoted media body",
fromMe: true,
});
await deliverWebReply({
replyResult: {
text: "captiontrail",
mediaUrl: "http://example.com/img.jpg",
replyToId: "reply-2",
},
msg,
maxMediaBytes: 1024 * 1024,
textLimit: 7,
replyLogger,
skipLog: true,
});
expect(msg.sendMedia).toHaveBeenCalledWith(
expect.objectContaining({
image: expect.any(Buffer),
caption: "caption",
mimetype: "image/jpeg",
}),
expect.objectContaining({
quoted: expect.objectContaining({
key: expect.objectContaining({
id: "reply-2",
fromMe: true,
participant: "111@s.whatsapp.net",
}),
message: { conversation: "quoted media body" },
}),
}),
);
expect(msg.reply).toHaveBeenCalledWith(
"trail",
expect.objectContaining({
quoted: expect.objectContaining({
key: expect.objectContaining({
id: "reply-2",
fromMe: true,
participant: "111@s.whatsapp.net",
}),
message: { conversation: "quoted media body" },
}),
}),
);
});
it("retries media send on transient failure", async () => {
const msg = makeMsg();
mockLoadedImageMedia();
@@ -265,6 +375,7 @@ describe("deliverWebReply", () => {
mimetype: "audio/ogg",
caption: "cap",
}),
undefined,
);
});
@@ -293,6 +404,7 @@ describe("deliverWebReply", () => {
caption: "cap",
mimetype: "video/mp4",
}),
undefined,
);
});
@@ -323,6 +435,7 @@ describe("deliverWebReply", () => {
caption: "cap",
mimetype: "application/octet-stream",
}),
undefined,
);
});
});

View File

@@ -8,6 +8,7 @@ import {
} from "openclaw/plugin-sdk/reply-payload";
import { logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env";
import { loadWebMedia } from "../media.js";
import { buildQuotedMessageOptions, lookupInboundMessageMeta } from "../quoted-message.js";
import { newConnectionId } from "../reconnect.js";
import { formatError } from "../session.js";
import { convertMarkdownTables, sleep } from "../text-runtime.js";
@@ -45,6 +46,23 @@ export async function deliverWebReply(params: {
const textChunks = chunkMarkdownTextWithMode(convertedText, textLimit, chunkMode);
const mediaList = resolveOutboundMediaUrls(replyResult);
const getQuote = () => {
if (!replyResult.replyToId) {
return undefined;
}
// Use replyToId (not msg.id) so batched payloads quote the correct
// per-message target. Look up cached metadata for the specific
// message being quoted — msg.body may be a combined batch body.
const cached = lookupInboundMessageMeta(msg.accountId, msg.chatId, replyResult.replyToId);
return buildQuotedMessageOptions({
messageId: replyResult.replyToId,
remoteJid: msg.chatId,
fromMe: cached?.fromMe ?? false,
participant: cached?.participant ?? (msg.chatType === "group" ? msg.senderJid : undefined),
messageText: cached?.body ?? "",
});
};
const sendWithRetry = async (fn: () => Promise<unknown>, label: string, maxAttempts = 3) => {
let lastErr: unknown;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
@@ -73,7 +91,8 @@ export async function deliverWebReply(params: {
const totalChunks = textChunks.length;
for (const [index, chunk] of textChunks.entries()) {
const chunkStarted = Date.now();
await sendWithRetry(() => msg.reply(chunk), "text");
const quote = getQuote();
await sendWithRetry(() => msg.reply(chunk, quote), "text");
if (!skipLog) {
const durationMs = Date.now() - chunkStarted;
whatsappOutboundLog.debug(
@@ -117,47 +136,63 @@ export async function deliverWebReply(params: {
logVerbose(`Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`);
}
if (media.kind === "image") {
const quote = getQuote();
await sendWithRetry(
() =>
msg.sendMedia({
image: media.buffer,
caption,
mimetype: media.contentType,
}),
msg.sendMedia(
{
image: media.buffer,
caption,
mimetype: media.contentType,
},
quote,
),
"media:image",
);
} else if (media.kind === "audio") {
const quote = getQuote();
await sendWithRetry(
() =>
msg.sendMedia({
audio: media.buffer,
ptt: true,
mimetype: media.contentType,
caption,
}),
msg.sendMedia(
{
audio: media.buffer,
ptt: true,
mimetype: media.contentType,
caption,
},
quote,
),
"media:audio",
);
} else if (media.kind === "video") {
const quote = getQuote();
await sendWithRetry(
() =>
msg.sendMedia({
video: media.buffer,
caption,
mimetype: media.contentType,
}),
msg.sendMedia(
{
video: media.buffer,
caption,
mimetype: media.contentType,
},
quote,
),
"media:video",
);
} else {
const fileName = media.fileName ?? mediaUrl.split("/").pop() ?? "file";
const mimetype = media.contentType ?? "application/octet-stream";
const quote = getQuote();
await sendWithRetry(
() =>
msg.sendMedia({
document: media.buffer,
fileName,
caption,
mimetype,
}),
msg.sendMedia(
{
document: media.buffer,
fileName,
caption,
mimetype,
},
quote,
),
"media:document",
);
}
@@ -193,12 +228,12 @@ export async function deliverWebReply(params: {
return;
}
whatsappOutboundLog.warn(`Media skipped; sent text-only to ${msg.from}`);
await msg.reply(fallbackText);
await msg.reply(fallbackText, getQuote());
},
});
// Remaining text chunks after media
for (const chunk of remainingText) {
await msg.reply(chunk);
await msg.reply(chunk, getQuote());
}
}

View File

@@ -232,6 +232,21 @@ describe("whatsapp inbound dispatch", () => {
expect(ctx.GroupSystemPrompt).toBeUndefined();
});
it("preserves reply threading policy in the inbound context", () => {
const ctx = buildWhatsAppInboundContext({
combinedBody: "hi",
conversationId: "+1000",
msg: makeMsg(),
route: makeRoute(),
sender: {
e164: "+1000",
},
replyThreading: { implicitCurrentMessage: "allow" },
});
expect(ctx.ReplyThreading).toEqual({ implicitCurrentMessage: "allow" });
});
it("defaults responsePrefix to identity name in self-chats when unset", () => {
const responsePrefix = resolveWhatsAppResponsePrefix({
cfg: {

View File

@@ -40,6 +40,10 @@ type VisibleReplyTarget = {
} | null;
};
type ReplyThreadingContext = {
implicitCurrentMessage?: "default" | "allow" | "deny";
};
type SenderContext = {
id?: string;
name?: string;
@@ -91,6 +95,7 @@ export function buildWhatsAppInboundContext(params: {
msg: WebInboundMsg;
route: ReturnType<typeof resolveAgentRoute>;
sender: SenderContext;
replyThreading?: ReplyThreadingContext;
visibleReplyTo?: VisibleReplyTarget;
}) {
const inboundHistory =
@@ -102,7 +107,7 @@ export function buildWhatsAppInboundContext(params: {
}))
: undefined;
return finalizeInboundContext({
const result = finalizeInboundContext({
Body: params.combinedBody,
BodyForAgent: params.msg.body,
InboundHistory: inboundHistory,
@@ -132,6 +137,7 @@ export function buildWhatsAppInboundContext(params: {
SenderId: params.sender.id ?? params.sender.e164,
SenderE164: params.sender.e164,
CommandAuthorized: params.commandAuthorized,
ReplyThreading: params.replyThreading,
WasMentioned: params.msg.wasMentioned,
GroupSystemPrompt: params.groupSystemPrompt,
...(params.msg.location ? toLocationContext(params.msg.location) : {}),
@@ -140,6 +146,7 @@ export function buildWhatsAppInboundContext(params: {
OriginatingChannel: "whatsapp",
OriginatingTo: params.msg.from,
});
return result;
}
export function resolveWhatsAppDmRouteTarget(params: {
@@ -238,7 +245,7 @@ export async function dispatchWhatsAppBufferedReply(params: {
maxMediaBytes: number;
maxMediaTextChunkLimit?: number;
msg: WebInboundMsg;
onModelSelected?: ChannelReplyOnModelSelected | undefined;
onModelSelected?: ChannelReplyOnModelSelected;
rememberSentText: (
text: string | undefined,
opts: {

View File

@@ -1,3 +1,4 @@
import { resolveBatchedReplyThreadingPolicy } from "openclaw/plugin-sdk/reply-reference";
import { getPrimaryIdentityId, getSelfIdentity, getSenderIdentity } from "../../identity.js";
import {
resolveWhatsAppCommandAuthorized,
@@ -230,6 +231,10 @@ export async function processMessage(params: {
isSelfChat: params.msg.chatType !== "group" && inboundPolicy.isSelfChat,
pipelineResponsePrefix: replyPipeline.responsePrefix,
});
const replyThreading = resolveBatchedReplyThreadingPolicy(
account.replyToMode ?? "off",
params.msg.isBatched === true,
);
// Resolve combined conversation system prompt using the group or direct surface.
const conversationSystemPrompt =
@@ -257,6 +262,7 @@ export async function processMessage(params: {
name: sender.name ?? undefined,
e164: sender.e164 ?? undefined,
},
replyThreading,
visibleReplyTo: visibleReplyTo ?? undefined,
});

View File

@@ -67,6 +67,12 @@ export const whatsappPlugin: ChannelPlugin<ResolvedWhatsAppAccount> =
idLabel: "whatsappSenderId",
},
outbound: whatsappChannelOutbound,
threading: {
scopedAccountReplyToMode: {
resolveAccount: (cfg, accountId) => resolveWhatsAppAccount({ cfg, accountId }),
resolveReplyToMode: (account) => account.replyToMode,
},
},
base: {
...createWhatsAppPluginBase({
groups: {

View File

@@ -1,4 +1,10 @@
import type { AnyMessageContent, proto, WAMessage, WASocket } from "@whiskeysockets/baileys";
import type {
AnyMessageContent,
MiscMessageGenerationOptions,
proto,
WAMessage,
WASocket,
} from "@whiskeysockets/baileys";
import { createInboundDebouncer, formatLocationText } from "openclaw/plugin-sdk/channel-inbound";
import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime";
import { defaultRuntime } from "openclaw/plugin-sdk/runtime-env";
@@ -6,6 +12,7 @@ import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
import { getChildLogger } from "openclaw/plugin-sdk/text-runtime";
import { readWebSelfIdentityForDecision, WhatsAppAuthUnstableError } from "../auth-store.js";
import { getPrimaryIdentityId, resolveComparableIdentity } from "../identity.js";
import { cacheInboundMessageMeta } from "../quoted-message.js";
import { DEFAULT_RECONNECT_POLICY, computeBackoff, sleepWithAbort } from "../reconnect.js";
import type { OpenClawConfig } from "../runtime-api.js";
import { createWaSocket, formatError, getStatusCode, waitForWaConnection } from "../session.js";
@@ -209,6 +216,7 @@ export async function attachWebInboxToSocket(
body: combinedBody,
mentions: mentioned.size > 0 ? Array.from(mentioned) : undefined,
mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined,
isBatched: true,
};
await options.onMessage(combinedMessage);
await finalizeInboundDedupe(entries);
@@ -247,13 +255,19 @@ export async function attachWebInboxToSocket(
});
};
const sendTrackedMessage = async (jid: string, content: AnyMessageContent) => {
const sendTrackedMessage = async (
jid: string,
content: AnyMessageContent,
sendOptions?: MiscMessageGenerationOptions,
) => {
let lastErr: unknown = new Error(RECONNECT_IN_PROGRESS_ERROR);
for (let attempt = 1; ; attempt++) {
const currentSock = getCurrentSock();
if (currentSock) {
try {
const result = await currentSock.sendMessage(jid, content);
const result = sendOptions
? await currentSock.sendMessage(jid, content, sendOptions)
: await currentSock.sendMessage(jid, content);
rememberOutboundMessage(jid, result);
return result;
} catch (err) {
@@ -398,7 +412,9 @@ export async function attachWebInboxToSocket(
messageTimestampMs,
connectedAtMs,
verbose: options.verbose,
sock: { sendMessage: (jid, content) => sendTrackedMessage(jid, content) },
sock: {
sendMessage: (jid: string, content: AnyMessageContent) => sendTrackedMessage(jid, content),
},
remoteJid,
});
if (!access.allowed) {
@@ -515,11 +531,14 @@ export async function attachWebInboxToSocket(
logWhatsAppVerbose(options.verbose, `Presence update failed: ${String(err)}`);
}
};
const reply = async (text: string) => {
await sendTrackedMessage(chatJid, { text });
const reply = async (text: string, options?: MiscMessageGenerationOptions) => {
await sendTrackedMessage(chatJid, { text }, options);
};
const sendMedia = async (payload: AnyMessageContent) => {
await sendTrackedMessage(chatJid, payload);
const sendMedia = async (
payload: AnyMessageContent,
options?: MiscMessageGenerationOptions,
) => {
await sendTrackedMessage(chatJid, payload, options);
};
const timestamp = inbound.messageTimestampMs;
const mentionedJids = extractMentionedJids(msg.message as proto.IMessage | undefined);
@@ -580,6 +599,15 @@ export async function attachWebInboxToSocket(
mediaFileName: enriched.mediaFileName,
dedupeKey: inbound.id ? `${options.accountId}:${inbound.remoteJid}:${inbound.id}` : undefined,
};
if (inboundMessage.id) {
cacheInboundMessageMeta(inboundMessage.accountId, inboundMessage.chatId, inboundMessage.id, {
participant: inboundMessage.senderJid,
participantE164:
inboundMessage.chatType === "direct" ? inboundMessage.senderE164 : undefined,
body: inboundMessage.body,
fromMe: inboundMessage.fromMe,
});
}
try {
const task = Promise.resolve(debouncer.enqueue(inboundMessage));
void task.catch((err) => {
@@ -692,7 +720,11 @@ export async function attachWebInboxToSocket(
const sendApi = createWebSendApi({
sock: {
sendMessage: (jid: string, content: AnyMessageContent) => sendTrackedMessage(jid, content),
sendMessage: (
jid: string,
content: AnyMessageContent,
options?: MiscMessageGenerationOptions,
) => sendTrackedMessage(jid, content, options),
sendPresenceUpdate: async (presence, jid?: string) => {
const currentSock = getCurrentSock();
if (!currentSock) {

View File

@@ -218,4 +218,29 @@ describe("createWebSendApi", () => {
expect(sendMessage).toHaveBeenCalledWith("123@s.whatsapp.net", { text: "hello" });
});
it("preserves the quoted remoteJid provided by the outbound adapter", async () => {
await api.sendMessage("+1555", "hello", undefined, undefined, {
quotedMessageKey: {
id: "quoted-1",
remoteJid: "277038292303944@lid",
fromMe: false,
participant: "1234@s.whatsapp.net",
messageText: "quoted body",
},
});
expect(sendMessage).toHaveBeenCalledWith(
"1555@s.whatsapp.net",
{ text: "hello" },
expect.objectContaining({
quoted: expect.objectContaining({
key: expect.objectContaining({
remoteJid: "277038292303944@lid",
id: "quoted-1",
}),
}),
}),
);
});
});

View File

@@ -1,5 +1,10 @@
import type { AnyMessageContent, WAPresence } from "@whiskeysockets/baileys";
import type {
AnyMessageContent,
MiscMessageGenerationOptions,
WAPresence,
} from "@whiskeysockets/baileys";
import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime";
import { buildQuotedMessageOptions } from "../quoted-message.js";
import { toWhatsappJid } from "../text-runtime.js";
import type { ActiveWebSendOptions } from "./types.js";
@@ -19,7 +24,11 @@ function resolveOutboundMessageId(result: unknown): string {
export function createWebSendApi(params: {
sock: {
sendMessage: (jid: string, content: AnyMessageContent) => Promise<unknown>;
sendMessage: (
jid: string,
content: AnyMessageContent,
options?: MiscMessageGenerationOptions,
) => Promise<unknown>;
sendPresenceUpdate: (presence: WAPresence, jid?: string) => Promise<unknown>;
};
defaultAccountId: string;
@@ -66,7 +75,16 @@ export function createWebSendApi(params: {
} else {
payload = { text };
}
const result = await params.sock.sendMessage(jid, payload);
const quotedOpts = buildQuotedMessageOptions({
messageId: sendOptions?.quotedMessageKey?.id,
remoteJid: sendOptions?.quotedMessageKey?.remoteJid,
fromMe: sendOptions?.quotedMessageKey?.fromMe,
participant: sendOptions?.quotedMessageKey?.participant,
messageText: sendOptions?.quotedMessageKey?.messageText,
});
const result = quotedOpts
? await params.sock.sendMessage(jid, payload, quotedOpts)
: await params.sock.sendMessage(jid, payload);
const accountId = sendOptions?.accountId ?? params.defaultAccountId;
recordWhatsAppOutbound(accountId);
const messageId = resolveOutboundMessageId(result);

View File

@@ -1,4 +1,4 @@
import type { AnyMessageContent } from "@whiskeysockets/baileys";
import type { AnyMessageContent, MiscMessageGenerationOptions } from "@whiskeysockets/baileys";
import type { NormalizedLocation } from "openclaw/plugin-sdk/channel-inbound";
import type { PollInput } from "openclaw/plugin-sdk/media-runtime";
import type { WhatsAppIdentity, WhatsAppReplyContext, WhatsAppSelfIdentity } from "../identity.js";
@@ -10,6 +10,13 @@ export type WebListenerCloseReason = {
};
export type ActiveWebSendOptions = {
quotedMessageKey?: {
id: string;
remoteJid: string;
fromMe: boolean;
participant?: string;
messageText?: string;
};
gifPlayback?: boolean;
accountId?: string;
fileName?: string;
@@ -67,11 +74,12 @@ export type WebInboundMessage = {
fromMe?: boolean;
location?: NormalizedLocation;
sendComposing: () => Promise<void>;
reply: (text: string) => Promise<void>;
sendMedia: (payload: AnyMessageContent) => Promise<void>;
reply: (text: string, options?: MiscMessageGenerationOptions) => Promise<void>;
sendMedia: (payload: AnyMessageContent, options?: MiscMessageGenerationOptions) => Promise<void>;
mediaPath?: string;
mediaType?: string;
mediaFileName?: string;
mediaUrl?: string;
wasMentioned?: boolean;
isBatched?: boolean;
};

View File

@@ -1,6 +1,7 @@
import { describe, expect, it, vi } from "vitest";
import { createWhatsAppOutboundBase } from "./outbound-base.js";
import { createWhatsAppPollFixture } from "./outbound-test-support.js";
import { cacheInboundMessageMeta } from "./quoted-message.js";
describe("createWhatsAppOutboundBase", () => {
it("exposes the provided chunker", () => {
@@ -54,6 +55,302 @@ describe("createWhatsAppOutboundBase", () => {
expect(result).toMatchObject({ channel: "whatsapp", messageId: "msg-1" });
});
it("uses the configured default account for quote metadata lookup when accountId is omitted", async () => {
cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-1", {
participant: "111@s.whatsapp.net",
body: "quoted body",
});
const sendMessageWhatsApp = vi.fn(async () => ({
messageId: "msg-1",
toJid: "15551234567@s.whatsapp.net",
}));
const outbound = createWhatsAppOutboundBase({
chunker: (text) => [text],
sendMessageWhatsApp,
sendPollWhatsApp: vi.fn(),
shouldLogVerbose: () => false,
resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }),
});
await outbound.sendText!({
cfg: {
channels: {
whatsapp: {
defaultAccount: "work",
accounts: {
work: {},
},
},
},
} as never,
to: "whatsapp:+15551234567",
text: "reply",
deps: { sendWhatsApp: sendMessageWhatsApp },
replyToId: "reply-1",
});
expect(sendMessageWhatsApp).toHaveBeenCalledWith(
"whatsapp:+15551234567",
"reply",
expect.objectContaining({
quotedMessageKey: {
id: "reply-1",
remoteJid: "15551234567@s.whatsapp.net",
fromMe: false,
participant: "111@s.whatsapp.net",
messageText: "quoted body",
},
}),
);
});
it("normalizes mixed-case defaultAccount before quote metadata lookup", async () => {
cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-case", {
participant: "333@s.whatsapp.net",
body: "case-normalized body",
});
const sendMessageWhatsApp = vi.fn(async () => ({
messageId: "msg-case",
toJid: "15551234567@s.whatsapp.net",
}));
const outbound = createWhatsAppOutboundBase({
chunker: (text) => [text],
sendMessageWhatsApp,
sendPollWhatsApp: vi.fn(),
shouldLogVerbose: () => false,
resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }),
});
await outbound.sendText!({
cfg: {
channels: {
whatsapp: {
defaultAccount: "Work",
accounts: {
work: {},
other: {},
},
},
},
} as never,
to: "whatsapp:+15551234567",
text: "reply",
deps: { sendWhatsApp: sendMessageWhatsApp },
replyToId: "reply-case",
});
expect(sendMessageWhatsApp).toHaveBeenCalledWith(
"whatsapp:+15551234567",
"reply",
expect.objectContaining({
quotedMessageKey: {
id: "reply-case",
remoteJid: "15551234567@s.whatsapp.net",
fromMe: false,
participant: "333@s.whatsapp.net",
messageText: "case-normalized body",
},
}),
);
});
it("matches sorted default-account fallback for quote metadata lookup when defaultAccount is unset", async () => {
cacheInboundMessageMeta("alpha", "15551234567@s.whatsapp.net", "reply-2", {
participant: "222@s.whatsapp.net",
body: "sorted default body",
});
const sendMessageWhatsApp = vi.fn(async () => ({
messageId: "msg-2",
toJid: "15551234567@s.whatsapp.net",
}));
const outbound = createWhatsAppOutboundBase({
chunker: (text) => [text],
sendMessageWhatsApp,
sendPollWhatsApp: vi.fn(),
shouldLogVerbose: () => false,
resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }),
});
await outbound.sendText!({
cfg: {
channels: {
whatsapp: {
accounts: {
zeta: {},
alpha: {},
},
},
},
} as never,
to: "whatsapp:+15551234567",
text: "reply",
deps: { sendWhatsApp: sendMessageWhatsApp },
replyToId: "reply-2",
});
expect(sendMessageWhatsApp).toHaveBeenCalledWith(
"whatsapp:+15551234567",
"reply",
expect.objectContaining({
quotedMessageKey: {
id: "reply-2",
remoteJid: "15551234567@s.whatsapp.net",
fromMe: false,
participant: "222@s.whatsapp.net",
messageText: "sorted default body",
},
}),
);
});
it("reuses the cached inbound remoteJid when the outbound target normalizes differently", async () => {
cacheInboundMessageMeta("default", "277038292303944@lid", "reply-lid", {
participant: "5511976136970@s.whatsapp.net",
body: "quoted from lid chat",
fromMe: true,
});
const sendMessageWhatsApp = vi.fn(async () => ({
messageId: "msg-lid",
toJid: "5511976136970@s.whatsapp.net",
}));
const outbound = createWhatsAppOutboundBase({
chunker: (text) => [text],
sendMessageWhatsApp,
sendPollWhatsApp: vi.fn(),
shouldLogVerbose: () => false,
resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }),
});
await outbound.sendText!({
cfg: {
channels: {
whatsapp: {
accounts: {
default: {},
},
},
},
} as never,
to: "whatsapp:+5511976136970",
text: "reply",
accountId: "default",
deps: { sendWhatsApp: sendMessageWhatsApp },
replyToId: "reply-lid",
});
expect(sendMessageWhatsApp).toHaveBeenCalledWith(
"whatsapp:+5511976136970",
"reply",
expect.objectContaining({
quotedMessageKey: {
id: "reply-lid",
remoteJid: "277038292303944@lid",
fromMe: true,
participant: "5511976136970@s.whatsapp.net",
messageText: "quoted from lid chat",
},
}),
);
});
it("normalizes explicit accountId before quote metadata lookup", async () => {
cacheInboundMessageMeta("work", "15551234567@s.whatsapp.net", "reply-explicit", {
participant: "333@s.whatsapp.net",
body: "explicit account body",
});
const sendMessageWhatsApp = vi.fn(async () => ({
messageId: "msg-explicit",
toJid: "15551234567@s.whatsapp.net",
}));
const outbound = createWhatsAppOutboundBase({
chunker: (text) => [text],
sendMessageWhatsApp,
sendPollWhatsApp: vi.fn(),
shouldLogVerbose: () => false,
resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }),
});
await outbound.sendText!({
cfg: {
channels: {
whatsapp: {
accounts: {
work: {},
},
},
},
} as never,
to: "whatsapp:+15551234567",
text: "reply",
accountId: "Work",
deps: { sendWhatsApp: sendMessageWhatsApp },
replyToId: "reply-explicit",
});
expect(sendMessageWhatsApp).toHaveBeenCalledWith(
"whatsapp:+15551234567",
"reply",
expect.objectContaining({
quotedMessageKey: {
id: "reply-explicit",
remoteJid: "15551234567@s.whatsapp.net",
fromMe: false,
participant: "333@s.whatsapp.net",
messageText: "explicit account body",
},
}),
);
});
it("falls back to the target JID when quote metadata only exists in a different conversation", async () => {
cacheInboundMessageMeta("default", "120363400000000000@g.us", "reply-group", {
participant: "5511976136970@s.whatsapp.net",
body: "group-only body",
});
const sendMessageWhatsApp = vi.fn(async () => ({
messageId: "msg-group-miss",
toJid: "5511976136970@s.whatsapp.net",
}));
const outbound = createWhatsAppOutboundBase({
chunker: (text) => [text],
sendMessageWhatsApp,
sendPollWhatsApp: vi.fn(),
shouldLogVerbose: () => false,
resolveTarget: ({ to }) => ({ ok: true as const, to: to ?? "" }),
});
await outbound.sendText!({
cfg: {
channels: {
whatsapp: {
accounts: {
default: {},
},
},
},
} as never,
to: "whatsapp:+5511976136970",
text: "reply",
accountId: "default",
deps: { sendWhatsApp: sendMessageWhatsApp },
replyToId: "reply-group",
});
expect(sendMessageWhatsApp).toHaveBeenCalledWith(
"whatsapp:+5511976136970",
"reply",
expect.objectContaining({
quotedMessageKey: {
id: "reply-group",
remoteJid: "5511976136970@s.whatsapp.net",
fromMe: false,
participant: undefined,
messageText: undefined,
},
}),
);
});
it("threads cfg into sendPollWhatsApp call", async () => {
const sendPollWhatsApp = vi.fn(async () => ({
messageId: "wa-poll-1",

View File

@@ -1,3 +1,9 @@
import {
DEFAULT_ACCOUNT_ID,
listCombinedAccountIds,
normalizeOptionalAccountId,
resolveListedDefaultAccountId,
} from "openclaw/plugin-sdk/account-core";
import {
createAttachedChannelResultAdapter,
type ChannelOutboundAdapter,
@@ -5,6 +11,8 @@ import {
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { resolveOutboundSendDep, sanitizeForPlainText } from "openclaw/plugin-sdk/infra-runtime";
import { WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS } from "./outbound-send-deps.js";
import { lookupInboundMessageMetaForTarget } from "./quoted-message.js";
import { toWhatsappJid } from "./text-runtime.js";
type WhatsAppChunker = NonNullable<ChannelOutboundAdapter["chunker"]>;
type WhatsAppSendTextOptions = {
@@ -19,6 +27,13 @@ type WhatsAppSendTextOptions = {
mediaReadFile?: (filePath: string) => Promise<Buffer>;
gifPlayback?: boolean;
accountId?: string;
quotedMessageKey?: {
id: string;
remoteJid: string;
fromMe: boolean;
participant?: string;
messageText?: string;
};
};
type WhatsAppSendMessage = (
to: string,
@@ -41,6 +56,25 @@ type CreateWhatsAppOutboundBaseParams = {
skipEmptyText?: boolean;
};
function resolveQuoteLookupAccountId(cfg?: OpenClawConfig, accountId?: string | null): string {
const explicitAccountId = normalizeOptionalAccountId(accountId);
if (explicitAccountId) {
return explicitAccountId;
}
const channelCfg = cfg?.channels?.whatsapp;
const configuredIds = listCombinedAccountIds({
configuredAccountIds:
channelCfg?.accounts && typeof channelCfg.accounts === "object"
? Object.keys(channelCfg.accounts).filter(Boolean)
: [],
fallbackAccountIdWhenEmpty: DEFAULT_ACCOUNT_ID,
});
return resolveListedDefaultAccountId({
accountIds: configuredIds,
configuredDefaultAccountId: normalizeOptionalAccountId(channelCfg?.defaultAccount),
});
}
export function createWhatsAppOutboundBase({
chunker,
sendMessageWhatsApp,
@@ -62,6 +96,26 @@ export function createWhatsAppOutboundBase({
| "sendMedia"
| "sendPoll"
> {
const resolveQuotedMessageKey = (params: {
accountId: string;
to: string;
replyToId?: string | null;
}) => {
const replyToId = params.replyToId?.trim();
if (!replyToId) {
return undefined;
}
const targetJid = toWhatsappJid(params.to);
const cachedMeta = lookupInboundMessageMetaForTarget(params.accountId, targetJid, replyToId);
return {
id: replyToId,
remoteJid: cachedMeta?.remoteJid ?? targetJid,
fromMe: cachedMeta?.fromMe ?? false,
participant: cachedMeta?.participant,
messageText: cachedMeta?.body,
};
};
return {
deliveryMode: "gateway",
chunker,
@@ -72,7 +126,7 @@ export function createWhatsAppOutboundBase({
resolveTarget,
...createAttachedChannelResultAdapter({
channel: "whatsapp",
sendText: async ({ cfg, to, text, accountId, deps, gifPlayback }) => {
sendText: async ({ cfg, to, text, accountId, deps, gifPlayback, replyToId }) => {
const normalizedText = normalizeText(text);
if (skipEmptyText && !normalizedText) {
return { messageId: "" };
@@ -81,11 +135,18 @@ export function createWhatsAppOutboundBase({
resolveOutboundSendDep<WhatsAppSendMessage>(deps, "whatsapp", {
legacyKeys: WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS,
}) ?? sendMessageWhatsApp;
const lookupAccountId = resolveQuoteLookupAccountId(cfg, accountId);
const quotedMessageKey = resolveQuotedMessageKey({
accountId: lookupAccountId,
to,
replyToId,
});
return await send(to, normalizedText, {
verbose: false,
cfg,
accountId: accountId ?? undefined,
gifPlayback,
quotedMessageKey,
});
},
sendMedia: async ({
@@ -99,11 +160,18 @@ export function createWhatsAppOutboundBase({
accountId,
deps,
gifPlayback,
replyToId,
}) => {
const send =
resolveOutboundSendDep<WhatsAppSendMessage>(deps, "whatsapp", {
legacyKeys: WHATSAPP_LEGACY_OUTBOUND_SEND_DEP_KEYS,
}) ?? sendMessageWhatsApp;
const lookupAccountId = resolveQuoteLookupAccountId(cfg, accountId);
const quotedMessageKey = resolveQuotedMessageKey({
accountId: lookupAccountId,
to,
replyToId,
});
return await send(to, normalizeText(text), {
verbose: false,
cfg,
@@ -113,6 +181,7 @@ export function createWhatsAppOutboundBase({
mediaReadFile,
accountId: accountId ?? undefined,
gifPlayback,
quotedMessageKey,
});
},
sendPoll: async ({ cfg, to, poll, accountId }) =>

View File

@@ -0,0 +1,83 @@
import { describe, expect, it } from "vitest";
import {
cacheInboundMessageMeta,
lookupInboundMessageMeta,
lookupInboundMessageMetaForTarget,
} from "./quoted-message.js";
describe("quoted message metadata cache", () => {
it("scopes cached metadata by account id", () => {
cacheInboundMessageMeta("account-a", "1555@s.whatsapp.net", "msg-1", {
participant: "111@s.whatsapp.net",
body: "hello from a",
fromMe: true,
});
cacheInboundMessageMeta("account-b", "1555@s.whatsapp.net", "msg-1", {
participant: "222@s.whatsapp.net",
body: "hello from b",
fromMe: false,
});
expect(lookupInboundMessageMeta("account-a", "1555@s.whatsapp.net", "msg-1")).toEqual({
participant: "111@s.whatsapp.net",
body: "hello from a",
fromMe: true,
});
expect(lookupInboundMessageMeta("account-b", "1555@s.whatsapp.net", "msg-1")).toEqual({
participant: "222@s.whatsapp.net",
body: "hello from b",
fromMe: false,
});
});
it("can recover the original remoteJid for a matching direct-chat target", () => {
cacheInboundMessageMeta("account-c", "277038292303944@lid", "msg-2", {
participant: "5511976136970@s.whatsapp.net",
body: "hello from lid chat",
fromMe: true,
});
expect(
lookupInboundMessageMetaForTarget("account-c", "5511976136970@s.whatsapp.net", "msg-2"),
).toEqual({
remoteJid: "277038292303944@lid",
participant: "5511976136970@s.whatsapp.net",
body: "hello from lid chat",
fromMe: true,
});
expect(
lookupInboundMessageMetaForTarget("account-c", "99999999999@s.whatsapp.net", "msg-2"),
).toBeUndefined();
expect(
lookupInboundMessageMetaForTarget("missing", "5511976136970@s.whatsapp.net", "msg-2"),
).toBeUndefined();
});
it("can recover a direct-chat remoteJid when only sender E164 was cached", () => {
cacheInboundMessageMeta("account-e", "277038292303944@lid", "msg-4", {
participantE164: "+5511976136970",
body: "hello from e164 participant",
});
expect(
lookupInboundMessageMetaForTarget("account-e", "5511976136970@s.whatsapp.net", "msg-4"),
).toEqual({
remoteJid: "277038292303944@lid",
participant: undefined,
participantE164: "+5511976136970",
body: "hello from e164 participant",
fromMe: undefined,
});
});
it("does not recover metadata from another chat when the target conversation differs", () => {
cacheInboundMessageMeta("account-d", "120363400000000000@g.us", "msg-3", {
participant: "111@s.whatsapp.net",
body: "group secret",
});
expect(
lookupInboundMessageMetaForTarget("account-d", "222@s.whatsapp.net", "msg-3"),
).toBeUndefined();
});
});

View File

@@ -0,0 +1,192 @@
import type { MiscMessageGenerationOptions } from "@whiskeysockets/baileys";
import { jidToE164 } from "./text-runtime.js";
export type QuotedMessageKey = {
id: string;
remoteJid: string;
fromMe: boolean;
participant?: string;
messageText?: string;
};
// ── Inbound message metadata cache ──────────────────────────────────────
// Maps messageId → { participant, participantE164, body, fromMe } so the
// outbound adapter can
// populate the quote key with the sender JID and preview text even though
// the outbound path only receives a bare messageId string.
type QuotedMeta = {
participant?: string;
participantE164?: string;
body?: string;
fromMe?: boolean;
};
type CacheEntry = QuotedMeta & { ts: number };
type QuotedMetaLookup = QuotedMeta & { remoteJid: string };
const CACHE_TTL_MS = 10 * 60 * 1000;
const MAX_ENTRIES = 500;
const cache = new Map<string, CacheEntry>();
function makeCacheKey(accountId: string, remoteJid: string, messageId: string): string {
return `${accountId}:${remoteJid}:${messageId}`;
}
export function cacheInboundMessageMeta(
accountId: string,
remoteJid: string,
messageId: string,
meta: QuotedMeta,
): void {
if (!accountId || !messageId || !remoteJid) {
return;
}
if (cache.size >= MAX_ENTRIES) {
const oldest = cache.keys().next().value;
if (oldest) {
cache.delete(oldest);
}
}
cache.set(makeCacheKey(accountId, remoteJid, messageId), { ...meta, ts: Date.now() });
}
export function lookupInboundMessageMeta(
accountId: string,
remoteJid: string,
messageId: string,
): QuotedMeta | undefined {
const cacheKey = makeCacheKey(accountId, remoteJid, messageId);
const entry = cache.get(cacheKey);
if (!entry) {
return undefined;
}
if (Date.now() - entry.ts > CACHE_TTL_MS) {
cache.delete(cacheKey);
return undefined;
}
return {
participant: entry.participant,
participantE164: entry.participantE164,
body: entry.body,
fromMe: entry.fromMe,
};
}
function normalizeComparableJid(jid: string | undefined): string | undefined {
const normalized = jid?.trim().replace(/:\d+/, "").toLowerCase();
return normalized || undefined;
}
function isGroupJid(jid: string | undefined): boolean {
return Boolean(jid && jid.endsWith("@g.us"));
}
function areComparableE164sEqual(left: string | undefined, right: string | undefined): boolean {
const normalizedLeft = left?.trim();
const normalizedRight = right?.trim();
if (!normalizedLeft || !normalizedRight) {
return false;
}
return normalizedLeft === normalizedRight;
}
function areComparableJidsEqual(left: string | undefined, right: string | undefined): boolean {
const normalizedLeft = normalizeComparableJid(left);
const normalizedRight = normalizeComparableJid(right);
if (!normalizedLeft || !normalizedRight) {
return false;
}
if (normalizedLeft === normalizedRight) {
return true;
}
const leftE164 = jidToE164(normalizedLeft);
const rightE164 = jidToE164(normalizedRight);
return Boolean(leftE164 && rightE164 && leftE164 === rightE164);
}
function matchesQuotedConversationTarget(targetJid: string, candidate: QuotedMetaLookup): boolean {
if (areComparableJidsEqual(targetJid, candidate.remoteJid)) {
return true;
}
if (isGroupJid(targetJid) || isGroupJid(candidate.remoteJid)) {
return false;
}
return (
areComparableJidsEqual(targetJid, candidate.participant) ||
areComparableE164sEqual(jidToE164(targetJid) ?? undefined, candidate.participantE164)
);
}
export function lookupInboundMessageMetaForTarget(
accountId: string,
targetJid: string,
messageId: string,
): QuotedMetaLookup | undefined {
if (!accountId || !messageId || !targetJid) {
return undefined;
}
const exact = lookupInboundMessageMeta(accountId, targetJid, messageId);
if (exact) {
return {
remoteJid: targetJid,
participant: exact.participant,
participantE164: exact.participantE164,
body: exact.body,
fromMe: exact.fromMe,
};
}
const prefix = `${accountId}:`;
const suffix = `:${messageId}`;
let matched: QuotedMetaLookup | undefined;
for (const [cacheKey, entry] of cache.entries()) {
if (!cacheKey.startsWith(prefix) || !cacheKey.endsWith(suffix)) {
continue;
}
if (Date.now() - entry.ts > CACHE_TTL_MS) {
cache.delete(cacheKey);
continue;
}
const remoteJid = cacheKey.slice(prefix.length, cacheKey.length - suffix.length);
const candidate = {
remoteJid,
participant: entry.participant,
participantE164: entry.participantE164,
body: entry.body,
fromMe: entry.fromMe,
};
if (!matchesQuotedConversationTarget(targetJid, candidate)) {
continue;
}
if (matched) {
return undefined;
}
matched = candidate;
}
return matched;
}
export function buildQuotedMessageOptions(params: {
messageId?: string | null;
remoteJid?: string | null;
fromMe?: boolean;
participant?: string;
/** Original message text — shown in the quote preview bubble. */
messageText?: string;
}): MiscMessageGenerationOptions | undefined {
const id = params.messageId?.trim();
const remoteJid = params.remoteJid?.trim();
if (!id || !remoteJid) {
return undefined;
}
return {
quoted: {
key: {
remoteJid,
id,
fromMe: params.fromMe ?? false,
participant: params.participant,
},
message: { conversation: params.messageText ?? "" },
},
} as MiscMessageGenerationOptions;
}

View File

@@ -62,6 +62,13 @@ export async function sendMessageWhatsApp(
mediaReadFile?: (filePath: string) => Promise<Buffer>;
gifPlayback?: boolean;
accountId?: string;
quotedMessageKey?: {
id: string;
remoteJid: string;
fromMe: boolean;
participant?: string;
messageText?: string;
};
},
): Promise<{ messageId: string; toJid: string }> {
let text = body.trimStart();
@@ -135,10 +142,11 @@ export async function sendMessageWhatsApp(
const hasExplicitAccountId = Boolean(options.accountId?.trim());
const accountId = hasExplicitAccountId ? resolvedAccountId : undefined;
const sendOptions: ActiveWebSendOptions | undefined =
options.gifPlayback || accountId || documentFileName
options.gifPlayback || accountId || documentFileName || options.quotedMessageKey
? {
...(options.gifPlayback ? { gifPlayback: true } : {}),
...(documentFileName ? { fileName: documentFileName } : {}),
...(options.quotedMessageKey ? { quotedMessageKey: options.quotedMessageKey } : {}),
accountId,
}
: undefined;

View File

@@ -566,6 +566,7 @@ export async function runAgentTurnWithFallback(params: {
commandBody: string;
followupRun: FollowupRun;
sessionCtx: TemplateContext;
replyThreading?: TemplateContext["ReplyThreading"];
replyOperation?: ReplyOperation;
opts?: GetReplyOptions;
typingSignals: TypingSignaler;
@@ -842,6 +843,7 @@ export async function runAgentTurnWithFallback(params: {
? createBlockReplyDeliveryHandler({
onBlockReply: params.opts.onBlockReply,
currentMessageId: params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
replyThreading: params.replyThreading,
normalizeStreamingText,
applyReplyToMode: params.applyReplyToMode,
normalizeMediaPaths: replyMediaContext.normalizePayload,

View File

@@ -250,7 +250,7 @@ describe("runReplyAgent media path normalization", () => {
text: undefined,
mediaUrl: "/tmp/outbound-media/1-chart.png",
mediaUrls: ["/tmp/outbound-media/1-chart.png"],
replyToCurrent: false,
replyToCurrent: undefined,
replyToId: "msg-1",
replyToTag: false,
audioAsVoice: false,

View File

@@ -891,6 +891,7 @@ export async function runReplyAgent(params: {
shouldInjectGroupIntro: boolean;
typingMode: TypingMode;
resetTriggered?: boolean;
replyThreadingOverride?: TemplateContext["ReplyThreading"];
replyOperation?: ReplyOperation;
}): Promise<ReplyPayload | ReplyPayload[] | undefined> {
const {
@@ -921,6 +922,7 @@ export async function runReplyAgent(params: {
shouldInjectGroupIntro,
typingMode,
resetTriggered,
replyThreadingOverride,
replyOperation: providedReplyOperation,
} = params;
@@ -1197,6 +1199,7 @@ export async function runReplyAgent(params: {
commandBody,
followupRun,
sessionCtx,
replyThreading: replyThreadingOverride ?? sessionCtx.ReplyThreading,
replyOperation,
opts,
typingSignals,
@@ -1350,6 +1353,7 @@ export async function runReplyAgent(params: {
return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
}
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
const payloadResult = await buildReplyPayloads({
payloads: payloadArray,
isHeartbeat,
@@ -1360,8 +1364,8 @@ export async function runReplyAgent(params: {
directlySentBlockKeys,
replyToMode,
replyToChannel,
currentMessageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,
replyThreading: sessionCtx.ReplyThreading,
currentMessageId,
replyThreading: replyThreadingOverride ?? sessionCtx.ReplyThreading,
messageProvider: followupRun.run.messageProvider,
messagingToolSentTexts: runResult.messagingToolSentTexts,
messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls,

View File

@@ -1,8 +1,12 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { HandleCommandsParams } from "./commands-types.js";
import type { CommandHandler, HandleCommandsParams } from "./commands-types.js";
const loadCommandHandlersMock = vi.hoisted(
(): ReturnType<typeof vi.fn<() => CommandHandler[]>> => vi.fn<() => CommandHandler[]>(() => []),
);
vi.mock("./commands-handlers.runtime.js", () => ({
loadCommandHandlers: () => [],
loadCommandHandlers: () => loadCommandHandlersMock(),
}));
vi.mock("./commands-reset.js", () => ({
@@ -13,8 +17,6 @@ vi.mock("../commands-registry.js", () => ({
shouldHandleTextCommands: vi.fn(() => true),
}));
import { handleCommands } from "./commands-core.js";
function makeParams(): HandleCommandsParams {
return {
cfg: {
@@ -76,9 +78,12 @@ function makeParams(): HandleCommandsParams {
describe("handleCommands send policy", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.resetModules();
loadCommandHandlersMock.mockReturnValue([]);
});
it("allows processing to continue even when send policy is deny (#53328)", async () => {
const { handleCommands } = await import("./commands-core.js");
// sendPolicy deny now only suppresses outbound delivery, not inbound processing.
// The deny gate moved to dispatch-from-config.ts where it suppresses delivery
// after the agent has processed the message.
@@ -86,4 +91,29 @@ describe("handleCommands send policy", () => {
expect(result).toEqual({ shouldContinue: true });
});
it("marks command replies as non-threaded", async () => {
const { handleCommands } = await import("./commands-core.js");
loadCommandHandlersMock.mockReturnValue([
vi.fn(async () => ({
shouldContinue: false,
reply: {
text: "done",
replyToId: "msg-123",
replyToCurrent: true,
},
})),
]);
const result = await handleCommands(makeParams());
expect(result).toEqual({
shouldContinue: false,
reply: {
text: "done",
replyToId: undefined,
replyToCurrent: false,
},
});
});
});

View File

@@ -17,13 +17,27 @@ function loadCommandHandlersRuntime() {
let HANDLERS: CommandHandler[] | null = null;
function normalizeCommandHandlerResult(result: CommandHandlerResult): CommandHandlerResult {
if (!result.reply) {
return result;
}
return {
...result,
reply: {
...result.reply,
replyToId: undefined,
replyToCurrent: false,
},
};
}
export async function handleCommands(params: HandleCommandsParams): Promise<CommandHandlerResult> {
if (HANDLERS === null) {
HANDLERS = (await loadCommandHandlersRuntime()).loadCommandHandlers();
}
const resetResult = await maybeHandleResetCommand(params);
if (resetResult) {
return resetResult;
return normalizeCommandHandlerResult(resetResult);
}
const allowTextCommands = shouldHandleTextCommands({
@@ -35,7 +49,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise<Comm
for (const handler of HANDLERS) {
const result = await handler(params, allowTextCommands);
if (result) {
return result;
return normalizeCommandHandlerResult(result);
}
}

View File

@@ -503,12 +503,23 @@ describe("runPreparedReply media-only handling", () => {
it("does not send a standalone reset notice for reply-producing /new turns", async () => {
await runPreparedReply(
baseParams({
ctx: {
Body: "/new",
RawBody: "/new",
CommandBody: "/new",
},
command: {
...(baseParams().command as Record<string, unknown>),
commandBodyNormalized: "/new",
rawBodyNormalized: "/new",
} as never,
resetTriggered: true,
}),
);
const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0];
expect(call?.resetTriggered).toBe(true);
expect(call?.replyThreadingOverride).toEqual({ implicitCurrentMessage: "deny" });
expect(vi.mocked(routeReply)).not.toHaveBeenCalled();
});
@@ -541,6 +552,7 @@ describe("runPreparedReply media-only handling", () => {
"User note for this reset turn (treat as ordinary user input, not startup instructions):",
);
expect(call?.followupRun.prompt).toContain("re-read persona files");
expect(call?.replyThreadingOverride).toEqual({ implicitCurrentMessage: "deny" });
});
it("does not emit a reset notice when /new is attempted during gateway drain", async () => {

View File

@@ -766,6 +766,14 @@ export async function runPreparedReply(
},
};
const replyThreadingOverride =
isBareSessionReset && sessionCtx.ReplyThreading?.implicitCurrentMessage !== "deny"
? {
...sessionCtx.ReplyThreading,
implicitCurrentMessage: "deny" as const,
}
: undefined;
return runReplyAgent({
commandBody: prefixedCommandBody,
followupRun,
@@ -799,5 +807,6 @@ export async function runPreparedReply(
shouldInjectGroupIntro,
typingMode,
resetTriggered: effectiveResetTriggered,
replyThreadingOverride,
});
}

View File

@@ -111,6 +111,38 @@ describe("createBlockReplyDeliveryHandler", () => {
});
});
it("suppresses implicit current-message threading for block replies when reply threading denies it", async () => {
const blockReplyPipeline = {
enqueue: vi.fn(),
} as unknown as BlockReplyPipelineLike;
const handler = createBlockReplyDeliveryHandler({
onBlockReply: vi.fn(async () => {}),
currentMessageId: "msg-123",
replyThreading: { implicitCurrentMessage: "deny" },
normalizeStreamingText: (payload) => ({ text: payload.text, skip: false }),
applyReplyToMode: (payload) => payload,
typingSignals: {
signalTextDelta: vi.fn(async () => {}),
} as unknown as TypingSignaler,
blockStreamingEnabled: true,
blockReplyPipeline,
directlySentBlockKeys: new Set(),
});
await handler({ text: "reset intro" });
expect(blockReplyPipeline.enqueue).toHaveBeenCalledWith({
text: "reset intro",
mediaUrl: undefined,
replyToId: undefined,
replyToCurrent: undefined,
replyToTag: undefined,
audioAsVoice: false,
mediaUrls: undefined,
});
});
it("parses media directives in block replies before path normalization", () => {
const normalized = normalizeReplyPayloadDirectives({
payload: { text: "Result\nMEDIA: ./image.png" },
@@ -139,6 +171,16 @@ describe("createBlockReplyDeliveryHandler", () => {
});
});
it("does not mark plain replies as explicit reply_to_current opt-outs", () => {
const normalized = normalizeReplyPayloadDirectives({
payload: { text: "plain reply" },
trimLeadingWhitespace: true,
parseMode: "auto",
});
expect(normalized.payload.replyToCurrent).toBeUndefined();
});
it("passes normalized media block replies through media path normalization", async () => {
const blockReplyPipeline = {
enqueue: vi.fn(),
@@ -169,7 +211,7 @@ describe("createBlockReplyDeliveryHandler", () => {
mediaUrl: absPath,
mediaUrls: [absPath],
replyToId: undefined,
replyToCurrent: false,
replyToCurrent: undefined,
replyToTag: false,
audioAsVoice: false,
});

View File

@@ -2,7 +2,7 @@ import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-pay
import { logVerbose } from "../../globals.js";
import { getReplyPayloadMetadata, setReplyPayloadMetadata } from "../reply-payload.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { BlockReplyContext, ReplyPayload } from "../types.js";
import type { BlockReplyContext, ReplyPayload, ReplyThreadingPolicy } from "../types.js";
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
import { createBlockReplyContentKey } from "./block-reply-pipeline.js";
import { parseReplyDirectives } from "./reply-directives.js";
@@ -77,6 +77,7 @@ async function sendDirectBlockReply(params: {
export function createBlockReplyDeliveryHandler(params: {
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise<void> | void;
currentMessageId?: string;
replyThreading?: ReplyThreadingPolicy;
normalizeStreamingText: (payload: ReplyPayload) => { text?: string; skip: boolean };
applyReplyToMode: (payload: ReplyPayload) => ReplyPayload;
normalizeMediaPaths?: (payload: ReplyPayload) => Promise<ReplyPayload>;
@@ -91,6 +92,13 @@ export function createBlockReplyDeliveryHandler(params: {
return;
}
const implicitCurrentMessageAllowed =
payload.replyToCurrent === true
? true
: payload.replyToCurrent === false
? false
: params.replyThreading?.implicitCurrentMessage !== "deny";
const taggedPayload = applyReplyTagsToPayload(
{
...payload,
@@ -98,7 +106,7 @@ export function createBlockReplyDeliveryHandler(params: {
mediaUrl: payload.mediaUrl ?? payload.mediaUrls?.[0],
replyToId:
payload.replyToId ??
(payload.replyToCurrent === false ? undefined : params.currentMessageId),
(implicitCurrentMessageAllowed ? params.currentMessageId : undefined),
},
params.currentMessageId,
);

View File

@@ -7,7 +7,7 @@ export type ReplyDirectiveParseResult = {
mediaUrls?: string[];
mediaUrl?: string;
replyToId?: string;
replyToCurrent: boolean;
replyToCurrent?: boolean;
replyToTag: boolean;
audioAsVoice?: boolean;
isSilent: boolean;
@@ -41,7 +41,7 @@ export function parseReplyDirectives(
mediaUrls: split.mediaUrls,
mediaUrl: split.mediaUrl,
replyToId: replyParsed.replyToId,
replyToCurrent: replyParsed.replyToCurrent,
replyToCurrent: replyParsed.replyToCurrent || undefined,
replyToTag: replyParsed.hasReplyTag,
audioAsVoice: split.audioAsVoice,
isSilent,

View File

@@ -101,6 +101,39 @@ describe("resolveReplyToMode", () => {
),
).toBe("first");
});
it("uses registered channel threading adapters for runtime reply-mode resolution", () => {
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "whatsapp",
source: "test",
plugin: {
id: "whatsapp",
meta: {
id: "whatsapp",
label: "WhatsApp",
selectionLabel: "WhatsApp",
docsPath: "/channels/whatsapp",
blurb: "test stub.",
},
capabilities: { chatTypes: ["direct", "group"] },
config: {
listAccountIds: () => ["default"],
resolveAccount: () => ({}),
},
threading: {
resolveReplyToMode: ({ accountId }: { accountId?: string | null }) =>
accountId === "work" ? "first" : "all",
},
},
},
]),
);
expect(resolveReplyToMode({} as OpenClawConfig, "whatsapp", "work", "group")).toBe("first");
expect(resolveReplyToMode({} as OpenClawConfig, "whatsapp", "default", "group")).toBe("all");
});
});
describe("resolveConfiguredReplyToMode", () => {

View File

@@ -1,3 +1,4 @@
import { getChannelPlugin } from "../../channels/plugins/index.js";
import type { ChannelThreadingAdapter } from "../../channels/plugins/types.core.js";
import { normalizeAnyChannelId } from "../../channels/registry.js";
import type { ReplyToMode } from "../../config/types.js";
@@ -74,8 +75,17 @@ export function resolveReplyToMode(
accountId?: string | null,
chatType?: string | null,
): ReplyToMode {
void accountId;
return resolveConfiguredReplyToMode(cfg, channel, chatType);
const normalizedAccountId = normalizeOptionalLowercaseString(accountId);
if (!normalizedAccountId) {
return resolveConfiguredReplyToMode(cfg, channel, chatType);
}
const provider = normalizeAnyChannelId(channel) ?? normalizeOptionalLowercaseString(channel);
const threading = provider ? getChannelPlugin(provider)?.threading : undefined;
return resolveReplyToModeWithThreading(cfg, threading, {
channel,
accountId: normalizedAccountId,
chatType,
});
}
export function createReplyToModeFilter(

View File

@@ -171,7 +171,8 @@ export function createStreamingDirectiveAccumulator() {
const parsed = parseChunk(combined, { silentToken: options.silentToken });
const hasTag = activeReply.hasTag || pendingReply.hasTag || parsed.replyToTag;
const sawCurrent = activeReply.sawCurrent || pendingReply.sawCurrent || parsed.replyToCurrent;
const sawCurrent =
activeReply.sawCurrent || pendingReply.sawCurrent || parsed.replyToCurrent === true;
const explicitId =
parsed.replyToExplicitId ?? pendingReply.explicitId ?? activeReply.explicitId;

View File

@@ -15227,6 +15227,26 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
minimum: 0,
maximum: 9007199254740991,
},
replyToMode: {
anyOf: [
{
type: "string",
const: "off",
},
{
type: "string",
const: "first",
},
{
type: "string",
const: "all",
},
{
type: "string",
const: "batched",
},
],
},
heartbeat: {
type: "object",
properties: {
@@ -15495,6 +15515,26 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
minimum: 0,
maximum: 9007199254740991,
},
replyToMode: {
anyOf: [
{
type: "string",
const: "off",
},
{
type: "string",
const: "first",
},
{
type: "string",
const: "all",
},
{
type: "string",
const: "batched",
},
],
},
heartbeat: {
type: "object",
properties: {

View File

@@ -5,6 +5,7 @@ import type {
DmPolicy,
GroupPolicy,
MarkdownConfig,
ReplyToMode,
} from "./types.base.js";
import type {
ChannelHealthMonitorConfig,
@@ -102,6 +103,8 @@ type WhatsAppSharedConfig = {
reactionLevel?: WhatsAppReactionLevel;
/** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */
debounceMs?: number;
/** Reply threading mode for auto-replies (off|first|all|batched). */
replyToMode?: ReplyToMode;
/** Heartbeat visibility settings. */
heartbeat?: ChannelHeartbeatVisibilityConfig;
/** Channel health monitor overrides for this channel/account. */

View File

@@ -13,6 +13,7 @@ import {
DmPolicySchema,
GroupPolicySchema,
MarkdownConfigSchema,
ReplyToModeSchema,
} from "./zod-schema.core.js";
const ToolPolicyBySenderSchema = z.record(z.string(), ToolPolicySchema).optional();
@@ -81,6 +82,7 @@ function buildWhatsAppCommonShape(params: { useDefaults: boolean }) {
debounceMs: params.useDefaults
? z.number().int().nonnegative().optional().default(0)
: z.number().int().nonnegative().optional(),
replyToMode: ReplyToModeSchema.optional(),
heartbeat: ChannelHeartbeatVisibilitySchema,
healthMonitor: ChannelHealthMonitorSchema,
};

View File

@@ -95,6 +95,8 @@ export const SendParamsSchema = Type.Object(
accountId: Type.Optional(Type.String()),
/** Optional agent id for per-agent media root resolution on gateway sends. */
agentId: Type.Optional(Type.String()),
/** Reply target message id for native quoted/threaded sends where supported. */
replyToId: Type.Optional(Type.String()),
/** Thread id (channel-specific meaning, e.g. Telegram forum topic id). */
threadId: Type.Optional(Type.String()),
/** Optional session key for mirroring delivered output back into the transcript. */

View File

@@ -1006,6 +1006,42 @@ describe("gateway send mirroring", () => {
);
});
it("forwards replyToId on gateway sends", async () => {
mocks.resolveOutboundTarget.mockReturnValue({ ok: true, to: "123" });
mocks.deliverOutboundPayloads.mockResolvedValue([{ messageId: "m-reply", channel: "slack" }]);
const outboundPlugin = { outbound: { sendPoll: mocks.sendPoll } };
mocks.getChannelPlugin.mockReturnValue(outboundPlugin);
const { respond } = await runSend({
to: "123",
message: "threaded completion",
channel: "slack",
replyToId: "wamid.42",
idempotencyKey: "idem-reply-to",
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
channel: "slack",
to: "123",
replyToId: "wamid.42",
}),
);
expect(mocks.resolveOutboundSessionRoute).toHaveBeenCalledWith(
expect.objectContaining({
channel: "slack",
target: "123",
replyToId: "wamid.42",
}),
);
expect(respond).toHaveBeenCalledWith(
true,
expect.objectContaining({ messageId: "m-reply" }),
undefined,
expect.objectContaining({ channel: "slack" }),
);
});
it("dispatches message actions through the gateway for plugin-owned channels", async () => {
const reactPlugin: ChannelPlugin = {
id: "whatsapp",

View File

@@ -393,6 +393,7 @@ export const sendHandlers: GatewayRequestHandlers = {
channel?: string;
accountId?: string;
agentId?: string;
replyToId?: string;
threadId?: string;
sessionKey?: string;
idempotencyKey: string;
@@ -430,6 +431,7 @@ export const sendHandlers: GatewayRequestHandlers = {
}
const { cfg, channel } = resolvedChannel;
const accountId = normalizeOptionalString(request.accountId);
const replyToId = normalizeOptionalString(request.replyToId);
const threadId = normalizeOptionalString(request.threadId);
const outboundChannel = channel;
const plugin = resolveOutboundChannelPlugin({ channel, cfg });
@@ -485,6 +487,7 @@ export const sendHandlers: GatewayRequestHandlers = {
target: deliveryTarget,
currentSessionKey: providedSessionKey,
resolvedTarget: idLikeTarget,
replyToId,
threadId,
});
const providedSessionBaseKey =
@@ -530,6 +533,7 @@ export const sendHandlers: GatewayRequestHandlers = {
to: deliveryTarget,
accountId,
payloads: outboundPayloads,
replyToId: replyToId ?? null,
session: outboundSession,
gifPlayback: request.gifPlayback,
threadId: outboundRoute?.threadId ?? threadId ?? null,

View File

@@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import {
prepareOutboundMirrorRoute,
resolveAndApplyOutboundReplyToId,
resolveAndApplyOutboundThreadId,
} from "./message-action-threading.js";
@@ -164,4 +165,147 @@ describe("message action threading helpers", () => {
expect(resolved).toBe("thread-777");
expect(actionParams.threadId).toBe("thread-777");
});
it("inherits currentMessageId for same-target sends when replyToMode=all", () => {
const actionParams: Record<string, unknown> = {
channel: "workspace",
target: "channel:C123",
message: "hi",
};
const resolved = resolveAndApplyOutboundReplyToId(actionParams, {
channel: "workspace",
toolContext: {
currentChannelId: "channel:C123",
currentMessageId: "msg-42",
replyToMode: "all",
},
});
expect(resolved).toBe("msg-42");
expect(actionParams.replyTo).toBe("msg-42");
});
it("skips inherited reply threading for batched mode", () => {
const actionParams: Record<string, unknown> = {
channel: "workspace",
target: "channel:C123",
message: "hi",
};
const resolved = resolveAndApplyOutboundReplyToId(actionParams, {
channel: "workspace",
toolContext: {
currentChannelId: "channel:C123",
currentMessageId: "msg-42",
replyToMode: "batched",
},
});
expect(resolved).toBeUndefined();
expect(actionParams.replyTo).toBeUndefined();
});
it("consumes first-mode inherited reply threading only once", () => {
const actionParams: Record<string, unknown> = {
channel: "workspace",
target: "channel:C123",
message: "hi",
};
const hasRepliedRef = { value: false };
const firstResolved = resolveAndApplyOutboundReplyToId(actionParams, {
channel: "workspace",
toolContext: {
currentChannelId: "channel:C123",
currentMessageId: "msg-42",
replyToMode: "first",
hasRepliedRef,
},
});
const secondResolved = resolveAndApplyOutboundReplyToId(
{
channel: "workspace",
target: "channel:C123",
message: "followup",
},
{
channel: "workspace",
toolContext: {
currentChannelId: "channel:C123",
currentMessageId: "msg-42",
replyToMode: "first",
hasRepliedRef,
},
},
);
expect(firstResolved).toBe("msg-42");
expect(secondResolved).toBeUndefined();
expect(hasRepliedRef.value).toBe(true);
});
it("consumes first-mode when the first send uses an explicit replyTo", () => {
const hasRepliedRef = { value: false };
const explicitResolved = resolveAndApplyOutboundReplyToId(
{
channel: "workspace",
target: "channel:C123",
message: "first",
replyTo: "explicit-1",
},
{
channel: "workspace",
toolContext: {
currentChannelId: "channel:C123",
currentMessageId: "msg-42",
replyToMode: "first",
hasRepliedRef,
},
},
);
const inheritedResolved = resolveAndApplyOutboundReplyToId(
{
channel: "workspace",
target: "channel:C123",
message: "followup",
},
{
channel: "workspace",
toolContext: {
currentChannelId: "channel:C123",
currentMessageId: "msg-42",
replyToMode: "first",
hasRepliedRef,
},
},
);
expect(explicitResolved).toBe("explicit-1");
expect(inheritedResolved).toBeUndefined();
expect(hasRepliedRef.value).toBe(true);
});
it("does not inherit reply threading across providers even when target ids match", () => {
const actionParams: Record<string, unknown> = {
channel: "discord",
target: "channel:C123",
message: "hi",
};
const resolved = resolveAndApplyOutboundReplyToId(actionParams, {
channel: "discord",
toolContext: {
currentChannelId: "channel:C123",
currentChannelProvider: "slack",
currentMessageId: "msg-42",
replyToMode: "all",
},
});
expect(resolved).toBeUndefined();
expect(actionParams.replyTo).toBeUndefined();
});
});

View File

@@ -59,6 +59,7 @@ import {
} from "./message-action-params.js";
import {
prepareOutboundMirrorRoute,
resolveAndApplyOutboundReplyToId,
resolveAndApplyOutboundThreadId,
} from "./message-action-threading.js";
import type { MessagePollResult, MessageSendResult } from "./message.js";
@@ -567,7 +568,10 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
const bestEffort = readBooleanParam(params, "bestEffort");
const silent = readBooleanParam(params, "silent");
const replyToId = readStringParam(params, "replyTo");
const replyToId = resolveAndApplyOutboundReplyToId(params, {
channel,
toolContext: input.toolContext,
});
const { resolvedThreadId, outboundRoute } = await prepareOutboundMirrorRoute({
cfg,
channel,

View File

@@ -39,7 +39,82 @@ function resolveOutboundThreadId(
}
export function createOutboundThreadingMock() {
const resolveOutboundReplyToId = vi.fn(
(
actionParams: Record<string, unknown>,
context: {
channel: string;
toolContext?: {
currentChannelId?: string;
currentChannelProvider?: string;
currentMessageId?: string | number;
replyToMode?: "off" | "first" | "all" | "batched";
hasRepliedRef?: { value: boolean };
};
},
) => {
const explicitReplyTo =
typeof actionParams.replyTo === "string" ? actionParams.replyTo.trim() : "";
if (explicitReplyTo) {
if (context.toolContext?.replyToMode === "first" && context.toolContext.hasRepliedRef) {
context.toolContext.hasRepliedRef.value = true;
}
return explicitReplyTo;
}
const currentChannelId = context.toolContext?.currentChannelId?.trim();
const currentChannelProvider = context.toolContext?.currentChannelProvider?.trim();
if (
!currentChannelId ||
(currentChannelProvider && currentChannelProvider !== context.channel)
) {
return undefined;
}
const explicitTarget =
typeof actionParams.target === "string"
? actionParams.target
: typeof actionParams.to === "string"
? actionParams.to
: typeof actionParams.channelId === "string"
? actionParams.channelId
: undefined;
if (explicitTarget && explicitTarget.trim() !== currentChannelId) {
return undefined;
}
const currentMessageId = context.toolContext?.currentMessageId;
if (currentMessageId == null) {
return undefined;
}
const replyToMode = context.toolContext?.replyToMode ?? "off";
if (replyToMode === "off" || replyToMode === "batched") {
return undefined;
}
if (replyToMode === "first") {
const hasRepliedRef = context.toolContext?.hasRepliedRef;
if (hasRepliedRef?.value) {
return undefined;
}
if (hasRepliedRef) {
hasRepliedRef.value = true;
}
}
const resolvedReplyToId =
typeof currentMessageId === "number" ? String(currentMessageId) : currentMessageId.trim();
if (!resolvedReplyToId) {
return undefined;
}
actionParams.replyTo = resolvedReplyToId;
return resolvedReplyToId;
},
);
return {
resolveAndApplyOutboundReplyToId: resolveOutboundReplyToId,
resolveAndApplyOutboundThreadId: vi.fn(resolveOutboundThreadId),
prepareOutboundMirrorRoute: vi.fn(
async ({

View File

@@ -39,6 +39,79 @@ export function resolveAndApplyOutboundThreadId(
return resolved ?? undefined;
}
function isSameConversationTarget(
actionParams: Record<string, unknown>,
channel: ChannelId,
toolContext?: ChannelThreadingToolContext,
): boolean {
const currentChannelId = toolContext?.currentChannelId?.trim();
if (!currentChannelId) {
return false;
}
const currentChannelProvider = toolContext?.currentChannelProvider?.trim();
if (currentChannelProvider && currentChannelProvider !== channel) {
return false;
}
const explicitTarget =
readStringParam(actionParams, "target") ??
readStringParam(actionParams, "to") ??
readStringParam(actionParams, "channelId");
if (!explicitTarget) {
return true;
}
return explicitTarget.trim() === currentChannelId;
}
export function resolveAndApplyOutboundReplyToId(
actionParams: Record<string, unknown>,
context: {
channel: ChannelId;
toolContext?: ChannelThreadingToolContext;
},
): string | undefined {
const explicitReplyToId = readStringParam(actionParams, "replyTo");
if (explicitReplyToId) {
if (context.toolContext?.replyToMode === "first") {
const hasRepliedRef = context.toolContext.hasRepliedRef;
if (hasRepliedRef) {
hasRepliedRef.value = true;
}
}
return explicitReplyToId;
}
if (!isSameConversationTarget(actionParams, context.channel, context.toolContext)) {
return undefined;
}
const currentMessageId = context.toolContext?.currentMessageId;
if (currentMessageId == null) {
return undefined;
}
const mode = context.toolContext?.replyToMode ?? "off";
if (mode === "off" || mode === "batched") {
return undefined;
}
if (mode === "first") {
const hasRepliedRef = context.toolContext?.hasRepliedRef;
if (hasRepliedRef?.value) {
return undefined;
}
if (hasRepliedRef) {
hasRepliedRef.value = true;
}
}
const resolvedReplyToId =
typeof currentMessageId === "number" ? String(currentMessageId) : currentMessageId.trim();
if (!resolvedReplyToId) {
return undefined;
}
actionParams.replyTo = resolvedReplyToId;
return resolvedReplyToId;
}
export async function prepareOutboundMirrorRoute(params: {
cfg: OpenClawConfig;
channel: ChannelId;

View File

@@ -310,6 +310,17 @@ describe("gateway url override hardening", () => {
},
},
},
{
name: "forwards replyToId in gateway send params",
params: {
replyToId: "wamid.42",
},
expected: {
params: {
replyToId: "wamid.42",
},
},
},
])("$name", async ({ params, expected }) => {
expect(await sendThreadChatGatewayMessage(params)).toMatchObject(expected);
});

View File

@@ -331,6 +331,7 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
accountId: params.accountId,
agentId: params.agentId,
channel,
replyToId: params.replyToId,
sessionKey: params.mirror?.sessionKey,
idempotencyKey: await resolveGatewayIdempotencyKey(params.idempotencyKey),
},

View File

@@ -47,7 +47,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
mediaUrls: ["https://x.test/a.png", "https://x.test/b.png"],
replyToId: "123",
replyToTag: true,
replyToCurrent: false,
replyToCurrent: undefined,
audioAsVoice: true,
},
]);
@@ -66,7 +66,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
mediaUrls: undefined,
mediaUrl: undefined,
replyToId: undefined,
replyToCurrent: false,
replyToCurrent: undefined,
replyToTag: false,
audioAsVoice: false,
},
@@ -100,7 +100,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
mediaUrls: undefined,
mediaUrl: undefined,
replyToId: undefined,
replyToCurrent: false,
replyToCurrent: undefined,
replyToTag: false,
audioAsVoice: false,
},
@@ -125,7 +125,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
mediaUrls: undefined,
mediaUrl: undefined,
replyToId: undefined,
replyToCurrent: false,
replyToCurrent: undefined,
replyToTag: false,
audioAsVoice: false,
},
@@ -145,7 +145,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
mediaUrls: undefined,
mediaUrl: undefined,
replyToId: undefined,
replyToCurrent: false,
replyToCurrent: undefined,
replyToTag: false,
audioAsVoice: false,
},
@@ -154,7 +154,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
mediaUrls: undefined,
mediaUrl: undefined,
replyToId: undefined,
replyToCurrent: false,
replyToCurrent: undefined,
replyToTag: false,
audioAsVoice: false,
},
@@ -173,7 +173,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
mediaUrls: ["https://x.test/one.png"],
mediaUrl: "https://x.test/one.png",
replyToId: undefined,
replyToCurrent: false,
replyToCurrent: undefined,
replyToTag: false,
audioAsVoice: false,
},
@@ -182,7 +182,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
mediaUrls: ["https://x.test/two.png"],
mediaUrl: undefined,
replyToId: undefined,
replyToCurrent: false,
replyToCurrent: undefined,
replyToTag: false,
audioAsVoice: false,
},
@@ -392,7 +392,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
"audioAsVoice": false,
"mediaUrl": undefined,
"mediaUrls": undefined,
"replyToCurrent": false,
"replyToCurrent": undefined,
"replyToId": undefined,
"replyToTag": false,
"text": "NO_REPLY with details",
@@ -401,7 +401,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
"audioAsVoice": false,
"mediaUrl": undefined,
"mediaUrls": undefined,
"replyToCurrent": false,
"replyToCurrent": undefined,
"replyToId": undefined,
"replyToTag": false,
"text": "{"action":"NO_REPLY","note":"keep"}",
@@ -412,7 +412,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
"mediaUrls": [
"https://x.test/m1.png",
],
"replyToCurrent": false,
"replyToCurrent": undefined,
"replyToId": undefined,
"replyToTag": false,
"text": "",
@@ -423,7 +423,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
"mediaUrls": [
"https://x.test/m2.png",
],
"replyToCurrent": false,
"replyToCurrent": undefined,
"replyToId": "444",
"replyToTag": true,
"text": "hi",
@@ -435,7 +435,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
},
"mediaUrl": undefined,
"mediaUrls": undefined,
"replyToCurrent": false,
"replyToCurrent": undefined,
"replyToId": undefined,
"replyToTag": false,
"text": "BTW
@@ -450,7 +450,7 @@ describe("normalizeReplyPayloadsForDelivery", () => {
},
"mediaUrl": undefined,
"mediaUrls": undefined,
"replyToCurrent": false,
"replyToCurrent": undefined,
"replyToId": undefined,
"replyToTag": false,
"text": "",