feat: adapt voice surfaces to talk events

This commit is contained in:
Peter Steinberger
2026-05-05 20:59:57 +01:00
parent 9e6f38f4e1
commit ada560ece4
14 changed files with 1489 additions and 649 deletions

View File

@@ -2294,6 +2294,8 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
return;
case "session.long_running":
case "session.stalled":
case "session.recovery.completed":
case "session.recovery.requested":
return;
case "session.stuck":
recordSessionStuck(evt);

View File

@@ -3945,6 +3945,23 @@ describe("google-meet plugin", () => {
realtimeTranscriptLines: 2,
lastRealtimeTranscriptRole: "assistant",
});
const talkEventTypes = handle.getHealth().recentTalkEvents?.map((event) => event.type) ?? [];
expect(talkEventTypes).toEqual([
"session.started",
"session.ready",
"turn.started",
"input.audio.delta",
"input.audio.committed",
"transcript.done",
"output.text.done",
"output.audio.started",
"output.audio.delta",
"output.audio.done",
"turn.ended",
]);
expect(talkEventTypes.indexOf("output.text.done")).toBeLessThan(
talkEventTypes.indexOf("output.audio.started"),
);
await handle.stop();
});
@@ -4167,6 +4184,21 @@ describe("google-meet plugin", () => {
undefined,
);
});
expect(handle.getHealth().recentTalkEvents?.map((event) => event.type)).toEqual(
expect.arrayContaining([
"session.started",
"session.ready",
"input.audio.delta",
"output.audio.delta",
"output.audio.done",
"transcript.done",
"output.text.done",
"tool.call",
"tool.progress",
"tool.result",
"turn.ended",
]),
);
expect(runtime.agent.runEmbeddedPiAgent).toHaveBeenCalledWith(
expect.objectContaining({
messageProvider: "google-meet",
@@ -4644,6 +4676,24 @@ describe("google-meet plugin", () => {
lastRealtimeEventDetail: "status=completed",
clearCount: 1,
});
const talkEvents = handle.getHealth().recentTalkEvents ?? [];
expect(talkEvents.map((event) => event.type)).toEqual(
expect.arrayContaining([
"session.started",
"session.ready",
"input.audio.delta",
"output.audio.delta",
"output.audio.done",
"output.text.done",
"tool.call",
"tool.progress",
"tool.result",
"turn.ended",
]),
);
expect(talkEvents[0]).toMatchObject({
sessionId: "google-meet:meet-1:bridge-1:node-realtime",
});
await handle.stop();

View File

@@ -1,4 +1,5 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import type { PluginRuntime, RuntimeLogger } from "openclaw/plugin-sdk/plugin-runtime";
import {
buildRealtimeVoiceAgentConsultWorkingResponse,
@@ -7,7 +8,9 @@ import {
resolveRealtimeVoiceAgentConsultTools,
resolveRealtimeVoiceAgentConsultToolsAllow,
type RealtimeVoiceBridgeSession,
type RealtimeVoiceToolCallEvent,
type RealtimeVoiceTool,
type TalkEventInput,
} from "openclaw/plugin-sdk/realtime-voice";
import { normalizeAgentId } from "openclaw/plugin-sdk/routing";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
@@ -74,3 +77,82 @@ export async function consultOpenClawAgentForGoogleMeet(params: {
extraSystemPrompt: GOOGLE_MEET_CONSULT_SYSTEM_PROMPT,
});
}
/**
 * Route a realtime-provider tool call for the Google Meet bridge.
 *
 * Only the agent-consult tool is supported, and only under the "bidi"
 * realtime strategy; anything else is rejected back to the session. A valid
 * call emits a `tool.progress` talk event, submits a "working" placeholder
 * response, then runs the agent consult asynchronously and reports either a
 * `tool.result` or `tool.error` talk event alongside the session tool result.
 *
 * Fire-and-forget: the consult promise is intentionally not awaited.
 */
export function handleGoogleMeetRealtimeConsultToolCall(params: {
  strategy: string;
  session: RealtimeVoiceBridgeSession;
  event: RealtimeVoiceToolCallEvent;
  config: GoogleMeetConfig;
  fullConfig: OpenClawConfig;
  runtime: PluginRuntime;
  logger: RuntimeLogger;
  meetingSessionId: string;
  requesterSessionKey?: string;
  transcript: Array<{ role: "user" | "assistant"; text: string }>;
  onTalkEvent?: (event: TalkEventInput) => void;
}): void {
  // Some providers only populate itemId; fall back to it as the call handle.
  const callId = params.event.callId || params.event.itemId;
  // Emit a terminal tool.error talk event AND report the failure to the
  // realtime session so the model hears about it (single source for the
  // error string — previously duplicated per branch).
  const rejectToolCall = (error: string): void => {
    params.onTalkEvent?.({
      type: "tool.error",
      callId,
      payload: { name: params.event.name, error },
      final: true,
    });
    params.session.submitToolResult(callId, { error });
  };
  if (params.strategy !== "bidi") {
    rejectToolCall(`Tool "${params.event.name}" is only available in bidi realtime strategy`);
    return;
  }
  if (params.event.name !== GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME) {
    rejectToolCall(`Tool "${params.event.name}" not available`);
    return;
  }
  params.onTalkEvent?.({
    type: "tool.progress",
    callId,
    payload: { name: params.event.name, status: "working" },
  });
  // Keep the provider session alive with a placeholder while the consult runs.
  submitGoogleMeetConsultWorkingResponse(params.session, callId);
  void consultOpenClawAgentForGoogleMeet({
    config: params.config,
    fullConfig: params.fullConfig,
    runtime: params.runtime,
    logger: params.logger,
    meetingSessionId: params.meetingSessionId,
    requesterSessionKey: params.requesterSessionKey,
    args: params.event.args,
    transcript: params.transcript,
  })
    .then((result) => {
      params.onTalkEvent?.({
        type: "tool.result",
        callId,
        payload: { name: params.event.name, result },
        final: true,
      });
      params.session.submitToolResult(callId, result);
    })
    .catch((error: Error) => {
      rejectToolCall(formatErrorMessage(error));
    });
}

View File

@@ -6,15 +6,20 @@ import type {
RealtimeTranscriptionSession,
} from "openclaw/plugin-sdk/realtime-transcription";
import {
createRealtimeVoiceAgentTalkbackQueue,
createTalkSessionController,
createRealtimeVoiceBridgeSession,
type RealtimeVoiceAgentTalkbackQueue,
type RealtimeVoiceBridgeSession,
type RealtimeVoiceProviderPlugin,
type TalkEvent,
type TalkEventInput,
type TalkSessionController,
} from "openclaw/plugin-sdk/realtime-voice";
import {
consultOpenClawAgentForGoogleMeet,
GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME,
handleGoogleMeetRealtimeConsultToolCall,
resolveGoogleMeetRealtimeTools,
submitGoogleMeetConsultWorkingResponse,
} from "./agent-consult.js";
import type { GoogleMeetConfig } from "./config.js";
import {
@@ -29,6 +34,8 @@ import {
resolveGoogleMeetRealtimeProvider,
resolveGoogleMeetRealtimeTranscriptionProvider,
isGoogleMeetLikelyAssistantEchoTranscript,
pushGoogleMeetTalkEvent,
summarizeGoogleMeetTalkEvents,
convertGoogleMeetBridgeAudioForStt,
convertGoogleMeetTtsAudioForBridge,
formatGoogleMeetAgentAudioModelLog,
@@ -108,9 +115,7 @@ export async function startNodeAgentAudioBridge(params: {
}),
);
const transcript: GoogleMeetRealtimeTranscriptEntry[] = [];
let agentConsultActive = false;
let pendingAgentQuestion: string | undefined;
let agentConsultDebounceTimer: ReturnType<typeof setTimeout> | undefined;
let agentTalkback: RealtimeVoiceAgentTalkbackQueue | undefined;
let ttsQueue = Promise.resolve();
const stop = async () => {
@@ -118,10 +123,7 @@ export async function startNodeAgentAudioBridge(params: {
return;
}
stopped = true;
if (agentConsultDebounceTimer) {
clearTimeout(agentConsultDebounceTimer);
agentConsultDebounceTimer = undefined;
}
agentTalkback?.close();
try {
sttSession?.close();
} catch (error) {
@@ -201,73 +203,26 @@ export async function startNodeAgentAudioBridge(params: {
});
};
const runAgentConsultForUserTranscript = async (question: string): Promise<void> => {
const trimmed = question.trim();
if (!trimmed || stopped) {
return;
}
if (agentConsultActive) {
pendingAgentQuestion = trimmed;
return;
}
agentConsultActive = true;
let nextQuestion: string | undefined = trimmed;
try {
while (nextQuestion) {
if (stopped) {
return;
}
const currentQuestion = nextQuestion;
pendingAgentQuestion = undefined;
params.logger.info(`[google-meet] node agent consult: ${currentQuestion}`);
const result = await consultOpenClawAgentForGoogleMeet({
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: {
question: currentQuestion,
responseStyle: "Brief, natural spoken answer for a live meeting.",
},
transcript,
});
enqueueSpeakText(result.text);
nextQuestion = pendingAgentQuestion;
}
} catch (error) {
params.logger.warn(`[google-meet] node agent consult failed: ${formatErrorMessage(error)}`);
enqueueSpeakText("I hit an error while checking that. Please try again.");
} finally {
agentConsultActive = false;
const queuedQuestion = pendingAgentQuestion;
pendingAgentQuestion = undefined;
if (queuedQuestion && !stopped) {
void runAgentConsultForUserTranscript(queuedQuestion);
}
}
};
const enqueueAgentConsultForUserTranscript = (question: string): void => {
const trimmed = question.trim();
if (!trimmed || stopped) {
return;
}
pendingAgentQuestion = pendingAgentQuestion ? `${pendingAgentQuestion}\n${trimmed}` : trimmed;
if (agentConsultDebounceTimer) {
clearTimeout(agentConsultDebounceTimer);
}
agentConsultDebounceTimer = setTimeout(() => {
agentConsultDebounceTimer = undefined;
const queuedQuestion = pendingAgentQuestion;
pendingAgentQuestion = undefined;
if (queuedQuestion && !stopped) {
void runAgentConsultForUserTranscript(queuedQuestion);
}
}, GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS);
agentConsultDebounceTimer.unref?.();
};
agentTalkback = createRealtimeVoiceAgentTalkbackQueue({
debounceMs: GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS,
isStopped: () => stopped,
logger: params.logger,
logPrefix: "[google-meet] node agent",
responseStyle: "Brief, natural spoken answer for a live meeting.",
fallbackText: "I hit an error while checking that. Please try again.",
consult: ({ question, responseStyle }) =>
consultOpenClawAgentForGoogleMeet({
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: { question, responseStyle },
transcript,
}),
deliver: enqueueSpeakText,
});
sttSession = resolved.provider.createSession({
providerConfig: resolved.providerConfig,
@@ -284,7 +239,7 @@ export async function startNodeAgentAudioBridge(params: {
);
return;
}
enqueueAgentConsultForUserTranscript(trimmed);
agentTalkback?.enqueue(trimmed);
},
onError: (error) => {
params.logger.warn(
@@ -404,6 +359,54 @@ export async function startNodeRealtimeAudioBridge(params: {
const transcript: GoogleMeetRealtimeTranscriptEntry[] = [];
const realtimeEvents: GoogleMeetRealtimeEventEntry[] = [];
const strategy = params.config.realtime.strategy;
const talk: TalkSessionController = createTalkSessionController({
sessionId: `google-meet:${params.meetingSessionId}:${params.bridgeId}:node-realtime`,
mode: "realtime",
transport: "gateway-relay",
brain: strategy === "bidi" ? "direct-tools" : "agent-consult",
provider: resolved.provider.id,
});
const recentTalkEvents: TalkEvent[] = [];
const rememberTalkEvent = (event: TalkEvent | undefined): void => {
if (event) {
pushGoogleMeetTalkEvent(recentTalkEvents, event);
}
};
const emitTalkEvent = (input: TalkEventInput): void => {
rememberTalkEvent(talk.emit(input));
};
const ensureTalkTurn = (): string => {
const turn = talk.ensureTurn({
payload: { bridgeId: params.bridgeId, meetingSessionId: params.meetingSessionId },
});
if (turn.event) {
rememberTalkEvent(turn.event);
}
return turn.turnId;
};
const finishOutputAudio = (reason: string): void => {
rememberTalkEvent(
talk.finishOutputAudio({
payload: { bridgeId: params.bridgeId, reason },
}),
);
};
const endTalkTurn = (reason = "completed"): void => {
const ended = talk.endTurn({
payload: { bridgeId: params.bridgeId, reason },
});
if (ended.ok) {
rememberTalkEvent(ended.event);
}
};
emitTalkEvent({
type: "session.started",
payload: {
bridgeId: params.bridgeId,
meetingSessionId: params.meetingSessionId,
nodeId: params.nodeId,
},
});
params.logger.info(
formatGoogleMeetRealtimeVoiceModelLog({
strategy,
@@ -413,95 +416,36 @@ export async function startNodeRealtimeAudioBridge(params: {
audioFormat: params.config.chrome.audioFormat,
}),
);
let agentConsultActive = false;
let pendingAgentQuestion: string | undefined;
let agentConsultDebounceTimer: ReturnType<typeof setTimeout> | undefined;
const enqueueAgentConsultForUserTranscript = (question: string): void => {
const trimmed = question.trim();
if (!trimmed || stopped) {
return;
}
pendingAgentQuestion = pendingAgentQuestion ? `${pendingAgentQuestion}\n${trimmed}` : trimmed;
if (agentConsultDebounceTimer) {
clearTimeout(agentConsultDebounceTimer);
}
agentConsultDebounceTimer = setTimeout(() => {
agentConsultDebounceTimer = undefined;
const queuedQuestion = pendingAgentQuestion;
pendingAgentQuestion = undefined;
if (queuedQuestion && !stopped) {
void runAgentConsultForUserTranscript(queuedQuestion);
}
}, GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS);
agentConsultDebounceTimer.unref?.();
};
const runAgentConsultForUserTranscript = async (question: string): Promise<void> => {
const trimmed = question.trim();
if (!trimmed || stopped) {
return;
}
if (agentConsultActive) {
pendingAgentQuestion = trimmed;
return;
}
agentConsultActive = true;
let nextQuestion: string | undefined = trimmed;
try {
while (nextQuestion) {
if (stopped) {
return;
}
const currentQuestion = nextQuestion;
pendingAgentQuestion = undefined;
params.logger.info(`[google-meet] node realtime agent consult: ${currentQuestion}`);
const result = await consultOpenClawAgentForGoogleMeet({
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: {
question: currentQuestion,
responseStyle: "Brief, natural spoken answer for a live meeting.",
},
transcript,
});
if (!stopped && result.text.trim()) {
bridge?.sendUserMessage(buildGoogleMeetSpeakExactUserMessage(result.text.trim()));
}
nextQuestion = pendingAgentQuestion;
}
} catch (error) {
params.logger.warn(
`[google-meet] node realtime agent consult failed: ${formatErrorMessage(error)}`,
);
if (!stopped) {
bridge?.sendUserMessage(
buildGoogleMeetSpeakExactUserMessage(
"I hit an error while checking that. Please try again.",
),
);
}
} finally {
agentConsultActive = false;
const queuedQuestion = pendingAgentQuestion;
pendingAgentQuestion = undefined;
if (queuedQuestion && !stopped) {
void runAgentConsultForUserTranscript(queuedQuestion);
}
}
};
let agentTalkback: RealtimeVoiceAgentTalkbackQueue | undefined;
agentTalkback = createRealtimeVoiceAgentTalkbackQueue({
debounceMs: GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS,
isStopped: () => stopped,
logger: params.logger,
logPrefix: "[google-meet] node realtime agent",
responseStyle: "Brief, natural spoken answer for a live meeting.",
fallbackText: "I hit an error while checking that. Please try again.",
consult: ({ question, responseStyle }) =>
consultOpenClawAgentForGoogleMeet({
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: { question, responseStyle },
transcript,
}),
deliver: (text) => {
bridge?.sendUserMessage(buildGoogleMeetSpeakExactUserMessage(text));
},
});
const stop = async () => {
if (stopped) {
return;
}
stopped = true;
if (agentConsultDebounceTimer) {
clearTimeout(agentConsultDebounceTimer);
agentConsultDebounceTimer = undefined;
}
agentTalkback?.close();
try {
bridge?.close();
} catch (error) {
@@ -537,6 +481,18 @@ export async function startNodeRealtimeAudioBridge(params: {
audioSink: {
isOpen: () => !stopped,
sendAudio: (audio) => {
const turnId = ensureTalkTurn();
rememberTalkEvent(
talk.startOutputAudio({
turnId,
payload: { bridgeId: params.bridgeId },
}).event,
);
emitTalkEvent({
type: "output.audio.delta",
turnId,
payload: { byteLength: audio.byteLength },
});
const suppression = extendGoogleMeetOutputEchoSuppression({
audio,
audioFormat: params.config.chrome.audioFormat,
@@ -569,6 +525,7 @@ export async function startNodeRealtimeAudioBridge(params: {
clearAudio: () => {
lastClearAt = new Date().toISOString();
clearCount += 1;
finishOutputAudio("clear");
suppressInputUntil = 0;
lastOutputPlayableUntilMs = 0;
void params.runtime.nodes
@@ -590,6 +547,30 @@ export async function startNodeRealtimeAudioBridge(params: {
},
},
onTranscript: (role, text, isFinal) => {
const turnId = ensureTalkTurn();
const eventType =
role === "assistant"
? isFinal
? "output.text.done"
: "output.text.delta"
: isFinal
? "transcript.done"
: "transcript.delta";
const payload = role === "assistant" ? { text } : { role, text };
emitTalkEvent({
type: eventType,
turnId,
payload,
final: isFinal,
});
if (role === "user" && isFinal) {
emitTalkEvent({
type: "input.audio.committed",
turnId,
payload: { bridgeId: params.bridgeId },
final: true,
});
}
if (isFinal) {
recordGoogleMeetRealtimeTranscript(transcript, role, text);
params.logger.info(`[google-meet] node realtime ${role}: ${text}`);
@@ -600,12 +581,35 @@ export async function startNodeRealtimeAudioBridge(params: {
);
return;
}
enqueueAgentConsultForUserTranscript(text);
agentTalkback?.enqueue(text);
}
}
},
onEvent: (event) => {
recordGoogleMeetRealtimeEvent(realtimeEvents, event);
if (event.type === "input_audio_buffer.speech_started") {
ensureTalkTurn();
} else if (event.type === "input_audio_buffer.speech_stopped") {
const turnId = talk.activeTurnId;
if (!turnId) {
return;
}
emitTalkEvent({
type: "input.audio.committed",
turnId,
payload: { bridgeId: params.bridgeId, source: event.type },
final: true,
});
} else if (event.type === "response.done") {
finishOutputAudio("response.done");
endTalkTurn("response.done");
} else if (event.type === "error") {
emitTalkEvent({
type: "session.error",
payload: { message: event.detail ?? "Realtime provider error" },
final: true,
});
}
if (
event.type === "error" ||
event.type === "response.done" ||
@@ -619,52 +623,57 @@ export async function startNodeRealtimeAudioBridge(params: {
}
},
onToolCall: (event, session) => {
if (strategy !== "bidi") {
session.submitToolResult(event.callId || event.itemId, {
error: `Tool "${event.name}" is only available in bidi realtime strategy`,
});
return;
}
if (event.name !== GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME) {
session.submitToolResult(event.callId || event.itemId, {
error: `Tool "${event.name}" not available`,
});
return;
}
submitGoogleMeetConsultWorkingResponse(session, event.callId || event.itemId);
void consultOpenClawAgentForGoogleMeet({
emitTalkEvent({
type: "tool.call",
turnId: ensureTalkTurn(),
itemId: event.itemId,
callId: event.callId,
payload: { name: event.name, args: event.args },
});
const turnId = ensureTalkTurn();
handleGoogleMeetRealtimeConsultToolCall({
strategy,
session,
event,
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: event.args,
transcript,
})
.then((result) => {
session.submitToolResult(event.callId || event.itemId, result);
})
.catch((error: Error) => {
session.submitToolResult(event.callId || event.itemId, {
error: formatErrorMessage(error),
});
});
onTalkEvent: (input) => emitTalkEvent({ ...input, turnId: input.turnId ?? turnId }),
});
},
onError: (error) => {
params.logger.warn(
`[google-meet] node realtime voice bridge failed: ${formatErrorMessage(error)}`,
);
emitTalkEvent({
type: "session.error",
payload: { message: formatErrorMessage(error) },
final: true,
});
void stop();
},
onClose: (reason) => {
realtimeReady = false;
finishOutputAudio(reason);
emitTalkEvent({
type: "session.closed",
payload: { reason },
final: true,
});
if (reason === "error") {
void stop();
}
},
onReady: () => {
realtimeReady = true;
emitTalkEvent({
type: "session.ready",
payload: { bridgeId: params.bridgeId },
});
},
});
@@ -695,6 +704,11 @@ export async function startNodeRealtimeAudioBridge(params: {
}
lastInputAt = new Date().toISOString();
lastInputBytes += audio.byteLength;
emitTalkEvent({
type: "input.audio.delta",
turnId: ensureTalkTurn(),
payload: { byteLength: audio.byteLength },
});
bridge?.sendAudio(audio);
}
if (result.closed === true) {
@@ -740,6 +754,7 @@ export async function startNodeRealtimeAudioBridge(params: {
suppressedInputBytes,
...getGoogleMeetRealtimeTranscriptHealth(transcript),
...getGoogleMeetRealtimeEventHealth(realtimeEvents),
recentTalkEvents: summarizeGoogleMeetTalkEvents(recentTalkEvents),
consecutiveInputErrors,
lastInputError,
clearCount,

View File

@@ -11,23 +11,35 @@ import {
type RealtimeTranscriptionSession,
} from "openclaw/plugin-sdk/realtime-transcription";
import {
createRealtimeVoiceAgentTalkbackQueue,
createRealtimeVoiceBridgeSession,
createTalkSessionController,
convertPcmToMulaw8k,
extendRealtimeVoiceOutputEchoSuppression,
getRealtimeVoiceBridgeEventHealth,
getRealtimeVoiceTranscriptHealth,
isLikelyRealtimeVoiceAssistantEchoTranscript,
mulawToPcm,
REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ,
REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ,
recordRealtimeVoiceBridgeEvent,
recordRealtimeVoiceTranscript,
resamplePcm,
resolveConfiguredRealtimeVoiceProvider,
type RealtimeVoiceAgentTalkbackQueue,
type RealtimeVoiceBridgeEventLogEntry,
type RealtimeVoiceBridgeSession,
type RealtimeVoiceBridgeEvent,
type RealtimeVoiceProviderConfig,
type RealtimeVoiceProviderPlugin,
type RealtimeVoiceTranscriptEntry,
type TalkEvent,
type TalkEventInput,
type TalkSessionController,
} from "openclaw/plugin-sdk/realtime-voice";
import {
consultOpenClawAgentForGoogleMeet,
GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME,
handleGoogleMeetRealtimeConsultToolCall,
resolveGoogleMeetRealtimeTools,
submitGoogleMeetConsultWorkingResponse,
} from "./agent-consult.js";
import type { GoogleMeetConfig } from "./config.js";
import type { GoogleMeetChromeHealth } from "./transports/types.js";
@@ -71,48 +83,16 @@ type ResolvedRealtimeTranscriptionProvider = {
providerConfig: RealtimeTranscriptionProviderConfig;
};
export type GoogleMeetRealtimeTranscriptEntry = {
at: string;
role: "user" | "assistant";
text: string;
};
export function recordGoogleMeetRealtimeTranscript(
transcript: GoogleMeetRealtimeTranscriptEntry[],
role: "user" | "assistant",
text: string,
): GoogleMeetRealtimeTranscriptEntry {
const entry = { at: new Date().toISOString(), role, text };
transcript.push(entry);
if (transcript.length > 40) {
transcript.splice(0, transcript.length - 40);
}
return entry;
}
export type GoogleMeetRealtimeTranscriptEntry = RealtimeVoiceTranscriptEntry;
export const recordGoogleMeetRealtimeTranscript = recordRealtimeVoiceTranscript;
export function getGoogleMeetRealtimeTranscriptHealth(
transcript: GoogleMeetRealtimeTranscriptEntry[],
): Pick<
GoogleMeetChromeHealth,
| "realtimeTranscriptLines"
| "lastRealtimeTranscriptAt"
| "lastRealtimeTranscriptRole"
| "lastRealtimeTranscriptText"
| "recentRealtimeTranscript"
> {
const last = transcript.at(-1);
return {
realtimeTranscriptLines: transcript.length,
lastRealtimeTranscriptAt: last?.at,
lastRealtimeTranscriptRole: last?.role,
lastRealtimeTranscriptText: last?.text,
recentRealtimeTranscript: transcript.slice(-5),
};
): Pick<GoogleMeetChromeHealth, keyof ReturnType<typeof getRealtimeVoiceTranscriptHealth>> {
return getRealtimeVoiceTranscriptHealth(transcript);
}
export type GoogleMeetRealtimeEventEntry = RealtimeVoiceBridgeEvent & {
at: string;
};
export type GoogleMeetRealtimeEventEntry = RealtimeVoiceBridgeEventLogEntry;
export const GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS = 900;
export const GOOGLE_MEET_OUTPUT_ECHO_SUPPRESSION_TAIL_MS = 3_000;
@@ -120,33 +100,15 @@ export const GOOGLE_MEET_TRANSCRIPT_ECHO_LOOKBACK_MS = 45_000;
export function recordGoogleMeetRealtimeEvent(
events: GoogleMeetRealtimeEventEntry[],
event: RealtimeVoiceBridgeEvent,
) {
if (event.direction === "client" && event.type === "input_audio_buffer.append") {
return;
}
events.push({ at: new Date().toISOString(), ...event });
if (events.length > 40) {
events.splice(0, events.length - 40);
}
event: Parameters<typeof recordRealtimeVoiceBridgeEvent>[1],
): void {
recordRealtimeVoiceBridgeEvent(events, event);
}
export function getGoogleMeetRealtimeEventHealth(
events: GoogleMeetRealtimeEventEntry[],
): Pick<
GoogleMeetChromeHealth,
| "lastRealtimeEventAt"
| "lastRealtimeEventType"
| "lastRealtimeEventDetail"
| "recentRealtimeEvents"
> {
const last = events.at(-1);
return {
lastRealtimeEventAt: last?.at,
lastRealtimeEventType: last ? `${last.direction}:${last.type}` : undefined,
lastRealtimeEventDetail: last?.detail,
recentRealtimeEvents: events.slice(-10),
};
): Pick<GoogleMeetChromeHealth, keyof ReturnType<typeof getRealtimeVoiceBridgeEventHealth>> {
return getRealtimeVoiceBridgeEventHealth(events);
}
function splitCommand(argv: string[]): { command: string; args: string[] } {
@@ -174,61 +136,15 @@ function readPcm16Stats(audio: Buffer): { rms: number; peak: number } {
};
}
// Tokenize a transcript line for echo comparison: lowercase it, drop
// apostrophes (so "don't" stays one token), extract alphanumeric runs, and
// discard single-character tokens that carry no matching signal.
function normalizeTranscriptForEchoMatch(text: string): string[] {
  const tokens = text.toLowerCase().replace(/[']/g, "").match(/[a-z0-9]+/g) ?? [];
  return tokens.filter((token) => token.length > 1);
}
// A user line counts as likely echo when at least 58% of its distinct tokens
// also appear in recent assistant output. Both sides must have >= 4 tokens
// (and the user side >= 4 *distinct* tokens) so trivially short utterances
// never match.
function hasMeaningfulEchoOverlap(userTokens: string[], assistantTokens: string[]): boolean {
  if (userTokens.length < 4 || assistantTokens.length < 4) {
    return false;
  }
  const distinctUser = new Set(userTokens);
  if (distinctUser.size < 4) {
    return false;
  }
  const assistantSet = new Set(assistantTokens);
  let shared = 0;
  for (const token of distinctUser) {
    if (assistantSet.has(token)) {
      shared += 1;
    }
  }
  return shared / distinctUser.size >= 0.58;
}
export function isGoogleMeetLikelyAssistantEchoTranscript(params: {
transcript: GoogleMeetRealtimeTranscriptEntry[];
text: string;
nowMs?: number;
}): boolean {
const userTokens = normalizeTranscriptForEchoMatch(params.text);
if (userTokens.length < 4) {
return false;
}
const nowMs = params.nowMs ?? Date.now();
const recentAssistantText = params.transcript
.filter((entry) => {
if (entry.role !== "assistant") {
return false;
}
const at = Date.parse(entry.at);
return Number.isFinite(at) && nowMs - at <= GOOGLE_MEET_TRANSCRIPT_ECHO_LOOKBACK_MS;
})
.slice(-6)
.map((entry) => entry.text)
.join(" ");
if (!recentAssistantText.trim()) {
return false;
}
const userNormalized = userTokens.join(" ");
const assistantTokens = normalizeTranscriptForEchoMatch(recentAssistantText);
const assistantNormalized = assistantTokens.join(" ");
return (
(userNormalized.length >= 18 && assistantNormalized.includes(userNormalized)) ||
(assistantNormalized.length >= 18 && userNormalized.includes(assistantNormalized)) ||
hasMeaningfulEchoOverlap(userTokens, assistantTokens)
);
return isLikelyRealtimeVoiceAssistantEchoTranscript({
...params,
lookbackMs: GOOGLE_MEET_TRANSCRIPT_ECHO_LOOKBACK_MS,
});
}
export function extendGoogleMeetOutputEchoSuppression(params: {
@@ -239,17 +155,11 @@ export function extendGoogleMeetOutputEchoSuppression(params: {
suppressInputUntilMs: number;
}): { lastOutputPlayableUntilMs: number; suppressInputUntilMs: number; durationMs: number } {
const bytesPerMs = params.audioFormat === "g711-ulaw-8khz" ? 8 : 48;
const durationMs = Math.ceil(params.audio.byteLength / bytesPerMs);
const playbackStartMs = Math.max(params.nowMs, params.lastOutputPlayableUntilMs);
const playbackEndMs = playbackStartMs + durationMs;
return {
durationMs,
lastOutputPlayableUntilMs: playbackEndMs,
suppressInputUntilMs: Math.max(
params.suppressInputUntilMs,
playbackEndMs + GOOGLE_MEET_OUTPUT_ECHO_SUPPRESSION_TAIL_MS,
),
};
return extendRealtimeVoiceOutputEchoSuppression({
...params,
bytesPerMs,
tailMs: GOOGLE_MEET_OUTPUT_ECHO_SUPPRESSION_TAIL_MS,
});
}
export function resolveGoogleMeetRealtimeAudioFormat(config: GoogleMeetConfig) {
@@ -508,6 +418,31 @@ function normalizeGoogleMeetTtsPromptText(text: string | undefined): string | un
return trimmed;
}
/**
 * Append a talk event to the rolling in-memory buffer, trimming the oldest
 * entries in place so at most `maxEntries` remain.
 */
export function pushGoogleMeetTalkEvent(
  events: TalkEvent[],
  event: TalkEvent,
  maxEntries = 40,
): void {
  events.push(event);
  const overflow = events.length - maxEntries;
  if (overflow > 0) {
    events.splice(0, overflow);
  }
}
/**
 * Project the newest 20 talk events into the compact shape exposed on
 * health snapshots: identifying metadata only, payloads dropped.
 */
export function summarizeGoogleMeetTalkEvents(
  events: TalkEvent[],
): NonNullable<GoogleMeetChromeHealth["recentTalkEvents"]> {
  const recent = events.slice(-20);
  return recent.map(({ id, type, sessionId, turnId, seq, timestamp, final }) => ({
    id,
    type,
    sessionId,
    turnId,
    seq,
    timestamp,
    final,
  }));
}
export async function startCommandAgentAudioBridge(params: {
config: GoogleMeetConfig;
fullConfig: OpenClawConfig;
@@ -542,9 +477,7 @@ export async function startCommandAgentAudioBridge(params: {
let lastSuppressedInputAt: string | undefined;
let suppressInputUntil = 0;
let lastOutputPlayableUntilMs = 0;
let agentConsultActive = false;
let pendingAgentQuestion: string | undefined;
let agentConsultDebounceTimer: ReturnType<typeof setTimeout> | undefined;
let agentTalkback: RealtimeVoiceAgentTalkbackQueue | undefined;
let ttsQueue = Promise.resolve();
const transcript: GoogleMeetRealtimeTranscriptEntry[] = [];
const resolved = resolveGoogleMeetRealtimeTranscriptionProvider({
@@ -552,6 +485,34 @@ export async function startCommandAgentAudioBridge(params: {
fullConfig: params.fullConfig,
providers: params.providers,
});
const talk = createTalkSessionController({
sessionId: `google-meet:${params.meetingSessionId}:agent`,
mode: "stt-tts",
transport: "gateway-relay",
brain: "agent-consult",
provider: resolved.provider.id,
turnIdPrefix: `google-meet:${params.meetingSessionId}:turn`,
});
const recentTalkEvents: TalkEvent[] = [];
const emitTalkEvent = (input: TalkEventInput) =>
pushGoogleMeetTalkEvent(recentTalkEvents, talk.emit(input));
const ensureTalkTurn = () => {
const turn = talk.ensureTurn({
payload: { meetingSessionId: params.meetingSessionId },
});
if (turn.event) {
pushGoogleMeetTalkEvent(recentTalkEvents, turn.event);
}
return turn.turnId;
};
const endTalkTurn = () => {
const ended = talk.endTurn({
payload: { meetingSessionId: params.meetingSessionId },
});
if (ended.ok) {
pushGoogleMeetTalkEvent(recentTalkEvents, ended.event);
}
};
params.logger.info(
formatGoogleMeetAgentAudioModelLog({
provider: resolved.provider,
@@ -593,10 +554,7 @@ export async function startCommandAgentAudioBridge(params: {
return;
}
stopped = true;
if (agentConsultDebounceTimer) {
clearTimeout(agentConsultDebounceTimer);
agentConsultDebounceTimer = undefined;
}
agentTalkback?.close();
try {
sttSession?.close();
} catch (error) {
@@ -604,6 +562,11 @@ export async function startCommandAgentAudioBridge(params: {
`[google-meet] agent transcription bridge close ignored: ${formatErrorMessage(error)}`,
);
}
emitTalkEvent({
type: "session.closed",
final: true,
payload: { meetingSessionId: params.meetingSessionId },
});
terminateProcess(inputProcess);
terminateProcess(outputProcess);
};
@@ -646,6 +609,11 @@ export async function startCommandAgentAudioBridge(params: {
lastOutputPlayableUntilMs = suppression.lastOutputPlayableUntilMs;
lastOutputAt = new Date().toISOString();
lastOutputBytes += audio.byteLength;
emitTalkEvent({
type: "output.audio.delta",
turnId: ensureTalkTurn(),
payload: { meetingSessionId: params.meetingSessionId, bytes: audio.byteLength },
});
try {
outputProcess.stdin?.write(audio);
} catch (error) {
@@ -665,6 +633,13 @@ export async function startCommandAgentAudioBridge(params: {
}
recordGoogleMeetRealtimeTranscript(transcript, "assistant", normalized);
params.logger.info(`[google-meet] agent assistant: ${normalized}`);
const turnId = ensureTalkTurn();
emitTalkEvent({
type: "output.text.done",
turnId,
final: true,
payload: { meetingSessionId: params.meetingSessionId, text: normalized },
});
const result = await params.runtime.tts.textToSpeechTelephony({
text: normalized,
cfg: params.fullConfig,
@@ -673,6 +648,11 @@ export async function startCommandAgentAudioBridge(params: {
throw new Error(result.error ?? "TTS conversion failed");
}
params.logger.info(formatGoogleMeetAgentTtsResultLog("agent", result));
emitTalkEvent({
type: "output.audio.started",
turnId,
payload: { meetingSessionId: params.meetingSessionId },
});
writeOutputAudio(
convertGoogleMeetTtsAudioForBridge(
result.audioBuffer,
@@ -681,79 +661,39 @@ export async function startCommandAgentAudioBridge(params: {
result.outputFormat,
),
);
emitTalkEvent({
type: "output.audio.done",
turnId,
final: true,
payload: { meetingSessionId: params.meetingSessionId },
});
endTalkTurn();
})
.catch((error) => {
params.logger.warn(`[google-meet] agent TTS failed: ${formatErrorMessage(error)}`);
});
};
// Drain loop for agent consults: answers the current question, then any
// question that queued up while the consult was in flight. Only one loop
// runs at a time; concurrent callers park their question in
// pendingAgentQuestion instead of starting a second loop.
const runAgentConsultForUserTranscript = async (question: string): Promise<void> => {
const trimmed = question.trim();
if (!trimmed || stopped) {
return;
}
// A consult is already active; hand the question to the running loop
// (it re-checks pendingAgentQuestion after each consult and in finally).
if (agentConsultActive) {
pendingAgentQuestion = trimmed;
return;
}
agentConsultActive = true;
let nextQuestion: string | undefined = trimmed;
try {
while (nextQuestion) {
if (stopped) {
return;
}
const currentQuestion = nextQuestion;
// Clear the pending slot before awaiting so transcripts that arrive
// during the consult are picked up at the bottom of the loop.
pendingAgentQuestion = undefined;
params.logger.info(`[google-meet] agent consult: ${currentQuestion}`);
const result = await consultOpenClawAgentForGoogleMeet({
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: {
question: currentQuestion,
responseStyle: "Brief, natural spoken answer for a live meeting.",
},
transcript,
});
// Speak the agent's answer, then loop if another question queued up.
enqueueSpeakText(result.text);
nextQuestion = pendingAgentQuestion;
}
} catch (error) {
params.logger.warn(`[google-meet] agent consult failed: ${formatErrorMessage(error)}`);
enqueueSpeakText("I hit an error while checking that. Please try again.");
} finally {
agentConsultActive = false;
// An early return (stop) or a thrown error can leave a queued question
// behind; re-dispatch it so it is not silently dropped.
const queuedQuestion = pendingAgentQuestion;
pendingAgentQuestion = undefined;
if (queuedQuestion && !stopped) {
void runAgentConsultForUserTranscript(queuedQuestion);
}
}
};
// Debounced hand-off of user transcripts to the agent-consult loop: rapid
// successive transcripts are coalesced into one multi-line question.
const enqueueAgentConsultForUserTranscript = (question: string): void => {
  const text = question.trim();
  if (stopped || !text) {
    return;
  }
  // Merge with any question already waiting; newline keeps both readable.
  pendingAgentQuestion = pendingAgentQuestion ? `${pendingAgentQuestion}\n${text}` : text;
  // Restart the debounce window on every new transcript.
  if (agentConsultDebounceTimer) {
    clearTimeout(agentConsultDebounceTimer);
  }
  agentConsultDebounceTimer = setTimeout(() => {
    agentConsultDebounceTimer = undefined;
    const queuedQuestion = pendingAgentQuestion;
    pendingAgentQuestion = undefined;
    if (queuedQuestion && !stopped) {
      void runAgentConsultForUserTranscript(queuedQuestion);
    }
  }, GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS);
  // Do not keep the process alive solely for this debounce timer.
  agentConsultDebounceTimer.unref?.();
};
agentTalkback = createRealtimeVoiceAgentTalkbackQueue({
debounceMs: GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS,
isStopped: () => stopped,
logger: params.logger,
logPrefix: "[google-meet] agent",
responseStyle: "Brief, natural spoken answer for a live meeting.",
fallbackText: "I hit an error while checking that. Please try again.",
consult: ({ question, responseStyle }) =>
consultOpenClawAgentForGoogleMeet({
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: { question, responseStyle },
transcript,
}),
deliver: enqueueSpeakText,
});
sttSession = resolved.provider.createSession({
providerConfig: resolved.providerConfig,
@@ -762,24 +702,50 @@ export async function startCommandAgentAudioBridge(params: {
if (!trimmed || stopped) {
return;
}
const turnId = ensureTalkTurn();
emitTalkEvent({
type: "input.audio.committed",
turnId,
final: true,
payload: { meetingSessionId: params.meetingSessionId },
});
emitTalkEvent({
type: "transcript.done",
turnId,
final: true,
payload: { meetingSessionId: params.meetingSessionId, text: trimmed, role: "user" },
});
recordGoogleMeetRealtimeTranscript(transcript, "user", trimmed);
params.logger.info(`[google-meet] agent user: ${trimmed}`);
if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text: trimmed })) {
params.logger.info(`[google-meet] agent ignored assistant echo transcript: ${trimmed}`);
return;
}
enqueueAgentConsultForUserTranscript(trimmed);
agentTalkback?.enqueue(trimmed);
},
onError: (error) => {
params.logger.warn(
`[google-meet] agent transcription bridge failed: ${formatErrorMessage(error)}`,
);
emitTalkEvent({
type: "session.error",
final: true,
payload: { meetingSessionId: params.meetingSessionId, error: formatErrorMessage(error) },
});
void stop();
},
});
emitTalkEvent({
type: "session.started",
payload: { meetingSessionId: params.meetingSessionId, provider: resolved.provider.id },
});
await sttSession.connect();
realtimeReady = true;
emitTalkEvent({
type: "session.ready",
payload: { meetingSessionId: params.meetingSessionId },
});
inputProcess.stdout?.on("data", (chunk) => {
if (stopped) {
@@ -793,6 +759,11 @@ export async function startCommandAgentAudioBridge(params: {
}
lastInputAt = new Date().toISOString();
lastInputBytes += audio.byteLength;
emitTalkEvent({
type: "input.audio.delta",
turnId: ensureTalkTurn(),
payload: { meetingSessionId: params.meetingSessionId, bytes: audio.byteLength },
});
sttSession?.sendAudio(convertGoogleMeetBridgeAudioForStt(audio, params.config));
});
@@ -813,6 +784,7 @@ export async function startCommandAgentAudioBridge(params: {
lastOutputBytes,
suppressedInputBytes,
...getGoogleMeetRealtimeTranscriptHealth(transcript),
recentTalkEvents: summarizeGoogleMeetTalkEvents(recentTalkEvents),
bridgeClosed: stopped,
}),
stop,
@@ -859,7 +831,7 @@ export async function startCommandRealtimeAudioBridge(params: {
let lastOutputAtMs = 0;
let lastOutputPlayableUntilMs = 0;
let bargeInInputProcess: BridgeProcess | undefined;
let agentConsultDebounceTimer: ReturnType<typeof setTimeout> | undefined;
let agentTalkback: RealtimeVoiceAgentTalkbackQueue | undefined;
const suppressInputForOutput = (audio: Buffer) => {
const suppression = extendGoogleMeetOutputEchoSuppression({
@@ -906,10 +878,7 @@ export async function startCommandRealtimeAudioBridge(params: {
return;
}
stopped = true;
if (agentConsultDebounceTimer) {
clearTimeout(agentConsultDebounceTimer);
agentConsultDebounceTimer = undefined;
}
agentTalkback?.close();
try {
bridge?.close();
} catch (error) {
@@ -1065,84 +1034,72 @@ export async function startCommandRealtimeAudioBridge(params: {
);
const transcript: GoogleMeetRealtimeTranscriptEntry[] = [];
const realtimeEvents: GoogleMeetRealtimeEventEntry[] = [];
let agentConsultActive = false;
let pendingAgentQuestion: string | undefined;
// Debounced hand-off of realtime user transcripts to the agent-consult loop.
// Transcripts arriving within the debounce window are coalesced into one question.
const enqueueAgentConsultForUserTranscript = (question: string): void => {
  const cleaned = question.trim();
  if (!cleaned || stopped) {
    return;
  }
  // Coalesce with any transcript already awaiting the debounce window.
  pendingAgentQuestion = pendingAgentQuestion ? `${pendingAgentQuestion}\n${cleaned}` : cleaned;
  if (agentConsultDebounceTimer) {
    clearTimeout(agentConsultDebounceTimer);
  }
  const flushPendingQuestion = (): void => {
    agentConsultDebounceTimer = undefined;
    const queuedQuestion = pendingAgentQuestion;
    pendingAgentQuestion = undefined;
    if (queuedQuestion && !stopped) {
      void runAgentConsultForUserTranscript(queuedQuestion);
    }
  };
  agentConsultDebounceTimer = setTimeout(
    flushPendingQuestion,
    GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS,
  );
  // Timer must not keep the process alive on its own.
  agentConsultDebounceTimer.unref?.();
};
const runAgentConsultForUserTranscript = async (question: string): Promise<void> => {
const trimmed = question.trim();
if (!trimmed || stopped) {
return;
}
if (agentConsultActive) {
pendingAgentQuestion = trimmed;
return;
}
agentConsultActive = true;
let nextQuestion: string | undefined = trimmed;
try {
while (nextQuestion) {
if (stopped) {
return;
}
const currentQuestion = nextQuestion;
pendingAgentQuestion = undefined;
params.logger.info(`[google-meet] realtime agent consult: ${currentQuestion}`);
const result = await consultOpenClawAgentForGoogleMeet({
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: {
question: currentQuestion,
responseStyle: "Brief, natural spoken answer for a live meeting.",
},
transcript,
});
if (!stopped && result.text.trim()) {
bridge?.sendUserMessage(buildGoogleMeetSpeakExactUserMessage(result.text.trim()));
}
nextQuestion = pendingAgentQuestion;
}
} catch (error) {
params.logger.warn(
`[google-meet] realtime agent consult failed: ${formatErrorMessage(error)}`,
);
if (!stopped) {
bridge?.sendUserMessage(
buildGoogleMeetSpeakExactUserMessage(
"I hit an error while checking that. Please try again.",
),
);
}
} finally {
agentConsultActive = false;
const queuedQuestion = pendingAgentQuestion;
pendingAgentQuestion = undefined;
if (queuedQuestion && !stopped) {
void runAgentConsultForUserTranscript(queuedQuestion);
}
const talk: TalkSessionController = createTalkSessionController({
sessionId: `google-meet:${params.meetingSessionId}:command-realtime`,
mode: "realtime",
transport: "gateway-relay",
brain: strategy === "bidi" ? "direct-tools" : "agent-consult",
provider: resolved.provider.id,
});
const recentTalkEvents: TalkEvent[] = [];
// Record a Talk event in the bounded recent-events trail used for health
// reporting; undefined (no event produced) is silently ignored.
const rememberTalkEvent = (event: TalkEvent | undefined): void => {
  if (!event) {
    return;
  }
  pushGoogleMeetTalkEvent(recentTalkEvents, event);
};
// Emit through the Talk controller, then record the resulting event.
const emitTalkEvent = (input: TalkEventInput): void => {
  const emitted = talk.emit(input);
  rememberTalkEvent(emitted);
};
// Ensure an active Talk turn exists, recording the turn.started event when a
// new turn is opened, and return the active turn id.
const ensureTalkTurn = (): string => {
  const { turnId, event } = talk.ensureTurn({
    payload: { meetingSessionId: params.meetingSessionId },
  });
  // rememberTalkEvent ignores undefined, so no guard is needed here.
  rememberTalkEvent(event);
  return turnId;
};
// Close any in-flight output.audio stream, tagging the event with why it ended.
const finishOutputAudio = (reason: string): void => {
  const event = talk.finishOutputAudio({ payload: { reason } });
  rememberTalkEvent(event);
};
// End the active turn (no-op when none is active) and record turn.ended.
const endTalkTurn = (reason = "completed"): void => {
  const ended = talk.endTurn({ payload: { reason } });
  if (!ended.ok) {
    return;
  }
  rememberTalkEvent(ended.event);
};
emitTalkEvent({
type: "session.started",
payload: { meetingSessionId: params.meetingSessionId },
});
agentTalkback = createRealtimeVoiceAgentTalkbackQueue({
debounceMs: GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS,
isStopped: () => stopped,
logger: params.logger,
logPrefix: "[google-meet] realtime agent",
responseStyle: "Brief, natural spoken answer for a live meeting.",
fallbackText: "I hit an error while checking that. Please try again.",
consult: ({ question, responseStyle }) =>
consultOpenClawAgentForGoogleMeet({
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: { question, responseStyle },
transcript,
}),
deliver: (text) => {
bridge?.sendUserMessage(buildGoogleMeetSpeakExactUserMessage(text));
},
});
bridge = createRealtimeVoiceBridgeSession({
provider: resolved.provider,
providerConfig: resolved.providerConfig,
@@ -1157,15 +1114,54 @@ export async function startCommandRealtimeAudioBridge(params: {
audioSink: {
isOpen: () => !stopped,
sendAudio: (audio) => {
const turnId = ensureTalkTurn();
rememberTalkEvent(
talk.startOutputAudio({
turnId,
payload: { meetingSessionId: params.meetingSessionId },
}).event,
);
emitTalkEvent({
type: "output.audio.delta",
turnId,
payload: { byteLength: audio.byteLength },
});
lastOutputAtMs = Date.now();
lastOutputAt = new Date().toISOString();
lastOutputBytes += audio.byteLength;
suppressInputForOutput(audio);
writeOutputAudio(audio);
},
clearAudio: clearOutputPlayback,
clearAudio: () => {
clearOutputPlayback();
finishOutputAudio("clear");
},
},
onTranscript: (role, text, isFinal) => {
const turnId = ensureTalkTurn();
const eventType =
role === "assistant"
? isFinal
? "output.text.done"
: "output.text.delta"
: isFinal
? "transcript.done"
: "transcript.delta";
const payload = role === "assistant" ? { text } : { role, text };
emitTalkEvent({
type: eventType,
turnId,
payload,
final: isFinal,
});
if (role === "user" && isFinal) {
emitTalkEvent({
type: "input.audio.committed",
turnId,
payload: { meetingSessionId: params.meetingSessionId },
final: true,
});
}
if (isFinal) {
recordGoogleMeetRealtimeTranscript(transcript, role, text);
params.logger.info(`[google-meet] realtime ${role}: ${text}`);
@@ -1174,12 +1170,35 @@ export async function startCommandRealtimeAudioBridge(params: {
params.logger.info(`[google-meet] realtime ignored assistant echo transcript: ${text}`);
return;
}
enqueueAgentConsultForUserTranscript(text);
agentTalkback?.enqueue(text);
}
}
},
onEvent: (event) => {
recordGoogleMeetRealtimeEvent(realtimeEvents, event);
if (event.type === "input_audio_buffer.speech_started") {
ensureTalkTurn();
} else if (event.type === "input_audio_buffer.speech_stopped") {
const turnId = talk.activeTurnId;
if (!turnId) {
return;
}
emitTalkEvent({
type: "input.audio.committed",
turnId,
payload: { meetingSessionId: params.meetingSessionId, source: event.type },
final: true,
});
} else if (event.type === "response.done") {
finishOutputAudio("response.done");
endTalkTurn("response.done");
} else if (event.type === "error") {
emitTalkEvent({
type: "session.error",
payload: { message: event.detail ?? "Realtime provider error" },
final: true,
});
}
if (
event.type === "error" ||
event.type === "response.done" ||
@@ -1193,47 +1212,54 @@ export async function startCommandRealtimeAudioBridge(params: {
}
},
onToolCall: (event, session) => {
if (strategy !== "bidi") {
session.submitToolResult(event.callId || event.itemId, {
error: `Tool "${event.name}" is only available in bidi realtime strategy`,
});
return;
}
if (event.name !== GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME) {
session.submitToolResult(event.callId || event.itemId, {
error: `Tool "${event.name}" not available`,
});
return;
}
submitGoogleMeetConsultWorkingResponse(session, event.callId || event.itemId);
void consultOpenClawAgentForGoogleMeet({
emitTalkEvent({
type: "tool.call",
turnId: ensureTalkTurn(),
itemId: event.itemId,
callId: event.callId,
payload: { name: event.name, args: event.args },
});
const turnId = ensureTalkTurn();
handleGoogleMeetRealtimeConsultToolCall({
strategy,
session,
event,
config: params.config,
fullConfig: params.fullConfig,
runtime: params.runtime,
logger: params.logger,
meetingSessionId: params.meetingSessionId,
requesterSessionKey: params.requesterSessionKey,
args: event.args,
transcript,
})
.then((result) => {
session.submitToolResult(event.callId || event.itemId, result);
})
.catch((error: Error) => {
session.submitToolResult(event.callId || event.itemId, {
error: formatErrorMessage(error),
});
});
onTalkEvent: (input) => emitTalkEvent({ ...input, turnId: input.turnId ?? turnId }),
});
},
onError: (error) => {
emitTalkEvent({
type: "session.error",
payload: { message: formatErrorMessage(error) },
final: true,
});
fail("realtime voice bridge")(error);
},
onError: fail("realtime voice bridge"),
onClose: (reason) => {
realtimeReady = false;
finishOutputAudio(reason);
emitTalkEvent({
type: "session.closed",
payload: { reason },
final: true,
});
if (reason === "error") {
void stop();
}
},
onReady: () => {
realtimeReady = true;
emitTalkEvent({
type: "session.ready",
payload: { meetingSessionId: params.meetingSessionId },
});
},
});
startHumanBargeInMonitor();
@@ -1248,6 +1274,11 @@ export async function startCommandRealtimeAudioBridge(params: {
}
lastInputAt = new Date().toISOString();
lastInputBytes += audio.byteLength;
emitTalkEvent({
type: "input.audio.delta",
turnId: ensureTalkTurn(),
payload: { byteLength: audio.byteLength },
});
bridge?.sendAudio(Buffer.from(audio));
}
});
@@ -1273,6 +1304,7 @@ export async function startCommandRealtimeAudioBridge(params: {
suppressedInputBytes,
...getGoogleMeetRealtimeTranscriptHealth(transcript),
...getGoogleMeetRealtimeEventHealth(realtimeEvents),
recentTalkEvents: summarizeGoogleMeetTalkEvents(recentTalkEvents),
lastClearAt,
clearCount,
bridgeClosed: stopped,

View File

@@ -62,6 +62,15 @@ export type GoogleMeetChromeHealth = {
type: string;
detail?: string;
}>;
recentTalkEvents?: Array<{
id: string;
type: string;
sessionId: string;
turnId?: string;
seq: number;
timestamp: string;
final?: boolean;
}>;
manualActionRequired?: boolean;
manualActionReason?: GoogleMeetManualActionReason;
manualActionMessage?: string;

View File

@@ -3,7 +3,9 @@ import net from "node:net";
import type {
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
RealtimeTranscriptionSessionCreateRequest,
} from "openclaw/plugin-sdk/realtime-transcription";
import { createTalkSessionController, type TalkEvent } from "openclaw/plugin-sdk/realtime-voice";
import { describe, expect, it, vi } from "vitest";
import { WebSocket } from "ws";
import { MediaStreamHandler, sanitizeLogText } from "./media-stream.js";
@@ -160,6 +162,124 @@ describe("MediaStreamHandler TTS queue", () => {
});
describe("MediaStreamHandler security hardening", () => {
// End-to-end Talk event coverage for the telephony media stream: drives a fake
// websocket stream + stub STT session through a full user turn, a completed TTS
// playback, and a barge-in cancellation, then asserts the exact event ordering.
it("emits common Talk events for telephony STT/TTS sessions", async () => {
let callbacks: RealtimeTranscriptionSessionCreateRequest | undefined;
const sentAudio: Buffer[] = [];
// Stub STT session: captures forwarded audio, never errors.
const session: RealtimeTranscriptionSession = {
connect: async () => {},
sendAudio: (audio) => {
sentAudio.push(Buffer.from(audio));
},
close: () => {},
isConnected: () => true,
};
const talkEvents: TalkEvent[] = [];
const handler = new MediaStreamHandler({
transcriptionProvider: {
createSession: (request) => {
// Keep the callbacks so the test can trigger STT events manually.
callbacks = request;
return session;
},
id: "openai",
label: "OpenAI",
isConfigured: () => true,
},
providerConfig: {},
shouldAcceptStream: () => true,
onTalkEvent: (_callId, _streamSid, event) => {
talkEvents.push(event);
},
});
const server = await startWsServer(handler);
try {
const ws = await connectWs(server.url);
// Open the media stream (session.started / session.ready).
ws.send(
JSON.stringify({
event: "start",
streamSid: "MZ-talk",
start: { callSid: "CA-talk" },
}),
);
await flush();
// Inbound caller audio: should open turn 1 and emit input.audio.delta.
ws.send(
JSON.stringify({
event: "media",
streamSid: "MZ-talk",
media: { payload: Buffer.from("hello").toString("base64") },
}),
);
await flush();
expect(Buffer.concat(sentAudio).toString()).toBe("hello");
// Simulate the STT lifecycle: speech start, partial, then final transcript.
callbacks?.onSpeechStart?.();
callbacks?.onPartial?.("hel");
callbacks?.onTranscript?.("hello there");
// First playback completes normally (output.audio.started/delta/done + turn.ended).
await handler.queueTts("MZ-talk", async () => {
handler.sendAudio("MZ-talk", Buffer.alloc(160, 0xff));
});
// Second playback blocks until aborted, so the barge-in path can be exercised.
const activePlayback = handler.queueTts("MZ-talk", async (signal) => {
await waitForAbort(signal);
});
await flush();
handler.clearTtsQueue("MZ-talk", "barge-in");
await activePlayback;
ws.close();
await waitForClose(ws);
await vi.waitFor(() => {
expect(talkEvents.some((event) => event.type === "session.closed")).toBe(true);
});
// Exact ordering: turn 1 completes, turn 2 is cancelled by the barge-in.
expect(talkEvents.map((event) => event.type)).toEqual([
"session.started",
"session.ready",
"turn.started",
"input.audio.delta",
"transcript.delta",
"input.audio.committed",
"transcript.done",
"output.audio.started",
"output.audio.delta",
"output.audio.done",
"turn.ended",
"turn.started",
"output.audio.started",
"turn.cancelled",
"session.closed",
]);
expect(talkEvents[0]).toEqual(
expect.objectContaining({
sessionId: "voice-call:CA-talk:MZ-talk",
mode: "stt-tts",
transport: "gateway-relay",
brain: "agent-consult",
provider: "openai",
seq: 1,
}),
);
expect(talkEvents.find((event) => event.type === "transcript.done")).toEqual(
expect.objectContaining({
final: true,
turnId: "MZ-talk:turn-1",
payload: expect.objectContaining({ text: "hello there", role: "user" }),
}),
);
expect(talkEvents.find((event) => event.type === "turn.cancelled")).toEqual(
expect.objectContaining({
final: true,
turnId: "MZ-talk:turn-2",
payload: expect.objectContaining({ reason: "barge-in" }),
}),
);
} finally {
await server.close();
}
});
it("fails sends and closes stream when buffered bytes already exceed the cap", () => {
const handler = new MediaStreamHandler({
transcriptionProvider: createStubSttProvider(),
@@ -180,6 +300,7 @@ describe("MediaStreamHandler security hardening", () => {
streamSid: string;
ws: WebSocket;
sttSession: RealtimeTranscriptionSession;
talk: ReturnType<typeof createTalkSessionController>;
}
>;
}
@@ -188,6 +309,13 @@ describe("MediaStreamHandler security hardening", () => {
streamSid: "MZ-backpressure",
ws,
sttSession: createStubSession(),
talk: createTalkSessionController({
sessionId: "voice-call:CA-backpressure:MZ-backpressure",
mode: "stt-tts",
transport: "gateway-relay",
brain: "agent-consult",
provider: "openai",
}),
});
const result = handler.sendAudio("MZ-backpressure", Buffer.alloc(160, 0xff));

View File

@@ -14,6 +14,12 @@ import type {
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
} from "openclaw/plugin-sdk/realtime-transcription";
import {
createTalkSessionController,
type TalkEvent,
type TalkEventInput,
type TalkSessionController,
} from "openclaw/plugin-sdk/realtime-voice";
import { type RawData, WebSocket, WebSocketServer } from "ws";
/**
@@ -48,6 +54,8 @@ export interface MediaStreamConfig {
onSpeechStart?: (callId: string) => void;
/** Callback when stream disconnects */
onDisconnect?: (callId: string, streamSid: string) => void;
/** Callback for common Talk events emitted by the telephony STT/TTS adapter. */
onTalkEvent?: (callId: string, streamSid: string, event: TalkEvent) => void;
}
/**
@@ -58,6 +66,7 @@ interface StreamSession {
streamSid: string;
ws: WebSocket;
sttSession: RealtimeTranscriptionSession;
talk: TalkSessionController;
}
type TtsQueueEntry = {
@@ -225,6 +234,16 @@ export class MediaStreamHandler {
if (session && message.media?.payload) {
// Forward audio to STT
const audioBuffer = Buffer.from(message.media.payload, "base64");
const turnId = this.ensureActiveTurn(session);
this.emitTalkEvent(session, {
type: "input.audio.delta",
turnId,
payload: {
callId: session.callId,
streamSid: session.streamSid,
bytes: audioBuffer.byteLength,
},
});
session.sttSession.sendAudio(audioBuffer);
}
break;
@@ -296,16 +315,52 @@ export class MediaStreamHandler {
const sttSession = this.config.transcriptionProvider.createSession({
providerConfig: this.config.providerConfig,
onPartial: (partial) => {
const session = this.sessions.get(streamSid);
if (session) {
this.emitTalkEvent(session, {
type: "transcript.delta",
turnId: this.ensureActiveTurn(session),
payload: { callId: callSid, streamSid, text: partial, role: "user" },
});
}
this.config.onPartialTranscript?.(callSid, partial);
},
onTranscript: (transcript) => {
const session = this.sessions.get(streamSid);
if (session) {
const turnId = this.ensureActiveTurn(session);
this.emitTalkEvent(session, {
type: "input.audio.committed",
turnId,
final: true,
payload: { callId: callSid, streamSid },
});
this.emitTalkEvent(session, {
type: "transcript.done",
turnId,
final: true,
payload: { callId: callSid, streamSid, text: transcript, role: "user" },
});
}
this.config.onTranscript?.(callSid, transcript);
},
onSpeechStart: () => {
const session = this.sessions.get(streamSid);
if (session) {
this.ensureActiveTurn(session);
}
this.config.onSpeechStart?.(callSid);
},
onError: (error) => {
console.warn("[MediaStream] Transcription session error:", error.message);
const session = this.sessions.get(streamSid);
if (session) {
this.emitTalkEvent(session, {
type: "session.error",
final: true,
payload: { callId: callSid, streamSid, error: error.message },
});
}
},
});
@@ -314,10 +369,15 @@ export class MediaStreamHandler {
streamSid,
ws,
sttSession,
talk: this.createTalkEvents(callSid, streamSid),
};
this.sessions.set(streamSid, session);
this.config.onConnect?.(callSid, streamSid);
this.emitTalkEvent(session, {
type: "session.started",
payload: { callId: callSid, streamSid, provider: this.config.transcriptionProvider.id },
});
void this.connectTranscriptionAndNotify(session);
return session;
@@ -331,6 +391,15 @@ export class MediaStreamHandler {
"[MediaStream] STT connection failed; closing media stream:",
error instanceof Error ? error.message : String(error),
);
this.emitTalkEvent(session, {
type: "session.error",
final: true,
payload: {
callId: session.callId,
streamSid: session.streamSid,
error: error instanceof Error ? error.message : String(error),
},
});
if (
this.sessions.get(session.streamSid) === session &&
session.ws.readyState === WebSocket.OPEN
@@ -350,6 +419,10 @@ export class MediaStreamHandler {
return;
}
this.emitTalkEvent(session, {
type: "session.ready",
payload: { callId: session.callId, streamSid: session.streamSid },
});
this.config.onTranscriptionReady?.(session.callId, session.streamSid);
}
@@ -362,6 +435,11 @@ export class MediaStreamHandler {
this.clearTtsState(session.streamSid);
session.sttSession.close();
this.sessions.delete(session.streamSid);
this.emitTalkEvent(session, {
type: "session.closed",
final: true,
payload: { callId: session.callId, streamSid: session.streamSid },
});
this.config.onDisconnect?.(session.callId, session.streamSid);
}
@@ -530,6 +608,14 @@ export class MediaStreamHandler {
* Audio should be mu-law encoded at 8kHz mono.
*/
sendAudio(streamSid: string, muLawAudio: Buffer): StreamSendResult {
const session = this.getOpenSession(streamSid);
if (session) {
this.emitTalkEvent(session, {
type: "output.audio.delta",
turnId: this.ensureActiveTurn(session),
payload: { callId: session.callId, streamSid, bytes: muLawAudio.byteLength },
});
}
return this.sendToStream(streamSid, {
event: "media",
streamSid,
@@ -589,6 +675,15 @@ export class MediaStreamHandler {
const queue = this.getTtsQueue(streamSid);
this.resolveQueuedTtsEntries(queue);
this.ttsActiveControllers.get(streamSid)?.abort();
const session = this.sessions.get(streamSid);
if (session?.talk.activeTurnId) {
const cancelled = session.talk.cancelTurn({
payload: { callId: session.callId, streamSid, reason: _reason },
});
if (cancelled.ok) {
this.config.onTalkEvent?.(session.callId, session.streamSid, cancelled.event);
}
}
this.clearAudio(streamSid);
}
@@ -638,9 +733,40 @@ export class MediaStreamHandler {
const entry = queue.shift()!;
this.ttsActiveControllers.set(streamSid, entry.controller);
const session = this.sessions.get(streamSid);
let playbackTurnId: string | undefined;
try {
if (session) {
playbackTurnId = this.ensureActiveTurn(session);
this.emitTalkEvent(session, {
type: "output.audio.started",
turnId: playbackTurnId,
payload: { callId: session.callId, streamSid },
});
}
await entry.playFn(entry.controller.signal);
if (entry.controller.signal.aborted) {
entry.resolve();
continue;
}
if (session) {
const turnId = playbackTurnId ?? this.ensureActiveTurn(session);
this.emitTalkEvent(session, {
type: "output.audio.done",
turnId,
final: true,
payload: { callId: session.callId, streamSid },
});
if (session.talk.activeTurnId) {
const ended = session.talk.endTurn({
payload: { callId: session.callId, streamSid },
});
if (ended.ok) {
this.config.onTalkEvent?.(session.callId, session.streamSid, ended.event);
}
}
}
entry.resolve();
} catch (error) {
if (entry.controller.signal.aborted) {
@@ -657,6 +783,32 @@ export class MediaStreamHandler {
}
}
/** Build the per-stream Talk session controller used to emit common Talk events. */
private createTalkEvents(callId: string, streamSid: string): TalkSessionController {
  const sessionId = `voice-call:${callId}:${streamSid}`;
  return createTalkSessionController({
    sessionId,
    mode: "stt-tts",
    transport: "gateway-relay",
    brain: "agent-consult",
    provider: this.config.transcriptionProvider.id,
    turnIdPrefix: `${streamSid}:turn`,
  });
}
/** Emit a Talk event for the stream and forward it to the configured listener. */
private emitTalkEvent(session: StreamSession, input: TalkEventInput): void {
  const { callId, streamSid, talk } = session;
  this.config.onTalkEvent?.(callId, streamSid, talk.emit(input));
}
/** Ensure an active Talk turn for the stream, surfacing turn.started when one opens. */
private ensureActiveTurn(session: StreamSession): string {
  const { callId, streamSid, talk } = session;
  const { turnId, event } = talk.ensureTurn({ payload: { callId, streamSid } });
  if (event) {
    this.config.onTalkEvent?.(callId, streamSid, event);
  }
  return turnId;
}
private clearTtsState(streamSid: string): void {
const queue = this.ttsQueues.get(streamSid);
if (queue) {

View File

@@ -6,7 +6,7 @@ const mocks = vi.hoisted(() => ({
getActiveMemorySearchManager: vi.fn(),
}));
vi.mock("openclaw/plugin-sdk/memory-host-search", () => ({
vi.mock("../../../src/plugins/memory-runtime.js", () => ({
getActiveMemorySearchManager: mocks.getActiveMemorySearchManager,
}));

View File

@@ -1,151 +1,27 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { getActiveMemorySearchManager } from "openclaw/plugin-sdk/memory-host-search";
import {
parseRealtimeVoiceAgentConsultArgs,
type RealtimeVoiceAgentConsultResult,
resolveRealtimeVoiceFastContextConsult,
type RealtimeVoiceFastContextConsultResult,
type RealtimeVoiceFastContextConfig,
} from "openclaw/plugin-sdk/realtime-voice";
import { withTimeout } from "openclaw/plugin-sdk/security-runtime";
import type { VoiceCallRealtimeFastContextConfig } from "./config.js";
type Logger = {
debug?: (message: string) => void;
warn: (message: string) => void;
};
type MemorySearchHit = {
path: string;
startLine: number;
endLine: number;
snippet: string;
source: "memory" | "sessions";
score: number;
};
type FastContextLookupResult =
| { status: "unavailable"; error?: string }
| { status: "hits"; hits: MemorySearchHit[] };
type RealtimeFastContextConsultResult =
| { handled: false }
| { handled: true; result: RealtimeVoiceAgentConsultResult };
/** Hard cap on snippet length (including the trailing ellipsis) for fast-context hits. */
const MAX_SNIPPET_CHARS = 700;

/** Error raised when the fast-context memory lookup exceeds its time budget. */
class RealtimeFastContextTimeoutError extends Error {
  constructor(timeoutMs: number) {
    super(`fast context lookup timed out after ${timeoutMs}ms`);
    this.name = "RealtimeFastContextTimeoutError";
  }
}

/**
 * Collapse all whitespace runs to single spaces, trim, and cap the snippet
 * length at MAX_SNIPPET_CHARS.
 *
 * Fix: the previous version sliced to `MAX_SNIPPET_CHARS - 1` and then
 * appended a three-character "..." (the `-1` suggests a one-char ellipsis was
 * assumed), so truncated snippets came out at 702 characters — over the cap.
 * Slicing to `MAX_SNIPPET_CHARS - 3` keeps the result, ellipsis included,
 * within MAX_SNIPPET_CHARS.
 *
 * @param text Raw snippet text from a memory/session search hit.
 * @returns Whitespace-normalized snippet no longer than MAX_SNIPPET_CHARS.
 */
function normalizeSnippet(text: string): string {
  const normalized = text.replace(/\s+/g, " ").trim();
  if (normalized.length <= MAX_SNIPPET_CHARS) {
    return normalized;
  }
  return `${normalized.slice(0, MAX_SNIPPET_CHARS - 3).trimEnd()}...`;
}
/** Combine the parsed question and optional context into one memory search query. */
function buildSearchQuery(args: unknown): string {
  const { question, context } = parseRealtimeVoiceAgentConsultArgs(args);
  const parts = [question, context].filter(Boolean);
  return parts.join("\n\n");
}
/** Render fast-context hits into a numbered, speech-friendly prompt block. */
function buildContextText(params: { query: string; hits: MemorySearchHit[] }): string {
  const renderedHits: string[] = [];
  for (const [index, hit] of params.hits.entries()) {
    const location = `${hit.path}:${hit.startLine}-${hit.endLine}`;
    renderedHits.push(`${index + 1}. [${hit.source}] ${location}\n${normalizeSnippet(hit.snippet)}`);
  }
  const sections = [
    "Fast OpenClaw memory context found for the live caller.",
    "Use this context only if it answers the caller's question. If it is not relevant, say briefly that you do not have that context handy.",
    `Question:\n${params.query}`,
    `Context:\n${renderedHits.join("\n\n")}`,
  ];
  return sections.join("\n\n");
}
/** Prompt text telling the model no fast context was found for the question. */
function buildMissText(query: string): string {
  const sections = [
    "No relevant OpenClaw memory or session context was found quickly for the live caller.",
    "Answer briefly that you do not have that context handy. Do not keep checking unless the caller asks you to.",
    `Question:\n${query}`,
  ];
  return sections.join("\n\n");
}
/**
 * Resolve the active memory search manager and run the fast-context search.
 *
 * @returns "unavailable" (with a reason) when no manager is active; otherwise
 *          the search hits under status "hits".
 */
async function lookupFastContext(params: {
  cfg: OpenClawConfig;
  agentId: string;
  sessionKey: string;
  config: VoiceCallRealtimeFastContextConfig;
  query: string;
}): Promise<FastContextLookupResult> {
  const { cfg, agentId, sessionKey, config, query } = params;
  const memory = await getActiveMemorySearchManager({ cfg, agentId });
  const manager = memory.manager;
  if (!manager) {
    return { status: "unavailable", error: memory.error ?? "no active memory manager" };
  }
  const hits = await manager.search(query, {
    maxResults: config.maxResults,
    sessionKey,
    sources: config.sources,
  });
  return { status: "hits", hits };
}
export async function resolveRealtimeFastContextConsult(params: {
cfg: OpenClawConfig;
agentId: string;
sessionKey: string;
config: VoiceCallRealtimeFastContextConfig;
config: RealtimeVoiceFastContextConfig;
args: unknown;
logger: Logger;
}): Promise<RealtimeFastContextConsultResult> {
if (!params.config.enabled) {
return { handled: false };
}
const query = buildSearchQuery(params.args);
try {
const lookup = await withTimeout(
lookupFastContext({
cfg: params.cfg,
agentId: params.agentId,
sessionKey: params.sessionKey,
config: params.config,
query,
}),
params.config.timeoutMs,
{ createError: () => new RealtimeFastContextTimeoutError(params.config.timeoutMs) },
);
if (lookup.status === "unavailable") {
params.logger.debug?.(`[voice-call] realtime fast context unavailable: ${lookup.error}`);
return params.config.fallbackToConsult
? { handled: false }
: { handled: true, result: { text: buildMissText(query) } };
}
const { hits } = lookup;
if (hits.length === 0) {
return params.config.fallbackToConsult
? { handled: false }
: { handled: true, result: { text: buildMissText(query) } };
}
return {
handled: true,
result: { text: buildContextText({ query, hits }) },
};
} catch (error) {
const message = formatErrorMessage(error);
params.logger.debug?.(`[voice-call] realtime fast context lookup failed: ${message}`);
return params.config.fallbackToConsult
? { handled: false }
: { handled: true, result: { text: buildMissText(query) } };
}
}): Promise<RealtimeVoiceFastContextConsultResult> {
return await resolveRealtimeVoiceFastContextConsult({
...params,
labels: {
audienceLabel: "caller",
contextName: "OpenClaw memory or session context",
},
});
}

View File

@@ -224,6 +224,70 @@ describe("VoiceCallWebhookServer realtime transcription provider selection", ()
await server.stop();
}
});
// Verifies that Talk events surfaced by the media-stream handler are recorded
// onto the matching active call's metadata (lastTalkEventAt/Type plus the
// recentTalkEvents ring) when routed through the webhook server's callback.
it("records media stream Talk events on the active call metadata", async () => {
const call = createCall(Date.now());
// Stub manager that only resolves the provider call id used by this test;
// any other id yields undefined so the handler's lookup path is exercised.
const manager = {
getActiveCalls: () => [call],
getCallByProviderCallId: (providerCallId: string) =>
providerCallId === "provider-call-1" ? call : undefined,
endCall: vi.fn(async () => ({ success: true })),
processEvent: vi.fn(),
speakInitialMessage: vi.fn(async () => {}),
} as unknown as CallManager;
// Streaming must be enabled with a provider key for the media stream
// handler to be constructed at all.
const config = createConfig({
streaming: {
...createConfig().streaming,
enabled: true,
providers: {
openai: {
apiKey: "sk-test", // pragma: allowlist secret
},
},
},
});
const server = new VoiceCallWebhookServer(config, manager, provider);
try {
await server.start();
// Reach into the private media-stream handler config to grab the
// onTalkEvent callback the server wired up.
const mediaHandler = server.getMediaStreamHandler() as unknown as {
config: {
onTalkEvent?: NonNullable<import("./media-stream.js").MediaStreamConfig["onTalkEvent"]>;
};
};
// Fire a single final transcript.done Talk event for the known call.
mediaHandler.config.onTalkEvent?.("provider-call-1", "MZ-talk", {
id: "voice-call:provider-call-1:MZ-talk:1",
type: "transcript.done",
sessionId: "voice-call:provider-call-1:MZ-talk",
turnId: "MZ-talk:turn:1",
seq: 1,
timestamp: "2026-05-05T06:00:00.000Z",
mode: "stt-tts",
transport: "gateway-relay",
brain: "agent-consult",
provider: "openai",
final: true,
payload: { text: "hello", role: "user" },
});
// The call metadata should carry the last-event summary and a single
// trimmed recentTalkEvents entry (at/type/sessionId/turnId only).
expect(call.metadata).toEqual(
expect.objectContaining({
lastTalkEventAt: "2026-05-05T06:00:00.000Z",
lastTalkEventType: "transcript.done",
recentTalkEvents: [
{
at: "2026-05-05T06:00:00.000Z",
type: "transcript.done",
sessionId: "voice-call:provider-call-1:MZ-talk",
turnId: "MZ-talk:turn:1",
},
],
}),
);
} finally {
await server.stop();
}
});
});
describe("VoiceCallWebhookServer media stream client IP resolution", () => {

View File

@@ -2,6 +2,7 @@ import http from "node:http";
import { URL } from "node:url";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { resolveConfiguredCapabilityProvider } from "openclaw/plugin-sdk/provider-selection-runtime";
import type { TalkEvent } from "openclaw/plugin-sdk/realtime-voice";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import {
createWebhookInFlightLimiter,
@@ -77,6 +78,28 @@ function sanitizeTranscriptForLog(value: string): string {
return `${sanitized.slice(0, TRANSCRIPT_LOG_MAX_CHARS)}...`;
}
/**
 * Records a Talk event on a call's metadata.
 *
 * Updates `lastTalkEventAt`/`lastTalkEventType` and appends a compact entry
 * (`at`, `type`, `sessionId`, optional `turnId`) to `recentTalkEvents`,
 * keeping only the 10 most recent entries. `call.metadata` is replaced with
 * a new object; the previous metadata object is not mutated.
 */
function appendRecentTalkEventMetadata(call: CallRecord, event: TalkEvent): void {
  const metadata = call.metadata ?? {};
  // Keep only previously-recorded entries that actually match the asserted
  // shape. The earlier predicate claimed `{ at; type; sessionId }` while only
  // checking object-ness — a lying type guard; validate the fields instead.
  const recent = Array.isArray(metadata.recentTalkEvents)
    ? metadata.recentTalkEvents.filter(
        (entry): entry is { at: string; type: string; sessionId: string; turnId?: string } =>
          !!entry &&
          typeof entry === "object" &&
          !Array.isArray(entry) &&
          typeof (entry as { at?: unknown }).at === "string" &&
          typeof (entry as { type?: unknown }).type === "string" &&
          typeof (entry as { sessionId?: unknown }).sessionId === "string",
      )
    : [];
  recent.push({
    at: event.timestamp,
    type: event.type,
    sessionId: event.sessionId,
    turnId: event.turnId,
  });
  call.metadata = {
    ...metadata,
    lastTalkEventAt: event.timestamp,
    lastTalkEventType: event.type,
    // Negative-start slice keeps the trailing window of up to 10 entries.
    recentTalkEvents: recent.slice(-10),
  };
}
function buildRequestUrl(
requestUrl: string | undefined,
requestHost: string | undefined,
@@ -400,6 +423,12 @@ export class VoiceCallWebhookServer {
const safePartial = sanitizeTranscriptForLog(partial);
console.log(`[voice-call] Partial for ${callId}: ${safePartial} (chars=${partial.length})`);
},
onTalkEvent: (providerCallId, _streamSid, event) => {
const call = this.manager.getCallByProviderCallId(providerCallId);
if (call) {
appendRecentTalkEventMetadata(call, event);
}
},
onConnect: (callId, streamSid) => {
console.log(`[voice-call] Media stream connected: ${callId} -> ${streamSid}`);
this.clearPendingDisconnectHangup(callId);

View File

@@ -422,6 +422,192 @@ describe("RealtimeCallHandler path routing", () => {
}
});
// End-to-end check that a realtime telephony websocket session produces the
// expected ordered Talk-event lifecycle on the call's metadata: session
// start/ready, a user turn with audio + transcript, assistant output, and a
// turn.ended emitted on the provider's response.done.
it("records common Talk events for realtime telephony sessions", async () => {
// Captured bridge callbacks so the test can drive provider-side events
// (audio, transcripts, ready, raw events) after the bridge is created.
let callbacks:
| {
onAudio?: (audio: Buffer) => void;
onEvent?: (event: {
direction: "client" | "server";
type: string;
detail?: string;
}) => void;
onReady?: () => void;
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
}
| undefined;
const sendAudio = vi.fn();
// Minimal in-memory call record the handler will annotate with metadata.
const call: CallRecord = {
callId: "call-1",
providerCallId: "CA-talk-events",
provider: "twilio",
direction: "inbound",
state: "ringing",
from: "+15550001234",
to: "+15550009999",
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {},
};
// createBridge stashes the callback set the handler passes in, then hands
// back a stub bridge whose sendAudio is spied on.
const createBridge = vi.fn(
(request: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0]) => {
callbacks = request;
return makeBridge({ sendAudio });
},
);
const handler = makeHandler(undefined, {
manager: {
getCallByProviderCallId: vi.fn((): CallRecord => call),
},
realtimeProvider: makeRealtimeProvider(createBridge),
});
const server = await startRealtimeServer(handler);
try {
const ws = await connectWs(server.url);
try {
// Twilio-style "start" frame registers the stream/call ids.
ws.send(
JSON.stringify({
event: "start",
start: { streamSid: "MZ-talk-events", callSid: "CA-talk-events" },
}),
);
await vi.waitFor(() => {
expect(createBridge).toHaveBeenCalled();
});
callbacks?.onReady?.();
// Inbound caller audio frame (base64 mu-law payload).
ws.send(
JSON.stringify({
event: "media",
media: { payload: Buffer.from([0xff, 0xff]).toString("base64") },
}),
);
await vi.waitFor(() => {
expect(sendAudio).toHaveBeenCalledWith(Buffer.from([0xff, 0xff]));
});
// Drive the provider side: final user transcript, assistant audio,
// final assistant text, then the response completion event.
callbacks?.onTranscript?.("user", "hello", true);
callbacks?.onAudio?.(Buffer.from([1, 2, 3]));
callbacks?.onTranscript?.("assistant", "hi there", true);
callbacks?.onEvent?.({ direction: "server", type: "response.done" });
const recent = call.metadata?.recentTalkEvents as
| Array<{
brain: string;
provider: string;
sessionId: string;
transport: string;
type: string;
}>
| undefined;
// Exact lifecycle ordering expected for a single caller/assistant turn.
expect(recent?.map((event) => event.type)).toEqual([
"session.started",
"session.ready",
"turn.started",
"input.audio.delta",
"transcript.done",
"input.audio.committed",
"output.audio.started",
"output.audio.delta",
"output.text.done",
"output.audio.done",
"turn.ended",
]);
// Session-level envelope fields should be stamped on every entry.
expect(recent?.[0]).toMatchObject({
provider: "openai",
sessionId: "voice-call:call-1:realtime",
transport: "gateway-relay",
});
expect(call.metadata?.lastTalkEventType).toBe("turn.ended");
} finally {
if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
ws.close();
}
}
} finally {
await server.close();
}
});
// Verifies barge-in handling: once assistant audio is playing, two loud
// inbound media chunks (the configured threshold) cancel the current turn via
// a turn.cancelled event, and subsequent caller audio is attributed to a NEW
// turn id — all before/without relying on the provider's own speech_started.
it("emits barge-in cancellation with a turn before provider speech_started", async () => {
// Only onAudio is needed here, to start assistant output for interruption.
let callbacks:
| {
onAudio?: (audio: Buffer) => void;
}
| undefined;
const sendAudio = vi.fn();
const call: CallRecord = {
callId: "call-1",
providerCallId: "CA-barge-in",
provider: "twilio",
direction: "inbound",
state: "ringing",
from: "+15550001234",
to: "+15550009999",
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {},
};
const createBridge = vi.fn(
(request: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0]) => {
callbacks = request;
return makeBridge({ sendAudio });
},
);
const handler = makeHandler(undefined, {
manager: {
getCallByProviderCallId: vi.fn((): CallRecord => call),
},
realtimeProvider: makeRealtimeProvider(createBridge),
});
const server = await startRealtimeServer(handler);
try {
const ws = await connectWs(server.url);
try {
ws.send(
JSON.stringify({
event: "start",
start: { streamSid: "MZ-barge-in", callSid: "CA-barge-in" },
}),
);
await vi.waitFor(() => {
expect(createBridge).toHaveBeenCalled();
});
// Assistant starts speaking, making a barge-in possible.
callbacks?.onAudio?.(Buffer.from([1, 2, 3]));
// Two loud mu-law chunks (0x00 encodes maximum amplitude) — matches the
// required consecutive loud-chunk count for the speech-start detector.
const speechPayload = Buffer.alloc(160, 0x00).toString("base64");
ws.send(JSON.stringify({ event: "media", media: { payload: speechPayload } }));
ws.send(JSON.stringify({ event: "media", media: { payload: speechPayload } }));
await vi.waitFor(() => {
expect(sendAudio).toHaveBeenCalledTimes(2);
});
const recent = call.metadata?.recentTalkEvents as
| Array<{
turnId?: string;
type: string;
}>
| undefined;
// A turn.cancelled event must exist and carry a generated turn id.
const cancelled = recent?.find((event) => event.type === "turn.cancelled");
expect(cancelled).toMatchObject({
turnId: expect.stringMatching(/^turn-\d+$/),
});
// Audio after the barge-in belongs to a fresh turn, not the cancelled one.
expect(recent?.findLast((event) => event.type === "input.audio.delta")?.turnId).not.toBe(
cancelled?.turnId,
);
} finally {
if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
ws.close();
}
}
} finally {
await server.close();
}
});
it("submits continuing responses only for realtime agent consult calls", async () => {
let callbacks:
| {

View File

@@ -4,11 +4,15 @@ import type { Duplex } from "node:stream";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
buildRealtimeVoiceAgentConsultWorkingResponse,
createTalkSessionController,
createRealtimeVoiceBridgeSession,
REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
type RealtimeVoiceBridgeSession,
type RealtimeVoiceProviderConfig,
type RealtimeVoiceProviderPlugin,
type TalkEvent,
type TalkEventInput,
type TalkSessionController,
} from "openclaw/plugin-sdk/realtime-voice";
import WebSocket, { WebSocketServer } from "ws";
import type { VoiceCallRealtimeConfig } from "../config.js";
@@ -41,6 +45,7 @@ const CONSULT_TRANSCRIPT_SETTLE_MS = 350;
const CONSULT_TRANSCRIPT_SETTLE_MAX_MS = 1_000;
const MAX_PARTIAL_USER_TRANSCRIPT_CHARS = 1_200;
const RECENT_FINAL_USER_TRANSCRIPT_TTL_MS = 2_000;
const BARGE_IN_REQUIRED_LOUD_CHUNKS = 2;
function normalizePath(pathname: string): string {
const trimmed = pathname.trim();
@@ -243,6 +248,36 @@ type NativeConsultState = {
type TelephonyCloseReason = "completed" | "error";
/**
 * Appends a Talk event summary to a call's metadata, if a call is present.
 *
 * Mutates the existing metadata object in place (and re-assigns it to
 * `call.metadata`): refreshes `lastTalkEventAt`/`lastTalkEventType` and adds
 * a full event snapshot to `recentTalkEvents`, capped at the 12 most recent.
 * `turnId` and `final` are included only when present on the event.
 */
function appendRecentTalkEventMetadata(
  call: CallRecord | null | undefined,
  event: TalkEvent,
): void {
  if (!call) {
    return;
  }
  const metadata = call.metadata ?? {};
  const existing = Array.isArray(metadata.recentTalkEvents) ? metadata.recentTalkEvents : [];
  // Snapshot of the event envelope; optional fields added conditionally so
  // absent turnId/final leave no key behind.
  const entry: Record<string, unknown> = {
    id: event.id,
    brain: event.brain,
    mode: event.mode,
    provider: event.provider,
    seq: event.seq,
    sessionId: event.sessionId,
    timestamp: event.timestamp,
    transport: event.transport,
    type: event.type,
  };
  if (event.turnId) {
    entry.turnId = event.turnId;
  }
  if (event.final !== undefined) {
    entry.final = event.final;
  }
  metadata.lastTalkEventAt = event.timestamp;
  metadata.lastTalkEventType = event.type;
  // Negative-start slice keeps only the trailing window of 12 entries.
  metadata.recentTalkEvents = [...existing, entry].slice(-12);
  call.metadata = metadata;
}
export class RealtimeCallHandler {
private readonly toolHandlers = new Map<string, ToolHandlerFn>();
private readonly pendingStreamTokens = new Map<string, PendingStreamToken>();
@@ -471,6 +506,49 @@ export class RealtimeCallHandler {
}
const { callId, initialGreetingInstructions } = registration;
const callRecord = this.manager.getCallByProviderCallId(callSid);
const talk: TalkSessionController = createTalkSessionController({
sessionId: `voice-call:${callId}:realtime`,
mode: "realtime",
transport: "gateway-relay",
brain: "agent-consult",
provider: this.realtimeProvider.id,
});
const rememberTalkEvent = (event: TalkEvent | undefined): TalkEvent | undefined => {
if (event) {
appendRecentTalkEventMetadata(callRecord, event);
}
return event;
};
const emitTalkEvent = (input: TalkEventInput): TalkEvent => {
return rememberTalkEvent(talk.emit(input)) as TalkEvent;
};
const ensureTalkTurn = (): string => {
const turn = talk.ensureTurn({
payload: { callId, providerCallId: callSid },
});
rememberTalkEvent(turn.event);
return turn.turnId;
};
const endTalkTurn = (reason = "completed"): void => {
const ended = talk.endTurn({
payload: { callId, providerCallId: callSid, reason },
});
if (ended.ok) {
rememberTalkEvent(ended.event);
}
};
const finishOutputAudio = (reason: string): void => {
rememberTalkEvent(
talk.finishOutputAudio({
payload: { callId, providerCallId: callSid, reason },
}),
);
};
emitTalkEvent({
type: "session.started",
payload: { callId, providerCallId: callSid, streamSid },
});
console.log(
`[voice-call] Realtime bridge starting for call ${callId} (providerCallId=${callSid}, initialGreeting=${initialGreetingInstructions ? "queued" : "absent"})`,
);
@@ -516,7 +594,9 @@ export class RealtimeCallHandler {
}
},
});
const speechDetector = new RealtimeMulawSpeechStartDetector();
const speechDetector = new RealtimeMulawSpeechStartDetector({
requiredLoudChunks: BARGE_IN_REQUIRED_LOUD_CHUNKS,
});
const session = createRealtimeVoiceBridgeSession({
provider: this.realtimeProvider,
providerConfig: this.providerConfig,
@@ -527,6 +607,18 @@ export class RealtimeCallHandler {
audioSink: {
isOpen: () => ws.readyState === WebSocket.OPEN,
sendAudio: (muLaw) => {
const turnId = ensureTalkTurn();
rememberTalkEvent(
talk.startOutputAudio({
turnId,
payload: { callId, providerCallId: callSid },
}).event,
);
emitTalkEvent({
type: "output.audio.delta",
turnId,
payload: { byteLength: muLaw.length },
});
audioPacer.sendAudio(muLaw);
},
clearAudio: () => {
@@ -534,12 +626,37 @@ export class RealtimeCallHandler {
console.log(
`[voice-call] realtime outbound audio clear requested callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`,
);
finishOutputAudio("clear");
},
sendMark: (markName) => {
audioPacer.sendMark(markName);
},
},
onTranscript: (role, text, isFinal) => {
const turnId = ensureTalkTurn();
const eventType =
role === "assistant"
? isFinal
? "output.text.done"
: "output.text.delta"
: isFinal
? "transcript.done"
: "transcript.delta";
const payload = role === "assistant" ? { text } : { role, text };
emitTalkEvent({
type: eventType,
turnId,
payload,
final: isFinal,
});
if (role === "user" && isFinal) {
emitTalkEvent({
type: "input.audio.committed",
turnId,
payload: { callId, providerCallId: callSid },
final: true,
});
}
if (!isFinal) {
if (role === "user" && text.trim()) {
const transcript = this.recordPartialUserTranscript(callId, text);
@@ -590,6 +707,14 @@ export class RealtimeCallHandler {
});
},
onToolCall: (toolEvent, session) => {
const turnId = ensureTalkTurn();
emitTalkEvent({
type: "tool.call",
turnId,
itemId: toolEvent.itemId,
callId: toolEvent.callId,
payload: { name: toolEvent.name, args: toolEvent.args },
});
console.log(
`[voice-call] realtime tool call received callId=${callId} providerCallId=${callSid} tool=${toolEvent.name}`,
);
@@ -599,10 +724,54 @@ export class RealtimeCallHandler {
toolEvent.callId || toolEvent.itemId,
toolEvent.name,
toolEvent.args,
turnId,
emitTalkEvent,
);
},
onEvent: (event) => {
if (event.type === "input_audio_buffer.speech_started") {
ensureTalkTurn();
return;
}
if (event.type === "input_audio_buffer.speech_stopped") {
const turnId = talk.activeTurnId;
if (!turnId) {
return;
}
emitTalkEvent({
type: "input.audio.committed",
turnId,
payload: { callId, providerCallId: callSid, source: event.type },
final: true,
});
return;
}
if (event.type === "response.done") {
finishOutputAudio("response.done");
endTalkTurn("response.done");
return;
}
if (event.type === "error") {
emitTalkEvent({
type: "session.error",
payload: { message: event.detail ?? "Realtime provider error" },
final: true,
});
}
},
onReady: () => {
emitTalkEvent({
type: "session.ready",
payload: { callId, providerCallId: callSid },
});
},
onError: (error) => {
console.error("[voice-call] realtime voice error:", error.message);
emitTalkEvent({
type: "session.error",
payload: { message: error.message },
final: true,
});
},
onClose: (reason) => {
this.activeBridgesByCallId.delete(callId);
@@ -610,6 +779,12 @@ export class RealtimeCallHandler {
this.activeTelephonyClosersByCallId.delete(callId);
this.activeTelephonyClosersByCallId.delete(callSid);
this.clearUserTranscriptState(callId);
finishOutputAudio(reason);
emitTalkEvent({
type: "session.closed",
payload: { reason },
final: true,
});
if (reason !== "error") {
return;
}
@@ -639,11 +814,25 @@ export class RealtimeCallHandler {
const sendAudioToSession = session.sendAudio.bind(session);
session.sendAudio = (audio) => {
if (speechDetector.accept(audio)) {
const interruptedTurnId = ensureTalkTurn();
const clearedBytes = audioPacer.clearAudio();
console.log(
`[voice-call] realtime outbound audio cleared by barge-in callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`,
);
finishOutputAudio("barge-in");
const cancelled = talk.cancelTurn({
turnId: interruptedTurnId,
payload: { callId, providerCallId: callSid, reason: "barge-in" },
});
if (cancelled.ok) {
rememberTalkEvent(cancelled.event);
}
}
emitTalkEvent({
type: "input.audio.delta",
turnId: ensureTalkTurn(),
payload: { byteLength: audio.length },
});
sendAudioToSession(audio);
};
const closeSession = session.close.bind(session);
@@ -961,9 +1150,49 @@ export class RealtimeCallHandler {
bridgeCallId: string,
name: string,
args: unknown,
turnId: string,
emitTalkEvent?: (input: TalkEventInput) => TalkEvent,
): Promise<void> {
const handler = this.toolHandlers.get(name);
const startedAt = Date.now();
const hasResultError = (result: unknown): boolean => {
return Boolean(
result && typeof result === "object" && !Array.isArray(result) && "error" in result,
);
};
const emitFinalToolEvent = (result: unknown): void => {
emitTalkEvent?.({
type: hasResultError(result) ? "tool.error" : "tool.result",
turnId,
callId: bridgeCallId,
payload: { name, result },
final: true,
});
};
const submitFinalToolResult = (result: unknown): void => {
bridge.submitToolResult(bridgeCallId, result);
emitFinalToolEvent(result);
};
const submitWorkingResponse = () => {
if (
handler &&
name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME &&
bridge.bridge.supportsToolResultContinuation &&
!this.config.fastContext.enabled
) {
emitTalkEvent?.({
type: "tool.progress",
turnId,
callId: bridgeCallId,
payload: { name, status: "working" },
});
bridge.submitToolResult(
bridgeCallId,
buildRealtimeVoiceAgentConsultWorkingResponse("caller"),
{ willContinue: true },
);
}
};
if (name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) {
this.lastProviderConsultAtByCallId.set(callId, Date.now());
const timer = this.forcedConsultTimersByCallId.get(callId);
@@ -974,7 +1203,7 @@ export class RealtimeCallHandler {
const forcedConsult = this.forcedConsultsByCallId.get(callId);
if (forcedConsult) {
if (forcedConsult.completedAt) {
bridge.submitToolResult(bridgeCallId, {
submitFinalToolResult({
status: "already_delivered",
message: "OpenClaw already delivered this consult result internally. Do not repeat it.",
});
@@ -984,31 +1213,17 @@ export class RealtimeCallHandler {
const result = await forcedConsult.promise.catch((error: unknown) => ({
error: formatErrorMessage(error),
}));
bridge.submitToolResult(bridgeCallId, result);
submitFinalToolResult(result);
return;
}
const submitWorkingResponse = () => {
if (
handler &&
bridge.bridge.supportsToolResultContinuation &&
!this.config.fastContext.enabled
) {
bridge.submitToolResult(
bridgeCallId,
buildRealtimeVoiceAgentConsultWorkingResponse("caller"),
{ willContinue: true },
);
}
};
const existingNativeConsult = this.nativeConsultsInFlightByCallId.get(callId);
if (existingNativeConsult) {
console.log(
`[voice-call] realtime tool call sharing in-flight agent consult callId=${callId} ageMs=${Date.now() - existingNativeConsult.startedAt}`,
);
submitWorkingResponse();
bridge.submitToolResult(bridgeCallId, await existingNativeConsult.promise);
submitFinalToolResult(await existingNativeConsult.promise);
return;
}
@@ -1047,7 +1262,7 @@ export class RealtimeCallHandler {
console.log(
`[voice-call] realtime tool call completed callId=${callId} tool=${name} status=${status} elapsedMs=${Date.now() - startedAt}${error ? ` error=${error}` : ""}`,
);
bridge.submitToolResult(bridgeCallId, result);
submitFinalToolResult(result);
if (status === "ok") {
this.consumePartialUserTranscript(callId, state.partialUserTranscript);
}
@@ -1084,7 +1299,7 @@ export class RealtimeCallHandler {
console.log(
`[voice-call] realtime tool call completed callId=${callId} tool=${name} status=${status} elapsedMs=${Date.now() - startedAt}${error ? ` error=${error}` : ""}`,
);
bridge.submitToolResult(bridgeCallId, result);
submitFinalToolResult(result);
if (name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME && status === "ok") {
this.consumePartialUserTranscript(callId, context.partialUserTranscript);
}