diff --git a/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts b/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts index 2c391706319..1c4f49b4add 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts @@ -60,9 +60,11 @@ export async function maybeBroadcastMessage(params: { groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; preflightAudioTranscript?: string | null; + ackAlreadySent?: boolean; }, ) => Promise; preflightAudioTranscript?: string | null; + ackAlreadySent?: boolean; }) { const broadcastAgents = params.cfg.broadcast?.[params.peerId]; if (!broadcastAgents || !Array.isArray(broadcastAgents)) { @@ -110,6 +112,7 @@ export async function maybeBroadcastMessage(params: { groupHistory: groupHistorySnapshot, suppressGroupHistoryClear: true, preflightAudioTranscript: params.preflightAudioTranscript, + ackAlreadySent: params.ackAlreadySent, }); } catch (err) { whatsappInboundLog.error(`Broadcast agent ${agentId} failed: ${formatError(err)}`); diff --git a/extensions/whatsapp/src/auto-reply/monitor/on-message.audio-preflight.test.ts b/extensions/whatsapp/src/auto-reply/monitor/on-message.audio-preflight.test.ts new file mode 100644 index 00000000000..5d8c33c2b9e --- /dev/null +++ b/extensions/whatsapp/src/auto-reply/monitor/on-message.audio-preflight.test.ts @@ -0,0 +1,149 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const events: string[] = []; +const transcribeFirstAudioMock = vi.fn(); +const maybeSendAckReactionMock = vi.fn(); +const processMessageMock = vi.fn(); + +vi.mock("./audio-preflight.runtime.js", () => ({ + transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args), +})); + +vi.mock("./ack-reaction.js", () => ({ + maybeSendAckReaction: (...args: unknown[]) => maybeSendAckReactionMock(...args), +})); + +vi.mock("./process-message.js", () => ({ + processMessage: (...args: unknown[]) => processMessageMock(...args), +})); + +vi.mock("./broadcast.js", () => ({ + maybeBroadcastMessage: vi.fn(async () => false), +})); + +vi.mock("./group-gating.js", () => ({ + applyGroupGating: vi.fn(async () => ({ shouldProcess: true })), +})); + +vi.mock("./last-route.js", () => ({ + updateLastRouteInBackground: () => {}, +})); + +vi.mock("./peer.js", () => ({ + resolvePeerId: (msg: { from: string }) => msg.from, +})); + +vi.mock("../config.runtime.js", () => ({ + loadConfig: () => ({ + channels: { + whatsapp: { + ackReaction: { enabled: true }, + }, + }, + }), +})); + +vi.mock("../../group-session-key.js", () => ({ + resolveWhatsAppGroupSessionRoute: (route: unknown) => route, +})); + +vi.mock("../../identity.js", () => ({ + getPrimaryIdentityId: () => undefined, + getSenderIdentity: () => ({ e164: "+15550000002", name: "Alice" }), +})); + +vi.mock("../../text-runtime.js", () => ({ + normalizeE164: (value: string) => value, +})); + +vi.mock("openclaw/plugin-sdk/routing", () => ({ + buildGroupHistoryKey: () => "group-key", + resolveAgentRoute: () => ({ + agentId: "main", + accountId: "default", + sessionKey: "agent:main:whatsapp:+15550000002", + mainSessionKey: "agent:main:main", + }), +})); + +import type { WebInboundMsg } from "../types.js"; +import { createWebOnMessageHandler } from "./on-message.js"; + +function makeAudioMsg(): WebInboundMsg { + return { + id: "msg-1", + from: "+15550000002", + to: "+15550000001", + body: "", + chatType: "direct", + mediaType: "audio/ogg; codecs=opus", + mediaPath: "/tmp/voice.ogg", + timestamp: 1700000000, + accountId: "default", + } as WebInboundMsg; +} + +function makeEchoTracker() { + return { + has: () => false, + forget: () => {}, + rememberText: () => {}, + buildCombinedKey: (p: { combinedBody: string }) => p.combinedBody, + }; +} + +describe("createWebOnMessageHandler audio preflight", () => { + beforeEach(() => { + events.length = 0; + maybeSendAckReactionMock.mockReset(); + maybeSendAckReactionMock.mockImplementation(async () => { + events.push("ack"); + }); + transcribeFirstAudioMock.mockReset(); + transcribeFirstAudioMock.mockImplementation(async () => { + events.push("stt"); + return "transcribed voice note"; + }); + processMessageMock.mockReset(); + processMessageMock.mockResolvedValue(true); + }); + + it("sends ack reaction before audio preflight for voice notes", async () => { + const handler = createWebOnMessageHandler({ + cfg: { + channels: { + whatsapp: { + ackReaction: { enabled: true }, + }, + }, + } as never, + verbose: false, + connectionId: "conn-1", + maxMediaBytes: 1024 * 1024, + groupHistoryLimit: 20, + groupHistories: new Map(), + groupMemberNames: new Map(), + echoTracker: makeEchoTracker() as never, + backgroundTasks: new Set(), + replyResolver: vi.fn() as never, + replyLogger: { + info: () => {}, + warn: () => {}, + debug: () => {}, + error: () => {}, + } as never, + baseMentionConfig: {} as never, + account: { authDir: "/tmp/auth", accountId: "default" }, + }); + + await handler(makeAudioMsg()); + + expect(events).toEqual(["ack", "stt"]); + expect(processMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + preflightAudioTranscript: "transcribed voice note", + ackAlreadySent: true, + }), + ); + }); +}); diff --git a/extensions/whatsapp/src/auto-reply/monitor/on-message.ts b/extensions/whatsapp/src/auto-reply/monitor/on-message.ts index 4b3d1705e3e..95151409a5c 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/on-message.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/on-message.ts @@ -9,6 +9,7 @@ import { normalizeE164 } from "../../text-runtime.js"; import { loadConfig } from "../config.runtime.js"; import type { MentionConfig } from "../mentions.js"; import type { WebInboundMsg } from "../types.js"; +import { maybeSendAckReaction } from "./ack-reaction.js"; import { maybeBroadcastMessage } from "./broadcast.js"; import type { EchoTracker } from "./echo.js"; import type { GroupHistoryEntry } from "./group-gating.js"; @@ -40,6 +41,7 @@ export function createWebOnMessageHandler(params: { groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; preflightAudioTranscript?: string | null; + ackAlreadySent?: boolean; }, ) => processMessage({ @@ -62,6 +64,7 @@ export function createWebOnMessageHandler(params: { groupHistory: opts?.groupHistory, suppressGroupHistoryClear: opts?.suppressGroupHistoryClear, preflightAudioTranscript: opts?.preflightAudioTranscript, + ackAlreadySent: opts?.ackAlreadySent, }); return async (msg: WebInboundMsg) => { @@ -168,7 +171,20 @@ export function createWebOnMessageHandler(params: { let preflightAudioTranscript: string | null | undefined; const hasAudioBody = msg.mediaType?.startsWith("audio/") === true && msg.body === ""; + let ackAlreadySent = false; if (hasAudioBody && msg.mediaPath) { + await maybeSendAckReaction({ + cfg: params.cfg, + msg, + agentId: route.agentId, + sessionKey: route.sessionKey, + conversationId, + verbose: params.verbose, + accountId: route.accountId, + info: params.replyLogger.info.bind(params.replyLogger), + warn: params.replyLogger.warn.bind(params.replyLogger), + }); + ackAlreadySent = true; try { const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js"); // transcribeFirstAudio returns undefined on failure/disabled; store null so @@ -198,12 +214,16 @@ export function createWebOnMessageHandler(params: { groupHistoryKey, groupHistories: params.groupHistories, preflightAudioTranscript, + ackAlreadySent, processMessage: (m, r, k, opts) => processForRoute(m, r, k, opts), }) ) { return; } - await processForRoute(msg, route, groupHistoryKey, { preflightAudioTranscript }); + await processForRoute(msg, route, groupHistoryKey, { + preflightAudioTranscript, + ackAlreadySent, + }); }; } diff --git a/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts b/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts index e0656ee24a6..baff7745db4 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts @@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; // Mock the lazy-loaded audio preflight runtime boundary const transcribeFirstAudioMock = vi.fn(); +const maybeSendAckReactionMock = vi.fn(); vi.mock("./audio-preflight.runtime.js", () => ({ transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args), @@ -43,7 +44,7 @@ vi.mock("../loggers.js", () => ({ })); vi.mock("./ack-reaction.js", () => ({ - maybeSendAckReaction: () => {}, + maybeSendAckReaction: (...args: unknown[]) => maybeSendAckReactionMock(...args), })); vi.mock("./inbound-context.js", () => ({ @@ -161,6 +162,8 @@ function makeParams(msgOverrides: Partial = {}) { describe("processMessage audio preflight transcription", () => { beforeEach(() => { transcribeFirstAudioMock.mockReset(); + maybeSendAckReactionMock.mockReset(); + maybeSendAckReactionMock.mockResolvedValue(undefined); shouldComputeCommandResult = false; vi.mocked(dispatchWhatsAppBufferedReply).mockClear(); }); @@ -287,6 +290,16 @@ describe("processMessage audio preflight transcription", () => { }); }); + it("does not send a duplicate ack when caller already sent it", async () => { + await processMessage({ + ...makeParams(), + preflightAudioTranscript: "pre-computed transcript from caller", + ackAlreadySent: true, + }); + + expect(maybeSendAckReactionMock).not.toHaveBeenCalled(); + }); + it("skips internal STT when preflightAudioTranscript is null (failed preflight sentinel)", async () => { // null = caller already attempted preflight but got nothing (provider unavailable, // disabled, etc.). processMessage must NOT retry to avoid 1+N attempts in broadcast. diff --git a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts index c2a076d9a2d..0f2fbdb23f2 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts @@ -191,6 +191,7 @@ export async function processMessage(params: { maxMediaTextChunkLimit?: number; groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; + ackAlreadySent?: boolean; /** Pre-computed audio transcript from a caller-level preflight, used to avoid * re-transcribing the same voice note once per broadcast agent. * - string → transcript obtained; use it directly, skip internal STT @@ -314,18 +315,22 @@ export async function processMessage(params: { return false; } - // Send ack reaction immediately upon message receipt (post-gating) - await maybeSendAckReaction({ - cfg: params.cfg, - msg: params.msg, - agentId: params.route.agentId, - sessionKey: params.route.sessionKey, - conversationId, - verbose: params.verbose, - accountId: account.accountId, - info: params.replyLogger.info.bind(params.replyLogger), - warn: params.replyLogger.warn.bind(params.replyLogger), - }); + // Send ack reaction immediately upon message receipt (post-gating). Callers + // that do preflight work before processMessage can send it first and set + // ackAlreadySent so slow STT does not delay user-visible receipt feedback. + if (params.ackAlreadySent !== true) { + await maybeSendAckReaction({ + cfg: params.cfg, + msg: params.msg, + agentId: params.route.agentId, + sessionKey: params.route.sessionKey, + conversationId, + verbose: params.verbose, + accountId: account.accountId, + info: params.replyLogger.info.bind(params.replyLogger), + warn: params.replyLogger.warn.bind(params.replyLogger), + }); + } const correlationId = params.msg.id ?? newConnectionId(); params.replyLogger.info(