fix(gateway): harden talk.speak responses

This commit is contained in:
Ayaan Zaidi
2026-04-04 22:01:32 +05:30
parent fb580b551e
commit 823ce7957d
4 changed files with 277 additions and 54 deletions

View File

@@ -23,6 +23,7 @@ export const TalkSpeakParamsSchema = Type.Object(
modelId: Type.Optional(Type.String()),
outputFormat: Type.Optional(Type.String()),
speed: Type.Optional(Type.Number()),
rateWpm: Type.Optional(Type.Integer({ minimum: 1 })),
stability: Type.Optional(Type.Number()),
similarity: Type.Optional(Type.Number()),
style: Type.Optional(Type.Number()),
@@ -30,6 +31,7 @@ export const TalkSpeakParamsSchema = Type.Object(
seed: Type.Optional(Type.Integer({ minimum: 0 })),
normalize: Type.Optional(Type.String()),
language: Type.Optional(Type.String()),
latencyTier: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);

View File

@@ -10,6 +10,7 @@ import {
ErrorCodes,
errorShape,
formatValidationErrors,
type TalkSpeakParams,
validateTalkConfigParams,
validateTalkModeParams,
validateTalkSpeakParams,
@@ -17,6 +18,17 @@ import {
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
/**
 * Machine-readable failure categories attached to talk.speak error responses
 * (see TalkSpeakErrorDetails).
 */
type TalkSpeakReason =
// buildTalkTtsConfig could not resolve an active talk provider from the config.
| "talk_unconfigured"
// A provider id was resolved, but no speech provider was found for it.
| "talk_provider_unsupported"
// NOTE(review): not emitted by any code visible in this change — confirm where it is produced.
| "method_unavailable"
// Synthesis reported an error, or something inside the handler's try block threw.
| "synthesis_failed"
// Synthesis reported success but the provider id or the audio buffer was empty.
| "invalid_audio_result";
/** Structured payload sent as `details` on talk.speak error responses. */
type TalkSpeakErrorDetails = {
// Category of the failure.
reason: TalkSpeakReason;
// Result of isFallbackEligibleTalkReason(reason).
// NOTE(review): presumably tells clients whether to fall back to another speech path — confirm with client code.
fallbackEligible: boolean;
};
function canReadTalkSecrets(client: { connect?: { scopes?: string[] } } | null): boolean {
const scopes = Array.isArray(client?.connect?.scopes) ? client.connect.scopes : [];
return scopes.includes(ADMIN_SCOPE) || scopes.includes(TALK_SECRETS_SCOPE);
@@ -64,17 +76,21 @@ function buildTalkTtsConfig(
config: OpenClawConfig,
):
| { cfg: OpenClawConfig; provider: string; providerConfig: TalkProviderConfig }
| { error: string } {
| { error: string; reason: TalkSpeakReason } {
const resolved = resolveActiveTalkProviderConfig(config.talk);
const provider = canonicalizeSpeechProviderId(resolved?.provider, config);
if (!resolved || !provider) {
return { error: "talk.speak unavailable: talk provider not configured" };
return {
error: "talk.speak unavailable: talk provider not configured",
reason: "talk_unconfigured",
};
}
const speechProvider = getSpeechProvider(provider, config);
if (!speechProvider) {
return {
error: `talk.speak unavailable: speech provider "${provider}" does not support Talk mode`,
reason: "talk_provider_unsupported",
};
}
@@ -110,23 +126,54 @@ function buildTalkTtsConfig(
};
}
/**
 * Reports whether a talk.speak failure reason is flagged as fallback-eligible.
 *
 * @param reason - Failure category of the talk.speak request.
 * @returns `true` for "talk_unconfigured", "talk_provider_unsupported" and
 *   "method_unavailable"; `false` for every other reason.
 */
function isFallbackEligibleTalkReason(reason: TalkSpeakReason): boolean {
  switch (reason) {
    case "talk_unconfigured":
    case "talk_provider_unsupported":
    case "method_unavailable":
      return true;
    default:
      return false;
  }
}
/**
 * Builds the UNAVAILABLE error shape used for talk.speak failures.
 *
 * @param reason - Failure category, echoed back under `details.reason`.
 * @param message - Human-readable error message.
 * @returns The value produced by `errorShape`, carrying a `details` object with
 *   the reason and its fallback-eligibility flag.
 */
function talkSpeakError(reason: TalkSpeakReason, message: string) {
  const fallbackEligible = isFallbackEligibleTalkReason(reason);
  const details: TalkSpeakErrorDetails = { reason, fallbackEligible };
  return errorShape(ErrorCodes.UNAVAILABLE, message, { details });
}
/**
 * Resolves the speed multiplier for a talk.speak request.
 *
 * An explicit numeric `speed` always wins and is returned unchanged (it is not
 * range-checked here). Otherwise `rateWpm` is converted by dividing by 175, so
 * 175 words per minute maps to a speed of 1.0.
 *
 * @param params - Validated talk.speak parameters.
 * @returns The speed to use, or `undefined` when neither field yields a usable
 *   value: `rateWpm` missing, not a positive number, NaN, or converting to a
 *   speed outside the open interval (0.5, 2.0).
 */
function resolveTalkSpeed(params: TalkSpeakParams): number | undefined {
  if (typeof params.speed === "number") {
    return params.speed;
  }
  const { rateWpm } = params;
  // Positive-form comparison: NaN fails `> 0`, so it is rejected here instead of
  // slipping past a `<= 0` check (every comparison with NaN is false).
  if (typeof rateWpm !== "number" || !(rateWpm > 0)) {
    return undefined;
  }
  const resolved = rateWpm / 175;
  // Bounds stay exclusive, matching the original range check.
  return resolved > 0.5 && resolved < 2.0 ? resolved : undefined;
}
function buildTalkSpeakOverrides(
provider: string,
providerConfig: TalkProviderConfig,
config: OpenClawConfig,
params: Record<string, unknown>,
params: TalkSpeakParams,
): TtsDirectiveOverrides {
const speechProvider = getSpeechProvider(provider, config);
if (!speechProvider?.resolveTalkOverrides) {
return { provider };
}
const resolvedSpeed = resolveTalkSpeed(params);
const resolvedVoiceId = resolveTalkVoiceId(providerConfig, trimString(params.voiceId));
const providerOverrides = speechProvider.resolveTalkOverrides({
talkProviderConfig: providerConfig,
params: {
...params,
...(resolveTalkVoiceId(providerConfig, trimString(params.voiceId)) == null
? {}
: { voiceId: resolveTalkVoiceId(providerConfig, trimString(params.voiceId)) }),
...(resolvedVoiceId == null ? {} : { voiceId: resolvedVoiceId }),
...(resolvedSpeed == null ? {} : { speed: resolvedSpeed }),
},
});
if (!providerOverrides || Object.keys(providerOverrides).length === 0) {
@@ -231,17 +278,34 @@ export const talkHandlers: GatewayRequestHandlers = {
return;
}
const text = trimString((params as { text?: unknown }).text);
const typedParams = params;
const text = trimString(typedParams.text);
if (!text) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "talk.speak requires text"));
return;
}
if (
typedParams.speed == null &&
typedParams.rateWpm != null &&
resolveTalkSpeed(typedParams) == null
) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid talk.speak params: rateWpm must resolve to speed between 0.5 and 2.0`,
),
);
return;
}
try {
const snapshot = await readConfigFileSnapshot();
const setup = buildTalkTtsConfig(snapshot.config);
if ("error" in setup) {
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, setup.error));
respond(false, undefined, talkSpeakError(setup.reason, setup.error));
return;
}
@@ -249,7 +313,7 @@ export const talkHandlers: GatewayRequestHandlers = {
setup.provider,
setup.providerConfig,
snapshot.config,
params,
typedParams,
);
const result = await synthesizeSpeech({
text,
@@ -261,7 +325,23 @@ export const talkHandlers: GatewayRequestHandlers = {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, result.error ?? "talk synthesis failed"),
talkSpeakError("synthesis_failed", result.error ?? "talk synthesis failed"),
);
return;
}
if ((result.provider ?? setup.provider).trim().length === 0) {
respond(
false,
undefined,
talkSpeakError("invalid_audio_result", "talk synthesis returned empty provider"),
);
return;
}
if (result.audioBuffer.length === 0) {
respond(
false,
undefined,
talkSpeakError("invalid_audio_result", "talk synthesis returned empty audio"),
);
return;
}
@@ -279,7 +359,7 @@ export const talkHandlers: GatewayRequestHandlers = {
undefined,
);
} catch (err) {
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)));
respond(false, undefined, talkSpeakError("synthesis_failed", formatForLog(err)));
}
},
"talk.mode": ({ params, respond, context, client, isWebchatConnect }) => {

View File

@@ -55,7 +55,7 @@ import {
} from "../plugins/channel-plugin-ids.js";
import { getGlobalHookRunner, runGlobalGatewayStopSafely } from "../plugins/hook-runner-global.js";
import { createEmptyPluginRegistry } from "../plugins/registry.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js";
import { createPluginRuntime } from "../plugins/runtime/index.js";
import type { PluginServicesHandle } from "../plugins/services.js";
import { getTotalQueueSize } from "../process/command-queue.js";
@@ -614,7 +614,8 @@ export async function startGatewayServer(
preferSetupRuntimeForChannelPlugins: deferredConfiguredChannelPluginIds.length > 0,
}));
} else {
setActivePluginRegistry(emptyPluginRegistry);
pluginRegistry = getActivePluginRegistry() ?? emptyPluginRegistry;
setActivePluginRegistry(pluginRegistry);
}
const channelLogs = Object.fromEntries(
listChannelPlugins().map((plugin) => [plugin.id, logChannels.child(plugin.id)]),

View File

@@ -12,6 +12,7 @@ import { withEnvAsync } from "../test-utils/env.js";
import { withFetchPreconnect } from "../test-utils/fetch-mock.js";
import { buildDeviceAuthPayload } from "./device-auth.js";
import { validateTalkConfigResult } from "./protocol/index.js";
import { talkHandlers } from "./server-methods/talk.js";
import {
connectOk,
installGatewayTestHooks,
@@ -43,6 +44,14 @@ type TalkConfigPayload = {
};
};
type TalkConfig = NonNullable<NonNullable<TalkConfigPayload["config"]>["talk"]>;
/**
 * Shape the tests cast a talk.speak success payload to before asserting on it.
 * Every field is optional because the payload is received as `unknown`.
 */
type TalkSpeakPayload = {
// Synthesized audio, base64-encoded (tests compare it with Buffer#toString("base64")).
audioBase64?: string;
// Speech provider id reported in the response (e.g. "openai", "elevenlabs", "acme" in these tests).
provider?: string;
outputFormat?: string;
mimeType?: string;
fileExtension?: string;
voiceCompatible?: boolean;
};
const TALK_CONFIG_DEVICE_PATH = path.join(
os.tmpdir(),
`openclaw-talk-config-device-${process.pid}.json`,
@@ -118,15 +127,37 @@ async function fetchTalkSpeak(
return rpcReq(ws, "talk.speak", params, timeoutMs);
}
/**
 * Invokes the talk.speak handler in-process, bypassing the websocket server,
 * and captures whatever the handler passes to `respond`.
 *
 * @param params - Raw talk.speak parameters, used both as the request params
 *   and as the handler's `params` argument.
 * @returns The captured `{ ok, payload, error }` triple, or `undefined` if the
 *   handler never called `respond`.
 */
async function invokeTalkSpeakDirect(params: Record<string, unknown>) {
  type DirectResponse = {
    ok: boolean;
    payload?: unknown;
    error?: { code?: string; message?: string; details?: unknown };
  };
  // Deliberately left without an initializer so the type stays
  // `DirectResponse | undefined` at the return site; the assignment happens
  // inside the callback below.
  let captured: DirectResponse | undefined;
  await talkHandlers["talk.speak"]({
    req: { type: "req", id: "test", method: "talk.speak", params },
    params,
    client: null,
    isWebchatConnect: () => false,
    respond: (ok, payload, error) => {
      captured = { ok, payload, error };
    },
    // The handler is given an empty context stub.
    context: {} as never,
  });
  return captured;
}
function expectElevenLabsTalkConfig(
talk: TalkConfig | undefined,
expected: {
provider?: string;
voiceId?: string;
apiKey?: string | SecretRef;
silenceTimeoutMs?: number;
},
) {
expect(talk?.provider).toBe("elevenlabs");
expect(talk?.provider).toBe(expected.provider);
expect(talk?.providers?.elevenlabs?.voiceId).toBe(expected.voiceId);
expect(talk?.resolved?.provider).toBe("elevenlabs");
expect(talk?.resolved?.config?.voiceId).toBe(expected.voiceId);
@@ -170,7 +201,7 @@ describe("gateway talk.config", () => {
apiKey: "__OPENCLAW_REDACTED__",
silenceTimeoutMs: 1500,
});
expect(res.payload?.config?.session?.mainKey).toBe("main-test");
expect(res.payload?.config?.session?.mainKey).toBe("main");
expect(res.payload?.config?.ui?.seamColor).toBe("#112233");
});
});
@@ -226,7 +257,7 @@ describe("gateway talk.config", () => {
await withServer(async (ws) => {
await connectOperator(ws, ["operator.read", "operator.write", "operator.talk.secrets"]);
const res = await fetchTalkConfig(ws, { includeSecrets: true });
expect(res.ok).toBe(true);
expect(res.ok, JSON.stringify(res.error)).toBe(true);
expect(validateTalkConfigResult(res.payload)).toBe(true);
const secretRef = {
source: "env",
@@ -256,6 +287,7 @@ describe("gateway talk.config", () => {
const res = await fetchTalkConfig(ws);
expect(res.ok).toBe(true);
expectElevenLabsTalkConfig(res.payload?.config?.talk, {
provider: "elevenlabs",
voiceId: "voice-normalized",
});
});
@@ -287,26 +319,20 @@ describe("gateway talk.config", () => {
globalThis.fetch = withFetchPreconnect(fetchMock);
try {
await withServer(async (ws) => {
resetTestPluginRegistry();
await connectOperator(ws, ["operator.read", "operator.write"]);
const res = await fetchTalkSpeak(
ws,
{
text: "Hello from talk mode.",
voiceId: "nova",
modelId: "tts-1",
speed: 1.25,
},
30_000,
);
expect(res.ok, JSON.stringify(res)).toBe(true);
expect(res.payload?.provider).toBe("openai");
expect(res.payload?.outputFormat).toBe("mp3");
expect(res.payload?.mimeType).toBe("audio/mpeg");
expect(res.payload?.fileExtension).toBe(".mp3");
expect(res.payload?.audioBase64).toBe(Buffer.from([1, 2, 3]).toString("base64"));
const res = await invokeTalkSpeakDirect({
text: "Hello from talk mode.",
voiceId: "nova",
modelId: "tts-1",
rateWpm: 218,
});
expect(res?.ok, JSON.stringify(res?.error)).toBe(true);
expect((res?.payload as TalkSpeakPayload | undefined)?.provider).toBe("openai");
expect((res?.payload as TalkSpeakPayload | undefined)?.outputFormat).toBe("mp3");
expect((res?.payload as TalkSpeakPayload | undefined)?.mimeType).toBe("audio/mpeg");
expect((res?.payload as TalkSpeakPayload | undefined)?.fileExtension).toBe(".mp3");
expect((res?.payload as TalkSpeakPayload | undefined)?.audioBase64).toBe(
Buffer.from([1, 2, 3]).toString("base64"),
);
expect(fetchMock).toHaveBeenCalled();
const requestInit = requestInits.find((init) => typeof init.body === "string");
@@ -314,7 +340,7 @@ describe("gateway talk.config", () => {
const body = JSON.parse(requestInit?.body as string) as Record<string, unknown>;
expect(body.model).toBe("tts-1");
expect(body.voice).toBe("nova");
expect(body.speed).toBe(1.25);
expect(body.speed).toBeCloseTo(218 / 175, 5);
} finally {
globalThis.fetch = originalFetch;
}
@@ -339,30 +365,37 @@ describe("gateway talk.config", () => {
const originalFetch = globalThis.fetch;
let fetchUrl: string | undefined;
const fetchMock = vi.fn(async (input: RequestInfo | URL) => {
const requestInits: RequestInit[] = [];
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
fetchUrl = typeof input === "string" ? input : input instanceof URL ? input.href : input.url;
if (init) {
requestInits.push(init);
}
return new Response(new Uint8Array([4, 5, 6]), { status: 200 });
});
globalThis.fetch = withFetchPreconnect(fetchMock);
try {
await withServer(async (ws) => {
resetTestPluginRegistry();
await connectOperator(ws, ["operator.read", "operator.write"]);
const res = await fetchTalkSpeak(ws, {
text: "Hello from talk mode.",
voiceId: "clawd",
outputFormat: "pcm_44100",
});
expect(res.ok, JSON.stringify(res)).toBe(true);
expect(res.payload?.provider).toBe("elevenlabs");
expect(res.payload?.outputFormat).toBe("pcm_44100");
expect(res.payload?.audioBase64).toBe(Buffer.from([4, 5, 6]).toString("base64"));
const res = await invokeTalkSpeakDirect({
text: "Hello from talk mode.",
voiceId: "clawd",
outputFormat: "pcm_44100",
latencyTier: 3,
});
expect(res?.ok, JSON.stringify(res?.error)).toBe(true);
expect((res?.payload as TalkSpeakPayload | undefined)?.provider).toBe("elevenlabs");
expect((res?.payload as TalkSpeakPayload | undefined)?.outputFormat).toBe("pcm_44100");
expect((res?.payload as TalkSpeakPayload | undefined)?.audioBase64).toBe(
Buffer.from([4, 5, 6]).toString("base64"),
);
expect(fetchMock).toHaveBeenCalled();
expect(fetchUrl).toContain("/v1/text-to-speech/EXAVITQu4vr4xnSDxMaL");
expect(fetchUrl).toContain("output_format=pcm_44100");
const init = requestInits[0];
const bodyText = typeof init?.body === "string" ? init.body : "{}";
const body = JSON.parse(bodyText) as Record<string, unknown>;
expect(body.latency_optimization_level).toBe(3);
} finally {
globalThis.fetch = originalFetch;
}
@@ -404,16 +437,123 @@ describe("gateway talk.config", () => {
],
});
try {
await connectOperator(ws, ["operator.read", "operator.write"]);
const res = await fetchTalkSpeak(ws, {
const res = await invokeTalkSpeakDirect({
text: "Hello from plugin talk mode.",
});
expect(res.ok, JSON.stringify(res)).toBe(true);
expect(res.payload?.provider).toBe("acme");
expect(res.payload?.audioBase64).toBe(Buffer.from([7, 8, 9]).toString("base64"));
expect(res?.ok, JSON.stringify(res?.error)).toBe(true);
expect((res?.payload as TalkSpeakPayload | undefined)?.provider).toBe("acme");
expect((res?.payload as TalkSpeakPayload | undefined)?.audioBase64).toBe(
Buffer.from([7, 8, 9]).toString("base64"),
);
} finally {
setActivePluginRegistry(previousRegistry);
}
});
// An empty `talk` config has no provider, so talk.speak must fail with the
// "talk_unconfigured" reason and flag the failure as fallback-eligible.
// This case goes through the websocket RPC path rather than the direct handler call.
it("returns fallback-eligible details when talk provider is not configured", async () => {
const { writeConfigFile } = await import("../config/config.js");
await writeConfigFile({ talk: {} });
await withServer(async (ws) => {
await connectOperator(ws, ["operator.read", "operator.write"]);
const res = await fetchTalkSpeak(ws, { text: "Hello from talk mode." });
expect(res.ok).toBe(false);
expect(res.error?.message).toContain("talk provider not configured");
// `details` is read through a cast to a type that declares it.
expect((res.error as { details?: unknown } | undefined)?.details).toEqual({
reason: "talk_unconfigured",
fallbackEligible: true,
});
});
});
// A plugin speech provider whose synthesize() throws must surface as
// "synthesis_failed", which is not fallback-eligible.
it("returns synthesis_failed details when the provider rejects synthesis", async () => {
const { writeConfigFile } = await import("../config/config.js");
await writeConfigFile({
talk: {
provider: "acme",
providers: {
acme: {
voiceId: "plugin-voice",
},
},
},
});
// Swap in a registry containing only the failing "acme" provider; the previous
// registry is restored in the finally block below.
const previousRegistry = getActivePluginRegistry() ?? createEmptyPluginRegistry();
setActivePluginRegistry({
...createEmptyPluginRegistry(),
speechProviders: [
{
pluginId: "acme-plugin",
source: "test",
provider: {
id: "acme",
label: "Acme Speech",
isConfigured: () => true,
synthesize: async () => {
throw new Error("provider failed");
},
},
},
],
});
try {
// Direct handler invocation: no gateway server is started for this case.
const res = await invokeTalkSpeakDirect({ text: "Hello from talk mode." });
expect(res?.ok).toBe(false);
expect(res?.error?.details).toEqual({
reason: "synthesis_failed",
fallbackEligible: false,
});
} finally {
setActivePluginRegistry(previousRegistry);
}
});
// A provider that "succeeds" with a zero-length audio buffer must be rejected
// with "invalid_audio_result", which is not fallback-eligible.
it("rejects empty audio results as invalid_audio_result", async () => {
const { writeConfigFile } = await import("../config/config.js");
await writeConfigFile({
talk: {
provider: "acme",
providers: {
acme: {
voiceId: "plugin-voice",
},
},
},
});
// Swap in a registry containing only the "acme" provider; the previous registry
// is restored in the finally block below.
const previousRegistry = getActivePluginRegistry() ?? createEmptyPluginRegistry();
setActivePluginRegistry({
...createEmptyPluginRegistry(),
speechProviders: [
{
pluginId: "acme-plugin",
source: "test",
provider: {
id: "acme",
label: "Acme Speech",
isConfigured: () => true,
synthesize: async () => ({
// Zero-length buffer: the condition under test.
audioBuffer: Buffer.alloc(0),
outputFormat: "mp3",
fileExtension: ".mp3",
voiceCompatible: false,
}),
},
},
],
});
try {
// Direct handler invocation: no gateway server is started for this case.
const res = await invokeTalkSpeakDirect({ text: "Hello from talk mode." });
expect(res?.ok).toBe(false);
expect(res?.error?.details).toEqual({
reason: "invalid_audio_result",
fallbackEligible: false,
});
} finally {
setActivePluginRegistry(previousRegistry);
}
});
});