fix(msteams): batch multi-block replies into single continueConversation call (#29379) (#49587)

Teams silently drops blocks 2+ when each deliver() opens its own
continueConversation() call. Accumulate rendered messages across all
deliver() calls and flush them together in markDispatchIdle().

On batch failure, retry each message individually so trailing blocks
are not silently lost. Log a warning when any individual message fails
so flush failures are visible in logs.
This commit is contained in:
sudie-codes
2026-03-22 18:16:17 -07:00
committed by GitHub
parent 71113ea0cb
commit 8b5eeba386
3 changed files with 170 additions and 52 deletions

View File

@@ -20,6 +20,7 @@ vi.mock("./graph-upload.js", async () => {
import { resolvePreferredOpenClawTmpDir } from "../../../src/infra/tmp-openclaw-dir.js";
import {
type MSTeamsAdapter,
type MSTeamsRenderedMessage,
renderReplyPayloadsToMessages,
sendMSTeamsMessages,
} from "./messenger.js";
@@ -407,5 +408,53 @@ describe("msteams messenger", () => {
expect(attempts).toEqual(["hello", "hello"]);
expect(ids).toEqual(["id:hello"]);
});
it("delivers all blocks in a multi-block reply via a single continueConversation call (#29379)", async () => {
  // Regression guard: every block of a multi-block reply (e.g.
  // text -> tool -> text) must reach the user. The old code opened a
  // separate continueConversation() per deliver() call, and Teams
  // silently drops blocks 2+ in that case. The fix batches all rendered
  // messages into one sendMSTeamsMessages call so they share a single
  // continueConversation().
  const perCallTexts: string[][] = [];
  const adapter: MSTeamsAdapter = {
    continueConversation: async (_appId, _reference, logic) => {
      // Record the text of each activity sent within this one
      // continueConversation() invocation.
      const texts: string[] = [];
      await logic({
        sendActivity: async (activity: unknown) => {
          const { text } = activity as { text?: string };
          texts.push(text ?? "");
          return { id: `id:${text ?? ""}` };
        },
      });
      perCallTexts.push(texts);
    },
    process: async () => {},
    updateActivity: noopUpdateActivity,
    deleteActivity: noopDeleteActivity,
  };
  // Three blocks (text + code + text) submitted together in one call.
  const ids = await sendMSTeamsMessages({
    replyStyle: "top-level",
    adapter,
    appId: "app123",
    conversationRef: baseRef,
    messages: [
      { text: "Let me look that up..." },
      { text: "```\nresult = 42\n```" },
      { text: "The answer is 42." },
    ],
  });
  // Every block was delivered...
  expect(ids).toHaveLength(3);
  // ...and all of them inside one continueConversation() call, not three.
  expect(perCallTexts).toHaveLength(1);
  expect(perCallTexts[0]).toEqual([
    "Let me look that up...",
    "```\nresult = 42\n```",
    "The answer is 42.",
  ]);
});
});
});

View File

@@ -575,9 +575,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
cfg,
ctxPayload,
dispatcher,
onSettled: () => {
markDispatchIdle();
},
onSettled: () => markDispatchIdle(),
replyOptions,
});

View File

