diff --git a/src/agents/subagent-announce-delivery.runtime.ts b/src/agents/subagent-announce-delivery.runtime.ts index fc4773c76eb..cc99f692215 100644 --- a/src/agents/subagent-announce-delivery.runtime.ts +++ b/src/agents/subagent-announce-delivery.runtime.ts @@ -11,4 +11,8 @@ export { resolveExternalBestEffortDeliveryTarget } from "../infra/outbound/best- export { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js"; export { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js"; export { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; -export { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded-runner/runs.js"; +export { + isEmbeddedPiRunActive, + queueEmbeddedPiMessage, + resolveActiveEmbeddedRunSessionId, +} from "./pi-embedded-runner/runs.js"; diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index 81cdbaf8c77..76312b675a7 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -1,6 +1,12 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { __testing, deliverSubagentAnnouncement } from "./subagent-announce-delivery.js"; +import { callGateway as runtimeCallGateway } from "./subagent-announce-delivery.runtime.js"; import { resolveAnnounceOrigin } from "./subagent-announce-origin.js"; +afterEach(() => { + __testing.setDepsForTest(); +}); + describe("resolveAnnounceOrigin telegram forum topics", () => { it("preserves stored forum topic thread ids when requester origin omits one for the same chat", () => { expect( @@ -61,3 +67,191 @@ describe("resolveAnnounceOrigin telegram forum topics", () => { }); }); }); + +describe("deliverSubagentAnnouncement completion delivery", () => { + it("keeps completion announces session-internal while preserving route context for active requesters", async () => { + const callGateway = vi.fn( + async () => ({}) as Record, + ) as unknown as typeof runtimeCallGateway; + const queueEmbeddedPiMessage = vi.fn(() => true); + __testing.setDepsForTest({ + callGateway, + getRequesterSessionActivity: () => ({ + sessionId: "requester-session-1", + isActive: true, + }), + loadConfig: () => ({}) as never, + queueEmbeddedPiMessage, + }); + + const result = await deliverSubagentAnnouncement({ + requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222", + targetRequesterSessionKey: "agent:main:slack:channel:C123:thread:171.222", + triggerMessage: "child done", + steerMessage: "child done", + requesterOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + requesterSessionOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + completionDirectOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + directOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + requesterIsSubagent: false, + expectsCompletionMessage: true, + bestEffortDeliver: true, + directIdempotencyKey: "announce-1", + }); + + expect(result).toEqual( + expect.objectContaining({ + delivered: true, + path: "steered", + }), + ); + expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-1", "child done"); + expect(callGateway).not.toHaveBeenCalled(); + }); + + it("keeps direct external delivery for dormant completion requesters", async () => { + const callGateway = vi.fn( + async () => ({}) as Record, + ) as unknown as typeof runtimeCallGateway; + __testing.setDepsForTest({ + callGateway, + getRequesterSessionActivity: () => ({ + sessionId: "requester-session-2", + isActive: false, + }), + loadConfig: () => ({}) as never, + }); + + await deliverSubagentAnnouncement({ + requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222", + targetRequesterSessionKey: "agent:main:slack:channel:C123:thread:171.222", + triggerMessage: "child done", + steerMessage: "child done", + requesterOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + requesterSessionOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + completionDirectOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + directOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + requesterIsSubagent: false, + expectsCompletionMessage: true, + bestEffortDeliver: true, + directIdempotencyKey: "announce-1b", + }); + + expect(callGateway).toHaveBeenCalledWith( + expect.objectContaining({ + method: "agent", + params: expect.objectContaining({ + deliver: true, + channel: "slack", + accountId: "acct-1", + to: "channel:C123", + threadId: "171.222", + bestEffortDeliver: true, + }), + }), + ); + }); + + it("keeps direct external delivery for non-completion announces", async () => { + const callGateway = vi.fn( + async () => ({}) as Record, + ) as unknown as typeof runtimeCallGateway; + __testing.setDepsForTest({ + callGateway, + getRequesterSessionActivity: () => ({ + sessionId: "requester-session-3", + isActive: false, + }), + loadConfig: () => ({}) as never, + }); + + await deliverSubagentAnnouncement({ + requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222", + targetRequesterSessionKey: "agent:main:slack:channel:C123:thread:171.222", + triggerMessage: "child done", + steerMessage: "child done", + requesterOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + requesterSessionOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + completionDirectOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + directOrigin: { + channel: "slack", + to: "channel:C123", + accountId: "acct-1", + threadId: "171.222", + }, + requesterIsSubagent: false, + expectsCompletionMessage: false, + bestEffortDeliver: true, + directIdempotencyKey: "announce-2", + }); + + expect(callGateway).toHaveBeenCalledWith( + expect.objectContaining({ + method: "agent", + params: expect.objectContaining({ + deliver: true, + channel: "slack", + accountId: "acct-1", + to: "channel:C123", + threadId: "171.222", + bestEffortDeliver: true, + }), + }), + ); + }); +}); diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 7ce75de28ce..da2946e4900 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -25,6 +25,7 @@ import { loadConfig, loadSessionStore, queueEmbeddedPiMessage, + resolveActiveEmbeddedRunSessionId, resolveAgentIdFromSessionKey, resolveConversationIdFromTargets, resolveExternalBestEffortDeliveryTarget, @@ -49,16 +50,44 @@ const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000; type SubagentAnnounceDeliveryDeps = { callGateway: typeof callGateway; loadConfig: typeof loadConfig; + getRequesterSessionActivity: (requesterSessionKey: string) => { + sessionId?: string; + isActive: boolean; + }; + queueEmbeddedPiMessage: typeof queueEmbeddedPiMessage; }; const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = { callGateway, loadConfig, + getRequesterSessionActivity: (requesterSessionKey: string) => { + const sessionId = + resolveActiveEmbeddedRunSessionId(requesterSessionKey) ?? + loadRequesterSessionEntry(requesterSessionKey).entry?.sessionId; + return { + sessionId, + isActive: Boolean(sessionId && isEmbeddedPiRunActive(sessionId)), + }; + }, + queueEmbeddedPiMessage, }; let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = defaultSubagentAnnounceDeliveryDeps; +function resolveRequesterSessionActivity(requesterSessionKey: string) { + const activity = subagentAnnounceDeliveryDeps.getRequesterSessionActivity(requesterSessionKey); + if (activity.sessionId || activity.isActive) { + return activity; + } + const { entry } = loadRequesterSessionEntry(requesterSessionKey); + const sessionId = entry?.sessionId; + return { + sessionId, + isActive: Boolean(sessionId && isEmbeddedPiRunActive(sessionId)), + }; +} + function resolveDirectAnnounceTransientRetryDelaysMs() { return process.env.OPENCLAW_TEST_FAST === "1" ? ([8, 16, 32] as const) @@ -348,7 +377,7 @@ async function maybeQueueSubagentAnnounce(params: { } const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey); const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); - const sessionId = entry?.sessionId; + const { sessionId, isActive } = resolveRequesterSessionActivity(canonicalKey); if (!sessionId) { return "none"; } @@ -358,11 +387,13 @@ async function maybeQueueSubagentAnnounce(params: { channel: entry?.channel ?? entry?.lastChannel ?? entry?.origin?.provider, sessionEntry: entry, }); - const isActive = isEmbeddedPiRunActive(sessionId); const shouldSteer = queueSettings.mode === "steer" || queueSettings.mode === "steer-backlog"; if (shouldSteer) { - const steered = queueEmbeddedPiMessage(sessionId, params.steerMessage); + const steered = subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage( + sessionId, + params.steerMessage, + ); if (steered) { return "steered"; } @@ -457,6 +488,26 @@ async function sendSubagentAnnounceDirectly(params: { isGatewayMessageChannel(normalizedSessionOnlyOriginChannel) ? normalizedSessionOnlyOriginChannel : undefined; + const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey); + if (params.expectsCompletionMessage && requesterActivity.isActive) { + const woke = requesterActivity.sessionId + ? subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage( + requesterActivity.sessionId, + params.triggerMessage, + ) + : false; + if (woke) { + return { + delivered: true, + path: "steered", + }; + } + return { + delivered: false, + path: "direct", + error: "active requester session could not be woken", + }; + } if (params.signal?.aborted) { return { delivered: false, diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index e0393ea28e6..10bca5ccd99 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -1702,7 +1702,7 @@ describe("subagent announce formatting", () => { expect(new Set(idempotencyKeys).size).toBe(2); }); - it("prefers direct delivery first for completion-mode and then queues on direct failure", async () => { + it("falls back to queued follow-up delivery when an active completion wake cannot be injected", async () => { embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); sessionStore = { @@ -1729,15 +1729,11 @@ describe("subagent announce formatting", () => { expect(didAnnounce).toBe(true); expect(sendSpy).not.toHaveBeenCalled(); - expect(agentSpy).toHaveBeenCalledTimes(2); + expect(agentSpy).toHaveBeenCalledTimes(1); expect(agentSpy.mock.calls[0]?.[0]).toMatchObject({ method: "agent", params: { sessionKey: "agent:main:main", channel: "whatsapp", to: "+1555", deliver: true }, }); - expect(agentSpy.mock.calls[1]?.[0]).toMatchObject({ - method: "agent", - params: { sessionKey: "agent:main:main" }, - }); }); it("falls back to internal requester-session injection when completion route is missing", async () => {