mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 04:26:16 +00:00
fix(telegram): keep overlapping DM replies deliverable (#85361) (thanks @neeravmakwana)
Behavior addressed: Telegram direct-message turns no longer drop an earlier overlapping normal reply, while authorized aborts and explicit/native/plugin/skill command turns still supersede active reply work. Real environment tested: local OpenClaw focused Telegram test shard plus existing contributor Telegram screenshot/log proof in the PR body. Exact steps or command run after this patch: pnpm test extensions/telegram/src/telegram-reply-fence.test.ts extensions/telegram/src/bot-message-dispatch.test.ts; .agents/skills/autoreview/scripts/autoreview --mode branch --base origin/main Evidence after fix: 2 test files passed, 93 tests passed; final autoreview clean with no accepted/actionable findings. Observed result after fix: overlapping normal Telegram DMs use non-interrupting reply fences and both final replies remain deliverable; direct /stop, authorized built-in commands, and explicit text/native command turns still supersede. What was not tested: fresh live Telegram Desktop rerun by this agent; PR retains contributor screenshot/log proof and the Real behavior proof bot remains red despite proof labels. Thanks @neeravmakwana. Co-authored-by: Neerav Makwana <261249544+neeravmakwana@users.noreply.github.com>
This commit is contained in:
@@ -3050,6 +3050,88 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
expect(deliveredTexts).toContain("fresh request answer");
|
||||
});
|
||||
|
||||
it("keeps newer DM requests from aborting active same-session dispatch", async () => {
|
||||
let firstStarted: (() => void) | undefined;
|
||||
const firstStartGate = new Promise<void>((resolve) => {
|
||||
firstStarted = resolve;
|
||||
});
|
||||
let releaseFirst: (() => void) | undefined;
|
||||
const firstGate = new Promise<void>((resolve) => {
|
||||
releaseFirst = resolve;
|
||||
});
|
||||
let secondStarted: (() => void) | undefined;
|
||||
const secondStartGate = new Promise<void>((resolve) => {
|
||||
secondStarted = resolve;
|
||||
});
|
||||
let firstAbortSignal: AbortSignal | undefined;
|
||||
dispatchReplyWithBufferedBlockDispatcher
|
||||
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
|
||||
firstAbortSignal = replyOptions?.abortSignal;
|
||||
firstStarted?.();
|
||||
await firstGate;
|
||||
await dispatcherOptions.deliver({ text: "earlier DM answer" }, { kind: "final" });
|
||||
return {
|
||||
queuedFinal: true,
|
||||
counts: { block: 0, final: 1, tool: 0 },
|
||||
};
|
||||
})
|
||||
.mockImplementationOnce(async ({ dispatcherOptions }) => {
|
||||
secondStarted?.();
|
||||
await dispatcherOptions.deliver({ text: "fresh DM answer" }, { kind: "final" });
|
||||
return {
|
||||
queuedFinal: true,
|
||||
counts: { block: 0, final: 1, tool: 0 },
|
||||
};
|
||||
});
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
const createDirectContext = (messageId: number, body: string) =>
|
||||
createContext({
|
||||
ctxPayload: {
|
||||
SessionKey: "agent:main:main",
|
||||
ChatType: "direct",
|
||||
MessageSid: String(messageId),
|
||||
RawBody: body,
|
||||
BodyForAgent: body,
|
||||
CommandBody: body,
|
||||
CommandAuthorized: true,
|
||||
} as unknown as TelegramMessageContext["ctxPayload"],
|
||||
msg: {
|
||||
chat: { id: 123, type: "private" },
|
||||
message_id: messageId,
|
||||
} as unknown as TelegramMessageContext["msg"],
|
||||
chatId: 123,
|
||||
isGroup: false,
|
||||
historyKey: "telegram:123",
|
||||
historyLimit: 10,
|
||||
groupHistories: new Map(),
|
||||
threadSpec: { id: undefined, scope: "none" },
|
||||
});
|
||||
|
||||
const firstPromise = dispatchWithContext({
|
||||
context: createDirectContext(99, "first request"),
|
||||
streamMode: "off",
|
||||
});
|
||||
await firstStartGate;
|
||||
const secondPromise = dispatchWithContext({
|
||||
context: createDirectContext(100, "second request"),
|
||||
streamMode: "off",
|
||||
});
|
||||
await secondStartGate;
|
||||
|
||||
expect(firstAbortSignal?.aborted).toBe(false);
|
||||
releaseFirst?.();
|
||||
await Promise.all([firstPromise, secondPromise]);
|
||||
|
||||
const deliveredTexts = deliverReplies.mock.calls.flatMap((call) =>
|
||||
((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map(
|
||||
(reply) => reply.text,
|
||||
),
|
||||
);
|
||||
expect(deliveredTexts).toContain("fresh DM answer");
|
||||
expect(deliveredTexts).toContain("earlier DM answer");
|
||||
});
|
||||
|
||||
it("keeps /btw side questions from aborting an active same-session dispatch", async () => {
|
||||
const historyKey = "telegram:group:-100123";
|
||||
const groupHistories = new Map([[historyKey, []]]);
|
||||
|
||||
@@ -55,6 +55,58 @@ describe("shouldSupersedeTelegramReplyFence", () => {
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps normal direct turns deliverable while preserving direct aborts", () => {
|
||||
expect(
|
||||
shouldSupersedeTelegramReplyFence({
|
||||
ChatType: "direct",
|
||||
CommandBody: "answer this",
|
||||
CommandAuthorized: true,
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSupersedeTelegramReplyFence({
|
||||
ChatType: "direct",
|
||||
CommandBody: "/stop",
|
||||
CommandAuthorized: true,
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSupersedeTelegramReplyFence({
|
||||
ChatType: "direct",
|
||||
CommandBody: "/diagnostics confirm abc123def456",
|
||||
CommandAuthorized: true,
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSupersedeTelegramReplyFence({
|
||||
ChatType: "direct",
|
||||
CommandBody: "/diagnostics confirm abc123def456",
|
||||
CommandAuthorized: false,
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSupersedeTelegramReplyFence({
|
||||
ChatType: "direct",
|
||||
CommandBody: "/var/log error",
|
||||
CommandAuthorized: true,
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSupersedeTelegramReplyFence({
|
||||
ChatType: "direct",
|
||||
CommandBody: "/plugin_command",
|
||||
CommandAuthorized: true,
|
||||
CommandTurn: {
|
||||
kind: "text-slash",
|
||||
source: "text",
|
||||
authorized: true,
|
||||
commandName: "plugin_command",
|
||||
body: "/plugin_command",
|
||||
},
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("telegram reply fence supersede", () => {
|
||||
|
||||
@@ -1,3 +1,11 @@
|
||||
import {
|
||||
isExplicitCommandTurn,
|
||||
type CommandTurnContext,
|
||||
} from "openclaw/plugin-sdk/channel-inbound";
|
||||
import {
|
||||
maybeResolveTextAlias,
|
||||
normalizeCommandBody,
|
||||
} from "openclaw/plugin-sdk/command-auth-native";
|
||||
import {
|
||||
isAbortRequestText,
|
||||
isBtwRequestText,
|
||||
@@ -190,11 +198,17 @@ export function releaseTelegramReplyFenceAbortController(
|
||||
maybeDeleteTelegramReplyFenceState(key, state);
|
||||
}
|
||||
|
||||
function isRecognizedTelegramTextCommand(rawText: string): boolean {
|
||||
return maybeResolveTextAlias(normalizeCommandBody(rawText)) != null;
|
||||
}
|
||||
|
||||
export function shouldSupersedeTelegramReplyFence(ctxPayload: {
|
||||
Body?: string;
|
||||
ChatType?: string;
|
||||
RawBody?: string;
|
||||
CommandBody?: string;
|
||||
CommandAuthorized: boolean;
|
||||
CommandTurn?: CommandTurnContext;
|
||||
}): boolean {
|
||||
const dispatchText = ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? "";
|
||||
if (isAbortRequestText(dispatchText)) {
|
||||
@@ -206,6 +220,16 @@ export function shouldSupersedeTelegramReplyFence(ctxPayload: {
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
if (ctxPayload.ChatType === "direct") {
|
||||
if (
|
||||
ctxPayload.CommandAuthorized &&
|
||||
(isExplicitCommandTurn(ctxPayload.CommandTurn) ||
|
||||
isRecognizedTelegramTextCommand(dispatchText))
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user