diff --git a/CHANGELOG.md b/CHANGELOG.md index 36625a134e1..66cdaf15bb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai - Memory/sqlite-vec: emit the degraded sqlite-vec warning once per degraded episode instead of repeating it for every file write, while preserving the latch across safe-reindex rollback and resetting it when vector state is genuinely rebuilt. (#67898) Thanks @rubencu. - Reply/block streaming: preserve post-stream incomplete-turn error payloads after block streaming already emitted content, so users get the warning instead of silence. (#67991) Thanks @obviyus. - Telegram/streaming: clear the compaction replay guard after visible non-final boundaries so a post-tool assistant reply rotates to a fresh preview instead of editing the pre-compaction message. (#67993) Thanks @obviyus. +- Matrix: fix `sessions_spawn --thread` subagent session spawning — thread binding creation, cleanup on session end, and completion-message delivery target resolution now work end-to-end. (#67643) Thanks @eejohnso-ops. ## 2026.4.15 diff --git a/extensions/matrix/index.ts b/extensions/matrix/index.ts index 9dd62e49be0..e46b307e1b6 100644 --- a/extensions/matrix/index.ts +++ b/extensions/matrix/index.ts @@ -2,6 +2,15 @@ import { defineBundledChannelEntry } from "openclaw/plugin-sdk/channel-entry-con import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { registerMatrixCliMetadata } from "./cli-metadata.js"; +type MatrixSubagentHooksModule = typeof import("./src/matrix/subagent-hooks.js"); + +let matrixSubagentHooksPromise: Promise | null = null; + +function loadMatrixSubagentHooksModule() { + matrixSubagentHooksPromise ??= import("./src/matrix/subagent-hooks.js"); + return matrixSubagentHooksPromise; +} + export default defineBundledChannelEntry({ id: "matrix", name: "Matrix", @@ -47,5 +56,18 @@ export default defineBundledChannelEntry({ const { handleVerificationStatus } = await import("./plugin-entry.handlers.runtime.js"); await handleVerificationStatus(ctx); }); + + api.on("subagent_spawning", async (event) => { + const { handleMatrixSubagentSpawning } = await loadMatrixSubagentHooksModule(); + return await handleMatrixSubagentSpawning(api, event); + }); + api.on("subagent_ended", async (event) => { + const { handleMatrixSubagentEnded } = await loadMatrixSubagentHooksModule(); + await handleMatrixSubagentEnded(event); + }); + api.on("subagent_delivery_target", async (event) => { + const { handleMatrixSubagentDeliveryTarget } = await loadMatrixSubagentHooksModule(); + return handleMatrixSubagentDeliveryTarget(event); + }); }, }); diff --git a/extensions/matrix/src/matrix/subagent-hooks.test.ts b/extensions/matrix/src/matrix/subagent-hooks.test.ts new file mode 100644 index 00000000000..41cb50f6030 --- /dev/null +++ b/extensions/matrix/src/matrix/subagent-hooks.test.ts @@ -0,0 +1,568 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +// Hoisted stubs referenced in vi.mock factories below +const bindMock = vi.hoisted(() => vi.fn()); +const getManagerMock = vi.hoisted(() => vi.fn()); +const listAllBindingsMock = vi.hoisted(() => vi.fn((): any[] => [])); +const listBindingsForAccountMock = vi.hoisted(() => vi.fn((): any[] => [])); +const removeBindingRecordMock = vi.hoisted(() => vi.fn(() => false)); +const resolveMatrixBaseConfigMock = vi.hoisted(() => vi.fn((): any => ({}))); +const findMatrixAccountConfigMock = vi.hoisted(() => vi.fn((): any => undefined)); + +vi.mock("openclaw/plugin-sdk/conversation-binding-runtime", () => ({ + getSessionBindingService: () => ({ bind: bindMock }), +})); + +vi.mock("./account-config.js", () => ({ + resolveMatrixBaseConfig: resolveMatrixBaseConfigMock, + findMatrixAccountConfig: findMatrixAccountConfigMock, +})); + +vi.mock("./thread-bindings-shared.js", () => ({ + getMatrixThreadBindingManager: getManagerMock, + listAllBindings: listAllBindingsMock, + listBindingsForAccount: listBindingsForAccountMock, + removeBindingRecord: removeBindingRecordMock, +})); + +import { + handleMatrixSubagentDeliveryTarget, + handleMatrixSubagentEnded, + handleMatrixSubagentSpawning, +} from "./subagent-hooks.js"; + +// A minimal fake api — only config is used by these hooks +const fakeApi = { config: {} } as never; + +function makeSpawnEvent( + overrides: Partial<{ + threadRequested: boolean; + channel: string; + accountId: string; + to: string; + childSessionKey: string; + agentId: string; + label: string; + }> = {}, +) { + return { + threadRequested: overrides.threadRequested ?? true, + requester: { + channel: overrides.channel ?? "matrix", + accountId: overrides.accountId ?? "default", + to: overrides.to ?? "room:!room123:example.org", + }, + childSessionKey: overrides.childSessionKey ?? "agent:default:subagent:child", + agentId: overrides.agentId ?? "worker", + label: overrides.label, + }; +} + +describe("handleMatrixSubagentSpawning", () => { + beforeEach(() => { + bindMock.mockReset(); + getManagerMock.mockReset(); + resolveMatrixBaseConfigMock.mockReset(); + findMatrixAccountConfigMock.mockReset(); + // Default: bindings enabled, spawn enabled + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: true, spawnSubagentSessions: true }, + }); + findMatrixAccountConfigMock.mockReturnValue(undefined); + // Default: manager exists + getManagerMock.mockReturnValue({ persist: vi.fn() }); + // Default: bind resolves ok + bindMock.mockResolvedValue({ conversation: {} }); + }); + + it("returns undefined when threadRequested is false", async () => { + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ threadRequested: false }), + ); + expect(result).toBeUndefined(); + expect(bindMock).not.toHaveBeenCalled(); + }); + + it("returns undefined when channel is not matrix", async () => { + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ channel: "slack" }), + ); + expect(result).toBeUndefined(); + expect(bindMock).not.toHaveBeenCalled(); + }); + + it("returns undefined when channel has mixed casing but is still matrix", async () => { + // channel.trim().toLowerCase() must equal "matrix" + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ channel: " Matrix " }), + ); + expect(result).not.toBeUndefined(); + // It proceeds (no early-return for non-matrix) + }); + + it("returns error when thread bindings are disabled", async () => { + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: false, spawnSubagentSessions: true }, + }); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("thread bindings are disabled"), + }), + ); + }); + + it("returns error when spawnSubagentSessions is false", async () => { + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: true, spawnSubagentSessions: false }, + }); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("spawnSubagentSessions"), + }), + ); + }); + + it("returns error when spawnSubagentSessions defaults to false (no config)", async () => { + resolveMatrixBaseConfigMock.mockReturnValue({}); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("spawnSubagentSessions"), + }), + ); + }); + + it("returns error when requester.to has no room target", async () => { + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ to: "@user:example.org" }), + ); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("no room target"), + }), + ); + }); + + it("returns error when requester.to is empty", async () => { + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ to: "" })); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("no room target"), + }), + ); + }); + + it("returns error when no binding manager is available for the account", async () => { + getManagerMock.mockReturnValue(null); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("No Matrix thread binding manager"), + }), + ); + }); + + it("calls bind with the resolved room id and returns ok", async () => { + bindMock.mockResolvedValue({ conversation: {} }); + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ + accountId: "ops", + to: "room:!roomAbc:technerik.com", + childSessionKey: "agent:ops:subagent:worker", + agentId: "builder", + label: "Build Agent", + }), + ); + + expect(bindMock).toHaveBeenCalledWith( + expect.objectContaining({ + targetSessionKey: "agent:ops:subagent:worker", + targetKind: "subagent", + conversation: expect.objectContaining({ + channel: "matrix", + accountId: "ops", + conversationId: "!roomAbc:technerik.com", + }), + placement: "child", + metadata: expect.objectContaining({ + agentId: "builder", + label: "Build Agent", + }), + }), + ); + expect(result).toEqual({ status: "ok", threadBindingReady: true }); + }); + + it("uses 'default' as accountId when requester.accountId is absent", async () => { + bindMock.mockResolvedValue({ conversation: {} }); + await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ accountId: undefined as never })); + expect(getManagerMock).toHaveBeenCalledWith("default"); + expect(bindMock).toHaveBeenCalledWith( + expect.objectContaining({ + conversation: expect.objectContaining({ accountId: "default" }), + }), + ); + }); + + it("returns error when bind() throws", async () => { + bindMock.mockRejectedValue(new Error("provider auth failed")); + const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent()); + expect(result).toEqual( + expect.objectContaining({ + status: "error", + error: expect.stringContaining("provider auth failed"), + }), + ); + }); + + it("respects per-account threadBindings override over base config", async () => { + // Base says spawnSubagentSessions=false; account override says true + resolveMatrixBaseConfigMock.mockReturnValue({ + threadBindings: { enabled: true, spawnSubagentSessions: false }, + }); + findMatrixAccountConfigMock.mockReturnValue({ + threadBindings: { spawnSubagentSessions: true }, + }); + bindMock.mockResolvedValue({ conversation: {} }); + + const result = await handleMatrixSubagentSpawning( + fakeApi, + makeSpawnEvent({ accountId: "forge" }), + ); + expect(result).toEqual({ status: "ok", threadBindingReady: true }); + }); +}); + +describe("handleMatrixSubagentEnded", () => { + const mockManager = { persist: vi.fn() }; + + beforeEach(() => { + getManagerMock.mockReset(); + listAllBindingsMock.mockReset(); + listBindingsForAccountMock.mockReset(); + removeBindingRecordMock.mockReset(); + mockManager.persist.mockReset(); + }); + + it("does nothing when no matching bindings exist", async () => { + listBindingsForAccountMock.mockReturnValue([]); + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + }); + expect(getManagerMock).not.toHaveBeenCalled(); + }); + + it("removes matching bindings and calls persist on the manager", async () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + removeBindingRecordMock.mockReturnValue(true); + getManagerMock.mockReturnValue(mockManager); + mockManager.persist.mockResolvedValue(undefined); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + }); + + expect(removeBindingRecordMock).toHaveBeenCalledWith(binding); + expect(getManagerMock).toHaveBeenCalledWith("ops"); + expect(mockManager.persist).toHaveBeenCalled(); + }); + + it("skips persist when removeBindingRecord returns false (binding not found in store)", async () => { + const binding = { + targetSessionKey: "agent:ops:subagent:orphan", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + removeBindingRecordMock.mockReturnValue(false); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:orphan", + targetKind: "subagent", + accountId: "ops", + }); + + expect(getManagerMock).not.toHaveBeenCalled(); + }); + + it("falls back to listAllBindings when accountId is absent", async () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listAllBindingsMock.mockReturnValue([binding]); + removeBindingRecordMock.mockReturnValue(true); + getManagerMock.mockReturnValue(mockManager); + mockManager.persist.mockResolvedValue(undefined); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + }); + + expect(listAllBindingsMock).toHaveBeenCalled(); + expect(listBindingsForAccountMock).not.toHaveBeenCalled(); + expect(mockManager.persist).toHaveBeenCalled(); + }); + + it("does not double-persist when multiple bindings share the same account", async () => { + const mkBinding = (conversationId: string) => ({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId, + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }); + listBindingsForAccountMock.mockReturnValue([mkBinding("$t1"), mkBinding("$t2")]); + removeBindingRecordMock.mockReturnValue(true); + getManagerMock.mockReturnValue(mockManager); + mockManager.persist.mockResolvedValue(undefined); + + await handleMatrixSubagentEnded({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + }); + + // persist must be called exactly once per unique accountId, not once per binding + expect(mockManager.persist).toHaveBeenCalledTimes(1); + }); +}); + +describe("handleMatrixSubagentDeliveryTarget", () => { + beforeEach(() => { + listAllBindingsMock.mockReset(); + listBindingsForAccountMock.mockReset(); + }); + + it("returns undefined when expectsCompletionMessage is false", () => { + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops" }, + expectsCompletionMessage: false, + }); + expect(result).toBeUndefined(); + }); + + it("returns undefined when requester channel is not matrix", () => { + listBindingsForAccountMock.mockReturnValue([]); + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "slack", accountId: "ops" }, + expectsCompletionMessage: true, + }); + expect(result).toBeUndefined(); + }); + + it("returns undefined when no bindings match the child session key", () => { + listBindingsForAccountMock.mockReturnValue([ + { + targetSessionKey: "agent:ops:subagent:OTHER", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }, + ]); + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops" }, + expectsCompletionMessage: true, + }); + expect(result).toBeUndefined(); + }); + + it("returns origin with threadId when binding has a distinct parentConversationId", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread123", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$thread123" }, + expectsCompletionMessage: true, + }); + + expect(result).toEqual({ + origin: { + channel: "matrix", + accountId: "ops", + to: "room:!room:example", + threadId: "$thread123", + }, + }); + }); + + it("returns origin without threadId when conversationId equals parentConversationId", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "!room:example", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops" }, + expectsCompletionMessage: true, + }); + + expect(result).toEqual({ + origin: { + channel: "matrix", + accountId: "ops", + to: "room:!room:example", + }, + }); + expect(result?.origin).not.toHaveProperty("threadId"); + }); + + it("returns origin without threadId when binding has no parentConversationId", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops" }, + expectsCompletionMessage: true, + }); + + expect(result).toEqual({ + origin: { + channel: "matrix", + accountId: "ops", + to: "room:!room:example", + }, + }); + }); + + it("falls back to the single binding when requesterOrigin threadId does not match any binding", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread123", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listBindingsForAccountMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$threadOTHER" }, + expectsCompletionMessage: true, + }); + + // No threadId match, but single binding → falls back to it + expect(result).toEqual({ + origin: { + channel: "matrix", + accountId: "ops", + to: "room:!room:example", + threadId: "$thread123", + }, + }); + }); + + it("returns undefined when multiple bindings exist and threadId matches none", () => { + const mkBinding = (threadId: string) => ({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: threadId, + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }); + listBindingsForAccountMock.mockReturnValue([mkBinding("$t1"), mkBinding("$t2")]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$tNONE" }, + expectsCompletionMessage: true, + }); + + expect(result).toBeUndefined(); + }); + + it("uses listAllBindings when requesterOrigin has no accountId", () => { + const binding = { + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + accountId: "ops", + conversationId: "$thread123", + parentConversationId: "!room:example", + boundAt: 0, + lastActivityAt: 0, + }; + listAllBindingsMock.mockReturnValue([binding]); + + const result = handleMatrixSubagentDeliveryTarget({ + childSessionKey: "agent:ops:subagent:child", + requesterOrigin: { channel: "matrix" }, + expectsCompletionMessage: true, + }); + + expect(listAllBindingsMock).toHaveBeenCalled(); + expect(listBindingsForAccountMock).not.toHaveBeenCalled(); + expect(result).toBeDefined(); + }); +}); diff --git a/extensions/matrix/src/matrix/subagent-hooks.ts b/extensions/matrix/src/matrix/subagent-hooks.ts new file mode 100644 index 00000000000..075c1b4f014 --- /dev/null +++ b/extensions/matrix/src/matrix/subagent-hooks.ts @@ -0,0 +1,256 @@ +import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-binding-runtime"; +import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core"; +import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; +import { findMatrixAccountConfig, resolveMatrixBaseConfig } from "./account-config.js"; +import { resolveMatrixTargetIdentity } from "./target-ids.js"; +import { + getMatrixThreadBindingManager, + listAllBindings, + listBindingsForAccount, + removeBindingRecord, +} from "./thread-bindings-shared.js"; + +type MatrixSubagentSpawningEvent = { + threadRequested: boolean; + requester?: { + channel?: string; + accountId?: string; + to?: string; + threadId?: string | number; + }; + childSessionKey: string; + agentId: string; + label?: string; +}; + +type MatrixSubagentEndedEvent = { + targetSessionKey: string; + targetKind: string; + accountId?: string; +}; + +type MatrixSubagentDeliveryTargetEvent = { + childSessionKey: string; + requesterOrigin?: { + channel?: string; + accountId?: string; + to?: string; + threadId?: string | number; + }; + expectsCompletionMessage: boolean; +}; + +type SpawningResult = + | { status: "ok"; threadBindingReady?: boolean } + | { status: "error"; error: string }; + +type DeliveryTargetResult = { + origin: { + channel: string; + accountId: string; + to: string; + threadId?: string; + }; +}; + +function summarizeError(err: unknown): string { + if (err instanceof Error) { + return err.message; + } + if (typeof err === "string") { + return err; + } + return "error"; +} + +function resolveThreadBindingFlags( + api: OpenClawPluginApi, + accountId?: string, +): { enabled: boolean; spawnSubagentSessions: boolean } { + const matrix = resolveMatrixBaseConfig(api.config); + const baseThreadBindings = matrix.threadBindings; + const accountThreadBindings = accountId + ? findMatrixAccountConfig(api.config, accountId)?.threadBindings + : undefined; + return { + enabled: + accountThreadBindings?.enabled ?? + baseThreadBindings?.enabled ?? + api.config.session?.threadBindings?.enabled ?? + true, + spawnSubagentSessions: + accountThreadBindings?.spawnSubagentSessions ?? + baseThreadBindings?.spawnSubagentSessions ?? + false, + }; +} + +export async function handleMatrixSubagentSpawning( + api: OpenClawPluginApi, + event: MatrixSubagentSpawningEvent, +): Promise { + if (!event.threadRequested) { + return undefined; + } + const channel = event.requester?.channel?.trim().toLowerCase(); + if (channel !== "matrix") { + return undefined; + } + + const accountId = normalizeOptionalString(event.requester?.accountId) || undefined; + const flags = resolveThreadBindingFlags(api, accountId); + + if (!flags.enabled) { + return { + status: "error", + error: + "Matrix thread bindings are disabled (set channels.matrix.threadBindings.enabled=true to override for this account, or session.threadBindings.enabled=true globally).", + }; + } + if (!flags.spawnSubagentSessions) { + return { + status: "error", + error: + "Matrix thread-bound subagent spawns are disabled for this account (set channels.matrix.threadBindings.spawnSubagentSessions=true to enable).", + }; + } + + // Resolve the raw Matrix room ID from the requester's `to` field + // (e.g. "room:!abc123:example.org" → "!abc123:example.org"). + const rawTo = normalizeOptionalString(event.requester?.to) ?? ""; + const matrixTarget = rawTo ? resolveMatrixTargetIdentity(rawTo) : null; + const roomId = matrixTarget?.kind === "room" ? matrixTarget.id : ""; + + if (!roomId) { + return { + status: "error", + error: + "Cannot create Matrix thread binding: no room target in spawn request (requester.to must be a Matrix room ID).", + }; + } + + const resolvedAccountId = accountId || "default"; + + // Verify the thread binding manager is running for this account. The manager + // holds the captured Matrix client the SessionBindingAdapter needs to send + // the intro message that bootstraps the thread. + const manager = getMatrixThreadBindingManager(resolvedAccountId); + if (!manager) { + return { + status: "error", + error: `No Matrix thread binding manager available for account "${resolvedAccountId}". Is the Matrix channel running?`, + }; + } + + try { + // placement="child" tells the Matrix SessionBindingAdapter to: + // 1. Send an intro message to the room, creating a new thread root event + // 2. Use the returned event ID as boundConversationId (the thread ID) + // 3. Register the binding record in the in-memory store and persist it + // + // We do NOT call setBindingRecord here — the adapter's bind() handles + // record creation, thread creation, and persistence atomically. + await getSessionBindingService().bind({ + targetSessionKey: event.childSessionKey, + targetKind: "subagent", + conversation: { + channel: "matrix", + accountId: resolvedAccountId, + conversationId: roomId, + }, + placement: "child", + metadata: { + agentId: event.agentId?.trim() || undefined, + label: normalizeOptionalString(event.label) || undefined, + boundBy: "system", + }, + }); + } catch (err) { + return { + status: "error", + error: `Matrix thread bind failed: ${summarizeError(err)}`, + }; + } + + return { status: "ok", threadBindingReady: true }; +} + +export async function handleMatrixSubagentEnded(event: MatrixSubagentEndedEvent): Promise { + const accountId = normalizeOptionalString(event.accountId) || undefined; + // Use the targeted account list when available; fall back to a full scan + // so bindings are cleaned up even when accountId is absent. + const candidates = accountId ? listBindingsForAccount(accountId) : listAllBindings(); + const matching = candidates.filter( + (entry) => entry.targetSessionKey === event.targetSessionKey && entry.targetKind === "subagent", + ); + const affectedAccountIds = new Set(); + for (const binding of matching) { + if (removeBindingRecord(binding)) { + affectedAccountIds.add(binding.accountId); + } + } + // Flush each affected account's manager so removals are persisted to disk. + for (const acctId of affectedAccountIds) { + const manager = getMatrixThreadBindingManager(acctId); + await manager?.persist(); + } +} + +export function handleMatrixSubagentDeliveryTarget( + event: MatrixSubagentDeliveryTargetEvent, +): DeliveryTargetResult | undefined { + if (!event.expectsCompletionMessage) { + return undefined; + } + const requesterChannel = event.requesterOrigin?.channel?.trim().toLowerCase(); + if (requesterChannel !== "matrix") { + return undefined; + } + + const requesterAccountId = normalizeOptionalString(event.requesterOrigin?.accountId); + const requesterThreadId = + event.requesterOrigin?.threadId != null && event.requesterOrigin.threadId !== "" + ? String(event.requesterOrigin.threadId).trim() + : ""; + + // Search the targeted account when available; otherwise scan all accounts. + const candidates = requesterAccountId + ? listBindingsForAccount(requesterAccountId) + : listAllBindings(); + const bindings = candidates.filter( + (entry) => entry.targetSessionKey === event.childSessionKey && entry.targetKind === "subagent", + ); + if (bindings.length === 0) { + return undefined; + } + + let binding: (typeof bindings)[number] | undefined; + if (requesterThreadId) { + binding = bindings.find( + (entry) => + entry.conversationId === requesterThreadId && + (!requesterAccountId || entry.accountId === requesterAccountId), + ); + } + if (!binding && bindings.length === 1) { + binding = bindings[0]; + } + if (!binding) { + return undefined; + } + + const roomId = binding.parentConversationId ?? binding.conversationId; + const threadId = + binding.parentConversationId && binding.parentConversationId !== binding.conversationId + ? binding.conversationId + : undefined; + + return { + origin: { + channel: "matrix", + accountId: binding.accountId, + to: `room:${roomId}`, + ...(threadId ? { threadId } : {}), + }, + }; +} diff --git a/extensions/matrix/src/matrix/thread-bindings-shared.ts b/extensions/matrix/src/matrix/thread-bindings-shared.ts index 6c63a731490..298f7757782 100644 --- a/extensions/matrix/src/matrix/thread-bindings-shared.ts +++ b/extensions/matrix/src/matrix/thread-bindings-shared.ts @@ -40,6 +40,7 @@ export type MatrixThreadBindingManager = { targetSessionKey: string; maxAgeMs: number; }) => MatrixThreadBindingRecord[]; + persist: () => Promise; stop: () => void; }; @@ -135,6 +136,10 @@ export function listBindingsForAccount(accountId: string): MatrixThreadBindingRe ); } +export function listAllBindings(): MatrixThreadBindingRecord[] { + return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()]; +} + export function getMatrixThreadBindingManagerEntry( accountId: string, ): MatrixThreadBindingManagerCacheEntry | null { diff --git a/extensions/matrix/src/matrix/thread-bindings.ts b/extensions/matrix/src/matrix/thread-bindings.ts index 397fc67db4e..6544967bdaa 100644 --- a/extensions/matrix/src/matrix/thread-bindings.ts +++ b/extensions/matrix/src/matrix/thread-bindings.ts @@ -292,6 +292,7 @@ export async function createMatrixThreadBindingManager(params: { accountId: params.accountId, getIdleTimeoutMs: () => defaults.idleTimeoutMs, getMaxAgeMs: () => defaults.maxAgeMs, + persist, getByConversation: ({ conversationId, parentConversationId }) => listBindingsForAccount(params.accountId).find((entry) => { if (entry.conversationId !== conversationId.trim()) {