diff --git a/extensions/msteams/src/monitor-handler.ts b/extensions/msteams/src/monitor-handler.ts index 7f4d609ade3..9cab7913e96 100644 --- a/extensions/msteams/src/monitor-handler.ts +++ b/extensions/msteams/src/monitor-handler.ts @@ -1,10 +1,11 @@ +import { resolveThreadSessionKeys } from "openclaw/plugin-sdk/routing"; import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/text-runtime"; import { type OpenClawConfig, type RuntimeEnv } from "../runtime-api.js"; import type { MSTeamsConversationStore } from "./conversation-store.js"; import { formatUnknownError } from "./errors.js"; import { buildFeedbackEvent, runFeedbackReflection } from "./feedback-reflection.js"; import { buildFileInfoCard, parseFileConsentInvoke, uploadToConsentUrl } from "./file-consent.js"; -import { normalizeMSTeamsConversationId } from "./inbound.js"; +import { extractMSTeamsConversationMessageId, normalizeMSTeamsConversationId } from "./inbound.js"; import type { MSTeamsAdapter } from "./messenger.js"; import { resolveMSTeamsSenderAccess } from "./monitor-handler/access.js"; import { createMSTeamsMessageHandler } from "./monitor-handler/message-handler.js"; @@ -257,7 +258,8 @@ async function handleFeedbackInvoke( } // Strip ;messageid=... suffix to match the normalized ID used by the message handler. - const conversationId = normalizeMSTeamsConversationId(activity.conversation?.id ?? "unknown"); + const rawConversationId = activity.conversation?.id ?? "unknown"; + const conversationId = normalizeMSTeamsConversationId(rawConversationId); const senderId = activity.from?.aadObjectId ?? activity.from?.id ?? "unknown"; const messageId = value.replyToId ?? activity.replyToId ?? "unknown"; const isNegative = reaction === "dislike"; @@ -278,6 +280,22 @@ async function handleFeedbackInvoke( }, }); + // Match the thread-aware session key used by the message handler so feedback + // events land in the correct per-thread transcript. For channel threads, the + // thread root ID comes from the ;messageid= suffix on the conversation ID or + // from activity.replyToId. + const feedbackThreadId = isChannel + ? (extractMSTeamsConversationMessageId(rawConversationId) ?? activity.replyToId ?? undefined) + : undefined; + if (feedbackThreadId) { + const threadKeys = resolveThreadSessionKeys({ + baseSessionKey: route.sessionKey, + threadId: feedbackThreadId, + parentSessionKey: route.sessionKey, + }); + route.sessionKey = threadKeys.sessionKey; + } + // Log feedback event to session JSONL const feedbackEvent = buildFeedbackEvent({ messageId, diff --git a/extensions/msteams/src/monitor-handler/message-handler.thread-session.test.ts b/extensions/msteams/src/monitor-handler/message-handler.thread-session.test.ts new file mode 100644 index 00000000000..6fef9ccd294 --- /dev/null +++ b/extensions/msteams/src/monitor-handler/message-handler.thread-session.test.ts @@ -0,0 +1,262 @@ +import { describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig, PluginRuntime, RuntimeEnv } from "../../runtime-api.js"; +import type { MSTeamsMessageHandlerDeps } from "../monitor-handler.js"; +import { setMSTeamsRuntime } from "../runtime.js"; +import { createMSTeamsMessageHandler } from "./message-handler.js"; + +const runtimeApiMockState = vi.hoisted(() => ({ + dispatchReplyFromConfigWithSettledDispatcher: vi.fn(async (params: { ctxPayload: unknown }) => ({ + queuedFinal: false, + counts: {}, + capturedCtxPayload: params.ctxPayload, + })), +})); + +vi.mock("../../runtime-api.js", async () => { + const actual = + await vi.importActual("../../runtime-api.js"); + return { + ...actual, + dispatchReplyFromConfigWithSettledDispatcher: + runtimeApiMockState.dispatchReplyFromConfigWithSettledDispatcher, + }; +}); + +vi.mock("../graph-thread.js", async () => { + const actual = await vi.importActual("../graph-thread.js"); + return { + ...actual, + resolveTeamGroupId: vi.fn(async () => "group-1"), + fetchChannelMessage: vi.fn(async () => undefined), + fetchThreadReplies: vi.fn(async () => []), + }; +}); + +vi.mock("../reply-dispatcher.js", () => ({ + createMSTeamsReplyDispatcher: () => ({ + dispatcher: {}, + replyOptions: {}, + markDispatchIdle: vi.fn(), + }), +})); + +describe("msteams thread session isolation", () => { + const channelConversationId = "19:general@thread.tacv2"; + + function createDeps(cfg: OpenClawConfig) { + const recordInboundSession = vi.fn(async (_params: { sessionKey: string }) => undefined); + const resolveAgentRoute = vi.fn(({ peer }: { peer: { kind: string; id: string } }) => ({ + sessionKey: `agent:main:msteams:${peer.kind}:${peer.id}`, + agentId: "main", + accountId: "default", + mainSessionKey: "agent:main:main", + lastRoutePolicy: "session" as const, + matchedBy: "default" as const, + })); + + setMSTeamsRuntime({ + logging: { shouldLogVerbose: () => false }, + system: { enqueueSystemEvent: vi.fn() }, + channel: { + debounce: { + resolveInboundDebounceMs: () => 0, + createInboundDebouncer: (params: { + onFlush: (entries: T[]) => Promise; + }): { enqueue: (entry: T) => Promise } => ({ + enqueue: async (entry: T) => { + await params.onFlush([entry]); + }, + }), + }, + pairing: { + readAllowFromStore: vi.fn(async () => []), + upsertPairingRequest: vi.fn(async () => null), + }, + text: { + hasControlCommand: () => false, + resolveTextChunkLimit: () => 4000, + }, + routing: { + resolveAgentRoute, + }, + reply: { + formatAgentEnvelope: ({ body }: { body: string }) => body, + finalizeInboundContext: >(ctx: T) => ctx, + }, + session: { + recordInboundSession, + resolveStorePath: () => "/tmp/test-store", + }, + }, + } as unknown as PluginRuntime); + + const deps: MSTeamsMessageHandlerDeps = { + cfg, + runtime: { error: vi.fn() } as unknown as RuntimeEnv, + appId: "test-app", + adapter: {} as MSTeamsMessageHandlerDeps["adapter"], + tokenProvider: { + getAccessToken: vi.fn(async () => "token"), + }, + textLimit: 4000, + mediaMaxBytes: 1024 * 1024, + conversationStore: { + upsert: vi.fn(async () => undefined), + } as unknown as MSTeamsMessageHandlerDeps["conversationStore"], + pollStore: { + recordVote: vi.fn(async () => null), + } as unknown as MSTeamsMessageHandlerDeps["pollStore"], + log: { + info: vi.fn(), + debug: vi.fn(), + error: vi.fn(), + } as unknown as MSTeamsMessageHandlerDeps["log"], + }; + + return { + deps, + recordInboundSession, + resolveAgentRoute, + }; + } + + function buildActivity(overrides: Record = {}) { + return { + id: "msg-1", + type: "message", + text: "hello", + from: { + id: "user-id", + aadObjectId: "user-aad", + name: "Test User", + }, + recipient: { + id: "bot-id", + name: "Bot", + }, + conversation: { + id: channelConversationId, + conversationType: "channel", + }, + channelData: { team: { id: "team-1" } }, + attachments: [], + entities: [{ type: "mention", mentioned: { id: "bot-id" } }], + ...overrides, + }; + } + + it("appends thread suffix to session key for channel thread replies", async () => { + const cfg: OpenClawConfig = { + channels: { msteams: { groupPolicy: "open" } }, + } as OpenClawConfig; + const { deps, recordInboundSession } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + // Thread reply: has replyToId pointing to the thread root + await handler({ + activity: buildActivity({ replyToId: "thread-root-123" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + expect(recordInboundSession).toHaveBeenCalledTimes(1); + const sessionKey = recordInboundSession.mock.calls[0]?.[0]?.sessionKey; + expect(sessionKey).toContain("thread:"); + expect(sessionKey).toContain("thread-root-123"); + }); + + it("does not append thread suffix for top-level channel messages", async () => { + const cfg: OpenClawConfig = { + channels: { msteams: { groupPolicy: "open" } }, + } as OpenClawConfig; + const { deps, recordInboundSession } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + // Top-level channel message: no replyToId + await handler({ + activity: buildActivity({ replyToId: undefined }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + expect(recordInboundSession).toHaveBeenCalledTimes(1); + const sessionKey = recordInboundSession.mock.calls[0]?.[0]?.sessionKey; + expect(sessionKey).not.toContain("thread:"); + expect(sessionKey).toBe(`agent:main:msteams:channel:${channelConversationId}`); + }); + + it("produces different session keys for different threads in the same channel", async () => { + const cfg: OpenClawConfig = { + channels: { msteams: { groupPolicy: "open" } }, + } as OpenClawConfig; + const { deps, recordInboundSession } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: buildActivity({ id: "msg-1", replyToId: "thread-A" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + await handler({ + activity: buildActivity({ id: "msg-2", replyToId: "thread-B" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + expect(recordInboundSession).toHaveBeenCalledTimes(2); + const sessionKeyA = recordInboundSession.mock.calls[0]?.[0]?.sessionKey; + const sessionKeyB = recordInboundSession.mock.calls[1]?.[0]?.sessionKey; + expect(sessionKeyA).not.toBe(sessionKeyB); + expect(sessionKeyA).toContain("thread-a"); // normalized lowercase + expect(sessionKeyB).toContain("thread-b"); + }); + + it("does not affect DM session keys", async () => { + const cfg: OpenClawConfig = { + channels: { msteams: { allowFrom: ["*"] } }, + } as OpenClawConfig; + const { deps, recordInboundSession } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: { + ...buildActivity(), + conversation: { + id: "a:dm-conversation", + conversationType: "personal", + }, + channelData: {}, + replyToId: "some-reply-id", + entities: [], + }, + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + expect(recordInboundSession).toHaveBeenCalledTimes(1); + const sessionKey = recordInboundSession.mock.calls[0]?.[0]?.sessionKey; + expect(sessionKey).not.toContain("thread:"); + }); + + it("does not affect group chat session keys", async () => { + const cfg: OpenClawConfig = { + channels: { msteams: { groupPolicy: "open" } }, + } as OpenClawConfig; + const { deps, recordInboundSession } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: { + ...buildActivity(), + conversation: { + id: "19:group-chat-id@unq.gbl.spaces", + conversationType: "groupChat", + }, + channelData: {}, + replyToId: "some-reply-id", + entities: [{ type: "mention", mentioned: { id: "bot-id" } }], + }, + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + expect(recordInboundSession).toHaveBeenCalledTimes(1); + const sessionKey = recordInboundSession.mock.calls[0]?.[0]?.sessionKey; + expect(sessionKey).not.toContain("thread:"); + }); +}); diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index 1f4c64ea75f..60c8b9792fb 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -1,4 +1,5 @@ import { resolveInboundMentionDecision } from "openclaw/plugin-sdk/channel-inbound"; +import { resolveThreadSessionKeys } from "openclaw/plugin-sdk/routing"; import { buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, @@ -442,6 +443,21 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { }, }); + // Isolate channel thread sessions: each thread gets its own session key so + // context does not bleed across threads. Prefer conversationMessageId (the + // ;messageid= portion of conversation.id, i.e. the thread root) over + // activity.replyToId (which may point to a non-root parent in deep threads). + // DMs and group chats are unaffected — only channel thread replies fork. + const channelThreadId = isChannel + ? (conversationMessageId ?? activity.replyToId ?? undefined) + : undefined; + const threadKeys = resolveThreadSessionKeys({ + baseSessionKey: route.sessionKey, + threadId: channelThreadId, + parentSessionKey: channelThreadId ? route.sessionKey : undefined, + }); + route.sessionKey = threadKeys.sessionKey; + const preview = rawBody.replace(/\s+/g, " ").slice(0, 160); const inboundLabel = isDirectMessage ? `Teams DM from ${senderName}`