diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 594a4caadf7..300e8445c0f 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -4,8 +4,7 @@ import path from "node:path"; import { __testing as sessionBindingTesting, registerSessionBindingAdapter, -} from "openclaw/plugin-sdk/conversation-runtime"; -import { recordSessionMetaFromInbound } from "openclaw/plugin-sdk/session-store-runtime"; +} from "openclaw/plugin-sdk/session-binding-runtime"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { installMatrixMonitorTestRuntime } from "../../test-runtime.js"; import { MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY } from "../send/types.js"; @@ -54,6 +53,42 @@ vi.mock("./replies.js", () => ({ deliverMatrixReplies: deliverMatrixRepliesMock, })); +function writeMatrixSessionMeta( + storePath: string, + sessionKey: string, + origin: { + chatType: "direct" | "group"; + from: string; + to: string; + nativeChannelId?: string; + nativeDirectUserId?: string; + }, +): void { + const store = fs.existsSync(storePath) + ? (JSON.parse(fs.readFileSync(storePath, "utf8")) as Record>) + : {}; + const existing = store[sessionKey] ?? { + sessionId: `sess-${Object.keys(store).length + 1}`, + updatedAt: Date.now(), + }; + const existingOrigin = + typeof existing.origin === "object" && existing.origin !== null + ? (existing.origin as Record) + : {}; + store[sessionKey] = { + ...existing, + origin: { + ...existingOrigin, + provider: "matrix", + surface: "matrix", + accountId: "ops", + ...origin, + }, + }; + fs.mkdirSync(path.dirname(storePath), { recursive: true }); + fs.writeFileSync(storePath, JSON.stringify(store, null, 2), "utf8"); +} + beforeEach(() => { sessionBindingTesting.resetSessionBindingAdaptersForTests(); installMatrixMonitorTestRuntime(); @@ -776,21 +811,11 @@ describe("matrix monitor handler pairing account scope", () => { const sendNotice = vi.fn(async () => "$notice"); try { - await recordSessionMetaFromInbound({ - storePath, - sessionKey: "agent:ops:main", - ctx: { - SessionKey: "agent:ops:main", - AccountId: "ops", - ChatType: "direct", - Provider: "matrix", - Surface: "matrix", - From: "matrix:@user:example.org", - To: "room:!other:example.org", - NativeChannelId: "!other:example.org", - OriginatingChannel: "matrix", - OriginatingTo: "room:!other:example.org", - }, + writeMatrixSessionMeta(storePath, "agent:ops:main", { + chatType: "direct", + from: "matrix:@user:example.org", + to: "room:!other:example.org", + nativeChannelId: "!other:example.org", }); const { handler } = createMatrixHandlerTestHarness({ @@ -837,21 +862,11 @@ describe("matrix monitor handler pairing account scope", () => { const sendNotice = vi.fn(async () => "$notice"); try { - await recordSessionMetaFromInbound({ - storePath, - sessionKey: "agent:ops:matrix:direct:@user:example.org", - ctx: { - SessionKey: "agent:ops:matrix:direct:@user:example.org", - AccountId: "ops", - ChatType: "direct", - Provider: "matrix", - Surface: "matrix", - From: "matrix:@user:example.org", - To: "room:!other:example.org", - NativeChannelId: "!other:example.org", - OriginatingChannel: "matrix", - OriginatingTo: "room:!other:example.org", - }, + writeMatrixSessionMeta(storePath, "agent:ops:matrix:direct:@user:example.org", { + chatType: "direct", + from: "matrix:@user:example.org", + to: "room:!other:example.org", + nativeChannelId: "!other:example.org", }); const { handler } = createMatrixHandlerTestHarness({ @@ -896,21 +911,11 @@ describe("matrix monitor handler pairing account scope", () => { const sendNotice = vi.fn(async () => "$notice"); try { - await recordSessionMetaFromInbound({ - storePath, - sessionKey: "agent:ops:main", - ctx: { - SessionKey: "agent:ops:main", - AccountId: "ops", - ChatType: "direct", - Provider: "matrix", - Surface: "matrix", - From: "matrix:@user:example.org", - To: "room:!other:example.org", - NativeChannelId: "!other:example.org", - OriginatingChannel: "matrix", - OriginatingTo: "room:!other:example.org", - }, + writeMatrixSessionMeta(storePath, "agent:ops:main", { + chatType: "direct", + from: "matrix:@user:example.org", + to: "room:!other:example.org", + nativeChannelId: "!other:example.org", }); const { handler } = createMatrixHandlerTestHarness({ @@ -963,37 +968,17 @@ describe("matrix monitor handler pairing account scope", () => { const sendNotice = vi.fn(async () => "$notice"); try { - await recordSessionMetaFromInbound({ - storePath, - sessionKey: "agent:ops:main", - ctx: { - SessionKey: "agent:ops:main", - AccountId: "ops", - ChatType: "direct", - Provider: "matrix", - Surface: "matrix", - From: "matrix:@user:example.org", - To: "room:!other:example.org", - NativeChannelId: "!other:example.org", - OriginatingChannel: "matrix", - OriginatingTo: "room:!other:example.org", - }, + writeMatrixSessionMeta(storePath, "agent:ops:main", { + chatType: "direct", + from: "matrix:@user:example.org", + to: "room:!other:example.org", + nativeChannelId: "!other:example.org", }); - await recordSessionMetaFromInbound({ - storePath, - sessionKey: "agent:ops:main", - ctx: { - SessionKey: "agent:ops:main", - AccountId: "ops", - ChatType: "direct", - Provider: "matrix", - Surface: "matrix", - From: "matrix:@other:example.org", - To: "room:@other:example.org", - NativeDirectUserId: "@user:example.org", - OriginatingChannel: "matrix", - OriginatingTo: "room:@other:example.org", - }, + writeMatrixSessionMeta(storePath, "agent:ops:main", { + chatType: "direct", + from: "matrix:@other:example.org", + to: "room:@other:example.org", + nativeDirectUserId: "@user:example.org", }); const { handler } = createMatrixHandlerTestHarness({ @@ -1030,21 +1015,11 @@ describe("matrix monitor handler pairing account scope", () => { const sendNotice = vi.fn(async () => "$notice"); try { - await recordSessionMetaFromInbound({ - storePath, - sessionKey: "agent:ops:main", - ctx: { - SessionKey: "agent:ops:main", - AccountId: "ops", - ChatType: "group", - Provider: "matrix", - Surface: "matrix", - From: "matrix:channel:!group:example.org", - To: "room:!group:example.org", - NativeChannelId: "!group:example.org", - OriginatingChannel: "matrix", - OriginatingTo: "room:!group:example.org", - }, + writeMatrixSessionMeta(storePath, "agent:ops:main", { + chatType: "group", + from: "matrix:channel:!group:example.org", + to: "room:!group:example.org", + nativeChannelId: "!group:example.org", }); const { handler } = createMatrixHandlerTestHarness({ @@ -2167,6 +2142,15 @@ describe("matrix monitor handler draft streaming", () => { }) { let capturedDeliver: DeliverFn | undefined; let capturedReplyOpts: ReplyOpts | undefined; + let resolveCaptured: (() => void) | undefined; + const captured = new Promise((resolve) => { + resolveCaptured = resolve; + }); + const notifyCaptured = () => { + if (capturedDeliver && capturedReplyOpts) { + resolveCaptured?.(); + } + }; // Gate that keeps the handler's model run alive until the test releases it. let resolveRunGate: (() => void) | undefined; const runGate = new Promise((resolve) => { @@ -2189,6 +2173,7 @@ describe("matrix monitor handler draft streaming", () => { client: { redactEvent: redactEventMock }, createReplyDispatcherWithTyping: (params: Record | undefined) => { capturedDeliver = params?.deliver as DeliverFn | undefined; + notifyCaptured(); return { dispatcher: { markComplete: () => {}, @@ -2201,6 +2186,7 @@ describe("matrix monitor handler draft streaming", () => { }, dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => { capturedReplyOpts = args?.replyOptions; + notifyCaptured(); // Block until the test is done exercising callbacks. await runGate; return { queuedFinal: true, counts: { final: 1, block: 0, tool: 0 } }; @@ -2222,12 +2208,7 @@ describe("matrix monitor handler draft streaming", () => { "!room:example.org", createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), ); - // Wait for callbacks to be captured. - await vi.waitFor(() => { - if (!capturedDeliver || !capturedReplyOpts) { - throw new Error("Streaming callbacks not captured yet"); - } - }); + await captured; return { deliver: capturedDeliver!, opts: capturedReplyOpts!, diff --git a/src/plugin-sdk/session-binding-runtime.ts b/src/plugin-sdk/session-binding-runtime.ts index 125a219aa32..9cf3e692545 100644 --- a/src/plugin-sdk/session-binding-runtime.ts +++ b/src/plugin-sdk/session-binding-runtime.ts @@ -1,7 +1,9 @@ // Narrow session-binding runtime surface for channels that only need current // conversation binding state, not configured binding routing or pairing stores. export { + __testing, getSessionBindingService, + registerSessionBindingAdapter, type SessionBindingRecord, type SessionBindingService, } from "../infra/outbound/session-binding-service.js"; diff --git a/src/plugin-sdk/session-store-runtime.ts b/src/plugin-sdk/session-store-runtime.ts index 1cd613dd7b2..54ec8ee7448 100644 --- a/src/plugin-sdk/session-store-runtime.ts +++ b/src/plugin-sdk/session-store-runtime.ts @@ -2,4 +2,3 @@ export { loadSessionStore } from "../config/sessions/store-load.js"; export { resolveSessionStoreEntry } from "../config/sessions/store-entry.js"; -export { recordSessionMetaFromInbound } from "../config/sessions/store.js";