From 273973d374c245e5f37c302294c4a07f32278364 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 26 Feb 2026 17:36:09 +0100 Subject: [PATCH] refactor: unify typing dispatch lifecycle and policy boundaries --- extensions/bluebubbles/src/monitor.test.ts | 18 +++++ extensions/feishu/src/bot.test.ts | 19 ++++++ extensions/feishu/src/bot.ts | 48 +++++++------- .../matrix/src/matrix/monitor/handler.ts | 62 +++++++++--------- .../mattermost/src/mattermost/monitor.ts | 48 +++++++------- .../src/monitor-handler/message-handler.ts | 56 ++++++++-------- scripts/check-no-raw-channel-fetch.mjs | 5 +- src/auto-reply/reply/dispatch-from-config.ts | 17 ++--- src/auto-reply/reply/get-reply-run.ts | 16 ++--- src/auto-reply/reply/typing-policy.test.ts | 61 +++++++++++++++++ src/auto-reply/reply/typing-policy.ts | 35 ++++++++++ src/auto-reply/reply/typing.ts | 18 ++--- src/channels/typing-start-guard.test.ts | 65 +++++++++++++++++++ src/channels/typing-start-guard.ts | 63 ++++++++++++++++++ src/channels/typing.ts | 35 ++++------ src/plugin-sdk/fetch-auth.test.ts | 10 +-- src/plugins/runtime/index.ts | 2 + src/plugins/runtime/types.ts | 2 + src/telegram/lane-delivery.ts | 4 +- 19 files changed, 420 insertions(+), 164 deletions(-) create mode 100644 src/auto-reply/reply/typing-policy.test.ts create mode 100644 src/auto-reply/reply/typing-policy.ts create mode 100644 src/channels/typing-start-guard.test.ts create mode 100644 src/channels/typing-start-guard.ts diff --git a/extensions/bluebubbles/src/monitor.test.ts b/extensions/bluebubbles/src/monitor.test.ts index 496d6c36278..00996e6a4c1 100644 --- a/extensions/bluebubbles/src/monitor.test.ts +++ b/extensions/bluebubbles/src/monitor.test.ts @@ -162,6 +162,24 @@ function createMockRuntime(): PluginRuntime { vi.fn() as unknown as PluginRuntime["channel"]["reply"]["resolveHumanDelayConfig"], dispatchReplyFromConfig: vi.fn() as unknown as PluginRuntime["channel"]["reply"]["dispatchReplyFromConfig"], + withReplyDispatcher: vi.fn( + async ({ + dispatcher, + run, + onSettled, + }: Parameters[0]) => { + try { + return await run(); + } finally { + dispatcher.markComplete(); + try { + await dispatcher.waitForIdle(); + } finally { + await onSettled?.(); + } + } + }, + ) as unknown as PluginRuntime["channel"]["reply"]["withReplyDispatcher"], finalizeInboundContext: vi.fn( (ctx: Record) => ctx, ) as unknown as PluginRuntime["channel"]["reply"]["finalizeInboundContext"], diff --git a/extensions/feishu/src/bot.test.ts b/extensions/feishu/src/bot.test.ts index 7e56c36c411..2679ce8e643 100644 --- a/extensions/feishu/src/bot.test.ts +++ b/extensions/feishu/src/bot.test.ts @@ -90,6 +90,24 @@ describe("handleFeishuMessage command authorization", () => { const mockDispatchReplyFromConfig = vi .fn() .mockResolvedValue({ queuedFinal: false, counts: { final: 1 } }); + const mockWithReplyDispatcher = vi.fn( + async ({ + dispatcher, + run, + onSettled, + }: Parameters[0]) => { + try { + return await run(); + } finally { + dispatcher.markComplete(); + try { + await dispatcher.waitForIdle(); + } finally { + await onSettled?.(); + } + } + }, + ); const mockResolveCommandAuthorizedFromAuthorizers = vi.fn(() => false); const mockShouldComputeCommandAuthorized = vi.fn(() => true); const mockReadAllowFromStore = vi.fn().mockResolvedValue([]); @@ -127,6 +145,7 @@ describe("handleFeishuMessage command authorization", () => { formatAgentEnvelope: vi.fn((params: { body: string }) => params.body), finalizeInboundContext: mockFinalizeInboundContext, dispatchReplyFromConfig: mockDispatchReplyFromConfig, + withReplyDispatcher: mockWithReplyDispatcher, }, commands: { shouldComputeCommandAuthorized: mockShouldComputeCommandAuthorized, diff --git a/extensions/feishu/src/bot.ts b/extensions/feishu/src/bot.ts index debbece77c8..37c22da2578 100644 --- a/extensions/feishu/src/bot.ts +++ b/extensions/feishu/src/bot.ts @@ -943,33 +943,31 @@ export async function handleFeishuMessage(params: { }); log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`); - try { - const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions, - }); - - if (isGroup && historyKey && chatHistories) { - clearHistoryEntriesIfEnabled({ - historyMap: chatHistories, - historyKey, - limit: historyLimit, - }); - } - - log( - `feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`, - ); - } finally { - dispatcher.markComplete(); - try { - await dispatcher.waitForIdle(); - } finally { + const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { markDispatchIdle(); - } + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions, + }), + }); + + if (isGroup && historyKey && chatHistories) { + clearHistoryEntriesIfEnabled({ + historyMap: chatHistories, + historyKey, + limit: historyLimit, + }); } + + log( + `feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`, + ); } catch (err) { error(`feishu[${account.accountId}]: failed to dispatch message: ${String(err)}`); } diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index b9791b4063f..8907c9c033e 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -655,39 +655,37 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, }); - try { - const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - skillFilter: roomConfig?.skills, - onModelSelected, - }, - }); - if (!queuedFinal) { - return; - } - didSendReply = true; - const finalCount = counts.final; - logVerboseMessage( - `matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, - ); - if (didSendReply) { - const previewText = bodyText.replace(/\s+/g, " ").slice(0, 160); - core.system.enqueueSystemEvent(`Matrix message from ${senderName}: ${previewText}`, { - sessionKey: route.sessionKey, - contextKey: `matrix:message:${roomId}:${messageId || "unknown"}`, - }); - } - } finally { - dispatcher.markComplete(); - try { - await dispatcher.waitForIdle(); - } finally { + const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { markDispatchIdle(); - } + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: roomConfig?.skills, + onModelSelected, + }, + }), + }); + if (!queuedFinal) { + return; + } + didSendReply = true; + const finalCount = counts.final; + logVerboseMessage( + `matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, + ); + if (didSendReply) { + const previewText = bodyText.replace(/\s+/g, " ").slice(0, 160); + core.system.enqueueSystemEvent(`Matrix message from ${senderName}: ${previewText}`, { + sessionKey: route.sessionKey, + contextKey: `matrix:message:${roomId}:${messageId || "unknown"}`, + }); } } catch (err) { runtime.error?.(`matrix handler failed: ${String(err)}`); diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index b84b5dae9a8..4aeca9eb212 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -772,32 +772,30 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }, }); - try { - await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, - onModelSelected, - }, - }); - if (historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: channelHistories, - historyKey, - limit: historyLimit, - }); - } - } finally { - dispatcher.markComplete(); - try { - await dispatcher.waitForIdle(); - } finally { + await core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { markDispatchIdle(); - } + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + disableBlockStreaming: + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + onModelSelected, + }, + }), + }); + if (historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: channelHistories, + historyKey, + limit: historyLimit, + }); } }; diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index af9d860901b..bba6d16acb5 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -533,30 +533,23 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { log.info("dispatching to agent", { sessionKey: route.sessionKey }); try { - try { - const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions, - }); + const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { + markDispatchIdle(); + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions, + }), + }); - log.info("dispatch complete", { queuedFinal, counts }); + log.info("dispatch complete", { queuedFinal, counts }); - if (!queuedFinal) { - if (isRoomish && historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: conversationHistories, - historyKey, - limit: historyLimit, - }); - } - return; - } - const finalCount = counts.final; - logVerboseMessage( - `msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, - ); + if (!queuedFinal) { if (isRoomish && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: conversationHistories, @@ -564,13 +557,18 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { limit: historyLimit, }); } - } finally { - dispatcher.markComplete(); - try { - await dispatcher.waitForIdle(); - } finally { - markDispatchIdle(); - } + return; + } + const finalCount = counts.final; + logVerboseMessage( + `msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, + ); + if (isRoomish && historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: conversationHistories, + historyKey, + limit: historyLimit, + }); } } catch (err) { log.error("dispatch failed", { error: String(err) }); diff --git a/scripts/check-no-raw-channel-fetch.mjs b/scripts/check-no-raw-channel-fetch.mjs index 639c698a3f3..56008b3f1d8 100644 --- a/scripts/check-no-raw-channel-fetch.mjs +++ b/scripts/check-no-raw-channel-fetch.mjs @@ -40,7 +40,7 @@ const allowedRawFetchCallsites = new Set([ "extensions/matrix/src/directory-live.ts:41", "extensions/matrix/src/matrix/client/config.ts:171", "extensions/mattermost/src/mattermost/client.ts:211", - "extensions/mattermost/src/mattermost/monitor.ts:234", + "extensions/mattermost/src/mattermost/monitor.ts:230", "extensions/mattermost/src/mattermost/probe.ts:27", "extensions/minimax-portal-auth/oauth.ts:71", "extensions/minimax-portal-auth/oauth.ts:112", @@ -89,6 +89,9 @@ async function collectTypeScriptFiles(targetPath) { for (const entry of entries) { const entryPath = path.join(targetPath, entry.name); if (entry.isDirectory()) { + if (entry.name === "node_modules") { + continue; + } files.push(...(await collectTypeScriptFiles(entryPath))); continue; } diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index ee6ac1791be..ff42ff2d81b 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -22,6 +22,7 @@ import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; import { shouldSuppressReasoningPayload } from "./reply-payloads.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; +import { resolveRunTypingPolicy } from "./typing-policy.js"; const AUDIO_PLACEHOLDER_RE = /^(\s*\([^)]*\))?$/i; const AUDIO_HEADER_RE = /^\[Audio\b/i; @@ -395,19 +396,19 @@ export async function dispatchReplyFromConfig(params: { } return { ...payload, text: undefined }; }; + const typing = resolveRunTypingPolicy({ + requestedPolicy: params.replyOptions?.typingPolicy, + suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping, + originatingChannel, + systemEvent: shouldRouteToOriginating, + }); const replyResult = await (params.replyResolver ?? getReplyFromConfig)( ctx, { ...params.replyOptions, - typingPolicy: - params.replyOptions?.typingPolicy ?? - (originatingChannel === INTERNAL_MESSAGE_CHANNEL - ? "internal_webchat" - : shouldRouteToOriginating - ? "system_event" - : undefined), - suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping, + typingPolicy: typing.typingPolicy, + suppressTyping: typing.suppressTyping, onToolResult: (payload: ReplyPayload) => { const run = async () => { const ttsPayload = await maybeApplyTtsToPayload({ diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 4363efc94f3..1df105427f7 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -18,7 +18,6 @@ import { import { logVerbose } from "../../globals.js"; import { clearCommandLane, getQueueSize } from "../../process/command-queue.js"; import { normalizeMainKey } from "../../routing/session-key.js"; -import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js"; import { isReasoningTagProvider } from "../../utils/provider-utils.js"; import { hasControlCommand } from "../command-detection.js"; import { buildInboundMediaNote } from "../media-note.js"; @@ -47,6 +46,7 @@ import { routeReply } from "./route-reply.js"; import { BARE_SESSION_RESET_PROMPT } from "./session-reset-prompt.js"; import { ensureSkillSnapshot, prependSystemEvents } from "./session-updates.js"; import { resolveTypingMode } from "./typing-mode.js"; +import { resolveRunTypingPolicy } from "./typing-policy.js"; import type { TypingController } from "./typing.js"; import { appendUntrustedContext } from "./untrusted-context.js"; @@ -234,14 +234,12 @@ export async function runPreparedReply( const isGroupChat = sessionCtx.ChatType === "group"; const wasMentioned = ctx.WasMentioned === true; const isHeartbeat = opts?.isHeartbeat === true; - const typingPolicy = - opts?.typingPolicy ?? - (isHeartbeat - ? "heartbeat" - : ctx.OriginatingChannel === INTERNAL_MESSAGE_CHANNEL - ? "internal_webchat" - : "auto"); - const suppressTyping = opts?.suppressTyping === true; + const { typingPolicy, suppressTyping } = resolveRunTypingPolicy({ + requestedPolicy: opts?.typingPolicy, + suppressTyping: opts?.suppressTyping === true, + isHeartbeat, + originatingChannel: ctx.OriginatingChannel, + }); const typingMode = resolveTypingMode({ configured: sessionCfg?.typingMode ?? agentCfg?.typingMode, isGroupChat, diff --git a/src/auto-reply/reply/typing-policy.test.ts b/src/auto-reply/reply/typing-policy.test.ts new file mode 100644 index 00000000000..62854e84cea --- /dev/null +++ b/src/auto-reply/reply/typing-policy.test.ts @@ -0,0 +1,61 @@ +import { describe, expect, it } from "vitest"; +import { resolveRunTypingPolicy } from "./typing-policy.js"; + +describe("resolveRunTypingPolicy", () => { + it("forces heartbeat policy for heartbeat runs", () => { + const resolved = resolveRunTypingPolicy({ + requestedPolicy: "user_message", + isHeartbeat: true, + }); + expect(resolved).toEqual({ + typingPolicy: "heartbeat", + suppressTyping: true, + }); + }); + + it("forces internal webchat policy", () => { + const resolved = resolveRunTypingPolicy({ + requestedPolicy: "user_message", + originatingChannel: "webchat", + }); + expect(resolved).toEqual({ + typingPolicy: "internal_webchat", + suppressTyping: true, + }); + }); + + it("forces system event policy for routed turns", () => { + const resolved = resolveRunTypingPolicy({ + requestedPolicy: "user_message", + systemEvent: true, + originatingChannel: "telegram", + }); + expect(resolved).toEqual({ + typingPolicy: "system_event", + suppressTyping: true, + }); + }); + + it("preserves requested policy for regular user turns", () => { + const resolved = resolveRunTypingPolicy({ + requestedPolicy: "user_message", + originatingChannel: "telegram", + }); + expect(resolved).toEqual({ + typingPolicy: "user_message", + suppressTyping: false, + }); + }); + + it("respects explicit suppressTyping", () => { + const resolved = resolveRunTypingPolicy({ + requestedPolicy: "auto", + originatingChannel: "telegram", + suppressTyping: true, + }); + expect(resolved).toEqual({ + typingPolicy: "auto", + suppressTyping: true, + }); + }); +}); diff --git a/src/auto-reply/reply/typing-policy.ts b/src/auto-reply/reply/typing-policy.ts new file mode 100644 index 00000000000..db1688f3030 --- /dev/null +++ b/src/auto-reply/reply/typing-policy.ts @@ -0,0 +1,35 @@ +import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js"; +import type { TypingPolicy } from "../types.js"; + +export type ResolveRunTypingPolicyParams = { + requestedPolicy?: TypingPolicy; + suppressTyping?: boolean; + isHeartbeat?: boolean; + originatingChannel?: string; + systemEvent?: boolean; +}; + +export type ResolvedRunTypingPolicy = { + typingPolicy: TypingPolicy; + suppressTyping: boolean; +}; + +export function resolveRunTypingPolicy( + params: ResolveRunTypingPolicyParams, +): ResolvedRunTypingPolicy { + const typingPolicy = params.isHeartbeat + ? "heartbeat" + : params.originatingChannel === INTERNAL_MESSAGE_CHANNEL + ? "internal_webchat" + : params.systemEvent + ? "system_event" + : (params.requestedPolicy ?? "auto"); + + const suppressTyping = + params.suppressTyping === true || + typingPolicy === "heartbeat" || + typingPolicy === "system_event" || + typingPolicy === "internal_webchat"; + + return { typingPolicy, suppressTyping }; +} diff --git a/src/auto-reply/reply/typing.ts b/src/auto-reply/reply/typing.ts index 43174024606..ea2cad42772 100644 --- a/src/auto-reply/reply/typing.ts +++ b/src/auto-reply/reply/typing.ts @@ -1,4 +1,5 @@ import { createTypingKeepaliveLoop } from "../../channels/typing-lifecycle.js"; +import { createTypingStartGuard } from "../../channels/typing-start-guard.js"; import { isSilentReplyPrefixText, isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; export type TypingController = { @@ -99,15 +100,16 @@ export function createTypingController(params: { const isActive = () => active && !sealed; + const startGuard = createTypingStartGuard({ + isSealed: () => sealed, + shouldBlock: () => runComplete, + rethrowOnError: true, + }); + const triggerTyping = async () => { - if (sealed) { - return; - } - // Late callbacks after a run completed should never restart typing. - if (runComplete) { - return; - } - await onReplyStart?.(); + await startGuard.run(async () => { + await onReplyStart?.(); + }); }; const typingLoop = createTypingKeepaliveLoop({ diff --git a/src/channels/typing-start-guard.test.ts b/src/channels/typing-start-guard.test.ts new file mode 100644 index 00000000000..b9104c19042 --- /dev/null +++ b/src/channels/typing-start-guard.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, it, vi } from "vitest"; +import { createTypingStartGuard } from "./typing-start-guard.js"; + +describe("createTypingStartGuard", () => { + it("skips starts when sealed", async () => { + const start = vi.fn(); + const guard = createTypingStartGuard({ + isSealed: () => true, + }); + + const result = await guard.run(start); + expect(result).toBe("skipped"); + expect(start).not.toHaveBeenCalled(); + }); + + it("trips breaker after max consecutive failures", async () => { + const onStartError = vi.fn(); + const onTrip = vi.fn(); + const guard = createTypingStartGuard({ + isSealed: () => false, + onStartError, + onTrip, + maxConsecutiveFailures: 2, + }); + const start = vi.fn().mockRejectedValue(new Error("fail")); + + const first = await guard.run(start); + const second = await guard.run(start); + const third = await guard.run(start); + + expect(first).toBe("failed"); + expect(second).toBe("tripped"); + expect(third).toBe("skipped"); + expect(onStartError).toHaveBeenCalledTimes(2); + expect(onTrip).toHaveBeenCalledTimes(1); + }); + + it("resets breaker state", async () => { + const guard = createTypingStartGuard({ + isSealed: () => false, + maxConsecutiveFailures: 1, + }); + const failStart = vi.fn().mockRejectedValue(new Error("fail")); + const okStart = vi.fn().mockResolvedValue(undefined); + + const trip = await guard.run(failStart); + expect(trip).toBe("tripped"); + expect(guard.isTripped()).toBe(true); + + guard.reset(); + const started = await guard.run(okStart); + expect(started).toBe("started"); + expect(guard.isTripped()).toBe(false); + }); + + it("rethrows start errors when configured", async () => { + const guard = createTypingStartGuard({ + isSealed: () => false, + rethrowOnError: true, + }); + const start = vi.fn().mockRejectedValue(new Error("boom")); + + await expect(guard.run(start)).rejects.toThrow("boom"); + }); +}); diff --git a/src/channels/typing-start-guard.ts b/src/channels/typing-start-guard.ts new file mode 100644 index 00000000000..06a345d412e --- /dev/null +++ b/src/channels/typing-start-guard.ts @@ -0,0 +1,63 @@ +export type TypingStartGuard = { + run: (start: () => Promise | void) => Promise<"started" | "skipped" | "failed" | "tripped">; + reset: () => void; + isTripped: () => boolean; +}; + +export function createTypingStartGuard(params: { + isSealed: () => boolean; + shouldBlock?: () => boolean; + onStartError?: (err: unknown) => void; + maxConsecutiveFailures?: number; + onTrip?: () => void; + rethrowOnError?: boolean; +}): TypingStartGuard { + const maxConsecutiveFailures = + typeof params.maxConsecutiveFailures === "number" && params.maxConsecutiveFailures > 0 + ? Math.floor(params.maxConsecutiveFailures) + : undefined; + let consecutiveFailures = 0; + let tripped = false; + + const isBlocked = () => { + if (params.isSealed()) { + return true; + } + if (tripped) { + return true; + } + return params.shouldBlock?.() === true; + }; + + const run: TypingStartGuard["run"] = async (start) => { + if (isBlocked()) { + return "skipped"; + } + try { + await start(); + consecutiveFailures = 0; + return "started"; + } catch (err) { + consecutiveFailures += 1; + params.onStartError?.(err); + if (params.rethrowOnError) { + throw err; + } + if (maxConsecutiveFailures && consecutiveFailures >= maxConsecutiveFailures) { + tripped = true; + params.onTrip?.(); + return "tripped"; + } + return "failed"; + } + }; + + return { + run, + reset: () => { + consecutiveFailures = 0; + tripped = false; + }, + isTripped: () => tripped, + }; +} diff --git a/src/channels/typing.ts b/src/channels/typing.ts index 125f252dd7d..5d2a5c2e100 100644 --- a/src/channels/typing.ts +++ b/src/channels/typing.ts @@ -1,4 +1,5 @@ import { createTypingKeepaliveLoop } from "./typing-lifecycle.js"; +import { createTypingStartGuard } from "./typing-start-guard.js"; export type TypingCallbacks = { onReplyStart: () => Promise; @@ -26,28 +27,19 @@ export function createTypingCallbacks(params: CreateTypingCallbacksParams): Typi const maxDurationMs = params.maxDurationMs ?? 60_000; // Default 60s TTL let stopSent = false; let closed = false; - let consecutiveFailures = 0; - let breakerTripped = false; let ttlTimer: ReturnType | undefined; + const startGuard = createTypingStartGuard({ + isSealed: () => closed, + onStartError: params.onStartError, + maxConsecutiveFailures, + onTrip: () => { + keepaliveLoop.stop(); + }, + }); + const fireStart = async (): Promise => { - if (closed) { - return; - } - if (breakerTripped) { - return; - } - try { - await params.start(); - consecutiveFailures = 0; - } catch (err) { - consecutiveFailures += 1; - params.onStartError(err); - if (consecutiveFailures >= maxConsecutiveFailures) { - breakerTripped = true; - keepaliveLoop.stop(); - } - } + await startGuard.run(() => params.start()); }; const keepaliveLoop = createTypingKeepaliveLoop({ @@ -81,12 +73,11 @@ export function createTypingCallbacks(params: CreateTypingCallbacksParams): Typi return; } stopSent = false; - breakerTripped = false; - consecutiveFailures = 0; + startGuard.reset(); keepaliveLoop.stop(); clearTtlTimer(); await fireStart(); - if (breakerTripped) { + if (startGuard.isTripped()) { return; } keepaliveLoop.start(); diff --git a/src/plugin-sdk/fetch-auth.test.ts b/src/plugin-sdk/fetch-auth.test.ts index cc401cc1a3d..abf4aac80c2 100644 --- a/src/plugin-sdk/fetch-auth.test.ts +++ b/src/plugin-sdk/fetch-auth.test.ts @@ -1,6 +1,8 @@ import { describe, expect, it, vi } from "vitest"; import { fetchWithBearerAuthScopeFallback } from "./fetch-auth.js"; +const asFetch = (fn: unknown): typeof fetch => fn as typeof fetch; + describe("fetchWithBearerAuthScopeFallback", () => { it("rejects non-https urls when https is required", async () => { await expect( @@ -19,7 +21,7 @@ describe("fetchWithBearerAuthScopeFallback", () => { const response = await fetchWithBearerAuthScopeFallback({ url: "https://example.com/file", scopes: ["https://graph.microsoft.com"], - fetchFn, + fetchFn: asFetch(fetchFn), tokenProvider, }); @@ -38,7 +40,7 @@ describe("fetchWithBearerAuthScopeFallback", () => { const response = await fetchWithBearerAuthScopeFallback({ url: "https://graph.microsoft.com/v1.0/me", scopes: ["https://graph.microsoft.com", "https://api.botframework.com"], - fetchFn, + fetchFn: asFetch(fetchFn), tokenProvider, }); @@ -57,7 +59,7 @@ describe("fetchWithBearerAuthScopeFallback", () => { const response = await fetchWithBearerAuthScopeFallback({ url: "https://example.com/file", scopes: ["https://graph.microsoft.com"], - fetchFn, + fetchFn: asFetch(fetchFn), tokenProvider, shouldAttachAuth: () => false, }); @@ -82,7 +84,7 @@ describe("fetchWithBearerAuthScopeFallback", () => { const response = await fetchWithBearerAuthScopeFallback({ url: "https://graph.microsoft.com/v1.0/me", scopes: ["https://first.example", "https://second.example"], - fetchFn, + fetchFn: asFetch(fetchFn), tokenProvider, }); diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index edfae611e7f..aa29294f7e3 100644 --- a/src/plugins/runtime/index.ts +++ b/src/plugins/runtime/index.ts @@ -17,6 +17,7 @@ import { shouldComputeCommandAuthorized, } from "../../auto-reply/command-detection.js"; import { shouldHandleTextCommands } from "../../auto-reply/commands-registry.js"; +import { withReplyDispatcher } from "../../auto-reply/dispatch.js"; import { formatAgentEnvelope, formatInboundEnvelope, @@ -304,6 +305,7 @@ function createRuntimeChannel(): PluginRuntime["channel"] { resolveEffectiveMessagesConfig, resolveHumanDelayConfig, dispatchReplyFromConfig, + withReplyDispatcher, finalizeInboundContext, formatAgentEnvelope, /** @deprecated Prefer `BodyForAgent` + structured user-context blocks (do not build plaintext envelopes for prompts). */ diff --git a/src/plugins/runtime/types.ts b/src/plugins/runtime/types.ts index 71b85d6f12a..0e2c20cf73f 100644 --- a/src/plugins/runtime/types.ts +++ b/src/plugins/runtime/types.ts @@ -55,6 +55,7 @@ type ShouldHandleTextCommands = typeof import("../../auto-reply/commands-registry.js").shouldHandleTextCommands; type DispatchReplyFromConfig = typeof import("../../auto-reply/reply/dispatch-from-config.js").dispatchReplyFromConfig; +type WithReplyDispatcher = typeof import("../../auto-reply/dispatch.js").withReplyDispatcher; type FinalizeInboundContext = typeof import("../../auto-reply/reply/inbound-context.js").finalizeInboundContext; type FormatAgentEnvelope = typeof import("../../auto-reply/envelope.js").formatAgentEnvelope; @@ -222,6 +223,7 @@ export type PluginRuntime = { resolveEffectiveMessagesConfig: ResolveEffectiveMessagesConfig; resolveHumanDelayConfig: ResolveHumanDelayConfig; dispatchReplyFromConfig: DispatchReplyFromConfig; + withReplyDispatcher: WithReplyDispatcher; finalizeInboundContext: FinalizeInboundContext; formatAgentEnvelope: FormatAgentEnvelope; /** @deprecated Prefer `BodyForAgent` + structured user-context blocks (do not build plaintext envelopes for prompts). */ diff --git a/src/telegram/lane-delivery.ts b/src/telegram/lane-delivery.ts index 80fbe180cc2..890a2a5ec97 100644 --- a/src/telegram/lane-delivery.ts +++ b/src/telegram/lane-delivery.ts @@ -111,8 +111,10 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { hadPreviewMessage: boolean; }): boolean => { const currentPreviewText = args.currentPreviewText; + if (currentPreviewText === undefined) { + return false; + } return ( - currentPreviewText !== undefined && currentPreviewText.startsWith(args.text) && args.text.length < currentPreviewText.length && (args.skipRegressive === "always" || args.hadPreviewMessage)