diff --git a/CHANGELOG.md b/CHANGELOG.md index 745cd46e761..543cf1ab8df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -175,6 +175,7 @@ Docs: https://docs.openclaw.ai - Agents/Anthropic transport: replay `reasoning_content` from compatible thinking blocks for Xiaomi/MiMo-style Anthropic Messages routes, preventing follow-up turns from losing required reasoning context. Fixes #81261. Thanks @Sunnyone2three. - Telegram: cache successful startup bot identity by account and token fingerprint for up to 24 hours, so restarts can skip redundant `getMe` probes during Telegram API slow periods without permanently pinning renamed bots. Refs #82525. - Telegram: keep streamed text replies in place when delayed TTS audio arrives, sending the audio as a follow-up instead of deleting the preview. Fixes #82570. (#82820) Thanks @joshavant. +- Channels/TTS: deliver TTS supplements across live-preview channels without duplicating text replies, covering WebChat, Telegram, Discord, Slack, Mattermost, and Matrix. (#82935) Thanks @joshavant. - Gateway/sessions: discard stale metadata when recreating dead main session rows, so replacement sessions do not inherit old labels or transcript paths. - Codex app-server: mark native context compaction completion events as successful, preventing false "Compaction incomplete" notices after successful Codex-managed compaction. Fixes #82470. (#81593) Thanks @Kyzcreig. - Codex app-server: keep long-running turns alive while current-turn approvals, user input, dynamic tools, and notifications make progress, and carry that progress into the outer run timeout. (#82601) Thanks @100yenadmin. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index c0a6d5aeb1b..fd010d97bb3 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -7aeb36be8d2e9b28463c8a6fcf9dddd8bf74dae494abe473d42384e7fb2ceefa plugin-sdk-api-baseline.json -4425dcdbb37c75718d55dca283b7fa90f0bef5f97c20995a2e3fe08529faec89 plugin-sdk-api-baseline.jsonl +be07626349862f372f4674118440eb65046b94f96306299a8e485920b0842507 plugin-sdk-api-baseline.json +c2921821a5e913d5329e0df199a7fb726b5a9771a28c0ed55bb6b36d70cdb7e7 plugin-sdk-api-baseline.jsonl diff --git a/extensions/discord/src/monitor/message-handler.process.test.ts b/extensions/discord/src/monitor/message-handler.process.test.ts index 60c278b1fe1..918b8882976 100644 --- a/extensions/discord/src/monitor/message-handler.process.test.ts +++ b/extensions/discord/src/monitor/message-handler.process.test.ts @@ -1916,6 +1916,116 @@ describe("processDiscordMessage draft streaming", () => { expect(deliverDiscordReply).toHaveBeenCalledTimes(1); }); + it("keeps the preview and sends media-only for TTS supplement finals", async () => { + const draftStream = createMockDraftStreamForTest(); + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + } as never); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createAutomaticSourceDeliveryContext({ + discordConfig: { streamMode: "partial", maxLinesPerMessage: 5 }, + replyToMode: "first", + }); + + await runProcessDiscordMessage(ctx); + + expect(draftStream.flush).toHaveBeenCalledTimes(1); + expect(draftStream.discardPending).not.toHaveBeenCalled(); + expect(draftStream.clear).not.toHaveBeenCalled(); + expectPreviewEditContent("Spoken answer"); + expect(deliverDiscordReply).toHaveBeenCalledTimes(1); + expect(firstMockArg(deliverDiscordReply, "deliverDiscordReply")).toMatchObject({ + replyToId: "m1", + replies: [ + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ], + }); + }); + + it("falls back with visible text when TTS supplement preview finalization fails", async () => { + const draftStream = createMockDraftStreamForTest(); + editMessageDiscord.mockRejectedValueOnce(new Error("edit failed")); + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + } as never); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createAutomaticSourceDeliveryContext({ + discordConfig: { streamMode: "partial", maxLinesPerMessage: 5 }, + }); + + await runProcessDiscordMessage(ctx); + + expect(draftStream.flush).toHaveBeenCalledTimes(1); + expect(draftStream.discardPending).toHaveBeenCalled(); + expect(draftStream.clear).toHaveBeenCalled(); + expect(deliverDiscordReply).toHaveBeenCalledTimes(1); + expect(firstMockArg(deliverDiscordReply, "deliverDiscordReply")).toMatchObject({ + replies: [ + { + text: "Spoken answer", + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ], + }); + }); + + it("keeps already-delivered TTS supplement fallback audio-only", async () => { + editMessageDiscord.mockRejectedValueOnce(new Error("edit failed")); + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { + spokenText: "Spoken answer", + visibleTextAlreadyDelivered: true, + }, + } as never); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createAutomaticSourceDeliveryContext({ + discordConfig: { streamMode: "partial", maxLinesPerMessage: 5 }, + }); + + await runProcessDiscordMessage(ctx); + + expect(deliverDiscordReply).toHaveBeenCalledTimes(1); + expect(firstMockArg(deliverDiscordReply, "deliverDiscordReply")).toMatchObject({ + replies: [ + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { + spokenText: "Spoken answer", + visibleTextAlreadyDelivered: true, + }, + }, + ], + }); + }); + it("does not flush draft previews for error finals before normal delivery", async () => { const draftStream = createMockDraftStreamForTest(); dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index 295401e7675..b4e2e212a9f 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -35,7 +35,11 @@ import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime import { resolveChunkMode } from "openclaw/plugin-sdk/reply-chunking"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-dispatch-runtime"; import { createChannelHistoryWindow } from "openclaw/plugin-sdk/reply-history"; -import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; +import { + buildTtsSupplementMediaPayload, + getReplyPayloadTtsSupplement, + resolveSendableOutboundReplyParts, +} from "openclaw/plugin-sdk/reply-payload"; import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env"; import { loadSessionStore, @@ -553,10 +557,14 @@ export async function processDiscordMessage( ) { const reply = resolveSendableOutboundReplyParts(effectivePayload); const hasMedia = reply.hasMedia; - const previewFinalText = draftPreview.resolvePreviewFinalText(finalText); + const ttsSupplement = getReplyPayloadTtsSupplement(effectivePayload); + const previewSourceText = finalText ?? ttsSupplement?.spokenText; + const previewFinalText = draftPreview.resolvePreviewFinalText(previewSourceText); + const previewReplyToId = replyReference.peek(); const hasExplicitReplyDirective = Boolean(effectivePayload.replyToTag || effectivePayload.replyToCurrent) || - (typeof finalText === "string" && /\[\[\s*reply_to(?:_current|\s*:)/i.test(finalText)); + (typeof previewSourceText === "string" && + /\[\[\s*reply_to(?:_current|\s*:)/i.test(previewSourceText)); const result = await deliverWithFinalizableLivePreviewAdapter({ kind: info.kind, @@ -572,7 +580,7 @@ export async function processDiscordMessage( buildFinalEdit: () => { if ( draftPreview.finalizedViaPreviewMessage || - hasMedia || + (hasMedia && !ttsSupplement) || typeof previewFinalText !== "string" || hasExplicitReplyDirective || payload.isError @@ -601,6 +609,40 @@ export async function processDiscordMessage( replyReference.markSent(); observer?.onFinalReplyDelivered?.(); }, + buildSupplementalPayload: () => + ttsSupplement ? buildTtsSupplementMediaPayload(effectivePayload) : undefined, + deliverSupplemental: async (supplementalPayload) => { + if (isProcessAborted(abortSignal)) { + return false; + } + const supplementalReplyToId = + previewReplyToId ?? + replyReference.peek() ?? + (replyToMode === "all" + ? typeof message.id === "string" && message.id + ? message.id + : ctxPayload.MessageSid + : undefined); + await deliverDiscordReply({ + cfg, + replies: [supplementalPayload], + target: deliverTarget, + token, + accountId, + rest: deliveryRest, + runtime, + replyToId: supplementalReplyToId, + replyToMode, + textLimit, + maxLinesPerMessage, + tableMode, + chunkMode, + sessionKey: ctxPayload.SessionKey, + threadBindings, + mediaLocalRoots, + }); + return true; + }, logPreviewEditFailure: (err) => { logVerbose( `discord: preview final edit failed; falling back to standard send (${String(err)})`, @@ -611,11 +653,17 @@ export async function processDiscordMessage( if (isProcessAborted(abortSignal)) { return false; } + const fallbackPayload = + ttsSupplement && + ttsSupplement.visibleTextAlreadyDelivered !== true && + !effectivePayload.text?.trim() + ? { ...effectivePayload, text: ttsSupplement.spokenText } + : effectivePayload; const replyToId = replyReference.use(); notifyFinalReplyStart(); await deliverDiscordReply({ cfg, - replies: [effectivePayload], + replies: [fallbackPayload], target: deliverTarget, token, accountId, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 1c7a7e970bc..29dc8103c6d 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -2809,6 +2809,9 @@ describe("matrix monitor handler draft streaming", () => { text?: string; mediaUrl?: string; mediaUrls?: string[]; + audioAsVoice?: boolean; + spokenText?: string; + ttsSupplement?: { spokenText: string; visibleTextAlreadyDelivered?: boolean }; isCompactionNotice?: boolean; replyToId?: string; }, @@ -2856,7 +2859,7 @@ describe("matrix monitor handler draft streaming", () => { function createStreamingHarness(opts?: { replyToMode?: "off" | "first" | "all" | "batched"; blockStreamingEnabled?: boolean; - streaming?: "partial" | "quiet" | "progress"; + streaming?: "partial" | "quiet" | "progress" | "off"; previewToolProgressEnabled?: boolean; accountConfig?: import("../../types.js").MatrixConfig; }) { @@ -3107,6 +3110,166 @@ describe("matrix monitor handler draft streaming", () => { await finish(); }); + it("keeps the draft preview and sends media-only for TTS supplement finals", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ + blockStreamingEnabled: true, + streaming: "partial", + }); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "Spoken answer" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + await deliver( + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + { kind: "final" }, + ); + + expectEditLiveFlag("$draft1", "Spoken answer", false); + expect(redactEventMock).not.toHaveBeenCalled(); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + expect( + requireRecord( + callArg(deliverMatrixRepliesMock, 0, 0, "deliver replies params"), + "deliver replies params", + ).replies, + ).toEqual([ + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ]); + await finish(); + }); + + it("falls back with visible text when TTS supplement live finalization fails", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ + blockStreamingEnabled: true, + streaming: "partial", + }); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "Spoken answer" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + editMessageMatrixMock.mockRejectedValueOnce(new Error("rate limited")); + await deliver( + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + { kind: "final" }, + ); + + expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + expect( + requireRecord( + callArg(deliverMatrixRepliesMock, 0, 0, "deliver replies params"), + "deliver replies params", + ).replies, + ).toEqual([ + { + text: "Spoken answer", + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ]); + await finish(); + }); + + it("falls back with visible text when TTS supplement preview has no event id", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ + blockStreamingEnabled: true, + streaming: "partial", + }); + const { deliver, finish } = await dispatch(); + + await deliver( + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + { kind: "final" }, + ); + + expect(redactEventMock).not.toHaveBeenCalled(); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + expect( + requireRecord( + callArg(deliverMatrixRepliesMock, 0, 0, "deliver replies params"), + "deliver replies params", + ).replies, + ).toEqual([ + { + text: "Spoken answer", + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ]); + await finish(); + }); + + it("keeps already-delivered TTS supplements audio-only without a draft preview", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ + blockStreamingEnabled: true, + streaming: "off", + }); + const { deliver, finish } = await dispatch(); + + await deliver( + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { + spokenText: "Spoken answer", + visibleTextAlreadyDelivered: true, + }, + }, + { kind: "final" }, + ); + + expect(redactEventMock).not.toHaveBeenCalled(); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + expect( + requireRecord( + callArg(deliverMatrixRepliesMock, 0, 0, "deliver replies params"), + "deliver replies params", + ).replies, + ).toEqual([ + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { + spokenText: "Spoken answer", + visibleTextAlreadyDelivered: true, + }, + }, + ]); + await finish(); + }); + it("still edits partial preview-first drafts when the final text changes", async () => { const { dispatch, redactEventMock } = createStreamingHarness({ blockStreamingEnabled: true, diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index e3e3386a99d..e47a88f11ce 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -25,6 +25,10 @@ import { hasFinalInboundReplyDispatch } from "openclaw/plugin-sdk/inbound-reply- import type { ChannelBotLoopProtectionFacts } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { mergePairLoopGuardConfig } from "openclaw/plugin-sdk/pair-loop-guard-runtime"; import { buildInboundHistoryFromEntries } from "openclaw/plugin-sdk/reply-history"; +import { + buildTtsSupplementMediaPayload, + getReplyPayloadTtsSupplement, +} from "openclaw/plugin-sdk/reply-payload"; import type { GetReplyOptions } from "openclaw/plugin-sdk/reply-runtime"; import { resolvePinnedMainDmOwnerFromAllowlist } from "openclaw/plugin-sdk/security-runtime"; import { @@ -1769,12 +1773,19 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam deliver: async (payload: ReplyPayload, info: { kind: string }) => { if (draftStream && info.kind !== "tool" && !payload.isCompactionNotice) { const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + const ttsSupplement = getReplyPayloadTtsSupplement(payload); + const fallbackPayload = + ttsSupplement && + ttsSupplement.visibleTextAlreadyDelivered !== true && + !payload.text?.trim() + ? { ...payload, text: ttsSupplement.spokenText } + : payload; if (draftConsumed) { await draftStream.discardPending(); await deliverMatrixReplies({ cfg, - replies: [payload], + replies: [fallbackPayload], roomId, client, runtime, @@ -1874,7 +1885,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam await redactMatrixDraftEvent(client, roomId, draftEventId); await deliverMatrixReplies({ cfg, - replies: [payload], + replies: [fallbackPayload], roomId, client, runtime, @@ -1890,7 +1901,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam draftConsumed = true; } else if (draftEventId && hasMedia && !payloadReplyMismatch) { let textEditOk = !mustDeliverFinalNormally; - const payloadText = payload.text; + const payloadText = payload.text ?? ttsSupplement?.spokenText; const payloadTextMatchesDraft = typeof payloadText === "string" && draftStream.matchesPreparedText(payloadText); const reusesDraftTextUnchanged = @@ -1917,15 +1928,25 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } else if (textEditOk && reusesDraftTextUnchanged) { textEditOk = await draftStream.finalizeLive(); } - const reusesDraftAsFinalText = Boolean(payload.text?.trim()) && textEditOk; + const reusesDraftAsFinalText = Boolean(payloadText?.trim()) && textEditOk; if (!reusesDraftAsFinalText) { await redactMatrixDraftEvent(client, roomId, draftEventId); } + const mediaPayload = + ttsSupplement && reusesDraftAsFinalText + ? buildTtsSupplementMediaPayload(payload) + : { + ...payload, + text: reusesDraftAsFinalText + ? undefined + : (payload.text ?? + (ttsSupplement?.visibleTextAlreadyDelivered === true + ? undefined + : ttsSupplement?.spokenText)), + }; await deliverMatrixReplies({ cfg, - replies: [ - { ...payload, text: reusesDraftAsFinalText ? undefined : payload.text }, - ], + replies: [mediaPayload], roomId, client, runtime, @@ -1946,7 +1967,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } const deliveredFallback = await deliverMatrixReplies({ cfg, - replies: [payload], + replies: [fallbackPayload], roomId, client, runtime, diff --git a/extensions/mattermost/src/mattermost/monitor.test.ts b/extensions/mattermost/src/mattermost/monitor.test.ts index 35625b15de3..bee23ca372f 100644 --- a/extensions/mattermost/src/mattermost/monitor.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.test.ts @@ -412,7 +412,7 @@ describe("deliverMattermostReplyWithDraftPreview", () => { resolvePreviewFinalText: (text) => text?.trim(), previewState: { finalizedViaPreviewPost: false }, logVerboseMessage: vi.fn(), - deliverFinal, + deliverPayload: deliverFinal, }); expect(deliverFinal).not.toHaveBeenCalled(); @@ -435,7 +435,7 @@ describe("deliverMattermostReplyWithDraftPreview", () => { resolvePreviewFinalText: (text) => text?.trim(), previewState: { finalizedViaPreviewPost: false }, logVerboseMessage: vi.fn(), - deliverFinal, + deliverPayload: deliverFinal, }); expect(deliverFinal).toHaveBeenCalledTimes(1); @@ -463,7 +463,7 @@ describe("deliverMattermostReplyWithDraftPreview", () => { resolvePreviewFinalText: (text) => text?.trim(), previewState: { finalizedViaPreviewPost: false }, logVerboseMessage: vi.fn(), - deliverFinal, + deliverPayload: deliverFinal, }); expect(deliverFinal).toHaveBeenCalledTimes(1); @@ -472,6 +472,113 @@ describe("deliverMattermostReplyWithDraftPreview", () => { expect(draftStream.clear).toHaveBeenCalledTimes(1); }); + it("keeps the preview and sends media-only for TTS supplement finals", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + + await deliverMattermostReplyWithDraftPreview({ + payload: { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + } as never, + info: { kind: "final" }, + kind: "channel", + client: createMattermostClientMock(), + draftStream, + effectiveReplyToId: "thread-root-1", + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverPayload: deliverFinal, + }); + + expect(updateMattermostPostSpy).toHaveBeenCalledWith(expect.anything(), "preview-post-1", { + message: "Spoken answer", + }); + expect(draftStream.discardPending).not.toHaveBeenCalled(); + expect(draftStream.clear).not.toHaveBeenCalled(); + expect(deliverFinal).toHaveBeenCalledWith({ + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }); + }); + + it("falls back with visible text when TTS supplement preview finalization fails", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + updateMattermostPostSpy.mockRejectedValueOnce(new Error("edit failed")); + + await deliverMattermostReplyWithDraftPreview({ + payload: { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + } as never, + info: { kind: "final" }, + kind: "channel", + client: createMattermostClientMock(), + draftStream, + effectiveReplyToId: "thread-root-1", + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverPayload: deliverFinal, + }); + + expect(updateMattermostPostSpy).toHaveBeenCalledTimes(1); + expect(draftStream.discardPending).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(deliverFinal).toHaveBeenCalledWith({ + text: "Spoken answer", + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }); + }); + + it("keeps already-delivered TTS supplement fallback audio-only", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + updateMattermostPostSpy.mockRejectedValueOnce(new Error("edit failed")); + + await deliverMattermostReplyWithDraftPreview({ + payload: { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { + spokenText: "Spoken answer", + visibleTextAlreadyDelivered: true, + }, + } as never, + info: { kind: "final" }, + kind: "channel", + client: createMattermostClientMock(), + draftStream, + effectiveReplyToId: "thread-root-1", + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverPayload: deliverFinal, + }); + + expect(deliverFinal).toHaveBeenCalledWith({ + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { + spokenText: "Spoken answer", + visibleTextAlreadyDelivered: true, + }, + }); + }); + it("does not flush error finals before normal delivery", async () => { const draftStream = createDraftStreamMock(); const deliverFinal = vi.fn(async () => {}); @@ -486,7 +593,7 @@ describe("deliverMattermostReplyWithDraftPreview", () => { resolvePreviewFinalText: (text) => text?.trim(), previewState: { finalizedViaPreviewPost: false }, logVerboseMessage: vi.fn(), - deliverFinal, + deliverPayload: deliverFinal, }); expect(draftStream.flush).not.toHaveBeenCalled(); @@ -509,7 +616,7 @@ describe("deliverMattermostReplyWithDraftPreview", () => { resolvePreviewFinalText: (text) => text?.trim(), previewState: { finalizedViaPreviewPost: false }, logVerboseMessage: vi.fn(), - deliverFinal, + deliverPayload: deliverFinal, }); expect(updateMattermostPostSpy).toHaveBeenCalledTimes(1); @@ -546,7 +653,7 @@ describe("deliverMattermostReplyWithDraftPreview", () => { resolvePreviewFinalText: (text) => text?.trim(), previewState: { finalizedViaPreviewPost: false }, logVerboseMessage: vi.fn(), - deliverFinal, + deliverPayload: deliverFinal, }), ).rejects.toThrow("send failed"); diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 5bf90de4214..0c3d8c5b358 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -8,7 +8,11 @@ import { } from "openclaw/plugin-sdk/channel-streaming"; import { isLoopbackHost } from "openclaw/plugin-sdk/gateway-runtime"; import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; -import { isReasoningReplyPayload } from "openclaw/plugin-sdk/reply-payload"; +import { + buildTtsSupplementMediaPayload, + getReplyPayloadTtsSupplement, + isReasoningReplyPayload, +} from "openclaw/plugin-sdk/reply-payload"; import { resolvePinnedMainDmOwnerFromAllowlist } from "openclaw/plugin-sdk/security-runtime"; import { isPrivateNetworkOptInEnabled } from "openclaw/plugin-sdk/ssrf-runtime"; import { @@ -330,7 +334,7 @@ type MattermostDraftPreviewDeliverParams = { resolvePreviewFinalText: (text?: string) => string | undefined; previewState: MattermostDraftPreviewState; logVerboseMessage: (message: string) => void; - deliverFinal: () => Promise; + deliverPayload: (payload: ReplyPayload) => Promise; }; export async function deliverMattermostReplyWithDraftPreview( @@ -353,10 +357,13 @@ export async function deliverMattermostReplyWithDraftPreview( }, buildFinalEdit: (payload) => { const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const previewFinalText = params.resolvePreviewFinalText(payload.text); + const ttsSupplement = getReplyPayloadTtsSupplement(payload); + const previewFinalText = params.resolvePreviewFinalText( + payload.text ?? ttsSupplement?.spokenText, + ); if ( - hasMedia || + (hasMedia && !ttsSupplement) || typeof previewFinalText !== "string" || payload.isError || !canFinalizeMattermostPreviewInPlace({ @@ -376,14 +383,24 @@ export async function deliverMattermostReplyWithDraftPreview( onPreviewFinalized: () => { params.previewState.finalizedViaPreviewPost = true; }, + buildSupplementalPayload: (payload) => + getReplyPayloadTtsSupplement(payload) ? buildTtsSupplementMediaPayload(payload) : undefined, + deliverSupplemental: async (payload) => { + await params.deliverPayload(payload); + }, logPreviewEditFailure: (err) => { params.logVerboseMessage( `mattermost preview final edit failed; falling back to normal send (${String(err)})`, ); }, }), - deliverNormally: async () => { - await params.deliverFinal(); + deliverNormally: async (payload) => { + const supplement = getReplyPayloadTtsSupplement(payload); + await params.deliverPayload( + supplement && !payload.text?.trim() && supplement.visibleTextAlreadyDelivered !== true + ? { ...payload, text: supplement.spokenText } + : payload, + ); }, }); } @@ -1711,18 +1728,18 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} resolvePreviewFinalText, previewState, logVerboseMessage, - deliverFinal: async () => { + deliverPayload: async (payloadToDeliver) => { const outcome = await deliverMattermostReplyPayload({ core, cfg, - payload, + payload: payloadToDeliver, to, accountId: account.accountId, agentId: route.agentId, replyToId: resolveMattermostReplyRootId({ kind, threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, + replyToId: payloadToDeliver.replyToId, }), textLimit, tableMode, @@ -1730,7 +1747,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }); const deliveryLog = formatMattermostFinalDeliveryOutcomeLog({ outcome, - payload, + payload: payloadToDeliver, to, accountId: account.accountId, agentId: route.agentId, diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index 3ee89c2fd28..061953034d5 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -64,6 +64,7 @@ const statusReactionControllerMock = { }; let mockedReplyThreadTs: string | undefined = THREAD_TS; let mockedReplyThreadTsSequence: Array | undefined; +let mockedSlackReplyBlocks: unknown[] | undefined; let capturedTyping: | { start: () => Promise; @@ -75,11 +76,14 @@ let capturedTyping: let mockedDispatchSequence: Array<{ kind: "tool" | "block" | "final"; payload: { - text: string; + text?: string; isError?: boolean; isReasoning?: boolean; mediaUrl?: string; mediaUrls?: string[]; + audioAsVoice?: boolean; + spokenText?: string; + ttsSupplement?: { spokenText: string; visibleTextAlreadyDelivered?: boolean }; }; }> = []; @@ -531,6 +535,33 @@ vi.mock("openclaw/plugin-sdk/reply-history", () => ({ })); vi.mock("openclaw/plugin-sdk/reply-payload", () => ({ + buildTtsSupplementMediaPayload: (payload: { + text?: string; + mediaUrl?: string; + mediaUrls?: string[]; + audioAsVoice?: boolean; + spokenText?: string; + ttsSupplement?: { spokenText: string; visibleTextAlreadyDelivered?: boolean }; + }) => { + const { text: _text, ...rest } = payload; + return rest; + }, + getReplyPayloadTtsSupplement: (payload: { + mediaUrl?: string; + mediaUrls?: string[]; + ttsSupplement?: { spokenText?: string; visibleTextAlreadyDelivered?: boolean }; + }) => { + const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length); + const spokenText = payload.ttsSupplement?.spokenText?.trim(); + return hasMedia && spokenText + ? { + spokenText, + ...(payload.ttsSupplement?.visibleTextAlreadyDelivered === true + ? { visibleTextAlreadyDelivered: true } + : {}), + } + : undefined; + }, resolveSendableOutboundReplyParts: ( payload: { text?: string; mediaUrl?: string; mediaUrls?: string[] }, opts?: { text?: string }, @@ -640,7 +671,7 @@ vi.mock("../replies.js", () => ({ markSent: () => {}, }), deliverReplies: deliverRepliesMock, - readSlackReplyBlocks: () => undefined, + readSlackReplyBlocks: () => mockedSlackReplyBlocks, resolveDeliveredSlackReplyThreadTs: (params: { replyToMode: "off" | "first" | "all" | "batched"; payloadReplyToId?: string; @@ -679,11 +710,14 @@ vi.mock("../reply.runtime.js", () => ({ dispatcher: { deliver: ( payload: { - text: string; + text?: string; isError?: boolean; isReasoning?: boolean; mediaUrl?: string; mediaUrls?: string[]; + audioAsVoice?: boolean; + spokenText?: string; + ttsSupplement?: { spokenText: string; visibleTextAlreadyDelivered?: boolean }; }, info: { kind: "tool" | "block" | "final" }, ) => Promise; @@ -759,6 +793,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { capturedTyping = undefined; mockedReplyThreadTs = THREAD_TS; mockedReplyThreadTsSequence = undefined; + mockedSlackReplyBlocks = undefined; mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }]; mockedProgressEvents = []; mockedReplyOptionEvents = []; @@ -1430,6 +1465,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { seal: vi.fn(noopAsync), }; createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + finalizeSlackPreviewEditMock.mockResolvedValueOnce(undefined); mockedDispatchSequence = [ { kind: "final", @@ -1446,6 +1482,222 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { expect(deliverRepliesMock).toHaveBeenCalledTimes(1); }); + it("keeps the preview and sends media-only for TTS supplement finals", async () => { + const draftStream = { + ...createDraftStreamStub(), + flush: vi.fn(noopAsync), + clear: vi.fn(noopAsync), + discardPending: vi.fn(noopAsync), + seal: vi.fn(noopAsync), + }; + mockedSlackReplyBlocks = [ + { + type: "section", + text: { type: "mrkdwn", text: "Spoken answer" }, + }, + ]; + createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + finalizeSlackPreviewEditMock.mockResolvedValueOnce(undefined); + mockedReplyThreadTsSequence = [undefined]; + mockedDispatchSequence = [ + { + kind: "final", + payload: { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(draftStream.flush).toHaveBeenCalledTimes(1); + expect(draftStream.clear).not.toHaveBeenCalled(); + expectMockCallArgFields(finalizeSlackPreviewEditMock, 0, "preview edit params", { + channelId: "C123", + messageId: "171234.567", + text: "Spoken answer", + blocks: mockedSlackReplyBlocks, + }); + const delivered = requireRecord( + requireMockCall(deliverRepliesMock, 0, "deliver replies")[0], + "deliver replies params", + ); + expectRecordFields(delivered, { replyThreadTs: THREAD_TS }); + expect(delivered.replies).toEqual([ + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ]); + }); + + it("suppresses duplicate TTS supplement finals after preview finalization", async () => { + const draftStream = { + ...createDraftStreamStub(), + flush: vi.fn(noopAsync), + clear: vi.fn(noopAsync), + discardPending: vi.fn(noopAsync), + seal: vi.fn(noopAsync), + }; + createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + finalizeSlackPreviewEditMock.mockResolvedValueOnce(undefined); + const payload = { + text: "Spoken answer", + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }; + mockedDispatchSequence = [ + { kind: "final", payload }, + { kind: "final", payload }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + const delivered = requireRecord( + requireMockCall(deliverRepliesMock, 0, "deliver replies")[0], + "deliver replies params", + ); + expect(delivered.replies).toEqual([ + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ]); + }); + + it("falls back with visible text when TTS supplement preview finalization fails", async () => { + const draftStream = { + ...createDraftStreamStub(), + flush: vi.fn(noopAsync), + clear: vi.fn(noopAsync), + discardPending: vi.fn(noopAsync), + seal: vi.fn(noopAsync), + }; + createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + mockedReplyThreadTsSequence = [undefined]; + mockedDispatchSequence = [ + { + kind: "final", + payload: { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(1); + expect(draftStream.discardPending).toHaveBeenCalled(); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + const delivered = requireRecord( + requireMockCall(deliverRepliesMock, 0, "deliver replies")[0], + "deliver replies params", + ); + expectRecordFields(delivered, { replyThreadTs: THREAD_TS }); + expect(delivered.replies).toEqual([ + { + text: "Spoken answer", + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ]); + }); + + it("falls back with visible text when TTS supplement preview has no message id", async () => { + const draftStream = { + ...createDraftStreamStub(), + flush: vi.fn(noopAsync), + clear: vi.fn(noopAsync), + discardPending: vi.fn(noopAsync), + seal: vi.fn(noopAsync), + messageId: () => undefined, + }; + createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + mockedDispatchSequence = [ + { + kind: "final", + payload: { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(finalizeSlackPreviewEditMock).not.toHaveBeenCalled(); + expect(draftStream.discardPending).toHaveBeenCalled(); + const delivered = requireRecord( + requireMockCall(deliverRepliesMock, 0, "deliver replies")[0], + "deliver replies params", + ); + expect(delivered.replies).toEqual([ + { + text: "Spoken answer", + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { spokenText: "Spoken answer" }, + }, + ]); + }); + + it("keeps already-delivered TTS supplements audio-only without a draft preview", async () => { + mockedSlackStreamingMode = "off"; + mockedBlockStreamingEnabled = true; + mockedDispatchSequence = [ + { + kind: "final", + payload: { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { + spokenText: "Spoken answer", + visibleTextAlreadyDelivered: true, + }, + }, + }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(finalizeSlackPreviewEditMock).not.toHaveBeenCalled(); + const delivered = requireRecord( + requireMockCall(deliverRepliesMock, 0, "deliver replies")[0], + "deliver replies params", + ); + expect(delivered.replies).toEqual([ + { + mediaUrl: "https://example.com/tts.mp3", + audioAsVoice: true, + spokenText: "Spoken answer", + ttsSupplement: { + spokenText: "Spoken answer", + visibleTextAlreadyDelivered: true, + }, + }, + ]); + }); + it("does not flush draft previews for error finals before normal delivery", async () => { const draftStream = { ...createDraftStreamStub(), diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 0858c0cf5fd..6e1ba489ba0 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -38,7 +38,11 @@ import { } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime"; import { mergePairLoopGuardConfig } from "openclaw/plugin-sdk/pair-loop-guard-runtime"; -import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; +import { + buildTtsSupplementMediaPayload, + getReplyPayloadTtsSupplement, + resolveSendableOutboundReplyParts, +} from "openclaw/plugin-sdk/reply-payload"; import type { ReplyDispatchKind, ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import { resolveInboundLastRouteSessionKey } from "openclaw/plugin-sdk/routing"; import { danger, logVerbose, shouldLogVerbose, sleep } from "openclaw/plugin-sdk/runtime-env"; @@ -893,7 +897,79 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const reply = resolveSendableOutboundReplyParts(payload); const slackBlocks = readSlackReplyBlocks(payload); - const trimmedFinalText = reply.trimmedText; + const ttsSupplement = getReplyPayloadTtsSupplement(payload); + const trimmedFinalText = (payload.text ?? ttsSupplement?.spokenText ?? "").trim(); + const shouldRestoreTtsSupplementTextForPreviewFallback = + Boolean(ttsSupplement) && + ttsSupplement?.visibleTextAlreadyDelivered !== true && + Boolean(draftStream) && + !draftPreviewCommitted && + previewStreamingEnabled && + !payload.text?.trim(); + + if ( + info.kind === "final" && + ttsSupplement && + draftStream && + !draftPreviewCommitted && + previewStreamingEnabled && + !payload.isError && + trimmedFinalText.length > 0 + ) { + const channelId = draftStream.channelId(); + const messageId = draftStream.messageId(); + if (channelId && messageId) { + const finalThreadTs = usedReplyThreadTs ?? statusThreadTs; + await draftStream.flush(); + await draftStream.seal(); + try { + await finalizeSlackPreviewEdit({ + client: ctx.app.client, + token: ctx.botToken, + accountId: account.accountId, + channelId, + messageId, + text: normalizeSlackOutboundText(trimmedFinalText), + ...(slackBlocks?.length ? { blocks: slackBlocks } : {}), + threadTs: finalThreadTs, + }); + } catch (err) { + logVerbose( + `slack: preview final edit failed; falling back to standard send (${formatSlackError(err)})`, + ); + await draftStream.discardPending(); + let delivered = false; + try { + await deliverNormally({ + payload: payload.text?.trim() + ? payload + : { + ...payload, + text: trimmedFinalText, + }, + kind: info.kind, + forcedThreadTs: finalThreadTs, + }); + delivered = true; + } finally { + if (delivered) { + await draftStream.clear(); + } + } + return; + } + draftPreviewCommitted = true; + observedReplyDelivery = true; + replyPlan.markSent(); + await deliverNormally({ + payload: buildTtsSupplementMediaPayload(payload), + kind: info.kind, + forcedThreadTs: finalThreadTs, + }); + deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs }); + return; + } + } const result = await deliverWithFinalizableLivePreviewAdapter({ kind: info.kind, @@ -916,7 +992,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag buildFinalEdit: () => { if ( !previewStreamingEnabled || - reply.hasMedia || + (reply.hasMedia && !ttsSupplement) || payload.isError || (trimmedFinalText.length === 0 && !slackBlocks?.length) ) { @@ -944,6 +1020,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag ...(edit.blocks?.length ? { blocks: edit.blocks } : {}), threadTs: edit.threadTs, }); + draftPreviewCommitted = true; }, onPreviewFinalized: (_preview) => { // The preview edit promotes the draft message into the final answer. @@ -954,6 +1031,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag replyPlan.markSent(); deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs }); }, + buildSupplementalPayload: () => + ttsSupplement ? buildTtsSupplementMediaPayload(payload) : undefined, + deliverSupplemental: async (supplementalPayload) => { + await deliverNormally({ + payload: supplementalPayload, + kind: info.kind, + }); + }, logPreviewEditFailure: (err) => { logVerbose( `slack: preview final edit failed; falling back to standard send (${formatSlackError(err)})`, @@ -962,7 +1047,12 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag }), deliverNormally: async () => { await deliverNormally({ - payload, + payload: shouldRestoreTtsSupplementTextForPreviewFallback + ? { + ...payload, + text: ttsSupplement?.spokenText, + } + : payload, kind: info.kind, }); }, @@ -1394,7 +1484,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag agentId: route.agentId, }); } - if (!anyReplyDelivered) { + if (!anyReplyDelivered && !draftPreviewCommitted) { await draftStream?.clear(); return; } diff --git a/extensions/speech-core/src/tts.test.ts b/extensions/speech-core/src/tts.test.ts index a6acb589324..154849642db 100644 --- a/extensions/speech-core/src/tts.test.ts +++ b/extensions/speech-core/src/tts.test.ts @@ -211,6 +211,7 @@ async function expectTtsPayloadResult(params: { expect(result.audioAsVoice).toBe(params.audioAsVoice); expect(result.mediaUrl).toMatch(new RegExp(`voice-\\d+\\.${params.mediaExtension ?? "ogg"}$`)); expect(result.spokenText).toBe(params.text); + expect(result.ttsSupplement).toEqual({ spokenText: params.text }); expect((result as { trustedLocalMedia?: boolean }).trustedLocalMedia).toBe(true); mediaDir = result.mediaUrl ? path.dirname(result.mediaUrl) : undefined; @@ -447,6 +448,7 @@ describe("speech-core native voice-note routing", () => { expect(result.mediaUrl).toMatch(/voice-\d+\.ogg$/); expect(result.audioAsVoice).toBe(true); expect(result.text).toBeUndefined(); + expect(result.ttsSupplement).toBeUndefined(); mediaDir = result.mediaUrl ? path.dirname(result.mediaUrl) : undefined; } finally { if (mediaDir) { diff --git a/extensions/speech-core/src/tts.ts b/extensions/speech-core/src/tts.ts index 0088279b90f..a3836eedd27 100644 --- a/extensions/speech-core/src/tts.ts +++ b/extensions/speech-core/src/tts.ts @@ -12,6 +12,7 @@ import type { import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { redactSensitiveText } from "openclaw/plugin-sdk/logging-core"; import { + markReplyPayloadAsTtsSupplement, resolveSendableOutboundReplyParts, type ReplyPayload, } from "openclaw/plugin-sdk/reply-payload"; @@ -1845,13 +1846,16 @@ export async function maybeApplyTtsToPayload(params: { latencyMs: result.latencyMs, }; - return { + const payloadWithAudio = { ...nextPayload, mediaUrl: result.audioPath, audioAsVoice: result.audioAsVoice || params.payload.audioAsVoice, spokenText: textForAudio, trustedLocalMedia: true, } as ReplyPayload; + return nextPayload.text?.trim() + ? markReplyPayloadAsTtsSupplement(payloadWithAudio) + : payloadWithAudio; } lastTtsAttempt = { diff --git a/extensions/telegram/src/lane-delivery-text-deliverer.ts b/extensions/telegram/src/lane-delivery-text-deliverer.ts index ecb93d88958..e48a9392493 100644 --- a/extensions/telegram/src/lane-delivery-text-deliverer.ts +++ b/extensions/telegram/src/lane-delivery-text-deliverer.ts @@ -6,7 +6,11 @@ import { isPotentialTruncatedFinal, selectLongerFinalText, } from "openclaw/plugin-sdk/channel-streaming"; -import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; +import { + buildTtsSupplementMediaPayload, + getReplyPayloadTtsSupplement, + resolveSendableOutboundReplyParts, +} from "openclaw/plugin-sdk/reply-payload"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import type { TelegramInlineButtons } from "./button-types.js"; import type { TelegramDraftStream } from "./draft-stream.js"; @@ -199,6 +203,15 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { text: string, options?: { stripButtons?: boolean; fallbackButtons?: TelegramInlineButtons }, ): ReplyPayload => { + if (getReplyPayloadTtsSupplement(payload)) { + return withFallbackTelegramButtons( + withMediaChannelData( + buildTtsSupplementMediaPayload(params.applyTextToPayload(payload, text)), + options, + ), + options?.fallbackButtons, + ); + } if (payload.audioAsVoice === true) { const { text: _text, diff --git a/src/agents/runtime-plan/types.ts b/src/agents/runtime-plan/types.ts index 27a9f3e00f8..4b40fcfd76c 100644 --- a/src/agents/runtime-plan/types.ts +++ b/src/agents/runtime-plan/types.ts @@ -175,6 +175,10 @@ export type AgentRuntimeReplyPayload = { replyToCurrent?: boolean; audioAsVoice?: boolean; spokenText?: string; + ttsSupplement?: { + spokenText: string; + visibleTextAlreadyDelivered?: boolean; + }; isError?: boolean; isReasoning?: boolean; isCompactionNotice?: boolean; diff --git a/src/auto-reply/reply-payload.ts b/src/auto-reply/reply-payload.ts index 9a45212dc53..817038ab242 100644 --- a/src/auto-reply/reply-payload.ts +++ b/src/auto-reply/reply-payload.ts @@ -36,6 +36,11 @@ export type ReplyPayload = { * archival/search use when no visible channel text is sent. */ spokenText?: string; + /** + * Marks a TTS media payload as supplemental audio for assistant text that is + * already visible through streaming or transcript projection. + */ + ttsSupplement?: ReplyPayloadTtsSupplement; isError?: boolean; /** Marks this payload as a reasoning/thinking block. Channels that do not * have a dedicated reasoning lane (e.g. WhatsApp, web) should suppress it. */ @@ -50,6 +55,80 @@ export type ReplyPayload = { channelData?: Record; }; +export type ReplyPayloadTtsSupplement = { + spokenText: string; + visibleTextAlreadyDelivered?: boolean; +}; + +function normalizeTtsSupplementSpokenText(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? value : undefined; +} + +function hasReplyPayloadMedia(payload: Pick): boolean { + return Boolean(payload.mediaUrl?.trim() || payload.mediaUrls?.some((url) => url.trim())); +} + +export function getReplyPayloadTtsSupplement( + payload: Pick, +): ReplyPayloadTtsSupplement | undefined { + const spokenText = normalizeTtsSupplementSpokenText(payload.ttsSupplement?.spokenText); + if (!spokenText || !hasReplyPayloadMedia(payload)) { + return undefined; + } + return { + spokenText, + ...(payload.ttsSupplement?.visibleTextAlreadyDelivered === true + ? { visibleTextAlreadyDelivered: true } + : {}), + }; +} + +export function isReplyPayloadTtsSupplement( + payload: Pick, +): boolean { + return Boolean(getReplyPayloadTtsSupplement(payload)); +} + +export function markReplyPayloadAsTtsSupplement( + payload: T, + spokenText: string = payload.spokenText ?? payload.text ?? "", + options?: { visibleTextAlreadyDelivered?: boolean }, +): T { + const normalizedSpokenText = normalizeTtsSupplementSpokenText(spokenText); + if (!normalizedSpokenText) { + return payload; + } + return { + ...payload, + spokenText: normalizedSpokenText, + ttsSupplement: { + spokenText: normalizedSpokenText, + ...(options?.visibleTextAlreadyDelivered === true + ? { visibleTextAlreadyDelivered: true } + : {}), + }, + }; +} + +export function buildTtsSupplementMediaPayload(payload: ReplyPayload): ReplyPayload { + const supplement = getReplyPayloadTtsSupplement(payload); + if (!supplement) { + return payload; + } + const { + text: _text, + presentation: _presentation, + interactive: _interactive, + btw: _btw, + ...mediaPayload + } = payload; + return { + ...mediaPayload, + spokenText: supplement.spokenText, + ttsSupplement: supplement, + }; +} + export type ReplyPayloadMetadata = { assistantMessageIndex?: number; /** diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 90a6befbe13..072648c8abf 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -26,6 +26,7 @@ import { import { resolveStatusTtsSnapshot } from "../../tts/status-config.js"; import { resolveConfiguredTtsMode } from "../../tts/tts-config.js"; import type { SourceReplyDeliveryMode } from "../get-reply-options.types.js"; +import { markReplyPayloadAsTtsSupplement } from "../reply-payload.js"; import type { FinalizedMsgContext } from "../templating.js"; import { createAcpReplyProjector } from "./acp-projector.js"; import { @@ -250,12 +251,19 @@ async function finalizeAcpTurnOutput(params: { accountId: params.ttsAccountId, }); if (ttsSyntheticReply.mediaUrl) { - const delivered = await params.delivery.deliver("final", { - mediaUrl: ttsSyntheticReply.mediaUrl, - audioAsVoice: ttsSyntheticReply.audioAsVoice, - spokenText: accumulatedBlockTtsText, - trustedLocalMedia: true, - }); + const delivered = await params.delivery.deliver( + "final", + markReplyPayloadAsTtsSupplement( + { + mediaUrl: ttsSyntheticReply.mediaUrl, + audioAsVoice: ttsSyntheticReply.audioAsVoice, + spokenText: accumulatedBlockTtsText, + trustedLocalMedia: true, + }, + accumulatedBlockTtsText, + { visibleTextAlreadyDelivered: true }, + ), + ); queuedFinal = queuedFinal || delivered; finalMediaDelivered = delivered; } diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index bdbc594e183..0dc704d9665 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -84,7 +84,11 @@ import { resolveCommandTurnTargetSessionKey, } from "../command-turn-context.js"; import type { BlockReplyContext } from "../get-reply-options.types.js"; -import { getReplyPayloadMetadata, type ReplyPayload } from "../reply-payload.js"; +import { + getReplyPayloadMetadata, + markReplyPayloadAsTtsSupplement, + type ReplyPayload, +} from "../reply-payload.js"; import type { FinalizedMsgContext } from "../templating.js"; import { normalizeVerboseLevel } from "../thinking.js"; import { resolveConversationBindingContextFromMessage } from "./conversation-binding-input.js"; @@ -1700,12 +1704,16 @@ export async function dispatchReplyFromConfig( if (ttsSyntheticReply.mediaUrl) { // Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content. // Keep the spoken text only for hooks/archive consumers. - const ttsOnlyPayload: ReplyPayload = { - mediaUrl: ttsSyntheticReply.mediaUrl, - audioAsVoice: ttsSyntheticReply.audioAsVoice, - spokenText: accumulatedBlockTtsText, - trustedLocalMedia: true, - }; + const ttsOnlyPayload = markReplyPayloadAsTtsSupplement( + { + mediaUrl: ttsSyntheticReply.mediaUrl, + audioAsVoice: ttsSyntheticReply.audioAsVoice, + spokenText: accumulatedBlockTtsText, + trustedLocalMedia: true, + }, + accumulatedBlockTtsText, + { visibleTextAlreadyDelivered: true }, + ); const normalizedTtsOnlyPayload = await normalizeReplyMediaPayload(ttsOnlyPayload); const result = await routeReplyToOriginating(normalizedTtsOnlyPayload); if (result) { diff --git a/src/channels/message/lifecycle.test.ts b/src/channels/message/lifecycle.test.ts index 8f45d752f0c..1071f96a86f 100644 --- a/src/channels/message/lifecycle.test.ts +++ b/src/channels/message/lifecycle.test.ts @@ -109,6 +109,37 @@ describe("message lifecycle primitives", () => { expect(stateArg).toBe(liveState); }); + it("delivers supplemental payloads after finalizing live previews", async () => { + const editFinal = vi.fn(async () => undefined); + const deliverNormally = vi.fn(async () => undefined); + const deliverSupplemental = vi.fn(async () => true); + + const result = await deliverFinalizableLivePreview< + { text?: string; mediaUrl: string }, + string, + { text?: string } + >({ + kind: "final", + payload: { text: "done", mediaUrl: "file:///tmp/reply.mp3" }, + draft: { + flush: vi.fn(async () => undefined), + id: () => "preview-1", + seal: vi.fn(async () => undefined), + clear: vi.fn(async () => undefined), + }, + buildFinalEdit: (payload) => ({ text: payload.text }), + buildSupplementalPayload: (payload) => ({ mediaUrl: payload.mediaUrl }), + editFinal, + deliverNormally, + deliverSupplemental, + }); + + expect(result.kind).toBe("preview-finalized"); + expect(editFinal).toHaveBeenCalledWith("preview-1", { text: "done" }); + expect(deliverNormally).not.toHaveBeenCalled(); + expect(deliverSupplemental).toHaveBeenCalledWith({ mediaUrl: "file:///tmp/reply.mp3" }); + }); + it("treats live preview fallback delivery as terminal state", async () => { const discardPending = vi.fn(async () => undefined); const clear = vi.fn(async () => undefined); diff --git a/src/channels/message/live.ts b/src/channels/message/live.ts index f05da00cb54..65f82e4e274 100644 --- a/src/channels/message/live.ts +++ b/src/channels/message/live.ts @@ -31,6 +31,8 @@ export type FinalizableLivePreviewAdapter = { receipt: MessageReceipt, liveState: LiveMessageState, ) => Promise | void; + buildSupplementalPayload?: (payload: TPayload) => TPayload | undefined; + deliverSupplemental?: (payload: TPayload) => Promise; handlePreviewEditError?: (params: { error: unknown; id: TId; @@ -114,6 +116,8 @@ export async function deliverFinalizableLivePreview(params receipt: MessageReceipt, liveState: LiveMessageState, ) => Promise | void; + buildSupplementalPayload?: (payload: TPayload) => TPayload | undefined; + deliverSupplemental?: (payload: TPayload) => Promise; handlePreviewEditError?: (params: { error: unknown; id: TId; @@ -178,6 +182,10 @@ export async function deliverFinalizableLivePreview(params createPreviewMessageReceipt({ id: finalizedId }); liveState = markLiveMessageFinalized(liveState, receipt); await params.onPreviewFinalized?.(finalizedId, receipt, liveState); + const supplementalPayload = params.buildSupplementalPayload?.(params.payload); + if (supplementalPayload !== undefined) { + await params.deliverSupplemental?.(supplementalPayload); + } return { kind: "preview-finalized", liveState }; } } @@ -241,6 +249,12 @@ export async function deliverWithFinalizableLivePreviewAdapter, +): { textSha256?: string; spokenText?: string } | undefined { + const marker = message.openclawTtsSupplement; + if (!marker || typeof marker !== "object" || Array.isArray(marker)) { + return undefined; + } + const entry = marker as { textSha256?: unknown; spokenText?: unknown }; + const textSha256 = + typeof entry.textSha256 === "string" && entry.textSha256.trim() + ? entry.textSha256.trim() + : undefined; + const spokenText = + typeof entry.spokenText === "string" && entry.spokenText.trim() + ? entry.spokenText.trim() + : undefined; + return textSha256 || spokenText ? { textSha256, spokenText } : undefined; +} + +function isAssistantTtsSupplementMessage(message: Record): boolean { + if (asRoleContentMessage(message)?.role !== "assistant") { + return false; + } + if (!readTtsSupplementMarker(message)) { + return false; + } + const content = message.content; + if (!Array.isArray(content)) { + return false; + } + let hasSupplementBlock = false; + for (const block of content) { + if (!block || typeof block !== "object") { + continue; + } + const type = (block as { type?: unknown }).type; + if (type !== "text") { + hasSupplementBlock = true; + continue; + } + const text = + typeof (block as { text?: unknown }).text === "string" + ? (block as { text: string }).text.trim() + : ""; + if (text && text !== "Audio reply") { + return false; + } + } + return hasSupplementBlock; +} + +function ttsSupplementMatchesAssistant( + marker: { textSha256?: string; spokenText?: string }, + message: Record, +): boolean { + if (asRoleContentMessage(message)?.role !== "assistant") { + return false; + } + if (readTtsSupplementMarker(message)) { + return false; + } + const text = extractProjectedText(message.content ?? message.text).trim(); + if (!text) { + return false; + } + if (marker.textSha256 && digestTtsSupplementText(text) === marker.textSha256) { + return true; + } + return Boolean(marker.spokenText && text === marker.spokenText); +} + +function mergeTtsSupplementContent( + target: Record, + supplement: Record, +): Record { + const supplementBlocks = Array.isArray(supplement.content) + ? supplement.content.filter( + (block) => + Boolean(block) && + typeof block === "object" && + (block as { type?: unknown }).type !== "text", + ) + : []; + if (supplementBlocks.length === 0) { + return target; + } + const targetContent = target.content; + if (Array.isArray(targetContent)) { + return { ...target, content: [...targetContent, ...supplementBlocks] }; + } + const targetText = extractProjectedText(targetContent ?? target.text).trim(); + return { + ...target, + content: [...(targetText ? [{ type: "text", text: targetText }] : []), ...supplementBlocks], + }; +} + +function mergeTtsSupplementMessages( + messages: Array>, +): Array> { + if (!messages.some(isAssistantTtsSupplementMessage)) { + return messages; + } + const merged: Array> = []; + let changed = false; + for (const message of messages) { + const marker = readTtsSupplementMarker(message); + if (marker && isAssistantTtsSupplementMessage(message)) { + let targetIndex = -1; + for (let i = merged.length - 1; i >= 0; i--) { + if (ttsSupplementMatchesAssistant(marker, merged[i])) { + targetIndex = i; + break; + } + } + if (targetIndex >= 0) { + merged[targetIndex] = mergeTtsSupplementContent(merged[targetIndex], message); + changed = true; + continue; + } + } + merged.push(message); + } + return changed ? merged : messages; +} + function isSubagentAnnounceInterSessionUserMessage(message: Record): boolean { const provenance = normalizeInputProvenance(message.provenance); if (provenance?.kind === "inter_session" && provenance.sourceTool === "subagent_announce") { @@ -620,11 +751,15 @@ export function projectChatDisplayMessages( options?: { maxChars?: number; stripEnvelope?: boolean }, ): Array> { const source = options?.stripEnvelope === false ? messages : stripEnvelopeFromMessages(messages); - return filterVisibleProjectedHistoryMessages( - toProjectedMessages( - sanitizeChatHistoryMessages(source, options?.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS), + const merged = mergeTtsSupplementMessages( + filterVisibleProjectedHistoryMessages( + toProjectedMessages(sanitizeChatHistoryMessages(source, Number.MAX_SAFE_INTEGER)), ), ); + return sanitizeChatHistoryMessages( + merged, + options?.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, + ) as Array>; } function limitChatDisplayMessages(messages: T[], maxMessages?: number): T[] { diff --git a/src/gateway/server-methods/chat-transcript-inject.ts b/src/gateway/server-methods/chat-transcript-inject.ts index 00cade5d730..6fdc4912e2c 100644 --- a/src/gateway/server-methods/chat-transcript-inject.ts +++ b/src/gateway/server-methods/chat-transcript-inject.ts @@ -19,6 +19,10 @@ export type GatewayInjectedTranscriptAppendResult = { error?: string; }; +export type GatewayInjectedTtsSupplementMarker = { + textSha256: string; +}; + function resolveInjectedAssistantContent(params: { message: string; label?: string; @@ -51,6 +55,7 @@ export async function appendInjectedAssistantMessageToTranscript(params: { content?: Array>; idempotencyKey?: string; abortMeta?: GatewayInjectedAbortMeta; + ttsSupplement?: GatewayInjectedTtsSupplementMarker; now?: number; config?: OpenClawConfig; }): Promise { @@ -91,6 +96,7 @@ export async function appendInjectedAssistantMessageToTranscript(params: { provider: "openclaw", model: "gateway-injected", ...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}), + ...(params.ttsSupplement ? { openclawTtsSupplement: params.ttsSupplement } : {}), ...(params.abortMeta ? { openclawAbort: { diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index 0fa317033bd..7223f95bb0c 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -39,6 +39,7 @@ const mockState = vi.hoisted(() => ({ mediaUrl?: string; mediaUrls?: string[]; spokenText?: string; + ttsSupplement?: { spokenText: string }; audioAsVoice?: boolean; trustedLocalMedia?: boolean; replyToId?: string; @@ -798,6 +799,7 @@ describe("chat directive tag stripping for non-streaming final payloads", () => mediaUrls: [audioPath], trustedLocalMedia: true, audioAsVoice: true, + ttsSupplement: { spokenText: "This text is already in the model transcript." }, }, }, ]; diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 805d5f18160..2024a24f99e 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -3,7 +3,12 @@ import fs from "node:fs"; import path from "node:path"; import type { AgentMessage } from "@earendil-works/pi-agent-core"; import { CURRENT_SESSION_VERSION } from "@earendil-works/pi-coding-agent"; -import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; +import { + buildTtsSupplementMediaPayload, + getReplyPayloadTtsSupplement, + isReplyPayloadTtsSupplement, + resolveSendableOutboundReplyParts, +} from "openclaw/plugin-sdk/reply-payload"; import { resolveAgentWorkspaceDir, resolveSessionAgentId } from "../../agents/agent-scope.js"; import { resolveDefaultModelForAgent } from "../../agents/model-selection.js"; import { rewriteTranscriptEntriesInSessionFile } from "../../agents/pi-embedded-runner/transcript-rewrite.js"; @@ -124,7 +129,10 @@ import { injectTimestamp, timestampOptsFromConfig } from "./agent-timestamp.js"; import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js"; import { normalizeRpcAttachmentsToChatAttachments } from "./attachment-normalize.js"; import { normalizeWebchatReplyMediaPathsForDisplay } from "./chat-reply-media.js"; -import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js"; +import { + appendInjectedAssistantMessageToTranscript, + type GatewayInjectedTtsSupplementMarker, +} from "./chat-transcript-inject.js"; import { buildWebchatAssistantMessageFromReplyPayloads, buildWebchatAudioContentBlocksFromReplyPayloads, @@ -171,16 +179,51 @@ function isMediaBearingPayload(payload: ReplyPayload): boolean { return false; } -function isTtsSupplementPayload(payload: ReplyPayload): boolean { +function stripVisibleTextFromTtsSupplement(payload: ReplyPayload): ReplyPayload { + return isReplyPayloadTtsSupplement(payload) ? buildTtsSupplementMediaPayload(payload) : payload; +} + +function resolveTtsSupplementMarkerText(text: string): string { + const trimmed = text.trim(); + const projected = projectChatDisplayMessage( + { + role: "assistant", + content: [{ type: "text", text: trimmed }], + }, + { maxChars: Number.MAX_SAFE_INTEGER }, + ); + const projectedContent = Array.isArray(projected?.content) + ? (projected.content as AssistantDisplayContentBlock[]) + : undefined; return ( - typeof payload.spokenText === "string" && - payload.spokenText.trim().length > 0 && - isMediaBearingPayload(payload) + extractAssistantDisplayTextFromContent(projectedContent) ?? + (typeof projected?.text === "string" ? projected.text.trim() : undefined) ?? + trimmed ); } -function stripVisibleTextFromTtsSupplement(payload: ReplyPayload): ReplyPayload { - return isTtsSupplementPayload(payload) ? { ...payload, text: undefined } : payload; +function buildTtsSupplementTranscriptMarker( + payload: ReplyPayload, +): GatewayInjectedTtsSupplementMarker | undefined { + const supplement = getReplyPayloadTtsSupplement(payload); + if (!supplement) { + return undefined; + } + const visibleText = resolveTtsSupplementMarkerText( + payload.text?.trim() || supplement.spokenText.trim(), + ); + return { + textSha256: createHash("sha256").update(visibleText).digest("hex"), + }; +} + +function buildMediaOnlyTtsSupplementTranscriptMarker( + payload: ReplyPayload, +): GatewayInjectedTtsSupplementMarker | undefined { + if (payload.text?.trim()) { + return undefined; + } + return buildTtsSupplementTranscriptMarker(payload); } async function hasImageChatAttachments(attachments: ChatAttachment[]): Promise { @@ -1403,6 +1446,7 @@ async function appendAssistantTranscriptMessage(params: { origin: AbortOrigin; runId: string; }; + ttsSupplement?: GatewayInjectedTtsSupplementMarker; cfg?: OpenClawConfig; }): Promise { const transcriptPath = resolveTranscriptPath({ @@ -1442,6 +1486,7 @@ async function appendAssistantTranscriptMessage(params: { content: params.content, idempotencyKey: params.idempotencyKey, abortMeta: params.abortMeta, + ttsSupplement: params.ttsSupplement, config: params.cfg, }); } @@ -2464,6 +2509,7 @@ export const chatHandlers: GatewayRequestHandlers = { if (!agentRunStarted || appendedWebchatAgentMedia || !isMediaBearingPayload(payload)) { return; } + const ttsSupplementMarker = buildTtsSupplementTranscriptMarker(payload); const [transcriptPayload] = await normalizeWebchatReplyMediaPathsForDisplay({ cfg, sessionKey, @@ -2530,6 +2576,7 @@ export const chatHandlers: GatewayRequestHandlers = { agentId, createIfMissing: true, idempotencyKey: `${clientRunId}:assistant-media`, + ttsSupplement: ttsSupplementMarker, cfg, }); if (appended.ok) { @@ -2719,6 +2766,11 @@ export const chatHandlers: GatewayRequestHandlers = { }, }); const hasSensitiveMedia = hasSensitiveMediaPayload(finalPayloads); + const ttsSupplementMarker = finalPayloads + .map((payload) => buildMediaOnlyTtsSupplementTranscriptMarker(payload)) + .find((marker): marker is GatewayInjectedTtsSupplementMarker => + Boolean(marker), + ); const persistedAssistantContent = replaceAssistantContentTextBlocks( hasSensitiveMedia ? await buildAssistantDisplayContentFromReplyPayloads({ @@ -2775,6 +2827,7 @@ export const chatHandlers: GatewayRequestHandlers = { sessionFile: latestEntry?.sessionFile, agentId, createIfMissing: true, + ttsSupplement: ttsSupplementMarker, cfg, }); if (appended.ok) { @@ -2806,6 +2859,9 @@ export const chatHandlers: GatewayRequestHandlers = { : {}), ...(fallbackText ? { text: fallbackText } : {}), timestamp: now, + ...(ttsSupplementMarker + ? { openclawTtsSupplement: ttsSupplementMarker } + : {}), // Keep this compatible with Pi stopReason enums even though this message isn't // persisted to the transcript due to the append failure. stopReason: "stop", diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index 9a45caba72c..63a6f6e54cc 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import fs from "node:fs"; import fsPromises from "node:fs/promises"; import os from "node:os"; @@ -616,6 +617,245 @@ describe("projectRecentChatDisplayMessages", () => { expect(result).toEqual([mediaOnly, multiMediaOnly]); }); + + it("merges delayed TTS supplements into their original assistant message", () => { + const visibleText = "**Here** is the answer."; + const spokenText = "Here is the answer."; + const textSha256 = createHash("sha256").update(visibleText).digest("hex"); + + const result = projectRecentChatDisplayMessages([ + { + role: "user", + content: [{ type: "text", text: "first" }], + timestamp: 1, + }, + { + role: "assistant", + content: [{ type: "text", text: visibleText }], + timestamp: 2, + }, + { + role: "user", + content: [{ type: "text", text: "second" }], + timestamp: 3, + }, + { + role: "assistant", + content: [ + { type: "text", text: "Audio reply" }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + openclawTtsSupplement: { textSha256, spokenText }, + timestamp: 4, + }, + ]); + + expect(result).toEqual([ + { + role: "user", + content: [{ type: "text", text: "first" }], + timestamp: 1, + }, + { + role: "assistant", + content: [ + { type: "text", text: visibleText }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + timestamp: 2, + }, + { + role: "user", + content: [{ type: "text", text: "second" }], + timestamp: 3, + }, + ]); + }); + + it("merges delayed TTS supplements when directive tags are stripped for display", () => { + const rawVisibleText = "[[reply_to_current]]Visible answer."; + const projectedVisibleText = "Visible answer."; + const textSha256 = createHash("sha256").update(projectedVisibleText).digest("hex"); + + const result = projectRecentChatDisplayMessages([ + { + role: "assistant", + content: [{ type: "text", text: rawVisibleText }], + timestamp: 1, + }, + { + role: "assistant", + content: [ + { type: "text", text: "Audio reply" }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + openclawTtsSupplement: { textSha256 }, + timestamp: 2, + }, + ]); + + expect(result).toEqual([ + { + role: "assistant", + content: [ + { type: "text", text: projectedVisibleText }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + timestamp: 1, + }, + ]); + }); + + it("merges delayed TTS supplements before display truncation", () => { + const projectedVisibleText = "Visible answer ".repeat(8).trim(); + const rawVisibleText = `[[reply_to_current]]${projectedVisibleText}`; + const textSha256 = createHash("sha256").update(projectedVisibleText).digest("hex"); + + const result = projectRecentChatDisplayMessages( + [ + { + role: "assistant", + content: [{ type: "text", text: rawVisibleText }], + timestamp: 1, + }, + { + role: "assistant", + content: [ + { type: "text", text: "Audio reply" }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + openclawTtsSupplement: { textSha256 }, + timestamp: 2, + }, + ], + { maxChars: 24 }, + ); + + expect(result).toEqual([ + { + role: "assistant", + content: [ + { type: "text", text: `${projectedVisibleText.slice(0, 24)}\n...(truncated)...` }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + timestamp: 1, + }, + ]); + }); + + it("does not merge visible TTS finals into an older identical assistant message", () => { + const visibleText = "Done."; + const textSha256 = createHash("sha256").update(visibleText).digest("hex"); + const ttsSupplement = { textSha256 }; + + const result = projectRecentChatDisplayMessages([ + { + role: "assistant", + content: [{ type: "text", text: visibleText }], + timestamp: 1, + }, + { + role: "user", + content: [{ type: "text", text: "again" }], + timestamp: 2, + }, + { + role: "assistant", + content: [ + { type: "text", text: visibleText }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + openclawTtsSupplement: ttsSupplement, + timestamp: 3, + }, + ]); + + expect(result).toEqual([ + { + role: "assistant", + content: [{ type: "text", text: visibleText }], + timestamp: 1, + }, + { + role: "user", + content: [{ type: "text", text: "again" }], + timestamp: 2, + }, + { + role: "assistant", + content: [ + { type: "text", text: visibleText }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + openclawTtsSupplement: ttsSupplement, + timestamp: 3, + }, + ]); + }); }); describe("resolveEffectiveChatHistoryMaxChars", () => { diff --git a/src/gateway/session-history-state.test.ts b/src/gateway/session-history-state.test.ts index b7fc930e27a..d2bc4cbef70 100644 --- a/src/gateway/session-history-state.test.ts +++ b/src/gateway/session-history-state.test.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import { describe, expect, test, vi } from "vitest"; import { HEARTBEAT_PROMPT } from "../auto-reply/heartbeat.js"; import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js"; @@ -101,6 +102,61 @@ describe("SessionHistorySseState", () => { expect(state.snapshot().messages.at(-1)?.__openclaw?.seq).toBe(9); }); + test("requests refresh when inline TTS supplement merges into an existing assistant message", () => { + const visibleText = "Here is the answer."; + const textSha256 = createHash("sha256").update(visibleText).digest("hex"); + const state = SessionHistorySseState.fromRawSnapshot({ + target: { sessionId: "sess-main" }, + rawMessages: [ + { + role: "assistant", + content: [{ type: "text", text: visibleText }], + __openclaw: { seq: 2 }, + }, + ], + }); + + const appended = state.appendInlineMessage({ + message: { + role: "assistant", + content: [ + { type: "text", text: "Audio reply" }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + openclawTtsSupplement: { textSha256, spokenText: visibleText }, + }, + messageSeq: 3, + }); + + expect(appended).toEqual({ shouldRefresh: true }); + expect(state.snapshot().messages).toEqual([ + { + role: "assistant", + content: [ + { type: "text", text: visibleText }, + { + type: "attachment", + attachment: { + url: "/tmp/tts.mp3", + kind: "audio", + label: "tts.mp3", + mimeType: "audio/mpeg", + }, + }, + ], + __openclaw: { seq: 2 }, + }, + ]); + }); + test("requests refresh for non-monotonic carried inline sequence", () => { const state = SessionHistorySseState.fromRawSnapshot({ target: { sessionId: "sess-main" }, diff --git a/src/gateway/session-history-state.ts b/src/gateway/session-history-state.ts index ab48ccf303d..788ebfab5c1 100644 --- a/src/gateway/session-history-state.ts +++ b/src/gateway/session-history-state.ts @@ -254,14 +254,27 @@ export class SessionHistorySseState { if (!sanitizedMessage) { return null; } - const nextMessages = [...this.sentHistory.messages, sanitizedMessage]; + const projectedMessages = toSessionHistoryMessages( + projectChatDisplayMessages([...this.sentHistory.messages, nextMessage], { + maxChars: this.maxChars, + }), + ); + if (projectedMessages.length <= this.sentHistory.messages.length) { + this.sentHistory = buildPaginatedSessionHistory({ + messages: projectedMessages, + hasMore: false, + }); + return { shouldRefresh: true }; + } + const projectedMessage = projectedMessages.at(-1) ?? sanitizedMessage; + const nextMessages = [...this.sentHistory.messages, projectedMessage]; this.sentHistory = buildPaginatedSessionHistory({ messages: nextMessages, hasMore: false, }); return { - message: sanitizedMessage, - messageSeq: resolveMessageSeq(sanitizedMessage), + message: projectedMessage, + messageSeq: resolveMessageSeq(projectedMessage), }; } diff --git a/src/plugin-sdk/reply-payload.test.ts b/src/plugin-sdk/reply-payload.test.ts index 47f855e8a5e..93585f9bcdc 100644 --- a/src/plugin-sdk/reply-payload.test.ts +++ b/src/plugin-sdk/reply-payload.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it, vi } from "vitest"; import { + buildTtsSupplementMediaPayload, countOutboundMedia, createNormalizedOutboundDeliverer, deliverFormattedTextWithAttachments, @@ -7,8 +8,10 @@ import { hasOutboundMedia, hasOutboundReplyContent, hasOutboundText, + getReplyPayloadTtsSupplement, isReasoningReplyPayload, isNumericTargetId, + markReplyPayloadAsTtsSupplement, normalizeOutboundReplyPayload, resolveOutboundMediaUrls, resolveSendableOutboundReplyParts, @@ -255,6 +258,48 @@ describe("normalizeOutboundReplyPayload", () => { }); }); +describe("TTS supplement payload helpers", () => { + it("marks media payloads as TTS supplements without treating spokenText alone as enough", () => { + const explicitTtsCommandPayload = { + mediaUrl: "file:///tmp/tts.mp3", + spokenText: "read this", + }; + + expect(getReplyPayloadTtsSupplement(explicitTtsCommandPayload)).toBeUndefined(); + + const marked = markReplyPayloadAsTtsSupplement(explicitTtsCommandPayload, "read this"); + + expect(getReplyPayloadTtsSupplement(marked)).toEqual({ spokenText: "read this" }); + expect( + getReplyPayloadTtsSupplement( + markReplyPayloadAsTtsSupplement(explicitTtsCommandPayload, "read this", { + visibleTextAlreadyDelivered: true, + }), + ), + ).toEqual({ spokenText: "read this", visibleTextAlreadyDelivered: true }); + }); + + it("strips visible content while keeping TTS supplement media fallback text", () => { + expect( + buildTtsSupplementMediaPayload( + markReplyPayloadAsTtsSupplement({ + text: "visible", + mediaUrl: "file:///tmp/tts.mp3", + audioAsVoice: true, + presentation: { title: "visible", blocks: [] }, + interactive: { blocks: [] }, + btw: { question: "side" }, + }), + ), + ).toEqual({ + mediaUrl: "file:///tmp/tts.mp3", + audioAsVoice: true, + spokenText: "visible", + ttsSupplement: { spokenText: "visible" }, + }); + }); +}); + describe("resolveOutboundMediaUrls", () => { it.each([ { diff --git a/src/plugin-sdk/reply-payload.ts b/src/plugin-sdk/reply-payload.ts index ed0b5147c24..889375ede8c 100644 --- a/src/plugin-sdk/reply-payload.ts +++ b/src/plugin-sdk/reply-payload.ts @@ -7,6 +7,13 @@ import { normalizeLowercaseStringOrEmpty, readStringValue } from "../shared/stri export type { MediaPayload, MediaPayloadInput } from "../channels/plugins/media-payload.js"; export { buildMediaPayload } from "../channels/plugins/media-payload.js"; export type ReplyPayload = Omit; +export type { ReplyPayloadTtsSupplement } from "../auto-reply/reply-payload.js"; +export { + buildTtsSupplementMediaPayload, + getReplyPayloadTtsSupplement, + isReplyPayloadTtsSupplement, + markReplyPayloadAsTtsSupplement, +} from "../auto-reply/reply-payload.js"; export type OutboundReplyPayload = { text?: string;