fix(telegram): send fresh finals for stale previews (#72038)

* fix(telegram): send fresh finals for stale previews

* test(telegram): cover stale preview send fallback

* fix(telegram): keep stale archived preview fallback

* fix(telegram): clear stale active previews

* fix(telegram): reset preview state after fresh finals
This commit is contained in:
Rubén Cuevas
2026-04-26 18:44:30 -04:00
committed by GitHub
parent 084dde89fd
commit a08b65a90a
9 changed files with 236 additions and 8 deletions

View File

@@ -433,6 +433,7 @@ export const dispatchTelegramMessage = async ({
archivedAnswerPreviews.push({
messageId: preview.messageId,
textSnapshot: preview.textSnapshot,
visibleSinceMs: preview.visibleSinceMs,
deleteIfUnused: true,
});
}
@@ -539,6 +540,7 @@ export const dispatchTelegramMessage = async ({
archivedAnswerPreviews.push({
messageId: previewMessageId,
textSnapshot: answerLane.lastPartialText,
visibleSinceMs: answerLane.stream?.visibleSinceMs?.(),
deleteIfUnused: false,
});
}

View File

@@ -6,6 +6,7 @@ export type TestDraftStream = {
update: ReturnType<typeof vi.fn<(text: string) => void>>;
flush: ReturnType<typeof vi.fn<() => Promise<void>>>;
messageId: ReturnType<typeof vi.fn<() => number | undefined>>;
visibleSinceMs: ReturnType<typeof vi.fn<() => number | undefined>>;
previewMode: ReturnType<typeof vi.fn<() => DraftPreviewMode>>;
previewRevision: ReturnType<typeof vi.fn<() => number>>;
lastDeliveredText: ReturnType<typeof vi.fn<() => string>>;
@@ -25,8 +26,10 @@ export function createTestDraftStream(params?: {
onStop?: () => void | Promise<void>;
onDiscard?: () => void | Promise<void>;
clearMessageIdOnForceNew?: boolean;
visibleSinceMs?: number;
}): TestDraftStream {
let messageId = params?.messageId;
let visibleSinceMs = params?.visibleSinceMs;
let previewRevision = 0;
let lastDeliveredText = "";
return {
@@ -37,6 +40,7 @@ export function createTestDraftStream(params?: {
}),
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockImplementation(() => messageId),
visibleSinceMs: vi.fn().mockImplementation(() => visibleSinceMs),
previewMode: vi.fn().mockReturnValue(params?.previewMode ?? "message"),
previewRevision: vi.fn().mockImplementation(() => previewRevision),
lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText),
@@ -52,16 +56,19 @@ export function createTestDraftStream(params?: {
if (params?.clearMessageIdOnForceNew) {
messageId = undefined;
}
visibleSinceMs = undefined;
}),
sendMayHaveLanded: vi.fn().mockReturnValue(false),
setMessageId: (value: number | undefined) => {
messageId = value;
visibleSinceMs = value == null ? undefined : Date.now();
},
};
}
export function createSequencedTestDraftStream(startMessageId = 1001): TestDraftStream {
let activeMessageId: number | undefined;
let visibleSinceMs: number | undefined;
let nextMessageId = startMessageId;
let previewRevision = 0;
let lastDeliveredText = "";
@@ -69,12 +76,14 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft
update: vi.fn().mockImplementation((text: string) => {
if (activeMessageId == null) {
activeMessageId = nextMessageId++;
visibleSinceMs = Date.now();
}
previewRevision += 1;
lastDeliveredText = text.trimEnd();
}),
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockImplementation(() => activeMessageId),
visibleSinceMs: vi.fn().mockImplementation(() => visibleSinceMs),
previewMode: vi.fn().mockReturnValue("message"),
previewRevision: vi.fn().mockImplementation(() => previewRevision),
lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText),
@@ -84,10 +93,12 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft
materialize: vi.fn().mockImplementation(async () => activeMessageId),
forceNewMessage: vi.fn().mockImplementation(() => {
activeMessageId = undefined;
visibleSinceMs = undefined;
}),
sendMayHaveLanded: vi.fn().mockReturnValue(false),
setMessageId: (value: number | undefined) => {
activeMessageId = value;
visibleSinceMs = value == null ? undefined : Date.now();
},
};
}

View File

@@ -161,6 +161,28 @@ describe("createTelegramDraftStream", () => {
expect(api.sendMessageDraft).not.toHaveBeenCalled();
});
it("tracks when a message preview first became visible", async () => {
vi.useFakeTimers();
try {
vi.setSystemTime(new Date("2026-04-26T01:00:00.000Z"));
const api = createMockDraftApi();
const stream = createDraftStream(api, { previewTransport: "message" });
stream.update("Hello");
await stream.flush();
expect(stream.visibleSinceMs?.()).toBe(Date.parse("2026-04-26T01:00:00.000Z"));
vi.setSystemTime(new Date("2026-04-26T01:01:00.000Z"));
stream.update("Hello again");
await stream.flush();
expect(stream.visibleSinceMs?.()).toBe(Date.parse("2026-04-26T01:00:00.000Z"));
} finally {
vi.useRealTimers();
}
});
it("falls back to message transport when sendMessageDraft is unavailable", async () => {
const api = createMockDraftApi();
delete (api as { sendMessageDraft?: unknown }).sendMessageDraft;
@@ -436,6 +458,23 @@ describe("createTelegramDraftStream", () => {
expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined);
});
it("creates new message after cleanup and forceNewMessage", async () => {
const { api, stream } = createForceNewMessageHarness();
stream.update("Stale preview");
await stream.flush();
await stream.clear();
expect(api.deleteMessage).toHaveBeenCalledWith(123, 17);
stream.forceNewMessage();
stream.update("Next preview");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(2);
expect(api.sendMessage).toHaveBeenLastCalledWith(123, "Next preview", undefined);
});
it("sends first update immediately after forceNewMessage within throttle window", async () => {
vi.useFakeTimers();
try {
@@ -487,6 +526,7 @@ describe("createTelegramDraftStream", () => {
messageId: 17,
textSnapshot: "Message A partial",
parseMode: undefined,
visibleSinceMs: expect.any(Number),
});
expect(api.sendMessage).toHaveBeenCalledTimes(2);
expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Message B partial", undefined);

View File

@@ -94,6 +94,7 @@ export type TelegramDraftStream = {
update: (text: string) => void;
flush: () => Promise<void>;
messageId: () => number | undefined;
visibleSinceMs?: () => number | undefined;
previewMode?: () => "message" | "draft";
previewRevision?: () => number;
lastDeliveredText?: () => string;
@@ -118,6 +119,7 @@ type SupersededTelegramPreview = {
messageId: number;
textSnapshot: string;
parseMode?: "HTML";
visibleSinceMs?: number;
};
export function createTelegramDraftStream(params: {
@@ -174,6 +176,7 @@ export function createTelegramDraftStream(params: {
const streamState = { stopped: false, final: false };
let messageSendAttempted = false;
let streamMessageId: number | undefined;
let streamVisibleSinceMs: number | undefined;
let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined;
let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message";
let lastSentText = "";
@@ -226,6 +229,7 @@ export function createTelegramDraftStream(params: {
sendGeneration,
}: PreviewSendParams): Promise<boolean> => {
if (typeof streamMessageId === "number") {
streamVisibleSinceMs ??= Date.now();
if (renderedParseMode) {
await params.api.editMessageText(chatId, streamMessageId, renderedText, {
parse_mode: renderedParseMode,
@@ -257,15 +261,18 @@ export function createTelegramDraftStream(params: {
return false;
}
const normalizedMessageId = Math.trunc(sentMessageId);
const visibleSinceMs = Date.now();
if (sendGeneration !== generation) {
params.onSupersededPreview?.({
messageId: normalizedMessageId,
textSnapshot: renderedText,
parseMode: renderedParseMode,
visibleSinceMs,
});
return true;
}
streamMessageId = normalizedMessageId;
streamVisibleSinceMs = visibleSinceMs;
return true;
};
const sendDraftTransportPreview = async ({
@@ -397,10 +404,12 @@ export function createTelegramDraftStream(params: {
};
const forceNewMessage = () => {
streamState.stopped = false;
streamState.final = false;
generation += 1;
messageSendAttempted = false;
streamMessageId = undefined;
streamVisibleSinceMs = undefined;
if (previewTransport === "draft") {
streamDraftId = allocateTelegramDraftId();
}
@@ -430,6 +439,7 @@ export function createTelegramDraftStream(params: {
const sentId = sent?.message_id;
if (typeof sentId === "number" && Number.isFinite(sentId)) {
streamMessageId = Math.trunc(sentId);
streamVisibleSinceMs = Date.now();
if (resolvedDraftApi != null && streamDraftId != null) {
const clearDraftId = streamDraftId;
const clearThreadParams =
@@ -454,6 +464,7 @@ export function createTelegramDraftStream(params: {
update,
flush: loop.flush,
messageId: () => streamMessageId,
visibleSinceMs: () => streamVisibleSinceMs,
previewMode: () => previewTransport,
previewRevision: () => previewRevision,
lastDeliveredText: () => lastDeliveredText,

View File

@@ -12,6 +12,7 @@ const MESSAGE_NOT_MODIFIED_RE =
/400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i;
const MESSAGE_NOT_FOUND_RE =
/400:\s*Bad Request:\s*message to edit not found|MESSAGE_ID_INVALID|message can't be edited/i;
const LONG_LIVED_PREVIEW_FRESH_FINAL_AFTER_MS = 60_000;
function extractErrorText(err: unknown): string {
return typeof err === "string"
@@ -55,6 +56,7 @@ export type DraftLaneState = {
export type ArchivedPreview = {
messageId: number;
textSnapshot: string;
visibleSinceMs?: number;
// Boundary-finalized previews should remain visible even if no matching
// final edit arrives; superseded previews can be safely deleted.
deleteIfUnused?: boolean;
@@ -92,6 +94,7 @@ type CreateLaneTextDelivererParams = {
deletePreviewMessage: (messageId: number) => Promise<void>;
log: (message: string) => void;
markDelivered: () => void;
now?: () => number;
};
type DeliverLaneTextParams = {
@@ -169,6 +172,14 @@ function shouldSkipRegressivePreviewUpdate(args: {
);
}
function isLongLivedPreview(visibleSinceMs: number | undefined, nowMs: number): boolean {
return (
typeof visibleSinceMs === "number" &&
Number.isFinite(visibleSinceMs) &&
nowMs - visibleSinceMs >= LONG_LIVED_PREVIEW_FRESH_FINAL_AFTER_MS
);
}
function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTargetResolution {
const lanePreviewMessageId = params.lane.stream?.messageId();
const previewMessageId =
@@ -187,11 +198,27 @@ function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTarget
export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText;
const readNow = () => params.now?.() ?? Date.now();
const markActivePreviewComplete = (laneName: LaneName) => {
params.activePreviewLifecycleByLane[laneName] = "complete";
params.retainPreviewOnCleanupByLane[laneName] = true;
};
const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft";
const isMessagePreviewLane = (lane: DraftLaneState) => !isDraftPreviewLane(lane);
const shouldUseFreshFinalForLane = (lane: DraftLaneState) =>
isMessagePreviewLane(lane) && isLongLivedPreview(lane.stream?.visibleSinceMs?.(), readNow());
const shouldUseFreshFinalForPreview = (lane: DraftLaneState, visibleSinceMs?: number) =>
isMessagePreviewLane(lane) && isLongLivedPreview(visibleSinceMs, readNow());
const clearActivePreviewAfterFreshFinal = async (lane: DraftLaneState, laneName: LaneName) => {
try {
await lane.stream?.clear();
} catch (err) {
params.log(`telegram: ${laneName} fresh final preview cleanup failed: ${String(err)}`);
}
lane.lastPartialText = "";
lane.hasStreamedMessage = false;
lane.stream?.forceNewMessage();
};
const canMaterializeDraftFinal = (
lane: DraftLaneState,
previewButtons?: TelegramInlineButtons,
@@ -444,6 +471,19 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
if (!archivedPreview) {
return undefined;
}
if (canEditViaPreview && shouldUseFreshFinalForPreview(lane, archivedPreview.visibleSinceMs)) {
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
if (delivered) {
try {
await params.deletePreviewMessage(archivedPreview.messageId);
} catch (err) {
params.log(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
return result("sent");
}
}
if (canEditViaPreview) {
const finalized = await tryUpdatePreviewForLane({
lane,
@@ -551,6 +591,14 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
});
}
}
if (shouldUseFreshFinalForLane(lane)) {
await params.stopDraftLane(lane);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
if (delivered) {
await clearActivePreviewAfterFreshFinal(lane, laneName);
return result("sent");
}
}
const previewMessageId = lane.stream?.messageId();
const finalized = await tryUpdatePreviewForLane({
lane,

View File

@@ -2,6 +2,7 @@ import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import { describe, expect, it, vi } from "vitest";
import { createTestDraftStream } from "./draft-stream.test-helpers.js";
import {
type ArchivedPreview,
createLaneTextDeliverer,
type DraftLaneState,
type LaneDeliveryResult,
@@ -17,9 +18,15 @@ function createHarness(params?: {
answerStream?: DraftLaneState["stream"];
answerHasStreamedMessage?: boolean;
answerLastPartialText?: string;
answerPreviewVisibleSinceMs?: number;
nowMs?: number;
}) {
const answer =
params?.answerStream ?? createTestDraftStream({ messageId: params?.answerMessageId });
params?.answerStream ??
createTestDraftStream({
messageId: params?.answerMessageId,
visibleSinceMs: params?.answerPreviewVisibleSinceMs,
});
const reasoning = createTestDraftStream();
const lanes: Record<LaneName, DraftLaneState> = {
answer: {
@@ -51,11 +58,7 @@ function createHarness(params?: {
const markDelivered = vi.fn();
const activePreviewLifecycleByLane = { answer: "transient", reasoning: "transient" } as const;
const retainPreviewOnCleanupByLane = { answer: false, reasoning: false } as const;
const archivedAnswerPreviews: Array<{
messageId: number;
textSnapshot: string;
deleteIfUnused?: boolean;
}> = [];
const archivedAnswerPreviews: ArchivedPreview[] = [];
const deliverLaneText = createLaneTextDeliverer({
lanes,
@@ -71,6 +74,7 @@ function createHarness(params?: {
deletePreviewMessage,
log,
markDelivered,
now: params?.nowMs != null ? () => params.nowMs! : undefined,
});
return {
@@ -347,6 +351,116 @@ describe("createLaneTextDeliverer", () => {
expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("preview final too long"));
});
it("sends a fresh final when a message preview is long lived", async () => {
const visibleSinceMs = 10_000;
const harness = createHarness({
answerMessageId: 999,
answerHasStreamedMessage: true,
answerLastPartialText: "Working...",
answerPreviewVisibleSinceMs: visibleSinceMs,
nowMs: visibleSinceMs + 60_000,
});
const result = await deliverFinalAnswer(harness, HELLO_FINAL);
expect(result.kind).toBe("sent");
expect(harness.stopDraftLane).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: HELLO_FINAL }),
);
expect(harness.editPreview).not.toHaveBeenCalled();
expect(harness.answer.stream?.clear).toHaveBeenCalledTimes(1);
expect(harness.answer.stream?.forceNewMessage).toHaveBeenCalledTimes(1);
expect(harness.lanes.answer.hasStreamedMessage).toBe(false);
expect(harness.lanes.answer.lastPartialText).toBe("");
expect(harness.markDelivered).not.toHaveBeenCalled();
});
it("falls back to editing a long-lived preview when fresh final send returns false", async () => {
const visibleSinceMs = 10_000;
const harness = createHarness({
answerMessageId: 999,
answerHasStreamedMessage: true,
answerLastPartialText: "Working...",
answerPreviewVisibleSinceMs: visibleSinceMs,
nowMs: visibleSinceMs + 60_000,
});
harness.sendPayload.mockResolvedValueOnce(false);
const result = await deliverFinalAnswer(harness, HELLO_FINAL);
expect(expectPreviewFinalized(result)).toEqual({
content: HELLO_FINAL,
messageId: 999,
});
expect(harness.stopDraftLane).toHaveBeenCalledTimes(2);
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
expect(harness.editPreview).toHaveBeenCalledWith(
expect.objectContaining({
messageId: 999,
text: HELLO_FINAL,
}),
);
expect(harness.answer.stream?.clear).not.toHaveBeenCalled();
expect(harness.markDelivered).toHaveBeenCalledTimes(1);
});
it("sends a fresh final for stale archived previews", async () => {
const visibleSinceMs = 10_000;
const harness = createHarness({
answerMessageId: 1001,
answerPreviewVisibleSinceMs: visibleSinceMs,
nowMs: visibleSinceMs + 60_000,
});
harness.archivedAnswerPreviews.push({
messageId: 222,
textSnapshot: "Working...",
visibleSinceMs,
deleteIfUnused: true,
});
const result = await deliverFinalAnswer(harness, HELLO_FINAL);
expect(result.kind).toBe("sent");
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: HELLO_FINAL }),
);
expect(harness.editPreview).not.toHaveBeenCalled();
expect(harness.deletePreviewMessage).toHaveBeenCalledWith(222);
});
it("falls back to editing a stale archived preview when fresh final send returns false", async () => {
const visibleSinceMs = 10_000;
const harness = createHarness({
answerMessageId: 1001,
answerPreviewVisibleSinceMs: visibleSinceMs,
nowMs: visibleSinceMs + 60_000,
});
harness.archivedAnswerPreviews.push({
messageId: 222,
textSnapshot: "Working...",
visibleSinceMs,
deleteIfUnused: true,
});
harness.sendPayload.mockResolvedValueOnce(false);
const result = await deliverFinalAnswer(harness, HELLO_FINAL);
expect(expectPreviewFinalized(result)).toEqual({
content: HELLO_FINAL,
messageId: 222,
});
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
expect(harness.editPreview).toHaveBeenCalledWith(
expect.objectContaining({
messageId: 222,
text: HELLO_FINAL,
}),
);
expect(harness.deletePreviewMessage).not.toHaveBeenCalled();
expect(harness.markDelivered).toHaveBeenCalledTimes(1);
});
it("materializes DM draft streaming final even when text is unchanged", async () => {
const answerStream = createTestDraftStream({ previewMode: "draft", messageId: 321 });
answerStream.materialize.mockResolvedValue(321);