mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 04:50:44 +00:00
Gateway: add pending node work primitives (#41409)
Merged via squash.
Prepared head SHA: a6d7ca90d7
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
177
src/gateway/server-methods/nodes-pending.test.ts
Normal file
177
src/gateway/server-methods/nodes-pending.test.ts
Normal file
@@ -0,0 +1,177 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { nodePendingHandlers } from "./nodes-pending.js";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
drainNodePendingWork: vi.fn(),
|
||||
enqueueNodePendingWork: vi.fn(),
|
||||
maybeWakeNodeWithApns: vi.fn(),
|
||||
maybeSendNodeWakeNudge: vi.fn(),
|
||||
waitForNodeReconnect: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../node-pending-work.js", () => ({
|
||||
drainNodePendingWork: mocks.drainNodePendingWork,
|
||||
enqueueNodePendingWork: mocks.enqueueNodePendingWork,
|
||||
}));
|
||||
|
||||
vi.mock("./nodes.js", () => ({
|
||||
NODE_WAKE_RECONNECT_WAIT_MS: 3_000,
|
||||
NODE_WAKE_RECONNECT_RETRY_WAIT_MS: 12_000,
|
||||
maybeWakeNodeWithApns: mocks.maybeWakeNodeWithApns,
|
||||
maybeSendNodeWakeNudge: mocks.maybeSendNodeWakeNudge,
|
||||
waitForNodeReconnect: mocks.waitForNodeReconnect,
|
||||
}));
|
||||
|
||||
type RespondCall = [
|
||||
boolean,
|
||||
unknown?,
|
||||
{
|
||||
code?: number;
|
||||
message?: string;
|
||||
details?: unknown;
|
||||
}?,
|
||||
];
|
||||
|
||||
function makeContext(overrides?: Partial<Record<string, unknown>>) {
|
||||
return {
|
||||
nodeRegistry: {
|
||||
get: vi.fn(() => undefined),
|
||||
},
|
||||
logGateway: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
},
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("node.pending handlers", () => {
|
||||
beforeEach(() => {
|
||||
mocks.drainNodePendingWork.mockReset();
|
||||
mocks.enqueueNodePendingWork.mockReset();
|
||||
mocks.maybeWakeNodeWithApns.mockReset();
|
||||
mocks.maybeSendNodeWakeNudge.mockReset();
|
||||
mocks.waitForNodeReconnect.mockReset();
|
||||
});
|
||||
|
||||
it("drains pending work for the connected node identity", async () => {
|
||||
mocks.drainNodePendingWork.mockReturnValue({
|
||||
revision: 2,
|
||||
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
|
||||
hasMore: false,
|
||||
});
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.drain"]({
|
||||
params: { maxItems: 3 },
|
||||
respond: respond as never,
|
||||
client: { connect: { device: { id: "ios-node-1" } } } as never,
|
||||
context: makeContext() as never,
|
||||
req: { type: "req", id: "req-node-pending-drain", method: "node.pending.drain" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(mocks.drainNodePendingWork).toHaveBeenCalledWith("ios-node-1", {
|
||||
maxItems: 3,
|
||||
includeDefaultStatus: true,
|
||||
});
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
{
|
||||
nodeId: "ios-node-1",
|
||||
revision: 2,
|
||||
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
|
||||
hasMore: false,
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects node.pending.drain without a connected device identity", async () => {
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.drain"]({
|
||||
params: {},
|
||||
respond: respond as never,
|
||||
client: null,
|
||||
context: makeContext() as never,
|
||||
req: { type: "req", id: "req-node-pending-drain-missing", method: "node.pending.drain" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
const call = respond.mock.calls[0] as RespondCall | undefined;
|
||||
expect(call?.[0]).toBe(false);
|
||||
expect(call?.[2]?.message).toContain("connected device identity");
|
||||
});
|
||||
|
||||
it("enqueues pending work and wakes a disconnected node once", async () => {
|
||||
mocks.enqueueNodePendingWork.mockReturnValue({
|
||||
revision: 4,
|
||||
deduped: false,
|
||||
item: {
|
||||
id: "pending-1",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
createdAtMs: 100,
|
||||
expiresAtMs: null,
|
||||
},
|
||||
});
|
||||
mocks.maybeWakeNodeWithApns.mockResolvedValue({
|
||||
available: true,
|
||||
throttled: false,
|
||||
path: "apns",
|
||||
durationMs: 12,
|
||||
apnsStatus: 200,
|
||||
apnsReason: null,
|
||||
});
|
||||
let connected = false;
|
||||
mocks.waitForNodeReconnect.mockImplementation(async () => {
|
||||
connected = true;
|
||||
return true;
|
||||
});
|
||||
const context = makeContext({
|
||||
nodeRegistry: {
|
||||
get: vi.fn(() => (connected ? { nodeId: "ios-node-2" } : undefined)),
|
||||
},
|
||||
});
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.enqueue"]({
|
||||
params: {
|
||||
nodeId: "ios-node-2",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
},
|
||||
respond: respond as never,
|
||||
client: null,
|
||||
context: context as never,
|
||||
req: { type: "req", id: "req-node-pending-enqueue", method: "node.pending.enqueue" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(mocks.enqueueNodePendingWork).toHaveBeenCalledWith({
|
||||
nodeId: "ios-node-2",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
expiresInMs: undefined,
|
||||
});
|
||||
expect(mocks.maybeWakeNodeWithApns).toHaveBeenCalledWith("ios-node-2", {
|
||||
wakeReason: "node.pending",
|
||||
});
|
||||
expect(mocks.waitForNodeReconnect).toHaveBeenCalledWith({
|
||||
nodeId: "ios-node-2",
|
||||
context,
|
||||
timeoutMs: 3_000,
|
||||
});
|
||||
expect(mocks.maybeSendNodeWakeNudge).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
expect.objectContaining({
|
||||
nodeId: "ios-node-2",
|
||||
revision: 4,
|
||||
wakeTriggered: true,
|
||||
}),
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
});
|
||||
159
src/gateway/server-methods/nodes-pending.ts
Normal file
159
src/gateway/server-methods/nodes-pending.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import {
|
||||
drainNodePendingWork,
|
||||
enqueueNodePendingWork,
|
||||
type NodePendingWorkPriority,
|
||||
type NodePendingWorkType,
|
||||
} from "../node-pending-work.js";
|
||||
import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
validateNodePendingDrainParams,
|
||||
validateNodePendingEnqueueParams,
|
||||
} from "../protocol/index.js";
|
||||
import { respondInvalidParams, respondUnavailableOnThrow } from "./nodes.helpers.js";
|
||||
import {
|
||||
maybeSendNodeWakeNudge,
|
||||
maybeWakeNodeWithApns,
|
||||
NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
|
||||
NODE_WAKE_RECONNECT_WAIT_MS,
|
||||
waitForNodeReconnect,
|
||||
} from "./nodes.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
function resolveClientNodeId(
|
||||
client: { connect?: { device?: { id?: string }; client?: { id?: string } } } | null,
|
||||
): string | null {
|
||||
const nodeId = client?.connect?.device?.id ?? client?.connect?.client?.id ?? "";
|
||||
const trimmed = nodeId.trim();
|
||||
return trimmed.length > 0 ? trimmed : null;
|
||||
}
|
||||
|
||||
export const nodePendingHandlers: GatewayRequestHandlers = {
|
||||
"node.pending.drain": async ({ params, respond, client }) => {
|
||||
if (!validateNodePendingDrainParams(params)) {
|
||||
respondInvalidParams({
|
||||
respond,
|
||||
method: "node.pending.drain",
|
||||
validator: validateNodePendingDrainParams,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const nodeId = resolveClientNodeId(client);
|
||||
if (!nodeId) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
"node.pending.drain requires a connected device identity",
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const p = params as { maxItems?: number };
|
||||
const drained = drainNodePendingWork(nodeId, {
|
||||
maxItems: p.maxItems,
|
||||
includeDefaultStatus: true,
|
||||
});
|
||||
respond(true, { nodeId, ...drained }, undefined);
|
||||
},
|
||||
"node.pending.enqueue": async ({ params, respond, context }) => {
|
||||
if (!validateNodePendingEnqueueParams(params)) {
|
||||
respondInvalidParams({
|
||||
respond,
|
||||
method: "node.pending.enqueue",
|
||||
validator: validateNodePendingEnqueueParams,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const p = params as {
|
||||
nodeId: string;
|
||||
type: NodePendingWorkType;
|
||||
priority?: NodePendingWorkPriority;
|
||||
expiresInMs?: number;
|
||||
wake?: boolean;
|
||||
};
|
||||
await respondUnavailableOnThrow(respond, async () => {
|
||||
const queued = enqueueNodePendingWork({
|
||||
nodeId: p.nodeId,
|
||||
type: p.type,
|
||||
priority: p.priority,
|
||||
expiresInMs: p.expiresInMs,
|
||||
});
|
||||
let wakeTriggered = false;
|
||||
if (p.wake !== false && !queued.deduped && !context.nodeRegistry.get(p.nodeId)) {
|
||||
const wakeReqId = queued.item.id;
|
||||
context.logGateway.info(
|
||||
`node pending wake start node=${p.nodeId} req=${wakeReqId} type=${queued.item.type}`,
|
||||
);
|
||||
const wake = await maybeWakeNodeWithApns(p.nodeId, { wakeReason: "node.pending" });
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wake1 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`available=${wake.available} throttled=${wake.throttled} ` +
|
||||
`path=${wake.path} durationMs=${wake.durationMs} ` +
|
||||
`apnsStatus=${wake.apnsStatus ?? -1} apnsReason=${wake.apnsReason ?? "-"}`,
|
||||
);
|
||||
wakeTriggered = wake.available;
|
||||
if (wake.available) {
|
||||
const reconnected = await waitForNodeReconnect({
|
||||
nodeId: p.nodeId,
|
||||
context,
|
||||
timeoutMs: NODE_WAKE_RECONNECT_WAIT_MS,
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wait1 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_WAIT_MS}`,
|
||||
);
|
||||
}
|
||||
if (!context.nodeRegistry.get(p.nodeId) && wake.available) {
|
||||
const retryWake = await maybeWakeNodeWithApns(p.nodeId, {
|
||||
force: true,
|
||||
wakeReason: "node.pending",
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wake2 node=${p.nodeId} req=${wakeReqId} force=true ` +
|
||||
`available=${retryWake.available} throttled=${retryWake.throttled} ` +
|
||||
`path=${retryWake.path} durationMs=${retryWake.durationMs} ` +
|
||||
`apnsStatus=${retryWake.apnsStatus ?? -1} apnsReason=${retryWake.apnsReason ?? "-"}`,
|
||||
);
|
||||
if (retryWake.available) {
|
||||
const reconnected = await waitForNodeReconnect({
|
||||
nodeId: p.nodeId,
|
||||
context,
|
||||
timeoutMs: NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wait2 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_RETRY_WAIT_MS}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
if (!context.nodeRegistry.get(p.nodeId)) {
|
||||
const nudge = await maybeSendNodeWakeNudge(p.nodeId);
|
||||
context.logGateway.info(
|
||||
`node pending wake nudge node=${p.nodeId} req=${wakeReqId} sent=${nudge.sent} ` +
|
||||
`throttled=${nudge.throttled} reason=${nudge.reason} durationMs=${nudge.durationMs} ` +
|
||||
`apnsStatus=${nudge.apnsStatus ?? -1} apnsReason=${nudge.apnsReason ?? "-"}`,
|
||||
);
|
||||
context.logGateway.warn(
|
||||
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=false reason=not_connected`,
|
||||
);
|
||||
} else {
|
||||
context.logGateway.info(
|
||||
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=true`,
|
||||
);
|
||||
}
|
||||
}
|
||||
respond(
|
||||
true,
|
||||
{
|
||||
nodeId: p.nodeId,
|
||||
revision: queued.revision,
|
||||
queued: queued.item,
|
||||
wakeTriggered,
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
},
|
||||
};
|
||||
@@ -47,9 +47,9 @@ import {
|
||||
} from "./nodes.helpers.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||
const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
||||
const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||
export const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||
export const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
||||
export const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||
const NODE_WAKE_THROTTLE_MS = 15_000;
|
||||
const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000;
|
||||
const NODE_PENDING_ACTION_TTL_MS = 10 * 60_000;
|
||||
@@ -208,9 +208,9 @@ function toPendingParamsJSON(params: unknown): string | undefined {
|
||||
}
|
||||
}
|
||||
|
||||
async function maybeWakeNodeWithApns(
|
||||
export async function maybeWakeNodeWithApns(
|
||||
nodeId: string,
|
||||
opts?: { force?: boolean },
|
||||
opts?: { force?: boolean; wakeReason?: string },
|
||||
): Promise<NodeWakeAttempt> {
|
||||
const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 };
|
||||
nodeWakeById.set(nodeId, state);
|
||||
@@ -253,7 +253,7 @@ async function maybeWakeNodeWithApns(
|
||||
auth: auth.value,
|
||||
registration,
|
||||
nodeId,
|
||||
wakeReason: "node.invoke",
|
||||
wakeReason: opts?.wakeReason ?? "node.invoke",
|
||||
});
|
||||
if (!wakeResult.ok) {
|
||||
return withDuration({
|
||||
@@ -298,7 +298,7 @@ async function maybeWakeNodeWithApns(
|
||||
}
|
||||
}
|
||||
|
||||
async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
||||
export async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
||||
const startedAtMs = Date.now();
|
||||
const withDuration = (
|
||||
attempt: Omit<NodeWakeNudgeAttempt, "durationMs">,
|
||||
@@ -362,7 +362,7 @@ async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAtte
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForNodeReconnect(params: {
|
||||
export async function waitForNodeReconnect(params: {
|
||||
nodeId: string;
|
||||
context: { nodeRegistry: { get: (nodeId: string) => unknown } };
|
||||
timeoutMs?: number;
|
||||
|
||||
Reference in New Issue
Block a user