diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fb5d9526f7..ae547a06fda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -155,6 +155,7 @@ Docs: https://docs.openclaw.ai - Gateway/startup: await startup sidecars before channel monitors report ready, reducing Discord and plugin startup races while still keeping gateway boot observability intact. Thanks @steipete. - Plugins/Google Meet: report required manual actions for Chrome joins, use browser automation for Meet entry, and persist the private-WS node opt-in so paired-node realtime sessions keep their intended network policy. Thanks @steipete. - Slack: route native stream fallback replies through the normal chunked sender so long buffered Slack Connect responses are not dropped or duplicated. (#71124) Thanks @martingarramon. +- WhatsApp: transcribe accepted voice notes before agent dispatch while keeping spoken transcripts out of command authorization. (#64120) Thanks @rogerdigital. ## 2026.4.23 diff --git a/extensions/whatsapp/src/auto-reply/monitor/audio-preflight.runtime.ts b/extensions/whatsapp/src/auto-reply/monitor/audio-preflight.runtime.ts new file mode 100644 index 00000000000..7e7f111d104 --- /dev/null +++ b/extensions/whatsapp/src/auto-reply/monitor/audio-preflight.runtime.ts @@ -0,0 +1,9 @@ +import { transcribeFirstAudio as transcribeFirstAudioImpl } from "openclaw/plugin-sdk/media-runtime"; + +type TranscribeFirstAudio = typeof import("openclaw/plugin-sdk/media-runtime").transcribeFirstAudio; + +export async function transcribeFirstAudio( + ...args: Parameters<TranscribeFirstAudio> +): ReturnType<TranscribeFirstAudio> { + return await transcribeFirstAudioImpl(...args); +} diff --git a/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts b/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts index 597df193458..4842fa998ab 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts @@ -59,8 +59,12 @@ export async function maybeBroadcastMessage(params: { opts?: { 
groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; + preflightAudioTranscript?: string | null; + ackAlreadySent?: boolean; }, ) => Promise<boolean>; + preflightAudioTranscript?: string | null; + ackAlreadySent?: boolean; }) { const broadcastAgents = params.cfg.broadcast?.[params.peerId]; if (!broadcastAgents || !Array.isArray(broadcastAgents)) { @@ -104,10 +108,22 @@ export async function maybeBroadcastMessage(params: { : baseAgentRoute; try { - return await params.processMessage(params.msg, agentRoute, params.groupHistoryKey, { + const opts: { + groupHistory?: GroupHistoryEntry[]; + suppressGroupHistoryClear: true; + preflightAudioTranscript?: string | null; + ackAlreadySent?: boolean; + } = { groupHistory: groupHistorySnapshot, suppressGroupHistoryClear: true, - }); + }; + if (params.preflightAudioTranscript !== undefined) { + opts.preflightAudioTranscript = params.preflightAudioTranscript; + } + if (params.ackAlreadySent === true) { + opts.ackAlreadySent = true; + } + return await params.processMessage(params.msg, agentRoute, params.groupHistoryKey, opts); } catch (err) { whatsappInboundLog.error(`Broadcast agent ${agentId} failed: ${formatError(err)}`); return false; diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts index 12bad29cf1a..c8ea78646fa 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts @@ -175,6 +175,35 @@ describe("whatsapp inbound dispatch", () => { }); }); + it("keeps agent and command bodies independently overridable", () => { + const ctx = buildWhatsAppInboundContext({ + bodyForAgent: "spoken transcript", + combinedBody: "spoken transcript", + commandBody: "", + conversationId: "+1000", + msg: makeMsg({ + body: "", + mediaPath: "/tmp/voice.ogg", + mediaType: "audio/ogg; codecs=opus", + }), + rawBody: "", + route: makeRoute(), + 
sender: { + e164: "+1000", + }, + transcript: "spoken transcript", + }); + + expect(ctx).toMatchObject({ + Body: "spoken transcript", + BodyForAgent: "spoken transcript", + BodyForCommands: "", + CommandBody: "", + RawBody: "", + Transcript: "spoken transcript", + }); + }); + it("falls back SenderId to SenderE164 when sender id is missing", () => { const ctx = buildWhatsAppInboundContext({ combinedBody: "hi", diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts index be9f1d668df..379f6f96294 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts @@ -86,15 +86,19 @@ export function resolveWhatsAppResponsePrefix(params: { } export function buildWhatsAppInboundContext(params: { + bodyForAgent?: string; combinedBody: string; + commandBody?: string; commandAuthorized?: boolean; conversationId: string; groupHistory?: GroupHistoryEntry[]; groupMemberRoster?: Map; groupSystemPrompt?: string; msg: WebInboundMsg; + rawBody?: string; route: ReturnType; sender: SenderContext; + transcript?: string; replyThreading?: ReplyThreadingContext; visibleReplyTo?: VisibleReplyTarget; }) { @@ -109,10 +113,11 @@ export function buildWhatsAppInboundContext(params: { const result = finalizeInboundContext({ Body: params.combinedBody, - BodyForAgent: params.msg.body, + BodyForAgent: params.bodyForAgent ?? params.msg.body, InboundHistory: inboundHistory, - RawBody: params.msg.body, - CommandBody: params.msg.body, + RawBody: params.rawBody ?? params.msg.body, + CommandBody: params.commandBody ?? 
params.msg.body, + Transcript: params.transcript, From: params.msg.from, To: params.msg.to, SessionKey: params.route.sessionKey, diff --git a/extensions/whatsapp/src/auto-reply/monitor/on-message.audio-preflight.test.ts b/extensions/whatsapp/src/auto-reply/monitor/on-message.audio-preflight.test.ts new file mode 100644 index 00000000000..85a119c40de --- /dev/null +++ b/extensions/whatsapp/src/auto-reply/monitor/on-message.audio-preflight.test.ts @@ -0,0 +1,250 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const events: string[] = []; +const transcribeFirstAudioMock = vi.fn(); +const maybeSendAckReactionMock = vi.fn(); +const processMessageMock = vi.fn(); +const maybeBroadcastMessageMock = vi.fn(); + +vi.mock("./audio-preflight.runtime.js", () => ({ + transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args), +})); + +vi.mock("./ack-reaction.js", () => ({ + maybeSendAckReaction: (...args: unknown[]) => maybeSendAckReactionMock(...args), +})); + +vi.mock("./process-message.js", () => ({ + processMessage: (...args: unknown[]) => processMessageMock(...args), +})); + +vi.mock("./broadcast.js", () => ({ + maybeBroadcastMessage: (...args: unknown[]) => maybeBroadcastMessageMock(...args), +})); + +vi.mock("./group-gating.js", () => ({ + applyGroupGating: vi.fn(async () => ({ shouldProcess: true })), +})); + +vi.mock("./last-route.js", () => ({ + updateLastRouteInBackground: () => {}, +})); + +vi.mock("./peer.js", () => ({ + resolvePeerId: (msg: { from: string }) => msg.from, +})); + +vi.mock("../config.runtime.js", () => ({ + loadConfig: () => ({ + channels: { + whatsapp: { + ackReaction: { enabled: true }, + }, + }, + }), +})); + +vi.mock("../../group-session-key.js", () => ({ + resolveWhatsAppGroupSessionRoute: (route: unknown) => route, +})); + +vi.mock("../../identity.js", () => ({ + getPrimaryIdentityId: () => undefined, + getSenderIdentity: () => ({ e164: "+15550000002", name: "Alice" }), +})); + 
+vi.mock("../../text-runtime.js", () => ({ + normalizeE164: (value: string) => value, +})); + +vi.mock("openclaw/plugin-sdk/routing", () => ({ + buildGroupHistoryKey: () => "group-key", + resolveAgentRoute: () => ({ + agentId: "main", + accountId: "default", + sessionKey: "agent:main:whatsapp:+15550000002", + mainSessionKey: "agent:main:main", + }), +})); + +import type { WebInboundMsg } from "../types.js"; +import { createWebOnMessageHandler } from "./on-message.js"; + +function makeAudioMsg(): WebInboundMsg { + return { + id: "msg-1", + from: "+15550000002", + to: "+15550000001", + accessControlPassed: true, + body: "", + chatType: "direct", + mediaType: "audio/ogg; codecs=opus", + mediaPath: "/tmp/voice.ogg", + timestamp: 1700000000, + accountId: "default", + } as WebInboundMsg; +} + +function makeGroupAudioMsg(): WebInboundMsg { + return { + ...makeAudioMsg(), + from: "1203630@g.us", + chatId: "1203630@g.us", + chatType: "group", + conversationId: "1203630@g.us", + wasMentioned: false, + } as WebInboundMsg; +} + +function makeEchoTracker() { + return { + has: () => false, + forget: () => {}, + rememberText: () => {}, + buildCombinedKey: (p: { combinedBody: string }) => p.combinedBody, + }; +} + +describe("createWebOnMessageHandler audio preflight", () => { + beforeEach(() => { + events.length = 0; + maybeBroadcastMessageMock.mockReset(); + maybeBroadcastMessageMock.mockImplementation(async () => false); + maybeSendAckReactionMock.mockReset(); + maybeSendAckReactionMock.mockImplementation(async () => { + events.push("ack"); + }); + transcribeFirstAudioMock.mockReset(); + transcribeFirstAudioMock.mockImplementation(async () => { + events.push("stt"); + return "transcribed voice note"; + }); + processMessageMock.mockReset(); + processMessageMock.mockResolvedValue(true); + }); + + it("sends ack reaction before audio preflight for voice notes", async () => { + const handler = createWebOnMessageHandler({ + cfg: { + channels: { + whatsapp: { + ackReaction: { enabled: 
true }, + }, + }, + } as never, + verbose: false, + connectionId: "conn-1", + maxMediaBytes: 1024 * 1024, + groupHistoryLimit: 20, + groupHistories: new Map(), + groupMemberNames: new Map(), + echoTracker: makeEchoTracker() as never, + backgroundTasks: new Set(), + replyResolver: vi.fn() as never, + replyLogger: { + info: () => {}, + warn: () => {}, + debug: () => {}, + error: () => {}, + } as never, + baseMentionConfig: {} as never, + account: { authDir: "/tmp/auth", accountId: "default" }, + }); + + await handler(makeAudioMsg()); + + expect(events).toEqual(["ack", "stt"]); + expect(processMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + preflightAudioTranscript: "transcribed voice note", + ackAlreadySent: true, + }), + ); + }); + + it("skips early DM ack/preflight when access-control was not explicitly passed through", async () => { + + const handler = createWebOnMessageHandler({ + cfg: { + channels: { + whatsapp: { + ackReaction: { enabled: true }, + }, + }, + } as never, + verbose: false, + connectionId: "conn-1", + maxMediaBytes: 1024 * 1024, + groupHistoryLimit: 20, + groupHistories: new Map(), + groupMemberNames: new Map(), + echoTracker: makeEchoTracker() as never, + backgroundTasks: new Set(), + replyResolver: vi.fn() as never, + replyLogger: { + info: () => {}, + warn: () => {}, + debug: () => {}, + error: () => {}, + } as never, + baseMentionConfig: {} as never, + account: { authDir: "/tmp/auth", accountId: "default" }, + }); + + await handler({ ...makeAudioMsg(), accessControlPassed: undefined }); + + expect(events).toEqual([]); + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + expect(maybeSendAckReactionMock).not.toHaveBeenCalled(); + expect(processMessageMock).toHaveBeenCalledWith( + expect.not.objectContaining({ + preflightAudioTranscript: expect.anything(), + ackAlreadySent: true, + }), + ); + }); + + it("preserves per-agent ack checks for group broadcast voice notes", async () => { + 
maybeBroadcastMessageMock.mockImplementation( + async (params: { ackAlreadySent?: boolean; preflightAudioTranscript?: string | null }) => { + expect(params.preflightAudioTranscript).toBe("transcribed voice note"); + expect(params.ackAlreadySent).toBeUndefined(); + return true; + }, + ); + const handler = createWebOnMessageHandler({ + cfg: { + channels: { + whatsapp: { + ackReaction: { enabled: true }, + }, + }, + broadcast: { + "1203630@g.us": ["main", "backup"], + }, + } as never, + verbose: false, + connectionId: "conn-1", + maxMediaBytes: 1024 * 1024, + groupHistoryLimit: 20, + groupHistories: new Map(), + groupMemberNames: new Map(), + echoTracker: makeEchoTracker() as never, + backgroundTasks: new Set(), + replyResolver: vi.fn() as never, + replyLogger: { + info: () => {}, + warn: () => {}, + debug: () => {}, + error: () => {}, + } as never, + baseMentionConfig: {} as never, + account: { authDir: "/tmp/auth", accountId: "default" }, + }); + + await handler(makeGroupAudioMsg()); + + expect(events).toEqual(["ack", "stt"]); + expect(processMessageMock).not.toHaveBeenCalled(); + }); +}); diff --git a/extensions/whatsapp/src/auto-reply/monitor/on-message.ts b/extensions/whatsapp/src/auto-reply/monitor/on-message.ts index 281c49f91f9..b90d972fe1f 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/on-message.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/on-message.ts @@ -9,6 +9,7 @@ import { normalizeE164 } from "../../text-runtime.js"; import { loadConfig } from "../config.runtime.js"; import type { MentionConfig } from "../mentions.js"; import type { WebInboundMsg } from "../types.js"; +import { maybeSendAckReaction } from "./ack-reaction.js"; import { maybeBroadcastMessage } from "./broadcast.js"; import type { EchoTracker } from "./echo.js"; import type { GroupHistoryEntry } from "./group-gating.js"; @@ -39,9 +40,11 @@ export function createWebOnMessageHandler(params: { opts?: { groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; + 
preflightAudioTranscript?: string | null; + ackAlreadySent?: boolean; }, - ) => - processMessage({ + ) => { + const processParams: Parameters<typeof processMessage>[0] = { cfg: params.cfg, msg, route, @@ -58,9 +61,21 @@ export function createWebOnMessageHandler(params: { echoHas: params.echoTracker.has, echoForget: params.echoTracker.forget, buildCombinedEchoKey: params.echoTracker.buildCombinedKey, - groupHistory: opts?.groupHistory, - suppressGroupHistoryClear: opts?.suppressGroupHistoryClear, - }); + }; + if (opts?.groupHistory !== undefined) { + processParams.groupHistory = opts.groupHistory; + } + if (opts?.suppressGroupHistoryClear !== undefined) { + processParams.suppressGroupHistoryClear = opts.suppressGroupHistoryClear; + } + if (opts?.preflightAudioTranscript !== undefined) { + processParams.preflightAudioTranscript = opts.preflightAudioTranscript; + } + if (opts?.ackAlreadySent === true) { + processParams.ackAlreadySent = true; + } + return processMessage(processParams); + }; return async (msg: WebInboundMsg) => { const conversationId = msg.conversationId ?? msg.from; @@ -159,6 +174,49 @@ export function createWebOnMessageHandler(params: { } } + // Preflight audio transcription: run once here, before broadcast fan-out, so + // all agents share the same transcript instead of each making a separate STT call. + // For DMs, only do this on the real inbound path after access-control/pairing + // checks have already passed in inbound/monitor.ts. That keeps external STT and + // early ack feedback behind the same auth-first gate as the rest of DM handling. + // null = preflight was attempted but produced no transcript (failed / disabled / no audio); + // undefined = preflight was not attempted (non-audio message). 
+ let preflightAudioTranscript: string | null | undefined; + const hasAudioBody = + msg.mediaType?.startsWith("audio/") === true && msg.body === ""; + const canRunEarlyDmPreflight = msg.chatType === "group" || msg.accessControlPassed === true; + let ackAlreadySent = false; + if (canRunEarlyDmPreflight && hasAudioBody && msg.mediaPath) { + await maybeSendAckReaction({ + cfg: params.cfg, + msg, + agentId: route.agentId, + sessionKey: route.sessionKey, + conversationId, + verbose: params.verbose, + accountId: route.accountId, + info: params.replyLogger.info.bind(params.replyLogger), + warn: params.replyLogger.warn.bind(params.replyLogger), + }); + ackAlreadySent = true; + try { + const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js"); + // transcribeFirstAudio returns undefined on failure/disabled; store null so + // processMessage knows the attempt was already made and does not retry. + preflightAudioTranscript = + (await transcribeFirstAudio({ + ctx: { + MediaPaths: [msg.mediaPath], + MediaTypes: msg.mediaType ? [msg.mediaType] : undefined, + }, + cfg: params.cfg, + })) ?? null; + } catch { + // Non-fatal: store null so per-agent retries are suppressed. + preflightAudioTranscript = null; + } + } + // Broadcast groups: when we'd reply anyway, run multiple agents. // Does not bypass group mention/activation gating above. if ( @@ -169,12 +227,20 @@ export function createWebOnMessageHandler(params: { route, groupHistoryKey, groupHistories: params.groupHistories, - processMessage: processForRoute, + ...(preflightAudioTranscript !== undefined ? { preflightAudioTranscript } : {}), + // Group ack eligibility depends on the target agent/session, so a + // preflight ack attempt on the base route must not suppress downstream + // per-agent checks during broadcast fan-out. + ...(ackAlreadySent && msg.chatType !== "group" ? 
{ ackAlreadySent: true } : {}), + processMessage: (m, r, k, opts) => processForRoute(m, r, k, opts), }) ) { return; } - await processForRoute(msg, route, groupHistoryKey); + await processForRoute(msg, route, groupHistoryKey, { + ...(preflightAudioTranscript !== undefined ? { preflightAudioTranscript } : {}), + ...(ackAlreadySent ? { ackAlreadySent: true } : {}), + }); }; } diff --git a/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts b/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts new file mode 100644 index 00000000000..ce11166d4a2 --- /dev/null +++ b/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts @@ -0,0 +1,341 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +// Mock the lazy-loaded audio preflight runtime boundary +const transcribeFirstAudioMock = vi.fn(); +const maybeSendAckReactionMock = vi.fn(); + +vi.mock("./audio-preflight.runtime.js", () => ({ + transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args), +})); + +// Controllable shouldComputeCommandAuthorized for command-sync tests +let shouldComputeCommandResult = false; +let shouldComputeCommandBodies: string[] = []; + +// Minimal mocks for process-message dependencies +vi.mock("../../accounts.js", () => ({ + resolveWhatsAppAccount: () => ({ + accountId: "default", + dmPolicy: "pairing", + groupPolicy: "allowlist", + allowFrom: [], + }), +})); + +vi.mock("../../identity.js", () => ({ + getPrimaryIdentityId: () => undefined, + getSelfIdentity: () => ({ e164: "+15550000001" }), + getSenderIdentity: () => ({ e164: "+15550000002", name: "Alice" }), +})); + +vi.mock("../../reconnect.js", () => ({ + newConnectionId: () => "test-conn-id", +})); + +vi.mock("../../session.js", () => ({ + formatError: (err: unknown) => String(err), +})); + +vi.mock("../deliver-reply.js", () => ({ + deliverWebReply: vi.fn(async () => {}), +})); + +vi.mock("../loggers.js", () => ({ + 
whatsappInboundLog: { info: () => {}, debug: () => {} }, +})); + +vi.mock("./ack-reaction.js", () => ({ + maybeSendAckReaction: (...args: unknown[]) => maybeSendAckReactionMock(...args), +})); + +vi.mock("./inbound-context.js", () => ({ + resolveVisibleWhatsAppGroupHistory: () => [], + resolveVisibleWhatsAppReplyContext: () => null, +})); + +vi.mock("./last-route.js", () => ({ + trackBackgroundTask: () => {}, + updateLastRouteInBackground: () => {}, +})); + +vi.mock("./message-line.js", () => ({ + buildInboundLine: (params: { msg: { body: string } }) => params.msg.body, +})); + +vi.mock("./runtime-api.js", () => ({ + buildHistoryContextFromEntries: (_p: { currentMessage: string }) => _p.currentMessage, + createChannelReplyPipeline: () => ({ onModelSelected: undefined }), + formatInboundEnvelope: (p: { body: string }) => p.body, + logVerbose: () => {}, + normalizeE164: (v: string) => v, + readStoreAllowFromForDmPolicy: async () => [], + recordSessionMetaFromInbound: async () => {}, + resolveChannelContextVisibilityMode: () => "standard", + resolveInboundSessionEnvelopeContext: () => ({ + storePath: "/tmp/sessions.json", + envelopeOptions: {}, + previousTimestamp: undefined, + }), + resolvePinnedMainDmOwnerFromAllowlist: () => null, + resolveDmGroupAccessWithCommandGate: () => ({ commandAuthorized: true }), + shouldComputeCommandAuthorized: (body: string) => { + shouldComputeCommandBodies.push(body); + return shouldComputeCommandResult || body.startsWith("/"); + }, + shouldLogVerbose: () => false, + type: undefined, +})); + +vi.mock("./inbound-dispatch.js", () => ({ + buildWhatsAppInboundContext: (params: { + bodyForAgent?: string; + combinedBody: string; + commandAuthorized?: boolean; + commandBody?: string; + msg: { body: string; mediaPath?: string; mediaType?: string }; + rawBody?: string; + transcript?: string; + }) => ({ + Body: params.combinedBody, + BodyForAgent: params.bodyForAgent ?? 
params.msg.body, + CommandAuthorized: params.commandAuthorized, + CommandBody: params.commandBody ?? params.msg.body, + MediaPath: params.msg.mediaPath, + MediaType: params.msg.mediaType, + RawBody: params.rawBody ?? params.msg.body, + Transcript: params.transcript, + }), + dispatchWhatsAppBufferedReply: vi.fn(async () => true), + resolveWhatsAppDmRouteTarget: () => "+15550000002", + resolveWhatsAppResponsePrefix: () => undefined, + updateWhatsAppMainLastRoute: () => {}, +})); + +import { dispatchWhatsAppBufferedReply } from "./inbound-dispatch.js"; +import { processMessage } from "./process-message.js"; + +type WebInboundMsg = Parameters<typeof processMessage>[0]["msg"]; +type TestRoute = Parameters<typeof processMessage>[0]["route"]; + +function makeAudioMsg(overrides: Partial<WebInboundMsg> = {}): WebInboundMsg { + return { + id: "msg-1", + from: "+15550000002", + to: "+15550000001", + body: "", + chatType: "direct", + mediaType: "audio/ogg; codecs=opus", + mediaPath: "/tmp/voice.ogg", + timestamp: 1700000000, + accountId: "default", + ...overrides, + } as WebInboundMsg; +} + +function makeRoute(overrides: Partial<TestRoute> = {}): TestRoute { + return { + agentId: "main", + sessionKey: "agent:main:main", + mainSessionKey: "agent:main:main", + accountId: "default", + ...overrides, + } as TestRoute; +} + +function makeParams(msgOverrides: Partial<WebInboundMsg> = {}) { + return { + cfg: { + tools: { media: { audio: { enabled: true } } }, + channels: { whatsapp: {} }, + commands: { useAccessGroups: false }, + } as never, + msg: makeAudioMsg(msgOverrides), + route: makeRoute(), + groupHistoryKey: "whatsapp:default:+15550000002", + groupHistories: new Map(), + groupMemberNames: new Map(), + connectionId: "conn-1", + verbose: false, + maxMediaBytes: 1024 * 1024, + replyResolver: vi.fn() as never, + replyLogger: { + info: () => {}, + warn: () => {}, + debug: () => {}, + error: () => {}, + } as never, + backgroundTasks: new Set<Promise<unknown>>(), + rememberSentText: () => {}, + echoHas: () => false, + echoForget: () => {}, + buildCombinedEchoKey: (p: { combinedBody: 
string }) => p.combinedBody, + }; +} + +describe("processMessage audio preflight transcription", () => { + beforeEach(() => { + transcribeFirstAudioMock.mockReset(); + maybeSendAckReactionMock.mockReset(); + maybeSendAckReactionMock.mockResolvedValue(undefined); + shouldComputeCommandResult = false; + shouldComputeCommandBodies = []; + vi.mocked(dispatchWhatsAppBufferedReply).mockClear(); + }); + + it("replaces body with transcript when transcription succeeds", async () => { + transcribeFirstAudioMock.mockResolvedValueOnce("okay let's test this voice message"); + + await processMessage(makeParams()); + + expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1); + expect(transcribeFirstAudioMock).toHaveBeenCalledWith( + expect.objectContaining({ + ctx: expect.objectContaining({ + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg; codecs=opus"], + }), + }), + ); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "okay let's test this voice message", + BodyForAgent: "okay let's test this voice message", + CommandBody: "", + RawBody: "", + Transcript: "okay let's test this voice message", + }); + // mediaPath and mediaType must be preserved so inboundAudio detection (used by + // features like messages.tts.auto: "inbound") still recognises this as audio. 
+ expect(dispatchCall?.context).toMatchObject({ + MediaPath: "/tmp/voice.ogg", + MediaType: "audio/ogg; codecs=opus", + }); + }); + + it("falls back to placeholder when transcription fails", async () => { + transcribeFirstAudioMock.mockRejectedValueOnce(new Error("provider unavailable")); + + await processMessage(makeParams()); + + expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "", + BodyForAgent: "", + }); + }); + + it("falls back to placeholder when transcription returns undefined", async () => { + transcribeFirstAudioMock.mockResolvedValueOnce(undefined); + + await processMessage(makeParams()); + + expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "", + BodyForAgent: "", + }); + }); + + it("does not call transcribeFirstAudio when mediaType is not audio", async () => { + await processMessage( + makeParams({ body: "", mediaType: "image/jpeg", mediaPath: "/tmp/img.jpg" }), + ); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + }); + + it("does not call transcribeFirstAudio when body is not ", async () => { + await processMessage(makeParams({ body: "hello there", mediaType: "audio/ogg; codecs=opus" })); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + }); + + it("does not call transcribeFirstAudio when mediaPath is absent", async () => { + await processMessage(makeParams({ mediaPath: undefined })); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + }); + + it("does not call transcribeFirstAudio when msg.mediaType is absent", async () => { + await processMessage( + makeParams({ mediaType: undefined, body: "", mediaPath: "/tmp/voice.ogg" }), + ); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + + // Body passes 
through as-is without a mediaType to confirm audio + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "", + }); + }); + + it("does not use transcript body for command detection", async () => { + transcribeFirstAudioMock.mockResolvedValueOnce("/new start a new session"); + + await processMessage(makeParams()); + + expect(shouldComputeCommandBodies).toEqual([""]); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "/new start a new session", + BodyForAgent: "/new start a new session", + CommandBody: "", + RawBody: "", + Transcript: "/new start a new session", + }); + }); + + it("uses preflightAudioTranscript when provided, skipping transcribeFirstAudio", async () => { + // Simulate broadcast fan-out: caller pre-computed the transcript and passes it in. + // transcribeFirstAudio must NOT be called again inside processMessage. 
+ await processMessage({ + ...makeParams(), + preflightAudioTranscript: "pre-computed transcript from fan-out caller", + }); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "pre-computed transcript from fan-out caller", + BodyForAgent: "pre-computed transcript from fan-out caller", + CommandBody: "", + RawBody: "", + Transcript: "pre-computed transcript from fan-out caller", + }); + }); + + it("does not send a duplicate ack when caller already sent it", async () => { + await processMessage({ + ...makeParams(), + preflightAudioTranscript: "pre-computed transcript from caller", + ackAlreadySent: true, + }); + + expect(maybeSendAckReactionMock).not.toHaveBeenCalled(); + }); + + it("skips internal STT when preflightAudioTranscript is null (failed preflight sentinel)", async () => { + // null = caller already attempted preflight but got nothing (provider unavailable, + // disabled, etc.). processMessage must NOT retry to avoid 1+N attempts in broadcast. + await processMessage({ + ...makeParams(), + preflightAudioTranscript: null, + }); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + + // Body falls back to the original placeholder, not retried transcript. 
+ const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "", + }); + }); +}); diff --git a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts index 1bf4bfb678b..c98dcb71837 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts @@ -191,6 +191,13 @@ export async function processMessage(params: { maxMediaTextChunkLimit?: number; groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; + ackAlreadySent?: boolean; + /** Pre-computed audio transcript from a caller-level preflight, used to avoid + * re-transcribing the same voice note once per broadcast agent. + * - string → transcript obtained; use it directly, skip internal STT + * - null → preflight was attempted but failed / returned nothing; skip internal STT + * - undefined (omitted) → caller did not attempt preflight; run internal STT as normal */ + preflightAudioTranscript?: string | null; }) { const conversationId = params.msg.conversationId ?? params.msg.from; const self = getSelfIdentity(params.msg); @@ -210,9 +217,50 @@ export async function processMessage(params: { agentId: params.route.agentId, sessionKey: params.route.sessionKey, }); + // Preflight audio transcription: transcribe voice notes before building the + // inbound context so the agent receives the transcript instead of the audio placeholder body. + // Mirrors the preflight step added for Telegram in #61008. + // When the caller already performed transcription (e.g. on-message.ts before + // broadcast fan-out) the pre-computed result is reused to avoid N STT calls + // for N broadcast agents on the same voice note. 
+ // preflightAudioTranscript semantics: + // string → transcript ready, use it + // null → caller attempted but got nothing; skip internal STT to avoid retry + // undefined → caller did not attempt; run internal STT + let audioTranscript: string | undefined = params.preflightAudioTranscript ?? undefined; + const hasAudioBody = + params.msg.mediaType?.startsWith("audio/") === true && params.msg.body === ""; + if (params.preflightAudioTranscript === undefined && hasAudioBody && params.msg.mediaPath) { + try { + const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js"); + audioTranscript = await transcribeFirstAudio({ + ctx: { + MediaPaths: [params.msg.mediaPath], + MediaTypes: params.msg.mediaType ? [params.msg.mediaType] : undefined, + }, + cfg: params.cfg, + }); + } catch { + // Transcription failure is non-fatal: fall back to placeholder. + if (shouldLogVerbose()) { + logVerbose("whatsapp: audio preflight transcription failed, using placeholder"); + } + } + } + + // If we have a transcript, replace the agent-facing body so the agent sees the spoken text. + // mediaPath and mediaType are intentionally preserved so that inboundAudio detection + // (used by features such as messages.tts.auto: "inbound") still sees this as an + // audio message. The transcript is also stored in Transcript so downstream pipelines + // can detect it. Preventing a second STT pass in the media-understanding pipeline + // requires SDK-level support (alreadyTranscribed on a shared attachment instance); + // that is a shared concern across all channels and is tracked separately. + const msgForAgent = + audioTranscript !== undefined ? 
{ ...params.msg, body: audioTranscript } : params.msg; + let combinedBody = buildInboundLine({ cfg: params.cfg, - msg: params.msg, + msg: msgForAgent, agentId: params.route.agentId, previousTimestamp, envelope: envelopeOptions, @@ -267,18 +315,22 @@ export async function processMessage(params: { return false; } - // Send ack reaction immediately upon message receipt (post-gating) - await maybeSendAckReaction({ - cfg: params.cfg, - msg: params.msg, - agentId: params.route.agentId, - sessionKey: params.route.sessionKey, - conversationId, - verbose: params.verbose, - accountId: account.accountId, - info: params.replyLogger.info.bind(params.replyLogger), - warn: params.replyLogger.warn.bind(params.replyLogger), - }); + // Send ack reaction immediately upon message receipt (post-gating). Callers + // that do preflight work before processMessage can send it first and set + // ackAlreadySent so slow STT does not delay user-visible receipt feedback. + if (params.ackAlreadySent !== true) { + await maybeSendAckReaction({ + cfg: params.cfg, + msg: params.msg, + agentId: params.route.agentId, + sessionKey: params.route.sessionKey, + conversationId, + verbose: params.verbose, + accountId: account.accountId, + info: params.replyLogger.info.bind(params.replyLogger), + warn: params.replyLogger.warn.bind(params.replyLogger), + }); + } const correlationId = params.msg.id ?? newConnectionId(); params.replyLogger.info( @@ -353,19 +405,23 @@ export async function processMessage(params: { }); const ctxPayload = buildWhatsAppInboundContext({ + bodyForAgent: msgForAgent.body, combinedBody, + commandBody: params.msg.body, commandAuthorized, conversationId, groupHistory: visibleGroupHistory, groupMemberRoster: params.groupMemberNames.get(params.groupHistoryKey), groupSystemPrompt: conversationSystemPrompt, msg: params.msg, + rawBody: params.msg.body, route: params.route, sender: { id: getPrimaryIdentityId(sender) ?? undefined, name: sender.name ?? undefined, e164: sender.e164 ?? 
undefined, }, + ...(audioTranscript !== undefined ? { transcript: audioTranscript } : {}), replyThreading, visibleReplyTo: visibleReplyTo ?? undefined, }); diff --git a/extensions/whatsapp/src/inbound/monitor.ts b/extensions/whatsapp/src/inbound/monitor.ts index ef73d214d70..1926904e19d 100644 --- a/extensions/whatsapp/src/inbound/monitor.ts +++ b/extensions/whatsapp/src/inbound/monitor.ts @@ -567,6 +567,7 @@ export async function attachWebInboxToSocket( conversationId: inbound.from, to: self.e164 ?? "me", accountId: inbound.access.resolvedAccountId, + accessControlPassed: true, body: enriched.body, pushName: senderName, timestamp, diff --git a/extensions/whatsapp/src/inbound/types.ts b/extensions/whatsapp/src/inbound/types.ts index 0890dc65231..f30c333b9f4 100644 --- a/extensions/whatsapp/src/inbound/types.ts +++ b/extensions/whatsapp/src/inbound/types.ts @@ -57,6 +57,8 @@ export type WebInboundMessage = { conversationId: string; // alias for clarity (same as from) to: string; accountId: string; + /** Set by the real inbound monitor after access-control / pairing checks pass. 
*/ + accessControlPassed?: boolean; body: string; pushName?: string; timestamp?: number; diff --git a/extensions/whatsapp/src/monitor-inbox.blocks-messages-from-unauthorized-senders-not-allowfrom.test-support.ts b/extensions/whatsapp/src/monitor-inbox.blocks-messages-from-unauthorized-senders-not-allowfrom.test-support.ts index 677993d7bc6..16cf52798f3 100644 --- a/extensions/whatsapp/src/monitor-inbox.blocks-messages-from-unauthorized-senders-not-allowfrom.test-support.ts +++ b/extensions/whatsapp/src/monitor-inbox.blocks-messages-from-unauthorized-senders-not-allowfrom.test-support.ts @@ -150,7 +150,12 @@ describe("web monitor inbox", () => { expect(onMessage).toHaveBeenCalledTimes(1); expect(onMessage).toHaveBeenCalledWith( - expect.objectContaining({ from: "+123", to: "+123", body: "self ping" }), + expect.objectContaining({ + from: "+123", + to: "+123", + body: "self ping", + accessControlPassed: true, + }), ); expect(sock.readMessages).not.toHaveBeenCalled();