fix: land #33992 from @darkamenosa

Co-authored-by: Tom <hxtxmu@gmail.com>
This commit is contained in:
Peter Steinberger
2026-03-08 04:48:05 +00:00
parent d9670093cb
commit fcdc1a13e1
14 changed files with 1398 additions and 166 deletions

View File

@@ -123,6 +123,17 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
})) as unknown as PluginRuntime["channel"]["reply"]["resolveEnvelopeFormatOptions"],
},
routing: {
buildAgentSessionKey: vi.fn(
({
agentId,
channel,
peer,
}: {
agentId: string;
channel: string;
peer?: { kind?: string; id?: string };
}) => `agent:${agentId}:${channel}:${peer?.kind ?? "direct"}:${peer?.id ?? "peer"}`,
) as unknown as PluginRuntime["channel"]["routing"]["buildAgentSessionKey"],
resolveAgentRoute: vi.fn(() => ({
agentId: "main",
accountId: "default",

View File

@@ -0,0 +1,72 @@
import type { RuntimeEnv } from "openclaw/plugin-sdk/zalouser";
import { describe, expect, it, vi } from "vitest";
const listZaloGroupMembersMock = vi.hoisted(() => vi.fn(async () => []));
vi.mock("./zalo-js.js", async (importOriginal) => {
const actual = (await importOriginal()) as Record<string, unknown>;
return {
...actual,
listZaloGroupMembers: listZaloGroupMembersMock,
};
});
vi.mock("./accounts.js", async (importOriginal) => {
const actual = (await importOriginal()) as Record<string, unknown>;
return {
...actual,
resolveZalouserAccountSync: () => ({
accountId: "default",
profile: "default",
name: "test",
enabled: true,
authenticated: true,
config: {},
}),
};
});
import { zalouserPlugin } from "./channel.js";
const runtimeStub: RuntimeEnv = {
log: vi.fn(),
error: vi.fn(),
exit: ((code: number): never => {
throw new Error(`exit ${code}`);
}) as RuntimeEnv["exit"],
};
describe("zalouser directory group members", () => {
it("accepts prefixed group ids from directory groups list output", async () => {
await zalouserPlugin.directory!.listGroupMembers!({
cfg: {},
accountId: "default",
groupId: "group:1471383327500481391",
runtime: runtimeStub,
});
expect(listZaloGroupMembersMock).toHaveBeenCalledWith("default", "1471383327500481391");
});
it("keeps backward compatibility for raw group ids", async () => {
await zalouserPlugin.directory!.listGroupMembers!({
cfg: {},
accountId: "default",
groupId: "1471383327500481391",
runtime: runtimeStub,
});
expect(listZaloGroupMembersMock).toHaveBeenCalledWith("default", "1471383327500481391");
});
it("accepts provider-native g- group ids without stripping the prefix", async () => {
await zalouserPlugin.directory!.listGroupMembers!({
cfg: {},
accountId: "default",
groupId: "g-1471383327500481391",
runtime: runtimeStub,
});
expect(listZaloGroupMembersMock).toHaveBeenCalledWith("default", "g-1471383327500481391");
});
});

View File

@@ -24,7 +24,7 @@ vi.mock("./accounts.js", async (importOriginal) => {
function baseCtx(payload: ReplyPayload) {
return {
cfg: {},
to: "987654321",
to: "user:987654321",
text: "",
payload,
};
@@ -49,6 +49,22 @@ describe("zalouserPlugin outbound sendPayload", () => {
expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-t1" });
});
it("group target delegates with isGroup=true and stripped threadId", async () => {
mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-g1" });
const result = await zalouserPlugin.outbound!.sendPayload!({
...baseCtx({ text: "hello group" }),
to: "group:1471383327500481391",
});
expect(mockedSend).toHaveBeenCalledWith(
"1471383327500481391",
"hello group",
expect.objectContaining({ isGroup: true }),
);
expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-g1" });
});
it("single media delegates to sendMedia", async () => {
mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-m1" });
@@ -64,6 +80,38 @@ describe("zalouserPlugin outbound sendPayload", () => {
expect(result).toMatchObject({ channel: "zalouser" });
});
it("treats bare numeric targets as direct chats for backward compatibility", async () => {
mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-d1" });
const result = await zalouserPlugin.outbound!.sendPayload!({
...baseCtx({ text: "hello" }),
to: "987654321",
});
expect(mockedSend).toHaveBeenCalledWith(
"987654321",
"hello",
expect.objectContaining({ isGroup: false }),
);
expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-d1" });
});
it("preserves provider-native group ids when sending to raw g- targets", async () => {
mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-g-native" });
const result = await zalouserPlugin.outbound!.sendPayload!({
...baseCtx({ text: "hello native group" }),
to: "g-1471383327500481391",
});
expect(mockedSend).toHaveBeenCalledWith(
"g-1471383327500481391",
"hello native group",
expect.objectContaining({ isGroup: true }),
);
expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-g-native" });
});
it("multi-media iterates URLs with caption on first", async () => {
mockedSend
.mockResolvedValueOnce({ ok: true, messageId: "zlu-1" })
@@ -115,3 +163,31 @@ describe("zalouserPlugin outbound sendPayload", () => {
expect(result).toMatchObject({ channel: "zalouser" });
});
});
describe("zalouserPlugin messaging target normalization", () => {
it("normalizes user/group aliases to canonical targets", () => {
const normalize = zalouserPlugin.messaging?.normalizeTarget;
expect(normalize).toBeTypeOf("function");
if (!normalize) {
return;
}
expect(normalize("zlu:g:30003")).toBe("group:30003");
expect(normalize("zalouser:u:20002")).toBe("user:20002");
expect(normalize("zlu:g-30003")).toBe("group:g-30003");
expect(normalize("zalouser:u-20002")).toBe("user:u-20002");
expect(normalize("20002")).toBe("20002");
});
it("treats canonical and provider-native user/group targets as ids", () => {
const looksLikeId = zalouserPlugin.messaging?.targetResolver?.looksLikeId;
expect(looksLikeId).toBeTypeOf("function");
if (!looksLikeId) {
return;
}
expect(looksLikeId("user:20002")).toBe(true);
expect(looksLikeId("group:30003")).toBe(true);
expect(looksLikeId("g-30003")).toBe(true);
expect(looksLikeId("u-20002")).toBe(true);
expect(looksLikeId("Alice Nguyen")).toBe(false);
});
});

View File

@@ -66,6 +66,97 @@ const meta = {
quickstartAllowFrom: true,
};
function stripZalouserTargetPrefix(raw: string): string {
return raw
.trim()
.replace(/^(zalouser|zlu):/i, "")
.trim();
}
function normalizePrefixedTarget(raw: string): string | undefined {
const trimmed = stripZalouserTargetPrefix(raw);
if (!trimmed) {
return undefined;
}
const lower = trimmed.toLowerCase();
if (lower.startsWith("group:")) {
const id = trimmed.slice("group:".length).trim();
return id ? `group:${id}` : undefined;
}
if (lower.startsWith("g:")) {
const id = trimmed.slice("g:".length).trim();
return id ? `group:${id}` : undefined;
}
if (lower.startsWith("user:")) {
const id = trimmed.slice("user:".length).trim();
return id ? `user:${id}` : undefined;
}
if (lower.startsWith("dm:")) {
const id = trimmed.slice("dm:".length).trim();
return id ? `user:${id}` : undefined;
}
if (lower.startsWith("u:")) {
const id = trimmed.slice("u:".length).trim();
return id ? `user:${id}` : undefined;
}
if (/^g-\S+$/i.test(trimmed)) {
return `group:${trimmed}`;
}
if (/^u-\S+$/i.test(trimmed)) {
return `user:${trimmed}`;
}
return trimmed;
}
function parseZalouserOutboundTarget(raw: string): {
threadId: string;
isGroup: boolean;
} {
const normalized = normalizePrefixedTarget(raw);
if (!normalized) {
throw new Error("Zalouser target is required");
}
const lowered = normalized.toLowerCase();
if (lowered.startsWith("group:")) {
const threadId = normalized.slice("group:".length).trim();
if (!threadId) {
throw new Error("Zalouser group target is missing group id");
}
return { threadId, isGroup: true };
}
if (lowered.startsWith("user:")) {
const threadId = normalized.slice("user:".length).trim();
if (!threadId) {
throw new Error("Zalouser user target is missing user id");
}
return { threadId, isGroup: false };
}
// Backward-compatible fallback for bare IDs.
// Group sends should use explicit `group:<id>` targets.
return { threadId: normalized, isGroup: false };
}
function parseZalouserDirectoryGroupId(raw: string): string {
const normalized = normalizePrefixedTarget(raw);
if (!normalized) {
throw new Error("Zalouser group target is required");
}
const lowered = normalized.toLowerCase();
if (lowered.startsWith("group:")) {
const groupId = normalized.slice("group:".length).trim();
if (!groupId) {
throw new Error("Zalouser group target is missing group id");
}
return groupId;
}
if (lowered.startsWith("user:")) {
throw new Error("Zalouser group members lookup requires a group target (group:<id>)");
}
return normalized;
}
function resolveZalouserQrProfile(accountId?: string | null): string {
const normalized = normalizeAccountId(accountId);
if (!normalized || normalized === DEFAULT_ACCOUNT_ID) {
@@ -261,6 +352,8 @@ export const zalouserPlugin: ChannelPlugin<ResolvedZalouserAccount> = {
"name",
"dmPolicy",
"allowFrom",
"historyLimit",
"groupAllowFrom",
"groupPolicy",
"groups",
"messagePrefix",
@@ -333,16 +426,19 @@ export const zalouserPlugin: ChannelPlugin<ResolvedZalouserAccount> = {
},
},
messaging: {
normalizeTarget: (raw) => {
const trimmed = raw?.trim();
if (!trimmed) {
return undefined;
}
return trimmed.replace(/^(zalouser|zlu):/i, "");
},
normalizeTarget: (raw) => normalizePrefixedTarget(raw),
targetResolver: {
looksLikeId: isNumericTargetId,
hint: "<threadId>",
looksLikeId: (raw) => {
const normalized = normalizePrefixedTarget(raw);
if (!normalized) {
return false;
}
if (/^group:[^\s]+$/i.test(normalized) || /^user:[^\s]+$/i.test(normalized)) {
return true;
}
return isNumericTargetId(normalized);
},
hint: "<user:id|group:id>",
},
},
directory: {
@@ -377,7 +473,7 @@ export const zalouserPlugin: ChannelPlugin<ResolvedZalouserAccount> = {
const groups = await listZaloGroupsMatching(account.profile, query);
const rows = groups.map((group) =>
mapGroup({
id: String(group.groupId),
id: `group:${String(group.groupId)}`,
name: group.name ?? null,
raw: group,
}),
@@ -386,7 +482,8 @@ export const zalouserPlugin: ChannelPlugin<ResolvedZalouserAccount> = {
},
listGroupMembers: async ({ cfg, accountId, groupId, limit }) => {
const account = resolveZalouserAccountSync({ cfg: cfg, accountId });
const members = await listZaloGroupMembers(account.profile, groupId);
const normalizedGroupId = parseZalouserDirectoryGroupId(groupId);
const members = await listZaloGroupMembers(account.profile, normalizedGroupId);
const rows = members.map((member) =>
mapUser({
id: member.userId,
@@ -511,13 +608,19 @@ export const zalouserPlugin: ChannelPlugin<ResolvedZalouserAccount> = {
}),
sendText: async ({ to, text, accountId, cfg }) => {
const account = resolveZalouserAccountSync({ cfg: cfg, accountId });
const result = await sendMessageZalouser(to, text, { profile: account.profile });
const target = parseZalouserOutboundTarget(to);
const result = await sendMessageZalouser(target.threadId, text, {
profile: account.profile,
isGroup: target.isGroup,
});
return buildChannelSendResult("zalouser", result);
},
sendMedia: async ({ to, text, mediaUrl, accountId, cfg, mediaLocalRoots }) => {
const account = resolveZalouserAccountSync({ cfg: cfg, accountId });
const result = await sendMessageZalouser(to, text, {
const target = parseZalouserOutboundTarget(to);
const result = await sendMessageZalouser(target.threadId, text, {
profile: account.profile,
isGroup: target.isGroup,
mediaUrl,
mediaLocalRoots,
});

View File

@@ -17,6 +17,8 @@ const zalouserAccountSchema = z.object({
profile: z.string().optional(),
dmPolicy: z.enum(["pairing", "allowlist", "open", "disabled"]).optional(),
allowFrom: z.array(allowFromEntry).optional(),
historyLimit: z.number().int().min(0).optional(),
groupAllowFrom: z.array(allowFromEntry).optional(),
groupPolicy: z.enum(["disabled", "allowlist", "open"]).optional(),
groups: z.object({}).catchall(groupConfigSchema).optional(),
messagePrefix: z.string().optional(),

View File

@@ -49,11 +49,65 @@ function createRuntimeEnv(): RuntimeEnv {
};
}
function installRuntime(params: { commandAuthorized: boolean }) {
function installRuntime(params: {
commandAuthorized?: boolean;
resolveCommandAuthorizedFromAuthorizers?: (params: {
useAccessGroups: boolean;
authorizers: Array<{ configured: boolean; allowed: boolean }>;
}) => boolean;
}) {
const dispatchReplyWithBufferedBlockDispatcher = vi.fn(async ({ dispatcherOptions, ctx }) => {
await dispatcherOptions.typingCallbacks?.onReplyStart?.();
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 }, ctx };
});
const resolveCommandAuthorizedFromAuthorizers = vi.fn(
(input: {
useAccessGroups: boolean;
authorizers: Array<{ configured: boolean; allowed: boolean }>;
}) => {
if (params.resolveCommandAuthorizedFromAuthorizers) {
return params.resolveCommandAuthorizedFromAuthorizers(input);
}
return params.commandAuthorized ?? false;
},
);
const resolveAgentRoute = vi.fn((input: { peer?: { kind?: string; id?: string } }) => {
const peerKind = input.peer?.kind === "direct" ? "direct" : "group";
const peerId = input.peer?.id ?? "1";
return {
agentId: "main",
sessionKey:
peerKind === "direct" ? "agent:main:main" : `agent:main:zalouser:${peerKind}:${peerId}`,
accountId: "default",
mainSessionKey: "agent:main:main",
};
});
const readAllowFromStore = vi.fn(async () => []);
const readSessionUpdatedAt = vi.fn(() => undefined);
const buildAgentSessionKey = vi.fn(
(input: {
agentId: string;
channel: string;
accountId?: string;
peer?: { kind?: string; id?: string };
dmScope?: string;
}) => {
const peerKind = input.peer?.kind === "direct" ? "direct" : "group";
const peerId = input.peer?.id ?? "1";
if (peerKind === "direct") {
if (input.dmScope === "per-account-channel-peer") {
return `agent:${input.agentId}:${input.channel}:${input.accountId ?? "default"}:direct:${peerId}`;
}
if (input.dmScope === "per-peer") {
return `agent:${input.agentId}:direct:${peerId}`;
}
if (input.dmScope === "main" || !input.dmScope) {
return "agent:main:main";
}
}
return `agent:${input.agentId}:${input.channel}:${peerKind}:${peerId}`;
},
);
setZalouserRuntime({
logging: {
@@ -61,13 +115,13 @@ function installRuntime(params: { commandAuthorized: boolean }) {
},
channel: {
pairing: {
readAllowFromStore: vi.fn(async () => []),
readAllowFromStore,
upsertPairingRequest: vi.fn(async () => ({ code: "PAIR", created: true })),
buildPairingReply: vi.fn(() => "pair"),
},
commands: {
shouldComputeCommandAuthorized: vi.fn((body: string) => body.trim().startsWith("/")),
resolveCommandAuthorizedFromAuthorizers: vi.fn(() => params.commandAuthorized),
resolveCommandAuthorizedFromAuthorizers,
isControlCommandMessage: vi.fn((body: string) => body.trim().startsWith("/")),
shouldHandleTextCommands: vi.fn(() => true),
},
@@ -93,16 +147,12 @@ function installRuntime(params: { commandAuthorized: boolean }) {
}),
},
routing: {
resolveAgentRoute: vi.fn(() => ({
agentId: "main",
sessionKey: "agent:main:zalouser:group:1",
accountId: "default",
mainSessionKey: "agent:main:main",
})),
buildAgentSessionKey,
resolveAgentRoute,
},
session: {
resolveStorePath: vi.fn(() => "/tmp"),
readSessionUpdatedAt: vi.fn(() => undefined),
readSessionUpdatedAt,
recordInboundSession: vi.fn(async () => {}),
},
reply: {
@@ -120,7 +170,14 @@ function installRuntime(params: { commandAuthorized: boolean }) {
},
} as unknown as PluginRuntime);
return { dispatchReplyWithBufferedBlockDispatcher };
return {
dispatchReplyWithBufferedBlockDispatcher,
resolveAgentRoute,
resolveCommandAuthorizedFromAuthorizers,
readAllowFromStore,
readSessionUpdatedAt,
buildAgentSessionKey,
};
}
function createGroupMessage(overrides: Partial<ZaloInboundMessage> = {}): ZaloInboundMessage {
@@ -142,6 +199,21 @@ function createGroupMessage(overrides: Partial<ZaloInboundMessage> = {}): ZaloIn
};
}
function createDmMessage(overrides: Partial<ZaloInboundMessage> = {}): ZaloInboundMessage {
return {
threadId: "u-1",
isGroup: false,
senderId: "321",
senderName: "Bob",
groupName: undefined,
content: "hello",
timestampMs: Date.now(),
msgId: "dm-1",
raw: { source: "test" },
...overrides,
};
}
describe("zalouser monitor group mention gating", () => {
beforeEach(() => {
sendMessageZalouserMock.mockClear();
@@ -165,6 +237,25 @@ describe("zalouser monitor group mention gating", () => {
expect(sendTypingZalouserMock).not.toHaveBeenCalled();
});
it("fails closed when requireMention=true but mention detection is unavailable", async () => {
const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({
commandAuthorized: false,
});
await __testing.processMessage({
message: createGroupMessage({
canResolveExplicitMention: false,
hasAnyMention: false,
wasExplicitlyMentioned: false,
}),
account: createAccount(),
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled();
expect(sendTypingZalouserMock).not.toHaveBeenCalled();
});
it("dispatches explicitly-mentioned group messages and marks WasMentioned", async () => {
const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({
commandAuthorized: false,
@@ -183,6 +274,8 @@ describe("zalouser monitor group mention gating", () => {
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0];
expect(callArg?.ctx?.WasMentioned).toBe(true);
expect(callArg?.ctx?.To).toBe("zalouser:group:g-1");
expect(callArg?.ctx?.OriginatingTo).toBe("zalouser:group:g-1");
expect(sendTypingZalouserMock).toHaveBeenCalledWith("g-1", {
profile: "default",
isGroup: true,
@@ -208,4 +301,277 @@ describe("zalouser monitor group mention gating", () => {
const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0];
expect(callArg?.ctx?.WasMentioned).toBe(true);
});
it("uses commandContent for mention-prefixed control commands", async () => {
const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({
commandAuthorized: true,
});
await __testing.processMessage({
message: createGroupMessage({
content: "@Bot /new",
commandContent: "/new",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account: createAccount(),
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0];
expect(callArg?.ctx?.CommandBody).toBe("/new");
expect(callArg?.ctx?.BodyForCommands).toBe("/new");
});
it("allows group control commands when only allowFrom is configured", async () => {
const { dispatchReplyWithBufferedBlockDispatcher, resolveCommandAuthorizedFromAuthorizers } =
installRuntime({
resolveCommandAuthorizedFromAuthorizers: ({ useAccessGroups, authorizers }) =>
useAccessGroups && authorizers.some((entry) => entry.configured && entry.allowed),
});
await __testing.processMessage({
message: createGroupMessage({
content: "/new",
commandContent: "/new",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account: {
...createAccount(),
config: {
...createAccount().config,
allowFrom: ["123"],
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const authCall = resolveCommandAuthorizedFromAuthorizers.mock.calls[0]?.[0];
expect(authCall?.authorizers).toEqual([
{ configured: true, allowed: true },
{ configured: true, allowed: true },
]);
});
it("blocks group messages when sender is not in groupAllowFrom/allowFrom", async () => {
const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({
commandAuthorized: false,
});
await __testing.processMessage({
message: createGroupMessage({
content: "ping @bot",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account: {
...createAccount(),
config: {
...createAccount().config,
groupPolicy: "allowlist",
allowFrom: ["999"],
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled();
});
it("allows group control commands when sender is in groupAllowFrom", async () => {
const { dispatchReplyWithBufferedBlockDispatcher, resolveCommandAuthorizedFromAuthorizers } =
installRuntime({
resolveCommandAuthorizedFromAuthorizers: ({ useAccessGroups, authorizers }) =>
useAccessGroups && authorizers.some((entry) => entry.configured && entry.allowed),
});
await __testing.processMessage({
message: createGroupMessage({
content: "/new",
commandContent: "/new",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account: {
...createAccount(),
config: {
...createAccount().config,
allowFrom: ["999"],
groupAllowFrom: ["123"],
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const authCall = resolveCommandAuthorizedFromAuthorizers.mock.calls[0]?.[0];
expect(authCall?.authorizers).toEqual([
{ configured: true, allowed: false },
{ configured: true, allowed: true },
]);
});
it("routes DM messages with direct peer kind", async () => {
const { dispatchReplyWithBufferedBlockDispatcher, resolveAgentRoute, buildAgentSessionKey } =
installRuntime({
commandAuthorized: false,
});
const account = createAccount();
await __testing.processMessage({
message: createDmMessage(),
account: {
...account,
config: {
...account.config,
dmPolicy: "open",
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(resolveAgentRoute).toHaveBeenCalledWith(
expect.objectContaining({
peer: { kind: "direct", id: "321" },
}),
);
expect(buildAgentSessionKey).toHaveBeenCalledWith(
expect.objectContaining({
peer: { kind: "direct", id: "321" },
dmScope: "per-channel-peer",
}),
);
const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0];
expect(callArg?.ctx?.SessionKey).toBe("agent:main:zalouser:direct:321");
});
it("reuses the legacy DM session key when only the old group-shaped session exists", async () => {
const { dispatchReplyWithBufferedBlockDispatcher, readSessionUpdatedAt } = installRuntime({
commandAuthorized: false,
});
readSessionUpdatedAt.mockImplementation(({ sessionKey }: { sessionKey: string }) =>
sessionKey === "agent:main:zalouser:group:321" ? 123 : undefined,
);
const account = createAccount();
await __testing.processMessage({
message: createDmMessage(),
account: {
...account,
config: {
...account.config,
dmPolicy: "open",
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0];
expect(callArg?.ctx?.SessionKey).toBe("agent:main:zalouser:group:321");
});
it("reads pairing store for open DM control commands", async () => {
const { readAllowFromStore } = installRuntime({
commandAuthorized: false,
});
const account = createAccount();
await __testing.processMessage({
message: createDmMessage({ content: "/new", commandContent: "/new" }),
account: {
...account,
config: {
...account.config,
dmPolicy: "open",
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(readAllowFromStore).toHaveBeenCalledTimes(1);
});
it("skips pairing store read for open DM non-command messages", async () => {
const { readAllowFromStore } = installRuntime({
commandAuthorized: false,
});
const account = createAccount();
await __testing.processMessage({
message: createDmMessage({ content: "hello there" }),
account: {
...account,
config: {
...account.config,
dmPolicy: "open",
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(readAllowFromStore).not.toHaveBeenCalled();
});
it("includes skipped group messages as InboundHistory on the next processed message", async () => {
const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({
commandAuthorized: false,
});
const historyState = {
historyLimit: 5,
groupHistories: new Map<
string,
Array<{ sender: string; body: string; timestamp?: number; messageId?: string }>
>(),
};
const account = createAccount();
const config = createConfig();
await __testing.processMessage({
message: createGroupMessage({
content: "first unmentioned line",
hasAnyMention: false,
wasExplicitlyMentioned: false,
}),
account,
config,
runtime: createRuntimeEnv(),
historyState,
});
expect(dispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled();
await __testing.processMessage({
message: createGroupMessage({
content: "second line @bot",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account,
config,
runtime: createRuntimeEnv(),
historyState,
});
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const firstDispatch = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0];
expect(firstDispatch?.ctx?.InboundHistory).toEqual([
expect.objectContaining({ sender: "Alice", body: "first unmentioned line" }),
]);
expect(String(firstDispatch?.ctx?.Body ?? "")).toContain("first unmentioned line");
await __testing.processMessage({
message: createGroupMessage({
content: "third line @bot",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account,
config,
runtime: createRuntimeEnv(),
historyState,
});
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(2);
const secondDispatch = dispatchReplyWithBufferedBlockDispatcher.mock.calls[1]?.[0];
expect(secondDispatch?.ctx?.InboundHistory).toEqual([]);
});
});

View File

@@ -1,3 +1,13 @@
import {
DM_GROUP_ACCESS_REASON,
DEFAULT_GROUP_HISTORY_LIMIT,
type HistoryEntry,
KeyedAsyncQueue,
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
recordPendingHistoryEntryIfEnabled,
resolveDmGroupAccessWithLists,
} from "openclaw/plugin-sdk/compat";
import type {
MarkdownTableMode,
OpenClawConfig,
@@ -73,8 +83,111 @@ function buildNameIndex<T>(items: T[], nameFn: (item: T) => string | undefined):
return index;
}
function resolveUserAllowlistEntries(
entries: string[],
byName: Map<string, Array<{ userId: string }>>,
): {
additions: string[];
mapping: string[];
unresolved: string[];
} {
const additions: string[] = [];
const mapping: string[] = [];
const unresolved: string[] = [];
for (const entry of entries) {
if (/^\d+$/.test(entry)) {
additions.push(entry);
continue;
}
const matches = byName.get(entry.toLowerCase()) ?? [];
const match = matches[0];
const id = match?.userId ? String(match.userId) : undefined;
if (id) {
additions.push(id);
mapping.push(`${entry}->${id}`);
} else {
unresolved.push(entry);
}
}
return { additions, mapping, unresolved };
}
type ZalouserCoreRuntime = ReturnType<typeof getZalouserRuntime>;
type ZalouserGroupHistoryState = {
historyLimit: number;
groupHistories: Map<string, HistoryEntry[]>;
};
function resolveInboundQueueKey(message: ZaloInboundMessage): string {
const threadId = message.threadId?.trim() || "unknown";
if (message.isGroup) {
return `group:${threadId}`;
}
const senderId = message.senderId?.trim();
return `direct:${senderId || threadId}`;
}
function createDeferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
function resolveZalouserDmSessionScope(config: OpenClawConfig) {
const configured = config.session?.dmScope;
return configured === "main" || !configured ? "per-channel-peer" : configured;
}
function resolveZalouserInboundSessionKey(params: {
core: ZalouserCoreRuntime;
config: OpenClawConfig;
route: { agentId: string; accountId: string; sessionKey: string };
storePath: string;
isGroup: boolean;
senderId: string;
}): string {
if (params.isGroup) {
return params.route.sessionKey;
}
const directSessionKey = params.core.channel.routing
.buildAgentSessionKey({
agentId: params.route.agentId,
channel: "zalouser",
accountId: params.route.accountId,
peer: { kind: "direct", id: params.senderId },
dmScope: resolveZalouserDmSessionScope(params.config),
identityLinks: params.config.session?.identityLinks,
})
.toLowerCase();
const legacySessionKey = params.core.channel.routing
.buildAgentSessionKey({
agentId: params.route.agentId,
channel: "zalouser",
accountId: params.route.accountId,
peer: { kind: "group", id: params.senderId },
})
.toLowerCase();
const hasDirectSession =
params.core.channel.session.readSessionUpdatedAt({
storePath: params.storePath,
sessionKey: directSessionKey,
}) !== undefined;
const hasLegacySession =
params.core.channel.session.readSessionUpdatedAt({
storePath: params.storePath,
sessionKey: legacySessionKey,
}) !== undefined;
// Keep existing DM history on upgrade, but use canonical direct keys for new sessions.
return hasLegacySession && !hasDirectSession ? legacySessionKey : directSessionKey;
}
function logVerbose(core: ZalouserCoreRuntime, runtime: RuntimeEnv, message: string): void {
if (core.logging.shouldLogVerbose()) {
runtime.log(`[zalouser] ${message}`);
@@ -139,6 +252,7 @@ async function processMessage(
config: OpenClawConfig,
core: ZalouserCoreRuntime,
runtime: RuntimeEnv,
historyState: ZalouserGroupHistoryState,
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
): Promise<void> {
const pairing = createScopedPairingAccess({
@@ -151,6 +265,7 @@ async function processMessage(
if (!rawBody) {
return;
}
const commandBody = message.commandContent?.trim() || rawBody;
const isGroup = message.isGroup;
const chatId = message.threadId;
@@ -237,65 +352,90 @@ async function processMessage(
const dmPolicy = account.config.dmPolicy ?? "pairing";
const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v));
const { senderAllowedForCommands, commandAuthorized } = await resolveSenderCommandAuthorization({
const configGroupAllowFrom = (account.config.groupAllowFrom ?? []).map((v) => String(v));
const shouldComputeCommandAuth = core.channel.commands.shouldComputeCommandAuthorized(
commandBody,
config,
);
const storeAllowFrom =
!isGroup && dmPolicy !== "allowlist" && (dmPolicy !== "open" || shouldComputeCommandAuth)
? await pairing.readAllowFromStore().catch(() => [])
: [];
const accessDecision = resolveDmGroupAccessWithLists({
isGroup,
dmPolicy,
groupPolicy,
allowFrom: configAllowFrom,
groupAllowFrom: configGroupAllowFrom,
storeAllowFrom,
isSenderAllowed: (allowFrom) => isSenderAllowed(senderId, allowFrom),
});
if (isGroup && accessDecision.decision !== "allow") {
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) {
logVerbose(core, runtime, "Blocked zalouser group message (no group allowlist)");
} else if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) {
logVerbose(
core,
runtime,
`Blocked zalouser sender ${senderId} (not in groupAllowFrom/allowFrom)`,
);
}
return;
}
if (!isGroup && accessDecision.decision !== "allow") {
if (accessDecision.decision === "pairing") {
await issuePairingChallenge({
channel: "zalouser",
senderId,
senderIdLine: `Your Zalo user id: ${senderId}`,
meta: { name: senderName || undefined },
upsertPairingRequest: pairing.upsertPairingRequest,
onCreated: () => {
logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`);
},
sendPairingReply: async (text) => {
await sendMessageZalouser(chatId, text, { profile: account.profile });
statusSink?.({ lastOutboundAt: Date.now() });
},
onReplyError: (err) => {
logVerbose(
core,
runtime,
`zalouser pairing reply failed for ${senderId}: ${String(err)}`,
);
},
});
return;
}
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) {
logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`);
} else {
logVerbose(
core,
runtime,
`Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`,
);
}
return;
}
const { commandAuthorized } = await resolveSenderCommandAuthorization({
cfg: config,
rawBody,
rawBody: commandBody,
isGroup,
dmPolicy,
configuredAllowFrom: configAllowFrom,
configuredGroupAllowFrom: configGroupAllowFrom,
senderId,
isSenderAllowed,
readAllowFromStore: pairing.readAllowFromStore,
readAllowFromStore: async () => storeAllowFrom,
shouldComputeCommandAuthorized: (body, cfg) =>
core.channel.commands.shouldComputeCommandAuthorized(body, cfg),
resolveCommandAuthorizedFromAuthorizers: (params) =>
core.channel.commands.resolveCommandAuthorizedFromAuthorizers(params),
});
if (!isGroup) {
if (dmPolicy === "disabled") {
logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`);
return;
}
if (dmPolicy !== "open") {
const allowed = senderAllowedForCommands;
if (!allowed) {
if (dmPolicy === "pairing") {
await issuePairingChallenge({
channel: "zalouser",
senderId,
senderIdLine: `Your Zalo user id: ${senderId}`,
meta: { name: senderName || undefined },
upsertPairingRequest: pairing.upsertPairingRequest,
onCreated: () => {
logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`);
},
sendPairingReply: async (text) => {
await sendMessageZalouser(chatId, text, { profile: account.profile });
statusSink?.({ lastOutboundAt: Date.now() });
},
onReplyError: (err) => {
logVerbose(
core,
runtime,
`zalouser pairing reply failed for ${senderId}: ${String(err)}`,
);
},
});
} else {
logVerbose(
core,
runtime,
`Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`,
);
}
return;
}
}
}
const hasControlCommand = core.channel.commands.isControlCommandMessage(rawBody, config);
const hasControlCommand = core.channel.commands.isControlCommandMessage(commandBody, config);
if (isGroup && hasControlCommand && commandAuthorized !== true) {
logVerbose(
core,
@@ -307,18 +447,19 @@ async function processMessage(
const peer = isGroup
? { kind: "group" as const, id: chatId }
: { kind: "group" as const, id: senderId };
: { kind: "direct" as const, id: senderId };
const route = core.channel.routing.resolveAgentRoute({
cfg: config,
channel: "zalouser",
accountId: account.accountId,
peer: {
// Use "group" kind to avoid dmScope=main collapsing all DMs into the main session.
// Keep DM peer kind as "direct" so session keys follow dmScope and UI labels stay DM-shaped.
kind: peer.kind,
id: peer.id,
},
});
const historyKey = isGroup ? route.sessionKey : undefined;
const requireMention = isGroup
? resolveGroupRequireMention({
@@ -340,10 +481,11 @@ async function processMessage(
explicit: explicitMention,
})
: true;
const canDetectMention = mentionRegexes.length > 0 || explicitMention.canResolveExplicit;
const mentionGate = resolveMentionGatingWithBypass({
isGroup,
requireMention,
canDetectMention: mentionRegexes.length > 0 || explicitMention.canResolveExplicit,
canDetectMention,
wasMentioned,
implicitMention: message.implicitMention === true,
hasAnyMention: explicitMention.hasAnyMention,
@@ -354,7 +496,32 @@ async function processMessage(
hasControlCommand,
commandAuthorized: commandAuthorized === true,
});
if (isGroup && requireMention && !canDetectMention && !mentionGate.effectiveWasMentioned) {
runtime.error?.(
`[${account.accountId}] zalouser mention required but detection unavailable ` +
`(missing mention regexes and bot self id); dropping group ${chatId}`,
);
return;
}
if (isGroup && mentionGate.shouldSkip) {
recordPendingHistoryEntryIfEnabled({
historyMap: historyState.groupHistories,
historyKey: historyKey ?? "",
limit: historyState.historyLimit,
entry:
historyKey && rawBody
? {
sender: senderName || senderId,
body: rawBody,
timestamp: message.timestampMs,
messageId: resolveZalouserMessageSid({
msgId: message.msgId,
cliMsgId: message.cliMsgId,
fallback: `${message.timestampMs}`,
}),
}
: null,
});
logVerbose(core, runtime, `zalouser: skip group ${chatId} (mention required, not mentioned)`);
return;
}
@@ -363,10 +530,18 @@ async function processMessage(
const storePath = core.channel.session.resolveStorePath(config.session?.store, {
agentId: route.agentId,
});
const inboundSessionKey = resolveZalouserInboundSessionKey({
core,
config,
route,
storePath,
isGroup,
senderId,
});
const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(config);
const previousTimestamp = core.channel.session.readSessionUpdatedAt({
storePath,
sessionKey: route.sessionKey,
sessionKey: inboundSessionKey,
});
const body = core.channel.reply.formatAgentEnvelope({
channel: "Zalo Personal",
@@ -376,15 +551,46 @@ async function processMessage(
envelope: envelopeOptions,
body: rawBody,
});
const combinedBody =
isGroup && historyKey
? buildPendingHistoryContextFromMap({
historyMap: historyState.groupHistories,
historyKey,
limit: historyState.historyLimit,
currentMessage: body,
formatEntry: (entry) =>
core.channel.reply.formatAgentEnvelope({
channel: "Zalo Personal",
from: fromLabel,
timestamp: entry.timestamp,
envelope: envelopeOptions,
body: `${entry.sender}: ${entry.body}${
entry.messageId ? ` [id:${entry.messageId}]` : ""
}`,
}),
})
: body;
const inboundHistory =
isGroup && historyKey && historyState.historyLimit > 0
? (historyState.groupHistories.get(historyKey) ?? []).map((entry) => ({
sender: entry.sender,
body: entry.body,
timestamp: entry.timestamp,
}))
: undefined;
const normalizedTo = isGroup ? `zalouser:group:${chatId}` : `zalouser:${chatId}`;
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
Body: combinedBody,
BodyForAgent: rawBody,
InboundHistory: inboundHistory,
RawBody: rawBody,
CommandBody: rawBody,
CommandBody: commandBody,
BodyForCommands: commandBody,
From: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`,
To: `zalouser:${chatId}`,
SessionKey: route.sessionKey,
To: normalizedTo,
SessionKey: inboundSessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: fromLabel,
@@ -407,7 +613,7 @@ async function processMessage(
cliMsgId: message.cliMsgId,
}),
OriginatingChannel: "zalouser",
OriginatingTo: `zalouser:${chatId}`,
OriginatingTo: normalizedTo,
});
await core.channel.session.recordInboundSession({
@@ -433,6 +639,9 @@ async function processMessage(
});
},
onStartError: (err) => {
runtime.error?.(
`[${account.accountId}] zalouser typing start failed for ${chatId}: ${String(err)}`,
);
logVerbose(core, runtime, `zalouser typing failed for ${chatId}: ${String(err)}`);
},
});
@@ -469,6 +678,13 @@ async function processMessage(
onModelSelected,
},
});
if (isGroup && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: historyState.groupHistories,
historyKey,
limit: historyState.historyLimit,
});
}
}
async function deliverZalouserReply(params: {
@@ -534,43 +750,60 @@ export async function monitorZalouserProvider(
const { abortSignal, statusSink, runtime } = options;
const core = getZalouserRuntime();
const inboundQueue = new KeyedAsyncQueue();
const historyLimit = Math.max(
0,
account.config.historyLimit ??
config.messages?.groupChat?.historyLimit ??
DEFAULT_GROUP_HISTORY_LIMIT,
);
const groupHistories = new Map<string, HistoryEntry[]>();
try {
const profile = account.profile;
const allowFromEntries = (account.config.allowFrom ?? [])
.map((entry) => normalizeZalouserEntry(String(entry)))
.filter((entry) => entry && entry !== "*");
const groupAllowFromEntries = (account.config.groupAllowFrom ?? [])
.map((entry) => normalizeZalouserEntry(String(entry)))
.filter((entry) => entry && entry !== "*");
if (allowFromEntries.length > 0) {
if (allowFromEntries.length > 0 || groupAllowFromEntries.length > 0) {
const friends = await listZaloFriends(profile);
const byName = buildNameIndex(friends, (friend) => friend.displayName);
const additions: string[] = [];
const mapping: string[] = [];
const unresolved: string[] = [];
for (const entry of allowFromEntries) {
if (/^\d+$/.test(entry)) {
additions.push(entry);
continue;
}
const matches = byName.get(entry.toLowerCase()) ?? [];
const match = matches[0];
const id = match?.userId ? String(match.userId) : undefined;
if (id) {
additions.push(id);
mapping.push(`${entry}${id}`);
} else {
unresolved.push(entry);
}
if (allowFromEntries.length > 0) {
const { additions, mapping, unresolved } = resolveUserAllowlistEntries(
allowFromEntries,
byName,
);
const allowFrom = mergeAllowlist({ existing: account.config.allowFrom, additions });
account = {
...account,
config: {
...account.config,
allowFrom,
},
};
summarizeMapping("zalouser users", mapping, unresolved, runtime);
}
if (groupAllowFromEntries.length > 0) {
const { additions, mapping, unresolved } = resolveUserAllowlistEntries(
groupAllowFromEntries,
byName,
);
const groupAllowFrom = mergeAllowlist({
existing: account.config.groupAllowFrom,
additions,
});
account = {
...account,
config: {
...account.config,
groupAllowFrom,
},
};
summarizeMapping("zalouser group users", mapping, unresolved, runtime);
}
const allowFrom = mergeAllowlist({ existing: account.config.allowFrom, additions });
account = {
...account,
config: {
...account.config,
allowFrom,
},
};
summarizeMapping("zalouser users", mapping, unresolved, runtime);
}
const groupsConfig = account.config.groups ?? {};
@@ -627,40 +860,92 @@ export async function monitorZalouserProvider(
listenerStop = null;
};
const listener = await startZaloListener({
accountId: account.accountId,
profile: account.profile,
abortSignal,
onMessage: (msg) => {
if (stopped) {
return;
}
logVerbose(core, runtime, `[${account.accountId}] inbound message`);
statusSink?.({ lastInboundAt: Date.now() });
processMessage(msg, account, config, core, runtime, statusSink).catch((err) => {
runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`);
});
},
onError: (err) => {
if (stopped || abortSignal.aborted) {
return;
}
runtime.error(`[${account.accountId}] Zalo listener error: ${String(err)}`);
},
});
let settled = false;
const { promise: waitForExit, resolve: resolveRun, reject: rejectRun } = createDeferred<void>();
const settleSuccess = () => {
if (settled) {
return;
}
settled = true;
stop();
resolveRun();
};
const settleFailure = (error: unknown) => {
if (settled) {
return;
}
settled = true;
stop();
rejectRun(error instanceof Error ? error : new Error(String(error)));
};
const onAbort = () => {
settleSuccess();
};
abortSignal.addEventListener("abort", onAbort, { once: true });
let listener: Awaited<ReturnType<typeof startZaloListener>>;
try {
listener = await startZaloListener({
accountId: account.accountId,
profile: account.profile,
abortSignal,
onMessage: (msg) => {
if (stopped) {
return;
}
logVerbose(core, runtime, `[${account.accountId}] inbound message`);
statusSink?.({ lastInboundAt: Date.now() });
const queueKey = resolveInboundQueueKey(msg);
void inboundQueue
.enqueue(queueKey, async () => {
if (stopped || abortSignal.aborted) {
return;
}
await processMessage(
msg,
account,
config,
core,
runtime,
{ historyLimit, groupHistories },
statusSink,
);
})
.catch((err) => {
runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`);
});
},
onError: (err) => {
if (stopped || abortSignal.aborted) {
return;
}
runtime.error(`[${account.accountId}] Zalo listener error: ${String(err)}`);
settleFailure(err);
},
});
} catch (error) {
abortSignal.removeEventListener("abort", onAbort);
throw error;
}
listenerStop = listener.stop;
if (stopped) {
listenerStop();
listenerStop = null;
}
await new Promise<void>((resolve) => {
abortSignal.addEventListener(
"abort",
() => {
stop();
resolve();
},
{ once: true },
);
});
if (abortSignal.aborted) {
settleSuccess();
}
try {
await waitForExit;
} finally {
abortSignal.removeEventListener("abort", onAbort);
}
return { stop };
}
@@ -671,14 +956,27 @@ export const __testing = {
account: ResolvedZalouserAccount;
config: OpenClawConfig;
runtime: RuntimeEnv;
historyState?: {
historyLimit?: number;
groupHistories?: Map<string, HistoryEntry[]>;
};
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
}) => {
const historyLimit = Math.max(
0,
params.historyState?.historyLimit ??
params.account.config.historyLimit ??
params.config.messages?.groupChat?.historyLimit ??
DEFAULT_GROUP_HISTORY_LIMIT,
);
const groupHistories = params.historyState?.groupHistories ?? new Map<string, HistoryEntry[]>();
await processMessage(
params.message,
params.account,
params.config,
getZalouserRuntime(),
params.runtime,
{ historyLimit, groupHistories },
params.statusSink,
);
},

View File

@@ -35,6 +35,7 @@ export type ZaloInboundMessage = {
senderName?: string;
groupName?: string;
content: string;
commandContent?: string;
timestampMs: number;
msgId?: string;
cliMsgId?: string;
@@ -92,6 +93,8 @@ type ZalouserSharedConfig = {
profile?: string;
dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
allowFrom?: Array<string | number>;
historyLimit?: number;
groupAllowFrom?: Array<string | number>;
groupPolicy?: "open" | "allowlist" | "disabled";
groups?: Record<string, ZalouserGroupConfig>;
messagePrefix?: string;

View File

@@ -37,6 +37,8 @@ const DEFAULT_QR_WAIT_TIMEOUT_MS = 120_000;
const GROUP_INFO_CHUNK_SIZE = 80;
const GROUP_CONTEXT_CACHE_TTL_MS = 5 * 60_000;
const GROUP_CONTEXT_CACHE_MAX_ENTRIES = 500;
const LISTENER_WATCHDOG_INTERVAL_MS = 30_000;
const LISTENER_WATCHDOG_MAX_GAP_MS = 35_000;
const apiByProfile = new Map<string, API>();
const apiInitByProfile = new Map<string, Promise<API>>();
@@ -63,6 +65,8 @@ type ActiveZaloListener = {
const activeListeners = new Map<string, ActiveZaloListener>();
const groupContextCache = new Map<string, { value: ZaloGroupContext; expiresAt: number }>();
type AccountInfoResponse = Awaited<ReturnType<API["fetchAccountInfo"]>>;
type ApiTypingCapability = {
sendTypingEvent: (
threadId: string,
@@ -155,6 +159,20 @@ function toStringValue(value: unknown): string {
return "";
}
function normalizeAccountInfoUser(info: AccountInfoResponse): User | null {
if (!info || typeof info !== "object") {
return null;
}
if ("profile" in info) {
const profile = (info as { profile?: unknown }).profile;
if (profile && typeof profile === "object") {
return profile as User;
}
return null;
}
return info as User;
}
function toInteger(value: unknown, fallback = 0): number {
if (typeof value === "number" && Number.isFinite(value)) {
return Math.trunc(value);
@@ -199,18 +217,128 @@ function resolveInboundTimestamp(rawTs: unknown): number {
return parsed > 1_000_000_000_000 ? parsed : parsed * 1000;
}
function extractMentionIds(raw: unknown): string[] {
if (!Array.isArray(raw)) {
function extractMentionIds(rawMentions: unknown): string[] {
if (!Array.isArray(rawMentions)) {
return [];
}
return raw
.map((entry) => {
if (!entry || typeof entry !== "object") {
return "";
}
return toNumberId((entry as { uid?: unknown }).uid);
})
.filter(Boolean);
const sink = new Set<string>();
for (const entry of rawMentions) {
if (!entry || typeof entry !== "object") {
continue;
}
const record = entry as { uid?: unknown };
const id = toNumberId(record.uid);
if (id) {
sink.add(id);
}
}
return Array.from(sink);
}
type MentionSpan = {
start: number;
end: number;
};
function toNonNegativeInteger(value: unknown): number | null {
if (typeof value === "number" && Number.isFinite(value)) {
const normalized = Math.trunc(value);
return normalized >= 0 ? normalized : null;
}
if (typeof value === "string" && value.trim().length > 0) {
const parsed = Number.parseInt(value.trim(), 10);
if (Number.isFinite(parsed)) {
return parsed >= 0 ? parsed : null;
}
}
return null;
}
function extractOwnMentionSpans(
rawMentions: unknown,
ownUserId: string,
contentLength: number,
): MentionSpan[] {
if (!Array.isArray(rawMentions) || !ownUserId || contentLength <= 0) {
return [];
}
const spans: MentionSpan[] = [];
for (const entry of rawMentions) {
if (!entry || typeof entry !== "object") {
continue;
}
const record = entry as {
uid?: unknown;
pos?: unknown;
start?: unknown;
offset?: unknown;
len?: unknown;
length?: unknown;
};
const uid = toNumberId(record.uid);
if (!uid || uid !== ownUserId) {
continue;
}
const startRaw = toNonNegativeInteger(record.pos ?? record.start ?? record.offset);
const lengthRaw = toNonNegativeInteger(record.len ?? record.length);
if (startRaw === null || lengthRaw === null || lengthRaw <= 0) {
continue;
}
const start = Math.min(startRaw, contentLength);
const end = Math.min(start + lengthRaw, contentLength);
if (end <= start) {
continue;
}
spans.push({ start, end });
}
if (spans.length <= 1) {
return spans;
}
spans.sort((a, b) => a.start - b.start);
const merged: MentionSpan[] = [];
for (const span of spans) {
const last = merged[merged.length - 1];
if (!last || span.start > last.end) {
merged.push({ ...span });
continue;
}
last.end = Math.max(last.end, span.end);
}
return merged;
}
function stripOwnMentionsForCommandBody(
content: string,
rawMentions: unknown,
ownUserId: string,
): string {
if (!content || !ownUserId) {
return content;
}
const spans = extractOwnMentionSpans(rawMentions, ownUserId, content.length);
if (spans.length === 0) {
return stripLeadingAtMentionForCommand(content);
}
let cursor = 0;
let output = "";
for (const span of spans) {
if (span.start > cursor) {
output += content.slice(cursor, span.start);
}
cursor = Math.max(cursor, span.end);
}
if (cursor < content.length) {
output += content.slice(cursor);
}
return output.replace(/\s+/g, " ").trim();
}
function stripLeadingAtMentionForCommand(content: string): string {
const fallbackMatch = content.match(/^\s*@[^\s]+(?:\s+|[:,-]\s*)([/!][\s\S]*)$/);
if (!fallbackMatch) {
return content;
}
return fallbackMatch[1].trim();
}
function resolveGroupNameFromMessageData(data: Record<string, unknown>): string | undefined {
@@ -250,9 +378,14 @@ function extractSendMessageId(result: unknown): string | undefined {
return undefined;
}
const payload = result as {
msgId?: string | number;
message?: { msgId?: string | number } | null;
attachment?: Array<{ msgId?: string | number }>;
};
const direct = payload.msgId;
if (direct !== undefined && direct !== null) {
return String(direct);
}
const primary = payload.message?.msgId;
if (primary !== undefined && primary !== null) {
return String(primary);
@@ -311,6 +444,35 @@ function resolveMediaFileName(params: {
return `upload.${ext}`;
}
function resolveUploadedVoiceAsset(
uploaded: Array<{
fileType?: string;
fileUrl?: string;
fileName?: string;
}>,
): { fileUrl: string; fileName?: string } | undefined {
for (const item of uploaded) {
if (!item || typeof item !== "object") {
continue;
}
const fileType = item.fileType?.toLowerCase();
const fileUrl = item.fileUrl?.trim();
if (!fileUrl) {
continue;
}
if (fileType === "others" || fileType === "video") {
return { fileUrl, fileName: item.fileName?.trim() || undefined };
}
}
return undefined;
}
function buildZaloVoicePlaybackUrl(asset: { fileUrl: string; fileName?: string }): string {
// zca-js uses uploadAttachment(...).fileUrl directly for sendVoice.
// Appending filename can produce URLs that play only in the local session.
return asset.fileUrl.trim();
}
function mapFriend(friend: User): ZcaFriend {
return {
userId: String(friend.userId),
@@ -602,6 +764,11 @@ function toInboundMessage(message: Message, ownUserId?: string): ZaloInboundMess
const wasExplicitlyMentioned = Boolean(
normalizedOwnUserId && mentionIds.some((id) => id === normalizedOwnUserId),
);
const commandContent = wasExplicitlyMentioned
? stripOwnMentionsForCommandBody(content, data.mentions, normalizedOwnUserId)
: hasAnyMention && !canResolveExplicitMention
? stripLeadingAtMentionForCommand(content)
: content;
const implicitMention = Boolean(
normalizedOwnUserId && quoteOwnerId && quoteOwnerId === normalizedOwnUserId,
);
@@ -613,6 +780,7 @@ function toInboundMessage(message: Message, ownUserId?: string): ZaloInboundMess
senderName: typeof data.dName === "string" ? data.dName.trim() || undefined : undefined,
groupName: isGroup ? resolveGroupNameFromMessageData(data) : undefined,
content,
commandContent,
timestampMs: resolveInboundTimestamp(data.ts),
msgId: typeof data.msgId === "string" ? data.msgId : undefined,
cliMsgId: typeof data.cliMsgId === "string" ? data.cliMsgId : undefined,
@@ -649,8 +817,7 @@ export async function getZaloUserInfo(profileInput?: string | null): Promise<Zca
const profile = normalizeProfile(profileInput);
const api = await ensureApi(profile);
const info = await api.fetchAccountInfo();
const user =
info && typeof info === "object" && "profile" in info ? (info.profile as User) : (info as User);
const user = normalizeAccountInfoUser(info);
if (!user?.userId) {
return null;
}
@@ -851,6 +1018,40 @@ export async function sendZaloTextMessage(
kind: media.kind,
});
const payloadText = (text || options.caption || "").slice(0, 2000);
if (media.kind === "audio") {
let textMessageId: string | undefined;
if (payloadText) {
const textResponse = await api.sendMessage(payloadText, trimmedThreadId, type);
textMessageId = extractSendMessageId(textResponse);
}
const attachmentFileName = fileName.includes(".") ? fileName : `${fileName}.bin`;
const uploaded = await api.uploadAttachment(
[
{
data: media.buffer,
filename: attachmentFileName as `${string}.${string}`,
metadata: {
totalSize: media.buffer.length,
},
},
],
trimmedThreadId,
type,
);
const voiceAsset = resolveUploadedVoiceAsset(uploaded);
if (!voiceAsset) {
throw new Error("Failed to resolve uploaded audio URL for voice message");
}
const voiceUrl = buildZaloVoicePlaybackUrl(voiceAsset);
const response = await api.sendVoice({ voiceUrl }, trimmedThreadId, type);
return {
ok: true,
messageId: extractSendMessageId(response) ?? textMessageId,
};
}
const response = await api.sendMessage(
{
msg: payloadText,
@@ -890,13 +1091,32 @@ export async function sendZaloTypingEvent(
const type = options.isGroup ? ThreadType.Group : ThreadType.User;
if ("sendTypingEvent" in api && typeof api.sendTypingEvent === "function") {
await (api as API & ApiTypingCapability).sendTypingEvent(trimmedThreadId, type);
return;
}
throw new Error("Zalo typing indicator is not supported by current API session");
}
async function resolveOwnUserId(api: API): Promise<string> {
const info = await api.fetchAccountInfo();
const profile = "profile" in info ? info.profile : info;
return toNumberId(profile.userId);
try {
const info = await api.fetchAccountInfo();
const resolved = toNumberId(normalizeAccountInfoUser(info)?.userId);
if (resolved) {
return resolved;
}
} catch {
// Fall back to getOwnId when account info shape changes.
}
try {
const ownId = toNumberId(api.getOwnId());
if (ownId) {
return ownId;
}
} catch {
// Ignore fallback probe failures and keep mention detection conservative.
}
return "";
}
export async function sendZaloReaction(params: {
@@ -1244,12 +1464,18 @@ export async function startZaloListener(params: {
const api = await ensureApi(profile);
const ownUserId = await resolveOwnUserId(api);
let stopped = false;
let watchdogTimer: ReturnType<typeof setInterval> | null = null;
let lastWatchdogTickAt = Date.now();
const cleanup = () => {
if (stopped) {
return;
}
stopped = true;
if (watchdogTimer) {
clearInterval(watchdogTimer);
watchdogTimer = null;
}
try {
api.listener.off("message", onMessage);
api.listener.off("error", onError);
@@ -1276,19 +1502,22 @@ export async function startZaloListener(params: {
params.onMessage(normalized);
};
const onError = (error: unknown) => {
const failListener = (error: Error) => {
if (stopped || params.abortSignal.aborted) {
return;
}
cleanup();
invalidateApi(profile);
params.onError(error);
};
const onError = (error: unknown) => {
const wrapped = error instanceof Error ? error : new Error(String(error));
params.onError(wrapped);
failListener(wrapped);
};
const onClosed = (code: number, reason: string) => {
if (stopped || params.abortSignal.aborted) {
return;
}
params.onError(new Error(`Zalo listener closed (${code}): ${reason || "no reason"}`));
failListener(new Error(`Zalo listener closed (${code}): ${reason || "no reason"}`));
};
api.listener.on("message", onMessage);
@@ -1296,12 +1525,30 @@ export async function startZaloListener(params: {
api.listener.on("closed", onClosed);
try {
api.listener.start({ retryOnClose: true });
api.listener.start({ retryOnClose: false });
} catch (error) {
cleanup();
throw error;
}
watchdogTimer = setInterval(() => {
if (stopped || params.abortSignal.aborted) {
return;
}
const now = Date.now();
const gapMs = now - lastWatchdogTickAt;
lastWatchdogTickAt = now;
if (gapMs <= LISTENER_WATCHDOG_MAX_GAP_MS) {
return;
}
failListener(
new Error(
`Zalo listener watchdog gap detected (${Math.round(gapMs / 1000)}s): forcing reconnect`,
),
);
}, LISTENER_WATCHDOG_INTERVAL_MS);
watchdogTimer.unref?.();
params.abortSignal.addEventListener(
"abort",
() => {

View File

@@ -152,7 +152,7 @@ export type API = {
cookies: unknown[];
};
};
fetchAccountInfo(): Promise<{ profile: User } | User>;
fetchAccountInfo(): Promise<User | { profile: User }>;
getAllFriends(): Promise<User[]>;
getOwnId(): string;
getAllGroups(): Promise<{
@@ -177,9 +177,53 @@ export type API = {
threadId: string,
type?: number,
): Promise<{
msgId?: string | number;
message?: { msgId?: string | number } | null;
attachment?: Array<{ msgId?: string | number }>;
}>;
uploadAttachment(
sources:
| string
| {
data: Buffer;
filename: `${string}.${string}`;
metadata: {
totalSize: number;
width?: number;
height?: number;
};
}
| Array<
| string
| {
data: Buffer;
filename: `${string}.${string}`;
metadata: {
totalSize: number;
width?: number;
height?: number;
};
}
>,
threadId: string,
type?: number,
): Promise<
Array<{
fileType: "image" | "video" | "others";
fileUrl?: string;
msgId?: string | number;
fileId?: string;
fileName?: string;
}>
>;
sendVoice(
options: {
voiceUrl: string;
ttl?: number;
},
threadId: string,
type?: number,
): Promise<{ msgId?: string | number }>;
sendLink(
payload: { link: string; msg?: string },
threadId: string,