feat(matrix): add MSC4357 live streaming markers to draft-stream edits (#63513)

Merged via squash.

Prepared head SHA: 87a866a238
Co-authored-by: TigerInYourDream <48358093+TigerInYourDream@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Alvin
2026-04-10 21:47:43 +08:00
committed by GitHub
parent 3631ec1f54
commit 65ef70b070
8 changed files with 281 additions and 17 deletions

View File

@@ -14,6 +14,8 @@ Docs: https://docs.openclaw.ai
- QA/testing: add a `--runner multipass` lane for `openclaw qa suite` so repo-backed QA scenarios can run inside a disposable Linux VM and write back the usual report, summary, and VM logs. (#63426) Thanks @shakkernerd.
- Docs i18n: chunk raw doc translation, reject truncated tagged outputs, avoid ambiguous body-only wrapper unwrapping, and recover from terminated Pi translation sessions without changing the default `openai/gpt-5.4` path. (#62969, #63808) Thanks @hxy91819.
- Control UI/dreaming: simplify the Scene and Diary surfaces, preserve unknown phase state for partial status payloads, and stabilize waiting-entry recency ordering so Dreaming status and review lists stay clear and deterministic. (#64035) Thanks @davemorin.
- Gateway: split startup and runtime seams so gateway lifecycle sequencing, reload state, and shutdown behavior stay easier to maintain without changing observed behavior. (#63975) Thanks @gumadeiras.
- Matrix/partial streaming: add MSC4357 live markers to draft preview sends and edits so supporting Matrix clients can render a live/typewriter animation and stop it when the final edit lands. (#63513) Thanks @TigerInYourDream.
### Fixes

View File

@@ -203,6 +203,56 @@ describe("createMatrixDraftStream", () => {
expect(eventId).toBe("$evt1");
});
it("stop does not finalize live drafts on its own", async () => {
const stream = createMatrixDraftStream({
roomId: "!room:test",
client,
cfg: {} as import("../types.js").CoreConfig,
mode: "partial",
});
stream.update("Hello");
await stream.stop();
expect(sendMessageMock).toHaveBeenCalledTimes(1);
expect(sendMessageMock.mock.calls[0]?.[1]).toHaveProperty("org.matrix.msc4357.live");
});
it("finalizeLive clears the live marker at most once", async () => {
const stream = createMatrixDraftStream({
roomId: "!room:test",
client,
cfg: {} as import("../types.js").CoreConfig,
mode: "partial",
});
stream.update("Hello");
await stream.stop();
await stream.finalizeLive();
await stream.finalizeLive();
expect(sendMessageMock).toHaveBeenCalledTimes(2);
expect(sendMessageMock.mock.calls[1]?.[1]).not.toHaveProperty("org.matrix.msc4357.live");
});
it("marks live finalize failures for normal final delivery fallback", async () => {
sendMessageMock.mockResolvedValueOnce("$evt1").mockRejectedValueOnce(new Error("rate limited"));
const stream = createMatrixDraftStream({
roomId: "!room:test",
client,
cfg: {} as import("../types.js").CoreConfig,
mode: "partial",
});
stream.update("Hello");
await stream.stop();
await expect(stream.finalizeLive()).resolves.toBe(false);
expect(stream.mustDeliverFinalNormally()).toBe(true);
});
it("reset allows reuse for next block", async () => {
sendMessageMock.mockResolvedValueOnce("$first").mockResolvedValueOnce("$second");

View File

@@ -29,6 +29,8 @@ export type MatrixDraftStream = {
flush: () => Promise<void>;
/** Flush and mark this block as done. Returns the event ID if a message was sent. */
stop: () => Promise<string | undefined>;
/** Clear the MSC4357 live marker in place when the draft is kept as final text. */
finalizeLive: () => Promise<boolean>;
/** Reset state for the next text block (after tool calls). */
reset: () => void;
/** The event ID of the current draft message, if any. */
@@ -53,12 +55,17 @@ export function createMatrixDraftStream(params: {
}): MatrixDraftStream {
const { roomId, client, cfg, threadId, accountId, log } = params;
const preview = resolveDraftPreviewOptions(params.mode ?? "partial");
// MSC4357 live markers are only useful for "partial" mode where users see
// the draft evolve. "quiet" mode uses m.notice for background previews
// where a streaming animation would be unexpected.
const useLive = params.mode !== "quiet";
let currentEventId: string | undefined;
let lastSentText = "";
let stopped = false;
let sendFailed = false;
let finalizeInPlaceBlocked = false;
let liveFinalized = false;
let replyToId = params.replyToId;
const sendOrEdit = async (text: string): Promise<boolean> => {
@@ -94,10 +101,11 @@ export function createMatrixDraftStream(params: {
accountId,
msgtype: preview.msgtype,
includeMentions: preview.includeMentions,
live: useLive,
});
currentEventId = result.messageId;
lastSentText = preparedText.trimmedText;
log?.(`draft-stream: created message ${currentEventId}`);
log?.(`draft-stream: created message ${currentEventId}${useLive ? " (MSC4357 live)" : ""}`);
} else {
await editMessageMatrix(roomId, currentEventId, preparedText.trimmedText, {
client,
@@ -106,6 +114,7 @@ export function createMatrixDraftStream(params: {
accountId,
msgtype: preview.msgtype,
includeMentions: preview.includeMentions,
live: useLive,
});
lastSentText = preparedText.trimmedText;
}
@@ -133,6 +142,37 @@ export function createMatrixDraftStream(params: {
log?.(`draft-stream: ready (throttleMs=${DEFAULT_THROTTLE_MS})`);
const finalizeLive = async (): Promise<boolean> => {
// Send a final edit without the MSC4357 live marker to signal that
// the stream is complete. Supporting clients will stop the streaming
// animation and display the final content.
if (useLive && !liveFinalized && currentEventId && lastSentText) {
liveFinalized = true;
try {
await editMessageMatrix(roomId, currentEventId, lastSentText, {
client,
cfg,
threadId,
accountId,
msgtype: preview.msgtype,
includeMentions: preview.includeMentions,
live: false,
});
log?.(`draft-stream: finalized ${currentEventId} (MSC4357 stream ended)`);
return true;
} catch (err) {
log?.(`draft-stream: finalize edit failed: ${String(err)}`);
// If the finalize edit fails, the live marker remains on the last
// successful edit. Flag the stream so callers can fall back to
// normal final delivery or redaction instead of leaving the message
// stuck in a "still streaming" state for MSC4357 clients.
finalizeInPlaceBlocked = true;
return false;
}
}
return true;
};
const stop = async (): Promise<string | undefined> => {
// Flush before marking stopped so the loop can drain pending text.
await loop.flush();
@@ -149,6 +189,7 @@ export function createMatrixDraftStream(params: {
stopped = false;
sendFailed = false;
finalizeInPlaceBlocked = false;
liveFinalized = false;
loop.resetPending();
loop.resetThrottleWindow();
};
@@ -162,6 +203,7 @@ export function createMatrixDraftStream(params: {
},
flush: loop.flush,
stop,
finalizeLive,
reset,
eventId: () => currentEventId,
matchesPreparedText: (text: string) =>

View File

@@ -48,7 +48,7 @@ vi.mock("../send.js", () => ({
sendTypingMatrix: vi.fn(async () => {}),
}));
const deliverMatrixRepliesMock = vi.hoisted(() => vi.fn(async () => {}));
const deliverMatrixRepliesMock = vi.hoisted(() => vi.fn(async () => true));
vi.mock("./replies.js", () => ({
deliverMatrixReplies: deliverMatrixRepliesMock,
@@ -2005,7 +2005,7 @@ describe("matrix monitor handler draft streaming", () => {
.mockReset()
.mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
editMessageMatrixMock.mockReset().mockResolvedValue("$edited");
deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined);
deliverMatrixRepliesMock.mockReset().mockResolvedValue(true);
const redactEventMock = vi.fn(async () => "$redacted");
@@ -2119,7 +2119,15 @@ describe("matrix monitor handler draft streaming", () => {
await deliver({ text: "Single block" }, { kind: "final" });
expect(editMessageMatrixMock).not.toHaveBeenCalled();
// MSC4357: even when text is unchanged, a finalize edit is sent to clear
// the live marker so supporting clients stop the streaming animation.
expect(editMessageMatrixMock).toHaveBeenCalledTimes(1);
expect(editMessageMatrixMock).toHaveBeenCalledWith(
"!room:example.org",
"$draft1",
"Single block",
expect.objectContaining({ live: false }),
);
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
expect(redactEventMock).not.toHaveBeenCalled();
await finish();
@@ -2139,13 +2147,12 @@ describe("matrix monitor handler draft streaming", () => {
await deliver({ text: "Single block" }, { kind: "final" });
expect(editMessageMatrixMock).toHaveBeenCalledTimes(1);
expect(editMessageMatrixMock).toHaveBeenCalledWith(
"!room:example.org",
"$draft1",
"Single block",
expect.not.objectContaining({
extraContent: { [MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY]: true },
}),
expect.not.objectContaining({ live: false }),
);
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
expect(redactEventMock).not.toHaveBeenCalled();
@@ -2523,12 +2530,14 @@ describe("matrix monitor handler draft streaming", () => {
.mockReset()
.mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
editMessageMatrixMock.mockReset().mockResolvedValue("$edited");
deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined);
deliverMatrixRepliesMock.mockReset().mockResolvedValue(true);
const redactEventMock = vi.fn(async () => "$redacted");
let capturedReplyOpts: ReplyOpts | undefined;
const { handler } = createMatrixHandlerTestHarness({
streaming: "quiet",
client: { redactEvent: redactEventMock },
createReplyDispatcherWithTyping: () => ({
dispatcher: { markComplete: () => {}, waitForIdle: async () => {} },
replyOptions: {},
@@ -2561,6 +2570,8 @@ describe("matrix monitor handler draft streaming", () => {
createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
);
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
// After handler exits, draft stream timer must not fire.
sendSingleTextMessageMatrixMock.mockClear();
editMessageMatrixMock.mockClear();
@@ -2572,6 +2583,73 @@ describe("matrix monitor handler draft streaming", () => {
}
});
it("redacts partial live drafts when generation aborts mid-stream", async () => {
sendSingleTextMessageMatrixMock
.mockReset()
.mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
editMessageMatrixMock.mockReset().mockResolvedValue("$edited");
deliverMatrixRepliesMock.mockReset().mockResolvedValue(true);
const redactEventMock = vi.fn(async () => "$redacted");
let capturedReplyOpts: ReplyOpts | undefined;
const { handler } = createMatrixHandlerTestHarness({
streaming: "partial",
client: { redactEvent: redactEventMock },
createReplyDispatcherWithTyping: () => ({
dispatcher: { markComplete: () => {}, waitForIdle: async () => {} },
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
}),
dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => {
capturedReplyOpts = args?.replyOptions;
capturedReplyOpts?.onPartialReply?.({ text: "partial" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
throw new Error("model timeout");
}) as never,
withReplyDispatcher: async <T>(params: {
dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise<void> };
run: () => Promise<T>;
onSettled?: () => void | Promise<void>;
}) => {
const result = await params.run();
await params.onSettled?.();
return result;
},
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
);
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
});
it("keeps shutdown cleanup for empty final payloads that send nothing", async () => {
const { dispatch, redactEventMock } = createStreamingHarness({ streaming: "partial" });
const { deliver, opts, finish } = await dispatch();
opts.onPartialReply?.({ text: "Partial reply" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
deliverMatrixRepliesMock.mockClear();
deliverMatrixRepliesMock.mockResolvedValue(false);
await deliver({}, { kind: "final" });
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
expect(redactEventMock).not.toHaveBeenCalled();
await finish();
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
});
it("skips compaction notices in draft finalization", async () => {
const { dispatch } = createStreamingHarness();
const { deliver, opts, finish } = await dispatch();
@@ -2605,6 +2683,7 @@ describe("matrix monitor handler draft streaming", () => {
deliverMatrixRepliesMock.mockClear();
await deliver({ text: "Final text", replyToId: "$different_msg" }, { kind: "final" });
expect(editMessageMatrixMock).not.toHaveBeenCalled();
// Draft should be redacted since it can't change reply relation.
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
// Final answer delivered via normal path.
@@ -2630,6 +2709,7 @@ describe("matrix monitor handler draft streaming", () => {
deliverMatrixRepliesMock.mockClear();
await deliver({ text: "Final text" }, { kind: "final" });
expect(editMessageMatrixMock).not.toHaveBeenCalled();
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
@@ -2647,11 +2727,51 @@ describe("matrix monitor handler draft streaming", () => {
deliverMatrixRepliesMock.mockClear();
await deliver({ mediaUrl: "https://example.com/image.png" }, { kind: "final" });
expect(editMessageMatrixMock).not.toHaveBeenCalled();
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
});
it("finalizes partial drafts before reusing unchanged media captions", async () => {
const { dispatch, redactEventMock } = createStreamingHarness({ streaming: "partial" });
const { deliver, opts, finish } = await dispatch();
opts.onPartialReply?.({ text: "@room screenshot ready" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
deliverMatrixRepliesMock.mockClear();
await deliver(
{
text: "@room screenshot ready",
mediaUrl: "https://example.com/image.png",
},
{ kind: "final" },
);
expect(editMessageMatrixMock).toHaveBeenCalledTimes(1);
expect(editMessageMatrixMock).toHaveBeenCalledWith(
"!room:example.org",
"$draft1",
"@room screenshot ready",
expect.objectContaining({ live: false }),
);
expect(redactEventMock).not.toHaveBeenCalled();
expect(deliverMatrixRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replies: [
expect.objectContaining({
mediaUrl: "https://example.com/image.png",
text: undefined,
}),
],
}),
);
await finish();
});
it("finalizes quiet drafts before reusing unchanged media captions", async () => {
const { dispatch, redactEventMock } = createStreamingHarness({ streaming: "quiet" });
const { deliver, opts, finish } = await dispatch();

View File

@@ -412,6 +412,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const eventId = typeof event.event_id === "string" ? event.event_id.trim() : "";
let claimedInboundEvent = false;
let draftStreamRef: ReturnType<typeof createMatrixDraftStream> | undefined;
let draftConsumed = false;
try {
const eventType = event.type;
if (eventType === EventType.RoomMessageEncrypted) {
@@ -1330,9 +1331,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const pendingDraftBoundaries: PendingDraftBoundary[] = [];
const latestQueuedDraftBoundaryOffsets = new Map<number, number>();
let currentDraftReplyToId = draftReplyToId;
// Set after the first final payload consumes the draft event so
// subsequent finals go through normal delivery.
let draftConsumed = false;
// Set after the first final payload consumes or discards the draft event
// so subsequent finals go through normal delivery.
const getDisplayableDraftText = () => {
const nextDraftBoundaryOffset = pendingDraftBoundaries.find(
@@ -1448,6 +1448,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
? buildMatrixFinalizedPreviewContent()
: undefined,
});
} else if (!(await draftStream.finalizeLive())) {
throw new Error("Matrix draft live finalize failed");
}
} catch {
await redactMatrixDraftEvent(client, roomId, draftEventId);
@@ -1469,10 +1471,15 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
} else if (draftEventId && hasMedia && !payloadReplyMismatch) {
let textEditOk = !mustDeliverFinalNormally;
const payloadText = payload.text;
const payloadTextMatchesDraft =
typeof payloadText === "string" && draftStream.matchesPreparedText(payloadText);
const reusesDraftTextUnchanged =
typeof payloadText === "string" &&
Boolean(payloadText.trim()) &&
payloadTextMatchesDraft;
const requiresFinalTextEdit =
quietDraftStreaming ||
(typeof payloadText === "string" &&
!draftStream.matchesPreparedText(payloadText));
(typeof payloadText === "string" && !payloadTextMatchesDraft);
if (textEditOk && payloadText && requiresFinalTextEdit) {
textEditOk = await editMessageMatrix(roomId, draftEventId, payloadText, {
client,
@@ -1486,6 +1493,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
() => true,
() => false,
);
} else if (textEditOk && reusesDraftTextUnchanged) {
textEditOk = await draftStream.finalizeLive();
}
const reusesDraftAsFinalText = Boolean(payload.text?.trim()) && textEditOk;
if (!reusesDraftAsFinalText) {
@@ -1508,10 +1517,12 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
});
draftConsumed = true;
} else {
if (draftEventId && (payloadReplyMismatch || mustDeliverFinalNormally)) {
const draftRedacted =
Boolean(draftEventId) && (payloadReplyMismatch || mustDeliverFinalNormally);
if (draftRedacted && draftEventId) {
await redactMatrixDraftEvent(client, roomId, draftEventId);
}
await deliverMatrixReplies({
const deliveredFallback = await deliverMatrixReplies({
cfg,
replies: [payload],
roomId,
@@ -1524,6 +1535,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
mediaLocalRoots,
tableMode,
});
if (draftRedacted || deliveredFallback) {
draftConsumed = true;
}
}
if (info.kind === "block") {
@@ -1652,7 +1666,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
// Stop the draft stream timer so partial drafts don't leak if the
// model run throws or times out mid-stream.
if (draftStreamRef) {
await draftStreamRef.stop().catch(() => {});
const draftEventId = await draftStreamRef.stop().catch(() => undefined);
if (draftEventId && !draftConsumed) {
await redactMatrixDraftEvent(client, roomId, draftEventId);
}
}
if (claimedInboundEvent && inboundDeduper && eventId) {
inboundDeduper.releaseEvent({ roomId, eventId });

View File

@@ -41,7 +41,7 @@ export async function deliverMatrixReplies(params: {
accountId?: string;
mediaLocalRoots?: readonly string[];
tableMode?: MarkdownTableMode;
}): Promise<void> {
}): Promise<boolean> {
const core = getMatrixRuntime();
const tableMode =
params.tableMode ??
@@ -56,6 +56,7 @@ export async function deliverMatrixReplies(params: {
}
};
let hasReplied = false;
let deliveredAny = false;
for (const reply of params.replies) {
if (reply.isReasoning === true || shouldSuppressReasoningReplyText(reply.text)) {
logVerbose("matrix reply suppressed as reasoning-only");
@@ -102,6 +103,7 @@ export async function deliverMatrixReplies(params: {
threadId: params.threadId,
accountId: params.accountId,
});
deliveredAny = true;
sentTextChunk = true;
}
if (replyToIdForReply && !hasReplied && sentTextChunk) {
@@ -123,10 +125,12 @@ export async function deliverMatrixReplies(params: {
audioAsVoice: reply.audioAsVoice,
accountId: params.accountId,
});
deliveredAny = true;
first = false;
}
if (replyToIdForReply && !hasReplied) {
hasReplied = true;
}
}
return deliveredAny;
}

View File

@@ -31,6 +31,7 @@ import {
import { normalizeThreadId, resolveMatrixRoomId } from "./send/targets.js";
import {
EventType,
MSC4357_LIVE_KEY,
MsgType,
RelationType,
type MatrixExtraContentFields,
@@ -413,6 +414,8 @@ export async function sendSingleTextMessageMatrix(
msgtype?: MatrixTextMsgType;
includeMentions?: boolean;
extraContent?: MatrixExtraContentFields;
/** When true, marks the message as a live/streaming update (MSC4357). */
live?: boolean;
} = {},
): Promise<MatrixSendResult> {
const { trimmedText, convertedText, singleEventLimit, fitsInSingleEvent } =
@@ -452,6 +455,11 @@ export async function sendSingleTextMessageMatrix(
markdown: convertedText,
includeMentions: opts.includeMentions,
});
// MSC4357: mark the initial message as live so supporting clients start
// rendering a streaming animation immediately.
if (opts.live) {
(content as Record<string, unknown>)[MSC4357_LIVE_KEY] = {};
}
const eventId = await client.sendMessage(resolvedRoom, content);
return {
messageId: eventId ?? "unknown",
@@ -492,6 +500,8 @@ export async function editMessageMatrix(
msgtype?: MatrixTextMsgType;
includeMentions?: boolean;
extraContent?: MatrixExtraContentFields;
/** When true, marks the edit as a live/streaming update (MSC4357). */
live?: boolean;
} = {},
): Promise<string> {
return await withResolvedMatrixSendClient(
@@ -561,6 +571,15 @@ export async function editMessageMatrix(
content["m.mentions"] = replaceMentions;
}
// MSC4357: mark in-progress edits so supporting clients can render a
// streaming animation. The marker is placed in both the outer content
// (for unencrypted rooms / server-side aggregation) and inside
// m.new_content (for E2EE rooms where only decrypted content is read).
if (opts.live) {
content[MSC4357_LIVE_KEY] = {};
(content["m.new_content"] as Record<string, unknown>)[MSC4357_LIVE_KEY] = {};
}
const eventId = await client.sendMessage(resolvedRoom, content);
return eventId ?? "";
},

View File

@@ -122,3 +122,13 @@ export type MatrixFormattedContent = MessageEventContent & {
};
export type MatrixExtraContentFields = Record<string, unknown>;
/**
* MSC4357 live marker key.
* When present on event content, signals that the message is still being
* streamed (e.g. an LLM generating a response). Supporting clients render
* the message with a streaming animation until an edit without this marker
* arrives, indicating the stream is complete.
* @see https://github.com/matrix-org/matrix-spec-proposals/pull/4357
*/
export const MSC4357_LIVE_KEY = "org.matrix.msc4357.live" as const;