mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-29 10:50:58 +00:00
refactor: split subagent registry lifecycle
This commit is contained in:
499
src/agents/subagent-registry-lifecycle.ts
Normal file
499
src/agents/subagent-registry-lifecycle.ts
Normal file
@@ -0,0 +1,499 @@
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import {
|
||||
captureSubagentCompletionReply,
|
||||
runSubagentAnnounceFlow,
|
||||
type SubagentRunOutcome,
|
||||
} from "./subagent-announce.js";
|
||||
import {
|
||||
SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
type SubagentLifecycleEndedReason,
|
||||
} from "./subagent-lifecycle-events.js";
|
||||
import {
|
||||
resolveCleanupCompletionReason,
|
||||
resolveDeferredCleanupDecision,
|
||||
} from "./subagent-registry-cleanup.js";
|
||||
import { runOutcomesEqual } from "./subagent-registry-completion.js";
|
||||
import {
|
||||
ANNOUNCE_COMPLETION_HARD_EXPIRY_MS,
|
||||
ANNOUNCE_EXPIRY_MS,
|
||||
capFrozenResultText,
|
||||
logAnnounceGiveUp,
|
||||
MAX_ANNOUNCE_RETRY_COUNT,
|
||||
MIN_ANNOUNCE_RETRY_DELAY_MS,
|
||||
persistSubagentSessionTiming,
|
||||
resolveAnnounceRetryDelayMs,
|
||||
safeRemoveAttachmentsDir,
|
||||
} from "./subagent-registry-helpers.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
|
||||
export function createSubagentRegistryLifecycleController(params: {
|
||||
runs: Map<string, SubagentRunRecord>;
|
||||
resumedRuns: Set<string>;
|
||||
subagentAnnounceTimeoutMs: number;
|
||||
persist(): void;
|
||||
clearPendingLifecycleError(runId: string): void;
|
||||
countPendingDescendantRuns(rootSessionKey: string): number;
|
||||
suppressAnnounceForSteerRestart(entry?: SubagentRunRecord): boolean;
|
||||
shouldEmitEndedHookForRun(args: {
|
||||
entry: SubagentRunRecord;
|
||||
reason: SubagentLifecycleEndedReason;
|
||||
}): boolean;
|
||||
emitSubagentEndedHookForRun(args: {
|
||||
entry: SubagentRunRecord;
|
||||
reason?: SubagentLifecycleEndedReason;
|
||||
sendFarewell?: boolean;
|
||||
accountId?: string;
|
||||
}): Promise<void>;
|
||||
notifyContextEngineSubagentEnded(args: {
|
||||
childSessionKey: string;
|
||||
reason: "completed" | "deleted";
|
||||
workspaceDir?: string;
|
||||
}): Promise<void>;
|
||||
resumeSubagentRun(runId: string): void;
|
||||
warn(message: string, meta?: Record<string, unknown>): void;
|
||||
}) {
|
||||
/**
 * Captures the child session's completion reply once and stores it on the run.
 *
 * A run whose `frozenResultText` is already set (including `null`) is left
 * untouched. A blank reply or a failed capture is recorded as `null`.
 *
 * @returns `true` when the entry was updated, `false` when it was already frozen.
 */
const freezeRunResultAtCompletion = async (entry: SubagentRunRecord): Promise<boolean> => {
  const alreadyFrozen = entry.frozenResultText !== undefined;
  if (alreadyFrozen) {
    return false;
  }

  let frozen: SubagentRunRecord["frozenResultText"] = null;
  try {
    const reply = await captureSubagentCompletionReply(entry.childSessionKey);
    if (reply?.trim()) {
      frozen = capFrozenResultText(reply);
    }
  } catch {
    // Best effort: a failed capture still freezes the run, with no text.
    frozen = null;
  }

  entry.frozenResultText = frozen;
  entry.frozenResultCapturedAt = Date.now();
  return true;
};
|
||||
|
||||
/**
 * Lists the runs of a child session that have ended, expect a completion
 * message, and have not yet finished cleanup.
 *
 * @param sessionKey Child session key; surrounding whitespace is ignored.
 * @returns Matching run records, or an empty array for a blank key.
 */
const listPendingCompletionRunsForSession = (sessionKey: string): SubagentRunRecord[] => {
  const normalizedKey = sessionKey.trim();
  if (normalizedKey.length === 0) {
    return [];
  }
  return Array.from(params.runs.values()).filter(
    (run) =>
      run.childSessionKey === normalizedKey &&
      run.expectsCompletionMessage === true &&
      typeof run.endedAt === "number" &&
      typeof run.cleanupCompletedAt !== "number",
  );
};
|
||||
|
||||
/**
 * Re-captures the completion reply of a child session and writes it onto every
 * run of that session that is still pending completion.
 *
 * Nothing changes when there is no pending run, the capture fails, or the
 * reply is blank or a silent-reply token.
 *
 * @returns `true` when at least one run received new frozen text (and the
 * registry was persisted), otherwise `false`.
 */
const refreshFrozenResultFromSession = async (sessionKey: string): Promise<boolean> => {
  const pendingRuns = listPendingCompletionRunsForSession(sessionKey);
  if (pendingRuns.length === 0) {
    return false;
  }

  let reply: string | undefined;
  try {
    reply = await captureSubagentCompletionReply(sessionKey);
  } catch {
    return false;
  }

  const text = reply?.trim();
  if (!text) {
    return false;
  }
  if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) {
    return false;
  }

  const frozenText = capFrozenResultText(text);
  const stamp = Date.now();
  const staleRuns = pendingRuns.filter((run) => run.frozenResultText !== frozenText);
  for (const run of staleRuns) {
    run.frozenResultText = frozenText;
    run.frozenResultCapturedAt = stamp;
  }

  if (staleRuns.length === 0) {
    return false;
  }
  params.persist();
  return true;
};
|
||||
|
||||
/**
 * Emits the subagent-ended hook (with farewell) for runs that expect a
 * completion message, provided `shouldEmitEndedHookForRun` allows it.
 */
