fix: centralize draft preview finalization

This commit is contained in:
Peter Steinberger
2026-04-22 02:28:58 +01:00
parent ffef84dea7
commit fb9a21ae8f
33 changed files with 824 additions and 195 deletions

View File

@@ -78,4 +78,50 @@ describe("createDiscordDraftStream", () => {
expect(warn).toHaveBeenCalledWith(expect.stringContaining("discord stream preview stopped"));
expect(stream.messageId()).toBeUndefined();
});
it("discardPending keeps an existing preview but ignores later updates", async () => {
const rest = {
post: vi.fn(async () => ({ id: "m1" })),
patch: vi.fn(async () => undefined),
delete: vi.fn(async () => undefined),
};
const stream = createDiscordDraftStream({
rest: rest as never,
channelId: "c1",
throttleMs: 250,
});
stream.update("first draft");
await stream.flush();
await stream.discardPending();
stream.update("late draft");
await stream.flush();
expect(rest.post).toHaveBeenCalledTimes(1);
expect(rest.patch).not.toHaveBeenCalled();
expect(rest.delete).not.toHaveBeenCalled();
expect(stream.messageId()).toBe("m1");
});
it("seal keeps an existing preview and cancels pending final overwrites", async () => {
const rest = {
post: vi.fn(async () => ({ id: "m1" })),
patch: vi.fn(async () => undefined),
delete: vi.fn(async () => undefined),
};
const stream = createDiscordDraftStream({
rest: rest as never,
channelId: "c1",
throttleMs: 250,
});
stream.update("first draft");
await stream.flush();
stream.update("stale final draft");
await stream.seal();
expect(rest.post).toHaveBeenCalledTimes(1);
expect(rest.patch).not.toHaveBeenCalled();
expect(stream.messageId()).toBe("m1");
});
});

View File

