From f2c544f05725e106321e6d00698b85762184c4b4 Mon Sep 17 00:00:00 2001 From: Mariano Belinky Date: Mon, 9 Mar 2026 21:44:18 +0100 Subject: [PATCH] Gateway: tighten node pending drain semantics --- src/gateway/node-pending-work.test.ts | 21 +++++++++++++++++++ src/gateway/node-pending-work.ts | 29 ++++++++++++++++++--------- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/src/gateway/node-pending-work.test.ts b/src/gateway/node-pending-work.test.ts index 3c2222dd3a9..2e89e2f20b2 100644 --- a/src/gateway/node-pending-work.test.ts +++ b/src/gateway/node-pending-work.test.ts @@ -3,6 +3,7 @@ import { acknowledgeNodePendingWork, drainNodePendingWork, enqueueNodePendingWork, + getNodePendingWorkStateCountForTests, resetNodePendingWorkForTests, } from "./node-pending-work.js"; @@ -43,4 +44,24 @@ describe("node pending work", () => { const afterAck = drainNodePendingWork("node-2"); expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]); }); + + it("keeps hasMore true when the baseline status item is deferred by maxItems", () => { + enqueueNodePendingWork({ nodeId: "node-3", type: "location.request" }); + + const drained = drainNodePendingWork("node-3", { maxItems: 1 }); + + expect(drained.items.map((item) => item.type)).toEqual(["location.request"]); + expect(drained.hasMore).toBe(true); + }); + + it("does not allocate state for drain-only nodes with no queued work", () => { + expect(getNodePendingWorkStateCountForTests()).toBe(0); + + const drained = drainNodePendingWork("node-4"); + const acked = acknowledgeNodePendingWork({ nodeId: "node-4", itemIds: ["baseline-status"] }); + + expect(drained.items.map((item) => item.id)).toEqual(["baseline-status"]); + expect(acked).toEqual({ revision: 0, removedItemIds: [] }); + expect(getNodePendingWorkStateCountForTests()).toBe(0); + }); }); diff --git a/src/gateway/node-pending-work.ts b/src/gateway/node-pending-work.ts index 33d356777d2..437b8c12bb7 100644 --- a/src/gateway/node-pending-work.ts +++ b/src/gateway/node-pending-work.ts @@ -45,7 +45,7 @@ const PRIORITY_RANK: Record = { const stateByNodeId = new Map(); -function getState(nodeId: string): NodePendingWorkState { +function getOrCreateState(nodeId: string): NodePendingWorkState { let state = stateByNodeId.get(nodeId); if (!state) { state = { @@ -106,7 +106,7 @@ export function enqueueNodePendingWork(params: { throw new Error("nodeId required"); } const nowMs = Date.now(); - const state = getState(nodeId); + const state = getOrCreateState(nodeId); pruneExpired(state, nowMs); const existing = [...state.itemsById.values()].find((item) => item.type === params.type); if (existing) { @@ -134,21 +134,25 @@ export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): D return { revision: 0, items: [], hasMore: false }; } const nowMs = opts.nowMs ?? Date.now(); - const state = getState(normalizedNodeId); - pruneExpired(state, nowMs); + const state = stateByNodeId.get(normalizedNodeId); + const revision = state?.revision ?? 0; + if (state) { + pruneExpired(state, nowMs); + } const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS))); - const explicitItems = sortedItems(state); + const explicitItems = state ? sortedItems(state) : []; const items = explicitItems.slice(0, maxItems); const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request"); const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus; if (includeBaseline && items.length < maxItems) { items.push(makeBaselineStatusItem(nowMs)); } + const explicitReturnedCount = items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length; + const baselineIncluded = items.some((item) => item.id === DEFAULT_STATUS_ITEM_ID); return { - revision: state.revision, + revision, items, - hasMore: - explicitItems.length > items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length, + hasMore: explicitItems.length > explicitReturnedCount || (includeBaseline && !baselineIncluded), }; } @@ -160,7 +164,10 @@ export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: st if (!nodeId) { return { revision: 0, removedItemIds: [] }; } - const state = getState(nodeId); + const state = stateByNodeId.get(nodeId); + if (!state) { + return { revision: 0, removedItemIds: [] }; + } const removedItemIds: string[] = []; for (const itemId of params.itemIds) { const trimmedId = itemId.trim(); @@ -180,3 +187,7 @@ export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: st export function resetNodePendingWorkForTests() { stateByNodeId.clear(); } + +export function getNodePendingWorkStateCountForTests(): number { + return stateByNodeId.size; +}