Fix foreground reply fence visibility

This commit is contained in:
Cavit Erginsoy
2026-05-22 22:40:22 +01:00
committed by Peter Steinberger
parent 841cb121fb
commit bd91107fc6
13 changed files with 1052 additions and 141 deletions

View File

@@ -466,6 +466,30 @@ describe("deliverWebReply", () => {
expect(logVerbose).toHaveBeenCalled();
});
it("marks errors visible after accepted media delivery", async () => {
const msg = makeMsg();
const error = new Error("tail send failed");
mockLoadedImageMedia();
vi.mocked(msg.reply).mockRejectedValue(error);
await expect(
deliverWebReply({
replyResult: { text: "captiontail", mediaUrl: "http://example.com/img.jpg" },
msg,
maxMediaBytes: 1024 * 1024,
textLimit: 7,
replyLogger,
skipLog: true,
}),
).rejects.toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
expect(msg.sendMedia).toHaveBeenCalledTimes(1);
expect(msg.reply).toHaveBeenCalled();
});
it("preserves leading indentation after trimming only leading blank lines", async () => {
const msg = makeMsg();

View File

@@ -83,6 +83,20 @@ function createWhatsAppReplyDeliveryReceipt(
});
}
function markWhatsAppVisibleDeliveryError(error: unknown): unknown {
if (typeof error === "object" && error !== null && !Array.isArray(error)) {
try {
Object.assign(error, { sentBeforeError: true, visibleReplySent: true });
return error;
} catch {
// Fall back to a wrapper when a platform error object is non-extensible.
}
}
const visibleError = new Error("visible WhatsApp reply delivery failed", { cause: error });
Object.assign(visibleError, { sentBeforeError: true, visibleReplySent: true });
return visibleError;
}
export async function deliverWebReply(params: {
replyResult: ReplyPayload;
normalizedReplyResult?: DeliverableWhatsAppOutboundPayload<ReplyPayload>;
@@ -150,15 +164,22 @@ export async function deliverWebReply(params: {
};
const sendWithRetry = async <T>(fn: () => Promise<T>, label: string, maxAttempts = 3) => {
return await sendWhatsAppOutboundWithRetry({
send: fn,
maxAttempts,
onRetry: ({ attempt, maxAttempts: retryMaxAttempts, backoffMs, errorText }) => {
logVerbose(
`Retrying ${label} to ${msg.from} after failure (${attempt}/${retryMaxAttempts - 1}) in ${backoffMs}ms: ${errorText}`,
);
},
});
try {
return await sendWhatsAppOutboundWithRetry({
send: fn,
maxAttempts,
onRetry: ({ attempt, maxAttempts: retryMaxAttempts, backoffMs, errorText }) => {
logVerbose(
`Retrying ${label} to ${msg.from} after failure (${attempt}/${retryMaxAttempts - 1}) in ${backoffMs}ms: ${errorText}`,
);
},
});
} catch (error: unknown) {
if (sendResults.some((result) => result.providerAccepted)) {
throw markWhatsAppVisibleDeliveryError(error);
}
throw error;
}
};
// Text-only replies

View File

@@ -12,11 +12,27 @@ type CapturedReplyPayload = {
mediaUrls?: string[];
};
type CapturedDispatchParams = {
ctx?: unknown;
dispatcherOptions?: {
deliver?: (
payload: CapturedReplyPayload,
info: { kind: "tool" | "block" | "final" },
) => Promise<unknown>;
onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void;
onSettled?: () => Promise<unknown>;
};
replyOptions?: {
disableBlockStreaming?: boolean;
sourceReplyDeliveryMode?: "automatic" | "message_tool_only";
};
};
const {
dispatchReplyWithBufferedBlockDispatcherMock,
deliverInboundReplyWithMessageSendContextMock,
} = vi.hoisted(() => ({
dispatchReplyWithBufferedBlockDispatcherMock: vi.fn(async (params: { ctx: unknown }) => {
dispatchReplyWithBufferedBlockDispatcherMock: vi.fn(async (params: CapturedDispatchParams) => {
capturedDispatchParams = params;
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } };
}),
@@ -168,37 +184,19 @@ function makeMsg(overrides: Partial<TestMsg> = {}): TestMsg {
}
function getCapturedDeliver() {
return (
capturedDispatchParams as {
dispatcherOptions?: {
deliver?: (
payload: CapturedReplyPayload,
info: { kind: "tool" | "block" | "final" },
) => Promise<void>;
};
}
)?.dispatcherOptions?.deliver;
return (capturedDispatchParams as CapturedDispatchParams)?.dispatcherOptions?.deliver;
}
function getCapturedOnError() {
return (
capturedDispatchParams as {
dispatcherOptions?: {
onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void;
};
}
)?.dispatcherOptions?.onError;
return (capturedDispatchParams as CapturedDispatchParams)?.dispatcherOptions?.onError;
}
function getCapturedOnSettled() {
return (capturedDispatchParams as CapturedDispatchParams)?.dispatcherOptions?.onSettled;
}
function getCapturedReplyOptions() {
return (
capturedDispatchParams as {
replyOptions?: {
disableBlockStreaming?: boolean;
sourceReplyDeliveryMode?: "automatic" | "message_tool_only";
};
}
)?.replyOptions;
return (capturedDispatchParams as CapturedDispatchParams)?.replyOptions;
}
function requireRecord(value: unknown, label: string): Record<string, unknown> {
@@ -597,12 +595,14 @@ describe("whatsapp inbound dispatch", () => {
expect(deliverReply).not.toHaveBeenCalled();
expect(rememberSentText).not.toHaveBeenCalled();
await deliver?.(
{ text: "tool image", mediaUrls: ["/tmp/generated.jpg"] },
{
kind: "tool",
},
);
await expect(
deliver?.(
{ text: "tool image", mediaUrls: ["/tmp/generated.jpg"] },
{
kind: "tool",
},
),
).resolves.toMatchObject({ visibleReplySent: false });
expect(deliverReply).not.toHaveBeenCalled();
expect(rememberSentText).not.toHaveBeenCalled();
@@ -694,7 +694,9 @@ describe("whatsapp inbound dispatch", () => {
});
const deliver = getCapturedDeliver();
await deliver?.({ text: "cancelled by hook" }, { kind: "final" });
expect(await deliver?.({ text: "cancelled by hook" }, { kind: "final" })).toMatchObject({
visibleReplySent: false,
});
const durableParams = requireMockArg(
deliverInboundReplyWithMessageSendContextMock,
@@ -713,6 +715,155 @@ describe("whatsapp inbound dispatch", () => {
expect(rememberSentText).not.toHaveBeenCalled();
});
it("reports deferred media visible only after an accepted flush", async () => {
deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({
status: "handled_no_send",
reason: "no_visible_result",
delivery: {
messageIds: [],
visibleReplySent: false,
},
});
const deliverReply = vi.fn(async () => acceptedDeliveryResult());
await dispatchBufferedReply({
deliverReply,
});
const deliver = getCapturedDeliver();
await expect(
deliver?.({ text: "tool image", mediaUrls: ["/tmp/generated.jpg"] }, { kind: "tool" }),
).resolves.toMatchObject({ visibleReplySent: false });
await expect(deliver?.({ text: "cancelled final" }, { kind: "final" })).resolves.toMatchObject({
visibleReplySent: true,
});
expect(deliverReply).toHaveBeenCalledTimes(1);
});
it("flushes deferred media through the settled delivery hook", async () => {
const deliverReply = vi.fn(async () => acceptedDeliveryResult());
const rememberSentText = vi.fn();
let settledResult: unknown;
dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce(
async (params: CapturedDispatchParams) => {
capturedDispatchParams = params;
const deliver = params.dispatcherOptions?.deliver;
if (!deliver) {
throw new Error("expected captured deliver callback");
}
const onSettled = params.dispatcherOptions?.onSettled;
const deferred = await deliver(
{ text: "tool image", mediaUrls: ["/tmp/generated.jpg"] },
{ kind: "tool" },
);
expect(deferred).toMatchObject({ visibleReplySent: false });
settledResult = await onSettled?.();
return {
queuedFinal: false,
counts: { tool: 1, block: 0, final: 0 },
};
},
);
await expect(
dispatchBufferedReply({
deliverReply,
rememberSentText,
}),
).resolves.toBe(true);
expect(settledResult).toMatchObject({ visibleReplySent: true });
expect(getCapturedOnSettled()).toBeTypeOf("function");
expect(deliverReply).toHaveBeenCalledTimes(1);
expectRememberSentContextFields(rememberSentText, undefined, {
combinedBody: "hi",
combinedBodySessionKey: "agent:main:whatsapp:direct:+1000",
});
});
it("marks deferred media flush failures visible after an earlier accepted flush", async () => {
const error = new Error("second deferred media failed");
const deliverReply = vi
.fn()
.mockResolvedValueOnce(acceptedDeliveryResult())
.mockRejectedValueOnce(error);
dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce(
async (params: CapturedDispatchParams) => {
capturedDispatchParams = params;
const deliver = params.dispatcherOptions?.deliver;
if (!deliver) {
throw new Error("expected captured deliver callback");
}
const onSettled = params.dispatcherOptions?.onSettled;
await deliver({ text: "first image", mediaUrls: ["/tmp/first.jpg"] }, { kind: "tool" });
await deliver({ text: "second image", mediaUrls: ["/tmp/second.jpg"] }, { kind: "tool" });
await onSettled?.();
return {
queuedFinal: false,
counts: { tool: 2, block: 0, final: 0 },
};
},
);
await expect(dispatchBufferedReply({ deliverReply })).rejects.toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
expect(error).toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
expect(deliverReply).toHaveBeenCalledTimes(2);
});
it("marks downstream failures visible after deferred media flushes", async () => {
const error = new Error("durable text failed");
deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({
status: "failed",
error,
});
const deliverReply = vi.fn(async () => acceptedDeliveryResult());
await dispatchBufferedReply({
deliverReply,
});
const deliver = getCapturedDeliver();
await expect(
deliver?.({ text: "tool image", mediaUrls: ["/tmp/generated.jpg"] }, { kind: "tool" }),
).resolves.toMatchObject({ visibleReplySent: false });
await expect(deliver?.({ text: "final text" }, { kind: "final" })).rejects.toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
expect(error).toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
expect(deliverReply).toHaveBeenCalledTimes(1);
});
it("marks durable partial send failures as visible before rethrowing", async () => {
const error = new Error("second chunk failed");
deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({
status: "failed",
error,
sentBeforeError: true,
});
const deliverReply = vi.fn(async () => acceptedDeliveryResult());
await dispatchBufferedReply({
deliverReply,
});
const deliver = getCapturedDeliver();
await expect(deliver?.({ text: "partial final" }, { kind: "final" })).rejects.toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
expect(deliverReply).not.toHaveBeenCalled();
});
it("keeps media replies on the WhatsApp owner delivery path", async () => {
deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({
status: "handled_visible",
@@ -920,15 +1071,7 @@ describe("whatsapp inbound dispatch", () => {
const deliverReply = vi.fn(async () => acceptedDeliveryResult());
const rememberSentText = vi.fn();
dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce(
async (params: {
ctx: unknown;
dispatcherOptions?: {
deliver?: (
payload: { text?: string },
info: { kind: "tool" | "block" | "final" },
) => Promise<void>;
};
}) => {
async (params: CapturedDispatchParams) => {
capturedDispatchParams = params;
await params.dispatcherOptions?.deliver?.({ text: "partial block" }, { kind: "block" });
return { queuedFinal: false, counts: { tool: 0, block: 1, final: 0 } };
@@ -956,15 +1099,7 @@ describe("whatsapp inbound dispatch", () => {
debug: vi.fn(),
} as unknown as BufferedReplyParams["replyLogger"];
dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce(
async (params: {
ctx: unknown;
dispatcherOptions?: {
deliver?: (
payload: { text?: string },
info: { kind: "tool" | "block" | "final" },
) => Promise<void>;
};
}) => {
async (params: CapturedDispatchParams) => {
capturedDispatchParams = params;
await params.dispatcherOptions?.deliver?.({ text: "final text" }, { kind: "final" });
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 1 } };
@@ -994,20 +1129,13 @@ describe("whatsapp inbound dispatch", () => {
const deliverReply = vi.fn(async () => acceptedDeliveryResult());
const rememberSentText = vi.fn();
dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce(
async (params: {
ctx: unknown;
dispatcherOptions?: {
deliver?: (
payload: CapturedReplyPayload,
info: { kind: "tool" | "block" | "final" },
) => Promise<void>;
};
}) => {
async (params: CapturedDispatchParams) => {
capturedDispatchParams = params;
await params.dispatcherOptions?.deliver?.(
{ text: "tool image", mediaUrls: ["/tmp/generated.jpg"] },
{ kind: "tool" },
);
await params.dispatcherOptions?.onSettled?.();
return { queuedFinal: false, counts: { tool: 1, block: 0, final: 0 } };
},
);

View File

@@ -87,6 +87,43 @@ function normalizeErrForLog(err: unknown): unknown {
return err;
}
type WhatsAppReplyDeliveryVisibility = {
visibleReplySent: boolean;
};
function whatsAppReplyDeliveryVisibility(
visibleReplySent: boolean,
): WhatsAppReplyDeliveryVisibility {
return { visibleReplySent };
}
function whatsAppReplyDeliveryVisibilityFromDurableResult(result: {
visibleReplySent?: boolean;
}): WhatsAppReplyDeliveryVisibility {
return whatsAppReplyDeliveryVisibility(result.visibleReplySent === true);
}
function markWhatsAppReplyDeliveryErrorVisible(error: unknown): unknown {
if (typeof error === "object" && error !== null && !Array.isArray(error)) {
try {
Object.assign(error, { sentBeforeError: true, visibleReplySent: true });
return error;
} catch {
// Fall back to a wrapper when a platform error object is non-extensible.
}
}
const visibleError = new Error("visible WhatsApp reply delivery failed", { cause: error });
Object.assign(visibleError, { sentBeforeError: true, visibleReplySent: true });
return visibleError;
}
function markWhatsAppReplyDeliveryErrorVisibleAfterFlush(
error: unknown,
flushResult: WhatsAppMediaOnlyFlushResult,
): unknown {
return flushResult.delivered > 0 ? markWhatsAppReplyDeliveryErrorVisible(error) : error;
}
function logWhatsAppReplyDeliveryError(params: {
err: unknown;
info: ReplyDeliveryInfo;
@@ -170,7 +207,7 @@ function shouldDeferWhatsAppMediaOnlyPayload(params: {
}
function createWhatsAppMediaOnlyReplyCoalescer(params: {
deliver: (pending: PendingWhatsAppMediaOnlyPayload) => Promise<void>;
deliver: (pending: PendingWhatsAppMediaOnlyPayload) => Promise<WhatsAppReplyDeliveryVisibility>;
}) {
const pendingMediaOnlyPayloads: PendingWhatsAppMediaOnlyPayload[] = [];
const flushExceptDuplicateMedia = async (
@@ -186,8 +223,14 @@ function createWhatsAppMediaOnlyReplyCoalescer(params: {
flushResult.droppedDuplicateMedia += 1;
continue;
}
await params.deliver(candidate);
flushResult.delivered += 1;
try {
const delivery = await params.deliver(candidate);
if (delivery.visibleReplySent) {
flushResult.delivered += 1;
}
} catch (error: unknown) {
throw markWhatsAppReplyDeliveryErrorVisibleAfterFlush(error, flushResult);
}
}
return flushResult;
};
@@ -511,10 +554,10 @@ export async function dispatchWhatsAppBufferedReply(params: {
const deliverNormalizedPayload = async (
normalizedDeliveryPayload: DeliverableWhatsAppOutboundPayload<ReplyPayload>,
info: ReplyDeliveryInfo,
) => {
): Promise<WhatsAppReplyDeliveryVisibility> => {
const reply = resolveSendableOutboundReplyParts(normalizedDeliveryPayload);
if (!reply.hasMedia && !reply.text.trim()) {
return;
return whatsAppReplyDeliveryVisibility(false);
}
const delivery = await params.deliverReply({
replyResult: normalizedDeliveryPayload,
@@ -542,7 +585,7 @@ export async function dispatchWhatsAppBufferedReply(params: {
},
"auto-reply was not accepted by WhatsApp provider",
);
return;
return whatsAppReplyDeliveryVisibility(false);
}
didSendReply = true;
const shouldLog = normalizedDeliveryPayload.text ? true : undefined;
@@ -557,11 +600,12 @@ export async function dispatchWhatsAppBufferedReply(params: {
const preview = normalizedDeliveryPayload.text != null ? reply.text : "<media>";
logVerbose(`Reply body: ${preview}${reply.hasMedia ? " (media)" : ""} -> ${fromDisplay}`);
}
return whatsAppReplyDeliveryVisibility(true);
};
const mediaOnlyCoalescer = createWhatsAppMediaOnlyReplyCoalescer({
deliver: async (pending) => {
await deliverNormalizedPayload(pending.payload, pending.info);
return await deliverNormalizedPayload(pending.payload, pending.info);
},
});
@@ -584,7 +628,7 @@ export async function dispatchWhatsAppBufferedReply(params: {
deliver: async (payload: ReplyPayload, info: { kind: ReplyLifecycleKind }) => {
const deliveryPayload = resolveWhatsAppDeliverablePayload(payload, info);
if (!deliveryPayload) {
return;
return whatsAppReplyDeliveryVisibility(false);
}
const normalizedOutboundPayload = normalizeWhatsAppOutboundPayload(deliveryPayload, {
normalizeText: normalizeWhatsAppPayloadTextPreservingIndentation,
@@ -595,43 +639,55 @@ export async function dispatchWhatsAppBufferedReply(params: {
: normalizedOutboundPayload;
const reply = resolveSendableOutboundReplyParts(normalizedDeliveryPayload);
if (!reply.hasMedia && !reply.text.trim()) {
return;
return whatsAppReplyDeliveryVisibility(false);
}
if (!reply.hasMedia) {
logWhatsAppMediaOnlyFlushResult(await mediaOnlyCoalescer.flushAll());
const durable = await deliverInboundReplyWithMessageSendContext({
cfg: params.cfg,
channel: "whatsapp",
accountId: params.route.accountId,
agentId: params.route.agentId,
ctxPayload: params.context as FinalizedMsgContext,
payload: normalizedDeliveryPayload,
info,
to: params.msg.from,
formatting: {
textLimit,
tableMode,
chunkMode,
},
});
if (durable.status === "failed") {
throw durable.error;
}
if (durable.status === "handled_visible") {
didSendReply = true;
const shouldLog = normalizedDeliveryPayload.text ? true : undefined;
params.rememberSentText(normalizedDeliveryPayload.text, {
combinedBody: params.context.Body as string | undefined,
combinedBodySessionKey: params.route.sessionKey,
logVerboseMessage: shouldLog,
const flushResult = await mediaOnlyCoalescer.flushAll();
logWhatsAppMediaOnlyFlushResult(flushResult);
try {
const durable = await deliverInboundReplyWithMessageSendContext({
cfg: params.cfg,
channel: "whatsapp",
accountId: params.route.accountId,
agentId: params.route.agentId,
ctxPayload: params.context as FinalizedMsgContext,
payload: normalizedDeliveryPayload,
info,
to: params.msg.from,
formatting: {
textLimit,
tableMode,
chunkMode,
},
});
return;
if (durable.status === "failed") {
if (durable.sentBeforeError === true) {
throw markWhatsAppReplyDeliveryErrorVisible(durable.error);
}
throw durable.error;
}
if (durable.status === "handled_visible") {
didSendReply = true;
const shouldLog = normalizedDeliveryPayload.text ? true : undefined;
params.rememberSentText(normalizedDeliveryPayload.text, {
combinedBody: params.context.Body as string | undefined,
combinedBodySessionKey: params.route.sessionKey,
logVerboseMessage: shouldLog,
});
return whatsAppReplyDeliveryVisibilityFromDurableResult(durable.delivery);
}
if (durable.status === "handled_no_send") {
return flushResult.delivered > 0
? whatsAppReplyDeliveryVisibility(true)
: whatsAppReplyDeliveryVisibilityFromDurableResult(durable.delivery);
}
const delivery = await deliverNormalizedPayload(normalizedDeliveryPayload, info);
return flushResult.delivered > 0 && !delivery.visibleReplySent
? whatsAppReplyDeliveryVisibility(true)
: delivery;
} catch (error: unknown) {
throw markWhatsAppReplyDeliveryErrorVisibleAfterFlush(error, flushResult);
}
if (durable.status === "handled_no_send") {
return;
}
await deliverNormalizedPayload(normalizedDeliveryPayload, info);
return;
}
const mediaUrls = getWhatsAppPayloadMediaUrls(normalizedDeliveryPayload);
if (shouldDeferWhatsAppMediaOnlyPayload({ info, mediaUrls, reply })) {
@@ -640,12 +696,23 @@ export async function dispatchWhatsAppBufferedReply(params: {
mediaUrls,
payload: normalizedDeliveryPayload,
});
return;
return whatsAppReplyDeliveryVisibility(false);
}
logWhatsAppMediaOnlyFlushResult(
await mediaOnlyCoalescer.flushExceptDuplicateMedia(mediaUrls),
);
await deliverNormalizedPayload(normalizedDeliveryPayload, info);
const flushResult = await mediaOnlyCoalescer.flushExceptDuplicateMedia(mediaUrls);
logWhatsAppMediaOnlyFlushResult(flushResult);
try {
const delivery = await deliverNormalizedPayload(normalizedDeliveryPayload, info);
return flushResult.delivered > 0 && !delivery.visibleReplySent
? whatsAppReplyDeliveryVisibility(true)
: delivery;
} catch (error: unknown) {
throw markWhatsAppReplyDeliveryErrorVisibleAfterFlush(error, flushResult);
}
},
onSettled: async () => {
const flushResult = await mediaOnlyCoalescer.flushAll();
logWhatsAppMediaOnlyFlushResult(flushResult);
return whatsAppReplyDeliveryVisibility(flushResult.delivered > 0);
},
onReplyStart: params.msg.sendComposing,
...(statusReactionController
@@ -686,8 +753,6 @@ export async function dispatchWhatsAppBufferedReply(params: {
: {}),
},
});
logWhatsAppMediaOnlyFlushResult(await mediaOnlyCoalescer.flushAll());
const didQueueVisibleReply = hasVisibleInboundReplyDispatch({ queuedFinal, counts });
if (!didQueueVisibleReply) {
if (statusReactionController) {

View File

@@ -1,5 +1,6 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { OutboundDeliveryError } from "../infra/outbound/deliver-types.js";
import { resetGlobalHookRunner } from "../plugins/hook-runner-global.js";
import type { ReplyDispatchBeforeDeliver } from "./reply/reply-dispatcher.js";
import { buildTestCtx } from "./reply/test-ctx.js";
@@ -59,16 +60,22 @@ function buildForegroundCtx(overrides: Partial<MsgContext> = {}): FinalizedMsgCo
function dispatchWithDeliveries(
ctx: FinalizedMsgContext,
deliveries: Delivery[],
dispatcherOptions: { beforeDeliver?: ReplyDispatchBeforeDeliver } = {},
dispatcherOptions: {
beforeDeliver?: ReplyDispatchBeforeDeliver;
deliver?: (payload: ReplyPayload, info: { kind: Delivery["kind"] }) => Promise<unknown>;
onSettled?: () => unknown;
} = {},
) {
return dispatchInboundMessageWithBufferedDispatcher({
ctx,
cfg: {} as OpenClawConfig,
dispatcherOptions: {
...dispatcherOptions,
deliver: async (payload: ReplyPayload, info: { kind: Delivery["kind"] }) => {
deliveries.push({ kind: info.kind, text: payload.text });
},
deliver:
dispatcherOptions.deliver ??
(async (payload: ReplyPayload, info: { kind: Delivery["kind"] }) => {
deliveries.push({ kind: info.kind, text: payload.text });
}),
},
});
}
@@ -129,7 +136,7 @@ describe("foreground reply freshness", () => {
expect(deliveries).toEqual([{ kind: "final", text: "new final" }]);
});
it("suppresses an older foreground final when a newer inbound starts while beforeDeliver is pending", async () => {
it("keeps an older foreground final when a newer inbound has no visible delivery while beforeDeliver is pending", async () => {
const deliveries: Delivery[] = [];
const beforeDeliverStarted = createDeferred<void>();
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
@@ -174,6 +181,294 @@ describe("foreground reply freshness", () => {
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
});
expect(olderResult).toEqual({
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
});
expect(deliveries).toEqual([{ kind: "final", text: "old rewritten final" }]);
});
it("keeps an older foreground final fenced while a newer visible delivery is unresolved", async () => {
const deliveries: Delivery[] = [];
const beforeDeliverStarted = createDeferred<void>();
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
const newerDeliverStarted = createDeferred<void>();
const releaseNewerDeliver = createDeferred<void>();
const beforeDeliver = vi.fn(() => {
beforeDeliverStarted.resolve();
return releaseBeforeDeliver.promise;
});
hoisted.dispatchReplyFromConfigMock.mockImplementation(
async (params: DispatchReplyFromConfigParams) => {
if (params.ctx.MessageSid === "old-message") {
params.dispatcher.sendFinalReply({ text: "old final" });
return queuedFinalResult();
}
if (params.ctx.MessageSid === "new-message") {
params.dispatcher.sendFinalReply({ text: "new final" });
return queuedFinalResult();
}
throw new Error(`unexpected test message ${params.ctx.MessageSid ?? "<missing>"}`);
},
);
const olderDispatch = dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "old-message" }),
deliveries,
{ beforeDeliver },
);
await beforeDeliverStarted.promise;
const newerDispatch = dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "new-message" }),
deliveries,
{
deliver: async (payload, info) => {
newerDeliverStarted.resolve();
await releaseNewerDeliver.promise;
deliveries.push({ kind: info.kind, text: payload.text });
},
},
);
await newerDeliverStarted.promise;
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
await Promise.resolve();
expect(deliveries).toEqual([]);
releaseNewerDeliver.resolve();
const newerResult = await newerDispatch;
const olderResult = await olderDispatch;
expect(beforeDeliver).toHaveBeenCalledTimes(1);
expect(newerResult).toEqual({
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
});
expect(olderResult).toEqual({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
});
expect(deliveries).toEqual([{ kind: "final", text: "new final" }]);
});
it("keeps an older foreground final when a newer visible delivery fails", async () => {
const deliveries: Delivery[] = [];
const beforeDeliverStarted = createDeferred<void>();
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
const beforeDeliver = vi.fn(() => {
beforeDeliverStarted.resolve();
return releaseBeforeDeliver.promise;
});
hoisted.dispatchReplyFromConfigMock.mockImplementation(
async (params: DispatchReplyFromConfigParams) => {
if (params.ctx.MessageSid === "old-message") {
params.dispatcher.sendFinalReply({ text: "old final" });
return queuedFinalResult();
}
if (params.ctx.MessageSid === "new-message") {
params.dispatcher.sendFinalReply({ text: "new final" });
return queuedFinalResult();
}
throw new Error(`unexpected test message ${params.ctx.MessageSid ?? "<missing>"}`);
},
);
const olderDispatch = dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "old-message" }),
deliveries,
{ beforeDeliver },
);
await beforeDeliverStarted.promise;
const newerResult = await dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "new-message" }),
deliveries,
{
deliver: async () => {
throw new Error("delivery failed");
},
},
);
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
const olderResult = await olderDispatch;
expect(beforeDeliver).toHaveBeenCalledTimes(1);
expect(newerResult).toEqual({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
failedCounts: { tool: 0, block: 0, final: 1 },
});
expect(olderResult).toEqual({
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
});
expect(deliveries).toEqual([{ kind: "final", text: "old rewritten final" }]);
});
it("suppresses an older foreground final when a newer delivery partially sends before failing", async () => {
const deliveries: Delivery[] = [];
const beforeDeliverStarted = createDeferred<void>();
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
const beforeDeliver = vi.fn(() => {
beforeDeliverStarted.resolve();
return releaseBeforeDeliver.promise;
});
hoisted.dispatchReplyFromConfigMock.mockImplementation(
async (params: DispatchReplyFromConfigParams) => {
if (params.ctx.MessageSid === "old-message") {
params.dispatcher.sendFinalReply({ text: "old final" });
return queuedFinalResult();
}
if (params.ctx.MessageSid === "new-message") {
params.dispatcher.sendFinalReply({ text: "new final" });
return queuedFinalResult();
}
throw new Error(`unexpected test message ${params.ctx.MessageSid ?? "<missing>"}`);
},
);
const olderDispatch = dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "old-message" }),
deliveries,
{ beforeDeliver },
);
await beforeDeliverStarted.promise;
const newerResult = await dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "new-message" }),
deliveries,
{
deliver: async (payload, info) => {
deliveries.push({ kind: info.kind, text: payload.text });
throw new OutboundDeliveryError("second chunk failed", {
cause: new Error("second chunk failed"),
results: [{ channel: "whatsapp", messageId: "wa-1" }],
});
},
},
);
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
const olderResult = await olderDispatch;
expect(beforeDeliver).toHaveBeenCalledTimes(1);
expect(newerResult).toEqual({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
failedCounts: { tool: 0, block: 0, final: 1 },
});
expect(olderResult).toEqual({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
});
expect(deliveries).toEqual([{ kind: "final", text: "new final" }]);
});
it("keeps an older foreground final when a newer adapter reports non-visible delivery", async () => {
const deliveries: Delivery[] = [];
const beforeDeliverStarted = createDeferred<void>();
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
const beforeDeliver = vi.fn(() => {
beforeDeliverStarted.resolve();
return releaseBeforeDeliver.promise;
});
hoisted.dispatchReplyFromConfigMock.mockImplementation(
async (params: DispatchReplyFromConfigParams) => {
if (params.ctx.MessageSid === "old-message") {
params.dispatcher.sendFinalReply({ text: "old final" });
return queuedFinalResult();
}
if (params.ctx.MessageSid === "new-message") {
params.dispatcher.sendFinalReply({ text: "new final" });
return queuedFinalResult();
}
throw new Error(`unexpected test message ${params.ctx.MessageSid ?? "<missing>"}`);
},
);
const olderDispatch = dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "old-message" }),
deliveries,
{ beforeDeliver },
);
await beforeDeliverStarted.promise;
const newerResult = await dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "new-message" }),
deliveries,
{
deliver: async () => ({ visibleReplySent: false }),
},
);
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
const olderResult = await olderDispatch;
expect(beforeDeliver).toHaveBeenCalledTimes(1);
expect(newerResult).toEqual({
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
});
expect(olderResult).toEqual({
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
});
expect(deliveries).toEqual([{ kind: "final", text: "old rewritten final" }]);
});
it("suppresses an older foreground final when a newer settled hook reports visible delivery", async () => {
const deliveries: Delivery[] = [];
const beforeDeliverStarted = createDeferred<void>();
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
const beforeDeliver = vi.fn(() => {
beforeDeliverStarted.resolve();
return releaseBeforeDeliver.promise;
});
hoisted.dispatchReplyFromConfigMock.mockImplementation(
async (params: DispatchReplyFromConfigParams) => {
if (params.ctx.MessageSid === "old-message") {
params.dispatcher.sendFinalReply({ text: "old final" });
return queuedFinalResult();
}
if (params.ctx.MessageSid === "new-message") {
params.dispatcher.sendFinalReply({ text: "new final" });
return queuedFinalResult();
}
throw new Error(`unexpected test message ${params.ctx.MessageSid ?? "<missing>"}`);
},
);
const olderDispatch = dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "old-message" }),
deliveries,
{ beforeDeliver },
);
await beforeDeliverStarted.promise;
const newerResult = await dispatchWithDeliveries(
buildForegroundCtx({ MessageSid: "new-message" }),
deliveries,
{
deliver: async () => ({ visibleReplySent: false }),
onSettled: async () => ({ visibleReplySent: true }),
},
);
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
const olderResult = await olderDispatch;
expect(beforeDeliver).toHaveBeenCalledTimes(1);
expect(newerResult).toEqual({
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
});
expect(olderResult).toEqual({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
@@ -181,6 +476,31 @@ describe("foreground reply freshness", () => {
expect(deliveries).toEqual([]);
});
it("runs the settled delivery hook when dispatch fails after queueing a reply", async () => {
const deliveries: Delivery[] = [];
let settled = false;
const error = new Error("resolver failed");
hoisted.dispatchReplyFromConfigMock.mockImplementation(
async (params: DispatchReplyFromConfigParams) => {
params.dispatcher.sendFinalReply({ text: "queued final" });
throw error;
},
);
await expect(
dispatchWithDeliveries(buildForegroundCtx(), deliveries, {
deliver: async () => ({ visibleReplySent: false }),
onSettled: () => {
settled = true;
return { visibleReplySent: true };
},
}),
).rejects.toBe(error);
expect(settled).toBe(true);
});
it("keeps concurrent foreground finals isolated for different targets sharing a session", async () => {
const deliveries: Delivery[] = [];
const firstStarted = createDeferred<void>();

View File

@@ -9,7 +9,9 @@ import {
measureDiagnosticsTimelineSpan,
measureDiagnosticsTimelineSpanSync,
} from "../infra/diagnostics-timeline.js";
import { isOutboundDeliveryError } from "../infra/outbound/deliver-types.js";
import { logMessageReceived } from "../logging/diagnostic.js";
import { hasOutboundReplyContent } from "../plugin-sdk/reply-payload.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import type { SilentReplyConversationType } from "../shared/silent-reply-policy.js";
import {
@@ -34,7 +36,10 @@ import type { GetReplyOptions, ReplyPayload } from "./types.js";
type ForegroundReplyFenceState = {
generation: number;
visibleDeliveryGeneration: number;
activeDispatches: number;
activeGenerations: Map<number, number>;
waiters: Set<() => void>;
};
type ForegroundReplyFenceSnapshot = {
@@ -87,10 +92,17 @@ function beginForegroundReplyFence(
}
const state = foregroundReplyFenceByKey.get(key) ?? {
generation: 0,
visibleDeliveryGeneration: 0,
activeDispatches: 0,
activeGenerations: new Map<number, number>(),
waiters: new Set<() => void>(),
};
state.generation += 1;
state.activeDispatches += 1;
state.activeGenerations.set(
state.generation,
(state.activeGenerations.get(state.generation) ?? 0) + 1,
);
foregroundReplyFenceByKey.set(key, state);
return {
key,
@@ -98,21 +110,142 @@ function beginForegroundReplyFence(
};
}
function isForegroundReplyFenceSuperseded(
snapshot: ForegroundReplyFenceSnapshot | undefined,
function notifyForegroundReplyFenceWaiters(state: ForegroundReplyFenceState): void {
const waiters = [...state.waiters];
state.waiters.clear();
for (const resolve of waiters) {
resolve();
}
}
function hasNewerActiveForegroundReplyFenceGeneration(
state: ForegroundReplyFenceState,
generation: number,
): boolean {
return Boolean(
snapshot &&
(foregroundReplyFenceByKey.get(snapshot.key)?.generation ?? 0) !== snapshot.generation,
for (const [activeGeneration, count] of state.activeGenerations) {
if (activeGeneration > generation && count > 0) {
return true;
}
}
return false;
}
async function shouldCancelForegroundReplyDelivery(
snapshot: ForegroundReplyFenceSnapshot | undefined,
): Promise<boolean> {
if (!snapshot) {
return false;
}
while (true) {
const state = foregroundReplyFenceByKey.get(snapshot.key);
if (!state) {
return false;
}
if (state.visibleDeliveryGeneration > snapshot.generation) {
return true;
}
if (!hasNewerActiveForegroundReplyFenceGeneration(state, snapshot.generation)) {
return false;
}
await new Promise<void>((resolve) => {
state.waiters.add(resolve);
});
}
}
function markForegroundReplyFenceVisibleDelivery(
snapshot: ForegroundReplyFenceSnapshot | undefined,
payload: ReplyPayload,
deliveryResult: unknown,
): void {
if (!snapshot || !hasOutboundReplyContent(payload, { trimText: true })) {
return;
}
if (isExplicitlyNonVisibleDelivery(deliveryResult)) {
return;
}
markForegroundReplyFenceVisibleDeliveryGeneration(snapshot);
}
function markForegroundReplyFenceVisibleDeliveryGeneration(
snapshot: ForegroundReplyFenceSnapshot | undefined,
): void {
if (!snapshot) {
return;
}
const state = foregroundReplyFenceByKey.get(snapshot.key);
if (!state) {
return;
}
state.visibleDeliveryGeneration = Math.max(state.visibleDeliveryGeneration, snapshot.generation);
notifyForegroundReplyFenceWaiters(state);
}
function isExplicitlyNonVisibleDelivery(deliveryResult: unknown): boolean {
return (
typeof deliveryResult === "object" &&
deliveryResult !== null &&
!Array.isArray(deliveryResult) &&
"visibleReplySent" in deliveryResult &&
(deliveryResult as { visibleReplySent?: unknown }).visibleReplySent === false
);
}
function isExplicitlyVisibleDelivery(deliveryResult: unknown): boolean {
return (
typeof deliveryResult === "object" &&
deliveryResult !== null &&
!Array.isArray(deliveryResult) &&
(deliveryResult as { visibleReplySent?: unknown }).visibleReplySent === true
);
}
function isVisiblePartialDeliveryError(error: unknown): boolean {
if (isOutboundDeliveryError(error)) {
return error.sentBeforeError;
}
return (
typeof error === "object" &&
error !== null &&
!Array.isArray(error) &&
((error as { visibleReplySent?: unknown }).visibleReplySent === true ||
(error as { sentBeforeError?: unknown }).sentBeforeError === true)
);
}
async function runForegroundReplyFenceSettledDelivery(
snapshot: ForegroundReplyFenceSnapshot | undefined,
onSettled: (() => unknown) | undefined,
): Promise<void> {
if (!onSettled) {
return;
}
try {
const deliveryResult = await onSettled();
if (isExplicitlyVisibleDelivery(deliveryResult)) {
markForegroundReplyFenceVisibleDeliveryGeneration(snapshot);
}
} catch (err: unknown) {
if (isVisiblePartialDeliveryError(err)) {
markForegroundReplyFenceVisibleDeliveryGeneration(snapshot);
}
throw err;
}
}
function endForegroundReplyFence(snapshot: ForegroundReplyFenceSnapshot): void {
const state = foregroundReplyFenceByKey.get(snapshot.key);
if (!state) {
return;
}
const activeGenerationCount = state.activeGenerations.get(snapshot.generation) ?? 0;
if (activeGenerationCount <= 1) {
state.activeGenerations.delete(snapshot.generation);
} else {
state.activeGenerations.set(snapshot.generation, activeGenerationCount - 1);
}
state.activeDispatches -= 1;
notifyForegroundReplyFenceWaiters(state);
if (state.activeDispatches <= 0) {
foregroundReplyFenceByKey.delete(snapshot.key);
}
@@ -306,21 +439,39 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
const beforeDeliver: ReplyDispatchBeforeDeliver | undefined =
foregroundReplyFence || configuredBeforeDeliver
? async (payload, info) => {
if (isForegroundReplyFenceSuperseded(foregroundReplyFence)) {
if (await shouldCancelForegroundReplyDelivery(foregroundReplyFence)) {
return null;
}
const deliverPayload = configuredBeforeDeliver
? await configuredBeforeDeliver(payload, info)
: payload;
if (!deliverPayload || isForegroundReplyFenceSuperseded(foregroundReplyFence)) {
if (
!deliverPayload ||
(await shouldCancelForegroundReplyDelivery(foregroundReplyFence))
) {
return null;
}
return deliverPayload;
}
: undefined;
const deliver: ReplyDispatcherWithTypingOptions["deliver"] = async (payload, info) => {
try {
const result = await params.dispatcherOptions.deliver(payload, info);
markForegroundReplyFenceVisibleDelivery(foregroundReplyFence, payload, result);
return result;
} catch (err: unknown) {
if (isVisiblePartialDeliveryError(err)) {
markForegroundReplyFenceVisibleDelivery(foregroundReplyFence, payload, {
visibleReplySent: true,
});
}
throw err;
}
};
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
createReplyDispatcherWithTyping({
...params.dispatcherOptions,
deliver,
beforeDeliver,
silentReplyContext: params.dispatcherOptions.silentReplyContext ?? silentReplyContext,
});
@@ -336,11 +487,18 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
},
});
} finally {
if (foregroundReplyFence) {
endForegroundReplyFence(foregroundReplyFence);
try {
await runForegroundReplyFenceSettledDelivery(
foregroundReplyFence,
params.dispatcherOptions.onSettled,
);
} finally {
if (foregroundReplyFence) {
endForegroundReplyFence(foregroundReplyFence);
}
markRunComplete();
markDispatchIdle();
}
markRunComplete();
markDispatchIdle();
}
}

View File

@@ -81,6 +81,7 @@ export type ReplyDispatcherWithTypingOptions = Omit<ReplyDispatcherOptions, "onI
typingCallbacks?: TypingCallbacks;
onReplyStart?: () => Promise<void> | void;
onIdle?: () => void;
onSettled?: () => unknown;
/** Called when the typing controller is cleaned up (e.g., on NO_REPLY). */
onCleanup?: () => void;
};

View File

@@ -215,6 +215,7 @@ describe("durable inbound reply delivery", () => {
}),
});
expect(result).toEqual({ status: "failed", error });
expect(result).toEqual({ status: "failed", error, sentBeforeError: true });
expect(error).toMatchObject({ sentBeforeError: true, visibleReplySent: true });
});
});

View File

@@ -48,7 +48,7 @@ export type DurableInboundReplyDeliveryResult =
}
| { status: "handled_visible"; delivery: ChannelDeliveryResult }
| { status: "handled_no_send"; reason: "no_visible_result"; delivery: ChannelDeliveryResult }
| { status: "failed"; error: unknown };
| { status: "failed"; error: unknown; sentBeforeError?: true };
function resolveDeliveryTarget(params: DurableInboundReplyDeliveryParams): string | undefined {
return (
@@ -106,10 +106,23 @@ export function throwIfDurableInboundReplyDeliveryFailed(
result: DurableInboundReplyDeliveryResult,
): void {
if (result.status === "failed") {
throw result.error;
throw result.sentBeforeError === true
? markDurableInboundReplyDeliveryErrorVisible(result.error)
: result.error;
}
}
function markDurableInboundReplyDeliveryErrorVisible(error: unknown): unknown {
if (typeof error === "object" && error !== null && Object.isExtensible(error)) {
Object.assign(error, { sentBeforeError: true, visibleReplySent: true });
return error;
}
const visibleError = new Error("visible durable reply delivery failed", { cause: error });
Object.assign(visibleError, { sentBeforeError: true, visibleReplySent: true });
return visibleError;
}
export async function deliverInboundReplyWithMessageSendContext(
params: DurableInboundReplyDeliveryParams,
): Promise<DurableInboundReplyDeliveryResult> {
@@ -192,7 +205,11 @@ export async function deliverInboundReplyWithMessageSendContext(
return { status: "failed" as const, error: send.error };
}
if (send.status === "partial_failed") {
return { status: "failed" as const, error: send.error };
return {
status: "failed" as const,
error: markDurableInboundReplyDeliveryErrorVisible(send.error),
sentBeforeError: true,
};
}
const delivery = createChannelDeliveryResultFromReceipt({

View File

@@ -401,6 +401,78 @@ describe("channel turn kernel", () => {
expect(deliver).not.toHaveBeenCalled();
});
it("preserves durable partial-send visibility when generic delivery throws", async () => {
const error = new Error("second chunk failed");
sendDurableMessageBatch.mockResolvedValueOnce({
status: "partial_failed",
results: [{ channel: "telegram", messageId: "tg-1" }],
receipt: {
primaryPlatformMessageId: "tg-1",
platformMessageIds: ["tg-1"],
parts: [{ platformMessageId: "tg-1", kind: "text", index: 0 }],
sentAt: 1,
},
error,
sentBeforeError: true,
});
const deliver = vi.fn(async () => ({ messageIds: ["legacy-1"], visibleReplySent: true }));
const dispatchReplyWithBufferedBlockDispatcher = createDispatch();
await expect(
dispatchAssembledChannelTurn({
cfg,
channel: "telegram",
accountId: "acct",
agentId: "main",
routeSessionKey: "agent:main:telegram:peer",
storePath: "/tmp/sessions.json",
ctxPayload: createCtx({ To: "123", OriginatingTo: "123" }),
recordInboundSession: createRecordInboundSession(),
dispatchReplyWithBufferedBlockDispatcher,
delivery: { deliver, durable: { replyToMode: "first" } },
}),
).rejects.toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
expect(deliver).not.toHaveBeenCalled();
});
it("preserves visible delivery when post-delivery observers throw", async () => {
const error = new Error("observer failed");
const deliver = vi.fn(async () => ({ messageIds: ["local-1"], visibleReplySent: true }));
const dispatchReplyWithBufferedBlockDispatcher = createDispatch();
await expect(
dispatchAssembledChannelTurn({
cfg,
channel: "telegram",
accountId: "acct",
agentId: "main",
routeSessionKey: "agent:main:telegram:peer",
storePath: "/tmp/sessions.json",
ctxPayload: createCtx({ To: "123", OriginatingTo: "123" }),
recordInboundSession: createRecordInboundSession(),
dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver,
durable: false,
onDelivered: () => {
throw error;
},
},
}),
).rejects.toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
expect(error).toMatchObject({
sentBeforeError: true,
visibleReplySent: true,
});
});
it("returns custom delivery result to the buffered dispatcher", async () => {
let deliveredResult: unknown;
const dispatchReplyWithBufferedBlockDispatcher = vi.fn(

View File

@@ -263,6 +263,47 @@ function resolveObserveOnlyDispatchResult<TDispatchResult>(
}) as TDispatchResult;
}
function isExplicitlyNonVisibleChannelDelivery(result: unknown): boolean {
return (
typeof result === "object" &&
result !== null &&
!Array.isArray(result) &&
(result as { visibleReplySent?: unknown }).visibleReplySent === false
);
}
function markChannelDeliveryErrorVisible(error: unknown): unknown {
if (typeof error === "object" && error !== null && !Array.isArray(error)) {
try {
Object.assign(error, { sentBeforeError: true, visibleReplySent: true });
return error;
} catch {
// Fall back to a wrapper when a platform error object is non-extensible.
}
}
const visibleError = new Error("visible channel reply delivery failed", { cause: error });
Object.assign(visibleError, { sentBeforeError: true, visibleReplySent: true });
return visibleError;
}
async function runChannelDeliveryObserver(params: {
onDelivered: ChannelEventDeliveryAdapter["onDelivered"] | undefined;
payload: ReplyPayload;
info: Parameters<NonNullable<ChannelEventDeliveryAdapter["onDelivered"]>>[1];
result: Parameters<NonNullable<ChannelEventDeliveryAdapter["onDelivered"]>>[2];
}): Promise<void> {
if (!params.onDelivered) {
return;
}
try {
await params.onDelivered(params.payload, params.info, params.result);
} catch (error: unknown) {
throw isExplicitlyNonVisibleChannelDelivery(params.result)
? error
: markChannelDeliveryErrorVisible(error);
}
}
function resolveBotLoopProtectionDrop<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): ChannelTurnResult<TDispatchResult> | undefined {
@@ -358,12 +399,22 @@ export async function dispatchAssembledChannelTurn(
});
throwIfDurableInboundReplyDeliveryFailed(durable);
if (isDurableInboundReplyDeliveryHandled(durable)) {
await params.delivery.onDelivered?.(preparedPayload, info, durable.delivery);
await runChannelDeliveryObserver({
onDelivered: params.delivery.onDelivered,
payload: preparedPayload,
info,
result: durable.delivery,
});
return durable.delivery;
}
}
const result = await params.delivery.deliver(preparedPayload, info);
await params.delivery.onDelivered?.(preparedPayload, info, result);
await runChannelDeliveryObserver({
onDelivered: params.delivery.onDelivered,
payload: preparedPayload,
info,
result,
});
return result;
},
onError: params.delivery.onError,

View File

@@ -222,6 +222,59 @@ describe("recordInboundSessionAndDispatchReply", () => {
expect(deliver).not.toHaveBeenCalled();
});
it("returns durable no-send results through the SDK compatibility deliverer", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_no_send",
reason: "no_visible_result",
delivery: {
messageIds: [],
visibleReplySent: false,
},
});
const recordInboundSession = vi.fn(async () => undefined) as unknown as RecordInboundSession;
const deliver = vi.fn(async () => undefined);
let deliveryResult: unknown;
const dispatchReplyWithBufferedBlockDispatcher = vi.fn(async (params) => {
deliveryResult = await params.dispatcherOptions.deliver(
{ text: "cancelled durable" },
{ kind: "final" },
);
return {
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
};
}) as DispatchReplyWithBufferedBlockDispatcher;
await recordInboundSessionAndDispatchReply({
cfg: {} as OpenClawConfig,
channel: "telegram",
accountId: "default",
agentId: "main",
routeSessionKey: "agent:main:telegram:peer",
storePath: "/tmp/sessions.json",
ctxPayload: {
Body: "body",
RawBody: "body",
CommandBody: "body",
From: "sender",
To: "123",
OriginatingTo: "123",
SessionKey: "agent:main:telegram:peer",
Provider: "telegram",
Surface: "telegram",
} as FinalizedMsgContext,
recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher,
deliver,
durable: { replyToMode: "first" },
onRecordError: vi.fn(),
onDispatchError: vi.fn(),
});
expect(deliveryResult).toMatchObject({ visibleReplySent: false });
expect(deliver).not.toHaveBeenCalled();
});
it("exports shared visible reply dispatch helpers", () => {
expect(hasVisibleInboundReplyDispatch(undefined)).toBe(false);
expect(

View File

@@ -243,10 +243,10 @@ export async function recordChannelMessageReplyDispatch(
});
throwIfDurableInboundReplyDeliveryFailed(durable);
if (isDurableInboundReplyDeliveryHandled(durable)) {
return;
return durable.delivery;
}
}
await params.deliver(normalized);
return await params.deliver(normalized);
};
await runPreparedChannelTurn({