fix(msteams): reset stream state after tool calls to prevent message loss (#56071)

* fix(msteams): reset stream state after preparePayload suppresses delivery

When an agent uses tools mid-response (text → tool calls → more text),
the stream controller's preparePayload would suppress fallback delivery
for ALL text segments because streamReceivedTokens stayed true. This
caused the second text segment to be silently lost or duplicated.

Fix: after preparePayload suppresses delivery for a streamed segment,
finalize the stream and reset streamReceivedTokens so subsequent
segments use fallback delivery.

Fixes openclaw/openclaw#56040

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(msteams): guard preparePayload against finalized stream re-suppression

When onPartialReply fires after the stream is finalized (post-tool
partial tokens), streamReceivedTokens gets set back to true but the
stream can't deliver. Add stream.isFinalized check so a finalized
stream never suppresses fallback delivery.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(msteams): await pending finalize in controller to prevent race

Store the fire-and-forget finalize promise from preparePayload and
await it in the controller's finalize() method. This ensures
markDispatchIdle waits for the in-flight stream finalization to
complete before context cleanup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test(msteams): add edge case tests for multi-round and media payloads

Add tests for 3+ tool call rounds (text → tool → text → tool → text)
and media+text payloads after stream finalization, covering the full
contract of preparePayload across all input types and cycle counts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sid Uppal
2026-03-27 19:36:37 -07:00
committed by GitHub
parent 4752aca926
commit 295d1de8d9
2 changed files with 155 additions and 1 deletions

View File

@@ -0,0 +1,146 @@
import { describe, expect, it, vi } from "vitest";
const streamInstances = vi.hoisted(
() =>
[] as Array<{
hasContent: boolean;
isFinalized: boolean;
sendInformativeUpdate: ReturnType<typeof vi.fn>;
update: ReturnType<typeof vi.fn>;
finalize: ReturnType<typeof vi.fn>;
}>,
);
vi.mock("./streaming-message.js", () => ({
TeamsHttpStream: class {
hasContent = false;
isFinalized = false;
sendInformativeUpdate = vi.fn(async () => {});
update = vi.fn(function (this: { hasContent: boolean }) {
this.hasContent = true;
});
finalize = vi.fn(async function (this: { isFinalized: boolean }) {
this.isFinalized = true;
});
constructor() {
streamInstances.push(this as never);
}
},
}));
import { createTeamsReplyStreamController } from "./reply-stream-controller.js";
describe("createTeamsReplyStreamController", () => {
function createController() {
streamInstances.length = 0;
return createTeamsReplyStreamController({
conversationType: "personal",
context: { sendActivity: vi.fn(async () => ({ id: "a" })) } as never,
feedbackLoopEnabled: false,
log: { debug: vi.fn() } as never,
});
}
it("suppresses fallback for first text segment that was streamed", () => {
const ctrl = createController();
ctrl.onPartialReply({ text: "Hello world" });
const result = ctrl.preparePayload({ text: "Hello world" });
expect(result).toBeUndefined();
});
it("allows fallback delivery for second text segment after tool calls", () => {
const ctrl = createController();
// First text segment: streaming tokens arrive
ctrl.onPartialReply({ text: "First segment" });
// First segment complete: preparePayload suppresses (stream handled it)
const result1 = ctrl.preparePayload({ text: "First segment" });
expect(result1).toBeUndefined();
// Tool calls happen... then second text segment arrives via deliver()
// preparePayload should allow fallback delivery for this segment
const result2 = ctrl.preparePayload({ text: "Second segment after tools" });
expect(result2).toEqual({ text: "Second segment after tools" });
});
it("finalizes the stream when suppressing first segment", () => {
const ctrl = createController();
ctrl.onPartialReply({ text: "Streamed text" });
ctrl.preparePayload({ text: "Streamed text" });
expect(streamInstances[0]?.finalize).toHaveBeenCalled();
});
it("uses fallback even when onPartialReply fires after stream finalized", () => {
const ctrl = createController();
// First text segment: streaming tokens arrive
ctrl.onPartialReply({ text: "First segment" });
// First segment complete: preparePayload suppresses and finalizes stream
const result1 = ctrl.preparePayload({ text: "First segment" });
expect(result1).toBeUndefined();
expect(streamInstances[0]?.isFinalized).toBe(true);
// Post-tool partial replies fire again (stream.update is a no-op since finalized)
ctrl.onPartialReply({ text: "Second segment" });
// Must still use fallback because stream is finalized and can't deliver
const result2 = ctrl.preparePayload({ text: "Second segment" });
expect(result2).toEqual({ text: "Second segment" });
});
it("delivers all segments across 3+ tool call rounds", () => {
const ctrl = createController();
// Round 1: text → tool
ctrl.onPartialReply({ text: "Segment 1" });
expect(ctrl.preparePayload({ text: "Segment 1" })).toBeUndefined();
// Round 2: text → tool
ctrl.onPartialReply({ text: "Segment 2" });
const r2 = ctrl.preparePayload({ text: "Segment 2" });
expect(r2).toEqual({ text: "Segment 2" });
// Round 3: final text
ctrl.onPartialReply({ text: "Segment 3" });
const r3 = ctrl.preparePayload({ text: "Segment 3" });
expect(r3).toEqual({ text: "Segment 3" });
});
it("passes media+text payload through fully after stream finalized", () => {
const ctrl = createController();
// First segment streamed and finalized
ctrl.onPartialReply({ text: "Streamed text" });
ctrl.preparePayload({ text: "Streamed text" });
// Second segment has both text and media — should pass through fully
const result = ctrl.preparePayload({
text: "Post-tool text with image",
mediaUrl: "https://example.com/tool-output.png",
});
expect(result).toEqual({
text: "Post-tool text with image",
mediaUrl: "https://example.com/tool-output.png",
});
});
it("still strips text from media payloads when stream handled text", () => {
const ctrl = createController();
ctrl.onPartialReply({ text: "Some text" });
const result = ctrl.preparePayload({
text: "Some text",
mediaUrl: "https://example.com/image.png",
});
expect(result).toEqual({
text: undefined,
mediaUrl: "https://example.com/image.png",
});
});
});

View File

@@ -35,6 +35,7 @@ export function createTeamsReplyStreamController(params: {
let streamReceivedTokens = false;
let informativeUpdateSent = false;
let pendingFinalize: Promise<void> | undefined;
return {
async onReplyStart(): Promise<void> {
@@ -54,10 +55,16 @@ export function createTeamsReplyStreamController(params: {
},
preparePayload(payload: ReplyPayload): ReplyPayload | undefined {
if (!stream || !streamReceivedTokens || !stream.hasContent) {
if (!stream || !streamReceivedTokens || !stream.hasContent || stream.isFinalized) {
return payload;
}
// Stream handled this text segment — finalize it and reset so any
// subsequent text segments (after tool calls) use fallback delivery.
// finalize() is idempotent; the later call in markDispatchIdle is a no-op.
streamReceivedTokens = false;
pendingFinalize = stream.finalize();
const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length);
if (!hasMedia) {
return undefined;
@@ -66,6 +73,7 @@ export function createTeamsReplyStreamController(params: {
},
async finalize(): Promise<void> {
await pendingFinalize;
await stream?.finalize();
},