feat: add shared talk runtime primitives

This commit is contained in:
Peter Steinberger
2026-05-05 20:59:20 +01:00
parent 24853ced11
commit c90c68c636
12 changed files with 1357 additions and 2 deletions

View File

@@ -8,6 +8,7 @@ export type {
RealtimeVoiceBrowserSession,
RealtimeVoiceBrowserSessionCreateRequest,
RealtimeVoiceBridgeCreateRequest,
RealtimeVoiceProviderCapabilities,
RealtimeVoiceCloseReason,
RealtimeVoiceProviderConfig,
RealtimeVoiceProviderConfiguredContext,
@@ -22,6 +23,29 @@ export {
REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ,
REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ,
} from "../realtime-voice/provider-types.js";
export {
createTalkEventSequencer,
TALK_EVENT_TYPES,
type TalkBrain,
type TalkEvent,
type TalkEventContext,
type TalkEventInput,
type TalkEventSequencer,
type TalkEventType,
type TalkMode,
type TalkTransport,
} from "../realtime-voice/talk-events.js";
export {
createTalkSessionController,
normalizeTalkTransport,
type TalkEnsureTurnResult,
type TalkSessionController,
type TalkSessionControllerParams,
type TalkTurnFailure,
type TalkTurnFailureReason,
type TalkTurnResult,
type TalkTurnSuccess,
} from "../realtime-voice/talk-session-controller.js";
export {
buildRealtimeVoiceAgentConsultChatMessage,
buildRealtimeVoiceAgentConsultPrompt,
@@ -44,6 +68,18 @@ export {
type RealtimeVoiceAgentConsultResult,
type RealtimeVoiceAgentConsultRuntime,
} from "../realtime-voice/agent-consult-runtime.js";
export {
createRealtimeVoiceAgentTalkbackQueue,
type RealtimeVoiceAgentTalkbackQueue,
type RealtimeVoiceAgentTalkbackQueueParams,
type RealtimeVoiceAgentTalkbackResult,
} from "../realtime-voice/agent-talkback-runtime.js";
export {
resolveRealtimeVoiceFastContextConsult,
type RealtimeVoiceFastContextConfig,
type RealtimeVoiceFastContextConsultResult,
type RealtimeVoiceFastContextLabels,
} from "../realtime-voice/fast-context-runtime.js";
export {
canonicalizeRealtimeVoiceProviderId,
getRealtimeVoiceProvider,
@@ -62,6 +98,18 @@ export {
type RealtimeVoiceBridgeSessionParams,
type RealtimeVoiceMarkStrategy,
} from "../realtime-voice/session-runtime.js";
export {
extendRealtimeVoiceOutputEchoSuppression,
getRealtimeVoiceBridgeEventHealth,
getRealtimeVoiceTranscriptHealth,
isLikelyRealtimeVoiceAssistantEchoTranscript,
recordRealtimeVoiceBridgeEvent,
recordRealtimeVoiceTranscript,
type RealtimeVoiceBridgeEventHealth,
type RealtimeVoiceBridgeEventLogEntry,
type RealtimeVoiceTranscriptEntry,
type RealtimeVoiceTranscriptHealth,
} from "../realtime-voice/session-log-runtime.js";
export {
convertPcmToMulaw8k,
mulawToPcm,

View File

@@ -47,6 +47,7 @@ import type {
RealtimeVoiceBrowserSession,
RealtimeVoiceBrowserSessionCreateRequest,
RealtimeVoiceBridgeCreateRequest,
RealtimeVoiceProviderCapabilities,
RealtimeVoiceProviderConfig,
RealtimeVoiceProviderConfiguredContext,
RealtimeVoiceProviderId,
@@ -1839,6 +1840,7 @@ export type RealtimeVoiceProviderPlugin = {
aliases?: string[];
defaultModel?: string;
autoSelectOrder?: number;
capabilities?: RealtimeVoiceProviderCapabilities;
resolveConfig?: (ctx: RealtimeVoiceProviderResolveConfigContext) => RealtimeVoiceProviderConfig;
isConfigured: (ctx: RealtimeVoiceProviderConfiguredContext) => boolean;
createBridge: (req: RealtimeVoiceBridgeCreateRequest) => RealtimeVoiceBridge;

View File

@@ -0,0 +1,173 @@
import { describe, expect, it, vi } from "vitest";
import { createRealtimeVoiceAgentTalkbackQueue } from "./agent-talkback-runtime.js";
// Builds a stub logger whose info/warn members are vi mocks the tests can assert on.
function makeLogger() {
  return {
    info: vi.fn(),
    warn: vi.fn(),
  };
}
describe("realtime voice agent talkback queue", () => {
  // Fragments enqueued inside one debounce window collapse into a single consult call.
  it("debounces transcript fragments into one consult", async () => {
    vi.useFakeTimers();
    const logger = makeLogger();
    const consult = vi.fn(async ({ question }) => ({ text: `answer:${question}` }));
    const deliver = vi.fn();
    const queue = createRealtimeVoiceAgentTalkbackQueue({
      debounceMs: 100,
      isStopped: () => false,
      logger,
      logPrefix: "[test]",
      responseStyle: "brief",
      fallbackText: "fallback",
      consult,
      deliver,
    });
    queue.enqueue("first");
    queue.enqueue("second");
    await vi.advanceTimersByTimeAsync(100);
    // Both fragments are joined with a newline before the single consult fires.
    expect(consult).toHaveBeenCalledWith({
      question: "first\nsecond",
      responseStyle: "brief",
      signal: expect.any(AbortSignal),
    });
    expect(deliver).toHaveBeenCalledWith("answer:first\nsecond");
    vi.useRealTimers();
  });
  // Questions arriving while a consult is in flight are batched into the next consult.
  it("accumulates pending questions while a consult is active", async () => {
    vi.useFakeTimers();
    const logger = makeLogger();
    let finishFirst: ((value: { text: string }) => void) | undefined;
    const consult = vi
      .fn()
      // First consult stays pending until the test resolves it manually.
      .mockImplementationOnce(
        () =>
          new Promise<{ text: string }>((resolve) => {
            finishFirst = resolve;
          }),
      )
      .mockResolvedValueOnce({ text: "second-answer" });
    const deliver = vi.fn();
    const queue = createRealtimeVoiceAgentTalkbackQueue({
      debounceMs: 10,
      isStopped: () => false,
      logger,
      logPrefix: "[test]",
      responseStyle: "brief",
      fallbackText: "fallback",
      consult,
      deliver,
    });
    queue.enqueue("first");
    await vi.advanceTimersByTimeAsync(10);
    queue.enqueue("ignored");
    queue.enqueue("second");
    await vi.advanceTimersByTimeAsync(10);
    finishFirst?.({ text: "first-answer" });
    await vi.runAllTimersAsync();
    expect(consult).toHaveBeenNthCalledWith(1, {
      question: "first",
      responseStyle: "brief",
      signal: expect.any(AbortSignal),
    });
    // Everything queued during the first consult is combined into the second one.
    expect(consult).toHaveBeenNthCalledWith(2, {
      question: "ignored\nsecond",
      responseStyle: "brief",
      signal: expect.any(AbortSignal),
    });
    expect(deliver).toHaveBeenCalledWith("first-answer");
    expect(deliver).toHaveBeenCalledWith("second-answer");
    vi.useRealTimers();
  });
  // A failed consult logs a warning and delivers the configured fallback text.
  it("delivers fallback text when consult fails", async () => {
    vi.useFakeTimers();
    const logger = makeLogger();
    const deliver = vi.fn();
    const queue = createRealtimeVoiceAgentTalkbackQueue({
      debounceMs: 1,
      isStopped: () => false,
      logger,
      logPrefix: "[test]",
      responseStyle: "brief",
      fallbackText: "fallback",
      consult: vi.fn(async () => {
        throw new Error("boom");
      }),
      deliver,
    });
    queue.enqueue("question");
    await vi.advanceTimersByTimeAsync(1);
    expect(logger.warn).toHaveBeenCalledWith("[test] consult failed: boom");
    expect(deliver).toHaveBeenCalledWith("fallback");
    vi.useRealTimers();
  });
  // Closing before the debounce fires drops the queued question entirely.
  it("cancels pending debounced work on close", async () => {
    vi.useFakeTimers();
    const consult = vi.fn(async () => ({ text: "answer" }));
    const queue = createRealtimeVoiceAgentTalkbackQueue({
      debounceMs: 100,
      isStopped: () => false,
      logger: makeLogger(),
      logPrefix: "[test]",
      responseStyle: "brief",
      fallbackText: "fallback",
      consult,
      deliver: vi.fn(),
    });
    queue.enqueue("question");
    queue.close();
    await vi.advanceTimersByTimeAsync(100);
    expect(consult).not.toHaveBeenCalled();
    vi.useRealTimers();
  });
  // Closing mid-consult aborts via the AbortSignal; no fallback or warning surfaces.
  it("aborts the active consult on close without delivering fallback", async () => {
    vi.useFakeTimers();
    const logger = makeLogger();
    let signal: AbortSignal | undefined;
    const consult = vi.fn(
      ({ signal: nextSignal }) =>
        new Promise<{ text: string }>((_resolve, reject) => {
          signal = nextSignal;
          nextSignal.addEventListener("abort", () => {
            const error = new Error("aborted");
            error.name = "AbortError";
            reject(error);
          });
        }),
    );
    const deliver = vi.fn();
    const queue = createRealtimeVoiceAgentTalkbackQueue({
      debounceMs: 1,
      isStopped: () => false,
      logger,
      logPrefix: "[test]",
      responseStyle: "brief",
      fallbackText: "fallback",
      consult,
      deliver,
    });
    queue.enqueue("question");
    await vi.advanceTimersByTimeAsync(1);
    queue.close();
    await vi.runAllTimersAsync();
    expect(signal?.aborted).toBe(true);
    expect(deliver).not.toHaveBeenCalled();
    expect(logger.warn).not.toHaveBeenCalled();
    vi.useRealTimers();
  });
});

View File

@@ -0,0 +1,131 @@
import type { RuntimeLogger } from "../plugins/runtime/types-core.js";
/** Result of a single consult round-trip: the text to speak back. */
export type RealtimeVoiceAgentTalkbackResult = {
  text: string;
};
/** Public surface of the talkback queue. */
export type RealtimeVoiceAgentTalkbackQueue = {
  // Cancels the debounce timer, drops queued text, and aborts any in-flight consult.
  close(): void;
  // Adds a transcript fragment; fragments are debounced and batched into consults.
  enqueue(question: string): void;
};
/** Construction parameters for {@link createRealtimeVoiceAgentTalkbackQueue}. */
export type RealtimeVoiceAgentTalkbackQueueParams = {
  // Quiet period (ms) after the last enqueue before a consult is started.
  debounceMs: number;
  // Polled before every consult/delivery; true halts all further work.
  isStopped: () => boolean;
  logger: Pick<RuntimeLogger, "info" | "warn">;
  // Prepended to every log line, e.g. "[talkback]".
  logPrefix: string;
  // Forwarded verbatim to each consult call.
  responseStyle: string;
  // Delivered instead of an answer when a consult fails (non-abort errors only).
  fallbackText: string;
  // Performs the actual question -> answer round-trip; must honor the AbortSignal.
  consult: (args: {
    question: string;
    responseStyle: string;
    signal: AbortSignal;
  }) => Promise<RealtimeVoiceAgentTalkbackResult>;
  // Receives the trimmed answer (or fallback) text to speak.
  deliver: (text: string) => void;
};
/**
 * Creates a debounced, single-flight queue that batches transcript fragments
 * into consult calls and speaks the answers back.
 *
 * Behavior visible in this module:
 * - Fragments enqueued within `debounceMs` of each other are joined with
 *   newlines and sent as one consult.
 * - At most one consult runs at a time; fragments arriving mid-consult are
 *   accumulated and sent as the next consult once the current one settles.
 * - A failed consult (other than an abort) logs a warning and delivers
 *   `fallbackText`; `close()` aborts silently.
 */
export function createRealtimeVoiceAgentTalkbackQueue(
  params: RealtimeVoiceAgentTalkbackQueueParams,
): RealtimeVoiceAgentTalkbackQueue {
  // True while a consult round-trip is in flight.
  let active = false;
  // Newline-joined fragments waiting for the debounce timer or the next consult.
  let pendingQuestion: string | undefined;
  let debounceTimer: ReturnType<typeof setTimeout> | undefined;
  // Controller for the in-flight consult; close() aborts through it.
  let activeAbortController: AbortController | undefined;
  const clearDebounceTimer = () => {
    if (!debounceTimer) {
      return;
    }
    clearTimeout(debounceTimer);
    debounceTimer = undefined;
  };
  // Drains questions in a loop: each iteration consults once, then picks up
  // whatever accumulated in pendingQuestion while that consult was running.
  const run = async (question: string): Promise<void> => {
    const trimmed = question.trim();
    if (!trimmed || params.isStopped()) {
      return;
    }
    if (active) {
      // Another run() owns the loop; hand the text to it via pendingQuestion.
      pendingQuestion = appendPendingQuestion(pendingQuestion, trimmed);
      return;
    }
    active = true;
    let nextQuestion: string | undefined = trimmed;
    try {
      while (nextQuestion) {
        if (params.isStopped()) {
          return;
        }
        const currentQuestion = nextQuestion;
        pendingQuestion = undefined;
        params.logger.info(`${params.logPrefix} consult: ${currentQuestion}`);
        activeAbortController = new AbortController();
        const result = await params.consult({
          question: currentQuestion,
          responseStyle: params.responseStyle,
          signal: activeAbortController.signal,
        });
        activeAbortController = undefined;
        const text = result.text.trim();
        if (!params.isStopped() && text) {
          params.deliver(text);
        }
        // Anything enqueued during the await becomes the next iteration's input.
        nextQuestion = pendingQuestion;
      }
    } catch (error) {
      activeAbortController = undefined;
      // Aborts (from close()) and post-stop failures end quietly.
      if (params.isStopped() || isAbortError(error)) {
        return;
      }
      const message = error instanceof Error ? error.message : String(error);
      params.logger.warn(`${params.logPrefix} consult failed: ${message}`);
      params.deliver(params.fallbackText);
    } finally {
      active = false;
      // A failure exits the while-loop early; restart for any leftover questions.
      const queuedQuestion = pendingQuestion;
      pendingQuestion = undefined;
      if (queuedQuestion && !params.isStopped()) {
        void run(queuedQuestion);
      }
    }
  };
  return {
    close: () => {
      clearDebounceTimer();
      pendingQuestion = undefined;
      activeAbortController?.abort();
    },
    enqueue: (question) => {
      const trimmed = question.trim();
      if (!trimmed || params.isStopped()) {
        return;
      }
      if (active) {
        // Mid-consult: buffer for the drain loop; no debounce timer needed.
        pendingQuestion = appendPendingQuestion(pendingQuestion, trimmed);
        clearDebounceTimer();
        return;
      }
      // Idle: buffer and (re)start the debounce window.
      pendingQuestion = appendPendingQuestion(pendingQuestion, trimmed);
      clearDebounceTimer();
      debounceTimer = setTimeout(() => {
        debounceTimer = undefined;
        const queuedQuestion = pendingQuestion;
        pendingQuestion = undefined;
        if (queuedQuestion && !params.isStopped()) {
          void run(queuedQuestion);
        }
      }, params.debounceMs);
      // Node-only: don't let the pending timer keep the process alive.
      debounceTimer.unref?.();
    },
  };
}
/** Joins a newly queued question onto any already-buffered text, newline separated. */
function appendPendingQuestion(current: string | undefined, next: string): string {
  if (!current) {
    return next;
  }
  return [current, next].join("\n");
}
/** True when the value is an Error produced by an AbortController cancellation. */
function isAbortError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }
  return error.name === "AbortError";
}

View File

@@ -0,0 +1,189 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { formatErrorMessage } from "../infra/errors.js";
import { getActiveMemorySearchManager } from "../plugins/memory-runtime.js";
import type { RealtimeVoiceAgentConsultResult } from "./agent-consult-runtime.js";
import { parseRealtimeVoiceAgentConsultArgs } from "./agent-consult-tool.js";
/** Minimal logger surface needed here; debug is optional. */
type Logger = {
  debug?: (message: string) => void;
};
/** One memory-search match as returned by the active memory manager. */
type MemorySearchHit = {
  path: string;
  startLine: number;
  endLine: number;
  snippet: string;
  source: "memory" | "sessions";
  score: number;
};
/** Tuning for the fast (pre-consult) memory lookup. */
export type RealtimeVoiceFastContextConfig = {
  enabled: boolean;
  maxResults: number;
  sources: Array<"memory" | "sessions">;
  // Hard deadline for the lookup; exceeding it raises a timeout error.
  timeoutMs: number;
  // When true, misses/failures defer to the slower consult path instead of
  // answering with a canned "no context" message.
  fallbackToConsult: boolean;
};
/** Wording knobs used when composing prompt text for the model. */
export type RealtimeVoiceFastContextLabels = {
  audienceLabel: string;
  contextName: string;
};
type FastContextLookupResult =
  | { status: "unavailable"; error?: string }
  | { status: "hits"; hits: MemorySearchHit[] };
/** handled=false means the caller should fall through to the full consult. */
export type RealtimeVoiceFastContextConsultResult =
  | { handled: false }
  | { handled: true; result: RealtimeVoiceAgentConsultResult };
// Upper bound on a single snippet after whitespace collapsing.
const MAX_SNIPPET_CHARS = 700;
/** Raised when the memory lookup misses its configured deadline. */
class RealtimeFastContextTimeoutError extends Error {
  constructor(timeoutMs: number) {
    super(`fast context lookup timed out after ${timeoutMs}ms`);
    this.name = "RealtimeFastContextTimeoutError";
  }
}
/** Collapses whitespace runs and hard-caps the snippet length with an ellipsis. */
function normalizeSnippet(text: string): string {
  const collapsed = text.replace(/\s+/g, " ").trim();
  return collapsed.length <= MAX_SNIPPET_CHARS
    ? collapsed
    : `${collapsed.slice(0, MAX_SNIPPET_CHARS - 1).trimEnd()}...`;
}
/** Combines the parsed question and optional context into one search query. */
function buildSearchQuery(args: unknown): string {
  const parsed = parseRealtimeVoiceAgentConsultArgs(args);
  const parts: string[] = [];
  if (parsed.question) {
    parts.push(parsed.question);
  }
  if (parsed.context) {
    parts.push(parsed.context);
  }
  return parts.join("\n\n");
}
/** Fills in default wording for any label that is missing or blank. */
function resolveLabels(
  labels?: Partial<RealtimeVoiceFastContextLabels>,
): RealtimeVoiceFastContextLabels {
  const audience = labels?.audienceLabel?.trim();
  const context = labels?.contextName?.trim();
  return {
    audienceLabel: audience || "person",
    contextName: context || "OpenClaw memory context",
  };
}
/** Renders the retrieved hits into the prompt fed to the model on a lookup hit. */
function buildContextText(params: {
  query: string;
  hits: MemorySearchHit[];
  labels: RealtimeVoiceFastContextLabels;
}): string {
  const { labels, query } = params;
  // Number each hit and pin it to its file location so answers stay traceable.
  const numbered: string[] = [];
  for (const [index, hit] of params.hits.entries()) {
    const location = `${hit.path}:${hit.startLine}-${hit.endLine}`;
    numbered.push(`${index + 1}. [${hit.source}] ${location}\n${normalizeSnippet(hit.snippet)}`);
  }
  const sections = [
    `Fast ${labels.contextName} found for the live ${labels.audienceLabel}.`,
    `Use this context only if it answers the ${labels.audienceLabel}'s question. If it is not relevant, say briefly that you do not have that context handy.`,
    `Question:\n${query}`,
    `Context:\n${numbered.join("\n\n")}`,
  ];
  return sections.join("\n\n");
}
/** Renders the "nothing found" prompt used when the fast lookup misses. */
function buildMissText(query: string, labels: RealtimeVoiceFastContextLabels): string {
  const header = `No relevant ${labels.contextName} was found quickly for the live ${labels.audienceLabel}.`;
  const guidance = `Answer briefly that you do not have that context handy. Do not keep checking unless the ${labels.audienceLabel} asks you to.`;
  return `${header}\n\n${guidance}\n\nQuestion:\n${query}`;
}
/**
 * Races a promise against a deadline; rejects with
 * RealtimeFastContextTimeoutError when the deadline wins.
 * The timer is always cleared so it cannot keep the process alive.
 */
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<T>((_resolve, reject) => {
    timer = setTimeout(() => reject(new RealtimeFastContextTimeoutError(timeoutMs)), timeoutMs);
  });
  try {
    return await Promise.race([promise, deadline]);
  } finally {
    if (timer !== undefined) {
      clearTimeout(timer);
    }
  }
}
/** Runs the memory search, reporting "unavailable" when no manager is active. */
async function lookupFastContext(params: {
  cfg: OpenClawConfig;
  agentId: string;
  sessionKey: string;
  config: RealtimeVoiceFastContextConfig;
  query: string;
}): Promise<FastContextLookupResult> {
  const memory = await getActiveMemorySearchManager({
    cfg: params.cfg,
    agentId: params.agentId,
  });
  const manager = memory.manager;
  if (!manager) {
    const error = memory.error ?? "no active memory manager";
    return { status: "unavailable", error };
  }
  const searchOptions = {
    maxResults: params.config.maxResults,
    sessionKey: params.sessionKey,
    sources: params.config.sources,
  };
  const hits = await manager.search(params.query, searchOptions);
  return { status: "hits", hits };
}
/**
 * Attempts to answer a consult request from fast memory context.
 *
 * Returns handled=false when disabled, or on a miss/failure while
 * `fallbackToConsult` is set — in which case the caller should run the full
 * consult. Otherwise returns handled=true with prompt text (either the
 * retrieved context or a "no context" message).
 */
export async function resolveRealtimeVoiceFastContextConsult(params: {
  cfg: OpenClawConfig;
  agentId: string;
  sessionKey: string;
  config: RealtimeVoiceFastContextConfig;
  args: unknown;
  logger: Logger;
  labels?: Partial<RealtimeVoiceFastContextLabels>;
}): Promise<RealtimeVoiceFastContextConsultResult> {
  if (!params.config.enabled) {
    return { handled: false };
  }
  const labels = resolveLabels(params.labels);
  const query = buildSearchQuery(params.args);
  // Shared miss path: defer to the slow consult or answer with the miss text.
  const onMiss = (): RealtimeVoiceFastContextConsultResult =>
    params.config.fallbackToConsult
      ? { handled: false }
      : { handled: true, result: { text: buildMissText(query, labels) } };
  try {
    const lookup = await withTimeout(
      lookupFastContext({
        cfg: params.cfg,
        agentId: params.agentId,
        sessionKey: params.sessionKey,
        config: params.config,
        query,
      }),
      params.config.timeoutMs,
    );
    if (lookup.status === "unavailable") {
      params.logger.debug?.(`[realtime-voice] fast context unavailable: ${lookup.error}`);
      return onMiss();
    }
    if (lookup.hits.length === 0) {
      return onMiss();
    }
    return {
      handled: true,
      result: { text: buildContextText({ query, hits: lookup.hits, labels }) },
    };
  } catch (error) {
    const message = formatErrorMessage(error);
    params.logger.debug?.(`[realtime-voice] fast context lookup failed: ${message}`);
    return onMiss();
  }
}

View File

@@ -1,4 +1,5 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { TalkTransport } from "./talk-events.js";
export type RealtimeVoiceProviderId = string;
@@ -72,6 +73,17 @@ export type RealtimeVoiceBridgeCallbacks = {
export type RealtimeVoiceProviderConfig = Record<string, unknown>;
export type RealtimeVoiceProviderCapabilities = {
transports: TalkTransport[];
inputAudioFormats: RealtimeVoiceAudioFormat[];
outputAudioFormats: RealtimeVoiceAudioFormat[];
supportsBrowserSession?: boolean;
supportsBargeIn?: boolean;
supportsToolCalls?: boolean;
supportsVideoFrames?: boolean;
supportsSessionResumption?: boolean;
};
export type RealtimeVoiceProviderResolveConfigContext = {
cfg: OpenClawConfig;
rawConfig: RealtimeVoiceProviderConfig;
@@ -107,7 +119,7 @@ export type RealtimeVoiceBrowserAudioContract = {
export type RealtimeVoiceBrowserWebRtcSdpSession = {
provider: RealtimeVoiceProviderId;
transport?: "webrtc-sdp";
transport: "webrtc";
clientSecret: string;
offerUrl?: string;
offerHeaders?: Record<string, string>;
@@ -118,7 +130,7 @@ export type RealtimeVoiceBrowserWebRtcSdpSession = {
export type RealtimeVoiceBrowserJsonPcmWebSocketSession = {
provider: RealtimeVoiceProviderId;
transport: "json-pcm-websocket";
transport: "provider-websocket";
protocol: string;
clientSecret: string;
websocketUrl: string;

View File

@@ -0,0 +1,80 @@
import { describe, expect, it } from "vitest";
import {
extendRealtimeVoiceOutputEchoSuppression,
getRealtimeVoiceBridgeEventHealth,
getRealtimeVoiceTranscriptHealth,
isLikelyRealtimeVoiceAssistantEchoTranscript,
recordRealtimeVoiceBridgeEvent,
recordRealtimeVoiceTranscript,
type RealtimeVoiceBridgeEventLogEntry,
type RealtimeVoiceTranscriptEntry,
} from "./session-log-runtime.js";
describe("realtime voice session log runtime", () => {
  // With maxEntries=1 only the newest line survives in the health snapshot.
  it("records bounded transcript health", () => {
    const transcript: RealtimeVoiceTranscriptEntry[] = [];
    recordRealtimeVoiceTranscript(transcript, "user", "hello", 1);
    recordRealtimeVoiceTranscript(transcript, "assistant", "hi", 1);
    expect(getRealtimeVoiceTranscriptHealth(transcript)).toMatchObject({
      realtimeTranscriptLines: 1,
      lastRealtimeTranscriptRole: "assistant",
      lastRealtimeTranscriptText: "hi",
    });
  });
  // Client audio-append chatter is filtered out; other events are recorded.
  it("skips noisy audio append events and records bridge health", () => {
    const events: RealtimeVoiceBridgeEventLogEntry[] = [];
    recordRealtimeVoiceBridgeEvent(events, {
      direction: "client",
      type: "input_audio_buffer.append",
    });
    recordRealtimeVoiceBridgeEvent(events, {
      direction: "server",
      type: "response.done",
      detail: "ok",
    });
    expect(getRealtimeVoiceBridgeEventHealth(events)).toMatchObject({
      lastRealtimeEventType: "server:response.done",
      lastRealtimeEventDetail: "ok",
    });
  });
  // A user transcript that substantially repeats recent assistant output is an echo.
  it("detects likely assistant echo transcripts", () => {
    const nowMs = Date.now();
    const transcript: RealtimeVoiceTranscriptEntry[] = [
      {
        at: new Date(nowMs - 1000).toISOString(),
        role: "assistant",
        text: "The deployment finished cleanly and all checks passed",
      },
    ];
    expect(
      isLikelyRealtimeVoiceAssistantEchoTranscript({
        transcript,
        text: "deployment finished cleanly and all checks passed",
        lookbackMs: 45_000,
        nowMs,
      }),
    ).toBe(true);
  });
  // 96 bytes at 48 bytes/ms = 2ms of audio; suppression extends by tailMs past playback.
  it("extends output echo suppression from audio duration", () => {
    expect(
      extendRealtimeVoiceOutputEchoSuppression({
        audio: Buffer.alloc(96),
        bytesPerMs: 48,
        tailMs: 3000,
        nowMs: 100,
        lastOutputPlayableUntilMs: 0,
        suppressInputUntilMs: 0,
      }),
    ).toEqual({
      durationMs: 2,
      lastOutputPlayableUntilMs: 102,
      suppressInputUntilMs: 3102,
    });
  });
});

View File

@@ -0,0 +1,155 @@
import type { RealtimeVoiceBridgeEvent, RealtimeVoiceRole } from "./provider-types.js";
/** One transcript line with an ISO-8601 capture timestamp. */
export type RealtimeVoiceTranscriptEntry = {
  at: string;
  role: RealtimeVoiceRole;
  text: string;
};
/** Health snapshot over the bounded transcript buffer. */
export type RealtimeVoiceTranscriptHealth = {
  realtimeTranscriptLines: number;
  lastRealtimeTranscriptAt?: string;
  lastRealtimeTranscriptRole?: RealtimeVoiceRole;
  lastRealtimeTranscriptText?: string;
  // At most the five newest entries.
  recentRealtimeTranscript: RealtimeVoiceTranscriptEntry[];
};
/** A bridge event stamped with its ISO-8601 record time. */
export type RealtimeVoiceBridgeEventLogEntry = RealtimeVoiceBridgeEvent & {
  at: string;
};
/** Health snapshot over the bounded bridge-event buffer. */
export type RealtimeVoiceBridgeEventHealth = {
  lastRealtimeEventAt?: string;
  // Formatted as "direction:type", e.g. "server:response.done".
  lastRealtimeEventType?: string;
  lastRealtimeEventDetail?: string;
  // At most the ten newest entries.
  recentRealtimeEvents: RealtimeVoiceBridgeEventLogEntry[];
};
/**
 * Appends a timestamped transcript line and trims the buffer (in place) to the
 * newest `maxEntries`. Returns the entry that was appended.
 */
export function recordRealtimeVoiceTranscript(
  transcript: RealtimeVoiceTranscriptEntry[],
  role: RealtimeVoiceRole,
  text: string,
  maxEntries = 40,
): RealtimeVoiceTranscriptEntry {
  const entry = { at: new Date().toISOString(), role, text };
  transcript.push(entry);
  const overflow = transcript.length - maxEntries;
  if (overflow > 0) {
    // Drop the oldest lines so only the newest maxEntries remain.
    transcript.splice(0, overflow);
  }
  return entry;
}
/** Summarizes the transcript buffer: line count, last entry, and newest five lines. */
export function getRealtimeVoiceTranscriptHealth(
  transcript: RealtimeVoiceTranscriptEntry[],
): RealtimeVoiceTranscriptHealth {
  const latest = transcript.length > 0 ? transcript[transcript.length - 1] : undefined;
  return {
    realtimeTranscriptLines: transcript.length,
    lastRealtimeTranscriptAt: latest?.at,
    lastRealtimeTranscriptRole: latest?.role,
    lastRealtimeTranscriptText: latest?.text,
    recentRealtimeTranscript: transcript.slice(-5),
  };
}
/**
 * Appends a timestamped bridge event and trims the buffer (in place) to the
 * newest `maxEntries`. Client audio-append events are too noisy and skipped.
 */
export function recordRealtimeVoiceBridgeEvent(
  events: RealtimeVoiceBridgeEventLogEntry[],
  event: RealtimeVoiceBridgeEvent,
  maxEntries = 40,
): void {
  const isNoisyAudioAppend =
    event.direction === "client" && event.type === "input_audio_buffer.append";
  if (isNoisyAudioAppend) {
    return;
  }
  events.push({ at: new Date().toISOString(), ...event });
  const overflow = events.length - maxEntries;
  if (overflow > 0) {
    events.splice(0, overflow);
  }
}
/** Summarizes the bridge-event buffer: last event fields and the newest ten entries. */
export function getRealtimeVoiceBridgeEventHealth(
  events: RealtimeVoiceBridgeEventLogEntry[],
): RealtimeVoiceBridgeEventHealth {
  const latest = events.length > 0 ? events[events.length - 1] : undefined;
  return {
    lastRealtimeEventAt: latest?.at,
    lastRealtimeEventType: latest ? `${latest.direction}:${latest.type}` : undefined,
    lastRealtimeEventDetail: latest?.detail,
    recentRealtimeEvents: events.slice(-10),
  };
}
/**
 * Normalizes transcript text into comparable tokens for echo matching:
 * lowercase, apostrophes removed (so "don't" -> "dont"), all other
 * non-alphanumerics treated as separators, single-character tokens dropped.
 *
 * Fix: also strip the typographic apostrophe U+2019, which ASR/provider
 * transcripts commonly emit; previously "don’t" split into "don" + "t" and
 * weakened the overlap match against straight-quoted text.
 */
function normalizeTranscriptForEchoMatch(text: string): string[] {
  return text
    .toLowerCase()
    .replace(/['\u2019]/g, "")
    .replace(/[^a-z0-9]+/g, " ")
    .trim()
    .split(/\s+/)
    .filter((token) => token.length > 1);
}
/**
 * True when at least 58% of the distinct user tokens also appear in the
 * assistant tokens. Requires at least four tokens on both sides (and four
 * distinct user tokens) so trivial phrases cannot trigger echo suppression.
 */
function hasMeaningfulEchoOverlap(userTokens: string[], assistantTokens: string[]): boolean {
  if (userTokens.length < 4 || assistantTokens.length < 4) {
    return false;
  }
  const distinctUserTokens = Array.from(new Set(userTokens));
  if (distinctUserTokens.length < 4) {
    return false;
  }
  const assistantVocabulary = new Set(assistantTokens);
  let shared = 0;
  for (const token of distinctUserTokens) {
    if (assistantVocabulary.has(token)) {
      shared += 1;
    }
  }
  return shared / distinctUserTokens.length >= 0.58;
}
/**
 * Heuristic: does an incoming "user" transcript actually look like the
 * microphone picking up recent assistant speech?
 *
 * Joins up to the six newest assistant lines within `lookbackMs`, then flags
 * an echo when one normalized string contains the other (for inputs of at
 * least 18 characters) or when the token overlap is meaningful.
 */
export function isLikelyRealtimeVoiceAssistantEchoTranscript(params: {
  transcript: RealtimeVoiceTranscriptEntry[];
  text: string;
  lookbackMs: number;
  nowMs?: number;
}): boolean {
  const userTokens = normalizeTranscriptForEchoMatch(params.text);
  if (userTokens.length < 4) {
    return false;
  }
  const referenceMs = params.nowMs ?? Date.now();
  // Collect assistant lines that fall inside the lookback window.
  const assistantLines: string[] = [];
  for (const entry of params.transcript) {
    if (entry.role !== "assistant") {
      continue;
    }
    const entryMs = Date.parse(entry.at);
    if (Number.isFinite(entryMs) && referenceMs - entryMs <= params.lookbackMs) {
      assistantLines.push(entry.text);
    }
  }
  const recentAssistantText = assistantLines.slice(-6).join(" ");
  if (!recentAssistantText.trim()) {
    return false;
  }
  const assistantTokens = normalizeTranscriptForEchoMatch(recentAssistantText);
  const userNormalized = userTokens.join(" ");
  const assistantNormalized = assistantTokens.join(" ");
  if (userNormalized.length >= 18 && assistantNormalized.includes(userNormalized)) {
    return true;
  }
  if (assistantNormalized.length >= 18 && userNormalized.includes(assistantNormalized)) {
    return true;
  }
  return hasMeaningfulEchoOverlap(userTokens, assistantTokens);
}
/**
 * Accounts for a newly emitted output-audio chunk: extends the projected
 * playback end time (chunks queue back-to-back, so playback starts at the
 * later of "now" and the previous chunk's end) and pushes input suppression
 * out to playback end + tailMs, never shrinking an existing suppression.
 */
export function extendRealtimeVoiceOutputEchoSuppression(params: {
  audio: Buffer;
  bytesPerMs: number;
  tailMs: number;
  nowMs: number;
  lastOutputPlayableUntilMs: number;
  suppressInputUntilMs: number;
}): { lastOutputPlayableUntilMs: number; suppressInputUntilMs: number; durationMs: number } {
  const durationMs = Math.ceil(params.audio.byteLength / params.bytesPerMs);
  const startsAtMs =
    params.lastOutputPlayableUntilMs > params.nowMs ? params.lastOutputPlayableUntilMs : params.nowMs;
  const endsAtMs = startsAtMs + durationMs;
  const candidateSuppressMs = endsAtMs + params.tailMs;
  return {
    durationMs,
    lastOutputPlayableUntilMs: endsAtMs,
    suppressInputUntilMs:
      candidateSuppressMs > params.suppressInputUntilMs
        ? candidateSuppressMs
        : params.suppressInputUntilMs,
  };
}

View File

@@ -0,0 +1,90 @@
import { describe, expect, it } from "vitest";
import { createTalkEventSequencer } from "./talk-events.js";
describe("talk event envelope", () => {
  // Every event carries the full session context plus an id of "<sessionId>:<seq>".
  it("adds stable session context and monotonically increasing sequence numbers", () => {
    const events = createTalkEventSequencer(
      {
        sessionId: "session-1",
        mode: "realtime",
        transport: "gateway-relay",
        brain: "agent-consult",
        provider: "openai",
      },
      { now: () => "2026-05-05T12:00:00.000Z" },
    );
    expect(events.next({ type: "session.started", payload: { ok: true } })).toEqual({
      id: "session-1:1",
      sessionId: "session-1",
      seq: 1,
      timestamp: "2026-05-05T12:00:00.000Z",
      mode: "realtime",
      transport: "gateway-relay",
      brain: "agent-consult",
      provider: "openai",
      type: "session.started",
      payload: { ok: true },
      // Optional correlation fields are present but undefined when not supplied.
      turnId: undefined,
      captureId: undefined,
      final: undefined,
      callId: undefined,
      itemId: undefined,
      parentId: undefined,
    });
    expect(events.next({ type: "session.ready", payload: null }).seq).toBe(2);
  });
  // Correlation ids and an explicit timestamp pass through to the envelope untouched.
  it("preserves turn, capture, and provider correlation fields", () => {
    const events = createTalkEventSequencer({
      sessionId: "session-voice",
      mode: "stt-tts",
      transport: "managed-room",
      brain: "agent-consult",
    });
    expect(
      events.next({
        type: "tool.call",
        turnId: "turn-1",
        captureId: "capture-1",
        callId: "call-1",
        itemId: "item-1",
        parentId: "parent-1",
        final: false,
        timestamp: "2026-05-05T12:00:01.000Z",
        payload: { name: "openclaw_agent_consult" },
      }),
    ).toMatchObject({
      id: "session-voice:1",
      sessionId: "session-voice",
      mode: "stt-tts",
      transport: "managed-room",
      brain: "agent-consult",
      type: "tool.call",
      turnId: "turn-1",
      captureId: "capture-1",
      callId: "call-1",
      itemId: "item-1",
      parentId: "parent-1",
      final: false,
      payload: { name: "openclaw_agent_consult" },
    });
  });
  // Turn-scoped events require turnId; capture-scoped events require captureId.
  it("rejects turn and capture scoped events without correlation ids", () => {
    const events = createTalkEventSequencer({
      sessionId: "session-voice",
      mode: "stt-tts",
      transport: "managed-room",
      brain: "agent-consult",
    });
    expect(() => events.next({ type: "turn.started", payload: {} })).toThrow(
      "Talk event turn.started requires turnId",
    );
    expect(() => events.next({ type: "capture.started", payload: {} })).toThrow(
      "Talk event capture.started requires captureId",
    );
  });
});

View File

@@ -0,0 +1,145 @@
/** Every event type a talk session can emit, grouped by lifecycle area. */
export const TALK_EVENT_TYPES = [
  // Session lifecycle
  "session.started",
  "session.ready",
  "session.closed",
  "session.error",
  "session.replaced",
  // Conversation turns
  "turn.started",
  "turn.ended",
  "turn.cancelled",
  // Microphone capture
  "capture.started",
  "capture.stopped",
  "capture.cancelled",
  "capture.once",
  // Inbound audio / transcription
  "input.audio.delta",
  "input.audio.committed",
  "transcript.delta",
  "transcript.done",
  // Outbound text and audio
  "output.text.delta",
  "output.text.done",
  "output.audio.started",
  "output.audio.delta",
  "output.audio.done",
  // Tool invocation lifecycle
  "tool.call",
  "tool.progress",
  "tool.result",
  "tool.error",
  // Telemetry
  "usage.metrics",
  "latency.metrics",
  "health.changed",
] as const;
export type TalkEventType = (typeof TALK_EVENT_TYPES)[number];
/** How audio is processed end-to-end. */
export type TalkMode = "realtime" | "stt-tts" | "transcription";
/** Canonical transport names (see normalizeTalkTransport for legacy aliases). */
export type TalkTransport = "webrtc" | "provider-websocket" | "gateway-relay" | "managed-room";
/** What drives responses for the session. */
export type TalkBrain = "agent-consult" | "direct-tools" | "none";
/** Per-session fields copied onto every emitted event. */
export type TalkEventContext = {
  sessionId: string;
  mode: TalkMode;
  transport: TalkTransport;
  brain: TalkBrain;
  provider?: string;
};
/** Full event envelope: session context + sequencing + correlation + payload. */
export type TalkEvent<TPayload = unknown> = TalkEventContext & {
  // "<sessionId>:<seq>" — unique within the session.
  id: string;
  type: TalkEventType;
  turnId?: string;
  captureId?: string;
  // Monotonically increasing, starting at 1.
  seq: number;
  // ISO-8601 timestamp.
  timestamp: string;
  final?: boolean;
  callId?: string;
  itemId?: string;
  parentId?: string;
  payload: TPayload;
};
/** Caller-supplied portion of an event; the sequencer fills in the rest. */
export type TalkEventInput<TPayload = unknown> = {
  type: TalkEventType;
  payload: TPayload;
  turnId?: string;
  captureId?: string;
  // Optional override; otherwise stamped from the sequencer clock.
  timestamp?: string;
  final?: boolean;
  callId?: string;
  itemId?: string;
  parentId?: string;
};
export type TalkEventSequencer = {
  next<TPayload>(input: TalkEventInput<TPayload>): TalkEvent<TPayload>;
};
// Event types that must carry a turnId.
const TURN_SCOPED_TALK_EVENT_TYPES = new Set<TalkEventType>([
  "turn.started",
  "turn.ended",
  "turn.cancelled",
  "input.audio.delta",
  "input.audio.committed",
  "transcript.delta",
  "transcript.done",
  "output.text.delta",
  "output.text.done",
  "output.audio.started",
  "output.audio.delta",
  "output.audio.done",
  "tool.call",
  "tool.progress",
  "tool.result",
  "tool.error",
]);
// Event types that must carry a captureId.
const CAPTURE_SCOPED_TALK_EVENT_TYPES = new Set<TalkEventType>([
  "capture.started",
  "capture.stopped",
  "capture.cancelled",
  "capture.once",
]);
/** Throws when a turn- or capture-scoped event is missing its correlation id. */
function assertTalkEventCorrelation(input: TalkEventInput): void {
  const missingTurnId =
    TURN_SCOPED_TALK_EVENT_TYPES.has(input.type) && !input.turnId?.trim();
  if (missingTurnId) {
    throw new Error(`Talk event ${input.type} requires turnId`);
  }
  const missingCaptureId =
    CAPTURE_SCOPED_TALK_EVENT_TYPES.has(input.type) && !input.captureId?.trim();
  if (missingCaptureId) {
    throw new Error(`Talk event ${input.type} requires captureId`);
  }
}
/**
 * Creates a sequencer that wraps raw event inputs into full TalkEvent
 * envelopes: session context, a per-session monotonic `seq`, an id of
 * "<sessionId>:<seq>", and a timestamp (caller-supplied or from the clock).
 * Throws for turn/capture-scoped events lacking their correlation id.
 */
export function createTalkEventSequencer(
  context: TalkEventContext,
  options: { now?: () => Date | string } = {},
): TalkEventSequencer {
  const clock = options.now ?? (() => new Date());
  let lastSeq = 0;
  // Prefer an explicit timestamp; otherwise stamp from the clock, accepting
  // either a Date or a preformatted ISO string.
  const resolveTimestamp = (explicit?: string): string => {
    if (explicit != null) {
      return explicit;
    }
    const value = clock();
    return typeof value === "string" ? value : value.toISOString();
  };
  return {
    next<TPayload>(input: TalkEventInput<TPayload>): TalkEvent<TPayload> {
      assertTalkEventCorrelation(input);
      lastSeq += 1;
      return {
        ...context,
        id: `${context.sessionId}:${lastSeq}`,
        type: input.type,
        turnId: input.turnId,
        captureId: input.captureId,
        seq: lastSeq,
        timestamp: resolveTimestamp(input.timestamp),
        final: input.final,
        callId: input.callId,
        itemId: input.itemId,
        parentId: input.parentId,
        payload: input.payload,
      };
    },
  };
}

View File

@@ -0,0 +1,124 @@
import { describe, expect, it } from "vitest";
import { createTalkSessionController, normalizeTalkTransport } from "./talk-session-controller.js";
// Builds a controller with a fixed clock and a 3-event recent-history cap.
function createController() {
  return createTalkSessionController(
    {
      sessionId: "talk-session",
      mode: "realtime",
      transport: "gateway-relay",
      brain: "agent-consult",
      provider: "test",
      maxRecentEvents: 3,
    },
    { now: () => "2026-05-05T00:00:00.000Z" },
  );
}
describe("createTalkSessionController", () => {
  // recentEvents keeps only the newest maxRecentEvents entries.
  it("emits common envelopes and keeps bounded recent event history", () => {
    const talk = createController();
    talk.emit({ type: "session.started", payload: {} });
    const firstTurn = talk.ensureTurn();
    talk.emit({
      type: "input.audio.delta",
      turnId: firstTurn.turnId,
      payload: { byteLength: 5 },
    });
    talk.emit({
      type: "transcript.done",
      turnId: firstTurn.turnId,
      payload: { text: "hello" },
      final: true,
    });
    expect(firstTurn.event).toMatchObject({
      id: "talk-session:2",
      type: "turn.started",
      sessionId: "talk-session",
      turnId: "turn-1",
      mode: "realtime",
      transport: "gateway-relay",
      brain: "agent-consult",
      provider: "test",
      seq: 2,
      timestamp: "2026-05-05T00:00:00.000Z",
    });
    // session.started fell off the 3-entry window.
    expect(talk.recentEvents.map((event) => event.type)).toEqual([
      "turn.started",
      "input.audio.delta",
      "transcript.done",
    ]);
  });
  // Ending a non-active turn fails with stale_turn and leaves the active turn intact.
  it("rejects stale turn completion before clearing the active turn", () => {
    const talk = createController();
    talk.ensureTurn({ turnId: "turn-old" });
    expect(talk.endTurn({ turnId: "turn-other" })).toEqual({
      ok: false,
      reason: "stale_turn",
    });
    expect(talk.activeTurnId).toBe("turn-old");
    const ended = talk.endTurn({ turnId: "turn-old", payload: { reason: "done" } });
    expect(ended).toMatchObject({
      ok: true,
      turnId: "turn-old",
      event: {
        type: "turn.ended",
        turnId: "turn-old",
        payload: { reason: "done" },
        final: true,
      },
    });
    expect(talk.activeTurnId).toBeUndefined();
  });
  // A second startOutputAudio while audio is active emits no duplicate started event.
  it("tracks output audio lifecycle without duplicate started events", () => {
    const talk = createController();
    const first = talk.startOutputAudio({ payload: { callId: "call-1" } });
    const second = talk.startOutputAudio({ payload: { callId: "call-1" } });
    const done = talk.finishOutputAudio({ payload: { reason: "mark" } });
    expect(first.event).toMatchObject({
      type: "output.audio.started",
      turnId: "turn-1",
    });
    expect(second).toEqual({ turnId: "turn-1" });
    expect(done).toMatchObject({
      type: "output.audio.done",
      turnId: "turn-1",
      payload: { reason: "mark" },
      final: true,
    });
    expect(talk.outputAudioActive).toBe(false);
  });
  // Starting a replacement turn resets lingering output-audio state from the old turn.
  it("clears stale output audio state when a replacement turn starts", () => {
    const talk = createController();
    talk.startOutputAudio({ turnId: "turn-old" });
    expect(talk.outputAudioActive).toBe(true);
    const current = talk.startTurn({ turnId: "turn-current" });
    expect(current).toMatchObject({
      turnId: "turn-current",
      event: expect.objectContaining({ type: "turn.started", turnId: "turn-current" }),
    });
    expect(talk.activeTurnId).toBe("turn-current");
    expect(talk.outputAudioActive).toBe(false);
  });
});
describe("normalizeTalkTransport", () => {
  // Legacy public names map onto the canonical TalkTransport values.
  it("maps legacy public transport names to canonical names", () => {
    expect(normalizeTalkTransport(undefined)).toBeUndefined();
    expect(normalizeTalkTransport("webrtc-sdp")).toBe("webrtc");
    expect(normalizeTalkTransport("json-pcm-websocket")).toBe("provider-websocket");
    expect(normalizeTalkTransport("gateway-relay")).toBe("gateway-relay");
  });
});

View File

@@ -0,0 +1,206 @@
import {
createTalkEventSequencer,
type TalkBrain,
type TalkEvent,
type TalkEventContext,
type TalkEventInput,
type TalkEventSequencer,
type TalkMode,
type TalkTransport,
} from "./talk-events.js";
/** Why `endTurn`/`cancelTurn` rejected a completion request. */
export type TalkTurnFailureReason = "no_active_turn" | "stale_turn";
/** Successful turn completion: the emitted final event and its turn id. */
export type TalkTurnSuccess = {
  event: TalkEvent;
  ok: true;
  turnId: string;
};
/** Failed turn completion; no event was emitted and state is unchanged. */
export type TalkTurnFailure = {
  ok: false;
  reason: TalkTurnFailureReason;
};
/** Result of a turn end/cancel request, discriminated on `ok`. */
export type TalkTurnResult = TalkTurnSuccess | TalkTurnFailure;
/**
 * Result of `ensureTurn`/`startTurn`/`startOutputAudio`. `event` is present
 * only when the call actually emitted a new event (e.g. a turn was created
 * rather than reused).
 */
export type TalkEnsureTurnResult = {
  event?: TalkEvent;
  turnId: string;
};
/**
 * Stateful per-session wrapper around the talk event sequencer: tracks the
 * active turn, the output-audio flag, and a bounded list of recent events.
 */
export type TalkSessionController = {
  readonly activeTurnId: string | undefined;
  readonly context: TalkEventContext;
  readonly outputAudioActive: boolean;
  readonly recentEvents: readonly TalkEvent[];
  clearActiveTurn(): void;
  emit<TPayload>(input: TalkEventInput<TPayload>): TalkEvent<TPayload>;
  ensureTurn(params?: { payload?: unknown; turnId?: string }): TalkEnsureTurnResult;
  startTurn(params?: { payload?: unknown; turnId?: string }): TalkEnsureTurnResult;
  endTurn(params?: { payload?: unknown; turnId?: string }): TalkTurnResult;
  cancelTurn(params?: { payload?: unknown; turnId?: string }): TalkTurnResult;
  finishOutputAudio(params?: { payload?: unknown; turnId?: string }): TalkEvent | undefined;
  startOutputAudio(params?: { payload?: unknown; turnId?: string }): TalkEnsureTurnResult;
};
/**
 * Event context plus controller knobs: `maxRecentEvents` caps the in-memory
 * event history (default 20) and `turnIdPrefix` seeds generated turn ids
 * (default "turn").
 */
export type TalkSessionControllerParams = TalkEventContext & {
  maxRecentEvents?: number;
  turnIdPrefix?: string;
};
export function createTalkSessionController(
params: TalkSessionControllerParams,
options: { now?: () => Date | string; sequencer?: TalkEventSequencer } = {},
): TalkSessionController {
const { maxRecentEvents = 20, turnIdPrefix = "turn", ...context } = params;
const sequencer = options.sequencer ?? createTalkEventSequencer(context, { now: options.now });
const recentEvents: TalkEvent[] = [];
let activeTurnId: string | undefined;
let outputAudioActive = false;
let turnSeq = 0;
const remember = <TPayload>(event: TalkEvent<TPayload>): TalkEvent<TPayload> => {
recentEvents.push(event as TalkEvent);
if (recentEvents.length > maxRecentEvents) {
recentEvents.splice(0, recentEvents.length - maxRecentEvents);
}
return event;
};
const emit = <TPayload>(input: TalkEventInput<TPayload>): TalkEvent<TPayload> => {
return remember(sequencer.next(input));
};
const resolveActiveTurn = (requestedTurnId: string | undefined): string | TalkTurnFailure => {
if (!activeTurnId) {
return { ok: false, reason: "no_active_turn" };
}
const normalizedRequested = normalizeOptionalString(requestedTurnId);
if (normalizedRequested && normalizedRequested !== activeTurnId) {
return { ok: false, reason: "stale_turn" };
}
return activeTurnId;
};
const ensureTurn = (ensureParams: { payload?: unknown; turnId?: string } = {}) => {
if (activeTurnId) {
return { turnId: activeTurnId };
}
return startTurn(ensureParams);
};
const startTurn = (startParams: { payload?: unknown; turnId?: string } = {}) => {
const turnId = normalizeOptionalString(startParams.turnId) ?? `${turnIdPrefix}-${++turnSeq}`;
outputAudioActive = false;
activeTurnId = turnId;
return {
turnId,
event: emit({
type: "turn.started",
turnId,
payload: startParams.payload ?? {},
}),
};
};
const finishTurn = (
type: "turn.ended" | "turn.cancelled",
paramsForTurn: { payload?: unknown; turnId?: string } = {},
): TalkTurnResult => {
const turnId = resolveActiveTurn(paramsForTurn.turnId);
if (typeof turnId !== "string") {
return turnId;
}
outputAudioActive = false;
activeTurnId = undefined;
return {
ok: true,
turnId,
event: emit({
type,
turnId,
payload: paramsForTurn.payload ?? {},
final: true,
}),
};
};
return {
get activeTurnId() {
return activeTurnId;
},
context,
get outputAudioActive() {
return outputAudioActive;
},
get recentEvents() {
return recentEvents;
},
clearActiveTurn() {
activeTurnId = undefined;
outputAudioActive = false;
},
emit,
ensureTurn,
startTurn,
endTurn(paramsForTurn) {
return finishTurn("turn.ended", paramsForTurn);
},
cancelTurn(paramsForTurn) {
return finishTurn("turn.cancelled", paramsForTurn);
},
finishOutputAudio(paramsForOutput = {}) {
if (!outputAudioActive) {
return undefined;
}
const turnId = resolveActiveTurn(paramsForOutput.turnId);
if (typeof turnId !== "string") {
return undefined;
}
outputAudioActive = false;
return emit({
type: "output.audio.done",
turnId,
payload: paramsForOutput.payload ?? {},
final: true,
});
},
startOutputAudio(paramsForOutput = {}) {
const turn = ensureTurn({ turnId: paramsForOutput.turnId, payload: {} });
if (outputAudioActive) {
return { turnId: turn.turnId };
}
outputAudioActive = true;
return {
turnId: turn.turnId,
event: emit({
type: "output.audio.started",
turnId: turn.turnId,
payload: paramsForOutput.payload ?? {},
}),
};
},
};
}
/**
 * Map legacy public transport names ("webrtc-sdp", "json-pcm-websocket") to
 * their canonical internal names. Unknown names pass through trimmed;
 * blank/undefined input yields undefined.
 */
export function normalizeTalkTransport(value: string | undefined): string | undefined {
  const transport = normalizeOptionalString(value);
  switch (transport) {
    case undefined:
      return undefined;
    case "webrtc-sdp":
      return "webrtc";
    case "json-pcm-websocket":
      return "provider-websocket";
    default:
      return transport;
  }
}
export type { TalkBrain, TalkEvent, TalkEventContext, TalkEventInput, TalkMode, TalkTransport };
/** Trim `value`; treat undefined or whitespace-only input as absent. */
function normalizeOptionalString(value: string | undefined): string | undefined {
  const trimmed = (value ?? "").trim();
  return trimmed === "" ? undefined : trimmed;
}