From 65ef70b0702d6eb0ffda587fa5b2622274283f16 Mon Sep 17 00:00:00 2001 From: Alvin <48358093+TigerInYourDream@users.noreply.github.com> Date: Fri, 10 Apr 2026 21:47:43 +0800 Subject: [PATCH] feat(matrix): add MSC4357 live streaming markers to draft-stream edits (#63513) Merged via squash. Prepared head SHA: 87a866a238f829e1eccbd8cd77ceb43fc61d82c8 Co-authored-by: TigerInYourDream <48358093+TigerInYourDream@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 2 + .../matrix/src/matrix/draft-stream.test.ts | 50 +++++++ extensions/matrix/src/matrix/draft-stream.ts | 44 +++++- .../matrix/src/matrix/monitor/handler.test.ts | 134 +++++++++++++++++- .../matrix/src/matrix/monitor/handler.ts | 33 +++-- .../matrix/src/matrix/monitor/replies.ts | 6 +- extensions/matrix/src/matrix/send.ts | 19 +++ extensions/matrix/src/matrix/send/types.ts | 10 ++ 8 files changed, 281 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e989e571ffe..bde5693f651 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ Docs: https://docs.openclaw.ai - QA/testing: add a `--runner multipass` lane for `openclaw qa suite` so repo-backed QA scenarios can run inside a disposable Linux VM and write back the usual report, summary, and VM logs. (#63426) Thanks @shakkernerd. - Docs i18n: chunk raw doc translation, reject truncated tagged outputs, avoid ambiguous body-only wrapper unwrapping, and recover from terminated Pi translation sessions without changing the default `openai/gpt-5.4` path. (#62969, #63808) Thanks @hxy91819. - Control UI/dreaming: simplify the Scene and Diary surfaces, preserve unknown phase state for partial status payloads, and stabilize waiting-entry recency ordering so Dreaming status and review lists stay clear and deterministic. (#64035) Thanks @davemorin. +- Gateway: split startup and runtime seams so gateway lifecycle sequencing, reload state, and shutdown behavior stay easier to maintain without changing observed behavior. (#63975) Thanks @gumadeiras. +- Matrix/partial streaming: add MSC4357 live markers to draft preview sends and edits so supporting Matrix clients can render a live/typewriter animation and stop it when the final edit lands. (#63513) Thanks @TigerInYourDream. ### Fixes diff --git a/extensions/matrix/src/matrix/draft-stream.test.ts b/extensions/matrix/src/matrix/draft-stream.test.ts index b5b5bbab980..e9239f5a822 100644 --- a/extensions/matrix/src/matrix/draft-stream.test.ts +++ b/extensions/matrix/src/matrix/draft-stream.test.ts @@ -203,6 +203,56 @@ describe("createMatrixDraftStream", () => { expect(eventId).toBe("$evt1"); }); + it("stop does not finalize live drafts on its own", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + mode: "partial", + }); + + stream.update("Hello"); + await stream.stop(); + + expect(sendMessageMock).toHaveBeenCalledTimes(1); + expect(sendMessageMock.mock.calls[0]?.[1]).toHaveProperty("org.matrix.msc4357.live"); + }); + + it("finalizeLive clears the live marker at most once", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + mode: "partial", + }); + + stream.update("Hello"); + await stream.stop(); + + await stream.finalizeLive(); + await stream.finalizeLive(); + + expect(sendMessageMock).toHaveBeenCalledTimes(2); + expect(sendMessageMock.mock.calls[1]?.[1]).not.toHaveProperty("org.matrix.msc4357.live"); + }); + + it("marks live finalize failures for normal final delivery fallback", async () => { + sendMessageMock.mockResolvedValueOnce("$evt1").mockRejectedValueOnce(new Error("rate limited")); + + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + mode: "partial", + }); + + stream.update("Hello"); + await stream.stop(); + + await expect(stream.finalizeLive()).resolves.toBe(false); + expect(stream.mustDeliverFinalNormally()).toBe(true); + }); + it("reset allows reuse for next block", async () => { sendMessageMock.mockResolvedValueOnce("$first").mockResolvedValueOnce("$second"); diff --git a/extensions/matrix/src/matrix/draft-stream.ts b/extensions/matrix/src/matrix/draft-stream.ts index c9ca5c9b01c..1d18c4c3e0e 100644 --- a/extensions/matrix/src/matrix/draft-stream.ts +++ b/extensions/matrix/src/matrix/draft-stream.ts @@ -29,6 +29,8 @@ export type MatrixDraftStream = { flush: () => Promise; /** Flush and mark this block as done. Returns the event ID if a message was sent. */ stop: () => Promise; + /** Clear the MSC4357 live marker in place when the draft is kept as final text. */ + finalizeLive: () => Promise; /** Reset state for the next text block (after tool calls). */ reset: () => void; /** The event ID of the current draft message, if any. */ @@ -53,12 +55,17 @@ export function createMatrixDraftStream(params: { }): MatrixDraftStream { const { roomId, client, cfg, threadId, accountId, log } = params; const preview = resolveDraftPreviewOptions(params.mode ?? "partial"); + // MSC4357 live markers are only useful for "partial" mode where users see + // the draft evolve. "quiet" mode uses m.notice for background previews + // where a streaming animation would be unexpected. + const useLive = params.mode !== "quiet"; let currentEventId: string | undefined; let lastSentText = ""; let stopped = false; let sendFailed = false; let finalizeInPlaceBlocked = false; + let liveFinalized = false; let replyToId = params.replyToId; const sendOrEdit = async (text: string): Promise => { @@ -94,10 +101,11 @@ export function createMatrixDraftStream(params: { accountId, msgtype: preview.msgtype, includeMentions: preview.includeMentions, + live: useLive, }); currentEventId = result.messageId; lastSentText = preparedText.trimmedText; - log?.(`draft-stream: created message ${currentEventId}`); + log?.(`draft-stream: created message ${currentEventId}${useLive ? " (MSC4357 live)" : ""}`); } else { await editMessageMatrix(roomId, currentEventId, preparedText.trimmedText, { client, @@ -106,6 +114,7 @@ export function createMatrixDraftStream(params: { accountId, msgtype: preview.msgtype, includeMentions: preview.includeMentions, + live: useLive, }); lastSentText = preparedText.trimmedText; } @@ -133,6 +142,37 @@ export function createMatrixDraftStream(params: { log?.(`draft-stream: ready (throttleMs=${DEFAULT_THROTTLE_MS})`); + const finalizeLive = async (): Promise => { + // Send a final edit without the MSC4357 live marker to signal that + // the stream is complete. Supporting clients will stop the streaming + // animation and display the final content. + if (useLive && !liveFinalized && currentEventId && lastSentText) { + liveFinalized = true; + try { + await editMessageMatrix(roomId, currentEventId, lastSentText, { + client, + cfg, + threadId, + accountId, + msgtype: preview.msgtype, + includeMentions: preview.includeMentions, + live: false, + }); + log?.(`draft-stream: finalized ${currentEventId} (MSC4357 stream ended)`); + return true; + } catch (err) { + log?.(`draft-stream: finalize edit failed: ${String(err)}`); + // If the finalize edit fails, the live marker remains on the last + // successful edit. Flag the stream so callers can fall back to + // normal final delivery or redaction instead of leaving the message + // stuck in a "still streaming" state for MSC4357 clients. + finalizeInPlaceBlocked = true; + return false; + } + } + return true; + }; + const stop = async (): Promise => { // Flush before marking stopped so the loop can drain pending text. await loop.flush(); @@ -149,6 +189,7 @@ export function createMatrixDraftStream(params: { stopped = false; sendFailed = false; finalizeInPlaceBlocked = false; + liveFinalized = false; loop.resetPending(); loop.resetThrottleWindow(); }; @@ -162,6 +203,7 @@ export function createMatrixDraftStream(params: { }, flush: loop.flush, stop, + finalizeLive, reset, eventId: () => currentEventId, matchesPreparedText: (text: string) => diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 8a4b5a4bfcf..7580165a346 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -48,7 +48,7 @@ vi.mock("../send.js", () => ({ sendTypingMatrix: vi.fn(async () => {}), })); -const deliverMatrixRepliesMock = vi.hoisted(() => vi.fn(async () => {})); +const deliverMatrixRepliesMock = vi.hoisted(() => vi.fn(async () => true)); vi.mock("./replies.js", () => ({ deliverMatrixReplies: deliverMatrixRepliesMock, @@ -2005,7 +2005,7 @@ describe("matrix monitor handler draft streaming", () => { .mockReset() .mockResolvedValue({ messageId: "$draft1", roomId: "!room" }); editMessageMatrixMock.mockReset().mockResolvedValue("$edited"); - deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined); + deliverMatrixRepliesMock.mockReset().mockResolvedValue(true); const redactEventMock = vi.fn(async () => "$redacted"); @@ -2119,7 +2119,15 @@ describe("matrix monitor handler draft streaming", () => { await deliver({ text: "Single block" }, { kind: "final" }); - expect(editMessageMatrixMock).not.toHaveBeenCalled(); + // MSC4357: even when text is unchanged, a finalize edit is sent to clear + // the live marker so supporting clients stop the streaming animation. + expect(editMessageMatrixMock).toHaveBeenCalledTimes(1); + expect(editMessageMatrixMock).toHaveBeenCalledWith( + "!room:example.org", + "$draft1", + "Single block", + expect.objectContaining({ live: false }), + ); expect(deliverMatrixRepliesMock).not.toHaveBeenCalled(); expect(redactEventMock).not.toHaveBeenCalled(); await finish(); @@ -2139,13 +2147,12 @@ describe("matrix monitor handler draft streaming", () => { await deliver({ text: "Single block" }, { kind: "final" }); + expect(editMessageMatrixMock).toHaveBeenCalledTimes(1); expect(editMessageMatrixMock).toHaveBeenCalledWith( "!room:example.org", "$draft1", "Single block", - expect.not.objectContaining({ - extraContent: { [MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY]: true }, - }), + expect.not.objectContaining({ live: false }), ); expect(deliverMatrixRepliesMock).not.toHaveBeenCalled(); expect(redactEventMock).not.toHaveBeenCalled(); @@ -2523,12 +2530,14 @@ describe("matrix monitor handler draft streaming", () => { .mockReset() .mockResolvedValue({ messageId: "$draft1", roomId: "!room" }); editMessageMatrixMock.mockReset().mockResolvedValue("$edited"); - deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined); + deliverMatrixRepliesMock.mockReset().mockResolvedValue(true); + const redactEventMock = vi.fn(async () => "$redacted"); let capturedReplyOpts: ReplyOpts | undefined; const { handler } = createMatrixHandlerTestHarness({ streaming: "quiet", + client: { redactEvent: redactEventMock }, createReplyDispatcherWithTyping: () => ({ dispatcher: { markComplete: () => {}, waitForIdle: async () => {} }, replyOptions: {}, @@ -2561,6 +2570,8 @@ describe("matrix monitor handler draft streaming", () => { createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), ); + expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); + // After handler exits, draft stream timer must not fire. sendSingleTextMessageMatrixMock.mockClear(); editMessageMatrixMock.mockClear(); @@ -2572,6 +2583,73 @@ describe("matrix monitor handler draft streaming", () => { } }); + it("redacts partial live drafts when generation aborts mid-stream", async () => { + sendSingleTextMessageMatrixMock + .mockReset() + .mockResolvedValue({ messageId: "$draft1", roomId: "!room" }); + editMessageMatrixMock.mockReset().mockResolvedValue("$edited"); + deliverMatrixRepliesMock.mockReset().mockResolvedValue(true); + + const redactEventMock = vi.fn(async () => "$redacted"); + let capturedReplyOpts: ReplyOpts | undefined; + + const { handler } = createMatrixHandlerTestHarness({ + streaming: "partial", + client: { redactEvent: redactEventMock }, + createReplyDispatcherWithTyping: () => ({ + dispatcher: { markComplete: () => {}, waitForIdle: async () => {} }, + replyOptions: {}, + markDispatchIdle: () => {}, + markRunComplete: () => {}, + }), + dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => { + capturedReplyOpts = args?.replyOptions; + capturedReplyOpts?.onPartialReply?.({ text: "partial" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + throw new Error("model timeout"); + }) as never, + withReplyDispatcher: async (params: { + dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => { + const result = await params.run(); + await params.onSettled?.(); + return result; + }, + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), + ); + + expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); + }); + + it("keeps shutdown cleanup for empty final payloads that send nothing", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ streaming: "partial" }); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "Partial reply" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + deliverMatrixRepliesMock.mockClear(); + deliverMatrixRepliesMock.mockResolvedValue(false); + await deliver({}, { kind: "final" }); + + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + expect(redactEventMock).not.toHaveBeenCalled(); + + await finish(); + + expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); + }); + it("skips compaction notices in draft finalization", async () => { const { dispatch } = createStreamingHarness(); const { deliver, opts, finish } = await dispatch(); @@ -2605,6 +2683,7 @@ describe("matrix monitor handler draft streaming", () => { deliverMatrixRepliesMock.mockClear(); await deliver({ text: "Final text", replyToId: "$different_msg" }, { kind: "final" }); + expect(editMessageMatrixMock).not.toHaveBeenCalled(); // Draft should be redacted since it can't change reply relation. expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); // Final answer delivered via normal path. @@ -2630,6 +2709,7 @@ describe("matrix monitor handler draft streaming", () => { deliverMatrixRepliesMock.mockClear(); await deliver({ text: "Final text" }, { kind: "final" }); + expect(editMessageMatrixMock).not.toHaveBeenCalled(); expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); await finish(); @@ -2647,11 +2727,51 @@ describe("matrix monitor handler draft streaming", () => { deliverMatrixRepliesMock.mockClear(); await deliver({ mediaUrl: "https://example.com/image.png" }, { kind: "final" }); + expect(editMessageMatrixMock).not.toHaveBeenCalled(); expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); await finish(); }); + it("finalizes partial drafts before reusing unchanged media captions", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ streaming: "partial" }); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "@room screenshot ready" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + deliverMatrixRepliesMock.mockClear(); + await deliver( + { + text: "@room screenshot ready", + mediaUrl: "https://example.com/image.png", + }, + { kind: "final" }, + ); + + expect(editMessageMatrixMock).toHaveBeenCalledTimes(1); + expect(editMessageMatrixMock).toHaveBeenCalledWith( + "!room:example.org", + "$draft1", + "@room screenshot ready", + expect.objectContaining({ live: false }), + ); + expect(redactEventMock).not.toHaveBeenCalled(); + expect(deliverMatrixRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [ + expect.objectContaining({ + mediaUrl: "https://example.com/image.png", + text: undefined, + }), + ], + }), + ); + await finish(); + }); + it("finalizes quiet drafts before reusing unchanged media captions", async () => { const { dispatch, redactEventMock } = createStreamingHarness({ streaming: "quiet" }); const { deliver, opts, finish } = await dispatch(); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 99c33bf5f7e..73711345710 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -412,6 +412,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const eventId = typeof event.event_id === "string" ? event.event_id.trim() : ""; let claimedInboundEvent = false; let draftStreamRef: ReturnType | undefined; + let draftConsumed = false; try { const eventType = event.type; if (eventType === EventType.RoomMessageEncrypted) { @@ -1330,9 +1331,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const pendingDraftBoundaries: PendingDraftBoundary[] = []; const latestQueuedDraftBoundaryOffsets = new Map(); let currentDraftReplyToId = draftReplyToId; - // Set after the first final payload consumes the draft event so - // subsequent finals go through normal delivery. - let draftConsumed = false; + // Set after the first final payload consumes or discards the draft event + // so subsequent finals go through normal delivery. const getDisplayableDraftText = () => { const nextDraftBoundaryOffset = pendingDraftBoundaries.find( @@ -1448,6 +1448,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam ? buildMatrixFinalizedPreviewContent() : undefined, }); + } else if (!(await draftStream.finalizeLive())) { + throw new Error("Matrix draft live finalize failed"); } } catch { await redactMatrixDraftEvent(client, roomId, draftEventId); @@ -1469,10 +1471,15 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } else if (draftEventId && hasMedia && !payloadReplyMismatch) { let textEditOk = !mustDeliverFinalNormally; const payloadText = payload.text; + const payloadTextMatchesDraft = + typeof payloadText === "string" && draftStream.matchesPreparedText(payloadText); + const reusesDraftTextUnchanged = + typeof payloadText === "string" && + Boolean(payloadText.trim()) && + payloadTextMatchesDraft; const requiresFinalTextEdit = quietDraftStreaming || - (typeof payloadText === "string" && - !draftStream.matchesPreparedText(payloadText)); + (typeof payloadText === "string" && !payloadTextMatchesDraft); if (textEditOk && payloadText && requiresFinalTextEdit) { textEditOk = await editMessageMatrix(roomId, draftEventId, payloadText, { client, @@ -1486,6 +1493,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam () => true, () => false, ); + } else if (textEditOk && reusesDraftTextUnchanged) { + textEditOk = await draftStream.finalizeLive(); } const reusesDraftAsFinalText = Boolean(payload.text?.trim()) && textEditOk; if (!reusesDraftAsFinalText) { @@ -1508,10 +1517,12 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); draftConsumed = true; } else { - if (draftEventId && (payloadReplyMismatch || mustDeliverFinalNormally)) { + const draftRedacted = + Boolean(draftEventId) && (payloadReplyMismatch || mustDeliverFinalNormally); + if (draftRedacted && draftEventId) { await redactMatrixDraftEvent(client, roomId, draftEventId); } - await deliverMatrixReplies({ + const deliveredFallback = await deliverMatrixReplies({ cfg, replies: [payload], roomId, @@ -1524,6 +1535,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam mediaLocalRoots, tableMode, }); + if (draftRedacted || deliveredFallback) { + draftConsumed = true; + } } if (info.kind === "block") { @@ -1652,7 +1666,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam // Stop the draft stream timer so partial drafts don't leak if the // model run throws or times out mid-stream. if (draftStreamRef) { - await draftStreamRef.stop().catch(() => {}); + const draftEventId = await draftStreamRef.stop().catch(() => undefined); + if (draftEventId && !draftConsumed) { + await redactMatrixDraftEvent(client, roomId, draftEventId); + } } if (claimedInboundEvent && inboundDeduper && eventId) { inboundDeduper.releaseEvent({ roomId, eventId }); diff --git a/extensions/matrix/src/matrix/monitor/replies.ts b/extensions/matrix/src/matrix/monitor/replies.ts index 016ed980c9c..42cdaa47e5a 100644 --- a/extensions/matrix/src/matrix/monitor/replies.ts +++ b/extensions/matrix/src/matrix/monitor/replies.ts @@ -41,7 +41,7 @@ export async function deliverMatrixReplies(params: { accountId?: string; mediaLocalRoots?: readonly string[]; tableMode?: MarkdownTableMode; -}): Promise { +}): Promise { const core = getMatrixRuntime(); const tableMode = params.tableMode ?? @@ -56,6 +56,7 @@ export async function deliverMatrixReplies(params: { } }; let hasReplied = false; + let deliveredAny = false; for (const reply of params.replies) { if (reply.isReasoning === true || shouldSuppressReasoningReplyText(reply.text)) { logVerbose("matrix reply suppressed as reasoning-only"); @@ -102,6 +103,7 @@ export async function deliverMatrixReplies(params: { threadId: params.threadId, accountId: params.accountId, }); + deliveredAny = true; sentTextChunk = true; } if (replyToIdForReply && !hasReplied && sentTextChunk) { @@ -123,10 +125,12 @@ export async function deliverMatrixReplies(params: { audioAsVoice: reply.audioAsVoice, accountId: params.accountId, }); + deliveredAny = true; first = false; } if (replyToIdForReply && !hasReplied) { hasReplied = true; } } + return deliveredAny; } diff --git a/extensions/matrix/src/matrix/send.ts b/extensions/matrix/src/matrix/send.ts index 7c40e9b41cc..b2f63a6e5e3 100644 --- a/extensions/matrix/src/matrix/send.ts +++ b/extensions/matrix/src/matrix/send.ts @@ -31,6 +31,7 @@ import { import { normalizeThreadId, resolveMatrixRoomId } from "./send/targets.js"; import { EventType, + MSC4357_LIVE_KEY, MsgType, RelationType, type MatrixExtraContentFields, @@ -413,6 +414,8 @@ export async function sendSingleTextMessageMatrix( msgtype?: MatrixTextMsgType; includeMentions?: boolean; extraContent?: MatrixExtraContentFields; + /** When true, marks the message as a live/streaming update (MSC4357). */ + live?: boolean; } = {}, ): Promise { const { trimmedText, convertedText, singleEventLimit, fitsInSingleEvent } = @@ -452,6 +455,11 @@ export async function sendSingleTextMessageMatrix( markdown: convertedText, includeMentions: opts.includeMentions, }); + // MSC4357: mark the initial message as live so supporting clients start + // rendering a streaming animation immediately. + if (opts.live) { + (content as Record)[MSC4357_LIVE_KEY] = {}; + } const eventId = await client.sendMessage(resolvedRoom, content); return { messageId: eventId ?? "unknown", @@ -492,6 +500,8 @@ export async function editMessageMatrix( msgtype?: MatrixTextMsgType; includeMentions?: boolean; extraContent?: MatrixExtraContentFields; + /** When true, marks the edit as a live/streaming update (MSC4357). */ + live?: boolean; } = {}, ): Promise { return await withResolvedMatrixSendClient( @@ -561,6 +571,15 @@ export async function editMessageMatrix( content["m.mentions"] = replaceMentions; } + // MSC4357: mark in-progress edits so supporting clients can render a + // streaming animation. The marker is placed in both the outer content + // (for unencrypted rooms / server-side aggregation) and inside + // m.new_content (for E2EE rooms where only decrypted content is read). + if (opts.live) { + content[MSC4357_LIVE_KEY] = {}; + (content["m.new_content"] as Record)[MSC4357_LIVE_KEY] = {}; + } + const eventId = await client.sendMessage(resolvedRoom, content); return eventId ?? ""; }, diff --git a/extensions/matrix/src/matrix/send/types.ts b/extensions/matrix/src/matrix/send/types.ts index 9d777753874..634b8c83eaa 100644 --- a/extensions/matrix/src/matrix/send/types.ts +++ b/extensions/matrix/src/matrix/send/types.ts @@ -122,3 +122,13 @@ export type MatrixFormattedContent = MessageEventContent & { }; export type MatrixExtraContentFields = Record; + +/** + * MSC4357 live marker key. + * When present on event content, signals that the message is still being + * streamed (e.g. an LLM generating a response). Supporting clients render + * the message with a streaming animation until an edit without this marker + * arrives, indicating the stream is complete. + * @see https://github.com/matrix-org/matrix-spec-proposals/pull/4357 + */ +export const MSC4357_LIVE_KEY = "org.matrix.msc4357.live" as const;