mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:30:42 +00:00
WhatsApp: add preflight audio transcription for DM voice notes (#64120)
Merged via squash.
Prepared head SHA: 7480b339da
Co-authored-by: rogerdigital <13251150+rogerdigital@users.noreply.github.com>
Co-authored-by: mcaxtr <7562095+mcaxtr@users.noreply.github.com>
Reviewed-by: @mcaxtr
This commit is contained in:
@@ -155,6 +155,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/startup: await startup sidecars before channel monitors report ready, reducing Discord and plugin startup races while still keeping gateway boot observability intact. Thanks @steipete.
|
||||
- Plugins/Google Meet: report required manual actions for Chrome joins, use browser automation for Meet entry, and persist the private-WS node opt-in so paired-node realtime sessions keep their intended network policy. Thanks @steipete.
|
||||
- Slack: route native stream fallback replies through the normal chunked sender so long buffered Slack Connect responses are not dropped or duplicated. (#71124) Thanks @martingarramon.
|
||||
- WhatsApp: transcribe accepted voice notes before agent dispatch while keeping spoken transcripts out of command authorization. (#64120) Thanks @rogerdigital.
|
||||
|
||||
## 2026.4.23
|
||||
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
import { transcribeFirstAudio as transcribeFirstAudioImpl } from "openclaw/plugin-sdk/media-runtime";
|
||||
|
||||
type TranscribeFirstAudio = typeof import("openclaw/plugin-sdk/media-runtime").transcribeFirstAudio;
|
||||
|
||||
export async function transcribeFirstAudio(
|
||||
...args: Parameters<TranscribeFirstAudio>
|
||||
): ReturnType<TranscribeFirstAudio> {
|
||||
return await transcribeFirstAudioImpl(...args);
|
||||
}
|
||||
@@ -59,8 +59,12 @@ export async function maybeBroadcastMessage(params: {
|
||||
opts?: {
|
||||
groupHistory?: GroupHistoryEntry[];
|
||||
suppressGroupHistoryClear?: boolean;
|
||||
preflightAudioTranscript?: string | null;
|
||||
ackAlreadySent?: boolean;
|
||||
},
|
||||
) => Promise<boolean>;
|
||||
preflightAudioTranscript?: string | null;
|
||||
ackAlreadySent?: boolean;
|
||||
}) {
|
||||
const broadcastAgents = params.cfg.broadcast?.[params.peerId];
|
||||
if (!broadcastAgents || !Array.isArray(broadcastAgents)) {
|
||||
@@ -104,10 +108,22 @@ export async function maybeBroadcastMessage(params: {
|
||||
: baseAgentRoute;
|
||||
|
||||
try {
|
||||
return await params.processMessage(params.msg, agentRoute, params.groupHistoryKey, {
|
||||
const opts: {
|
||||
groupHistory?: GroupHistoryEntry[];
|
||||
suppressGroupHistoryClear: true;
|
||||
preflightAudioTranscript?: string | null;
|
||||
ackAlreadySent?: boolean;
|
||||
} = {
|
||||
groupHistory: groupHistorySnapshot,
|
||||
suppressGroupHistoryClear: true,
|
||||
});
|
||||
};
|
||||
if (params.preflightAudioTranscript !== undefined) {
|
||||
opts.preflightAudioTranscript = params.preflightAudioTranscript;
|
||||
}
|
||||
if (params.ackAlreadySent === true) {
|
||||
opts.ackAlreadySent = true;
|
||||
}
|
||||
return await params.processMessage(params.msg, agentRoute, params.groupHistoryKey, opts);
|
||||
} catch (err) {
|
||||
whatsappInboundLog.error(`Broadcast agent ${agentId} failed: ${formatError(err)}`);
|
||||
return false;
|
||||
|
||||
@@ -175,6 +175,35 @@ describe("whatsapp inbound dispatch", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps agent and command bodies independently overridable", () => {
|
||||
const ctx = buildWhatsAppInboundContext({
|
||||
bodyForAgent: "spoken transcript",
|
||||
combinedBody: "spoken transcript",
|
||||
commandBody: "<media:audio>",
|
||||
conversationId: "+1000",
|
||||
msg: makeMsg({
|
||||
body: "<media:audio>",
|
||||
mediaPath: "/tmp/voice.ogg",
|
||||
mediaType: "audio/ogg; codecs=opus",
|
||||
}),
|
||||
rawBody: "<media:audio>",
|
||||
route: makeRoute(),
|
||||
sender: {
|
||||
e164: "+1000",
|
||||
},
|
||||
transcript: "spoken transcript",
|
||||
});
|
||||
|
||||
expect(ctx).toMatchObject({
|
||||
Body: "spoken transcript",
|
||||
BodyForAgent: "spoken transcript",
|
||||
BodyForCommands: "<media:audio>",
|
||||
CommandBody: "<media:audio>",
|
||||
RawBody: "<media:audio>",
|
||||
Transcript: "spoken transcript",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back SenderId to SenderE164 when sender id is missing", () => {
|
||||
const ctx = buildWhatsAppInboundContext({
|
||||
combinedBody: "hi",
|
||||
|
||||
@@ -86,15 +86,19 @@ export function resolveWhatsAppResponsePrefix(params: {
|
||||
}
|
||||
|
||||
export function buildWhatsAppInboundContext(params: {
|
||||
bodyForAgent?: string;
|
||||
combinedBody: string;
|
||||
commandBody?: string;
|
||||
commandAuthorized?: boolean;
|
||||
conversationId: string;
|
||||
groupHistory?: GroupHistoryEntry[];
|
||||
groupMemberRoster?: Map<string, string>;
|
||||
groupSystemPrompt?: string;
|
||||
msg: WebInboundMsg;
|
||||
rawBody?: string;
|
||||
route: ReturnType<typeof resolveAgentRoute>;
|
||||
sender: SenderContext;
|
||||
transcript?: string;
|
||||
replyThreading?: ReplyThreadingContext;
|
||||
visibleReplyTo?: VisibleReplyTarget;
|
||||
}) {
|
||||
@@ -109,10 +113,11 @@ export function buildWhatsAppInboundContext(params: {
|
||||
|
||||
const result = finalizeInboundContext({
|
||||
Body: params.combinedBody,
|
||||
BodyForAgent: params.msg.body,
|
||||
BodyForAgent: params.bodyForAgent ?? params.msg.body,
|
||||
InboundHistory: inboundHistory,
|
||||
RawBody: params.msg.body,
|
||||
CommandBody: params.msg.body,
|
||||
RawBody: params.rawBody ?? params.msg.body,
|
||||
CommandBody: params.commandBody ?? params.msg.body,
|
||||
Transcript: params.transcript,
|
||||
From: params.msg.from,
|
||||
To: params.msg.to,
|
||||
SessionKey: params.route.sessionKey,
|
||||
|
||||
@@ -0,0 +1,250 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const events: string[] = [];
|
||||
const transcribeFirstAudioMock = vi.fn();
|
||||
const maybeSendAckReactionMock = vi.fn();
|
||||
const processMessageMock = vi.fn();
|
||||
const maybeBroadcastMessageMock = vi.fn();
|
||||
|
||||
vi.mock("./audio-preflight.runtime.js", () => ({
|
||||
transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("./ack-reaction.js", () => ({
|
||||
maybeSendAckReaction: (...args: unknown[]) => maybeSendAckReactionMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("./process-message.js", () => ({
|
||||
processMessage: (...args: unknown[]) => processMessageMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("./broadcast.js", () => ({
|
||||
maybeBroadcastMessage: (...args: unknown[]) => maybeBroadcastMessageMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("./group-gating.js", () => ({
|
||||
applyGroupGating: vi.fn(async () => ({ shouldProcess: true })),
|
||||
}));
|
||||
|
||||
vi.mock("./last-route.js", () => ({
|
||||
updateLastRouteInBackground: () => {},
|
||||
}));
|
||||
|
||||
vi.mock("./peer.js", () => ({
|
||||
resolvePeerId: (msg: { from: string }) => msg.from,
|
||||
}));
|
||||
|
||||
vi.mock("../config.runtime.js", () => ({
|
||||
loadConfig: () => ({
|
||||
channels: {
|
||||
whatsapp: {
|
||||
ackReaction: { enabled: true },
|
||||
},
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../../group-session-key.js", () => ({
|
||||
resolveWhatsAppGroupSessionRoute: (route: unknown) => route,
|
||||
}));
|
||||
|
||||
vi.mock("../../identity.js", () => ({
|
||||
getPrimaryIdentityId: () => undefined,
|
||||
getSenderIdentity: () => ({ e164: "+15550000002", name: "Alice" }),
|
||||
}));
|
||||
|
||||
vi.mock("../../text-runtime.js", () => ({
|
||||
normalizeE164: (value: string) => value,
|
||||
}));
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/routing", () => ({
|
||||
buildGroupHistoryKey: () => "group-key",
|
||||
resolveAgentRoute: () => ({
|
||||
agentId: "main",
|
||||
accountId: "default",
|
||||
sessionKey: "agent:main:whatsapp:+15550000002",
|
||||
mainSessionKey: "agent:main:main",
|
||||
}),
|
||||
}));
|
||||
|
||||
import type { WebInboundMsg } from "../types.js";
|
||||
import { createWebOnMessageHandler } from "./on-message.js";
|
||||
|
||||
function makeAudioMsg(): WebInboundMsg {
|
||||
return {
|
||||
id: "msg-1",
|
||||
from: "+15550000002",
|
||||
to: "+15550000001",
|
||||
accessControlPassed: true,
|
||||
body: "<media:audio>",
|
||||
chatType: "direct",
|
||||
mediaType: "audio/ogg; codecs=opus",
|
||||
mediaPath: "/tmp/voice.ogg",
|
||||
timestamp: 1700000000,
|
||||
accountId: "default",
|
||||
} as WebInboundMsg;
|
||||
}
|
||||
|
||||
function makeGroupAudioMsg(): WebInboundMsg {
|
||||
return {
|
||||
...makeAudioMsg(),
|
||||
from: "1203630@g.us",
|
||||
chatId: "1203630@g.us",
|
||||
chatType: "group",
|
||||
conversationId: "1203630@g.us",
|
||||
wasMentioned: false,
|
||||
} as WebInboundMsg;
|
||||
}
|
||||
|
||||
function makeEchoTracker() {
|
||||
return {
|
||||
has: () => false,
|
||||
forget: () => {},
|
||||
rememberText: () => {},
|
||||
buildCombinedKey: (p: { combinedBody: string }) => p.combinedBody,
|
||||
};
|
||||
}
|
||||
|
||||
describe("createWebOnMessageHandler audio preflight", () => {
|
||||
beforeEach(() => {
|
||||
events.length = 0;
|
||||
maybeBroadcastMessageMock.mockReset();
|
||||
maybeBroadcastMessageMock.mockImplementation(async () => false);
|
||||
maybeSendAckReactionMock.mockReset();
|
||||
maybeSendAckReactionMock.mockImplementation(async () => {
|
||||
events.push("ack");
|
||||
});
|
||||
transcribeFirstAudioMock.mockReset();
|
||||
transcribeFirstAudioMock.mockImplementation(async () => {
|
||||
events.push("stt");
|
||||
return "transcribed voice note";
|
||||
});
|
||||
processMessageMock.mockReset();
|
||||
processMessageMock.mockResolvedValue(true);
|
||||
});
|
||||
|
||||
it("sends ack reaction before audio preflight for voice notes", async () => {
|
||||
const handler = createWebOnMessageHandler({
|
||||
cfg: {
|
||||
channels: {
|
||||
whatsapp: {
|
||||
ackReaction: { enabled: true },
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
verbose: false,
|
||||
connectionId: "conn-1",
|
||||
maxMediaBytes: 1024 * 1024,
|
||||
groupHistoryLimit: 20,
|
||||
groupHistories: new Map(),
|
||||
groupMemberNames: new Map(),
|
||||
echoTracker: makeEchoTracker() as never,
|
||||
backgroundTasks: new Set(),
|
||||
replyResolver: vi.fn() as never,
|
||||
replyLogger: {
|
||||
info: () => {},
|
||||
warn: () => {},
|
||||
debug: () => {},
|
||||
error: () => {},
|
||||
} as never,
|
||||
baseMentionConfig: {} as never,
|
||||
account: { authDir: "/tmp/auth", accountId: "default" },
|
||||
});
|
||||
|
||||
await handler(makeAudioMsg());
|
||||
|
||||
expect(events).toEqual(["ack", "stt"]);
|
||||
expect(processMessageMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
preflightAudioTranscript: "transcribed voice note",
|
||||
ackAlreadySent: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("skips early DM ack/preflight when access-control was not explicitly passed through", async () => {
|
||||
|
||||
const handler = createWebOnMessageHandler({
|
||||
cfg: {
|
||||
channels: {
|
||||
whatsapp: {
|
||||
ackReaction: { enabled: true },
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
verbose: false,
|
||||
connectionId: "conn-1",
|
||||
maxMediaBytes: 1024 * 1024,
|
||||
groupHistoryLimit: 20,
|
||||
groupHistories: new Map(),
|
||||
groupMemberNames: new Map(),
|
||||
echoTracker: makeEchoTracker() as never,
|
||||
backgroundTasks: new Set(),
|
||||
replyResolver: vi.fn() as never,
|
||||
replyLogger: {
|
||||
info: () => {},
|
||||
warn: () => {},
|
||||
debug: () => {},
|
||||
error: () => {},
|
||||
} as never,
|
||||
baseMentionConfig: {} as never,
|
||||
account: { authDir: "/tmp/auth", accountId: "default" },
|
||||
});
|
||||
|
||||
await handler({ ...makeAudioMsg(), accessControlPassed: undefined });
|
||||
|
||||
expect(events).toEqual([]);
|
||||
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
|
||||
expect(maybeSendAckReactionMock).not.toHaveBeenCalled();
|
||||
expect(processMessageMock).toHaveBeenCalledWith(
|
||||
expect.not.objectContaining({
|
||||
preflightAudioTranscript: expect.anything(),
|
||||
ackAlreadySent: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("preserves per-agent ack checks for group broadcast voice notes", async () => {
|
||||
maybeBroadcastMessageMock.mockImplementation(
|
||||
async (params: { ackAlreadySent?: boolean; preflightAudioTranscript?: string | null }) => {
|
||||
expect(params.preflightAudioTranscript).toBe("transcribed voice note");
|
||||
expect(params.ackAlreadySent).toBeUndefined();
|
||||
return true;
|
||||
},
|
||||
);
|
||||
const handler = createWebOnMessageHandler({
|
||||
cfg: {
|
||||
channels: {
|
||||
whatsapp: {
|
||||
ackReaction: { enabled: true },
|
||||
},
|
||||
},
|
||||
broadcast: {
|
||||
"1203630@g.us": ["main", "backup"],
|
||||
},
|
||||
} as never,
|
||||
verbose: false,
|
||||
connectionId: "conn-1",
|
||||
maxMediaBytes: 1024 * 1024,
|
||||
groupHistoryLimit: 20,
|
||||
groupHistories: new Map(),
|
||||
groupMemberNames: new Map(),
|
||||
echoTracker: makeEchoTracker() as never,
|
||||
backgroundTasks: new Set(),
|
||||
replyResolver: vi.fn() as never,
|
||||
replyLogger: {
|
||||
info: () => {},
|
||||
warn: () => {},
|
||||
debug: () => {},
|
||||
error: () => {},
|
||||
} as never,
|
||||
baseMentionConfig: {} as never,
|
||||
account: { authDir: "/tmp/auth", accountId: "default" },
|
||||
});
|
||||
|
||||
await handler(makeGroupAudioMsg());
|
||||
|
||||
expect(events).toEqual(["ack", "stt"]);
|
||||
expect(processMessageMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -9,6 +9,7 @@ import { normalizeE164 } from "../../text-runtime.js";
|
||||
import { loadConfig } from "../config.runtime.js";
|
||||
import type { MentionConfig } from "../mentions.js";
|
||||
import type { WebInboundMsg } from "../types.js";
|
||||
import { maybeSendAckReaction } from "./ack-reaction.js";
|
||||
import { maybeBroadcastMessage } from "./broadcast.js";
|
||||
import type { EchoTracker } from "./echo.js";
|
||||
import type { GroupHistoryEntry } from "./group-gating.js";
|
||||
@@ -39,9 +40,11 @@ export function createWebOnMessageHandler(params: {
|
||||
opts?: {
|
||||
groupHistory?: GroupHistoryEntry[];
|
||||
suppressGroupHistoryClear?: boolean;
|
||||
preflightAudioTranscript?: string | null;
|
||||
ackAlreadySent?: boolean;
|
||||
},
|
||||
) =>
|
||||
processMessage({
|
||||
) => {
|
||||
const processParams: Parameters<typeof processMessage>[0] = {
|
||||
cfg: params.cfg,
|
||||
msg,
|
||||
route,
|
||||
@@ -58,9 +61,21 @@ export function createWebOnMessageHandler(params: {
|
||||
echoHas: params.echoTracker.has,
|
||||
echoForget: params.echoTracker.forget,
|
||||
buildCombinedEchoKey: params.echoTracker.buildCombinedKey,
|
||||
groupHistory: opts?.groupHistory,
|
||||
suppressGroupHistoryClear: opts?.suppressGroupHistoryClear,
|
||||
});
|
||||
};
|
||||
if (opts?.groupHistory !== undefined) {
|
||||
processParams.groupHistory = opts.groupHistory;
|
||||
}
|
||||
if (opts?.suppressGroupHistoryClear !== undefined) {
|
||||
processParams.suppressGroupHistoryClear = opts.suppressGroupHistoryClear;
|
||||
}
|
||||
if (opts?.preflightAudioTranscript !== undefined) {
|
||||
processParams.preflightAudioTranscript = opts.preflightAudioTranscript;
|
||||
}
|
||||
if (opts?.ackAlreadySent === true) {
|
||||
processParams.ackAlreadySent = true;
|
||||
}
|
||||
return processMessage(processParams);
|
||||
};
|
||||
|
||||
return async (msg: WebInboundMsg) => {
|
||||
const conversationId = msg.conversationId ?? msg.from;
|
||||
@@ -159,6 +174,49 @@ export function createWebOnMessageHandler(params: {
|
||||
}
|
||||
}
|
||||
|
||||
// Preflight audio transcription: run once here, before broadcast fan-out, so
|
||||
// all agents share the same transcript instead of each making a separate STT call.
|
||||
// For DMs, only do this on the real inbound path after access-control/pairing
|
||||
// checks have already passed in inbound/monitor.ts. That keeps external STT and
|
||||
// early ack feedback behind the same auth-first gate as the rest of DM handling.
|
||||
// null = preflight was attempted but produced no transcript (failed / disabled / no audio);
|
||||
// undefined = preflight was not attempted (non-audio message).
|
||||
let preflightAudioTranscript: string | null | undefined;
|
||||
const hasAudioBody =
|
||||
msg.mediaType?.startsWith("audio/") === true && msg.body === "<media:audio>";
|
||||
const canRunEarlyDmPreflight = msg.chatType === "group" || msg.accessControlPassed === true;
|
||||
let ackAlreadySent = false;
|
||||
if (canRunEarlyDmPreflight && hasAudioBody && msg.mediaPath) {
|
||||
await maybeSendAckReaction({
|
||||
cfg: params.cfg,
|
||||
msg,
|
||||
agentId: route.agentId,
|
||||
sessionKey: route.sessionKey,
|
||||
conversationId,
|
||||
verbose: params.verbose,
|
||||
accountId: route.accountId,
|
||||
info: params.replyLogger.info.bind(params.replyLogger),
|
||||
warn: params.replyLogger.warn.bind(params.replyLogger),
|
||||
});
|
||||
ackAlreadySent = true;
|
||||
try {
|
||||
const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js");
|
||||
// transcribeFirstAudio returns undefined on failure/disabled; store null so
|
||||
// processMessage knows the attempt was already made and does not retry.
|
||||
preflightAudioTranscript =
|
||||
(await transcribeFirstAudio({
|
||||
ctx: {
|
||||
MediaPaths: [msg.mediaPath],
|
||||
MediaTypes: msg.mediaType ? [msg.mediaType] : undefined,
|
||||
},
|
||||
cfg: params.cfg,
|
||||
})) ?? null;
|
||||
} catch {
|
||||
// Non-fatal: store null so per-agent retries are suppressed.
|
||||
preflightAudioTranscript = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast groups: when we'd reply anyway, run multiple agents.
|
||||
// Does not bypass group mention/activation gating above.
|
||||
if (
|
||||
@@ -169,12 +227,20 @@ export function createWebOnMessageHandler(params: {
|
||||
route,
|
||||
groupHistoryKey,
|
||||
groupHistories: params.groupHistories,
|
||||
processMessage: processForRoute,
|
||||
...(preflightAudioTranscript !== undefined ? { preflightAudioTranscript } : {}),
|
||||
// Group ack eligibility depends on the target agent/session, so a
|
||||
// preflight ack attempt on the base route must not suppress downstream
|
||||
// per-agent checks during broadcast fan-out.
|
||||
...(ackAlreadySent && msg.chatType !== "group" ? { ackAlreadySent: true } : {}),
|
||||
processMessage: (m, r, k, opts) => processForRoute(m, r, k, opts),
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
await processForRoute(msg, route, groupHistoryKey);
|
||||
await processForRoute(msg, route, groupHistoryKey, {
|
||||
...(preflightAudioTranscript !== undefined ? { preflightAudioTranscript } : {}),
|
||||
...(ackAlreadySent ? { ackAlreadySent: true } : {}),
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@@ -0,0 +1,341 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// Mock the lazy-loaded audio preflight runtime boundary
|
||||
const transcribeFirstAudioMock = vi.fn();
|
||||
const maybeSendAckReactionMock = vi.fn();
|
||||
|
||||
vi.mock("./audio-preflight.runtime.js", () => ({
|
||||
transcribeFirstAudio: (...args: unknown[]) => transcribeFirstAudioMock(...args),
|
||||
}));
|
||||
|
||||
// Controllable shouldComputeCommandAuthorized for command-sync tests
|
||||
let shouldComputeCommandResult = false;
|
||||
let shouldComputeCommandBodies: string[] = [];
|
||||
|
||||
// Minimal mocks for process-message dependencies
|
||||
vi.mock("../../accounts.js", () => ({
|
||||
resolveWhatsAppAccount: () => ({
|
||||
accountId: "default",
|
||||
dmPolicy: "pairing",
|
||||
groupPolicy: "allowlist",
|
||||
allowFrom: [],
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../../identity.js", () => ({
|
||||
getPrimaryIdentityId: () => undefined,
|
||||
getSelfIdentity: () => ({ e164: "+15550000001" }),
|
||||
getSenderIdentity: () => ({ e164: "+15550000002", name: "Alice" }),
|
||||
}));
|
||||
|
||||
vi.mock("../../reconnect.js", () => ({
|
||||
newConnectionId: () => "test-conn-id",
|
||||
}));
|
||||
|
||||
vi.mock("../../session.js", () => ({
|
||||
formatError: (err: unknown) => String(err),
|
||||
}));
|
||||
|
||||
vi.mock("../deliver-reply.js", () => ({
|
||||
deliverWebReply: vi.fn(async () => {}),
|
||||
}));
|
||||
|
||||
vi.mock("../loggers.js", () => ({
|
||||
whatsappInboundLog: { info: () => {}, debug: () => {} },
|
||||
}));
|
||||
|
||||
vi.mock("./ack-reaction.js", () => ({
|
||||
maybeSendAckReaction: (...args: unknown[]) => maybeSendAckReactionMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("./inbound-context.js", () => ({
|
||||
resolveVisibleWhatsAppGroupHistory: () => [],
|
||||
resolveVisibleWhatsAppReplyContext: () => null,
|
||||
}));
|
||||
|
||||
vi.mock("./last-route.js", () => ({
|
||||
trackBackgroundTask: () => {},
|
||||
updateLastRouteInBackground: () => {},
|
||||
}));
|
||||
|
||||
vi.mock("./message-line.js", () => ({
|
||||
buildInboundLine: (params: { msg: { body: string } }) => params.msg.body,
|
||||
}));
|
||||
|
||||
vi.mock("./runtime-api.js", () => ({
|
||||
buildHistoryContextFromEntries: (_p: { currentMessage: string }) => _p.currentMessage,
|
||||
createChannelReplyPipeline: () => ({ onModelSelected: undefined }),
|
||||
formatInboundEnvelope: (p: { body: string }) => p.body,
|
||||
logVerbose: () => {},
|
||||
normalizeE164: (v: string) => v,
|
||||
readStoreAllowFromForDmPolicy: async () => [],
|
||||
recordSessionMetaFromInbound: async () => {},
|
||||
resolveChannelContextVisibilityMode: () => "standard",
|
||||
resolveInboundSessionEnvelopeContext: () => ({
|
||||
storePath: "/tmp/sessions.json",
|
||||
envelopeOptions: {},
|
||||
previousTimestamp: undefined,
|
||||
}),
|
||||
resolvePinnedMainDmOwnerFromAllowlist: () => null,
|
||||
resolveDmGroupAccessWithCommandGate: () => ({ commandAuthorized: true }),
|
||||
shouldComputeCommandAuthorized: (body: string) => {
|
||||
shouldComputeCommandBodies.push(body);
|
||||
return shouldComputeCommandResult || body.startsWith("/");
|
||||
},
|
||||
shouldLogVerbose: () => false,
|
||||
type: undefined,
|
||||
}));
|
||||
|
||||
vi.mock("./inbound-dispatch.js", () => ({
|
||||
buildWhatsAppInboundContext: (params: {
|
||||
bodyForAgent?: string;
|
||||
combinedBody: string;
|
||||
commandAuthorized?: boolean;
|
||||
commandBody?: string;
|
||||
msg: { body: string; mediaPath?: string; mediaType?: string };
|
||||
rawBody?: string;
|
||||
transcript?: string;
|
||||
}) => ({
|
||||
Body: params.combinedBody,
|
||||
BodyForAgent: params.bodyForAgent ?? params.msg.body,
|
||||
CommandAuthorized: params.commandAuthorized,
|
||||
CommandBody: params.commandBody ?? params.msg.body,
|
||||
MediaPath: params.msg.mediaPath,
|
||||
MediaType: params.msg.mediaType,
|
||||
RawBody: params.rawBody ?? params.msg.body,
|
||||
Transcript: params.transcript,
|
||||
}),
|
||||
dispatchWhatsAppBufferedReply: vi.fn(async () => true),
|
||||
resolveWhatsAppDmRouteTarget: () => "+15550000002",
|
||||
resolveWhatsAppResponsePrefix: () => undefined,
|
||||
updateWhatsAppMainLastRoute: () => {},
|
||||
}));
|
||||
|
||||
import { dispatchWhatsAppBufferedReply } from "./inbound-dispatch.js";
|
||||
import { processMessage } from "./process-message.js";
|
||||
|
||||
type WebInboundMsg = Parameters<typeof processMessage>[0]["msg"];
|
||||
type TestRoute = Parameters<typeof processMessage>[0]["route"];
|
||||
|
||||
function makeAudioMsg(overrides: Partial<WebInboundMsg> = {}): WebInboundMsg {
|
||||
return {
|
||||
id: "msg-1",
|
||||
from: "+15550000002",
|
||||
to: "+15550000001",
|
||||
body: "<media:audio>",
|
||||
chatType: "direct",
|
||||
mediaType: "audio/ogg; codecs=opus",
|
||||
mediaPath: "/tmp/voice.ogg",
|
||||
timestamp: 1700000000,
|
||||
accountId: "default",
|
||||
...overrides,
|
||||
} as WebInboundMsg;
|
||||
}
|
||||
|
||||
function makeRoute(overrides: Partial<TestRoute> = {}): TestRoute {
|
||||
return {
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
mainSessionKey: "agent:main:main",
|
||||
accountId: "default",
|
||||
...overrides,
|
||||
} as TestRoute;
|
||||
}
|
||||
|
||||
function makeParams(msgOverrides: Partial<WebInboundMsg> = {}) {
|
||||
return {
|
||||
cfg: {
|
||||
tools: { media: { audio: { enabled: true } } },
|
||||
channels: { whatsapp: {} },
|
||||
commands: { useAccessGroups: false },
|
||||
} as never,
|
||||
msg: makeAudioMsg(msgOverrides),
|
||||
route: makeRoute(),
|
||||
groupHistoryKey: "whatsapp:default:+15550000002",
|
||||
groupHistories: new Map(),
|
||||
groupMemberNames: new Map(),
|
||||
connectionId: "conn-1",
|
||||
verbose: false,
|
||||
maxMediaBytes: 1024 * 1024,
|
||||
replyResolver: vi.fn() as never,
|
||||
replyLogger: {
|
||||
info: () => {},
|
||||
warn: () => {},
|
||||
debug: () => {},
|
||||
error: () => {},
|
||||
} as never,
|
||||
backgroundTasks: new Set<Promise<unknown>>(),
|
||||
rememberSentText: () => {},
|
||||
echoHas: () => false,
|
||||
echoForget: () => {},
|
||||
buildCombinedEchoKey: (p: { combinedBody: string }) => p.combinedBody,
|
||||
};
|
||||
}
|
||||
|
||||
describe("processMessage audio preflight transcription", () => {
|
||||
beforeEach(() => {
|
||||
transcribeFirstAudioMock.mockReset();
|
||||
maybeSendAckReactionMock.mockReset();
|
||||
maybeSendAckReactionMock.mockResolvedValue(undefined);
|
||||
shouldComputeCommandResult = false;
|
||||
shouldComputeCommandBodies = [];
|
||||
vi.mocked(dispatchWhatsAppBufferedReply).mockClear();
|
||||
});
|
||||
|
||||
it("replaces <media:audio> body with transcript when transcription succeeds", async () => {
|
||||
transcribeFirstAudioMock.mockResolvedValueOnce("okay let's test this voice message");
|
||||
|
||||
await processMessage(makeParams());
|
||||
|
||||
expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1);
|
||||
expect(transcribeFirstAudioMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
ctx: expect.objectContaining({
|
||||
MediaPaths: ["/tmp/voice.ogg"],
|
||||
MediaTypes: ["audio/ogg; codecs=opus"],
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0];
|
||||
expect(dispatchCall?.context).toMatchObject({
|
||||
Body: "okay let's test this voice message",
|
||||
BodyForAgent: "okay let's test this voice message",
|
||||
CommandBody: "<media:audio>",
|
||||
RawBody: "<media:audio>",
|
||||
Transcript: "okay let's test this voice message",
|
||||
});
|
||||
// mediaPath and mediaType must be preserved so inboundAudio detection (used by
|
||||
// features like messages.tts.auto: "inbound") still recognises this as audio.
|
||||
expect(dispatchCall?.context).toMatchObject({
|
||||
MediaPath: "/tmp/voice.ogg",
|
||||
MediaType: "audio/ogg; codecs=opus",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to <media:audio> placeholder when transcription fails", async () => {
|
||||
transcribeFirstAudioMock.mockRejectedValueOnce(new Error("provider unavailable"));
|
||||
|
||||
await processMessage(makeParams());
|
||||
|
||||
expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1);
|
||||
|
||||
const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0];
|
||||
expect(dispatchCall?.context).toMatchObject({
|
||||
Body: "<media:audio>",
|
||||
BodyForAgent: "<media:audio>",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to <media:audio> placeholder when transcription returns undefined", async () => {
|
||||
transcribeFirstAudioMock.mockResolvedValueOnce(undefined);
|
||||
|
||||
await processMessage(makeParams());
|
||||
|
||||
expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1);
|
||||
|
||||
const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0];
|
||||
expect(dispatchCall?.context).toMatchObject({
|
||||
Body: "<media:audio>",
|
||||
BodyForAgent: "<media:audio>",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not call transcribeFirstAudio when mediaType is not audio", async () => {
|
||||
await processMessage(
|
||||
makeParams({ body: "<media:image>", mediaType: "image/jpeg", mediaPath: "/tmp/img.jpg" }),
|
||||
);
|
||||
|
||||
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not call transcribeFirstAudio when body is not <media:audio>", async () => {
|
||||
await processMessage(makeParams({ body: "hello there", mediaType: "audio/ogg; codecs=opus" }));
|
||||
|
||||
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not call transcribeFirstAudio when mediaPath is absent", async () => {
|
||||
await processMessage(makeParams({ mediaPath: undefined }));
|
||||
|
||||
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not call transcribeFirstAudio when msg.mediaType is absent", async () => {
|
||||
await processMessage(
|
||||
makeParams({ mediaType: undefined, body: "<media:audio>", mediaPath: "/tmp/voice.ogg" }),
|
||||
);
|
||||
|
||||
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
|
||||
|
||||
// Body passes through as-is without a mediaType to confirm audio
|
||||
const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0];
|
||||
expect(dispatchCall?.context).toMatchObject({
|
||||
Body: "<media:audio>",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not use transcript body for command detection", async () => {
|
||||
transcribeFirstAudioMock.mockResolvedValueOnce("/new start a new session");
|
||||
|
||||
await processMessage(makeParams());
|
||||
|
||||
expect(shouldComputeCommandBodies).toEqual(["<media:audio>"]);
|
||||
|
||||
const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0];
|
||||
expect(dispatchCall?.context).toMatchObject({
|
||||
Body: "/new start a new session",
|
||||
BodyForAgent: "/new start a new session",
|
||||
CommandBody: "<media:audio>",
|
||||
RawBody: "<media:audio>",
|
||||
Transcript: "/new start a new session",
|
||||
});
|
||||
});
|
||||
|
||||
it("uses preflightAudioTranscript when provided, skipping transcribeFirstAudio", async () => {
|
||||
// Simulate broadcast fan-out: caller pre-computed the transcript and passes it in.
|
||||
// transcribeFirstAudio must NOT be called again inside processMessage.
|
||||
await processMessage({
|
||||
...makeParams(),
|
||||
preflightAudioTranscript: "pre-computed transcript from fan-out caller",
|
||||
});
|
||||
|
||||
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
|
||||
|
||||
const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0];
|
||||
expect(dispatchCall?.context).toMatchObject({
|
||||
Body: "pre-computed transcript from fan-out caller",
|
||||
BodyForAgent: "pre-computed transcript from fan-out caller",
|
||||
CommandBody: "<media:audio>",
|
||||
RawBody: "<media:audio>",
|
||||
Transcript: "pre-computed transcript from fan-out caller",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not send a duplicate ack when caller already sent it", async () => {
|
||||
await processMessage({
|
||||
...makeParams(),
|
||||
preflightAudioTranscript: "pre-computed transcript from caller",
|
||||
ackAlreadySent: true,
|
||||
});
|
||||
|
||||
expect(maybeSendAckReactionMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("skips internal STT when preflightAudioTranscript is null (failed preflight sentinel)", async () => {
|
||||
// null = caller already attempted preflight but got nothing (provider unavailable,
|
||||
// disabled, etc.). processMessage must NOT retry to avoid 1+N attempts in broadcast.
|
||||
await processMessage({
|
||||
...makeParams(),
|
||||
preflightAudioTranscript: null,
|
||||
});
|
||||
|
||||
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
|
||||
|
||||
// Body falls back to the original <media:audio> placeholder, not retried transcript.
|
||||
const dispatchCall = vi.mocked(dispatchWhatsAppBufferedReply).mock.calls[0]?.[0];
|
||||
expect(dispatchCall?.context).toMatchObject({
|
||||
Body: "<media:audio>",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -191,6 +191,13 @@ export async function processMessage(params: {
|
||||
maxMediaTextChunkLimit?: number;
|
||||
groupHistory?: GroupHistoryEntry[];
|
||||
suppressGroupHistoryClear?: boolean;
|
||||
ackAlreadySent?: boolean;
|
||||
/** Pre-computed audio transcript from a caller-level preflight, used to avoid
|
||||
* re-transcribing the same voice note once per broadcast agent.
|
||||
* - string → transcript obtained; use it directly, skip internal STT
|
||||
* - null → preflight was attempted but failed / returned nothing; skip internal STT
|
||||
* - undefined (omitted) → caller did not attempt preflight; run internal STT as normal */
|
||||
preflightAudioTranscript?: string | null;
|
||||
}) {
|
||||
const conversationId = params.msg.conversationId ?? params.msg.from;
|
||||
const self = getSelfIdentity(params.msg);
|
||||
@@ -210,9 +217,50 @@ export async function processMessage(params: {
|
||||
agentId: params.route.agentId,
|
||||
sessionKey: params.route.sessionKey,
|
||||
});
|
||||
// Preflight audio transcription: transcribe voice notes before building the
|
||||
// inbound context so the agent receives the transcript instead of <media:audio>.
|
||||
// Mirrors the preflight step added for Telegram in #61008.
|
||||
// When the caller already performed transcription (e.g. on-message.ts before
|
||||
// broadcast fan-out) the pre-computed result is reused to avoid N STT calls
|
||||
// for N broadcast agents on the same voice note.
|
||||
// preflightAudioTranscript semantics:
|
||||
// string → transcript ready, use it
|
||||
// null → caller attempted but got nothing; skip internal STT to avoid retry
|
||||
// undefined → caller did not attempt; run internal STT
|
||||
let audioTranscript: string | undefined = params.preflightAudioTranscript ?? undefined;
|
||||
const hasAudioBody =
|
||||
params.msg.mediaType?.startsWith("audio/") === true && params.msg.body === "<media:audio>";
|
||||
if (params.preflightAudioTranscript === undefined && hasAudioBody && params.msg.mediaPath) {
|
||||
try {
|
||||
const { transcribeFirstAudio } = await import("./audio-preflight.runtime.js");
|
||||
audioTranscript = await transcribeFirstAudio({
|
||||
ctx: {
|
||||
MediaPaths: [params.msg.mediaPath],
|
||||
MediaTypes: params.msg.mediaType ? [params.msg.mediaType] : undefined,
|
||||
},
|
||||
cfg: params.cfg,
|
||||
});
|
||||
} catch {
|
||||
// Transcription failure is non-fatal: fall back to <media:audio> placeholder.
|
||||
if (shouldLogVerbose()) {
|
||||
logVerbose("whatsapp: audio preflight transcription failed, using placeholder");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we have a transcript, replace the agent-facing body so the agent sees the spoken text.
|
||||
// mediaPath and mediaType are intentionally preserved so that inboundAudio detection
|
||||
// (used by features such as messages.tts.auto: "inbound") still sees this as an
|
||||
// audio message. The transcript is also stored in Transcript so downstream pipelines
|
||||
// can detect it. Preventing a second STT pass in the media-understanding pipeline
|
||||
// requires SDK-level support (alreadyTranscribed on a shared attachment instance);
|
||||
// that is a shared concern across all channels and is tracked separately.
|
||||
const msgForAgent =
|
||||
audioTranscript !== undefined ? { ...params.msg, body: audioTranscript } : params.msg;
|
||||
|
||||
let combinedBody = buildInboundLine({
|
||||
cfg: params.cfg,
|
||||
msg: params.msg,
|
||||
msg: msgForAgent,
|
||||
agentId: params.route.agentId,
|
||||
previousTimestamp,
|
||||
envelope: envelopeOptions,
|
||||
@@ -267,18 +315,22 @@ export async function processMessage(params: {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Send ack reaction immediately upon message receipt (post-gating)
|
||||
await maybeSendAckReaction({
|
||||
cfg: params.cfg,
|
||||
msg: params.msg,
|
||||
agentId: params.route.agentId,
|
||||
sessionKey: params.route.sessionKey,
|
||||
conversationId,
|
||||
verbose: params.verbose,
|
||||
accountId: account.accountId,
|
||||
info: params.replyLogger.info.bind(params.replyLogger),
|
||||
warn: params.replyLogger.warn.bind(params.replyLogger),
|
||||
});
|
||||
// Send ack reaction immediately upon message receipt (post-gating). Callers
|
||||
// that do preflight work before processMessage can send it first and set
|
||||
// ackAlreadySent so slow STT does not delay user-visible receipt feedback.
|
||||
if (params.ackAlreadySent !== true) {
|
||||
await maybeSendAckReaction({
|
||||
cfg: params.cfg,
|
||||
msg: params.msg,
|
||||
agentId: params.route.agentId,
|
||||
sessionKey: params.route.sessionKey,
|
||||
conversationId,
|
||||
verbose: params.verbose,
|
||||
accountId: account.accountId,
|
||||
info: params.replyLogger.info.bind(params.replyLogger),
|
||||
warn: params.replyLogger.warn.bind(params.replyLogger),
|
||||
});
|
||||
}
|
||||
|
||||
const correlationId = params.msg.id ?? newConnectionId();
|
||||
params.replyLogger.info(
|
||||
@@ -353,19 +405,23 @@ export async function processMessage(params: {
|
||||
});
|
||||
|
||||
const ctxPayload = buildWhatsAppInboundContext({
|
||||
bodyForAgent: msgForAgent.body,
|
||||
combinedBody,
|
||||
commandBody: params.msg.body,
|
||||
commandAuthorized,
|
||||
conversationId,
|
||||
groupHistory: visibleGroupHistory,
|
||||
groupMemberRoster: params.groupMemberNames.get(params.groupHistoryKey),
|
||||
groupSystemPrompt: conversationSystemPrompt,
|
||||
msg: params.msg,
|
||||
rawBody: params.msg.body,
|
||||
route: params.route,
|
||||
sender: {
|
||||
id: getPrimaryIdentityId(sender) ?? undefined,
|
||||
name: sender.name ?? undefined,
|
||||
e164: sender.e164 ?? undefined,
|
||||
},
|
||||
...(audioTranscript !== undefined ? { transcript: audioTranscript } : {}),
|
||||
replyThreading,
|
||||
visibleReplyTo: visibleReplyTo ?? undefined,
|
||||
});
|
||||
|
||||
@@ -567,6 +567,7 @@ export async function attachWebInboxToSocket(
|
||||
conversationId: inbound.from,
|
||||
to: self.e164 ?? "me",
|
||||
accountId: inbound.access.resolvedAccountId,
|
||||
accessControlPassed: true,
|
||||
body: enriched.body,
|
||||
pushName: senderName,
|
||||
timestamp,
|
||||
|
||||
@@ -57,6 +57,8 @@ export type WebInboundMessage = {
|
||||
conversationId: string; // alias for clarity (same as from)
|
||||
to: string;
|
||||
accountId: string;
|
||||
/** Set by the real inbound monitor after access-control / pairing checks pass. */
|
||||
accessControlPassed?: boolean;
|
||||
body: string;
|
||||
pushName?: string;
|
||||
timestamp?: number;
|
||||
|
||||
@@ -150,7 +150,12 @@ describe("web monitor inbox", () => {
|
||||
|
||||
expect(onMessage).toHaveBeenCalledTimes(1);
|
||||
expect(onMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ from: "+123", to: "+123", body: "self ping" }),
|
||||
expect.objectContaining({
|
||||
from: "+123",
|
||||
to: "+123",
|
||||
body: "self ping",
|
||||
accessControlPassed: true,
|
||||
}),
|
||||
);
|
||||
expect(sock.readMessages).not.toHaveBeenCalled();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user