From c45f1ac8cef8c82621bb84a717a1387316c858cb Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 6 Apr 2026 03:20:31 +0100 Subject: [PATCH] perf(agents): isolate subagent announce origin helper --- src/agents/subagent-announce-delivery.test.ts | 2 +- src/agents/subagent-announce-delivery.ts | 81 +------- src/agents/subagent-announce-origin.ts | 183 ++++++++++++++++++ 3 files changed, 186 insertions(+), 80 deletions(-) create mode 100644 src/agents/subagent-announce-origin.ts diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index e1cc3f57fa7..81cdbaf8c77 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { resolveAnnounceOrigin } from "./subagent-announce-delivery.js"; +import { resolveAnnounceOrigin } from "./subagent-announce-origin.js"; describe("resolveAnnounceOrigin telegram forum topics", () => { it("preserves stored forum topic thread ids when requester origin omits one for the same chat", () => { diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index e4f6252e981..e9b8d240b89 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -1,15 +1,8 @@ -import { getChannelPlugin } from "../channels/plugins/index.js"; import type { ConversationRef } from "../infra/outbound/session-binding-service.js"; import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; import { isCronSessionKey } from "../sessions/session-key-utils.js"; -import { - type DeliveryContext, - deliveryContextFromSession, - mergeDeliveryContext, - normalizeDeliveryContext, - resolveConversationDeliveryTarget, -} from "../utils/delivery-context.js"; +import { resolveConversationDeliveryTarget } from "../utils/delivery-context.js"; import { INTERNAL_MESSAGE_CHANNEL, isGatewayMessageChannel, @@ -37,6 +30,7 @@ import { runSubagentAnnounceDispatch, type SubagentAnnounceDeliveryResult, } from "./subagent-announce-dispatch.js"; +import { resolveAnnounceOrigin, type DeliveryContext } from "./subagent-announce-origin.js"; import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js"; import { getSubagentDepthFromSessionStore } from "./subagent-depth.js"; import type { SpawnSubagentMode } from "./subagent-spawn.js"; @@ -63,8 +57,6 @@ function resolveDirectAnnounceTransientRetryDelaysMs() { : ([5_000, 10_000, 20_000] as const); } -type DeliveryContextSource = Parameters[0]; - export function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType): number { const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs; if (typeof configured !== "number" || !Number.isFinite(configured)) { @@ -94,50 +86,6 @@ function summarizeDeliveryError(error: unknown): string { } } -function normalizeTelegramAnnounceTarget(target: string | undefined): string | undefined { - const trimmed = target?.trim(); - if (!trimmed) { - return undefined; - } - if (trimmed.startsWith("group:")) { - return `telegram:${trimmed.slice("group:".length)}`; - } - if (!trimmed.startsWith("telegram:")) { - return undefined; - } - const raw = trimmed.slice("telegram:".length); - const topicMatch = /^(.*):topic:[^:]+$/u.exec(raw); - return `telegram:${topicMatch?.[1] ?? raw}`; -} - -function shouldStripThreadFromAnnounceEntry( - normalizedRequester?: DeliveryContext, - normalizedEntry?: DeliveryContext, -): boolean { - if ( - !normalizedRequester?.to || - normalizedRequester.threadId != null || - normalizedEntry?.threadId == null - ) { - return false; - } - const requesterChannel = normalizedRequester.channel?.trim().toLowerCase(); - if (requesterChannel === "telegram") { - const requesterTarget = normalizeTelegramAnnounceTarget(normalizedRequester.to); - const entryTarget = normalizeTelegramAnnounceTarget(normalizedEntry?.to); - if (requesterTarget && entryTarget) { - return requesterTarget !== entryTarget; - } - } - const plugin = requesterChannel ? getChannelPlugin(requesterChannel) : undefined; - return Boolean( - plugin?.conversationBindings?.shouldStripThreadFromAnnounceOrigin?.({ - requester: normalizedRequester, - entry: normalizedEntry, - }), - ); -} - const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ /\berrorcode=unavailable\b/i, /\bstatus\s*[:=]\s*"?unavailable\b/i, @@ -226,31 +174,6 @@ export async function runAnnounceDeliveryWithRetry(params: { } } -export function resolveAnnounceOrigin( - entry?: DeliveryContextSource, - requesterOrigin?: DeliveryContext, -): DeliveryContext | undefined { - const normalizedRequester = normalizeDeliveryContext(requesterOrigin); - const normalizedEntry = deliveryContextFromSession(entry); - if (normalizedRequester?.channel && isInternalMessageChannel(normalizedRequester.channel)) { - return mergeDeliveryContext( - { - accountId: normalizedRequester.accountId, - threadId: normalizedRequester.threadId, - }, - normalizedEntry, - ); - } - const entryForMerge = - normalizedEntry && shouldStripThreadFromAnnounceEntry(normalizedRequester, normalizedEntry) - ? (() => { - const { threadId: _ignore, ...rest } = normalizedEntry; - return rest; - })() - : normalizedEntry; - return mergeDeliveryContext(normalizedRequester, entryForMerge); -} - export async function resolveSubagentCompletionOrigin(params: { childSessionKey: string; requesterSessionKey: string; diff --git a/src/agents/subagent-announce-origin.ts b/src/agents/subagent-announce-origin.ts new file mode 100644 index 00000000000..12800e13106 --- /dev/null +++ b/src/agents/subagent-announce-origin.ts @@ -0,0 +1,183 @@ +import { getChannelPlugin } from "../channels/plugins/index.js"; + +export type DeliveryContext = { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; +}; + +type DeliveryContextSource = { + channel?: string; + lastChannel?: string; + lastTo?: string; + lastAccountId?: string; + lastThreadId?: string | number; + origin?: { + provider?: string; + accountId?: string; + threadId?: string | number; + }; + deliveryContext?: DeliveryContext; +}; + +function normalizeChannel(raw?: string): string | undefined { + const value = raw?.trim().toLowerCase(); + return value || undefined; +} + +function normalizeText(raw?: string): string | undefined { + const value = raw?.trim(); + return value || undefined; +} + +function normalizeThreadId(raw?: string | number): string | number | undefined { + if (typeof raw === "number" && Number.isFinite(raw)) { + return Math.trunc(raw); + } + if (typeof raw === "string") { + const value = raw.trim(); + return value || undefined; + } + return undefined; +} + +function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined { + if (!context) { + return undefined; + } + const normalized: DeliveryContext = { + channel: normalizeChannel(context.channel), + to: normalizeText(context.to), + accountId: normalizeText(context.accountId), + }; + const threadId = normalizeThreadId(context.threadId); + if (threadId != null) { + normalized.threadId = threadId; + } + if ( + !normalized.channel && + !normalized.to && + !normalized.accountId && + normalized.threadId == null + ) { + return undefined; + } + return normalized; +} + +function mergeDeliveryContext( + primary?: DeliveryContext, + fallback?: DeliveryContext, +): DeliveryContext | undefined { + const normalizedPrimary = normalizeDeliveryContext(primary); + const normalizedFallback = normalizeDeliveryContext(fallback); + if (!normalizedPrimary && !normalizedFallback) { + return undefined; + } + const channelsConflict = + normalizedPrimary?.channel && + normalizedFallback?.channel && + normalizedPrimary.channel !== normalizedFallback.channel; + return normalizeDeliveryContext({ + channel: normalizedPrimary?.channel ?? normalizedFallback?.channel, + to: channelsConflict + ? normalizedPrimary?.to + : (normalizedPrimary?.to ?? normalizedFallback?.to), + accountId: channelsConflict + ? normalizedPrimary?.accountId + : (normalizedPrimary?.accountId ?? normalizedFallback?.accountId), + threadId: channelsConflict + ? normalizedPrimary?.threadId + : (normalizedPrimary?.threadId ?? normalizedFallback?.threadId), + }); +} + +function deliveryContextFromSession(entry?: DeliveryContextSource): DeliveryContext | undefined { + if (!entry) { + return undefined; + } + return normalizeDeliveryContext({ + channel: + entry.deliveryContext?.channel ?? + entry.lastChannel ?? + entry.channel ?? + entry.origin?.provider, + to: entry.deliveryContext?.to ?? entry.lastTo, + accountId: entry.deliveryContext?.accountId ?? entry.lastAccountId ?? entry.origin?.accountId, + threadId: entry.deliveryContext?.threadId ?? entry.lastThreadId ?? entry.origin?.threadId, + }); +} + +function isInternalMessageChannel(raw?: string): boolean { + return normalizeChannel(raw) === "webchat"; +} + +function normalizeTelegramAnnounceTarget(target: string | undefined): string | undefined { + const trimmed = target?.trim(); + if (!trimmed) { + return undefined; + } + if (trimmed.startsWith("group:")) { + return `telegram:${trimmed.slice("group:".length)}`; + } + if (!trimmed.startsWith("telegram:")) { + return undefined; + } + const raw = trimmed.slice("telegram:".length); + const topicMatch = /^(.*):topic:[^:]+$/u.exec(raw); + return `telegram:${topicMatch?.[1] ?? raw}`; +} + +function shouldStripThreadFromAnnounceEntry( + normalizedRequester?: DeliveryContext, + normalizedEntry?: DeliveryContext, +): boolean { + if ( + !normalizedRequester?.to || + normalizedRequester.threadId != null || + normalizedEntry?.threadId == null + ) { + return false; + } + const requesterChannel = normalizeChannel(normalizedRequester.channel); + if (requesterChannel === "telegram") { + const requesterTarget = normalizeTelegramAnnounceTarget(normalizedRequester.to); + const entryTarget = normalizeTelegramAnnounceTarget(normalizedEntry?.to); + if (requesterTarget && entryTarget) { + return requesterTarget !== entryTarget; + } + } + const plugin = requesterChannel ? getChannelPlugin(requesterChannel) : undefined; + return Boolean( + plugin?.conversationBindings?.shouldStripThreadFromAnnounceOrigin?.({ + requester: normalizedRequester, + entry: normalizedEntry, + }), + ); +} + +export function resolveAnnounceOrigin( + entry?: DeliveryContextSource, + requesterOrigin?: DeliveryContext, +): DeliveryContext | undefined { + const normalizedRequester = normalizeDeliveryContext(requesterOrigin); + const normalizedEntry = deliveryContextFromSession(entry); + if (normalizedRequester?.channel && isInternalMessageChannel(normalizedRequester.channel)) { + return mergeDeliveryContext( + { + accountId: normalizedRequester.accountId, + threadId: normalizedRequester.threadId, + }, + normalizedEntry, + ); + } + const entryForMerge = + normalizedEntry && shouldStripThreadFromAnnounceEntry(normalizedRequester, normalizedEntry) + ? (() => { + const { threadId: _ignore, ...rest } = normalizedEntry; + return rest; + })() + : normalizedEntry; + return mergeDeliveryContext(normalizedRequester, entryForMerge); +}