perf(agents): isolate subagent announce origin helper

This commit is contained in:
Peter Steinberger
2026-04-06 03:20:31 +01:00
parent 3ee823b229
commit c45f1ac8ce
3 changed files with 186 additions and 80 deletions

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { resolveAnnounceOrigin } from "./subagent-announce-delivery.js";
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
describe("resolveAnnounceOrigin telegram forum topics", () => {
it("preserves stored forum topic thread ids when requester origin omits one for the same chat", () => {

View File

@@ -1,15 +1,8 @@
import { getChannelPlugin } from "../channels/plugins/index.js";
import type { ConversationRef } from "../infra/outbound/session-binding-service.js";
import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { isCronSessionKey } from "../sessions/session-key-utils.js";
import {
type DeliveryContext,
deliveryContextFromSession,
mergeDeliveryContext,
normalizeDeliveryContext,
resolveConversationDeliveryTarget,
} from "../utils/delivery-context.js";
import { resolveConversationDeliveryTarget } from "../utils/delivery-context.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isGatewayMessageChannel,
@@ -37,6 +30,7 @@ import {
runSubagentAnnounceDispatch,
type SubagentAnnounceDeliveryResult,
} from "./subagent-announce-dispatch.js";
import { resolveAnnounceOrigin, type DeliveryContext } from "./subagent-announce-origin.js";
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
@@ -63,8 +57,6 @@ function resolveDirectAnnounceTransientRetryDelaysMs() {
: ([5_000, 10_000, 20_000] as const);
}
type DeliveryContextSource = Parameters<typeof deliveryContextFromSession>[0];
export function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType<typeof loadConfig>): number {
const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs;
if (typeof configured !== "number" || !Number.isFinite(configured)) {
@@ -94,50 +86,6 @@ function summarizeDeliveryError(error: unknown): string {
}
}
function normalizeTelegramAnnounceTarget(target: string | undefined): string | undefined {
const trimmed = target?.trim();
if (!trimmed) {
return undefined;
}
if (trimmed.startsWith("group:")) {
return `telegram:${trimmed.slice("group:".length)}`;
}
if (!trimmed.startsWith("telegram:")) {
return undefined;
}
const raw = trimmed.slice("telegram:".length);
const topicMatch = /^(.*):topic:[^:]+$/u.exec(raw);
return `telegram:${topicMatch?.[1] ?? raw}`;
}
function shouldStripThreadFromAnnounceEntry(
normalizedRequester?: DeliveryContext,
normalizedEntry?: DeliveryContext,
): boolean {
if (
!normalizedRequester?.to ||
normalizedRequester.threadId != null ||
normalizedEntry?.threadId == null
) {
return false;
}
const requesterChannel = normalizedRequester.channel?.trim().toLowerCase();
if (requesterChannel === "telegram") {
const requesterTarget = normalizeTelegramAnnounceTarget(normalizedRequester.to);
const entryTarget = normalizeTelegramAnnounceTarget(normalizedEntry?.to);
if (requesterTarget && entryTarget) {
return requesterTarget !== entryTarget;
}
}
const plugin = requesterChannel ? getChannelPlugin(requesterChannel) : undefined;
return Boolean(
plugin?.conversationBindings?.shouldStripThreadFromAnnounceOrigin?.({
requester: normalizedRequester,
entry: normalizedEntry,
}),
);
}
const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/\berrorcode=unavailable\b/i,
/\bstatus\s*[:=]\s*"?unavailable\b/i,
@@ -226,31 +174,6 @@ export async function runAnnounceDeliveryWithRetry<T>(params: {
}
}
export function resolveAnnounceOrigin(
entry?: DeliveryContextSource,
requesterOrigin?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedRequester = normalizeDeliveryContext(requesterOrigin);
const normalizedEntry = deliveryContextFromSession(entry);
if (normalizedRequester?.channel && isInternalMessageChannel(normalizedRequester.channel)) {
return mergeDeliveryContext(
{
accountId: normalizedRequester.accountId,
threadId: normalizedRequester.threadId,
},
normalizedEntry,
);
}
const entryForMerge =
normalizedEntry && shouldStripThreadFromAnnounceEntry(normalizedRequester, normalizedEntry)
? (() => {
const { threadId: _ignore, ...rest } = normalizedEntry;
return rest;
})()
: normalizedEntry;
return mergeDeliveryContext(normalizedRequester, entryForMerge);
}
export async function resolveSubagentCompletionOrigin(params: {
childSessionKey: string;
requesterSessionKey: string;

View File

@@ -0,0 +1,183 @@
import { getChannelPlugin } from "../channels/plugins/index.js";
export type DeliveryContext = {
channel?: string;
to?: string;
accountId?: string;
threadId?: string | number;
};
type DeliveryContextSource = {
channel?: string;
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
lastThreadId?: string | number;
origin?: {
provider?: string;
accountId?: string;
threadId?: string | number;
};
deliveryContext?: DeliveryContext;
};
function normalizeChannel(raw?: string): string | undefined {
const value = raw?.trim().toLowerCase();
return value || undefined;
}
function normalizeText(raw?: string): string | undefined {
const value = raw?.trim();
return value || undefined;
}
function normalizeThreadId(raw?: string | number): string | number | undefined {
if (typeof raw === "number" && Number.isFinite(raw)) {
return Math.trunc(raw);
}
if (typeof raw === "string") {
const value = raw.trim();
return value || undefined;
}
return undefined;
}
function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
if (!context) {
return undefined;
}
const normalized: DeliveryContext = {
channel: normalizeChannel(context.channel),
to: normalizeText(context.to),
accountId: normalizeText(context.accountId),
};
const threadId = normalizeThreadId(context.threadId);
if (threadId != null) {
normalized.threadId = threadId;
}
if (
!normalized.channel &&
!normalized.to &&
!normalized.accountId &&
normalized.threadId == null
) {
return undefined;
}
return normalized;
}
function mergeDeliveryContext(
primary?: DeliveryContext,
fallback?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedPrimary = normalizeDeliveryContext(primary);
const normalizedFallback = normalizeDeliveryContext(fallback);
if (!normalizedPrimary && !normalizedFallback) {
return undefined;
}
const channelsConflict =
normalizedPrimary?.channel &&
normalizedFallback?.channel &&
normalizedPrimary.channel !== normalizedFallback.channel;
return normalizeDeliveryContext({
channel: normalizedPrimary?.channel ?? normalizedFallback?.channel,
to: channelsConflict
? normalizedPrimary?.to
: (normalizedPrimary?.to ?? normalizedFallback?.to),
accountId: channelsConflict
? normalizedPrimary?.accountId
: (normalizedPrimary?.accountId ?? normalizedFallback?.accountId),
threadId: channelsConflict
? normalizedPrimary?.threadId
: (normalizedPrimary?.threadId ?? normalizedFallback?.threadId),
});
}
function deliveryContextFromSession(entry?: DeliveryContextSource): DeliveryContext | undefined {
if (!entry) {
return undefined;
}
return normalizeDeliveryContext({
channel:
entry.deliveryContext?.channel ??
entry.lastChannel ??
entry.channel ??
entry.origin?.provider,
to: entry.deliveryContext?.to ?? entry.lastTo,
accountId: entry.deliveryContext?.accountId ?? entry.lastAccountId ?? entry.origin?.accountId,
threadId: entry.deliveryContext?.threadId ?? entry.lastThreadId ?? entry.origin?.threadId,
});
}
function isInternalMessageChannel(raw?: string): boolean {
return normalizeChannel(raw) === "webchat";
}
function normalizeTelegramAnnounceTarget(target: string | undefined): string | undefined {
const trimmed = target?.trim();
if (!trimmed) {
return undefined;
}
if (trimmed.startsWith("group:")) {
return `telegram:${trimmed.slice("group:".length)}`;
}
if (!trimmed.startsWith("telegram:")) {
return undefined;
}
const raw = trimmed.slice("telegram:".length);
const topicMatch = /^(.*):topic:[^:]+$/u.exec(raw);
return `telegram:${topicMatch?.[1] ?? raw}`;
}
function shouldStripThreadFromAnnounceEntry(
normalizedRequester?: DeliveryContext,
normalizedEntry?: DeliveryContext,
): boolean {
if (
!normalizedRequester?.to ||
normalizedRequester.threadId != null ||
normalizedEntry?.threadId == null
) {
return false;
}
const requesterChannel = normalizeChannel(normalizedRequester.channel);
if (requesterChannel === "telegram") {
const requesterTarget = normalizeTelegramAnnounceTarget(normalizedRequester.to);
const entryTarget = normalizeTelegramAnnounceTarget(normalizedEntry?.to);
if (requesterTarget && entryTarget) {
return requesterTarget !== entryTarget;
}
}
const plugin = requesterChannel ? getChannelPlugin(requesterChannel) : undefined;
return Boolean(
plugin?.conversationBindings?.shouldStripThreadFromAnnounceOrigin?.({
requester: normalizedRequester,
entry: normalizedEntry,
}),
);
}
export function resolveAnnounceOrigin(
entry?: DeliveryContextSource,
requesterOrigin?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedRequester = normalizeDeliveryContext(requesterOrigin);
const normalizedEntry = deliveryContextFromSession(entry);
if (normalizedRequester?.channel && isInternalMessageChannel(normalizedRequester.channel)) {
return mergeDeliveryContext(
{
accountId: normalizedRequester.accountId,
threadId: normalizedRequester.threadId,
},
normalizedEntry,
);
}
const entryForMerge =
normalizedEntry && shouldStripThreadFromAnnounceEntry(normalizedRequester, normalizedEntry)
? (() => {
const { threadId: _ignore, ...rest } = normalizedEntry;
return rest;
})()
: normalizedEntry;
return mergeDeliveryContext(normalizedRequester, entryForMerge);
}