mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:30:44 +00:00
fix(bluebubbles): restore inbound image attachments and accept updated-message events
Four interconnected fixes for BlueBubbles inbound media: 1. Strip bundled-undici dispatcher from non-SSRF fetch path so attachment downloads no longer silently fail on Node 22+ (#64105, #61861) 2. Accept updated-message webhook events that carry attachments instead of filtering them as non-reaction events (#65430) 3. Include eventType in the persistent GUID dedup key so updated-message follow-ups are not rejected as duplicates of the original new-message (#52277) 4. Retry attachment fetch from BB API (2s delay) when the initial webhook arrives with an empty attachments array — image-only messages and updated-message events only (#67437) Closes #64105, closes #61861, closes #65430.
This commit is contained in:
@@ -95,6 +95,79 @@ function readMediaFetchErrorCode(error: unknown): MediaFetchErrorCode | undefine
|
||||
: undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch attachment metadata for a message from the BlueBubbles API.
|
||||
*
|
||||
* BlueBubbles sometimes fires the `new-message` webhook before attachment
|
||||
* indexing is complete, so `attachments` arrives as `[]`. This function
|
||||
* GETs the message by GUID and returns whatever attachments the server
|
||||
* has indexed by now. (#65430, #67437)
|
||||
*/
|
||||
export async function fetchBlueBubblesMessageAttachments(
|
||||
messageGuid: string,
|
||||
opts: {
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
timeoutMs?: number;
|
||||
allowPrivateNetwork?: boolean;
|
||||
},
|
||||
): Promise<BlueBubblesAttachment[]> {
|
||||
const url = buildBlueBubblesApiUrl({
|
||||
baseUrl: opts.baseUrl,
|
||||
path: `/api/v1/message/${encodeURIComponent(messageGuid)}`,
|
||||
password: opts.password,
|
||||
});
|
||||
const policy = opts.allowPrivateNetwork ? { allowPrivateNetwork: true } : {};
|
||||
const response = await blueBubblesFetchWithTimeout(
|
||||
url,
|
||||
{ method: "GET" },
|
||||
opts.timeoutMs,
|
||||
policy,
|
||||
);
|
||||
if (!response.ok) {
|
||||
return [];
|
||||
}
|
||||
const json = (await response.json()) as Record<string, unknown>;
|
||||
const data = json.data as Record<string, unknown> | undefined;
|
||||
const rawAttachments = data?.attachments;
|
||||
if (!Array.isArray(rawAttachments)) {
|
||||
return [];
|
||||
}
|
||||
const out: BlueBubblesAttachment[] = [];
|
||||
for (const entry of rawAttachments) {
|
||||
if (!entry || typeof entry !== "object") {
|
||||
continue;
|
||||
}
|
||||
const record = entry as Record<string, unknown>;
|
||||
const guid = typeof record.guid === "string" ? record.guid.trim() : undefined;
|
||||
if (!guid) {
|
||||
continue;
|
||||
}
|
||||
out.push({
|
||||
guid,
|
||||
mimeType:
|
||||
typeof record.mimeType === "string"
|
||||
? record.mimeType
|
||||
: typeof record.mime_type === "string"
|
||||
? record.mime_type
|
||||
: undefined,
|
||||
transferName:
|
||||
typeof record.transferName === "string"
|
||||
? record.transferName
|
||||
: typeof record.transfer_name === "string"
|
||||
? record.transfer_name
|
||||
: undefined,
|
||||
totalBytes:
|
||||
typeof record.totalBytes === "number"
|
||||
? record.totalBytes
|
||||
: typeof record.total_bytes === "number"
|
||||
? record.total_bytes
|
||||
: undefined,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
export async function downloadBlueBubblesAttachment(
|
||||
attachment: BlueBubblesAttachment,
|
||||
opts: BlueBubblesAttachmentOpts & { maxBytes?: number } = {},
|
||||
|
||||
@@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
_resetBlueBubblesInboundDedupForTest,
|
||||
claimBlueBubblesInboundMessage,
|
||||
resolveBlueBubblesInboundDedupeKey,
|
||||
} from "./inbound-dedupe.js";
|
||||
|
||||
async function claimAndFinalize(guid: string | undefined, accountId: string): Promise<string> {
|
||||
@@ -56,3 +57,38 @@ describe("claimBlueBubblesInboundMessage", () => {
|
||||
expect(await claimAndFinalize("g1", "acc")).toBe("claimed");
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveBlueBubblesInboundDedupeKey", () => {
|
||||
it("returns messageId for new-message events", () => {
|
||||
expect(resolveBlueBubblesInboundDedupeKey({ messageId: "msg-1" })).toBe("msg-1");
|
||||
});
|
||||
|
||||
it("returns associatedMessageGuid for balloon events", () => {
|
||||
expect(
|
||||
resolveBlueBubblesInboundDedupeKey({
|
||||
messageId: "balloon-1",
|
||||
balloonBundleId: "com.apple.messages.URLBalloonProvider",
|
||||
associatedMessageGuid: "msg-1",
|
||||
}),
|
||||
).toBe("msg-1");
|
||||
});
|
||||
|
||||
it("suffixes key with :updated for updated-message events", () => {
|
||||
expect(
|
||||
resolveBlueBubblesInboundDedupeKey({ messageId: "msg-1", eventType: "updated-message" }),
|
||||
).toBe("msg-1:updated");
|
||||
});
|
||||
|
||||
it("updated-message and new-message for same GUID produce distinct keys", () => {
|
||||
const newKey = resolveBlueBubblesInboundDedupeKey({ messageId: "msg-1" });
|
||||
const updatedKey = resolveBlueBubblesInboundDedupeKey({
|
||||
messageId: "msg-1",
|
||||
eventType: "updated-message",
|
||||
});
|
||||
expect(newKey).not.toBe(updatedKey);
|
||||
});
|
||||
|
||||
it("returns undefined when messageId is missing", () => {
|
||||
expect(resolveBlueBubblesInboundDedupeKey({})).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -136,15 +136,27 @@ function sanitizeGuid(guid: string | undefined | null): string | null {
|
||||
export function resolveBlueBubblesInboundDedupeKey(
|
||||
message: Pick<
|
||||
NormalizedWebhookMessage,
|
||||
"messageId" | "balloonBundleId" | "associatedMessageGuid"
|
||||
"messageId" | "balloonBundleId" | "associatedMessageGuid" | "eventType"
|
||||
>,
|
||||
): string | undefined {
|
||||
const balloonBundleId = message.balloonBundleId?.trim();
|
||||
const associatedMessageGuid = message.associatedMessageGuid?.trim();
|
||||
let base: string | undefined;
|
||||
if (balloonBundleId && associatedMessageGuid) {
|
||||
return associatedMessageGuid;
|
||||
base = associatedMessageGuid;
|
||||
} else {
|
||||
base = message.messageId?.trim() || undefined;
|
||||
}
|
||||
return message.messageId?.trim() || undefined;
|
||||
if (!base) {
|
||||
return undefined;
|
||||
}
|
||||
// `updated-message` events get a distinct key so they are not rejected as
|
||||
// duplicates of the already-committed `new-message` for the same GUID.
|
||||
// This lets attachment-carrying follow-up webhooks through. (#65430, #52277)
|
||||
if (message.eventType === "updated-message") {
|
||||
return `${base}:updated`;
|
||||
}
|
||||
return base;
|
||||
}
|
||||
|
||||
export type InboundDedupeClaim =
|
||||
|
||||
@@ -477,6 +477,8 @@ export type NormalizedWebhookMessage = {
|
||||
replyToId?: string;
|
||||
replyToBody?: string;
|
||||
replyToSender?: string;
|
||||
/** Webhook event type preserved for dedup key differentiation. */
|
||||
eventType?: string;
|
||||
};
|
||||
|
||||
export type NormalizedWebhookReaction = {
|
||||
@@ -687,6 +689,7 @@ function extractMessagePayload(payload: Record<string, unknown>): Record<string,
|
||||
|
||||
export function normalizeWebhookMessage(
|
||||
payload: Record<string, unknown>,
|
||||
options?: { eventType?: string },
|
||||
): NormalizedWebhookMessage | null {
|
||||
const message = extractMessagePayload(payload);
|
||||
if (!message) {
|
||||
@@ -774,6 +777,7 @@ export function normalizeWebhookMessage(
|
||||
replyToId: replyMetadata.replyToId,
|
||||
replyToBody: replyMetadata.replyToBody,
|
||||
replyToSender: replyMetadata.replyToSender,
|
||||
eventType: options?.eventType,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,10 @@ import {
|
||||
normalizeOptionalLowercaseString,
|
||||
normalizeOptionalString,
|
||||
} from "openclaw/plugin-sdk/text-runtime";
|
||||
import { downloadBlueBubblesAttachment } from "./attachments.js";
|
||||
import {
|
||||
downloadBlueBubblesAttachment,
|
||||
fetchBlueBubblesMessageAttachments,
|
||||
} from "./attachments.js";
|
||||
import { markBlueBubblesChatRead, sendBlueBubblesTyping } from "./chat.js";
|
||||
import { resolveBlueBubblesConversationRoute } from "./conversation-route.js";
|
||||
import { fetchBlueBubblesHistory } from "./history.js";
|
||||
@@ -1065,14 +1068,52 @@ async function processMessageAfterDedupe(
|
||||
? account.config.mediaMaxMb * 1024 * 1024
|
||||
: 8 * 1024 * 1024;
|
||||
|
||||
// BlueBubbles may fire the webhook before attachment indexing is complete,
|
||||
// so the initial `attachments` array can be empty for messages that actually
|
||||
// have media. When the message text is empty (image-only) or this is an
|
||||
// `updated-message` event, wait briefly and re-fetch from the BB API as a
|
||||
// fallback for cases where BB doesn't send a follow-up webhook. (#65430, #67437)
|
||||
let resolvedAttachments = attachments;
|
||||
const shouldRetryAttachments =
|
||||
resolvedAttachments.length === 0 &&
|
||||
message.messageId &&
|
||||
baseUrl &&
|
||||
password &&
|
||||
(text.length === 0 || message.eventType === "updated-message");
|
||||
if (shouldRetryAttachments) {
|
||||
try {
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 2_000));
|
||||
const fetched = await fetchBlueBubblesMessageAttachments(message.messageId!, {
|
||||
baseUrl,
|
||||
password,
|
||||
timeoutMs: 10_000,
|
||||
allowPrivateNetwork: isPrivateNetworkOptInEnabled(account.config),
|
||||
});
|
||||
if (fetched.length > 0) {
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`attachment retry found ${fetched.length} attachment(s) for msgId=${message.messageId}`,
|
||||
);
|
||||
resolvedAttachments = fetched;
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`attachment retry failed for msgId=${message.messageId}: ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let mediaUrls: string[] = [];
|
||||
let mediaPaths: string[] = [];
|
||||
let mediaTypes: string[] = [];
|
||||
if (attachments.length > 0) {
|
||||
if (resolvedAttachments.length > 0) {
|
||||
if (!baseUrl || !password) {
|
||||
logVerbose(core, runtime, "attachment download skipped (missing serverUrl/password)");
|
||||
} else {
|
||||
for (const attachment of attachments) {
|
||||
for (const attachment of resolvedAttachments) {
|
||||
if (!attachment.guid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -249,11 +249,21 @@ export async function handleBlueBubblesWebhookRequest(
|
||||
return true;
|
||||
}
|
||||
const reaction = normalizeWebhookReaction(payload);
|
||||
// BlueBubbles fires `updated-message` when attachments are indexed after the
|
||||
// initial `new-message` (which may arrive with attachments: []). Let those
|
||||
// through so the agent can ingest the image. (#65430)
|
||||
const dataRecord = asRecord(asRecord(payload)?.data);
|
||||
const dataAttachments = dataRecord?.attachments;
|
||||
const isAttachmentUpdate =
|
||||
eventType === "updated-message" &&
|
||||
Array.isArray(dataAttachments) &&
|
||||
dataAttachments.length > 0;
|
||||
if (
|
||||
(eventType === "updated-message" ||
|
||||
eventType === "message-reaction" ||
|
||||
eventType === "reaction") &&
|
||||
!reaction
|
||||
!reaction &&
|
||||
!isAttachmentUpdate
|
||||
) {
|
||||
res.statusCode = 200;
|
||||
res.end("ok");
|
||||
@@ -266,7 +276,7 @@ export async function handleBlueBubblesWebhookRequest(
|
||||
}
|
||||
return true;
|
||||
}
|
||||
const message = reaction ? null : normalizeWebhookMessage(payload);
|
||||
const message = reaction ? null : normalizeWebhookMessage(payload, { eventType });
|
||||
if (!message && !reaction) {
|
||||
res.statusCode = 400;
|
||||
res.end("invalid payload");
|
||||
|
||||
@@ -175,10 +175,16 @@ export async function blueBubblesFetchWithTimeout(
|
||||
await release();
|
||||
}
|
||||
}
|
||||
// Strip `dispatcher` from init — the SSRF guard may have attached a bundled-undici
|
||||
// dispatcher that is incompatible with Node 22+'s built-in undici backing globalThis.fetch().
|
||||
// Passing it through causes a silent TypeError (invalid onRequestStart method). (#64105)
|
||||
const { dispatcher: _dispatcher, ...safeInit } = (init ?? {}) as RequestInit & {
|
||||
dispatcher?: unknown;
|
||||
};
|
||||
const controller = new AbortController();
|
||||
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||
try {
|
||||
return await fetch(url, { ...init, signal: controller.signal });
|
||||
return await fetch(url, { ...safeInit, signal: controller.signal });
|
||||
} finally {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user