Files
openclaw/extensions/qqbot/src/engine/gateway/message-queue.ts
2026-05-01 18:44:51 +01:00

500 lines
16 KiB
TypeScript

/**
* Per-user concurrent message queue.
*
* Messages are serialized per **peer** (one DM user, one group, one guild
* channel) and processed in parallel across peers up to
* {@link DEFAULT_MAX_CONCURRENT_USERS}.
*
* Group-specific enhancements (added when merging from the standalone build):
* - Group peers have a larger queue cap ({@link DEFAULT_GROUP_QUEUE_SIZE})
* because groups can burst more chatter than a single DM.
* - When a group's queue overflows, bot-authored messages are evicted
* preferentially so human messages don't get dropped.
* - When draining a group peer with more than one queued message, the
* non-command messages are **merged** into one logical turn (see
* {@link mergeGroupMessages}). Slash commands are always processed
* individually to avoid conflating a "/stop" with surrounding chatter.
*
* The module is self-contained: the only injected dependency is the
* logger / abort probe supplied via {@link MessageQueueContext}.
*/
import { formatErrorMessage } from "../utils/format.js";
// ============ Queue limits ============
/** Global cap across all peers. */
const DEFAULT_GLOBAL_QUEUE_SIZE = 1000;
/** Per-DM / per-channel cap. */
const DEFAULT_PER_PEER_QUEUE_SIZE = 20;
/** Per-group cap — larger because groups burst more. */
const DEFAULT_GROUP_QUEUE_SIZE = 50;
/** Parallel fanout across peers. */
const DEFAULT_MAX_CONCURRENT_USERS = 10;
// ============ Types ============
/** Mention entry carried on group messages (subset of QQ's shape). */
export interface QueuedMention {
scope?: "all" | "single";
id?: string;
user_openid?: string;
member_openid?: string;
username?: string;
nickname?: string;
bot?: boolean;
is_you?: boolean;
}
/**
* Metadata attached to a merged group turn.
*
* When the drainer folds multiple non-command messages into one
* representative turn, the merge information lands here instead of
* being scattered across `_` -prefixed fields on {@link QueuedMessage}.
*/
interface QueuedMergeInfo {
/** Number of original messages folded in. Always >= 2. */
count: number;
/** Original messages in insertion order — `messages.at(-1)` is "current". */
messages: readonly QueuedMessage[];
}
/**
* Queue item used for asynchronous message handling without blocking heartbeats.
*/
export interface QueuedMessage {
type: "c2c" | "guild" | "dm" | "group";
senderId: string;
senderName?: string;
/** Whether the sender is another bot. Used by the eviction policy. */
senderIsBot?: boolean;
content: string;
messageId: string;
timestamp: string;
channelId?: string;
guildId?: string;
groupOpenid?: string;
attachments?: Array<{
content_type: string;
url: string;
filename?: string;
voice_wav_url?: string;
asr_refer_text?: string;
}>;
/** refIdx of the quoted message. */
refMsgIdx?: string;
/** refIdx assigned to this message for future quoting. */
msgIdx?: string;
/** QQ message type (103 = quote). */
msgType?: number;
/** Referenced message elements (for quote messages). */
msgElements?: Array<{
msg_idx?: string;
content?: string;
attachments?: Array<{
content_type: string;
url: string;
filename?: string;
height?: number;
width?: number;
size?: number;
voice_wav_url?: string;
asr_refer_text?: string;
}>;
}>;
/**
* Raw event type (e.g. `GROUP_AT_MESSAGE_CREATE`). Used by the gate to
* detect explicit @bot without parsing `mentions` ourselves, and by
* the group merger to decide whether the merged result represents an
* @bot turn.
*/
eventType?: string;
/** @mentions list from the raw event. */
mentions?: QueuedMention[];
/** Scene info (source channel + ext bag). */
messageScene?: { source?: string; ext?: string[] };
/**
* Set only on merged group turns; absent on single-message turns.
* See {@link mergeGroupMessages} for merge semantics.
*/
merge?: QueuedMergeInfo;
}
/** Convenience predicate: is this a merged multi-message turn? */
export function isMergedTurn(msg: QueuedMessage): msg is QueuedMessage & {
merge: QueuedMergeInfo;
} {
return (msg.merge?.count ?? 0) > 1;
}
interface MessageQueueContext {
accountId: string;
log?: {
info: (msg: string, meta?: Record<string, unknown>) => void;
error: (msg: string, meta?: Record<string, unknown>) => void;
debug?: (msg: string, meta?: Record<string, unknown>) => void;
};
/** Abort-state probe supplied by the caller. */
isAborted: () => boolean;
/** Per-group queue cap. Defaults to {@link DEFAULT_GROUP_QUEUE_SIZE}. */
groupQueueSize?: number;
/** Per-DM / per-channel queue cap. Defaults to {@link DEFAULT_PER_PEER_QUEUE_SIZE}. */
peerQueueSize?: number;
/** Global queue cap. Defaults to {@link DEFAULT_GLOBAL_QUEUE_SIZE}. */
globalQueueSize?: number;
/** Max concurrent peers. Defaults to {@link DEFAULT_MAX_CONCURRENT_USERS}. */
maxConcurrentUsers?: number;
}
/** Snapshot of the queue state for diagnostics. */
interface QueueSnapshot {
totalPending: number;
activeUsers: number;
maxConcurrentUsers: number;
senderPending: number;
}
interface MessageQueue {
enqueue: (msg: QueuedMessage) => void;
startProcessor: (handleMessageFn: (msg: QueuedMessage) => Promise<void>) => void;
getSnapshot: (senderPeerId: string) => QueueSnapshot;
getMessagePeerId: (msg: QueuedMessage) => string;
/** Clear a user's queued messages and return how many were dropped. */
clearUserQueue: (peerId: string) => number;
/** Execute one message immediately, bypassing the queue for urgent commands. */
executeImmediate: (msg: QueuedMessage) => void;
}
// ============ Group merging ============
/** Return true when the peer id refers to a group-like conversation. */
function isGroupPeer(peerId: string): boolean {
return peerId.startsWith("group:") || peerId.startsWith("guild:");
}
/** Slash-command test used by {@link drainGroupBatch}. */
function isSlashCommand(msg: QueuedMessage): boolean {
return (msg.content ?? "").trim().startsWith("/");
}
/**
* Merge several queued group messages into one representative message.
*
* Merge semantics:
* - `content` is joined with newlines; each line prefixed with `[sender]`
* so the downstream formatter can attribute speakers.
* - `attachments` is concatenated.
* - `mentions` is deduplicated by member/user openid; if *any* source
* message was a `GROUP_AT_MESSAGE_CREATE`, the merged result inherits
* that eventType (the merged turn effectively @-s the bot).
* - `messageId`, `msgIdx`, `timestamp` come from the last message — the
* most recent identity is what the outbound reply should quote.
* - `refMsgIdx` (the message that the user quoted) comes from the FIRST
* message in the batch because the first quote anchors the topic.
* - `senderIsBot` is true only when every source message was authored
* by a bot. Any human participation flips the flag.
*
* A single-message batch is returned unchanged (no merge overhead).
*/
export function mergeGroupMessages(batch: QueuedMessage[]): QueuedMessage {
if (batch.length === 0) {
throw new Error("mergeGroupMessages: empty batch");
}
if (batch.length === 1) {
return batch[0];
}
const first = batch[0];
const last = batch[batch.length - 1];
const mergedContent = batch
.map((m) => `[${m.senderName ?? m.senderId}]: ${m.content}`)
.join("\n");
const mergedAttachments: QueuedMessage["attachments"] = [];
for (const m of batch) {
if (m.attachments?.length) {
mergedAttachments.push(...m.attachments);
}
}
const seenMentionIds = new Set<string>();
const mergedMentions: NonNullable<QueuedMessage["mentions"]> = [];
let anyAtYouEvent = false;
for (const m of batch) {
if (m.eventType === "GROUP_AT_MESSAGE_CREATE") {
anyAtYouEvent = true;
}
if (m.mentions) {
for (const mt of m.mentions) {
const key = mt.member_openid ?? mt.id ?? mt.user_openid ?? "";
if (key && seenMentionIds.has(key)) {
continue;
}
if (key) {
seenMentionIds.add(key);
}
mergedMentions.push(mt);
}
}
}
const allFromBot = batch.every((m) => m.senderIsBot);
return {
type: last.type,
senderId: last.senderId,
senderName: last.senderName,
senderIsBot: allFromBot,
content: mergedContent,
messageId: last.messageId,
timestamp: last.timestamp,
channelId: last.channelId,
guildId: last.guildId,
groupOpenid: last.groupOpenid,
attachments: mergedAttachments.length > 0 ? mergedAttachments : undefined,
refMsgIdx: first.refMsgIdx,
msgIdx: last.msgIdx,
eventType: anyAtYouEvent ? "GROUP_AT_MESSAGE_CREATE" : last.eventType,
mentions: mergedMentions.length > 0 ? mergedMentions : undefined,
messageScene: last.messageScene,
merge: { count: batch.length, messages: batch },
};
}
// ============ Queue factory ============
/**
* Create a per-user concurrent queue with built-in group enhancements.
*/
export function createMessageQueue(ctx: MessageQueueContext): MessageQueue {
const { accountId: _accountId, log } = ctx;
const globalQueueSize = ctx.globalQueueSize ?? DEFAULT_GLOBAL_QUEUE_SIZE;
const peerQueueSize = ctx.peerQueueSize ?? DEFAULT_PER_PEER_QUEUE_SIZE;
const groupQueueSize = ctx.groupQueueSize ?? DEFAULT_GROUP_QUEUE_SIZE;
const maxConcurrentUsers = ctx.maxConcurrentUsers ?? DEFAULT_MAX_CONCURRENT_USERS;
const userQueues = new Map<string, QueuedMessage[]>();
const activeUsers = new Set<string>();
let handleMessageFnRef: ((msg: QueuedMessage) => Promise<void>) | null = null;
let totalEnqueued = 0;
const getMessagePeerId = (msg: QueuedMessage): string => {
if (msg.type === "guild") {
return `guild:${msg.channelId ?? "unknown"}`;
}
if (msg.type === "group") {
return `group:${msg.groupOpenid ?? "unknown"}`;
}
return `dm:${msg.senderId}`;
};
/**
* Evict one message from an over-full queue.
*
* For group peers we prefer to drop a bot-authored message so human
* input never gets lost. Falling back to dropping the oldest keeps the
* queue bounded when all members are bots.
*/
const evictOne = (queue: QueuedMessage[], isGroup: boolean): QueuedMessage | undefined => {
if (isGroup) {
const botIdx = queue.findIndex((m) => m.senderIsBot);
if (botIdx >= 0) {
return queue.splice(botIdx, 1)[0];
}
}
return queue.shift();
};
/** Run a single message, capturing errors in the log. */
const processOne = async (msg: QueuedMessage, peerId: string, label: string): Promise<void> => {
try {
await handleMessageFnRef!(msg);
} catch (err) {
log?.error(`${label} error for ${peerId}: ${formatErrorMessage(err)}`);
}
};
/**
* Drain a group's batch:
* - slash commands are processed one by one (order preserved);
* - the remaining messages are merged into a single turn.
*/
const drainGroupBatch = async (batch: QueuedMessage[], peerId: string): Promise<void> => {
const commands: QueuedMessage[] = [];
const normal: QueuedMessage[] = [];
for (const m of batch) {
if (isSlashCommand(m)) {
commands.push(m);
} else {
normal.push(m);
}
}
for (const cmd of commands) {
log?.debug?.(
`Processing command independently for ${peerId}: ${(cmd.content ?? "").trim().slice(0, 50)}`,
);
await processOne(cmd, peerId, "Command processor");
}
if (normal.length > 0) {
const merged = mergeGroupMessages(normal);
if (normal.length > 1) {
log?.debug?.(`Merged ${normal.length} queued group messages for ${peerId} into one`);
}
await processOne(merged, peerId, `Message processor (merged batch of ${normal.length})`);
}
};
/** Process one peer's queue serially. */
const drainUserQueue = async (peerId: string): Promise<void> => {
if (activeUsers.has(peerId)) {
return;
}
if (activeUsers.size >= maxConcurrentUsers) {
log?.debug?.(`Max concurrent users (${maxConcurrentUsers}) reached, ${peerId} will wait`);
return;
}
const queue = userQueues.get(peerId);
if (!queue || queue.length === 0) {
userQueues.delete(peerId);
return;
}
activeUsers.add(peerId);
const isGroup = isGroupPeer(peerId);
try {
while (queue.length > 0 && !ctx.isAborted()) {
// Group peers with more than one queued message: batch-merge.
if (isGroup && queue.length > 1 && handleMessageFnRef) {
const batch = queue.splice(0);
totalEnqueued = Math.max(0, totalEnqueued - batch.length);
await drainGroupBatch(batch, peerId);
continue;
}
// Single-message (or non-group) path.
const msg = queue.shift()!;
totalEnqueued = Math.max(0, totalEnqueued - 1);
if (handleMessageFnRef) {
await processOne(msg, peerId, "Message processor");
}
}
} finally {
activeUsers.delete(peerId);
userQueues.delete(peerId);
// Fill any freed concurrency slots.
for (const [waitingPeerId, waitingQueue] of userQueues) {
if (activeUsers.size >= maxConcurrentUsers) {
break;
}
if (waitingQueue.length > 0 && !activeUsers.has(waitingPeerId)) {
void drainUserQueue(waitingPeerId);
}
}
}
};
const enqueue = (msg: QueuedMessage): void => {
const peerId = getMessagePeerId(msg);
const isGroup = isGroupPeer(peerId);
let queue = userQueues.get(peerId);
if (!queue) {
queue = [];
userQueues.set(peerId, queue);
}
const maxSize = isGroup ? groupQueueSize : peerQueueSize;
if (queue.length >= maxSize) {
const dropped = evictOne(queue, isGroup);
totalEnqueued = Math.max(0, totalEnqueued - 1);
if (isGroup && dropped?.senderIsBot) {
log?.info(`Queue full for ${peerId}, dropping bot message ${dropped.messageId}`, {
accountId: ctx.accountId,
peerId,
droppedMessageId: dropped.messageId,
reason: "queue_full_evict_bot",
});
} else {
log?.error(`Queue full for ${peerId}, dropping oldest message ${dropped?.messageId}`, {
accountId: ctx.accountId,
peerId,
droppedMessageId: dropped?.messageId,
reason: "queue_full_evict_oldest",
});
}
}
totalEnqueued++;
if (totalEnqueued > globalQueueSize) {
log?.error(
`Global queue limit reached (${totalEnqueued}), message from ${peerId} may be delayed`,
{ accountId: ctx.accountId, peerId, totalEnqueued, globalQueueSize },
);
}
queue.push(msg);
log?.debug?.(
`Message enqueued for ${peerId}, user queue: ${queue.length}, active users: ${activeUsers.size}`,
);
void drainUserQueue(peerId);
};
const startProcessor = (handleMessageFn: (msg: QueuedMessage) => Promise<void>): void => {
handleMessageFnRef = handleMessageFn;
log?.debug?.(
`Message processor started (per-user concurrency, max ${maxConcurrentUsers} users)`,
);
};
const getSnapshot = (senderPeerId: string): QueueSnapshot => {
let totalPending = 0;
for (const [, q] of userQueues) {
totalPending += q.length;
}
const senderQueue = userQueues.get(senderPeerId);
return {
totalPending,
activeUsers: activeUsers.size,
maxConcurrentUsers,
senderPending: senderQueue ? senderQueue.length : 0,
};
};
const clearUserQueue = (peerId: string): number => {
const queue = userQueues.get(peerId);
if (!queue || queue.length === 0) {
return 0;
}
const droppedCount = queue.length;
queue.length = 0;
totalEnqueued = Math.max(0, totalEnqueued - droppedCount);
return droppedCount;
};
const executeImmediate = (msg: QueuedMessage): void => {
if (handleMessageFnRef) {
handleMessageFnRef(msg).catch((err) => {
log?.error(`Immediate execution error: ${err}`);
});
}
};
return {
enqueue,
startProcessor,
getSnapshot,
getMessagePeerId,
clearUserQueue,
executeImmediate,
};
}