fix(agents): Wake active requester sessions for subagent completions while keeping dormant sessions externally deliverable (#62963)

Route subagent completion announces through embedded-run wake for active requesters, preserve external delivery for dormant ones
This commit is contained in:
Bek
2026-04-21 18:13:53 -04:00
committed by GitHub
parent 14dcbd4044
commit 0e1d324dd8
4 changed files with 256 additions and 11 deletions

View File

@@ -11,4 +11,8 @@ export { resolveExternalBestEffortDeliveryTarget } from "../infra/outbound/best-
export { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js";
export { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
export { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
export { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded-runner/runs.js";
export {
isEmbeddedPiRunActive,
queueEmbeddedPiMessage,
resolveActiveEmbeddedRunSessionId,
} from "./pi-embedded-runner/runs.js";

View File

@@ -1,6 +1,12 @@
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import { __testing, deliverSubagentAnnouncement } from "./subagent-announce-delivery.js";
import { callGateway as runtimeCallGateway } from "./subagent-announce-delivery.runtime.js";
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
afterEach(() => {
__testing.setDepsForTest();
});
describe("resolveAnnounceOrigin telegram forum topics", () => {
it("preserves stored forum topic thread ids when requester origin omits one for the same chat", () => {
expect(
@@ -61,3 +67,191 @@ describe("resolveAnnounceOrigin telegram forum topics", () => {
});
});
});
describe("deliverSubagentAnnouncement completion delivery", () => {
it("keeps completion announces session-internal while preserving route context for active requesters", async () => {
const callGateway = vi.fn(
async () => ({}) as Record<string, unknown>,
) as unknown as typeof runtimeCallGateway;
const queueEmbeddedPiMessage = vi.fn(() => true);
__testing.setDepsForTest({
callGateway,
getRequesterSessionActivity: () => ({
sessionId: "requester-session-1",
isActive: true,
}),
loadConfig: () => ({}) as never,
queueEmbeddedPiMessage,
});
const result = await deliverSubagentAnnouncement({
requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
targetRequesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
triggerMessage: "child done",
steerMessage: "child done",
requesterOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
requesterSessionOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
completionDirectOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
directOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
requesterIsSubagent: false,
expectsCompletionMessage: true,
bestEffortDeliver: true,
directIdempotencyKey: "announce-1",
});
expect(result).toEqual(
expect.objectContaining({
delivered: true,
path: "steered",
}),
);
expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-1", "child done");
expect(callGateway).not.toHaveBeenCalled();
});
it("keeps direct external delivery for dormant completion requesters", async () => {
const callGateway = vi.fn(
async () => ({}) as Record<string, unknown>,
) as unknown as typeof runtimeCallGateway;
__testing.setDepsForTest({
callGateway,
getRequesterSessionActivity: () => ({
sessionId: "requester-session-2",
isActive: false,
}),
loadConfig: () => ({}) as never,
});
await deliverSubagentAnnouncement({
requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
targetRequesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
triggerMessage: "child done",
steerMessage: "child done",
requesterOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
requesterSessionOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
completionDirectOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
directOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
requesterIsSubagent: false,
expectsCompletionMessage: true,
bestEffortDeliver: true,
directIdempotencyKey: "announce-1b",
});
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "agent",
params: expect.objectContaining({
deliver: true,
channel: "slack",
accountId: "acct-1",
to: "channel:C123",
threadId: "171.222",
bestEffortDeliver: true,
}),
}),
);
});
it("keeps direct external delivery for non-completion announces", async () => {
const callGateway = vi.fn(
async () => ({}) as Record<string, unknown>,
) as unknown as typeof runtimeCallGateway;
__testing.setDepsForTest({
callGateway,
getRequesterSessionActivity: () => ({
sessionId: "requester-session-3",
isActive: false,
}),
loadConfig: () => ({}) as never,
});
await deliverSubagentAnnouncement({
requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
targetRequesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
triggerMessage: "child done",
steerMessage: "child done",
requesterOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
requesterSessionOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
completionDirectOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
directOrigin: {
channel: "slack",
to: "channel:C123",
accountId: "acct-1",
threadId: "171.222",
},
requesterIsSubagent: false,
expectsCompletionMessage: false,
bestEffortDeliver: true,
directIdempotencyKey: "announce-2",
});
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "agent",
params: expect.objectContaining({
deliver: true,
channel: "slack",
accountId: "acct-1",
to: "channel:C123",
threadId: "171.222",
bestEffortDeliver: true,
}),
}),
);
});
});

