Files
openclaw/src/agents/subagent-announce.ts
2026-04-11 02:46:41 +01:00

561 lines
20 KiB
TypeScript

import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { defaultRuntime } from "../runtime.js";
import { isCronSessionKey } from "../sessions/session-key-utils.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js";
import {
buildAnnounceIdFromChildRun,
buildAnnounceIdempotencyKey,
} from "./announce-idempotency.js";
import { formatAgentInternalEventsForPrompt, type AgentInternalEvent } from "./internal-events.js";
import {
deliverSubagentAnnouncement,
loadRequesterSessionEntry,
loadSessionEntryByKey,
runAnnounceDeliveryWithRetry,
resolveSubagentAnnounceTimeoutMs,
resolveSubagentCompletionOrigin,
} from "./subagent-announce-delivery.js";
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
import {
applySubagentWaitOutcome,
buildChildCompletionFindings,
buildCompactAnnounceStatsLine,
dedupeLatestChildCompletionRows,
filterCurrentDirectChildCompletionRows,
readLatestSubagentOutputWithRetry,
readSubagentOutput,
type SubagentRunOutcome,
waitForSubagentRunOutcome,
} from "./subagent-announce-output.js";
import {
callGateway,
isEmbeddedPiRunActive,
loadConfig,
waitForEmbeddedPiRunEnd,
} from "./subagent-announce.runtime.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.types.js";
import { isAnnounceSkip } from "./tools/sessions-send-tokens.js";
type SubagentAnnounceDeps = {
callGateway: typeof callGateway;
loadConfig: typeof loadConfig;
};
const defaultSubagentAnnounceDeps: SubagentAnnounceDeps = {
callGateway,
loadConfig,
};
let subagentAnnounceDeps: SubagentAnnounceDeps = defaultSubagentAnnounceDeps;
let subagentRegistryRuntimePromise: Promise<
typeof import("./subagent-announce.registry.runtime.js")
> | null = null;
function loadSubagentRegistryRuntime() {
subagentRegistryRuntimePromise ??= import("./subagent-announce.registry.runtime.js");
return subagentRegistryRuntimePromise;
}
export { buildSubagentSystemPrompt } from "./subagent-system-prompt.js";
export { captureSubagentCompletionReply } from "./subagent-announce-output.js";
export type { SubagentRunOutcome } from "./subagent-announce-output.js";
export type SubagentAnnounceType = "subagent task" | "cron job";
function buildAnnounceReplyInstruction(params: {
requesterIsSubagent: boolean;
announceType: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
}): string {
if (params.requesterIsSubagent) {
return `Convert this completion into a concise internal orchestration update for your parent agent in your own words. Keep this internal context private (don't mention system/log/stats/session details or announce type). If this result is duplicate or no update is needed, reply ONLY: ${SILENT_REPLY_TOKEN}.`;
}
if (params.expectsCompletionMessage) {
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type).`;
}
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type), and do not copy the internal event text verbatim. Reply ONLY: ${SILENT_REPLY_TOKEN} if this exact result was already delivered to the user in this same turn.`;
}
function buildAnnounceSteerMessage(events: AgentInternalEvent[]): string {
return (
formatAgentInternalEventsForPrompt(events) ||
"A background task finished. Process the completion update now."
);
}
function hasUsableSessionEntry(entry: unknown): boolean {
if (!entry || typeof entry !== "object") {
return false;
}
const sessionId = (entry as { sessionId?: unknown }).sessionId;
return typeof sessionId !== "string" || sessionId.trim() !== "";
}
function buildDescendantWakeMessage(params: { findings: string; taskLabel: string }): string {
return [
"[Subagent Context] Your prior run ended while waiting for descendant subagent completions.",
"[Subagent Context] All pending descendants for that run have now settled.",
"[Subagent Context] Continue your workflow using these results. Spawn more subagents if needed, otherwise send your final answer.",
"",
`Task: ${params.taskLabel}`,
"",
params.findings,
].join("\n");
}
const WAKE_RUN_SUFFIX = ":wake";
function stripWakeRunSuffixes(runId: string): string {
let next = runId.trim();
while (next.endsWith(WAKE_RUN_SUFFIX)) {
next = next.slice(0, -WAKE_RUN_SUFFIX.length);
}
return next || runId.trim();
}
function isWakeContinuationRun(runId: string): boolean {
const trimmed = runId.trim();
if (!trimmed) {
return false;
}
return stripWakeRunSuffixes(trimmed) !== trimmed;
}
async function wakeSubagentRunAfterDescendants(params: {
runId: string;
childSessionKey: string;
taskLabel: string;
findings: string;
announceId: string;
signal?: AbortSignal;
}): Promise<boolean> {
if (params.signal?.aborted) {
return false;
}
const childEntry = loadSessionEntryByKey(params.childSessionKey);
if (!hasUsableSessionEntry(childEntry)) {
return false;
}
const cfg = subagentAnnounceDeps.loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const wakeMessage = buildDescendantWakeMessage({
findings: params.findings,
taskLabel: params.taskLabel,
});
let wakeRunId = "";
try {
const wakeResponse = await runAnnounceDeliveryWithRetry<{ runId?: string }>({
operation: "descendant wake agent call",
signal: params.signal,
run: async () =>
await subagentAnnounceDeps.callGateway({
method: "agent",
params: {
sessionKey: params.childSessionKey,
message: wakeMessage,
deliver: false,
inputProvenance: {
kind: "inter_session",
sourceSessionKey: params.childSessionKey,
sourceChannel: INTERNAL_MESSAGE_CHANNEL,
sourceTool: "subagent_announce",
},
idempotencyKey: buildAnnounceIdempotencyKey(`${params.announceId}:wake`),
},
timeoutMs: announceTimeoutMs,
}),
});
wakeRunId = normalizeOptionalString(wakeResponse?.runId) ?? "";
} catch {
return false;
}
if (!wakeRunId) {
return false;
}
const { replaceSubagentRunAfterSteer } = await loadSubagentRegistryRuntime();
return replaceSubagentRunAfterSteer({
previousRunId: params.runId,
nextRunId: wakeRunId,
preserveFrozenResultFallback: true,
});
}
export async function runSubagentAnnounceFlow(params: {
childSessionKey: string;
childRunId: string;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
requesterDisplayKey: string;
task: string;
timeoutMs: number;
cleanup: "delete" | "keep";
roundOneReply?: string;
/**
* Fallback text preserved from the pre-wake run when a wake continuation
* completes with NO_REPLY despite an earlier final summary already existing.
*/
fallbackReply?: string;
waitForCompletion?: boolean;
startedAt?: number;
endedAt?: number;
label?: string;
outcome?: SubagentRunOutcome;
announceType?: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
spawnMode?: SpawnSubagentMode;
wakeOnDescendantSettle?: boolean;
signal?: AbortSignal;
bestEffortDeliver?: boolean;
}): Promise<boolean> {
let didAnnounce = false;
const expectsCompletionMessage = params.expectsCompletionMessage === true;
const announceType = params.announceType ?? "subagent task";
let shouldDeleteChildSession = params.cleanup === "delete";
try {
let targetRequesterSessionKey = params.requesterSessionKey;
let targetRequesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
const childSessionId = (() => {
const entry = loadSessionEntryByKey(params.childSessionKey);
return typeof entry?.sessionId === "string" && entry.sessionId.trim()
? entry.sessionId.trim()
: undefined;
})();
const settleTimeoutMs = Math.min(Math.max(params.timeoutMs, 1), 120_000);
let reply = params.roundOneReply;
let outcome: SubagentRunOutcome | undefined = params.outcome;
if (childSessionId && isEmbeddedPiRunActive(childSessionId)) {
const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs);
if (!settled && isEmbeddedPiRunActive(childSessionId)) {
shouldDeleteChildSession = false;
return false;
}
}
if (!reply && params.waitForCompletion !== false) {
const wait = await waitForSubagentRunOutcome(params.childRunId, settleTimeoutMs);
const applied = applySubagentWaitOutcome({
wait,
outcome,
startedAt: params.startedAt,
endedAt: params.endedAt,
});
outcome = applied.outcome;
params.startedAt = applied.startedAt;
params.endedAt = applied.endedAt;
}
if (!outcome) {
outcome = { status: "unknown" };
}
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
const requesterIsInternalSession = () =>
requesterDepth >= 1 || isCronSessionKey(targetRequesterSessionKey);
let childCompletionFindings: string | undefined;
let subagentRegistryRuntime:
| Awaited<ReturnType<typeof loadSubagentRegistryRuntime>>
| undefined;
try {
subagentRegistryRuntime = await loadSubagentRegistryRuntime();
if (
requesterDepth >= 1 &&
subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession(
targetRequesterSessionKey,
)
) {
return true;
}
const pendingChildDescendantRuns = Math.max(
0,
subagentRegistryRuntime.countPendingDescendantRuns(params.childSessionKey),
);
if (pendingChildDescendantRuns > 0 && announceType !== "cron job") {
shouldDeleteChildSession = false;
return false;
}
if (typeof subagentRegistryRuntime.listSubagentRunsForRequester === "function") {
const directChildren = subagentRegistryRuntime.listSubagentRunsForRequester(
params.childSessionKey,
{
requesterRunId: params.childRunId,
},
);
if (Array.isArray(directChildren) && directChildren.length > 0) {
childCompletionFindings = buildChildCompletionFindings(
dedupeLatestChildCompletionRows(
filterCurrentDirectChildCompletionRows(directChildren, {
requesterSessionKey: params.childSessionKey,
getLatestSubagentRunByChildSessionKey:
subagentRegistryRuntime.getLatestSubagentRunByChildSessionKey,
}),
),
);
}
}
} catch {
// Best-effort only.
}
const announceId = buildAnnounceIdFromChildRun({
childSessionKey: params.childSessionKey,
childRunId: params.childRunId,
});
const childRunAlreadyWoken = isWakeContinuationRun(params.childRunId);
if (
params.wakeOnDescendantSettle === true &&
childCompletionFindings?.trim() &&
!childRunAlreadyWoken
) {
const wakeAnnounceId = buildAnnounceIdFromChildRun({
childSessionKey: params.childSessionKey,
childRunId: stripWakeRunSuffixes(params.childRunId),
});
const woke = await wakeSubagentRunAfterDescendants({
runId: params.childRunId,
childSessionKey: params.childSessionKey,
taskLabel: params.label || params.task || "task",
findings: childCompletionFindings,
announceId: wakeAnnounceId,
signal: params.signal,
});
if (woke) {
shouldDeleteChildSession = false;
return true;
}
}
if (!childCompletionFindings) {
const fallbackReply = normalizeOptionalString(params.fallbackReply);
const fallbackIsSilent =
Boolean(fallbackReply) &&
(isAnnounceSkip(fallbackReply) || isSilentReplyText(fallbackReply, SILENT_REPLY_TOKEN));
if (!reply) {
reply = await readSubagentOutput(params.childSessionKey, outcome);
}
if (!reply?.trim()) {
reply = await readLatestSubagentOutputWithRetry({
sessionKey: params.childSessionKey,
maxWaitMs: params.timeoutMs,
outcome,
});
}
if (!reply?.trim() && fallbackReply && !fallbackIsSilent) {
reply = fallbackReply;
}
// A worker can finish just after the first wait request timed out.
// If we already have real completion content, do one cached recheck so
// the final completion event prefers the authoritative terminal state.
// This is best-effort; if the recheck fails, keep the known timeout
// outcome instead of dropping the announcement entirely.
if (outcome?.status === "timeout" && reply?.trim() && params.waitForCompletion !== false) {
try {
const rechecked = await waitForSubagentRunOutcome(params.childRunId, 0);
const applied = applySubagentWaitOutcome({
wait: rechecked,
outcome,
startedAt: params.startedAt,
endedAt: params.endedAt,
});
outcome = applied.outcome;
params.startedAt = applied.startedAt;
params.endedAt = applied.endedAt;
} catch {
// Best-effort recheck; keep the existing timeout outcome on failure.
}
}
if (isAnnounceSkip(reply) || isSilentReplyText(reply, SILENT_REPLY_TOKEN)) {
if (fallbackReply && !fallbackIsSilent) {
reply = fallbackReply;
} else {
return true;
}
}
}
if (!outcome) {
outcome = { status: "unknown" };
}
// Build status label
const statusLabel =
outcome.status === "ok"
? "completed successfully"
: outcome.status === "timeout"
? "timed out"
: outcome.status === "error"
? `failed: ${outcome.error || "unknown error"}`
: "finished with unknown status";
const taskLabel = params.label || params.task || "task";
const announceSessionId = childSessionId || "unknown";
const findings = childCompletionFindings || reply || "(no output)";
let requesterIsSubagent = requesterIsInternalSession();
if (requesterIsSubagent) {
const {
isSubagentSessionRunActive,
resolveRequesterForChildSession,
shouldIgnorePostCompletionAnnounceForSession,
} = subagentRegistryRuntime ?? (await loadSubagentRegistryRuntime());
if (!isSubagentSessionRunActive(targetRequesterSessionKey)) {
if (shouldIgnorePostCompletionAnnounceForSession(targetRequesterSessionKey)) {
return true;
}
const parentSessionEntry = loadSessionEntryByKey(targetRequesterSessionKey);
const parentSessionAlive = hasUsableSessionEntry(parentSessionEntry);
if (!parentSessionAlive) {
const fallback = resolveRequesterForChildSession(targetRequesterSessionKey);
if (!fallback?.requesterSessionKey) {
shouldDeleteChildSession = false;
return false;
}
targetRequesterSessionKey = fallback.requesterSessionKey;
targetRequesterOrigin =
normalizeDeliveryContext(fallback.requesterOrigin) ?? targetRequesterOrigin;
requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
requesterIsSubagent = requesterIsInternalSession();
}
}
}
const replyInstruction = buildAnnounceReplyInstruction({
requesterIsSubagent,
announceType,
expectsCompletionMessage,
});
const statsLine = await buildCompactAnnounceStatsLine({
sessionKey: params.childSessionKey,
startedAt: params.startedAt,
endedAt: params.endedAt,
});
const internalEvents: AgentInternalEvent[] = [
{
type: "task_completion",
source: announceType === "cron job" ? "cron" : "subagent",
childSessionKey: params.childSessionKey,
childSessionId: announceSessionId,
announceType,
taskLabel,
status: outcome.status,
statusLabel,
result: findings,
statsLine,
replyInstruction,
},
];
const triggerMessage = buildAnnounceSteerMessage(internalEvents);
// Send to the requester session. For nested subagents this is an internal
// follow-up injection (deliver=false) so the orchestrator receives it.
let directOrigin = targetRequesterOrigin;
if (!requesterIsSubagent) {
const { entry } = loadRequesterSessionEntry(targetRequesterSessionKey);
directOrigin = resolveAnnounceOrigin(entry, targetRequesterOrigin);
}
const completionDirectOrigin =
expectsCompletionMessage && !requesterIsSubagent
? await resolveSubagentCompletionOrigin({
childSessionKey: params.childSessionKey,
requesterSessionKey: targetRequesterSessionKey,
requesterOrigin: directOrigin,
childRunId: params.childRunId,
spawnMode: params.spawnMode,
expectsCompletionMessage,
})
: targetRequesterOrigin;
const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId);
const delivery = await deliverSubagentAnnouncement({
requesterSessionKey: targetRequesterSessionKey,
announceId,
triggerMessage,
steerMessage: triggerMessage,
internalEvents,
summaryLine: taskLabel,
requesterSessionOrigin: targetRequesterOrigin,
requesterOrigin:
expectsCompletionMessage && !requesterIsSubagent
? completionDirectOrigin
: targetRequesterOrigin,
completionDirectOrigin,
directOrigin,
sourceSessionKey: params.childSessionKey,
sourceChannel: INTERNAL_MESSAGE_CHANNEL,
sourceTool: "subagent_announce",
targetRequesterSessionKey,
requesterIsSubagent,
expectsCompletionMessage: expectsCompletionMessage,
bestEffortDeliver: params.bestEffortDeliver,
directIdempotencyKey,
signal: params.signal,
});
didAnnounce = delivery.delivered;
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {
defaultRuntime.error?.(
`Subagent completion direct announce failed for run ${params.childRunId}: ${delivery.error}`,
);
}
} catch (err) {
defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.
} finally {
// Patch label after all writes complete
if (params.label) {
try {
await subagentAnnounceDeps.callGateway({
method: "sessions.patch",
params: { key: params.childSessionKey, label: params.label },
timeoutMs: 10_000,
});
} catch {
// Best-effort
}
}
if (shouldDeleteChildSession) {
try {
await subagentAnnounceDeps.callGateway({
method: "sessions.delete",
params: {
key: params.childSessionKey,
deleteTranscript: true,
emitLifecycleHooks: params.spawnMode === "session",
},
timeoutMs: 10_000,
});
} catch {
// ignore
}
}
}
return didAnnounce;
}
export const __testing = {
setDepsForTest(overrides?: Partial<SubagentAnnounceDeps>) {
subagentAnnounceDeps = overrides
? {
...defaultSubagentAnnounceDeps,
...overrides,
}
: defaultSubagentAnnounceDeps;
},
};