zalouser: align group behavior with channels and restart on listener transport loss

This commit is contained in:
Tom
2026-03-03 23:26:49 +07:00
parent 2421e2ef7d
commit 3cf7c305ae
7 changed files with 462 additions and 91 deletions

View File

@@ -86,10 +86,13 @@ Approve via:
- Default: `channels.zalouser.groupPolicy = "open"` (groups allowed). Use `channels.defaults.groupPolicy` to override the default when unset.
- Restrict to an allowlist with:
- `channels.zalouser.groupPolicy = "allowlist"`
- `channels.zalouser.groups` (keys are group IDs or names)
- `channels.zalouser.groups` (keys are group IDs or names; controls which groups are allowed)
- `channels.zalouser.groupAllowFrom` (controls which senders in allowed groups can trigger the bot)
- Block all groups: `channels.zalouser.groupPolicy = "disabled"`.
- The configure wizard can prompt for group allowlists.
- On startup, OpenClaw resolves group/user names in allowlists to IDs and logs the mapping; unresolved entries are kept as typed.
- If `groupAllowFrom` is unset, runtime falls back to `allowFrom` for group sender checks.
- Sender checks apply to both normal group messages and control commands (for example `/new`, `/reset`).
Example:
@@ -98,6 +101,7 @@ Example:
channels: {
zalouser: {
groupPolicy: "allowlist",
groupAllowFrom: ["1471383327500481391"],
groups: {
"123456789": { allow: true },
"Work Chat": { allow: true },
@@ -112,6 +116,7 @@ Example:
- `channels.zalouser.groups.<group>.requireMention` controls whether group replies require a mention.
- Resolution order: exact group id/name -> normalized group slug -> `*` -> default (`true`).
- This applies both to allowlisted groups and open group mode.
- Authorized control commands (for example `/new`) can bypass mention gating.
Example:
@@ -164,7 +169,7 @@ Accounts map to `zalouser` profiles in OpenClaw state. Example:
**Allowlist/group name didn't resolve:**
- Use numeric IDs in `allowFrom`/`groups`, or exact friend/group names.
- Use numeric IDs in `allowFrom`/`groupAllowFrom`/`groups`, or exact friend/group names.
**Upgraded from old CLI-based setup:**

View File

@@ -342,6 +342,7 @@ export const zalouserPlugin: ChannelPlugin<ResolvedZalouserAccount> = {
"name",
"dmPolicy",
"allowFrom",
"groupAllowFrom",
"groupPolicy",
"groups",
"messagePrefix",

View File

@@ -17,6 +17,7 @@ const zalouserAccountSchema = z.object({
profile: z.string().optional(),
dmPolicy: z.enum(["pairing", "allowlist", "open", "disabled"]).optional(),
allowFrom: z.array(allowFromEntry).optional(),
groupAllowFrom: z.array(allowFromEntry).optional(),
groupPolicy: z.enum(["disabled", "allowlist", "open"]).optional(),
groups: z.object({}).catchall(groupConfigSchema).optional(),
messagePrefix: z.string().optional(),

View File

@@ -54,11 +54,28 @@ function createRuntimeEnv(): RuntimeEnv {
};
}
function installRuntime(params: { commandAuthorized: boolean }) {
function installRuntime(params: {
commandAuthorized?: boolean;
resolveCommandAuthorizedFromAuthorizers?: (params: {
useAccessGroups: boolean;
authorizers: Array<{ configured: boolean; allowed: boolean }>;
}) => boolean;
}) {
const dispatchReplyWithBufferedBlockDispatcher = vi.fn(async ({ dispatcherOptions, ctx }) => {
await dispatcherOptions.typingCallbacks?.onReplyStart?.();
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 }, ctx };
});
const resolveCommandAuthorizedFromAuthorizers = vi.fn(
(input: {
useAccessGroups: boolean;
authorizers: Array<{ configured: boolean; allowed: boolean }>;
}) => {
if (params.resolveCommandAuthorizedFromAuthorizers) {
return params.resolveCommandAuthorizedFromAuthorizers(input);
}
return params.commandAuthorized ?? false;
},
);
const resolveAgentRoute = vi.fn((input: { peer?: { kind?: string; id?: string } }) => {
const peerKind = input.peer?.kind === "direct" ? "direct" : "group";
const peerId = input.peer?.id ?? "1";
@@ -69,6 +86,7 @@ function installRuntime(params: { commandAuthorized: boolean }) {
mainSessionKey: "agent:main:main",
};
});
const readAllowFromStore = vi.fn(async () => []);
setZalouserRuntime({
logging: {
@@ -76,13 +94,13 @@ function installRuntime(params: { commandAuthorized: boolean }) {
},
channel: {
pairing: {
readAllowFromStore: vi.fn(async () => []),
readAllowFromStore,
upsertPairingRequest: vi.fn(async () => ({ code: "PAIR", created: true })),
buildPairingReply: vi.fn(() => "pair"),
},
commands: {
shouldComputeCommandAuthorized: vi.fn((body: string) => body.trim().startsWith("/")),
resolveCommandAuthorizedFromAuthorizers: vi.fn(() => params.commandAuthorized),
resolveCommandAuthorizedFromAuthorizers,
isControlCommandMessage: vi.fn((body: string) => body.trim().startsWith("/")),
shouldHandleTextCommands: vi.fn(() => true),
},
@@ -130,7 +148,12 @@ function installRuntime(params: { commandAuthorized: boolean }) {
},
} as unknown as PluginRuntime);
return { dispatchReplyWithBufferedBlockDispatcher, resolveAgentRoute };
return {
dispatchReplyWithBufferedBlockDispatcher,
resolveAgentRoute,
resolveCommandAuthorizedFromAuthorizers,
readAllowFromStore,
};
}
function createGroupMessage(overrides: Partial<ZaloInboundMessage> = {}): ZaloInboundMessage {
@@ -255,6 +278,118 @@ describe("zalouser monitor group mention gating", () => {
expect(callArg?.ctx?.WasMentioned).toBe(true);
});
it("uses commandContent for mention-prefixed control commands", async () => {
const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({
commandAuthorized: true,
});
await __testing.processMessage({
message: createGroupMessage({
content: "@Bot /new",
commandContent: "/new",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account: createAccount(),
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0];
expect(callArg?.ctx?.CommandBody).toBe("/new");
expect(callArg?.ctx?.BodyForCommands).toBe("/new");
});
it("allows group control commands when only allowFrom is configured", async () => {
const { dispatchReplyWithBufferedBlockDispatcher, resolveCommandAuthorizedFromAuthorizers } =
installRuntime({
resolveCommandAuthorizedFromAuthorizers: ({ useAccessGroups, authorizers }) =>
useAccessGroups && authorizers.some((entry) => entry.configured && entry.allowed),
});
await __testing.processMessage({
message: createGroupMessage({
content: "/new",
commandContent: "/new",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account: {
...createAccount(),
config: {
...createAccount().config,
allowFrom: ["123"],
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const authCall = resolveCommandAuthorizedFromAuthorizers.mock.calls[0]?.[0];
expect(authCall?.authorizers).toEqual([
{ configured: true, allowed: true },
{ configured: true, allowed: true },
]);
});
it("blocks group messages when sender is not in groupAllowFrom/allowFrom", async () => {
const { dispatchReplyWithBufferedBlockDispatcher } = installRuntime({
commandAuthorized: false,
});
await __testing.processMessage({
message: createGroupMessage({
content: "ping @bot",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account: {
...createAccount(),
config: {
...createAccount().config,
groupPolicy: "allowlist",
allowFrom: ["999"],
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled();
});
it("allows group control commands when sender is in groupAllowFrom", async () => {
const { dispatchReplyWithBufferedBlockDispatcher, resolveCommandAuthorizedFromAuthorizers } =
installRuntime({
resolveCommandAuthorizedFromAuthorizers: ({ useAccessGroups, authorizers }) =>
useAccessGroups && authorizers.some((entry) => entry.configured && entry.allowed),
});
await __testing.processMessage({
message: createGroupMessage({
content: "/new",
commandContent: "/new",
hasAnyMention: true,
wasExplicitlyMentioned: true,
}),
account: {
...createAccount(),
config: {
...createAccount().config,
allowFrom: ["999"],
groupAllowFrom: ["123"],
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const authCall = resolveCommandAuthorizedFromAuthorizers.mock.calls[0]?.[0];
expect(authCall?.authorizers).toEqual([
{ configured: true, allowed: false },
{ configured: true, allowed: true },
]);
});
it("routes DM messages with direct peer kind", async () => {
const { dispatchReplyWithBufferedBlockDispatcher, resolveAgentRoute } = installRuntime({
commandAuthorized: false,
@@ -281,4 +416,46 @@ describe("zalouser monitor group mention gating", () => {
const callArg = dispatchReplyWithBufferedBlockDispatcher.mock.calls[0]?.[0];
expect(callArg?.ctx?.SessionKey).toBe("agent:main:zalouser:direct:321");
});
it("reads pairing store for open DM control commands", async () => {
const { readAllowFromStore } = installRuntime({
commandAuthorized: false,
});
const account = createAccount();
await __testing.processMessage({
message: createDmMessage({ content: "/new", commandContent: "/new" }),
account: {
...account,
config: {
...account.config,
dmPolicy: "open",
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(readAllowFromStore).toHaveBeenCalledTimes(1);
});
it("skips pairing store read for open DM non-command messages", async () => {
const { readAllowFromStore } = installRuntime({
commandAuthorized: false,
});
const account = createAccount();
await __testing.processMessage({
message: createDmMessage({ content: "hello there" }),
account: {
...account,
config: {
...account.config,
dmPolicy: "open",
},
},
config: createConfig(),
runtime: createRuntimeEnv(),
});
expect(readAllowFromStore).not.toHaveBeenCalled();
});
});

View File

@@ -5,11 +5,13 @@ import type {
RuntimeEnv,
} from "openclaw/plugin-sdk";
import {
DM_GROUP_ACCESS_REASON,
createTypingCallbacks,
createScopedPairingAccess,
createReplyPrefixOptions,
resolveOutboundMediaUrls,
mergeAllowlist,
resolveDmGroupAccessWithLists,
resolveMentionGatingWithBypass,
resolveOpenProviderRuntimeGroupPolicy,
resolveDefaultGroupPolicy,
@@ -171,6 +173,7 @@ async function processMessage(
if (!rawBody) {
return;
}
const commandBody = message.commandContent?.trim() || rawBody;
const isGroup = message.isGroup;
const chatId = message.threadId;
@@ -237,70 +240,95 @@ async function processMessage(
const dmPolicy = account.config.dmPolicy ?? "pairing";
const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v));
const { senderAllowedForCommands, commandAuthorized } = await resolveSenderCommandAuthorization({
const configGroupAllowFrom = (account.config.groupAllowFrom ?? []).map((v) => String(v));
const shouldComputeCommandAuth = core.channel.commands.shouldComputeCommandAuthorized(
commandBody,
config,
);
const storeAllowFrom =
!isGroup && dmPolicy !== "allowlist" && (dmPolicy !== "open" || shouldComputeCommandAuth)
? await pairing.readAllowFromStore().catch(() => [])
: [];
const accessDecision = resolveDmGroupAccessWithLists({
isGroup,
dmPolicy,
groupPolicy,
allowFrom: configAllowFrom,
groupAllowFrom: configGroupAllowFrom,
storeAllowFrom,
isSenderAllowed: (allowFrom) => isSenderAllowed(senderId, allowFrom),
});
if (isGroup && accessDecision.decision !== "allow") {
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) {
logVerbose(core, runtime, "Blocked zalouser group message (no group allowlist)");
} else if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) {
logVerbose(
core,
runtime,
`Blocked zalouser sender ${senderId} (not in groupAllowFrom/allowFrom)`,
);
}
return;
}
if (!isGroup && accessDecision.decision !== "allow") {
if (accessDecision.decision === "pairing") {
const { code, created } = await pairing.upsertPairingRequest({
id: senderId,
meta: { name: senderName || undefined },
});
if (created) {
logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`);
try {
await sendMessageZalouser(
chatId,
core.channel.pairing.buildPairingReply({
channel: "zalouser",
idLine: `Your Zalo user id: ${senderId}`,
code,
}),
{ profile: account.profile },
);
statusSink?.({ lastOutboundAt: Date.now() });
} catch (err) {
logVerbose(
core,
runtime,
`zalouser pairing reply failed for ${senderId}: ${String(err)}`,
);
}
}
return;
}
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) {
logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`);
} else {
logVerbose(
core,
runtime,
`Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`,
);
}
return;
}
const { commandAuthorized } = await resolveSenderCommandAuthorization({
cfg: config,
rawBody,
rawBody: commandBody,
isGroup,
dmPolicy,
configuredAllowFrom: configAllowFrom,
configuredGroupAllowFrom: configGroupAllowFrom,
senderId,
isSenderAllowed,
readAllowFromStore: pairing.readAllowFromStore,
readAllowFromStore: async () => storeAllowFrom,
shouldComputeCommandAuthorized: (body, cfg) =>
core.channel.commands.shouldComputeCommandAuthorized(body, cfg),
resolveCommandAuthorizedFromAuthorizers: (params) =>
core.channel.commands.resolveCommandAuthorizedFromAuthorizers(params),
});
if (!isGroup) {
if (dmPolicy === "disabled") {
logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`);
return;
}
if (dmPolicy !== "open") {
const allowed = senderAllowedForCommands;
if (!allowed) {
if (dmPolicy === "pairing") {
const { code, created } = await pairing.upsertPairingRequest({
id: senderId,
meta: { name: senderName || undefined },
});
if (created) {
logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`);
try {
await sendMessageZalouser(
chatId,
core.channel.pairing.buildPairingReply({
channel: "zalouser",
idLine: `Your Zalo user id: ${senderId}`,
code,
}),
{ profile: account.profile },
);
statusSink?.({ lastOutboundAt: Date.now() });
} catch (err) {
logVerbose(
core,
runtime,
`zalouser pairing reply failed for ${senderId}: ${String(err)}`,
);
}
}
} else {
logVerbose(
core,
runtime,
`Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`,
);
}
return;
}
}
}
const hasControlCommand = core.channel.commands.isControlCommandMessage(rawBody, config);
const hasControlCommand = core.channel.commands.isControlCommandMessage(commandBody, config);
if (isGroup && hasControlCommand && commandAuthorized !== true) {
logVerbose(
core,
@@ -396,7 +424,8 @@ async function processMessage(
Body: body,
BodyForAgent: rawBody,
RawBody: rawBody,
CommandBody: rawBody,
CommandBody: commandBody,
BodyForCommands: commandBody,
From: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`,
To: normalizedTo,
SessionKey: route.sessionKey,
@@ -645,40 +674,80 @@ export async function monitorZalouserProvider(
listenerStop = null;
};
const listener = await startZaloListener({
accountId: account.accountId,
profile: account.profile,
abortSignal,
onMessage: (msg) => {
if (stopped) {
return;
}
logVerbose(core, runtime, `[${account.accountId}] inbound message`);
statusSink?.({ lastInboundAt: Date.now() });
processMessage(msg, account, config, core, runtime, statusSink).catch((err) => {
runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`);
});
},
onError: (err) => {
if (stopped || abortSignal.aborted) {
return;
}
runtime.error(`[${account.accountId}] Zalo listener error: ${String(err)}`);
},
});
let settled = false;
const {
promise: waitForExit,
resolve: resolveRun,
reject: rejectRun,
} = Promise.withResolvers<void>();
const settleSuccess = () => {
if (settled) {
return;
}
settled = true;
stop();
resolveRun();
};
const settleFailure = (error: unknown) => {
if (settled) {
return;
}
settled = true;
stop();
rejectRun(error instanceof Error ? error : new Error(String(error)));
};
const onAbort = () => {
settleSuccess();
};
abortSignal.addEventListener("abort", onAbort, { once: true });
let listener: Awaited<ReturnType<typeof startZaloListener>>;
try {
listener = await startZaloListener({
accountId: account.accountId,
profile: account.profile,
abortSignal,
onMessage: (msg) => {
if (stopped) {
return;
}
logVerbose(core, runtime, `[${account.accountId}] inbound message`);
statusSink?.({ lastInboundAt: Date.now() });
processMessage(msg, account, config, core, runtime, statusSink).catch((err) => {
runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`);
});
},
onError: (err) => {
if (stopped || abortSignal.aborted) {
return;
}
runtime.error(`[${account.accountId}] Zalo listener error: ${String(err)}`);
settleFailure(err);
},
});
} catch (error) {
abortSignal.removeEventListener("abort", onAbort);
throw error;
}
listenerStop = listener.stop;
if (stopped) {
listenerStop();
listenerStop = null;
}
await new Promise<void>((resolve) => {
abortSignal.addEventListener(
"abort",
() => {
stop();
resolve();
},
{ once: true },
);
});
if (abortSignal.aborted) {
settleSuccess();
}
try {
await waitForExit;
} finally {
abortSignal.removeEventListener("abort", onAbort);
}
return { stop };
}

View File

@@ -35,6 +35,7 @@ export type ZaloInboundMessage = {
senderName?: string;
groupName?: string;
content: string;
commandContent?: string;
timestampMs: number;
msgId?: string;
cliMsgId?: string;
@@ -92,6 +93,7 @@ type ZalouserSharedConfig = {
profile?: string;
dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
allowFrom?: Array<string | number>;
groupAllowFrom?: Array<string | number>;
groupPolicy?: "open" | "allowlist" | "disabled";
groups?: Record<string, ZalouserGroupConfig>;
messagePrefix?: string;

View File

@@ -217,6 +217,112 @@ function extractMentionIds(rawMentions: unknown): string[] {
return Array.from(sink);
}
type MentionSpan = {
start: number;
end: number;
};
function toNonNegativeInteger(value: unknown): number | null {
if (typeof value === "number" && Number.isFinite(value)) {
const normalized = Math.trunc(value);
return normalized >= 0 ? normalized : null;
}
if (typeof value === "string" && value.trim().length > 0) {
const parsed = Number.parseInt(value.trim(), 10);
if (Number.isFinite(parsed)) {
return parsed >= 0 ? parsed : null;
}
}
return null;
}
function extractOwnMentionSpans(
rawMentions: unknown,
ownUserId: string,
contentLength: number,
): MentionSpan[] {
if (!Array.isArray(rawMentions) || !ownUserId || contentLength <= 0) {
return [];
}
const spans: MentionSpan[] = [];
for (const entry of rawMentions) {
if (!entry || typeof entry !== "object") {
continue;
}
const record = entry as {
uid?: unknown;
pos?: unknown;
start?: unknown;
offset?: unknown;
len?: unknown;
length?: unknown;
};
const uid = toNumberId(record.uid);
if (!uid || uid !== ownUserId) {
continue;
}
const startRaw = toNonNegativeInteger(record.pos ?? record.start ?? record.offset);
const lengthRaw = toNonNegativeInteger(record.len ?? record.length);
if (startRaw === null || lengthRaw === null || lengthRaw <= 0) {
continue;
}
const start = Math.min(startRaw, contentLength);
const end = Math.min(start + lengthRaw, contentLength);
if (end <= start) {
continue;
}
spans.push({ start, end });
}
if (spans.length <= 1) {
return spans;
}
spans.sort((a, b) => a.start - b.start);
const merged: MentionSpan[] = [];
for (const span of spans) {
const last = merged[merged.length - 1];
if (!last || span.start > last.end) {
merged.push({ ...span });
continue;
}
last.end = Math.max(last.end, span.end);
}
return merged;
}
function stripOwnMentionsForCommandBody(
content: string,
rawMentions: unknown,
ownUserId: string,
): string {
if (!content || !ownUserId) {
return content;
}
const spans = extractOwnMentionSpans(rawMentions, ownUserId, content.length);
if (spans.length === 0) {
return stripLeadingAtMentionForCommand(content);
}
let cursor = 0;
let output = "";
for (const span of spans) {
if (span.start > cursor) {
output += content.slice(cursor, span.start);
}
cursor = Math.max(cursor, span.end);
}
if (cursor < content.length) {
output += content.slice(cursor);
}
return output.replace(/\s+/g, " ").trim();
}
function stripLeadingAtMentionForCommand(content: string): string {
const fallbackMatch = content.match(/^\s*@[^\s]+(?:\s+|[:,-]\s*)([/!][\s\S]*)$/);
if (!fallbackMatch) {
return content;
}
return fallbackMatch[1].trim();
}
function resolveGroupNameFromMessageData(data: Record<string, unknown>): string | undefined {
const candidates = [data.groupName, data.gName, data.idToName, data.threadName, data.roomName];
for (const candidate of candidates) {
@@ -640,6 +746,11 @@ function toInboundMessage(message: Message, ownUserId?: string): ZaloInboundMess
const wasExplicitlyMentioned = Boolean(
normalizedOwnUserId && mentionIds.some((id) => id === normalizedOwnUserId),
);
const commandContent = wasExplicitlyMentioned
? stripOwnMentionsForCommandBody(content, data.mentions, normalizedOwnUserId)
: hasAnyMention && !canResolveExplicitMention
? stripLeadingAtMentionForCommand(content)
: content;
const implicitMention = Boolean(
normalizedOwnUserId && quoteOwnerId && quoteOwnerId === normalizedOwnUserId,
);
@@ -651,6 +762,7 @@ function toInboundMessage(message: Message, ownUserId?: string): ZaloInboundMess
senderName: typeof data.dName === "string" ? data.dName.trim() || undefined : undefined,
groupName: isGroup ? resolveGroupNameFromMessageData(data) : undefined,
content,
commandContent,
timestampMs: resolveInboundTimestamp(data.ts),
msgId: typeof data.msgId === "string" ? data.msgId : undefined,
cliMsgId: typeof data.cliMsgId === "string" ? data.cliMsgId : undefined,
@@ -1371,6 +1483,8 @@ export async function startZaloListener(params: {
return;
}
const wrapped = error instanceof Error ? error : new Error(String(error));
cleanup();
invalidateApi(profile);
params.onError(wrapped);
};
@@ -1378,6 +1492,8 @@ export async function startZaloListener(params: {
if (stopped || params.abortSignal.aborted) {
return;
}
cleanup();
invalidateApi(profile);
params.onError(new Error(`Zalo listener closed (${code}): ${reason || "no reason"}`));
};