fix: stabilize Discord realtime voice proxy

This commit is contained in:
Peter Steinberger
2026-05-09 14:21:34 +01:00
parent b766b8e846
commit 6a2260eac0
12 changed files with 669 additions and 61 deletions

View File

@@ -49,6 +49,7 @@ Docs: https://docs.openclaw.ai
- Discord/voice: add bounded realtime gateway logs for voice channel joins, realtime model/voice selection, transcripts, consult routing/answers, and playback start, allow OpenAI realtime Discord sessions to disable input-triggered response interruption for echo-heavy rooms while keeping explicit Discord barge-in available for new and already-active speakers, and allow voice turns to target an existing Discord channel agent session.
- Discord/voice: add `voice.realtime.minBargeInAudioEndMs` and let the realtime provider own playback clearing, so speaker echo no longer cuts OpenAI realtime model audio at `audioEndMs=0` while low-echo rooms can opt back into immediate barge-in with `0`.
- Discord/voice: make `agent-proxy` the default voice mode so realtime voice acts as the microphone/speaker extension of the routed OpenClaw agent session, with `stt-tts` remaining available as an explicit fallback.
- Discord/voice: route default `agent-proxy` realtime turns through the OpenClaw consult handoff with owner-level tool access and a forced-consult transcript fallback, matching the Codex-style voice front end while keeping the routed agent authoritative.
- Discord/voice: keep OpenAI realtime bidi consults quiet while the supervisor agent is still working, accept Codex-style `conversation.item.done` function-call events, and preserve continuing tool results through the gateway relay so the OpenAI realtime bridge reliably routes consults before speaking the final answer.
- Discord/voice: include a bounded one-line STT transcript preview in verbose voice logs so live voice debugging shows what speakers said before the agent reply.
- Codex app-server: pin the managed Codex harness and Codex CLI smoke package to `@openai/codex@0.129.0`, defer OpenClaw integration dynamic tools behind Codex tool search by default, and accept current Codex service-tier values so legacy `fast` settings survive the stable harness upgrade as `priority`.

View File

@@ -1,4 +1,4 @@
0c9cdf45265ecb198f11fdc0fee838019f21020272d910596a5202c8074f573d config-baseline.json
7ac9eadabe0119deba4418dbaadc478092fa32617fab3f9618e0a14210720e4b config-baseline.core.json
7d3b4153a6eda2e83c32a0063d2ae179d16c3ce1e3b81a74a9b752fe418831ba config-baseline.channel.json
df93bfde8e3de8d6f80dbf1b0ae43ad250f216f2fc0244c5d9a19afca50806f6 config-baseline.plugin.json
0672b537aaacd7ef88dfa02f746216922420a9fdc30418bef4100c1436b88cbb config-baseline.json
0f1af8c72fe0c0ba9219e35b4547b8884d0556fc6e72b24ab9dd8fbcc2ff76a8 config-baseline.core.json
9edc62ae7dfedabc645470dd03102b813fc780b9108caf675fd661104714206f config-baseline.channel.json
1da42cb10427fb08510f29732493d24851ab915a424f91556569febdd450d9c3 config-baseline.plugin.json

View File

