diff --git a/CHANGELOG.md b/CHANGELOG.md index 5233b835441..8d75edbc6e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai - Plugins/onboarding: let Manual setup install optional official plugins, including ClawHub-backed diagnostics with npm fallback, and expose the external Codex plugin as a selectable provider setup choice. Thanks @vincentkoc. - Plugins/CLI: include package dependency install state in `openclaw plugins list --json` so scripts can spot missing plugin dependencies without runtime-loading plugins. +- Discord/status: add degraded Discord transport and gateway event-loop starvation signals to `openclaw channels status`, `openclaw status --deep`, and fetch-timeout logs so intermittent socket resets do not look like a healthy running channel. (#76327) Thanks @joshavant. - Plugins/update: on the beta OpenClaw update channel, default-line npm and ClawHub plugin updates try `@beta` first and fall back to default/latest when no plugin beta release exists. - Channels/WhatsApp: support explicit WhatsApp Channel/Newsletter `@newsletter` outbound message targets with channel session metadata instead of DM routing. Fixes #13417; carries forward the narrow outbound target idea from #13424. Thanks @vincentkoc and @agentz-manfred. diff --git a/extensions/discord/src/status-issues.test.ts b/extensions/discord/src/status-issues.test.ts index 7e571166450..4b514afbe07 100644 --- a/extensions/discord/src/status-issues.test.ts +++ b/extensions/discord/src/status-issues.test.ts @@ -67,4 +67,26 @@ describe("collectDiscordStatusIssues", () => { expect(issues[0]?.message).toContain("alerts"); expect(issues[0]?.message).toContain("guilds.ops.channels"); }); + + it("reports degraded runtime transport state", () => { + const issues = collectDiscordStatusIssues([ + { + accountId: "ops", + enabled: true, + configured: true, + running: true, + connected: true, + healthState: "stale-socket", + } as ChannelAccountSnapshot, + ]); + + expect(issues).toEqual([ + expect.objectContaining({ + channel: "discord", + accountId: "ops", + kind: "runtime", + message: expect.stringContaining("stale-socket"), + }), + ]); + }); }); diff --git a/extensions/discord/src/status-issues.ts b/extensions/discord/src/status-issues.ts index f095221483e..db42011d75b 100644 --- a/extensions/discord/src/status-issues.ts +++ b/extensions/discord/src/status-issues.ts @@ -21,6 +21,9 @@ type DiscordAccountStatus = { accountId?: unknown; enabled?: unknown; configured?: unknown; + running?: unknown; + connected?: unknown; + healthState?: unknown; application?: unknown; audit?: unknown; }; @@ -45,6 +48,9 @@ function readDiscordAccountStatus(value: ChannelAccountSnapshot): DiscordAccount accountId: value.accountId, enabled: value.enabled, configured: value.configured, + running: value.running, + connected: value.connected, + healthState: value.healthState, application: value.application, audit: value.audit, }; @@ -124,6 +130,32 @@ export function collectDiscordStatusIssues( continue; } + const running = account.running === true; + const healthState = asString(account.healthState); + if ( + healthState === "stale-socket" || + healthState === "stuck" || + healthState === "disconnected" || + healthState === "not-running" + ) { + const runningLabel = running ? "running" : "not running"; + issues.push({ + channel: "discord", + accountId, + kind: "runtime", + message: `Discord gateway transport is degraded (${healthState}; account is ${runningLabel}).`, + fix: "Check gateway event-loop health and Discord connectivity, then restart the Discord channel or gateway if the transport does not recover.", + }); + } else if (running && account.connected === false) { + issues.push({ + channel: "discord", + accountId, + kind: "runtime", + message: "Discord gateway transport is running but disconnected.", + fix: "Check gateway logs for Discord websocket errors and wait for reconnect; restart the Discord channel or gateway if it does not recover.", + }); + } + const app = readDiscordApplicationSummary(account.application); const messageContent = app.intents?.messageContent; if (messageContent === "disabled") { diff --git a/src/commands/channels.surfaces-signal-runtime-errors-channels-status-output.test.ts b/src/commands/channels.surfaces-signal-runtime-errors-channels-status-output.test.ts index 315ba812ee7..57dcaa6f747 100644 --- a/src/commands/channels.surfaces-signal-runtime-errors-channels-status-output.test.ts +++ b/src/commands/channels.surfaces-signal-runtime-errors-channels-status-output.test.ts @@ -83,4 +83,23 @@ describe("channels command", () => { expect(lines.join("\n")).toMatch(/imessage/i); expect(lines.join("\n")).toMatch(/Channel error/i); }); + + it("surfaces degraded gateway event-loop health in channels status output", () => { + const lines = formatGatewayChannelsStatusLines({ + eventLoop: { + degraded: true, + reasons: ["event_loop_delay", "cpu"], + intervalMs: 62_000, + delayP99Ms: 61_000, + delayMaxMs: 62_000, + utilization: 1, + cpuCoreRatio: 1, + }, + channelLabels: {}, + channelAccounts: {}, + }); + + expect(lines.join("\n")).toMatch(/Gateway event loop degraded/); + expect(lines.join("\n")).toMatch(/eventLoopDelayMaxMs=62000/); + }); }); diff --git a/src/commands/channels/status.ts b/src/commands/channels/status.ts index bd6ca7d1ada..12c6f73a226 100644 --- a/src/commands/channels/status.ts +++ b/src/commands/channels/status.ts @@ -39,9 +39,46 @@ function formatChannelsStatusError(err: unknown): string { return redactGatewayUrlSecretsInText(formatErrorMessage(err)); } +function formatEventLoopBits(value: unknown): string | null { + if (!value || typeof value !== "object") { + return null; + } + const record = value as Record; + if (record.degraded !== true) { + return null; + } + const reasons = Array.isArray(record.reasons) + ? record.reasons.filter((reason): reason is string => typeof reason === "string") + : []; + const delayMaxMs = + typeof record.delayMaxMs === "number" && Number.isFinite(record.delayMaxMs) + ? Math.round(record.delayMaxMs) + : null; + const utilization = + typeof record.utilization === "number" && Number.isFinite(record.utilization) + ? record.utilization + : null; + const cpuCoreRatio = + typeof record.cpuCoreRatio === "number" && Number.isFinite(record.cpuCoreRatio) + ? record.cpuCoreRatio + : null; + return [ + reasons.length ? `reasons=${reasons.join(",")}` : null, + delayMaxMs != null ? `eventLoopDelayMaxMs=${delayMaxMs}` : null, + utilization != null ? `eventLoopUtilization=${utilization}` : null, + cpuCoreRatio != null ? `cpuCoreRatio=${cpuCoreRatio}` : null, + ] + .filter((part): part is string => Boolean(part)) + .join(" "); +} + export function formatGatewayChannelsStatusLines(payload: Record): string[] { const lines: string[] = []; lines.push(theme.success("Gateway reachable.")); + const eventLoopLine = formatEventLoopBits(payload.eventLoop); + if (eventLoopLine) { + lines.push(theme.warn(`Gateway event loop degraded: ${eventLoopLine}`)); + } const channelLabels = payload.channelLabels && typeof payload.channelLabels === "object" ? (payload.channelLabels as Record) @@ -108,6 +145,9 @@ export function formatGatewayChannelsStatusLines(payload: Record { return parts.join(" "); }; +function formatEventLoopHealthLine(summary: HealthSummary): string | null { + const eventLoop = summary.eventLoop; + if (!eventLoop) { + return null; + } + const state = eventLoop.degraded ? "degraded" : "ok"; + const reasons = eventLoop.reasons.length > 0 ? ` reasons=${eventLoop.reasons.join(",")}` : ""; + return `Gateway event loop: ${state}${reasons} max=${Math.round( + eventLoop.delayMaxMs, + )}ms p99=${Math.round(eventLoop.delayP99Ms)}ms util=${eventLoop.utilization} cpu=${ + eventLoop.cpuCoreRatio + }`; +} + const resolveHeartbeatSummary = (cfg: OpenClawConfig, agentId: string) => resolveHeartbeatSummaryForAgent(cfg, agentId); @@ -307,6 +326,7 @@ export async function getHealthSnapshot(params?: { probe?: boolean; includeSensitive?: boolean; runtimeSnapshot?: ChannelRuntimeSnapshot; + eventLoop?: HealthSummary["eventLoop"]; }): Promise { const timeoutMs = params?.timeoutMs; const cfg = getRuntimeConfig(); @@ -432,6 +452,15 @@ export async function getHealthSnapshot(params?: { if (lastProbeAt) { snapshot.lastProbeAt = lastProbeAt; } + const health = evaluateChannelHealth(snapshot, { + channelId: plugin.id, + now: Date.now(), + staleEventThresholdMs: DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS, + channelConnectGraceMs: DEFAULT_CHANNEL_CONNECT_GRACE_MS, + }); + if (!health.healthy) { + snapshot.healthState = health.reason; + } const summary = plugin.status?.buildChannelSummary ? await plugin.status.buildChannelSummary({ @@ -483,6 +512,7 @@ export async function getHealthSnapshot(params?: { ok: true, ts: Date.now(), durationMs: Date.now() - start, + ...(params?.eventLoop ? { eventLoop: params.eventLoop } : {}), ...(pluginHealth ? { plugins: pluginHealth } : {}), channels, channelOrder, @@ -667,6 +697,10 @@ export async function healthCommand( for (const line of channelLines) { runtime.log(styleHealthChannelLine(line, rich)); } + const eventLoopLine = formatEventLoopHealthLine(summary); + if (eventLoopLine) { + runtime.log(styleHealthChannelLine(eventLoopLine, rich)); + } for (const plugin of displayPlugins) { const channelSummary = summary.channels?.[plugin.id]; if (!channelSummary || channelSummary.linked !== true) { diff --git a/src/commands/health.types.ts b/src/commands/health.types.ts index 7a22f4a4c28..6d2d03acc7c 100644 --- a/src/commands/health.types.ts +++ b/src/commands/health.types.ts @@ -39,6 +39,7 @@ export type HealthSummary = { ok: true; ts: number; durationMs: number; + eventLoop?: import("../gateway/server/event-loop-health.js").GatewayEventLoopHealth; plugins?: PluginHealthSummary; channels: Record; channelOrder: string[]; diff --git a/src/commands/status.command-sections.test.ts b/src/commands/status.command-sections.test.ts index 84da78a74f3..f839b8c7f10 100644 --- a/src/commands/status.command-sections.test.ts +++ b/src/commands/status.command-sections.test.ts @@ -137,6 +137,36 @@ describe("status.command-sections", () => { ]); }); + it("adds degraded event-loop health to status rows", () => { + const rows = buildStatusHealthRows({ + health: { + durationMs: 42, + eventLoop: { + degraded: true, + reasons: ["event_loop_delay"], + intervalMs: 62_000, + delayP99Ms: 61_000, + delayMaxMs: 62_000, + utilization: 1, + cpuCoreRatio: 1, + }, + } as HealthSummary, + formatHealthChannelLines: () => [], + ok: (value) => `ok(${value})`, + warn: (value) => `warn(${value})`, + muted: (value) => `muted(${value})`, + }); + + expect(rows).toEqual([ + { Item: "Gateway", Status: "ok(reachable)", Detail: "42ms" }, + { + Item: "Event loop", + Status: "warn(WARN)", + Detail: "reasons event_loop_delay · max 62000ms · p99 61000ms · util 1 · cpu 1", + }, + ]); + }); + it("builds footer lines from update and reachability state", () => { expect( buildStatusFooterLines({ diff --git a/src/commands/status.command-sections.ts b/src/commands/status.command-sections.ts index ffae54fda5e..fd8b42705eb 100644 --- a/src/commands/status.command-sections.ts +++ b/src/commands/status.command-sections.ts @@ -23,6 +23,7 @@ type SummaryLike = Pick; export type StatusMemoryStateResolvers = { resolveMemoryVectorState: (value: NonNullable) => { @@ -260,6 +261,13 @@ export function buildStatusHealthRows(params: { Detail: `${params.health.durationMs}ms`, }, ]; + if (params.health.eventLoop) { + rows.push({ + Item: "Event loop", + Status: params.health.eventLoop.degraded ? params.warn("WARN") : params.ok("OK"), + Detail: formatEventLoopHealthDetail(params.health.eventLoop), + }); + } for (const line of params.formatHealthChannelLines(params.health, { accountMode: "all" })) { const colon = line.indexOf(":"); if (colon === -1) { @@ -286,6 +294,17 @@ export function buildStatusHealthRows(params: { return rows; } +export function formatEventLoopHealthDetail(eventLoop: EventLoopHealthLike): string { + const parts = [ + eventLoop.reasons.length > 0 ? `reasons ${eventLoop.reasons.join(",")}` : "healthy", + `max ${Math.round(eventLoop.delayMaxMs)}ms`, + `p99 ${Math.round(eventLoop.delayP99Ms)}ms`, + `util ${eventLoop.utilization}`, + `cpu ${eventLoop.cpuCoreRatio}`, + ]; + return parts.join(" · "); +} + export function buildStatusSessionsRows(params: { recent: SessionsRecentLike[]; verbose?: boolean; diff --git a/src/commands/status.types.ts b/src/commands/status.types.ts index 59f118f9fc2..0aa2c899289 100644 --- a/src/commands/status.types.ts +++ b/src/commands/status.types.ts @@ -39,6 +39,7 @@ export type HeartbeatStatus = { export type StatusSummary = { runtimeVersion?: string | null; + eventLoop?: import("../gateway/server/event-loop-health.js").GatewayEventLoopHealth; linkChannel?: { id: ChannelId; label: string; diff --git a/src/gateway/protocol/channels.schema.test.ts b/src/gateway/protocol/channels.schema.test.ts index bffbdc4c57b..6d415afddb7 100644 --- a/src/gateway/protocol/channels.schema.test.ts +++ b/src/gateway/protocol/channels.schema.test.ts @@ -1,9 +1,10 @@ import AjvPkg from "ajv"; import { describe, expect, it } from "vitest"; -import { WebLoginWaitParamsSchema } from "./schema/channels.js"; +import { ChannelsStatusResultSchema, WebLoginWaitParamsSchema } from "./schema/channels.js"; + +const Ajv = AjvPkg as unknown as new (opts?: object) => import("ajv").default; describe("WebLoginWaitParamsSchema", () => { - const Ajv = AjvPkg as unknown as new (opts?: object) => import("ajv").default; const validate = new Ajv().compile(WebLoginWaitParamsSchema); it("bounds caller-provided QR data URLs", () => { @@ -25,3 +26,40 @@ describe("WebLoginWaitParamsSchema", () => { ).toBe(false); }); }); + +describe("ChannelsStatusResultSchema", () => { + const validate = new Ajv().compile(ChannelsStatusResultSchema); + + it("accepts gateway event-loop diagnostics emitted by channels.status", () => { + expect( + validate({ + ts: Date.now(), + channelOrder: ["discord"], + channelLabels: { discord: "Discord" }, + channels: { discord: { configured: true } }, + channelAccounts: { + discord: [ + { + accountId: "default", + enabled: true, + configured: true, + running: true, + connected: false, + healthState: "stale-socket", + }, + ], + }, + channelDefaultAccountId: { discord: "default" }, + eventLoop: { + degraded: true, + reasons: ["event_loop_delay", "cpu"], + intervalMs: 62_000, + delayP99Ms: 1_250.5, + delayMaxMs: 62_000, + utilization: 0.98, + cpuCoreRatio: 1.2, + }, + }), + ).toBe(true); + }); +}); diff --git a/src/gateway/protocol/schema/channels.ts b/src/gateway/protocol/schema/channels.ts index cd831b50ac1..87d95f609c5 100644 --- a/src/gateway/protocol/schema/channels.ts +++ b/src/gateway/protocol/schema/channels.ts @@ -287,6 +287,25 @@ export const ChannelUiMetaSchema = Type.Object( { additionalProperties: false }, ); +export const ChannelEventLoopHealthSchema = Type.Object( + { + degraded: Type.Boolean(), + reasons: Type.Array( + Type.Union([ + Type.Literal("event_loop_delay"), + Type.Literal("event_loop_utilization"), + Type.Literal("cpu"), + ]), + ), + intervalMs: Type.Integer({ minimum: 0 }), + delayP99Ms: Type.Number({ minimum: 0 }), + delayMaxMs: Type.Number({ minimum: 0 }), + utilization: Type.Number({ minimum: 0 }), + cpuCoreRatio: Type.Number({ minimum: 0 }), + }, + { additionalProperties: false }, +); + export const ChannelsStatusResultSchema = Type.Object( { ts: Type.Integer({ minimum: 0 }), @@ -298,6 +317,7 @@ export const ChannelsStatusResultSchema = Type.Object( channels: Type.Record(NonEmptyString, Type.Unknown()), channelAccounts: Type.Record(NonEmptyString, Type.Array(ChannelAccountSnapshotSchema)), channelDefaultAccountId: Type.Record(NonEmptyString, NonEmptyString), + eventLoop: Type.Optional(ChannelEventLoopHealthSchema), }, { additionalProperties: false }, ); diff --git a/src/gateway/server-methods/channels.status.test.ts b/src/gateway/server-methods/channels.status.test.ts index 66e29f4e325..0df3e4520b9 100644 --- a/src/gateway/server-methods/channels.status.test.ts +++ b/src/gateway/server-methods/channels.status.test.ts @@ -163,4 +163,60 @@ describe("channelsHandlers channels.status", () => { }), ); }); + + it("annotates unhealthy channel snapshots and includes event-loop health", async () => { + const now = Date.now(); + mocks.applyPluginAutoEnable.mockReturnValue({ config: { autoEnabled: true }, changes: [] }); + mocks.buildChannelAccountSnapshot.mockResolvedValue({ + accountId: "default", + enabled: true, + configured: true, + running: true, + connected: true, + lastStartAt: now - 60 * 60_000, + lastTransportActivityAt: now - 40 * 60_000, + }); + const eventLoop = { + degraded: true, + reasons: ["event_loop_delay"], + intervalMs: 62_000, + delayP99Ms: 62_000, + delayMaxMs: 62_000, + utilization: 1, + cpuCoreRatio: 1, + }; + const respond = vi.fn(); + + await channelsHandlers["channels.status"]( + createOptions( + { probe: false, timeoutMs: 2000 }, + { + respond, + context: { + getRuntimeConfig: mocks.getRuntimeConfig, + getRuntimeSnapshot: () => ({ + channels: {}, + channelAccounts: {}, + }), + getEventLoopHealth: () => eventLoop, + } as never, + }, + ), + ); + + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ + eventLoop, + channelAccounts: { + whatsapp: [ + expect.objectContaining({ + healthState: "stale-socket", + }), + ], + }, + }), + undefined, + ); + }); }); diff --git a/src/gateway/server-methods/channels.ts b/src/gateway/server-methods/channels.ts index 095dface47f..47bd45c2e12 100644 --- a/src/gateway/server-methods/channels.ts +++ b/src/gateway/server-methods/channels.ts @@ -17,6 +17,11 @@ import { DEFAULT_ACCOUNT_ID } from "../../routing/session-key.js"; import { defaultRuntime } from "../../runtime.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { runTasksWithConcurrency } from "../../utils/run-with-concurrency.js"; +import { + DEFAULT_CHANNEL_CONNECT_GRACE_MS, + DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS, + evaluateChannelHealth, +} from "../channel-health-policy.js"; import { ErrorCodes, errorShape, @@ -277,6 +282,15 @@ export const channelsHandlers: GatewayRequestHandlers = { if (snapshot.lastOutboundAt == null) { snapshot.lastOutboundAt = activity.outboundAt; } + const health = evaluateChannelHealth(snapshot, { + channelId, + now: Date.now(), + staleEventThresholdMs: DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS, + channelConnectGraceMs: DEFAULT_CHANNEL_CONNECT_GRACE_MS, + }); + if (!health.healthy) { + snapshot.healthState = health.reason; + } return { accountId: accountId, account, snapshot }; }; @@ -324,6 +338,7 @@ export const channelsHandlers: GatewayRequestHandlers = { channelDetailLabels: uiCatalog.detailLabels, channelSystemImages: uiCatalog.systemImages, channelMeta: uiCatalog.entries, + ...(context.getEventLoopHealth ? { eventLoop: context.getEventLoopHealth() } : {}), channels: {} as Record, channelAccounts: {} as Record, channelDefaultAccountId: {} as Record, diff --git a/src/gateway/server-methods/health.ts b/src/gateway/server-methods/health.ts index cbe8e60e7a4..00d20efd1be 100644 --- a/src/gateway/server-methods/health.ts +++ b/src/gateway/server-methods/health.ts @@ -107,6 +107,9 @@ export const healthHandlers: GatewayRequestHandlers = { !cachedDiffersFromRuntime && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS ) { + if (context.getEventLoopHealth) { + cached.eventLoop = context.getEventLoopHealth(); + } respond(true, cached, undefined, { cached: true }); void refreshHealthSnapshot({ probe: false, includeSensitive }).catch((err) => logHealth.error(`background health refresh failed: ${formatError(err)}`), @@ -120,12 +123,15 @@ export const healthHandlers: GatewayRequestHandlers = { respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); } }, - status: async ({ respond, client, params }) => { + status: async ({ respond, client, params, context }) => { const scopes = Array.isArray(client?.connect?.scopes) ? client.connect.scopes : []; const status = await getStatusSummary({ includeSensitive: scopes.includes(ADMIN_SCOPE), includeChannelSummary: params.includeChannelSummary !== false, }); + if (context.getEventLoopHealth) { + status.eventLoop = context.getEventLoopHealth(); + } respond(true, status, undefined); }, }; diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index 2a365fe0242..6066b165daa 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -2008,6 +2008,65 @@ describe("gateway healthHandlers.health cache freshness", () => { expect(respond).toHaveBeenCalledWith(true, fresh, undefined); }); + it("preserves event-loop health sampled by the refresh path", async () => { + const eventLoop = { + degraded: true, + reasons: ["event_loop_delay" as const], + intervalMs: 2_000, + delayP99Ms: 1_500, + delayMaxMs: 1_800, + utilization: 0.2, + cpuCoreRatio: 0.1, + }; + const replacementEventLoop = { + degraded: false, + reasons: [], + intervalMs: 1, + delayP99Ms: 0, + delayMaxMs: 0, + utilization: 0, + cpuCoreRatio: 0, + }; + const fresh = { + ok: true, + ts: Date.now(), + durationMs: 1, + channels: {}, + channelOrder: [], + channelLabels: {}, + heartbeatSeconds: 0, + defaultAgentId: "main", + agents: [], + sessions: { path: "/tmp/sessions.json", count: 0, recent: [] }, + eventLoop, + }; + const respond = vi.fn(); + const refreshHealthSnapshot = vi.fn().mockResolvedValue(fresh); + const getEventLoopHealth = vi.fn(() => replacementEventLoop); + + await healthHandlers.health({ + req: {} as never, + params: {} as never, + respond: respond as never, + context: { + getHealthCache: () => null, + refreshHealthSnapshot, + getRuntimeSnapshot: () => ({ channels: {}, channelAccounts: {} }), + getEventLoopHealth, + logHealth: { error: vi.fn() }, + } as never, + client: { connect: { role: "operator", scopes: ["operator.read"] } } as never, + isWebchatConnect: () => false, + }); + + expect(refreshHealthSnapshot).toHaveBeenCalledWith({ + probe: false, + includeSensitive: false, + }); + expect(getEventLoopHealth).not.toHaveBeenCalled(); + expect(respond).toHaveBeenCalledWith(true, expect.objectContaining({ eventLoop }), undefined); + }); + it("refreshes cached health when a runtime account is missing from the cached account summary", async () => { const cached = { ok: true, diff --git a/src/gateway/server-methods/shared-types.ts b/src/gateway/server-methods/shared-types.ts index f15b0bd28dc..1961d200f85 100644 --- a/src/gateway/server-methods/shared-types.ts +++ b/src/gateway/server-methods/shared-types.ts @@ -13,6 +13,7 @@ import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index. import type { GatewayBroadcastFn, GatewayBroadcastToConnIdsFn } from "../server-broadcast-types.js"; import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js"; import type { DedupeEntry } from "../server-shared.js"; +import type { GatewayEventLoopHealth } from "../server/event-loop-health.js"; type SubsystemLogger = ReturnType; @@ -91,6 +92,7 @@ export type GatewayRequestContext = { findRunningWizard: () => string | null; purgeWizardSession: (id: string) => void; getRuntimeSnapshot: () => ChannelRuntimeSnapshot; + getEventLoopHealth?: () => GatewayEventLoopHealth | undefined; startChannel: ( channel: import("../../channels/plugins/types.public.js").ChannelId, accountId?: string, diff --git a/src/gateway/server-request-context.ts b/src/gateway/server-request-context.ts index 25ff29973e0..7fc674cc308 100644 --- a/src/gateway/server-request-context.ts +++ b/src/gateway/server-request-context.ts @@ -52,6 +52,7 @@ type GatewayRequestContextParams = { findRunningWizard: GatewayRequestContext["findRunningWizard"]; purgeWizardSession: GatewayRequestContext["purgeWizardSession"]; getRuntimeSnapshot: GatewayRequestContext["getRuntimeSnapshot"]; + getEventLoopHealth?: GatewayRequestContext["getEventLoopHealth"]; startChannel: GatewayRequestContext["startChannel"]; stopChannel: GatewayRequestContext["stopChannel"]; markChannelLoggedOut: GatewayRequestContext["markChannelLoggedOut"]; @@ -147,6 +148,7 @@ export function createGatewayRequestContext( findRunningWizard: params.findRunningWizard, purgeWizardSession: params.purgeWizardSession, getRuntimeSnapshot: params.getRuntimeSnapshot, + getEventLoopHealth: params.getEventLoopHealth, startChannel: params.startChannel, stopChannel: params.stopChannel, markChannelLoggedOut: params.markChannelLoggedOut, diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index f103d2a9b26..06176cdd08e 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -874,6 +874,7 @@ export async function startGatewayServer( refreshGatewayHealthSnapshot({ ...opts, getRuntimeSnapshot, + getEventLoopHealth: readinessEventLoopHealth.snapshot, }); const createCloseHandler = () => async (opts?: { reason?: string; restartExpectedMs?: number | null }) => { @@ -1221,6 +1222,7 @@ export async function startGatewayServer( findRunningWizard, purgeWizardSession, getRuntimeSnapshot, + getEventLoopHealth: readinessEventLoopHealth.snapshot, startChannel, stopChannel, markChannelLoggedOut, diff --git a/src/gateway/server/health-state.test.ts b/src/gateway/server/health-state.test.ts index c4a63b55c23..1382d1c08d4 100644 --- a/src/gateway/server/health-state.test.ts +++ b/src/gateway/server/health-state.test.ts @@ -57,10 +57,37 @@ describe("refreshGatewayHealthSnapshot", () => { includeSensitive: false, runtimeSnapshot: undefined, }); + expect(Object.hasOwn(getHealthSnapshotMock.mock.calls[0]?.[0] ?? {}, "eventLoop")).toBe(false); resolveSnapshot?.(createHealthSummary()); await expect(Promise.all([first, second])).resolves.toHaveLength(2); }); + it("passes event-loop health only when the hook returns a snapshot", async () => { + const healthState = await loadHealthState(); + const eventLoop = { + degraded: true, + reasons: ["event_loop_delay" as const], + intervalMs: 2_000, + delayP99Ms: 1_500, + delayMaxMs: 1_700, + utilization: 0.2, + cpuCoreRatio: 0.1, + }; + + await healthState.refreshGatewayHealthSnapshot({ + probe: false, + getEventLoopHealth: () => eventLoop, + }); + await healthState.refreshGatewayHealthSnapshot({ + probe: true, + getEventLoopHealth: () => undefined, + }); + + expect(getHealthSnapshotMock).toHaveBeenCalledTimes(2); + expect(getHealthSnapshotMock.mock.calls[0]?.[0]?.eventLoop).toBe(eventLoop); + expect(Object.hasOwn(getHealthSnapshotMock.mock.calls[1]?.[0] ?? {}, "eventLoop")).toBe(false); + }); + it("captures runtime snapshots for completed refreshes and guards snapshot failures", async () => { const healthState = await loadHealthState(); const runtimeSnapshot = { @@ -98,7 +125,9 @@ describe("refreshGatewayHealthSnapshot", () => { const sensitiveSummary = createHealthSummary(); const safeSummary = createHealthSummary(); const broadcast = vi.fn(); - getHealthSnapshotMock.mockResolvedValueOnce(sensitiveSummary).mockResolvedValueOnce(safeSummary); + getHealthSnapshotMock + .mockResolvedValueOnce(sensitiveSummary) + .mockResolvedValueOnce(safeSummary); healthState.setBroadcastHealthUpdate(broadcast); const version = healthState.getHealthVersion(); diff --git a/src/gateway/server/health-state.ts b/src/gateway/server/health-state.ts index b213faa8fc2..5ca4b298277 100644 --- a/src/gateway/server/health-state.ts +++ b/src/gateway/server/health-state.ts @@ -9,6 +9,7 @@ import { normalizeMainKey } from "../../routing/session-key.js"; import { resolveGatewayAuth } from "../auth.js"; import type { Snapshot } from "../protocol/index.js"; import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js"; +import type { GatewayEventLoopHealth } from "./event-loop-health.js"; let presenceVersion = 1; let healthVersion = 1; @@ -76,6 +77,7 @@ export async function refreshGatewayHealthSnapshot(opts?: { probe?: boolean; includeSensitive?: boolean; getRuntimeSnapshot?: () => ChannelRuntimeSnapshot; + getEventLoopHealth?: () => GatewayEventLoopHealth | undefined; }) { const includeSensitive = opts?.includeSensitive === true; let refresh = includeSensitive ? sensitiveHealthRefresh : healthRefresh; @@ -87,10 +89,12 @@ export async function refreshGatewayHealthSnapshot(opts?: { } catch { runtimeSnapshot = undefined; } + const eventLoop = opts?.getEventLoopHealth?.(); const snap = await getHealthSnapshot({ probe: opts?.probe, includeSensitive, runtimeSnapshot, + ...(eventLoop ? { eventLoop } : {}), }); if (!includeSensitive) { healthCache = snap; diff --git a/src/utils/fetch-timeout.test.ts b/src/utils/fetch-timeout.test.ts index b0724d24c2c..179d46ae87e 100644 --- a/src/utils/fetch-timeout.test.ts +++ b/src/utils/fetch-timeout.test.ts @@ -45,6 +45,31 @@ describe("buildTimeoutAbortSignal", () => { cleanup(); }); + it("annotates timeout logs when the timer fires late", async () => { + vi.setSystemTime(0); + const { cleanup } = buildTimeoutAbortSignal({ + timeoutMs: 25, + operation: "unit-test", + url: "https://example.com/v1/responses", + }); + + vi.setSystemTime(2_000); + await vi.advanceTimersByTimeAsync(25); + + expect(warn).toHaveBeenCalledWith( + "fetch timeout reached; aborting operation", + expect.objectContaining({ + timerDelayMs: 2000, + eventLoopDelayHint: "timer delayed 2000ms, likely event-loop starvation", + consoleMessage: expect.stringContaining( + "timer delayed 2000ms, likely event-loop starvation", + ), + }), + ); + + cleanup(); + }); + it("strips query strings and hashes from relative timeout URL logs", async () => { const { cleanup } = buildTimeoutAbortSignal({ timeoutMs: 25, diff --git a/src/utils/fetch-timeout.ts b/src/utils/fetch-timeout.ts index 52fe29d6335..fa43dcd7a8e 100644 --- a/src/utils/fetch-timeout.ts +++ b/src/utils/fetch-timeout.ts @@ -65,9 +65,15 @@ function abortDueToTimeout( } const sanitizedUrl = sanitizeTimeoutLogUrl(url); const elapsedMs = Math.max(0, Date.now() - startedAtMs); + const delayMs = Math.max(0, elapsedMs - timeoutMs); + const eventLoopDelayHint = + delayMs >= Math.max(1000, timeoutMs * 0.5) + ? `timer delayed ${delayMs}ms, likely event-loop starvation` + : null; const consoleMessage = [ `fetch timeout after ${timeoutMs}ms`, `(elapsed ${elapsedMs}ms)`, + eventLoopDelayHint, operation ? `operation=${operation}` : null, sanitizedUrl ? `url=${sanitizedUrl}` : null, ] @@ -76,6 +82,7 @@ function abortDueToTimeout( log.warn("fetch timeout reached; aborting operation", { timeoutMs, elapsedMs, + ...(eventLoopDelayHint ? { timerDelayMs: delayMs, eventLoopDelayHint } : {}), consoleMessage, ...(operation ? { operation } : {}), ...(sanitizedUrl ? { url: sanitizedUrl } : {}),