fix: contain block reply media failures

This commit is contained in:
Ayaan Zaidi
2026-03-07 10:28:32 +05:30
committed by Ayaan Zaidi
parent 77ef672468
commit 059aedeb08
5 changed files with 160 additions and 4 deletions

View File

@@ -0,0 +1,57 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
createSubscribedSessionHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
emitMessageStartAndEndForAssistantText,
} from "./pi-embedded-subscribe.e2e-harness.js";
const waitForAsyncCallbacks = async () => {
await Promise.resolve();
await new Promise((resolve) => setTimeout(resolve, 0));
};
describe("subscribeEmbeddedPiSession block reply rejections", () => {
const unhandledRejections: unknown[] = [];
const onUnhandledRejection = (reason: unknown) => {
unhandledRejections.push(reason);
};
afterEach(() => {
process.off("unhandledRejection", onUnhandledRejection);
unhandledRejections.length = 0;
});
it("contains rejected async text_end block replies", async () => {
process.on("unhandledRejection", onUnhandledRejection);
const onBlockReply = vi.fn().mockRejectedValue(new Error("boom"));
const { emit } = createSubscribedSessionHarness({
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
});
emitAssistantTextDelta({ emit, delta: "Hello block" });
emitAssistantTextEnd({ emit });
await waitForAsyncCallbacks();
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(unhandledRejections).toHaveLength(0);
});
it("contains rejected async message_end block replies", async () => {
process.on("unhandledRejection", onUnhandledRejection);
const onBlockReply = vi.fn().mockRejectedValue(new Error("boom"));
const { emit } = createSubscribedSessionHarness({
runId: "run",
onBlockReply,
blockReplyBreak: "message_end",
});
emitMessageStartAndEndForAssistantText({ emit, text: "Hello block" });
await waitForAsyncCallbacks();
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(unhandledRejections).toHaveLength(0);
});
});

View File

@@ -326,6 +326,16 @@ export function handleMessageEnd(
ctx.finalizeAssistantTexts({ text, addedDuringMessage, chunkerHasBuffered });
const onBlockReply = ctx.params.onBlockReply;
const emitBlockReplySafely = (payload: Parameters<NonNullable<typeof onBlockReply>>[0]) => {
if (!onBlockReply) {
return;
}
void Promise.resolve()
.then(() => onBlockReply(payload))
.catch((err) => {
ctx.log.warn(`block reply callback failed: ${String(err)}`);
});
};
const shouldEmitReasoning = Boolean(
ctx.state.includeReasoning &&
formattedReasoning &&
@@ -339,7 +349,7 @@ export function handleMessageEnd(
return;
}
ctx.state.lastReasoningSent = formattedReasoning;
void onBlockReply?.({ text: formattedReasoning, isReasoning: true });
emitBlockReplySafely({ text: formattedReasoning, isReasoning: true });
};
if (shouldEmitReasoningBeforeAnswer) {
@@ -362,7 +372,7 @@ export function handleMessageEnd(
} = splitResult;
// Emit if there's content OR audioAsVoice flag (to propagate the flag).
if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) {
void onBlockReply({
emitBlockReplySafely({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
audioAsVoice,

View File

@@ -100,6 +100,18 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const pendingMessagingTargets = state.pendingMessagingTargets;
const replyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const emitBlockReplySafely = (
payload: Parameters<NonNullable<SubscribeEmbeddedPiSessionParams["onBlockReply"]>>[0],
) => {
if (!params.onBlockReply) {
return;
}
void Promise.resolve()
.then(() => params.onBlockReply?.(payload))
.catch((err) => {
log.warn(`block reply callback failed: ${String(err)}`);
});
};
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
state.deltaBuffer = "";
@@ -510,7 +522,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) {
return;
}
void params.onBlockReply({
emitBlockReplySafely({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
audioAsVoice,

View File

@@ -32,6 +32,32 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads[0].mediaUrl).toBe("file:///tmp/photo.jpg");
});
it("normalizes sent media URLs before deduping normalized reply media", async () => {
const normalizeMediaPaths = async (payload: { mediaUrl?: string; mediaUrls?: string[] }) => {
const normalizeMedia = (value?: string) =>
value === "./out/photo.jpg" ? "/tmp/workspace/out/photo.jpg" : value;
return {
...payload,
mediaUrl: normalizeMedia(payload.mediaUrl),
mediaUrls: payload.mediaUrls?.map((value) => normalizeMedia(value) ?? value),
};
};
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello", mediaUrl: "./out/photo.jpg" }],
messagingToolSentMediaUrls: ["./out/photo.jpg"],
normalizeMediaPaths,
});
expect(replyPayloads).toHaveLength(1);
expect(replyPayloads[0]).toMatchObject({
text: "hello",
mediaUrl: undefined,
mediaUrls: undefined,
});
});
it("applies media filter after text filter", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,

View File

@@ -20,6 +20,51 @@ import {
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
async function normalizeSentMediaUrlsForDedupe(params: {
sentMediaUrls: string[];
normalizeMediaPaths?: (payload: ReplyPayload) => Promise<ReplyPayload>;
}): Promise<string[]> {
if (params.sentMediaUrls.length === 0 || !params.normalizeMediaPaths) {
return params.sentMediaUrls;
}
const normalizedUrls: string[] = [];
const seen = new Set<string>();
for (const raw of params.sentMediaUrls) {
const trimmed = raw.trim();
if (!trimmed) {
continue;
}
if (!seen.has(trimmed)) {
seen.add(trimmed);
normalizedUrls.push(trimmed);
}
try {
const normalized = await params.normalizeMediaPaths({
mediaUrl: trimmed,
mediaUrls: [trimmed],
});
const normalizedMediaUrls = normalized.mediaUrls?.length
? normalized.mediaUrls
: normalized.mediaUrl
? [normalized.mediaUrl]
: [];
for (const mediaUrl of normalizedMediaUrls) {
const candidate = mediaUrl.trim();
if (!candidate || seen.has(candidate)) {
continue;
}
seen.add(candidate);
normalizedUrls.push(candidate);
}
} catch (err) {
logVerbose(`messaging tool sent-media normalization failed: ${String(err)}`);
}
}
return normalizedUrls;
}
export async function buildReplyPayloads(params: {
payloads: ReplyPayload[];
isHeartbeat: boolean;
@@ -113,6 +158,12 @@ export async function buildReplyPayloads(params: {
// If target metadata is unavailable, keep legacy dedupe behavior.
const dedupeMessagingToolPayloads =
suppressMessagingToolReplies || messagingToolSentTargets.length === 0;
const messagingToolSentMediaUrls = dedupeMessagingToolPayloads
? await normalizeSentMediaUrlsForDedupe({
sentMediaUrls: params.messagingToolSentMediaUrls ?? [],
normalizeMediaPaths: params.normalizeMediaPaths,
})
: (params.messagingToolSentMediaUrls ?? []);
const dedupedPayloads = dedupeMessagingToolPayloads
? filterMessagingToolDuplicates({
payloads: replyTaggedPayloads,
@@ -122,7 +173,7 @@ export async function buildReplyPayloads(params: {
const mediaFilteredPayloads = dedupeMessagingToolPayloads
? filterMessagingToolMediaDuplicates({
payloads: dedupedPayloads,
sentMediaUrls: params.messagingToolSentMediaUrls ?? [],
sentMediaUrls: messagingToolSentMediaUrls,
})
: dedupedPayloads;
// Filter out payloads already sent via pipeline or directly during tool flush.