mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
refactor(discord): split voice receive and capture helpers
This commit is contained in:
48
extensions/discord/src/voice/capture-state.test.ts
Normal file
48
extensions/discord/src/voice/capture-state.test.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
beginVoiceCapture,
|
||||
clearVoiceCaptureFinalizeTimer,
|
||||
createVoiceCaptureState,
|
||||
finishVoiceCapture,
|
||||
scheduleVoiceCaptureFinalize,
|
||||
} from "./capture-state.js";
|
||||
|
||||
describe("voice capture state", () => {
  // Stand-in for a capture stream: only `destroy` is provided, so the object
  // is cast to satisfy the stream parameter of beginVoiceCapture.
  const stubStream = (destroy: () => void = vi.fn()) => ({ destroy }) as never;

  it("increments generations per speaker", () => {
    const state = createVoiceCaptureState();

    const first = beginVoiceCapture(state, "u1", stubStream());
    finishVoiceCapture(state, "u1", first);
    const second = beginVoiceCapture(state, "u1", stubStream());

    expect(first).toBe(1);
    expect(second).toBe(2);
  });

  it("clears active speaker state before destroying a finalized capture", async () => {
    vi.useFakeTimers();
    try {
      const state = createVoiceCaptureState();
      // These expectations run inside destroy(), i.e. at the moment the
      // finalize timer tears the stream down.
      const destroy = vi.fn(() => {
        expect(state.activeSpeakers.has("u1")).toBe(false);
        expect(state.activeCaptureStreams.has("u1")).toBe(false);
      });
      beginVoiceCapture(state, "u1", stubStream(destroy));

      const scheduled = scheduleVoiceCaptureFinalize({ state, userId: "u1", delayMs: 1_200 });
      expect(scheduled).toBe(true);
      await vi.advanceTimersByTimeAsync(1_200);

      expect(destroy).toHaveBeenCalledTimes(1);
    } finally {
      // Always restore real timers so later tests are unaffected.
      vi.useRealTimers();
    }
  });

  it("lets a pending finalize be canceled for the same generation", () => {
    const state = createVoiceCaptureState();
    const generation = beginVoiceCapture(state, "u1", stubStream());

    const scheduled = scheduleVoiceCaptureFinalize({ state, userId: "u1", delayMs: 1_200 });
    expect(scheduled).toBe(true);

    const canceled = clearVoiceCaptureFinalizeTimer(state, "u1", generation);
    expect(canceled).toBe(true);
    expect(state.captureFinalizeTimers.has("u1")).toBe(false);
  });
});
|
||||
120
extensions/discord/src/voice/capture-state.ts
Normal file
120
extensions/discord/src/voice/capture-state.ts
Normal file
@@ -0,0 +1,120 @@
|
||||
import type { Readable } from "node:stream";
|
||||
|
||||
/** A registered capture stream together with the generation it was registered under. */
export type VoiceCaptureEntry = {
  /** Generation number assigned when the capture was registered. */
  generation: number;
  /** Stream registered for this capture. */
  stream: Readable;
};

/** A scheduled finalize timeout together with the generation it was scheduled for. */
export type VoiceCaptureFinalizeTimer = {
  /** Generation of the capture this timeout belongs to. */
  generation: number;
  /** Handle returned by `setTimeout`, kept so the timeout can be cleared. */
  timer: ReturnType<typeof setTimeout>;
};

/** Mutable capture bookkeeping; every collection is keyed by user id. */
export type VoiceCaptureState = {
  /** User ids that currently have a capture in progress. */
  activeSpeakers: Set<string>;
  /** Current capture entry per user id. */
  activeCaptureStreams: Map<string, VoiceCaptureEntry>;
  /** Pending finalize timeout per user id. */
  captureFinalizeTimers: Map<string, VoiceCaptureFinalizeTimer>;
  /** Last generation number handed out per user id. */
  captureGenerations: Map<string, number>;
};
|
||||
|
||||
/**
 * Builds a fresh capture state in which every collection is empty.
 *
 * @returns A new state object; each call allocates new collections.
 */
export function createVoiceCaptureState(): VoiceCaptureState {
  const activeSpeakers = new Set<string>();
  const activeCaptureStreams = new Map<string, VoiceCaptureEntry>();
  const captureFinalizeTimers = new Map<string, VoiceCaptureFinalizeTimer>();
  const captureGenerations = new Map<string, number>();
  return { activeSpeakers, activeCaptureStreams, captureFinalizeTimers, captureGenerations };
}
|
||||
|
||||
/**
 * Tears down all capture bookkeeping: clears every pending finalize timeout,
 * destroys every registered stream, and empties all four collections.
 *
 * @param state - Capture state to reset in place.
 */
export function stopVoiceCaptureState(state: VoiceCaptureState): void {
  const { captureFinalizeTimers, activeCaptureStreams, captureGenerations, activeSpeakers } = state;

  // Timers first, so no finalize callback can fire for a stream destroyed below.
  captureFinalizeTimers.forEach((pending) => {
    clearTimeout(pending.timer);
  });
  captureFinalizeTimers.clear();

  activeCaptureStreams.forEach((capture) => {
    capture.stream.destroy();
  });
  activeCaptureStreams.clear();

  captureGenerations.clear();
  activeSpeakers.clear();
}
|
||||
|
||||
/**
 * Looks up the capture entry currently registered for a user.
 *
 * @param state - Capture state to read.
 * @param userId - User id to look up.
 * @returns The registered entry, or `undefined` when none is registered.
 */
export function getActiveVoiceCapture(
  state: VoiceCaptureState,
  userId: string,
): VoiceCaptureEntry | undefined {
  const { activeCaptureStreams } = state;
  return activeCaptureStreams.get(userId);
}
|
||||
|
||||
/**
 * Reports whether a user is currently listed in `activeSpeakers`.
 *
 * @param state - Capture state to read.
 * @param userId - User id to check.
 */
