mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-18 05:20:48 +00:00
fix: drain queued work immediately after resetAllLanes()
resetAllLanes() now calls drainLane() for lanes with pending queue entries after resetting generation/activeTaskIds. This prevents queued work from stalling indefinitely when no subsequent enqueueCommandInLane() call arrives after a SIGUSR1 restart. Safe because the drain happens after all lanes are fully reset (generation bumped, activeTaskIds cleared, draining=false), so concurrency invariants are preserved.
This commit is contained in:
committed by
Gustavo Madeira Santana
parent
3a1bb339e4
commit
3e8baf1e88
@@ -169,7 +169,7 @@ describe("command queue", () => {
|
||||
await task;
|
||||
});
|
||||
|
||||
it("resetAllLanes requires a fresh enqueue to trigger draining", async () => {
|
||||
it("resetAllLanes drains queued work immediately after reset", async () => {
|
||||
const lane = `reset-test-${Date.now()}-${Math.random().toString(16).slice(2)}`;
|
||||
setCommandLaneConcurrency(lane, 1);
|
||||
|
||||
@@ -196,31 +196,18 @@ describe("command queue", () => {
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
expect(task2Ran).toBe(false);
|
||||
|
||||
// Simulate SIGUSR1: reset all lanes (as if interrupted tasks' finally blocks never ran).
|
||||
// Simulate SIGUSR1: reset all lanes. Queued work (task2) should be
|
||||
// drained immediately — no fresh enqueue needed.
|
||||
resetAllLanes();
|
||||
|
||||
// Complete the stale in-flight task; generation mismatch should make
|
||||
// its completion path a no-op for queue bookkeeping/draining.
|
||||
// Complete the stale in-flight task; generation mismatch makes its
|
||||
// completion path a no-op for queue bookkeeping.
|
||||
resolve1();
|
||||
await task1;
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
|
||||
const task2BeforeTrigger = await Promise.race([
|
||||
task2.then(() => "ran"),
|
||||
new Promise<"timed-out">((resolve) => setTimeout(() => resolve("timed-out"), 50)),
|
||||
]);
|
||||
expect(task2BeforeTrigger).toBe("timed-out");
|
||||
expect(task2Ran).toBe(false);
|
||||
|
||||
// A fresh enqueue triggers drain and allows queued work to resume.
|
||||
let task3Ran = false;
|
||||
const task3 = enqueueCommandInLane(lane, async () => {
|
||||
task3Ran = true;
|
||||
});
|
||||
|
||||
await Promise.all([task2, task3]);
|
||||
// task2 should have been pumped by resetAllLanes's drain pass.
|
||||
await task2;
|
||||
expect(task2Ran).toBe(true);
|
||||
expect(task3Ran).toBe(true);
|
||||
});
|
||||
|
||||
it("waitForActiveTasks ignores tasks that start after the call", async () => {
|
||||
|
||||
@@ -200,14 +200,23 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
* preserved — they represent pending user work that should still execute
|
||||
* after restart.
|
||||
*
|
||||
* Does not call `drainLane()` directly. New work naturally resumes when the
|
||||
* next `enqueueCommandInLane()` call triggers a fresh drain pass.
|
||||
* After resetting, drains any lanes that still have queued entries so
|
||||
* preserved work is pumped immediately rather than waiting for a future
|
||||
* `enqueueCommandInLane()` call (which may never come).
|
||||
*/
|
||||
export function resetAllLanes(): void {
|
||||
const lanesToDrain: string[] = [];
|
||||
for (const state of lanes.values()) {
|
||||
state.generation += 1;
|
||||
state.activeTaskIds.clear();
|
||||
state.draining = false;
|
||||
if (state.queue.length > 0) {
|
||||
lanesToDrain.push(state.lane);
|
||||
}
|
||||
}
|
||||
// Drain after the full reset pass so all lanes are in a clean state first.
|
||||
for (const lane of lanesToDrain) {
|
||||
drainLane(lane);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user