Files
openclaw/src/agents/subagent-announce.ts
2026-03-01 23:11:48 +00:00

1431 lines
51 KiB
TypeScript

import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveMainSessionKey,
resolveStorePath,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js";
import type { ConversationRef } from "../infra/outbound/session-binding-service.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { extractTextFromChatContent } from "../shared/chat-content.js";
import {
type DeliveryContext,
deliveryContextFromSession,
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import { isDeliverableMessageChannel, isInternalMessageChannel } from "../utils/message-channel.js";
import {
buildAnnounceIdFromChildRun,
buildAnnounceIdempotencyKey,
resolveQueueAnnounceId,
} from "./announce-idempotency.js";
import { formatAgentInternalEventsForPrompt, type AgentInternalEvent } from "./internal-events.js";
import {
isEmbeddedPiRunActive,
queueEmbeddedPiMessage,
waitForEmbeddedPiRunEnd,
} from "./pi-embedded.js";
import {
runSubagentAnnounceDispatch,
type SubagentAnnounceDeliveryResult,
} from "./subagent-announce-dispatch.js";
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
import { readLatestAssistantReply } from "./tools/agent-step.js";
import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js";
import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
const FAST_TEST_RETRY_INTERVAL_MS = 8;
const FAST_TEST_REPLY_CHANGE_WAIT_MS = 20;
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 60_000;
const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
const DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS = FAST_TEST_MODE
? ([8, 16, 32] as const)
: ([5_000, 10_000, 20_000] as const);
type ToolResultMessage = {
role?: unknown;
content?: unknown;
};
function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType<typeof loadConfig>): number {
const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs;
if (typeof configured !== "number" || !Number.isFinite(configured)) {
return DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS;
}
return Math.min(Math.max(1, Math.floor(configured)), MAX_TIMER_SAFE_TIMEOUT_MS);
}
function buildCompletionDeliveryMessage(params: {
findings: string;
subagentName: string;
spawnMode?: SpawnSubagentMode;
outcome?: SubagentRunOutcome;
announceType?: SubagentAnnounceType;
}): string {
const findingsText = params.findings.trim();
if (isAnnounceSkip(findingsText)) {
return "";
}
const hasFindings = findingsText.length > 0 && findingsText !== "(no output)";
// Cron completions are standalone messages — skip the subagent status header.
if (params.announceType === "cron job") {
return hasFindings ? findingsText : "";
}
const header = (() => {
if (params.outcome?.status === "error") {
return params.spawnMode === "session"
? `❌ Subagent ${params.subagentName} failed this task (session remains active)`
: `❌ Subagent ${params.subagentName} failed`;
}
if (params.outcome?.status === "timeout") {
return params.spawnMode === "session"
? `⏱️ Subagent ${params.subagentName} timed out on this task (session remains active)`
: `⏱️ Subagent ${params.subagentName} timed out`;
}
return params.spawnMode === "session"
? `✅ Subagent ${params.subagentName} completed this task (session remains active)`
: `✅ Subagent ${params.subagentName} finished`;
})();
if (!hasFindings) {
return header;
}
return `${header}\n\n${findingsText}`;
}
function summarizeDeliveryError(error: unknown): string {
if (error instanceof Error) {
return error.message || "error";
}
if (typeof error === "string") {
return error;
}
if (error === undefined || error === null) {
return "unknown error";
}
try {
return JSON.stringify(error);
} catch {
return "error";
}
}
const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/\berrorcode=unavailable\b/i,
/\bstatus\s*[:=]\s*"?unavailable\b/i,
/\bUNAVAILABLE\b/,
/no active .* listener/i,
/gateway not connected/i,
/gateway closed \(1006/i,
/gateway timeout/i,
/\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i,
];
const PERMANENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/unsupported channel/i,
/unknown channel/i,
/chat not found/i,
/user not found/i,
/bot was blocked by the user/i,
/forbidden: bot was kicked/i,
/recipient is not a valid/i,
/outbound not configured for channel/i,
];
function isTransientAnnounceDeliveryError(error: unknown): boolean {
const message = summarizeDeliveryError(error);
if (!message) {
return false;
}
if (PERMANENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message))) {
return false;
}
return TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message));
}
async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise<void> {
if (ms <= 0) {
return;
}
if (!signal) {
await new Promise<void>((resolve) => setTimeout(resolve, ms));
return;
}
if (signal.aborted) {
return;
}
await new Promise<void>((resolve) => {
const timer = setTimeout(() => {
signal.removeEventListener("abort", onAbort);
resolve();
}, ms);
const onAbort = () => {
clearTimeout(timer);
signal.removeEventListener("abort", onAbort);
resolve();
};
signal.addEventListener("abort", onAbort, { once: true });
});
}
async function runAnnounceDeliveryWithRetry<T>(params: {
operation: string;
signal?: AbortSignal;
run: () => Promise<T>;
}): Promise<T> {
let retryIndex = 0;
for (;;) {
if (params.signal?.aborted) {
throw new Error("announce delivery aborted");
}
try {
return await params.run();
} catch (err) {
const delayMs = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS[retryIndex];
if (delayMs == null || !isTransientAnnounceDeliveryError(err) || params.signal?.aborted) {
throw err;
}
const nextAttempt = retryIndex + 2;
const maxAttempts = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS.length + 1;
defaultRuntime.log(
`[warn] Subagent announce ${params.operation} transient failure, retrying ${nextAttempt}/${maxAttempts} in ${Math.round(delayMs / 1000)}s: ${summarizeDeliveryError(err)}`,
);
retryIndex += 1;
await waitForAnnounceRetryDelay(delayMs, params.signal);
}
}
}
function extractToolResultText(content: unknown): string {
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (content && typeof content === "object" && !Array.isArray(content)) {
const obj = content as {
text?: unknown;
output?: unknown;
content?: unknown;
result?: unknown;
error?: unknown;
summary?: unknown;
};
if (typeof obj.text === "string") {
return sanitizeTextContent(obj.text);
}
if (typeof obj.output === "string") {
return sanitizeTextContent(obj.output);
}
if (typeof obj.content === "string") {
return sanitizeTextContent(obj.content);
}
if (typeof obj.result === "string") {
return sanitizeTextContent(obj.result);
}
if (typeof obj.error === "string") {
return sanitizeTextContent(obj.error);
}
if (typeof obj.summary === "string") {
return sanitizeTextContent(obj.summary);
}
}
if (!Array.isArray(content)) {
return "";
}
const joined = extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
normalizeText: (text) => text,
joinWith: "\n",
});
return joined?.trim() ?? "";
}
function extractInlineTextContent(content: unknown): string {
if (!Array.isArray(content)) {
return "";
}
return (
extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
normalizeText: (text) => text.trim(),
joinWith: "",
}) ?? ""
);
}
function extractSubagentOutputText(message: unknown): string {
if (!message || typeof message !== "object") {
return "";
}
const role = (message as { role?: unknown }).role;
const content = (message as { content?: unknown }).content;
if (role === "assistant") {
const assistantText = extractAssistantText(message);
if (assistantText) {
return assistantText;
}
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (Array.isArray(content)) {
return extractInlineTextContent(content);
}
return "";
}
if (role === "toolResult" || role === "tool") {
return extractToolResultText((message as ToolResultMessage).content);
}
if (role == null) {
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (Array.isArray(content)) {
return extractInlineTextContent(content);
}
}
return "";
}
async function readLatestSubagentOutput(sessionKey: string): Promise<string | undefined> {
try {
const latestAssistant = await readLatestAssistantReply({
sessionKey,
limit: 50,
});
if (latestAssistant?.trim()) {
return latestAssistant;
}
} catch {
// Best-effort: fall back to richer history parsing below.
}
const history = await callGateway<{ messages?: Array<unknown> }>({
method: "chat.history",
params: { sessionKey, limit: 50 },
});
const messages = Array.isArray(history?.messages) ? history.messages : [];
for (let i = messages.length - 1; i >= 0; i -= 1) {
const msg = messages[i];
const text = extractSubagentOutputText(msg);
if (text) {
return text;
}
}
return undefined;
}
async function readLatestSubagentOutputWithRetry(params: {
sessionKey: string;
maxWaitMs: number;
}): Promise<string | undefined> {
const RETRY_INTERVAL_MS = FAST_TEST_MODE ? FAST_TEST_RETRY_INTERVAL_MS : 100;
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000));
let result: string | undefined;
while (Date.now() < deadline) {
result = await readLatestSubagentOutput(params.sessionKey);
if (result?.trim()) {
return result;
}
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
}
return result;
}
async function waitForSubagentOutputChange(params: {
sessionKey: string;
baselineReply: string;
maxWaitMs: number;
}): Promise<string> {
const baseline = params.baselineReply.trim();
if (!baseline) {
return params.baselineReply;
}
const RETRY_INTERVAL_MS = FAST_TEST_MODE ? FAST_TEST_RETRY_INTERVAL_MS : 100;
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 5_000));
let latest = params.baselineReply;
while (Date.now() < deadline) {
const next = await readLatestSubagentOutput(params.sessionKey);
if (next?.trim()) {
latest = next;
if (next.trim() !== baseline) {
return next;
}
}
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
}
return latest;
}
function formatDurationShort(valueMs?: number) {
if (!valueMs || !Number.isFinite(valueMs) || valueMs <= 0) {
return "n/a";
}
const totalSeconds = Math.round(valueMs / 1000);
const hours = Math.floor(totalSeconds / 3600);
const minutes = Math.floor((totalSeconds % 3600) / 60);
const seconds = totalSeconds % 60;
if (hours > 0) {
return `${hours}h${minutes}m`;
}
if (minutes > 0) {
return `${minutes}m${seconds}s`;
}
return `${seconds}s`;
}
function formatTokenCount(value?: number) {
if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
return "0";
}
if (value >= 1_000_000) {
return `${(value / 1_000_000).toFixed(1)}m`;
}
if (value >= 1_000) {
return `${(value / 1_000).toFixed(1)}k`;
}
return String(Math.round(value));
}
async function buildCompactAnnounceStatsLine(params: {
sessionKey: string;
startedAt?: number;
endedAt?: number;
}) {
const cfg = loadConfig();
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
const storePath = resolveStorePath(cfg.session?.store, { agentId });
let entry = loadSessionStore(storePath)[params.sessionKey];
const tokenWaitAttempts = FAST_TEST_MODE ? 1 : 3;
for (let attempt = 0; attempt < tokenWaitAttempts; attempt += 1) {
const hasTokenData =
typeof entry?.inputTokens === "number" ||
typeof entry?.outputTokens === "number" ||
typeof entry?.totalTokens === "number";
if (hasTokenData) {
break;
}
if (!FAST_TEST_MODE) {
await new Promise((resolve) => setTimeout(resolve, 150));
}
entry = loadSessionStore(storePath)[params.sessionKey];
}
const input = typeof entry?.inputTokens === "number" ? entry.inputTokens : 0;
const output = typeof entry?.outputTokens === "number" ? entry.outputTokens : 0;
const ioTotal = input + output;
const promptCache = typeof entry?.totalTokens === "number" ? entry.totalTokens : undefined;
const runtimeMs =
typeof params.startedAt === "number" && typeof params.endedAt === "number"
? Math.max(0, params.endedAt - params.startedAt)
: undefined;
const parts = [
`runtime ${formatDurationShort(runtimeMs)}`,
`tokens ${formatTokenCount(ioTotal)} (in ${formatTokenCount(input)} / out ${formatTokenCount(output)})`,
];
if (typeof promptCache === "number" && promptCache > ioTotal) {
parts.push(`prompt/cache ${formatTokenCount(promptCache)}`);
}
return `Stats: ${parts.join(" • ")}`;
}
type DeliveryContextSource = Parameters<typeof deliveryContextFromSession>[0];
function resolveAnnounceOrigin(
entry?: DeliveryContextSource,
requesterOrigin?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedRequester = normalizeDeliveryContext(requesterOrigin);
const normalizedEntry = deliveryContextFromSession(entry);
if (normalizedRequester?.channel && isInternalMessageChannel(normalizedRequester.channel)) {
// Ignore internal channel hints (webchat) so a valid persisted route
// can still be used for outbound delivery. Non-standard channels that
// are not in the deliverable list should NOT be stripped here — doing
// so causes the session entry's stale lastChannel (often WhatsApp) to
// override the actual requester origin, leading to delivery failures.
return mergeDeliveryContext(
{
accountId: normalizedRequester.accountId,
threadId: normalizedRequester.threadId,
},
normalizedEntry,
);
}
// requesterOrigin (captured at spawn time) reflects the channel the user is
// actually on and must take priority over the session entry, which may carry
// stale lastChannel / lastTo values from a previous channel interaction.
const entryForMerge =
normalizedRequester?.to &&
normalizedRequester.threadId == null &&
normalizedEntry?.threadId != null
? (() => {
const { threadId: _ignore, ...rest } = normalizedEntry;
return rest;
})()
: normalizedEntry;
return mergeDeliveryContext(normalizedRequester, entryForMerge);
}
async function resolveSubagentCompletionOrigin(params: {
childSessionKey: string;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
childRunId?: string;
spawnMode?: SpawnSubagentMode;
expectsCompletionMessage: boolean;
}): Promise<{
origin?: DeliveryContext;
routeMode: "bound" | "fallback" | "hook";
}> {
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
const requesterConversation = (() => {
const channel = requesterOrigin?.channel?.trim().toLowerCase();
const to = requesterOrigin?.to?.trim();
const accountId = normalizeAccountId(requesterOrigin?.accountId);
const threadId =
requesterOrigin?.threadId != null && requesterOrigin.threadId !== ""
? String(requesterOrigin.threadId).trim()
: undefined;
const conversationId =
threadId || (to?.startsWith("channel:") ? to.slice("channel:".length) : "");
if (!channel || !conversationId) {
return undefined;
}
const ref: ConversationRef = {
channel,
accountId,
conversationId,
};
return ref;
})();
const route = createBoundDeliveryRouter().resolveDestination({
eventKind: "task_completion",
targetSessionKey: params.childSessionKey,
requester: requesterConversation,
failClosed: false,
});
if (route.mode === "bound" && route.binding) {
const boundOrigin: DeliveryContext = {
channel: route.binding.conversation.channel,
accountId: route.binding.conversation.accountId,
to: `channel:${route.binding.conversation.conversationId}`,
threadId: route.binding.conversation.conversationId,
};
return {
// Bound target is authoritative; requester hints fill only missing fields.
origin: mergeDeliveryContext(boundOrigin, requesterOrigin),
routeMode: "bound",
};
}
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("subagent_delivery_target")) {
return {
origin: requesterOrigin,
routeMode: "fallback",
};
}
try {
const result = await hookRunner.runSubagentDeliveryTarget(
{
childSessionKey: params.childSessionKey,
requesterSessionKey: params.requesterSessionKey,
requesterOrigin,
childRunId: params.childRunId,
spawnMode: params.spawnMode,
expectsCompletionMessage: params.expectsCompletionMessage,
},
{
runId: params.childRunId,
childSessionKey: params.childSessionKey,
requesterSessionKey: params.requesterSessionKey,
},
);
const hookOrigin = normalizeDeliveryContext(result?.origin);
if (!hookOrigin) {
return {
origin: requesterOrigin,
routeMode: "fallback",
};
}
if (hookOrigin.channel && !isDeliverableMessageChannel(hookOrigin.channel)) {
return {
origin: requesterOrigin,
routeMode: "fallback",
};
}
// Hook-provided origin should override requester defaults when present.
return {
origin: mergeDeliveryContext(hookOrigin, requesterOrigin),
routeMode: "hook",
};
} catch {
return {
origin: requesterOrigin,
routeMode: "fallback",
};
}
}
async function sendAnnounce(item: AnnounceQueueItem) {
const cfg = loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const requesterDepth = getSubagentDepthFromSessionStore(item.sessionKey);
const requesterIsSubagent = requesterDepth >= 1;
const origin = item.origin;
const threadId =
origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined;
// Share one announce identity across direct and queued delivery paths so
// gateway dedupe suppresses true retries without collapsing distinct events.
const idempotencyKey = buildAnnounceIdempotencyKey(
resolveQueueAnnounceId({
announceId: item.announceId,
sessionKey: item.sessionKey,
enqueuedAt: item.enqueuedAt,
}),
);
await callGateway({
method: "agent",
params: {
sessionKey: item.sessionKey,
message: item.prompt,
channel: requesterIsSubagent ? undefined : origin?.channel,
accountId: requesterIsSubagent ? undefined : origin?.accountId,
to: requesterIsSubagent ? undefined : origin?.to,
threadId: requesterIsSubagent ? undefined : threadId,
deliver: !requesterIsSubagent,
internalEvents: item.internalEvents,
idempotencyKey,
},
timeoutMs: announceTimeoutMs,
});
}
function resolveRequesterStoreKey(
cfg: ReturnType<typeof loadConfig>,
requesterSessionKey: string,
): string {
const raw = (requesterSessionKey ?? "").trim();
if (!raw) {
return raw;
}
if (raw === "global" || raw === "unknown") {
return raw;
}
if (raw.startsWith("agent:")) {
return raw;
}
const mainKey = normalizeMainKey(cfg.session?.mainKey);
if (raw === "main" || raw === mainKey) {
return resolveMainSessionKey(cfg);
}
const agentId = resolveAgentIdFromSessionKey(raw);
return `agent:${agentId}:${raw}`;
}
function loadRequesterSessionEntry(requesterSessionKey: string) {
const cfg = loadConfig();
const canonicalKey = resolveRequesterStoreKey(cfg, requesterSessionKey);
const agentId = resolveAgentIdFromSessionKey(canonicalKey);
const storePath = resolveStorePath(cfg.session?.store, { agentId });
const store = loadSessionStore(storePath);
const entry = store[canonicalKey];
return { cfg, entry, canonicalKey };
}
function buildAnnounceQueueKey(sessionKey: string, origin?: DeliveryContext): string {
const accountId = normalizeAccountId(origin?.accountId);
if (!accountId) {
return sessionKey;
}
return `${sessionKey}:acct:${accountId}`;
}
async function maybeQueueSubagentAnnounce(params: {
requesterSessionKey: string;
announceId?: string;
triggerMessage: string;
steerMessage: string;
summaryLine?: string;
requesterOrigin?: DeliveryContext;
internalEvents?: AgentInternalEvent[];
signal?: AbortSignal;
}): Promise<"steered" | "queued" | "none"> {
if (params.signal?.aborted) {
return "none";
}
const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey);
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
const sessionId = entry?.sessionId;
if (!sessionId) {
return "none";
}
const queueSettings = resolveQueueSettings({
cfg,
channel: entry?.channel ?? entry?.lastChannel,
sessionEntry: entry,
});
const isActive = isEmbeddedPiRunActive(sessionId);
const shouldSteer = queueSettings.mode === "steer" || queueSettings.mode === "steer-backlog";
if (shouldSteer) {
const steered = queueEmbeddedPiMessage(sessionId, params.steerMessage);
if (steered) {
return "steered";
}
}
const shouldFollowup =
queueSettings.mode === "followup" ||
queueSettings.mode === "collect" ||
queueSettings.mode === "steer-backlog" ||
queueSettings.mode === "interrupt";
if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
const origin = resolveAnnounceOrigin(entry, params.requesterOrigin);
enqueueAnnounce({
key: buildAnnounceQueueKey(canonicalKey, origin),
item: {
announceId: params.announceId,
prompt: params.triggerMessage,
summaryLine: params.summaryLine,
internalEvents: params.internalEvents,
enqueuedAt: Date.now(),
sessionKey: canonicalKey,
origin,
},
settings: queueSettings,
send: sendAnnounce,
});
return "queued";
}
return "none";
}
async function sendSubagentAnnounceDirectly(params: {
targetRequesterSessionKey: string;
triggerMessage: string;
completionMessage?: string;
internalEvents?: AgentInternalEvent[];
expectsCompletionMessage: boolean;
bestEffortDeliver?: boolean;
completionRouteMode?: "bound" | "fallback" | "hook";
spawnMode?: SpawnSubagentMode;
directIdempotencyKey: string;
completionDirectOrigin?: DeliveryContext;
directOrigin?: DeliveryContext;
requesterIsSubagent: boolean;
signal?: AbortSignal;
}): Promise<SubagentAnnounceDeliveryResult> {
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
const cfg = loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
cfg,
params.targetRequesterSessionKey,
);
try {
const completionDirectOrigin = normalizeDeliveryContext(params.completionDirectOrigin);
const completionChannelRaw =
typeof completionDirectOrigin?.channel === "string"
? completionDirectOrigin.channel.trim()
: "";
const completionChannel =
completionChannelRaw && isDeliverableMessageChannel(completionChannelRaw)
? completionChannelRaw
: "";
const completionTo =
typeof completionDirectOrigin?.to === "string" ? completionDirectOrigin.to.trim() : "";
const hasCompletionDirectTarget =
!params.requesterIsSubagent && Boolean(completionChannel) && Boolean(completionTo);
if (
params.expectsCompletionMessage &&
hasCompletionDirectTarget &&
params.completionMessage?.trim()
) {
const forceBoundSessionDirectDelivery =
params.spawnMode === "session" &&
(params.completionRouteMode === "bound" || params.completionRouteMode === "hook");
let shouldSendCompletionDirectly = true;
if (!forceBoundSessionDirectDelivery) {
let activeDescendantRuns = 0;
try {
const { countActiveDescendantRuns } = await import("./subagent-registry.js");
activeDescendantRuns = Math.max(
0,
countActiveDescendantRuns(canonicalRequesterSessionKey),
);
} catch {
// Best-effort only; when unavailable keep historical direct-send behavior.
}
// Keep non-bound completion announcements coordinated via requester
// session routing while sibling/descendant runs are still active.
if (activeDescendantRuns > 0) {
shouldSendCompletionDirectly = false;
}
}
if (shouldSendCompletionDirectly) {
const completionThreadId =
completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== ""
? String(completionDirectOrigin.threadId)
: undefined;
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
await runAnnounceDeliveryWithRetry({
operation: "completion direct send",
signal: params.signal,
run: async () =>
await callGateway({
method: "send",
params: {
channel: completionChannel,
to: completionTo,
accountId: completionDirectOrigin?.accountId,
threadId: completionThreadId,
sessionKey: canonicalRequesterSessionKey,
message: params.completionMessage,
idempotencyKey: params.directIdempotencyKey,
},
timeoutMs: announceTimeoutMs,
}),
});
return {
delivered: true,
path: "direct",
};
}
}
const directOrigin = normalizeDeliveryContext(params.directOrigin);
const directChannelRaw =
typeof directOrigin?.channel === "string" ? directOrigin.channel.trim() : "";
const directChannel =
directChannelRaw && isDeliverableMessageChannel(directChannelRaw) ? directChannelRaw : "";
const directTo = typeof directOrigin?.to === "string" ? directOrigin.to.trim() : "";
const hasDeliverableDirectTarget =
!params.requesterIsSubagent && Boolean(directChannel) && Boolean(directTo);
const shouldDeliverExternally =
!params.requesterIsSubagent &&
(!params.expectsCompletionMessage || hasDeliverableDirectTarget);
const threadId =
directOrigin?.threadId != null && directOrigin.threadId !== ""
? String(directOrigin.threadId)
: undefined;
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
await runAnnounceDeliveryWithRetry({
operation: "direct announce agent call",
signal: params.signal,
run: async () =>
await callGateway({
method: "agent",
params: {
sessionKey: canonicalRequesterSessionKey,
message: params.triggerMessage,
deliver: shouldDeliverExternally,
bestEffortDeliver: params.bestEffortDeliver,
internalEvents: params.internalEvents,
channel: shouldDeliverExternally ? directChannel : undefined,
accountId: shouldDeliverExternally ? directOrigin?.accountId : undefined,
to: shouldDeliverExternally ? directTo : undefined,
threadId: shouldDeliverExternally ? threadId : undefined,
idempotencyKey: params.directIdempotencyKey,
},
expectFinal: true,
timeoutMs: announceTimeoutMs,
}),
});
return {
delivered: true,
path: "direct",
};
} catch (err) {
return {
delivered: false,
path: "direct",
error: summarizeDeliveryError(err),
};
}
}
async function deliverSubagentAnnouncement(params: {
requesterSessionKey: string;
announceId?: string;
triggerMessage: string;
steerMessage: string;
completionMessage?: string;
internalEvents?: AgentInternalEvent[];
summaryLine?: string;
requesterOrigin?: DeliveryContext;
completionDirectOrigin?: DeliveryContext;
directOrigin?: DeliveryContext;
targetRequesterSessionKey: string;
requesterIsSubagent: boolean;
expectsCompletionMessage: boolean;
bestEffortDeliver?: boolean;
completionRouteMode?: "bound" | "fallback" | "hook";
spawnMode?: SpawnSubagentMode;
directIdempotencyKey: string;
signal?: AbortSignal;
}): Promise<SubagentAnnounceDeliveryResult> {
return await runSubagentAnnounceDispatch({
expectsCompletionMessage: params.expectsCompletionMessage,
signal: params.signal,
queue: async () =>
await maybeQueueSubagentAnnounce({
requesterSessionKey: params.requesterSessionKey,
announceId: params.announceId,
triggerMessage: params.triggerMessage,
steerMessage: params.steerMessage,
summaryLine: params.summaryLine,
requesterOrigin: params.requesterOrigin,
internalEvents: params.internalEvents,
signal: params.signal,
}),
direct: async () =>
await sendSubagentAnnounceDirectly({
targetRequesterSessionKey: params.targetRequesterSessionKey,
triggerMessage: params.triggerMessage,
completionMessage: params.completionMessage,
internalEvents: params.internalEvents,
directIdempotencyKey: params.directIdempotencyKey,
completionDirectOrigin: params.completionDirectOrigin,
completionRouteMode: params.completionRouteMode,
spawnMode: params.spawnMode,
directOrigin: params.directOrigin,
requesterIsSubagent: params.requesterIsSubagent,
expectsCompletionMessage: params.expectsCompletionMessage,
signal: params.signal,
bestEffortDeliver: params.bestEffortDeliver,
}),
});
}
function loadSessionEntryByKey(sessionKey: string) {
const cfg = loadConfig();
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const storePath = resolveStorePath(cfg.session?.store, { agentId });
const store = loadSessionStore(storePath);
return store[sessionKey];
}
export function buildSubagentSystemPrompt(params: {
requesterSessionKey?: string;
requesterOrigin?: DeliveryContext;
childSessionKey: string;
label?: string;
task?: string;
/** Whether ACP-specific routing guidance should be included. Defaults to true. */
acpEnabled?: boolean;
/** Depth of the child being spawned (1 = sub-agent, 2 = sub-sub-agent). */
childDepth?: number;
/** Config value: max allowed spawn depth. */
maxSpawnDepth?: number;
}) {
const taskText =
typeof params.task === "string" && params.task.trim()
? params.task.replace(/\s+/g, " ").trim()
: "{{TASK_DESCRIPTION}}";
const childDepth = typeof params.childDepth === "number" ? params.childDepth : 1;
const maxSpawnDepth =
typeof params.maxSpawnDepth === "number"
? params.maxSpawnDepth
: DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
const acpEnabled = params.acpEnabled !== false;
const canSpawn = childDepth < maxSpawnDepth;
const parentLabel = childDepth >= 2 ? "parent orchestrator" : "main agent";
const lines = [
"# Subagent Context",
"",
`You are a **subagent** spawned by the ${parentLabel} for a specific task.`,
"",
"## Your Role",
`- You were created to handle: ${taskText}`,
"- Complete this task. That's your entire purpose.",
`- You are NOT the ${parentLabel}. Don't try to be.`,
"",
"## Rules",
"1. **Stay focused** - Do your assigned task, nothing else",
`2. **Complete the task** - Your final message will be automatically reported to the ${parentLabel}`,
"3. **Don't initiate** - No heartbeats, no proactive actions, no side quests",
"4. **Be ephemeral** - You may be terminated after task completion. That's fine.",
"5. **Trust push-based completion** - Descendant results are auto-announced back to you; do not busy-poll for status.",
"6. **Recover from compacted/truncated tool output** - If you see `[compacted: tool output removed to free context]` or `[truncated: output exceeded context limit]`, assume prior output was reduced. Re-read only what you need using smaller chunks (`read` with offset/limit, or targeted `rg`/`head`/`tail`) instead of full-file `cat`.",
"",
"## Output Format",
"When complete, your final response should include:",
`- What you accomplished or found`,
`- Any relevant details the ${parentLabel} should know`,
"- Keep it concise but informative",
"",
"## What You DON'T Do",
`- NO user conversations (that's ${parentLabel}'s job)`,
"- NO external messages (email, tweets, etc.) unless explicitly tasked with a specific recipient/channel",
"- NO cron jobs or persistent state",
`- NO pretending to be the ${parentLabel}`,
`- Only use the \`message\` tool when explicitly instructed to contact a specific external recipient; otherwise return plain text and let the ${parentLabel} deliver it`,
"",
];
if (canSpawn) {
lines.push(
"## Sub-Agent Spawning",
"You CAN spawn your own sub-agents for parallel or complex work using `sessions_spawn`.",
"Use the `subagents` tool to steer, kill, or do an on-demand status check for your spawned sub-agents.",
"Your sub-agents will announce their results back to you automatically (not to the main agent).",
"Default workflow: spawn work, continue orchestrating, and wait for auto-announced completions.",
"Do NOT repeatedly poll `subagents list` in a loop unless you are actively debugging or intervening.",
"Coordinate their work and synthesize results before reporting back.",
...(acpEnabled
? [
'For ACP harness sessions (codex/claudecode/gemini), use `sessions_spawn` with `runtime: "acp"` (set `agentId` unless `acp.defaultAgent` is configured).',
'`agents_list` and `subagents` apply to OpenClaw sub-agents (`runtime: "subagent"`); ACP harness ids are controlled by `acp.allowedAgents`.',
"Do not ask users to run slash commands or CLI when `sessions_spawn` can do it directly.",
"Do not use `exec` (`openclaw ...`, `acpx ...`) to spawn ACP sessions.",
'Use `subagents` only for OpenClaw subagents (`runtime: "subagent"`).',
"Subagent results auto-announce back to you; ACP sessions continue in their bound thread.",
"Avoid polling loops; spawn, orchestrate, and synthesize results.",
]
: []),
"",
);
} else if (childDepth >= 2) {
lines.push(
"## Sub-Agent Spawning",
"You are a leaf worker and CANNOT spawn further sub-agents. Focus on your assigned task.",
"",
);
}
lines.push(
"## Session Context",
...[
params.label ? `- Label: ${params.label}` : undefined,
params.requesterSessionKey
? `- Requester session: ${params.requesterSessionKey}.`
: undefined,
params.requesterOrigin?.channel
? `- Requester channel: ${params.requesterOrigin.channel}.`
: undefined,
`- Your session: ${params.childSessionKey}.`,
].filter((line): line is string => line !== undefined),
"",
);
return lines.join("\n");
}
export type SubagentRunOutcome = {
status: "ok" | "error" | "timeout" | "unknown";
error?: string;
};
export type SubagentAnnounceType = "subagent task" | "cron job";
function buildAnnounceReplyInstruction(params: {
remainingActiveSubagentRuns: number;
requesterIsSubagent: boolean;
announceType: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
}): string {
if (params.remainingActiveSubagentRuns > 0) {
const activeRunsLabel = params.remainingActiveSubagentRuns === 1 ? "run" : "runs";
return `There are still ${params.remainingActiveSubagentRuns} active subagent ${activeRunsLabel} for this session. If they are part of the same workflow, wait for the remaining results before sending a user update. If they are unrelated, respond normally using only the result above.`;
}
if (params.requesterIsSubagent) {
return `Convert this completion into a concise internal orchestration update for your parent agent in your own words. Keep this internal context private (don't mention system/log/stats/session details or announce type). If this result is duplicate or no update is needed, reply ONLY: ${SILENT_REPLY_TOKEN}.`;
}
if (params.expectsCompletionMessage) {
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type).`;
}
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type), and do not copy the internal event text verbatim. Reply ONLY: ${SILENT_REPLY_TOKEN} if this exact result was already delivered to the user in this same turn.`;
}
function buildAnnounceSteerMessage(events: AgentInternalEvent[]): string {
const rendered = formatAgentInternalEventsForPrompt(events);
if (!rendered) {
return "A background task finished. Process the completion update now.";
}
return rendered;
}
export async function runSubagentAnnounceFlow(params: {
childSessionKey: string;
childRunId: string;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
requesterDisplayKey: string;
task: string;
timeoutMs: number;
cleanup: "delete" | "keep";
roundOneReply?: string;
waitForCompletion?: boolean;
startedAt?: number;
endedAt?: number;
label?: string;
outcome?: SubagentRunOutcome;
announceType?: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
spawnMode?: SpawnSubagentMode;
signal?: AbortSignal;
bestEffortDeliver?: boolean;
}): Promise<boolean> {
let didAnnounce = false;
const expectsCompletionMessage = params.expectsCompletionMessage === true;
let shouldDeleteChildSession = params.cleanup === "delete";
try {
let targetRequesterSessionKey = params.requesterSessionKey;
let targetRequesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
const childSessionId = (() => {
const entry = loadSessionEntryByKey(params.childSessionKey);
return typeof entry?.sessionId === "string" && entry.sessionId.trim()
? entry.sessionId.trim()
: undefined;
})();
const settleTimeoutMs = Math.min(Math.max(params.timeoutMs, 1), 120_000);
let reply = params.roundOneReply;
let outcome: SubagentRunOutcome | undefined = params.outcome;
// Lifecycle "end" can arrive before auto-compaction retries finish. If the
// subagent is still active, wait for the embedded run to fully settle.
if (childSessionId && isEmbeddedPiRunActive(childSessionId)) {
const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs);
if (!settled && isEmbeddedPiRunActive(childSessionId)) {
// The child run is still active (e.g., compaction retry still in progress).
// Defer announcement so we don't report stale/partial output.
// Keep the child session so output is not lost while the run is still active.
shouldDeleteChildSession = false;
return false;
}
}
if (!reply && params.waitForCompletion !== false) {
const waitMs = settleTimeoutMs;
const wait = await callGateway<{
status?: string;
startedAt?: number;
endedAt?: number;
error?: string;
}>({
method: "agent.wait",
params: {
runId: params.childRunId,
timeoutMs: waitMs,
},
timeoutMs: waitMs + 2000,
});
const waitError = typeof wait?.error === "string" ? wait.error : undefined;
if (wait?.status === "timeout") {
outcome = { status: "timeout" };
} else if (wait?.status === "error") {
outcome = { status: "error", error: waitError };
} else if (wait?.status === "ok") {
outcome = { status: "ok" };
}
if (typeof wait?.startedAt === "number" && !params.startedAt) {
params.startedAt = wait.startedAt;
}
if (typeof wait?.endedAt === "number" && !params.endedAt) {
params.endedAt = wait.endedAt;
}
if (wait?.status === "timeout") {
if (!outcome) {
outcome = { status: "timeout" };
}
}
reply = await readLatestSubagentOutput(params.childSessionKey);
}
if (!reply) {
reply = await readLatestSubagentOutput(params.childSessionKey);
}
if (!reply?.trim()) {
reply = await readLatestSubagentOutputWithRetry({
sessionKey: params.childSessionKey,
maxWaitMs: params.timeoutMs,
});
}
if (
!expectsCompletionMessage &&
!reply?.trim() &&
childSessionId &&
isEmbeddedPiRunActive(childSessionId)
) {
// Avoid announcing "(no output)" while the child run is still producing output.
shouldDeleteChildSession = false;
return false;
}
if (isAnnounceSkip(reply)) {
return true;
}
if (isSilentReplyText(reply, SILENT_REPLY_TOKEN)) {
return true;
}
if (!outcome) {
outcome = { status: "unknown" };
}
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
let activeChildDescendantRuns = 0;
try {
const { countActiveDescendantRuns } = await import("./subagent-registry.js");
activeChildDescendantRuns = Math.max(0, countActiveDescendantRuns(params.childSessionKey));
} catch {
// Best-effort only; fall back to direct announce behavior when unavailable.
}
if (activeChildDescendantRuns > 0) {
// The finished run still has active descendant subagents. Defer announcing
// this run until descendants settle so we avoid posting in-progress updates.
shouldDeleteChildSession = false;
return false;
}
if (requesterDepth >= 1 && reply?.trim()) {
const minReplyChangeWaitMs = FAST_TEST_MODE ? FAST_TEST_REPLY_CHANGE_WAIT_MS : 250;
reply = await waitForSubagentOutputChange({
sessionKey: params.childSessionKey,
baselineReply: reply,
maxWaitMs: Math.max(minReplyChangeWaitMs, Math.min(params.timeoutMs, 2_000)),
});
}
// Build status label
const statusLabel =
outcome.status === "ok"
? "completed successfully"
: outcome.status === "timeout"
? "timed out"
: outcome.status === "error"
? `failed: ${outcome.error || "unknown error"}`
: "finished with unknown status";
// Build instructional message for main agent
const announceType = params.announceType ?? "subagent task";
const taskLabel = params.label || params.task || "task";
const subagentName = resolveAgentIdFromSessionKey(params.childSessionKey);
const announceSessionId = childSessionId || "unknown";
const findings = reply || "(no output)";
let completionMessage = "";
let triggerMessage = "";
let steerMessage = "";
let internalEvents: AgentInternalEvent[] = [];
let requesterIsSubagent = requesterDepth >= 1;
// If the requester subagent has already finished, bubble the announce to its
// requester (typically main) so descendant completion is not silently lost.
// BUT: only fallback if the parent SESSION is deleted, not just if the current
// run ended. A parent waiting for child results has no active run but should
// still receive the announce — injecting will start a new agent turn.
if (requesterIsSubagent) {
const { isSubagentSessionRunActive, resolveRequesterForChildSession } =
await import("./subagent-registry.js");
if (!isSubagentSessionRunActive(targetRequesterSessionKey)) {
// Parent run has ended. Check if parent SESSION still exists.
// If it does, the parent may be waiting for child results — inject there.
const parentSessionEntry = loadSessionEntryByKey(targetRequesterSessionKey);
const parentSessionAlive =
parentSessionEntry &&
typeof parentSessionEntry.sessionId === "string" &&
parentSessionEntry.sessionId.trim();
if (!parentSessionAlive) {
// Parent session is truly gone — fallback to grandparent
const fallback = resolveRequesterForChildSession(targetRequesterSessionKey);
if (!fallback?.requesterSessionKey) {
// Without a requester fallback we cannot safely deliver this nested
// completion. Keep cleanup retryable so a later registry restore can
// recover and re-announce instead of silently dropping the result.
shouldDeleteChildSession = false;
return false;
}
targetRequesterSessionKey = fallback.requesterSessionKey;
targetRequesterOrigin =
normalizeDeliveryContext(fallback.requesterOrigin) ?? targetRequesterOrigin;
requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
requesterIsSubagent = requesterDepth >= 1;
}
// If parent session is alive (just has no active run), continue with parent
// as target. Injecting the announce will start a new agent turn for processing.
}
}
let remainingActiveSubagentRuns = 0;
try {
const { countActiveDescendantRuns } = await import("./subagent-registry.js");
remainingActiveSubagentRuns = Math.max(
0,
countActiveDescendantRuns(targetRequesterSessionKey),
);
} catch {
// Best-effort only; fall back to default announce instructions when unavailable.
}
const replyInstruction = buildAnnounceReplyInstruction({
remainingActiveSubagentRuns,
requesterIsSubagent,
announceType,
expectsCompletionMessage,
});
const statsLine = await buildCompactAnnounceStatsLine({
sessionKey: params.childSessionKey,
startedAt: params.startedAt,
endedAt: params.endedAt,
});
completionMessage = buildCompletionDeliveryMessage({
findings,
subagentName,
spawnMode: params.spawnMode,
outcome,
announceType,
});
internalEvents = [
{
type: "task_completion",
source: announceType === "cron job" ? "cron" : "subagent",
childSessionKey: params.childSessionKey,
childSessionId: announceSessionId,
announceType,
taskLabel,
status: outcome.status,
statusLabel,
result: findings,
statsLine,
replyInstruction,
},
];
triggerMessage = buildAnnounceSteerMessage(internalEvents);
steerMessage = triggerMessage;
const announceId = buildAnnounceIdFromChildRun({
childSessionKey: params.childSessionKey,
childRunId: params.childRunId,
});
// Send to the requester session. For nested subagents this is an internal
// follow-up injection (deliver=false) so the orchestrator receives it.
let directOrigin = targetRequesterOrigin;
if (!requesterIsSubagent) {
const { entry } = loadRequesterSessionEntry(targetRequesterSessionKey);
directOrigin = resolveAnnounceOrigin(entry, targetRequesterOrigin);
}
const completionResolution =
expectsCompletionMessage && !requesterIsSubagent
? await resolveSubagentCompletionOrigin({
childSessionKey: params.childSessionKey,
requesterSessionKey: targetRequesterSessionKey,
requesterOrigin: directOrigin,
childRunId: params.childRunId,
spawnMode: params.spawnMode,
expectsCompletionMessage,
})
: {
origin: targetRequesterOrigin,
routeMode: "fallback" as const,
};
const completionDirectOrigin = completionResolution.origin;
// Use a deterministic idempotency key so the gateway dedup cache
// catches duplicates if this announce is also queued by the gateway-
// level message queue while the main session is busy (#17122).
const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId);
const delivery = await deliverSubagentAnnouncement({
requesterSessionKey: targetRequesterSessionKey,
announceId,
triggerMessage,
steerMessage,
completionMessage,
internalEvents,
summaryLine: taskLabel,
requesterOrigin:
expectsCompletionMessage && !requesterIsSubagent
? completionDirectOrigin
: targetRequesterOrigin,
completionDirectOrigin,
directOrigin,
targetRequesterSessionKey,
requesterIsSubagent,
expectsCompletionMessage: expectsCompletionMessage,
bestEffortDeliver: params.bestEffortDeliver,
completionRouteMode: completionResolution.routeMode,
spawnMode: params.spawnMode,
directIdempotencyKey,
signal: params.signal,
});
// Cron delivery state should only be marked as delivered when we have a
// direct path result. Queue/steer means "accepted for later processing",
// not a confirmed channel send, and can otherwise produce false positives.
if (
announceType === "cron job" &&
(delivery.path === "queued" || delivery.path === "steered")
) {
didAnnounce = false;
} else {
didAnnounce = delivery.delivered;
}
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {
defaultRuntime.error?.(
`Subagent completion direct announce failed for run ${params.childRunId}: ${delivery.error}`,
);
}
} catch (err) {
defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.
} finally {
// Patch label after all writes complete
if (params.label) {
try {
await callGateway({
method: "sessions.patch",
params: { key: params.childSessionKey, label: params.label },
timeoutMs: 10_000,
});
} catch {
// Best-effort
}
}
if (shouldDeleteChildSession) {
try {
await callGateway({
method: "sessions.delete",
params: {
key: params.childSessionKey,
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
} catch {
// ignore
}
}
}
return didAnnounce;
}