fix(mattermost): add WebSocket ping/pong keepalive (#73979)

Adds Mattermost WebSocket ping/pong liveness checks so half-open sockets terminate and the existing reconnect loop recovers.

Fixes #41837.
Carries forward #57621.
Refs #50138, #44160, and #51104.
Thanks @JasonWang1124.

Co-authored-by: JasonWang1124 <56307673+JasonWang1124@users.noreply.github.com>
This commit is contained in:
openclaw-clownfish[bot]
2026-04-30 03:57:31 +01:00
committed by GitHub
parent 2d1523e573
commit 0e97f962ac
4 changed files with 169 additions and 2 deletions

View File

@@ -208,6 +208,7 @@ Docs: https://docs.openclaw.ai
- CLI/status: fall back to a bounded local `status` RPC when loopback detail probes time out or report unknown capability, so reachable local gateways are no longer marked unreachable by slow read diagnostics. Fixes #73535; refs #48360, #62762, #51357, and #42019. Thanks @RacecarGuy, @justinschille, @DJBlackhawk, @tianyaqpzm, and @0xrsydn.
- CLI/gateway: reuse cached paired-device auth during `gateway probe` and report post-connect diagnostic failures as degraded reachability, so healthy local gateways are no longer marked unreachable after loopback auth or read timeouts. Fixes #48360. Thanks @RacecarGuy.
- Channels/Discord: give Discord Gateway WebSocket handshakes a 30s timeout so stalled TLS/network transitions emit an error and Carbon can continue its reconnect loop instead of leaving the bot silent until restart. Refs #50046. Thanks @codexGW.
- Mattermost/WebSocket: send protocol ping/pong keepalives and terminate stale sessions when pongs stop arriving, so silent TCP drops reconnect instead of leaving monitoring idle. Fixes #41837; carries forward #57621; refs #50138, #44160, and #51104. Thanks @JasonWang1124.
- Channels/Telegram: suppress standalone failed edit/write warning payloads when a user-facing assistant error reply already covers the turn, while keeping unresolved mutating failures visible behind success-looking or suppressed-error replies. Fixes #39631; refs #73750; carries forward #39636 and #39717; leaves #39406 for configurable delivery policy. Thanks @Bartok9 and @Bortlesboat.
- Control UI/agents: persist the Set Default action through `agents.list[].default` instead of writing the unsupported `agents.defaultId` field, so saved default-agent changes survive config validation. Fixes #65565; carries forward #72585. Thanks @luyao618.
- NVIDIA/NIM: persist the `NVIDIA_API_KEY` provider marker and mark bundled NVIDIA Chat Completions models as string-content compatible, so NIM models load from `models.json` and OpenAI-compatible subagent calls send plain text content. Fixes #73013 and #50107; refs #73014. Thanks @bautrey, @iot2edge, @ifearghal, and @futhgar.

View File

@@ -8,18 +8,21 @@ import {
class FakeWebSocket implements MattermostWebSocketLike {
public readonly sent: string[] = [];
public pingCalls = 0;
public closeCalls = 0;
public terminateCalls = 0;
private openListeners: Array<() => void> = [];
private messageListeners: Array<(data: Buffer) => void | Promise<void>> = [];
private pongListeners: Array<(data: Buffer) => void> = [];
private closeListeners: Array<(code: number, reason: Buffer) => void> = [];
private errorListeners: Array<(err: unknown) => void> = [];
on(event: "open", listener: () => void): void;
on(event: "message", listener: (data: Buffer) => void | Promise<void>): void;
on(event: "pong", listener: (data: Buffer) => void): void;
on(event: "close", listener: (code: number, reason: Buffer) => void): void;
on(event: "error", listener: (err: unknown) => void): void;
on(event: "open" | "message" | "close" | "error", listener: unknown): void {
on(event: "open" | "message" | "pong" | "close" | "error", listener: unknown): void {
if (event === "open") {
this.openListeners.push(listener as () => void);
return;
@@ -28,6 +31,10 @@ class FakeWebSocket implements MattermostWebSocketLike {
this.messageListeners.push(listener as (data: Buffer) => void | Promise<void>);
return;
}
if (event === "pong") {
this.pongListeners.push(listener as (data: Buffer) => void);
return;
}
if (event === "close") {
this.closeListeners.push(listener as (code: number, reason: Buffer) => void);
return;
@@ -35,6 +42,10 @@ class FakeWebSocket implements MattermostWebSocketLike {
this.errorListeners.push(listener as (err: unknown) => void);
}
ping(): void {
this.pingCalls++;
}
send(data: string): void {
this.sent.push(data);
}
@@ -59,6 +70,12 @@ class FakeWebSocket implements MattermostWebSocketLike {
}
}
emitPong(data = Buffer.alloc(0)): void {
for (const listener of this.pongListeners) {
listener(data);
}
}
emitClose(code: number, reason = ""): void {
const buffer = Buffer.from(reason, "utf8");
for (const listener of this.closeListeners) {
@@ -282,6 +299,82 @@ describe("mattermost websocket monitor", () => {
vi.useRealTimers();
});
it("continues protocol keepalive when Mattermost responds with pong", async () => {
vi.useFakeTimers();
const socket = new FakeWebSocket();
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime: testRuntime(),
nextSeq: () => 1,
onPosted: async () => {},
webSocketFactory: () => socket,
pingIntervalMs: 100,
pongTimeoutMs: 25,
});
const connected = connectOnce();
socket.emitOpen();
await vi.advanceTimersByTimeAsync(100);
expect(socket.pingCalls).toBe(1);
socket.emitPong();
await vi.advanceTimersByTimeAsync(25);
expect(socket.terminateCalls).toBe(0);
await vi.advanceTimersByTimeAsync(75);
expect(socket.pingCalls).toBe(2);
socket.emitClose(1000);
await connected;
vi.useRealTimers();
});
it("terminates silent websocket drops when Mattermost misses pong timeout", async () => {
vi.useFakeTimers();
const socket = new FakeWebSocket();
const runtime = testRuntime();
let pollCount = 0;
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime,
nextSeq: () => 1,
onPosted: async () => {},
webSocketFactory: () => socket,
getBotUpdateAt: async () => {
pollCount++;
return 1000;
},
healthCheckIntervalMs: 100,
pingIntervalMs: 50,
pongTimeoutMs: 25,
});
const connected = connectOnce();
socket.emitOpen();
await vi.advanceTimersByTimeAsync(0);
expect(pollCount).toBe(1);
await vi.advanceTimersByTimeAsync(50);
expect(socket.pingCalls).toBe(1);
expect(socket.terminateCalls).toBe(0);
await vi.advanceTimersByTimeAsync(25);
expect(socket.terminateCalls).toBe(1);
expect(runtime.error).toHaveBeenCalledWith("mattermost websocket pong timeout — reconnecting");
await vi.advanceTimersByTimeAsync(500);
expect(socket.pingCalls).toBe(1);
expect(pollCount).toBe(1);
socket.emitClose(1006);
await connected;
vi.useRealTimers();
});
it("does not terminate when getBotUpdateAt throws", async () => {
vi.useFakeTimers();
const socket = new FakeWebSocket();

View File

@@ -33,8 +33,10 @@ export type MattermostEventPayload = {
export type MattermostWebSocketLike = {
on(event: "open", listener: () => void): void;
on(event: "message", listener: (data: WebSocket.RawData) => void | Promise<void>): void;
on(event: "pong", listener: (data: Buffer) => void): void;
on(event: "close", listener: (code: number, reason: Buffer) => void): void;
on(event: "error", listener: (err: unknown) => void): void;
ping(): void;
send(data: string): void;
close(): void;
terminate(): void;
@@ -104,6 +106,8 @@ type CreateMattermostConnectOnceOpts = {
*/
getBotUpdateAt?: () => Promise<number>;
healthCheckIntervalMs?: number;
pingIntervalMs?: number;
pongTimeoutMs?: number;
};
export const defaultMattermostWebSocketFactory: MattermostWebSocketFactory = (url) => {
@@ -144,6 +148,8 @@ export function createMattermostConnectOnce(
): () => Promise<void> {
const webSocketFactory = opts.webSocketFactory ?? defaultMattermostWebSocketFactory;
const healthCheckIntervalMs = opts.healthCheckIntervalMs ?? 30_000;
const pingIntervalMs = opts.pingIntervalMs ?? 30_000;
const pongTimeoutMs = opts.pongTimeoutMs ?? 10_000;
return async () => {
const flowId = randomUUID();
const ws = webSocketFactory(opts.wsUrl);
@@ -158,6 +164,9 @@ export function createMattermostConnectOnce(
let healthCheckEnabled = getBotUpdateAt != null;
let healthCheckInFlight = false;
let healthCheckTimer: ReturnType<typeof setTimeout> | undefined;
let protocolKeepaliveEnabled = true;
let protocolPingTimer: ReturnType<typeof setTimeout> | undefined;
let protocolPongTimer: ReturnType<typeof setTimeout> | undefined;
let initialUpdateAt: number | undefined;
const clearTimers = () => {
@@ -165,13 +174,60 @@ export function createMattermostConnectOnce(
clearTimeout(healthCheckTimer);
healthCheckTimer = undefined;
}
if (protocolPingTimer !== undefined) {
clearTimeout(protocolPingTimer);
protocolPingTimer = undefined;
}
if (protocolPongTimer !== undefined) {
clearTimeout(protocolPongTimer);
protocolPongTimer = undefined;
}
};
const stopHealthChecks = () => {
healthCheckEnabled = false;
protocolKeepaliveEnabled = false;
clearTimers();
};
const sendProtocolPing = () => {
if (!protocolKeepaliveEnabled || settled) {
return;
}
if (protocolPongTimer !== undefined) {
clearTimeout(protocolPongTimer);
}
protocolPongTimer = setTimeout(() => {
protocolPongTimer = undefined;
if (!protocolKeepaliveEnabled || settled) {
return;
}
opts.runtime.error?.("mattermost websocket pong timeout — reconnecting");
stopHealthChecks();
ws.terminate();
}, pongTimeoutMs);
try {
ws.ping();
} catch (err) {
if (!protocolKeepaliveEnabled || settled) {
return;
}
opts.runtime.error?.(`mattermost websocket ping failed: ${String(err)}`);
stopHealthChecks();
ws.terminate();
}
};
const scheduleProtocolPing = () => {
if (!protocolKeepaliveEnabled || settled || protocolPingTimer !== undefined) {
return;
}
protocolPingTimer = setTimeout(() => {
protocolPingTimer = undefined;
sendProtocolPing();
}, pingIntervalMs);
};
const scheduleHealthCheck = () => {
if (!getBotUpdateAt || !healthCheckEnabled || settled || healthCheckInFlight) {
return;
@@ -263,6 +319,7 @@ export function createMattermostConnectOnce(
meta: { subsystem: "mattermost-websocket", eventType: "authentication_challenge" },
});
ws.send(authPayload);
scheduleProtocolPing();
// Periodically check if the bot account was modified (e.g. disable/enable).
// After such a cycle the WebSocket silently stops delivering events even
@@ -274,6 +331,14 @@ export function createMattermostConnectOnce(
}
});
ws.on("pong", () => {
if (protocolPongTimer !== undefined) {
clearTimeout(protocolPongTimer);
protocolPongTimer = undefined;
}
scheduleProtocolPing();
});
ws.on("message", async (data) => {
captureWsEvent({
url: opts.wsUrl,

View File

@@ -5,14 +5,16 @@ class FakeWebSocket {
public readonly sent: string[] = [];
private readonly openListeners: Array<() => void> = [];
private readonly messageListeners: Array<(data: Buffer) => void | Promise<void>> = [];
private readonly pongListeners: Array<(data: Buffer) => void> = [];
private readonly closeListeners: Array<(code: number, reason: Buffer) => void> = [];
private readonly errorListeners: Array<(err: unknown) => void> = [];
on(event: "open", listener: () => void): void;
on(event: "message", listener: (data: Buffer) => void | Promise<void>): void;
on(event: "pong", listener: (data: Buffer) => void): void;
on(event: "close", listener: (code: number, reason: Buffer) => void): void;
on(event: "error", listener: (err: unknown) => void): void;
on(event: "open" | "message" | "close" | "error", listener: unknown): void {
on(event: "open" | "message" | "pong" | "close" | "error", listener: unknown): void {
if (event === "open") {
this.openListeners.push(listener as () => void);
return;
@@ -21,6 +23,10 @@ class FakeWebSocket {
this.messageListeners.push(listener as (data: Buffer) => void | Promise<void>);
return;
}
if (event === "pong") {
this.pongListeners.push(listener as (data: Buffer) => void);
return;
}
if (event === "close") {
this.closeListeners.push(listener as (code: number, reason: Buffer) => void);
return;
@@ -32,6 +38,8 @@ class FakeWebSocket {
this.sent.push(data);
}
ping(): void {}
close(): void {}
terminate(): void {}