const emitCompletionEndedHookIfNeeded = async (
  entry: SubagentRunRecord,
  reason: SubagentLifecycleEndedReason,
) => {
  if (entry.expectsCompletionMessage !== true) {
    return;
  }
  const hookArgs = { entry, reason };
  if (!params.shouldEmitEndedHookForRun(hookArgs)) {
    return;
  }
  await params.emitSubagentEndedHookForRun({ ...hookArgs, sendFarewell: true });
};
|
||||
|
||||
/**
 * Finishes cleanup for a run whose announce is being abandoned.
 *
 * Clears the wake/fallback state, removes attachments unless the run keeps
 * them, emits the completion ended hook if needed, logs the give-up and then
 * completes cleanup bookkeeping using the run's own cleanup mode.
 */
const finalizeResumedAnnounceGiveUp = async (giveUpParams: {
  runId: string;
  entry: SubagentRunRecord;
  reason: "retry-limit" | "expiry";
}) => {
  const { runId, entry, reason } = giveUpParams;

  entry.wakeOnDescendantSettle = undefined;
  entry.fallbackFrozenResultText = undefined;
  entry.fallbackFrozenResultCapturedAt = undefined;

  // Attachments survive only in keep-mode with retention explicitly enabled.
  const keepAttachments = entry.cleanup !== "delete" && Boolean(entry.retainAttachmentsOnKeep);
  if (!keepAttachments) {
    await safeRemoveAttachmentsDir(entry);
  }

  await emitCompletionEndedHookIfNeeded(entry, resolveCleanupCompletionReason(entry));
  logAnnounceGiveUp(entry, reason);
  completeCleanupBookkeeping({
    runId,
    entry,
    cleanup: entry.cleanup,
    completedAt: Date.now(),
  });
};
|
||||
|
||||
/**
 * Claims cleanup for a run by setting `cleanupHandled` and persisting.
 *
 * @returns `false` when the run is unknown, already cleaned up, or already
 * claimed; `true` when this call took the claim.
 */
const beginSubagentCleanup = (runId: string) => {
  const run = params.runs.get(runId);
  if (!run || run.cleanupCompletedAt || run.cleanupHandled) {
    return false;
  }
  run.cleanupHandled = true;
  params.persist();
  return true;
};
|
||||
|
||||
/**
 * Sweeps all ended runs whose cleanup is neither claimed nor completed.
 *
 * Runs that do not expect a completion message and ended more than
 * `ANNOUNCE_EXPIRY_MS` ago are finalized as an "expiry" give-up; every other
 * eligible run is resumed. Runs suppressed for a steer restart are skipped.
 *
 * @param excludeRunId Run to leave out of the sweep.
 */
const retryDeferredCompletedAnnounces = (excludeRunId?: string) => {
  const sweepStartedAt = Date.now();

  // Release the cleanup claim again when an expiry finalize fails.
  const releaseClaimAfterFailure = (runId: string, error: unknown) => {
    defaultRuntime.log(
      `[warn] Subagent expiry finalize failed during deferred retry for run ${runId}: ${String(error)}`,
    );
    const latest = params.runs.get(runId);
    if (latest && !latest.cleanupCompletedAt) {
      latest.cleanupHandled = false;
      params.persist();
    }
  };

  for (const [runId, entry] of params.runs) {
    if (excludeRunId && runId === excludeRunId) {
      continue;
    }
    if (typeof entry.endedAt !== "number") {
      continue;
    }
    if (entry.cleanupCompletedAt || entry.cleanupHandled) {
      continue;
    }
    if (params.suppressAnnounceForSteerRestart(entry)) {
      continue;
    }

    const endedAgo = sweepStartedAt - (entry.endedAt ?? sweepStartedAt);
    const expired = entry.expectsCompletionMessage !== true && endedAgo > ANNOUNCE_EXPIRY_MS;
    if (!expired) {
      params.resumedRuns.delete(runId);
      params.resumeSubagentRun(runId);
      continue;
    }

    if (beginSubagentCleanup(runId)) {
      void finalizeResumedAnnounceGiveUp({ runId, entry, reason: "expiry" }).catch((error) => {
        releaseClaimAfterFailure(runId, error);
      });
    }
  }
};
|
||||
|
||||
/**
 * Records the end of cleanup for a run and kicks off a retry sweep.
 *
 * - `cleanup === "delete"`: clears the pending lifecycle error, notifies the
 *   context engine with reason "deleted" and removes the run from the registry.
 * - `cleanup === "keep"`: notifies the context engine with reason "completed"
 *   and stamps `cleanupCompletedAt` on the run.
 *
 * In both modes the registry is persisted and
 * `retryDeferredCompletedAnnounces` runs for all other runs afterwards.
 *
 * The context-engine notification is fire-and-forget. Its rejection is logged
 * through `params.warn` instead of being left unhandled, matching the other
 * fire-and-forget calls in this controller.
 */
