mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:20:43 +00:00
fix(gateway): preserve runtime-backed health state (#72417)
* fix(gateway): preserve runtime-backed health state * fix(clownfish): address review for ghcrawl-207035-agentic-merge (1) * fix(gateway): harden health snapshot exposure
This commit is contained in:
@@ -238,6 +238,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Lobster/Gateway: memoize repeated Ajv schema compilation before loading the embedded Lobster runtime so scheduled workflows and `llm.invoke` loops stop growing gateway heap on content-identical schemas. Fixes #71148. Thanks @cmi525, @vsolaz, and @vincentkoc.
|
||||
- Codex harness: normalize cached input tokens before session/context accounting so prompt cache reads are not double-counted in `/status`, `session_status`, or persisted `sessionEntry.totalTokens`. Fixes #69298. Thanks @richardmqq.
|
||||
- Hooks/session-memory: use the host local timezone for memory filenames, fallback timestamp slugs, and markdown headers instead of UTC dates. Fixes #46703. (#46721) Thanks @Astro-Han.
|
||||
- Gateway health: preserve live runtime-backed channel/account state in `gateway.health` snapshots and cached refreshes while keeping raw probe payloads on sensitive/admin paths only. (#39921, #42586, #46527, #52770, #42543) Thanks @FAL1989, @rstar327, @0xble, and @ajayr.
|
||||
- Feishu: extract quoted/replied interactive-card text across schema 1.0, schema 2.0, i18n, template-variable, and post-format fallback shapes without carrying broad generated/config churn from related parser experiments. (#38776, #60383, #42218, #45936) Thanks @lishuaigit, @lskun, @just2gooo, and @Br1an67.
|
||||
- Telegram/agents: hide raw failed write/edit warning messages in Telegram when the assistant already explicitly acknowledges the failed action, while keeping warnings when the reply claims success or omits the failure; #39406 remains the broader configurable delivery-policy follow-up. Fixes #51065; covers #39631. Thanks @Bartok9 and @Bortlesboat.
|
||||
- Exec approvals: accept a symlinked `OPENCLAW_HOME` as the trusted approvals root while still rejecting symlinked `.openclaw` path components below it. (#64663) Thanks @FunJim.
|
||||
|
||||
@@ -37,12 +37,25 @@ describe("projectSafeChannelAccountSnapshotFields", () => {
|
||||
|
||||
it("preserves non-secret transport liveness timestamps", () => {
|
||||
const snapshot = projectSafeChannelAccountSnapshotFields({
|
||||
connected: true,
|
||||
lastConnectedAt: 123,
|
||||
lastInboundAt: 123,
|
||||
lastOutboundAt: 234,
|
||||
lastMessageAt: null,
|
||||
lastEventAt: 345,
|
||||
lastTransportActivityAt: 456,
|
||||
channelAccessToken: "line-token",
|
||||
channelSecret: "line-secret", // pragma: allowlist secret
|
||||
probe: { ok: true, token: "probe-secret" },
|
||||
});
|
||||
|
||||
expect(snapshot).toEqual({
|
||||
connected: true,
|
||||
lastConnectedAt: 123,
|
||||
lastInboundAt: 123,
|
||||
lastOutboundAt: 234,
|
||||
lastMessageAt: null,
|
||||
lastEventAt: 345,
|
||||
lastTransportActivityAt: 456,
|
||||
});
|
||||
});
|
||||
|
||||
@@ -26,6 +26,16 @@ function readNumber(record: Record<string, unknown>, key: string): number | unde
|
||||
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
|
||||
}
|
||||
|
||||
function readNullableNumber(
|
||||
record: Record<string, unknown>,
|
||||
key: string,
|
||||
): number | null | undefined {
|
||||
if (record[key] === null) {
|
||||
return null;
|
||||
}
|
||||
return readNumber(record, key);
|
||||
}
|
||||
|
||||
function readStringArray(record: Record<string, unknown>, key: string): string[] | undefined {
|
||||
const value = record[key];
|
||||
if (!Array.isArray(value)) {
|
||||
@@ -162,6 +172,7 @@ export function projectSafeChannelAccountSnapshotFields(
|
||||
return {};
|
||||
}
|
||||
const name = normalizeOptionalString(record.name);
|
||||
const statusState = normalizeOptionalString(record.statusState);
|
||||
const healthState = normalizeOptionalString(record.healthState);
|
||||
const mode = normalizeOptionalString(record.mode);
|
||||
const dmPolicy = normalizeOptionalString(record.dmPolicy);
|
||||
@@ -180,16 +191,39 @@ export function projectSafeChannelAccountSnapshotFields(
|
||||
...(readBoolean(record, "connected") !== undefined
|
||||
? { connected: readBoolean(record, "connected") }
|
||||
: {}),
|
||||
...(readBoolean(record, "restartPending") !== undefined
|
||||
? { restartPending: readBoolean(record, "restartPending") }
|
||||
: {}),
|
||||
...(readNumber(record, "reconnectAttempts") !== undefined
|
||||
? { reconnectAttempts: readNumber(record, "reconnectAttempts") }
|
||||
: {}),
|
||||
...(readNullableNumber(record, "lastConnectedAt") !== undefined
|
||||
? { lastConnectedAt: readNullableNumber(record, "lastConnectedAt") }
|
||||
: {}),
|
||||
...(readNumber(record, "lastInboundAt") !== undefined
|
||||
? { lastInboundAt: readNumber(record, "lastInboundAt") }
|
||||
: {}),
|
||||
...(readNullableNumber(record, "lastOutboundAt") !== undefined
|
||||
? { lastOutboundAt: readNullableNumber(record, "lastOutboundAt") }
|
||||
: {}),
|
||||
...(readNullableNumber(record, "lastMessageAt") !== undefined
|
||||
? { lastMessageAt: readNullableNumber(record, "lastMessageAt") }
|
||||
: {}),
|
||||
...(readNullableNumber(record, "lastEventAt") !== undefined
|
||||
? { lastEventAt: readNullableNumber(record, "lastEventAt") }
|
||||
: {}),
|
||||
...(readNumber(record, "lastTransportActivityAt") !== undefined
|
||||
? { lastTransportActivityAt: readNumber(record, "lastTransportActivityAt") }
|
||||
: {}),
|
||||
...(statusState ? { statusState } : {}),
|
||||
...(healthState ? { healthState } : {}),
|
||||
...(readBoolean(record, "busy") !== undefined ? { busy: readBoolean(record, "busy") } : {}),
|
||||
...(readNumber(record, "activeRuns") !== undefined
|
||||
? { activeRuns: readNumber(record, "activeRuns") }
|
||||
: {}),
|
||||
...(readNullableNumber(record, "lastRunActivityAt") !== undefined
|
||||
? { lastRunActivityAt: readNullableNumber(record, "lastRunActivityAt") }
|
||||
: {}),
|
||||
...(mode ? { mode } : {}),
|
||||
...(dmPolicy ? { dmPolicy } : {}),
|
||||
...(readStringArray(record, "allowFrom")
|
||||
|
||||
@@ -13,6 +13,10 @@ let setActivePluginRegistry: typeof import("../plugins/runtime.js").setActivePlu
|
||||
let createChannelTestPluginBase: typeof import("../test-utils/channel-plugins.js").createChannelTestPluginBase;
|
||||
let createTestRegistry: typeof import("../test-utils/channel-plugins.js").createTestRegistry;
|
||||
let getHealthSnapshot: typeof import("./health.js").getHealthSnapshot;
|
||||
let buildTelegramHealthSummaryForTest = buildTelegramHealthSummary;
|
||||
let probeTelegramAccountForTestOverride:
|
||||
| ((account: TelegramHealthAccount, timeoutMs: number) => Promise<Record<string, unknown>>)
|
||||
| undefined;
|
||||
|
||||
type TelegramHealthAccount = {
|
||||
accountId: string;
|
||||
@@ -289,9 +293,12 @@ function createTelegramHealthPlugin(): Pick<
|
||||
isConfigured: (account) => Boolean((account as TelegramHealthAccount).token.trim()),
|
||||
},
|
||||
status: {
|
||||
buildChannelSummary: ({ snapshot }) => buildTelegramHealthSummary(snapshot),
|
||||
buildChannelSummary: ({ snapshot }) => buildTelegramHealthSummaryForTest(snapshot),
|
||||
probeAccount: async ({ account, timeoutMs }) =>
|
||||
await probeTelegramAccountForTest(account as TelegramHealthAccount, timeoutMs),
|
||||
await (probeTelegramAccountForTestOverride ?? probeTelegramAccountForTest)(
|
||||
account as TelegramHealthAccount,
|
||||
timeoutMs,
|
||||
),
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -307,6 +314,8 @@ describe("getHealthSnapshot", () => {
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
buildTelegramHealthSummaryForTest = buildTelegramHealthSummary;
|
||||
probeTelegramAccountForTestOverride = undefined;
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{ pluginId: "telegram", plugin: createTelegramHealthPlugin(), source: "test" },
|
||||
@@ -421,6 +430,116 @@ describe("getHealthSnapshot", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("preserves runtime state and probe payloads when plugin summaries omit them", async () => {
|
||||
testConfig = { channels: { telegram: { botToken: "t-1" } } };
|
||||
testStore = {};
|
||||
vi.stubEnv("DISCORD_BOT_TOKEN", "");
|
||||
buildTelegramHealthSummaryForTest = (snapshot) => ({
|
||||
accountId: snapshot.accountId,
|
||||
configured: Boolean(snapshot.configured),
|
||||
});
|
||||
probeTelegramAccountForTestOverride = async () => ({
|
||||
ok: true,
|
||||
bot: { username: "runtime_bot" },
|
||||
});
|
||||
|
||||
const snap = await getHealthSnapshot({
|
||||
timeoutMs: 25,
|
||||
runtimeSnapshot: {
|
||||
channels: {
|
||||
telegram: {
|
||||
accountId: "default",
|
||||
connected: true,
|
||||
lastConnectedAt: 123,
|
||||
},
|
||||
},
|
||||
channelAccounts: {},
|
||||
},
|
||||
});
|
||||
const telegram = snap.channels.telegram as {
|
||||
connected?: boolean;
|
||||
lastConnectedAt?: number;
|
||||
probe?: { ok?: boolean; bot?: { username?: string } };
|
||||
accounts?: Record<
|
||||
string,
|
||||
{
|
||||
connected?: boolean;
|
||||
lastConnectedAt?: number;
|
||||
probe?: { ok?: boolean; bot?: { username?: string } };
|
||||
}
|
||||
>;
|
||||
};
|
||||
|
||||
expect(telegram.connected).toBe(true);
|
||||
expect(telegram.lastConnectedAt).toBe(123);
|
||||
expect(telegram.probe?.bot?.username).toBe("runtime_bot");
|
||||
expect(telegram.accounts?.default?.connected).toBe(true);
|
||||
expect(telegram.accounts?.default?.probe?.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("omits secret runtime fields and raw probe payloads from non-sensitive health snapshots", async () => {
|
||||
testConfig = { channels: { telegram: { botToken: "t-1" } } };
|
||||
testStore = {};
|
||||
vi.stubEnv("DISCORD_BOT_TOKEN", "");
|
||||
buildTelegramHealthSummaryForTest = (snapshot) => ({
|
||||
accountId: snapshot.accountId,
|
||||
configured: Boolean(snapshot.configured),
|
||||
probe: { ok: true, token: "summary-secret" },
|
||||
});
|
||||
probeTelegramAccountForTestOverride = async () => ({
|
||||
ok: true,
|
||||
bot: { username: "runtime_bot" },
|
||||
token: "probe-secret",
|
||||
});
|
||||
|
||||
const snap = await getHealthSnapshot({
|
||||
timeoutMs: 25,
|
||||
includeSensitive: false,
|
||||
runtimeSnapshot: {
|
||||
channels: {
|
||||
telegram: {
|
||||
accountId: "default",
|
||||
connected: true,
|
||||
lastConnectedAt: 123,
|
||||
channelAccessToken: "line-token",
|
||||
channelSecret: "line-secret", // pragma: allowlist secret
|
||||
webhookUrl: "https://example.test/hook?secret=1",
|
||||
},
|
||||
},
|
||||
channelAccounts: {},
|
||||
},
|
||||
});
|
||||
const telegram = snap.channels.telegram as {
|
||||
connected?: boolean;
|
||||
lastConnectedAt?: number;
|
||||
probe?: unknown;
|
||||
channelAccessToken?: string;
|
||||
channelSecret?: string;
|
||||
webhookUrl?: string;
|
||||
accounts?: Record<
|
||||
string,
|
||||
{
|
||||
connected?: boolean;
|
||||
lastConnectedAt?: number;
|
||||
probe?: unknown;
|
||||
channelAccessToken?: string;
|
||||
channelSecret?: string;
|
||||
webhookUrl?: string;
|
||||
}
|
||||
>;
|
||||
};
|
||||
|
||||
expect(telegram.connected).toBe(true);
|
||||
expect(telegram.lastConnectedAt).toBe(123);
|
||||
expect(telegram.probe).toBeUndefined();
|
||||
expect(telegram.channelAccessToken).toBeUndefined();
|
||||
expect(telegram.channelSecret).toBeUndefined();
|
||||
expect(telegram.webhookUrl).toBeUndefined();
|
||||
expect(telegram.accounts?.default?.connected).toBe(true);
|
||||
expect(telegram.accounts?.default?.probe).toBeUndefined();
|
||||
expect(telegram.accounts?.default?.channelAccessToken).toBeUndefined();
|
||||
});
|
||||
|
||||
it("returns structured telegram probe errors", async () => {
|
||||
testConfig = { channels: { telegram: { botToken: "bad-token" } } };
|
||||
testStore = {};
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { projectSafeChannelAccountSnapshotFields } from "../channels/account-snapshot-fields.js";
|
||||
import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js";
|
||||
import { listReadOnlyChannelPluginsForConfig } from "../channels/plugins/read-only.js";
|
||||
import type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
|
||||
@@ -9,6 +10,7 @@ import { getRuntimeConfig } from "../config/config.js";
|
||||
import { resolveStorePath } from "../config/sessions/paths.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { buildGatewayConnectionDetails, callGateway } from "../gateway/call.js";
|
||||
import type { ChannelRuntimeSnapshot } from "../gateway/server-channel-runtime.types.js";
|
||||
import { info } from "../globals.js";
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
@@ -255,6 +257,8 @@ async function resolveHealthAccountContext(params: {
|
||||
export async function getHealthSnapshot(params?: {
|
||||
timeoutMs?: number;
|
||||
probe?: boolean;
|
||||
includeSensitive?: boolean;
|
||||
runtimeSnapshot?: ChannelRuntimeSnapshot;
|
||||
}): Promise<HealthSummary> {
|
||||
const timeoutMs = params?.timeoutMs;
|
||||
const cfg = getRuntimeConfig();
|
||||
@@ -285,6 +289,7 @@ export async function getHealthSnapshot(params?: {
|
||||
const start = Date.now();
|
||||
const cappedTimeout = timeoutMs === undefined ? DEFAULT_TIMEOUT_MS : Math.max(50, timeoutMs);
|
||||
const doProbe = params?.probe !== false;
|
||||
const includeSensitive = params?.includeSensitive !== false;
|
||||
const channels: Record<string, ChannelHealthSummary> = {};
|
||||
const plugins = listReadOnlyChannelPluginsForConfig(cfg, {
|
||||
includeSetupRuntimeFallback: false,
|
||||
@@ -362,12 +367,16 @@ export async function getHealthSnapshot(params?: {
|
||||
debugHealth("probe.bot", { channel: plugin.id, accountId, username: bot.username });
|
||||
}
|
||||
|
||||
const runtimeSnapshot =
|
||||
params?.runtimeSnapshot?.channelAccounts[plugin.id]?.[accountId] ??
|
||||
(accountId === defaultAccountId ? params?.runtimeSnapshot?.channels[plugin.id] : undefined);
|
||||
const snapshot: ChannelAccountSnapshot = {
|
||||
...projectSafeChannelAccountSnapshotFields(runtimeSnapshot),
|
||||
accountId,
|
||||
enabled,
|
||||
configured,
|
||||
};
|
||||
if (probe !== undefined) {
|
||||
if (includeSensitive && probe !== undefined) {
|
||||
snapshot.probe = probe;
|
||||
}
|
||||
if (lastProbeAt) {
|
||||
@@ -384,16 +393,21 @@ export async function getHealthSnapshot(params?: {
|
||||
: undefined;
|
||||
const record =
|
||||
summary && typeof summary === "object"
|
||||
? (summary as ChannelAccountHealthSummary)
|
||||
? ({ ...snapshot, ...summary } as ChannelAccountHealthSummary)
|
||||
: ({
|
||||
...snapshot,
|
||||
accountId,
|
||||
configured,
|
||||
probe,
|
||||
lastProbeAt,
|
||||
} satisfies ChannelAccountHealthSummary);
|
||||
if (record.configured === undefined) {
|
||||
record.configured = configured;
|
||||
}
|
||||
if (includeSensitive && record.probe === undefined && probe !== undefined) {
|
||||
record.probe = probe;
|
||||
}
|
||||
if (!includeSensitive) {
|
||||
delete record.probe;
|
||||
}
|
||||
if (record.lastProbeAt === undefined && lastProbeAt) {
|
||||
record.lastProbeAt = lastProbeAt;
|
||||
}
|
||||
|
||||
@@ -26,7 +26,10 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
nodeSendToAllSubscribed: (event: string, payload: unknown) => void;
|
||||
getPresenceVersion: () => number;
|
||||
getHealthVersion: () => number;
|
||||
refreshGatewayHealthSnapshot: (opts?: { probe?: boolean }) => Promise<HealthSummary>;
|
||||
refreshGatewayHealthSnapshot: (opts?: {
|
||||
probe?: boolean;
|
||||
includeSensitive?: boolean;
|
||||
}) => Promise<HealthSummary>;
|
||||
logHealth: { error: (msg: string) => void };
|
||||
dedupe: Map<string, DedupeEntry>;
|
||||
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
|
||||
|
||||
@@ -8,20 +8,22 @@ import type { GatewayRequestHandlers } from "./types.js";
|
||||
const ADMIN_SCOPE = "operator.admin";
|
||||
|
||||
export const healthHandlers: GatewayRequestHandlers = {
|
||||
health: async ({ respond, context, params }) => {
|
||||
health: async ({ respond, context, params, client }) => {
|
||||
const { getHealthCache, refreshHealthSnapshot, logHealth } = context;
|
||||
const wantsProbe = params?.probe === true;
|
||||
const scopes = Array.isArray(client?.connect?.scopes) ? client.connect.scopes : [];
|
||||
const includeSensitive = scopes.includes(ADMIN_SCOPE);
|
||||
const now = Date.now();
|
||||
const cached = getHealthCache();
|
||||
if (!wantsProbe && cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) {
|
||||
respond(true, cached, undefined, { cached: true });
|
||||
void refreshHealthSnapshot({ probe: false }).catch((err) =>
|
||||
void refreshHealthSnapshot({ probe: false, includeSensitive }).catch((err) =>
|
||||
logHealth.error(`background health refresh failed: ${formatError(err)}`),
|
||||
);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const snap = await refreshHealthSnapshot({ probe: wantsProbe });
|
||||
const snap = await refreshHealthSnapshot({ probe: wantsProbe, includeSensitive });
|
||||
respond(true, snap, undefined);
|
||||
} catch (err) {
|
||||
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)));
|
||||
|
||||
@@ -46,7 +46,10 @@ export type GatewayRequestContext = {
|
||||
pluginApprovalManager?: ExecApprovalManager<PluginApprovalRequestPayload>;
|
||||
loadGatewayModelCatalog: () => Promise<ModelCatalogEntry[]>;
|
||||
getHealthCache: () => HealthSummary | null;
|
||||
refreshHealthSnapshot: (opts?: { probe?: boolean }) => Promise<HealthSummary>;
|
||||
refreshHealthSnapshot: (opts?: {
|
||||
probe?: boolean;
|
||||
includeSensitive?: boolean;
|
||||
}) => Promise<HealthSummary>;
|
||||
logHealth: { error: (message: string) => void };
|
||||
logGateway: SubsystemLogger;
|
||||
incrementPresenceVersion: () => number;
|
||||
|
||||
@@ -25,7 +25,10 @@ export type NodeEventContext = {
|
||||
dedupe: Map<string, DedupeEntry>;
|
||||
agentRunSeq: Map<string, number>;
|
||||
getHealthCache: () => HealthSummary | null;
|
||||
refreshHealthSnapshot: (opts?: { probe?: boolean }) => Promise<HealthSummary>;
|
||||
refreshHealthSnapshot: (opts?: {
|
||||
probe?: boolean;
|
||||
includeSensitive?: boolean;
|
||||
}) => Promise<HealthSummary>;
|
||||
loadGatewayModelCatalog: () => Promise<ModelCatalogEntry[]>;
|
||||
logGateway: { warn: (msg: string) => void };
|
||||
};
|
||||
|
||||
@@ -5,7 +5,7 @@ import {
|
||||
type GatewayWsSharedHandlerParams,
|
||||
} from "./server/ws-connection.js";
|
||||
|
||||
type GatewayWsRuntimeParams = GatewayWsSharedHandlerParams & {
|
||||
type GatewayWsRuntimeParams = Omit<GatewayWsSharedHandlerParams, "refreshHealthSnapshot"> & {
|
||||
logGateway: ReturnType<typeof createSubsystemLogger>;
|
||||
logHealth: ReturnType<typeof createSubsystemLogger>;
|
||||
logWsControl: ReturnType<typeof createSubsystemLogger>;
|
||||
@@ -37,6 +37,7 @@ export function attachGatewayWsHandlers(params: GatewayWsRuntimeParams) {
|
||||
browserRateLimiter: params.browserRateLimiter,
|
||||
gatewayMethods: params.gatewayMethods,
|
||||
events: params.events,
|
||||
refreshHealthSnapshot: params.context.refreshHealthSnapshot,
|
||||
logGateway: params.logGateway,
|
||||
logHealth: params.logHealth,
|
||||
logWsControl: params.logWsControl,
|
||||
|
||||
@@ -667,6 +667,11 @@ export async function startGatewayServer(
|
||||
};
|
||||
const { getRuntimeSnapshot, startChannels, startChannel, stopChannel, markChannelLoggedOut } =
|
||||
channelManager;
|
||||
const refreshGatewayHealthSnapshotWithRuntime: typeof refreshGatewayHealthSnapshot = (opts) =>
|
||||
refreshGatewayHealthSnapshot({
|
||||
...opts,
|
||||
getRuntimeSnapshot,
|
||||
});
|
||||
const createCloseHandler = () =>
|
||||
createGatewayCloseHandler({
|
||||
bonjourStop: runtimeState.bonjourStop,
|
||||
@@ -721,7 +726,7 @@ export async function startGatewayServer(
|
||||
nodeSendToAllSubscribed,
|
||||
getPresenceVersion,
|
||||
getHealthVersion,
|
||||
refreshGatewayHealthSnapshot,
|
||||
refreshGatewayHealthSnapshot: refreshGatewayHealthSnapshotWithRuntime,
|
||||
logHealth,
|
||||
dedupe,
|
||||
chatAbortControllers,
|
||||
@@ -804,7 +809,7 @@ export async function startGatewayServer(
|
||||
pluginApprovalManager,
|
||||
loadGatewayModelCatalog,
|
||||
getHealthCache,
|
||||
refreshHealthSnapshot: refreshGatewayHealthSnapshot,
|
||||
refreshHealthSnapshot: refreshGatewayHealthSnapshotWithRuntime,
|
||||
logHealth,
|
||||
logGateway: log,
|
||||
incrementPresenceVersion,
|
||||
|
||||
155
src/gateway/server/health-state.test.ts
Normal file
155
src/gateway/server/health-state.test.ts
Normal file
@@ -0,0 +1,155 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { HealthSummary } from "../../commands/health.js";
|
||||
|
||||
const getHealthSnapshotMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("../../commands/health.js", () => ({
|
||||
getHealthSnapshot: getHealthSnapshotMock,
|
||||
}));
|
||||
|
||||
function createHealthSummary(): HealthSummary {
|
||||
return {
|
||||
ok: true,
|
||||
ts: Date.now(),
|
||||
durationMs: 1,
|
||||
channels: {},
|
||||
channelOrder: [],
|
||||
channelLabels: {},
|
||||
heartbeatSeconds: 0,
|
||||
defaultAgentId: "main",
|
||||
agents: [],
|
||||
sessions: {
|
||||
path: "/tmp/sessions.json",
|
||||
count: 0,
|
||||
recent: [],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function loadHealthState() {
|
||||
vi.resetModules();
|
||||
getHealthSnapshotMock.mockReset();
|
||||
getHealthSnapshotMock.mockResolvedValue(createHealthSummary());
|
||||
return await import("./health-state.js");
|
||||
}
|
||||
|
||||
describe("refreshGatewayHealthSnapshot", () => {
|
||||
beforeEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("keeps refreshes coalesced while preserving the first probe intent", async () => {
|
||||
const healthState = await loadHealthState();
|
||||
let resolveSnapshot: ((summary: HealthSummary) => void) | undefined;
|
||||
getHealthSnapshotMock.mockImplementation(
|
||||
() =>
|
||||
new Promise<HealthSummary>((resolve) => {
|
||||
resolveSnapshot = resolve;
|
||||
}),
|
||||
);
|
||||
|
||||
const first = healthState.refreshGatewayHealthSnapshot({ probe: false });
|
||||
const second = healthState.refreshGatewayHealthSnapshot({ probe: true });
|
||||
|
||||
expect(getHealthSnapshotMock).toHaveBeenCalledTimes(1);
|
||||
expect(getHealthSnapshotMock).toHaveBeenCalledWith({
|
||||
probe: false,
|
||||
includeSensitive: false,
|
||||
runtimeSnapshot: undefined,
|
||||
});
|
||||
resolveSnapshot?.(createHealthSummary());
|
||||
await expect(Promise.all([first, second])).resolves.toHaveLength(2);
|
||||
});
|
||||
|
||||
it("captures runtime snapshots for completed refreshes and guards snapshot failures", async () => {
|
||||
const healthState = await loadHealthState();
|
||||
const runtimeSnapshot = {
|
||||
channels: { discord: { accountId: "default", connected: true } },
|
||||
channelAccounts: {},
|
||||
};
|
||||
|
||||
await healthState.refreshGatewayHealthSnapshot({
|
||||
probe: false,
|
||||
getRuntimeSnapshot: () => runtimeSnapshot,
|
||||
});
|
||||
await healthState.refreshGatewayHealthSnapshot({
|
||||
probe: true,
|
||||
getRuntimeSnapshot: () => {
|
||||
throw new Error("bad channel config");
|
||||
},
|
||||
});
|
||||
|
||||
expect(getHealthSnapshotMock).toHaveBeenCalledTimes(2);
|
||||
expect(
|
||||
getHealthSnapshotMock.mock.calls
|
||||
.map((call) => call[0]?.probe)
|
||||
.toSorted((a, b) => Number(a) - Number(b)),
|
||||
).toEqual([false, true]);
|
||||
expect(getHealthSnapshotMock.mock.calls.map((call) => call[0]?.includeSensitive)).toEqual([
|
||||
false,
|
||||
false,
|
||||
]);
|
||||
expect(getHealthSnapshotMock.mock.calls[0]?.[0]?.runtimeSnapshot).toBe(runtimeSnapshot);
|
||||
expect(getHealthSnapshotMock.mock.calls[1]?.[0]?.runtimeSnapshot).toBeUndefined();
|
||||
});
|
||||
|
||||
it("does not cache or broadcast sensitive health refreshes", async () => {
|
||||
const healthState = await loadHealthState();
|
||||
const sensitiveSummary = createHealthSummary();
|
||||
const safeSummary = createHealthSummary();
|
||||
const broadcast = vi.fn();
|
||||
getHealthSnapshotMock.mockResolvedValueOnce(sensitiveSummary).mockResolvedValueOnce(safeSummary);
|
||||
healthState.setBroadcastHealthUpdate(broadcast);
|
||||
const version = healthState.getHealthVersion();
|
||||
|
||||
await healthState.refreshGatewayHealthSnapshot({ probe: true, includeSensitive: true });
|
||||
|
||||
expect(healthState.getHealthCache()).toBeNull();
|
||||
expect(healthState.getHealthVersion()).toBe(version);
|
||||
expect(broadcast).not.toHaveBeenCalled();
|
||||
|
||||
await healthState.refreshGatewayHealthSnapshot({ probe: false });
|
||||
|
||||
expect(healthState.getHealthCache()).toBe(safeSummary);
|
||||
expect(healthState.getHealthVersion()).toBe(version + 1);
|
||||
expect(broadcast).toHaveBeenCalledWith(safeSummary);
|
||||
});
|
||||
|
||||
it("keeps sensitive and public refreshes on separate in-flight promises", async () => {
|
||||
const healthState = await loadHealthState();
|
||||
const sensitiveSummary = createHealthSummary();
|
||||
const safeSummary = createHealthSummary();
|
||||
let resolveSensitive: (() => void) | undefined;
|
||||
let resolveSafe: (() => void) | undefined;
|
||||
getHealthSnapshotMock
|
||||
.mockImplementationOnce(
|
||||
() =>
|
||||
new Promise<HealthSummary>((resolve) => {
|
||||
resolveSensitive = () => resolve(sensitiveSummary);
|
||||
}),
|
||||
)
|
||||
.mockImplementationOnce(
|
||||
() =>
|
||||
new Promise<HealthSummary>((resolve) => {
|
||||
resolveSafe = () => resolve(safeSummary);
|
||||
}),
|
||||
);
|
||||
|
||||
const sensitive = healthState.refreshGatewayHealthSnapshot({
|
||||
probe: true,
|
||||
includeSensitive: true,
|
||||
});
|
||||
const safe = healthState.refreshGatewayHealthSnapshot({ probe: false });
|
||||
|
||||
expect(getHealthSnapshotMock).toHaveBeenCalledTimes(2);
|
||||
expect(getHealthSnapshotMock.mock.calls[0]?.[0]?.includeSensitive).toBe(true);
|
||||
expect(getHealthSnapshotMock.mock.calls[1]?.[0]?.includeSensitive).toBe(false);
|
||||
|
||||
resolveSensitive?.();
|
||||
resolveSafe?.();
|
||||
|
||||
await expect(sensitive).resolves.toBe(sensitiveSummary);
|
||||
await expect(safe).resolves.toBe(safeSummary);
|
||||
expect(healthState.getHealthCache()).toBe(safeSummary);
|
||||
});
|
||||
});
|
||||
@@ -7,11 +7,13 @@ import { getUpdateAvailable } from "../../infra/update-startup.js";
|
||||
import { normalizeMainKey } from "../../routing/session-key.js";
|
||||
import { resolveGatewayAuth } from "../auth.js";
|
||||
import type { Snapshot } from "../protocol/index.js";
|
||||
import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js";
|
||||
|
||||
let presenceVersion = 1;
|
||||
let healthVersion = 1;
|
||||
let healthCache: HealthSummary | null = null;
|
||||
let healthRefresh: Promise<HealthSummary> | null = null;
|
||||
let sensitiveHealthRefresh: Promise<HealthSummary> | null = null;
|
||||
let broadcastHealthUpdate: ((snap: HealthSummary) => void) | null = null;
|
||||
|
||||
export function buildGatewaySnapshot(opts?: { includeSensitive?: boolean }): Snapshot {
|
||||
@@ -69,19 +71,46 @@ export function setBroadcastHealthUpdate(fn: ((snap: HealthSummary) => void) | n
|
||||
broadcastHealthUpdate = fn;
|
||||
}
|
||||
|
||||
export async function refreshGatewayHealthSnapshot(opts?: { probe?: boolean }) {
|
||||
if (!healthRefresh) {
|
||||
healthRefresh = (async () => {
|
||||
const snap = await getHealthSnapshot({ probe: opts?.probe });
|
||||
healthCache = snap;
|
||||
healthVersion += 1;
|
||||
if (broadcastHealthUpdate) {
|
||||
broadcastHealthUpdate(snap);
|
||||
export async function refreshGatewayHealthSnapshot(opts?: {
|
||||
probe?: boolean;
|
||||
includeSensitive?: boolean;
|
||||
getRuntimeSnapshot?: () => ChannelRuntimeSnapshot;
|
||||
}) {
|
||||
const includeSensitive = opts?.includeSensitive === true;
|
||||
let refresh = includeSensitive ? sensitiveHealthRefresh : healthRefresh;
|
||||
if (!refresh) {
|
||||
refresh = (async () => {
|
||||
let runtimeSnapshot: ChannelRuntimeSnapshot | undefined;
|
||||
try {
|
||||
runtimeSnapshot = opts?.getRuntimeSnapshot?.();
|
||||
} catch {
|
||||
runtimeSnapshot = undefined;
|
||||
}
|
||||
const snap = await getHealthSnapshot({
|
||||
probe: opts?.probe,
|
||||
includeSensitive,
|
||||
runtimeSnapshot,
|
||||
});
|
||||
if (!includeSensitive) {
|
||||
healthCache = snap;
|
||||
healthVersion += 1;
|
||||
if (broadcastHealthUpdate) {
|
||||
broadcastHealthUpdate(snap);
|
||||
}
|
||||
}
|
||||
return snap;
|
||||
})().finally(() => {
|
||||
healthRefresh = null;
|
||||
if (includeSensitive) {
|
||||
sensitiveHealthRefresh = null;
|
||||
} else {
|
||||
healthRefresh = null;
|
||||
}
|
||||
});
|
||||
if (includeSensitive) {
|
||||
sensitiveHealthRefresh = refresh;
|
||||
} else {
|
||||
healthRefresh = refresh;
|
||||
}
|
||||
}
|
||||
return healthRefresh;
|
||||
return refresh;
|
||||
}
|
||||
|
||||
@@ -70,6 +70,7 @@ describe("attachGatewayWsConnectionHandler", () => {
|
||||
getResolvedAuth: () => currentAuth,
|
||||
gatewayMethods: [],
|
||||
events: [],
|
||||
refreshHealthSnapshot: vi.fn(async () => ({}) as never),
|
||||
logGateway: createLogger() as never,
|
||||
logHealth: createLogger() as never,
|
||||
logWsControl: createLogger() as never,
|
||||
@@ -133,6 +134,7 @@ describe("attachGatewayWsConnectionHandler", () => {
|
||||
resolvedAuth: createResolvedAuth("token"),
|
||||
gatewayMethods: [],
|
||||
events: [],
|
||||
refreshHealthSnapshot: vi.fn(),
|
||||
logGateway: createLogger() as never,
|
||||
logHealth: createLogger() as never,
|
||||
logWsControl: createLogger() as never,
|
||||
|
||||
@@ -133,6 +133,7 @@ export type GatewayWsSharedHandlerParams = {
|
||||
browserRateLimiter?: AuthRateLimiter;
|
||||
gatewayMethods: string[];
|
||||
events: string[];
|
||||
refreshHealthSnapshot: GatewayRequestContext["refreshHealthSnapshot"];
|
||||
};
|
||||
|
||||
export type AttachGatewayWsConnectionHandlerParams = GatewayWsSharedHandlerParams & {
|
||||
@@ -168,6 +169,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
|
||||
browserRateLimiter,
|
||||
gatewayMethods,
|
||||
events,
|
||||
refreshHealthSnapshot,
|
||||
logGateway,
|
||||
logHealth,
|
||||
logWsControl,
|
||||
@@ -402,6 +404,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
|
||||
events,
|
||||
extraHandlers,
|
||||
buildRequestContext,
|
||||
refreshHealthSnapshot,
|
||||
send,
|
||||
close,
|
||||
isClosed: () => closed,
|
||||
|
||||
@@ -0,0 +1,178 @@
|
||||
import type { IncomingMessage } from "node:http";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { WebSocket } from "ws";
|
||||
import type { ResolvedGatewayAuth } from "../../auth.js";
|
||||
import { PROTOCOL_VERSION } from "../../protocol/index.js";
|
||||
import type { GatewayRequestContext } from "../../server-methods/types.js";
|
||||
|
||||
const {
|
||||
buildGatewaySnapshotMock,
|
||||
getHealthCacheMock,
|
||||
getHealthVersionMock,
|
||||
incrementPresenceVersionMock,
|
||||
loadConfigMock,
|
||||
upsertPresenceMock,
|
||||
} = vi.hoisted(() => ({
|
||||
buildGatewaySnapshotMock: vi.fn(() => ({
|
||||
presence: [],
|
||||
health: {},
|
||||
stateVersion: { presence: 1, health: 1 },
|
||||
uptimeMs: 1,
|
||||
sessionDefaults: {
|
||||
defaultAgentId: "main",
|
||||
mainKey: "main",
|
||||
mainSessionKey: "main",
|
||||
scope: "per-sender",
|
||||
},
|
||||
})),
|
||||
getHealthCacheMock: vi.fn(() => null),
|
||||
getHealthVersionMock: vi.fn(() => 1),
|
||||
incrementPresenceVersionMock: vi.fn(() => 2),
|
||||
loadConfigMock: vi.fn(() => ({
|
||||
gateway: {
|
||||
auth: { mode: "none" },
|
||||
controlUi: {
|
||||
allowedOrigins: ["http://127.0.0.1:19001"],
|
||||
dangerouslyDisableDeviceAuth: true,
|
||||
},
|
||||
},
|
||||
})),
|
||||
upsertPresenceMock: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../../../config/config.js", () => ({
|
||||
getRuntimeConfig: loadConfigMock,
|
||||
loadConfig: loadConfigMock,
|
||||
}));
|
||||
|
||||
vi.mock("../../../infra/system-presence.js", () => ({
|
||||
upsertPresence: upsertPresenceMock,
|
||||
}));
|
||||
|
||||
vi.mock("../../server-methods.js", () => ({
|
||||
handleGatewayRequest: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../health-state.js", () => ({
|
||||
buildGatewaySnapshot: buildGatewaySnapshotMock,
|
||||
getHealthCache: getHealthCacheMock,
|
||||
getHealthVersion: getHealthVersionMock,
|
||||
incrementPresenceVersion: incrementPresenceVersionMock,
|
||||
}));
|
||||
|
||||
import { attachGatewayWsMessageHandler } from "./message-handler.js";
|
||||
|
||||
function createLogger() {
|
||||
return {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
describe("attachGatewayWsMessageHandler post-connect health refresh", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("uses the injected runtime-aware health refresh after hello", async () => {
|
||||
let resolveRefresh: (() => void) | undefined;
|
||||
const refreshHealthSnapshot = vi.fn(
|
||||
() =>
|
||||
new Promise((resolve) => {
|
||||
resolveRefresh = () => resolve({} as never);
|
||||
}),
|
||||
) as GatewayRequestContext["refreshHealthSnapshot"];
|
||||
const socketSend = vi.fn((_payload: string, cb?: (err?: Error) => void) => {
|
||||
cb?.();
|
||||
});
|
||||
let onMessage: ((data: string) => void) | undefined;
|
||||
const socket = {
|
||||
_receiver: {},
|
||||
send: socketSend,
|
||||
on: vi.fn((event: string, handler: (data: string) => void) => {
|
||||
if (event === "message") {
|
||||
onMessage = handler;
|
||||
}
|
||||
return socket;
|
||||
}),
|
||||
} as unknown as WebSocket;
|
||||
const send = vi.fn();
|
||||
const isClosed = vi.fn(() => false);
|
||||
let client: unknown = null;
|
||||
const resolvedAuth: ResolvedGatewayAuth = {
|
||||
mode: "none",
|
||||
allowTailscale: false,
|
||||
};
|
||||
|
||||
attachGatewayWsMessageHandler({
|
||||
socket,
|
||||
upgradeReq: {
|
||||
headers: { host: "127.0.0.1:19001", origin: "http://127.0.0.1:19001" },
|
||||
socket: { localAddress: "127.0.0.1", remoteAddress: "127.0.0.1" },
|
||||
} as unknown as IncomingMessage,
|
||||
connId: "conn-1",
|
||||
remoteAddr: "127.0.0.1",
|
||||
localAddr: "127.0.0.1",
|
||||
requestHost: "127.0.0.1:19001",
|
||||
requestOrigin: "http://127.0.0.1:19001",
|
||||
connectNonce: "nonce-1",
|
||||
getResolvedAuth: () => resolvedAuth,
|
||||
gatewayMethods: [],
|
||||
events: [],
|
||||
extraHandlers: {},
|
||||
buildRequestContext: () => ({}) as GatewayRequestContext,
|
||||
refreshHealthSnapshot,
|
||||
send,
|
||||
close: vi.fn(),
|
||||
isClosed,
|
||||
clearHandshakeTimer: vi.fn(),
|
||||
getClient: () => client as never,
|
||||
setClient: (next) => {
|
||||
client = next;
|
||||
return true;
|
||||
},
|
||||
setHandshakeState: vi.fn(),
|
||||
setCloseCause: vi.fn(),
|
||||
setLastFrameMeta: vi.fn(),
|
||||
originCheckMetrics: { hostHeaderFallbackAccepted: 0 },
|
||||
logGateway: createLogger() as never,
|
||||
logHealth: createLogger() as never,
|
||||
logWsControl: createLogger() as never,
|
||||
});
|
||||
|
||||
expect(onMessage).toBeDefined();
|
||||
|
||||
onMessage?.(
|
||||
JSON.stringify({
|
||||
type: "req",
|
||||
id: "connect-1",
|
||||
method: "connect",
|
||||
params: {
|
||||
minProtocol: PROTOCOL_VERSION,
|
||||
maxProtocol: PROTOCOL_VERSION,
|
||||
client: {
|
||||
id: "openclaw-control-ui",
|
||||
version: "dev",
|
||||
platform: "test",
|
||||
mode: "ui",
|
||||
},
|
||||
role: "operator",
|
||||
caps: [],
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(socketSend).toHaveBeenCalled();
|
||||
});
|
||||
const hello = JSON.parse(socketSend.mock.calls[0]?.[0] ?? "{}") as { ok?: boolean };
|
||||
expect(hello.ok).toBe(true);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: true });
|
||||
});
|
||||
resolveRefresh?.();
|
||||
});
|
||||
});
|
||||
@@ -112,7 +112,6 @@ import {
|
||||
getHealthCache,
|
||||
getHealthVersion,
|
||||
incrementPresenceVersion,
|
||||
refreshGatewayHealthSnapshot,
|
||||
} from "../health-state.js";
|
||||
import { resolveSharedGatewaySessionGeneration } from "../ws-shared-generation.js";
|
||||
import type { GatewayWsClient } from "../ws-types.js";
|
||||
@@ -196,6 +195,7 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
events: string[];
|
||||
extraHandlers: GatewayRequestHandlers;
|
||||
buildRequestContext: () => GatewayRequestContext;
|
||||
refreshHealthSnapshot: GatewayRequestContext["refreshHealthSnapshot"];
|
||||
send: (obj: unknown) => void;
|
||||
close: (code?: number, reason?: string) => void;
|
||||
isClosed: () => boolean;
|
||||
@@ -234,6 +234,7 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
events,
|
||||
extraHandlers,
|
||||
buildRequestContext,
|
||||
refreshHealthSnapshot,
|
||||
send,
|
||||
close,
|
||||
isClosed,
|
||||
@@ -1491,7 +1492,7 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
presence: snapshot.presence.length,
|
||||
stateVersion: snapshot.stateVersion.presence,
|
||||
});
|
||||
void refreshGatewayHealthSnapshot({ probe: true }).catch((err) =>
|
||||
void refreshHealthSnapshot({ probe: true }).catch((err) =>
|
||||
logHealth.error(`post-connect health refresh failed: ${formatError(err)}`),
|
||||
);
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user