fix(whatsapp): remove ack reactions after replies

This commit is contained in:
Peter Steinberger
2026-04-26 05:35:24 +01:00
parent 427e485f76
commit 9b93b7df62
16 changed files with 329 additions and 32 deletions

View File

@@ -71,7 +71,6 @@ const expectAckReactionSent = (accountId: string) => {
expect.objectContaining({
verbose: false,
fromMe: false,
participant: undefined,
accountId,
}),
);
@@ -85,24 +84,27 @@ describe("maybeSendAckReaction", () => {
it.each(["ack", "minimal", "extensive"] as const)(
"sends ack reactions when reactionLevel is %s",
async (reactionLevel) => {
await runAckReaction({
const ackReaction = await runAckReaction({
cfg: createConfig(reactionLevel),
});
expect(ackReaction?.ackReactionValue).toBe("👀");
await expect(ackReaction?.ackReactionPromise).resolves.toBe(true);
expectAckReactionSent("default");
},
);
it("suppresses ack reactions when reactionLevel is off", async () => {
await runAckReaction({
const ackReaction = await runAckReaction({
cfg: createConfig("off"),
});
expect(ackReaction).toBeNull();
expect(hoisted.sendReactionWhatsApp).not.toHaveBeenCalled();
});
it("uses the active account reactionLevel override for ack gating", async () => {
await runAckReaction({
const ackReaction = await runAckReaction({
cfg: createConfig("off", {
accounts: {
work: {
@@ -117,6 +119,41 @@ describe("maybeSendAckReaction", () => {
accountId: "work",
});
expect(ackReaction?.ackReactionValue).toBe("👀");
expectAckReactionSent("work");
});
it("returns a handle that removes the ack with an empty reaction", async () => {
const ackReaction = await runAckReaction();
await ackReaction?.remove();
expect(hoisted.sendReactionWhatsApp).toHaveBeenLastCalledWith(
"15551234567@s.whatsapp.net",
"msg-1",
"",
expect.objectContaining({
verbose: false,
fromMe: false,
accountId: "default",
}),
);
});
it("records ack send failures on the handle", async () => {
const warn = vi.fn();
hoisted.sendReactionWhatsApp.mockRejectedValueOnce(new Error("session down"));
const ackReaction = await runAckReaction({ warn });
await expect(ackReaction?.ackReactionPromise).resolves.toBe(false);
expect(warn).toHaveBeenCalledWith(
expect.objectContaining({
error: "session down",
chatId: "15551234567@s.whatsapp.net",
messageId: "msg-1",
}),
"failed to send ack reaction",
);
});
});

View File

@@ -1,4 +1,8 @@
import { shouldAckReactionForWhatsApp } from "openclaw/plugin-sdk/channel-feedback";
import {
createAckReactionHandle,
shouldAckReactionForWhatsApp,
type AckReactionHandle,
} from "openclaw/plugin-sdk/channel-feedback";
import type { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { getSenderIdentity } from "../../identity.js";
@@ -18,9 +22,9 @@ export async function maybeSendAckReaction(params: {
accountId?: string;
info: (obj: unknown, msg: string) => void;
warn: (obj: unknown, msg: string) => void;
}) {
}): Promise<AckReactionHandle | null> {
if (!params.msg.id) {
return;
return null;
}
// Keep ackReaction as the emoji/scope control, while letting reactionLevel
@@ -30,7 +34,7 @@ export async function maybeSendAckReaction(params: {
accountId: params.accountId,
});
if (reactionLevel.level === "off") {
return;
return null;
}
const ackConfig = params.cfg.channels?.whatsapp?.ackReaction;
@@ -61,7 +65,7 @@ export async function maybeSendAckReaction(params: {
});
if (!shouldSendReaction()) {
return;
return null;
}
params.info(
@@ -69,21 +73,27 @@ export async function maybeSendAckReaction(params: {
"sending ack reaction",
);
const sender = getSenderIdentity(params.msg);
sendReactionWhatsApp(params.msg.chatId, params.msg.id, emoji, {
const reactionOptions = {
verbose: params.verbose,
fromMe: false,
participant: sender.jid ?? undefined,
accountId: params.accountId,
...(sender.jid ? { participant: sender.jid } : {}),
...(params.accountId ? { accountId: params.accountId } : {}),
cfg: params.cfg,
}).catch((err) => {
params.warn(
{
error: formatError(err),
chatId: params.msg.chatId,
messageId: params.msg.id,
},
"failed to send ack reaction",
);
logVerbose(`WhatsApp ack reaction failed for chat ${params.msg.chatId}: ${formatError(err)}`);
};
return createAckReactionHandle({
ackReactionValue: emoji,
send: () => sendReactionWhatsApp(params.msg.chatId, params.msg.id!, emoji, reactionOptions),
remove: () => sendReactionWhatsApp(params.msg.chatId, params.msg.id!, "", reactionOptions),
onSendError: (err) => {
params.warn(
{
error: formatError(err),
chatId: params.msg.chatId,
messageId: params.msg.id,
},
"failed to send ack reaction",
);
logVerbose(`WhatsApp ack reaction failed for chat ${params.msg.chatId}: ${formatError(err)}`);
},
});
}

View File

@@ -1,3 +1,4 @@
import type { AckReactionHandle } from "openclaw/plugin-sdk/channel-feedback";
import type { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import type { resolveAgentRoute } from "openclaw/plugin-sdk/routing";
import { buildAgentSessionKey, deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing";
@@ -61,10 +62,12 @@ export async function maybeBroadcastMessage(params: {
suppressGroupHistoryClear?: boolean;
preflightAudioTranscript?: string | null;
ackAlreadySent?: boolean;
ackReaction?: AckReactionHandle | null;
},
) => Promise<boolean>;
preflightAudioTranscript?: string | null;
ackAlreadySent?: boolean;
ackReaction?: AckReactionHandle | null;
}) {
const broadcastAgents = params.cfg.broadcast?.[params.peerId];
if (!broadcastAgents || !Array.isArray(broadcastAgents)) {
@@ -113,6 +116,7 @@ export async function maybeBroadcastMessage(params: {
suppressGroupHistoryClear: true;
preflightAudioTranscript?: string | null;
ackAlreadySent?: boolean;
ackReaction?: AckReactionHandle | null;
} = {
groupHistory: groupHistorySnapshot,
suppressGroupHistoryClear: true,
@@ -123,6 +127,9 @@ export async function maybeBroadcastMessage(params: {
if (params.ackAlreadySent === true) {
opts.ackAlreadySent = true;
}
if (params.ackReaction !== undefined) {
opts.ackReaction = params.ackReaction;
}
return await params.processMessage(params.msg, agentRoute, params.groupHistoryKey, opts);
} catch (err) {
whatsappInboundLog.error(`Broadcast agent ${agentId} failed: ${formatError(err)}`);

View File

@@ -5,6 +5,11 @@ const transcribeFirstAudioMock = vi.fn();
const maybeSendAckReactionMock = vi.fn();
const processMessageMock = vi.fn();
const maybeBroadcastMessageMock = vi.fn();
const ackReactionHandle = {
ackReactionPromise: Promise.resolve(true),
ackReactionValue: "👀",
remove: vi.fn(async () => undefined),
};
vi.mock("./audio-preflight.runtime.js", () => ({
transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args),
@@ -113,6 +118,7 @@ describe("createWebOnMessageHandler audio preflight", () => {
maybeSendAckReactionMock.mockReset();
maybeSendAckReactionMock.mockImplementation(async () => {
events.push("ack");
return ackReactionHandle;
});
transcribeFirstAudioMock.mockReset();
transcribeFirstAudioMock.mockImplementation(async () => {
@@ -158,12 +164,12 @@ describe("createWebOnMessageHandler audio preflight", () => {
expect.objectContaining({
preflightAudioTranscript: "transcribed voice note",
ackAlreadySent: true,
ackReaction: ackReactionHandle,
}),
);
});
it("skips early DM ack/preflight when access-control was not explicitly passed through", async () => {
const handler = createWebOnMessageHandler({
cfg: {
channels: {
@@ -206,9 +212,14 @@ describe("createWebOnMessageHandler audio preflight", () => {
it("preserves per-agent ack checks for group broadcast voice notes", async () => {
maybeBroadcastMessageMock.mockImplementation(
async (params: { ackAlreadySent?: boolean; preflightAudioTranscript?: string | null }) => {
async (params: {
ackAlreadySent?: boolean;
ackReaction?: unknown;
preflightAudioTranscript?: string | null;
}) => {
expect(params.preflightAudioTranscript).toBe("transcribed voice note");
expect(params.ackAlreadySent).toBeUndefined();
expect(params.ackReaction).toBeUndefined();
return true;
},
);

View File

@@ -1,3 +1,4 @@
import type { AckReactionHandle } from "openclaw/plugin-sdk/channel-feedback";
import type { getReplyFromConfig } from "openclaw/plugin-sdk/reply-runtime";
import type { MsgContext } from "openclaw/plugin-sdk/reply-runtime";
import { resolveAgentRoute } from "openclaw/plugin-sdk/routing";
@@ -42,6 +43,7 @@ export function createWebOnMessageHandler(params: {
suppressGroupHistoryClear?: boolean;
preflightAudioTranscript?: string | null;
ackAlreadySent?: boolean;
ackReaction?: AckReactionHandle | null;
},
) => {
const processParams: Parameters<typeof processMessage>[0] = {
@@ -74,6 +76,9 @@ export function createWebOnMessageHandler(params: {
if (opts?.ackAlreadySent === true) {
processParams.ackAlreadySent = true;
}
if (opts?.ackReaction !== undefined) {
processParams.ackReaction = opts.ackReaction;
}
return processMessage(processParams);
};
@@ -186,8 +191,9 @@ export function createWebOnMessageHandler(params: {
msg.mediaType?.startsWith("audio/") === true && msg.body === "<media:audio>";
const canRunEarlyDmPreflight = msg.chatType === "group" || msg.accessControlPassed === true;
let ackAlreadySent = false;
let ackReaction: AckReactionHandle | null = null;
if (canRunEarlyDmPreflight && hasAudioBody && msg.mediaPath) {
await maybeSendAckReaction({
ackReaction = await maybeSendAckReaction({
cfg: params.cfg,
msg,
agentId: route.agentId,
@@ -198,7 +204,7 @@ export function createWebOnMessageHandler(params: {
info: params.replyLogger.info.bind(params.replyLogger),
warn: params.replyLogger.warn.bind(params.replyLogger),
});
ackAlreadySent = true;
ackAlreadySent = ackReaction !== null;
try {
const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js");
// transcribeFirstAudio returns undefined on failure/disabled; store null so
@@ -232,6 +238,7 @@ export function createWebOnMessageHandler(params: {
// preflight ack attempt on the base route must not suppress downstream
// per-agent checks during broadcast fan-out.
...(ackAlreadySent && msg.chatType !== "group" ? { ackAlreadySent: true } : {}),
...(ackReaction && msg.chatType !== "group" ? { ackReaction } : {}),
processMessage: (m, r, k, opts) => processForRoute(m, r, k, opts),
})
) {
@@ -241,6 +248,7 @@ export function createWebOnMessageHandler(params: {
await processForRoute(msg, route, groupHistoryKey, {
...(preflightAudioTranscript !== undefined ? { preflightAudioTranscript } : {}),
...(ackAlreadySent ? { ackAlreadySent: true } : {}),
...(ackReaction ? { ackReaction } : {}),
});
};
}

View File

@@ -117,6 +117,11 @@ import { processMessage } from "./process-message.js";
type WebInboundMsg = Parameters<typeof processMessage>[0]["msg"];
type TestRoute = Parameters<typeof processMessage>[0]["route"];
const flushMicrotasks = async () => {
await Promise.resolve();
await Promise.resolve();
};
function makeAudioMsg(overrides: Partial<WebInboundMsg> = {}): WebInboundMsg {
return {
id: "msg-1",
@@ -172,11 +177,32 @@ function makeParams(msgOverrides: Partial<WebInboundMsg> = {}) {
};
}
function makeAckReactionHandle() {
return {
ackReactionPromise: Promise.resolve(true),
ackReactionValue: "👀",
remove: vi.fn(async () => undefined),
};
}
function makeRemoveAckAfterReplyParams() {
return {
...makeParams(),
cfg: {
tools: { media: { audio: { enabled: true } } },
channels: { whatsapp: {} },
commands: { useAccessGroups: false },
messages: { removeAckAfterReply: true },
} as never,
preflightAudioTranscript: "pre-computed transcript from caller",
};
}
describe("processMessage audio preflight transcription", () => {
beforeEach(() => {
transcribeFirstAudioMock.mockReset();
maybeSendAckReactionMock.mockReset();
maybeSendAckReactionMock.mockResolvedValue(undefined);
maybeSendAckReactionMock.mockResolvedValue(null);
shouldComputeCommandResult = false;
shouldComputeCommandBodies = [];
vi.mocked(dispatchWhatsAppBufferedReply).mockClear();
@@ -317,11 +343,59 @@ describe("processMessage audio preflight transcription", () => {
...makeParams(),
preflightAudioTranscript: "pre-computed transcript from caller",
ackAlreadySent: true,
ackReaction: makeAckReactionHandle(),
});
expect(maybeSendAckReactionMock).not.toHaveBeenCalled();
});
it("removes caller-provided ack after a successful visible reply", async () => {
const ackReaction = makeAckReactionHandle();
await processMessage({
...makeRemoveAckAfterReplyParams(),
ackReaction,
});
await flushMicrotasks();
expect(ackReaction.remove).toHaveBeenCalledTimes(1);
});
it("removes internally sent ack after a successful visible reply", async () => {
const ackReaction = makeAckReactionHandle();
maybeSendAckReactionMock.mockResolvedValueOnce(ackReaction);
await processMessage(makeRemoveAckAfterReplyParams());
await flushMicrotasks();
expect(maybeSendAckReactionMock).toHaveBeenCalledTimes(1);
expect(ackReaction.remove).toHaveBeenCalledTimes(1);
});
it("keeps ack when no visible reply was delivered", async () => {
const ackReaction = makeAckReactionHandle();
maybeSendAckReactionMock.mockResolvedValueOnce(ackReaction);
vi.mocked(dispatchWhatsAppBufferedReply).mockResolvedValueOnce(false);
await processMessage(makeRemoveAckAfterReplyParams());
await flushMicrotasks();
expect(ackReaction.remove).not.toHaveBeenCalled();
});
it("keeps ack when the ack send failed", async () => {
const ackReaction = {
...makeAckReactionHandle(),
ackReactionPromise: Promise.resolve(false),
};
maybeSendAckReactionMock.mockResolvedValueOnce(ackReaction);
await processMessage(makeRemoveAckAfterReplyParams());
await flushMicrotasks();
expect(ackReaction.remove).not.toHaveBeenCalled();
});
it("skips internal STT when preflightAudioTranscript is null (failed preflight sentinel)", async () => {
// null = caller already attempted preflight but got nothing (provider unavailable,
// disabled, etc.). processMessage must NOT retry to avoid 1+N attempts in broadcast.

View File

@@ -1,3 +1,8 @@
import {
logAckFailure,
removeAckReactionHandleAfterReply,
type AckReactionHandle,
} from "openclaw/plugin-sdk/channel-feedback";
import {
createInternalHookEvent,
deriveInboundMessageHookContext,
@@ -192,6 +197,7 @@ export async function processMessage(params: {
groupHistory?: GroupHistoryEntry[];
suppressGroupHistoryClear?: boolean;
ackAlreadySent?: boolean;
ackReaction?: AckReactionHandle | null;
/** Pre-computed audio transcript from a caller-level preflight, used to avoid
* re-transcribing the same voice note once per broadcast agent.
* - string → transcript obtained; use it directly, skip internal STT
@@ -318,8 +324,9 @@ export async function processMessage(params: {
// Send ack reaction immediately upon message receipt (post-gating). Callers
// that do preflight work before processMessage can send it first and set
// ackAlreadySent so slow STT does not delay user-visible receipt feedback.
if (params.ackAlreadySent !== true) {
await maybeSendAckReaction({
let ackReaction = params.ackReaction ?? null;
if (!ackReaction && params.ackAlreadySent !== true) {
ackReaction = await maybeSendAckReaction({
cfg: params.cfg,
msg: params.msg,
agentId: params.route.agentId,
@@ -463,7 +470,7 @@ export async function processMessage(params: {
});
trackBackgroundTask(params.backgroundTasks, metaTask);
return dispatchWhatsAppBufferedReply({
const didSendReply = await dispatchWhatsAppBufferedReply({
cfg: params.cfg,
connectionId: params.connectionId,
context: ctxPayload,
@@ -485,6 +492,19 @@ export async function processMessage(params: {
route: params.route,
shouldClearGroupHistory,
});
removeAckReactionHandleAfterReply({
removeAfterReply: Boolean(params.cfg.messages?.removeAckAfterReply && didSendReply),
ackReaction,
onError: (err) => {
logAckFailure({
log: logVerbose,
channel: "whatsapp",
target: `${params.msg.chatId ?? conversationId}/${params.msg.id ?? "unknown"}`,
error: err,
});
},
});
return didSendReply;
}
export const __testing = {