fix(telegram): honor removeAckAfterReply for status reactions (#68067)

Thanks @poiskgit.
This commit is contained in:
poisk
2026-04-21 08:47:20 +08:00
committed by GitHub
parent 60a1f01a3e
commit 32e8bca02c
4 changed files with 271 additions and 21 deletions

View File

@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Telegram/status reactions: honor `messages.removeAckAfterReply` when lifecycle status reactions are enabled, clearing or restoring the reaction after success/error using the configured hold timings. (#68067) Thanks @poiskgit.
- Telegram/polling: raise the default polling watchdog threshold from 90s to 120s and add configurable `channels.telegram.pollingStallThresholdMs` (also per-account) so long-running Telegram work gets more room before polling is treated as stalled. (#57737) Thanks @Vitalcheffe.
- Telegram/polling: bound the persisted-offset confirmation `getUpdates` probe with a client-side timeout so a zombie socket cannot hang polling recovery before the runner watchdog starts. (#50368) Thanks @boticlaw.
- Agents/Pi runner: retry silent `stopReason=error` turns with no output when no side effects ran, so non-frontier providers that briefly return empty error turns get another chance instead of ending the session early. (#68310) Thanks @Chased1k.

View File

@@ -67,6 +67,7 @@ type TelegramStatusReactionController = {
cancelPending: () => void;
setError: () => void | Promise<void>;
setDone: () => void | Promise<void>;
restoreInitial: () => void | Promise<void>;
};
export type TelegramMessageContext = {

View File

@@ -298,6 +298,19 @@ describe("dispatchTelegramMessage draft streaming", () => {
};
}
function createStatusReactionController() {
return {
setQueued: vi.fn(),
setThinking: vi.fn(async () => {}),
setTool: vi.fn(async () => {}),
setCompacting: vi.fn(async () => {}),
cancelPending: vi.fn(),
setError: vi.fn(async () => {}),
setDone: vi.fn(async () => {}),
restoreInitial: vi.fn(async () => {}),
};
}
function createBot(): Bot {
return {
api: {
@@ -3075,15 +3088,8 @@ describe("dispatchTelegramMessage draft streaming", () => {
resolvePreviewVisible = resolve;
});
const statusReactionController = {
setQueued: vi.fn(),
setThinking: vi.fn(async () => {}),
setTool: vi.fn(async () => {}),
setCompacting: vi.fn(async () => {}),
cancelPending: vi.fn(),
setError: vi.fn(async () => {}),
setDone: vi.fn(async () => {}),
};
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
onUpdate: (text) => {
@@ -3116,6 +3122,8 @@ describe("dispatchTelegramMessage draft streaming", () => {
const firstPromise = dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
ctxPayload: {
SessionKey: "s1",
@@ -3123,6 +3131,15 @@ describe("dispatchTelegramMessage draft streaming", () => {
RawBody: "earlier request",
} as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
doneHoldMs: 250,
},
},
},
},
});
await previewVisible;
@@ -3147,11 +3164,23 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
releaseFirstFinal();
await Promise.all([firstPromise, abortPromise]);
vi.useFakeTimers();
try {
releaseFirstFinal();
await Promise.all([firstPromise, abortPromise]);
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.setError).not.toHaveBeenCalled();
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.setError).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(249);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(1);
expect(reactionApi).toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("keeps an existing preview when abort arrives during queued draft-lane cleanup", async () => {
@@ -3529,6 +3558,174 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("uses configured doneHoldMs when clearing Telegram status reactions after reply", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true });
deliverReplies.mockResolvedValue({ delivered: true });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
doneHoldMs: 250,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(249);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(1);
expect(reactionApi).toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after reply when removeAckAfterReply is disabled", async () => {
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true });
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: false,
statusReactionController: statusReactionController as never,
}),
streamMode: "off",
});
await vi.waitFor(() => {
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
});
expect(statusReactionController.setError).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
});
it("uses configured errorHoldMs to clear Telegram status reactions after an error fallback", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: true });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
errorHoldMs: 320,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.setDone).not.toHaveBeenCalled();
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(319);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(1);
expect(reactionApi).toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after an error when no final reply is sent", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: false });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
errorHoldMs: 320,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(319);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after an error fallback when removeAckAfterReply is disabled", async () => {
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: false,
statusReactionController: statusReactionController as never,
}),
streamMode: "off",
});
await vi.waitFor(() => {
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
});
expect(statusReactionController.setDone).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
});
it("uses resolved DM config for auto-topic-label overrides", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true });
loadSessionStore.mockReturnValue({ s1: {} });

