WhatsApp: send ack before audio preflight

This commit is contained in:
Roger Deng
2026-04-19 15:42:13 +08:00
committed by Marcus Castro
parent 4fdc585494
commit cb043dd99a
5 changed files with 204 additions and 14 deletions

View File

@@ -60,9 +60,11 @@ export async function maybeBroadcastMessage(params: {
groupHistory?: GroupHistoryEntry[];
suppressGroupHistoryClear?: boolean;
preflightAudioTranscript?: string | null;
ackAlreadySent?: boolean;
},
) => Promise<boolean>;
preflightAudioTranscript?: string | null;
ackAlreadySent?: boolean;
}) {
const broadcastAgents = params.cfg.broadcast?.[params.peerId];
if (!broadcastAgents || !Array.isArray(broadcastAgents)) {
@@ -110,6 +112,7 @@ export async function maybeBroadcastMessage(params: {
groupHistory: groupHistorySnapshot,
suppressGroupHistoryClear: true,
preflightAudioTranscript: params.preflightAudioTranscript,
ackAlreadySent: params.ackAlreadySent,
});
} catch (err) {
whatsappInboundLog.error(`Broadcast agent ${agentId} failed: ${formatError(err)}`);

View File

@@ -0,0 +1,149 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const events: string[] = [];
const transcribeFirstAudioMock = vi.fn();
const maybeSendAckReactionMock = vi.fn();
const processMessageMock = vi.fn();
vi.mock("./audio-preflight.runtime.js", () => ({
transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args),
}));
vi.mock("./ack-reaction.js", () => ({
maybeSendAckReaction: (...args: unknown[]) => maybeSendAckReactionMock(...args),
}));
vi.mock("./process-message.js", () => ({
processMessage: (...args: unknown[]) => processMessageMock(...args),
}));
vi.mock("./broadcast.js", () => ({
maybeBroadcastMessage: vi.fn(async () => false),
}));
vi.mock("./group-gating.js", () => ({
applyGroupGating: vi.fn(async () => ({ shouldProcess: true })),
}));
vi.mock("./last-route.js", () => ({
updateLastRouteInBackground: () => {},
}));
vi.mock("./peer.js", () => ({
resolvePeerId: (msg: { from: string }) => msg.from,
}));
vi.mock("../config.runtime.js", () => ({
loadConfig: () => ({
channels: {
whatsapp: {
ackReaction: { enabled: true },
},
},
}),
}));
vi.mock("../../group-session-key.js", () => ({
resolveWhatsAppGroupSessionRoute: (route: unknown) => route,
}));
vi.mock("../../identity.js", () => ({
getPrimaryIdentityId: () => undefined,
getSenderIdentity: () => ({ e164: "+15550000002", name: "Alice" }),
}));
vi.mock("../../text-runtime.js", () => ({
normalizeE164: (value: string) => value,
}));
vi.mock("openclaw/plugin-sdk/routing", () => ({
buildGroupHistoryKey: () => "group-key",
resolveAgentRoute: () => ({
agentId: "main",
accountId: "default",
sessionKey: "agent:main:whatsapp:+15550000002",
mainSessionKey: "agent:main:main",
}),
}));
import type { WebInboundMsg } from "../types.js";
import { createWebOnMessageHandler } from "./on-message.js";
function makeAudioMsg(): WebInboundMsg {
return {
id: "msg-1",
from: "+15550000002",
to: "+15550000001",
body: "<media:audio>",
chatType: "direct",
mediaType: "audio/ogg; codecs=opus",
mediaPath: "/tmp/voice.ogg",
timestamp: 1700000000,
accountId: "default",
} as WebInboundMsg;
}
function makeEchoTracker() {
return {
has: () => false,
forget: () => {},
rememberText: () => {},
buildCombinedKey: (p: { combinedBody: string }) => p.combinedBody,
};
}
describe("createWebOnMessageHandler audio preflight", () => {
beforeEach(() => {
events.length = 0;
maybeSendAckReactionMock.mockReset();
maybeSendAckReactionMock.mockImplementation(async () => {
events.push("ack");
});
transcribeFirstAudioMock.mockReset();
transcribeFirstAudioMock.mockImplementation(async () => {
events.push("stt");
return "transcribed voice note";
});
processMessageMock.mockReset();
processMessageMock.mockResolvedValue(true);
});
it("sends ack reaction before audio preflight for voice notes", async () => {
const handler = createWebOnMessageHandler({
cfg: {
channels: {
whatsapp: {
ackReaction: { enabled: true },
},
},
} as never,
verbose: false,
connectionId: "conn-1",
maxMediaBytes: 1024 * 1024,
groupHistoryLimit: 20,
groupHistories: new Map(),
groupMemberNames: new Map(),
echoTracker: makeEchoTracker() as never,
backgroundTasks: new Set(),
replyResolver: vi.fn() as never,
replyLogger: {
info: () => {},
warn: () => {},
debug: () => {},
error: () => {},
} as never,
baseMentionConfig: {} as never,
account: { authDir: "/tmp/auth", accountId: "default" },
});
await handler(makeAudioMsg());
expect(events).toEqual(["ack", "stt"]);
expect(processMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
preflightAudioTranscript: "transcribed voice note",
ackAlreadySent: true,
}),
);
});
});

View File

