fix(telegram): track block media only after successful delivery

Move sentBlockMediaUrls recording from before sends to after successful
delivery completion. If a block send throws, the URL is not recorded,
so final fallback delivery retains the media attachment instead of
incorrectly deduplicating it.

Add regression tests for both success-path dedup and failure-path
media preservation.

Addresses ClawSweeper P2: block-failure → final-media-loss.
This commit is contained in:
Roger Deng
2026-05-08 19:53:42 +08:00
committed by Ayaan Zaidi
parent bc3fd5bf0f
commit 22e564da4b
2 changed files with 105 additions and 11 deletions

View File

@@ -1791,4 +1791,91 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(replies?.[0]?.text?.trim()).toBeTruthy();
expect(replies?.[0]?.text).not.toBe("NO_REPLY");
});
describe("non-streaming media dedup", () => {
const finalDeliveryPayload = () => {
for (const [params] of deliverInboundReplyWithMessageSendContext.mock.calls) {
if (params.info.kind === "final") {
return params.payload;
}
}
throw new Error("missing final delivery");
};
it("deduplicates block-sent media from final reply", async () => {
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: { messageIds: ["101"], visibleReplySent: true },
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ mediaUrls: ["/tmp/cat.jpg"] }, { kind: "block" });
await dispatcherOptions.deliver(
{ text: "Here is the image", mediaUrls: ["/tmp/cat.jpg"] },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(finalDeliveryPayload().mediaUrls).toEqual([]);
});
it("preserves final media when block delivery reports no visible send", async () => {
deliverReplies.mockResolvedValueOnce({ delivered: false });
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: { messageIds: ["101"], visibleReplySent: true },
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ mediaUrls: ["/tmp/cat.jpg"] }, { kind: "block" });
await dispatcherOptions.deliver(
{ text: "Here is the image", mediaUrls: ["/tmp/cat.jpg"] },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(finalDeliveryPayload().mediaUrls).toEqual(["/tmp/cat.jpg"]);
});
it("preserves final media when block delivery fails", async () => {
deliverReplies.mockRejectedValueOnce(new Error("Telegram API error"));
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: { messageIds: ["101"], visibleReplySent: true },
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
try {
await dispatcherOptions.deliver({ mediaUrls: ["/tmp/cat.jpg"] }, { kind: "block" });
} catch {}
await dispatcherOptions.deliver(
{ text: "Here is the image", mediaUrls: ["/tmp/cat.jpg"] },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(finalDeliveryPayload().mediaUrls).toEqual(["/tmp/cat.jpg"]);
});
});
});

View File

@@ -1186,15 +1186,6 @@ export const dispatchTelegramMessage = async ({
hadErrorReplyFailureOrSkip = true;
}
// Track media URLs from block replies so final replies
// can skip duplicates (non-streaming MEDIA: dedup).
if (info.kind === "block" && payload.mediaUrls?.length) {
for (const url of payload.mediaUrls) {
sentBlockMediaUrls.add(url);
}
}
// Filter out media already sent via block reply.
const deduped =
info.kind === "final"
? deduplicateBlockSentMedia(payload, sentBlockMediaUrls)
@@ -1251,6 +1242,7 @@ export const dispatchTelegramMessage = async ({
reasoningStepState.resetForNextStep();
};
let blockDelivered = false;
for (const segment of segments) {
if (
segment.lane === "answer" &&
@@ -1285,6 +1277,7 @@ export const dispatchTelegramMessage = async ({
if (info.kind === "final") {
emitPreviewFinalizedHook(result);
}
blockDelivered = blockDelivered || result.kind !== "skipped";
if (segment.lane === "reasoning") {
if (result.kind !== "skipped") {
reasoningStepState.noteReasoningDelivered();
@@ -1296,22 +1289,33 @@ export const dispatchTelegramMessage = async ({
reasoningStepState.resetForNextStep();
}
}
const trackBlockMedia = (delivered: boolean) => {
if (delivered && info.kind === "block" && payload.mediaUrls?.length) {
for (const url of payload.mediaUrls) {
sentBlockMediaUrls.add(url);
}
}
};
if (segments.length > 0) {
trackBlockMedia(blockDelivered);
return;
}
if (split.suppressedReasoningOnly) {
let delivered = false;
if (reply.hasMedia) {
const payloadWithoutSuppressedReasoning =
typeof effectivePayload.text === "string"
? { ...effectivePayload, text: "" }
: effectivePayload;
await sendPayload(payloadWithoutSuppressedReasoning, {
delivered = await sendPayload(payloadWithoutSuppressedReasoning, {
durable: info.kind === "final",
});
}
if (info.kind === "final") {
await flushBufferedFinalAnswer();
}
trackBlockMedia(delivered);
return;
}
@@ -1327,10 +1331,13 @@ export const dispatchTelegramMessage = async ({
}
return;
}
await sendPayload(effectivePayload, { durable: info.kind === "final" });
const delivered = await sendPayload(effectivePayload, {
durable: info.kind === "final",
});
if (info.kind === "final") {
await flushBufferedFinalAnswer();
}
trackBlockMedia(delivered);
},
onSkip: (payload, info) => {
if (payload.isError === true) {