fix(gateway): improve shutdown error visibility and add close timeout

Adds structured warning collection to gateway shutdown, preserves lifecycle timeout handling, and covers HTTP/WebSocket/subsystem warning paths.

Co-authored-by: Eden <146086744+edenfunf@users.noreply.github.com>
Co-authored-by: vincentkoc <25068+vincentkoc@users.noreply.github.com>
This commit is contained in:
Eden
2026-04-29 10:01:11 +08:00
committed by GitHub
parent df9d26eb43
commit bb6a15da04
3 changed files with 316 additions and 154 deletions

View File

@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Gateway/shutdown: report structured shutdown warnings and HTTP close timeout warnings through `ShutdownResult` while preserving lifecycle hook hardening. Carries forward #41296. Thanks @edenfunf.
- Security/audit: recognize dangerous node command IDs as valid `gateway.nodes.denyCommands` entries, so audit only warns on real typos or unsupported patterns. (#56923) Thanks @chziyue.
- Telegram/exec approvals: stop treating general Telegram chat allowlists and `defaultTo` routes as native exec approvers; Telegram now uses explicit `execApprovals.approvers` or owner identity from `commands.ownerAllowFrom`, matching the first-pairing owner bootstrap path. Thanks @pashpashpash.
- Chat commands: route sensitive group `/diagnostics` and `/export-trajectory` approvals and results to a private owner route, preferring same-surface DMs before falling back to the first configured owner route, so Discord group invocations can land in Telegram when that is the primary owner interface. Thanks @pashpashpash.

View File

@@ -4,7 +4,9 @@ import type { InternalHookEvent } from "../hooks/internal-hooks.js";
type TriggerInternalHookMock = (event: InternalHookEvent) => Promise<void>;
const mocks = {
logInfo: vi.fn(),
logWarn: vi.fn(),
listChannelPlugins: vi.fn((): Array<{ id: "telegram" | "discord" }> => []),
disposeAgentHarnesses: vi.fn(async () => undefined),
disposeAllSessionMcpRuntimes: vi.fn(async () => undefined),
triggerInternalHook: vi.fn<TriggerInternalHookMock>(async (_event) => undefined),
@@ -20,7 +22,7 @@ vi.mock("../channels/plugins/index.js", async () => ({
...(await vi.importActual<typeof import("../channels/plugins/index.js")>(
"../channels/plugins/index.js",
)),
listChannelPlugins: () => [],
listChannelPlugins: mocks.listChannelPlugins,
}));
vi.mock("../hooks/gmail-watcher.js", () => ({
@@ -57,6 +59,7 @@ vi.mock("../agents/pi-bundle-lsp-runtime.js", async () => ({
vi.mock("../logging/subsystem.js", () => ({
createSubsystemLogger: vi.fn(() => ({
info: mocks.logInfo,
warn: mocks.logWarn,
})),
}));
@@ -107,8 +110,12 @@ function createGatewayCloseTestDeps(
describe("createGatewayCloseHandler", () => {
beforeEach(() => {
vi.useRealTimers();
mocks.logInfo.mockClear();
mocks.logWarn.mockClear();
mocks.listChannelPlugins.mockReset();
mocks.listChannelPlugins.mockReturnValue([]);
mocks.disposeAgentHarnesses.mockClear();
mocks.disposeAgentHarnesses.mockResolvedValue(undefined);
mocks.disposeAllSessionMcpRuntimes.mockClear();
mocks.disposeAllSessionMcpRuntimes.mockResolvedValue(undefined);
mocks.triggerInternalHook.mockReset();
@@ -121,6 +128,19 @@ describe("createGatewayCloseHandler", () => {
vi.useRealTimers();
});
it("completes a clean shutdown with a ShutdownResult", async () => {
const deps = createGatewayCloseTestDeps();
const close = createGatewayCloseHandler(deps);
const result = await close({ reason: "test" });
expect(result.warnings).toEqual([]);
expect(result.durationMs).toBeGreaterThanOrEqual(0);
expect(deps.cron.stop).toHaveBeenCalledTimes(1);
expect(deps.heartbeatRunner.stop).toHaveBeenCalledTimes(1);
expect(deps.chatRunState.clear).toHaveBeenCalledTimes(1);
});
it("emits gateway shutdown and pre-restart hooks", async () => {
const close = createGatewayCloseHandler(createGatewayCloseTestDeps());
@@ -146,7 +166,7 @@ describe("createGatewayCloseHandler", () => {
});
});
it("continues shutdown when gateway shutdown hook stalls", async () => {
it("continues shutdown and records a warning when gateway shutdown hook stalls", async () => {
vi.useFakeTimers();
mocks.triggerInternalHook.mockImplementation((event: InternalHookEvent) => {
if (event.action === "shutdown") {
@@ -161,8 +181,9 @@ describe("createGatewayCloseHandler", () => {
const closePromise = close({ reason: "test shutdown" });
await vi.advanceTimersByTimeAsync(GATEWAY_LIFECYCLE_HOOK_TIMEOUT_MS);
await closePromise;
const result = await closePromise;
expect(result.warnings).toContain("gateway:shutdown");
expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1);
expect(
mocks.logWarn.mock.calls.some(([message]) =>
@@ -171,7 +192,7 @@ describe("createGatewayCloseHandler", () => {
).toBe(true);
});
it("continues restart shutdown when gateway pre-restart hook stalls", async () => {
it("continues restart shutdown and records a warning when gateway pre-restart hook stalls", async () => {
vi.useFakeTimers();
mocks.triggerInternalHook.mockImplementation((event: InternalHookEvent) => {
if (event.action === "pre-restart") {
@@ -179,40 +200,68 @@ describe("createGatewayCloseHandler", () => {
}
return Promise.resolve(undefined);
});
const stopTaskRegistryMaintenance = vi.fn();
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({ stopTaskRegistryMaintenance }),
);
const close = createGatewayCloseHandler(createGatewayCloseTestDeps());
const closePromise = close({
reason: "test restart",
restartExpectedMs: 123,
});
await vi.advanceTimersByTimeAsync(GATEWAY_LIFECYCLE_HOOK_TIMEOUT_MS);
await closePromise;
const result = await closePromise;
expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1);
expect(result.warnings).toContain("gateway:pre-restart");
expect(mocks.triggerInternalHook).toHaveBeenCalledTimes(2);
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes("gateway:pre-restart hook timed out after 1000ms"),
),
).toBe(true);
});
it("unsubscribes lifecycle listeners during shutdown", async () => {
it("records subsystem shutdown warnings without aborting later cleanup", async () => {
mocks.listChannelPlugins.mockReturnValue([{ id: "telegram" }, { id: "discord" }]);
const lifecycleUnsub = vi.fn();
const stopChannel = vi.fn(async (id: string) => {
if (id === "telegram") {
throw new Error("telegram stuck");
}
});
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({
bonjourStop: vi.fn(async () => {
throw new Error("mdns unavailable");
}),
canvasHost: {
close: vi.fn(async () => {
throw new Error("canvas error");
}),
} as never,
lifecycleUnsub,
stopChannel,
}),
);
const result = await close({ reason: "test shutdown" });
expect(result.warnings).toEqual(
expect.arrayContaining(["bonjour", "canvas-host", "channel/telegram"]),
);
expect(result.warnings).not.toContain("channel/discord");
expect(lifecycleUnsub).toHaveBeenCalledTimes(1);
expect(stopChannel).toHaveBeenCalledTimes(2);
});
it("unsubscribes lifecycle listeners and disposes bundle runtimes during shutdown", async () => {
const lifecycleUnsub = vi.fn();
const transcriptUnsub = vi.fn();
const stopTaskRegistryMaintenance = vi.fn();
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({
stopTaskRegistryMaintenance,
lifecycleUnsub,
transcriptUnsub,
}),
);
await close({ reason: "test shutdown" });
expect(lifecycleUnsub).toHaveBeenCalledTimes(1);
expect(transcriptUnsub).toHaveBeenCalledTimes(1);
expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1);
expect(mocks.disposeAgentHarnesses).toHaveBeenCalledTimes(1);
expect(mocks.disposeAllSessionMcpRuntimes).toHaveBeenCalledTimes(1);
@@ -247,15 +296,16 @@ describe("createGatewayCloseHandler", () => {
}
});
it("continues shutdown when bundle MCP runtime disposal hangs", async () => {
it("continues shutdown and records a warning when bundle MCP runtime disposal hangs", async () => {
vi.useFakeTimers();
mocks.disposeAllSessionMcpRuntimes.mockReturnValue(new Promise(() => undefined));
const close = createGatewayCloseHandler(createGatewayCloseTestDeps());
const closePromise = close({ reason: "test shutdown" });
await vi.advanceTimersByTimeAsync(5_000);
await closePromise;
const result = await closePromise;
expect(result.warnings).toContain("bundle-mcp");
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes("bundle-mcp runtime disposal exceeded 5000ms"),
@@ -263,15 +313,16 @@ describe("createGatewayCloseHandler", () => {
).toBe(true);
});
it("continues shutdown when bundle LSP runtime disposal hangs", async () => {
it("continues shutdown and records a warning when bundle LSP runtime disposal hangs", async () => {
vi.useFakeTimers();
mocks.disposeAllBundleLspRuntimes.mockReturnValue(new Promise(() => undefined));
const close = createGatewayCloseHandler(createGatewayCloseTestDeps());
const closePromise = close({ reason: "test shutdown" });
await vi.advanceTimersByTimeAsync(5_000);
await closePromise;
const result = await closePromise;
expect(result.warnings).toContain("bundle-lsp");
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes("bundle-lsp runtime disposal exceeded 5000ms"),
@@ -299,15 +350,11 @@ describe("createGatewayCloseHandler", () => {
const closePromise = close({ reason: "test shutdown" });
await vi.advanceTimersByTimeAsync(WEBSOCKET_CLOSE_GRACE_MS);
await closePromise;
const result = await closePromise;
expect(result.warnings).toContain("websocket-server");
expect(terminate).toHaveBeenCalledTimes(1);
expect(vi.getTimerCount()).toBe(0);
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes("websocket server close exceeded 1000ms"),
),
).toBe(true);
});
it("continues shutdown when websocket close hangs without tracked clients", async () => {
@@ -324,8 +371,9 @@ describe("createGatewayCloseHandler", () => {
const closePromise = close({ reason: "test shutdown" });
await vi.advanceTimersByTimeAsync(WEBSOCKET_CLOSE_GRACE_MS + WEBSOCKET_CLOSE_FORCE_CONTINUE_MS);
await closePromise;
const result = await closePromise;
expect(result.warnings).toContain("websocket-server");
expect(vi.getTimerCount()).toBe(0);
expect(
mocks.logWarn.mock.calls.some(([message]) =>
@@ -334,7 +382,41 @@ describe("createGatewayCloseHandler", () => {
).toBe(true);
});
it("forces lingering HTTP connections closed when server close exceeds the grace window", async () => {
it("records a warning when a websocket client close throws", async () => {
const clients = new Set<GatewayCloseClient>([
{
socket: {
close: vi.fn(() => {
throw new Error("already closed");
}),
},
},
{ socket: { close: vi.fn() } },
]);
const close = createGatewayCloseHandler(createGatewayCloseTestDeps({ clients }));
const result = await close({ reason: "test shutdown" });
expect(result.warnings).toContain("ws-clients");
expect(clients.size).toBe(0);
});
it("records a warning when HTTP server close fails", async () => {
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({
httpServer: {
close: (cb: (err?: Error | null) => void) => cb(new Error("EADDRINUSE")),
closeIdleConnections: vi.fn(),
} as never,
}),
);
const result = await close({ reason: "test shutdown" });
expect(result.warnings).toContain("http-server");
});
it("forces lingering HTTP connections closed and records a timeout warning", async () => {
vi.useFakeTimers();
let closeCallback: ((err?: Error | null) => void) | null = null;
@@ -355,13 +437,14 @@ describe("createGatewayCloseHandler", () => {
const closePromise = close({ reason: "test shutdown" });
await vi.advanceTimersByTimeAsync(HTTP_CLOSE_GRACE_MS);
await closePromise;
const result = await closePromise;
expect(result.warnings).toContain("http-server");
expect(closeAllConnections).toHaveBeenCalledTimes(1);
expect(vi.getTimerCount()).toBe(0);
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes("http server close exceeded 1000ms"),
String(message).includes("http-server close exceeded 1000ms"),
),
).toBe(true);
});
@@ -381,47 +464,63 @@ describe("createGatewayCloseHandler", () => {
const closePromise = close({ reason: "test shutdown" });
const closeExpectation = expect(closePromise).rejects.toThrow(
"http server close still pending after forced connection shutdown (5000ms)",
"http-server close still pending after forced connection shutdown (5000ms)",
);
await vi.advanceTimersByTimeAsync(HTTP_CLOSE_GRACE_MS + HTTP_CLOSE_FORCE_WAIT_MS);
await closeExpectation;
expect(vi.getTimerCount()).toBe(0);
});
it("ignores unbound http servers during shutdown", async () => {
const close = createGatewayCloseHandler({
bonjourStop: null,
tailscaleCleanup: null,
canvasHost: null,
canvasHostServer: null,
stopChannel: vi.fn(async () => undefined),
pluginServices: null,
cron: { stop: vi.fn() },
heartbeatRunner: { stop: vi.fn() } as never,
updateCheckStop: null,
nodePresenceTimers: new Map(),
broadcast: vi.fn(),
tickInterval: setInterval(() => undefined, 60_000),
healthInterval: setInterval(() => undefined, 60_000),
dedupeCleanup: setInterval(() => undefined, 60_000),
mediaCleanup: null,
agentUnsub: null,
heartbeatUnsub: null,
transcriptUnsub: null,
lifecycleUnsub: null,
chatRunState: { clear: vi.fn() },
clients: new Set(),
configReloader: { stop: vi.fn(async () => undefined) },
wss: { close: (cb: () => void) => cb() } as never,
httpServer: {
close: (cb: (err?: NodeJS.ErrnoException | null) => void) =>
cb(
Object.assign(new Error("Server is not running."), { code: "ERR_SERVER_NOT_RUNNING" }),
),
closeIdleConnections: vi.fn(),
} as never,
});
it("labels warnings for multiple HTTP servers with their index", async () => {
const okServer = {
close: (cb: (err?: Error | null) => void) => cb(null),
closeIdleConnections: vi.fn(),
};
const failServer = {
close: (cb: (err?: Error | null) => void) => cb(new Error("port busy")),
closeIdleConnections: vi.fn(),
};
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({
httpServers: [okServer as never, failServer as never],
}),
);
await expect(close({ reason: "startup failed before bind" })).resolves.toBeUndefined();
const result = await close({ reason: "test shutdown" });
expect(result.warnings).toContain("http-server[1]");
expect(result.warnings).not.toContain("http-server[0]");
});
it("ignores unbound http servers during shutdown", async () => {
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({
httpServer: {
close: (cb: (err?: NodeJS.ErrnoException | null) => void) =>
cb(
Object.assign(new Error("Server is not running."), {
code: "ERR_SERVER_NOT_RUNNING",
}),
),
closeIdleConnections: vi.fn(),
} as never,
}),
);
await expect(close({ reason: "startup failed before bind" })).resolves.toMatchObject({
warnings: [],
});
});
it("broadcasts normalized shutdown metadata", async () => {
const deps = createGatewayCloseTestDeps();
const close = createGatewayCloseHandler(deps);
await close({ reason: " upgrade ", restartExpectedMs: Number.NaN });
expect(deps.broadcast).toHaveBeenCalledWith("shutdown", {
reason: "upgrade",
restartExpectedMs: null,
});
});
});

View File

@@ -20,6 +20,11 @@ const HTTP_CLOSE_FORCE_WAIT_MS = 5_000;
const MCP_RUNTIME_CLOSE_GRACE_MS = 5_000;
const LSP_RUNTIME_CLOSE_GRACE_MS = 5_000;
export type ShutdownResult = {
durationMs: number;
warnings: string[];
};
function createTimeoutRace<T>(timeoutMs: number, onTimeout: () => T) {
let timer: ReturnType<typeof setTimeout> | null = null;
timer = setTimeout(() => {
@@ -47,11 +52,33 @@ function createTimeoutRace<T>(timeoutMs: number, onTimeout: () => T) {
};
}
async function shutdownStep(
name: string,
fn: () => Promise<void> | void,
warnings: string[],
): Promise<boolean> {
try {
await fn();
return true;
} catch (err: unknown) {
const detail = err instanceof Error ? err.message : String(err);
shutdownLog.warn(`${name}: ${detail}`);
recordShutdownWarning(warnings, name);
return false;
}
}
function recordShutdownWarning(warnings: string[], name: string): void {
if (!warnings.includes(name)) {
warnings.push(name);
}
}
async function triggerGatewayLifecycleHookWithTimeout(params: {
event: ReturnType<typeof createInternalHookEvent>;
hookName: "gateway:shutdown" | "gateway:pre-restart";
timeoutMs: number;
}): Promise<void> {
}): Promise<"completed" | "timeout"> {
let timeout: ReturnType<typeof setTimeout> | undefined;
const hookPromise = triggerInternalHook(params.event);
void hookPromise.catch(() => undefined);
@@ -68,6 +95,7 @@ async function triggerGatewayLifecycleHookWithTimeout(params: {
`${params.hookName} hook timed out after ${params.timeoutMs}ms; continuing shutdown`,
);
}
return result;
} finally {
if (timeout) {
clearTimeout(timeout);
@@ -79,16 +107,19 @@ async function disposeRuntimeWithShutdownGrace(params: {
label: "bundle-mcp" | "bundle-lsp";
dispose: () => Promise<void>;
graceMs: number;
warnings: string[];
}): Promise<void> {
const disposePromise = Promise.resolve()
.then(params.dispose)
.catch((err: unknown) => {
shutdownLog.warn(`${params.label} runtime disposal failed during shutdown: ${String(err)}`);
recordShutdownWarning(params.warnings, params.label);
});
const disposeTimeout = createTimeoutRace(params.graceMs, () => {
shutdownLog.warn(
`${params.label} runtime disposal exceeded ${params.graceMs}ms; continuing shutdown`,
);
recordShutdownWarning(params.warnings, params.label);
});
await Promise.race([disposePromise, disposeTimeout.promise]);
disposeTimeout.clear();
@@ -168,7 +199,12 @@ export function createGatewayCloseHandler(params: {
httpServer: HttpServer;
httpServers?: HttpServer[];
}) {
return async (opts?: { reason?: string; restartExpectedMs?: number | null }) => {
return async (opts?: {
reason?: string;
restartExpectedMs?: number | null;
}): Promise<ShutdownResult> => {
const start = Date.now();
const warnings: string[] = [];
try {
const reasonRaw = normalizeOptionalString(opts?.reason) ?? "";
const reason = reasonRaw || "gateway stopping";
@@ -176,91 +212,93 @@ export function createGatewayCloseHandler(params: {
typeof opts?.restartExpectedMs === "number" && Number.isFinite(opts.restartExpectedMs)
? Math.max(0, Math.floor(opts.restartExpectedMs))
: null;
try {
const shutdownEvent = createInternalHookEvent("gateway", "shutdown", "gateway:shutdown", {
reason,
restartExpectedMs,
});
await triggerGatewayLifecycleHookWithTimeout({
event: shutdownEvent,
hookName: "gateway:shutdown",
timeoutMs: GATEWAY_SHUTDOWN_HOOK_TIMEOUT_MS,
});
if (restartExpectedMs !== null) {
const preRestartEvent = createInternalHookEvent(
"gateway",
"pre-restart",
"gateway:pre-restart",
{
reason,
restartExpectedMs,
},
);
await triggerGatewayLifecycleHookWithTimeout({
event: preRestartEvent,
hookName: "gateway:pre-restart",
timeoutMs: GATEWAY_PRE_RESTART_HOOK_TIMEOUT_MS,
shutdownLog.info(`shutdown started: ${reason}`);
await shutdownStep(
"gateway:shutdown",
async () => {
const shutdownEvent = createInternalHookEvent("gateway", "shutdown", "gateway:shutdown", {
reason,
restartExpectedMs,
});
}
} catch {
// Best-effort only; shutdown should proceed even if hooks fail.
const result = await triggerGatewayLifecycleHookWithTimeout({
event: shutdownEvent,
hookName: "gateway:shutdown",
timeoutMs: GATEWAY_SHUTDOWN_HOOK_TIMEOUT_MS,
});
if (result === "timeout") {
recordShutdownWarning(warnings, "gateway:shutdown");
}
},
warnings,
);
if (restartExpectedMs !== null) {
await shutdownStep(
"gateway:pre-restart",
async () => {
const preRestartEvent = createInternalHookEvent(
"gateway",
"pre-restart",
"gateway:pre-restart",
{
reason,
restartExpectedMs,
},
);
const result = await triggerGatewayLifecycleHookWithTimeout({
event: preRestartEvent,
hookName: "gateway:pre-restart",
timeoutMs: GATEWAY_PRE_RESTART_HOOK_TIMEOUT_MS,
});
if (result === "timeout") {
recordShutdownWarning(warnings, "gateway:pre-restart");
}
},
warnings,
);
}
if (params.bonjourStop) {
try {
await params.bonjourStop();
} catch {
/* ignore */
}
await shutdownStep("bonjour", () => params.bonjourStop!(), warnings);
}
if (params.tailscaleCleanup) {
await params.tailscaleCleanup();
await shutdownStep("tailscale", () => params.tailscaleCleanup!(), warnings);
}
if (params.canvasHost) {
try {
await params.canvasHost.close();
} catch {
/* ignore */
}
await shutdownStep("canvas-host", () => params.canvasHost!.close(), warnings);
}
if (params.canvasHostServer) {
try {
await params.canvasHostServer.close();
} catch {
/* ignore */
}
await shutdownStep("canvas-host-server", () => params.canvasHostServer!.close(), warnings);
}
for (const plugin of listChannelPlugins()) {
await params.stopChannel(plugin.id);
await shutdownStep(`channel/${plugin.id}`, () => params.stopChannel(plugin.id), warnings);
}
await disposeRegisteredAgentHarnesses();
await shutdownStep("agent-harnesses", () => disposeRegisteredAgentHarnesses(), warnings);
await Promise.all([
disposeRuntimeWithShutdownGrace({
label: "bundle-mcp",
dispose: params.disposeSessionMcpRuntimes ?? disposeAllSessionMcpRuntimes,
graceMs: MCP_RUNTIME_CLOSE_GRACE_MS,
warnings,
}),
disposeRuntimeWithShutdownGrace({
label: "bundle-lsp",
dispose: params.disposeBundleLspRuntimes ?? disposeAllBundleLspRuntimesOnDemand,
graceMs: LSP_RUNTIME_CLOSE_GRACE_MS,
warnings,
}),
]);
if (params.pluginServices) {
await params.pluginServices.stop().catch(() => {});
await shutdownStep("plugin-services", () => params.pluginServices!.stop(), warnings);
}
await stopGmailWatcherOnDemand();
await shutdownStep("gmail-watcher", () => stopGmailWatcherOnDemand(), warnings);
params.cron.stop();
params.heartbeatRunner.stop();
try {
params.stopTaskRegistryMaintenance?.();
} catch {
/* ignore */
}
try {
params.updateCheckStop?.();
} catch {
/* ignore */
}
await shutdownStep(
"task-registry-maintenance",
() => params.stopTaskRegistryMaintenance?.(),
warnings,
);
await shutdownStep("update-check", () => params.updateCheckStop?.(), warnings);
for (const timer of params.nodePresenceTimers.values()) {
clearInterval(timer);
}
@@ -276,43 +314,32 @@ export function createGatewayCloseHandler(params: {
clearInterval(params.mediaCleanup);
}
if (params.agentUnsub) {
try {
params.agentUnsub();
} catch {
/* ignore */
}
await shutdownStep("agent-unsub", () => params.agentUnsub!(), warnings);
}
if (params.heartbeatUnsub) {
try {
params.heartbeatUnsub();
} catch {
/* ignore */
}
await shutdownStep("heartbeat-unsub", () => params.heartbeatUnsub!(), warnings);
}
if (params.transcriptUnsub) {
try {
params.transcriptUnsub();
} catch {
/* ignore */
}
await shutdownStep("transcript-unsub", () => params.transcriptUnsub!(), warnings);
}
if (params.lifecycleUnsub) {
try {
params.lifecycleUnsub();
} catch {
/* ignore */
}
await shutdownStep("lifecycle-unsub", () => params.lifecycleUnsub!(), warnings);
}
params.chatRunState.clear();
let clientCloseFailures = 0;
for (const c of params.clients) {
try {
c.socket.close(1012, "service restart");
} catch {
/* ignore */
clientCloseFailures++;
}
}
if (clientCloseFailures > 0) {
shutdownLog.warn(`failed to close ${clientCloseFailures} WebSocket client(s)`);
recordShutdownWarning(warnings, "ws-clients");
}
params.clients.clear();
await params.configReloader.stop().catch(() => {});
await shutdownStep("config-reloader", () => params.configReloader.stop(), warnings);
const wsClients = params.wss.clients ?? new Set();
const closePromise = new Promise<void>((resolve) => params.wss.close(() => resolve()));
const websocketGraceTimeout = createTimeoutRace(
@@ -328,6 +355,7 @@ export function createGatewayCloseHandler(params: {
shutdownLog.warn(
`websocket server close exceeded ${WEBSOCKET_CLOSE_GRACE_MS}ms; forcing shutdown continuation with ${wsClients.size} tracked client(s)`,
);
recordShutdownWarning(warnings, "websocket-server");
for (const client of wsClients) {
try {
client.terminate();
@@ -347,11 +375,12 @@ export function createGatewayCloseHandler(params: {
params.httpServers && params.httpServers.length > 0
? params.httpServers
: [params.httpServer];
for (const server of servers) {
const httpServer = server as HttpServer & {
for (let i = 0; i < servers.length; i++) {
const httpServer = servers[i] as HttpServer & {
closeAllConnections?: () => void;
closeIdleConnections?: () => void;
};
const label = servers.length > 1 ? `http-server[${i}]` : "http-server";
if (typeof httpServer.closeIdleConnections === "function") {
httpServer.closeIdleConnections();
}
@@ -364,29 +393,51 @@ export function createGatewayCloseHandler(params: {
reject(err);
}),
);
void closePromise.catch(() => undefined);
const httpGraceTimeout = createTimeoutRace(HTTP_CLOSE_GRACE_MS, () => false as const);
const closedWithinGrace = await Promise.race([
closePromise.then(() => true),
closePromise.then(
() => true,
(err: unknown) => {
throw err;
},
),
httpGraceTimeout.promise,
]);
]).catch((err: unknown) => {
const detail = err instanceof Error ? err.message : String(err);
shutdownLog.warn(`${label}: ${detail}`);
recordShutdownWarning(warnings, label);
return true;
});
httpGraceTimeout.clear();
if (!closedWithinGrace) {
shutdownLog.warn(
`http server close exceeded ${HTTP_CLOSE_GRACE_MS}ms; forcing connection shutdown and waiting for close`,
`${label} close exceeded ${HTTP_CLOSE_GRACE_MS}ms; forcing connection shutdown and waiting for close`,
);
recordShutdownWarning(warnings, label);
httpServer.closeAllConnections?.();
const httpForceTimeout = createTimeoutRace(
HTTP_CLOSE_FORCE_WAIT_MS,
() => false as const,
);
const closedAfterForce = await Promise.race([
closePromise.then(() => true),
closePromise.then(
() => true,
(err: unknown) => {
throw err;
},
),
httpForceTimeout.promise,
]);
]).catch((err: unknown) => {
const detail = err instanceof Error ? err.message : String(err);
shutdownLog.warn(`${label}: ${detail}`);
recordShutdownWarning(warnings, label);
return true;
});
httpForceTimeout.clear();
if (!closedAfterForce) {
throw new Error(
`http server close still pending after forced connection shutdown (${HTTP_CLOSE_FORCE_WAIT_MS}ms)`,
`${label} close still pending after forced connection shutdown (${HTTP_CLOSE_FORCE_WAIT_MS}ms)`,
);
}
}
@@ -398,5 +449,16 @@ export function createGatewayCloseHandler(params: {
/* ignore */
}
}
const durationMs = Date.now() - start;
if (warnings.length > 0) {
shutdownLog.warn(
`shutdown completed in ${durationMs}ms with warnings: ${warnings.join(", ")}`,
);
} else {
shutdownLog.info(`shutdown completed cleanly in ${durationMs}ms`);
}
return { durationMs, warnings };
};
}