diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a4c7ab1212..4470724fefd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai - Agents/OpenAI: map `minimal` thinking to OpenAI's supported `low` reasoning effort for GPT-5.4 requests, so embedded runs stop failing request validation. Thanks @steipete. - Voice-call/media-stream: resolve the source IP from trusted forwarding headers for per-IP pending-connection limits when `webhookSecurity.trustForwardingHeaders` and `trustedProxyIPs` are configured, and reserve `maxConnections` capacity for in-flight WebSocket upgrades so concurrent handshakes can no longer momentarily exceed the operator-set cap. (#66027) Thanks @eleqtrizit. - Feishu/allowlist: canonicalize allowlist entries by explicit `user`/`chat` kind, strip repeated `feishu:`/`lark:` provider prefixes, and stop folding opaque Feishu IDs to lowercase, so allowlist matching no longer crosses user/chat namespaces or widens to case-insensitive ID matches the operator did not intend. (#66021) Thanks @eleqtrizit. +- Telegram/status commands: let read-only status slash commands bypass busy topic turns, while keeping `/export-session` on the normal lane so it cannot interleave with an in-flight session mutation. (#66226) Thanks @VACInc and @vincentkoc. - TTS/reply media: persist OpenClaw temp voice outputs into managed outbound media and allow them through reply-media normalization, so voice-note replies stop silently dropping. (#63511) Thanks @jetd1. - Agents/tools: treat Windows drive-letter paths (`C:\\...`) as absolute when resolving sandbox and read-tool paths so workspace root is not prepended under POSIX path rules. (#54039) Thanks @ly85206559 and @vincentkoc. - Agents/OpenAI: recover embedded GPT-style runs when reasoning-only or empty turns need bounded continuation, with replay-safe retry gating and incomplete-turn fallback when no visible answer arrives. (#66167) thanks @jalehman diff --git a/extensions/telegram/src/bot.create-telegram-bot.test.ts b/extensions/telegram/src/bot.create-telegram-bot.test.ts index b976f681131..b36a5f89aeb 100644 --- a/extensions/telegram/src/bot.create-telegram-bot.test.ts +++ b/extensions/telegram/src/bot.create-telegram-bot.test.ts @@ -62,6 +62,63 @@ const TELEGRAM_TEST_TIMINGS = { textFragmentGapMs: 30, } as const; +type TelegramMiddlewareTestContext = Record; +type TelegramMiddleware = ( + ctx: TelegramMiddlewareTestContext, + next: () => Promise, +) => Promise | void; + +function getRegisteredTelegramMiddlewares(): TelegramMiddleware[] { + return middlewareUseSpy.mock.calls + .map((call) => call[0]) + .filter((fn): fn is TelegramMiddleware => typeof fn === "function"); +} + +async function runTelegramMiddlewareChain(params: { + ctx: TelegramMiddlewareTestContext; + finalHandler: (ctx: TelegramMiddlewareTestContext) => Promise; +}): Promise { + const middlewares = getRegisteredTelegramMiddlewares(); + let idx = -1; + const dispatch = async (i: number): Promise => { + if (i <= idx) { + throw new Error("middleware dispatch called multiple times"); + } + idx = i; + const fn = middlewares[i]; + if (!fn) { + await params.finalHandler(params.ctx); + return; + } + await fn(params.ctx, async () => dispatch(i + 1)); + }; + await dispatch(0); +} + +function installPerKeySequentializer(): void { + sequentializeSpy.mockImplementationOnce(() => { + const lanes = new Map>(); + return async (ctx: TelegramMiddlewareTestContext, next: () => Promise) => { + const key = harness.sequentializeKey?.(ctx) ?? "default"; + const previous = lanes.get(key) ?? Promise.resolve(); + const current = previous.then(async () => { + await next(); + }); + lanes.set( + key, + current.catch(() => undefined), + ); + try { + await current; + } finally { + if (lanes.get(key) === current) { + lanes.delete(key); + } + } + }; + }); +} + describe("createTelegramBot", () => { beforeAll(() => { process.env.TZ = "UTC"; @@ -146,6 +203,182 @@ describe("createTelegramBot", () => { expect(harness.sequentializeKey).toBe(getTelegramSequentialKey); }); + it("lets /status bypass a busy Telegram topic lane", async () => { + installPerKeySequentializer(); + loadConfig.mockReturnValue({ + commands: { native: true }, + channels: { + telegram: { + dmPolicy: "open", + allowFrom: ["*"], + groups: { "*": { requireMention: false } }, + }, + }, + }); + + const startedBodies: string[] = []; + let releaseConversationTurn!: () => void; + const conversationGate = new Promise((resolve) => { + releaseConversationTurn = resolve; + }); + + replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => { + await opts?.onReplyStart?.(); + const body = String(ctx.CommandBody ?? ctx.Body ?? ""); + startedBodies.push(body); + if (body.includes("hello there")) { + await conversationGate; + } + return { text: `reply:${body}` }; + }); + + createTelegramBot({ token: "tok" }); + const messageHandler = getOnHandler("message") as ( + ctx: TelegramMiddlewareTestContext, + ) => Promise; + const statusHandler = commandSpy.mock.calls.find((call) => call[0] === "status")?.[1] as + | ((ctx: TelegramMiddlewareTestContext) => Promise) + | undefined; + expect(statusHandler).toBeDefined(); + if (!statusHandler) { + return; + } + + const busyCtx = { + ...makeForumGroupMessageCtx({ threadId: 99, text: "hello there" }), + message: { + ...makeForumGroupMessageCtx({ threadId: 99, text: "hello there" }).message, + message_id: 101, + }, + update: { update_id: 101 }, + }; + const statusCtx = { + ...makeForumGroupMessageCtx({ threadId: 99, text: "/status" }), + message: { + ...makeForumGroupMessageCtx({ threadId: 99, text: "/status" }).message, + message_id: 102, + }, + update: { update_id: 102 }, + match: "", + }; + + const busyPromise = runTelegramMiddlewareChain({ + ctx: busyCtx, + finalHandler: messageHandler, + }); + + await vi.waitFor(() => { + expect(startedBodies).toHaveLength(1); + expect(startedBodies[0]).toContain("hello there"); + }); + + const statusPromise = runTelegramMiddlewareChain({ + ctx: statusCtx, + finalHandler: statusHandler, + }); + + await vi.waitFor(() => { + expect(startedBodies).toHaveLength(2); + expect(startedBodies[0]).toContain("hello there"); + expect(startedBodies[1]).toBe("/status"); + expect(sendMessageSpy).toHaveBeenCalledTimes(1); + expect(sendMessageSpy.mock.calls[0]?.[1]).toContain("reply:/status"); + }); + + await statusPromise; + + releaseConversationTurn(); + await busyPromise; + + await vi.waitFor(() => { + expect(sendMessageSpy).toHaveBeenCalledTimes(2); + }); + const sentBodies = sendMessageSpy.mock.calls.map((call) => String(call[1])); + expect(sentBodies[0]).toContain("reply:/status"); + expect(sentBodies[1]).toContain("hello there"); + }); + + it("keeps ordinary Telegram messages serialized within the same topic", async () => { + installPerKeySequentializer(); + loadConfig.mockReturnValue({ + channels: { + telegram: { + dmPolicy: "open", + allowFrom: ["*"], + groups: { "*": { requireMention: false } }, + }, + }, + }); + + const startedBodies: string[] = []; + let releaseFirstTurn!: () => void; + const firstTurnGate = new Promise((resolve) => { + releaseFirstTurn = resolve; + }); + + replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => { + await opts?.onReplyStart?.(); + const body = String(ctx.Body ?? ""); + startedBodies.push(body); + if (body.includes("first message")) { + await firstTurnGate; + } + return { text: `reply:${body}` }; + }); + + createTelegramBot({ token: "tok" }); + const messageHandler = getOnHandler("message") as ( + ctx: TelegramMiddlewareTestContext, + ) => Promise; + + const firstCtx = { + ...makeForumGroupMessageCtx({ threadId: 99, text: "first message" }), + message: { + ...makeForumGroupMessageCtx({ threadId: 99, text: "first message" }).message, + message_id: 201, + }, + update: { update_id: 201 }, + }; + const secondCtx = { + ...makeForumGroupMessageCtx({ threadId: 99, text: "second message" }), + message: { + ...makeForumGroupMessageCtx({ threadId: 99, text: "second message" }).message, + message_id: 202, + }, + update: { update_id: 202 }, + }; + + const firstPromise = runTelegramMiddlewareChain({ + ctx: firstCtx, + finalHandler: messageHandler, + }); + + await vi.waitFor(() => { + expect(startedBodies).toHaveLength(1); + expect(startedBodies[0]).toContain("first message"); + }); + + const secondPromise = runTelegramMiddlewareChain({ + ctx: secondCtx, + finalHandler: messageHandler, + }); + + await Promise.resolve(); + expect(startedBodies).toHaveLength(1); + expect(startedBodies[0]).toContain("first message"); + expect(sendMessageSpy).not.toHaveBeenCalled(); + + releaseFirstTurn(); + await Promise.all([firstPromise, secondPromise]); + + expect(startedBodies).toHaveLength(2); + expect(startedBodies[0]).toContain("first message"); + expect(startedBodies[1]).toContain("second message"); + const sentBodies = sendMessageSpy.mock.calls.map((call) => String(call[1])); + expect(sentBodies[0]).toContain("first message"); + expect(sentBodies[1]).toContain("second message"); + }); + it("preserves same-chat reply order when a debounced run is still active", async () => { const DEBOUNCE_MS = 4321; loadConfig.mockReturnValue({ diff --git a/extensions/telegram/src/sequential-key.test.ts b/extensions/telegram/src/sequential-key.test.ts index 5d15f337577..b24f59d6cd9 100644 --- a/extensions/telegram/src/sequential-key.test.ts +++ b/extensions/telegram/src/sequential-key.test.ts @@ -59,7 +59,39 @@ describe("getTelegramSequentialKey", () => { { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/stop" }) }, "telegram:123:control", ], - [{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/status" }) }, "telegram:123"], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/status" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/commands" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/help" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/tools" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/tasks" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/context" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/whoami" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/export-session" }) }, + "telegram:123", + ], + [{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/export" }) }, "telegram:123"], [ { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/btw what is the time?" }) }, "telegram:123:btw:1", diff --git a/extensions/telegram/src/sequential-key.ts b/extensions/telegram/src/sequential-key.ts index 4d021b963d7..650af3fb49c 100644 --- a/extensions/telegram/src/sequential-key.ts +++ b/extensions/telegram/src/sequential-key.ts @@ -1,4 +1,9 @@ import { type Message, type UserFromGetMe } from "@grammyjs/types"; +import { + listChatCommands, + maybeResolveTextAlias, + normalizeCommandBody, +} from "openclaw/plugin-sdk/command-auth"; import { parseExecApprovalCommandText } from "openclaw/plugin-sdk/infra-runtime"; import { isAbortRequestText } from "openclaw/plugin-sdk/reply-runtime"; import { isBtwRequestText } from "openclaw/plugin-sdk/reply-runtime"; @@ -20,6 +25,27 @@ export type TelegramSequentialKeyContext = { }; }; +function resolveStatusCommandControlLane(params: { + rawText?: string; + botUsername?: string; +}): boolean { + // Only read-only status commands should bypass the per-topic lane. Commands + // like /export-session stay on the normal lane because they materialize + // session state to disk and should not interleave with an active turn. + const normalizedBody = normalizeCommandBody( + params.rawText?.trim() ?? "", + params.botUsername ? { botUsername: params.botUsername } : undefined, + ); + const alias = maybeResolveTextAlias(normalizedBody); + if (!alias) { + return false; + } + const command = listChatCommands().find((entry) => + entry.textAliases.some((candidate) => candidate.trim().toLowerCase() === alias), + ); + return command?.category === "status" && command.key !== "export-session"; +} + export function getTelegramSequentialKey(ctx: TelegramSequentialKeyContext): string { const reaction = ctx.update?.message_reaction; if (reaction?.chat?.id) { @@ -43,6 +69,12 @@ export function getTelegramSequentialKey(ctx: TelegramSequentialKeyContext): str } return "telegram:control"; } + if (resolveStatusCommandControlLane({ rawText, botUsername })) { + if (typeof chatId === "number") { + return `telegram:${chatId}:control`; + } + return "telegram:control"; + } if (isBtwRequestText(rawText, botUsername ? { botUsername } : undefined)) { const messageId = msg?.message_id; if (typeof chatId === "number" && typeof messageId === "number") {