From cd37dbd4e53546d51def23da562e6c2933c3952d Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 30 May 2026 13:38:50 +0200 Subject: [PATCH] refactor: share block reply coalescer enqueue --- src/auto-reply/reply/block-reply-pipeline.ts | 65 +++++++++----------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/src/auto-reply/reply/block-reply-pipeline.ts b/src/auto-reply/reply/block-reply-pipeline.ts index c70cd8a40bb..354d0969594 100644 --- a/src/auto-reply/reply/block-reply-pipeline.ts +++ b/src/auto-reply/reply/block-reply-pipeline.ts @@ -235,6 +235,32 @@ export function createBlockReplyPipeline(params: { bufferedPayloadKeys.clear(); }; + const enqueueCoalescedPayload = (payload: ReplyPayload) => { + if (!coalescer) { + return; + } + const assistantMessageIndex = getReplyPayloadMetadata(payload)?.assistantMessageIndex; + if ( + assistantMessageIndex !== undefined && + bufferedAssistantMessageIndex !== undefined && + assistantMessageIndex !== bufferedAssistantMessageIndex && + coalescer.hasBuffered() + ) { + // Logical assistant blocks must not be merged together by the generic + // coalescer. Force-flush the previous buffered block before starting a + // new assistant-message block. + flushBufferedAssistantBlock(); + } + const payloadKey = createBlockReplyPayloadKey(payload); + if (hasSeenOrQueuedPayloadKey(payloadKey) || bufferedKeys.has(payloadKey)) { + return; + } + seenKeys.add(payloadKey); + bufferedKeys.add(payloadKey); + bufferedAssistantMessageIndex = assistantMessageIndex; + coalescer.enqueue(payload); + }; + const enqueue = (payload: ReplyPayload) => { if (aborted) { return; @@ -248,23 +274,7 @@ export function createBlockReplyPipeline(params: { { trimText: true }, ); if (reply.hasMedia && coalescer && !hasNonTextContent) { - const assistantMessageIndex = getReplyPayloadMetadata(payload)?.assistantMessageIndex; - if ( - assistantMessageIndex !== undefined && - bufferedAssistantMessageIndex !== undefined && - assistantMessageIndex !== bufferedAssistantMessageIndex && - coalescer.hasBuffered() - ) { - flushBufferedAssistantBlock(); - } - const payloadKey = createBlockReplyPayloadKey(payload); - if (hasSeenOrQueuedPayloadKey(payloadKey) || bufferedKeys.has(payloadKey)) { - return; - } - seenKeys.add(payloadKey); - bufferedKeys.add(payloadKey); - bufferedAssistantMessageIndex = assistantMessageIndex; - coalescer.enqueue(payload); + enqueueCoalescedPayload(payload); return; } if (reply.hasMedia || hasNonTextContent) { @@ -273,26 +283,7 @@ export function createBlockReplyPipeline(params: { return; } if (coalescer) { - const assistantMessageIndex = getReplyPayloadMetadata(payload)?.assistantMessageIndex; - if ( - assistantMessageIndex !== undefined && - bufferedAssistantMessageIndex !== undefined && - assistantMessageIndex !== bufferedAssistantMessageIndex && - coalescer.hasBuffered() - ) { - // Logical assistant blocks must not be merged together by the generic - // coalescer. Force-flush the previous buffered block before starting a - // new assistant-message block. - flushBufferedAssistantBlock(); - } - const payloadKey = createBlockReplyPayloadKey(payload); - if (hasSeenOrQueuedPayloadKey(payloadKey) || bufferedKeys.has(payloadKey)) { - return; - } - seenKeys.add(payloadKey); - bufferedKeys.add(payloadKey); - bufferedAssistantMessageIndex = assistantMessageIndex; - coalescer.enqueue(payload); + enqueueCoalescedPayload(payload); return; } sendPayload(payload, /* bypassSeenCheck */ false);