diff --git a/CHANGELOG.md b/CHANGELOG.md index 27ba741a858..43b2b8f719d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -227,6 +227,8 @@ Docs: https://docs.openclaw.ai - Agents/video generation: accept `agents.defaults.videoGenerationModel` in strict config validation and `openclaw config set/get`, so gateways using `video_generate` no longer fail to boot after enabling a video model. - Matrix/streaming: add a quiet preview mode for streamed Matrix replies, keep legacy `partial` preview-first behavior, and finalize quiet media captions correctly so previews stop notifying early without dropping final text semantics. (#61450) Thanks @gumadeiras. +- Gateway/shutdown: bound websocket-server shutdown even when no tracked clients remain, so gateway restarts stop hanging until the watchdog kills the process. (#61565) Thanks @mbelinky. + ## 2026.4.2 ### Breaking diff --git a/src/gateway/server-close.test.ts b/src/gateway/server-close.test.ts index 5211e62c58e..b325032a479 100644 --- a/src/gateway/server-close.test.ts +++ b/src/gateway/server-close.test.ts @@ -1,5 +1,10 @@ -import { describe, expect, it, vi } from "vitest"; -import { createGatewayCloseHandler } from "./server-close.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const mocks = { + logWarn: vi.fn(), +}; +const WEBSOCKET_CLOSE_GRACE_MS = 1_000; +const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250; vi.mock("../channels/plugins/index.js", () => ({ listChannelPlugins: () => [], @@ -9,7 +14,20 @@ vi.mock("../hooks/gmail-watcher.js", () => ({ stopGmailWatcher: vi.fn(async () => undefined), })); +vi.mock("../logging/subsystem.js", () => ({ + createSubsystemLogger: vi.fn(() => ({ + warn: mocks.logWarn, + })), +})); + +const { createGatewayCloseHandler } = await import("./server-close.js"); + describe("createGatewayCloseHandler", () => { + beforeEach(() => { + vi.useRealTimers(); + mocks.logWarn.mockClear(); + }); + it("unsubscribes lifecycle listeners during shutdown", async () => { const lifecycleUnsub = vi.fn(); const stopTaskRegistryMaintenance = vi.fn(); @@ -49,4 +67,107 @@ describe("createGatewayCloseHandler", () => { expect(lifecycleUnsub).toHaveBeenCalledTimes(1); expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1); }); + + it("terminates lingering websocket clients when websocket close exceeds the grace window", async () => { + vi.useFakeTimers(); + + let closeCallback: (() => void) | null = null; + const terminate = vi.fn(() => { + closeCallback?.(); + }); + const close = createGatewayCloseHandler({ + bonjourStop: null, + tailscaleCleanup: null, + canvasHost: null, + canvasHostServer: null, + stopChannel: vi.fn(async () => undefined), + pluginServices: null, + cron: { stop: vi.fn() }, + heartbeatRunner: { stop: vi.fn() } as never, + updateCheckStop: null, + stopTaskRegistryMaintenance: null, + nodePresenceTimers: new Map(), + broadcast: vi.fn(), + tickInterval: setInterval(() => undefined, 60_000), + healthInterval: setInterval(() => undefined, 60_000), + dedupeCleanup: setInterval(() => undefined, 60_000), + mediaCleanup: null, + agentUnsub: null, + heartbeatUnsub: null, + transcriptUnsub: null, + lifecycleUnsub: null, + chatRunState: { clear: vi.fn() }, + clients: new Set(), + configReloader: { stop: vi.fn(async () => undefined) }, + wss: { + clients: new Set([{ terminate }]), + close: (cb: () => void) => { + closeCallback = cb; + }, + } as never, + httpServer: { + close: (cb: (err?: Error | null) => void) => cb(null), + closeIdleConnections: vi.fn(), + } as never, + }); + + const closePromise = close({ reason: "test shutdown" }); + await vi.advanceTimersByTimeAsync(WEBSOCKET_CLOSE_GRACE_MS); + await closePromise; + + expect(terminate).toHaveBeenCalledTimes(1); + expect( + mocks.logWarn.mock.calls.some(([message]) => + String(message).includes("websocket server close exceeded 1000ms"), + ), + ).toBe(true); + }); + + it("continues shutdown when websocket close hangs without tracked clients", async () => { + vi.useFakeTimers(); + + const close = createGatewayCloseHandler({ + bonjourStop: null, + tailscaleCleanup: null, + canvasHost: null, + canvasHostServer: null, + stopChannel: vi.fn(async () => undefined), + pluginServices: null, + cron: { stop: vi.fn() }, + heartbeatRunner: { stop: vi.fn() } as never, + updateCheckStop: null, + stopTaskRegistryMaintenance: null, + nodePresenceTimers: new Map(), + broadcast: vi.fn(), + tickInterval: setInterval(() => undefined, 60_000), + healthInterval: setInterval(() => undefined, 60_000), + dedupeCleanup: setInterval(() => undefined, 60_000), + mediaCleanup: null, + agentUnsub: null, + heartbeatUnsub: null, + transcriptUnsub: null, + lifecycleUnsub: null, + chatRunState: { clear: vi.fn() }, + clients: new Set(), + configReloader: { stop: vi.fn(async () => undefined) }, + wss: { + clients: new Set(), + close: () => undefined, + } as never, + httpServer: { + close: (cb: (err?: Error | null) => void) => cb(null), + closeIdleConnections: vi.fn(), + } as never, + }); + + const closePromise = close({ reason: "test shutdown" }); + await vi.advanceTimersByTimeAsync(WEBSOCKET_CLOSE_GRACE_MS + WEBSOCKET_CLOSE_FORCE_CONTINUE_MS); + await closePromise; + + expect( + mocks.logWarn.mock.calls.some(([message]) => + String(message).includes("websocket server close still pending after 250ms force window"), + ), + ).toBe(true); + }); }); diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 2147e51fb91..047d4e515d1 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -4,8 +4,13 @@ import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server. import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; import { stopGmailWatcher } from "../hooks/gmail-watcher.js"; import type { HeartbeatRunner } from "../infra/heartbeat-runner.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; import type { PluginServicesHandle } from "../plugins/services.js"; +const shutdownLog = createSubsystemLogger("gateway/shutdown"); +const WEBSOCKET_CLOSE_GRACE_MS = 1_000; +const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250; + export function createGatewayCloseHandler(params: { bonjourStop: (() => Promise) | null; tailscaleCleanup: (() => Promise) | null; @@ -138,7 +143,35 @@ export function createGatewayCloseHandler(params: { } params.clients.clear(); await params.configReloader.stop().catch(() => {}); - await new Promise((resolve) => params.wss.close(() => resolve())); + const wsClients = params.wss.clients ?? new Set(); + const closePromise = new Promise((resolve) => params.wss.close(() => resolve())); + const closedWithinGrace = await Promise.race([ + closePromise.then(() => true), + new Promise((resolve) => setTimeout(() => resolve(false), WEBSOCKET_CLOSE_GRACE_MS)), + ]); + if (!closedWithinGrace) { + shutdownLog.warn( + `websocket server close exceeded ${WEBSOCKET_CLOSE_GRACE_MS}ms; forcing shutdown continuation with ${wsClients.size} tracked client(s)`, + ); + for (const client of wsClients) { + try { + client.terminate(); + } catch { + /* ignore */ + } + } + await Promise.race([ + closePromise, + new Promise((resolve) => + setTimeout(() => { + shutdownLog.warn( + `websocket server close still pending after ${WEBSOCKET_CLOSE_FORCE_CONTINUE_MS}ms force window; continuing shutdown`, + ); + resolve(); + }, WEBSOCKET_CLOSE_FORCE_CONTINUE_MS), + ), + ]); + } const servers = params.httpServers && params.httpServers.length > 0 ? params.httpServers