mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-15 03:01:02 +00:00
fix(matrix): quiet streamed preview notifications
This commit is contained in:
@@ -196,6 +196,7 @@ done:
|
||||
- `streaming: "partial"` creates one editable preview message for the current assistant block instead of sending multiple partial messages.
|
||||
- `blockStreaming: true` enables separate Matrix progress messages. With `streaming: "partial"`, Matrix keeps the live draft for the current block and preserves completed blocks as separate messages.
|
||||
- When `streaming: "partial"` and `blockStreaming` is off, Matrix only edits the live draft and sends the completed reply once that block or turn finishes.
|
||||
- Draft preview events use quiet Matrix notices, so notifications fire on completed blocks or the final completed reply instead of the first streamed token.
|
||||
- If the preview no longer fits in one Matrix event, OpenClaw stops preview streaming and falls back to normal final delivery.
|
||||
- Media replies still send attachments normally. If a stale preview can no longer be reused safely, OpenClaw redacts it before sending the final media reply.
|
||||
- Preview edits cost extra Matrix API calls. Leave streaming off if you want the most conservative rate-limit behavior.
|
||||
|
||||
@@ -80,6 +80,10 @@ describe("createMatrixDraftStream", () => {
|
||||
await stream.flush();
|
||||
|
||||
expect(sendMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(sendMessageMock.mock.calls[0]?.[1]).toMatchObject({
|
||||
msgtype: "m.notice",
|
||||
});
|
||||
expect(sendMessageMock.mock.calls[0]?.[1]).not.toHaveProperty("m.mentions");
|
||||
expect(stream.eventId()).toBe("$evt1");
|
||||
});
|
||||
|
||||
@@ -102,6 +106,10 @@ describe("createMatrixDraftStream", () => {
|
||||
|
||||
// First call = initial send, second call = edit (both go through sendMessage)
|
||||
expect(sendMessageMock).toHaveBeenCalledTimes(2);
|
||||
expect(sendMessageMock.mock.calls[1]?.[1]).toMatchObject({
|
||||
msgtype: "m.notice",
|
||||
"m.new_content": { msgtype: "m.notice" },
|
||||
});
|
||||
});
|
||||
|
||||
it("coalesces rapid updates within throttle window", async () => {
|
||||
@@ -122,6 +130,11 @@ describe("createMatrixDraftStream", () => {
|
||||
expect(sendMessageMock.mock.calls[0][1]).toMatchObject({ body: "A" });
|
||||
// Edit uses "* <text>" prefix per Matrix m.replace spec.
|
||||
expect(sendMessageMock.mock.calls[1][1]).toMatchObject({ body: "* ABC" });
|
||||
expect(sendMessageMock.mock.calls[0][1]).toMatchObject({ msgtype: "m.notice" });
|
||||
expect(sendMessageMock.mock.calls[1][1]).toMatchObject({
|
||||
msgtype: "m.notice",
|
||||
"m.new_content": { msgtype: "m.notice" },
|
||||
});
|
||||
});
|
||||
|
||||
it("skips no-op updates", async () => {
|
||||
@@ -296,7 +309,6 @@ describe("createMatrixDraftStream", () => {
|
||||
|
||||
expect(sendMessageMock).not.toHaveBeenCalled();
|
||||
expect(stream.eventId()).toBeUndefined();
|
||||
expect(stream.mustDeliverFinalNormally()).toBe(true);
|
||||
expect(log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("preview exceeded single-event limit"),
|
||||
);
|
||||
@@ -317,7 +329,6 @@ describe("createMatrixDraftStream", () => {
|
||||
await stream.flush();
|
||||
|
||||
expect(sendMessageMock).not.toHaveBeenCalled();
|
||||
expect(stream.mustDeliverFinalNormally()).toBe(true);
|
||||
expect(log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("preview exceeded single-event limit"),
|
||||
);
|
||||
|
||||
@@ -2,8 +2,10 @@ import { createDraftStreamLoop } from "openclaw/plugin-sdk/channel-lifecycle";
|
||||
import type { CoreConfig } from "../types.js";
|
||||
import type { MatrixClient } from "./sdk.js";
|
||||
import { editMessageMatrix, prepareMatrixSingleText, sendSingleTextMessageMatrix } from "./send.js";
|
||||
import { MsgType } from "./send/types.js";
|
||||
|
||||
const DEFAULT_THROTTLE_MS = 1000;
|
||||
const DRAFT_PREVIEW_MSGTYPE = MsgType.Notice;
|
||||
|
||||
export type MatrixDraftStream = {
|
||||
/** Update the draft with the latest accumulated text for the current block. */
|
||||
@@ -18,8 +20,6 @@ export type MatrixDraftStream = {
|
||||
eventId: () => string | undefined;
|
||||
/** The last text successfully sent or edited. */
|
||||
lastSentText: () => string;
|
||||
/** True when preview streaming must fall back to normal final delivery. */
|
||||
mustDeliverFinalNormally: () => boolean;
|
||||
};
|
||||
|
||||
export function createMatrixDraftStream(params: {
|
||||
@@ -38,8 +38,6 @@ export function createMatrixDraftStream(params: {
|
||||
let currentEventId: string | undefined;
|
||||
let lastSentText = "";
|
||||
let stopped = false;
|
||||
let sendFailed = false;
|
||||
let finalizeInPlaceBlocked = false;
|
||||
let replyToId = params.replyToId;
|
||||
|
||||
const sendOrEdit = async (text: string): Promise<boolean> => {
|
||||
@@ -49,21 +47,12 @@ export function createMatrixDraftStream(params: {
|
||||
}
|
||||
const preparedText = prepareMatrixSingleText(trimmed, { cfg, accountId });
|
||||
if (!preparedText.fitsInSingleEvent) {
|
||||
finalizeInPlaceBlocked = true;
|
||||
if (!currentEventId) {
|
||||
sendFailed = true;
|
||||
}
|
||||
stopped = true;
|
||||
log?.(
|
||||
`draft-stream: preview exceeded single-event limit (${preparedText.convertedText.length} > ${preparedText.singleEventLimit})`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
// If the initial send failed, stop trying for this block. The deliver
|
||||
// callback will fall back to deliverMatrixReplies.
|
||||
if (sendFailed) {
|
||||
return false;
|
||||
}
|
||||
if (preparedText.trimmedText === lastSentText) {
|
||||
return true;
|
||||
}
|
||||
@@ -75,6 +64,8 @@ export function createMatrixDraftStream(params: {
|
||||
replyToId,
|
||||
threadId,
|
||||
accountId,
|
||||
msgtype: DRAFT_PREVIEW_MSGTYPE,
|
||||
includeMentions: false,
|
||||
});
|
||||
currentEventId = result.messageId;
|
||||
lastSentText = preparedText.trimmedText;
|
||||
@@ -85,25 +76,14 @@ export function createMatrixDraftStream(params: {
|
||||
cfg,
|
||||
threadId,
|
||||
accountId,
|
||||
msgtype: DRAFT_PREVIEW_MSGTYPE,
|
||||
includeMentions: false,
|
||||
});
|
||||
lastSentText = preparedText.trimmedText;
|
||||
}
|
||||
return true;
|
||||
} catch (err) {
|
||||
log?.(`draft-stream: send/edit failed: ${String(err)}`);
|
||||
const isPreviewLimitError =
|
||||
err instanceof Error && err.message.startsWith("Matrix single-message text exceeds limit");
|
||||
if (isPreviewLimitError) {
|
||||
// Once the preview no longer fits in one editable event, preserve the
|
||||
// current preview as-is and fall back to normal final delivery.
|
||||
finalizeInPlaceBlocked = true;
|
||||
}
|
||||
if (!currentEventId) {
|
||||
// First send failed — give up for this block so the deliver callback
|
||||
// falls through to normal delivery.
|
||||
sendFailed = true;
|
||||
}
|
||||
// Signal failure so the loop stops retrying.
|
||||
stopped = true;
|
||||
return false;
|
||||
}
|
||||
@@ -131,8 +111,6 @@ export function createMatrixDraftStream(params: {
|
||||
currentEventId = undefined;
|
||||
lastSentText = "";
|
||||
stopped = false;
|
||||
sendFailed = false;
|
||||
finalizeInPlaceBlocked = false;
|
||||
loop.resetPending();
|
||||
loop.resetThrottleWindow();
|
||||
};
|
||||
@@ -149,6 +127,5 @@ export function createMatrixDraftStream(params: {
|
||||
reset,
|
||||
eventId: () => currentEventId,
|
||||
lastSentText: () => lastSentText,
|
||||
mustDeliverFinalNormally: () => sendFailed || finalizeInPlaceBlocked,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -2067,7 +2067,7 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
return { dispatch, redactEventMock };
|
||||
}
|
||||
|
||||
it("finalizes a single partial-preview block in place when block streaming is enabled", async () => {
|
||||
it("redacts the quiet preview and sends the final message normally", async () => {
|
||||
const { dispatch, redactEventMock } = createStreamingHarness({ blockStreamingEnabled: true });
|
||||
const { deliver, opts, finish } = await dispatch();
|
||||
|
||||
@@ -2080,12 +2080,12 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
await deliver({ text: "Single block" }, { kind: "final" });
|
||||
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
|
||||
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
|
||||
await finish();
|
||||
});
|
||||
|
||||
it("preserves completed blocks by rotating to a new draft when block streaming is enabled", async () => {
|
||||
it("sends completed blocks normally and rotates to a new quiet preview", async () => {
|
||||
const { dispatch, redactEventMock } = createStreamingHarness({ blockStreamingEnabled: true });
|
||||
const { deliver, opts, finish } = await dispatch();
|
||||
|
||||
@@ -2097,8 +2097,8 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
deliverMatrixRepliesMock.mockClear();
|
||||
await deliver({ text: "Block one" }, { kind: "block" });
|
||||
|
||||
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
|
||||
|
||||
opts.onAssistantMessageStart?.();
|
||||
sendSingleTextMessageMatrixMock.mockResolvedValueOnce({
|
||||
@@ -2112,8 +2112,8 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
|
||||
await deliver({ text: "Block two" }, { kind: "final" });
|
||||
|
||||
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(2);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft2");
|
||||
await finish();
|
||||
});
|
||||
|
||||
@@ -2145,8 +2145,8 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
expect(sendSingleTextMessageMatrixMock.mock.calls[1]?.[1]).toBe("Beta");
|
||||
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
|
||||
await finish();
|
||||
});
|
||||
|
||||
@@ -2183,35 +2183,30 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]).toBe("Beta");
|
||||
expect(editMessageMatrixMock).toHaveBeenCalledWith(
|
||||
"!room:example.org",
|
||||
"$draft1",
|
||||
"Alpha",
|
||||
expect.anything(),
|
||||
);
|
||||
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).not.toHaveBeenCalled();
|
||||
expect(editMessageMatrixMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
|
||||
await finish();
|
||||
});
|
||||
|
||||
it("falls back to deliverMatrixReplies when final edit fails", async () => {
|
||||
it("sends finals normally instead of relying on draft edits", async () => {
|
||||
const { dispatch } = createStreamingHarness();
|
||||
const { deliver, opts, finish } = await dispatch();
|
||||
|
||||
// Simulate streaming: partial reply creates draft message.
|
||||
opts.onPartialReply?.({ text: "Hello" });
|
||||
// Wait for the draft stream's immediate send to complete.
|
||||
await vi.waitFor(() => {
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
// Make the final edit fail.
|
||||
editMessageMatrixMock.mockRejectedValueOnce(new Error("rate limited"));
|
||||
|
||||
// Deliver final — should catch edit failure and fall back.
|
||||
await deliver({ text: "Hello world" }, { kind: "block" });
|
||||
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
|
||||
expect(editMessageMatrixMock).not.toHaveBeenCalledWith(
|
||||
"!room:example.org",
|
||||
"$draft1",
|
||||
"Hello world",
|
||||
expect.anything(),
|
||||
);
|
||||
await finish();
|
||||
});
|
||||
|
||||
@@ -2303,12 +2298,9 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
});
|
||||
await deliver({ text: "Alpha" }, { kind: "block" });
|
||||
|
||||
expect(editMessageMatrixMock).toHaveBeenCalledWith(
|
||||
"!room:example.org",
|
||||
"$draft1",
|
||||
"Alpha",
|
||||
expect.anything(),
|
||||
);
|
||||
expect(editMessageMatrixMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
|
||||
await vi.waitFor(() => {
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
@@ -2316,8 +2308,8 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
|
||||
await deliver({ text: "Beta" }, { kind: "final" });
|
||||
|
||||
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(2);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft2");
|
||||
await finish();
|
||||
});
|
||||
|
||||
@@ -2352,12 +2344,9 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
});
|
||||
await deliver({ text: "Alpha" }, { kind: "block" });
|
||||
|
||||
expect(editMessageMatrixMock).toHaveBeenCalledWith(
|
||||
"!room:example.org",
|
||||
"$draft1",
|
||||
"Alpha",
|
||||
expect.anything(),
|
||||
);
|
||||
expect(editMessageMatrixMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
|
||||
await vi.waitFor(() => {
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
@@ -2365,8 +2354,8 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
|
||||
await deliver({ text: "Beta" }, { kind: "final" });
|
||||
|
||||
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).not.toHaveBeenCalled();
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(2);
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft2");
|
||||
await finish();
|
||||
});
|
||||
|
||||
@@ -2578,7 +2567,6 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
await deliver({ text: "123456" }, { kind: "final" });
|
||||
|
||||
expect(editMessageMatrixMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
|
||||
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
|
||||
await finish();
|
||||
|
||||
@@ -25,7 +25,6 @@ import {
|
||||
} from "../poll-types.js";
|
||||
import type { LocationMessageEventContent, MatrixClient } from "../sdk.js";
|
||||
import {
|
||||
editMessageMatrix,
|
||||
reactMatrixMessage,
|
||||
sendMessageMatrix,
|
||||
sendReadReceiptMatrix,
|
||||
@@ -1282,7 +1281,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
});
|
||||
const draftStreamingEnabled = streaming === "partial";
|
||||
const draftReplyToId = replyToMode !== "off" && !threadTarget ? _messageId : undefined;
|
||||
let currentDraftReplyToId = draftReplyToId;
|
||||
const draftStream = draftStreamingEnabled
|
||||
? createMatrixDraftStream({
|
||||
roomId,
|
||||
@@ -1308,9 +1306,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
let latestDraftFullText = "";
|
||||
const pendingDraftBoundaries: PendingDraftBoundary[] = [];
|
||||
const latestQueuedDraftBoundaryOffsets = new Map<number, number>();
|
||||
// Set after the first final payload consumes the draft event so
|
||||
// subsequent finals go through normal delivery.
|
||||
let draftConsumed = false;
|
||||
|
||||
const getDisplayableDraftText = () => {
|
||||
const nextDraftBoundaryOffset = pendingDraftBoundaries.find(
|
||||
@@ -1377,152 +1372,28 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, _route.agentId),
|
||||
deliver: async (payload: ReplyPayload, info: { kind: string }) => {
|
||||
if (draftStream && info.kind !== "tool" && !payload.isCompactionNotice) {
|
||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
|
||||
await draftStream.stop();
|
||||
|
||||
// After the first final payload consumes the draft, subsequent
|
||||
// finals must go through normal delivery to avoid overwriting.
|
||||
if (draftConsumed) {
|
||||
await deliverMatrixReplies({
|
||||
cfg,
|
||||
replies: [payload],
|
||||
roomId,
|
||||
client,
|
||||
runtime,
|
||||
textLimit,
|
||||
replyToMode,
|
||||
threadId: threadTarget,
|
||||
accountId: _route.accountId,
|
||||
mediaLocalRoots,
|
||||
tableMode,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Read event id after stop() — flush may have created the
|
||||
// initial message while draining pending text.
|
||||
const draftEventId = draftStream.eventId();
|
||||
|
||||
// If the payload carries a reply target that differs from the
|
||||
// draft's, fall through to normal delivery — Matrix edits
|
||||
// cannot change the reply relation on an existing event.
|
||||
// Skip when replyToMode is "off" (replies stripped anyway)
|
||||
// or when threadTarget is set (thread relations take
|
||||
// precedence over replyToId in deliverMatrixReplies).
|
||||
const payloadReplyToId = payload.replyToId?.trim() || undefined;
|
||||
const payloadReplyMismatch =
|
||||
replyToMode !== "off" &&
|
||||
!threadTarget &&
|
||||
payloadReplyToId !== currentDraftReplyToId;
|
||||
const mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally();
|
||||
|
||||
if (
|
||||
draftEventId &&
|
||||
payload.text &&
|
||||
!hasMedia &&
|
||||
!payloadReplyMismatch &&
|
||||
!mustDeliverFinalNormally
|
||||
) {
|
||||
// Text-only: final edit of the draft message. Skip if
|
||||
// stop() already flushed identical text to avoid a
|
||||
// redundant API call that wastes rate-limit budget.
|
||||
if (payload.text !== draftStream.lastSentText()) {
|
||||
try {
|
||||
await editMessageMatrix(roomId, draftEventId, payload.text, {
|
||||
client,
|
||||
cfg,
|
||||
threadId: threadTarget,
|
||||
accountId: _route.accountId,
|
||||
});
|
||||
} catch {
|
||||
// Edit failed (rate limit, server error) — redact the
|
||||
// stale draft and fall back to normal delivery so the
|
||||
// user still gets the final answer.
|
||||
await client.redactEvent(roomId, draftEventId).catch(() => {});
|
||||
await deliverMatrixReplies({
|
||||
cfg,
|
||||
replies: [payload],
|
||||
roomId,
|
||||
client,
|
||||
runtime,
|
||||
textLimit,
|
||||
replyToMode,
|
||||
threadId: threadTarget,
|
||||
accountId: _route.accountId,
|
||||
mediaLocalRoots,
|
||||
tableMode,
|
||||
});
|
||||
}
|
||||
}
|
||||
draftConsumed = true;
|
||||
} else if (draftEventId && hasMedia && !payloadReplyMismatch) {
|
||||
// Media payload: finalize draft text, send media separately.
|
||||
let textEditOk = !mustDeliverFinalNormally;
|
||||
if (textEditOk && payload.text && payload.text !== draftStream.lastSentText()) {
|
||||
textEditOk = await editMessageMatrix(roomId, draftEventId, payload.text, {
|
||||
client,
|
||||
cfg,
|
||||
threadId: threadTarget,
|
||||
accountId: _route.accountId,
|
||||
}).then(
|
||||
() => true,
|
||||
() => false,
|
||||
);
|
||||
}
|
||||
const reusesDraftAsFinalText = Boolean(payload.text?.trim()) && textEditOk;
|
||||
// If the text edit failed, or there is no final text to reuse
|
||||
// the preview, redact the stale draft and include text in media
|
||||
// delivery so the final caption is not lost.
|
||||
if (!reusesDraftAsFinalText) {
|
||||
await client.redactEvent(roomId, draftEventId).catch(() => {});
|
||||
}
|
||||
await deliverMatrixReplies({
|
||||
cfg,
|
||||
replies: [
|
||||
{ ...payload, text: reusesDraftAsFinalText ? undefined : payload.text },
|
||||
],
|
||||
roomId,
|
||||
client,
|
||||
runtime,
|
||||
textLimit,
|
||||
replyToMode,
|
||||
threadId: threadTarget,
|
||||
accountId: _route.accountId,
|
||||
mediaLocalRoots,
|
||||
tableMode,
|
||||
});
|
||||
draftConsumed = true;
|
||||
} else {
|
||||
// Redact stale draft when the final delivery will create a
|
||||
// new message (reply-target mismatch, preview overflow, or no
|
||||
// usable draft).
|
||||
if (draftEventId && (payloadReplyMismatch || mustDeliverFinalNormally)) {
|
||||
await client.redactEvent(roomId, draftEventId).catch(() => {});
|
||||
}
|
||||
await deliverMatrixReplies({
|
||||
cfg,
|
||||
replies: [payload],
|
||||
roomId,
|
||||
client,
|
||||
runtime,
|
||||
textLimit,
|
||||
replyToMode,
|
||||
threadId: threadTarget,
|
||||
accountId: _route.accountId,
|
||||
mediaLocalRoots,
|
||||
tableMode,
|
||||
});
|
||||
await deliverMatrixReplies({
|
||||
cfg,
|
||||
replies: [payload],
|
||||
roomId,
|
||||
client,
|
||||
runtime,
|
||||
textLimit,
|
||||
replyToMode,
|
||||
threadId: threadTarget,
|
||||
accountId: _route.accountId,
|
||||
mediaLocalRoots,
|
||||
tableMode,
|
||||
});
|
||||
if (draftEventId) {
|
||||
await client.redactEvent(roomId, draftEventId).catch(() => {});
|
||||
}
|
||||
|
||||
// Only reset for intermediate blocks — after the final delivery
|
||||
// the stream must stay stopped so late async callbacks cannot
|
||||
// create ghost messages.
|
||||
if (info.kind === "block") {
|
||||
draftConsumed = false;
|
||||
advanceDraftBlockBoundary({ fallbackToLatestEnd: true });
|
||||
draftStream.reset();
|
||||
currentDraftReplyToId = replyToMode === "all" ? draftReplyToId : undefined;
|
||||
updateDraftFromLatestFullText();
|
||||
|
||||
// Re-assert typing so the user still sees the indicator while
|
||||
|
||||
@@ -602,6 +602,25 @@ describe("sendSingleTextMessageMatrix", () => {
|
||||
|
||||
expect(sendMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("supports quiet draft preview sends without mention metadata", async () => {
|
||||
const { client, sendMessage } = makeClient();
|
||||
|
||||
await sendSingleTextMessageMatrix("room:!room:example", "@room hi @alice:example.org", {
|
||||
client,
|
||||
msgtype: "m.notice",
|
||||
includeMentions: false,
|
||||
});
|
||||
|
||||
expect(sendMessage.mock.calls[0]?.[1]).toMatchObject({
|
||||
msgtype: "m.notice",
|
||||
body: "@room hi @alice:example.org",
|
||||
});
|
||||
expect(sendMessage.mock.calls[0]?.[1]).not.toHaveProperty("m.mentions");
|
||||
expect(
|
||||
(sendMessage.mock.calls[0]?.[1] as { formatted_body?: string }).formatted_body,
|
||||
).not.toContain("matrix.to");
|
||||
});
|
||||
});
|
||||
|
||||
describe("editMessageMatrix mentions", () => {
|
||||
@@ -677,6 +696,41 @@ describe("editMessageMatrix mentions", () => {
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("supports quiet draft preview edits without mention metadata", async () => {
|
||||
const { client, sendMessage, getEvent } = makeClient();
|
||||
getEvent.mockResolvedValue({
|
||||
content: {
|
||||
body: "@room hi @alice:example.org",
|
||||
"m.mentions": { room: true, user_ids: ["@alice:example.org"] },
|
||||
},
|
||||
});
|
||||
|
||||
await editMessageMatrix("room:!room:example", "$original", "@room hi @alice:example.org", {
|
||||
client,
|
||||
msgtype: "m.notice",
|
||||
includeMentions: false,
|
||||
});
|
||||
|
||||
expect(sendMessage.mock.calls[0]?.[1]).toMatchObject({
|
||||
msgtype: "m.notice",
|
||||
"m.new_content": {
|
||||
msgtype: "m.notice",
|
||||
},
|
||||
});
|
||||
expect(sendMessage.mock.calls[0]?.[1]).not.toHaveProperty("m.mentions");
|
||||
expect(sendMessage.mock.calls[0]?.[1]?.["m.new_content"]).not.toHaveProperty("m.mentions");
|
||||
expect(
|
||||
(sendMessage.mock.calls[0]?.[1] as { formatted_body?: string }).formatted_body,
|
||||
).not.toContain("matrix.to");
|
||||
expect(
|
||||
(
|
||||
sendMessage.mock.calls[0]?.[1] as {
|
||||
"m.new_content"?: { formatted_body?: string };
|
||||
}
|
||||
)["m.new_content"]?.formatted_body,
|
||||
).not.toContain("matrix.to");
|
||||
});
|
||||
});
|
||||
|
||||
describe("sendPollMatrix mentions", () => {
|
||||
|
||||
@@ -36,6 +36,7 @@ import {
|
||||
type MatrixOutboundContent,
|
||||
type MatrixSendOpts,
|
||||
type MatrixSendResult,
|
||||
type MatrixTextMsgType,
|
||||
} from "./send/types.js";
|
||||
|
||||
const MATRIX_TEXT_LIMIT = 4000;
|
||||
@@ -398,6 +399,8 @@ export async function sendSingleTextMessageMatrix(
|
||||
replyToId?: string;
|
||||
threadId?: string;
|
||||
accountId?: string;
|
||||
msgtype?: MatrixTextMsgType;
|
||||
includeMentions?: boolean;
|
||||
} = {},
|
||||
): Promise<MatrixSendResult> {
|
||||
const { trimmedText, convertedText, singleEventLimit, fitsInSingleEvent } =
|
||||
@@ -425,11 +428,14 @@ export async function sendSingleTextMessageMatrix(
|
||||
const relation = normalizedThreadId
|
||||
? buildThreadRelation(normalizedThreadId, opts.replyToId)
|
||||
: buildReplyRelation(opts.replyToId);
|
||||
const content = buildTextContent(convertedText, relation);
|
||||
const content = buildTextContent(convertedText, relation, {
|
||||
msgtype: opts.msgtype,
|
||||
});
|
||||
await enrichMatrixFormattedContent({
|
||||
client,
|
||||
content,
|
||||
markdown: convertedText,
|
||||
includeMentions: opts.includeMentions,
|
||||
});
|
||||
const eventId = await client.sendMessage(resolvedRoom, content);
|
||||
return {
|
||||
@@ -468,6 +474,8 @@ export async function editMessageMatrix(
|
||||
threadId?: string;
|
||||
accountId?: string;
|
||||
timeoutMs?: number;
|
||||
msgtype?: MatrixTextMsgType;
|
||||
includeMentions?: boolean;
|
||||
} = {},
|
||||
): Promise<string> {
|
||||
return await withResolvedMatrixSendClient(
|
||||
@@ -486,22 +494,27 @@ export async function editMessageMatrix(
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const convertedText = getCore().channel.text.convertMarkdownTables(newText, tableMode);
|
||||
const newContent = buildTextContent(convertedText);
|
||||
const newContent = buildTextContent(convertedText, undefined, {
|
||||
msgtype: opts.msgtype,
|
||||
});
|
||||
await enrichMatrixFormattedContent({
|
||||
client,
|
||||
content: newContent,
|
||||
markdown: convertedText,
|
||||
includeMentions: opts.includeMentions,
|
||||
});
|
||||
const previousEvent = await getPreviousMatrixEvent(client, resolvedRoom, originalEventId);
|
||||
const previousContent = resolvePreviousEditContent(previousEvent);
|
||||
const previousMentions = await resolvePreviousEditMentions({
|
||||
client,
|
||||
content: previousContent,
|
||||
});
|
||||
const replaceMentions = diffMatrixMentions(
|
||||
extractMatrixMentions(newContent),
|
||||
previousMentions,
|
||||
);
|
||||
const replaceMentions =
|
||||
opts.includeMentions === false
|
||||
? undefined
|
||||
: diffMatrixMentions(
|
||||
extractMatrixMentions(newContent),
|
||||
await resolvePreviousEditMentions({
|
||||
client,
|
||||
content: resolvePreviousEditContent(
|
||||
await getPreviousMatrixEvent(client, resolvedRoom, originalEventId),
|
||||
),
|
||||
}),
|
||||
);
|
||||
|
||||
const replaceRelation: Record<string, unknown> = {
|
||||
rel_type: RelationType.Replace,
|
||||
@@ -522,10 +535,12 @@ export async function editMessageMatrix(
|
||||
...(typeof newContent.formatted_body === "string"
|
||||
? { formatted_body: `* ${newContent.formatted_body}` }
|
||||
: {}),
|
||||
"m.mentions": replaceMentions,
|
||||
"m.new_content": newContent,
|
||||
"m.relates_to": replaceRelation,
|
||||
};
|
||||
if (replaceMentions !== undefined) {
|
||||
content["m.mentions"] = replaceMentions;
|
||||
}
|
||||
|
||||
const eventId = await client.sendMessage(resolvedRoom, content);
|
||||
return eventId ?? "";
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { getMatrixRuntime } from "../../runtime.js";
|
||||
import {
|
||||
markdownToMatrixHtml,
|
||||
resolveMatrixMentionsInMarkdown,
|
||||
renderMarkdownToMatrixHtmlWithMentions,
|
||||
type MatrixMentions,
|
||||
@@ -13,20 +14,45 @@ import {
|
||||
type MatrixRelation,
|
||||
type MatrixReplyRelation,
|
||||
type MatrixTextContent,
|
||||
type MatrixTextMsgType,
|
||||
type MatrixThreadRelation,
|
||||
} from "./types.js";
|
||||
|
||||
const getCore = () => getMatrixRuntime();
|
||||
|
||||
export function buildTextContent(body: string, relation?: MatrixRelation): MatrixTextContent {
|
||||
async function renderMatrixFormattedContent(params: {
|
||||
client: MatrixClient;
|
||||
markdown?: string | null;
|
||||
includeMentions?: boolean;
|
||||
}): Promise<{ html?: string; mentions?: MatrixMentions }> {
|
||||
const markdown = params.markdown ?? "";
|
||||
if (params.includeMentions === false) {
|
||||
const html = markdownToMatrixHtml(markdown).trimEnd();
|
||||
return { html: html || undefined };
|
||||
}
|
||||
const { html, mentions } = await renderMarkdownToMatrixHtmlWithMentions({
|
||||
markdown,
|
||||
client: params.client,
|
||||
});
|
||||
return { html, mentions };
|
||||
}
|
||||
|
||||
export function buildTextContent(
|
||||
body: string,
|
||||
relation?: MatrixRelation,
|
||||
opts: {
|
||||
msgtype?: MatrixTextMsgType;
|
||||
} = {},
|
||||
): MatrixTextContent {
|
||||
const msgtype = opts.msgtype ?? MsgType.Text;
|
||||
return relation
|
||||
? {
|
||||
msgtype: MsgType.Text,
|
||||
msgtype,
|
||||
body,
|
||||
"m.relates_to": relation,
|
||||
}
|
||||
: {
|
||||
msgtype: MsgType.Text,
|
||||
msgtype,
|
||||
body,
|
||||
};
|
||||
}
|
||||
@@ -35,12 +61,18 @@ export async function enrichMatrixFormattedContent(params: {
|
||||
client: MatrixClient;
|
||||
content: MatrixFormattedContent;
|
||||
markdown?: string | null;
|
||||
includeMentions?: boolean;
|
||||
}): Promise<void> {
|
||||
const { html, mentions } = await renderMarkdownToMatrixHtmlWithMentions({
|
||||
markdown: params.markdown ?? "",
|
||||
const { html, mentions } = await renderMatrixFormattedContent({
|
||||
client: params.client,
|
||||
markdown: params.markdown,
|
||||
includeMentions: params.includeMentions,
|
||||
});
|
||||
params.content["m.mentions"] = mentions;
|
||||
if (mentions) {
|
||||
params.content["m.mentions"] = mentions;
|
||||
} else {
|
||||
delete params.content["m.mentions"];
|
||||
}
|
||||
if (!html) {
|
||||
delete params.content.format;
|
||||
delete params.content.formatted_body;
|
||||
|
||||
@@ -110,6 +110,8 @@ export type MatrixMediaMsgType =
|
||||
| typeof MsgType.Video
|
||||
| typeof MsgType.File;
|
||||
|
||||
export type MatrixTextMsgType = typeof MsgType.Text | typeof MsgType.Notice;
|
||||
|
||||
export type MediaKind = "image" | "audio" | "video" | "document" | "unknown";
|
||||
|
||||
export type MatrixFormattedContent = MessageEventContent & {
|
||||
|
||||
Reference in New Issue
Block a user