mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:00:43 +00:00
fix(discord): surface stalled transport health (#76327)
* fix(discord): surface stalled transport health * fix(discord): surface stalled transport health * fix(discord): surface stalled transport health --------- Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com>
This commit is contained in:
@@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- Plugins/onboarding: let Manual setup install optional official plugins, including ClawHub-backed diagnostics with npm fallback, and expose the external Codex plugin as a selectable provider setup choice. Thanks @vincentkoc.
|
||||
- Plugins/CLI: include package dependency install state in `openclaw plugins list --json` so scripts can spot missing plugin dependencies without runtime-loading plugins.
|
||||
- Discord/status: add degraded Discord transport and gateway event-loop starvation signals to `openclaw channels status`, `openclaw status --deep`, and fetch-timeout logs so intermittent socket resets do not look like a healthy running channel. (#76327) Thanks @joshavant.
|
||||
- Plugins/update: on the beta OpenClaw update channel, default-line npm and ClawHub plugin updates try `@beta` first and fall back to default/latest when no plugin beta release exists.
|
||||
- Channels/WhatsApp: support explicit WhatsApp Channel/Newsletter `@newsletter` outbound message targets with channel session metadata instead of DM routing. Fixes #13417; carries forward the narrow outbound target idea from #13424. Thanks @vincentkoc and @agentz-manfred.
|
||||
|
||||
|
||||
@@ -67,4 +67,26 @@ describe("collectDiscordStatusIssues", () => {
|
||||
expect(issues[0]?.message).toContain("alerts");
|
||||
expect(issues[0]?.message).toContain("guilds.ops.channels");
|
||||
});
|
||||
|
||||
it("reports degraded runtime transport state", () => {
|
||||
const issues = collectDiscordStatusIssues([
|
||||
{
|
||||
accountId: "ops",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
running: true,
|
||||
connected: true,
|
||||
healthState: "stale-socket",
|
||||
} as ChannelAccountSnapshot,
|
||||
]);
|
||||
|
||||
expect(issues).toEqual([
|
||||
expect.objectContaining({
|
||||
channel: "discord",
|
||||
accountId: "ops",
|
||||
kind: "runtime",
|
||||
message: expect.stringContaining("stale-socket"),
|
||||
}),
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -21,6 +21,9 @@ type DiscordAccountStatus = {
|
||||
accountId?: unknown;
|
||||
enabled?: unknown;
|
||||
configured?: unknown;
|
||||
running?: unknown;
|
||||
connected?: unknown;
|
||||
healthState?: unknown;
|
||||
application?: unknown;
|
||||
audit?: unknown;
|
||||
};
|
||||
@@ -45,6 +48,9 @@ function readDiscordAccountStatus(value: ChannelAccountSnapshot): DiscordAccount
|
||||
accountId: value.accountId,
|
||||
enabled: value.enabled,
|
||||
configured: value.configured,
|
||||
running: value.running,
|
||||
connected: value.connected,
|
||||
healthState: value.healthState,
|
||||
application: value.application,
|
||||
audit: value.audit,
|
||||
};
|
||||
@@ -124,6 +130,32 @@ export function collectDiscordStatusIssues(
|
||||
continue;
|
||||
}
|
||||
|
||||
const running = account.running === true;
|
||||
const healthState = asString(account.healthState);
|
||||
if (
|
||||
healthState === "stale-socket" ||
|
||||
healthState === "stuck" ||
|
||||
healthState === "disconnected" ||
|
||||
healthState === "not-running"
|
||||
) {
|
||||
const runningLabel = running ? "running" : "not running";
|
||||
issues.push({
|
||||
channel: "discord",
|
||||
accountId,
|
||||
kind: "runtime",
|
||||
message: `Discord gateway transport is degraded (${healthState}; account is ${runningLabel}).`,
|
||||
fix: "Check gateway event-loop health and Discord connectivity, then restart the Discord channel or gateway if the transport does not recover.",
|
||||
});
|
||||
} else if (running && account.connected === false) {
|
||||
issues.push({
|
||||
channel: "discord",
|
||||
accountId,
|
||||
kind: "runtime",
|
||||
message: "Discord gateway transport is running but disconnected.",
|
||||
fix: "Check gateway logs for Discord websocket errors and wait for reconnect; restart the Discord channel or gateway if it does not recover.",
|
||||
});
|
||||
}
|
||||
|
||||
const app = readDiscordApplicationSummary(account.application);
|
||||
const messageContent = app.intents?.messageContent;
|
||||
if (messageContent === "disabled") {
|
||||
|
||||
@@ -83,4 +83,23 @@ describe("channels command", () => {
|
||||
expect(lines.join("\n")).toMatch(/imessage/i);
|
||||
expect(lines.join("\n")).toMatch(/Channel error/i);
|
||||
});
|
||||
|
||||
it("surfaces degraded gateway event-loop health in channels status output", () => {
|
||||
const lines = formatGatewayChannelsStatusLines({
|
||||
eventLoop: {
|
||||
degraded: true,
|
||||
reasons: ["event_loop_delay", "cpu"],
|
||||
intervalMs: 62_000,
|
||||
delayP99Ms: 61_000,
|
||||
delayMaxMs: 62_000,
|
||||
utilization: 1,
|
||||
cpuCoreRatio: 1,
|
||||
},
|
||||
channelLabels: {},
|
||||
channelAccounts: {},
|
||||
});
|
||||
|
||||
expect(lines.join("\n")).toMatch(/Gateway event loop degraded/);
|
||||
expect(lines.join("\n")).toMatch(/eventLoopDelayMaxMs=62000/);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -39,9 +39,46 @@ function formatChannelsStatusError(err: unknown): string {
|
||||
return redactGatewayUrlSecretsInText(formatErrorMessage(err));
|
||||
}
|
||||
|
||||
function formatEventLoopBits(value: unknown): string | null {
|
||||
if (!value || typeof value !== "object") {
|
||||
return null;
|
||||
}
|
||||
const record = value as Record<string, unknown>;
|
||||
if (record.degraded !== true) {
|
||||
return null;
|
||||
}
|
||||
const reasons = Array.isArray(record.reasons)
|
||||
? record.reasons.filter((reason): reason is string => typeof reason === "string")
|
||||
: [];
|
||||
const delayMaxMs =
|
||||
typeof record.delayMaxMs === "number" && Number.isFinite(record.delayMaxMs)
|
||||
? Math.round(record.delayMaxMs)
|
||||
: null;
|
||||
const utilization =
|
||||
typeof record.utilization === "number" && Number.isFinite(record.utilization)
|
||||
? record.utilization
|
||||
: null;
|
||||
const cpuCoreRatio =
|
||||
typeof record.cpuCoreRatio === "number" && Number.isFinite(record.cpuCoreRatio)
|
||||
? record.cpuCoreRatio
|
||||
: null;
|
||||
return [
|
||||
reasons.length ? `reasons=${reasons.join(",")}` : null,
|
||||
delayMaxMs != null ? `eventLoopDelayMaxMs=${delayMaxMs}` : null,
|
||||
utilization != null ? `eventLoopUtilization=${utilization}` : null,
|
||||
cpuCoreRatio != null ? `cpuCoreRatio=${cpuCoreRatio}` : null,
|
||||
]
|
||||
.filter((part): part is string => Boolean(part))
|
||||
.join(" ");
|
||||
}
|
||||
|
||||
export function formatGatewayChannelsStatusLines(payload: Record<string, unknown>): string[] {
|
||||
const lines: string[] = [];
|
||||
lines.push(theme.success("Gateway reachable."));
|
||||
const eventLoopLine = formatEventLoopBits(payload.eventLoop);
|
||||
if (eventLoopLine) {
|
||||
lines.push(theme.warn(`Gateway event loop degraded: ${eventLoopLine}`));
|
||||
}
|
||||
const channelLabels =
|
||||
payload.channelLabels && typeof payload.channelLabels === "object"
|
||||
? (payload.channelLabels as Record<string, unknown>)
|
||||
@@ -108,6 +145,9 @@ export function formatGatewayChannelsStatusLines(payload: Record<string, unknown
|
||||
if (account.allowUnmentionedGroups === true) {
|
||||
bits.push("groups:unmentioned");
|
||||
}
|
||||
if (typeof account.healthState === "string" && account.healthState) {
|
||||
bits.push(`health:${account.healthState}`);
|
||||
}
|
||||
appendBaseUrlBit(bits, account);
|
||||
const probe = account.probe as { ok?: boolean } | undefined;
|
||||
if (probe && typeof probe.ok === "boolean") {
|
||||
|
||||
@@ -14,6 +14,11 @@ import { getRuntimeConfig } from "../config/config.js";
|
||||
import { resolveStorePath } from "../config/sessions/paths.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { buildGatewayConnectionDetails, callGateway } from "../gateway/call.js";
|
||||
import {
|
||||
DEFAULT_CHANNEL_CONNECT_GRACE_MS,
|
||||
DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS,
|
||||
evaluateChannelHealth,
|
||||
} from "../gateway/channel-health-policy.js";
|
||||
import type { ChannelRuntimeSnapshot } from "../gateway/server-channel-runtime.types.js";
|
||||
import { info } from "../globals.js";
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
@@ -92,6 +97,20 @@ const formatDurationParts = (ms: number): string => {
|
||||
return parts.join(" ");
|
||||
};
|
||||
|
||||
function formatEventLoopHealthLine(summary: HealthSummary): string | null {
|
||||
const eventLoop = summary.eventLoop;
|
||||
if (!eventLoop) {
|
||||
return null;
|
||||
}
|
||||
const state = eventLoop.degraded ? "degraded" : "ok";
|
||||
const reasons = eventLoop.reasons.length > 0 ? ` reasons=${eventLoop.reasons.join(",")}` : "";
|
||||
return `Gateway event loop: ${state}${reasons} max=${Math.round(
|
||||
eventLoop.delayMaxMs,
|
||||
)}ms p99=${Math.round(eventLoop.delayP99Ms)}ms util=${eventLoop.utilization} cpu=${
|
||||
eventLoop.cpuCoreRatio
|
||||
}`;
|
||||
}
|
||||
|
||||
const resolveHeartbeatSummary = (cfg: OpenClawConfig, agentId: string) =>
|
||||
resolveHeartbeatSummaryForAgent(cfg, agentId);
|
||||
|
||||
@@ -307,6 +326,7 @@ export async function getHealthSnapshot(params?: {
|
||||
probe?: boolean;
|
||||
includeSensitive?: boolean;
|
||||
runtimeSnapshot?: ChannelRuntimeSnapshot;
|
||||
eventLoop?: HealthSummary["eventLoop"];
|
||||
}): Promise<HealthSummary> {
|
||||
const timeoutMs = params?.timeoutMs;
|
||||
const cfg = getRuntimeConfig();
|
||||
@@ -432,6 +452,15 @@ export async function getHealthSnapshot(params?: {
|
||||
if (lastProbeAt) {
|
||||
snapshot.lastProbeAt = lastProbeAt;
|
||||
}
|
||||
const health = evaluateChannelHealth(snapshot, {
|
||||
channelId: plugin.id,
|
||||
now: Date.now(),
|
||||
staleEventThresholdMs: DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS,
|
||||
channelConnectGraceMs: DEFAULT_CHANNEL_CONNECT_GRACE_MS,
|
||||
});
|
||||
if (!health.healthy) {
|
||||
snapshot.healthState = health.reason;
|
||||
}
|
||||
|
||||
const summary = plugin.status?.buildChannelSummary
|
||||
? await plugin.status.buildChannelSummary({
|
||||
@@ -483,6 +512,7 @@ export async function getHealthSnapshot(params?: {
|
||||
ok: true,
|
||||
ts: Date.now(),
|
||||
durationMs: Date.now() - start,
|
||||
...(params?.eventLoop ? { eventLoop: params.eventLoop } : {}),
|
||||
...(pluginHealth ? { plugins: pluginHealth } : {}),
|
||||
channels,
|
||||
channelOrder,
|
||||
@@ -667,6 +697,10 @@ export async function healthCommand(
|
||||
for (const line of channelLines) {
|
||||
runtime.log(styleHealthChannelLine(line, rich));
|
||||
}
|
||||
const eventLoopLine = formatEventLoopHealthLine(summary);
|
||||
if (eventLoopLine) {
|
||||
runtime.log(styleHealthChannelLine(eventLoopLine, rich));
|
||||
}
|
||||
for (const plugin of displayPlugins) {
|
||||
const channelSummary = summary.channels?.[plugin.id];
|
||||
if (!channelSummary || channelSummary.linked !== true) {
|
||||
|
||||
@@ -39,6 +39,7 @@ export type HealthSummary = {
|
||||
ok: true;
|
||||
ts: number;
|
||||
durationMs: number;
|
||||
eventLoop?: import("../gateway/server/event-loop-health.js").GatewayEventLoopHealth;
|
||||
plugins?: PluginHealthSummary;
|
||||
channels: Record<string, ChannelHealthSummary>;
|
||||
channelOrder: string[];
|
||||
|
||||
@@ -137,6 +137,36 @@ describe("status.command-sections", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("adds degraded event-loop health to status rows", () => {
|
||||
const rows = buildStatusHealthRows({
|
||||
health: {
|
||||
durationMs: 42,
|
||||
eventLoop: {
|
||||
degraded: true,
|
||||
reasons: ["event_loop_delay"],
|
||||
intervalMs: 62_000,
|
||||
delayP99Ms: 61_000,
|
||||
delayMaxMs: 62_000,
|
||||
utilization: 1,
|
||||
cpuCoreRatio: 1,
|
||||
},
|
||||
} as HealthSummary,
|
||||
formatHealthChannelLines: () => [],
|
||||
ok: (value) => `ok(${value})`,
|
||||
warn: (value) => `warn(${value})`,
|
||||
muted: (value) => `muted(${value})`,
|
||||
});
|
||||
|
||||
expect(rows).toEqual([
|
||||
{ Item: "Gateway", Status: "ok(reachable)", Detail: "42ms" },
|
||||
{
|
||||
Item: "Event loop",
|
||||
Status: "warn(WARN)",
|
||||
Detail: "reasons event_loop_delay · max 62000ms · p99 61000ms · util 1 · cpu 1",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("builds footer lines from update and reachability state", () => {
|
||||
expect(
|
||||
buildStatusFooterLines({
|
||||
|
||||
@@ -23,6 +23,7 @@ type SummaryLike = Pick<StatusSummary, "tasks" | "taskAudit" | "heartbeat" | "se
|
||||
type MemoryLike = MemoryStatusSnapshot | null;
|
||||
type MemoryPluginLike = MemoryPluginStatus;
|
||||
type SessionsRecentLike = SessionStatus;
|
||||
type EventLoopHealthLike = NonNullable<HealthSummary["eventLoop"]>;
|
||||
|
||||
export type StatusMemoryStateResolvers = {
|
||||
resolveMemoryVectorState: (value: NonNullable<MemoryStatusSnapshot["vector"]>) => {
|
||||
@@ -260,6 +261,13 @@ export function buildStatusHealthRows(params: {
|
||||
Detail: `${params.health.durationMs}ms`,
|
||||
},
|
||||
];
|
||||
if (params.health.eventLoop) {
|
||||
rows.push({
|
||||
Item: "Event loop",
|
||||
Status: params.health.eventLoop.degraded ? params.warn("WARN") : params.ok("OK"),
|
||||
Detail: formatEventLoopHealthDetail(params.health.eventLoop),
|
||||
});
|
||||
}
|
||||
for (const line of params.formatHealthChannelLines(params.health, { accountMode: "all" })) {
|
||||
const colon = line.indexOf(":");
|
||||
if (colon === -1) {
|
||||
@@ -286,6 +294,17 @@ export function buildStatusHealthRows(params: {
|
||||
return rows;
|
||||
}
|
||||
|
||||
export function formatEventLoopHealthDetail(eventLoop: EventLoopHealthLike): string {
|
||||
const parts = [
|
||||
eventLoop.reasons.length > 0 ? `reasons ${eventLoop.reasons.join(",")}` : "healthy",
|
||||
`max ${Math.round(eventLoop.delayMaxMs)}ms`,
|
||||
`p99 ${Math.round(eventLoop.delayP99Ms)}ms`,
|
||||
`util ${eventLoop.utilization}`,
|
||||
`cpu ${eventLoop.cpuCoreRatio}`,
|
||||
];
|
||||
return parts.join(" · ");
|
||||
}
|
||||
|
||||
export function buildStatusSessionsRows(params: {
|
||||
recent: SessionsRecentLike[];
|
||||
verbose?: boolean;
|
||||
|
||||
@@ -39,6 +39,7 @@ export type HeartbeatStatus = {
|
||||
|
||||
export type StatusSummary = {
|
||||
runtimeVersion?: string | null;
|
||||
eventLoop?: import("../gateway/server/event-loop-health.js").GatewayEventLoopHealth;
|
||||
linkChannel?: {
|
||||
id: ChannelId;
|
||||
label: string;
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import AjvPkg from "ajv";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { WebLoginWaitParamsSchema } from "./schema/channels.js";
|
||||
import { ChannelsStatusResultSchema, WebLoginWaitParamsSchema } from "./schema/channels.js";
|
||||
|
||||
const Ajv = AjvPkg as unknown as new (opts?: object) => import("ajv").default;
|
||||
|
||||
describe("WebLoginWaitParamsSchema", () => {
|
||||
const Ajv = AjvPkg as unknown as new (opts?: object) => import("ajv").default;
|
||||
const validate = new Ajv().compile(WebLoginWaitParamsSchema);
|
||||
|
||||
it("bounds caller-provided QR data URLs", () => {
|
||||
@@ -25,3 +26,40 @@ describe("WebLoginWaitParamsSchema", () => {
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ChannelsStatusResultSchema", () => {
|
||||
const validate = new Ajv().compile(ChannelsStatusResultSchema);
|
||||
|
||||
it("accepts gateway event-loop diagnostics emitted by channels.status", () => {
|
||||
expect(
|
||||
validate({
|
||||
ts: Date.now(),
|
||||
channelOrder: ["discord"],
|
||||
channelLabels: { discord: "Discord" },
|
||||
channels: { discord: { configured: true } },
|
||||
channelAccounts: {
|
||||
discord: [
|
||||
{
|
||||
accountId: "default",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
running: true,
|
||||
connected: false,
|
||||
healthState: "stale-socket",
|
||||
},
|
||||
],
|
||||
},
|
||||
channelDefaultAccountId: { discord: "default" },
|
||||
eventLoop: {
|
||||
degraded: true,
|
||||
reasons: ["event_loop_delay", "cpu"],
|
||||
intervalMs: 62_000,
|
||||
delayP99Ms: 1_250.5,
|
||||
delayMaxMs: 62_000,
|
||||
utilization: 0.98,
|
||||
cpuCoreRatio: 1.2,
|
||||
},
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -287,6 +287,25 @@ export const ChannelUiMetaSchema = Type.Object(
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const ChannelEventLoopHealthSchema = Type.Object(
|
||||
{
|
||||
degraded: Type.Boolean(),
|
||||
reasons: Type.Array(
|
||||
Type.Union([
|
||||
Type.Literal("event_loop_delay"),
|
||||
Type.Literal("event_loop_utilization"),
|
||||
Type.Literal("cpu"),
|
||||
]),
|
||||
),
|
||||
intervalMs: Type.Integer({ minimum: 0 }),
|
||||
delayP99Ms: Type.Number({ minimum: 0 }),
|
||||
delayMaxMs: Type.Number({ minimum: 0 }),
|
||||
utilization: Type.Number({ minimum: 0 }),
|
||||
cpuCoreRatio: Type.Number({ minimum: 0 }),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const ChannelsStatusResultSchema = Type.Object(
|
||||
{
|
||||
ts: Type.Integer({ minimum: 0 }),
|
||||
@@ -298,6 +317,7 @@ export const ChannelsStatusResultSchema = Type.Object(
|
||||
channels: Type.Record(NonEmptyString, Type.Unknown()),
|
||||
channelAccounts: Type.Record(NonEmptyString, Type.Array(ChannelAccountSnapshotSchema)),
|
||||
channelDefaultAccountId: Type.Record(NonEmptyString, NonEmptyString),
|
||||
eventLoop: Type.Optional(ChannelEventLoopHealthSchema),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
@@ -163,4 +163,60 @@ describe("channelsHandlers channels.status", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("annotates unhealthy channel snapshots and includes event-loop health", async () => {
|
||||
const now = Date.now();
|
||||
mocks.applyPluginAutoEnable.mockReturnValue({ config: { autoEnabled: true }, changes: [] });
|
||||
mocks.buildChannelAccountSnapshot.mockResolvedValue({
|
||||
accountId: "default",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
running: true,
|
||||
connected: true,
|
||||
lastStartAt: now - 60 * 60_000,
|
||||
lastTransportActivityAt: now - 40 * 60_000,
|
||||
});
|
||||
const eventLoop = {
|
||||
degraded: true,
|
||||
reasons: ["event_loop_delay"],
|
||||
intervalMs: 62_000,
|
||||
delayP99Ms: 62_000,
|
||||
delayMaxMs: 62_000,
|
||||
utilization: 1,
|
||||
cpuCoreRatio: 1,
|
||||
};
|
||||
const respond = vi.fn();
|
||||
|
||||
await channelsHandlers["channels.status"](
|
||||
createOptions(
|
||||
{ probe: false, timeoutMs: 2000 },
|
||||
{
|
||||
respond,
|
||||
context: {
|
||||
getRuntimeConfig: mocks.getRuntimeConfig,
|
||||
getRuntimeSnapshot: () => ({
|
||||
channels: {},
|
||||
channelAccounts: {},
|
||||
}),
|
||||
getEventLoopHealth: () => eventLoop,
|
||||
} as never,
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
expect.objectContaining({
|
||||
eventLoop,
|
||||
channelAccounts: {
|
||||
whatsapp: [
|
||||
expect.objectContaining({
|
||||
healthState: "stale-socket",
|
||||
}),
|
||||
],
|
||||
},
|
||||
}),
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -17,6 +17,11 @@ import { DEFAULT_ACCOUNT_ID } from "../../routing/session-key.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import { runTasksWithConcurrency } from "../../utils/run-with-concurrency.js";
|
||||
import {
|
||||
DEFAULT_CHANNEL_CONNECT_GRACE_MS,
|
||||
DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS,
|
||||
evaluateChannelHealth,
|
||||
} from "../channel-health-policy.js";
|
||||
import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
@@ -277,6 +282,15 @@ export const channelsHandlers: GatewayRequestHandlers = {
|
||||
if (snapshot.lastOutboundAt == null) {
|
||||
snapshot.lastOutboundAt = activity.outboundAt;
|
||||
}
|
||||
const health = evaluateChannelHealth(snapshot, {
|
||||
channelId,
|
||||
now: Date.now(),
|
||||
staleEventThresholdMs: DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS,
|
||||
channelConnectGraceMs: DEFAULT_CHANNEL_CONNECT_GRACE_MS,
|
||||
});
|
||||
if (!health.healthy) {
|
||||
snapshot.healthState = health.reason;
|
||||
}
|
||||
return { accountId: accountId, account, snapshot };
|
||||
};
|
||||
|
||||
@@ -324,6 +338,7 @@ export const channelsHandlers: GatewayRequestHandlers = {
|
||||
channelDetailLabels: uiCatalog.detailLabels,
|
||||
channelSystemImages: uiCatalog.systemImages,
|
||||
channelMeta: uiCatalog.entries,
|
||||
...(context.getEventLoopHealth ? { eventLoop: context.getEventLoopHealth() } : {}),
|
||||
channels: {} as Record<string, unknown>,
|
||||
channelAccounts: {} as Record<string, unknown>,
|
||||
channelDefaultAccountId: {} as Record<string, unknown>,
|
||||
|
||||
@@ -107,6 +107,9 @@ export const healthHandlers: GatewayRequestHandlers = {
|
||||
!cachedDiffersFromRuntime &&
|
||||
now - cached.ts < HEALTH_REFRESH_INTERVAL_MS
|
||||
) {
|
||||
if (context.getEventLoopHealth) {
|
||||
cached.eventLoop = context.getEventLoopHealth();
|
||||
}
|
||||
respond(true, cached, undefined, { cached: true });
|
||||
void refreshHealthSnapshot({ probe: false, includeSensitive }).catch((err) =>
|
||||
logHealth.error(`background health refresh failed: ${formatError(err)}`),
|
||||
@@ -120,12 +123,15 @@ export const healthHandlers: GatewayRequestHandlers = {
|
||||
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)));
|
||||
}
|
||||
},
|
||||
status: async ({ respond, client, params }) => {
|
||||
status: async ({ respond, client, params, context }) => {
|
||||
const scopes = Array.isArray(client?.connect?.scopes) ? client.connect.scopes : [];
|
||||
const status = await getStatusSummary({
|
||||
includeSensitive: scopes.includes(ADMIN_SCOPE),
|
||||
includeChannelSummary: params.includeChannelSummary !== false,
|
||||
});
|
||||
if (context.getEventLoopHealth) {
|
||||
status.eventLoop = context.getEventLoopHealth();
|
||||
}
|
||||
respond(true, status, undefined);
|
||||
},
|
||||
};
|
||||
|
||||
@@ -2008,6 +2008,65 @@ describe("gateway healthHandlers.health cache freshness", () => {
|
||||
expect(respond).toHaveBeenCalledWith(true, fresh, undefined);
|
||||
});
|
||||
|
||||
it("preserves event-loop health sampled by the refresh path", async () => {
|
||||
const eventLoop = {
|
||||
degraded: true,
|
||||
reasons: ["event_loop_delay" as const],
|
||||
intervalMs: 2_000,
|
||||
delayP99Ms: 1_500,
|
||||
delayMaxMs: 1_800,
|
||||
utilization: 0.2,
|
||||
cpuCoreRatio: 0.1,
|
||||
};
|
||||
const replacementEventLoop = {
|
||||
degraded: false,
|
||||
reasons: [],
|
||||
intervalMs: 1,
|
||||
delayP99Ms: 0,
|
||||
delayMaxMs: 0,
|
||||
utilization: 0,
|
||||
cpuCoreRatio: 0,
|
||||
};
|
||||
const fresh = {
|
||||
ok: true,
|
||||
ts: Date.now(),
|
||||
durationMs: 1,
|
||||
channels: {},
|
||||
channelOrder: [],
|
||||
channelLabels: {},
|
||||
heartbeatSeconds: 0,
|
||||
defaultAgentId: "main",
|
||||
agents: [],
|
||||
sessions: { path: "/tmp/sessions.json", count: 0, recent: [] },
|
||||
eventLoop,
|
||||
};
|
||||
const respond = vi.fn();
|
||||
const refreshHealthSnapshot = vi.fn().mockResolvedValue(fresh);
|
||||
const getEventLoopHealth = vi.fn(() => replacementEventLoop);
|
||||
|
||||
await healthHandlers.health({
|
||||
req: {} as never,
|
||||
params: {} as never,
|
||||
respond: respond as never,
|
||||
context: {
|
||||
getHealthCache: () => null,
|
||||
refreshHealthSnapshot,
|
||||
getRuntimeSnapshot: () => ({ channels: {}, channelAccounts: {} }),
|
||||
getEventLoopHealth,
|
||||
logHealth: { error: vi.fn() },
|
||||
} as never,
|
||||
client: { connect: { role: "operator", scopes: ["operator.read"] } } as never,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(refreshHealthSnapshot).toHaveBeenCalledWith({
|
||||
probe: false,
|
||||
includeSensitive: false,
|
||||
});
|
||||
expect(getEventLoopHealth).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(true, expect.objectContaining({ eventLoop }), undefined);
|
||||
});
|
||||
|
||||
it("refreshes cached health when a runtime account is missing from the cached account summary", async () => {
|
||||
const cached = {
|
||||
ok: true,
|
||||
|
||||
@@ -13,6 +13,7 @@ import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index.
|
||||
import type { GatewayBroadcastFn, GatewayBroadcastToConnIdsFn } from "../server-broadcast-types.js";
|
||||
import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js";
|
||||
import type { DedupeEntry } from "../server-shared.js";
|
||||
import type { GatewayEventLoopHealth } from "../server/event-loop-health.js";
|
||||
|
||||
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
|
||||
@@ -91,6 +92,7 @@ export type GatewayRequestContext = {
|
||||
findRunningWizard: () => string | null;
|
||||
purgeWizardSession: (id: string) => void;
|
||||
getRuntimeSnapshot: () => ChannelRuntimeSnapshot;
|
||||
getEventLoopHealth?: () => GatewayEventLoopHealth | undefined;
|
||||
startChannel: (
|
||||
channel: import("../../channels/plugins/types.public.js").ChannelId,
|
||||
accountId?: string,
|
||||
|
||||
@@ -52,6 +52,7 @@ type GatewayRequestContextParams = {
|
||||
findRunningWizard: GatewayRequestContext["findRunningWizard"];
|
||||
purgeWizardSession: GatewayRequestContext["purgeWizardSession"];
|
||||
getRuntimeSnapshot: GatewayRequestContext["getRuntimeSnapshot"];
|
||||
getEventLoopHealth?: GatewayRequestContext["getEventLoopHealth"];
|
||||
startChannel: GatewayRequestContext["startChannel"];
|
||||
stopChannel: GatewayRequestContext["stopChannel"];
|
||||
markChannelLoggedOut: GatewayRequestContext["markChannelLoggedOut"];
|
||||
@@ -147,6 +148,7 @@ export function createGatewayRequestContext(
|
||||
findRunningWizard: params.findRunningWizard,
|
||||
purgeWizardSession: params.purgeWizardSession,
|
||||
getRuntimeSnapshot: params.getRuntimeSnapshot,
|
||||
getEventLoopHealth: params.getEventLoopHealth,
|
||||
startChannel: params.startChannel,
|
||||
stopChannel: params.stopChannel,
|
||||
markChannelLoggedOut: params.markChannelLoggedOut,
|
||||
|
||||
@@ -874,6 +874,7 @@ export async function startGatewayServer(
|
||||
refreshGatewayHealthSnapshot({
|
||||
...opts,
|
||||
getRuntimeSnapshot,
|
||||
getEventLoopHealth: readinessEventLoopHealth.snapshot,
|
||||
});
|
||||
const createCloseHandler =
|
||||
() => async (opts?: { reason?: string; restartExpectedMs?: number | null }) => {
|
||||
@@ -1221,6 +1222,7 @@ export async function startGatewayServer(
|
||||
findRunningWizard,
|
||||
purgeWizardSession,
|
||||
getRuntimeSnapshot,
|
||||
getEventLoopHealth: readinessEventLoopHealth.snapshot,
|
||||
startChannel,
|
||||
stopChannel,
|
||||
markChannelLoggedOut,
|
||||
|
||||
@@ -57,10 +57,37 @@ describe("refreshGatewayHealthSnapshot", () => {
|
||||
includeSensitive: false,
|
||||
runtimeSnapshot: undefined,
|
||||
});
|
||||
expect(Object.hasOwn(getHealthSnapshotMock.mock.calls[0]?.[0] ?? {}, "eventLoop")).toBe(false);
|
||||
resolveSnapshot?.(createHealthSummary());
|
||||
await expect(Promise.all([first, second])).resolves.toHaveLength(2);
|
||||
});
|
||||
|
||||
it("passes event-loop health only when the hook returns a snapshot", async () => {
|
||||
const healthState = await loadHealthState();
|
||||
const eventLoop = {
|
||||
degraded: true,
|
||||
reasons: ["event_loop_delay" as const],
|
||||
intervalMs: 2_000,
|
||||
delayP99Ms: 1_500,
|
||||
delayMaxMs: 1_700,
|
||||
utilization: 0.2,
|
||||
cpuCoreRatio: 0.1,
|
||||
};
|
||||
|
||||
await healthState.refreshGatewayHealthSnapshot({
|
||||
probe: false,
|
||||
getEventLoopHealth: () => eventLoop,
|
||||
});
|
||||
await healthState.refreshGatewayHealthSnapshot({
|
||||
probe: true,
|
||||
getEventLoopHealth: () => undefined,
|
||||
});
|
||||
|
||||
expect(getHealthSnapshotMock).toHaveBeenCalledTimes(2);
|
||||
expect(getHealthSnapshotMock.mock.calls[0]?.[0]?.eventLoop).toBe(eventLoop);
|
||||
expect(Object.hasOwn(getHealthSnapshotMock.mock.calls[1]?.[0] ?? {}, "eventLoop")).toBe(false);
|
||||
});
|
||||
|
||||
it("captures runtime snapshots for completed refreshes and guards snapshot failures", async () => {
|
||||
const healthState = await loadHealthState();
|
||||
const runtimeSnapshot = {
|
||||
@@ -98,7 +125,9 @@ describe("refreshGatewayHealthSnapshot", () => {
|
||||
const sensitiveSummary = createHealthSummary();
|
||||
const safeSummary = createHealthSummary();
|
||||
const broadcast = vi.fn();
|
||||
getHealthSnapshotMock.mockResolvedValueOnce(sensitiveSummary).mockResolvedValueOnce(safeSummary);
|
||||
getHealthSnapshotMock
|
||||
.mockResolvedValueOnce(sensitiveSummary)
|
||||
.mockResolvedValueOnce(safeSummary);
|
||||
healthState.setBroadcastHealthUpdate(broadcast);
|
||||
const version = healthState.getHealthVersion();
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import { normalizeMainKey } from "../../routing/session-key.js";
|
||||
import { resolveGatewayAuth } from "../auth.js";
|
||||
import type { Snapshot } from "../protocol/index.js";
|
||||
import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js";
|
||||
import type { GatewayEventLoopHealth } from "./event-loop-health.js";
|
||||
|
||||
let presenceVersion = 1;
|
||||
let healthVersion = 1;
|
||||
@@ -76,6 +77,7 @@ export async function refreshGatewayHealthSnapshot(opts?: {
|
||||
probe?: boolean;
|
||||
includeSensitive?: boolean;
|
||||
getRuntimeSnapshot?: () => ChannelRuntimeSnapshot;
|
||||
getEventLoopHealth?: () => GatewayEventLoopHealth | undefined;
|
||||
}) {
|
||||
const includeSensitive = opts?.includeSensitive === true;
|
||||
let refresh = includeSensitive ? sensitiveHealthRefresh : healthRefresh;
|
||||
@@ -87,10 +89,12 @@ export async function refreshGatewayHealthSnapshot(opts?: {
|
||||
} catch {
|
||||
runtimeSnapshot = undefined;
|
||||
}
|
||||
const eventLoop = opts?.getEventLoopHealth?.();
|
||||
const snap = await getHealthSnapshot({
|
||||
probe: opts?.probe,
|
||||
includeSensitive,
|
||||
runtimeSnapshot,
|
||||
...(eventLoop ? { eventLoop } : {}),
|
||||
});
|
||||
if (!includeSensitive) {
|
||||
healthCache = snap;
|
||||
|
||||
@@ -45,6 +45,31 @@ describe("buildTimeoutAbortSignal", () => {
|
||||
cleanup();
|
||||
});
|
||||
|
||||
it("annotates timeout logs when the timer fires late", async () => {
|
||||
vi.setSystemTime(0);
|
||||
const { cleanup } = buildTimeoutAbortSignal({
|
||||
timeoutMs: 25,
|
||||
operation: "unit-test",
|
||||
url: "https://example.com/v1/responses",
|
||||
});
|
||||
|
||||
vi.setSystemTime(2_000);
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
"fetch timeout reached; aborting operation",
|
||||
expect.objectContaining({
|
||||
timerDelayMs: 2000,
|
||||
eventLoopDelayHint: "timer delayed 2000ms, likely event-loop starvation",
|
||||
consoleMessage: expect.stringContaining(
|
||||
"timer delayed 2000ms, likely event-loop starvation",
|
||||
),
|
||||
}),
|
||||
);
|
||||
|
||||
cleanup();
|
||||
});
|
||||
|
||||
it("strips query strings and hashes from relative timeout URL logs", async () => {
|
||||
const { cleanup } = buildTimeoutAbortSignal({
|
||||
timeoutMs: 25,
|
||||
|
||||
@@ -65,9 +65,15 @@ function abortDueToTimeout(
|
||||
}
|
||||
const sanitizedUrl = sanitizeTimeoutLogUrl(url);
|
||||
const elapsedMs = Math.max(0, Date.now() - startedAtMs);
|
||||
const delayMs = Math.max(0, elapsedMs - timeoutMs);
|
||||
const eventLoopDelayHint =
|
||||
delayMs >= Math.max(1000, timeoutMs * 0.5)
|
||||
? `timer delayed ${delayMs}ms, likely event-loop starvation`
|
||||
: null;
|
||||
const consoleMessage = [
|
||||
`fetch timeout after ${timeoutMs}ms`,
|
||||
`(elapsed ${elapsedMs}ms)`,
|
||||
eventLoopDelayHint,
|
||||
operation ? `operation=${operation}` : null,
|
||||
sanitizedUrl ? `url=${sanitizedUrl}` : null,
|
||||
]
|
||||
@@ -76,6 +82,7 @@ function abortDueToTimeout(
|
||||
log.warn("fetch timeout reached; aborting operation", {
|
||||
timeoutMs,
|
||||
elapsedMs,
|
||||
...(eventLoopDelayHint ? { timerDelayMs: delayMs, eventLoopDelayHint } : {}),
|
||||
consoleMessage,
|
||||
...(operation ? { operation } : {}),
|
||||
...(sanitizedUrl ? { url: sanitizedUrl } : {}),
|
||||
|
||||
Reference in New Issue
Block a user