Files
openclaw/src/agents/subagent-announce-delivery.ts
2026-04-05 09:44:20 +01:00

681 lines
23 KiB
TypeScript

import { getChannelPlugin } from "../channels/plugins/index.js";
import type { ConversationRef } from "../infra/outbound/session-binding-service.js";
import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { isCronSessionKey } from "../sessions/session-key-utils.js";
import {
type DeliveryContext,
deliveryContextFromSession,
mergeDeliveryContext,
normalizeDeliveryContext,
resolveConversationDeliveryTarget,
} from "../utils/delivery-context.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isGatewayMessageChannel,
isInternalMessageChannel,
normalizeMessageChannel,
} from "../utils/message-channel.js";
import { buildAnnounceIdempotencyKey, resolveQueueAnnounceId } from "./announce-idempotency.js";
import type { AgentInternalEvent } from "./internal-events.js";
import {
callGateway,
createBoundDeliveryRouter,
getGlobalHookRunner,
isEmbeddedPiRunActive,
loadConfig,
loadSessionStore,
queueEmbeddedPiMessage,
resolveAgentIdFromSessionKey,
resolveConversationIdFromTargets,
resolveExternalBestEffortDeliveryTarget,
resolveMainSessionKey,
resolveQueueSettings,
resolveStorePath,
} from "./subagent-announce-delivery.runtime.js";
import {
runSubagentAnnounceDispatch,
type SubagentAnnounceDeliveryResult,
} from "./subagent-announce-dispatch.js";
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
// Fallback announce timeout (90 seconds), used when the configured value is
// missing or is not a finite number.
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 90_000;
// Upper clamp applied to a configured announce timeout.
// NOTE(review): presumably kept just below the 32-bit signed delay limit of
// JavaScript timers (2_147_483_647 ms) — confirm.
const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
// Dependencies that this module resolves through an indirection so they can be
// replaced via `__testing.setDepsForTest`.
type SubagentAnnounceDeliveryDeps = {
callGateway: typeof callGateway;
loadConfig: typeof loadConfig;
};
// Default wiring: the implementations imported from the runtime module.
const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
callGateway,
loadConfig,
};
// Dependencies currently in effect; reassigned only by `__testing.setDepsForTest`.
let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps =
defaultSubagentAnnounceDeliveryDeps;
/**
 * Returns the back-off schedule (in milliseconds) used between retries of a
 * direct announce that failed with a transient error.
 *
 * When the `OPENCLAW_TEST_FAST` environment variable is exactly `"1"` the
 * schedule is shrunk to a few milliseconds; otherwise it is 5s / 10s / 20s.
 */
function resolveDirectAnnounceTransientRetryDelaysMs() {
  if (process.env.OPENCLAW_TEST_FAST === "1") {
    return [8, 16, 32] as const;
  }
  return [5_000, 10_000, 20_000] as const;
}
// Input type accepted by `deliveryContextFromSession`, derived from its first parameter.
type DeliveryContextSource = Parameters<typeof deliveryContextFromSession>[0];
/**
 * Resolves the timeout (in milliseconds) applied to subagent announce gateway calls.
 *
 * @param cfg - Loaded configuration; `agents.defaults.subagents.announceTimeoutMs` is read.
 * @returns The configured value floored to an integer and clamped to
 *   `[1, MAX_TIMER_SAFE_TIMEOUT_MS]`, or `DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS`
 *   when the configured value is not a finite number.
 */
export function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType<typeof loadConfig>): number {
  const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs;
  if (typeof configured === "number" && Number.isFinite(configured)) {
    const wholeMs = Math.floor(configured);
    const atLeastOneMs = Math.max(1, wholeMs);
    return Math.min(atLeastOneMs, MAX_TIMER_SAFE_TIMEOUT_MS);
  }
  return DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS;
}
/**
 * Reports whether the requester session is treated as internal for announce
 * purposes: its subagent depth (per the session store) is at least 1, or its
 * key is a cron session key.
 */
