mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-24 07:31:44 +00:00
* fix(msteams): keep streaming alive during long tool chains via periodic typing (#59731) * test(msteams): align thread-session store mock with interface * fix(msteams): treat failed streams as inactive --------- Co-authored-by: Brad Groux <bradgroux@users.noreply.github.com> Co-authored-by: Brad Groux <3053586+BradGroux@users.noreply.github.com>
347 lines
12 KiB
TypeScript
347 lines
12 KiB
TypeScript
import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/text-runtime";
|
|
import {
|
|
createChannelReplyPipeline,
|
|
logTypingFailure,
|
|
resolveChannelMediaMaxBytes,
|
|
type OpenClawConfig,
|
|
type MSTeamsReplyStyle,
|
|
type RuntimeEnv,
|
|
} from "../runtime-api.js";
|
|
import type { MSTeamsAccessTokenProvider } from "./attachments/types.js";
|
|
import type { StoredConversationReference } from "./conversation-store.js";
|
|
import {
|
|
classifyMSTeamsSendError,
|
|
formatMSTeamsSendErrorHint,
|
|
formatUnknownError,
|
|
} from "./errors.js";
|
|
import {
|
|
buildConversationReference,
|
|
type MSTeamsAdapter,
|
|
type MSTeamsRenderedMessage,
|
|
renderReplyPayloadsToMessages,
|
|
sendMSTeamsMessages,
|
|
} from "./messenger.js";
|
|
import type { MSTeamsMonitorLogger } from "./monitor-types.js";
|
|
import { createTeamsReplyStreamController } from "./reply-stream-controller.js";
|
|
import { withRevokedProxyFallback } from "./revoked-context.js";
|
|
import { getMSTeamsRuntime } from "./runtime.js";
|
|
import type { MSTeamsTurnContext } from "./sdk-types.js";
|
|
|
|
export { pickInformativeStatusText } from "./reply-stream-controller.js";
|
|
|
|
/**
 * Builds the reply dispatcher for one Microsoft Teams turn.
 *
 * Wires together three pieces and hands them to
 * `core.channel.reply.createReplyDispatcherWithTyping`:
 *  - a typing indicator (with keepalive) that is suppressed while the
 *    streaming card is active,
 *  - the Teams reply stream controller,
 *  - a buffered sender that renders payloads into Teams messages and flushes
 *    them either immediately (block streaming) or when the dispatch goes idle.
 *
 * @param params - Per-turn wiring. `context` is the live turn context used for
 *   in-turn sends; `adapter`, `appId` and `conversationRef` are used for the
 *   proactive fallback when the turn context has been revoked. `textLimit` is
 *   the chunk limit passed to the renderer. `onSentMessageIds`, when provided,
 *   is called with the ids of messages that were delivered.
 * @returns `dispatcher` from the core reply dispatcher; `replyOptions` from
 *   the core dispatcher extended with `onPartialReply` (only when the stream
 *   controller has a stream), `disableBlockStreaming` and `onModelSelected`;
 *   and `markDispatchIdle`, which flushes buffered messages, finalizes the
 *   stream and then marks the core dispatcher idle.
 */
export function createMSTeamsReplyDispatcher(params: {
  cfg: OpenClawConfig;
  agentId: string;
  sessionKey: string;
  accountId?: string;
  runtime: RuntimeEnv;
  log: MSTeamsMonitorLogger;
  adapter: MSTeamsAdapter;
  appId: string;
  conversationRef: StoredConversationReference;
  context: MSTeamsTurnContext;
  replyStyle: MSTeamsReplyStyle;
  textLimit: number;
  onSentMessageIds?: (ids: string[]) => void;
  tokenProvider?: MSTeamsAccessTokenProvider;
  sharePointSiteId?: string;
}) {
  const core = getMSTeamsRuntime();
  const msteamsCfg = params.cfg.channels?.msteams;
  const conversationType = normalizeOptionalLowercaseString(
    params.conversationRef.conversation?.conversationType,
  );
  // Typing activities are only sent for 1:1 ("personal") and group chats;
  // every other conversation type gets a no-op typing callback below.
  const isTypingSupported = conversationType === "personal" || conversationType === "groupchat";

  /**
   * Keepalive cadence for the typing indicator while the bot is running
   * (including long tool chains). Bot Framework 1:1 TurnContext proxies
   * expire after ~30s of inactivity; sending a typing activity every 8s
   * keeps the proxy alive so the post-tool reply can still land via the
   * turn context. Sits in the middle of the 5-10s range recommended in
   * #59731.
   */
  const TYPING_KEEPALIVE_INTERVAL_MS = 8_000;

  /**
   * TTL ceiling for the typing keepalive loop. The default in
   * createTypingCallbacks is 60s, which is too short for the Teams long tool
   * chains described in #59731 (60s+ total runs are common). Give tool
   * chains up to 10 minutes before auto-stopping the keepalive.
   */
  const TYPING_KEEPALIVE_MAX_DURATION_MS = 10 * 60_000;

  // Forward reference: sendTypingIndicator is built before the stream
  // controller exists, but the keepalive tick needs to check stream state so
  // we don't overlay "..." typing on the visible streaming card. The ref is
  // wired once the stream controller is constructed below.
  const streamActiveRef: { current: () => boolean } = { current: () => false };

  // Sends one typing activity through the turn context. If the turn context
  // has been revoked, falls back to a proactive send via
  // adapter.continueConversation with the activityId cleared.
  const rawSendTypingIndicator = async () => {
    await withRevokedProxyFallback({
      run: async () => {
        await params.context.sendActivity({ type: "typing" });
      },
      onRevoked: async () => {
        const baseRef = buildConversationReference(params.conversationRef);
        await params.adapter.continueConversation(
          params.appId,
          { ...baseRef, activityId: undefined },
          async (ctx) => {
            await ctx.sendActivity({ type: "typing" });
          },
        );
      },
      onRevokedLog: () => {
        params.log.debug?.("turn context revoked, sending typing via proactive messaging");
      },
    });
  };

  const sendTypingIndicator = isTypingSupported
    ? async () => {
        // While the streaming card is actively being updated the user
        // already sees a live indicator in the stream — don't overlay a
        // plain "..." typing on top of it. Between segments (tool chain)
        // the stream is finalized, so typing indicators are appropriate
        // and they are what keep the TurnContext alive. See #59731.
        if (streamActiveRef.current()) {
          return;
        }
        await rawSendTypingIndicator();
      }
    : async () => {};

  const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
    cfg: params.cfg,
    agentId: params.agentId,
    channel: "msteams",
    accountId: params.accountId,
    typing: {
      start: sendTypingIndicator,
      keepaliveIntervalMs: TYPING_KEEPALIVE_INTERVAL_MS,
      maxDurationMs: TYPING_KEEPALIVE_MAX_DURATION_MS,
      // Typing failures are reported at debug level only.
      onStartError: (err) => {
        logTypingFailure({
          log: (message) => params.log.debug?.(message),
          channel: "msteams",
          action: "start",
          error: err,
        });
      },
    },
  });

  const chunkMode = core.channel.text.resolveChunkMode(params.cfg, "msteams");
  const tableMode = core.channel.text.resolveMarkdownTableMode({
    cfg: params.cfg,
    channel: "msteams",
  });
  const mediaMaxBytes = resolveChannelMediaMaxBytes({
    cfg: params.cfg,
    resolveChannelLimitMb: ({ cfg }) => cfg.channels?.msteams?.mediaMaxMb,
  });
  // The feedback loop is on unless the config sets feedbackEnabled to false.
  const feedbackLoopEnabled = params.cfg.channels?.msteams?.feedbackEnabled !== false;
  const streamController = createTeamsReplyStreamController({
    conversationType,
    context: params.context,
    feedbackLoopEnabled,
    log: params.log,
  });
  // Wire the forward-declared gate used by sendTypingIndicator.
  streamActiveRef.current = () => streamController.isStreamActive();

  // Block streaming defaults to off; the typing indicator defaults to on.
  // Only explicit boolean config values override these defaults.
  const blockStreamingEnabled =
    typeof msteamsCfg?.blockStreaming === "boolean" ? msteamsCfg.blockStreaming : false;
  const typingIndicatorEnabled =
    typeof msteamsCfg?.typingIndicator === "boolean" ? msteamsCfg.typingIndicator : true;

  // Rendered messages waiting to be sent by flushPendingMessages.
  const pendingMessages: MSTeamsRenderedMessage[] = [];

  // Sends a batch of rendered messages and resolves with the ids returned by
  // sendMSTeamsMessages. Retry attempts are logged at debug level.
  const sendMessages = async (messages: MSTeamsRenderedMessage[]): Promise<string[]> => {
    return sendMSTeamsMessages({
      replyStyle: params.replyStyle,
      adapter: params.adapter,
      appId: params.appId,
      conversationRef: params.conversationRef,
      context: params.context,
      messages,
      // NOTE(review): empty retry options — presumably selects the sender's
      // default retry policy; confirm against sendMSTeamsMessages.
      retry: {},
      onRetry: (event) => {
        params.log.debug?.("retrying send", {
          replyStyle: params.replyStyle,
          ...event,
        });
      },
      tokenProvider: params.tokenProvider,
      sharePointSiteId: params.sharePointSiteId,
      mediaMaxBytes,
      feedbackLoopEnabled,
    });
  };

  // Enqueues a system event for this session describing a delivery failure:
  // how many blocks failed, the error text, the status code when one was
  // classified, and a retry hint for transient/throttled errors.
  const queueDeliveryFailureSystemEvent = (failure: {
    failed: number;
    total: number;
    error: unknown;
  }) => {
    const classification = classifyMSTeamsSendError(failure.error);
    const errorText = formatUnknownError(failure.error);
    const failedAll = failure.failed >= failure.total;
    const summary = failedAll
      ? "the previous reply was not delivered"
      : `${failure.failed} of ${failure.total} message blocks were not delivered`;
    const sentences = [
      `Microsoft Teams delivery failed: ${summary}.`,
      `The user may not have received ${failedAll ? "that reply" : "the full reply"}.`,
      `Error: ${errorText}.`,
      classification.statusCode != null ? `Status: ${classification.statusCode}.` : undefined,
      classification.kind === "transient" || classification.kind === "throttled"
        ? "Retrying later may succeed."
        : undefined,
    ].filter(Boolean);
    core.system.enqueueSystemEvent(sentences.join(" "), {
      sessionKey: params.sessionKey,
      contextKey: `msteams:delivery-failure:${params.conversationRef.conversation?.id ?? "unknown"}`,
    });
  };

  // Drains pendingMessages and sends them as one batch. If the batch send
  // throws, each message is retried on its own so one bad block does not drop
  // the rest; remaining failures are logged and surfaced as a system event.
  const flushPendingMessages = async () => {
    if (pendingMessages.length === 0) {
      return;
    }
    // splice(0) empties the shared queue up front, so messages queued while
    // this flush is awaiting are left for the next flush.
    const toSend = pendingMessages.splice(0);
    const total = toSend.length;
    let ids: string[];
    try {
      ids = await sendMessages(toSend);
    } catch (batchError) {
      // NOTE(review): if sendMessages delivered part of the batch before
      // throwing, the per-message retry below would send those messages
      // again — confirm whether sendMSTeamsMessages is all-or-nothing.
      ids = [];
      let failed = 0;
      let lastFailedError: unknown = batchError;
      for (const msg of toSend) {
        try {
          const msgIds = await sendMessages([msg]);
          ids.push(...msgIds);
        } catch (msgError) {
          failed += 1;
          lastFailedError = msgError;
          // Only the last error reaches the system event below, so record
          // every individual failure's error here.
          params.log.debug?.("individual message send failed, continuing with remaining blocks", {
            error: formatUnknownError(msgError),
          });
        }
      }
      if (failed > 0) {
        params.log.warn?.(`failed to deliver ${failed} of ${total} message blocks`, {
          failed,
          total,
        });
        queueDeliveryFailureSystemEvent({
          failed,
          total,
          error: lastFailedError,
        });
      }
    }
    if (ids.length > 0) {
      params.onSentMessageIds?.(ids);
    }
  };

  const {
    dispatcher,
    replyOptions,
    markDispatchIdle: baseMarkDispatchIdle,
  } = core.channel.reply.createReplyDispatcherWithTyping({
    ...replyPipeline,
    humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
    onReplyStart: async () => {
      await streamController.onReplyStart();
      // Always start the typing keepalive loop when typing is enabled and
      // supported by this conversation type. The sendTypingIndicator gate
      // skips actual sends while the stream card is visually active, so
      // during the first text segment the user only sees the streaming UI.
      // Once the stream finalizes (between segments / during tool chains),
      // the loop starts sending typing activities and keeps the Bot Framework
      // TurnContext alive so the post-tool reply can still land. See #59731.
      if (typingIndicatorEnabled) {
        await typingCallbacks?.onReplyStart?.();
      }
    },
    typingCallbacks,
    deliver: async (payload) => {
      // The stream controller may return nothing for a payload, in which
      // case there is nothing left to send through the regular path.
      const preparedPayload = streamController.preparePayload(payload);
      if (!preparedPayload) {
        return;
      }

      const messages = renderReplyPayloadsToMessages([preparedPayload], {
        textChunkLimit: params.textLimit,
        chunkText: true,
        mediaMode: "split",
        tableMode,
        chunkMode,
      });
      pendingMessages.push(...messages);

      // When block streaming is enabled, flush immediately so blocks are
      // delivered progressively instead of batching until markDispatchIdle.
      if (blockStreamingEnabled) {
        await flushPendingMessages();
      }
    },
    onError: (err, info) => {
      const errMsg = formatUnknownError(err);
      const classification = classifyMSTeamsSendError(err);
      const hint = formatMSTeamsSendErrorHint(classification);
      params.runtime.error?.(
        `msteams ${info.kind} reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`,
      );
      params.log.error("reply failed", {
        kind: info.kind,
        error: errMsg,
        classification,
        hint,
      });
    },
  });

  // Flushes buffered messages, then finalizes the stream, then marks the core
  // dispatcher idle. Flush and finalize failures are logged rather than
  // rethrown, so each later step still runs and the returned promise resolves.
  const markDispatchIdle = (): Promise<void> => {
    return flushPendingMessages()
      .catch((err) => {
        const errMsg = formatUnknownError(err);
        const classification = classifyMSTeamsSendError(err);
        const hint = formatMSTeamsSendErrorHint(classification);
        params.runtime.error?.(`msteams flush reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`);
        params.log.error("flush reply failed", {
          error: errMsg,
          classification,
          hint,
        });
      })
      .then(() => {
        return streamController.finalize().catch((err) => {
          params.log.debug?.("stream finalize failed", { error: formatUnknownError(err) });
        });
      })
      .finally(() => {
        baseMarkDispatchIdle();
      });
  };

  return {
    dispatcher,
    replyOptions: {
      ...replyOptions,
      // Partial-reply streaming is only exposed when the controller has a stream.
      ...(streamController.hasStream()
        ? {
            onPartialReply: (payload: { text?: string }) =>
              streamController.onPartialReply(payload),
          }
        : {}),
      // Left undefined when blockStreaming is not an explicit boolean in config.
      disableBlockStreaming:
        typeof msteamsCfg?.blockStreaming === "boolean" ? !msteamsCfg.blockStreaming : undefined,
      onModelSelected,
    },
    markDispatchIdle,
  };
}
|