diff --git a/CHANGELOG.md b/CHANGELOG.md index db1fd0761b8..7032d101a63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Telegram/status reactions: honor `messages.removeAckAfterReply` when lifecycle status reactions are enabled, clearing or restoring the reaction after success/error using the configured hold timings. (#68067) Thanks @poiskgit. - Telegram/polling: raise the default polling watchdog threshold from 90s to 120s and add configurable `channels.telegram.pollingStallThresholdMs` (also per-account) so long-running Telegram work gets more room before polling is treated as stalled. (#57737) Thanks @Vitalcheffe. - Telegram/polling: bound the persisted-offset confirmation `getUpdates` probe with a client-side timeout so a zombie socket cannot hang polling recovery before the runner watchdog starts. (#50368) Thanks @boticlaw. - Agents/Pi runner: retry silent `stopReason=error` turns with no output when no side effects ran, so non-frontier providers that briefly return empty error turns get another chance instead of ending the session early. (#68310) Thanks @Chased1k. diff --git a/extensions/telegram/src/bot-message-context.ts b/extensions/telegram/src/bot-message-context.ts index 55dda165ce3..35267c21ace 100644 --- a/extensions/telegram/src/bot-message-context.ts +++ b/extensions/telegram/src/bot-message-context.ts @@ -67,6 +67,7 @@ type TelegramStatusReactionController = { cancelPending: () => void; setError: () => void | Promise; setDone: () => void | Promise; + restoreInitial: () => void | Promise; }; export type TelegramMessageContext = { diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 7efc056eba2..c07a8034b61 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -298,6 +298,19 @@ describe("dispatchTelegramMessage draft streaming", () => { }; } + function createStatusReactionController() { + return { + setQueued: vi.fn(), + setThinking: vi.fn(async () => {}), + setTool: vi.fn(async () => {}), + setCompacting: vi.fn(async () => {}), + cancelPending: vi.fn(), + setError: vi.fn(async () => {}), + setDone: vi.fn(async () => {}), + restoreInitial: vi.fn(async () => {}), + }; + } + function createBot(): Bot { return { api: { @@ -3075,15 +3088,8 @@ describe("dispatchTelegramMessage draft streaming", () => { resolvePreviewVisible = resolve; }); - const statusReactionController = { - setQueued: vi.fn(), - setThinking: vi.fn(async () => {}), - setTool: vi.fn(async () => {}), - setCompacting: vi.fn(async () => {}), - cancelPending: vi.fn(), - setError: vi.fn(async () => {}), - setDone: vi.fn(async () => {}), - }; + const reactionApi = vi.fn(async () => true); + const statusReactionController = createStatusReactionController(); const firstAnswerDraft = createTestDraftStream({ messageId: 1001, onUpdate: (text) => { @@ -3116,6 +3122,8 @@ describe("dispatchTelegramMessage draft streaming", () => { const firstPromise = dispatchWithContext({ context: createContext({ + reactionApi: reactionApi as never, + removeAckAfterReply: true, statusReactionController: statusReactionController as never, ctxPayload: { SessionKey: "s1", @@ -3123,6 +3131,15 @@ describe("dispatchTelegramMessage draft streaming", () => { RawBody: "earlier request", } as never, }), + cfg: { + messages: { + statusReactions: { + timing: { + doneHoldMs: 250, + }, + }, + }, + }, }); await previewVisible; @@ -3147,11 +3164,23 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); - releaseFirstFinal(); - await Promise.all([firstPromise, abortPromise]); + vi.useFakeTimers(); + try { + releaseFirstFinal(); + await Promise.all([firstPromise, abortPromise]); - expect(statusReactionController.setDone).toHaveBeenCalledTimes(1); - expect(statusReactionController.setError).not.toHaveBeenCalled(); + expect(statusReactionController.setDone).toHaveBeenCalledTimes(1); + expect(statusReactionController.setError).not.toHaveBeenCalled(); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + + await vi.advanceTimersByTimeAsync(249); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + + await vi.advanceTimersByTimeAsync(1); + expect(reactionApi).toHaveBeenCalledWith(123, 456, []); + } finally { + vi.useRealTimers(); + } }); it("keeps an existing preview when abort arrives during queued draft-lane cleanup", async () => { @@ -3529,6 +3558,174 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("uses configured doneHoldMs when clearing Telegram status reactions after reply", async () => { + vi.useFakeTimers(); + const reactionApi = vi.fn(async () => true); + const statusReactionController = createStatusReactionController(); + dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true }); + deliverReplies.mockResolvedValue({ delivered: true }); + + try { + await dispatchWithContext({ + context: createContext({ + reactionApi: reactionApi as never, + removeAckAfterReply: true, + statusReactionController: statusReactionController as never, + }), + cfg: { + messages: { + statusReactions: { + timing: { + doneHoldMs: 250, + }, + }, + }, + }, + streamMode: "off", + }); + + expect(statusReactionController.setDone).toHaveBeenCalledTimes(1); + expect(statusReactionController.restoreInitial).not.toHaveBeenCalled(); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + + await vi.advanceTimersByTimeAsync(249); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + + await vi.advanceTimersByTimeAsync(1); + expect(reactionApi).toHaveBeenCalledWith(123, 456, []); + } finally { + vi.useRealTimers(); + } + }); + + it("restores the initial Telegram status reaction after reply when removeAckAfterReply is disabled", async () => { + const reactionApi = vi.fn(async () => true); + const statusReactionController = createStatusReactionController(); + dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + context: createContext({ + reactionApi: reactionApi as never, + removeAckAfterReply: false, + statusReactionController: statusReactionController as never, + }), + streamMode: "off", + }); + + await vi.waitFor(() => { + expect(statusReactionController.setDone).toHaveBeenCalledTimes(1); + expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1); + }); + expect(statusReactionController.setError).not.toHaveBeenCalled(); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + }); + + it("uses configured errorHoldMs to clear Telegram status reactions after an error fallback", async () => { + vi.useFakeTimers(); + const reactionApi = vi.fn(async () => true); + const statusReactionController = createStatusReactionController(); + dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded")); + deliverReplies.mockResolvedValue({ delivered: true }); + + try { + await dispatchWithContext({ + context: createContext({ + reactionApi: reactionApi as never, + removeAckAfterReply: true, + statusReactionController: statusReactionController as never, + }), + cfg: { + messages: { + statusReactions: { + timing: { + errorHoldMs: 320, + }, + }, + }, + }, + streamMode: "off", + }); + + expect(statusReactionController.setError).toHaveBeenCalledTimes(1); + expect(statusReactionController.setDone).not.toHaveBeenCalled(); + expect(statusReactionController.restoreInitial).not.toHaveBeenCalled(); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + + await vi.advanceTimersByTimeAsync(319); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + + await vi.advanceTimersByTimeAsync(1); + expect(reactionApi).toHaveBeenCalledWith(123, 456, []); + } finally { + vi.useRealTimers(); + } + }); + + it("restores the initial Telegram status reaction after an error when no final reply is sent", async () => { + vi.useFakeTimers(); + const reactionApi = vi.fn(async () => true); + const statusReactionController = createStatusReactionController(); + dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded")); + deliverReplies.mockResolvedValue({ delivered: false }); + + try { + await dispatchWithContext({ + context: createContext({ + reactionApi: reactionApi as never, + removeAckAfterReply: true, + statusReactionController: statusReactionController as never, + }), + cfg: { + messages: { + statusReactions: { + timing: { + errorHoldMs: 320, + }, + }, + }, + }, + streamMode: "off", + }); + + expect(statusReactionController.setError).toHaveBeenCalledTimes(1); + expect(statusReactionController.restoreInitial).not.toHaveBeenCalled(); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + + await vi.advanceTimersByTimeAsync(319); + expect(statusReactionController.restoreInitial).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + } finally { + vi.useRealTimers(); + } + }); + + it("restores the initial Telegram status reaction after an error fallback when removeAckAfterReply is disabled", async () => { + const reactionApi = vi.fn(async () => true); + const statusReactionController = createStatusReactionController(); + dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded")); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + context: createContext({ + reactionApi: reactionApi as never, + removeAckAfterReply: false, + statusReactionController: statusReactionController as never, + }), + streamMode: "off", + }); + + await vi.waitFor(() => { + expect(statusReactionController.setError).toHaveBeenCalledTimes(1); + expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1); + }); + expect(statusReactionController.setDone).not.toHaveBeenCalled(); + expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []); + }); + it("uses resolved DM config for auto-topic-label overrides", async () => { dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true }); loadSessionStore.mockReturnValue({ s1: {} }); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index e2ba1b19ff5..35c22051c30 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -1,5 +1,6 @@ import type { Bot } from "grammy"; import { + DEFAULT_TIMING, logAckFailure, logTypingFailure, removeAckReactionAfterReply, @@ -16,7 +17,7 @@ import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history" import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import { isAbortRequestText, type ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; -import { danger, logVerbose } from "openclaw/plugin-sdk/runtime-env"; +import { danger, logVerbose, sleepWithAbort } from "openclaw/plugin-sdk/runtime-env"; import { defaultTelegramBotDeps, type TelegramBotDeps } from "./bot-deps.js"; import type { TelegramMessageContext } from "./bot-message-context.js"; import { @@ -252,6 +253,48 @@ export const dispatchTelegramMessage = async ({ removeAckAfterReply, statusReactionController, } = context; + const statusReactionTiming = { + ...DEFAULT_TIMING, + ...cfg.messages?.statusReactions?.timing, + }; + const clearTelegramStatusReaction = async () => { + if (!msg.message_id || !reactionApi) { + return; + } + await reactionApi(chatId, msg.message_id, []); + }; + const finalizeTelegramStatusReaction = async (params: { + outcome: "done" | "error"; + hasFinalResponse: boolean; + }) => { + if (!statusReactionController) { + return; + } + if (params.outcome === "done") { + await statusReactionController.setDone(); + if (removeAckAfterReply) { + await sleepWithAbort(statusReactionTiming.doneHoldMs); + await clearTelegramStatusReaction(); + } else { + await statusReactionController.restoreInitial(); + } + return; + } + await statusReactionController.setError(); + if (params.hasFinalResponse) { + if (removeAckAfterReply) { + await sleepWithAbort(statusReactionTiming.errorHoldMs); + await clearTelegramStatusReaction(); + } else { + await statusReactionController.restoreInitial(); + } + return; + } + if (removeAckAfterReply) { + await sleepWithAbort(statusReactionTiming.errorHoldMs); + } + await statusReactionController.restoreInitial(); + }; const dispatchFenceKey = resolveTelegramAbortFenceKey({ ctxPayload, chatId, @@ -1017,9 +1060,11 @@ export const dispatchTelegramMessage = async ({ } if (dispatchWasSuperseded) { if (statusReactionController) { - void Promise.resolve(statusReactionController.setDone()).catch((err: unknown) => { - logVerbose(`telegram: status reaction finalize failed: ${String(err)}`); - }); + void finalizeTelegramStatusReaction({ outcome: "done", hasFinalResponse: true }).catch( + (err: unknown) => { + logVerbose(`telegram: status reaction finalize failed: ${String(err)}`); + }, + ); } else { removeAckReactionAfterReply({ removeAfterReply: removeAckAfterReply, @@ -1065,9 +1110,11 @@ export const dispatchTelegramMessage = async ({ const hasFinalResponse = queuedFinal || sentFallback; if (statusReactionController && !hasFinalResponse) { - void Promise.resolve(statusReactionController.setError()).catch((err: unknown) => { - logVerbose(`telegram: status reaction error finalize failed: ${String(err)}`); - }); + void finalizeTelegramStatusReaction({ outcome: "error", hasFinalResponse: false }).catch( + (err: unknown) => { + logVerbose(`telegram: status reaction error finalize failed: ${String(err)}`); + }, + ); } if (!hasFinalResponse) { @@ -1116,7 +1163,11 @@ export const dispatchTelegramMessage = async ({ } if (statusReactionController) { - void Promise.resolve(statusReactionController.setDone()).catch((err: unknown) => { + const statusReactionOutcome = dispatchError || sentFallback ? "error" : "done"; + void finalizeTelegramStatusReaction({ + outcome: statusReactionOutcome, + hasFinalResponse: true, + }).catch((err: unknown) => { logVerbose(`telegram: status reaction finalize failed: ${String(err)}`); }); } else {