fix(msteams): harden feedback reflection follow-ups

This commit is contained in:
Peter Steinberger
2026-03-24 09:49:59 -07:00
parent 72300e8fd0
commit e727ad6898
4 changed files with 348 additions and 22 deletions

View File

@@ -1,13 +1,14 @@
import { mkdtemp, rm, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
buildFeedbackEvent,
buildReflectionPrompt,
clearReflectionCooldowns,
isReflectionAllowed,
loadSessionLearnings,
parseReflectionResponse,
recordReflectionTime,
} from "./feedback-reflection.js";
@@ -77,13 +78,47 @@ describe("buildReflectionPrompt", () => {
it("works without optional params", () => {
const prompt = buildReflectionPrompt({});
expect(prompt).toContain("previous response wasn't helpful");
expect(prompt).toContain("reflect");
expect(prompt).toContain('"followUp":false');
});
});
describe("parseReflectionResponse", () => {
it("parses strict JSON output", () => {
expect(
parseReflectionResponse(
'{"learning":"Be more direct next time.","followUp":true,"userMessage":"Sorry about that. I will keep it tighter."}',
),
).toEqual({
learning: "Be more direct next time.",
followUp: true,
userMessage: "Sorry about that. I will keep it tighter.",
});
});
it("parses JSON inside markdown fences", () => {
expect(
parseReflectionResponse(
'```json\n{"learning":"Ask a clarifying question first.","followUp":false,"userMessage":""}\n```',
),
).toEqual({
learning: "Ask a clarifying question first.",
followUp: false,
userMessage: undefined,
});
});
it("falls back to internal-only learning when parsing fails", () => {
expect(parseReflectionResponse("Be more concise.\nFollow up: yes.")).toEqual({
learning: "Be more concise.\nFollow up: yes.",
followUp: false,
});
});
});
describe("reflection cooldown", () => {
afterEach(() => {
clearReflectionCooldowns();
vi.restoreAllMocks();
});
it("allows first reflection", () => {
@@ -108,6 +143,18 @@ describe("reflection cooldown", () => {
expect(isReflectionAllowed("session-1", 60_000)).toBe(false);
expect(isReflectionAllowed("session-2", 60_000)).toBe(true);
});
it("keeps longer custom cooldown entries during pruning", () => {
vi.spyOn(Date, "now").mockReturnValue(0);
recordReflectionTime("target", 600_000);
vi.spyOn(Date, "now").mockReturnValue(301_000);
for (let index = 0; index <= 500; index += 1) {
recordReflectionTime(`session-${index}`, 600_000);
}
expect(isReflectionAllowed("target", 600_000)).toBe(false);
});
});
describe("loadSessionLearnings", () => {

View File

@@ -79,6 +79,12 @@ export function buildFeedbackEvent(params: {
};
}
/**
 * Structured result extracted from a feedback-reflection model response.
 */
export type ParsedReflectionResponse = {
  // Internal adjustment note (stored as a session learning).
  learning: string;
  // Whether a direct follow-up message to the user is warranted.
  followUp: boolean;
  // User-facing follow-up text; present only when non-empty.
  userMessage?: string;
};
export function buildReflectionPrompt(params: {
thumbedDownResponse?: string;
userComment?: string;
@@ -99,17 +105,93 @@ export function buildReflectionPrompt(params: {
parts.push(
"\nBriefly reflect: what could you improve? Consider tone, length, " +
"accuracy, relevance, and specificity. Reply with:\n" +
"1. A short adjustment note (1-2 sentences) for your future behavior " +
"in this conversation.\n" +
"2. Whether you should follow up with the user (yes if the adjustment " +
"is non-obvious or you have a clarifying question; no if minor).\n" +
"3. If following up, draft a brief message to the user.",
"accuracy, relevance, and specificity. Reply with a single JSON object " +
'only, no markdown or prose, using this exact shape:\n{"learning":"...",' +
'"followUp":false,"userMessage":""}\n' +
"- learning: a short internal adjustment note (1-2 sentences) for your " +
"future behavior in this conversation.\n" +
"- followUp: true only if the user needs a direct follow-up message.\n" +
"- userMessage: only the exact user-facing message to send; empty string " +
"when followUp is false.",
);
return parts.join("\n");
}
/**
 * Coerce a loosely-typed flag into a boolean.
 *
 * Real booleans pass through unchanged. Strings are trimmed and lowercased,
 * then "true"/"yes" map to true and "false"/"no" map to false. Any other
 * value yields undefined so the caller can apply its own default.
 */
function parseBooleanLike(value: unknown): boolean | undefined {
  if (typeof value === "boolean") {
    return value;
  }
  if (typeof value !== "string") {
    return undefined;
  }
  switch (value.trim().toLowerCase()) {
    case "true":
    case "yes":
      return true;
    case "false":
    case "no":
      return false;
    default:
      return undefined;
  }
}
/**
 * Validate an already-parsed JSON value against the reflection shape.
 *
 * Returns null unless the value is a plain (non-array) object carrying a
 * non-empty string `learning`. `followUp` tolerates boolean-like strings and
 * defaults to false; `userMessage` survives only as a non-empty trimmed
 * string, otherwise it is dropped entirely.
 */
function parseStructuredReflectionValue(value: unknown): ParsedReflectionResponse | null {
  if (value == null || typeof value !== "object" || Array.isArray(value)) {
    return null;
  }
  const { learning, followUp, userMessage } = value as {
    learning?: unknown;
    followUp?: unknown;
    userMessage?: unknown;
  };
  if (typeof learning !== "string") {
    return null;
  }
  const trimmedLearning = learning.trim();
  if (!trimmedLearning) {
    return null;
  }
  const trimmedMessage = typeof userMessage === "string" ? userMessage.trim() : "";
  return {
    learning: trimmedLearning,
    followUp: parseBooleanLike(followUp) ?? false,
    userMessage: trimmedMessage ? trimmedMessage : undefined,
  };
}
/**
 * Parse raw reflection model output into a structured response.
 *
 * Strategy: try the whole text as strict JSON first, then JSON extracted
 * from the first ```/```json fenced block. If neither yields a valid
 * structure, fall back to treating the entire text as an internal-only
 * learning with followUp forced off, so malformed output can never trigger
 * an automatic user-facing message. Blank input returns null.
 */
export function parseReflectionResponse(text: string): ParsedReflectionResponse | null {
  const trimmed = text.trim();
  if (!trimmed) {
    return null;
  }
  const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i)?.[1];
  const attempts = fenced == null ? [trimmed] : [trimmed, fenced];
  for (const attempt of attempts) {
    const candidate = attempt.trim();
    if (!candidate) {
      continue;
    }
    try {
      const structured = parseStructuredReflectionValue(JSON.parse(candidate));
      if (structured) {
        return structured;
      }
    } catch {
      // Not valid JSON — try the next parse strategy.
    }
  }
  // Safe fallback: keep the internal learning, but never auto-message the user.
  return {
    learning: trimmed,
    followUp: false,
  };
}
/**
* Check if a reflection is allowed (cooldown not active).
*/
@@ -125,9 +207,9 @@ export function isReflectionAllowed(sessionKey: string, cooldownMs?: number): bo
/**
* Record that a reflection was run for a session.
*/
export function recordReflectionTime(sessionKey: string): void {
export function recordReflectionTime(sessionKey: string, cooldownMs?: number): void {
lastReflectionBySession.set(sessionKey, Date.now());
pruneExpiredCooldowns(DEFAULT_COOLDOWN_MS);
pruneExpiredCooldowns(cooldownMs ?? DEFAULT_COOLDOWN_MS);
}
/**
@@ -251,12 +333,19 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams)
return;
}
const parsedReflection = parseReflectionResponse(reflectionResponse);
if (!parsedReflection) {
log.debug?.("reflection produced no structured output");
return;
}
// Reflection succeeded — record cooldown now
recordReflectionTime(sessionKey);
recordReflectionTime(sessionKey, cooldownMs);
log.info("reflection complete", {
sessionKey,
responseLength: reflectionResponse.length,
followUp: parsedReflection.followUp,
});
// Store the learning in the session
@@ -264,19 +353,16 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams)
await storeSessionLearning({
storePath,
sessionKey: params.sessionKey,
learning: reflectionResponse.trim(),
learning: parsedReflection.learning,
});
} catch (err) {
log.debug?.("failed to store reflection learning", { error: String(err) });
}
// Send proactive follow-up if the reflection suggests one.
// Simple heuristic: if the response contains "follow up: yes" or similar,
// or if it's reasonably short (a direct message to the user).
// For now, always send the reflection as a follow-up — the prompt asks
// the agent to decide, and it will draft a user-facing message if appropriate.
const conversationType = params.conversationRef.conversation?.conversationType?.toLowerCase();
const isDirectMessage = conversationType === "personal";
const shouldNotify =
reflectionResponse.toLowerCase().includes("follow up") || reflectionResponse.length < 300;
isDirectMessage && parsedReflection.followUp && Boolean(parsedReflection.userMessage);
if (shouldNotify) {
try {
@@ -286,13 +372,18 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams)
await params.adapter.continueConversation(params.appId, proactiveRef, async (ctx) => {
await ctx.sendActivity({
type: "message",
text: reflectionResponse.trim(),
text: parsedReflection.userMessage!,
});
});
log.info("sent reflection follow-up", { sessionKey });
} catch (err) {
log.debug?.("failed to send reflection follow-up", { error: String(err) });
}
} else if (parsedReflection.followUp && !isDirectMessage) {
log.debug?.("skipping reflection follow-up outside direct message", {
sessionKey,
conversationType,
});
}
}

View File

@@ -0,0 +1,171 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
// Hoisted mock handles: vitest hoists the vi.mock factories below above the
// imports, so the mocks they close over must be hoisted too.
const createChannelReplyPipelineMock = vi.hoisted(() => vi.fn());
const createReplyDispatcherWithTypingMock = vi.hoisted(() => vi.fn());
const getMSTeamsRuntimeMock = vi.hoisted(() => vi.fn());
const renderReplyPayloadsToMessagesMock = vi.hoisted(() => vi.fn(() => []));
const sendMSTeamsMessagesMock = vi.hoisted(() => vi.fn(async () => []));
// Records every TeamsHttpStream the code under test constructs, so tests can
// assert on the per-stream spies (sendInformativeUpdate, update, finalize).
const streamInstances = vi.hoisted(
  () =>
    [] as Array<{
      hasContent: boolean;
      sendInformativeUpdate: ReturnType<typeof vi.fn>;
      update: ReturnType<typeof vi.fn>;
      finalize: ReturnType<typeof vi.fn>;
    }>,
);
// Swap the channel pipeline helpers for the hoisted mocks.
vi.mock("../runtime-api.js", () => ({
  createChannelReplyPipeline: createChannelReplyPipelineMock,
  logTypingFailure: vi.fn(),
  resolveChannelMediaMaxBytes: vi.fn(() => 8 * 1024 * 1024),
}));
vi.mock("./runtime.js", () => ({
  getMSTeamsRuntime: getMSTeamsRuntimeMock,
}));
vi.mock("./messenger.js", () => ({
  buildConversationReference: vi.fn((ref) => ref),
  renderReplyPayloadsToMessages: renderReplyPayloadsToMessagesMock,
  sendMSTeamsMessages: sendMSTeamsMessagesMock,
}));
vi.mock("./errors.js", () => ({
  classifyMSTeamsSendError: vi.fn(() => ({})),
  formatMSTeamsSendErrorHint: vi.fn(() => undefined),
  formatUnknownError: vi.fn((err) => String(err)),
}));
// Pass-through fallback: just execute the wrapped operation.
vi.mock("./revoked-context.js", () => ({
  withRevokedProxyFallback: async ({ run }: { run: () => Promise<unknown> }) => await run(),
}));
// Stub stream class that registers each instance into streamInstances.
vi.mock("./streaming-message.js", () => ({
  TeamsHttpStream: class {
    hasContent = false;
    sendInformativeUpdate = vi.fn(async () => {});
    update = vi.fn();
    finalize = vi.fn(async () => {});
    constructor() {
      streamInstances.push(this);
    }
  },
}));
import { createMSTeamsReplyDispatcher, pickInformativeStatusText } from "./reply-dispatcher.js";
describe("createMSTeamsReplyDispatcher", () => {
let typingCallbacks: {
onReplyStart: ReturnType<typeof vi.fn>;
onIdle: ReturnType<typeof vi.fn>;
onCleanup: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
vi.clearAllMocks();
streamInstances.length = 0;
typingCallbacks = {
onReplyStart: vi.fn(async () => {}),
onIdle: vi.fn(),
onCleanup: vi.fn(),
};
createChannelReplyPipelineMock.mockReturnValue({
onModelSelected: vi.fn(),
typingCallbacks,
});
createReplyDispatcherWithTypingMock.mockImplementation((options) => ({
dispatcher: {},
replyOptions: {},
markDispatchIdle: vi.fn(),
_options: options,
}));
getMSTeamsRuntimeMock.mockReturnValue({
channel: {
text: {
resolveChunkMode: vi.fn(() => "length"),
resolveMarkdownTableMode: vi.fn(() => "code"),
},
reply: {
createReplyDispatcherWithTyping: createReplyDispatcherWithTypingMock,
resolveHumanDelayConfig: vi.fn(() => undefined),
},
},
});
});
function createDispatcher(conversationType: string = "personal") {
return createMSTeamsReplyDispatcher({
cfg: { channels: { msteams: {} } } as never,
agentId: "agent",
runtime: { error: vi.fn() } as never,
log: { debug: vi.fn(), error: vi.fn(), warn: vi.fn() } as never,
adapter: {
continueConversation: vi.fn(),
process: vi.fn(),
updateActivity: vi.fn(),
deleteActivity: vi.fn(),
} as never,
appId: "app",
conversationRef: {
conversation: { id: "conv", conversationType },
user: { id: "user" },
agent: { id: "bot" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
} as never,
context: {
sendActivity: vi.fn(async () => ({ id: "activity-1" })),
} as never,
replyStyle: "thread",
textLimit: 4000,
});
}
it("sends an informative status update on reply start for personal chats", async () => {
createDispatcher("personal");
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.onReplyStart?.();
expect(streamInstances).toHaveLength(1);
expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1);
expect(typingCallbacks.onReplyStart).toHaveBeenCalledTimes(1);
});
it("only sends the informative status update once", async () => {
createDispatcher("personal");
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.onReplyStart?.();
await options.onReplyStart?.();
expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1);
});
it("forwards partial replies into the Teams stream", async () => {
const dispatcher = createDispatcher("personal");
await dispatcher.replyOptions.onPartialReply?.({ text: "partial response" });
expect(streamInstances[0]?.update).toHaveBeenCalledWith("partial response");
});
it("does not create a stream for channel conversations", async () => {
createDispatcher("channel");
expect(streamInstances).toHaveLength(0);
});
});
describe("pickInformativeStatusText", () => {
it("selects a deterministic status line for a fixed random source", () => {
expect(pickInformativeStatusText(() => 0)).toBe("Thinking...");
expect(pickInformativeStatusText(() => 0.99)).toBe("Putting an answer together...");
});
});

View File

@@ -26,6 +26,18 @@ import { getMSTeamsRuntime } from "./runtime.js";
import type { MSTeamsTurnContext } from "./sdk-types.js";
import { TeamsHttpStream } from "./streaming-message.js";
// Rotating status lines shown while the agent prepares a streamed reply.
const INFORMATIVE_STATUS_TEXTS = [
  "Thinking...",
  "Working on that...",
  "Checking the details...",
  "Putting an answer together...",
];

/**
 * Pick one of the informative status lines.
 *
 * @param random - Source of a number in [0, 1); injectable for deterministic
 *                 tests, defaults to Math.random. An out-of-range index
 *                 falls back to the first line.
 */
export function pickInformativeStatusText(random = Math.random): string {
  const slot = Math.floor(random() * INFORMATIVE_STATUS_TEXTS.length);
  const chosen = INFORMATIVE_STATUS_TEXTS[slot];
  return chosen ?? INFORMATIVE_STATUS_TEXTS[0]!;
}
export function createMSTeamsReplyDispatcher(params: {
cfg: OpenClawConfig;
agentId: string;
@@ -120,6 +132,7 @@ export function createMSTeamsReplyDispatcher(params: {
// Track whether onPartialReply was ever called — if so, the stream
// owns the text delivery and deliver should skip text payloads.
let streamReceivedTokens = false;
let informativeUpdateSent = false;
if (isPersonal) {
stream = new TeamsHttpStream({
@@ -197,6 +210,13 @@ export function createMSTeamsReplyDispatcher(params: {
} = core.channel.reply.createReplyDispatcherWithTyping({
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
onReplyStart: async () => {
if (stream && !informativeUpdateSent) {
informativeUpdateSent = true;
await stream.sendInformativeUpdate(pickInformativeStatusText());
}
await typingCallbacks?.onReplyStart?.();
},
typingCallbacks,
deliver: async (payload) => {
// When streaming received tokens AND hasn't failed, skip text delivery —
@@ -266,9 +286,6 @@ export function createMSTeamsReplyDispatcher(params: {
};
// Build reply options with onPartialReply for streaming.
// Send the informative update on the first token (not eagerly at stream creation)
// so it only appears when the LLM is actually generating text — not when the
// agent uses a tool (e.g. sends an adaptive card) without streaming.
const streamingReplyOptions = stream
? {
onPartialReply: (payload: { text?: string }) => {