diff --git a/CHANGELOG.md b/CHANGELOG.md index 41dd8731259..950bb1cffac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -238,6 +238,7 @@ Docs: https://docs.openclaw.ai - Lobster/Gateway: memoize repeated Ajv schema compilation before loading the embedded Lobster runtime so scheduled workflows and `llm.invoke` loops stop growing gateway heap on content-identical schemas. Fixes #71148. Thanks @cmi525, @vsolaz, and @vincentkoc. - Codex harness: normalize cached input tokens before session/context accounting so prompt cache reads are not double-counted in `/status`, `session_status`, or persisted `sessionEntry.totalTokens`. Fixes #69298. Thanks @richardmqq. - Hooks/session-memory: use the host local timezone for memory filenames, fallback timestamp slugs, and markdown headers instead of UTC dates. Fixes #46703. (#46721) Thanks @Astro-Han. +- Gateway health: preserve live runtime-backed channel/account state in `gateway.health` snapshots and cached refreshes while keeping raw probe payloads on sensitive/admin paths only. (#39921, #42586, #46527, #52770, #42543) Thanks @FAL1989, @rstar327, @0xble, and @ajayr. - Feishu: extract quoted/replied interactive-card text across schema 1.0, schema 2.0, i18n, template-variable, and post-format fallback shapes without carrying broad generated/config churn from related parser experiments. (#38776, #60383, #42218, #45936) Thanks @lishuaigit, @lskun, @just2gooo, and @Br1an67. - Telegram/agents: hide raw failed write/edit warning messages in Telegram when the assistant already explicitly acknowledges the failed action, while keeping warnings when the reply claims success or omits the failure; #39406 remains the broader configurable delivery-policy follow-up. Fixes #51065; covers #39631. Thanks @Bartok9 and @Bortlesboat. - Exec approvals: accept a symlinked `OPENCLAW_HOME` as the trusted approvals root while still rejecting symlinked `.openclaw` path components below it. (#64663) Thanks @FunJim. diff --git a/src/channels/account-snapshot-fields.test.ts b/src/channels/account-snapshot-fields.test.ts index 938374004a9..9a563d42343 100644 --- a/src/channels/account-snapshot-fields.test.ts +++ b/src/channels/account-snapshot-fields.test.ts @@ -37,12 +37,25 @@ describe("projectSafeChannelAccountSnapshotFields", () => { it("preserves non-secret transport liveness timestamps", () => { const snapshot = projectSafeChannelAccountSnapshotFields({ + connected: true, + lastConnectedAt: 123, lastInboundAt: 123, + lastOutboundAt: 234, + lastMessageAt: null, + lastEventAt: 345, lastTransportActivityAt: 456, + channelAccessToken: "line-token", + channelSecret: "line-secret", // pragma: allowlist secret + probe: { ok: true, token: "probe-secret" }, }); expect(snapshot).toEqual({ + connected: true, + lastConnectedAt: 123, lastInboundAt: 123, + lastOutboundAt: 234, + lastMessageAt: null, + lastEventAt: 345, lastTransportActivityAt: 456, }); }); diff --git a/src/channels/account-snapshot-fields.ts b/src/channels/account-snapshot-fields.ts index d1100cad376..98e0ec402c6 100644 --- a/src/channels/account-snapshot-fields.ts +++ b/src/channels/account-snapshot-fields.ts @@ -26,6 +26,16 @@ function readNumber(record: Record, key: string): number | unde return typeof value === "number" && Number.isFinite(value) ? value : undefined; } +function readNullableNumber( + record: Record, + key: string, +): number | null | undefined { + if (record[key] === null) { + return null; + } + return readNumber(record, key); +} + function readStringArray(record: Record, key: string): string[] | undefined { const value = record[key]; if (!Array.isArray(value)) { @@ -162,6 +172,7 @@ export function projectSafeChannelAccountSnapshotFields( return {}; } const name = normalizeOptionalString(record.name); + const statusState = normalizeOptionalString(record.statusState); const healthState = normalizeOptionalString(record.healthState); const mode = normalizeOptionalString(record.mode); const dmPolicy = normalizeOptionalString(record.dmPolicy); @@ -180,16 +191,39 @@ export function projectSafeChannelAccountSnapshotFields( ...(readBoolean(record, "connected") !== undefined ? { connected: readBoolean(record, "connected") } : {}), + ...(readBoolean(record, "restartPending") !== undefined + ? { restartPending: readBoolean(record, "restartPending") } + : {}), ...(readNumber(record, "reconnectAttempts") !== undefined ? { reconnectAttempts: readNumber(record, "reconnectAttempts") } : {}), + ...(readNullableNumber(record, "lastConnectedAt") !== undefined + ? { lastConnectedAt: readNullableNumber(record, "lastConnectedAt") } + : {}), ...(readNumber(record, "lastInboundAt") !== undefined ? { lastInboundAt: readNumber(record, "lastInboundAt") } : {}), + ...(readNullableNumber(record, "lastOutboundAt") !== undefined + ? { lastOutboundAt: readNullableNumber(record, "lastOutboundAt") } + : {}), + ...(readNullableNumber(record, "lastMessageAt") !== undefined + ? { lastMessageAt: readNullableNumber(record, "lastMessageAt") } + : {}), + ...(readNullableNumber(record, "lastEventAt") !== undefined + ? { lastEventAt: readNullableNumber(record, "lastEventAt") } + : {}), ...(readNumber(record, "lastTransportActivityAt") !== undefined ? { lastTransportActivityAt: readNumber(record, "lastTransportActivityAt") } : {}), + ...(statusState ? { statusState } : {}), ...(healthState ? { healthState } : {}), + ...(readBoolean(record, "busy") !== undefined ? { busy: readBoolean(record, "busy") } : {}), + ...(readNumber(record, "activeRuns") !== undefined + ? { activeRuns: readNumber(record, "activeRuns") } + : {}), + ...(readNullableNumber(record, "lastRunActivityAt") !== undefined + ? { lastRunActivityAt: readNullableNumber(record, "lastRunActivityAt") } + : {}), ...(mode ? { mode } : {}), ...(dmPolicy ? { dmPolicy } : {}), ...(readStringArray(record, "allowFrom") diff --git a/src/commands/health.snapshot.test.ts b/src/commands/health.snapshot.test.ts index d9ff11c4aa3..cbb28559e52 100644 --- a/src/commands/health.snapshot.test.ts +++ b/src/commands/health.snapshot.test.ts @@ -13,6 +13,10 @@ let setActivePluginRegistry: typeof import("../plugins/runtime.js").setActivePlu let createChannelTestPluginBase: typeof import("../test-utils/channel-plugins.js").createChannelTestPluginBase; let createTestRegistry: typeof import("../test-utils/channel-plugins.js").createTestRegistry; let getHealthSnapshot: typeof import("./health.js").getHealthSnapshot; +let buildTelegramHealthSummaryForTest = buildTelegramHealthSummary; +let probeTelegramAccountForTestOverride: + | ((account: TelegramHealthAccount, timeoutMs: number) => Promise>) + | undefined; type TelegramHealthAccount = { accountId: string; @@ -289,9 +293,12 @@ function createTelegramHealthPlugin(): Pick< isConfigured: (account) => Boolean((account as TelegramHealthAccount).token.trim()), }, status: { - buildChannelSummary: ({ snapshot }) => buildTelegramHealthSummary(snapshot), + buildChannelSummary: ({ snapshot }) => buildTelegramHealthSummaryForTest(snapshot), probeAccount: async ({ account, timeoutMs }) => - await probeTelegramAccountForTest(account as TelegramHealthAccount, timeoutMs), + await (probeTelegramAccountForTestOverride ?? probeTelegramAccountForTest)( + account as TelegramHealthAccount, + timeoutMs, + ), }, }; } @@ -307,6 +314,8 @@ describe("getHealthSnapshot", () => { }); beforeEach(() => { + buildTelegramHealthSummaryForTest = buildTelegramHealthSummary; + probeTelegramAccountForTestOverride = undefined; setActivePluginRegistry( createTestRegistry([ { pluginId: "telegram", plugin: createTelegramHealthPlugin(), source: "test" }, @@ -421,6 +430,116 @@ describe("getHealthSnapshot", () => { } }); + it("preserves runtime state and probe payloads when plugin summaries omit them", async () => { + testConfig = { channels: { telegram: { botToken: "t-1" } } }; + testStore = {}; + vi.stubEnv("DISCORD_BOT_TOKEN", ""); + buildTelegramHealthSummaryForTest = (snapshot) => ({ + accountId: snapshot.accountId, + configured: Boolean(snapshot.configured), + }); + probeTelegramAccountForTestOverride = async () => ({ + ok: true, + bot: { username: "runtime_bot" }, + }); + + const snap = await getHealthSnapshot({ + timeoutMs: 25, + runtimeSnapshot: { + channels: { + telegram: { + accountId: "default", + connected: true, + lastConnectedAt: 123, + }, + }, + channelAccounts: {}, + }, + }); + const telegram = snap.channels.telegram as { + connected?: boolean; + lastConnectedAt?: number; + probe?: { ok?: boolean; bot?: { username?: string } }; + accounts?: Record< + string, + { + connected?: boolean; + lastConnectedAt?: number; + probe?: { ok?: boolean; bot?: { username?: string } }; + } + >; + }; + + expect(telegram.connected).toBe(true); + expect(telegram.lastConnectedAt).toBe(123); + expect(telegram.probe?.bot?.username).toBe("runtime_bot"); + expect(telegram.accounts?.default?.connected).toBe(true); + expect(telegram.accounts?.default?.probe?.ok).toBe(true); + }); + + it("omits secret runtime fields and raw probe payloads from non-sensitive health snapshots", async () => { + testConfig = { channels: { telegram: { botToken: "t-1" } } }; + testStore = {}; + vi.stubEnv("DISCORD_BOT_TOKEN", ""); + buildTelegramHealthSummaryForTest = (snapshot) => ({ + accountId: snapshot.accountId, + configured: Boolean(snapshot.configured), + probe: { ok: true, token: "summary-secret" }, + }); + probeTelegramAccountForTestOverride = async () => ({ + ok: true, + bot: { username: "runtime_bot" }, + token: "probe-secret", + }); + + const snap = await getHealthSnapshot({ + timeoutMs: 25, + includeSensitive: false, + runtimeSnapshot: { + channels: { + telegram: { + accountId: "default", + connected: true, + lastConnectedAt: 123, + channelAccessToken: "line-token", + channelSecret: "line-secret", // pragma: allowlist secret + webhookUrl: "https://example.test/hook?secret=1", + }, + }, + channelAccounts: {}, + }, + }); + const telegram = snap.channels.telegram as { + connected?: boolean; + lastConnectedAt?: number; + probe?: unknown; + channelAccessToken?: string; + channelSecret?: string; + webhookUrl?: string; + accounts?: Record< + string, + { + connected?: boolean; + lastConnectedAt?: number; + probe?: unknown; + channelAccessToken?: string; + channelSecret?: string; + webhookUrl?: string; + } + >; + }; + + expect(telegram.connected).toBe(true); + expect(telegram.lastConnectedAt).toBe(123); + expect(telegram.probe).toBeUndefined(); + expect(telegram.channelAccessToken).toBeUndefined(); + expect(telegram.channelSecret).toBeUndefined(); + expect(telegram.webhookUrl).toBeUndefined(); + expect(telegram.accounts?.default?.connected).toBe(true); + expect(telegram.accounts?.default?.probe).toBeUndefined(); + expect(telegram.accounts?.default?.channelAccessToken).toBeUndefined(); + }); + it("returns structured telegram probe errors", async () => { testConfig = { channels: { telegram: { botToken: "bad-token" } } }; testStore = {}; diff --git a/src/commands/health.ts b/src/commands/health.ts index 9ff7b40a4ad..5f5c01db314 100644 --- a/src/commands/health.ts +++ b/src/commands/health.ts @@ -1,4 +1,5 @@ import { resolveDefaultAgentId } from "../agents/agent-scope.js"; +import { projectSafeChannelAccountSnapshotFields } from "../channels/account-snapshot-fields.js"; import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js"; import { listReadOnlyChannelPluginsForConfig } from "../channels/plugins/read-only.js"; import type { ChannelPlugin } from "../channels/plugins/types.plugin.js"; @@ -9,6 +10,7 @@ import { getRuntimeConfig } from "../config/config.js"; import { resolveStorePath } from "../config/sessions/paths.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { buildGatewayConnectionDetails, callGateway } from "../gateway/call.js"; +import type { ChannelRuntimeSnapshot } from "../gateway/server-channel-runtime.types.js"; import { info } from "../globals.js"; import { isTruthyEnvValue } from "../infra/env.js"; import { formatErrorMessage } from "../infra/errors.js"; @@ -255,6 +257,8 @@ async function resolveHealthAccountContext(params: { export async function getHealthSnapshot(params?: { timeoutMs?: number; probe?: boolean; + includeSensitive?: boolean; + runtimeSnapshot?: ChannelRuntimeSnapshot; }): Promise { const timeoutMs = params?.timeoutMs; const cfg = getRuntimeConfig(); @@ -285,6 +289,7 @@ export async function getHealthSnapshot(params?: { const start = Date.now(); const cappedTimeout = timeoutMs === undefined ? DEFAULT_TIMEOUT_MS : Math.max(50, timeoutMs); const doProbe = params?.probe !== false; + const includeSensitive = params?.includeSensitive !== false; const channels: Record = {}; const plugins = listReadOnlyChannelPluginsForConfig(cfg, { includeSetupRuntimeFallback: false, @@ -362,12 +367,16 @@ export async function getHealthSnapshot(params?: { debugHealth("probe.bot", { channel: plugin.id, accountId, username: bot.username }); } + const runtimeSnapshot = + params?.runtimeSnapshot?.channelAccounts[plugin.id]?.[accountId] ?? + (accountId === defaultAccountId ? params?.runtimeSnapshot?.channels[plugin.id] : undefined); const snapshot: ChannelAccountSnapshot = { + ...projectSafeChannelAccountSnapshotFields(runtimeSnapshot), accountId, enabled, configured, }; - if (probe !== undefined) { + if (includeSensitive && probe !== undefined) { snapshot.probe = probe; } if (lastProbeAt) { @@ -384,16 +393,21 @@ export async function getHealthSnapshot(params?: { : undefined; const record = summary && typeof summary === "object" - ? (summary as ChannelAccountHealthSummary) + ? ({ ...snapshot, ...summary } as ChannelAccountHealthSummary) : ({ + ...snapshot, accountId, configured, - probe, - lastProbeAt, } satisfies ChannelAccountHealthSummary); if (record.configured === undefined) { record.configured = configured; } + if (includeSensitive && record.probe === undefined && probe !== undefined) { + record.probe = probe; + } + if (!includeSensitive) { + delete record.probe; + } if (record.lastProbeAt === undefined && lastProbeAt) { record.lastProbeAt = lastProbeAt; } diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index ce4fe1d7956..c7fd4bcfd37 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -26,7 +26,10 @@ export function startGatewayMaintenanceTimers(params: { nodeSendToAllSubscribed: (event: string, payload: unknown) => void; getPresenceVersion: () => number; getHealthVersion: () => number; - refreshGatewayHealthSnapshot: (opts?: { probe?: boolean }) => Promise; + refreshGatewayHealthSnapshot: (opts?: { + probe?: boolean; + includeSensitive?: boolean; + }) => Promise; logHealth: { error: (msg: string) => void }; dedupe: Map; chatAbortControllers: Map; diff --git a/src/gateway/server-methods/health.ts b/src/gateway/server-methods/health.ts index f89030a14c6..b7e8d55f701 100644 --- a/src/gateway/server-methods/health.ts +++ b/src/gateway/server-methods/health.ts @@ -8,20 +8,22 @@ import type { GatewayRequestHandlers } from "./types.js"; const ADMIN_SCOPE = "operator.admin"; export const healthHandlers: GatewayRequestHandlers = { - health: async ({ respond, context, params }) => { + health: async ({ respond, context, params, client }) => { const { getHealthCache, refreshHealthSnapshot, logHealth } = context; const wantsProbe = params?.probe === true; + const scopes = Array.isArray(client?.connect?.scopes) ? client.connect.scopes : []; + const includeSensitive = scopes.includes(ADMIN_SCOPE); const now = Date.now(); const cached = getHealthCache(); if (!wantsProbe && cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) { respond(true, cached, undefined, { cached: true }); - void refreshHealthSnapshot({ probe: false }).catch((err) => + void refreshHealthSnapshot({ probe: false, includeSensitive }).catch((err) => logHealth.error(`background health refresh failed: ${formatError(err)}`), ); return; } try { - const snap = await refreshHealthSnapshot({ probe: wantsProbe }); + const snap = await refreshHealthSnapshot({ probe: wantsProbe, includeSensitive }); respond(true, snap, undefined); } catch (err) { respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); diff --git a/src/gateway/server-methods/shared-types.ts b/src/gateway/server-methods/shared-types.ts index 2196d6c47e5..f15b0bd28dc 100644 --- a/src/gateway/server-methods/shared-types.ts +++ b/src/gateway/server-methods/shared-types.ts @@ -46,7 +46,10 @@ export type GatewayRequestContext = { pluginApprovalManager?: ExecApprovalManager; loadGatewayModelCatalog: () => Promise; getHealthCache: () => HealthSummary | null; - refreshHealthSnapshot: (opts?: { probe?: boolean }) => Promise; + refreshHealthSnapshot: (opts?: { + probe?: boolean; + includeSensitive?: boolean; + }) => Promise; logHealth: { error: (message: string) => void }; logGateway: SubsystemLogger; incrementPresenceVersion: () => number; diff --git a/src/gateway/server-node-events-types.ts b/src/gateway/server-node-events-types.ts index 8be460be957..bacc3c14239 100644 --- a/src/gateway/server-node-events-types.ts +++ b/src/gateway/server-node-events-types.ts @@ -25,7 +25,10 @@ export type NodeEventContext = { dedupe: Map; agentRunSeq: Map; getHealthCache: () => HealthSummary | null; - refreshHealthSnapshot: (opts?: { probe?: boolean }) => Promise; + refreshHealthSnapshot: (opts?: { + probe?: boolean; + includeSensitive?: boolean; + }) => Promise; loadGatewayModelCatalog: () => Promise; logGateway: { warn: (msg: string) => void }; }; diff --git a/src/gateway/server-ws-runtime.ts b/src/gateway/server-ws-runtime.ts index 2f976a9dfb8..92478e8469b 100644 --- a/src/gateway/server-ws-runtime.ts +++ b/src/gateway/server-ws-runtime.ts @@ -5,7 +5,7 @@ import { type GatewayWsSharedHandlerParams, } from "./server/ws-connection.js"; -type GatewayWsRuntimeParams = GatewayWsSharedHandlerParams & { +type GatewayWsRuntimeParams = Omit & { logGateway: ReturnType; logHealth: ReturnType; logWsControl: ReturnType; @@ -37,6 +37,7 @@ export function attachGatewayWsHandlers(params: GatewayWsRuntimeParams) { browserRateLimiter: params.browserRateLimiter, gatewayMethods: params.gatewayMethods, events: params.events, + refreshHealthSnapshot: params.context.refreshHealthSnapshot, logGateway: params.logGateway, logHealth: params.logHealth, logWsControl: params.logWsControl, diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index f26855d4371..29cebd5f17e 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -667,6 +667,11 @@ export async function startGatewayServer( }; const { getRuntimeSnapshot, startChannels, startChannel, stopChannel, markChannelLoggedOut } = channelManager; + const refreshGatewayHealthSnapshotWithRuntime: typeof refreshGatewayHealthSnapshot = (opts) => + refreshGatewayHealthSnapshot({ + ...opts, + getRuntimeSnapshot, + }); const createCloseHandler = () => createGatewayCloseHandler({ bonjourStop: runtimeState.bonjourStop, @@ -721,7 +726,7 @@ export async function startGatewayServer( nodeSendToAllSubscribed, getPresenceVersion, getHealthVersion, - refreshGatewayHealthSnapshot, + refreshGatewayHealthSnapshot: refreshGatewayHealthSnapshotWithRuntime, logHealth, dedupe, chatAbortControllers, @@ -804,7 +809,7 @@ export async function startGatewayServer( pluginApprovalManager, loadGatewayModelCatalog, getHealthCache, - refreshHealthSnapshot: refreshGatewayHealthSnapshot, + refreshHealthSnapshot: refreshGatewayHealthSnapshotWithRuntime, logHealth, logGateway: log, incrementPresenceVersion, diff --git a/src/gateway/server/health-state.test.ts b/src/gateway/server/health-state.test.ts new file mode 100644 index 00000000000..c4a63b55c23 --- /dev/null +++ b/src/gateway/server/health-state.test.ts @@ -0,0 +1,155 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { HealthSummary } from "../../commands/health.js"; + +const getHealthSnapshotMock = vi.hoisted(() => vi.fn()); + +vi.mock("../../commands/health.js", () => ({ + getHealthSnapshot: getHealthSnapshotMock, +})); + +function createHealthSummary(): HealthSummary { + return { + ok: true, + ts: Date.now(), + durationMs: 1, + channels: {}, + channelOrder: [], + channelLabels: {}, + heartbeatSeconds: 0, + defaultAgentId: "main", + agents: [], + sessions: { + path: "/tmp/sessions.json", + count: 0, + recent: [], + }, + }; +} + +async function loadHealthState() { + vi.resetModules(); + getHealthSnapshotMock.mockReset(); + getHealthSnapshotMock.mockResolvedValue(createHealthSummary()); + return await import("./health-state.js"); +} + +describe("refreshGatewayHealthSnapshot", () => { + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it("keeps refreshes coalesced while preserving the first probe intent", async () => { + const healthState = await loadHealthState(); + let resolveSnapshot: ((summary: HealthSummary) => void) | undefined; + getHealthSnapshotMock.mockImplementation( + () => + new Promise((resolve) => { + resolveSnapshot = resolve; + }), + ); + + const first = healthState.refreshGatewayHealthSnapshot({ probe: false }); + const second = healthState.refreshGatewayHealthSnapshot({ probe: true }); + + expect(getHealthSnapshotMock).toHaveBeenCalledTimes(1); + expect(getHealthSnapshotMock).toHaveBeenCalledWith({ + probe: false, + includeSensitive: false, + runtimeSnapshot: undefined, + }); + resolveSnapshot?.(createHealthSummary()); + await expect(Promise.all([first, second])).resolves.toHaveLength(2); + }); + + it("captures runtime snapshots for completed refreshes and guards snapshot failures", async () => { + const healthState = await loadHealthState(); + const runtimeSnapshot = { + channels: { discord: { accountId: "default", connected: true } }, + channelAccounts: {}, + }; + + await healthState.refreshGatewayHealthSnapshot({ + probe: false, + getRuntimeSnapshot: () => runtimeSnapshot, + }); + await healthState.refreshGatewayHealthSnapshot({ + probe: true, + getRuntimeSnapshot: () => { + throw new Error("bad channel config"); + }, + }); + + expect(getHealthSnapshotMock).toHaveBeenCalledTimes(2); + expect( + getHealthSnapshotMock.mock.calls + .map((call) => call[0]?.probe) + .toSorted((a, b) => Number(a) - Number(b)), + ).toEqual([false, true]); + expect(getHealthSnapshotMock.mock.calls.map((call) => call[0]?.includeSensitive)).toEqual([ + false, + false, + ]); + expect(getHealthSnapshotMock.mock.calls[0]?.[0]?.runtimeSnapshot).toBe(runtimeSnapshot); + expect(getHealthSnapshotMock.mock.calls[1]?.[0]?.runtimeSnapshot).toBeUndefined(); + }); + + it("does not cache or broadcast sensitive health refreshes", async () => { + const healthState = await loadHealthState(); + const sensitiveSummary = createHealthSummary(); + const safeSummary = createHealthSummary(); + const broadcast = vi.fn(); + getHealthSnapshotMock.mockResolvedValueOnce(sensitiveSummary).mockResolvedValueOnce(safeSummary); + healthState.setBroadcastHealthUpdate(broadcast); + const version = healthState.getHealthVersion(); + + await healthState.refreshGatewayHealthSnapshot({ probe: true, includeSensitive: true }); + + expect(healthState.getHealthCache()).toBeNull(); + expect(healthState.getHealthVersion()).toBe(version); + expect(broadcast).not.toHaveBeenCalled(); + + await healthState.refreshGatewayHealthSnapshot({ probe: false }); + + expect(healthState.getHealthCache()).toBe(safeSummary); + expect(healthState.getHealthVersion()).toBe(version + 1); + expect(broadcast).toHaveBeenCalledWith(safeSummary); + }); + + it("keeps sensitive and public refreshes on separate in-flight promises", async () => { + const healthState = await loadHealthState(); + const sensitiveSummary = createHealthSummary(); + const safeSummary = createHealthSummary(); + let resolveSensitive: (() => void) | undefined; + let resolveSafe: (() => void) | undefined; + getHealthSnapshotMock + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveSensitive = () => resolve(sensitiveSummary); + }), + ) + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveSafe = () => resolve(safeSummary); + }), + ); + + const sensitive = healthState.refreshGatewayHealthSnapshot({ + probe: true, + includeSensitive: true, + }); + const safe = healthState.refreshGatewayHealthSnapshot({ probe: false }); + + expect(getHealthSnapshotMock).toHaveBeenCalledTimes(2); + expect(getHealthSnapshotMock.mock.calls[0]?.[0]?.includeSensitive).toBe(true); + expect(getHealthSnapshotMock.mock.calls[1]?.[0]?.includeSensitive).toBe(false); + + resolveSensitive?.(); + resolveSafe?.(); + + await expect(sensitive).resolves.toBe(sensitiveSummary); + await expect(safe).resolves.toBe(safeSummary); + expect(healthState.getHealthCache()).toBe(safeSummary); + }); +}); diff --git a/src/gateway/server/health-state.ts b/src/gateway/server/health-state.ts index 76cc977fb1e..30891e17a18 100644 --- a/src/gateway/server/health-state.ts +++ b/src/gateway/server/health-state.ts @@ -7,11 +7,13 @@ import { getUpdateAvailable } from "../../infra/update-startup.js"; import { normalizeMainKey } from "../../routing/session-key.js"; import { resolveGatewayAuth } from "../auth.js"; import type { Snapshot } from "../protocol/index.js"; +import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js"; let presenceVersion = 1; let healthVersion = 1; let healthCache: HealthSummary | null = null; let healthRefresh: Promise | null = null; +let sensitiveHealthRefresh: Promise | null = null; let broadcastHealthUpdate: ((snap: HealthSummary) => void) | null = null; export function buildGatewaySnapshot(opts?: { includeSensitive?: boolean }): Snapshot { @@ -69,19 +71,46 @@ export function setBroadcastHealthUpdate(fn: ((snap: HealthSummary) => void) | n broadcastHealthUpdate = fn; } -export async function refreshGatewayHealthSnapshot(opts?: { probe?: boolean }) { - if (!healthRefresh) { - healthRefresh = (async () => { - const snap = await getHealthSnapshot({ probe: opts?.probe }); - healthCache = snap; - healthVersion += 1; - if (broadcastHealthUpdate) { - broadcastHealthUpdate(snap); +export async function refreshGatewayHealthSnapshot(opts?: { + probe?: boolean; + includeSensitive?: boolean; + getRuntimeSnapshot?: () => ChannelRuntimeSnapshot; +}) { + const includeSensitive = opts?.includeSensitive === true; + let refresh = includeSensitive ? sensitiveHealthRefresh : healthRefresh; + if (!refresh) { + refresh = (async () => { + let runtimeSnapshot: ChannelRuntimeSnapshot | undefined; + try { + runtimeSnapshot = opts?.getRuntimeSnapshot?.(); + } catch { + runtimeSnapshot = undefined; + } + const snap = await getHealthSnapshot({ + probe: opts?.probe, + includeSensitive, + runtimeSnapshot, + }); + if (!includeSensitive) { + healthCache = snap; + healthVersion += 1; + if (broadcastHealthUpdate) { + broadcastHealthUpdate(snap); + } } return snap; })().finally(() => { - healthRefresh = null; + if (includeSensitive) { + sensitiveHealthRefresh = null; + } else { + healthRefresh = null; + } }); + if (includeSensitive) { + sensitiveHealthRefresh = refresh; + } else { + healthRefresh = refresh; + } } - return healthRefresh; + return refresh; } diff --git a/src/gateway/server/ws-connection.test.ts b/src/gateway/server/ws-connection.test.ts index 628edf3b71b..941f0302e36 100644 --- a/src/gateway/server/ws-connection.test.ts +++ b/src/gateway/server/ws-connection.test.ts @@ -70,6 +70,7 @@ describe("attachGatewayWsConnectionHandler", () => { getResolvedAuth: () => currentAuth, gatewayMethods: [], events: [], + refreshHealthSnapshot: vi.fn(async () => ({}) as never), logGateway: createLogger() as never, logHealth: createLogger() as never, logWsControl: createLogger() as never, @@ -133,6 +134,7 @@ describe("attachGatewayWsConnectionHandler", () => { resolvedAuth: createResolvedAuth("token"), gatewayMethods: [], events: [], + refreshHealthSnapshot: vi.fn(), logGateway: createLogger() as never, logHealth: createLogger() as never, logWsControl: createLogger() as never, diff --git a/src/gateway/server/ws-connection.ts b/src/gateway/server/ws-connection.ts index 59c76338f64..a1b8b8fafc6 100644 --- a/src/gateway/server/ws-connection.ts +++ b/src/gateway/server/ws-connection.ts @@ -133,6 +133,7 @@ export type GatewayWsSharedHandlerParams = { browserRateLimiter?: AuthRateLimiter; gatewayMethods: string[]; events: string[]; + refreshHealthSnapshot: GatewayRequestContext["refreshHealthSnapshot"]; }; export type AttachGatewayWsConnectionHandlerParams = GatewayWsSharedHandlerParams & { @@ -168,6 +169,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti browserRateLimiter, gatewayMethods, events, + refreshHealthSnapshot, logGateway, logHealth, logWsControl, @@ -402,6 +404,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti events, extraHandlers, buildRequestContext, + refreshHealthSnapshot, send, close, isClosed: () => closed, diff --git a/src/gateway/server/ws-connection/message-handler.post-connect-health.test.ts b/src/gateway/server/ws-connection/message-handler.post-connect-health.test.ts new file mode 100644 index 00000000000..25735fc61a7 --- /dev/null +++ b/src/gateway/server/ws-connection/message-handler.post-connect-health.test.ts @@ -0,0 +1,178 @@ +import type { IncomingMessage } from "node:http"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { WebSocket } from "ws"; +import type { ResolvedGatewayAuth } from "../../auth.js"; +import { PROTOCOL_VERSION } from "../../protocol/index.js"; +import type { GatewayRequestContext } from "../../server-methods/types.js"; + +const { + buildGatewaySnapshotMock, + getHealthCacheMock, + getHealthVersionMock, + incrementPresenceVersionMock, + loadConfigMock, + upsertPresenceMock, +} = vi.hoisted(() => ({ + buildGatewaySnapshotMock: vi.fn(() => ({ + presence: [], + health: {}, + stateVersion: { presence: 1, health: 1 }, + uptimeMs: 1, + sessionDefaults: { + defaultAgentId: "main", + mainKey: "main", + mainSessionKey: "main", + scope: "per-sender", + }, + })), + getHealthCacheMock: vi.fn(() => null), + getHealthVersionMock: vi.fn(() => 1), + incrementPresenceVersionMock: vi.fn(() => 2), + loadConfigMock: vi.fn(() => ({ + gateway: { + auth: { mode: "none" }, + controlUi: { + allowedOrigins: ["http://127.0.0.1:19001"], + dangerouslyDisableDeviceAuth: true, + }, + }, + })), + upsertPresenceMock: vi.fn(), +})); + +vi.mock("../../../config/config.js", () => ({ + getRuntimeConfig: loadConfigMock, + loadConfig: loadConfigMock, +})); + +vi.mock("../../../infra/system-presence.js", () => ({ + upsertPresence: upsertPresenceMock, +})); + +vi.mock("../../server-methods.js", () => ({ + handleGatewayRequest: vi.fn(), +})); + +vi.mock("../health-state.js", () => ({ + buildGatewaySnapshot: buildGatewaySnapshotMock, + getHealthCache: getHealthCacheMock, + getHealthVersion: getHealthVersionMock, + incrementPresenceVersion: incrementPresenceVersionMock, +})); + +import { attachGatewayWsMessageHandler } from "./message-handler.js"; + +function createLogger() { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; +} + +describe("attachGatewayWsMessageHandler post-connect health refresh", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("uses the injected runtime-aware health refresh after hello", async () => { + let resolveRefresh: (() => void) | undefined; + const refreshHealthSnapshot = vi.fn( + () => + new Promise((resolve) => { + resolveRefresh = () => resolve({} as never); + }), + ) as GatewayRequestContext["refreshHealthSnapshot"]; + const socketSend = vi.fn((_payload: string, cb?: (err?: Error) => void) => { + cb?.(); + }); + let onMessage: ((data: string) => void) | undefined; + const socket = { + _receiver: {}, + send: socketSend, + on: vi.fn((event: string, handler: (data: string) => void) => { + if (event === "message") { + onMessage = handler; + } + return socket; + }), + } as unknown as WebSocket; + const send = vi.fn(); + const isClosed = vi.fn(() => false); + let client: unknown = null; + const resolvedAuth: ResolvedGatewayAuth = { + mode: "none", + allowTailscale: false, + }; + + attachGatewayWsMessageHandler({ + socket, + upgradeReq: { + headers: { host: "127.0.0.1:19001", origin: "http://127.0.0.1:19001" }, + socket: { localAddress: "127.0.0.1", remoteAddress: "127.0.0.1" }, + } as unknown as IncomingMessage, + connId: "conn-1", + remoteAddr: "127.0.0.1", + localAddr: "127.0.0.1", + requestHost: "127.0.0.1:19001", + requestOrigin: "http://127.0.0.1:19001", + connectNonce: "nonce-1", + getResolvedAuth: () => resolvedAuth, + gatewayMethods: [], + events: [], + extraHandlers: {}, + buildRequestContext: () => ({}) as GatewayRequestContext, + refreshHealthSnapshot, + send, + close: vi.fn(), + isClosed, + clearHandshakeTimer: vi.fn(), + getClient: () => client as never, + setClient: (next) => { + client = next; + return true; + }, + setHandshakeState: vi.fn(), + setCloseCause: vi.fn(), + setLastFrameMeta: vi.fn(), + originCheckMetrics: { hostHeaderFallbackAccepted: 0 }, + logGateway: createLogger() as never, + logHealth: createLogger() as never, + logWsControl: createLogger() as never, + }); + + expect(onMessage).toBeDefined(); + + onMessage?.( + JSON.stringify({ + type: "req", + id: "connect-1", + method: "connect", + params: { + minProtocol: PROTOCOL_VERSION, + maxProtocol: PROTOCOL_VERSION, + client: { + id: "openclaw-control-ui", + version: "dev", + platform: "test", + mode: "ui", + }, + role: "operator", + caps: [], + }, + }), + ); + + await vi.waitFor(() => { + expect(socketSend).toHaveBeenCalled(); + }); + const hello = JSON.parse(socketSend.mock.calls[0]?.[0] ?? "{}") as { ok?: boolean }; + expect(hello.ok).toBe(true); + + await vi.waitFor(() => { + expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: true }); + }); + resolveRefresh?.(); + }); +}); diff --git a/src/gateway/server/ws-connection/message-handler.ts b/src/gateway/server/ws-connection/message-handler.ts index deb2144a4b2..f5d85d7b982 100644 --- a/src/gateway/server/ws-connection/message-handler.ts +++ b/src/gateway/server/ws-connection/message-handler.ts @@ -112,7 +112,6 @@ import { getHealthCache, getHealthVersion, incrementPresenceVersion, - refreshGatewayHealthSnapshot, } from "../health-state.js"; import { resolveSharedGatewaySessionGeneration } from "../ws-shared-generation.js"; import type { GatewayWsClient } from "../ws-types.js"; @@ -196,6 +195,7 @@ export function attachGatewayWsMessageHandler(params: { events: string[]; extraHandlers: GatewayRequestHandlers; buildRequestContext: () => GatewayRequestContext; + refreshHealthSnapshot: GatewayRequestContext["refreshHealthSnapshot"]; send: (obj: unknown) => void; close: (code?: number, reason?: string) => void; isClosed: () => boolean; @@ -234,6 +234,7 @@ export function attachGatewayWsMessageHandler(params: { events, extraHandlers, buildRequestContext, + refreshHealthSnapshot, send, close, isClosed, @@ -1491,7 +1492,7 @@ export function attachGatewayWsMessageHandler(params: { presence: snapshot.presence.length, stateVersion: snapshot.stateVersion.presence, }); - void refreshGatewayHealthSnapshot({ probe: true }).catch((err) => + void refreshHealthSnapshot({ probe: true }).catch((err) => logHealth.error(`post-connect health refresh failed: ${formatError(err)}`), ); return;