mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-29 10:50:58 +00:00
* feat(context-engine): add ContextEngine interface and registry
Introduce the pluggable ContextEngine abstraction that allows external
plugins to register custom context management strategies.
- ContextEngine interface with lifecycle methods: bootstrap, ingest,
ingestBatch, afterTurn, assemble, compact, prepareSubagentSpawn,
onSubagentEnded, dispose
- Module-level singleton registry with registerContextEngine() and
resolveContextEngine() (config-driven slot selection)
- LegacyContextEngine: pass-through implementation wrapping existing
compaction behavior for 100% backward compatibility
- ensureContextEnginesInitialized() guard for safe one-time registration
- 19 tests covering contract, registry, resolution, and legacy parity
* feat(plugins): add context-engine slot and registerContextEngine API
Wire the ContextEngine abstraction into the plugin system so external
plugins can register context engines via the standard plugin API.
- Add 'context-engine' to PluginKind union type
- Add 'contextEngine' slot to PluginSlotsConfig (default: 'legacy')
- Wire registerContextEngine() through OpenClawPluginApi
- Export ContextEngine types from plugin-sdk for external consumers
- Restore proper slot-based resolution in registry
* feat(context-engine): wire ContextEngine into agent run lifecycle
Integrate the ContextEngine abstraction into the core agent run path:
- Resolve context engine once per run (reused across retries)
- Bootstrap: hydrate canonical store from session file on first run
- Assemble: route context assembly through pluggable engine
- Auto-compaction guard: disable built-in auto-compaction when
the engine declares ownsCompaction (prevents double-compaction)
- AfterTurn: post-turn lifecycle hook for ingest + background
compaction decisions
- Overflow compaction: route through contextEngine.compact()
- Dispose: clean up engine resources in finally block
- Notify context engine on subagent lifecycle events
Legacy engine: all lifecycle methods are pass-through/no-op, preserving
100% backward compatibility for users without a context engine plugin.
* feat(plugins): add scoped subagent methods and gateway request scope
Expose runtime.subagent.{run, waitForRun, getSession, deleteSession}
so external plugins can spawn sub-agent sessions without raw gateway
dispatch access.
Uses AsyncLocalStorage request-scope bridge to dispatch internally via
handleGatewayRequest with a synthetic operator client. Methods are only
available during gateway request handling.
- Symbol.for-backed global singleton for cross-module-reload safety
- Fallback gateway context for non-WS dispatch paths (Telegram/WhatsApp)
- Set gateway request scope for all handlers, not just plugin handlers
- 3 staleness tests for fallback context hardening
* feat(context-engine): route /compact and sessions.get through context engine
Wire the /compact command and sessions.get handler through the pluggable
ContextEngine interface.
- Thread tokenBudget and force parameters to context engine compact
- Route /compact through contextEngine.compact() when registered
- Wire sessions.get as runtime alias for plugin subagent dispatch
- Add .pebbles/ to .gitignore
* style: format with oxfmt 0.33.0
Fix duplicate import (ControlUiRootState in server.impl.ts) and
import ordering across all changed files.
* fix: update extension test mocks for context-engine types
Add missing subagent property to bluebubbles PluginRuntime mock.
Add missing registerContextEngine to lobster OpenClawPluginApi mock.
* fix(subagents): keep deferred delete cleanup retryable
* style: format run attempt for CI
* fix(rebase): remove duplicate embedded-run imports
* test: add missing gateway context mock export
* fix: pass resolved auth profile into afterTurn compaction
Ensure the embedded runner forwards resolved auth profile context into
legacy context-engine compaction params on the normal afterTurn path,
matching overflow compaction behavior. This allows downstream LCM
summarization to use the intended provider auth/profile consistently.
Also fix strict TS typing in external-link token dedupe and align an
attempt unit test reasoningLevel value with the current ReasoningLevel
enum.
Regeneration-Prompt: |
We were debugging context-engine compaction where downstream summary
calls were missing the right auth/profile context in normal afterTurn
flow, while overflow compaction already propagated it. Preserve current
behavior and keep changes additive: thread the resolved authProfileId
through run -> attempt -> legacy compaction param builder without
broad refactors.
Add tests that prove the auth profile is included in afterTurn legacy
params and that overflow compaction still passes it through run
attempts. Keep existing APIs stable, and only adjust small type issues
needed for strict compilation.
* fix: remove duplicate imports from rebase
* feat: add context-engine system prompt additions
* fix(rebase): dedupe attempt import declarations
* test: fix fetch mock typing in ollama autodiscovery
* fix(test): add registerContextEngine to diffs extension mock APIs
* test(windows): use path.delimiter in ios-team-id fixture PATH
* test(cron): add model formatting and precedence edge case tests
Covers:
- Provider/model string splitting (whitespace, nested paths, empty segments)
- Provider normalization (casing, aliases like bedrock→amazon-bedrock)
- Anthropic model alias normalization (opus-4.5→claude-opus-4-5)
- Precedence: job payload > session override > config default
- Sequential runs with different providers (CI flake regression pattern)
- forceNew session preserving stored model overrides
- Whitespace/empty model string edge cases
- Config model as string vs object format
* test(cron): fix model formatting test config types
* test(phone-control): add registerContextEngine to mock API
* fix: re-export ChannelKind from config-reload-plan
* fix: add subagent mock to plugin-runtime-mock test util
* docs: add changelog fragment for context engine PR #22201
1451 lines
42 KiB
TypeScript
1451 lines
42 KiB
TypeScript
import { promises as fs } from "node:fs";
|
|
import path from "node:path";
|
|
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
|
import { loadConfig } from "../config/config.js";
|
|
import {
|
|
loadSessionStore,
|
|
resolveAgentIdFromSessionKey,
|
|
resolveStorePath,
|
|
type SessionEntry,
|
|
} from "../config/sessions.js";
|
|
import { ensureContextEnginesInitialized } from "../context-engine/init.js";
|
|
import { resolveContextEngine } from "../context-engine/registry.js";
|
|
import type { SubagentEndReason } from "../context-engine/types.js";
|
|
import { callGateway } from "../gateway/call.js";
|
|
import { onAgentEvent } from "../infra/agent-events.js";
|
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
|
import { defaultRuntime } from "../runtime.js";
|
|
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
|
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
|
|
import {
|
|
captureSubagentCompletionReply,
|
|
runSubagentAnnounceFlow,
|
|
type SubagentRunOutcome,
|
|
} from "./subagent-announce.js";
|
|
import {
|
|
SUBAGENT_ENDED_OUTCOME_KILLED,
|
|
SUBAGENT_ENDED_REASON_COMPLETE,
|
|
SUBAGENT_ENDED_REASON_ERROR,
|
|
SUBAGENT_ENDED_REASON_KILLED,
|
|
type SubagentLifecycleEndedReason,
|
|
} from "./subagent-lifecycle-events.js";
|
|
import {
|
|
resolveCleanupCompletionReason,
|
|
resolveDeferredCleanupDecision,
|
|
} from "./subagent-registry-cleanup.js";
|
|
import {
|
|
emitSubagentEndedHookOnce,
|
|
resolveLifecycleOutcomeFromRunOutcome,
|
|
runOutcomesEqual,
|
|
} from "./subagent-registry-completion.js";
|
|
import {
|
|
countActiveDescendantRunsFromRuns,
|
|
countActiveRunsForSessionFromRuns,
|
|
countPendingDescendantRunsExcludingRunFromRuns,
|
|
countPendingDescendantRunsFromRuns,
|
|
findRunIdsByChildSessionKeyFromRuns,
|
|
listDescendantRunsForRequesterFromRuns,
|
|
listRunsForRequesterFromRuns,
|
|
resolveRequesterForChildSessionFromRuns,
|
|
shouldIgnorePostCompletionAnnounceForSessionFromRuns,
|
|
} from "./subagent-registry-queries.js";
|
|
import {
|
|
getSubagentRunsSnapshotForRead,
|
|
persistSubagentRunsToDisk,
|
|
restoreSubagentRunsFromDisk,
|
|
} from "./subagent-registry-state.js";
|
|
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
|
import { resolveAgentTimeoutMs } from "./timeout.js";
|
|
|
|
export type { SubagentRunRecord } from "./subagent-registry.types.js";
|
|
const log = createSubsystemLogger("agents/subagent-registry");
|
|
|
|
const subagentRuns = new Map<string, SubagentRunRecord>();
|
|
let sweeper: NodeJS.Timeout | null = null;
|
|
let listenerStarted = false;
|
|
let listenerStop: (() => void) | null = null;
|
|
// Use var to avoid TDZ when init runs across circular imports during bootstrap.
|
|
var restoreAttempted = false;
|
|
const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
|
|
const MIN_ANNOUNCE_RETRY_DELAY_MS = 1_000;
|
|
const MAX_ANNOUNCE_RETRY_DELAY_MS = 8_000;
|
|
/**
|
|
* Maximum number of announce delivery attempts before giving up.
|
|
* Prevents infinite retry loops when `runSubagentAnnounceFlow` repeatedly
|
|
* returns `false` due to stale state or transient conditions (#18264).
|
|
*/
|
|
const MAX_ANNOUNCE_RETRY_COUNT = 3;
|
|
/**
|
|
* Non-completion announce entries older than this are force-expired even if
|
|
* delivery never succeeded.
|
|
*/
|
|
const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes
|
|
/**
|
|
* Completion-message flows can wait for descendants to finish, but this hard
|
|
* cap prevents indefinite pending state when descendants never fully settle.
|
|
*/
|
|
const ANNOUNCE_COMPLETION_HARD_EXPIRY_MS = 30 * 60_000; // 30 minutes
|
|
type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id";
|
|
/**
|
|
* Embedded runs can emit transient lifecycle `error` events while provider/model
|
|
* retry is still in progress. Defer terminal error cleanup briefly so a
|
|
* subsequent lifecycle `start` / `end` can cancel premature failure announces.
|
|
*/
|
|
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
|
|
const FROZEN_RESULT_TEXT_MAX_BYTES = 100 * 1024;
|
|
|
|
function capFrozenResultText(resultText: string): string {
|
|
const trimmed = resultText.trim();
|
|
if (!trimmed) {
|
|
return "";
|
|
}
|
|
const totalBytes = Buffer.byteLength(trimmed, "utf8");
|
|
if (totalBytes <= FROZEN_RESULT_TEXT_MAX_BYTES) {
|
|
return trimmed;
|
|
}
|
|
const notice = `\n\n[truncated: frozen completion output exceeded ${Math.round(FROZEN_RESULT_TEXT_MAX_BYTES / 1024)}KB (${Math.round(totalBytes / 1024)}KB)]`;
|
|
const maxPayloadBytes = Math.max(
|
|
0,
|
|
FROZEN_RESULT_TEXT_MAX_BYTES - Buffer.byteLength(notice, "utf8"),
|
|
);
|
|
const payload = Buffer.from(trimmed, "utf8").subarray(0, maxPayloadBytes).toString("utf8");
|
|
return `${payload}${notice}`;
|
|
}
|
|
|
|
function resolveAnnounceRetryDelayMs(retryCount: number) {
|
|
const boundedRetryCount = Math.max(0, Math.min(retryCount, 10));
|
|
// retryCount is "attempts already made", so retry #1 waits 1s, then 2s, 4s...
|
|
const backoffExponent = Math.max(0, boundedRetryCount - 1);
|
|
const baseDelay = MIN_ANNOUNCE_RETRY_DELAY_MS * 2 ** backoffExponent;
|
|
return Math.min(baseDelay, MAX_ANNOUNCE_RETRY_DELAY_MS);
|
|
}
|
|
|
|
function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") {
|
|
const retryCount = entry.announceRetryCount ?? 0;
|
|
const endedAgoMs =
|
|
typeof entry.endedAt === "number" ? Math.max(0, Date.now() - entry.endedAt) : undefined;
|
|
const endedAgoLabel = endedAgoMs != null ? `${Math.round(endedAgoMs / 1000)}s` : "n/a";
|
|
defaultRuntime.log(
|
|
`[warn] Subagent announce give up (${reason}) run=${entry.runId} child=${entry.childSessionKey} requester=${entry.requesterSessionKey} retries=${retryCount} endedAgo=${endedAgoLabel}`,
|
|
);
|
|
}
|
|
|
|
function persistSubagentRuns() {
|
|
persistSubagentRunsToDisk(subagentRuns);
|
|
}
|
|
|
|
function findSessionEntryByKey(store: Record<string, SessionEntry>, sessionKey: string) {
|
|
const direct = store[sessionKey];
|
|
if (direct) {
|
|
return direct;
|
|
}
|
|
const normalized = sessionKey.toLowerCase();
|
|
for (const [key, entry] of Object.entries(store)) {
|
|
if (key.toLowerCase() === normalized) {
|
|
return entry;
|
|
}
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
function resolveSubagentRunOrphanReason(params: {
|
|
entry: SubagentRunRecord;
|
|
storeCache?: Map<string, Record<string, SessionEntry>>;
|
|
}): SubagentRunOrphanReason | null {
|
|
const childSessionKey = params.entry.childSessionKey?.trim();
|
|
if (!childSessionKey) {
|
|
return "missing-session-entry";
|
|
}
|
|
try {
|
|
const cfg = loadConfig();
|
|
const agentId = resolveAgentIdFromSessionKey(childSessionKey);
|
|
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
|
let store = params.storeCache?.get(storePath);
|
|
if (!store) {
|
|
store = loadSessionStore(storePath);
|
|
params.storeCache?.set(storePath, store);
|
|
}
|
|
const sessionEntry = findSessionEntryByKey(store, childSessionKey);
|
|
if (!sessionEntry) {
|
|
return "missing-session-entry";
|
|
}
|
|
if (typeof sessionEntry.sessionId !== "string" || !sessionEntry.sessionId.trim()) {
|
|
return "missing-session-id";
|
|
}
|
|
return null;
|
|
} catch {
|
|
// Best-effort guard: avoid false orphan pruning on transient read/config failures.
|
|
return null;
|
|
}
|
|
}
|
|
|
|
function reconcileOrphanedRun(params: {
|
|
runId: string;
|
|
entry: SubagentRunRecord;
|
|
reason: SubagentRunOrphanReason;
|
|
source: "restore" | "resume";
|
|
}) {
|
|
const now = Date.now();
|
|
let changed = false;
|
|
if (typeof params.entry.endedAt !== "number") {
|
|
params.entry.endedAt = now;
|
|
changed = true;
|
|
}
|
|
const orphanOutcome: SubagentRunOutcome = {
|
|
status: "error",
|
|
error: `orphaned subagent run (${params.reason})`,
|
|
};
|
|
if (!runOutcomesEqual(params.entry.outcome, orphanOutcome)) {
|
|
params.entry.outcome = orphanOutcome;
|
|
changed = true;
|
|
}
|
|
if (params.entry.endedReason !== SUBAGENT_ENDED_REASON_ERROR) {
|
|
params.entry.endedReason = SUBAGENT_ENDED_REASON_ERROR;
|
|
changed = true;
|
|
}
|
|
if (params.entry.cleanupHandled !== true) {
|
|
params.entry.cleanupHandled = true;
|
|
changed = true;
|
|
}
|
|
if (typeof params.entry.cleanupCompletedAt !== "number") {
|
|
params.entry.cleanupCompletedAt = now;
|
|
changed = true;
|
|
}
|
|
const removed = subagentRuns.delete(params.runId);
|
|
resumedRuns.delete(params.runId);
|
|
if (!removed && !changed) {
|
|
return false;
|
|
}
|
|
defaultRuntime.log(
|
|
`[warn] Subagent orphan run pruned source=${params.source} run=${params.runId} child=${params.entry.childSessionKey} reason=${params.reason}`,
|
|
);
|
|
return true;
|
|
}
|
|
|
|
function reconcileOrphanedRestoredRuns() {
|
|
const storeCache = new Map<string, Record<string, SessionEntry>>();
|
|
let changed = false;
|
|
for (const [runId, entry] of subagentRuns.entries()) {
|
|
const orphanReason = resolveSubagentRunOrphanReason({
|
|
entry,
|
|
storeCache,
|
|
});
|
|
if (!orphanReason) {
|
|
continue;
|
|
}
|
|
if (
|
|
reconcileOrphanedRun({
|
|
runId,
|
|
entry,
|
|
reason: orphanReason,
|
|
source: "restore",
|
|
})
|
|
) {
|
|
changed = true;
|
|
}
|
|
}
|
|
return changed;
|
|
}
|
|
|
|
const resumedRuns = new Set<string>();
|
|
const endedHookInFlightRunIds = new Set<string>();
|
|
const pendingLifecycleErrorByRunId = new Map<
|
|
string,
|
|
{
|
|
timer: NodeJS.Timeout;
|
|
endedAt: number;
|
|
error?: string;
|
|
}
|
|
>();
|
|
|
|
function clearPendingLifecycleError(runId: string) {
|
|
const pending = pendingLifecycleErrorByRunId.get(runId);
|
|
if (!pending) {
|
|
return;
|
|
}
|
|
clearTimeout(pending.timer);
|
|
pendingLifecycleErrorByRunId.delete(runId);
|
|
}
|
|
|
|
function clearAllPendingLifecycleErrors() {
|
|
for (const pending of pendingLifecycleErrorByRunId.values()) {
|
|
clearTimeout(pending.timer);
|
|
}
|
|
pendingLifecycleErrorByRunId.clear();
|
|
}
|
|
|
|
function schedulePendingLifecycleError(params: { runId: string; endedAt: number; error?: string }) {
|
|
clearPendingLifecycleError(params.runId);
|
|
const timer = setTimeout(() => {
|
|
const pending = pendingLifecycleErrorByRunId.get(params.runId);
|
|
if (!pending || pending.timer !== timer) {
|
|
return;
|
|
}
|
|
pendingLifecycleErrorByRunId.delete(params.runId);
|
|
const entry = subagentRuns.get(params.runId);
|
|
if (!entry) {
|
|
return;
|
|
}
|
|
if (entry.endedReason === SUBAGENT_ENDED_REASON_COMPLETE || entry.outcome?.status === "ok") {
|
|
return;
|
|
}
|
|
void completeSubagentRun({
|
|
runId: params.runId,
|
|
endedAt: pending.endedAt,
|
|
outcome: {
|
|
status: "error",
|
|
error: pending.error,
|
|
},
|
|
reason: SUBAGENT_ENDED_REASON_ERROR,
|
|
sendFarewell: true,
|
|
accountId: entry.requesterOrigin?.accountId,
|
|
triggerCleanup: true,
|
|
});
|
|
}, LIFECYCLE_ERROR_RETRY_GRACE_MS);
|
|
timer.unref?.();
|
|
pendingLifecycleErrorByRunId.set(params.runId, {
|
|
timer,
|
|
endedAt: params.endedAt,
|
|
error: params.error,
|
|
});
|
|
}
|
|
|
|
async function notifyContextEngineSubagentEnded(params: {
|
|
childSessionKey: string;
|
|
reason: SubagentEndReason;
|
|
}) {
|
|
try {
|
|
ensureContextEnginesInitialized();
|
|
const engine = await resolveContextEngine(loadConfig());
|
|
if (!engine.onSubagentEnded) {
|
|
return;
|
|
}
|
|
await engine.onSubagentEnded(params);
|
|
} catch (err) {
|
|
log.warn("context-engine onSubagentEnded failed (best-effort)", { err });
|
|
}
|
|
}
|
|
|
|
function suppressAnnounceForSteerRestart(entry?: SubagentRunRecord) {
|
|
return entry?.suppressAnnounceReason === "steer-restart";
|
|
}
|
|
|
|
function shouldKeepThreadBindingAfterRun(params: {
|
|
entry: SubagentRunRecord;
|
|
reason: SubagentLifecycleEndedReason;
|
|
}) {
|
|
if (params.reason === SUBAGENT_ENDED_REASON_KILLED) {
|
|
return false;
|
|
}
|
|
return params.entry.spawnMode === "session";
|
|
}
|
|
|
|
function shouldEmitEndedHookForRun(params: {
|
|
entry: SubagentRunRecord;
|
|
reason: SubagentLifecycleEndedReason;
|
|
}) {
|
|
return !shouldKeepThreadBindingAfterRun(params);
|
|
}
|
|
|
|
async function emitSubagentEndedHookForRun(params: {
|
|
entry: SubagentRunRecord;
|
|
reason?: SubagentLifecycleEndedReason;
|
|
sendFarewell?: boolean;
|
|
accountId?: string;
|
|
}) {
|
|
const reason = params.reason ?? params.entry.endedReason ?? SUBAGENT_ENDED_REASON_COMPLETE;
|
|
const outcome = resolveLifecycleOutcomeFromRunOutcome(params.entry.outcome);
|
|
const error = params.entry.outcome?.status === "error" ? params.entry.outcome.error : undefined;
|
|
await emitSubagentEndedHookOnce({
|
|
entry: params.entry,
|
|
reason,
|
|
sendFarewell: params.sendFarewell,
|
|
accountId: params.accountId ?? params.entry.requesterOrigin?.accountId,
|
|
outcome,
|
|
error,
|
|
inFlightRunIds: endedHookInFlightRunIds,
|
|
persist: persistSubagentRuns,
|
|
});
|
|
}
|
|
|
|
async function freezeRunResultAtCompletion(entry: SubagentRunRecord): Promise<boolean> {
|
|
if (entry.frozenResultText !== undefined) {
|
|
return false;
|
|
}
|
|
try {
|
|
const captured = await captureSubagentCompletionReply(entry.childSessionKey);
|
|
entry.frozenResultText = captured?.trim() ? capFrozenResultText(captured) : null;
|
|
} catch {
|
|
entry.frozenResultText = null;
|
|
}
|
|
entry.frozenResultCapturedAt = Date.now();
|
|
return true;
|
|
}
|
|
|
|
function listPendingCompletionRunsForSession(sessionKey: string): SubagentRunRecord[] {
|
|
const key = sessionKey.trim();
|
|
if (!key) {
|
|
return [];
|
|
}
|
|
const out: SubagentRunRecord[] = [];
|
|
for (const entry of subagentRuns.values()) {
|
|
if (entry.childSessionKey !== key) {
|
|
continue;
|
|
}
|
|
if (entry.expectsCompletionMessage !== true) {
|
|
continue;
|
|
}
|
|
if (typeof entry.endedAt !== "number") {
|
|
continue;
|
|
}
|
|
if (typeof entry.cleanupCompletedAt === "number") {
|
|
continue;
|
|
}
|
|
out.push(entry);
|
|
}
|
|
return out;
|
|
}
|
|
|
|
async function refreshFrozenResultFromSession(sessionKey: string): Promise<boolean> {
|
|
const candidates = listPendingCompletionRunsForSession(sessionKey);
|
|
if (candidates.length === 0) {
|
|
return false;
|
|
}
|
|
|
|
let captured: string | undefined;
|
|
try {
|
|
captured = await captureSubagentCompletionReply(sessionKey);
|
|
} catch {
|
|
return false;
|
|
}
|
|
const trimmed = captured?.trim();
|
|
if (!trimmed || isSilentReplyText(trimmed, SILENT_REPLY_TOKEN)) {
|
|
return false;
|
|
}
|
|
|
|
const nextFrozen = capFrozenResultText(trimmed);
|
|
const capturedAt = Date.now();
|
|
let changed = false;
|
|
for (const entry of candidates) {
|
|
if (entry.frozenResultText === nextFrozen) {
|
|
continue;
|
|
}
|
|
entry.frozenResultText = nextFrozen;
|
|
entry.frozenResultCapturedAt = capturedAt;
|
|
changed = true;
|
|
}
|
|
if (changed) {
|
|
persistSubagentRuns();
|
|
}
|
|
return changed;
|
|
}
|
|
|
|
async function completeSubagentRun(params: {
|
|
runId: string;
|
|
endedAt?: number;
|
|
outcome: SubagentRunOutcome;
|
|
reason: SubagentLifecycleEndedReason;
|
|
sendFarewell?: boolean;
|
|
accountId?: string;
|
|
triggerCleanup: boolean;
|
|
}) {
|
|
clearPendingLifecycleError(params.runId);
|
|
const entry = subagentRuns.get(params.runId);
|
|
if (!entry) {
|
|
return;
|
|
}
|
|
|
|
let mutated = false;
|
|
// If a late lifecycle completion arrives after an earlier kill marker, allow
|
|
// completion cleanup/announce to run instead of staying permanently suppressed.
|
|
if (
|
|
params.reason === SUBAGENT_ENDED_REASON_COMPLETE &&
|
|
entry.suppressAnnounceReason === "killed" &&
|
|
(entry.cleanupHandled || typeof entry.cleanupCompletedAt === "number")
|
|
) {
|
|
entry.suppressAnnounceReason = undefined;
|
|
entry.cleanupHandled = false;
|
|
entry.cleanupCompletedAt = undefined;
|
|
mutated = true;
|
|
}
|
|
|
|
const endedAt = typeof params.endedAt === "number" ? params.endedAt : Date.now();
|
|
if (entry.endedAt !== endedAt) {
|
|
entry.endedAt = endedAt;
|
|
mutated = true;
|
|
}
|
|
if (!runOutcomesEqual(entry.outcome, params.outcome)) {
|
|
entry.outcome = params.outcome;
|
|
mutated = true;
|
|
}
|
|
if (entry.endedReason !== params.reason) {
|
|
entry.endedReason = params.reason;
|
|
mutated = true;
|
|
}
|
|
|
|
if (await freezeRunResultAtCompletion(entry)) {
|
|
mutated = true;
|
|
}
|
|
|
|
if (mutated) {
|
|
persistSubagentRuns();
|
|
}
|
|
|
|
const suppressedForSteerRestart = suppressAnnounceForSteerRestart(entry);
|
|
const shouldEmitEndedHook =
|
|
!suppressedForSteerRestart &&
|
|
shouldEmitEndedHookForRun({
|
|
entry,
|
|
reason: params.reason,
|
|
});
|
|
const shouldDeferEndedHook =
|
|
shouldEmitEndedHook &&
|
|
params.triggerCleanup &&
|
|
entry.expectsCompletionMessage === true &&
|
|
!suppressedForSteerRestart;
|
|
if (!shouldDeferEndedHook && shouldEmitEndedHook) {
|
|
await emitSubagentEndedHookForRun({
|
|
entry,
|
|
reason: params.reason,
|
|
sendFarewell: params.sendFarewell,
|
|
accountId: params.accountId,
|
|
});
|
|
}
|
|
|
|
if (!params.triggerCleanup) {
|
|
return;
|
|
}
|
|
if (suppressedForSteerRestart) {
|
|
return;
|
|
}
|
|
startSubagentAnnounceCleanupFlow(params.runId, entry);
|
|
}
|
|
|
|
function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecord): boolean {
|
|
if (!beginSubagentCleanup(runId)) {
|
|
return false;
|
|
}
|
|
const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
|
|
void runSubagentAnnounceFlow({
|
|
childSessionKey: entry.childSessionKey,
|
|
childRunId: entry.runId,
|
|
requesterSessionKey: entry.requesterSessionKey,
|
|
requesterOrigin,
|
|
requesterDisplayKey: entry.requesterDisplayKey,
|
|
task: entry.task,
|
|
timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
|
|
cleanup: entry.cleanup,
|
|
roundOneReply: entry.frozenResultText ?? undefined,
|
|
fallbackReply: entry.fallbackFrozenResultText ?? undefined,
|
|
waitForCompletion: false,
|
|
startedAt: entry.startedAt,
|
|
endedAt: entry.endedAt,
|
|
label: entry.label,
|
|
outcome: entry.outcome,
|
|
spawnMode: entry.spawnMode,
|
|
expectsCompletionMessage: entry.expectsCompletionMessage,
|
|
wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
|
|
})
|
|
.then((didAnnounce) => {
|
|
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
|
|
})
|
|
.catch((error) => {
|
|
defaultRuntime.log(
|
|
`[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`,
|
|
);
|
|
void finalizeSubagentCleanup(runId, entry.cleanup, false);
|
|
});
|
|
return true;
|
|
}
|
|
|
|
function resumeSubagentRun(runId: string) {
|
|
if (!runId || resumedRuns.has(runId)) {
|
|
return;
|
|
}
|
|
const entry = subagentRuns.get(runId);
|
|
if (!entry) {
|
|
return;
|
|
}
|
|
const orphanReason = resolveSubagentRunOrphanReason({ entry });
|
|
if (orphanReason) {
|
|
if (
|
|
reconcileOrphanedRun({
|
|
runId,
|
|
entry,
|
|
reason: orphanReason,
|
|
source: "resume",
|
|
})
|
|
) {
|
|
persistSubagentRuns();
|
|
}
|
|
return;
|
|
}
|
|
if (entry.cleanupCompletedAt) {
|
|
return;
|
|
}
|
|
// Skip entries that have exhausted their retry budget or expired (#18264).
|
|
if ((entry.announceRetryCount ?? 0) >= MAX_ANNOUNCE_RETRY_COUNT) {
|
|
logAnnounceGiveUp(entry, "retry-limit");
|
|
entry.cleanupCompletedAt = Date.now();
|
|
persistSubagentRuns();
|
|
return;
|
|
}
|
|
if (
|
|
entry.expectsCompletionMessage !== true &&
|
|
typeof entry.endedAt === "number" &&
|
|
Date.now() - entry.endedAt > ANNOUNCE_EXPIRY_MS
|
|
) {
|
|
logAnnounceGiveUp(entry, "expiry");
|
|
entry.cleanupCompletedAt = Date.now();
|
|
persistSubagentRuns();
|
|
return;
|
|
}
|
|
|
|
const now = Date.now();
|
|
const delayMs = resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0);
|
|
const earliestRetryAt = (entry.lastAnnounceRetryAt ?? 0) + delayMs;
|
|
if (
|
|
entry.expectsCompletionMessage === true &&
|
|
entry.lastAnnounceRetryAt &&
|
|
now < earliestRetryAt
|
|
) {
|
|
const waitMs = Math.max(1, earliestRetryAt - now);
|
|
setTimeout(() => {
|
|
resumedRuns.delete(runId);
|
|
resumeSubagentRun(runId);
|
|
}, waitMs).unref?.();
|
|
resumedRuns.add(runId);
|
|
return;
|
|
}
|
|
|
|
if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
|
|
if (suppressAnnounceForSteerRestart(entry)) {
|
|
resumedRuns.add(runId);
|
|
return;
|
|
}
|
|
if (!startSubagentAnnounceCleanupFlow(runId, entry)) {
|
|
return;
|
|
}
|
|
resumedRuns.add(runId);
|
|
return;
|
|
}
|
|
|
|
// Wait for completion again after restart.
|
|
const cfg = loadConfig();
|
|
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, entry.runTimeoutSeconds);
|
|
void waitForSubagentCompletion(runId, waitTimeoutMs);
|
|
resumedRuns.add(runId);
|
|
}
|
|
|
|
function restoreSubagentRunsOnce() {
|
|
if (restoreAttempted) {
|
|
return;
|
|
}
|
|
restoreAttempted = true;
|
|
try {
|
|
const restoredCount = restoreSubagentRunsFromDisk({
|
|
runs: subagentRuns,
|
|
mergeOnly: true,
|
|
});
|
|
if (restoredCount === 0) {
|
|
return;
|
|
}
|
|
if (reconcileOrphanedRestoredRuns()) {
|
|
persistSubagentRuns();
|
|
}
|
|
if (subagentRuns.size === 0) {
|
|
return;
|
|
}
|
|
// Resume pending work.
|
|
ensureListener();
|
|
if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
|
|
startSweeper();
|
|
}
|
|
for (const runId of subagentRuns.keys()) {
|
|
resumeSubagentRun(runId);
|
|
}
|
|
} catch {
|
|
// ignore restore failures
|
|
}
|
|
}
|
|
|
|
function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
|
|
const config = cfg ?? loadConfig();
|
|
const minutes = config.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60;
|
|
if (!Number.isFinite(minutes) || minutes <= 0) {
|
|
return undefined;
|
|
}
|
|
return Math.max(1, Math.floor(minutes)) * 60_000;
|
|
}
|
|
|
|
function resolveSubagentWaitTimeoutMs(
|
|
cfg: ReturnType<typeof loadConfig>,
|
|
runTimeoutSeconds?: number,
|
|
) {
|
|
return resolveAgentTimeoutMs({ cfg, overrideSeconds: runTimeoutSeconds ?? 0 });
|
|
}
|
|
|
|
function startSweeper() {
|
|
if (sweeper) {
|
|
return;
|
|
}
|
|
sweeper = setInterval(() => {
|
|
void sweepSubagentRuns();
|
|
}, 60_000);
|
|
sweeper.unref?.();
|
|
}
|
|
|
|
function stopSweeper() {
|
|
if (!sweeper) {
|
|
return;
|
|
}
|
|
clearInterval(sweeper);
|
|
sweeper = null;
|
|
}
|
|
|
|
async function sweepSubagentRuns() {
|
|
const now = Date.now();
|
|
let mutated = false;
|
|
for (const [runId, entry] of subagentRuns.entries()) {
|
|
if (!entry.archiveAtMs || entry.archiveAtMs > now) {
|
|
continue;
|
|
}
|
|
clearPendingLifecycleError(runId);
|
|
void notifyContextEngineSubagentEnded({
|
|
childSessionKey: entry.childSessionKey,
|
|
reason: "swept",
|
|
});
|
|
subagentRuns.delete(runId);
|
|
mutated = true;
|
|
// Archive/purge is terminal for the run record; remove any retained attachments too.
|
|
await safeRemoveAttachmentsDir(entry);
|
|
try {
|
|
await callGateway({
|
|
method: "sessions.delete",
|
|
params: {
|
|
key: entry.childSessionKey,
|
|
deleteTranscript: true,
|
|
emitLifecycleHooks: false,
|
|
},
|
|
timeoutMs: 10_000,
|
|
});
|
|
} catch {
|
|
// ignore
|
|
}
|
|
}
|
|
if (mutated) {
|
|
persistSubagentRuns();
|
|
}
|
|
if (subagentRuns.size === 0) {
|
|
stopSweeper();
|
|
}
|
|
}
|
|
|
|
function ensureListener() {
|
|
if (listenerStarted) {
|
|
return;
|
|
}
|
|
listenerStarted = true;
|
|
listenerStop = onAgentEvent((evt) => {
|
|
void (async () => {
|
|
if (!evt || evt.stream !== "lifecycle") {
|
|
return;
|
|
}
|
|
const phase = evt.data?.phase;
|
|
const entry = subagentRuns.get(evt.runId);
|
|
if (!entry) {
|
|
if (phase === "end" && typeof evt.sessionKey === "string") {
|
|
await refreshFrozenResultFromSession(evt.sessionKey);
|
|
}
|
|
return;
|
|
}
|
|
if (phase === "start") {
|
|
clearPendingLifecycleError(evt.runId);
|
|
const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined;
|
|
if (startedAt) {
|
|
entry.startedAt = startedAt;
|
|
persistSubagentRuns();
|
|
}
|
|
return;
|
|
}
|
|
if (phase !== "end" && phase !== "error") {
|
|
return;
|
|
}
|
|
const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : Date.now();
|
|
const error = typeof evt.data?.error === "string" ? evt.data.error : undefined;
|
|
if (phase === "error") {
|
|
schedulePendingLifecycleError({
|
|
runId: evt.runId,
|
|
endedAt,
|
|
error,
|
|
});
|
|
return;
|
|
}
|
|
clearPendingLifecycleError(evt.runId);
|
|
const outcome: SubagentRunOutcome = evt.data?.aborted
|
|
? { status: "timeout" }
|
|
: { status: "ok" };
|
|
await completeSubagentRun({
|
|
runId: evt.runId,
|
|
endedAt,
|
|
outcome,
|
|
reason: SUBAGENT_ENDED_REASON_COMPLETE,
|
|
sendFarewell: true,
|
|
accountId: entry.requesterOrigin?.accountId,
|
|
triggerCleanup: true,
|
|
});
|
|
})();
|
|
});
|
|
}
|
|
|
|
async function safeRemoveAttachmentsDir(entry: SubagentRunRecord): Promise<void> {
|
|
if (!entry.attachmentsDir || !entry.attachmentsRootDir) {
|
|
return;
|
|
}
|
|
|
|
const resolveReal = async (targetPath: string): Promise<string | null> => {
|
|
try {
|
|
return await fs.realpath(targetPath);
|
|
} catch (err) {
|
|
if ((err as NodeJS.ErrnoException | undefined)?.code === "ENOENT") {
|
|
return null;
|
|
}
|
|
throw err;
|
|
}
|
|
};
|
|
|
|
try {
|
|
const [rootReal, dirReal] = await Promise.all([
|
|
resolveReal(entry.attachmentsRootDir),
|
|
resolveReal(entry.attachmentsDir),
|
|
]);
|
|
if (!dirReal) {
|
|
return;
|
|
}
|
|
|
|
const rootBase = rootReal ?? path.resolve(entry.attachmentsRootDir);
|
|
// dirReal is guaranteed non-null here (early return above handles null case).
|
|
const dirBase = dirReal;
|
|
const rootWithSep = rootBase.endsWith(path.sep) ? rootBase : `${rootBase}${path.sep}`;
|
|
if (!dirBase.startsWith(rootWithSep)) {
|
|
return;
|
|
}
|
|
await fs.rm(dirBase, { recursive: true, force: true });
|
|
} catch {
|
|
// best effort
|
|
}
|
|
}
|
|
|
|
async function finalizeSubagentCleanup(
|
|
runId: string,
|
|
cleanup: "delete" | "keep",
|
|
didAnnounce: boolean,
|
|
) {
|
|
const entry = subagentRuns.get(runId);
|
|
if (!entry) {
|
|
return;
|
|
}
|
|
if (didAnnounce) {
|
|
entry.wakeOnDescendantSettle = undefined;
|
|
entry.fallbackFrozenResultText = undefined;
|
|
entry.fallbackFrozenResultCapturedAt = undefined;
|
|
const completionReason = resolveCleanupCompletionReason(entry);
|
|
await emitCompletionEndedHookIfNeeded(entry, completionReason);
|
|
// Clean up attachments before the run record is removed.
|
|
const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep;
|
|
if (shouldDeleteAttachments) {
|
|
await safeRemoveAttachmentsDir(entry);
|
|
}
|
|
if (cleanup === "delete") {
|
|
entry.frozenResultText = undefined;
|
|
entry.frozenResultCapturedAt = undefined;
|
|
}
|
|
completeCleanupBookkeeping({
|
|
runId,
|
|
entry,
|
|
cleanup,
|
|
completedAt: Date.now(),
|
|
});
|
|
return;
|
|
}
|
|
|
|
const now = Date.now();
|
|
const deferredDecision = resolveDeferredCleanupDecision({
|
|
entry,
|
|
now,
|
|
// Defer until descendants are fully settled, including post-end cleanup.
|
|
activeDescendantRuns: Math.max(0, countPendingDescendantRuns(entry.childSessionKey)),
|
|
announceExpiryMs: ANNOUNCE_EXPIRY_MS,
|
|
announceCompletionHardExpiryMs: ANNOUNCE_COMPLETION_HARD_EXPIRY_MS,
|
|
maxAnnounceRetryCount: MAX_ANNOUNCE_RETRY_COUNT,
|
|
deferDescendantDelayMs: MIN_ANNOUNCE_RETRY_DELAY_MS,
|
|
resolveAnnounceRetryDelayMs,
|
|
});
|
|
|
|
if (deferredDecision.kind === "defer-descendants") {
|
|
entry.lastAnnounceRetryAt = now;
|
|
entry.wakeOnDescendantSettle = true;
|
|
entry.cleanupHandled = false;
|
|
resumedRuns.delete(runId);
|
|
persistSubagentRuns();
|
|
setTimeout(() => {
|
|
resumeSubagentRun(runId);
|
|
}, deferredDecision.delayMs).unref?.();
|
|
return;
|
|
}
|
|
|
|
if (deferredDecision.retryCount != null) {
|
|
entry.announceRetryCount = deferredDecision.retryCount;
|
|
entry.lastAnnounceRetryAt = now;
|
|
}
|
|
|
|
if (deferredDecision.kind === "give-up") {
|
|
entry.wakeOnDescendantSettle = undefined;
|
|
entry.fallbackFrozenResultText = undefined;
|
|
entry.fallbackFrozenResultCapturedAt = undefined;
|
|
const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep;
|
|
if (shouldDeleteAttachments) {
|
|
await safeRemoveAttachmentsDir(entry);
|
|
}
|
|
const completionReason = resolveCleanupCompletionReason(entry);
|
|
await emitCompletionEndedHookIfNeeded(entry, completionReason);
|
|
logAnnounceGiveUp(entry, deferredDecision.reason);
|
|
completeCleanupBookkeeping({
|
|
runId,
|
|
entry,
|
|
cleanup: "keep",
|
|
completedAt: now,
|
|
});
|
|
return;
|
|
}
|
|
|
|
// Keep both cleanup modes retryable after deferred/failed announce.
|
|
// Delete-mode is finalized only after announce succeeds or give-up triggers.
|
|
entry.cleanupHandled = false;
|
|
// Clear the in-flight resume marker so the scheduled retry can run again.
|
|
resumedRuns.delete(runId);
|
|
persistSubagentRuns();
|
|
if (deferredDecision.resumeDelayMs == null) {
|
|
return;
|
|
}
|
|
setTimeout(() => {
|
|
resumeSubagentRun(runId);
|
|
}, deferredDecision.resumeDelayMs).unref?.();
|
|
}
|
|
|
|
async function emitCompletionEndedHookIfNeeded(
|
|
entry: SubagentRunRecord,
|
|
reason: SubagentLifecycleEndedReason,
|
|
) {
|
|
if (
|
|
entry.expectsCompletionMessage === true &&
|
|
shouldEmitEndedHookForRun({
|
|
entry,
|
|
reason,
|
|
})
|
|
) {
|
|
await emitSubagentEndedHookForRun({
|
|
entry,
|
|
reason,
|
|
sendFarewell: true,
|
|
});
|
|
}
|
|
}
|
|
|
|
function completeCleanupBookkeeping(params: {
|
|
runId: string;
|
|
entry: SubagentRunRecord;
|
|
cleanup: "delete" | "keep";
|
|
completedAt: number;
|
|
}) {
|
|
if (params.cleanup === "delete") {
|
|
clearPendingLifecycleError(params.runId);
|
|
void notifyContextEngineSubagentEnded({
|
|
childSessionKey: params.entry.childSessionKey,
|
|
reason: "deleted",
|
|
});
|
|
subagentRuns.delete(params.runId);
|
|
persistSubagentRuns();
|
|
retryDeferredCompletedAnnounces(params.runId);
|
|
return;
|
|
}
|
|
void notifyContextEngineSubagentEnded({
|
|
childSessionKey: params.entry.childSessionKey,
|
|
reason: "completed",
|
|
});
|
|
params.entry.cleanupCompletedAt = params.completedAt;
|
|
persistSubagentRuns();
|
|
retryDeferredCompletedAnnounces(params.runId);
|
|
}
|
|
|
|
function retryDeferredCompletedAnnounces(excludeRunId?: string) {
|
|
const now = Date.now();
|
|
for (const [runId, entry] of subagentRuns.entries()) {
|
|
if (excludeRunId && runId === excludeRunId) {
|
|
continue;
|
|
}
|
|
if (typeof entry.endedAt !== "number") {
|
|
continue;
|
|
}
|
|
if (entry.cleanupCompletedAt || entry.cleanupHandled) {
|
|
continue;
|
|
}
|
|
if (suppressAnnounceForSteerRestart(entry)) {
|
|
continue;
|
|
}
|
|
// Force-expire stale non-completion announces; completion-message flows can
|
|
// stay pending while descendants run for a long time.
|
|
const endedAgo = now - (entry.endedAt ?? now);
|
|
if (entry.expectsCompletionMessage !== true && endedAgo > ANNOUNCE_EXPIRY_MS) {
|
|
logAnnounceGiveUp(entry, "expiry");
|
|
entry.cleanupCompletedAt = now;
|
|
persistSubagentRuns();
|
|
continue;
|
|
}
|
|
resumedRuns.delete(runId);
|
|
resumeSubagentRun(runId);
|
|
}
|
|
}
|
|
|
|
function beginSubagentCleanup(runId: string) {
|
|
const entry = subagentRuns.get(runId);
|
|
if (!entry) {
|
|
return false;
|
|
}
|
|
if (entry.cleanupCompletedAt) {
|
|
return false;
|
|
}
|
|
if (entry.cleanupHandled) {
|
|
return false;
|
|
}
|
|
entry.cleanupHandled = true;
|
|
persistSubagentRuns();
|
|
return true;
|
|
}
|
|
|
|
export function markSubagentRunForSteerRestart(runId: string) {
|
|
const key = runId.trim();
|
|
if (!key) {
|
|
return false;
|
|
}
|
|
const entry = subagentRuns.get(key);
|
|
if (!entry) {
|
|
return false;
|
|
}
|
|
if (entry.suppressAnnounceReason === "steer-restart") {
|
|
return true;
|
|
}
|
|
entry.suppressAnnounceReason = "steer-restart";
|
|
persistSubagentRuns();
|
|
return true;
|
|
}
|
|
|
|
export function clearSubagentRunSteerRestart(runId: string) {
|
|
const key = runId.trim();
|
|
if (!key) {
|
|
return false;
|
|
}
|
|
const entry = subagentRuns.get(key);
|
|
if (!entry) {
|
|
return false;
|
|
}
|
|
if (entry.suppressAnnounceReason !== "steer-restart") {
|
|
return true;
|
|
}
|
|
entry.suppressAnnounceReason = undefined;
|
|
persistSubagentRuns();
|
|
// If the interrupted run already finished while suppression was active, retry
|
|
// cleanup now so completion output is not lost when restart dispatch fails.
|
|
resumedRuns.delete(key);
|
|
if (typeof entry.endedAt === "number" && !entry.cleanupCompletedAt) {
|
|
resumeSubagentRun(key);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
export function replaceSubagentRunAfterSteer(params: {
|
|
previousRunId: string;
|
|
nextRunId: string;
|
|
fallback?: SubagentRunRecord;
|
|
runTimeoutSeconds?: number;
|
|
preserveFrozenResultFallback?: boolean;
|
|
}) {
|
|
const previousRunId = params.previousRunId.trim();
|
|
const nextRunId = params.nextRunId.trim();
|
|
if (!previousRunId || !nextRunId) {
|
|
return false;
|
|
}
|
|
|
|
const previous = subagentRuns.get(previousRunId);
|
|
const source = previous ?? params.fallback;
|
|
if (!source) {
|
|
return false;
|
|
}
|
|
|
|
if (previousRunId !== nextRunId) {
|
|
clearPendingLifecycleError(previousRunId);
|
|
subagentRuns.delete(previousRunId);
|
|
resumedRuns.delete(previousRunId);
|
|
}
|
|
|
|
const now = Date.now();
|
|
const cfg = loadConfig();
|
|
const archiveAfterMs = resolveArchiveAfterMs(cfg);
|
|
const spawnMode = source.spawnMode === "session" ? "session" : "run";
|
|
const archiveAtMs =
|
|
spawnMode === "session" ? undefined : archiveAfterMs ? now + archiveAfterMs : undefined;
|
|
const runTimeoutSeconds = params.runTimeoutSeconds ?? source.runTimeoutSeconds ?? 0;
|
|
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
|
|
const preserveFrozenResultFallback = params.preserveFrozenResultFallback === true;
|
|
|
|
const next: SubagentRunRecord = {
|
|
...source,
|
|
runId: nextRunId,
|
|
startedAt: now,
|
|
endedAt: undefined,
|
|
endedReason: undefined,
|
|
endedHookEmittedAt: undefined,
|
|
wakeOnDescendantSettle: undefined,
|
|
outcome: undefined,
|
|
frozenResultText: undefined,
|
|
frozenResultCapturedAt: undefined,
|
|
fallbackFrozenResultText: preserveFrozenResultFallback ? source.frozenResultText : undefined,
|
|
fallbackFrozenResultCapturedAt: preserveFrozenResultFallback
|
|
? source.frozenResultCapturedAt
|
|
: undefined,
|
|
cleanupCompletedAt: undefined,
|
|
cleanupHandled: false,
|
|
suppressAnnounceReason: undefined,
|
|
announceRetryCount: undefined,
|
|
lastAnnounceRetryAt: undefined,
|
|
spawnMode,
|
|
archiveAtMs,
|
|
runTimeoutSeconds,
|
|
};
|
|
|
|
subagentRuns.set(nextRunId, next);
|
|
ensureListener();
|
|
persistSubagentRuns();
|
|
if (archiveAtMs) {
|
|
startSweeper();
|
|
}
|
|
void waitForSubagentCompletion(nextRunId, waitTimeoutMs);
|
|
return true;
|
|
}
|
|
|
|
export function registerSubagentRun(params: {
|
|
runId: string;
|
|
childSessionKey: string;
|
|
requesterSessionKey: string;
|
|
requesterOrigin?: DeliveryContext;
|
|
requesterDisplayKey: string;
|
|
task: string;
|
|
cleanup: "delete" | "keep";
|
|
label?: string;
|
|
model?: string;
|
|
runTimeoutSeconds?: number;
|
|
expectsCompletionMessage?: boolean;
|
|
spawnMode?: "run" | "session";
|
|
attachmentsDir?: string;
|
|
attachmentsRootDir?: string;
|
|
retainAttachmentsOnKeep?: boolean;
|
|
}) {
|
|
const now = Date.now();
|
|
const cfg = loadConfig();
|
|
const archiveAfterMs = resolveArchiveAfterMs(cfg);
|
|
const spawnMode = params.spawnMode === "session" ? "session" : "run";
|
|
const archiveAtMs =
|
|
spawnMode === "session" ? undefined : archiveAfterMs ? now + archiveAfterMs : undefined;
|
|
const runTimeoutSeconds = params.runTimeoutSeconds ?? 0;
|
|
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
|
|
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
|
|
subagentRuns.set(params.runId, {
|
|
runId: params.runId,
|
|
childSessionKey: params.childSessionKey,
|
|
requesterSessionKey: params.requesterSessionKey,
|
|
requesterOrigin,
|
|
requesterDisplayKey: params.requesterDisplayKey,
|
|
task: params.task,
|
|
cleanup: params.cleanup,
|
|
expectsCompletionMessage: params.expectsCompletionMessage,
|
|
spawnMode,
|
|
label: params.label,
|
|
model: params.model,
|
|
runTimeoutSeconds,
|
|
createdAt: now,
|
|
startedAt: now,
|
|
archiveAtMs,
|
|
cleanupHandled: false,
|
|
wakeOnDescendantSettle: undefined,
|
|
attachmentsDir: params.attachmentsDir,
|
|
attachmentsRootDir: params.attachmentsRootDir,
|
|
retainAttachmentsOnKeep: params.retainAttachmentsOnKeep,
|
|
});
|
|
ensureListener();
|
|
persistSubagentRuns();
|
|
if (archiveAtMs) {
|
|
startSweeper();
|
|
}
|
|
// Wait for subagent completion via gateway RPC (cross-process).
|
|
// The in-process lifecycle listener is a fallback for embedded runs.
|
|
void waitForSubagentCompletion(params.runId, waitTimeoutMs);
|
|
}
|
|
|
|
async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
|
|
try {
|
|
const timeoutMs = Math.max(1, Math.floor(waitTimeoutMs));
|
|
const wait = await callGateway<{
|
|
status?: string;
|
|
startedAt?: number;
|
|
endedAt?: number;
|
|
error?: string;
|
|
}>({
|
|
method: "agent.wait",
|
|
params: {
|
|
runId,
|
|
timeoutMs,
|
|
},
|
|
timeoutMs: timeoutMs + 10_000,
|
|
});
|
|
if (wait?.status !== "ok" && wait?.status !== "error" && wait?.status !== "timeout") {
|
|
return;
|
|
}
|
|
const entry = subagentRuns.get(runId);
|
|
if (!entry) {
|
|
return;
|
|
}
|
|
let mutated = false;
|
|
if (typeof wait.startedAt === "number") {
|
|
entry.startedAt = wait.startedAt;
|
|
mutated = true;
|
|
}
|
|
if (typeof wait.endedAt === "number") {
|
|
entry.endedAt = wait.endedAt;
|
|
mutated = true;
|
|
}
|
|
if (!entry.endedAt) {
|
|
entry.endedAt = Date.now();
|
|
mutated = true;
|
|
}
|
|
const waitError = typeof wait.error === "string" ? wait.error : undefined;
|
|
const outcome: SubagentRunOutcome =
|
|
wait.status === "error"
|
|
? { status: "error", error: waitError }
|
|
: wait.status === "timeout"
|
|
? { status: "timeout" }
|
|
: { status: "ok" };
|
|
if (!runOutcomesEqual(entry.outcome, outcome)) {
|
|
entry.outcome = outcome;
|
|
mutated = true;
|
|
}
|
|
if (mutated) {
|
|
persistSubagentRuns();
|
|
}
|
|
await completeSubagentRun({
|
|
runId,
|
|
endedAt: entry.endedAt,
|
|
outcome,
|
|
reason:
|
|
wait.status === "error" ? SUBAGENT_ENDED_REASON_ERROR : SUBAGENT_ENDED_REASON_COMPLETE,
|
|
sendFarewell: true,
|
|
accountId: entry.requesterOrigin?.accountId,
|
|
triggerCleanup: true,
|
|
});
|
|
} catch {
|
|
// ignore
|
|
}
|
|
}
|
|
|
|
export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
|
|
subagentRuns.clear();
|
|
resumedRuns.clear();
|
|
endedHookInFlightRunIds.clear();
|
|
clearAllPendingLifecycleErrors();
|
|
resetAnnounceQueuesForTests();
|
|
stopSweeper();
|
|
restoreAttempted = false;
|
|
if (listenerStop) {
|
|
listenerStop();
|
|
listenerStop = null;
|
|
}
|
|
listenerStarted = false;
|
|
if (opts?.persist !== false) {
|
|
persistSubagentRuns();
|
|
}
|
|
}
|
|
|
|
export function addSubagentRunForTests(entry: SubagentRunRecord) {
|
|
subagentRuns.set(entry.runId, entry);
|
|
}
|
|
|
|
export function releaseSubagentRun(runId: string) {
|
|
clearPendingLifecycleError(runId);
|
|
const entry = subagentRuns.get(runId);
|
|
if (entry) {
|
|
void notifyContextEngineSubagentEnded({
|
|
childSessionKey: entry.childSessionKey,
|
|
reason: "released",
|
|
});
|
|
}
|
|
const didDelete = subagentRuns.delete(runId);
|
|
if (didDelete) {
|
|
persistSubagentRuns();
|
|
}
|
|
if (subagentRuns.size === 0) {
|
|
stopSweeper();
|
|
}
|
|
}
|
|
|
|
function findRunIdsByChildSessionKey(childSessionKey: string): string[] {
|
|
return findRunIdsByChildSessionKeyFromRuns(subagentRuns, childSessionKey);
|
|
}
|
|
|
|
export function resolveRequesterForChildSession(childSessionKey: string): {
|
|
requesterSessionKey: string;
|
|
requesterOrigin?: DeliveryContext;
|
|
} | null {
|
|
const resolved = resolveRequesterForChildSessionFromRuns(
|
|
getSubagentRunsSnapshotForRead(subagentRuns),
|
|
childSessionKey,
|
|
);
|
|
if (!resolved) {
|
|
return null;
|
|
}
|
|
return {
|
|
requesterSessionKey: resolved.requesterSessionKey,
|
|
requesterOrigin: normalizeDeliveryContext(resolved.requesterOrigin),
|
|
};
|
|
}
|
|
|
|
export function isSubagentSessionRunActive(childSessionKey: string): boolean {
|
|
const runIds = findRunIdsByChildSessionKey(childSessionKey);
|
|
for (const runId of runIds) {
|
|
const entry = subagentRuns.get(runId);
|
|
if (!entry) {
|
|
continue;
|
|
}
|
|
if (typeof entry.endedAt !== "number") {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
export function shouldIgnorePostCompletionAnnounceForSession(childSessionKey: string): boolean {
|
|
return shouldIgnorePostCompletionAnnounceForSessionFromRuns(
|
|
getSubagentRunsSnapshotForRead(subagentRuns),
|
|
childSessionKey,
|
|
);
|
|
}
|
|
|
|
export function markSubagentRunTerminated(params: {
|
|
runId?: string;
|
|
childSessionKey?: string;
|
|
reason?: string;
|
|
}): number {
|
|
const runIds = new Set<string>();
|
|
if (typeof params.runId === "string" && params.runId.trim()) {
|
|
runIds.add(params.runId.trim());
|
|
}
|
|
if (typeof params.childSessionKey === "string" && params.childSessionKey.trim()) {
|
|
for (const runId of findRunIdsByChildSessionKey(params.childSessionKey)) {
|
|
runIds.add(runId);
|
|
}
|
|
}
|
|
if (runIds.size === 0) {
|
|
return 0;
|
|
}
|
|
|
|
const now = Date.now();
|
|
const reason = params.reason?.trim() || "killed";
|
|
let updated = 0;
|
|
const entriesByChildSessionKey = new Map<string, SubagentRunRecord>();
|
|
for (const runId of runIds) {
|
|
clearPendingLifecycleError(runId);
|
|
const entry = subagentRuns.get(runId);
|
|
if (!entry) {
|
|
continue;
|
|
}
|
|
if (typeof entry.endedAt === "number") {
|
|
continue;
|
|
}
|
|
entry.endedAt = now;
|
|
entry.outcome = { status: "error", error: reason };
|
|
entry.endedReason = SUBAGENT_ENDED_REASON_KILLED;
|
|
entry.cleanupHandled = true;
|
|
entry.cleanupCompletedAt = now;
|
|
entry.suppressAnnounceReason = "killed";
|
|
if (!entriesByChildSessionKey.has(entry.childSessionKey)) {
|
|
entriesByChildSessionKey.set(entry.childSessionKey, entry);
|
|
}
|
|
updated += 1;
|
|
}
|
|
if (updated > 0) {
|
|
persistSubagentRuns();
|
|
for (const entry of entriesByChildSessionKey.values()) {
|
|
void emitSubagentEndedHookOnce({
|
|
entry,
|
|
reason: SUBAGENT_ENDED_REASON_KILLED,
|
|
sendFarewell: true,
|
|
outcome: SUBAGENT_ENDED_OUTCOME_KILLED,
|
|
error: reason,
|
|
inFlightRunIds: endedHookInFlightRunIds,
|
|
persist: persistSubagentRuns,
|
|
}).catch(() => {
|
|
// Hook failures should not break termination flow.
|
|
});
|
|
}
|
|
}
|
|
return updated;
|
|
}
|
|
|
|
export function listSubagentRunsForRequester(
|
|
requesterSessionKey: string,
|
|
options?: { requesterRunId?: string },
|
|
): SubagentRunRecord[] {
|
|
return listRunsForRequesterFromRuns(subagentRuns, requesterSessionKey, options);
|
|
}
|
|
|
|
export function countActiveRunsForSession(requesterSessionKey: string): number {
|
|
return countActiveRunsForSessionFromRuns(
|
|
getSubagentRunsSnapshotForRead(subagentRuns),
|
|
requesterSessionKey,
|
|
);
|
|
}
|
|
|
|
export function countActiveDescendantRuns(rootSessionKey: string): number {
|
|
return countActiveDescendantRunsFromRuns(
|
|
getSubagentRunsSnapshotForRead(subagentRuns),
|
|
rootSessionKey,
|
|
);
|
|
}
|
|
|
|
export function countPendingDescendantRuns(rootSessionKey: string): number {
|
|
return countPendingDescendantRunsFromRuns(
|
|
getSubagentRunsSnapshotForRead(subagentRuns),
|
|
rootSessionKey,
|
|
);
|
|
}
|
|
|
|
export function countPendingDescendantRunsExcludingRun(
|
|
rootSessionKey: string,
|
|
excludeRunId: string,
|
|
): number {
|
|
return countPendingDescendantRunsExcludingRunFromRuns(
|
|
getSubagentRunsSnapshotForRead(subagentRuns),
|
|
rootSessionKey,
|
|
excludeRunId,
|
|
);
|
|
}
|
|
|
|
export function listDescendantRunsForRequester(rootSessionKey: string): SubagentRunRecord[] {
|
|
return listDescendantRunsForRequesterFromRuns(
|
|
getSubagentRunsSnapshotForRead(subagentRuns),
|
|
rootSessionKey,
|
|
);
|
|
}
|
|
|
|
export function initSubagentRegistry() {
|
|
restoreSubagentRunsOnce();
|
|
}
|