mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 16:44:45 +00:00
fix(discord): suppress stale realtime consults
This commit is contained in:
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Google/Gemini: normalize retired `google/gemini-3-pro-preview` primary, fallback, and model-map refs during config load and unrelated config writes so saved config keeps targeting Gemini 3.1 Pro Preview.
|
||||
- Google/Gemini: normalize retired Gemini 3 Pro Preview ids inside emitted Google provider model config, so regenerated models.json rows target `google/gemini-3.1-pro-preview`.
|
||||
- Plugins/doctor: drop stale managed npm install records when `openclaw doctor --fix` removes npm packages that shadow bundled plugins, so the rebuilt registry no longer resurrects the removed package metadata.
|
||||
- Discord/voice: reuse or suppress late realtime consult tool calls without stealing newer speaker context or speaking forced fallback answers twice.
|
||||
- Discord/voice: synthesize realtime playback timestamps from emitted Discord PCM so OpenAI realtime barge-in truncation no longer sees `audioEndMs=0` and skips legitimate interruptions.
|
||||
- Plugin SDK: keep activated linked plugin runtime facades loadable when bundled plugin fallback is disabled. Thanks @shakkernerd.
|
||||
- Feishu: auto-thread `message(action="send")` replies inside the topic when the active session is group_topic or group_topic_sender, and propagate `replyInThread` through text, card, and media outbound adapters so topic-scoped sessions no longer post at the group root. Fixes #74903. (#77151) Thanks @ai-hpc.
|
||||
|
||||
@@ -377,7 +377,7 @@ enumeration of `src/gateway/server-methods/*.ts`.
|
||||
- `talk.session.appendAudio` appends base64 PCM input audio to Gateway-owned realtime relay and transcription sessions.
|
||||
- `talk.session.startTurn`, `talk.session.endTurn`, and `talk.session.cancelTurn` drive managed-room turn lifecycle with stale-turn rejection before state is cleared.
|
||||
- `talk.session.cancelOutput` stops assistant audio output, primarily for VAD-gated barge-in in Gateway relay sessions.
|
||||
- `talk.session.submitToolResult` completes a provider tool call emitted by a Gateway-owned realtime relay session. Pass `options: { willContinue: true }` for interim tool output when a final result will follow.
|
||||
- `talk.session.submitToolResult` completes a provider tool call emitted by a Gateway-owned realtime relay session. Pass `options: { willContinue: true }` for interim tool output when a final result will follow, or `options: { suppressResponse: true }` when the tool result should satisfy the provider call without starting another realtime assistant response.
|
||||
- `talk.session.close` closes a Gateway-owned relay, transcription, or managed-room session and emits terminal Talk events.
|
||||
- `talk.mode` sets/broadcasts the current Talk mode state for WebChat/Control UI clients.
|
||||
- `talk.client.create` creates a client-owned realtime provider session using `webrtc` or `provider-websocket` while the Gateway owns config, credentials, instructions, and tool policy.
|
||||
|
||||
@@ -126,6 +126,12 @@ await gateway.request("talk.session.submitToolResult", {
|
||||
result: { status: "working" },
|
||||
options: { willContinue: true },
|
||||
});
|
||||
await gateway.request("talk.session.submitToolResult", {
|
||||
sessionId,
|
||||
callId,
|
||||
result: { status: "already_delivered" },
|
||||
options: { suppressResponse: true },
|
||||
});
|
||||
await gateway.request("talk.session.submitToolResult", { sessionId, callId, result });
|
||||
await gateway.request("talk.session.close", { sessionId });
|
||||
|
||||
@@ -178,15 +184,15 @@ Removed method map:
|
||||
|
||||
The unified control vocabulary is also deliberately narrow:
|
||||
|
||||
| Method | Applies to | Contract |
|
||||
| ------------------------------- | ------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `talk.session.appendAudio` | `realtime/gateway-relay`, `transcription/gateway-relay` | Append a base64 PCM audio chunk to the provider session owned by the same Gateway connection. |
|
||||
| `talk.session.startTurn` | `stt-tts/managed-room` | Start a managed-room user turn. |
|
||||
| `talk.session.endTurn` | `stt-tts/managed-room` | End the active turn after stale-turn validation. |
|
||||
| `talk.session.cancelTurn` | all Gateway-owned sessions | Cancel active capture/provider/agent/TTS work for a turn. |
|
||||
| `talk.session.cancelOutput` | `realtime/gateway-relay` | Stop assistant audio output without necessarily ending the user turn. |
|
||||
| `talk.session.submitToolResult` | `realtime/gateway-relay` | Complete a provider tool call emitted by the relay; pass `options.willContinue` for interim output before a final result. |
|
||||
| `talk.session.close` | all unified sessions | Stop relay sessions or revoke managed-room state, then forget the unified session id. |
|
||||
| Method | Applies to | Contract |
|
||||
| ------------------------------- | ------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `talk.session.appendAudio` | `realtime/gateway-relay`, `transcription/gateway-relay` | Append a base64 PCM audio chunk to the provider session owned by the same Gateway connection. |
|
||||
| `talk.session.startTurn` | `stt-tts/managed-room` | Start a managed-room user turn. |
|
||||
| `talk.session.endTurn` | `stt-tts/managed-room` | End the active turn after stale-turn validation. |
|
||||
| `talk.session.cancelTurn` | all Gateway-owned sessions | Cancel active capture/provider/agent/TTS work for a turn. |
|
||||
| `talk.session.cancelOutput` | `realtime/gateway-relay` | Stop assistant audio output without necessarily ending the user turn. |
|
||||
| `talk.session.submitToolResult` | `realtime/gateway-relay` | Complete a provider tool call emitted by the relay; pass `options.willContinue` for interim output or `options.suppressResponse` to satisfy the call without another assistant response. |
|
||||
| `talk.session.close` | all unified sessions | Stop relay sessions or revoke managed-room state, then forget the unified session id. |
|
||||
|
||||
Do not introduce provider or platform special cases in core to make this work.
|
||||
Core owns Talk session semantics. Provider plugins own vendor session setup.
|
||||
|
||||
@@ -1296,6 +1296,338 @@ describe("DiscordVoiceManager", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("reuses forced agent-proxy answers for late matching consult tool calls", async () => {
|
||||
agentCommandMock.mockResolvedValueOnce({ payloads: [{ text: "forced answer" }] });
|
||||
const manager = createManager({
|
||||
groupPolicy: "open",
|
||||
voice: {
|
||||
enabled: true,
|
||||
mode: "agent-proxy",
|
||||
realtime: { provider: "openai" },
|
||||
},
|
||||
});
|
||||
|
||||
await manager.join({ guildId: "g1", channelId: "1001" });
|
||||
const entry = getSessionEntry(manager) as {
|
||||
realtime?: {
|
||||
beginSpeakerTurn: (
|
||||
context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string },
|
||||
userId: string,
|
||||
) => { close: () => void; sendInputAudio: (audio: Buffer) => void };
|
||||
};
|
||||
};
|
||||
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
|
||||
| {
|
||||
onToolCall?: (
|
||||
event: {
|
||||
itemId: string;
|
||||
callId: string;
|
||||
name: string;
|
||||
args: unknown;
|
||||
},
|
||||
session: typeof realtimeSessionMock,
|
||||
) => void;
|
||||
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
|
||||
}
|
||||
| undefined;
|
||||
|
||||
const ownerTurn = entry.realtime?.beginSpeakerTurn(
|
||||
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
|
||||
"u-owner",
|
||||
);
|
||||
ownerTurn?.sendInputAudio(Buffer.alloc(8));
|
||||
bridgeParams?.onTranscript?.("user", "late question", true);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 260));
|
||||
|
||||
bridgeParams?.onToolCall?.(
|
||||
{
|
||||
itemId: "item-late",
|
||||
callId: "call-late",
|
||||
name: "openclaw_agent_consult",
|
||||
args: { question: "late question" },
|
||||
},
|
||||
realtimeSessionMock,
|
||||
);
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(1);
|
||||
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
|
||||
expect.stringContaining("forced answer"),
|
||||
);
|
||||
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith(
|
||||
"call-late",
|
||||
{
|
||||
status: "already_delivered",
|
||||
message: "OpenClaw already delivered this answer to Discord voice.",
|
||||
},
|
||||
{ suppressResponse: true },
|
||||
);
|
||||
});
|
||||
|
||||
it("suppresses late forced agent-proxy tool calls when the forced consult rejects", async () => {
|
||||
let rejectAgentTurn: ((error: unknown) => void) | undefined;
|
||||
agentCommandMock.mockReturnValueOnce(
|
||||
new Promise((_, reject) => {
|
||||
rejectAgentTurn = reject;
|
||||
}),
|
||||
);
|
||||
const manager = createManager({
|
||||
groupPolicy: "open",
|
||||
voice: {
|
||||
enabled: true,
|
||||
mode: "agent-proxy",
|
||||
realtime: { provider: "openai" },
|
||||
},
|
||||
});
|
||||
|
||||
await manager.join({ guildId: "g1", channelId: "1001" });
|
||||
const entry = getSessionEntry(manager) as {
|
||||
realtime?: {
|
||||
beginSpeakerTurn: (
|
||||
context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string },
|
||||
userId: string,
|
||||
) => { close: () => void; sendInputAudio: (audio: Buffer) => void };
|
||||
};
|
||||
};
|
||||
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
|
||||
| {
|
||||
onToolCall?: (
|
||||
event: {
|
||||
itemId: string;
|
||||
callId: string;
|
||||
name: string;
|
||||
args: unknown;
|
||||
},
|
||||
session: typeof realtimeSessionMock,
|
||||
) => void;
|
||||
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
|
||||
}
|
||||
| undefined;
|
||||
|
||||
const ownerTurn = entry.realtime?.beginSpeakerTurn(
|
||||
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
|
||||
"u-owner",
|
||||
);
|
||||
ownerTurn?.sendInputAudio(Buffer.alloc(8));
|
||||
bridgeParams?.onTranscript?.("user", "late question", true);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 260));
|
||||
|
||||
bridgeParams?.onToolCall?.(
|
||||
{
|
||||
itemId: "item-late",
|
||||
callId: "call-late",
|
||||
name: "openclaw_agent_consult",
|
||||
args: { question: "late question" },
|
||||
},
|
||||
realtimeSessionMock,
|
||||
);
|
||||
rejectAgentTurn?.(new Error("agent broke"));
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(1);
|
||||
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
|
||||
expect.stringContaining("I hit an error while checking that. Please try again."),
|
||||
);
|
||||
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith(
|
||||
"call-late",
|
||||
{
|
||||
status: "already_delivered",
|
||||
message: "OpenClaw already delivered this answer to Discord voice.",
|
||||
},
|
||||
{ suppressResponse: true },
|
||||
);
|
||||
});
|
||||
|
||||
it("does not reuse recent agent-proxy answers over newer speaker audio", async () => {
|
||||
agentCommandMock
|
||||
.mockResolvedValueOnce({ payloads: [{ text: "forced answer" }] })
|
||||
.mockResolvedValueOnce({ payloads: [{ text: "guest answer" }] });
|
||||
const manager = createManager({
|
||||
groupPolicy: "open",
|
||||
voice: {
|
||||
enabled: true,
|
||||
mode: "agent-proxy",
|
||||
realtime: { provider: "openai" },
|
||||
},
|
||||
});
|
||||
|
||||
await manager.join({ guildId: "g1", channelId: "1001" });
|
||||
const entry = getSessionEntry(manager) as {
|
||||
realtime?: {
|
||||
beginSpeakerTurn: (
|
||||
context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string },
|
||||
userId: string,
|
||||
) => { close: () => void; sendInputAudio: (audio: Buffer) => void };
|
||||
};
|
||||
};
|
||||
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
|
||||
| {
|
||||
onToolCall?: (
|
||||
event: {
|
||||
itemId: string;
|
||||
callId: string;
|
||||
name: string;
|
||||
args: unknown;
|
||||
},
|
||||
session: typeof realtimeSessionMock,
|
||||
) => void;
|
||||
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
|
||||
}
|
||||
| undefined;
|
||||
|
||||
const ownerTurn = entry.realtime?.beginSpeakerTurn(
|
||||
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
|
||||
"u-owner",
|
||||
);
|
||||
ownerTurn?.sendInputAudio(Buffer.alloc(8));
|
||||
bridgeParams?.onTranscript?.("user", "late question", true);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 260));
|
||||
|
||||
const guestTurn = entry.realtime?.beginSpeakerTurn(
|
||||
{ extraSystemPrompt: undefined, senderIsOwner: false, speakerLabel: "Guest" },
|
||||
"u-guest",
|
||||
);
|
||||
guestTurn?.sendInputAudio(Buffer.alloc(8));
|
||||
|
||||
bridgeParams?.onToolCall?.(
|
||||
{
|
||||
itemId: "item-late",
|
||||
callId: "call-late",
|
||||
name: "openclaw_agent_consult",
|
||||
args: { question: "late question" },
|
||||
},
|
||||
realtimeSessionMock,
|
||||
);
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(1);
|
||||
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
|
||||
expect.stringContaining("forced answer"),
|
||||
);
|
||||
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith("call-late", {
|
||||
error: "Discord speaker context changed before this realtime consult completed",
|
||||
});
|
||||
|
||||
bridgeParams?.onTranscript?.("user", "guest followup", true);
|
||||
await new Promise((resolve) => setTimeout(resolve, 260));
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(2);
|
||||
expect(agentCommandMock).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
expect.objectContaining({
|
||||
message: expect.stringContaining("guest followup"),
|
||||
senderIsOwner: false,
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
|
||||
expect.stringContaining("guest answer"),
|
||||
);
|
||||
});
|
||||
|
||||
it("prefers the newest recent agent-proxy consult for repeated questions", async () => {
|
||||
agentCommandMock
|
||||
.mockResolvedValueOnce({ payloads: [{ text: "old direct answer" }] })
|
||||
.mockResolvedValueOnce({ payloads: [{ text: "new forced answer" }] });
|
||||
const manager = createManager({
|
||||
groupPolicy: "open",
|
||||
voice: {
|
||||
enabled: true,
|
||||
mode: "agent-proxy",
|
||||
realtime: { provider: "openai" },
|
||||
},
|
||||
});
|
||||
|
||||
await manager.join({ guildId: "g1", channelId: "1001" });
|
||||
const entry = getSessionEntry(manager) as {
|
||||
realtime?: {
|
||||
beginSpeakerTurn: (
|
||||
context: { extraSystemPrompt?: string; senderIsOwner: boolean; speakerLabel: string },
|
||||
userId: string,
|
||||
) => { close: () => void; sendInputAudio: (audio: Buffer) => void };
|
||||
};
|
||||
};
|
||||
const bridgeParams = createRealtimeVoiceBridgeSessionMock.mock.calls.at(-1)?.[0] as
|
||||
| {
|
||||
onToolCall?: (
|
||||
event: {
|
||||
itemId: string;
|
||||
callId: string;
|
||||
name: string;
|
||||
args: unknown;
|
||||
},
|
||||
session: typeof realtimeSessionMock,
|
||||
) => void;
|
||||
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
|
||||
}
|
||||
| undefined;
|
||||
|
||||
const firstTurn = entry.realtime?.beginSpeakerTurn(
|
||||
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
|
||||
"u-owner",
|
||||
);
|
||||
firstTurn?.sendInputAudio(Buffer.alloc(8));
|
||||
bridgeParams?.onToolCall?.(
|
||||
{
|
||||
itemId: "item-old",
|
||||
callId: "call-old",
|
||||
name: "openclaw_agent_consult",
|
||||
args: { question: "repeat question" },
|
||||
},
|
||||
realtimeSessionMock,
|
||||
);
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
|
||||
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith("call-old", {
|
||||
text: "old direct answer",
|
||||
});
|
||||
|
||||
const secondTurn = entry.realtime?.beginSpeakerTurn(
|
||||
{ extraSystemPrompt: undefined, senderIsOwner: true, speakerLabel: "Owner" },
|
||||
"u-owner",
|
||||
);
|
||||
secondTurn?.sendInputAudio(Buffer.alloc(8));
|
||||
bridgeParams?.onTranscript?.("user", "repeat question", true);
|
||||
await new Promise((resolve) => setTimeout(resolve, 260));
|
||||
|
||||
bridgeParams?.onToolCall?.(
|
||||
{
|
||||
itemId: "item-new",
|
||||
callId: "call-new",
|
||||
name: "openclaw_agent_consult",
|
||||
args: { question: "repeat question" },
|
||||
},
|
||||
realtimeSessionMock,
|
||||
);
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(2);
|
||||
expect(realtimeSessionMock.sendUserMessage).toHaveBeenCalledWith(
|
||||
expect.stringContaining("new forced answer"),
|
||||
);
|
||||
expect(realtimeSessionMock.submitToolResult).toHaveBeenCalledWith(
|
||||
"call-new",
|
||||
{
|
||||
status: "already_delivered",
|
||||
message: "OpenClaw already delivered this answer to Discord voice.",
|
||||
},
|
||||
{ suppressResponse: true },
|
||||
);
|
||||
expect(realtimeSessionMock.submitToolResult).not.toHaveBeenCalledWith("call-new", {
|
||||
text: "old direct answer",
|
||||
});
|
||||
});
|
||||
|
||||
it("expires closed agent-proxy turns before later speaker audio", async () => {
|
||||
agentCommandMock.mockResolvedValueOnce({ payloads: [{ text: "guest answer" }] });
|
||||
const manager = createManager({
|
||||
|
||||
@@ -40,6 +40,8 @@ const logger = createSubsystemLogger("discord/voice");
|
||||
const DISCORD_REALTIME_TALKBACK_DEBOUNCE_MS = 350;
|
||||
const DISCORD_REALTIME_FALLBACK_TEXT = "I hit an error while checking that. Please try again.";
|
||||
const DISCORD_REALTIME_PENDING_SPEAKER_CONTEXT_LIMIT = 32;
|
||||
const DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_LIMIT = 16;
|
||||
const DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_TTL_MS = 15_000;
|
||||
const DISCORD_REALTIME_LOG_PREVIEW_CHARS = 500;
|
||||
const DISCORD_REALTIME_DEFAULT_MIN_BARGE_IN_AUDIO_END_MS = 250;
|
||||
const DISCORD_REALTIME_FORCED_CONSULT_FALLBACK_DELAY_MS = 200;
|
||||
@@ -61,9 +63,23 @@ type PendingSpeakerTurn = {
|
||||
/**
 * A forced-consult candidate queued in `pendingAgentProxyConsultContexts`.
 *
 * - `context`: speaker context the consult will run for.
 * - `question`: transcript text the consult was queued for.
 * - `recent`: the remembered-consult entry created together with this pending entry.
 * - `timer`: handle of the `setTimeout` armed for this entry; reset to `undefined` when it fires.
 */
type PendingAgentProxyConsultContext = {
|
||||
context: DiscordRealtimeSpeakerContext;
|
||||
question: string;
|
||||
recent: RecentAgentProxyConsultContext;
|
||||
timer?: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
|
||||
/**
 * Settled outcome of an agent consult promise: the answer text when it fulfilled,
 * or a formatted error message when it rejected.
 */
type RecentAgentProxyConsultResult =
|
||||
| { status: "fulfilled"; text: string }
|
||||
| { status: "rejected"; error: string };
|
||||
|
||||
/**
 * A recently started agent consult that late realtime tool calls can be matched against.
 *
 * - `context`: speaker context the consult ran for.
 * - `createdAt`: `Date.now()` at creation; entries older than the recent-consult TTL are pruned.
 * - `handledByForcedPlayback`: set once the forced-consult path has taken over this consult;
 *   late tool calls are then completed with an `already_delivered` result and `suppressResponse: true`.
 * - `promise`: the agent turn backing this consult, once one has been started.
 * - `questions`: question texts that identify this consult when matching later tool calls.
 * - `result`: recorded when `promise` settles.
 */
type RecentAgentProxyConsultContext = {
|
||||
context: DiscordRealtimeSpeakerContext;
|
||||
createdAt: number;
|
||||
handledByForcedPlayback?: boolean;
|
||||
promise?: Promise<string>;
|
||||
questions: string[];
|
||||
result?: RecentAgentProxyConsultResult;
|
||||
};
|
||||
|
||||
function formatRealtimeLogPreview(text: string): string {
|
||||
const oneLine = text.replace(/\s+/g, " ").trim();
|
||||
if (oneLine.length <= DISCORD_REALTIME_LOG_PREVIEW_CHARS) {
|
||||
@@ -245,6 +261,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
private consultToolsAllow: string[] | undefined;
|
||||
private consultPolicy: "auto" | "always" = "auto";
|
||||
private pendingAgentProxyConsultContexts: PendingAgentProxyConsultContext[] = [];
|
||||
private recentAgentProxyConsultContexts: RecentAgentProxyConsultContext[] = [];
|
||||
private readonly pendingSpeakerTurns: PendingSpeakerTurn[] = [];
|
||||
private outputAudioTimestampMs = 0;
|
||||
private readonly playerIdleHandler = () => {
|
||||
@@ -383,6 +400,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
this.talkback.close();
|
||||
this.clearForcedConsultTimers();
|
||||
this.pendingAgentProxyConsultContexts = [];
|
||||
this.recentAgentProxyConsultContexts = [];
|
||||
this.pendingSpeakerTurns.length = 0;
|
||||
this.clearOutputAudio();
|
||||
this.bridge?.close();
|
||||
@@ -506,7 +524,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
event: RealtimeVoiceToolCallEvent,
|
||||
session: RealtimeVoiceBridgeSession,
|
||||
): void {
|
||||
const callId = event.callId || event.itemId;
|
||||
const callId = event.callId || event.itemId || "unknown";
|
||||
if (event.name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) {
|
||||
session.submitToolResult(callId, { error: `Tool "${event.name}" not available` });
|
||||
return;
|
||||
@@ -532,8 +550,35 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
willContinue: true,
|
||||
});
|
||||
}
|
||||
const context =
|
||||
this.consumeAgentProxyConsultContext(consultMessage) ?? this.consumePendingSpeakerContext();
|
||||
const pendingConsultContext = this.consumeAgentProxyConsultContext(consultMessage);
|
||||
if (pendingConsultContext) {
|
||||
this.addRecentAgentProxyConsultQuestion(pendingConsultContext.recent, consultMessage);
|
||||
}
|
||||
let context = pendingConsultContext?.context;
|
||||
let recent = pendingConsultContext?.recent;
|
||||
if (!context) {
|
||||
const recentConsult = this.findRecentAgentProxyConsultContext(consultMessage);
|
||||
if (recentConsult) {
|
||||
if (this.hasPendingSpeakerAudioContext()) {
|
||||
logger.info(
|
||||
`discord voice: realtime consult matched recent agent result but newer speaker audio is pending call=${callId} speaker=${recentConsult.context.speakerLabel} owner=${recentConsult.context.senderIsOwner}`,
|
||||
);
|
||||
session.submitToolResult(callId, {
|
||||
error: "Discord speaker context changed before this realtime consult completed",
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (this.submitRecentAgentProxyConsultResult(callId, recentConsult, session)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!context) {
|
||||
context = this.consumePendingSpeakerContext();
|
||||
if (context) {
|
||||
recent = this.rememberRecentAgentProxyConsultContext(consultMessage, context);
|
||||
}
|
||||
}
|
||||
if (!context) {
|
||||
logger.warn(
|
||||
`discord voice: realtime consult has no speaker context call=${callId || "unknown"}`,
|
||||
@@ -541,10 +586,14 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
session.submitToolResult(callId, { error: "No Discord speaker context available" });
|
||||
return;
|
||||
}
|
||||
void this.runAgentTurn({
|
||||
const promise = this.runAgentTurn({
|
||||
context,
|
||||
message: consultMessage,
|
||||
})
|
||||
});
|
||||
if (recent) {
|
||||
this.setRecentAgentProxyConsultPromise(recent, promise);
|
||||
}
|
||||
void promise
|
||||
.then((text) => {
|
||||
logger.info(
|
||||
`discord voice: realtime consult answer (${text.length} chars) voiceSession=${this.params.entry.voiceSessionKey} supervisorSession=${this.params.entry.route.sessionKey} agent=${this.params.entry.route.agentId} speaker=${context.speakerLabel} owner=${context.senderIsOwner}: ${formatRealtimeLogPreview(text)}`,
|
||||
@@ -585,10 +634,18 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
}
|
||||
const context = this.consumePendingSpeakerContext();
|
||||
if (!context) {
|
||||
const recent = this.findRecentAgentProxyConsultContext(question);
|
||||
if (recent) {
|
||||
logVoiceVerbose(
|
||||
`realtime forced agent consult skipped (already delegated): guild ${this.params.entry.guildId} channel ${this.params.entry.channelId} speaker ${recent.context.userId}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
logger.warn("discord voice: realtime forced agent consult has no speaker context");
|
||||
return;
|
||||
}
|
||||
const pending: PendingAgentProxyConsultContext = { context, question };
|
||||
const recent = this.rememberRecentAgentProxyConsultContext(question, context);
|
||||
const pending: PendingAgentProxyConsultContext = { context, question, recent };
|
||||
this.pendingAgentProxyConsultContexts.push(pending);
|
||||
pending.timer = setTimeout(() => {
|
||||
pending.timer = undefined;
|
||||
@@ -613,7 +670,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
|
||||
private consumeAgentProxyConsultContext(
|
||||
consultMessage: string,
|
||||
): DiscordRealtimeSpeakerContext | undefined {
|
||||
): PendingAgentProxyConsultContext | undefined {
|
||||
let pending: PendingAgentProxyConsultContext | undefined;
|
||||
if (this.pendingAgentProxyConsultContexts.length === 1) {
|
||||
pending = this.pendingAgentProxyConsultContexts.shift();
|
||||
@@ -630,7 +687,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
return undefined;
|
||||
}
|
||||
this.clearForcedConsultTimer(pending);
|
||||
return pending.context;
|
||||
return pending;
|
||||
}
|
||||
|
||||
private removePendingAgentProxyConsultContext(pending: PendingAgentProxyConsultContext): void {
|
||||
@@ -656,14 +713,17 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
this.syncOutputAudioTimestamp();
|
||||
this.bridge?.handleBargeIn({ audioPlaybackActive: true, force: true });
|
||||
this.clearOutputAudio();
|
||||
pending.recent.handledByForcedPlayback = true;
|
||||
try {
|
||||
const text = await this.runAgentTurn({
|
||||
const promise = this.runAgentTurn({
|
||||
context,
|
||||
message: [
|
||||
question,
|
||||
"Context: The realtime model produced a final user transcript without calling openclaw_agent_consult. OpenClaw is forcing the consult because consultPolicy is always.",
|
||||
].join("\n\n"),
|
||||
});
|
||||
this.setRecentAgentProxyConsultPromise(pending.recent, promise);
|
||||
const text = await promise;
|
||||
logger.info(
|
||||
`discord voice: realtime forced agent consult answer (${text.length} chars) elapsedMs=${Date.now() - startedAt} voiceSession=${this.params.entry.voiceSessionKey} supervisorSession=${this.params.entry.route.sessionKey} agent=${this.params.entry.route.agentId}: ${formatRealtimeLogPreview(text)}`,
|
||||
);
|
||||
@@ -693,6 +753,12 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
return turn?.context;
|
||||
}
|
||||
|
||||
/**
 * Reports whether any queued speaker turn is flagged as having audio.
 * The prune and expire passes run first so the check sees the current queue.
 */
private hasPendingSpeakerAudioContext(): boolean {
  this.prunePendingSpeakerTurns();
  this.expireClosedSpeakerTurnsBeforeLaterAudio();
  for (const pendingTurn of this.pendingSpeakerTurns) {
    if (pendingTurn.hasAudio) {
      return true;
    }
  }
  return false;
}
|
||||
|
||||
private prunePendingSpeakerTurns(): void {
|
||||
for (let index = this.pendingSpeakerTurns.length - 1; index >= 0; index -= 1) {
|
||||
const turn = this.pendingSpeakerTurns[index];
|
||||
@@ -720,6 +786,131 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
|
||||
hasLaterAudio = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Records a new recent-consult entry for `question` and returns it.
 * Pruning runs before the insert (drop expired entries) and again after it (enforce the size cap).
 *
 * @param question Question text the consult was started for.
 * @param context Speaker context the consult runs under.
 * @returns The freshly stored entry, so callers can attach the consult promise to it.
 */
private rememberRecentAgentProxyConsultContext(
  question: string,
  context: DiscordRealtimeSpeakerContext,
): RecentAgentProxyConsultContext {
  this.pruneRecentAgentProxyConsultContexts();
  const createdAt = Date.now();
  const entry: RecentAgentProxyConsultContext = { context, createdAt, questions: [question] };
  this.recentAgentProxyConsultContexts.push(entry);
  this.pruneRecentAgentProxyConsultContexts();
  return entry;
}
|
||||
|
||||
/**
 * Adds `question` to the entry's list of matching questions unless an
 * equivalent question (per `matchesPendingAgentProxyQuestion`) is already recorded.
 */
private addRecentAgentProxyConsultQuestion(
  recent: RecentAgentProxyConsultContext,
  question: string,
): void {
  const alreadyRecorded = recent.questions.some((known) =>
    matchesPendingAgentProxyQuestion(question, known),
  );
  if (!alreadyRecorded) {
    recent.questions.push(question);
  }
}
|
||||
|
||||
/**
 * Attaches the consult promise to `recent` and records its settled outcome on
 * `recent.result`, so later tool calls can reuse the answer or the error.
 *
 * @param recent Entry that should track the consult.
 * @param promise Agent turn backing the consult.
 */
private setRecentAgentProxyConsultPromise(
  recent: RecentAgentProxyConsultContext,
  promise: Promise<string>,
): void {
  const recordFulfilled = (text: string): void => {
    recent.result = { status: "fulfilled", text };
  };
  const recordRejected = (error: unknown): void => {
    recent.result = { status: "rejected", error: formatErrorMessage(error) };
  };
  recent.promise = promise;
  // Fire-and-forget bookkeeping: the rejection is consumed here so it never surfaces as unhandled.
  void promise.then(recordFulfilled).catch(recordRejected);
}
|
||||
|
||||
/**
 * Looks up the newest non-expired recent consult whose recorded questions match `consultMessage`.
 *
 * @param consultMessage Question text from the incoming consult.
 * @returns The most recently stored matching entry, or `undefined` when none matches.
 */
private findRecentAgentProxyConsultContext(
  consultMessage: string,
): RecentAgentProxyConsultContext | undefined {
  this.pruneRecentAgentProxyConsultContexts();
  // Entries are appended in creation order, so a reversed copy is scanned newest-first.
  const newestFirst = [...this.recentAgentProxyConsultContexts].reverse();
  return newestFirst.find((candidate) =>
    candidate.questions.some((asked) => matchesPendingAgentProxyQuestion(consultMessage, asked)),
  );
}
|
||||
|
||||
/**
 * Completes a late realtime consult tool call from a remembered consult instead of
 * running the agent again.
 *
 * - If the remembered consult already settled, its stored result is submitted immediately.
 * - If it is still in flight, the tool result is submitted once the shared promise settles.
 * - If the forced-playback path handled the consult, an `already_delivered` result is
 *   submitted with `suppressResponse: true` instead of the answer or error.
 *
 * @param callId Tool call id to complete on the bridge session.
 * @param recent Remembered consult whose result or in-flight promise is reused.
 * @param session Bridge session that receives the tool result.
 * @returns `true` when the call was, or will be, completed from `recent`; `false` when
 *   `recent` has neither a result nor a promise, so the caller must handle the call itself.
 */
private submitRecentAgentProxyConsultResult(
  callId: string,
  recent: RecentAgentProxyConsultContext,
  session: RealtimeVoiceBridgeSession,
): boolean {
  const callDetails = `call=${callId || "unknown"} speaker=${recent.context.speakerLabel} owner=${recent.context.senderIsOwner}`;
  const submitResult = (result: RecentAgentProxyConsultResult): void => {
    if (recent.handledByForcedPlayback) {
      // The forced path owns delivery of this consult, so only satisfy the provider call
      // without starting another realtime response.
      session.submitToolResult(
        callId,
        {
          status: "already_delivered",
          message: "OpenClaw already delivered this answer to Discord voice.",
        },
        { suppressResponse: true },
      );
      return;
    }
    if (result.status === "fulfilled") {
      session.submitToolResult(callId, { text: result.text });
      return;
    }
    session.submitToolResult(callId, { error: result.error });
  };

  if (recent.result) {
    logger.info(`discord voice: realtime consult reused recent agent result ${callDetails}`);
    submitResult(recent.result);
    return true;
  }
  if (!recent.promise) {
    return false;
  }

  logger.info(`discord voice: realtime consult joined in-flight agent result ${callDetails}`);
  void recent.promise
    .then(
      // Two-argument form: a throw while submitting the success result must not fall into
      // the rejection handler and submit a second (error) result for the same call id.
      (text) => submitResult({ status: "fulfilled", text }),
      (error: unknown) => submitResult({ status: "rejected", error: formatErrorMessage(error) }),
    )
    .catch((error: unknown) => {
      // Submission itself failed; log it rather than leaving an unhandled rejection.
      logger.warn(
        `discord voice: realtime consult failed to submit joined agent result ${callDetails}: ${formatErrorMessage(error)}`,
      );
    });
  return true;
}
|
||||
|
||||
/**
 * Trims the recent-consult list in place: first removes entries older than
 * `DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_TTL_MS`, then drops the oldest entries
 * until at most `DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_LIMIT` remain.
 */
private pruneRecentAgentProxyConsultContexts(): void {
  const entries = this.recentAgentProxyConsultContexts;
  const oldestAllowed = Date.now() - DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_TTL_MS;
  // Walk backwards so splicing does not shift entries that are still to be visited.
  let cursor = entries.length;
  while (cursor-- > 0) {
    const entry = entries[cursor];
    if (entry && entry.createdAt < oldestAllowed) {
      entries.splice(cursor, 1);
    }
  }
  const overflow = entries.length - DISCORD_REALTIME_RECENT_AGENT_PROXY_CONSULT_LIMIT;
  if (overflow > 0) {
    entries.splice(0, overflow);
  }
}
|
||||
}
|
||||
|
||||
function isDiscordRealtimeSpeakerContext(value: unknown): value is DiscordRealtimeSpeakerContext {
|
||||
|
||||
@@ -1037,6 +1037,39 @@ describe("buildOpenAIRealtimeVoiceProvider", () => {
|
||||
expect(parseSent(socket).filter((event) => event.type === "response.create")).toHaveLength(1);
|
||||
});
|
||||
|
||||
// A tool result submitted with suppressResponse must still be sent to the provider as a
// function_call_output item, but must not be followed by a response.create request.
it("does not request a realtime response for suppressed tool results", async () => {
  const bridge = buildOpenAIRealtimeVoiceProvider().createBridge({
    providerConfig: { apiKey: "sk-test" }, // pragma: allowlist secret
    onAudio: vi.fn(),
    onClearAudio: vi.fn(),
  });
  const connectPromise = bridge.connect();
  const [ws] = FakeWebSocket.instances;
  if (!ws) {
    throw new Error("expected bridge to create a websocket");
  }

  // Complete the handshake so connect() resolves before the tool result is submitted.
  ws.readyState = FakeWebSocket.OPEN;
  ws.emit("open");
  ws.emit("message", Buffer.from(JSON.stringify({ type: "session.updated" })));
  await connectPromise;

  const toolOutput = { status: "already_delivered" };
  bridge.submitToolResult("call_1", toolOutput, { suppressResponse: true });

  const expectedItemEvent = {
    type: "conversation.item.create",
    item: {
      type: "function_call_output",
      call_id: "call_1",
      output: JSON.stringify(toolOutput),
    },
  };
  expect(parseSent(ws).slice(-1)).toEqual([expectedItemEvent]);
  expect(parseSent(ws)).not.toContainEqual({ type: "response.create" });
});
|
||||
|
||||
it("does not flush deferred response.create while a tool result is still continuing", async () => {
|
||||
const provider = buildOpenAIRealtimeVoiceProvider();
|
||||
const onError = vi.fn();
|
||||
|
||||
@@ -368,6 +368,9 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
|
||||
return;
|
||||
}
|
||||
this.continuingToolCallIds.delete(callId);
|
||||
if (options?.suppressResponse === true) {
|
||||
return;
|
||||
}
|
||||
this.requestResponseCreate();
|
||||
}
|
||||
|
||||
|
||||
@@ -409,7 +409,7 @@ describe("validateTalkSessionRelayParams", () => {
|
||||
sessionId: "session-1",
|
||||
callId: "call-1",
|
||||
result: { ok: true },
|
||||
options: { willContinue: true },
|
||||
options: { suppressResponse: true, willContinue: true },
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
@@ -254,6 +254,7 @@ export const TalkSessionSubmitToolResultParamsSchema = Type.Object(
|
||||
options: Type.Optional(
|
||||
Type.Object(
|
||||
{
|
||||
suppressResponse: Type.Optional(Type.Boolean()),
|
||||
willContinue: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
|
||||
@@ -567,7 +567,7 @@ describe("talk.session unified handlers", () => {
|
||||
sessionId: "relay-unified-1",
|
||||
callId: "call-1",
|
||||
result: { status: "working" },
|
||||
options: { willContinue: true },
|
||||
options: { suppressResponse: true, willContinue: true },
|
||||
},
|
||||
client: { connId: "conn-1" } as never,
|
||||
isWebchatConnect: () => false,
|
||||
@@ -579,7 +579,7 @@ describe("talk.session unified handlers", () => {
|
||||
connId: "conn-1",
|
||||
callId: "call-1",
|
||||
result: { status: "working" },
|
||||
options: { willContinue: true },
|
||||
options: { suppressResponse: true, willContinue: true },
|
||||
});
|
||||
|
||||
const closeRespond = vi.fn();
|
||||
|
||||
@@ -245,6 +245,13 @@ describe("talk realtime gateway relay", () => {
|
||||
callId: "call-1",
|
||||
result: { ok: true },
|
||||
});
|
||||
submitTalkRealtimeRelayToolResult({
|
||||
relaySessionId: session.relaySessionId,
|
||||
connId: "conn-1",
|
||||
callId: "call-2",
|
||||
result: { status: "already_delivered" },
|
||||
options: { suppressResponse: true },
|
||||
});
|
||||
cancelTalkRealtimeRelayTurn({
|
||||
relaySessionId: session.relaySessionId,
|
||||
connId: "conn-1",
|
||||
@@ -261,6 +268,12 @@ describe("talk realtime gateway relay", () => {
|
||||
{ willContinue: true },
|
||||
);
|
||||
expect(bridge.submitToolResult).toHaveBeenNthCalledWith(2, "call-1", { ok: true }, undefined);
|
||||
expect(bridge.submitToolResult).toHaveBeenNthCalledWith(
|
||||
3,
|
||||
"call-2",
|
||||
{ status: "already_delivered" },
|
||||
{ suppressResponse: true },
|
||||
);
|
||||
expect(bridge.handleBargeIn).toHaveBeenCalledWith({ audioPlaybackActive: true });
|
||||
expect(bridge.close).toHaveBeenCalled();
|
||||
expect(events).toEqual(
|
||||
|
||||
@@ -50,6 +50,11 @@ export type RealtimeVoiceToolCallEvent = {
|
||||
};
|
||||
|
||||
/** Optional flags accepted alongside a realtime voice tool result submission. */
export type RealtimeVoiceToolResultOptions = {
  /**
   * When true, the tool result is submitted without asking the realtime provider to
   * generate a new assistant response. Intended for cases where the user-visible answer
   * has already been delivered through another channel.
   */
  suppressResponse?: boolean;
  /**
   * NOTE(review): presumably marks an interim result for a tool call that will submit
   * further results later; confirm against the bridge implementations.
   */
  willContinue?: boolean;
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user