From 1bc59cc09df21d65e817791eaec58ebd707d6e50 Mon Sep 17 00:00:00 2001 From: Mariano <132747814+mbelinky@users.noreply.github.com> Date: Mon, 9 Mar 2026 21:56:00 +0100 Subject: [PATCH] Gateway: tighten node pending drain semantics (#41429) Merged via squash. Prepared head SHA: 361c2eb5c84e3b532862d843536ca68b21336fb2 Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Reviewed-by: @mbelinky --- CHANGELOG.md | 1 + src/gateway/node-pending-work.test.ts | 21 +++++++++++++++++++ src/gateway/node-pending-work.ts | 29 ++++++++++++++++++--------- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b25de7522d..98fcb8153a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - iOS/gateway foreground recovery: reconnect immediately on foreground return after stale background sockets are torn down, so the app no longer stays disconnected until a later wake path happens. (#41384) Thanks @mbelinky. - Cron/subagent followup: do not misclassify empty or `NO_REPLY` cron responses as interim acknowledgements that need a rerun, so deliberately silent cron jobs are no longer retried. (#41383) thanks @jackal092927. - Auth/cooldowns: reset expired auth-profile cooldown error counters before computing the next backoff so stale on-disk counters do not re-escalate into long cooldown loops after expiry. (#41028) thanks @zerone0x. +- Gateway/node pending drain followup: keep `hasMore` true when the deferred baseline status item still needs delivery, and avoid allocating empty pending-work state for drain-only nodes with no queued work. (#41429) Thanks @mbelinky. ## 2026.3.8 diff --git a/src/gateway/node-pending-work.test.ts b/src/gateway/node-pending-work.test.ts index 3c2222dd3a9..2e89e2f20b2 100644 --- a/src/gateway/node-pending-work.test.ts +++ b/src/gateway/node-pending-work.test.ts @@ -3,6 +3,7 @@ import { acknowledgeNodePendingWork, drainNodePendingWork, enqueueNodePendingWork, + getNodePendingWorkStateCountForTests, resetNodePendingWorkForTests, } from "./node-pending-work.js"; @@ -43,4 +44,24 @@ describe("node pending work", () => { const afterAck = drainNodePendingWork("node-2"); expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]); }); + + it("keeps hasMore true when the baseline status item is deferred by maxItems", () => { + enqueueNodePendingWork({ nodeId: "node-3", type: "location.request" }); + + const drained = drainNodePendingWork("node-3", { maxItems: 1 }); + + expect(drained.items.map((item) => item.type)).toEqual(["location.request"]); + expect(drained.hasMore).toBe(true); + }); + + it("does not allocate state for drain-only nodes with no queued work", () => { + expect(getNodePendingWorkStateCountForTests()).toBe(0); + + const drained = drainNodePendingWork("node-4"); + const acked = acknowledgeNodePendingWork({ nodeId: "node-4", itemIds: ["baseline-status"] }); + + expect(drained.items.map((item) => item.id)).toEqual(["baseline-status"]); + expect(acked).toEqual({ revision: 0, removedItemIds: [] }); + expect(getNodePendingWorkStateCountForTests()).toBe(0); + }); }); diff --git a/src/gateway/node-pending-work.ts b/src/gateway/node-pending-work.ts index 33d356777d2..437b8c12bb7 100644 --- a/src/gateway/node-pending-work.ts +++ b/src/gateway/node-pending-work.ts @@ -45,7 +45,7 @@ const PRIORITY_RANK: Record = { const stateByNodeId = new Map(); -function getState(nodeId: string): NodePendingWorkState { +function getOrCreateState(nodeId: string): NodePendingWorkState { let state = stateByNodeId.get(nodeId); if (!state) { state = { @@ -106,7 +106,7 @@ export function enqueueNodePendingWork(params: { throw new Error("nodeId required"); } const nowMs = Date.now(); - const state = getState(nodeId); + const state = getOrCreateState(nodeId); pruneExpired(state, nowMs); const existing = [...state.itemsById.values()].find((item) => item.type === params.type); if (existing) { @@ -134,21 +134,25 @@ export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): D return { revision: 0, items: [], hasMore: false }; } const nowMs = opts.nowMs ?? Date.now(); - const state = getState(normalizedNodeId); - pruneExpired(state, nowMs); + const state = stateByNodeId.get(normalizedNodeId); + const revision = state?.revision ?? 0; + if (state) { + pruneExpired(state, nowMs); + } const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS))); - const explicitItems = sortedItems(state); + const explicitItems = state ? sortedItems(state) : []; const items = explicitItems.slice(0, maxItems); const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request"); const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus; if (includeBaseline && items.length < maxItems) { items.push(makeBaselineStatusItem(nowMs)); } + const explicitReturnedCount = items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length; + const baselineIncluded = items.some((item) => item.id === DEFAULT_STATUS_ITEM_ID); return { - revision: state.revision, + revision, items, - hasMore: - explicitItems.length > items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length, + hasMore: explicitItems.length > explicitReturnedCount || (includeBaseline && !baselineIncluded), }; } @@ -160,7 +164,10 @@ export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: st if (!nodeId) { return { revision: 0, removedItemIds: [] }; } - const state = getState(nodeId); + const state = stateByNodeId.get(nodeId); + if (!state) { + return { revision: 0, removedItemIds: [] }; + } const removedItemIds: string[] = []; for (const itemId of params.itemIds) { const trimmedId = itemId.trim(); @@ -180,3 +187,7 @@ export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: st export function resetNodePendingWorkForTests() { stateByNodeId.clear(); } + +export function getNodePendingWorkStateCountForTests(): number { + return stateByNodeId.size; +}