diff --git a/src/auto-reply/reply/commands-acp.test.ts b/src/auto-reply/reply/commands-acp.test.ts index b466fad5cff..a933b176abd 100644 --- a/src/auto-reply/reply/commands-acp.test.ts +++ b/src/auto-reply/reply/commands-acp.test.ts @@ -817,15 +817,29 @@ describe("/acp command", () => { expect(result?.reply?.text).toContain("Removed 1 binding"); }); - it("lists ACP sessions from the session store", async () => { + it("lists ACP sessions across ACP agent stores", async () => { hoisted.sessionBindingListBySessionMock.mockImplementation((key: string) => key === defaultAcpSessionKey ? [createBoundThreadSession(key) as SessionBindingRecord] : [], ); - hoisted.loadSessionStoreMock.mockReturnValue({ - [defaultAcpSessionKey]: { - sessionId: "sess-1", - updatedAt: Date.now(), - label: "codex-main", + hoisted.listAcpSessionEntriesMock.mockResolvedValue([ + { + cfg: baseCfg, + storePath: "/tmp/agents/codex/sessions.json", + sessionKey: defaultAcpSessionKey, + storeSessionKey: defaultAcpSessionKey, + entry: { + sessionId: "sess-1", + updatedAt: Date.now(), + label: "codex-main", + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime-1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + }, + }, acp: { backend: "acpx", agent: "codex", @@ -835,9 +849,12 @@ describe("/acp command", () => { lastActivityAt: Date.now(), }, }, + ]); + hoisted.loadSessionStoreMock.mockReturnValue({ "agent:main:main": { - sessionId: "sess-main", + sessionId: "sess-1", updatedAt: Date.now(), + label: "main-session", }, }); @@ -848,6 +865,27 @@ describe("/acp command", () => { expect(result?.reply?.text).toContain(`thread:${defaultThreadId}`); }); + it("explains that Matrix --thread here requires an existing thread", async () => { + const cfg = { + ...baseCfg, + channels: { + ...baseCfg.channels, + matrix: { + threadBindings: { + enabled: true, + spawnAcpSessions: true, + }, + }, + }, + } satisfies OpenClawConfig; + + const result = await runMatrixRoomAcpCommand("/acp spawn codex --thread here", cfg); + + expect(result?.reply?.text).toContain("existing Matrix thread"); + expect(result?.reply?.text).toContain("use --thread auto"); + expect(hoisted.sessionBindingBindMock).not.toHaveBeenCalled(); + }); + it("shows ACP status for the thread-bound ACP session", async () => { mockBoundThreadSession({ identity: { @@ -868,6 +906,66 @@ describe("/acp command", () => { expect(hoisted.getStatusMock).toHaveBeenCalledTimes(1); }); + it("resolves ACP key-tail shorthand for explicit ACP session targets", async () => { + hoisted.callGatewayMock.mockRejectedValue(new Error("not found")); + hoisted.listAcpSessionEntriesMock.mockResolvedValue([ + { + cfg: baseCfg, + storePath: "/tmp/agents/codex/sessions.json", + sessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + storeSessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + entry: { + sessionId: "session-random-id", + updatedAt: Date.now(), + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime-1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + }, + }, + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime-1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + }, + }, + ]); + hoisted.readAcpSessionEntryMock.mockReturnValue({ + sessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + storeSessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime-1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + }, + }); + + const result = await runDiscordAcpCommand( + "/acp status acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + baseCfg, + ); + + expect(result?.reply?.text).toContain( + "session: agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + ); + expect(hoisted.getStatusMock).toHaveBeenCalledWith( + expect.objectContaining({ + handle: expect.objectContaining({ + sessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + }), + }), + ); + }); + it("updates ACP runtime mode via /acp set-mode", async () => { mockBoundThreadSession(); const result = await runThreadAcpCommand("/acp set-mode plan", baseCfg); diff --git a/src/auto-reply/reply/commands-acp/diagnostics.ts b/src/auto-reply/reply/commands-acp/diagnostics.ts index d521ac7ae5f..e8f11a74c77 100644 --- a/src/auto-reply/reply/commands-acp/diagnostics.ts +++ b/src/auto-reply/reply/commands-acp/diagnostics.ts @@ -2,8 +2,10 @@ import { getAcpSessionManager } from "../../../acp/control-plane/manager.js"; import { formatAcpRuntimeErrorText } from "../../../acp/runtime/error-text.js"; import { toAcpRuntimeError } from "../../../acp/runtime/errors.js"; import { getAcpRuntimeBackend, requireAcpRuntimeBackend } from "../../../acp/runtime/registry.js"; -import { resolveSessionStorePathForAcp } from "../../../acp/runtime/session-meta.js"; -import { loadSessionStore } from "../../../config/sessions.js"; +import { + listAcpSessionEntries, + type AcpSessionStoreEntry, +} from "../../../acp/runtime/session-meta.js"; import type { SessionEntry } from "../../../config/sessions/types.js"; import { getSessionBindingService } from "../../../infra/outbound/session-binding-service.js"; import type { CommandHandlerResult, HandleCommandsParams } from "../commands-types.js"; @@ -144,10 +146,10 @@ function formatAcpSessionLine(params: { return `${marker} ${label} (${acp.mode}, ${acp.state}, backend:${acp.backend}${threadText}) -> ${params.key}`; } -export function handleAcpSessionsAction( +export async function handleAcpSessionsAction( params: HandleCommandsParams, restTokens: string[], -): CommandHandlerResult { +): Promise { if (restTokens.length > 0) { return stopWithText(ACP_SESSIONS_USAGE); } @@ -157,28 +159,27 @@ export function handleAcpSessionsAction( return stopWithText("⚠️ Missing session key."); } - const { storePath } = resolveSessionStorePathForAcp({ - cfg: params.cfg, - sessionKey: currentSessionKey, - }); - - let store: Record; + let sessions: Awaited>; try { - store = loadSessionStore(storePath); + sessions = await listAcpSessionEntries({ cfg: params.cfg }); } catch { - store = {}; + sessions = []; } const bindingContext = resolveAcpCommandBindingContext(params); const normalizedChannel = bindingContext.channel; const normalizedAccountId = bindingContext.accountId || undefined; const bindingService = getSessionBindingService(); + const sessionsWithEntry = sessions.filter( + (session): session is AcpSessionStoreEntry & { entry: SessionEntry } => + Boolean(session.entry?.acp), + ); - const rows = Object.entries(store) - .filter(([, entry]) => Boolean(entry?.acp)) - .toSorted(([, a], [, b]) => (b?.updatedAt ?? 0) - (a?.updatedAt ?? 0)) + const rows = sessionsWithEntry + .toSorted((a, b) => (b.entry.updatedAt ?? 0) - (a.entry.updatedAt ?? 0)) .slice(0, 20) - .map(([key, entry]) => { + .map((session) => { + const key = session.sessionKey; const bindingThreadId = bindingService .listBySession(key) .find( @@ -188,7 +189,7 @@ export function handleAcpSessionsAction( )?.conversation.conversationId; return formatAcpSessionLine({ key, - entry, + entry: session.entry, currentSessionKey, threadId: bindingThreadId, }); diff --git a/src/auto-reply/reply/commands-acp/lifecycle.ts b/src/auto-reply/reply/commands-acp/lifecycle.ts index 8cb880381b9..c55cbc01620 100644 --- a/src/auto-reply/reply/commands-acp/lifecycle.ts +++ b/src/auto-reply/reply/commands-acp/lifecycle.ts @@ -131,9 +131,13 @@ async function bindSpawnedAcpSessionToThread(params: { ((requiresThreadIdForHere && !currentThreadId) || (!requiresThreadIdForHere && !currentConversationId)) ) { + const hereError = + channel === "matrix" + ? "--thread here requires running /acp spawn inside an existing Matrix thread. In a top-level Matrix room or DM, use --thread auto to create and bind a new thread." + : `--thread here requires running /acp spawn inside an active ${channel} thread/conversation.`; return { ok: false, - error: `--thread here requires running /acp spawn inside an active ${channel} thread/conversation.`, + error: hereError, }; } diff --git a/src/auto-reply/reply/commands-acp/targets.ts b/src/auto-reply/reply/commands-acp/targets.ts index b517ea19d75..8f26e57eec6 100644 --- a/src/auto-reply/reply/commands-acp/targets.ts +++ b/src/auto-reply/reply/commands-acp/targets.ts @@ -1,38 +1,8 @@ -import { callGateway } from "../../../gateway/call.js"; import { resolveEffectiveResetTargetSessionKey } from "../acp-reset-target.js"; import { resolveRequesterSessionKey } from "../commands-subagents/shared.js"; import type { HandleCommandsParams } from "../commands-types.js"; +import { resolveSessionKeyByReference } from "../session-target-resolution.js"; import { resolveAcpCommandBindingContext } from "./context.js"; -import { SESSION_ID_RE } from "./shared.js"; - -async function resolveSessionKeyByToken(token: string): Promise { - const trimmed = token.trim(); - if (!trimmed) { - return null; - } - const attempts: Array> = [{ key: trimmed }]; - if (SESSION_ID_RE.test(trimmed)) { - attempts.push({ sessionId: trimmed }); - } - attempts.push({ label: trimmed }); - - for (const params of attempts) { - try { - const resolved = await callGateway<{ key?: string }>({ - method: "sessions.resolve", - params, - timeoutMs: 8_000, - }); - const key = typeof resolved?.key === "string" ? resolved.key.trim() : ""; - if (key) { - return key; - } - } catch { - // Try next resolver strategy. - } - } - return null; -} export function resolveBoundAcpThreadSessionKey(params: HandleCommandsParams): string | undefined { const commandTargetSessionKey = @@ -59,7 +29,10 @@ export async function resolveAcpTargetSessionKey(params: { }): Promise<{ ok: true; sessionKey: string } | { ok: false; error: string }> { const token = params.token?.trim() || ""; if (token) { - const resolved = await resolveSessionKeyByToken(token); + const resolved = await resolveSessionKeyByReference({ + cfg: params.commandParams.cfg, + token, + }); if (!resolved) { return { ok: false, diff --git a/src/auto-reply/reply/commands-subagents-focus.test.ts b/src/auto-reply/reply/commands-subagents-focus.test.ts index 6f4ee2c08ff..5f9947ed4fe 100644 --- a/src/auto-reply/reply/commands-subagents-focus.test.ts +++ b/src/auto-reply/reply/commands-subagents-focus.test.ts @@ -9,6 +9,7 @@ import { installSubagentsCommandCoreMocks } from "./commands-subagents.test-mock const hoisted = vi.hoisted(() => { const callGatewayMock = vi.fn(); + const listAcpSessionEntriesMock = vi.fn(); const readAcpSessionEntryMock = vi.fn(); const sessionBindingCapabilitiesMock = vi.fn(); const sessionBindingBindMock = vi.fn(); @@ -17,6 +18,7 @@ const hoisted = vi.hoisted(() => { const sessionBindingUnbindMock = vi.fn(); return { callGatewayMock, + listAcpSessionEntriesMock, readAcpSessionEntryMock, sessionBindingCapabilitiesMock, sessionBindingBindMock, @@ -57,6 +59,7 @@ vi.mock("../../acp/runtime/session-meta.js", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, + listAcpSessionEntries: (params: unknown) => hoisted.listAcpSessionEntriesMock(params), readAcpSessionEntry: (params: unknown) => hoisted.readAcpSessionEntryMock(params), }; }); @@ -188,6 +191,7 @@ describe("/focus, /unfocus, /agents", () => { beforeEach(() => { resetSubagentRegistryForTests(); hoisted.callGatewayMock.mockReset(); + hoisted.listAcpSessionEntriesMock.mockReset().mockResolvedValue([]); hoisted.readAcpSessionEntryMock.mockReset().mockReturnValue(null); hoisted.sessionBindingCapabilitiesMock .mockReset() @@ -261,6 +265,71 @@ describe("/focus, /unfocus, /agents", () => { ); }); + it("/focus resolves ACP session key shorthand from the ACP key suffix", async () => { + hoisted.callGatewayMock.mockRejectedValue(new Error("not found")); + hoisted.sessionBindingCapabilitiesMock.mockReturnValue(createSessionBindingCapabilities()); + hoisted.sessionBindingResolveByConversationMock.mockReturnValue(null); + hoisted.sessionBindingBindMock.mockImplementation( + async (input: { + targetSessionKey: string; + conversation: { channel: string; accountId: string; conversationId: string }; + metadata?: Record; + }) => + createSessionBindingRecord({ + targetSessionKey: input.targetSessionKey, + conversation: { + channel: input.conversation.channel, + accountId: input.conversation.accountId, + conversationId: input.conversation.conversationId, + }, + metadata: { + boundBy: + typeof input.metadata?.boundBy === "string" ? input.metadata.boundBy : "user-1", + }, + }), + ); + hoisted.listAcpSessionEntriesMock.mockResolvedValue([ + { + cfg: baseCfg, + storePath: "/tmp/agents/codex/sessions.json", + sessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + storeSessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + entry: { + sessionId: "session-random-id", + updatedAt: Date.now(), + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime-1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + }, + }, + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime-1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + }, + }, + ]); + + const result = await handleSubagentsCommand( + createDiscordCommandParams("/focus acp:982649c1-143b-40e4-9bb8-d90ae83f174f"), + true, + ); + + expect(result?.reply?.text).toContain("bound this thread"); + expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith( + expect.objectContaining({ + targetSessionKey: "agent:codex:acp:982649c1-143b-40e4-9bb8-d90ae83f174f", + }), + ); + }); + it("/focus rejects Matrix child thread creation when spawn config is not enabled", async () => { const result = await focusCodexAcp(createMatrixCommandParams("/focus codex-acp")); diff --git a/src/auto-reply/reply/commands-subagents/shared.ts b/src/auto-reply/reply/commands-subagents/shared.ts index 839acdc98c3..a2e3cdd11f3 100644 --- a/src/auto-reply/reply/commands-subagents/shared.ts +++ b/src/auto-reply/reply/commands-subagents/shared.ts @@ -11,13 +11,13 @@ import { sanitizeTextContent, stripToolMessages, } from "../../../agents/tools/sessions-helpers.js"; +import type { OpenClawConfig } from "../../../config/config.js"; import type { SessionEntry, loadSessionStore as loadSessionStoreFn, resolveStorePath as resolveStorePathFn, } from "../../../config/sessions.js"; import { parseDiscordTarget } from "../../../discord/targets.js"; -import { callGateway } from "../../../gateway/call.js"; import { formatTimeAgo } from "../../../infra/format-time/format-relative.ts"; import { parseAgentSessionKey } from "../../../routing/session-key.js"; import { isSubagentSessionKey } from "../../../routing/session-key.js"; @@ -41,6 +41,7 @@ import { resolveMatrixConversationId, resolveMatrixParentConversationId, } from "../matrix-context.ts"; +import { resolveSessionKeyByReference } from "../session-target-resolution.js"; import { formatRunLabel, formatRunStatus, @@ -356,6 +357,7 @@ export function resolveDiscordChannelIdForFocus( } export async function resolveFocusTargetSession(params: { + cfg: OpenClawConfig; runs: SubagentRunRecord[]; token: string; }): Promise { @@ -376,33 +378,18 @@ export async function resolveFocusTargetSession(params: { return null; } - const attempts: Array> = []; - attempts.push({ key: token }); - if (looksLikeSessionId(token)) { - attempts.push({ sessionId: token }); - } - attempts.push({ label: token }); - - for (const attempt of attempts) { - try { - const resolved = await callGateway<{ key?: string }>({ - method: "sessions.resolve", - params: attempt, - }); - const key = typeof resolved?.key === "string" ? resolved.key.trim() : ""; - if (!key) { - continue; - } - const parsed = parseAgentSessionKey(key); - return { - targetKind: key.includes(":subagent:") ? "subagent" : "acp", - targetSessionKey: key, - agentId: parsed?.agentId ?? "main", - label: token, - }; - } catch { - // Try the next resolution strategy. - } + const key = await resolveSessionKeyByReference({ + cfg: params.cfg, + token, + }); + if (key) { + const parsed = parseAgentSessionKey(key); + return { + targetKind: key.includes(":subagent:") ? "subagent" : "acp", + targetSessionKey: key, + agentId: parsed?.agentId ?? "main", + label: token, + }; } return null; } diff --git a/src/auto-reply/reply/session-target-resolution.ts b/src/auto-reply/reply/session-target-resolution.ts new file mode 100644 index 00000000000..90d9ca21cbe --- /dev/null +++ b/src/auto-reply/reply/session-target-resolution.ts @@ -0,0 +1,95 @@ +import { listAcpSessionEntries } from "../../acp/runtime/session-meta.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import { callGateway } from "../../gateway/call.js"; +import { SESSION_ID_RE } from "../../sessions/session-id.js"; + +function resolveAcpSessionKeySuffixToken(token: string): string | null { + const trimmed = token.trim(); + if (!trimmed) { + return null; + } + if (SESSION_ID_RE.test(trimmed)) { + return trimmed; + } + const lower = trimmed.toLowerCase(); + if (!lower.startsWith("acp:")) { + return null; + } + const suffix = trimmed.slice("acp:".length).trim(); + return SESSION_ID_RE.test(suffix) ? suffix : null; +} + +async function resolveSessionKeyViaGateway(token: string): Promise { + const trimmed = token.trim(); + if (!trimmed) { + return null; + } + const attempts: Array> = [{ key: trimmed }]; + if (SESSION_ID_RE.test(trimmed)) { + attempts.push({ sessionId: trimmed }); + } + attempts.push({ label: trimmed }); + + for (const params of attempts) { + try { + const resolved = await callGateway<{ key?: string }>({ + method: "sessions.resolve", + params, + timeoutMs: 8_000, + }); + const key = typeof resolved?.key === "string" ? resolved.key.trim() : ""; + if (key) { + return key; + } + } catch { + // Try the next resolution strategy. + } + } + return null; +} + +async function resolveAcpSessionKeyViaFallback(params: { + cfg: OpenClawConfig; + token: string; +}): Promise { + const trimmed = params.token.trim(); + const suffix = resolveAcpSessionKeySuffixToken(trimmed); + if (!suffix) { + return null; + } + + let sessions: Awaited>; + try { + sessions = await listAcpSessionEntries({ cfg: params.cfg }); + } catch { + return null; + } + + const matches = sessions + .map((session) => session.sessionKey.trim()) + .filter((sessionKey) => sessionKey.endsWith(`:acp:${suffix}`)); + if (matches.length !== 1) { + return null; + } + return matches[0] ?? null; +} + +export async function resolveSessionKeyByReference(params: { + cfg: OpenClawConfig; + token: string; +}): Promise { + const trimmed = params.token.trim(); + if (!trimmed) { + return null; + } + + const resolved = await resolveSessionKeyViaGateway(trimmed); + if (resolved) { + return resolved; + } + + return await resolveAcpSessionKeyViaFallback({ + cfg: params.cfg, + token: trimmed, + }); +} diff --git a/src/infra/outbound/session-binding-service.test.ts b/src/infra/outbound/session-binding-service.test.ts index 446a2eb1276..044f6373a16 100644 --- a/src/infra/outbound/session-binding-service.test.ts +++ b/src/infra/outbound/session-binding-service.test.ts @@ -1,3 +1,5 @@ +import path from "node:path"; +import { createJiti } from "jiti"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { __testing, @@ -199,6 +201,38 @@ describe("session binding service", () => { }); }); + it("shares adapter registrations across native ESM and Jiti loader paths", () => { + const serviceModulePath = path.join( + process.cwd(), + "src/infra/outbound/session-binding-service.ts", + ); + const jiti = createJiti(import.meta.url, { + interopDefault: true, + extensions: [".ts", ".tsx", ".mts", ".cts", ".js", ".mjs", ".cjs", ".json"], + }); + const viaJiti = jiti(serviceModulePath) as typeof import("./session-binding-service.js"); + + viaJiti.registerSessionBindingAdapter({ + channel: "matrix", + accountId: "default", + capabilities: { + placements: ["current", "child"], + }, + bind: async (input) => createRecord(input), + listBySession: () => [], + resolveByConversation: () => null, + }); + + expect( + getSessionBindingService().getCapabilities({ channel: "matrix", accountId: "default" }), + ).toEqual({ + adapterAvailable: true, + bindSupported: true, + unbindSupported: false, + placements: ["current", "child"], + }); + }); + it("routes lifecycle updates through the channel/account adapter", async () => { const setIdleTimeoutBySession = vi.fn(() => [ { diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index 15ec9c874ea..9b0f992d49a 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -165,7 +165,32 @@ function resolveAdapterCapabilities( }; } -const ADAPTERS_BY_CHANNEL_ACCOUNT = new Map(); +type SessionBindingServiceGlobalState = { + adaptersByChannelAccount: Map; +}; + +const SESSION_BINDING_SERVICE_STATE_KEY = Symbol.for("openclaw.sessionBindingServiceState"); + +function createSessionBindingServiceGlobalState(): SessionBindingServiceGlobalState { + return { + adaptersByChannelAccount: new Map(), + }; +} + +function resolveSessionBindingServiceGlobalState(): SessionBindingServiceGlobalState { + const runtimeGlobal = globalThis as typeof globalThis & { + [SESSION_BINDING_SERVICE_STATE_KEY]?: SessionBindingServiceGlobalState; + }; + if (!runtimeGlobal[SESSION_BINDING_SERVICE_STATE_KEY]) { + runtimeGlobal[SESSION_BINDING_SERVICE_STATE_KEY] = createSessionBindingServiceGlobalState(); + } + return runtimeGlobal[SESSION_BINDING_SERVICE_STATE_KEY]; +} + +// Plugins are loaded via Jiti while core code imports this module via native ESM. +// Keep adapter state on globalThis so both loader paths observe the same registry. +const ADAPTERS_BY_CHANNEL_ACCOUNT = + resolveSessionBindingServiceGlobalState().adaptersByChannelAccount; export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void { const key = toAdapterKey({