mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-05 00:20:22 +00:00
fix: harden iMessage echo dedupe and reasoning suppression (#25897)
This commit is contained in:
@@ -35,4 +35,16 @@ describe("extractMessagingToolSend", () => {
|
||||
expect(result?.provider).toBe("slack");
|
||||
expect(result?.to).toBe("channel:C1");
|
||||
});
|
||||
|
||||
it("accepts target alias when to is omitted", () => {
|
||||
const result = extractMessagingToolSend("message", {
|
||||
action: "send",
|
||||
channel: "telegram",
|
||||
target: "123",
|
||||
});
|
||||
|
||||
expect(result?.tool).toBe("message");
|
||||
expect(result?.provider).toBe("telegram");
|
||||
expect(result?.to).toBe("telegram:123");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -298,7 +298,12 @@ export function extractMessagingToolSend(
|
||||
if (action !== "send" && action !== "thread-reply") {
|
||||
return undefined;
|
||||
}
|
||||
const toRaw = typeof args.to === "string" ? args.to : undefined;
|
||||
const toRaw =
|
||||
typeof args.to === "string"
|
||||
? args.to
|
||||
: typeof args.target === "string"
|
||||
? args.target
|
||||
: undefined;
|
||||
if (!toRaw) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
@@ -144,6 +144,18 @@ describe("routeReply", () => {
|
||||
expect(mocks.sendMessageSlack).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("suppresses reasoning payloads", async () => {
|
||||
mocks.sendMessageSlack.mockClear();
|
||||
const res = await routeReply({
|
||||
payload: { text: "Reasoning:\n_step_", isReasoning: true },
|
||||
channel: "slack",
|
||||
to: "channel:C123",
|
||||
cfg: {} as never,
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
expect(mocks.sendMessageSlack).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("drops silent token payloads", async () => {
|
||||
mocks.sendMessageSlack.mockClear();
|
||||
const res = await routeReply({
|
||||
|
||||
@@ -56,6 +56,9 @@ export type RouteReplyResult = {
|
||||
*/
|
||||
export async function routeReply(params: RouteReplyParams): Promise<RouteReplyResult> {
|
||||
const { payload, channel, to, accountId, threadId, cfg, abortSignal } = params;
|
||||
if (payload.isReasoning) {
|
||||
return { ok: true };
|
||||
}
|
||||
const normalizedChannel = normalizeMessageChannel(channel);
|
||||
const resolvedAgentId = params.sessionKey
|
||||
? resolveSessionAgentId({
|
||||
|
||||
@@ -123,4 +123,30 @@ describe("deliverReplies", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("records outbound text and message ids in sent-message cache", async () => {
|
||||
const remember = vi.fn();
|
||||
chunkTextWithModeMock.mockImplementation((text: string) => text.split("|"));
|
||||
|
||||
await deliverReplies({
|
||||
replies: [{ text: "first|second" }],
|
||||
target: "chat_id:30",
|
||||
client,
|
||||
accountId: "acct-3",
|
||||
runtime,
|
||||
maxBytes: 2048,
|
||||
textLimit: 4000,
|
||||
sentMessageCache: { remember },
|
||||
});
|
||||
|
||||
expect(remember).toHaveBeenCalledWith("acct-3:chat_id:30", { text: "first|second" });
|
||||
expect(remember).toHaveBeenCalledWith("acct-3:chat_id:30", {
|
||||
text: "first",
|
||||
messageId: "imsg-1",
|
||||
});
|
||||
expect(remember).toHaveBeenCalledWith("acct-3:chat_id:30", {
|
||||
text: "second",
|
||||
messageId: "imsg-1",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -8,7 +8,7 @@ import type { createIMessageRpcClient } from "../client.js";
|
||||
import { sendMessageIMessage } from "../send.js";
|
||||
|
||||
type SentMessageCache = {
|
||||
remember: (scope: string, text: string) => void;
|
||||
remember: (scope: string, lookup: { text?: string; messageId?: string }) => void;
|
||||
};
|
||||
|
||||
export async function deliverReplies(params: {
|
||||
@@ -39,31 +39,32 @@ export async function deliverReplies(params: {
|
||||
continue;
|
||||
}
|
||||
if (mediaList.length === 0) {
|
||||
sentMessageCache?.remember(scope, text);
|
||||
sentMessageCache?.remember(scope, { text });
|
||||
for (const chunk of chunkTextWithMode(text, textLimit, chunkMode)) {
|
||||
await sendMessageIMessage(target, chunk, {
|
||||
const sent = await sendMessageIMessage(target, chunk, {
|
||||
maxBytes,
|
||||
client,
|
||||
accountId,
|
||||
replyToId: payload.replyToId,
|
||||
});
|
||||
sentMessageCache?.remember(scope, chunk);
|
||||
sentMessageCache?.remember(scope, { text: chunk, messageId: sent.messageId });
|
||||
}
|
||||
} else {
|
||||
let first = true;
|
||||
for (const url of mediaList) {
|
||||
const caption = first ? text : "";
|
||||
first = false;
|
||||
await sendMessageIMessage(target, caption, {
|
||||
const sent = await sendMessageIMessage(target, caption, {
|
||||
mediaUrl: url,
|
||||
maxBytes,
|
||||
client,
|
||||
accountId,
|
||||
replyToId: payload.replyToId,
|
||||
});
|
||||
if (caption) {
|
||||
sentMessageCache?.remember(scope, caption);
|
||||
}
|
||||
sentMessageCache?.remember(scope, {
|
||||
text: caption || undefined,
|
||||
messageId: sent.messageId,
|
||||
});
|
||||
}
|
||||
}
|
||||
runtime.log?.(`imessage: delivered reply to ${target}`);
|
||||
|
||||
60
src/imessage/monitor/inbound-processing.test.ts
Normal file
60
src/imessage/monitor/inbound-processing.test.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
describeIMessageEchoDropLog,
|
||||
resolveIMessageInboundDecision,
|
||||
} from "./inbound-processing.js";
|
||||
|
||||
describe("resolveIMessageInboundDecision echo detection", () => {
|
||||
const cfg = {} as OpenClawConfig;
|
||||
|
||||
it("drops inbound messages when outbound message id matches echo cache", () => {
|
||||
const echoHas = vi.fn((_scope: string, lookup: { text?: string; messageId?: string }) => {
|
||||
return lookup.messageId === "42";
|
||||
});
|
||||
|
||||
const decision = resolveIMessageInboundDecision({
|
||||
cfg,
|
||||
accountId: "default",
|
||||
message: {
|
||||
id: 42,
|
||||
sender: "+15555550123",
|
||||
text: "Reasoning:\n_step_",
|
||||
is_from_me: false,
|
||||
is_group: false,
|
||||
},
|
||||
opts: undefined,
|
||||
messageText: "Reasoning:\n_step_",
|
||||
bodyText: "Reasoning:\n_step_",
|
||||
allowFrom: [],
|
||||
groupAllowFrom: [],
|
||||
groupPolicy: "open",
|
||||
dmPolicy: "open",
|
||||
storeAllowFrom: [],
|
||||
historyLimit: 0,
|
||||
groupHistories: new Map(),
|
||||
echoCache: { has: echoHas },
|
||||
logVerbose: undefined,
|
||||
});
|
||||
|
||||
expect(decision).toEqual({ kind: "drop", reason: "echo" });
|
||||
expect(echoHas).toHaveBeenCalledWith(
|
||||
"default:imessage:+15555550123",
|
||||
expect.objectContaining({
|
||||
text: "Reasoning:\n_step_",
|
||||
messageId: "42",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("describeIMessageEchoDropLog", () => {
|
||||
it("includes message id when available", () => {
|
||||
expect(
|
||||
describeIMessageEchoDropLog({
|
||||
messageText: "Reasoning:\n_step_",
|
||||
messageId: "abc-123",
|
||||
}),
|
||||
).toContain("id=abc-123");
|
||||
});
|
||||
});
|
||||
@@ -95,7 +95,7 @@ export function resolveIMessageInboundDecision(params: {
|
||||
storeAllowFrom: string[];
|
||||
historyLimit: number;
|
||||
groupHistories: Map<string, HistoryEntry[]>;
|
||||
echoCache?: { has: (scope: string, text: string) => boolean };
|
||||
echoCache?: { has: (scope: string, lookup: { text?: string; messageId?: string }) => boolean };
|
||||
logVerbose?: (msg: string) => void;
|
||||
}): IMessageInboundDecision {
|
||||
const senderRaw = params.message.sender ?? "";
|
||||
@@ -224,15 +224,23 @@ export function resolveIMessageInboundDecision(params: {
|
||||
|
||||
// Echo detection: check if the received message matches a recently sent message (within 5 seconds).
|
||||
// Scope by conversation so same text in different chats is not conflated.
|
||||
if (params.echoCache && messageText) {
|
||||
const inboundMessageId = params.message.id != null ? String(params.message.id) : undefined;
|
||||
if (params.echoCache && (messageText || inboundMessageId)) {
|
||||
const echoScope = buildIMessageEchoScope({
|
||||
accountId: params.accountId,
|
||||
isGroup,
|
||||
chatId,
|
||||
sender,
|
||||
});
|
||||
if (params.echoCache.has(echoScope, messageText)) {
|
||||
params.logVerbose?.(describeIMessageEchoDropLog({ messageText }));
|
||||
if (
|
||||
params.echoCache.has(echoScope, {
|
||||
text: messageText || undefined,
|
||||
messageId: inboundMessageId,
|
||||
})
|
||||
) {
|
||||
params.logVerbose?.(
|
||||
describeIMessageEchoDropLog({ messageText, messageId: inboundMessageId }),
|
||||
);
|
||||
return { kind: "drop", reason: "echo" };
|
||||
}
|
||||
}
|
||||
@@ -479,6 +487,11 @@ export function buildIMessageEchoScope(params: {
|
||||
return `${params.accountId}:${params.isGroup ? formatIMessageChatTarget(params.chatId) : `imessage:${params.sender}`}`;
|
||||
}
|
||||
|
||||
export function describeIMessageEchoDropLog(params: { messageText: string }): string {
|
||||
return `imessage: skipping echo message (matches recently sent text within 5s): "${truncateUtf16Safe(params.messageText, 50)}"`;
|
||||
export function describeIMessageEchoDropLog(params: {
|
||||
messageText: string;
|
||||
messageId?: string;
|
||||
}): string {
|
||||
const preview = truncateUtf16Safe(params.messageText, 50);
|
||||
const messageIdPart = params.messageId ? ` id=${params.messageId}` : "";
|
||||
return `imessage: skipping echo message${messageIdPart}: "${preview}"`;
|
||||
}
|
||||
|
||||
43
src/imessage/monitor/monitor-provider.echo-cache.test.ts
Normal file
43
src/imessage/monitor/monitor-provider.echo-cache.test.ts
Normal file
@@ -0,0 +1,43 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { __testing } from "./monitor-provider.js";
|
||||
|
||||
describe("iMessage sent-message echo cache", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("matches recent text within the same scope", () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-02-25T00:00:00Z"));
|
||||
const cache = __testing.createSentMessageCache();
|
||||
|
||||
cache.remember("acct:imessage:+1555", { text: " Reasoning:\r\n_step_ " });
|
||||
|
||||
expect(cache.has("acct:imessage:+1555", { text: "Reasoning:\n_step_" })).toBe(true);
|
||||
expect(cache.has("acct:imessage:+1666", { text: "Reasoning:\n_step_" })).toBe(false);
|
||||
});
|
||||
|
||||
it("matches by outbound message id and ignores placeholder ids", () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-02-25T00:00:00Z"));
|
||||
const cache = __testing.createSentMessageCache();
|
||||
|
||||
cache.remember("acct:imessage:+1555", { messageId: "abc-123" });
|
||||
cache.remember("acct:imessage:+1555", { messageId: "ok" });
|
||||
|
||||
expect(cache.has("acct:imessage:+1555", { messageId: "abc-123" })).toBe(true);
|
||||
expect(cache.has("acct:imessage:+1555", { messageId: "ok" })).toBe(false);
|
||||
});
|
||||
|
||||
it("keeps message-id lookups longer than text fallback", () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-02-25T00:00:00Z"));
|
||||
const cache = __testing.createSentMessageCache();
|
||||
|
||||
cache.remember("acct:imessage:+1555", { text: "hello", messageId: "m-1" });
|
||||
vi.advanceTimersByTime(6000);
|
||||
|
||||
expect(cache.has("acct:imessage:+1555", { text: "hello" })).toBe(false);
|
||||
expect(cache.has("acct:imessage:+1555", { messageId: "m-1" })).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -83,43 +83,80 @@ async function detectRemoteHostFromCliPath(cliPath: string): Promise<string | un
|
||||
/**
|
||||
* Cache for recently sent messages, used for echo detection.
|
||||
* Keys are scoped by conversation (accountId:target) so the same text in different chats is not conflated.
|
||||
* Entries expire after 5 seconds; we do not forget on match so multiple echo deliveries are all filtered.
|
||||
* Message IDs use a longer TTL than text fallback to improve resilience when inbound polling is delayed.
|
||||
*/
|
||||
class SentMessageCache {
|
||||
private cache = new Map<string, number>();
|
||||
private readonly ttlMs = 5000; // 5 seconds
|
||||
const SENT_MESSAGE_TEXT_TTL_MS = 5000;
|
||||
const SENT_MESSAGE_ID_TTL_MS = 60_000;
|
||||
|
||||
remember(scope: string, text: string): void {
|
||||
if (!text?.trim()) {
|
||||
return;
|
||||
function normalizeEchoTextKey(text: string | undefined): string | null {
|
||||
if (!text) {
|
||||
return null;
|
||||
}
|
||||
const normalized = text.replace(/\r\n?/g, "\n").trim();
|
||||
return normalized ? normalized : null;
|
||||
}
|
||||
|
||||
function normalizeEchoMessageIdKey(messageId: string | undefined): string | null {
|
||||
if (!messageId) {
|
||||
return null;
|
||||
}
|
||||
const normalized = messageId.trim();
|
||||
if (!normalized || normalized === "ok" || normalized === "unknown") {
|
||||
return null;
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
type SentMessageLookup = {
|
||||
text?: string;
|
||||
messageId?: string;
|
||||
};
|
||||
|
||||
class SentMessageCache {
|
||||
private textCache = new Map<string, number>();
|
||||
private messageIdCache = new Map<string, number>();
|
||||
|
||||
remember(scope: string, lookup: SentMessageLookup): void {
|
||||
const textKey = normalizeEchoTextKey(lookup.text);
|
||||
if (textKey) {
|
||||
this.textCache.set(`${scope}:${textKey}`, Date.now());
|
||||
}
|
||||
const messageIdKey = normalizeEchoMessageIdKey(lookup.messageId);
|
||||
if (messageIdKey) {
|
||||
this.messageIdCache.set(`${scope}:${messageIdKey}`, Date.now());
|
||||
}
|
||||
const key = `${scope}:${text.trim()}`;
|
||||
this.cache.set(key, Date.now());
|
||||
this.cleanup();
|
||||
}
|
||||
|
||||
has(scope: string, text: string): boolean {
|
||||
if (!text?.trim()) {
|
||||
return false;
|
||||
has(scope: string, lookup: SentMessageLookup): boolean {
|
||||
this.cleanup();
|
||||
const messageIdKey = normalizeEchoMessageIdKey(lookup.messageId);
|
||||
if (messageIdKey) {
|
||||
const idTimestamp = this.messageIdCache.get(`${scope}:${messageIdKey}`);
|
||||
if (idTimestamp && Date.now() - idTimestamp <= SENT_MESSAGE_ID_TTL_MS) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
const key = `${scope}:${text.trim()}`;
|
||||
const timestamp = this.cache.get(key);
|
||||
if (!timestamp) {
|
||||
return false;
|
||||
const textKey = normalizeEchoTextKey(lookup.text);
|
||||
if (textKey) {
|
||||
const textTimestamp = this.textCache.get(`${scope}:${textKey}`);
|
||||
if (textTimestamp && Date.now() - textTimestamp <= SENT_MESSAGE_TEXT_TTL_MS) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
const age = Date.now() - timestamp;
|
||||
if (age > this.ttlMs) {
|
||||
this.cache.delete(key);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private cleanup(): void {
|
||||
const now = Date.now();
|
||||
for (const [text, timestamp] of this.cache.entries()) {
|
||||
if (now - timestamp > this.ttlMs) {
|
||||
this.cache.delete(text);
|
||||
for (const [key, timestamp] of this.textCache.entries()) {
|
||||
if (now - timestamp > SENT_MESSAGE_TEXT_TTL_MS) {
|
||||
this.textCache.delete(key);
|
||||
}
|
||||
}
|
||||
for (const [key, timestamp] of this.messageIdCache.entries()) {
|
||||
if (now - timestamp > SENT_MESSAGE_ID_TTL_MS) {
|
||||
this.messageIdCache.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -527,4 +564,5 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
export const __testing = {
|
||||
resolveIMessageRuntimeGroupPolicy: resolveOpenProviderRuntimeGroupPolicy,
|
||||
resolveDefaultGroupPolicy,
|
||||
createSentMessageCache: () => new SentMessageCache(),
|
||||
};
|
||||
|
||||
@@ -908,6 +908,14 @@ describe("normalizeOutboundPayloadsForJson", () => {
|
||||
expect(normalizeOutboundPayloadsForJson(input)).toEqual(testCase.expected);
|
||||
}
|
||||
});
|
||||
|
||||
it("suppresses reasoning payloads", () => {
|
||||
const normalized = normalizeOutboundPayloadsForJson([
|
||||
{ text: "Reasoning:\n_step_", isReasoning: true },
|
||||
{ text: "final answer" },
|
||||
]);
|
||||
expect(normalized).toEqual([{ text: "final answer", mediaUrl: null, mediaUrls: undefined }]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("normalizeOutboundPayloads", () => {
|
||||
@@ -916,6 +924,14 @@ describe("normalizeOutboundPayloads", () => {
|
||||
const normalized = normalizeOutboundPayloads([{ channelData }]);
|
||||
expect(normalized).toEqual([{ text: "", mediaUrls: [], channelData }]);
|
||||
});
|
||||
|
||||
it("suppresses reasoning payloads", () => {
|
||||
const normalized = normalizeOutboundPayloads([
|
||||
{ text: "Reasoning:\n_step_", isReasoning: true },
|
||||
{ text: "final answer" },
|
||||
]);
|
||||
expect(normalized).toEqual([{ text: "final answer", mediaUrls: [] }]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatOutboundPayloadLog", () => {
|
||||
|
||||
@@ -41,6 +41,9 @@ export function normalizeReplyPayloadsForDelivery(
|
||||
payloads: readonly ReplyPayload[],
|
||||
): ReplyPayload[] {
|
||||
return payloads.flatMap((payload) => {
|
||||
if (payload.isReasoning) {
|
||||
return [];
|
||||
}
|
||||
const parsed = parseReplyDirectives(payload.text ?? "");
|
||||
const explicitMediaUrls = payload.mediaUrls ?? parsed.mediaUrls;
|
||||
const explicitMediaUrl = payload.mediaUrl ?? parsed.mediaUrl;
|
||||
|
||||
Reference in New Issue
Block a user