mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-21 22:21:33 +00:00
fix(subagents): deliver announces immediately and guard parallel descendants
This commit is contained in:
@@ -438,7 +438,7 @@ describe("subagent announce formatting", () => {
|
||||
expect(msg).not.toContain("✅ Subagent");
|
||||
});
|
||||
|
||||
it("keeps direct completion announce delivery when only the announcing run itself is pending", async () => {
|
||||
it("keeps direct completion announce delivery immediate even when sibling counters are non-zero", async () => {
|
||||
sessionStore = {
|
||||
"agent:main:subagent:test": {
|
||||
sessionId: "child-session-self-pending",
|
||||
@@ -451,11 +451,11 @@ describe("subagent announce formatting", () => {
|
||||
messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: done" }] }],
|
||||
});
|
||||
subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||
sessionKey === "agent:main:main" ? 1 : 0,
|
||||
sessionKey === "agent:main:main" ? 2 : 0,
|
||||
);
|
||||
subagentRegistryMock.countPendingDescendantRunsExcludingRun.mockImplementation(
|
||||
(sessionKey: string, runId: string) =>
|
||||
sessionKey === "agent:main:main" && runId === "run-direct-self-pending" ? 0 : 1,
|
||||
sessionKey === "agent:main:main" && runId === "run-direct-self-pending" ? 1 : 2,
|
||||
);
|
||||
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
@@ -469,12 +469,12 @@ describe("subagent announce formatting", () => {
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(true);
|
||||
expect(subagentRegistryMock.countPendingDescendantRunsExcludingRun).toHaveBeenCalledWith(
|
||||
"agent:main:main",
|
||||
"run-direct-self-pending",
|
||||
);
|
||||
expect(sendSpy).not.toHaveBeenCalled();
|
||||
expect(agentSpy).toHaveBeenCalledTimes(1);
|
||||
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
|
||||
expect(call?.params?.deliver).toBe(true);
|
||||
expect(call?.params?.channel).toBe("discord");
|
||||
expect(call?.params?.to).toBe("channel:12345");
|
||||
});
|
||||
|
||||
it("suppresses completion delivery when subagent reply is ANNOUNCE_SKIP", async () => {
|
||||
@@ -590,7 +590,7 @@ describe("subagent announce formatting", () => {
|
||||
expect(sendSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps completion-mode delivery coordinated when sibling runs are still active", async () => {
|
||||
it("delivers completion-mode announces immediately even when sibling runs are still active", async () => {
|
||||
sessionStore = {
|
||||
"agent:main:subagent:test": {
|
||||
sessionId: "child-session-coordinated",
|
||||
@@ -622,13 +622,11 @@ describe("subagent announce formatting", () => {
|
||||
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
|
||||
const rawMessage = call?.params?.message;
|
||||
const msg = typeof rawMessage === "string" ? rawMessage : "";
|
||||
expect(call?.params?.deliver).toBe(false);
|
||||
expect(call?.params?.channel).toBeUndefined();
|
||||
expect(call?.params?.to).toBeUndefined();
|
||||
expect(msg).toContain("There are still 1 active subagent run for this session.");
|
||||
expect(msg).toContain(
|
||||
"If they are part of the same workflow, wait for the remaining results before sending a user update.",
|
||||
);
|
||||
expect(call?.params?.deliver).toBe(true);
|
||||
expect(call?.params?.channel).toBe("discord");
|
||||
expect(call?.params?.to).toBe("channel:12345");
|
||||
expect(msg).not.toContain("There are still");
|
||||
expect(msg).not.toContain("wait for the remaining results");
|
||||
});
|
||||
|
||||
it("keeps session-mode completion delivery on the bound destination when sibling runs are active", async () => {
|
||||
@@ -1660,7 +1658,7 @@ describe("subagent announce formatting", () => {
|
||||
expect(call?.expectFinal).toBe(true);
|
||||
});
|
||||
|
||||
it("injects direct announce into requester subagent session instead of chat channel", async () => {
|
||||
it("injects direct announce into requester subagent session as a user-turn agent call", async () => {
|
||||
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);
|
||||
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
|
||||
|
||||
@@ -1679,6 +1677,7 @@ describe("subagent announce formatting", () => {
|
||||
expect(call?.params?.deliver).toBe(false);
|
||||
expect(call?.params?.channel).toBeUndefined();
|
||||
expect(call?.params?.to).toBeUndefined();
|
||||
expect((call?.params as { role?: unknown } | undefined)?.role).toBeUndefined();
|
||||
expect(call?.params?.inputProvenance).toMatchObject({
|
||||
kind: "inter_session",
|
||||
sourceSessionKey: "agent:main:subagent:worker",
|
||||
@@ -1753,7 +1752,7 @@ describe("subagent announce formatting", () => {
|
||||
expect(call?.params?.message).not.toContain("(no output)");
|
||||
});
|
||||
|
||||
it("uses advisory guidance when sibling subagents are still active", async () => {
|
||||
it("does not include batching guidance when sibling subagents are still active", async () => {
|
||||
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||
sessionKey === "agent:main:main" ? 2 : 0,
|
||||
);
|
||||
@@ -1768,11 +1767,46 @@ describe("subagent announce formatting", () => {
|
||||
|
||||
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
|
||||
const msg = call?.params?.message as string;
|
||||
expect(msg).toContain("There are still 2 active subagent runs for this session.");
|
||||
expect(msg).toContain(
|
||||
"If they are part of the same workflow, wait for the remaining results before sending a user update.",
|
||||
expect(msg).not.toContain("There are still");
|
||||
expect(msg).not.toContain("wait for the remaining results");
|
||||
expect(msg).not.toContain(
|
||||
"If they are unrelated, respond normally using only the result above.",
|
||||
);
|
||||
expect(msg).toContain("If they are unrelated, respond normally using only the result above.");
|
||||
});
|
||||
|
||||
it("defers nested parent announce when active descendants exist even if pending snapshot is stale", async () => {
|
||||
subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0);
|
||||
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||
sessionKey === "agent:main:subagent:parent" ? 1 : 0,
|
||||
);
|
||||
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:parent",
|
||||
childRunId: "run-parent-active-descendant",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
...defaultOutcomeAnnounce,
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(false);
|
||||
expect(agentSpy).not.toHaveBeenCalled();
|
||||
expect(sendSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps single subagent announces self contained without batching hints", async () => {
|
||||
await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:test",
|
||||
childRunId: "run-self-contained",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
...defaultOutcomeAnnounce,
|
||||
});
|
||||
|
||||
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
|
||||
const msg = call?.params?.message as string;
|
||||
expect(msg).not.toContain("There are still");
|
||||
expect(msg).not.toContain("wait for the remaining results");
|
||||
});
|
||||
|
||||
it("defers announce while finished runs still have active descendants", async () => {
|
||||
|
||||
@@ -770,38 +770,6 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
!params.requesterIsSubagent &&
|
||||
(!params.expectsCompletionMessage || hasDeliverableDirectTarget);
|
||||
|
||||
if (params.expectsCompletionMessage && hasDeliverableDirectTarget) {
|
||||
const forceBoundSessionDirectDelivery =
|
||||
params.spawnMode === "session" &&
|
||||
(params.completionRouteMode === "bound" || params.completionRouteMode === "hook");
|
||||
if (!forceBoundSessionDirectDelivery) {
|
||||
let pendingDescendantRuns = 0;
|
||||
try {
|
||||
const { countPendingDescendantRuns, countPendingDescendantRunsExcludingRun } =
|
||||
await loadSubagentRegistryRuntime();
|
||||
if (params.currentRunId) {
|
||||
pendingDescendantRuns = Math.max(
|
||||
0,
|
||||
countPendingDescendantRunsExcludingRun(
|
||||
canonicalRequesterSessionKey,
|
||||
params.currentRunId,
|
||||
),
|
||||
);
|
||||
} else {
|
||||
pendingDescendantRuns = Math.max(
|
||||
0,
|
||||
countPendingDescendantRuns(canonicalRequesterSessionKey),
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
// Best-effort only; default to immediate delivery when registry runtime is unavailable.
|
||||
}
|
||||
if (pendingDescendantRuns > 0) {
|
||||
shouldDeliverExternally = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const threadId =
|
||||
effectiveDirectOrigin?.threadId != null && effectiveDirectOrigin.threadId !== ""
|
||||
? String(effectiveDirectOrigin.threadId)
|
||||
@@ -1044,15 +1012,10 @@ export type SubagentRunOutcome = {
|
||||
export type SubagentAnnounceType = "subagent task" | "cron job";
|
||||
|
||||
function buildAnnounceReplyInstruction(params: {
|
||||
remainingActiveSubagentRuns: number;
|
||||
requesterIsSubagent: boolean;
|
||||
announceType: SubagentAnnounceType;
|
||||
expectsCompletionMessage?: boolean;
|
||||
}): string {
|
||||
if (params.remainingActiveSubagentRuns > 0) {
|
||||
const activeRunsLabel = params.remainingActiveSubagentRuns === 1 ? "run" : "runs";
|
||||
return `There are still ${params.remainingActiveSubagentRuns} active subagent ${activeRunsLabel} for this session. If they are part of the same workflow, wait for the remaining results before sending a user update. If they are unrelated, respond normally using only the result above.`;
|
||||
}
|
||||
if (params.requesterIsSubagent) {
|
||||
return `Convert this completion into a concise internal orchestration update for your parent agent in your own words. Keep this internal context private (don't mention system/log/stats/session details or announce type). If this result is duplicate or no update is needed, reply ONLY: ${SILENT_REPLY_TOKEN}.`;
|
||||
}
|
||||
@@ -1193,8 +1156,11 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
|
||||
let pendingChildDescendantRuns = 0;
|
||||
try {
|
||||
const { countPendingDescendantRuns } = await loadSubagentRegistryRuntime();
|
||||
pendingChildDescendantRuns = Math.max(0, countPendingDescendantRuns(params.childSessionKey));
|
||||
const { countPendingDescendantRuns, countActiveDescendantRuns } =
|
||||
await loadSubagentRegistryRuntime();
|
||||
const pending = Math.max(0, countPendingDescendantRuns(params.childSessionKey));
|
||||
const active = Math.max(0, countActiveDescendantRuns(params.childSessionKey));
|
||||
pendingChildDescendantRuns = Math.max(pending, active);
|
||||
} catch {
|
||||
// Best-effort only; fall back to direct announce behavior when unavailable.
|
||||
}
|
||||
@@ -1279,18 +1245,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
}
|
||||
}
|
||||
|
||||
let remainingActiveSubagentRuns = 0;
|
||||
try {
|
||||
const { countActiveDescendantRuns } = await loadSubagentRegistryRuntime();
|
||||
remainingActiveSubagentRuns = Math.max(
|
||||
0,
|
||||
countActiveDescendantRuns(targetRequesterSessionKey),
|
||||
);
|
||||
} catch {
|
||||
// Best-effort only; fall back to default announce instructions when unavailable.
|
||||
}
|
||||
const replyInstruction = buildAnnounceReplyInstruction({
|
||||
remainingActiveSubagentRuns,
|
||||
requesterIsSubagent,
|
||||
announceType,
|
||||
expectsCompletionMessage,
|
||||
|
||||
@@ -212,6 +212,82 @@ describe("subagent registry nested agent tracking", () => {
|
||||
expect(countPendingDescendantRuns("agent:main:subagent:orch-pending")).toBe(1);
|
||||
});
|
||||
|
||||
it("keeps parent pending for parallel children until both descendants complete cleanup", async () => {
|
||||
const { addSubagentRunForTests, countPendingDescendantRuns } = subagentRegistry;
|
||||
const parentSessionKey = "agent:main:subagent:orch-parallel";
|
||||
|
||||
addSubagentRunForTests({
|
||||
runId: "run-parent-parallel",
|
||||
childSessionKey: parentSessionKey,
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "parallel orchestrator",
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 1,
|
||||
endedAt: 2,
|
||||
cleanupHandled: false,
|
||||
cleanupCompletedAt: undefined,
|
||||
});
|
||||
addSubagentRunForTests({
|
||||
runId: "run-leaf-a",
|
||||
childSessionKey: `${parentSessionKey}:subagent:leaf-a`,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
requesterDisplayKey: "orch-parallel",
|
||||
task: "leaf a",
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 1,
|
||||
endedAt: 2,
|
||||
cleanupHandled: true,
|
||||
cleanupCompletedAt: undefined,
|
||||
});
|
||||
addSubagentRunForTests({
|
||||
runId: "run-leaf-b",
|
||||
childSessionKey: `${parentSessionKey}:subagent:leaf-b`,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
requesterDisplayKey: "orch-parallel",
|
||||
task: "leaf b",
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 1,
|
||||
cleanupHandled: false,
|
||||
cleanupCompletedAt: undefined,
|
||||
});
|
||||
|
||||
expect(countPendingDescendantRuns(parentSessionKey)).toBe(2);
|
||||
|
||||
addSubagentRunForTests({
|
||||
runId: "run-leaf-a",
|
||||
childSessionKey: `${parentSessionKey}:subagent:leaf-a`,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
requesterDisplayKey: "orch-parallel",
|
||||
task: "leaf a",
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 1,
|
||||
endedAt: 2,
|
||||
cleanupHandled: true,
|
||||
cleanupCompletedAt: 3,
|
||||
});
|
||||
expect(countPendingDescendantRuns(parentSessionKey)).toBe(1);
|
||||
|
||||
addSubagentRunForTests({
|
||||
runId: "run-leaf-b",
|
||||
childSessionKey: `${parentSessionKey}:subagent:leaf-b`,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
requesterDisplayKey: "orch-parallel",
|
||||
task: "leaf b",
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 1,
|
||||
endedAt: 4,
|
||||
cleanupHandled: true,
|
||||
cleanupCompletedAt: 5,
|
||||
});
|
||||
expect(countPendingDescendantRuns(parentSessionKey)).toBe(0);
|
||||
});
|
||||
|
||||
it("countPendingDescendantRunsExcludingRun ignores only the active announce run", async () => {
|
||||
const { addSubagentRunForTests, countPendingDescendantRunsExcludingRun } = subagentRegistry;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user