export function isVoiceCaptureActive(state: VoiceCaptureState, userId: string): boolean {
  const { activeSpeakers } = state;
  return activeSpeakers.has(userId);
}
|
||||
|
||||
/**
 * Cancels the pending finalize timeout for a user, if there is one.
 *
 * @param state - Capture state holding the pending timeouts.
 * @param userId - User whose timeout should be cancelled.
 * @param generation - When given, the timeout is only cancelled if it was
 *   scheduled for this generation; when omitted, any pending timeout matches.
 * @returns `true` when a timeout was cleared and removed, `false` otherwise.
 */
export function clearVoiceCaptureFinalizeTimer(
  state: VoiceCaptureState,
  userId: string,
  generation?: number,
): boolean {
  const pending = state.captureFinalizeTimers.get(userId);
  if (!pending) {
    return false;
  }

  const belongsToOtherGeneration = generation !== undefined && pending.generation !== generation;
  if (belongsToOtherGeneration) {
    return false;
  }

  clearTimeout(pending.timer);
  state.captureFinalizeTimers.delete(userId);
  return true;
}
|
||||
|
||||
/**
 * Registers a new capture for a user and returns its generation number.
 *
 * The generation is the user's previous generation plus one (starting at 1).
 * The user is added to `activeSpeakers` and the stream becomes the user's
 * entry in `activeCaptureStreams`, replacing any previous entry.
 *
 * Any finalize timeout still pending for the user is cancelled. Such a
 * timeout necessarily belongs to an older generation, so when it fired it
 * would find a generation mismatch and do nothing — but until then it would
 * linger in `captureFinalizeTimers` and make generation-scoped cancellation
 * for the new capture report `false`.
 *
 * @param state - Capture state to update in place.
 * @param userId - User the capture belongs to.
 * @param stream - Stream to register for the capture.
 * @returns The generation number assigned to the new capture.
 */
export function beginVoiceCapture(
  state: VoiceCaptureState,
  userId: string,
  stream: Readable,
): number {
  const generation = (state.captureGenerations.get(userId) ?? 0) + 1;
  state.captureGenerations.set(userId, generation);
  state.activeSpeakers.add(userId);
  state.activeCaptureStreams.set(userId, { generation, stream });

  // Filtering by the freshly minted generation could never match a pending
  // timer, so cancel whatever is pending regardless of its generation.
  const stalePending = state.captureFinalizeTimers.get(userId);
  if (stalePending) {
    clearTimeout(stalePending.timer);
    state.captureFinalizeTimers.delete(userId);
  }

  return generation;
}
|
||||
|
||||
/**
 * Marks a capture as finished.
 *
 * The pending finalize timeout for the given generation (if any) is cancelled
 * first. The user's entry is then removed from `activeCaptureStreams` and
 * `activeSpeakers`, but only when the registered entry still belongs to the
 * given generation. The stream itself is not destroyed here.
 *
 * @param state - Capture state to update in place.
 * @param userId - User whose capture finished.
 * @param generation - Generation returned when the capture began.
 * @returns `true` when the active entry matched and was removed, else `false`.
 */
export function finishVoiceCapture(
  state: VoiceCaptureState,
  userId: string,
  generation: number,
): boolean {
  clearVoiceCaptureFinalizeTimer(state, userId, generation);

  const current = state.activeCaptureStreams.get(userId);
  const ownsActiveCapture = current?.generation === generation;
  if (ownsActiveCapture) {
    state.activeCaptureStreams.delete(userId);
    state.activeSpeakers.delete(userId);
  }
  return ownsActiveCapture;
}
|
||||
|
||||
/**
 * Schedules the user's active capture to be finalized after `delayMs`.
 *
 * When the timeout fires and the user's active capture still has the
 * generation it had at scheduling time, the capture is removed from
 * `activeCaptureStreams` and `activeSpeakers`, `onFinalize` is invoked with
 * the entry, and the stream is destroyed — in that order, so state is already
 * cleared by the time the stream is torn down. If the active capture changed
 * or disappeared in the meantime, the callback only removes its own timer
 * entry and does nothing else.
 *
 * Any timeout already pending for the user is cancelled before the new one is
 * registered, so at most one finalize timeout exists per user.
 *
 * @param params.state - Capture state to read and update.
 * @param params.userId - User whose capture should be finalized.
 * @param params.delayMs - Delay passed to `setTimeout`.
 * @param params.onFinalize - Optional hook called just before the stream is
 *   destroyed. If it throws, the stream is still destroyed and the error
 *   propagates out of the timer callback.
 * @returns `false` when the user has no active capture (nothing scheduled),
 *   `true` when a timeout was registered.
 */