const completeCleanupBookkeeping = (cleanupParams: {
  runId: string;
  entry: SubagentRunRecord;
  cleanup: "delete" | "keep";
  completedAt: number;
}) => {
  const { runId, entry, cleanup, completedAt } = cleanupParams;
  const deleting = cleanup === "delete";

  if (deleting) {
    params.clearPendingLifecycleError(runId);
  }

  // Promise.resolve tolerates an injected implementation that returns a
  // non-promise value, which the previous `void` call also tolerated.
  void Promise.resolve(
    params.notifyContextEngineSubagentEnded({
      childSessionKey: entry.childSessionKey,
      reason: deleting ? "deleted" : "completed",
      workspaceDir: entry.workspaceDir,
    }),
  ).catch((err) => {
    params.warn("failed to notify context engine that subagent ended", {
      err,
      runId,
      childSessionKey: entry.childSessionKey,
    });
  });

  if (deleting) {
    params.runs.delete(runId);
  } else {
    entry.cleanupCompletedAt = completedAt;
  }
  params.persist();
  retryDeferredCompletedAnnounces(runId);
};
|
||||
|
||||
/**
 * Finishes (or reschedules) cleanup for a run after an announce attempt.
 *
 * When the announce succeeded the run is finalized immediately. Otherwise
 * `resolveDeferredCleanupDecision` decides between waiting for descendants,
 * giving up, or releasing the cleanup claim so a later retry can run.
 *
 * @param runId Run to finalize; unknown ids are ignored.
 * @param cleanup Cleanup mode to apply on finalization.
 * @param didAnnounce Whether the announce flow reported success.
 */
const finalizeSubagentCleanup = async (
  runId: string,
  cleanup: "delete" | "keep",
  didAnnounce: boolean,
) => {
  const entry = params.runs.get(runId);
  if (!entry) {
    return;
  }

  if (didAnnounce) {
    entry.wakeOnDescendantSettle = undefined;
    entry.fallbackFrozenResultText = undefined;
    entry.fallbackFrozenResultCapturedAt = undefined;
    await emitCompletionEndedHookIfNeeded(entry, resolveCleanupCompletionReason(entry));
    // Clean up attachments before the run record is removed.
    if (cleanup === "delete" || !entry.retainAttachmentsOnKeep) {
      await safeRemoveAttachmentsDir(entry);
    }
    if (cleanup === "delete") {
      entry.frozenResultText = undefined;
      entry.frozenResultCapturedAt = undefined;
    }
    completeCleanupBookkeeping({ runId, entry, cleanup, completedAt: Date.now() });
    return;
  }

  const decidedAt = Date.now();
  // Defer until descendants are fully settled, including post-end cleanup.
  const pendingDescendants = params.countPendingDescendantRuns(entry.childSessionKey);
  const decision = resolveDeferredCleanupDecision({
    entry,
    now: decidedAt,
    activeDescendantRuns: Math.max(0, pendingDescendants),
    announceExpiryMs: ANNOUNCE_EXPIRY_MS,
    announceCompletionHardExpiryMs: ANNOUNCE_COMPLETION_HARD_EXPIRY_MS,
    maxAnnounceRetryCount: MAX_ANNOUNCE_RETRY_COUNT,
    deferDescendantDelayMs: MIN_ANNOUNCE_RETRY_DELAY_MS,
    resolveAnnounceRetryDelayMs,
  });

  if (decision.kind === "defer-descendants") {
    entry.lastAnnounceRetryAt = decidedAt;
    entry.wakeOnDescendantSettle = true;
    entry.cleanupHandled = false;
    params.resumedRuns.delete(runId);
    params.persist();
    setTimeout(() => {
      params.resumeSubagentRun(runId);
    }, decision.delayMs).unref?.();
    return;
  }

  if (decision.retryCount != null) {
    entry.announceRetryCount = decision.retryCount;
    entry.lastAnnounceRetryAt = decidedAt;
  }

  if (decision.kind === "give-up") {
    entry.wakeOnDescendantSettle = undefined;
    entry.fallbackFrozenResultText = undefined;
    entry.fallbackFrozenResultCapturedAt = undefined;
    if (cleanup === "delete" || !entry.retainAttachmentsOnKeep) {
      await safeRemoveAttachmentsDir(entry);
    }
    await emitCompletionEndedHookIfNeeded(entry, resolveCleanupCompletionReason(entry));
    logAnnounceGiveUp(entry, decision.reason);
    completeCleanupBookkeeping({ runId, entry, cleanup, completedAt: decidedAt });
    return;
  }

  // Keep both cleanup modes retryable after a deferred or failed announce.
  // Delete-mode is finalized only after announce succeeds or give-up triggers.
  entry.cleanupHandled = false;
  // Clear the in-flight resume marker so the scheduled retry can run again.
  params.resumedRuns.delete(runId);
  params.persist();
  if (decision.resumeDelayMs != null) {
    setTimeout(() => {
      params.resumeSubagentRun(runId);
    }, decision.resumeDelayMs).unref?.();
  }
};
|
||||
|
||||
/**
 * Claims cleanup for a run and starts the announce flow in the background.
 *
 * The announce result (or `false` when the flow rejects) is handed to
 * `finalizeSubagentCleanup`. If finalization itself fails, the cleanup claim
 * is released so the run can be retried.
 *
 * @returns `false` when the cleanup claim could not be taken, otherwise `true`.
 */
