From 4dfecfbca882e307b1d7f7db73cf4235a3a5b1de Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Fri, 17 Apr 2026 00:25:38 -0400 Subject: [PATCH] Matrix: preserve subagent thread delivery --- .../matrix/src/matrix/subagent-hooks.test.ts | 43 +++++-- .../matrix/src/matrix/subagent-hooks.ts | 35 +++++- src/agents/subagent-spawn.runtime.ts | 7 +- src/agents/subagent-spawn.test-helpers.ts | 32 +++++ .../subagent-spawn.thread-binding.test.ts | 110 ++++++++++++++++++ src/agents/subagent-spawn.ts | 79 +++++++++++-- .../channel-outbound-send.test.ts | 29 +++++ src/cli/send-runtime/channel-outbound-send.ts | 27 +++-- .../outbound/session-binding-service.test.ts | 31 +++-- src/infra/outbound/session-binding-service.ts | 4 +- src/plugins/hook-types.ts | 6 + src/plugins/hooks.ts | 2 + 12 files changed, 358 insertions(+), 47 deletions(-) create mode 100644 src/agents/subagent-spawn.thread-binding.test.ts diff --git a/extensions/matrix/src/matrix/subagent-hooks.test.ts b/extensions/matrix/src/matrix/subagent-hooks.test.ts index 1a507f94b57..aade99dd3af 100644 --- a/extensions/matrix/src/matrix/subagent-hooks.test.ts +++ b/extensions/matrix/src/matrix/subagent-hooks.test.ts @@ -72,7 +72,13 @@ describe("handleMatrixSubagentSpawning", () => { // Default: manager exists getManagerMock.mockReturnValue({ persist: vi.fn() }); // Default: bind resolves ok - bindMock.mockResolvedValue({ conversation: {} }); + bindMock.mockResolvedValue({ + conversation: { + accountId: "default", + conversationId: "$thread-root", + parentConversationId: "!room123:example.org", + }, + }); }); it("returns undefined when threadRequested is false", async () => { @@ -174,7 +180,13 @@ describe("handleMatrixSubagentSpawning", () => { }); it("calls bind with the resolved room id and returns ok", async () => { - bindMock.mockResolvedValue({ conversation: {} }); + bindMock.mockResolvedValue({ + conversation: { + accountId: "ops", + conversationId: "$thread-ops", + parentConversationId: "!roomAbc:technerik.com", + }, + }); const result = await handleMatrixSubagentSpawning( fakeApi, makeSpawnEvent({ @@ -202,11 +214,26 @@ describe("handleMatrixSubagentSpawning", () => { }), }), ); - expect(result).toEqual({ status: "ok", threadBindingReady: true }); + expect(result).toMatchObject({ + status: "ok", + threadBindingReady: true, + deliveryOrigin: { + channel: "matrix", + accountId: "ops", + to: "room:!roomAbc:technerik.com", + threadId: "$thread-ops", + }, + }); }); it("uses 'default' as accountId when requester.accountId is absent", async () => { - bindMock.mockResolvedValue({ conversation: {} }); + bindMock.mockResolvedValue({ + conversation: { + accountId: "default", + conversationId: "$thread-default", + parentConversationId: "!room123:example.org", + }, + }); await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ accountId: undefined as never })); expect(getManagerMock).toHaveBeenCalledWith("default"); expect(bindMock).toHaveBeenCalledWith( @@ -241,7 +268,7 @@ describe("handleMatrixSubagentSpawning", () => { fakeApi, makeSpawnEvent({ accountId: "forge" }), ); - expect(result).toEqual({ status: "ok", threadBindingReady: true }); + expect(result).toMatchObject({ status: "ok", threadBindingReady: true }); }); }); @@ -600,8 +627,8 @@ describe("concurrent spawns across accounts", () => { }), ]); - expect(opsResult).toEqual({ status: "ok", threadBindingReady: true }); - expect(forgeResult).toEqual({ status: "ok", threadBindingReady: true }); + expect(opsResult).toMatchObject({ status: "ok", threadBindingReady: true }); + expect(forgeResult).toMatchObject({ status: "ok", threadBindingReady: true }); expect(bindMock).toHaveBeenCalledTimes(2); // Each bind call targeted the correct account's room @@ -651,6 +678,6 @@ describe("concurrent spawns across accounts", () => { error: expect.stringContaining("ops provider auth failed"), }), ); - expect(forgeResult).toEqual({ status: "ok", threadBindingReady: true }); + expect(forgeResult).toMatchObject({ status: "ok", threadBindingReady: true }); }); }); diff --git a/extensions/matrix/src/matrix/subagent-hooks.ts b/extensions/matrix/src/matrix/subagent-hooks.ts index 1b2ee727dd4..61b9efe262c 100644 --- a/extensions/matrix/src/matrix/subagent-hooks.ts +++ b/extensions/matrix/src/matrix/subagent-hooks.ts @@ -42,7 +42,16 @@ type MatrixSubagentDeliveryTargetEvent = { }; type SpawningResult = - | { status: "ok"; threadBindingReady?: boolean } + | { + status: "ok"; + threadBindingReady?: boolean; + deliveryOrigin?: { + channel: string; + accountId: string; + to: string; + threadId?: string; + }; + } | { status: "error"; error: string }; type DeliveryTargetResult = { @@ -109,7 +118,7 @@ export async function handleMatrixSubagentSpawning( status: "error", error: "Matrix thread bindings are disabled (set channels.matrix.threadBindings.enabled=true to override for this account, or session.threadBindings.enabled=true globally).", - }; + } satisfies SpawningResult; } if (!flags.spawnSubagentSessions) { return { @@ -152,7 +161,7 @@ export async function handleMatrixSubagentSpawning( // // We do NOT call setBindingRecord here — the adapter's bind() handles // record creation, thread creation, and persistence atomically. - await getSessionBindingService().bind({ + const binding = await getSessionBindingService().bind({ targetSessionKey: event.childSessionKey, targetKind: "subagent", conversation: { @@ -167,14 +176,30 @@ export async function handleMatrixSubagentSpawning( boundBy: "system", }, }); + const boundRoomId = + binding.conversation.parentConversationId ?? binding.conversation.conversationId; + const threadId = + binding.conversation.parentConversationId && + binding.conversation.parentConversationId !== binding.conversation.conversationId + ? binding.conversation.conversationId + : undefined; + const result = { + status: "ok", + threadBindingReady: true, + deliveryOrigin: { + channel: "matrix", + accountId: binding.conversation.accountId ?? accountId, + to: `room:${boundRoomId}`, + ...(threadId ? { threadId } : {}), + }, + } satisfies SpawningResult; + return result; } catch (err) { return { status: "error", error: `Matrix thread bind failed: ${summarizeError(err)}`, }; } - - return { status: "ok", threadBindingReady: true }; } export async function handleMatrixSubagentEnded(event: MatrixSubagentEndedEvent): Promise { diff --git a/src/agents/subagent-spawn.runtime.ts b/src/agents/subagent-spawn.runtime.ts index 2dbfef61369..e5973519456 100644 --- a/src/agents/subagent-spawn.runtime.ts +++ b/src/agents/subagent-spawn.runtime.ts @@ -10,7 +10,12 @@ export { } from "../gateway/session-utils.js"; export { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; export { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; -export { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; +export { + mergeDeliveryContext, + normalizeDeliveryContext, +} from "../utils/delivery-context.shared.js"; +export { resolveConversationDeliveryTarget } from "../utils/delivery-context.js"; +export { getSessionBindingService } from "../infra/outbound/session-binding-service.js"; export { resolveAgentConfig } from "./agent-scope.js"; export { AGENT_LANE_SUBAGENT } from "./lanes.js"; export { resolveSubagentSpawnModelSelection } from "./model-selection.js"; diff --git a/src/agents/subagent-spawn.test-helpers.ts b/src/agents/subagent-spawn.test-helpers.ts index c0fe1d426a0..e1e46470232 100644 --- a/src/agents/subagent-spawn.test-helpers.ts +++ b/src/agents/subagent-spawn.test-helpers.ts @@ -125,6 +125,22 @@ export async function loadSubagentSpawnModuleForTest(params: { cfg?: Record; sessionKey?: string; }) => { sandboxed: boolean }; + getSessionBindingService?: () => { + listBySession: (targetSessionKey: string) => Array<{ + status?: string; + conversation: { + channel: string; + accountId?: string; + conversationId: string; + parentConversationId?: string; + }; + }>; + }; + resolveConversationDeliveryTarget?: (params: { + channel?: string; + conversationId?: string | number; + parentConversationId?: string | number; + }) => { to?: string; threadId?: string }; workspaceDir?: string; sessionStorePath?: string; resetModules?: boolean; @@ -165,6 +181,22 @@ export async function loadSubagentSpawnModuleForTest(params: { isAdminOnlyMethod: (method: string) => method === "sessions.patch" || method === "sessions.delete", pruneLegacyStoreKeys: (...args: unknown[]) => params.pruneLegacyStoreKeysMock?.(...args), + getSessionBindingService: + params.getSessionBindingService ?? (() => ({ listBySession: () => [] })), + resolveConversationDeliveryTarget: + params.resolveConversationDeliveryTarget ?? + ((targetParams: { channel?: string; conversationId?: string | number }) => ({ + to: targetParams.conversationId + ? `channel:${String(targetParams.conversationId)}` + : undefined, + })), + mergeDeliveryContext: ( + primary?: Record, + fallback?: Record, + ) => ({ + ...fallback, + ...primary, + }), resolveGatewaySessionStoreTarget: (targetParams: { key: string }) => ({ agentId: "main", storePath: params.sessionStorePath ?? "/tmp/subagent-spawn-model-session.json", diff --git a/src/agents/subagent-spawn.thread-binding.test.ts b/src/agents/subagent-spawn.thread-binding.test.ts new file mode 100644 index 00000000000..f0b804941bf --- /dev/null +++ b/src/agents/subagent-spawn.thread-binding.test.ts @@ -0,0 +1,110 @@ +import os from "node:os"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + createSubagentSpawnTestConfig, + installSessionStoreCaptureMock, + loadSubagentSpawnModuleForTest, +} from "./subagent-spawn.test-helpers.js"; +import { installAcceptedSubagentGatewayMock } from "./test-helpers/subagent-gateway.js"; + +const hoisted = vi.hoisted(() => ({ + callGatewayMock: vi.fn(), + updateSessionStoreMock: vi.fn(), + registerSubagentRunMock: vi.fn(), + emitSessionLifecycleEventMock: vi.fn(), + hookRunner: { + hasHooks: vi.fn(), + runSubagentSpawning: vi.fn(), + }, +})); + +describe("spawnSubagentDirect thread binding delivery", () => { + beforeEach(() => { + vi.resetModules(); + hoisted.callGatewayMock.mockReset(); + hoisted.updateSessionStoreMock.mockReset(); + hoisted.registerSubagentRunMock.mockReset(); + hoisted.emitSessionLifecycleEventMock.mockReset(); + hoisted.hookRunner.hasHooks.mockReset(); + hoisted.hookRunner.runSubagentSpawning.mockReset(); + installAcceptedSubagentGatewayMock(hoisted.callGatewayMock); + installSessionStoreCaptureMock(hoisted.updateSessionStoreMock); + }); + + it("seeds a thread-bound child session from the binding created during spawn", async () => { + hoisted.hookRunner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "subagent_spawning", + ); + hoisted.hookRunner.runSubagentSpawning.mockResolvedValue({ + status: "ok", + threadBindingReady: true, + deliveryOrigin: { + channel: "matrix", + accountId: "sut", + to: "room:!room:example", + threadId: "$thread-root", + }, + }); + const { spawnSubagentDirect } = await loadSubagentSpawnModuleForTest({ + callGatewayMock: hoisted.callGatewayMock, + loadConfig: () => + createSubagentSpawnTestConfig(os.tmpdir(), { + agents: { + defaults: { + workspace: os.tmpdir(), + }, + list: [{ id: "main", workspace: "/tmp/workspace-main" }], + }, + }), + updateSessionStoreMock: hoisted.updateSessionStoreMock, + registerSubagentRunMock: hoisted.registerSubagentRunMock, + emitSessionLifecycleEventMock: hoisted.emitSessionLifecycleEventMock, + hookRunner: hoisted.hookRunner, + getSessionBindingService: () => ({ listBySession: () => [] }), + resolveConversationDeliveryTarget: () => ({ + to: "room:!room:example", + threadId: "$thread-root", + }), + resolveSubagentSpawnModelSelection: () => "openai-codex/gpt-5.4", + resolveSandboxRuntimeStatus: () => ({ sandboxed: false }), + }); + + const result = await spawnSubagentDirect( + { + task: "reply with a marker", + thread: true, + mode: "session", + }, + { + agentSessionKey: "agent:main:main", + agentChannel: "matrix", + agentAccountId: "sut", + agentTo: "room:!room:example", + }, + ); + + expect(result.status).toBe("accepted"); + const agentCall = hoisted.callGatewayMock.mock.calls.find( + ([call]) => (call as { method?: string }).method === "agent", + )?.[0] as { params?: Record } | undefined; + expect(agentCall?.params).toMatchObject({ + channel: "matrix", + accountId: "sut", + to: "room:!room:example", + threadId: "$thread-root", + deliver: true, + }); + expect(hoisted.registerSubagentRunMock).toHaveBeenCalledWith( + expect.objectContaining({ + requesterOrigin: { + channel: "matrix", + accountId: "sut", + to: "room:!room:example", + threadId: "$thread-root", + }, + expectsCompletionMessage: false, + spawnMode: "session", + }), + ); + }); +}); diff --git a/src/agents/subagent-spawn.ts b/src/agents/subagent-spawn.ts index 8a450706399..ebb2a23595a 100644 --- a/src/agents/subagent-spawn.ts +++ b/src/agents/subagent-spawn.ts @@ -40,10 +40,13 @@ import { emitSessionLifecycleEvent, getGlobalHookRunner, loadConfig, + getSessionBindingService, mergeSessionEntry, + mergeDeliveryContext, normalizeDeliveryContext, pruneLegacyStoreKeys, resolveAgentConfig, + resolveConversationDeliveryTarget, resolveDisplaySessionKey, resolveGatewaySessionStoreTarget, resolveInternalSessionKey, @@ -71,6 +74,13 @@ type SubagentSpawnDeps = { updateSessionStore: typeof updateSessionStore; }; +type SubagentDeliveryOrigin = { + channel?: string; + accountId?: string; + to?: string; + threadId?: string | number; +}; + const defaultSubagentSpawnDeps: SubagentSpawnDeps = { callGateway, getGlobalHookRunner, @@ -295,7 +305,9 @@ async function ensureThreadBindingForSubagentSpawn(params: { to?: string; threadId?: string | number; }; -}): Promise<{ status: "ok" } | { status: "error"; error: string }> { +}): Promise< + { status: "ok"; deliveryOrigin?: SubagentDeliveryOrigin } | { status: "error"; error: string } +> { const hookRunner = params.hookRunner; if (!hookRunner?.hasHooks("subagent_spawning")) { return { @@ -334,7 +346,13 @@ async function ensureThreadBindingForSubagentSpawn(params: { "Unable to create or bind a thread for this subagent session. Session mode is unavailable for this target.", }; } - return { status: "ok" }; + const deliveryOrigin = + normalizeDeliveryContext(result.deliveryOrigin) ?? + resolveThreadBindingDeliveryOrigin(params.childSessionKey); + return { + status: "ok", + ...(deliveryOrigin ? { deliveryOrigin } : {}), + }; } catch (err) { return { status: "error", @@ -343,6 +361,32 @@ async function ensureThreadBindingForSubagentSpawn(params: { } } +function resolveThreadBindingDeliveryOrigin( + childSessionKey: string, +): SubagentDeliveryOrigin | undefined { + const activeBindings = getSessionBindingService() + .listBySession(childSessionKey) + .filter((binding) => binding.status === "active"); + if (activeBindings.length !== 1) { + return undefined; + } + const binding = activeBindings[0]; + if (!binding) { + return undefined; + } + const target = resolveConversationDeliveryTarget({ + channel: binding.conversation.channel, + conversationId: binding.conversation.conversationId, + parentConversationId: binding.conversation.parentConversationId, + }); + return normalizeDeliveryContext({ + channel: binding.conversation.channel, + accountId: binding.conversation.accountId, + to: target.to, + threadId: target.threadId, + }) as SubagentDeliveryOrigin | undefined; +} + export async function spawnSubagentDirect( params: SpawnSubagentParams, ctx: SpawnSubagentContext, @@ -387,7 +431,8 @@ export async function spawnSubagentDirect( accountId: ctx.agentAccountId, to: ctx.agentTo, threadId: ctx.agentThreadId, - }); + }) as SubagentDeliveryOrigin | undefined; + let childSessionOrigin = requesterOrigin; const hookRunner = subagentSpawnDeps.getGlobalHookRunner(); const cfg = loadSubagentConfig(); @@ -597,12 +642,14 @@ export async function spawnSubagentDirect( }; } threadBindingReady = true; + childSessionOrigin = + mergeDeliveryContext(bindResult.deliveryOrigin, requesterOrigin) ?? childSessionOrigin; } const mountPathHint = sanitizeMountPathHint(params.attachMountPath); let childSystemPrompt = buildSubagentSystemPrompt({ requesterSessionKey, - requesterOrigin, + requesterOrigin: childSessionOrigin, childSessionKey, label: label || undefined, task, @@ -698,6 +745,13 @@ export async function spawnSubagentDirect( const childIdem = crypto.randomUUID(); let childRunId: string = childIdem; + const deliverInitialChildRunDirectly = + requestThreadBinding && + spawnMode === "session" && + Boolean(childSessionOrigin?.channel && childSessionOrigin.to); + const shouldAnnounceCompletion = deliverInitialChildRunDirectly + ? false + : expectsCompletionMessage; try { const { spawnedBy: _spawnedBy, @@ -709,12 +763,13 @@ export async function spawnSubagentDirect( params: { message: childTaskMessage, sessionKey: childSessionKey, - channel: requesterOrigin?.channel, - to: requesterOrigin?.to ?? undefined, - accountId: requesterOrigin?.accountId ?? undefined, - threadId: requesterOrigin?.threadId != null ? String(requesterOrigin.threadId) : undefined, + channel: childSessionOrigin?.channel, + to: childSessionOrigin?.to ?? undefined, + accountId: childSessionOrigin?.accountId ?? undefined, + threadId: + childSessionOrigin?.threadId != null ? String(childSessionOrigin.threadId) : undefined, idempotencyKey: childIdem, - deliver: false, + deliver: deliverInitialChildRunDirectly, lane: AGENT_LANE_SUBAGENT, extraSystemPrompt: childSystemPrompt, thinking: thinkingOverride, @@ -754,7 +809,7 @@ export async function spawnSubagentDirect( targetKind: "subagent", reason: "spawn-failed", sendFarewell: true, - accountId: requesterOrigin?.accountId, + accountId: childSessionOrigin?.accountId, runId: childRunId, outcome: "error", error: "Session failed to start", @@ -802,7 +857,7 @@ export async function spawnSubagentDirect( childSessionKey, controllerSessionKey: requesterInternalKey, requesterSessionKey: requesterInternalKey, - requesterOrigin, + requesterOrigin: childSessionOrigin, requesterDisplayKey, task, cleanup, @@ -810,7 +865,7 @@ export async function spawnSubagentDirect( model: resolvedModel, workspaceDir: spawnedMetadata.workspaceDir, runTimeoutSeconds, - expectsCompletionMessage, + expectsCompletionMessage: shouldAnnounceCompletion, spawnMode, attachmentsDir: attachmentAbsDir, attachmentsRootDir: attachmentRootDir, diff --git a/src/cli/send-runtime/channel-outbound-send.test.ts b/src/cli/send-runtime/channel-outbound-send.test.ts index 684cae030e1..f37ae9502a4 100644 --- a/src/cli/send-runtime/channel-outbound-send.test.ts +++ b/src/cli/send-runtime/channel-outbound-send.test.ts @@ -86,6 +86,35 @@ describe("createChannelOutboundRuntimeSend", () => { ); }); + it("accepts plugin outbound thread and reply aliases", async () => { + const sendText = vi.fn(async () => ({ channel: "matrix", messageId: "$reply" })); + mocks.loadChannelOutboundAdapter.mockResolvedValue({ + sendText, + }); + + const { createChannelOutboundRuntimeSend } = await import("./channel-outbound-send.js"); + const runtimeSend = createChannelOutboundRuntimeSend({ + channelId: "matrix" as never, + unavailableMessage: "unavailable", + }); + + await runtimeSend.sendMessage("room:!ops:example.org", "hello thread", { + cfg: {}, + accountId: "sut", + replyToId: "$parent", + threadId: "$thread-root", + }); + + expect(sendText).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "sut", + replyToId: "$parent", + threadId: "$thread-root", + to: "room:!ops:example.org", + }), + ); + }); + it("falls back to sendText when media is present but sendMedia is unavailable", async () => { const sendText = vi.fn(async () => ({ channel: "whatsapp", messageId: "wa-3" })); mocks.loadChannelOutboundAdapter.mockResolvedValue({ diff --git a/src/cli/send-runtime/channel-outbound-send.ts b/src/cli/send-runtime/channel-outbound-send.ts index 4649cb28b77..e39cc3c6202 100644 --- a/src/cli/send-runtime/channel-outbound-send.ts +++ b/src/cli/send-runtime/channel-outbound-send.ts @@ -12,7 +12,9 @@ type RuntimeSendOpts = { mediaLocalRoots?: readonly string[]; mediaReadFile?: (filePath: string) => Promise; accountId?: string; + threadId?: string | number | null; messageThreadId?: string | number; + replyToId?: string | number | null; replyToMessageId?: string | number; silent?: boolean; forceDocument?: boolean; @@ -20,6 +22,15 @@ type RuntimeSendOpts = { gatewayClientScopes?: readonly string[]; }; +function resolveRuntimeThreadId(opts: RuntimeSendOpts): string | number | undefined { + return opts.messageThreadId ?? opts.threadId ?? undefined; +} + +function resolveRuntimeReplyToId(opts: RuntimeSendOpts): string | undefined { + const raw = opts.replyToMessageId ?? opts.replyToId; + return raw == null ? undefined : normalizeOptionalString(String(raw)); +} + export function createChannelOutboundRuntimeSend(params: { channelId: ChannelId; unavailableMessage: string; @@ -27,6 +38,8 @@ export function createChannelOutboundRuntimeSend(params: { return { sendMessage: async (to: string, text: string, opts: RuntimeSendOpts = {}) => { const outbound = await loadChannelOutboundAdapter(params.channelId); + const threadId = resolveRuntimeThreadId(opts); + const replyToId = resolveRuntimeReplyToId(opts); const hasMedia = Boolean(opts.mediaUrl); if (hasMedia && outbound?.sendMedia) { return await outbound.sendMedia({ @@ -38,11 +51,8 @@ export function createChannelOutboundRuntimeSend(params: { mediaLocalRoots: opts.mediaLocalRoots, mediaReadFile: opts.mediaReadFile, accountId: opts.accountId, - threadId: opts.messageThreadId, - replyToId: - opts.replyToMessageId == null - ? undefined - : normalizeOptionalString(String(opts.replyToMessageId)), + threadId, + replyToId, silent: opts.silent, forceDocument: opts.forceDocument, gifPlayback: opts.gifPlayback, @@ -61,11 +71,8 @@ export function createChannelOutboundRuntimeSend(params: { mediaLocalRoots: opts.mediaLocalRoots, mediaReadFile: opts.mediaReadFile, accountId: opts.accountId, - threadId: opts.messageThreadId, - replyToId: - opts.replyToMessageId == null - ? undefined - : normalizeOptionalString(String(opts.replyToMessageId)), + threadId, + replyToId, silent: opts.silent, forceDocument: opts.forceDocument, gifPlayback: opts.gifPlayback, diff --git a/src/infra/outbound/session-binding-service.test.ts b/src/infra/outbound/session-binding-service.test.ts index 8083c4664ae..bd59d20823c 100644 --- a/src/infra/outbound/session-binding-service.test.ts +++ b/src/infra/outbound/session-binding-service.test.ts @@ -437,7 +437,7 @@ describe("session binding service", () => { } }); - it("keeps the first live adapter authoritative until it unregisters", () => { + it("keeps the newest live adapter authoritative until it unregisters", () => { const firstBinding = { bindingId: "first-binding", targetSessionKey: "agent:main", @@ -457,17 +457,30 @@ describe("session binding service", () => { targetSessionKey === "agent:main" ? [firstBinding] : [], resolveByConversation: () => null, }; + const secondBinding = { + bindingId: "second-binding", + targetSessionKey: "agent:main", + targetKind: "session" as const, + conversation: { + channel: "demo-binding", + accountId: "default", + conversationId: "thread-2", + }, + status: "active" as const, + boundAt: 2, + }; const secondAdapter: SessionBindingAdapter = { channel: "Demo-Binding", accountId: "DEFAULT", - listBySession: () => [], + listBySession: (targetSessionKey) => + targetSessionKey === "agent:main" ? [secondBinding] : [], resolveByConversation: () => null, }; registerSessionBindingAdapter(firstAdapter); registerSessionBindingAdapter(secondAdapter); - expect(getSessionBindingService().listBySession("agent:main")).toEqual([firstBinding]); + expect(getSessionBindingService().listBySession("agent:main")).toEqual([secondBinding]); unregisterSessionBindingAdapter({ channel: "demo-binding", @@ -529,13 +542,13 @@ describe("session binding service", () => { conversationId: "thread-1", }), }); - expect(firstBind).toHaveBeenCalledTimes(1); - expect(secondBind).not.toHaveBeenCalled(); + expect(firstBind).not.toHaveBeenCalled(); + expect(secondBind).toHaveBeenCalledTimes(1); - first.unregisterSessionBindingAdapter({ + second.unregisterSessionBindingAdapter({ channel: "demo-binding", accountId: "default", - adapter: firstAdapter, + adapter: secondAdapter, }); await expect( @@ -558,10 +571,10 @@ describe("session binding service", () => { expect(firstBind).toHaveBeenCalledTimes(1); expect(secondBind).toHaveBeenCalledTimes(1); - second.unregisterSessionBindingAdapter({ + first.unregisterSessionBindingAdapter({ channel: "demo-binding", accountId: "default", - adapter: secondAdapter, + adapter: firstAdapter, }); await expect( diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index 39d088942d0..0b520e56a89 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -135,7 +135,7 @@ const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap registrations[0]?.normalizedAdapter ?? null) + .map((registrations) => registrations.at(-1)?.normalizedAdapter ?? null) .filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter)); } diff --git a/src/plugins/hook-types.ts b/src/plugins/hook-types.ts index a5186685f52..765c1f3a1f2 100644 --- a/src/plugins/hook-types.ts +++ b/src/plugins/hook-types.ts @@ -421,6 +421,12 @@ export type PluginHookSubagentSpawningResult = | { status: "ok"; threadBindingReady?: boolean; + deliveryOrigin?: { + channel?: string; + accountId?: string; + to?: string; + threadId?: string | number; + }; } | { status: "error"; diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index b7d3e5f4d22..5972531f413 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -258,9 +258,11 @@ export function createHookRunner( if (next.status === "error") { return next; } + const deliveryOrigin = next.deliveryOrigin ?? acc?.deliveryOrigin; return { status: "ok", threadBindingReady: Boolean(acc?.threadBindingReady || next.threadBindingReady), + ...(deliveryOrigin ? { deliveryOrigin } : {}), }; };