export function scheduleVoiceCaptureFinalize(params: {
  state: VoiceCaptureState;
  userId: string;
  delayMs: number;
  onFinalize?: (capture: VoiceCaptureEntry) => void;
}): boolean {
  const { state, userId, delayMs, onFinalize } = params;
  const capture = state.activeCaptureStreams.get(userId);
  if (!capture) {
    return false;
  }

  // The map entry is about to be overwritten; cancel whatever it holds so no
  // orphaned timeout keeps running without a handle to clear it.
  const pending = state.captureFinalizeTimers.get(userId);
  if (pending) {
    clearTimeout(pending.timer);
    state.captureFinalizeTimers.delete(userId);
  }

  const timer = setTimeout(() => {
    // This timeout has fired, so its registration is spent whether or not it
    // finalizes anything. Only remove the entry if it is still ours.
    if (state.captureFinalizeTimers.get(userId)?.timer === timer) {
      state.captureFinalizeTimers.delete(userId);
    }

    const activeCapture = state.activeCaptureStreams.get(userId);
    if (!activeCapture || activeCapture.generation !== capture.generation) {
      return;
    }

    state.activeCaptureStreams.delete(userId);
    state.activeSpeakers.delete(userId);
    try {
      onFinalize?.(activeCapture);
    } finally {
      // State no longer references the stream, so this is the last chance to
      // release it even when the hook throws.
      activeCapture.stream.destroy();
    }
  }, delayMs);

  state.captureFinalizeTimers.set(userId, { generation: capture.generation, timer });
  return true;
}
|
||||
@@ -1,5 +1,7 @@
|
||||
import { ChannelType } from "@buape/carbon";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createVoiceCaptureState } from "./capture-state.js";
|
||||
import { createVoiceReceiveRecoveryState } from "./receive-recovery.js";
|
||||
|
||||
const {
|
||||
createConnectionMock,
|
||||
@@ -263,13 +265,8 @@ describe("DiscordVoiceManager", () => {
|
||||
player: createAudioPlayerMock(),
|
||||
playbackQueue: Promise.resolve(),
|
||||
processingQueue: Promise.resolve(),
|
||||
activeSpeakers: new Set<string>(),
|
||||
activeCaptureStreams: new Map(),
|
||||
captureFinalizeTimers: new Map(),
|
||||
captureGenerations: new Map(),
|
||||
decryptFailureCount: 0,
|
||||
lastDecryptFailureAt: 0,
|
||||
decryptRecoveryInFlight: false,
|
||||
capture: createVoiceCaptureState(),
|
||||
receiveRecovery: createVoiceReceiveRecoveryState(),
|
||||
},
|
||||
wavPath: "/tmp/test.wav",
|
||||
userId,
|
||||
@@ -383,7 +380,9 @@ describe("DiscordVoiceManager", () => {
|
||||
|
||||
it("re-arms passthrough but still rejoin-recovers after repeated decrypt failures", async () => {
|
||||
const connection = createConnectionMock();
|
||||
joinVoiceChannelMock.mockReturnValueOnce(connection).mockReturnValueOnce(createConnectionMock());
|
||||
joinVoiceChannelMock
|
||||
.mockReturnValueOnce(connection)
|
||||
.mockReturnValueOnce(createConnectionMock());
|
||||
const manager = createManager();
|
||||
|
||||
await manager.join({ guildId: "g1", channelId: "1001" });
|
||||
@@ -412,18 +411,23 @@ describe("DiscordVoiceManager", () => {
|
||||
| {
|
||||
guildId: string;
|
||||
channelId: string;
|
||||
activeSpeakers: Set<string>;
|
||||
activeCaptureStreams: Map<string, { generation: number; stream: { destroy: () => void } }>;
|
||||
captureFinalizeTimers: Map<string, unknown>;
|
||||
captureGenerations: Map<string, number>;
|
||||
capture: {
|
||||
activeSpeakers: Set<string>;
|
||||
activeCaptureStreams: Map<
|
||||
string,
|
||||
{ generation: number; stream: { destroy: () => void } }
|
||||
>;
|
||||
captureFinalizeTimers: Map<string, unknown>;
|
||||
captureGenerations: Map<string, number>;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
expect(entry).toBeDefined();
|
||||
|
||||
const firstStream = { destroy: vi.fn() };
|
||||
entry?.activeSpeakers.add("u1");
|
||||
entry?.captureGenerations.set("u1", 1);
|
||||
entry?.activeCaptureStreams.set("u1", { generation: 1, stream: firstStream });
|
||||
entry?.capture.activeSpeakers.add("u1");
|
||||
entry?.capture.captureGenerations.set("u1", 1);
|
||||
entry?.capture.activeCaptureStreams.set("u1", { generation: 1, stream: firstStream });
|
||||
|
||||
(
|
||||
manager as unknown as {
|
||||
@@ -434,7 +438,7 @@ describe("DiscordVoiceManager", () => {
|
||||
await vi.advanceTimersByTimeAsync(1_200);
|
||||
|
||||
expect(firstStream.destroy).toHaveBeenCalledTimes(1);
|
||||
expect(entry?.activeSpeakers.has("u1")).toBe(false);
|
||||
expect(entry?.capture.activeSpeakers.has("u1")).toBe(false);
|
||||
|
||||
const secondStream = {
|
||||
on: vi.fn(),
|
||||
|
||||
@@ -22,7 +22,29 @@ import { normalizeDiscordSlug, resolveDiscordOwnerAccess } from "../monitor/allo
|
||||
import { formatDiscordUserTag } from "../monitor/format.js";
|
||||
import { getDiscordRuntime } from "../runtime.js";
|
||||
import { authorizeDiscordVoiceIngress } from "./access.js";
|
||||
import {
|
||||
beginVoiceCapture,
|
||||
clearVoiceCaptureFinalizeTimer,
|
||||
createVoiceCaptureState,
|
||||
finishVoiceCapture,
|
||||
getActiveVoiceCapture,
|
||||
isVoiceCaptureActive,
|
||||
scheduleVoiceCaptureFinalize,
|
||||
stopVoiceCaptureState,
|
||||
type VoiceCaptureState,
|
||||
} from "./capture-state.js";
|
||||
import { formatVoiceIngressPrompt } from "./prompt.js";
|
||||
import {
|
||||
analyzeVoiceReceiveError,
|
||||
createVoiceReceiveRecoveryState,
|
||||
DAVE_RECEIVE_PASSTHROUGH_INITIAL_EXPIRY_SECONDS,
|
||||
DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS,
|
||||
enableDaveReceivePassthrough as tryEnableDaveReceivePassthrough,
|
||||
finishVoiceDecryptRecovery,
|
||||
noteVoiceDecryptFailure,
|
||||
resetVoiceReceiveRecoveryState,
|
||||
type VoiceReceiveRecoveryState,
|
||||
} from "./receive-recovery.js";
|
||||
import { sanitizeVoiceReplyTextForSpeech } from "./sanitize.js";
|
||||
import { loadDiscordVoiceSdk } from "./sdk-runtime.js";
|
||||
|
||||
@@ -36,12 +58,6 @@ const CAPTURE_FINALIZE_GRACE_MS = 1_200;
|
||||
const VOICE_CONNECT_READY_TIMEOUT_MS = 15_000;
|
||||
const PLAYBACK_READY_TIMEOUT_MS = 60_000;
|
||||
const SPEAKING_READY_TIMEOUT_MS = 60_000;
|
||||
const DECRYPT_FAILURE_WINDOW_MS = 30_000;
|
||||
const DECRYPT_FAILURE_RECONNECT_THRESHOLD = 3;
|
||||
const DECRYPT_FAILURE_PATTERN = /DecryptionFailed\(/;
|
||||
const DAVE_PASSTHROUGH_DISABLED_PATTERN = /UnencryptedWhenPassthroughDisabled/;
|
||||
const DAVE_RECEIVE_PASSTHROUGH_INITIAL_EXPIRY_SECONDS = 30;
|
||||
const DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS = 15;
|
||||
const SPEAKER_CONTEXT_CACHE_TTL_MS = 60_000;
|
||||
|
||||
const logger = createSubsystemLogger("discord/voice");
|
||||
@@ -57,16 +73,6 @@ type VoiceOperationResult = {
|
||||
guildId?: string;
|
||||
};
|
||||
|
||||
type VoiceCaptureEntry = {
|
||||
generation: number;
|
||||
stream: Readable;
|
||||
};
|
||||
|
||||
type VoiceCaptureFinalizeTimer = {
|
||||
generation: number;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
|
||||
type VoiceSessionEntry = {
|
||||
guildId: string;
|
||||
guildName?: string;
|
||||
@@ -78,13 +84,8 @@ type VoiceSessionEntry = {
|
||||
player: import("@discordjs/voice").AudioPlayer;
|
||||
playbackQueue: Promise<void>;
|
||||
processingQueue: Promise<void>;
|
||||
activeSpeakers: Set<string>;
|
||||
activeCaptureStreams: Map<string, VoiceCaptureEntry>;
|
||||
captureFinalizeTimers: Map<string, VoiceCaptureFinalizeTimer>;
|
||||
captureGenerations: Map<string, number>;
|
||||
decryptFailureCount: number;
|
||||
lastDecryptFailureAt: number;
|
||||
decryptRecoveryInFlight: boolean;
|
||||
capture: VoiceCaptureState;
|
||||
receiveRecovery: VoiceReceiveRecoveryState;
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
@@ -172,25 +173,6 @@ type OpusDecoderFactory = {
|
||||
let warnedOpusMissing = false;
|
||||
let cachedOpusDecoderFactory: OpusDecoderFactory | null | "unresolved" = "unresolved";
|
||||
|
||||
function isAbortLikeError(err: unknown): boolean {
|
||||
if (!err || typeof err !== "object") {
|
||||
return false;
|
||||
}
|
||||
const name =
|
||||
"name" in err && typeof (err as { name?: unknown }).name === "string"
|
||||
? (err as { name: string }).name
|
||||
: "";
|
||||
const message =
|
||||
"message" in err && typeof (err as { message?: unknown }).message === "string"
|
||||
? (err as { message: string }).message
|
||||
: "";
|
||||
return (
|
||||
name === "AbortError" ||
|
||||
message.includes("The operation was aborted") ||
|
||||
message.includes("aborted")
|
||||
);
|
||||
}
|
||||
|
||||
function resolveOpusDecoderFactory(): OpusDecoderFactory | null {
|
||||
const factories: OpusDecoderFactory[] = [
|
||||
{
|
||||
@@ -534,13 +516,8 @@ export class DiscordVoiceManager {
|
||||
player,
|
||||
playbackQueue: Promise.resolve(),
|
||||
processingQueue: Promise.resolve(),
|
||||
activeSpeakers: new Set(),
|
||||
activeCaptureStreams: new Map(),
|
||||
captureFinalizeTimers: new Map(),
|
||||
captureGenerations: new Map(),
|
||||
decryptFailureCount: 0,
|
||||
lastDecryptFailureAt: 0,
|
||||
decryptRecoveryInFlight: false,
|
||||
capture: createVoiceCaptureState(),
|
||||
receiveRecovery: createVoiceReceiveRecoveryState(),
|
||||
stop: () => {
|
||||
if (speakingHandler) {
|
||||
connection.receiver.speaking.off("start", speakingHandler);
|
||||
@@ -548,16 +525,7 @@ export class DiscordVoiceManager {
|
||||
if (speakingEndHandler) {
|
||||
connection.receiver.speaking.off("end", speakingEndHandler);
|
||||
}
|
||||
for (const { timer } of entry.captureFinalizeTimers.values()) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
entry.captureFinalizeTimers.clear();
|
||||
for (const { stream } of entry.activeCaptureStreams.values()) {
|
||||
stream.destroy();
|
||||
}
|
||||
entry.activeCaptureStreams.clear();
|
||||
entry.captureGenerations.clear();
|
||||
entry.activeSpeakers.clear();
|
||||
stopVoiceCaptureState(entry.capture);
|
||||
if (disconnectedHandler) {
|
||||
connection.off(voiceSdk.VoiceConnectionStatus.Disconnected, disconnectedHandler);
|
||||
}
|
||||
@@ -660,35 +628,20 @@ export class DiscordVoiceManager {
|
||||
}
|
||||
|
||||
private clearCaptureFinalizeTimer(entry: VoiceSessionEntry, userId: string, generation?: number) {
|
||||
const scheduled = entry.captureFinalizeTimers.get(userId);
|
||||
if (!scheduled || (generation !== undefined && scheduled.generation !== generation)) {
|
||||
return false;
|
||||
}
|
||||
clearTimeout(scheduled.timer);
|
||||
entry.captureFinalizeTimers.delete(userId);
|
||||
return true;
|
||||
return clearVoiceCaptureFinalizeTimer(entry.capture, userId, generation);
|
||||
}
|
||||
|
||||
private scheduleCaptureFinalize(entry: VoiceSessionEntry, userId: string, reason: string) {
|
||||
const capture = entry.activeCaptureStreams.get(userId);
|
||||
if (!capture) {
|
||||
return;
|
||||
}
|
||||
this.clearCaptureFinalizeTimer(entry, userId, capture.generation);
|
||||
const timer = setTimeout(() => {
|
||||
const activeCapture = entry.activeCaptureStreams.get(userId);
|
||||
if (!activeCapture || activeCapture.generation !== capture.generation) {
|
||||
return;
|
||||
}
|
||||
entry.captureFinalizeTimers.delete(userId);
|
||||
entry.activeCaptureStreams.delete(userId);
|
||||
entry.activeSpeakers.delete(userId);
|
||||
logVoiceVerbose(
|
||||
`capture finalize: guild ${entry.guildId} channel ${entry.channelId} user ${userId} reason=${reason} grace=${CAPTURE_FINALIZE_GRACE_MS}ms`,
|
||||
);
|
||||
activeCapture.stream.destroy();
|
||||
}, CAPTURE_FINALIZE_GRACE_MS);
|
||||
entry.captureFinalizeTimers.set(userId, { generation: capture.generation, timer });
|
||||
scheduleVoiceCaptureFinalize({
|
||||
state: entry.capture,
|
||||
userId,
|
||||
delayMs: CAPTURE_FINALIZE_GRACE_MS,
|
||||
onFinalize: () => {
|
||||
logVoiceVerbose(
|
||||
`capture finalize: guild ${entry.guildId} channel ${entry.channelId} user ${userId} reason=${reason} grace=${CAPTURE_FINALIZE_GRACE_MS}ms`,
|
||||
);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private async handleSpeakingStart(entry: VoiceSessionEntry, userId: string) {
|
||||
@@ -698,8 +651,8 @@ export class DiscordVoiceManager {
|
||||
if (this.botUserId && userId === this.botUserId) {
|
||||
return;
|
||||
}
|
||||
if (entry.activeSpeakers.has(userId)) {
|
||||
const activeCapture = entry.activeCaptureStreams.get(userId);
|
||||
if (isVoiceCaptureActive(entry.capture, userId)) {
|
||||
const activeCapture = getActiveVoiceCapture(entry.capture, userId);
|
||||
const extended = activeCapture
|
||||
? this.clearCaptureFinalizeTimer(entry, userId, activeCapture.generation)
|
||||
: false;
|
||||
@@ -709,7 +662,6 @@ export class DiscordVoiceManager {
|
||||
return;
|
||||
}
|
||||
|
||||
entry.activeSpeakers.add(userId);
|
||||
logVoiceVerbose(
|
||||
`capture start: guild ${entry.guildId} channel ${entry.channelId} user ${userId}`,
|
||||
);
|
||||
@@ -723,18 +675,15 @@ export class DiscordVoiceManager {
|
||||
entry.player.stop(true);
|
||||
}
|
||||
|
||||
const generation = (entry.captureGenerations.get(userId) ?? 0) + 1;
|
||||
entry.captureGenerations.set(userId, generation);
|
||||
const stream = entry.connection.receiver.subscribe(userId, {
|
||||
end: {
|
||||
behavior: voiceSdk.EndBehaviorType.Manual,
|
||||
},
|
||||
});
|
||||
entry.activeCaptureStreams.set(userId, { generation, stream });
|
||||
this.clearCaptureFinalizeTimer(entry, userId, generation);
|
||||
const generation = beginVoiceCapture(entry.capture, userId, stream);
|
||||
let streamAborted = false;
|
||||
stream.on("error", (err) => {
|
||||
streamAborted = isAbortLikeError(err);
|
||||
streamAborted = analyzeVoiceReceiveError(err).isAbortLike;
|
||||
this.handleReceiveError(entry, err);
|
||||
});
|
||||
|
||||
@@ -762,12 +711,7 @@ export class DiscordVoiceManager {
|
||||
await this.processSegment({ entry, wavPath, userId, durationSeconds });
|
||||
});
|
||||
} finally {
|
||||
this.clearCaptureFinalizeTimer(entry, userId, generation);
|
||||
const activeCapture = entry.activeCaptureStreams.get(userId);
|
||||
if (activeCapture?.generation === generation) {
|
||||
entry.activeCaptureStreams.delete(userId);
|
||||
entry.activeSpeakers.delete(userId);
|
||||
}
|
||||
finishVoiceCapture(entry.capture, userId, generation);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -907,44 +851,33 @@ export class DiscordVoiceManager {
|
||||
}
|
||||
|
||||
private handleReceiveError(entry: VoiceSessionEntry, err: unknown) {
|
||||
const message = formatErrorMessage(err);
|
||||
logger.warn(`discord voice: receive error: ${message}`);
|
||||
const sawPassthroughDisabled = DAVE_PASSTHROUGH_DISABLED_PATTERN.test(message);
|
||||
if (sawPassthroughDisabled) {
|
||||
const analysis = analyzeVoiceReceiveError(err);
|
||||
logger.warn(`discord voice: receive error: ${analysis.message}`);
|
||||
if (analysis.shouldAttemptPassthrough) {
|
||||
this.enableDaveReceivePassthrough(
|
||||
entry,
|
||||
"receive decrypt error",
|
||||
DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS,
|
||||
);
|
||||
}
|
||||
if (!DECRYPT_FAILURE_PATTERN.test(message) && !sawPassthroughDisabled) {
|
||||
if (!analysis.countsAsDecryptFailure) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
if (now - entry.lastDecryptFailureAt > DECRYPT_FAILURE_WINDOW_MS) {
|
||||
entry.decryptFailureCount = 0;
|
||||
}
|
||||
entry.lastDecryptFailureAt = now;
|
||||
entry.decryptFailureCount += 1;
|
||||
if (entry.decryptFailureCount === 1) {
|
||||
const decryptFailure = noteVoiceDecryptFailure(entry.receiveRecovery);
|
||||
if (decryptFailure.firstFailure) {
|
||||
logger.warn(
|
||||
"discord voice: DAVE decrypt failures detected; voice receive may be unstable (upstream: discordjs/discord.js#11419)",
|
||||
);
|
||||
}
|
||||
if (
|
||||
entry.decryptFailureCount < DECRYPT_FAILURE_RECONNECT_THRESHOLD ||
|
||||
entry.decryptRecoveryInFlight
|
||||
) {
|
||||
if (!decryptFailure.shouldRecover) {
|
||||
return;
|
||||
}
|
||||
entry.decryptRecoveryInFlight = true;
|
||||
this.resetDecryptFailureState(entry);
|
||||
void this.recoverFromDecryptFailures(entry)
|
||||
.catch((recoverErr) =>
|
||||
logger.warn(`discord voice: decrypt recovery failed: ${formatErrorMessage(recoverErr)}`),
|
||||
)
|
||||
.finally(() => {
|
||||
entry.decryptRecoveryInFlight = false;
|
||||
finishVoiceDecryptRecovery(entry.receiveRecovery);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -954,51 +887,44 @@ export class DiscordVoiceManager {
|
||||
expirySeconds: number,
|
||||
): boolean {
|
||||
const voiceSdk = loadDiscordVoiceSdk();
|
||||
const state = entry.connection.state as {
|
||||
status: unknown;
|
||||
networking?: {
|
||||
state?: {
|
||||
code?: unknown;
|
||||
dave?: {
|
||||
session?: {
|
||||
setPassthroughMode: (passthrough: boolean, expiry: number) => void;
|
||||
return tryEnableDaveReceivePassthrough({
|
||||
target: {
|
||||
guildId: entry.guildId,
|
||||
channelId: entry.channelId,
|
||||
connection: entry.connection as {
|
||||
state: {
|
||||
status: unknown;
|
||||
networking?: {
|
||||
state?: {
|
||||
code?: unknown;
|
||||
dave?: {
|
||||
session?: {
|
||||
setPassthroughMode: (passthrough: boolean, expirySeconds: number) => void;
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
if (state.status !== voiceSdk.VoiceConnectionStatus.Ready) {
|
||||
return false;
|
||||
}
|
||||
const networkingState = state.networking?.state;
|
||||
if (
|
||||
!networkingState ||
|
||||
(networkingState.code !== voiceSdk.NetworkingStatusCode.Ready &&
|
||||
networkingState.code !== voiceSdk.NetworkingStatusCode.Resuming)
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
const daveSession = networkingState.dave?.session;
|
||||
if (!daveSession) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
daveSession.setPassthroughMode(true, expirySeconds);
|
||||
logVoiceVerbose(
|
||||
`enabled DAVE receive passthrough: guild ${entry.guildId} channel ${entry.channelId} expiry=${expirySeconds}s reason=${reason}`,
|
||||
);
|
||||
return true;
|
||||
} catch (passthroughErr) {
|
||||
logger.warn(
|
||||
`discord voice: failed to enable DAVE passthrough guild=${entry.guildId} channel=${entry.channelId} reason=${reason}: ${formatErrorMessage(passthroughErr)}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
},
|
||||
},
|
||||
sdk: {
|
||||
VoiceConnectionStatus: {
|
||||
Ready: voiceSdk.VoiceConnectionStatus.Ready,
|
||||
},
|
||||
NetworkingStatusCode: {
|
||||
Ready: voiceSdk.NetworkingStatusCode.Ready,
|
||||
Resuming: voiceSdk.NetworkingStatusCode.Resuming,
|
||||
},
|
||||
},
|
||||
reason,
|
||||
expirySeconds,
|
||||
onVerbose: logVoiceVerbose,
|
||||
onWarn: (message) => logger.warn(message),
|
||||
});
|
||||
}
|
||||
|
||||
private resetDecryptFailureState(entry: VoiceSessionEntry) {
|
||||
entry.decryptFailureCount = 0;
|
||||
entry.lastDecryptFailureAt = 0;
|
||||
resetVoiceReceiveRecoveryState(entry.receiveRecovery);
|
||||
}
|
||||
|
||||
private async recoverFromDecryptFailures(entry: VoiceSessionEntry) {
|
||||
|
||||
79
extensions/discord/src/voice/receive-recovery.test.ts
Normal file
79
extensions/discord/src/voice/receive-recovery.test.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
analyzeVoiceReceiveError,
|
||||
createVoiceReceiveRecoveryState,
|
||||
enableDaveReceivePassthrough,
|
||||
noteVoiceDecryptFailure,
|
||||
} from "./receive-recovery.js";
|
||||
|
||||
describe("voice receive recovery", () => {
  it("treats passthrough-disabled decrypt errors as decrypt failures", () => {
    const err = new Error("Failed to decrypt: DecryptionFailed(UnencryptedWhenPassthroughDisabled)");

    expect(analyzeVoiceReceiveError(err)).toMatchObject({
      shouldAttemptPassthrough: true,
      countsAsDecryptFailure: true,
    });
  });

  it("gates recovery after repeated decrypt failures in the same window", () => {
    const state = createVoiceReceiveRecoveryState();

    // Three failures one second apart; only the third asks for recovery.
    const first = noteVoiceDecryptFailure(state, 1_000);
    expect(first).toEqual({ firstFailure: true, shouldRecover: false });

    const second = noteVoiceDecryptFailure(state, 2_000);
    expect(second).toEqual({ firstFailure: false, shouldRecover: false });

    const third = noteVoiceDecryptFailure(state, 3_000);
    expect(third).toEqual({ firstFailure: false, shouldRecover: true });
  });

  it("enables passthrough only for ready DAVE sessions", () => {
    const setPassthroughMode = vi.fn();
    const onVerbose = vi.fn();
    const onWarn = vi.fn();

    // Status and networking code below match the `Ready` markers in `sdk`.
    const connection = {
      state: {
        status: "ready",
        networking: {
          state: {
            code: "networking-ready",
            dave: {
              session: {
                setPassthroughMode,
              },
            },
          },
        },
      },
    };
    const sdk = {
      VoiceConnectionStatus: { Ready: "ready" },
      NetworkingStatusCode: { Ready: "networking-ready", Resuming: "networking-resuming" },
    };

    const enabled = enableDaveReceivePassthrough({
      target: { guildId: "g1", channelId: "c1", connection },
      sdk,
      reason: "test",
      expirySeconds: 15,
      onVerbose,
      onWarn,
    });

    expect(enabled).toBe(true);
    expect(setPassthroughMode).toHaveBeenCalledWith(true, 15);
    expect(onVerbose).toHaveBeenCalled();
    expect(onWarn).not.toHaveBeenCalled();
  });
});
|
||||
159
extensions/discord/src/voice/receive-recovery.ts
Normal file
159
extensions/discord/src/voice/receive-recovery.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime";
|
||||
|
||||
// A gap longer than this since the previous decrypt failure restarts the failure count.
const DECRYPT_FAILURE_WINDOW_MS = 30_000;
// Failure count that must be reached before recovery is requested.
const DECRYPT_FAILURE_RECONNECT_THRESHOLD = 3;
// Matched against the formatted error message to recognize a decrypt failure.
const DECRYPT_FAILURE_PATTERN = /DecryptionFailed\(/;
// Matched against the formatted error message to decide whether passthrough should be attempted.
const DAVE_PASSTHROUGH_DISABLED_PATTERN = /UnencryptedWhenPassthroughDisabled/;

/** Passthrough expiry, in seconds, exported for the initial enable. */
export const DAVE_RECEIVE_PASSTHROUGH_INITIAL_EXPIRY_SECONDS = 30;
/** Passthrough expiry, in seconds, exported for re-arming passthrough. */
export const DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS = 15;
|
||||
|
||||
/** Mutable counters used to decide when decrypt-failure recovery should start. */
export type VoiceReceiveRecoveryState = {
  /** Number of decrypt failures counted in the current window. */
  decryptFailureCount: number;
  /** Timestamp recorded for the most recent decrypt failure; 0 after a reset. */
  lastDecryptFailureAt: number;
  /** Set while a recovery is running; no further recovery is requested meanwhile. */
  decryptRecoveryInFlight: boolean;
};

/** Classification of a receive error, as produced by `analyzeVoiceReceiveError`. */
export type VoiceReceiveErrorAnalysis = {
  /** Formatted error message. */
  message: string;
  /** Whether the error looks like an abort. */
  isAbortLike: boolean;
  /** Whether the message matched the passthrough-disabled pattern. */
  shouldAttemptPassthrough: boolean;
  /** Whether the error should be counted as a decrypt failure. */
  countsAsDecryptFailure: boolean;
};
|
||||
|
||||
/** The one session method this module calls. */
type DavePassthroughSession = {
  setPassthroughMode: (passthrough: boolean, expirySeconds: number) => void;
};

/** Networking state fields read when deciding whether passthrough can be enabled. */
type DavePassthroughNetworkingState = {
  code?: unknown;
  dave?: {
    session?: DavePassthroughSession;
  };
};

/**
 * Structural view of a voice session: the ids used in log messages plus the
 * parts of the connection state that are inspected before enabling passthrough.
 */
type DavePassthroughTarget = {
  guildId: string;
  channelId: string;
  connection: {
    state: {
      status: unknown;
      networking?: {
        state?: DavePassthroughNetworkingState;
      };
    };
  };
};
|
||||
|
||||
/**
 * The SDK status markers that a connection's status and networking code are
 * compared against. Typed as `unknown` because they are only ever compared
 * with `!==`.
 */
type DavePassthroughSdk = {
  VoiceConnectionStatus: {
    Ready: unknown;
  };
  NetworkingStatusCode: {
    Ready: unknown;
    Resuming: unknown;
  };
};
|
||||
|
||||
/**
 * Builds a fresh recovery state: no failures counted, no failure timestamp,
 * and no recovery in flight.
 */
export function createVoiceReceiveRecoveryState(): VoiceReceiveRecoveryState {
  const initial: VoiceReceiveRecoveryState = {
    decryptFailureCount: 0,
    lastDecryptFailureAt: 0,
    decryptRecoveryInFlight: false,
  };
  return initial;
}
|
||||
|
||||
/**
 * Heuristically detects abort errors.
 *
 * @param err - Any thrown value.
 * @returns `true` when `err` is a non-null object whose `name` is exactly
 *   `"AbortError"` or whose string `message` contains `"aborted"`; `false`
 *   for everything else, including primitives.
 */
export function isAbortLikeReceiveError(err: unknown): boolean {
  if (typeof err !== "object" || !err) {
    return false;
  }

  const candidate = err as { name?: unknown; message?: unknown };
  if (candidate.name === "AbortError") {
    return true;
  }

  // "The operation was aborted" is covered by the broader "aborted" check.
  const { message } = candidate;
  return typeof message === "string" && message.includes("aborted");
}
|
||||
|
||||
/**
 * Classifies a receive error.
 *
 * The error is formatted to a message once, and that message is matched
 * against the decrypt-failure and passthrough-disabled patterns. A
 * passthrough-disabled match always counts as a decrypt failure as well.
 *
 * @param err - Any thrown value.
 * @returns The formatted message plus the derived flags.
 */
export function analyzeVoiceReceiveError(err: unknown): VoiceReceiveErrorAnalysis {
  const message = formatErrorMessage(err);
  const passthroughDisabled = DAVE_PASSTHROUGH_DISABLED_PATTERN.test(message);
  const isAbortLike = isAbortLikeReceiveError(err);
  const decryptionFailed = DECRYPT_FAILURE_PATTERN.test(message);

  return {
    message,
    isAbortLike,
    shouldAttemptPassthrough: passthroughDisabled,
    countsAsDecryptFailure: decryptionFailed || passthroughDisabled,
  };
}
|
||||
|
||||
/**
 * Records one decrypt failure and reports whether recovery should start now.
 *
 * The failure count restarts when more than the failure window has passed
 * since the previous failure. Recovery is requested once the count reaches
 * the reconnect threshold and no recovery is already in flight; in that case
 * the in-flight flag is set and the count and timestamp are reset.
 *
 * @param state - Recovery state to update in place.
 * @param now - Timestamp of the failure; defaults to `Date.now()`.
 * @returns `firstFailure` is `true` when this is failure number one of the
 *   current window; `shouldRecover` is `true` when the caller should start a
 *   recovery.
 */
export function noteVoiceDecryptFailure(
  state: VoiceReceiveRecoveryState,
  now: number = Date.now(),
): {
  firstFailure: boolean;
  shouldRecover: boolean;
} {
  const windowExpired = now - state.lastDecryptFailureAt > DECRYPT_FAILURE_WINDOW_MS;
  if (windowExpired) {
    state.decryptFailureCount = 0;
  }
  state.lastDecryptFailureAt = now;
  state.decryptFailureCount += 1;

  const firstFailure = state.decryptFailureCount === 1;
  const belowThreshold = state.decryptFailureCount < DECRYPT_FAILURE_RECONNECT_THRESHOLD;
  const shouldRecover = !belowThreshold && !state.decryptRecoveryInFlight;

  if (shouldRecover) {
    state.decryptRecoveryInFlight = true;
    // Start counting afresh; the in-flight flag is left set by the reset.
    resetVoiceReceiveRecoveryState(state);
  }
  return { firstFailure, shouldRecover };
}
|
||||
|
||||
/**
 * Zeroes the failure count and the last-failure timestamp.
 * The in-flight flag is deliberately left as it is.
 *
 * @param state - Recovery state to update in place.
 */
export function resetVoiceReceiveRecoveryState(state: VoiceReceiveRecoveryState): void {
  Object.assign(state, { decryptFailureCount: 0, lastDecryptFailureAt: 0 });
}
|
||||
|
||||
/**
 * Marks the running recovery as finished by clearing the in-flight flag.
 * The failure count and timestamp are not touched.
 *
 * @param state - Recovery state to update in place.
 */
export function finishVoiceDecryptRecovery(state: VoiceReceiveRecoveryState): void {
  const recovery = state;
  recovery.decryptRecoveryInFlight = false;
}
|
||||
|
||||
/**
 * Tries to switch the target's DAVE session into passthrough mode.
 *
 * Nothing is attempted (and nothing is logged) unless the connection status
 * equals the SDK's `Ready` marker, a networking state exists whose code is
 * the SDK's `Ready` or `Resuming` marker, and that state carries a DAVE
 * session.
 *
 * @param params.target - Ids for log messages plus the connection to inspect.
 * @param params.sdk - Status markers to compare the connection against.
 * @param params.reason - Free-form reason included in log messages.
 * @param params.expirySeconds - Expiry forwarded to `setPassthroughMode`.
 * @param params.onVerbose - Receives the success message.
 * @param params.onWarn - Receives the failure message when enabling throws.
 * @returns `true` when `setPassthroughMode` was called and the success
 *   message was emitted without throwing; `false` in every other case.
 */
export function enableDaveReceivePassthrough(params: {
  target: DavePassthroughTarget;
  sdk: DavePassthroughSdk;
  reason: string;
  expirySeconds: number;
  onVerbose: (message: string) => void;
  onWarn: (message: string) => void;
}): boolean {
  const { target, sdk, reason, expirySeconds, onVerbose, onWarn } = params;
  const connectionState = target.connection.state;
  const networkingState = connectionState.networking?.state;

  if (connectionState.status !== sdk.VoiceConnectionStatus.Ready) {
    return false;
  }
  if (!networkingState) {
    return false;
  }

  const { Ready, Resuming } = sdk.NetworkingStatusCode;
  const networkingUsable = networkingState.code === Ready || networkingState.code === Resuming;
  if (!networkingUsable) {
    return false;
  }

  const session = networkingState.dave?.session;
  if (!session) {
    return false;
  }

  try {
    session.setPassthroughMode(true, expirySeconds);
    onVerbose(
      `enabled DAVE receive passthrough: guild ${target.guildId} channel ${target.channelId} expiry=${expirySeconds}s reason=${reason}`,
    );
    return true;
  } catch (err) {
    onWarn(
      `discord voice: failed to enable DAVE passthrough guild=${target.guildId} channel=${target.channelId} reason=${reason}: ${formatErrorMessage(err)}`,
    );
    return false;
  }
}
|
||||
Reference in New Issue
Block a user