mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
fix: use transport activity for stale health
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
7c1b8b34618f44d56817ff54b930701710087dc7e76beaf4a554b6a5a25ba87c config-baseline.json
|
||||
ed0c093e8acab2364608be3e65b98836600aea07df73ebb51d11919969c6c8fe config-baseline.core.json
|
||||
c25e4b5e1c1469ec66bd9ced3759c2542a05b6ecb0db9aa71fa5a8054f8ef0a2 config-baseline.json
|
||||
ed4e305904b4b954ffa72c07ea1900a116bfd874ac0c637227883abb99f753f9 config-baseline.core.json
|
||||
6c0069b971ae298ae68516ebcd3eae0e8c82820d2e8f42ecbd2f53a2f9077371 config-baseline.channel.json
|
||||
e5b7756b5f45ba227aa1bfab990dcf8a2a8b409b9ca01ea8bb1d5cd7adc06c90 config-baseline.plugin.json
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
6f605be396ee42efbe26cfd0cc90d7710ca378959aecd6388dd81a5b97996b43 plugin-sdk-api-baseline.json
|
||||
9c34c7c068f6d3bc5cf44817fe14c470c1c091595296f829e1efb4d6e7ba3599 plugin-sdk-api-baseline.jsonl
|
||||
3fa4d37ea6dbe5dfd540bcaa8c3bb2b81f2cfd439fa446d548ba159388c7b520 plugin-sdk-api-baseline.json
|
||||
bfb88286eeb8dd11871242d17de4d4bd9196fb27ce5d30dbaa1c0d277c27be85 plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -1066,6 +1066,8 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
const trackInboundEvent = opts.setStatus
|
||||
? () => {
|
||||
const at = Date.now();
|
||||
// Carbon handles gateway heartbeats internally but does not expose a
|
||||
// stable heartbeat-ack event, so Discord app events stay app-level only.
|
||||
opts.setStatus?.({ lastEventAt: at, lastInboundAt: at });
|
||||
}
|
||||
: undefined;
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
|
||||
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
|
||||
import {
|
||||
createConnectedChannelStatusPatch,
|
||||
createTransportActivityStatusPatch,
|
||||
} from "openclaw/plugin-sdk/gateway-runtime";
|
||||
import { formatMatrixErrorMessage } from "../errors.js";
|
||||
import {
|
||||
isMatrixDisconnectedSyncState,
|
||||
@@ -52,12 +55,15 @@ export function createMatrixMonitorStatusController(params: {
|
||||
});
|
||||
};
|
||||
|
||||
const noteConnected = (at = Date.now()) => {
|
||||
const noteConnected = (at = Date.now(), options?: { transportActivity?: boolean }) => {
|
||||
if (status.connected === true) {
|
||||
status.lastEventAt = at;
|
||||
} else {
|
||||
Object.assign(status, createConnectedChannelStatusPatch(at));
|
||||
}
|
||||
if (options?.transportActivity) {
|
||||
Object.assign(status, createTransportActivityStatusPatch(at));
|
||||
}
|
||||
status.lastError = null;
|
||||
status.lastDisconnect = null;
|
||||
status.healthState = "healthy";
|
||||
@@ -83,7 +89,10 @@ export function createMatrixMonitorStatusController(params: {
|
||||
return {
|
||||
noteSyncState(state: MatrixSyncState, error?: unknown, at = Date.now()) {
|
||||
if (isMatrixReadySyncState(state)) {
|
||||
noteConnected(at);
|
||||
// matrix-js-sdk emits SYNCING after each successful /sync response.
|
||||
// PREPARED can be cache-backed and CATCHUP is a lifecycle bridge, so
|
||||
// neither should refresh transport liveness.
|
||||
noteConnected(at, { transportActivity: state === "SYNCING" });
|
||||
return;
|
||||
}
|
||||
if (isMatrixDisconnectedSyncState(state)) {
|
||||
|
||||
@@ -129,6 +129,42 @@ describe("createMatrixMonitorSyncLifecycle", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("only refreshes transport liveness for successful sync responses", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
|
||||
const { client, lifecycle, setStatus } = createSyncLifecycleHarness();
|
||||
try {
|
||||
setStatus.mockClear();
|
||||
|
||||
client.emit("sync.state", "PREPARED", null, undefined);
|
||||
expect(setStatus).toHaveBeenLastCalledWith(
|
||||
expect.not.objectContaining({
|
||||
lastTransportActivityAt: expect.any(Number),
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
client.emit("sync.state", "SYNCING", "PREPARED", undefined);
|
||||
const syncAt = Date.now();
|
||||
expect(setStatus).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
lastTransportActivityAt: syncAt,
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(3_000);
|
||||
client.emit("sync.state", "CATCHUP", "SYNCING", undefined);
|
||||
expect(setStatus).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
lastTransportActivityAt: syncAt,
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
lifecycle.dispose();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not downgrade a fatal error to stopped during shutdown", async () => {
|
||||
const { client, lifecycle, setStatus, setStopping, statusController } =
|
||||
createSyncLifecycleHarness({
|
||||
|
||||
@@ -284,10 +284,6 @@ describe("slackPlugin actions", () => {
|
||||
});
|
||||
|
||||
describe("slackPlugin status", () => {
|
||||
it("opts out of the generic stale socket health check", () => {
|
||||
expect(slackPlugin.status?.skipStaleSocketHealthCheck).toBe(true);
|
||||
});
|
||||
|
||||
it("uses the direct Slack probe helper when runtime is not initialized", async () => {
|
||||
const probeSpy = vi.spyOn(probeModule, "probeSlack").mockResolvedValueOnce({
|
||||
ok: true,
|
||||
|
||||
@@ -508,9 +508,8 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
||||
removeAckAfterReply,
|
||||
});
|
||||
|
||||
// Wire up event liveness tracking: update lastEventAt on every inbound event
|
||||
// so the health monitor can detect "half-dead" sockets that pass health checks
|
||||
// but silently stop delivering events.
|
||||
// Slack's socket-mode client keeps ping/pong health private and closes on
|
||||
// missed pongs. App events are useful status activity, but not transport proof.
|
||||
const trackEvent = opts.setStatus
|
||||
? () => {
|
||||
opts.setStatus!({ lastEventAt: Date.now(), lastInboundAt: Date.now() });
|
||||
|
||||
@@ -14,7 +14,7 @@ export type MonitorSlackOpts = {
|
||||
abortSignal?: AbortSignal;
|
||||
mediaMaxMb?: number;
|
||||
slashCommand?: SlackSlashCommandConfig;
|
||||
/** Callback to update the channel account status snapshot (e.g. lastEventAt). */
|
||||
/** Callback to update app-level channel account activity (e.g. lastEventAt). */
|
||||
setStatus?: (next: Record<string, unknown>) => void;
|
||||
/** Callback to read the current channel account status snapshot. */
|
||||
getStatus?: () => Record<string, unknown>;
|
||||
|
||||
@@ -590,6 +590,7 @@ describe("TelegramPollingSession", () => {
|
||||
connected: false,
|
||||
lastConnectedAt: null,
|
||||
lastEventAt: null,
|
||||
lastTransportActivityAt: null,
|
||||
});
|
||||
const connectedPatch = setStatus.mock.calls.find(
|
||||
([patch]) => (patch as Record<string, unknown>).connected === true,
|
||||
@@ -599,9 +600,11 @@ describe("TelegramPollingSession", () => {
|
||||
mode: "polling",
|
||||
lastConnectedAt: expect.any(Number),
|
||||
lastEventAt: expect.any(Number),
|
||||
lastTransportActivityAt: expect.any(Number),
|
||||
lastError: null,
|
||||
});
|
||||
expect(connectedPatch?.lastConnectedAt).toBe(connectedPatch?.lastEventAt);
|
||||
expect(connectedPatch?.lastTransportActivityAt).toBe(connectedPatch?.lastEventAt);
|
||||
|
||||
abort.abort();
|
||||
resolveFirstTask();
|
||||
@@ -681,6 +684,7 @@ describe("TelegramPollingSession", () => {
|
||||
mode: "polling",
|
||||
lastConnectedAt: null,
|
||||
lastEventAt: null,
|
||||
lastTransportActivityAt: null,
|
||||
});
|
||||
expect(disconnectedPatches[1]?.[0]).toEqual({
|
||||
mode: "polling",
|
||||
|
||||
@@ -15,12 +15,14 @@ describe("createTelegramPollingStatusPublisher", () => {
|
||||
connected: false,
|
||||
lastConnectedAt: null,
|
||||
lastEventAt: null,
|
||||
lastTransportActivityAt: null,
|
||||
});
|
||||
expect(setStatus).toHaveBeenNthCalledWith(2, {
|
||||
mode: "polling",
|
||||
connected: true,
|
||||
lastConnectedAt: 1234,
|
||||
lastEventAt: 1234,
|
||||
lastTransportActivityAt: 1234,
|
||||
lastError: null,
|
||||
});
|
||||
expect(setStatus).toHaveBeenNthCalledWith(3, {
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
|
||||
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
|
||||
import {
|
||||
createConnectedChannelStatusPatch,
|
||||
createTransportActivityStatusPatch,
|
||||
} from "openclaw/plugin-sdk/gateway-runtime";
|
||||
|
||||
type TelegramPollingStatusSink = (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
|
||||
|
||||
@@ -11,11 +14,15 @@ export function createTelegramPollingStatusPublisher(setStatus?: TelegramPolling
|
||||
connected: false,
|
||||
lastConnectedAt: null,
|
||||
lastEventAt: null,
|
||||
lastTransportActivityAt: null,
|
||||
});
|
||||
},
|
||||
notePollSuccess(at = Date.now()) {
|
||||
setStatus?.({
|
||||
...createConnectedChannelStatusPatch(at),
|
||||
// A successful getUpdates call proves the Telegram HTTP long-poll is alive
|
||||
// even when the response has no user-visible updates.
|
||||
...createTransportActivityStatusPatch(at),
|
||||
mode: "polling",
|
||||
lastError: null,
|
||||
});
|
||||
|
||||
@@ -34,4 +34,16 @@ describe("projectSafeChannelAccountSnapshotFields", () => {
|
||||
baseUrl: "https://chat.example.test/",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves non-secret transport liveness timestamps", () => {
|
||||
const snapshot = projectSafeChannelAccountSnapshotFields({
|
||||
lastInboundAt: 123,
|
||||
lastTransportActivityAt: 456,
|
||||
});
|
||||
|
||||
expect(snapshot).toEqual({
|
||||
lastInboundAt: 123,
|
||||
lastTransportActivityAt: 456,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -186,6 +186,9 @@ export function projectSafeChannelAccountSnapshotFields(
|
||||
...(readNumber(record, "lastInboundAt") !== undefined
|
||||
? { lastInboundAt: readNumber(record, "lastInboundAt") }
|
||||
: {}),
|
||||
...(readNumber(record, "lastTransportActivityAt") !== undefined
|
||||
? { lastTransportActivityAt: readNumber(record, "lastTransportActivityAt") }
|
||||
: {}),
|
||||
...(healthState ? { healthState } : {}),
|
||||
...(mode ? { mode } : {}),
|
||||
...(dmPolicy ? { dmPolicy } : {}),
|
||||
|
||||
@@ -172,9 +172,6 @@ export type ChannelGroupAdapter = {
|
||||
|
||||
export type ChannelStatusAdapter<ResolvedAccount, Probe = unknown, Audit = unknown> = {
|
||||
defaultRuntime?: ChannelAccountSnapshot;
|
||||
skipStaleSocketHealthCheck?: boolean;
|
||||
/** Runtime `mode` values where `lastEventAt` can prove connected socket liveness. */
|
||||
staleSocketHealthCheckModes?: readonly string[];
|
||||
buildChannelSummary?: BivariantCallback<
|
||||
(params: {
|
||||
account: ResolvedAccount;
|
||||
|
||||
@@ -199,6 +199,7 @@ export type ChannelAccountSnapshot = {
|
||||
| null;
|
||||
lastMessageAt?: number | null;
|
||||
lastEventAt?: number | null;
|
||||
lastTransportActivityAt?: number | null;
|
||||
lastError?: string | null;
|
||||
healthState?: string;
|
||||
lastStartAt?: number | null;
|
||||
|
||||
@@ -21244,7 +21244,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
maximum: 9007199254740991,
|
||||
title: "Gateway Channel Stale Event Threshold (min)",
|
||||
description:
|
||||
"How many minutes a connected channel can go without receiving any event before the health monitor treats it as a stale socket and triggers a restart. Default: 30.",
|
||||
"How many minutes a connected channel can go without provider-proven transport activity before the health monitor treats it as a stale socket and triggers a restart. Default: 30.",
|
||||
},
|
||||
channelMaxRestartsPerHour: {
|
||||
type: "integer",
|
||||
@@ -23561,7 +23561,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
},
|
||||
"gateway.channelStaleEventThresholdMinutes": {
|
||||
label: "Gateway Channel Stale Event Threshold (min)",
|
||||
help: "How many minutes a connected channel can go without receiving any event before the health monitor treats it as a stale socket and triggers a restart. Default: 30.",
|
||||
help: "How many minutes a connected channel can go without provider-proven transport activity before the health monitor treats it as a stale socket and triggers a restart. Default: 30.",
|
||||
tags: ["network"],
|
||||
},
|
||||
"gateway.channelMaxRestartsPerHour": {
|
||||
|
||||
@@ -98,7 +98,7 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"gateway.channelHealthCheckMinutes":
|
||||
"Interval in minutes for automatic channel health probing and status updates. Use lower intervals for faster detection, or higher intervals to reduce periodic probe noise.",
|
||||
"gateway.channelStaleEventThresholdMinutes":
|
||||
"How many minutes a connected channel can go without receiving any event before the health monitor treats it as a stale socket and triggers a restart. Default: 30.",
|
||||
"How many minutes a connected channel can go without provider-proven transport activity before the health monitor treats it as a stale socket and triggers a restart. Default: 30.",
|
||||
"gateway.channelMaxRestartsPerHour":
|
||||
"Maximum number of health-monitor-initiated channel restarts allowed within a rolling one-hour window. Once hit, further restarts are skipped until the window expires. Default: 10.",
|
||||
"gateway.tailscale":
|
||||
|
||||
@@ -440,9 +440,9 @@ export type GatewayConfig = {
|
||||
*/
|
||||
channelHealthCheckMinutes?: number;
|
||||
/**
|
||||
* Stale event threshold in minutes for the channel health monitor.
|
||||
* A connected channel that receives no events for this duration is treated
|
||||
* as a stale socket and restarted. Default: 30.
|
||||
* Stale transport-activity threshold in minutes for the channel health monitor.
|
||||
* A connected channel that reports no provider-proven transport activity for
|
||||
* this duration is treated as a stale socket and restarted. Default: 30.
|
||||
*/
|
||||
channelStaleEventThresholdMinutes?: number;
|
||||
/**
|
||||
|
||||
@@ -496,23 +496,23 @@ describe("channel-health-monitor", () => {
|
||||
describe("stale socket detection", () => {
|
||||
const STALE_THRESHOLD = 30 * 60_000;
|
||||
|
||||
it("restarts a channel with no events past the stale threshold", async () => {
|
||||
it("restarts a channel with no transport activity past the stale threshold", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSlackSnapshotManager(
|
||||
runningConnectedSlackAccount({
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastEventAt: now - STALE_THRESHOLD - 30_000,
|
||||
lastTransportActivityAt: now - STALE_THRESHOLD - 30_000,
|
||||
}),
|
||||
);
|
||||
await expectRestartedChannel(manager, "slack");
|
||||
});
|
||||
|
||||
it("skips channels with recent events", async () => {
|
||||
it("skips channels with recent transport activity", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSlackSnapshotManager(
|
||||
runningConnectedSlackAccount({
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastEventAt: now - 5_000,
|
||||
lastTransportActivityAt: now - 5_000,
|
||||
}),
|
||||
);
|
||||
await expectNoRestart(manager);
|
||||
@@ -523,24 +523,24 @@ describe("channel-health-monitor", () => {
|
||||
const manager = createSlackSnapshotManager(
|
||||
runningConnectedSlackAccount({
|
||||
lastStartAt: now - 5_000,
|
||||
lastEventAt: null,
|
||||
lastTransportActivityAt: null,
|
||||
}),
|
||||
);
|
||||
await expectNoRestart(manager);
|
||||
});
|
||||
|
||||
it("restarts a channel that has seen no events since connect past the stale threshold", async () => {
|
||||
it("restarts a channel with no transport activity since connect past the stale threshold", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSlackSnapshotManager(
|
||||
runningConnectedSlackAccount({
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastEventAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastTransportActivityAt: now - STALE_THRESHOLD - 60_000,
|
||||
}),
|
||||
);
|
||||
await expectRestartedChannel(manager, "slack");
|
||||
});
|
||||
|
||||
it("skips connected channels that do not report event liveness", async () => {
|
||||
it("skips connected channels that do not report transport liveness", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSnapshotManager({
|
||||
telegram: {
|
||||
@@ -550,7 +550,7 @@ describe("channel-health-monitor", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastEventAt: null,
|
||||
lastTransportActivityAt: null,
|
||||
},
|
||||
},
|
||||
});
|
||||
@@ -563,7 +563,7 @@ describe("channel-health-monitor", () => {
|
||||
const manager = createSlackSnapshotManager(
|
||||
runningConnectedSlackAccount({
|
||||
lastStartAt: now - customThreshold - 60_000,
|
||||
lastEventAt: now - customThreshold - 30_000,
|
||||
lastTransportActivityAt: now - customThreshold - 30_000,
|
||||
}),
|
||||
);
|
||||
const monitor = await startAndRunCheck(manager, {
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { getChannelPlugin } from "../channels/plugins/index.js";
|
||||
import type { ChannelId } from "../channels/plugins/types.public.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import {
|
||||
@@ -19,10 +18,10 @@ const DEFAULT_MAX_RESTARTS_PER_HOUR = 10;
|
||||
const ONE_HOUR_MS = 60 * 60_000;
|
||||
|
||||
/**
|
||||
* How long a connected channel can go without receiving any event before
|
||||
* How long a connected channel can go without proven transport activity before
|
||||
* the health monitor treats it as a "stale socket" and triggers a restart.
|
||||
* This catches the half-dead WebSocket scenario where the connection appears
|
||||
* alive (health checks pass) but Slack silently stops delivering events.
|
||||
* Providers should only publish that timestamp from transport/heartbeat/poll
|
||||
* signals, not from ordinary app messages.
|
||||
*/
|
||||
export type ChannelHealthTimingPolicy = {
|
||||
monitorStartupGraceMs: number;
|
||||
@@ -125,14 +124,11 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
|
||||
if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) {
|
||||
continue;
|
||||
}
|
||||
const channelPluginStatus = getChannelPlugin(channelId)?.status;
|
||||
const healthPolicy: ChannelHealthPolicy = {
|
||||
channelId,
|
||||
now,
|
||||
staleEventThresholdMs: timing.staleEventThresholdMs,
|
||||
channelConnectGraceMs: timing.channelConnectGraceMs,
|
||||
skipStaleSocketCheck: channelPluginStatus?.skipStaleSocketHealthCheck,
|
||||
staleSocketHealthCheckModes: channelPluginStatus?.staleSocketHealthCheckModes,
|
||||
};
|
||||
const health = evaluateChannelHealth(status, healthPolicy);
|
||||
if (health.healthy) {
|
||||
|
||||
@@ -116,7 +116,27 @@ describe("evaluateChannelHealth", () => {
|
||||
expect(evaluation).toEqual({ healthy: false, reason: "disconnected" });
|
||||
});
|
||||
|
||||
it("flags stale sockets when no events arrive beyond threshold", () => {
|
||||
it("flags stale sockets when transport activity ages beyond threshold", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastTransportActivityAt: 0,
|
||||
},
|
||||
{
|
||||
channelId: "discord",
|
||||
now: 100_000,
|
||||
channelConnectGraceMs: 10_000,
|
||||
staleEventThresholdMs: 30_000,
|
||||
},
|
||||
);
|
||||
expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" });
|
||||
});
|
||||
|
||||
it("ignores stale app events without transport activity", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
@@ -133,10 +153,10 @@ describe("evaluateChannelHealth", () => {
|
||||
staleEventThresholdMs: 30_000,
|
||||
},
|
||||
);
|
||||
expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" });
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
|
||||
it("flags stale sockets for channels with an allowed health-check mode", () => {
|
||||
it("flags stale sockets for telegram polling channels with transport activity", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
@@ -144,7 +164,7 @@ describe("evaluateChannelHealth", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: 0,
|
||||
lastTransportActivityAt: 0,
|
||||
mode: "polling",
|
||||
},
|
||||
{
|
||||
@@ -152,13 +172,12 @@ describe("evaluateChannelHealth", () => {
|
||||
now: 100_000,
|
||||
channelConnectGraceMs: 10_000,
|
||||
staleEventThresholdMs: 30_000,
|
||||
staleSocketHealthCheckModes: ["polling"],
|
||||
},
|
||||
);
|
||||
expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" });
|
||||
});
|
||||
|
||||
it("skips stale-socket detection when an allowlisted health-check mode is missing", () => {
|
||||
it("does not special-case malformed channel mode when transport activity is explicit", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
@@ -166,28 +185,7 @@ describe("evaluateChannelHealth", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: 0,
|
||||
},
|
||||
{
|
||||
channelId: "example",
|
||||
now: 100_000,
|
||||
channelConnectGraceMs: 10_000,
|
||||
staleEventThresholdMs: 30_000,
|
||||
staleSocketHealthCheckModes: ["polling"],
|
||||
},
|
||||
);
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
|
||||
it("skips stale-socket detection when the health-check mode is malformed", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: 0,
|
||||
lastTransportActivityAt: 0,
|
||||
mode: { polling: true } as unknown as string,
|
||||
},
|
||||
{
|
||||
@@ -195,33 +193,32 @@ describe("evaluateChannelHealth", () => {
|
||||
now: 100_000,
|
||||
channelConnectGraceMs: 10_000,
|
||||
staleEventThresholdMs: 30_000,
|
||||
staleSocketHealthCheckModes: ["polling"],
|
||||
},
|
||||
);
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" });
|
||||
});
|
||||
|
||||
it("skips stale-socket detection for channels in webhook mode", () => {
|
||||
it("trusts explicit transport activity instead of webhook mode heuristics", () => {
|
||||
const evaluation = evaluateDiscordHealth({
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: 0,
|
||||
lastTransportActivityAt: 0,
|
||||
mode: "webhook",
|
||||
});
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" });
|
||||
});
|
||||
|
||||
it("does not flag stale sockets for channels without event tracking", () => {
|
||||
it("does not flag stale sockets for channels without transport tracking", () => {
|
||||
const evaluation = evaluateDiscordHealth({
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: null,
|
||||
lastTransportActivityAt: null,
|
||||
});
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
@@ -233,7 +230,7 @@ describe("evaluateChannelHealth", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: 0,
|
||||
lastTransportActivityAt: 0,
|
||||
},
|
||||
75_000,
|
||||
"slack",
|
||||
@@ -241,7 +238,7 @@ describe("evaluateChannelHealth", () => {
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
|
||||
it("ignores inherited event timestamps from a previous lifecycle", () => {
|
||||
it("ignores inherited transport timestamps from a previous lifecycle", () => {
|
||||
const evaluation = evaluateDiscordHealth(
|
||||
{
|
||||
running: true,
|
||||
@@ -249,7 +246,7 @@ describe("evaluateChannelHealth", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 50_000,
|
||||
lastEventAt: 10_000,
|
||||
lastTransportActivityAt: 10_000,
|
||||
},
|
||||
75_000,
|
||||
"slack",
|
||||
@@ -257,7 +254,7 @@ describe("evaluateChannelHealth", () => {
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
|
||||
it("flags inherited event timestamps after the lifecycle exceeds the stale threshold", () => {
|
||||
it("flags inherited transport timestamps after the lifecycle exceeds the stale threshold", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
@@ -265,7 +262,7 @@ describe("evaluateChannelHealth", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 50_000,
|
||||
lastEventAt: 10_000,
|
||||
lastTransportActivityAt: 10_000,
|
||||
},
|
||||
{
|
||||
channelId: "slack",
|
||||
|
||||
@@ -10,6 +10,7 @@ export type ChannelHealthSnapshot = {
|
||||
activeRuns?: number;
|
||||
lastRunActivityAt?: number | null;
|
||||
lastEventAt?: number | null;
|
||||
lastTransportActivityAt?: number | null;
|
||||
lastStartAt?: number | null;
|
||||
reconnectAttempts?: number;
|
||||
mode?: string;
|
||||
@@ -35,8 +36,6 @@ export type ChannelHealthPolicy = {
|
||||
now: number;
|
||||
staleEventThresholdMs: number;
|
||||
channelConnectGraceMs: number;
|
||||
skipStaleSocketCheck?: boolean;
|
||||
staleSocketHealthCheckModes?: readonly string[];
|
||||
};
|
||||
|
||||
export type ChannelRestartReason =
|
||||
@@ -56,24 +55,10 @@ const BUSY_ACTIVITY_STALE_THRESHOLD_MS = 25 * 60_000;
|
||||
export const DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS = 30 * 60_000;
|
||||
export const DEFAULT_CHANNEL_CONNECT_GRACE_MS = 120_000;
|
||||
|
||||
function shouldCheckStaleSocketForMode(
|
||||
mode: string | undefined,
|
||||
healthCheckModes: readonly string[] | undefined,
|
||||
): boolean {
|
||||
if (healthCheckModes) {
|
||||
const normalizedModes = new Set(
|
||||
healthCheckModes.map((entry) => entry.trim().toLowerCase()).filter(Boolean),
|
||||
);
|
||||
return Boolean(mode && normalizedModes.has(mode));
|
||||
}
|
||||
return mode !== "webhook";
|
||||
}
|
||||
|
||||
export function evaluateChannelHealth(
|
||||
snapshot: ChannelHealthSnapshot,
|
||||
policy: ChannelHealthPolicy,
|
||||
): ChannelHealthEvaluation {
|
||||
const mode = typeof snapshot.mode === "string" ? snapshot.mode.trim().toLowerCase() : undefined;
|
||||
if (!isManagedAccount(snapshot)) {
|
||||
return { healthy: true, reason: "unmanaged" };
|
||||
}
|
||||
@@ -93,9 +78,10 @@ export function evaluateChannelHealth(
|
||||
typeof snapshot.lastRunActivityAt === "number" && Number.isFinite(snapshot.lastRunActivityAt)
|
||||
? snapshot.lastRunActivityAt
|
||||
: null;
|
||||
const lastEventAt =
|
||||
typeof snapshot.lastEventAt === "number" && Number.isFinite(snapshot.lastEventAt)
|
||||
? snapshot.lastEventAt
|
||||
const lastTransportActivityAt =
|
||||
typeof snapshot.lastTransportActivityAt === "number" &&
|
||||
Number.isFinite(snapshot.lastTransportActivityAt)
|
||||
? snapshot.lastTransportActivityAt
|
||||
: null;
|
||||
const busyStateInitializedForLifecycle =
|
||||
lastStartAt == null || (lastRunActivityAt != null && lastRunActivityAt >= lastStartAt);
|
||||
@@ -126,20 +112,18 @@ export function evaluateChannelHealth(
|
||||
if (snapshot.connected === false) {
|
||||
return { healthy: false, reason: "disconnected" };
|
||||
}
|
||||
const shouldCheckStaleSocket =
|
||||
policy.skipStaleSocketCheck !== true &&
|
||||
snapshot.connected === true &&
|
||||
lastEventAt != null &&
|
||||
shouldCheckStaleSocketForMode(mode, policy.staleSocketHealthCheckModes);
|
||||
// App-level events are not socket liveness: quiet Slack/Discord workspaces can
|
||||
// go idle while their upstream clients maintain heartbeats internally.
|
||||
const shouldCheckStaleSocket = snapshot.connected === true && lastTransportActivityAt != null;
|
||||
if (shouldCheckStaleSocket) {
|
||||
if (lastStartAt != null && lastEventAt < lastStartAt) {
|
||||
if (lastStartAt != null && lastTransportActivityAt < lastStartAt) {
|
||||
const lifecycleEventGap = Math.max(0, policy.now - lastStartAt);
|
||||
if (lifecycleEventGap <= policy.staleEventThresholdMs) {
|
||||
return { healthy: true, reason: "healthy" };
|
||||
}
|
||||
return { healthy: false, reason: "stale-socket" };
|
||||
}
|
||||
const eventAge = policy.now - lastEventAt;
|
||||
const eventAge = policy.now - lastTransportActivityAt;
|
||||
if (eventAge > policy.staleEventThresholdMs) {
|
||||
return { healthy: false, reason: "stale-socket" };
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { createConnectedChannelStatusPatch } from "./channel-status-patches.js";
|
||||
import {
|
||||
createConnectedChannelStatusPatch,
|
||||
createTransportActivityStatusPatch,
|
||||
} from "./channel-status-patches.js";
|
||||
|
||||
describe("createConnectedChannelStatusPatch", () => {
|
||||
it("uses one timestamp for connected event-liveness state", () => {
|
||||
@@ -10,3 +13,11 @@ describe("createConnectedChannelStatusPatch", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("createTransportActivityStatusPatch", () => {
|
||||
it("reports transport liveness without implying a new connection event", () => {
|
||||
expect(createTransportActivityStatusPatch(1234)).toEqual({
|
||||
lastTransportActivityAt: 1234,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,6 +4,10 @@ export type ConnectedChannelStatusPatch = {
|
||||
lastEventAt: number;
|
||||
};
|
||||
|
||||
export type TransportActivityChannelStatusPatch = {
|
||||
lastTransportActivityAt: number;
|
||||
};
|
||||
|
||||
export function createConnectedChannelStatusPatch(
|
||||
at: number = Date.now(),
|
||||
): ConnectedChannelStatusPatch {
|
||||
@@ -13,3 +17,11 @@ export function createConnectedChannelStatusPatch(
|
||||
lastEventAt: at,
|
||||
};
|
||||
}
|
||||
|
||||
export function createTransportActivityStatusPatch(
|
||||
at: number = Date.now(),
|
||||
): TransportActivityChannelStatusPatch {
|
||||
return {
|
||||
lastTransportActivityAt: at,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -130,6 +130,7 @@ export const ChannelAccountSnapshotSchema = Type.Object(
|
||||
lastStopAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastInboundAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastOutboundAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastTransportActivityAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
busy: Type.Optional(Type.Boolean()),
|
||||
activeRuns: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastRunActivityAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
|
||||
@@ -32,7 +32,10 @@ function createManager(snapshot: ChannelRuntimeSnapshot): ChannelManager {
|
||||
};
|
||||
}
|
||||
|
||||
function createHealthyDiscordManager(startedAt: number, lastEventAt: number): ChannelManager {
|
||||
function createHealthyDiscordManager(
|
||||
startedAt: number,
|
||||
lastTransportActivityAt: number,
|
||||
): ChannelManager {
|
||||
return createManager(
|
||||
snapshotWith({
|
||||
discord: {
|
||||
@@ -41,7 +44,7 @@ function createHealthyDiscordManager(startedAt: number, lastEventAt: number): Ch
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: startedAt,
|
||||
lastEventAt,
|
||||
lastTransportActivityAt,
|
||||
},
|
||||
}),
|
||||
);
|
||||
@@ -216,7 +219,7 @@ describe("createReadinessChecker", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: startedAt,
|
||||
lastEventAt: Date.now() - 31 * 60_000,
|
||||
lastTransportActivityAt: Date.now() - 31 * 60_000,
|
||||
},
|
||||
},
|
||||
});
|
||||
@@ -236,7 +239,7 @@ describe("createReadinessChecker", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: startedAt,
|
||||
lastEventAt: null,
|
||||
lastTransportActivityAt: null,
|
||||
},
|
||||
},
|
||||
});
|
||||
@@ -255,7 +258,7 @@ describe("createReadinessChecker", () => {
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: Date.now() - 5 * 60_000,
|
||||
lastEventAt: Date.now() - 1_000,
|
||||
lastTransportActivityAt: Date.now() - 1_000,
|
||||
},
|
||||
},
|
||||
cacheTtlMs: 1_000,
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { getChannelPlugin } from "../../channels/plugins/index.js";
|
||||
import type { ChannelAccountSnapshot } from "../../channels/plugins/types.public.js";
|
||||
import {
|
||||
DEFAULT_CHANNEL_CONNECT_GRACE_MS,
|
||||
@@ -64,14 +63,11 @@ export function createReadinessChecker(deps: {
|
||||
if (!accountSnapshot) {
|
||||
continue;
|
||||
}
|
||||
const channelPluginStatus = getChannelPlugin(channelId)?.status;
|
||||
const policy: ChannelHealthPolicy = {
|
||||
now,
|
||||
staleEventThresholdMs: DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS,
|
||||
channelConnectGraceMs: DEFAULT_CHANNEL_CONNECT_GRACE_MS,
|
||||
channelId,
|
||||
skipStaleSocketCheck: channelPluginStatus?.skipStaleSocketHealthCheck,
|
||||
staleSocketHealthCheckModes: channelPluginStatus?.staleSocketHealthCheckModes,
|
||||
};
|
||||
const health = evaluateChannelHealth(accountSnapshot, policy);
|
||||
if (!health.healthy && !shouldIgnoreReadinessFailure(accountSnapshot, health)) {
|
||||
|
||||
@@ -99,7 +99,6 @@ function createComputedStatusAdapter() {
|
||||
{ ok: boolean }
|
||||
>({
|
||||
defaultRuntime: createDefaultChannelRuntimeState("default"),
|
||||
skipStaleSocketHealthCheck: true,
|
||||
resolveAccountSnapshot: ({ account, runtime, probe }) => ({
|
||||
accountId: account.accountId,
|
||||
enabled: account.enabled,
|
||||
@@ -119,7 +118,6 @@ function createAsyncStatusAdapter() {
|
||||
{ ok: boolean }
|
||||
>({
|
||||
defaultRuntime: createDefaultChannelRuntimeState("default"),
|
||||
skipStaleSocketHealthCheck: true,
|
||||
resolveAccountSnapshot: async ({ account, runtime, probe }) => ({
|
||||
accountId: account.accountId,
|
||||
enabled: account.enabled,
|
||||
@@ -285,7 +283,6 @@ describe("computed account status adapters", () => {
|
||||
"builds account snapshots from $name computed account metadata and extras",
|
||||
async ({ createStatus }) => {
|
||||
const status = createStatus();
|
||||
expect(status.skipStaleSocketHealthCheck).toBe(true);
|
||||
await expect(
|
||||
Promise.resolve(
|
||||
status.buildAccountSnapshot?.({
|
||||
@@ -331,6 +328,7 @@ describe("buildRuntimeAccountStatusSnapshot", () => {
|
||||
lastConnectedAt: 11,
|
||||
lastDisconnect: { at: 12, error: "boom" },
|
||||
lastEventAt: 13,
|
||||
lastTransportActivityAt: 14,
|
||||
healthState: "healthy",
|
||||
running: true,
|
||||
},
|
||||
@@ -345,6 +343,7 @@ describe("buildRuntimeAccountStatusSnapshot", () => {
|
||||
lastConnectedAt: 11,
|
||||
lastDisconnect: { at: 12, error: "boom" },
|
||||
lastEventAt: 13,
|
||||
lastTransportActivityAt: 14,
|
||||
healthState: "healthy",
|
||||
probe: undefined,
|
||||
},
|
||||
|
||||
@@ -30,6 +30,7 @@ type RuntimeLifecycleSnapshot = {
|
||||
}
|
||||
| null;
|
||||
lastEventAt?: number | null;
|
||||
lastTransportActivityAt?: number | null;
|
||||
healthState?: string | null;
|
||||
lastStartAt?: number | null;
|
||||
lastStopAt?: number | null;
|
||||
@@ -68,7 +69,6 @@ function buildComputedAccountStatusAdapterBase<ResolvedAccount, Probe, Audit>(
|
||||
): Omit<ChannelStatusAdapter<ResolvedAccount, Probe, Audit>, "buildAccountSnapshot"> {
|
||||
return {
|
||||
defaultRuntime: options.defaultRuntime,
|
||||
skipStaleSocketHealthCheck: options.skipStaleSocketHealthCheck,
|
||||
buildChannelSummary: options.buildChannelSummary,
|
||||
probeAccount: options.probeAccount,
|
||||
formatCapabilitiesProbe: options.formatCapabilitiesProbe,
|
||||
@@ -310,6 +310,9 @@ export function buildRuntimeAccountStatusSnapshot<TExtra extends StatusSnapshotE
|
||||
: {}),
|
||||
...(runtime?.lastDisconnect ? { lastDisconnect: runtime.lastDisconnect } : {}),
|
||||
...(typeof runtime?.lastEventAt === "number" ? { lastEventAt: runtime.lastEventAt } : {}),
|
||||
...(typeof runtime?.lastTransportActivityAt === "number"
|
||||
? { lastTransportActivityAt: runtime.lastTransportActivityAt }
|
||||
: {}),
|
||||
...(typeof runtime?.healthState === "string" ? { healthState: runtime.healthState } : {}),
|
||||
...(extra ?? ({} as TExtra)),
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user