fix: stabilize live and docker test lanes

This commit is contained in:
Peter Steinberger
2026-04-03 21:41:26 +01:00
parent 5d3edb1d40
commit 0204b8dd28
17 changed files with 329 additions and 146 deletions

View File

@@ -47,6 +47,12 @@ type LaneState = {
generation: number;
};
type ActiveTaskWaiter = {
activeTaskIds: Set<number>;
resolve: (value: { drained: boolean }) => void;
timeout?: ReturnType<typeof setTimeout>;
};
function isExpectedNonErrorLaneFailure(err: unknown): boolean {
return err instanceof Error && err.name === "LiveSessionModelSwitchError";
}
@@ -61,6 +67,7 @@ function getQueueState() {
return resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({
gatewayDraining: false,
lanes: new Map<string, LaneState>(),
activeTaskWaiters: new Set<ActiveTaskWaiter>(),
nextTaskId: 1,
}));
}
@@ -99,6 +106,38 @@ function completeTask(state: LaneState, taskId: number, taskGeneration: number):
return true;
}
function hasPendingActiveTasks(taskIds: Set<number>): boolean {
const queueState = getQueueState();
for (const state of queueState.lanes.values()) {
for (const taskId of state.activeTaskIds) {
if (taskIds.has(taskId)) {
return true;
}
}
}
return false;
}
function resolveActiveTaskWaiter(waiter: ActiveTaskWaiter, result: { drained: boolean }): void {
const queueState = getQueueState();
if (!queueState.activeTaskWaiters.delete(waiter)) {
return;
}
if (waiter.timeout) {
clearTimeout(waiter.timeout);
}
waiter.resolve(result);
}
function notifyActiveTaskWaiters(): void {
const queueState = getQueueState();
for (const waiter of Array.from(queueState.activeTaskWaiters)) {
if (waiter.activeTaskIds.size === 0 || !hasPendingActiveTasks(waiter.activeTaskIds)) {
resolveActiveTaskWaiter(waiter, { drained: true });
}
}
}
function drainLane(lane: string) {
const state = getLaneState(lane);
if (state.draining) {
@@ -136,6 +175,7 @@ function drainLane(lane: string) {
const result = await entry.task();
const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
if (completedCurrentGeneration) {
notifyActiveTaskWaiters();
diag.debug(
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.activeTaskIds.size} queued=${state.queue.length}`,
);
@@ -155,6 +195,7 @@ function drainLane(lane: string) {
);
}
if (completedCurrentGeneration) {
notifyActiveTaskWaiters();
pump();
}
entry.reject(err);
@@ -263,6 +304,9 @@ export function resetCommandQueueStateForTest(): void {
const queueState = getQueueState();
queueState.gatewayDraining = false;
queueState.lanes.clear();
for (const waiter of Array.from(queueState.activeTaskWaiters)) {
resolveActiveTaskWaiter(waiter, { drained: true });
}
queueState.nextTaskId = 1;
}
@@ -296,6 +340,7 @@ export function resetAllLanes(): void {
for (const lane of lanesToDrain) {
drainLane(lane);
}
notifyActiveTaskWaiters();
}
/**
@@ -320,9 +365,6 @@ export function getActiveTaskCount(): number {
* already executing are waited on.
*/
export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolean }> {
// Keep shutdown/drain checks responsive without busy looping.
const POLL_INTERVAL_MS = 50;
const deadline = Date.now() + timeoutMs;
const queueState = getQueueState();
const activeAtStart = new Set<number>();
for (const state of queueState.lanes.values()) {
@@ -331,36 +373,22 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea
}
}
if (activeAtStart.size === 0) {
return Promise.resolve({ drained: true });
}
if (timeoutMs <= 0) {
return Promise.resolve({ drained: false });
}
return new Promise((resolve) => {
const check = () => {
if (activeAtStart.size === 0) {
resolve({ drained: true });
return;
}
let hasPending = false;
for (const state of queueState.lanes.values()) {
for (const taskId of state.activeTaskIds) {
if (activeAtStart.has(taskId)) {
hasPending = true;
break;
}
}
if (hasPending) {
break;
}
}
if (!hasPending) {
resolve({ drained: true });
return;
}
if (Date.now() >= deadline) {
resolve({ drained: false });
return;
}
setTimeout(check, POLL_INTERVAL_MS);
const waiter: ActiveTaskWaiter = {
activeTaskIds: activeAtStart,
resolve,
};
check();
waiter.timeout = setTimeout(() => {
resolveActiveTaskWaiter(waiter, { drained: false });
}, timeoutMs);
queueState.activeTaskWaiters.add(waiter);
notifyActiveTaskWaiters();
});
}