mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:10:44 +00:00
fix(bluebubbles): add opt-in coalesceSameSenderDms for split-send DMs
Two distinct user sends to a DM — a command followed by a pasted URL that iMessage renders as a standalone URL-balloon message — are delivered by Apple/BlueBubbles as two separate webhooks ~0.8-2.0 s apart. Without coalescing, the agent replies to the command alone before the URL arrives (the dump skill sees an empty payload), and the URL lands as a second queued turn. Opt-in behind channels.bluebubbles.coalesceSameSenderDms (default false). When set, DM messages with no associatedMessageGuid hash to dm:<chat>:<sender> so the debounce window merges text + URL-balloon into one merged agent turn. Group chats and legacy text+balloon pairs linked via associatedMessageGuid keep per-message keys. The default inbound debounce window widens from 500 ms to 2500 ms when the flag is on and no explicit messages.inbound.byChannel.bluebubbles is set, covering Apple's observed split-send cadence. Merged output is bounded (<=4000 chars text with an explicit truncation marker, <=20 attachments, first-plus-latest sampling beyond 10 source entries). Every source messageId folded into the merged view is committed to the inbound dedupe store after processing, so a later BlueBubbles MessagePoller replay of any individual source event is recognized as a duplicate. Tests (506 BlueBubbles tests passing, multiple new cases covering the on/off matrix, control-command override, orphan URL-balloon, group-chat preservation, default-window widening, replay-of-any-source-id dedupe, and the 25-message flood bound). Smoke tested end-to-end against live BlueBubbles webhook traffic: Dump+URL composed as one iMessage -> BB dispatches two webhooks ~1 s apart -> debouncer coalesces both under dm:<chat>:<sender> -> dump skill runs on merged payload in one turn. Also updates docs/channels/bluebubbles.md with a scenarios table, enablement guide, trade-offs, and three-layer troubleshooting checklist; docs/concepts/messages.md cross-references the control-command debounce exception.
This commit is contained in:
@@ -75,6 +75,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Codex/app-server: release the session lane when a downstream consumer throws while draining the `turn/completed` notification, so follow-up messages after a Codex plugin reply stop queueing behind a stale lane lock. Fixes #67996. (#69072) Thanks @ayeshakhalid192007-dev.
|
||||
- Codex/app-server: default approval handling to `on-request` so Codex harness sessions do not start with overly permissive tool approvals. (#68721) Thanks @Lucenx9.
|
||||
- Cron/delivery: keep isolated cron chat delivery tools available, resolve `channel: "last"` targets from the gateway, show delivery previews in `cron list/show`, and avoid duplicate fallback sends after direct message-tool delivery. (#69587) Thanks @obviyus.
|
||||
- BlueBubbles: add opt-in `channels.bluebubbles.coalesceSameSenderDms` so a single composed message with text + pasted URL (which Apple splits into two webhooks ~0.8-2.0 s apart) arrives as one agent turn instead of two. When enabled, DM messages that are not linked via `associatedMessageGuid` hash to `dm:<chat>:<sender>` so the inbound debounce window merges them into a single merged turn — including URL-preview balloon events, DM control-command sends (which normally bypass debouncing), and rapid same-sender follow-ups. The default inbound debounce window widens from 500 ms to 2500 ms when the flag is set without an explicit `messages.inbound.byChannel.bluebubbles`, covering the observed Apple split-send cadence. Every source `messageId` folded into the merged view is committed to the inbound dedupe store after processing, so a later MessagePoller replay of any individual source event is recognized as a duplicate. Merged output is bounded (≤4000 chars text with an explicit `…[truncated]` marker, ≤20 attachments, first-plus-latest sampling beyond 10 source entries) so a rapid-fire flood inside the window cannot amplify the downstream prompt. Group chats and existing text+balloon follow-ups continue to key per-message. See [Coalescing split-send DMs](https://docs.openclaw.ai/channels/bluebubbles#coalescing-split-send-dms-command--url-in-one-composition) for scenarios, tuning, and troubleshooting. (#69258) Thanks @omarshahine.
|
||||
- Cron/Telegram: key isolated direct-delivery dedupe to each cron execution instead of the reused session id, so recurring Telegram announce runs no longer report delivered while silently skipping later sends. (#69000) Thanks @obviyus.
|
||||
- Models/Kimi: default bundled Kimi thinking to off and normalize Anthropic-compatible `thinking` payloads so stale session `/think` state no longer silently re-enables reasoning on Kimi runs. (#68907) Thanks @frankekn.
|
||||
- Control UI/cron: keep the runtime-only `last` delivery sentinel from being materialized into persisted cron delivery and failure-alert channel configs when jobs are created or edited. (#68829) Thanks @tianhaocui.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
e3a16ceb9e933c5b707b717c18a1d9d50f98e687a98e6c35f4f3a290f7036a62 config-baseline.json
|
||||
ae1ab87635e7bf613c84fee04425af901ceeb67fb5dbcf1c74095aa00a59ee88 config-baseline.core.json
|
||||
e239cc20f20f8d0172812bc0ad3ee6df52da88e2e2702e3d03a47e01561132ae config-baseline.channel.json
|
||||
8fb3a1cf5fe56ab8fc2cb46341c3403aed32b0d1f0aaeac0e96cd3599db4f06e config-baseline.plugin.json
|
||||
cc473bcd00e63c3d3f351e4de1ceb390aae88dddce8616929e98a9d94412b1b9 config-baseline.json
|
||||
7956c319e82d288d496a51cb2ff4485ab72ef4900cb089f99e1df8b9ef3bfb73 config-baseline.core.json
|
||||
cd467228990cdbdebde2fa87d8b1384b94c149e791f2e67250bf17b13162d4a1 config-baseline.channel.json
|
||||
17a73724e5082b3aa846c220d38115916fb6003887439e6794510a99fc73f7de config-baseline.plugin.json
|
||||
|
||||
@@ -393,6 +393,103 @@ Use full IDs for durable automations and storage:
|
||||
|
||||
See [Configuration](/gateway/configuration) for template variables.
|
||||
|
||||
## Coalescing split-send DMs (command + URL in one composition)
|
||||
|
||||
When a user types a command and a URL together in iMessage — e.g. `Dump https://example.com/article` — Apple splits the send into **two separate webhook deliveries**:
|
||||
|
||||
1. A text message (`"Dump"`).
|
||||
2. A URL-preview balloon (`"https://..."`) with OG-preview images as attachments.
|
||||
|
||||
The two webhooks arrive at OpenClaw ~0.8-2.0 s apart on most setups. Without coalescing, the agent receives the command alone on turn 1, replies (often "send me the URL"), and only sees the URL on turn 2 — at which point the command context is already lost.
|
||||
|
||||
`channels.bluebubbles.coalesceSameSenderDms` opts a DM into merging consecutive same-sender webhooks into a single agent turn. Group chats continue to key per-message so multi-user turn structure is preserved.
|
||||
|
||||
### When to enable
|
||||
|
||||
Enable when:
|
||||
|
||||
- You ship skills that expect `command + payload` in one message (dump, paste, save, queue, etc.).
|
||||
- Your users paste URLs, images, or long content alongside commands.
|
||||
- You can accept the added DM turn latency (see below).
|
||||
|
||||
Leave disabled when:
|
||||
|
||||
- You need minimum command latency for single-word DM triggers.
|
||||
- All your flows are one-shot commands without payload follow-ups.
|
||||
|
||||
### Enabling
|
||||
|
||||
```json5
|
||||
{
|
||||
channels: {
|
||||
bluebubbles: {
|
||||
coalesceSameSenderDms: true, // opt in (default: false)
|
||||
},
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
With the flag on and no explicit `messages.inbound.byChannel.bluebubbles`, the debounce window widens to **2500 ms** (the default for non-coalescing is 500 ms). The wider window is required — Apple's split-send cadence of 0.8-2.0 s does not fit in the tighter default.
|
||||
|
||||
To tune the window yourself:
|
||||
|
||||
```json5
|
||||
{
|
||||
messages: {
|
||||
inbound: {
|
||||
byChannel: {
|
||||
// 2500 ms works for most setups; raise to 4000 ms if your Mac is slow
|
||||
// or under memory pressure (observed gap can stretch past 2 s then).
|
||||
bluebubbles: 2500,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
### Trade-offs
|
||||
|
||||
- **Added latency for DM control commands.** With the flag on, DM control-command messages (like `Dump`, `Save`, etc.) now wait up to the debounce window before dispatching, in case a payload webhook is coming. Group-chat commands keep instant dispatch.
|
||||
- **Merged output is bounded** — merged text caps at 4000 chars with an explicit `…[truncated]` marker; attachments cap at 20; source entries cap at 10 (first-plus-latest retained beyond that). Every source `messageId` still reaches inbound-dedupe so a later MessagePoller replay of any individual event is recognized as a duplicate.
|
||||
- **Opt-in, per-channel.** Other channels (Telegram, WhatsApp, Slack, …) are unaffected.
|
||||
|
||||
### Scenarios and what the agent sees
|
||||
|
||||
| User composes | Apple delivers | Flag off (default) | Flag on + 2500 ms window |
|
||||
| ------------------------------------------------------------------ | ------------------------- | --------------------------------------- | ----------------------------------------------------------------------- |
|
||||
| `Dump https://example.com` (one send) | 2 webhooks ~1 s apart | Two agent turns: "Dump" alone, then URL | One turn: merged text `Dump https://example.com` |
|
||||
| `Save this 📎image.jpg caption` (attachment + text) | 2 webhooks | Two turns | One turn: text + image |
|
||||
| `/status` (standalone command) | 1 webhook | Instant dispatch | **Wait up to window, then dispatch** |
|
||||
| URL pasted alone | 1 webhook | Instant dispatch | Instant dispatch (only one entry in bucket) |
|
||||
| Text + URL sent as two deliberate separate messages, minutes apart | 2 webhooks outside window | Two turns | Two turns (window expires between them) |
|
||||
| Rapid flood (>10 small DMs inside window) | N webhooks | N turns | One turn, bounded output (first + latest, text/attachment caps applied) |
|
||||
|
||||
### Split-send coalescing troubleshooting
|
||||
|
||||
If the flag is on and split-sends still arrive as two turns, check each layer:
|
||||
|
||||
1. **Config actually loaded.**
|
||||
|
||||
```
|
||||
grep coalesceSameSenderDms ~/.openclaw/openclaw.json
|
||||
```
|
||||
|
||||
Then `openclaw gateway restart` — the flag is read at debouncer-registry creation.
|
||||
|
||||
2. **Debounce window wide enough for your setup.** Look at the BlueBubbles server log under `~/Library/Logs/bluebubbles-server/main.log`:
|
||||
|
||||
```
|
||||
grep -E "Dispatching event to webhook" main.log | tail -20
|
||||
```
|
||||
|
||||
Measure the gap between the `"Dump"`-style text dispatch and the `"https://..."; Attachments:` dispatch that follows. Raise `messages.inbound.byChannel.bluebubbles` to comfortably cover that gap.
|
||||
|
||||
3. **Session JSONL timestamps ≠ webhook arrival.** Session event timestamps (`~/.openclaw/agents/<id>/sessions/*.jsonl`) reflect when the gateway hands a message to the agent, **not** when the webhook arrived. A queued-second message tagged `[Queued messages while agent was busy]` means the first turn was still running when the second webhook arrived — the coalesce bucket had already flushed. Tune the window against the BB server log, not the session log.
|
||||
|
||||
4. **Memory pressure slowing reply dispatch.** On smaller machines (8 GB), agent turns can take long enough that the coalesce bucket flushes before the reply completes, and the URL lands as a queued second turn. Check `memory_pressure` and `ps -o rss -p $(pgrep openclaw-gateway)`; if the gateway is over ~500 MB RSS and the compressor is active, close other heavy processes or bump to a larger host.
|
||||
|
||||
5. **Reply-quote sends are a different path.** If the user tapped `Dump` as a **reply** to an existing URL-balloon (iMessage shows a "1 Reply" badge on the Dump bubble), the URL lives in `replyToBody`, not in a second webhook. Coalescing does not apply — that's a skill/prompt concern, not a debouncer concern.
|
||||
|
||||
## Block streaming
|
||||
|
||||
Control whether responses are sent as a single message or streamed in blocks:
|
||||
@@ -436,6 +533,7 @@ Provider options:
|
||||
- `channels.bluebubbles.chunkMode`: `length` (default) splits only when exceeding `textChunkLimit`; `newline` splits on blank lines (paragraph boundaries) before length chunking.
|
||||
- `channels.bluebubbles.mediaMaxMb`: Inbound/outbound media cap in MB (default: 8).
|
||||
- `channels.bluebubbles.mediaLocalRoots`: Explicit allowlist of absolute local directories permitted for outbound local media paths. Local path sends are denied by default unless this is configured. Per-account override: `channels.bluebubbles.accounts.<accountId>.mediaLocalRoots`.
|
||||
- `channels.bluebubbles.coalesceSameSenderDms`: Merge consecutive same-sender DM webhooks into one agent turn so Apple's text+URL split-send arrives as a single message (default: `false`). See [Coalescing split-send DMs](#coalescing-split-send-dms-command--url-in-one-composition) for scenarios, window tuning, and trade-offs. Widens the default inbound debounce window from 500 ms to 2500 ms when enabled without an explicit `messages.inbound.byChannel.bluebubbles`.
|
||||
- `channels.bluebubbles.historyLimit`: Max group messages for context (0 disables).
|
||||
- `channels.bluebubbles.dmHistoryLimit`: DM history limit.
|
||||
- `channels.bluebubbles.actions`: Enable/disable specific actions.
|
||||
@@ -471,6 +569,7 @@ Prefer `chat_guid` for stable routing:
|
||||
- Edit/unsend require macOS 13+ and a compatible BlueBubbles server version. On macOS 26 (Tahoe), edit is currently broken due to private API changes.
|
||||
- Group icon updates can be flaky on macOS 26 (Tahoe): the API may return success but the new icon does not sync.
|
||||
- OpenClaw auto-hides known-broken actions based on the BlueBubbles server's macOS version. If edit still appears on macOS 26 (Tahoe), disable it manually with `channels.bluebubbles.actions.edit=false`.
|
||||
- `coalesceSameSenderDms` enabled but split-sends (e.g. `Dump` + URL) still arrive as two turns: see the [split-send coalescing troubleshooting](#split-send-coalescing-troubleshooting) checklist — common causes are too-tight debounce window, session-log timestamps misread as webhook arrival, or a reply-quote send (which uses `replyToBody`, not a second webhook).
|
||||
- For status/health info: `openclaw status --all` or `openclaw status --deep`.
|
||||
|
||||
For general channel workflow reference, see [Channels](/channels) and the [Plugins](/tools/plugin) guide.
|
||||
|
||||
@@ -62,7 +62,7 @@ Config (global default + per-channel overrides):
|
||||
Notes:
|
||||
|
||||
- Debounce applies to **text-only** messages; media/attachments flush immediately.
|
||||
- Control commands bypass debouncing so they remain standalone.
|
||||
- Control commands bypass debouncing so they remain standalone — **except** when a channel explicitly opts in to same-sender DM coalescing (e.g. [BlueBubbles `coalesceSameSenderDms`](/channels/bluebubbles#coalescing-split-send-dms-command--url-in-one-composition)), where DM commands wait inside the debounce window so a split-send payload can join the same agent turn.
|
||||
|
||||
## Sessions and devices
|
||||
|
||||
|
||||
@@ -94,6 +94,7 @@ const bluebubblesAccountSchema = z
|
||||
catchup: bluebubblesCatchupSchema,
|
||||
blockStreaming: z.boolean().optional(),
|
||||
groups: z.object({}).catchall(bluebubblesGroupConfigSchema).optional(),
|
||||
coalesceSameSenderDms: z.boolean().optional(),
|
||||
})
|
||||
.superRefine((value, ctx) => {
|
||||
const serverUrl = value.serverUrl?.trim() ?? "";
|
||||
|
||||
@@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
_resetBlueBubblesInboundDedupForTest,
|
||||
claimBlueBubblesInboundMessage,
|
||||
commitBlueBubblesCoalescedMessageIds,
|
||||
resolveBlueBubblesInboundDedupeKey,
|
||||
} from "./inbound-dedupe.js";
|
||||
|
||||
@@ -58,6 +59,47 @@ describe("claimBlueBubblesInboundMessage", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("commitBlueBubblesCoalescedMessageIds", () => {
|
||||
beforeEach(() => {
|
||||
_resetBlueBubblesInboundDedupForTest();
|
||||
});
|
||||
|
||||
it("marks every coalesced source messageId as seen so a later replay dedupes", async () => {
|
||||
// Primary was processed via claim+finalize by the debouncer flush.
|
||||
expect(await claimAndFinalize("primary", "acc")).toBe("claimed");
|
||||
// Secondaries reach dedupe through the bulk-commit path.
|
||||
await commitBlueBubblesCoalescedMessageIds({
|
||||
messageIds: ["secondary-1", "secondary-2"],
|
||||
accountId: "acc",
|
||||
});
|
||||
// A MessagePoller replay of any individual source event is now a duplicate
|
||||
// rather than a fresh agent turn — the core bug this helper exists to fix.
|
||||
expect(await claimAndFinalize("primary", "acc")).toBe("duplicate");
|
||||
expect(await claimAndFinalize("secondary-1", "acc")).toBe("duplicate");
|
||||
expect(await claimAndFinalize("secondary-2", "acc")).toBe("duplicate");
|
||||
});
|
||||
|
||||
it("scopes coalesced commits per account", async () => {
|
||||
await commitBlueBubblesCoalescedMessageIds({
|
||||
messageIds: ["g1"],
|
||||
accountId: "a",
|
||||
});
|
||||
// Same messageId under a different account is still claimable.
|
||||
expect(await claimAndFinalize("g1", "a")).toBe("duplicate");
|
||||
expect(await claimAndFinalize("g1", "b")).toBe("claimed");
|
||||
});
|
||||
|
||||
it("skips empty or overlong guids without throwing", async () => {
|
||||
await commitBlueBubblesCoalescedMessageIds({
|
||||
messageIds: ["", " ", "x".repeat(10_000), "valid"],
|
||||
accountId: "acc",
|
||||
});
|
||||
expect(await claimAndFinalize("valid", "acc")).toBe("duplicate");
|
||||
// Overlong guid was skipped by sanitization, not committed.
|
||||
expect(await claimAndFinalize("x".repeat(10_000), "acc")).toBe("skip");
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveBlueBubblesInboundDedupeKey", () => {
|
||||
it("returns messageId for new-message events", () => {
|
||||
expect(resolveBlueBubblesInboundDedupeKey({ messageId: "msg-1" })).toBe("msg-1");
|
||||
|
||||
@@ -210,6 +210,35 @@ export async function claimBlueBubblesInboundMessage(params: {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a set of source messageIds as already processed, without going through
|
||||
* the `claim()` protocol. Intended for the coalesced-batch case: when the
|
||||
* debouncer merges N webhook events into one agent turn, only the primary
|
||||
* messageId reaches `claimBlueBubblesInboundMessage`. The remaining source
|
||||
* messageIds must still be remembered so a later MessagePoller replay of any
|
||||
* single source event is recognized as a duplicate rather than re-processed.
|
||||
*
|
||||
* Best-effort — disk errors on secondary commits are surfaced via
|
||||
* `onDiskError` but never thrown, so a single persistence hiccup cannot block
|
||||
* the caller's main finalize path.
|
||||
*/
|
||||
export async function commitBlueBubblesCoalescedMessageIds(params: {
|
||||
messageIds: readonly string[];
|
||||
accountId: string;
|
||||
onDiskError?: (error: unknown) => void;
|
||||
}): Promise<void> {
|
||||
for (const raw of params.messageIds) {
|
||||
const normalized = sanitizeGuid(raw);
|
||||
if (!normalized) {
|
||||
continue;
|
||||
}
|
||||
await impl.commit(normalized, {
|
||||
namespace: params.accountId,
|
||||
onDiskError: params.onDiskError,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure the legacy→hashed dedupe file migration runs and the on-disk
|
||||
* store is warmed into memory for the given account. Call before any
|
||||
|
||||
@@ -46,6 +46,33 @@ export type BlueBubblesDebounceRegistry = {
|
||||
*/
|
||||
const DEFAULT_INBOUND_DEBOUNCE_MS = 500;
|
||||
|
||||
/**
|
||||
* Default debounce window when `coalesceSameSenderDms` is enabled.
|
||||
*
|
||||
* The legacy 500 ms default is tuned for BlueBubbles's own text+balloon
|
||||
* pairing, which is typically linked by `associatedMessageGuid` and arrives
|
||||
* within ~100-300 ms. The new split-send case this flag targets has a wider
|
||||
* cadence — live traces show Apple delivers `Dump` and its pasted-URL
|
||||
* balloon ~0.8-2.0 s apart — so 500 ms would flush the text alone before the
|
||||
* balloon webhook ever reaches the debouncer. 2500 ms comfortably covers the
|
||||
* observed range while keeping agent-reply latency acceptable for DMs. Users
|
||||
* who want tighter turnaround can still set `messages.inbound.byChannel.bluebubbles`
|
||||
* explicitly.
|
||||
*/
|
||||
const DEFAULT_COALESCE_INBOUND_DEBOUNCE_MS = 2500;
|
||||
|
||||
/**
|
||||
* Bounds on the combined output when multiple inbound events are merged into
|
||||
* one agent turn. Guards against amplification from a sender who rapid-fires
|
||||
* many small DMs inside the debounce window (concern raised on #69258): the
|
||||
* merged text, attachment list, and source-message count are each capped so
|
||||
* a flood cannot balloon a single agent prompt beyond a safe ceiling.
|
||||
* Callers still see every messageId via inbound-dedupe.
|
||||
*/
|
||||
const MAX_COALESCED_TEXT_CHARS = 4000;
|
||||
const MAX_COALESCED_ATTACHMENTS = 20;
|
||||
const MAX_COALESCED_ENTRIES = 10;
|
||||
|
||||
/**
|
||||
* Combines multiple debounced messages into a single message for processing.
|
||||
* Used when multiple webhook events arrive within the debounce window.
|
||||
@@ -61,11 +88,21 @@ function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): Normalized
|
||||
// Use the first message as the base (typically the text message)
|
||||
const first = entries[0].message;
|
||||
|
||||
// Combine text from all entries, filtering out duplicates and empty strings
|
||||
// Cap the number of source entries we fold into the merged view so a sender
|
||||
// who rapid-fires many small DMs cannot amplify the downstream prompt.
|
||||
// Prefer the first and the most recent — the first preserves the original
|
||||
// command/context and the last preserves the most recent payload — rather
|
||||
// than dropping either tail of the sequence.
|
||||
const boundedEntries =
|
||||
entries.length > MAX_COALESCED_ENTRIES
|
||||
? [...entries.slice(0, MAX_COALESCED_ENTRIES - 1), entries[entries.length - 1]]
|
||||
: entries;
|
||||
|
||||
// Combine text from bounded entries, filtering out duplicates and empty strings
|
||||
const seenTexts = new Set<string>();
|
||||
const textParts: string[] = [];
|
||||
|
||||
for (const entry of entries) {
|
||||
for (const entry of boundedEntries) {
|
||||
const text = normalizeDebounceMessageText(entry.message.text).trim();
|
||||
if (!text) {
|
||||
continue;
|
||||
@@ -79,8 +116,16 @@ function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): Normalized
|
||||
textParts.push(text);
|
||||
}
|
||||
|
||||
// Merge attachments from all entries
|
||||
const allAttachments = entries.flatMap((e) => e.message.attachments ?? []);
|
||||
let combinedText = textParts.join(" ");
|
||||
if (combinedText.length > MAX_COALESCED_TEXT_CHARS) {
|
||||
combinedText = `${combinedText.slice(0, MAX_COALESCED_TEXT_CHARS)}…[truncated]`;
|
||||
}
|
||||
|
||||
// Merge attachments from bounded entries, capped to keep downstream media
|
||||
// fan-out proportional to what a single message would carry.
|
||||
const allAttachments = boundedEntries
|
||||
.flatMap((e) => e.message.attachments ?? [])
|
||||
.slice(0, MAX_COALESCED_ATTACHMENTS);
|
||||
|
||||
// Use the latest timestamp
|
||||
const timestamps = entries
|
||||
@@ -91,16 +136,34 @@ function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): Normalized
|
||||
// Collect all message IDs for reference
|
||||
const messageId = entries.map((e) => e.message.messageId).find((id): id is string => Boolean(id));
|
||||
|
||||
// Every source messageId we're folding into this merged view must reach
|
||||
// inbound-dedupe, so a later BlueBubbles MessagePoller replay of any single
|
||||
// source event is recognized as a duplicate rather than re-processed as a
|
||||
// fresh agent turn. We walk the unbounded `entries` (not `boundedEntries`)
|
||||
// so even IDs whose text/attachments were dropped by the cap are still
|
||||
// remembered.
|
||||
const seenIds = new Set<string>();
|
||||
const coalescedMessageIds: string[] = [];
|
||||
for (const entry of entries) {
|
||||
const id = entry.message.messageId?.trim();
|
||||
if (!id || seenIds.has(id)) {
|
||||
continue;
|
||||
}
|
||||
seenIds.add(id);
|
||||
coalescedMessageIds.push(id);
|
||||
}
|
||||
|
||||
// Prefer reply context from any entry that has it
|
||||
const entryWithReply = entries.find((e) => e.message.replyToId);
|
||||
|
||||
return {
|
||||
...first,
|
||||
text: textParts.join(" "),
|
||||
text: combinedText,
|
||||
attachments: allAttachments.length > 0 ? allAttachments : first.attachments,
|
||||
timestamp: latestTimestamp,
|
||||
// Use first message's ID as primary (for reply reference), but we've coalesced others
|
||||
messageId: messageId ?? first.messageId,
|
||||
coalescedMessageIds: coalescedMessageIds.length > 0 ? coalescedMessageIds : undefined,
|
||||
// Preserve reply context if present
|
||||
replyToId: entryWithReply?.message.replyToId ?? first.replyToId,
|
||||
replyToBody: entryWithReply?.message.replyToBody ?? first.replyToBody,
|
||||
@@ -113,13 +176,24 @@ function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): Normalized
|
||||
function resolveBlueBubblesDebounceMs(
|
||||
config: OpenClawConfig,
|
||||
core: BlueBubblesCoreRuntime,
|
||||
accountConfig: { coalesceSameSenderDms?: boolean },
|
||||
): number {
|
||||
const inbound = config.messages?.inbound;
|
||||
const hasExplicitDebounce =
|
||||
typeof inbound?.debounceMs === "number" || typeof inbound?.byChannel?.bluebubbles === "number";
|
||||
if (!hasExplicitDebounce) {
|
||||
return DEFAULT_INBOUND_DEBOUNCE_MS;
|
||||
// When the opt-in coalesce flag is on, the default must cover Apple's
|
||||
// split-send cadence (~0.8-2.0 s) or the flag becomes a no-op. Other
|
||||
// users keep the legacy tight default tuned for text+balloon pairs
|
||||
// linked via `associatedMessageGuid`.
|
||||
return accountConfig.coalesceSameSenderDms
|
||||
? DEFAULT_COALESCE_INBOUND_DEBOUNCE_MS
|
||||
: DEFAULT_INBOUND_DEBOUNCE_MS;
|
||||
}
|
||||
// Explicit config path: delegate to the shared runtime helper so per-
|
||||
// channel scaling, clamps, or other future logic in
|
||||
// `src/auto-reply/inbound-debounce.ts` stay authoritative for every
|
||||
// channel uniformly.
|
||||
return core.channel.debounce.resolveInboundDebounceMs({ cfg: config, channel: "bluebubbles" });
|
||||
}
|
||||
|
||||
@@ -137,7 +211,7 @@ export function createBlueBubblesDebounceRegistry(params: {
|
||||
|
||||
const { account, config, runtime, core } = target;
|
||||
const baseDebouncer = core.channel.debounce.createInboundDebouncer<BlueBubblesDebounceEntry>({
|
||||
debounceMs: resolveBlueBubblesDebounceMs(config, core),
|
||||
debounceMs: resolveBlueBubblesDebounceMs(config, core, account.config),
|
||||
buildKey: (entry) => {
|
||||
const msg = entry.message;
|
||||
// Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the
|
||||
@@ -152,15 +226,37 @@ export function createBlueBubblesDebounceRegistry(params: {
|
||||
return `bluebubbles:${account.accountId}:msg:${associatedMessageGuid}`;
|
||||
}
|
||||
|
||||
// Optional: coalesce consecutive DM messages from the same sender
|
||||
// within the debounce window. Two distinct user sends (e.g.
|
||||
// `Dump` followed by a pasted URL that iMessage renders as a
|
||||
// standalone rich-link balloon) have distinct messageIds and no
|
||||
// associatedMessageGuid cross-reference, so the default per-message
|
||||
// key dispatches them as separate agent turns. Hashing to
|
||||
// chat:sender lets the debounce window merge them. DMs only —
|
||||
// group chats continue to key per-message to preserve multi-user
|
||||
// conversational structure.
|
||||
//
|
||||
// We intentionally do NOT guard on `!balloonBundleId` here: an
|
||||
// orphan URL-balloon (Apple split-send where the balloon event
|
||||
// carries `balloonBundleId` but no `associatedMessageGuid` linking
|
||||
// it back to the text) is exactly the traffic this feature
|
||||
// targets. The legacy text+balloon pairing case is already
|
||||
// captured above by the `balloonBundleId && associatedMessageGuid`
|
||||
// branch, so skipping balloons here would defeat the opt-in for
|
||||
// its primary motivating case.
|
||||
const chatKey =
|
||||
msg.chatGuid?.trim() ??
|
||||
msg.chatIdentifier?.trim() ??
|
||||
(msg.chatId ? String(msg.chatId) : "dm");
|
||||
if (account.config.coalesceSameSenderDms && !msg.isGroup && !associatedMessageGuid) {
|
||||
return `bluebubbles:${account.accountId}:dm:${chatKey}:${msg.senderId}`;
|
||||
}
|
||||
|
||||
const messageId = msg.messageId?.trim();
|
||||
if (messageId) {
|
||||
return `bluebubbles:${account.accountId}:msg:${messageId}`;
|
||||
}
|
||||
|
||||
const chatKey =
|
||||
msg.chatGuid?.trim() ??
|
||||
msg.chatIdentifier?.trim() ??
|
||||
(msg.chatId ? String(msg.chatId) : "dm");
|
||||
return `bluebubbles:${account.accountId}:${chatKey}:${msg.senderId}`;
|
||||
},
|
||||
shouldDebounce: (entry) => {
|
||||
@@ -169,8 +265,21 @@ export function createBlueBubblesDebounceRegistry(params: {
|
||||
if (msg.fromMe) {
|
||||
return false;
|
||||
}
|
||||
// Skip debouncing for control commands - process immediately
|
||||
// Control commands normally flush immediately so the command feels
|
||||
// instant. Exception: when `coalesceSameSenderDms` is enabled, a DM
|
||||
// control command is frequently the first half of a split-send
|
||||
// (e.g. `Dump` followed by a pasted URL that Apple delivers as a
|
||||
// separate webhook ~700-2000 ms later). Skipping debounce here
|
||||
// would flush the command alone before the URL bucket-mate arrives
|
||||
// — defeating the opt-in feature on exactly its target traffic.
|
||||
// Gate the delay on the same conditions as the buildKey coalesce
|
||||
// branch so group chats, balloon follow-ups, and disabled accounts
|
||||
// keep the instant-flush path.
|
||||
if (core.channel.text.hasControlCommand(msg.text, config)) {
|
||||
const associatedMessageGuid = msg.associatedMessageGuid?.trim();
|
||||
if (account.config.coalesceSameSenderDms && !msg.isGroup && !associatedMessageGuid) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
// Debounce all other messages to coalesce rapid-fire webhook events
|
||||
|
||||
@@ -479,6 +479,15 @@ export type NormalizedWebhookMessage = {
|
||||
replyToSender?: string;
|
||||
/** Webhook event type preserved for dedup key differentiation. */
|
||||
eventType?: string;
|
||||
/**
|
||||
* When the debouncer merges multiple source webhook events into one
|
||||
* processed message (see `combineDebounceEntries` in `monitor-debounce.ts`),
|
||||
* this preserves every source `messageId` that contributed to the merged
|
||||
* view. Downstream inbound-dedupe commits all of them so a later BlueBubbles
|
||||
* MessagePoller replay of any individual source event is recognized as a
|
||||
* duplicate rather than re-processed. Unset for single-event messages.
|
||||
*/
|
||||
coalescedMessageIds?: string[];
|
||||
};
|
||||
|
||||
export type NormalizedWebhookReaction = {
|
||||
|
||||
@@ -19,6 +19,7 @@ import { resolveBlueBubblesConversationRoute } from "./conversation-route.js";
|
||||
import { fetchBlueBubblesHistory } from "./history.js";
|
||||
import {
|
||||
claimBlueBubblesInboundMessage,
|
||||
commitBlueBubblesCoalescedMessageIds,
|
||||
resolveBlueBubblesInboundDedupeKey,
|
||||
} from "./inbound-dedupe.js";
|
||||
import { sendBlueBubblesMedia } from "./media-send.js";
|
||||
@@ -666,6 +667,32 @@ export async function processMessage(
|
||||
`inbound-dedupe: finalize failed for key=${sanitizeForLog(dedupeKey ?? "")}: ${sanitizeForLog(finalizeError)}`,
|
||||
);
|
||||
}
|
||||
// When the debouncer coalesced multiple source webhook events into this
|
||||
// single processed message, every source messageId must reach dedupe so
|
||||
// a later MessagePoller replay of any individual source event is
|
||||
// recognized as a duplicate. The primary is already finalized above;
|
||||
// commit the rest here (best-effort, per-id).
|
||||
const secondaryIds = (message.coalescedMessageIds ?? []).filter((id) => id !== dedupeKey);
|
||||
if (secondaryIds.length > 0) {
|
||||
try {
|
||||
await commitBlueBubblesCoalescedMessageIds({
|
||||
messageIds: secondaryIds,
|
||||
accountId: account.accountId,
|
||||
onDiskError: (error) =>
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`inbound-dedupe: coalesced secondary commit disk error: ${sanitizeForLog(error)}`,
|
||||
),
|
||||
});
|
||||
} catch (secondaryError) {
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`inbound-dedupe: coalesced secondary commit failed for primary=${sanitizeForLog(dedupeKey ?? "")}: ${sanitizeForLog(secondaryError)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1020,6 +1020,511 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("coalesces same-sender DM messages when coalesceSameSenderDms is enabled", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount({ coalesceSameSenderDms: true });
|
||||
const target = {
|
||||
account,
|
||||
// Pin an explicit short debounce window so these tests stay
|
||||
// decoupled from the coalesce-flag default (2500 ms). The
|
||||
// "widens the default debounce window" test intentionally omits
|
||||
// this override to exercise the new default.
|
||||
config: { messages: { inbound: { byChannel: { bluebubbles: 500 } } } },
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
const chatGuid = "iMessage;-;+15551234567";
|
||||
|
||||
// Two distinct user sends: a command ("Dump") followed by a URL.
|
||||
// No associatedMessageGuid linking them. Default buildKey hashes by
|
||||
// per-message messageId, so historically they dispatched separately.
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "Dump",
|
||||
messageId: "dm-msg-1",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "https://example.com/article",
|
||||
messageId: "dm-msg-2",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
expect(processMessage).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
expect(processMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
text: "Dump https://example.com/article",
|
||||
// Every source messageId must reach inbound-dedupe so a later
|
||||
// MessagePoller replay of either event alone is recognized as a
|
||||
// duplicate rather than re-processed.
|
||||
coalescedMessageIds: ["dm-msg-1", "dm-msg-2"],
|
||||
}),
|
||||
target,
|
||||
);
|
||||
expect(target.runtime.error).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not coalesce same-sender DM messages when coalesceSameSenderDms is off (default)", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount();
|
||||
const target = {
|
||||
account,
|
||||
config: {},
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
const chatGuid = "iMessage;-;+15551234567";
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "Dump",
|
||||
messageId: "dm-msg-1",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "https://example.com/article",
|
||||
messageId: "dm-msg-2",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
|
||||
expect(processMessage).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("bounds the coalesced output when many messages merge into one turn", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount({ coalesceSameSenderDms: true });
|
||||
const target = {
|
||||
account,
|
||||
// Pin an explicit short debounce window so these tests stay
|
||||
// decoupled from the coalesce-flag default (2500 ms). The
|
||||
// "widens the default debounce window" test intentionally omits
|
||||
// this override to exercise the new default.
|
||||
config: { messages: { inbound: { byChannel: { bluebubbles: 500 } } } },
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
const chatGuid = "iMessage;-;+15551234567";
|
||||
// Use a unique long text block per entry to exceed MAX_COALESCED_TEXT_CHARS (4000)
|
||||
// after naive concatenation. 25 entries × ~400 chars ≈ 10_000 chars worth of content.
|
||||
const blob = "x".repeat(400);
|
||||
for (let i = 0; i < 25; i++) {
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: `msg-${i}-${blob}`,
|
||||
messageId: `flood-${i}`,
|
||||
attachments: [{ guid: `att-${i}`, mimeType: "image/jpeg", totalBytes: 1024 }],
|
||||
}),
|
||||
target,
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
}
|
||||
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
const [merged] = processMessage.mock.calls[0] as [NormalizedWebhookMessage, unknown];
|
||||
// Text is truncated with explicit marker instead of ballooning.
|
||||
expect(merged.text.length).toBeLessThanOrEqual(4000 + "…[truncated]".length);
|
||||
expect(merged.text.endsWith("…[truncated]")).toBe(true);
|
||||
// Attachments are capped so downstream media fan-out stays bounded.
|
||||
expect(merged.attachments?.length).toBeLessThanOrEqual(20);
|
||||
// Every source messageId — including ones whose text/attachments the
|
||||
// cap dropped — still reaches inbound-dedupe. Truncation caps prompt
|
||||
// size; it must not leak replay risk.
|
||||
expect(merged.coalescedMessageIds).toHaveLength(25);
|
||||
expect(merged.coalescedMessageIds?.[0]).toBe("flood-0");
|
||||
expect(merged.coalescedMessageIds?.[24]).toBe("flood-24");
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not coalesce group-chat messages even with coalesceSameSenderDms enabled", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount({ coalesceSameSenderDms: true });
|
||||
const target = {
|
||||
account,
|
||||
// Pin an explicit short debounce window so these tests stay
|
||||
// decoupled from the coalesce-flag default (2500 ms). The
|
||||
// "widens the default debounce window" test intentionally omits
|
||||
// this override to exercise the new default.
|
||||
config: { messages: { inbound: { byChannel: { bluebubbles: 500 } } } },
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
const chatGuid = "iMessage;-;group-abc";
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "first",
|
||||
messageId: "grp-msg-1",
|
||||
isGroup: true,
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "second",
|
||||
messageId: "grp-msg-2",
|
||||
isGroup: true,
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
|
||||
expect(processMessage).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("debounces DM control commands when coalesceSameSenderDms is on so a split-send URL can join the bucket", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount({ coalesceSameSenderDms: true });
|
||||
const target = {
|
||||
account,
|
||||
// Pin an explicit short debounce window so these tests stay
|
||||
// decoupled from the coalesce-flag default (2500 ms). The
|
||||
// "widens the default debounce window" test intentionally omits
|
||||
// this override to exercise the new default.
|
||||
config: { messages: { inbound: { byChannel: { bluebubbles: 500 } } } },
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
const chatGuid = "iMessage;-;+15551234567";
|
||||
// Simulate a registered skill alias ("Dump") — normally this would
|
||||
// flush immediately and miss its split-send URL. The coalesce flag
|
||||
// must override that short-circuit for DMs specifically.
|
||||
mockHasControlCommand.mockReturnValue(true);
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "Dump",
|
||||
messageId: "cmd-msg-1",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
// Apple/BlueBubbles delivers the URL ~750 ms later — well inside a
|
||||
// reasonable coalesce window.
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
expect(processMessage).not.toHaveBeenCalled();
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "https://example.com/article",
|
||||
messageId: "cmd-msg-2",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
expect(processMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
text: "Dump https://example.com/article",
|
||||
coalescedMessageIds: ["cmd-msg-1", "cmd-msg-2"],
|
||||
}),
|
||||
target,
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
mockHasControlCommand.mockReturnValue(false);
|
||||
}
|
||||
});
|
||||
|
||||
it("coalesces an orphan URL-balloon with a preceding DM control command (Apple split-send)", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount({ coalesceSameSenderDms: true });
|
||||
const target = {
|
||||
account,
|
||||
// Pin an explicit short debounce window so these tests stay
|
||||
// decoupled from the coalesce-flag default (2500 ms). The
|
||||
// "widens the default debounce window" test intentionally omits
|
||||
// this override to exercise the new default.
|
||||
config: { messages: { inbound: { byChannel: { bluebubbles: 500 } } } },
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
const chatGuid = "iMessage;-;+15551234567";
|
||||
mockHasControlCommand.mockReturnValue(true);
|
||||
|
||||
// Matches the live trace from BB server:
|
||||
// 20:45:13.232 New Message "Dump"
|
||||
// 20:45:14.274 New Message "https://..."; Attachments: 3
|
||||
// The second webhook arrives with balloonBundleId set (URL-preview
|
||||
// balloon) but no associatedMessageGuid linking it back to "Dump" —
|
||||
// this is Apple's orphan split-send. buildKey must still place it in
|
||||
// the same dm:<chat>:<sender> bucket as the "Dump" event.
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "Dump",
|
||||
messageId: "split-cmd",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
// Stay inside the default 500 ms window so the bucket is still open
|
||||
// when the URL-balloon arrives. Real traffic needs `messages.inbound.byChannel.bluebubbles`
|
||||
// bumped to ~2500 ms for the observed ~800-1800 ms Apple split-send
|
||||
// cadence; this unit test keeps the default window and just proves
|
||||
// the key/shouldDebounce logic buckets both webhooks together.
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
|
||||
// The URL-balloon's text is not a registered command, so the mock
|
||||
// must return false for that one call.
|
||||
mockHasControlCommand.mockReturnValueOnce(false);
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "https://www.theverge.com/tech/906873/sofa-app-track-tv-movies-installer",
|
||||
messageId: "split-url",
|
||||
balloonBundleId: "com.apple.messages.URLBalloonProvider",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
expect(processMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
text: "Dump https://www.theverge.com/tech/906873/sofa-app-track-tv-movies-installer",
|
||||
coalescedMessageIds: ["split-cmd", "split-url"],
|
||||
}),
|
||||
target,
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
mockHasControlCommand.mockReturnValue(false);
|
||||
}
|
||||
});
|
||||
|
||||
it("widens the default debounce window when coalesceSameSenderDms is enabled without explicit config", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount({ coalesceSameSenderDms: true });
|
||||
const target = {
|
||||
account,
|
||||
// Intentionally NO messages.inbound.byChannel.bluebubbles —
|
||||
// this test exercises the coalesce-flag default (2500 ms).
|
||||
config: {},
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
const chatGuid = "iMessage;-;+15551234567";
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "first",
|
||||
messageId: "wide-1",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
// 1500 ms is well outside the legacy 500 ms default but inside the
|
||||
// 2500 ms coalesce default — without the new default, the first
|
||||
// entry would flush alone before the second enqueue arrives.
|
||||
await vi.advanceTimersByTimeAsync(1500);
|
||||
expect(processMessage).not.toHaveBeenCalled();
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid,
|
||||
text: "second",
|
||||
messageId: "wide-2",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(3000);
|
||||
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
expect(processMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
text: "first second",
|
||||
coalescedMessageIds: ["wide-1", "wide-2"],
|
||||
}),
|
||||
target,
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps the legacy 500 ms default window when coalesceSameSenderDms is off", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount(); // flag off
|
||||
const target = {
|
||||
account,
|
||||
config: {},
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
text: "only",
|
||||
messageId: "legacy-1",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
// Legacy behavior: flush within the tight 500 ms window so non-opt-in
|
||||
// users keep their existing responsiveness.
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps control commands instant for group chats even when coalesceSameSenderDms is enabled", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount({ coalesceSameSenderDms: true });
|
||||
const target = {
|
||||
account,
|
||||
// Pin an explicit short debounce window so these tests stay
|
||||
// decoupled from the coalesce-flag default (2500 ms). The
|
||||
// "widens the default debounce window" test intentionally omits
|
||||
// this override to exercise the new default.
|
||||
config: { messages: { inbound: { byChannel: { bluebubbles: 500 } } } },
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
mockHasControlCommand.mockReturnValue(true);
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
chatGuid: "iMessage;-;group-xyz",
|
||||
text: "Dump",
|
||||
messageId: "grp-cmd-1",
|
||||
isGroup: true,
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
// Group-chat command must not wait for a hypothetical bucket-mate;
|
||||
// the per-message debounce key never shares anyway, so instant flush
|
||||
// is the correct behavior.
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
mockHasControlCommand.mockReturnValue(false);
|
||||
}
|
||||
});
|
||||
|
||||
it("skips null-text entries during flush and still delivers the valid message", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
|
||||
@@ -105,6 +105,17 @@ export type BlueBubblesAccountConfig = {
|
||||
healthMonitor?: {
|
||||
enabled?: boolean;
|
||||
};
|
||||
/**
|
||||
* When true, consecutive DM messages (`isGroup === false`) from the same
|
||||
* sender within the inbound debounce window coalesce into a single agent
|
||||
* turn. Keys by `chat:sender` instead of the per-message `messageId` so
|
||||
* "command + payload as two sends" (e.g. a `dump` command followed by a
|
||||
* pasted URL that iMessage renders as its own URL balloon) reaches the
|
||||
* agent together. Does not apply to group chats or to BlueBubbles
|
||||
* text+balloon follow-ups, which still coalesce via
|
||||
* `associatedMessageGuid`. Default: false.
|
||||
*/
|
||||
coalesceSameSenderDms?: boolean;
|
||||
};
|
||||
|
||||
export type BlueBubblesConfig = Omit<BlueBubblesAccountConfig, "actions"> & {
|
||||
@@ -171,7 +182,11 @@ export function buildBlueBubblesApiUrl(params: {
|
||||
let _fetchGuard = fetchWithSsrFGuard;
|
||||
|
||||
/** @internal Replace the SSRF fetch guard in tests. */
|
||||
export function _setFetchGuardForTesting(impl: typeof fetchWithSsrFGuard | null): void {
|
||||
export function _setFetchGuardForTesting(
|
||||
impl:
|
||||
| ((...args: Parameters<typeof fetchWithSsrFGuard>) => ReturnType<typeof fetchWithSsrFGuard>)
|
||||
| null,
|
||||
): void {
|
||||
_fetchGuard = impl ?? fetchWithSsrFGuard;
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ const sourceRoots = ["src/channels", "src/routing", "src/line", "extensions"];
|
||||
// code should be rejected and migrated to fetchWithSsrFGuard/shared channel helpers.
|
||||
const allowedRawFetchCallsites = new Set([
|
||||
bundledPluginCallsite("bluebubbles", "src/test-harness.ts", 132),
|
||||
bundledPluginCallsite("bluebubbles", "src/types.ts", 194),
|
||||
bundledPluginCallsite("bluebubbles", "src/types.ts", 204),
|
||||
bundledPluginCallsite("browser", "src/browser/cdp.helpers.ts", 268),
|
||||
bundledPluginCallsite("browser", "src/browser/client-fetch.ts", 192),
|
||||
bundledPluginCallsite("browser", "src/browser/test-fetch.ts", 24),
|
||||
|
||||
@@ -311,10 +311,16 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
systemPrompt: {
|
||||
type: "string",
|
||||
},
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
},
|
||||
coalesceSameSenderDms: {
|
||||
type: "boolean",
|
||||
},
|
||||
accounts: {
|
||||
type: "object",
|
||||
properties: {},
|
||||
@@ -622,10 +628,16 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
systemPrompt: {
|
||||
type: "string",
|
||||
},
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
},
|
||||
coalesceSameSenderDms: {
|
||||
type: "boolean",
|
||||
},
|
||||
},
|
||||
required: ["enrichGroupParticipantsFromContacts"],
|
||||
additionalProperties: false,
|
||||
@@ -13069,6 +13081,11 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
|
||||
exclusiveMinimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
},
|
||||
pollingStallThresholdMs: {
|
||||
type: "integer",
|
||||
minimum: 30000,
|
||||
maximum: 600000,
|
||||
},
|
||||
retry: {
|
||||
type: "object",
|
||||
properties: {
|
||||
@@ -14102,6 +14119,11 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
|
||||
exclusiveMinimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
},
|
||||
pollingStallThresholdMs: {
|
||||
type: "integer",
|
||||
minimum: 30000,
|
||||
maximum: 600000,
|
||||
},
|
||||
retry: {
|
||||
type: "object",
|
||||
properties: {
|
||||
@@ -14482,6 +14504,10 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
|
||||
label: "Telegram API Timeout (seconds)",
|
||||
help: "Max seconds before Telegram API requests are aborted (default: 500 per grammY).",
|
||||
},
|
||||
pollingStallThresholdMs: {
|
||||
label: "Telegram Polling Stall Threshold (ms)",
|
||||
help: "Milliseconds without completed Telegram getUpdates liveness before the polling watchdog restarts the polling runner. Default: 120000.",
|
||||
},
|
||||
silentErrorReplies: {
|
||||
label: "Telegram Silent Error Replies",
|
||||
help: "When true, Telegram bot replies marked as errors are sent silently (no notification sound). Default: false.",
|
||||
|
||||
@@ -1460,6 +1460,7 @@ export const BlueBubblesAccountSchemaBase = z
|
||||
heartbeat: ChannelHeartbeatVisibilitySchema,
|
||||
healthMonitor: ChannelHealthMonitorSchema,
|
||||
responsePrefix: z.string().optional(),
|
||||
coalesceSameSenderDms: z.boolean().optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
|
||||
@@ -325,9 +325,44 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
|
||||
flushKey: vi.fn(),
|
||||
}),
|
||||
) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"],
|
||||
resolveInboundDebounceMs: vi.fn(
|
||||
() => 0,
|
||||
) as unknown as PluginRuntime["channel"]["debounce"]["resolveInboundDebounceMs"],
|
||||
resolveInboundDebounceMs: vi.fn((params: unknown) => {
|
||||
// Match the production contract so channel plugins that delegate to
|
||||
// `core.channel.debounce.resolveInboundDebounceMs({ cfg, channel })`
|
||||
// see the same per-channel/global/default precedence in tests as
|
||||
// they would at runtime. Prior to this, the mock returned 0
|
||||
// unconditionally, which meant any channel that delegated (vs.
|
||||
// reading config directly) effectively disabled its debounce
|
||||
// window in tests — a footgun that silently hid coverage for
|
||||
// per-channel overrides.
|
||||
const p = params as
|
||||
| {
|
||||
cfg?: {
|
||||
messages?: {
|
||||
inbound?: {
|
||||
debounceMs?: unknown;
|
||||
byChannel?: Record<string, unknown>;
|
||||
};
|
||||
};
|
||||
};
|
||||
channel?: string;
|
||||
overrideMs?: unknown;
|
||||
}
|
||||
| undefined;
|
||||
const override = typeof p?.overrideMs === "number" ? p.overrideMs : undefined;
|
||||
if (typeof override === "number") {
|
||||
return override;
|
||||
}
|
||||
const inbound = p?.cfg?.messages?.inbound;
|
||||
const perChannel =
|
||||
p?.channel && inbound?.byChannel ? inbound.byChannel[p.channel] : undefined;
|
||||
if (typeof perChannel === "number") {
|
||||
return perChannel;
|
||||
}
|
||||
if (typeof inbound?.debounceMs === "number") {
|
||||
return inbound.debounceMs;
|
||||
}
|
||||
return 0;
|
||||
}) as unknown as PluginRuntime["channel"]["debounce"]["resolveInboundDebounceMs"],
|
||||
},
|
||||
commands: {
|
||||
resolveCommandAuthorizedFromAuthorizers: vi.fn(
|
||||
|
||||
Reference in New Issue
Block a user