diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 7b4a386bdad..821ffaecc45 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -23,9 +23,6 @@ export class GatewayDrainingError extends Error { } } -// Set while gateway is draining for restart; new enqueues are rejected. -let gatewayDraining = false; - // Minimal in-process queue to serialize command executions. // Default lane ("main") preserves the existing behavior. Additional lanes allow // low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for @@ -49,11 +46,25 @@ type LaneState = { generation: number; }; -const lanes = new Map(); -let nextTaskId = 1; +/** + * Keep queue runtime state on globalThis so every bundled entry/chunk shares + * the same lanes, counters, and draining flag in production builds. + */ +const _g = globalThis as typeof globalThis & { + __openclaw_command_queue_state__?: { + gatewayDraining: boolean; + lanes: Map; + nextTaskId: number; + }; +}; +const queueState = (_g.__openclaw_command_queue_state__ ??= { + gatewayDraining: false, + lanes: new Map(), + nextTaskId: 1, +}); function getLaneState(lane: string): LaneState { - const existing = lanes.get(lane); + const existing = queueState.lanes.get(lane); if (existing) { return existing; } @@ -65,7 +76,7 @@ function getLaneState(lane: string): LaneState { draining: false, generation: 0, }; - lanes.set(lane, created); + queueState.lanes.set(lane, created); return created; } @@ -105,7 +116,7 @@ function drainLane(lane: string) { ); } logLaneDequeue(lane, waitedMs, state.queue.length); - const taskId = nextTaskId++; + const taskId = queueState.nextTaskId++; const taskGeneration = state.generation; state.activeTaskIds.add(taskId); void (async () => { @@ -148,7 +159,7 @@ function drainLane(lane: string) { * `GatewayDrainingError` instead of being silently killed on shutdown. */ export function markGatewayDraining(): void { - gatewayDraining = true; + queueState.gatewayDraining = true; } export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { @@ -166,7 +177,7 @@ export function enqueueCommandInLane( onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { - if (gatewayDraining) { + if (queueState.gatewayDraining) { return Promise.reject(new GatewayDrainingError()); } const cleaned = lane.trim() || CommandLane.Main; @@ -198,7 +209,7 @@ export function enqueueCommand( export function getQueueSize(lane: string = CommandLane.Main) { const resolved = lane.trim() || CommandLane.Main; - const state = lanes.get(resolved); + const state = queueState.lanes.get(resolved); if (!state) { return 0; } @@ -207,7 +218,7 @@ export function getQueueSize(lane: string = CommandLane.Main) { export function getTotalQueueSize() { let total = 0; - for (const s of lanes.values()) { + for (const s of queueState.lanes.values()) { total += s.queue.length + s.activeTaskIds.size; } return total; @@ -215,7 +226,7 @@ export function getTotalQueueSize() { export function clearCommandLane(lane: string = CommandLane.Main) { const cleaned = lane.trim() || CommandLane.Main; - const state = lanes.get(cleaned); + const state = queueState.lanes.get(cleaned); if (!state) { return 0; } @@ -242,9 +253,9 @@ export function clearCommandLane(lane: string = CommandLane.Main) { * `enqueueCommandInLane()` call (which may never come). */ export function resetAllLanes(): void { - gatewayDraining = false; + queueState.gatewayDraining = false; const lanesToDrain: string[] = []; - for (const state of lanes.values()) { + for (const state of queueState.lanes.values()) { state.generation += 1; state.activeTaskIds.clear(); state.draining = false; @@ -264,7 +275,7 @@ export function resetAllLanes(): void { */ export function getActiveTaskCount(): number { let total = 0; - for (const s of lanes.values()) { + for (const s of queueState.lanes.values()) { total += s.activeTaskIds.size; } return total; @@ -283,7 +294,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea const POLL_INTERVAL_MS = 50; const deadline = Date.now() + timeoutMs; const activeAtStart = new Set(); - for (const state of lanes.values()) { + for (const state of queueState.lanes.values()) { for (const taskId of state.activeTaskIds) { activeAtStart.add(taskId); } @@ -297,7 +308,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea } let hasPending = false; - for (const state of lanes.values()) { + for (const state of queueState.lanes.values()) { for (const taskId of state.activeTaskIds) { if (activeAtStart.has(taskId)) { hasPending = true;