diff --git a/scripts/test-parallel.mjs b/scripts/test-parallel.mjs index 6ce82561969..fc5081d19f2 100644 --- a/scripts/test-parallel.mjs +++ b/scripts/test-parallel.mjs @@ -165,7 +165,7 @@ const defaultWorkerBudget = unit: Math.max(2, Math.min(8, Math.floor(localWorkers / 2))), unitIsolated: 1, extensions: Math.max(1, Math.min(4, Math.floor(localWorkers / 4))), - gateway: 1, + gateway: 2, }; // Keep worker counts predictable for local runs; trim macOS CI workers to avoid worker crashes/OOM. diff --git a/src/gateway/client.maxpayload.test.ts b/src/gateway/client.maxpayload.test.ts deleted file mode 100644 index acd23da179e..00000000000 --- a/src/gateway/client.maxpayload.test.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { describe, expect, test, vi } from "vitest"; -import { GatewayClient } from "./client.js"; - -const wsMockState = vi.hoisted(() => ({ - last: null as { url: unknown; opts: unknown } | null, -})); - -vi.mock("ws", () => ({ - WebSocket: class MockWebSocket { - on = vi.fn(); - close = vi.fn(); - send = vi.fn(); - - constructor(url: unknown, opts: unknown) { - wsMockState.last = { url, opts }; - } - }, -})); - -describe("GatewayClient", () => { - test("uses a large maxPayload for node snapshots", () => { - wsMockState.last = null; - const client = new GatewayClient({ url: "ws://127.0.0.1:1" }); - client.start(); - - expect(wsMockState.last?.url).toBe("ws://127.0.0.1:1"); - expect(wsMockState.last?.opts).toEqual( - expect.objectContaining({ maxPayload: 25 * 1024 * 1024 }), - ); - }); -}); diff --git a/src/gateway/control-ui.test.ts b/src/gateway/control-ui.test.ts deleted file mode 100644 index 13dd3020bf6..00000000000 --- a/src/gateway/control-ui.test.ts +++ /dev/null @@ -1,44 +0,0 @@ -import type { IncomingMessage, ServerResponse } from "node:http"; -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; -import { handleControlUiHttpRequest } from "./control-ui.js"; - -const makeResponse = (): { - res: ServerResponse; - setHeader: ReturnType; - end: ReturnType; -} => { - const setHeader = vi.fn(); - const end = vi.fn(); - const res = { - headersSent: false, - statusCode: 200, - setHeader, - end, - } as unknown as ServerResponse; - return { res, setHeader, end }; -}; - -describe("handleControlUiHttpRequest", () => { - it("sets anti-clickjacking headers for Control UI responses", async () => { - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ui-")); - try { - await fs.writeFile(path.join(tmp, "index.html"), "\n"); - const { res, setHeader } = makeResponse(); - const handled = handleControlUiHttpRequest( - { url: "/", method: "GET" } as IncomingMessage, - res, - { - root: { kind: "resolved", path: tmp }, - }, - ); - expect(handled).toBe(true); - expect(setHeader).toHaveBeenCalledWith("X-Frame-Options", "DENY"); - expect(setHeader).toHaveBeenCalledWith("Content-Security-Policy", "frame-ancestors 'none'"); - } finally { - await fs.rm(tmp, { recursive: true, force: true }); - } - }); -}); diff --git a/src/gateway/gateway-misc.test.ts b/src/gateway/gateway-misc.test.ts new file mode 100644 index 00000000000..abb3f7fcf93 --- /dev/null +++ b/src/gateway/gateway-misc.test.ts @@ -0,0 +1,283 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, test, vi } from "vitest"; +import type { RequestFrame } from "./protocol/index.js"; +import type { GatewayClient as GatewayMethodClient } from "./server-methods/types.js"; +import type { GatewayRequestContext, RespondFn } from "./server-methods/types.js"; +import type { GatewayWsClient } from "./server/ws-types.js"; +import { GatewayClient } from "./client.js"; +import { handleControlUiHttpRequest } from "./control-ui.js"; +import { + DEFAULT_DANGEROUS_NODE_COMMANDS, + resolveNodeCommandAllowlist, +} from "./node-command-policy.js"; +import { createGatewayBroadcaster } from "./server-broadcast.js"; +import { createChatRunRegistry } from "./server-chat.js"; +import { handleNodeInvokeResult } from "./server-methods/nodes.handlers.invoke-result.js"; +import { createNodeSubscriptionManager } from "./server-node-subscriptions.js"; + +const wsMockState = vi.hoisted(() => ({ + last: null as { url: unknown; opts: unknown } | null, +})); + +vi.mock("ws", () => ({ + WebSocket: class MockWebSocket { + on = vi.fn(); + close = vi.fn(); + send = vi.fn(); + + constructor(url: unknown, opts: unknown) { + wsMockState.last = { url, opts }; + } + }, +})); + +describe("GatewayClient", () => { + test("uses a large maxPayload for node snapshots", () => { + wsMockState.last = null; + const client = new GatewayClient({ url: "ws://127.0.0.1:1" }); + client.start(); + + expect(wsMockState.last?.url).toBe("ws://127.0.0.1:1"); + expect(wsMockState.last?.opts).toEqual( + expect.objectContaining({ maxPayload: 25 * 1024 * 1024 }), + ); + }); +}); + +const makeControlUiResponse = (): { + res: ServerResponse; + setHeader: ReturnType; + end: ReturnType; +} => { + const setHeader = vi.fn(); + const end = vi.fn(); + const res = { + headersSent: false, + statusCode: 200, + setHeader, + end, + } as unknown as ServerResponse; + return { res, setHeader, end }; +}; + +describe("handleControlUiHttpRequest", () => { + it("sets anti-clickjacking headers for Control UI responses", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ui-")); + try { + await fs.writeFile(path.join(tmp, "index.html"), "\n"); + const { res, setHeader } = makeControlUiResponse(); + const handled = handleControlUiHttpRequest( + { url: "/", method: "GET" } as IncomingMessage, + res, + { + root: { kind: "resolved", path: tmp }, + }, + ); + expect(handled).toBe(true); + expect(setHeader).toHaveBeenCalledWith("X-Frame-Options", "DENY"); + expect(setHeader).toHaveBeenCalledWith("Content-Security-Policy", "frame-ancestors 'none'"); + } finally { + await fs.rm(tmp, { recursive: true, force: true }); + } + }); +}); + +type TestSocket = { + bufferedAmount: number; + send: (payload: string) => void; + close: (code: number, reason: string) => void; +}; + +describe("gateway broadcaster", () => { + it("filters approval and pairing events by scope", () => { + const approvalsSocket: TestSocket = { + bufferedAmount: 0, + send: vi.fn(), + close: vi.fn(), + }; + const pairingSocket: TestSocket = { + bufferedAmount: 0, + send: vi.fn(), + close: vi.fn(), + }; + const readSocket: TestSocket = { + bufferedAmount: 0, + send: vi.fn(), + close: vi.fn(), + }; + + const clients = new Set([ + { + socket: approvalsSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.approvals"] } as GatewayWsClient["connect"], + connId: "c-approvals", + }, + { + socket: pairingSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"], + connId: "c-pairing", + }, + { + socket: readSocket as unknown as GatewayWsClient["socket"], + connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"], + connId: "c-read", + }, + ]); + + const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients }); + + broadcast("exec.approval.requested", { id: "1" }); + broadcast("device.pair.requested", { requestId: "r1" }); + + expect(approvalsSocket.send).toHaveBeenCalledTimes(1); + expect(pairingSocket.send).toHaveBeenCalledTimes(1); + expect(readSocket.send).toHaveBeenCalledTimes(0); + + broadcastToConnIds("tick", { ts: 1 }, new Set(["c-read"])); + expect(readSocket.send).toHaveBeenCalledTimes(1); + expect(approvalsSocket.send).toHaveBeenCalledTimes(1); + expect(pairingSocket.send).toHaveBeenCalledTimes(1); + }); +}); + +describe("chat run registry", () => { + test("queues and removes runs per session", () => { + const registry = createChatRunRegistry(); + + registry.add("s1", { sessionKey: "main", clientRunId: "c1" }); + registry.add("s1", { sessionKey: "main", clientRunId: "c2" }); + + expect(registry.peek("s1")?.clientRunId).toBe("c1"); + expect(registry.shift("s1")?.clientRunId).toBe("c1"); + expect(registry.peek("s1")?.clientRunId).toBe("c2"); + + expect(registry.remove("s1", "c2")?.clientRunId).toBe("c2"); + expect(registry.peek("s1")).toBeUndefined(); + }); +}); + +describe("late-arriving invoke results", () => { + test("returns success for unknown invoke ids for both success and error payloads", async () => { + const nodeId = "node-123"; + const cases = [ + { + id: "unknown-invoke-id-12345", + ok: true, + payloadJSON: JSON.stringify({ result: "late" }), + }, + { + id: "another-unknown-invoke-id", + ok: false, + error: { code: "FAILED", message: "test error" }, + }, + ] as const; + + for (const params of cases) { + const respond = vi.fn(); + const context = { + nodeRegistry: { handleInvokeResult: () => false }, + logGateway: { debug: vi.fn() }, + } as unknown as GatewayRequestContext; + const client = { + connect: { device: { id: nodeId } }, + } as unknown as GatewayMethodClient; + + await handleNodeInvokeResult({ + req: { method: "node.invoke.result" } as unknown as RequestFrame, + params: { ...params, nodeId } as unknown as Record, + client, + isWebchatConnect: () => false, + respond, + context, + }); + + const [ok, payload, error] = respond.mock.lastCall ?? []; + + // Late-arriving results return success instead of error to reduce log noise. + expect(ok).toBe(true); + expect(error).toBeUndefined(); + expect(payload?.ok).toBe(true); + expect(payload?.ignored).toBe(true); + } + }); +}); + +describe("node subscription manager", () => { + test("routes events to subscribed nodes", () => { + const manager = createNodeSubscriptionManager(); + const sent: Array<{ + nodeId: string; + event: string; + payloadJSON?: string | null; + }> = []; + const sendEvent = (evt: { nodeId: string; event: string; payloadJSON?: string | null }) => + sent.push(evt); + + manager.subscribe("node-a", "main"); + manager.subscribe("node-b", "main"); + manager.sendToSession("main", "chat", { ok: true }, sendEvent); + + expect(sent).toHaveLength(2); + expect(sent.map((s) => s.nodeId).toSorted()).toEqual(["node-a", "node-b"]); + expect(sent[0].event).toBe("chat"); + }); + + test("unsubscribeAll clears session mappings", () => { + const manager = createNodeSubscriptionManager(); + const sent: string[] = []; + const sendEvent = (evt: { nodeId: string; event: string }) => + sent.push(`${evt.nodeId}:${evt.event}`); + + manager.subscribe("node-a", "main"); + manager.subscribe("node-a", "secondary"); + manager.unsubscribeAll("node-a"); + manager.sendToSession("main", "tick", {}, sendEvent); + manager.sendToSession("secondary", "tick", {}, sendEvent); + + expect(sent).toEqual([]); + }); +}); + +describe("resolveNodeCommandAllowlist", () => { + it("includes iOS service commands by default", () => { + const allow = resolveNodeCommandAllowlist( + {}, + { + platform: "ios 26.0", + deviceFamily: "iPhone", + }, + ); + + expect(allow.has("device.info")).toBe(true); + expect(allow.has("device.status")).toBe(true); + expect(allow.has("system.notify")).toBe(true); + expect(allow.has("contacts.search")).toBe(true); + expect(allow.has("calendar.events")).toBe(true); + expect(allow.has("reminders.list")).toBe(true); + expect(allow.has("photos.latest")).toBe(true); + expect(allow.has("motion.activity")).toBe(true); + + for (const cmd of DEFAULT_DANGEROUS_NODE_COMMANDS) { + expect(allow.has(cmd)).toBe(false); + } + }); + + it("can explicitly allow dangerous commands via allowCommands", () => { + const allow = resolveNodeCommandAllowlist( + { + gateway: { + nodes: { + allowCommands: ["camera.snap", "screen.record"], + }, + }, + }, + { platform: "ios", deviceFamily: "iPhone" }, + ); + expect(allow.has("camera.snap")).toBe(true); + expect(allow.has("screen.record")).toBe(true); + expect(allow.has("camera.clip")).toBe(false); + }); +}); diff --git a/src/gateway/node-command-policy.test.ts b/src/gateway/node-command-policy.test.ts deleted file mode 100644 index f96bd0eaf16..00000000000 --- a/src/gateway/node-command-policy.test.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { describe, expect, it } from "vitest"; -import { - DEFAULT_DANGEROUS_NODE_COMMANDS, - resolveNodeCommandAllowlist, -} from "./node-command-policy.js"; - -describe("resolveNodeCommandAllowlist", () => { - it("includes iOS service commands by default", () => { - const allow = resolveNodeCommandAllowlist( - {}, - { - platform: "ios 26.0", - deviceFamily: "iPhone", - }, - ); - - expect(allow.has("device.info")).toBe(true); - expect(allow.has("device.status")).toBe(true); - expect(allow.has("system.notify")).toBe(true); - expect(allow.has("contacts.search")).toBe(true); - expect(allow.has("calendar.events")).toBe(true); - expect(allow.has("reminders.list")).toBe(true); - expect(allow.has("photos.latest")).toBe(true); - expect(allow.has("motion.activity")).toBe(true); - - for (const cmd of DEFAULT_DANGEROUS_NODE_COMMANDS) { - expect(allow.has(cmd)).toBe(false); - } - }); - - it("can explicitly allow dangerous commands via allowCommands", () => { - const allow = resolveNodeCommandAllowlist( - { - gateway: { - nodes: { - allowCommands: ["camera.snap", "screen.record"], - }, - }, - }, - { platform: "ios", deviceFamily: "iPhone" }, - ); - expect(allow.has("camera.snap")).toBe(true); - expect(allow.has("screen.record")).toBe(true); - expect(allow.has("camera.clip")).toBe(false); - }); -}); diff --git a/src/gateway/server-broadcast.test.ts b/src/gateway/server-broadcast.test.ts deleted file mode 100644 index 2cb5855a98e..00000000000 --- a/src/gateway/server-broadcast.test.ts +++ /dev/null @@ -1,61 +0,0 @@ -import { describe, expect, it, vi } from "vitest"; -import type { GatewayWsClient } from "./server/ws-types.js"; -import { createGatewayBroadcaster } from "./server-broadcast.js"; - -type TestSocket = { - bufferedAmount: number; - send: (payload: string) => void; - close: (code: number, reason: string) => void; -}; - -describe("gateway broadcaster", () => { - it("filters approval and pairing events by scope", () => { - const approvalsSocket: TestSocket = { - bufferedAmount: 0, - send: vi.fn(), - close: vi.fn(), - }; - const pairingSocket: TestSocket = { - bufferedAmount: 0, - send: vi.fn(), - close: vi.fn(), - }; - const readSocket: TestSocket = { - bufferedAmount: 0, - send: vi.fn(), - close: vi.fn(), - }; - - const clients = new Set([ - { - socket: approvalsSocket as unknown as GatewayWsClient["socket"], - connect: { role: "operator", scopes: ["operator.approvals"] } as GatewayWsClient["connect"], - connId: "c-approvals", - }, - { - socket: pairingSocket as unknown as GatewayWsClient["socket"], - connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"], - connId: "c-pairing", - }, - { - socket: readSocket as unknown as GatewayWsClient["socket"], - connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"], - connId: "c-read", - }, - ]); - - const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients }); - - broadcast("exec.approval.requested", { id: "1" }); - broadcast("device.pair.requested", { requestId: "r1" }); - - expect(approvalsSocket.send).toHaveBeenCalledTimes(1); - expect(pairingSocket.send).toHaveBeenCalledTimes(1); - expect(readSocket.send).toHaveBeenCalledTimes(0); - - broadcastToConnIds("tick", { ts: 1 }, new Set(["c-read"])); - expect(readSocket.send).toHaveBeenCalledTimes(1); - expect(approvalsSocket.send).toHaveBeenCalledTimes(1); - expect(pairingSocket.send).toHaveBeenCalledTimes(1); - }); -}); diff --git a/src/gateway/server-chat-registry.test.ts b/src/gateway/server-chat-registry.test.ts deleted file mode 100644 index 631b5bb5ee8..00000000000 --- a/src/gateway/server-chat-registry.test.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { describe, expect, test } from "vitest"; -import { createChatRunRegistry } from "./server-chat.js"; - -describe("chat run registry", () => { - test("queues and removes runs per session", () => { - const registry = createChatRunRegistry(); - - registry.add("s1", { sessionKey: "main", clientRunId: "c1" }); - registry.add("s1", { sessionKey: "main", clientRunId: "c2" }); - - expect(registry.peek("s1")?.clientRunId).toBe("c1"); - expect(registry.shift("s1")?.clientRunId).toBe("c1"); - expect(registry.peek("s1")?.clientRunId).toBe("c2"); - - expect(registry.remove("s1", "c2")?.clientRunId).toBe("c2"); - expect(registry.peek("s1")).toBeUndefined(); - }); -}); diff --git a/src/gateway/server-node-subscriptions.test.ts b/src/gateway/server-node-subscriptions.test.ts deleted file mode 100644 index 776e5a048f8..00000000000 --- a/src/gateway/server-node-subscriptions.test.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { describe, expect, test } from "vitest"; -import { createNodeSubscriptionManager } from "./server-node-subscriptions.js"; - -describe("node subscription manager", () => { - test("routes events to subscribed nodes", () => { - const manager = createNodeSubscriptionManager(); - const sent: Array<{ - nodeId: string; - event: string; - payloadJSON?: string | null; - }> = []; - const sendEvent = (evt: { nodeId: string; event: string; payloadJSON?: string | null }) => - sent.push(evt); - - manager.subscribe("node-a", "main"); - manager.subscribe("node-b", "main"); - manager.sendToSession("main", "chat", { ok: true }, sendEvent); - - expect(sent).toHaveLength(2); - expect(sent.map((s) => s.nodeId).toSorted()).toEqual(["node-a", "node-b"]); - expect(sent[0].event).toBe("chat"); - }); - - test("unsubscribeAll clears session mappings", () => { - const manager = createNodeSubscriptionManager(); - const sent: string[] = []; - const sendEvent = (evt: { nodeId: string; event: string }) => - sent.push(`${evt.nodeId}:${evt.event}`); - - manager.subscribe("node-a", "main"); - manager.subscribe("node-a", "secondary"); - manager.unsubscribeAll("node-a"); - manager.sendToSession("main", "tick", {}, sendEvent); - manager.sendToSession("secondary", "tick", {}, sendEvent); - - expect(sent).toEqual([]); - }); -}); diff --git a/src/gateway/server-reload.config-during-reply.test.ts b/src/gateway/server-reload.config-during-reply.test.ts deleted file mode 100644 index 3d8be89749b..00000000000 --- a/src/gateway/server-reload.config-during-reply.test.ts +++ /dev/null @@ -1,105 +0,0 @@ -/** - * E2E test for config reload during active reply sending. - * Tests that gateway restart is properly deferred until replies are sent. - */ -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { - clearAllDispatchers, - getTotalPendingReplies, -} from "../auto-reply/reply/dispatcher-registry.js"; -import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; -import { getTotalQueueSize } from "../process/command-queue.js"; - -// Helper to flush all pending microtasks -async function flushMicrotasks() { - for (let i = 0; i < 10; i++) { - await Promise.resolve(); - } -} - -describe("gateway config reload during reply", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - afterEach(async () => { - vi.restoreAllMocks(); - // Wait for any pending microtasks (from markComplete()) to complete - await flushMicrotasks(); - clearAllDispatchers(); - }); - - it("should defer restart until reply dispatcher completes", async () => { - // Create a dispatcher (simulating message handling) - let deliveredReplies: string[] = []; - const dispatcher = createReplyDispatcher({ - deliver: async (payload) => { - // Keep delivery asynchronous without real wall-clock delay. - await Promise.resolve(); - deliveredReplies.push(payload.text ?? ""); - }, - onError: (err) => { - throw err; - }, - }); - - // Initially: pending=1 (reservation) - expect(getTotalPendingReplies()).toBe(1); - - // Simulate command finishing and enqueuing reply - dispatcher.sendFinalReply({ text: "Configuration updated successfully!" }); - - // Now: pending=2 (reservation + 1 enqueued reply) - expect(getTotalPendingReplies()).toBe(2); - - // Mark dispatcher complete (flags reservation for cleanup on last delivery) - dispatcher.markComplete(); - - // Reservation is still counted until the delivery .finally() clears it, - // but the important invariant is pending > 0 while delivery is in flight. - expect(getTotalPendingReplies()).toBeGreaterThan(0); - - // At this point, if gateway restart was requested, it should defer - // because getTotalPendingReplies() > 0 - - // Wait for reply to be delivered - await dispatcher.waitForIdle(); - - // Now: pending=0 (reply sent) - expect(getTotalPendingReplies()).toBe(0); - expect(deliveredReplies).toEqual(["Configuration updated successfully!"]); - - // Now restart can proceed safely - expect(getTotalQueueSize()).toBe(0); - expect(getTotalPendingReplies()).toBe(0); - }); - - it("should handle dispatcher reservation correctly when no replies sent", async () => { - const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); - - let deliverCalled = false; - const dispatcher = createReplyDispatcher({ - deliver: async () => { - deliverCalled = true; - }, - }); - - // Initially: pending=1 (reservation) - expect(getTotalPendingReplies()).toBe(1); - - // Mark complete without sending any replies - dispatcher.markComplete(); - - // Reservation is cleared via microtask — flush it - await flushMicrotasks(); - - // Now: pending=0 (reservation cleared, no replies were enqueued) - expect(getTotalPendingReplies()).toBe(0); - - // Wait for idle (should resolve immediately since no replies) - await dispatcher.waitForIdle(); - - expect(deliverCalled).toBe(false); - expect(getTotalPendingReplies()).toBe(0); - }); -}); diff --git a/src/gateway/server-reload.integration.test.ts b/src/gateway/server-reload.integration.test.ts deleted file mode 100644 index 9d788ad4947..00000000000 --- a/src/gateway/server-reload.integration.test.ts +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Integration test simulating full message handling + config change + reply flow. - * This tests the complete scenario where a user configures an adapter via chat - * and ensures they get a reply before the gateway restarts. - */ -import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; -import { - clearAllDispatchers, - getTotalPendingReplies, -} from "../auto-reply/reply/dispatcher-registry.js"; -import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; -import { getTotalQueueSize } from "../process/command-queue.js"; - -describe("gateway restart deferral integration", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - afterEach(async () => { - vi.restoreAllMocks(); - // Wait for any pending microtasks (from markComplete()) to complete - await Promise.resolve(); - clearAllDispatchers(); - }); - - it("should defer restart until dispatcher completes with reply", async () => { - const events: string[] = []; - - // T=0: Message received — dispatcher created (pending=1 reservation) - events.push("message-received"); - const deliveredReplies: Array<{ text: string; timestamp: number }> = []; - const dispatcher = createReplyDispatcher({ - deliver: async (payload) => { - // Keep delivery asynchronous without real wall-clock delay. - await Promise.resolve(); - deliveredReplies.push({ - text: payload.text ?? "", - timestamp: Date.now(), - }); - events.push(`reply-delivered: ${payload.text}`); - }, - }); - events.push("dispatcher-created"); - - // T=1: Config change detected - events.push("config-change-detected"); - - // Check if restart should be deferred - const queueSize = getTotalQueueSize(); - const pendingReplies = getTotalPendingReplies(); - const totalActive = queueSize + pendingReplies; - - events.push(`defer-check: queue=${queueSize} pending=${pendingReplies} total=${totalActive}`); - - // Should defer because dispatcher has reservation - expect(totalActive).toBeGreaterThan(0); - expect(pendingReplies).toBe(1); // reservation - - if (totalActive > 0) { - events.push("restart-deferred"); - } - - // T=2: Command finishes, enqueue replies - dispatcher.sendFinalReply({ text: "Adapter configured successfully!" }); - dispatcher.sendFinalReply({ text: "Gateway will restart to apply changes." }); - events.push("replies-enqueued"); - - // Now pending should be 3 (reservation + 2 replies) - expect(getTotalPendingReplies()).toBe(3); - - // Mark command complete (flags reservation for cleanup on last delivery) - dispatcher.markComplete(); - events.push("command-complete"); - - // Reservation still counted until delivery .finally() clears it, - // but the important invariant is pending > 0 while deliveries are in flight. - expect(getTotalPendingReplies()).toBeGreaterThan(0); - - // T=3: Wait for replies to be delivered - await dispatcher.waitForIdle(); - events.push("dispatcher-idle"); - - // Replies should be delivered - expect(deliveredReplies).toHaveLength(2); - expect(deliveredReplies[0].text).toBe("Adapter configured successfully!"); - expect(deliveredReplies[1].text).toBe("Gateway will restart to apply changes."); - - // Pending should be 0 - expect(getTotalPendingReplies()).toBe(0); - - // T=4: Check if restart can proceed - const finalQueueSize = getTotalQueueSize(); - const finalPendingReplies = getTotalPendingReplies(); - const finalTotalActive = finalQueueSize + finalPendingReplies; - - events.push( - `restart-check: queue=${finalQueueSize} pending=${finalPendingReplies} total=${finalTotalActive}`, - ); - - // Everything should be idle now - expect(finalTotalActive).toBe(0); - events.push("restart-can-proceed"); - - // Verify event sequence - expect(events).toEqual([ - "message-received", - "dispatcher-created", - "config-change-detected", - "defer-check: queue=0 pending=1 total=1", - "restart-deferred", - "replies-enqueued", - "command-complete", - "reply-delivered: Adapter configured successfully!", - "reply-delivered: Gateway will restart to apply changes.", - "dispatcher-idle", - "restart-check: queue=0 pending=0 total=0", - "restart-can-proceed", - ]); - }); -}); diff --git a/src/gateway/server-reload.real-scenario.test.ts b/src/gateway/server-restart-deferral.test.ts similarity index 56% rename from src/gateway/server-reload.real-scenario.test.ts rename to src/gateway/server-restart-deferral.test.ts index f26f660dc79..787e6d55d02 100644 --- a/src/gateway/server-reload.real-scenario.test.ts +++ b/src/gateway/server-restart-deferral.test.ts @@ -1,13 +1,17 @@ -/** - * REAL scenario test - simulates actual message handling with config changes. - * This test MUST fail if "imsg rpc not running" would occur in production. - */ -import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { clearAllDispatchers, getTotalPendingReplies, } from "../auto-reply/reply/dispatcher-registry.js"; import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; +import { getTotalQueueSize } from "../process/command-queue.js"; + +async function flushMicrotasks(count = 10): Promise { + for (let i = 0; i < count; i += 1) { + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(); + } +} function createDeferred() { let resolve!: (value: T | PromiseLike) => void; @@ -19,7 +23,7 @@ function createDeferred() { return { promise, resolve, reject }; } -describe("real scenario: config change during message processing", () => { +describe("gateway restart deferral", () => { let replyErrors: string[] = []; beforeEach(() => { @@ -29,12 +33,11 @@ describe("real scenario: config change during message processing", () => { afterEach(async () => { vi.restoreAllMocks(); - // Wait for any pending microtasks (from markComplete()) to complete - await Promise.resolve(); + await flushMicrotasks(); clearAllDispatchers(); }); - it("should NOT restart gateway while reply delivery is in flight", async () => { + it("defers restart while reply delivery is in flight", async () => { let rpcConnected = true; const deliveredReplies: string[] = []; const deliveryStarted = createDeferred(); @@ -53,7 +56,7 @@ describe("real scenario: config change during message processing", () => { deliveredReplies.push(payload.text ?? ""); }, onError: () => { - // Swallow delivery errors so the test can assert on replyErrors + // Swallow delivery errors so the test can assert on replyErrors. }, }); @@ -64,15 +67,11 @@ describe("real scenario: config change during message processing", () => { dispatcher.markComplete(); await deliveryStarted.promise; - // At this point: markComplete flagged, delivery is in flight. - // pending > 0 because the in-flight delivery keeps it alive. - const pendingDuringDelivery = getTotalPendingReplies(); - expect(pendingDuringDelivery).toBeGreaterThan(0); + // At this point: delivery is in flight; pending > 0 prevents restart. + expect(getTotalPendingReplies()).toBeGreaterThan(0); - // Simulate restart checks while delivery is in progress. - // If the tracking is broken, pending would be 0 and we'd restart. let restartTriggered = false; - for (let i = 0; i < 3; i++) { + for (let i = 0; i < 3; i += 1) { await Promise.resolve(); const pending = getTotalPendingReplies(); if (pending === 0) { @@ -83,54 +82,83 @@ describe("real scenario: config change during message processing", () => { } allowDelivery.resolve(); - // Wait for delivery to complete await dispatcher.waitForIdle(); - // Now pending should be 0 — restart can proceed expect(getTotalPendingReplies()).toBe(0); - - // CRITICAL: delivery must have succeeded without RPC being killed expect(restartTriggered).toBe(false); expect(replyErrors).toEqual([]); expect(deliveredReplies).toEqual(["Configuration updated!"]); }); - it("should keep pending > 0 until reply is actually enqueued", async () => { + it("keeps pending > 0 until the reply is actually enqueued", async () => { const allowDelivery = createDeferred(); const dispatcher = createReplyDispatcher({ - deliver: async (_payload) => { + deliver: async () => { await allowDelivery.promise; }, }); - // Initially: pending=1 (reservation) expect(getTotalPendingReplies()).toBe(1); - // Simulate command processing delay BEFORE reply is enqueued await Promise.resolve(); - - // During this delay, pending should STILL be 1 (reservation active) expect(getTotalPendingReplies()).toBe(1); - // Now enqueue reply dispatcher.sendFinalReply({ text: "Reply" }); - - // Now pending should be 2 (reservation + reply) expect(getTotalPendingReplies()).toBe(2); - // Mark complete dispatcher.markComplete(); - - // After markComplete, pending should still be > 0 if reply hasn't sent yet - const pendingAfterMarkComplete = getTotalPendingReplies(); - expect(pendingAfterMarkComplete).toBeGreaterThan(0); + expect(getTotalPendingReplies()).toBeGreaterThan(0); allowDelivery.resolve(); - // Wait for reply to send + await dispatcher.waitForIdle(); + expect(getTotalPendingReplies()).toBe(0); + }); + + it("defers restart until reply dispatcher completes", async () => { + const deliveredReplies: string[] = []; + const dispatcher = createReplyDispatcher({ + deliver: async (payload) => { + await Promise.resolve(); + deliveredReplies.push(payload.text ?? ""); + }, + onError: (err) => { + throw err; + }, + }); + + expect(getTotalPendingReplies()).toBe(1); + + dispatcher.sendFinalReply({ text: "Configuration updated successfully!" }); + expect(getTotalPendingReplies()).toBe(2); + + dispatcher.markComplete(); + expect(getTotalPendingReplies()).toBeGreaterThan(0); + await dispatcher.waitForIdle(); - // Now pending should be 0 + expect(getTotalPendingReplies()).toBe(0); + expect(deliveredReplies).toEqual(["Configuration updated successfully!"]); + expect(getTotalQueueSize()).toBe(0); + }); + + it("clears dispatcher reservation when no replies were sent", async () => { + let deliverCalled = false; + const dispatcher = createReplyDispatcher({ + deliver: async () => { + deliverCalled = true; + }, + }); + + expect(getTotalPendingReplies()).toBe(1); + + dispatcher.markComplete(); + await flushMicrotasks(); + + expect(getTotalPendingReplies()).toBe(0); + await dispatcher.waitForIdle(); + + expect(deliverCalled).toBe(false); expect(getTotalPendingReplies()).toBe(0); }); }); diff --git a/src/gateway/server.nodes.late-invoke.test.ts b/src/gateway/server.nodes.late-invoke.test.ts deleted file mode 100644 index e6b2fd5f2a1..00000000000 --- a/src/gateway/server.nodes.late-invoke.test.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { describe, expect, test, vi } from "vitest"; -import type { RequestFrame } from "./protocol/index.js"; -import type { GatewayClient, GatewayRequestContext, RespondFn } from "./server-methods/types.js"; -import { handleNodeInvokeResult } from "./server-methods/nodes.handlers.invoke-result.js"; - -describe("late-arriving invoke results", () => { - test("returns success for unknown invoke ids for both success and error payloads", async () => { - const nodeId = "node-123"; - const cases = [ - { - id: "unknown-invoke-id-12345", - ok: true, - payloadJSON: JSON.stringify({ result: "late" }), - }, - { - id: "another-unknown-invoke-id", - ok: false, - error: { code: "FAILED", message: "test error" }, - }, - ] as const; - - for (const params of cases) { - const respond = vi.fn(); - const context = { - nodeRegistry: { handleInvokeResult: () => false }, - logGateway: { debug: vi.fn() }, - } as unknown as GatewayRequestContext; - const client = { - connect: { device: { id: nodeId } }, - } as unknown as GatewayClient; - - await handleNodeInvokeResult({ - req: { method: "node.invoke.result" } as unknown as RequestFrame, - params: { ...params, nodeId } as unknown as Record, - client, - isWebchatConnect: () => false, - respond, - context, - }); - - const [ok, payload, error] = respond.mock.lastCall ?? []; - - // Late-arriving results return success instead of error to reduce log noise. - expect(ok).toBe(true); - expect(error).toBeUndefined(); - expect(payload?.ok).toBe(true); - expect(payload?.ignored).toBe(true); - } - }); -});