fix: keep telegram transient preview across compaction retry (#66939) (thanks @rubencu)

* fix(telegram): keep transient previews across compaction

* test(telegram): cover suppressed approval previews after compaction

* fix(telegram): preserve delayed message-start boundaries

* fix: keep telegram transient preview across compaction retry (#66939) (thanks @rubencu)

---------

Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
Rubén Cuevas
2026-04-17 01:27:46 -04:00
committed by GitHub
parent 7e18c07e41
commit c65f356ddc
3 changed files with 305 additions and 3 deletions

View File

@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
- Models/config: preserve an existing `models.json` provider `baseUrl` during merge-mode regeneration so custom endpoints do not get reset on restart. (#67893) Thanks @lawrence3699.
- Plugins/discovery: reuse bundled and global plugin discovery results across workspace cache misses so Windows multi-workspace startup stops redoing the shared synchronous scan. (#67940) Thanks @obviyus.
- Plugins/webhooks: enforce synchronous plugin registration with full rollback of failed plugin side effects, and cache SecretRef-backed webhook auth per route so plugin startup and inbound webhook auth stay deterministic. (#67941) Thanks @obviyus.
- Telegram/streaming: keep a transient preview on the same Telegram message when auto-compaction retries an in-flight answer, so streamed replies no longer appear duplicated after compaction. (#66939) Thanks @rubencu.
## 2026.4.15

View File

@@ -800,6 +800,82 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("preserves pre-rotation skip until queued message-start callbacks flush", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
await replyOptions?.onPartialReply?.({ text: "Message B early" });
void replyOptions?.onAssistantMessageStart?.();
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
});
it("does not double-rotate when assistant_message_start arrives after final delivery drains", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
await replyOptions?.onPartialReply?.({ text: "Message B early" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
await replyOptions?.onAssistantMessageStart?.();
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
});
it("clears active preview even when an unrelated boundary archive exists", async () => {
const answerDraftStream = createDraftStream(999);
answerDraftStream.materialize.mockResolvedValue(4321);
@@ -1054,6 +1130,204 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B second chunk");
});
it("does not rotate the streamed preview when compaction retries replay the same assistant message", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onCompactionStart?.();
await replyOptions?.onCompactionEnd?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onPartialReply?.({ text: "Message A partial extended" });
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(answerDraftStream.materialize).not.toHaveBeenCalled();
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
1001,
"Message A final",
expect.any(Object),
);
});
it("clears the compaction replay skip after the retried message finalizes", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onCompactionStart?.();
await replyOptions?.onCompactionEnd?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message A partial extended" });
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
});
it("preserves the compaction replay flag until queued retry callbacks flush", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onCompactionStart?.();
await replyOptions?.onCompactionEnd?.();
void replyOptions?.onAssistantMessageStart?.();
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
1001,
"Message A final",
expect.any(Object),
);
});
it("keeps the existing preview when the retried answer only arrives as final text", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onCompactionStart?.();
await replyOptions?.onCompactionEnd?.();
await replyOptions?.onAssistantMessageStart?.();
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(answerDraftStream.materialize).not.toHaveBeenCalled();
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
1001,
"Message B final",
expect.any(Object),
);
});
it("keeps the transient preview when a local exec approval prompt is suppressed after compaction", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onCompactionStart?.();
await replyOptions?.onCompactionEnd?.();
await dispatcherOptions.deliver(
{
text: "Approval required.\n\n```txt\n/approve 7f423fdc allow-once\n```",
channelData: {
execApproval: {
approvalId: "7f423fdc-1111-2222-3333-444444444444",
approvalSlug: "7f423fdc",
allowedDecisions: ["allow-once", "allow-always", "deny"],
},
},
},
{ kind: "tool" },
);
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
cfg: {
channels: {
telegram: {
execApprovals: {
enabled: true,
approvers: ["12345"],
target: "dm",
},
},
},
},
});
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
1001,
"Message B final",
expect.any(Object),
);
});
it("finalizes multi-message assistant stream to matching preview messages in order", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();

View File

@@ -280,6 +280,10 @@ export const dispatchTelegramMessage = async ({
const reasoningLane = lanes.reasoning;
let splitReasoningOnNextStream = false;
let skipNextAnswerMessageStartRotation = false;
// If compaction interrupts a still-transient answer preview, keep the next
// assistant-message boundary on that same preview instead of materializing a
// duplicate retry message.
let pendingCompactionReplayBoundary = false;
let draftLaneEventQueue = Promise.resolve();
const reasoningStepState = createTelegramReasoningStepState();
const enqueueDraftLaneEvent = (task: () => Promise<void>): Promise<void> => {
@@ -693,6 +697,9 @@ export const dispatchTelegramMessage = async ({
}
}
if (segments.length > 0) {
if (info.kind === "final") {
pendingCompactionReplayBoundary = false;
}
return;
}
if (split.suppressedReasoningOnly) {
@@ -703,6 +710,7 @@ export const dispatchTelegramMessage = async ({
}
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
return;
}
@@ -716,12 +724,14 @@ export const dispatchTelegramMessage = async ({
if (!canSendAsIs) {
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
return;
}
await sendPayload(payload);
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
},
onSkip: (payload, info) => {
@@ -793,6 +803,12 @@ export const dispatchTelegramMessage = async ({
retainPreviewOnCleanupByLane.answer = false;
return;
}
if (pendingCompactionReplayBoundary) {
pendingCompactionReplayBoundary = false;
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
return;
}
await rotateAnswerLaneForNewAssistantMessage();
// Message-start is an explicit assistant-message boundary.
// Even when no forceNewMessage happened (e.g. prior answer had no
@@ -817,9 +833,20 @@ export const dispatchTelegramMessage = async ({
}
}
: undefined,
onCompactionStart: statusReactionController
? () => statusReactionController.setCompacting()
: undefined,
onCompactionStart:
statusReactionController || answerLane.stream
? async () => {
if (
answerLane.hasStreamedMessage &&
activePreviewLifecycleByLane.answer === "transient"
) {
pendingCompactionReplayBoundary = true;
}
if (statusReactionController) {
await statusReactionController.setCompacting();
}
}
: undefined,
onCompactionEnd: statusReactionController
? async () => {
statusReactionController.cancelPending();