Files
openclaw/src/auto-reply/reply-payload.ts
sandieman2 c67dc59b02 fix(reply): deliver final reply when queued follow-up claims session; scope dedupe to routed thread (#90943)
* fix(reply): deliver final reply when queued follow-up claims session; scope dedupe to routed thread

Two core bugs caused composed replies to be silently dropped (no delivery,
no error) when a second message arrived in the same thread mid-run:

1. dispatch-from-config: ensureDispatchReplyOperation only kept the
   dispatch-owned operation authoritative while it had no result. Once
   runReplyAgent completed the operation to drain queued follow-ups, a
   second same-thread inbound could claim the session and the first final
   reply would try to re-acquire the lane instead of finishing delivery,
   deadlocking behind the queued work. Keep the dispatch-owned operation
   authoritative through final delivery.

2. reply-payloads-dedupe: messaging-tool reply dedupe compared only the
   channel target, not the routed thread, so a send in one thread could
   suppress a later reply in a different thread. Thread the routed thread
   id through buildReplyPayloads + follow-up delivery and only fall back to
   channel-only matching for providers without a thread-aware suppression
   matcher when neither side carries thread evidence.

Adds regression tests; existing Telegram topic-suppression behavior is
preserved by gating the thread guard to providers lacking a plugin matcher.

* fix(reply): preserve threaded message delivery evidence

* fix(reply): dedupe final payloads by delivery route

* fix(slack): preserve native send thread evidence

* fix(reply): preserve explicit reply thread evidence

* fix(reply): align explicit reply route dedupe

* fix(reply): preserve delivery lane through final dispatch

* fix(mattermost): preserve threaded tool send routes

* chore(plugin-sdk): refresh API baseline

* fix(reply): align final delivery route dedupe

* fix(reply): gate followups on final delivery

* fix(reply): keep send receipts private

* fix(reply): infer implicit message provider

* fix(reply): align routed threading policy

* fix(reply): preserve queued delivery context

* fix(reply): hydrate queued system event routes

* fix(reply): hydrate queued execution routes

* fix(reply): scope final delivery barriers

* fix(slack): preserve DM target aliases

* fix(reply): mirror resolved source thread routes

* fix(mattermost): retain delayed delivery barrier

* fix(codex): separate message routing from tool policy

* fix(reply): consume normalized Slack DM targets once

* fix(slack): remove stale target alias

* style(reply): satisfy changed lint gates

* fix(mattermost): preserve explicit reply targets

* test: align Slack reply branch checks

* fix(reply): persist overflow summaries to admitted session

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
2026-06-14 09:11:05 -07:00

254 lines
9.5 KiB
TypeScript

/** Reply payload contracts and metadata helpers shared by dispatch and channel renderers. */
import type { ReplyToMode } from "../config/types.base.js";
import type {
InteractiveReply,
MessagePresentation,
ReplyPayloadDelivery,
} from "../interactive/payload.js";
/** Channel-agnostic assistant reply payload. */
export type ReplyPayload = {
text?: string;
mediaUrl?: string;
mediaUrls?: string[];
/** Internal-only trust signal for gateway webchat local media embedding. */
trustedLocalMedia?: boolean;
/** Treat media as live-only content and avoid persisting the underlying media reference. */
sensitiveMedia?: boolean;
/** Channel-agnostic rich presentation. Core degrades or asks the channel renderer to map it. */
presentation?: MessagePresentation;
/** Channel-agnostic delivery preferences, e.g. pin the sent message when supported. */
delivery?: ReplyPayloadDelivery;
/**
* @deprecated Use presentation.
*
* Internal legacy representation used by existing approval/reply helpers during migration.
*/
interactive?: InteractiveReply;
btw?: {
question: string;
};
replyToId?: string;
replyToTag?: boolean;
/** True when [[reply_to_current]] was present but not yet mapped to a message id. */
replyToCurrent?: boolean;
/** Send audio as voice message (bubble) instead of audio file. Defaults to false. */
audioAsVoice?: boolean;
/**
* Text synthesized into an audio-only TTS payload. Exposed to hooks for
* archival/search use when no visible channel text is sent.
*/
spokenText?: string;
/**
* Marks a TTS media payload as supplemental audio for assistant text that is
* already visible through streaming or transcript projection.
*/
ttsSupplement?: ReplyPayloadTtsSupplement;
isError?: boolean;
/** Marks this payload as a reasoning/thinking block. Channels that do not
* have a dedicated reasoning lane (e.g. WhatsApp, web) should suppress it. */
isReasoning?: boolean;
/** Reasoning stream text is a complete replacement snapshot, not a delta. */
isReasoningSnapshot?: boolean;
/** Marks this payload as a compaction status notice (start/end).
* Should be excluded from TTS transcript accumulation so compaction
* status lines are not synthesised into the spoken assistant reply. */
isCompactionNotice?: boolean;
/** Marks this payload as a model-fallback transition/recovery notice. */
isFallbackNotice?: boolean;
/** Marks this payload as transient status, not assistant answer content. */
isStatusNotice?: boolean;
/** Channel-specific payload data (per-channel envelope). */
channelData?: Record<string, unknown>;
};
/** Metadata for audio-only media that supplements already-visible assistant text. */
export type ReplyPayloadTtsSupplement = {
spokenText: string;
visibleTextAlreadyDelivered?: boolean;
};
/** Reply policy facts that provider adapters use to resolve the final transport route. */
export type ReplyDeliveryContext = {
chatType?: "direct" | "group" | "channel" | null;
replyToMode: ReplyToMode;
};
export const REPLY_MEDIA_FAILURE_WARNING = "⚠️ Media failed.";
/** Appends the standard media failure warning without duplicating it. */
export function appendReplyMediaFailureWarning(text: string | undefined): string {
if (!text?.trim()) {
return REPLY_MEDIA_FAILURE_WARNING;
}
if (text.includes(REPLY_MEDIA_FAILURE_WARNING)) {
return text;
}
return `${text}\n${REPLY_MEDIA_FAILURE_WARNING}`;
}
function normalizeTtsSupplementSpokenText(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value : undefined;
}
function hasReplyPayloadMedia(payload: Pick<ReplyPayload, "mediaUrl" | "mediaUrls">): boolean {
return Boolean(payload.mediaUrl?.trim() || payload.mediaUrls?.some((url) => url.trim()));
}
/** Returns normalized TTS supplement metadata only when the payload has media to carry it. */
export function getReplyPayloadTtsSupplement(
payload: Pick<ReplyPayload, "mediaUrl" | "mediaUrls" | "ttsSupplement">,
): ReplyPayloadTtsSupplement | undefined {
const spokenText = normalizeTtsSupplementSpokenText(payload.ttsSupplement?.spokenText);
if (!spokenText || !hasReplyPayloadMedia(payload)) {
return undefined;
}
return {
spokenText,
...(payload.ttsSupplement?.visibleTextAlreadyDelivered === true
? { visibleTextAlreadyDelivered: true }
: {}),
};
}
/** Returns true when the payload is a valid TTS supplement media payload. */
export function isReplyPayloadTtsSupplement(
payload: Pick<ReplyPayload, "mediaUrl" | "mediaUrls" | "ttsSupplement">,
): boolean {
return Boolean(getReplyPayloadTtsSupplement(payload));
}
/** Marks a reply payload as supplemental TTS media while preserving the original shape. */
export function markReplyPayloadAsTtsSupplement<T extends ReplyPayload>(
payload: T,
spokenText: string = payload.spokenText ?? payload.text ?? "",
options?: { visibleTextAlreadyDelivered?: boolean },
): T {
const normalizedSpokenText = normalizeTtsSupplementSpokenText(spokenText);
if (!normalizedSpokenText) {
return payload;
}
return {
...payload,
spokenText: normalizedSpokenText,
ttsSupplement: {
spokenText: normalizedSpokenText,
...(options?.visibleTextAlreadyDelivered === true
? { visibleTextAlreadyDelivered: true }
: {}),
},
};
}
/** Removes visible-only fields from a payload that should be delivered as TTS supplement media. */
export function buildTtsSupplementMediaPayload(payload: ReplyPayload): ReplyPayload {
const supplement = getReplyPayloadTtsSupplement(payload);
if (!supplement) {
return payload;
}
const {
text: _text,
presentation: _presentation,
interactive: _interactive,
btw: _btw,
...mediaPayload
} = payload;
return {
...mediaPayload,
spokenText: supplement.spokenText,
ttsSupplement: supplement,
};
}
/** WeakMap-backed metadata attached to payload objects without changing wire shape. */
export type ReplyPayloadMetadata = {
assistantMessageIndex?: number;
/** The runtime owns the transcript decision for this assistant payload. */
assistantTranscriptOwned?: boolean;
/** replyToId existed before reply threading could inject an implicit target. */
replyToIdExplicit?: boolean;
/** Canonical reply policy used by both message-tool dedupe and final delivery routing. */
replyDelivery?: ReplyDeliveryContext;
/** Route identity that produced replyDelivery, used to reject stale cross-route policy. */
replyDeliverySource?: {
channel: string;
accountId?: string;
};
/**
* Internal OpenClaw notices generated after a runtime/provider failure are
* not assistant source replies. Dispatch may deliver them even when normal
* assistant source replies are message-tool-only; sendPolicy deny still wins.
*/
deliverDespiteSourceReplySuppression?: boolean;
/**
* A message-tool reply to the active internal UI source. The final payload is
* still the live delivery vehicle; this mirror makes the reply durable for
* chat.history and page reloads without turning the internal UI into an
* outbound channel.
*/
sourceReplyTranscriptMirror?: {
sessionKey: string;
agentId?: string;
text?: string;
mediaUrls?: string[];
idempotencyKey?: string;
};
beforeAgentRunBlocked?: boolean;
/** Warning synthesized from an observed tool error after the run produced assistant output. */
nonTerminalToolErrorWarning?: boolean;
};
const replyPayloadMetadata = new WeakMap<object, ReplyPayloadMetadata>();
/** Adds internal metadata to a reply payload object. */
export function setReplyPayloadMetadata<T extends object>(
payload: T,
metadata: ReplyPayloadMetadata,
): T {
const previous = replyPayloadMetadata.get(payload);
replyPayloadMetadata.set(payload, { ...previous, ...metadata });
return payload;
}
/** Reads internal metadata attached to a reply payload object. */
export function getReplyPayloadMetadata(payload: object): ReplyPayloadMetadata | undefined {
return replyPayloadMetadata.get(payload);
}
/** Returns true when a payload is the synthesized warning for a non-terminal tool error. */
export function isReplyPayloadNonTerminalToolErrorWarning(payload: object): boolean {
return getReplyPayloadMetadata(payload)?.nonTerminalToolErrorWarning === true;
}
/** Copies internal payload metadata when cloning or transforming payload objects. */
export function copyReplyPayloadMetadata<T extends object>(source: object, payload: T): T {
const metadata = getReplyPayloadMetadata(source);
return metadata ? setReplyPayloadMetadata(payload, metadata) : payload;
}
/** Marks a notice payload as deliverable even when normal source replies are suppressed. */
export function markReplyPayloadForSourceSuppressionDelivery<T extends object>(payload: T): T {
return setReplyPayloadMetadata(payload, {
deliverDespiteSourceReplySuppression: true,
});
}
export function markCommandReplyForDelivery(
reply: ReplyPayload | ReplyPayload[] | undefined,
): ReplyPayload | ReplyPayload[] | undefined {
if (!reply) {
return reply;
}
if (Array.isArray(reply)) {
return reply.map((payload) => markReplyPayloadForSourceSuppressionDelivery(payload));
}
return markReplyPayloadForSourceSuppressionDelivery(reply);
}
/** Returns true for internal status/notice payloads, not assistant answer content. */
export function isReplyPayloadStatusNotice(
payload: Pick<ReplyPayload, "isCompactionNotice" | "isFallbackNotice" | "isStatusNotice">,
): boolean {
return Boolean(payload.isCompactionNotice || payload.isFallbackNotice || payload.isStatusNotice);
}