mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix(bluebubbles): recover outbound message IDs and include sender metadata
This commit is contained in:
@@ -25,6 +25,9 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- BlueBubbles: add fallback path to recover outbound `message_id` from `fromMe` webhooks when platform message IDs are missing.
|
||||
- BlueBubbles: match outbound message-id fallback recovery by chat identifier as well as account context.
|
||||
- BlueBubbles: include sender identifier in untrusted conversation metadata for conversation info payloads.
|
||||
- macOS/Update: correct the Sparkle appcast version for 2026.2.15 so updates are offered again. (#18201)
|
||||
- Gateway/Auth: clear stale device-auth tokens after device token mismatch errors so re-paired clients can re-auth. (#18201)
|
||||
- Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky.
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
logTypingFailure,
|
||||
resolveAckReaction,
|
||||
resolveControlCommandGate,
|
||||
stripMarkdown,
|
||||
} from "openclaw/plugin-sdk";
|
||||
import { downloadBlueBubblesAttachment } from "./attachments.js";
|
||||
import { markBlueBubblesChatRead, sendBlueBubblesTyping } from "./chat.js";
|
||||
@@ -40,6 +41,135 @@ import { formatBlueBubblesChatTarget, isAllowedBlueBubblesSender } from "./targe
|
||||
const DEFAULT_TEXT_LIMIT = 4000;
|
||||
const invalidAckReactions = new Set<string>();
|
||||
const REPLY_DIRECTIVE_TAG_RE = /\[\[\s*(?:reply_to_current|reply_to\s*:\s*[^\]\n]+)\s*\]\]/gi;
|
||||
const PENDING_OUTBOUND_MESSAGE_ID_TTL_MS = 2 * 60 * 1000;
|
||||
|
||||
type PendingOutboundMessageId = {
|
||||
id: number;
|
||||
accountId: string;
|
||||
sessionKey: string;
|
||||
outboundTarget: string;
|
||||
chatGuid?: string;
|
||||
chatIdentifier?: string;
|
||||
chatId?: number;
|
||||
snippetRaw: string;
|
||||
snippetNorm: string;
|
||||
isMediaSnippet: boolean;
|
||||
createdAt: number;
|
||||
};
|
||||
|
||||
const pendingOutboundMessageIds: PendingOutboundMessageId[] = [];
|
||||
let pendingOutboundMessageIdCounter = 0;
|
||||
|
||||
function trimOrUndefined(value?: string | null): string | undefined {
|
||||
const trimmed = value?.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function normalizeSnippet(value: string): string {
|
||||
return stripMarkdown(value).replace(/\s+/g, " ").trim().toLowerCase();
|
||||
}
|
||||
|
||||
function prunePendingOutboundMessageIds(now = Date.now()): void {
|
||||
const cutoff = now - PENDING_OUTBOUND_MESSAGE_ID_TTL_MS;
|
||||
for (let i = pendingOutboundMessageIds.length - 1; i >= 0; i--) {
|
||||
if (pendingOutboundMessageIds[i].createdAt < cutoff) {
|
||||
pendingOutboundMessageIds.splice(i, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function rememberPendingOutboundMessageId(entry: {
|
||||
accountId: string;
|
||||
sessionKey: string;
|
||||
outboundTarget: string;
|
||||
chatGuid?: string;
|
||||
chatIdentifier?: string;
|
||||
chatId?: number;
|
||||
snippet: string;
|
||||
}): number {
|
||||
prunePendingOutboundMessageIds();
|
||||
pendingOutboundMessageIdCounter += 1;
|
||||
const snippetRaw = entry.snippet.trim();
|
||||
const snippetNorm = normalizeSnippet(snippetRaw);
|
||||
pendingOutboundMessageIds.push({
|
||||
id: pendingOutboundMessageIdCounter,
|
||||
accountId: entry.accountId,
|
||||
sessionKey: entry.sessionKey,
|
||||
outboundTarget: entry.outboundTarget,
|
||||
chatGuid: trimOrUndefined(entry.chatGuid),
|
||||
chatIdentifier: trimOrUndefined(entry.chatIdentifier),
|
||||
chatId: typeof entry.chatId === "number" ? entry.chatId : undefined,
|
||||
snippetRaw,
|
||||
snippetNorm,
|
||||
isMediaSnippet: snippetRaw.toLowerCase().startsWith("<media:"),
|
||||
createdAt: Date.now(),
|
||||
});
|
||||
return pendingOutboundMessageIdCounter;
|
||||
}
|
||||
|
||||
function forgetPendingOutboundMessageId(id: number): void {
|
||||
const index = pendingOutboundMessageIds.findIndex((entry) => entry.id === id);
|
||||
if (index >= 0) {
|
||||
pendingOutboundMessageIds.splice(index, 1);
|
||||
}
|
||||
}
|
||||
|
||||
function chatsMatch(
|
||||
left: Pick<PendingOutboundMessageId, "chatGuid" | "chatIdentifier" | "chatId">,
|
||||
right: { chatGuid?: string; chatIdentifier?: string; chatId?: number },
|
||||
): boolean {
|
||||
const leftGuid = trimOrUndefined(left.chatGuid);
|
||||
const rightGuid = trimOrUndefined(right.chatGuid);
|
||||
if (leftGuid && rightGuid) {
|
||||
return leftGuid === rightGuid;
|
||||
}
|
||||
|
||||
const leftIdentifier = trimOrUndefined(left.chatIdentifier);
|
||||
const rightIdentifier = trimOrUndefined(right.chatIdentifier);
|
||||
if (leftIdentifier && rightIdentifier) {
|
||||
return leftIdentifier === rightIdentifier;
|
||||
}
|
||||
|
||||
const leftChatId = typeof left.chatId === "number" ? left.chatId : undefined;
|
||||
const rightChatId = typeof right.chatId === "number" ? right.chatId : undefined;
|
||||
if (leftChatId !== undefined && rightChatId !== undefined) {
|
||||
return leftChatId === rightChatId;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function consumePendingOutboundMessageId(params: {
|
||||
accountId: string;
|
||||
chatGuid?: string;
|
||||
chatIdentifier?: string;
|
||||
chatId?: number;
|
||||
body: string;
|
||||
}): PendingOutboundMessageId | null {
|
||||
prunePendingOutboundMessageIds();
|
||||
const bodyNorm = normalizeSnippet(params.body);
|
||||
const isMediaBody = params.body.trim().toLowerCase().startsWith("<media:");
|
||||
|
||||
for (let i = 0; i < pendingOutboundMessageIds.length; i++) {
|
||||
const entry = pendingOutboundMessageIds[i];
|
||||
if (entry.accountId !== params.accountId) {
|
||||
continue;
|
||||
}
|
||||
if (!chatsMatch(entry, params)) {
|
||||
continue;
|
||||
}
|
||||
if (entry.snippetNorm && entry.snippetNorm === bodyNorm) {
|
||||
pendingOutboundMessageIds.splice(i, 1);
|
||||
return entry;
|
||||
}
|
||||
if (entry.isMediaSnippet && isMediaBody) {
|
||||
pendingOutboundMessageIds.splice(i, 1);
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
export function logVerbose(
|
||||
core: BlueBubblesCoreRuntime,
|
||||
@@ -158,6 +288,26 @@ export async function processMessage(
|
||||
if (message.fromMe) {
|
||||
// Cache from-me messages so reply context can resolve sender/body.
|
||||
cacheInboundMessage();
|
||||
if (cacheMessageId) {
|
||||
const pending = consumePendingOutboundMessageId({
|
||||
accountId: account.accountId,
|
||||
chatGuid: message.chatGuid,
|
||||
chatIdentifier: message.chatIdentifier,
|
||||
chatId: message.chatId,
|
||||
body: rawBody,
|
||||
});
|
||||
if (pending) {
|
||||
const displayId = getShortIdForUuid(cacheMessageId) || cacheMessageId;
|
||||
const previewSource = pending.snippetRaw || rawBody;
|
||||
const preview = previewSource
|
||||
? ` "${previewSource.slice(0, 12)}${previewSource.length > 12 ? "…" : ""}"`
|
||||
: "";
|
||||
core.system.enqueueSystemEvent(`Assistant sent${preview} [message_id:${displayId}]`, {
|
||||
sessionKey: pending.sessionKey,
|
||||
contextKey: `bluebubbles:outbound:${pending.outboundTarget}:${cacheMessageId}`,
|
||||
});
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -629,10 +779,10 @@ export async function processMessage(
|
||||
? formatBlueBubblesChatTarget({ chatGuid: chatGuidForActions })
|
||||
: message.senderId;
|
||||
|
||||
const maybeEnqueueOutboundMessageId = (messageId?: string, snippet?: string) => {
|
||||
const maybeEnqueueOutboundMessageId = (messageId?: string, snippet?: string): boolean => {
|
||||
const trimmed = messageId?.trim();
|
||||
if (!trimmed || trimmed === "ok" || trimmed === "unknown") {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
// Cache outbound message to get short ID
|
||||
const cacheEntry = rememberBlueBubblesReplyCache({
|
||||
@@ -651,6 +801,7 @@ export async function processMessage(
|
||||
sessionKey: route.sessionKey,
|
||||
contextKey: `bluebubbles:outbound:${outboundTarget}:${trimmed}`,
|
||||
});
|
||||
return true;
|
||||
};
|
||||
const sanitizeReplyDirectiveText = (value: string): string => {
|
||||
if (privateApiEnabled) {
|
||||
@@ -768,16 +919,33 @@ export async function processMessage(
|
||||
for (const mediaUrl of mediaList) {
|
||||
const caption = first ? text : undefined;
|
||||
first = false;
|
||||
const result = await sendBlueBubblesMedia({
|
||||
cfg: config,
|
||||
to: outboundTarget,
|
||||
mediaUrl,
|
||||
caption: caption ?? undefined,
|
||||
replyToId: replyToMessageGuid || null,
|
||||
accountId: account.accountId,
|
||||
});
|
||||
const cachedBody = (caption ?? "").trim() || "<media:attachment>";
|
||||
maybeEnqueueOutboundMessageId(result.messageId, cachedBody);
|
||||
const pendingId = rememberPendingOutboundMessageId({
|
||||
accountId: account.accountId,
|
||||
sessionKey: route.sessionKey,
|
||||
outboundTarget,
|
||||
chatGuid: chatGuidForActions ?? chatGuid,
|
||||
chatIdentifier,
|
||||
chatId,
|
||||
snippet: cachedBody,
|
||||
});
|
||||
let result: Awaited<ReturnType<typeof sendBlueBubblesMedia>>;
|
||||
try {
|
||||
result = await sendBlueBubblesMedia({
|
||||
cfg: config,
|
||||
to: outboundTarget,
|
||||
mediaUrl,
|
||||
caption: caption ?? undefined,
|
||||
replyToId: replyToMessageGuid || null,
|
||||
accountId: account.accountId,
|
||||
});
|
||||
} catch (err) {
|
||||
forgetPendingOutboundMessageId(pendingId);
|
||||
throw err;
|
||||
}
|
||||
if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) {
|
||||
forgetPendingOutboundMessageId(pendingId);
|
||||
}
|
||||
sentMessage = true;
|
||||
statusSink?.({ lastOutboundAt: Date.now() });
|
||||
if (info.kind === "block") {
|
||||
@@ -811,12 +979,29 @@ export async function processMessage(
|
||||
return;
|
||||
}
|
||||
for (const chunk of chunks) {
|
||||
const result = await sendMessageBlueBubbles(outboundTarget, chunk, {
|
||||
cfg: config,
|
||||
const pendingId = rememberPendingOutboundMessageId({
|
||||
accountId: account.accountId,
|
||||
replyToMessageGuid: replyToMessageGuid || undefined,
|
||||
sessionKey: route.sessionKey,
|
||||
outboundTarget,
|
||||
chatGuid: chatGuidForActions ?? chatGuid,
|
||||
chatIdentifier,
|
||||
chatId,
|
||||
snippet: chunk,
|
||||
});
|
||||
maybeEnqueueOutboundMessageId(result.messageId, chunk);
|
||||
let result: Awaited<ReturnType<typeof sendMessageBlueBubbles>>;
|
||||
try {
|
||||
result = await sendMessageBlueBubbles(outboundTarget, chunk, {
|
||||
cfg: config,
|
||||
accountId: account.accountId,
|
||||
replyToMessageGuid: replyToMessageGuid || undefined,
|
||||
});
|
||||
} catch (err) {
|
||||
forgetPendingOutboundMessageId(pendingId);
|
||||
throw err;
|
||||
}
|
||||
if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) {
|
||||
forgetPendingOutboundMessageId(pendingId);
|
||||
}
|
||||
sentMessage = true;
|
||||
statusSink?.({ lastOutboundAt: Date.now() });
|
||||
if (info.kind === "block") {
|
||||
|
||||
@@ -2470,6 +2470,149 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to from-me webhook when send response has no message id", async () => {
|
||||
mockEnqueueSystemEvent.mockClear();
|
||||
|
||||
const { sendMessageBlueBubbles } = await import("./send.js");
|
||||
vi.mocked(sendMessageBlueBubbles).mockResolvedValueOnce({ messageId: "ok" });
|
||||
|
||||
mockDispatchReplyWithBufferedBlockDispatcher.mockImplementationOnce(async (params) => {
|
||||
await params.dispatcherOptions.deliver({ text: "replying now" }, { kind: "final" });
|
||||
});
|
||||
|
||||
const account = createMockAccount();
|
||||
const config: OpenClawConfig = {};
|
||||
const core = createMockRuntime();
|
||||
setBlueBubblesRuntime(core);
|
||||
|
||||
unregister = registerBlueBubblesWebhookTarget({
|
||||
account,
|
||||
config,
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
});
|
||||
|
||||
const inboundPayload = {
|
||||
type: "new-message",
|
||||
data: {
|
||||
text: "hello",
|
||||
handle: { address: "+15551234567" },
|
||||
isGroup: false,
|
||||
isFromMe: false,
|
||||
guid: "msg-1",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
date: Date.now(),
|
||||
},
|
||||
};
|
||||
|
||||
const inboundReq = createMockRequest("POST", "/bluebubbles-webhook", inboundPayload);
|
||||
const inboundRes = createMockResponse();
|
||||
|
||||
await handleBlueBubblesWebhookRequest(inboundReq, inboundRes);
|
||||
await flushAsync();
|
||||
|
||||
// Send response did not include a message id, so nothing should be enqueued yet.
|
||||
expect(mockEnqueueSystemEvent).not.toHaveBeenCalled();
|
||||
|
||||
const fromMePayload = {
|
||||
type: "new-message",
|
||||
data: {
|
||||
text: "replying now",
|
||||
handle: { address: "+15557654321" },
|
||||
isGroup: false,
|
||||
isFromMe: true,
|
||||
guid: "msg-out-456",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
date: Date.now(),
|
||||
},
|
||||
};
|
||||
|
||||
const fromMeReq = createMockRequest("POST", "/bluebubbles-webhook", fromMePayload);
|
||||
const fromMeRes = createMockResponse();
|
||||
|
||||
await handleBlueBubblesWebhookRequest(fromMeReq, fromMeRes);
|
||||
await flushAsync();
|
||||
|
||||
expect(mockEnqueueSystemEvent).toHaveBeenCalledWith(
|
||||
'Assistant sent "replying now" [message_id:2]',
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:bluebubbles:dm:+15551234567",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("matches from-me fallback by chatIdentifier when chatGuid is missing", async () => {
|
||||
mockEnqueueSystemEvent.mockClear();
|
||||
|
||||
const { sendMessageBlueBubbles } = await import("./send.js");
|
||||
vi.mocked(sendMessageBlueBubbles).mockResolvedValueOnce({ messageId: "ok" });
|
||||
|
||||
mockDispatchReplyWithBufferedBlockDispatcher.mockImplementationOnce(async (params) => {
|
||||
await params.dispatcherOptions.deliver({ text: "replying now" }, { kind: "final" });
|
||||
});
|
||||
|
||||
const account = createMockAccount();
|
||||
const config: OpenClawConfig = {};
|
||||
const core = createMockRuntime();
|
||||
setBlueBubblesRuntime(core);
|
||||
|
||||
unregister = registerBlueBubblesWebhookTarget({
|
||||
account,
|
||||
config,
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
});
|
||||
|
||||
const inboundPayload = {
|
||||
type: "new-message",
|
||||
data: {
|
||||
text: "hello",
|
||||
handle: { address: "+15551234567" },
|
||||
isGroup: false,
|
||||
isFromMe: false,
|
||||
guid: "msg-1",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
date: Date.now(),
|
||||
},
|
||||
};
|
||||
|
||||
const inboundReq = createMockRequest("POST", "/bluebubbles-webhook", inboundPayload);
|
||||
const inboundRes = createMockResponse();
|
||||
|
||||
await handleBlueBubblesWebhookRequest(inboundReq, inboundRes);
|
||||
await flushAsync();
|
||||
|
||||
expect(mockEnqueueSystemEvent).not.toHaveBeenCalled();
|
||||
|
||||
const fromMePayload = {
|
||||
type: "new-message",
|
||||
data: {
|
||||
text: "replying now",
|
||||
handle: { address: "+15557654321" },
|
||||
isGroup: false,
|
||||
isFromMe: true,
|
||||
guid: "msg-out-789",
|
||||
chatIdentifier: "+15551234567",
|
||||
date: Date.now(),
|
||||
},
|
||||
};
|
||||
|
||||
const fromMeReq = createMockRequest("POST", "/bluebubbles-webhook", fromMePayload);
|
||||
const fromMeRes = createMockResponse();
|
||||
|
||||
await handleBlueBubblesWebhookRequest(fromMeReq, fromMeRes);
|
||||
await flushAsync();
|
||||
|
||||
expect(mockEnqueueSystemEvent).toHaveBeenCalledWith(
|
||||
'Assistant sent "replying now" [message_id:2]',
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:bluebubbles:dm:+15551234567",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("reaction events", () => {
|
||||
|
||||
@@ -10,6 +10,14 @@ function parseInboundMetaPayload(text: string): Record<string, unknown> {
|
||||
return JSON.parse(match[1]) as Record<string, unknown>;
|
||||
}
|
||||
|
||||
function parseConversationInfoPayload(text: string): Record<string, unknown> {
|
||||
const match = text.match(/Conversation info \(untrusted metadata\):\n```json\n([\s\S]*?)\n```/);
|
||||
if (!match?.[1]) {
|
||||
throw new Error("missing conversation info json block");
|
||||
}
|
||||
return JSON.parse(match[1]) as Record<string, unknown>;
|
||||
}
|
||||
|
||||
describe("buildInboundMetaSystemPrompt", () => {
|
||||
it("includes trusted message and routing ids for tool actions", () => {
|
||||
const prompt = buildInboundMetaSystemPrompt({
|
||||
@@ -127,4 +135,24 @@ describe("buildInboundUserContextPrefix", () => {
|
||||
expect(text).toContain("Conversation info (untrusted metadata):");
|
||||
expect(text).toContain('"conversation_label": "ops-room"');
|
||||
});
|
||||
|
||||
it("includes sender identifier in conversation info", () => {
|
||||
const text = buildInboundUserContextPrefix({
|
||||
ChatType: "direct",
|
||||
SenderE164: " +15551234567 ",
|
||||
} as TemplateContext);
|
||||
|
||||
const conversationInfo = parseConversationInfoPayload(text);
|
||||
expect(conversationInfo["sender"]).toBe("+15551234567");
|
||||
});
|
||||
|
||||
it("falls back to SenderId when sender phone is missing", () => {
|
||||
const text = buildInboundUserContextPrefix({
|
||||
ChatType: "direct",
|
||||
SenderId: " user@example.com ",
|
||||
} as TemplateContext);
|
||||
|
||||
const conversationInfo = parseConversationInfoPayload(text);
|
||||
expect(conversationInfo["sender"]).toBe("user@example.com");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -62,6 +62,7 @@ export function buildInboundUserContextPrefix(ctx: TemplateContext): string {
|
||||
|
||||
const conversationInfo = {
|
||||
conversation_label: isDirect ? undefined : safeTrim(ctx.ConversationLabel),
|
||||
sender: safeTrim(ctx.SenderE164) ?? safeTrim(ctx.SenderId) ?? safeTrim(ctx.SenderUsername),
|
||||
group_subject: safeTrim(ctx.GroupSubject),
|
||||
group_channel: safeTrim(ctx.GroupChannel),
|
||||
group_space: safeTrim(ctx.GroupSpace),
|
||||
|
||||
Reference in New Issue
Block a user