refactor(telegram): distill streamed block rotation cleanup

This commit is contained in:
Ayaan Zaidi
2026-06-08 09:09:46 +00:00
parent 4f31967141
commit 5b0061e7a2
6 changed files with 33 additions and 62 deletions

View File

@@ -2290,7 +2290,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
await dispatcherOptions.deliver(
setReplyPayloadMetadata({ text: "Repeated block." }, { assistantMessageIndex: 1 }),
{ kind: "block", assistantMessageIndex: 1 } as { kind: "block" },
{ kind: "block", assistantMessageIndex: 1 },
);
return { queuedFinal: true };
},
@@ -2740,7 +2740,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
{ mediaUrls: ["https://example.test/site-a.png"] },
{ assistantMessageIndex: 0 },
),
{ kind: "block", assistantMessageIndex: 0 } as { kind: "block" },
{ kind: "block", assistantMessageIndex: 0 },
);
await replyOptions?.onPartialReply?.({ text: "Site B partial" });
return { queuedFinal: true };

View File

@@ -1266,10 +1266,7 @@ export const dispatchTelegramMessage = async ({
recomputeQueuedAnswerBlockRotations();
return shouldRotateBeforeDelivery;
};
const dropQueuedAnswerBlockRotation = (
payload: ReplyPayload,
assistantMessageIndex?: number,
) => {
const dropQueuedAnswerBlockRotation = (payload: ReplyPayload, assistantMessageIndex?: number) => {
let matchIndex = queuedAnswerBlockRotations.findIndex((entry) =>
queuedAnswerBlockRotationMatchesDelivery(entry, payload, assistantMessageIndex),
);
@@ -1293,10 +1290,6 @@ export const dispatchTelegramMessage = async ({
recomputeQueuedAnswerBlockRotations();
}
};
const getReplyDispatchAssistantMessageIndex = (info: object): number | undefined => {
const value = (info as { assistantMessageIndex?: unknown }).assistantMessageIndex;
return typeof value === "number" ? value : undefined;
};
const updateDraftFromPartial = (lane: DraftLaneState, update: DraftPartialTextUpdate) => {
const laneStream = lane.stream;
if (!laneStream || !update.text) {
@@ -1763,10 +1756,7 @@ export const dispatchTelegramMessage = async ({
if (!text?.trim()) {
return false;
}
// A block skipped by the duplicate-draft dedup was never rendered to its
// own draft update. Force the full delivery path (not the no-op finalize
// fast path) so the preserved intermediate block is materialized as a
// visible draft before the lane rotates for the next message.
// Skipped duplicate blocks must materialize before the next draft takes over.
const wasSkippedDuplicate = skippedDuplicateAnswerBlockDraftDelivery;
skippedDuplicateAnswerBlockDraftDelivery = false;
const deliveredText = answerLane.stream.lastDeliveredText?.();
@@ -1901,10 +1891,7 @@ export const dispatchTelegramMessage = async ({
onBeforeDeliverCancelled: (payload, info) => {
if (info.kind === "block") {
return enqueueDraftLaneEvent(async () => {
dropQueuedAnswerBlockRotation(
payload,
getReplyDispatchAssistantMessageIndex(info),
);
dropQueuedAnswerBlockRotation(payload, info.assistantMessageIndex);
});
}
return undefined;
@@ -2021,10 +2008,7 @@ export const dispatchTelegramMessage = async ({
let blockDelivered = false;
const hasAnswerSegment = segments.some((segment) => segment.lane === "answer");
if (info.kind === "block" && !hasAnswerSegment) {
dropQueuedAnswerBlockRotation(
effectivePayload,
getReplyDispatchAssistantMessageIndex(info),
);
dropQueuedAnswerBlockRotation(effectivePayload, info.assistantMessageIndex);
}
for (const segment of segments) {
if (
@@ -2068,14 +2052,14 @@ export const dispatchTelegramMessage = async ({
await prepareAnswerLaneForToolProgress();
}
const ownedByQueuedAnswerBlockRotation =
queuedAnswerBlockRotations.some((entry) =>
const ownedByQueuedAnswerBlockRotation = queuedAnswerBlockRotations.some(
(entry) =>
queuedAnswerBlockRotationMatchesDelivery(
entry,
effectivePayload,
getReplyDispatchAssistantMessageIndex(info),
info.assistantMessageIndex,
),
);
);
const skipTextOnlyBlock =
streamMode === "partial" &&
@@ -2090,13 +2074,7 @@ export const dispatchTelegramMessage = async ({
segment.update.text.trimEnd() === answerLane.lastPartialText.trimEnd();
if (skipTextOnlyBlock) {
// Defer the duplicate block: do not emit a redundant draft
// update now. Record it so that if a later rotation (tool
// progress / next assistant message) follows, the skipped
// block is materialized first instead of being lost, and so
// that the dispatch-end finalize can commit it when nothing
// else follows. Re-enable progress-draft state so a
// following tool-progress step can still rotate the lane.
// Keep duplicate blocks available for later rotation/finalization.
skippedDuplicateAnswerBlockDraftDelivery = true;
lastAnswerBlockPayload = effectivePayload;
lastAnswerBlockText = segment.update.text;
@@ -2111,7 +2089,7 @@ export const dispatchTelegramMessage = async ({
const preparedAnswerLane = await prepareAnswerLaneForText();
const shouldRotateQueuedBlock = takeQueuedAnswerBlockRotation(
effectivePayload,
getReplyDispatchAssistantMessageIndex(info),
info.assistantMessageIndex,
);
if (shouldRotateQueuedBlock && !preparedAnswerLane) {
await rotateAnswerLaneForNewMessage();
@@ -2220,10 +2198,7 @@ export const dispatchTelegramMessage = async ({
onSkip: (payload, info) => {
if (info.kind === "block") {
void enqueueDraftLaneEvent(async () => {
dropQueuedAnswerBlockRotation(
payload,
getReplyDispatchAssistantMessageIndex(info),
);
dropQueuedAnswerBlockRotation(payload, info.assistantMessageIndex);
});
}
if (payload.isError === true) {

View File

@@ -79,16 +79,14 @@ describe("beforeDeliver in reply dispatcher", () => {
},
onBeforeDeliverCancelled: (payload, info) => {
cancelled.push({
assistantMessageIndex: (info as { assistantMessageIndex?: number })
.assistantMessageIndex,
assistantMessageIndex: info.assistantMessageIndex,
kind: info.kind,
text: payload.text ?? "",
});
},
onError: (err, info) => {
errors.push({
assistantMessageIndex: (info as { assistantMessageIndex?: number })
.assistantMessageIndex,
assistantMessageIndex: info.assistantMessageIndex,
kind: info.kind,
message: err instanceof Error ? err.message : String(err),
});
@@ -105,9 +103,7 @@ describe("beforeDeliver in reply dispatcher", () => {
await dispatcher.waitForIdle();
expect(delivered).toEqual([]);
expect(cancelled).toEqual([
{ assistantMessageIndex: 9, kind: "block", text: "blocked block" },
]);
expect(cancelled).toEqual([{ assistantMessageIndex: 9, kind: "block", text: "blocked block" }]);
expect(errors).toEqual([
{ assistantMessageIndex: 9, kind: "block", message: "pre-delivery failed" },
]);
@@ -145,9 +141,7 @@ describe("beforeDeliver in reply dispatcher", () => {
const dispatcher = createReplyDispatcher({
deliver: async (payload, info) => {
deliveredMetadata = getReplyPayloadMetadata(payload);
deliveredAssistantMessageIndex = (
info as { assistantMessageIndex?: unknown }
).assistantMessageIndex;
deliveredAssistantMessageIndex = info.assistantMessageIndex;
},
beforeDeliver: async () => ({ text: "rewritten" }),
});

View File

@@ -96,10 +96,7 @@ export function createBlockReplyCoalescer(params: {
isFallbackNotice: bufferIsFallbackNotice,
isStatusNotice: bufferIsStatusNotice,
};
const metadataSource = bufferMetadataSource;
const payloadWithMetadata = metadataSource
? copyReplyPayloadMetadata(metadataSource, payload)
: payload;
const payloadWithMetadata = copyReplyPayloadMetadata(bufferMetadataSource ?? payload, payload);
resetBuffer();
await onFlush(payloadWithMetadata);
};
@@ -127,9 +124,10 @@ export function createBlockReplyCoalescer(params: {
text: mergedText,
replyToId: payload.replyToId ?? bufferReplyToId,
};
const metadataMergedPayload = bufferMetadataSource
? copyReplyPayloadMetadata(bufferMetadataSource, mergedPayload)
: mergedPayload;
const metadataMergedPayload = copyReplyPayloadMetadata(
bufferMetadataSource ?? mergedPayload,
mergedPayload,
);
resetBuffer();
return copyReplyPayloadMetadata(payload, metadataMergedPayload);
};

View File

@@ -14,6 +14,7 @@ import { normalizeReplyPayload, type NormalizeReplySkipReason } from "./normaliz
import type {
ReplyDispatchBeforeDeliver,
ReplyDispatchKind,
ReplyDispatchRuntimeInfo,
ReplyDispatcher,
} from "./reply-dispatcher.types.js";
import type { ResponsePrefixContext } from "./response-prefix-template.js";
@@ -23,22 +24,22 @@ export type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.type
type ReplyDispatchErrorHandler = (
err: unknown,
info: { kind: ReplyDispatchKind },
info: ReplyDispatchRuntimeInfo,
) => Promise<void> | void;
type ReplyDispatchSkipHandler = (
payload: ReplyPayload,
info: { kind: ReplyDispatchKind; reason: NormalizeReplySkipReason },
info: ReplyDispatchRuntimeInfo & { reason: NormalizeReplySkipReason },
) => void;
type ReplyDispatchCancelHandler = (
payload: ReplyPayload,
info: { kind: ReplyDispatchKind },
info: ReplyDispatchRuntimeInfo,
) => Promise<void> | void;
type ReplyDispatchDeliverer = (
payload: ReplyPayload,
info: { kind: ReplyDispatchKind },
info: ReplyDispatchRuntimeInfo,
) => Promise<unknown>;
export type { ReplyDispatchBeforeDeliver };
@@ -47,8 +48,6 @@ const DEFAULT_HUMAN_DELAY_MIN_MS = 800;
const DEFAULT_HUMAN_DELAY_MAX_MS = 2500;
const silentReplyLogger = createSubsystemLogger("silent-reply/dispatcher");
type ReplyDispatchRuntimeInfo = { kind: ReplyDispatchKind; assistantMessageIndex?: number };
function buildReplyDispatchRuntimeInfo(
payload: ReplyPayload,
kind: ReplyDispatchKind,

View File

@@ -3,9 +3,14 @@ import type { ReplyPayload } from "../types.js";
export type ReplyDispatchKind = "tool" | "block" | "final";
export type ReplyDispatchRuntimeInfo = {
kind: ReplyDispatchKind;
assistantMessageIndex?: number;
};
export type ReplyDispatchBeforeDeliver = (
payload: ReplyPayload,
info: { kind: ReplyDispatchKind },
info: ReplyDispatchRuntimeInfo,
) => Promise<ReplyPayload | null> | ReplyPayload | null;
export type ReplyDispatcher = {