export function isInternalAnnounceRequesterSession(sessionKey: string | undefined): boolean {
  // The depth lookup runs first; the cron check is only consulted when it is below 1.
  const depth = getSubagentDepthFromSessionStore(sessionKey);
  if (depth >= 1) {
    return true;
  }
  return isCronSessionKey(sessionKey);
}
/**
 * Produces a short, always-string description of a delivery failure for logs
 * and result payloads.
 *
 * - `Error` instances yield their message (or `"error"` when the message is empty).
 * - Strings are returned unchanged.
 * - `null` / `undefined` yield `"unknown error"`.
 * - Anything else is JSON-serialized; values that cannot be serialized
 *   (cyclic structures, BigInt, functions, symbols) yield `"error"`.
 *
 * @param error - The thrown or rejected value.
 * @returns A string describing the failure.
 */
function summarizeDeliveryError(error: unknown): string {
  if (error instanceof Error) {
    return error.message || "error";
  }
  if (typeof error === "string") {
    return error;
  }
  if (error === undefined || error === null) {
    return "unknown error";
  }
  try {
    // JSON.stringify returns `undefined` (not a string) for functions and
    // symbols even though its declared return type is `string`; guard against
    // that so callers really do receive a string.
    const serialized: unknown = JSON.stringify(error);
    return typeof serialized === "string" ? serialized : "error";
  } catch {
    return "error";
  }
}
/**
 * Reduces a Telegram-style delivery target to a comparable `telegram:<chat>` form.
 *
 * - Blank or missing input yields `undefined`.
 * - A `group:` prefix is swapped for `telegram:` (the remainder is kept verbatim).
 * - A `telegram:` target has a trailing `:topic:<id>` suffix removed.
 * - Any other prefix yields `undefined`.
 */
function normalizeTelegramAnnounceTarget(target: string | undefined): string | undefined {
  const candidate = target?.trim();
  if (!candidate) {
    return undefined;
  }
  const groupPrefix = "group:";
  const telegramPrefix = "telegram:";
  if (candidate.startsWith(groupPrefix)) {
    return telegramPrefix + candidate.slice(groupPrefix.length);
  }
  if (candidate.startsWith(telegramPrefix)) {
    const remainder = candidate.slice(telegramPrefix.length);
    // Greedy capture: only the last `:topic:<id>` segment is dropped.
    const topicSuffix = /^(.*):topic:[^:]+$/u.exec(remainder);
    const chat = topicSuffix === null ? remainder : (topicSuffix[1] ?? remainder);
    return telegramPrefix + chat;
  }
  return undefined;
}
/**
 * Decides whether the stored session entry's `threadId` should be dropped
 * before it is merged under the requester origin.
 *
 * Only considered when the requester has a `to`, carries no thread of its own,
 * and the entry does carry one. For Telegram requesters with two normalizable
 * targets, the thread is stripped exactly when the targets differ; otherwise
 * the decision is delegated to the channel plugin's
 * `conversationBindings.shouldStripThreadFromAnnounceOrigin` hook (absent hook
 * means "do not strip").
 */
function shouldStripThreadFromAnnounceEntry(
  normalizedRequester?: DeliveryContext,
  normalizedEntry?: DeliveryContext,
): boolean {
  if (!normalizedRequester?.to) {
    return false;
  }
  if (normalizedRequester.threadId != null) {
    return false;
  }
  if (normalizedEntry?.threadId == null) {
    return false;
  }

  const channelId = normalizedRequester.channel?.trim().toLowerCase();
  if (channelId === "telegram") {
    const requesterChat = normalizeTelegramAnnounceTarget(normalizedRequester.to);
    const entryChat = normalizeTelegramAnnounceTarget(normalizedEntry?.to);
    if (requesterChat && entryChat) {
      return requesterChat !== entryChat;
    }
  }

  const channelPlugin = channelId ? getChannelPlugin(channelId) : undefined;
  const pluginVerdict = channelPlugin?.conversationBindings?.shouldStripThreadFromAnnounceOrigin?.({
    requester: normalizedRequester,
    entry: normalizedEntry,
  });
  return Boolean(pluginVerdict);
}
// Error-message patterns that mark a delivery failure as transient, i.e. worth
// retrying (unavailable status, missing listener, gateway connectivity and
// timeout messages, common network error codes).
const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/\berrorcode=unavailable\b/i,
/\bstatus\s*[:=]\s*"?unavailable\b/i,
/\bUNAVAILABLE\b/,
/no active .* listener/i,
/gateway not connected/i,
/gateway closed \(1006/i,
/gateway timeout/i,
/\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i,
];
// Error-message patterns that mark a delivery failure as permanent. These are
// checked first by `isTransientAnnounceDeliveryError`, so a message matching
// both lists is never retried.
const PERMANENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/unsupported channel/i,
/unknown channel/i,
/chat not found/i,
/user not found/i,
/bot.*not.*member/i,
/bot was blocked by the user/i,
/forbidden: bot was kicked/i,
/recipient is not a valid/i,
/outbound not configured for channel/i,
];
/**
 * Classifies a delivery failure as transient (retry-worthy) based on its
 * summarized message. Permanent patterns take precedence: a message matching
 * any permanent pattern is never transient.
 */
