mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-13 19:10:39 +00:00
260 lines
6.6 KiB
TypeScript
260 lines
6.6 KiB
TypeScript
export type QueueSummaryState = {
|
|
dropPolicy: "summarize" | "old" | "new";
|
|
droppedCount: number;
|
|
summaryLines: string[];
|
|
};
|
|
|
|
export type QueueDropPolicy = QueueSummaryState["dropPolicy"];
|
|
|
|
export type QueueState<T> = QueueSummaryState & {
|
|
items: T[];
|
|
cap: number;
|
|
};
|
|
|
|
export function clearQueueSummaryState(state: QueueSummaryState): void {
|
|
state.droppedCount = 0;
|
|
state.summaryLines = [];
|
|
}
|
|
|
|
export function previewQueueSummaryPrompt(params: {
|
|
state: QueueSummaryState;
|
|
noun: string;
|
|
title?: string;
|
|
}): string | undefined {
|
|
return buildQueueSummaryPrompt({
|
|
state: {
|
|
dropPolicy: params.state.dropPolicy,
|
|
droppedCount: params.state.droppedCount,
|
|
summaryLines: [...params.state.summaryLines],
|
|
},
|
|
noun: params.noun,
|
|
title: params.title,
|
|
});
|
|
}
|
|
|
|
export function applyQueueRuntimeSettings<TMode extends string>(params: {
|
|
target: {
|
|
mode: TMode;
|
|
debounceMs: number;
|
|
cap: number;
|
|
dropPolicy: QueueDropPolicy;
|
|
};
|
|
settings: {
|
|
mode: TMode;
|
|
debounceMs?: number;
|
|
cap?: number;
|
|
dropPolicy?: QueueDropPolicy;
|
|
};
|
|
}): void {
|
|
params.target.mode = params.settings.mode;
|
|
params.target.debounceMs =
|
|
typeof params.settings.debounceMs === "number"
|
|
? Math.max(0, params.settings.debounceMs)
|
|
: params.target.debounceMs;
|
|
params.target.cap =
|
|
typeof params.settings.cap === "number" && params.settings.cap > 0
|
|
? Math.floor(params.settings.cap)
|
|
: params.target.cap;
|
|
params.target.dropPolicy = params.settings.dropPolicy ?? params.target.dropPolicy;
|
|
}
|
|
|
|
export function elideQueueText(text: string, limit = 140): string {
|
|
if (text.length <= limit) {
|
|
return text;
|
|
}
|
|
return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`;
|
|
}
|
|
|
|
export function buildQueueSummaryLine(text: string, limit = 160): string {
|
|
const cleaned = text.replace(/\s+/g, " ").trim();
|
|
return elideQueueText(cleaned, limit);
|
|
}
|
|
|
|
export function shouldSkipQueueItem<T>(params: {
|
|
item: T;
|
|
items: T[];
|
|
dedupe?: (item: T, items: T[]) => boolean;
|
|
}): boolean {
|
|
if (!params.dedupe) {
|
|
return false;
|
|
}
|
|
return params.dedupe(params.item, params.items);
|
|
}
|
|
|
|
export function applyQueueDropPolicy<T>(params: {
|
|
queue: QueueState<T>;
|
|
summarize: (item: T) => string;
|
|
summaryLimit?: number;
|
|
}): boolean {
|
|
const cap = params.queue.cap;
|
|
if (cap <= 0 || params.queue.items.length < cap) {
|
|
return true;
|
|
}
|
|
if (params.queue.dropPolicy === "new") {
|
|
return false;
|
|
}
|
|
const dropCount = params.queue.items.length - cap + 1;
|
|
const dropped = params.queue.items.splice(0, dropCount);
|
|
if (params.queue.dropPolicy === "summarize") {
|
|
for (const item of dropped) {
|
|
params.queue.droppedCount += 1;
|
|
params.queue.summaryLines.push(buildQueueSummaryLine(params.summarize(item)));
|
|
}
|
|
const limit = Math.max(0, params.summaryLimit ?? cap);
|
|
while (params.queue.summaryLines.length > limit) {
|
|
params.queue.summaryLines.shift();
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
export function waitForQueueDebounce(queue: {
|
|
debounceMs: number;
|
|
lastEnqueuedAt: number;
|
|
}): Promise<void> {
|
|
if (process.env.OPENCLAW_TEST_FAST === "1") {
|
|
return Promise.resolve();
|
|
}
|
|
const debounceMs = Math.max(0, queue.debounceMs);
|
|
if (debounceMs <= 0) {
|
|
return Promise.resolve();
|
|
}
|
|
return new Promise<void>((resolve) => {
|
|
const check = () => {
|
|
const since = Date.now() - queue.lastEnqueuedAt;
|
|
if (since >= debounceMs) {
|
|
resolve();
|
|
return;
|
|
}
|
|
setTimeout(check, debounceMs - since);
|
|
};
|
|
check();
|
|
});
|
|
}
|
|
|
|
export function beginQueueDrain<T extends { draining: boolean }>(
|
|
map: Map<string, T>,
|
|
key: string,
|
|
): T | undefined {
|
|
const queue = map.get(key);
|
|
if (!queue || queue.draining) {
|
|
return undefined;
|
|
}
|
|
queue.draining = true;
|
|
return queue;
|
|
}
|
|
|
|
export async function drainNextQueueItem<T>(
|
|
items: T[],
|
|
run: (item: T) => Promise<void>,
|
|
): Promise<boolean> {
|
|
const next = items[0];
|
|
if (!next) {
|
|
return false;
|
|
}
|
|
await run(next);
|
|
items.shift();
|
|
return true;
|
|
}
|
|
|
|
export async function drainCollectItemIfNeeded<T>(params: {
|
|
forceIndividualCollect: boolean;
|
|
isCrossChannel: boolean;
|
|
setForceIndividualCollect?: (next: boolean) => void;
|
|
items: T[];
|
|
run: (item: T) => Promise<void>;
|
|
}): Promise<"skipped" | "drained" | "empty"> {
|
|
if (!params.forceIndividualCollect && !params.isCrossChannel) {
|
|
return "skipped";
|
|
}
|
|
if (params.isCrossChannel) {
|
|
params.setForceIndividualCollect?.(true);
|
|
}
|
|
const drained = await drainNextQueueItem(params.items, params.run);
|
|
return drained ? "drained" : "empty";
|
|
}
|
|
|
|
export async function drainCollectQueueStep<T>(params: {
|
|
collectState: { forceIndividualCollect: boolean };
|
|
isCrossChannel: boolean;
|
|
items: T[];
|
|
run: (item: T) => Promise<void>;
|
|
}): Promise<"skipped" | "drained" | "empty"> {
|
|
return await drainCollectItemIfNeeded({
|
|
forceIndividualCollect: params.collectState.forceIndividualCollect,
|
|
isCrossChannel: params.isCrossChannel,
|
|
setForceIndividualCollect: (next) => {
|
|
params.collectState.forceIndividualCollect = next;
|
|
},
|
|
items: params.items,
|
|
run: params.run,
|
|
});
|
|
}
|
|
|
|
export function buildQueueSummaryPrompt(params: {
|
|
state: QueueSummaryState;
|
|
noun: string;
|
|
title?: string;
|
|
}): string | undefined {
|
|
if (params.state.dropPolicy !== "summarize" || params.state.droppedCount <= 0) {
|
|
return undefined;
|
|
}
|
|
const noun = params.noun;
|
|
const title =
|
|
params.title ??
|
|
`[Queue overflow] Dropped ${params.state.droppedCount} ${noun}${params.state.droppedCount === 1 ? "" : "s"} due to cap.`;
|
|
const lines = [title];
|
|
if (params.state.summaryLines.length > 0) {
|
|
lines.push("Summary:");
|
|
for (const line of params.state.summaryLines) {
|
|
lines.push(`- ${line}`);
|
|
}
|
|
}
|
|
clearQueueSummaryState(params.state);
|
|
return lines.join("\n");
|
|
}
|
|
|
|
export function buildCollectPrompt<T>(params: {
|
|
title: string;
|
|
items: T[];
|
|
summary?: string;
|
|
renderItem: (item: T, index: number) => string;
|
|
}): string {
|
|
const blocks: string[] = [params.title];
|
|
if (params.summary) {
|
|
blocks.push(params.summary);
|
|
}
|
|
params.items.forEach((item, idx) => {
|
|
blocks.push(params.renderItem(item, idx));
|
|
});
|
|
return blocks.join("\n\n");
|
|
}
|
|
|
|
export function hasCrossChannelItems<T>(
|
|
items: T[],
|
|
resolveKey: (item: T) => { key?: string; cross?: boolean },
|
|
): boolean {
|
|
const keys = new Set<string>();
|
|
let hasUnkeyed = false;
|
|
|
|
for (const item of items) {
|
|
const resolved = resolveKey(item);
|
|
if (resolved.cross) {
|
|
return true;
|
|
}
|
|
if (!resolved.key) {
|
|
hasUnkeyed = true;
|
|
continue;
|
|
}
|
|
keys.add(resolved.key);
|
|
}
|
|
|
|
if (keys.size === 0) {
|
|
return false;
|
|
}
|
|
if (hasUnkeyed) {
|
|
return true;
|
|
}
|
|
return keys.size > 1;
|
|
}
|