mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-30 19:32:27 +00:00
fix(bluebubbles): guard debounce flush against null text (#56573)
Sanitize message text at the debounce enqueue boundary and add an independent guard in combineDebounceEntries(). Prevents TypeError when a queued entry has null text that reaches .trim() during flush. Add regression test: enqueue null-text entry alongside valid message, verify flush completes without error and valid message is delivered. Fixes #35777
This commit is contained in:
@@ -11,6 +11,23 @@ type BlueBubblesDebounceEntry = {
|
||||
target: WebhookTarget;
|
||||
};
|
||||
|
||||
function normalizeDebounceMessageText(text: unknown): string {
|
||||
return typeof text === "string" ? text : "";
|
||||
}
|
||||
|
||||
function sanitizeDebounceEntry(entry: BlueBubblesDebounceEntry): BlueBubblesDebounceEntry {
|
||||
if (typeof entry.message.text === "string") {
|
||||
return entry;
|
||||
}
|
||||
return {
|
||||
...entry,
|
||||
message: {
|
||||
...entry.message,
|
||||
text: "",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export type BlueBubblesDebouncer = {
|
||||
enqueue: (item: BlueBubblesDebounceEntry) => Promise<void>;
|
||||
flushKey: (key: string) => Promise<void>;
|
||||
@@ -48,7 +65,7 @@ function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): Normalized
|
||||
const textParts: string[] = [];
|
||||
|
||||
for (const entry of entries) {
|
||||
const text = entry.message.text.trim();
|
||||
const text = normalizeDebounceMessageText(entry.message.text).trim();
|
||||
if (!text) {
|
||||
continue;
|
||||
}
|
||||
@@ -120,7 +137,7 @@ export function createBlueBubblesDebounceRegistry(params: {
|
||||
}
|
||||
|
||||
const { account, config, runtime, core } = target;
|
||||
const debouncer = core.channel.debounce.createInboundDebouncer<BlueBubblesDebounceEntry>({
|
||||
const baseDebouncer = core.channel.debounce.createInboundDebouncer<BlueBubblesDebounceEntry>({
|
||||
debounceMs: resolveBlueBubblesDebounceMs(config, core),
|
||||
buildKey: (entry) => {
|
||||
const msg = entry.message;
|
||||
@@ -195,6 +212,13 @@ export function createBlueBubblesDebounceRegistry(params: {
|
||||
},
|
||||
});
|
||||
|
||||
const debouncer: BlueBubblesDebouncer = {
|
||||
enqueue: async (item) => {
|
||||
await baseDebouncer.enqueue(sanitizeDebounceEntry(item));
|
||||
},
|
||||
flushKey: (key) => baseDebouncer.flushKey(key),
|
||||
};
|
||||
|
||||
targetDebouncers.set(target, debouncer);
|
||||
return debouncer;
|
||||
},
|
||||
|
||||
@@ -27,6 +27,8 @@ import {
|
||||
resetBlueBubblesParticipantContactNameCacheForTest,
|
||||
setBlueBubblesParticipantContactDepsForTest,
|
||||
} from "./participant-contact-names.js";
|
||||
import { createBlueBubblesDebounceRegistry } from "./monitor-debounce.js";
|
||||
import type { NormalizedWebhookMessage } from "./monitor-normalize.js";
|
||||
import type { OpenClawConfig, PluginRuntime } from "./runtime-api.js";
|
||||
|
||||
// Mock dependencies
|
||||
@@ -147,6 +149,73 @@ function getFirstDispatchCall(): DispatchReplyParams {
|
||||
return callArgs;
|
||||
}
|
||||
|
||||
function installTimingAwareInboundDebouncer(core: PluginRuntime) {
|
||||
// Use a timing-aware debouncer test double that respects debounceMs/buildKey/shouldDebounce.
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
core.channel.debounce.createInboundDebouncer = vi.fn((params: any) => {
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
type Item = any;
|
||||
const buckets = new Map<string, { items: Item[]; timer: ReturnType<typeof setTimeout> | null }>();
|
||||
|
||||
const flush = async (key: string) => {
|
||||
const bucket = buckets.get(key);
|
||||
if (!bucket) {
|
||||
return;
|
||||
}
|
||||
if (bucket.timer) {
|
||||
clearTimeout(bucket.timer);
|
||||
bucket.timer = null;
|
||||
}
|
||||
const items = bucket.items;
|
||||
bucket.items = [];
|
||||
if (items.length > 0) {
|
||||
try {
|
||||
await params.onFlush(items);
|
||||
} catch (err) {
|
||||
params.onError?.(err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
enqueue: async (item: Item) => {
|
||||
if (params.shouldDebounce && !params.shouldDebounce(item)) {
|
||||
await params.onFlush([item]);
|
||||
return;
|
||||
}
|
||||
|
||||
const key = params.buildKey(item);
|
||||
const existing = buckets.get(key);
|
||||
const bucket = existing ?? { items: [], timer: null };
|
||||
bucket.items.push(item);
|
||||
if (bucket.timer) {
|
||||
clearTimeout(bucket.timer);
|
||||
}
|
||||
bucket.timer = setTimeout(async () => {
|
||||
await flush(key);
|
||||
}, params.debounceMs);
|
||||
buckets.set(key, bucket);
|
||||
},
|
||||
flushKey: vi.fn(async (key: string) => {
|
||||
await flush(key);
|
||||
}),
|
||||
};
|
||||
}) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"];
|
||||
}
|
||||
|
||||
function createDebounceTestMessage(
|
||||
overrides: Partial<NormalizedWebhookMessage> = {},
|
||||
): NormalizedWebhookMessage {
|
||||
return {
|
||||
text: "hello",
|
||||
senderId: "+15551234567",
|
||||
senderIdExplicit: true,
|
||||
isGroup: false,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("BlueBubbles webhook monitor", () => {
|
||||
let unregister: () => void;
|
||||
|
||||
@@ -724,62 +793,7 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
|
||||
// Use a timing-aware debouncer test double that respects debounceMs/buildKey/shouldDebounce.
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
core.channel.debounce.createInboundDebouncer = vi.fn((params: any) => {
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
type Item = any;
|
||||
const buckets = new Map<
|
||||
string,
|
||||
{ items: Item[]; timer: ReturnType<typeof setTimeout> | null }
|
||||
>();
|
||||
|
||||
const flush = async (key: string) => {
|
||||
const bucket = buckets.get(key);
|
||||
if (!bucket) {
|
||||
return;
|
||||
}
|
||||
if (bucket.timer) {
|
||||
clearTimeout(bucket.timer);
|
||||
bucket.timer = null;
|
||||
}
|
||||
const items = bucket.items;
|
||||
bucket.items = [];
|
||||
if (items.length > 0) {
|
||||
try {
|
||||
await params.onFlush(items);
|
||||
} catch (err) {
|
||||
params.onError?.(err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
enqueue: async (item: Item) => {
|
||||
if (params.shouldDebounce && !params.shouldDebounce(item)) {
|
||||
await params.onFlush([item]);
|
||||
return;
|
||||
}
|
||||
|
||||
const key = params.buildKey(item);
|
||||
const existing = buckets.get(key);
|
||||
const bucket = existing ?? { items: [], timer: null };
|
||||
bucket.items.push(item);
|
||||
if (bucket.timer) {
|
||||
clearTimeout(bucket.timer);
|
||||
}
|
||||
bucket.timer = setTimeout(async () => {
|
||||
await flush(key);
|
||||
}, params.debounceMs);
|
||||
buckets.set(key, bucket);
|
||||
},
|
||||
flushKey: vi.fn(async (key: string) => {
|
||||
await flush(key);
|
||||
}),
|
||||
};
|
||||
}) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"];
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
|
||||
const registration = trackWebhookRegistrationForTest(
|
||||
setupWebhookTargetForTest({
|
||||
@@ -832,6 +846,61 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("skips null-text entries during flush and still delivers the valid message", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const core = createMockRuntime();
|
||||
installTimingAwareInboundDebouncer(core);
|
||||
|
||||
const processMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const registry = createBlueBubblesDebounceRegistry({ processMessage });
|
||||
const account = createMockAccount();
|
||||
const target = {
|
||||
account,
|
||||
config: {},
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
};
|
||||
const debouncer = registry.getOrCreateDebouncer(target);
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: {
|
||||
...createDebounceTestMessage({
|
||||
messageId: "msg-null",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
}),
|
||||
text: null,
|
||||
} as unknown as NormalizedWebhookMessage,
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
|
||||
await debouncer.enqueue({
|
||||
message: createDebounceTestMessage({
|
||||
text: "hello from valid entry",
|
||||
messageId: "msg-null",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
}),
|
||||
target,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
expect(processMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
text: "hello from valid entry",
|
||||
}),
|
||||
target,
|
||||
);
|
||||
expect(target.runtime.error).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("reply metadata", () => {
|
||||
|
||||
Reference in New Issue
Block a user