@@ -90,66 +90,137 @@ export function createMSTeamsReplyDispatcher(params: {
},
});
const chunkMode = core.channel.text.resolveChunkMode(params.cfg, "msteams");
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: params.cfg,
channel: "msteams",
});
const mediaMaxBytes = resolveChannelMediaMaxBytes({
cfg: params.cfg,
resolveChannelLimitMb: ({ cfg }) => cfg.channels?.msteams?.mediaMaxMb,
});
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
typingCallbacks,
deliver: async (payload) => {
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: params.cfg,
channel: "msteams",
});
const messages = renderReplyPayloadsToMessages([payload], {
textChunkLimit: params.textLimit,
chunkText: true,
mediaMode: "split",
tableMode,
chunkMode,
});
const mediaMaxBytes = resolveChannelMediaMaxBytes({
cfg: params.cfg,
resolveChannelLimitMb: ({ cfg }) => cfg.channels?.msteams?.mediaMaxMb,
});
const ids = await sendMSTeamsMessages({
// Accumulate rendered messages from all deliver() calls so the entire turn's
// reply is sent in a single sendMSTeamsMessages() call. This avoids Teams
// silently dropping blocks 2+ when each deliver() opened its own independent
// continueConversation() call — only the first proactive send per turn context
// window succeeds. (#29379)
const pendingMessages: MSTeamsRenderedMessage[] = [];
const sendMessages = async (messages: MSTeamsRenderedMessage[]): Promise<string[]> => {
return sendMSTeamsMessages({
replyStyle: params.replyStyle,
adapter: params.adapter,
appId: params.appId,
conversationRef: params.conversationRef,
context: params.context,
messages,
// Enable default retry/backoff for throttling/transient failures.
retry: {},
onRetry: (event) => {
params.log.debug?.("retrying send", {
replyStyle: params.replyStyle,
adapter: params.adapter,
appId: params.appId,
conversationRef: params.conversationRef,
context: params.context,
messages,
// Enable default retry/backoff for throttling/transient failures.
retry: {},
onRetry: (event) => {
params.log.debug?.("retrying send", {
replyStyle: params.replyStyle,
...event,
});
},
tokenProvider: params.tokenProvider,
sharePointSiteId: params.sharePointSiteId,
mediaMaxBytes,
...event,
});
if (ids.length > 0) {
params.onSentMessageIds?.(ids);
}
},
onError: (err, info) => {
tokenProvider: params.tokenProvider,
sharePointSiteId: params.sharePointSiteId,
mediaMaxBytes,
});
};
const flushPendingMessages = async () => {
if (pendingMessages.length === 0) {
return;
}
// Copy the buffer before draining so we have a reference for per-message
// retry if the batch send fails.
const toSend = pendingMessages.splice(0);
const total = toSend.length;
let ids: string[];
try {
ids = await sendMessages(toSend);
} catch {
// Batch send failed (e.g. bad attachment on one message); retry each
// message individually so trailing blocks are not silently lost.
ids = [];
let failed = 0;
for (const msg of toSend) {
try {
const msgIds = await sendMessages([msg]);
ids.push(...msgIds);
} catch {
failed += 1;
params.log.debug?.("individual message send failed, continuing with remaining blocks");
}
}
if (failed > 0) {
params.log.warn?.(`failed to deliver ${failed} of ${total} message blocks`, {
failed,
total,
});
}
}
if (ids.length > 0) {
params.onSentMessageIds?.(ids);
}
};
const {
dispatcher,
replyOptions,
markDispatchIdle: baseMarkDispatchIdle,
} = core.channel.reply.createReplyDispatcherWithTyping({
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
typingCallbacks,
deliver: async (payload) => {
// Render the payload to messages and accumulate them. All messages from
// this turn are flushed together in markDispatchIdle() so they go out
// in a single continueConversation() call.
const messages = renderReplyPayloadsToMessages([payload], {
textChunkLimit: params.textLimit,
chunkText: true,
mediaMode: "split",
tableMode,
chunkMode,
});
pendingMessages.push(...messages);
},
onError: (err, info) => {
const errMsg = formatUnknownError(err);
const classification = classifyMSTeamsSendError(err);
const hint = formatMSTeamsSendErrorHint(classification);
params.runtime.error?.(
`msteams ${info.kind} reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`,
);
params.log.error("reply failed", {
kind: info.kind,
error: errMsg,
classification,
hint,
});
},
});
// Wrap markDispatchIdle to flush all accumulated messages before signalling idle.
// Returns a promise so callers (e.g. onSettled) can await completion.
const markDispatchIdle = (): Promise<void> => {
return flushPendingMessages()
.catch((err) => {
const errMsg = formatUnknownError(err);
const classification = classifyMSTeamsSendError(err);
const hint = formatMSTeamsSendErrorHint(classification);
params.runtime.error?.(
`msteams ${info.kind} reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`,
);
params.log.error("reply failed", {
kind: info.kind,
params.runtime.error?.(`msteams flush reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`);
params.log.error("flush reply failed", {
error: errMsg,
classification,
hint,
});
},
});
})
.finally(() => {
baseMarkDispatchIdle();
});
};
return {
dispatcher,