mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-03 05:50:23 +00:00
fix(telegram): split stop-created preview finalization path
Refactor lane preview finalization into explicit branches so stop-created previews never duplicate sends when edit fails. Add Telegram dispatch regressions for: - stop-created preview edit failure (no duplicate send) - existing preview edit failure (fallback send preserved) - missing message id after stop-created flush (fallback send) Thanks @obviyus for the original preview-prime direction in #27449. Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
@@ -416,6 +416,83 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
expect(answerDraftStream.stop).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not duplicate final delivery when stop-created preview edit fails", async () => {
|
||||
let messageId: number | undefined;
|
||||
const draftStream = {
|
||||
update: vi.fn(),
|
||||
flush: vi.fn().mockResolvedValue(undefined),
|
||||
messageId: vi.fn().mockImplementation(() => messageId),
|
||||
clear: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockImplementation(async () => {
|
||||
messageId = 777;
|
||||
}),
|
||||
forceNewMessage: vi.fn(),
|
||||
};
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
await dispatcherOptions.deliver({ text: "Short final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
});
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockRejectedValue(new Error("500: edit failed after stop flush"));
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(123, 777, "Short final", expect.any(Object));
|
||||
expect(deliverReplies).not.toHaveBeenCalled();
|
||||
expect(draftStream.stop).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("falls back to normal delivery when existing preview edit fails", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Hel" });
|
||||
await dispatcherOptions.deliver({ text: "Hello final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockRejectedValue(new Error("500: preview edit failed"));
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Hello final", expect.any(Object));
|
||||
expect(deliverReplies).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
replies: [expect.objectContaining({ text: "Hello final" })],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to normal delivery when stop-created preview has no message id", async () => {
|
||||
const draftStream = {
|
||||
update: vi.fn(),
|
||||
flush: vi.fn().mockResolvedValue(undefined),
|
||||
messageId: vi.fn().mockReturnValue(undefined),
|
||||
clear: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
forceNewMessage: vi.fn(),
|
||||
};
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
await dispatcherOptions.deliver({ text: "Short final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
});
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
expect(editMessageTelegram).not.toHaveBeenCalled();
|
||||
expect(deliverReplies).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
replies: [expect.objectContaining({ text: "Short final" })],
|
||||
}),
|
||||
);
|
||||
expect(draftStream.stop).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not overwrite finalized preview when additional final payloads are sent", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
|
||||
@@ -104,6 +104,55 @@ type ConsumeArchivedAnswerPreviewParams = {
|
||||
export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText;
|
||||
|
||||
const shouldSkipRegressivePreviewUpdate = (args: {
|
||||
currentPreviewText: string | undefined;
|
||||
text: string;
|
||||
skipRegressive: "always" | "existingOnly";
|
||||
hadPreviewMessage: boolean;
|
||||
}): boolean =>
|
||||
Boolean(args.currentPreviewText) &&
|
||||
args.currentPreviewText.startsWith(args.text) &&
|
||||
args.text.length < args.currentPreviewText.length &&
|
||||
(args.skipRegressive === "always" || args.hadPreviewMessage);
|
||||
|
||||
const tryEditPreviewMessage = async (args: {
|
||||
laneName: LaneName;
|
||||
messageId: number;
|
||||
text: string;
|
||||
context: "final" | "update";
|
||||
previewButtons?: TelegramInlineButtons;
|
||||
updateLaneSnapshot: boolean;
|
||||
lane: DraftLaneState;
|
||||
treatEditFailureAsDelivered: boolean;
|
||||
}): Promise<boolean> => {
|
||||
try {
|
||||
await params.editPreview({
|
||||
laneName: args.laneName,
|
||||
messageId: args.messageId,
|
||||
text: args.text,
|
||||
previewButtons: args.previewButtons,
|
||||
context: args.context,
|
||||
});
|
||||
if (args.updateLaneSnapshot) {
|
||||
args.lane.lastPartialText = args.text;
|
||||
}
|
||||
params.markDelivered();
|
||||
return true;
|
||||
} catch (err) {
|
||||
if (args.treatEditFailureAsDelivered) {
|
||||
params.log(
|
||||
`telegram: ${args.laneName} preview ${args.context} edit failed after stop-created flush; treating as delivered (${String(err)})`,
|
||||
);
|
||||
params.markDelivered();
|
||||
return true;
|
||||
}
|
||||
params.log(
|
||||
`telegram: ${args.laneName} preview ${args.context} edit failed; falling back to standard send (${String(err)})`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
const tryUpdatePreviewForLane = async ({
|
||||
lane,
|
||||
laneName,
|
||||
@@ -122,12 +171,39 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
const lanePreviewMessageId = lane.stream.messageId();
|
||||
const hadPreviewMessage =
|
||||
typeof previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number";
|
||||
if (stopBeforeEdit) {
|
||||
if (!hadPreviewMessage && context === "final") {
|
||||
// If debounce prevented the first preview, replace stale pending partial text
|
||||
// before final stop() flush sends the first visible preview.
|
||||
lane.stream.update(text);
|
||||
const stopCreatesFirstPreview = stopBeforeEdit && !hadPreviewMessage && context === "final";
|
||||
if (stopCreatesFirstPreview) {
|
||||
// Final stop() can create the first visible preview message.
|
||||
// Prime pending text so the stop flush sends the final text snapshot.
|
||||
lane.stream.update(text);
|
||||
await params.stopDraftLane(lane);
|
||||
const previewMessageId = lane.stream.messageId();
|
||||
if (typeof previewMessageId !== "number") {
|
||||
return false;
|
||||
}
|
||||
const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane);
|
||||
const shouldSkipRegressive = shouldSkipRegressivePreviewUpdate({
|
||||
currentPreviewText,
|
||||
text,
|
||||
skipRegressive,
|
||||
hadPreviewMessage,
|
||||
});
|
||||
if (shouldSkipRegressive) {
|
||||
params.markDelivered();
|
||||
return true;
|
||||
}
|
||||
return tryEditPreviewMessage({
|
||||
laneName,
|
||||
messageId: previewMessageId,
|
||||
text,
|
||||
context,
|
||||
previewButtons,
|
||||
updateLaneSnapshot,
|
||||
lane,
|
||||
treatEditFailureAsDelivered: true,
|
||||
});
|
||||
}
|
||||
if (stopBeforeEdit) {
|
||||
await params.stopDraftLane(lane);
|
||||
}
|
||||
const previewMessageId =
|
||||
@@ -138,34 +214,26 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
return false;
|
||||
}
|
||||
const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane);
|
||||
const shouldSkipRegressive =
|
||||
Boolean(currentPreviewText) &&
|
||||
currentPreviewText.startsWith(text) &&
|
||||
text.length < currentPreviewText.length &&
|
||||
(skipRegressive === "always" || hadPreviewMessage);
|
||||
const shouldSkipRegressive = shouldSkipRegressivePreviewUpdate({
|
||||
currentPreviewText,
|
||||
text,
|
||||
skipRegressive,
|
||||
hadPreviewMessage,
|
||||
});
|
||||
if (shouldSkipRegressive) {
|
||||
params.markDelivered();
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
await params.editPreview({
|
||||
laneName,
|
||||
messageId: previewMessageId,
|
||||
text,
|
||||
previewButtons,
|
||||
context,
|
||||
});
|
||||
if (updateLaneSnapshot) {
|
||||
lane.lastPartialText = text;
|
||||
}
|
||||
params.markDelivered();
|
||||
return true;
|
||||
} catch (err) {
|
||||
params.log(
|
||||
`telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
return tryEditPreviewMessage({
|
||||
laneName,
|
||||
messageId: previewMessageId,
|
||||
text,
|
||||
context,
|
||||
previewButtons,
|
||||
updateLaneSnapshot,
|
||||
lane,
|
||||
treatEditFailureAsDelivered: false,
|
||||
});
|
||||
};
|
||||
|
||||
const consumeArchivedAnswerPreviewForFinal = async ({
|
||||
|
||||
Reference in New Issue
Block a user