View File

@@ -25,6 +25,7 @@ import {
loadConfig,
loadSessionStore,
queueEmbeddedPiMessage,
resolveActiveEmbeddedRunSessionId,
resolveAgentIdFromSessionKey,
resolveConversationIdFromTargets,
resolveExternalBestEffortDeliveryTarget,
@@ -49,16 +50,44 @@ const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
type SubagentAnnounceDeliveryDeps = {
callGateway: typeof callGateway;
loadConfig: typeof loadConfig;
getRequesterSessionActivity: (requesterSessionKey: string) => {
sessionId?: string;
isActive: boolean;
};
queueEmbeddedPiMessage: typeof queueEmbeddedPiMessage;
};
const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
callGateway,
loadConfig,
getRequesterSessionActivity: (requesterSessionKey: string) => {
const sessionId =
resolveActiveEmbeddedRunSessionId(requesterSessionKey) ??
loadRequesterSessionEntry(requesterSessionKey).entry?.sessionId;
return {
sessionId,
isActive: Boolean(sessionId && isEmbeddedPiRunActive(sessionId)),
};
},
queueEmbeddedPiMessage,
};
let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps =
defaultSubagentAnnounceDeliveryDeps;
function resolveRequesterSessionActivity(requesterSessionKey: string) {
const activity = subagentAnnounceDeliveryDeps.getRequesterSessionActivity(requesterSessionKey);
if (activity.sessionId || activity.isActive) {
return activity;
}
const { entry } = loadRequesterSessionEntry(requesterSessionKey);
const sessionId = entry?.sessionId;
return {
sessionId,
isActive: Boolean(sessionId && isEmbeddedPiRunActive(sessionId)),
};
}
function resolveDirectAnnounceTransientRetryDelaysMs() {
return process.env.OPENCLAW_TEST_FAST === "1"
? ([8, 16, 32] as const)
@@ -348,7 +377,7 @@ async function maybeQueueSubagentAnnounce(params: {
}
const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey);
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
const sessionId = entry?.sessionId;
const { sessionId, isActive } = resolveRequesterSessionActivity(canonicalKey);
if (!sessionId) {
return "none";
}
@@ -358,11 +387,13 @@ async function maybeQueueSubagentAnnounce(params: {
channel: entry?.channel ?? entry?.lastChannel ?? entry?.origin?.provider,
sessionEntry: entry,
});
const isActive = isEmbeddedPiRunActive(sessionId);
const shouldSteer = queueSettings.mode === "steer" || queueSettings.mode === "steer-backlog";
if (shouldSteer) {
const steered = queueEmbeddedPiMessage(sessionId, params.steerMessage);
const steered = subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage(
sessionId,
params.steerMessage,
);
if (steered) {
return "steered";
}
@@ -457,6 +488,26 @@ async function sendSubagentAnnounceDirectly(params: {
isGatewayMessageChannel(normalizedSessionOnlyOriginChannel)
? normalizedSessionOnlyOriginChannel
: undefined;
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
if (params.expectsCompletionMessage && requesterActivity.isActive) {
const woke = requesterActivity.sessionId
? subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage(
requesterActivity.sessionId,
params.triggerMessage,
)
: false;
if (woke) {
return {
delivered: true,
path: "steered",
};
}
return {
delivered: false,
path: "direct",
error: "active requester session could not be woken",
};
}
if (params.signal?.aborted) {
return {
delivered: false,

View File

@@ -1702,7 +1702,7 @@ describe("subagent announce formatting", () => {
expect(new Set(idempotencyKeys).size).toBe(2);
});
it("prefers direct delivery first for completion-mode and then queues on direct failure", async () => {
it("falls back to queued follow-up delivery when an active completion wake cannot be injected", async () => {
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
sessionStore = {
@@ -1729,15 +1729,11 @@ describe("subagent announce formatting", () => {
expect(didAnnounce).toBe(true);
expect(sendSpy).not.toHaveBeenCalled();
expect(agentSpy).toHaveBeenCalledTimes(2);
expect(agentSpy).toHaveBeenCalledTimes(1);
expect(agentSpy.mock.calls[0]?.[0]).toMatchObject({
method: "agent",
params: { sessionKey: "agent:main:main", channel: "whatsapp", to: "+1555", deliver: true },
});
expect(agentSpy.mock.calls[1]?.[0]).toMatchObject({
method: "agent",
params: { sessionKey: "agent:main:main" },
});
});
it("falls back to internal requester-session injection when completion route is missing", async () => {