From 0e97f962ac564aa3651945e2a521b8a63dbb3a94 Mon Sep 17 00:00:00 2001 From: "openclaw-clownfish[bot]" <280122609+openclaw-clownfish[bot]@users.noreply.github.com> Date: Thu, 30 Apr 2026 03:57:31 +0100 Subject: [PATCH] fix(mattermost): add WebSocket ping/pong keepalive (#73979) Adds Mattermost WebSocket ping/pong liveness checks so half-open sockets terminate and the existing reconnect loop recovers. Fixes #41837. Carries forward #57621. Refs #50138, #44160, and #51104. Thanks @JasonWang1124. Co-authored-by: JasonWang1124 <56307673+JasonWang1124@users.noreply.github.com> --- CHANGELOG.md | 1 + .../src/mattermost/monitor-websocket.test.ts | 95 ++++++++++++++++++- .../src/mattermost/monitor-websocket.ts | 65 +++++++++++++ .../monitor.inbound-system-event.test.ts | 10 +- 4 files changed, 169 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06aaa60c138..0cc8f050b7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -208,6 +208,7 @@ Docs: https://docs.openclaw.ai - CLI/status: fall back to a bounded local `status` RPC when loopback detail probes time out or report unknown capability, so reachable local gateways are no longer marked unreachable by slow read diagnostics. Fixes #73535; refs #48360, #62762, #51357, and #42019. Thanks @RacecarGuy, @justinschille, @DJBlackhawk, @tianyaqpzm, and @0xrsydn. - CLI/gateway: reuse cached paired-device auth during `gateway probe` and report post-connect diagnostic failures as degraded reachability, so healthy local gateways are no longer marked unreachable after loopback auth or read timeouts. Fixes #48360. Thanks @RacecarGuy. - Channels/Discord: give Discord Gateway WebSocket handshakes a 30s timeout so stalled TLS/network transitions emit an error and Carbon can continue its reconnect loop instead of leaving the bot silent until restart. Refs #50046. Thanks @codexGW. +- Mattermost/WebSocket: send protocol ping/pong keepalives and terminate stale sessions when pongs stop arriving, so silent TCP drops reconnect instead of leaving monitoring idle. Fixes #41837; carries forward #57621; refs #50138, #44160, and #51104. Thanks @JasonWang1124. - Channels/Telegram: suppress standalone failed edit/write warning payloads when a user-facing assistant error reply already covers the turn, while keeping unresolved mutating failures visible behind success-looking or suppressed-error replies. Fixes #39631; refs #73750; carries forward #39636 and #39717; leaves #39406 for configurable delivery policy. Thanks @Bartok9 and @Bortlesboat. - Control UI/agents: persist the Set Default action through `agents.list[].default` instead of writing the unsupported `agents.defaultId` field, so saved default-agent changes survive config validation. Fixes #65565; carries forward #72585. Thanks @luyao618. - NVIDIA/NIM: persist the `NVIDIA_API_KEY` provider marker and mark bundled NVIDIA Chat Completions models as string-content compatible, so NIM models load from `models.json` and OpenAI-compatible subagent calls send plain text content. Fixes #73013 and #50107; refs #73014. Thanks @bautrey, @iot2edge, @ifearghal, and @futhgar. diff --git a/extensions/mattermost/src/mattermost/monitor-websocket.test.ts b/extensions/mattermost/src/mattermost/monitor-websocket.test.ts index 46287f9021a..34ceeda962f 100644 --- a/extensions/mattermost/src/mattermost/monitor-websocket.test.ts +++ b/extensions/mattermost/src/mattermost/monitor-websocket.test.ts @@ -8,18 +8,21 @@ import { class FakeWebSocket implements MattermostWebSocketLike { public readonly sent: string[] = []; + public pingCalls = 0; public closeCalls = 0; public terminateCalls = 0; private openListeners: Array<() => void> = []; private messageListeners: Array<(data: Buffer) => void | Promise> = []; + private pongListeners: Array<(data: Buffer) => void> = []; private closeListeners: Array<(code: number, reason: Buffer) => void> = []; private errorListeners: Array<(err: unknown) => void> = []; on(event: "open", listener: () => void): void; on(event: "message", listener: (data: Buffer) => void | Promise): void; + on(event: "pong", listener: (data: Buffer) => void): void; on(event: "close", listener: (code: number, reason: Buffer) => void): void; on(event: "error", listener: (err: unknown) => void): void; - on(event: "open" | "message" | "close" | "error", listener: unknown): void { + on(event: "open" | "message" | "pong" | "close" | "error", listener: unknown): void { if (event === "open") { this.openListeners.push(listener as () => void); return; @@ -28,6 +31,10 @@ class FakeWebSocket implements MattermostWebSocketLike { this.messageListeners.push(listener as (data: Buffer) => void | Promise); return; } + if (event === "pong") { + this.pongListeners.push(listener as (data: Buffer) => void); + return; + } if (event === "close") { this.closeListeners.push(listener as (code: number, reason: Buffer) => void); return; @@ -35,6 +42,10 @@ class FakeWebSocket implements MattermostWebSocketLike { this.errorListeners.push(listener as (err: unknown) => void); } + ping(): void { + this.pingCalls++; + } + send(data: string): void { this.sent.push(data); } @@ -59,6 +70,12 @@ class FakeWebSocket implements MattermostWebSocketLike { } } + emitPong(data = Buffer.alloc(0)): void { + for (const listener of this.pongListeners) { + listener(data); + } + } + emitClose(code: number, reason = ""): void { const buffer = Buffer.from(reason, "utf8"); for (const listener of this.closeListeners) { @@ -282,6 +299,82 @@ describe("mattermost websocket monitor", () => { vi.useRealTimers(); }); + it("continues protocol keepalive when Mattermost responds with pong", async () => { + vi.useFakeTimers(); + const socket = new FakeWebSocket(); + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime: testRuntime(), + nextSeq: () => 1, + onPosted: async () => {}, + webSocketFactory: () => socket, + pingIntervalMs: 100, + pongTimeoutMs: 25, + }); + + const connected = connectOnce(); + socket.emitOpen(); + + await vi.advanceTimersByTimeAsync(100); + expect(socket.pingCalls).toBe(1); + + socket.emitPong(); + await vi.advanceTimersByTimeAsync(25); + expect(socket.terminateCalls).toBe(0); + + await vi.advanceTimersByTimeAsync(75); + expect(socket.pingCalls).toBe(2); + + socket.emitClose(1000); + await connected; + vi.useRealTimers(); + }); + + it("terminates silent websocket drops when Mattermost misses pong timeout", async () => { + vi.useFakeTimers(); + const socket = new FakeWebSocket(); + const runtime = testRuntime(); + let pollCount = 0; + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime, + nextSeq: () => 1, + onPosted: async () => {}, + webSocketFactory: () => socket, + getBotUpdateAt: async () => { + pollCount++; + return 1000; + }, + healthCheckIntervalMs: 100, + pingIntervalMs: 50, + pongTimeoutMs: 25, + }); + + const connected = connectOnce(); + socket.emitOpen(); + + await vi.advanceTimersByTimeAsync(0); + expect(pollCount).toBe(1); + + await vi.advanceTimersByTimeAsync(50); + expect(socket.pingCalls).toBe(1); + expect(socket.terminateCalls).toBe(0); + + await vi.advanceTimersByTimeAsync(25); + expect(socket.terminateCalls).toBe(1); + expect(runtime.error).toHaveBeenCalledWith("mattermost websocket pong timeout — reconnecting"); + + await vi.advanceTimersByTimeAsync(500); + expect(socket.pingCalls).toBe(1); + expect(pollCount).toBe(1); + + socket.emitClose(1006); + await connected; + vi.useRealTimers(); + }); + it("does not terminate when getBotUpdateAt throws", async () => { vi.useFakeTimers(); const socket = new FakeWebSocket(); diff --git a/extensions/mattermost/src/mattermost/monitor-websocket.ts b/extensions/mattermost/src/mattermost/monitor-websocket.ts index 1bbd54ee0bf..168004f8bdc 100644 --- a/extensions/mattermost/src/mattermost/monitor-websocket.ts +++ b/extensions/mattermost/src/mattermost/monitor-websocket.ts @@ -33,8 +33,10 @@ export type MattermostEventPayload = { export type MattermostWebSocketLike = { on(event: "open", listener: () => void): void; on(event: "message", listener: (data: WebSocket.RawData) => void | Promise): void; + on(event: "pong", listener: (data: Buffer) => void): void; on(event: "close", listener: (code: number, reason: Buffer) => void): void; on(event: "error", listener: (err: unknown) => void): void; + ping(): void; send(data: string): void; close(): void; terminate(): void; @@ -104,6 +106,8 @@ type CreateMattermostConnectOnceOpts = { */ getBotUpdateAt?: () => Promise; healthCheckIntervalMs?: number; + pingIntervalMs?: number; + pongTimeoutMs?: number; }; export const defaultMattermostWebSocketFactory: MattermostWebSocketFactory = (url) => { @@ -144,6 +148,8 @@ export function createMattermostConnectOnce( ): () => Promise { const webSocketFactory = opts.webSocketFactory ?? defaultMattermostWebSocketFactory; const healthCheckIntervalMs = opts.healthCheckIntervalMs ?? 30_000; + const pingIntervalMs = opts.pingIntervalMs ?? 30_000; + const pongTimeoutMs = opts.pongTimeoutMs ?? 10_000; return async () => { const flowId = randomUUID(); const ws = webSocketFactory(opts.wsUrl); @@ -158,6 +164,9 @@ export function createMattermostConnectOnce( let healthCheckEnabled = getBotUpdateAt != null; let healthCheckInFlight = false; let healthCheckTimer: ReturnType | undefined; + let protocolKeepaliveEnabled = true; + let protocolPingTimer: ReturnType | undefined; + let protocolPongTimer: ReturnType | undefined; let initialUpdateAt: number | undefined; const clearTimers = () => { @@ -165,13 +174,60 @@ export function createMattermostConnectOnce( clearTimeout(healthCheckTimer); healthCheckTimer = undefined; } + if (protocolPingTimer !== undefined) { + clearTimeout(protocolPingTimer); + protocolPingTimer = undefined; + } + if (protocolPongTimer !== undefined) { + clearTimeout(protocolPongTimer); + protocolPongTimer = undefined; + } }; const stopHealthChecks = () => { healthCheckEnabled = false; + protocolKeepaliveEnabled = false; clearTimers(); }; + const sendProtocolPing = () => { + if (!protocolKeepaliveEnabled || settled) { + return; + } + if (protocolPongTimer !== undefined) { + clearTimeout(protocolPongTimer); + } + protocolPongTimer = setTimeout(() => { + protocolPongTimer = undefined; + if (!protocolKeepaliveEnabled || settled) { + return; + } + opts.runtime.error?.("mattermost websocket pong timeout — reconnecting"); + stopHealthChecks(); + ws.terminate(); + }, pongTimeoutMs); + try { + ws.ping(); + } catch (err) { + if (!protocolKeepaliveEnabled || settled) { + return; + } + opts.runtime.error?.(`mattermost websocket ping failed: ${String(err)}`); + stopHealthChecks(); + ws.terminate(); + } + }; + + const scheduleProtocolPing = () => { + if (!protocolKeepaliveEnabled || settled || protocolPingTimer !== undefined) { + return; + } + protocolPingTimer = setTimeout(() => { + protocolPingTimer = undefined; + sendProtocolPing(); + }, pingIntervalMs); + }; + const scheduleHealthCheck = () => { if (!getBotUpdateAt || !healthCheckEnabled || settled || healthCheckInFlight) { return; @@ -263,6 +319,7 @@ export function createMattermostConnectOnce( meta: { subsystem: "mattermost-websocket", eventType: "authentication_challenge" }, }); ws.send(authPayload); + scheduleProtocolPing(); // Periodically check if the bot account was modified (e.g. disable/enable). // After such a cycle the WebSocket silently stops delivering events even @@ -274,6 +331,14 @@ export function createMattermostConnectOnce( } }); + ws.on("pong", () => { + if (protocolPongTimer !== undefined) { + clearTimeout(protocolPongTimer); + protocolPongTimer = undefined; + } + scheduleProtocolPing(); + }); + ws.on("message", async (data) => { captureWsEvent({ url: opts.wsUrl, diff --git a/extensions/mattermost/src/mattermost/monitor.inbound-system-event.test.ts b/extensions/mattermost/src/mattermost/monitor.inbound-system-event.test.ts index 565130203cf..62912a66bd9 100644 --- a/extensions/mattermost/src/mattermost/monitor.inbound-system-event.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.inbound-system-event.test.ts @@ -5,14 +5,16 @@ class FakeWebSocket { public readonly sent: string[] = []; private readonly openListeners: Array<() => void> = []; private readonly messageListeners: Array<(data: Buffer) => void | Promise> = []; + private readonly pongListeners: Array<(data: Buffer) => void> = []; private readonly closeListeners: Array<(code: number, reason: Buffer) => void> = []; private readonly errorListeners: Array<(err: unknown) => void> = []; on(event: "open", listener: () => void): void; on(event: "message", listener: (data: Buffer) => void | Promise): void; + on(event: "pong", listener: (data: Buffer) => void): void; on(event: "close", listener: (code: number, reason: Buffer) => void): void; on(event: "error", listener: (err: unknown) => void): void; - on(event: "open" | "message" | "close" | "error", listener: unknown): void { + on(event: "open" | "message" | "pong" | "close" | "error", listener: unknown): void { if (event === "open") { this.openListeners.push(listener as () => void); return; @@ -21,6 +23,10 @@ class FakeWebSocket { this.messageListeners.push(listener as (data: Buffer) => void | Promise); return; } + if (event === "pong") { + this.pongListeners.push(listener as (data: Buffer) => void); + return; + } if (event === "close") { this.closeListeners.push(listener as (code: number, reason: Buffer) => void); return; @@ -32,6 +38,8 @@ class FakeWebSocket { this.sent.push(data); } + ping(): void {} + close(): void {} terminate(): void {}