function isTransientAnnounceDeliveryError(error: unknown): boolean {
  const text = summarizeDeliveryError(error);
  if (!text) {
    return false;
  }
  const matchesAny = (patterns: readonly RegExp[]): boolean =>
    patterns.some((pattern) => pattern.test(text));
  return (
    !matchesAny(PERMANENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS) &&
    matchesAny(TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS)
  );
}
/**
 * Sleeps for `ms` milliseconds, waking early if `signal` aborts.
 *
 * Resolves immediately when `ms` is not positive or the signal is already
 * aborted. The promise always resolves (never rejects); callers are expected
 * to inspect the signal afterwards.
 */
async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise<void> {
  if (ms <= 0 || signal?.aborted) {
    return;
  }
  await new Promise<void>((resolve) => {
    const timer = setTimeout(settle, ms);
    // Shared by the timer and the abort listener: tear both down, then resolve.
    function settle(): void {
      clearTimeout(timer);
      signal?.removeEventListener("abort", settle);
      resolve();
    }
    signal?.addEventListener("abort", settle, { once: true });
  });
}
/**
 * Runs `params.run`, retrying on transient delivery errors with the back-off
 * schedule from `resolveDirectAnnounceTransientRetryDelaysMs`.
 *
 * @param params.operation - Label included in the retry log line.
 * @param params.signal - Optional abort signal; checked before every attempt
 *   and after every failure.
 * @param params.run - The delivery attempt.
 * @returns Whatever `params.run` resolves with.
 * @throws `Error("announce delivery aborted")` when the signal is aborted
 *   before an attempt; otherwise rethrows the last failure when it is not
 *   transient, the schedule is exhausted, or the signal aborted meanwhile.
 */
export async function runAnnounceDeliveryWithRetry<T>(params: {
  operation: string;
  signal?: AbortSignal;
  run: () => Promise<T>;
}): Promise<T> {
  const backoffScheduleMs = resolveDirectAnnounceTransientRetryDelaysMs();
  const totalAttempts = backoffScheduleMs.length + 1;
  let failedAttempts = 0;
  while (true) {
    if (params.signal?.aborted) {
      throw new Error("announce delivery aborted");
    }
    try {
      return await params.run();
    } catch (failure) {
      const waitMs = backoffScheduleMs[failedAttempts];
      if (waitMs == null || !isTransientAnnounceDeliveryError(failure) || params.signal?.aborted) {
        throw failure;
      }
      // Attempts are 1-based and one has just failed, hence "+ 2" for the upcoming one.
      const upcomingAttempt = failedAttempts + 2;
      defaultRuntime.log(
        `[warn] Subagent announce ${params.operation} transient failure, retrying ${upcomingAttempt}/${totalAttempts} in ${Math.round(waitMs / 1000)}s: ${summarizeDeliveryError(failure)}`,
      );
      failedAttempts += 1;
      await waitForAnnounceRetryDelay(waitMs, params.signal);
    }
  }
}
/**
 * Combines the requester's origin with the delivery context stored on the
 * session entry to produce the origin used for an announce.
 *
 * - When the requester channel is an internal message channel, only the
 *   requester's `accountId` and `threadId` are layered over the entry context.
 * - Otherwise the requester origin is merged over the entry context, with the
 *   entry's `threadId` removed first when `shouldStripThreadFromAnnounceEntry`
 *   says so.
 */
