diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a23e79c8a6..1ec253ac6b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,10 +114,8 @@ Docs: https://docs.openclaw.ai - Plugins: share package entrypoint resolution between install and discovery, reject mismatched `runtimeExtensions`, and cache bundled runtime-dependency manifest reads during scans. Thanks @vincentkoc. - WhatsApp/Web: keep quiet but healthy linked-device sessions connected by basing the watchdog on WhatsApp Web transport activity, while retaining a longer app-silence cap so frame activity cannot mask a stuck session forever. Fixes #70678; carries forward the focused #71466 approach and keeps #63939 as related configurable-timeout follow-up. Thanks @vincentkoc and @oromeis. - Discord/gateway: count failed health-monitor restart attempts toward cooldown and hourly caps, and evict stale account lifecycle state during channel reloads so repeated Discord gateway recovery cannot loop on old status. Fixes #38596. (#40413) Thanks @jellyAI-dev and @vashquez. - -### Fixes - - TTS/BlueBubbles: pre-transcode synthesized MP3 audio to opus-in-CAF (mono, 24 kHz — validated against macOS 15.x Messages.app's native voice-memo CAF descriptor) on macOS hosts before handing the file to BlueBubbles, so iMessage renders the result as a native voice-memo bubble with proper duration and waveform UI instead of a plain file attachment. Adds an opt-in `tts.voice.preferAudioFileFormat` channel capability and a magic-byte sniff for the CAF container so the host-local-media validator (which uses `file-type` and didn't recognize CAF natively) can verify the pre-transcoded buffer. Channels that don't opt in are unaffected. (#72586) Fixes #72506. Thanks @omarshahine. +- Feishu: retry WebSocket startup failures with monitor-owned backoff while preserving SDK-local heartbeat defaults, so persistent-connection startup failures no longer leave the monitor hung. Fixes #68766; related #42354 and #55532. Thanks @alex-xuweilong, @120106835, @sirfengyu, and @tianhaocui. ## 2026.4.26 diff --git a/extensions/feishu/src/async.test.ts b/extensions/feishu/src/async.test.ts index a59763d4bc9..1053b940d09 100644 --- a/extensions/feishu/src/async.test.ts +++ b/extensions/feishu/src/async.test.ts @@ -6,6 +6,14 @@ afterEach(() => { }); describe("waitForAbortableDelay", () => { + it("resolves false immediately when already aborted", async () => { + vi.useFakeTimers(); + const abortController = new AbortController(); + abortController.abort(); + + await expect(waitForAbortableDelay(60_000, abortController.signal)).resolves.toBe(false); + }); + it("resolves false immediately when aborted during backoff", async () => { vi.useFakeTimers(); const abortController = new AbortController(); diff --git a/extensions/feishu/src/async.ts b/extensions/feishu/src/async.ts index f04978d0bd1..6a175849ca9 100644 --- a/extensions/feishu/src/async.ts +++ b/extensions/feishu/src/async.ts @@ -70,17 +70,35 @@ export function waitForAbortableDelay( } return new Promise((resolve) => { - const handleAbort = () => { - clearTimeout(timer); - resolve(false); + let settled = false; + let timer: ReturnType | undefined; + let handleAbort: (() => void) | undefined; + + const finish = (value: boolean) => { + if (settled) { + return; + } + settled = true; + if (timer) { + clearTimeout(timer); + } + if (handleAbort) { + abortSignal?.removeEventListener("abort", handleAbort); + } + resolve(value); }; - const timer = setTimeout(() => { - abortSignal?.removeEventListener("abort", handleAbort); - resolve(true); - }, delayMs); - timer.unref?.(); + handleAbort = () => { + finish(false); + }; abortSignal?.addEventListener("abort", handleAbort, { once: true }); + if (abortSignal?.aborted) { + finish(false); + return; + } + + timer = setTimeout(() => finish(true), delayMs); + timer.unref?.(); }); } diff --git a/extensions/feishu/src/client.test.ts b/extensions/feishu/src/client.test.ts index 80172b70744..3d2eccc7ff8 100644 --- a/extensions/feishu/src/client.test.ts +++ b/extensions/feishu/src/client.test.ts @@ -119,9 +119,9 @@ function readCallOptions( return isRecord(call) ? call : {}; } -function firstWsClientOptions(): { agent?: unknown } { +function firstWsClientOptions(): { agent?: unknown; wsConfig?: unknown } { const options = readCallOptions(wsClientCtorMock, 0); - return { agent: options.agent }; + return { agent: options.agent, wsConfig: options.wsConfig }; } beforeAll(async () => { @@ -345,6 +345,16 @@ describe("createFeishuClient HTTP timeout", () => { }); describe("createFeishuWSClient proxy handling", () => { + it("passes heartbeat wsConfig defaults to Lark.WSClient", async () => { + await createFeishuWSClient(baseAccount); + + const options = firstWsClientOptions(); + expect(options.wsConfig).toEqual({ + PingInterval: 30, + PingTimeout: 3, + }); + }); + it("does not set a ws proxy agent when proxy env is absent", async () => { await createFeishuWSClient(baseAccount); diff --git a/extensions/feishu/src/client.ts b/extensions/feishu/src/client.ts index dc8da9e4c21..7622dc3630b 100644 --- a/extensions/feishu/src/client.ts +++ b/extensions/feishu/src/client.ts @@ -15,6 +15,11 @@ export { pluginVersion }; const FEISHU_USER_AGENT = `openclaw-feishu-builtin/${pluginVersion}/${process.platform}`; export { FEISHU_USER_AGENT }; +const FEISHU_WS_CONFIG = { + PingInterval: 30, + PingTimeout: 3, +} as const; + /** User-Agent header value for all Feishu API requests. */ export function getFeishuUserAgent(): string { return FEISHU_USER_AGENT; @@ -232,7 +237,10 @@ export async function createFeishuWSClient(account: ResolvedFeishuAccount): Prom appSecret, domain: resolveDomain(domain), loggerLevel: feishuClientSdk.LoggerLevel.info, + wsConfig: FEISHU_WS_CONFIG, ...(agent ? { agent } : {}), + } as ConstructorParameters[0] & { + wsConfig: typeof FEISHU_WS_CONFIG; }); } diff --git a/extensions/feishu/src/monitor.cleanup.test.ts b/extensions/feishu/src/monitor.cleanup.test.ts index ed103db2f9a..b840546d04c 100644 --- a/extensions/feishu/src/monitor.cleanup.test.ts +++ b/extensions/feishu/src/monitor.cleanup.test.ts @@ -38,6 +38,7 @@ function createWsClient(): MockWsClient { } afterEach(() => { + vi.useRealTimers(); stopFeishuMonitorState(); vi.clearAllMocks(); }); @@ -79,6 +80,98 @@ describe("feishu websocket cleanup", () => { expect(botNames.has(accountId)).toBe(false); }); + it("retries with backoff after websocket start rejects", async () => { + vi.useFakeTimers(); + const failedClient = createWsClient(); + failedClient.start.mockRejectedValueOnce( + new Error("connect failed\nAuthorization: Bearer token_abc appSecret=secret_abc"), + ); + const recoveredClient = createWsClient(); + createFeishuWSClientMock + .mockResolvedValueOnce(failedClient) + .mockResolvedValueOnce(recoveredClient); + + const abortController = new AbortController(); + const runtime = { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }; + const accountId = "retry"; + + const monitorPromise = monitorWebSocket({ + account: createAccount(accountId), + accountId, + runtime, + abortSignal: abortController.signal, + eventDispatcher: {} as never, + }); + + await vi.waitFor(() => { + expect(failedClient.start).toHaveBeenCalledTimes(1); + expect(failedClient.close).toHaveBeenCalledTimes(1); + expect(wsClients.has(accountId)).toBe(false); + }); + + await vi.advanceTimersByTimeAsync(1_000); + + await vi.waitFor(() => { + expect(recoveredClient.start).toHaveBeenCalledTimes(1); + expect(wsClients.get(accountId)).toBe(recoveredClient); + }); + + abortController.abort(); + await monitorPromise; + + expect(createFeishuWSClientMock).toHaveBeenCalledTimes(2); + expect(recoveredClient.close).toHaveBeenCalledTimes(1); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("WebSocket start failed, retrying in 1000ms"), + ); + const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? ""); + expect(errorMessage).not.toContain("\n"); + expect(errorMessage).not.toContain("token_abc"); + expect(errorMessage).not.toContain("secret_abc"); + expect(errorMessage).toContain("Authorization: Bearer [redacted]"); + expect(errorMessage).toContain("appSecret=[redacted]"); + }); + + it("redacts websocket close errors during abort cleanup", async () => { + const wsClient = createWsClient(); + wsClient.close.mockImplementationOnce(() => { + throw new Error("close failed\naccess_token=secret_token"); + }); + createFeishuWSClientMock.mockReturnValue(wsClient); + + const abortController = new AbortController(); + const runtime = { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }; + + const monitorPromise = monitorWebSocket({ + account: createAccount("close-error"), + accountId: "close-error", + runtime, + abortSignal: abortController.signal, + eventDispatcher: {} as never, + }); + + await vi.waitFor(() => { + expect(wsClient.start).toHaveBeenCalledTimes(1); + }); + + abortController.abort(); + await monitorPromise; + + const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? ""); + expect(errorMessage).toContain("error closing WebSocket client"); + expect(errorMessage).toContain("access_token=[redacted]"); + expect(errorMessage).not.toContain("\n"); + expect(errorMessage).not.toContain("secret_token"); + }); + it("closes targeted websocket clients during stop cleanup", () => { const alphaClient = createWsClient(); const betaClient = createWsClient(); diff --git a/extensions/feishu/src/monitor.transport.ts b/extensions/feishu/src/monitor.transport.ts index 5f617145466..90f3c1e4d82 100644 --- a/extensions/feishu/src/monitor.transport.ts +++ b/extensions/feishu/src/monitor.transport.ts @@ -1,6 +1,7 @@ import crypto from "node:crypto"; import * as http from "node:http"; import * as Lark from "@larksuiteoapi/node-sdk"; +import { waitForAbortableDelay } from "./async.js"; import { createFeishuWSClient } from "./client.js"; import { applyBasicWebhookRequestGuards, @@ -29,6 +30,10 @@ export type MonitorTransportParams = { eventDispatcher: Lark.EventDispatcher; }; +const FEISHU_WS_RECONNECT_INITIAL_DELAY_MS = 1_000; +const FEISHU_WS_RECONNECT_MAX_DELAY_MS = 30_000; +const FEISHU_WS_LOG_ERROR_MAX_LENGTH = 500; + function isFeishuWebhookPayload(value: unknown): value is Record { return !!value && typeof value === "object" && !Array.isArray(value); } @@ -82,6 +87,79 @@ function respondText(res: http.ServerResponse, statusCode: number, body: string) res.end(body); } +function getFeishuWsReconnectDelayMs(attempt: number): number { + return Math.min( + FEISHU_WS_RECONNECT_INITIAL_DELAY_MS * 2 ** Math.max(0, attempt - 1), + FEISHU_WS_RECONNECT_MAX_DELAY_MS, + ); +} + +function formatFeishuWsErrorForLog(err: unknown): string { + const raw = err instanceof Error ? err.message || err.name : String(err); + const singleLine = Array.from(raw, (char) => { + const code = char.charCodeAt(0); + return code <= 31 || code === 127 ? " " : char; + }).join(""); + const redacted = singleLine + .replace(/:\/\/[^:@/\s]+:[^@/\s]+@/g, "://[redacted]@") + .replace(/\b(authorization\s*[:=]\s*Bearer\s+)[^\s,;]+/gi, "$1[redacted]") + .replace(/\b(Bearer\s+)[A-Za-z0-9._~+/-]+=*/g, "$1[redacted]") + .replace( + /\b((?:app[_-]?secret|tenant[_-]?access[_-]?token|access[_-]?token|refresh[_-]?token|token|secret|password)\s*[:=]\s*)[^\s&;,]+/gi, + "$1[redacted]", + ) + .replace(/\s+/g, " ") + .trim(); + + if (!redacted) { + return "unknown error"; + } + if (redacted.length <= FEISHU_WS_LOG_ERROR_MAX_LENGTH) { + return redacted; + } + return `${redacted.slice(0, FEISHU_WS_LOG_ERROR_MAX_LENGTH)}...`; +} + +function cleanupFeishuWsClient(params: { + accountId: string; + wsClient?: Lark.WSClient; + error: (message: string) => void; +}): void { + const { accountId, wsClient, error } = params; + if (wsClient) { + try { + wsClient.close(); + } catch (err) { + error( + `feishu[${accountId}]: error closing WebSocket client: ${formatFeishuWsErrorForLog(err)}`, + ); + } + } + wsClients.delete(accountId); + botOpenIds.delete(accountId); + botNames.delete(accountId); +} + +function waitForFeishuWsAbort(abortSignal?: AbortSignal): Promise { + if (abortSignal?.aborted) { + return Promise.resolve(); + } + return new Promise((resolve) => { + if (!abortSignal) { + // No external lifecycle owner was provided, so keep the SDK-managed connection alive. + return; + } + const handleAbort = () => { + abortSignal.removeEventListener("abort", handleAbort); + resolve(); + }; + abortSignal.addEventListener("abort", handleAbort, { once: true }); + if (abortSignal.aborted) { + handleAbort(); + } + }); +} + export async function monitorWebSocket({ account, accountId, @@ -91,53 +169,46 @@ export async function monitorWebSocket({ }: MonitorTransportParams): Promise { const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; - log(`feishu[${accountId}]: starting WebSocket connection...`); - - const wsClient = await createFeishuWSClient(account); - wsClients.set(accountId, wsClient); - - return new Promise((resolve, reject) => { - let cleanedUp = false; - - const cleanup = () => { - if (cleanedUp) { - return; - } - cleanedUp = true; - abortSignal?.removeEventListener("abort", handleAbort); - try { - wsClient.close(); - } catch (err) { - error(`feishu[${accountId}]: error closing WebSocket client: ${String(err)}`); - } finally { - wsClients.delete(accountId); - botOpenIds.delete(accountId); - botNames.delete(accountId); - } - }; - - function handleAbort() { - log(`feishu[${accountId}]: abort signal received, stopping`); - cleanup(); - resolve(); - } + let attempt = 0; + while (true) { if (abortSignal?.aborted) { - cleanup(); - resolve(); - return; + break; } - abortSignal?.addEventListener("abort", handleAbort, { once: true }); - + let wsClient: Lark.WSClient | undefined; try { - void wsClient.start({ eventDispatcher }); + log(`feishu[${accountId}]: starting WebSocket connection...`); + wsClient = await createFeishuWSClient(account); + if (abortSignal?.aborted) { + cleanupFeishuWsClient({ accountId, wsClient, error }); + break; + } + wsClients.set(accountId, wsClient); + await wsClient.start({ eventDispatcher }); + attempt = 0; log(`feishu[${accountId}]: WebSocket client started`); + await waitForFeishuWsAbort(abortSignal); + log(`feishu[${accountId}]: abort signal received, stopping`); + cleanupFeishuWsClient({ accountId, wsClient, error }); + return; } catch (err) { - cleanup(); - reject(err); + cleanupFeishuWsClient({ accountId, wsClient, error }); + if (abortSignal?.aborted) { + break; + } + + attempt += 1; + const delayMs = getFeishuWsReconnectDelayMs(attempt); + error( + `feishu[${accountId}]: WebSocket start failed, retrying in ${delayMs}ms: ${formatFeishuWsErrorForLog(err)}`, + ); + const shouldRetry = await waitForAbortableDelay(delayMs, abortSignal); + if (!shouldRetry) { + break; + } } - }); + } } export async function monitorWebhook({