feat(msteams): auto-inject parent message context for thread replies (#54932) (#63945)

* feat(msteams): auto-inject parent message context for thread replies (#54932)

* msteams: use Promise.allSettled for thread context, remove no-op buildInjectedKey

* fix(msteams): gate thread parent context by visibility

---------

Co-authored-by: Brad Groux <3053586+BradGroux@users.noreply.github.com>
This commit is contained in:
sudie-codes
2026-04-10 12:42:02 -07:00
committed by GitHub
parent 4edf0bb750
commit 01ea7e4921
5 changed files with 770 additions and 12 deletions

View File

@@ -4,6 +4,7 @@ import type { MSTeamsConversationStore } from "../conversation-store.js";
import type { GraphThreadMessage } from "../graph-thread.js";
import type { MSTeamsMessageHandlerDeps } from "../monitor-handler.js";
import { setMSTeamsRuntime } from "../runtime.js";
import { _resetThreadParentContextCachesForTest } from "../thread-parent-context.js";
import { createMSTeamsMessageHandler } from "./message-handler.js";
type HandlerInput = Parameters<ReturnType<typeof createMSTeamsMessageHandler>>[0];
@@ -160,6 +161,9 @@ describe("msteams monitor handler authz", () => {
graphThreadMockState.resolveTeamGroupId.mockClear();
graphThreadMockState.fetchChannelMessage.mockReset();
graphThreadMockState.fetchThreadReplies.mockReset();
// Parent-context LRU + per-session dedupe are module-level; clear between
// cases so stale parent fetches from earlier tests don't bleed in.
_resetThreadParentContextCachesForTest();
}
function createThreadMessage(params: {

View File

@@ -0,0 +1,323 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig, PluginRuntime, RuntimeEnv } from "../../runtime-api.js";
import type { MSTeamsMessageHandlerDeps } from "../monitor-handler.js";
import { setMSTeamsRuntime } from "../runtime.js";
import { _resetThreadParentContextCachesForTest } from "../thread-parent-context.js";
import { createMSTeamsMessageHandler } from "./message-handler.js";
const runtimeApiMockState = vi.hoisted(() => ({
dispatchReplyFromConfigWithSettledDispatcher: vi.fn(async (params: { ctxPayload: unknown }) => ({
queuedFinal: false,
counts: {},
capturedCtxPayload: params.ctxPayload,
})),
}));
vi.mock("../../runtime-api.js", async () => {
const actual =
await vi.importActual<typeof import("../../runtime-api.js")>("../../runtime-api.js");
return {
...actual,
dispatchReplyFromConfigWithSettledDispatcher:
runtimeApiMockState.dispatchReplyFromConfigWithSettledDispatcher,
};
});
const fetchChannelMessageMock = vi.hoisted(() => vi.fn());
const fetchThreadRepliesMock = vi.hoisted(() => vi.fn(async () => []));
const resolveTeamGroupIdMock = vi.hoisted(() => vi.fn(async () => "group-1"));
vi.mock("../graph-thread.js", async () => {
const actual = await vi.importActual<typeof import("../graph-thread.js")>("../graph-thread.js");
return {
...actual,
resolveTeamGroupId: resolveTeamGroupIdMock,
fetchChannelMessage: fetchChannelMessageMock,
fetchThreadReplies: fetchThreadRepliesMock,
};
});
vi.mock("../reply-dispatcher.js", () => ({
createMSTeamsReplyDispatcher: () => ({
dispatcher: {},
replyOptions: {},
markDispatchIdle: vi.fn(),
}),
}));
describe("msteams thread parent context injection", () => {
const channelConversationId = "19:general@thread.tacv2";
function createDeps(cfg: OpenClawConfig) {
const enqueueSystemEvent = vi.fn();
const recordInboundSession = vi.fn(async (_params: { sessionKey: string }) => undefined);
const resolveAgentRoute = vi.fn(({ peer }: { peer: { kind: string; id: string } }) => ({
sessionKey: `agent:main:msteams:${peer.kind}:${peer.id}`,
agentId: "main",
accountId: "default",
mainSessionKey: "agent:main:main",
lastRoutePolicy: "session" as const,
matchedBy: "default" as const,
}));
setMSTeamsRuntime({
logging: { shouldLogVerbose: () => false },
system: { enqueueSystemEvent },
channel: {
debounce: {
resolveInboundDebounceMs: () => 0,
createInboundDebouncer: <T>(params: {
onFlush: (entries: T[]) => Promise<void>;
}): { enqueue: (entry: T) => Promise<void> } => ({
enqueue: async (entry: T) => {
await params.onFlush([entry]);
},
}),
},
pairing: {
readAllowFromStore: vi.fn(async () => []),
upsertPairingRequest: vi.fn(async () => null),
},
text: {
hasControlCommand: () => false,
resolveTextChunkLimit: () => 4000,
},
routing: { resolveAgentRoute },
reply: {
formatAgentEnvelope: ({ body }: { body: string }) => body,
finalizeInboundContext: <T extends Record<string, unknown>>(ctx: T) => ctx,
},
session: {
recordInboundSession,
resolveStorePath: () => "/tmp/test-store",
},
},
} as unknown as PluginRuntime);
const deps: MSTeamsMessageHandlerDeps = {
cfg,
runtime: { error: vi.fn() } as unknown as RuntimeEnv,
appId: "test-app",
adapter: {} as MSTeamsMessageHandlerDeps["adapter"],
tokenProvider: {
getAccessToken: vi.fn(async () => "token"),
},
textLimit: 4000,
mediaMaxBytes: 1024 * 1024,
conversationStore: {
upsert: vi.fn(async () => undefined),
} as unknown as MSTeamsMessageHandlerDeps["conversationStore"],
pollStore: {
recordVote: vi.fn(async () => null),
} as unknown as MSTeamsMessageHandlerDeps["pollStore"],
log: {
info: vi.fn(),
debug: vi.fn(),
error: vi.fn(),
} as unknown as MSTeamsMessageHandlerDeps["log"],
};
return { deps, enqueueSystemEvent };
}
function channelActivity(overrides: Record<string, unknown> = {}) {
return {
id: "msg-1",
type: "message",
text: "hello",
from: { id: "user-id", aadObjectId: "user-aad", name: "Test User" },
recipient: { id: "bot-id", name: "Bot" },
conversation: { id: channelConversationId, conversationType: "channel" },
channelData: { team: { id: "team-1" } },
attachments: [],
entities: [{ type: "mention", mentioned: { id: "bot-id" } }],
...overrides,
};
}
function findParentSystemEventCall(
mock: ReturnType<typeof vi.fn>,
): [string, { sessionKey: string; contextKey?: string }] | undefined {
const calls = mock.mock.calls as Array<[string, { sessionKey: string; contextKey?: string }]>;
return calls.find(([text]) => text.startsWith("Replying to @"));
}
beforeEach(() => {
_resetThreadParentContextCachesForTest();
fetchChannelMessageMock.mockReset();
fetchThreadRepliesMock.mockReset();
fetchThreadRepliesMock.mockImplementation(async () => []);
resolveTeamGroupIdMock.mockReset();
resolveTeamGroupIdMock.mockImplementation(async () => "group-1");
runtimeApiMockState.dispatchReplyFromConfigWithSettledDispatcher.mockClear();
});
const cfg: OpenClawConfig = {
channels: { msteams: { groupPolicy: "open" } },
} as OpenClawConfig;
it("enqueues a Replying to @sender system event on the first thread reply", async () => {
fetchChannelMessageMock.mockResolvedValueOnce({
id: "thread-root-123",
from: { user: { displayName: "Alice", id: "alice-id" } },
body: { content: "Can someone investigate the latency spike?", contentType: "text" },
});
const { deps, enqueueSystemEvent } = createDeps(cfg);
const handler = createMSTeamsMessageHandler(deps);
await handler({
activity: channelActivity({ id: "msg-reply-1", replyToId: "thread-root-123" }),
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
const parentCall = findParentSystemEventCall(enqueueSystemEvent);
expect(parentCall).toBeDefined();
expect(parentCall?.[0]).toBe("Replying to @Alice: Can someone investigate the latency spike?");
expect(parentCall?.[1]?.contextKey).toContain("msteams:thread-parent:");
expect(parentCall?.[1]?.contextKey).toContain("thread-root-123");
});
it("caches parent fetches across thread replies in the same session", async () => {
fetchChannelMessageMock.mockResolvedValue({
id: "thread-root-123",
from: { user: { displayName: "Alice" } },
body: { content: "Original question", contentType: "text" },
});
const { deps } = createDeps(cfg);
const handler = createMSTeamsMessageHandler(deps);
await handler({
activity: channelActivity({ id: "msg-reply-1", replyToId: "thread-root-123" }),
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
await handler({
activity: channelActivity({ id: "msg-reply-2", replyToId: "thread-root-123" }),
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
// Parent message fetched exactly once across two replies thanks to LRU cache.
expect(fetchChannelMessageMock).toHaveBeenCalledTimes(1);
});
it("does not re-enqueue the same parent context within the same session", async () => {
fetchChannelMessageMock.mockResolvedValue({
id: "thread-root-123",
from: { user: { displayName: "Alice" } },
body: { content: "Original question", contentType: "text" },
});
const { deps, enqueueSystemEvent } = createDeps(cfg);
const handler = createMSTeamsMessageHandler(deps);
await handler({
activity: channelActivity({ id: "msg-reply-1", replyToId: "thread-root-123" }),
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
await handler({
activity: channelActivity({ id: "msg-reply-2", replyToId: "thread-root-123" }),
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
const parentCalls = enqueueSystemEvent.mock.calls.filter(
([text]) => typeof text === "string" && text.startsWith("Replying to @"),
);
expect(parentCalls).toHaveLength(1);
});
it("does not enqueue parent context when allowlist visibility blocks the parent sender", async () => {
fetchChannelMessageMock.mockResolvedValue({
id: "thread-root-123",
from: { user: { displayName: "Mallory", id: "mallory-aad" } },
body: { content: "Blocked context", contentType: "text" },
});
const { deps, enqueueSystemEvent } = createDeps({
channels: {
msteams: {
groupPolicy: "allowlist",
groupAllowFrom: ["alice-aad"],
contextVisibility: "allowlist",
teams: {
"team-1": {
channels: {
[channelConversationId]: { requireMention: false },
},
},
},
},
},
} as OpenClawConfig);
const handler = createMSTeamsMessageHandler(deps);
await handler({
activity: channelActivity({
id: "msg-reply-1",
replyToId: "thread-root-123",
from: { id: "alice-id", aadObjectId: "alice-aad", name: "Alice" },
}),
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
expect(findParentSystemEventCall(enqueueSystemEvent)).toBeUndefined();
});
it("handles Graph failure gracefully without throwing or emitting a parent event", async () => {
fetchChannelMessageMock.mockRejectedValueOnce(new Error("graph down"));
const { deps, enqueueSystemEvent } = createDeps(cfg);
const handler = createMSTeamsMessageHandler(deps);
await handler({
activity: channelActivity({ id: "msg-reply-1", replyToId: "thread-root-123" }),
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
const parentCall = findParentSystemEventCall(enqueueSystemEvent);
expect(parentCall).toBeUndefined();
// Original inbound system event still fires (best-effort parent fetch does not block).
expect(enqueueSystemEvent).toHaveBeenCalled();
});
it("does not fetch parent for DM replyToId", async () => {
fetchChannelMessageMock.mockResolvedValue({
id: "x",
from: { user: { displayName: "Alice" } },
body: { content: "should-not-happen", contentType: "text" },
});
const { deps, enqueueSystemEvent } = createDeps({
channels: { msteams: { allowFrom: ["*"] } },
} as OpenClawConfig);
const handler = createMSTeamsMessageHandler(deps);
await handler({
activity: {
...channelActivity(),
conversation: { id: "a:dm-conversation", conversationType: "personal" },
channelData: {},
replyToId: "dm-parent",
entities: [],
},
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
expect(fetchChannelMessageMock).not.toHaveBeenCalled();
expect(findParentSystemEventCall(enqueueSystemEvent)).toBeUndefined();
});
it("does not fetch parent for top-level channel messages without replyToId", async () => {
fetchChannelMessageMock.mockResolvedValue({
id: "x",
from: { user: { displayName: "Alice" } },
body: { content: "should-not-happen", contentType: "text" },
});
const { deps, enqueueSystemEvent } = createDeps(cfg);
const handler = createMSTeamsMessageHandler(deps);
await handler({
activity: channelActivity({ id: "msg-root-1", replyToId: undefined }),
sendActivity: vi.fn(async () => undefined),
} as unknown as Parameters<typeof handler>[0]);
expect(fetchChannelMessageMock).not.toHaveBeenCalled();
expect(findParentSystemEventCall(enqueueSystemEvent)).toBeUndefined();
});
});

View File

@@ -26,10 +26,10 @@ import { isRecord } from "../attachments/shared.js";
import type { StoredConversationReference } from "../conversation-store.js";
import { formatUnknownError } from "../errors.js";
import {
fetchChannelMessage,
fetchThreadReplies,
formatThreadContext,
resolveTeamGroupId,
type GraphThreadMessage,
} from "../graph-thread.js";
import { resolveGraphChatId } from "../graph-upload.js";
import {
@@ -41,6 +41,13 @@ import {
translateMSTeamsDmConversationIdForGraph,
wasMSTeamsBotMentioned,
} from "../inbound.js";
import {
fetchParentMessageCached,
formatParentContextEvent,
markParentContextInjected,
shouldInjectParentContext,
summarizeParentMessage,
} from "../thread-parent-context.js";
function extractTextFromHtmlAttachments(attachments: MSTeamsAttachmentLike[]): string {
for (const attachment of attachments) {
@@ -597,15 +604,64 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
// Fetch thread history when the message is a reply inside a Teams channel thread.
// This is a best-effort enhancement; errors are logged and do not block the reply.
//
// We also enqueue a compact `Replying to @sender: …` system event when the parent
// is resolvable. On brand-new thread sessions (see PR #62713), this gives the agent
// immediate parent context even before the fuller `[Thread history]` block is assembled.
// Parent fetches are cached (5 min LRU, 100 entries) and per-session deduped so
// consecutive replies in the same thread do not re-inject identical context.
let threadContext: string | undefined;
if (activity.replyToId && isChannel && teamId) {
try {
const graphToken = await tokenProvider.getAccessToken("https://graph.microsoft.com");
const groupId = await resolveTeamGroupId(graphToken, teamId);
const [parentMsg, replies] = await Promise.all([
fetchChannelMessage(graphToken, groupId, conversationId, activity.replyToId),
// Use allSettled so a failure in one fetch does not discard the other.
// For example, reply-fetch 403 should not throw away a successful parent fetch.
const [parentResult, repliesResult] = await Promise.allSettled([
fetchParentMessageCached(graphToken, groupId, conversationId, activity.replyToId),
fetchThreadReplies(graphToken, groupId, conversationId, activity.replyToId),
]);
const parentMsg = parentResult.status === "fulfilled" ? parentResult.value : undefined;
const replies = repliesResult.status === "fulfilled" ? repliesResult.value : [];
if (parentResult.status === "rejected") {
log.debug?.("failed to fetch parent message", {
error: formatUnknownError(parentResult.reason),
});
}
if (repliesResult.status === "rejected") {
log.debug?.("failed to fetch thread replies", {
error: formatUnknownError(repliesResult.reason),
});
}
const isThreadSenderAllowed = (msg: GraphThreadMessage) =>
groupPolicy === "allowlist"
? resolveMSTeamsAllowlistMatch({
allowFrom: effectiveGroupAllowFrom,
senderId: msg.from?.user?.id ?? "",
senderName: msg.from?.user?.displayName,
allowNameMatching,
}).allowed
: true;
const parentSummary = summarizeParentMessage(parentMsg);
const visibleParentMessages = parentMsg
? filterSupplementalContextItems({
items: [parentMsg],
mode: contextVisibilityMode,
kind: "thread",
isSenderAllowed: isThreadSenderAllowed,
}).items
: [];
if (
parentSummary &&
visibleParentMessages.length > 0 &&
shouldInjectParentContext(route.sessionKey, activity.replyToId)
) {
core.system.enqueueSystemEvent(formatParentContextEvent(parentSummary), {
sessionKey: route.sessionKey,
contextKey: `msteams:thread-parent:${conversationId}:${activity.replyToId}`,
});
markParentContextInjected(route.sessionKey, activity.replyToId);
}
const allMessages = parentMsg ? [parentMsg, ...replies] : replies;
quoteSenderId = parentMsg?.from?.user?.id ?? parentMsg?.from?.application?.id ?? undefined;
quoteSenderName =
@@ -616,15 +672,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
items: allMessages,
mode: contextVisibilityMode,
kind: "thread",
isSenderAllowed: (msg) =>
groupPolicy === "allowlist"
? resolveMSTeamsAllowlistMatch({
allowFrom: effectiveGroupAllowFrom,
senderId: msg.from?.user?.id ?? "",
senderName: msg.from?.user?.displayName,
allowNameMatching,
}).allowed
: true,
isSenderAllowed: isThreadSenderAllowed,
});
const formatted = formatThreadContext(threadMessages, activity.id);
if (formatted) {

View File

@@ -0,0 +1,224 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { GraphThreadMessage } from "./graph-thread.js";
import {
_resetThreadParentContextCachesForTest,
fetchParentMessageCached,
formatParentContextEvent,
markParentContextInjected,
shouldInjectParentContext,
summarizeParentMessage,
} from "./thread-parent-context.js";
describe("summarizeParentMessage", () => {
it("returns undefined for missing message", () => {
expect(summarizeParentMessage(undefined)).toBeUndefined();
});
it("returns undefined when body is blank", () => {
const msg: GraphThreadMessage = {
id: "p1",
from: { user: { displayName: "Alice" } },
body: { content: " ", contentType: "text" },
};
expect(summarizeParentMessage(msg)).toBeUndefined();
});
it("extracts sender + plain text", () => {
const msg: GraphThreadMessage = {
id: "p1",
from: { user: { displayName: "Alice" } },
body: { content: "Hello world", contentType: "text" },
};
expect(summarizeParentMessage(msg)).toEqual({ sender: "Alice", text: "Hello world" });
});
it("strips HTML for html contentType", () => {
const msg: GraphThreadMessage = {
id: "p1",
from: { user: { displayName: "Bob" } },
body: { content: "<p>Hi <b>there</b></p>", contentType: "html" },
};
expect(summarizeParentMessage(msg)).toEqual({ sender: "Bob", text: "Hi there" });
});
it("collapses whitespace in text contentType", () => {
const msg: GraphThreadMessage = {
id: "p1",
from: { user: { displayName: "Carol" } },
body: { content: "line one\n line two\t\ttrailing", contentType: "text" },
};
expect(summarizeParentMessage(msg)).toEqual({
sender: "Carol",
text: "line one line two trailing",
});
});
it("falls back to application displayName", () => {
const msg: GraphThreadMessage = {
id: "p1",
from: { application: { displayName: "BotApp" } },
body: { content: "heads up", contentType: "text" },
};
expect(summarizeParentMessage(msg)).toEqual({ sender: "BotApp", text: "heads up" });
});
it("falls back to unknown when sender is missing", () => {
const msg: GraphThreadMessage = {
id: "p1",
body: { content: "orphan", contentType: "text" },
};
expect(summarizeParentMessage(msg)).toEqual({ sender: "unknown", text: "orphan" });
});
it("truncates overly long parent text", () => {
const msg: GraphThreadMessage = {
id: "p1",
from: { user: { displayName: "Dana" } },
body: { content: "x".repeat(1000), contentType: "text" },
};
const summary = summarizeParentMessage(msg);
expect(summary?.text.length).toBeLessThanOrEqual(400);
expect(summary?.text.endsWith("…")).toBe(true);
});
});
describe("formatParentContextEvent", () => {
it("formats as Replying to @sender: body", () => {
expect(formatParentContextEvent({ sender: "Alice", text: "hello there" })).toBe(
"Replying to @Alice: hello there",
);
});
});
describe("fetchParentMessageCached", () => {
beforeEach(() => {
_resetThreadParentContextCachesForTest();
});
it("invokes the fetcher on first call", async () => {
const mockMsg: GraphThreadMessage = {
id: "p1",
body: { content: "hi", contentType: "text" },
};
const fetcher = vi.fn(async () => mockMsg);
const result = await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
expect(result).toEqual(mockMsg);
expect(fetcher).toHaveBeenCalledTimes(1);
expect(fetcher).toHaveBeenCalledWith("tok", "g1", "c1", "p1");
});
it("returns cached value on repeat fetch without invoking fetcher", async () => {
const mockMsg: GraphThreadMessage = {
id: "p1",
body: { content: "hi", contentType: "text" },
};
const fetcher = vi.fn(async () => mockMsg);
await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
const third = await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
expect(fetcher).toHaveBeenCalledTimes(1);
expect(third).toEqual(mockMsg);
});
it("caches undefined (Graph error) so failures do not re-fetch on burst", async () => {
const fetcher = vi.fn(async () => undefined);
const first = await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
const second = await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
expect(first).toBeUndefined();
expect(second).toBeUndefined();
expect(fetcher).toHaveBeenCalledTimes(1);
});
it("scopes cache by groupId/channelId/parentId", async () => {
const fetcher = vi.fn(async (_tok, _g, _c, parentId) => ({
id: parentId,
body: { content: `content-${parentId}`, contentType: "text" },
}));
await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
await fetchParentMessageCached("tok", "g1", "c1", "p2", fetcher);
await fetchParentMessageCached("tok", "g2", "c1", "p1", fetcher);
expect(fetcher).toHaveBeenCalledTimes(3);
});
it("re-fetches after TTL expires", async () => {
vi.useFakeTimers();
try {
const fetcher = vi.fn(async () => ({
id: "p1",
body: { content: "hi", contentType: "text" },
}));
await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
// 5 min TTL: advance just beyond.
vi.advanceTimersByTime(5 * 60 * 1000 + 1);
await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
expect(fetcher).toHaveBeenCalledTimes(2);
} finally {
vi.useRealTimers();
}
});
it("evicts oldest entries when exceeding the 100-entry cap", async () => {
const fetcher = vi.fn(async (_tok, _g, _c, parentId) => ({
id: String(parentId),
body: { content: `v-${parentId}`, contentType: "text" },
}));
// Fill cache with 100 distinct parents.
for (let i = 0; i < 100; i += 1) {
await fetchParentMessageCached("tok", "g1", "c1", `p${i}`, fetcher);
}
expect(fetcher).toHaveBeenCalledTimes(100);
// First entry should still be cached (no evictions yet).
await fetchParentMessageCached("tok", "g1", "c1", "p0", fetcher);
expect(fetcher).toHaveBeenCalledTimes(100);
// Push one more distinct parent to trigger an eviction.
// The just-touched p0 is now the newest; the next-oldest (p1) should be evicted.
await fetchParentMessageCached("tok", "g1", "c1", "p100", fetcher);
expect(fetcher).toHaveBeenCalledTimes(101);
// Fetching p1 again should miss the cache.
await fetchParentMessageCached("tok", "g1", "c1", "p1", fetcher);
expect(fetcher).toHaveBeenCalledTimes(102);
// p0 is still cached because we refreshed it.
await fetchParentMessageCached("tok", "g1", "c1", "p0", fetcher);
expect(fetcher).toHaveBeenCalledTimes(102);
});
});
describe("shouldInjectParentContext / markParentContextInjected", () => {
beforeEach(() => {
_resetThreadParentContextCachesForTest();
});
it("returns true for first observation", () => {
expect(shouldInjectParentContext("session-1", "parent-1")).toBe(true);
});
it("returns false after marking the same parent", () => {
markParentContextInjected("session-1", "parent-1");
expect(shouldInjectParentContext("session-1", "parent-1")).toBe(false);
});
it("returns true again when a different parent appears in the session", () => {
markParentContextInjected("session-1", "parent-1");
expect(shouldInjectParentContext("session-1", "parent-2")).toBe(true);
});
it("dedupe is scoped per session key", () => {
markParentContextInjected("session-1", "parent-1");
expect(shouldInjectParentContext("session-2", "parent-1")).toBe(true);
});
});

View File

@@ -0,0 +1,159 @@
// Parent-message context injection for Teams channel thread replies.
//
// When an inbound message arrives as a reply inside a Teams channel thread,
// the triggering message often makes no sense on its own (for example, a
// one-word "yes" or "go ahead"). Per-thread session isolation (PR #62713)
// gives each thread its own session, but the first message in a brand-new
// thread session still has no parent context.
//
// This module fetches the parent message via Graph and prepends a compact
// `Replying to @sender: …` system event to the next agent turn so the agent
// knows what is being responded to. Fetches are cached to avoid repeated
// Graph calls within the same active thread, and per-session dedupe ensures
// the same parent is not re-injected on every subsequent reply in the
// thread.
import { fetchChannelMessage, stripHtmlFromTeamsMessage } from "./graph-thread.js";
import type { GraphThreadMessage } from "./graph-thread.js";
// LRU cache for parent message fetches. Keyed by `teamId:channelId:parentId`.
// 5-minute TTL and 100-entry cap keep active-thread chatter fast without
// holding stale data when a thread goes quiet. Eviction uses Map insertion
// order for LRU semantics (get() re-inserts on hit).
const PARENT_CACHE_TTL_MS = 5 * 60 * 1000;
const PARENT_CACHE_MAX = 100;
type ParentCacheEntry = {
message: GraphThreadMessage | undefined;
expiresAt: number;
};
const parentCache = new Map<string, ParentCacheEntry>();
// Per-session dedupe: remembers the most recent parent id we injected for a
// given session key. When the same thread session sees another reply against
// the same parent, we skip re-enqueueing the identical system event. We keep
// a small LRU so idle sessions eventually drop out.
const INJECTED_MAX = 200;
const injectedParents = new Map<string, string>();
export type ThreadParentContextFetcher = (
token: string,
groupId: string,
channelId: string,
messageId: string,
) => Promise<GraphThreadMessage | undefined>;
function touchLru<K, V>(map: Map<K, V>, key: K, value: V, max: number): void {
if (map.has(key)) {
map.delete(key);
} else if (map.size >= max) {
// Drop the oldest (first-inserted) entry.
const firstKey = map.keys().next().value;
if (firstKey !== undefined) {
map.delete(firstKey);
}
}
map.set(key, value);
}
function buildParentCacheKey(groupId: string, channelId: string, parentId: string): string {
return `${groupId}\u0000${channelId}\u0000${parentId}`;
}
/**
* Fetch a channel parent message with an LRU+TTL cache.
*
* Uses the injected `fetchParent` (defaults to `fetchChannelMessage`) so
* tests can swap in a stub without mocking the Graph transport.
*/
export async function fetchParentMessageCached(
token: string,
groupId: string,
channelId: string,
parentId: string,
fetchParent: ThreadParentContextFetcher = fetchChannelMessage,
): Promise<GraphThreadMessage | undefined> {
const key = buildParentCacheKey(groupId, channelId, parentId);
const now = Date.now();
const cached = parentCache.get(key);
if (cached && cached.expiresAt > now) {
// Refresh LRU ordering on hit.
parentCache.delete(key);
parentCache.set(key, cached);
return cached.message;
}
const message = await fetchParent(token, groupId, channelId, parentId);
touchLru(parentCache, key, { message, expiresAt: now + PARENT_CACHE_TTL_MS }, PARENT_CACHE_MAX);
return message;
}
export type ParentContextSummary = {
/** Display name of the parent message author, or "unknown". */
sender: string;
/** Stripped, single-line parent body text (or empty if unresolved). */
text: string;
};
const PARENT_TEXT_MAX_CHARS = 400;
/**
* Extract a compact summary (sender + plain-text body) from a Graph parent
* message. Returns undefined when the parent cannot be summarized (missing
* or blank body).
*/
export function summarizeParentMessage(
message: GraphThreadMessage | undefined,
): ParentContextSummary | undefined {
if (!message) {
return undefined;
}
const sender =
message.from?.user?.displayName ?? message.from?.application?.displayName ?? "unknown";
const contentType = message.body?.contentType ?? "text";
const raw = message.body?.content ?? "";
const text =
contentType === "html" ? stripHtmlFromTeamsMessage(raw) : raw.replace(/\s+/g, " ").trim();
if (!text) {
return undefined;
}
return {
sender,
text:
text.length > PARENT_TEXT_MAX_CHARS ? `${text.slice(0, PARENT_TEXT_MAX_CHARS - 1)}` : text,
};
}
/**
* Build the single-line `Replying to @sender: body` system event text.
* Callers should pass this text to `enqueueSystemEvent` together with a
* stable contextKey derived from the parent id.
*/
export function formatParentContextEvent(summary: ParentContextSummary): string {
return `Replying to @${summary.sender}: ${summary.text}`;
}
/**
* Decide whether a parent context event should be enqueued for the current
* session. Returns `false` when we already injected the same parent for this
* session recently (prevents re-prepending identical context on every reply
* in the thread).
*/
export function shouldInjectParentContext(sessionKey: string, parentId: string): boolean {
const key = sessionKey;
return injectedParents.get(key) !== parentId;
}
/**
* Record that `parentId` was just injected for `sessionKey` so subsequent
* replies with the same parent can short-circuit via `shouldInjectParentContext`.
*/
export function markParentContextInjected(sessionKey: string, parentId: string): void {
touchLru(injectedParents, sessionKey, parentId, INJECTED_MAX);
}
// Exported for test isolation.
export function _resetThreadParentContextCachesForTest(): void {
parentCache.clear();
injectedParents.clear();
}