fix(telegram): flush draft stream on assistant boundaries

This commit is contained in:
Ayaan Zaidi
2026-02-21 17:24:20 +05:30
parent 2b4b1f3f90
commit b0ec1a2651
2 changed files with 74 additions and 1 deletions

View File

@@ -82,6 +82,29 @@ describe("dispatchTelegramMessage draft streaming", () => {
};
}
function createFlushSequencedDraftStream(startMessageId = 2001) {
  // Draft-stream test double whose flush() lazily allocates a fresh,
  // monotonically increasing message id the first time buffered text
  // exists — mimicking Telegram creating a preview message per flush.
  let currentId: number | undefined;
  let idCounter = startMessageId;
  let buffered = "";
  // First flush with pending text claims the next id; later flushes
  // keep editing the same preview message.
  const allocateIdIfNeeded = async () => {
    if (buffered && currentId == null) {
      currentId = idCounter;
      idCounter += 1;
    }
  };
  return {
    // Records the latest draft text without sending anything.
    update: vi.fn().mockImplementation((text: string) => {
      buffered = text;
    }),
    flush: vi.fn().mockImplementation(allocateIdIfNeeded),
    // Exposes the id of the active preview message, if one exists yet.
    messageId: vi.fn().mockImplementation(() => currentId),
    clear: vi.fn().mockResolvedValue(undefined),
    stop: vi.fn().mockResolvedValue(undefined),
    // Simulates an assistant-message boundary: drop the active id and
    // buffered text so the next flush allocates a new message id.
    forceNewMessage: vi.fn().mockImplementation(() => {
      currentId = undefined;
      buffered = "";
    }),
  };
}
function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) {
const answerDraftStream = createDraftStream(params?.answerMessageId);
const reasoningDraftStream = createDraftStream(params?.reasoningMessageId);
@@ -577,6 +600,55 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(deliverReplies).not.toHaveBeenCalled();
});
it("flushes each assistant message boundary so previews stream separately before final delivery", async () => {
  // Answer lane uses the flush-sequenced double (new preview id per
  // boundary); the reasoning lane stays a plain draft stream.
  const answerDraftStream = createFlushSequencedDraftStream(2001);
  const reasoningDraftStream = createDraftStream();
  createTelegramDraftStream
    .mockImplementationOnce(() => answerDraftStream)
    .mockImplementationOnce(() => reasoningDraftStream);
  // Simulate three assistant messages: partial chunks separated by
  // message-boundary callbacks, then three final deliveries in order.
  dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
    async ({ dispatcherOptions, replyOptions }) => {
      await replyOptions?.onPartialReply?.({ text: "First chunk" });
      await replyOptions?.onAssistantMessageStart?.();
      await replyOptions?.onPartialReply?.({ text: "Second chunk" });
      await replyOptions?.onAssistantMessageStart?.();
      await replyOptions?.onPartialReply?.({ text: "Third chunk" });
      await dispatcherOptions.deliver({ text: "First final" }, { kind: "final" });
      await dispatcherOptions.deliver({ text: "Second final" }, { kind: "final" });
      await dispatcherOptions.deliver({ text: "Third final" }, { kind: "final" });
      return { queuedFinal: true };
    },
  );
  deliverReplies.mockResolvedValue({ delivered: true });
  editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "2001" });

  await dispatchWithContext({ context: createContext(), streamMode: "partial" });

  expect(answerDraftStream.flush).toHaveBeenCalled();
  // Each final edit must target the preview message created for its
  // own assistant boundary: ids 2001, 2002, 2003 in call order.
  const expectedEdits: ReadonlyArray<readonly [number, string]> = [
    [2001, "First final"],
    [2002, "Second final"],
    [2003, "Third final"],
  ];
  expectedEdits.forEach(([messageId, text], index) => {
    expect(editMessageTelegram).toHaveBeenNthCalledWith(
      index + 1,
      123,
      messageId,
      text,
      expect.any(Object),
    );
  });
});
it.each(["block", "partial"] as const)(
"splits reasoning lane only when a later reasoning block starts (%s mode)",
async (streamMode) => {

View File

@@ -669,9 +669,10 @@ export const dispatchTelegramMessage = async ({
}
: undefined,
onAssistantMessageStart: answerLane.stream
? () => {
? async () => {
reasoningStepState.resetForNextStep();
if (answerLane.hasStreamedMessage) {
await answerLane.stream?.flush();
const previewMessageId = answerLane.stream?.messageId();
if (typeof previewMessageId === "number") {
archivedAnswerPreviews.push({