From e1015a51975552185b077b8bd6e3c4391b859ed5 Mon Sep 17 00:00:00 2001 From: Tyler Yust Date: Tue, 17 Feb 2026 11:12:09 -0800 Subject: [PATCH] fix(bluebubbles): recover outbound message IDs and include sender metadata --- CHANGELOG.md | 3 + .../bluebubbles/src/monitor-processing.ts | 215 ++++++++++++++++-- extensions/bluebubbles/src/monitor.test.ts | 143 ++++++++++++ src/auto-reply/reply/inbound-meta.test.ts | 28 +++ src/auto-reply/reply/inbound-meta.ts | 1 + 5 files changed, 375 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6148c92d5cd..15c50c110f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,9 @@ Docs: https://docs.openclaw.ai ### Fixes +- BlueBubbles: add fallback path to recover outbound `message_id` from `fromMe` webhooks when platform message IDs are missing. +- BlueBubbles: match outbound message-id fallback recovery by chat identifier as well as account context. +- BlueBubbles: include sender identifier in untrusted conversation metadata for conversation info payloads. - macOS/Update: correct the Sparkle appcast version for 2026.2.15 so updates are offered again. (#18201) - Gateway/Auth: clear stale device-auth tokens after device token mismatch errors so re-paired clients can re-auth. (#18201) - Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky. diff --git a/extensions/bluebubbles/src/monitor-processing.ts b/extensions/bluebubbles/src/monitor-processing.ts index 1b5e80352e6..0719c548556 100644 --- a/extensions/bluebubbles/src/monitor-processing.ts +++ b/extensions/bluebubbles/src/monitor-processing.ts @@ -6,6 +6,7 @@ import { logTypingFailure, resolveAckReaction, resolveControlCommandGate, + stripMarkdown, } from "openclaw/plugin-sdk"; import { downloadBlueBubblesAttachment } from "./attachments.js"; import { markBlueBubblesChatRead, sendBlueBubblesTyping } from "./chat.js"; @@ -40,6 +41,135 @@ import { formatBlueBubblesChatTarget, isAllowedBlueBubblesSender } from "./targe const DEFAULT_TEXT_LIMIT = 4000; const invalidAckReactions = new Set(); const REPLY_DIRECTIVE_TAG_RE = /\[\[\s*(?:reply_to_current|reply_to\s*:\s*[^\]\n]+)\s*\]\]/gi; +const PENDING_OUTBOUND_MESSAGE_ID_TTL_MS = 2 * 60 * 1000; + +type PendingOutboundMessageId = { + id: number; + accountId: string; + sessionKey: string; + outboundTarget: string; + chatGuid?: string; + chatIdentifier?: string; + chatId?: number; + snippetRaw: string; + snippetNorm: string; + isMediaSnippet: boolean; + createdAt: number; +}; + +const pendingOutboundMessageIds: PendingOutboundMessageId[] = []; +let pendingOutboundMessageIdCounter = 0; + +function trimOrUndefined(value?: string | null): string | undefined { + const trimmed = value?.trim(); + return trimmed ? trimmed : undefined; +} + +function normalizeSnippet(value: string): string { + return stripMarkdown(value).replace(/\s+/g, " ").trim().toLowerCase(); +} + +function prunePendingOutboundMessageIds(now = Date.now()): void { + const cutoff = now - PENDING_OUTBOUND_MESSAGE_ID_TTL_MS; + for (let i = pendingOutboundMessageIds.length - 1; i >= 0; i--) { + if (pendingOutboundMessageIds[i].createdAt < cutoff) { + pendingOutboundMessageIds.splice(i, 1); + } + } +} + +function rememberPendingOutboundMessageId(entry: { + accountId: string; + sessionKey: string; + outboundTarget: string; + chatGuid?: string; + chatIdentifier?: string; + chatId?: number; + snippet: string; +}): number { + prunePendingOutboundMessageIds(); + pendingOutboundMessageIdCounter += 1; + const snippetRaw = entry.snippet.trim(); + const snippetNorm = normalizeSnippet(snippetRaw); + pendingOutboundMessageIds.push({ + id: pendingOutboundMessageIdCounter, + accountId: entry.accountId, + sessionKey: entry.sessionKey, + outboundTarget: entry.outboundTarget, + chatGuid: trimOrUndefined(entry.chatGuid), + chatIdentifier: trimOrUndefined(entry.chatIdentifier), + chatId: typeof entry.chatId === "number" ? entry.chatId : undefined, + snippetRaw, + snippetNorm, + isMediaSnippet: snippetRaw.toLowerCase().startsWith(" entry.id === id); + if (index >= 0) { + pendingOutboundMessageIds.splice(index, 1); + } +} + +function chatsMatch( + left: Pick, + right: { chatGuid?: string; chatIdentifier?: string; chatId?: number }, +): boolean { + const leftGuid = trimOrUndefined(left.chatGuid); + const rightGuid = trimOrUndefined(right.chatGuid); + if (leftGuid && rightGuid) { + return leftGuid === rightGuid; + } + + const leftIdentifier = trimOrUndefined(left.chatIdentifier); + const rightIdentifier = trimOrUndefined(right.chatIdentifier); + if (leftIdentifier && rightIdentifier) { + return leftIdentifier === rightIdentifier; + } + + const leftChatId = typeof left.chatId === "number" ? left.chatId : undefined; + const rightChatId = typeof right.chatId === "number" ? right.chatId : undefined; + if (leftChatId !== undefined && rightChatId !== undefined) { + return leftChatId === rightChatId; + } + + return false; +} + +function consumePendingOutboundMessageId(params: { + accountId: string; + chatGuid?: string; + chatIdentifier?: string; + chatId?: number; + body: string; +}): PendingOutboundMessageId | null { + prunePendingOutboundMessageIds(); + const bodyNorm = normalizeSnippet(params.body); + const isMediaBody = params.body.trim().toLowerCase().startsWith(" 12 ? "…" : ""}"` + : ""; + core.system.enqueueSystemEvent(`Assistant sent${preview} [message_id:${displayId}]`, { + sessionKey: pending.sessionKey, + contextKey: `bluebubbles:outbound:${pending.outboundTarget}:${cacheMessageId}`, + }); + } + } return; } @@ -629,10 +779,10 @@ export async function processMessage( ? formatBlueBubblesChatTarget({ chatGuid: chatGuidForActions }) : message.senderId; - const maybeEnqueueOutboundMessageId = (messageId?: string, snippet?: string) => { + const maybeEnqueueOutboundMessageId = (messageId?: string, snippet?: string): boolean => { const trimmed = messageId?.trim(); if (!trimmed || trimmed === "ok" || trimmed === "unknown") { - return; + return false; } // Cache outbound message to get short ID const cacheEntry = rememberBlueBubblesReplyCache({ @@ -651,6 +801,7 @@ export async function processMessage( sessionKey: route.sessionKey, contextKey: `bluebubbles:outbound:${outboundTarget}:${trimmed}`, }); + return true; }; const sanitizeReplyDirectiveText = (value: string): string => { if (privateApiEnabled) { @@ -768,16 +919,33 @@ export async function processMessage( for (const mediaUrl of mediaList) { const caption = first ? text : undefined; first = false; - const result = await sendBlueBubblesMedia({ - cfg: config, - to: outboundTarget, - mediaUrl, - caption: caption ?? undefined, - replyToId: replyToMessageGuid || null, - accountId: account.accountId, - }); const cachedBody = (caption ?? "").trim() || ""; - maybeEnqueueOutboundMessageId(result.messageId, cachedBody); + const pendingId = rememberPendingOutboundMessageId({ + accountId: account.accountId, + sessionKey: route.sessionKey, + outboundTarget, + chatGuid: chatGuidForActions ?? chatGuid, + chatIdentifier, + chatId, + snippet: cachedBody, + }); + let result: Awaited>; + try { + result = await sendBlueBubblesMedia({ + cfg: config, + to: outboundTarget, + mediaUrl, + caption: caption ?? undefined, + replyToId: replyToMessageGuid || null, + accountId: account.accountId, + }); + } catch (err) { + forgetPendingOutboundMessageId(pendingId); + throw err; + } + if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) { + forgetPendingOutboundMessageId(pendingId); + } sentMessage = true; statusSink?.({ lastOutboundAt: Date.now() }); if (info.kind === "block") { @@ -811,12 +979,29 @@ export async function processMessage( return; } for (const chunk of chunks) { - const result = await sendMessageBlueBubbles(outboundTarget, chunk, { - cfg: config, + const pendingId = rememberPendingOutboundMessageId({ accountId: account.accountId, - replyToMessageGuid: replyToMessageGuid || undefined, + sessionKey: route.sessionKey, + outboundTarget, + chatGuid: chatGuidForActions ?? chatGuid, + chatIdentifier, + chatId, + snippet: chunk, }); - maybeEnqueueOutboundMessageId(result.messageId, chunk); + let result: Awaited>; + try { + result = await sendMessageBlueBubbles(outboundTarget, chunk, { + cfg: config, + accountId: account.accountId, + replyToMessageGuid: replyToMessageGuid || undefined, + }); + } catch (err) { + forgetPendingOutboundMessageId(pendingId); + throw err; + } + if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) { + forgetPendingOutboundMessageId(pendingId); + } sentMessage = true; statusSink?.({ lastOutboundAt: Date.now() }); if (info.kind === "block") { diff --git a/extensions/bluebubbles/src/monitor.test.ts b/extensions/bluebubbles/src/monitor.test.ts index 3f08a78c9a2..6e4d39cbb53 100644 --- a/extensions/bluebubbles/src/monitor.test.ts +++ b/extensions/bluebubbles/src/monitor.test.ts @@ -2470,6 +2470,149 @@ describe("BlueBubbles webhook monitor", () => { }), ); }); + + it("falls back to from-me webhook when send response has no message id", async () => { + mockEnqueueSystemEvent.mockClear(); + + const { sendMessageBlueBubbles } = await import("./send.js"); + vi.mocked(sendMessageBlueBubbles).mockResolvedValueOnce({ messageId: "ok" }); + + mockDispatchReplyWithBufferedBlockDispatcher.mockImplementationOnce(async (params) => { + await params.dispatcherOptions.deliver({ text: "replying now" }, { kind: "final" }); + }); + + const account = createMockAccount(); + const config: OpenClawConfig = {}; + const core = createMockRuntime(); + setBlueBubblesRuntime(core); + + unregister = registerBlueBubblesWebhookTarget({ + account, + config, + runtime: { log: vi.fn(), error: vi.fn() }, + core, + path: "/bluebubbles-webhook", + }); + + const inboundPayload = { + type: "new-message", + data: { + text: "hello", + handle: { address: "+15551234567" }, + isGroup: false, + isFromMe: false, + guid: "msg-1", + chatGuid: "iMessage;-;+15551234567", + date: Date.now(), + }, + }; + + const inboundReq = createMockRequest("POST", "/bluebubbles-webhook", inboundPayload); + const inboundRes = createMockResponse(); + + await handleBlueBubblesWebhookRequest(inboundReq, inboundRes); + await flushAsync(); + + // Send response did not include a message id, so nothing should be enqueued yet. + expect(mockEnqueueSystemEvent).not.toHaveBeenCalled(); + + const fromMePayload = { + type: "new-message", + data: { + text: "replying now", + handle: { address: "+15557654321" }, + isGroup: false, + isFromMe: true, + guid: "msg-out-456", + chatGuid: "iMessage;-;+15551234567", + date: Date.now(), + }, + }; + + const fromMeReq = createMockRequest("POST", "/bluebubbles-webhook", fromMePayload); + const fromMeRes = createMockResponse(); + + await handleBlueBubblesWebhookRequest(fromMeReq, fromMeRes); + await flushAsync(); + + expect(mockEnqueueSystemEvent).toHaveBeenCalledWith( + 'Assistant sent "replying now" [message_id:2]', + expect.objectContaining({ + sessionKey: "agent:main:bluebubbles:dm:+15551234567", + }), + ); + }); + + it("matches from-me fallback by chatIdentifier when chatGuid is missing", async () => { + mockEnqueueSystemEvent.mockClear(); + + const { sendMessageBlueBubbles } = await import("./send.js"); + vi.mocked(sendMessageBlueBubbles).mockResolvedValueOnce({ messageId: "ok" }); + + mockDispatchReplyWithBufferedBlockDispatcher.mockImplementationOnce(async (params) => { + await params.dispatcherOptions.deliver({ text: "replying now" }, { kind: "final" }); + }); + + const account = createMockAccount(); + const config: OpenClawConfig = {}; + const core = createMockRuntime(); + setBlueBubblesRuntime(core); + + unregister = registerBlueBubblesWebhookTarget({ + account, + config, + runtime: { log: vi.fn(), error: vi.fn() }, + core, + path: "/bluebubbles-webhook", + }); + + const inboundPayload = { + type: "new-message", + data: { + text: "hello", + handle: { address: "+15551234567" }, + isGroup: false, + isFromMe: false, + guid: "msg-1", + chatGuid: "iMessage;-;+15551234567", + date: Date.now(), + }, + }; + + const inboundReq = createMockRequest("POST", "/bluebubbles-webhook", inboundPayload); + const inboundRes = createMockResponse(); + + await handleBlueBubblesWebhookRequest(inboundReq, inboundRes); + await flushAsync(); + + expect(mockEnqueueSystemEvent).not.toHaveBeenCalled(); + + const fromMePayload = { + type: "new-message", + data: { + text: "replying now", + handle: { address: "+15557654321" }, + isGroup: false, + isFromMe: true, + guid: "msg-out-789", + chatIdentifier: "+15551234567", + date: Date.now(), + }, + }; + + const fromMeReq = createMockRequest("POST", "/bluebubbles-webhook", fromMePayload); + const fromMeRes = createMockResponse(); + + await handleBlueBubblesWebhookRequest(fromMeReq, fromMeRes); + await flushAsync(); + + expect(mockEnqueueSystemEvent).toHaveBeenCalledWith( + 'Assistant sent "replying now" [message_id:2]', + expect.objectContaining({ + sessionKey: "agent:main:bluebubbles:dm:+15551234567", + }), + ); + }); }); describe("reaction events", () => { diff --git a/src/auto-reply/reply/inbound-meta.test.ts b/src/auto-reply/reply/inbound-meta.test.ts index a678623dcd0..7e0e8854b13 100644 --- a/src/auto-reply/reply/inbound-meta.test.ts +++ b/src/auto-reply/reply/inbound-meta.test.ts @@ -10,6 +10,14 @@ function parseInboundMetaPayload(text: string): Record { return JSON.parse(match[1]) as Record; } +function parseConversationInfoPayload(text: string): Record { + const match = text.match(/Conversation info \(untrusted metadata\):\n```json\n([\s\S]*?)\n```/); + if (!match?.[1]) { + throw new Error("missing conversation info json block"); + } + return JSON.parse(match[1]) as Record; +} + describe("buildInboundMetaSystemPrompt", () => { it("includes trusted message and routing ids for tool actions", () => { const prompt = buildInboundMetaSystemPrompt({ @@ -127,4 +135,24 @@ describe("buildInboundUserContextPrefix", () => { expect(text).toContain("Conversation info (untrusted metadata):"); expect(text).toContain('"conversation_label": "ops-room"'); }); + + it("includes sender identifier in conversation info", () => { + const text = buildInboundUserContextPrefix({ + ChatType: "direct", + SenderE164: " +15551234567 ", + } as TemplateContext); + + const conversationInfo = parseConversationInfoPayload(text); + expect(conversationInfo["sender"]).toBe("+15551234567"); + }); + + it("falls back to SenderId when sender phone is missing", () => { + const text = buildInboundUserContextPrefix({ + ChatType: "direct", + SenderId: " user@example.com ", + } as TemplateContext); + + const conversationInfo = parseConversationInfoPayload(text); + expect(conversationInfo["sender"]).toBe("user@example.com"); + }); }); diff --git a/src/auto-reply/reply/inbound-meta.ts b/src/auto-reply/reply/inbound-meta.ts index 5fdc1751193..109a8a3b073 100644 --- a/src/auto-reply/reply/inbound-meta.ts +++ b/src/auto-reply/reply/inbound-meta.ts @@ -62,6 +62,7 @@ export function buildInboundUserContextPrefix(ctx: TemplateContext): string { const conversationInfo = { conversation_label: isDirect ? undefined : safeTrim(ctx.ConversationLabel), + sender: safeTrim(ctx.SenderE164) ?? safeTrim(ctx.SenderId) ?? safeTrim(ctx.SenderUsername), group_subject: safeTrim(ctx.GroupSubject), group_channel: safeTrim(ctx.GroupChannel), group_space: safeTrim(ctx.GroupSpace),