mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:40:44 +00:00
feat(bluebubbles): add reply-context API fallback for cache misses (#71820)
Merged via squash.
Prepared head SHA: 04f6a8740a
Co-authored-by: coletebou <12384893+coletebou@users.noreply.github.com>
Co-authored-by: omarshahine <10343873+omarshahine@users.noreply.github.com>
Reviewed-by: @omarshahine
This commit is contained in:
@@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- Messages/docs: clarify that `BodyForAgent` is the primary inbound model text while `Body` is the legacy envelope fallback, and add Signal coverage so channel hardening patches target the real prompt path. Refs #66198. Thanks @defonota3box.
|
||||
- Control UI/Usage: add UTC quarter-hour token buckets for the Usage Mosaic and reuse them for hour filtering, keeping the legacy session-span fallback for older summaries. (#74337) Thanks @konanok.
|
||||
- BlueBubbles: add opt-in `channels.bluebubbles.replyContextApiFallback` that fetches the original message from the BlueBubbles HTTP API when the in-memory reply-context cache misses (multi-instance deployments sharing one BB account, post-restart, after long-lived TTL/LRU eviction). Off by default; channel-level setting propagates to accounts that omit the flag through `mergeAccountConfig`; routed through the typed `BlueBubblesClient` so every fetch is SSRF-guarded by the same three-mode policy as every other BB client request; reply-id shape is validated and part-index prefixes (`p:0/<guid>`) are stripped before the request; concurrent webhooks for the same `replyToId` coalesce into one fetch and successful responses populate the reply cache for subsequent hits. Also promotes BlueBubbles attachment download failures from verbose to runtime error so silently-dropped inbound images are visible at default log level, and extends `sanitizeForLog` to redact `?password=…`/`?token=…` query params and `Authorization:` headers before they reach the log sink (CWE-532). (#71820) Thanks @coletebou and @zqchris.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
@@ -581,6 +581,7 @@ Full configuration: [Configuration](/gateway/configuration)
|
||||
- `channels.bluebubbles.coalesceSameSenderDms`: Merge consecutive same-sender DM webhooks into one agent turn so Apple's text+URL split-send arrives as a single message (default: `false`). See [Coalescing split-send DMs](#coalescing-split-send-dms-command--url-in-one-composition) for scenarios, window tuning, and trade-offs. Widens the default inbound debounce window from 500 ms to 2500 ms when enabled without an explicit `messages.inbound.byChannel.bluebubbles`.
|
||||
- `channels.bluebubbles.historyLimit`: Max group messages for context (0 disables).
|
||||
- `channels.bluebubbles.dmHistoryLimit`: DM history limit.
|
||||
- `channels.bluebubbles.replyContextApiFallback`: When an inbound reply lands without `replyToBody`/`replyToSender` and the in-memory reply-context cache misses, fetch the original message from the BlueBubbles HTTP API as a best-effort fallback (default: `false`). Useful for multi-instance deployments sharing one BlueBubbles account, after process restarts, or after long-lived TTL/LRU cache eviction. The fetch is SSRF-guarded by the same policy as every other BlueBubbles client request, never throws, and populates the cache so subsequent replies amortize. Per-account override: `channels.bluebubbles.accounts.<accountId>.replyContextApiFallback`. A channel-level setting propagates to accounts that omit the flag.
|
||||
|
||||
</Accordion>
|
||||
<Accordion title="Actions and accounts">
|
||||
|
||||
@@ -93,6 +93,21 @@ const bluebubblesAccountSchema = z
|
||||
network: bluebubblesNetworkSchema,
|
||||
catchup: bluebubblesCatchupSchema,
|
||||
blockStreaming: z.boolean().optional(),
|
||||
/**
|
||||
* When an inbound reply lands without `replyToBody`/`replyToSender` and the
|
||||
* in-memory reply cache misses (e.g., multi-instance deployments sharing
|
||||
* one BlueBubbles account, after process restarts, or after long-lived
|
||||
* cache eviction), opt in to fetching the original message from the
|
||||
* BlueBubbles HTTP API as a best-effort fallback. Off by default.
|
||||
*
|
||||
* Left as `.optional()` rather than `.optional().default(false)` so that a
|
||||
* channel-level `channels.bluebubbles.replyContextApiFallback: true` still
|
||||
* propagates to accounts that omit the field. With a hard per-account
|
||||
* default, the merge would clobber the channel value with `false` and
|
||||
* operators would have to duplicate the flag under every `accounts.<id>`.
|
||||
* (PR #71820 review)
|
||||
*/
|
||||
replyContextApiFallback: z.boolean().optional(),
|
||||
groups: z.object({}).catchall(bluebubblesGroupConfigSchema).optional(),
|
||||
coalesceSameSenderDms: z.boolean().optional(),
|
||||
})
|
||||
|
||||
@@ -60,6 +60,7 @@ import {
|
||||
resolveBlueBubblesMessageId,
|
||||
resolveReplyContextFromCache,
|
||||
} from "./monitor-reply-cache.js";
|
||||
import { fetchBlueBubblesReplyContext } from "./monitor-reply-fetch.js";
|
||||
import {
|
||||
hasBlueBubblesSelfChatCopy,
|
||||
rememberBlueBubblesSelfChatCopy,
|
||||
@@ -1235,11 +1236,17 @@ async function processMessageAfterDedupe(
|
||||
mediaTypes.push(saved.contentType);
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`attachment download failed guid=${sanitizeForLog(attachment.guid)} err=${sanitizeForLog(err)}`,
|
||||
// Promote to runtime.error so silently-dropped inbound images are
|
||||
// visible at default log level, while keeping verbose detail for
|
||||
// debug sessions. Sanitize both fields — BB attachment GUIDs are
|
||||
// user-influenced and the error chain can carry the password
|
||||
// (see sanitizeForLog above).
|
||||
const safeGuid = sanitizeForLog(attachment.guid, 80);
|
||||
const safeErr = sanitizeForLog(err);
|
||||
runtime.error?.(
|
||||
`[bluebubbles] attachment download failed guid=${safeGuid} err=${safeErr}`,
|
||||
);
|
||||
logVerbose(core, runtime, `attachment download failed guid=${safeGuid} err=${safeErr}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1280,6 +1287,49 @@ async function processMessageAfterDedupe(
|
||||
}
|
||||
}
|
||||
|
||||
// Opt-in fallback: if the in-memory cache missed and the BB credentials are
|
||||
// available, ask the BlueBubbles HTTP API for the original message. Useful
|
||||
// when multiple OpenClaw instances share one BB account, after a restart,
|
||||
// or when the cache TTL has evicted the message. Best-effort, never throws.
|
||||
if (
|
||||
replyToId &&
|
||||
(!replyToBody || !replyToSender) &&
|
||||
baseUrl &&
|
||||
password &&
|
||||
account.config.replyContextApiFallback === true
|
||||
) {
|
||||
const fetched = await fetchBlueBubblesReplyContext({
|
||||
accountId: account.accountId,
|
||||
replyToId,
|
||||
baseUrl,
|
||||
password,
|
||||
accountConfig: account.config,
|
||||
chatGuid: message.chatGuid,
|
||||
chatIdentifier: message.chatIdentifier,
|
||||
chatId: message.chatId,
|
||||
});
|
||||
if (fetched) {
|
||||
if (!replyToBody && fetched.body) {
|
||||
replyToBody = fetched.body;
|
||||
}
|
||||
if (!replyToSender && fetched.sender) {
|
||||
replyToSender = fetched.sender;
|
||||
}
|
||||
if (core.logging.shouldLogVerbose()) {
|
||||
// Run the body preview through sanitizeForLog so the redaction regex
|
||||
// (?password=, ?token=, Authorization: …) catches credential-shaped
|
||||
// strings that may appear in user message bodies, matching the
|
||||
// hygiene of adjacent verbose log lines in this file.
|
||||
const preview = sanitizeForLog((fetched.body ?? "").replace(/\s+/g, " "), 120);
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`reply-context API fallback replyToId=${sanitizeForLog(replyToId)} sender=${sanitizeForLog(fetched.sender ?? "")} body="${preview}"`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no cached short ID, try to get one from the UUID directly
|
||||
if (replyToId && !replyToShortId) {
|
||||
replyToShortId = getShortIdForUuid(replyToId);
|
||||
|
||||
450
extensions/bluebubbles/src/monitor-reply-fetch.test.ts
Normal file
450
extensions/bluebubbles/src/monitor-reply-fetch.test.ts
Normal file
@@ -0,0 +1,450 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { BlueBubblesClient, createBlueBubblesClientFromParts } from "./client.js";
|
||||
import {
|
||||
_resetBlueBubblesShortIdState,
|
||||
resolveReplyContextFromCache,
|
||||
} from "./monitor-reply-cache.js";
|
||||
import {
|
||||
_resetBlueBubblesReplyFetchState,
|
||||
fetchBlueBubblesReplyContext,
|
||||
} from "./monitor-reply-fetch.js";
|
||||
|
||||
type FactoryParams = Parameters<typeof createBlueBubblesClientFromParts>[0];
|
||||
type RequestParams = Parameters<BlueBubblesClient["request"]>[0];
|
||||
|
||||
const baseParams = {
|
||||
accountId: "default",
|
||||
baseUrl: "http://localhost:1234",
|
||||
password: "s3cret",
|
||||
} as const;
|
||||
|
||||
function jsonResponse(body: unknown, status = 200): Response {
|
||||
return new Response(JSON.stringify(body), {
|
||||
status,
|
||||
headers: { "content-type": "application/json" },
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a fake client factory that records every constructor + request call
|
||||
* and serves a queue of canned responses. Returns the factory plus a `calls`
|
||||
* accessor so tests can assert on factory params (SSRF mode inputs) and
|
||||
* request params (path, timeout).
|
||||
*/
|
||||
function makeFakeClient(
|
||||
responses:
|
||||
| Array<Response | Error | (() => Promise<Response>)>
|
||||
| (() => Response | Promise<Response>),
|
||||
) {
|
||||
const factoryCalls: FactoryParams[] = [];
|
||||
const requestCalls: RequestParams[] = [];
|
||||
let cursor = 0;
|
||||
const factory = vi.fn((factoryParams: FactoryParams): BlueBubblesClient => {
|
||||
factoryCalls.push(factoryParams);
|
||||
const request = vi.fn(async (requestParams: RequestParams) => {
|
||||
requestCalls.push(requestParams);
|
||||
if (typeof responses === "function") {
|
||||
return await responses();
|
||||
}
|
||||
const next = responses[cursor++];
|
||||
if (next instanceof Error) {
|
||||
throw next;
|
||||
}
|
||||
if (typeof next === "function") {
|
||||
return await next();
|
||||
}
|
||||
return next ?? new Response("", { status: 500 });
|
||||
});
|
||||
return { request } as unknown as BlueBubblesClient;
|
||||
});
|
||||
return { factory, factoryCalls, requestCalls };
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
_resetBlueBubblesReplyFetchState();
|
||||
_resetBlueBubblesShortIdState();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
_resetBlueBubblesReplyFetchState();
|
||||
_resetBlueBubblesShortIdState();
|
||||
});
|
||||
|
||||
describe("fetchBlueBubblesReplyContext", () => {
|
||||
it("returns null when replyToId is empty", async () => {
|
||||
const { factory } = makeFakeClient([]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: " ",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result).toBeNull();
|
||||
expect(factory).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns null when baseUrl or password are missing", async () => {
|
||||
const { factory } = makeFakeClient([]);
|
||||
expect(
|
||||
await fetchBlueBubblesReplyContext({
|
||||
accountId: "default",
|
||||
baseUrl: "",
|
||||
password: "x",
|
||||
replyToId: "msg-1",
|
||||
clientFactory: factory,
|
||||
}),
|
||||
).toBeNull();
|
||||
expect(
|
||||
await fetchBlueBubblesReplyContext({
|
||||
accountId: "default",
|
||||
baseUrl: "http://localhost:1234",
|
||||
password: "",
|
||||
replyToId: "msg-1",
|
||||
clientFactory: factory,
|
||||
}),
|
||||
).toBeNull();
|
||||
expect(factory).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("rejects pathological reply ids before issuing a request", async () => {
|
||||
// Each case is rejected for a different reason: empty/whitespace, trailing
|
||||
// slash that yields an empty bare segment, characters outside the GUID
|
||||
// charset, or length cap. Note: `../etc/passwd` is *not* pathological —
|
||||
// sanitizeReplyToId strips to `passwd`, which is a syntactically valid
|
||||
// bare GUID. The path goes through encodeURIComponent, so there is no
|
||||
// traversal; the server returns 404 and the caller proceeds with null.
|
||||
const cases = ["", " ", "abc/", "abc def", "abc?x=1", "a".repeat(129)];
|
||||
for (const replyToId of cases) {
|
||||
const { factory } = makeFakeClient([]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId,
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result, `replyToId=${JSON.stringify(replyToId)}`).toBeNull();
|
||||
expect(factory, `replyToId=${JSON.stringify(replyToId)}`).not.toHaveBeenCalled();
|
||||
}
|
||||
});
|
||||
|
||||
it("strips part-index prefix (`p:0/<guid>` → `<guid>`) before fetching", async () => {
|
||||
const { factory, requestCalls } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "hi", handle: { address: "+15551234567" } } }),
|
||||
]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "p:0/msg-bare-guid",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result?.body).toBe("hi");
|
||||
expect(requestCalls[0]?.path).toBe("/api/v1/message/msg-bare-guid");
|
||||
});
|
||||
|
||||
it("fetches the BB API and returns body + normalized sender on success", async () => {
|
||||
const { factory, requestCalls } = makeFakeClient([
|
||||
jsonResponse({
|
||||
data: {
|
||||
text: " hello world ",
|
||||
handle: { address: " +15551234567 " },
|
||||
},
|
||||
}),
|
||||
]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "msg-1",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result).toEqual({ body: "hello world", sender: "+15551234567" });
|
||||
expect(factory).toHaveBeenCalledTimes(1);
|
||||
expect(requestCalls[0]?.method).toBe("GET");
|
||||
expect(requestCalls[0]?.path).toBe("/api/v1/message/msg-1");
|
||||
});
|
||||
|
||||
it("lowercases email handles via normalizeBlueBubblesHandle", async () => {
|
||||
const { factory } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "hi", handle: { address: "Foo@Example.COM" } } }),
|
||||
]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "msg-email",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result?.sender).toBe("foo@example.com");
|
||||
});
|
||||
|
||||
it("populates the reply cache so subsequent lookups hit RAM", async () => {
|
||||
const { factory } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "cached me", handle: { address: "+15551112222" } } }),
|
||||
]);
|
||||
await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "msg-cache",
|
||||
chatGuid: "iMessage;-;+15551112222",
|
||||
clientFactory: factory,
|
||||
});
|
||||
const cached = resolveReplyContextFromCache({
|
||||
accountId: "default",
|
||||
replyToId: "msg-cache",
|
||||
chatGuid: "iMessage;-;+15551112222",
|
||||
});
|
||||
expect(cached?.body).toBe("cached me");
|
||||
expect(cached?.senderLabel).toBe("+15551112222");
|
||||
expect(cached?.shortId).toBeTruthy();
|
||||
});
|
||||
|
||||
it("falls back through text → body → subject for the message body", async () => {
|
||||
const { factory } = makeFakeClient([
|
||||
jsonResponse({ data: { body: "from body field" } }),
|
||||
jsonResponse({ data: { subject: "from subject field" } }),
|
||||
]);
|
||||
const a = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "msg-a",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(a?.body).toBe("from body field");
|
||||
const b = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "msg-b",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(b?.body).toBe("from subject field");
|
||||
});
|
||||
|
||||
it("falls back through handle.address → handle.id → senderId → sender for the sender", async () => {
|
||||
const { factory } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "x", handle: { id: "+15550000001" } } }),
|
||||
jsonResponse({ data: { text: "x", senderId: "+15550000002" } }),
|
||||
jsonResponse({ data: { text: "x", sender: "+15550000003" } }),
|
||||
]);
|
||||
const a = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "h-a",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(a?.sender).toBe("+15550000001");
|
||||
const b = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "h-b",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(b?.sender).toBe("+15550000002");
|
||||
const c = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "h-c",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(c?.sender).toBe("+15550000003");
|
||||
});
|
||||
|
||||
it("accepts the BB response either wrapped under `data` or at the top level", async () => {
|
||||
const { factory } = makeFakeClient([
|
||||
jsonResponse({ text: "no envelope", handle: { address: "user@host" } }),
|
||||
]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "msg-flat",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result?.body).toBe("no envelope");
|
||||
expect(result?.sender).toBe("user@host");
|
||||
});
|
||||
|
||||
it("returns null on non-2xx without throwing", async () => {
|
||||
const { factory } = makeFakeClient([new Response("nope", { status: 404 })]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "missing",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null when the underlying request throws (network error / timeout)", async () => {
|
||||
const { factory } = makeFakeClient([new Error("ECONNRESET")]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "boom",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null when JSON parsing fails", async () => {
|
||||
const { factory } = makeFakeClient([
|
||||
new Response("not json", { status: 200, headers: { "content-type": "text/plain" } }),
|
||||
]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "garbage",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null when neither body nor sender can be extracted", async () => {
|
||||
const { factory } = makeFakeClient([jsonResponse({ data: { irrelevant: 1 } })]);
|
||||
const result = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "blank",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it("dedupes concurrent fetches for the same accountId + replyToId", async () => {
|
||||
let resolveOnce: (value: Response) => void = () => {};
|
||||
const pending = new Promise<Response>((resolve) => {
|
||||
resolveOnce = resolve;
|
||||
});
|
||||
const { factory } = makeFakeClient(() => pending);
|
||||
const a = fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "shared",
|
||||
clientFactory: factory,
|
||||
});
|
||||
const b = fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "shared",
|
||||
clientFactory: factory,
|
||||
});
|
||||
// Only one client construction; in-flight dedupe coalesces both callers.
|
||||
expect(factory).toHaveBeenCalledTimes(1);
|
||||
resolveOnce(
|
||||
jsonResponse({ data: { text: "shared body", handle: { address: "+15558675309" } } }),
|
||||
);
|
||||
const [resA, resB] = await Promise.all([a, b]);
|
||||
expect(resA).toEqual({ body: "shared body", sender: "+15558675309" });
|
||||
expect(resB).toEqual(resA);
|
||||
});
|
||||
|
||||
it("does not dedupe across different accountIds", async () => {
|
||||
const { factory } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "a", handle: { address: "+15551000001" } } }),
|
||||
jsonResponse({ data: { text: "b", handle: { address: "+15551000002" } } }),
|
||||
]);
|
||||
const [a, b] = await Promise.all([
|
||||
fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
accountId: "acct-a",
|
||||
replyToId: "same",
|
||||
clientFactory: factory,
|
||||
}),
|
||||
fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
accountId: "acct-b",
|
||||
replyToId: "same",
|
||||
clientFactory: factory,
|
||||
}),
|
||||
]);
|
||||
expect(factory).toHaveBeenCalledTimes(2);
|
||||
expect(a?.body).toBe("a");
|
||||
expect(b?.body).toBe("b");
|
||||
});
|
||||
|
||||
it("releases the in-flight slot once a request completes (next call re-fetches)", async () => {
|
||||
const { factory } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "first", handle: { address: "+15552000001" } } }),
|
||||
jsonResponse({ data: { text: "second", handle: { address: "+15552000002" } } }),
|
||||
]);
|
||||
const first = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "msg-x",
|
||||
clientFactory: factory,
|
||||
});
|
||||
const second = await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "msg-x",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(factory).toHaveBeenCalledTimes(2);
|
||||
expect(first?.body).toBe("first");
|
||||
expect(second?.body).toBe("second");
|
||||
});
|
||||
|
||||
it("threads explicit private-network opt-in through to the typed client (mode 1)", async () => {
|
||||
const { factory, factoryCalls } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "x", handle: { address: "+15553000001" } } }),
|
||||
]);
|
||||
await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "ssrf-on",
|
||||
accountConfig: { network: { dangerouslyAllowPrivateNetwork: true } },
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(factoryCalls[0]?.allowPrivateNetwork).toBe(true);
|
||||
expect(factoryCalls[0]?.allowPrivateNetworkConfig).toBe(true);
|
||||
});
|
||||
|
||||
it("treats local/loopback baseUrls as implicit private-network opt-in (mode 1)", async () => {
|
||||
// `http://localhost:1234` is a private hostname; without an explicit
|
||||
// opt-out the resolver treats this as the self-hosted case, matching
|
||||
// resolveBlueBubblesEffectiveAllowPrivateNetworkFromConfig.
|
||||
const { factory, factoryCalls } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "x", handle: { address: "+15554000001" } } }),
|
||||
]);
|
||||
await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "ssrf-implicit",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(factoryCalls[0]?.allowPrivateNetwork).toBe(true);
|
||||
expect(factoryCalls[0]?.allowPrivateNetworkConfig).toBeUndefined();
|
||||
});
|
||||
|
||||
it("does not mark public BB hosts as private-network when opt-in is absent (mode 2)", async () => {
|
||||
const { factory, factoryCalls } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "x", handle: { address: "user@example.com" } } }),
|
||||
]);
|
||||
await fetchBlueBubblesReplyContext({
|
||||
accountId: "default",
|
||||
baseUrl: "https://bb.example.com",
|
||||
password: "s3cret",
|
||||
replyToId: "ssrf-public",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(factoryCalls[0]?.allowPrivateNetwork).toBe(false);
|
||||
});
|
||||
|
||||
it("propagates explicit opt-out on a private host (mode 3)", async () => {
|
||||
const { factory, factoryCalls } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "x", handle: { address: "+15555000001" } } }),
|
||||
]);
|
||||
await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "ssrf-opt-out",
|
||||
accountConfig: { network: { dangerouslyAllowPrivateNetwork: false } },
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(factoryCalls[0]?.allowPrivateNetwork).toBe(false);
|
||||
expect(factoryCalls[0]?.allowPrivateNetworkConfig).toBe(false);
|
||||
});
|
||||
|
||||
it("never passes undefined for allowPrivateNetwork to the typed client (regression for #71820 codex review)", async () => {
|
||||
// The typed client owns SSRF policy resolution internally and cannot
|
||||
// produce an undefined policy. This test guards the invariant at the
|
||||
// call boundary: we always pass a concrete boolean for
|
||||
// allowPrivateNetwork so the resolver picks a deterministic mode.
|
||||
const { factory, factoryCalls } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "x", handle: { address: "+15556000001" } } }),
|
||||
]);
|
||||
await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "ssrf-defined",
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(typeof factoryCalls[0]?.allowPrivateNetwork).toBe("boolean");
|
||||
});
|
||||
|
||||
it("uses the configured timeout on both the factory and the request call", async () => {
|
||||
const { factory, factoryCalls, requestCalls } = makeFakeClient([
|
||||
jsonResponse({ data: { text: "x", handle: { address: "+15555000001" } } }),
|
||||
]);
|
||||
await fetchBlueBubblesReplyContext({
|
||||
...baseParams,
|
||||
replyToId: "tm",
|
||||
timeoutMs: 1234,
|
||||
clientFactory: factory,
|
||||
});
|
||||
expect(factoryCalls[0]?.timeoutMs).toBe(1234);
|
||||
expect(requestCalls[0]?.timeoutMs).toBe(1234);
|
||||
});
|
||||
});
|
||||
198
extensions/bluebubbles/src/monitor-reply-fetch.ts
Normal file
198
extensions/bluebubbles/src/monitor-reply-fetch.ts
Normal file
@@ -0,0 +1,198 @@
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import {
|
||||
resolveBlueBubblesEffectiveAllowPrivateNetworkFromConfig,
|
||||
resolveBlueBubblesPrivateNetworkConfigValue,
|
||||
} from "./accounts-normalization.js";
|
||||
import { createBlueBubblesClientFromParts } from "./client.js";
|
||||
import { rememberBlueBubblesReplyCache } from "./monitor-reply-cache.js";
|
||||
import { normalizeBlueBubblesHandle } from "./targets.js";
|
||||
import type { BlueBubblesAccountConfig } from "./types.js";
|
||||
|
||||
const DEFAULT_REPLY_FETCH_TIMEOUT_MS = 5_000;
|
||||
|
||||
// Reject pathological GUIDs before they reach the API path: a trailing slash
|
||||
// would yield an empty bare GUID and turn the request into a list query
|
||||
// against `/api/v1/message/`; arbitrary characters could let a malformed
|
||||
// payload steer encoded path segments. Real BlueBubbles GUIDs are alnum + the
|
||||
// punctuation set below; 128 chars is comfortable headroom (CWE-20).
|
||||
const REPLY_TO_ID_PATTERN = /^[A-Za-z0-9._:-]+$/;
|
||||
const REPLY_TO_ID_MAX_LENGTH = 128;
|
||||
|
||||
export type BlueBubblesReplyFetchResult = {
|
||||
body?: string;
|
||||
sender?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* In-flight dedupe so concurrent webhooks for replies to the same message
|
||||
* (e.g., several recipients in a group chat replying near-simultaneously)
|
||||
* coalesce into a single BlueBubbles HTTP fetch.
|
||||
*
|
||||
* Key shape: `${accountId}:${replyToId}` to keep accounts isolated.
|
||||
*/
|
||||
const inflight = new Map<string, Promise<BlueBubblesReplyFetchResult | null>>();
|
||||
|
||||
/**
|
||||
* @internal Reset shared module state. Test-only.
|
||||
*/
|
||||
export function _resetBlueBubblesReplyFetchState(): void {
|
||||
inflight.clear();
|
||||
}
|
||||
|
||||
export type FetchBlueBubblesReplyContextParams = {
|
||||
accountId: string;
|
||||
replyToId: string;
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
/**
|
||||
* Optional account config — used to resolve the SSRF policy for this fetch
|
||||
* via the same three-mode resolver the BlueBubbles client uses. Even when
|
||||
* omitted the request is still SSRF-guarded; the typed client routes
|
||||
* through the resolver internally and never returns `undefined`.
|
||||
*/
|
||||
accountConfig?: BlueBubblesAccountConfig;
|
||||
/** Optional chat scope used to populate the reply cache for subsequent hits. */
|
||||
chatGuid?: string;
|
||||
chatIdentifier?: string;
|
||||
chatId?: number;
|
||||
/** Defaults to 5_000 ms. */
|
||||
timeoutMs?: number;
|
||||
/** Override the typed client factory. Test seam. */
|
||||
clientFactory?: typeof createBlueBubblesClientFromParts;
|
||||
};
|
||||
|
||||
/**
|
||||
* Best-effort fallback: when the local in-memory reply cache misses, ask the
|
||||
* BlueBubbles HTTP API for the original message so the agent still gets reply
|
||||
* context. Returns `null` on any failure (network error, non-2xx, parse error,
|
||||
* empty payload). Never throws.
|
||||
*
|
||||
* On success, the cache is populated so subsequent replies to the same message
|
||||
* resolve from RAM without another round-trip.
|
||||
*
|
||||
* Cache misses happen in legitimate, common deployments: multi-instance setups
|
||||
* sharing one BB account, container/process restarts, cross-tenant shared
|
||||
* groups, and long-lived chats where TTL/LRU has evicted the message.
|
||||
*/
|
||||
export function fetchBlueBubblesReplyContext(
|
||||
params: FetchBlueBubblesReplyContextParams,
|
||||
): Promise<BlueBubblesReplyFetchResult | null> {
|
||||
const replyToId = sanitizeReplyToId(params.replyToId);
|
||||
if (!replyToId || !params.baseUrl || !params.password) {
|
||||
return Promise.resolve(null);
|
||||
}
|
||||
const key = `${params.accountId}:${replyToId}`;
|
||||
const existing = inflight.get(key);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const promise = runFetch(params, replyToId).finally(() => {
|
||||
inflight.delete(key);
|
||||
});
|
||||
inflight.set(key, promise);
|
||||
return promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip a part-index prefix (`p:0/<guid>` → `<guid>`) and validate the result
|
||||
* against the GUID character set + length cap. Returns null when the id is
|
||||
* empty or cannot safely be used as a path segment.
|
||||
*/
|
||||
function sanitizeReplyToId(raw: string): string | null {
|
||||
const trimmed = raw.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
const bare = trimmed.includes("/") ? (trimmed.split("/").pop() ?? "") : trimmed;
|
||||
if (!bare || bare.length > REPLY_TO_ID_MAX_LENGTH || !REPLY_TO_ID_PATTERN.test(bare)) {
|
||||
return null;
|
||||
}
|
||||
return bare;
|
||||
}
|
||||
|
||||
async function runFetch(
|
||||
params: FetchBlueBubblesReplyContextParams,
|
||||
replyToId: string,
|
||||
): Promise<BlueBubblesReplyFetchResult | null> {
|
||||
const factory = params.clientFactory ?? createBlueBubblesClientFromParts;
|
||||
// Route through the typed BlueBubbles client. `client.request()` always
|
||||
// applies the SSRF policy resolved via the canonical three-mode helper
|
||||
// (mode 1: explicit private-network opt-in, mode 2: hostname allowlist for
|
||||
// trusted self-hosted servers, mode 3: default-deny guard). Going through
|
||||
// the typed surface guarantees consistency with every other BB client
|
||||
// request and removes the risk of an `undefined` policy slipping past the
|
||||
// guard. (PR #71820 review; same threat model as #68234.)
|
||||
const client = factory({
|
||||
accountId: params.accountId,
|
||||
baseUrl: params.baseUrl,
|
||||
password: params.password,
|
||||
allowPrivateNetwork: resolveBlueBubblesEffectiveAllowPrivateNetworkFromConfig({
|
||||
baseUrl: params.baseUrl,
|
||||
config: params.accountConfig,
|
||||
}),
|
||||
allowPrivateNetworkConfig: resolveBlueBubblesPrivateNetworkConfigValue(params.accountConfig),
|
||||
timeoutMs: params.timeoutMs ?? DEFAULT_REPLY_FETCH_TIMEOUT_MS,
|
||||
});
|
||||
try {
|
||||
const response = await client.request({
|
||||
method: "GET",
|
||||
path: `/api/v1/message/${encodeURIComponent(replyToId)}`,
|
||||
timeoutMs: params.timeoutMs ?? DEFAULT_REPLY_FETCH_TIMEOUT_MS,
|
||||
});
|
||||
if (!response.ok) {
|
||||
return null;
|
||||
}
|
||||
const json = (await response.json()) as Record<string, unknown>;
|
||||
const data = (json.data ?? json) as Record<string, unknown> | undefined;
|
||||
if (!data || typeof data !== "object") {
|
||||
return null;
|
||||
}
|
||||
const body = extractBody(data);
|
||||
const sender = extractSender(data);
|
||||
if (!body && !sender) {
|
||||
return null;
|
||||
}
|
||||
rememberBlueBubblesReplyCache({
|
||||
accountId: params.accountId,
|
||||
messageId: replyToId,
|
||||
chatGuid: params.chatGuid,
|
||||
chatIdentifier: params.chatIdentifier,
|
||||
chatId: params.chatId,
|
||||
senderLabel: sender,
|
||||
body,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
return { body, sender };
|
||||
} catch {
|
||||
// Best-effort: swallow network/parse errors. Caller proceeds with empty
|
||||
// reply context, which matches existing pre-fallback behavior.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function extractBody(data: Record<string, unknown>): string | undefined {
|
||||
return (
|
||||
normalizeOptionalString(data.text) ??
|
||||
normalizeOptionalString(data.body) ??
|
||||
normalizeOptionalString(data.subject)
|
||||
);
|
||||
}
|
||||
|
||||
function asRecord(value: unknown): Record<string, unknown> | undefined {
|
||||
return value !== null && typeof value === "object"
|
||||
? (value as Record<string, unknown>)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
function extractSender(data: Record<string, unknown>): string | undefined {
|
||||
const handle = asRecord(data.handle) ?? asRecord(data.sender);
|
||||
const raw =
|
||||
normalizeOptionalString(handle?.address) ??
|
||||
normalizeOptionalString(handle?.id) ??
|
||||
normalizeOptionalString(data.senderId) ??
|
||||
normalizeOptionalString(data.sender);
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
return normalizeBlueBubblesHandle(raw) || raw;
|
||||
}
|
||||
@@ -324,6 +324,54 @@ describe("resolveBlueBubblesAccount", () => {
|
||||
expect(resolved.baseUrl).toBe("http://localhost:1234");
|
||||
});
|
||||
|
||||
it("inherits channel-level replyContextApiFallback for accounts that omit the flag (#71820)", () => {
|
||||
// Codex P2: a per-account `.default(false)` would clobber channel-level
|
||||
// `replyContextApiFallback: true` during the merge, so multi-account
|
||||
// operators flipping the global toggle would silently get nothing
|
||||
// unless they duplicated the flag under every `accounts.<id>` block.
|
||||
// Verify the runtime resolver actually picks up the channel value.
|
||||
const resolved = resolveBlueBubblesAccount({
|
||||
cfg: {
|
||||
channels: {
|
||||
bluebubbles: {
|
||||
replyContextApiFallback: true,
|
||||
accounts: {
|
||||
work: {
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "secret", // pragma: allowlist secret
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
accountId: "work",
|
||||
});
|
||||
|
||||
expect(resolved.config.replyContextApiFallback).toBe(true);
|
||||
});
|
||||
|
||||
it("lets account-level replyContextApiFallback override channel-level (#71820)", () => {
|
||||
const resolved = resolveBlueBubblesAccount({
|
||||
cfg: {
|
||||
channels: {
|
||||
bluebubbles: {
|
||||
replyContextApiFallback: true,
|
||||
accounts: {
|
||||
work: {
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "secret", // pragma: allowlist secret
|
||||
replyContextApiFallback: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
accountId: "work",
|
||||
});
|
||||
|
||||
expect(resolved.config.replyContextApiFallback).toBe(false);
|
||||
});
|
||||
|
||||
it("strips stale legacy private-network aliases after canonical normalization", () => {
|
||||
const resolved = resolveBlueBubblesAccount({
|
||||
cfg: {
|
||||
@@ -462,6 +510,53 @@ describe("BlueBubblesConfigSchema", () => {
|
||||
|
||||
expect(parsed.success).toBe(true);
|
||||
});
|
||||
|
||||
it("does not materialize a per-account default for replyContextApiFallback (#71820)", () => {
|
||||
// Codex review: a per-account `.default(false)` would clobber a
|
||||
// channel-level `replyContextApiFallback: true` during account merge,
|
||||
// forcing operators to duplicate the flag under every `accounts.<id>`.
|
||||
// The schema is `.optional()` (no default) so account-level absence
|
||||
// means "inherit from channel".
|
||||
const parsed = BlueBubblesConfigSchema.safeParse({
|
||||
replyContextApiFallback: true,
|
||||
accounts: {
|
||||
work: {
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "secret", // pragma: allowlist secret
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(parsed.success).toBe(true);
|
||||
if (!parsed.success) {
|
||||
return;
|
||||
}
|
||||
const accountConfig = (
|
||||
parsed.data as { accounts?: { work?: { replyContextApiFallback?: boolean } } }
|
||||
).accounts?.work;
|
||||
expect(accountConfig?.replyContextApiFallback).toBeUndefined();
|
||||
});
|
||||
|
||||
it("accepts explicit replyContextApiFallback at channel and account scope", () => {
|
||||
const parsed = BlueBubblesConfigSchema.safeParse({
|
||||
replyContextApiFallback: true,
|
||||
accounts: {
|
||||
work: {
|
||||
replyContextApiFallback: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(parsed.success).toBe(true);
|
||||
if (!parsed.success) {
|
||||
return;
|
||||
}
|
||||
expect((parsed.data as { replyContextApiFallback?: boolean }).replyContextApiFallback).toBe(
|
||||
true,
|
||||
);
|
||||
expect(
|
||||
(parsed.data as { accounts?: { work?: { replyContextApiFallback?: boolean } } }).accounts
|
||||
?.work?.replyContextApiFallback,
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("bluebubbles group policy", () => {
|
||||
|
||||
@@ -86,6 +86,14 @@ export type BlueBubblesAccountConfig = {
|
||||
blockStreaming?: boolean;
|
||||
/** Merge streamed block replies before sending. */
|
||||
blockStreamingCoalesce?: Record<string, unknown>;
|
||||
/**
|
||||
* When an inbound reply lands without `replyToBody`/`replyToSender` and the
|
||||
* in-memory reply cache misses (e.g., multi-instance deployments sharing
|
||||
* one BlueBubbles account, after process restarts, or after long-lived
|
||||
* cache eviction), fetch the original message from the BlueBubbles HTTP API
|
||||
* as a best-effort fallback. Default: false.
|
||||
*/
|
||||
replyContextApiFallback?: boolean;
|
||||
/** Max outbound media size in MB. */
|
||||
mediaMaxMb?: number;
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user