fix(tts): clean streamed directive text

This commit is contained in:
Peter Steinberger
2026-04-26 04:08:03 +01:00
parent 2261918c8c
commit cf834e2a21
11 changed files with 290 additions and 17 deletions

View File

@@ -65,6 +65,9 @@ Docs: https://docs.openclaw.ai
### Fixes
- TTS: strip model-emitted TTS directives from streamed block text before channel
delivery, including directives split across adjacent blocks, while preserving
the accumulated raw reply for final-mode synthesis. Fixes #38937.
- ACP: send subagent and async-task completion wakes to external ACP harnesses as
plain prompts instead of OpenClaw internal runtime-context envelopes, while
keeping those envelopes out of ACP transcripts.

View File

@@ -680,6 +680,10 @@ for a single reply, plus an optional `[[tts:text]]...[[/tts:text]]` block to
provide expressive tags (laughter, singing cues, etc) that should only appear in
the audio.
Streaming block delivery strips these directives from visible text before the
channel sees them, even when a directive is split across adjacent blocks. Final
mode still parses the accumulated raw reply for TTS synthesis.
`provider=...` directives are ignored unless `modelOverrides.allowProvider: true`.
Example reply payload:

View File

@@ -196,6 +196,37 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(coordinator.getRoutedCounts().block).toBe(0);
});
it("strips split TTS directives from visible ACP block delivery", async () => {
const dispatcher = createDispatcher();
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig({
messages: { tts: { enabled: true } },
}),
ctx: buildTestCtx({
Provider: "visiblechat",
Surface: "visiblechat",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher,
inboundAudio: false,
shouldRouteToOriginating: false,
});
await coordinator.deliver("block", { text: "Intro [[tts:te" }, { skipTts: true });
await coordinator.deliver(
"block",
{ text: "xt]]hidden[[/tts:text]] visible" },
{ skipTts: true },
);
expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith(1, { text: "Intro " });
expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith(2, { text: " visible" });
expect(coordinator.getAccumulatedVisibleBlockText()).toBe("Intro \n visible");
expect(coordinator.getAccumulatedBlockTtsText()).toBe(
"Intro [[tts:text]]hidden[[/tts:text]] visible",
);
});
it("prefers provider over surface when detecting direct channel visibility", async () => {
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),

View File

@@ -7,8 +7,9 @@ import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
} from "../../shared/string-coerce.js";
import { createTtsDirectiveTextStreamCleaner } from "../../tts/directives.js";
import { resolveStatusTtsSnapshot } from "../../tts/status-config.js";
import { resolveConfiguredTtsMode } from "../../tts/tts-config.js";
import { resolveConfiguredTtsMode, shouldCleanTtsDirectiveText } from "../../tts/tts-config.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.types.js";
@@ -127,6 +128,9 @@ async function maybeApplyAcpTts(params: {
type AcpDispatchDeliveryState = {
startedReplyLifecycle: boolean;
accumulatedBlockText: string;
accumulatedVisibleBlockText: string;
accumulatedBlockTtsText: string;
cleanBlockTtsDirectiveText?: ReturnType<typeof createTtsDirectiveTextStreamCleaner>;
blockCount: number;
deliveredFinalReply: boolean;
deliveredVisibleText: boolean;
@@ -146,6 +150,8 @@ export type AcpDispatchDeliveryCoordinator = {
) => Promise<boolean>;
getBlockCount: () => number;
getAccumulatedBlockText: () => string;
getAccumulatedVisibleBlockText: () => string;
getAccumulatedBlockTtsText: () => string;
settleVisibleText: () => Promise<void>;
hasDeliveredFinalReply: () => boolean;
hasDeliveredVisibleText: () => boolean;
@@ -172,6 +178,15 @@ export function createAcpDispatchDeliveryCoordinator(params: {
const state: AcpDispatchDeliveryState = {
startedReplyLifecycle: false,
accumulatedBlockText: "",
accumulatedVisibleBlockText: "",
accumulatedBlockTtsText: "",
cleanBlockTtsDirectiveText: shouldCleanTtsDirectiveText({
cfg: params.cfg,
ttsAuto: params.sessionTtsAuto,
agentId: params.agentId,
})
? createTtsDirectiveTextStreamCleaner()
: undefined,
blockCount: 0,
deliveredFinalReply: false,
deliveredVisibleText: false,
@@ -279,16 +294,37 @@ export function createAcpDispatchDeliveryCoordinator(params: {
payload: ReplyPayload,
meta?: AcpDispatchDeliveryMeta,
): Promise<boolean> => {
if (kind === "block" && normalizeOptionalString(payload.text)) {
let visiblePayload = payload;
const rawBlockText = kind === "block" ? normalizeOptionalString(payload.text) : undefined;
if (rawBlockText) {
const joinsBufferedTtsDirective =
state.cleanBlockTtsDirectiveText?.hasBufferedDirectiveText() === true;
if (state.accumulatedBlockText.length > 0) {
state.accumulatedBlockText += "\n";
}
state.accumulatedBlockText += payload.text;
state.accumulatedBlockText += rawBlockText;
if (state.accumulatedBlockTtsText.length > 0 && !joinsBufferedTtsDirective) {
state.accumulatedBlockTtsText += "\n";
}
state.accumulatedBlockTtsText += rawBlockText;
state.blockCount += 1;
if (state.cleanBlockTtsDirectiveText && !payload.isCompactionNotice) {
const text = state.cleanBlockTtsDirectiveText.push(rawBlockText);
visiblePayload = { ...payload, text: text.trim() ? text : undefined };
}
if (visiblePayload.text) {
if (state.accumulatedVisibleBlockText.length > 0) {
state.accumulatedVisibleBlockText += "\n";
}
state.accumulatedVisibleBlockText += visiblePayload.text;
}
}
if (hasOutboundReplyContent(payload, { trimText: true })) {
if (hasOutboundReplyContent(visiblePayload, { trimText: true })) {
await startReplyLifecycleOnce();
} else {
return false;
}
if (params.suppressUserDelivery) {
@@ -296,7 +332,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
}
const ttsPayload = await maybeApplyAcpTts({
payload,
payload: visiblePayload,
cfg: params.cfg,
agentId: params.agentId,
channel: params.ttsChannel,
@@ -396,6 +432,8 @@ export function createAcpDispatchDeliveryCoordinator(params: {
deliver,
getBlockCount: () => state.blockCount,
getAccumulatedBlockText: () => state.accumulatedBlockText,
getAccumulatedVisibleBlockText: () => state.accumulatedVisibleBlockText,
getAccumulatedBlockTtsText: () => state.accumulatedBlockTtsText,
settleVisibleText: settleDirectVisibleText,
hasDeliveredFinalReply: () => state.deliveredFinalReply,
hasDeliveredVisibleText: () => state.deliveredVisibleText,

View File

@@ -197,8 +197,9 @@ async function finalizeAcpTurnOutput(params: {
let queuedFinal =
params.delivery.hasDeliveredVisibleText() && !params.delivery.hasFailedVisibleTextDelivery();
const ttsMode = resolveConfiguredTtsMode(params.cfg, params.agentId);
const accumulatedBlockText = params.delivery.getAccumulatedBlockText();
const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0;
const accumulatedVisibleBlockText = params.delivery.getAccumulatedVisibleBlockText();
const accumulatedBlockTtsText = params.delivery.getAccumulatedBlockTtsText();
const hasAccumulatedBlockText = accumulatedBlockTtsText.trim().length > 0;
const ttsStatus = resolveStatusTtsSnapshot({
cfg: params.cfg,
sessionAuto: params.sessionTtsAuto,
@@ -212,7 +213,7 @@ async function finalizeAcpTurnOutput(params: {
try {
const { maybeApplyTtsToPayload } = await loadDispatchAcpTtsRuntime();
const ttsSyntheticReply = await maybeApplyTtsToPayload({
payload: { text: accumulatedBlockText },
payload: { text: accumulatedBlockTtsText },
cfg: params.cfg,
channel: params.ttsChannel,
kind: "final",
@@ -224,7 +225,7 @@ async function finalizeAcpTurnOutput(params: {
const delivered = await params.delivery.deliver("final", {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
spokenText: accumulatedBlockText,
spokenText: accumulatedBlockTtsText,
});
queuedFinal = queuedFinal || delivered;
finalMediaDelivered = delivered;
@@ -238,14 +239,14 @@ async function finalizeAcpTurnOutput(params: {
// to prove the final result was visible to the user.
const shouldDeliverTextFallback =
ttsMode !== "all" &&
hasAccumulatedBlockText &&
accumulatedVisibleBlockText.trim().length > 0 &&
!finalMediaDelivered &&
!params.delivery.hasDeliveredFinalReply() &&
(!params.delivery.hasDeliveredVisibleText() || params.delivery.hasFailedVisibleTextDelivery());
if (shouldDeliverTextFallback) {
const delivered = await params.delivery.deliver(
"final",
{ text: accumulatedBlockText },
{ text: accumulatedVisibleBlockText },
{ skipTts: true },
);
queuedFinal = queuedFinal || delivered;

View File

@@ -293,6 +293,7 @@ vi.mock("./dispatch-acp-session.runtime.js", () => ({
vi.mock("../../tts/tts-config.js", () => ({
normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value),
resolveConfiguredTtsMode: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg).mode,
shouldCleanTtsDirectiveText: () => true,
shouldAttemptTtsPayload: () => true,
}));

View File

@@ -363,6 +363,7 @@ vi.mock("./dispatch-acp-session.runtime.js", () => ({
vi.mock("../../tts/tts-config.js", () => ({
normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value),
resolveConfiguredTtsMode: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg).mode,
shouldCleanTtsDirectiveText: () => true,
shouldAttemptTtsPayload: () => true,
}));
@@ -3331,6 +3332,45 @@ describe("dispatchReplyFromConfig", () => {
expect(blockReplySentTexts).toContain("The answer is 42");
});
it("strips split TTS directives from streamed block text before delivery", async () => {
setNoAbort();
ttsMocks.state.synthesizeFinalAudio = true;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({ Provider: "whatsapp" });
const blockReplySentTexts: string[] = [];
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload | undefined> => {
await opts?.onBlockReply?.({ text: "Intro [[tts:te" });
await opts?.onBlockReply?.({ text: "xt]]hidden[[/tts:text]] visible" });
return undefined;
};
(dispatcher.sendBlockReply as ReturnType<typeof vi.fn>).mockImplementation(
(payload: ReplyPayload) => {
if (payload.text) {
blockReplySentTexts.push(payload.text);
}
return true;
},
);
await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher, replyResolver });
expect(blockReplySentTexts).toEqual(["Intro ", " visible"]);
expect(blockReplySentTexts.join("")).not.toContain("[[tts");
expect(blockReplySentTexts.join("")).not.toContain("hidden");
expect(ttsMocks.maybeApplyTtsToPayload).toHaveBeenCalledWith(
expect.objectContaining({
kind: "final",
payload: { text: "Intro [[tts:text]]hidden[[/tts:text]] visible" },
}),
);
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
expect.objectContaining({ mediaUrl: "https://example.com/tts-synth.opus" }),
);
});
it("forwards generated-media block replies in WhatsApp group sessions", async () => {
setNoAbort();
const dispatcher = createDispatcher();

View File

@@ -49,9 +49,11 @@ import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
} from "../../shared/string-coerce.js";
import { createTtsDirectiveTextStreamCleaner } from "../../tts/directives.js";
import {
normalizeTtsAutoMode,
resolveConfiguredTtsMode,
shouldCleanTtsDirectiveText,
shouldAttemptTtsPayload,
} from "../../tts/tts-config.js";
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js";
@@ -931,7 +933,15 @@ export async function dispatchReplyFromConfig(
// When block streaming succeeds, there's no final reply, so we need to generate
// TTS audio separately from the accumulated block content.
let accumulatedBlockText = "";
let accumulatedBlockTtsText = "";
let blockCount = 0;
const cleanBlockTtsDirectiveText = shouldCleanTtsDirectiveText({
cfg,
ttsAuto: sessionTtsAuto,
agentId: sessionAgentId,
})
? createTtsDirectiveTextStreamCleaner()
: undefined;
const resolveToolDeliveryPayload = (payload: ReplyPayload): ReplyPayload | null => {
if (
@@ -1076,12 +1086,28 @@ export async function dispatchReplyFromConfig(
// Exclude compaction status notices — they are informational UI
// signals and must not be synthesised into the spoken reply.
if (payload.text && !payload.isCompactionNotice) {
const joinsBufferedTtsDirective =
cleanBlockTtsDirectiveText?.hasBufferedDirectiveText() === true;
if (accumulatedBlockText.length > 0) {
accumulatedBlockText += "\n";
}
accumulatedBlockText += payload.text;
if (accumulatedBlockTtsText.length > 0 && !joinsBufferedTtsDirective) {
accumulatedBlockTtsText += "\n";
}
accumulatedBlockTtsText += payload.text;
blockCount++;
}
const visiblePayload =
payload.text && cleanBlockTtsDirectiveText && !payload.isCompactionNotice
? (() => {
const text = cleanBlockTtsDirectiveText.push(payload.text);
return { ...payload, text: text.trim() ? text : undefined };
})()
: payload;
if (!resolveSendableOutboundReplyParts(visiblePayload).hasContent) {
return;
}
// Channels that keep a live draft preview may need to rotate their
// preview state at the logical block boundary before queued block
// delivery drains asynchronously through the dispatcher.
@@ -1093,9 +1119,9 @@ export async function dispatchReplyFromConfig(
assistantMessageIndex: payloadMetadata.assistantMessageIndex,
}
: context;
await params.replyOptions?.onBlockReplyQueued?.(payload, queuedContext);
await params.replyOptions?.onBlockReplyQueued?.(visiblePayload, queuedContext);
const ttsPayload = await maybeApplyTtsToReplyPayload({
payload,
payload: visiblePayload,
cfg,
channel: deliveryChannel,
kind: "block",
@@ -1180,11 +1206,11 @@ export async function dispatchReplyFromConfig(
ttsMode === "final" &&
replies.length === 0 &&
blockCount > 0 &&
accumulatedBlockText.trim()
accumulatedBlockTtsText.trim()
) {
try {
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
payload: { text: accumulatedBlockText },
payload: { text: accumulatedBlockTtsText },
cfg,
channel: deliveryChannel,
kind: "final",
@@ -1199,7 +1225,7 @@ export async function dispatchReplyFromConfig(
const ttsOnlyPayload: ReplyPayload = {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
spokenText: accumulatedBlockText,
spokenText: accumulatedBlockTtsText,
};
const result = await routeReplyToOriginating(ttsOnlyPayload);
if (result) {

View File

@@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import type { SpeechProviderPlugin } from "../plugins/types.js";
import { parseTtsDirectives } from "./directives.js";
import { createTtsDirectiveTextStreamCleaner, parseTtsDirectives } from "./directives.js";
import type {
SpeechDirectiveTokenParseContext,
SpeechDirectiveTokenParseResult,
@@ -218,3 +218,36 @@ describe("parseTtsDirectives provider-aware routing", () => {
});
});
});
describe("createTtsDirectiveTextStreamCleaner", () => {
it("strips directive tags split across streamed chunks", () => {
const cleaner = createTtsDirectiveTextStreamCleaner();
expect(cleaner.push("Hello [[tts:voice=al")).toBe("Hello ");
expect(cleaner.push("loy]]world[[/tt")).toBe("world");
expect(cleaner.push("s]]")).toBe("");
expect(cleaner.flush()).toBe("");
});
it("suppresses hidden tts text blocks while preserving normal text", () => {
const cleaner = createTtsDirectiveTextStreamCleaner();
expect(cleaner.push("Shown [[tts:text]]hid")).toBe("Shown ");
expect(cleaner.push("den[[/tts:text]] visible")).toBe(" visible");
expect(cleaner.flush()).toBe("");
});
it("keeps plain tts block contents visible", () => {
const cleaner = createTtsDirectiveTextStreamCleaner();
expect(cleaner.push("[[tts]]read")).toBe("read");
expect(cleaner.push(" this[[/tts]] now")).toBe(" this now");
});
it("preserves non-tts bracket markup and flushes incomplete literals", () => {
const cleaner = createTtsDirectiveTextStreamCleaner();
expect(cleaner.push("See [[note")).toBe("See ");
expect(cleaner.flush()).toBe("[[note");
});
});

View File

@@ -21,6 +21,12 @@ type TextRange = {
end: number;
};
export type TtsDirectiveTextStreamCleaner = {
push: (text: string) => string;
flush: () => string;
hasBufferedDirectiveText: () => boolean;
};
function buildProviderOrder(left: SpeechProviderPlugin, right: SpeechProviderPlugin): number {
const leftOrder = left.autoSelectOrder ?? Number.MAX_SAFE_INTEGER;
const rightOrder = right.autoSelectOrder ?? Number.MAX_SAFE_INTEGER;
@@ -98,6 +104,85 @@ function replaceOutsideMarkdownCode(
});
}
function normalizeTtsTagBody(body: string): string {
return body.trim().replace(/\s+/g, "").toLowerCase();
}
function classifyTtsTag(body: string): "hidden-open" | "hidden-close" | "tts" | "other" {
const normalized = normalizeTtsTagBody(body);
if (normalized === "tts:text") {
return "hidden-open";
}
if (normalized === "/tts:text") {
return "hidden-close";
}
if (
normalized === "tts" ||
normalized.startsWith("tts:") ||
normalized === "/tts" ||
normalized.startsWith("/tts:")
) {
return "tts";
}
return "other";
}
export function createTtsDirectiveTextStreamCleaner(): TtsDirectiveTextStreamCleaner {
let pending = "";
let insideHiddenTextBlock = false;
return {
push(text: string): string {
const input = pending + text;
pending = "";
let output = "";
let index = 0;
while (index < input.length) {
const tagStart = input.indexOf("[[", index);
if (tagStart === -1) {
if (!insideHiddenTextBlock) {
output += input.slice(index);
}
break;
}
if (!insideHiddenTextBlock) {
output += input.slice(index, tagStart);
}
const tagEnd = input.indexOf("]]", tagStart + 2);
if (tagEnd === -1) {
pending = input.slice(tagStart);
break;
}
const rawTag = input.slice(tagStart, tagEnd + 2);
const tag = classifyTtsTag(input.slice(tagStart + 2, tagEnd));
if (tag === "hidden-open") {
insideHiddenTextBlock = true;
} else if (tag === "hidden-close") {
insideHiddenTextBlock = false;
} else if (tag === "other" && !insideHiddenTextBlock) {
output += rawTag;
}
index = tagEnd + 2;
}
return output;
},
flush(): string {
const tail = pending;
pending = "";
return insideHiddenTextBlock ? "" : tail;
},
hasBufferedDirectiveText(): boolean {
return pending.length > 0 || insideHiddenTextBlock;
},
};
}
export function parseTtsDirectives(
text: string,
policy: SpeechModelOverridePolicy,

View File

@@ -105,3 +105,14 @@ export function shouldAttemptTtsPayload(params: {
}
return raw?.enabled === true;
}
export function shouldCleanTtsDirectiveText(params: {
cfg: OpenClawConfig;
ttsAuto?: string;
agentId?: string;
}): boolean {
if (!shouldAttemptTtsPayload(params)) {
return false;
}
return resolveEffectiveTtsConfig(params.cfg, params.agentId).modelOverrides?.enabled !== false;
}