mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
fix(discord): restore voice receive path and reply playback
This commit is contained in:
committed by
Peter Steinberger
parent
80789809a4
commit
37e89b930f
@@ -6,10 +6,14 @@
|
||||
"dependencies": {
|
||||
"@buape/carbon": "0.0.0-beta-20260406003433",
|
||||
"@discordjs/voice": "^0.19.2",
|
||||
"@snazzah/davey": "^0.1.11",
|
||||
"discord-api-types": "^0.38.44",
|
||||
"https-proxy-agent": "^9.0.0",
|
||||
"opusscript": "^0.1.1"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"@discordjs/opus": "^0.10.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"openclaw": "workspace:*"
|
||||
},
|
||||
|
||||
@@ -24,11 +24,26 @@ const {
|
||||
};
|
||||
subscribe: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
state: {
|
||||
status: string;
|
||||
networking: {
|
||||
state: {
|
||||
code: string;
|
||||
dave: {
|
||||
session: {
|
||||
setPassthroughMode: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
daveSetPassthroughMode: ReturnType<typeof vi.fn>;
|
||||
handlers: Map<string, EventHandler>;
|
||||
};
|
||||
|
||||
const createConnectionMock = (): MockConnection => {
|
||||
const handlers = new Map<string, EventHandler>();
|
||||
const daveSetPassthroughMode = vi.fn();
|
||||
const connection: MockConnection = {
|
||||
destroy: vi.fn(),
|
||||
subscribe: vi.fn(),
|
||||
@@ -43,9 +58,24 @@ const {
|
||||
},
|
||||
subscribe: vi.fn(() => ({
|
||||
on: vi.fn(),
|
||||
destroy: vi.fn(),
|
||||
[Symbol.asyncIterator]: async function* () {},
|
||||
})),
|
||||
},
|
||||
state: {
|
||||
status: "ready",
|
||||
networking: {
|
||||
state: {
|
||||
code: "networking-ready",
|
||||
dave: {
|
||||
session: {
|
||||
setPassthroughMode: daveSetPassthroughMode,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
daveSetPassthroughMode,
|
||||
handlers,
|
||||
};
|
||||
return connection;
|
||||
@@ -74,7 +104,8 @@ const {
|
||||
vi.mock("./sdk-runtime.js", () => ({
|
||||
loadDiscordVoiceSdk: () => ({
|
||||
AudioPlayerStatus: { Playing: "playing", Idle: "idle" },
|
||||
EndBehaviorType: { AfterSilence: "AfterSilence" },
|
||||
EndBehaviorType: { AfterSilence: "AfterSilence", Manual: "Manual" },
|
||||
NetworkingStatusCode: { Ready: "networking-ready", Resuming: "networking-resuming" },
|
||||
VoiceConnectionStatus: {
|
||||
Ready: "ready",
|
||||
Disconnected: "disconnected",
|
||||
@@ -228,6 +259,17 @@ describe("DiscordVoiceManager", () => {
|
||||
guildId: "g1",
|
||||
channelId: "1001",
|
||||
route: { sessionKey: "discord:g1:1001", agentId: "agent-1" },
|
||||
connection: createConnectionMock(),
|
||||
player: createAudioPlayerMock(),
|
||||
playbackQueue: Promise.resolve(),
|
||||
processingQueue: Promise.resolve(),
|
||||
activeSpeakers: new Set<string>(),
|
||||
activeCaptureStreams: new Map(),
|
||||
captureFinalizeTimers: new Map(),
|
||||
captureGenerations: new Map(),
|
||||
decryptFailureCount: 0,
|
||||
lastDecryptFailureAt: 0,
|
||||
decryptRecoveryInFlight: false,
|
||||
},
|
||||
wavPath: "/tmp/test.wav",
|
||||
userId,
|
||||
@@ -284,6 +326,7 @@ describe("DiscordVoiceManager", () => {
|
||||
|
||||
const player = createAudioPlayerMock.mock.results[0]?.value;
|
||||
expect(connection.receiver.speaking.off).toHaveBeenCalledWith("start", expect.any(Function));
|
||||
expect(connection.receiver.speaking.off).toHaveBeenCalledWith("end", expect.any(Function));
|
||||
expect(connection.off).toHaveBeenCalledWith("disconnected", expect.any(Function));
|
||||
expect(connection.off).toHaveBeenCalledWith("destroyed", expect.any(Function));
|
||||
expect(player.off).toHaveBeenCalledWith("error", expect.any(Function));
|
||||
@@ -328,20 +371,93 @@ describe("DiscordVoiceManager", () => {
|
||||
expect(entry?.guildName).toBe("Guild One");
|
||||
});
|
||||
|
||||
it("attempts rejoin after repeated decrypt failures", async () => {
|
||||
it("enables DAVE receive passthrough after join", async () => {
|
||||
const connection = createConnectionMock();
|
||||
joinVoiceChannelMock.mockReturnValueOnce(connection);
|
||||
const manager = createManager();
|
||||
|
||||
await manager.join({ guildId: "g1", channelId: "1001" });
|
||||
|
||||
expect(connection.daveSetPassthroughMode).toHaveBeenCalledWith(true, 30);
|
||||
});
|
||||
|
||||
it("re-arms passthrough but still rejoin-recovers after repeated decrypt failures", async () => {
|
||||
const connection = createConnectionMock();
|
||||
joinVoiceChannelMock.mockReturnValueOnce(connection).mockReturnValueOnce(createConnectionMock());
|
||||
const manager = createManager();
|
||||
|
||||
await manager.join({ guildId: "g1", channelId: "1001" });
|
||||
connection.daveSetPassthroughMode.mockClear();
|
||||
|
||||
emitDecryptFailure(manager);
|
||||
emitDecryptFailure(manager);
|
||||
emitDecryptFailure(manager);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
expect(connection.daveSetPassthroughMode).toHaveBeenCalledWith(true, 15);
|
||||
expect(joinVoiceChannelMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("allows the same speaker to restart after finalize fires", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const connection = createConnectionMock();
|
||||
joinVoiceChannelMock.mockReturnValueOnce(connection);
|
||||
const manager = createManager();
|
||||
|
||||
await manager.join({ guildId: "g1", channelId: "1001" });
|
||||
|
||||
const entry = (manager as unknown as { sessions: Map<string, unknown> }).sessions.get("g1") as
|
||||
| {
|
||||
guildId: string;
|
||||
channelId: string;
|
||||
activeSpeakers: Set<string>;
|
||||
activeCaptureStreams: Map<string, { generation: number; stream: { destroy: () => void } }>;
|
||||
captureFinalizeTimers: Map<string, unknown>;
|
||||
captureGenerations: Map<string, number>;
|
||||
}
|
||||
| undefined;
|
||||
expect(entry).toBeDefined();
|
||||
|
||||
const firstStream = { destroy: vi.fn() };
|
||||
entry?.activeSpeakers.add("u1");
|
||||
entry?.captureGenerations.set("u1", 1);
|
||||
entry?.activeCaptureStreams.set("u1", { generation: 1, stream: firstStream });
|
||||
|
||||
(
|
||||
manager as unknown as {
|
||||
scheduleCaptureFinalize: (entry: unknown, userId: string, reason: string) => void;
|
||||
}
|
||||
).scheduleCaptureFinalize(entry, "u1", "test");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1_200);
|
||||
|
||||
expect(firstStream.destroy).toHaveBeenCalledTimes(1);
|
||||
expect(entry?.activeSpeakers.has("u1")).toBe(false);
|
||||
|
||||
const secondStream = {
|
||||
on: vi.fn(),
|
||||
destroy: vi.fn(),
|
||||
async *[Symbol.asyncIterator]() {},
|
||||
};
|
||||
connection.receiver.subscribe.mockReturnValueOnce(secondStream);
|
||||
|
||||
await (
|
||||
manager as unknown as {
|
||||
handleSpeakingStart: (entry: unknown, userId: string) => Promise<void>;
|
||||
}
|
||||
).handleSpeakingStart(entry, "u1");
|
||||
|
||||
expect(connection.receiver.subscribe).toHaveBeenCalledWith(
|
||||
"u1",
|
||||
expect.objectContaining({ end: { behavior: "Manual" } }),
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("passes senderIsOwner=true for allowlisted voice speakers", async () => {
|
||||
const client = createClient();
|
||||
client.fetchMember.mockResolvedValue({
|
||||
|
||||
@@ -22,6 +22,8 @@ import { normalizeDiscordSlug, resolveDiscordOwnerAccess } from "../monitor/allo
|
||||
import { formatDiscordUserTag } from "../monitor/format.js";
|
||||
import { getDiscordRuntime } from "../runtime.js";
|
||||
import { authorizeDiscordVoiceIngress } from "./access.js";
|
||||
import { formatVoiceIngressPrompt } from "./prompt.js";
|
||||
import { sanitizeVoiceReplyTextForSpeech } from "./sanitize.js";
|
||||
import { loadDiscordVoiceSdk } from "./sdk-runtime.js";
|
||||
|
||||
const require = createRequire(import.meta.url);
|
||||
@@ -30,13 +32,16 @@ const SAMPLE_RATE = 48_000;
|
||||
const CHANNELS = 2;
|
||||
const BIT_DEPTH = 16;
|
||||
const MIN_SEGMENT_SECONDS = 0.35;
|
||||
const SILENCE_DURATION_MS = 1_000;
|
||||
const CAPTURE_FINALIZE_GRACE_MS = 1_200;
|
||||
const VOICE_CONNECT_READY_TIMEOUT_MS = 15_000;
|
||||
const PLAYBACK_READY_TIMEOUT_MS = 60_000;
|
||||
const SPEAKING_READY_TIMEOUT_MS = 60_000;
|
||||
const DECRYPT_FAILURE_WINDOW_MS = 30_000;
|
||||
const DECRYPT_FAILURE_RECONNECT_THRESHOLD = 3;
|
||||
const DECRYPT_FAILURE_PATTERN = /DecryptionFailed\(/;
|
||||
const DAVE_PASSTHROUGH_DISABLED_PATTERN = /UnencryptedWhenPassthroughDisabled/;
|
||||
const DAVE_RECEIVE_PASSTHROUGH_INITIAL_EXPIRY_SECONDS = 30;
|
||||
const DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS = 15;
|
||||
const SPEAKER_CONTEXT_CACHE_TTL_MS = 60_000;
|
||||
|
||||
const logger = createSubsystemLogger("discord/voice");
|
||||
@@ -52,6 +57,16 @@ type VoiceOperationResult = {
|
||||
guildId?: string;
|
||||
};
|
||||
|
||||
type VoiceCaptureEntry = {
|
||||
generation: number;
|
||||
stream: Readable;
|
||||
};
|
||||
|
||||
type VoiceCaptureFinalizeTimer = {
|
||||
generation: number;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
|
||||
type VoiceSessionEntry = {
|
||||
guildId: string;
|
||||
guildName?: string;
|
||||
@@ -64,6 +79,9 @@ type VoiceSessionEntry = {
|
||||
playbackQueue: Promise<void>;
|
||||
processingQueue: Promise<void>;
|
||||
activeSpeakers: Set<string>;
|
||||
activeCaptureStreams: Map<string, VoiceCaptureEntry>;
|
||||
captureFinalizeTimers: Map<string, VoiceCaptureFinalizeTimer>;
|
||||
captureGenerations: Map<string, number>;
|
||||
decryptFailureCount: number;
|
||||
lastDecryptFailureAt: number;
|
||||
decryptRecoveryInFlight: boolean;
|
||||
@@ -146,27 +164,90 @@ type OpusDecoder = {
|
||||
decode: (buffer: Buffer) => Buffer;
|
||||
};
|
||||
|
||||
let warnedOpusMissing = false;
|
||||
type OpusDecoderFactory = {
|
||||
load: () => OpusDecoder;
|
||||
name: string;
|
||||
};
|
||||
|
||||
function createOpusDecoder(): { decoder: OpusDecoder; name: string } | null {
|
||||
try {
|
||||
const OpusScript = require("opusscript") as {
|
||||
new (sampleRate: number, channels: number, application: number): OpusDecoder;
|
||||
Application: { AUDIO: number };
|
||||
};
|
||||
const decoder = new OpusScript(SAMPLE_RATE, CHANNELS, OpusScript.Application.AUDIO);
|
||||
return { decoder, name: "opusscript" };
|
||||
} catch (err) {
|
||||
if (!warnedOpusMissing) {
|
||||
warnedOpusMissing = true;
|
||||
logger.warn(
|
||||
`discord voice: opusscript unavailable (${formatErrorMessage(err)}); cannot decode voice audio`,
|
||||
);
|
||||
let warnedOpusMissing = false;
|
||||
let cachedOpusDecoderFactory: OpusDecoderFactory | null | "unresolved" = "unresolved";
|
||||
|
||||
/**
 * Reports whether a thrown or emitted value looks like an abort/cancellation.
 *
 * Non-object values (including `null`, `undefined` and plain strings) are never
 * treated as aborts. An object qualifies when its `name` is exactly
 * `"AbortError"` or when its `message` contains the substring `"aborted"`.
 *
 * @param err - The value to inspect.
 * @returns `true` when the value matches one of the abort signatures above.
 */
function isAbortLikeError(err: unknown): boolean {
  if (!err || typeof err !== "object") {
    return false;
  }
  const candidate = err as { name?: unknown; message?: unknown };
  // Missing or nullish fields are normalized to "" so they can never match.
  const name = String(candidate.name ?? "");
  const message = String(candidate.message ?? "");
  // "The operation was aborted" already contains "aborted", so one substring
  // check covers both phrasings.
  return name === "AbortError" || message.includes("aborted");
}
|
||||
|
||||
/**
 * Picks the first opus decoder backend that can actually be instantiated.
 *
 * Candidates are tried in order: `@discordjs/opus`, then `opusscript`. Each
 * candidate is probed by calling its `load()` once and discarding the result;
 * the first one that does not throw is returned. When every candidate throws,
 * a single warning listing each failure is logged (guarded by
 * `warnedOpusMissing`) and `null` is returned.
 *
 * @returns The usable factory, or `null` when no backend could be loaded.
 */
function resolveOpusDecoderFactory(): OpusDecoderFactory | null {
  const loadDiscordOpus = (): OpusDecoder => {
    const discordOpus = require("@discordjs/opus") as {
      OpusEncoder: new (
        sampleRate: number,
        channels: number,
      ) => {
        decode: (buffer: Buffer) => Buffer;
      };
    };
    return new discordOpus.OpusEncoder(SAMPLE_RATE, CHANNELS);
  };

  const loadOpusScript = (): OpusDecoder => {
    const opusScript = require("opusscript") as {
      new (sampleRate: number, channels: number, application: number): OpusDecoder;
      Application: { AUDIO: number };
    };
    return new opusScript(SAMPLE_RATE, CHANNELS, opusScript.Application.AUDIO);
  };

  // Order matters: the first backend that loads wins.
  const candidates: OpusDecoderFactory[] = [
    { name: "@discordjs/opus", load: loadDiscordOpus },
    { name: "opusscript", load: loadOpusScript },
  ];

  const problems: string[] = [];
  const usable = candidates.find((candidate) => {
    try {
      // Probe only: the instance is thrown away, callers invoke load() again.
      candidate.load();
      return true;
    } catch (err) {
      problems.push(`${candidate.name}: ${formatErrorMessage(err)}`);
      return false;
    }
  });
  if (usable) {
    return usable;
  }

  if (!warnedOpusMissing) {
    warnedOpusMissing = true;
    logger.warn(
      `discord voice: no usable opus decoder available (${problems.join("; ")}); cannot decode voice audio`,
    );
  }
  return null;
}
|
||||
|
||||
/**
 * Builds a fresh decoder instance from the resolved (cached) backend factory.
 *
 * @returns The new decoder together with the backend name, or `null` when no
 *   backend factory is available.
 */
function createOpusDecoder(): { decoder: OpusDecoder; name: string } | null {
  const resolved = getOrCreateOpusDecoderFactory();
  return resolved ? { decoder: resolved.load(), name: resolved.name } : null;
}
|
||||
|
||||
/**
 * Returns the opus decoder factory, resolving it on first use.
 *
 * The result of `resolveOpusDecoderFactory()` is stored in
 * `cachedOpusDecoderFactory` — including a `null` outcome — so resolution runs
 * at most once while the cache holds something other than `"unresolved"`.
 */
function getOrCreateOpusDecoderFactory(): OpusDecoderFactory | null {
  if (cachedOpusDecoderFactory === "unresolved") {
    cachedOpusDecoderFactory = resolveOpusDecoderFactory();
  }
  return cachedOpusDecoderFactory;
}
|
||||
|
||||
async function decodeOpusStream(stream: Readable): Promise<Buffer> {
|
||||
const selected = createOpusDecoder();
|
||||
if (!selected) {
|
||||
@@ -416,6 +497,7 @@ export class DiscordVoiceManager {
|
||||
connection.subscribe(player);
|
||||
|
||||
let speakingHandler: ((userId: string) => void) | undefined;
|
||||
let speakingEndHandler: ((userId: string) => void) | undefined;
|
||||
let disconnectedHandler: (() => Promise<void>) | undefined;
|
||||
let destroyedHandler: (() => void) | undefined;
|
||||
let playerErrorHandler: ((err: Error) => void) | undefined;
|
||||
@@ -447,6 +529,9 @@ export class DiscordVoiceManager {
|
||||
playbackQueue: Promise.resolve(),
|
||||
processingQueue: Promise.resolve(),
|
||||
activeSpeakers: new Set(),
|
||||
activeCaptureStreams: new Map(),
|
||||
captureFinalizeTimers: new Map(),
|
||||
captureGenerations: new Map(),
|
||||
decryptFailureCount: 0,
|
||||
lastDecryptFailureAt: 0,
|
||||
decryptRecoveryInFlight: false,
|
||||
@@ -454,6 +539,19 @@ export class DiscordVoiceManager {
|
||||
if (speakingHandler) {
|
||||
connection.receiver.speaking.off("start", speakingHandler);
|
||||
}
|
||||
if (speakingEndHandler) {
|
||||
connection.receiver.speaking.off("end", speakingEndHandler);
|
||||
}
|
||||
for (const { timer } of entry.captureFinalizeTimers.values()) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
entry.captureFinalizeTimers.clear();
|
||||
for (const { stream } of entry.activeCaptureStreams.values()) {
|
||||
stream.destroy();
|
||||
}
|
||||
entry.activeCaptureStreams.clear();
|
||||
entry.captureGenerations.clear();
|
||||
entry.activeSpeakers.clear();
|
||||
if (disconnectedHandler) {
|
||||
connection.off(voiceSdk.VoiceConnectionStatus.Disconnected, disconnectedHandler);
|
||||
}
|
||||
@@ -473,6 +571,9 @@ export class DiscordVoiceManager {
|
||||
logger.warn(`discord voice: capture failed: ${formatErrorMessage(err)}`);
|
||||
});
|
||||
};
|
||||
speakingEndHandler = (userId: string) => {
|
||||
this.scheduleCaptureFinalize(entry, userId, "speaker end");
|
||||
};
|
||||
|
||||
disconnectedHandler = async () => {
|
||||
try {
|
||||
@@ -492,7 +593,13 @@ export class DiscordVoiceManager {
|
||||
logger.warn(`discord voice: playback error: ${formatErrorMessage(err)}`);
|
||||
};
|
||||
|
||||
this.enableDaveReceivePassthrough(
|
||||
entry,
|
||||
"post-join warmup",
|
||||
DAVE_RECEIVE_PASSTHROUGH_INITIAL_EXPIRY_SECONDS,
|
||||
);
|
||||
connection.receiver.speaking.on("start", speakingHandler);
|
||||
connection.receiver.speaking.on("end", speakingEndHandler);
|
||||
connection.on(voiceSdk.VoiceConnectionStatus.Disconnected, disconnectedHandler);
|
||||
connection.on(voiceSdk.VoiceConnectionStatus.Destroyed, destroyedHandler);
|
||||
player.on("error", playerErrorHandler);
|
||||
@@ -546,30 +653,86 @@ export class DiscordVoiceManager {
|
||||
.catch((err) => logger.warn(`discord voice: playback failed: ${formatErrorMessage(err)}`));
|
||||
}
|
||||
|
||||
/**
 * Cancels the pending capture-finalize timer for a user, if there is one.
 *
 * @param entry - Session whose `captureFinalizeTimers` map is consulted.
 * @param userId - Speaker whose timer should be cancelled.
 * @param generation - When given, the timer is only cancelled if it was
 *   scheduled for this capture generation.
 * @returns `true` when a timer was cleared and removed, `false` otherwise.
 */
private clearCaptureFinalizeTimer(
  entry: VoiceSessionEntry,
  userId: string,
  generation?: number,
) {
  const pending = entry.captureFinalizeTimers.get(userId);
  if (!pending) {
    return false;
  }
  const generationMatches = generation === undefined || pending.generation === generation;
  if (!generationMatches) {
    return false;
  }
  clearTimeout(pending.timer);
  entry.captureFinalizeTimers.delete(userId);
  return true;
}
|
||||
|
||||
/**
 * Arms a grace-period timer that ends the user's current capture stream.
 *
 * Does nothing when the user has no active capture. Any timer already pending
 * for the same capture generation is cancelled first. When the timer fires
 * after `CAPTURE_FINALIZE_GRACE_MS`, it only acts if the active capture still
 * has the generation seen at scheduling time: it then drops the user's timer,
 * capture and active-speaker records and destroys the capture stream.
 *
 * @param entry - Session owning the capture bookkeeping.
 * @param userId - Speaker whose capture should be finalized.
 * @param reason - Free-form tag included in the verbose log line.
 */
private scheduleCaptureFinalize(entry: VoiceSessionEntry, userId: string, reason: string) {
  const scheduledFor = entry.activeCaptureStreams.get(userId);
  if (!scheduledFor) {
    return;
  }
  this.clearCaptureFinalizeTimer(entry, userId, scheduledFor.generation);

  const finalize = () => {
    const current = entry.activeCaptureStreams.get(userId);
    // A newer capture (or none at all) means this timer is stale.
    if (current && current.generation === scheduledFor.generation) {
      entry.captureFinalizeTimers.delete(userId);
      entry.activeCaptureStreams.delete(userId);
      entry.activeSpeakers.delete(userId);
      logVoiceVerbose(
        `capture finalize: guild ${entry.guildId} channel ${entry.channelId} user ${userId} reason=${reason} grace=${CAPTURE_FINALIZE_GRACE_MS}ms`,
      );
      current.stream.destroy();
    }
  };

  const timer = setTimeout(finalize, CAPTURE_FINALIZE_GRACE_MS);
  entry.captureFinalizeTimers.set(userId, { generation: scheduledFor.generation, timer });
}
|
||||
|
||||
private async handleSpeakingStart(entry: VoiceSessionEntry, userId: string) {
|
||||
if (!userId || entry.activeSpeakers.has(userId)) {
|
||||
if (!userId) {
|
||||
return;
|
||||
}
|
||||
if (this.botUserId && userId === this.botUserId) {
|
||||
return;
|
||||
}
|
||||
if (entry.activeSpeakers.has(userId)) {
|
||||
const activeCapture = entry.activeCaptureStreams.get(userId);
|
||||
const extended = activeCapture
|
||||
? this.clearCaptureFinalizeTimer(entry, userId, activeCapture.generation)
|
||||
: false;
|
||||
logVoiceVerbose(
|
||||
`capture start ignored (already active): guild ${entry.guildId} channel ${entry.channelId} user ${userId}${extended ? " (finalize canceled)" : ""}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
entry.activeSpeakers.add(userId);
|
||||
logVoiceVerbose(
|
||||
`capture start: guild ${entry.guildId} channel ${entry.channelId} user ${userId}`,
|
||||
);
|
||||
const voiceSdk = loadDiscordVoiceSdk();
|
||||
this.enableDaveReceivePassthrough(
|
||||
entry,
|
||||
`speaker ${userId} start`,
|
||||
DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS,
|
||||
);
|
||||
if (entry.player.state.status === voiceSdk.AudioPlayerStatus.Playing) {
|
||||
entry.player.stop(true);
|
||||
}
|
||||
|
||||
const generation = (entry.captureGenerations.get(userId) ?? 0) + 1;
|
||||
entry.captureGenerations.set(userId, generation);
|
||||
const stream = entry.connection.receiver.subscribe(userId, {
|
||||
end: {
|
||||
behavior: voiceSdk.EndBehaviorType.AfterSilence,
|
||||
duration: SILENCE_DURATION_MS,
|
||||
behavior: voiceSdk.EndBehaviorType.Manual,
|
||||
},
|
||||
});
|
||||
entry.activeCaptureStreams.set(userId, { generation, stream });
|
||||
this.clearCaptureFinalizeTimer(entry, userId, generation);
|
||||
let streamAborted = false;
|
||||
stream.on("error", (err) => {
|
||||
streamAborted = isAbortLikeError(err);
|
||||
this.handleReceiveError(entry, err);
|
||||
});
|
||||
|
||||
@@ -583,7 +746,8 @@ export class DiscordVoiceManager {
|
||||
}
|
||||
this.resetDecryptFailureState(entry);
|
||||
const { path: wavPath, durationSeconds } = await writeWavFile(pcm);
|
||||
if (durationSeconds < MIN_SEGMENT_SECONDS) {
|
||||
const minimumDurationSeconds = streamAborted ? 0.2 : MIN_SEGMENT_SECONDS;
|
||||
if (durationSeconds < minimumDurationSeconds) {
|
||||
logVoiceVerbose(
|
||||
`capture too short (${durationSeconds.toFixed(2)}s): guild ${entry.guildId} channel ${entry.channelId} user ${userId}`,
|
||||
);
|
||||
@@ -596,7 +760,12 @@ export class DiscordVoiceManager {
|
||||
await this.processSegment({ entry, wavPath, userId, durationSeconds });
|
||||
});
|
||||
} finally {
|
||||
entry.activeSpeakers.delete(userId);
|
||||
this.clearCaptureFinalizeTimer(entry, userId, generation);
|
||||
const activeCapture = entry.activeCaptureStreams.get(userId);
|
||||
if (activeCapture?.generation === generation) {
|
||||
entry.activeCaptureStreams.delete(userId);
|
||||
entry.activeSpeakers.delete(userId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -655,7 +824,7 @@ export class DiscordVoiceManager {
|
||||
`transcription ok (${transcript.length} chars): guild ${entry.guildId} channel ${entry.channelId}`,
|
||||
);
|
||||
|
||||
const prompt = speaker.label ? `${speaker.label}: ${transcript}` : transcript;
|
||||
const prompt = formatVoiceIngressPrompt(transcript, speaker.label);
|
||||
|
||||
const result = await agentCommandFromIngress(
|
||||
{
|
||||
@@ -694,7 +863,8 @@ export class DiscordVoiceManager {
|
||||
cfg: ttsCfg,
|
||||
providerConfigs: ttsConfig.providerConfigs,
|
||||
});
|
||||
const speakText = directive.overrides.ttsText ?? directive.cleanedText.trim();
|
||||
const rawSpeakText = directive.overrides.ttsText ?? directive.cleanedText.trim();
|
||||
const speakText = sanitizeVoiceReplyTextForSpeech(rawSpeakText, speaker.label);
|
||||
if (!speakText) {
|
||||
logVoiceVerbose(
|
||||
`tts skipped (empty): guild ${entry.guildId} channel ${entry.channelId} user ${userId}`,
|
||||
@@ -737,7 +907,15 @@ export class DiscordVoiceManager {
|
||||
private handleReceiveError(entry: VoiceSessionEntry, err: unknown) {
|
||||
const message = formatErrorMessage(err);
|
||||
logger.warn(`discord voice: receive error: ${message}`);
|
||||
if (!DECRYPT_FAILURE_PATTERN.test(message)) {
|
||||
const sawPassthroughDisabled = DAVE_PASSTHROUGH_DISABLED_PATTERN.test(message);
|
||||
if (sawPassthroughDisabled) {
|
||||
this.enableDaveReceivePassthrough(
|
||||
entry,
|
||||
"receive decrypt error",
|
||||
DAVE_RECEIVE_PASSTHROUGH_REARM_EXPIRY_SECONDS,
|
||||
);
|
||||
}
|
||||
if (!DECRYPT_FAILURE_PATTERN.test(message) && !sawPassthroughDisabled) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
@@ -768,6 +946,54 @@ export class DiscordVoiceManager {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Turns on passthrough mode on the connection's DAVE session, when reachable.
 *
 * The call is only attempted when the voice connection status is `Ready`, the
 * networking state code is `Ready` or `Resuming`, and a DAVE session object is
 * present on the networking state. Failures of `setPassthroughMode` are logged
 * as warnings and reported through the return value rather than thrown.
 *
 * @param entry - Session fields used for the connection lookup and logging.
 * @param reason - Free-form tag included in the log lines.
 * @param expirySeconds - Expiry value forwarded to `setPassthroughMode`.
 * @returns `true` when passthrough was enabled, `false` when a precondition
 *   failed or the call threw.
 */
private enableDaveReceivePassthrough(
  entry: Pick<VoiceSessionEntry, "guildId" | "channelId" | "connection">,
  reason: string,
  expirySeconds: number,
): boolean {
  type DaveSessionView = {
    setPassthroughMode: (passthrough: boolean, expiry: number) => void;
  };
  type ConnectionStateView = {
    status: unknown;
    networking?: {
      state?: {
        code?: unknown;
        dave?: {
          session?: DaveSessionView;
        };
      };
    };
  };

  const sdk = loadDiscordVoiceSdk();
  const connectionState = entry.connection.state as ConnectionStateView;
  if (connectionState.status !== sdk.VoiceConnectionStatus.Ready) {
    return false;
  }

  const networking = connectionState.networking?.state;
  if (!networking) {
    return false;
  }
  const networkingUsable =
    networking.code === sdk.NetworkingStatusCode.Ready ||
    networking.code === sdk.NetworkingStatusCode.Resuming;
  if (!networkingUsable) {
    return false;
  }

  const session = networking.dave?.session;
  if (!session) {
    return false;
  }

  try {
    session.setPassthroughMode(true, expirySeconds);
    logVoiceVerbose(
      `enabled DAVE receive passthrough: guild ${entry.guildId} channel ${entry.channelId} expiry=${expirySeconds}s reason=${reason}`,
    );
    return true;
  } catch (passthroughErr) {
    logger.warn(
      `discord voice: failed to enable DAVE passthrough guild=${entry.guildId} channel=${entry.channelId} reason=${reason}: ${formatErrorMessage(passthroughErr)}`,
    );
    return false;
  }
}
|
||||
|
||||
private resetDecryptFailureState(entry: VoiceSessionEntry) {
|
||||
entry.decryptFailureCount = 0;
|
||||
entry.lastDecryptFailureAt = 0;
|
||||
|
||||
14
extensions/discord/src/voice/prompt.test.ts
Normal file
14
extensions/discord/src/voice/prompt.test.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { formatVoiceIngressPrompt } from "./prompt.js";
|
||||
|
||||
// Covers the two output shapes of formatVoiceIngressPrompt: with and without a speaker label.
describe("formatVoiceIngressPrompt", () => {
  const transcript = "hello there";

  it("formats speaker-labeled voice input without imperative-looking prefixes", () => {
    const formatted = formatVoiceIngressPrompt(transcript, "speaker-1");
    expect(formatted).toBe('Voice transcript from speaker "speaker-1":\nhello there');
  });

  it("returns the bare transcript when no speaker label exists", () => {
    const formatted = formatVoiceIngressPrompt(transcript);
    expect(formatted).toBe("hello there");
  });
});
|
||||
8
extensions/discord/src/voice/prompt.ts
Normal file
8
extensions/discord/src/voice/prompt.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
/**
 * Builds the agent prompt for a transcribed voice segment.
 *
 * Both inputs are trimmed. Without a (non-blank) speaker label the trimmed
 * transcript is returned as is. Otherwise the transcript is preceded by a
 * header line of the form `Voice transcript from speaker "<label>":`.
 *
 * The label is rendered with `JSON.stringify`, so embedded double quotes,
 * backslashes and line breaks are escaped instead of being copied verbatim.
 * This keeps a crafted label from closing the quoted section or adding extra
 * lines to the header. Labels without such characters render exactly as
 * `"<label>"`.
 *
 * @param transcript - Transcribed speech.
 * @param speakerLabel - Optional label identifying the speaker.
 * @returns The prompt text to hand to the agent.
 */
export function formatVoiceIngressPrompt(transcript: string, speakerLabel?: string): string {
  const cleanedTranscript = transcript.trim();
  const cleanedLabel = speakerLabel?.trim();
  if (!cleanedLabel) {
    return cleanedTranscript;
  }
  const quotedLabel = JSON.stringify(cleanedLabel);
  return `Voice transcript from speaker ${quotedLabel}:\n${cleanedTranscript}`;
}
|
||||
34
extensions/discord/src/voice/sanitize.test.ts
Normal file
34
extensions/discord/src/voice/sanitize.test.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { sanitizeVoiceReplyTextForSpeech } from "./sanitize.js";
|
||||
|
||||
// Exercises directive-tag stripping, speaker-prefix stripping and emoji removal.
describe("sanitizeVoiceReplyTextForSpeech", () => {
  const currentSpeaker = "speaker-1";

  it("strips reply tags before speech", () => {
    const spoken = sanitizeVoiceReplyTextForSpeech("[[reply_to_current]] hello there");
    expect(spoken).toBe("hello there");
  });

  it("strips the current speaker label prefix before speech", () => {
    const spoken = sanitizeVoiceReplyTextForSpeech("speaker-1: hello there", currentSpeaker);
    expect(spoken).toBe("hello there");
  });

  it("keeps other prefixes intact", () => {
    const spoken = sanitizeVoiceReplyTextForSpeech("speaker-2: hello there", currentSpeaker);
    expect(spoken).toBe("speaker-2: hello there");
  });

  it("handles reply tags and speaker prefixes together", () => {
    const spoken = sanitizeVoiceReplyTextForSpeech(
      "[[reply_to_current]] speaker-1: hello there",
      currentSpeaker,
    );
    expect(spoken).toBe("hello there");
  });

  it("strips decorative emoji before speech", () => {
    const spoken = sanitizeVoiceReplyTextForSpeech("😀 hello there 🎉", currentSpeaker);
    expect(spoken).toBe("hello there");
  });

  it("keeps punctuation sane after emoji stripping", () => {
    const spoken = sanitizeVoiceReplyTextForSpeech("✅ done!", currentSpeaker);
    expect(spoken).toBe("done!");
  });
});
|
||||
32
extensions/discord/src/voice/sanitize.ts
Normal file
32
extensions/discord/src/voice/sanitize.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
import { stripInlineDirectiveTagsForDisplay } from "openclaw/plugin-sdk/text-runtime";
|
||||
|
||||
/**
 * Matches runs of emoji that should not be handed to text-to-speech.
 *
 * A run starts with an `Extended_Pictographic` or `Regional_Indicator` code
 * point and extends over any following variation selector (U+FE0F), zero-width
 * joiner (U+200D), further pictographs or regional indicators, skin-tone
 * modifiers, and emoji tag characters (U+E0020..U+E007F).
 *
 * Regional indicators and tag characters are listed explicitly because they
 * are not `Extended_Pictographic`; without them country and subdivision flags
 * would survive the strip. Keycap sequences (digit + U+FE0F + U+20E3) are
 * intentionally not matched so the digit itself is kept.
 */
const SPEECH_EMOJI_RE =
  /(?:(?:\p{Extended_Pictographic}|\p{Regional_Indicator})(?:\uFE0F|\u200D|\p{Extended_Pictographic}|\p{Regional_Indicator}|\p{Emoji_Modifier}|[\u{E0020}-\u{E007F}])*)+/gu;
|
||||
|
||||
/**
 * Escapes regular-expression metacharacters so `value` can be embedded in a
 * `RegExp` source and match literally.
 *
 * @param value - Raw text to embed.
 * @returns `value` with each of `. * + ? ^ $ { } ( ) | [ ] \` backslash-escaped.
 */
function escapeRegExp(value: string): string {
  const metaChars = /[.*+?^${}()|[\]\\]/g;
  return value.replace(metaChars, (metaChar) => `\\${metaChar}`);
}
|
||||
|
||||
/**
 * Removes emoji from text and tidies the whitespace left behind.
 *
 * Steps, in order: replace every `SPEECH_EMOJI_RE` match with a space; drop
 * whitespace directly before `? ! . , : ;`; collapse runs of spaces/tabs to a
 * single space; remove spaces around line breaks; trim the result.
 *
 * @param text - Text to clean.
 * @returns The cleaned text.
 */
function stripEmojiForSpeech(text: string): string {
  const withoutEmoji = text.replace(SPEECH_EMOJI_RE, " ");
  const punctuationTightened = withoutEmoji.replace(/\s+([?!.,:;])/g, "$1");
  const spacesCollapsed = punctuationTightened.replace(/[ \t]{2,}/g, " ");
  const lineEdgesTrimmed = spacesCollapsed.replace(/ *\n */g, "\n");
  return lineEdgesTrimmed.trim();
}
|
||||
|
||||
/**
 * Prepares reply text for text-to-speech.
 *
 * Inline directive tags are removed via `stripInlineDirectiveTagsForDisplay`
 * and the result is trimmed; an empty result short-circuits to `""`. When a
 * non-blank speaker label is given, a leading `<label>:` prefix
 * (case-insensitive, optional whitespace around the colon) is removed. The
 * text is finally passed through `stripEmojiForSpeech`.
 *
 * @param text - Raw reply text.
 * @param speakerLabel - Label of the speaker being answered, if known.
 * @returns Text suitable for speech synthesis; may be empty.
 */
export function sanitizeVoiceReplyTextForSpeech(text: string, speakerLabel?: string): string {
  const withoutDirectives = stripInlineDirectiveTagsForDisplay(text).text.trim();
  if (withoutDirectives === "") {
    return "";
  }

  const label = speakerLabel?.trim();
  if (!label) {
    return stripEmojiForSpeech(withoutDirectives);
  }

  // Only a prefix naming the current speaker is dropped; other prefixes stay.
  const labelPrefix = new RegExp(`^${escapeRegExp(label)}\\s*:\\s*`, "i");
  const withoutLabel = withoutDirectives.replace(labelPrefix, "").trim();
  return stripEmojiForSpeech(withoutLabel);
}
|
||||
@@ -1335,6 +1335,7 @@
|
||||
"yauzl": "3.2.1"
|
||||
},
|
||||
"onlyBuiltDependencies": [
|
||||
"@discordjs/opus",
|
||||
"@lydell/node-pty",
|
||||
"@matrix-org/matrix-sdk-crypto-nodejs",
|
||||
"@napi-rs/canvas",
|
||||
@@ -1347,10 +1348,7 @@
|
||||
"protobufjs",
|
||||
"sharp"
|
||||
],
|
||||
"ignoredBuiltDependencies": [
|
||||
"@discordjs/opus",
|
||||
"koffi"
|
||||
],
|
||||
"ignoredBuiltDependencies": ["koffi"],
|
||||
"packageExtensions": {
|
||||
"@mariozechner/pi-coding-agent": {
|
||||
"dependencies": {
|
||||
|
||||
7
pnpm-lock.yaml
generated
7
pnpm-lock.yaml
generated
@@ -371,6 +371,9 @@ importers:
|
||||
'@discordjs/voice':
|
||||
specifier: ^0.19.2
|
||||
version: 0.19.2(@discordjs/opus@0.10.0)(@emnapi/core@1.8.1)(@emnapi/runtime@1.9.1)(opusscript@0.1.1)
|
||||
'@snazzah/davey':
|
||||
specifier: ^0.1.11
|
||||
version: 0.1.11(@emnapi/core@1.8.1)(@emnapi/runtime@1.9.1)
|
||||
discord-api-types:
|
||||
specifier: ^0.38.44
|
||||
version: 0.38.44
|
||||
@@ -384,6 +387,10 @@ importers:
|
||||
openclaw:
|
||||
specifier: workspace:*
|
||||
version: link:../..
|
||||
optionalDependencies:
|
||||
'@discordjs/opus':
|
||||
specifier: ^0.10.0
|
||||
version: 0.10.0
|
||||
|
||||
extensions/duckduckgo: {}
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ minimumReleaseAgeExclude:
|
||||
- "sqlite-vec-*"
|
||||
|
||||
onlyBuiltDependencies:
|
||||
- "@discordjs/opus"
|
||||
- "@lydell/node-pty"
|
||||
- "@matrix-org/matrix-sdk-crypto-nodejs"
|
||||
- "@napi-rs/canvas"
|
||||
@@ -31,5 +32,4 @@ onlyBuiltDependencies:
|
||||
- sharp
|
||||
|
||||
ignoredBuiltDependencies:
|
||||
- "@discordjs/opus"
|
||||
- koffi
|
||||
|
||||
Reference in New Issue
Block a user