diff --git a/CHANGELOG.md b/CHANGELOG.md index 12afbc33926..421143bbf0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -379,6 +379,7 @@ Docs: https://docs.openclaw.ai - CLI/Banner taglines: add `cli.banner.taglineMode` (`random` | `default` | `off`) to control funny tagline behavior in startup output, with docs + FAQ guidance and regression tests for config override behavior. - Agents/compaction safeguard quality-audit rollout: keep summary quality audits disabled by default unless `agents.defaults.compaction.qualityGuard` is explicitly enabled, and add config plumbing for bounded retry control. (#25556) thanks @rodrigouroz. - Gateway/input_image MIME validation: sniff uploaded image bytes before MIME allowlist enforcement again so declared image types cannot mask concrete non-image payloads, while keeping HEIC/HEIF normalization behavior scoped to actual HEIC inputs. Thanks @vincentkoc. +- Zalo Personal plugin (`@openclaw/zalouser`): keep canonical DM routing while preserving legacy DM session continuity on upgrade, and preserve provider-native `g-`/`u-` target ids in outbound send and directory flows so #33992 lands without breaking existing sessions or stored targets. (#33992) Thanks @darkamenosa. ### Breaking diff --git a/docs/channels/zalouser.md b/docs/channels/zalouser.md index 4d40c2e9b4c..9b62244e234 100644 --- a/docs/channels/zalouser.md +++ b/docs/channels/zalouser.md @@ -86,10 +86,13 @@ Approve via: - Default: `channels.zalouser.groupPolicy = "open"` (groups allowed). Use `channels.defaults.groupPolicy` to override the default when unset. - Restrict to an allowlist with: - `channels.zalouser.groupPolicy = "allowlist"` - - `channels.zalouser.groups` (keys are group IDs or names) + - `channels.zalouser.groups` (keys are group IDs or names; controls which groups are allowed) + - `channels.zalouser.groupAllowFrom` (controls which senders in allowed groups can trigger the bot) - Block all groups: `channels.zalouser.groupPolicy = "disabled"`. - The configure wizard can prompt for group allowlists. - On startup, OpenClaw resolves group/user names in allowlists to IDs and logs the mapping; unresolved entries are kept as typed. +- If `groupAllowFrom` is unset, runtime falls back to `allowFrom` for group sender checks. +- Sender checks apply to both normal group messages and control commands (for example `/new`, `/reset`). Example: @@ -98,6 +101,7 @@ Example: channels: { zalouser: { groupPolicy: "allowlist", + groupAllowFrom: ["1471383327500481391"], groups: { "123456789": { allow: true }, "Work Chat": { allow: true }, @@ -112,6 +116,9 @@ Example: - `channels.zalouser.groups..requireMention` controls whether group replies require a mention. - Resolution order: exact group id/name -> normalized group slug -> `*` -> default (`true`). - This applies both to allowlisted groups and open group mode. +- Authorized control commands (for example `/new`) can bypass mention gating. +- When a group message is skipped because mention is required, OpenClaw stores it as pending group history and includes it on the next processed group message. +- Group history limit defaults to `messages.groupChat.historyLimit` (fallback `50`). You can override per account with `channels.zalouser.historyLimit`. Example: @@ -164,7 +171,7 @@ Accounts map to `zalouser` profiles in OpenClaw state. Example: **Allowlist/group name didn't resolve:** -- Use numeric IDs in `allowFrom`/`groups`, or exact friend/group names. +- Use numeric IDs in `allowFrom`/`groupAllowFrom`/`groups`, or exact friend/group names. **Upgraded from old CLI-based setup:** diff --git a/extensions/test-utils/plugin-runtime-mock.ts b/extensions/test-utils/plugin-runtime-mock.ts index 0526c6bf591..8c599599a31 100644 --- a/extensions/test-utils/plugin-runtime-mock.ts +++ b/extensions/test-utils/plugin-runtime-mock.ts @@ -123,6 +123,17 @@ export function createPluginRuntimeMock(overrides: DeepPartial = })) as unknown as PluginRuntime["channel"]["reply"]["resolveEnvelopeFormatOptions"], }, routing: { + buildAgentSessionKey: vi.fn( + ({ + agentId, + channel, + peer, + }: { + agentId: string; + channel: string; + peer?: { kind?: string; id?: string }; + }) => `agent:${agentId}:${channel}:${peer?.kind ?? "direct"}:${peer?.id ?? "peer"}`, + ) as unknown as PluginRuntime["channel"]["routing"]["buildAgentSessionKey"], resolveAgentRoute: vi.fn(() => ({ agentId: "main", accountId: "default", diff --git a/extensions/zalouser/src/channel.directory.test.ts b/extensions/zalouser/src/channel.directory.test.ts new file mode 100644 index 00000000000..f8c13b208e4 --- /dev/null +++ b/extensions/zalouser/src/channel.directory.test.ts @@ -0,0 +1,72 @@ +import type { RuntimeEnv } from "openclaw/plugin-sdk/zalouser"; +import { describe, expect, it, vi } from "vitest"; + +const listZaloGroupMembersMock = vi.hoisted(() => vi.fn(async () => [])); + +vi.mock("./zalo-js.js", async (importOriginal) => { + const actual = (await importOriginal()) as Record; + return { + ...actual, + listZaloGroupMembers: listZaloGroupMembersMock, + }; +}); + +vi.mock("./accounts.js", async (importOriginal) => { + const actual = (await importOriginal()) as Record; + return { + ...actual, + resolveZalouserAccountSync: () => ({ + accountId: "default", + profile: "default", + name: "test", + enabled: true, + authenticated: true, + config: {}, + }), + }; +}); + +import { zalouserPlugin } from "./channel.js"; + +const runtimeStub: RuntimeEnv = { + log: vi.fn(), + error: vi.fn(), + exit: ((code: number): never => { + throw new Error(`exit ${code}`); + }) as RuntimeEnv["exit"], +}; + +describe("zalouser directory group members", () => { + it("accepts prefixed group ids from directory groups list output", async () => { + await zalouserPlugin.directory!.listGroupMembers!({ + cfg: {}, + accountId: "default", + groupId: "group:1471383327500481391", + runtime: runtimeStub, + }); + + expect(listZaloGroupMembersMock).toHaveBeenCalledWith("default", "1471383327500481391"); + }); + + it("keeps backward compatibility for raw group ids", async () => { + await zalouserPlugin.directory!.listGroupMembers!({ + cfg: {}, + accountId: "default", + groupId: "1471383327500481391", + runtime: runtimeStub, + }); + + expect(listZaloGroupMembersMock).toHaveBeenCalledWith("default", "1471383327500481391"); + }); + + it("accepts provider-native g- group ids without stripping the prefix", async () => { + await zalouserPlugin.directory!.listGroupMembers!({ + cfg: {}, + accountId: "default", + groupId: "g-1471383327500481391", + runtime: runtimeStub, + }); + + expect(listZaloGroupMembersMock).toHaveBeenCalledWith("default", "g-1471383327500481391"); + }); +}); diff --git a/extensions/zalouser/src/channel.sendpayload.test.ts b/extensions/zalouser/src/channel.sendpayload.test.ts index 31eb6136cd5..534f9c39b95 100644 --- a/extensions/zalouser/src/channel.sendpayload.test.ts +++ b/extensions/zalouser/src/channel.sendpayload.test.ts @@ -24,7 +24,7 @@ vi.mock("./accounts.js", async (importOriginal) => { function baseCtx(payload: ReplyPayload) { return { cfg: {}, - to: "987654321", + to: "user:987654321", text: "", payload, }; @@ -49,6 +49,22 @@ describe("zalouserPlugin outbound sendPayload", () => { expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-t1" }); }); + it("group target delegates with isGroup=true and stripped threadId", async () => { + mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-g1" }); + + const result = await zalouserPlugin.outbound!.sendPayload!({ + ...baseCtx({ text: "hello group" }), + to: "group:1471383327500481391", + }); + + expect(mockedSend).toHaveBeenCalledWith( + "1471383327500481391", + "hello group", + expect.objectContaining({ isGroup: true }), + ); + expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-g1" }); + }); + it("single media delegates to sendMedia", async () => { mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-m1" }); @@ -64,6 +80,38 @@ describe("zalouserPlugin outbound sendPayload", () => { expect(result).toMatchObject({ channel: "zalouser" }); }); + it("treats bare numeric targets as direct chats for backward compatibility", async () => { + mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-d1" }); + + const result = await zalouserPlugin.outbound!.sendPayload!({ + ...baseCtx({ text: "hello" }), + to: "987654321", + }); + + expect(mockedSend).toHaveBeenCalledWith( + "987654321", + "hello", + expect.objectContaining({ isGroup: false }), + ); + expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-d1" }); + }); + + it("preserves provider-native group ids when sending to raw g- targets", async () => { + mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-g-native" }); + + const result = await zalouserPlugin.outbound!.sendPayload!({ + ...baseCtx({ text: "hello native group" }), + to: "g-1471383327500481391", + }); + + expect(mockedSend).toHaveBeenCalledWith( + "g-1471383327500481391", + "hello native group", + expect.objectContaining({ isGroup: true }), + ); + expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-g-native" }); + }); + it("multi-media iterates URLs with caption on first", async () => { mockedSend .mockResolvedValueOnce({ ok: true, messageId: "zlu-1" }) @@ -115,3 +163,31 @@ describe("zalouserPlugin outbound sendPayload", () => { expect(result).toMatchObject({ channel: "zalouser" }); }); }); + +describe("zalouserPlugin messaging target normalization", () => { + it("normalizes user/group aliases to canonical targets", () => { + const normalize = zalouserPlugin.messaging?.normalizeTarget; + expect(normalize).toBeTypeOf("function"); + if (!normalize) { + return; + } + expect(normalize("zlu:g:30003")).toBe("group:30003"); + expect(normalize("zalouser:u:20002")).toBe("user:20002"); + expect(normalize("zlu:g-30003")).toBe("group:g-30003"); + expect(normalize("zalouser:u-20002")).toBe("user:u-20002"); + expect(normalize("20002")).toBe("20002"); + }); + + it("treats canonical and provider-native user/group targets as ids", () => { + const looksLikeId = zalouserPlugin.messaging?.targetResolver?.looksLikeId; + expect(looksLikeId).toBeTypeOf("function"); + if (!looksLikeId) { + return; + } + expect(looksLikeId("user:20002")).toBe(true); + expect(looksLikeId("group:30003")).toBe(true); + expect(looksLikeId("g-30003")).toBe(true); + expect(looksLikeId("u-20002")).toBe(true); + expect(looksLikeId("Alice Nguyen")).toBe(false); + }); +}); diff --git a/extensions/zalouser/src/channel.ts b/extensions/zalouser/src/channel.ts index 2ca5c319231..e01775d0dbb 100644 --- a/extensions/zalouser/src/channel.ts +++ b/extensions/zalouser/src/channel.ts @@ -66,6 +66,97 @@ const meta = { quickstartAllowFrom: true, }; +function stripZalouserTargetPrefix(raw: string): string { + return raw + .trim() + .replace(/^(zalouser|zlu):/i, "") + .trim(); +} + +function normalizePrefixedTarget(raw: string): string | undefined { + const trimmed = stripZalouserTargetPrefix(raw); + if (!trimmed) { + return undefined; + } + + const lower = trimmed.toLowerCase(); + if (lower.startsWith("group:")) { + const id = trimmed.slice("group:".length).trim(); + return id ? `group:${id}` : undefined; + } + if (lower.startsWith("g:")) { + const id = trimmed.slice("g:".length).trim(); + return id ? `group:${id}` : undefined; + } + if (lower.startsWith("user:")) { + const id = trimmed.slice("user:".length).trim(); + return id ? `user:${id}` : undefined; + } + if (lower.startsWith("dm:")) { + const id = trimmed.slice("dm:".length).trim(); + return id ? `user:${id}` : undefined; + } + if (lower.startsWith("u:")) { + const id = trimmed.slice("u:".length).trim(); + return id ? `user:${id}` : undefined; + } + if (/^g-\S+$/i.test(trimmed)) { + return `group:${trimmed}`; + } + if (/^u-\S+$/i.test(trimmed)) { + return `user:${trimmed}`; + } + + return trimmed; +} + +function parseZalouserOutboundTarget(raw: string): { + threadId: string; + isGroup: boolean; +} { + const normalized = normalizePrefixedTarget(raw); + if (!normalized) { + throw new Error("Zalouser target is required"); + } + const lowered = normalized.toLowerCase(); + if (lowered.startsWith("group:")) { + const threadId = normalized.slice("group:".length).trim(); + if (!threadId) { + throw new Error("Zalouser group target is missing group id"); + } + return { threadId, isGroup: true }; + } + if (lowered.startsWith("user:")) { + const threadId = normalized.slice("user:".length).trim(); + if (!threadId) { + throw new Error("Zalouser user target is missing user id"); + } + return { threadId, isGroup: false }; + } + // Backward-compatible fallback for bare IDs. + // Group sends should use explicit `group:` targets. + return { threadId: normalized, isGroup: false }; +} + +function parseZalouserDirectoryGroupId(raw: string): string { + const normalized = normalizePrefixedTarget(raw); + if (!normalized) { + throw new Error("Zalouser group target is required"); + } + const lowered = normalized.toLowerCase(); + if (lowered.startsWith("group:")) { + const groupId = normalized.slice("group:".length).trim(); + if (!groupId) { + throw new Error("Zalouser group target is missing group id"); + } + return groupId; + } + if (lowered.startsWith("user:")) { + throw new Error("Zalouser group members lookup requires a group target (group:)"); + } + return normalized; +} + function resolveZalouserQrProfile(accountId?: string | null): string { const normalized = normalizeAccountId(accountId); if (!normalized || normalized === DEFAULT_ACCOUNT_ID) { @@ -261,6 +352,8 @@ export const zalouserPlugin: ChannelPlugin = { "name", "dmPolicy", "allowFrom", + "historyLimit", + "groupAllowFrom", "groupPolicy", "groups", "messagePrefix", @@ -333,16 +426,19 @@ export const zalouserPlugin: ChannelPlugin = { }, }, messaging: { - normalizeTarget: (raw) => { - const trimmed = raw?.trim(); - if (!trimmed) { - return undefined; - } - return trimmed.replace(/^(zalouser|zlu):/i, ""); - }, + normalizeTarget: (raw) => normalizePrefixedTarget(raw), targetResolver: { - looksLikeId: isNumericTargetId, - hint: "", + looksLikeId: (raw) => { + const normalized = normalizePrefixedTarget(raw); + if (!normalized) { + return false; + } + if (/^group:[^\s]+$/i.test(normalized) || /^user:[^\s]+$/i.test(normalized)) { + return true; + } + return isNumericTargetId(normalized); + }, + hint: "", }, }, directory: { @@ -377,7 +473,7 @@ export const zalouserPlugin: ChannelPlugin = { const groups = await listZaloGroupsMatching(account.profile, query); const rows = groups.map((group) => mapGroup({ - id: String(group.groupId), + id: `group:${String(group.groupId)}`, name: group.name ?? null, raw: group, }), @@ -386,7 +482,8 @@ export const zalouserPlugin: ChannelPlugin = { }, listGroupMembers: async ({ cfg, accountId, groupId, limit }) => { const account = resolveZalouserAccountSync({ cfg: cfg, accountId }); - const members = await listZaloGroupMembers(account.profile, groupId); + const normalizedGroupId = parseZalouserDirectoryGroupId(groupId); + const members = await listZaloGroupMembers(account.profile, normalizedGroupId); const rows = members.map((member) => mapUser({ id: member.userId, @@ -511,13 +608,19 @@ export const zalouserPlugin: ChannelPlugin = { }), sendText: async ({ to, text, accountId, cfg }) => { const account = resolveZalouserAccountSync({ cfg: cfg, accountId }); - const result = await sendMessageZalouser(to, text, { profile: account.profile }); + const target = parseZalouserOutboundTarget(to); + const result = await sendMessageZalouser(target.threadId, text, { + profile: account.profile, + isGroup: target.isGroup, + }); return buildChannelSendResult("zalouser", result); }, sendMedia: async ({ to, text, mediaUrl, accountId, cfg, mediaLocalRoots }) => { const account = resolveZalouserAccountSync({ cfg: cfg, accountId }); - const result = await sendMessageZalouser(to, text, { + const target = parseZalouserOutboundTarget(to); + const result = await sendMessageZalouser(target.threadId, text, { profile: account.profile, + isGroup: target.isGroup, mediaUrl, mediaLocalRoots, }); diff --git a/extensions/zalouser/src/config-schema.ts b/extensions/zalouser/src/config-schema.ts index bbc8457da6e..9d6b0bcec4a 100644 --- a/extensions/zalouser/src/config-schema.ts +++ b/extensions/zalouser/src/config-schema.ts @@ -17,6 +17,8 @@ const zalouserAccountSchema = z.object({ profile: z.string().optional(), dmPolicy: z.enum(["pairing", "allowlist", "open", "disabled"]).optional(), allowFrom: z.array(allowFromEntry).optional(), + historyLimit: z.number().int().min(0).optional(), + groupAllowFrom: z.array(allowFromEntry).optional(), groupPolicy: z.enum(["disabled", "allowlist", "open"]).optional(), groups: z.object({}).catchall(groupConfigSchema).optional(), messagePrefix: z.string().optional(), diff --git a/extensions/zalouser/src/monitor.group-gating.test.ts b/extensions/zalouser/src/monitor.group-gating.test.ts index 7e11680b315..522a2e4a7b5 100644 --- a/extensions/zalouser/src/monitor.group-gating.test.ts +++ b/extensions/zalouser/src/monitor.group-gating.test.ts @@ -49,11 +49,65 @@ function createRuntimeEnv(): RuntimeEnv { }; } -function installRuntime(params: { commandAuthorized: boolean }) { +function installRuntime(params: { + commandAuthorized?: boolean; + resolveCommandAuthorizedFromAuthorizers?: (params: { + useAccessGroups: boolean; + authorizers: Array<{ configured: boolean; allowed: boolean }>; + }) => boolean; +}) { const dispatchReplyWithBufferedBlockDispatcher = vi.fn(async ({ dispatcherOptions, ctx }) => { await dispatcherOptions.typingCallbacks?.onReplyStart?.(); return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 }, ctx }; }); + const resolveCommandAuthorizedFromAuthorizers = vi.fn( + (input: { + useAccessGroups: boolean; + authorizers: Array<{ configured: boolean; allowed: boolean }>; + }) => { + if (params.resolveCommandAuthorizedFromAuthorizers) { + return params.resolveCommandAuthorizedFromAuthorizers(input); + } + return params.commandAuthorized ?? false; + }, + ); + const resolveAgentRoute = vi.fn((input: { peer?: { kind?: string; id?: string } }) => { + const peerKind = input.peer?.kind === "direct" ? "direct" : "group"; + const peerId = input.peer?.id ?? "1"; + return { + agentId: "main", + sessionKey: + peerKind === "direct" ? "agent:main:main" : `agent:main:zalouser:${peerKind}:${peerId}`, + accountId: "default", + mainSessionKey: "agent:main:main", + }; + }); + const readAllowFromStore = vi.fn(async () => []); + const readSessionUpdatedAt = vi.fn(() => undefined); + const buildAgentSessionKey = vi.fn( + (input: { + agentId: string; + channel: string; + accountId?: string; + peer?: { kind?: string; id?: string }; + dmScope?: string; + }) => { + const peerKind = input.peer?.kind === "direct" ? "direct" : "group"; + const peerId = input.peer?.id ?? "1"; + if (peerKind === "direct") { + if (input.dmScope === "per-account-channel-peer") { + return `agent:${input.agentId}:${input.channel}:${input.accountId ?? "default"}:direct:${peerId}`; + } + if (input.dmScope === "per-peer") { + return `agent:${input.agentId}:direct:${peerId}`; + } + if (input.dmScope === "main" || !input.dmScope) { + return "agent:main:main"; + } + } + return `agent:${input.agentId}:${input.channel}:${peerKind}:${peerId}`; + }, + ); setZalouserRuntime({ logging: { @@ -61,13 +115,13 @@ function installRuntime(params: { commandAuthorized: boolean }) { }, channel: { pairing: { - readAllowFromStore: vi.fn(async () => []), + readAllowFromStore, upsertPairingRequest: vi.fn(async () => ({ code: "PAIR", created: true })), buildPairingReply: vi.fn(() => "pair"), }, commands: { shouldComputeCommandAuthorized: vi.fn((body: string) => body.trim().startsWith("/")), - resolveCommandAuthorizedFromAuthorizers: vi.fn(() => params.commandAuthorized), + resolveCommandAuthorizedFromAuthorizers, isControlCommandMessage: vi.fn((body: string) => body.trim().startsWith("/")), shouldHandleTextCommands: vi.fn(() => true), }, @@ -93,16 +147,12 @@ function installRuntime(params: { commandAuthorized: boolean }) { }), }, routing: { - resolveAgentRoute: vi.fn(() => ({ - agentId: "main", - sessionKey: "agent:main:zalouser:group:1", - accountId: "default", - mainSessionKey: "agent:main:main", - })), + buildAgentSessionKey, + resolveAgentRoute, }, session: { resolveStorePath: vi.fn(() => "/tmp"), - readSessionUpdatedAt: vi.fn(() => undefined), + readSessionUpdatedAt, recordInboundSession: vi.fn(async () => {}), }, reply: { @@ -120,7 +170,14 @@ function installRuntime(params: { commandAuthorized: boolean }) { }, } as unknown as PluginRuntime); - return { dispatchReplyWithBufferedBlockDispatcher }; + return { + dispatchReplyWithBufferedBlockDispatcher, + resolveAgentRoute, + resolveCommandAuthorizedFromAuthorizers, + readAllowFromStore, + readSessionUpdatedAt, + buildAgentSessionKey, + }; } function createGroupMessage(overrides: Partial = {}): ZaloInboundMessage { @@ -142,6 +199,21 @@ function createGroupMessage(overrides: Partial = {}): ZaloIn }; } +function createDmMessage(overrides: Partial = {}): ZaloInboundMessage { + return { + threadId: "u-1", + isGroup: false, + senderId: "321", + senderName: "Bob", + groupName: undefined, + content: "hello", + timestampMs: Date.now(), + msgId: "dm-1", + raw: { source: "test" }, + ...overrides, + }; +} + describe("zalouser monitor group mention gating", () => { beforeEach(() => { sendMessageZalouserMock.mockClear(); @@ -165,6 +237,25 @@ describe("zalouser monitor group mention gating", () => { expect(sendTypingZalouserMock).not.toHaveBeenCalled(); }); + it("fails closed when requireMention=true but mention detection is unavailable", async () => { + const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({ + commandAuthorized: false, + }); + await __testing.processMessage({ + message: createGroupMessage({ + canResolveExplicitMention: false, + hasAnyMention: false, + wasExplicitlyMentioned: false, + }), + account: createAccount(), + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + expect(dispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled(); + expect(sendTypingZalouserMock).not.toHaveBeenCalled(); + }); + it("dispatches explicitly-mentioned group messages and marks WasMentioned", async () => { const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({ commandAuthorized: false, @@ -183,6 +274,8 @@ describe("zalouser monitor group mention gating", () => { expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1); const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0]; expect(callArg?.ctx?.WasMentioned).toBe(true); + expect(callArg?.ctx?.To).toBe("zalouser:group:g-1"); + expect(callArg?.ctx?.OriginatingTo).toBe("zalouser:group:g-1"); expect(sendTypingZalouserMock).toHaveBeenCalledWith("g-1", { profile: "default", isGroup: true, @@ -208,4 +301,277 @@ describe("zalouser monitor group mention gating", () => { const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0]; expect(callArg?.ctx?.WasMentioned).toBe(true); }); + + it("uses commandContent for mention-prefixed control commands", async () => { + const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({ + commandAuthorized: true, + }); + await __testing.processMessage({ + message: createGroupMessage({ + content: "@Bot /new", + commandContent: "/new", + hasAnyMention: true, + wasExplicitlyMentioned: true, + }), + account: createAccount(), + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1); + const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0]; + expect(callArg?.ctx?.CommandBody).toBe("/new"); + expect(callArg?.ctx?.BodyForCommands).toBe("/new"); + }); + + it("allows group control commands when only allowFrom is configured", async () => { + const { dispatchReplyWithBufferedBlockDispatcher, resolveCommandAuthorizedFromAuthorizers } = + installRuntime({ + resolveCommandAuthorizedFromAuthorizers: ({ useAccessGroups, authorizers }) => + useAccessGroups && authorizers.some((entry) => entry.configured && entry.allowed), + }); + await __testing.processMessage({ + message: createGroupMessage({ + content: "/new", + commandContent: "/new", + hasAnyMention: true, + wasExplicitlyMentioned: true, + }), + account: { + ...createAccount(), + config: { + ...createAccount().config, + allowFrom: ["123"], + }, + }, + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1); + const authCall = resolveCommandAuthorizedFromAuthorizers.mock.calls[0]?.[0]; + expect(authCall?.authorizers).toEqual([ + { configured: true, allowed: true }, + { configured: true, allowed: true }, + ]); + }); + + it("blocks group messages when sender is not in groupAllowFrom/allowFrom", async () => { + const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({ + commandAuthorized: false, + }); + await __testing.processMessage({ + message: createGroupMessage({ + content: "ping @bot", + hasAnyMention: true, + wasExplicitlyMentioned: true, + }), + account: { + ...createAccount(), + config: { + ...createAccount().config, + groupPolicy: "allowlist", + allowFrom: ["999"], + }, + }, + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + expect(dispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled(); + }); + + it("allows group control commands when sender is in groupAllowFrom", async () => { + const { dispatchReplyWithBufferedBlockDispatcher, resolveCommandAuthorizedFromAuthorizers } = + installRuntime({ + resolveCommandAuthorizedFromAuthorizers: ({ useAccessGroups, authorizers }) => + useAccessGroups && authorizers.some((entry) => entry.configured && entry.allowed), + }); + await __testing.processMessage({ + message: createGroupMessage({ + content: "/new", + commandContent: "/new", + hasAnyMention: true, + wasExplicitlyMentioned: true, + }), + account: { + ...createAccount(), + config: { + ...createAccount().config, + allowFrom: ["999"], + groupAllowFrom: ["123"], + }, + }, + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1); + const authCall = resolveCommandAuthorizedFromAuthorizers.mock.calls[0]?.[0]; + expect(authCall?.authorizers).toEqual([ + { configured: true, allowed: false }, + { configured: true, allowed: true }, + ]); + }); + + it("routes DM messages with direct peer kind", async () => { + const { dispatchReplyWithBufferedBlockDispatcher, resolveAgentRoute, buildAgentSessionKey } = + installRuntime({ + commandAuthorized: false, + }); + const account = createAccount(); + await __testing.processMessage({ + message: createDmMessage(), + account: { + ...account, + config: { + ...account.config, + dmPolicy: "open", + }, + }, + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + expect(resolveAgentRoute).toHaveBeenCalledWith( + expect.objectContaining({ + peer: { kind: "direct", id: "321" }, + }), + ); + expect(buildAgentSessionKey).toHaveBeenCalledWith( + expect.objectContaining({ + peer: { kind: "direct", id: "321" }, + dmScope: "per-channel-peer", + }), + ); + const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0]; + expect(callArg?.ctx?.SessionKey).toBe("agent:main:zalouser:direct:321"); + }); + + it("reuses the legacy DM session key when only the old group-shaped session exists", async () => { + const { dispatchReplyWithBufferedBlockDispatcher, readSessionUpdatedAt } = installRuntime({ + commandAuthorized: false, + }); + readSessionUpdatedAt.mockImplementation(({ sessionKey }: { sessionKey: string }) => + sessionKey === "agent:main:zalouser:group:321" ? 123 : undefined, + ); + const account = createAccount(); + await __testing.processMessage({ + message: createDmMessage(), + account: { + ...account, + config: { + ...account.config, + dmPolicy: "open", + }, + }, + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0]; + expect(callArg?.ctx?.SessionKey).toBe("agent:main:zalouser:group:321"); + }); + + it("reads pairing store for open DM control commands", async () => { + const { readAllowFromStore } = installRuntime({ + commandAuthorized: false, + }); + const account = createAccount(); + await __testing.processMessage({ + message: createDmMessage({ content: "/new", commandContent: "/new" }), + account: { + ...account, + config: { + ...account.config, + dmPolicy: "open", + }, + }, + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + expect(readAllowFromStore).toHaveBeenCalledTimes(1); + }); + + it("skips pairing store read for open DM non-command messages", async () => { + const { readAllowFromStore } = installRuntime({ + commandAuthorized: false, + }); + const account = createAccount(); + await __testing.processMessage({ + message: createDmMessage({ content: "hello there" }), + account: { + ...account, + config: { + ...account.config, + dmPolicy: "open", + }, + }, + config: createConfig(), + runtime: createRuntimeEnv(), + }); + + expect(readAllowFromStore).not.toHaveBeenCalled(); + }); + + it("includes skipped group messages as InboundHistory on the next processed message", async () => { + const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({ + commandAuthorized: false, + }); + const historyState = { + historyLimit: 5, + groupHistories: new Map< + string, + Array<{ sender: string; body: string; timestamp?: number; messageId?: string }> + >(), + }; + const account = createAccount(); + const config = createConfig(); + await __testing.processMessage({ + message: createGroupMessage({ + content: "first unmentioned line", + hasAnyMention: false, + wasExplicitlyMentioned: false, + }), + account, + config, + runtime: createRuntimeEnv(), + historyState, + }); + expect(dispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled(); + + await __testing.processMessage({ + message: createGroupMessage({ + content: "second line @bot", + hasAnyMention: true, + wasExplicitlyMentioned: true, + }), + account, + config, + runtime: createRuntimeEnv(), + historyState, + }); + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1); + const firstDispatch = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0]; + expect(firstDispatch?.ctx?.InboundHistory).toEqual([ + expect.objectContaining({ sender: "Alice", body: "first unmentioned line" }), + ]); + expect(String(firstDispatch?.ctx?.Body ?? "")).toContain("first unmentioned line"); + + await __testing.processMessage({ + message: createGroupMessage({ + content: "third line @bot", + hasAnyMention: true, + wasExplicitlyMentioned: true, + }), + account, + config, + runtime: createRuntimeEnv(), + historyState, + }); + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(2); + const secondDispatch = dispatchReplyWithBufferedBlockDispatcher.mock.calls[1]?.[0]; + expect(secondDispatch?.ctx?.InboundHistory).toEqual([]); + }); }); diff --git a/extensions/zalouser/src/monitor.ts b/extensions/zalouser/src/monitor.ts index b7d7963956d..6590082e830 100644 --- a/extensions/zalouser/src/monitor.ts +++ b/extensions/zalouser/src/monitor.ts @@ -1,3 +1,13 @@ +import { + DM_GROUP_ACCESS_REASON, + DEFAULT_GROUP_HISTORY_LIMIT, + type HistoryEntry, + KeyedAsyncQueue, + buildPendingHistoryContextFromMap, + clearHistoryEntriesIfEnabled, + recordPendingHistoryEntryIfEnabled, + resolveDmGroupAccessWithLists, +} from "openclaw/plugin-sdk/compat"; import type { MarkdownTableMode, OpenClawConfig, @@ -73,8 +83,111 @@ function buildNameIndex(items: T[], nameFn: (item: T) => string | undefined): return index; } +function resolveUserAllowlistEntries( + entries: string[], + byName: Map>, +): { + additions: string[]; + mapping: string[]; + unresolved: string[]; +} { + const additions: string[] = []; + const mapping: string[] = []; + const unresolved: string[] = []; + for (const entry of entries) { + if (/^\d+$/.test(entry)) { + additions.push(entry); + continue; + } + const matches = byName.get(entry.toLowerCase()) ?? []; + const match = matches[0]; + const id = match?.userId ? String(match.userId) : undefined; + if (id) { + additions.push(id); + mapping.push(`${entry}->${id}`); + } else { + unresolved.push(entry); + } + } + return { additions, mapping, unresolved }; +} + type ZalouserCoreRuntime = ReturnType; +type ZalouserGroupHistoryState = { + historyLimit: number; + groupHistories: Map; +}; + +function resolveInboundQueueKey(message: ZaloInboundMessage): string { + const threadId = message.threadId?.trim() || "unknown"; + if (message.isGroup) { + return `group:${threadId}`; + } + const senderId = message.senderId?.trim(); + return `direct:${senderId || threadId}`; +} + +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +function resolveZalouserDmSessionScope(config: OpenClawConfig) { + const configured = config.session?.dmScope; + return configured === "main" || !configured ? "per-channel-peer" : configured; +} + +function resolveZalouserInboundSessionKey(params: { + core: ZalouserCoreRuntime; + config: OpenClawConfig; + route: { agentId: string; accountId: string; sessionKey: string }; + storePath: string; + isGroup: boolean; + senderId: string; +}): string { + if (params.isGroup) { + return params.route.sessionKey; + } + + const directSessionKey = params.core.channel.routing + .buildAgentSessionKey({ + agentId: params.route.agentId, + channel: "zalouser", + accountId: params.route.accountId, + peer: { kind: "direct", id: params.senderId }, + dmScope: resolveZalouserDmSessionScope(params.config), + identityLinks: params.config.session?.identityLinks, + }) + .toLowerCase(); + const legacySessionKey = params.core.channel.routing + .buildAgentSessionKey({ + agentId: params.route.agentId, + channel: "zalouser", + accountId: params.route.accountId, + peer: { kind: "group", id: params.senderId }, + }) + .toLowerCase(); + const hasDirectSession = + params.core.channel.session.readSessionUpdatedAt({ + storePath: params.storePath, + sessionKey: directSessionKey, + }) !== undefined; + const hasLegacySession = + params.core.channel.session.readSessionUpdatedAt({ + storePath: params.storePath, + sessionKey: legacySessionKey, + }) !== undefined; + + // Keep existing DM history on upgrade, but use canonical direct keys for new sessions. + return hasLegacySession && !hasDirectSession ? legacySessionKey : directSessionKey; +} + function logVerbose(core: ZalouserCoreRuntime, runtime: RuntimeEnv, message: string): void { if (core.logging.shouldLogVerbose()) { runtime.log(`[zalouser] ${message}`); @@ -139,6 +252,7 @@ async function processMessage( config: OpenClawConfig, core: ZalouserCoreRuntime, runtime: RuntimeEnv, + historyState: ZalouserGroupHistoryState, statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, ): Promise { const pairing = createScopedPairingAccess({ @@ -151,6 +265,7 @@ async function processMessage( if (!rawBody) { return; } + const commandBody = message.commandContent?.trim() || rawBody; const isGroup = message.isGroup; const chatId = message.threadId; @@ -237,65 +352,90 @@ async function processMessage( const dmPolicy = account.config.dmPolicy ?? "pairing"; const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v)); - const { senderAllowedForCommands, commandAuthorized } = await resolveSenderCommandAuthorization({ + const configGroupAllowFrom = (account.config.groupAllowFrom ?? []).map((v) => String(v)); + const shouldComputeCommandAuth = core.channel.commands.shouldComputeCommandAuthorized( + commandBody, + config, + ); + const storeAllowFrom = + !isGroup && dmPolicy !== "allowlist" && (dmPolicy !== "open" || shouldComputeCommandAuth) + ? await pairing.readAllowFromStore().catch(() => []) + : []; + const accessDecision = resolveDmGroupAccessWithLists({ + isGroup, + dmPolicy, + groupPolicy, + allowFrom: configAllowFrom, + groupAllowFrom: configGroupAllowFrom, + storeAllowFrom, + isSenderAllowed: (allowFrom) => isSenderAllowed(senderId, allowFrom), + }); + if (isGroup && accessDecision.decision !== "allow") { + if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) { + logVerbose(core, runtime, "Blocked zalouser group message (no group allowlist)"); + } else if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) { + logVerbose( + core, + runtime, + `Blocked zalouser sender ${senderId} (not in groupAllowFrom/allowFrom)`, + ); + } + return; + } + + if (!isGroup && accessDecision.decision !== "allow") { + if (accessDecision.decision === "pairing") { + await issuePairingChallenge({ + channel: "zalouser", + senderId, + senderIdLine: `Your Zalo user id: ${senderId}`, + meta: { name: senderName || undefined }, + upsertPairingRequest: pairing.upsertPairingRequest, + onCreated: () => { + logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`); + }, + sendPairingReply: async (text) => { + await sendMessageZalouser(chatId, text, { profile: account.profile }); + statusSink?.({ lastOutboundAt: Date.now() }); + }, + onReplyError: (err) => { + logVerbose( + core, + runtime, + `zalouser pairing reply failed for ${senderId}: ${String(err)}`, + ); + }, + }); + return; + } + if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) { + logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`); + } else { + logVerbose( + core, + runtime, + `Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`, + ); + } + return; + } + + const { commandAuthorized } = await resolveSenderCommandAuthorization({ cfg: config, - rawBody, + rawBody: commandBody, isGroup, dmPolicy, configuredAllowFrom: configAllowFrom, + configuredGroupAllowFrom: configGroupAllowFrom, senderId, isSenderAllowed, - readAllowFromStore: pairing.readAllowFromStore, + readAllowFromStore: async () => storeAllowFrom, shouldComputeCommandAuthorized: (body, cfg) => core.channel.commands.shouldComputeCommandAuthorized(body, cfg), resolveCommandAuthorizedFromAuthorizers: (params) => core.channel.commands.resolveCommandAuthorizedFromAuthorizers(params), }); - - if (!isGroup) { - if (dmPolicy === "disabled") { - logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`); - return; - } - - if (dmPolicy !== "open") { - const allowed = senderAllowedForCommands; - if (!allowed) { - if (dmPolicy === "pairing") { - await issuePairingChallenge({ - channel: "zalouser", - senderId, - senderIdLine: `Your Zalo user id: ${senderId}`, - meta: { name: senderName || undefined }, - upsertPairingRequest: pairing.upsertPairingRequest, - onCreated: () => { - logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`); - }, - sendPairingReply: async (text) => { - await sendMessageZalouser(chatId, text, { profile: account.profile }); - statusSink?.({ lastOutboundAt: Date.now() }); - }, - onReplyError: (err) => { - logVerbose( - core, - runtime, - `zalouser pairing reply failed for ${senderId}: ${String(err)}`, - ); - }, - }); - } else { - logVerbose( - core, - runtime, - `Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`, - ); - } - return; - } - } - } - - const hasControlCommand = core.channel.commands.isControlCommandMessage(rawBody, config); + const hasControlCommand = core.channel.commands.isControlCommandMessage(commandBody, config); if (isGroup && hasControlCommand && commandAuthorized !== true) { logVerbose( core, @@ -307,18 +447,19 @@ async function processMessage( const peer = isGroup ? { kind: "group" as const, id: chatId } - : { kind: "group" as const, id: senderId }; + : { kind: "direct" as const, id: senderId }; const route = core.channel.routing.resolveAgentRoute({ cfg: config, channel: "zalouser", accountId: account.accountId, peer: { - // Use "group" kind to avoid dmScope=main collapsing all DMs into the main session. + // Keep DM peer kind as "direct" so session keys follow dmScope and UI labels stay DM-shaped. kind: peer.kind, id: peer.id, }, }); + const historyKey = isGroup ? route.sessionKey : undefined; const requireMention = isGroup ? resolveGroupRequireMention({ @@ -340,10 +481,11 @@ async function processMessage( explicit: explicitMention, }) : true; + const canDetectMention = mentionRegexes.length > 0 || explicitMention.canResolveExplicit; const mentionGate = resolveMentionGatingWithBypass({ isGroup, requireMention, - canDetectMention: mentionRegexes.length > 0 || explicitMention.canResolveExplicit, + canDetectMention, wasMentioned, implicitMention: message.implicitMention === true, hasAnyMention: explicitMention.hasAnyMention, @@ -354,7 +496,32 @@ async function processMessage( hasControlCommand, commandAuthorized: commandAuthorized === true, }); + if (isGroup && requireMention && !canDetectMention && !mentionGate.effectiveWasMentioned) { + runtime.error?.( + `[${account.accountId}] zalouser mention required but detection unavailable ` + + `(missing mention regexes and bot self id); dropping group ${chatId}`, + ); + return; + } if (isGroup && mentionGate.shouldSkip) { + recordPendingHistoryEntryIfEnabled({ + historyMap: historyState.groupHistories, + historyKey: historyKey ?? "", + limit: historyState.historyLimit, + entry: + historyKey && rawBody + ? { + sender: senderName || senderId, + body: rawBody, + timestamp: message.timestampMs, + messageId: resolveZalouserMessageSid({ + msgId: message.msgId, + cliMsgId: message.cliMsgId, + fallback: `${message.timestampMs}`, + }), + } + : null, + }); logVerbose(core, runtime, `zalouser: skip group ${chatId} (mention required, not mentioned)`); return; } @@ -363,10 +530,18 @@ async function processMessage( const storePath = core.channel.session.resolveStorePath(config.session?.store, { agentId: route.agentId, }); + const inboundSessionKey = resolveZalouserInboundSessionKey({ + core, + config, + route, + storePath, + isGroup, + senderId, + }); const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(config); const previousTimestamp = core.channel.session.readSessionUpdatedAt({ storePath, - sessionKey: route.sessionKey, + sessionKey: inboundSessionKey, }); const body = core.channel.reply.formatAgentEnvelope({ channel: "Zalo Personal", @@ -376,15 +551,46 @@ async function processMessage( envelope: envelopeOptions, body: rawBody, }); + const combinedBody = + isGroup && historyKey + ? buildPendingHistoryContextFromMap({ + historyMap: historyState.groupHistories, + historyKey, + limit: historyState.historyLimit, + currentMessage: body, + formatEntry: (entry) => + core.channel.reply.formatAgentEnvelope({ + channel: "Zalo Personal", + from: fromLabel, + timestamp: entry.timestamp, + envelope: envelopeOptions, + body: `${entry.sender}: ${entry.body}${ + entry.messageId ? ` [id:${entry.messageId}]` : "" + }`, + }), + }) + : body; + const inboundHistory = + isGroup && historyKey && historyState.historyLimit > 0 + ? (historyState.groupHistories.get(historyKey) ?? []).map((entry) => ({ + sender: entry.sender, + body: entry.body, + timestamp: entry.timestamp, + })) + : undefined; + + const normalizedTo = isGroup ? `zalouser:group:${chatId}` : `zalouser:${chatId}`; const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: body, + Body: combinedBody, BodyForAgent: rawBody, + InboundHistory: inboundHistory, RawBody: rawBody, - CommandBody: rawBody, + CommandBody: commandBody, + BodyForCommands: commandBody, From: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`, - To: `zalouser:${chatId}`, - SessionKey: route.sessionKey, + To: normalizedTo, + SessionKey: inboundSessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", ConversationLabel: fromLabel, @@ -407,7 +613,7 @@ async function processMessage( cliMsgId: message.cliMsgId, }), OriginatingChannel: "zalouser", - OriginatingTo: `zalouser:${chatId}`, + OriginatingTo: normalizedTo, }); await core.channel.session.recordInboundSession({ @@ -433,6 +639,9 @@ async function processMessage( }); }, onStartError: (err) => { + runtime.error?.( + `[${account.accountId}] zalouser typing start failed for ${chatId}: ${String(err)}`, + ); logVerbose(core, runtime, `zalouser typing failed for ${chatId}: ${String(err)}`); }, }); @@ -469,6 +678,13 @@ async function processMessage( onModelSelected, }, }); + if (isGroup && historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: historyState.groupHistories, + historyKey, + limit: historyState.historyLimit, + }); + } } async function deliverZalouserReply(params: { @@ -534,43 +750,60 @@ export async function monitorZalouserProvider( const { abortSignal, statusSink, runtime } = options; const core = getZalouserRuntime(); + const inboundQueue = new KeyedAsyncQueue(); + const historyLimit = Math.max( + 0, + account.config.historyLimit ?? + config.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const groupHistories = new Map(); try { const profile = account.profile; const allowFromEntries = (account.config.allowFrom ?? []) .map((entry) => normalizeZalouserEntry(String(entry))) .filter((entry) => entry && entry !== "*"); + const groupAllowFromEntries = (account.config.groupAllowFrom ?? []) + .map((entry) => normalizeZalouserEntry(String(entry))) + .filter((entry) => entry && entry !== "*"); - if (allowFromEntries.length > 0) { + if (allowFromEntries.length > 0 || groupAllowFromEntries.length > 0) { const friends = await listZaloFriends(profile); const byName = buildNameIndex(friends, (friend) => friend.displayName); - const additions: string[] = []; - const mapping: string[] = []; - const unresolved: string[] = []; - for (const entry of allowFromEntries) { - if (/^\d+$/.test(entry)) { - additions.push(entry); - continue; - } - const matches = byName.get(entry.toLowerCase()) ?? []; - const match = matches[0]; - const id = match?.userId ? String(match.userId) : undefined; - if (id) { - additions.push(id); - mapping.push(`${entry}→${id}`); - } else { - unresolved.push(entry); - } + if (allowFromEntries.length > 0) { + const { additions, mapping, unresolved } = resolveUserAllowlistEntries( + allowFromEntries, + byName, + ); + const allowFrom = mergeAllowlist({ existing: account.config.allowFrom, additions }); + account = { + ...account, + config: { + ...account.config, + allowFrom, + }, + }; + summarizeMapping("zalouser users", mapping, unresolved, runtime); + } + if (groupAllowFromEntries.length > 0) { + const { additions, mapping, unresolved } = resolveUserAllowlistEntries( + groupAllowFromEntries, + byName, + ); + const groupAllowFrom = mergeAllowlist({ + existing: account.config.groupAllowFrom, + additions, + }); + account = { + ...account, + config: { + ...account.config, + groupAllowFrom, + }, + }; + summarizeMapping("zalouser group users", mapping, unresolved, runtime); } - const allowFrom = mergeAllowlist({ existing: account.config.allowFrom, additions }); - account = { - ...account, - config: { - ...account.config, - allowFrom, - }, - }; - summarizeMapping("zalouser users", mapping, unresolved, runtime); } const groupsConfig = account.config.groups ?? {}; @@ -627,40 +860,92 @@ export async function monitorZalouserProvider( listenerStop = null; }; - const listener = await startZaloListener({ - accountId: account.accountId, - profile: account.profile, - abortSignal, - onMessage: (msg) => { - if (stopped) { - return; - } - logVerbose(core, runtime, `[${account.accountId}] inbound message`); - statusSink?.({ lastInboundAt: Date.now() }); - processMessage(msg, account, config, core, runtime, statusSink).catch((err) => { - runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`); - }); - }, - onError: (err) => { - if (stopped || abortSignal.aborted) { - return; - } - runtime.error(`[${account.accountId}] Zalo listener error: ${String(err)}`); - }, - }); + let settled = false; + const { promise: waitForExit, resolve: resolveRun, reject: rejectRun } = createDeferred(); + + const settleSuccess = () => { + if (settled) { + return; + } + settled = true; + stop(); + resolveRun(); + }; + + const settleFailure = (error: unknown) => { + if (settled) { + return; + } + settled = true; + stop(); + rejectRun(error instanceof Error ? error : new Error(String(error))); + }; + + const onAbort = () => { + settleSuccess(); + }; + abortSignal.addEventListener("abort", onAbort, { once: true }); + + let listener: Awaited>; + try { + listener = await startZaloListener({ + accountId: account.accountId, + profile: account.profile, + abortSignal, + onMessage: (msg) => { + if (stopped) { + return; + } + logVerbose(core, runtime, `[${account.accountId}] inbound message`); + statusSink?.({ lastInboundAt: Date.now() }); + const queueKey = resolveInboundQueueKey(msg); + void inboundQueue + .enqueue(queueKey, async () => { + if (stopped || abortSignal.aborted) { + return; + } + await processMessage( + msg, + account, + config, + core, + runtime, + { historyLimit, groupHistories }, + statusSink, + ); + }) + .catch((err) => { + runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`); + }); + }, + onError: (err) => { + if (stopped || abortSignal.aborted) { + return; + } + runtime.error(`[${account.accountId}] Zalo listener error: ${String(err)}`); + settleFailure(err); + }, + }); + } catch (error) { + abortSignal.removeEventListener("abort", onAbort); + throw error; + } listenerStop = listener.stop; + if (stopped) { + listenerStop(); + listenerStop = null; + } - await new Promise((resolve) => { - abortSignal.addEventListener( - "abort", - () => { - stop(); - resolve(); - }, - { once: true }, - ); - }); + if (abortSignal.aborted) { + settleSuccess(); + } + + try { + await waitForExit; + } finally { + abortSignal.removeEventListener("abort", onAbort); + } return { stop }; } @@ -671,14 +956,27 @@ export const __testing = { account: ResolvedZalouserAccount; config: OpenClawConfig; runtime: RuntimeEnv; + historyState?: { + historyLimit?: number; + groupHistories?: Map; + }; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; }) => { + const historyLimit = Math.max( + 0, + params.historyState?.historyLimit ?? + params.account.config.historyLimit ?? + params.config.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const groupHistories = params.historyState?.groupHistories ?? new Map(); await processMessage( params.message, params.account, params.config, getZalouserRuntime(), params.runtime, + { historyLimit, groupHistories }, params.statusSink, ); }, diff --git a/extensions/zalouser/src/types.ts b/extensions/zalouser/src/types.ts index aae9e43f6fa..d704a1b3f78 100644 --- a/extensions/zalouser/src/types.ts +++ b/extensions/zalouser/src/types.ts @@ -35,6 +35,7 @@ export type ZaloInboundMessage = { senderName?: string; groupName?: string; content: string; + commandContent?: string; timestampMs: number; msgId?: string; cliMsgId?: string; @@ -92,6 +93,8 @@ type ZalouserSharedConfig = { profile?: string; dmPolicy?: "pairing" | "allowlist" | "open" | "disabled"; allowFrom?: Array; + historyLimit?: number; + groupAllowFrom?: Array; groupPolicy?: "open" | "allowlist" | "disabled"; groups?: Record; messagePrefix?: string; diff --git a/extensions/zalouser/src/zalo-js.ts b/extensions/zalouser/src/zalo-js.ts index 206efaed2a5..25d263b7d6a 100644 --- a/extensions/zalouser/src/zalo-js.ts +++ b/extensions/zalouser/src/zalo-js.ts @@ -37,6 +37,8 @@ const DEFAULT_QR_WAIT_TIMEOUT_MS = 120_000; const GROUP_INFO_CHUNK_SIZE = 80; const GROUP_CONTEXT_CACHE_TTL_MS = 5 * 60_000; const GROUP_CONTEXT_CACHE_MAX_ENTRIES = 500; +const LISTENER_WATCHDOG_INTERVAL_MS = 30_000; +const LISTENER_WATCHDOG_MAX_GAP_MS = 35_000; const apiByProfile = new Map(); const apiInitByProfile = new Map>(); @@ -63,6 +65,8 @@ type ActiveZaloListener = { const activeListeners = new Map(); const groupContextCache = new Map(); +type AccountInfoResponse = Awaited>; + type ApiTypingCapability = { sendTypingEvent: ( threadId: string, @@ -155,6 +159,20 @@ function toStringValue(value: unknown): string { return ""; } +function normalizeAccountInfoUser(info: AccountInfoResponse): User | null { + if (!info || typeof info !== "object") { + return null; + } + if ("profile" in info) { + const profile = (info as { profile?: unknown }).profile; + if (profile && typeof profile === "object") { + return profile as User; + } + return null; + } + return info as User; +} + function toInteger(value: unknown, fallback = 0): number { if (typeof value === "number" && Number.isFinite(value)) { return Math.trunc(value); @@ -199,18 +217,128 @@ function resolveInboundTimestamp(rawTs: unknown): number { return parsed > 1_000_000_000_000 ? parsed : parsed * 1000; } -function extractMentionIds(raw: unknown): string[] { - if (!Array.isArray(raw)) { +function extractMentionIds(rawMentions: unknown): string[] { + if (!Array.isArray(rawMentions)) { return []; } - return raw - .map((entry) => { - if (!entry || typeof entry !== "object") { - return ""; - } - return toNumberId((entry as { uid?: unknown }).uid); - }) - .filter(Boolean); + const sink = new Set(); + for (const entry of rawMentions) { + if (!entry || typeof entry !== "object") { + continue; + } + const record = entry as { uid?: unknown }; + const id = toNumberId(record.uid); + if (id) { + sink.add(id); + } + } + return Array.from(sink); +} + +type MentionSpan = { + start: number; + end: number; +}; + +function toNonNegativeInteger(value: unknown): number | null { + if (typeof value === "number" && Number.isFinite(value)) { + const normalized = Math.trunc(value); + return normalized >= 0 ? normalized : null; + } + if (typeof value === "string" && value.trim().length > 0) { + const parsed = Number.parseInt(value.trim(), 10); + if (Number.isFinite(parsed)) { + return parsed >= 0 ? parsed : null; + } + } + return null; +} + +function extractOwnMentionSpans( + rawMentions: unknown, + ownUserId: string, + contentLength: number, +): MentionSpan[] { + if (!Array.isArray(rawMentions) || !ownUserId || contentLength <= 0) { + return []; + } + const spans: MentionSpan[] = []; + for (const entry of rawMentions) { + if (!entry || typeof entry !== "object") { + continue; + } + const record = entry as { + uid?: unknown; + pos?: unknown; + start?: unknown; + offset?: unknown; + len?: unknown; + length?: unknown; + }; + const uid = toNumberId(record.uid); + if (!uid || uid !== ownUserId) { + continue; + } + const startRaw = toNonNegativeInteger(record.pos ?? record.start ?? record.offset); + const lengthRaw = toNonNegativeInteger(record.len ?? record.length); + if (startRaw === null || lengthRaw === null || lengthRaw <= 0) { + continue; + } + const start = Math.min(startRaw, contentLength); + const end = Math.min(start + lengthRaw, contentLength); + if (end <= start) { + continue; + } + spans.push({ start, end }); + } + if (spans.length <= 1) { + return spans; + } + spans.sort((a, b) => a.start - b.start); + const merged: MentionSpan[] = []; + for (const span of spans) { + const last = merged[merged.length - 1]; + if (!last || span.start > last.end) { + merged.push({ ...span }); + continue; + } + last.end = Math.max(last.end, span.end); + } + return merged; +} + +function stripOwnMentionsForCommandBody( + content: string, + rawMentions: unknown, + ownUserId: string, +): string { + if (!content || !ownUserId) { + return content; + } + const spans = extractOwnMentionSpans(rawMentions, ownUserId, content.length); + if (spans.length === 0) { + return stripLeadingAtMentionForCommand(content); + } + let cursor = 0; + let output = ""; + for (const span of spans) { + if (span.start > cursor) { + output += content.slice(cursor, span.start); + } + cursor = Math.max(cursor, span.end); + } + if (cursor < content.length) { + output += content.slice(cursor); + } + return output.replace(/\s+/g, " ").trim(); +} + +function stripLeadingAtMentionForCommand(content: string): string { + const fallbackMatch = content.match(/^\s*@[^\s]+(?:\s+|[:,-]\s*)([/!][\s\S]*)$/); + if (!fallbackMatch) { + return content; + } + return fallbackMatch[1].trim(); } function resolveGroupNameFromMessageData(data: Record): string | undefined { @@ -250,9 +378,14 @@ function extractSendMessageId(result: unknown): string | undefined { return undefined; } const payload = result as { + msgId?: string | number; message?: { msgId?: string | number } | null; attachment?: Array<{ msgId?: string | number }>; }; + const direct = payload.msgId; + if (direct !== undefined && direct !== null) { + return String(direct); + } const primary = payload.message?.msgId; if (primary !== undefined && primary !== null) { return String(primary); @@ -311,6 +444,35 @@ function resolveMediaFileName(params: { return `upload.${ext}`; } +function resolveUploadedVoiceAsset( + uploaded: Array<{ + fileType?: string; + fileUrl?: string; + fileName?: string; + }>, +): { fileUrl: string; fileName?: string } | undefined { + for (const item of uploaded) { + if (!item || typeof item !== "object") { + continue; + } + const fileType = item.fileType?.toLowerCase(); + const fileUrl = item.fileUrl?.trim(); + if (!fileUrl) { + continue; + } + if (fileType === "others" || fileType === "video") { + return { fileUrl, fileName: item.fileName?.trim() || undefined }; + } + } + return undefined; +} + +function buildZaloVoicePlaybackUrl(asset: { fileUrl: string; fileName?: string }): string { + // zca-js uses uploadAttachment(...).fileUrl directly for sendVoice. + // Appending filename can produce URLs that play only in the local session. + return asset.fileUrl.trim(); +} + function mapFriend(friend: User): ZcaFriend { return { userId: String(friend.userId), @@ -602,6 +764,11 @@ function toInboundMessage(message: Message, ownUserId?: string): ZaloInboundMess const wasExplicitlyMentioned = Boolean( normalizedOwnUserId && mentionIds.some((id) => id === normalizedOwnUserId), ); + const commandContent = wasExplicitlyMentioned + ? stripOwnMentionsForCommandBody(content, data.mentions, normalizedOwnUserId) + : hasAnyMention && !canResolveExplicitMention + ? stripLeadingAtMentionForCommand(content) + : content; const implicitMention = Boolean( normalizedOwnUserId && quoteOwnerId && quoteOwnerId === normalizedOwnUserId, ); @@ -613,6 +780,7 @@ function toInboundMessage(message: Message, ownUserId?: string): ZaloInboundMess senderName: typeof data.dName === "string" ? data.dName.trim() || undefined : undefined, groupName: isGroup ? resolveGroupNameFromMessageData(data) : undefined, content, + commandContent, timestampMs: resolveInboundTimestamp(data.ts), msgId: typeof data.msgId === "string" ? data.msgId : undefined, cliMsgId: typeof data.cliMsgId === "string" ? data.cliMsgId : undefined, @@ -649,8 +817,7 @@ export async function getZaloUserInfo(profileInput?: string | null): Promise { - const info = await api.fetchAccountInfo(); - const profile = "profile" in info ? info.profile : info; - return toNumberId(profile.userId); + try { + const info = await api.fetchAccountInfo(); + const resolved = toNumberId(normalizeAccountInfoUser(info)?.userId); + if (resolved) { + return resolved; + } + } catch { + // Fall back to getOwnId when account info shape changes. + } + + try { + const ownId = toNumberId(api.getOwnId()); + if (ownId) { + return ownId; + } + } catch { + // Ignore fallback probe failures and keep mention detection conservative. + } + + return ""; } export async function sendZaloReaction(params: { @@ -1244,12 +1464,18 @@ export async function startZaloListener(params: { const api = await ensureApi(profile); const ownUserId = await resolveOwnUserId(api); let stopped = false; + let watchdogTimer: ReturnType | null = null; + let lastWatchdogTickAt = Date.now(); const cleanup = () => { if (stopped) { return; } stopped = true; + if (watchdogTimer) { + clearInterval(watchdogTimer); + watchdogTimer = null; + } try { api.listener.off("message", onMessage); api.listener.off("error", onError); @@ -1276,19 +1502,22 @@ export async function startZaloListener(params: { params.onMessage(normalized); }; - const onError = (error: unknown) => { + const failListener = (error: Error) => { if (stopped || params.abortSignal.aborted) { return; } + cleanup(); + invalidateApi(profile); + params.onError(error); + }; + + const onError = (error: unknown) => { const wrapped = error instanceof Error ? error : new Error(String(error)); - params.onError(wrapped); + failListener(wrapped); }; const onClosed = (code: number, reason: string) => { - if (stopped || params.abortSignal.aborted) { - return; - } - params.onError(new Error(`Zalo listener closed (${code}): ${reason || "no reason"}`)); + failListener(new Error(`Zalo listener closed (${code}): ${reason || "no reason"}`)); }; api.listener.on("message", onMessage); @@ -1296,12 +1525,30 @@ export async function startZaloListener(params: { api.listener.on("closed", onClosed); try { - api.listener.start({ retryOnClose: true }); + api.listener.start({ retryOnClose: false }); } catch (error) { cleanup(); throw error; } + watchdogTimer = setInterval(() => { + if (stopped || params.abortSignal.aborted) { + return; + } + const now = Date.now(); + const gapMs = now - lastWatchdogTickAt; + lastWatchdogTickAt = now; + if (gapMs <= LISTENER_WATCHDOG_MAX_GAP_MS) { + return; + } + failListener( + new Error( + `Zalo listener watchdog gap detected (${Math.round(gapMs / 1000)}s): forcing reconnect`, + ), + ); + }, LISTENER_WATCHDOG_INTERVAL_MS); + watchdogTimer.unref?.(); + params.abortSignal.addEventListener( "abort", () => { diff --git a/extensions/zalouser/src/zca-client.ts b/extensions/zalouser/src/zca-client.ts index 605b07522d6..57172eef64d 100644 --- a/extensions/zalouser/src/zca-client.ts +++ b/extensions/zalouser/src/zca-client.ts @@ -152,7 +152,7 @@ export type API = { cookies: unknown[]; }; }; - fetchAccountInfo(): Promise<{ profile: User } | User>; + fetchAccountInfo(): Promise; getAllFriends(): Promise; getOwnId(): string; getAllGroups(): Promise<{ @@ -177,9 +177,53 @@ export type API = { threadId: string, type?: number, ): Promise<{ + msgId?: string | number; message?: { msgId?: string | number } | null; attachment?: Array<{ msgId?: string | number }>; }>; + uploadAttachment( + sources: + | string + | { + data: Buffer; + filename: `${string}.${string}`; + metadata: { + totalSize: number; + width?: number; + height?: number; + }; + } + | Array< + | string + | { + data: Buffer; + filename: `${string}.${string}`; + metadata: { + totalSize: number; + width?: number; + height?: number; + }; + } + >, + threadId: string, + type?: number, + ): Promise< + Array<{ + fileType: "image" | "video" | "others"; + fileUrl?: string; + msgId?: string | number; + fileId?: string; + fileName?: string; + }> + >; + sendVoice( + options: { + voiceUrl: string; + ttl?: number; + }, + threadId: string, + type?: number, + ): Promise<{ msgId?: string | number }>; sendLink( payload: { link: string; msg?: string }, threadId: string, diff --git a/src/plugins/runtime/runtime-channel.ts b/src/plugins/runtime/runtime-channel.ts index 46a7813a9df..13c87d70805 100644 --- a/src/plugins/runtime/runtime-channel.ts +++ b/src/plugins/runtime/runtime-channel.ts @@ -92,7 +92,7 @@ import { readChannelAllowFromStore, upsertChannelPairingRequest, } from "../../pairing/pairing-store.js"; -import { resolveAgentRoute } from "../../routing/resolve-route.js"; +import { buildAgentSessionKey, resolveAgentRoute } from "../../routing/resolve-route.js"; import { monitorSignalProvider } from "../../signal/index.js"; import { probeSignal } from "../../signal/probe.js"; import { sendMessageSignal } from "../../signal/send.js"; @@ -144,6 +144,7 @@ export function createRuntimeChannel(): PluginRuntime["channel"] { resolveEnvelopeFormatOptions, }, routing: { + buildAgentSessionKey, resolveAgentRoute, }, pairing: { diff --git a/src/plugins/runtime/types-channel.ts b/src/plugins/runtime/types-channel.ts index 7aae373e23f..0d1da0e24fd 100644 --- a/src/plugins/runtime/types-channel.ts +++ b/src/plugins/runtime/types-channel.ts @@ -40,6 +40,7 @@ export type PluginRuntimeChannel = { resolveEnvelopeFormatOptions: typeof import("../../auto-reply/envelope.js").resolveEnvelopeFormatOptions; }; routing: { + buildAgentSessionKey: typeof import("../../routing/resolve-route.js").buildAgentSessionKey; resolveAgentRoute: typeof import("../../routing/resolve-route.js").resolveAgentRoute; }; pairing: {