mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 09:40:43 +00:00
perf(core): reduce queue head churn
This commit is contained in:
@@ -31,6 +31,7 @@ const DEFAULT_SLOW_LISTENER_THRESHOLD_MS = 30_000;
|
||||
export class DiscordEventQueue {
|
||||
private readonly options: Required<DiscordEventQueueOptions>;
|
||||
private readonly queue: DiscordEventQueueJob[] = [];
|
||||
private queueHead = 0;
|
||||
private processing = 0;
|
||||
private processedCount = 0;
|
||||
private droppedCount = 0;
|
||||
@@ -52,7 +53,7 @@ export class DiscordEventQueue {
|
||||
}
|
||||
|
||||
enqueue(params: Omit<DiscordEventQueueJob, "resolve" | "reject">): Promise<void> {
|
||||
if (this.queue.length >= this.options.maxQueueSize) {
|
||||
if (this.pendingQueueSize >= this.options.maxQueueSize) {
|
||||
this.droppedCount += 1;
|
||||
return Promise.reject(
|
||||
new Error(
|
||||
@@ -68,7 +69,7 @@ export class DiscordEventQueue {
|
||||
|
||||
getMetrics(): DiscordEventQueueMetrics {
|
||||
return {
|
||||
queueSize: this.queue.length,
|
||||
queueSize: this.pendingQueueSize,
|
||||
processing: this.processing,
|
||||
processed: this.processedCount,
|
||||
dropped: this.droppedCount,
|
||||
@@ -78,9 +79,31 @@ export class DiscordEventQueue {
|
||||
};
|
||||
}
|
||||
|
||||
private get pendingQueueSize(): number {
|
||||
return Math.max(0, this.queue.length - this.queueHead);
|
||||
}
|
||||
|
||||
private takeNextJob(): DiscordEventQueueJob | undefined {
|
||||
if (this.queueHead >= this.queue.length) {
|
||||
this.queue.length = 0;
|
||||
this.queueHead = 0;
|
||||
return undefined;
|
||||
}
|
||||
const job = this.queue[this.queueHead];
|
||||
this.queueHead += 1;
|
||||
if (this.queueHead >= this.queue.length) {
|
||||
this.queue.length = 0;
|
||||
this.queueHead = 0;
|
||||
} else if (this.queueHead > 256 && this.queueHead * 2 > this.queue.length) {
|
||||
this.queue.splice(0, this.queueHead);
|
||||
this.queueHead = 0;
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
private processNext(): void {
|
||||
while (this.processing < this.options.maxConcurrency && this.queue.length > 0) {
|
||||
const job = this.queue.shift();
|
||||
while (this.processing < this.options.maxConcurrency && this.pendingQueueSize > 0) {
|
||||
const job = this.takeNextJob();
|
||||
if (!job) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -394,16 +394,24 @@ function isRetryableError(error: Error): boolean {
|
||||
return false;
|
||||
}
|
||||
|
||||
const codes = candidates
|
||||
.map((candidate) => readErrorCode(candidate))
|
||||
.filter((code): code is string => Boolean(code));
|
||||
const codes: string[] = [];
|
||||
for (const candidate of candidates) {
|
||||
const code = readErrorCode(candidate);
|
||||
if (code) {
|
||||
codes.push(code);
|
||||
}
|
||||
}
|
||||
if (codes.some((code) => RETRYABLE_NETWORK_ERROR_CODES.has(code))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const names = candidates
|
||||
.map((candidate) => readErrorName(candidate))
|
||||
.filter((name): name is string => Boolean(name));
|
||||
const names: string[] = [];
|
||||
for (const candidate of candidates) {
|
||||
const name = readErrorName(candidate);
|
||||
if (name) {
|
||||
names.push(name);
|
||||
}
|
||||
}
|
||||
if (names.some((name) => RETRYABLE_NETWORK_ERROR_NAMES.has(name))) {
|
||||
return true;
|
||||
}
|
||||
@@ -415,11 +423,13 @@ function isRetryableError(error: Error): boolean {
|
||||
|
||||
function collectErrorCandidates(error: unknown): unknown[] {
|
||||
const queue: unknown[] = [error];
|
||||
let queueIndex = 0;
|
||||
const seen = new Set<unknown>();
|
||||
const candidates: unknown[] = [];
|
||||
|
||||
while (queue.length > 0) {
|
||||
const current = queue.shift();
|
||||
while (queueIndex < queue.length) {
|
||||
const current = queue[queueIndex];
|
||||
queueIndex += 1;
|
||||
if (!current || seen.has(current)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -247,9 +247,17 @@ function capPendingBuffer(buffer: string[], pendingChars: number, cap: number) {
|
||||
buffer.push(last.slice(last.length - cap));
|
||||
return cap;
|
||||
}
|
||||
while (buffer.length && pendingChars - buffer[0].length >= cap) {
|
||||
pendingChars -= buffer[0].length;
|
||||
buffer.shift();
|
||||
let dropCount = 0;
|
||||
while (dropCount < buffer.length) {
|
||||
const chunk = buffer[dropCount];
|
||||
if (chunk === undefined || pendingChars - chunk.length < cap) {
|
||||
break;
|
||||
}
|
||||
pendingChars -= chunk.length;
|
||||
dropCount += 1;
|
||||
}
|
||||
if (dropCount > 0) {
|
||||
buffer.splice(0, dropCount);
|
||||
}
|
||||
if (buffer.length && pendingChars > cap) {
|
||||
const overflow = pendingChars - cap;
|
||||
|
||||
@@ -114,9 +114,10 @@ export class TranscriptFileState {
|
||||
const branch: SessionEntry[] = [];
|
||||
let current = (fromId ?? this.leafId) ? this.byId.get((fromId ?? this.leafId)!) : undefined;
|
||||
while (current) {
|
||||
branch.unshift(current);
|
||||
branch.push(current);
|
||||
current = current.parentId ? this.byId.get(current.parentId) : undefined;
|
||||
}
|
||||
branch.reverse();
|
||||
return branch;
|
||||
}
|
||||
|
||||
|
||||
@@ -660,7 +660,7 @@ export function recordToolCall(
|
||||
});
|
||||
|
||||
if (state.toolCallHistory.length > resolvedConfig.historySize) {
|
||||
state.toolCallHistory.shift();
|
||||
state.toolCallHistory.splice(0, state.toolCallHistory.length - resolvedConfig.historySize);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user