From 1ce25961950d7065b8bcc4638e96d935193e76dc Mon Sep 17 00:00:00 2001 From: EE Date: Fri, 17 Apr 2026 01:17:56 -0500 Subject: [PATCH] matrix: fix sessions_spawn --thread subagent session spawning (#67643) Merged via squash. Prepared head SHA: 1e5127e217e5c3c30d8430a689e13e8c64056e6a Co-authored-by: eejohnso-ops <238848106+eejohnso-ops@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 1 + extensions/matrix/index.test.ts | 42 + extensions/matrix/index.ts | 22 + .../matrix/src/matrix/subagent-hooks.test.ts | 734 ++++++++++++++++++ .../matrix/src/matrix/subagent-hooks.ts | 311 ++++++++ .../src/matrix/thread-bindings-shared.ts | 5 + .../matrix/src/matrix/thread-bindings.ts | 1 + src/agents/subagent-spawn.runtime.ts | 7 +- src/agents/subagent-spawn.test-helpers.ts | 32 + .../subagent-spawn.thread-binding.test.ts | 183 +++++ src/agents/subagent-spawn.ts | 47 +- .../channel-outbound-send.test.ts | 29 + src/cli/send-runtime/channel-outbound-send.ts | 55 +- .../outbound/session-binding-service.test.ts | 31 +- src/infra/outbound/session-binding-service.ts | 4 +- src/plugins/hook-types.ts | 6 + src/plugins/hooks.ts | 2 + src/plugins/wired-hooks-subagent.test.ts | 64 +- 18 files changed, 1521 insertions(+), 55 deletions(-) create mode 100644 extensions/matrix/src/matrix/subagent-hooks.test.ts create mode 100644 extensions/matrix/src/matrix/subagent-hooks.ts create mode 100644 src/agents/subagent-spawn.thread-binding.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 36625a134e1..8af2cd93340 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai - Memory/sqlite-vec: emit the degraded sqlite-vec warning once per degraded episode instead of repeating it for every file write, while preserving the latch across safe-reindex rollback and resetting it when vector state is genuinely rebuilt. (#67898) Thanks @rubencu. - Reply/block streaming: preserve post-stream incomplete-turn error payloads after block streaming already emitted content, so users get the warning instead of silence. (#67991) Thanks @obviyus. - Telegram/streaming: clear the compaction replay guard after visible non-final boundaries so a post-tool assistant reply rotates to a fresh preview instead of editing the pre-compaction message. (#67993) Thanks @obviyus. +- Matrix: fix `sessions_spawn --thread` subagent session spawning — thread binding creation, cleanup on session end, and completion-message delivery target resolution now work end-to-end. (#67643) Thanks @eejohnso-ops and @gumadeiras. ## 2026.4.15 diff --git a/extensions/matrix/index.test.ts b/extensions/matrix/index.test.ts index 6f2dcddcc9f..60c42c1c257 100644 --- a/extensions/matrix/index.test.ts +++ b/extensions/matrix/index.test.ts @@ -7,12 +7,23 @@ const cliMocks = vi.hoisted(() => ({ registerMatrixCli: vi.fn(), })); +const runtimeMocks = vi.hoisted(() => ({ + ensureMatrixCryptoRuntime: vi.fn(async () => {}), + handleVerificationBootstrap: vi.fn(async () => {}), + handleVerificationStatus: vi.fn(async () => {}), + handleVerifyRecoveryKey: vi.fn(async () => {}), + setMatrixRuntime: vi.fn(), +})); + vi.mock("./src/cli.js", () => { return { registerMatrixCli: cliMocks.registerMatrixCli, }; }); +vi.mock("./plugin-entry.handlers.runtime.js", () => runtimeMocks); +vi.mock("./runtime-api.js", () => ({ setMatrixRuntime: runtimeMocks.setMatrixRuntime })); + describe("matrix plugin", () => { it("registers matrix CLI through a descriptor-backed lazy registrar", async () => { const registerCli = vi.fn(); @@ -56,4 +67,35 @@ describe("matrix plugin", () => { expect(entry.id).toBe("matrix"); expect(entry.name).toBe("Matrix"); }); + + it("registers subagent lifecycle hooks during full registration", () => { + const on = vi.fn(); + const registerChannel = vi.fn(); + const registerGatewayMethod = vi.fn(); + const api = createTestPluginApi({ + id: "matrix", + name: "Matrix", + source: "test", + config: {}, + runtime: {} as never, + registrationMode: "full", + on, + registerChannel, + registerGatewayMethod, + }); + + entry.register(api); + + expect(registerChannel).toHaveBeenCalledWith({ + plugin: expect.objectContaining({ id: "matrix" }), + }); + expect(on.mock.calls.map(([hookName]) => hookName)).toEqual([ + "subagent_spawning", + "subagent_ended", + "subagent_delivery_target", + ]); + for (const [, handler] of on.mock.calls) { + expect(handler).toEqual(expect.any(Function)); + } + }); }); diff --git a/extensions/matrix/index.ts b/extensions/matrix/index.ts index 9dd62e49be0..e46b307e1b6 100644 --- a/extensions/matrix/index.ts +++ b/extensions/matrix/index.ts @@ -2,6 +2,15 @@ import { defineBundledChannelEntry } from "openclaw/plugin-sdk/channel-entry-con import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { registerMatrixCliMetadata } from "./cli-metadata.js"; +type MatrixSubagentHooksModule = typeof import("./src/matrix/subagent-hooks.js"); + +let matrixSubagentHooksPromise: Promise | null = null; + +function loadMatrixSubagentHooksModule() { + matrixSubagentHooksPromise ??= import("./src/matrix/subagent-hooks.js"); + return matrixSubagentHooksPromise; +} + export default defineBundledChannelEntry({ id: "matrix", name: "Matrix", @@ -47,5 +56,18 @@ export default defineBundledChannelEntry({ const { handleVerificationStatus } = await import("./plugin-entry.handlers.runtime.js"); await handleVerificationStatus(ctx); }); + + api.on("subagent_spawning", async (event) => { + const { handleMatrixSubagentSpawning } = await loadMatrixSubagentHooksModule(); + return await handleMatrixSubagentSpawning(api, event); + }); + api.on("subagent_ended", async (event) => { + const { handleMatrixSubagentEnded } = await loadMatrixSubagentHooksModule(); + await handleMatrixSubagentEnded(event); + }); + api.on("subagent_delivery_target", async (event) => { + const { handleMatrixSubagentDeliveryTarget } = await loadMatrixSubagentHooksModule(); + return handleMatrixSubagentDeliveryTarget(event); + }); }, }); diff --git a/extensions/matrix/src/matrix/subagent-hooks.test.ts b/extensions/matrix/src/matrix/subagent-hooks.test.ts new file mode 100644 index 00000000000..2e19ebfdf4e --- /dev/null +++ b/extensions/matrix/src/matrix/subagent-hooks.test.ts @@ -0,0 +1,734 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +// Hoisted stubs referenced in vi.mock factories below +const bindMock = vi.hoisted(() => vi.fn()); +const unbindMock = vi.hoisted(() => vi.fn()); +const getManagerMock = vi.hoisted(() => vi.fn()); +const listAllBindingsMock = vi.hoisted(() => vi.fn((): any[] => [])); +const listBindingsForAccountMock = vi.hoisted(() => vi.fn((): any[] => [])); +const removeBindingRecordMock = vi.hoisted(() => vi.fn(() => false)); +const resolveMatrixBaseConfigMock = vi.hoisted(() => vi.fn((): any => ({}))); +const findMatrixAccountConfigMock = vi.hoisted(() => vi.fn((): any => undefined)); + +vi.mock("openclaw/plugin-sdk/conversation-binding-runtime", () => ({ + getSessionBindingService: () => ({ bind: bindMock, unbind: unbindMock }), +})); + +vi.mock("./account-config.js", () => ({ + resolveMatrixBaseConfig: resolveMatrixBaseConfigMock, + findMatrixAccountConfig: findMatrixAccountConfigMock, +})); + +vi.mock("./thread-bindings-shared.js", () => ({ + getMatrixThreadBindingManager: getManagerMock, + listAllBindings: listAllBindingsMock, + listBindingsForAccount: listBindingsForAccountMock, + removeBindingRecord: removeBindingRecordMock, + resolveBindingKey: (params: { + accountId: string; + conversationId: string; + parentConversationId?: string; + }) => + `${params.accountId}:${params.parentConversationId?.trim() || "-"}:${params.conversationId}`, +})); + +import { + handleMatrixSubagentDeliveryTarget, + handleMatrixSubagentEnded, + handleMatrixSubagentSpawning, +} from "./subagent-hooks.js"; + +// A minimal fake api — only config is used by these hooks +const fakeApi = { config: {} } as never; + +function makeSpawnEvent( + overrides: Partial<{ + threadRequested: boolean; + channel: string; + accountId: string; + to: string; + childSessionKey: string; + agentId: string; + label: string; + }> = {}, +) { + return { + threadRequested: overrides.threadRequested ?? true, + requester: { + channel: overrides.channel ?? "matrix", + accountId: overrides.accountId ?? "default", + to: overrides.to ?? "room:!room123:example.org", + }, + childSessionKey: overrides.childSessionKey ?? "agent:default:subagent:child", + agentId: overrides.agentId ?? "worker", + label: overrides.label, + }; +} + +describe("handleMatrixSubagentSpawning", () => { + beforeEach(() => { + bindMock.mockReset(); + getManagerMock.mockReset(); + resolveMatrixBaseConfigMock.mockReset(); + findMatrixAccountConfigMock.mockReset(); + // Default: bindings enabled, spawn enabled + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: true, spawnSubagentSessions: true }, + }); + findMatrixAccountConfigMock.mockReturnValue(undefined); + // Default: manager exists + getManagerMock.mockReturnValue({ persist: vi.fn() }); + // Default: bind resolves ok + bindMock.mockResolvedValue({ + conversation: { + accountId: "default", + conversationId: "$thread-root", + parentConversationId: "!room123:example.org", + }, + }); + }); + + it("returns undefined when threadRequested is false", async () => { + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ threadRequested: false }), + ); + expect(result).toBeUndefined(); + expect(bindMock).not.toHaveBeenCalled(); + }); + + it("returns undefined when channel is not matrix", async () => { + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ channel: "slack" }), + ); + expect(result).toBeUndefined(); + expect(bindMock).not.toHaveBeenCalled(); + }); + + it("proceeds past channel check when channel is 'matrix' with mixed casing", async () => { + // channel.trim().toLowerCase() must equal "matrix" — mixed case is accepted + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ channel: " Matrix " }), + ); + expect(result).not.toBeUndefined(); + }); + + it("returns error when thread bindings are disabled", async () => { + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: false, spawnSubagentSessions: true }, + }); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("thread bindings are disabled"), + }), + ); + }); + + it("returns error when spawnSubagentSessions is false", async () => { + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: true, spawnSubagentSessions: false }, + }); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("spawnSubagentSessions"), + }), + ); + }); + + it("returns error when spawnSubagentSessions defaults to false (no config)", async () => { + resolveMatrixBaseConfigMock.mockReturnValue({}); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("spawnSubagentSessions"), + }), + ); + }); + + it("returns error when requester.to has no room target", async () => { + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ to: "@user:example.org" }), + ); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("no room target"), + }), + ); + }); + + it("returns error when requester.to is empty", async () => { + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ to: "" })); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("no room target"), + }), + ); + }); + + it("returns error when no binding manager is available for the account", async () => { + getManagerMock.mockReturnValue(null); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("No Matrix thread binding manager"), + }), + ); + }); + + it("calls bind with the resolved room id and returns ok", async () => { + bindMock.mockResolvedValue({ + conversation: { + accountId: "ops", + conversationId: "$thread-ops", + parentConversationId: "!roomAbc:technerik.com", + }, + }); + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ + accountId: "ops", + to: "room:!roomAbc:technerik.com", + childSessionKey: "agent:ops:subagent:worker", + agentId: "builder", + label: "Build Agent", + }), + ); + + expect(bindMock).toHaveBeenCalledWith( + expect.objectContaining({ + targetSessionKey: "agent:ops:subagent:worker", + targetKind: "subagent", + conversation: expect.objectContaining({ + channel: "matrix", + accountId: "ops", + conversationId: "!roomAbc:technerik.com", + }), + placement: "child", + metadata: expect.objectContaining({ + agentId: "builder", + label: "Build Agent", + }), + }), + ); + expect(result).toMatchObject({ + status: "ok", + threadBindingReady: true, + deliveryOrigin: { + channel: "matrix", + accountId: "ops", + to: "room:!roomAbc:technerik.com", + threadId: "$thread-ops", + }, + }); + }); + + it("uses 'default' as accountId when requester.accountId is absent", async () => { + bindMock.mockResolvedValue({ + conversation: { + accountId: "default", + conversationId: "$thread-default", + parentConversationId: "!room123:example.org", + }, + }); + await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ accountId: undefined as never })); + expect(getManagerMock).toHaveBeenCalledWith("default"); + expect(bindMock).toHaveBeenCalledWith( + expect.objectContaining({ + conversation: expect.objectContaining({ accountId: "default" }), + }), + ); + }); + + it("returns error when bind() throws", async () => { + bindMock.mockRejectedValue(new Error("provider auth failed")); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("provider auth failed"), + }), + ); + }); + + it("respects per-account threadBindings override over base config", async () => { + // Base says spawnSubagentSessions=false; account override says true + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: true, spawnSubagentSessions: false }, + }); + findMatrixAccountConfigMock.mockReturnValue({ + threadBindings: { spawnSubagentSessions: true }, + }); + bindMock.mockResolvedValue({ conversation: {} }); + + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ accountId: "forge" }), + ); + expect(result).toMatchObject({ status: "ok", threadBindingReady: true }); + }); +}); + +describe("handleMatrixSubagentEnded", () => { + const mockManager = { persist: vi.fn() }; + + beforeEach(() => { + getManagerMock.mockReset(); + listAllBindingsMock.mockReset(); + listBindingsForAccountMock.mockReset(); + removeBindingRecordMock.mockReset(); + unbindMock.mockReset(); + mockManager.persist.mockReset(); + }); + + it("does nothing when no matching bindings exist", async () => { + listBindingsForAccountMock.mockReturnValue([]); + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + }); + expect(getManagerMock).not.toHaveBeenCalled(); + }); + + it("removes matching bindings and calls persist on the manager", async () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + removeBindingRecordMock.mockReturnValue(true); + getManagerMock.mockReturnValue(mockManager); + mockManager.persist.mockResolvedValue(undefined); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + }); + + expect(removeBindingRecordMock).toHaveBeenCalledWith(binding); + expect(getManagerMock).toHaveBeenCalledWith("ops"); + expect(mockManager.persist).toHaveBeenCalled(); + }); + + it("sends farewell through the binding service when requested", async () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + unbindMock.mockResolvedValue([ + { + bindingId: "ops:!room:example:$thread", + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }, + status: "active", + boundAt: 0, + }, + ]); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + reason: "spawn-failed", + sendFarewell: true, + }); + + expect(unbindMock).toHaveBeenCalledWith({ + bindingId: "ops:!room:example:$thread", + reason: "spawn-failed", + }); + expect(removeBindingRecordMock).not.toHaveBeenCalled(); + expect(getManagerMock).not.toHaveBeenCalled(); + }); + + it("skips persist when removeBindingRecord returns false (binding not found in store)", async () => { + const binding = { + targetSessionKey: "agent:ops:subagent:orphan", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + removeBindingRecordMock.mockReturnValue(false); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:orphan", + targetKind: "subagent", + accountId: "ops", + }); + + expect(getManagerMock).not.toHaveBeenCalled(); + }); + + it("falls back to listAllBindings when accountId is absent", async () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listAllBindingsMock.mockReturnValue([binding]); + removeBindingRecordMock.mockReturnValue(true); + getManagerMock.mockReturnValue(mockManager); + mockManager.persist.mockResolvedValue(undefined); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + }); + + expect(listAllBindingsMock).toHaveBeenCalled(); + expect(listBindingsForAccountMock).not.toHaveBeenCalled(); + expect(mockManager.persist).toHaveBeenCalled(); + }); + + it("does not double-persist when multiple bindings share the same account", async () => { + const mkBinding = (conversationId: string) => ({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId, + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }); + listBindingsForAccountMock.mockReturnValue([mkBinding("$t1"), mkBinding("$t2")]); + removeBindingRecordMock.mockReturnValue(true); + getManagerMock.mockReturnValue(mockManager); + mockManager.persist.mockResolvedValue(undefined); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + }); + + // persist must be called exactly once per unique accountId, not once per binding + expect(mockManager.persist).toHaveBeenCalledTimes(1); + }); +}); + +describe("handleMatrixSubagentDeliveryTarget", () => { + beforeEach(() => { + listAllBindingsMock.mockReset(); + listBindingsForAccountMock.mockReset(); + }); + + it("returns undefined when expectsCompletionMessage is false", () => { + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops" }, + expectsCompletionMessage: false, + }); + expect(result).toBeUndefined(); + }); + + it("returns undefined when requester channel is not matrix", () => { + listBindingsForAccountMock.mockReturnValue([]); + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "slack", accountId: "ops" }, + expectsCompletionMessage: true, + }); + expect(result).toBeUndefined(); + }); + + it("returns undefined when no bindings match the child session key", () => { + listBindingsForAccountMock.mockReturnValue([ + { + targetSessionKey: "agent:ops:subagent:OTHER", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }, + ]); + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops" }, + expectsCompletionMessage: true, + }); + expect(result).toBeUndefined(); + }); + + it("returns origin with threadId when binding has a distinct parentConversationId", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread123", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$thread123" }, + expectsCompletionMessage: true, + }); + + expect(result).toEqual({ + origin: { + channel: "matrix", + accountId: "ops", + to: "room:!room:example", + threadId: "$thread123", + }, + }); + }); + + it("returns origin without threadId when conversationId equals parentConversationId", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "!room:example", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops" }, + expectsCompletionMessage: true, + }); + + expect(result).toEqual({ + origin: { + channel: "matrix", + accountId: "ops", + to: "room:!room:example", + }, + }); + expect(result?.origin).not.toHaveProperty("threadId"); + }); + + it("returns origin without threadId when binding has no parentConversationId", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops" }, + expectsCompletionMessage: true, + }); + + expect(result).toEqual({ + origin: { + channel: "matrix", + accountId: "ops", + to: "room:!room:example", + }, + }); + }); + + it("falls back to the single binding when requesterOrigin threadId does not match any binding", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread123", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$threadOTHER" }, + expectsCompletionMessage: true, + }); + + // No threadId match, but single binding → falls back to it + expect(result).toEqual({ + origin: { + channel: "matrix", + accountId: "ops", + to: "room:!room:example", + threadId: "$thread123", + }, + }); + }); + + it("returns undefined when multiple bindings exist and threadId matches none", () => { + const mkBinding = (threadId: string) => ({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: threadId, + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }); + listBindingsForAccountMock.mockReturnValue([mkBinding("$t1"), mkBinding("$t2")]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$tNONE" }, + expectsCompletionMessage: true, + }); + + expect(result).toBeUndefined(); + }); + + it("uses listAllBindings when requesterOrigin has no accountId", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread123", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listAllBindingsMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix" }, + expectsCompletionMessage: true, + }); + + expect(listAllBindingsMock).toHaveBeenCalled(); + expect(listBindingsForAccountMock).not.toHaveBeenCalled(); + expect(result).toBeDefined(); + }); +}); + +describe("concurrent spawns across accounts", () => { + beforeEach(() => { + bindMock.mockReset(); + getManagerMock.mockReset(); + resolveMatrixBaseConfigMock.mockReset(); + findMatrixAccountConfigMock.mockReset(); + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: true, spawnSubagentSessions: true }, + }); + findMatrixAccountConfigMock.mockReturnValue(undefined); + getManagerMock.mockReturnValue({ persist: vi.fn() }); + }); + + it("resolves both spawns independently when two accounts fire simultaneously", async () => { + // Each account gets its own bind call that resolves with a distinct conversation + bindMock + .mockResolvedValueOnce({ conversation: { accountId: "ops", conversationId: "$t-ops" } }) + .mockResolvedValueOnce({ conversation: { accountId: "forge", conversationId: "$t-forge" } }); + + const [opsResult, forgeResult] = await Promise.all([ + handleMatrixSubagentSpawning(fakeApi, { + threadRequested: true, + requester: { channel: "matrix", accountId: "ops", to: "room:!room-ops:example.org" }, + childSessionKey: "agent:ops:subagent:child-ops", + agentId: "worker-ops", + }), + handleMatrixSubagentSpawning(fakeApi, { + threadRequested: true, + requester: { channel: "matrix", accountId: "forge", to: "room:!room-forge:example.org" }, + childSessionKey: "agent:forge:subagent:child-forge", + agentId: "worker-forge", + }), + ]); + + expect(opsResult).toMatchObject({ status: "ok", threadBindingReady: true }); + expect(forgeResult).toMatchObject({ status: "ok", threadBindingReady: true }); + expect(bindMock).toHaveBeenCalledTimes(2); + + // Each bind call targeted the correct account's room + expect(bindMock).toHaveBeenCalledWith( + expect.objectContaining({ + targetSessionKey: "agent:ops:subagent:child-ops", + conversation: expect.objectContaining({ + accountId: "ops", + conversationId: "!room-ops:example.org", + }), + }), + ); + expect(bindMock).toHaveBeenCalledWith( + expect.objectContaining({ + targetSessionKey: "agent:forge:subagent:child-forge", + conversation: expect.objectContaining({ + accountId: "forge", + conversationId: "!room-forge:example.org", + }), + }), + ); + }); + + it("one account bind failure does not affect the other account's spawn", async () => { + bindMock + .mockRejectedValueOnce(new Error("ops provider auth failed")) + .mockResolvedValueOnce({ conversation: { accountId: "forge", conversationId: "$t-forge" } }); + + const [opsResult, forgeResult] = await Promise.all([ + handleMatrixSubagentSpawning(fakeApi, { + threadRequested: true, + requester: { channel: "matrix", accountId: "ops", to: "room:!room-ops:example.org" }, + childSessionKey: "agent:ops:subagent:child-ops", + agentId: "worker-ops", + }), + handleMatrixSubagentSpawning(fakeApi, { + threadRequested: true, + requester: { channel: "matrix", accountId: "forge", to: "room:!room-forge:example.org" }, + childSessionKey: "agent:forge:subagent:child-forge", + agentId: "worker-forge", + }), + ]); + + expect(opsResult).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("ops provider auth failed"), + }), + ); + expect(forgeResult).toMatchObject({ status: "ok", threadBindingReady: true }); + }); +}); diff --git a/extensions/matrix/src/matrix/subagent-hooks.ts b/extensions/matrix/src/matrix/subagent-hooks.ts new file mode 100644 index 00000000000..8940b0217af --- /dev/null +++ b/extensions/matrix/src/matrix/subagent-hooks.ts @@ -0,0 +1,311 @@ +import { DEFAULT_ACCOUNT_ID } from "openclaw/plugin-sdk/account-id"; +import { + getSessionBindingService, + type SessionBindingRecord, +} from "openclaw/plugin-sdk/conversation-binding-runtime"; +import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core"; +import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; +import { findMatrixAccountConfig, resolveMatrixBaseConfig } from "./account-config.js"; +import { resolveMatrixTargetIdentity } from "./target-ids.js"; +import { + getMatrixThreadBindingManager, + listAllBindings, + listBindingsForAccount, + removeBindingRecord, + resolveBindingKey, +} from "./thread-bindings-shared.js"; + +type MatrixSubagentSpawningEvent = { + threadRequested: boolean; + requester?: { + channel?: string; + accountId?: string; + to?: string; + threadId?: string | number; + }; + childSessionKey: string; + agentId: string; + label?: string; +}; + +type MatrixSubagentEndedEvent = { + targetSessionKey: string; + targetKind: string; + accountId?: string; + reason?: string; + sendFarewell?: boolean; +}; + +type MatrixSubagentDeliveryTargetEvent = { + childSessionKey: string; + requesterOrigin?: { + channel?: string; + accountId?: string; + to?: string; + threadId?: string | number; + }; + expectsCompletionMessage: boolean; +}; + +type MatrixDeliveryOrigin = { + channel: "matrix"; + accountId: string; + to: string; + threadId?: string; +}; + +type SpawningResult = + | { + status: "ok"; + threadBindingReady?: boolean; + deliveryOrigin?: MatrixDeliveryOrigin; + } + | { status: "error"; error: string }; + +type DeliveryTargetResult = { + origin: MatrixDeliveryOrigin; +}; + +function summarizeError(err: unknown): string { + if (err instanceof Error) { + return err.message; + } + if (typeof err === "string") { + return err; + } + return "error"; +} + +function resolveThreadBindingFlags( + api: OpenClawPluginApi, + accountId?: string, +): { enabled: boolean; spawnSubagentSessions: boolean } { + const matrix = resolveMatrixBaseConfig(api.config); + const baseThreadBindings = matrix.threadBindings; + const accountThreadBindings = accountId + ? findMatrixAccountConfig(api.config, accountId)?.threadBindings + : undefined; + return { + enabled: + accountThreadBindings?.enabled ?? + baseThreadBindings?.enabled ?? + api.config.session?.threadBindings?.enabled ?? + true, + spawnSubagentSessions: + accountThreadBindings?.spawnSubagentSessions ?? + baseThreadBindings?.spawnSubagentSessions ?? + false, + }; +} + +function resolveMatrixBindingThreadId(binding: SessionBindingRecord): string | undefined { + const { conversationId, parentConversationId } = binding.conversation; + return parentConversationId && parentConversationId !== conversationId + ? conversationId + : undefined; +} + +function resolveMatrixBindingDeliveryOrigin( + binding: SessionBindingRecord, + fallbackAccountId: string, +): MatrixDeliveryOrigin { + const boundRoomId = + binding.conversation.parentConversationId ?? binding.conversation.conversationId; + const threadId = resolveMatrixBindingThreadId(binding); + return { + channel: "matrix", + accountId: binding.conversation.accountId ?? fallbackAccountId, + to: `room:${boundRoomId}`, + ...(threadId ? { threadId } : {}), + }; +} + +export async function handleMatrixSubagentSpawning( + api: OpenClawPluginApi, + event: MatrixSubagentSpawningEvent, +): Promise { + if (!event.threadRequested) { + return undefined; + } + const channel = event.requester?.channel?.trim().toLowerCase(); + if (channel !== "matrix") { + return undefined; + } + + // Normalize early so per-account config and manager lookup use the same id. + // Falls back to DEFAULT_ACCOUNT_ID so accounts.default.threadBindings.* is + // respected even when the requester omits accountId. + const accountId = normalizeOptionalString(event.requester?.accountId) || DEFAULT_ACCOUNT_ID; + const flags = resolveThreadBindingFlags(api, accountId); + + if (!flags.enabled) { + return { + status: "error", + error: + "Matrix thread bindings are disabled (set channels.matrix.threadBindings.enabled=true to override for this account, or session.threadBindings.enabled=true globally).", + } satisfies SpawningResult; + } + if (!flags.spawnSubagentSessions) { + return { + status: "error", + error: + "Matrix thread-bound subagent spawns are disabled for this account (set channels.matrix.threadBindings.spawnSubagentSessions=true to enable).", + }; + } + + // Resolve the raw Matrix room ID from the requester's `to` field + // (e.g. "room:!abc123:example.org" → "!abc123:example.org"). + const rawTo = normalizeOptionalString(event.requester?.to) ?? ""; + const matrixTarget = rawTo ? resolveMatrixTargetIdentity(rawTo) : null; + const roomId = matrixTarget?.kind === "room" ? matrixTarget.id : ""; + + if (!roomId) { + return { + status: "error", + error: + "Cannot create Matrix thread binding: no room target in spawn request (requester.to must be a Matrix room ID).", + }; + } + + // Verify the thread binding manager is running for this account. The manager + // holds the captured Matrix client the SessionBindingAdapter needs to send + // the intro message that bootstraps the thread. + const manager = getMatrixThreadBindingManager(accountId); + if (!manager) { + return { + status: "error", + error: `No Matrix thread binding manager available for account "${accountId}". Is the Matrix channel running?`, + }; + } + + try { + // placement="child" tells the Matrix SessionBindingAdapter to: + // 1. Send an intro message to the room, creating a new thread root event + // 2. Use the returned event ID as boundConversationId (the thread ID) + // 3. Register the binding record in the in-memory store and persist it + // + // We do NOT call setBindingRecord here — the adapter's bind() handles + // record creation, thread creation, and persistence atomically. + const binding = await getSessionBindingService().bind({ + targetSessionKey: event.childSessionKey, + targetKind: "subagent", + conversation: { + channel: "matrix", + accountId, + conversationId: roomId, + }, + placement: "child", + metadata: { + agentId: event.agentId?.trim() || undefined, + label: normalizeOptionalString(event.label) || undefined, + boundBy: "system", + }, + }); + return { + status: "ok", + threadBindingReady: true, + deliveryOrigin: resolveMatrixBindingDeliveryOrigin(binding, accountId), + } satisfies SpawningResult; + } catch (err) { + return { + status: "error", + error: `Matrix thread bind failed: ${summarizeError(err)}`, + }; + } +} + +export async function handleMatrixSubagentEnded(event: MatrixSubagentEndedEvent): Promise { + const accountId = normalizeOptionalString(event.accountId) || undefined; + // Use the targeted account list when available; fall back to a full scan + // so bindings are cleaned up even when accountId is absent. + const candidates = accountId ? listBindingsForAccount(accountId) : listAllBindings(); + const matching = candidates.filter( + (entry) => entry.targetSessionKey === event.targetSessionKey && entry.targetKind === "subagent", + ); + const removedBindingKeys = new Set(); + if (event.sendFarewell) { + const bindingService = getSessionBindingService(); + const reason = normalizeOptionalString(event.reason) || "subagent-ended"; + for (const binding of matching) { + const bindingId = resolveBindingKey(binding); + const removed = await bindingService.unbind({ bindingId, reason }); + if (removed.some((entry) => entry.bindingId === bindingId)) { + removedBindingKeys.add(bindingId); + } + } + } + + const affectedAccountIds = new Set(); + for (const binding of matching) { + if (removedBindingKeys.has(resolveBindingKey(binding))) { + continue; + } + if (removeBindingRecord(binding)) { + affectedAccountIds.add(binding.accountId); + } + } + // Flush each affected account's manager so removals are persisted to disk. + for (const acctId of affectedAccountIds) { + const manager = getMatrixThreadBindingManager(acctId); + await manager?.persist(); + } +} + +export function handleMatrixSubagentDeliveryTarget( + event: MatrixSubagentDeliveryTargetEvent, +): DeliveryTargetResult | undefined { + if (!event.expectsCompletionMessage) { + return undefined; + } + const requesterChannel = event.requesterOrigin?.channel?.trim().toLowerCase(); + if (requesterChannel !== "matrix") { + return undefined; + } + + const requesterAccountId = normalizeOptionalString(event.requesterOrigin?.accountId); + const requesterThreadId = + event.requesterOrigin?.threadId != null && event.requesterOrigin.threadId !== "" + ? String(event.requesterOrigin.threadId).trim() + : ""; + + // Search the targeted account when available; otherwise scan all accounts. + const candidates = requesterAccountId + ? listBindingsForAccount(requesterAccountId) + : listAllBindings(); + const bindings = candidates.filter( + (entry) => entry.targetSessionKey === event.childSessionKey && entry.targetKind === "subagent", + ); + if (bindings.length === 0) { + return undefined; + } + + let binding: (typeof bindings)[number] | undefined; + if (requesterThreadId) { + binding = bindings.find( + (entry) => + entry.conversationId === requesterThreadId && + (!requesterAccountId || entry.accountId === requesterAccountId), + ); + } + if (!binding && bindings.length === 1) { + binding = bindings[0]; + } + if (!binding) { + return undefined; + } + + const roomId = binding.parentConversationId ?? binding.conversationId; + const threadId = + binding.parentConversationId && binding.parentConversationId !== binding.conversationId + ? binding.conversationId + : undefined; + + return { + origin: { + channel: "matrix", + accountId: binding.accountId, + to: `room:${roomId}`, + ...(threadId ? { threadId } : {}), + }, + }; +} diff --git a/extensions/matrix/src/matrix/thread-bindings-shared.ts b/extensions/matrix/src/matrix/thread-bindings-shared.ts index 6c63a731490..298f7757782 100644 --- a/extensions/matrix/src/matrix/thread-bindings-shared.ts +++ b/extensions/matrix/src/matrix/thread-bindings-shared.ts @@ -40,6 +40,7 @@ export type MatrixThreadBindingManager = { targetSessionKey: string; maxAgeMs: number; }) => MatrixThreadBindingRecord[]; + persist: () => Promise; stop: () => void; }; @@ -135,6 +136,10 @@ export function listBindingsForAccount(accountId: string): MatrixThreadBindingRe ); } +export function listAllBindings(): MatrixThreadBindingRecord[] { + return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()]; +} + export function getMatrixThreadBindingManagerEntry( accountId: string, ): MatrixThreadBindingManagerCacheEntry | null { diff --git a/extensions/matrix/src/matrix/thread-bindings.ts b/extensions/matrix/src/matrix/thread-bindings.ts index 397fc67db4e..6544967bdaa 100644 --- a/extensions/matrix/src/matrix/thread-bindings.ts +++ b/extensions/matrix/src/matrix/thread-bindings.ts @@ -292,6 +292,7 @@ export async function createMatrixThreadBindingManager(params: { accountId: params.accountId, getIdleTimeoutMs: () => defaults.idleTimeoutMs, getMaxAgeMs: () => defaults.maxAgeMs, + persist, getByConversation: ({ conversationId, parentConversationId }) => listBindingsForAccount(params.accountId).find((entry) => { if (entry.conversationId !== conversationId.trim()) { diff --git a/src/agents/subagent-spawn.runtime.ts b/src/agents/subagent-spawn.runtime.ts index 2dbfef61369..e5973519456 100644 --- a/src/agents/subagent-spawn.runtime.ts +++ b/src/agents/subagent-spawn.runtime.ts @@ -10,7 +10,12 @@ export { } from "../gateway/session-utils.js"; export { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; export { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; -export { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; +export { + mergeDeliveryContext, + normalizeDeliveryContext, +} from "../utils/delivery-context.shared.js"; +export { resolveConversationDeliveryTarget } from "../utils/delivery-context.js"; +export { getSessionBindingService } from "../infra/outbound/session-binding-service.js"; export { resolveAgentConfig } from "./agent-scope.js"; export { AGENT_LANE_SUBAGENT } from "./lanes.js"; export { resolveSubagentSpawnModelSelection } from "./model-selection.js"; diff --git a/src/agents/subagent-spawn.test-helpers.ts b/src/agents/subagent-spawn.test-helpers.ts index c0fe1d426a0..e1e46470232 100644 --- a/src/agents/subagent-spawn.test-helpers.ts +++ b/src/agents/subagent-spawn.test-helpers.ts @@ -125,6 +125,22 @@ export async function loadSubagentSpawnModuleForTest(params: { cfg?: Record; sessionKey?: string; }) => { sandboxed: boolean }; + getSessionBindingService?: () => { + listBySession: (targetSessionKey: string) => Array<{ + status?: string; + conversation: { + channel: string; + accountId?: string; + conversationId: string; + parentConversationId?: string; + }; + }>; + }; + resolveConversationDeliveryTarget?: (params: { + channel?: string; + conversationId?: string | number; + parentConversationId?: string | number; + }) => { to?: string; threadId?: string }; workspaceDir?: string; sessionStorePath?: string; resetModules?: boolean; @@ -165,6 +181,22 @@ export async function loadSubagentSpawnModuleForTest(params: { isAdminOnlyMethod: (method: string) => method === "sessions.patch" || method === "sessions.delete", pruneLegacyStoreKeys: (...args: unknown[]) => params.pruneLegacyStoreKeysMock?.(...args), + getSessionBindingService: + params.getSessionBindingService ?? (() => ({ listBySession: () => [] })), + resolveConversationDeliveryTarget: + params.resolveConversationDeliveryTarget ?? + ((targetParams: { channel?: string; conversationId?: string | number }) => ({ + to: targetParams.conversationId + ? `channel:${String(targetParams.conversationId)}` + : undefined, + })), + mergeDeliveryContext: ( + primary?: Record, + fallback?: Record, + ) => ({ + ...fallback, + ...primary, + }), resolveGatewaySessionStoreTarget: (targetParams: { key: string }) => ({ agentId: "main", storePath: params.sessionStorePath ?? "/tmp/subagent-spawn-model-session.json", diff --git a/src/agents/subagent-spawn.thread-binding.test.ts b/src/agents/subagent-spawn.thread-binding.test.ts new file mode 100644 index 00000000000..21c07c7db31 --- /dev/null +++ b/src/agents/subagent-spawn.thread-binding.test.ts @@ -0,0 +1,183 @@ +import os from "node:os"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + createSubagentSpawnTestConfig, + installSessionStoreCaptureMock, + loadSubagentSpawnModuleForTest, +} from "./subagent-spawn.test-helpers.js"; +import { installAcceptedSubagentGatewayMock } from "./test-helpers/subagent-gateway.js"; + +const hoisted = vi.hoisted(() => ({ + callGatewayMock: vi.fn(), + updateSessionStoreMock: vi.fn(), + registerSubagentRunMock: vi.fn(), + emitSessionLifecycleEventMock: vi.fn(), + hookRunner: { + hasHooks: vi.fn(), + runSubagentSpawning: vi.fn(), + }, +})); + +describe("spawnSubagentDirect thread binding delivery", () => { + beforeEach(() => { + vi.resetModules(); + hoisted.callGatewayMock.mockReset(); + hoisted.updateSessionStoreMock.mockReset(); + hoisted.registerSubagentRunMock.mockReset(); + hoisted.emitSessionLifecycleEventMock.mockReset(); + hoisted.hookRunner.hasHooks.mockReset(); + hoisted.hookRunner.runSubagentSpawning.mockReset(); + installAcceptedSubagentGatewayMock(hoisted.callGatewayMock); + installSessionStoreCaptureMock(hoisted.updateSessionStoreMock); + }); + + it("seeds a thread-bound child session from the binding created during spawn", async () => { + hoisted.hookRunner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "subagent_spawning", + ); + hoisted.hookRunner.runSubagentSpawning.mockResolvedValue({ + status: "ok", + threadBindingReady: true, + deliveryOrigin: { + channel: "matrix", + accountId: "sut", + to: "room:!room:example", + threadId: "$thread-root", + }, + }); + const { spawnSubagentDirect } = await loadSubagentSpawnModuleForTest({ + callGatewayMock: hoisted.callGatewayMock, + loadConfig: () => + createSubagentSpawnTestConfig(os.tmpdir(), { + agents: { + defaults: { + workspace: os.tmpdir(), + }, + list: [{ id: "main", workspace: "/tmp/workspace-main" }], + }, + }), + updateSessionStoreMock: hoisted.updateSessionStoreMock, + registerSubagentRunMock: hoisted.registerSubagentRunMock, + emitSessionLifecycleEventMock: hoisted.emitSessionLifecycleEventMock, + hookRunner: hoisted.hookRunner, + resolveSubagentSpawnModelSelection: () => "openai-codex/gpt-5.4", + resolveSandboxRuntimeStatus: () => ({ sandboxed: false }), + }); + + const result = await spawnSubagentDirect( + { + task: "reply with a marker", + thread: true, + mode: "session", + }, + { + agentSessionKey: "agent:main:main", + agentChannel: "matrix", + agentAccountId: "sut", + agentTo: "room:!room:example", + }, + ); + + expect(result.status).toBe("accepted"); + const agentCall = hoisted.callGatewayMock.mock.calls.find( + ([call]) => (call as { method?: string }).method === "agent", + )?.[0] as { params?: Record } | undefined; + expect(agentCall?.params).toMatchObject({ + channel: "matrix", + accountId: "sut", + to: "room:!room:example", + threadId: "$thread-root", + deliver: true, + }); + expect(hoisted.registerSubagentRunMock).toHaveBeenCalledWith( + expect.objectContaining({ + requesterOrigin: { + channel: "matrix", + accountId: "sut", + to: "room:!room:example", + threadId: "$thread-root", + }, + expectsCompletionMessage: false, + spawnMode: "session", + }), + ); + }); + + it("keeps completion announcements when only a generic binding is available", async () => { + hoisted.hookRunner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "subagent_spawning", + ); + hoisted.hookRunner.runSubagentSpawning.mockResolvedValue({ + status: "ok", + threadBindingReady: true, + }); + const { spawnSubagentDirect } = await loadSubagentSpawnModuleForTest({ + callGatewayMock: hoisted.callGatewayMock, + loadConfig: () => + createSubagentSpawnTestConfig(os.tmpdir(), { + agents: { + defaults: { + workspace: os.tmpdir(), + }, + list: [{ id: "main", workspace: "/tmp/workspace-main" }], + }, + }), + updateSessionStoreMock: hoisted.updateSessionStoreMock, + registerSubagentRunMock: hoisted.registerSubagentRunMock, + emitSessionLifecycleEventMock: hoisted.emitSessionLifecycleEventMock, + hookRunner: hoisted.hookRunner, + getSessionBindingService: () => ({ + listBySession: () => [ + { + status: "active", + conversation: { + channel: "feishu", + accountId: "work", + conversationId: "oc_dm_chat_1", + }, + }, + ], + }), + resolveConversationDeliveryTarget: () => ({ + to: "channel:oc_dm_chat_1", + }), + resolveSubagentSpawnModelSelection: () => "openai-codex/gpt-5.4", + resolveSandboxRuntimeStatus: () => ({ sandboxed: false }), + }); + + const result = await spawnSubagentDirect( + { + task: "reply with a marker", + thread: true, + mode: "session", + }, + { + agentSessionKey: "agent:main:main", + agentChannel: "matrix", + agentAccountId: "sut", + agentTo: "room:!parent:example", + }, + ); + + expect(result.status).toBe("accepted"); + const agentCall = hoisted.callGatewayMock.mock.calls.find( + ([call]) => (call as { method?: string }).method === "agent", + )?.[0] as { params?: Record } | undefined; + expect(agentCall?.params).toMatchObject({ + channel: "matrix", + accountId: "sut", + to: "room:!parent:example", + deliver: false, + }); + expect(hoisted.registerSubagentRunMock).toHaveBeenCalledWith( + expect.objectContaining({ + expectsCompletionMessage: true, + requesterOrigin: { + channel: "matrix", + accountId: "sut", + to: "room:!parent:example", + }, + }), + ); + }); +}); diff --git a/src/agents/subagent-spawn.ts b/src/agents/subagent-spawn.ts index 8a450706399..1c8b55bf673 100644 --- a/src/agents/subagent-spawn.ts +++ b/src/agents/subagent-spawn.ts @@ -7,6 +7,7 @@ import { normalizeLowercaseStringOrEmpty, normalizeOptionalString, } from "../shared/string-coerce.js"; +import type { DeliveryContext } from "../utils/delivery-context.types.js"; import type { BootstrapContextMode } from "./bootstrap-files.js"; import { mapToolContextToSpawnedRunMetadata, @@ -41,6 +42,7 @@ import { getGlobalHookRunner, loadConfig, mergeSessionEntry, + mergeDeliveryContext, normalizeDeliveryContext, pruneLegacyStoreKeys, resolveAgentConfig, @@ -295,7 +297,9 @@ async function ensureThreadBindingForSubagentSpawn(params: { to?: string; threadId?: string | number; }; -}): Promise<{ status: "ok" } | { status: "error"; error: string }> { +}): Promise< + { status: "ok"; deliveryOrigin?: DeliveryContext } | { status: "error"; error: string } +> { const hookRunner = params.hookRunner; if (!hookRunner?.hasHooks("subagent_spawning")) { return { @@ -334,7 +338,11 @@ async function ensureThreadBindingForSubagentSpawn(params: { "Unable to create or bind a thread for this subagent session. Session mode is unavailable for this target.", }; } - return { status: "ok" }; + const deliveryOrigin = normalizeDeliveryContext(result.deliveryOrigin); + return { + status: "ok", + ...(deliveryOrigin ? { deliveryOrigin } : {}), + }; } catch (err) { return { status: "error", @@ -343,6 +351,12 @@ async function ensureThreadBindingForSubagentSpawn(params: { } } +function hasRoutableDeliveryOrigin( + origin?: DeliveryContext, +): origin is DeliveryContext & { channel: string; to: string } { + return Boolean(origin?.channel && origin.to); +} + export async function spawnSubagentDirect( params: SpawnSubagentParams, ctx: SpawnSubagentContext, @@ -388,6 +402,7 @@ export async function spawnSubagentDirect( to: ctx.agentTo, threadId: ctx.agentThreadId, }); + let childSessionOrigin = requesterOrigin; const hookRunner = subagentSpawnDeps.getGlobalHookRunner(); const cfg = loadSubagentConfig(); @@ -400,6 +415,7 @@ export async function spawnSubagentDirect( }); let modelApplied = false; let threadBindingReady = false; + let hasBoundThreadDeliveryOrigin = false; const { mainKey, alias } = resolveMainSessionAlias(cfg); const requesterSessionKey = ctx.agentSessionKey; const requesterInternalKey = requesterSessionKey @@ -597,12 +613,15 @@ export async function spawnSubagentDirect( }; } threadBindingReady = true; + hasBoundThreadDeliveryOrigin = hasRoutableDeliveryOrigin(bindResult.deliveryOrigin); + childSessionOrigin = + mergeDeliveryContext(bindResult.deliveryOrigin, requesterOrigin) ?? childSessionOrigin; } const mountPathHint = sanitizeMountPathHint(params.attachMountPath); let childSystemPrompt = buildSubagentSystemPrompt({ requesterSessionKey, - requesterOrigin, + requesterOrigin: childSessionOrigin, childSessionKey, label: label || undefined, task, @@ -698,6 +717,11 @@ export async function spawnSubagentDirect( const childIdem = crypto.randomUUID(); let childRunId: string = childIdem; + const deliverInitialChildRunDirectly = + requestThreadBinding && spawnMode === "session" && hasBoundThreadDeliveryOrigin; + const shouldAnnounceCompletion = deliverInitialChildRunDirectly + ? false + : expectsCompletionMessage; try { const { spawnedBy: _spawnedBy, @@ -709,12 +733,13 @@ export async function spawnSubagentDirect( params: { message: childTaskMessage, sessionKey: childSessionKey, - channel: requesterOrigin?.channel, - to: requesterOrigin?.to ?? undefined, - accountId: requesterOrigin?.accountId ?? undefined, - threadId: requesterOrigin?.threadId != null ? String(requesterOrigin.threadId) : undefined, + channel: childSessionOrigin?.channel, + to: childSessionOrigin?.to ?? undefined, + accountId: childSessionOrigin?.accountId ?? undefined, + threadId: + childSessionOrigin?.threadId != null ? String(childSessionOrigin.threadId) : undefined, idempotencyKey: childIdem, - deliver: false, + deliver: deliverInitialChildRunDirectly, lane: AGENT_LANE_SUBAGENT, extraSystemPrompt: childSystemPrompt, thinking: thinkingOverride, @@ -754,7 +779,7 @@ export async function spawnSubagentDirect( targetKind: "subagent", reason: "spawn-failed", sendFarewell: true, - accountId: requesterOrigin?.accountId, + accountId: childSessionOrigin?.accountId, runId: childRunId, outcome: "error", error: "Session failed to start", @@ -802,7 +827,7 @@ export async function spawnSubagentDirect( childSessionKey, controllerSessionKey: requesterInternalKey, requesterSessionKey: requesterInternalKey, - requesterOrigin, + requesterOrigin: childSessionOrigin, requesterDisplayKey, task, cleanup, @@ -810,7 +835,7 @@ export async function spawnSubagentDirect( model: resolvedModel, workspaceDir: spawnedMetadata.workspaceDir, runTimeoutSeconds, - expectsCompletionMessage, + expectsCompletionMessage: shouldAnnounceCompletion, spawnMode, attachmentsDir: attachmentAbsDir, attachmentsRootDir: attachmentRootDir, diff --git a/src/cli/send-runtime/channel-outbound-send.test.ts b/src/cli/send-runtime/channel-outbound-send.test.ts index 684cae030e1..f37ae9502a4 100644 --- a/src/cli/send-runtime/channel-outbound-send.test.ts +++ b/src/cli/send-runtime/channel-outbound-send.test.ts @@ -86,6 +86,35 @@ describe("createChannelOutboundRuntimeSend", () => { ); }); + it("accepts plugin outbound thread and reply aliases", async () => { + const sendText = vi.fn(async () => ({ channel: "matrix", messageId: "$reply" })); + mocks.loadChannelOutboundAdapter.mockResolvedValue({ + sendText, + }); + + const { createChannelOutboundRuntimeSend } = await import("./channel-outbound-send.js"); + const runtimeSend = createChannelOutboundRuntimeSend({ + channelId: "matrix" as never, + unavailableMessage: "unavailable", + }); + + await runtimeSend.sendMessage("room:!ops:example.org", "hello thread", { + cfg: {}, + accountId: "sut", + replyToId: "$parent", + threadId: "$thread-root", + }); + + expect(sendText).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "sut", + replyToId: "$parent", + threadId: "$thread-root", + to: "room:!ops:example.org", + }), + ); + }); + it("falls back to sendText when media is present but sendMedia is unavailable", async () => { const sendText = vi.fn(async () => ({ channel: "whatsapp", messageId: "wa-3" })); mocks.loadChannelOutboundAdapter.mockResolvedValue({ diff --git a/src/cli/send-runtime/channel-outbound-send.ts b/src/cli/send-runtime/channel-outbound-send.ts index 4649cb28b77..57bbd4e9597 100644 --- a/src/cli/send-runtime/channel-outbound-send.ts +++ b/src/cli/send-runtime/channel-outbound-send.ts @@ -12,7 +12,9 @@ type RuntimeSendOpts = { mediaLocalRoots?: readonly string[]; mediaReadFile?: (filePath: string) => Promise; accountId?: string; + threadId?: string | number | null; messageThreadId?: string | number; + replyToId?: string | number | null; replyToMessageId?: string | number; silent?: boolean; forceDocument?: boolean; @@ -20,6 +22,15 @@ type RuntimeSendOpts = { gatewayClientScopes?: readonly string[]; }; +function resolveRuntimeThreadId(opts: RuntimeSendOpts): string | number | undefined { + return opts.messageThreadId ?? opts.threadId ?? undefined; +} + +function resolveRuntimeReplyToId(opts: RuntimeSendOpts): string | undefined { + const raw = opts.replyToMessageId ?? opts.replyToId; + return raw == null ? undefined : normalizeOptionalString(String(raw)); +} + export function createChannelOutboundRuntimeSend(params: { channelId: ChannelId; unavailableMessage: string; @@ -27,32 +38,9 @@ export function createChannelOutboundRuntimeSend(params: { return { sendMessage: async (to: string, text: string, opts: RuntimeSendOpts = {}) => { const outbound = await loadChannelOutboundAdapter(params.channelId); - const hasMedia = Boolean(opts.mediaUrl); - if (hasMedia && outbound?.sendMedia) { - return await outbound.sendMedia({ - cfg: opts.cfg ?? loadConfig(), - to, - text, - mediaUrl: opts.mediaUrl, - mediaAccess: opts.mediaAccess, - mediaLocalRoots: opts.mediaLocalRoots, - mediaReadFile: opts.mediaReadFile, - accountId: opts.accountId, - threadId: opts.messageThreadId, - replyToId: - opts.replyToMessageId == null - ? undefined - : normalizeOptionalString(String(opts.replyToMessageId)), - silent: opts.silent, - forceDocument: opts.forceDocument, - gifPlayback: opts.gifPlayback, - gatewayClientScopes: opts.gatewayClientScopes, - }); - } - if (!outbound?.sendText) { - throw new Error(params.unavailableMessage); - } - return await outbound.sendText({ + const threadId = resolveRuntimeThreadId(opts); + const replyToId = resolveRuntimeReplyToId(opts); + const buildContext = () => ({ cfg: opts.cfg ?? loadConfig(), to, text, @@ -61,16 +49,21 @@ export function createChannelOutboundRuntimeSend(params: { mediaLocalRoots: opts.mediaLocalRoots, mediaReadFile: opts.mediaReadFile, accountId: opts.accountId, - threadId: opts.messageThreadId, - replyToId: - opts.replyToMessageId == null - ? undefined - : normalizeOptionalString(String(opts.replyToMessageId)), + threadId, + replyToId, silent: opts.silent, forceDocument: opts.forceDocument, gifPlayback: opts.gifPlayback, gatewayClientScopes: opts.gatewayClientScopes, }); + const hasMedia = Boolean(opts.mediaUrl); + if (hasMedia && outbound?.sendMedia) { + return await outbound.sendMedia(buildContext()); + } + if (!outbound?.sendText) { + throw new Error(params.unavailableMessage); + } + return await outbound.sendText(buildContext()); }, }; } diff --git a/src/infra/outbound/session-binding-service.test.ts b/src/infra/outbound/session-binding-service.test.ts index 8083c4664ae..bd59d20823c 100644 --- a/src/infra/outbound/session-binding-service.test.ts +++ b/src/infra/outbound/session-binding-service.test.ts @@ -437,7 +437,7 @@ describe("session binding service", () => { } }); - it("keeps the first live adapter authoritative until it unregisters", () => { + it("keeps the newest live adapter authoritative until it unregisters", () => { const firstBinding = { bindingId: "first-binding", targetSessionKey: "agent:main", @@ -457,17 +457,30 @@ describe("session binding service", () => { targetSessionKey === "agent:main" ? [firstBinding] : [], resolveByConversation: () => null, }; + const secondBinding = { + bindingId: "second-binding", + targetSessionKey: "agent:main", + targetKind: "session" as const, + conversation: { + channel: "demo-binding", + accountId: "default", + conversationId: "thread-2", + }, + status: "active" as const, + boundAt: 2, + }; const secondAdapter: SessionBindingAdapter = { channel: "Demo-Binding", accountId: "DEFAULT", - listBySession: () => [], + listBySession: (targetSessionKey) => + targetSessionKey === "agent:main" ? [secondBinding] : [], resolveByConversation: () => null, }; registerSessionBindingAdapter(firstAdapter); registerSessionBindingAdapter(secondAdapter); - expect(getSessionBindingService().listBySession("agent:main")).toEqual([firstBinding]); + expect(getSessionBindingService().listBySession("agent:main")).toEqual([secondBinding]); unregisterSessionBindingAdapter({ channel: "demo-binding", @@ -529,13 +542,13 @@ describe("session binding service", () => { conversationId: "thread-1", }), }); - expect(firstBind).toHaveBeenCalledTimes(1); - expect(secondBind).not.toHaveBeenCalled(); + expect(firstBind).not.toHaveBeenCalled(); + expect(secondBind).toHaveBeenCalledTimes(1); - first.unregisterSessionBindingAdapter({ + second.unregisterSessionBindingAdapter({ channel: "demo-binding", accountId: "default", - adapter: firstAdapter, + adapter: secondAdapter, }); await expect( @@ -558,10 +571,10 @@ describe("session binding service", () => { expect(firstBind).toHaveBeenCalledTimes(1); expect(secondBind).toHaveBeenCalledTimes(1); - second.unregisterSessionBindingAdapter({ + first.unregisterSessionBindingAdapter({ channel: "demo-binding", accountId: "default", - adapter: secondAdapter, + adapter: firstAdapter, }); await expect( diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index 39d088942d0..0b520e56a89 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -135,7 +135,7 @@ const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap registrations[0]?.normalizedAdapter ?? null) + .map((registrations) => registrations.at(-1)?.normalizedAdapter ?? null) .filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter)); } diff --git a/src/plugins/hook-types.ts b/src/plugins/hook-types.ts index a5186685f52..765c1f3a1f2 100644 --- a/src/plugins/hook-types.ts +++ b/src/plugins/hook-types.ts @@ -421,6 +421,12 @@ export type PluginHookSubagentSpawningResult = | { status: "ok"; threadBindingReady?: boolean; + deliveryOrigin?: { + channel?: string; + accountId?: string; + to?: string; + threadId?: string | number; + }; } | { status: "error"; diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index b7d3e5f4d22..2389ab30fc7 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -258,9 +258,11 @@ export function createHookRunner( if (next.status === "error") { return next; } + const deliveryOrigin = acc?.deliveryOrigin ?? next.deliveryOrigin; return { status: "ok", threadBindingReady: Boolean(acc?.threadBindingReady || next.threadBindingReady), + ...(deliveryOrigin ? { deliveryOrigin } : {}), }; }; diff --git a/src/plugins/wired-hooks-subagent.test.ts b/src/plugins/wired-hooks-subagent.test.ts index 58dc5be433b..e51072cd8a9 100644 --- a/src/plugins/wired-hooks-subagent.test.ts +++ b/src/plugins/wired-hooks-subagent.test.ts @@ -2,7 +2,7 @@ * Test: subagent_spawning, subagent_delivery_target, subagent_spawned & subagent_ended hook wiring */ import { describe, expect, it, vi } from "vitest"; -import { createHookRunnerWithRegistry } from "./hooks.test-helpers.js"; +import { addStaticTestHooks, createHookRunnerWithRegistry } from "./hooks.test-helpers.js"; describe("subagent hook runner methods", () => { const baseRequester = { @@ -162,4 +162,66 @@ describe("subagent hook runner methods", () => { expect(runner.hasHooks("subagent_spawned")).toBe(false); expect(runner.hasHooks("subagent_ended")).toBe(false); }); + + it("runSubagentSpawning preserves higher-priority delivery origins", async () => { + const { registry, runner } = createHookRunnerWithRegistry([]); + addStaticTestHooks(registry, { + hookName: "subagent_spawning", + hooks: [ + { + pluginId: "high", + priority: 100, + result: { + status: "ok", + threadBindingReady: true, + deliveryOrigin: { + channel: "matrix", + accountId: "ops", + to: "room:!high:example", + threadId: "$high", + }, + }, + }, + { + pluginId: "low", + priority: 10, + result: { + status: "ok", + threadBindingReady: true, + deliveryOrigin: { + channel: "matrix", + accountId: "ops", + to: "room:!low:example", + threadId: "$low", + }, + }, + }, + ], + }); + + const result = await runner.runSubagentSpawning( + { + childSessionKey: "agent:main:subagent:child", + agentId: "main", + mode: "session", + requester: baseRequester, + threadRequested: true, + }, + { + childSessionKey: "agent:main:subagent:child", + requesterSessionKey: "agent:main:main", + }, + ); + + expect(result).toEqual({ + status: "ok", + threadBindingReady: true, + deliveryOrigin: { + channel: "matrix", + accountId: "ops", + to: "room:!high:example", + threadId: "$high", + }, + }); + }); });