fix(msteams): keep streaming alive during long tool chains via typing indicator (#59731) (#64088)

* fix(msteams): keep streaming alive during long tool chains via periodic typing (#59731)

* test(msteams): align thread-session store mock with interface

* fix(msteams): treat failed streams as inactive

---------

Co-authored-by: Brad Groux <bradgroux@users.noreply.github.com>
Co-authored-by: Brad Groux <3053586+BradGroux@users.noreply.github.com>
This commit is contained in:
sudie-codes
2026-04-10 12:42:41 -07:00
committed by GitHub
parent 01ea7e4921
commit 99f76ec4c6
4 changed files with 304 additions and 24 deletions

View File

@@ -10,6 +10,9 @@ const streamInstances = vi.hoisted(
() =>
[] as Array<{
hasContent: boolean;
isFinalized: boolean;
isFailed: boolean;
streamedLength: number;
sendInformativeUpdate: ReturnType<typeof vi.fn>;
update: ReturnType<typeof vi.fn>;
finalize: ReturnType<typeof vi.fn>;
@@ -45,9 +48,14 @@ vi.mock("./revoked-context.js", () => ({
vi.mock("./streaming-message.js", () => ({
TeamsHttpStream: class {
hasContent = false;
isFinalized = false;
isFailed = false;
streamedLength = 0;
sendInformativeUpdate = vi.fn(async () => {});
update = vi.fn();
finalize = vi.fn(async () => {});
finalize = vi.fn(async function (this: { isFinalized: boolean }) {
this.isFinalized = true;
});
constructor() {
streamInstances.push(this);
@@ -103,12 +111,17 @@ describe("createMSTeamsReplyDispatcher", () => {
});
});
let lastCreatedDispatcher: ReturnType<typeof createMSTeamsReplyDispatcher> | undefined;
let lastContextSendActivity: ReturnType<typeof vi.fn> | undefined;
function createDispatcher(
conversationType: string = "personal",
msteamsConfig: Record<string, unknown> = {},
extraParams: { onSentMessageIds?: (ids: string[]) => void } = {},
) {
return createMSTeamsReplyDispatcher({
const contextSendActivity = vi.fn(async () => ({ id: "activity-1" }));
lastContextSendActivity = contextSendActivity;
const dispatcher = createMSTeamsReplyDispatcher({
cfg: { channels: { msteams: msteamsConfig } } as never,
agentId: "agent",
sessionKey: "agent:main:main",
@@ -129,12 +142,28 @@ describe("createMSTeamsReplyDispatcher", () => {
serviceUrl: "https://service.example.com",
} as never,
context: {
sendActivity: vi.fn(async () => ({ id: "activity-1" })),
sendActivity: contextSendActivity,
} as never,
replyStyle: "thread",
textLimit: 4000,
...extraParams,
});
lastCreatedDispatcher = dispatcher;
return dispatcher;
}
function getContextSendActivity(): ReturnType<typeof vi.fn> {
if (!lastContextSendActivity) {
throw new Error("createDispatcher must be called first");
}
return lastContextSendActivity;
}
async function triggerPartialReply(text: string): Promise<void> {
if (!lastCreatedDispatcher) {
throw new Error("createDispatcher must be called first");
}
await lastCreatedDispatcher.replyOptions.onPartialReply?.({ text });
}
it("sends an informative status update on reply start for personal chats", async () => {
@@ -145,9 +174,127 @@ describe("createMSTeamsReplyDispatcher", () => {
expect(streamInstances).toHaveLength(1);
expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1);
});
it("starts the typing keepalive in personal chats so the TurnContext survives long tool chains", async () => {
createDispatcher("personal");
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.onReplyStart?.();
// In addition to the streaming card's informative update, the typing
// keepalive is now started on personal chats so Bot Framework proxies
// stay alive during long tool chains (#59731).
expect(typingCallbacks.onReplyStart).toHaveBeenCalledTimes(1);
});
it("skips the typing keepalive in personal chats when typingIndicator=false", async () => {
createDispatcher("personal", { typingIndicator: false });
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.onReplyStart?.();
// Even though we still send the informative update, the opt-out
// disables the typing keepalive.
expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1);
expect(typingCallbacks.onReplyStart).not.toHaveBeenCalled();
});
it("passes a longer keepalive TTL so the loop survives long tool chains", () => {
createDispatcher("personal");
const pipelineArgs = createChannelReplyPipelineMock.mock.calls[0]?.[0];
expect(pipelineArgs?.typing?.keepaliveIntervalMs).toBeGreaterThan(3_000);
expect(pipelineArgs?.typing?.keepaliveIntervalMs).toBeLessThanOrEqual(10_000);
// Issue #59731 reports 60s+ tool chains — the default 60s TTL is too
// tight so the dispatcher passes its own generous ceiling.
expect(pipelineArgs?.typing?.maxDurationMs).toBeGreaterThanOrEqual(300_000);
});
it("allows typing keepalive sends before any stream tokens arrive", async () => {
createDispatcher("personal");
const pipelineArgs = createChannelReplyPipelineMock.mock.calls[0]?.[0];
const sendTyping = pipelineArgs?.typing?.start as () => Promise<void>;
// No onPartialReply has been called yet, so the stream is not active.
// The typing keepalive should be allowed to warm the TurnContext.
const contextSendActivity = getContextSendActivity();
contextSendActivity.mockClear();
await sendTyping();
expect(contextSendActivity).toHaveBeenCalledWith({ type: "typing" });
});
it("suppresses typing keepalive sends while the stream card is actively chunking", async () => {
createDispatcher("personal");
const pipelineArgs = createChannelReplyPipelineMock.mock.calls[0]?.[0];
const sendTyping = pipelineArgs?.typing?.start as () => Promise<void>;
// Simulate the stream actively receiving a partial chunk. While the
// stream card is live we do not want a plain "..." typing indicator
// layered on top of it.
await triggerPartialReply("streaming content");
const contextSendActivity = getContextSendActivity();
contextSendActivity.mockClear();
await sendTyping();
expect(contextSendActivity).not.toHaveBeenCalled();
});
it("resumes typing keepalive sends once the stream finalizes between tool rounds", async () => {
createDispatcher("personal");
const pipelineArgs = createChannelReplyPipelineMock.mock.calls[0]?.[0];
const sendTyping = pipelineArgs?.typing?.start as () => Promise<void>;
// First segment: tokens flow, stream is active, typing is gated off.
await triggerPartialReply("first segment tokens");
const stream = streamInstances[0];
if (!stream) {
throw new Error("expected a Teams stream instance to be created");
}
const contextSendActivity = getContextSendActivity();
contextSendActivity.mockClear();
await sendTyping();
expect(contextSendActivity).not.toHaveBeenCalled();
// First segment complete: the stream is finalized ahead of the tool
// chain. Mirror what preparePayload does by flipping the mocked stream's
// finalized flag. The controller's isStreamActive check reads this via
// the real stream controller wired into the dispatcher.
stream.isFinalized = true;
// During the tool chain the loop should be allowed to fire again so
// the Bot Framework proxy stays warm. See #59731.
contextSendActivity.mockClear();
await sendTyping();
expect(contextSendActivity).toHaveBeenCalledWith({ type: "typing" });
});
it("fires native typing in group chats (no stream) because the gate never applies", async () => {
createDispatcher("groupchat");
const pipelineArgs = createChannelReplyPipelineMock.mock.calls[0]?.[0];
const sendTyping = pipelineArgs?.typing?.start as () => Promise<void>;
// In group chats we don't create a stream, so isStreamActive() always
// returns false and the typing indicator still fires normally.
const contextSendActivity = getContextSendActivity();
contextSendActivity.mockClear();
await sendTyping();
expect(contextSendActivity).toHaveBeenCalledWith({ type: "typing" });
});
it("is a no-op for channel conversations (typing unsupported)", async () => {
createDispatcher("channel");
const pipelineArgs = createChannelReplyPipelineMock.mock.calls[0]?.[0];
const sendTyping = pipelineArgs?.typing?.start as () => Promise<void>;
const contextSendActivity = getContextSendActivity();
contextSendActivity.mockClear();
await sendTyping();
// Teams channel conversations do not support the typing activity at
// all, so the start callback is a no-op regardless of stream state.
expect(contextSendActivity).not.toHaveBeenCalled();
});
it("sends native typing indicator for channel conversations by default", async () => {
createDispatcher("channel");
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];

View File

@@ -53,26 +53,62 @@ export function createMSTeamsReplyDispatcher(params: {
);
const isTypingSupported = conversationType === "personal" || conversationType === "groupchat";
/**
* Keepalive cadence for the typing indicator while the bot is running
* (including long tool chains). Bot Framework 1:1 TurnContext proxies
* expire after ~30s of inactivity; sending a typing activity every 8s
* keeps the proxy alive so the post-tool reply can still land via the
* turn context. Sits in the middle of the 5-10s range recommended in
* #59731.
*/
const TYPING_KEEPALIVE_INTERVAL_MS = 8_000;
/**
* TTL ceiling for the typing keepalive loop. The default in
* createTypingCallbacks is 60s, which is too short for the Teams long tool
* chains described in #59731 (60s+ total runs are common). Give tool
* chains up to 10 minutes before auto-stopping the keepalive.
*/
const TYPING_KEEPALIVE_MAX_DURATION_MS = 10 * 60_000;
// Forward reference: sendTypingIndicator is built before the stream
// controller exists, but the keepalive tick needs to check stream state so
// we don't overlay "..." typing on the visible streaming card. The ref is
// wired once the stream controller is constructed below.
const streamActiveRef: { current: () => boolean } = { current: () => false };
const rawSendTypingIndicator = async () => {
await withRevokedProxyFallback({
run: async () => {
await params.context.sendActivity({ type: "typing" });
},
onRevoked: async () => {
const baseRef = buildConversationReference(params.conversationRef);
await params.adapter.continueConversation(
params.appId,
{ ...baseRef, activityId: undefined },
async (ctx) => {
await ctx.sendActivity({ type: "typing" });
},
);
},
onRevokedLog: () => {
params.log.debug?.("turn context revoked, sending typing via proactive messaging");
},
});
};
const sendTypingIndicator = isTypingSupported
? async () => {
await withRevokedProxyFallback({
run: async () => {
await params.context.sendActivity({ type: "typing" });
},
onRevoked: async () => {
const baseRef = buildConversationReference(params.conversationRef);
await params.adapter.continueConversation(
params.appId,
{ ...baseRef, activityId: undefined },
async (ctx) => {
await ctx.sendActivity({ type: "typing" });
},
);
},
onRevokedLog: () => {
params.log.debug?.("turn context revoked, sending typing via proactive messaging");
},
});
// While the streaming card is actively being updated the user
// already sees a live indicator in the stream — don't overlay a
// plain "..." typing on top of it. Between segments (tool chain)
// the stream is finalized, so typing indicators are appropriate
// and they are what keep the TurnContext alive. See #59731.
if (streamActiveRef.current()) {
return;
}
await rawSendTypingIndicator();
}
: async () => {};
@@ -83,6 +119,8 @@ export function createMSTeamsReplyDispatcher(params: {
accountId: params.accountId,
typing: {
start: sendTypingIndicator,
keepaliveIntervalMs: TYPING_KEEPALIVE_INTERVAL_MS,
maxDurationMs: TYPING_KEEPALIVE_MAX_DURATION_MS,
onStartError: (err) => {
logTypingFailure({
log: (message) => params.log.debug?.(message),
@@ -110,6 +148,8 @@ export function createMSTeamsReplyDispatcher(params: {
feedbackLoopEnabled,
log: params.log,
});
// Wire the forward-declared gate used by sendTypingIndicator.
streamActiveRef.current = () => streamController.isStreamActive();
const blockStreamingEnabled =
typeof msteamsCfg?.blockStreaming === "boolean" ? msteamsCfg.blockStreaming : false;
@@ -215,8 +255,14 @@ export function createMSTeamsReplyDispatcher(params: {
humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
onReplyStart: async () => {
await streamController.onReplyStart();
// Avoid duplicate typing UX in DMs: stream status already shows progress.
if (typingIndicatorEnabled && !streamController.hasStream()) {
// Always start the typing keepalive loop when typing is enabled and
// supported by this conversation type. The sendTypingIndicator gate
// skips actual sends while the stream card is visually active, so
// during the first text segment the user only sees the streaming UI.
// Once the stream finalizes (between segments / during tool chains),
// the loop starts sending typing activities and keeps the Bot Framework
// TurnContext alive so the post-tool reply can still land. See #59731.
if (typingIndicatorEnabled) {
await typingCallbacks?.onReplyStart?.();
}
},

View File

@@ -179,4 +179,57 @@ describe("createTeamsReplyStreamController", () => {
mediaUrl: "https://example.com/image.png",
});
});
describe("isStreamActive", () => {
it("returns false before any tokens arrive so typing keepalive can warm up", () => {
const ctrl = createController();
expect(ctrl.isStreamActive()).toBe(false);
});
it("returns false after the informative update but before tokens arrive", async () => {
const ctrl = createController();
await ctrl.onReplyStart();
expect(ctrl.isStreamActive()).toBe(false);
});
it("returns true while the stream is actively receiving tokens", () => {
const ctrl = createController();
ctrl.onPartialReply({ text: "Streaming tokens" });
expect(ctrl.isStreamActive()).toBe(true);
});
it("returns false after the stream is finalized between tool rounds", () => {
const ctrl = createController();
ctrl.onPartialReply({ text: "First segment" });
expect(ctrl.isStreamActive()).toBe(true);
// First segment complete: stream is finalized so the typing keepalive
// can resume during the tool chain that follows.
ctrl.preparePayload({ text: "First segment" });
expect(ctrl.isStreamActive()).toBe(false);
});
it("returns false when the stream has failed", () => {
const ctrl = createController();
ctrl.onPartialReply({ text: "First segment" });
expect(ctrl.isStreamActive()).toBe(true);
streamInstances[0].isFailed = true;
expect(ctrl.isStreamActive()).toBe(false);
});
it("returns false when conversationType is not personal", () => {
streamInstances.length = 0;
const ctrl = createTeamsReplyStreamController({
conversationType: "channel",
context: { sendActivity: vi.fn() } as never,
feedbackLoopEnabled: false,
log: { debug: vi.fn() } as never,
});
ctrl.onPartialReply({ text: "anything" });
expect(ctrl.isStreamActive()).toBe(false);
});
});
});

View File

@@ -5,6 +5,13 @@ import type { MSTeamsMonitorLogger } from "./monitor-types.js";
import type { MSTeamsTurnContext } from "./sdk-types.js";
import { TeamsHttpStream } from "./streaming-message.js";
// Local generic wrapper to defer union resolution. Works around a
// single-file-mode limitation in the type-aware lint where imported
// types resolved via extension runtime-api barrels are treated as
// `error` (acting as `any`) and trip `no-redundant-type-constituents`
// when combined with `undefined` in a union.
type Maybe<T> = T | undefined;
const INFORMATIVE_STATUS_TEXTS = [
"Thinking...",
"Working on that...",
@@ -56,7 +63,7 @@ export function createTeamsReplyStreamController(params: {
stream.update(payload.text);
},
preparePayload(payload: ReplyPayload): ReplyPayload | undefined {
preparePayload(payload: ReplyPayload): Maybe<ReplyPayload> {
if (!stream || !streamReceivedTokens) {
return payload;
}
@@ -109,5 +116,32 @@ export function createTeamsReplyStreamController(params: {
hasStream(): boolean {
return Boolean(stream);
},
/**
* Whether the Teams streaming card is currently receiving LLM tokens.
* Used to gate side-channel keepalive activity so we don't overlay plain
* "typing" indicators on top of a live streaming card.
*
* Returns true only while the stream is actively chunking text into the
* streaming card. The informative update (blue progress bar) is short
* lived so we intentionally do not count it as "active"; this way the
* typing keepalive can still fire during the informative window and
* during tool chains between text segments.
*
* Returns false when:
* - No stream exists (non-personal conversation).
* - Stream has not yet received any text tokens.
* - Stream has been finalized (e.g. after the first text segment, while
* tools run before the next segment).
*/
isStreamActive(): boolean {
if (!stream) {
return false;
}
if (stream.isFinalized || stream.isFailed) {
return false;
}
return streamReceivedTokens;
},
};
}