export function resolveAnnounceOrigin(
  entry?: DeliveryContextSource,
  requesterOrigin?: DeliveryContext,
): DeliveryContext | undefined {
  const requester = normalizeDeliveryContext(requesterOrigin);
  const sessionContext = deliveryContextFromSession(entry);

  if (requester?.channel && isInternalMessageChannel(requester.channel)) {
    const internalOverrides = {
      accountId: requester.accountId,
      threadId: requester.threadId,
    };
    return mergeDeliveryContext(internalOverrides, sessionContext);
  }

  if (!sessionContext || !shouldStripThreadFromAnnounceEntry(requester, sessionContext)) {
    return mergeDeliveryContext(requester, sessionContext);
  }

  const { threadId: _droppedThreadId, ...entryWithoutThread } = sessionContext;
  return mergeDeliveryContext(requester, entryWithoutThread);
}
/**
 * Determines where a subagent's completion message should be delivered.
 *
 * Resolution order:
 * 1. A bound delivery route for the child session (`task_completion` event)
 *    wins; its target is merged over the requester origin.
 * 2. Otherwise, if `subagent_delivery_target` hooks exist, the hook's origin is
 *    merged over the requester origin, unless the hook returns nothing or an
 *    internal channel.
 * 3. Otherwise (or when the hook throws) the normalized requester origin is returned.
 */
export async function resolveSubagentCompletionOrigin(params: {
  childSessionKey: string;
  requesterSessionKey: string;
  requesterOrigin?: DeliveryContext;
  childRunId?: string;
  spawnMode?: SpawnSubagentMode;
  expectsCompletionMessage: boolean;
}): Promise<DeliveryContext | undefined> {
  const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
  const rawThreadId = requesterOrigin?.threadId;
  const requesterThreadId =
    rawThreadId != null && rawThreadId !== "" ? String(rawThreadId) : undefined;

  const channel = requesterOrigin?.channel?.trim().toLowerCase();
  const to = requesterOrigin?.to?.trim();
  const accountId = normalizeAccountId(requesterOrigin?.accountId);
  // A non-blank thread id identifies the conversation; otherwise derive it from `to`.
  const conversationId =
    requesterThreadId?.trim() || resolveConversationIdFromTargets({ targets: [to] }) || "";
  const requesterConversation: ConversationRef | undefined =
    channel && conversationId ? { channel, accountId, conversationId } : undefined;

  const route = createBoundDeliveryRouter().resolveDestination({
    eventKind: "task_completion",
    targetSessionKey: params.childSessionKey,
    requester: requesterConversation,
    failClosed: false,
  });
  if (route.mode === "bound" && route.binding) {
    const boundConversation = route.binding.conversation;
    const boundTarget = resolveConversationDeliveryTarget({
      channel: boundConversation.channel,
      conversationId: boundConversation.conversationId,
      parentConversationId: boundConversation.parentConversationId,
    });
    return mergeDeliveryContext(
      {
        channel: boundConversation.channel,
        accountId: boundConversation.accountId,
        to: boundTarget.to,
        threadId: boundTarget.threadId ?? requesterThreadId,
      },
      requesterOrigin,
    );
  }

  const hookRunner = getGlobalHookRunner();
  if (!hookRunner?.hasHooks("subagent_delivery_target")) {
    return requesterOrigin;
  }

  try {
    const hookEvent = {
      childSessionKey: params.childSessionKey,
      requesterSessionKey: params.requesterSessionKey,
      requesterOrigin,
      childRunId: params.childRunId,
      spawnMode: params.spawnMode,
      expectsCompletionMessage: params.expectsCompletionMessage,
    };
    const hookContext = {
      runId: params.childRunId,
      childSessionKey: params.childSessionKey,
      requesterSessionKey: params.requesterSessionKey,
    };
    const result = await hookRunner.runSubagentDeliveryTarget(hookEvent, hookContext);
    const hookOrigin = normalizeDeliveryContext(result?.origin);
    const hookOriginUnusable =
      !hookOrigin || (hookOrigin.channel && isInternalMessageChannel(hookOrigin.channel));
    return hookOriginUnusable ? requesterOrigin : mergeDeliveryContext(hookOrigin, requesterOrigin);
  } catch {
    // Hook failures are deliberately non-fatal: fall back to the requester origin.
    return requesterOrigin;
  }
}
/**
 * Queue `send` callback: forwards a queued announce item to the requester
 * session through the gateway `agent` method.
 *
 * For internal requesters (see `isInternalAnnounceRequesterSession`) the
 * routing fields are left undefined and `deliver` is false; otherwise the
 * item's origin supplies channel, account, target and thread.
 */
