ACP: fix Matrix binding resolution

This commit is contained in:
Gustavo Madeira Santana
2026-03-09 06:20:47 -04:00
parent 93d25a520f
commit 748769e8dd
9 changed files with 372 additions and 86 deletions

View File

@@ -817,15 +817,29 @@ describe("/acp command", () => {
expect(result?.reply?.text).toContain("Removed 1 binding");
});
it("lists ACP sessions from the session store", async () => {
it("lists ACP sessions across ACP agent stores", async () => {
hoisted.sessionBindingListBySessionMock.mockImplementation((key: string) =>
key === defaultAcpSessionKey ? [createBoundThreadSession(key) as SessionBindingRecord] : [],
);
hoisted.loadSessionStoreMock.mockReturnValue({
[defaultAcpSessionKey]: {
sessionId: "sess-1",
updatedAt: Date.now(),
label: "codex-main",
hoisted.listAcpSessionEntriesMock.mockResolvedValue([
{
cfg: baseCfg,
storePath: "/tmp/agents/codex/sessions.json",
sessionKey: defaultAcpSessionKey,
storeSessionKey: defaultAcpSessionKey,
entry: {
sessionId: "sess-1",
updatedAt: Date.now(),
label: "codex-main",
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime-1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
},
acp: {
backend: "acpx",
agent: "codex",
@@ -835,9 +849,12 @@ describe("/acp command", () => {
lastActivityAt: Date.now(),
},
},
]);
hoisted.loadSessionStoreMock.mockReturnValue({
"agent:main:main": {
sessionId: "sess-main",
sessionId: "sess-1",
updatedAt: Date.now(),
label: "main-session",
},
});
@@ -848,6 +865,27 @@ describe("/acp command", () => {
expect(result?.reply?.text).toContain(`thread:${defaultThreadId}`);
});
it("explains that Matrix --thread here requires an existing thread", async () => {
const cfg = {
...baseCfg,
channels: {
...baseCfg.channels,
matrix: {
threadBindings: {
enabled: true,
spawnAcpSessions: true,
},
},
},
} satisfies OpenClawConfig;
const result = await runMatrixRoomAcpCommand("/acp spawn codex --thread here", cfg);
expect(result?.reply?.text).toContain("existing Matrix thread");
expect(result?.reply?.text).toContain("use --thread auto");
expect(hoisted.sessionBindingBindMock).not.toHaveBeenCalled();
});
it("shows ACP status for the thread-bound ACP session", async () => {
mockBoundThreadSession({
identity: {
@@ -868,6 +906,66 @@ describe("/acp command", () => {
expect(hoisted.getStatusMock).toHaveBeenCalledTimes(1);
});
it("resolves ACP key-tail shorthand for explicit ACP session targets", async () => {
hoisted.callGatewayMock.mockRejectedValue(new Error("not found"));
hoisted.listAcpSessionEntriesMock.mockResolvedValue([
{
cfg: baseCfg,
storePath: "/tmp/agents/codex/sessions.json",
sessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
storeSessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
entry: {
sessionId: "session-random-id",
updatedAt: Date.now(),
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime-1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime-1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
},
]);
hoisted.readAcpSessionEntryMock.mockReturnValue({
sessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
storeSessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime-1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
});
const result = await runDiscordAcpCommand(
"/acp status acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
baseCfg,
);
expect(result?.reply?.text).toContain(
"session: agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
);
expect(hoisted.getStatusMock).toHaveBeenCalledWith(
expect.objectContaining({
handle: expect.objectContaining({
sessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
}),
}),
);
});
it("updates ACP runtime mode via /acp set-mode", async () => {
mockBoundThreadSession();
const result = await runThreadAcpCommand("/acp set-mode plan", baseCfg);

View File

@@ -2,8 +2,10 @@ import { getAcpSessionManager } from "../../../acp/control-plane/manager.js";
import { formatAcpRuntimeErrorText } from "../../../acp/runtime/error-text.js";
import { toAcpRuntimeError } from "../../../acp/runtime/errors.js";
import { getAcpRuntimeBackend, requireAcpRuntimeBackend } from "../../../acp/runtime/registry.js";
import { resolveSessionStorePathForAcp } from "../../../acp/runtime/session-meta.js";
import { loadSessionStore } from "../../../config/sessions.js";
import {
listAcpSessionEntries,
type AcpSessionStoreEntry,
} from "../../../acp/runtime/session-meta.js";
import type { SessionEntry } from "../../../config/sessions/types.js";
import { getSessionBindingService } from "../../../infra/outbound/session-binding-service.js";
import type { CommandHandlerResult, HandleCommandsParams } from "../commands-types.js";
@@ -144,10 +146,10 @@ function formatAcpSessionLine(params: {
return `${marker} ${label} (${acp.mode}, ${acp.state}, backend:${acp.backend}${threadText}) -> ${params.key}`;
}
export function handleAcpSessionsAction(
export async function handleAcpSessionsAction(
params: HandleCommandsParams,
restTokens: string[],
): CommandHandlerResult {
): Promise<CommandHandlerResult> {
if (restTokens.length > 0) {
return stopWithText(ACP_SESSIONS_USAGE);
}
@@ -157,28 +159,27 @@ export function handleAcpSessionsAction(
return stopWithText("⚠️ Missing session key.");
}
const { storePath } = resolveSessionStorePathForAcp({
cfg: params.cfg,
sessionKey: currentSessionKey,
});
let store: Record<string, SessionEntry>;
let sessions: Awaited<ReturnType<typeof listAcpSessionEntries>>;
try {
store = loadSessionStore(storePath);
sessions = await listAcpSessionEntries({ cfg: params.cfg });
} catch {
store = {};
sessions = [];
}
const bindingContext = resolveAcpCommandBindingContext(params);
const normalizedChannel = bindingContext.channel;
const normalizedAccountId = bindingContext.accountId || undefined;
const bindingService = getSessionBindingService();
const sessionsWithEntry = sessions.filter(
(session): session is AcpSessionStoreEntry & { entry: SessionEntry } =>
Boolean(session.entry?.acp),
);
const rows = Object.entries(store)
.filter(([, entry]) => Boolean(entry?.acp))
.toSorted(([, a], [, b]) => (b?.updatedAt ?? 0) - (a?.updatedAt ?? 0))
const rows = sessionsWithEntry
.toSorted((a, b) => (b.entry.updatedAt ?? 0) - (a.entry.updatedAt ?? 0))
.slice(0, 20)
.map(([key, entry]) => {
.map((session) => {
const key = session.sessionKey;
const bindingThreadId = bindingService
.listBySession(key)
.find(
@@ -188,7 +189,7 @@ export function handleAcpSessionsAction(
)?.conversation.conversationId;
return formatAcpSessionLine({
key,
entry,
entry: session.entry,
currentSessionKey,
threadId: bindingThreadId,
});

View File

@@ -131,9 +131,13 @@ async function bindSpawnedAcpSessionToThread(params: {
((requiresThreadIdForHere && !currentThreadId) ||
(!requiresThreadIdForHere && !currentConversationId))
) {
const hereError =
channel === "matrix"
? "--thread here requires running /acp spawn inside an existing Matrix thread. In a top-level Matrix room or DM, use --thread auto to create and bind a new thread."
: `--thread here requires running /acp spawn inside an active ${channel} thread/conversation.`;
return {
ok: false,
error: `--thread here requires running /acp spawn inside an active ${channel} thread/conversation.`,
error: hereError,
};
}

View File

@@ -1,38 +1,8 @@
import { callGateway } from "../../../gateway/call.js";
import { resolveEffectiveResetTargetSessionKey } from "../acp-reset-target.js";
import { resolveRequesterSessionKey } from "../commands-subagents/shared.js";
import type { HandleCommandsParams } from "../commands-types.js";
import { resolveSessionKeyByReference } from "../session-target-resolution.js";
import { resolveAcpCommandBindingContext } from "./context.js";
import { SESSION_ID_RE } from "./shared.js";
async function resolveSessionKeyByToken(token: string): Promise<string | null> {
const trimmed = token.trim();
if (!trimmed) {
return null;
}
const attempts: Array<Record<string, string>> = [{ key: trimmed }];
if (SESSION_ID_RE.test(trimmed)) {
attempts.push({ sessionId: trimmed });
}
attempts.push({ label: trimmed });
for (const params of attempts) {
try {
const resolved = await callGateway<{ key?: string }>({
method: "sessions.resolve",
params,
timeoutMs: 8_000,
});
const key = typeof resolved?.key === "string" ? resolved.key.trim() : "";
if (key) {
return key;
}
} catch {
// Try next resolver strategy.
}
}
return null;
}
export function resolveBoundAcpThreadSessionKey(params: HandleCommandsParams): string | undefined {
const commandTargetSessionKey =
@@ -59,7 +29,10 @@ export async function resolveAcpTargetSessionKey(params: {
}): Promise<{ ok: true; sessionKey: string } | { ok: false; error: string }> {
const token = params.token?.trim() || "";
if (token) {
const resolved = await resolveSessionKeyByToken(token);
const resolved = await resolveSessionKeyByReference({
cfg: params.commandParams.cfg,
token,
});
if (!resolved) {
return {
ok: false,

View File

@@ -9,6 +9,7 @@ import { installSubagentsCommandCoreMocks } from "./commands-subagents.test-mock
const hoisted = vi.hoisted(() => {
const callGatewayMock = vi.fn();
const listAcpSessionEntriesMock = vi.fn();
const readAcpSessionEntryMock = vi.fn();
const sessionBindingCapabilitiesMock = vi.fn();
const sessionBindingBindMock = vi.fn();
@@ -17,6 +18,7 @@ const hoisted = vi.hoisted(() => {
const sessionBindingUnbindMock = vi.fn();
return {
callGatewayMock,
listAcpSessionEntriesMock,
readAcpSessionEntryMock,
sessionBindingCapabilitiesMock,
sessionBindingBindMock,
@@ -57,6 +59,7 @@ vi.mock("../../acp/runtime/session-meta.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../acp/runtime/session-meta.js")>();
return {
...actual,
listAcpSessionEntries: (params: unknown) => hoisted.listAcpSessionEntriesMock(params),
readAcpSessionEntry: (params: unknown) => hoisted.readAcpSessionEntryMock(params),
};
});
@@ -188,6 +191,7 @@ describe("/focus, /unfocus, /agents", () => {
beforeEach(() => {
resetSubagentRegistryForTests();
hoisted.callGatewayMock.mockReset();
hoisted.listAcpSessionEntriesMock.mockReset().mockResolvedValue([]);
hoisted.readAcpSessionEntryMock.mockReset().mockReturnValue(null);
hoisted.sessionBindingCapabilitiesMock
.mockReset()
@@ -261,6 +265,71 @@ describe("/focus, /unfocus, /agents", () => {
);
});
it("/focus resolves ACP session key shorthand from the ACP key suffix", async () => {
hoisted.callGatewayMock.mockRejectedValue(new Error("not found"));
hoisted.sessionBindingCapabilitiesMock.mockReturnValue(createSessionBindingCapabilities());
hoisted.sessionBindingResolveByConversationMock.mockReturnValue(null);
hoisted.sessionBindingBindMock.mockImplementation(
async (input: {
targetSessionKey: string;
conversation: { channel: string; accountId: string; conversationId: string };
metadata?: Record<string, unknown>;
}) =>
createSessionBindingRecord({
targetSessionKey: input.targetSessionKey,
conversation: {
channel: input.conversation.channel,
accountId: input.conversation.accountId,
conversationId: input.conversation.conversationId,
},
metadata: {
boundBy:
typeof input.metadata?.boundBy === "string" ? input.metadata.boundBy : "user-1",
},
}),
);
hoisted.listAcpSessionEntriesMock.mockResolvedValue([
{
cfg: baseCfg,
storePath: "/tmp/agents/codex/sessions.json",
sessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
storeSessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
entry: {
sessionId: "session-random-id",
updatedAt: Date.now(),
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime-1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime-1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
},
]);
const result = await handleSubagentsCommand(
createDiscordCommandParams("/focus acp:982649c1-143b-40e4-9bb8-d90ae83f174f"),
true,
);
expect(result?.reply?.text).toContain("bound this thread");
expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith(
expect.objectContaining({
targetSessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f",
}),
);
});
it("/focus rejects Matrix child thread creation when spawn config is not enabled", async () => {
const result = await focusCodexAcp(createMatrixCommandParams("/focus codex-acp"));

View File

@@ -11,13 +11,13 @@ import {
sanitizeTextContent,
stripToolMessages,
} from "../../../agents/tools/sessions-helpers.js";
import type { OpenClawConfig } from "../../../config/config.js";
import type {
SessionEntry,
loadSessionStore as loadSessionStoreFn,
resolveStorePath as resolveStorePathFn,
} from "../../../config/sessions.js";
import { parseDiscordTarget } from "../../../discord/targets.js";
import { callGateway } from "../../../gateway/call.js";
import { formatTimeAgo } from "../../../infra/format-time/format-relative.ts";
import { parseAgentSessionKey } from "../../../routing/session-key.js";
import { isSubagentSessionKey } from "../../../routing/session-key.js";
@@ -41,6 +41,7 @@ import {
resolveMatrixConversationId,
resolveMatrixParentConversationId,
} from "../matrix-context.ts";
import { resolveSessionKeyByReference } from "../session-target-resolution.js";
import {
formatRunLabel,
formatRunStatus,
@@ -356,6 +357,7 @@ export function resolveDiscordChannelIdForFocus(
}
export async function resolveFocusTargetSession(params: {
cfg: OpenClawConfig;
runs: SubagentRunRecord[];
token: string;
}): Promise<FocusTargetResolution | null> {
@@ -376,33 +378,18 @@ export async function resolveFocusTargetSession(params: {
return null;
}
const attempts: Array<Record<string, string>> = [];
attempts.push({ key: token });
if (looksLikeSessionId(token)) {
attempts.push({ sessionId: token });
}
attempts.push({ label: token });
for (const attempt of attempts) {
try {
const resolved = await callGateway<{ key?: string }>({
method: "sessions.resolve",
params: attempt,
});
const key = typeof resolved?.key === "string" ? resolved.key.trim() : "";
if (!key) {
continue;
}
const parsed = parseAgentSessionKey(key);
return {
targetKind: key.includes(":subagent:") ? "subagent" : "acp",
targetSessionKey: key,
agentId: parsed?.agentId ?? "main",
label: token,
};
} catch {
// Try the next resolution strategy.
}
const key = await resolveSessionKeyByReference({
cfg: params.cfg,
token,
});
if (key) {
const parsed = parseAgentSessionKey(key);
return {
targetKind: key.includes(":subagent:") ? "subagent" : "acp",
targetSessionKey: key,
agentId: parsed?.agentId ?? "main",
label: token,
};
}
return null;
}

View File

@@ -0,0 +1,95 @@
import { listAcpSessionEntries } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import { callGateway } from "../../gateway/call.js";
import { SESSION_ID_RE } from "../../sessions/session-id.js";
function resolveAcpSessionKeySuffixToken(token: string): string | null {
const trimmed = token.trim();
if (!trimmed) {
return null;
}
if (SESSION_ID_RE.test(trimmed)) {
return trimmed;
}
const lower = trimmed.toLowerCase();
if (!lower.startsWith("acp:")) {
return null;
}
const suffix = trimmed.slice("acp:".length).trim();
return SESSION_ID_RE.test(suffix) ? suffix : null;
}
async function resolveSessionKeyViaGateway(token: string): Promise<string | null> {
const trimmed = token.trim();
if (!trimmed) {
return null;
}
const attempts: Array<Record<string, string>> = [{ key: trimmed }];
if (SESSION_ID_RE.test(trimmed)) {
attempts.push({ sessionId: trimmed });
}
attempts.push({ label: trimmed });
for (const params of attempts) {
try {
const resolved = await callGateway<{ key?: string }>({
method: "sessions.resolve",
params,
timeoutMs: 8_000,
});
const key = typeof resolved?.key === "string" ? resolved.key.trim() : "";
if (key) {
return key;
}
} catch {
// Try the next resolution strategy.
}
}
return null;
}
async function resolveAcpSessionKeyViaFallback(params: {
cfg: OpenClawConfig;
token: string;
}): Promise<string | null> {
const trimmed = params.token.trim();
const suffix = resolveAcpSessionKeySuffixToken(trimmed);
if (!suffix) {
return null;
}
let sessions: Awaited<ReturnType<typeof listAcpSessionEntries>>;
try {
sessions = await listAcpSessionEntries({ cfg: params.cfg });
} catch {
return null;
}
const matches = sessions
.map((session) => session.sessionKey.trim())
.filter((sessionKey) => sessionKey.endsWith(`:acp:${suffix}`));
if (matches.length !== 1) {
return null;
}
return matches[0] ?? null;
}
export async function resolveSessionKeyByReference(params: {
cfg: OpenClawConfig;
token: string;
}): Promise<string | null> {
const trimmed = params.token.trim();
if (!trimmed) {
return null;
}
const resolved = await resolveSessionKeyViaGateway(trimmed);
if (resolved) {
return resolved;
}
return await resolveAcpSessionKeyViaFallback({
cfg: params.cfg,
token: trimmed,
});
}

View File

@@ -1,3 +1,5 @@
import path from "node:path";
import { createJiti } from "jiti";
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
__testing,
@@ -199,6 +201,38 @@ describe("session binding service", () => {
});
});
it("shares adapter registrations across native ESM and Jiti loader paths", () => {
const serviceModulePath = path.join(
process.cwd(),
"src/infra/outbound/session-binding-service.ts",
);
const jiti = createJiti(import.meta.url, {
interopDefault: true,
extensions: [".ts", ".tsx", ".mts", ".cts", ".js", ".mjs", ".cjs", ".json"],
});
const viaJiti = jiti(serviceModulePath) as typeof import("./session-binding-service.js");
viaJiti.registerSessionBindingAdapter({
channel: "matrix",
accountId: "default",
capabilities: {
placements: ["current", "child"],
},
bind: async (input) => createRecord(input),
listBySession: () => [],
resolveByConversation: () => null,
});
expect(
getSessionBindingService().getCapabilities({ channel: "matrix", accountId: "default" }),
).toEqual({
adapterAvailable: true,
bindSupported: true,
unbindSupported: false,
placements: ["current", "child"],
});
});
it("routes lifecycle updates through the channel/account adapter", async () => {
const setIdleTimeoutBySession = vi.fn(() => [
{

View File

@@ -165,7 +165,32 @@ function resolveAdapterCapabilities(
};
}
const ADAPTERS_BY_CHANNEL_ACCOUNT = new Map<string, SessionBindingAdapter>();
type SessionBindingServiceGlobalState = {
adaptersByChannelAccount: Map<string, SessionBindingAdapter>;
};
const SESSION_BINDING_SERVICE_STATE_KEY = Symbol.for("openclaw.sessionBindingServiceState");
function createSessionBindingServiceGlobalState(): SessionBindingServiceGlobalState {
return {
adaptersByChannelAccount: new Map<string, SessionBindingAdapter>(),
};
}
function resolveSessionBindingServiceGlobalState(): SessionBindingServiceGlobalState {
const runtimeGlobal = globalThis as typeof globalThis & {
[SESSION_BINDING_SERVICE_STATE_KEY]?: SessionBindingServiceGlobalState;
};
if (!runtimeGlobal[SESSION_BINDING_SERVICE_STATE_KEY]) {
runtimeGlobal[SESSION_BINDING_SERVICE_STATE_KEY] = createSessionBindingServiceGlobalState();
}
return runtimeGlobal[SESSION_BINDING_SERVICE_STATE_KEY];
}
// Plugins are loaded via Jiti while core code imports this module via native ESM.
// Keep adapter state on globalThis so both loader paths observe the same registry.
const ADAPTERS_BY_CHANNEL_ACCOUNT =
resolveSessionBindingServiceGlobalState().adaptersByChannelAccount;
export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void {
const key = toAdapterKey({