Process: share command queue runtime state

This commit is contained in:
Vincent Koc
2026-03-11 19:47:02 -04:00
parent 2aa1270fdd
commit fb303d7d67

View File

@@ -23,9 +23,6 @@ export class GatewayDrainingError extends Error {
}
}
// Set while gateway is draining for restart; new enqueues are rejected.
let gatewayDraining = false;
// Minimal in-process queue to serialize command executions.
// Default lane ("main") preserves the existing behavior. Additional lanes allow
// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for
@@ -49,11 +46,25 @@ type LaneState = {
generation: number;
};
const lanes = new Map<string, LaneState>();
let nextTaskId = 1;
/**
* Keep queue runtime state on globalThis so every bundled entry/chunk shares
* the same lanes, counters, and draining flag in production builds.
*/
const _g = globalThis as typeof globalThis & {
__openclaw_command_queue_state__?: {
gatewayDraining: boolean;
lanes: Map<string, LaneState>;
nextTaskId: number;
};
};
const queueState = (_g.__openclaw_command_queue_state__ ??= {
gatewayDraining: false,
lanes: new Map<string, LaneState>(),
nextTaskId: 1,
});
function getLaneState(lane: string): LaneState {
const existing = lanes.get(lane);
const existing = queueState.lanes.get(lane);
if (existing) {
return existing;
}
@@ -65,7 +76,7 @@ function getLaneState(lane: string): LaneState {
draining: false,
generation: 0,
};
lanes.set(lane, created);
queueState.lanes.set(lane, created);
return created;
}
@@ -105,7 +116,7 @@ function drainLane(lane: string) {
);
}
logLaneDequeue(lane, waitedMs, state.queue.length);
const taskId = nextTaskId++;
const taskId = queueState.nextTaskId++;
const taskGeneration = state.generation;
state.activeTaskIds.add(taskId);
void (async () => {
@@ -148,7 +159,7 @@ function drainLane(lane: string) {
* `GatewayDrainingError` instead of being silently killed on shutdown.
*/
export function markGatewayDraining(): void {
gatewayDraining = true;
queueState.gatewayDraining = true;
}
export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
@@ -166,7 +177,7 @@ export function enqueueCommandInLane<T>(
onWait?: (waitMs: number, queuedAhead: number) => void;
},
): Promise<T> {
if (gatewayDraining) {
if (queueState.gatewayDraining) {
return Promise.reject(new GatewayDrainingError());
}
const cleaned = lane.trim() || CommandLane.Main;
@@ -198,7 +209,7 @@ export function enqueueCommand<T>(
export function getQueueSize(lane: string = CommandLane.Main) {
const resolved = lane.trim() || CommandLane.Main;
const state = lanes.get(resolved);
const state = queueState.lanes.get(resolved);
if (!state) {
return 0;
}
@@ -207,7 +218,7 @@ export function getQueueSize(lane: string = CommandLane.Main) {
export function getTotalQueueSize() {
let total = 0;
for (const s of lanes.values()) {
for (const s of queueState.lanes.values()) {
total += s.queue.length + s.activeTaskIds.size;
}
return total;
@@ -215,7 +226,7 @@ export function getTotalQueueSize() {
export function clearCommandLane(lane: string = CommandLane.Main) {
const cleaned = lane.trim() || CommandLane.Main;
const state = lanes.get(cleaned);
const state = queueState.lanes.get(cleaned);
if (!state) {
return 0;
}
@@ -242,9 +253,9 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
* `enqueueCommandInLane()` call (which may never come).
*/
export function resetAllLanes(): void {
gatewayDraining = false;
queueState.gatewayDraining = false;
const lanesToDrain: string[] = [];
for (const state of lanes.values()) {
for (const state of queueState.lanes.values()) {
state.generation += 1;
state.activeTaskIds.clear();
state.draining = false;
@@ -264,7 +275,7 @@ export function resetAllLanes(): void {
*/
export function getActiveTaskCount(): number {
let total = 0;
for (const s of lanes.values()) {
for (const s of queueState.lanes.values()) {
total += s.activeTaskIds.size;
}
return total;
@@ -283,7 +294,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea
const POLL_INTERVAL_MS = 50;
const deadline = Date.now() + timeoutMs;
const activeAtStart = new Set<number>();
for (const state of lanes.values()) {
for (const state of queueState.lanes.values()) {
for (const taskId of state.activeTaskIds) {
activeAtStart.add(taskId);
}
@@ -297,7 +308,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea
}
let hasPending = false;
for (const state of lanes.values()) {
for (const state of queueState.lanes.values()) {
for (const taskId of state.activeTaskIds) {
if (activeAtStart.has(taskId)) {
hasPending = true;