mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-05 14:21:32 +00:00
* fix(msteams): reset stream state after preparePayload suppresses delivery When an agent uses tools mid-response (text → tool calls → more text), the stream controller's preparePayload would suppress fallback delivery for ALL text segments because streamReceivedTokens stayed true. This caused the second text segment to be silently lost or duplicated. Fix: after preparePayload suppresses delivery for a streamed segment, finalize the stream and reset streamReceivedTokens so subsequent segments use fallback delivery. Fixes openclaw/openclaw#56040 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(msteams): guard preparePayload against finalized stream re-suppression When onPartialReply fires after the stream is finalized (post-tool partial tokens), streamReceivedTokens gets set back to true but the stream can't deliver. Add stream.isFinalized check so a finalized stream never suppresses fallback delivery. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(msteams): await pending finalize in controller to prevent race Store the fire-and-forget finalize promise from preparePayload and await it in the controller's finalize() method. This ensures markDispatchIdle waits for the in-flight stream finalization to complete before context cleanup. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test(msteams): add edge case tests for multi-round and media payloads Add tests for 3+ tool call rounds (text → tool → text → tool → text) and media+text payloads after stream finalization, covering the full contract of preparePayload across all input types and cycle counts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
85 lines
2.6 KiB
TypeScript
85 lines
2.6 KiB
TypeScript
import type { ReplyPayload } from "../runtime-api.js";
|
|
import type { MSTeamsMonitorLogger } from "./monitor-types.js";
|
|
import type { MSTeamsTurnContext } from "./sdk-types.js";
|
|
import { TeamsHttpStream } from "./streaming-message.js";
|
|
|
|
const INFORMATIVE_STATUS_TEXTS = [
|
|
"Thinking...",
|
|
"Working on that...",
|
|
"Checking the details...",
|
|
"Putting an answer together...",
|
|
];
|
|
|
|
export function pickInformativeStatusText(random = Math.random): string {
|
|
const index = Math.floor(random() * INFORMATIVE_STATUS_TEXTS.length);
|
|
return INFORMATIVE_STATUS_TEXTS[index] ?? INFORMATIVE_STATUS_TEXTS[0]!;
|
|
}
|
|
|
|
export function createTeamsReplyStreamController(params: {
|
|
conversationType?: string;
|
|
context: MSTeamsTurnContext;
|
|
feedbackLoopEnabled: boolean;
|
|
log: MSTeamsMonitorLogger;
|
|
random?: () => number;
|
|
}) {
|
|
const isPersonal = params.conversationType?.toLowerCase() === "personal";
|
|
const stream = isPersonal
|
|
? new TeamsHttpStream({
|
|
sendActivity: (activity) => params.context.sendActivity(activity),
|
|
feedbackLoopEnabled: params.feedbackLoopEnabled,
|
|
onError: (err) => {
|
|
params.log.debug?.(`stream error: ${err instanceof Error ? err.message : String(err)}`);
|
|
},
|
|
})
|
|
: undefined;
|
|
|
|
let streamReceivedTokens = false;
|
|
let informativeUpdateSent = false;
|
|
let pendingFinalize: Promise<void> | undefined;
|
|
|
|
return {
|
|
async onReplyStart(): Promise<void> {
|
|
if (!stream || informativeUpdateSent) {
|
|
return;
|
|
}
|
|
informativeUpdateSent = true;
|
|
await stream.sendInformativeUpdate(pickInformativeStatusText(params.random));
|
|
},
|
|
|
|
onPartialReply(payload: { text?: string }): void {
|
|
if (!stream || !payload.text) {
|
|
return;
|
|
}
|
|
streamReceivedTokens = true;
|
|
stream.update(payload.text);
|
|
},
|
|
|
|
preparePayload(payload: ReplyPayload): ReplyPayload | undefined {
|
|
if (!stream || !streamReceivedTokens || !stream.hasContent || stream.isFinalized) {
|
|
return payload;
|
|
}
|
|
|
|
// Stream handled this text segment — finalize it and reset so any
|
|
// subsequent text segments (after tool calls) use fallback delivery.
|
|
// finalize() is idempotent; the later call in markDispatchIdle is a no-op.
|
|
streamReceivedTokens = false;
|
|
pendingFinalize = stream.finalize();
|
|
|
|
const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length);
|
|
if (!hasMedia) {
|
|
return undefined;
|
|
}
|
|
return { ...payload, text: undefined };
|
|
},
|
|
|
|
async finalize(): Promise<void> {
|
|
await pendingFinalize;
|
|
await stream?.finalize();
|
|
},
|
|
|
|
hasStream(): boolean {
|
|
return Boolean(stream);
|
|
},
|
|
};
|
|
}
|