async function sendAnnounce(item: AnnounceQueueItem) {
  const cfg = subagentAnnounceDeliveryDeps.loadConfig();
  const timeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
  const internalRequester = isInternalAnnounceRequesterSession(item.sessionKey);

  // Routing details are only forwarded when the requester is not internal.
  const externalOrigin = internalRequester ? undefined : item.origin;
  const rawThreadId = externalOrigin?.threadId;
  const externalThreadId =
    rawThreadId != null && rawThreadId !== "" ? String(rawThreadId) : undefined;

  const queueAnnounceId = resolveQueueAnnounceId({
    announceId: item.announceId,
    sessionKey: item.sessionKey,
    enqueuedAt: item.enqueuedAt,
  });
  const idempotencyKey = buildAnnounceIdempotencyKey(queueAnnounceId);

  await subagentAnnounceDeliveryDeps.callGateway({
    method: "agent",
    params: {
      sessionKey: item.sessionKey,
      message: item.prompt,
      channel: externalOrigin?.channel,
      accountId: externalOrigin?.accountId,
      to: externalOrigin?.to,
      threadId: externalThreadId,
      deliver: !internalRequester,
      internalEvents: item.internalEvents,
      inputProvenance: {
        kind: "inter_session",
        sourceSessionKey: item.sourceSessionKey,
        sourceChannel: item.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL,
        sourceTool: item.sourceTool ?? "subagent_announce",
      },
      idempotencyKey,
    },
    timeoutMs,
  });
}
/**
 * Maps a requester session key to the key used in the session store.
 *
 * - Blank keys, `"global"`, `"unknown"` and keys already starting with
 *   `agent:` are returned (trimmed) as-is.
 * - `"main"` or the configured main key resolves to the main session key.
 * - Anything else is prefixed as `agent:<agentId>:<key>`.
 */
export function resolveRequesterStoreKey(
  cfg: ReturnType<typeof loadConfig>,
  requesterSessionKey: string,
): string {
  const key = (requesterSessionKey ?? "").trim();
  const usableAsIs =
    key === "" || key === "global" || key === "unknown" || key.startsWith("agent:");
  if (usableAsIs) {
    return key;
  }
  const configuredMainKey = normalizeMainKey(cfg.session?.mainKey);
  if (key === "main" || key === configuredMainKey) {
    return resolveMainSessionKey(cfg);
  }
  return `agent:${resolveAgentIdFromSessionKey(key)}:${key}`;
}
/**
 * Loads the session-store entry for a requester session.
 *
 * @returns The loaded config, the store entry found under the canonical key
 *   (possibly `undefined`), and the canonical key itself.
 */
export function loadRequesterSessionEntry(requesterSessionKey: string) {
  const cfg = subagentAnnounceDeliveryDeps.loadConfig();
  const canonicalKey = resolveRequesterStoreKey(cfg, requesterSessionKey);
  const owningAgentId = resolveAgentIdFromSessionKey(canonicalKey);
  const sessionStore = loadSessionStore(
    resolveStorePath(cfg.session?.store, { agentId: owningAgentId }),
  );
  return { cfg, entry: sessionStore[canonicalKey], canonicalKey };
}
/**
 * Looks up a session-store entry by the given key, used verbatim (no
 * canonicalization is applied here).
 */
