refactor(channels): centralize route normalization

This commit is contained in:
Peter Steinberger
2026-04-27 22:30:02 +01:00
parent 0294aebe6f
commit 3876682635
8 changed files with 347 additions and 145 deletions

View File

@@ -1,98 +1,16 @@
import { resolveComparableTargetForLoadedChannel } from "../channels/plugins/target-parsing-loaded.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
normalizeOptionalThreadValue,
} from "../shared/string-coerce.js";
deliveryContextFromSession,
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.shared.js";
import type {
DeliveryContext,
DeliveryContextSessionSource,
} from "../utils/delivery-context.types.js";
import { isInternalMessageChannel } from "../utils/message-channel.js";
export type DeliveryContext = {
channel?: string;
to?: string;
accountId?: string;
threadId?: string | number;
};
type DeliveryContextSource = {
channel?: string;
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
lastThreadId?: string | number;
origin?: {
provider?: string;
accountId?: string;
threadId?: string | number;
};
deliveryContext?: DeliveryContext;
};
function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
if (!context) {
return undefined;
}
const normalized: DeliveryContext = {
channel: normalizeOptionalLowercaseString(context.channel),
to: normalizeOptionalString(context.to),
accountId: normalizeOptionalString(context.accountId),
};
const threadId = normalizeOptionalThreadValue(context.threadId);
if (threadId != null) {
normalized.threadId = threadId;
}
if (
!normalized.channel &&
!normalized.to &&
!normalized.accountId &&
normalized.threadId == null
) {
return undefined;
}
return normalized;
}
function mergeDeliveryContext(
primary?: DeliveryContext,
fallback?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedPrimary = normalizeDeliveryContext(primary);
const normalizedFallback = normalizeDeliveryContext(fallback);
if (!normalizedPrimary && !normalizedFallback) {
return undefined;
}
const channelsConflict =
normalizedPrimary?.channel &&
normalizedFallback?.channel &&
normalizedPrimary.channel !== normalizedFallback.channel;
return normalizeDeliveryContext({
channel: normalizedPrimary?.channel ?? normalizedFallback?.channel,
to: channelsConflict
? normalizedPrimary?.to
: (normalizedPrimary?.to ?? normalizedFallback?.to),
accountId: channelsConflict
? normalizedPrimary?.accountId
: (normalizedPrimary?.accountId ?? normalizedFallback?.accountId),
threadId: channelsConflict
? normalizedPrimary?.threadId
: (normalizedPrimary?.threadId ?? normalizedFallback?.threadId),
});
}
function deliveryContextFromSession(entry?: DeliveryContextSource): DeliveryContext | undefined {
if (!entry) {
return undefined;
}
return normalizeDeliveryContext({
channel:
entry.deliveryContext?.channel ??
entry.lastChannel ??
entry.channel ??
entry.origin?.provider,
to: entry.deliveryContext?.to ?? entry.lastTo,
accountId: entry.deliveryContext?.accountId ?? entry.lastAccountId ?? entry.origin?.accountId,
threadId: entry.deliveryContext?.threadId ?? entry.lastThreadId ?? entry.origin?.threadId,
});
}
export type { DeliveryContext } from "../utils/delivery-context.types.js";
function stripThreadRouteSuffix(target: string): string {
return /^(.*):topic:[^:]+$/u.exec(target)?.[1] ?? target;
@@ -103,7 +21,7 @@ function normalizeAnnounceRouteTarget(context?: DeliveryContext): string | undef
if (!rawTo) {
return undefined;
}
const channel = normalizeOptionalLowercaseString(context?.channel);
const channel = normalizeOptionalString(context?.channel);
const parsed = channel
? resolveComparableTargetForLoadedChannel({
channel,
@@ -141,7 +59,7 @@ function shouldStripThreadFromAnnounceEntry(
}
export function resolveAnnounceOrigin(
entry?: DeliveryContextSource,
entry?: DeliveryContextSessionSource,
requesterOrigin?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedRequester = normalizeDeliveryContext(requesterOrigin);

View File

@@ -2,6 +2,7 @@ import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js";
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js";
import { getChannelPlugin } from "../../channels/plugins/index.js";
import { normalizeAnyChannelId } from "../../channels/registry.js";
import { stringifyRouteThreadId } from "../../channels/route/ref.js";
import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js";
import { normalizeOptionalAccountId } from "../../routing/account-id.js";
import {
@@ -83,14 +84,11 @@ function normalizeProviderForComparison(value?: string): string | undefined {
}
function normalizeThreadIdForComparison(value?: string): string | undefined {
const trimmed = normalizeOptionalString(value);
if (!trimmed) {
const normalized = stringifyRouteThreadId(value);
if (!normalized) {
return undefined;
}
if (/^-?\d+$/.test(trimmed)) {
return String(Number.parseInt(trimmed, 10));
}
return normalizeLowercaseStringOrEmpty(trimmed);
return /^-?\d+$/.test(normalized) ? String(Number.parseInt(normalized, 10)) : normalized;
}
function resolveTargetProviderForComparison(params: {

View File

@@ -3,6 +3,11 @@ import {
normalizeOptionalThreadValue,
} from "../../shared/string-coerce.js";
import type { ChatType } from "../chat-type.js";
import {
channelRoutesMatchExact,
channelRoutesShareConversation,
normalizeChannelRouteRef,
} from "../route/ref.js";
import { getLoadedChannelPluginForRead } from "./registry-loaded-read.js";
export type ParsedChannelExplicitTarget = {
@@ -56,28 +61,29 @@ export function comparableChannelTargetsMatch(params: {
left?: ComparableChannelTarget | null;
right?: ComparableChannelTarget | null;
}): boolean {
const left = params.left;
const right = params.right;
if (!left || !right) {
return false;
}
return left.to === right.to && left.threadId === right.threadId;
return channelRoutesMatchExact({
left: targetToRoute(params.left),
right: targetToRoute(params.right),
});
}
export function comparableChannelTargetsShareRoute(params: {
left?: ComparableChannelTarget | null;
right?: ComparableChannelTarget | null;
}): boolean {
const left = params.left;
const right = params.right;
if (!left || !right) {
return false;
}
if (left.to !== right.to) {
return false;
}
if (left.threadId == null || right.threadId == null) {
return true;
}
return left.threadId === right.threadId;
return channelRoutesShareConversation({
left: targetToRoute(params.left),
right: targetToRoute(params.right),
});
}
function targetToRoute(target?: ComparableChannelTarget | null) {
return target
? normalizeChannelRouteRef({
to: target.to,
rawTo: target.rawTo,
threadId: target.threadId,
chatType: target.chatType,
})
: undefined;
}

View File

@@ -173,4 +173,23 @@ describe("parseExplicitTargetForChannel", () => {
}),
).toBe(true);
});
it("compares numeric and string thread ids through the shared route contract", () => {
const numericThread = resolveComparableTargetForChannel({
channel: "mock-threaded",
rawTarget: "threaded:room-a:topic:77",
});
const stringThread = resolveComparableTargetForChannel({
channel: "mock-threaded",
rawTarget: "room-a",
fallbackThreadId: "77",
});
expect(
comparableChannelTargetsMatch({
left: numericThread,
right: stringThread,
}),
).toBe(true);
});
});

View File

@@ -0,0 +1,93 @@
import { describe, expect, it } from "vitest";
import {
channelRouteKey,
channelRoutesMatchExact,
channelRoutesShareConversation,
normalizeChannelRouteRef,
stringifyRouteThreadId,
} from "./ref.js";
describe("channel route refs", () => {
it("normalizes target, account, and thread fields", () => {
expect(
normalizeChannelRouteRef({
channel: " Slack ",
accountId: " Work ",
rawTo: " channel:C1 ",
to: " C1 ",
threadId: " 171234.567 ",
}),
).toEqual({
channel: "slack",
accountId: "work",
target: {
rawTo: "channel:C1",
to: "C1",
},
thread: {
id: "171234.567",
},
});
});
it("normalizes numeric thread ids for route keys", () => {
const route = normalizeChannelRouteRef({
channel: "telegram",
to: "-100123",
threadId: 42.9,
});
expect(stringifyRouteThreadId(route?.thread?.id)).toBe("42");
expect(channelRouteKey(route)).toBe("telegram|-100123||42");
});
it("matches exact routes when numeric and string thread ids are equivalent", () => {
expect(
channelRoutesMatchExact({
left: normalizeChannelRouteRef({
channel: "telegram",
to: "-100123",
threadId: 42,
}),
right: normalizeChannelRouteRef({
channel: "telegram",
to: "-100123",
threadId: "42",
}),
}),
).toBe(true);
});
it("shares conversation when one side is the parent route", () => {
expect(
channelRoutesShareConversation({
left: normalizeChannelRouteRef({
channel: "slack",
to: "channel:C1",
threadId: "171234.567",
}),
right: normalizeChannelRouteRef({
channel: "slack",
to: "channel:C1",
}),
}),
).toBe(true);
});
it("does not share different child threads", () => {
expect(
channelRoutesShareConversation({
left: normalizeChannelRouteRef({
channel: "matrix",
to: "room:!abc:example.org",
threadId: "$root-1",
}),
right: normalizeChannelRouteRef({
channel: "matrix",
to: "room:!abc:example.org",
threadId: "$root-2",
}),
}),
).toBe(false);
});
});

162
src/channels/route/ref.ts Normal file
View File

@@ -0,0 +1,162 @@
import { normalizeOptionalAccountId } from "../../routing/account-id.js";
import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalString,
normalizeOptionalThreadValue,
} from "../../shared/string-coerce.js";
import type { ChatType } from "../chat-type.js";
export type ChannelRouteThreadKind = "topic" | "thread" | "reply";
export type ChannelRouteThreadSource = "explicit" | "target" | "session" | "turn";
export type ChannelRouteRef = {
channel?: string;
accountId?: string;
target?: {
to: string;
rawTo?: string;
chatType?: ChatType;
};
thread?: {
id: string | number;
kind?: ChannelRouteThreadKind;
source?: ChannelRouteThreadSource;
};
};
export type ChannelRouteRefInput = {
channel?: unknown;
accountId?: unknown;
to?: unknown;
rawTo?: unknown;
chatType?: ChatType;
threadId?: unknown;
threadKind?: ChannelRouteThreadKind;
threadSource?: ChannelRouteThreadSource;
};
export function normalizeRouteThreadId(value: unknown): string | number | undefined {
return normalizeOptionalThreadValue(value);
}
export function stringifyRouteThreadId(value: unknown): string | undefined {
const normalized = normalizeRouteThreadId(value);
return normalized == null ? undefined : String(normalized);
}
export function normalizeChannelRouteRef(
input?: ChannelRouteRefInput,
): ChannelRouteRef | undefined {
if (!input) {
return undefined;
}
const channel = normalizeLowercaseStringOrEmpty(input.channel);
const accountId =
typeof input.accountId === "string" ? normalizeOptionalAccountId(input.accountId) : undefined;
const to = normalizeOptionalString(input.to);
const rawTo = normalizeOptionalString(input.rawTo);
const threadId = normalizeRouteThreadId(input.threadId);
if (!channel && !to && !accountId && threadId == null) {
return undefined;
}
return {
...(channel ? { channel } : {}),
...(accountId ? { accountId } : {}),
...(to
? {
target: {
to,
...(rawTo && rawTo !== to ? { rawTo } : {}),
...(input.chatType ? { chatType: input.chatType } : {}),
},
}
: {}),
...(threadId != null
? {
thread: {
id: threadId,
...(input.threadKind ? { kind: input.threadKind } : {}),
...(input.threadSource ? { source: input.threadSource } : {}),
},
}
: {}),
};
}
export function channelRouteTarget(route?: ChannelRouteRef): string | undefined {
return route?.target?.to;
}
export function channelRouteThreadId(route?: ChannelRouteRef): string | number | undefined {
return route?.thread?.id;
}
function threadIdsEqual(left?: string | number, right?: string | number): boolean {
const normalizedLeft = stringifyRouteThreadId(left);
const normalizedRight = stringifyRouteThreadId(right);
return normalizedLeft === normalizedRight;
}
function accountsCompatible(left?: string, right?: string): boolean {
return !left || !right || left === right;
}
export function channelRoutesMatchExact(params: {
left?: ChannelRouteRef | null;
right?: ChannelRouteRef | null;
}): boolean {
const { left, right } = params;
if (!left || !right) {
return false;
}
return (
left.channel === right.channel &&
left.accountId === right.accountId &&
channelRouteTarget(left) === channelRouteTarget(right) &&
threadIdsEqual(channelRouteThreadId(left), channelRouteThreadId(right))
);
}
export function channelRoutesShareConversation(params: {
left?: ChannelRouteRef | null;
right?: ChannelRouteRef | null;
}): boolean {
const { left, right } = params;
if (!left || !right) {
return false;
}
if (left.channel && right.channel && left.channel !== right.channel) {
return false;
}
if (!accountsCompatible(left.accountId, right.accountId)) {
return false;
}
if (channelRouteTarget(left) !== channelRouteTarget(right)) {
return false;
}
const leftThreadId = channelRouteThreadId(left);
const rightThreadId = channelRouteThreadId(right);
if (leftThreadId == null || rightThreadId == null) {
return true;
}
return threadIdsEqual(leftThreadId, rightThreadId);
}
export function channelRouteKey(route?: ChannelRouteRef): string | undefined {
const normalized = normalizeChannelRouteRef({
channel: route?.channel,
accountId: route?.accountId,
to: route?.target?.to,
threadId: route?.thread?.id,
});
if (!normalized?.channel || !normalized.target?.to) {
return undefined;
}
return [
normalized.channel,
normalized.target.to,
normalized.accountId ?? "",
stringifyRouteThreadId(normalized.thread?.id) ?? "",
].join("|");
}

View File

@@ -1,4 +1,9 @@
import { normalizeOptionalString } from "../shared/string-coerce.js";
import {
channelRouteKey,
channelRouteThreadId,
channelRouteTarget,
normalizeChannelRouteRef,
} from "../channels/route/ref.js";
import { normalizeAccountId } from "./account-id.js";
import type { DeliveryContext, DeliveryContextSessionSource } from "./delivery-context.types.js";
import { normalizeMessageChannel } from "./message-channel-core.js";
@@ -8,30 +13,26 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon
if (!context) {
return undefined;
}
const channel =
typeof context.channel === "string"
? (normalizeMessageChannel(context.channel) ?? context.channel.trim())
: undefined;
const to = normalizeOptionalString(context.to);
const accountId = normalizeAccountId(context.accountId);
const threadId =
typeof context.threadId === "number" && Number.isFinite(context.threadId)
? Math.trunc(context.threadId)
: typeof context.threadId === "string"
? normalizeOptionalString(context.threadId)
: undefined;
const normalizedThreadId =
typeof threadId === "string" ? (threadId ? threadId : undefined) : threadId;
if (!channel && !to && !accountId && normalizedThreadId == null) {
const route = normalizeChannelRouteRef({
channel:
typeof context.channel === "string"
? (normalizeMessageChannel(context.channel) ?? context.channel.trim())
: undefined,
to: context.to,
accountId: context.accountId,
threadId: context.threadId,
});
if (!route) {
return undefined;
}
const normalized: DeliveryContext = {
channel: channel || undefined,
to: to || undefined,
accountId,
channel: route.channel,
to: channelRouteTarget(route),
accountId: normalizeAccountId(route.accountId),
};
if (normalizedThreadId != null) {
normalized.threadId = normalizedThreadId;
const threadId = channelRouteThreadId(route);
if (threadId != null) {
normalized.threadId = threadId;
}
return normalized;
}
@@ -131,10 +132,12 @@ export function mergeDeliveryContext(
export function deliveryContextKey(context?: DeliveryContext): string | undefined {
const normalized = normalizeDeliveryContext(context);
if (!normalized?.channel || !normalized?.to) {
return undefined;
}
const threadId =
normalized.threadId != null && normalized.threadId !== "" ? String(normalized.threadId) : "";
return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}|${threadId}`;
return channelRouteKey(
normalizeChannelRouteRef({
channel: normalized?.channel,
to: normalized?.to,
accountId: normalized?.accountId,
threadId: normalized?.threadId,
}),
);
}

View File

@@ -137,6 +137,9 @@ describe("delivery context helpers", () => {
expect(
deliveryContextKey({ channel: "demo-channel", to: "channel:C1", threadId: "123.456" }),
).toBe("demo-channel|channel:C1||123.456");
expect(deliveryContextKey({ channel: "telegram", to: "-100123", threadId: 42.9 })).toBe(
"telegram|-100123||42",
);
});
it("formats generic fallback conversation targets as channels", () => {