diff --git a/CHANGELOG.md b/CHANGELOG.md index ecac0dd5ecc..242399626a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,10 @@ Docs: https://docs.openclaw.ai ## Unreleased -### Highlights - -- Google Meet/Voice Call: make Twilio dial-in joins speak through the realtime Gemini voice bridge with paced audio streaming, backpressure-aware buffering, barge-in queue clearing, and no TwiML fallback during realtime speech, giving Meet participants a much snappier OpenClaw voice agent. (#77064) Thanks @scoootscooob. - ### Changes +- Google Meet/Voice Call: make Twilio dial-in joins speak through the realtime Gemini voice bridge with paced audio streaming, backpressure-aware buffering, barge-in queue clearing, same-session agent consult routing, duplicate-consult coalescing, and no TwiML fallback during realtime speech, giving Meet participants a much snappier OpenClaw voice agent. (#77064) Thanks @scoootscooob. +- Voice Call/realtime: add opt-in OpenClaw agent voice context capsules and consult-cadence guidance so Gemini/OpenAI realtime calls can sound like the configured agent without consulting the full agent on every ordinary turn. Thanks @scoootscooob. - Docker/Gateway: harden the gateway container by dropping `NET_RAW` and `NET_ADMIN` capabilities and enabling `no-new-privileges` in the bundled `docker-compose.yml`. Thanks @VintageAyu. - Telegram: accept plugin-owned numeric forum-topic targets in the agent message tool and keep reply-dispatch provider chunks behind a real stable runtime alias during in-place package updates. Fixes #77137. Thanks @richardmqq. - Channels/WhatsApp: support explicit WhatsApp Channel/Newsletter `@newsletter` outbound message targets with channel session metadata instead of DM routing. Fixes #13417; carries forward the narrow outbound target idea from #13424. Thanks @vincentkoc and @agentz-manfred. diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index 309743143e4..69c6cbcc458 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -657060e80f3dc4b7d992e8625d2a8b0ff9b1b408960148d3f5f6a381d602359a config-baseline.json +c93176f87a1e4576f5951b82037394c4bc9628bb6e056b6b24f96e662d6d636c config-baseline.json 92cbb12ca382f7424e7bd52df21798b10a57621f5c266909fa74e23f6cb973d7 config-baseline.core.json cd7c0c7fb1435bc7e59099e9ac334462d5ad444016e9ab4512aae63a238f78dc config-baseline.channel.json -9832b30a696930a3da7efccf38073137571e1b66cae84e54d747b733fdafcc54 config-baseline.plugin.json +6871e789b74722e4ff2c877940dac256c232433ae26b305fc6ca782b90662097 config-baseline.plugin.json diff --git a/docs/plugins/voice-call.md b/docs/plugins/voice-call.md index 04222a7d177..00ea1c74e8a 100644 --- a/docs/plugins/voice-call.md +++ b/docs/plugins/voice-call.md @@ -229,6 +229,8 @@ Current runtime behaviour: - Bundled realtime voice providers: Google Gemini Live (`google`) and OpenAI (`openai`), registered by their provider plugins. - Provider-owned raw config lives under `realtime.providers.`. - Voice Call exposes the shared `openclaw_agent_consult` realtime tool by default. The realtime model can call it when the caller asks for deeper reasoning, current information, or normal OpenClaw tools. +- `realtime.consultPolicy` optionally adds guidance for when the realtime model should call `openclaw_agent_consult`. +- `realtime.agentContext.enabled` is default-off. 
When enabled, Voice Call injects a bounded agent identity, system prompt override, and selected workspace-file capsule into the realtime provider instructions at session setup. - `realtime.fastContext.enabled` is default-off. When enabled, Voice Call first searches indexed memory/session context for the consult question and returns those snippets to the realtime model within `realtime.fastContext.timeoutMs` before falling back to the full consult agent only if `realtime.fastContext.fallbackToConsult` is true. - If `realtime.provider` points at an unregistered provider, or no realtime voice provider is registered at all, Voice Call logs a warning and skips realtime media instead of failing the whole plugin. - Consult session keys reuse the stored call session when available, then fall back to the configured `sessionScope` (`per-phone` by default, or `per-call` for isolated calls). @@ -243,6 +245,51 @@ Current runtime behaviour: | `owner` | Expose the consult tool and let the regular agent use the normal agent tool policy. | | `none` | Do not expose the consult tool. Custom `realtime.tools` are still passed through to the realtime provider. | +`realtime.consultPolicy` controls only the realtime model instructions: + +| Policy | Guidance | +| ------------- | ----------------------------------------------------------------------------------------------- | +| `auto` | Keep the default prompt and let the provider decide when to call the consult tool. | +| `substantive` | Answer simple conversational glue directly and consult before facts, memory, tools, or context. | +| `always` | Consult before every substantive answer. | + +### Agent voice context + +Enable `realtime.agentContext` when the voice bridge should sound like the +configured OpenClaw agent without paying a full agent-consult round trip on +ordinary turns. The context capsule is added once when the realtime session is +created, so it does not add per-turn latency. Calls to +`openclaw_agent_consult` still run the full OpenClaw agent and should be used +for tool work, current information, memory lookups, or workspace state. + +```json5 +{ + plugins: { + entries: { + "voice-call": { + config: { + agentId: "main", + realtime: { + enabled: true, + provider: "google", + toolPolicy: "safe-read-only", + consultPolicy: "substantive", + agentContext: { + enabled: true, + maxChars: 6000, + includeIdentity: true, + includeSystemPrompt: true, + includeWorkspaceFiles: true, + files: ["SOUL.md", "IDENTITY.md", "USER.md"], + }, + }, + }, + }, + }, + }, +} +``` + ### Realtime provider examples @@ -268,6 +315,8 @@ Current runtime behaviour: provider: "google", instructions: "Speak briefly. 
Call openclaw_agent_consult before using deeper tools.", toolPolicy: "safe-read-only", + consultPolicy: "substantive", + agentContext: { enabled: true }, providers: { google: { apiKey: "${GEMINI_API_KEY}", diff --git a/extensions/google-meet/index.test.ts b/extensions/google-meet/index.test.ts index 966b7aa7aaf..1a0b1337484 100644 --- a/extensions/google-meet/index.test.ts +++ b/extensions/google-meet/index.test.ts @@ -1256,6 +1256,7 @@ describe("google-meet plugin", () => { dtmfSequence: "123456#", logger: expect.objectContaining({ info: expect.any(Function) }), message: "Say exactly: I'm here and listening.", + sessionKey: expect.stringMatching(/^voice:google-meet:meet_/), }); }); diff --git a/extensions/google-meet/src/runtime.ts b/extensions/google-meet/src/runtime.ts index 1bfb90fa1c1..80de8432282 100644 --- a/extensions/google-meet/src/runtime.ts +++ b/extensions/google-meet/src/runtime.ts @@ -41,6 +41,10 @@ function nowIso(): string { return new Date().toISOString(); } +function buildTwilioVoiceCallSessionKey(meetingSessionId: string): string { + return `voice:google-meet:${meetingSessionId}`; +} + export function normalizeMeetUrl(input: unknown): string { const raw = normalizeOptionalString(input); if (!raw) { @@ -478,6 +482,10 @@ export class GoogleMeetRuntime { dialInNumber, dtmfSequence, logger: this.params.logger, + ...(request.requesterSessionKey + ? { requesterSessionKey: request.requesterSessionKey } + : {}), + sessionKey: buildTwilioVoiceCallSessionKey(session.id), message: isGoogleMeetTalkBackMode(mode) ? (request.message ?? this.params.config.voiceCall.introMessage ?? @@ -505,7 +513,7 @@ export class GoogleMeetRuntime { session.notes.push( this.params.config.voiceCall.enabled ? dtmfSequence - ? "Twilio transport delegated the phone leg to the voice-call plugin, then sent configured DTMF after connect before speaking." + ? "Twilio transport delegated the phone leg to the voice-call plugin, then queued configured DTMF before realtime connect." : "Twilio transport delegated the call to the voice-call plugin without configured DTMF." 
: "Twilio transport is an explicit dial plan; voice-call delegation is disabled.", ); diff --git a/extensions/google-meet/src/voice-call-gateway.test.ts b/extensions/google-meet/src/voice-call-gateway.test.ts index ffe810ef1d1..981c48c7d96 100644 --- a/extensions/google-meet/src/voice-call-gateway.test.ts +++ b/extensions/google-meet/src/voice-call-gateway.test.ts @@ -28,7 +28,7 @@ describe("Google Meet voice-call gateway", () => { gatewayMocks.startGatewayClientWhenEventLoopReady.mockClear(); }); - it("starts Twilio Meet calls, sends delayed DTMF, then speaks the intro without TwiML fallback", async () => { + it("starts Twilio Meet calls with pre-connect DTMF, then speaks the intro without TwiML fallback", async () => { const config = resolveGoogleMeetConfig({ voiceCall: { gatewayUrl: "ws://127.0.0.1:18789", @@ -43,6 +43,8 @@ describe("Google Meet voice-call gateway", () => { dialInNumber: "+15551234567", dtmfSequence: "123456#", message: "Say exactly: I'm here and listening.", + requesterSessionKey: "agent:main:discord:channel:general", + sessionKey: "voice:google-meet:meet-1", }); await join; @@ -53,20 +55,14 @@ describe("Google Meet voice-call gateway", () => { { to: "+15551234567", mode: "conversation", + dtmfSequence: "123456#", + requesterSessionKey: "agent:main:discord:channel:general", + sessionKey: "voice:google-meet:meet-1", }, { timeoutMs: 30_000 }, ); expect(gatewayMocks.request).toHaveBeenNthCalledWith( 2, - "voicecall.dtmf", - { - callId: "call-1", - digits: "123456#", - }, - { timeoutMs: 30_000 }, - ); - expect(gatewayMocks.request).toHaveBeenNthCalledWith( - 3, "voicecall.speak", { callId: "call-1", @@ -75,13 +71,12 @@ describe("Google Meet voice-call gateway", () => { }, { timeoutMs: 30_000 }, ); - expect(gatewayMocks.request).toHaveBeenCalledTimes(3); + expect(gatewayMocks.request).toHaveBeenCalledTimes(2); }); it("skips the intro without failing when the realtime bridge is not ready", async () => { gatewayMocks.request .mockResolvedValueOnce({ callId: "call-1" }) - .mockResolvedValueOnce({ success: true }) .mockResolvedValueOnce({ success: false, error: "No active realtime bridge for call" }); const config = resolveGoogleMeetConfig({ voiceCall: { diff --git a/extensions/google-meet/src/voice-call-gateway.ts b/extensions/google-meet/src/voice-call-gateway.ts index c39bb90c47b..2cc685f20fc 100644 --- a/extensions/google-meet/src/voice-call-gateway.ts +++ b/extensions/google-meet/src/voice-call-gateway.ts @@ -1,3 +1,4 @@ +import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { GatewayClient, startGatewayClientWhenEventLoopReady, @@ -18,11 +19,6 @@ type VoiceCallSpeakResult = { error?: string; }; -type VoiceCallDtmfResult = { - success?: boolean; - error?: string; -}; - type VoiceCallMeetJoinResult = { callId: string; dtmfSent: boolean; @@ -87,19 +83,24 @@ export async function joinMeetViaVoiceCallGateway(params: { dtmfSequence?: string; logger?: RuntimeLogger; message?: string; + requesterSessionKey?: string; + sessionKey?: string; }): Promise { let client: VoiceCallGatewayClient | undefined; try { client = await createConnectedGatewayClient(params.config); params.logger?.info( - `[google-meet] Delegating Twilio join to Voice Call (dtmf=${params.dtmfSequence ? "post-connect" : "none"}, intro=${params.message ? "delayed" : "none"})`, + `[google-meet] Delegating Twilio join to Voice Call (dtmf=${params.dtmfSequence ? "pre-connect" : "none"}, intro=${params.message ? 
"delayed" : "none"})`, ); const start = (await client.request( "voicecall.start", { to: params.dialInNumber, mode: "conversation", + ...(params.dtmfSequence ? { dtmfSequence: params.dtmfSequence } : {}), + ...(params.requesterSessionKey ? { requesterSessionKey: params.requesterSessionKey } : {}), + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), }, { timeoutMs: params.config.voiceCall.requestTimeoutMs }, )) as VoiceCallStartResult; @@ -109,27 +110,10 @@ export async function joinMeetViaVoiceCallGateway(params: { params.logger?.info( `[google-meet] Voice Call Twilio phone leg started: callId=${start.callId}`, ); - let dtmfSent = false; - if (params.dtmfSequence) { - const delayMs = params.config.voiceCall.dtmfDelayMs; + const dtmfSent = Boolean(params.dtmfSequence); + if (dtmfSent) { params.logger?.info( - `[google-meet] Waiting ${delayMs}ms before sending Meet DTMF for callId=${start.callId}`, - ); - await sleep(delayMs); - const dtmf = (await client.request( - "voicecall.dtmf", - { - callId: start.callId, - digits: params.dtmfSequence, - }, - { timeoutMs: params.config.voiceCall.requestTimeoutMs }, - )) as VoiceCallDtmfResult; - if (dtmf.success === false) { - throw new Error(dtmf.error || "voicecall.dtmf failed"); - } - dtmfSent = true; - params.logger?.info( - `[google-meet] Meet DTMF sent after phone leg connected: callId=${start.callId} digits=${params.dtmfSequence.length}`, + `[google-meet] Meet DTMF queued before realtime connect: callId=${start.callId} digits=${params.dtmfSequence?.length ?? 0}`, ); } let introSent = false; @@ -141,15 +125,23 @@ export async function joinMeetViaVoiceCallGateway(params: { ); await sleep(delayMs); } - const spoken = (await client.request( - "voicecall.speak", - { - callId: start.callId, - allowTwimlFallback: false, - message: params.message, - }, - { timeoutMs: params.config.voiceCall.requestTimeoutMs }, - )) as VoiceCallSpeakResult; + let spoken: VoiceCallSpeakResult; + try { + spoken = (await client.request( + "voicecall.speak", + { + callId: start.callId, + allowTwimlFallback: false, + message: params.message, + }, + { timeoutMs: params.config.voiceCall.requestTimeoutMs }, + )) as VoiceCallSpeakResult; + } catch (err) { + params.logger?.warn?.( + `[google-meet] Skipped intro speech because realtime bridge was not ready: ${formatErrorMessage(err)}`, + ); + spoken = { success: false }; + } if (spoken.success === false) { params.logger?.warn?.( `[google-meet] Skipped intro speech because realtime bridge was not ready: ${ diff --git a/extensions/google/index.test.ts b/extensions/google/index.test.ts index ab76956776c..be2f20956f5 100644 --- a/extensions/google/index.test.ts +++ b/extensions/google/index.test.ts @@ -3,13 +3,16 @@ import type { ProviderReplaySessionEntry, ProviderSanitizeReplayHistoryContext, } from "openclaw/plugin-sdk/plugin-entry"; +import { createTestPluginApi } from "openclaw/plugin-sdk/plugin-test-api"; import { registerProviderPlugin, requireRegisteredProvider, } from "openclaw/plugin-sdk/plugin-test-runtime"; import { createCapturedThinkingConfigStream } from "openclaw/plugin-sdk/provider-test-contracts"; +import type { RealtimeVoiceProviderPlugin } from "openclaw/plugin-sdk/realtime-voice"; import { describe, expect, it } from "vitest"; import { registerGoogleGeminiCliProvider } from "./gemini-cli-provider.js"; +import googlePlugin from "./index.js"; import { registerGoogleProvider } from "./provider-registration.js"; const googleProviderPlugin = { @@ -226,4 +229,26 @@ describe("google provider plugin hooks", 
() => { expect(googleProvider.buildReplayPolicy).toBe(cliProvider.buildReplayPolicy); expect(googleProvider.wrapStreamFn).toBe(cliProvider.wrapStreamFn); }); + + it("buffers early realtime audio while the lazy Google bridge loads", () => { + let realtimeProvider: RealtimeVoiceProviderPlugin | undefined; + googlePlugin.register( + createTestPluginApi({ + registerRealtimeVoiceProvider(provider) { + realtimeProvider = provider; + }, + }), + ); + + const bridge = realtimeProvider?.createBridge({ + providerConfig: { apiKey: "gemini-key" }, + onAudio() {}, + onClearAudio() {}, + }); + + expect(bridge).toBeDefined(); + expect(() => bridge?.sendAudio(Buffer.alloc(160))).not.toThrow(); + expect(() => bridge?.setMediaTimestamp(20)).not.toThrow(); + expect(() => bridge?.sendUserMessage?.("hello")).not.toThrow(); + }); }); diff --git a/extensions/google/index.ts b/extensions/google/index.ts index 0f53f977d9f..c7ef9031163 100644 --- a/extensions/google/index.ts +++ b/extensions/google/index.ts @@ -200,11 +200,18 @@ function resolveGoogleRealtimeEnvApiKey(): string | undefined { ); } +const GOOGLE_REALTIME_LAZY_MAX_PENDING_AUDIO_CHUNKS = 320; + function createLazyGoogleRealtimeVoiceBridge( req: RealtimeVoiceBridgeCreateRequest, ): RealtimeVoiceBridge { let bridge: RealtimeVoiceBridge | undefined; let bridgePromise: Promise | undefined; + let closed = false; + let latestMediaTimestamp: number | undefined; + let pendingGreeting: string | undefined; + const pendingAudio: Buffer[] = []; + const pendingUserMessages: string[] = []; const loadBridge = async () => { if (!bridgePromise) { bridgePromise = loadGoogleRealtimeVoiceProvider().then((provider) => @@ -220,20 +227,78 @@ function createLazyGoogleRealtimeVoiceBridge( } return bridge; }; + const flushPending = (loadedBridge: RealtimeVoiceBridge) => { + if (typeof latestMediaTimestamp === "number") { + loadedBridge.setMediaTimestamp(latestMediaTimestamp); + } + for (const audio of pendingAudio.splice(0)) { + loadedBridge.sendAudio(audio); + } + for (const text of pendingUserMessages.splice(0)) { + loadedBridge.sendUserMessage?.(text); + } + if (pendingGreeting !== undefined) { + const greeting = pendingGreeting; + pendingGreeting = undefined; + loadedBridge.triggerGreeting?.(greeting); + } + }; return { supportsToolResultContinuation: true, connect: async () => { - await (await loadBridge()).connect(); + const loadedBridge = await loadBridge(); + if (closed) { + loadedBridge.close(); + return; + } + await loadedBridge.connect(); + flushPending(loadedBridge); + }, + sendAudio: (audio) => { + if (bridge) { + bridge.sendAudio(audio); + return; + } + if (!closed) { + if (pendingAudio.length >= GOOGLE_REALTIME_LAZY_MAX_PENDING_AUDIO_CHUNKS) { + pendingAudio.shift(); + } + pendingAudio.push(audio); + } + }, + setMediaTimestamp: (ts) => { + latestMediaTimestamp = ts; + bridge?.setMediaTimestamp(ts); + }, + sendUserMessage: (text) => { + if (bridge) { + bridge.sendUserMessage?.(text); + return; + } + if (!closed) { + pendingUserMessages.push(text); + } + }, + triggerGreeting: (instructions) => { + if (bridge) { + bridge.triggerGreeting?.(instructions); + return; + } + if (!closed) { + pendingGreeting = instructions; + } }, - sendAudio: (audio) => requireBridge().sendAudio(audio), - setMediaTimestamp: (ts) => requireBridge().setMediaTimestamp(ts), - sendUserMessage: (text) => requireBridge().sendUserMessage?.(text), - triggerGreeting: (instructions) => requireBridge().triggerGreeting?.(instructions), handleBargeIn: (options) => 
requireBridge().handleBargeIn?.(options), submitToolResult: (callId, result, options) => requireBridge().submitToolResult(callId, result, options), acknowledgeMark: () => requireBridge().acknowledgeMark(), - close: () => bridge?.close(), + close: () => { + closed = true; + pendingAudio.length = 0; + pendingUserMessages.length = 0; + pendingGreeting = undefined; + bridge?.close(); + }, isConnected: () => bridge?.isConnected() ?? false, }; } diff --git a/extensions/google/realtime-voice-provider.test.ts b/extensions/google/realtime-voice-provider.test.ts index 23b140bc341..2b8a78fc7ea 100644 --- a/extensions/google/realtime-voice-provider.test.ts +++ b/extensions/google/realtime-voice-provider.test.ts @@ -16,7 +16,7 @@ type MockGoogleLiveConnectParams = { onopen: () => void; onmessage: (message: Record) => void; onerror: (event: { error?: unknown; message?: string }) => void; - onclose: () => void; + onclose: (event?: { code?: number; reason?: string; wasClean?: boolean }) => void; }; }; @@ -352,6 +352,47 @@ describe("buildGoogleRealtimeVoiceProvider", () => { expect(lastConnectParams().config.sessionResumption).toEqual({ handle: "resume-1" }); }); + it("reconnects unexpected Google Live closes with the latest resumption handle", async () => { + vi.useFakeTimers(); + try { + const provider = buildGoogleRealtimeVoiceProvider(); + const onClose = vi.fn(); + const onError = vi.fn(); + const bridge = provider.createBridge({ + providerConfig: { apiKey: "gemini-key" }, + onAudio: vi.fn(), + onClearAudio: vi.fn(), + onClose, + onError, + }); + + await bridge.connect(); + lastConnectParams().callbacks.onmessage({ + setupComplete: { sessionId: "session-1" }, + sessionResumptionUpdate: { resumable: true, newHandle: "resume-1" }, + }); + lastConnectParams().callbacks.onclose({ + code: 1011, + reason: "temporary upstream close", + wasClean: false, + }); + + expect(onClose).not.toHaveBeenCalled(); + expect(onError).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringContaining("reconnecting 1/3"), + }), + ); + + await vi.advanceTimersByTimeAsync(250); + + expect(connectMock).toHaveBeenCalledTimes(2); + expect(lastConnectParams().config.sessionResumption).toEqual({ handle: "resume-1" }); + } finally { + vi.useRealTimers(); + } + }); + it("waits for setup completion before draining audio and firing ready", async () => { const provider = buildGoogleRealtimeVoiceProvider(); const onReady = vi.fn(); diff --git a/extensions/google/realtime-voice-provider.ts b/extensions/google/realtime-voice-provider.ts index 77bf205a203..68c90529671 100644 --- a/extensions/google/realtime-voice-provider.ts +++ b/extensions/google/realtime-voice-provider.ts @@ -50,6 +50,9 @@ const MAX_PENDING_AUDIO_CHUNKS = 320; const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 500; const GOOGLE_REALTIME_BROWSER_SESSION_TTL_MS = 30 * 60 * 1000; const GOOGLE_REALTIME_BROWSER_NEW_SESSION_TTL_MS = 60 * 1000; +const GOOGLE_REALTIME_RECONNECT_MAX_ATTEMPTS = 3; +const GOOGLE_REALTIME_RECONNECT_BASE_DELAY_MS = 250; +const GOOGLE_REALTIME_RECONNECT_MAX_DELAY_MS = 2_000; const MULAW_LINEAR_SAMPLES = new Int16Array(256); for (let i = 0; i < MULAW_LINEAR_SAMPLES.length; i += 1) { @@ -401,6 +404,24 @@ function isPcm16Silence(audio: Buffer): boolean { return true; } +function formatGoogleLiveCloseEvent( + event: + | { + code?: number; + reason?: string; + wasClean?: boolean; + } + | undefined, +): string { + if (!event) { + return "code=unknown reason=unknown"; + } + const code = typeof event.code === "number" ? 
event.code : "unknown"; + const reason = event.reason?.trim() || "none"; + const clean = typeof event.wasClean === "boolean" ? ` clean=${event.wasClean}` : ""; + return `code=${code} reason=${reason}${clean}`; +} + class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge { readonly supportsToolResultContinuation = true; @@ -415,6 +436,8 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge { private pendingFunctionNames = new Map(); private readonly audioFormat: RealtimeVoiceAudioFormat; private resumptionHandle: string | undefined; + private reconnectAttempts = 0; + private reconnectTimer: ReturnType | undefined; constructor(private readonly config: GoogleRealtimeVoiceBridgeConfig) { this.audioFormat = config.audioFormat ?? REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ; @@ -464,13 +487,23 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge { ); this.config.onError?.(error); }, - onclose: () => { + onclose: (event) => { this.connected = false; this.sessionConfigured = false; this.pendingFunctionNames.clear(); - const reason = this.intentionallyClosed ? "completed" : "error"; this.session = null; - this.config.onClose?.(reason); + if (this.intentionallyClosed) { + this.config.onClose?.("completed"); + return; + } + const closeDetails = formatGoogleLiveCloseEvent(event); + if (this.scheduleReconnect(closeDetails)) { + return; + } + this.config.onError?.( + new Error(`Google Live session closed after reconnect attempts: ${closeDetails}`), + ); + this.config.onClose?.("error"); }, }, })) as GoogleLiveSession; @@ -596,6 +629,10 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge { this.intentionallyClosed = true; this.connected = false; this.sessionConfigured = false; + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = undefined; + } this.pendingAudio = []; this.consecutiveSilenceMs = 0; this.audioStreamEnded = false; @@ -667,6 +704,7 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge { private handleSetupComplete(): void { this.sessionConfigured = true; + this.reconnectAttempts = 0; for (const chunk of this.pendingAudio.splice(0)) { this.sendAudio(chunk); } @@ -739,6 +777,36 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge { }); } } + + private scheduleReconnect(closeDetails: string): boolean { + if (this.reconnectAttempts >= GOOGLE_REALTIME_RECONNECT_MAX_ATTEMPTS) { + return false; + } + const attempt = ++this.reconnectAttempts; + const delayMs = Math.min( + GOOGLE_REALTIME_RECONNECT_MAX_DELAY_MS, + GOOGLE_REALTIME_RECONNECT_BASE_DELAY_MS * 2 ** (attempt - 1), + ); + this.config.onError?.( + new Error( + `Google Live session closed unexpectedly (${closeDetails}); reconnecting ${attempt}/${GOOGLE_REALTIME_RECONNECT_MAX_ATTEMPTS} in ${delayMs}ms`, + ), + ); + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = undefined; + if (this.intentionallyClosed) { + return; + } + this.connect().catch((error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + this.config.onError?.(error instanceof Error ? 
error : new Error(message)); + if (!this.scheduleReconnect(`connect failed: ${message}`)) { + this.config.onClose?.("error"); + } + }); + }, delayMs); + return true; + } } function convertMulaw8kToPcm16k(muLaw: Buffer): Buffer { diff --git a/extensions/voice-call/index.test.ts b/extensions/voice-call/index.test.ts index 739768eec04..c253f0c9adb 100644 --- a/extensions/voice-call/index.test.ts +++ b/extensions/voice-call/index.test.ts @@ -412,6 +412,37 @@ describe("voice-call plugin", () => { expect(respond.mock.calls[0]?.[0]).toBe(true); }); + it("preserves explicit session keys on voicecall.start", async () => { + const { methods } = setup({ provider: "mock" }); + const handler = methods.get("voicecall.start") as + | ((ctx: { + params: Record; + respond: ReturnType; + }) => Promise) + | undefined; + const respond = vi.fn(); + await handler?.({ + params: { + mode: "conversation", + requesterSessionKey: "agent:main:discord:channel:general", + sessionKey: "voice:google-meet:meet-1", + to: "+15550001234", + }, + respond, + }); + expect(runtimeStub.manager.initiateCall).toHaveBeenCalledWith( + "+15550001234", + "voice:google-meet:meet-1", + { + dtmfSequence: undefined, + message: undefined, + mode: "conversation", + requesterSessionKey: "agent:main:discord:channel:general", + }, + ); + expect(respond.mock.calls[0]?.[0]).toBe(true); + }); + it("returns call status", async () => { const { methods } = setup({ provider: "mock" }); const handler = methods.get("voicecall.status") as diff --git a/extensions/voice-call/index.ts b/extensions/voice-call/index.ts index b6c4bf0a9ff..202ffc469ba 100644 --- a/extensions/voice-call/index.ts +++ b/extensions/voice-call/index.ts @@ -97,6 +97,11 @@ const voiceCallConfigSchema = { help: "Controls the shared openclaw_agent_consult tool.", advanced: true, }, + "realtime.consultPolicy": { + label: "Realtime Consult Policy", + help: "Guides when the realtime voice model should call openclaw_agent_consult.", + advanced: true, + }, "realtime.fastContext.enabled": { label: "Enable Fast Realtime Context", help: "Searches memory/session context before the full consult agent.", @@ -118,6 +123,31 @@ const voiceCallConfigSchema = { label: "Fallback To Full Consult", advanced: true, }, + "realtime.agentContext.enabled": { + label: "Enable Agent Voice Context", + help: "Injects a compact agent identity, system prompt, and workspace context capsule into realtime voice instructions.", + advanced: true, + }, + "realtime.agentContext.maxChars": { + label: "Agent Voice Context Limit", + advanced: true, + }, + "realtime.agentContext.includeIdentity": { + label: "Include Agent Identity", + advanced: true, + }, + "realtime.agentContext.includeSystemPrompt": { + label: "Include Agent System Prompt", + advanced: true, + }, + "realtime.agentContext.includeWorkspaceFiles": { + label: "Include Agent Workspace Files", + advanced: true, + }, + "realtime.agentContext.files": { + label: "Agent Voice Context Files", + advanced: true, + }, "realtime.providers": { label: "Realtime Provider Config", advanced: true }, "tts.provider": { label: "TTS Provider Override", @@ -152,6 +182,10 @@ const VoiceCallToolSchema = Type.Union([ to: Type.Optional(Type.String({ description: "Call target" })), message: Type.String({ description: "Intro message" }), mode: Type.Optional(Type.Union([Type.Literal("notify"), Type.Literal("conversation")])), + sessionKey: Type.Optional(Type.String({ description: "OpenClaw session key for the call" })), + requesterSessionKey: Type.Optional( + Type.String({ description: 
"OpenClaw session key that initiated the call" }), + ), dtmfSequence: Type.Optional(Type.String({ description: "DTMF digits to play before connect" })), }), Type.Object({ @@ -182,6 +216,10 @@ const VoiceCallToolSchema = Type.Union([ to: Type.Optional(Type.String({ description: "Call target" })), sid: Type.Optional(Type.String({ description: "Call SID" })), message: Type.Optional(Type.String({ description: "Optional intro message" })), + sessionKey: Type.Optional(Type.String({ description: "OpenClaw session key for the call" })), + requesterSessionKey: Type.Optional( + Type.String({ description: "OpenClaw session key that initiated the call" }), + ), dtmfSequence: Type.Optional(Type.String({ description: "DTMF digits to play before connect" })), }), ]); @@ -342,11 +380,14 @@ export default definePluginEntry({ message?: string; mode?: "notify" | "conversation"; dtmfSequence?: string; + sessionKey?: string; + requesterSessionKey?: string; }) => { - const result = await params.rt.manager.initiateCall(params.to, undefined, { + const result = await params.rt.manager.initiateCall(params.to, params.sessionKey, { message: params.message, mode: params.mode, dtmfSequence: params.dtmfSequence, + ...(params.requesterSessionKey ? { requesterSessionKey: params.requesterSessionKey } : {}), }); if (!result.success) { respondError(params.respond, result.error || "initiate failed"); @@ -413,6 +454,8 @@ export default definePluginEntry({ to, message, mode, + sessionKey: normalizeOptionalString(params?.sessionKey), + requesterSessionKey: normalizeOptionalString(params?.requesterSessionKey), }); } catch (err) { sendError(respond, err); @@ -603,6 +646,8 @@ export default definePluginEntry({ const to = normalizeOptionalString(params?.to) ?? ""; const message = normalizeOptionalString(params?.message) ?? ""; const dtmfSequence = normalizeOptionalString(params?.dtmfSequence); + const sessionKey = normalizeOptionalString(params?.sessionKey); + const requesterSessionKey = normalizeOptionalString(params?.requesterSessionKey); if (!to) { respondError(respond, "to required", ErrorCodes.INVALID_REQUEST); return; @@ -617,6 +662,8 @@ export default definePluginEntry({ message: message || undefined, mode, dtmfSequence, + sessionKey, + ...(requesterSessionKey ? { requesterSessionKey } : {}), }); } catch (err) { sendError(respond, err); @@ -737,10 +784,17 @@ export default definePluginEntry({ if (!to) { throw new Error("to required for call"); } - const result = await rt.manager.initiateCall(to, undefined, { - dtmfSequence: normalizeOptionalString(rawParams.dtmfSequence), - message: normalizeOptionalString(rawParams.message), - }); + const result = await rt.manager.initiateCall( + to, + normalizeOptionalString(rawParams.sessionKey), + { + dtmfSequence: normalizeOptionalString(rawParams.dtmfSequence), + message: normalizeOptionalString(rawParams.message), + ...(normalizeOptionalString(rawParams.requesterSessionKey) + ? 
{ requesterSessionKey: normalizeOptionalString(rawParams.requesterSessionKey) } + : {}), + }, + ); if (!result.success) { throw new Error(result.error || "initiate failed"); } diff --git a/extensions/voice-call/openclaw.plugin.json b/extensions/voice-call/openclaw.plugin.json index 1ac33e6e594..5abc35329d5 100644 --- a/extensions/voice-call/openclaw.plugin.json +++ b/extensions/voice-call/openclaw.plugin.json @@ -148,6 +148,11 @@ "help": "Controls the shared openclaw_agent_consult tool.", "advanced": true }, + "realtime.consultPolicy": { + "label": "Realtime Consult Policy", + "help": "Guides when the realtime voice model should call openclaw_agent_consult.", + "advanced": true + }, "realtime.fastContext.enabled": { "label": "Enable Fast Realtime Context", "help": "Searches memory/session context before the full consult agent.", @@ -169,6 +174,31 @@ "label": "Fallback To Full Consult", "advanced": true }, + "realtime.agentContext.enabled": { + "label": "Enable Agent Voice Context", + "help": "Injects a compact agent identity, system prompt, and workspace context capsule into realtime voice instructions.", + "advanced": true + }, + "realtime.agentContext.maxChars": { + "label": "Agent Voice Context Limit", + "advanced": true + }, + "realtime.agentContext.includeIdentity": { + "label": "Include Agent Identity", + "advanced": true + }, + "realtime.agentContext.includeSystemPrompt": { + "label": "Include Agent System Prompt", + "advanced": true + }, + "realtime.agentContext.includeWorkspaceFiles": { + "label": "Include Agent Workspace Files", + "advanced": true + }, + "realtime.agentContext.files": { + "label": "Agent Voice Context Files", + "advanced": true + }, "realtime.providers": { "label": "Realtime Provider Config", "advanced": true @@ -481,6 +511,10 @@ "type": "string", "enum": ["safe-read-only", "owner", "none"] }, + "consultPolicy": { + "type": "string", + "enum": ["auto", "substantive", "always"] + }, "tools": { "type": "array", "items": { @@ -550,6 +584,35 @@ } } }, + "agentContext": { + "type": "object", + "additionalProperties": false, + "properties": { + "enabled": { + "type": "boolean" + }, + "maxChars": { + "type": "integer", + "minimum": 1 + }, + "includeIdentity": { + "type": "boolean" + }, + "includeSystemPrompt": { + "type": "boolean" + }, + "includeWorkspaceFiles": { + "type": "boolean" + }, + "files": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + } + } + }, "providers": { "type": "object", "additionalProperties": { diff --git a/extensions/voice-call/src/config.test.ts b/extensions/voice-call/src/config.test.ts index 3985589692f..3f0fb2d82c4 100644 --- a/extensions/voice-call/src/config.test.ts +++ b/extensions/voice-call/src/config.test.ts @@ -388,6 +388,7 @@ describe("normalizeVoiceCallConfig", () => { expect(normalized.streaming.providers).toEqual({}); expect(normalized.realtime.streamPath).toBe("/voice/stream/realtime"); expect(normalized.realtime.toolPolicy).toBe("safe-read-only"); + expect(normalized.realtime.consultPolicy).toBe("auto"); expect(normalized.realtime.fastContext).toEqual({ enabled: false, timeoutMs: 800, @@ -395,6 +396,14 @@ describe("normalizeVoiceCallConfig", () => { sources: ["memory", "sessions"], fallbackToConsult: false, }); + expect(normalized.realtime.agentContext).toEqual({ + enabled: false, + maxChars: 6000, + includeIdentity: true, + includeSystemPrompt: true, + includeWorkspaceFiles: true, + files: ["SOUL.md", "IDENTITY.md", "USER.md"], + }); 
expect(normalized.realtime.instructions).toContain("openclaw_agent_consult"); expect(normalized.tunnel.provider).toBe("none"); expect(normalized.webhookSecurity.allowedHosts).toEqual([]); @@ -455,6 +464,7 @@ describe("resolveVoiceCallConfig", () => { expect(resolved.realtime.instructions).toBe("Stay concise."); expect(resolved.realtime.toolPolicy).toBe("safe-read-only"); + expect(resolved.realtime.consultPolicy).toBe("auto"); expect(resolved.realtime.provider).toBeUndefined(); }); diff --git a/extensions/voice-call/src/config.ts b/extensions/voice-call/src/config.ts index dff394ee752..ec52ba52575 100644 --- a/extensions/voice-call/src/config.ts +++ b/extensions/voice-call/src/config.ts @@ -227,6 +227,7 @@ const VoiceCallRealtimeProvidersConfigSchema = z .default({}); const VoiceCallRealtimeToolPolicySchema = z.enum(REALTIME_VOICE_AGENT_CONSULT_TOOL_POLICIES); +const VoiceCallRealtimeConsultPolicySchema = z.enum(["auto", "substantive", "always"]); const VoiceCallRealtimeFastContextSourceSchema = z.enum(["memory", "sessions"]); @@ -258,6 +259,34 @@ export type VoiceCallRealtimeFastContextConfig = z.infer< typeof VoiceCallRealtimeFastContextConfigSchema >; +const VoiceCallRealtimeAgentContextConfigSchema = z + .object({ + /** Inject a compact agent persona/context capsule into realtime voice instructions. */ + enabled: z.boolean().default(false), + /** Maximum number of characters from the generated capsule to append. */ + maxChars: z.number().int().positive().default(6000), + /** Include configured agent identity fields. */ + includeIdentity: z.boolean().default(true), + /** Include agents.defaults/list systemPromptOverride when configured. */ + includeSystemPrompt: z.boolean().default(true), + /** Include selected workspace files such as SOUL.md and IDENTITY.md. */ + includeWorkspaceFiles: z.boolean().default(true), + /** Workspace-relative files to include, bounded by maxChars. */ + files: z.array(z.string().min(1)).default(["SOUL.md", "IDENTITY.md", "USER.md"]), + }) + .strict() + .default({ + enabled: false, + maxChars: 6000, + includeIdentity: true, + includeSystemPrompt: true, + includeWorkspaceFiles: true, + files: ["SOUL.md", "IDENTITY.md", "USER.md"], + }); +export type VoiceCallRealtimeAgentContextConfig = z.infer< + typeof VoiceCallRealtimeAgentContextConfigSchema +>; + const VoiceCallStreamingProvidersConfigSchema = z .record(z.string(), z.record(z.string(), z.unknown())) .default({}); @@ -274,10 +303,14 @@ const VoiceCallRealtimeConfigSchema = z instructions: z.string().default(DEFAULT_VOICE_CALL_REALTIME_INSTRUCTIONS), /** Tool policy for the shared OpenClaw agent consult tool. */ toolPolicy: VoiceCallRealtimeToolPolicySchema.default("safe-read-only"), + /** Guidance for when the realtime model should call the OpenClaw agent consult tool. */ + consultPolicy: VoiceCallRealtimeConsultPolicySchema.default("auto"), /** Tool definitions exposed to the realtime provider. */ tools: z.array(RealtimeToolSchema).default([]), /** Low-latency memory/session context for the consult tool. */ fastContext: VoiceCallRealtimeFastContextConfigSchema, + /** Bounded agent persona/context injection for the fast realtime voice path. */ + agentContext: VoiceCallRealtimeAgentContextConfigSchema, /** Provider-owned raw config blobs keyed by provider id. 
*/ providers: VoiceCallRealtimeProvidersConfigSchema, }) @@ -286,6 +319,7 @@ const VoiceCallRealtimeConfigSchema = z enabled: false, instructions: DEFAULT_VOICE_CALL_REALTIME_INSTRUCTIONS, toolPolicy: "safe-read-only", + consultPolicy: "auto", tools: [], fastContext: { enabled: false, @@ -294,6 +328,14 @@ const VoiceCallRealtimeConfigSchema = z sources: ["memory", "sessions"], fallbackToConsult: false, }, + agentContext: { + enabled: false, + maxChars: 6000, + includeIdentity: true, + includeSystemPrompt: true, + includeWorkspaceFiles: true, + files: ["SOUL.md", "IDENTITY.md", "USER.md"], + }, providers: {}, }); export type VoiceCallRealtimeConfig = z.infer; @@ -606,6 +648,11 @@ export function normalizeVoiceCallConfig(config: VoiceCallConfigInput): VoiceCal ...config.realtime?.fastContext, sources: config.realtime?.fastContext?.sources ?? defaults.realtime.fastContext.sources, }; + const realtimeAgentContext = { + ...defaults.realtime.agentContext, + ...config.realtime?.agentContext, + files: config.realtime?.agentContext?.files ?? defaults.realtime.agentContext.files, + }; return { ...defaults, ...config, @@ -640,6 +687,7 @@ export function normalizeVoiceCallConfig(config: VoiceCallConfigInput): VoiceCal tools: (config.realtime?.tools as RealtimeToolConfig[] | undefined) ?? defaults.realtime.tools, fastContext: realtimeFastContext, + agentContext: realtimeAgentContext, providers: realtimeProviders, }, tts: normalizeVoiceCallTtsConfig(defaults.tts, config.tts), diff --git a/extensions/voice-call/src/manager/outbound.ts b/extensions/voice-call/src/manager/outbound.ts index 8d94b09fa2a..2c4c99b7a7d 100644 --- a/extensions/voice-call/src/manager/outbound.ts +++ b/extensions/voice-call/src/manager/outbound.ts @@ -123,6 +123,7 @@ export async function initiateCall( const initialMessage = opts.message; const mode = opts.mode ?? ctx.config.outbound.defaultMode; const dtmfSequence = opts.dtmfSequence; + const requesterSessionKey = opts.requesterSessionKey?.trim(); if (dtmfSequence) { const validationError = validateDtmfDigits(dtmfSequence); if (validationError) { @@ -178,6 +179,7 @@ export async function initiateCall( metadata: { ...(initialMessage && { initialMessage }), mode, + ...(requesterSessionKey ? 
{ requesterSessionKey } : {}), }, }; diff --git a/extensions/voice-call/src/realtime-agent-context.test.ts b/extensions/voice-call/src/realtime-agent-context.test.ts new file mode 100644 index 00000000000..ef0f53ccd9c --- /dev/null +++ b/extensions/voice-call/src/realtime-agent-context.test.ts @@ -0,0 +1,101 @@ +import { mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { VoiceCallConfig } from "./config.js"; +import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js"; +import { buildRealtimeVoiceInstructions } from "./realtime-agent-context.js"; +import { createVoiceCallBaseConfig } from "./test-fixtures.js"; + +const tempDirs: string[] = []; + +afterEach(async () => { + await Promise.all(tempDirs.splice(0).map((dir) => rm(dir, { recursive: true, force: true }))); +}); + +async function createWorkspace(): Promise { + const workspaceDir = await mkdtemp(path.join(tmpdir(), "openclaw-voice-context-")); + tempDirs.push(workspaceDir); + return workspaceDir; +} + +function createConfig(overrides?: Partial): VoiceCallConfig { + const config = createVoiceCallBaseConfig(); + config.agentId = "voice"; + config.realtime.enabled = true; + config.realtime.instructions = "Base voice instructions."; + config.realtime = { + ...config.realtime, + ...overrides, + fastContext: { + ...config.realtime.fastContext, + ...overrides?.fastContext, + sources: overrides?.fastContext?.sources ?? config.realtime.fastContext.sources, + }, + agentContext: { + ...config.realtime.agentContext, + ...overrides?.agentContext, + files: overrides?.agentContext?.files ?? config.realtime.agentContext.files, + }, + tools: overrides?.tools ?? config.realtime.tools, + providers: overrides?.providers ?? config.realtime.providers, + }; + return config; +} + +function createAgentRuntime(workspaceDir: string): CoreAgentDeps { + return { + resolveAgentIdentity: vi.fn(() => ({ + name: "Claw Voice", + emoji: ":claw:", + theme: "bright", + vibe: "snappy", + creature: "operator", + })), + resolveAgentWorkspaceDir: vi.fn(() => workspaceDir), + } as unknown as CoreAgentDeps; +} + +describe("buildRealtimeVoiceInstructions", () => { + it("injects bounded identity, system prompt, and workspace context", async () => { + const workspaceDir = await createWorkspace(); + await writeFile(path.join(workspaceDir, "SOUL.md"), "Stay quick, direct, and warm.\n"); + await writeFile(path.join(workspaceDir, "IDENTITY.md"), "Name: Claw Voice\nVibe: snappy\n"); + await writeFile(path.join(workspaceDir, "SECRET.md"), "do not include\n"); + + const coreConfig = { + agents: { + list: [{ id: "voice", systemPromptOverride: "Keep spoken answers short." 
}], + }, + } as CoreConfig; + + const instructions = await buildRealtimeVoiceInstructions({ + baseInstructions: "Base voice instructions.", + config: createConfig({ + consultPolicy: "substantive", + agentContext: { + enabled: true, + maxChars: 2000, + includeIdentity: true, + includeSystemPrompt: true, + includeWorkspaceFiles: true, + files: ["SOUL.md", "IDENTITY.md", "../SECRET.md"], + }, + }), + coreConfig, + agentRuntime: createAgentRuntime(workspaceDir), + }); + + expect(instructions).toContain("OpenClaw agent voice context:"); + expect(instructions).toContain("Consult behavior:"); + expect(instructions).toContain("Call openclaw_agent_consult before answering requests"); + expect(instructions).toContain("- Agent id: voice"); + expect(instructions).toContain("- Name: Claw Voice"); + expect(instructions).toContain("- Vibe: snappy"); + expect(instructions).toContain("Keep spoken answers short."); + expect(instructions).toContain("### SOUL.md"); + expect(instructions).toContain("Stay quick, direct, and warm."); + expect(instructions).toContain("### IDENTITY.md"); + expect(instructions).not.toContain("do not include"); + }); +}); diff --git a/extensions/voice-call/src/realtime-agent-context.ts b/extensions/voice-call/src/realtime-agent-context.ts new file mode 100644 index 00000000000..a5f85bc5e7b --- /dev/null +++ b/extensions/voice-call/src/realtime-agent-context.ts @@ -0,0 +1,177 @@ +import { readFile } from "node:fs/promises"; +import path from "node:path"; +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; +import type { VoiceCallConfig } from "./config.js"; +import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js"; + +type AgentEntryLike = { + id?: unknown; + systemPromptOverride?: unknown; +}; + +type VoiceIdentityLike = { + name?: unknown; + emoji?: unknown; + theme?: unknown; + creature?: unknown; + vibe?: unknown; +}; + +function normalizeString(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} + +function readAgentEntries(cfg: CoreConfig): AgentEntryLike[] { + const agents = (cfg as { agents?: { list?: unknown } }).agents; + return Array.isArray(agents?.list) + ? agents.list.filter((entry): entry is AgentEntryLike => + Boolean(entry && typeof entry === "object"), + ) + : []; +} + +function resolveAgentSystemPromptOverride(cfg: CoreConfig, agentId: string): string | undefined { + const entries = readAgentEntries(cfg); + const entry = entries.find((candidate) => normalizeString(candidate.id) === agentId); + return ( + normalizeString(entry?.systemPromptOverride) ?? + normalizeString( + (cfg as { agents?: { defaults?: { systemPromptOverride?: unknown } } }).agents?.defaults + ?.systemPromptOverride, + ) + ); +} + +function isSafeWorkspaceRelativeFile(file: string): boolean { + if (!file.trim() || path.isAbsolute(file)) { + return false; + } + const normalized = path.normalize(file); + const parts = normalized.split(/[\\/]+/); + return normalized !== "." 
&& !parts.includes("..") && !normalized.includes("\0"); +} + +function limitText(text: string, maxChars: number): string { + if (text.length <= maxChars) { + return text; + } + return `${text.slice(0, Math.max(0, maxChars - 32)).trimEnd()}\n[truncated]`; +} + +async function readWorkspaceVoiceContextFiles(params: { + workspaceDir: string; + files: readonly string[]; + maxChars: number; +}): Promise { + const sections: string[] = []; + let remaining = params.maxChars; + for (const file of params.files) { + if (remaining <= 0 || !isSafeWorkspaceRelativeFile(file)) { + continue; + } + const fullPath = path.join(params.workspaceDir, path.normalize(file)); + const content = await readFile(fullPath, "utf8").catch(() => undefined); + const trimmed = content?.trim(); + if (!trimmed) { + continue; + } + const body = limitText(trimmed, Math.max(0, remaining - file.length - 16)); + const section = `### ${file}\n${body}`; + sections.push(section); + remaining -= section.length; + } + return sections; +} + +function buildConsultPolicyGuidance( + config: Pick, +): string | undefined { + if (config.toolPolicy === "none" || config.consultPolicy === "auto") { + return undefined; + } + if (config.consultPolicy === "always") { + return [ + "Consult behavior:", + "- Call openclaw_agent_consult before every substantive answer.", + "- You may answer directly only for greetings, acknowledgements, brief latency tests, or filler while waiting for the consult result.", + "- After the consult result arrives, speak that result concisely.", + ].join("\n"); + } + return [ + "Consult behavior:", + "- Answer directly for greetings, acknowledgements, simple conversational glue, and brief latency tests.", + "- Call openclaw_agent_consult before answering requests that need facts, memory, current information, tools, workspace state, or the user's OpenClaw-specific context.", + "- Keep spoken replies concise and natural.", + ].join("\n"); +} + +export async function buildRealtimeVoiceInstructions(params: { + baseInstructions: string; + config: VoiceCallConfig; + coreConfig: CoreConfig; + agentRuntime: CoreAgentDeps; +}): Promise { + const { config } = params; + const sections: string[] = [params.baseInstructions]; + const consultGuidance = buildConsultPolicyGuidance(config.realtime); + if (consultGuidance) { + sections.push(consultGuidance); + } + + const contextConfig = config.realtime.agentContext; + if (!contextConfig.enabled) { + return sections.filter(Boolean).join("\n\n"); + } + + const agentId = config.agentId ?? "main"; + const capsule: string[] = [ + "OpenClaw agent voice context:", + `- Agent id: ${agentId}`, + "- Use this context to match the OpenClaw agent's personality and standing preferences on fast voice turns.", + "- Treat this as compact context only; call openclaw_agent_consult when the caller needs the full agent brain, tools, memory, or workspace state.", + ]; + + if (contextConfig.includeIdentity) { + const identity = params.agentRuntime.resolveAgentIdentity( + params.coreConfig as OpenClawConfig, + agentId, + ) as VoiceIdentityLike | undefined; + const identityLines = [ + normalizeString(identity?.name) ? `- Name: ${normalizeString(identity?.name)}` : undefined, + normalizeString(identity?.emoji) ? `- Emoji: ${normalizeString(identity?.emoji)}` : undefined, + normalizeString(identity?.vibe) ? `- Vibe: ${normalizeString(identity?.vibe)}` : undefined, + normalizeString(identity?.theme) ? `- Theme: ${normalizeString(identity?.theme)}` : undefined, + normalizeString(identity?.creature) + ? 
`- Creature/persona: ${normalizeString(identity?.creature)}` + : undefined, + ].filter(Boolean); + if (identityLines.length > 0) { + capsule.push(`Configured identity:\n${identityLines.join("\n")}`); + } + } + + if (contextConfig.includeSystemPrompt) { + const systemPrompt = resolveAgentSystemPromptOverride(params.coreConfig, agentId); + if (systemPrompt) { + capsule.push(`Configured system prompt override:\n${systemPrompt}`); + } + } + + if (contextConfig.includeWorkspaceFiles) { + const workspaceDir = params.agentRuntime.resolveAgentWorkspaceDir( + params.coreConfig as OpenClawConfig, + agentId, + ); + const fileSections = await readWorkspaceVoiceContextFiles({ + workspaceDir, + files: contextConfig.files, + maxChars: contextConfig.maxChars, + }); + if (fileSections.length > 0) { + capsule.push(`Workspace voice context:\n${fileSections.join("\n\n")}`); + } + } + + sections.push(limitText(capsule.join("\n\n"), contextConfig.maxChars)); + return sections.filter(Boolean).join("\n\n"); +} diff --git a/extensions/voice-call/src/runtime.test.ts b/extensions/voice-call/src/runtime.test.ts index 6510b205184..bdb4490938a 100644 --- a/extensions/voice-call/src/runtime.test.ts +++ b/extensions/voice-call/src/runtime.test.ts @@ -347,6 +347,7 @@ describe("createVoiceCallRuntime lifecycle", () => { direction: "outbound", from: "+15550001234", to: "+15550009999", + metadata: { requesterSessionKey: "agent:main:discord:channel:general" }, transcript: [{ speaker: "user", text: "Can you check shipment status?" }], }); @@ -384,6 +385,7 @@ describe("createVoiceCallRuntime lifecycle", () => { expect(runEmbeddedPiAgent).toHaveBeenCalledWith( expect.objectContaining({ sessionKey: "voice:15550009999", + spawnedBy: "agent:main:discord:channel:general", messageProvider: "voice", lane: "voice", provider: "openai", diff --git a/extensions/voice-call/src/runtime.ts b/extensions/voice-call/src/runtime.ts index a1187396578..ec0d311787f 100644 --- a/extensions/voice-call/src/runtime.ts +++ b/extensions/voice-call/src/runtime.ts @@ -20,6 +20,7 @@ import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js"; import { CallManager } from "./manager.js"; import type { VoiceCallProvider } from "./providers/base.js"; import type { TwilioProvider } from "./providers/twilio.js"; +import { buildRealtimeVoiceInstructions } from "./realtime-agent-context.js"; import { resolveRealtimeFastContextConsult } from "./realtime-fast-context.js"; import { resolveVoiceResponseModel } from "./response-model.js"; import type { TelephonyTtsRuntime } from "./telephony-tts.js"; @@ -60,8 +61,9 @@ type RealtimeVoiceRuntimeModule = typeof import("./realtime-voice.runtime.js"); type RealtimeHandlerModule = typeof import("./webhook/realtime-handler.js"); const REALTIME_VOICE_CONSULT_SYSTEM_PROMPT = [ - "You are a behind-the-scenes consultant for a live phone voice agent.", - "Prioritize a fast, speakable answer over exhaustive investigation.", + "You are the configured OpenClaw agent receiving delegated requests from a live phone voice bridge.", + "Act on behalf of the caller using the normal available tools when the caller asks you to do work.", + "Prioritize completing the user's request and returning a fast, speakable result over exhaustive investigation.", "For tool-backed status checks, prefer one or two bounded read-only queries before answering.", "Do not print secret values or dump environment variables; only check whether required configuration is present.", "Be accurate, brief, and speakable.", @@ -317,8 +319,15 @@ export async function 
createVoiceCallRuntime(params: { ); if (realtimeProvider) { const { RealtimeCallHandler } = await loadRealtimeHandler(); + const realtimeInstructions = await buildRealtimeVoiceInstructions({ + baseInstructions: config.realtime.instructions, + config, + coreConfig, + agentRuntime, + }); const realtimeConfig = { ...config.realtime, + instructions: realtimeInstructions, tools: resolveRealtimeVoiceAgentConsultTools( config.realtime.toolPolicy, config.realtime.tools, @@ -350,6 +359,10 @@ export async function createVoiceCallRuntime(params: { ...call, config: effectiveConfig, }); + const requesterSessionKey = + typeof call.metadata?.requesterSessionKey === "string" + ? call.metadata.requesterSessionKey + : undefined; const fastContext = await resolveRealtimeFastContextConsult({ cfg, agentId, @@ -389,6 +402,8 @@ export async function createVoiceCallRuntime(params: { model, thinkLevel, timeoutMs: effectiveConfig.responseTimeoutMs, + spawnedBy: requesterSessionKey, + contextMode: requesterSessionKey ? "fork" : undefined, toolsAllow: resolveRealtimeVoiceAgentConsultToolsAllow( effectiveConfig.realtime.toolPolicy, ), diff --git a/extensions/voice-call/src/test-fixtures.ts b/extensions/voice-call/src/test-fixtures.ts index 75034c34330..bc5f4064766 100644 --- a/extensions/voice-call/src/test-fixtures.ts +++ b/extensions/voice-call/src/test-fixtures.ts @@ -51,6 +51,7 @@ export function createVoiceCallBaseConfig(params?: { streamPath: "/voice/stream/realtime", instructions: DEFAULT_VOICE_CALL_REALTIME_INSTRUCTIONS, toolPolicy: "safe-read-only", + consultPolicy: "auto", tools: [], fastContext: { enabled: false, @@ -59,6 +60,14 @@ export function createVoiceCallBaseConfig(params?: { sources: ["memory", "sessions"], fallbackToConsult: false, }, + agentContext: { + enabled: false, + maxChars: 6000, + includeIdentity: true, + includeSystemPrompt: true, + includeWorkspaceFiles: true, + files: ["SOUL.md", "IDENTITY.md", "USER.md"], + }, providers: {}, }, skipSignatureVerification: false, diff --git a/extensions/voice-call/src/types.ts b/extensions/voice-call/src/types.ts index a5fb6f85f74..fde32b589f4 100644 --- a/extensions/voice-call/src/types.ts +++ b/extensions/voice-call/src/types.ts @@ -288,4 +288,6 @@ export type OutboundCallOptions = { mode?: CallMode; /** DTMF digits to send after the call is connected */ dtmfSequence?: string; + /** Session that initiated the call, used for agent context/delegated message routing */ + requesterSessionKey?: string; }; diff --git a/extensions/voice-call/src/webhook.test.ts b/extensions/voice-call/src/webhook.test.ts index 3534e77878a..fb929d73d1d 100644 --- a/extensions/voice-call/src/webhook.test.ts +++ b/extensions/voice-call/src/webhook.test.ts @@ -83,6 +83,11 @@ const createConfig = (overrides: VoiceCallConfigInput = {}): VoiceCallConfig => ...overrides.realtime?.fastContext, sources: overrides.realtime?.fastContext?.sources ?? base.realtime.fastContext.sources, }, + agentContext: { + ...base.realtime.agentContext, + ...overrides.realtime?.agentContext, + files: overrides.realtime?.agentContext?.files ?? base.realtime.agentContext.files, + }, providers: overrides.realtime?.providers ?? 
base.realtime.providers,
    },
  };
diff --git a/extensions/voice-call/src/webhook/realtime-audio-pacer.ts b/extensions/voice-call/src/webhook/realtime-audio-pacer.ts
index 3e4c8183f93..300b83d91da 100644
--- a/extensions/voice-call/src/webhook/realtime-audio-pacer.ts
+++ b/extensions/voice-call/src/webhook/realtime-audio-pacer.ts
@@ -1,9 +1,9 @@
 const TELEPHONY_SAMPLE_RATE = 8_000;
 const TELEPHONY_CHUNK_BYTES = 160;
 const TELEPHONY_CHUNK_MS = 20;
-const DEFAULT_SPEECH_RMS_THRESHOLD = 0.02;
-const DEFAULT_REQUIRED_LOUD_CHUNKS = 2;
-const DEFAULT_REQUIRED_QUIET_CHUNKS = 10;
+const DEFAULT_SPEECH_RMS_THRESHOLD = 0.035;
+const DEFAULT_REQUIRED_LOUD_CHUNKS = 4;
+const DEFAULT_REQUIRED_QUIET_CHUNKS = 12;
 const DEFAULT_MAX_QUEUED_AUDIO_BYTES = TELEPHONY_SAMPLE_RATE * 120;
 const PCM16_MAX_AMPLITUDE = 32768;
 const MULAW_LINEAR_SAMPLES = new Int16Array(256);
@@ -69,14 +69,16 @@ export class RealtimeTwilioAudioPacer {
     this.ensurePump();
   }

-  clearAudio(): void {
+  clearAudio(): number {
     if (this.closed) {
-      return;
+      return 0;
     }
+    const clearedAudioBytes = this.queuedAudioBytes;
     this.clearTimer();
     this.queue = [];
     this.queuedAudioBytes = 0;
     this.params.sendJson({ event: "clear", streamSid: this.params.streamSid });
+    return clearedAudioBytes;
   }

   close(): void {
diff --git a/extensions/voice-call/src/webhook/realtime-handler.test.ts b/extensions/voice-call/src/webhook/realtime-handler.test.ts
index f6d6592361a..840a021d339 100644
--- a/extensions/voice-call/src/webhook/realtime-handler.test.ts
+++ b/extensions/voice-call/src/webhook/realtime-handler.test.ts
@@ -59,6 +59,7 @@ function makeHandler(
     streamPath: overrides?.streamPath ?? "/voice/stream/realtime",
     instructions: overrides?.instructions ?? "Be helpful.",
     toolPolicy: overrides?.toolPolicy ?? "safe-read-only",
+    consultPolicy: overrides?.consultPolicy ?? "auto",
     tools: overrides?.tools ?? [],
     fastContext: overrides?.fastContext ?? {
       enabled: false,
@@ -67,6 +68,14 @@ function makeHandler(
       sources: ["memory", "sessions"],
       fallbackToConsult: false,
     },
+    agentContext: overrides?.agentContext ?? {
+      enabled: false,
+      maxChars: 6000,
+      includeIdentity: true,
+      includeSystemPrompt: true,
+      includeWorkspaceFiles: true,
+      files: ["SOUL.md", "IDENTITY.md", "USER.md"],
+    },
     providers: overrides?.providers ?? {},
     ...(overrides?.provider ? { provider: overrides.provider } : {}),
   };
@@ -337,7 +346,7 @@ describe("RealtimeCallHandler path routing", () => {
     }
   });

-  it("marks realtime calls ended when the provider closes normally", async () => {
+  it("ends realtime calls when the telephony stream stops", async () => {
    let callbacks:
      | {
          onClose?: (reason: "completed" | "error") => void;
@@ -488,7 +497,9 @@ describe("RealtimeCallHandler path routing", () => {
          name: "openclaw_agent_consult",
          args: { question: "Are the basement lights on?"
}, }); - expect(receivedPartialTranscript).toBe("Are the basement"); + await vi.waitFor(() => { + expect(receivedPartialTranscript).toBe("Are the basement"); + }); await vi.waitFor(() => { expect(submitToolResult).toHaveBeenCalledWith( @@ -538,6 +549,346 @@ describe("RealtimeCallHandler path routing", () => { } }); + it("forces an agent consult from final user transcript when consult policy is always", async () => { + let callbacks: + | { + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + const sendUserMessage = vi.fn(); + const bridge = makeBridge({ sendUserMessage }); + const createBridge = vi.fn( + (request: Parameters[0]) => { + callbacks = request; + return bridge; + }, + ); + const handler = makeHandler( + { consultPolicy: "always" }, + { + manager: { + getCallByProviderCallId: vi.fn( + (): CallRecord => ({ + callId: "call-1", + providerCallId: "CA-force", + provider: "twilio", + direction: "inbound", + state: "ringing", + from: "+15550001234", + to: "+15550009999", + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: {}, + }), + ), + }, + realtimeProvider: makeRealtimeProvider(createBridge), + }, + ); + const consult = vi.fn(async () => ({ text: "I created the smoke test file." })); + handler.registerToolHandler("openclaw_agent_consult", consult); + const server = await startRealtimeServer(handler); + + try { + const ws = await connectWs(server.url); + try { + ws.send( + JSON.stringify({ + event: "start", + start: { streamSid: "MZ-force", callSid: "CA-force" }, + }), + ); + await vi.waitFor(() => { + expect(createBridge).toHaveBeenCalled(); + }); + + callbacks?.onTranscript?.("user", "Create a smoke test file for me.", true); + + await vi.waitFor(() => { + expect(consult).toHaveBeenCalledWith( + expect.objectContaining({ + question: "Create a smoke test file for me.", + }), + "call-1", + {}, + ); + }); + await vi.waitFor(() => { + expect(sendUserMessage).toHaveBeenCalledWith( + expect.stringContaining("I created the smoke test file."), + ); + }); + } finally { + if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) { + ws.close(); + } + } + } finally { + await server.close(); + } + }); + + it("does not carry a final transcript into the next direct voice turn", async () => { + let callbacks: + | { + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + const processEvent = vi.fn(); + const createBridge = vi.fn( + (request: Parameters[0]) => { + callbacks = request; + return makeBridge(); + }, + ); + const handler = makeHandler(undefined, { + manager: { + processEvent, + getCallByProviderCallId: vi.fn( + (): CallRecord => ({ + callId: "call-1", + providerCallId: "CA-direct-turns", + provider: "twilio", + direction: "inbound", + state: "ringing", + from: "+15550001234", + to: "+15550009999", + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: {}, + }), + ), + }, + realtimeProvider: makeRealtimeProvider(createBridge), + }); + const server = await startRealtimeServer(handler); + + try { + const ws = await connectWs(server.url); + try { + ws.send( + JSON.stringify({ + event: "start", + start: { streamSid: "MZ-direct-turns", callSid: "CA-direct-turns" }, + }), + ); + await vi.waitFor(() => { + expect(createBridge).toHaveBeenCalled(); + }); + + callbacks?.onTranscript?.("user", "Hello there.", true); + callbacks?.onTranscript?.("user", "How are you?", true); + + expect(processEvent).toHaveBeenCalledWith( 
+ expect.objectContaining({ + type: "call.speech", + transcript: "Hello there.", + }), + ); + expect(processEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: "call.speech", + transcript: "How are you?", + }), + ); + expect(processEvent).not.toHaveBeenCalledWith( + expect.objectContaining({ + type: "call.speech", + transcript: "Hello there. How are you?", + }), + ); + } finally { + if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) { + ws.close(); + } + } + } finally { + await server.close(); + } + }); + + it("waits for partial transcript fragments to settle before consulting", async () => { + let callbacks: + | { + onToolCall?: (event: RealtimeVoiceToolCallEvent) => void; + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + const submitToolResult = vi.fn(); + const bridge = makeBridge({ + supportsToolResultContinuation: true, + submitToolResult, + }); + const createBridge = vi.fn( + (request: Parameters[0]) => { + callbacks = request; + return bridge; + }, + ); + const handler = makeHandler(undefined, { + manager: { + getCallByProviderCallId: vi.fn( + (): CallRecord => ({ + callId: "call-1", + providerCallId: "CA-settle", + provider: "twilio", + direction: "inbound", + state: "ringing", + from: "+15550001234", + to: "+15550009999", + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: {}, + }), + ), + }, + realtimeProvider: makeRealtimeProvider(createBridge), + }); + const consult = vi.fn(async () => ({ text: "I sent it." })); + handler.registerToolHandler("openclaw_agent_consult", consult); + const server = await startRealtimeServer(handler); + + try { + const ws = await connectWs(server.url); + try { + ws.send( + JSON.stringify({ + event: "start", + start: { streamSid: "MZ-settle", callSid: "CA-settle" }, + }), + ); + await vi.waitFor(() => { + expect(createBridge).toHaveBeenCalled(); + }); + + callbacks?.onTranscript?.("user", "Send a Discord", false); + callbacks?.onToolCall?.({ + itemId: "item-1", + callId: "consult-call", + name: "openclaw_agent_consult", + args: { question: "message" }, + }); + await new Promise((resolve) => setTimeout(resolve, 50)); + callbacks?.onTranscript?.("user", "message.", false); + + await vi.waitFor( + () => { + expect(consult).toHaveBeenCalledWith( + expect.objectContaining({ + question: "Send a Discord message.", + context: expect.stringContaining("shorter consult question: message"), + }), + "call-1", + { partialUserTranscript: "Send a Discord message." }, + ); + }, + { timeout: 2_000 }, + ); + await vi.waitFor(() => { + expect(submitToolResult).toHaveBeenLastCalledWith( + "consult-call", + { text: "I sent it." 
}, + undefined, + ); + }); + } finally { + if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) { + ws.close(); + } + } + } finally { + await server.close(); + } + }); + + it("does not force a duplicate consult when the realtime provider calls the consult tool", async () => { + let callbacks: + | { + onToolCall?: (event: RealtimeVoiceToolCallEvent) => void; + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + const submitToolResult = vi.fn(); + const bridge = makeBridge({ + supportsToolResultContinuation: true, + submitToolResult, + }); + const createBridge = vi.fn( + (request: Parameters[0]) => { + callbacks = request; + return bridge; + }, + ); + const handler = makeHandler( + { consultPolicy: "always" }, + { + manager: { + getCallByProviderCallId: vi.fn( + (): CallRecord => ({ + callId: "call-1", + providerCallId: "CA-native", + provider: "twilio", + direction: "inbound", + state: "ringing", + from: "+15550001234", + to: "+15550009999", + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: {}, + }), + ), + }, + realtimeProvider: makeRealtimeProvider(createBridge), + }, + ); + const consult = vi.fn(async () => ({ text: "Native consult result." })); + handler.registerToolHandler("openclaw_agent_consult", consult); + const server = await startRealtimeServer(handler); + + try { + const ws = await connectWs(server.url); + try { + ws.send( + JSON.stringify({ + event: "start", + start: { streamSid: "MZ-native", callSid: "CA-native" }, + }), + ); + await vi.waitFor(() => { + expect(createBridge).toHaveBeenCalled(); + }); + + callbacks?.onTranscript?.("user", "Send me a Discord message.", true); + callbacks?.onToolCall?.({ + itemId: "item-1", + callId: "consult-call", + name: "openclaw_agent_consult", + args: { question: "Send me a Discord message." }, + }); + + await vi.waitFor(() => { + expect(submitToolResult).toHaveBeenLastCalledWith( + "consult-call", + { text: "Native consult result." 
}, + undefined, + ); + }); + await new Promise((resolve) => setTimeout(resolve, 250)); + expect(consult).toHaveBeenCalledTimes(1); + } finally { + if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) { + ws.close(); + } + } + } finally { + await server.close(); + } + }); + it("does not submit an interim checking result when fast context is enabled", async () => { let callbacks: | { diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts index e60f1e946e5..96aea578af4 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.ts @@ -34,6 +34,13 @@ const STREAM_TOKEN_TTL_MS = 30_000; const DEFAULT_HOST = "localhost:8443"; const MAX_REALTIME_MESSAGE_BYTES = 256 * 1024; const MAX_REALTIME_WS_BUFFERED_BYTES = 1024 * 1024; +const FORCED_CONSULT_FALLBACK_DELAY_MS = 200; +const FORCED_CONSULT_NATIVE_DEDUPE_MS = 2_000; +const FORCED_CONSULT_RESULT_MAX_CHARS = 1800; +const CONSULT_TRANSCRIPT_SETTLE_MS = 350; +const CONSULT_TRANSCRIPT_SETTLE_MAX_MS = 1_000; +const MAX_PARTIAL_USER_TRANSCRIPT_CHARS = 1_200; +const RECENT_FINAL_USER_TRANSCRIPT_TTL_MS = 2_000; function normalizePath(pathname: string): string { const trimmed = pathname.trim(); @@ -62,6 +69,147 @@ function buildGreetingInstructions( : `${intro} "${trimmedGreeting}"`; } +function readSpeakableToolResultText(result: unknown): string | undefined { + if (typeof result === "string") { + return result.trim() || undefined; + } + if (!result || typeof result !== "object" || Array.isArray(result)) { + return undefined; + } + const text = (result as { text?: unknown }).text; + if (typeof text === "string" && text.trim()) { + return text.trim(); + } + const output = (result as { output?: unknown }).output; + return typeof output === "string" && output.trim() ? output.trim() : undefined; +} + +function readConsultArgText(args: unknown, key: string): string | undefined { + if (!args || typeof args !== "object" || Array.isArray(args)) { + return undefined; + } + const value = (args as Record)[key]; + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} + +function readConsultQuestionText(args: unknown): string | undefined { + return ( + readConsultArgText(args, "question") ?? + readConsultArgText(args, "prompt") ?? + readConsultArgText(args, "query") ?? + readConsultArgText(args, "task") + ); +} + +function normalizeTranscriptText(text: string): string { + return text.replace(/\s+/g, " ").trim(); +} + +function findTextOverlap(base: string, next: string): number { + const max = Math.min(base.length, next.length); + for (let size = max; size > 0; size -= 1) { + if (base.slice(-size) === next.slice(0, size)) { + return size; + } + } + return 0; +} + +function shouldInsertTranscriptSpace(base: string, next: string): boolean { + if (!base || !next) { + return false; + } + const last = base.at(-1); + if ( + /\s$/.test(base) || + last === "(" || + last === "[" || + last === "{" || + last === '"' || + last === "'" || + /^[\s,.;:!?)]/.test(next) + ) { + return false; + } + return true; +} + +function appendTranscriptText(base: string | undefined, fragment: string): string { + const next = normalizeTranscriptText(fragment); + if (!next) { + return base ?? ""; + } + const current = normalizeTranscriptText(base ?? 
""); + if (!current) { + return next; + } + const currentLower = current.toLowerCase(); + const nextLower = next.toLowerCase(); + if (currentLower === nextLower || currentLower.endsWith(nextLower)) { + return current; + } + if (nextLower.startsWith(currentLower)) { + return next; + } + const overlap = findTextOverlap(currentLower, nextLower); + if (overlap >= 6 || (overlap >= 3 && next.length <= 12)) { + return `${current}${next.slice(overlap)}`.trim(); + } + const separator = shouldInsertTranscriptSpace(current, next) ? " " : ""; + return `${current}${separator}${next}`.trim(); +} + +function limitPartialUserTranscript(text: string): string { + if (text.length <= MAX_PARTIAL_USER_TRANSCRIPT_CHARS) { + return text; + } + const tail = text.slice(-MAX_PARTIAL_USER_TRANSCRIPT_CHARS); + return tail.replace(/^\S+\s+/, "").trimStart() || tail.trimStart(); +} + +function withFallbackConsultQuestion(args: unknown, fallback: string | undefined): unknown { + const providerQuestion = readConsultQuestionText(args); + const question = fallback?.trim(); + if (providerQuestion) { + if ( + question && + providerQuestion.length <= 40 && + question.length >= providerQuestion.length + 8 + ) { + const context = readConsultArgText(args, "context"); + const fallbackContext = `Realtime provider supplied a shorter consult question: ${providerQuestion}`; + return args && typeof args === "object" && !Array.isArray(args) + ? { + ...args, + question, + context: context ? `${context}\n\n${fallbackContext}` : fallbackContext, + } + : { question, context: fallbackContext }; + } + return args; + } + if (!question) { + return args; + } + return args && typeof args === "object" && !Array.isArray(args) + ? { ...args, question } + : { question }; +} + +function buildForcedConsultSpeechPrompt(result: string): string { + const trimmed = result.trim(); + const bounded = + trimmed.length <= FORCED_CONSULT_RESULT_MAX_CHARS + ? 
trimmed + : `${trimmed.slice(0, FORCED_CONSULT_RESULT_MAX_CHARS - 16).trimEnd()} [truncated]`; + return [ + "Internal OpenClaw consult result is ready.", + "Do not call tools for this internal result.", + "Speak the following answer to the caller now, briefly and naturally:", + bounded, + ].join("\n"); +} + type PendingStreamToken = { expiry: number; from?: string; @@ -81,11 +229,40 @@ type RealtimeSpeakResult = { error?: string; }; +type ForcedConsultState = { + promise: Promise; + sendSpeechPrompt: boolean; + completedAt?: number; +}; + +type NativeConsultState = { + startedAt: number; + promise: Promise; + partialUserTranscript?: string; +}; + +type TelephonyCloseReason = "completed" | "error"; + export class RealtimeCallHandler { private readonly toolHandlers = new Map(); private readonly pendingStreamTokens = new Map(); private readonly activeBridgesByCallId = new Map(); + private readonly activeTelephonyClosersByCallId = new Map< + string, + (reason: TelephonyCloseReason) => void + >(); private readonly partialUserTranscriptsByCallId = new Map(); + private readonly partialUserTranscriptUpdatedAtByCallId = new Map(); + private readonly recentFinalUserTranscriptsByCallId = new Map(); + private readonly recentFinalUserTranscriptTimersByCallId = new Map< + string, + ReturnType + >(); + private readonly forcedConsultTimersByCallId = new Map>(); + private readonly forcedConsultInFlightByCallId = new Set(); + private readonly forcedConsultsByCallId = new Map(); + private readonly lastProviderConsultAtByCallId = new Map(); + private readonly nativeConsultsInFlightByCallId = new Map(); private publicOrigin: string | null = null; private publicPathPrefix = ""; @@ -156,6 +333,10 @@ export class RealtimeCallHandler { wss.handleUpgrade(request, socket, head, (ws) => { let bridge: ActiveRealtimeVoiceBridge | null = null; let initialized = false; + let activeCallSid = "unknown"; + let stopReceived = false; + let lastMediaTimestamp: number | undefined; + let lastMediaGapWarnAt = 0; ws.on("message", (data: Buffer) => { try { @@ -169,6 +350,7 @@ export class RealtimeCallHandler { const streamSid = typeof startData?.streamSid === "string" ? startData.streamSid : "unknown"; const callSid = typeof startData?.callSid === "string" ? startData.callSid : "unknown"; + activeCallSid = callSid; const nextBridge = this.handleCall(streamSid, callSid, ws, callerMeta); if (!nextBridge) { return; @@ -186,10 +368,25 @@ export class RealtimeCallHandler { if (msg.event === "media" && typeof mediaData?.payload === "string") { const audio = Buffer.from(mediaData.payload, "base64"); bridge.sendAudio(audio); - if (typeof mediaData.timestamp === "number") { - bridge.setMediaTimestamp(mediaData.timestamp); - } else if (typeof mediaData.timestamp === "string") { - bridge.setMediaTimestamp(Number.parseInt(mediaData.timestamp, 10)); + const mediaTimestamp = + typeof mediaData.timestamp === "number" + ? mediaData.timestamp + : typeof mediaData.timestamp === "string" + ? 
Number.parseInt(mediaData.timestamp, 10) + : Number.NaN; + if (Number.isFinite(mediaTimestamp)) { + if (lastMediaTimestamp !== undefined) { + const gapMs = mediaTimestamp - lastMediaTimestamp; + const now = Date.now(); + if ((gapMs > 120 || gapMs < 0) && now - lastMediaGapWarnAt > 5_000) { + lastMediaGapWarnAt = now; + console.warn( + `[voice-call] realtime media timestamp gap providerCallId=${activeCallSid} gapMs=${gapMs} timestamp=${mediaTimestamp}`, + ); + } + } + lastMediaTimestamp = mediaTimestamp; + bridge.setMediaTimestamp(mediaTimestamp); } return; } @@ -198,15 +395,17 @@ export class RealtimeCallHandler { return; } if (msg.event === "stop") { - bridge.close(); + stopReceived = true; + this.closeTelephonyBridge(activeCallSid, bridge, "completed"); } } catch (error) { console.error("[voice-call] realtime WS parse failed:", error); } }); - ws.on("close", () => { - bridge?.close(); + ws.on("close", (code) => { + const reason = stopReceived || code === 1000 || code === 1005 ? "completed" : "error"; + this.closeTelephonyBridge(activeCallSid, bridge, reason); }); ws.on("error", (error) => { @@ -289,11 +488,17 @@ export class RealtimeCallHandler { return false; } if (ws.bufferedAmount > MAX_REALTIME_WS_BUFFERED_BYTES) { + console.warn( + `[voice-call] realtime outbound websocket backpressure before send callId=${callId} providerCallId=${callSid} bufferedBytes=${ws.bufferedAmount}`, + ); ws.close(1013, "Backpressure: send buffer exceeded"); return false; } ws.send(JSON.stringify(message)); if (ws.bufferedAmount > MAX_REALTIME_WS_BUFFERED_BYTES) { + console.warn( + `[voice-call] realtime outbound websocket backpressure after send callId=${callId} providerCallId=${callSid} bufferedBytes=${ws.bufferedAmount}`, + ); ws.close(1013, "Backpressure: send buffer exceeded"); return false; } @@ -303,6 +508,9 @@ export class RealtimeCallHandler { streamSid, sendJson, onBackpressure: () => { + console.warn( + `[voice-call] realtime paced audio backpressure callId=${callId} providerCallId=${callSid}`, + ); if (ws.readyState === WebSocket.OPEN) { ws.close(1013, "Backpressure: paced audio queue exceeded"); } @@ -322,7 +530,10 @@ export class RealtimeCallHandler { audioPacer.sendAudio(muLaw); }, clearAudio: () => { - audioPacer.clearAudio(); + const clearedBytes = audioPacer.clearAudio(); + console.log( + `[voice-call] realtime outbound audio clear requested callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`, + ); }, sendMark: (markName) => { audioPacer.sendMark(markName); @@ -331,22 +542,42 @@ export class RealtimeCallHandler { onTranscript: (role, text, isFinal) => { if (!isFinal) { if (role === "user" && text.trim()) { - this.partialUserTranscriptsByCallId.set(callId, text); + const transcript = this.recordPartialUserTranscript(callId, text); + console.log( + `[voice-call] realtime input transcript callId=${callId} providerCallId=${callSid} final=false chars=${text.trim().length} aggregateChars=${transcript.length}`, + ); } return; } if (role === "user") { - this.partialUserTranscriptsByCallId.delete(callId); + const transcript = this.recordPartialUserTranscript(callId, text); + this.clearPartialUserTranscript(callId); + this.setRecentFinalUserTranscript(callId, transcript); + console.log( + `[voice-call] realtime input transcript callId=${callId} providerCallId=${callSid} final=true chars=${text.trim().length} aggregateChars=${transcript.length}`, + ); const event: NormalizedEvent = { id: `realtime-speech-${callSid}-${Date.now()}`, type: "call.speech", callId, providerCallId: 
callSid, timestamp: Date.now(), - transcript: text, + transcript, isFinal: true, }; this.manager.processEvent(event); + this.scheduleForcedAgentConsult({ + session, + callId, + callSid, + transcript, + clearAudio: () => { + const clearedBytes = audioPacer.clearAudio(); + console.log( + `[voice-call] realtime forced consult cleared outbound audio callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`, + ); + }, + }); return; } this.manager.processEvent({ @@ -359,6 +590,9 @@ export class RealtimeCallHandler { }); }, onToolCall: (toolEvent, session) => { + console.log( + `[voice-call] realtime tool call received callId=${callId} providerCallId=${callSid} tool=${toolEvent.name}`, + ); void this.executeToolCall( session, callId, @@ -373,9 +607,10 @@ export class RealtimeCallHandler { onClose: (reason) => { this.activeBridgesByCallId.delete(callId); this.activeBridgesByCallId.delete(callSid); - this.partialUserTranscriptsByCallId.delete(callId); + this.activeTelephonyClosersByCallId.delete(callId); + this.activeTelephonyClosersByCallId.delete(callSid); + this.clearUserTranscriptState(callId); if (reason !== "error") { - emitCallEnd("completed"); return; } emitCallEnd("error"); @@ -393,12 +628,21 @@ export class RealtimeCallHandler { }); }, }); + const closeTelephony = (reason: TelephonyCloseReason) => { + emitCallEnd(reason); + session.close(); + }; this.activeBridgesByCallId.set(callId, session); this.activeBridgesByCallId.set(callSid, session); + this.activeTelephonyClosersByCallId.set(callId, closeTelephony); + this.activeTelephonyClosersByCallId.set(callSid, closeTelephony); const sendAudioToSession = session.sendAudio.bind(session); session.sendAudio = (audio) => { if (speechDetector.accept(audio)) { - audioPacer.clearAudio(); + const clearedBytes = audioPacer.clearAudio(); + console.log( + `[voice-call] realtime outbound audio cleared by barge-in callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`, + ); } sendAudioToSession(audio); }; @@ -406,7 +650,10 @@ export class RealtimeCallHandler { session.close = () => { this.activeBridgesByCallId.delete(callId); this.activeBridgesByCallId.delete(callSid); - this.partialUserTranscriptsByCallId.delete(callId); + this.activeTelephonyClosersByCallId.delete(callId); + this.activeTelephonyClosersByCallId.delete(callSid); + this.clearUserTranscriptState(callId); + this.clearForcedConsultState(callId); audioPacer.close(); closeSession(); }; @@ -421,6 +668,227 @@ export class RealtimeCallHandler { return session; } + private recordPartialUserTranscript(callId: string, text: string): string { + const current = this.partialUserTranscriptsByCallId.get(callId); + const next = limitPartialUserTranscript(appendTranscriptText(current, text)); + this.partialUserTranscriptsByCallId.set(callId, next); + this.partialUserTranscriptUpdatedAtByCallId.set(callId, Date.now()); + return next; + } + + private clearPartialUserTranscript(callId: string): void { + this.partialUserTranscriptsByCallId.delete(callId); + this.partialUserTranscriptUpdatedAtByCallId.delete(callId); + } + + private setRecentFinalUserTranscript(callId: string, text: string): void { + this.clearRecentFinalUserTranscript(callId); + this.recentFinalUserTranscriptsByCallId.set(callId, text); + const timer = setTimeout(() => { + if (this.recentFinalUserTranscriptsByCallId.get(callId) === text) { + this.recentFinalUserTranscriptsByCallId.delete(callId); + } + this.recentFinalUserTranscriptTimersByCallId.delete(callId); + }, RECENT_FINAL_USER_TRANSCRIPT_TTL_MS); + 
timer.unref?.(); + this.recentFinalUserTranscriptTimersByCallId.set(callId, timer); + } + + private clearRecentFinalUserTranscript(callId: string): void { + const timer = this.recentFinalUserTranscriptTimersByCallId.get(callId); + if (timer) { + clearTimeout(timer); + this.recentFinalUserTranscriptTimersByCallId.delete(callId); + } + this.recentFinalUserTranscriptsByCallId.delete(callId); + } + + private clearUserTranscriptState(callId: string): void { + this.clearPartialUserTranscript(callId); + this.clearRecentFinalUserTranscript(callId); + } + + private resolveUserTranscriptContext(callId: string): string | undefined { + return ( + this.partialUserTranscriptsByCallId.get(callId) ?? + this.recentFinalUserTranscriptsByCallId.get(callId) + ); + } + + private consumePartialUserTranscript(callId: string, consumed: string | undefined): void { + const text = consumed?.trim(); + if (!text) { + return; + } + const current = this.partialUserTranscriptsByCallId.get(callId); + if (!current) { + return; + } + if (current === text) { + this.clearPartialUserTranscript(callId); + return; + } + if (current.toLowerCase().startsWith(text.toLowerCase())) { + const remaining = current.slice(text.length).trimStart(); + if (remaining) { + this.partialUserTranscriptsByCallId.set(callId, remaining); + } else { + this.clearPartialUserTranscript(callId); + } + } + const recent = this.recentFinalUserTranscriptsByCallId.get(callId); + if (!recent) { + return; + } + if (recent === text || recent.toLowerCase().startsWith(text.toLowerCase())) { + this.clearRecentFinalUserTranscript(callId); + } + } + + private async waitForConsultTranscriptSettle(callId: string, startedAt: number): Promise { + const deadline = startedAt + CONSULT_TRANSCRIPT_SETTLE_MAX_MS; + while (true) { + const updatedAt = this.partialUserTranscriptUpdatedAtByCallId.get(callId); + if (!updatedAt) { + return; + } + const now = Date.now(); + const quietFor = now - updatedAt; + if (quietFor >= CONSULT_TRANSCRIPT_SETTLE_MS || now >= deadline) { + return; + } + await new Promise((resolve) => + setTimeout(resolve, Math.min(CONSULT_TRANSCRIPT_SETTLE_MS - quietFor, deadline - now)), + ); + } + } + + private clearForcedConsultState(callId: string): void { + const timer = this.forcedConsultTimersByCallId.get(callId); + if (timer) { + clearTimeout(timer); + this.forcedConsultTimersByCallId.delete(callId); + } + this.forcedConsultInFlightByCallId.delete(callId); + this.forcedConsultsByCallId.delete(callId); + this.lastProviderConsultAtByCallId.delete(callId); + } + + private closeTelephonyBridge( + callIdOrSid: string, + bridge: ActiveRealtimeVoiceBridge | null, + reason: TelephonyCloseReason, + ): void { + const closer = this.activeTelephonyClosersByCallId.get(callIdOrSid); + if (closer) { + closer(reason); + return; + } + bridge?.close(); + } + + private scheduleForcedAgentConsult(params: { + session: ActiveRealtimeVoiceBridge; + callId: string; + callSid: string; + transcript: string; + clearAudio: () => void; + }): void { + if (this.config.consultPolicy !== "always") { + return; + } + const question = params.transcript.trim(); + if (!question) { + return; + } + const handler = this.toolHandlers.get(REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME); + if (!handler) { + return; + } + const existingTimer = this.forcedConsultTimersByCallId.get(params.callId); + if (existingTimer) { + clearTimeout(existingTimer); + } + const timer = setTimeout(() => { + this.forcedConsultTimersByCallId.delete(params.callId); + if (this.forcedConsultInFlightByCallId.has(params.callId)) { + 
return; + } + const lastProviderConsultAt = this.lastProviderConsultAtByCallId.get(params.callId) ?? 0; + if (Date.now() - lastProviderConsultAt < 2_000) { + return; + } + void this.runForcedAgentConsult({ + ...params, + question, + handler, + }); + }, FORCED_CONSULT_FALLBACK_DELAY_MS); + this.forcedConsultTimersByCallId.set(params.callId, timer); + } + + private async runForcedAgentConsult(params: { + session: ActiveRealtimeVoiceBridge; + callId: string; + callSid: string; + question: string; + clearAudio: () => void; + handler: ToolHandlerFn; + }): Promise { + this.forcedConsultInFlightByCallId.add(params.callId); + const startedAt = Date.now(); + console.log( + `[voice-call] realtime forced agent consult starting callId=${params.callId} providerCallId=${params.callSid} chars=${params.question.length}`, + ); + params.clearAudio(); + const state: ForcedConsultState = { + sendSpeechPrompt: true, + promise: Promise.resolve().then(() => + params.handler( + { + question: params.question, + context: + "The realtime provider produced a final user transcript without invoking openclaw_agent_consult, so OpenClaw is forcing the consult because consultPolicy is always.", + }, + params.callId, + {}, + ), + ), + }; + this.forcedConsultsByCallId.set(params.callId, state); + try { + const result = await state.promise; + state.completedAt = Date.now(); + const text = readSpeakableToolResultText(result); + if (!text) { + console.warn( + `[voice-call] realtime forced agent consult returned no speakable text callId=${params.callId} providerCallId=${params.callSid}`, + ); + return; + } + if (state.sendSpeechPrompt) { + params.clearAudio(); + params.session.sendUserMessage(buildForcedConsultSpeechPrompt(text)); + } + console.log( + `[voice-call] realtime forced agent consult completed callId=${params.callId} providerCallId=${params.callSid} elapsedMs=${Date.now() - startedAt}`, + ); + this.consumePartialUserTranscript(params.callId, params.question); + } catch (error) { + console.warn( + `[voice-call] realtime forced agent consult failed callId=${params.callId} providerCallId=${params.callSid} error=${formatErrorMessage(error)}`, + ); + } finally { + this.forcedConsultInFlightByCallId.delete(params.callId); + const cleanupTimer = setTimeout(() => { + if (this.forcedConsultsByCallId.get(params.callId) === state) { + this.forcedConsultsByCallId.delete(params.callId); + } + }, FORCED_CONSULT_NATIVE_DEDUPE_MS); + cleanupTimer.unref?.(); + } + } + private registerCallInManager( callSid: string, callerMeta: Omit = {}, @@ -495,25 +963,130 @@ export class RealtimeCallHandler { args: unknown, ): Promise { const handler = this.toolHandlers.get(name); - if ( - handler && - name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME && - bridge.bridge.supportsToolResultContinuation && - !this.config.fastContext.enabled - ) { - bridge.submitToolResult( - bridgeCallId, - buildRealtimeVoiceAgentConsultWorkingResponse("caller"), - { willContinue: true }, - ); - } - const result = !handler - ? 
{ error: `Tool "${name}" not available` } - : await handler(args, callId, { - partialUserTranscript: this.partialUserTranscriptsByCallId.get(callId), - }).catch((error: unknown) => ({ + const startedAt = Date.now(); + if (name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) { + this.lastProviderConsultAtByCallId.set(callId, Date.now()); + const timer = this.forcedConsultTimersByCallId.get(callId); + if (timer) { + clearTimeout(timer); + this.forcedConsultTimersByCallId.delete(callId); + } + const forcedConsult = this.forcedConsultsByCallId.get(callId); + if (forcedConsult) { + if (forcedConsult.completedAt) { + bridge.submitToolResult(bridgeCallId, { + status: "already_delivered", + message: "OpenClaw already delivered this consult result internally. Do not repeat it.", + }); + return; + } + forcedConsult.sendSpeechPrompt = false; + const result = await forcedConsult.promise.catch((error: unknown) => ({ error: formatErrorMessage(error), })); + bridge.submitToolResult(bridgeCallId, result); + return; + } + + const submitWorkingResponse = () => { + if ( + handler && + bridge.bridge.supportsToolResultContinuation && + !this.config.fastContext.enabled + ) { + bridge.submitToolResult( + bridgeCallId, + buildRealtimeVoiceAgentConsultWorkingResponse("caller"), + { willContinue: true }, + ); + } + }; + + const existingNativeConsult = this.nativeConsultsInFlightByCallId.get(callId); + if (existingNativeConsult) { + console.log( + `[voice-call] realtime tool call sharing in-flight agent consult callId=${callId} ageMs=${Date.now() - existingNativeConsult.startedAt}`, + ); + submitWorkingResponse(); + bridge.submitToolResult(bridgeCallId, await existingNativeConsult.promise); + return; + } + + submitWorkingResponse(); + const state: NativeConsultState = { + startedAt, + promise: Promise.resolve(), + }; + state.promise = (async () => { + await this.waitForConsultTranscriptSettle(callId, startedAt); + const context = { + partialUserTranscript: this.resolveUserTranscriptContext(callId), + }; + state.partialUserTranscript = context.partialUserTranscript; + const handlerArgs = withFallbackConsultQuestion(args, context.partialUserTranscript); + console.log( + `[voice-call] realtime tool call executing callId=${callId} tool=${name} hasHandler=${Boolean(handler)}`, + ); + return !handler + ? { error: `Tool "${name}" not available` } + : await handler(handlerArgs, callId, context); + })().catch((error: unknown) => ({ + error: formatErrorMessage(error), + })); + this.nativeConsultsInFlightByCallId.set(callId, state); + try { + const result = await state.promise; + const status = + result && typeof result === "object" && !Array.isArray(result) && "error" in result + ? "error" + : "ok"; + const error = + status === "error" && result && typeof result === "object" && !Array.isArray(result) + ? formatErrorMessage((result as { error?: unknown }).error ?? "unknown") + : undefined; + console.log( + `[voice-call] realtime tool call completed callId=${callId} tool=${name} status=${status} elapsedMs=${Date.now() - startedAt}${error ? 
` error=${error}` : ""}`, + ); + bridge.submitToolResult(bridgeCallId, result); + if (status === "ok") { + this.consumePartialUserTranscript(callId, state.partialUserTranscript); + } + } finally { + if (this.nativeConsultsInFlightByCallId.get(callId) === state) { + this.nativeConsultsInFlightByCallId.delete(callId); + } + } + return; + } + console.log( + `[voice-call] realtime tool call executing callId=${callId} tool=${name} hasHandler=${Boolean(handler)}`, + ); + const context = { + partialUserTranscript: this.resolveUserTranscriptContext(callId), + }; + const handlerArgs = + name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME + ? withFallbackConsultQuestion(args, context.partialUserTranscript) + : args; + const result = !handler + ? { error: `Tool "${name}" not available` } + : await handler(handlerArgs, callId, context).catch((error: unknown) => ({ + error: formatErrorMessage(error), + })); + const status = + result && typeof result === "object" && !Array.isArray(result) && "error" in result + ? "error" + : "ok"; + const error = + status === "error" && result && typeof result === "object" && !Array.isArray(result) + ? formatErrorMessage((result as { error?: unknown }).error ?? "unknown") + : undefined; + console.log( + `[voice-call] realtime tool call completed callId=${callId} tool=${name} status=${status} elapsedMs=${Date.now() - startedAt}${error ? ` error=${error}` : ""}`, + ); bridge.submitToolResult(bridgeCallId, result); + if (name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME && status === "ok") { + this.consumePartialUserTranscript(callId, context.partialUserTranscript); + } } } diff --git a/src/plugins/capability-provider-runtime.test.ts b/src/plugins/capability-provider-runtime.test.ts index 543cced4a18..1cce48f4f34 100644 --- a/src/plugins/capability-provider-runtime.test.ts +++ b/src/plugins/capability-provider-runtime.test.ts @@ -924,6 +924,45 @@ describe("resolvePluginCapabilityProviders", () => { }); }); + it("loads requested realtime voice providers missing from active registry", () => { + const active = createEmptyPluginRegistry(); + active.realtimeVoiceProviders.push({ + pluginId: "openai", + pluginName: "openai", + source: "test", + provider: { id: "openai" }, + } as never); + const loaded = createEmptyPluginRegistry(); + loaded.realtimeVoiceProviders.push({ + pluginId: "google", + pluginName: "Google", + source: "test", + provider: { id: "google" }, + } as never); + mocks.loadPluginManifestRegistry.mockReturnValue({ + plugins: [ + { + id: "google", + origin: "bundled", + contracts: { realtimeVoiceProviders: ["google"] }, + }, + ] as never, + diagnostics: [], + }); + mocks.resolveRuntimePluginRegistry.mockImplementation((params?: unknown) => + params === undefined ? 
active : loaded, + ); + + const provider = resolvePluginCapabilityProvider({ + key: "realtimeVoiceProviders", + providerId: "google", + cfg: { plugins: { allow: ["openai", "google"] } } as OpenClawConfig, + }); + + expect(provider?.id).toBe("google"); + expectActiveRegistryLookup(["google"]); + }); + it("does not merge unrelated bundled capability providers when cfg requests one provider", () => { const active = createEmptyPluginRegistry(); active.speechProviders.push({ diff --git a/src/plugins/capability-provider-runtime.ts b/src/plugins/capability-provider-runtime.ts index f902c51c18f..9a5666f4404 100644 --- a/src/plugins/capability-provider-runtime.ts +++ b/src/plugins/capability-provider-runtime.ts @@ -361,7 +361,12 @@ function filterLoadedProvidersForRequestedConfig; entries: PluginRegistry[K]; }): PluginRegistry[K] { - if (params.key !== "speechProviders" && params.key !== "mediaUnderstandingProviders") { + if ( + params.key !== "speechProviders" && + params.key !== "realtimeTranscriptionProviders" && + params.key !== "realtimeVoiceProviders" && + params.key !== "mediaUnderstandingProviders" + ) { return [] as unknown as PluginRegistry[K]; } if (params.requested.size === 0) { @@ -386,7 +391,7 @@ function resolveRequestedCapabilityPluginIds(params: { cfg?: OpenClawConfig; requested?: Set; }): CapabilityPluginResolution | undefined { - if (params.key !== "speechProviders" || !params.requested || params.requested.size === 0) { + if (!params.requested || params.requested.size === 0) { return undefined; } const runtimePluginIds = new Set(); @@ -436,9 +441,7 @@ function loadCapabilityProviderEntries( ? loadedEntries : coldEntries; const missingRequested = - params.key === "speechProviders" && params.requested && params.requested.size > 0 - ? new Set(params.requested) - : undefined; + params.requested && params.requested.size > 0 ? new Set(params.requested) : undefined; if (missingRequested) { removeActiveProviderIds(missingRequested, entries); } @@ -551,17 +554,19 @@ export function resolvePluginCapabilityProviders entry.provider) as CapabilityProviderForKey[]; } } - let requestedSpeechProviders: Set | undefined; + let requestedProviders: Set | undefined; if (params.key === "speechProviders") { - requestedSpeechProviders = + requestedProviders = missingRequestedProviders ?? - (activeProviders.length === 0 ? collectRequestedSpeechProviderIds(params.cfg) : undefined); + (activeProviders.length === 0 + ? collectRequestedCapabilityProviderIds({ key: params.key, cfg: params.cfg }) + : undefined); } const pluginIds = resolveRequestedCapabilityPluginIds({ key: params.key, cfg: params.cfg, - requested: requestedSpeechProviders, + requested: requestedProviders, }) ?? 
resolveCapabilityPluginIds({ key: params.key, @@ -581,7 +586,7 @@ export function resolvePluginCapabilityProviders = {}; const runEmbeddedPiAgent = vi.fn(async () => ({ @@ -114,6 +124,7 @@ describe("realtime voice agent consult runtime", () => { thinkLevel: "high", timeoutMs: 10_000, prompt: expect.stringContaining("Caller: Can you check this?"), + extraSystemPrompt: expect.stringContaining("delegated requests"), }), ); }); @@ -232,4 +243,97 @@ describe("realtime voice agent consult runtime", () => { }), ); }); + + it("inherits requester message routing for forked consult sessions", async () => { + const { runtime, runEmbeddedPiAgent, sessionStore } = createAgentRuntime(); + sessionStore["agent:main:discord:channel:123"] = { + sessionId: "parent-session", + deliveryContext: { + channel: "discord", + to: "channel:123", + accountId: "default", + }, + updatedAt: 1, + }; + + await consultRealtimeVoiceAgent({ + cfg: {} as never, + agentRuntime: runtime as never, + logger: { warn: vi.fn() }, + agentId: "main", + sessionKey: "voice:google-meet:meet-1", + spawnedBy: "agent:main:discord:channel:123", + contextMode: "fork", + messageProvider: "voice", + lane: "voice", + runIdPrefix: "voice-realtime-consult:call-1", + args: { question: "Send a status message." }, + transcript: [], + surface: "a live phone call", + userLabel: "Caller", + }); + + expect(runEmbeddedPiAgent).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "voice:google-meet:meet-1", + spawnedBy: "agent:main:discord:channel:123", + messageProvider: "discord", + agentAccountId: "default", + messageTo: "channel:123", + currentChannelId: "channel:123", + }), + ); + expect(sessionStore["voice:google-meet:meet-1"]).toMatchObject({ + deliveryContext: { + channel: "discord", + to: "channel:123", + accountId: "default", + }, + lastChannel: "discord", + lastTo: "channel:123", + lastAccountId: "default", + }); + }); + + it("reuses the call session delivery context when requester metadata is absent", async () => { + const { runtime, runEmbeddedPiAgent, sessionStore } = createAgentRuntime(); + sessionStore["voice:google-meet:meet-1"] = { + sessionId: "call-session", + deliveryContext: { + channel: "discord", + to: "channel:123", + accountId: "default", + threadId: "thread-456", + }, + updatedAt: 1, + }; + + await consultRealtimeVoiceAgent({ + cfg: {} as never, + agentRuntime: runtime as never, + logger: { warn: vi.fn() }, + agentId: "main", + sessionKey: "voice:google-meet:meet-1", + messageProvider: "voice", + lane: "voice", + runIdPrefix: "voice-realtime-consult:call-1", + args: { question: "Send this to the original chat." 
}, + transcript: [], + surface: "a live phone call", + userLabel: "Caller", + }); + + expect(runEmbeddedPiAgent).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "call-session", + sessionKey: "voice:google-meet:meet-1", + messageProvider: "discord", + agentAccountId: "default", + messageTo: "channel:123", + messageThreadId: "thread-456", + currentChannelId: "channel:123", + currentThreadTs: "thread-456", + }), + ); + }); }); diff --git a/src/realtime-voice/agent-consult-runtime.ts b/src/realtime-voice/agent-consult-runtime.ts index 7d8a415552c..9d9f2997c2a 100644 --- a/src/realtime-voice/agent-consult-runtime.ts +++ b/src/realtime-voice/agent-consult-runtime.ts @@ -5,10 +5,16 @@ import { forkSessionFromParent, resolveParentForkDecision, } from "../auto-reply/reply/session-fork.js"; +import { parseSessionThreadInfoFast } from "../config/sessions/thread-info.js"; import type { SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import type { RuntimeLogger, PluginRuntimeCore } from "../plugins/runtime/types-core.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; +import { + deliveryContextFromSession, + normalizeDeliveryContext, + type DeliveryContext, +} from "../utils/delivery-context.shared.js"; import { buildRealtimeVoiceAgentConsultPrompt, collectRealtimeVoiceAgentConsultVisibleText, @@ -53,11 +59,61 @@ function resolveRealtimeVoiceAgentSandboxSessionKey(agentId: string, sessionKey: return `agent:${agentId}:${trimmed}`; } +function hasRoutableDeliveryContext( + context: DeliveryContext | undefined, +): context is DeliveryContext & { channel: string; to: string } { + return Boolean(context?.channel && context?.to); +} + +function resolveDeliverySessionFields(context?: DeliveryContext): Partial { + const normalized = normalizeDeliveryContext(context); + if (!normalized?.channel || !normalized.to) { + return {}; + } + return { + deliveryContext: normalized, + lastChannel: normalized.channel, + lastTo: normalized.to, + lastAccountId: normalized.accountId, + lastThreadId: normalized.threadId, + }; +} + +function resolveRealtimeVoiceAgentDeliveryContext(params: { + agentRuntime: RealtimeVoiceAgentConsultRuntime; + storePath: string; + sessionKey: string; + spawnedBy?: string | null; +}): DeliveryContext | undefined { + const requesterSessionKey = params.spawnedBy?.trim(); + try { + const store = params.agentRuntime.session.loadSessionStore(params.storePath); + const candidates: string[] = []; + if (requesterSessionKey) { + const { baseSessionKey } = parseSessionThreadInfoFast(requesterSessionKey); + candidates.push( + ...[requesterSessionKey, baseSessionKey].filter((key): key is string => Boolean(key)), + ); + } + candidates.push(params.sessionKey); + for (const key of candidates) { + const context = deliveryContextFromSession(store[key] as SessionEntry | undefined); + if (hasRoutableDeliveryContext(context)) { + return context; + } + } + } catch { + // Best-effort routing enrichment only; consults should still work without it. 
+ } + return undefined; +} + async function resolveRealtimeVoiceAgentConsultSessionEntry(params: { agentId: string; sessionKey: string; spawnedBy?: string | null; contextMode?: RealtimeVoiceAgentConsultContextMode; + deliveryContext?: DeliveryContext; storePath: string; agentRuntime: RealtimeVoiceAgentConsultRuntime; logger: Pick; @@ -65,8 +121,9 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: { const now = Date.now(); return await params.agentRuntime.session.updateSessionStore(params.storePath, async (store) => { const existing = store[params.sessionKey] as SessionEntry | undefined; + const deliveryFields = resolveDeliverySessionFields(params.deliveryContext); if (existing?.sessionId?.trim()) { - const next: SessionEntry = { ...existing, updatedAt: now }; + const next: SessionEntry = { ...existing, ...deliveryFields, updatedAt: now }; store[params.sessionKey] = next; return next; } @@ -94,6 +151,7 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: { if (fork) { const next: SessionEntry = { ...existing, + ...deliveryFields, sessionId: fork.sessionId, sessionFile: fork.sessionFile, spawnedBy: requesterSessionKey, @@ -111,6 +169,7 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: { const next: SessionEntry = { ...existing, + ...deliveryFields, sessionId: realtimeVoiceAgentConsultDeps.randomUUID(), ...(requesterSessionKey ? { spawnedBy: requesterSessionKey } : {}), updatedAt: now, @@ -153,15 +212,24 @@ export async function consultRealtimeVoiceAgent(params: { const storePath = params.agentRuntime.session.resolveStorePath(params.cfg.session?.store, { agentId, }); + const resolvedDeliveryContext = resolveRealtimeVoiceAgentDeliveryContext({ + agentRuntime: params.agentRuntime, + storePath, + sessionKey: params.sessionKey, + spawnedBy: params.spawnedBy, + }); const sessionEntry = await resolveRealtimeVoiceAgentConsultSessionEntry({ agentId, sessionKey: params.sessionKey, spawnedBy: params.spawnedBy, contextMode: params.contextMode, + deliveryContext: resolvedDeliveryContext, storePath, agentRuntime: params.agentRuntime, logger: params.logger, }); + const consultDeliveryContext = + resolvedDeliveryContext ?? deliveryContextFromSession(sessionEntry); const sessionId = sessionEntry.sessionId; const sessionFile = params.agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, { @@ -173,7 +241,15 @@ export async function consultRealtimeVoiceAgent(params: { sandboxSessionKey: resolveRealtimeVoiceAgentSandboxSessionKey(agentId, params.sessionKey), agentId, spawnedBy: params.spawnedBy, - messageProvider: params.messageProvider, + messageProvider: consultDeliveryContext?.channel ?? params.messageProvider, + agentAccountId: consultDeliveryContext?.accountId, + messageTo: consultDeliveryContext?.to, + messageThreadId: consultDeliveryContext?.threadId, + currentChannelId: consultDeliveryContext?.to, + currentThreadTs: + consultDeliveryContext?.threadId != null + ? String(consultDeliveryContext.threadId) + : undefined, sessionFile, workspaceDir, config: params.cfg, @@ -197,7 +273,7 @@ export async function consultRealtimeVoiceAgent(params: { lane: params.lane, extraSystemPrompt: params.extraSystemPrompt ?? - "You are a behind-the-scenes consultant for a live voice agent. Be accurate, brief, and speakable.", + "You are the configured OpenClaw agent receiving delegated requests from a live voice bridge. 
Act on behalf of the user, use available tools when appropriate, and return a brief speakable result.", agentDir, }); diff --git a/src/realtime-voice/agent-consult-tool.test.ts b/src/realtime-voice/agent-consult-tool.test.ts index 358ef9923f5..71344486e3f 100644 --- a/src/realtime-voice/agent-consult-tool.test.ts +++ b/src/realtime-voice/agent-consult-tool.test.ts @@ -27,7 +27,18 @@ describe("realtime voice agent consult tool", () => { ); }); - it("builds a reusable spoken consultant prompt with recent transcript", () => { + it("accepts provider question aliases from realtime tool calls", () => { + expect(parseRealtimeVoiceAgentConsultArgs({ prompt: " Check the repo. " })).toMatchObject({ + question: "Check the repo.", + }); + expect( + parseRealtimeVoiceAgentConsultArgs({ query: " Send a Discord message. " }), + ).toMatchObject({ + question: "Send a Discord message.", + }); + }); + + it("builds a delegated voice request prompt with recent transcript", () => { const prompt = buildRealtimeVoiceAgentConsultPrompt({ args: { question: "Do we support realtime tools?" }, transcript: [ @@ -40,10 +51,13 @@ describe("realtime voice agent consult tool", () => { questionSourceLabel: "participant", }); - expect(prompt).toContain("during a private Google Meet"); + expect(prompt).toContain( + "Live voice request from the participant during a private Google Meet", + ); + expect(prompt).toContain("Act as the configured OpenClaw agent on behalf of this user"); expect(prompt).toContain("Participant: Can you check the repo?"); expect(prompt).toContain("Agent: I'll verify."); - expect(prompt).toContain("Question:\nDo we support realtime tools?"); + expect(prompt).toContain("User request:\nDo we support realtime tools?"); }); it("filters reasoning and error payloads from visible consult output", () => { diff --git a/src/realtime-voice/agent-consult-tool.ts b/src/realtime-voice/agent-consult-tool.ts index 25d524f7007..b379397bea0 100644 --- a/src/realtime-voice/agent-consult-tool.ts +++ b/src/realtime-voice/agent-consult-tool.ts @@ -26,7 +26,7 @@ export const REALTIME_VOICE_AGENT_CONSULT_TOOL: RealtimeVoiceTool = { type: "function", name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, description: - "Ask the full OpenClaw agent for deeper reasoning, current information, or tool-backed help before speaking.", + "Delegate the caller's request to the configured OpenClaw agent for normal tool-backed work, context, memory, or reasoning before speaking.", parameters: { type: "object", properties: { @@ -114,7 +114,11 @@ export function resolveRealtimeVoiceAgentConsultToolsAllow( } export function parseRealtimeVoiceAgentConsultArgs(args: unknown): RealtimeVoiceAgentConsultArgs { - const question = readConsultStringArg(args, "question"); + const question = + readConsultStringArg(args, "question") ?? + readConsultStringArg(args, "prompt") ?? + readConsultStringArg(args, "query") ?? 
+    readConsultStringArg(args, "task");
   if (!question) {
     throw new Error("question required");
   }
@@ -155,14 +159,14 @@ export function buildRealtimeVoiceAgentConsultPrompt(params: {
     .join("\n");

   return [
-    `You are helping an OpenClaw realtime voice agent during ${params.surface}.`,
-    `Answer the ${questionSourceLabel}'s question with the strongest useful reasoning and available tools.`,
-    "Return only the concise answer the realtime voice agent should speak next.",
-    "Do not include markdown, citations unless needed, tool logs, or private reasoning.",
+    `Live voice request from the ${questionSourceLabel} during ${params.surface}.`,
+    "Act as the configured OpenClaw agent on behalf of this user. Use available tools when the request asks you to do work.",
+    "When finished, return only the concise result the realtime voice agent should speak back.",
+    "Do not include markdown, tool logs, or private reasoning. Include citations only when the spoken answer needs them.",
     parsed.responseStyle ? `Spoken style: ${parsed.responseStyle}` : undefined,
-    transcript ? `Recent transcript:\n${transcript}` : undefined,
-    parsed.context ? `Additional context:\n${parsed.context}` : undefined,
-    `Question:\n${parsed.question}`,
+    transcript ? `Recent voice transcript for context:\n${transcript}` : undefined,
+    parsed.context ? `Additional realtime context:\n${parsed.context}` : undefined,
+    `User request:\n${parsed.question}`,
   ]
     .filter(Boolean)
     .join("\n\n");
diff --git a/src/realtime-voice/provider-registry.ts b/src/realtime-voice/provider-registry.ts
index f8dff037acf..9065f7edc55 100644
--- a/src/realtime-voice/provider-registry.ts
+++ b/src/realtime-voice/provider-registry.ts
@@ -1,5 +1,8 @@
 import type { OpenClawConfig } from "../config/types.openclaw.js";
-import { resolvePluginCapabilityProviders } from "../plugins/capability-provider-runtime.js";
+import {
+  resolvePluginCapabilityProvider,
+  resolvePluginCapabilityProviders,
+} from "../plugins/capability-provider-runtime.js";
 import {
   buildCapabilityProviderMaps,
   normalizeCapabilityProviderId,
@@ -39,6 +42,14 @@ export function getRealtimeVoiceProvider(
   if (!normalized) {
     return undefined;
   }
+  const directProvider = resolvePluginCapabilityProvider({
+    key: "realtimeVoiceProviders",
+    providerId: normalized,
+    cfg,
+  });
+  if (directProvider) {
+    return directProvider;
+  }
   return buildProviderMaps(cfg).aliases.get(normalized);
 }
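
A minimal illustrative sketch (not part of the patch) of the partial-transcript merging rule that the new `appendTranscriptText` and `findTextOverlap` helpers in `realtime-handler.ts` implement: grown ASR hypotheses replace the shorter aggregate, overlapping fragments are stitched once the overlap reaches a few characters, and unrelated fragments are space-joined. The helper name below is hypothetical; the real code additionally applies punctuation-aware spacing and caps the aggregate at 1,200 characters.

```ts
// Illustrative only: trimmed-down version of the fragment merging applied to
// partial user transcripts before they are handed to openclaw_agent_consult.
function mergeTranscriptFragments(current: string, fragment: string): string {
  const base = current.replace(/\s+/g, " ").trim();
  const next = fragment.replace(/\s+/g, " ").trim();
  if (!next) return base;
  if (!base) return next;
  const baseLower = base.toLowerCase();
  const nextLower = next.toLowerCase();
  // Drop exact repeats and fragments already covered by the aggregate.
  if (baseLower === nextLower || baseLower.endsWith(nextLower)) return base;
  // Prefer the longer hypothesis when the provider re-sends a grown prefix.
  if (nextLower.startsWith(baseLower)) return next;
  // Stitch overlapping hypotheses ("Send a Discord" + "Discord message.").
  for (let size = Math.min(base.length, next.length); size > 0; size -= 1) {
    if (baseLower.slice(-size) === nextLower.slice(0, size)) {
      if (size >= 6 || (size >= 3 && next.length <= 12)) {
        return `${base}${next.slice(size)}`.trim();
      }
      break;
    }
  }
  // Otherwise treat the fragment as a continuation of the same utterance.
  return `${base} ${next}`.trim();
}

// mergeTranscriptFragments("Send a Discord", "Discord message.") -> "Send a Discord message."
// mergeTranscriptFragments("Send a Discord", "message.")         -> "Send a Discord message."
// mergeTranscriptFragments("Hello there.", "Hello there.")       -> "Hello there."
```

Under these rules the handler can pass a fuller question to `openclaw_agent_consult` than the provider's short tool-call argument, which is what the `withFallbackConsultQuestion` path relies on.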