feat(voice-call): improve realtime Meet voice agent

* feat(voice-call): inject agent context into realtime voice

* fix(voice-call): stabilize realtime meet audio

* fix(voice-call): delegate realtime consults to agent

* Improve realtime Meet voice consult routing

* Pin voice consult delivery to call session

* Move voice changelog entries to changes

* fix(voice-call): isolate final realtime transcripts

* test(voice-call): trim redundant realtime coverage
This commit is contained in:
scoootscooob
2026-05-05 12:56:31 -07:00
committed by GitHub
parent 782963ae66
commit 79dd65e208
35 changed files with 2088 additions and 137 deletions

View File

@@ -1256,6 +1256,7 @@ describe("google-meet plugin", () => {
dtmfSequence: "123456#",
logger: expect.objectContaining({ info: expect.any(Function) }),
message: "Say exactly: I'm here and listening.",
sessionKey: expect.stringMatching(/^voice:google-meet:meet_/),
});
});

View File

@@ -41,6 +41,10 @@ function nowIso(): string {
return new Date().toISOString();
}
/** Builds the voice-call session key that pins a Twilio call to a Meet session. */
function buildTwilioVoiceCallSessionKey(meetingSessionId: string): string {
  return ["voice", "google-meet", meetingSessionId].join(":");
}
export function normalizeMeetUrl(input: unknown): string {
const raw = normalizeOptionalString(input);
if (!raw) {
@@ -478,6 +482,10 @@ export class GoogleMeetRuntime {
dialInNumber,
dtmfSequence,
logger: this.params.logger,
...(request.requesterSessionKey
? { requesterSessionKey: request.requesterSessionKey }
: {}),
sessionKey: buildTwilioVoiceCallSessionKey(session.id),
message: isGoogleMeetTalkBackMode(mode)
? (request.message ??
this.params.config.voiceCall.introMessage ??
@@ -505,7 +513,7 @@ export class GoogleMeetRuntime {
session.notes.push(
this.params.config.voiceCall.enabled
? dtmfSequence
? "Twilio transport delegated the phone leg to the voice-call plugin, then sent configured DTMF after connect before speaking."
? "Twilio transport delegated the phone leg to the voice-call plugin, then queued configured DTMF before realtime connect."
: "Twilio transport delegated the call to the voice-call plugin without configured DTMF."
: "Twilio transport is an explicit dial plan; voice-call delegation is disabled.",
);

View File

@@ -28,7 +28,7 @@ describe("Google Meet voice-call gateway", () => {
gatewayMocks.startGatewayClientWhenEventLoopReady.mockClear();
});
it("starts Twilio Meet calls, sends delayed DTMF, then speaks the intro without TwiML fallback", async () => {
it("starts Twilio Meet calls with pre-connect DTMF, then speaks the intro without TwiML fallback", async () => {
const config = resolveGoogleMeetConfig({
voiceCall: {
gatewayUrl: "ws://127.0.0.1:18789",
@@ -43,6 +43,8 @@ describe("Google Meet voice-call gateway", () => {
dialInNumber: "+15551234567",
dtmfSequence: "123456#",
message: "Say exactly: I'm here and listening.",
requesterSessionKey: "agent:main:discord:channel:general",
sessionKey: "voice:google-meet:meet-1",
});
await join;
@@ -53,20 +55,14 @@ describe("Google Meet voice-call gateway", () => {
{
to: "+15551234567",
mode: "conversation",
dtmfSequence: "123456#",
requesterSessionKey: "agent:main:discord:channel:general",
sessionKey: "voice:google-meet:meet-1",
},
{ timeoutMs: 30_000 },
);
expect(gatewayMocks.request).toHaveBeenNthCalledWith(
2,
"voicecall.dtmf",
{
callId: "call-1",
digits: "123456#",
},
{ timeoutMs: 30_000 },
);
expect(gatewayMocks.request).toHaveBeenNthCalledWith(
3,
"voicecall.speak",
{
callId: "call-1",
@@ -75,13 +71,12 @@ describe("Google Meet voice-call gateway", () => {
},
{ timeoutMs: 30_000 },
);
expect(gatewayMocks.request).toHaveBeenCalledTimes(3);
expect(gatewayMocks.request).toHaveBeenCalledTimes(2);
});
it("skips the intro without failing when the realtime bridge is not ready", async () => {
gatewayMocks.request
.mockResolvedValueOnce({ callId: "call-1" })
.mockResolvedValueOnce({ success: true })
.mockResolvedValueOnce({ success: false, error: "No active realtime bridge for call" });
const config = resolveGoogleMeetConfig({
voiceCall: {

View File

@@ -1,3 +1,4 @@
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
GatewayClient,
startGatewayClientWhenEventLoopReady,
@@ -18,11 +19,6 @@ type VoiceCallSpeakResult = {
error?: string;
};
type VoiceCallDtmfResult = {
success?: boolean;
error?: string;
};
type VoiceCallMeetJoinResult = {
callId: string;
dtmfSent: boolean;
@@ -87,19 +83,24 @@ export async function joinMeetViaVoiceCallGateway(params: {
dtmfSequence?: string;
logger?: RuntimeLogger;
message?: string;
requesterSessionKey?: string;
sessionKey?: string;
}): Promise<VoiceCallMeetJoinResult> {
let client: VoiceCallGatewayClient | undefined;
try {
client = await createConnectedGatewayClient(params.config);
params.logger?.info(
`[google-meet] Delegating Twilio join to Voice Call (dtmf=${params.dtmfSequence ? "post-connect" : "none"}, intro=${params.message ? "delayed" : "none"})`,
`[google-meet] Delegating Twilio join to Voice Call (dtmf=${params.dtmfSequence ? "pre-connect" : "none"}, intro=${params.message ? "delayed" : "none"})`,
);
const start = (await client.request(
"voicecall.start",
{
to: params.dialInNumber,
mode: "conversation",
...(params.dtmfSequence ? { dtmfSequence: params.dtmfSequence } : {}),
...(params.requesterSessionKey ? { requesterSessionKey: params.requesterSessionKey } : {}),
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
},
{ timeoutMs: params.config.voiceCall.requestTimeoutMs },
)) as VoiceCallStartResult;
@@ -109,27 +110,10 @@ export async function joinMeetViaVoiceCallGateway(params: {
params.logger?.info(
`[google-meet] Voice Call Twilio phone leg started: callId=${start.callId}`,
);
let dtmfSent = false;
if (params.dtmfSequence) {
const delayMs = params.config.voiceCall.dtmfDelayMs;
const dtmfSent = Boolean(params.dtmfSequence);
if (dtmfSent) {
params.logger?.info(
`[google-meet] Waiting ${delayMs}ms before sending Meet DTMF for callId=${start.callId}`,
);
await sleep(delayMs);
const dtmf = (await client.request(
"voicecall.dtmf",
{
callId: start.callId,
digits: params.dtmfSequence,
},
{ timeoutMs: params.config.voiceCall.requestTimeoutMs },
)) as VoiceCallDtmfResult;
if (dtmf.success === false) {
throw new Error(dtmf.error || "voicecall.dtmf failed");
}
dtmfSent = true;
params.logger?.info(
`[google-meet] Meet DTMF sent after phone leg connected: callId=${start.callId} digits=${params.dtmfSequence.length}`,
`[google-meet] Meet DTMF queued before realtime connect: callId=${start.callId} digits=${params.dtmfSequence?.length ?? 0}`,
);
}
let introSent = false;
@@ -141,15 +125,23 @@ export async function joinMeetViaVoiceCallGateway(params: {
);
await sleep(delayMs);
}
const spoken = (await client.request(
"voicecall.speak",
{
callId: start.callId,
allowTwimlFallback: false,
message: params.message,
},
{ timeoutMs: params.config.voiceCall.requestTimeoutMs },
)) as VoiceCallSpeakResult;
let spoken: VoiceCallSpeakResult;
try {
spoken = (await client.request(
"voicecall.speak",
{
callId: start.callId,
allowTwimlFallback: false,
message: params.message,
},
{ timeoutMs: params.config.voiceCall.requestTimeoutMs },
)) as VoiceCallSpeakResult;
} catch (err) {
params.logger?.warn?.(
`[google-meet] Skipped intro speech because realtime bridge was not ready: ${formatErrorMessage(err)}`,
);
spoken = { success: false };
}
if (spoken.success === false) {
params.logger?.warn?.(
`[google-meet] Skipped intro speech because realtime bridge was not ready: ${

View File

@@ -3,13 +3,16 @@ import type {
ProviderReplaySessionEntry,
ProviderSanitizeReplayHistoryContext,
} from "openclaw/plugin-sdk/plugin-entry";
import { createTestPluginApi } from "openclaw/plugin-sdk/plugin-test-api";
import {
registerProviderPlugin,
requireRegisteredProvider,
} from "openclaw/plugin-sdk/plugin-test-runtime";
import { createCapturedThinkingConfigStream } from "openclaw/plugin-sdk/provider-test-contracts";
import type { RealtimeVoiceProviderPlugin } from "openclaw/plugin-sdk/realtime-voice";
import { describe, expect, it } from "vitest";
import { registerGoogleGeminiCliProvider } from "./gemini-cli-provider.js";
import googlePlugin from "./index.js";
import { registerGoogleProvider } from "./provider-registration.js";
const googleProviderPlugin = {
@@ -226,4 +229,26 @@ describe("google provider plugin hooks", () => {
expect(googleProvider.buildReplayPolicy).toBe(cliProvider.buildReplayPolicy);
expect(googleProvider.wrapStreamFn).toBe(cliProvider.wrapStreamFn);
});
it("buffers early realtime audio while the lazy Google bridge loads", () => {
let realtimeProvider: RealtimeVoiceProviderPlugin | undefined;
googlePlugin.register(
createTestPluginApi({
registerRealtimeVoiceProvider(provider) {
realtimeProvider = provider;
},
}),
);
const bridge = realtimeProvider?.createBridge({
providerConfig: { apiKey: "gemini-key" },
onAudio() {},
onClearAudio() {},
});
expect(bridge).toBeDefined();
expect(() => bridge?.sendAudio(Buffer.alloc(160))).not.toThrow();
expect(() => bridge?.setMediaTimestamp(20)).not.toThrow();
expect(() => bridge?.sendUserMessage?.("hello")).not.toThrow();
});
});

View File

@@ -200,11 +200,18 @@ function resolveGoogleRealtimeEnvApiKey(): string | undefined {
);
}
const GOOGLE_REALTIME_LAZY_MAX_PENDING_AUDIO_CHUNKS = 320;
function createLazyGoogleRealtimeVoiceBridge(
req: RealtimeVoiceBridgeCreateRequest,
): RealtimeVoiceBridge {
let bridge: RealtimeVoiceBridge | undefined;
let bridgePromise: Promise<RealtimeVoiceBridge> | undefined;
let closed = false;
let latestMediaTimestamp: number | undefined;
let pendingGreeting: string | undefined;
const pendingAudio: Buffer[] = [];
const pendingUserMessages: string[] = [];
const loadBridge = async () => {
if (!bridgePromise) {
bridgePromise = loadGoogleRealtimeVoiceProvider().then((provider) =>
@@ -220,20 +227,78 @@ function createLazyGoogleRealtimeVoiceBridge(
}
return bridge;
};
const flushPending = (loadedBridge: RealtimeVoiceBridge) => {
if (typeof latestMediaTimestamp === "number") {
loadedBridge.setMediaTimestamp(latestMediaTimestamp);
}
for (const audio of pendingAudio.splice(0)) {
loadedBridge.sendAudio(audio);
}
for (const text of pendingUserMessages.splice(0)) {
loadedBridge.sendUserMessage?.(text);
}
if (pendingGreeting !== undefined) {
const greeting = pendingGreeting;
pendingGreeting = undefined;
loadedBridge.triggerGreeting?.(greeting);
}
};
return {
supportsToolResultContinuation: true,
connect: async () => {
await (await loadBridge()).connect();
const loadedBridge = await loadBridge();
if (closed) {
loadedBridge.close();
return;
}
await loadedBridge.connect();
flushPending(loadedBridge);
},
sendAudio: (audio) => {
if (bridge) {
bridge.sendAudio(audio);
return;
}
if (!closed) {
if (pendingAudio.length >= GOOGLE_REALTIME_LAZY_MAX_PENDING_AUDIO_CHUNKS) {
pendingAudio.shift();
}
pendingAudio.push(audio);
}
},
setMediaTimestamp: (ts) => {
latestMediaTimestamp = ts;
bridge?.setMediaTimestamp(ts);
},
sendUserMessage: (text) => {
if (bridge) {
bridge.sendUserMessage?.(text);
return;
}
if (!closed) {
pendingUserMessages.push(text);
}
},
triggerGreeting: (instructions) => {
if (bridge) {
bridge.triggerGreeting?.(instructions);
return;
}
if (!closed) {
pendingGreeting = instructions;
}
},
sendAudio: (audio) => requireBridge().sendAudio(audio),
setMediaTimestamp: (ts) => requireBridge().setMediaTimestamp(ts),
sendUserMessage: (text) => requireBridge().sendUserMessage?.(text),
triggerGreeting: (instructions) => requireBridge().triggerGreeting?.(instructions),
handleBargeIn: (options) => requireBridge().handleBargeIn?.(options),
submitToolResult: (callId, result, options) =>
requireBridge().submitToolResult(callId, result, options),
acknowledgeMark: () => requireBridge().acknowledgeMark(),
close: () => bridge?.close(),
close: () => {
closed = true;
pendingAudio.length = 0;
pendingUserMessages.length = 0;
pendingGreeting = undefined;
bridge?.close();
},
isConnected: () => bridge?.isConnected() ?? false,
};
}

View File

@@ -16,7 +16,7 @@ type MockGoogleLiveConnectParams = {
onopen: () => void;
onmessage: (message: Record<string, unknown>) => void;
onerror: (event: { error?: unknown; message?: string }) => void;
onclose: () => void;
onclose: (event?: { code?: number; reason?: string; wasClean?: boolean }) => void;
};
};
@@ -352,6 +352,47 @@ describe("buildGoogleRealtimeVoiceProvider", () => {
expect(lastConnectParams().config.sessionResumption).toEqual({ handle: "resume-1" });
});
it("reconnects unexpected Google Live closes with the latest resumption handle", async () => {
vi.useFakeTimers();
try {
const provider = buildGoogleRealtimeVoiceProvider();
const onClose = vi.fn();
const onError = vi.fn();
const bridge = provider.createBridge({
providerConfig: { apiKey: "gemini-key" },
onAudio: vi.fn(),
onClearAudio: vi.fn(),
onClose,
onError,
});
await bridge.connect();
lastConnectParams().callbacks.onmessage({
setupComplete: { sessionId: "session-1" },
sessionResumptionUpdate: { resumable: true, newHandle: "resume-1" },
});
lastConnectParams().callbacks.onclose({
code: 1011,
reason: "temporary upstream close",
wasClean: false,
});
expect(onClose).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalledWith(
expect.objectContaining({
message: expect.stringContaining("reconnecting 1/3"),
}),
);
await vi.advanceTimersByTimeAsync(250);
expect(connectMock).toHaveBeenCalledTimes(2);
expect(lastConnectParams().config.sessionResumption).toEqual({ handle: "resume-1" });
} finally {
vi.useRealTimers();
}
});
it("waits for setup completion before draining audio and firing ready", async () => {
const provider = buildGoogleRealtimeVoiceProvider();
const onReady = vi.fn();

View File

@@ -50,6 +50,9 @@ const MAX_PENDING_AUDIO_CHUNKS = 320;
const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 500;
const GOOGLE_REALTIME_BROWSER_SESSION_TTL_MS = 30 * 60 * 1000;
const GOOGLE_REALTIME_BROWSER_NEW_SESSION_TTL_MS = 60 * 1000;
const GOOGLE_REALTIME_RECONNECT_MAX_ATTEMPTS = 3;
const GOOGLE_REALTIME_RECONNECT_BASE_DELAY_MS = 250;
const GOOGLE_REALTIME_RECONNECT_MAX_DELAY_MS = 2_000;
const MULAW_LINEAR_SAMPLES = new Int16Array(256);
for (let i = 0; i < MULAW_LINEAR_SAMPLES.length; i += 1) {
@@ -401,6 +404,24 @@ function isPcm16Silence(audio: Buffer): boolean {
return true;
}
/**
 * Renders a Google Live close event as a compact "code=… reason=…[ clean=…]"
 * string for log and error messages. Missing events and partial fields are
 * tolerated: an absent event yields "code=unknown reason=unknown", a missing
 * or blank reason yields "none", and "clean=" is emitted only when wasClean
 * is an actual boolean.
 */
function formatGoogleLiveCloseEvent(
  event:
    | {
        code?: number;
        reason?: string;
        wasClean?: boolean;
      }
    | undefined,
): string {
  if (!event) {
    return "code=unknown reason=unknown";
  }
  const parts = [
    `code=${typeof event.code === "number" ? event.code : "unknown"}`,
    `reason=${event.reason?.trim() || "none"}`,
  ];
  if (typeof event.wasClean === "boolean") {
    parts.push(`clean=${event.wasClean}`);
  }
  return parts.join(" ");
}
class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
readonly supportsToolResultContinuation = true;
@@ -415,6 +436,8 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
private pendingFunctionNames = new Map<string, string>();
private readonly audioFormat: RealtimeVoiceAudioFormat;
private resumptionHandle: string | undefined;
private reconnectAttempts = 0;
private reconnectTimer: ReturnType<typeof setTimeout> | undefined;
constructor(private readonly config: GoogleRealtimeVoiceBridgeConfig) {
this.audioFormat = config.audioFormat ?? REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ;
@@ -464,13 +487,23 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
);
this.config.onError?.(error);
},
onclose: () => {
onclose: (event) => {
this.connected = false;
this.sessionConfigured = false;
this.pendingFunctionNames.clear();
const reason = this.intentionallyClosed ? "completed" : "error";
this.session = null;
this.config.onClose?.(reason);
if (this.intentionallyClosed) {
this.config.onClose?.("completed");
return;
}
const closeDetails = formatGoogleLiveCloseEvent(event);
if (this.scheduleReconnect(closeDetails)) {
return;
}
this.config.onError?.(
new Error(`Google Live session closed after reconnect attempts: ${closeDetails}`),
);
this.config.onClose?.("error");
},
},
})) as GoogleLiveSession;
@@ -596,6 +629,10 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
this.intentionallyClosed = true;
this.connected = false;
this.sessionConfigured = false;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
}
this.pendingAudio = [];
this.consecutiveSilenceMs = 0;
this.audioStreamEnded = false;
@@ -667,6 +704,7 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
private handleSetupComplete(): void {
this.sessionConfigured = true;
this.reconnectAttempts = 0;
for (const chunk of this.pendingAudio.splice(0)) {
this.sendAudio(chunk);
}
@@ -739,6 +777,36 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
});
}
}
/**
 * Schedules an exponential-backoff reconnect after an unexpected Google Live
 * close. Returns true when a retry was scheduled, false once the attempt
 * budget (GOOGLE_REALTIME_RECONNECT_MAX_ATTEMPTS) is exhausted so the caller
 * can surface a terminal error instead.
 */
private scheduleReconnect(closeDetails: string): boolean {
if (this.reconnectAttempts >= GOOGLE_REALTIME_RECONNECT_MAX_ATTEMPTS) {
return false;
}
const attempt = ++this.reconnectAttempts;
// Exponential backoff: base * 2^(attempt-1), capped at the max delay.
const delayMs = Math.min(
GOOGLE_REALTIME_RECONNECT_MAX_DELAY_MS,
GOOGLE_REALTIME_RECONNECT_BASE_DELAY_MS * 2 ** (attempt - 1),
);
// Surface the pending reconnect through onError so the host sees it as a
// recoverable condition rather than a silent stall (tests match this text).
this.config.onError?.(
new Error(
`Google Live session closed unexpectedly (${closeDetails}); reconnecting ${attempt}/${GOOGLE_REALTIME_RECONNECT_MAX_ATTEMPTS} in ${delayMs}ms`,
),
);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = undefined;
// close() may have run while the timer was pending; do not revive the session.
if (this.intentionallyClosed) {
return;
}
this.connect().catch((error: unknown) => {
const message = error instanceof Error ? error.message : String(error);
this.config.onError?.(error instanceof Error ? error : new Error(message));
// A failed reconnect attempt re-enters the backoff loop; once the budget
// is spent, report a terminal "error" close to the host.
if (!this.scheduleReconnect(`connect failed: ${message}`)) {
this.config.onClose?.("error");
}
});
}, delayMs);
return true;
}
}
function convertMulaw8kToPcm16k(muLaw: Buffer): Buffer {

View File

@@ -412,6 +412,37 @@ describe("voice-call plugin", () => {
expect(respond.mock.calls[0]?.[0]).toBe(true);
});
it("preserves explicit session keys on voicecall.start", async () => {
const { methods } = setup({ provider: "mock" });
const handler = methods.get("voicecall.start") as
| ((ctx: {
params: Record<string, unknown>;
respond: ReturnType<typeof vi.fn>;
}) => Promise<void>)
| undefined;
const respond = vi.fn();
await handler?.({
params: {
mode: "conversation",
requesterSessionKey: "agent:main:discord:channel:general",
sessionKey: "voice:google-meet:meet-1",
to: "+15550001234",
},
respond,
});
expect(runtimeStub.manager.initiateCall).toHaveBeenCalledWith(
"+15550001234",
"voice:google-meet:meet-1",
{
dtmfSequence: undefined,
message: undefined,
mode: "conversation",
requesterSessionKey: "agent:main:discord:channel:general",
},
);
expect(respond.mock.calls[0]?.[0]).toBe(true);
});
it("returns call status", async () => {
const { methods } = setup({ provider: "mock" });
const handler = methods.get("voicecall.status") as

View File

@@ -97,6 +97,11 @@ const voiceCallConfigSchema = {
help: "Controls the shared openclaw_agent_consult tool.",
advanced: true,
},
"realtime.consultPolicy": {
label: "Realtime Consult Policy",
help: "Guides when the realtime voice model should call openclaw_agent_consult.",
advanced: true,
},
"realtime.fastContext.enabled": {
label: "Enable Fast Realtime Context",
help: "Searches memory/session context before the full consult agent.",
@@ -118,6 +123,31 @@ const voiceCallConfigSchema = {
label: "Fallback To Full Consult",
advanced: true,
},
"realtime.agentContext.enabled": {
label: "Enable Agent Voice Context",
help: "Injects a compact agent identity, system prompt, and workspace context capsule into realtime voice instructions.",
advanced: true,
},
"realtime.agentContext.maxChars": {
label: "Agent Voice Context Limit",
advanced: true,
},
"realtime.agentContext.includeIdentity": {
label: "Include Agent Identity",
advanced: true,
},
"realtime.agentContext.includeSystemPrompt": {
label: "Include Agent System Prompt",
advanced: true,
},
"realtime.agentContext.includeWorkspaceFiles": {
label: "Include Agent Workspace Files",
advanced: true,
},
"realtime.agentContext.files": {
label: "Agent Voice Context Files",
advanced: true,
},
"realtime.providers": { label: "Realtime Provider Config", advanced: true },
"tts.provider": {
label: "TTS Provider Override",
@@ -152,6 +182,10 @@ const VoiceCallToolSchema = Type.Union([
to: Type.Optional(Type.String({ description: "Call target" })),
message: Type.String({ description: "Intro message" }),
mode: Type.Optional(Type.Union([Type.Literal("notify"), Type.Literal("conversation")])),
sessionKey: Type.Optional(Type.String({ description: "OpenClaw session key for the call" })),
requesterSessionKey: Type.Optional(
Type.String({ description: "OpenClaw session key that initiated the call" }),
),
dtmfSequence: Type.Optional(Type.String({ description: "DTMF digits to play before connect" })),
}),
Type.Object({
@@ -182,6 +216,10 @@ const VoiceCallToolSchema = Type.Union([
to: Type.Optional(Type.String({ description: "Call target" })),
sid: Type.Optional(Type.String({ description: "Call SID" })),
message: Type.Optional(Type.String({ description: "Optional intro message" })),
sessionKey: Type.Optional(Type.String({ description: "OpenClaw session key for the call" })),
requesterSessionKey: Type.Optional(
Type.String({ description: "OpenClaw session key that initiated the call" }),
),
dtmfSequence: Type.Optional(Type.String({ description: "DTMF digits to play before connect" })),
}),
]);
@@ -342,11 +380,14 @@ export default definePluginEntry({
message?: string;
mode?: "notify" | "conversation";
dtmfSequence?: string;
sessionKey?: string;
requesterSessionKey?: string;
}) => {
const result = await params.rt.manager.initiateCall(params.to, undefined, {
const result = await params.rt.manager.initiateCall(params.to, params.sessionKey, {
message: params.message,
mode: params.mode,
dtmfSequence: params.dtmfSequence,
...(params.requesterSessionKey ? { requesterSessionKey: params.requesterSessionKey } : {}),
});
if (!result.success) {
respondError(params.respond, result.error || "initiate failed");
@@ -413,6 +454,8 @@ export default definePluginEntry({
to,
message,
mode,
sessionKey: normalizeOptionalString(params?.sessionKey),
requesterSessionKey: normalizeOptionalString(params?.requesterSessionKey),
});
} catch (err) {
sendError(respond, err);
@@ -603,6 +646,8 @@ export default definePluginEntry({
const to = normalizeOptionalString(params?.to) ?? "";
const message = normalizeOptionalString(params?.message) ?? "";
const dtmfSequence = normalizeOptionalString(params?.dtmfSequence);
const sessionKey = normalizeOptionalString(params?.sessionKey);
const requesterSessionKey = normalizeOptionalString(params?.requesterSessionKey);
if (!to) {
respondError(respond, "to required", ErrorCodes.INVALID_REQUEST);
return;
@@ -617,6 +662,8 @@ export default definePluginEntry({
message: message || undefined,
mode,
dtmfSequence,
sessionKey,
...(requesterSessionKey ? { requesterSessionKey } : {}),
});
} catch (err) {
sendError(respond, err);
@@ -737,10 +784,17 @@ export default definePluginEntry({
if (!to) {
throw new Error("to required for call");
}
const result = await rt.manager.initiateCall(to, undefined, {
dtmfSequence: normalizeOptionalString(rawParams.dtmfSequence),
message: normalizeOptionalString(rawParams.message),
});
const result = await rt.manager.initiateCall(
to,
normalizeOptionalString(rawParams.sessionKey),
{
dtmfSequence: normalizeOptionalString(rawParams.dtmfSequence),
message: normalizeOptionalString(rawParams.message),
...(normalizeOptionalString(rawParams.requesterSessionKey)
? { requesterSessionKey: normalizeOptionalString(rawParams.requesterSessionKey) }
: {}),
},
);
if (!result.success) {
throw new Error(result.error || "initiate failed");
}

View File

@@ -148,6 +148,11 @@
"help": "Controls the shared openclaw_agent_consult tool.",
"advanced": true
},
"realtime.consultPolicy": {
"label": "Realtime Consult Policy",
"help": "Guides when the realtime voice model should call openclaw_agent_consult.",
"advanced": true
},
"realtime.fastContext.enabled": {
"label": "Enable Fast Realtime Context",
"help": "Searches memory/session context before the full consult agent.",
@@ -169,6 +174,31 @@
"label": "Fallback To Full Consult",
"advanced": true
},
"realtime.agentContext.enabled": {
"label": "Enable Agent Voice Context",
"help": "Injects a compact agent identity, system prompt, and workspace context capsule into realtime voice instructions.",
"advanced": true
},
"realtime.agentContext.maxChars": {
"label": "Agent Voice Context Limit",
"advanced": true
},
"realtime.agentContext.includeIdentity": {
"label": "Include Agent Identity",
"advanced": true
},
"realtime.agentContext.includeSystemPrompt": {
"label": "Include Agent System Prompt",
"advanced": true
},
"realtime.agentContext.includeWorkspaceFiles": {
"label": "Include Agent Workspace Files",
"advanced": true
},
"realtime.agentContext.files": {
"label": "Agent Voice Context Files",
"advanced": true
},
"realtime.providers": {
"label": "Realtime Provider Config",
"advanced": true
@@ -481,6 +511,10 @@
"type": "string",
"enum": ["safe-read-only", "owner", "none"]
},
"consultPolicy": {
"type": "string",
"enum": ["auto", "substantive", "always"]
},
"tools": {
"type": "array",
"items": {
@@ -550,6 +584,35 @@
}
}
},
"agentContext": {
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
"type": "boolean"
},
"maxChars": {
"type": "integer",
"minimum": 1
},
"includeIdentity": {
"type": "boolean"
},
"includeSystemPrompt": {
"type": "boolean"
},
"includeWorkspaceFiles": {
"type": "boolean"
},
"files": {
"type": "array",
"items": {
"type": "string",
"minLength": 1
}
}
}
},
"providers": {
"type": "object",
"additionalProperties": {

View File

@@ -388,6 +388,7 @@ describe("normalizeVoiceCallConfig", () => {
expect(normalized.streaming.providers).toEqual({});
expect(normalized.realtime.streamPath).toBe("/voice/stream/realtime");
expect(normalized.realtime.toolPolicy).toBe("safe-read-only");
expect(normalized.realtime.consultPolicy).toBe("auto");
expect(normalized.realtime.fastContext).toEqual({
enabled: false,
timeoutMs: 800,
@@ -395,6 +396,14 @@ describe("normalizeVoiceCallConfig", () => {
sources: ["memory", "sessions"],
fallbackToConsult: false,
});
expect(normalized.realtime.agentContext).toEqual({
enabled: false,
maxChars: 6000,
includeIdentity: true,
includeSystemPrompt: true,
includeWorkspaceFiles: true,
files: ["SOUL.md", "IDENTITY.md", "USER.md"],
});
expect(normalized.realtime.instructions).toContain("openclaw_agent_consult");
expect(normalized.tunnel.provider).toBe("none");
expect(normalized.webhookSecurity.allowedHosts).toEqual([]);
@@ -455,6 +464,7 @@ describe("resolveVoiceCallConfig", () => {
expect(resolved.realtime.instructions).toBe("Stay concise.");
expect(resolved.realtime.toolPolicy).toBe("safe-read-only");
expect(resolved.realtime.consultPolicy).toBe("auto");
expect(resolved.realtime.provider).toBeUndefined();
});

View File

@@ -227,6 +227,7 @@ const VoiceCallRealtimeProvidersConfigSchema = z
.default({});
const VoiceCallRealtimeToolPolicySchema = z.enum(REALTIME_VOICE_AGENT_CONSULT_TOOL_POLICIES);
const VoiceCallRealtimeConsultPolicySchema = z.enum(["auto", "substantive", "always"]);
const VoiceCallRealtimeFastContextSourceSchema = z.enum(["memory", "sessions"]);
@@ -258,6 +259,34 @@ export type VoiceCallRealtimeFastContextConfig = z.infer<
typeof VoiceCallRealtimeFastContextConfigSchema
>;
const VoiceCallRealtimeAgentContextConfigSchema = z
.object({
/** Inject a compact agent persona/context capsule into realtime voice instructions. */
enabled: z.boolean().default(false),
/** Maximum number of characters from the generated capsule to append. */
maxChars: z.number().int().positive().default(6000),
/** Include configured agent identity fields. */
includeIdentity: z.boolean().default(true),
/** Include agents.defaults/list systemPromptOverride when configured. */
includeSystemPrompt: z.boolean().default(true),
/** Include selected workspace files such as SOUL.md and IDENTITY.md. */
includeWorkspaceFiles: z.boolean().default(true),
/** Workspace-relative files to include, bounded by maxChars. */
files: z.array(z.string().min(1)).default(["SOUL.md", "IDENTITY.md", "USER.md"]),
})
.strict()
.default({
enabled: false,
maxChars: 6000,
includeIdentity: true,
includeSystemPrompt: true,
includeWorkspaceFiles: true,
files: ["SOUL.md", "IDENTITY.md", "USER.md"],
});
export type VoiceCallRealtimeAgentContextConfig = z.infer<
typeof VoiceCallRealtimeAgentContextConfigSchema
>;
const VoiceCallStreamingProvidersConfigSchema = z
.record(z.string(), z.record(z.string(), z.unknown()))
.default({});
@@ -274,10 +303,14 @@ const VoiceCallRealtimeConfigSchema = z
instructions: z.string().default(DEFAULT_VOICE_CALL_REALTIME_INSTRUCTIONS),
/** Tool policy for the shared OpenClaw agent consult tool. */
toolPolicy: VoiceCallRealtimeToolPolicySchema.default("safe-read-only"),
/** Guidance for when the realtime model should call the OpenClaw agent consult tool. */
consultPolicy: VoiceCallRealtimeConsultPolicySchema.default("auto"),
/** Tool definitions exposed to the realtime provider. */
tools: z.array(RealtimeToolSchema).default([]),
/** Low-latency memory/session context for the consult tool. */
fastContext: VoiceCallRealtimeFastContextConfigSchema,
/** Bounded agent persona/context injection for the fast realtime voice path. */
agentContext: VoiceCallRealtimeAgentContextConfigSchema,
/** Provider-owned raw config blobs keyed by provider id. */
providers: VoiceCallRealtimeProvidersConfigSchema,
})
@@ -286,6 +319,7 @@ const VoiceCallRealtimeConfigSchema = z
enabled: false,
instructions: DEFAULT_VOICE_CALL_REALTIME_INSTRUCTIONS,
toolPolicy: "safe-read-only",
consultPolicy: "auto",
tools: [],
fastContext: {
enabled: false,
@@ -294,6 +328,14 @@ const VoiceCallRealtimeConfigSchema = z
sources: ["memory", "sessions"],
fallbackToConsult: false,
},
agentContext: {
enabled: false,
maxChars: 6000,
includeIdentity: true,
includeSystemPrompt: true,
includeWorkspaceFiles: true,
files: ["SOUL.md", "IDENTITY.md", "USER.md"],
},
providers: {},
});
export type VoiceCallRealtimeConfig = z.infer<typeof VoiceCallRealtimeConfigSchema>;
@@ -606,6 +648,11 @@ export function normalizeVoiceCallConfig(config: VoiceCallConfigInput): VoiceCal
...config.realtime?.fastContext,
sources: config.realtime?.fastContext?.sources ?? defaults.realtime.fastContext.sources,
};
const realtimeAgentContext = {
...defaults.realtime.agentContext,
...config.realtime?.agentContext,
files: config.realtime?.agentContext?.files ?? defaults.realtime.agentContext.files,
};
return {
...defaults,
...config,
@@ -640,6 +687,7 @@ export function normalizeVoiceCallConfig(config: VoiceCallConfigInput): VoiceCal
tools:
(config.realtime?.tools as RealtimeToolConfig[] | undefined) ?? defaults.realtime.tools,
fastContext: realtimeFastContext,
agentContext: realtimeAgentContext,
providers: realtimeProviders,
},
tts: normalizeVoiceCallTtsConfig(defaults.tts, config.tts),

View File

@@ -123,6 +123,7 @@ export async function initiateCall(
const initialMessage = opts.message;
const mode = opts.mode ?? ctx.config.outbound.defaultMode;
const dtmfSequence = opts.dtmfSequence;
const requesterSessionKey = opts.requesterSessionKey?.trim();
if (dtmfSequence) {
const validationError = validateDtmfDigits(dtmfSequence);
if (validationError) {
@@ -178,6 +179,7 @@ export async function initiateCall(
metadata: {
...(initialMessage && { initialMessage }),
mode,
...(requesterSessionKey ? { requesterSessionKey } : {}),
},
};

View File

@@ -0,0 +1,101 @@
import { mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { VoiceCallConfig } from "./config.js";
import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js";
import { buildRealtimeVoiceInstructions } from "./realtime-agent-context.js";
import { createVoiceCallBaseConfig } from "./test-fixtures.js";
const tempDirs: string[] = [];
afterEach(async () => {
await Promise.all(tempDirs.splice(0).map((dir) => rm(dir, { recursive: true, force: true })));
});
/**
 * Creates a unique temporary agent workspace directory and registers it in
 * `tempDirs` so the afterEach hook cleans it up.
 */
async function createWorkspace(): Promise<string> {
  const workspaceDir = await mkdtemp(path.join(tmpdir(), "openclaw-voice-context-"));
  tempDirs.push(workspaceDir);
  return workspaceDir;
}
/**
 * Builds a VoiceCallConfig fixture for the "voice" agent with realtime
 * enabled, deep-merging `overrides` into the realtime section.
 *
 * The `fastContext`/`agentContext` sub-objects are merged field-wise (and
 * their array fields defaulted explicitly) so partial overrides keep the
 * remaining fixture defaults instead of replacing the whole sub-object.
 */
function createConfig(overrides?: Partial<VoiceCallConfig["realtime"]>): VoiceCallConfig {
  const config = createVoiceCallBaseConfig();
  config.agentId = "voice";
  config.realtime.enabled = true;
  config.realtime.instructions = "Base voice instructions.";
  config.realtime = {
    ...config.realtime,
    ...overrides,
    fastContext: {
      ...config.realtime.fastContext,
      ...overrides?.fastContext,
      sources: overrides?.fastContext?.sources ?? config.realtime.fastContext.sources,
    },
    agentContext: {
      ...config.realtime.agentContext,
      ...overrides?.agentContext,
      files: overrides?.agentContext?.files ?? config.realtime.agentContext.files,
    },
    tools: overrides?.tools ?? config.realtime.tools,
    providers: overrides?.providers ?? config.realtime.providers,
  };
  return config;
}
/**
 * Stubs the core agent runtime with a fixed identity and the given workspace
 * directory, so instruction building can be tested without a real agent.
 */
function createAgentRuntime(workspaceDir: string): CoreAgentDeps {
  return {
    resolveAgentIdentity: vi.fn(() => ({
      name: "Claw Voice",
      emoji: ":claw:",
      theme: "bright",
      vibe: "snappy",
      creature: "operator",
    })),
    resolveAgentWorkspaceDir: vi.fn(() => workspaceDir),
    // Only the two members exercised by buildRealtimeVoiceInstructions are
    // stubbed; the double-cast papers over the rest of CoreAgentDeps.
  } as unknown as CoreAgentDeps;
}
describe("buildRealtimeVoiceInstructions", () => {
  it("injects bounded identity, system prompt, and workspace context", async () => {
    const workspaceDir = await createWorkspace();
    // Two allowed workspace files, plus a file that is only reachable through
    // a "../" traversal entry and therefore must never appear in the output.
    await writeFile(path.join(workspaceDir, "SOUL.md"), "Stay quick, direct, and warm.\n");
    await writeFile(path.join(workspaceDir, "IDENTITY.md"), "Name: Claw Voice\nVibe: snappy\n");
    await writeFile(path.join(workspaceDir, "SECRET.md"), "do not include\n");
    const coreConfig = {
      agents: {
        list: [{ id: "voice", systemPromptOverride: "Keep spoken answers short." }],
      },
    } as CoreConfig;
    const instructions = await buildRealtimeVoiceInstructions({
      baseInstructions: "Base voice instructions.",
      config: createConfig({
        consultPolicy: "substantive",
        agentContext: {
          enabled: true,
          maxChars: 2000,
          includeIdentity: true,
          includeSystemPrompt: true,
          includeWorkspaceFiles: true,
          // "../SECRET.md" exercises the workspace-relative path guard.
          files: ["SOUL.md", "IDENTITY.md", "../SECRET.md"],
        },
      }),
      coreConfig,
      agentRuntime: createAgentRuntime(workspaceDir),
    });
    // Capsule header plus the "substantive" consult-policy guidance.
    expect(instructions).toContain("OpenClaw agent voice context:");
    expect(instructions).toContain("Consult behavior:");
    expect(instructions).toContain("Call openclaw_agent_consult before answering requests");
    // Identity lines from the stubbed runtime.
    expect(instructions).toContain("- Agent id: voice");
    expect(instructions).toContain("- Name: Claw Voice");
    expect(instructions).toContain("- Vibe: snappy");
    // Per-agent system prompt override from coreConfig.
    expect(instructions).toContain("Keep spoken answers short.");
    // Workspace files rendered as markdown sections.
    expect(instructions).toContain("### SOUL.md");
    expect(instructions).toContain("Stay quick, direct, and warm.");
    expect(instructions).toContain("### IDENTITY.md");
    // Traversal entry was rejected, so SECRET.md content is absent.
    expect(instructions).not.toContain("do not include");
  });
});

View File

@@ -0,0 +1,177 @@
import { readFile } from "node:fs/promises";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import type { VoiceCallConfig } from "./config.js";
import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js";
type AgentEntryLike = {
id?: unknown;
systemPromptOverride?: unknown;
};
type VoiceIdentityLike = {
name?: unknown;
emoji?: unknown;
theme?: unknown;
creature?: unknown;
vibe?: unknown;
};
/** Returns the trimmed string when `value` is a non-blank string, else undefined. */
function normalizeString(value: unknown): string | undefined {
  if (typeof value !== "string") {
    return undefined;
  }
  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : undefined;
}
/** Extracts configured agent entries, tolerating missing or malformed config shapes. */
function readAgentEntries(cfg: CoreConfig): AgentEntryLike[] {
  const list = (cfg as { agents?: { list?: unknown } }).agents?.list;
  if (!Array.isArray(list)) {
    return [];
  }
  return list.filter(
    (entry): entry is AgentEntryLike => entry !== null && typeof entry === "object",
  );
}
/**
 * Resolves the system prompt override for `agentId`: the matching per-agent
 * entry wins; otherwise the agents-wide default override is used, if any.
 */
function resolveAgentSystemPromptOverride(cfg: CoreConfig, agentId: string): string | undefined {
  const match = readAgentEntries(cfg).find(
    (candidate) => normalizeString(candidate.id) === agentId,
  );
  const perAgent = normalizeString(match?.systemPromptOverride);
  if (perAgent) {
    return perAgent;
  }
  const defaults = (cfg as { agents?: { defaults?: { systemPromptOverride?: unknown } } }).agents
    ?.defaults;
  return normalizeString(defaults?.systemPromptOverride);
}
/**
 * Guards workspace file reads: accepts only non-blank relative paths that
 * cannot escape the workspace (no `..` segments after normalization) and
 * contain no NUL bytes.
 */
function isSafeWorkspaceRelativeFile(file: string): boolean {
  if (file.trim().length === 0) {
    return false;
  }
  if (path.isAbsolute(file)) {
    return false;
  }
  const normalized = path.normalize(file);
  if (normalized === "." || normalized.includes("\0")) {
    return false;
  }
  return normalized.split(/[\\/]+/).every((segment) => segment !== "..");
}
/**
 * Truncates `text` so the result never exceeds `maxChars`, appending a
 * `[truncated]` marker when content was dropped.
 *
 * Fix: the previous version unconditionally appended the 12-character marker
 * after slicing to `maxChars - 32`, so for small budgets (maxChars < 32 —
 * reachable via the computed per-file budget in readWorkspaceVoiceContextFiles)
 * the result could be LONGER than `maxChars`. The result is now always
 * bounded by `maxChars`.
 */
function limitText(text: string, maxChars: number): string {
  if (text.length <= maxChars) {
    return text;
  }
  const marker = "\n[truncated]";
  if (maxChars <= marker.length) {
    // Budget too small to fit any content plus the marker; hard-cut instead.
    return text.slice(0, Math.max(0, maxChars));
  }
  // Reserve exactly enough room for the marker; trim trailing whitespace so
  // the marker does not dangle after a space (matches original behavior).
  return `${text.slice(0, maxChars - marker.length).trimEnd()}${marker}`;
}
/**
 * Reads the configured workspace context files and renders each as a
 * `### <file>` markdown section, enforcing a shared character budget.
 *
 * Files are read sequentially (not in parallel) so the running `remaining`
 * budget can be charged in listed order. Unsafe paths, unreadable files, and
 * empty files are skipped silently — this is best-effort context, never fatal.
 */
async function readWorkspaceVoiceContextFiles(params: {
  workspaceDir: string;
  files: readonly string[];
  maxChars: number;
}): Promise<string[]> {
  const sections: string[] = [];
  let remaining = params.maxChars;
  for (const file of params.files) {
    // Stop charging once the budget is spent; reject traversal/absolute paths.
    if (remaining <= 0 || !isSafeWorkspaceRelativeFile(file)) {
      continue;
    }
    const fullPath = path.join(params.workspaceDir, path.normalize(file));
    // Best-effort read: a missing or unreadable file is treated as absent.
    const content = await readFile(fullPath, "utf8").catch(() => undefined);
    const trimmed = content?.trim();
    if (!trimmed) {
      continue;
    }
    // Reserve headroom for the section header and joiners before truncating.
    const body = limitText(trimmed, Math.max(0, remaining - file.length - 16));
    const section = `### ${file}\n${body}`;
    sections.push(section);
    remaining -= section.length;
  }
  return sections;
}
/**
 * Builds the consult-policy instruction block for the realtime prompt, or
 * returns undefined when tools are disabled entirely or the consult decision
 * is left to the model ("auto").
 */
function buildConsultPolicyGuidance(
  config: Pick<VoiceCallConfig["realtime"], "consultPolicy" | "toolPolicy">,
): string | undefined {
  const toolsDisabled = config.toolPolicy === "none";
  if (toolsDisabled || config.consultPolicy === "auto") {
    return undefined;
  }
  const lines =
    config.consultPolicy === "always"
      ? [
          "Consult behavior:",
          "- Call openclaw_agent_consult before every substantive answer.",
          "- You may answer directly only for greetings, acknowledgements, brief latency tests, or filler while waiting for the consult result.",
          "- After the consult result arrives, speak that result concisely.",
        ]
      : [
          "Consult behavior:",
          "- Answer directly for greetings, acknowledgements, simple conversational glue, and brief latency tests.",
          "- Call openclaw_agent_consult before answering requests that need facts, memory, current information, tools, workspace state, or the user's OpenClaw-specific context.",
          "- Keep spoken replies concise and natural.",
        ];
  return lines.join("\n");
}
/**
 * Assembles the realtime voice model's instruction string.
 *
 * Layout (joined by blank lines): the provider base instructions, the
 * consult-policy guidance (when the policy is not "auto"), and — only when
 * agentContext is enabled — a bounded "capsule" of agent identity, system
 * prompt override, and workspace file excerpts. The capsule as a whole is
 * clamped to `agentContext.maxChars`.
 *
 * @param params.baseInstructions Provider-configured base instructions.
 * @param params.config Voice-call plugin config (realtime section drives everything here).
 * @param params.coreConfig Core OpenClaw config, used for agent entries/defaults.
 * @param params.agentRuntime Core runtime deps used to resolve identity and workspace dir.
 * @returns The final instruction text for the realtime session.
 */
export async function buildRealtimeVoiceInstructions(params: {
  baseInstructions: string;
  config: VoiceCallConfig;
  coreConfig: CoreConfig;
  agentRuntime: CoreAgentDeps;
}): Promise<string> {
  const { config } = params;
  const sections: string[] = [params.baseInstructions];
  const consultGuidance = buildConsultPolicyGuidance(config.realtime);
  if (consultGuidance) {
    sections.push(consultGuidance);
  }
  const contextConfig = config.realtime.agentContext;
  if (!contextConfig.enabled) {
    // Agent context disabled: just base instructions + optional guidance.
    return sections.filter(Boolean).join("\n\n");
  }
  const agentId = config.agentId ?? "main";
  const capsule: string[] = [
    "OpenClaw agent voice context:",
    `- Agent id: ${agentId}`,
    "- Use this context to match the OpenClaw agent's personality and standing preferences on fast voice turns.",
    "- Treat this as compact context only; call openclaw_agent_consult when the caller needs the full agent brain, tools, memory, or workspace state.",
  ];
  if (contextConfig.includeIdentity) {
    // Identity shape is loosely typed; every field is re-validated through
    // normalizeString before being rendered.
    const identity = params.agentRuntime.resolveAgentIdentity(
      params.coreConfig as OpenClawConfig,
      agentId,
    ) as VoiceIdentityLike | undefined;
    const identityLines = [
      normalizeString(identity?.name) ? `- Name: ${normalizeString(identity?.name)}` : undefined,
      normalizeString(identity?.emoji) ? `- Emoji: ${normalizeString(identity?.emoji)}` : undefined,
      normalizeString(identity?.vibe) ? `- Vibe: ${normalizeString(identity?.vibe)}` : undefined,
      normalizeString(identity?.theme) ? `- Theme: ${normalizeString(identity?.theme)}` : undefined,
      normalizeString(identity?.creature)
        ? `- Creature/persona: ${normalizeString(identity?.creature)}`
        : undefined,
    ].filter(Boolean);
    if (identityLines.length > 0) {
      capsule.push(`Configured identity:\n${identityLines.join("\n")}`);
    }
  }
  if (contextConfig.includeSystemPrompt) {
    const systemPrompt = resolveAgentSystemPromptOverride(params.coreConfig, agentId);
    if (systemPrompt) {
      capsule.push(`Configured system prompt override:\n${systemPrompt}`);
    }
  }
  if (contextConfig.includeWorkspaceFiles) {
    const workspaceDir = params.agentRuntime.resolveAgentWorkspaceDir(
      params.coreConfig as OpenClawConfig,
      agentId,
    );
    const fileSections = await readWorkspaceVoiceContextFiles({
      workspaceDir,
      files: contextConfig.files,
      maxChars: contextConfig.maxChars,
    });
    if (fileSections.length > 0) {
      capsule.push(`Workspace voice context:\n${fileSections.join("\n\n")}`);
    }
  }
  // Clamp the whole capsule (not the base instructions) to the budget.
  sections.push(limitText(capsule.join("\n\n"), contextConfig.maxChars));
  return sections.filter(Boolean).join("\n\n");
}

View File

@@ -347,6 +347,7 @@ describe("createVoiceCallRuntime lifecycle", () => {
direction: "outbound",
from: "+15550001234",
to: "+15550009999",
metadata: { requesterSessionKey: "agent:main:discord:channel:general" },
transcript: [{ speaker: "user", text: "Can you check shipment status?" }],
});
@@ -384,6 +385,7 @@ describe("createVoiceCallRuntime lifecycle", () => {
expect(runEmbeddedPiAgent).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "voice:15550009999",
spawnedBy: "agent:main:discord:channel:general",
messageProvider: "voice",
lane: "voice",
provider: "openai",

View File

@@ -20,6 +20,7 @@ import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js";
import { CallManager } from "./manager.js";
import type { VoiceCallProvider } from "./providers/base.js";
import type { TwilioProvider } from "./providers/twilio.js";
import { buildRealtimeVoiceInstructions } from "./realtime-agent-context.js";
import { resolveRealtimeFastContextConsult } from "./realtime-fast-context.js";
import { resolveVoiceResponseModel } from "./response-model.js";
import type { TelephonyTtsRuntime } from "./telephony-tts.js";
@@ -60,8 +61,9 @@ type RealtimeVoiceRuntimeModule = typeof import("./realtime-voice.runtime.js");
type RealtimeHandlerModule = typeof import("./webhook/realtime-handler.js");
const REALTIME_VOICE_CONSULT_SYSTEM_PROMPT = [
"You are a behind-the-scenes consultant for a live phone voice agent.",
"Prioritize a fast, speakable answer over exhaustive investigation.",
"You are the configured OpenClaw agent receiving delegated requests from a live phone voice bridge.",
"Act on behalf of the caller using the normal available tools when the caller asks you to do work.",
"Prioritize completing the user's request and returning a fast, speakable result over exhaustive investigation.",
"For tool-backed status checks, prefer one or two bounded read-only queries before answering.",
"Do not print secret values or dump environment variables; only check whether required configuration is present.",
"Be accurate, brief, and speakable.",
@@ -317,8 +319,15 @@ export async function createVoiceCallRuntime(params: {
);
if (realtimeProvider) {
const { RealtimeCallHandler } = await loadRealtimeHandler();
const realtimeInstructions = await buildRealtimeVoiceInstructions({
baseInstructions: config.realtime.instructions,
config,
coreConfig,
agentRuntime,
});
const realtimeConfig = {
...config.realtime,
instructions: realtimeInstructions,
tools: resolveRealtimeVoiceAgentConsultTools(
config.realtime.toolPolicy,
config.realtime.tools,
@@ -350,6 +359,10 @@ export async function createVoiceCallRuntime(params: {
...call,
config: effectiveConfig,
});
const requesterSessionKey =
typeof call.metadata?.requesterSessionKey === "string"
? call.metadata.requesterSessionKey
: undefined;
const fastContext = await resolveRealtimeFastContextConsult({
cfg,
agentId,
@@ -389,6 +402,8 @@ export async function createVoiceCallRuntime(params: {
model,
thinkLevel,
timeoutMs: effectiveConfig.responseTimeoutMs,
spawnedBy: requesterSessionKey,
contextMode: requesterSessionKey ? "fork" : undefined,
toolsAllow: resolveRealtimeVoiceAgentConsultToolsAllow(
effectiveConfig.realtime.toolPolicy,
),

View File

@@ -51,6 +51,7 @@ export function createVoiceCallBaseConfig(params?: {
streamPath: "/voice/stream/realtime",
instructions: DEFAULT_VOICE_CALL_REALTIME_INSTRUCTIONS,
toolPolicy: "safe-read-only",
consultPolicy: "auto",
tools: [],
fastContext: {
enabled: false,
@@ -59,6 +60,14 @@ export function createVoiceCallBaseConfig(params?: {
sources: ["memory", "sessions"],
fallbackToConsult: false,
},
agentContext: {
enabled: false,
maxChars: 6000,
includeIdentity: true,
includeSystemPrompt: true,
includeWorkspaceFiles: true,
files: ["SOUL.md", "IDENTITY.md", "USER.md"],
},
providers: {},
},
skipSignatureVerification: false,

View File

@@ -288,4 +288,6 @@ export type OutboundCallOptions = {
mode?: CallMode;
/** DTMF digits to send after the call is connected */
dtmfSequence?: string;
/** Session that initiated the call, used for agent context/delegated message routing */
requesterSessionKey?: string;
};

View File

@@ -83,6 +83,11 @@ const createConfig = (overrides: VoiceCallConfigInput = {}): VoiceCallConfig =>
...overrides.realtime?.fastContext,
sources: overrides.realtime?.fastContext?.sources ?? base.realtime.fastContext.sources,
},
agentContext: {
...base.realtime.agentContext,
...overrides.realtime?.agentContext,
files: overrides.realtime?.agentContext?.files ?? base.realtime.agentContext.files,
},
providers: overrides.realtime?.providers ?? base.realtime.providers,
},
};

View File

@@ -1,9 +1,9 @@
const TELEPHONY_SAMPLE_RATE = 8_000;
const TELEPHONY_CHUNK_BYTES = 160;
const TELEPHONY_CHUNK_MS = 20;
const DEFAULT_SPEECH_RMS_THRESHOLD = 0.02;
const DEFAULT_REQUIRED_LOUD_CHUNKS = 2;
const DEFAULT_REQUIRED_QUIET_CHUNKS = 10;
const DEFAULT_SPEECH_RMS_THRESHOLD = 0.035;
const DEFAULT_REQUIRED_LOUD_CHUNKS = 4;
const DEFAULT_REQUIRED_QUIET_CHUNKS = 12;
const DEFAULT_MAX_QUEUED_AUDIO_BYTES = TELEPHONY_SAMPLE_RATE * 120;
const PCM16_MAX_AMPLITUDE = 32768;
const MULAW_LINEAR_SAMPLES = new Int16Array(256);
@@ -69,14 +69,16 @@ export class RealtimeTwilioAudioPacer {
this.ensurePump();
}
clearAudio(): void {
clearAudio(): number {
if (this.closed) {
return;
return 0;
}
const clearedAudioBytes = this.queuedAudioBytes;
this.clearTimer();
this.queue = [];
this.queuedAudioBytes = 0;
this.params.sendJson({ event: "clear", streamSid: this.params.streamSid });
return clearedAudioBytes;
}
close(): void {

View File

@@ -59,6 +59,7 @@ function makeHandler(
streamPath: overrides?.streamPath ?? "/voice/stream/realtime",
instructions: overrides?.instructions ?? "Be helpful.",
toolPolicy: overrides?.toolPolicy ?? "safe-read-only",
consultPolicy: overrides?.consultPolicy ?? "auto",
tools: overrides?.tools ?? [],
fastContext: overrides?.fastContext ?? {
enabled: false,
@@ -67,6 +68,14 @@ function makeHandler(
sources: ["memory", "sessions"],
fallbackToConsult: false,
},
agentContext: overrides?.agentContext ?? {
enabled: false,
maxChars: 6000,
includeIdentity: true,
includeSystemPrompt: true,
includeWorkspaceFiles: true,
files: ["SOUL.md", "IDENTITY.md", "USER.md"],
},
providers: overrides?.providers ?? {},
...(overrides?.provider ? { provider: overrides.provider } : {}),
};
@@ -337,7 +346,7 @@ describe("RealtimeCallHandler path routing", () => {
}
});
it("marks realtime calls ended when the provider closes normally", async () => {
it("ends realtime calls when the telephony stream stops", async () => {
let callbacks:
| {
onClose?: (reason: "completed" | "error") => void;
@@ -488,7 +497,9 @@ describe("RealtimeCallHandler path routing", () => {
name: "openclaw_agent_consult",
args: { question: "Are the basement lights on?" },
});
expect(receivedPartialTranscript).toBe("Are the basement");
await vi.waitFor(() => {
expect(receivedPartialTranscript).toBe("Are the basement");
});
await vi.waitFor(() => {
expect(submitToolResult).toHaveBeenCalledWith(
@@ -538,6 +549,346 @@ describe("RealtimeCallHandler path routing", () => {
}
});
it("forces an agent consult from final user transcript when consult policy is always", async () => {
let callbacks:
| {
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
}
| undefined;
const sendUserMessage = vi.fn();
const bridge = makeBridge({ sendUserMessage });
const createBridge = vi.fn(
(request: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0]) => {
callbacks = request;
return bridge;
},
);
const handler = makeHandler(
{ consultPolicy: "always" },
{
manager: {
getCallByProviderCallId: vi.fn(
(): CallRecord => ({
callId: "call-1",
providerCallId: "CA-force",
provider: "twilio",
direction: "inbound",
state: "ringing",
from: "+15550001234",
to: "+15550009999",
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {},
}),
),
},
realtimeProvider: makeRealtimeProvider(createBridge),
},
);
const consult = vi.fn(async () => ({ text: "I created the smoke test file." }));
handler.registerToolHandler("openclaw_agent_consult", consult);
const server = await startRealtimeServer(handler);
try {
const ws = await connectWs(server.url);
try {
ws.send(
JSON.stringify({
event: "start",
start: { streamSid: "MZ-force", callSid: "CA-force" },
}),
);
await vi.waitFor(() => {
expect(createBridge).toHaveBeenCalled();
});
callbacks?.onTranscript?.("user", "Create a smoke test file for me.", true);
await vi.waitFor(() => {
expect(consult).toHaveBeenCalledWith(
expect.objectContaining({
question: "Create a smoke test file for me.",
}),
"call-1",
{},
);
});
await vi.waitFor(() => {
expect(sendUserMessage).toHaveBeenCalledWith(
expect.stringContaining("I created the smoke test file."),
);
});
} finally {
if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
ws.close();
}
}
} finally {
await server.close();
}
});
it("does not carry a final transcript into the next direct voice turn", async () => {
let callbacks:
| {
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
}
| undefined;
const processEvent = vi.fn();
const createBridge = vi.fn(
(request: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0]) => {
callbacks = request;
return makeBridge();
},
);
const handler = makeHandler(undefined, {
manager: {
processEvent,
getCallByProviderCallId: vi.fn(
(): CallRecord => ({
callId: "call-1",
providerCallId: "CA-direct-turns",
provider: "twilio",
direction: "inbound",
state: "ringing",
from: "+15550001234",
to: "+15550009999",
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {},
}),
),
},
realtimeProvider: makeRealtimeProvider(createBridge),
});
const server = await startRealtimeServer(handler);
try {
const ws = await connectWs(server.url);
try {
ws.send(
JSON.stringify({
event: "start",
start: { streamSid: "MZ-direct-turns", callSid: "CA-direct-turns" },
}),
);
await vi.waitFor(() => {
expect(createBridge).toHaveBeenCalled();
});
callbacks?.onTranscript?.("user", "Hello there.", true);
callbacks?.onTranscript?.("user", "How are you?", true);
expect(processEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: "call.speech",
transcript: "Hello there.",
}),
);
expect(processEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: "call.speech",
transcript: "How are you?",
}),
);
expect(processEvent).not.toHaveBeenCalledWith(
expect.objectContaining({
type: "call.speech",
transcript: "Hello there. How are you?",
}),
);
} finally {
if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
ws.close();
}
}
} finally {
await server.close();
}
});
it("waits for partial transcript fragments to settle before consulting", async () => {
let callbacks:
| {
onToolCall?: (event: RealtimeVoiceToolCallEvent) => void;
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
}
| undefined;
const submitToolResult = vi.fn();
const bridge = makeBridge({
supportsToolResultContinuation: true,
submitToolResult,
});
const createBridge = vi.fn(
(request: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0]) => {
callbacks = request;
return bridge;
},
);
const handler = makeHandler(undefined, {
manager: {
getCallByProviderCallId: vi.fn(
(): CallRecord => ({
callId: "call-1",
providerCallId: "CA-settle",
provider: "twilio",
direction: "inbound",
state: "ringing",
from: "+15550001234",
to: "+15550009999",
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {},
}),
),
},
realtimeProvider: makeRealtimeProvider(createBridge),
});
const consult = vi.fn(async () => ({ text: "I sent it." }));
handler.registerToolHandler("openclaw_agent_consult", consult);
const server = await startRealtimeServer(handler);
try {
const ws = await connectWs(server.url);
try {
ws.send(
JSON.stringify({
event: "start",
start: { streamSid: "MZ-settle", callSid: "CA-settle" },
}),
);
await vi.waitFor(() => {
expect(createBridge).toHaveBeenCalled();
});
callbacks?.onTranscript?.("user", "Send a Discord", false);
callbacks?.onToolCall?.({
itemId: "item-1",
callId: "consult-call",
name: "openclaw_agent_consult",
args: { question: "message" },
});
await new Promise((resolve) => setTimeout(resolve, 50));
callbacks?.onTranscript?.("user", "message.", false);
await vi.waitFor(
() => {
expect(consult).toHaveBeenCalledWith(
expect.objectContaining({
question: "Send a Discord message.",
context: expect.stringContaining("shorter consult question: message"),
}),
"call-1",
{ partialUserTranscript: "Send a Discord message." },
);
},
{ timeout: 2_000 },
);
await vi.waitFor(() => {
expect(submitToolResult).toHaveBeenLastCalledWith(
"consult-call",
{ text: "I sent it." },
undefined,
);
});
} finally {
if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
ws.close();
}
}
} finally {
await server.close();
}
});
it("does not force a duplicate consult when the realtime provider calls the consult tool", async () => {
let callbacks:
| {
onToolCall?: (event: RealtimeVoiceToolCallEvent) => void;
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
}
| undefined;
const submitToolResult = vi.fn();
const bridge = makeBridge({
supportsToolResultContinuation: true,
submitToolResult,
});
const createBridge = vi.fn(
(request: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0]) => {
callbacks = request;
return bridge;
},
);
const handler = makeHandler(
{ consultPolicy: "always" },
{
manager: {
getCallByProviderCallId: vi.fn(
(): CallRecord => ({
callId: "call-1",
providerCallId: "CA-native",
provider: "twilio",
direction: "inbound",
state: "ringing",
from: "+15550001234",
to: "+15550009999",
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {},
}),
),
},
realtimeProvider: makeRealtimeProvider(createBridge),
},
);
const consult = vi.fn(async () => ({ text: "Native consult result." }));
handler.registerToolHandler("openclaw_agent_consult", consult);
const server = await startRealtimeServer(handler);
try {
const ws = await connectWs(server.url);
try {
ws.send(
JSON.stringify({
event: "start",
start: { streamSid: "MZ-native", callSid: "CA-native" },
}),
);
await vi.waitFor(() => {
expect(createBridge).toHaveBeenCalled();
});
callbacks?.onTranscript?.("user", "Send me a Discord message.", true);
callbacks?.onToolCall?.({
itemId: "item-1",
callId: "consult-call",
name: "openclaw_agent_consult",
args: { question: "Send me a Discord message." },
});
await vi.waitFor(() => {
expect(submitToolResult).toHaveBeenLastCalledWith(
"consult-call",
{ text: "Native consult result." },
undefined,
);
});
await new Promise((resolve) => setTimeout(resolve, 250));
expect(consult).toHaveBeenCalledTimes(1);
} finally {
if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
ws.close();
}
}
} finally {
await server.close();
}
});
it("does not submit an interim checking result when fast context is enabled", async () => {
let callbacks:
| {

View File

@@ -34,6 +34,13 @@ const STREAM_TOKEN_TTL_MS = 30_000;
const DEFAULT_HOST = "localhost:8443";
const MAX_REALTIME_MESSAGE_BYTES = 256 * 1024;
const MAX_REALTIME_WS_BUFFERED_BYTES = 1024 * 1024;
const FORCED_CONSULT_FALLBACK_DELAY_MS = 200;
const FORCED_CONSULT_NATIVE_DEDUPE_MS = 2_000;
const FORCED_CONSULT_RESULT_MAX_CHARS = 1800;
const CONSULT_TRANSCRIPT_SETTLE_MS = 350;
const CONSULT_TRANSCRIPT_SETTLE_MAX_MS = 1_000;
const MAX_PARTIAL_USER_TRANSCRIPT_CHARS = 1_200;
const RECENT_FINAL_USER_TRANSCRIPT_TTL_MS = 2_000;
function normalizePath(pathname: string): string {
const trimmed = pathname.trim();
@@ -62,6 +69,147 @@ function buildGreetingInstructions(
: `${intro} "${trimmedGreeting}"`;
}
/**
 * Extracts speakable text from a tool result: a bare string, or a plain
 * object's `text` field (preferred) falling back to `output`. Blank or
 * non-string values yield undefined.
 */
function readSpeakableToolResultText(result: unknown): string | undefined {
  const pick = (value: unknown): string | undefined => {
    if (typeof value !== "string") {
      return undefined;
    }
    const trimmed = value.trim();
    return trimmed ? trimmed : undefined;
  };
  if (typeof result === "string") {
    return pick(result);
  }
  if (result === null || typeof result !== "object" || Array.isArray(result)) {
    return undefined;
  }
  const record = result as { text?: unknown; output?: unknown };
  return pick(record.text) ?? pick(record.output);
}
/** Reads a named string argument from a consult tool-call payload, if non-blank. */
function readConsultArgText(args: unknown, key: string): string | undefined {
  if (args === null || typeof args !== "object" || Array.isArray(args)) {
    return undefined;
  }
  const raw = (args as Record<string, unknown>)[key];
  if (typeof raw !== "string") {
    return undefined;
  }
  const trimmed = raw.trim();
  return trimmed.length > 0 ? trimmed : undefined;
}
/** Resolves the consult question from any of the accepted argument aliases. */
function readConsultQuestionText(args: unknown): string | undefined {
  for (const key of ["question", "prompt", "query", "task"]) {
    const value = readConsultArgText(args, key);
    if (value) {
      return value;
    }
  }
  return undefined;
}
/** Collapses whitespace runs to single spaces and trims both ends. */
function normalizeTranscriptText(text: string): string {
  return text.trim().split(/\s+/).filter(Boolean).join(" ");
}
/**
 * Returns the length of the longest suffix of `base` that is also a prefix of
 * `next` (0 when they do not overlap). Used to stitch overlapping ASR
 * transcript fragments together.
 */
function findTextOverlap(base: string, next: string): number {
  let size = Math.min(base.length, next.length);
  while (size > 0) {
    if (next.startsWith(base.slice(base.length - size))) {
      return size;
    }
    size -= 1;
  }
  return 0;
}
/**
 * Decides whether a space belongs between two transcript fragments: no space
 * after trailing whitespace or opening punctuation, and no space before
 * whitespace or closing/joining punctuation.
 */
function shouldInsertTranscriptSpace(base: string, next: string): boolean {
  if (base.length === 0 || next.length === 0) {
    return false;
  }
  const openers = new Set(["(", "[", "{", '"', "'"]);
  const last = base.slice(-1);
  if (/\s/.test(last) || openers.has(last)) {
    return false;
  }
  return !/^[\s,.;:!?)]/.test(next);
}
/**
 * Appends an ASR transcript fragment to the accumulated text, de-duplicating
 * repeated or overlapping recognizer output.
 *
 * Matching is case-insensitive, checked in order: exact repeat or fragment
 * already covered by the accumulator's tail (keep accumulator); fragment
 * restates the whole accumulator and extends it (take fragment); partial
 * suffix/prefix overlap (stitch); otherwise plain concatenation with a space
 * when punctuation rules call for one.
 */
function appendTranscriptText(base: string | undefined, fragment: string): string {
  const next = normalizeTranscriptText(fragment);
  if (!next) {
    return base ?? "";
  }
  const current = normalizeTranscriptText(base ?? "");
  if (!current) {
    return next;
  }
  const currentLower = current.toLowerCase();
  const nextLower = next.toLowerCase();
  // Fragment is a repeat of (the tail of) what we already have.
  if (currentLower === nextLower || currentLower.endsWith(nextLower)) {
    return current;
  }
  // Fragment restates the full accumulator and extends it — take the fragment.
  if (nextLower.startsWith(currentLower)) {
    return next;
  }
  // Partial overlap: require 6+ shared chars in general, or 3+ only for very
  // short fragments (<= 12 chars); smaller overlaps are likely coincidence.
  const overlap = findTextOverlap(currentLower, nextLower);
  if (overlap >= 6 || (overlap >= 3 && next.length <= 12)) {
    return `${current}${next.slice(overlap)}`.trim();
  }
  const separator = shouldInsertTranscriptSpace(current, next) ? " " : "";
  return `${current}${separator}${next}`.trim();
}
/**
 * Bounds an accumulated partial transcript to its most recent
 * MAX_PARTIAL_USER_TRANSCRIPT_CHARS characters, dropping the leading
 * (probably mid-word-cut) token when one can be identified.
 */
function limitPartialUserTranscript(text: string): string {
  if (text.length <= MAX_PARTIAL_USER_TRANSCRIPT_CHARS) {
    return text;
  }
  const tail = text.slice(-MAX_PARTIAL_USER_TRANSCRIPT_CHARS);
  const withoutCutWord = tail.replace(/^\S+\s+/, "").trimStart();
  return withoutCutWord !== "" ? withoutCutWord : tail.trimStart();
}
/**
 * Merges the accumulated user transcript (`fallback`) into consult tool args.
 *
 * If the provider already supplied a question it is kept — unless it is short
 * (<= 40 chars) and the transcript is meaningfully longer (>= 8 chars more),
 * in which case the transcript replaces it and the provider's shorter
 * question is preserved in `context` so the downstream agent can see both.
 * If the provider supplied no question, the transcript (when present) is
 * injected as `question`. Non-object args are replaced by a minimal object.
 */
function withFallbackConsultQuestion(args: unknown, fallback: string | undefined): unknown {
  const providerQuestion = readConsultQuestionText(args);
  const question = fallback?.trim();
  if (providerQuestion) {
    // Provider question exists: only override it when the transcript is
    // clearly more complete (provider likely caught a truncated utterance).
    if (
      question &&
      providerQuestion.length <= 40 &&
      question.length >= providerQuestion.length + 8
    ) {
      const context = readConsultArgText(args, "context");
      const fallbackContext = `Realtime provider supplied a shorter consult question: ${providerQuestion}`;
      return args && typeof args === "object" && !Array.isArray(args)
        ? {
            ...args,
            question,
            context: context ? `${context}\n\n${fallbackContext}` : fallbackContext,
          }
        : { question, context: fallbackContext };
    }
    return args;
  }
  // No provider question and no transcript: nothing to inject.
  if (!question) {
    return args;
  }
  return args && typeof args === "object" && !Array.isArray(args)
    ? { ...args, question }
    : { question };
}
/**
 * Wraps a completed consult result in instructions telling the realtime model
 * to speak it to the caller immediately (and not to call more tools), bounding
 * the payload to FORCED_CONSULT_RESULT_MAX_CHARS.
 */
function buildForcedConsultSpeechPrompt(result: string): string {
  const trimmed = result.trim();
  let bounded = trimmed;
  if (trimmed.length > FORCED_CONSULT_RESULT_MAX_CHARS) {
    bounded = `${trimmed.slice(0, FORCED_CONSULT_RESULT_MAX_CHARS - 16).trimEnd()} [truncated]`;
  }
  const lines = [
    "Internal OpenClaw consult result is ready.",
    "Do not call tools for this internal result.",
    "Speak the following answer to the caller now, briefly and naturally:",
    bounded,
  ];
  return lines.join("\n");
}
type PendingStreamToken = {
expiry: number;
from?: string;
@@ -81,11 +229,40 @@ type RealtimeSpeakResult = {
error?: string;
};
type ForcedConsultState = {
promise: Promise<unknown>;
sendSpeechPrompt: boolean;
completedAt?: number;
};
type NativeConsultState = {
startedAt: number;
promise: Promise<unknown>;
partialUserTranscript?: string;
};
type TelephonyCloseReason = "completed" | "error";
export class RealtimeCallHandler {
private readonly toolHandlers = new Map<string, ToolHandlerFn>();
private readonly pendingStreamTokens = new Map<string, PendingStreamToken>();
private readonly activeBridgesByCallId = new Map<string, ActiveRealtimeVoiceBridge>();
private readonly activeTelephonyClosersByCallId = new Map<
string,
(reason: TelephonyCloseReason) => void
>();
private readonly partialUserTranscriptsByCallId = new Map<string, string>();
private readonly partialUserTranscriptUpdatedAtByCallId = new Map<string, number>();
private readonly recentFinalUserTranscriptsByCallId = new Map<string, string>();
private readonly recentFinalUserTranscriptTimersByCallId = new Map<
string,
ReturnType<typeof setTimeout>
>();
private readonly forcedConsultTimersByCallId = new Map<string, ReturnType<typeof setTimeout>>();
private readonly forcedConsultInFlightByCallId = new Set<string>();
private readonly forcedConsultsByCallId = new Map<string, ForcedConsultState>();
private readonly lastProviderConsultAtByCallId = new Map<string, number>();
private readonly nativeConsultsInFlightByCallId = new Map<string, NativeConsultState>();
private publicOrigin: string | null = null;
private publicPathPrefix = "";
@@ -156,6 +333,10 @@ export class RealtimeCallHandler {
wss.handleUpgrade(request, socket, head, (ws) => {
let bridge: ActiveRealtimeVoiceBridge | null = null;
let initialized = false;
let activeCallSid = "unknown";
let stopReceived = false;
let lastMediaTimestamp: number | undefined;
let lastMediaGapWarnAt = 0;
ws.on("message", (data: Buffer) => {
try {
@@ -169,6 +350,7 @@ export class RealtimeCallHandler {
const streamSid =
typeof startData?.streamSid === "string" ? startData.streamSid : "unknown";
const callSid = typeof startData?.callSid === "string" ? startData.callSid : "unknown";
activeCallSid = callSid;
const nextBridge = this.handleCall(streamSid, callSid, ws, callerMeta);
if (!nextBridge) {
return;
@@ -186,10 +368,25 @@ export class RealtimeCallHandler {
if (msg.event === "media" && typeof mediaData?.payload === "string") {
const audio = Buffer.from(mediaData.payload, "base64");
bridge.sendAudio(audio);
if (typeof mediaData.timestamp === "number") {
bridge.setMediaTimestamp(mediaData.timestamp);
} else if (typeof mediaData.timestamp === "string") {
bridge.setMediaTimestamp(Number.parseInt(mediaData.timestamp, 10));
const mediaTimestamp =
typeof mediaData.timestamp === "number"
? mediaData.timestamp
: typeof mediaData.timestamp === "string"
? Number.parseInt(mediaData.timestamp, 10)
: Number.NaN;
if (Number.isFinite(mediaTimestamp)) {
if (lastMediaTimestamp !== undefined) {
const gapMs = mediaTimestamp - lastMediaTimestamp;
const now = Date.now();
if ((gapMs > 120 || gapMs < 0) && now - lastMediaGapWarnAt > 5_000) {
lastMediaGapWarnAt = now;
console.warn(
`[voice-call] realtime media timestamp gap providerCallId=${activeCallSid} gapMs=${gapMs} timestamp=${mediaTimestamp}`,
);
}
}
lastMediaTimestamp = mediaTimestamp;
bridge.setMediaTimestamp(mediaTimestamp);
}
return;
}
@@ -198,15 +395,17 @@ export class RealtimeCallHandler {
return;
}
if (msg.event === "stop") {
bridge.close();
stopReceived = true;
this.closeTelephonyBridge(activeCallSid, bridge, "completed");
}
} catch (error) {
console.error("[voice-call] realtime WS parse failed:", error);
}
});
ws.on("close", () => {
bridge?.close();
ws.on("close", (code) => {
const reason = stopReceived || code === 1000 || code === 1005 ? "completed" : "error";
this.closeTelephonyBridge(activeCallSid, bridge, reason);
});
ws.on("error", (error) => {
@@ -289,11 +488,17 @@ export class RealtimeCallHandler {
return false;
}
if (ws.bufferedAmount > MAX_REALTIME_WS_BUFFERED_BYTES) {
console.warn(
`[voice-call] realtime outbound websocket backpressure before send callId=${callId} providerCallId=${callSid} bufferedBytes=${ws.bufferedAmount}`,
);
ws.close(1013, "Backpressure: send buffer exceeded");
return false;
}
ws.send(JSON.stringify(message));
if (ws.bufferedAmount > MAX_REALTIME_WS_BUFFERED_BYTES) {
console.warn(
`[voice-call] realtime outbound websocket backpressure after send callId=${callId} providerCallId=${callSid} bufferedBytes=${ws.bufferedAmount}`,
);
ws.close(1013, "Backpressure: send buffer exceeded");
return false;
}
@@ -303,6 +508,9 @@ export class RealtimeCallHandler {
streamSid,
sendJson,
onBackpressure: () => {
console.warn(
`[voice-call] realtime paced audio backpressure callId=${callId} providerCallId=${callSid}`,
);
if (ws.readyState === WebSocket.OPEN) {
ws.close(1013, "Backpressure: paced audio queue exceeded");
}
@@ -322,7 +530,10 @@ export class RealtimeCallHandler {
audioPacer.sendAudio(muLaw);
},
clearAudio: () => {
audioPacer.clearAudio();
const clearedBytes = audioPacer.clearAudio();
console.log(
`[voice-call] realtime outbound audio clear requested callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`,
);
},
sendMark: (markName) => {
audioPacer.sendMark(markName);
@@ -331,22 +542,42 @@ export class RealtimeCallHandler {
onTranscript: (role, text, isFinal) => {
if (!isFinal) {
if (role === "user" && text.trim()) {
this.partialUserTranscriptsByCallId.set(callId, text);
const transcript = this.recordPartialUserTranscript(callId, text);
console.log(
`[voice-call] realtime input transcript callId=${callId} providerCallId=${callSid} final=false chars=${text.trim().length} aggregateChars=${transcript.length}`,
);
}
return;
}
if (role === "user") {
this.partialUserTranscriptsByCallId.delete(callId);
const transcript = this.recordPartialUserTranscript(callId, text);
this.clearPartialUserTranscript(callId);
this.setRecentFinalUserTranscript(callId, transcript);
console.log(
`[voice-call] realtime input transcript callId=${callId} providerCallId=${callSid} final=true chars=${text.trim().length} aggregateChars=${transcript.length}`,
);
const event: NormalizedEvent = {
id: `realtime-speech-${callSid}-${Date.now()}`,
type: "call.speech",
callId,
providerCallId: callSid,
timestamp: Date.now(),
transcript: text,
transcript,
isFinal: true,
};
this.manager.processEvent(event);
this.scheduleForcedAgentConsult({
session,
callId,
callSid,
transcript,
clearAudio: () => {
const clearedBytes = audioPacer.clearAudio();
console.log(
`[voice-call] realtime forced consult cleared outbound audio callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`,
);
},
});
return;
}
this.manager.processEvent({
@@ -359,6 +590,9 @@ export class RealtimeCallHandler {
});
},
onToolCall: (toolEvent, session) => {
console.log(
`[voice-call] realtime tool call received callId=${callId} providerCallId=${callSid} tool=${toolEvent.name}`,
);
void this.executeToolCall(
session,
callId,
@@ -373,9 +607,10 @@ export class RealtimeCallHandler {
onClose: (reason) => {
this.activeBridgesByCallId.delete(callId);
this.activeBridgesByCallId.delete(callSid);
this.partialUserTranscriptsByCallId.delete(callId);
this.activeTelephonyClosersByCallId.delete(callId);
this.activeTelephonyClosersByCallId.delete(callSid);
this.clearUserTranscriptState(callId);
if (reason !== "error") {
emitCallEnd("completed");
return;
}
emitCallEnd("error");
@@ -393,12 +628,21 @@ export class RealtimeCallHandler {
});
},
});
const closeTelephony = (reason: TelephonyCloseReason) => {
emitCallEnd(reason);
session.close();
};
this.activeBridgesByCallId.set(callId, session);
this.activeBridgesByCallId.set(callSid, session);
this.activeTelephonyClosersByCallId.set(callId, closeTelephony);
this.activeTelephonyClosersByCallId.set(callSid, closeTelephony);
const sendAudioToSession = session.sendAudio.bind(session);
session.sendAudio = (audio) => {
if (speechDetector.accept(audio)) {
audioPacer.clearAudio();
const clearedBytes = audioPacer.clearAudio();
console.log(
`[voice-call] realtime outbound audio cleared by barge-in callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`,
);
}
sendAudioToSession(audio);
};
@@ -406,7 +650,10 @@ export class RealtimeCallHandler {
session.close = () => {
this.activeBridgesByCallId.delete(callId);
this.activeBridgesByCallId.delete(callSid);
this.partialUserTranscriptsByCallId.delete(callId);
this.activeTelephonyClosersByCallId.delete(callId);
this.activeTelephonyClosersByCallId.delete(callSid);
this.clearUserTranscriptState(callId);
this.clearForcedConsultState(callId);
audioPacer.close();
closeSession();
};
@@ -421,6 +668,227 @@ export class RealtimeCallHandler {
return session;
}
/**
 * Append `text` to the rolling partial user transcript for a call, enforce
 * the configured size cap, and stamp the last-update time (used by the
 * consult settle-wait). Returns the capped aggregate transcript.
 */
private recordPartialUserTranscript(callId: string, text: string): string {
  const previous = this.partialUserTranscriptsByCallId.get(callId);
  const combined = appendTranscriptText(previous, text);
  const capped = limitPartialUserTranscript(combined);
  this.partialUserTranscriptsByCallId.set(callId, capped);
  this.partialUserTranscriptUpdatedAtByCallId.set(callId, Date.now());
  return capped;
}
/** Drop both the partial transcript text and its last-updated timestamp. */
private clearPartialUserTranscript(callId: string): void {
  this.partialUserTranscriptUpdatedAtByCallId.delete(callId);
  this.partialUserTranscriptsByCallId.delete(callId);
}
/**
 * Remember the most recent final user transcript for a short TTL so a
 * late-arriving consult can still read it, then expire it automatically.
 * Any previously remembered transcript (and its expiry timer) is discarded
 * first; the timer is unref'd so it cannot keep the process alive.
 */
private setRecentFinalUserTranscript(callId: string, text: string): void {
  this.clearRecentFinalUserTranscript(callId);
  this.recentFinalUserTranscriptsByCallId.set(callId, text);
  const expiry = setTimeout(() => {
    this.recentFinalUserTranscriptTimersByCallId.delete(callId);
    // Only expire if the entry was not replaced by a newer transcript.
    if (this.recentFinalUserTranscriptsByCallId.get(callId) === text) {
      this.recentFinalUserTranscriptsByCallId.delete(callId);
    }
  }, RECENT_FINAL_USER_TRANSCRIPT_TTL_MS);
  expiry.unref?.();
  this.recentFinalUserTranscriptTimersByCallId.set(callId, expiry);
}
/** Cancel any pending expiry timer and forget the remembered final transcript. */
private clearRecentFinalUserTranscript(callId: string): void {
  const pending = this.recentFinalUserTranscriptTimersByCallId.get(callId);
  if (pending !== undefined) {
    clearTimeout(pending);
    this.recentFinalUserTranscriptTimersByCallId.delete(callId);
  }
  this.recentFinalUserTranscriptsByCallId.delete(callId);
}
/** Reset all per-call transcript bookkeeping (partial and recent-final). */
private clearUserTranscriptState(callId: string): void {
  this.clearRecentFinalUserTranscript(callId);
  this.clearPartialUserTranscript(callId);
}
/**
 * Best available user transcript for consult context: prefer the live
 * partial aggregate; otherwise fall back to the recently finalized one.
 * Returns undefined when neither is tracked for this call.
 */
private resolveUserTranscriptContext(callId: string): string | undefined {
  const partial = this.partialUserTranscriptsByCallId.get(callId);
  if (partial !== undefined) {
    return partial;
  }
  return this.recentFinalUserTranscriptsByCallId.get(callId);
}
/**
 * After a consult successfully delivered `consumed` text, remove that text
 * from the tracked partial and recent-final transcripts so the same speech
 * is not fed into a later consult again.
 *
 * Matching is exact or case-insensitive-prefix; on a prefix match only the
 * unconsumed tail of the partial transcript is kept.
 */
private consumePartialUserTranscript(callId: string, consumed: string | undefined): void {
const text = consumed?.trim();
if (!text) {
return;
}
const current = this.partialUserTranscriptsByCallId.get(callId);
// NOTE(review): when no partial transcript exists, the recent-final entry is
// left untouched as well — presumably intentional, but worth confirming.
if (!current) {
return;
}
if (current === text) {
// Exact match consumes the whole partial transcript. This branch returns
// early, so a matching recent-final transcript is NOT cleared here —
// TODO confirm that asymmetry with the prefix-match path is intended.
this.clearPartialUserTranscript(callId);
return;
}
if (current.toLowerCase().startsWith(text.toLowerCase())) {
// Prefix match: keep only the not-yet-consumed remainder.
const remaining = current.slice(text.length).trimStart();
if (remaining) {
this.partialUserTranscriptsByCallId.set(callId, remaining);
} else {
this.clearPartialUserTranscript(callId);
}
}
// Also drop the remembered final transcript when it matches the consumed text.
const recent = this.recentFinalUserTranscriptsByCallId.get(callId);
if (!recent) {
return;
}
if (recent === text || recent.toLowerCase().startsWith(text.toLowerCase())) {
this.clearRecentFinalUserTranscript(callId);
}
}
private async waitForConsultTranscriptSettle(callId: string, startedAt: number): Promise<void> {
const deadline = startedAt + CONSULT_TRANSCRIPT_SETTLE_MAX_MS;
while (true) {
const updatedAt = this.partialUserTranscriptUpdatedAtByCallId.get(callId);
if (!updatedAt) {
return;
}
const now = Date.now();
const quietFor = now - updatedAt;
if (quietFor >= CONSULT_TRANSCRIPT_SETTLE_MS || now >= deadline) {
return;
}
await new Promise((resolve) =>
setTimeout(resolve, Math.min(CONSULT_TRANSCRIPT_SETTLE_MS - quietFor, deadline - now)),
);
}
}
/** Cancel a pending forced-consult timer and drop all forced-consult tracking for the call. */
private clearForcedConsultState(callId: string): void {
  const pendingTimer = this.forcedConsultTimersByCallId.get(callId);
  if (pendingTimer !== undefined) {
    clearTimeout(pendingTimer);
    this.forcedConsultTimersByCallId.delete(callId);
  }
  this.lastProviderConsultAtByCallId.delete(callId);
  this.forcedConsultsByCallId.delete(callId);
  this.forcedConsultInFlightByCallId.delete(callId);
}
private closeTelephonyBridge(
callIdOrSid: string,
bridge: ActiveRealtimeVoiceBridge | null,
reason: TelephonyCloseReason,
): void {
const closer = this.activeTelephonyClosersByCallId.get(callIdOrSid);
if (closer) {
closer(reason);
return;
}
bridge?.close();
}
/**
 * Debounced fallback: when consultPolicy is "always" and the realtime
 * provider did not invoke the consult tool itself, force an agent consult for
 * the final user transcript after FORCED_CONSULT_FALLBACK_DELAY_MS.
 * Re-scheduling for the same call replaces any pending timer; when the timer
 * fires it is a no-op if a forced consult is already in flight or the
 * provider issued its own consult within the last 2 seconds.
 */
private scheduleForcedAgentConsult(params: {
  session: ActiveRealtimeVoiceBridge;
  callId: string;
  callSid: string;
  transcript: string;
  clearAudio: () => void;
}): void {
  if (this.config.consultPolicy !== "always") {
    return;
  }
  const question = params.transcript.trim();
  if (!question) {
    return;
  }
  const handler = this.toolHandlers.get(REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME);
  if (!handler) {
    return;
  }
  // Debounce: a newer final transcript supersedes any pending forced consult.
  const pendingTimer = this.forcedConsultTimersByCallId.get(params.callId);
  if (pendingTimer) {
    clearTimeout(pendingTimer);
  }
  const timer = setTimeout(() => {
    this.forcedConsultTimersByCallId.delete(params.callId);
    if (this.forcedConsultInFlightByCallId.has(params.callId)) {
      return;
    }
    // Skip when the provider consulted on its own very recently.
    const lastProviderConsultAt = this.lastProviderConsultAtByCallId.get(params.callId) ?? 0;
    if (Date.now() - lastProviderConsultAt < 2_000) {
      return;
    }
    void this.runForcedAgentConsult({ ...params, question, handler });
  }, FORCED_CONSULT_FALLBACK_DELAY_MS);
  this.forcedConsultTimersByCallId.set(params.callId, timer);
}
/**
 * Execute a forced agent consult for a final user transcript.
 *
 * Marks the call as in-flight, clears queued outbound audio, runs the consult
 * handler, and (unless a provider-native consult took ownership by flipping
 * `sendSpeechPrompt` off) clears audio again and speaks the result via a
 * user-message prompt. The shared ForcedConsultState is kept around briefly
 * after completion so a late provider consult can reuse/dedupe the result.
 */
private async runForcedAgentConsult(params: {
session: ActiveRealtimeVoiceBridge;
callId: string;
callSid: string;
question: string;
clearAudio: () => void;
handler: ToolHandlerFn;
}): Promise<void> {
// Mark in-flight first so the fallback timer won't start a second consult.
this.forcedConsultInFlightByCallId.add(params.callId);
const startedAt = Date.now();
console.log(
`[voice-call] realtime forced agent consult starting callId=${params.callId} providerCallId=${params.callSid} chars=${params.question.length}`,
);
// Stop any queued speech before consulting so stale audio isn't played.
params.clearAudio();
const state: ForcedConsultState = {
sendSpeechPrompt: true,
promise: Promise.resolve().then(() =>
params.handler(
{
question: params.question,
context:
"The realtime provider produced a final user transcript without invoking openclaw_agent_consult, so OpenClaw is forcing the consult because consultPolicy is always.",
},
params.callId,
{},
),
),
};
// Publish the state before awaiting so a concurrent provider consult can
// attach to the same promise instead of running a duplicate consult.
this.forcedConsultsByCallId.set(params.callId, state);
try {
const result = await state.promise;
state.completedAt = Date.now();
const text = readSpeakableToolResultText(result);
if (!text) {
console.warn(
`[voice-call] realtime forced agent consult returned no speakable text callId=${params.callId} providerCallId=${params.callSid}`,
);
return;
}
// A provider-native consult may have flipped this off to deliver the result itself.
if (state.sendSpeechPrompt) {
params.clearAudio();
params.session.sendUserMessage(buildForcedConsultSpeechPrompt(text));
}
console.log(
`[voice-call] realtime forced agent consult completed callId=${params.callId} providerCallId=${params.callSid} elapsedMs=${Date.now() - startedAt}`,
);
// Remove the delivered question from tracked transcripts to avoid re-consulting it.
this.consumePartialUserTranscript(params.callId, params.question);
} catch (error) {
console.warn(
`[voice-call] realtime forced agent consult failed callId=${params.callId} providerCallId=${params.callSid} error=${formatErrorMessage(error)}`,
);
} finally {
this.forcedConsultInFlightByCallId.delete(params.callId);
// Keep the state briefly for provider-consult dedupe, then drop it if it
// has not already been replaced by a newer consult.
const cleanupTimer = setTimeout(() => {
if (this.forcedConsultsByCallId.get(params.callId) === state) {
this.forcedConsultsByCallId.delete(params.callId);
}
}, FORCED_CONSULT_NATIVE_DEDUPE_MS);
cleanupTimer.unref?.();
}
}
private registerCallInManager(
callSid: string,
callerMeta: Omit<PendingStreamToken, "expiry"> = {},
@@ -495,25 +963,130 @@ export class RealtimeCallHandler {
args: unknown,
): Promise<void> {
const handler = this.toolHandlers.get(name);
if (
handler &&
name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME &&
bridge.bridge.supportsToolResultContinuation &&
!this.config.fastContext.enabled
) {
bridge.submitToolResult(
bridgeCallId,
buildRealtimeVoiceAgentConsultWorkingResponse("caller"),
{ willContinue: true },
);
}
const result = !handler
? { error: `Tool "${name}" not available` }
: await handler(args, callId, {
partialUserTranscript: this.partialUserTranscriptsByCallId.get(callId),
}).catch((error: unknown) => ({
const startedAt = Date.now();
if (name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) {
this.lastProviderConsultAtByCallId.set(callId, Date.now());
const timer = this.forcedConsultTimersByCallId.get(callId);
if (timer) {
clearTimeout(timer);
this.forcedConsultTimersByCallId.delete(callId);
}
const forcedConsult = this.forcedConsultsByCallId.get(callId);
if (forcedConsult) {
if (forcedConsult.completedAt) {
bridge.submitToolResult(bridgeCallId, {
status: "already_delivered",
message: "OpenClaw already delivered this consult result internally. Do not repeat it.",
});
return;
}
forcedConsult.sendSpeechPrompt = false;
const result = await forcedConsult.promise.catch((error: unknown) => ({
error: formatErrorMessage(error),
}));
bridge.submitToolResult(bridgeCallId, result);
return;
}
const submitWorkingResponse = () => {
if (
handler &&
bridge.bridge.supportsToolResultContinuation &&
!this.config.fastContext.enabled
) {
bridge.submitToolResult(
bridgeCallId,
buildRealtimeVoiceAgentConsultWorkingResponse("caller"),
{ willContinue: true },
);
}
};
const existingNativeConsult = this.nativeConsultsInFlightByCallId.get(callId);
if (existingNativeConsult) {
console.log(
`[voice-call] realtime tool call sharing in-flight agent consult callId=${callId} ageMs=${Date.now() - existingNativeConsult.startedAt}`,
);
submitWorkingResponse();
bridge.submitToolResult(bridgeCallId, await existingNativeConsult.promise);
return;
}
submitWorkingResponse();
const state: NativeConsultState = {
startedAt,
promise: Promise.resolve(),
};
state.promise = (async () => {
await this.waitForConsultTranscriptSettle(callId, startedAt);
const context = {
partialUserTranscript: this.resolveUserTranscriptContext(callId),
};
state.partialUserTranscript = context.partialUserTranscript;
const handlerArgs = withFallbackConsultQuestion(args, context.partialUserTranscript);
console.log(
`[voice-call] realtime tool call executing callId=${callId} tool=${name} hasHandler=${Boolean(handler)}`,
);
return !handler
? { error: `Tool "${name}" not available` }
: await handler(handlerArgs, callId, context);
})().catch((error: unknown) => ({
error: formatErrorMessage(error),
}));
this.nativeConsultsInFlightByCallId.set(callId, state);
try {
const result = await state.promise;
const status =
result && typeof result === "object" && !Array.isArray(result) && "error" in result
? "error"
: "ok";
const error =
status === "error" && result && typeof result === "object" && !Array.isArray(result)
? formatErrorMessage((result as { error?: unknown }).error ?? "unknown")
: undefined;
console.log(
`[voice-call] realtime tool call completed callId=${callId} tool=${name} status=${status} elapsedMs=${Date.now() - startedAt}${error ? ` error=${error}` : ""}`,
);
bridge.submitToolResult(bridgeCallId, result);
if (status === "ok") {
this.consumePartialUserTranscript(callId, state.partialUserTranscript);
}
} finally {
if (this.nativeConsultsInFlightByCallId.get(callId) === state) {
this.nativeConsultsInFlightByCallId.delete(callId);
}
}
return;
}
console.log(
`[voice-call] realtime tool call executing callId=${callId} tool=${name} hasHandler=${Boolean(handler)}`,
);
const context = {
partialUserTranscript: this.resolveUserTranscriptContext(callId),
};
const handlerArgs =
name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME
? withFallbackConsultQuestion(args, context.partialUserTranscript)
: args;
const result = !handler
? { error: `Tool "${name}" not available` }
: await handler(handlerArgs, callId, context).catch((error: unknown) => ({
error: formatErrorMessage(error),
}));
const status =
result && typeof result === "object" && !Array.isArray(result) && "error" in result
? "error"
: "ok";
const error =
status === "error" && result && typeof result === "object" && !Array.isArray(result)
? formatErrorMessage((result as { error?: unknown }).error ?? "unknown")
: undefined;
console.log(
`[voice-call] realtime tool call completed callId=${callId} tool=${name} status=${status} elapsedMs=${Date.now() - startedAt}${error ? ` error=${error}` : ""}`,
);
bridge.submitToolResult(bridgeCallId, result);
if (name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME && status === "ok") {
this.consumePartialUserTranscript(callId, context.partialUserTranscript);
}
}
}