diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index 91b20baacb0..f1170c9889d 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -78,6 +78,7 @@ const METHOD_SCOPE_GROUPS: Record = { "last-heartbeat", "node.list", "node.describe", + "node.pending.drain", "chat.history", "config.get", "config.schema.lookup", @@ -102,6 +103,7 @@ const METHOD_SCOPE_GROUPS: Record = { "chat.abort", "browser.request", "push.test", + "node.pending.enqueue", ], [ADMIN_SCOPE]: [ "channels.logout", diff --git a/src/gateway/node-pending-work.test.ts b/src/gateway/node-pending-work.test.ts new file mode 100644 index 00000000000..3c2222dd3a9 --- /dev/null +++ b/src/gateway/node-pending-work.test.ts @@ -0,0 +1,46 @@ +import { describe, expect, it, beforeEach } from "vitest"; +import { + acknowledgeNodePendingWork, + drainNodePendingWork, + enqueueNodePendingWork, + resetNodePendingWorkForTests, +} from "./node-pending-work.js"; + +describe("node pending work", () => { + beforeEach(() => { + resetNodePendingWorkForTests(); + }); + + it("returns a baseline status request even when no explicit work is queued", () => { + const drained = drainNodePendingWork("node-1"); + expect(drained.items).toEqual([ + expect.objectContaining({ + id: "baseline-status", + type: "status.request", + priority: "default", + }), + ]); + expect(drained.hasMore).toBe(false); + }); + + it("dedupes explicit work by type and removes acknowledged items", () => { + const first = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" }); + const second = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" }); + + expect(first.deduped).toBe(false); + expect(second.deduped).toBe(true); + expect(second.item.id).toBe(first.item.id); + + const drained = drainNodePendingWork("node-2"); + expect(drained.items.map((item) => item.type)).toEqual(["location.request", "status.request"]); + + const acked = acknowledgeNodePendingWork({ + nodeId: "node-2", + itemIds: [first.item.id, "baseline-status"], + }); + expect(acked.removedItemIds).toEqual([first.item.id]); + + const afterAck = drainNodePendingWork("node-2"); + expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]); + }); +}); diff --git a/src/gateway/node-pending-work.ts b/src/gateway/node-pending-work.ts new file mode 100644 index 00000000000..33d356777d2 --- /dev/null +++ b/src/gateway/node-pending-work.ts @@ -0,0 +1,182 @@ +import { randomUUID } from "node:crypto"; + +export const NODE_PENDING_WORK_TYPES = ["status.request", "location.request"] as const; +export type NodePendingWorkType = (typeof NODE_PENDING_WORK_TYPES)[number]; + +export const NODE_PENDING_WORK_PRIORITIES = ["default", "normal", "high"] as const; +export type NodePendingWorkPriority = (typeof NODE_PENDING_WORK_PRIORITIES)[number]; + +export type NodePendingWorkItem = { + id: string; + type: NodePendingWorkType; + priority: NodePendingWorkPriority; + createdAtMs: number; + expiresAtMs: number | null; + payload?: Record; +}; + +type NodePendingWorkState = { + revision: number; + itemsById: Map; +}; + +type DrainOptions = { + maxItems?: number; + includeDefaultStatus?: boolean; + nowMs?: number; +}; + +type DrainResult = { + revision: number; + items: NodePendingWorkItem[]; + hasMore: boolean; +}; + +const DEFAULT_STATUS_ITEM_ID = "baseline-status"; +const DEFAULT_STATUS_PRIORITY: NodePendingWorkPriority = "default"; +const DEFAULT_PRIORITY: NodePendingWorkPriority = "normal"; +const DEFAULT_MAX_ITEMS = 4; +const MAX_ITEMS = 10; +const PRIORITY_RANK: Record = { + high: 3, + normal: 2, + default: 1, +}; + +const stateByNodeId = new Map(); + +function getState(nodeId: string): NodePendingWorkState { + let state = stateByNodeId.get(nodeId); + if (!state) { + state = { + revision: 0, + itemsById: new Map(), + }; + stateByNodeId.set(nodeId, state); + } + return state; +} + +function pruneExpired(state: NodePendingWorkState, nowMs: number): boolean { + let changed = false; + for (const [id, item] of state.itemsById) { + if (item.expiresAtMs !== null && item.expiresAtMs <= nowMs) { + state.itemsById.delete(id); + changed = true; + } + } + if (changed) { + state.revision += 1; + } + return changed; +} + +function sortedItems(state: NodePendingWorkState): NodePendingWorkItem[] { + return [...state.itemsById.values()].toSorted((a, b) => { + const priorityDelta = PRIORITY_RANK[b.priority] - PRIORITY_RANK[a.priority]; + if (priorityDelta !== 0) { + return priorityDelta; + } + if (a.createdAtMs !== b.createdAtMs) { + return a.createdAtMs - b.createdAtMs; + } + return a.id.localeCompare(b.id); + }); +} + +function makeBaselineStatusItem(nowMs: number): NodePendingWorkItem { + return { + id: DEFAULT_STATUS_ITEM_ID, + type: "status.request", + priority: DEFAULT_STATUS_PRIORITY, + createdAtMs: nowMs, + expiresAtMs: null, + }; +} + +export function enqueueNodePendingWork(params: { + nodeId: string; + type: NodePendingWorkType; + priority?: NodePendingWorkPriority; + expiresInMs?: number; + payload?: Record; +}): { revision: number; item: NodePendingWorkItem; deduped: boolean } { + const nodeId = params.nodeId.trim(); + if (!nodeId) { + throw new Error("nodeId required"); + } + const nowMs = Date.now(); + const state = getState(nodeId); + pruneExpired(state, nowMs); + const existing = [...state.itemsById.values()].find((item) => item.type === params.type); + if (existing) { + return { revision: state.revision, item: existing, deduped: true }; + } + const item: NodePendingWorkItem = { + id: randomUUID(), + type: params.type, + priority: params.priority ?? DEFAULT_PRIORITY, + createdAtMs: nowMs, + expiresAtMs: + typeof params.expiresInMs === "number" && Number.isFinite(params.expiresInMs) + ? nowMs + Math.max(1_000, Math.trunc(params.expiresInMs)) + : null, + ...(params.payload ? { payload: params.payload } : {}), + }; + state.itemsById.set(item.id, item); + state.revision += 1; + return { revision: state.revision, item, deduped: false }; +} + +export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): DrainResult { + const normalizedNodeId = nodeId.trim(); + if (!normalizedNodeId) { + return { revision: 0, items: [], hasMore: false }; + } + const nowMs = opts.nowMs ?? Date.now(); + const state = getState(normalizedNodeId); + pruneExpired(state, nowMs); + const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS))); + const explicitItems = sortedItems(state); + const items = explicitItems.slice(0, maxItems); + const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request"); + const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus; + if (includeBaseline && items.length < maxItems) { + items.push(makeBaselineStatusItem(nowMs)); + } + return { + revision: state.revision, + items, + hasMore: + explicitItems.length > items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length, + }; +} + +export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: string[] }): { + revision: number; + removedItemIds: string[]; +} { + const nodeId = params.nodeId.trim(); + if (!nodeId) { + return { revision: 0, removedItemIds: [] }; + } + const state = getState(nodeId); + const removedItemIds: string[] = []; + for (const itemId of params.itemIds) { + const trimmedId = itemId.trim(); + if (!trimmedId || trimmedId === DEFAULT_STATUS_ITEM_ID) { + continue; + } + if (state.itemsById.delete(trimmedId)) { + removedItemIds.push(trimmedId); + } + } + if (removedItemIds.length > 0) { + state.revision += 1; + } + return { revision: state.revision, removedItemIds }; +} + +export function resetNodePendingWorkForTests() { + stateByNodeId.clear(); +} diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 95306f27f12..9c469333363 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -140,6 +140,14 @@ import { NodeDescribeParamsSchema, type NodeEventParams, NodeEventParamsSchema, + type NodePendingDrainParams, + NodePendingDrainParamsSchema, + type NodePendingDrainResult, + NodePendingDrainResultSchema, + type NodePendingEnqueueParams, + NodePendingEnqueueParamsSchema, + type NodePendingEnqueueResult, + NodePendingEnqueueResultSchema, type NodeInvokeParams, NodeInvokeParamsSchema, type NodeInvokeResultParams, @@ -296,6 +304,12 @@ export const validateNodeInvokeResultParams = ajv.compile(NodeEventParamsSchema); +export const validateNodePendingDrainParams = ajv.compile( + NodePendingDrainParamsSchema, +); +export const validateNodePendingEnqueueParams = ajv.compile( + NodePendingEnqueueParamsSchema, +); export const validatePushTestParams = ajv.compile(PushTestParamsSchema); export const validateSecretsResolveParams = ajv.compile( SecretsResolveParamsSchema, @@ -472,6 +486,10 @@ export { NodeListParamsSchema, NodePendingAckParamsSchema, NodeInvokeParamsSchema, + NodePendingDrainParamsSchema, + NodePendingDrainResultSchema, + NodePendingEnqueueParamsSchema, + NodePendingEnqueueResultSchema, SessionsListParamsSchema, SessionsPreviewParamsSchema, SessionsPatchParamsSchema, @@ -621,6 +639,10 @@ export type { NodeInvokeParams, NodeInvokeResultParams, NodeEventParams, + NodePendingDrainParams, + NodePendingDrainResult, + NodePendingEnqueueParams, + NodePendingEnqueueResult, SessionsListParams, SessionsPreviewParams, SessionsResolveParams, diff --git a/src/gateway/protocol/schema/nodes.ts b/src/gateway/protocol/schema/nodes.ts index 7ce5a4fed0a..413bd42fa42 100644 --- a/src/gateway/protocol/schema/nodes.ts +++ b/src/gateway/protocol/schema/nodes.ts @@ -1,6 +1,14 @@ import { Type } from "@sinclair/typebox"; import { NonEmptyString } from "./primitives.js"; +const NodePendingWorkTypeSchema = Type.String({ + enum: ["status.request", "location.request"], +}); + +const NodePendingWorkPrioritySchema = Type.String({ + enum: ["normal", "high"], +}); + export const NodePairRequestParamsSchema = Type.Object( { nodeId: NonEmptyString, @@ -95,6 +103,56 @@ export const NodeEventParamsSchema = Type.Object( { additionalProperties: false }, ); +export const NodePendingDrainParamsSchema = Type.Object( + { + maxItems: Type.Optional(Type.Integer({ minimum: 1, maximum: 10 })), + }, + { additionalProperties: false }, +); + +export const NodePendingDrainItemSchema = Type.Object( + { + id: NonEmptyString, + type: NodePendingWorkTypeSchema, + priority: Type.String({ enum: ["default", "normal", "high"] }), + createdAtMs: Type.Integer({ minimum: 0 }), + expiresAtMs: Type.Optional(Type.Union([Type.Integer({ minimum: 0 }), Type.Null()])), + payload: Type.Optional(Type.Record(Type.String(), Type.Unknown())), + }, + { additionalProperties: false }, +); + +export const NodePendingDrainResultSchema = Type.Object( + { + nodeId: NonEmptyString, + revision: Type.Integer({ minimum: 0 }), + items: Type.Array(NodePendingDrainItemSchema), + hasMore: Type.Boolean(), + }, + { additionalProperties: false }, +); + +export const NodePendingEnqueueParamsSchema = Type.Object( + { + nodeId: NonEmptyString, + type: NodePendingWorkTypeSchema, + priority: Type.Optional(NodePendingWorkPrioritySchema), + expiresInMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 86_400_000 })), + wake: Type.Optional(Type.Boolean()), + }, + { additionalProperties: false }, +); + +export const NodePendingEnqueueResultSchema = Type.Object( + { + nodeId: NonEmptyString, + revision: Type.Integer({ minimum: 0 }), + queued: NodePendingDrainItemSchema, + wakeTriggered: Type.Boolean(), + }, + { additionalProperties: false }, +); + export const NodeInvokeRequestEventSchema = Type.Object( { id: NonEmptyString, diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index 7ccd6cb2d1a..574a74d8d41 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -114,6 +114,10 @@ import { import { NodeDescribeParamsSchema, NodeEventParamsSchema, + NodePendingDrainParamsSchema, + NodePendingDrainResultSchema, + NodePendingEnqueueParamsSchema, + NodePendingEnqueueResultSchema, NodeInvokeParamsSchema, NodeInvokeResultParamsSchema, NodeInvokeRequestEventSchema, @@ -186,6 +190,10 @@ export const ProtocolSchemas = { NodeInvokeParams: NodeInvokeParamsSchema, NodeInvokeResultParams: NodeInvokeResultParamsSchema, NodeEventParams: NodeEventParamsSchema, + NodePendingDrainParams: NodePendingDrainParamsSchema, + NodePendingDrainResult: NodePendingDrainResultSchema, + NodePendingEnqueueParams: NodePendingEnqueueParamsSchema, + NodePendingEnqueueResult: NodePendingEnqueueResultSchema, NodeInvokeRequestEvent: NodeInvokeRequestEventSchema, PushTestParams: PushTestParamsSchema, PushTestResult: PushTestResultSchema, diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index cc15b80fd1a..56656aff1a3 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -32,6 +32,10 @@ export type NodeDescribeParams = SchemaType<"NodeDescribeParams">; export type NodeInvokeParams = SchemaType<"NodeInvokeParams">; export type NodeInvokeResultParams = SchemaType<"NodeInvokeResultParams">; export type NodeEventParams = SchemaType<"NodeEventParams">; +export type NodePendingDrainParams = SchemaType<"NodePendingDrainParams">; +export type NodePendingDrainResult = SchemaType<"NodePendingDrainResult">; +export type NodePendingEnqueueParams = SchemaType<"NodePendingEnqueueParams">; +export type NodePendingEnqueueResult = SchemaType<"NodePendingEnqueueResult">; export type PushTestParams = SchemaType<"PushTestParams">; export type PushTestResult = SchemaType<"PushTestResult">; export type SessionsListParams = SchemaType<"SessionsListParams">; diff --git a/src/gateway/role-policy.test.ts b/src/gateway/role-policy.test.ts index ba371b56bfe..4c3815ec9a3 100644 --- a/src/gateway/role-policy.test.ts +++ b/src/gateway/role-policy.test.ts @@ -21,8 +21,10 @@ describe("gateway role policy", () => { test("authorizes roles against node vs operator methods", () => { expect(isRoleAuthorizedForMethod("node", "node.event")).toBe(true); + expect(isRoleAuthorizedForMethod("node", "node.pending.drain")).toBe(true); expect(isRoleAuthorizedForMethod("node", "status")).toBe(false); expect(isRoleAuthorizedForMethod("operator", "status")).toBe(true); + expect(isRoleAuthorizedForMethod("operator", "node.pending.drain")).toBe(true); expect(isRoleAuthorizedForMethod("operator", "node.event")).toBe(false); }); }); diff --git a/src/gateway/role-policy.ts b/src/gateway/role-policy.ts index 8366cd1c6c2..07b65eea8ca 100644 --- a/src/gateway/role-policy.ts +++ b/src/gateway/role-policy.ts @@ -1,6 +1,7 @@ import { isNodeRoleMethod } from "./method-scopes.js"; export const GATEWAY_ROLES = ["operator", "node"] as const; +const SHARED_ROLE_METHODS = new Set(["node.pending.drain"]); export type GatewayRole = (typeof GATEWAY_ROLES)[number]; @@ -16,6 +17,9 @@ export function roleCanSkipDeviceIdentity(role: GatewayRole, sharedAuthOk: boole } export function isRoleAuthorizedForMethod(role: GatewayRole, method: string): boolean { + if (SHARED_ROLE_METHODS.has(method)) { + return true; + } if (isNodeRoleMethod(method)) { return role === "node"; } diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index 5c5433ae2f7..2785eb7957e 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -76,6 +76,8 @@ const BASE_METHODS = [ "node.rename", "node.list", "node.describe", + "node.pending.drain", + "node.pending.enqueue", "node.invoke", "node.pending.pull", "node.pending.ack", diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts index 62cd6bbcd9e..483914b9bf5 100644 --- a/src/gateway/server-methods.ts +++ b/src/gateway/server-methods.ts @@ -18,6 +18,7 @@ import { execApprovalsHandlers } from "./server-methods/exec-approvals.js"; import { healthHandlers } from "./server-methods/health.js"; import { logsHandlers } from "./server-methods/logs.js"; import { modelsHandlers } from "./server-methods/models.js"; +import { nodePendingHandlers } from "./server-methods/nodes-pending.js"; import { nodeHandlers } from "./server-methods/nodes.js"; import { pushHandlers } from "./server-methods/push.js"; import { sendHandlers } from "./server-methods/send.js"; @@ -87,6 +88,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = { ...systemHandlers, ...updateHandlers, ...nodeHandlers, + ...nodePendingHandlers, ...pushHandlers, ...sendHandlers, ...usageHandlers, diff --git a/src/gateway/server-methods/nodes-pending.test.ts b/src/gateway/server-methods/nodes-pending.test.ts new file mode 100644 index 00000000000..110ef8711e4 --- /dev/null +++ b/src/gateway/server-methods/nodes-pending.test.ts @@ -0,0 +1,177 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { nodePendingHandlers } from "./nodes-pending.js"; + +const mocks = vi.hoisted(() => ({ + drainNodePendingWork: vi.fn(), + enqueueNodePendingWork: vi.fn(), + maybeWakeNodeWithApns: vi.fn(), + maybeSendNodeWakeNudge: vi.fn(), + waitForNodeReconnect: vi.fn(), +})); + +vi.mock("../node-pending-work.js", () => ({ + drainNodePendingWork: mocks.drainNodePendingWork, + enqueueNodePendingWork: mocks.enqueueNodePendingWork, +})); + +vi.mock("./nodes.js", () => ({ + NODE_WAKE_RECONNECT_WAIT_MS: 3_000, + NODE_WAKE_RECONNECT_RETRY_WAIT_MS: 12_000, + maybeWakeNodeWithApns: mocks.maybeWakeNodeWithApns, + maybeSendNodeWakeNudge: mocks.maybeSendNodeWakeNudge, + waitForNodeReconnect: mocks.waitForNodeReconnect, +})); + +type RespondCall = [ + boolean, + unknown?, + { + code?: number; + message?: string; + details?: unknown; + }?, +]; + +function makeContext(overrides?: Partial>) { + return { + nodeRegistry: { + get: vi.fn(() => undefined), + }, + logGateway: { + info: vi.fn(), + warn: vi.fn(), + }, + ...overrides, + }; +} + +describe("node.pending handlers", () => { + beforeEach(() => { + mocks.drainNodePendingWork.mockReset(); + mocks.enqueueNodePendingWork.mockReset(); + mocks.maybeWakeNodeWithApns.mockReset(); + mocks.maybeSendNodeWakeNudge.mockReset(); + mocks.waitForNodeReconnect.mockReset(); + }); + + it("drains pending work for the connected node identity", async () => { + mocks.drainNodePendingWork.mockReturnValue({ + revision: 2, + items: [{ id: "baseline-status", type: "status.request", priority: "default" }], + hasMore: false, + }); + const respond = vi.fn(); + + await nodePendingHandlers["node.pending.drain"]({ + params: { maxItems: 3 }, + respond: respond as never, + client: { connect: { device: { id: "ios-node-1" } } } as never, + context: makeContext() as never, + req: { type: "req", id: "req-node-pending-drain", method: "node.pending.drain" }, + isWebchatConnect: () => false, + }); + + expect(mocks.drainNodePendingWork).toHaveBeenCalledWith("ios-node-1", { + maxItems: 3, + includeDefaultStatus: true, + }); + expect(respond).toHaveBeenCalledWith( + true, + { + nodeId: "ios-node-1", + revision: 2, + items: [{ id: "baseline-status", type: "status.request", priority: "default" }], + hasMore: false, + }, + undefined, + ); + }); + + it("rejects node.pending.drain without a connected device identity", async () => { + const respond = vi.fn(); + + await nodePendingHandlers["node.pending.drain"]({ + params: {}, + respond: respond as never, + client: null, + context: makeContext() as never, + req: { type: "req", id: "req-node-pending-drain-missing", method: "node.pending.drain" }, + isWebchatConnect: () => false, + }); + + const call = respond.mock.calls[0] as RespondCall | undefined; + expect(call?.[0]).toBe(false); + expect(call?.[2]?.message).toContain("connected device identity"); + }); + + it("enqueues pending work and wakes a disconnected node once", async () => { + mocks.enqueueNodePendingWork.mockReturnValue({ + revision: 4, + deduped: false, + item: { + id: "pending-1", + type: "location.request", + priority: "high", + createdAtMs: 100, + expiresAtMs: null, + }, + }); + mocks.maybeWakeNodeWithApns.mockResolvedValue({ + available: true, + throttled: false, + path: "apns", + durationMs: 12, + apnsStatus: 200, + apnsReason: null, + }); + let connected = false; + mocks.waitForNodeReconnect.mockImplementation(async () => { + connected = true; + return true; + }); + const context = makeContext({ + nodeRegistry: { + get: vi.fn(() => (connected ? { nodeId: "ios-node-2" } : undefined)), + }, + }); + const respond = vi.fn(); + + await nodePendingHandlers["node.pending.enqueue"]({ + params: { + nodeId: "ios-node-2", + type: "location.request", + priority: "high", + }, + respond: respond as never, + client: null, + context: context as never, + req: { type: "req", id: "req-node-pending-enqueue", method: "node.pending.enqueue" }, + isWebchatConnect: () => false, + }); + + expect(mocks.enqueueNodePendingWork).toHaveBeenCalledWith({ + nodeId: "ios-node-2", + type: "location.request", + priority: "high", + expiresInMs: undefined, + }); + expect(mocks.maybeWakeNodeWithApns).toHaveBeenCalledWith("ios-node-2", { + wakeReason: "node.pending", + }); + expect(mocks.waitForNodeReconnect).toHaveBeenCalledWith({ + nodeId: "ios-node-2", + context, + timeoutMs: 3_000, + }); + expect(mocks.maybeSendNodeWakeNudge).not.toHaveBeenCalled(); + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ + nodeId: "ios-node-2", + revision: 4, + wakeTriggered: true, + }), + undefined, + ); + }); +}); diff --git a/src/gateway/server-methods/nodes-pending.ts b/src/gateway/server-methods/nodes-pending.ts new file mode 100644 index 00000000000..8c46951b072 --- /dev/null +++ b/src/gateway/server-methods/nodes-pending.ts @@ -0,0 +1,159 @@ +import { + drainNodePendingWork, + enqueueNodePendingWork, + type NodePendingWorkPriority, + type NodePendingWorkType, +} from "../node-pending-work.js"; +import { + ErrorCodes, + errorShape, + validateNodePendingDrainParams, + validateNodePendingEnqueueParams, +} from "../protocol/index.js"; +import { respondInvalidParams, respondUnavailableOnThrow } from "./nodes.helpers.js"; +import { + maybeSendNodeWakeNudge, + maybeWakeNodeWithApns, + NODE_WAKE_RECONNECT_RETRY_WAIT_MS, + NODE_WAKE_RECONNECT_WAIT_MS, + waitForNodeReconnect, +} from "./nodes.js"; +import type { GatewayRequestHandlers } from "./types.js"; + +function resolveClientNodeId( + client: { connect?: { device?: { id?: string }; client?: { id?: string } } } | null, +): string | null { + const nodeId = client?.connect?.device?.id ?? client?.connect?.client?.id ?? ""; + const trimmed = nodeId.trim(); + return trimmed.length > 0 ? trimmed : null; +} + +export const nodePendingHandlers: GatewayRequestHandlers = { + "node.pending.drain": async ({ params, respond, client }) => { + if (!validateNodePendingDrainParams(params)) { + respondInvalidParams({ + respond, + method: "node.pending.drain", + validator: validateNodePendingDrainParams, + }); + return; + } + const nodeId = resolveClientNodeId(client); + if (!nodeId) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "node.pending.drain requires a connected device identity", + ), + ); + return; + } + const p = params as { maxItems?: number }; + const drained = drainNodePendingWork(nodeId, { + maxItems: p.maxItems, + includeDefaultStatus: true, + }); + respond(true, { nodeId, ...drained }, undefined); + }, + "node.pending.enqueue": async ({ params, respond, context }) => { + if (!validateNodePendingEnqueueParams(params)) { + respondInvalidParams({ + respond, + method: "node.pending.enqueue", + validator: validateNodePendingEnqueueParams, + }); + return; + } + const p = params as { + nodeId: string; + type: NodePendingWorkType; + priority?: NodePendingWorkPriority; + expiresInMs?: number; + wake?: boolean; + }; + await respondUnavailableOnThrow(respond, async () => { + const queued = enqueueNodePendingWork({ + nodeId: p.nodeId, + type: p.type, + priority: p.priority, + expiresInMs: p.expiresInMs, + }); + let wakeTriggered = false; + if (p.wake !== false && !queued.deduped && !context.nodeRegistry.get(p.nodeId)) { + const wakeReqId = queued.item.id; + context.logGateway.info( + `node pending wake start node=${p.nodeId} req=${wakeReqId} type=${queued.item.type}`, + ); + const wake = await maybeWakeNodeWithApns(p.nodeId, { wakeReason: "node.pending" }); + context.logGateway.info( + `node pending wake stage=wake1 node=${p.nodeId} req=${wakeReqId} ` + + `available=${wake.available} throttled=${wake.throttled} ` + + `path=${wake.path} durationMs=${wake.durationMs} ` + + `apnsStatus=${wake.apnsStatus ?? -1} apnsReason=${wake.apnsReason ?? "-"}`, + ); + wakeTriggered = wake.available; + if (wake.available) { + const reconnected = await waitForNodeReconnect({ + nodeId: p.nodeId, + context, + timeoutMs: NODE_WAKE_RECONNECT_WAIT_MS, + }); + context.logGateway.info( + `node pending wake stage=wait1 node=${p.nodeId} req=${wakeReqId} ` + + `reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_WAIT_MS}`, + ); + } + if (!context.nodeRegistry.get(p.nodeId) && wake.available) { + const retryWake = await maybeWakeNodeWithApns(p.nodeId, { + force: true, + wakeReason: "node.pending", + }); + context.logGateway.info( + `node pending wake stage=wake2 node=${p.nodeId} req=${wakeReqId} force=true ` + + `available=${retryWake.available} throttled=${retryWake.throttled} ` + + `path=${retryWake.path} durationMs=${retryWake.durationMs} ` + + `apnsStatus=${retryWake.apnsStatus ?? -1} apnsReason=${retryWake.apnsReason ?? "-"}`, + ); + if (retryWake.available) { + const reconnected = await waitForNodeReconnect({ + nodeId: p.nodeId, + context, + timeoutMs: NODE_WAKE_RECONNECT_RETRY_WAIT_MS, + }); + context.logGateway.info( + `node pending wake stage=wait2 node=${p.nodeId} req=${wakeReqId} ` + + `reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_RETRY_WAIT_MS}`, + ); + } + } + if (!context.nodeRegistry.get(p.nodeId)) { + const nudge = await maybeSendNodeWakeNudge(p.nodeId); + context.logGateway.info( + `node pending wake nudge node=${p.nodeId} req=${wakeReqId} sent=${nudge.sent} ` + + `throttled=${nudge.throttled} reason=${nudge.reason} durationMs=${nudge.durationMs} ` + + `apnsStatus=${nudge.apnsStatus ?? -1} apnsReason=${nudge.apnsReason ?? "-"}`, + ); + context.logGateway.warn( + `node pending wake done node=${p.nodeId} req=${wakeReqId} connected=false reason=not_connected`, + ); + } else { + context.logGateway.info( + `node pending wake done node=${p.nodeId} req=${wakeReqId} connected=true`, + ); + } + } + respond( + true, + { + nodeId: p.nodeId, + revision: queued.revision, + queued: queued.item, + wakeTriggered, + }, + undefined, + ); + }); + }, +}; diff --git a/src/gateway/server-methods/nodes.ts b/src/gateway/server-methods/nodes.ts index 22e3c0912e4..fadbb0e3742 100644 --- a/src/gateway/server-methods/nodes.ts +++ b/src/gateway/server-methods/nodes.ts @@ -47,9 +47,9 @@ import { } from "./nodes.helpers.js"; import type { GatewayRequestHandlers } from "./types.js"; -const NODE_WAKE_RECONNECT_WAIT_MS = 3_000; -const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000; -const NODE_WAKE_RECONNECT_POLL_MS = 150; +export const NODE_WAKE_RECONNECT_WAIT_MS = 3_000; +export const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000; +export const NODE_WAKE_RECONNECT_POLL_MS = 150; const NODE_WAKE_THROTTLE_MS = 15_000; const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000; const NODE_PENDING_ACTION_TTL_MS = 10 * 60_000; @@ -208,9 +208,9 @@ function toPendingParamsJSON(params: unknown): string | undefined { } } -async function maybeWakeNodeWithApns( +export async function maybeWakeNodeWithApns( nodeId: string, - opts?: { force?: boolean }, + opts?: { force?: boolean; wakeReason?: string }, ): Promise { const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 }; nodeWakeById.set(nodeId, state); @@ -253,7 +253,7 @@ async function maybeWakeNodeWithApns( auth: auth.value, registration, nodeId, - wakeReason: "node.invoke", + wakeReason: opts?.wakeReason ?? "node.invoke", }); if (!wakeResult.ok) { return withDuration({ @@ -298,7 +298,7 @@ async function maybeWakeNodeWithApns( } } -async function maybeSendNodeWakeNudge(nodeId: string): Promise { +export async function maybeSendNodeWakeNudge(nodeId: string): Promise { const startedAtMs = Date.now(); const withDuration = ( attempt: Omit, @@ -362,7 +362,7 @@ async function maybeSendNodeWakeNudge(nodeId: string): Promise unknown } }; timeoutMs?: number;