diff --git a/src/plugin-sdk/realtime-voice.ts b/src/plugin-sdk/realtime-voice.ts index d88b6d34b7b..a13a04b5ec9 100644 --- a/src/plugin-sdk/realtime-voice.ts +++ b/src/plugin-sdk/realtime-voice.ts @@ -8,6 +8,7 @@ export type { RealtimeVoiceBrowserSession, RealtimeVoiceBrowserSessionCreateRequest, RealtimeVoiceBridgeCreateRequest, + RealtimeVoiceProviderCapabilities, RealtimeVoiceCloseReason, RealtimeVoiceProviderConfig, RealtimeVoiceProviderConfiguredContext, @@ -22,6 +23,29 @@ export { REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ, REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, } from "../realtime-voice/provider-types.js"; +export { + createTalkEventSequencer, + TALK_EVENT_TYPES, + type TalkBrain, + type TalkEvent, + type TalkEventContext, + type TalkEventInput, + type TalkEventSequencer, + type TalkEventType, + type TalkMode, + type TalkTransport, +} from "../realtime-voice/talk-events.js"; +export { + createTalkSessionController, + normalizeTalkTransport, + type TalkEnsureTurnResult, + type TalkSessionController, + type TalkSessionControllerParams, + type TalkTurnFailure, + type TalkTurnFailureReason, + type TalkTurnResult, + type TalkTurnSuccess, +} from "../realtime-voice/talk-session-controller.js"; export { buildRealtimeVoiceAgentConsultChatMessage, buildRealtimeVoiceAgentConsultPrompt, @@ -44,6 +68,18 @@ export { type RealtimeVoiceAgentConsultResult, type RealtimeVoiceAgentConsultRuntime, } from "../realtime-voice/agent-consult-runtime.js"; +export { + createRealtimeVoiceAgentTalkbackQueue, + type RealtimeVoiceAgentTalkbackQueue, + type RealtimeVoiceAgentTalkbackQueueParams, + type RealtimeVoiceAgentTalkbackResult, +} from "../realtime-voice/agent-talkback-runtime.js"; +export { + resolveRealtimeVoiceFastContextConsult, + type RealtimeVoiceFastContextConfig, + type RealtimeVoiceFastContextConsultResult, + type RealtimeVoiceFastContextLabels, +} from "../realtime-voice/fast-context-runtime.js"; export { canonicalizeRealtimeVoiceProviderId, getRealtimeVoiceProvider, @@ -62,6 +98,18 @@ export { type RealtimeVoiceBridgeSessionParams, type RealtimeVoiceMarkStrategy, } from "../realtime-voice/session-runtime.js"; +export { + extendRealtimeVoiceOutputEchoSuppression, + getRealtimeVoiceBridgeEventHealth, + getRealtimeVoiceTranscriptHealth, + isLikelyRealtimeVoiceAssistantEchoTranscript, + recordRealtimeVoiceBridgeEvent, + recordRealtimeVoiceTranscript, + type RealtimeVoiceBridgeEventHealth, + type RealtimeVoiceBridgeEventLogEntry, + type RealtimeVoiceTranscriptEntry, + type RealtimeVoiceTranscriptHealth, +} from "../realtime-voice/session-log-runtime.js"; export { convertPcmToMulaw8k, mulawToPcm, diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 3718330c388..612a442bb1d 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -47,6 +47,7 @@ import type { RealtimeVoiceBrowserSession, RealtimeVoiceBrowserSessionCreateRequest, RealtimeVoiceBridgeCreateRequest, + RealtimeVoiceProviderCapabilities, RealtimeVoiceProviderConfig, RealtimeVoiceProviderConfiguredContext, RealtimeVoiceProviderId, @@ -1839,6 +1840,7 @@ export type RealtimeVoiceProviderPlugin = { aliases?: string[]; defaultModel?: string; autoSelectOrder?: number; + capabilities?: RealtimeVoiceProviderCapabilities; resolveConfig?: (ctx: RealtimeVoiceProviderResolveConfigContext) => RealtimeVoiceProviderConfig; isConfigured: (ctx: RealtimeVoiceProviderConfiguredContext) => boolean; createBridge: (req: RealtimeVoiceBridgeCreateRequest) => RealtimeVoiceBridge; diff --git a/src/realtime-voice/agent-talkback-runtime.test.ts b/src/realtime-voice/agent-talkback-runtime.test.ts new file mode 100644 index 00000000000..cea54ed7e66 --- /dev/null +++ b/src/realtime-voice/agent-talkback-runtime.test.ts @@ -0,0 +1,173 @@ +import { describe, expect, it, vi } from "vitest"; +import { createRealtimeVoiceAgentTalkbackQueue } from "./agent-talkback-runtime.js"; + +function makeLogger() { + return { + info: vi.fn(), + warn: vi.fn(), + }; +} + +describe("realtime voice agent talkback queue", () => { + it("debounces transcript fragments into one consult", async () => { + vi.useFakeTimers(); + const logger = makeLogger(); + const consult = vi.fn(async ({ question }) => ({ text: `answer:${question}` })); + const deliver = vi.fn(); + const queue = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: 100, + isStopped: () => false, + logger, + logPrefix: "[test]", + responseStyle: "brief", + fallbackText: "fallback", + consult, + deliver, + }); + + queue.enqueue("first"); + queue.enqueue("second"); + await vi.advanceTimersByTimeAsync(100); + + expect(consult).toHaveBeenCalledWith({ + question: "first\nsecond", + responseStyle: "brief", + signal: expect.any(AbortSignal), + }); + expect(deliver).toHaveBeenCalledWith("answer:first\nsecond"); + vi.useRealTimers(); + }); + + it("accumulates pending questions while a consult is active", async () => { + vi.useFakeTimers(); + const logger = makeLogger(); + let finishFirst: ((value: { text: string }) => void) | undefined; + const consult = vi + .fn() + .mockImplementationOnce( + () => + new Promise<{ text: string }>((resolve) => { + finishFirst = resolve; + }), + ) + .mockResolvedValueOnce({ text: "second-answer" }); + const deliver = vi.fn(); + const queue = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: 10, + isStopped: () => false, + logger, + logPrefix: "[test]", + responseStyle: "brief", + fallbackText: "fallback", + consult, + deliver, + }); + + queue.enqueue("first"); + await vi.advanceTimersByTimeAsync(10); + queue.enqueue("ignored"); + queue.enqueue("second"); + await vi.advanceTimersByTimeAsync(10); + finishFirst?.({ text: "first-answer" }); + await vi.runAllTimersAsync(); + + expect(consult).toHaveBeenNthCalledWith(1, { + question: "first", + responseStyle: "brief", + signal: expect.any(AbortSignal), + }); + expect(consult).toHaveBeenNthCalledWith(2, { + question: "ignored\nsecond", + responseStyle: "brief", + signal: expect.any(AbortSignal), + }); + expect(deliver).toHaveBeenCalledWith("first-answer"); + expect(deliver).toHaveBeenCalledWith("second-answer"); + vi.useRealTimers(); + }); + + it("delivers fallback text when consult fails", async () => { + vi.useFakeTimers(); + const logger = makeLogger(); + const deliver = vi.fn(); + const queue = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: 1, + isStopped: () => false, + logger, + logPrefix: "[test]", + responseStyle: "brief", + fallbackText: "fallback", + consult: vi.fn(async () => { + throw new Error("boom"); + }), + deliver, + }); + + queue.enqueue("question"); + await vi.advanceTimersByTimeAsync(1); + + expect(logger.warn).toHaveBeenCalledWith("[test] consult failed: boom"); + expect(deliver).toHaveBeenCalledWith("fallback"); + vi.useRealTimers(); + }); + + it("cancels pending debounced work on close", async () => { + vi.useFakeTimers(); + const consult = vi.fn(async () => ({ text: "answer" })); + const queue = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: 100, + isStopped: () => false, + logger: makeLogger(), + logPrefix: "[test]", + responseStyle: "brief", + fallbackText: "fallback", + consult, + deliver: vi.fn(), + }); + + queue.enqueue("question"); + queue.close(); + await vi.advanceTimersByTimeAsync(100); + + expect(consult).not.toHaveBeenCalled(); + vi.useRealTimers(); + }); + + it("aborts the active consult on close without delivering fallback", async () => { + vi.useFakeTimers(); + const logger = makeLogger(); + let signal: AbortSignal | undefined; + const consult = vi.fn( + ({ signal: nextSignal }) => + new Promise<{ text: string }>((_resolve, reject) => { + signal = nextSignal; + nextSignal.addEventListener("abort", () => { + const error = new Error("aborted"); + error.name = "AbortError"; + reject(error); + }); + }), + ); + const deliver = vi.fn(); + const queue = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: 1, + isStopped: () => false, + logger, + logPrefix: "[test]", + responseStyle: "brief", + fallbackText: "fallback", + consult, + deliver, + }); + + queue.enqueue("question"); + await vi.advanceTimersByTimeAsync(1); + queue.close(); + await vi.runAllTimersAsync(); + + expect(signal?.aborted).toBe(true); + expect(deliver).not.toHaveBeenCalled(); + expect(logger.warn).not.toHaveBeenCalled(); + vi.useRealTimers(); + }); +}); diff --git a/src/realtime-voice/agent-talkback-runtime.ts b/src/realtime-voice/agent-talkback-runtime.ts new file mode 100644 index 00000000000..638c5fbb3ae --- /dev/null +++ b/src/realtime-voice/agent-talkback-runtime.ts @@ -0,0 +1,131 @@ +import type { RuntimeLogger } from "../plugins/runtime/types-core.js"; + +export type RealtimeVoiceAgentTalkbackResult = { + text: string; +}; + +export type RealtimeVoiceAgentTalkbackQueue = { + close(): void; + enqueue(question: string): void; +}; + +export type RealtimeVoiceAgentTalkbackQueueParams = { + debounceMs: number; + isStopped: () => boolean; + logger: Pick; + logPrefix: string; + responseStyle: string; + fallbackText: string; + consult: (args: { + question: string; + responseStyle: string; + signal: AbortSignal; + }) => Promise; + deliver: (text: string) => void; +}; + +export function createRealtimeVoiceAgentTalkbackQueue( + params: RealtimeVoiceAgentTalkbackQueueParams, +): RealtimeVoiceAgentTalkbackQueue { + let active = false; + let pendingQuestion: string | undefined; + let debounceTimer: ReturnType | undefined; + let activeAbortController: AbortController | undefined; + + const clearDebounceTimer = () => { + if (!debounceTimer) { + return; + } + clearTimeout(debounceTimer); + debounceTimer = undefined; + }; + + const run = async (question: string): Promise => { + const trimmed = question.trim(); + if (!trimmed || params.isStopped()) { + return; + } + if (active) { + pendingQuestion = appendPendingQuestion(pendingQuestion, trimmed); + return; + } + + active = true; + let nextQuestion: string | undefined = trimmed; + try { + while (nextQuestion) { + if (params.isStopped()) { + return; + } + const currentQuestion = nextQuestion; + pendingQuestion = undefined; + params.logger.info(`${params.logPrefix} consult: ${currentQuestion}`); + activeAbortController = new AbortController(); + const result = await params.consult({ + question: currentQuestion, + responseStyle: params.responseStyle, + signal: activeAbortController.signal, + }); + activeAbortController = undefined; + const text = result.text.trim(); + if (!params.isStopped() && text) { + params.deliver(text); + } + nextQuestion = pendingQuestion; + } + } catch (error) { + activeAbortController = undefined; + if (params.isStopped() || isAbortError(error)) { + return; + } + const message = error instanceof Error ? error.message : String(error); + params.logger.warn(`${params.logPrefix} consult failed: ${message}`); + params.deliver(params.fallbackText); + } finally { + active = false; + const queuedQuestion = pendingQuestion; + pendingQuestion = undefined; + if (queuedQuestion && !params.isStopped()) { + void run(queuedQuestion); + } + } + }; + + return { + close: () => { + clearDebounceTimer(); + pendingQuestion = undefined; + activeAbortController?.abort(); + }, + enqueue: (question) => { + const trimmed = question.trim(); + if (!trimmed || params.isStopped()) { + return; + } + if (active) { + pendingQuestion = appendPendingQuestion(pendingQuestion, trimmed); + clearDebounceTimer(); + return; + } + pendingQuestion = appendPendingQuestion(pendingQuestion, trimmed); + clearDebounceTimer(); + debounceTimer = setTimeout(() => { + debounceTimer = undefined; + const queuedQuestion = pendingQuestion; + pendingQuestion = undefined; + if (queuedQuestion && !params.isStopped()) { + void run(queuedQuestion); + } + }, params.debounceMs); + debounceTimer.unref?.(); + }, + }; +} + +function appendPendingQuestion(current: string | undefined, next: string): string { + return current ? `${current}\n${next}` : next; +} + +function isAbortError(error: unknown): boolean { + return error instanceof Error && error.name === "AbortError"; +} diff --git a/src/realtime-voice/fast-context-runtime.ts b/src/realtime-voice/fast-context-runtime.ts new file mode 100644 index 00000000000..ca34b861103 --- /dev/null +++ b/src/realtime-voice/fast-context-runtime.ts @@ -0,0 +1,189 @@ +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { formatErrorMessage } from "../infra/errors.js"; +import { getActiveMemorySearchManager } from "../plugins/memory-runtime.js"; +import type { RealtimeVoiceAgentConsultResult } from "./agent-consult-runtime.js"; +import { parseRealtimeVoiceAgentConsultArgs } from "./agent-consult-tool.js"; + +type Logger = { + debug?: (message: string) => void; +}; + +type MemorySearchHit = { + path: string; + startLine: number; + endLine: number; + snippet: string; + source: "memory" | "sessions"; + score: number; +}; + +export type RealtimeVoiceFastContextConfig = { + enabled: boolean; + maxResults: number; + sources: Array<"memory" | "sessions">; + timeoutMs: number; + fallbackToConsult: boolean; +}; + +export type RealtimeVoiceFastContextLabels = { + audienceLabel: string; + contextName: string; +}; + +type FastContextLookupResult = + | { status: "unavailable"; error?: string } + | { status: "hits"; hits: MemorySearchHit[] }; + +export type RealtimeVoiceFastContextConsultResult = + | { handled: false } + | { handled: true; result: RealtimeVoiceAgentConsultResult }; + +const MAX_SNIPPET_CHARS = 700; + +class RealtimeFastContextTimeoutError extends Error { + constructor(timeoutMs: number) { + super(`fast context lookup timed out after ${timeoutMs}ms`); + this.name = "RealtimeFastContextTimeoutError"; + } +} + +function normalizeSnippet(text: string): string { + const normalized = text.replace(/\s+/g, " ").trim(); + if (normalized.length <= MAX_SNIPPET_CHARS) { + return normalized; + } + return `${normalized.slice(0, MAX_SNIPPET_CHARS - 1).trimEnd()}...`; +} + +function buildSearchQuery(args: unknown): string { + const parsed = parseRealtimeVoiceAgentConsultArgs(args); + return [parsed.question, parsed.context].filter(Boolean).join("\n\n"); +} + +function resolveLabels( + labels?: Partial, +): RealtimeVoiceFastContextLabels { + return { + audienceLabel: labels?.audienceLabel?.trim() || "person", + contextName: labels?.contextName?.trim() || "OpenClaw memory context", + }; +} + +function buildContextText(params: { + query: string; + hits: MemorySearchHit[]; + labels: RealtimeVoiceFastContextLabels; +}): string { + const hits = params.hits + .map((hit, index) => { + const location = `${hit.path}:${hit.startLine}-${hit.endLine}`; + return `${index + 1}. [${hit.source}] ${location}\n${normalizeSnippet(hit.snippet)}`; + }) + .join("\n\n"); + return [ + `Fast ${params.labels.contextName} found for the live ${params.labels.audienceLabel}.`, + `Use this context only if it answers the ${params.labels.audienceLabel}'s question. If it is not relevant, say briefly that you do not have that context handy.`, + `Question:\n${params.query}`, + `Context:\n${hits}`, + ].join("\n\n"); +} + +function buildMissText(query: string, labels: RealtimeVoiceFastContextLabels): string { + return [ + `No relevant ${labels.contextName} was found quickly for the live ${labels.audienceLabel}.`, + `Answer briefly that you do not have that context handy. Do not keep checking unless the ${labels.audienceLabel} asks you to.`, + `Question:\n${query}`, + ].join("\n\n"); +} + +async function withTimeout(promise: Promise, timeoutMs: number): Promise { + let timer: ReturnType | undefined; + try { + return await Promise.race([ + promise, + new Promise((_resolve, reject) => { + timer = setTimeout(() => reject(new RealtimeFastContextTimeoutError(timeoutMs)), timeoutMs); + }), + ]); + } finally { + if (timer) { + clearTimeout(timer); + } + } +} + +async function lookupFastContext(params: { + cfg: OpenClawConfig; + agentId: string; + sessionKey: string; + config: RealtimeVoiceFastContextConfig; + query: string; +}): Promise { + const memory = await getActiveMemorySearchManager({ + cfg: params.cfg, + agentId: params.agentId, + }); + if (!memory.manager) { + return { + status: "unavailable", + error: memory.error ?? "no active memory manager", + }; + } + const hits = await memory.manager.search(params.query, { + maxResults: params.config.maxResults, + sessionKey: params.sessionKey, + sources: params.config.sources, + }); + return { status: "hits", hits }; +} + +export async function resolveRealtimeVoiceFastContextConsult(params: { + cfg: OpenClawConfig; + agentId: string; + sessionKey: string; + config: RealtimeVoiceFastContextConfig; + args: unknown; + logger: Logger; + labels?: Partial; +}): Promise { + if (!params.config.enabled) { + return { handled: false }; + } + + const labels = resolveLabels(params.labels); + const query = buildSearchQuery(params.args); + try { + const lookup = await withTimeout( + lookupFastContext({ + cfg: params.cfg, + agentId: params.agentId, + sessionKey: params.sessionKey, + config: params.config, + query, + }), + params.config.timeoutMs, + ); + if (lookup.status === "unavailable") { + params.logger.debug?.(`[realtime-voice] fast context unavailable: ${lookup.error}`); + return params.config.fallbackToConsult + ? { handled: false } + : { handled: true, result: { text: buildMissText(query, labels) } }; + } + const { hits } = lookup; + if (hits.length === 0) { + return params.config.fallbackToConsult + ? { handled: false } + : { handled: true, result: { text: buildMissText(query, labels) } }; + } + return { + handled: true, + result: { text: buildContextText({ query, hits, labels }) }, + }; + } catch (error) { + const message = formatErrorMessage(error); + params.logger.debug?.(`[realtime-voice] fast context lookup failed: ${message}`); + return params.config.fallbackToConsult + ? { handled: false } + : { handled: true, result: { text: buildMissText(query, labels) } }; + } +} diff --git a/src/realtime-voice/provider-types.ts b/src/realtime-voice/provider-types.ts index d40d420e026..cc784fb175e 100644 --- a/src/realtime-voice/provider-types.ts +++ b/src/realtime-voice/provider-types.ts @@ -1,4 +1,5 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; +import type { TalkTransport } from "./talk-events.js"; export type RealtimeVoiceProviderId = string; @@ -72,6 +73,17 @@ export type RealtimeVoiceBridgeCallbacks = { export type RealtimeVoiceProviderConfig = Record; +export type RealtimeVoiceProviderCapabilities = { + transports: TalkTransport[]; + inputAudioFormats: RealtimeVoiceAudioFormat[]; + outputAudioFormats: RealtimeVoiceAudioFormat[]; + supportsBrowserSession?: boolean; + supportsBargeIn?: boolean; + supportsToolCalls?: boolean; + supportsVideoFrames?: boolean; + supportsSessionResumption?: boolean; +}; + export type RealtimeVoiceProviderResolveConfigContext = { cfg: OpenClawConfig; rawConfig: RealtimeVoiceProviderConfig; @@ -107,7 +119,7 @@ export type RealtimeVoiceBrowserAudioContract = { export type RealtimeVoiceBrowserWebRtcSdpSession = { provider: RealtimeVoiceProviderId; - transport?: "webrtc-sdp"; + transport: "webrtc"; clientSecret: string; offerUrl?: string; offerHeaders?: Record; @@ -118,7 +130,7 @@ export type RealtimeVoiceBrowserWebRtcSdpSession = { export type RealtimeVoiceBrowserJsonPcmWebSocketSession = { provider: RealtimeVoiceProviderId; - transport: "json-pcm-websocket"; + transport: "provider-websocket"; protocol: string; clientSecret: string; websocketUrl: string; diff --git a/src/realtime-voice/session-log-runtime.test.ts b/src/realtime-voice/session-log-runtime.test.ts new file mode 100644 index 00000000000..e97f6f68d79 --- /dev/null +++ b/src/realtime-voice/session-log-runtime.test.ts @@ -0,0 +1,80 @@ +import { describe, expect, it } from "vitest"; +import { + extendRealtimeVoiceOutputEchoSuppression, + getRealtimeVoiceBridgeEventHealth, + getRealtimeVoiceTranscriptHealth, + isLikelyRealtimeVoiceAssistantEchoTranscript, + recordRealtimeVoiceBridgeEvent, + recordRealtimeVoiceTranscript, + type RealtimeVoiceBridgeEventLogEntry, + type RealtimeVoiceTranscriptEntry, +} from "./session-log-runtime.js"; + +describe("realtime voice session log runtime", () => { + it("records bounded transcript health", () => { + const transcript: RealtimeVoiceTranscriptEntry[] = []; + recordRealtimeVoiceTranscript(transcript, "user", "hello", 1); + recordRealtimeVoiceTranscript(transcript, "assistant", "hi", 1); + + expect(getRealtimeVoiceTranscriptHealth(transcript)).toMatchObject({ + realtimeTranscriptLines: 1, + lastRealtimeTranscriptRole: "assistant", + lastRealtimeTranscriptText: "hi", + }); + }); + + it("skips noisy audio append events and records bridge health", () => { + const events: RealtimeVoiceBridgeEventLogEntry[] = []; + recordRealtimeVoiceBridgeEvent(events, { + direction: "client", + type: "input_audio_buffer.append", + }); + recordRealtimeVoiceBridgeEvent(events, { + direction: "server", + type: "response.done", + detail: "ok", + }); + + expect(getRealtimeVoiceBridgeEventHealth(events)).toMatchObject({ + lastRealtimeEventType: "server:response.done", + lastRealtimeEventDetail: "ok", + }); + }); + + it("detects likely assistant echo transcripts", () => { + const nowMs = Date.now(); + const transcript: RealtimeVoiceTranscriptEntry[] = [ + { + at: new Date(nowMs - 1000).toISOString(), + role: "assistant", + text: "The deployment finished cleanly and all checks passed", + }, + ]; + + expect( + isLikelyRealtimeVoiceAssistantEchoTranscript({ + transcript, + text: "deployment finished cleanly and all checks passed", + lookbackMs: 45_000, + nowMs, + }), + ).toBe(true); + }); + + it("extends output echo suppression from audio duration", () => { + expect( + extendRealtimeVoiceOutputEchoSuppression({ + audio: Buffer.alloc(96), + bytesPerMs: 48, + tailMs: 3000, + nowMs: 100, + lastOutputPlayableUntilMs: 0, + suppressInputUntilMs: 0, + }), + ).toEqual({ + durationMs: 2, + lastOutputPlayableUntilMs: 102, + suppressInputUntilMs: 3102, + }); + }); +}); diff --git a/src/realtime-voice/session-log-runtime.ts b/src/realtime-voice/session-log-runtime.ts new file mode 100644 index 00000000000..361eea3929f --- /dev/null +++ b/src/realtime-voice/session-log-runtime.ts @@ -0,0 +1,155 @@ +import type { RealtimeVoiceBridgeEvent, RealtimeVoiceRole } from "./provider-types.js"; + +export type RealtimeVoiceTranscriptEntry = { + at: string; + role: RealtimeVoiceRole; + text: string; +}; + +export type RealtimeVoiceTranscriptHealth = { + realtimeTranscriptLines: number; + lastRealtimeTranscriptAt?: string; + lastRealtimeTranscriptRole?: RealtimeVoiceRole; + lastRealtimeTranscriptText?: string; + recentRealtimeTranscript: RealtimeVoiceTranscriptEntry[]; +}; + +export type RealtimeVoiceBridgeEventLogEntry = RealtimeVoiceBridgeEvent & { + at: string; +}; + +export type RealtimeVoiceBridgeEventHealth = { + lastRealtimeEventAt?: string; + lastRealtimeEventType?: string; + lastRealtimeEventDetail?: string; + recentRealtimeEvents: RealtimeVoiceBridgeEventLogEntry[]; +}; + +export function recordRealtimeVoiceTranscript( + transcript: RealtimeVoiceTranscriptEntry[], + role: RealtimeVoiceRole, + text: string, + maxEntries = 40, +): RealtimeVoiceTranscriptEntry { + const entry = { at: new Date().toISOString(), role, text }; + transcript.push(entry); + if (transcript.length > maxEntries) { + transcript.splice(0, transcript.length - maxEntries); + } + return entry; +} + +export function getRealtimeVoiceTranscriptHealth( + transcript: RealtimeVoiceTranscriptEntry[], +): RealtimeVoiceTranscriptHealth { + const last = transcript.at(-1); + return { + realtimeTranscriptLines: transcript.length, + lastRealtimeTranscriptAt: last?.at, + lastRealtimeTranscriptRole: last?.role, + lastRealtimeTranscriptText: last?.text, + recentRealtimeTranscript: transcript.slice(-5), + }; +} + +export function recordRealtimeVoiceBridgeEvent( + events: RealtimeVoiceBridgeEventLogEntry[], + event: RealtimeVoiceBridgeEvent, + maxEntries = 40, +): void { + if (event.direction === "client" && event.type === "input_audio_buffer.append") { + return; + } + events.push({ at: new Date().toISOString(), ...event }); + if (events.length > maxEntries) { + events.splice(0, events.length - maxEntries); + } +} + +export function getRealtimeVoiceBridgeEventHealth( + events: RealtimeVoiceBridgeEventLogEntry[], +): RealtimeVoiceBridgeEventHealth { + const last = events.at(-1); + return { + lastRealtimeEventAt: last?.at, + lastRealtimeEventType: last ? `${last.direction}:${last.type}` : undefined, + lastRealtimeEventDetail: last?.detail, + recentRealtimeEvents: events.slice(-10), + }; +} + +function normalizeTranscriptForEchoMatch(text: string): string[] { + return text + .toLowerCase() + .replace(/['’]/g, "") + .replace(/[^a-z0-9]+/g, " ") + .trim() + .split(/\s+/) + .filter((token) => token.length > 1); +} + +function hasMeaningfulEchoOverlap(userTokens: string[], assistantTokens: string[]): boolean { + if (userTokens.length < 4 || assistantTokens.length < 4) { + return false; + } + const uniqueUserTokens = [...new Set(userTokens)]; + if (uniqueUserTokens.length < 4) { + return false; + } + const assistantTokenSet = new Set(assistantTokens); + const overlap = uniqueUserTokens.filter((token) => assistantTokenSet.has(token)).length; + return overlap / uniqueUserTokens.length >= 0.58; +} + +export function isLikelyRealtimeVoiceAssistantEchoTranscript(params: { + transcript: RealtimeVoiceTranscriptEntry[]; + text: string; + lookbackMs: number; + nowMs?: number; +}): boolean { + const userTokens = normalizeTranscriptForEchoMatch(params.text); + if (userTokens.length < 4) { + return false; + } + const nowMs = params.nowMs ?? Date.now(); + const recentAssistantText = params.transcript + .filter((entry) => { + if (entry.role !== "assistant") { + return false; + } + const at = Date.parse(entry.at); + return Number.isFinite(at) && nowMs - at <= params.lookbackMs; + }) + .slice(-6) + .map((entry) => entry.text) + .join(" "); + if (!recentAssistantText.trim()) { + return false; + } + const userNormalized = userTokens.join(" "); + const assistantTokens = normalizeTranscriptForEchoMatch(recentAssistantText); + const assistantNormalized = assistantTokens.join(" "); + return ( + (userNormalized.length >= 18 && assistantNormalized.includes(userNormalized)) || + (assistantNormalized.length >= 18 && userNormalized.includes(assistantNormalized)) || + hasMeaningfulEchoOverlap(userTokens, assistantTokens) + ); +} + +export function extendRealtimeVoiceOutputEchoSuppression(params: { + audio: Buffer; + bytesPerMs: number; + tailMs: number; + nowMs: number; + lastOutputPlayableUntilMs: number; + suppressInputUntilMs: number; +}): { lastOutputPlayableUntilMs: number; suppressInputUntilMs: number; durationMs: number } { + const durationMs = Math.ceil(params.audio.byteLength / params.bytesPerMs); + const playbackStartMs = Math.max(params.nowMs, params.lastOutputPlayableUntilMs); + const playbackEndMs = playbackStartMs + durationMs; + return { + durationMs, + lastOutputPlayableUntilMs: playbackEndMs, + suppressInputUntilMs: Math.max(params.suppressInputUntilMs, playbackEndMs + params.tailMs), + }; +} diff --git a/src/realtime-voice/talk-events.test.ts b/src/realtime-voice/talk-events.test.ts new file mode 100644 index 00000000000..6da2411daf1 --- /dev/null +++ b/src/realtime-voice/talk-events.test.ts @@ -0,0 +1,90 @@ +import { describe, expect, it } from "vitest"; +import { createTalkEventSequencer } from "./talk-events.js"; + +describe("talk event envelope", () => { + it("adds stable session context and monotonically increasing sequence numbers", () => { + const events = createTalkEventSequencer( + { + sessionId: "session-1", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + }, + { now: () => "2026-05-05T12:00:00.000Z" }, + ); + + expect(events.next({ type: "session.started", payload: { ok: true } })).toEqual({ + id: "session-1:1", + sessionId: "session-1", + seq: 1, + timestamp: "2026-05-05T12:00:00.000Z", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + type: "session.started", + payload: { ok: true }, + turnId: undefined, + captureId: undefined, + final: undefined, + callId: undefined, + itemId: undefined, + parentId: undefined, + }); + expect(events.next({ type: "session.ready", payload: null }).seq).toBe(2); + }); + + it("preserves turn, capture, and provider correlation fields", () => { + const events = createTalkEventSequencer({ + sessionId: "session-voice", + mode: "stt-tts", + transport: "managed-room", + brain: "agent-consult", + }); + + expect( + events.next({ + type: "tool.call", + turnId: "turn-1", + captureId: "capture-1", + callId: "call-1", + itemId: "item-1", + parentId: "parent-1", + final: false, + timestamp: "2026-05-05T12:00:01.000Z", + payload: { name: "openclaw_agent_consult" }, + }), + ).toMatchObject({ + id: "session-voice:1", + sessionId: "session-voice", + mode: "stt-tts", + transport: "managed-room", + brain: "agent-consult", + type: "tool.call", + turnId: "turn-1", + captureId: "capture-1", + callId: "call-1", + itemId: "item-1", + parentId: "parent-1", + final: false, + payload: { name: "openclaw_agent_consult" }, + }); + }); + + it("rejects turn and capture scoped events without correlation ids", () => { + const events = createTalkEventSequencer({ + sessionId: "session-voice", + mode: "stt-tts", + transport: "managed-room", + brain: "agent-consult", + }); + + expect(() => events.next({ type: "turn.started", payload: {} })).toThrow( + "Talk event turn.started requires turnId", + ); + expect(() => events.next({ type: "capture.started", payload: {} })).toThrow( + "Talk event capture.started requires captureId", + ); + }); +}); diff --git a/src/realtime-voice/talk-events.ts b/src/realtime-voice/talk-events.ts new file mode 100644 index 00000000000..5887c70632d --- /dev/null +++ b/src/realtime-voice/talk-events.ts @@ -0,0 +1,145 @@ +export const TALK_EVENT_TYPES = [ + "session.started", + "session.ready", + "session.closed", + "session.error", + "session.replaced", + "turn.started", + "turn.ended", + "turn.cancelled", + "capture.started", + "capture.stopped", + "capture.cancelled", + "capture.once", + "input.audio.delta", + "input.audio.committed", + "transcript.delta", + "transcript.done", + "output.text.delta", + "output.text.done", + "output.audio.started", + "output.audio.delta", + "output.audio.done", + "tool.call", + "tool.progress", + "tool.result", + "tool.error", + "usage.metrics", + "latency.metrics", + "health.changed", +] as const; + +export type TalkEventType = (typeof TALK_EVENT_TYPES)[number]; + +export type TalkMode = "realtime" | "stt-tts" | "transcription"; + +export type TalkTransport = "webrtc" | "provider-websocket" | "gateway-relay" | "managed-room"; + +export type TalkBrain = "agent-consult" | "direct-tools" | "none"; + +export type TalkEventContext = { + sessionId: string; + mode: TalkMode; + transport: TalkTransport; + brain: TalkBrain; + provider?: string; +}; + +export type TalkEvent = TalkEventContext & { + id: string; + type: TalkEventType; + turnId?: string; + captureId?: string; + seq: number; + timestamp: string; + final?: boolean; + callId?: string; + itemId?: string; + parentId?: string; + payload: TPayload; +}; + +export type TalkEventInput = { + type: TalkEventType; + payload: TPayload; + turnId?: string; + captureId?: string; + timestamp?: string; + final?: boolean; + callId?: string; + itemId?: string; + parentId?: string; +}; + +export type TalkEventSequencer = { + next(input: TalkEventInput): TalkEvent; +}; + +const TURN_SCOPED_TALK_EVENT_TYPES = new Set([ + "turn.started", + "turn.ended", + "turn.cancelled", + "input.audio.delta", + "input.audio.committed", + "transcript.delta", + "transcript.done", + "output.text.delta", + "output.text.done", + "output.audio.started", + "output.audio.delta", + "output.audio.done", + "tool.call", + "tool.progress", + "tool.result", + "tool.error", +]); + +const CAPTURE_SCOPED_TALK_EVENT_TYPES = new Set([ + "capture.started", + "capture.stopped", + "capture.cancelled", + "capture.once", +]); + +function assertTalkEventCorrelation(input: TalkEventInput): void { + if (TURN_SCOPED_TALK_EVENT_TYPES.has(input.type) && !input.turnId?.trim()) { + throw new Error(`Talk event ${input.type} requires turnId`); + } + if (CAPTURE_SCOPED_TALK_EVENT_TYPES.has(input.type) && !input.captureId?.trim()) { + throw new Error(`Talk event ${input.type} requires captureId`); + } +} + +export function createTalkEventSequencer( + context: TalkEventContext, + options: { now?: () => Date | string } = {}, +): TalkEventSequencer { + let seq = 0; + const now = options.now ?? (() => new Date()); + return { + next(input: TalkEventInput): TalkEvent { + assertTalkEventCorrelation(input); + seq += 1; + const timestamp = + input.timestamp ?? + (() => { + const value = now(); + return typeof value === "string" ? value : value.toISOString(); + })(); + return { + ...context, + id: `${context.sessionId}:${seq}`, + type: input.type, + turnId: input.turnId, + captureId: input.captureId, + seq, + timestamp, + final: input.final, + callId: input.callId, + itemId: input.itemId, + parentId: input.parentId, + payload: input.payload, + }; + }, + }; +} diff --git a/src/realtime-voice/talk-session-controller.test.ts b/src/realtime-voice/talk-session-controller.test.ts new file mode 100644 index 00000000000..34e1693f440 --- /dev/null +++ b/src/realtime-voice/talk-session-controller.test.ts @@ -0,0 +1,124 @@ +import { describe, expect, it } from "vitest"; +import { createTalkSessionController, normalizeTalkTransport } from "./talk-session-controller.js"; + +function createController() { + return createTalkSessionController( + { + sessionId: "talk-session", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "test", + maxRecentEvents: 3, + }, + { now: () => "2026-05-05T00:00:00.000Z" }, + ); +} + +describe("createTalkSessionController", () => { + it("emits common envelopes and keeps bounded recent event history", () => { + const talk = createController(); + + talk.emit({ type: "session.started", payload: {} }); + const firstTurn = talk.ensureTurn(); + talk.emit({ + type: "input.audio.delta", + turnId: firstTurn.turnId, + payload: { byteLength: 5 }, + }); + talk.emit({ + type: "transcript.done", + turnId: firstTurn.turnId, + payload: { text: "hello" }, + final: true, + }); + + expect(firstTurn.event).toMatchObject({ + id: "talk-session:2", + type: "turn.started", + sessionId: "talk-session", + turnId: "turn-1", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "test", + seq: 2, + timestamp: "2026-05-05T00:00:00.000Z", + }); + expect(talk.recentEvents.map((event) => event.type)).toEqual([ + "turn.started", + "input.audio.delta", + "transcript.done", + ]); + }); + + it("rejects stale turn completion before clearing the active turn", () => { + const talk = createController(); + talk.ensureTurn({ turnId: "turn-old" }); + expect(talk.endTurn({ turnId: "turn-other" })).toEqual({ + ok: false, + reason: "stale_turn", + }); + expect(talk.activeTurnId).toBe("turn-old"); + + const ended = talk.endTurn({ turnId: "turn-old", payload: { reason: "done" } }); + + expect(ended).toMatchObject({ + ok: true, + turnId: "turn-old", + event: { + type: "turn.ended", + turnId: "turn-old", + payload: { reason: "done" }, + final: true, + }, + }); + expect(talk.activeTurnId).toBeUndefined(); + }); + + it("tracks output audio lifecycle without duplicate started events", () => { + const talk = createController(); + + const first = talk.startOutputAudio({ payload: { callId: "call-1" } }); + const second = talk.startOutputAudio({ payload: { callId: "call-1" } }); + const done = talk.finishOutputAudio({ payload: { reason: "mark" } }); + + expect(first.event).toMatchObject({ + type: "output.audio.started", + turnId: "turn-1", + }); + expect(second).toEqual({ turnId: "turn-1" }); + expect(done).toMatchObject({ + type: "output.audio.done", + turnId: "turn-1", + payload: { reason: "mark" }, + final: true, + }); + expect(talk.outputAudioActive).toBe(false); + }); + + it("clears stale output audio state when a replacement turn starts", () => { + const talk = createController(); + + talk.startOutputAudio({ turnId: "turn-old" }); + expect(talk.outputAudioActive).toBe(true); + + const current = talk.startTurn({ turnId: "turn-current" }); + + expect(current).toMatchObject({ + turnId: "turn-current", + event: expect.objectContaining({ type: "turn.started", turnId: "turn-current" }), + }); + expect(talk.activeTurnId).toBe("turn-current"); + expect(talk.outputAudioActive).toBe(false); + }); +}); + +describe("normalizeTalkTransport", () => { + it("maps legacy public transport names to canonical names", () => { + expect(normalizeTalkTransport(undefined)).toBeUndefined(); + expect(normalizeTalkTransport("webrtc-sdp")).toBe("webrtc"); + expect(normalizeTalkTransport("json-pcm-websocket")).toBe("provider-websocket"); + expect(normalizeTalkTransport("gateway-relay")).toBe("gateway-relay"); + }); +}); diff --git a/src/realtime-voice/talk-session-controller.ts b/src/realtime-voice/talk-session-controller.ts new file mode 100644 index 00000000000..3643853704c --- /dev/null +++ b/src/realtime-voice/talk-session-controller.ts @@ -0,0 +1,206 @@ +import { + createTalkEventSequencer, + type TalkBrain, + type TalkEvent, + type TalkEventContext, + type TalkEventInput, + type TalkEventSequencer, + type TalkMode, + type TalkTransport, +} from "./talk-events.js"; + +export type TalkTurnFailureReason = "no_active_turn" | "stale_turn"; + +export type TalkTurnSuccess = { + event: TalkEvent; + ok: true; + turnId: string; +}; + +export type TalkTurnFailure = { + ok: false; + reason: TalkTurnFailureReason; +}; + +export type TalkTurnResult = TalkTurnSuccess | TalkTurnFailure; + +export type TalkEnsureTurnResult = { + event?: TalkEvent; + turnId: string; +}; + +export type TalkSessionController = { + readonly activeTurnId: string | undefined; + readonly context: TalkEventContext; + readonly outputAudioActive: boolean; + readonly recentEvents: readonly TalkEvent[]; + clearActiveTurn(): void; + emit(input: TalkEventInput): TalkEvent; + ensureTurn(params?: { payload?: unknown; turnId?: string }): TalkEnsureTurnResult; + startTurn(params?: { payload?: unknown; turnId?: string }): TalkEnsureTurnResult; + endTurn(params?: { payload?: unknown; turnId?: string }): TalkTurnResult; + cancelTurn(params?: { payload?: unknown; turnId?: string }): TalkTurnResult; + finishOutputAudio(params?: { payload?: unknown; turnId?: string }): TalkEvent | undefined; + startOutputAudio(params?: { payload?: unknown; turnId?: string }): TalkEnsureTurnResult; +}; + +export type TalkSessionControllerParams = TalkEventContext & { + maxRecentEvents?: number; + turnIdPrefix?: string; +}; + +export function createTalkSessionController( + params: TalkSessionControllerParams, + options: { now?: () => Date | string; sequencer?: TalkEventSequencer } = {}, +): TalkSessionController { + const { maxRecentEvents = 20, turnIdPrefix = "turn", ...context } = params; + const sequencer = options.sequencer ?? createTalkEventSequencer(context, { now: options.now }); + const recentEvents: TalkEvent[] = []; + let activeTurnId: string | undefined; + let outputAudioActive = false; + let turnSeq = 0; + + const remember = (event: TalkEvent): TalkEvent => { + recentEvents.push(event as TalkEvent); + if (recentEvents.length > maxRecentEvents) { + recentEvents.splice(0, recentEvents.length - maxRecentEvents); + } + return event; + }; + + const emit = (input: TalkEventInput): TalkEvent => { + return remember(sequencer.next(input)); + }; + + const resolveActiveTurn = (requestedTurnId: string | undefined): string | TalkTurnFailure => { + if (!activeTurnId) { + return { ok: false, reason: "no_active_turn" }; + } + const normalizedRequested = normalizeOptionalString(requestedTurnId); + if (normalizedRequested && normalizedRequested !== activeTurnId) { + return { ok: false, reason: "stale_turn" }; + } + return activeTurnId; + }; + + const ensureTurn = (ensureParams: { payload?: unknown; turnId?: string } = {}) => { + if (activeTurnId) { + return { turnId: activeTurnId }; + } + return startTurn(ensureParams); + }; + + const startTurn = (startParams: { payload?: unknown; turnId?: string } = {}) => { + const turnId = normalizeOptionalString(startParams.turnId) ?? `${turnIdPrefix}-${++turnSeq}`; + outputAudioActive = false; + activeTurnId = turnId; + return { + turnId, + event: emit({ + type: "turn.started", + turnId, + payload: startParams.payload ?? {}, + }), + }; + }; + + const finishTurn = ( + type: "turn.ended" | "turn.cancelled", + paramsForTurn: { payload?: unknown; turnId?: string } = {}, + ): TalkTurnResult => { + const turnId = resolveActiveTurn(paramsForTurn.turnId); + if (typeof turnId !== "string") { + return turnId; + } + outputAudioActive = false; + activeTurnId = undefined; + return { + ok: true, + turnId, + event: emit({ + type, + turnId, + payload: paramsForTurn.payload ?? {}, + final: true, + }), + }; + }; + + return { + get activeTurnId() { + return activeTurnId; + }, + context, + get outputAudioActive() { + return outputAudioActive; + }, + get recentEvents() { + return recentEvents; + }, + clearActiveTurn() { + activeTurnId = undefined; + outputAudioActive = false; + }, + emit, + ensureTurn, + startTurn, + endTurn(paramsForTurn) { + return finishTurn("turn.ended", paramsForTurn); + }, + cancelTurn(paramsForTurn) { + return finishTurn("turn.cancelled", paramsForTurn); + }, + finishOutputAudio(paramsForOutput = {}) { + if (!outputAudioActive) { + return undefined; + } + const turnId = resolveActiveTurn(paramsForOutput.turnId); + if (typeof turnId !== "string") { + return undefined; + } + outputAudioActive = false; + return emit({ + type: "output.audio.done", + turnId, + payload: paramsForOutput.payload ?? {}, + final: true, + }); + }, + startOutputAudio(paramsForOutput = {}) { + const turn = ensureTurn({ turnId: paramsForOutput.turnId, payload: {} }); + if (outputAudioActive) { + return { turnId: turn.turnId }; + } + outputAudioActive = true; + return { + turnId: turn.turnId, + event: emit({ + type: "output.audio.started", + turnId: turn.turnId, + payload: paramsForOutput.payload ?? {}, + }), + }; + }, + }; +} + +export function normalizeTalkTransport(value: string | undefined): string | undefined { + const normalized = normalizeOptionalString(value); + if (!normalized) { + return undefined; + } + if (normalized === "webrtc-sdp") { + return "webrtc"; + } + if (normalized === "json-pcm-websocket") { + return "provider-websocket"; + } + return normalized; +} + +export type { TalkBrain, TalkEvent, TalkEventContext, TalkEventInput, TalkMode, TalkTransport }; + +function normalizeOptionalString(value: string | undefined): string | undefined { + const trimmed = value?.trim(); + return trimmed ? trimmed : undefined; +}