import { promises as fs } from "node:fs"; import path from "node:path"; import { loadConfig } from "../config/config.js"; import { loadSessionStore, resolveAgentIdFromSessionKey, resolveStorePath, type SessionEntry, } from "../config/sessions.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { defaultRuntime } from "../runtime.js"; import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js"; import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js"; import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js"; import { SUBAGENT_ENDED_OUTCOME_KILLED, SUBAGENT_ENDED_REASON_COMPLETE, SUBAGENT_ENDED_REASON_ERROR, SUBAGENT_ENDED_REASON_KILLED, type SubagentLifecycleEndedReason, } from "./subagent-lifecycle-events.js"; import { resolveCleanupCompletionReason, resolveDeferredCleanupDecision, } from "./subagent-registry-cleanup.js"; import { emitSubagentEndedHookOnce, resolveLifecycleOutcomeFromRunOutcome, runOutcomesEqual, } from "./subagent-registry-completion.js"; import { countActiveDescendantRunsFromRuns, countActiveRunsForSessionFromRuns, countPendingDescendantRunsExcludingRunFromRuns, countPendingDescendantRunsFromRuns, findRunIdsByChildSessionKeyFromRuns, listDescendantRunsForRequesterFromRuns, listRunsForRequesterFromRuns, resolveRequesterForChildSessionFromRuns, } from "./subagent-registry-queries.js"; import { getSubagentRunsSnapshotForRead, persistSubagentRunsToDisk, restoreSubagentRunsFromDisk, } from "./subagent-registry-state.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; import { resolveAgentTimeoutMs } from "./timeout.js"; export type { SubagentRunRecord } from "./subagent-registry.types.js"; const subagentRuns = new Map(); let sweeper: NodeJS.Timeout | null = null; let listenerStarted = false; let listenerStop: (() => void) | null = null; // Use var to avoid TDZ when init runs across circular imports during bootstrap. var restoreAttempted = false; const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000; const MIN_ANNOUNCE_RETRY_DELAY_MS = 1_000; const MAX_ANNOUNCE_RETRY_DELAY_MS = 8_000; /** * Maximum number of announce delivery attempts before giving up. * Prevents infinite retry loops when `runSubagentAnnounceFlow` repeatedly * returns `false` due to stale state or transient conditions (#18264). */ const MAX_ANNOUNCE_RETRY_COUNT = 3; /** * Non-completion announce entries older than this are force-expired even if * delivery never succeeded. */ const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes /** * Completion-message flows can wait for descendants to finish, but this hard * cap prevents indefinite pending state when descendants never fully settle. */ const ANNOUNCE_COMPLETION_HARD_EXPIRY_MS = 30 * 60_000; // 30 minutes type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id"; /** * Embedded runs can emit transient lifecycle `error` events while provider/model * retry is still in progress. Defer terminal error cleanup briefly so a * subsequent lifecycle `start` / `end` can cancel premature failure announces. */ const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000; function resolveAnnounceRetryDelayMs(retryCount: number) { const boundedRetryCount = Math.max(0, Math.min(retryCount, 10)); // retryCount is "attempts already made", so retry #1 waits 1s, then 2s, 4s... const backoffExponent = Math.max(0, boundedRetryCount - 1); const baseDelay = MIN_ANNOUNCE_RETRY_DELAY_MS * 2 ** backoffExponent; return Math.min(baseDelay, MAX_ANNOUNCE_RETRY_DELAY_MS); } function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") { const retryCount = entry.announceRetryCount ?? 0; const endedAgoMs = typeof entry.endedAt === "number" ? Math.max(0, Date.now() - entry.endedAt) : undefined; const endedAgoLabel = endedAgoMs != null ? `${Math.round(endedAgoMs / 1000)}s` : "n/a"; defaultRuntime.log( `[warn] Subagent announce give up (${reason}) run=${entry.runId} child=${entry.childSessionKey} requester=${entry.requesterSessionKey} retries=${retryCount} endedAgo=${endedAgoLabel}`, ); } function persistSubagentRuns() { persistSubagentRunsToDisk(subagentRuns); } function findSessionEntryByKey(store: Record, sessionKey: string) { const direct = store[sessionKey]; if (direct) { return direct; } const normalized = sessionKey.toLowerCase(); for (const [key, entry] of Object.entries(store)) { if (key.toLowerCase() === normalized) { return entry; } } return undefined; } function resolveSubagentRunOrphanReason(params: { entry: SubagentRunRecord; storeCache?: Map>; }): SubagentRunOrphanReason | null { const childSessionKey = params.entry.childSessionKey?.trim(); if (!childSessionKey) { return "missing-session-entry"; } try { const cfg = loadConfig(); const agentId = resolveAgentIdFromSessionKey(childSessionKey); const storePath = resolveStorePath(cfg.session?.store, { agentId }); let store = params.storeCache?.get(storePath); if (!store) { store = loadSessionStore(storePath); params.storeCache?.set(storePath, store); } const sessionEntry = findSessionEntryByKey(store, childSessionKey); if (!sessionEntry) { return "missing-session-entry"; } if (typeof sessionEntry.sessionId !== "string" || !sessionEntry.sessionId.trim()) { return "missing-session-id"; } return null; } catch { // Best-effort guard: avoid false orphan pruning on transient read/config failures. return null; } } function reconcileOrphanedRun(params: { runId: string; entry: SubagentRunRecord; reason: SubagentRunOrphanReason; source: "restore" | "resume"; }) { const now = Date.now(); let changed = false; if (typeof params.entry.endedAt !== "number") { params.entry.endedAt = now; changed = true; } const orphanOutcome: SubagentRunOutcome = { status: "error", error: `orphaned subagent run (${params.reason})`, }; if (!runOutcomesEqual(params.entry.outcome, orphanOutcome)) { params.entry.outcome = orphanOutcome; changed = true; } if (params.entry.endedReason !== SUBAGENT_ENDED_REASON_ERROR) { params.entry.endedReason = SUBAGENT_ENDED_REASON_ERROR; changed = true; } if (params.entry.cleanupHandled !== true) { params.entry.cleanupHandled = true; changed = true; } if (typeof params.entry.cleanupCompletedAt !== "number") { params.entry.cleanupCompletedAt = now; changed = true; } const removed = subagentRuns.delete(params.runId); resumedRuns.delete(params.runId); if (!removed && !changed) { return false; } defaultRuntime.log( `[warn] Subagent orphan run pruned source=${params.source} run=${params.runId} child=${params.entry.childSessionKey} reason=${params.reason}`, ); return true; } function reconcileOrphanedRestoredRuns() { const storeCache = new Map>(); let changed = false; for (const [runId, entry] of subagentRuns.entries()) { const orphanReason = resolveSubagentRunOrphanReason({ entry, storeCache, }); if (!orphanReason) { continue; } if ( reconcileOrphanedRun({ runId, entry, reason: orphanReason, source: "restore", }) ) { changed = true; } } return changed; } const resumedRuns = new Set(); const endedHookInFlightRunIds = new Set(); const pendingLifecycleErrorByRunId = new Map< string, { timer: NodeJS.Timeout; endedAt: number; error?: string; } >(); function clearPendingLifecycleError(runId: string) { const pending = pendingLifecycleErrorByRunId.get(runId); if (!pending) { return; } clearTimeout(pending.timer); pendingLifecycleErrorByRunId.delete(runId); } function clearAllPendingLifecycleErrors() { for (const pending of pendingLifecycleErrorByRunId.values()) { clearTimeout(pending.timer); } pendingLifecycleErrorByRunId.clear(); } function schedulePendingLifecycleError(params: { runId: string; endedAt: number; error?: string }) { clearPendingLifecycleError(params.runId); const timer = setTimeout(() => { const pending = pendingLifecycleErrorByRunId.get(params.runId); if (!pending || pending.timer !== timer) { return; } pendingLifecycleErrorByRunId.delete(params.runId); const entry = subagentRuns.get(params.runId); if (!entry) { return; } if (entry.endedReason === SUBAGENT_ENDED_REASON_COMPLETE || entry.outcome?.status === "ok") { return; } void completeSubagentRun({ runId: params.runId, endedAt: pending.endedAt, outcome: { status: "error", error: pending.error, }, reason: SUBAGENT_ENDED_REASON_ERROR, sendFarewell: true, accountId: entry.requesterOrigin?.accountId, triggerCleanup: true, }); }, LIFECYCLE_ERROR_RETRY_GRACE_MS); timer.unref?.(); pendingLifecycleErrorByRunId.set(params.runId, { timer, endedAt: params.endedAt, error: params.error, }); } function suppressAnnounceForSteerRestart(entry?: SubagentRunRecord) { return entry?.suppressAnnounceReason === "steer-restart"; } function shouldKeepThreadBindingAfterRun(params: { entry: SubagentRunRecord; reason: SubagentLifecycleEndedReason; }) { if (params.reason === SUBAGENT_ENDED_REASON_KILLED) { return false; } return params.entry.spawnMode === "session"; } function shouldEmitEndedHookForRun(params: { entry: SubagentRunRecord; reason: SubagentLifecycleEndedReason; }) { return !shouldKeepThreadBindingAfterRun(params); } async function emitSubagentEndedHookForRun(params: { entry: SubagentRunRecord; reason?: SubagentLifecycleEndedReason; sendFarewell?: boolean; accountId?: string; }) { const reason = params.reason ?? params.entry.endedReason ?? SUBAGENT_ENDED_REASON_COMPLETE; const outcome = resolveLifecycleOutcomeFromRunOutcome(params.entry.outcome); const error = params.entry.outcome?.status === "error" ? params.entry.outcome.error : undefined; await emitSubagentEndedHookOnce({ entry: params.entry, reason, sendFarewell: params.sendFarewell, accountId: params.accountId ?? params.entry.requesterOrigin?.accountId, outcome, error, inFlightRunIds: endedHookInFlightRunIds, persist: persistSubagentRuns, }); } async function completeSubagentRun(params: { runId: string; endedAt?: number; outcome: SubagentRunOutcome; reason: SubagentLifecycleEndedReason; sendFarewell?: boolean; accountId?: string; triggerCleanup: boolean; }) { clearPendingLifecycleError(params.runId); const entry = subagentRuns.get(params.runId); if (!entry) { return; } let mutated = false; const endedAt = typeof params.endedAt === "number" ? params.endedAt : Date.now(); if (entry.endedAt !== endedAt) { entry.endedAt = endedAt; mutated = true; } if (!runOutcomesEqual(entry.outcome, params.outcome)) { entry.outcome = params.outcome; mutated = true; } if (entry.endedReason !== params.reason) { entry.endedReason = params.reason; mutated = true; } if (mutated) { persistSubagentRuns(); } const suppressedForSteerRestart = suppressAnnounceForSteerRestart(entry); const shouldEmitEndedHook = !suppressedForSteerRestart && shouldEmitEndedHookForRun({ entry, reason: params.reason, }); const shouldDeferEndedHook = shouldEmitEndedHook && params.triggerCleanup && entry.expectsCompletionMessage === true && !suppressedForSteerRestart; if (!shouldDeferEndedHook && shouldEmitEndedHook) { await emitSubagentEndedHookForRun({ entry, reason: params.reason, sendFarewell: params.sendFarewell, accountId: params.accountId, }); } if (!params.triggerCleanup) { return; } if (suppressedForSteerRestart) { return; } startSubagentAnnounceCleanupFlow(params.runId, entry); } function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecord): boolean { if (!beginSubagentCleanup(runId)) { return false; } const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin); void runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, requesterOrigin, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS, cleanup: entry.cleanup, waitForCompletion: false, startedAt: entry.startedAt, endedAt: entry.endedAt, label: entry.label, outcome: entry.outcome, spawnMode: entry.spawnMode, expectsCompletionMessage: entry.expectsCompletionMessage, }) .then((didAnnounce) => { void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce); }) .catch((error) => { defaultRuntime.log( `[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`, ); void finalizeSubagentCleanup(runId, entry.cleanup, false); }); return true; } function resumeSubagentRun(runId: string) { if (!runId || resumedRuns.has(runId)) { return; } const entry = subagentRuns.get(runId); if (!entry) { return; } const orphanReason = resolveSubagentRunOrphanReason({ entry }); if (orphanReason) { if ( reconcileOrphanedRun({ runId, entry, reason: orphanReason, source: "resume", }) ) { persistSubagentRuns(); } return; } if (entry.cleanupCompletedAt) { return; } // Skip entries that have exhausted their retry budget or expired (#18264). if ((entry.announceRetryCount ?? 0) >= MAX_ANNOUNCE_RETRY_COUNT) { logAnnounceGiveUp(entry, "retry-limit"); entry.cleanupCompletedAt = Date.now(); persistSubagentRuns(); return; } if ( entry.expectsCompletionMessage !== true && typeof entry.endedAt === "number" && Date.now() - entry.endedAt > ANNOUNCE_EXPIRY_MS ) { logAnnounceGiveUp(entry, "expiry"); entry.cleanupCompletedAt = Date.now(); persistSubagentRuns(); return; } const now = Date.now(); const delayMs = resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0); const earliestRetryAt = (entry.lastAnnounceRetryAt ?? 0) + delayMs; if ( entry.expectsCompletionMessage === true && entry.lastAnnounceRetryAt && now < earliestRetryAt ) { const waitMs = Math.max(1, earliestRetryAt - now); setTimeout(() => { resumedRuns.delete(runId); resumeSubagentRun(runId); }, waitMs).unref?.(); resumedRuns.add(runId); return; } if (typeof entry.endedAt === "number" && entry.endedAt > 0) { if (suppressAnnounceForSteerRestart(entry)) { resumedRuns.add(runId); return; } if (!startSubagentAnnounceCleanupFlow(runId, entry)) { return; } resumedRuns.add(runId); return; } // Wait for completion again after restart. const cfg = loadConfig(); const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, entry.runTimeoutSeconds); void waitForSubagentCompletion(runId, waitTimeoutMs); resumedRuns.add(runId); } function restoreSubagentRunsOnce() { if (restoreAttempted) { return; } restoreAttempted = true; try { const restoredCount = restoreSubagentRunsFromDisk({ runs: subagentRuns, mergeOnly: true, }); if (restoredCount === 0) { return; } if (reconcileOrphanedRestoredRuns()) { persistSubagentRuns(); } if (subagentRuns.size === 0) { return; } // Resume pending work. ensureListener(); if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) { startSweeper(); } for (const runId of subagentRuns.keys()) { resumeSubagentRun(runId); } } catch { // ignore restore failures } } function resolveArchiveAfterMs(cfg?: ReturnType) { const config = cfg ?? loadConfig(); const minutes = config.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60; if (!Number.isFinite(minutes) || minutes <= 0) { return undefined; } return Math.max(1, Math.floor(minutes)) * 60_000; } function resolveSubagentWaitTimeoutMs( cfg: ReturnType, runTimeoutSeconds?: number, ) { return resolveAgentTimeoutMs({ cfg, overrideSeconds: runTimeoutSeconds ?? 0 }); } function startSweeper() { if (sweeper) { return; } sweeper = setInterval(() => { void sweepSubagentRuns(); }, 60_000); sweeper.unref?.(); } function stopSweeper() { if (!sweeper) { return; } clearInterval(sweeper); sweeper = null; } async function sweepSubagentRuns() { const now = Date.now(); let mutated = false; for (const [runId, entry] of subagentRuns.entries()) { if (!entry.archiveAtMs || entry.archiveAtMs > now) { continue; } clearPendingLifecycleError(runId); subagentRuns.delete(runId); mutated = true; // Archive/purge is terminal for the run record; remove any retained attachments too. await safeRemoveAttachmentsDir(entry); try { await callGateway({ method: "sessions.delete", params: { key: entry.childSessionKey, deleteTranscript: true, emitLifecycleHooks: false, }, timeoutMs: 10_000, }); } catch { // ignore } } if (mutated) { persistSubagentRuns(); } if (subagentRuns.size === 0) { stopSweeper(); } } function ensureListener() { if (listenerStarted) { return; } listenerStarted = true; listenerStop = onAgentEvent((evt) => { void (async () => { if (!evt || evt.stream !== "lifecycle") { return; } const entry = subagentRuns.get(evt.runId); if (!entry) { return; } const phase = evt.data?.phase; if (phase === "start") { clearPendingLifecycleError(evt.runId); const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined; if (startedAt) { entry.startedAt = startedAt; persistSubagentRuns(); } return; } if (phase !== "end" && phase !== "error") { return; } const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : Date.now(); const error = typeof evt.data?.error === "string" ? evt.data.error : undefined; if (phase === "error") { schedulePendingLifecycleError({ runId: evt.runId, endedAt, error, }); return; } clearPendingLifecycleError(evt.runId); const outcome: SubagentRunOutcome = evt.data?.aborted ? { status: "timeout" } : { status: "ok" }; await completeSubagentRun({ runId: evt.runId, endedAt, outcome, reason: SUBAGENT_ENDED_REASON_COMPLETE, sendFarewell: true, accountId: entry.requesterOrigin?.accountId, triggerCleanup: true, }); })(); }); } async function safeRemoveAttachmentsDir(entry: SubagentRunRecord): Promise { if (!entry.attachmentsDir || !entry.attachmentsRootDir) { return; } const resolveReal = async (targetPath: string): Promise => { try { return await fs.realpath(targetPath); } catch (err) { if ((err as NodeJS.ErrnoException | undefined)?.code === "ENOENT") { return null; } throw err; } }; try { const [rootReal, dirReal] = await Promise.all([ resolveReal(entry.attachmentsRootDir), resolveReal(entry.attachmentsDir), ]); if (!dirReal) { return; } const rootBase = rootReal ?? path.resolve(entry.attachmentsRootDir); // dirReal is guaranteed non-null here (early return above handles null case). const dirBase = dirReal; const rootWithSep = rootBase.endsWith(path.sep) ? rootBase : `${rootBase}${path.sep}`; if (!dirBase.startsWith(rootWithSep)) { return; } await fs.rm(dirBase, { recursive: true, force: true }); } catch { // best effort } } async function finalizeSubagentCleanup( runId: string, cleanup: "delete" | "keep", didAnnounce: boolean, ) { const entry = subagentRuns.get(runId); if (!entry) { return; } if (didAnnounce) { const completionReason = resolveCleanupCompletionReason(entry); await emitCompletionEndedHookIfNeeded(entry, completionReason); // Clean up attachments before the run record is removed. const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep; if (shouldDeleteAttachments) { await safeRemoveAttachmentsDir(entry); } completeCleanupBookkeeping({ runId, entry, cleanup, completedAt: Date.now(), }); return; } const now = Date.now(); const deferredDecision = resolveDeferredCleanupDecision({ entry, now, // Defer until descendants are fully settled, including post-end cleanup. activeDescendantRuns: Math.max(0, countPendingDescendantRuns(entry.childSessionKey)), announceExpiryMs: ANNOUNCE_EXPIRY_MS, announceCompletionHardExpiryMs: ANNOUNCE_COMPLETION_HARD_EXPIRY_MS, maxAnnounceRetryCount: MAX_ANNOUNCE_RETRY_COUNT, deferDescendantDelayMs: MIN_ANNOUNCE_RETRY_DELAY_MS, resolveAnnounceRetryDelayMs, }); if (deferredDecision.kind === "defer-descendants") { entry.lastAnnounceRetryAt = now; entry.cleanupHandled = false; resumedRuns.delete(runId); persistSubagentRuns(); setTimeout(() => { resumeSubagentRun(runId); }, deferredDecision.delayMs).unref?.(); return; } if (deferredDecision.retryCount != null) { entry.announceRetryCount = deferredDecision.retryCount; entry.lastAnnounceRetryAt = now; } if (deferredDecision.kind === "give-up") { const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep; if (shouldDeleteAttachments) { await safeRemoveAttachmentsDir(entry); } const completionReason = resolveCleanupCompletionReason(entry); await emitCompletionEndedHookIfNeeded(entry, completionReason); logAnnounceGiveUp(entry, deferredDecision.reason); completeCleanupBookkeeping({ runId, entry, cleanup: "keep", completedAt: now, }); return; } // Allow retry on the next wake if announce was deferred or failed. // Applies to both keep/delete cleanup modes so delete-runs are only removed // after a successful announce (or terminal give-up). entry.cleanupHandled = false; // Clear the in-flight resume marker so the scheduled retry can run again. resumedRuns.delete(runId); persistSubagentRuns(); if (deferredDecision.resumeDelayMs == null) { return; } setTimeout(() => { resumeSubagentRun(runId); }, deferredDecision.resumeDelayMs).unref?.(); } async function emitCompletionEndedHookIfNeeded( entry: SubagentRunRecord, reason: SubagentLifecycleEndedReason, ) { if ( entry.expectsCompletionMessage === true && shouldEmitEndedHookForRun({ entry, reason, }) ) { await emitSubagentEndedHookForRun({ entry, reason, sendFarewell: true, }); } } function completeCleanupBookkeeping(params: { runId: string; entry: SubagentRunRecord; cleanup: "delete" | "keep"; completedAt: number; }) { if (params.cleanup === "delete") { clearPendingLifecycleError(params.runId); subagentRuns.delete(params.runId); persistSubagentRuns(); retryDeferredCompletedAnnounces(params.runId); return; } params.entry.cleanupCompletedAt = params.completedAt; persistSubagentRuns(); retryDeferredCompletedAnnounces(params.runId); } function retryDeferredCompletedAnnounces(excludeRunId?: string) { const now = Date.now(); for (const [runId, entry] of subagentRuns.entries()) { if (excludeRunId && runId === excludeRunId) { continue; } if (typeof entry.endedAt !== "number") { continue; } if (entry.cleanupCompletedAt || entry.cleanupHandled) { continue; } if (suppressAnnounceForSteerRestart(entry)) { continue; } // Force-expire stale non-completion announces; completion-message flows can // stay pending while descendants run for a long time. const endedAgo = now - (entry.endedAt ?? now); if (entry.expectsCompletionMessage !== true && endedAgo > ANNOUNCE_EXPIRY_MS) { logAnnounceGiveUp(entry, "expiry"); entry.cleanupCompletedAt = now; persistSubagentRuns(); continue; } resumedRuns.delete(runId); resumeSubagentRun(runId); } } function beginSubagentCleanup(runId: string) { const entry = subagentRuns.get(runId); if (!entry) { return false; } if (entry.cleanupCompletedAt) { return false; } if (entry.cleanupHandled) { return false; } entry.cleanupHandled = true; persistSubagentRuns(); return true; } export function markSubagentRunForSteerRestart(runId: string) { const key = runId.trim(); if (!key) { return false; } const entry = subagentRuns.get(key); if (!entry) { return false; } if (entry.suppressAnnounceReason === "steer-restart") { return true; } entry.suppressAnnounceReason = "steer-restart"; persistSubagentRuns(); return true; } export function clearSubagentRunSteerRestart(runId: string) { const key = runId.trim(); if (!key) { return false; } const entry = subagentRuns.get(key); if (!entry) { return false; } if (entry.suppressAnnounceReason !== "steer-restart") { return true; } entry.suppressAnnounceReason = undefined; persistSubagentRuns(); // If the interrupted run already finished while suppression was active, retry // cleanup now so completion output is not lost when restart dispatch fails. resumedRuns.delete(key); if (typeof entry.endedAt === "number" && !entry.cleanupCompletedAt) { resumeSubagentRun(key); } return true; } export function replaceSubagentRunAfterSteer(params: { previousRunId: string; nextRunId: string; fallback?: SubagentRunRecord; runTimeoutSeconds?: number; }) { const previousRunId = params.previousRunId.trim(); const nextRunId = params.nextRunId.trim(); if (!previousRunId || !nextRunId) { return false; } const previous = subagentRuns.get(previousRunId); const source = previous ?? params.fallback; if (!source) { return false; } if (previousRunId !== nextRunId) { clearPendingLifecycleError(previousRunId); subagentRuns.delete(previousRunId); resumedRuns.delete(previousRunId); } const now = Date.now(); const cfg = loadConfig(); const archiveAfterMs = resolveArchiveAfterMs(cfg); const spawnMode = source.spawnMode === "session" ? "session" : "run"; const archiveAtMs = spawnMode === "session" ? undefined : archiveAfterMs ? now + archiveAfterMs : undefined; const runTimeoutSeconds = params.runTimeoutSeconds ?? source.runTimeoutSeconds ?? 0; const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds); const next: SubagentRunRecord = { ...source, runId: nextRunId, startedAt: now, endedAt: undefined, endedReason: undefined, endedHookEmittedAt: undefined, outcome: undefined, cleanupCompletedAt: undefined, cleanupHandled: false, suppressAnnounceReason: undefined, announceRetryCount: undefined, lastAnnounceRetryAt: undefined, spawnMode, archiveAtMs, runTimeoutSeconds, }; subagentRuns.set(nextRunId, next); ensureListener(); persistSubagentRuns(); if (archiveAtMs) { startSweeper(); } void waitForSubagentCompletion(nextRunId, waitTimeoutMs); return true; } export function registerSubagentRun(params: { runId: string; childSessionKey: string; requesterSessionKey: string; requesterOrigin?: DeliveryContext; requesterDisplayKey: string; task: string; cleanup: "delete" | "keep"; label?: string; model?: string; runTimeoutSeconds?: number; expectsCompletionMessage?: boolean; spawnMode?: "run" | "session"; attachmentsDir?: string; attachmentsRootDir?: string; retainAttachmentsOnKeep?: boolean; }) { const now = Date.now(); const cfg = loadConfig(); const archiveAfterMs = resolveArchiveAfterMs(cfg); const spawnMode = params.spawnMode === "session" ? "session" : "run"; const archiveAtMs = spawnMode === "session" ? undefined : archiveAfterMs ? now + archiveAfterMs : undefined; const runTimeoutSeconds = params.runTimeoutSeconds ?? 0; const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds); const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); subagentRuns.set(params.runId, { runId: params.runId, childSessionKey: params.childSessionKey, requesterSessionKey: params.requesterSessionKey, requesterOrigin, requesterDisplayKey: params.requesterDisplayKey, task: params.task, cleanup: params.cleanup, expectsCompletionMessage: params.expectsCompletionMessage, spawnMode, label: params.label, model: params.model, runTimeoutSeconds, createdAt: now, startedAt: now, archiveAtMs, cleanupHandled: false, attachmentsDir: params.attachmentsDir, attachmentsRootDir: params.attachmentsRootDir, retainAttachmentsOnKeep: params.retainAttachmentsOnKeep, }); ensureListener(); persistSubagentRuns(); if (archiveAtMs) { startSweeper(); } // Wait for subagent completion via gateway RPC (cross-process). // The in-process lifecycle listener is a fallback for embedded runs. void waitForSubagentCompletion(params.runId, waitTimeoutMs); } async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { try { const timeoutMs = Math.max(1, Math.floor(waitTimeoutMs)); const wait = await callGateway<{ status?: string; startedAt?: number; endedAt?: number; error?: string; }>({ method: "agent.wait", params: { runId, timeoutMs, }, timeoutMs: timeoutMs + 10_000, }); if (wait?.status !== "ok" && wait?.status !== "error" && wait?.status !== "timeout") { return; } const entry = subagentRuns.get(runId); if (!entry) { return; } let mutated = false; if (typeof wait.startedAt === "number") { entry.startedAt = wait.startedAt; mutated = true; } if (typeof wait.endedAt === "number") { entry.endedAt = wait.endedAt; mutated = true; } if (!entry.endedAt) { entry.endedAt = Date.now(); mutated = true; } const waitError = typeof wait.error === "string" ? wait.error : undefined; const outcome: SubagentRunOutcome = wait.status === "error" ? { status: "error", error: waitError } : wait.status === "timeout" ? { status: "timeout" } : { status: "ok" }; if (!runOutcomesEqual(entry.outcome, outcome)) { entry.outcome = outcome; mutated = true; } if (mutated) { persistSubagentRuns(); } await completeSubagentRun({ runId, endedAt: entry.endedAt, outcome, reason: wait.status === "error" ? SUBAGENT_ENDED_REASON_ERROR : SUBAGENT_ENDED_REASON_COMPLETE, sendFarewell: true, accountId: entry.requesterOrigin?.accountId, triggerCleanup: true, }); } catch { // ignore } } export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) { subagentRuns.clear(); resumedRuns.clear(); endedHookInFlightRunIds.clear(); clearAllPendingLifecycleErrors(); resetAnnounceQueuesForTests(); stopSweeper(); restoreAttempted = false; if (listenerStop) { listenerStop(); listenerStop = null; } listenerStarted = false; if (opts?.persist !== false) { persistSubagentRuns(); } } export function addSubagentRunForTests(entry: SubagentRunRecord) { subagentRuns.set(entry.runId, entry); } export function releaseSubagentRun(runId: string) { clearPendingLifecycleError(runId); const didDelete = subagentRuns.delete(runId); if (didDelete) { persistSubagentRuns(); } if (subagentRuns.size === 0) { stopSweeper(); } } function findRunIdsByChildSessionKey(childSessionKey: string): string[] { return findRunIdsByChildSessionKeyFromRuns(subagentRuns, childSessionKey); } export function resolveRequesterForChildSession(childSessionKey: string): { requesterSessionKey: string; requesterOrigin?: DeliveryContext; } | null { const resolved = resolveRequesterForChildSessionFromRuns( getSubagentRunsSnapshotForRead(subagentRuns), childSessionKey, ); if (!resolved) { return null; } return { requesterSessionKey: resolved.requesterSessionKey, requesterOrigin: normalizeDeliveryContext(resolved.requesterOrigin), }; } export function isSubagentSessionRunActive(childSessionKey: string): boolean { const runIds = findRunIdsByChildSessionKey(childSessionKey); for (const runId of runIds) { const entry = subagentRuns.get(runId); if (!entry) { continue; } if (typeof entry.endedAt !== "number") { return true; } } return false; } export function markSubagentRunTerminated(params: { runId?: string; childSessionKey?: string; reason?: string; }): number { const runIds = new Set(); if (typeof params.runId === "string" && params.runId.trim()) { runIds.add(params.runId.trim()); } if (typeof params.childSessionKey === "string" && params.childSessionKey.trim()) { for (const runId of findRunIdsByChildSessionKey(params.childSessionKey)) { runIds.add(runId); } } if (runIds.size === 0) { return 0; } const now = Date.now(); const reason = params.reason?.trim() || "killed"; let updated = 0; const entriesByChildSessionKey = new Map(); for (const runId of runIds) { clearPendingLifecycleError(runId); const entry = subagentRuns.get(runId); if (!entry) { continue; } if (typeof entry.endedAt === "number") { continue; } entry.endedAt = now; entry.outcome = { status: "error", error: reason }; entry.endedReason = SUBAGENT_ENDED_REASON_KILLED; entry.cleanupHandled = true; entry.cleanupCompletedAt = now; entry.suppressAnnounceReason = "killed"; if (!entriesByChildSessionKey.has(entry.childSessionKey)) { entriesByChildSessionKey.set(entry.childSessionKey, entry); } updated += 1; } if (updated > 0) { persistSubagentRuns(); for (const entry of entriesByChildSessionKey.values()) { void emitSubagentEndedHookOnce({ entry, reason: SUBAGENT_ENDED_REASON_KILLED, sendFarewell: true, outcome: SUBAGENT_ENDED_OUTCOME_KILLED, error: reason, inFlightRunIds: endedHookInFlightRunIds, persist: persistSubagentRuns, }).catch(() => { // Hook failures should not break termination flow. }); } } return updated; } export function listSubagentRunsForRequester(requesterSessionKey: string): SubagentRunRecord[] { return listRunsForRequesterFromRuns(subagentRuns, requesterSessionKey); } export function countActiveRunsForSession(requesterSessionKey: string): number { return countActiveRunsForSessionFromRuns( getSubagentRunsSnapshotForRead(subagentRuns), requesterSessionKey, ); } export function countActiveDescendantRuns(rootSessionKey: string): number { return countActiveDescendantRunsFromRuns( getSubagentRunsSnapshotForRead(subagentRuns), rootSessionKey, ); } export function countPendingDescendantRuns(rootSessionKey: string): number { return countPendingDescendantRunsFromRuns( getSubagentRunsSnapshotForRead(subagentRuns), rootSessionKey, ); } export function countPendingDescendantRunsExcludingRun( rootSessionKey: string, excludeRunId: string, ): number { return countPendingDescendantRunsExcludingRunFromRuns( getSubagentRunsSnapshotForRead(subagentRuns), rootSessionKey, excludeRunId, ); } export function listDescendantRunsForRequester(rootSessionKey: string): SubagentRunRecord[] { return listDescendantRunsForRequesterFromRuns( getSubagentRunsSnapshotForRead(subagentRuns), rootSessionKey, ); } export function initSubagentRegistry() { restoreSubagentRunsOnce(); }