From 26e76f9a6149c78a62bcd1531ad58894e1677ea7 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Mon, 9 Mar 2026 09:31:05 +0530 Subject: [PATCH] fix: dedupe inbound Telegram DM replies per agent (#40519) Merged via squash. Prepared head SHA: 6e235e7d1f7a00ef43455a9240b62e24dbc4ef94 Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Reviewed-by: @obviyus --- CHANGELOG.md | 1 + src/auto-reply/inbound.test.ts | 28 ++++++++++++++-- .../reply/dispatch-from-config.test.ts | 32 +++++++++++++++++++ src/auto-reply/reply/inbound-dedupe.ts | 22 +++++++++++-- 4 files changed, 79 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1832db6255..1cbe70540ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ Docs: https://docs.openclaw.ai - Docs/Changelog: correct the contributor credit for the bundled Control UI global-install fix to @LarytheLord. (#40420) Thanks @velvet-shark. - Models/openai-codex GPT-5.4 forward-compat: use the GPT-5.4 1,050,000-token context window and 128,000 max tokens for `openai-codex/gpt-5.4` instead of inheriting stale legacy Codex limits in resolver fallbacks and model listing. (#37876) thanks @yuweuii. - Telegram/media downloads: time out only stalled body reads so polling recovers from hung file downloads without aborting slow downloads that are still streaming data. (#40098) thanks @tysoncung. +- Telegram/DM routing: dedupe inbound Telegram DMs per agent instead of per session key so the same DM cannot trigger duplicate replies when both `agent:main:main` and `agent:main:telegram:direct:` resolve for one agent. Fixes #40005. Supersedes #40116. (#40519) thanks @obviyus. ## 2026.3.7 diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index f602c7dca60..4d624ecabd1 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -236,7 +236,7 @@ describe("inbound dedupe", () => { ).toBe(false); }); - it("does not dedupe across session keys", () => { + it("does not dedupe across agent ids", () => { resetInboundDedupe(); const base: MsgContext = { Provider: "whatsapp", @@ -248,12 +248,36 @@ describe("inbound dedupe", () => { shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 100 }), ).toBe(false); expect( - shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:bravo:main" }, { now: 200 }), + shouldSkipDuplicateInbound( + { ...base, SessionKey: "agent:bravo:whatsapp:direct:+1555" }, + { + now: 200, + }, + ), ).toBe(false); expect( shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 300 }), ).toBe(true); }); + + it("dedupes when the same agent sees the same inbound message under different session keys", () => { + resetInboundDedupe(); + const base: MsgContext = { + Provider: "telegram", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:7463849194", + MessageSid: "msg-1", + }; + expect( + shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:main:main" }, { now: 100 }), + ).toBe(false); + expect( + shouldSkipDuplicateInbound( + { ...base, SessionKey: "agent:main:telegram:direct:7463849194" }, + { now: 200 }, + ), + ).toBe(true); + }); }); describe("createInboundDebouncer", () => { diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index cb71c9b09ba..982557ecb68 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -1539,6 +1539,38 @@ describe("dispatchReplyFromConfig", () => { expect(replyResolver).toHaveBeenCalledTimes(1); }); + it("deduplicates same-agent inbound replies across main and direct session keys", async () => { + setNoAbort(); + const cfg = emptyConfig; + const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload); + const baseCtx = buildTestCtx({ + Provider: "telegram", + Surface: "telegram", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:7463849194", + MessageSid: "msg-1", + SessionKey: "agent:main:main", + }); + + await dispatchReplyFromConfig({ + ctx: baseCtx, + cfg, + dispatcher: createDispatcher(), + replyResolver, + }); + await dispatchReplyFromConfig({ + ctx: { + ...baseCtx, + SessionKey: "agent:main:telegram:direct:7463849194", + }, + cfg, + dispatcher: createDispatcher(), + replyResolver, + }); + + expect(replyResolver).toHaveBeenCalledTimes(1); + }); + it("emits message_received hook with originating channel metadata", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockReturnValue(true); diff --git a/src/auto-reply/reply/inbound-dedupe.ts b/src/auto-reply/reply/inbound-dedupe.ts index 191e4c4f478..0e4740261b9 100644 --- a/src/auto-reply/reply/inbound-dedupe.ts +++ b/src/auto-reply/reply/inbound-dedupe.ts @@ -1,5 +1,6 @@ import { logVerbose, shouldLogVerbose } from "../../globals.js"; import { createDedupeCache, type DedupeCache } from "../../infra/dedupe.js"; +import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; import type { MsgContext } from "../templating.js"; const DEFAULT_INBOUND_DEDUPE_TTL_MS = 20 * 60_000; @@ -15,6 +16,23 @@ const normalizeProvider = (value?: string | null) => value?.trim().toLowerCase() const resolveInboundPeerId = (ctx: MsgContext) => ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? ctx.SessionKey; +function resolveInboundDedupeSessionScope(ctx: MsgContext): string { + const sessionKey = + (ctx.CommandSource === "native" ? ctx.CommandTargetSessionKey : undefined)?.trim() || + ctx.SessionKey?.trim() || + ""; + if (!sessionKey) { + return ""; + } + const parsed = parseAgentSessionKey(sessionKey); + if (!parsed) { + return sessionKey; + } + // The same physical inbound message should never run twice for the same + // agent, even if a routing bug presents it under both main and direct keys. + return `agent:${parsed.agentId}`; +} + export function buildInboundDedupeKey(ctx: MsgContext): string | null { const provider = normalizeProvider(ctx.OriginatingChannel ?? ctx.Provider ?? ctx.Surface); const messageId = ctx.MessageSid?.trim(); @@ -25,13 +43,13 @@ export function buildInboundDedupeKey(ctx: MsgContext): string | null { if (!peerId) { return null; } - const sessionKey = ctx.SessionKey?.trim() ?? ""; + const sessionScope = resolveInboundDedupeSessionScope(ctx); const accountId = ctx.AccountId?.trim() ?? ""; const threadId = ctx.MessageThreadId !== undefined && ctx.MessageThreadId !== null ? String(ctx.MessageThreadId) : ""; - return [provider, accountId, sessionKey, peerId, threadId, messageId].filter(Boolean).join("|"); + return [provider, accountId, sessionScope, peerId, threadId, messageId].filter(Boolean).join("|"); } export function shouldSkipDuplicateInbound(