From 33cdb342cb66da8120bd30d3b4da1e80438e8fa0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 6 Apr 2026 19:23:48 +0100 Subject: [PATCH] refactor(discord): split voice receive and capture helpers --- .../discord/src/voice/capture-state.test.ts | 48 ++++ extensions/discord/src/voice/capture-state.ts | 120 +++++++++ .../discord/src/voice/manager.e2e.test.ts | 36 +-- extensions/discord/src/voice/manager.ts | 242 ++++++------------ .../src/voice/receive-recovery.test.ts | 79 ++++++ .../discord/src/voice/receive-recovery.ts | 159 ++++++++++++ 6 files changed, 510 insertions(+), 174 deletions(-) create mode 100644 extensions/discord/src/voice/capture-state.test.ts create mode 100644 extensions/discord/src/voice/capture-state.ts create mode 100644 extensions/discord/src/voice/receive-recovery.test.ts create mode 100644 extensions/discord/src/voice/receive-recovery.ts diff --git a/extensions/discord/src/voice/capture-state.test.ts b/extensions/discord/src/voice/capture-state.test.ts new file mode 100644 index 00000000000..f41f2672cc9 --- /dev/null +++ b/extensions/discord/src/voice/capture-state.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it, vi } from "vitest"; +import { + beginVoiceCapture, + clearVoiceCaptureFinalizeTimer, + createVoiceCaptureState, + finishVoiceCapture, + scheduleVoiceCaptureFinalize, +} from "./capture-state.js"; + +describe("voice capture state", () => { + it("increments generations per speaker", () => { + const state = createVoiceCaptureState(); + const first = beginVoiceCapture(state, "u1", { destroy: vi.fn() } as never); + finishVoiceCapture(state, "u1", first); + const second = beginVoiceCapture(state, "u1", { destroy: vi.fn() } as never); + + expect(first).toBe(1); + expect(second).toBe(2); + }); + + it("clears active speaker state before destroying a finalized capture", async () => { + vi.useFakeTimers(); + try { + const state = createVoiceCaptureState(); + const destroy = vi.fn(() => { + expect(state.activeSpeakers.has("u1")).toBe(false); + expect(state.activeCaptureStreams.has("u1")).toBe(false); + }); + beginVoiceCapture(state, "u1", { destroy } as never); + + expect(scheduleVoiceCaptureFinalize({ state, userId: "u1", delayMs: 1_200 })).toBe(true); + await vi.advanceTimersByTimeAsync(1_200); + + expect(destroy).toHaveBeenCalledTimes(1); + } finally { + vi.useRealTimers(); + } + }); + + it("lets a pending finalize be canceled for the same generation", () => { + const state = createVoiceCaptureState(); + const generation = beginVoiceCapture(state, "u1", { destroy: vi.fn() } as never); + + expect(scheduleVoiceCaptureFinalize({ state, userId: "u1", delayMs: 1_200 })).toBe(true); + expect(clearVoiceCaptureFinalizeTimer(state, "u1", generation)).toBe(true); + expect(state.captureFinalizeTimers.has("u1")).toBe(false); + }); +}); diff --git a/extensions/discord/src/voice/capture-state.ts b/extensions/discord/src/voice/capture-state.ts new file mode 100644 index 00000000000..01086ddd04f --- /dev/null +++ b/extensions/discord/src/voice/capture-state.ts @@ -0,0 +1,120 @@ +import type { Readable } from "node:stream"; + +export type VoiceCaptureEntry = { + generation: number; + stream: Readable; +}; + +export type VoiceCaptureFinalizeTimer = { + generation: number; + timer: ReturnType; +}; + +export type VoiceCaptureState = { + activeSpeakers: Set; + activeCaptureStreams: Map; + captureFinalizeTimers: Map; + captureGenerations: Map; +}; + +export function createVoiceCaptureState(): VoiceCaptureState { + return { + activeSpeakers: new Set(), + activeCaptureStreams: new Map(), + captureFinalizeTimers: new Map(), + captureGenerations: new Map(), + }; +} + +export function stopVoiceCaptureState(state: VoiceCaptureState): void { + for (const { timer } of state.captureFinalizeTimers.values()) { + clearTimeout(timer); + } + state.captureFinalizeTimers.clear(); + for (const { stream } of state.activeCaptureStreams.values()) { + stream.destroy(); + } + state.activeCaptureStreams.clear(); + state.captureGenerations.clear(); + state.activeSpeakers.clear(); +} + +export function getActiveVoiceCapture( + state: VoiceCaptureState, + userId: string, +): VoiceCaptureEntry | undefined { + return state.activeCaptureStreams.get(userId); +} + +export function isVoiceCaptureActive(state: VoiceCaptureState, userId: string): boolean { + return state.activeSpeakers.has(userId); +} + +export function clearVoiceCaptureFinalizeTimer( + state: VoiceCaptureState, + userId: string, + generation?: number, +): boolean { + const scheduled = state.captureFinalizeTimers.get(userId); + if (!scheduled || (generation !== undefined && scheduled.generation !== generation)) { + return false; + } + clearTimeout(scheduled.timer); + state.captureFinalizeTimers.delete(userId); + return true; +} + +export function beginVoiceCapture( + state: VoiceCaptureState, + userId: string, + stream: Readable, +): number { + const generation = (state.captureGenerations.get(userId) ?? 0) + 1; + state.captureGenerations.set(userId, generation); + state.activeSpeakers.add(userId); + state.activeCaptureStreams.set(userId, { generation, stream }); + clearVoiceCaptureFinalizeTimer(state, userId, generation); + return generation; +} + +export function finishVoiceCapture( + state: VoiceCaptureState, + userId: string, + generation: number, +): boolean { + clearVoiceCaptureFinalizeTimer(state, userId, generation); + const activeCapture = state.activeCaptureStreams.get(userId); + if (activeCapture?.generation !== generation) { + return false; + } + state.activeCaptureStreams.delete(userId); + state.activeSpeakers.delete(userId); + return true; +} + +export function scheduleVoiceCaptureFinalize(params: { + state: VoiceCaptureState; + userId: string; + delayMs: number; + onFinalize?: (capture: VoiceCaptureEntry) => void; +}): boolean { + const { state, userId, delayMs, onFinalize } = params; + const capture = state.activeCaptureStreams.get(userId); + if (!capture) { + return false; + } + clearVoiceCaptureFinalizeTimer(state, userId, capture.generation); + const timer = setTimeout(() => { + const activeCapture = state.activeCaptureStreams.get(userId); + if (!activeCapture || activeCapture.generation !== capture.generation) { + return; + } + state.captureFinalizeTimers.delete(userId); + state.activeCaptureStreams.delete(userId); + state.activeSpeakers.delete(userId); + onFinalize?.(activeCapture); + activeCapture.stream.destroy(); + }, delayMs); + state.captureFinalizeTimers.set(userId, { generation: capture.generation, timer }); + return true; +} diff --git a/extensions/discord/src/voice/manager.e2e.test.ts b/extensions/discord/src/voice/manager.e2e.test.ts index e89bd93d365..49b6ce23d36 100644 --- a/extensions/discord/src/voice/manager.e2e.test.ts +++ b/extensions/discord/src/voice/manager.e2e.test.ts @@ -1,5 +1,7 @@ import { ChannelType } from "@buape/carbon"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { createVoiceCaptureState } from "./capture-state.js"; +import { createVoiceReceiveRecoveryState } from "./receive-recovery.js"; const { createConnectionMock, @@ -263,13 +265,8 @@ describe("DiscordVoiceManager", () => { player: createAudioPlayerMock(), playbackQueue: Promise.resolve(), processingQueue: Promise.resolve(), - activeSpeakers: new Set(), - activeCaptureStreams: new Map(), - captureFinalizeTimers: new Map(), - captureGenerations: new Map(), - decryptFailureCount: 0, - lastDecryptFailureAt: 0, - decryptRecoveryInFlight: false, + capture: createVoiceCaptureState(), + receiveRecovery: createVoiceReceiveRecoveryState(), }, wavPath: "/tmp/test.wav", userId, @@ -383,7 +380,9 @@ describe("DiscordVoiceManager", () => { it("re-arms passthrough but still rejoin-recovers after repeated decrypt failures", async () => { const connection = createConnectionMock(); - joinVoiceChannelMock.mockReturnValueOnce(connection).mockReturnValueOnce(createConnectionMock()); + joinVoiceChannelMock + .mockReturnValueOnce(connection) + .mockReturnValueOnce(createConnectionMock()); const manager = createManager(); await manager.join({ guildId: "g1", channelId: "1001" }); @@ -412,18 +411,23 @@ describe("DiscordVoiceManager", () => { | { guildId: string; channelId: string; - activeSpeakers: Set; - activeCaptureStreams: Map void } }>; - captureFinalizeTimers: Map; - captureGenerations: Map; + capture: { + activeSpeakers: Set; + activeCaptureStreams: Map< + string, + { generation: number; stream: { destroy: () => void } } + >; + captureFinalizeTimers: Map; + captureGenerations: Map; + }; } | undefined; expect(entry).toBeDefined(); const firstStream = { destroy: vi.fn() }; - entry?.activeSpeakers.add("u1"); - entry?.captureGenerations.set("u1", 1); - entry?.activeCaptureStreams.set("u1", { generation: 1, stream: firstStream }); + entry?.capture.activeSpeakers.add("u1"); + entry?.capture.captureGenerations.set("u1", 1); + entry?.capture.activeCaptureStreams.set("u1", { generation: 1, stream: firstStream }); ( manager as unknown as { @@ -434,7 +438,7 @@ describe("DiscordVoiceManager", () => { await vi.advanceTimersByTimeAsync(1_200); expect(firstStream.destroy).toHaveBeenCalledTimes(1); - expect(entry?.activeSpeakers.has("u1")).toBe(false); + expect(entry?.capture.activeSpeakers.has("u1")).toBe(false); const secondStream = { on: vi.fn(), diff --git a/extensions/discord/src/voice/manager.ts b/extensions/discord/src/voice/manager.ts index 6c5366fb74e..57f8e75ea3e 100644 --- a/extensions/discord/src/voice/manager.ts +++ b/extensions/discord/src/voice/manager.ts @@ -22,7 +22,29 @@ import { normalizeDiscordSlug, resolveDiscordOwnerAccess } from "../monitor/allo import { formatDiscordUserTag } from "../monitor/format.js"; import { getDiscordRuntime } from "../runtime.js"; import { authorizeDiscordVoiceIngress } from "./access.js"; +import { + beginVoiceCapture, + clearVoiceCaptureFinalizeTimer, + createVoiceCaptureState, + finishVoiceCapture, + getActiveVoiceCapture, + isVoiceCaptureActive, + scheduleVoiceCaptureFinalize, + stopVoiceCaptureState, + type VoiceCaptureState, +} from "./capture-state.js"; import { formatVoiceIngressPrompt } from "./prompt.js"; +import { + analyzeVoiceReceiveError, + createVoiceReceiveRecoveryState, + DAVE_RECEIVE_PASSTHROUGH_INITIAL_EXPIRY_SECONDS, + DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS, + enableDaveReceivePassthrough as tryEnableDaveReceivePassthrough, + finishVoiceDecryptRecovery, + noteVoiceDecryptFailure, + resetVoiceReceiveRecoveryState, + type VoiceReceiveRecoveryState, +} from "./receive-recovery.js"; import { sanitizeVoiceReplyTextForSpeech } from "./sanitize.js"; import { loadDiscordVoiceSdk } from "./sdk-runtime.js"; @@ -36,12 +58,6 @@ const CAPTURE_FINALIZE_GRACE_MS = 1_200; const VOICE_CONNECT_READY_TIMEOUT_MS = 15_000; const PLAYBACK_READY_TIMEOUT_MS = 60_000; const SPEAKING_READY_TIMEOUT_MS = 60_000; -const DECRYPT_FAILURE_WINDOW_MS = 30_000; -const DECRYPT_FAILURE_RECONNECT_THRESHOLD = 3; -const DECRYPT_FAILURE_PATTERN = /DecryptionFailed\(/; -const DAVE_PASSTHROUGH_DISABLED_PATTERN = /UnencryptedWhenPassthroughDisabled/; -const DAVE_RECEIVE_PASSTHROUGH_INITIAL_EXPIRY_SECONDS = 30; -const DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS = 15; const SPEAKER_CONTEXT_CACHE_TTL_MS = 60_000; const logger = createSubsystemLogger("discord/voice"); @@ -57,16 +73,6 @@ type VoiceOperationResult = { guildId?: string; }; -type VoiceCaptureEntry = { - generation: number; - stream: Readable; -}; - -type VoiceCaptureFinalizeTimer = { - generation: number; - timer: ReturnType; -}; - type VoiceSessionEntry = { guildId: string; guildName?: string; @@ -78,13 +84,8 @@ type VoiceSessionEntry = { player: import("@discordjs/voice").AudioPlayer; playbackQueue: Promise; processingQueue: Promise; - activeSpeakers: Set; - activeCaptureStreams: Map; - captureFinalizeTimers: Map; - captureGenerations: Map; - decryptFailureCount: number; - lastDecryptFailureAt: number; - decryptRecoveryInFlight: boolean; + capture: VoiceCaptureState; + receiveRecovery: VoiceReceiveRecoveryState; stop: () => void; }; @@ -172,25 +173,6 @@ type OpusDecoderFactory = { let warnedOpusMissing = false; let cachedOpusDecoderFactory: OpusDecoderFactory | null | "unresolved" = "unresolved"; -function isAbortLikeError(err: unknown): boolean { - if (!err || typeof err !== "object") { - return false; - } - const name = - "name" in err && typeof (err as { name?: unknown }).name === "string" - ? (err as { name: string }).name - : ""; - const message = - "message" in err && typeof (err as { message?: unknown }).message === "string" - ? (err as { message: string }).message - : ""; - return ( - name === "AbortError" || - message.includes("The operation was aborted") || - message.includes("aborted") - ); -} - function resolveOpusDecoderFactory(): OpusDecoderFactory | null { const factories: OpusDecoderFactory[] = [ { @@ -534,13 +516,8 @@ export class DiscordVoiceManager { player, playbackQueue: Promise.resolve(), processingQueue: Promise.resolve(), - activeSpeakers: new Set(), - activeCaptureStreams: new Map(), - captureFinalizeTimers: new Map(), - captureGenerations: new Map(), - decryptFailureCount: 0, - lastDecryptFailureAt: 0, - decryptRecoveryInFlight: false, + capture: createVoiceCaptureState(), + receiveRecovery: createVoiceReceiveRecoveryState(), stop: () => { if (speakingHandler) { connection.receiver.speaking.off("start", speakingHandler); @@ -548,16 +525,7 @@ export class DiscordVoiceManager { if (speakingEndHandler) { connection.receiver.speaking.off("end", speakingEndHandler); } - for (const { timer } of entry.captureFinalizeTimers.values()) { - clearTimeout(timer); - } - entry.captureFinalizeTimers.clear(); - for (const { stream } of entry.activeCaptureStreams.values()) { - stream.destroy(); - } - entry.activeCaptureStreams.clear(); - entry.captureGenerations.clear(); - entry.activeSpeakers.clear(); + stopVoiceCaptureState(entry.capture); if (disconnectedHandler) { connection.off(voiceSdk.VoiceConnectionStatus.Disconnected, disconnectedHandler); } @@ -660,35 +628,20 @@ export class DiscordVoiceManager { } private clearCaptureFinalizeTimer(entry: VoiceSessionEntry, userId: string, generation?: number) { - const scheduled = entry.captureFinalizeTimers.get(userId); - if (!scheduled || (generation !== undefined && scheduled.generation !== generation)) { - return false; - } - clearTimeout(scheduled.timer); - entry.captureFinalizeTimers.delete(userId); - return true; + return clearVoiceCaptureFinalizeTimer(entry.capture, userId, generation); } private scheduleCaptureFinalize(entry: VoiceSessionEntry, userId: string, reason: string) { - const capture = entry.activeCaptureStreams.get(userId); - if (!capture) { - return; - } - this.clearCaptureFinalizeTimer(entry, userId, capture.generation); - const timer = setTimeout(() => { - const activeCapture = entry.activeCaptureStreams.get(userId); - if (!activeCapture || activeCapture.generation !== capture.generation) { - return; - } - entry.captureFinalizeTimers.delete(userId); - entry.activeCaptureStreams.delete(userId); - entry.activeSpeakers.delete(userId); - logVoiceVerbose( - `capture finalize: guild ${entry.guildId} channel ${entry.channelId} user ${userId} reason=${reason} grace=${CAPTURE_FINALIZE_GRACE_MS}ms`, - ); - activeCapture.stream.destroy(); - }, CAPTURE_FINALIZE_GRACE_MS); - entry.captureFinalizeTimers.set(userId, { generation: capture.generation, timer }); + scheduleVoiceCaptureFinalize({ + state: entry.capture, + userId, + delayMs: CAPTURE_FINALIZE_GRACE_MS, + onFinalize: () => { + logVoiceVerbose( + `capture finalize: guild ${entry.guildId} channel ${entry.channelId} user ${userId} reason=${reason} grace=${CAPTURE_FINALIZE_GRACE_MS}ms`, + ); + }, + }); } private async handleSpeakingStart(entry: VoiceSessionEntry, userId: string) { @@ -698,8 +651,8 @@ export class DiscordVoiceManager { if (this.botUserId && userId === this.botUserId) { return; } - if (entry.activeSpeakers.has(userId)) { - const activeCapture = entry.activeCaptureStreams.get(userId); + if (isVoiceCaptureActive(entry.capture, userId)) { + const activeCapture = getActiveVoiceCapture(entry.capture, userId); const extended = activeCapture ? this.clearCaptureFinalizeTimer(entry, userId, activeCapture.generation) : false; @@ -709,7 +662,6 @@ export class DiscordVoiceManager { return; } - entry.activeSpeakers.add(userId); logVoiceVerbose( `capture start: guild ${entry.guildId} channel ${entry.channelId} user ${userId}`, ); @@ -723,18 +675,15 @@ export class DiscordVoiceManager { entry.player.stop(true); } - const generation = (entry.captureGenerations.get(userId) ?? 0) + 1; - entry.captureGenerations.set(userId, generation); const stream = entry.connection.receiver.subscribe(userId, { end: { behavior: voiceSdk.EndBehaviorType.Manual, }, }); - entry.activeCaptureStreams.set(userId, { generation, stream }); - this.clearCaptureFinalizeTimer(entry, userId, generation); + const generation = beginVoiceCapture(entry.capture, userId, stream); let streamAborted = false; stream.on("error", (err) => { - streamAborted = isAbortLikeError(err); + streamAborted = analyzeVoiceReceiveError(err).isAbortLike; this.handleReceiveError(entry, err); }); @@ -762,12 +711,7 @@ export class DiscordVoiceManager { await this.processSegment({ entry, wavPath, userId, durationSeconds }); }); } finally { - this.clearCaptureFinalizeTimer(entry, userId, generation); - const activeCapture = entry.activeCaptureStreams.get(userId); - if (activeCapture?.generation === generation) { - entry.activeCaptureStreams.delete(userId); - entry.activeSpeakers.delete(userId); - } + finishVoiceCapture(entry.capture, userId, generation); } } @@ -907,44 +851,33 @@ export class DiscordVoiceManager { } private handleReceiveError(entry: VoiceSessionEntry, err: unknown) { - const message = formatErrorMessage(err); - logger.warn(`discord voice: receive error: ${message}`); - const sawPassthroughDisabled = DAVE_PASSTHROUGH_DISABLED_PATTERN.test(message); - if (sawPassthroughDisabled) { + const analysis = analyzeVoiceReceiveError(err); + logger.warn(`discord voice: receive error: ${analysis.message}`); + if (analysis.shouldAttemptPassthrough) { this.enableDaveReceivePassthrough( entry, "receive decrypt error", DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS, ); } - if (!DECRYPT_FAILURE_PATTERN.test(message) && !sawPassthroughDisabled) { + if (!analysis.countsAsDecryptFailure) { return; } - const now = Date.now(); - if (now - entry.lastDecryptFailureAt > DECRYPT_FAILURE_WINDOW_MS) { - entry.decryptFailureCount = 0; - } - entry.lastDecryptFailureAt = now; - entry.decryptFailureCount += 1; - if (entry.decryptFailureCount === 1) { + const decryptFailure = noteVoiceDecryptFailure(entry.receiveRecovery); + if (decryptFailure.firstFailure) { logger.warn( "discord voice: DAVE decrypt failures detected; voice receive may be unstable (upstream: discordjs/discord.js#11419)", ); } - if ( - entry.decryptFailureCount < DECRYPT_FAILURE_RECONNECT_THRESHOLD || - entry.decryptRecoveryInFlight - ) { + if (!decryptFailure.shouldRecover) { return; } - entry.decryptRecoveryInFlight = true; - this.resetDecryptFailureState(entry); void this.recoverFromDecryptFailures(entry) .catch((recoverErr) => logger.warn(`discord voice: decrypt recovery failed: ${formatErrorMessage(recoverErr)}`), ) .finally(() => { - entry.decryptRecoveryInFlight = false; + finishVoiceDecryptRecovery(entry.receiveRecovery); }); } @@ -954,51 +887,44 @@ export class DiscordVoiceManager { expirySeconds: number, ): boolean { const voiceSdk = loadDiscordVoiceSdk(); - const state = entry.connection.state as { - status: unknown; - networking?: { - state?: { - code?: unknown; - dave?: { - session?: { - setPassthroughMode: (passthrough: boolean, expiry: number) => void; + return tryEnableDaveReceivePassthrough({ + target: { + guildId: entry.guildId, + channelId: entry.channelId, + connection: entry.connection as { + state: { + status: unknown; + networking?: { + state?: { + code?: unknown; + dave?: { + session?: { + setPassthroughMode: (passthrough: boolean, expirySeconds: number) => void; + }; + }; + }; }; }; - }; - }; - }; - if (state.status !== voiceSdk.VoiceConnectionStatus.Ready) { - return false; - } - const networkingState = state.networking?.state; - if ( - !networkingState || - (networkingState.code !== voiceSdk.NetworkingStatusCode.Ready && - networkingState.code !== voiceSdk.NetworkingStatusCode.Resuming) - ) { - return false; - } - const daveSession = networkingState.dave?.session; - if (!daveSession) { - return false; - } - try { - daveSession.setPassthroughMode(true, expirySeconds); - logVoiceVerbose( - `enabled DAVE receive passthrough: guild ${entry.guildId} channel ${entry.channelId} expiry=${expirySeconds}s reason=${reason}`, - ); - return true; - } catch (passthroughErr) { - logger.warn( - `discord voice: failed to enable DAVE passthrough guild=${entry.guildId} channel=${entry.channelId} reason=${reason}: ${formatErrorMessage(passthroughErr)}`, - ); - return false; - } + }, + }, + sdk: { + VoiceConnectionStatus: { + Ready: voiceSdk.VoiceConnectionStatus.Ready, + }, + NetworkingStatusCode: { + Ready: voiceSdk.NetworkingStatusCode.Ready, + Resuming: voiceSdk.NetworkingStatusCode.Resuming, + }, + }, + reason, + expirySeconds, + onVerbose: logVoiceVerbose, + onWarn: (message) => logger.warn(message), + }); } private resetDecryptFailureState(entry: VoiceSessionEntry) { - entry.decryptFailureCount = 0; - entry.lastDecryptFailureAt = 0; + resetVoiceReceiveRecoveryState(entry.receiveRecovery); } private async recoverFromDecryptFailures(entry: VoiceSessionEntry) { diff --git a/extensions/discord/src/voice/receive-recovery.test.ts b/extensions/discord/src/voice/receive-recovery.test.ts new file mode 100644 index 00000000000..58288ce9a68 --- /dev/null +++ b/extensions/discord/src/voice/receive-recovery.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, it, vi } from "vitest"; +import { + analyzeVoiceReceiveError, + createVoiceReceiveRecoveryState, + enableDaveReceivePassthrough, + noteVoiceDecryptFailure, +} from "./receive-recovery.js"; + +describe("voice receive recovery", () => { + it("treats passthrough-disabled decrypt errors as decrypt failures", () => { + expect( + analyzeVoiceReceiveError( + new Error("Failed to decrypt: DecryptionFailed(UnencryptedWhenPassthroughDisabled)"), + ), + ).toMatchObject({ + shouldAttemptPassthrough: true, + countsAsDecryptFailure: true, + }); + }); + + it("gates recovery after repeated decrypt failures in the same window", () => { + const state = createVoiceReceiveRecoveryState(); + + expect(noteVoiceDecryptFailure(state, 1_000)).toEqual({ + firstFailure: true, + shouldRecover: false, + }); + expect(noteVoiceDecryptFailure(state, 2_000)).toEqual({ + firstFailure: false, + shouldRecover: false, + }); + expect(noteVoiceDecryptFailure(state, 3_000)).toEqual({ + firstFailure: false, + shouldRecover: true, + }); + }); + + it("enables passthrough only for ready DAVE sessions", () => { + const setPassthroughMode = vi.fn(); + const onVerbose = vi.fn(); + const onWarn = vi.fn(); + + expect( + enableDaveReceivePassthrough({ + target: { + guildId: "g1", + channelId: "c1", + connection: { + state: { + status: "ready", + networking: { + state: { + code: "networking-ready", + dave: { + session: { + setPassthroughMode, + }, + }, + }, + }, + }, + }, + }, + sdk: { + VoiceConnectionStatus: { Ready: "ready" }, + NetworkingStatusCode: { Ready: "networking-ready", Resuming: "networking-resuming" }, + }, + reason: "test", + expirySeconds: 15, + onVerbose, + onWarn, + }), + ).toBe(true); + + expect(setPassthroughMode).toHaveBeenCalledWith(true, 15); + expect(onVerbose).toHaveBeenCalled(); + expect(onWarn).not.toHaveBeenCalled(); + }); +}); diff --git a/extensions/discord/src/voice/receive-recovery.ts b/extensions/discord/src/voice/receive-recovery.ts new file mode 100644 index 00000000000..af5fba2cfbb --- /dev/null +++ b/extensions/discord/src/voice/receive-recovery.ts @@ -0,0 +1,159 @@ +import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime"; + +const DECRYPT_FAILURE_WINDOW_MS = 30_000; +const DECRYPT_FAILURE_RECONNECT_THRESHOLD = 3; +const DECRYPT_FAILURE_PATTERN = /DecryptionFailed\(/; +const DAVE_PASSTHROUGH_DISABLED_PATTERN = /UnencryptedWhenPassthroughDisabled/; + +export const DAVE_RECEIVE_PASSTHROUGH_INITIAL_EXPIRY_SECONDS = 30; +export const DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS = 15; + +export type VoiceReceiveRecoveryState = { + decryptFailureCount: number; + lastDecryptFailureAt: number; + decryptRecoveryInFlight: boolean; +}; + +export type VoiceReceiveErrorAnalysis = { + message: string; + isAbortLike: boolean; + shouldAttemptPassthrough: boolean; + countsAsDecryptFailure: boolean; +}; + +type DavePassthroughTarget = { + guildId: string; + channelId: string; + connection: { + state: { + status: unknown; + networking?: { + state?: { + code?: unknown; + dave?: { + session?: { + setPassthroughMode: (passthrough: boolean, expirySeconds: number) => void; + }; + }; + }; + }; + }; + }; +}; + +type DavePassthroughSdk = { + VoiceConnectionStatus: { + Ready: unknown; + }; + NetworkingStatusCode: { + Ready: unknown; + Resuming: unknown; + }; +}; + +export function createVoiceReceiveRecoveryState(): VoiceReceiveRecoveryState { + return { + decryptFailureCount: 0, + lastDecryptFailureAt: 0, + decryptRecoveryInFlight: false, + }; +} + +export function isAbortLikeReceiveError(err: unknown): boolean { + if (!err || typeof err !== "object") { + return false; + } + const name = + "name" in err && typeof (err as { name?: unknown }).name === "string" + ? (err as { name: string }).name + : ""; + const message = + "message" in err && typeof (err as { message?: unknown }).message === "string" + ? (err as { message: string }).message + : ""; + return ( + name === "AbortError" || + message.includes("The operation was aborted") || + message.includes("aborted") + ); +} + +export function analyzeVoiceReceiveError(err: unknown): VoiceReceiveErrorAnalysis { + const message = formatErrorMessage(err); + const shouldAttemptPassthrough = DAVE_PASSTHROUGH_DISABLED_PATTERN.test(message); + return { + message, + isAbortLike: isAbortLikeReceiveError(err), + shouldAttemptPassthrough, + countsAsDecryptFailure: DECRYPT_FAILURE_PATTERN.test(message) || shouldAttemptPassthrough, + }; +} + +export function noteVoiceDecryptFailure( + state: VoiceReceiveRecoveryState, + now: number = Date.now(), +): { + firstFailure: boolean; + shouldRecover: boolean; +} { + if (now - state.lastDecryptFailureAt > DECRYPT_FAILURE_WINDOW_MS) { + state.decryptFailureCount = 0; + } + state.lastDecryptFailureAt = now; + state.decryptFailureCount += 1; + const firstFailure = state.decryptFailureCount === 1; + if ( + state.decryptFailureCount < DECRYPT_FAILURE_RECONNECT_THRESHOLD || + state.decryptRecoveryInFlight + ) { + return { firstFailure, shouldRecover: false }; + } + state.decryptRecoveryInFlight = true; + resetVoiceReceiveRecoveryState(state); + return { firstFailure, shouldRecover: true }; +} + +export function resetVoiceReceiveRecoveryState(state: VoiceReceiveRecoveryState): void { + state.decryptFailureCount = 0; + state.lastDecryptFailureAt = 0; +} + +export function finishVoiceDecryptRecovery(state: VoiceReceiveRecoveryState): void { + state.decryptRecoveryInFlight = false; +} + +export function enableDaveReceivePassthrough(params: { + target: DavePassthroughTarget; + sdk: DavePassthroughSdk; + reason: string; + expirySeconds: number; + onVerbose: (message: string) => void; + onWarn: (message: string) => void; +}): boolean { + const { target, sdk, reason, expirySeconds, onVerbose, onWarn } = params; + const networkingState = target.connection.state.networking?.state; + if ( + target.connection.state.status !== sdk.VoiceConnectionStatus.Ready || + !networkingState || + (networkingState.code !== sdk.NetworkingStatusCode.Ready && + networkingState.code !== sdk.NetworkingStatusCode.Resuming) + ) { + return false; + } + const daveSession = networkingState.dave?.session; + if (!daveSession) { + return false; + } + try { + daveSession.setPassthroughMode(true, expirySeconds); + onVerbose( + `enabled DAVE receive passthrough: guild ${target.guildId} channel ${target.channelId} expiry=${expirySeconds}s reason=${reason}`, + ); + return true; + } catch (err) { + onWarn( + `discord voice: failed to enable DAVE passthrough guild=${target.guildId} channel=${target.channelId} reason=${reason}: ${formatErrorMessage(err)}`, + ); + return false; + } +}