From 3e8baf1e8803b4a068ef756d7575bbc8a73c8a2c Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Fri, 13 Feb 2026 05:54:26 +0000 Subject: [PATCH] fix: drain queued work immediately after resetAllLanes() resetAllLanes() now calls drainLane() for lanes with pending queue entries after resetting generation/activeTaskIds. This prevents queued work from stalling indefinitely when no subsequent enqueueCommandInLane() call arrives after a SIGUSR1 restart. Safe because the drain happens after all lanes are fully reset (generation bumped, activeTaskIds cleared, draining=false), so concurrency invariants are preserved. --- src/process/command-queue.test.ts | 27 +++++++-------------------- src/process/command-queue.ts | 13 +++++++++++-- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index f55115452e9..ac617ddda23 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -169,7 +169,7 @@ describe("command queue", () => { await task; }); - it("resetAllLanes requires a fresh enqueue to trigger draining", async () => { + it("resetAllLanes drains queued work immediately after reset", async () => { const lane = `reset-test-${Date.now()}-${Math.random().toString(16).slice(2)}`; setCommandLaneConcurrency(lane, 1); @@ -196,31 +196,18 @@ describe("command queue", () => { await new Promise((r) => setTimeout(r, 5)); expect(task2Ran).toBe(false); - // Simulate SIGUSR1: reset all lanes (as if interrupted tasks' finally blocks never ran). + // Simulate SIGUSR1: reset all lanes. Queued work (task2) should be + // drained immediately — no fresh enqueue needed. resetAllLanes(); - // Complete the stale in-flight task; generation mismatch should make - // its completion path a no-op for queue bookkeeping/draining. + // Complete the stale in-flight task; generation mismatch makes its + // completion path a no-op for queue bookkeeping. resolve1(); await task1; - await new Promise((r) => setTimeout(r, 5)); - const task2BeforeTrigger = await Promise.race([ - task2.then(() => "ran"), - new Promise<"timed-out">((resolve) => setTimeout(() => resolve("timed-out"), 50)), - ]); - expect(task2BeforeTrigger).toBe("timed-out"); - expect(task2Ran).toBe(false); - - // A fresh enqueue triggers drain and allows queued work to resume. - let task3Ran = false; - const task3 = enqueueCommandInLane(lane, async () => { - task3Ran = true; - }); - - await Promise.all([task2, task3]); + // task2 should have been pumped by resetAllLanes's drain pass. + await task2; expect(task2Ran).toBe(true); - expect(task3Ran).toBe(true); }); it("waitForActiveTasks ignores tasks that start after the call", async () => { diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 62e333eaac4..9ee4c741719 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -200,14 +200,23 @@ export function clearCommandLane(lane: string = CommandLane.Main) { * preserved — they represent pending user work that should still execute * after restart. * - * Does not call `drainLane()` directly. New work naturally resumes when the - * next `enqueueCommandInLane()` call triggers a fresh drain pass. + * After resetting, drains any lanes that still have queued entries so + * preserved work is pumped immediately rather than waiting for a future + * `enqueueCommandInLane()` call (which may never come). */ export function resetAllLanes(): void { + const lanesToDrain: string[] = []; for (const state of lanes.values()) { state.generation += 1; state.activeTaskIds.clear(); state.draining = false; + if (state.queue.length > 0) { + lanesToDrain.push(state.lane); + } + } + // Drain after the full reset pass so all lanes are in a clean state first. + for (const lane of lanesToDrain) { + drainLane(lane); } }