@@ -1197,10 +1197,10 @@ Auto-join example:
Notes:
- `voice.tts` overrides `messages.tts` for `stt-tts` voice playback only. Realtime modes use `voice.realtime.voice`.
- `voice.mode` controls the conversation path. The default is `agent-proxy`: a realtime voice shell handles turn timing, transcription, interruption, and playback, while the routed OpenClaw agent produces the answer with the same session/tool permissions as a typed Discord prompt from that speaker. `stt-tts` keeps the older batch STT plus TTS flow. `bidi` lets the realtime model converse directly while exposing `openclaw_agent_consult` for the OpenClaw brain.
- `voice.mode` controls the conversation path. The default is `agent-proxy`: a realtime voice front end handles turn timing, interruption, and playback, delegates substantive work to the routed OpenClaw agent through `openclaw_agent_consult`, and treats the result like a typed Discord prompt from that speaker. `stt-tts` keeps the older batch STT plus TTS flow. `bidi` lets the realtime model converse directly while exposing `openclaw_agent_consult` for the OpenClaw brain.
- `voice.agentSession` controls which OpenClaw conversation receives voice turns. Leave it unset for the voice channel's own session, or set `{ mode: "target", target: "channel:<text-channel-id>" }` to make the voice channel act as the microphone/speaker extension of an existing Discord text channel session such as `#maintainers`.
- `voice.model` overrides the OpenClaw agent brain for Discord voice responses and realtime consults. Leave it unset to inherit the routed agent model. It is separate from `voice.realtime.model`.
- `agent-proxy` routes speech through `discord-voice`, which preserves normal owner/tool authorization for the speaker and target session but hides the agent `tts` tool because Discord voice owns playback.
- `agent-proxy` routes speech through `discord-voice`, which preserves normal owner/tool authorization for the speaker and target session but hides the agent `tts` tool because Discord voice owns playback. By default, `agent-proxy` gives the consult full owner-equivalent tool access for owner speakers (`voice.realtime.toolPolicy: "owner"`) and strongly prefers consulting the OpenClaw agent before substantive answers (`voice.realtime.consultPolicy: "always"`).
- In `stt-tts` mode, STT uses `tools.media.audio`; `voice.model` does not affect transcription.
- In realtime modes, `voice.realtime.provider`, `voice.realtime.model`, and `voice.realtime.voice` configure the realtime audio session. For OpenAI Realtime 2 plus the Codex brain, use `voice.realtime.model: "gpt-realtime-2"` and `voice.model: "openai-codex/gpt-5.5"`.
- `voice.realtime.bargeIn` controls whether Discord speaker-start events interrupt active realtime playback. If unset, it follows the realtime provider's input-audio interruption setting.
@@ -1252,7 +1252,7 @@ Default agent-proxy voice-channel session example:
}
```
With no `voice.agentSession` block, each voice channel gets its own routed OpenClaw session. For example, `/vc join channel:234567890123456789` talks to the session for that Discord voice channel.
With no `voice.agentSession` block, each voice channel gets its own routed OpenClaw session. For example, `/vc join channel:234567890123456789` talks to the session for that Discord voice channel. The realtime model is only the voice front end; substantive requests are handed to the configured OpenClaw agent. If the realtime model produces a final transcript without calling the consult tool, OpenClaw forces the consult as a fallback so the default still behaves like talking to the agent.
Legacy STT plus TTS example:

View File

@@ -207,11 +207,11 @@ export const discordChannelConfigUiHints = {
},
"voice.realtime.toolPolicy": {
label: "Discord Realtime Tool Policy",
help: "Tool policy for the OpenClaw agent consult tool in bidi mode: safe-read-only, owner, or none.",
help: "Tool policy for the OpenClaw agent consult tool in realtime voice modes: safe-read-only, owner, or none. Default is owner for agent-proxy and safe-read-only for bidi.",
},
"voice.realtime.consultPolicy": {
label: "Discord Realtime Consult Policy",
help: "Use always to strongly prefer the OpenClaw agent brain for substantive bidi turns.",
help: "Use always to strongly prefer the OpenClaw agent brain for substantive realtime turns. agent-proxy defaults to always.",
},
"voice.realtime.bargeIn": {
label: "Discord Realtime Barge-In",

View File

@@ -892,14 +892,34 @@ describe("DiscordVoiceManager", () => {
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
| {
autoRespondToAudio?: boolean;
tools?: unknown[];
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
instructions?: string;
tools?: Array<{ name: string }>;
onToolCall?: (
event: {
itemId: string;
callId: string;
name: string;
args: unknown;
},
session: typeof realtimeSessionMock,
) => void;
}
| undefined;
expect(bridgeParams?.autoRespondToAudio).toBe(false);
expect(bridgeParams?.tools).toStrictEqual([]);
expect(bridgeParams?.autoRespondToAudio).toBe(true);
expect(bridgeParams?.instructions).toContain("same OpenClaw agent");
expect(bridgeParams?.tools?.map((tool) => tool.name)).toContain("openclaw_agent_consult");
bridgeParams?.onTranscript?.("user", "what did I ask?", true);
bridgeParams?.onToolCall?.(
{
itemId: "item-1",
callId: "call-1",
name: "openclaw_agent_consult",
args: { question: "what did I ask?" },
},
realtimeSessionMock,
);
await Promise.resolve();
await Promise.resolve();
await new Promise((resolve) => setTimeout(resolve, 20));
expect(agentCommandMock).toHaveBeenCalledWith(
@@ -910,9 +930,76 @@ describe("DiscordVoiceManager", () => {
}),
expect.anything(),
);
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
expect.stringContaining("agent proxy answer"),
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith(
"call-1",
expect.objectContaining({ status: "working" }),
{ willContinue: true },
);
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith("call-1", {
text: "agent proxy answer",
});
});
it("does not require speaker context for internal exact-speech consults", async () => {
const manager = createManager({
groupPolicy: "open",
voice: {
enabled: true,
mode: "agent-proxy",
realtime: { provider: "openai" },
},
});
await manager.join({ guildId: "g1", channelId: "1001" });
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
| {
onToolCall?: (
event: {
itemId: string;
callId: string;
name: string;
args: unknown;
},
session: typeof realtimeSessionMock,
) => void;
}
| undefined;
bridgeParams?.onToolCall?.(
{
itemId: "item-exact",
callId: "call-exact",
name: "openclaw_agent_consult",
args: {
question: "Speak the provided exact answer verbatim to the Discord voice channel.",
context: 'Provided answer text: "already answered"\\nSpoken style: verbatim only',
},
},
realtimeSessionMock,
);
bridgeParams?.onToolCall?.(
{
itemId: "item-internal",
callId: "call-internal",
name: "openclaw_agent_consult",
args: {
question: [
"Speak this exact OpenClaw answer to the Discord voice channel, without adding, removing, or rephrasing words.",
'Answer: "direct internal answer"',
].join("\n"),
},
},
realtimeSessionMock,
);
expect(agentCommandMock).not.toHaveBeenCalled();
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledTimes(2);
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith("call-exact", {
text: "already answered",
});
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith("call-internal", {
text: "direct internal answer",
});
});
it("creates a fresh realtime output stream after the Discord player idles", async () => {
@@ -1018,11 +1105,6 @@ describe("DiscordVoiceManager", () => {
"u-guest",
);
nonOwnerTurn?.sendInputAudio(Buffer.alloc(8));
const ownerTurn = entry?.realtime?.beginSpeakerTurn(
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
"u-owner",
);
ownerTurn?.sendInputAudio(Buffer.alloc(8));
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
| {
@@ -1030,7 +1112,12 @@ describe("DiscordVoiceManager", () => {
}
| undefined;
bridgeParams?.onTranscript?.("user", "non-owner question", true);
await new Promise((resolve) => setTimeout(resolve, 20));
const ownerTurn = entry?.realtime?.beginSpeakerTurn(
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
"u-owner",
);
ownerTurn?.sendInputAudio(Buffer.alloc(8));
await new Promise((resolve) => setTimeout(resolve, 260));
expect(agentCommandMock).toHaveBeenCalledWith(
expect.objectContaining({
@@ -1038,6 +1125,169 @@ describe("DiscordVoiceManager", () => {
}),
expect.anything(),
);
expect(realtimeSessionMock.handleBargeIn).toHaveBeenCalledWith({
audioPlaybackActive: true,
force: true,
});
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
expect.stringContaining("non-owner answer"),
);
});
it("keeps separate forced agent-proxy fallback timers for rapid transcripts", async () => {
agentCommandMock
.mockResolvedValueOnce({ payloads: [{ text: "guest answer" }] })
.mockResolvedValueOnce({ payloads: [{ text: "owner answer" }] });
const manager = createManager({
groupPolicy: "open",
voice: {
enabled: true,
mode: "agent-proxy",
realtime: { provider: "openai" },
},
});
await manager.join({ guildId: "g1", channelId: "1001" });
const entry = getSessionEntry(manager) as {
realtime?: {
beginSpeakerTurn: (
context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string },
userId: string,
) => { close: () => void; sendInputAudio: (audio: Buffer) => void };
};
};
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
| {
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
}
| undefined;
const guestTurn = entry.realtime?.beginSpeakerTurn(
{ extraSystemPrompt: undefined, senderIsOwner: false, speakerLabel: "Guest" },
"u-guest",
);
guestTurn?.sendInputAudio(Buffer.alloc(8));
bridgeParams?.onTranscript?.("user", "guest question", true);
const ownerTurn = entry.realtime?.beginSpeakerTurn(
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
"u-owner",
);
ownerTurn?.sendInputAudio(Buffer.alloc(8));
bridgeParams?.onTranscript?.("user", "owner question", true);
await new Promise((resolve) => setTimeout(resolve, 260));
expect(agentCommandMock).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
message: expect.stringContaining("guest question"),
senderIsOwner: false,
}),
expect.anything(),
);
expect(agentCommandMock).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
message: expect.stringContaining("owner question"),
senderIsOwner: true,
}),
expect.anything(),
);
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
expect.stringContaining("guest answer"),
);
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
expect.stringContaining("owner answer"),
);
});
it("matches agent-proxy consult tool calls to the pending transcript", async () => {
agentCommandMock
.mockResolvedValueOnce({ payloads: [{ text: "owner answer" }] })
.mockResolvedValueOnce({ payloads: [{ text: "guest fallback answer" }] });
const manager = createManager({
groupPolicy: "open",
voice: {
enabled: true,
mode: "agent-proxy",
realtime: { provider: "openai" },
},
});
await manager.join({ guildId: "g1", channelId: "1001" });
const entry = getSessionEntry(manager) as {
realtime?: {
beginSpeakerTurn: (
context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string },
userId: string,
) => { close: () => void; sendInputAudio: (audio: Buffer) => void };
};
};
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
| {
onToolCall?: (
event: {
itemId: string;
callId: string;
name: string;
args: unknown;
},
session: typeof realtimeSessionMock,
) => void;
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
}
| undefined;
const guestTurn = entry.realtime?.beginSpeakerTurn(
{ extraSystemPrompt: undefined, senderIsOwner: false, speakerLabel: "Guest" },
"u-guest",
);
guestTurn?.sendInputAudio(Buffer.alloc(8));
bridgeParams?.onTranscript?.("user", "guest question", true);
const ownerTurn = entry.realtime?.beginSpeakerTurn(
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
"u-owner",
);
ownerTurn?.sendInputAudio(Buffer.alloc(8));
bridgeParams?.onTranscript?.("user", "owner question", true);
bridgeParams?.onToolCall?.(
{
itemId: "item-owner",
callId: "call-owner",
name: "openclaw_agent_consult",
args: { question: "owner question" },
},
realtimeSessionMock,
);
await Promise.resolve();
await Promise.resolve();
await new Promise((resolve) => setTimeout(resolve, 260));
expect(agentCommandMock).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
message: expect.stringContaining("owner question"),
senderIsOwner: true,
}),
expect.anything(),
);
expect(agentCommandMock).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
message: expect.stringContaining("guest question"),
senderIsOwner: false,
}),
expect.anything(),
);
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith("call-owner", {
text: "owner answer",
});
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
expect.stringContaining("guest fallback answer"),
);
});
it("expires closed agent-proxy turns before later speaker audio", async () => {
@@ -1078,7 +1328,7 @@ describe("DiscordVoiceManager", () => {
}
| undefined;
bridgeParams?.onTranscript?.("user", "guest question", true);
await new Promise((resolve) => setTimeout(resolve, 20));
await new Promise((resolve) => setTimeout(resolve, 260));
expect(agentCommandMock).toHaveBeenCalledWith(
expect.objectContaining({
@@ -1086,6 +1336,9 @@ describe("DiscordVoiceManager", () => {
}),
expect.anything(),
);
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
expect.stringContaining("guest answer"),
);
});
it("starts Discord realtime voice in bidi mode with the consult tool", async () => {

View File

@@ -42,6 +42,7 @@ const DISCORD_REALTIME_FALLBACK_TEXT = "I hit an error while checking that. Plea
const DISCORD_REALTIME_PENDING_SPEAKER_CONTEXT_LIMIT = 32;
const DISCORD_REALTIME_LOG_PREVIEW_CHARS = 500;
const DISCORD_REALTIME_DEFAULT_MIN_BARGE_IN_AUDIO_END_MS = 250;
const DISCORD_REALTIME_FORCED_CONSULT_FALLBACK_DELAY_MS = 200;
export type DiscordVoiceMode = "stt-tts" | "agent-proxy" | "bidi";
@@ -56,6 +57,12 @@ type PendingSpeakerTurn = {
closed: boolean;
};
type PendingAgentProxyConsultContext = {
context: DiscordRealtimeSpeakerContext;
question: string;
timer?: ReturnType<typeof setTimeout>;
};
function formatRealtimeLogPreview(text: string): string {
const oneLine = text.replace(/\s+/g, " ").trim();
if (oneLine.length <= DISCORD_REALTIME_LOG_PREVIEW_CHARS) {
@@ -147,11 +154,87 @@ export function resolveDiscordRealtimeBargeIn(params: {
export function buildDiscordSpeakExactUserMessage(text: string): string {
return [
"Internal OpenClaw voice playback result.",
"Do not call openclaw_agent_consult or any other tool for this message.",
"Speak this exact OpenClaw answer to the Discord voice channel, without adding, removing, or rephrasing words.",
`Answer: ${JSON.stringify(text)}`,
].join("\n");
}
function isEscapedQuote(text: string, quoteIndex: number): boolean {
let backslashes = 0;
for (let index = quoteIndex - 1; index >= 0 && text[index] === "\\"; index -= 1) {
backslashes += 1;
}
return backslashes % 2 === 1;
}
function readJsonStringAfterLabel(text: string, label: string): string | undefined {
const labelIndex = text.indexOf(label);
if (labelIndex < 0) {
return undefined;
}
const quoteIndex = text.indexOf('"', labelIndex + label.length);
if (quoteIndex < 0) {
return undefined;
}
for (let index = quoteIndex + 1; index < text.length; index += 1) {
if (text[index] !== '"' || isEscapedQuote(text, index)) {
continue;
}
try {
const parsed: unknown = JSON.parse(text.slice(quoteIndex, index + 1));
return typeof parsed === "string" ? parsed : undefined;
} catch {
return undefined;
}
}
return undefined;
}
function collectRealtimeConsultArgStrings(args: unknown): string[] {
if (!args || typeof args !== "object") {
return typeof args === "string" ? [args] : [];
}
const values: string[] = [];
for (const key of ["question", "prompt", "query", "task", "context", "responseStyle"]) {
const value = (args as Record<string, unknown>)[key];
if (typeof value === "string") {
values.push(value);
}
}
return values;
}
function extractDiscordExactSpeechConsultText(args: unknown): string | undefined {
const message = collectRealtimeConsultArgStrings(args).join("\n");
if (
!message.includes("Speak this exact OpenClaw answer") &&
!message.includes("Speak the provided exact answer verbatim")
) {
return undefined;
}
return (
readJsonStringAfterLabel(message, "Answer:") ??
readJsonStringAfterLabel(message, "Provided answer text:")
);
}
function normalizeRealtimeConsultMatchText(text: string): string {
return text.toLowerCase().replace(/\s+/g, " ").trim();
}
function matchesPendingAgentProxyQuestion(consultMessage: string, question: string): boolean {
const normalizedConsult = normalizeRealtimeConsultMatchText(consultMessage);
const normalizedQuestion = normalizeRealtimeConsultMatchText(question);
if (!normalizedConsult || !normalizedQuestion) {
return false;
}
return (
normalizedConsult.includes(normalizedQuestion) || normalizedQuestion.includes(normalizedConsult)
);
}
export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
private bridge: RealtimeVoiceBridgeSession | null = null;
private outputStream: PassThrough | null = null;
@@ -159,6 +242,8 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
private stopped = false;
private consultToolPolicy: RealtimeVoiceAgentConsultToolPolicy = "safe-read-only";
private consultToolsAllow: string[] | undefined;
private consultPolicy: "auto" | "always" = "auto";
private pendingAgentProxyConsultContexts: PendingAgentProxyConsultContext[] = [];
private readonly pendingSpeakerTurns: PendingSpeakerTurn[] = [];
private readonly playerIdleHandler = () => {
this.resetOutputStream();
@@ -207,13 +292,19 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
defaultModel: this.realtimeConfig?.model,
noRegisteredProviderMessage: "No configured realtime voice provider registered",
});
const isAgentProxy = isDiscordAgentProxyVoiceMode(this.params.mode);
const defaultToolPolicy: RealtimeVoiceAgentConsultToolPolicy = isAgentProxy
? "owner"
: "safe-read-only";
const toolPolicy = resolveRealtimeVoiceAgentConsultToolPolicy(
this.realtimeConfig?.toolPolicy,
"safe-read-only",
defaultToolPolicy,
);
this.consultToolPolicy = toolPolicy;
this.consultToolsAllow = resolveRealtimeVoiceAgentConsultToolsAllow(toolPolicy);
const consultPolicy = this.realtimeConfig?.consultPolicy ?? "auto";
const consultPolicy = this.realtimeConfig?.consultPolicy ?? (isAgentProxy ? "always" : "auto");
this.consultPolicy = consultPolicy;
const usesRealtimeAgentHandoff = this.params.mode === "bidi" || toolPolicy !== "none";
const interruptResponseOnInputAudio = resolveDiscordRealtimeInterruptResponseOnInputAudio({
realtimeConfig: this.realtimeConfig,
providerId: resolved.provider.id,
@@ -229,10 +320,10 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
providerConfig: resolved.providerConfig,
audioFormat: REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ,
instructions,
autoRespondToAudio: this.params.mode === "bidi",
autoRespondToAudio: usesRealtimeAgentHandoff,
interruptResponseOnInputAudio,
markStrategy: "ack-immediately",
tools: this.params.mode === "bidi" ? resolveRealtimeVoiceAgentConsultTools(toolPolicy) : [],
tools: usesRealtimeAgentHandoff ? resolveRealtimeVoiceAgentConsultTools(toolPolicy) : [],
audioSink: {
isOpen: () => !this.stopped,
sendAudio: (audio) => this.sendOutputAudio(audio),
@@ -247,6 +338,10 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
if (!isFinal || role !== "user" || !isDiscordAgentProxyVoiceMode(this.params.mode)) {
return;
}
if (usesRealtimeAgentHandoff) {
this.scheduleForcedAgentProxyConsult(text);
return;
}
this.talkback.enqueue(text, this.consumePendingSpeakerContext());
},
onToolCall: (event, session) => this.handleToolCall(event, session),
@@ -266,7 +361,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
readProviderConfigString(resolved.providerConfig, "model") ?? resolved.provider.defaultModel;
const resolvedVoice = readProviderConfigString(resolved.providerConfig, "voice");
logger.info(
`discord voice: realtime bridge starting mode=${this.params.mode} provider=${resolved.provider.id} model=${resolvedModel ?? "default"} voice=${resolvedVoice ?? "default"} consultPolicy=${consultPolicy} toolPolicy=${toolPolicy} autoRespond=${this.params.mode === "bidi"} interruptResponse=${interruptResponseOnInputAudio} bargeIn=${resolveDiscordRealtimeBargeIn(
`discord voice: realtime bridge starting mode=${this.params.mode} provider=${resolved.provider.id} model=${resolvedModel ?? "default"} voice=${resolvedVoice ?? "default"} consultPolicy=${consultPolicy} toolPolicy=${toolPolicy} autoRespond=${usesRealtimeAgentHandoff} interruptResponse=${interruptResponseOnInputAudio} bargeIn=${resolveDiscordRealtimeBargeIn(
{
realtimeConfig: this.realtimeConfig,
providerId: resolved.provider.id,
@@ -284,6 +379,8 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
close(): void {
this.stopped = true;
this.talkback.close();
this.clearForcedConsultTimers();
this.pendingAgentProxyConsultContexts = [];
this.pendingSpeakerTurns.length = 0;
this.clearOutputAudio();
this.bridge?.close();
@@ -397,12 +494,6 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
session: RealtimeVoiceBridgeSession,
): void {
const callId = event.callId || event.itemId;
if (this.params.mode !== "bidi") {
session.submitToolResult(callId, {
error: `Tool "${event.name}" is only available in bidi Discord voice mode`,
});
return;
}
if (event.name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) {
session.submitToolResult(callId, { error: `Tool "${event.name}" not available` });
return;
@@ -411,6 +502,14 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
session.submitToolResult(callId, { error: `Tool "${event.name}" not available` });
return;
}
const exactSpeechText = extractDiscordExactSpeechConsultText(event.args);
if (exactSpeechText !== undefined) {
logger.info(
`discord voice: realtime exact speech consult bypassed call=${callId || "unknown"} answerChars=${exactSpeechText.length}`,
);
session.submitToolResult(callId, { text: exactSpeechText });
return;
}
const consultMessage = buildRealtimeVoiceAgentConsultChatMessage(event.args);
logger.info(
`discord voice: realtime consult requested call=${callId || "unknown"} voiceSession=${this.params.entry.voiceSessionKey} supervisorSession=${this.params.entry.route.sessionKey} agent=${this.params.entry.route.agentId} question=${formatRealtimeLogPreview(consultMessage)}`,
@@ -420,7 +519,8 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
willContinue: true,
});
}
const context = this.consumePendingSpeakerContext();
const context =
this.consumeAgentProxyConsultContext(consultMessage) ?? this.consumePendingSpeakerContext();
if (!context) {
logger.warn(
`discord voice: realtime consult has no speaker context call=${callId || "unknown"}`,
@@ -457,11 +557,116 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
return this.params.runAgentTurn({
context,
message: params.message,
toolsAllow: this.params.mode === "bidi" ? this.consultToolsAllow : undefined,
toolsAllow: this.consultToolsAllow,
userId: context.userId,
});
}
private scheduleForcedAgentProxyConsult(transcript: string): void {
if (this.consultPolicy !== "always") {
return;
}
const question = transcript.trim();
if (!question) {
return;
}
const context = this.consumePendingSpeakerContext();
if (!context) {
logger.warn("discord voice: realtime forced agent consult has no speaker context");
return;
}
const pending: PendingAgentProxyConsultContext = { context, question };
this.pendingAgentProxyConsultContexts.push(pending);
pending.timer = setTimeout(() => {
pending.timer = undefined;
void this.runForcedAgentProxyConsult(pending);
}, DISCORD_REALTIME_FORCED_CONSULT_FALLBACK_DELAY_MS);
pending.timer.unref?.();
}
private clearForcedConsultTimers(): void {
for (const pending of this.pendingAgentProxyConsultContexts) {
this.clearForcedConsultTimer(pending);
}
}
private clearForcedConsultTimer(pending: PendingAgentProxyConsultContext): void {
if (!pending.timer) {
return;
}
clearTimeout(pending.timer);
pending.timer = undefined;
}
private consumeAgentProxyConsultContext(
consultMessage: string,
): DiscordRealtimeSpeakerContext | undefined {
let pending: PendingAgentProxyConsultContext | undefined;
if (this.pendingAgentProxyConsultContexts.length === 1) {
pending = this.pendingAgentProxyConsultContexts.shift();
} else if (this.pendingAgentProxyConsultContexts.length > 1) {
const index = this.pendingAgentProxyConsultContexts.findIndex((candidate) =>
matchesPendingAgentProxyQuestion(consultMessage, candidate.question),
);
if (index < 0) {
return undefined;
}
pending = this.pendingAgentProxyConsultContexts.splice(index, 1)[0];
}
if (!pending) {
return undefined;
}
this.clearForcedConsultTimer(pending);
return pending.context;
}
private removePendingAgentProxyConsultContext(pending: PendingAgentProxyConsultContext): void {
this.clearForcedConsultTimer(pending);
const index = this.pendingAgentProxyConsultContexts.indexOf(pending);
if (index >= 0) {
this.pendingAgentProxyConsultContexts.splice(index, 1);
}
}
private async runForcedAgentProxyConsult(
pending: PendingAgentProxyConsultContext,
): Promise<void> {
this.removePendingAgentProxyConsultContext(pending);
const { context, question } = pending;
if (this.stopped) {
return;
}
const startedAt = Date.now();
logger.info(
`discord voice: realtime forced agent consult starting chars=${question.length} voiceSession=${this.params.entry.voiceSessionKey} supervisorSession=${this.params.entry.route.sessionKey} agent=${this.params.entry.route.agentId} speaker=${context.speakerLabel} owner=${context.senderIsOwner}`,
);
this.bridge?.handleBargeIn({ audioPlaybackActive: true, force: true });
this.clearOutputAudio();
try {
const text = await this.runAgentTurn({
context,
message: [
question,
"Context: The realtime model produced a final user transcript without calling openclaw_agent_consult. OpenClaw is forcing the consult because consultPolicy is always.",
].join("\n\n"),
});
logger.info(
`discord voice: realtime forced agent consult answer (${text.length} chars) elapsedMs=${Date.now() - startedAt} voiceSession=${this.params.entry.voiceSessionKey} supervisorSession=${this.params.entry.route.sessionKey} agent=${this.params.entry.route.agentId}: ${formatRealtimeLogPreview(text)}`,
);
if (text.trim()) {
this.clearOutputAudio();
this.bridge?.sendUserMessage(buildDiscordSpeakExactUserMessage(text));
}
} catch (error) {
logger.warn(
`discord voice: realtime forced agent consult failed elapsedMs=${Date.now() - startedAt}: ${formatErrorMessage(error)}`,
);
this.bridge?.sendUserMessage(
buildDiscordSpeakExactUserMessage(DISCORD_REALTIME_FALLBACK_TEXT),
);
}
}
private consumePendingSpeakerContext(): DiscordRealtimeSpeakerContext | undefined {
this.prunePendingSpeakerTurns();
this.expireClosedSpeakerTurnsBeforeLaterAudio();
@@ -557,8 +762,16 @@ function buildDiscordRealtimeInstructions(params: {
return [
base,
"Mode: OpenClaw agent proxy.",
"Use audio input only to transcribe the speaker. Do not answer user speech by yourself.",
"When OpenClaw sends an exact answer to speak, say only that answer.",
"You are the realtime voice surface for the same OpenClaw agent the user can message directly.",
"Do not mention a backend, supervisor, helper, or separate system. Present the result as your own work.",
"Delegate substantive requests, actions, tool work, current facts, memory, workspace context, and user-specific context with openclaw_agent_consult.",
"Do not block, refuse, or downscope at the voice layer. Delegate to OpenClaw and treat its result as authoritative.",
"Answer directly only for greetings, acknowledgements, brief latency tests, or filler while waiting.",
"When OpenClaw sends an internal exact answer to speak, do not call tools. Say only that answer.",
buildRealtimeVoiceAgentConsultPolicyInstructions({
toolPolicy: params.toolPolicy,
consultPolicy: params.consultPolicy,
}),
].join("\n\n");
}
return [

View File

@@ -1017,6 +1017,65 @@ describe("buildOpenAIRealtimeVoiceProvider", () => {
bridge.submitToolResult("call_1", { text: "done" });
expect(parseSent(socket).slice(-2)).toEqual([
{
type: "conversation.item.create",
item: {
type: "function_call_output",
call_id: "call_1",
output: JSON.stringify({ text: "done" }),
},
},
{ type: "response.create" },
]);
socket.emit(
"message",
Buffer.from(JSON.stringify({ type: "response.created", response: { id: "resp_2" } })),
);
socket.emit("message", Buffer.from(JSON.stringify({ type: "response.done" })));
expect(parseSent(socket).filter((event) => event.type === "response.create")).toHaveLength(1);
});
it("does not flush deferred response.create while a tool result is still continuing", async () => {
const provider = buildOpenAIRealtimeVoiceProvider();
const onError = vi.fn();
const bridge = provider.createBridge({
providerConfig: { apiKey: "sk-test" }, // pragma: allowlist secret
onAudio: vi.fn(),
onClearAudio: vi.fn(),
onError,
});
const connecting = bridge.connect();
const socket = FakeWebSocket.instances[0];
if (!socket) {
throw new Error("expected bridge to create a websocket");
}
socket.readyState = FakeWebSocket.OPEN;
socket.emit("open");
socket.emit("message", Buffer.from(JSON.stringify({ type: "session.updated" })));
await connecting;
bridge.submitToolResult("call_1", { status: "working" }, { willContinue: true });
socket.emit(
"message",
Buffer.from(
JSON.stringify({
type: "error",
error: {
message: "Conversation already has an active response in progress: resp_1",
},
}),
),
);
socket.emit("message", Buffer.from(JSON.stringify({ type: "response.done" })));
expect(onError).not.toHaveBeenCalled();
expect(parseSent(socket).some((event) => event.type === "response.create")).toBe(false);
bridge.submitToolResult("call_1", { text: "done" });
expect(parseSent(socket).slice(-2)).toEqual([
{
type: "conversation.item.create",
@@ -1160,6 +1219,54 @@ describe("buildOpenAIRealtimeVoiceProvider", () => {
});
});
it("force-cancels zero-length playback barge-in for agent handoff fallback", async () => {
const provider = buildOpenAIRealtimeVoiceProvider();
const onClearAudio = vi.fn();
const onEvent = vi.fn();
const bridge = provider.createBridge({
providerConfig: { apiKey: "sk-test" }, // pragma: allowlist secret
onAudio: vi.fn(),
onClearAudio,
onEvent,
});
const connecting = bridge.connect();
const socket = FakeWebSocket.instances[0];
if (!socket) {
throw new Error("expected bridge to create a websocket");
}
socket.readyState = FakeWebSocket.OPEN;
socket.emit("open");
socket.emit("message", Buffer.from(JSON.stringify({ type: "session.updated" })));
await connecting;
bridge.setMediaTimestamp(1000);
socket.emit(
"message",
Buffer.from(JSON.stringify({ type: "response.created", response: { id: "resp_1" } })),
);
socket.emit(
"message",
Buffer.from(
JSON.stringify({
type: "response.audio.delta",
item_id: "item_1",
delta: Buffer.from("assistant audio").toString("base64"),
}),
),
);
bridge.handleBargeIn?.({ audioPlaybackActive: true, force: true });
expect(parseSent(socket)).toContainEqual({ type: "response.cancel" });
expect(parseSent(socket)).toContainEqual(
expect.objectContaining({ type: "conversation.item.truncate" }),
);
expect(onClearAudio).toHaveBeenCalled();
expect(onEvent).not.toHaveBeenCalledWith(
expect.objectContaining({ type: "conversation.item.truncate.skipped" }),
);
});
it("allows immediate playback barge-in when the minimum audio window is zero", async () => {
const provider = buildOpenAIRealtimeVoiceProvider();
const onClearAudio = vi.fn();

View File

@@ -295,6 +295,7 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
private responseCreateInFlight = false;
private responseCancelInFlight = false;
private responseCreatePending = false;
private continuingToolCallIds = new Set<string>();
private latestMediaTimestamp = 0;
private lastAssistantItemId: string | null = null;
private toolCallBuffers = new Map<string, { name: string; callId: string; args: string }>();
@@ -362,9 +363,12 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
output: JSON.stringify(result),
},
});
if (options?.willContinue !== true) {
this.requestResponseCreate();
if (options?.willContinue === true) {
this.continuingToolCallIds.add(callId);
return;
}
this.continuingToolCallIds.delete(callId);
this.requestResponseCreate();
}
acknowledgeMark(): void {
@@ -840,16 +844,23 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
handleBargeIn(options?: RealtimeVoiceBargeInOptions): void {
const assistantItemId = this.lastAssistantItemId;
const responseStartTimestamp = this.responseStartTimestamp;
const force = options?.force === true;
const shouldInterruptProvider =
responseStartTimestamp !== null &&
assistantItemId !== null &&
(this.markQueue.length > 0 || options?.audioPlaybackActive === true);
((responseStartTimestamp !== null &&
(this.markQueue.length > 0 || options?.audioPlaybackActive === true)) ||
force);
const audioEndMs = shouldInterruptProvider
? Math.max(0, this.latestMediaTimestamp - responseStartTimestamp)
? Math.max(
0,
responseStartTimestamp === null
? this.latestMediaTimestamp
: this.latestMediaTimestamp - responseStartTimestamp,
)
: null;
const minBargeInAudioEndMs =
this.config.minBargeInAudioEndMs ?? OPENAI_REALTIME_DEFAULT_MIN_BARGE_IN_AUDIO_END_MS;
if (audioEndMs !== null && audioEndMs < minBargeInAudioEndMs) {
if (!force && audioEndMs !== null && audioEndMs < minBargeInAudioEndMs) {
this.config.onEvent?.({
direction: "client",
type: "conversation.item.truncate.skipped",
@@ -914,10 +925,16 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
}
private requestResponseCreate(): void {
if (this.responseActive || this.responseCreateInFlight || this.responseCancelInFlight) {
if (
this.responseActive ||
this.responseCreateInFlight ||
this.responseCancelInFlight ||
this.continuingToolCallIds.size > 0
) {
this.responseCreatePending = true;
return;
}
this.responseCreatePending = false;
this.responseCreateInFlight = true;
this.sendEvent({ type: "response.create" });
}
@@ -937,6 +954,7 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
this.responseCreateInFlight = false;
this.responseCancelInFlight = false;
this.responseCreatePending = false;
this.continuingToolCallIds.clear();
this.lastAssistantItemId = null;
this.toolCallBuffers.clear();
this.deliveredToolCallKeys.clear();

File diff suppressed because one or more lines are too long

View File

@@ -169,7 +169,9 @@ describe("realtime voice agent talkback queue", () => {
queue.enqueue("question");
await vi.advanceTimersByTimeAsync(1);
expect(logger.warn).toHaveBeenCalledWith("[test] consult failed: boom");
expect(logger.warn).toHaveBeenCalledWith(
expect.stringMatching(/^\[test\] consult failed: elapsedMs=\d+ boom$/),
);
expect(deliver).toHaveBeenCalledWith("fallback");
vi.useRealTimers();
});

View File

@@ -64,13 +64,17 @@ export function createRealtimeVoiceAgentTalkbackQueue(
question: trimmed,
metadata: pending.metadata,
};
let consultStartedAt: number | undefined;
try {
while (nextQuestion) {
if (params.isStopped()) {
return;
}
const currentQuestion = nextQuestion;
params.logger.info(`${params.logPrefix} consult: chars=${currentQuestion.question.length}`);
consultStartedAt = Date.now();
params.logger.info(
`${params.logPrefix} consult: chars=${currentQuestion.question.length} queued=${pendingQuestions.length}`,
);
activeAbortController = new AbortController();
const result = await params.consult({
question: currentQuestion.question,
@@ -80,6 +84,9 @@ export function createRealtimeVoiceAgentTalkbackQueue(
});
activeAbortController = undefined;
const text = result.text.trim();
params.logger.info(
`${params.logPrefix} consult done: elapsedMs=${Date.now() - consultStartedAt} answerChars=${text.length} queued=${pendingQuestions.length}`,
);
if (!params.isStopped() && text) {
params.deliver(text);
}
@@ -91,7 +98,9 @@ export function createRealtimeVoiceAgentTalkbackQueue(
return;
}
const message = error instanceof Error ? error.message : String(error);
params.logger.warn(`${params.logPrefix} consult failed: ${message}`);
const elapsedDetail =
consultStartedAt === undefined ? "" : ` elapsedMs=${Date.now() - consultStartedAt}`;
params.logger.warn(`${params.logPrefix} consult failed:${elapsedDetail} ${message}`);
params.deliver(params.fallbackText);
} finally {
active = false;
@@ -115,6 +124,9 @@ export function createRealtimeVoiceAgentTalkbackQueue(
}
if (active) {
appendPendingQuestion(pendingQuestions, { question: trimmed, metadata });
params.logger.info(
`${params.logPrefix} consult queued: chars=${trimmed.length} queued=${pendingQuestions.length}`,
);
clearDebounceTimer();
return;
}

View File

@@ -188,4 +188,6 @@ export type RealtimeVoiceBargeInOptions = {
* This lets providers interrupt output even when the sink cannot provide real playback marks.
*/
audioPlaybackActive?: boolean;
/** Interrupt even when normal barge-in audio-duration guards would treat the event as echo. */
force?: boolean;
};