diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index f5460098f9e..a6922d3b763 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -2290,7 +2290,7 @@ describe("dispatchTelegramMessage draft streaming", () => { ); await dispatcherOptions.deliver( setReplyPayloadMetadata({ text: "Repeated block." }, { assistantMessageIndex: 1 }), - { kind: "block", assistantMessageIndex: 1 } as { kind: "block" }, + { kind: "block", assistantMessageIndex: 1 }, ); return { queuedFinal: true }; }, @@ -2740,7 +2740,7 @@ describe("dispatchTelegramMessage draft streaming", () => { { mediaUrls: ["https://example.test/site-a.png"] }, { assistantMessageIndex: 0 }, ), - { kind: "block", assistantMessageIndex: 0 } as { kind: "block" }, + { kind: "block", assistantMessageIndex: 0 }, ); await replyOptions?.onPartialReply?.({ text: "Site B partial" }); return { queuedFinal: true }; diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 3a2410ece2e..f8e178a8a9d 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -1266,10 +1266,7 @@ export const dispatchTelegramMessage = async ({ recomputeQueuedAnswerBlockRotations(); return shouldRotateBeforeDelivery; }; - const dropQueuedAnswerBlockRotation = ( - payload: ReplyPayload, - assistantMessageIndex?: number, - ) => { + const dropQueuedAnswerBlockRotation = (payload: ReplyPayload, assistantMessageIndex?: number) => { let matchIndex = queuedAnswerBlockRotations.findIndex((entry) => queuedAnswerBlockRotationMatchesDelivery(entry, payload, assistantMessageIndex), ); @@ -1293,10 +1290,6 @@ export const dispatchTelegramMessage = async ({ recomputeQueuedAnswerBlockRotations(); } }; - const getReplyDispatchAssistantMessageIndex = (info: object): number | undefined => { - const value = (info as { assistantMessageIndex?: unknown }).assistantMessageIndex; - return typeof value === "number" ? value : undefined; - }; const updateDraftFromPartial = (lane: DraftLaneState, update: DraftPartialTextUpdate) => { const laneStream = lane.stream; if (!laneStream || !update.text) { @@ -1763,10 +1756,7 @@ export const dispatchTelegramMessage = async ({ if (!text?.trim()) { return false; } - // A block skipped by the duplicate-draft dedup was never rendered to its - // own draft update. Force the full delivery path (not the no-op finalize - // fast path) so the preserved intermediate block is materialized as a - // visible draft before the lane rotates for the next message. + // Skipped duplicate blocks must materialize before the next draft takes over. const wasSkippedDuplicate = skippedDuplicateAnswerBlockDraftDelivery; skippedDuplicateAnswerBlockDraftDelivery = false; const deliveredText = answerLane.stream.lastDeliveredText?.(); @@ -1901,10 +1891,7 @@ export const dispatchTelegramMessage = async ({ onBeforeDeliverCancelled: (payload, info) => { if (info.kind === "block") { return enqueueDraftLaneEvent(async () => { - dropQueuedAnswerBlockRotation( - payload, - getReplyDispatchAssistantMessageIndex(info), - ); + dropQueuedAnswerBlockRotation(payload, info.assistantMessageIndex); }); } return undefined; @@ -2021,10 +2008,7 @@ export const dispatchTelegramMessage = async ({ let blockDelivered = false; const hasAnswerSegment = segments.some((segment) => segment.lane === "answer"); if (info.kind === "block" && !hasAnswerSegment) { - dropQueuedAnswerBlockRotation( - effectivePayload, - getReplyDispatchAssistantMessageIndex(info), - ); + dropQueuedAnswerBlockRotation(effectivePayload, info.assistantMessageIndex); } for (const segment of segments) { if ( @@ -2068,14 +2052,14 @@ export const dispatchTelegramMessage = async ({ await prepareAnswerLaneForToolProgress(); } - const ownedByQueuedAnswerBlockRotation = - queuedAnswerBlockRotations.some((entry) => + const ownedByQueuedAnswerBlockRotation = queuedAnswerBlockRotations.some( + (entry) => queuedAnswerBlockRotationMatchesDelivery( entry, effectivePayload, - getReplyDispatchAssistantMessageIndex(info), + info.assistantMessageIndex, ), - ); + ); const skipTextOnlyBlock = streamMode === "partial" && @@ -2090,13 +2074,7 @@ export const dispatchTelegramMessage = async ({ segment.update.text.trimEnd() === answerLane.lastPartialText.trimEnd(); if (skipTextOnlyBlock) { - // Defer the duplicate block: do not emit a redundant draft - // update now. Record it so that if a later rotation (tool - // progress / next assistant message) follows, the skipped - // block is materialized first instead of being lost, and so - // that the dispatch-end finalize can commit it when nothing - // else follows. Re-enable progress-draft state so a - // following tool-progress step can still rotate the lane. + // Keep duplicate blocks available for later rotation/finalization. skippedDuplicateAnswerBlockDraftDelivery = true; lastAnswerBlockPayload = effectivePayload; lastAnswerBlockText = segment.update.text; @@ -2111,7 +2089,7 @@ export const dispatchTelegramMessage = async ({ const preparedAnswerLane = await prepareAnswerLaneForText(); const shouldRotateQueuedBlock = takeQueuedAnswerBlockRotation( effectivePayload, - getReplyDispatchAssistantMessageIndex(info), + info.assistantMessageIndex, ); if (shouldRotateQueuedBlock && !preparedAnswerLane) { await rotateAnswerLaneForNewMessage(); @@ -2220,10 +2198,7 @@ export const dispatchTelegramMessage = async ({ onSkip: (payload, info) => { if (info.kind === "block") { void enqueueDraftLaneEvent(async () => { - dropQueuedAnswerBlockRotation( - payload, - getReplyDispatchAssistantMessageIndex(info), - ); + dropQueuedAnswerBlockRotation(payload, info.assistantMessageIndex); }); } if (payload.isError === true) { diff --git a/src/auto-reply/reply/before-deliver.test.ts b/src/auto-reply/reply/before-deliver.test.ts index d18cd726922..86db874769a 100644 --- a/src/auto-reply/reply/before-deliver.test.ts +++ b/src/auto-reply/reply/before-deliver.test.ts @@ -79,16 +79,14 @@ describe("beforeDeliver in reply dispatcher", () => { }, onBeforeDeliverCancelled: (payload, info) => { cancelled.push({ - assistantMessageIndex: (info as { assistantMessageIndex?: number }) - .assistantMessageIndex, + assistantMessageIndex: info.assistantMessageIndex, kind: info.kind, text: payload.text ?? "", }); }, onError: (err, info) => { errors.push({ - assistantMessageIndex: (info as { assistantMessageIndex?: number }) - .assistantMessageIndex, + assistantMessageIndex: info.assistantMessageIndex, kind: info.kind, message: err instanceof Error ? err.message : String(err), }); @@ -105,9 +103,7 @@ describe("beforeDeliver in reply dispatcher", () => { await dispatcher.waitForIdle(); expect(delivered).toEqual([]); - expect(cancelled).toEqual([ - { assistantMessageIndex: 9, kind: "block", text: "blocked block" }, - ]); + expect(cancelled).toEqual([{ assistantMessageIndex: 9, kind: "block", text: "blocked block" }]); expect(errors).toEqual([ { assistantMessageIndex: 9, kind: "block", message: "pre-delivery failed" }, ]); @@ -145,9 +141,7 @@ describe("beforeDeliver in reply dispatcher", () => { const dispatcher = createReplyDispatcher({ deliver: async (payload, info) => { deliveredMetadata = getReplyPayloadMetadata(payload); - deliveredAssistantMessageIndex = ( - info as { assistantMessageIndex?: unknown } - ).assistantMessageIndex; + deliveredAssistantMessageIndex = info.assistantMessageIndex; }, beforeDeliver: async () => ({ text: "rewritten" }), }); diff --git a/src/auto-reply/reply/block-reply-coalescer.ts b/src/auto-reply/reply/block-reply-coalescer.ts index 992676ab9ce..030ab5516cf 100644 --- a/src/auto-reply/reply/block-reply-coalescer.ts +++ b/src/auto-reply/reply/block-reply-coalescer.ts @@ -96,10 +96,7 @@ export function createBlockReplyCoalescer(params: { isFallbackNotice: bufferIsFallbackNotice, isStatusNotice: bufferIsStatusNotice, }; - const metadataSource = bufferMetadataSource; - const payloadWithMetadata = metadataSource - ? copyReplyPayloadMetadata(metadataSource, payload) - : payload; + const payloadWithMetadata = copyReplyPayloadMetadata(bufferMetadataSource ?? payload, payload); resetBuffer(); await onFlush(payloadWithMetadata); }; @@ -127,9 +124,10 @@ export function createBlockReplyCoalescer(params: { text: mergedText, replyToId: payload.replyToId ?? bufferReplyToId, }; - const metadataMergedPayload = bufferMetadataSource - ? copyReplyPayloadMetadata(bufferMetadataSource, mergedPayload) - : mergedPayload; + const metadataMergedPayload = copyReplyPayloadMetadata( + bufferMetadataSource ?? mergedPayload, + mergedPayload, + ); resetBuffer(); return copyReplyPayloadMetadata(payload, metadataMergedPayload); }; diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index 2859b1e9784..d03358d2d53 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -14,6 +14,7 @@ import { normalizeReplyPayload, type NormalizeReplySkipReason } from "./normaliz import type { ReplyDispatchBeforeDeliver, ReplyDispatchKind, + ReplyDispatchRuntimeInfo, ReplyDispatcher, } from "./reply-dispatcher.types.js"; import type { ResponsePrefixContext } from "./response-prefix-template.js"; @@ -23,22 +24,22 @@ export type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.type type ReplyDispatchErrorHandler = ( err: unknown, - info: { kind: ReplyDispatchKind }, + info: ReplyDispatchRuntimeInfo, ) => Promise | void; type ReplyDispatchSkipHandler = ( payload: ReplyPayload, - info: { kind: ReplyDispatchKind; reason: NormalizeReplySkipReason }, + info: ReplyDispatchRuntimeInfo & { reason: NormalizeReplySkipReason }, ) => void; type ReplyDispatchCancelHandler = ( payload: ReplyPayload, - info: { kind: ReplyDispatchKind }, + info: ReplyDispatchRuntimeInfo, ) => Promise | void; type ReplyDispatchDeliverer = ( payload: ReplyPayload, - info: { kind: ReplyDispatchKind }, + info: ReplyDispatchRuntimeInfo, ) => Promise; export type { ReplyDispatchBeforeDeliver }; @@ -47,8 +48,6 @@ const DEFAULT_HUMAN_DELAY_MIN_MS = 800; const DEFAULT_HUMAN_DELAY_MAX_MS = 2500; const silentReplyLogger = createSubsystemLogger("silent-reply/dispatcher"); -type ReplyDispatchRuntimeInfo = { kind: ReplyDispatchKind; assistantMessageIndex?: number }; - function buildReplyDispatchRuntimeInfo( payload: ReplyPayload, kind: ReplyDispatchKind, diff --git a/src/auto-reply/reply/reply-dispatcher.types.ts b/src/auto-reply/reply/reply-dispatcher.types.ts index 7b772bc6100..b870997913e 100644 --- a/src/auto-reply/reply/reply-dispatcher.types.ts +++ b/src/auto-reply/reply/reply-dispatcher.types.ts @@ -3,9 +3,14 @@ import type { ReplyPayload } from "../types.js"; export type ReplyDispatchKind = "tool" | "block" | "final"; +export type ReplyDispatchRuntimeInfo = { + kind: ReplyDispatchKind; + assistantMessageIndex?: number; +}; + export type ReplyDispatchBeforeDeliver = ( payload: ReplyPayload, - info: { kind: ReplyDispatchKind }, + info: ReplyDispatchRuntimeInfo, ) => Promise | ReplyPayload | null; export type ReplyDispatcher = {