diff --git a/CHANGELOG.md b/CHANGELOG.md index d0ec3c18a0d..5ad081f8775 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -476,6 +476,7 @@ Docs: https://docs.openclaw.ai - Installer/Linux: warn before switching an unwritable npm global prefix to `~/.npm-global`, then tell users to run future global updates with `npm i -g openclaw@latest` without `sudo` so npm keeps using the redirected user prefix. Fixes #44365; carries forward #50479. Thanks @Sayeem3051. - Gateway/plugins: enable the native `require()` fast path on Windows for bundled plugin modules so plugin loading uses `require()` instead of Jiti's transform pipeline, reducing startup from ~39s to ~2s on typical 6-plugin setups. Fixes #68656. (#74173) Thanks @galiniliev. - macOS app: detect stale Gateway TLS certificate pins, automatically repair trusted Tailscale Serve rotations, and surface paired-but-disconnected Mac companion nodes so partial Gateway connections no longer look healthy. Thanks @guti. +- Feishu: recreate WebSocket clients with monitor-owned backoff only after SDK reconnect exhaustion, preserving heartbeat defaults and shutdown cleanup without treating recoverable SDK callback errors as terminal, so persistent connections recover without manual gateway restart. Fixes #52618; duplicate evidence #59753; related #55532, #68766, #72411, and #73739. Thanks @vincentkoc, @schumilin, @alex-xuweilong, @120106835, @sirfengyu, and @tianhaocui. ## 2026.4.27 diff --git a/extensions/feishu/src/client.test.ts b/extensions/feishu/src/client.test.ts index 3d2eccc7ff8..cd33482f4c6 100644 --- a/extensions/feishu/src/client.test.ts +++ b/extensions/feishu/src/client.test.ts @@ -119,9 +119,23 @@ function readCallOptions( return isRecord(call) ? call : {}; } -function firstWsClientOptions(): { agent?: unknown; wsConfig?: unknown } { +function firstWsClientOptions(): { + agent?: unknown; + wsConfig?: unknown; + onError?: unknown; + onReady?: unknown; + onReconnected?: unknown; + onReconnecting?: unknown; +} { const options = readCallOptions(wsClientCtorMock, 0); - return { agent: options.agent, wsConfig: options.wsConfig }; + return { + agent: options.agent, + wsConfig: options.wsConfig, + onError: options.onError, + onReady: options.onReady, + onReconnected: options.onReconnected, + onReconnecting: options.onReconnecting, + }; } beforeAll(async () => { @@ -355,6 +369,30 @@ describe("createFeishuWSClient proxy handling", () => { }); }); + it("passes lifecycle callbacks while preserving heartbeat wsConfig defaults", async () => { + const onError = vi.fn(); + const onReady = vi.fn(); + const onReconnected = vi.fn(); + const onReconnecting = vi.fn(); + + await createFeishuWSClient(baseAccount, { + onError, + onReady, + onReconnected, + onReconnecting, + }); + + const options = firstWsClientOptions(); + expect(options.onError).toBe(onError); + expect(options.onReady).toBe(onReady); + expect(options.onReconnected).toBe(onReconnected); + expect(options.onReconnecting).toBe(onReconnecting); + expect(options.wsConfig).toEqual({ + PingInterval: 30, + PingTimeout: 3, + }); + }); + it("does not set a ws proxy agent when proxy env is absent", async () => { await createFeishuWSClient(baseAccount); diff --git a/extensions/feishu/src/client.ts b/extensions/feishu/src/client.ts index 7622dc3630b..d83424be8c4 100644 --- a/extensions/feishu/src/client.ts +++ b/extensions/feishu/src/client.ts @@ -220,11 +220,19 @@ export function createFeishuClient(creds: FeishuClientCredentials): Lark.Client return client; } +export type FeishuWsClientCallbacks = Pick< + ConstructorParameters[0], + "onError" | "onReady" | "onReconnected" | "onReconnecting" +>; + /** * Create a Feishu WebSocket client for an account. * Note: WSClient is not cached since each call creates a new connection. */ -export async function createFeishuWSClient(account: ResolvedFeishuAccount): Promise { +export async function createFeishuWSClient( + account: ResolvedFeishuAccount, + callbacks: FeishuWsClientCallbacks = {}, +): Promise { const { accountId, appId, appSecret, domain } = account; if (!appId || !appSecret) { @@ -236,6 +244,7 @@ export async function createFeishuWSClient(account: ResolvedFeishuAccount): Prom appId, appSecret, domain: resolveDomain(domain), + ...callbacks, loggerLevel: feishuClientSdk.LoggerLevel.info, wsConfig: FEISHU_WS_CONFIG, ...(agent ? { agent } : {}), diff --git a/extensions/feishu/src/monitor.cleanup.test.ts b/extensions/feishu/src/monitor.cleanup.test.ts index b840546d04c..fd81fd58cc7 100644 --- a/extensions/feishu/src/monitor.cleanup.test.ts +++ b/extensions/feishu/src/monitor.cleanup.test.ts @@ -136,6 +136,165 @@ describe("feishu websocket cleanup", () => { expect(errorMessage).toContain("appSecret=[redacted]"); }); + it("recreates the websocket client after sdk reconnect exhaustion", async () => { + vi.useFakeTimers(); + const exhaustedClient = createWsClient(); + const recoveredClient = createWsClient(); + createFeishuWSClientMock + .mockResolvedValueOnce(exhaustedClient) + .mockResolvedValueOnce(recoveredClient); + + const abortController = new AbortController(); + const runtime = { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }; + const accountId = "exhausted"; + botOpenIds.set(accountId, "ou_exhausted"); + botNames.set(accountId, "Exhausted"); + + const monitorPromise = monitorWebSocket({ + account: createAccount(accountId), + accountId, + runtime, + abortSignal: abortController.signal, + eventDispatcher: {} as never, + }); + + await vi.waitFor(() => { + expect(exhaustedClient.start).toHaveBeenCalledTimes(1); + expect(wsClients.get(accountId)).toBe(exhaustedClient); + }); + + const callbacks = createFeishuWSClientMock.mock.calls[0]?.[1] as + | { onError?: (err: Error) => void } + | undefined; + callbacks?.onError?.( + new Error("WebSocket reconnect exhausted after 3 attempts\nBearer token_abc"), + ); + + await vi.waitFor(() => { + expect(exhaustedClient.close).toHaveBeenCalledTimes(1); + expect(wsClients.has(accountId)).toBe(false); + }); + expect(botOpenIds.get(accountId)).toBe("ou_exhausted"); + expect(botNames.get(accountId)).toBe("Exhausted"); + + await vi.advanceTimersByTimeAsync(1_000); + + await vi.waitFor(() => { + expect(recoveredClient.start).toHaveBeenCalledTimes(1); + expect(wsClients.get(accountId)).toBe(recoveredClient); + }); + + abortController.abort(); + await monitorPromise; + + expect(createFeishuWSClientMock).toHaveBeenCalledTimes(2); + expect(recoveredClient.close).toHaveBeenCalledTimes(1); + expect(botOpenIds.has(accountId)).toBe(false); + expect(botNames.has(accountId)).toBe(false); + const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? ""); + expect(errorMessage).toContain("WebSocket connection ended, recreating client in 1000ms"); + expect(errorMessage).toContain("Bearer [redacted]"); + expect(errorMessage).not.toContain("\n"); + expect(errorMessage).not.toContain("token_abc"); + }); + + it("keeps the websocket client alive after recoverable sdk callback errors", async () => { + vi.useFakeTimers(); + const wsClient = createWsClient(); + createFeishuWSClientMock.mockResolvedValueOnce(wsClient); + + const abortController = new AbortController(); + const runtime = { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }; + const accountId = "recoverable-callback"; + + const monitorPromise = monitorWebSocket({ + account: createAccount(accountId), + accountId, + runtime, + abortSignal: abortController.signal, + eventDispatcher: {} as never, + }); + + await vi.waitFor(() => { + expect(wsClient.start).toHaveBeenCalledTimes(1); + expect(wsClients.get(accountId)).toBe(wsClient); + }); + + const callbacks = createFeishuWSClientMock.mock.calls[0]?.[1] as + | { onError?: (err: Error) => void } + | undefined; + callbacks?.onError?.(new Error("temporary callback failure\nBearer token_abc")); + + await vi.advanceTimersByTimeAsync(1_000); + + expect(createFeishuWSClientMock).toHaveBeenCalledTimes(1); + expect(wsClient.close).not.toHaveBeenCalled(); + expect(wsClients.get(accountId)).toBe(wsClient); + const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? ""); + expect(errorMessage).toContain("WebSocket SDK reported recoverable error"); + expect(errorMessage).toContain("Bearer [redacted]"); + expect(errorMessage).not.toContain("\n"); + expect(errorMessage).not.toContain("token_abc"); + + abortController.abort(); + await monitorPromise; + + expect(createFeishuWSClientMock).toHaveBeenCalledTimes(1); + expect(wsClient.close).toHaveBeenCalledTimes(1); + }); + + it("clears identity without recreating a websocket when aborted during reconnect backoff", async () => { + vi.useFakeTimers(); + const exhaustedClient = createWsClient(); + createFeishuWSClientMock.mockResolvedValueOnce(exhaustedClient); + + const abortController = new AbortController(); + const accountId = "abort-backoff"; + botOpenIds.set(accountId, "ou_abort"); + botNames.set(accountId, "Abort"); + + const monitorPromise = monitorWebSocket({ + account: createAccount(accountId), + accountId, + runtime: { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }, + abortSignal: abortController.signal, + eventDispatcher: {} as never, + }); + + await vi.waitFor(() => { + expect(exhaustedClient.start).toHaveBeenCalledTimes(1); + }); + + const callbacks = createFeishuWSClientMock.mock.calls[0]?.[1] as + | { onError?: (err: Error) => void } + | undefined; + callbacks?.onError?.(new Error("WebSocket reconnect exhausted after 3 attempts")); + + await vi.waitFor(() => { + expect(exhaustedClient.close).toHaveBeenCalledTimes(1); + }); + + abortController.abort(); + await monitorPromise; + + expect(createFeishuWSClientMock).toHaveBeenCalledTimes(1); + expect(wsClients.has(accountId)).toBe(false); + expect(botOpenIds.has(accountId)).toBe(false); + expect(botNames.has(accountId)).toBe(false); + }); + it("redacts websocket close errors during abort cleanup", async () => { const wsClient = createWsClient(); wsClient.close.mockImplementationOnce(() => { diff --git a/extensions/feishu/src/monitor.transport.ts b/extensions/feishu/src/monitor.transport.ts index 90f3c1e4d82..1d158d8a0e3 100644 --- a/extensions/feishu/src/monitor.transport.ts +++ b/extensions/feishu/src/monitor.transport.ts @@ -33,6 +33,9 @@ export type MonitorTransportParams = { const FEISHU_WS_RECONNECT_INITIAL_DELAY_MS = 1_000; const FEISHU_WS_RECONNECT_MAX_DELAY_MS = 30_000; const FEISHU_WS_LOG_ERROR_MAX_LENGTH = 500; +const FEISHU_WS_RECONNECT_EXHAUSTED_RE = /^WebSocket reconnect exhausted after \d+ attempts?/; +const FEISHU_WS_AUTORECONNECT_DISABLED_ERROR = + "WebSocket connect failed and autoReconnect is disabled"; function isFeishuWebhookPayload(value: unknown): value is Record { return !!value && typeof value === "object" && !Array.isArray(value); @@ -120,12 +123,21 @@ function formatFeishuWsErrorForLog(err: unknown): string { return `${redacted.slice(0, FEISHU_WS_LOG_ERROR_MAX_LENGTH)}...`; } +function isFeishuWsTerminalError(err: Error): boolean { + const message = err.message.trim(); + return ( + FEISHU_WS_RECONNECT_EXHAUSTED_RE.test(message) || + message.startsWith(FEISHU_WS_AUTORECONNECT_DISABLED_ERROR) + ); +} + function cleanupFeishuWsClient(params: { accountId: string; wsClient?: Lark.WSClient; error: (message: string) => void; + clearIdentity: boolean; }): void { - const { accountId, wsClient, error } = params; + const { accountId, wsClient, error, clearIdentity } = params; if (wsClient) { try { wsClient.close(); @@ -136,27 +148,43 @@ function cleanupFeishuWsClient(params: { } } wsClients.delete(accountId); - botOpenIds.delete(accountId); - botNames.delete(accountId); + if (clearIdentity) { + botOpenIds.delete(accountId); + botNames.delete(accountId); + } } -function waitForFeishuWsAbort(abortSignal?: AbortSignal): Promise { - if (abortSignal?.aborted) { - return Promise.resolve(); +function waitForFeishuWsCycleEnd(params: { + abortSignal?: AbortSignal; + terminalError: Promise; +}): Promise<"abort" | Error> { + if (params.abortSignal?.aborted) { + return Promise.resolve("abort"); } + return new Promise((resolve) => { - if (!abortSignal) { - // No external lifecycle owner was provided, so keep the SDK-managed connection alive. + let settled = false; + let handleAbort: (() => void) | undefined; + + const finish = (result: "abort" | Error) => { + if (settled) { + return; + } + settled = true; + if (handleAbort) { + params.abortSignal?.removeEventListener("abort", handleAbort); + } + resolve(result); + }; + + handleAbort = () => finish("abort"); + params.abortSignal?.addEventListener("abort", handleAbort, { once: true }); + if (params.abortSignal?.aborted) { + finish("abort"); return; } - const handleAbort = () => { - abortSignal.removeEventListener("abort", handleAbort); - resolve(); - }; - abortSignal.addEventListener("abort", handleAbort, { once: true }); - if (abortSignal.aborted) { - handleAbort(); - } + + void params.terminalError.then(finish); }); } @@ -178,22 +206,55 @@ export async function monitorWebSocket({ let wsClient: Lark.WSClient | undefined; try { + let reportTerminalError: (err: Error) => void = () => {}; + const terminalError = new Promise((resolve) => { + reportTerminalError = resolve; + }); + const handleWsError = (err: Error) => { + if (isFeishuWsTerminalError(err)) { + reportTerminalError(err); + return; + } + + error( + `feishu[${accountId}]: WebSocket SDK reported recoverable error: ${formatFeishuWsErrorForLog(err)}`, + ); + }; log(`feishu[${accountId}]: starting WebSocket connection...`); - wsClient = await createFeishuWSClient(account); + wsClient = await createFeishuWSClient(account, { + onError: handleWsError, + }); if (abortSignal?.aborted) { - cleanupFeishuWsClient({ accountId, wsClient, error }); + cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: true }); break; } wsClients.set(accountId, wsClient); await wsClient.start({ eventDispatcher }); attempt = 0; log(`feishu[${accountId}]: WebSocket client started`); - await waitForFeishuWsAbort(abortSignal); - log(`feishu[${accountId}]: abort signal received, stopping`); - cleanupFeishuWsClient({ accountId, wsClient, error }); - return; + const cycleEnd = await waitForFeishuWsCycleEnd({ abortSignal, terminalError }); + if (cycleEnd === "abort") { + log(`feishu[${accountId}]: abort signal received, stopping`); + cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: true }); + return; + } + + cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: false }); + if (abortSignal?.aborted) { + break; + } + + attempt += 1; + const delayMs = getFeishuWsReconnectDelayMs(attempt); + error( + `feishu[${accountId}]: WebSocket connection ended, recreating client in ${delayMs}ms: ${formatFeishuWsErrorForLog(cycleEnd)}`, + ); + const shouldRetry = await waitForAbortableDelay(delayMs, abortSignal); + if (!shouldRetry) { + break; + } } catch (err) { - cleanupFeishuWsClient({ accountId, wsClient, error }); + cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: false }); if (abortSignal?.aborted) { break; } @@ -209,6 +270,7 @@ export async function monitorWebSocket({ } } } + cleanupFeishuWsClient({ accountId, wsClient: undefined, error, clearIdentity: true }); } export async function monitorWebhook({