mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-18 05:20:48 +00:00
fix: derive active count from activeTaskIds.size, constrain ps scan
Address two additional review concerns: 1. Remove separate 'active' counter from LaneState; derive it from activeTaskIds.size instead. This makes negative-underflow impossible — the Set is the single source of truth for active task count. Previously, a double-reset scenario could drive 'active' negative, violating the concurrency check in pump(). 2. Replace unbounded 'ps -axo pid=,command=' with targeted pgrep pre-filter in orphan scanner. Only fetches full command info for candidate PIDs matching 'codex|claude', avoiding O(all-processes) overhead on large hosts.
This commit is contained in:
committed by
Gustavo Madeira Santana
parent
8eb80b6d0b
commit
b8190d35aa
@@ -59,8 +59,21 @@ function resolveCwd(pid) {
|
||||
return match ? match[1] : "unknown";
|
||||
}
|
||||
|
||||
const lines = run("ps -axo pid=,command=").split("\n");
|
||||
const includePatterns = [/\bcodex\b/i, /\bclaude\b/i, /claude\s+code/i];
|
||||
// Pre-filter candidate PIDs using pgrep to avoid scanning all processes.
|
||||
// Falls back to full ps scan if pgrep is unavailable.
|
||||
const candidatePids = run("pgrep -f 'codex|claude' 2>/dev/null || true")
|
||||
.split("\n")
|
||||
.map((s) => s.trim())
|
||||
.filter((s) => s.length > 0 && /^\d+$/.test(s));
|
||||
|
||||
let lines;
|
||||
if (candidatePids.length > 0) {
|
||||
// Fetch command info only for candidate PIDs.
|
||||
lines = run(`ps -o pid=,command= -p ${candidatePids.join(",")}`).split("\n");
|
||||
} else {
|
||||
lines = [];
|
||||
}
|
||||
|
||||
const excludePatterns = [
|
||||
/openclaw-gateway/i,
|
||||
/signal-cli/i,
|
||||
@@ -85,9 +98,6 @@ for (const rawLine of lines) {
|
||||
if (!Number.isInteger(pid) || pid <= 0 || pid === process.pid) {
|
||||
continue;
|
||||
}
|
||||
if (!includePatterns.some((pattern) => pattern.test(cmd))) {
|
||||
continue;
|
||||
}
|
||||
if (excludePatterns.some((pattern) => pattern.test(cmd))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ type QueueEntry = {
|
||||
type LaneState = {
|
||||
lane: string;
|
||||
queue: QueueEntry[];
|
||||
active: number;
|
||||
activeTaskIds: Set<number>;
|
||||
maxConcurrent: number;
|
||||
draining: boolean;
|
||||
@@ -47,7 +46,6 @@ function getLaneState(lane: string): LaneState {
|
||||
const created: LaneState = {
|
||||
lane,
|
||||
queue: [],
|
||||
active: 0,
|
||||
activeTaskIds: new Set(),
|
||||
maxConcurrent: 1,
|
||||
draining: false,
|
||||
@@ -61,7 +59,6 @@ function completeTask(state: LaneState, taskId: number, taskGeneration: number):
|
||||
if (taskGeneration !== state.generation) {
|
||||
return false;
|
||||
}
|
||||
state.active -= 1;
|
||||
state.activeTaskIds.delete(taskId);
|
||||
return true;
|
||||
}
|
||||
@@ -74,7 +71,7 @@ function drainLane(lane: string) {
|
||||
state.draining = true;
|
||||
|
||||
const pump = () => {
|
||||
while (state.active < state.maxConcurrent && state.queue.length > 0) {
|
||||
while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) {
|
||||
const entry = state.queue.shift() as QueueEntry;
|
||||
const waitedMs = Date.now() - entry.enqueuedAt;
|
||||
if (waitedMs >= entry.warnAfterMs) {
|
||||
@@ -86,7 +83,6 @@ function drainLane(lane: string) {
|
||||
logLaneDequeue(lane, waitedMs, state.queue.length);
|
||||
const taskId = nextTaskId++;
|
||||
const taskGeneration = state.generation;
|
||||
state.active += 1;
|
||||
state.activeTaskIds.add(taskId);
|
||||
void (async () => {
|
||||
const startTime = Date.now();
|
||||
@@ -95,7 +91,7 @@ function drainLane(lane: string) {
|
||||
const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
|
||||
if (completedCurrentGeneration) {
|
||||
diag.debug(
|
||||
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.active} queued=${state.queue.length}`,
|
||||
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.activeTaskIds.size} queued=${state.queue.length}`,
|
||||
);
|
||||
pump();
|
||||
}
|
||||
@@ -148,7 +144,7 @@ export function enqueueCommandInLane<T>(
|
||||
warnAfterMs,
|
||||
onWait: opts?.onWait,
|
||||
});
|
||||
logLaneEnqueue(cleaned, state.queue.length + state.active);
|
||||
logLaneEnqueue(cleaned, state.queue.length + state.activeTaskIds.size);
|
||||
drainLane(cleaned);
|
||||
});
|
||||
}
|
||||
@@ -169,13 +165,13 @@ export function getQueueSize(lane: string = CommandLane.Main) {
|
||||
if (!state) {
|
||||
return 0;
|
||||
}
|
||||
return state.queue.length + state.active;
|
||||
return state.queue.length + state.activeTaskIds.size;
|
||||
}
|
||||
|
||||
export function getTotalQueueSize() {
|
||||
let total = 0;
|
||||
for (const s of lanes.values()) {
|
||||
total += s.queue.length + s.active;
|
||||
total += s.queue.length + s.activeTaskIds.size;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
@@ -210,7 +206,6 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
export function resetAllLanes(): void {
|
||||
for (const state of lanes.values()) {
|
||||
state.generation += 1;
|
||||
state.active = 0;
|
||||
state.activeTaskIds.clear();
|
||||
state.draining = false;
|
||||
}
|
||||
@@ -223,7 +218,7 @@ export function resetAllLanes(): void {
|
||||
export function getActiveTaskCount(): number {
|
||||
let total = 0;
|
||||
for (const s of lanes.values()) {
|
||||
total += s.active;
|
||||
total += s.activeTaskIds.size;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user