ACP: honor Matrix room targets for thread delivery

This commit is contained in:
Gustavo Madeira Santana
2026-03-14 23:39:49 +00:00
parent 92be3952e9
commit 1c71f76f33
8 changed files with 229 additions and 12 deletions

View File

@@ -386,6 +386,76 @@ describe("spawnAcpDirect", () => {
expect(transcriptCalls[1]?.threadId).toBe("child-thread");
});
it("spawns Matrix thread-bound ACP sessions from top-level room targets", async () => {
hoisted.state.cfg = {
...hoisted.state.cfg,
channels: {
...hoisted.state.cfg.channels,
matrix: {
threadBindings: {
enabled: true,
spawnAcpSessions: true,
},
},
},
};
hoisted.sessionBindingBindMock.mockImplementationOnce(
async (input: {
targetSessionKey: string;
conversation: { accountId: string; conversationId: string; parentConversationId?: string };
metadata?: Record<string, unknown>;
}) =>
createSessionBinding({
targetSessionKey: input.targetSessionKey,
conversation: {
channel: "matrix",
accountId: input.conversation.accountId,
conversationId: "child-thread",
parentConversationId: input.conversation.parentConversationId ?? "!room:example",
},
metadata: {
boundBy:
typeof input.metadata?.boundBy === "string" ? input.metadata.boundBy : "system",
agentId: "codex",
webhookId: "wh-1",
},
}),
);
const result = await spawnAcpDirect(
{
task: "Investigate flaky tests",
agentId: "codex",
mode: "session",
thread: true,
},
{
agentSessionKey: "agent:main:matrix:channel:!room:example",
agentChannel: "matrix",
agentAccountId: "default",
agentTo: "room:!room:example",
},
);
expect(result.status).toBe("accepted");
expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith(
expect.objectContaining({
placement: "child",
conversation: expect.objectContaining({
channel: "matrix",
accountId: "default",
conversationId: "!room:example",
}),
}),
);
const agentCall = hoisted.callGatewayMock.mock.calls
.map((call: unknown[]) => call[0] as { method?: string; params?: Record<string, unknown> })
.find((request) => request.method === "agent");
expect(agentCall?.params?.channel).toBe("matrix");
expect(agentCall?.params?.to).toBe("room:child-thread");
expect(agentCall?.params?.threadId).toBe("child-thread");
});
it("does not inline delivery for fresh oneshot ACP runs", async () => {
const result = await spawnAcpDirect(
{

View File

@@ -41,7 +41,11 @@ import {
normalizeAgentId,
parseAgentSessionKey,
} from "../routing/session-key.js";
import { deliveryContextFromSession, normalizeDeliveryContext } from "../utils/delivery-context.js";
import {
deliveryContextFromSession,
formatConversationTarget,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import {
type AcpSpawnParentRelayHandle,
resolveAcpSpawnStreamLogPath,
@@ -666,9 +670,16 @@ export async function spawnAcpDirect(
const fallbackThreadId =
fallbackThreadIdRaw != null ? String(fallbackThreadIdRaw).trim() || undefined : undefined;
const deliveryThreadId = boundThreadId ?? fallbackThreadId;
const inferredDeliveryTo = boundThreadId
? `channel:${boundThreadId}`
: requesterOrigin?.to?.trim() || (deliveryThreadId ? `channel:${deliveryThreadId}` : undefined);
const inferredDeliveryTo =
formatConversationTarget({
channel: requesterOrigin?.channel ?? binding?.conversation.channel,
conversationId: boundThreadId,
}) ??
requesterOrigin?.to?.trim() ??
formatConversationTarget({
channel: requesterOrigin?.channel,
conversationId: deliveryThreadId,
});
const hasDeliveryTarget = Boolean(requesterOrigin?.channel && inferredDeliveryTo);
// Fresh one-shot ACP runs should bootstrap the worker first, then let higher layers
// decide how to relay status. Inline delivery is reserved for thread-bound sessions.

View File

@@ -4,6 +4,8 @@ import {
__testing as sessionBindingServiceTesting,
registerSessionBindingAdapter,
} from "../infra/outbound/session-binding-service.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createTestRegistry } from "../test-utils/channel-plugins.js";
type AgentCallRequest = { method?: string; params?: Record<string, unknown> };
type RequesterResolution = {
@@ -173,6 +175,7 @@ vi.mock("../config/config.js", async (importOriginal) => {
describe("subagent announce formatting", () => {
let previousFastTestEnv: string | undefined;
let runSubagentAnnounceFlow: (typeof import("./subagent-announce.js"))["runSubagentAnnounceFlow"];
let matrixPlugin: (typeof import("../../extensions/matrix/src/channel.js"))["matrixPlugin"];
beforeAll(async () => {
// Set FAST_TEST_MODE before importing the module to ensure the module-level
@@ -181,6 +184,7 @@ describe("subagent announce formatting", () => {
// See: https://github.com/openclaw/openclaw/issues/31298
previousFastTestEnv = process.env.OPENCLAW_TEST_FAST;
process.env.OPENCLAW_TEST_FAST = "1";
({ matrixPlugin } = await import("../../extensions/matrix/src/channel.js"));
({ runSubagentAnnounceFlow } = await import("./subagent-announce.js"));
});
@@ -232,6 +236,9 @@ describe("subagent announce formatting", () => {
chatHistoryMock.mockReset().mockResolvedValue({ messages: [] });
sessionStore = {};
sessionBindingServiceTesting.resetSessionBindingAdaptersForTests();
setActivePluginRegistry(
createTestRegistry([{ pluginId: "matrix", plugin: matrixPlugin, source: "test" }]),
);
configOverride = {
session: {
mainKey: "main",
@@ -835,6 +842,64 @@ describe("subagent announce formatting", () => {
expect(directTargets).not.toContain("channel:main-parent-channel");
});
it("routes Matrix bound completion delivery to room targets", async () => {
sessionStore = {
"agent:main:subagent:matrix-child": {
sessionId: "child-session-matrix",
},
"agent:main:main": {
sessionId: "requester-session-matrix",
},
};
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "matrix bound answer" }] }],
});
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
sessionKey === "agent:main:main" ? 1 : 0,
);
registerSessionBindingAdapter({
channel: "matrix",
accountId: "acct-matrix",
listBySession: (targetSessionKey: string) =>
targetSessionKey === "agent:main:subagent:matrix-child"
? [
{
bindingId: "matrix:acct-matrix:$thread-bound-1",
targetSessionKey,
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "acct-matrix",
conversationId: "$thread-bound-1",
parentConversationId: "!room:example",
},
status: "active",
boundAt: Date.now(),
},
]
: [],
resolveByConversation: () => null,
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:matrix-child",
childRunId: "run-session-bound-matrix",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "matrix", to: "room:!room:example", accountId: "acct-matrix" },
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
});
expect(didAnnounce).toBe(true);
expect(sendSpy).not.toHaveBeenCalled();
expect(agentSpy).toHaveBeenCalledTimes(1);
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("matrix");
expect(call?.params?.to).toBe("room:$thread-bound-1");
});
it("includes completion status details for error and timeout outcomes", async () => {
const cases = [
{

View File

@@ -10,6 +10,7 @@ import {
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js";
import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
import type { ConversationRef } from "../infra/outbound/session-binding-service.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js";
@@ -19,6 +20,7 @@ import { extractTextFromChatContent } from "../shared/chat-content.js";
import {
type DeliveryContext,
deliveryContextFromSession,
formatConversationTarget,
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
@@ -537,7 +539,11 @@ async function resolveSubagentCompletionOrigin(params: {
? String(requesterOrigin.threadId).trim()
: undefined;
const conversationId =
threadId || (to?.startsWith("channel:") ? to.slice("channel:".length) : "");
threadId ||
resolveConversationIdFromTargets({
targets: [to],
}) ||
"";
const requesterConversation: ConversationRef | undefined =
channel && conversationId ? { channel, accountId, conversationId } : undefined;
@@ -552,7 +558,10 @@ async function resolveSubagentCompletionOrigin(params: {
{
channel: route.binding.conversation.channel,
accountId: route.binding.conversation.accountId,
to: `channel:${route.binding.conversation.conversationId}`,
to: formatConversationTarget({
channel: route.binding.conversation.channel,
conversationId: route.binding.conversation.conversationId,
}),
threadId:
requesterOrigin?.threadId != null && requesterOrigin.threadId !== ""
? String(requesterOrigin.threadId)

View File

@@ -17,6 +17,20 @@ describe("resolveConversationIdFromTargets", () => {
expect(resolved).toBe("987654321");
});
it("extracts room ids from room: targets", () => {
const resolved = resolveConversationIdFromTargets({
targets: ["room:!room:example"],
});
expect(resolved).toBe("!room:example");
});
it("extracts room ids from matrix-scoped room targets", () => {
const resolved = resolveConversationIdFromTargets({
targets: ["matrix:room:!room:example"],
});
expect(resolved).toBe("!room:example");
});
it("extracts ids from Discord channel mentions", () => {
const resolved = resolveConversationIdFromTargets({
targets: ["<#1475250310120214812>"],

View File

@@ -6,6 +6,26 @@ function normalizeConversationId(value: unknown): string | undefined {
return trimmed || undefined;
}
function resolveScopedConversationTarget(value: string): string | undefined {
const trimmed = normalizeConversationId(value);
if (!trimmed) {
return undefined;
}
const matrixScoped = trimmed.toLowerCase().startsWith("matrix:")
? trimmed.slice("matrix:".length).trim()
: trimmed;
for (const prefix of ["channel:", "room:"]) {
if (!matrixScoped.startsWith(prefix)) {
continue;
}
const conversationId = normalizeConversationId(matrixScoped.slice(prefix.length));
if (conversationId) {
return conversationId;
}
}
return undefined;
}
export function resolveConversationIdFromTargets(params: {
threadId?: string | number;
targets: Array<string | undefined | null>;
@@ -21,12 +41,9 @@ export function resolveConversationIdFromTargets(params: {
if (!target) {
continue;
}
if (target.startsWith("channel:")) {
const channelId = normalizeConversationId(target.slice("channel:".length));
if (channelId) {
return channelId;
}
continue;
const scopedConversationId = resolveScopedConversationTarget(target);
if (scopedConversationId) {
return scopedConversationId;
}
const mentionMatch = target.match(/^<#(\d+)>$/);
if (mentionMatch?.[1]) {

View File

@@ -1,5 +1,6 @@
import { describe, expect, it } from "vitest";
import {
formatConversationTarget,
deliveryContextKey,
deliveryContextFromSession,
mergeDeliveryContext,
@@ -77,6 +78,16 @@ describe("delivery context helpers", () => {
);
});
it("formats channel-aware conversation targets", () => {
expect(formatConversationTarget({ channel: "discord", conversationId: "123" })).toBe(
"channel:123",
);
expect(formatConversationTarget({ channel: "matrix", conversationId: "!room:example" })).toBe(
"room:!room:example",
);
expect(formatConversationTarget({ channel: "matrix", conversationId: " " })).toBeUndefined();
});
it("derives delivery context from a session entry", () => {
expect(
deliveryContextFromSession({

View File

@@ -49,6 +49,26 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon
return normalized;
}
export function formatConversationTarget(params: {
channel?: string;
conversationId?: string | number;
}): string | undefined {
const channel =
typeof params.channel === "string"
? (normalizeMessageChannel(params.channel) ?? params.channel.trim())
: undefined;
const conversationId =
typeof params.conversationId === "number" && Number.isFinite(params.conversationId)
? String(Math.trunc(params.conversationId))
: typeof params.conversationId === "string"
? params.conversationId.trim()
: undefined;
if (!channel || !conversationId) {
return undefined;
}
return channel === "matrix" ? `room:${conversationId}` : `channel:${conversationId}`;
}
export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSource): {
deliveryContext?: DeliveryContext;
lastChannel?: string;