From 4fdc5854945a1bb4e868e13562b31bed8a367145 Mon Sep 17 00:00:00 2001 From: Roger Deng <13251150+rogerdigital@users.noreply.github.com> Date: Fri, 10 Apr 2026 13:30:13 +0800 Subject: [PATCH] WhatsApp: add preflight audio transcription for DM voice notes --- .../monitor/audio-preflight.runtime.ts | 9 + .../src/auto-reply/monitor/broadcast.ts | 3 + .../src/auto-reply/monitor/on-message.ts | 33 +- .../process-message.audio-preflight.test.ts | 306 ++++++++++++++++++ .../src/auto-reply/monitor/process-message.ts | 57 +++- 5 files changed, 402 insertions(+), 6 deletions(-) create mode 100644 extensions/whatsapp/src/auto-reply/monitor/audio-preflight.runtime.ts create mode 100644 extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts diff --git a/extensions/whatsapp/src/auto-reply/monitor/audio-preflight.runtime.ts b/extensions/whatsapp/src/auto-reply/monitor/audio-preflight.runtime.ts new file mode 100644 index 00000000000..7e7f111d104 --- /dev/null +++ b/extensions/whatsapp/src/auto-reply/monitor/audio-preflight.runtime.ts @@ -0,0 +1,9 @@ +import { transcribeFirstAudio as transcribeFirstAudioImpl } from "openclaw/plugin-sdk/media-runtime"; + +type TranscribeFirstAudio = typeof import("openclaw/plugin-sdk/media-runtime").transcribeFirstAudio; + +export async function transcribeFirstAudio( + ...args: Parameters<TranscribeFirstAudio> +): ReturnType<TranscribeFirstAudio> { + return await transcribeFirstAudioImpl(...args); +} diff --git a/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts b/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts index 597df193458..2c391706319 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/broadcast.ts @@ -59,8 +59,10 @@ export async function maybeBroadcastMessage(params: { opts?: { groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; + preflightAudioTranscript?: string | null; }, ) => Promise; + preflightAudioTranscript?: string | null; }) { const
broadcastAgents = params.cfg.broadcast?.[params.peerId]; if (!broadcastAgents || !Array.isArray(broadcastAgents)) { @@ -107,6 +109,7 @@ export async function maybeBroadcastMessage(params: { return await params.processMessage(params.msg, agentRoute, params.groupHistoryKey, { groupHistory: groupHistorySnapshot, suppressGroupHistoryClear: true, + preflightAudioTranscript: params.preflightAudioTranscript, }); } catch (err) { whatsappInboundLog.error(`Broadcast agent ${agentId} failed: ${formatError(err)}`); diff --git a/extensions/whatsapp/src/auto-reply/monitor/on-message.ts b/extensions/whatsapp/src/auto-reply/monitor/on-message.ts index 281c49f91f9..4b3d1705e3e 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/on-message.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/on-message.ts @@ -39,6 +39,7 @@ export function createWebOnMessageHandler(params: { opts?: { groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; + preflightAudioTranscript?: string | null; }, ) => processMessage({ @@ -60,6 +61,7 @@ export function createWebOnMessageHandler(params: { buildCombinedEchoKey: params.echoTracker.buildCombinedKey, groupHistory: opts?.groupHistory, suppressGroupHistoryClear: opts?.suppressGroupHistoryClear, + preflightAudioTranscript: opts?.preflightAudioTranscript, }); return async (msg: WebInboundMsg) => { @@ -159,6 +161,32 @@ export function createWebOnMessageHandler(params: { } } + // Preflight audio transcription: run once here, before broadcast fan-out, so + // all agents share the same transcript instead of each making a separate STT call. + // null = preflight was attempted but produced no transcript (failed / disabled / no audio); + // undefined = preflight was not attempted (non-audio message). 
+ let preflightAudioTranscript: string | null | undefined; + const hasAudioBody = + msg.mediaType?.startsWith("audio/") === true && msg.body === ""; + if (hasAudioBody && msg.mediaPath) { + try { + const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js"); + // transcribeFirstAudio returns undefined on failure/disabled; store null so + // processMessage knows the attempt was already made and does not retry. + preflightAudioTranscript = + (await transcribeFirstAudio({ + ctx: { + MediaPaths: [msg.mediaPath], + MediaTypes: msg.mediaType ? [msg.mediaType] : undefined, + }, + cfg: params.cfg, + })) ?? null; + } catch { + // Non-fatal: store null so per-agent retries are suppressed. + preflightAudioTranscript = null; + } + } + // Broadcast groups: when we'd reply anyway, run multiple agents. // Does not bypass group mention/activation gating above. if ( @@ -169,12 +197,13 @@ export function createWebOnMessageHandler(params: { route, groupHistoryKey, groupHistories: params.groupHistories, - processMessage: processForRoute, + preflightAudioTranscript, + processMessage: (m, r, k, opts) => processForRoute(m, r, k, opts), }) ) { return; } - await processForRoute(msg, route, groupHistoryKey); + await processForRoute(msg, route, groupHistoryKey, { preflightAudioTranscript }); }; } diff --git a/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts b/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts new file mode 100644 index 00000000000..e0656ee24a6 --- /dev/null +++ b/extensions/whatsapp/src/auto-reply/monitor/process-message.audio-preflight.test.ts @@ -0,0 +1,306 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +// Mock the lazy-loaded audio preflight runtime boundary +const transcribeFirstAudioMock = vi.fn(); + +vi.mock("./audio-preflight.runtime.js", () => ({ + transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args), +})); + +// Controllable 
shouldComputeCommandAuthorized for command-sync tests +let shouldComputeCommandResult = false; + +// Minimal mocks for process-message dependencies +vi.mock("../../accounts.js", () => ({ + resolveWhatsAppAccount: () => ({ + accountId: "default", + dmPolicy: "pairing", + groupPolicy: "allowlist", + allowFrom: [], + }), +})); + +vi.mock("../../identity.js", () => ({ + getPrimaryIdentityId: () => undefined, + getSelfIdentity: () => ({ e164: "+15550000001" }), + getSenderIdentity: () => ({ e164: "+15550000002", name: "Alice" }), +})); + +vi.mock("../../reconnect.js", () => ({ + newConnectionId: () => "test-conn-id", +})); + +vi.mock("../../session.js", () => ({ + formatError: (err: unknown) => String(err), +})); + +vi.mock("../deliver-reply.js", () => ({ + deliverWebReply: vi.fn(async () => {}), +})); + +vi.mock("../loggers.js", () => ({ + whatsappInboundLog: { info: () => {}, debug: () => {} }, +})); + +vi.mock("./ack-reaction.js", () => ({ + maybeSendAckReaction: () => {}, +})); + +vi.mock("./inbound-context.js", () => ({ + resolveVisibleWhatsAppGroupHistory: () => [], + resolveVisibleWhatsAppReplyContext: () => null, +})); + +vi.mock("./last-route.js", () => ({ + trackBackgroundTask: () => {}, + updateLastRouteInBackground: () => {}, +})); + +vi.mock("./message-line.js", () => ({ + buildInboundLine: (params: { msg: { body: string } }) => params.msg.body, +})); + +vi.mock("./runtime-api.js", () => ({ + buildHistoryContextFromEntries: (_p: { currentMessage: string }) => _p.currentMessage, + createChannelReplyPipeline: () => ({ onModelSelected: undefined }), + formatInboundEnvelope: (p: { body: string }) => p.body, + logVerbose: () => {}, + normalizeE164: (v: string) => v, + readStoreAllowFromForDmPolicy: async () => [], + recordSessionMetaFromInbound: async () => {}, + resolveChannelContextVisibilityMode: () => "standard", + resolveInboundSessionEnvelopeContext: () => ({ + storePath: "/tmp/sessions.json", + envelopeOptions: {}, + previousTimestamp: undefined, + }), + 
resolvePinnedMainDmOwnerFromAllowlist: () => null, + resolveDmGroupAccessWithCommandGate: () => ({ commandAuthorized: true }), + shouldComputeCommandAuthorized: (body: string) => + shouldComputeCommandResult || body.startsWith("/"), + shouldLogVerbose: () => false, + type: undefined, +})); + +vi.mock("./inbound-dispatch.js", () => ({ + buildWhatsAppInboundContext: (params: { + msg: { body: string; mediaPath?: string; mediaType?: string }; + }) => ({ + Body: params.msg.body, + BodyForAgent: params.msg.body, + MediaPath: params.msg.mediaPath, + MediaType: params.msg.mediaType, + }), + dispatchWhatsAppBufferedReply: vi.fn(async () => true), + resolveWhatsAppDmRouteTarget: () => "+15550000002", + resolveWhatsAppResponsePrefix: () => undefined, + updateWhatsAppMainLastRoute: () => {}, +})); + +import { dispatchWhatsAppBufferedReply } from "./inbound-dispatch.js"; +import { processMessage } from "./process-message.js"; + +type WebInboundMsg = Parameters<typeof processMessage>[0]["msg"]; +type TestRoute = Parameters<typeof processMessage>[0]["route"]; + +function makeAudioMsg(overrides: Partial<WebInboundMsg> = {}): WebInboundMsg { + return { + id: "msg-1", + from: "+15550000002", + to: "+15550000001", + body: "", + chatType: "direct", + mediaType: "audio/ogg; codecs=opus", + mediaPath: "/tmp/voice.ogg", + timestamp: 1700000000, + accountId: "default", + ...overrides, + } as WebInboundMsg; +} + +function makeRoute(overrides: Partial<TestRoute> = {}): TestRoute { + return { + agentId: "main", + sessionKey: "agent:main:main", + mainSessionKey: "agent:main:main", + accountId: "default", + ...overrides, + } as TestRoute; +} + +function makeParams(msgOverrides: Partial<WebInboundMsg> = {}) { + return { + cfg: { + tools: { media: { audio: { enabled: true } } }, + channels: { whatsapp: {} }, + commands: { useAccessGroups: false }, + } as never, + msg: makeAudioMsg(msgOverrides), + route: makeRoute(), + groupHistoryKey: "whatsapp:default:+15550000002", + groupHistories: new Map(), + groupMemberNames: new Map(), + connectionId: "conn-1", + verbose: false, +
maxMediaBytes: 1024 * 1024, + replyResolver: vi.fn() as never, + replyLogger: { + info: () => {}, + warn: () => {}, + debug: () => {}, + error: () => {}, + } as never, + backgroundTasks: new Set>(), + rememberSentText: () => {}, + echoHas: () => false, + echoForget: () => {}, + buildCombinedEchoKey: (p: { combinedBody: string }) => p.combinedBody, + }; +} + +describe("processMessage audio preflight transcription", () => { + beforeEach(() => { + transcribeFirstAudioMock.mockReset(); + shouldComputeCommandResult = false; + vi.mocked(dispatchWhatsAppBufferedReply).mockClear(); + }); + + it("replaces body with transcript when transcription succeeds", async () => { + transcribeFirstAudioMock.mockResolvedValueOnce("okay let's test this voice message"); + + await processMessage(makeParams()); + + expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1); + expect(transcribeFirstAudioMock).toHaveBeenCalledWith( + expect.objectContaining({ + ctx: expect.objectContaining({ + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg; codecs=opus"], + }), + }), + ); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "okay let's test this voice message", + BodyForAgent: "okay let's test this voice message", + }); + // mediaPath and mediaType must be preserved so inboundAudio detection (used by + // features like messages.tts.auto: "inbound") still recognises this as audio. 
+ expect(dispatchCall?.context).toMatchObject({ + MediaPath: "/tmp/voice.ogg", + MediaType: "audio/ogg; codecs=opus", + }); + }); + + it("falls back to placeholder when transcription fails", async () => { + transcribeFirstAudioMock.mockRejectedValueOnce(new Error("provider unavailable")); + + await processMessage(makeParams()); + + expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "", + BodyForAgent: "", + }); + }); + + it("falls back to placeholder when transcription returns undefined", async () => { + transcribeFirstAudioMock.mockResolvedValueOnce(undefined); + + await processMessage(makeParams()); + + expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "", + BodyForAgent: "", + }); + }); + + it("does not call transcribeFirstAudio when mediaType is not audio", async () => { + await processMessage( + makeParams({ body: "", mediaType: "image/jpeg", mediaPath: "/tmp/img.jpg" }), + ); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + }); + + it("does not call transcribeFirstAudio when body is not ", async () => { + await processMessage(makeParams({ body: "hello there", mediaType: "audio/ogg; codecs=opus" })); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + }); + + it("does not call transcribeFirstAudio when mediaPath is absent", async () => { + await processMessage(makeParams({ mediaPath: undefined })); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + }); + + it("does not call transcribeFirstAudio when msg.mediaType is absent", async () => { + await processMessage( + makeParams({ mediaType: undefined, body: "", mediaPath: "/tmp/voice.ogg" }), + ); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + + // Body passes 
through as-is without a mediaType to confirm audio + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "", + }); + }); + + it("uses transcript body for command detection so voice commands are not missed", async () => { + // Transcript starts with a slash command — shouldComputeCommandAuthorized must + // see the transcript, not the original placeholder. + transcribeFirstAudioMock.mockResolvedValueOnce("/new start a new session"); + + await processMessage(makeParams()); + + // Command detection ran against the transcript, so CommandBody is the transcript. + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "/new start a new session", + BodyForAgent: "/new start a new session", + }); + }); + + it("uses preflightAudioTranscript when provided, skipping transcribeFirstAudio", async () => { + // Simulate broadcast fan-out: caller pre-computed the transcript and passes it in. + // transcribeFirstAudio must NOT be called again inside processMessage. + await processMessage({ + ...makeParams(), + preflightAudioTranscript: "pre-computed transcript from fan-out caller", + }); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "pre-computed transcript from fan-out caller", + BodyForAgent: "pre-computed transcript from fan-out caller", + }); + }); + + it("skips internal STT when preflightAudioTranscript is null (failed preflight sentinel)", async () => { + // null = caller already attempted preflight but got nothing (provider unavailable, + // disabled, etc.). processMessage must NOT retry to avoid 1+N attempts in broadcast. 
+ await processMessage({ + ...makeParams(), + preflightAudioTranscript: null, + }); + + expect(transcribeFirstAudioMock).not.toHaveBeenCalled(); + + // Body falls back to the original placeholder, not retried transcript. + const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0]; + expect(dispatchCall?.context).toMatchObject({ + Body: "", + }); + }); +}); diff --git a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts index 1bf4bfb678b..c2a076d9a2d 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts @@ -191,6 +191,12 @@ export async function processMessage(params: { maxMediaTextChunkLimit?: number; groupHistory?: GroupHistoryEntry[]; suppressGroupHistoryClear?: boolean; + /** Pre-computed audio transcript from a caller-level preflight, used to avoid + * re-transcribing the same voice note once per broadcast agent. + * - string → transcript obtained; use it directly, skip internal STT + * - null → preflight was attempted but failed / returned nothing; skip internal STT + * - undefined (omitted) → caller did not attempt preflight; run internal STT as normal */ + preflightAudioTranscript?: string | null; }) { const conversationId = params.msg.conversationId ?? params.msg.from; const self = getSelfIdentity(params.msg); @@ -210,9 +216,50 @@ export async function processMessage(params: { agentId: params.route.agentId, sessionKey: params.route.sessionKey, }); + // Preflight audio transcription: transcribe voice notes before building the + // inbound context so the agent receives the transcript instead of . + // Mirrors the preflight step added for Telegram in #61008. + // When the caller already performed transcription (e.g. on-message.ts before + // broadcast fan-out) the pre-computed result is reused to avoid N STT calls + // for N broadcast agents on the same voice note. 
+ // preflightAudioTranscript semantics: + // string → transcript ready, use it + // null → caller attempted but got nothing; skip internal STT to avoid retry + // undefined → caller did not attempt; run internal STT + let audioTranscript: string | undefined = params.preflightAudioTranscript ?? undefined; + const hasAudioBody = + params.msg.mediaType?.startsWith("audio/") === true && params.msg.body === ""; + if (params.preflightAudioTranscript === undefined && hasAudioBody && params.msg.mediaPath) { + try { + const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js"); + audioTranscript = await transcribeFirstAudio({ + ctx: { + MediaPaths: [params.msg.mediaPath], + MediaTypes: params.msg.mediaType ? [params.msg.mediaType] : undefined, + }, + cfg: params.cfg, + }); + } catch { + // Transcription failure is non-fatal: fall back to placeholder. + if (shouldLogVerbose()) { + logVerbose("whatsapp: audio preflight transcription failed, using placeholder"); + } + } + } + + // If we have a transcript, replace the body so the agent sees the spoken text. + // mediaPath and mediaType are intentionally preserved so that inboundAudio detection + // (used by features such as messages.tts.auto: "inbound") still sees this as an + // audio message. The transcript is also stored in Transcript so downstream pipelines + // can detect it. Preventing a second STT pass in the media-understanding pipeline + // requires SDK-level support (alreadyTranscribed on a shared attachment instance); + // that is a shared concern across all channels and is tracked separately. + const msgForInbound = + audioTranscript !== undefined ? { ...params.msg, body: audioTranscript } : params.msg; + let combinedBody = buildInboundLine({ cfg: params.cfg, - msg: params.msg, + msg: msgForInbound, agentId: params.route.agentId, previousTimestamp, envelope: envelopeOptions, @@ -316,10 +363,12 @@ export async function processMessage(params: { senderE164: sender.e164 ?? 
undefined, normalizeE164, }); - const commandAuthorized = shouldComputeCommandAuthorized(params.msg.body, params.cfg) + // Use msgForInbound so that if a voice note transcribes to a command (e.g. /new), + // command detection and auth are evaluated against the transcript, not the audio placeholder. + const commandAuthorized = shouldComputeCommandAuthorized(msgForInbound.body, params.cfg) ? await resolveWhatsAppCommandAuthorized({ cfg: params.cfg, + msg: msgForInbound, policy: inboundPolicy, }) : undefined; @@ -359,7 +408,7 @@ export async function processMessage(params: { groupHistory: visibleGroupHistory, groupMemberRoster: params.groupMemberNames.get(params.groupHistoryKey), groupSystemPrompt: conversationSystemPrompt, - msg: params.msg, + msg: msgForInbound, route: params.route, sender: { id: getPrimaryIdentityId(sender) ?? undefined,