const startSubagentAnnounceCleanupFlow = (runId: string, entry: SubagentRunRecord): boolean => {
  const claimed = beginSubagentCleanup(runId);
  if (!claimed) {
    return false;
  }
  const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);

  const releaseClaimAfterFailure = (err: unknown) => {
    defaultRuntime.log(`[warn] subagent cleanup finalize failed (${runId}): ${String(err)}`);
    const latest = params.runs.get(runId);
    if (latest && !latest.cleanupCompletedAt) {
      latest.cleanupHandled = false;
      params.persist();
    }
  };
  const finalizeAnnounceCleanup = (didAnnounce: boolean) => {
    void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce).catch(releaseClaimAfterFailure);
  };

  void runSubagentAnnounceFlow({
    childSessionKey: entry.childSessionKey,
    childRunId: entry.runId,
    requesterSessionKey: entry.requesterSessionKey,
    requesterOrigin,
    requesterDisplayKey: entry.requesterDisplayKey,
    task: entry.task,
    timeoutMs: params.subagentAnnounceTimeoutMs,
    cleanup: entry.cleanup,
    roundOneReply: entry.frozenResultText ?? undefined,
    fallbackReply: entry.fallbackFrozenResultText ?? undefined,
    waitForCompletion: false,
    startedAt: entry.startedAt,
    endedAt: entry.endedAt,
    label: entry.label,
    outcome: entry.outcome,
    spawnMode: entry.spawnMode,
    expectsCompletionMessage: entry.expectsCompletionMessage,
    wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
  })
    .then((didAnnounce) => {
      finalizeAnnounceCleanup(didAnnounce);
    })
    .catch((error) => {
      defaultRuntime.log(
        `[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`,
      );
      finalizeAnnounceCleanup(false);
    });

  return true;
};
|
||||
|
||||
/**
 * Records the end of a subagent run and triggers the follow-up work.
 *
 * Steps, in order: clear the pending lifecycle error, update the run's
 * end state (`endedAt`, `outcome`, `endedReason`, frozen result), persist if
 * anything changed, persist session timing, emit a session lifecycle event,
 * emit the ended hook (unless it is deferred to cleanup) and finally start
 * the announce/cleanup flow when `triggerCleanup` is set.
 *
 * Unknown run ids are ignored after the lifecycle error is cleared.
 */