export function loadSessionEntryByKey(sessionKey: string) {
  const cfg = subagentAnnounceDeliveryDeps.loadConfig();
  const owningAgentId = resolveAgentIdFromSessionKey(sessionKey);
  const sessionStore = loadSessionStore(
    resolveStorePath(cfg.session?.store, { agentId: owningAgentId }),
  );
  return sessionStore[sessionKey];
}
/**
 * Builds the announce-queue key for a session: the session key alone, or
 * `<sessionKey>:acct:<accountId>` when the origin has a normalized account id.
 */
function buildAnnounceQueueKey(sessionKey: string, origin?: DeliveryContext): string {
  const accountId = normalizeAccountId(origin?.accountId);
  return accountId ? `${sessionKey}:acct:${accountId}` : sessionKey;
}
/**
 * Attempts to hand an announce to the requester session without a direct
 * gateway call, by steering the running session or by enqueueing the announce.
 *
 * @returns
 * - `"steered"` when the queue mode is `steer` / `steer-backlog` and the steer
 *   message was accepted by `queueEmbeddedPiMessage`;
 * - `"queued"` / `"dropped"` when the requester run is active and the queue
 *   mode is one of `followup`, `collect`, `steer-backlog`, `interrupt` or
 *   `steer`, depending on whether `enqueueAnnounce` accepted the item;
 * - `"none"` when the signal is aborted, the requester entry has no
 *   `sessionId`, or neither path applies.
 */
async function maybeQueueSubagentAnnounce(params: {
  requesterSessionKey: string;
  announceId?: string;
  triggerMessage: string;
  steerMessage: string;
  summaryLine?: string;
  requesterOrigin?: DeliveryContext;
  sourceSessionKey?: string;
  sourceChannel?: string;
  sourceTool?: string;
  internalEvents?: AgentInternalEvent[];
  signal?: AbortSignal;
}): Promise<"steered" | "queued" | "none" | "dropped"> {
  if (params.signal?.aborted) {
    return "none";
  }
  // loadRequesterSessionEntry already canonicalizes the key against the same
  // config, so reuse its result instead of resolving the key a second time.
  const { cfg, entry, canonicalKey } = loadRequesterSessionEntry(params.requesterSessionKey);
  const sessionId = entry?.sessionId;
  if (!sessionId) {
    return "none";
  }
  const queueSettings = resolveQueueSettings({
    cfg,
    channel: entry?.channel ?? entry?.lastChannel ?? entry?.origin?.provider,
    sessionEntry: entry,
  });
  // Activity is sampled before the steer attempt, matching the enqueue decision below.
  const isActive = isEmbeddedPiRunActive(sessionId);
  const shouldSteer = queueSettings.mode === "steer" || queueSettings.mode === "steer-backlog";
  if (shouldSteer) {
    const steered = queueEmbeddedPiMessage(sessionId, params.steerMessage);
    if (steered) {
      return "steered";
    }
  }
  const shouldFollowup =
    queueSettings.mode === "followup" ||
    queueSettings.mode === "collect" ||
    queueSettings.mode === "steer-backlog" ||
    queueSettings.mode === "interrupt";
  // A failed steer in plain "steer" mode still falls back to the queue while the run is active.
  if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
    const origin = resolveAnnounceOrigin(entry, params.requesterOrigin);
    const didQueue = enqueueAnnounce({
      key: buildAnnounceQueueKey(canonicalKey, origin),
      item: {
        announceId: params.announceId,
        prompt: params.triggerMessage,
        summaryLine: params.summaryLine,
        internalEvents: params.internalEvents,
        enqueuedAt: Date.now(),
        sessionKey: canonicalKey,
        origin,
        sourceSessionKey: params.sourceSessionKey,
        sourceChannel: params.sourceChannel,
        sourceTool: params.sourceTool,
      },
      settings: queueSettings,
      send: sendAnnounce,
    });
    return didQueue ? "queued" : "dropped";
  }
  return "none";
}
/**
 * Delivers an announce to the requester session with a direct gateway `agent`
 * call, retrying transient failures via `runAnnounceDeliveryWithRetry`.
 *
 * @returns
 * - `{ delivered: false, path: "none" }` when the signal is aborted before
 *   the gateway call is started;
 * - `{ delivered: true, path: "direct" }` when the gateway call succeeds;
 * - `{ delivered: false, path: "direct", error }` when anything inside the
 *   `try` block throws (the error is summarized, not rethrown).
 */