@@ -9,6 +9,7 @@ import { normalizeE164 } from "../../text-runtime.js";
import { loadConfig } from "../config.runtime.js";
import type { MentionConfig } from "../mentions.js";
import type { WebInboundMsg } from "../types.js";
import { maybeSendAckReaction } from "./ack-reaction.js";
import { maybeBroadcastMessage } from "./broadcast.js";
import type { EchoTracker } from "./echo.js";
import type { GroupHistoryEntry } from "./group-gating.js";
@@ -40,6 +41,7 @@ export function createWebOnMessageHandler(params: {
groupHistory?: GroupHistoryEntry[];
suppressGroupHistoryClear?: boolean;
preflightAudioTranscript?: string | null;
ackAlreadySent?: boolean;
},
) =>
processMessage({
@@ -62,6 +64,7 @@ export function createWebOnMessageHandler(params: {
groupHistory: opts?.groupHistory,
suppressGroupHistoryClear: opts?.suppressGroupHistoryClear,
preflightAudioTranscript: opts?.preflightAudioTranscript,
ackAlreadySent: opts?.ackAlreadySent,
});
return async (msg: WebInboundMsg) => {
@@ -168,7 +171,20 @@ export function createWebOnMessageHandler(params: {
let preflightAudioTranscript: string | null | undefined;
const hasAudioBody =
msg.mediaType?.startsWith("audio/") === true && msg.body === "<media:audio>";
let ackAlreadySent = false;
if (hasAudioBody && msg.mediaPath) {
await maybeSendAckReaction({
cfg: params.cfg,
msg,
agentId: route.agentId,
sessionKey: route.sessionKey,
conversationId,
verbose: params.verbose,
accountId: route.accountId,
info: params.replyLogger.info.bind(params.replyLogger),
warn: params.replyLogger.warn.bind(params.replyLogger),
});
ackAlreadySent = true;
try {
const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js");
// transcribeFirstAudio returns undefined on failure/disabled; store null so
@@ -198,12 +214,16 @@ export function createWebOnMessageHandler(params: {
groupHistoryKey,
groupHistories: params.groupHistories,
preflightAudioTranscript,
ackAlreadySent,
processMessage: (m, r, k, opts) => processForRoute(m, r, k, opts),
})
) {
return;
}
await processForRoute(msg, route, groupHistoryKey, { preflightAudioTranscript });
await processForRoute(msg, route, groupHistoryKey, {
preflightAudioTranscript,
ackAlreadySent,
});
};
}

View File

@@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
// Mock the lazy-loaded audio preflight runtime boundary
const transcribeFirstAudioMock = vi.fn();
const maybeSendAckReactionMock = vi.fn();
vi.mock("./audio-preflight.runtime.js", () => ({
transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args),
@@ -43,7 +44,7 @@ vi.mock("../loggers.js", () => ({
}));
vi.mock("./ack-reaction.js", () => ({
maybeSendAckReaction: () => {},
maybeSendAckReaction: (...args: unknown[]) => maybeSendAckReactionMock(...args),
}));
vi.mock("./inbound-context.js", () => ({
@@ -161,6 +162,8 @@ function makeParams(msgOverrides: Partial<WebInboundMsg> = {}) {
describe("processMessage audio preflight transcription", () => {
beforeEach(() => {
transcribeFirstAudioMock.mockReset();
maybeSendAckReactionMock.mockReset();
maybeSendAckReactionMock.mockResolvedValue(undefined);
shouldComputeCommandResult = false;
vi.mocked(dispatchWhatsAppBufferedReply).mockClear();
});
@@ -287,6 +290,16 @@ describe("processMessage audio preflight transcription", () => {
});
});
it("does not send a duplicate ack when caller already sent it", async () => {
await processMessage({
...makeParams(),
preflightAudioTranscript: "pre-computed transcript from caller",
ackAlreadySent: true,
});
expect(maybeSendAckReactionMock).not.toHaveBeenCalled();
});
it("skips internal STT when preflightAudioTranscript is null (failed preflight sentinel)", async () => {
// null = caller already attempted preflight but got nothing (provider unavailable,
// disabled, etc.). processMessage must NOT retry to avoid 1+N attempts in broadcast.

View File

@@ -191,6 +191,7 @@ export async function processMessage(params: {
maxMediaTextChunkLimit?: number;
groupHistory?: GroupHistoryEntry[];
suppressGroupHistoryClear?: boolean;
ackAlreadySent?: boolean;
/** Pre-computed audio transcript from a caller-level preflight, used to avoid
* re-transcribing the same voice note once per broadcast agent.
* - string → transcript obtained; use it directly, skip internal STT
@@ -314,18 +315,22 @@ export async function processMessage(params: {
return false;
}
// Send ack reaction immediately upon message receipt (post-gating)
await maybeSendAckReaction({
cfg: params.cfg,
msg: params.msg,
agentId: params.route.agentId,
sessionKey: params.route.sessionKey,
conversationId,
verbose: params.verbose,
accountId: account.accountId,
info: params.replyLogger.info.bind(params.replyLogger),
warn: params.replyLogger.warn.bind(params.replyLogger),
});
// Send ack reaction immediately upon message receipt (post-gating). Callers
// that do preflight work before processMessage can send it first and set
// ackAlreadySent so slow STT does not delay user-visible receipt feedback.
if (params.ackAlreadySent !== true) {
await maybeSendAckReaction({
cfg: params.cfg,
msg: params.msg,
agentId: params.route.agentId,
sessionKey: params.route.sessionKey,
conversationId,
verbose: params.verbose,
accountId: account.accountId,
info: params.replyLogger.info.bind(params.replyLogger),
warn: params.replyLogger.warn.bind(params.replyLogger),
});
}
const correlationId = params.msg.id ?? newConnectionId();
params.replyLogger.info(