diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index f774298b329..ece004a86ff 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -7c1b8b34618f44d56817ff54b930701710087dc7e76beaf4a554b6a5a25ba87c config-baseline.json -ed0c093e8acab2364608be3e65b98836600aea07df73ebb51d11919969c6c8fe config-baseline.core.json +c25e4b5e1c1469ec66bd9ced3759c2542a05b6ecb0db9aa71fa5a8054f8ef0a2 config-baseline.json +ed4e305904b4b954ffa72c07ea1900a116bfd874ac0c637227883abb99f753f9 config-baseline.core.json 6c0069b971ae298ae68516ebcd3eae0e8c82820d2e8f42ecbd2f53a2f9077371 config-baseline.channel.json e5b7756b5f45ba227aa1bfab990dcf8a2a8b409b9ca01ea8bb1d5cd7adc06c90 config-baseline.plugin.json diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 0b887d73e66..94aa3afffe5 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -6f605be396ee42efbe26cfd0cc90d7710ca378959aecd6388dd81a5b97996b43 plugin-sdk-api-baseline.json -9c34c7c068f6d3bc5cf44817fe14c470c1c091595296f829e1efb4d6e7ba3599 plugin-sdk-api-baseline.jsonl +3fa4d37ea6dbe5dfd540bcaa8c3bb2b81f2cfd439fa446d548ba159388c7b520 plugin-sdk-api-baseline.json +bfb88286eeb8dd11871242d17de4d4bd9196fb27ce5d30dbaa1c0d277c27be85 plugin-sdk-api-baseline.jsonl diff --git a/extensions/discord/src/monitor/provider.ts b/extensions/discord/src/monitor/provider.ts index 4ac946ff111..d28b0cd0082 100644 --- a/extensions/discord/src/monitor/provider.ts +++ b/extensions/discord/src/monitor/provider.ts @@ -1066,6 +1066,8 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { const trackInboundEvent = opts.setStatus ? () => { const at = Date.now(); + // Carbon handles gateway heartbeats internally but does not expose a + // stable heartbeat-ack event, so Discord app events stay app-level only. opts.setStatus?.({ lastEventAt: at, lastInboundAt: at }); } : undefined; diff --git a/extensions/matrix/src/matrix/monitor/status.ts b/extensions/matrix/src/matrix/monitor/status.ts index 6e5630a845e..108485b2fad 100644 --- a/extensions/matrix/src/matrix/monitor/status.ts +++ b/extensions/matrix/src/matrix/monitor/status.ts @@ -1,5 +1,8 @@ import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; -import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime"; +import { + createConnectedChannelStatusPatch, + createTransportActivityStatusPatch, +} from "openclaw/plugin-sdk/gateway-runtime"; import { formatMatrixErrorMessage } from "../errors.js"; import { isMatrixDisconnectedSyncState, @@ -52,12 +55,15 @@ export function createMatrixMonitorStatusController(params: { }); }; - const noteConnected = (at = Date.now()) => { + const noteConnected = (at = Date.now(), options?: { transportActivity?: boolean }) => { if (status.connected === true) { status.lastEventAt = at; } else { Object.assign(status, createConnectedChannelStatusPatch(at)); } + if (options?.transportActivity) { + Object.assign(status, createTransportActivityStatusPatch(at)); + } status.lastError = null; status.lastDisconnect = null; status.healthState = "healthy"; @@ -83,7 +89,10 @@ export function createMatrixMonitorStatusController(params: { return { noteSyncState(state: MatrixSyncState, error?: unknown, at = Date.now()) { if (isMatrixReadySyncState(state)) { - noteConnected(at); + // matrix-js-sdk emits SYNCING after each successful /sync response. + // PREPARED can be cache-backed and CATCHUP is a lifecycle bridge, so + // neither should refresh transport liveness. + noteConnected(at, { transportActivity: state === "SYNCING" }); return; } if (isMatrixDisconnectedSyncState(state)) { diff --git a/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts b/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts index 487b6a14d7c..624ffbb2ff4 100644 --- a/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts +++ b/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts @@ -129,6 +129,42 @@ describe("createMatrixMonitorSyncLifecycle", () => { ); }); + it("only refreshes transport liveness for successful sync responses", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z")); + const { client, lifecycle, setStatus } = createSyncLifecycleHarness(); + try { + setStatus.mockClear(); + + client.emit("sync.state", "PREPARED", null, undefined); + expect(setStatus).toHaveBeenLastCalledWith( + expect.not.objectContaining({ + lastTransportActivityAt: expect.any(Number), + }), + ); + + await vi.advanceTimersByTimeAsync(2_000); + client.emit("sync.state", "SYNCING", "PREPARED", undefined); + const syncAt = Date.now(); + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + lastTransportActivityAt: syncAt, + }), + ); + + await vi.advanceTimersByTimeAsync(3_000); + client.emit("sync.state", "CATCHUP", "SYNCING", undefined); + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + lastTransportActivityAt: syncAt, + }), + ); + } finally { + lifecycle.dispose(); + vi.useRealTimers(); + } + }); + it("does not downgrade a fatal error to stopped during shutdown", async () => { const { client, lifecycle, setStatus, setStopping, statusController } = createSyncLifecycleHarness({ diff --git a/extensions/slack/src/channel.test.ts b/extensions/slack/src/channel.test.ts index 5f967d0e416..b79d92319e2 100644 --- a/extensions/slack/src/channel.test.ts +++ b/extensions/slack/src/channel.test.ts @@ -284,10 +284,6 @@ describe("slackPlugin actions", () => { }); describe("slackPlugin status", () => { - it("opts out of the generic stale socket health check", () => { - expect(slackPlugin.status?.skipStaleSocketHealthCheck).toBe(true); - }); - it("uses the direct Slack probe helper when runtime is not initialized", async () => { const probeSpy = vi.spyOn(probeModule, "probeSlack").mockResolvedValueOnce({ ok: true, diff --git a/extensions/slack/src/monitor/provider.ts b/extensions/slack/src/monitor/provider.ts index 25759313bec..f39127fc68a 100644 --- a/extensions/slack/src/monitor/provider.ts +++ b/extensions/slack/src/monitor/provider.ts @@ -508,9 +508,8 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { removeAckAfterReply, }); - // Wire up event liveness tracking: update lastEventAt on every inbound event - // so the health monitor can detect "half-dead" sockets that pass health checks - // but silently stop delivering events. + // Slack's socket-mode client keeps ping/pong health private and closes on + // missed pongs. App events are useful status activity, but not transport proof. const trackEvent = opts.setStatus ? () => { opts.setStatus!({ lastEventAt: Date.now(), lastInboundAt: Date.now() }); diff --git a/extensions/slack/src/monitor/types.ts b/extensions/slack/src/monitor/types.ts index b7040fa1444..7df610361f1 100644 --- a/extensions/slack/src/monitor/types.ts +++ b/extensions/slack/src/monitor/types.ts @@ -14,7 +14,7 @@ export type MonitorSlackOpts = { abortSignal?: AbortSignal; mediaMaxMb?: number; slashCommand?: SlackSlashCommandConfig; - /** Callback to update the channel account status snapshot (e.g. lastEventAt). */ + /** Callback to update app-level channel account activity (e.g. lastEventAt). */ setStatus?: (next: Record) => void; /** Callback to read the current channel account status snapshot. */ getStatus?: () => Record; diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index 4822e20d24a..b9899d915c7 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -590,6 +590,7 @@ describe("TelegramPollingSession", () => { connected: false, lastConnectedAt: null, lastEventAt: null, + lastTransportActivityAt: null, }); const connectedPatch = setStatus.mock.calls.find( ([patch]) => (patch as Record).connected === true, @@ -599,9 +600,11 @@ describe("TelegramPollingSession", () => { mode: "polling", lastConnectedAt: expect.any(Number), lastEventAt: expect.any(Number), + lastTransportActivityAt: expect.any(Number), lastError: null, }); expect(connectedPatch?.lastConnectedAt).toBe(connectedPatch?.lastEventAt); + expect(connectedPatch?.lastTransportActivityAt).toBe(connectedPatch?.lastEventAt); abort.abort(); resolveFirstTask(); @@ -681,6 +684,7 @@ describe("TelegramPollingSession", () => { mode: "polling", lastConnectedAt: null, lastEventAt: null, + lastTransportActivityAt: null, }); expect(disconnectedPatches[1]?.[0]).toEqual({ mode: "polling", diff --git a/extensions/telegram/src/polling-status.test.ts b/extensions/telegram/src/polling-status.test.ts index d56b5fd4b44..90444d49bb6 100644 --- a/extensions/telegram/src/polling-status.test.ts +++ b/extensions/telegram/src/polling-status.test.ts @@ -15,12 +15,14 @@ describe("createTelegramPollingStatusPublisher", () => { connected: false, lastConnectedAt: null, lastEventAt: null, + lastTransportActivityAt: null, }); expect(setStatus).toHaveBeenNthCalledWith(2, { mode: "polling", connected: true, lastConnectedAt: 1234, lastEventAt: 1234, + lastTransportActivityAt: 1234, lastError: null, }); expect(setStatus).toHaveBeenNthCalledWith(3, { diff --git a/extensions/telegram/src/polling-status.ts b/extensions/telegram/src/polling-status.ts index 439013b39d5..b9f456306ee 100644 --- a/extensions/telegram/src/polling-status.ts +++ b/extensions/telegram/src/polling-status.ts @@ -1,5 +1,8 @@ import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; -import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime"; +import { + createConnectedChannelStatusPatch, + createTransportActivityStatusPatch, +} from "openclaw/plugin-sdk/gateway-runtime"; type TelegramPollingStatusSink = (patch: Omit) => void; @@ -11,11 +14,15 @@ export function createTelegramPollingStatusPublisher(setStatus?: TelegramPolling connected: false, lastConnectedAt: null, lastEventAt: null, + lastTransportActivityAt: null, }); }, notePollSuccess(at = Date.now()) { setStatus?.({ ...createConnectedChannelStatusPatch(at), + // A successful getUpdates call proves the Telegram HTTP long-poll is alive + // even when the response has no user-visible updates. + ...createTransportActivityStatusPatch(at), mode: "polling", lastError: null, }); diff --git a/src/channels/account-snapshot-fields.test.ts b/src/channels/account-snapshot-fields.test.ts index b6cf92a7836..938374004a9 100644 --- a/src/channels/account-snapshot-fields.test.ts +++ b/src/channels/account-snapshot-fields.test.ts @@ -34,4 +34,16 @@ describe("projectSafeChannelAccountSnapshotFields", () => { baseUrl: "https://chat.example.test/", }); }); + + it("preserves non-secret transport liveness timestamps", () => { + const snapshot = projectSafeChannelAccountSnapshotFields({ + lastInboundAt: 123, + lastTransportActivityAt: 456, + }); + + expect(snapshot).toEqual({ + lastInboundAt: 123, + lastTransportActivityAt: 456, + }); + }); }); diff --git a/src/channels/account-snapshot-fields.ts b/src/channels/account-snapshot-fields.ts index 0f4021613e8..d1100cad376 100644 --- a/src/channels/account-snapshot-fields.ts +++ b/src/channels/account-snapshot-fields.ts @@ -186,6 +186,9 @@ export function projectSafeChannelAccountSnapshotFields( ...(readNumber(record, "lastInboundAt") !== undefined ? { lastInboundAt: readNumber(record, "lastInboundAt") } : {}), + ...(readNumber(record, "lastTransportActivityAt") !== undefined + ? { lastTransportActivityAt: readNumber(record, "lastTransportActivityAt") } + : {}), ...(healthState ? { healthState } : {}), ...(mode ? { mode } : {}), ...(dmPolicy ? { dmPolicy } : {}), diff --git a/src/channels/plugins/types.adapters.ts b/src/channels/plugins/types.adapters.ts index d9cf9a9e60a..adb5e86c652 100644 --- a/src/channels/plugins/types.adapters.ts +++ b/src/channels/plugins/types.adapters.ts @@ -172,9 +172,6 @@ export type ChannelGroupAdapter = { export type ChannelStatusAdapter = { defaultRuntime?: ChannelAccountSnapshot; - skipStaleSocketHealthCheck?: boolean; - /** Runtime `mode` values where `lastEventAt` can prove connected socket liveness. */ - staleSocketHealthCheckModes?: readonly string[]; buildChannelSummary?: BivariantCallback< (params: { account: ResolvedAccount; diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index a5e1cc31c7f..667c07ed9a0 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -199,6 +199,7 @@ export type ChannelAccountSnapshot = { | null; lastMessageAt?: number | null; lastEventAt?: number | null; + lastTransportActivityAt?: number | null; lastError?: string | null; healthState?: string; lastStartAt?: number | null; diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index 6622d1c3d95..937f8f32119 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -21244,7 +21244,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { maximum: 9007199254740991, title: "Gateway Channel Stale Event Threshold (min)", description: - "How many minutes a connected channel can go without receiving any event before the health monitor treats it as a stale socket and triggers a restart. Default: 30.", + "How many minutes a connected channel can go without provider-proven transport activity before the health monitor treats it as a stale socket and triggers a restart. Default: 30.", }, channelMaxRestartsPerHour: { type: "integer", @@ -23561,7 +23561,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { }, "gateway.channelStaleEventThresholdMinutes": { label: "Gateway Channel Stale Event Threshold (min)", - help: "How many minutes a connected channel can go without receiving any event before the health monitor treats it as a stale socket and triggers a restart. Default: 30.", + help: "How many minutes a connected channel can go without provider-proven transport activity before the health monitor treats it as a stale socket and triggers a restart. Default: 30.", tags: ["network"], }, "gateway.channelMaxRestartsPerHour": { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 0571eabe75f..f97561fd287 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -98,7 +98,7 @@ export const FIELD_HELP: Record = { "gateway.channelHealthCheckMinutes": "Interval in minutes for automatic channel health probing and status updates. Use lower intervals for faster detection, or higher intervals to reduce periodic probe noise.", "gateway.channelStaleEventThresholdMinutes": - "How many minutes a connected channel can go without receiving any event before the health monitor treats it as a stale socket and triggers a restart. Default: 30.", + "How many minutes a connected channel can go without provider-proven transport activity before the health monitor treats it as a stale socket and triggers a restart. Default: 30.", "gateway.channelMaxRestartsPerHour": "Maximum number of health-monitor-initiated channel restarts allowed within a rolling one-hour window. Once hit, further restarts are skipped until the window expires. Default: 10.", "gateway.tailscale": diff --git a/src/config/types.gateway.ts b/src/config/types.gateway.ts index 35ac0570a26..bfcca78b2bb 100644 --- a/src/config/types.gateway.ts +++ b/src/config/types.gateway.ts @@ -440,9 +440,9 @@ export type GatewayConfig = { */ channelHealthCheckMinutes?: number; /** - * Stale event threshold in minutes for the channel health monitor. - * A connected channel that receives no events for this duration is treated - * as a stale socket and restarted. Default: 30. + * Stale transport-activity threshold in minutes for the channel health monitor. + * A connected channel that reports no provider-proven transport activity for + * this duration is treated as a stale socket and restarted. Default: 30. */ channelStaleEventThresholdMinutes?: number; /** diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index efc392f8ee0..14fd42c4c1a 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -496,23 +496,23 @@ describe("channel-health-monitor", () => { describe("stale socket detection", () => { const STALE_THRESHOLD = 30 * 60_000; - it("restarts a channel with no events past the stale threshold", async () => { + it("restarts a channel with no transport activity past the stale threshold", async () => { const now = Date.now(); const manager = createSlackSnapshotManager( runningConnectedSlackAccount({ lastStartAt: now - STALE_THRESHOLD - 60_000, - lastEventAt: now - STALE_THRESHOLD - 30_000, + lastTransportActivityAt: now - STALE_THRESHOLD - 30_000, }), ); await expectRestartedChannel(manager, "slack"); }); - it("skips channels with recent events", async () => { + it("skips channels with recent transport activity", async () => { const now = Date.now(); const manager = createSlackSnapshotManager( runningConnectedSlackAccount({ lastStartAt: now - STALE_THRESHOLD - 60_000, - lastEventAt: now - 5_000, + lastTransportActivityAt: now - 5_000, }), ); await expectNoRestart(manager); @@ -523,24 +523,24 @@ describe("channel-health-monitor", () => { const manager = createSlackSnapshotManager( runningConnectedSlackAccount({ lastStartAt: now - 5_000, - lastEventAt: null, + lastTransportActivityAt: null, }), ); await expectNoRestart(manager); }); - it("restarts a channel that has seen no events since connect past the stale threshold", async () => { + it("restarts a channel with no transport activity since connect past the stale threshold", async () => { const now = Date.now(); const manager = createSlackSnapshotManager( runningConnectedSlackAccount({ lastStartAt: now - STALE_THRESHOLD - 60_000, - lastEventAt: now - STALE_THRESHOLD - 60_000, + lastTransportActivityAt: now - STALE_THRESHOLD - 60_000, }), ); await expectRestartedChannel(manager, "slack"); }); - it("skips connected channels that do not report event liveness", async () => { + it("skips connected channels that do not report transport liveness", async () => { const now = Date.now(); const manager = createSnapshotManager({ telegram: { @@ -550,7 +550,7 @@ describe("channel-health-monitor", () => { enabled: true, configured: true, lastStartAt: now - STALE_THRESHOLD - 60_000, - lastEventAt: null, + lastTransportActivityAt: null, }, }, }); @@ -563,7 +563,7 @@ describe("channel-health-monitor", () => { const manager = createSlackSnapshotManager( runningConnectedSlackAccount({ lastStartAt: now - customThreshold - 60_000, - lastEventAt: now - customThreshold - 30_000, + lastTransportActivityAt: now - customThreshold - 30_000, }), ); const monitor = await startAndRunCheck(manager, { diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts index fdd328f0106..90bf3851a3a 100644 --- a/src/gateway/channel-health-monitor.ts +++ b/src/gateway/channel-health-monitor.ts @@ -1,4 +1,3 @@ -import { getChannelPlugin } from "../channels/plugins/index.js"; import type { ChannelId } from "../channels/plugins/types.public.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { @@ -19,10 +18,10 @@ const DEFAULT_MAX_RESTARTS_PER_HOUR = 10; const ONE_HOUR_MS = 60 * 60_000; /** - * How long a connected channel can go without receiving any event before + * How long a connected channel can go without proven transport activity before * the health monitor treats it as a "stale socket" and triggers a restart. - * This catches the half-dead WebSocket scenario where the connection appears - * alive (health checks pass) but Slack silently stops delivering events. + * Providers should only publish that timestamp from transport/heartbeat/poll + * signals, not from ordinary app messages. */ export type ChannelHealthTimingPolicy = { monitorStartupGraceMs: number; @@ -125,14 +124,11 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) { continue; } - const channelPluginStatus = getChannelPlugin(channelId)?.status; const healthPolicy: ChannelHealthPolicy = { channelId, now, staleEventThresholdMs: timing.staleEventThresholdMs, channelConnectGraceMs: timing.channelConnectGraceMs, - skipStaleSocketCheck: channelPluginStatus?.skipStaleSocketHealthCheck, - staleSocketHealthCheckModes: channelPluginStatus?.staleSocketHealthCheckModes, }; const health = evaluateChannelHealth(status, healthPolicy); if (health.healthy) { diff --git a/src/gateway/channel-health-policy.test.ts b/src/gateway/channel-health-policy.test.ts index 72cf246c117..daec553d2d8 100644 --- a/src/gateway/channel-health-policy.test.ts +++ b/src/gateway/channel-health-policy.test.ts @@ -116,7 +116,27 @@ describe("evaluateChannelHealth", () => { expect(evaluation).toEqual({ healthy: false, reason: "disconnected" }); }); - it("flags stale sockets when no events arrive beyond threshold", () => { + it("flags stale sockets when transport activity ages beyond threshold", () => { + const evaluation = evaluateChannelHealth( + { + running: true, + connected: true, + enabled: true, + configured: true, + lastStartAt: 0, + lastTransportActivityAt: 0, + }, + { + channelId: "discord", + now: 100_000, + channelConnectGraceMs: 10_000, + staleEventThresholdMs: 30_000, + }, + ); + expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" }); + }); + + it("ignores stale app events without transport activity", () => { const evaluation = evaluateChannelHealth( { running: true, @@ -133,10 +153,10 @@ describe("evaluateChannelHealth", () => { staleEventThresholdMs: 30_000, }, ); - expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" }); + expect(evaluation).toEqual({ healthy: true, reason: "healthy" }); }); - it("flags stale sockets for channels with an allowed health-check mode", () => { + it("flags stale sockets for telegram polling channels with transport activity", () => { const evaluation = evaluateChannelHealth( { running: true, @@ -144,7 +164,7 @@ describe("evaluateChannelHealth", () => { enabled: true, configured: true, lastStartAt: 0, - lastEventAt: 0, + lastTransportActivityAt: 0, mode: "polling", }, { @@ -152,13 +172,12 @@ describe("evaluateChannelHealth", () => { now: 100_000, channelConnectGraceMs: 10_000, staleEventThresholdMs: 30_000, - staleSocketHealthCheckModes: ["polling"], }, ); expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" }); }); - it("skips stale-socket detection when an allowlisted health-check mode is missing", () => { + it("does not special-case malformed channel mode when transport activity is explicit", () => { const evaluation = evaluateChannelHealth( { running: true, @@ -166,28 +185,7 @@ describe("evaluateChannelHealth", () => { enabled: true, configured: true, lastStartAt: 0, - lastEventAt: 0, - }, - { - channelId: "example", - now: 100_000, - channelConnectGraceMs: 10_000, - staleEventThresholdMs: 30_000, - staleSocketHealthCheckModes: ["polling"], - }, - ); - expect(evaluation).toEqual({ healthy: true, reason: "healthy" }); - }); - - it("skips stale-socket detection when the health-check mode is malformed", () => { - const evaluation = evaluateChannelHealth( - { - running: true, - connected: true, - enabled: true, - configured: true, - lastStartAt: 0, - lastEventAt: 0, + lastTransportActivityAt: 0, mode: { polling: true } as unknown as string, }, { @@ -195,33 +193,32 @@ describe("evaluateChannelHealth", () => { now: 100_000, channelConnectGraceMs: 10_000, staleEventThresholdMs: 30_000, - staleSocketHealthCheckModes: ["polling"], }, ); - expect(evaluation).toEqual({ healthy: true, reason: "healthy" }); + expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" }); }); - it("skips stale-socket detection for channels in webhook mode", () => { + it("trusts explicit transport activity instead of webhook mode heuristics", () => { const evaluation = evaluateDiscordHealth({ running: true, connected: true, enabled: true, configured: true, lastStartAt: 0, - lastEventAt: 0, + lastTransportActivityAt: 0, mode: "webhook", }); - expect(evaluation).toEqual({ healthy: true, reason: "healthy" }); + expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" }); }); - it("does not flag stale sockets for channels without event tracking", () => { + it("does not flag stale sockets for channels without transport tracking", () => { const evaluation = evaluateDiscordHealth({ running: true, connected: true, enabled: true, configured: true, lastStartAt: 0, - lastEventAt: null, + lastTransportActivityAt: null, }); expect(evaluation).toEqual({ healthy: true, reason: "healthy" }); }); @@ -233,7 +230,7 @@ describe("evaluateChannelHealth", () => { enabled: true, configured: true, lastStartAt: 0, - lastEventAt: 0, + lastTransportActivityAt: 0, }, 75_000, "slack", @@ -241,7 +238,7 @@ describe("evaluateChannelHealth", () => { expect(evaluation).toEqual({ healthy: true, reason: "healthy" }); }); - it("ignores inherited event timestamps from a previous lifecycle", () => { + it("ignores inherited transport timestamps from a previous lifecycle", () => { const evaluation = evaluateDiscordHealth( { running: true, @@ -249,7 +246,7 @@ describe("evaluateChannelHealth", () => { enabled: true, configured: true, lastStartAt: 50_000, - lastEventAt: 10_000, + lastTransportActivityAt: 10_000, }, 75_000, "slack", @@ -257,7 +254,7 @@ describe("evaluateChannelHealth", () => { expect(evaluation).toEqual({ healthy: true, reason: "healthy" }); }); - it("flags inherited event timestamps after the lifecycle exceeds the stale threshold", () => { + it("flags inherited transport timestamps after the lifecycle exceeds the stale threshold", () => { const evaluation = evaluateChannelHealth( { running: true, @@ -265,7 +262,7 @@ describe("evaluateChannelHealth", () => { enabled: true, configured: true, lastStartAt: 50_000, - lastEventAt: 10_000, + lastTransportActivityAt: 10_000, }, { channelId: "slack", diff --git a/src/gateway/channel-health-policy.ts b/src/gateway/channel-health-policy.ts index 08049d9618e..47f8ea59f81 100644 --- a/src/gateway/channel-health-policy.ts +++ b/src/gateway/channel-health-policy.ts @@ -10,6 +10,7 @@ export type ChannelHealthSnapshot = { activeRuns?: number; lastRunActivityAt?: number | null; lastEventAt?: number | null; + lastTransportActivityAt?: number | null; lastStartAt?: number | null; reconnectAttempts?: number; mode?: string; @@ -35,8 +36,6 @@ export type ChannelHealthPolicy = { now: number; staleEventThresholdMs: number; channelConnectGraceMs: number; - skipStaleSocketCheck?: boolean; - staleSocketHealthCheckModes?: readonly string[]; }; export type ChannelRestartReason = @@ -56,24 +55,10 @@ const BUSY_ACTIVITY_STALE_THRESHOLD_MS = 25 * 60_000; export const DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS = 30 * 60_000; export const DEFAULT_CHANNEL_CONNECT_GRACE_MS = 120_000; -function shouldCheckStaleSocketForMode( - mode: string | undefined, - healthCheckModes: readonly string[] | undefined, -): boolean { - if (healthCheckModes) { - const normalizedModes = new Set( - healthCheckModes.map((entry) => entry.trim().toLowerCase()).filter(Boolean), - ); - return Boolean(mode && normalizedModes.has(mode)); - } - return mode !== "webhook"; -} - export function evaluateChannelHealth( snapshot: ChannelHealthSnapshot, policy: ChannelHealthPolicy, ): ChannelHealthEvaluation { - const mode = typeof snapshot.mode === "string" ? snapshot.mode.trim().toLowerCase() : undefined; if (!isManagedAccount(snapshot)) { return { healthy: true, reason: "unmanaged" }; } @@ -93,9 +78,10 @@ export function evaluateChannelHealth( typeof snapshot.lastRunActivityAt === "number" && Number.isFinite(snapshot.lastRunActivityAt) ? snapshot.lastRunActivityAt : null; - const lastEventAt = - typeof snapshot.lastEventAt === "number" && Number.isFinite(snapshot.lastEventAt) - ? snapshot.lastEventAt + const lastTransportActivityAt = + typeof snapshot.lastTransportActivityAt === "number" && + Number.isFinite(snapshot.lastTransportActivityAt) + ? snapshot.lastTransportActivityAt : null; const busyStateInitializedForLifecycle = lastStartAt == null || (lastRunActivityAt != null && lastRunActivityAt >= lastStartAt); @@ -126,20 +112,18 @@ export function evaluateChannelHealth( if (snapshot.connected === false) { return { healthy: false, reason: "disconnected" }; } - const shouldCheckStaleSocket = - policy.skipStaleSocketCheck !== true && - snapshot.connected === true && - lastEventAt != null && - shouldCheckStaleSocketForMode(mode, policy.staleSocketHealthCheckModes); + // App-level events are not socket liveness: quiet Slack/Discord workspaces can + // go idle while their upstream clients maintain heartbeats internally. + const shouldCheckStaleSocket = snapshot.connected === true && lastTransportActivityAt != null; if (shouldCheckStaleSocket) { - if (lastStartAt != null && lastEventAt < lastStartAt) { + if (lastStartAt != null && lastTransportActivityAt < lastStartAt) { const lifecycleEventGap = Math.max(0, policy.now - lastStartAt); if (lifecycleEventGap <= policy.staleEventThresholdMs) { return { healthy: true, reason: "healthy" }; } return { healthy: false, reason: "stale-socket" }; } - const eventAge = policy.now - lastEventAt; + const eventAge = policy.now - lastTransportActivityAt; if (eventAge > policy.staleEventThresholdMs) { return { healthy: false, reason: "stale-socket" }; } diff --git a/src/gateway/channel-status-patches.test.ts b/src/gateway/channel-status-patches.test.ts index 9297c23e69d..15239fcfd2d 100644 --- a/src/gateway/channel-status-patches.test.ts +++ b/src/gateway/channel-status-patches.test.ts @@ -1,5 +1,8 @@ import { describe, expect, it } from "vitest"; -import { createConnectedChannelStatusPatch } from "./channel-status-patches.js"; +import { + createConnectedChannelStatusPatch, + createTransportActivityStatusPatch, +} from "./channel-status-patches.js"; describe("createConnectedChannelStatusPatch", () => { it("uses one timestamp for connected event-liveness state", () => { @@ -10,3 +13,11 @@ describe("createConnectedChannelStatusPatch", () => { }); }); }); + +describe("createTransportActivityStatusPatch", () => { + it("reports transport liveness without implying a new connection event", () => { + expect(createTransportActivityStatusPatch(1234)).toEqual({ + lastTransportActivityAt: 1234, + }); + }); +}); diff --git a/src/gateway/channel-status-patches.ts b/src/gateway/channel-status-patches.ts index 9e1af6a33d7..85915c527a4 100644 --- a/src/gateway/channel-status-patches.ts +++ b/src/gateway/channel-status-patches.ts @@ -4,6 +4,10 @@ export type ConnectedChannelStatusPatch = { lastEventAt: number; }; +export type TransportActivityChannelStatusPatch = { + lastTransportActivityAt: number; +}; + export function createConnectedChannelStatusPatch( at: number = Date.now(), ): ConnectedChannelStatusPatch { @@ -13,3 +17,11 @@ export function createConnectedChannelStatusPatch( lastEventAt: at, }; } + +export function createTransportActivityStatusPatch( + at: number = Date.now(), +): TransportActivityChannelStatusPatch { + return { + lastTransportActivityAt: at, + }; +} diff --git a/src/gateway/protocol/schema/channels.ts b/src/gateway/protocol/schema/channels.ts index 2fb1bceea28..42015321801 100644 --- a/src/gateway/protocol/schema/channels.ts +++ b/src/gateway/protocol/schema/channels.ts @@ -130,6 +130,7 @@ export const ChannelAccountSnapshotSchema = Type.Object( lastStopAt: Type.Optional(Type.Integer({ minimum: 0 })), lastInboundAt: Type.Optional(Type.Integer({ minimum: 0 })), lastOutboundAt: Type.Optional(Type.Integer({ minimum: 0 })), + lastTransportActivityAt: Type.Optional(Type.Integer({ minimum: 0 })), busy: Type.Optional(Type.Boolean()), activeRuns: Type.Optional(Type.Integer({ minimum: 0 })), lastRunActivityAt: Type.Optional(Type.Integer({ minimum: 0 })), diff --git a/src/gateway/server/readiness.test.ts b/src/gateway/server/readiness.test.ts index 80ef1f9d90f..6541de9d0ca 100644 --- a/src/gateway/server/readiness.test.ts +++ b/src/gateway/server/readiness.test.ts @@ -32,7 +32,10 @@ function createManager(snapshot: ChannelRuntimeSnapshot): ChannelManager { }; } -function createHealthyDiscordManager(startedAt: number, lastEventAt: number): ChannelManager { +function createHealthyDiscordManager( + startedAt: number, + lastTransportActivityAt: number, +): ChannelManager { return createManager( snapshotWith({ discord: { @@ -41,7 +44,7 @@ function createHealthyDiscordManager(startedAt: number, lastEventAt: number): Ch enabled: true, configured: true, lastStartAt: startedAt, - lastEventAt, + lastTransportActivityAt, }, }), ); @@ -216,7 +219,7 @@ describe("createReadinessChecker", () => { enabled: true, configured: true, lastStartAt: startedAt, - lastEventAt: Date.now() - 31 * 60_000, + lastTransportActivityAt: Date.now() - 31 * 60_000, }, }, }); @@ -236,7 +239,7 @@ describe("createReadinessChecker", () => { enabled: true, configured: true, lastStartAt: startedAt, - lastEventAt: null, + lastTransportActivityAt: null, }, }, }); @@ -255,7 +258,7 @@ describe("createReadinessChecker", () => { enabled: true, configured: true, lastStartAt: Date.now() - 5 * 60_000, - lastEventAt: Date.now() - 1_000, + lastTransportActivityAt: Date.now() - 1_000, }, }, cacheTtlMs: 1_000, diff --git a/src/gateway/server/readiness.ts b/src/gateway/server/readiness.ts index f0901ffb026..40a6a1ff800 100644 --- a/src/gateway/server/readiness.ts +++ b/src/gateway/server/readiness.ts @@ -1,4 +1,3 @@ -import { getChannelPlugin } from "../../channels/plugins/index.js"; import type { ChannelAccountSnapshot } from "../../channels/plugins/types.public.js"; import { DEFAULT_CHANNEL_CONNECT_GRACE_MS, @@ -64,14 +63,11 @@ export function createReadinessChecker(deps: { if (!accountSnapshot) { continue; } - const channelPluginStatus = getChannelPlugin(channelId)?.status; const policy: ChannelHealthPolicy = { now, staleEventThresholdMs: DEFAULT_CHANNEL_STALE_EVENT_THRESHOLD_MS, channelConnectGraceMs: DEFAULT_CHANNEL_CONNECT_GRACE_MS, channelId, - skipStaleSocketCheck: channelPluginStatus?.skipStaleSocketHealthCheck, - staleSocketHealthCheckModes: channelPluginStatus?.staleSocketHealthCheckModes, }; const health = evaluateChannelHealth(accountSnapshot, policy); if (!health.healthy && !shouldIgnoreReadinessFailure(accountSnapshot, health)) { diff --git a/src/plugin-sdk/status-helpers.test.ts b/src/plugin-sdk/status-helpers.test.ts index ec1335656c5..1b52ab403c2 100644 --- a/src/plugin-sdk/status-helpers.test.ts +++ b/src/plugin-sdk/status-helpers.test.ts @@ -99,7 +99,6 @@ function createComputedStatusAdapter() { { ok: boolean } >({ defaultRuntime: createDefaultChannelRuntimeState("default"), - skipStaleSocketHealthCheck: true, resolveAccountSnapshot: ({ account, runtime, probe }) => ({ accountId: account.accountId, enabled: account.enabled, @@ -119,7 +118,6 @@ function createAsyncStatusAdapter() { { ok: boolean } >({ defaultRuntime: createDefaultChannelRuntimeState("default"), - skipStaleSocketHealthCheck: true, resolveAccountSnapshot: async ({ account, runtime, probe }) => ({ accountId: account.accountId, enabled: account.enabled, @@ -285,7 +283,6 @@ describe("computed account status adapters", () => { "builds account snapshots from $name computed account metadata and extras", async ({ createStatus }) => { const status = createStatus(); - expect(status.skipStaleSocketHealthCheck).toBe(true); await expect( Promise.resolve( status.buildAccountSnapshot?.({ @@ -331,6 +328,7 @@ describe("buildRuntimeAccountStatusSnapshot", () => { lastConnectedAt: 11, lastDisconnect: { at: 12, error: "boom" }, lastEventAt: 13, + lastTransportActivityAt: 14, healthState: "healthy", running: true, }, @@ -345,6 +343,7 @@ describe("buildRuntimeAccountStatusSnapshot", () => { lastConnectedAt: 11, lastDisconnect: { at: 12, error: "boom" }, lastEventAt: 13, + lastTransportActivityAt: 14, healthState: "healthy", probe: undefined, }, diff --git a/src/plugin-sdk/status-helpers.ts b/src/plugin-sdk/status-helpers.ts index 54091341eb1..98a9d2d77df 100644 --- a/src/plugin-sdk/status-helpers.ts +++ b/src/plugin-sdk/status-helpers.ts @@ -30,6 +30,7 @@ type RuntimeLifecycleSnapshot = { } | null; lastEventAt?: number | null; + lastTransportActivityAt?: number | null; healthState?: string | null; lastStartAt?: number | null; lastStopAt?: number | null; @@ -68,7 +69,6 @@ function buildComputedAccountStatusAdapterBase( ): Omit, "buildAccountSnapshot"> { return { defaultRuntime: options.defaultRuntime, - skipStaleSocketHealthCheck: options.skipStaleSocketHealthCheck, buildChannelSummary: options.buildChannelSummary, probeAccount: options.probeAccount, formatCapabilitiesProbe: options.formatCapabilitiesProbe, @@ -310,6 +310,9 @@ export function buildRuntimeAccountStatusSnapshot