async function sendSubagentAnnounceDirectly(params: {
targetRequesterSessionKey: string;
triggerMessage: string;
internalEvents?: AgentInternalEvent[];
expectsCompletionMessage: boolean;
bestEffortDeliver?: boolean;
directIdempotencyKey: string;
completionDirectOrigin?: DeliveryContext;
directOrigin?: DeliveryContext;
requesterSessionOrigin?: DeliveryContext;
sourceSessionKey?: string;
sourceChannel?: string;
sourceTool?: string;
requesterIsSubagent: boolean;
signal?: AbortSignal;
}): Promise<SubagentAnnounceDeliveryResult> {
// First abort checkpoint: skip all work if the caller already gave up.
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
// NOTE: config loading and key resolution happen outside the try block, so
// failures here propagate to the caller instead of becoming an error result.
const cfg = subagentAnnounceDeliveryDeps.loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
cfg,
params.targetRequesterSessionKey,
);
try {
const completionDirectOrigin = normalizeDeliveryContext(params.completionDirectOrigin);
const directOrigin = normalizeDeliveryContext(params.directOrigin);
const requesterSessionOrigin = normalizeDeliveryContext(params.requesterSessionOrigin);
// Merge completionDirectOrigin with directOrigin so that missing fields
// (channel, to, accountId) fall back to the originating session's
// lastChannel / lastTo. Without this, a completion origin that carries a
// channel but not a `to` would prevent external delivery.
const effectiveDirectOrigin =
params.expectsCompletionMessage && completionDirectOrigin
? mergeDeliveryContext(completionDirectOrigin, directOrigin)
: directOrigin;
// Origin used when no external delivery happens: the effective direct
// origin if it names a channel, otherwise the requester session's origin.
const sessionOnlyOrigin = effectiveDirectOrigin?.channel
? effectiveDirectOrigin
: requesterSessionOrigin;
// Subagent requesters never get external delivery; everyone else has the
// target resolved from the effective direct origin.
const deliveryTarget = !params.requesterIsSubagent
? resolveExternalBestEffortDeliveryTarget({
channel: effectiveDirectOrigin?.channel,
to: effectiveDirectOrigin?.to,
accountId: effectiveDirectOrigin?.accountId,
threadId: effectiveDirectOrigin?.threadId,
})
: { deliver: false };
const normalizedSessionOnlyOriginChannel = !params.requesterIsSubagent
? normalizeMessageChannel(sessionOnlyOrigin?.channel)
: undefined;
// Only channels accepted by isGatewayMessageChannel are forwarded as
// session-only routing context.
const sessionOnlyOriginChannel =
normalizedSessionOnlyOriginChannel &&
isGatewayMessageChannel(normalizedSessionOnlyOriginChannel)
? normalizedSessionOnlyOriginChannel
: undefined;
// Second abort checkpoint, right before the gateway call is started.
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
await runAnnounceDeliveryWithRetry({
operation: params.expectsCompletionMessage
? "completion direct announce agent call"
: "direct announce agent call",
signal: params.signal,
// The request payload is rebuilt on every attempt.
run: async () =>
await subagentAnnounceDeliveryDeps.callGateway({
method: "agent",
params: {
sessionKey: canonicalRequesterSessionKey,
message: params.triggerMessage,
deliver: deliveryTarget.deliver,
bestEffortDeliver: params.bestEffortDeliver,
internalEvents: params.internalEvents,
// Routing fields: use the resolved external target when delivering;
// otherwise fall back to the session-only origin, and only if its
// channel passed the gateway-channel check above.
channel: deliveryTarget.deliver ? deliveryTarget.channel : sessionOnlyOriginChannel,
accountId: deliveryTarget.deliver
? deliveryTarget.accountId
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.accountId
: undefined,
to: deliveryTarget.deliver
? deliveryTarget.to
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.to
: undefined,
threadId: deliveryTarget.deliver
? deliveryTarget.threadId
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.threadId
: undefined,
inputProvenance: {
kind: "inter_session",
sourceSessionKey: params.sourceSessionKey,
sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL,
sourceTool: params.sourceTool ?? "subagent_announce",
},
idempotencyKey: params.directIdempotencyKey,
},
expectFinal: true,
timeoutMs: announceTimeoutMs,
}),
});
return {
delivered: true,
path: "direct",
};
} catch (err) {
// Includes non-transient failures, exhausted retries, and the abort error
// thrown by runAnnounceDeliveryWithRetry.
return {
delivered: false,
path: "direct",
error: summarizeDeliveryError(err),
};
}
}
/**
 * Entry point for delivering a subagent announcement to its requester.
 *
 * Wires the two delivery strategies — queue/steer (`maybeQueueSubagentAnnounce`)
 * and direct gateway call (`sendSubagentAnnounceDirectly`) — into
 * `runSubagentAnnounceDispatch`, which decides how they are used.
 *
 * @returns The dispatch result describing whether and how the announce was delivered.
 */
