fix(telegram): suppress superseded turn replies

This commit is contained in:
Peter Steinberger
2026-05-03 17:06:52 +01:00
parent 463493aa28
commit 2696baba81
4 changed files with 86 additions and 8 deletions

View File

@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
- Google Meet: route stateful CLI session commands through the gateway-owned runtime so joined realtime sessions survive after the starting CLI process exits. Fixes #76344. Thanks @coltonharris-wq.
- Memory/status: keep plain `openclaw memory status` and `openclaw memory status --json` on the cheap read-only path by reserving vector and embedding provider probes for `--deep` or `--index`. Fixes #76769. Thanks @daruire.
- Telegram: suppress stale same-session replies when a newer accepted message arrives before an older in-flight Telegram dispatch finalizes. Fixes #76642. Thanks @chinar-amrutkar.
- Control UI/Sessions: avoid full `sessions.list` reloads for chat-turn `sessions.changed` payloads, so large session stores no longer add multi-second delays while chat responses are being delivered. (#76676) Thanks @VACInc.
- Gateway/watch: run `doctor --fix --non-interactive` once and retry when the dev Gateway child exits during startup, so stale local plugin install/config state does not leave the tmux watch session disappearing without a repair attempt.
- Doctor/Telegram: warn when selected Telegram quote replies can suppress `streaming.preview.toolProgress`, and document the `replyToMode` trade-off without changing runtime delivery. Fixes #73487. Thanks @GodsBoy.

View File

@@ -3468,6 +3468,84 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(firstAnswerDraft.clear).not.toHaveBeenCalled();
});
it("ignores stale answer finalization after a newer message supersedes the same session", async () => {
let releaseFirstFinal!: () => void;
const firstFinalGate = new Promise<void>((resolve) => {
releaseFirstFinal = resolve;
});
let resolvePreviewVisible!: () => void;
const previewVisible = new Promise<void>((resolve) => {
resolvePreviewVisible = resolve;
});
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
onUpdate: (text) => {
if (text === "Old reply partial") {
resolvePreviewVisible();
}
},
});
const firstReasoningDraft = createDraftStream();
const secondAnswerDraft = createDraftStream();
const secondReasoningDraft = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => secondAnswerDraft)
.mockImplementationOnce(() => secondReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Old reply partial" });
await firstFinalGate;
await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" });
return { queuedFinal: true };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "New reply final" }, { kind: "final" });
return { queuedFinal: true };
});
const newReplyDelivered = observeDeliveredReply("New reply final");
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
MessageSid: "msg-1",
} as never,
}),
});
await previewVisible;
const secondPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "newer request",
RawBody: "newer request",
MessageSid: "msg-2",
} as never,
}),
});
await newReplyDelivered;
releaseFirstFinal();
await Promise.all([firstPromise, secondPromise]);
expect(editMessageTelegram).not.toHaveBeenCalledWith(
123,
1001,
"Old reply final",
expect.any(Object),
);
expect(firstAnswerDraft.clear).not.toHaveBeenCalled();
});
it("discards hidden short partials instead of flushing a stale preview after abort", async () => {
let releaseFirstCleanup!: () => void;
const firstCleanupGate = new Promise<void>((resolve) => {

View File

@@ -128,7 +128,7 @@ type TelegramAbortFenceState = {
activeDispatches: number;
};
// Abort can arrive on Telegram's control lane ahead of older same-session reply work.
// Newer accepted turns and authorized aborts can arrive ahead of older same-session reply work.
const telegramAbortFenceByKey = new Map<string, TelegramAbortFenceState>();
function normalizeTelegramFenceKey(value: unknown): string | undefined {
@@ -607,9 +607,9 @@ export const dispatchTelegramMessage = async ({
: undefined;
const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId);
const shouldSupersedeAbortFence =
ctxPayload.CommandAuthorized &&
isAbortRequestText(ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? "");
const dispatchText = ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? "";
const isAbortRequest = isAbortRequestText(dispatchText);
const shouldSupersedeAbortFence = isAbortRequest ? ctxPayload.CommandAuthorized : true;
abortFenceGeneration = beginTelegramAbortFence({
key: dispatchFenceKey,
@@ -907,7 +907,8 @@ export const dispatchTelegramMessage = async ({
const _hasMedia = reply.hasMedia;
const flushBufferedFinalAnswer = async () => {
const buffered = reasoningStepState.takeBufferedFinalAnswer(abortFenceGeneration);
const buffered =
reasoningStepState.takeBufferedFinalAnswer(abortFenceGeneration);
if (!buffered) {
return;
}

View File

@@ -118,9 +118,7 @@ export function createTelegramReasoningStepState() {
bufferedFinalAnswer = value;
};
const takeBufferedFinalAnswer = (
currentGeneration?: number,
): BufferedFinalAnswer | undefined => {
const takeBufferedFinalAnswer = (currentGeneration?: number): BufferedFinalAnswer | undefined => {
if (
currentGeneration !== undefined &&
bufferedFinalAnswer?.bufferedGeneration !== undefined &&