diff --git a/CHANGELOG.md b/CHANGELOG.md index 06593138d76..2b59a5c35f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Google/Gemini: normalize retired `google/gemini-3-pro-preview` primary, fallback, and model-map refs during config load and unrelated config writes so saved config keeps targeting Gemini 3.1 Pro Preview. - Google/Gemini: normalize retired Gemini 3 Pro Preview ids inside emitted Google provider model config, so regenerated models.json rows test `google/gemini-3.1-pro-preview`. - Plugins/doctor: drop stale managed npm install records when `openclaw doctor --fix` removes npm packages that shadow bundled plugins, so the rebuilt registry no longer resurrects the removed package metadata. +- Discord/voice: reuse or suppress late realtime consult tool calls without stealing newer speaker context or speaking forced fallback answers twice. - Discord/voice: synthesize realtime playback timestamps from emitted Discord PCM so OpenAI realtime barge-in truncation no longer sees `audioEndMs=0` and skips legitimate interruptions. - Plugin SDK: keep activated linked plugin runtime facades loadable when bundled plugin fallback is disabled. Thanks @shakkernerd. - Feishu: auto-thread `message(action="send")` replies inside the topic when the active session is group_topic or group_topic_sender, and propagate `replyInThread` through text, card, and media outbound adapters so topic-scoped sessions no longer post at the group root. Fixes #74903. (#77151) Thanks @ai-hpc. diff --git a/docs/gateway/protocol.md b/docs/gateway/protocol.md index e73d2488913..1b9679cbc61 100644 --- a/docs/gateway/protocol.md +++ b/docs/gateway/protocol.md @@ -377,7 +377,7 @@ enumeration of `src/gateway/server-methods/*.ts`. - `talk.session.appendAudio` appends base64 PCM input audio to Gateway-owned realtime relay and transcription sessions. 
- `talk.session.startTurn`, `talk.session.endTurn`, and `talk.session.cancelTurn` drive managed-room turn lifecycle with stale-turn rejection before state is cleared. - `talk.session.cancelOutput` stops assistant audio output, primarily for VAD-gated barge-in in Gateway relay sessions. - - `talk.session.submitToolResult` completes a provider tool call emitted by a Gateway-owned realtime relay session. Pass `options: { willContinue: true }` for interim tool output when a final result will follow. + - `talk.session.submitToolResult` completes a provider tool call emitted by a Gateway-owned realtime relay session. Pass `options: { willContinue: true }` for interim tool output when a final result will follow, or `options: { suppressResponse: true }` when the tool result should satisfy the provider call without starting another realtime assistant response. - `talk.session.close` closes a Gateway-owned relay, transcription, or managed-room session and emits terminal Talk events. - `talk.mode` sets/broadcasts the current Talk mode state for WebChat/Control UI clients. - `talk.client.create` creates a client-owned realtime provider session using `webrtc` or `provider-websocket` while the Gateway owns config, credentials, instructions, and tool policy. 
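Note on the `talk.session.submitToolResult` options documented in the hunk above: the following is a minimal sketch of how a provider bridge might turn the two options into realtime events. The function name `planToolResultEvents` and the local type names are hypothetical; the event shapes are modeled on the OpenAI provider test in this diff. Treating `willContinue` as "no response yet" is an assumption, since the interim-output code path is not shown here.

```typescript
// Hypothetical sketch: names are illustrative, not the actual bridge implementation.
type ToolResultOptions = { suppressResponse?: boolean; willContinue?: boolean };

type RealtimeEvent =
  | {
      type: "conversation.item.create";
      item: { type: "function_call_output"; call_id: string; output: string };
    }
  | { type: "response.create" };

function planToolResultEvents(
  callId: string,
  result: unknown,
  options?: ToolResultOptions,
): RealtimeEvent[] {
  // The tool output is always recorded so the provider call is satisfied.
  const events: RealtimeEvent[] = [
    {
      type: "conversation.item.create",
      item: { type: "function_call_output", call_id: callId, output: JSON.stringify(result) },
    },
  ];
  // Assumption: interim output waits for the final result before asking for a response.
  if (options?.willContinue === true) {
    return events;
  }
  // Suppressed: the answer was already delivered elsewhere, so no new assistant response.
  if (options?.suppressResponse === true) {
    return events;
  }
  events.push({ type: "response.create" });
  return events;
}
```

Under these assumptions, a plain final result yields both events, while either option yields only the `function_call_output` item.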
diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index 7f57f6fa680..544ca8c7c46 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -126,6 +126,12 @@ await gateway.request("talk.session.submitToolResult", { result: { status: "working" }, options: { willContinue: true }, }); +await gateway.request("talk.session.submitToolResult", { + sessionId, + callId, + result: { status: "already_delivered" }, + options: { suppressResponse: true }, +}); await gateway.request("talk.session.submitToolResult", { sessionId, callId, result }); await gateway.request("talk.session.close", { sessionId }); @@ -178,15 +184,15 @@ Removed method map: The unified control vocabulary is also deliberately narrow: -| Method | Applies to | Contract | -| ------------------------------- | ------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------- | -| `talk.session.appendAudio` | `realtime/gateway-relay`, `transcription/gateway-relay` | Append a base64 PCM audio chunk to the provider session owned by the same Gateway connection. | -| `talk.session.startTurn` | `stt-tts/managed-room` | Start a managed-room user turn. | -| `talk.session.endTurn` | `stt-tts/managed-room` | End the active turn after stale-turn validation. | -| `talk.session.cancelTurn` | all Gateway-owned sessions | Cancel active capture/provider/agent/TTS work for a turn. | -| `talk.session.cancelOutput` | `realtime/gateway-relay` | Stop assistant audio output without necessarily ending the user turn. | -| `talk.session.submitToolResult` | `realtime/gateway-relay` | Complete a provider tool call emitted by the relay; pass `options.willContinue` for interim output before a final result. | -| `talk.session.close` | all unified sessions | Stop relay sessions or revoke managed-room state, then forget the unified session id. 
| +| Method | Applies to | Contract | +| ------------------------------- | ------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `talk.session.appendAudio` | `realtime/gateway-relay`, `transcription/gateway-relay` | Append a base64 PCM audio chunk to the provider session owned by the same Gateway connection. | +| `talk.session.startTurn` | `stt-tts/managed-room` | Start a managed-room user turn. | +| `talk.session.endTurn` | `stt-tts/managed-room` | End the active turn after stale-turn validation. | +| `talk.session.cancelTurn` | all Gateway-owned sessions | Cancel active capture/provider/agent/TTS work for a turn. | +| `talk.session.cancelOutput` | `realtime/gateway-relay` | Stop assistant audio output without necessarily ending the user turn. | +| `talk.session.submitToolResult` | `realtime/gateway-relay` | Complete a provider tool call emitted by the relay; pass `options.willContinue` for interim output or `options.suppressResponse` to satisfy the call without another assistant response. | +| `talk.session.close` | all unified sessions | Stop relay sessions or revoke managed-room state, then forget the unified session id. | Do not introduce provider or platform special cases in core to make this work. Core owns Talk session semantics. Provider plugins own vendor session setup. 
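Note on the `talk.session.submitToolResult` row in the table above: the three call shapes (interim, suppressed, final) could be wrapped in one small helper. This is a sketch only. The `GatewayClient` interface, the `ToolResultMode` names, and both helper functions are hypothetical; only the method name and the parameter fields come from the migration doc.

```typescript
// Hypothetical minimal client shape; the real Gateway client type is not shown in this diff.
interface GatewayClient {
  request(method: string, params: Record<string, unknown>): Promise<unknown>;
}

// Illustrative names for the three documented call shapes.
type ToolResultMode = "final" | "interim" | "suppressed";

function buildSubmitToolResultParams(
  sessionId: string,
  callId: string,
  result: unknown,
  mode: ToolResultMode,
): Record<string, unknown> {
  const base = { sessionId, callId, result };
  if (mode === "interim") {
    // A final result will follow this one.
    return { ...base, options: { willContinue: true } };
  }
  if (mode === "suppressed") {
    // Satisfy the provider call without starting another assistant response.
    return { ...base, options: { suppressResponse: true } };
  }
  // Final result: no options, matching the plain call in the migration doc.
  return base;
}

async function submitToolResult(
  gateway: GatewayClient,
  sessionId: string,
  callId: string,
  result: unknown,
  mode: ToolResultMode = "final",
): Promise<unknown> {
  return gateway.request(
    "talk.session.submitToolResult",
    buildSubmitToolResultParams(sessionId, callId, result, mode),
  );
}
```

Keeping the parameter construction in a pure function makes each mode easy to check without a live Gateway connection.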
diff --git a/extensions/discord/src/voice/manager.e2e.test.ts b/extensions/discord/src/voice/manager.e2e.test.ts index 0efc08dee50..ca28732c439 100644 --- a/extensions/discord/src/voice/manager.e2e.test.ts +++ b/extensions/discord/src/voice/manager.e2e.test.ts @@ -1296,6 +1296,338 @@ describe("DiscordVoiceManager", () => { ); }); + it("reuses forced agent-proxy answers for late matching consult tool calls", async () => { + agentCommandMock.mockResolvedValueOnce({ payloads: [{ text: "forced answer" }] }); + const manager = createManager({ + groupPolicy: "open", + voice: { + enabled: true, + mode: "agent-proxy", + realtime: { provider: "openai" }, + }, + }); + + await manager.join({ guildId: "g1", channelId: "1001" }); + const entry = getSessionEntry(manager) as { + realtime?: { + beginSpeakerTurn: ( + context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string }, + userId: string, + ) => { close: () => void; sendInputAudio: (audio: Buffer) => void }; + }; + }; + const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as + | { + onToolCall?: ( + event: { + itemId: string; + callId: string; + name: string; + args: unknown; + }, + session: typeof realtimeSessionMock, + ) => void; + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + + const ownerTurn = entry.realtime?.beginSpeakerTurn( + { extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" }, + "u-owner", + ); + ownerTurn?.sendInputAudio(Buffer.alloc(8)); + bridgeParams?.onTranscript?.("user", "late question", true); + + await new Promise((resolve) => setTimeout(resolve, 260)); + + bridgeParams?.onToolCall?.( + { + itemId: "item-late", + callId: "call-late", + name: "openclaw_agent_consult", + args: { question: "late question" }, + }, + realtimeSessionMock, + ); + await Promise.resolve(); + await Promise.resolve(); + + expect(agentCommandMock).toHaveBeenCalledTimes(1); + 
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith( + expect.stringContaining("forced answer"), + ); + expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith( + "call-late", + { + status: "already_delivered", + message: "OpenClaw already delivered this answer to Discord voice.", + }, + { suppressResponse: true }, + ); + }); + + it("suppresses late forced agent-proxy tool calls when the forced consult rejects", async () => { + let rejectAgentTurn: ((error: unknown) => void) | undefined; + agentCommandMock.mockReturnValueOnce( + new Promise((_, reject) => { + rejectAgentTurn = reject; + }), + ); + const manager = createManager({ + groupPolicy: "open", + voice: { + enabled: true, + mode: "agent-proxy", + realtime: { provider: "openai" }, + }, + }); + + await manager.join({ guildId: "g1", channelId: "1001" }); + const entry = getSessionEntry(manager) as { + realtime?: { + beginSpeakerTurn: ( + context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string }, + userId: string, + ) => { close: () => void; sendInputAudio: (audio: Buffer) => void }; + }; + }; + const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as + | { + onToolCall?: ( + event: { + itemId: string; + callId: string; + name: string; + args: unknown; + }, + session: typeof realtimeSessionMock, + ) => void; + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + + const ownerTurn = entry.realtime?.beginSpeakerTurn( + { extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" }, + "u-owner", + ); + ownerTurn?.sendInputAudio(Buffer.alloc(8)); + bridgeParams?.onTranscript?.("user", "late question", true); + + await new Promise((resolve) => setTimeout(resolve, 260)); + + bridgeParams?.onToolCall?.( + { + itemId: "item-late", + callId: "call-late", + name: "openclaw_agent_consult", + args: { question: "late question" }, + }, + realtimeSessionMock, + ); + 
rejectAgentTurn?.(new Error("agent broke")); + await Promise.resolve(); + await Promise.resolve(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(agentCommandMock).toHaveBeenCalledTimes(1); + expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith( + expect.stringContaining("I hit an error while checking that. Please try again."), + ); + expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith( + "call-late", + { + status: "already_delivered", + message: "OpenClaw already delivered this answer to Discord voice.", + }, + { suppressResponse: true }, + ); + }); + + it("does not reuse recent agent-proxy answers over newer speaker audio", async () => { + agentCommandMock + .mockResolvedValueOnce({ payloads: [{ text: "forced answer" }] }) + .mockResolvedValueOnce({ payloads: [{ text: "guest answer" }] }); + const manager = createManager({ + groupPolicy: "open", + voice: { + enabled: true, + mode: "agent-proxy", + realtime: { provider: "openai" }, + }, + }); + + await manager.join({ guildId: "g1", channelId: "1001" }); + const entry = getSessionEntry(manager) as { + realtime?: { + beginSpeakerTurn: ( + context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string }, + userId: string, + ) => { close: () => void; sendInputAudio: (audio: Buffer) => void }; + }; + }; + const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as + | { + onToolCall?: ( + event: { + itemId: string; + callId: string; + name: string; + args: unknown; + }, + session: typeof realtimeSessionMock, + ) => void; + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + + const ownerTurn = entry.realtime?.beginSpeakerTurn( + { extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" }, + "u-owner", + ); + ownerTurn?.sendInputAudio(Buffer.alloc(8)); + bridgeParams?.onTranscript?.("user", "late question", true); + + await new Promise((resolve) => 
setTimeout(resolve, 260)); + + const guestTurn = entry.realtime?.beginSpeakerTurn( + { extraSystemPrompt: undefined, senderIsOwner: false, speakerLabel: "Guest" }, + "u-guest", + ); + guestTurn?.sendInputAudio(Buffer.alloc(8)); + + bridgeParams?.onToolCall?.( + { + itemId: "item-late", + callId: "call-late", + name: "openclaw_agent_consult", + args: { question: "late question" }, + }, + realtimeSessionMock, + ); + await Promise.resolve(); + await Promise.resolve(); + + expect(agentCommandMock).toHaveBeenCalledTimes(1); + expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith( + expect.stringContaining("forced answer"), + ); + expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith("call-late", { + error: "Discord speaker context changed before this realtime consult completed", + }); + + bridgeParams?.onTranscript?.("user", "guest followup", true); + await new Promise((resolve) => setTimeout(resolve, 260)); + + expect(agentCommandMock).toHaveBeenCalledTimes(2); + expect(agentCommandMock).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + message: expect.stringContaining("guest followup"), + senderIsOwner: false, + }), + expect.anything(), + ); + expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith( + expect.stringContaining("guest answer"), + ); + }); + + it("prefers the newest recent agent-proxy consult for repeated questions", async () => { + agentCommandMock + .mockResolvedValueOnce({ payloads: [{ text: "old direct answer" }] }) + .mockResolvedValueOnce({ payloads: [{ text: "new forced answer" }] }); + const manager = createManager({ + groupPolicy: "open", + voice: { + enabled: true, + mode: "agent-proxy", + realtime: { provider: "openai" }, + }, + }); + + await manager.join({ guildId: "g1", channelId: "1001" }); + const entry = getSessionEntry(manager) as { + realtime?: { + beginSpeakerTurn: ( + context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string }, + userId: string, + ) => { close: () => 
void; sendInputAudio: (audio: Buffer) => void }; + }; + }; + const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as + | { + onToolCall?: ( + event: { + itemId: string; + callId: string; + name: string; + args: unknown; + }, + session: typeof realtimeSessionMock, + ) => void; + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + + const firstTurn = entry.realtime?.beginSpeakerTurn( + { extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" }, + "u-owner", + ); + firstTurn?.sendInputAudio(Buffer.alloc(8)); + bridgeParams?.onToolCall?.( + { + itemId: "item-old", + callId: "call-old", + name: "openclaw_agent_consult", + args: { question: "repeat question" }, + }, + realtimeSessionMock, + ); + await Promise.resolve(); + await Promise.resolve(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith("call-old", { + text: "old direct answer", + }); + + const secondTurn = entry.realtime?.beginSpeakerTurn( + { extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" }, + "u-owner", + ); + secondTurn?.sendInputAudio(Buffer.alloc(8)); + bridgeParams?.onTranscript?.("user", "repeat question", true); + await new Promise((resolve) => setTimeout(resolve, 260)); + + bridgeParams?.onToolCall?.( + { + itemId: "item-new", + callId: "call-new", + name: "openclaw_agent_consult", + args: { question: "repeat question" }, + }, + realtimeSessionMock, + ); + await Promise.resolve(); + await Promise.resolve(); + + expect(agentCommandMock).toHaveBeenCalledTimes(2); + expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith( + expect.stringContaining("new forced answer"), + ); + expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith( + "call-new", + { + status: "already_delivered", + message: "OpenClaw already delivered this answer to Discord voice.", + }, + { suppressResponse: true }, + 
 ); + expect(realtimeSessionMock.submitToolResult).not.toHaveBeenCalledWith("call-new", { + text: "old direct answer", + }); + }); + it("expires closed agent-proxy turns before later speaker audio", async () => { agentCommandMock.mockResolvedValueOnce({ payloads: [{ text: "guest answer" }] }); const manager = createManager({ diff --git a/extensions/discord/src/voice/realtime.ts b/extensions/discord/src/voice/realtime.ts index b0d60c03e4b..e0dd1b2540e 100644 --- a/extensions/discord/src/voice/realtime.ts +++ b/extensions/discord/src/voice/realtime.ts @@ -40,6 +40,8 @@ const logger = createSubsystemLogger("discord/voice"); const DISCORD_REALTIME_TALKBACK_DEBOUNCE_MS = 350; const DISCORD_REALTIME_FALLBACK_TEXT = "I hit an error while checking that. Please try again."; const DISCORD_REALTIME_PENDING_SPEAKER_CONTEXT_LIMIT = 32; +const DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_LIMIT = 16; +const DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_TTL_MS = 15_000; const DISCORD_REALTIME_LOG_PREVIEW_CHARS = 500; const DISCORD_REALTIME_DEFAULT_MIN_BARGE_IN_AUDIO_END_MS = 250; const DISCORD_REALTIME_FORCED_CONSULT_FALLBACK_DELAY_MS = 200; @@ -61,9 +63,23 @@ type PendingSpeakerTurn = { type PendingAgentProxyConsultContext = { context: DiscordRealtimeSpeakerContext; question: string; + recent: RecentAgentProxyConsultContext; timer?: ReturnType<typeof setTimeout>; }; +type RecentAgentProxyConsultResult = + | { status: "fulfilled"; text: string } + | { status: "rejected"; error: string }; + +type RecentAgentProxyConsultContext = { + context: DiscordRealtimeSpeakerContext; + createdAt: number; + handledByForcedPlayback?: boolean; + promise?: Promise<string>; + questions: string[]; + result?: RecentAgentProxyConsultResult; +}; + function formatRealtimeLogPreview(text: string): string { const oneLine = text.replace(/\s+/g, " ").trim(); if (oneLine.length <= DISCORD_REALTIME_LOG_PREVIEW_CHARS) { @@ -245,6 +261,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { private consultToolsAllow: 
string[] | undefined; private consultPolicy: "auto" | "always" = "auto"; private pendingAgentProxyConsultContexts: PendingAgentProxyConsultContext[] = []; + private recentAgentProxyConsultContexts: RecentAgentProxyConsultContext[] = []; private readonly pendingSpeakerTurns: PendingSpeakerTurn[] = []; private outputAudioTimestampMs = 0; private readonly playerIdleHandler = () => { @@ -383,6 +400,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { this.talkback.close(); this.clearForcedConsultTimers(); this.pendingAgentProxyConsultContexts = []; + this.recentAgentProxyConsultContexts = []; this.pendingSpeakerTurns.length = 0; this.clearOutputAudio(); this.bridge?.close(); @@ -506,7 +524,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { event: RealtimeVoiceToolCallEvent, session: RealtimeVoiceBridgeSession, ): void { - const callId = event.callId || event.itemId; + const callId = event.callId || event.itemId || "unknown"; if (event.name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) { session.submitToolResult(callId, { error: `Tool "${event.name}" not available` }); return; @@ -532,8 +550,35 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { willContinue: true, }); } - const context = - this.consumeAgentProxyConsultContext(consultMessage) ?? 
this.consumePendingSpeakerContext(); + const pendingConsultContext = this.consumeAgentProxyConsultContext(consultMessage); + if (pendingConsultContext) { + this.addRecentAgentProxyConsultQuestion(pendingConsultContext.recent, consultMessage); + } + let context = pendingConsultContext?.context; + let recent = pendingConsultContext?.recent; + if (!context) { + const recentConsult = this.findRecentAgentProxyConsultContext(consultMessage); + if (recentConsult) { + if (this.hasPendingSpeakerAudioContext()) { + logger.info( + `discord voice: realtime consult matched recent agent result but newer speaker audio is pending call=${callId} speaker=${recentConsult.context.speakerLabel} owner=${recentConsult.context.senderIsOwner}`, + ); + session.submitToolResult(callId, { + error: "Discord speaker context changed before this realtime consult completed", + }); + return; + } + if (this.submitRecentAgentProxyConsultResult(callId, recentConsult, session)) { + return; + } + } + } + if (!context) { + context = this.consumePendingSpeakerContext(); + if (context) { + recent = this.rememberRecentAgentProxyConsultContext(consultMessage, context); + } + } if (!context) { logger.warn( `discord voice: realtime consult has no speaker context call=${callId || "unknown"}`, @@ -541,10 +586,14 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { session.submitToolResult(callId, { error: "No Discord speaker context available" }); return; } - void this.runAgentTurn({ + const promise = this.runAgentTurn({ context, message: consultMessage, - }) + }); + if (recent) { + this.setRecentAgentProxyConsultPromise(recent, promise); + } + void promise .then((text) => { logger.info( `discord voice: realtime consult answer (${text.length} chars) voiceSession=${this.params.entry.voiceSessionKey} supervisorSession=${this.params.entry.route.sessionKey} agent=${this.params.entry.route.agentId} speaker=${context.speakerLabel} owner=${context.senderIsOwner}: 
${formatRealtimeLogPreview(text)}`, @@ -585,10 +634,18 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { } const context = this.consumePendingSpeakerContext(); if (!context) { + const recent = this.findRecentAgentProxyConsultContext(question); + if (recent) { + logVoiceVerbose( + `realtime forced agent consult skipped (already delegated): guild ${this.params.entry.guildId} channel ${this.params.entry.channelId} speaker ${recent.context.userId}`, + ); + return; + } logger.warn("discord voice: realtime forced agent consult has no speaker context"); return; } - const pending: PendingAgentProxyConsultContext = { context, question }; + const recent = this.rememberRecentAgentProxyConsultContext(question, context); + const pending: PendingAgentProxyConsultContext = { context, question, recent }; this.pendingAgentProxyConsultContexts.push(pending); pending.timer = setTimeout(() => { pending.timer = undefined; @@ -613,7 +670,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { private consumeAgentProxyConsultContext( consultMessage: string, - ): DiscordRealtimeSpeakerContext | undefined { + ): PendingAgentProxyConsultContext | undefined { let pending: PendingAgentProxyConsultContext | undefined; if (this.pendingAgentProxyConsultContexts.length === 1) { pending = this.pendingAgentProxyConsultContexts.shift(); @@ -630,7 +687,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { return undefined; } this.clearForcedConsultTimer(pending); - return pending.context; + return pending; } private removePendingAgentProxyConsultContext(pending: PendingAgentProxyConsultContext): void { @@ -656,14 +713,17 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { this.syncOutputAudioTimestamp(); this.bridge?.handleBargeIn({ audioPlaybackActive: true, force: true }); this.clearOutputAudio(); + pending.recent.handledByForcedPlayback = true; try { - const text = await this.runAgentTurn({ + 
const promise = this.runAgentTurn({ context, message: [ question, "Context: The realtime model produced a final user transcript without calling openclaw_agent_consult. OpenClaw is forcing the consult because consultPolicy is always.", ].join("\n\n"), }); + this.setRecentAgentProxyConsultPromise(pending.recent, promise); + const text = await promise; logger.info( `discord voice: realtime forced agent consult answer (${text.length} chars) elapsedMs=${Date.now() - startedAt} voiceSession=${this.params.entry.voiceSessionKey} supervisorSession=${this.params.entry.route.sessionKey} agent=${this.params.entry.route.agentId}: ${formatRealtimeLogPreview(text)}`, ); @@ -693,6 +753,12 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { return turn?.context; } + private hasPendingSpeakerAudioContext(): boolean { + this.prunePendingSpeakerTurns(); + this.expireClosedSpeakerTurnsBeforeLaterAudio(); + return this.pendingSpeakerTurns.some((turn) => turn.hasAudio); + } + private prunePendingSpeakerTurns(): void { for (let index = this.pendingSpeakerTurns.length - 1; index >= 0; index -= 1) { const turn = this.pendingSpeakerTurns[index]; @@ -720,6 +786,131 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { hasLaterAudio = true; } } + + private rememberRecentAgentProxyConsultContext( + question: string, + context: DiscordRealtimeSpeakerContext, + ): RecentAgentProxyConsultContext { + this.pruneRecentAgentProxyConsultContexts(); + const recent: RecentAgentProxyConsultContext = { + context, + createdAt: Date.now(), + questions: [question], + }; + this.recentAgentProxyConsultContexts.push(recent); + this.pruneRecentAgentProxyConsultContexts(); + return recent; + } + + private addRecentAgentProxyConsultQuestion( + recent: RecentAgentProxyConsultContext, + question: string, + ): void { + if ( + recent.questions.some((candidate) => matchesPendingAgentProxyQuestion(question, candidate)) + ) { + return; + } + 
 recent.questions.push(question); + } + + private setRecentAgentProxyConsultPromise( + recent: RecentAgentProxyConsultContext, + promise: Promise<string>, + ): void { + recent.promise = promise; + void promise + .then((text) => { + recent.result = { status: "fulfilled", text }; + }) + .catch((error: unknown) => { + recent.result = { status: "rejected", error: formatErrorMessage(error) }; + }); + } + + private findRecentAgentProxyConsultContext( + consultMessage: string, + ): RecentAgentProxyConsultContext | undefined { + this.pruneRecentAgentProxyConsultContexts(); + for (let index = this.recentAgentProxyConsultContexts.length - 1; index >= 0; index -= 1) { + const recent = this.recentAgentProxyConsultContexts[index]; + if ( + recent?.questions.some((question) => + matchesPendingAgentProxyQuestion(consultMessage, question), + ) + ) { + return recent; + } + } + return undefined; + } + + private submitRecentAgentProxyConsultResult( + callId: string, + recent: RecentAgentProxyConsultContext, + session: RealtimeVoiceBridgeSession, + ): boolean { + const submitAlreadyDelivered = () => { + session.submitToolResult( + callId, + { + status: "already_delivered", + message: "OpenClaw already delivered this answer to Discord voice.", + }, + { suppressResponse: true }, + ); + }; + const submitResult = (result: RecentAgentProxyConsultResult) => { + if (recent.handledByForcedPlayback) { + submitAlreadyDelivered(); + return; + } + if (result.status === "fulfilled") { + session.submitToolResult(callId, { text: result.text }); + return; + } + session.submitToolResult(callId, { error: result.error }); + }; + if (recent.result) { + logger.info( + `discord voice: realtime consult reused recent agent result call=${callId || "unknown"} speaker=${recent.context.speakerLabel} owner=${recent.context.senderIsOwner}`, + ); + submitResult(recent.result); + return true; + } + if (!recent.promise) { + return false; + } + logger.info( + `discord voice: realtime consult joined in-flight agent result 
call=${callId || "unknown"} speaker=${recent.context.speakerLabel} owner=${recent.context.senderIsOwner}`, + ); + if (recent.handledByForcedPlayback) { + void recent.promise.then(submitAlreadyDelivered, submitAlreadyDelivered); + return true; + } + void recent.promise + .then((text) => session.submitToolResult(callId, { text })) + .catch((error: unknown) => + session.submitToolResult(callId, { error: formatErrorMessage(error) }), + ); + return true; + } + + private pruneRecentAgentProxyConsultContexts(): void { + const minCreatedAt = Date.now() - DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_TTL_MS; + for (let index = this.recentAgentProxyConsultContexts.length - 1; index >= 0; index -= 1) { + const recent = this.recentAgentProxyConsultContexts[index]; + if (recent && recent.createdAt < minCreatedAt) { + this.recentAgentProxyConsultContexts.splice(index, 1); + } + } + while ( + this.recentAgentProxyConsultContexts.length > + DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_LIMIT + ) { + this.recentAgentProxyConsultContexts.shift(); + } + } } function isDiscordRealtimeSpeakerContext(value: unknown): value is DiscordRealtimeSpeakerContext { diff --git a/extensions/openai/realtime-voice-provider.test.ts b/extensions/openai/realtime-voice-provider.test.ts index e07af1f787d..edd288eb2b9 100644 --- a/extensions/openai/realtime-voice-provider.test.ts +++ b/extensions/openai/realtime-voice-provider.test.ts @@ -1037,6 +1037,39 @@ describe("buildOpenAIRealtimeVoiceProvider", () => { expect(parseSent(socket).filter((event) => event.type === "response.create")).toHaveLength(1); }); + it("does not request a realtime response for suppressed tool results", async () => { + const provider = buildOpenAIRealtimeVoiceProvider(); + const bridge = provider.createBridge({ + providerConfig: { apiKey: "sk-test" }, // pragma: allowlist secret + onAudio: vi.fn(), + onClearAudio: vi.fn(), + }); + const connecting = bridge.connect(); + const socket = FakeWebSocket.instances[0]; + if (!socket) { + 
throw new Error("expected bridge to create a websocket"); + } + + socket.readyState = FakeWebSocket.OPEN; + socket.emit("open"); + socket.emit("message", Buffer.from(JSON.stringify({ type: "session.updated" }))); + await connecting; + + bridge.submitToolResult("call_1", { status: "already_delivered" }, { suppressResponse: true }); + + expect(parseSent(socket).slice(-1)).toEqual([ + { + type: "conversation.item.create", + item: { + type: "function_call_output", + call_id: "call_1", + output: JSON.stringify({ status: "already_delivered" }), + }, + }, + ]); + expect(parseSent(socket)).not.toContainEqual({ type: "response.create" }); + }); + it("does not flush deferred response.create while a tool result is still continuing", async () => { const provider = buildOpenAIRealtimeVoiceProvider(); const onError = vi.fn(); diff --git a/extensions/openai/realtime-voice-provider.ts b/extensions/openai/realtime-voice-provider.ts index 02190fe6566..185ea61495d 100644 --- a/extensions/openai/realtime-voice-provider.ts +++ b/extensions/openai/realtime-voice-provider.ts @@ -368,6 +368,9 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge { return; } this.continuingToolCallIds.delete(callId); + if (options?.suppressResponse === true) { + return; + } this.requestResponseCreate(); } diff --git a/src/gateway/protocol/index.test.ts b/src/gateway/protocol/index.test.ts index 1af7b0117ea..732e7412ae9 100644 --- a/src/gateway/protocol/index.test.ts +++ b/src/gateway/protocol/index.test.ts @@ -409,7 +409,7 @@ describe("validateTalkSessionRelayParams", () => { sessionId: "session-1", callId: "call-1", result: { ok: true }, - options: { willContinue: true }, + options: { suppressResponse: true, willContinue: true }, }), ).toBe(true); }); diff --git a/src/gateway/protocol/schema/channels.ts b/src/gateway/protocol/schema/channels.ts index 94e53fdfb53..1f9c76a8c68 100644 --- a/src/gateway/protocol/schema/channels.ts +++ b/src/gateway/protocol/schema/channels.ts @@ -254,6 +254,7 @@ 
export const TalkSessionSubmitToolResultParamsSchema = Type.Object( options: Type.Optional( Type.Object( { + suppressResponse: Type.Optional(Type.Boolean()), willContinue: Type.Optional(Type.Boolean()), }, { additionalProperties: false }, diff --git a/src/gateway/server-methods/talk.test.ts b/src/gateway/server-methods/talk.test.ts index b634a5322cf..eceb3cda3fe 100644 --- a/src/gateway/server-methods/talk.test.ts +++ b/src/gateway/server-methods/talk.test.ts @@ -567,7 +567,7 @@ describe("talk.session unified handlers", () => { sessionId: "relay-unified-1", callId: "call-1", result: { status: "working" }, - options: { willContinue: true }, + options: { suppressResponse: true, willContinue: true }, }, client: { connId: "conn-1" } as never, isWebchatConnect: () => false, @@ -579,7 +579,7 @@ describe("talk.session unified handlers", () => { connId: "conn-1", callId: "call-1", result: { status: "working" }, - options: { willContinue: true }, + options: { suppressResponse: true, willContinue: true }, }); const closeRespond = vi.fn(); diff --git a/src/gateway/talk-realtime-relay.test.ts b/src/gateway/talk-realtime-relay.test.ts index 58f458b1c28..a5628386681 100644 --- a/src/gateway/talk-realtime-relay.test.ts +++ b/src/gateway/talk-realtime-relay.test.ts @@ -245,6 +245,13 @@ describe("talk realtime gateway relay", () => { callId: "call-1", result: { ok: true }, }); + submitTalkRealtimeRelayToolResult({ + relaySessionId: session.relaySessionId, + connId: "conn-1", + callId: "call-2", + result: { status: "already_delivered" }, + options: { suppressResponse: true }, + }); cancelTalkRealtimeRelayTurn({ relaySessionId: session.relaySessionId, connId: "conn-1", @@ -261,6 +268,12 @@ describe("talk realtime gateway relay", () => { { willContinue: true }, ); expect(bridge.submitToolResult).toHaveBeenNthCalledWith(2, "call-1", { ok: true }, undefined); + expect(bridge.submitToolResult).toHaveBeenNthCalledWith( + 3, + "call-2", + { status: "already_delivered" }, + { 
suppressResponse: true }, + ); expect(bridge.handleBargeIn).toHaveBeenCalledWith({ audioPlaybackActive: true }); expect(bridge.close).toHaveBeenCalled(); expect(events).toEqual( diff --git a/src/talk/provider-types.ts b/src/talk/provider-types.ts index e4c691ee328..98f7974bb0f 100644 --- a/src/talk/provider-types.ts +++ b/src/talk/provider-types.ts @@ -50,6 +50,11 @@ export type RealtimeVoiceToolCallEvent = { }; export type RealtimeVoiceToolResultOptions = { + /** + * Submit the tool result without prompting the realtime provider to generate a new assistant + * response. Use when another channel has already delivered the user-visible answer. + */ + suppressResponse?: boolean; willContinue?: boolean; };
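Note on the late-consult handling added in `extensions/discord/src/voice/realtime.ts`: the branch order can be summarized as a pure decision function. This is a simplified sketch, not the code in the diff. The names `decideLateConsult`, `RecentConsult`, and `LateConsultDecision` are hypothetical, and `inFlight` stands in for the presence of the stored promise.

```typescript
// Hypothetical, simplified model of the late-consult decision; not the shipped implementation.
type RecentResult =
  | { status: "fulfilled"; text: string }
  | { status: "rejected"; error: string };

type RecentConsult = {
  handledByForcedPlayback?: boolean;
  inFlight: boolean; // stands in for "a promise has been recorded"
  result?: RecentResult;
};

type LateConsultDecision =
  | { kind: "run-new-consult" }
  | { kind: "reject-context-changed" }
  | { kind: "already-delivered"; afterSettle: boolean }
  | { kind: "reuse-result"; result: RecentResult }
  | { kind: "join-in-flight" };

function decideLateConsult(
  recent: RecentConsult | undefined,
  newerSpeakerAudioPending: boolean,
): LateConsultDecision {
  // No matching recent consult: fall through to a normal agent turn.
  if (!recent) {
    return { kind: "run-new-consult" };
  }
  // A newer speaker has audio pending: do not reuse the older speaker's answer.
  if (newerSpeakerAudioPending) {
    return { kind: "reject-context-changed" };
  }
  if (recent.result) {
    // Forced playback already spoke the answer (or the fallback), so suppress a second response.
    return recent.handledByForcedPlayback
      ? { kind: "already-delivered", afterSettle: false }
      : { kind: "reuse-result", result: recent.result };
  }
  // Nothing settled and nothing running: nothing to reuse.
  if (!recent.inFlight) {
    return { kind: "run-new-consult" };
  }
  // Still running: either wait and then suppress, or join the pending result.
  return recent.handledByForcedPlayback
    ? { kind: "already-delivered", afterSettle: true }
    : { kind: "join-in-flight" };
}
```

The speaker-context check comes before any reuse, which mirrors the e2e test where a late tool call is rejected once a guest turn has audio pending.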