@@ -12,6 +12,8 @@ export type DiscordDraftStream = {
flush: () => Promise<void>;
messageId: () => string | undefined;
clear: () => Promise<void>;
discardPending: () => Promise<void>;
seal: () => Promise<void>;
stop: () => Promise<void>;
/** Reset internal state so the next update creates a new message instead of editing. */
forceNewMessage: () => void;
@@ -113,7 +115,7 @@ export function createDiscordDraftStream(params: {
await rest.delete(Routes.channelMessage(channelId, messageId));
};
const { loop, update, stop, clear } = createFinalizableDraftLifecycle({
const { loop, update, stop, clear, discardPending, seal } = createFinalizableDraftLifecycle({
throttleMs,
state: streamState,
sendOrEditStreamMessage,
@@ -138,6 +140,8 @@ export function createDiscordDraftStream(params: {
flush: loop.flush,
messageId: () => streamMessageId,
clear,
discardPending,
seal,
stop,
forceNewMessage,
};

View File

@@ -11,11 +11,16 @@ const sendMocks = vi.hoisted(() => ({
>(async () => {}),
}));
function createMockDraftStream() {
let messageId: string | undefined = "preview-1";
return {
update: vi.fn<(text: string) => void>(() => {}),
flush: vi.fn(async () => {}),
messageId: vi.fn(() => "preview-1"),
clear: vi.fn(async () => {}),
messageId: vi.fn(() => messageId),
clear: vi.fn(async () => {
messageId = undefined;
}),
discardPending: vi.fn(async () => {}),
seal: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
forceNewMessage: vi.fn(() => {}),
};
@@ -820,6 +825,52 @@ describe("processDiscordMessage draft streaming", () => {
expect(deliverDiscordReply).toHaveBeenCalledTimes(1);
});
it("does not flush draft previews for media finals before normal delivery", async () => {
const draftStream = createMockDraftStreamForTest();
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendFinalReply({
text: "Photo",
mediaUrl: "https://example.com/a.png",
} as never);
return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } };
});
const ctx = await createBaseContext({
discordConfig: { streamMode: "partial", maxLinesPerMessage: 5 },
});
await processDiscordMessage(ctx as any);
expect(draftStream.flush).not.toHaveBeenCalled();
expect(draftStream.discardPending).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(editMessageDiscord).not.toHaveBeenCalled();
expect(deliverDiscordReply).toHaveBeenCalledTimes(1);
});
it("does not flush draft previews for error finals before normal delivery", async () => {
const draftStream = createMockDraftStreamForTest();
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendFinalReply({
text: "Something failed",
isError: true,
} as never);
return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } };
});
const ctx = await createBaseContext({
discordConfig: { streamMode: "partial", maxLinesPerMessage: 5 },
});
await processDiscordMessage(ctx as any);
expect(draftStream.flush).not.toHaveBeenCalled();
expect(draftStream.discardPending).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(editMessageDiscord).not.toHaveBeenCalled();
expect(deliverDiscordReply).toHaveBeenCalledTimes(1);
});
it("suppresses reasoning payload delivery to Discord", async () => {
mockDispatchSingleBlockReply({ text: "thinking...", isReasoning: true });
await processStreamOffDiscordMessage();

View File

@@ -15,6 +15,7 @@ import {
formatInboundEnvelope,
resolveEnvelopeFormatOptions,
} from "openclaw/plugin-sdk/channel-inbound";
import { deliverFinalizableDraftPreview } from "openclaw/plugin-sdk/channel-lifecycle";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import {
resolveChannelStreamingBlockEnabled,
@@ -579,7 +580,7 @@ export async function processDiscordMessage(
resolveChannelStreamingBlockEnabled(discordConfig) ??
cfg.agents?.defaults?.blockStreamingDefault === "on";
const canStreamDraft = discordStreamMode !== "off" && !accountBlockStreamingEnabled;
const draftReplyToMessageId = () => replyReference.use();
const draftReplyToMessageId = () => replyReference.peek();
const deliverChannelId = deliverTarget.startsWith("channel:")
? deliverTarget.slice("channel:".length)
: messageChannelId;
@@ -605,6 +606,7 @@ export async function processDiscordMessage(
let draftText = "";
let hasStreamedMessage = false;
let finalizedViaPreviewMessage = false;
let draftFinalDeliveryHandled = false;
const previewToolProgressEnabled =
Boolean(draftStream) && resolveChannelStreamingPreviewToolProgress(discordConfig);
let previewToolProgressSuppressed = false;
@@ -770,7 +772,7 @@ export async function processDiscordMessage(
return;
}
if (draftStream && isFinal) {
await flushDraft();
draftFinalDeliveryHandled = true;
const reply = resolveSendableOutboundReplyParts(payload);
const hasMedia = reply.hasMedia;
const finalText = payload.text;
@@ -778,78 +780,79 @@ export async function processDiscordMessage(
const hasExplicitReplyDirective =
Boolean(payload.replyToTag || payload.replyToCurrent) ||
(typeof finalText === "string" && /\[\[\s*reply_to(?:_current|\s*:)/i.test(finalText));
const previewMessageId = draftStream.messageId();
// Try to finalize via preview edit (text-only, fits in 2000 chars, not an error)
const canFinalizeViaPreviewEdit =
!finalizedViaPreviewMessage &&
!hasMedia &&
typeof previewFinalText === "string" &&
typeof previewMessageId === "string" &&
!hasExplicitReplyDirective &&
!payload.isError;
if (canFinalizeViaPreviewEdit) {
await draftStream.stop();
if (isProcessAborted(abortSignal)) {
return;
}
try {
const result = await deliverFinalizableDraftPreview({
kind: info.kind,
payload,
draft: {
flush: flushDraft,
clear: draftStream.clear,
discardPending: draftStream.discardPending,
seal: draftStream.seal,
id: draftStream.messageId,
},
buildFinalEdit: () => {
if (
finalizedViaPreviewMessage ||
hasMedia ||
typeof previewFinalText !== "string" ||
hasExplicitReplyDirective ||
payload.isError
) {
return undefined;
}
return { content: previewFinalText };
},
editFinal: async (previewMessageId, edit) => {
if (isProcessAborted(abortSignal)) {
throw new Error("process aborted");
}
notifyFinalReplyStart();
await editMessageDiscord(
deliverChannelId,
previewMessageId,
{ content: previewFinalText },
{ rest: deliveryRest },
);
await editMessageDiscord(deliverChannelId, previewMessageId, edit, {
rest: deliveryRest,
});
},
deliverNormally: async () => {
if (isProcessAborted(abortSignal)) {
return false;
}
const replyToId = replyReference.use();
notifyFinalReplyStart();
await deliverDiscordReply({
cfg,
replies: [payload],
target: deliverTarget,
token,
accountId,
rest: deliveryRest,
runtime,
replyToId,
replyToMode,
textLimit,
maxLinesPerMessage,
tableMode,
chunkMode,
sessionKey: ctxPayload.SessionKey,
threadBindings,
mediaLocalRoots,
});
replyReference.markSent();
observer?.onFinalReplyDelivered?.();
return true;
},
onPreviewFinalized: () => {
finalizedViaPreviewMessage = true;
replyReference.markSent();
observer?.onFinalReplyDelivered?.();
return;
} catch (err) {
},
logPreviewEditFailure: (err) => {
logVerbose(
`discord: preview final edit failed; falling back to standard send (${String(err)})`,
);
}
}
// Check if stop() flushed a message we can edit
if (!finalizedViaPreviewMessage) {
await draftStream.stop();
if (isProcessAborted(abortSignal)) {
return;
}
const messageIdAfterStop = draftStream.messageId();
if (
typeof messageIdAfterStop === "string" &&
typeof previewFinalText === "string" &&
!hasMedia &&
!hasExplicitReplyDirective &&
!payload.isError
) {
try {
notifyFinalReplyStart();
await editMessageDiscord(
deliverChannelId,
messageIdAfterStop,
{ content: previewFinalText },
{ rest: deliveryRest },
);
finalizedViaPreviewMessage = true;
replyReference.markSent();
observer?.onFinalReplyDelivered?.();
return;
} catch (err) {
logVerbose(
`discord: post-stop preview edit failed; falling back to standard send (${String(err)})`,
);
}
}
}
// Clear the preview and fall through to standard delivery
if (!finalizedViaPreviewMessage) {
await draftStream.clear();
},
});
if (result !== "normal-skipped") {
return;
}
}
if (isProcessAborted(abortSignal)) {
@@ -1019,9 +1022,10 @@ export async function processDiscordMessage(
throw err;
} finally {
try {
// Must stop() first to flush debounced content before clear() wipes state.
await draftStream?.stop();
if (!finalizedViaPreviewMessage) {
if (!draftFinalDeliveryHandled) {
await draftStream?.discardPending();
}
if (!draftFinalDeliveryHandled && !finalizedViaPreviewMessage && draftStream?.messageId()) {
await draftStream?.clear();
}
} catch (err) {

View File

@@ -182,6 +182,7 @@ describe("createMatrixDraftStream", () => {
.mockReset()
.mockImplementation((text: string) => (text ? [text] : []));
convertMarkdownTablesMock.mockReset().mockImplementation((text: string) => text);
sendModuleMocks.editMessageMatrix.mockClear();
});
afterEach(() => {
@@ -503,6 +504,24 @@ describe("createMatrixDraftStream", () => {
);
});
it("discardPending cancels pending updates without creating another preview event", async () => {
const stream = createMatrixDraftStream({
roomId: "!room:test",
client,
cfg: {} as import("../types.js").CoreConfig,
});
stream.update("First draft");
await stream.flush();
stream.update("Pending draft");
await stream.discardPending();
await stream.flush();
expect(sendMessageMock).toHaveBeenCalledTimes(1);
expect(sendModuleMocks.editMessageMatrix).not.toHaveBeenCalled();
expect(stream.eventId()).toBe("$evt1");
});
it("uses converted Matrix text when checking the single-event preview limit", async () => {
const log = vi.fn();
resolveTextChunkLimitMock.mockReturnValue(5);

View File

@@ -29,6 +29,8 @@ export type MatrixDraftStream = {
flush: () => Promise<void>;
/** Flush and mark this block as done. Returns the event ID if a message was sent. */
stop: () => Promise<string | undefined>;
/** Cancel pending draft updates without creating a new preview event. */
discardPending: () => Promise<void>;
/** Clear the MSC4357 live marker in place when the draft is kept as final text. */
finalizeLive: () => Promise<boolean>;
/** Reset state for the next text block (after tool calls). */
@@ -180,6 +182,12 @@ export function createMatrixDraftStream(params: {
return currentEventId;
};
const discardPending = async (): Promise<void> => {
stopped = true;
loop.stop();
await loop.waitForInFlight();
};
const reset = (): void => {
// Clear reply context unless preserveReplyId is set (replyToMode "all"),
// in which case subsequent blocks should keep replying to the original.
@@ -203,6 +211,7 @@ export function createMatrixDraftStream(params: {
},
flush: loop.flush,
stop,
discardPending,
finalizeLive,
reset,
eventId: () => currentEventId,

View File

@@ -3231,6 +3231,50 @@ describe("matrix monitor handler draft streaming", () => {
await finish();
});
it("does not create a throwaway draft for fast media-only finals", async () => {
const { dispatch, redactEventMock } = createStreamingHarness();
const { deliver, finish } = await dispatch();
await deliver({ mediaUrl: "https://example.com/image.png" }, { kind: "final" });
expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled();
expect(editMessageMatrixMock).not.toHaveBeenCalled();
expect(redactEventMock).not.toHaveBeenCalled();
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
});
it("does not create a throwaway draft for fast error finals", async () => {
const { dispatch, redactEventMock } = createStreamingHarness();
const { deliver, finish } = await dispatch();
await deliver({ text: "Something failed", isError: true } as never, { kind: "final" });
expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled();
expect(editMessageMatrixMock).not.toHaveBeenCalled();
expect(redactEventMock).not.toHaveBeenCalled();
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
});
it("redacts existing drafts for text error finals and uses normal delivery", async () => {
const { dispatch, redactEventMock } = createStreamingHarness();
const { deliver, opts, finish } = await dispatch();
opts.onPartialReply?.({ text: "Partial reply" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
deliverMatrixRepliesMock.mockClear();
await deliver({ text: "Something failed", isError: true } as never, { kind: "final" });
expect(editMessageMatrixMock).not.toHaveBeenCalled();
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
});
it("finalizes partial drafts before reusing unchanged media captions", async () => {
const { dispatch, redactEventMock } = createStreamingHarness({ streaming: "partial" });
const { deliver, opts, finish } = await dispatch();

View File

@@ -121,6 +121,7 @@ type MatrixAllowBotsMode = "off" | "mentions" | "all";
type MatrixDraftStreamHandle = {
update: (text: string) => void;
stop: () => Promise<string | undefined>;
discardPending: () => Promise<void>;
eventId: () => string | undefined;
mustDeliverFinalNormally: () => boolean;
matchesPreparedText: (text: string) => boolean;
@@ -1547,10 +1548,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
if (draftStream && info.kind !== "tool" && !payload.isCompactionNotice) {
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
await draftStream.stop();
const draftEventId = draftStream.eventId();
if (draftConsumed) {
await draftStream.discardPending();
await deliverMatrixReplies({
cfg,
replies: [payload],
@@ -1572,11 +1571,25 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
replyToMode !== "off" &&
!threadTarget &&
payloadReplyToId !== currentDraftReplyToId;
const mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally();
let mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally();
const canPotentiallyFinalizeDraft =
Boolean(payload.text?.trim()) &&
!payload.isError &&
!payloadReplyMismatch &&
!mustDeliverFinalNormally;
if (canPotentiallyFinalizeDraft) {
await draftStream.stop();
mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally();
} else {
await draftStream.discardPending();
}
const draftEventId = draftStream.eventId();
if (
draftEventId &&
payload.text &&
!payload.isError &&
!hasMedia &&
!payloadReplyMismatch &&
!mustDeliverFinalNormally
@@ -1666,7 +1679,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
draftConsumed = true;
} else {
const draftRedacted =
Boolean(draftEventId) && (payloadReplyMismatch || mustDeliverFinalNormally);
Boolean(draftEventId) &&
(payload.isError || payloadReplyMismatch || mustDeliverFinalNormally);
if (draftRedacted && draftEventId) {
await redactMatrixDraftEvent(client, roomId, draftEventId);
}

View File

@@ -100,6 +100,45 @@ describe("createMattermostDraftStream", () => {
expect(stream.postId()).toBeUndefined();
});
it("discardPending keeps the preview post but ignores later updates", async () => {
const { client, calls } = createMockClient();
const stream = createMattermostDraftStream({
client,
channelId: "channel-1",
rootId: "root-1",
throttleMs: 0,
});
stream.update("Working...");
await stream.flush();
await stream.discardPending();
stream.update("Late update");
await stream.flush();
expect(calls).toHaveLength(1);
expect(calls[0]?.path).toBe("/posts");
expect(stream.postId()).toBe("post-1");
});
it("seal keeps the preview post and cancels pending final overwrites", async () => {
const { client, calls } = createMockClient();
const stream = createMattermostDraftStream({
client,
channelId: "channel-1",
rootId: "root-1",
throttleMs: 0,
});
stream.update("Working...");
await stream.flush();
stream.update("Stale final draft");
await stream.seal();
expect(calls).toHaveLength(1);
expect(calls[0]?.path).toBe("/posts");
expect(stream.postId()).toBe("post-1");
});
it("stop flushes the last pending update and ignores later ones", async () => {
const { client, calls } = createMockClient();
const stream = createMattermostDraftStream({

View File

@@ -14,6 +14,8 @@ export type MattermostDraftStream = {
flush: () => Promise<void>;
postId: () => string | undefined;
clear: () => Promise<void>;
discardPending: () => Promise<void>;
seal: () => Promise<void>;
stop: () => Promise<void>;
forceNewMessage: () => void;
};
@@ -95,7 +97,7 @@ export function createMattermostDraftStream(params: {
}
};
const { loop, update, stop, clear } = createFinalizableDraftLifecycle({
const { loop, update, stop, clear, discardPending, seal } = createFinalizableDraftLifecycle({
throttleMs,
state: streamState,
sendOrEditStreamMessage,
@@ -125,6 +127,8 @@ export function createMattermostDraftStream(params: {
flush: loop.flush,
postId: () => streamPostId,
clear,
discardPending,
seal,
stop,
forceNewMessage,
};

View File

@@ -66,7 +66,8 @@ function createDraftStreamMock(postId: string | undefined = "preview-post-1") {
flush: vi.fn(async () => {}),
postId: vi.fn(() => postId),
clear: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
discardPending: vi.fn(async () => {}),
seal: vi.fn(async () => {}),
};
}
@@ -267,6 +268,8 @@ describe("deliverMattermostReplyWithDraftPreview", () => {
});
expect(deliverFinal).toHaveBeenCalledTimes(1);
expect(draftStream.flush).not.toHaveBeenCalled();
expect(draftStream.discardPending).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(updateMattermostPostSpy).not.toHaveBeenCalled();
});
@@ -291,6 +294,29 @@ describe("deliverMattermostReplyWithDraftPreview", () => {
deliverFinal,
});
expect(deliverFinal).toHaveBeenCalledTimes(1);
expect(draftStream.flush).not.toHaveBeenCalled();
expect(draftStream.discardPending).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("does not flush error finals before normal delivery", async () => {
const draftStream = createDraftStreamMock();
const deliverFinal = vi.fn(async () => {});
await deliverMattermostReplyWithDraftPreview({
payload: { text: "Error", isError: true } as never,
info: { kind: "final" },
client: createMattermostClientMock(),
draftStream,
effectiveReplyToId: "thread-root-1",
resolvePreviewFinalText: (text) => text?.trim(),
previewState: { finalizedViaPreviewPost: false },
logVerboseMessage: vi.fn(),
deliverFinal,
});
expect(draftStream.flush).not.toHaveBeenCalled();
expect(deliverFinal).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
@@ -316,8 +342,9 @@ describe("deliverMattermostReplyWithDraftPreview", () => {
"preview-post-1",
expect.objectContaining({ message: "Final answer" }),
);
expect(draftStream.stop).toHaveBeenCalledTimes(1);
expect(draftStream.stop.mock.invocationCallOrder[0]).toBeLessThan(
expect(draftStream.flush).toHaveBeenCalledTimes(1);
expect(draftStream.seal).toHaveBeenCalledTimes(1);
expect(draftStream.seal.mock.invocationCallOrder[0]).toBeLessThan(
updateMattermostPostSpy.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(deliverFinal).not.toHaveBeenCalled();
@@ -343,6 +370,7 @@ describe("deliverMattermostReplyWithDraftPreview", () => {
}),
).rejects.toThrow("send failed");
expect(draftStream.discardPending).toHaveBeenCalledTimes(1);
expect(draftStream.clear).not.toHaveBeenCalled();
expect(updateMattermostPostSpy).not.toHaveBeenCalledWith(
expect.anything(),

View File

@@ -1,3 +1,4 @@
import { deliverFinalizableDraftPreview } from "openclaw/plugin-sdk/channel-lifecycle";
import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
import { isPrivateNetworkOptInEnabled } from "openclaw/plugin-sdk/ssrf-runtime";
import {
@@ -276,7 +277,7 @@ type MattermostDraftPreviewDeliverParams = {
client: MattermostClient;
draftStream: Pick<
ReturnType<typeof createMattermostDraftStream>,
"flush" | "postId" | "clear" | "stop"
"flush" | "postId" | "clear" | "discardPending" | "seal"
>;
effectiveReplyToId?: string;
resolvePreviewFinalText: (text?: string) => string | undefined;
@@ -292,65 +293,49 @@ export async function deliverMattermostReplyWithDraftPreview(
return;
}
const isFinal = params.info.kind === "final";
let previewPostId: string | undefined;
if (isFinal) {
await params.draftStream.flush();
const hasMedia =
Boolean(params.payload.mediaUrl) || (params.payload.mediaUrls?.length ?? 0) > 0;
const previewFinalText = params.resolvePreviewFinalText(params.payload.text);
previewPostId = params.draftStream.postId();
await deliverFinalizableDraftPreview({
kind: params.info.kind,
payload: params.payload,
draft: {
flush: params.draftStream.flush,
clear: params.draftStream.clear,
discardPending: params.draftStream.discardPending,
seal: params.draftStream.seal,
id: params.draftStream.postId,
},
buildFinalEdit: (payload) => {
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const previewFinalText = params.resolvePreviewFinalText(payload.text);
if (
typeof previewPostId === "string" &&
!hasMedia &&
typeof previewFinalText === "string" &&
!params.payload.isError &&
canFinalizeMattermostPreviewInPlace({
previewRootId: params.effectiveReplyToId,
threadRootId: params.effectiveReplyToId,
replyToId: params.payload.replyToId,
})
) {
try {
// Seal the preview before the final edit so late draft events cannot
// patch over the finalized visible message.
await params.draftStream.stop();
await updateMattermostPost(params.client, previewPostId, {
message: previewFinalText,
});
params.previewState.finalizedViaPreviewPost = true;
return;
} catch (err) {
params.logVerboseMessage(
`mattermost preview final edit failed; falling back to normal send (${String(err)})`,
);
if (
hasMedia ||
typeof previewFinalText !== "string" ||
payload.isError ||
!canFinalizeMattermostPreviewInPlace({
previewRootId: params.effectiveReplyToId,
threadRootId: params.effectiveReplyToId,
replyToId: payload.replyToId,
})
) {
return undefined;
}
}
}
let finalReplyDelivered = false;
try {
await params.deliverFinal();
finalReplyDelivered = true;
} finally {
if (
isFinal &&
typeof previewPostId === "string" &&
shouldClearMattermostDraftPreview({
finalizedViaPreviewPost: params.previewState.finalizedViaPreviewPost,
finalReplyDelivered,
})
) {
try {
await params.draftStream.clear();
} catch (err) {
params.logVerboseMessage(
`mattermost draft preview clear failed after successful final delivery (${String(err)})`,
);
}
}
}
return { message: previewFinalText };
},
editFinal: async (previewPostId, edit) => {
await updateMattermostPost(params.client, previewPostId, edit);
},
deliverNormally: async () => {
await params.deliverFinal();
},
onPreviewFinalized: () => {
params.previewState.finalizedViaPreviewPost = true;
},
logPreviewEditFailure: (err) => {
params.logVerboseMessage(
`mattermost preview final edit failed; falling back to normal send (${String(err)})`,
);
},
});
}
export function resolveMattermostEffectiveReplyToId(params: {

View File

@@ -132,6 +132,22 @@ describe("createSlackDraftStream", () => {
expect(stream.channelId()).toBeUndefined();
});
it("discardPending stops late updates without deleting the visible preview", async () => {
const { stream, send, edit, remove } = createDraftStreamHarness();
stream.update("hello");
await stream.flush();
await stream.discardPending();
stream.update("late");
await stream.flush();
expect(send).toHaveBeenCalledTimes(1);
expect(edit).not.toHaveBeenCalled();
expect(remove).not.toHaveBeenCalled();
expect(stream.messageId()).toBe("111.222");
expect(stream.channelId()).toBe("C123");
});
it("clear is a no-op when no preview message exists", async () => {
const { stream, remove } = createDraftStreamHarness();

View File

@@ -10,6 +10,8 @@ export type SlackDraftStream = {
update: (text: string) => void;
flush: () => Promise<void>;
clear: () => Promise<void>;
discardPending: () => Promise<void>;
seal: () => Promise<void>;
stop: () => void;
forceNewMessage: () => void;
messageId: () => string | undefined;
@@ -95,9 +97,13 @@ export function createSlackDraftStream(params: {
loop.stop();
};
const clear = async () => {
const discardPending = async () => {
stop();
await loop.waitForInFlight();
};
const clear = async () => {
await discardPending();
const channelId = streamChannelId;
const messageId = streamMessageId;
streamChannelId = undefined;
@@ -129,6 +135,8 @@ export function createSlackDraftStream(params: {
update: loop.update,
flush: loop.flush,
clear,
discardPending,
seal: discardPending,
stop,
forceNewMessage,
messageId: () => streamMessageId,

View File

@@ -9,7 +9,7 @@ const deliverRepliesMock = vi.fn(async () => {});
const finalizeSlackPreviewEditMock = vi.fn(async () => {});
let mockedDispatchSequence: Array<{
kind: "tool" | "block" | "final";
payload: { text: string };
payload: { text: string; isError?: boolean; mediaUrl?: string; mediaUrls?: string[] };
}> = [];
const noop = () => {};
@@ -20,6 +20,8 @@ function createDraftStreamStub() {
update: noop,
flush: noopAsync,
clear: noopAsync,
discardPending: noopAsync,
seal: noopAsync,
stop: noop,
forceNewMessage: noop,
messageId: () => "171234.567",
@@ -213,6 +215,7 @@ vi.mock("../config.runtime.js", () => ({
vi.mock("../replies.js", () => ({
createSlackReplyDeliveryPlan: () => ({
peekThreadTs: () => THREAD_TS,
nextThreadTs: () => THREAD_TS,
markSent: () => {},
}),
@@ -234,7 +237,7 @@ vi.mock("../reply.runtime.js", () => ({
dispatchInboundMessage: async (params: {
dispatcher: {
deliver: (
payload: { text: string },
payload: { text: string; isError?: boolean; mediaUrl?: string; mediaUrls?: string[] },
info: { kind: "tool" | "block" | "final" },
) => Promise<void>;
};
@@ -293,7 +296,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(2);
expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(1);
expect(deliverRepliesMock).toHaveBeenCalledTimes(2);
expect(deliverRepliesMock).toHaveBeenNthCalledWith(
1,
@@ -310,4 +313,54 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
}),
);
});
it("does not flush draft previews for media finals before normal delivery", async () => {
const draftStream = {
...createDraftStreamStub(),
flush: vi.fn(noopAsync),
clear: vi.fn(noopAsync),
discardPending: vi.fn(noopAsync),
seal: vi.fn(noopAsync),
};
createSlackDraftStreamMock.mockReturnValueOnce(draftStream);
mockedDispatchSequence = [
{
kind: "final",
payload: { text: "Photo", mediaUrl: "https://example.com/a.png" },
},
];
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(draftStream.flush).not.toHaveBeenCalled();
expect(draftStream.discardPending).toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(finalizeSlackPreviewEditMock).not.toHaveBeenCalled();
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
});
it("does not flush draft previews for error finals before normal delivery", async () => {
const draftStream = {
...createDraftStreamStub(),
flush: vi.fn(noopAsync),
clear: vi.fn(noopAsync),
discardPending: vi.fn(noopAsync),
seal: vi.fn(noopAsync),
};
createSlackDraftStreamMock.mockReturnValueOnce(draftStream);
mockedDispatchSequence = [
{
kind: "final",
payload: { text: "Something failed", isError: true },
},
];
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(draftStream.flush).not.toHaveBeenCalled();
expect(draftStream.discardPending).toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(finalizeSlackPreviewEditMock).not.toHaveBeenCalled();
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
});
});

View File

@@ -7,6 +7,7 @@ import {
removeAckReactionAfterReply,
type StatusReactionAdapter,
} from "openclaw/plugin-sdk/channel-feedback";
import { deliverFinalizableDraftPreview } from "openclaw/plugin-sdk/channel-lifecycle";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import {
resolveChannelStreamingBlockEnabled,
@@ -580,45 +581,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
const reply = resolveSendableOutboundReplyParts(payload);
const slackBlocks = readSlackReplyBlocks(payload);
const draftMessageId = draftStream?.messageId();
const draftChannelId = draftStream?.channelId();
const trimmedFinalText = reply.trimmedText;
const canFinalizeViaPreviewEdit =
previewStreamingEnabled &&
streamMode !== "status_final" &&
!reply.hasMedia &&
!payload.isError &&
(trimmedFinalText.length > 0 || Boolean(slackBlocks?.length)) &&
typeof draftMessageId === "string" &&
typeof draftChannelId === "string";
if (canFinalizeViaPreviewEdit) {
const finalThreadTs = usedReplyThreadTs ?? statusThreadTs;
if (deliveryTracker.hasDelivered({ kind: info.kind, payload, threadTs: finalThreadTs })) {
observedReplyDelivery = true;
return;
}
draftStream?.stop();
try {
await finalizeSlackPreviewEdit({
client: ctx.app.client,
token: ctx.botToken,
accountId: account.accountId,
channelId: draftChannelId,
messageId: draftMessageId,
text: normalizeSlackOutboundText(trimmedFinalText),
...(slackBlocks?.length ? { blocks: slackBlocks } : {}),
threadTs: finalThreadTs,
});
observedReplyDelivery = true;
deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs });
return;
} catch (err) {
logVerbose(
`slack: preview final edit failed; falling back to standard send (${formatErrorMessage(err)})`,
);
}
} else if (previewStreamingEnabled && streamMode === "status_final" && hasStreamedMessage) {
if (previewStreamingEnabled && streamMode === "status_final" && hasStreamedMessage) {
try {
const statusChannelId = draftStream?.channelId();
const statusMessageId = draftStream?.messageId();
@@ -633,12 +598,75 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
} catch (err) {
logVerbose(`slack: status_final completion update failed (${formatErrorMessage(err)})`);
}
} else if (reply.hasMedia) {
await draftStream?.clear();
hasStreamedMessage = false;
}
await deliverNormally({ payload, kind: info.kind });
const result = await deliverFinalizableDraftPreview({
kind: info.kind,
payload,
draft: draftStream
? {
flush: draftStream.flush,
clear: draftStream.clear,
discardPending: draftStream.discardPending,
seal: draftStream.seal,
id: () => {
const channelId = draftStream.channelId();
const messageId = draftStream.messageId();
return channelId && messageId ? { channelId, messageId } : undefined;
},
}
: undefined,
buildFinalEdit: () => {
if (
!previewStreamingEnabled ||
streamMode === "status_final" ||
reply.hasMedia ||
payload.isError ||
(trimmedFinalText.length === 0 && !slackBlocks?.length)
) {
return undefined;
}
return {
text: normalizeSlackOutboundText(trimmedFinalText),
blocks: slackBlocks,
threadTs: usedReplyThreadTs ?? statusThreadTs,
};
},
editFinal: async (preview, edit) => {
if (deliveryTracker.hasDelivered({ kind: info.kind, payload, threadTs: edit.threadTs })) {
return;
}
await finalizeSlackPreviewEdit({
client: ctx.app.client,
token: ctx.botToken,
accountId: account.accountId,
channelId: preview.channelId,
messageId: preview.messageId,
text: edit.text,
...(edit.blocks?.length ? { blocks: edit.blocks } : {}),
threadTs: edit.threadTs,
});
},
deliverNormally: async () => {
await deliverNormally({ payload, kind: info.kind });
},
onPreviewFinalized: (_preview) => {
const finalThreadTs = usedReplyThreadTs ?? statusThreadTs;
observedReplyDelivery = true;
replyPlan.markSent();
deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs });
},
logPreviewEditFailure: (err) => {
logVerbose(
`slack: preview final edit failed; falling back to standard send (${formatErrorMessage(err)})`,
);
},
});
if (result === "preview-finalized") {
return;
}
},
onError: (err, info) => {
runtime.error?.(danger(`slack ${info.kind} reply failed: ${formatErrorMessage(err)}`));
@@ -653,13 +681,12 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
accountId: account.accountId,
maxChars: Math.min(ctx.textLimit, SLACK_TEXT_LIMIT),
resolveThreadTs: () => {
const ts = replyPlan.nextThreadTs();
const ts = replyPlan.peekThreadTs();
if (ts) {
usedReplyThreadTs ??= ts;
}
return ts;
},
onMessageSent: () => replyPlan.markSent(),
log: logVerbose,
warn: logVerbose,
})
@@ -826,8 +853,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
} catch (err) {
dispatchError = err;
} finally {
await draftStream?.flush();
draftStream?.stop();
await draftStream?.discardPending();
markDispatchIdle();
}

View File

@@ -6,6 +6,7 @@ vi.mock("../send.js", () => ({
}));
let deliverReplies: typeof import("./replies.js").deliverReplies;
let createSlackReplyDeliveryPlan: typeof import("./replies.js").createSlackReplyDeliveryPlan;
let resolveSlackThreadTs: typeof import("./replies.js").resolveSlackThreadTs;
import { deliverSlackSlashReplies } from "./replies.js";
@@ -23,7 +24,8 @@ function baseParams(overrides?: Record<string, unknown>) {
describe("deliverReplies identity passthrough", () => {
beforeAll(async () => {
({ deliverReplies, resolveSlackThreadTs } = await import("./replies.js"));
({ createSlackReplyDeliveryPlan, deliverReplies, resolveSlackThreadTs } =
await import("./replies.js"));
});
beforeEach(() => {
@@ -211,6 +213,29 @@ describe("resolveSlackThreadTs fallback classification", () => {
});
});
describe("createSlackReplyDeliveryPlan", () => {
it("lets draft previews inspect first thread targets without consuming them", () => {
const hasRepliedRef = { value: false };
const plan = createSlackReplyDeliveryPlan({
replyToMode: "first",
incomingThreadTs: undefined,
messageTs: "9999999999.999999",
hasRepliedRef,
isThreadReply: false,
});
expect(plan.peekThreadTs()).toBe("9999999999.999999");
expect(plan.peekThreadTs()).toBe("9999999999.999999");
expect(hasRepliedRef.value).toBe(false);
plan.markSent();
expect(hasRepliedRef.value).toBe(true);
expect(plan.peekThreadTs()).toBeUndefined();
expect(plan.nextThreadTs()).toBeUndefined();
});
});
describe("deliverSlackSlashReplies chunking", () => {
it("keeps a 4205-character reply in a single slash response by default", async () => {
const respond = vi.fn(async () => undefined);

View File

@@ -127,6 +127,7 @@ export function resolveSlackThreadTs(params: {
}
type SlackReplyDeliveryPlan = {
peekThreadTs: () => string | undefined;
nextThreadTs: () => string | undefined;
markSent: () => void;
};
@@ -168,6 +169,7 @@ export function createSlackReplyDeliveryPlan(params: {
isThreadReply: params.isThreadReply,
});
return {
peekThreadTs: () => replyReference.peek(),
nextThreadTs: () => replyReference.use(),
markSent: () => {
replyReference.markSent();