const completeSubagentRun = async (completeParams: {
  runId: string;
  endedAt?: number;
  outcome: SubagentRunOutcome;
  reason: SubagentLifecycleEndedReason;
  sendFarewell?: boolean;
  accountId?: string;
  triggerCleanup: boolean;
}) => {
  const { runId, outcome, reason, triggerCleanup } = completeParams;

  params.clearPendingLifecycleError(runId);
  const entry = params.runs.get(runId);
  if (!entry) {
    return;
  }

  let mutated = false;

  // If a late lifecycle completion arrives after an earlier kill marker, allow
  // completion cleanup/announce to run instead of staying permanently suppressed.
  if (
    reason === SUBAGENT_ENDED_REASON_COMPLETE &&
    entry.suppressAnnounceReason === "killed" &&
    (entry.cleanupHandled || typeof entry.cleanupCompletedAt === "number")
  ) {
    entry.suppressAnnounceReason = undefined;
    entry.cleanupHandled = false;
    entry.cleanupCompletedAt = undefined;
    mutated = true;
  }

  const endedAt =
    typeof completeParams.endedAt === "number" ? completeParams.endedAt : Date.now();
  if (entry.endedAt !== endedAt) {
    entry.endedAt = endedAt;
    mutated = true;
  }
  if (!runOutcomesEqual(entry.outcome, outcome)) {
    entry.outcome = outcome;
    mutated = true;
  }
  if (entry.endedReason !== reason) {
    entry.endedReason = reason;
    mutated = true;
  }

  const frozeResult = await freezeRunResultAtCompletion(entry);
  mutated = mutated || frozeResult;

  if (mutated) {
    params.persist();
  }

  try {
    await persistSubagentSessionTiming(entry);
  } catch (err) {
    params.warn("failed to persist subagent session timing", {
      err,
      runId: entry.runId,
      childSessionKey: entry.childSessionKey,
    });
  }

  const suppressedForSteerRestart = params.suppressAnnounceForSteerRestart(entry);
  if (mutated && !suppressedForSteerRestart) {
    // Subscribers may have seen the raw lifecycle `end` event before this
    // registry recorded endedAt/outcome. Emit a follow-up change after the
    // registry update so they receive the authoritative completed status.
    emitSessionLifecycleEvent({
      sessionKey: entry.childSessionKey,
      reason: "subagent-status",
      parentSessionKey: entry.requesterSessionKey,
      label: entry.label,
    });
  }

  const shouldEmitEndedHook =
    !suppressedForSteerRestart && params.shouldEmitEndedHookForRun({ entry, reason });
  // When cleanup runs for a run that expects a completion message, the hook
  // is left to the cleanup path.
  const deferredToCleanup = triggerCleanup && entry.expectsCompletionMessage === true;
  if (shouldEmitEndedHook && !deferredToCleanup) {
    await params.emitSubagentEndedHookForRun({
      entry,
      reason,
      sendFarewell: completeParams.sendFarewell,
      accountId: completeParams.accountId,
    });
  }

  if (triggerCleanup && !suppressedForSteerRestart) {
    startSubagentAnnounceCleanupFlow(runId, entry);
  }
};
|
||||
|
||||
return {
|
||||
completeCleanupBookkeeping,
|
||||
completeSubagentRun,
|
||||
finalizeResumedAnnounceGiveUp,
|
||||
refreshFrozenResultFromSession,
|
||||
startSubagentAnnounceCleanupFlow,
|
||||
};
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { ensureContextEnginesInitialized } from "../context-engine/init.js";
|
||||
import { resolveContextEngine } from "../context-engine/registry.js";
|
||||
@@ -6,41 +5,23 @@ import type { SubagentEndReason } from "../context-engine/types.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import { ensureRuntimePluginsLoaded } from "./runtime-plugins.js";
|
||||
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
|
||||
import {
|
||||
captureSubagentCompletionReply,
|
||||
runSubagentAnnounceFlow,
|
||||
type SubagentRunOutcome,
|
||||
} from "./subagent-announce.js";
|
||||
import type { SubagentRunOutcome } from "./subagent-announce.js";
|
||||
import {
|
||||
SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
SUBAGENT_ENDED_REASON_ERROR,
|
||||
SUBAGENT_ENDED_REASON_KILLED,
|
||||
type SubagentLifecycleEndedReason,
|
||||
} from "./subagent-lifecycle-events.js";
|
||||
import {
|
||||
resolveCleanupCompletionReason,
|
||||
resolveDeferredCleanupDecision,
|
||||
} from "./subagent-registry-cleanup.js";
|
||||
import {
|
||||
emitSubagentEndedHookOnce,
|
||||
resolveLifecycleOutcomeFromRunOutcome,
|
||||
runOutcomesEqual,
|
||||
} from "./subagent-registry-completion.js";
|
||||
import {
|
||||
ANNOUNCE_COMPLETION_HARD_EXPIRY_MS,
|
||||
ANNOUNCE_EXPIRY_MS,
|
||||
capFrozenResultText,
|
||||
getSubagentSessionRuntimeMs,
|
||||
getSubagentSessionStartedAt,
|
||||
logAnnounceGiveUp,
|
||||
MAX_ANNOUNCE_RETRY_COUNT,
|
||||
MIN_ANNOUNCE_RETRY_DELAY_MS,
|
||||
persistSubagentSessionTiming,
|
||||
reconcileOrphanedRestoredRuns,
|
||||
reconcileOrphanedRun,
|
||||
resolveAnnounceRetryDelayMs,
|
||||
@@ -48,6 +29,7 @@ import {
|
||||
resolveSubagentSessionStatus,
|
||||
safeRemoveAttachmentsDir,
|
||||
} from "./subagent-registry-helpers.js";
|
||||
import { createSubagentRegistryLifecycleController } from "./subagent-registry-lifecycle.js";
|
||||
import { subagentRuns } from "./subagent-registry-memory.js";
|
||||
import {
|
||||
countActiveDescendantRunsFromRuns,
|
||||
@@ -229,233 +211,28 @@ async function emitSubagentEndedHookForRun(params: {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Captures the child session's completion reply once and stores it on the run.
 * A blank reply or a failed capture is recorded as `null`.
 *
 * @returns `true` when the entry was updated, `false` when it was already frozen.
 */
async function freezeRunResultAtCompletion(entry: SubagentRunRecord): Promise<boolean> {
  const alreadyFrozen = entry.frozenResultText !== undefined;
  if (alreadyFrozen) {
    return false;
  }

  let frozen: SubagentRunRecord["frozenResultText"] = null;
  try {
    const reply = await captureSubagentCompletionReply(entry.childSessionKey);
    if (reply?.trim()) {
      frozen = capFrozenResultText(reply);
    }
  } catch {
    // Best effort: a failed capture still freezes the run, with no text.
    frozen = null;
  }

  entry.frozenResultText = frozen;
  entry.frozenResultCapturedAt = Date.now();
  return true;
}
|
||||
// Lifecycle controller wired to this module's registry state and helpers.
const subagentLifecycleController = createSubagentRegistryLifecycleController({
  // Shared state and configuration.
  runs: subagentRuns,
  resumedRuns,
  subagentAnnounceTimeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,

  // Callbacks supplied by this module.
  persist: persistSubagentRuns,
  clearPendingLifecycleError,
  countPendingDescendantRuns,
  suppressAnnounceForSteerRestart,
  shouldEmitEndedHookForRun,
  emitSubagentEndedHookForRun,
  notifyContextEngineSubagentEnded,
  resumeSubagentRun,
  warn: (message, meta) => log.warn(message, meta),
});
|
||||
|
||||
/**
 * Lists the runs of a child session that have ended, expect a completion
 * message, and have not yet finished cleanup.
 *
 * @param sessionKey Child session key; surrounding whitespace is ignored.
 * @returns Matching run records, or an empty array for a blank key.
 */
function listPendingCompletionRunsForSession(sessionKey: string): SubagentRunRecord[] {
  const normalizedKey = sessionKey.trim();
  if (normalizedKey.length === 0) {
    return [];
  }
  return Array.from(subagentRuns.values()).filter(
    (run) =>
      run.childSessionKey === normalizedKey &&
      run.expectsCompletionMessage === true &&
      typeof run.endedAt === "number" &&
      typeof run.cleanupCompletedAt !== "number",
  );
}
|
||||
|
||||
/**
 * Re-captures the completion reply of a child session and writes it onto every
 * run of that session that is still pending completion.
 *
 * Nothing changes when there is no pending run, the capture fails, or the
 * reply is blank or a silent-reply token.
 *
 * @returns `true` when at least one run received new frozen text (and the
 * registry was persisted), otherwise `false`.
 */
async function refreshFrozenResultFromSession(sessionKey: string): Promise<boolean> {
  const pendingRuns = listPendingCompletionRunsForSession(sessionKey);
  if (pendingRuns.length === 0) {
    return false;
  }

  let reply: string | undefined;
  try {
    reply = await captureSubagentCompletionReply(sessionKey);
  } catch {
    return false;
  }

  const text = reply?.trim();
  if (!text) {
    return false;
  }
  if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) {
    return false;
  }

  const frozenText = capFrozenResultText(text);
  const stamp = Date.now();
  const staleRuns = pendingRuns.filter((run) => run.frozenResultText !== frozenText);
  for (const run of staleRuns) {
    run.frozenResultText = frozenText;
    run.frozenResultCapturedAt = stamp;
  }

  if (staleRuns.length === 0) {
    return false;
  }
  persistSubagentRuns();
  return true;
}
|
||||
|
||||
/**
 * Records the end of a subagent run and triggers the follow-up work.
 *
 * Steps, in order: clear the pending lifecycle error, update the run's end
 * state, persist if anything changed, persist session timing, emit a session
 * lifecycle event, emit the ended hook (unless it is deferred to cleanup) and
 * finally start the announce/cleanup flow when `triggerCleanup` is set.
 */
async function completeSubagentRun(params: {
  runId: string;
  endedAt?: number;
  outcome: SubagentRunOutcome;
  reason: SubagentLifecycleEndedReason;
  sendFarewell?: boolean;
  accountId?: string;
  triggerCleanup: boolean;
}) {
  const { runId, outcome, reason, triggerCleanup } = params;

  clearPendingLifecycleError(runId);
  const entry = subagentRuns.get(runId);
  if (!entry) {
    return;
  }

  let mutated = false;

  // A late lifecycle completion that arrives after an earlier kill marker must
  // still be allowed to run completion cleanup/announce, rather than staying
  // permanently suppressed.
  if (
    reason === SUBAGENT_ENDED_REASON_COMPLETE &&
    entry.suppressAnnounceReason === "killed" &&
    (entry.cleanupHandled || typeof entry.cleanupCompletedAt === "number")
  ) {
    entry.suppressAnnounceReason = undefined;
    entry.cleanupHandled = false;
    entry.cleanupCompletedAt = undefined;
    mutated = true;
  }

  const endedAt = typeof params.endedAt === "number" ? params.endedAt : Date.now();
  if (entry.endedAt !== endedAt) {
    entry.endedAt = endedAt;
    mutated = true;
  }
  if (!runOutcomesEqual(entry.outcome, outcome)) {
    entry.outcome = outcome;
    mutated = true;
  }
  if (entry.endedReason !== reason) {
    entry.endedReason = reason;
    mutated = true;
  }

  const frozeResult = await freezeRunResultAtCompletion(entry);
  mutated = mutated || frozeResult;

  if (mutated) {
    persistSubagentRuns();
  }

  try {
    await persistSubagentSessionTiming(entry);
  } catch (err) {
    log.warn("failed to persist subagent session timing", {
      err,
      runId: entry.runId,
      childSessionKey: entry.childSessionKey,
    });
  }

  const suppressedForSteerRestart = suppressAnnounceForSteerRestart(entry);
  if (mutated && !suppressedForSteerRestart) {
    // The gateway also emits sessions.changed straight from raw lifecycle
    // events, but the visible status of a subagent session comes from this
    // registry. When a restarted follow-up run ends, the raw `end` event can
    // reach websocket subscribers before endedAt/outcome are recorded here,
    // leaving the dashboard on a stale "running" snapshot. Emitting a
    // follow-up change after the registry update delivers the authoritative
    // completed status.
    emitSessionLifecycleEvent({
      sessionKey: entry.childSessionKey,
      reason: "subagent-status",
      parentSessionKey: entry.requesterSessionKey,
      label: entry.label,
    });
  }

  const shouldEmitEndedHook =
    !suppressedForSteerRestart && shouldEmitEndedHookForRun({ entry, reason });
  // When cleanup runs for a run that expects a completion message, the hook
  // is left to the cleanup path.
  const deferredToCleanup = triggerCleanup && entry.expectsCompletionMessage === true;
  if (shouldEmitEndedHook && !deferredToCleanup) {
    await emitSubagentEndedHookForRun({
      entry,
      reason,
      sendFarewell: params.sendFarewell,
      accountId: params.accountId,
    });
  }

  if (triggerCleanup && !suppressedForSteerRestart) {
    startSubagentAnnounceCleanupFlow(runId, entry);
  }
}
|
||||
|
||||
/**
 * Claims cleanup for a run and starts the announce flow in the background.
 *
 * The announce result (or `false` when the flow rejects) is handed to
 * `finalizeSubagentCleanup`. If finalization itself fails, the cleanup claim
 * is released so the run can be retried.
 *
 * @returns `false` when the cleanup claim could not be taken, otherwise `true`.
 */
function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecord): boolean {
  const claimed = beginSubagentCleanup(runId);
  if (!claimed) {
    return false;
  }
  const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);

  const releaseClaimAfterFailure = (err: unknown) => {
    defaultRuntime.log(`[warn] subagent cleanup finalize failed (${runId}): ${String(err)}`);
    const latest = subagentRuns.get(runId);
    if (latest && !latest.cleanupCompletedAt) {
      latest.cleanupHandled = false;
      persistSubagentRuns();
    }
  };
  const finalizeAnnounceCleanup = (didAnnounce: boolean) => {
    void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce).catch(releaseClaimAfterFailure);
  };

  void runSubagentAnnounceFlow({
    childSessionKey: entry.childSessionKey,
    childRunId: entry.runId,
    requesterSessionKey: entry.requesterSessionKey,
    requesterOrigin,
    requesterDisplayKey: entry.requesterDisplayKey,
    task: entry.task,
    timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
    cleanup: entry.cleanup,
    roundOneReply: entry.frozenResultText ?? undefined,
    fallbackReply: entry.fallbackFrozenResultText ?? undefined,
    waitForCompletion: false,
    startedAt: entry.startedAt,
    endedAt: entry.endedAt,
    label: entry.label,
    outcome: entry.outcome,
    spawnMode: entry.spawnMode,
    expectsCompletionMessage: entry.expectsCompletionMessage,
    wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
  })
    .then((didAnnounce) => {
      finalizeAnnounceCleanup(didAnnounce);
    })
    .catch((error) => {
      defaultRuntime.log(
        `[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`,
      );
      finalizeAnnounceCleanup(false);
    });

  return true;
}
|
||||
// Re-expose the controller's operations under their module-local names.
const {
  completeSubagentRun,
  startSubagentAnnounceCleanupFlow,
  refreshFrozenResultFromSession,
  finalizeResumedAnnounceGiveUp,
  completeCleanupBookkeeping,
} = subagentLifecycleController;
|
||||
|
||||
function resumeSubagentRun(runId: string) {
|
||||
if (!runId || resumedRuns.has(runId)) {
|
||||
@@ -716,174 +493,6 @@ function ensureListener() {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Finishes (or reschedules) cleanup for a run after an announce attempt.
 *
 * When the announce succeeded the run is finalized immediately. Otherwise
 * `resolveDeferredCleanupDecision` decides between waiting for descendants,
 * giving up, or releasing the cleanup claim so a later retry can run.
 *
 * @param runId Run to finalize; unknown ids are ignored.
 * @param cleanup Cleanup mode to apply on finalization.
 * @param didAnnounce Whether the announce flow reported success.
 */
async function finalizeSubagentCleanup(
  runId: string,
  cleanup: "delete" | "keep",
  didAnnounce: boolean,
) {
  const entry = subagentRuns.get(runId);
  if (!entry) {
    return;
  }

  if (didAnnounce) {
    entry.wakeOnDescendantSettle = undefined;
    entry.fallbackFrozenResultText = undefined;
    entry.fallbackFrozenResultCapturedAt = undefined;
    await emitCompletionEndedHookIfNeeded(entry, resolveCleanupCompletionReason(entry));
    // Attachments must go before the run record itself is removed.
    if (cleanup === "delete" || !entry.retainAttachmentsOnKeep) {
      await safeRemoveAttachmentsDir(entry);
    }
    if (cleanup === "delete") {
      entry.frozenResultText = undefined;
      entry.frozenResultCapturedAt = undefined;
    }
    completeCleanupBookkeeping({ runId, entry, cleanup, completedAt: Date.now() });
    return;
  }

  const decidedAt = Date.now();
  // Wait until descendants are fully settled, post-end cleanup included.
  const pendingDescendants = countPendingDescendantRuns(entry.childSessionKey);
  const decision = resolveDeferredCleanupDecision({
    entry,
    now: decidedAt,
    activeDescendantRuns: Math.max(0, pendingDescendants),
    announceExpiryMs: ANNOUNCE_EXPIRY_MS,
    announceCompletionHardExpiryMs: ANNOUNCE_COMPLETION_HARD_EXPIRY_MS,
    maxAnnounceRetryCount: MAX_ANNOUNCE_RETRY_COUNT,
    deferDescendantDelayMs: MIN_ANNOUNCE_RETRY_DELAY_MS,
    resolveAnnounceRetryDelayMs,
  });

  if (decision.kind === "defer-descendants") {
    entry.lastAnnounceRetryAt = decidedAt;
    entry.wakeOnDescendantSettle = true;
    entry.cleanupHandled = false;
    resumedRuns.delete(runId);
    persistSubagentRuns();
    setTimeout(() => {
      resumeSubagentRun(runId);
    }, decision.delayMs).unref?.();
    return;
  }

  if (decision.retryCount != null) {
    entry.announceRetryCount = decision.retryCount;
    entry.lastAnnounceRetryAt = decidedAt;
  }

  if (decision.kind === "give-up") {
    entry.wakeOnDescendantSettle = undefined;
    entry.fallbackFrozenResultText = undefined;
    entry.fallbackFrozenResultCapturedAt = undefined;
    if (cleanup === "delete" || !entry.retainAttachmentsOnKeep) {
      await safeRemoveAttachmentsDir(entry);
    }
    await emitCompletionEndedHookIfNeeded(entry, resolveCleanupCompletionReason(entry));
    logAnnounceGiveUp(entry, decision.reason);
    completeCleanupBookkeeping({ runId, entry, cleanup, completedAt: decidedAt });
    return;
  }

  // Both cleanup modes stay retryable after a deferred or failed announce;
  // delete-mode is only finalized once announce succeeds or give-up triggers.
  entry.cleanupHandled = false;
  // Drop the in-flight resume marker so the scheduled retry can run again.
  resumedRuns.delete(runId);
  persistSubagentRuns();
  if (decision.resumeDelayMs != null) {
    setTimeout(() => {
      resumeSubagentRun(runId);
    }, decision.resumeDelayMs).unref?.();
  }
}
|
||||
|
||||
/**
 * Finalizes a run whose completion announce is being abandoned.
 *
 * Clears the entry's wake/fallback fields, removes the attachments directory
 * unless the entry is kept with `retainAttachmentsOnKeep` set, emits the
 * completion "ended" hook when applicable, logs the give-up, and then hands
 * over to the cleanup bookkeeping.
 *
 * @param params.runId  Id of the run being finalized.
 * @param params.entry  Run record; mutated in place.
 * @param params.reason Why the announce was abandoned.
 */
async function finalizeResumedAnnounceGiveUp(params: {
  runId: string;
  entry: SubagentRunRecord;
  reason: "retry-limit" | "expiry";
}) {
  const { runId, entry, reason } = params;

  // Drop pending wake-up and frozen fallback state; no further announce will use it.
  entry.wakeOnDescendantSettle = undefined;
  entry.fallbackFrozenResultText = undefined;
  entry.fallbackFrozenResultCapturedAt = undefined;

  // Attachments survive only for non-delete cleanup with explicit retention.
  const keepAttachments = entry.cleanup !== "delete" && entry.retainAttachmentsOnKeep;
  if (!keepAttachments) {
    await safeRemoveAttachmentsDir(entry);
  }

  const endedReason = resolveCleanupCompletionReason(entry);
  await emitCompletionEndedHookIfNeeded(entry, endedReason);
  logAnnounceGiveUp(entry, reason);

  completeCleanupBookkeeping({
    runId,
    entry,
    cleanup: entry.cleanup,
    completedAt: Date.now(),
  });
}
|
||||
|
||||
/**
 * Emits the subagent "ended" hook (with farewell) for a run, but only when the
 * entry expects a completion message and `shouldEmitEndedHookForRun` approves
 * the given reason.
 *
 * @param entry  Run record the hook is emitted for.
 * @param reason Lifecycle reason forwarded to the hook.
 */
async function emitCompletionEndedHookIfNeeded(
  entry: SubagentRunRecord,
  reason: SubagentLifecycleEndedReason,
) {
  // Only completion-message flows go through this path.
  if (entry.expectsCompletionMessage !== true) {
    return;
  }

  const hookIsDue = shouldEmitEndedHookForRun({ entry, reason });
  if (!hookIsDue) {
    return;
  }

  await emitSubagentEndedHookForRun({ entry, reason, sendFarewell: true });
}
|
||||
|
||||
/**
 * Records the end of cleanup for a run.
 *
 * In "delete" mode the pending lifecycle error is cleared and the run is
 * removed from the registry; in "keep" mode the entry is stamped with
 * `cleanupCompletedAt`. Both modes notify the context engine (fire-and-forget),
 * persist the registry, and then retry other deferred announces, excluding
 * this run.
 *
 * @param params.runId       Id of the run whose cleanup finished.
 * @param params.entry       Run record being finalized.
 * @param params.cleanup     Cleanup mode applied to the run.
 * @param params.completedAt Timestamp stored on the entry in "keep" mode.
 */
function completeCleanupBookkeeping(params: {
  runId: string;
  entry: SubagentRunRecord;
  cleanup: "delete" | "keep";
  completedAt: number;
}) {
  const { runId, entry, cleanup, completedAt } = params;
  const removeRun = cleanup === "delete";

  if (removeRun) {
    clearPendingLifecycleError(runId);
  }

  // Not awaited: the notification runs in the background.
  void notifyContextEngineSubagentEnded({
    childSessionKey: entry.childSessionKey,
    reason: removeRun ? "deleted" : "completed",
    workspaceDir: entry.workspaceDir,
  });

  if (removeRun) {
    subagentRuns.delete(runId);
  } else {
    entry.cleanupCompletedAt = completedAt;
  }

  persistSubagentRuns();
  retryDeferredCompletedAnnounces(runId);
}
|
||||
|
||||
const subagentRunManager = createSubagentRunManager({
|
||||
runs: subagentRuns,
|
||||
resumedRuns,
|
||||
@@ -900,66 +509,6 @@ const subagentRunManager = createSubagentRunManager({
|
||||
completeSubagentRun,
|
||||
});
|
||||
|
||||
/**
 * Walks the registry and re-drives runs that have ended but whose cleanup is
 * neither completed nor currently claimed.
 *
 * Runs that do not expect a completion message and ended more than
 * `ANNOUNCE_EXPIRY_MS` ago are finalized as expired; every other eligible run
 * is resumed.
 *
 * @param excludeRunId Optional run id to leave untouched during the sweep.
 */
function retryDeferredCompletedAnnounces(excludeRunId?: string) {
  const sweepStartedAt = Date.now();

  for (const [runId, entry] of subagentRuns.entries()) {
    const isExcluded = Boolean(excludeRunId) && runId === excludeRunId;
    if (isExcluded || typeof entry.endedAt !== "number") {
      continue;
    }
    if (entry.cleanupCompletedAt || entry.cleanupHandled) {
      continue;
    }
    if (suppressAnnounceForSteerRestart(entry)) {
      continue;
    }

    // Force-expire stale non-completion announces; completion-message flows can
    // stay pending while descendants run for a long time.
    const msSinceEnd = sweepStartedAt - (entry.endedAt ?? sweepStartedAt);
    const isStaleNonCompletion =
      entry.expectsCompletionMessage !== true && msSinceEnd > ANNOUNCE_EXPIRY_MS;

    if (!isStaleNonCompletion) {
      // Clear the resume marker first so the resume call below is not skipped.
      resumedRuns.delete(runId);
      resumeSubagentRun(runId);
      continue;
    }

    // Claim cleanup ownership; skip if it is already claimed or finished.
    if (!beginSubagentCleanup(runId)) {
      continue;
    }

    // On failure, log and release the cleanup claim unless the run is gone or
    // its cleanup completed in the meantime.
    const releaseClaimAfterFailure = (error: unknown) => {
      defaultRuntime.log(
        `[warn] Subagent expiry finalize failed during deferred retry for run ${runId}: ${String(error)}`,
      );
      const latest = subagentRuns.get(runId);
      if (latest && !latest.cleanupCompletedAt) {
        latest.cleanupHandled = false;
        persistSubagentRuns();
      }
    };

    void finalizeResumedAnnounceGiveUp({ runId, entry, reason: "expiry" }).catch(
      releaseClaimAfterFailure,
    );
  }
}
|
||||
|
||||
/**
 * Tries to claim cleanup for a run by setting `cleanupHandled` and persisting.
 *
 * @param runId Id of the run to claim.
 * @returns `true` when the claim was taken; `false` when the run is unknown,
 *          its cleanup already completed, or cleanup is already claimed.
 */
function beginSubagentCleanup(runId: string) {
  const record = subagentRuns.get(runId);
  if (!record || record.cleanupCompletedAt || record.cleanupHandled) {
    return false;
  }

  record.cleanupHandled = true;
  persistSubagentRuns();
  return true;
}
|
||||
|
||||
/**
 * Public entry point that forwards to the run manager's
 * `markSubagentRunForSteerRestart` and returns its result unchanged.
 *
 * @param runId Id of the run to mark.
 */
export function markSubagentRunForSteerRestart(runId: string) {
  const outcome = subagentRunManager.markSubagentRunForSteerRestart(runId);
  return outcome;
}
|
||||
|
||||
Reference in New Issue
Block a user