export async function deliverSubagentAnnouncement(params: {
  requesterSessionKey: string;
  announceId?: string;
  triggerMessage: string;
  steerMessage: string;
  internalEvents?: AgentInternalEvent[];
  summaryLine?: string;
  requesterSessionOrigin?: DeliveryContext;
  requesterOrigin?: DeliveryContext;
  completionDirectOrigin?: DeliveryContext;
  directOrigin?: DeliveryContext;
  sourceSessionKey?: string;
  sourceChannel?: string;
  sourceTool?: string;
  targetRequesterSessionKey: string;
  requesterIsSubagent: boolean;
  expectsCompletionMessage: boolean;
  bestEffortDeliver?: boolean;
  directIdempotencyKey: string;
  signal?: AbortSignal;
}): Promise<SubagentAnnounceDeliveryResult> {
  // Strategy 1: steer or enqueue onto the requester session.
  const attemptQueue = async () =>
    await maybeQueueSubagentAnnounce({
      requesterSessionKey: params.requesterSessionKey,
      announceId: params.announceId,
      triggerMessage: params.triggerMessage,
      steerMessage: params.steerMessage,
      summaryLine: params.summaryLine,
      requesterOrigin: params.requesterOrigin,
      sourceSessionKey: params.sourceSessionKey,
      sourceChannel: params.sourceChannel,
      sourceTool: params.sourceTool,
      internalEvents: params.internalEvents,
      signal: params.signal,
    });

  // Strategy 2: call the gateway directly.
  const attemptDirect = async () =>
    await sendSubagentAnnounceDirectly({
      targetRequesterSessionKey: params.targetRequesterSessionKey,
      triggerMessage: params.triggerMessage,
      internalEvents: params.internalEvents,
      directIdempotencyKey: params.directIdempotencyKey,
      completionDirectOrigin: params.completionDirectOrigin,
      directOrigin: params.directOrigin,
      requesterSessionOrigin: params.requesterSessionOrigin,
      sourceSessionKey: params.sourceSessionKey,
      sourceChannel: params.sourceChannel,
      sourceTool: params.sourceTool,
      requesterIsSubagent: params.requesterIsSubagent,
      expectsCompletionMessage: params.expectsCompletionMessage,
      signal: params.signal,
      bestEffortDeliver: params.bestEffortDeliver,
    });

  return await runSubagentAnnounceDispatch({
    expectsCompletionMessage: params.expectsCompletionMessage,
    signal: params.signal,
    queue: attemptQueue,
    direct: attemptDirect,
  });
}
/** Test-only hooks for this module. */
export const __testing = {
  /**
   * Replaces selected module dependencies for a test. Calling it without
   * overrides restores the default dependencies.
   */
  setDepsForTest(overrides?: Partial<SubagentAnnounceDeliveryDeps>) {
    if (!overrides) {
      subagentAnnounceDeliveryDeps = defaultSubagentAnnounceDeliveryDeps;
      return;
    }
    subagentAnnounceDeliveryDeps = {
      ...defaultSubagentAnnounceDeliveryDeps,
      ...overrides,
    };
  },
};