From 01ea7e49212a244afd35a1dfd2ef8569731dad4f Mon Sep 17 00:00:00 2001 From: sudie-codes Date: Fri, 10 Apr 2026 12:42:02 -0700 Subject: [PATCH] feat(msteams): auto-inject parent message context for thread replies (#54932) (#63945) * feat(msteams): auto-inject parent message context for thread replies (#54932) * msteams: use Promise.allSettled for thread context, remove no-op buildInjectedKey * fix(msteams): gate thread parent context by visibility --------- Co-authored-by: Brad Groux <3053586+BradGroux@users.noreply.github.com> --- .../message-handler.authz.test.ts | 4 + .../message-handler.thread-parent.test.ts | 323 ++++++++++++++++++ .../src/monitor-handler/message-handler.ts | 72 +++- .../msteams/src/thread-parent-context.test.ts | 224 ++++++++++++ .../msteams/src/thread-parent-context.ts | 159 +++++++++ 5 files changed, 770 insertions(+), 12 deletions(-) create mode 100644 extensions/msteams/src/monitor-handler/message-handler.thread-parent.test.ts create mode 100644 extensions/msteams/src/thread-parent-context.test.ts create mode 100644 extensions/msteams/src/thread-parent-context.ts diff --git a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts index dc1483e99c4..e54cb355468 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.authz.test.ts @@ -4,6 +4,7 @@ import type { MSTeamsConversationStore } from "../conversation-store.js"; import type { GraphThreadMessage } from "../graph-thread.js"; import type { MSTeamsMessageHandlerDeps } from "../monitor-handler.js"; import { setMSTeamsRuntime } from "../runtime.js"; +import { _resetThreadParentContextCachesForTest } from "../thread-parent-context.js"; import { createMSTeamsMessageHandler } from "./message-handler.js"; type HandlerInput = Parameters>[0]; @@ -160,6 +161,9 @@ describe("msteams monitor handler authz", () => { graphThreadMockState.resolveTeamGroupId.mockClear(); graphThreadMockState.fetchChannelMessage.mockReset(); graphThreadMockState.fetchThreadReplies.mockReset(); + // Parent-context LRU + per-session dedupe are module-level; clear between + // cases so stale parent fetches from earlier tests don't bleed in. + _resetThreadParentContextCachesForTest(); } function createThreadMessage(params: { diff --git a/extensions/msteams/src/monitor-handler/message-handler.thread-parent.test.ts b/extensions/msteams/src/monitor-handler/message-handler.thread-parent.test.ts new file mode 100644 index 00000000000..61738bff614 --- /dev/null +++ b/extensions/msteams/src/monitor-handler/message-handler.thread-parent.test.ts @@ -0,0 +1,323 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig, PluginRuntime, RuntimeEnv } from "../../runtime-api.js"; +import type { MSTeamsMessageHandlerDeps } from "../monitor-handler.js"; +import { setMSTeamsRuntime } from "../runtime.js"; +import { _resetThreadParentContextCachesForTest } from "../thread-parent-context.js"; +import { createMSTeamsMessageHandler } from "./message-handler.js"; + +const runtimeApiMockState = vi.hoisted(() => ({ + dispatchReplyFromConfigWithSettledDispatcher: vi.fn(async (params: { ctxPayload: unknown }) => ({ + queuedFinal: false, + counts: {}, + capturedCtxPayload: params.ctxPayload, + })), +})); + +vi.mock("../../runtime-api.js", async () => { + const actual = + await vi.importActual("../../runtime-api.js"); + return { + ...actual, + dispatchReplyFromConfigWithSettledDispatcher: + runtimeApiMockState.dispatchReplyFromConfigWithSettledDispatcher, + }; +}); + +const fetchChannelMessageMock = vi.hoisted(() => vi.fn()); +const fetchThreadRepliesMock = vi.hoisted(() => vi.fn(async () => [])); +const resolveTeamGroupIdMock = vi.hoisted(() => vi.fn(async () => "group-1")); + +vi.mock("../graph-thread.js", async () => { + const actual = await vi.importActual("../graph-thread.js"); + return { + ...actual, + resolveTeamGroupId: resolveTeamGroupIdMock, + fetchChannelMessage: fetchChannelMessageMock, + fetchThreadReplies: fetchThreadRepliesMock, + }; +}); + +vi.mock("../reply-dispatcher.js", () => ({ + createMSTeamsReplyDispatcher: () => ({ + dispatcher: {}, + replyOptions: {}, + markDispatchIdle: vi.fn(), + }), +})); + +describe("msteams thread parent context injection", () => { + const channelConversationId = "19:general@thread.tacv2"; + + function createDeps(cfg: OpenClawConfig) { + const enqueueSystemEvent = vi.fn(); + const recordInboundSession = vi.fn(async (_params: { sessionKey: string }) => undefined); + const resolveAgentRoute = vi.fn(({ peer }: { peer: { kind: string; id: string } }) => ({ + sessionKey: `agent:main:msteams:${peer.kind}:${peer.id}`, + agentId: "main", + accountId: "default", + mainSessionKey: "agent:main:main", + lastRoutePolicy: "session" as const, + matchedBy: "default" as const, + })); + + setMSTeamsRuntime({ + logging: { shouldLogVerbose: () => false }, + system: { enqueueSystemEvent }, + channel: { + debounce: { + resolveInboundDebounceMs: () => 0, + createInboundDebouncer: (params: { + onFlush: (entries: T[]) => Promise; + }): { enqueue: (entry: T) => Promise } => ({ + enqueue: async (entry: T) => { + await params.onFlush([entry]); + }, + }), + }, + pairing: { + readAllowFromStore: vi.fn(async () => []), + upsertPairingRequest: vi.fn(async () => null), + }, + text: { + hasControlCommand: () => false, + resolveTextChunkLimit: () => 4000, + }, + routing: { resolveAgentRoute }, + reply: { + formatAgentEnvelope: ({ body }: { body: string }) => body, + finalizeInboundContext: >(ctx: T) => ctx, + }, + session: { + recordInboundSession, + resolveStorePath: () => "/tmp/test-store", + }, + }, + } as unknown as PluginRuntime); + + const deps: MSTeamsMessageHandlerDeps = { + cfg, + runtime: { error: vi.fn() } as unknown as RuntimeEnv, + appId: "test-app", + adapter: {} as MSTeamsMessageHandlerDeps["adapter"], + tokenProvider: { + getAccessToken: vi.fn(async () => "token"), + }, + textLimit: 4000, + mediaMaxBytes: 1024 * 1024, + conversationStore: { + upsert: vi.fn(async () => undefined), + } as unknown as MSTeamsMessageHandlerDeps["conversationStore"], + pollStore: { + recordVote: vi.fn(async () => null), + } as unknown as MSTeamsMessageHandlerDeps["pollStore"], + log: { + info: vi.fn(), + debug: vi.fn(), + error: vi.fn(), + } as unknown as MSTeamsMessageHandlerDeps["log"], + }; + + return { deps, enqueueSystemEvent }; + } + + function channelActivity(overrides: Record = {}) { + return { + id: "msg-1", + type: "message", + text: "hello", + from: { id: "user-id", aadObjectId: "user-aad", name: "Test User" }, + recipient: { id: "bot-id", name: "Bot" }, + conversation: { id: channelConversationId, conversationType: "channel" }, + channelData: { team: { id: "team-1" } }, + attachments: [], + entities: [{ type: "mention", mentioned: { id: "bot-id" } }], + ...overrides, + }; + } + + function findParentSystemEventCall( + mock: ReturnType, + ): [string, { sessionKey: string; contextKey?: string }] | undefined { + const calls = mock.mock.calls as Array<[string, { sessionKey: string; contextKey?: string }]>; + return calls.find(([text]) => text.startsWith("Replying to @")); + } + + beforeEach(() => { + _resetThreadParentContextCachesForTest(); + fetchChannelMessageMock.mockReset(); + fetchThreadRepliesMock.mockReset(); + fetchThreadRepliesMock.mockImplementation(async () => []); + resolveTeamGroupIdMock.mockReset(); + resolveTeamGroupIdMock.mockImplementation(async () => "group-1"); + runtimeApiMockState.dispatchReplyFromConfigWithSettledDispatcher.mockClear(); + }); + + const cfg: OpenClawConfig = { + channels: { msteams: { groupPolicy: "open" } }, + } as OpenClawConfig; + + it("enqueues a Replying to @sender system event on the first thread reply", async () => { + fetchChannelMessageMock.mockResolvedValueOnce({ + id: "thread-root-123", + from: { user: { displayName: "Alice", id: "alice-id" } }, + body: { content: "Can someone investigate the latency spike?", contentType: "text" }, + }); + const { deps, enqueueSystemEvent } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: channelActivity({ id: "msg-reply-1", replyToId: "thread-root-123" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + const parentCall = findParentSystemEventCall(enqueueSystemEvent); + expect(parentCall).toBeDefined(); + expect(parentCall?.[0]).toBe("Replying to @Alice: Can someone investigate the latency spike?"); + expect(parentCall?.[1]?.contextKey).toContain("msteams:thread-parent:"); + expect(parentCall?.[1]?.contextKey).toContain("thread-root-123"); + }); + + it("caches parent fetches across thread replies in the same session", async () => { + fetchChannelMessageMock.mockResolvedValue({ + id: "thread-root-123", + from: { user: { displayName: "Alice" } }, + body: { content: "Original question", contentType: "text" }, + }); + const { deps } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: channelActivity({ id: "msg-reply-1", replyToId: "thread-root-123" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + await handler({ + activity: channelActivity({ id: "msg-reply-2", replyToId: "thread-root-123" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + // Parent message fetched exactly once across two replies thanks to LRU cache. + expect(fetchChannelMessageMock).toHaveBeenCalledTimes(1); + }); + + it("does not re-enqueue the same parent context within the same session", async () => { + fetchChannelMessageMock.mockResolvedValue({ + id: "thread-root-123", + from: { user: { displayName: "Alice" } }, + body: { content: "Original question", contentType: "text" }, + }); + const { deps, enqueueSystemEvent } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: channelActivity({ id: "msg-reply-1", replyToId: "thread-root-123" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + await handler({ + activity: channelActivity({ id: "msg-reply-2", replyToId: "thread-root-123" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + const parentCalls = enqueueSystemEvent.mock.calls.filter( + ([text]) => typeof text === "string" && text.startsWith("Replying to @"), + ); + expect(parentCalls).toHaveLength(1); + }); + + it("does not enqueue parent context when allowlist visibility blocks the parent sender", async () => { + fetchChannelMessageMock.mockResolvedValue({ + id: "thread-root-123", + from: { user: { displayName: "Mallory", id: "mallory-aad" } }, + body: { content: "Blocked context", contentType: "text" }, + }); + const { deps, enqueueSystemEvent } = createDeps({ + channels: { + msteams: { + groupPolicy: "allowlist", + groupAllowFrom: ["alice-aad"], + contextVisibility: "allowlist", + teams: { + "team-1": { + channels: { + [channelConversationId]: { requireMention: false }, + }, + }, + }, + }, + }, + } as OpenClawConfig); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: channelActivity({ + id: "msg-reply-1", + replyToId: "thread-root-123", + from: { id: "alice-id", aadObjectId: "alice-aad", name: "Alice" }, + }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + expect(findParentSystemEventCall(enqueueSystemEvent)).toBeUndefined(); + }); + + it("handles Graph failure gracefully without throwing or emitting a parent event", async () => { + fetchChannelMessageMock.mockRejectedValueOnce(new Error("graph down")); + const { deps, enqueueSystemEvent } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: channelActivity({ id: "msg-reply-1", replyToId: "thread-root-123" }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + const parentCall = findParentSystemEventCall(enqueueSystemEvent); + expect(parentCall).toBeUndefined(); + // Original inbound system event still fires (best-effort parent fetch does not block). + expect(enqueueSystemEvent).toHaveBeenCalled(); + }); + + it("does not fetch parent for DM replyToId", async () => { + fetchChannelMessageMock.mockResolvedValue({ + id: "x", + from: { user: { displayName: "Alice" } }, + body: { content: "should-not-happen", contentType: "text" }, + }); + const { deps, enqueueSystemEvent } = createDeps({ + channels: { msteams: { allowFrom: ["*"] } }, + } as OpenClawConfig); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: { + ...channelActivity(), + conversation: { id: "a:dm-conversation", conversationType: "personal" }, + channelData: {}, + replyToId: "dm-parent", + entities: [], + }, + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + expect(fetchChannelMessageMock).not.toHaveBeenCalled(); + expect(findParentSystemEventCall(enqueueSystemEvent)).toBeUndefined(); + }); + + it("does not fetch parent for top-level channel messages without replyToId", async () => { + fetchChannelMessageMock.mockResolvedValue({ + id: "x", + from: { user: { displayName: "Alice" } }, + body: { content: "should-not-happen", contentType: "text" }, + }); + const { deps, enqueueSystemEvent } = createDeps(cfg); + const handler = createMSTeamsMessageHandler(deps); + + await handler({ + activity: channelActivity({ id: "msg-root-1", replyToId: undefined }), + sendActivity: vi.fn(async () => undefined), + } as unknown as Parameters[0]); + + expect(fetchChannelMessageMock).not.toHaveBeenCalled(); + expect(findParentSystemEventCall(enqueueSystemEvent)).toBeUndefined(); + }); +}); diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index 621d5ca98ee..8351ac91ff6 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -26,10 +26,10 @@ import { isRecord } from "../attachments/shared.js"; import type { StoredConversationReference } from "../conversation-store.js"; import { formatUnknownError } from "../errors.js"; import { - fetchChannelMessage, fetchThreadReplies, formatThreadContext, resolveTeamGroupId, + type GraphThreadMessage, } from "../graph-thread.js"; import { resolveGraphChatId } from "../graph-upload.js"; import { @@ -41,6 +41,13 @@ import { translateMSTeamsDmConversationIdForGraph, wasMSTeamsBotMentioned, } from "../inbound.js"; +import { + fetchParentMessageCached, + formatParentContextEvent, + markParentContextInjected, + shouldInjectParentContext, + summarizeParentMessage, +} from "../thread-parent-context.js"; function extractTextFromHtmlAttachments(attachments: MSTeamsAttachmentLike[]): string { for (const attachment of attachments) { @@ -597,15 +604,64 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { // Fetch thread history when the message is a reply inside a Teams channel thread. // This is a best-effort enhancement; errors are logged and do not block the reply. + // + // We also enqueue a compact `Replying to @sender: …` system event when the parent + // is resolvable. On brand-new thread sessions (see PR #62713), this gives the agent + // immediate parent context even before the fuller `[Thread history]` block is assembled. + // Parent fetches are cached (5 min LRU, 100 entries) and per-session deduped so + // consecutive replies in the same thread do not re-inject identical context. let threadContext: string | undefined; if (activity.replyToId && isChannel && teamId) { try { const graphToken = await tokenProvider.getAccessToken("https://graph.microsoft.com"); const groupId = await resolveTeamGroupId(graphToken, teamId); - const [parentMsg, replies] = await Promise.all([ - fetchChannelMessage(graphToken, groupId, conversationId, activity.replyToId), + // Use allSettled so a failure in one fetch does not discard the other. + // For example, reply-fetch 403 should not throw away a successful parent fetch. + const [parentResult, repliesResult] = await Promise.allSettled([ + fetchParentMessageCached(graphToken, groupId, conversationId, activity.replyToId), fetchThreadReplies(graphToken, groupId, conversationId, activity.replyToId), ]); + const parentMsg = parentResult.status === "fulfilled" ? parentResult.value : undefined; + const replies = repliesResult.status === "fulfilled" ? repliesResult.value : []; + if (parentResult.status === "rejected") { + log.debug?.("failed to fetch parent message", { + error: formatUnknownError(parentResult.reason), + }); + } + if (repliesResult.status === "rejected") { + log.debug?.("failed to fetch thread replies", { + error: formatUnknownError(repliesResult.reason), + }); + } + const isThreadSenderAllowed = (msg: GraphThreadMessage) => + groupPolicy === "allowlist" + ? resolveMSTeamsAllowlistMatch({ + allowFrom: effectiveGroupAllowFrom, + senderId: msg.from?.user?.id ?? "", + senderName: msg.from?.user?.displayName, + allowNameMatching, + }).allowed + : true; + const parentSummary = summarizeParentMessage(parentMsg); + const visibleParentMessages = parentMsg + ? filterSupplementalContextItems({ + items: [parentMsg], + mode: contextVisibilityMode, + kind: "thread", + isSenderAllowed: isThreadSenderAllowed, + }).items + : []; + if ( + parentSummary && + visibleParentMessages.length > 0 && + shouldInjectParentContext(route.sessionKey, activity.replyToId) + ) { + core.system.enqueueSystemEvent(formatParentContextEvent(parentSummary), { + sessionKey: route.sessionKey, + contextKey: `msteams:thread-parent:${conversationId}:${activity.replyToId}`, + }); + markParentContextInjected(route.sessionKey, activity.replyToId); + } const allMessages = parentMsg ? [parentMsg, ...replies] : replies; quoteSenderId = parentMsg?.from?.user?.id ?? parentMsg?.from?.application?.id ?? undefined; quoteSenderName = @@ -616,15 +672,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { items: allMessages, mode: contextVisibilityMode, kind: "thread", - isSenderAllowed: (msg) => - groupPolicy === "allowlist" - ? resolveMSTeamsAllowlistMatch({ - allowFrom: effectiveGroupAllowFrom, - senderId: msg.from?.user?.id ?? "", - senderName: msg.from?.user?.displayName, - allowNameMatching, - }).allowed - : true, + isSenderAllowed: isThreadSenderAllowed, }); const formatted = formatThreadContext(threadMessages, activity.id); if (formatted) { diff --git a/extensions/msteams/src/thread-parent-context.test.ts b/extensions/msteams/src/thread-parent-context.test.ts new file mode 100644 index 00000000000..3d61c441b1d --- /dev/null +++ b/extensions/msteams/src/thread-parent-context.test.ts @@ -0,0 +1,224 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { GraphThreadMessage } from "./graph-thread.js"; +import { + _resetThreadParentContextCachesForTest, + fetchParentMessageCached, + formatParentContextEvent, + markParentContextInjected, + shouldInjectParentContext, + summarizeParentMessage, +} from "./thread-parent-context.js"; + +describe("summarizeParentMessage", () => { + it("returns undefined for missing message", () => { + expect(summarizeParentMessage(undefined)).toBeUndefined(); + }); + + it("returns undefined when body is blank", () => { + const msg: GraphThreadMessage = { + id: "p1", + from: { user: { displayName: "Alice" } }, + body: { content: " ", contentType: "text" }, + }; + expect(summarizeParentMessage(msg)).toBeUndefined(); + }); + + it("extracts sender + plain text", () => { + const msg: GraphThreadMessage = { + id: "p1", + from: { user: { displayName: "Alice" } }, + body: { content: "Hello world", contentType: "text" }, + }; + expect(summarizeParentMessage(msg)).toEqual({ sender: "Alice", text: "Hello world" }); + }); + + it("strips HTML for html contentType", () => { + const msg: GraphThreadMessage = { + id: "p1", + from: { user: { displayName: "Bob" } }, + body: { content: "

Hi there

", contentType: "html" }, + }; + expect(summarizeParentMessage(msg)).toEqual({ sender: "Bob", text: "Hi there" }); + }); + + it("collapses whitespace in text contentType", () => { + const msg: GraphThreadMessage = { + id: "p1", + from: { user: { displayName: "Carol" } }, + body: { content: "line one\n line two\t\ttrailing", contentType: "text" }, + }; + expect(summarizeParentMessage(msg)).toEqual({ + sender: "Carol", + text: "line one line two trailing", + }); + }); + + it("falls back to application displayName", () => { + const msg: GraphThreadMessage = { + id: "p1", + from: { application: { displayName: "BotApp" } }, + body: { content: "heads up", contentType: "text" }, + }; + expect(summarizeParentMessage(msg)).toEqual({ sender: "BotApp", text: "heads up" }); + }); + + it("falls back to unknown when sender is missing", () => { + const msg: GraphThreadMessage = { + id: "p1", + body: { content: "orphan", contentType: "text" }, + }; + expect(summarizeParentMessage(msg)).toEqual({ sender: "unknown", text: "orphan" }); + }); + + it("truncates overly long parent text", () => { + const msg: GraphThreadMessage = { + id: "p1", + from: { user: { displayName: "Dana" } }, + body: { content: "x".repeat(1000), contentType: "text" }, + }; + const summary = summarizeParentMessage(msg); + expect(summary?.text.length).toBeLessThanOrEqual(400); + expect(summary?.text.endsWith("…")).toBe(true); + }); +}); + +describe("formatParentContextEvent", () => { + it("formats as Replying to @sender: body", () => { + expect(formatParentContextEvent({ sender: "Alice", text: "hello there" })).toBe( + "Replying to @Alice: hello there", + ); + }); +}); + +describe("fetchParentMessageCached", () => { + beforeEach(() => { + _resetThreadParentContextCachesForTest(); + }); + + it("invokes the fetcher on first call", async () => { + const mockMsg: GraphThreadMessage = { + id: "p1", + body: { content: "hi", contentType: "text" }, + }; + const fetcher = vi.fn(async () => mockMsg); + + const result = await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + + expect(result).toEqual(mockMsg); + expect(fetcher).toHaveBeenCalledTimes(1); + expect(fetcher).toHaveBeenCalledWith("tok", "g1", "c1", "p1"); + }); + + it("returns cached value on repeat fetch without invoking fetcher", async () => { + const mockMsg: GraphThreadMessage = { + id: "p1", + body: { content: "hi", contentType: "text" }, + }; + const fetcher = vi.fn(async () => mockMsg); + + await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + const third = await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + + expect(fetcher).toHaveBeenCalledTimes(1); + expect(third).toEqual(mockMsg); + }); + + it("caches undefined (Graph error) so failures do not re-fetch on burst", async () => { + const fetcher = vi.fn(async () => undefined); + + const first = await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + const second = await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + + expect(first).toBeUndefined(); + expect(second).toBeUndefined(); + expect(fetcher).toHaveBeenCalledTimes(1); + }); + + it("scopes cache by groupId/channelId/parentId", async () => { + const fetcher = vi.fn(async (_tok, _g, _c, parentId) => ({ + id: parentId, + body: { content: `content-${parentId}`, contentType: "text" }, + })); + + await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + await fetchParentMessageCached("tok", "g1", "c1", "p2", fetcher); + await fetchParentMessageCached("tok", "g2", "c1", "p1", fetcher); + + expect(fetcher).toHaveBeenCalledTimes(3); + }); + + it("re-fetches after TTL expires", async () => { + vi.useFakeTimers(); + try { + const fetcher = vi.fn(async () => ({ + id: "p1", + body: { content: "hi", contentType: "text" }, + })); + + await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + // 5 min TTL: advance just beyond. + vi.advanceTimersByTime(5 * 60 * 1000 + 1); + await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + + expect(fetcher).toHaveBeenCalledTimes(2); + } finally { + vi.useRealTimers(); + } + }); + + it("evicts oldest entries when exceeding the 100-entry cap", async () => { + const fetcher = vi.fn(async (_tok, _g, _c, parentId) => ({ + id: String(parentId), + body: { content: `v-${parentId}`, contentType: "text" }, + })); + + // Fill cache with 100 distinct parents. + for (let i = 0; i < 100; i += 1) { + await fetchParentMessageCached("tok", "g1", "c1", `p${i}`, fetcher); + } + expect(fetcher).toHaveBeenCalledTimes(100); + + // First entry should still be cached (no evictions yet). + await fetchParentMessageCached("tok", "g1", "c1", "p0", fetcher); + expect(fetcher).toHaveBeenCalledTimes(100); + + // Push one more distinct parent to trigger an eviction. + // The just-touched p0 is now the newest; the next-oldest (p1) should be evicted. + await fetchParentMessageCached("tok", "g1", "c1", "p100", fetcher); + expect(fetcher).toHaveBeenCalledTimes(101); + + // Fetching p1 again should miss the cache. + await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher); + expect(fetcher).toHaveBeenCalledTimes(102); + + // p0 is still cached because we refreshed it. + await fetchParentMessageCached("tok", "g1", "c1", "p0", fetcher); + expect(fetcher).toHaveBeenCalledTimes(102); + }); +}); + +describe("shouldInjectParentContext / markParentContextInjected", () => { + beforeEach(() => { + _resetThreadParentContextCachesForTest(); + }); + + it("returns true for first observation", () => { + expect(shouldInjectParentContext("session-1", "parent-1")).toBe(true); + }); + + it("returns false after marking the same parent", () => { + markParentContextInjected("session-1", "parent-1"); + expect(shouldInjectParentContext("session-1", "parent-1")).toBe(false); + }); + + it("returns true again when a different parent appears in the session", () => { + markParentContextInjected("session-1", "parent-1"); + expect(shouldInjectParentContext("session-1", "parent-2")).toBe(true); + }); + + it("dedupe is scoped per session key", () => { + markParentContextInjected("session-1", "parent-1"); + expect(shouldInjectParentContext("session-2", "parent-1")).toBe(true); + }); +}); diff --git a/extensions/msteams/src/thread-parent-context.ts b/extensions/msteams/src/thread-parent-context.ts new file mode 100644 index 00000000000..2ba381d91e9 --- /dev/null +++ b/extensions/msteams/src/thread-parent-context.ts @@ -0,0 +1,159 @@ +// Parent-message context injection for Teams channel thread replies. +// +// When an inbound message arrives as a reply inside a Teams channel thread, +// the triggering message often makes no sense on its own (for example, a +// one-word "yes" or "go ahead"). Per-thread session isolation (PR #62713) +// gives each thread its own session, but the first message in a brand-new +// thread session still has no parent context. +// +// This module fetches the parent message via Graph and prepends a compact +// `Replying to @sender: …` system event to the next agent turn so the agent +// knows what is being responded to. Fetches are cached to avoid repeated +// Graph calls within the same active thread, and per-session dedupe ensures +// the same parent is not re-injected on every subsequent reply in the +// thread. + +import { fetchChannelMessage, stripHtmlFromTeamsMessage } from "./graph-thread.js"; +import type { GraphThreadMessage } from "./graph-thread.js"; + +// LRU cache for parent message fetches. Keyed by `teamId:channelId:parentId`. +// 5-minute TTL and 100-entry cap keep active-thread chatter fast without +// holding stale data when a thread goes quiet. Eviction uses Map insertion +// order for LRU semantics (get() re-inserts on hit). +const PARENT_CACHE_TTL_MS = 5 * 60 * 1000; +const PARENT_CACHE_MAX = 100; + +type ParentCacheEntry = { + message: GraphThreadMessage | undefined; + expiresAt: number; +}; + +const parentCache = new Map(); + +// Per-session dedupe: remembers the most recent parent id we injected for a +// given session key. When the same thread session sees another reply against +// the same parent, we skip re-enqueueing the identical system event. We keep +// a small LRU so idle sessions eventually drop out. +const INJECTED_MAX = 200; +const injectedParents = new Map(); + +export type ThreadParentContextFetcher = ( + token: string, + groupId: string, + channelId: string, + messageId: string, +) => Promise; + +function touchLru(map: Map, key: K, value: V, max: number): void { + if (map.has(key)) { + map.delete(key); + } else if (map.size >= max) { + // Drop the oldest (first-inserted) entry. + const firstKey = map.keys().next().value; + if (firstKey !== undefined) { + map.delete(firstKey); + } + } + map.set(key, value); +} + +function buildParentCacheKey(groupId: string, channelId: string, parentId: string): string { + return `${groupId}\u0000${channelId}\u0000${parentId}`; +} + +/** + * Fetch a channel parent message with an LRU+TTL cache. + * + * Uses the injected `fetchParent` (defaults to `fetchChannelMessage`) so + * tests can swap in a stub without mocking the Graph transport. + */ +export async function fetchParentMessageCached( + token: string, + groupId: string, + channelId: string, + parentId: string, + fetchParent: ThreadParentContextFetcher = fetchChannelMessage, +): Promise { + const key = buildParentCacheKey(groupId, channelId, parentId); + const now = Date.now(); + const cached = parentCache.get(key); + if (cached && cached.expiresAt > now) { + // Refresh LRU ordering on hit. + parentCache.delete(key); + parentCache.set(key, cached); + return cached.message; + } + const message = await fetchParent(token, groupId, channelId, parentId); + touchLru(parentCache, key, { message, expiresAt: now + PARENT_CACHE_TTL_MS }, PARENT_CACHE_MAX); + return message; +} + +export type ParentContextSummary = { + /** Display name of the parent message author, or "unknown". */ + sender: string; + /** Stripped, single-line parent body text (or empty if unresolved). */ + text: string; +}; + +const PARENT_TEXT_MAX_CHARS = 400; + +/** + * Extract a compact summary (sender + plain-text body) from a Graph parent + * message. Returns undefined when the parent cannot be summarized (missing + * or blank body). + */ +export function summarizeParentMessage( + message: GraphThreadMessage | undefined, +): ParentContextSummary | undefined { + if (!message) { + return undefined; + } + const sender = + message.from?.user?.displayName ?? message.from?.application?.displayName ?? "unknown"; + const contentType = message.body?.contentType ?? "text"; + const raw = message.body?.content ?? ""; + const text = + contentType === "html" ? stripHtmlFromTeamsMessage(raw) : raw.replace(/\s+/g, " ").trim(); + if (!text) { + return undefined; + } + return { + sender, + text: + text.length > PARENT_TEXT_MAX_CHARS ? `${text.slice(0, PARENT_TEXT_MAX_CHARS - 1)}…` : text, + }; +} + +/** + * Build the single-line `Replying to @sender: body` system event text. + * Callers should pass this text to `enqueueSystemEvent` together with a + * stable contextKey derived from the parent id. + */ +export function formatParentContextEvent(summary: ParentContextSummary): string { + return `Replying to @${summary.sender}: ${summary.text}`; +} + +/** + * Decide whether a parent context event should be enqueued for the current + * session. Returns `false` when we already injected the same parent for this + * session recently (prevents re-prepending identical context on every reply + * in the thread). + */ +export function shouldInjectParentContext(sessionKey: string, parentId: string): boolean { + const key = sessionKey; + return injectedParents.get(key) !== parentId; +} + +/** + * Record that `parentId` was just injected for `sessionKey` so subsequent + * replies with the same parent can short-circuit via `shouldInjectParentContext`. + */ +export function markParentContextInjected(sessionKey: string, parentId: string): void { + touchLru(injectedParents, sessionKey, parentId, INJECTED_MAX); +} + +// Exported for test isolation. +export function _resetThreadParentContextCachesForTest(): void { + parentCache.clear(); + injectedParents.clear(); +}