From bd91107fc6c7757c8b080cf6c79fcdf509fd3862 Mon Sep 17 00:00:00 2001 From: Cavit Erginsoy Date: Fri, 22 May 2026 22:40:22 +0100 Subject: [PATCH] Fix foreground reply fence visibility --- .../src/auto-reply/deliver-reply.test.ts | 24 ++ .../whatsapp/src/auto-reply/deliver-reply.ts | 39 ++- .../monitor/inbound-dispatch.test.ts | 248 +++++++++---- .../auto-reply/monitor/inbound-dispatch.ts | 161 ++++++--- src/auto-reply/dispatch.freshness.test.ts | 330 +++++++++++++++++- src/auto-reply/dispatch.ts | 180 +++++++++- src/auto-reply/reply/reply-dispatcher.ts | 1 + src/channels/turn/durable-delivery.test.ts | 3 +- src/channels/turn/durable-delivery.ts | 23 +- src/channels/turn/kernel.test.ts | 72 ++++ src/channels/turn/kernel.ts | 55 ++- src/plugin-sdk/inbound-reply-dispatch.test.ts | 53 +++ src/plugin-sdk/inbound-reply-dispatch.ts | 4 +- 13 files changed, 1052 insertions(+), 141 deletions(-) diff --git a/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts b/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts index 2017b9ac811..2af52c32c17 100644 --- a/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts +++ b/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts @@ -466,6 +466,30 @@ describe("deliverWebReply", () => { expect(logVerbose).toHaveBeenCalled(); }); + it("marks errors visible after accepted media delivery", async () => { + const msg = makeMsg(); + const error = new Error("tail send failed"); + mockLoadedImageMedia(); + vi.mocked(msg.reply).mockRejectedValue(error); + + await expect( + deliverWebReply({ + replyResult: { text: "captiontail", mediaUrl: "http://example.com/img.jpg" }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 7, + replyLogger, + skipLog: true, + }), + ).rejects.toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + + expect(msg.sendMedia).toHaveBeenCalledTimes(1); + expect(msg.reply).toHaveBeenCalled(); + }); + it("preserves leading indentation after trimming only leading blank lines", async () => { const msg = makeMsg(); diff --git a/extensions/whatsapp/src/auto-reply/deliver-reply.ts b/extensions/whatsapp/src/auto-reply/deliver-reply.ts index 3ea716d7601..02503b8dfdb 100644 --- a/extensions/whatsapp/src/auto-reply/deliver-reply.ts +++ b/extensions/whatsapp/src/auto-reply/deliver-reply.ts @@ -83,6 +83,20 @@ function createWhatsAppReplyDeliveryReceipt( }); } +function markWhatsAppVisibleDeliveryError(error: unknown): unknown { + if (typeof error === "object" && error !== null && !Array.isArray(error)) { + try { + Object.assign(error, { sentBeforeError: true, visibleReplySent: true }); + return error; + } catch { + // Fall back to a wrapper when a platform error object is non-extensible. + } + } + const visibleError = new Error("visible WhatsApp reply delivery failed", { cause: error }); + Object.assign(visibleError, { sentBeforeError: true, visibleReplySent: true }); + return visibleError; +} + export async function deliverWebReply(params: { replyResult: ReplyPayload; normalizedReplyResult?: DeliverableWhatsAppOutboundPayload; @@ -150,15 +164,22 @@ export async function deliverWebReply(params: { }; const sendWithRetry = async (fn: () => Promise, label: string, maxAttempts = 3) => { - return await sendWhatsAppOutboundWithRetry({ - send: fn, - maxAttempts, - onRetry: ({ attempt, maxAttempts: retryMaxAttempts, backoffMs, errorText }) => { - logVerbose( - `Retrying ${label} to ${msg.from} after failure (${attempt}/${retryMaxAttempts - 1}) in ${backoffMs}ms: ${errorText}`, - ); - }, - }); + try { + return await sendWhatsAppOutboundWithRetry({ + send: fn, + maxAttempts, + onRetry: ({ attempt, maxAttempts: retryMaxAttempts, backoffMs, errorText }) => { + logVerbose( + `Retrying ${label} to ${msg.from} after failure (${attempt}/${retryMaxAttempts - 1}) in ${backoffMs}ms: ${errorText}`, + ); + }, + }); + } catch (error: unknown) { + if (sendResults.some((result) => result.providerAccepted)) { + throw markWhatsAppVisibleDeliveryError(error); + } + throw error; + } }; // Text-only replies diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts index 0a831b64616..c419667212b 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts @@ -12,11 +12,27 @@ type CapturedReplyPayload = { mediaUrls?: string[]; }; +type CapturedDispatchParams = { + ctx?: unknown; + dispatcherOptions?: { + deliver?: ( + payload: CapturedReplyPayload, + info: { kind: "tool" | "block" | "final" }, + ) => Promise; + onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void; + onSettled?: () => Promise; + }; + replyOptions?: { + disableBlockStreaming?: boolean; + sourceReplyDeliveryMode?: "automatic" | "message_tool_only"; + }; +}; + const { dispatchReplyWithBufferedBlockDispatcherMock, deliverInboundReplyWithMessageSendContextMock, } = vi.hoisted(() => ({ - dispatchReplyWithBufferedBlockDispatcherMock: vi.fn(async (params: { ctx: unknown }) => { + dispatchReplyWithBufferedBlockDispatcherMock: vi.fn(async (params: CapturedDispatchParams) => { capturedDispatchParams = params; return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }; }), @@ -168,37 +184,19 @@ function makeMsg(overrides: Partial = {}): TestMsg { } function getCapturedDeliver() { - return ( - capturedDispatchParams as { - dispatcherOptions?: { - deliver?: ( - payload: CapturedReplyPayload, - info: { kind: "tool" | "block" | "final" }, - ) => Promise; - }; - } - )?.dispatcherOptions?.deliver; + return (capturedDispatchParams as CapturedDispatchParams)?.dispatcherOptions?.deliver; } function getCapturedOnError() { - return ( - capturedDispatchParams as { - dispatcherOptions?: { - onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void; - }; - } - )?.dispatcherOptions?.onError; + return (capturedDispatchParams as CapturedDispatchParams)?.dispatcherOptions?.onError; +} + +function getCapturedOnSettled() { + return (capturedDispatchParams as CapturedDispatchParams)?.dispatcherOptions?.onSettled; } function getCapturedReplyOptions() { - return ( - capturedDispatchParams as { - replyOptions?: { - disableBlockStreaming?: boolean; - sourceReplyDeliveryMode?: "automatic" | "message_tool_only"; - }; - } - )?.replyOptions; + return (capturedDispatchParams as CapturedDispatchParams)?.replyOptions; } function requireRecord(value: unknown, label: string): Record { @@ -597,12 +595,14 @@ describe("whatsapp inbound dispatch", () => { expect(deliverReply).not.toHaveBeenCalled(); expect(rememberSentText).not.toHaveBeenCalled(); - await deliver?.( - { text: "tool image", mediaUrls: ["/tmp/generated.jpg"] }, - { - kind: "tool", - }, - ); + await expect( + deliver?.( + { text: "tool image", mediaUrls: ["/tmp/generated.jpg"] }, + { + kind: "tool", + }, + ), + ).resolves.toMatchObject({ visibleReplySent: false }); expect(deliverReply).not.toHaveBeenCalled(); expect(rememberSentText).not.toHaveBeenCalled(); @@ -694,7 +694,9 @@ describe("whatsapp inbound dispatch", () => { }); const deliver = getCapturedDeliver(); - await deliver?.({ text: "cancelled by hook" }, { kind: "final" }); + expect(await deliver?.({ text: "cancelled by hook" }, { kind: "final" })).toMatchObject({ + visibleReplySent: false, + }); const durableParams = requireMockArg( deliverInboundReplyWithMessageSendContextMock, @@ -713,6 +715,155 @@ describe("whatsapp inbound dispatch", () => { expect(rememberSentText).not.toHaveBeenCalled(); }); + it("reports deferred media visible only after an accepted flush", async () => { + deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({ + status: "handled_no_send", + reason: "no_visible_result", + delivery: { + messageIds: [], + visibleReplySent: false, + }, + }); + const deliverReply = vi.fn(async () => acceptedDeliveryResult()); + + await dispatchBufferedReply({ + deliverReply, + }); + + const deliver = getCapturedDeliver(); + await expect( + deliver?.({ text: "tool image", mediaUrls: ["/tmp/generated.jpg"] }, { kind: "tool" }), + ).resolves.toMatchObject({ visibleReplySent: false }); + await expect(deliver?.({ text: "cancelled final" }, { kind: "final" })).resolves.toMatchObject({ + visibleReplySent: true, + }); + expect(deliverReply).toHaveBeenCalledTimes(1); + }); + + it("flushes deferred media through the settled delivery hook", async () => { + const deliverReply = vi.fn(async () => acceptedDeliveryResult()); + const rememberSentText = vi.fn(); + let settledResult: unknown; + dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce( + async (params: CapturedDispatchParams) => { + capturedDispatchParams = params; + const deliver = params.dispatcherOptions?.deliver; + if (!deliver) { + throw new Error("expected captured deliver callback"); + } + const onSettled = params.dispatcherOptions?.onSettled; + const deferred = await deliver( + { text: "tool image", mediaUrls: ["/tmp/generated.jpg"] }, + { kind: "tool" }, + ); + expect(deferred).toMatchObject({ visibleReplySent: false }); + settledResult = await onSettled?.(); + return { + queuedFinal: false, + counts: { tool: 1, block: 0, final: 0 }, + }; + }, + ); + + await expect( + dispatchBufferedReply({ + deliverReply, + rememberSentText, + }), + ).resolves.toBe(true); + + expect(settledResult).toMatchObject({ visibleReplySent: true }); + expect(getCapturedOnSettled()).toBeTypeOf("function"); + expect(deliverReply).toHaveBeenCalledTimes(1); + expectRememberSentContextFields(rememberSentText, undefined, { + combinedBody: "hi", + combinedBodySessionKey: "agent:main:whatsapp:direct:+1000", + }); + }); + + it("marks deferred media flush failures visible after an earlier accepted flush", async () => { + const error = new Error("second deferred media failed"); + const deliverReply = vi + .fn() + .mockResolvedValueOnce(acceptedDeliveryResult()) + .mockRejectedValueOnce(error); + dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce( + async (params: CapturedDispatchParams) => { + capturedDispatchParams = params; + const deliver = params.dispatcherOptions?.deliver; + if (!deliver) { + throw new Error("expected captured deliver callback"); + } + const onSettled = params.dispatcherOptions?.onSettled; + await deliver({ text: "first image", mediaUrls: ["/tmp/first.jpg"] }, { kind: "tool" }); + await deliver({ text: "second image", mediaUrls: ["/tmp/second.jpg"] }, { kind: "tool" }); + await onSettled?.(); + return { + queuedFinal: false, + counts: { tool: 2, block: 0, final: 0 }, + }; + }, + ); + + await expect(dispatchBufferedReply({ deliverReply })).rejects.toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + expect(error).toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + expect(deliverReply).toHaveBeenCalledTimes(2); + }); + + it("marks downstream failures visible after deferred media flushes", async () => { + const error = new Error("durable text failed"); + deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({ + status: "failed", + error, + }); + const deliverReply = vi.fn(async () => acceptedDeliveryResult()); + + await dispatchBufferedReply({ + deliverReply, + }); + + const deliver = getCapturedDeliver(); + await expect( + deliver?.({ text: "tool image", mediaUrls: ["/tmp/generated.jpg"] }, { kind: "tool" }), + ).resolves.toMatchObject({ visibleReplySent: false }); + await expect(deliver?.({ text: "final text" }, { kind: "final" })).rejects.toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + expect(error).toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + expect(deliverReply).toHaveBeenCalledTimes(1); + }); + + it("marks durable partial send failures as visible before rethrowing", async () => { + const error = new Error("second chunk failed"); + deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({ + status: "failed", + error, + sentBeforeError: true, + }); + const deliverReply = vi.fn(async () => acceptedDeliveryResult()); + + await dispatchBufferedReply({ + deliverReply, + }); + + const deliver = getCapturedDeliver(); + await expect(deliver?.({ text: "partial final" }, { kind: "final" })).rejects.toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + expect(deliverReply).not.toHaveBeenCalled(); + }); + it("keeps media replies on the WhatsApp owner delivery path", async () => { deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({ status: "handled_visible", @@ -920,15 +1071,7 @@ describe("whatsapp inbound dispatch", () => { const deliverReply = vi.fn(async () => acceptedDeliveryResult()); const rememberSentText = vi.fn(); dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce( - async (params: { - ctx: unknown; - dispatcherOptions?: { - deliver?: ( - payload: { text?: string }, - info: { kind: "tool" | "block" | "final" }, - ) => Promise; - }; - }) => { + async (params: CapturedDispatchParams) => { capturedDispatchParams = params; await params.dispatcherOptions?.deliver?.({ text: "partial block" }, { kind: "block" }); return { queuedFinal: false, counts: { tool: 0, block: 1, final: 0 } }; @@ -956,15 +1099,7 @@ describe("whatsapp inbound dispatch", () => { debug: vi.fn(), } as unknown as BufferedReplyParams["replyLogger"]; dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce( - async (params: { - ctx: unknown; - dispatcherOptions?: { - deliver?: ( - payload: { text?: string }, - info: { kind: "tool" | "block" | "final" }, - ) => Promise; - }; - }) => { + async (params: CapturedDispatchParams) => { capturedDispatchParams = params; await params.dispatcherOptions?.deliver?.({ text: "final text" }, { kind: "final" }); return { queuedFinal: false, counts: { tool: 0, block: 0, final: 1 } }; @@ -994,20 +1129,13 @@ describe("whatsapp inbound dispatch", () => { const deliverReply = vi.fn(async () => acceptedDeliveryResult()); const rememberSentText = vi.fn(); dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce( - async (params: { - ctx: unknown; - dispatcherOptions?: { - deliver?: ( - payload: CapturedReplyPayload, - info: { kind: "tool" | "block" | "final" }, - ) => Promise; - }; - }) => { + async (params: CapturedDispatchParams) => { capturedDispatchParams = params; await params.dispatcherOptions?.deliver?.( { text: "tool image", mediaUrls: ["/tmp/generated.jpg"] }, { kind: "tool" }, ); + await params.dispatcherOptions?.onSettled?.(); return { queuedFinal: false, counts: { tool: 1, block: 0, final: 0 } }; }, ); diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts index d72c424275f..ae8af884004 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts @@ -87,6 +87,43 @@ function normalizeErrForLog(err: unknown): unknown { return err; } +type WhatsAppReplyDeliveryVisibility = { + visibleReplySent: boolean; +}; + +function whatsAppReplyDeliveryVisibility( + visibleReplySent: boolean, +): WhatsAppReplyDeliveryVisibility { + return { visibleReplySent }; +} + +function whatsAppReplyDeliveryVisibilityFromDurableResult(result: { + visibleReplySent?: boolean; +}): WhatsAppReplyDeliveryVisibility { + return whatsAppReplyDeliveryVisibility(result.visibleReplySent === true); +} + +function markWhatsAppReplyDeliveryErrorVisible(error: unknown): unknown { + if (typeof error === "object" && error !== null && !Array.isArray(error)) { + try { + Object.assign(error, { sentBeforeError: true, visibleReplySent: true }); + return error; + } catch { + // Fall back to a wrapper when a platform error object is non-extensible. + } + } + const visibleError = new Error("visible WhatsApp reply delivery failed", { cause: error }); + Object.assign(visibleError, { sentBeforeError: true, visibleReplySent: true }); + return visibleError; +} + +function markWhatsAppReplyDeliveryErrorVisibleAfterFlush( + error: unknown, + flushResult: WhatsAppMediaOnlyFlushResult, +): unknown { + return flushResult.delivered > 0 ? markWhatsAppReplyDeliveryErrorVisible(error) : error; +} + function logWhatsAppReplyDeliveryError(params: { err: unknown; info: ReplyDeliveryInfo; @@ -170,7 +207,7 @@ function shouldDeferWhatsAppMediaOnlyPayload(params: { } function createWhatsAppMediaOnlyReplyCoalescer(params: { - deliver: (pending: PendingWhatsAppMediaOnlyPayload) => Promise; + deliver: (pending: PendingWhatsAppMediaOnlyPayload) => Promise; }) { const pendingMediaOnlyPayloads: PendingWhatsAppMediaOnlyPayload[] = []; const flushExceptDuplicateMedia = async ( @@ -186,8 +223,14 @@ function createWhatsAppMediaOnlyReplyCoalescer(params: { flushResult.droppedDuplicateMedia += 1; continue; } - await params.deliver(candidate); - flushResult.delivered += 1; + try { + const delivery = await params.deliver(candidate); + if (delivery.visibleReplySent) { + flushResult.delivered += 1; + } + } catch (error: unknown) { + throw markWhatsAppReplyDeliveryErrorVisibleAfterFlush(error, flushResult); + } } return flushResult; }; @@ -511,10 +554,10 @@ export async function dispatchWhatsAppBufferedReply(params: { const deliverNormalizedPayload = async ( normalizedDeliveryPayload: DeliverableWhatsAppOutboundPayload, info: ReplyDeliveryInfo, - ) => { + ): Promise => { const reply = resolveSendableOutboundReplyParts(normalizedDeliveryPayload); if (!reply.hasMedia && !reply.text.trim()) { - return; + return whatsAppReplyDeliveryVisibility(false); } const delivery = await params.deliverReply({ replyResult: normalizedDeliveryPayload, @@ -542,7 +585,7 @@ export async function dispatchWhatsAppBufferedReply(params: { }, "auto-reply was not accepted by WhatsApp provider", ); - return; + return whatsAppReplyDeliveryVisibility(false); } didSendReply = true; const shouldLog = normalizedDeliveryPayload.text ? true : undefined; @@ -557,11 +600,12 @@ export async function dispatchWhatsAppBufferedReply(params: { const preview = normalizedDeliveryPayload.text != null ? reply.text : ""; logVerbose(`Reply body: ${preview}${reply.hasMedia ? " (media)" : ""} -> ${fromDisplay}`); } + return whatsAppReplyDeliveryVisibility(true); }; const mediaOnlyCoalescer = createWhatsAppMediaOnlyReplyCoalescer({ deliver: async (pending) => { - await deliverNormalizedPayload(pending.payload, pending.info); + return await deliverNormalizedPayload(pending.payload, pending.info); }, }); @@ -584,7 +628,7 @@ export async function dispatchWhatsAppBufferedReply(params: { deliver: async (payload: ReplyPayload, info: { kind: ReplyLifecycleKind }) => { const deliveryPayload = resolveWhatsAppDeliverablePayload(payload, info); if (!deliveryPayload) { - return; + return whatsAppReplyDeliveryVisibility(false); } const normalizedOutboundPayload = normalizeWhatsAppOutboundPayload(deliveryPayload, { normalizeText: normalizeWhatsAppPayloadTextPreservingIndentation, @@ -595,43 +639,55 @@ export async function dispatchWhatsAppBufferedReply(params: { : normalizedOutboundPayload; const reply = resolveSendableOutboundReplyParts(normalizedDeliveryPayload); if (!reply.hasMedia && !reply.text.trim()) { - return; + return whatsAppReplyDeliveryVisibility(false); } if (!reply.hasMedia) { - logWhatsAppMediaOnlyFlushResult(await mediaOnlyCoalescer.flushAll()); - const durable = await deliverInboundReplyWithMessageSendContext({ - cfg: params.cfg, - channel: "whatsapp", - accountId: params.route.accountId, - agentId: params.route.agentId, - ctxPayload: params.context as FinalizedMsgContext, - payload: normalizedDeliveryPayload, - info, - to: params.msg.from, - formatting: { - textLimit, - tableMode, - chunkMode, - }, - }); - if (durable.status === "failed") { - throw durable.error; - } - if (durable.status === "handled_visible") { - didSendReply = true; - const shouldLog = normalizedDeliveryPayload.text ? true : undefined; - params.rememberSentText(normalizedDeliveryPayload.text, { - combinedBody: params.context.Body as string | undefined, - combinedBodySessionKey: params.route.sessionKey, - logVerboseMessage: shouldLog, + const flushResult = await mediaOnlyCoalescer.flushAll(); + logWhatsAppMediaOnlyFlushResult(flushResult); + try { + const durable = await deliverInboundReplyWithMessageSendContext({ + cfg: params.cfg, + channel: "whatsapp", + accountId: params.route.accountId, + agentId: params.route.agentId, + ctxPayload: params.context as FinalizedMsgContext, + payload: normalizedDeliveryPayload, + info, + to: params.msg.from, + formatting: { + textLimit, + tableMode, + chunkMode, + }, }); - return; + if (durable.status === "failed") { + if (durable.sentBeforeError === true) { + throw markWhatsAppReplyDeliveryErrorVisible(durable.error); + } + throw durable.error; + } + if (durable.status === "handled_visible") { + didSendReply = true; + const shouldLog = normalizedDeliveryPayload.text ? true : undefined; + params.rememberSentText(normalizedDeliveryPayload.text, { + combinedBody: params.context.Body as string | undefined, + combinedBodySessionKey: params.route.sessionKey, + logVerboseMessage: shouldLog, + }); + return whatsAppReplyDeliveryVisibilityFromDurableResult(durable.delivery); + } + if (durable.status === "handled_no_send") { + return flushResult.delivered > 0 + ? whatsAppReplyDeliveryVisibility(true) + : whatsAppReplyDeliveryVisibilityFromDurableResult(durable.delivery); + } + const delivery = await deliverNormalizedPayload(normalizedDeliveryPayload, info); + return flushResult.delivered > 0 && !delivery.visibleReplySent + ? whatsAppReplyDeliveryVisibility(true) + : delivery; + } catch (error: unknown) { + throw markWhatsAppReplyDeliveryErrorVisibleAfterFlush(error, flushResult); } - if (durable.status === "handled_no_send") { - return; - } - await deliverNormalizedPayload(normalizedDeliveryPayload, info); - return; } const mediaUrls = getWhatsAppPayloadMediaUrls(normalizedDeliveryPayload); if (shouldDeferWhatsAppMediaOnlyPayload({ info, mediaUrls, reply })) { @@ -640,12 +696,23 @@ export async function dispatchWhatsAppBufferedReply(params: { mediaUrls, payload: normalizedDeliveryPayload, }); - return; + return whatsAppReplyDeliveryVisibility(false); } - logWhatsAppMediaOnlyFlushResult( - await mediaOnlyCoalescer.flushExceptDuplicateMedia(mediaUrls), - ); - await deliverNormalizedPayload(normalizedDeliveryPayload, info); + const flushResult = await mediaOnlyCoalescer.flushExceptDuplicateMedia(mediaUrls); + logWhatsAppMediaOnlyFlushResult(flushResult); + try { + const delivery = await deliverNormalizedPayload(normalizedDeliveryPayload, info); + return flushResult.delivered > 0 && !delivery.visibleReplySent + ? whatsAppReplyDeliveryVisibility(true) + : delivery; + } catch (error: unknown) { + throw markWhatsAppReplyDeliveryErrorVisibleAfterFlush(error, flushResult); + } + }, + onSettled: async () => { + const flushResult = await mediaOnlyCoalescer.flushAll(); + logWhatsAppMediaOnlyFlushResult(flushResult); + return whatsAppReplyDeliveryVisibility(flushResult.delivered > 0); }, onReplyStart: params.msg.sendComposing, ...(statusReactionController @@ -686,8 +753,6 @@ export async function dispatchWhatsAppBufferedReply(params: { : {}), }, }); - logWhatsAppMediaOnlyFlushResult(await mediaOnlyCoalescer.flushAll()); - const didQueueVisibleReply = hasVisibleInboundReplyDispatch({ queuedFinal, counts }); if (!didQueueVisibleReply) { if (statusReactionController) { diff --git a/src/auto-reply/dispatch.freshness.test.ts b/src/auto-reply/dispatch.freshness.test.ts index 85a9ab6931c..41a2682ca4f 100644 --- a/src/auto-reply/dispatch.freshness.test.ts +++ b/src/auto-reply/dispatch.freshness.test.ts @@ -1,5 +1,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { OutboundDeliveryError } from "../infra/outbound/deliver-types.js"; import { resetGlobalHookRunner } from "../plugins/hook-runner-global.js"; import type { ReplyDispatchBeforeDeliver } from "./reply/reply-dispatcher.js"; import { buildTestCtx } from "./reply/test-ctx.js"; @@ -59,16 +60,22 @@ function buildForegroundCtx(overrides: Partial = {}): FinalizedMsgCo function dispatchWithDeliveries( ctx: FinalizedMsgContext, deliveries: Delivery[], - dispatcherOptions: { beforeDeliver?: ReplyDispatchBeforeDeliver } = {}, + dispatcherOptions: { + beforeDeliver?: ReplyDispatchBeforeDeliver; + deliver?: (payload: ReplyPayload, info: { kind: Delivery["kind"] }) => Promise; + onSettled?: () => unknown; + } = {}, ) { return dispatchInboundMessageWithBufferedDispatcher({ ctx, cfg: {} as OpenClawConfig, dispatcherOptions: { ...dispatcherOptions, - deliver: async (payload: ReplyPayload, info: { kind: Delivery["kind"] }) => { - deliveries.push({ kind: info.kind, text: payload.text }); - }, + deliver: + dispatcherOptions.deliver ?? + (async (payload: ReplyPayload, info: { kind: Delivery["kind"] }) => { + deliveries.push({ kind: info.kind, text: payload.text }); + }), }, }); } @@ -129,7 +136,7 @@ describe("foreground reply freshness", () => { expect(deliveries).toEqual([{ kind: "final", text: "new final" }]); }); - it("suppresses an older foreground final when a newer inbound starts while beforeDeliver is pending", async () => { + it("keeps an older foreground final when a newer inbound has no visible delivery while beforeDeliver is pending", async () => { const deliveries: Delivery[] = []; const beforeDeliverStarted = createDeferred(); const releaseBeforeDeliver = createDeferred(); @@ -174,6 +181,294 @@ describe("foreground reply freshness", () => { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 }, }); + expect(olderResult).toEqual({ + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }); + expect(deliveries).toEqual([{ kind: "final", text: "old rewritten final" }]); + }); + + it("keeps an older foreground final fenced while a newer visible delivery is unresolved", async () => { + const deliveries: Delivery[] = []; + const beforeDeliverStarted = createDeferred(); + const releaseBeforeDeliver = createDeferred(); + const newerDeliverStarted = createDeferred(); + const releaseNewerDeliver = createDeferred(); + const beforeDeliver = vi.fn(() => { + beforeDeliverStarted.resolve(); + return releaseBeforeDeliver.promise; + }); + + hoisted.dispatchReplyFromConfigMock.mockImplementation( + async (params: DispatchReplyFromConfigParams) => { + if (params.ctx.MessageSid === "old-message") { + params.dispatcher.sendFinalReply({ text: "old final" }); + return queuedFinalResult(); + } + if (params.ctx.MessageSid === "new-message") { + params.dispatcher.sendFinalReply({ text: "new final" }); + return queuedFinalResult(); + } + throw new Error(`unexpected test message ${params.ctx.MessageSid ?? ""}`); + }, + ); + + const olderDispatch = dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "old-message" }), + deliveries, + { beforeDeliver }, + ); + await beforeDeliverStarted.promise; + + const newerDispatch = dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "new-message" }), + deliveries, + { + deliver: async (payload, info) => { + newerDeliverStarted.resolve(); + await releaseNewerDeliver.promise; + deliveries.push({ kind: info.kind, text: payload.text }); + }, + }, + ); + await newerDeliverStarted.promise; + + releaseBeforeDeliver.resolve({ text: "old rewritten final" }); + await Promise.resolve(); + expect(deliveries).toEqual([]); + + releaseNewerDeliver.resolve(); + const newerResult = await newerDispatch; + const olderResult = await olderDispatch; + + expect(beforeDeliver).toHaveBeenCalledTimes(1); + expect(newerResult).toEqual({ + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }); + expect(olderResult).toEqual({ + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }); + expect(deliveries).toEqual([{ kind: "final", text: "new final" }]); + }); + + it("keeps an older foreground final when a newer visible delivery fails", async () => { + const deliveries: Delivery[] = []; + const beforeDeliverStarted = createDeferred(); + const releaseBeforeDeliver = createDeferred(); + const beforeDeliver = vi.fn(() => { + beforeDeliverStarted.resolve(); + return releaseBeforeDeliver.promise; + }); + + hoisted.dispatchReplyFromConfigMock.mockImplementation( + async (params: DispatchReplyFromConfigParams) => { + if (params.ctx.MessageSid === "old-message") { + params.dispatcher.sendFinalReply({ text: "old final" }); + return queuedFinalResult(); + } + if (params.ctx.MessageSid === "new-message") { + params.dispatcher.sendFinalReply({ text: "new final" }); + return queuedFinalResult(); + } + throw new Error(`unexpected test message ${params.ctx.MessageSid ?? ""}`); + }, + ); + + const olderDispatch = dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "old-message" }), + deliveries, + { beforeDeliver }, + ); + await beforeDeliverStarted.promise; + + const newerResult = await dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "new-message" }), + deliveries, + { + deliver: async () => { + throw new Error("delivery failed"); + }, + }, + ); + + releaseBeforeDeliver.resolve({ text: "old rewritten final" }); + const olderResult = await olderDispatch; + + expect(beforeDeliver).toHaveBeenCalledTimes(1); + expect(newerResult).toEqual({ + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + failedCounts: { tool: 0, block: 0, final: 1 }, + }); + expect(olderResult).toEqual({ + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }); + expect(deliveries).toEqual([{ kind: "final", text: "old rewritten final" }]); + }); + + it("suppresses an older foreground final when a newer delivery partially sends before failing", async () => { + const deliveries: Delivery[] = []; + const beforeDeliverStarted = createDeferred(); + const releaseBeforeDeliver = createDeferred(); + const beforeDeliver = vi.fn(() => { + beforeDeliverStarted.resolve(); + return releaseBeforeDeliver.promise; + }); + + hoisted.dispatchReplyFromConfigMock.mockImplementation( + async (params: DispatchReplyFromConfigParams) => { + if (params.ctx.MessageSid === "old-message") { + params.dispatcher.sendFinalReply({ text: "old final" }); + return queuedFinalResult(); + } + if (params.ctx.MessageSid === "new-message") { + params.dispatcher.sendFinalReply({ text: "new final" }); + return queuedFinalResult(); + } + throw new Error(`unexpected test message ${params.ctx.MessageSid ?? ""}`); + }, + ); + + const olderDispatch = dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "old-message" }), + deliveries, + { beforeDeliver }, + ); + await beforeDeliverStarted.promise; + + const newerResult = await dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "new-message" }), + deliveries, + { + deliver: async (payload, info) => { + deliveries.push({ kind: info.kind, text: payload.text }); + throw new OutboundDeliveryError("second chunk failed", { + cause: new Error("second chunk failed"), + results: [{ channel: "whatsapp", messageId: "wa-1" }], + }); + }, + }, + ); + + releaseBeforeDeliver.resolve({ text: "old rewritten final" }); + const olderResult = await olderDispatch; + + expect(beforeDeliver).toHaveBeenCalledTimes(1); + expect(newerResult).toEqual({ + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + failedCounts: { tool: 0, block: 0, final: 1 }, + }); + expect(olderResult).toEqual({ + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }); + expect(deliveries).toEqual([{ kind: "final", text: "new final" }]); + }); + + it("keeps an older foreground final when a newer adapter reports non-visible delivery", async () => { + const deliveries: Delivery[] = []; + const beforeDeliverStarted = createDeferred(); + const releaseBeforeDeliver = createDeferred(); + const beforeDeliver = vi.fn(() => { + beforeDeliverStarted.resolve(); + return releaseBeforeDeliver.promise; + }); + + hoisted.dispatchReplyFromConfigMock.mockImplementation( + async (params: DispatchReplyFromConfigParams) => { + if (params.ctx.MessageSid === "old-message") { + params.dispatcher.sendFinalReply({ text: "old final" }); + return queuedFinalResult(); + } + if (params.ctx.MessageSid === "new-message") { + params.dispatcher.sendFinalReply({ text: "new final" }); + return queuedFinalResult(); + } + throw new Error(`unexpected test message ${params.ctx.MessageSid ?? ""}`); + }, + ); + + const olderDispatch = dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "old-message" }), + deliveries, + { beforeDeliver }, + ); + await beforeDeliverStarted.promise; + + const newerResult = await dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "new-message" }), + deliveries, + { + deliver: async () => ({ visibleReplySent: false }), + }, + ); + + releaseBeforeDeliver.resolve({ text: "old rewritten final" }); + const olderResult = await olderDispatch; + + expect(beforeDeliver).toHaveBeenCalledTimes(1); + expect(newerResult).toEqual({ + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }); + expect(olderResult).toEqual({ + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }); + expect(deliveries).toEqual([{ kind: "final", text: "old rewritten final" }]); + }); + + it("suppresses an older foreground final when a newer settled hook reports visible delivery", async () => { + const deliveries: Delivery[] = []; + const beforeDeliverStarted = createDeferred(); + const releaseBeforeDeliver = createDeferred(); + const beforeDeliver = vi.fn(() => { + beforeDeliverStarted.resolve(); + return releaseBeforeDeliver.promise; + }); + + hoisted.dispatchReplyFromConfigMock.mockImplementation( + async (params: DispatchReplyFromConfigParams) => { + if (params.ctx.MessageSid === "old-message") { + params.dispatcher.sendFinalReply({ text: "old final" }); + return queuedFinalResult(); + } + if (params.ctx.MessageSid === "new-message") { + params.dispatcher.sendFinalReply({ text: "new final" }); + return queuedFinalResult(); + } + throw new Error(`unexpected test message ${params.ctx.MessageSid ?? ""}`); + }, + ); + + const olderDispatch = dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "old-message" }), + deliveries, + { beforeDeliver }, + ); + await beforeDeliverStarted.promise; + + const newerResult = await dispatchWithDeliveries( + buildForegroundCtx({ MessageSid: "new-message" }), + deliveries, + { + deliver: async () => ({ visibleReplySent: false }), + onSettled: async () => ({ visibleReplySent: true }), + }, + ); + + releaseBeforeDeliver.resolve({ text: "old rewritten final" }); + const olderResult = await olderDispatch; + + expect(beforeDeliver).toHaveBeenCalledTimes(1); + expect(newerResult).toEqual({ + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }); expect(olderResult).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 }, @@ -181,6 +476,31 @@ describe("foreground reply freshness", () => { expect(deliveries).toEqual([]); }); + it("runs the settled delivery hook when dispatch fails after queueing a reply", async () => { + const deliveries: Delivery[] = []; + let settled = false; + const error = new Error("resolver failed"); + + hoisted.dispatchReplyFromConfigMock.mockImplementation( + async (params: DispatchReplyFromConfigParams) => { + params.dispatcher.sendFinalReply({ text: "queued final" }); + throw error; + }, + ); + + await expect( + dispatchWithDeliveries(buildForegroundCtx(), deliveries, { + deliver: async () => ({ visibleReplySent: false }), + onSettled: () => { + settled = true; + return { visibleReplySent: true }; + }, + }), + ).rejects.toBe(error); + + expect(settled).toBe(true); + }); + it("keeps concurrent foreground finals isolated for different targets sharing a session", async () => { const deliveries: Delivery[] = []; const firstStarted = createDeferred(); diff --git a/src/auto-reply/dispatch.ts b/src/auto-reply/dispatch.ts index c4bdc28547e..1fdb1eef830 100644 --- a/src/auto-reply/dispatch.ts +++ b/src/auto-reply/dispatch.ts @@ -9,7 +9,9 @@ import { measureDiagnosticsTimelineSpan, measureDiagnosticsTimelineSpanSync, } from "../infra/diagnostics-timeline.js"; +import { isOutboundDeliveryError } from "../infra/outbound/deliver-types.js"; import { logMessageReceived } from "../logging/diagnostic.js"; +import { hasOutboundReplyContent } from "../plugin-sdk/reply-payload.js"; import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import type { SilentReplyConversationType } from "../shared/silent-reply-policy.js"; import { @@ -34,7 +36,10 @@ import type { GetReplyOptions, ReplyPayload } from "./types.js"; type ForegroundReplyFenceState = { generation: number; + visibleDeliveryGeneration: number; activeDispatches: number; + activeGenerations: Map; + waiters: Set<() => void>; }; type ForegroundReplyFenceSnapshot = { @@ -87,10 +92,17 @@ function beginForegroundReplyFence( } const state = foregroundReplyFenceByKey.get(key) ?? { generation: 0, + visibleDeliveryGeneration: 0, activeDispatches: 0, + activeGenerations: new Map(), + waiters: new Set<() => void>(), }; state.generation += 1; state.activeDispatches += 1; + state.activeGenerations.set( + state.generation, + (state.activeGenerations.get(state.generation) ?? 0) + 1, + ); foregroundReplyFenceByKey.set(key, state); return { key, @@ -98,21 +110,142 @@ function beginForegroundReplyFence( }; } -function isForegroundReplyFenceSuperseded( - snapshot: ForegroundReplyFenceSnapshot | undefined, +function notifyForegroundReplyFenceWaiters(state: ForegroundReplyFenceState): void { + const waiters = [...state.waiters]; + state.waiters.clear(); + for (const resolve of waiters) { + resolve(); + } +} + +function hasNewerActiveForegroundReplyFenceGeneration( + state: ForegroundReplyFenceState, + generation: number, ): boolean { - return Boolean( - snapshot && - (foregroundReplyFenceByKey.get(snapshot.key)?.generation ?? 0) !== snapshot.generation, + for (const [activeGeneration, count] of state.activeGenerations) { + if (activeGeneration > generation && count > 0) { + return true; + } + } + return false; +} + +async function shouldCancelForegroundReplyDelivery( + snapshot: ForegroundReplyFenceSnapshot | undefined, +): Promise { + if (!snapshot) { + return false; + } + while (true) { + const state = foregroundReplyFenceByKey.get(snapshot.key); + if (!state) { + return false; + } + if (state.visibleDeliveryGeneration > snapshot.generation) { + return true; + } + if (!hasNewerActiveForegroundReplyFenceGeneration(state, snapshot.generation)) { + return false; + } + await new Promise((resolve) => { + state.waiters.add(resolve); + }); + } +} + +function markForegroundReplyFenceVisibleDelivery( + snapshot: ForegroundReplyFenceSnapshot | undefined, + payload: ReplyPayload, + deliveryResult: unknown, +): void { + if (!snapshot || !hasOutboundReplyContent(payload, { trimText: true })) { + return; + } + if (isExplicitlyNonVisibleDelivery(deliveryResult)) { + return; + } + markForegroundReplyFenceVisibleDeliveryGeneration(snapshot); +} + +function markForegroundReplyFenceVisibleDeliveryGeneration( + snapshot: ForegroundReplyFenceSnapshot | undefined, +): void { + if (!snapshot) { + return; + } + const state = foregroundReplyFenceByKey.get(snapshot.key); + if (!state) { + return; + } + state.visibleDeliveryGeneration = Math.max(state.visibleDeliveryGeneration, snapshot.generation); + notifyForegroundReplyFenceWaiters(state); +} + +function isExplicitlyNonVisibleDelivery(deliveryResult: unknown): boolean { + return ( + typeof deliveryResult === "object" && + deliveryResult !== null && + !Array.isArray(deliveryResult) && + "visibleReplySent" in deliveryResult && + (deliveryResult as { visibleReplySent?: unknown }).visibleReplySent === false ); } +function isExplicitlyVisibleDelivery(deliveryResult: unknown): boolean { + return ( + typeof deliveryResult === "object" && + deliveryResult !== null && + !Array.isArray(deliveryResult) && + (deliveryResult as { visibleReplySent?: unknown }).visibleReplySent === true + ); +} + +function isVisiblePartialDeliveryError(error: unknown): boolean { + if (isOutboundDeliveryError(error)) { + return error.sentBeforeError; + } + return ( + typeof error === "object" && + error !== null && + !Array.isArray(error) && + ((error as { visibleReplySent?: unknown }).visibleReplySent === true || + (error as { sentBeforeError?: unknown }).sentBeforeError === true) + ); +} + +async function runForegroundReplyFenceSettledDelivery( + snapshot: ForegroundReplyFenceSnapshot | undefined, + onSettled: (() => unknown) | undefined, +): Promise { + if (!onSettled) { + return; + } + try { + const deliveryResult = await onSettled(); + if (isExplicitlyVisibleDelivery(deliveryResult)) { + markForegroundReplyFenceVisibleDeliveryGeneration(snapshot); + } + } catch (err: unknown) { + if (isVisiblePartialDeliveryError(err)) { + markForegroundReplyFenceVisibleDeliveryGeneration(snapshot); + } + throw err; + } +} + function endForegroundReplyFence(snapshot: ForegroundReplyFenceSnapshot): void { const state = foregroundReplyFenceByKey.get(snapshot.key); if (!state) { return; } + const activeGenerationCount = state.activeGenerations.get(snapshot.generation) ?? 0; + if (activeGenerationCount <= 1) { + state.activeGenerations.delete(snapshot.generation); + } else { + state.activeGenerations.set(snapshot.generation, activeGenerationCount - 1); + } state.activeDispatches -= 1; + notifyForegroundReplyFenceWaiters(state); if (state.activeDispatches <= 0) { foregroundReplyFenceByKey.delete(snapshot.key); } @@ -306,21 +439,39 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: { const beforeDeliver: ReplyDispatchBeforeDeliver | undefined = foregroundReplyFence || configuredBeforeDeliver ? async (payload, info) => { - if (isForegroundReplyFenceSuperseded(foregroundReplyFence)) { + if (await shouldCancelForegroundReplyDelivery(foregroundReplyFence)) { return null; } const deliverPayload = configuredBeforeDeliver ? await configuredBeforeDeliver(payload, info) : payload; - if (!deliverPayload || isForegroundReplyFenceSuperseded(foregroundReplyFence)) { + if ( + !deliverPayload || + (await shouldCancelForegroundReplyDelivery(foregroundReplyFence)) + ) { return null; } return deliverPayload; } : undefined; + const deliver: ReplyDispatcherWithTypingOptions["deliver"] = async (payload, info) => { + try { + const result = await params.dispatcherOptions.deliver(payload, info); + markForegroundReplyFenceVisibleDelivery(foregroundReplyFence, payload, result); + return result; + } catch (err: unknown) { + if (isVisiblePartialDeliveryError(err)) { + markForegroundReplyFenceVisibleDelivery(foregroundReplyFence, payload, { + visibleReplySent: true, + }); + } + throw err; + } + }; const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } = createReplyDispatcherWithTyping({ ...params.dispatcherOptions, + deliver, beforeDeliver, silentReplyContext: params.dispatcherOptions.silentReplyContext ?? silentReplyContext, }); @@ -336,11 +487,18 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: { }, }); } finally { - if (foregroundReplyFence) { - endForegroundReplyFence(foregroundReplyFence); + try { + await runForegroundReplyFenceSettledDelivery( + foregroundReplyFence, + params.dispatcherOptions.onSettled, + ); + } finally { + if (foregroundReplyFence) { + endForegroundReplyFence(foregroundReplyFence); + } + markRunComplete(); + markDispatchIdle(); } - markRunComplete(); - markDispatchIdle(); } } diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index 074bcce16a6..feebda2bdf5 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -81,6 +81,7 @@ export type ReplyDispatcherWithTypingOptions = Omit Promise | void; onIdle?: () => void; + onSettled?: () => unknown; /** Called when the typing controller is cleaned up (e.g., on NO_REPLY). */ onCleanup?: () => void; }; diff --git a/src/channels/turn/durable-delivery.test.ts b/src/channels/turn/durable-delivery.test.ts index 10283f8650f..9d136cfb7fd 100644 --- a/src/channels/turn/durable-delivery.test.ts +++ b/src/channels/turn/durable-delivery.test.ts @@ -215,6 +215,7 @@ describe("durable inbound reply delivery", () => { }), }); - expect(result).toEqual({ status: "failed", error }); + expect(result).toEqual({ status: "failed", error, sentBeforeError: true }); + expect(error).toMatchObject({ sentBeforeError: true, visibleReplySent: true }); }); }); diff --git a/src/channels/turn/durable-delivery.ts b/src/channels/turn/durable-delivery.ts index bc9d9e7d7af..5deb3c4bc43 100644 --- a/src/channels/turn/durable-delivery.ts +++ b/src/channels/turn/durable-delivery.ts @@ -48,7 +48,7 @@ export type DurableInboundReplyDeliveryResult = } | { status: "handled_visible"; delivery: ChannelDeliveryResult } | { status: "handled_no_send"; reason: "no_visible_result"; delivery: ChannelDeliveryResult } - | { status: "failed"; error: unknown }; + | { status: "failed"; error: unknown; sentBeforeError?: true }; function resolveDeliveryTarget(params: DurableInboundReplyDeliveryParams): string | undefined { return ( @@ -106,10 +106,23 @@ export function throwIfDurableInboundReplyDeliveryFailed( result: DurableInboundReplyDeliveryResult, ): void { if (result.status === "failed") { - throw result.error; + throw result.sentBeforeError === true + ? markDurableInboundReplyDeliveryErrorVisible(result.error) + : result.error; } } +function markDurableInboundReplyDeliveryErrorVisible(error: unknown): unknown { + if (typeof error === "object" && error !== null && Object.isExtensible(error)) { + Object.assign(error, { sentBeforeError: true, visibleReplySent: true }); + return error; + } + + const visibleError = new Error("visible durable reply delivery failed", { cause: error }); + Object.assign(visibleError, { sentBeforeError: true, visibleReplySent: true }); + return visibleError; +} + export async function deliverInboundReplyWithMessageSendContext( params: DurableInboundReplyDeliveryParams, ): Promise { @@ -192,7 +205,11 @@ export async function deliverInboundReplyWithMessageSendContext( return { status: "failed" as const, error: send.error }; } if (send.status === "partial_failed") { - return { status: "failed" as const, error: send.error }; + return { + status: "failed" as const, + error: markDurableInboundReplyDeliveryErrorVisible(send.error), + sentBeforeError: true, + }; } const delivery = createChannelDeliveryResultFromReceipt({ diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index d5b2cd5b4a1..fb51eaab56d 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -401,6 +401,78 @@ describe("channel turn kernel", () => { expect(deliver).not.toHaveBeenCalled(); }); + it("preserves durable partial-send visibility when generic delivery throws", async () => { + const error = new Error("second chunk failed"); + sendDurableMessageBatch.mockResolvedValueOnce({ + status: "partial_failed", + results: [{ channel: "telegram", messageId: "tg-1" }], + receipt: { + primaryPlatformMessageId: "tg-1", + platformMessageIds: ["tg-1"], + parts: [{ platformMessageId: "tg-1", kind: "text", index: 0 }], + sentAt: 1, + }, + error, + sentBeforeError: true, + }); + const deliver = vi.fn(async () => ({ messageIds: ["legacy-1"], visibleReplySent: true })); + const dispatchReplyWithBufferedBlockDispatcher = createDispatch(); + + await expect( + dispatchAssembledChannelTurn({ + cfg, + channel: "telegram", + accountId: "acct", + agentId: "main", + routeSessionKey: "agent:main:telegram:peer", + storePath: "/tmp/sessions.json", + ctxPayload: createCtx({ To: "123", OriginatingTo: "123" }), + recordInboundSession: createRecordInboundSession(), + dispatchReplyWithBufferedBlockDispatcher, + delivery: { deliver, durable: { replyToMode: "first" } }, + }), + ).rejects.toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + + expect(deliver).not.toHaveBeenCalled(); + }); + + it("preserves visible delivery when post-delivery observers throw", async () => { + const error = new Error("observer failed"); + const deliver = vi.fn(async () => ({ messageIds: ["local-1"], visibleReplySent: true })); + const dispatchReplyWithBufferedBlockDispatcher = createDispatch(); + + await expect( + dispatchAssembledChannelTurn({ + cfg, + channel: "telegram", + accountId: "acct", + agentId: "main", + routeSessionKey: "agent:main:telegram:peer", + storePath: "/tmp/sessions.json", + ctxPayload: createCtx({ To: "123", OriginatingTo: "123" }), + recordInboundSession: createRecordInboundSession(), + dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver, + durable: false, + onDelivered: () => { + throw error; + }, + }, + }), + ).rejects.toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + expect(error).toMatchObject({ + sentBeforeError: true, + visibleReplySent: true, + }); + }); + it("returns custom delivery result to the buffered dispatcher", async () => { let deliveredResult: unknown; const dispatchReplyWithBufferedBlockDispatcher = vi.fn( diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index 59f8ad71125..cc7303183e3 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -263,6 +263,47 @@ function resolveObserveOnlyDispatchResult( }) as TDispatchResult; } +function isExplicitlyNonVisibleChannelDelivery(result: unknown): boolean { + return ( + typeof result === "object" && + result !== null && + !Array.isArray(result) && + (result as { visibleReplySent?: unknown }).visibleReplySent === false + ); +} + +function markChannelDeliveryErrorVisible(error: unknown): unknown { + if (typeof error === "object" && error !== null && !Array.isArray(error)) { + try { + Object.assign(error, { sentBeforeError: true, visibleReplySent: true }); + return error; + } catch { + // Fall back to a wrapper when a platform error object is non-extensible. + } + } + const visibleError = new Error("visible channel reply delivery failed", { cause: error }); + Object.assign(visibleError, { sentBeforeError: true, visibleReplySent: true }); + return visibleError; +} + +async function runChannelDeliveryObserver(params: { + onDelivered: ChannelEventDeliveryAdapter["onDelivered"] | undefined; + payload: ReplyPayload; + info: Parameters>[1]; + result: Parameters>[2]; +}): Promise { + if (!params.onDelivered) { + return; + } + try { + await params.onDelivered(params.payload, params.info, params.result); + } catch (error: unknown) { + throw isExplicitlyNonVisibleChannelDelivery(params.result) + ? error + : markChannelDeliveryErrorVisible(error); + } +} + function resolveBotLoopProtectionDrop( params: PreparedChannelTurn, ): ChannelTurnResult | undefined { @@ -358,12 +399,22 @@ export async function dispatchAssembledChannelTurn( }); throwIfDurableInboundReplyDeliveryFailed(durable); if (isDurableInboundReplyDeliveryHandled(durable)) { - await params.delivery.onDelivered?.(preparedPayload, info, durable.delivery); + await runChannelDeliveryObserver({ + onDelivered: params.delivery.onDelivered, + payload: preparedPayload, + info, + result: durable.delivery, + }); return durable.delivery; } } const result = await params.delivery.deliver(preparedPayload, info); - await params.delivery.onDelivered?.(preparedPayload, info, result); + await runChannelDeliveryObserver({ + onDelivered: params.delivery.onDelivered, + payload: preparedPayload, + info, + result, + }); return result; }, onError: params.delivery.onError, diff --git a/src/plugin-sdk/inbound-reply-dispatch.test.ts b/src/plugin-sdk/inbound-reply-dispatch.test.ts index 4c29d37b707..5ae4e7301c6 100644 --- a/src/plugin-sdk/inbound-reply-dispatch.test.ts +++ b/src/plugin-sdk/inbound-reply-dispatch.test.ts @@ -222,6 +222,59 @@ describe("recordInboundSessionAndDispatchReply", () => { expect(deliver).not.toHaveBeenCalled(); }); + it("returns durable no-send results through the SDK compatibility deliverer", async () => { + deliverInboundReplyWithMessageSendContext.mockResolvedValue({ + status: "handled_no_send", + reason: "no_visible_result", + delivery: { + messageIds: [], + visibleReplySent: false, + }, + }); + const recordInboundSession = vi.fn(async () => undefined) as unknown as RecordInboundSession; + const deliver = vi.fn(async () => undefined); + let deliveryResult: unknown; + const dispatchReplyWithBufferedBlockDispatcher = vi.fn(async (params) => { + deliveryResult = await params.dispatcherOptions.deliver( + { text: "cancelled durable" }, + { kind: "final" }, + ); + return { + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }; + }) as DispatchReplyWithBufferedBlockDispatcher; + + await recordInboundSessionAndDispatchReply({ + cfg: {} as OpenClawConfig, + channel: "telegram", + accountId: "default", + agentId: "main", + routeSessionKey: "agent:main:telegram:peer", + storePath: "/tmp/sessions.json", + ctxPayload: { + Body: "body", + RawBody: "body", + CommandBody: "body", + From: "sender", + To: "123", + OriginatingTo: "123", + SessionKey: "agent:main:telegram:peer", + Provider: "telegram", + Surface: "telegram", + } as FinalizedMsgContext, + recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher, + deliver, + durable: { replyToMode: "first" }, + onRecordError: vi.fn(), + onDispatchError: vi.fn(), + }); + + expect(deliveryResult).toMatchObject({ visibleReplySent: false }); + expect(deliver).not.toHaveBeenCalled(); + }); + it("exports shared visible reply dispatch helpers", () => { expect(hasVisibleInboundReplyDispatch(undefined)).toBe(false); expect( diff --git a/src/plugin-sdk/inbound-reply-dispatch.ts b/src/plugin-sdk/inbound-reply-dispatch.ts index 88a48c7e389..f4f849945cb 100644 --- a/src/plugin-sdk/inbound-reply-dispatch.ts +++ b/src/plugin-sdk/inbound-reply-dispatch.ts @@ -243,10 +243,10 @@ export async function recordChannelMessageReplyDispatch( }); throwIfDurableInboundReplyDeliveryFailed(durable); if (isDurableInboundReplyDeliveryHandled(durable)) { - return; + return durable.delivery; } } - await params.deliver(normalized); + return await params.deliver(normalized); }; await runPreparedChannelTurn({