From 387668263559c7efde3c322e59d32bf321e8e251 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 22:30:02 +0100 Subject: [PATCH] refactor(channels): centralize route normalization --- src/agents/subagent-announce-origin.ts | 106 ++---------- src/auto-reply/reply/reply-payloads-dedupe.ts | 10 +- src/channels/plugins/target-parsing-loaded.ts | 42 +++-- src/channels/plugins/target-parsing.test.ts | 19 ++ src/channels/route/ref.test.ts | 93 ++++++++++ src/channels/route/ref.ts | 162 ++++++++++++++++++ src/utils/delivery-context.shared.ts | 57 +++--- src/utils/delivery-context.test.ts | 3 + 8 files changed, 347 insertions(+), 145 deletions(-) create mode 100644 src/channels/route/ref.test.ts create mode 100644 src/channels/route/ref.ts diff --git a/src/agents/subagent-announce-origin.ts b/src/agents/subagent-announce-origin.ts index acdaeb1183b..ecb8bfd0e73 100644 --- a/src/agents/subagent-announce-origin.ts +++ b/src/agents/subagent-announce-origin.ts @@ -1,98 +1,16 @@ import { resolveComparableTargetForLoadedChannel } from "../channels/plugins/target-parsing-loaded.js"; +import { normalizeOptionalString } from "../shared/string-coerce.js"; import { - normalizeOptionalLowercaseString, - normalizeOptionalString, - normalizeOptionalThreadValue, -} from "../shared/string-coerce.js"; + deliveryContextFromSession, + mergeDeliveryContext, + normalizeDeliveryContext, +} from "../utils/delivery-context.shared.js"; +import type { + DeliveryContext, + DeliveryContextSessionSource, +} from "../utils/delivery-context.types.js"; import { isInternalMessageChannel } from "../utils/message-channel.js"; - -export type DeliveryContext = { - channel?: string; - to?: string; - accountId?: string; - threadId?: string | number; -}; - -type DeliveryContextSource = { - channel?: string; - lastChannel?: string; - lastTo?: string; - lastAccountId?: string; - lastThreadId?: string | number; - origin?: { - provider?: string; - accountId?: string; - threadId?: string | number; - }; - deliveryContext?: DeliveryContext; -}; - -function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined { - if (!context) { - return undefined; - } - const normalized: DeliveryContext = { - channel: normalizeOptionalLowercaseString(context.channel), - to: normalizeOptionalString(context.to), - accountId: normalizeOptionalString(context.accountId), - }; - const threadId = normalizeOptionalThreadValue(context.threadId); - if (threadId != null) { - normalized.threadId = threadId; - } - if ( - !normalized.channel && - !normalized.to && - !normalized.accountId && - normalized.threadId == null - ) { - return undefined; - } - return normalized; -} - -function mergeDeliveryContext( - primary?: DeliveryContext, - fallback?: DeliveryContext, -): DeliveryContext | undefined { - const normalizedPrimary = normalizeDeliveryContext(primary); - const normalizedFallback = normalizeDeliveryContext(fallback); - if (!normalizedPrimary && !normalizedFallback) { - return undefined; - } - const channelsConflict = - normalizedPrimary?.channel && - normalizedFallback?.channel && - normalizedPrimary.channel !== normalizedFallback.channel; - return normalizeDeliveryContext({ - channel: normalizedPrimary?.channel ?? normalizedFallback?.channel, - to: channelsConflict - ? normalizedPrimary?.to - : (normalizedPrimary?.to ?? normalizedFallback?.to), - accountId: channelsConflict - ? normalizedPrimary?.accountId - : (normalizedPrimary?.accountId ?? normalizedFallback?.accountId), - threadId: channelsConflict - ? normalizedPrimary?.threadId - : (normalizedPrimary?.threadId ?? normalizedFallback?.threadId), - }); -} - -function deliveryContextFromSession(entry?: DeliveryContextSource): DeliveryContext | undefined { - if (!entry) { - return undefined; - } - return normalizeDeliveryContext({ - channel: - entry.deliveryContext?.channel ?? - entry.lastChannel ?? - entry.channel ?? - entry.origin?.provider, - to: entry.deliveryContext?.to ?? entry.lastTo, - accountId: entry.deliveryContext?.accountId ?? entry.lastAccountId ?? entry.origin?.accountId, - threadId: entry.deliveryContext?.threadId ?? entry.lastThreadId ?? entry.origin?.threadId, - }); -} +export type { DeliveryContext } from "../utils/delivery-context.types.js"; function stripThreadRouteSuffix(target: string): string { return /^(.*):topic:[^:]+$/u.exec(target)?.[1] ?? target; @@ -103,7 +21,7 @@ function normalizeAnnounceRouteTarget(context?: DeliveryContext): string | undef if (!rawTo) { return undefined; } - const channel = normalizeOptionalLowercaseString(context?.channel); + const channel = normalizeOptionalString(context?.channel); const parsed = channel ? resolveComparableTargetForLoadedChannel({ channel, @@ -141,7 +59,7 @@ function shouldStripThreadFromAnnounceEntry( } export function resolveAnnounceOrigin( - entry?: DeliveryContextSource, + entry?: DeliveryContextSessionSource, requesterOrigin?: DeliveryContext, ): DeliveryContext | undefined { const normalizedRequester = normalizeDeliveryContext(requesterOrigin); diff --git a/src/auto-reply/reply/reply-payloads-dedupe.ts b/src/auto-reply/reply/reply-payloads-dedupe.ts index 69fbfe7d3c2..df148aba9de 100644 --- a/src/auto-reply/reply/reply-payloads-dedupe.ts +++ b/src/auto-reply/reply/reply-payloads-dedupe.ts @@ -2,6 +2,7 @@ import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js"; import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js"; import { getChannelPlugin } from "../../channels/plugins/index.js"; import { normalizeAnyChannelId } from "../../channels/registry.js"; +import { stringifyRouteThreadId } from "../../channels/route/ref.js"; import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js"; import { normalizeOptionalAccountId } from "../../routing/account-id.js"; import { @@ -83,14 +84,11 @@ function normalizeProviderForComparison(value?: string): string | undefined { } function normalizeThreadIdForComparison(value?: string): string | undefined { - const trimmed = normalizeOptionalString(value); - if (!trimmed) { + const normalized = stringifyRouteThreadId(value); + if (!normalized) { return undefined; } - if (/^-?\d+$/.test(trimmed)) { - return String(Number.parseInt(trimmed, 10)); - } - return normalizeLowercaseStringOrEmpty(trimmed); + return /^-?\d+$/.test(normalized) ? String(Number.parseInt(normalized, 10)) : normalized; } function resolveTargetProviderForComparison(params: { diff --git a/src/channels/plugins/target-parsing-loaded.ts b/src/channels/plugins/target-parsing-loaded.ts index 1a5b303f2e6..c91fbe849bb 100644 --- a/src/channels/plugins/target-parsing-loaded.ts +++ b/src/channels/plugins/target-parsing-loaded.ts @@ -3,6 +3,11 @@ import { normalizeOptionalThreadValue, } from "../../shared/string-coerce.js"; import type { ChatType } from "../chat-type.js"; +import { + channelRoutesMatchExact, + channelRoutesShareConversation, + normalizeChannelRouteRef, +} from "../route/ref.js"; import { getLoadedChannelPluginForRead } from "./registry-loaded-read.js"; export type ParsedChannelExplicitTarget = { @@ -56,28 +61,29 @@ export function comparableChannelTargetsMatch(params: { left?: ComparableChannelTarget | null; right?: ComparableChannelTarget | null; }): boolean { - const left = params.left; - const right = params.right; - if (!left || !right) { - return false; - } - return left.to === right.to && left.threadId === right.threadId; + return channelRoutesMatchExact({ + left: targetToRoute(params.left), + right: targetToRoute(params.right), + }); } export function comparableChannelTargetsShareRoute(params: { left?: ComparableChannelTarget | null; right?: ComparableChannelTarget | null; }): boolean { - const left = params.left; - const right = params.right; - if (!left || !right) { - return false; - } - if (left.to !== right.to) { - return false; - } - if (left.threadId == null || right.threadId == null) { - return true; - } - return left.threadId === right.threadId; + return channelRoutesShareConversation({ + left: targetToRoute(params.left), + right: targetToRoute(params.right), + }); +} + +function targetToRoute(target?: ComparableChannelTarget | null) { + return target + ? normalizeChannelRouteRef({ + to: target.to, + rawTo: target.rawTo, + threadId: target.threadId, + chatType: target.chatType, + }) + : undefined; } diff --git a/src/channels/plugins/target-parsing.test.ts b/src/channels/plugins/target-parsing.test.ts index 3d8a6714402..269a772dd72 100644 --- a/src/channels/plugins/target-parsing.test.ts +++ b/src/channels/plugins/target-parsing.test.ts @@ -173,4 +173,23 @@ describe("parseExplicitTargetForChannel", () => { }), ).toBe(true); }); + + it("compares numeric and string thread ids through the shared route contract", () => { + const numericThread = resolveComparableTargetForChannel({ + channel: "mock-threaded", + rawTarget: "threaded:room-a:topic:77", + }); + const stringThread = resolveComparableTargetForChannel({ + channel: "mock-threaded", + rawTarget: "room-a", + fallbackThreadId: "77", + }); + + expect( + comparableChannelTargetsMatch({ + left: numericThread, + right: stringThread, + }), + ).toBe(true); + }); }); diff --git a/src/channels/route/ref.test.ts b/src/channels/route/ref.test.ts new file mode 100644 index 00000000000..db3d30e1388 --- /dev/null +++ b/src/channels/route/ref.test.ts @@ -0,0 +1,93 @@ +import { describe, expect, it } from "vitest"; +import { + channelRouteKey, + channelRoutesMatchExact, + channelRoutesShareConversation, + normalizeChannelRouteRef, + stringifyRouteThreadId, +} from "./ref.js"; + +describe("channel route refs", () => { + it("normalizes target, account, and thread fields", () => { + expect( + normalizeChannelRouteRef({ + channel: " Slack ", + accountId: " Work ", + rawTo: " channel:C1 ", + to: " C1 ", + threadId: " 171234.567 ", + }), + ).toEqual({ + channel: "slack", + accountId: "work", + target: { + rawTo: "channel:C1", + to: "C1", + }, + thread: { + id: "171234.567", + }, + }); + }); + + it("normalizes numeric thread ids for route keys", () => { + const route = normalizeChannelRouteRef({ + channel: "telegram", + to: "-100123", + threadId: 42.9, + }); + + expect(stringifyRouteThreadId(route?.thread?.id)).toBe("42"); + expect(channelRouteKey(route)).toBe("telegram|-100123||42"); + }); + + it("matches exact routes when numeric and string thread ids are equivalent", () => { + expect( + channelRoutesMatchExact({ + left: normalizeChannelRouteRef({ + channel: "telegram", + to: "-100123", + threadId: 42, + }), + right: normalizeChannelRouteRef({ + channel: "telegram", + to: "-100123", + threadId: "42", + }), + }), + ).toBe(true); + }); + + it("shares conversation when one side is the parent route", () => { + expect( + channelRoutesShareConversation({ + left: normalizeChannelRouteRef({ + channel: "slack", + to: "channel:C1", + threadId: "171234.567", + }), + right: normalizeChannelRouteRef({ + channel: "slack", + to: "channel:C1", + }), + }), + ).toBe(true); + }); + + it("does not share different child threads", () => { + expect( + channelRoutesShareConversation({ + left: normalizeChannelRouteRef({ + channel: "matrix", + to: "room:!abc:example.org", + threadId: "$root-1", + }), + right: normalizeChannelRouteRef({ + channel: "matrix", + to: "room:!abc:example.org", + threadId: "$root-2", + }), + }), + ).toBe(false); + }); +}); diff --git a/src/channels/route/ref.ts b/src/channels/route/ref.ts new file mode 100644 index 00000000000..d61d5382a90 --- /dev/null +++ b/src/channels/route/ref.ts @@ -0,0 +1,162 @@ +import { normalizeOptionalAccountId } from "../../routing/account-id.js"; +import { + normalizeLowercaseStringOrEmpty, + normalizeOptionalString, + normalizeOptionalThreadValue, +} from "../../shared/string-coerce.js"; +import type { ChatType } from "../chat-type.js"; + +export type ChannelRouteThreadKind = "topic" | "thread" | "reply"; + +export type ChannelRouteThreadSource = "explicit" | "target" | "session" | "turn"; + +export type ChannelRouteRef = { + channel?: string; + accountId?: string; + target?: { + to: string; + rawTo?: string; + chatType?: ChatType; + }; + thread?: { + id: string | number; + kind?: ChannelRouteThreadKind; + source?: ChannelRouteThreadSource; + }; +}; + +export type ChannelRouteRefInput = { + channel?: unknown; + accountId?: unknown; + to?: unknown; + rawTo?: unknown; + chatType?: ChatType; + threadId?: unknown; + threadKind?: ChannelRouteThreadKind; + threadSource?: ChannelRouteThreadSource; +}; + +export function normalizeRouteThreadId(value: unknown): string | number | undefined { + return normalizeOptionalThreadValue(value); +} + +export function stringifyRouteThreadId(value: unknown): string | undefined { + const normalized = normalizeRouteThreadId(value); + return normalized == null ? undefined : String(normalized); +} + +export function normalizeChannelRouteRef( + input?: ChannelRouteRefInput, +): ChannelRouteRef | undefined { + if (!input) { + return undefined; + } + const channel = normalizeLowercaseStringOrEmpty(input.channel); + const accountId = + typeof input.accountId === "string" ? normalizeOptionalAccountId(input.accountId) : undefined; + const to = normalizeOptionalString(input.to); + const rawTo = normalizeOptionalString(input.rawTo); + const threadId = normalizeRouteThreadId(input.threadId); + if (!channel && !to && !accountId && threadId == null) { + return undefined; + } + return { + ...(channel ? { channel } : {}), + ...(accountId ? { accountId } : {}), + ...(to + ? { + target: { + to, + ...(rawTo && rawTo !== to ? { rawTo } : {}), + ...(input.chatType ? { chatType: input.chatType } : {}), + }, + } + : {}), + ...(threadId != null + ? { + thread: { + id: threadId, + ...(input.threadKind ? { kind: input.threadKind } : {}), + ...(input.threadSource ? { source: input.threadSource } : {}), + }, + } + : {}), + }; +} + +export function channelRouteTarget(route?: ChannelRouteRef): string | undefined { + return route?.target?.to; +} + +export function channelRouteThreadId(route?: ChannelRouteRef): string | number | undefined { + return route?.thread?.id; +} + +function threadIdsEqual(left?: string | number, right?: string | number): boolean { + const normalizedLeft = stringifyRouteThreadId(left); + const normalizedRight = stringifyRouteThreadId(right); + return normalizedLeft === normalizedRight; +} + +function accountsCompatible(left?: string, right?: string): boolean { + return !left || !right || left === right; +} + +export function channelRoutesMatchExact(params: { + left?: ChannelRouteRef | null; + right?: ChannelRouteRef | null; +}): boolean { + const { left, right } = params; + if (!left || !right) { + return false; + } + return ( + left.channel === right.channel && + left.accountId === right.accountId && + channelRouteTarget(left) === channelRouteTarget(right) && + threadIdsEqual(channelRouteThreadId(left), channelRouteThreadId(right)) + ); +} + +export function channelRoutesShareConversation(params: { + left?: ChannelRouteRef | null; + right?: ChannelRouteRef | null; +}): boolean { + const { left, right } = params; + if (!left || !right) { + return false; + } + if (left.channel && right.channel && left.channel !== right.channel) { + return false; + } + if (!accountsCompatible(left.accountId, right.accountId)) { + return false; + } + if (channelRouteTarget(left) !== channelRouteTarget(right)) { + return false; + } + const leftThreadId = channelRouteThreadId(left); + const rightThreadId = channelRouteThreadId(right); + if (leftThreadId == null || rightThreadId == null) { + return true; + } + return threadIdsEqual(leftThreadId, rightThreadId); +} + +export function channelRouteKey(route?: ChannelRouteRef): string | undefined { + const normalized = normalizeChannelRouteRef({ + channel: route?.channel, + accountId: route?.accountId, + to: route?.target?.to, + threadId: route?.thread?.id, + }); + if (!normalized?.channel || !normalized.target?.to) { + return undefined; + } + return [ + normalized.channel, + normalized.target.to, + normalized.accountId ?? "", + stringifyRouteThreadId(normalized.thread?.id) ?? "", + ].join("|"); +} diff --git a/src/utils/delivery-context.shared.ts b/src/utils/delivery-context.shared.ts index 7a29712b593..a22a15977f5 100644 --- a/src/utils/delivery-context.shared.ts +++ b/src/utils/delivery-context.shared.ts @@ -1,4 +1,9 @@ -import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { + channelRouteKey, + channelRouteThreadId, + channelRouteTarget, + normalizeChannelRouteRef, +} from "../channels/route/ref.js"; import { normalizeAccountId } from "./account-id.js"; import type { DeliveryContext, DeliveryContextSessionSource } from "./delivery-context.types.js"; import { normalizeMessageChannel } from "./message-channel-core.js"; @@ -8,30 +13,26 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon if (!context) { return undefined; } - const channel = - typeof context.channel === "string" - ? (normalizeMessageChannel(context.channel) ?? context.channel.trim()) - : undefined; - const to = normalizeOptionalString(context.to); - const accountId = normalizeAccountId(context.accountId); - const threadId = - typeof context.threadId === "number" && Number.isFinite(context.threadId) - ? Math.trunc(context.threadId) - : typeof context.threadId === "string" - ? normalizeOptionalString(context.threadId) - : undefined; - const normalizedThreadId = - typeof threadId === "string" ? (threadId ? threadId : undefined) : threadId; - if (!channel && !to && !accountId && normalizedThreadId == null) { + const route = normalizeChannelRouteRef({ + channel: + typeof context.channel === "string" + ? (normalizeMessageChannel(context.channel) ?? context.channel.trim()) + : undefined, + to: context.to, + accountId: context.accountId, + threadId: context.threadId, + }); + if (!route) { return undefined; } const normalized: DeliveryContext = { - channel: channel || undefined, - to: to || undefined, - accountId, + channel: route.channel, + to: channelRouteTarget(route), + accountId: normalizeAccountId(route.accountId), }; - if (normalizedThreadId != null) { - normalized.threadId = normalizedThreadId; + const threadId = channelRouteThreadId(route); + if (threadId != null) { + normalized.threadId = threadId; } return normalized; } @@ -131,10 +132,12 @@ export function mergeDeliveryContext( export function deliveryContextKey(context?: DeliveryContext): string | undefined { const normalized = normalizeDeliveryContext(context); - if (!normalized?.channel || !normalized?.to) { - return undefined; - } - const threadId = - normalized.threadId != null && normalized.threadId !== "" ? String(normalized.threadId) : ""; - return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}|${threadId}`; + return channelRouteKey( + normalizeChannelRouteRef({ + channel: normalized?.channel, + to: normalized?.to, + accountId: normalized?.accountId, + threadId: normalized?.threadId, + }), + ); } diff --git a/src/utils/delivery-context.test.ts b/src/utils/delivery-context.test.ts index 93616236f48..7d155f6f0d5 100644 --- a/src/utils/delivery-context.test.ts +++ b/src/utils/delivery-context.test.ts @@ -137,6 +137,9 @@ describe("delivery context helpers", () => { expect( deliveryContextKey({ channel: "demo-channel", to: "channel:C1", threadId: "123.456" }), ).toBe("demo-channel|channel:C1||123.456"); + expect(deliveryContextKey({ channel: "telegram", to: "-100123", threadId: 42.9 })).toBe( + "telegram|-100123||42", + ); }); it("formats generic fallback conversation targets as channels", () => {