refactor(telegram): reset draft throttle on message rotation

This commit is contained in:
Ayaan Zaidi
2026-02-21 17:31:53 +05:30
parent b0ec1a2651
commit ed3bf4ff14
5 changed files with 42 additions and 73 deletions

View File

@@ -3,6 +3,7 @@ export type DraftStreamLoop = {
flush: () => Promise<void>;
stop: () => void;
resetPending: () => void;
resetThrottleWindow: () => void;
waitForInFlight: () => Promise<void>;
};
@@ -87,6 +88,13 @@ export function createDraftStreamLoop(params: {
resetPending: () => {
pendingText = "";
},
resetThrottleWindow: () => {
lastSentAt = 0;
if (timer) {
clearTimeout(timer);
timer = undefined;
}
},
waitForInFlight: async () => {
if (inFlightPromise) {
await inFlightPromise;

View File

@@ -82,29 +82,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
};
}
function createFlushSequencedDraftStream(startMessageId = 2001) {
let activeMessageId: number | undefined;
let nextMessageId = startMessageId;
let pendingText = "";
return {
update: vi.fn().mockImplementation((text: string) => {
pendingText = text;
}),
flush: vi.fn().mockImplementation(async () => {
if (pendingText && activeMessageId == null) {
activeMessageId = nextMessageId++;
}
}),
messageId: vi.fn().mockImplementation(() => activeMessageId),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn().mockResolvedValue(undefined),
forceNewMessage: vi.fn().mockImplementation(() => {
activeMessageId = undefined;
pendingText = "";
}),
};
}
function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) {
const answerDraftStream = createDraftStream(params?.answerMessageId);
const reasoningDraftStream = createDraftStream(params?.reasoningMessageId);
@@ -600,55 +577,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(deliverReplies).not.toHaveBeenCalled();
});
it("flushes each assistant message boundary so previews stream separately before final delivery", async () => {
const answerDraftStream = createFlushSequencedDraftStream(2001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "First chunk" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Second chunk" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Third chunk" });
await dispatcherOptions.deliver({ text: "First final" }, { kind: "final" });
await dispatcherOptions.deliver({ text: "Second final" }, { kind: "final" });
await dispatcherOptions.deliver({ text: "Third final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "2001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.flush).toHaveBeenCalled();
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
2001,
"First final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
2002,
"Second final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
3,
123,
2003,
"Third final",
expect.any(Object),
);
});
it.each(["block", "partial"] as const)(
"splits reasoning lane only when a later reasoning block starts (%s mode)",
async (streamMode) => {

View File

@@ -672,7 +672,6 @@ export const dispatchTelegramMessage = async ({
? async () => {
reasoningStepState.resetForNextStep();
if (answerLane.hasStreamedMessage) {
await answerLane.stream?.flush();
const previewMessageId = answerLane.stream?.messageId();
if (typeof previewMessageId === "number") {
archivedAnswerPreviews.push({

View File

@@ -134,6 +134,39 @@ describe("createTelegramDraftStream", () => {
expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined);
});
it("sends first update immediately after forceNewMessage within throttle window", async () => {
vi.useFakeTimers();
try {
const api = {
sendMessage: vi
.fn()
.mockResolvedValueOnce({ message_id: 17 })
.mockResolvedValueOnce({ message_id: 42 }),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
throttleMs: 1000,
});
stream.update("Hello");
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(1));
stream.update("Hello edited");
expect(api.editMessageText).not.toHaveBeenCalled();
stream.forceNewMessage();
stream.update("Second message");
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(2));
expect(api.sendMessage).toHaveBeenLastCalledWith(123, "Second message", undefined);
} finally {
vi.useRealTimers();
}
});
it("supports rendered previews with parse_mode", async () => {
const api = createMockDraftApi();
const stream = createTelegramDraftStream({

View File

@@ -167,6 +167,7 @@ export function createTelegramDraftStream(params: {
lastSentText = "";
lastSentParseMode = undefined;
loop.resetPending();
loop.resetThrottleWindow();
};
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);