mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
Wake settled subagent orchestrators by re-invoking parent run
This commit is contained in:
@@ -40,6 +40,9 @@ const subagentRegistryMock = {
|
||||
listSubagentRunsForRequester: vi.fn(
|
||||
(_sessionKey: string, _scope?: { requesterRunId?: string }) => [],
|
||||
),
|
||||
replaceSubagentRunAfterSteer: vi.fn(
|
||||
(_params: { previousRunId: string; nextRunId: string }) => true,
|
||||
),
|
||||
resolveRequesterForChildSession: vi.fn((_sessionKey: string): RequesterResolution => null),
|
||||
};
|
||||
const subagentDeliveryTargetHookMock = vi.fn(
|
||||
@@ -202,6 +205,7 @@ describe("subagent announce formatting", () => {
|
||||
subagentRegistryMock.countPendingDescendantRuns(sessionKey),
|
||||
);
|
||||
subagentRegistryMock.listSubagentRunsForRequester.mockClear().mockReturnValue([]);
|
||||
subagentRegistryMock.replaceSubagentRunAfterSteer.mockClear().mockReturnValue(true);
|
||||
subagentRegistryMock.resolveRequesterForChildSession.mockClear().mockReturnValue(null);
|
||||
hasSubagentDeliveryTargetHook = false;
|
||||
hookRunnerMock.hasHooks.mockClear();
|
||||
@@ -1991,6 +1995,84 @@ describe("subagent announce formatting", () => {
|
||||
expect(msg).not.toContain("placeholder waiting text that should be ignored");
|
||||
});
|
||||
|
||||
it("wakes an ended orchestrator run with settled child results before any upward announce", async () => {
|
||||
sessionStore = {
|
||||
"agent:main:subagent:parent": {
|
||||
sessionId: "session-parent",
|
||||
},
|
||||
};
|
||||
|
||||
subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0);
|
||||
subagentRegistryMock.listSubagentRunsForRequester.mockImplementation(
|
||||
(sessionKey: string, scope?: { requesterRunId?: string }) => {
|
||||
if (sessionKey !== "agent:main:subagent:parent") {
|
||||
return [];
|
||||
}
|
||||
if (scope?.requesterRunId !== "run-parent-phase-1") {
|
||||
return [];
|
||||
}
|
||||
return [
|
||||
{
|
||||
runId: "run-child-a",
|
||||
childSessionKey: "agent:main:subagent:parent:subagent:a",
|
||||
requesterSessionKey: "agent:main:subagent:parent",
|
||||
requesterDisplayKey: "parent",
|
||||
task: "child task a",
|
||||
label: "child-a",
|
||||
cleanup: "keep",
|
||||
createdAt: 10,
|
||||
endedAt: 20,
|
||||
cleanupCompletedAt: 21,
|
||||
frozenResultText: "result from child a",
|
||||
outcome: { status: "ok" },
|
||||
},
|
||||
{
|
||||
runId: "run-child-b",
|
||||
childSessionKey: "agent:main:subagent:parent:subagent:b",
|
||||
requesterSessionKey: "agent:main:subagent:parent",
|
||||
requesterDisplayKey: "parent",
|
||||
task: "child task b",
|
||||
label: "child-b",
|
||||
cleanup: "keep",
|
||||
createdAt: 11,
|
||||
endedAt: 21,
|
||||
cleanupCompletedAt: 22,
|
||||
frozenResultText: "result from child b",
|
||||
outcome: { status: "ok" },
|
||||
},
|
||||
];
|
||||
},
|
||||
);
|
||||
|
||||
agentSpy.mockResolvedValueOnce({ runId: "run-parent-phase-2", status: "ok" });
|
||||
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:parent",
|
||||
childRunId: "run-parent-phase-1",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
...defaultOutcomeAnnounce,
|
||||
expectsCompletionMessage: true,
|
||||
wakeOnDescendantSettle: true,
|
||||
roundOneReply: "waiting for children",
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(true);
|
||||
expect(agentSpy).toHaveBeenCalledTimes(1);
|
||||
const call = agentSpy.mock.calls[0]?.[0] as {
|
||||
params?: { sessionKey?: string; message?: string };
|
||||
};
|
||||
expect(call?.params?.sessionKey).toBe("agent:main:subagent:parent");
|
||||
const message = call?.params?.message ?? "";
|
||||
expect(message).toContain("All pending descendants for that run have now settled");
|
||||
expect(message).toContain("result from child a");
|
||||
expect(message).toContain("result from child b");
|
||||
expect(subagentRegistryMock.replaceSubagentRunAfterSteer).toHaveBeenCalledWith({
|
||||
previousRunId: "run-parent-phase-1",
|
||||
nextRunId: "run-parent-phase-2",
|
||||
});
|
||||
});
|
||||
|
||||
it("nested completion chains re-check child then parent deterministically", async () => {
|
||||
const parentSessionKey = "agent:main:subagent:parent";
|
||||
const childSessionKey = "agent:main:subagent:parent:subagent:child";
|
||||
|
||||
@@ -1066,6 +1066,92 @@ function buildAnnounceSteerMessage(events: AgentInternalEvent[]): string {
|
||||
return rendered;
|
||||
}
|
||||
|
||||
function hasUsableSessionEntry(entry: unknown): boolean {
|
||||
if (!entry || typeof entry !== "object") {
|
||||
return false;
|
||||
}
|
||||
const sessionId = (entry as { sessionId?: unknown }).sessionId;
|
||||
if (typeof sessionId === "string" && sessionId.trim() === "") {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
function buildDescendantWakeMessage(params: { findings: string; taskLabel: string }): string {
|
||||
return [
|
||||
"[Subagent Context] Your prior run ended while waiting for descendant subagent completions.",
|
||||
"[Subagent Context] All pending descendants for that run have now settled.",
|
||||
"[Subagent Context] Continue your workflow using these results. Spawn more subagents if needed, otherwise send your final answer.",
|
||||
"",
|
||||
`Task: ${params.taskLabel}`,
|
||||
"",
|
||||
params.findings,
|
||||
].join("\n");
|
||||
}
|
||||
|
||||
async function wakeSubagentRunAfterDescendants(params: {
|
||||
runId: string;
|
||||
childSessionKey: string;
|
||||
taskLabel: string;
|
||||
findings: string;
|
||||
announceId: string;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<boolean> {
|
||||
if (params.signal?.aborted) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const childEntry = loadSessionEntryByKey(params.childSessionKey);
|
||||
if (!hasUsableSessionEntry(childEntry)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const cfg = loadConfig();
|
||||
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
|
||||
const wakeMessage = buildDescendantWakeMessage({
|
||||
findings: params.findings,
|
||||
taskLabel: params.taskLabel,
|
||||
});
|
||||
|
||||
let wakeRunId = "";
|
||||
try {
|
||||
const wakeResponse = await runAnnounceDeliveryWithRetry<{ runId?: string }>({
|
||||
operation: "descendant wake agent call",
|
||||
signal: params.signal,
|
||||
run: async () =>
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
sessionKey: params.childSessionKey,
|
||||
message: wakeMessage,
|
||||
deliver: false,
|
||||
inputProvenance: {
|
||||
kind: "inter_session",
|
||||
sourceSessionKey: params.childSessionKey,
|
||||
sourceChannel: INTERNAL_MESSAGE_CHANNEL,
|
||||
sourceTool: "subagent_announce",
|
||||
},
|
||||
idempotencyKey: buildAnnounceIdempotencyKey(`${params.announceId}:wake`),
|
||||
},
|
||||
timeoutMs: announceTimeoutMs,
|
||||
}),
|
||||
});
|
||||
wakeRunId = typeof wakeResponse?.runId === "string" ? wakeResponse.runId.trim() : "";
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!wakeRunId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const { replaceSubagentRunAfterSteer } = await loadSubagentRegistryRuntime();
|
||||
return replaceSubagentRunAfterSteer({
|
||||
previousRunId: params.runId,
|
||||
nextRunId: wakeRunId,
|
||||
});
|
||||
}
|
||||
|
||||
export async function runSubagentAnnounceFlow(params: {
|
||||
childSessionKey: string;
|
||||
childRunId: string;
|
||||
@@ -1084,6 +1170,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
announceType?: SubagentAnnounceType;
|
||||
expectsCompletionMessage?: boolean;
|
||||
spawnMode?: SpawnSubagentMode;
|
||||
wakeOnDescendantSettle?: boolean;
|
||||
signal?: AbortSignal;
|
||||
bestEffortDeliver?: boolean;
|
||||
}): Promise<boolean> {
|
||||
@@ -1193,6 +1280,26 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
// Best-effort only; fall back to current-run reply extraction.
|
||||
}
|
||||
|
||||
const announceId = buildAnnounceIdFromChildRun({
|
||||
childSessionKey: params.childSessionKey,
|
||||
childRunId: params.childRunId,
|
||||
});
|
||||
|
||||
if (params.wakeOnDescendantSettle === true && childCompletionFindings?.trim()) {
|
||||
const woke = await wakeSubagentRunAfterDescendants({
|
||||
runId: params.childRunId,
|
||||
childSessionKey: params.childSessionKey,
|
||||
taskLabel: params.label || params.task || "task",
|
||||
findings: childCompletionFindings,
|
||||
announceId,
|
||||
signal: params.signal,
|
||||
});
|
||||
if (woke) {
|
||||
shouldDeleteChildSession = false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!childCompletionFindings) {
|
||||
if (!reply) {
|
||||
reply = await readLatestSubagentOutput(params.childSessionKey);
|
||||
@@ -1262,11 +1369,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
// Parent run has ended. Check if parent SESSION still exists.
|
||||
// If it does, the parent may be waiting for child results — inject there.
|
||||
const parentSessionEntry = loadSessionEntryByKey(targetRequesterSessionKey);
|
||||
const parentSessionId =
|
||||
typeof parentSessionEntry?.sessionId === "string"
|
||||
? parentSessionEntry.sessionId.trim()
|
||||
: "";
|
||||
const parentSessionAlive = Boolean(parentSessionId);
|
||||
const parentSessionAlive = hasUsableSessionEntry(parentSessionEntry);
|
||||
|
||||
if (!parentSessionAlive) {
|
||||
// Parent session is truly gone — fallback to grandparent
|
||||
@@ -1317,10 +1420,6 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
triggerMessage = buildAnnounceSteerMessage(internalEvents);
|
||||
steerMessage = triggerMessage;
|
||||
|
||||
const announceId = buildAnnounceIdFromChildRun({
|
||||
childSessionKey: params.childSessionKey,
|
||||
childRunId: params.childRunId,
|
||||
});
|
||||
// Send to the requester session. For nested subagents this is an internal
|
||||
// follow-up injection (deliver=false) so the orchestrator receives it.
|
||||
let directOrigin = targetRequesterOrigin;
|
||||
|
||||
@@ -4,6 +4,7 @@ export {
|
||||
countPendingDescendantRunsExcludingRun,
|
||||
isSubagentSessionRunActive,
|
||||
listSubagentRunsForRequester,
|
||||
replaceSubagentRunAfterSteer,
|
||||
resolveRequesterForChildSession,
|
||||
shouldIgnorePostCompletionAnnounceForSession,
|
||||
} from "./subagent-registry.js";
|
||||
|
||||
@@ -431,6 +431,7 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
|
||||
outcome: entry.outcome,
|
||||
spawnMode: entry.spawnMode,
|
||||
expectsCompletionMessage: entry.expectsCompletionMessage,
|
||||
wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
|
||||
})
|
||||
.then((didAnnounce) => {
|
||||
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
|
||||
@@ -725,6 +726,7 @@ async function finalizeSubagentCleanup(
|
||||
return;
|
||||
}
|
||||
if (didAnnounce) {
|
||||
entry.wakeOnDescendantSettle = undefined;
|
||||
const completionReason = resolveCleanupCompletionReason(entry);
|
||||
await emitCompletionEndedHookIfNeeded(entry, completionReason);
|
||||
// Clean up attachments before the run record is removed.
|
||||
@@ -756,6 +758,7 @@ async function finalizeSubagentCleanup(
|
||||
|
||||
if (deferredDecision.kind === "defer-descendants") {
|
||||
entry.lastAnnounceRetryAt = now;
|
||||
entry.wakeOnDescendantSettle = true;
|
||||
entry.cleanupHandled = false;
|
||||
resumedRuns.delete(runId);
|
||||
persistSubagentRuns();
|
||||
@@ -771,6 +774,7 @@ async function finalizeSubagentCleanup(
|
||||
}
|
||||
|
||||
if (deferredDecision.kind === "give-up") {
|
||||
entry.wakeOnDescendantSettle = undefined;
|
||||
const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep;
|
||||
if (shouldDeleteAttachments) {
|
||||
await safeRemoveAttachmentsDir(entry);
|
||||
@@ -964,6 +968,7 @@ export function replaceSubagentRunAfterSteer(params: {
|
||||
endedAt: undefined,
|
||||
endedReason: undefined,
|
||||
endedHookEmittedAt: undefined,
|
||||
wakeOnDescendantSettle: undefined,
|
||||
outcome: undefined,
|
||||
frozenResultText: undefined,
|
||||
frozenResultCapturedAt: undefined,
|
||||
@@ -1030,6 +1035,7 @@ export function registerSubagentRun(params: {
|
||||
startedAt: now,
|
||||
archiveAtMs,
|
||||
cleanupHandled: false,
|
||||
wakeOnDescendantSettle: undefined,
|
||||
attachmentsDir: params.attachmentsDir,
|
||||
attachmentsRootDir: params.attachmentsRootDir,
|
||||
retainAttachmentsOnKeep: params.retainAttachmentsOnKeep,
|
||||
|
||||
@@ -30,6 +30,8 @@ export type SubagentRunRecord = {
|
||||
lastAnnounceRetryAt?: number;
|
||||
/** Terminal lifecycle reason recorded when the run finishes. */
|
||||
endedReason?: SubagentLifecycleEndedReason;
|
||||
/** Run ended while descendants were still pending and should be re-invoked once they settle. */
|
||||
wakeOnDescendantSettle?: boolean;
|
||||
/** Frozen completion output captured when the run first transitions to ended state. */
|
||||
frozenResultText?: string | null;
|
||||
/** Timestamp when frozenResultText was captured and locked. */
|
||||
|
||||
Reference in New Issue
Block a user