Gateway: tighten node pending drain semantics

This commit is contained in:
Mariano Belinky
2026-03-09 21:44:18 +01:00
parent c49af9ea7c
commit f2c544f057
2 changed files with 41 additions and 9 deletions

View File

@@ -3,6 +3,7 @@ import {
acknowledgeNodePendingWork, acknowledgeNodePendingWork,
drainNodePendingWork, drainNodePendingWork,
enqueueNodePendingWork, enqueueNodePendingWork,
getNodePendingWorkStateCountForTests,
resetNodePendingWorkForTests, resetNodePendingWorkForTests,
} from "./node-pending-work.js"; } from "./node-pending-work.js";
@@ -43,4 +44,24 @@ describe("node pending work", () => {
const afterAck = drainNodePendingWork("node-2"); const afterAck = drainNodePendingWork("node-2");
expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]); expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]);
}); });
it("keeps hasMore true when the baseline status item is deferred by maxItems", () => {
enqueueNodePendingWork({ nodeId: "node-3", type: "location.request" });
const drained = drainNodePendingWork("node-3", { maxItems: 1 });
expect(drained.items.map((item) => item.type)).toEqual(["location.request"]);
expect(drained.hasMore).toBe(true);
});
it("does not allocate state for drain-only nodes with no queued work", () => {
expect(getNodePendingWorkStateCountForTests()).toBe(0);
const drained = drainNodePendingWork("node-4");
const acked = acknowledgeNodePendingWork({ nodeId: "node-4", itemIds: ["baseline-status"] });
expect(drained.items.map((item) => item.id)).toEqual(["baseline-status"]);
expect(acked).toEqual({ revision: 0, removedItemIds: [] });
expect(getNodePendingWorkStateCountForTests()).toBe(0);
});
}); });

View File

@@ -45,7 +45,7 @@ const PRIORITY_RANK: Record<NodePendingWorkPriority, number> = {
const stateByNodeId = new Map<string, NodePendingWorkState>(); const stateByNodeId = new Map<string, NodePendingWorkState>();
function getState(nodeId: string): NodePendingWorkState { function getOrCreateState(nodeId: string): NodePendingWorkState {
let state = stateByNodeId.get(nodeId); let state = stateByNodeId.get(nodeId);
if (!state) { if (!state) {
state = { state = {
@@ -106,7 +106,7 @@ export function enqueueNodePendingWork(params: {
throw new Error("nodeId required"); throw new Error("nodeId required");
} }
const nowMs = Date.now(); const nowMs = Date.now();
const state = getState(nodeId); const state = getOrCreateState(nodeId);
pruneExpired(state, nowMs); pruneExpired(state, nowMs);
const existing = [...state.itemsById.values()].find((item) => item.type === params.type); const existing = [...state.itemsById.values()].find((item) => item.type === params.type);
if (existing) { if (existing) {
@@ -134,21 +134,25 @@ export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): D
return { revision: 0, items: [], hasMore: false }; return { revision: 0, items: [], hasMore: false };
} }
const nowMs = opts.nowMs ?? Date.now(); const nowMs = opts.nowMs ?? Date.now();
const state = getState(normalizedNodeId); const state = stateByNodeId.get(normalizedNodeId);
pruneExpired(state, nowMs); const revision = state?.revision ?? 0;
if (state) {
pruneExpired(state, nowMs);
}
const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS))); const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS)));
const explicitItems = sortedItems(state); const explicitItems = state ? sortedItems(state) : [];
const items = explicitItems.slice(0, maxItems); const items = explicitItems.slice(0, maxItems);
const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request"); const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request");
const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus; const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus;
if (includeBaseline && items.length < maxItems) { if (includeBaseline && items.length < maxItems) {
items.push(makeBaselineStatusItem(nowMs)); items.push(makeBaselineStatusItem(nowMs));
} }
const explicitReturnedCount = items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length;
const baselineIncluded = items.some((item) => item.id === DEFAULT_STATUS_ITEM_ID);
return { return {
revision: state.revision, revision,
items, items,
hasMore: hasMore: explicitItems.length > explicitReturnedCount || (includeBaseline && !baselineIncluded),
explicitItems.length > items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length,
}; };
} }
@@ -160,7 +164,10 @@ export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: st
if (!nodeId) { if (!nodeId) {
return { revision: 0, removedItemIds: [] }; return { revision: 0, removedItemIds: [] };
} }
const state = getState(nodeId); const state = stateByNodeId.get(nodeId);
if (!state) {
return { revision: 0, removedItemIds: [] };
}
const removedItemIds: string[] = []; const removedItemIds: string[] = [];
for (const itemId of params.itemIds) { for (const itemId of params.itemIds) {
const trimmedId = itemId.trim(); const trimmedId = itemId.trim();
@@ -180,3 +187,7 @@ export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: st
export function resetNodePendingWorkForTests() { export function resetNodePendingWorkForTests() {
stateByNodeId.clear(); stateByNodeId.clear();
} }
export function getNodePendingWorkStateCountForTests(): number {
return stateByNodeId.size;
}