diff --git a/src/agents/acp-spawn.test.ts b/src/agents/acp-spawn.test.ts index 9d651803b6a..18c75a673be 100644 --- a/src/agents/acp-spawn.test.ts +++ b/src/agents/acp-spawn.test.ts @@ -386,6 +386,76 @@ describe("spawnAcpDirect", () => { expect(transcriptCalls[1]?.threadId).toBe("child-thread"); }); + it("spawns Matrix thread-bound ACP sessions from top-level room targets", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + channels: { + ...hoisted.state.cfg.channels, + matrix: { + threadBindings: { + enabled: true, + spawnAcpSessions: true, + }, + }, + }, + }; + hoisted.sessionBindingBindMock.mockImplementationOnce( + async (input: { + targetSessionKey: string; + conversation: { accountId: string; conversationId: string; parentConversationId?: string }; + metadata?: Record; + }) => + createSessionBinding({ + targetSessionKey: input.targetSessionKey, + conversation: { + channel: "matrix", + accountId: input.conversation.accountId, + conversationId: "child-thread", + parentConversationId: input.conversation.parentConversationId ?? "!room:example", + }, + metadata: { + boundBy: + typeof input.metadata?.boundBy === "string" ? input.metadata.boundBy : "system", + agentId: "codex", + webhookId: "wh-1", + }, + }), + ); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + mode: "session", + thread: true, + }, + { + agentSessionKey: "agent:main:matrix:channel:!room:example", + agentChannel: "matrix", + agentAccountId: "default", + agentTo: "room:!room:example", + }, + ); + + expect(result.status).toBe("accepted"); + expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith( + expect.objectContaining({ + placement: "child", + conversation: expect.objectContaining({ + channel: "matrix", + accountId: "default", + conversationId: "!room:example", + }), + }), + ); + const agentCall = hoisted.callGatewayMock.mock.calls + .map((call: unknown[]) => call[0] as { method?: string; params?: Record }) + .find((request) => request.method === "agent"); + expect(agentCall?.params?.channel).toBe("matrix"); + expect(agentCall?.params?.to).toBe("room:child-thread"); + expect(agentCall?.params?.threadId).toBe("child-thread"); + }); + it("does not inline delivery for fresh oneshot ACP runs", async () => { const result = await spawnAcpDirect( { diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index 9d68a234aea..b35a9be923b 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -41,7 +41,11 @@ import { normalizeAgentId, parseAgentSessionKey, } from "../routing/session-key.js"; -import { deliveryContextFromSession, normalizeDeliveryContext } from "../utils/delivery-context.js"; +import { + deliveryContextFromSession, + formatConversationTarget, + normalizeDeliveryContext, +} from "../utils/delivery-context.js"; import { type AcpSpawnParentRelayHandle, resolveAcpSpawnStreamLogPath, @@ -666,9 +670,16 @@ export async function spawnAcpDirect( const fallbackThreadId = fallbackThreadIdRaw != null ? String(fallbackThreadIdRaw).trim() || undefined : undefined; const deliveryThreadId = boundThreadId ?? fallbackThreadId; - const inferredDeliveryTo = boundThreadId - ? `channel:${boundThreadId}` - : requesterOrigin?.to?.trim() || (deliveryThreadId ? `channel:${deliveryThreadId}` : undefined); + const inferredDeliveryTo = + formatConversationTarget({ + channel: requesterOrigin?.channel ?? binding?.conversation.channel, + conversationId: boundThreadId, + }) ?? + requesterOrigin?.to?.trim() ?? + formatConversationTarget({ + channel: requesterOrigin?.channel, + conversationId: deliveryThreadId, + }); const hasDeliveryTarget = Boolean(requesterOrigin?.channel && inferredDeliveryTo); // Fresh one-shot ACP runs should bootstrap the worker first, then let higher layers // decide how to relay status. Inline delivery is reserved for thread-bound sessions. diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index 2a74dab1ef9..583eb117f0a 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -4,6 +4,8 @@ import { __testing as sessionBindingServiceTesting, registerSessionBindingAdapter, } from "../infra/outbound/session-binding-service.js"; +import { setActivePluginRegistry } from "../plugins/runtime.js"; +import { createTestRegistry } from "../test-utils/channel-plugins.js"; type AgentCallRequest = { method?: string; params?: Record }; type RequesterResolution = { @@ -173,6 +175,7 @@ vi.mock("../config/config.js", async (importOriginal) => { describe("subagent announce formatting", () => { let previousFastTestEnv: string | undefined; let runSubagentAnnounceFlow: (typeof import("./subagent-announce.js"))["runSubagentAnnounceFlow"]; + let matrixPlugin: (typeof import("../../extensions/matrix/src/channel.js"))["matrixPlugin"]; beforeAll(async () => { // Set FAST_TEST_MODE before importing the module to ensure the module-level @@ -181,6 +184,7 @@ describe("subagent announce formatting", () => { // See: https://github.com/openclaw/openclaw/issues/31298 previousFastTestEnv = process.env.OPENCLAW_TEST_FAST; process.env.OPENCLAW_TEST_FAST = "1"; + ({ matrixPlugin } = await import("../../extensions/matrix/src/channel.js")); ({ runSubagentAnnounceFlow } = await import("./subagent-announce.js")); }); @@ -232,6 +236,9 @@ describe("subagent announce formatting", () => { chatHistoryMock.mockReset().mockResolvedValue({ messages: [] }); sessionStore = {}; sessionBindingServiceTesting.resetSessionBindingAdaptersForTests(); + setActivePluginRegistry( + createTestRegistry([{ pluginId: "matrix", plugin: matrixPlugin, source: "test" }]), + ); configOverride = { session: { mainKey: "main", @@ -835,6 +842,64 @@ describe("subagent announce formatting", () => { expect(directTargets).not.toContain("channel:main-parent-channel"); }); + it("routes Matrix bound completion delivery to room targets", async () => { + sessionStore = { + "agent:main:subagent:matrix-child": { + sessionId: "child-session-matrix", + }, + "agent:main:main": { + sessionId: "requester-session-matrix", + }, + }; + chatHistoryMock.mockResolvedValueOnce({ + messages: [{ role: "assistant", content: [{ type: "text", text: "matrix bound answer" }] }], + }); + subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:main" ? 1 : 0, + ); + registerSessionBindingAdapter({ + channel: "matrix", + accountId: "acct-matrix", + listBySession: (targetSessionKey: string) => + targetSessionKey === "agent:main:subagent:matrix-child" + ? [ + { + bindingId: "matrix:acct-matrix:$thread-bound-1", + targetSessionKey, + targetKind: "subagent", + conversation: { + channel: "matrix", + accountId: "acct-matrix", + conversationId: "$thread-bound-1", + parentConversationId: "!room:example", + }, + status: "active", + boundAt: Date.now(), + }, + ] + : [], + resolveByConversation: () => null, + }); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:matrix-child", + childRunId: "run-session-bound-matrix", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + requesterOrigin: { channel: "matrix", to: "room:!room:example", accountId: "acct-matrix" }, + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + spawnMode: "session", + }); + + expect(didAnnounce).toBe(true); + expect(sendSpy).not.toHaveBeenCalled(); + expect(agentSpy).toHaveBeenCalledTimes(1); + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.channel).toBe("matrix"); + expect(call?.params?.to).toBe("room:$thread-bound-1"); + }); + it("includes completion status details for error and timeout outcomes", async () => { const cases = [ { diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 5070b204392..eea8b5156e3 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -10,6 +10,7 @@ import { } from "../config/sessions.js"; import { callGateway } from "../gateway/call.js"; import { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js"; +import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js"; import type { ConversationRef } from "../infra/outbound/session-binding-service.js"; import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js"; @@ -19,6 +20,7 @@ import { extractTextFromChatContent } from "../shared/chat-content.js"; import { type DeliveryContext, deliveryContextFromSession, + formatConversationTarget, mergeDeliveryContext, normalizeDeliveryContext, } from "../utils/delivery-context.js"; @@ -537,7 +539,11 @@ async function resolveSubagentCompletionOrigin(params: { ? String(requesterOrigin.threadId).trim() : undefined; const conversationId = - threadId || (to?.startsWith("channel:") ? to.slice("channel:".length) : ""); + threadId || + resolveConversationIdFromTargets({ + targets: [to], + }) || + ""; const requesterConversation: ConversationRef | undefined = channel && conversationId ? { channel, accountId, conversationId } : undefined; @@ -552,7 +558,10 @@ async function resolveSubagentCompletionOrigin(params: { { channel: route.binding.conversation.channel, accountId: route.binding.conversation.accountId, - to: `channel:${route.binding.conversation.conversationId}`, + to: formatConversationTarget({ + channel: route.binding.conversation.channel, + conversationId: route.binding.conversation.conversationId, + }), threadId: requesterOrigin?.threadId != null && requesterOrigin.threadId !== "" ? String(requesterOrigin.threadId) diff --git a/src/infra/outbound/conversation-id.test.ts b/src/infra/outbound/conversation-id.test.ts index b35c8e2e4a1..985cb40c162 100644 --- a/src/infra/outbound/conversation-id.test.ts +++ b/src/infra/outbound/conversation-id.test.ts @@ -17,6 +17,20 @@ describe("resolveConversationIdFromTargets", () => { expect(resolved).toBe("987654321"); }); + it("extracts room ids from room: targets", () => { + const resolved = resolveConversationIdFromTargets({ + targets: ["room:!room:example"], + }); + expect(resolved).toBe("!room:example"); + }); + + it("extracts room ids from matrix-scoped room targets", () => { + const resolved = resolveConversationIdFromTargets({ + targets: ["matrix:room:!room:example"], + }); + expect(resolved).toBe("!room:example"); + }); + it("extracts ids from Discord channel mentions", () => { const resolved = resolveConversationIdFromTargets({ targets: ["<#1475250310120214812>"], diff --git a/src/infra/outbound/conversation-id.ts b/src/infra/outbound/conversation-id.ts index a6f8ed1fd6b..51dcd81e8b3 100644 --- a/src/infra/outbound/conversation-id.ts +++ b/src/infra/outbound/conversation-id.ts @@ -6,6 +6,26 @@ function normalizeConversationId(value: unknown): string | undefined { return trimmed || undefined; } +function resolveScopedConversationTarget(value: string): string | undefined { + const trimmed = normalizeConversationId(value); + if (!trimmed) { + return undefined; + } + const matrixScoped = trimmed.toLowerCase().startsWith("matrix:") + ? trimmed.slice("matrix:".length).trim() + : trimmed; + for (const prefix of ["channel:", "room:"]) { + if (!matrixScoped.startsWith(prefix)) { + continue; + } + const conversationId = normalizeConversationId(matrixScoped.slice(prefix.length)); + if (conversationId) { + return conversationId; + } + } + return undefined; +} + export function resolveConversationIdFromTargets(params: { threadId?: string | number; targets: Array; @@ -21,12 +41,9 @@ export function resolveConversationIdFromTargets(params: { if (!target) { continue; } - if (target.startsWith("channel:")) { - const channelId = normalizeConversationId(target.slice("channel:".length)); - if (channelId) { - return channelId; - } - continue; + const scopedConversationId = resolveScopedConversationTarget(target); + if (scopedConversationId) { + return scopedConversationId; } const mentionMatch = target.match(/^<#(\d+)>$/); if (mentionMatch?.[1]) { diff --git a/src/utils/delivery-context.test.ts b/src/utils/delivery-context.test.ts index 67c7cbbcede..3c62f7f5a39 100644 --- a/src/utils/delivery-context.test.ts +++ b/src/utils/delivery-context.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "vitest"; import { + formatConversationTarget, deliveryContextKey, deliveryContextFromSession, mergeDeliveryContext, @@ -77,6 +78,16 @@ describe("delivery context helpers", () => { ); }); + it("formats channel-aware conversation targets", () => { + expect(formatConversationTarget({ channel: "discord", conversationId: "123" })).toBe( + "channel:123", + ); + expect(formatConversationTarget({ channel: "matrix", conversationId: "!room:example" })).toBe( + "room:!room:example", + ); + expect(formatConversationTarget({ channel: "matrix", conversationId: " " })).toBeUndefined(); + }); + it("derives delivery context from a session entry", () => { expect( deliveryContextFromSession({ diff --git a/src/utils/delivery-context.ts b/src/utils/delivery-context.ts index 2fadcac0851..6c0fd829e14 100644 --- a/src/utils/delivery-context.ts +++ b/src/utils/delivery-context.ts @@ -49,6 +49,26 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon return normalized; } +export function formatConversationTarget(params: { + channel?: string; + conversationId?: string | number; +}): string | undefined { + const channel = + typeof params.channel === "string" + ? (normalizeMessageChannel(params.channel) ?? params.channel.trim()) + : undefined; + const conversationId = + typeof params.conversationId === "number" && Number.isFinite(params.conversationId) + ? String(Math.trunc(params.conversationId)) + : typeof params.conversationId === "string" + ? params.conversationId.trim() + : undefined; + if (!channel || !conversationId) { + return undefined; + } + return channel === "matrix" ? `room:${conversationId}` : `channel:${conversationId}`; +} + export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSource): { deliveryContext?: DeliveryContext; lastChannel?: string;