mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
Gateway: add pending node work primitives
This commit is contained in:
@@ -78,6 +78,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"last-heartbeat",
|
||||
"node.list",
|
||||
"node.describe",
|
||||
"node.pending.drain",
|
||||
"chat.history",
|
||||
"config.get",
|
||||
"config.schema.lookup",
|
||||
@@ -102,6 +103,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"chat.abort",
|
||||
"browser.request",
|
||||
"push.test",
|
||||
"node.pending.enqueue",
|
||||
],
|
||||
[ADMIN_SCOPE]: [
|
||||
"channels.logout",
|
||||
|
||||
46
src/gateway/node-pending-work.test.ts
Normal file
46
src/gateway/node-pending-work.test.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
import { describe, expect, it, beforeEach } from "vitest";
|
||||
import {
|
||||
acknowledgeNodePendingWork,
|
||||
drainNodePendingWork,
|
||||
enqueueNodePendingWork,
|
||||
resetNodePendingWorkForTests,
|
||||
} from "./node-pending-work.js";
|
||||
|
||||
describe("node pending work", () => {
|
||||
beforeEach(() => {
|
||||
resetNodePendingWorkForTests();
|
||||
});
|
||||
|
||||
it("returns a baseline status request even when no explicit work is queued", () => {
|
||||
const drained = drainNodePendingWork("node-1");
|
||||
expect(drained.items).toEqual([
|
||||
expect.objectContaining({
|
||||
id: "baseline-status",
|
||||
type: "status.request",
|
||||
priority: "default",
|
||||
}),
|
||||
]);
|
||||
expect(drained.hasMore).toBe(false);
|
||||
});
|
||||
|
||||
it("dedupes explicit work by type and removes acknowledged items", () => {
|
||||
const first = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
|
||||
const second = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
|
||||
|
||||
expect(first.deduped).toBe(false);
|
||||
expect(second.deduped).toBe(true);
|
||||
expect(second.item.id).toBe(first.item.id);
|
||||
|
||||
const drained = drainNodePendingWork("node-2");
|
||||
expect(drained.items.map((item) => item.type)).toEqual(["location.request", "status.request"]);
|
||||
|
||||
const acked = acknowledgeNodePendingWork({
|
||||
nodeId: "node-2",
|
||||
itemIds: [first.item.id, "baseline-status"],
|
||||
});
|
||||
expect(acked.removedItemIds).toEqual([first.item.id]);
|
||||
|
||||
const afterAck = drainNodePendingWork("node-2");
|
||||
expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]);
|
||||
});
|
||||
});
|
||||
182
src/gateway/node-pending-work.ts
Normal file
182
src/gateway/node-pending-work.ts
Normal file
@@ -0,0 +1,182 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
|
||||
export const NODE_PENDING_WORK_TYPES = ["status.request", "location.request"] as const;
|
||||
export type NodePendingWorkType = (typeof NODE_PENDING_WORK_TYPES)[number];
|
||||
|
||||
export const NODE_PENDING_WORK_PRIORITIES = ["default", "normal", "high"] as const;
|
||||
export type NodePendingWorkPriority = (typeof NODE_PENDING_WORK_PRIORITIES)[number];
|
||||
|
||||
export type NodePendingWorkItem = {
|
||||
id: string;
|
||||
type: NodePendingWorkType;
|
||||
priority: NodePendingWorkPriority;
|
||||
createdAtMs: number;
|
||||
expiresAtMs: number | null;
|
||||
payload?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
type NodePendingWorkState = {
|
||||
revision: number;
|
||||
itemsById: Map<string, NodePendingWorkItem>;
|
||||
};
|
||||
|
||||
type DrainOptions = {
|
||||
maxItems?: number;
|
||||
includeDefaultStatus?: boolean;
|
||||
nowMs?: number;
|
||||
};
|
||||
|
||||
type DrainResult = {
|
||||
revision: number;
|
||||
items: NodePendingWorkItem[];
|
||||
hasMore: boolean;
|
||||
};
|
||||
|
||||
const DEFAULT_STATUS_ITEM_ID = "baseline-status";
|
||||
const DEFAULT_STATUS_PRIORITY: NodePendingWorkPriority = "default";
|
||||
const DEFAULT_PRIORITY: NodePendingWorkPriority = "normal";
|
||||
const DEFAULT_MAX_ITEMS = 4;
|
||||
const MAX_ITEMS = 10;
|
||||
const PRIORITY_RANK: Record<NodePendingWorkPriority, number> = {
|
||||
high: 3,
|
||||
normal: 2,
|
||||
default: 1,
|
||||
};
|
||||
|
||||
const stateByNodeId = new Map<string, NodePendingWorkState>();
|
||||
|
||||
function getState(nodeId: string): NodePendingWorkState {
|
||||
let state = stateByNodeId.get(nodeId);
|
||||
if (!state) {
|
||||
state = {
|
||||
revision: 0,
|
||||
itemsById: new Map(),
|
||||
};
|
||||
stateByNodeId.set(nodeId, state);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
function pruneExpired(state: NodePendingWorkState, nowMs: number): boolean {
|
||||
let changed = false;
|
||||
for (const [id, item] of state.itemsById) {
|
||||
if (item.expiresAtMs !== null && item.expiresAtMs <= nowMs) {
|
||||
state.itemsById.delete(id);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
if (changed) {
|
||||
state.revision += 1;
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
function sortedItems(state: NodePendingWorkState): NodePendingWorkItem[] {
|
||||
return [...state.itemsById.values()].toSorted((a, b) => {
|
||||
const priorityDelta = PRIORITY_RANK[b.priority] - PRIORITY_RANK[a.priority];
|
||||
if (priorityDelta !== 0) {
|
||||
return priorityDelta;
|
||||
}
|
||||
if (a.createdAtMs !== b.createdAtMs) {
|
||||
return a.createdAtMs - b.createdAtMs;
|
||||
}
|
||||
return a.id.localeCompare(b.id);
|
||||
});
|
||||
}
|
||||
|
||||
function makeBaselineStatusItem(nowMs: number): NodePendingWorkItem {
|
||||
return {
|
||||
id: DEFAULT_STATUS_ITEM_ID,
|
||||
type: "status.request",
|
||||
priority: DEFAULT_STATUS_PRIORITY,
|
||||
createdAtMs: nowMs,
|
||||
expiresAtMs: null,
|
||||
};
|
||||
}
|
||||
|
||||
export function enqueueNodePendingWork(params: {
|
||||
nodeId: string;
|
||||
type: NodePendingWorkType;
|
||||
priority?: NodePendingWorkPriority;
|
||||
expiresInMs?: number;
|
||||
payload?: Record<string, unknown>;
|
||||
}): { revision: number; item: NodePendingWorkItem; deduped: boolean } {
|
||||
const nodeId = params.nodeId.trim();
|
||||
if (!nodeId) {
|
||||
throw new Error("nodeId required");
|
||||
}
|
||||
const nowMs = Date.now();
|
||||
const state = getState(nodeId);
|
||||
pruneExpired(state, nowMs);
|
||||
const existing = [...state.itemsById.values()].find((item) => item.type === params.type);
|
||||
if (existing) {
|
||||
return { revision: state.revision, item: existing, deduped: true };
|
||||
}
|
||||
const item: NodePendingWorkItem = {
|
||||
id: randomUUID(),
|
||||
type: params.type,
|
||||
priority: params.priority ?? DEFAULT_PRIORITY,
|
||||
createdAtMs: nowMs,
|
||||
expiresAtMs:
|
||||
typeof params.expiresInMs === "number" && Number.isFinite(params.expiresInMs)
|
||||
? nowMs + Math.max(1_000, Math.trunc(params.expiresInMs))
|
||||
: null,
|
||||
...(params.payload ? { payload: params.payload } : {}),
|
||||
};
|
||||
state.itemsById.set(item.id, item);
|
||||
state.revision += 1;
|
||||
return { revision: state.revision, item, deduped: false };
|
||||
}
|
||||
|
||||
export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): DrainResult {
|
||||
const normalizedNodeId = nodeId.trim();
|
||||
if (!normalizedNodeId) {
|
||||
return { revision: 0, items: [], hasMore: false };
|
||||
}
|
||||
const nowMs = opts.nowMs ?? Date.now();
|
||||
const state = getState(normalizedNodeId);
|
||||
pruneExpired(state, nowMs);
|
||||
const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS)));
|
||||
const explicitItems = sortedItems(state);
|
||||
const items = explicitItems.slice(0, maxItems);
|
||||
const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request");
|
||||
const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus;
|
||||
if (includeBaseline && items.length < maxItems) {
|
||||
items.push(makeBaselineStatusItem(nowMs));
|
||||
}
|
||||
return {
|
||||
revision: state.revision,
|
||||
items,
|
||||
hasMore:
|
||||
explicitItems.length > items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length,
|
||||
};
|
||||
}
|
||||
|
||||
export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: string[] }): {
|
||||
revision: number;
|
||||
removedItemIds: string[];
|
||||
} {
|
||||
const nodeId = params.nodeId.trim();
|
||||
if (!nodeId) {
|
||||
return { revision: 0, removedItemIds: [] };
|
||||
}
|
||||
const state = getState(nodeId);
|
||||
const removedItemIds: string[] = [];
|
||||
for (const itemId of params.itemIds) {
|
||||
const trimmedId = itemId.trim();
|
||||
if (!trimmedId || trimmedId === DEFAULT_STATUS_ITEM_ID) {
|
||||
continue;
|
||||
}
|
||||
if (state.itemsById.delete(trimmedId)) {
|
||||
removedItemIds.push(trimmedId);
|
||||
}
|
||||
}
|
||||
if (removedItemIds.length > 0) {
|
||||
state.revision += 1;
|
||||
}
|
||||
return { revision: state.revision, removedItemIds };
|
||||
}
|
||||
|
||||
export function resetNodePendingWorkForTests() {
|
||||
stateByNodeId.clear();
|
||||
}
|
||||
@@ -140,6 +140,14 @@ import {
|
||||
NodeDescribeParamsSchema,
|
||||
type NodeEventParams,
|
||||
NodeEventParamsSchema,
|
||||
type NodePendingDrainParams,
|
||||
NodePendingDrainParamsSchema,
|
||||
type NodePendingDrainResult,
|
||||
NodePendingDrainResultSchema,
|
||||
type NodePendingEnqueueParams,
|
||||
NodePendingEnqueueParamsSchema,
|
||||
type NodePendingEnqueueResult,
|
||||
NodePendingEnqueueResultSchema,
|
||||
type NodeInvokeParams,
|
||||
NodeInvokeParamsSchema,
|
||||
type NodeInvokeResultParams,
|
||||
@@ -296,6 +304,12 @@ export const validateNodeInvokeResultParams = ajv.compile<NodeInvokeResultParams
|
||||
NodeInvokeResultParamsSchema,
|
||||
);
|
||||
export const validateNodeEventParams = ajv.compile<NodeEventParams>(NodeEventParamsSchema);
|
||||
export const validateNodePendingDrainParams = ajv.compile<NodePendingDrainParams>(
|
||||
NodePendingDrainParamsSchema,
|
||||
);
|
||||
export const validateNodePendingEnqueueParams = ajv.compile<NodePendingEnqueueParams>(
|
||||
NodePendingEnqueueParamsSchema,
|
||||
);
|
||||
export const validatePushTestParams = ajv.compile<PushTestParams>(PushTestParamsSchema);
|
||||
export const validateSecretsResolveParams = ajv.compile<SecretsResolveParams>(
|
||||
SecretsResolveParamsSchema,
|
||||
@@ -472,6 +486,10 @@ export {
|
||||
NodeListParamsSchema,
|
||||
NodePendingAckParamsSchema,
|
||||
NodeInvokeParamsSchema,
|
||||
NodePendingDrainParamsSchema,
|
||||
NodePendingDrainResultSchema,
|
||||
NodePendingEnqueueParamsSchema,
|
||||
NodePendingEnqueueResultSchema,
|
||||
SessionsListParamsSchema,
|
||||
SessionsPreviewParamsSchema,
|
||||
SessionsPatchParamsSchema,
|
||||
@@ -621,6 +639,10 @@ export type {
|
||||
NodeInvokeParams,
|
||||
NodeInvokeResultParams,
|
||||
NodeEventParams,
|
||||
NodePendingDrainParams,
|
||||
NodePendingDrainResult,
|
||||
NodePendingEnqueueParams,
|
||||
NodePendingEnqueueResult,
|
||||
SessionsListParams,
|
||||
SessionsPreviewParams,
|
||||
SessionsResolveParams,
|
||||
|
||||
@@ -1,6 +1,14 @@
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import { NonEmptyString } from "./primitives.js";
|
||||
|
||||
const NodePendingWorkTypeSchema = Type.String({
|
||||
enum: ["status.request", "location.request"],
|
||||
});
|
||||
|
||||
const NodePendingWorkPrioritySchema = Type.String({
|
||||
enum: ["normal", "high"],
|
||||
});
|
||||
|
||||
export const NodePairRequestParamsSchema = Type.Object(
|
||||
{
|
||||
nodeId: NonEmptyString,
|
||||
@@ -95,6 +103,56 @@ export const NodeEventParamsSchema = Type.Object(
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingDrainParamsSchema = Type.Object(
|
||||
{
|
||||
maxItems: Type.Optional(Type.Integer({ minimum: 1, maximum: 10 })),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingDrainItemSchema = Type.Object(
|
||||
{
|
||||
id: NonEmptyString,
|
||||
type: NodePendingWorkTypeSchema,
|
||||
priority: Type.String({ enum: ["default", "normal", "high"] }),
|
||||
createdAtMs: Type.Integer({ minimum: 0 }),
|
||||
expiresAtMs: Type.Optional(Type.Union([Type.Integer({ minimum: 0 }), Type.Null()])),
|
||||
payload: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingDrainResultSchema = Type.Object(
|
||||
{
|
||||
nodeId: NonEmptyString,
|
||||
revision: Type.Integer({ minimum: 0 }),
|
||||
items: Type.Array(NodePendingDrainItemSchema),
|
||||
hasMore: Type.Boolean(),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingEnqueueParamsSchema = Type.Object(
|
||||
{
|
||||
nodeId: NonEmptyString,
|
||||
type: NodePendingWorkTypeSchema,
|
||||
priority: Type.Optional(NodePendingWorkPrioritySchema),
|
||||
expiresInMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 86_400_000 })),
|
||||
wake: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingEnqueueResultSchema = Type.Object(
|
||||
{
|
||||
nodeId: NonEmptyString,
|
||||
revision: Type.Integer({ minimum: 0 }),
|
||||
queued: NodePendingDrainItemSchema,
|
||||
wakeTriggered: Type.Boolean(),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodeInvokeRequestEventSchema = Type.Object(
|
||||
{
|
||||
id: NonEmptyString,
|
||||
|
||||
@@ -114,6 +114,10 @@ import {
|
||||
import {
|
||||
NodeDescribeParamsSchema,
|
||||
NodeEventParamsSchema,
|
||||
NodePendingDrainParamsSchema,
|
||||
NodePendingDrainResultSchema,
|
||||
NodePendingEnqueueParamsSchema,
|
||||
NodePendingEnqueueResultSchema,
|
||||
NodeInvokeParamsSchema,
|
||||
NodeInvokeResultParamsSchema,
|
||||
NodeInvokeRequestEventSchema,
|
||||
@@ -186,6 +190,10 @@ export const ProtocolSchemas = {
|
||||
NodeInvokeParams: NodeInvokeParamsSchema,
|
||||
NodeInvokeResultParams: NodeInvokeResultParamsSchema,
|
||||
NodeEventParams: NodeEventParamsSchema,
|
||||
NodePendingDrainParams: NodePendingDrainParamsSchema,
|
||||
NodePendingDrainResult: NodePendingDrainResultSchema,
|
||||
NodePendingEnqueueParams: NodePendingEnqueueParamsSchema,
|
||||
NodePendingEnqueueResult: NodePendingEnqueueResultSchema,
|
||||
NodeInvokeRequestEvent: NodeInvokeRequestEventSchema,
|
||||
PushTestParams: PushTestParamsSchema,
|
||||
PushTestResult: PushTestResultSchema,
|
||||
|
||||
@@ -32,6 +32,10 @@ export type NodeDescribeParams = SchemaType<"NodeDescribeParams">;
|
||||
export type NodeInvokeParams = SchemaType<"NodeInvokeParams">;
|
||||
export type NodeInvokeResultParams = SchemaType<"NodeInvokeResultParams">;
|
||||
export type NodeEventParams = SchemaType<"NodeEventParams">;
|
||||
export type NodePendingDrainParams = SchemaType<"NodePendingDrainParams">;
|
||||
export type NodePendingDrainResult = SchemaType<"NodePendingDrainResult">;
|
||||
export type NodePendingEnqueueParams = SchemaType<"NodePendingEnqueueParams">;
|
||||
export type NodePendingEnqueueResult = SchemaType<"NodePendingEnqueueResult">;
|
||||
export type PushTestParams = SchemaType<"PushTestParams">;
|
||||
export type PushTestResult = SchemaType<"PushTestResult">;
|
||||
export type SessionsListParams = SchemaType<"SessionsListParams">;
|
||||
|
||||
@@ -21,8 +21,10 @@ describe("gateway role policy", () => {
|
||||
|
||||
test("authorizes roles against node vs operator methods", () => {
|
||||
expect(isRoleAuthorizedForMethod("node", "node.event")).toBe(true);
|
||||
expect(isRoleAuthorizedForMethod("node", "node.pending.drain")).toBe(true);
|
||||
expect(isRoleAuthorizedForMethod("node", "status")).toBe(false);
|
||||
expect(isRoleAuthorizedForMethod("operator", "status")).toBe(true);
|
||||
expect(isRoleAuthorizedForMethod("operator", "node.pending.drain")).toBe(true);
|
||||
expect(isRoleAuthorizedForMethod("operator", "node.event")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { isNodeRoleMethod } from "./method-scopes.js";
|
||||
|
||||
export const GATEWAY_ROLES = ["operator", "node"] as const;
|
||||
const SHARED_ROLE_METHODS = new Set(["node.pending.drain"]);
|
||||
|
||||
export type GatewayRole = (typeof GATEWAY_ROLES)[number];
|
||||
|
||||
@@ -16,6 +17,9 @@ export function roleCanSkipDeviceIdentity(role: GatewayRole, sharedAuthOk: boole
|
||||
}
|
||||
|
||||
export function isRoleAuthorizedForMethod(role: GatewayRole, method: string): boolean {
|
||||
if (SHARED_ROLE_METHODS.has(method)) {
|
||||
return true;
|
||||
}
|
||||
if (isNodeRoleMethod(method)) {
|
||||
return role === "node";
|
||||
}
|
||||
|
||||
@@ -76,6 +76,8 @@ const BASE_METHODS = [
|
||||
"node.rename",
|
||||
"node.list",
|
||||
"node.describe",
|
||||
"node.pending.drain",
|
||||
"node.pending.enqueue",
|
||||
"node.invoke",
|
||||
"node.pending.pull",
|
||||
"node.pending.ack",
|
||||
|
||||
@@ -18,6 +18,7 @@ import { execApprovalsHandlers } from "./server-methods/exec-approvals.js";
|
||||
import { healthHandlers } from "./server-methods/health.js";
|
||||
import { logsHandlers } from "./server-methods/logs.js";
|
||||
import { modelsHandlers } from "./server-methods/models.js";
|
||||
import { nodePendingHandlers } from "./server-methods/nodes-pending.js";
|
||||
import { nodeHandlers } from "./server-methods/nodes.js";
|
||||
import { pushHandlers } from "./server-methods/push.js";
|
||||
import { sendHandlers } from "./server-methods/send.js";
|
||||
@@ -87,6 +88,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = {
|
||||
...systemHandlers,
|
||||
...updateHandlers,
|
||||
...nodeHandlers,
|
||||
...nodePendingHandlers,
|
||||
...pushHandlers,
|
||||
...sendHandlers,
|
||||
...usageHandlers,
|
||||
|
||||
177
src/gateway/server-methods/nodes-pending.test.ts
Normal file
177
src/gateway/server-methods/nodes-pending.test.ts
Normal file
@@ -0,0 +1,177 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { nodePendingHandlers } from "./nodes-pending.js";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
drainNodePendingWork: vi.fn(),
|
||||
enqueueNodePendingWork: vi.fn(),
|
||||
maybeWakeNodeWithApns: vi.fn(),
|
||||
maybeSendNodeWakeNudge: vi.fn(),
|
||||
waitForNodeReconnect: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../node-pending-work.js", () => ({
|
||||
drainNodePendingWork: mocks.drainNodePendingWork,
|
||||
enqueueNodePendingWork: mocks.enqueueNodePendingWork,
|
||||
}));
|
||||
|
||||
vi.mock("./nodes.js", () => ({
|
||||
NODE_WAKE_RECONNECT_WAIT_MS: 3_000,
|
||||
NODE_WAKE_RECONNECT_RETRY_WAIT_MS: 12_000,
|
||||
maybeWakeNodeWithApns: mocks.maybeWakeNodeWithApns,
|
||||
maybeSendNodeWakeNudge: mocks.maybeSendNodeWakeNudge,
|
||||
waitForNodeReconnect: mocks.waitForNodeReconnect,
|
||||
}));
|
||||
|
||||
type RespondCall = [
|
||||
boolean,
|
||||
unknown?,
|
||||
{
|
||||
code?: number;
|
||||
message?: string;
|
||||
details?: unknown;
|
||||
}?,
|
||||
];
|
||||
|
||||
function makeContext(overrides?: Partial<Record<string, unknown>>) {
|
||||
return {
|
||||
nodeRegistry: {
|
||||
get: vi.fn(() => undefined),
|
||||
},
|
||||
logGateway: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
},
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("node.pending handlers", () => {
|
||||
beforeEach(() => {
|
||||
mocks.drainNodePendingWork.mockReset();
|
||||
mocks.enqueueNodePendingWork.mockReset();
|
||||
mocks.maybeWakeNodeWithApns.mockReset();
|
||||
mocks.maybeSendNodeWakeNudge.mockReset();
|
||||
mocks.waitForNodeReconnect.mockReset();
|
||||
});
|
||||
|
||||
it("drains pending work for the connected node identity", async () => {
|
||||
mocks.drainNodePendingWork.mockReturnValue({
|
||||
revision: 2,
|
||||
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
|
||||
hasMore: false,
|
||||
});
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.drain"]({
|
||||
params: { maxItems: 3 },
|
||||
respond: respond as never,
|
||||
client: { connect: { device: { id: "ios-node-1" } } } as never,
|
||||
context: makeContext() as never,
|
||||
req: { type: "req", id: "req-node-pending-drain", method: "node.pending.drain" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(mocks.drainNodePendingWork).toHaveBeenCalledWith("ios-node-1", {
|
||||
maxItems: 3,
|
||||
includeDefaultStatus: true,
|
||||
});
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
{
|
||||
nodeId: "ios-node-1",
|
||||
revision: 2,
|
||||
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
|
||||
hasMore: false,
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects node.pending.drain without a connected device identity", async () => {
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.drain"]({
|
||||
params: {},
|
||||
respond: respond as never,
|
||||
client: null,
|
||||
context: makeContext() as never,
|
||||
req: { type: "req", id: "req-node-pending-drain-missing", method: "node.pending.drain" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
const call = respond.mock.calls[0] as RespondCall | undefined;
|
||||
expect(call?.[0]).toBe(false);
|
||||
expect(call?.[2]?.message).toContain("connected device identity");
|
||||
});
|
||||
|
||||
it("enqueues pending work and wakes a disconnected node once", async () => {
|
||||
mocks.enqueueNodePendingWork.mockReturnValue({
|
||||
revision: 4,
|
||||
deduped: false,
|
||||
item: {
|
||||
id: "pending-1",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
createdAtMs: 100,
|
||||
expiresAtMs: null,
|
||||
},
|
||||
});
|
||||
mocks.maybeWakeNodeWithApns.mockResolvedValue({
|
||||
available: true,
|
||||
throttled: false,
|
||||
path: "apns",
|
||||
durationMs: 12,
|
||||
apnsStatus: 200,
|
||||
apnsReason: null,
|
||||
});
|
||||
let connected = false;
|
||||
mocks.waitForNodeReconnect.mockImplementation(async () => {
|
||||
connected = true;
|
||||
return true;
|
||||
});
|
||||
const context = makeContext({
|
||||
nodeRegistry: {
|
||||
get: vi.fn(() => (connected ? { nodeId: "ios-node-2" } : undefined)),
|
||||
},
|
||||
});
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.enqueue"]({
|
||||
params: {
|
||||
nodeId: "ios-node-2",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
},
|
||||
respond: respond as never,
|
||||
client: null,
|
||||
context: context as never,
|
||||
req: { type: "req", id: "req-node-pending-enqueue", method: "node.pending.enqueue" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(mocks.enqueueNodePendingWork).toHaveBeenCalledWith({
|
||||
nodeId: "ios-node-2",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
expiresInMs: undefined,
|
||||
});
|
||||
expect(mocks.maybeWakeNodeWithApns).toHaveBeenCalledWith("ios-node-2", {
|
||||
wakeReason: "node.pending",
|
||||
});
|
||||
expect(mocks.waitForNodeReconnect).toHaveBeenCalledWith({
|
||||
nodeId: "ios-node-2",
|
||||
context,
|
||||
timeoutMs: 3_000,
|
||||
});
|
||||
expect(mocks.maybeSendNodeWakeNudge).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
expect.objectContaining({
|
||||
nodeId: "ios-node-2",
|
||||
revision: 4,
|
||||
wakeTriggered: true,
|
||||
}),
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
});
|
||||
159
src/gateway/server-methods/nodes-pending.ts
Normal file
159
src/gateway/server-methods/nodes-pending.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import {
|
||||
drainNodePendingWork,
|
||||
enqueueNodePendingWork,
|
||||
type NodePendingWorkPriority,
|
||||
type NodePendingWorkType,
|
||||
} from "../node-pending-work.js";
|
||||
import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
validateNodePendingDrainParams,
|
||||
validateNodePendingEnqueueParams,
|
||||
} from "../protocol/index.js";
|
||||
import { respondInvalidParams, respondUnavailableOnThrow } from "./nodes.helpers.js";
|
||||
import {
|
||||
maybeSendNodeWakeNudge,
|
||||
maybeWakeNodeWithApns,
|
||||
NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
|
||||
NODE_WAKE_RECONNECT_WAIT_MS,
|
||||
waitForNodeReconnect,
|
||||
} from "./nodes.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
function resolveClientNodeId(
|
||||
client: { connect?: { device?: { id?: string }; client?: { id?: string } } } | null,
|
||||
): string | null {
|
||||
const nodeId = client?.connect?.device?.id ?? client?.connect?.client?.id ?? "";
|
||||
const trimmed = nodeId.trim();
|
||||
return trimmed.length > 0 ? trimmed : null;
|
||||
}
|
||||
|
||||
export const nodePendingHandlers: GatewayRequestHandlers = {
|
||||
"node.pending.drain": async ({ params, respond, client }) => {
|
||||
if (!validateNodePendingDrainParams(params)) {
|
||||
respondInvalidParams({
|
||||
respond,
|
||||
method: "node.pending.drain",
|
||||
validator: validateNodePendingDrainParams,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const nodeId = resolveClientNodeId(client);
|
||||
if (!nodeId) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
"node.pending.drain requires a connected device identity",
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const p = params as { maxItems?: number };
|
||||
const drained = drainNodePendingWork(nodeId, {
|
||||
maxItems: p.maxItems,
|
||||
includeDefaultStatus: true,
|
||||
});
|
||||
respond(true, { nodeId, ...drained }, undefined);
|
||||
},
|
||||
"node.pending.enqueue": async ({ params, respond, context }) => {
|
||||
if (!validateNodePendingEnqueueParams(params)) {
|
||||
respondInvalidParams({
|
||||
respond,
|
||||
method: "node.pending.enqueue",
|
||||
validator: validateNodePendingEnqueueParams,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const p = params as {
|
||||
nodeId: string;
|
||||
type: NodePendingWorkType;
|
||||
priority?: NodePendingWorkPriority;
|
||||
expiresInMs?: number;
|
||||
wake?: boolean;
|
||||
};
|
||||
await respondUnavailableOnThrow(respond, async () => {
|
||||
const queued = enqueueNodePendingWork({
|
||||
nodeId: p.nodeId,
|
||||
type: p.type,
|
||||
priority: p.priority,
|
||||
expiresInMs: p.expiresInMs,
|
||||
});
|
||||
let wakeTriggered = false;
|
||||
if (p.wake !== false && !queued.deduped && !context.nodeRegistry.get(p.nodeId)) {
|
||||
const wakeReqId = queued.item.id;
|
||||
context.logGateway.info(
|
||||
`node pending wake start node=${p.nodeId} req=${wakeReqId} type=${queued.item.type}`,
|
||||
);
|
||||
const wake = await maybeWakeNodeWithApns(p.nodeId, { wakeReason: "node.pending" });
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wake1 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`available=${wake.available} throttled=${wake.throttled} ` +
|
||||
`path=${wake.path} durationMs=${wake.durationMs} ` +
|
||||
`apnsStatus=${wake.apnsStatus ?? -1} apnsReason=${wake.apnsReason ?? "-"}`,
|
||||
);
|
||||
wakeTriggered = wake.available;
|
||||
if (wake.available) {
|
||||
const reconnected = await waitForNodeReconnect({
|
||||
nodeId: p.nodeId,
|
||||
context,
|
||||
timeoutMs: NODE_WAKE_RECONNECT_WAIT_MS,
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wait1 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_WAIT_MS}`,
|
||||
);
|
||||
}
|
||||
if (!context.nodeRegistry.get(p.nodeId) && wake.available) {
|
||||
const retryWake = await maybeWakeNodeWithApns(p.nodeId, {
|
||||
force: true,
|
||||
wakeReason: "node.pending",
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wake2 node=${p.nodeId} req=${wakeReqId} force=true ` +
|
||||
`available=${retryWake.available} throttled=${retryWake.throttled} ` +
|
||||
`path=${retryWake.path} durationMs=${retryWake.durationMs} ` +
|
||||
`apnsStatus=${retryWake.apnsStatus ?? -1} apnsReason=${retryWake.apnsReason ?? "-"}`,
|
||||
);
|
||||
if (retryWake.available) {
|
||||
const reconnected = await waitForNodeReconnect({
|
||||
nodeId: p.nodeId,
|
||||
context,
|
||||
timeoutMs: NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wait2 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_RETRY_WAIT_MS}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
if (!context.nodeRegistry.get(p.nodeId)) {
|
||||
const nudge = await maybeSendNodeWakeNudge(p.nodeId);
|
||||
context.logGateway.info(
|
||||
`node pending wake nudge node=${p.nodeId} req=${wakeReqId} sent=${nudge.sent} ` +
|
||||
`throttled=${nudge.throttled} reason=${nudge.reason} durationMs=${nudge.durationMs} ` +
|
||||
`apnsStatus=${nudge.apnsStatus ?? -1} apnsReason=${nudge.apnsReason ?? "-"}`,
|
||||
);
|
||||
context.logGateway.warn(
|
||||
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=false reason=not_connected`,
|
||||
);
|
||||
} else {
|
||||
context.logGateway.info(
|
||||
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=true`,
|
||||
);
|
||||
}
|
||||
}
|
||||
respond(
|
||||
true,
|
||||
{
|
||||
nodeId: p.nodeId,
|
||||
revision: queued.revision,
|
||||
queued: queued.item,
|
||||
wakeTriggered,
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
},
|
||||
};
|
||||
@@ -47,9 +47,9 @@ import {
|
||||
} from "./nodes.helpers.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||
const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
||||
const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||
export const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||
export const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
||||
export const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||
const NODE_WAKE_THROTTLE_MS = 15_000;
|
||||
const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000;
|
||||
const NODE_PENDING_ACTION_TTL_MS = 10 * 60_000;
|
||||
@@ -208,9 +208,9 @@ function toPendingParamsJSON(params: unknown): string | undefined {
|
||||
}
|
||||
}
|
||||
|
||||
async function maybeWakeNodeWithApns(
|
||||
export async function maybeWakeNodeWithApns(
|
||||
nodeId: string,
|
||||
opts?: { force?: boolean },
|
||||
opts?: { force?: boolean; wakeReason?: string },
|
||||
): Promise<NodeWakeAttempt> {
|
||||
const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 };
|
||||
nodeWakeById.set(nodeId, state);
|
||||
@@ -253,7 +253,7 @@ async function maybeWakeNodeWithApns(
|
||||
auth: auth.value,
|
||||
registration,
|
||||
nodeId,
|
||||
wakeReason: "node.invoke",
|
||||
wakeReason: opts?.wakeReason ?? "node.invoke",
|
||||
});
|
||||
if (!wakeResult.ok) {
|
||||
return withDuration({
|
||||
@@ -298,7 +298,7 @@ async function maybeWakeNodeWithApns(
|
||||
}
|
||||
}
|
||||
|
||||
async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
||||
export async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
||||
const startedAtMs = Date.now();
|
||||
const withDuration = (
|
||||
attempt: Omit<NodeWakeNudgeAttempt, "durationMs">,
|
||||
@@ -362,7 +362,7 @@ async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAtte
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForNodeReconnect(params: {
|
||||
export async function waitForNodeReconnect(params: {
|
||||
nodeId: string;
|
||||
context: { nodeRegistry: { get: (nodeId: string) => unknown } };
|
||||
timeoutMs?: number;
|
||||
|
||||
Reference in New Issue
Block a user