View File

@@ -1,5 +1,6 @@
import type { Bot } from "grammy";
import {
DEFAULT_TIMING,
logAckFailure,
logTypingFailure,
removeAckReactionAfterReply,
@@ -16,7 +17,7 @@ import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history"
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { isAbortRequestText, type ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { danger, logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { danger, logVerbose, sleepWithAbort } from "openclaw/plugin-sdk/runtime-env";
import { defaultTelegramBotDeps, type TelegramBotDeps } from "./bot-deps.js";
import type { TelegramMessageContext } from "./bot-message-context.js";
import {
@@ -252,6 +253,48 @@ export const dispatchTelegramMessage = async ({
removeAckAfterReply,
statusReactionController,
} = context;
const statusReactionTiming = {
...DEFAULT_TIMING,
...cfg.messages?.statusReactions?.timing,
};
const clearTelegramStatusReaction = async () => {
if (!msg.message_id || !reactionApi) {
return;
}
await reactionApi(chatId, msg.message_id, []);
};
const finalizeTelegramStatusReaction = async (params: {
outcome: "done" | "error";
hasFinalResponse: boolean;
}) => {
if (!statusReactionController) {
return;
}
if (params.outcome === "done") {
await statusReactionController.setDone();
if (removeAckAfterReply) {
await sleepWithAbort(statusReactionTiming.doneHoldMs);
await clearTelegramStatusReaction();
} else {
await statusReactionController.restoreInitial();
}
return;
}
await statusReactionController.setError();
if (params.hasFinalResponse) {
if (removeAckAfterReply) {
await sleepWithAbort(statusReactionTiming.errorHoldMs);
await clearTelegramStatusReaction();
} else {
await statusReactionController.restoreInitial();
}
return;
}
if (removeAckAfterReply) {
await sleepWithAbort(statusReactionTiming.errorHoldMs);
}
await statusReactionController.restoreInitial();
};
const dispatchFenceKey = resolveTelegramAbortFenceKey({
ctxPayload,
chatId,
@@ -1017,9 +1060,11 @@ export const dispatchTelegramMessage = async ({
}
if (dispatchWasSuperseded) {
if (statusReactionController) {
void Promise.resolve(statusReactionController.setDone()).catch((err: unknown) => {
logVerbose(`telegram: status reaction finalize failed: ${String(err)}`);
});
void finalizeTelegramStatusReaction({ outcome: "done", hasFinalResponse: true }).catch(
(err: unknown) => {
logVerbose(`telegram: status reaction finalize failed: ${String(err)}`);
},
);
} else {
removeAckReactionAfterReply({
removeAfterReply: removeAckAfterReply,
@@ -1065,9 +1110,11 @@ export const dispatchTelegramMessage = async ({
const hasFinalResponse = queuedFinal || sentFallback;
if (statusReactionController && !hasFinalResponse) {
void Promise.resolve(statusReactionController.setError()).catch((err: unknown) => {
logVerbose(`telegram: status reaction error finalize failed: ${String(err)}`);
});
void finalizeTelegramStatusReaction({ outcome: "error", hasFinalResponse: false }).catch(
(err: unknown) => {
logVerbose(`telegram: status reaction error finalize failed: ${String(err)}`);
},
);
}
if (!hasFinalResponse) {
@@ -1116,7 +1163,11 @@ export const dispatchTelegramMessage = async ({
}
if (statusReactionController) {
void Promise.resolve(statusReactionController.setDone()).catch((err: unknown) => {
const statusReactionOutcome = dispatchError || sentFallback ? "error" : "done";
void finalizeTelegramStatusReaction({
outcome: statusReactionOutcome,
hasFinalResponse: true,
}).catch((err: unknown) => {
logVerbose(`telegram: status reaction finalize failed: ${String(err)}`);
});
} else {