mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:40:43 +00:00
[codex] Telegram: unblock status commands behind busy turns (#66226)
* Telegram: unblock status commands behind busy turns * fix(telegram): keep export-session on topic lane * Update CHANGELOG.md --------- Co-authored-by: VACInc <3279061+VACInc@users.noreply.github.com> Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
@@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/OpenAI: map `minimal` thinking to OpenAI's supported `low` reasoning effort for GPT-5.4 requests, so embedded runs stop failing request validation. Thanks @steipete.
|
||||
- Voice-call/media-stream: resolve the source IP from trusted forwarding headers for per-IP pending-connection limits when `webhookSecurity.trustForwardingHeaders` and `trustedProxyIPs` are configured, and reserve `maxConnections` capacity for in-flight WebSocket upgrades so concurrent handshakes can no longer momentarily exceed the operator-set cap. (#66027) Thanks @eleqtrizit.
|
||||
- Feishu/allowlist: canonicalize allowlist entries by explicit `user`/`chat` kind, strip repeated `feishu:`/`lark:` provider prefixes, and stop folding opaque Feishu IDs to lowercase, so allowlist matching no longer crosses user/chat namespaces or widens to case-insensitive ID matches the operator did not intend. (#66021) Thanks @eleqtrizit.
|
||||
- Telegram/status commands: let read-only status slash commands bypass busy topic turns, while keeping `/export-session` on the normal lane so it cannot interleave with an in-flight session mutation. (#66226) Thanks @VACInc and @vincentkoc.
|
||||
- TTS/reply media: persist OpenClaw temp voice outputs into managed outbound media and allow them through reply-media normalization, so voice-note replies stop silently dropping. (#63511) Thanks @jetd1.
|
||||
- Agents/tools: treat Windows drive-letter paths (`C:\\...`) as absolute when resolving sandbox and read-tool paths so workspace root is not prepended under POSIX path rules. (#54039) Thanks @ly85206559 and @vincentkoc.
|
||||
- Agents/OpenAI: recover embedded GPT-style runs when reasoning-only or empty turns need bounded continuation, with replay-safe retry gating and incomplete-turn fallback when no visible answer arrives. (#66167) thanks @jalehman
|
||||
|
||||
@@ -62,6 +62,63 @@ const TELEGRAM_TEST_TIMINGS = {
|
||||
textFragmentGapMs: 30,
|
||||
} as const;
|
||||
|
||||
type TelegramMiddlewareTestContext = Record<string, unknown>;
|
||||
type TelegramMiddleware = (
|
||||
ctx: TelegramMiddlewareTestContext,
|
||||
next: () => Promise<void>,
|
||||
) => Promise<void> | void;
|
||||
|
||||
function getRegisteredTelegramMiddlewares(): TelegramMiddleware[] {
|
||||
return middlewareUseSpy.mock.calls
|
||||
.map((call) => call[0])
|
||||
.filter((fn): fn is TelegramMiddleware => typeof fn === "function");
|
||||
}
|
||||
|
||||
async function runTelegramMiddlewareChain(params: {
|
||||
ctx: TelegramMiddlewareTestContext;
|
||||
finalHandler: (ctx: TelegramMiddlewareTestContext) => Promise<void>;
|
||||
}): Promise<void> {
|
||||
const middlewares = getRegisteredTelegramMiddlewares();
|
||||
let idx = -1;
|
||||
const dispatch = async (i: number): Promise<void> => {
|
||||
if (i <= idx) {
|
||||
throw new Error("middleware dispatch called multiple times");
|
||||
}
|
||||
idx = i;
|
||||
const fn = middlewares[i];
|
||||
if (!fn) {
|
||||
await params.finalHandler(params.ctx);
|
||||
return;
|
||||
}
|
||||
await fn(params.ctx, async () => dispatch(i + 1));
|
||||
};
|
||||
await dispatch(0);
|
||||
}
|
||||
|
||||
function installPerKeySequentializer(): void {
|
||||
sequentializeSpy.mockImplementationOnce(() => {
|
||||
const lanes = new Map<string, Promise<void>>();
|
||||
return async (ctx: TelegramMiddlewareTestContext, next: () => Promise<void>) => {
|
||||
const key = harness.sequentializeKey?.(ctx) ?? "default";
|
||||
const previous = lanes.get(key) ?? Promise.resolve();
|
||||
const current = previous.then(async () => {
|
||||
await next();
|
||||
});
|
||||
lanes.set(
|
||||
key,
|
||||
current.catch(() => undefined),
|
||||
);
|
||||
try {
|
||||
await current;
|
||||
} finally {
|
||||
if (lanes.get(key) === current) {
|
||||
lanes.delete(key);
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
describe("createTelegramBot", () => {
|
||||
beforeAll(() => {
|
||||
process.env.TZ = "UTC";
|
||||
@@ -146,6 +203,182 @@ describe("createTelegramBot", () => {
|
||||
expect(harness.sequentializeKey).toBe(getTelegramSequentialKey);
|
||||
});
|
||||
|
||||
it("lets /status bypass a busy Telegram topic lane", async () => {
|
||||
installPerKeySequentializer();
|
||||
loadConfig.mockReturnValue({
|
||||
commands: { native: true },
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "open",
|
||||
allowFrom: ["*"],
|
||||
groups: { "*": { requireMention: false } },
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const startedBodies: string[] = [];
|
||||
let releaseConversationTurn!: () => void;
|
||||
const conversationGate = new Promise<void>((resolve) => {
|
||||
releaseConversationTurn = resolve;
|
||||
});
|
||||
|
||||
replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => {
|
||||
await opts?.onReplyStart?.();
|
||||
const body = String(ctx.CommandBody ?? ctx.Body ?? "");
|
||||
startedBodies.push(body);
|
||||
if (body.includes("hello there")) {
|
||||
await conversationGate;
|
||||
}
|
||||
return { text: `reply:${body}` };
|
||||
});
|
||||
|
||||
createTelegramBot({ token: "tok" });
|
||||
const messageHandler = getOnHandler("message") as (
|
||||
ctx: TelegramMiddlewareTestContext,
|
||||
) => Promise<void>;
|
||||
const statusHandler = commandSpy.mock.calls.find((call) => call[0] === "status")?.[1] as
|
||||
| ((ctx: TelegramMiddlewareTestContext) => Promise<void>)
|
||||
| undefined;
|
||||
expect(statusHandler).toBeDefined();
|
||||
if (!statusHandler) {
|
||||
return;
|
||||
}
|
||||
|
||||
const busyCtx = {
|
||||
...makeForumGroupMessageCtx({ threadId: 99, text: "hello there" }),
|
||||
message: {
|
||||
...makeForumGroupMessageCtx({ threadId: 99, text: "hello there" }).message,
|
||||
message_id: 101,
|
||||
},
|
||||
update: { update_id: 101 },
|
||||
};
|
||||
const statusCtx = {
|
||||
...makeForumGroupMessageCtx({ threadId: 99, text: "/status" }),
|
||||
message: {
|
||||
...makeForumGroupMessageCtx({ threadId: 99, text: "/status" }).message,
|
||||
message_id: 102,
|
||||
},
|
||||
update: { update_id: 102 },
|
||||
match: "",
|
||||
};
|
||||
|
||||
const busyPromise = runTelegramMiddlewareChain({
|
||||
ctx: busyCtx,
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(startedBodies).toHaveLength(1);
|
||||
expect(startedBodies[0]).toContain("hello there");
|
||||
});
|
||||
|
||||
const statusPromise = runTelegramMiddlewareChain({
|
||||
ctx: statusCtx,
|
||||
finalHandler: statusHandler,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(startedBodies).toHaveLength(2);
|
||||
expect(startedBodies[0]).toContain("hello there");
|
||||
expect(startedBodies[1]).toBe("/status");
|
||||
expect(sendMessageSpy).toHaveBeenCalledTimes(1);
|
||||
expect(sendMessageSpy.mock.calls[0]?.[1]).toContain("reply:/status");
|
||||
});
|
||||
|
||||
await statusPromise;
|
||||
|
||||
releaseConversationTurn();
|
||||
await busyPromise;
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(sendMessageSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
const sentBodies = sendMessageSpy.mock.calls.map((call) => String(call[1]));
|
||||
expect(sentBodies[0]).toContain("reply:/status");
|
||||
expect(sentBodies[1]).toContain("hello there");
|
||||
});
|
||||
|
||||
it("keeps ordinary Telegram messages serialized within the same topic", async () => {
|
||||
installPerKeySequentializer();
|
||||
loadConfig.mockReturnValue({
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "open",
|
||||
allowFrom: ["*"],
|
||||
groups: { "*": { requireMention: false } },
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const startedBodies: string[] = [];
|
||||
let releaseFirstTurn!: () => void;
|
||||
const firstTurnGate = new Promise<void>((resolve) => {
|
||||
releaseFirstTurn = resolve;
|
||||
});
|
||||
|
||||
replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => {
|
||||
await opts?.onReplyStart?.();
|
||||
const body = String(ctx.Body ?? "");
|
||||
startedBodies.push(body);
|
||||
if (body.includes("first message")) {
|
||||
await firstTurnGate;
|
||||
}
|
||||
return { text: `reply:${body}` };
|
||||
});
|
||||
|
||||
createTelegramBot({ token: "tok" });
|
||||
const messageHandler = getOnHandler("message") as (
|
||||
ctx: TelegramMiddlewareTestContext,
|
||||
) => Promise<void>;
|
||||
|
||||
const firstCtx = {
|
||||
...makeForumGroupMessageCtx({ threadId: 99, text: "first message" }),
|
||||
message: {
|
||||
...makeForumGroupMessageCtx({ threadId: 99, text: "first message" }).message,
|
||||
message_id: 201,
|
||||
},
|
||||
update: { update_id: 201 },
|
||||
};
|
||||
const secondCtx = {
|
||||
...makeForumGroupMessageCtx({ threadId: 99, text: "second message" }),
|
||||
message: {
|
||||
...makeForumGroupMessageCtx({ threadId: 99, text: "second message" }).message,
|
||||
message_id: 202,
|
||||
},
|
||||
update: { update_id: 202 },
|
||||
};
|
||||
|
||||
const firstPromise = runTelegramMiddlewareChain({
|
||||
ctx: firstCtx,
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(startedBodies).toHaveLength(1);
|
||||
expect(startedBodies[0]).toContain("first message");
|
||||
});
|
||||
|
||||
const secondPromise = runTelegramMiddlewareChain({
|
||||
ctx: secondCtx,
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
expect(startedBodies).toHaveLength(1);
|
||||
expect(startedBodies[0]).toContain("first message");
|
||||
expect(sendMessageSpy).not.toHaveBeenCalled();
|
||||
|
||||
releaseFirstTurn();
|
||||
await Promise.all([firstPromise, secondPromise]);
|
||||
|
||||
expect(startedBodies).toHaveLength(2);
|
||||
expect(startedBodies[0]).toContain("first message");
|
||||
expect(startedBodies[1]).toContain("second message");
|
||||
const sentBodies = sendMessageSpy.mock.calls.map((call) => String(call[1]));
|
||||
expect(sentBodies[0]).toContain("first message");
|
||||
expect(sentBodies[1]).toContain("second message");
|
||||
});
|
||||
|
||||
it("preserves same-chat reply order when a debounced run is still active", async () => {
|
||||
const DEBOUNCE_MS = 4321;
|
||||
loadConfig.mockReturnValue({
|
||||
|
||||
@@ -59,7 +59,39 @@ describe("getTelegramSequentialKey", () => {
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/stop" }) },
|
||||
"telegram:123:control",
|
||||
],
|
||||
[{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/status" }) }, "telegram:123"],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/status" }) },
|
||||
"telegram:123:control",
|
||||
],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/commands" }) },
|
||||
"telegram:123:control",
|
||||
],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/help" }) },
|
||||
"telegram:123:control",
|
||||
],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/tools" }) },
|
||||
"telegram:123:control",
|
||||
],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/tasks" }) },
|
||||
"telegram:123:control",
|
||||
],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/context" }) },
|
||||
"telegram:123:control",
|
||||
],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/whoami" }) },
|
||||
"telegram:123:control",
|
||||
],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/export-session" }) },
|
||||
"telegram:123",
|
||||
],
|
||||
[{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/export" }) }, "telegram:123"],
|
||||
[
|
||||
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/btw what is the time?" }) },
|
||||
"telegram:123:btw:1",
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
import { type Message, type UserFromGetMe } from "@grammyjs/types";
|
||||
import {
|
||||
listChatCommands,
|
||||
maybeResolveTextAlias,
|
||||
normalizeCommandBody,
|
||||
} from "openclaw/plugin-sdk/command-auth";
|
||||
import { parseExecApprovalCommandText } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { isAbortRequestText } from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { isBtwRequestText } from "openclaw/plugin-sdk/reply-runtime";
|
||||
@@ -20,6 +25,27 @@ export type TelegramSequentialKeyContext = {
|
||||
};
|
||||
};
|
||||
|
||||
function resolveStatusCommandControlLane(params: {
|
||||
rawText?: string;
|
||||
botUsername?: string;
|
||||
}): boolean {
|
||||
// Only read-only status commands should bypass the per-topic lane. Commands
|
||||
// like /export-session stay on the normal lane because they materialize
|
||||
// session state to disk and should not interleave with an active turn.
|
||||
const normalizedBody = normalizeCommandBody(
|
||||
params.rawText?.trim() ?? "",
|
||||
params.botUsername ? { botUsername: params.botUsername } : undefined,
|
||||
);
|
||||
const alias = maybeResolveTextAlias(normalizedBody);
|
||||
if (!alias) {
|
||||
return false;
|
||||
}
|
||||
const command = listChatCommands().find((entry) =>
|
||||
entry.textAliases.some((candidate) => candidate.trim().toLowerCase() === alias),
|
||||
);
|
||||
return command?.category === "status" && command.key !== "export-session";
|
||||
}
|
||||
|
||||
export function getTelegramSequentialKey(ctx: TelegramSequentialKeyContext): string {
|
||||
const reaction = ctx.update?.message_reaction;
|
||||
if (reaction?.chat?.id) {
|
||||
@@ -43,6 +69,12 @@ export function getTelegramSequentialKey(ctx: TelegramSequentialKeyContext): str
|
||||
}
|
||||
return "telegram:control";
|
||||
}
|
||||
if (resolveStatusCommandControlLane({ rawText, botUsername })) {
|
||||
if (typeof chatId === "number") {
|
||||
return `telegram:${chatId}:control`;
|
||||
}
|
||||
return "telegram:control";
|
||||
}
|
||||
if (isBtwRequestText(rawText, botUsername ? { botUsername } : undefined)) {
|
||||
const messageId = msg?.message_id;
|
||||
if (typeof chatId === "number" && typeof messageId === "number") {
|
||||
|
||||
Reference in New Issue
Block a user