From ed2d9cd55ce8d14f33c6c562c61b908377ba8d0b Mon Sep 17 00:00:00 2001 From: Tom Date: Tue, 3 Mar 2026 23:39:16 +0700 Subject: [PATCH] zalouser: add group message history for mention-gated chats --- docs/channels/zalouser.md | 2 + extensions/zalouser/src/channel.ts | 1 + extensions/zalouser/src/config-schema.ts | 1 + .../zalouser/src/monitor.group-gating.test.ts | 60 ++++++++++++ extensions/zalouser/src/monitor.ts | 97 ++++++++++++++++++- extensions/zalouser/src/types.ts | 1 + 6 files changed, 160 insertions(+), 2 deletions(-) diff --git a/docs/channels/zalouser.md b/docs/channels/zalouser.md index 7f440b00afc..9b62244e234 100644 --- a/docs/channels/zalouser.md +++ b/docs/channels/zalouser.md @@ -117,6 +117,8 @@ Example: - Resolution order: exact group id/name -> normalized group slug -> `*` -> default (`true`). - This applies both to allowlisted groups and open group mode. - Authorized control commands (for example `/new`) can bypass mention gating. +- When a group message is skipped because mention is required, OpenClaw stores it as pending group history and includes it on the next processed group message. +- Group history limit defaults to `messages.groupChat.historyLimit` (fallback `50`). You can override per account with `channels.zalouser.historyLimit`. Example: diff --git a/extensions/zalouser/src/channel.ts b/extensions/zalouser/src/channel.ts index 525ff1a86b1..a0414b7b245 100644 --- a/extensions/zalouser/src/channel.ts +++ b/extensions/zalouser/src/channel.ts @@ -342,6 +342,7 @@ export const zalouserPlugin: ChannelPlugin = { "name", "dmPolicy", "allowFrom", + "historyLimit", "groupAllowFrom", "groupPolicy", "groups", diff --git a/extensions/zalouser/src/config-schema.ts b/extensions/zalouser/src/config-schema.ts index 2b809233362..8f1f8a77fc7 100644 --- a/extensions/zalouser/src/config-schema.ts +++ b/extensions/zalouser/src/config-schema.ts @@ -17,6 +17,7 @@ const zalouserAccountSchema = z.object({ profile: z.string().optional(), dmPolicy: z.enum(["pairing", "allowlist", "open", "disabled"]).optional(), allowFrom: z.array(allowFromEntry).optional(), + historyLimit: z.number().int().min(0).optional(), groupAllowFrom: z.array(allowFromEntry).optional(), groupPolicy: z.enum(["disabled", "allowlist", "open"]).optional(), groups: z.object({}).catchall(groupConfigSchema).optional(), diff --git a/extensions/zalouser/src/monitor.group-gating.test.ts b/extensions/zalouser/src/monitor.group-gating.test.ts index e602912d63d..572b59d06c2 100644 --- a/extensions/zalouser/src/monitor.group-gating.test.ts +++ b/extensions/zalouser/src/monitor.group-gating.test.ts @@ -458,4 +458,64 @@ describe("zalouser monitor group mention gating", () => { expect(readAllowFromStore).not.toHaveBeenCalled(); }); + + it("includes skipped group messages as InboundHistory on the next processed message", async () => { + const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({ + commandAuthorized: false, + }); + const historyState = { + historyLimit: 5, + groupHistories: new Map< + string, + Array<{ sender: string; body: string; timestamp?: number; messageId?: string }> + >(), + }; + const account = createAccount(); + const config = createConfig(); + await __testing.processMessage({ + message: createGroupMessage({ + content: "first unmentioned line", + hasAnyMention: false, + wasExplicitlyMentioned: false, + }), + account, + config, + runtime: createRuntimeEnv(), + historyState, + }); + expect(dispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled(); + + await __testing.processMessage({ + message: createGroupMessage({ + content: "second line @bot", + hasAnyMention: true, + wasExplicitlyMentioned: true, + }), + account, + config, + runtime: createRuntimeEnv(), + historyState, + }); + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1); + const firstDispatch = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0]; + expect(firstDispatch?.ctx?.InboundHistory).toEqual([ + expect.objectContaining({ sender: "Alice", body: "first unmentioned line" }), + ]); + expect(String(firstDispatch?.ctx?.Body ?? "")).toContain("first unmentioned line"); + + await __testing.processMessage({ + message: createGroupMessage({ + content: "third line @bot", + hasAnyMention: true, + wasExplicitlyMentioned: true, + }), + account, + config, + runtime: createRuntimeEnv(), + historyState, + }); + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(2); + const secondDispatch = dispatchReplyWithBufferedBlockDispatcher.mock.calls[1]?.[0]; + expect(secondDispatch?.ctx?.InboundHistory).toEqual([]); + }); }); diff --git a/extensions/zalouser/src/monitor.ts b/extensions/zalouser/src/monitor.ts index 3072361fafc..90b2b098a15 100644 --- a/extensions/zalouser/src/monitor.ts +++ b/extensions/zalouser/src/monitor.ts @@ -6,11 +6,16 @@ import type { } from "openclaw/plugin-sdk"; import { DM_GROUP_ACCESS_REASON, + DEFAULT_GROUP_HISTORY_LIMIT, + type HistoryEntry, + buildPendingHistoryContextFromMap, + clearHistoryEntriesIfEnabled, createTypingCallbacks, createScopedPairingAccess, createReplyPrefixOptions, resolveOutboundMediaUrls, mergeAllowlist, + recordPendingHistoryEntryIfEnabled, resolveDmGroupAccessWithLists, resolveMentionGatingWithBypass, resolveOpenProviderRuntimeGroupPolicy, @@ -75,6 +80,11 @@ function buildNameIndex(items: T[], nameFn: (item: T) => string | undefined): type ZalouserCoreRuntime = ReturnType; +type ZalouserGroupHistoryState = { + historyLimit: number; + groupHistories: Map; +}; + function logVerbose(core: ZalouserCoreRuntime, runtime: RuntimeEnv, message: string): void { if (core.logging.shouldLogVerbose()) { runtime.log(`[zalouser] ${message}`); @@ -161,6 +171,7 @@ async function processMessage( config: OpenClawConfig, core: ZalouserCoreRuntime, runtime: RuntimeEnv, + historyState: ZalouserGroupHistoryState, statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, ): Promise { const pairing = createScopedPairingAccess({ @@ -352,6 +363,7 @@ async function processMessage( id: peer.id, }, }); + const historyKey = isGroup ? route.sessionKey : undefined; const requireMention = isGroup ? resolveGroupRequireMention({ @@ -396,6 +408,24 @@ async function processMessage( return; } if (isGroup && mentionGate.shouldSkip) { + recordPendingHistoryEntryIfEnabled({ + historyMap: historyState.groupHistories, + historyKey: historyKey ?? "", + limit: historyState.historyLimit, + entry: + historyKey && rawBody + ? { + sender: senderName || senderId, + body: rawBody, + timestamp: message.timestampMs, + messageId: resolveZalouserMessageSid({ + msgId: message.msgId, + cliMsgId: message.cliMsgId, + fallback: `${message.timestampMs}`, + }), + } + : null, + }); logVerbose(core, runtime, `zalouser: skip group ${chatId} (mention required, not mentioned)`); return; } @@ -417,12 +447,40 @@ async function processMessage( envelope: envelopeOptions, body: rawBody, }); + const combinedBody = + isGroup && historyKey + ? buildPendingHistoryContextFromMap({ + historyMap: historyState.groupHistories, + historyKey, + limit: historyState.historyLimit, + currentMessage: body, + formatEntry: (entry) => + core.channel.reply.formatAgentEnvelope({ + channel: "Zalo Personal", + from: fromLabel, + timestamp: entry.timestamp, + envelope: envelopeOptions, + body: `${entry.sender}: ${entry.body}${ + entry.messageId ? ` [id:${entry.messageId}]` : "" + }`, + }), + }) + : body; + const inboundHistory = + isGroup && historyKey && historyState.historyLimit > 0 + ? (historyState.groupHistories.get(historyKey) ?? []).map((entry) => ({ + sender: entry.sender, + body: entry.body, + timestamp: entry.timestamp, + })) + : undefined; const normalizedTo = isGroup ? `zalouser:group:${chatId}` : `zalouser:${chatId}`; const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: body, + Body: combinedBody, BodyForAgent: rawBody, + InboundHistory: inboundHistory, RawBody: rawBody, CommandBody: commandBody, BodyForCommands: commandBody, @@ -516,6 +574,13 @@ async function processMessage( onModelSelected, }, }); + if (isGroup && historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: historyState.groupHistories, + historyKey, + limit: historyState.historyLimit, + }); + } } async function deliverZalouserReply(params: { @@ -581,6 +646,13 @@ export async function monitorZalouserProvider( const { abortSignal, statusSink, runtime } = options; const core = getZalouserRuntime(); + const historyLimit = Math.max( + 0, + account.config.historyLimit ?? + config.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const groupHistories = new Map(); try { const profile = account.profile; @@ -716,7 +788,15 @@ export async function monitorZalouserProvider( } logVerbose(core, runtime, `[${account.accountId}] inbound message`); statusSink?.({ lastInboundAt: Date.now() }); - processMessage(msg, account, config, core, runtime, statusSink).catch((err) => { + processMessage( + msg, + account, + config, + core, + runtime, + { historyLimit, groupHistories }, + statusSink, + ).catch((err) => { runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`); }); }, @@ -758,14 +838,27 @@ export const __testing = { account: ResolvedZalouserAccount; config: OpenClawConfig; runtime: RuntimeEnv; + historyState?: { + historyLimit?: number; + groupHistories?: Map; + }; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; }) => { + const historyLimit = Math.max( + 0, + params.historyState?.historyLimit ?? + params.account.config.historyLimit ?? + params.config.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const groupHistories = params.historyState?.groupHistories ?? new Map(); await processMessage( params.message, params.account, params.config, getZalouserRuntime(), params.runtime, + { historyLimit, groupHistories }, params.statusSink, ); }, diff --git a/extensions/zalouser/src/types.ts b/extensions/zalouser/src/types.ts index 27d2da6fb0c..d704a1b3f78 100644 --- a/extensions/zalouser/src/types.ts +++ b/extensions/zalouser/src/types.ts @@ -93,6 +93,7 @@ type ZalouserSharedConfig = { profile?: string; dmPolicy?: "pairing" | "allowlist" | "open" | "disabled"; allowFrom?: Array; + historyLimit?: number; groupAllowFrom?: Array; groupPolicy?: "open" | "allowlist" | "disabled"; groups?: Record;