From e727ad6898e36928eb2292a1f96743c44e2e314e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 24 Mar 2026 09:49:59 -0700 Subject: [PATCH] fix(msteams): harden feedback reflection follow-ups --- .../msteams/src/feedback-reflection.test.ts | 51 +++++- extensions/msteams/src/feedback-reflection.ts | 125 +++++++++++-- .../msteams/src/reply-dispatcher.test.ts | 171 ++++++++++++++++++ extensions/msteams/src/reply-dispatcher.ts | 23 ++- 4 files changed, 348 insertions(+), 22 deletions(-) create mode 100644 extensions/msteams/src/reply-dispatcher.test.ts diff --git a/extensions/msteams/src/feedback-reflection.test.ts b/extensions/msteams/src/feedback-reflection.test.ts index 599b5e5079e..7c07101edf1 100644 --- a/extensions/msteams/src/feedback-reflection.test.ts +++ b/extensions/msteams/src/feedback-reflection.test.ts @@ -1,13 +1,14 @@ import { mkdtemp, rm, writeFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { buildFeedbackEvent, buildReflectionPrompt, clearReflectionCooldowns, isReflectionAllowed, loadSessionLearnings, + parseReflectionResponse, recordReflectionTime, } from "./feedback-reflection.js"; @@ -77,13 +78,47 @@ describe("buildReflectionPrompt", () => { it("works without optional params", () => { const prompt = buildReflectionPrompt({}); expect(prompt).toContain("previous response wasn't helpful"); - expect(prompt).toContain("reflect"); + expect(prompt).toContain('"followUp":false'); + }); +}); + +describe("parseReflectionResponse", () => { + it("parses strict JSON output", () => { + expect( + parseReflectionResponse( + '{"learning":"Be more direct next time.","followUp":true,"userMessage":"Sorry about that. I will keep it tighter."}', + ), + ).toEqual({ + learning: "Be more direct next time.", + followUp: true, + userMessage: "Sorry about that. I will keep it tighter.", + }); + }); + + it("parses JSON inside markdown fences", () => { + expect( + parseReflectionResponse( + '```json\n{"learning":"Ask a clarifying question first.","followUp":false,"userMessage":""}\n```', + ), + ).toEqual({ + learning: "Ask a clarifying question first.", + followUp: false, + userMessage: undefined, + }); + }); + + it("falls back to internal-only learning when parsing fails", () => { + expect(parseReflectionResponse("Be more concise.\nFollow up: yes.")).toEqual({ + learning: "Be more concise.\nFollow up: yes.", + followUp: false, + }); }); }); describe("reflection cooldown", () => { afterEach(() => { clearReflectionCooldowns(); + vi.restoreAllMocks(); }); it("allows first reflection", () => { @@ -108,6 +143,18 @@ describe("reflection cooldown", () => { expect(isReflectionAllowed("session-1", 60_000)).toBe(false); expect(isReflectionAllowed("session-2", 60_000)).toBe(true); }); + + it("keeps longer custom cooldown entries during pruning", () => { + vi.spyOn(Date, "now").mockReturnValue(0); + recordReflectionTime("target", 600_000); + + vi.spyOn(Date, "now").mockReturnValue(301_000); + for (let index = 0; index <= 500; index += 1) { + recordReflectionTime(`session-${index}`, 600_000); + } + + expect(isReflectionAllowed("target", 600_000)).toBe(false); + }); }); describe("loadSessionLearnings", () => { diff --git a/extensions/msteams/src/feedback-reflection.ts b/extensions/msteams/src/feedback-reflection.ts index a5481013e9f..cba7e35ab7a 100644 --- a/extensions/msteams/src/feedback-reflection.ts +++ b/extensions/msteams/src/feedback-reflection.ts @@ -79,6 +79,12 @@ export function buildFeedbackEvent(params: { }; } +export type ParsedReflectionResponse = { + learning: string; + followUp: boolean; + userMessage?: string; +}; + export function buildReflectionPrompt(params: { thumbedDownResponse?: string; userComment?: string; @@ -99,17 +105,93 @@ export function buildReflectionPrompt(params: { parts.push( "\nBriefly reflect: what could you improve? Consider tone, length, " + - "accuracy, relevance, and specificity. Reply with:\n" + - "1. A short adjustment note (1-2 sentences) for your future behavior " + - "in this conversation.\n" + - "2. Whether you should follow up with the user (yes if the adjustment " + - "is non-obvious or you have a clarifying question; no if minor).\n" + - "3. If following up, draft a brief message to the user.", + "accuracy, relevance, and specificity. Reply with a single JSON object " + + 'only, no markdown or prose, using this exact shape:\n{"learning":"...",' + + '"followUp":false,"userMessage":""}\n' + + "- learning: a short internal adjustment note (1-2 sentences) for your " + + "future behavior in this conversation.\n" + + "- followUp: true only if the user needs a direct follow-up message.\n" + + "- userMessage: only the exact user-facing message to send; empty string " + + "when followUp is false.", ); return parts.join("\n"); } +function parseBooleanLike(value: unknown): boolean | undefined { + if (typeof value === "boolean") { + return value; + } + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (normalized === "true" || normalized === "yes") { + return true; + } + if (normalized === "false" || normalized === "no") { + return false; + } + } + return undefined; +} + +function parseStructuredReflectionValue(value: unknown): ParsedReflectionResponse | null { + if (value == null || typeof value !== "object" || Array.isArray(value)) { + return null; + } + + const candidate = value as { + learning?: unknown; + followUp?: unknown; + userMessage?: unknown; + }; + const learning = typeof candidate.learning === "string" ? candidate.learning.trim() : undefined; + if (!learning) { + return null; + } + + return { + learning, + followUp: parseBooleanLike(candidate.followUp) ?? false, + userMessage: + typeof candidate.userMessage === "string" && candidate.userMessage.trim() + ? candidate.userMessage.trim() + : undefined, + }; +} + +export function parseReflectionResponse(text: string): ParsedReflectionResponse | null { + const trimmed = text.trim(); + if (!trimmed) { + return null; + } + + const candidates = [ + trimmed, + ...(trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i)?.slice(1, 2) ?? []), + ]; + + for (const candidateText of candidates) { + const candidate = candidateText.trim(); + if (!candidate) { + continue; + } + try { + const parsed = parseStructuredReflectionValue(JSON.parse(candidate)); + if (parsed) { + return parsed; + } + } catch { + // Fall through to the next parse strategy. + } + } + + // Safe fallback: keep the internal learning, but never auto-message the user. + return { + learning: trimmed, + followUp: false, + }; +} + /** * Check if a reflection is allowed (cooldown not active). */ @@ -125,9 +207,9 @@ export function isReflectionAllowed(sessionKey: string, cooldownMs?: number): bo /** * Record that a reflection was run for a session. */ -export function recordReflectionTime(sessionKey: string): void { +export function recordReflectionTime(sessionKey: string, cooldownMs?: number): void { lastReflectionBySession.set(sessionKey, Date.now()); - pruneExpiredCooldowns(DEFAULT_COOLDOWN_MS); + pruneExpiredCooldowns(cooldownMs ?? DEFAULT_COOLDOWN_MS); } /** @@ -251,12 +333,19 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams) return; } + const parsedReflection = parseReflectionResponse(reflectionResponse); + if (!parsedReflection) { + log.debug?.("reflection produced no structured output"); + return; + } + // Reflection succeeded — record cooldown now - recordReflectionTime(sessionKey); + recordReflectionTime(sessionKey, cooldownMs); log.info("reflection complete", { sessionKey, responseLength: reflectionResponse.length, + followUp: parsedReflection.followUp, }); // Store the learning in the session @@ -264,19 +353,16 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams) await storeSessionLearning({ storePath, sessionKey: params.sessionKey, - learning: reflectionResponse.trim(), + learning: parsedReflection.learning, }); } catch (err) { log.debug?.("failed to store reflection learning", { error: String(err) }); } - // Send proactive follow-up if the reflection suggests one. - // Simple heuristic: if the response contains "follow up: yes" or similar, - // or if it's reasonably short (a direct message to the user). - // For now, always send the reflection as a follow-up — the prompt asks - // the agent to decide, and it will draft a user-facing message if appropriate. + const conversationType = params.conversationRef.conversation?.conversationType?.toLowerCase(); + const isDirectMessage = conversationType === "personal"; const shouldNotify = - reflectionResponse.toLowerCase().includes("follow up") || reflectionResponse.length < 300; + isDirectMessage && parsedReflection.followUp && Boolean(parsedReflection.userMessage); if (shouldNotify) { try { @@ -286,13 +372,18 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams) await params.adapter.continueConversation(params.appId, proactiveRef, async (ctx) => { await ctx.sendActivity({ type: "message", - text: reflectionResponse.trim(), + text: parsedReflection.userMessage!, }); }); log.info("sent reflection follow-up", { sessionKey }); } catch (err) { log.debug?.("failed to send reflection follow-up", { error: String(err) }); } + } else if (parsedReflection.followUp && !isDirectMessage) { + log.debug?.("skipping reflection follow-up outside direct message", { + sessionKey, + conversationType, + }); } } diff --git a/extensions/msteams/src/reply-dispatcher.test.ts b/extensions/msteams/src/reply-dispatcher.test.ts new file mode 100644 index 00000000000..d7613044c9d --- /dev/null +++ b/extensions/msteams/src/reply-dispatcher.test.ts @@ -0,0 +1,171 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const createChannelReplyPipelineMock = vi.hoisted(() => vi.fn()); +const createReplyDispatcherWithTypingMock = vi.hoisted(() => vi.fn()); +const getMSTeamsRuntimeMock = vi.hoisted(() => vi.fn()); +const renderReplyPayloadsToMessagesMock = vi.hoisted(() => vi.fn(() => [])); +const sendMSTeamsMessagesMock = vi.hoisted(() => vi.fn(async () => [])); +const streamInstances = vi.hoisted( + () => + [] as Array<{ + hasContent: boolean; + sendInformativeUpdate: ReturnType; + update: ReturnType; + finalize: ReturnType; + }>, +); + +vi.mock("../runtime-api.js", () => ({ + createChannelReplyPipeline: createChannelReplyPipelineMock, + logTypingFailure: vi.fn(), + resolveChannelMediaMaxBytes: vi.fn(() => 8 * 1024 * 1024), +})); + +vi.mock("./runtime.js", () => ({ + getMSTeamsRuntime: getMSTeamsRuntimeMock, +})); + +vi.mock("./messenger.js", () => ({ + buildConversationReference: vi.fn((ref) => ref), + renderReplyPayloadsToMessages: renderReplyPayloadsToMessagesMock, + sendMSTeamsMessages: sendMSTeamsMessagesMock, +})); + +vi.mock("./errors.js", () => ({ + classifyMSTeamsSendError: vi.fn(() => ({})), + formatMSTeamsSendErrorHint: vi.fn(() => undefined), + formatUnknownError: vi.fn((err) => String(err)), +})); + +vi.mock("./revoked-context.js", () => ({ + withRevokedProxyFallback: async ({ run }: { run: () => Promise }) => await run(), +})); + +vi.mock("./streaming-message.js", () => ({ + TeamsHttpStream: class { + hasContent = false; + sendInformativeUpdate = vi.fn(async () => {}); + update = vi.fn(); + finalize = vi.fn(async () => {}); + + constructor() { + streamInstances.push(this); + } + }, +})); + +import { createMSTeamsReplyDispatcher, pickInformativeStatusText } from "./reply-dispatcher.js"; + +describe("createMSTeamsReplyDispatcher", () => { + let typingCallbacks: { + onReplyStart: ReturnType; + onIdle: ReturnType; + onCleanup: ReturnType; + }; + + beforeEach(() => { + vi.clearAllMocks(); + streamInstances.length = 0; + + typingCallbacks = { + onReplyStart: vi.fn(async () => {}), + onIdle: vi.fn(), + onCleanup: vi.fn(), + }; + + createChannelReplyPipelineMock.mockReturnValue({ + onModelSelected: vi.fn(), + typingCallbacks, + }); + + createReplyDispatcherWithTypingMock.mockImplementation((options) => ({ + dispatcher: {}, + replyOptions: {}, + markDispatchIdle: vi.fn(), + _options: options, + })); + + getMSTeamsRuntimeMock.mockReturnValue({ + channel: { + text: { + resolveChunkMode: vi.fn(() => "length"), + resolveMarkdownTableMode: vi.fn(() => "code"), + }, + reply: { + createReplyDispatcherWithTyping: createReplyDispatcherWithTypingMock, + resolveHumanDelayConfig: vi.fn(() => undefined), + }, + }, + }); + }); + + function createDispatcher(conversationType: string = "personal") { + return createMSTeamsReplyDispatcher({ + cfg: { channels: { msteams: {} } } as never, + agentId: "agent", + runtime: { error: vi.fn() } as never, + log: { debug: vi.fn(), error: vi.fn(), warn: vi.fn() } as never, + adapter: { + continueConversation: vi.fn(), + process: vi.fn(), + updateActivity: vi.fn(), + deleteActivity: vi.fn(), + } as never, + appId: "app", + conversationRef: { + conversation: { id: "conv", conversationType }, + user: { id: "user" }, + agent: { id: "bot" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + } as never, + context: { + sendActivity: vi.fn(async () => ({ id: "activity-1" })), + } as never, + replyStyle: "thread", + textLimit: 4000, + }); + } + + it("sends an informative status update on reply start for personal chats", async () => { + createDispatcher("personal"); + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + + await options.onReplyStart?.(); + + expect(streamInstances).toHaveLength(1); + expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1); + expect(typingCallbacks.onReplyStart).toHaveBeenCalledTimes(1); + }); + + it("only sends the informative status update once", async () => { + createDispatcher("personal"); + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + + await options.onReplyStart?.(); + await options.onReplyStart?.(); + + expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1); + }); + + it("forwards partial replies into the Teams stream", async () => { + const dispatcher = createDispatcher("personal"); + + await dispatcher.replyOptions.onPartialReply?.({ text: "partial response" }); + + expect(streamInstances[0]?.update).toHaveBeenCalledWith("partial response"); + }); + + it("does not create a stream for channel conversations", async () => { + createDispatcher("channel"); + + expect(streamInstances).toHaveLength(0); + }); +}); + +describe("pickInformativeStatusText", () => { + it("selects a deterministic status line for a fixed random source", () => { + expect(pickInformativeStatusText(() => 0)).toBe("Thinking..."); + expect(pickInformativeStatusText(() => 0.99)).toBe("Putting an answer together..."); + }); +}); diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index 5ed1aefa94d..e25f7c441ae 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -26,6 +26,18 @@ import { getMSTeamsRuntime } from "./runtime.js"; import type { MSTeamsTurnContext } from "./sdk-types.js"; import { TeamsHttpStream } from "./streaming-message.js"; +const INFORMATIVE_STATUS_TEXTS = [ + "Thinking...", + "Working on that...", + "Checking the details...", + "Putting an answer together...", +]; + +export function pickInformativeStatusText(random = Math.random): string { + const index = Math.floor(random() * INFORMATIVE_STATUS_TEXTS.length); + return INFORMATIVE_STATUS_TEXTS[index] ?? INFORMATIVE_STATUS_TEXTS[0]!; +} + export function createMSTeamsReplyDispatcher(params: { cfg: OpenClawConfig; agentId: string; @@ -120,6 +132,7 @@ export function createMSTeamsReplyDispatcher(params: { // Track whether onPartialReply was ever called — if so, the stream // owns the text delivery and deliver should skip text payloads. let streamReceivedTokens = false; + let informativeUpdateSent = false; if (isPersonal) { stream = new TeamsHttpStream({ @@ -197,6 +210,13 @@ export function createMSTeamsReplyDispatcher(params: { } = core.channel.reply.createReplyDispatcherWithTyping({ ...replyPipeline, humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId), + onReplyStart: async () => { + if (stream && !informativeUpdateSent) { + informativeUpdateSent = true; + await stream.sendInformativeUpdate(pickInformativeStatusText()); + } + await typingCallbacks?.onReplyStart?.(); + }, typingCallbacks, deliver: async (payload) => { // When streaming received tokens AND hasn't failed, skip text delivery — @@ -266,9 +286,6 @@ export function createMSTeamsReplyDispatcher(params: { }; // Build reply options with onPartialReply for streaming. - // Send the informative update on the first token (not eagerly at stream creation) - // so it only appears when the LLM is actually generating text — not when the - // agent uses a tool (e.g. sends an adaptive card) without streaming. const streamingReplyOptions = stream ? { onPartialReply: (payload: { text?: string }) => {