mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:00:43 +00:00
fix(feishu): recover WebSocket after SDK retry exhaustion (#73739)
* fix(feishu): recover WebSocket after SDK retry exhaustion * fix(feishu): recover WebSocket after SDK retry exhaustion --------- Co-authored-by: openclaw-clownfish[bot] <280122609+openclaw-clownfish[bot]@users.noreply.github.com>
This commit is contained in:
@@ -476,6 +476,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Installer/Linux: warn before switching an unwritable npm global prefix to `~/.npm-global`, then tell users to run future global updates with `npm i -g openclaw@latest` without `sudo` so npm keeps using the redirected user prefix. Fixes #44365; carries forward #50479. Thanks @Sayeem3051.
|
||||
- Gateway/plugins: enable the native `require()` fast path on Windows for bundled plugin modules so plugin loading uses `require()` instead of Jiti's transform pipeline, reducing startup from ~39s to ~2s on typical 6-plugin setups. Fixes #68656. (#74173) Thanks @galiniliev.
|
||||
- macOS app: detect stale Gateway TLS certificate pins, automatically repair trusted Tailscale Serve rotations, and surface paired-but-disconnected Mac companion nodes so partial Gateway connections no longer look healthy. Thanks @guti.
|
||||
- Feishu: recreate WebSocket clients with monitor-owned backoff only after SDK reconnect exhaustion, preserving heartbeat defaults and shutdown cleanup without treating recoverable SDK callback errors as terminal, so persistent connections recover without manual gateway restart. Fixes #52618; duplicate evidence #59753; related #55532, #68766, #72411, and #73739. Thanks @vincentkoc, @schumilin, @alex-xuweilong, @120106835, @sirfengyu, and @tianhaocui.
|
||||
|
||||
## 2026.4.27
|
||||
|
||||
|
||||
@@ -119,9 +119,23 @@ function readCallOptions(
|
||||
return isRecord(call) ? call : {};
|
||||
}
|
||||
|
||||
function firstWsClientOptions(): { agent?: unknown; wsConfig?: unknown } {
|
||||
function firstWsClientOptions(): {
|
||||
agent?: unknown;
|
||||
wsConfig?: unknown;
|
||||
onError?: unknown;
|
||||
onReady?: unknown;
|
||||
onReconnected?: unknown;
|
||||
onReconnecting?: unknown;
|
||||
} {
|
||||
const options = readCallOptions(wsClientCtorMock, 0);
|
||||
return { agent: options.agent, wsConfig: options.wsConfig };
|
||||
return {
|
||||
agent: options.agent,
|
||||
wsConfig: options.wsConfig,
|
||||
onError: options.onError,
|
||||
onReady: options.onReady,
|
||||
onReconnected: options.onReconnected,
|
||||
onReconnecting: options.onReconnecting,
|
||||
};
|
||||
}
|
||||
|
||||
beforeAll(async () => {
|
||||
@@ -355,6 +369,30 @@ describe("createFeishuWSClient proxy handling", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("passes lifecycle callbacks while preserving heartbeat wsConfig defaults", async () => {
|
||||
const onError = vi.fn();
|
||||
const onReady = vi.fn();
|
||||
const onReconnected = vi.fn();
|
||||
const onReconnecting = vi.fn();
|
||||
|
||||
await createFeishuWSClient(baseAccount, {
|
||||
onError,
|
||||
onReady,
|
||||
onReconnected,
|
||||
onReconnecting,
|
||||
});
|
||||
|
||||
const options = firstWsClientOptions();
|
||||
expect(options.onError).toBe(onError);
|
||||
expect(options.onReady).toBe(onReady);
|
||||
expect(options.onReconnected).toBe(onReconnected);
|
||||
expect(options.onReconnecting).toBe(onReconnecting);
|
||||
expect(options.wsConfig).toEqual({
|
||||
PingInterval: 30,
|
||||
PingTimeout: 3,
|
||||
});
|
||||
});
|
||||
|
||||
it("does not set a ws proxy agent when proxy env is absent", async () => {
|
||||
await createFeishuWSClient(baseAccount);
|
||||
|
||||
|
||||
@@ -220,11 +220,19 @@ export function createFeishuClient(creds: FeishuClientCredentials): Lark.Client
|
||||
return client;
|
||||
}
|
||||
|
||||
export type FeishuWsClientCallbacks = Pick<
|
||||
ConstructorParameters<typeof feishuClientSdk.WSClient>[0],
|
||||
"onError" | "onReady" | "onReconnected" | "onReconnecting"
|
||||
>;
|
||||
|
||||
/**
|
||||
* Create a Feishu WebSocket client for an account.
|
||||
* Note: WSClient is not cached since each call creates a new connection.
|
||||
*/
|
||||
export async function createFeishuWSClient(account: ResolvedFeishuAccount): Promise<Lark.WSClient> {
|
||||
export async function createFeishuWSClient(
|
||||
account: ResolvedFeishuAccount,
|
||||
callbacks: FeishuWsClientCallbacks = {},
|
||||
): Promise<Lark.WSClient> {
|
||||
const { accountId, appId, appSecret, domain } = account;
|
||||
|
||||
if (!appId || !appSecret) {
|
||||
@@ -236,6 +244,7 @@ export async function createFeishuWSClient(account: ResolvedFeishuAccount): Prom
|
||||
appId,
|
||||
appSecret,
|
||||
domain: resolveDomain(domain),
|
||||
...callbacks,
|
||||
loggerLevel: feishuClientSdk.LoggerLevel.info,
|
||||
wsConfig: FEISHU_WS_CONFIG,
|
||||
...(agent ? { agent } : {}),
|
||||
|
||||
@@ -136,6 +136,165 @@ describe("feishu websocket cleanup", () => {
|
||||
expect(errorMessage).toContain("appSecret=[redacted]");
|
||||
});
|
||||
|
||||
it("recreates the websocket client after sdk reconnect exhaustion", async () => {
|
||||
vi.useFakeTimers();
|
||||
const exhaustedClient = createWsClient();
|
||||
const recoveredClient = createWsClient();
|
||||
createFeishuWSClientMock
|
||||
.mockResolvedValueOnce(exhaustedClient)
|
||||
.mockResolvedValueOnce(recoveredClient);
|
||||
|
||||
const abortController = new AbortController();
|
||||
const runtime = {
|
||||
log: vi.fn(),
|
||||
error: vi.fn(),
|
||||
exit: vi.fn(),
|
||||
};
|
||||
const accountId = "exhausted";
|
||||
botOpenIds.set(accountId, "ou_exhausted");
|
||||
botNames.set(accountId, "Exhausted");
|
||||
|
||||
const monitorPromise = monitorWebSocket({
|
||||
account: createAccount(accountId),
|
||||
accountId,
|
||||
runtime,
|
||||
abortSignal: abortController.signal,
|
||||
eventDispatcher: {} as never,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(exhaustedClient.start).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.get(accountId)).toBe(exhaustedClient);
|
||||
});
|
||||
|
||||
const callbacks = createFeishuWSClientMock.mock.calls[0]?.[1] as
|
||||
| { onError?: (err: Error) => void }
|
||||
| undefined;
|
||||
callbacks?.onError?.(
|
||||
new Error("WebSocket reconnect exhausted after 3 attempts\nBearer token_abc"),
|
||||
);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(exhaustedClient.close).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.has(accountId)).toBe(false);
|
||||
});
|
||||
expect(botOpenIds.get(accountId)).toBe("ou_exhausted");
|
||||
expect(botNames.get(accountId)).toBe("Exhausted");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(recoveredClient.start).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.get(accountId)).toBe(recoveredClient);
|
||||
});
|
||||
|
||||
abortController.abort();
|
||||
await monitorPromise;
|
||||
|
||||
expect(createFeishuWSClientMock).toHaveBeenCalledTimes(2);
|
||||
expect(recoveredClient.close).toHaveBeenCalledTimes(1);
|
||||
expect(botOpenIds.has(accountId)).toBe(false);
|
||||
expect(botNames.has(accountId)).toBe(false);
|
||||
const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? "");
|
||||
expect(errorMessage).toContain("WebSocket connection ended, recreating client in 1000ms");
|
||||
expect(errorMessage).toContain("Bearer [redacted]");
|
||||
expect(errorMessage).not.toContain("\n");
|
||||
expect(errorMessage).not.toContain("token_abc");
|
||||
});
|
||||
|
||||
it("keeps the websocket client alive after recoverable sdk callback errors", async () => {
|
||||
vi.useFakeTimers();
|
||||
const wsClient = createWsClient();
|
||||
createFeishuWSClientMock.mockResolvedValueOnce(wsClient);
|
||||
|
||||
const abortController = new AbortController();
|
||||
const runtime = {
|
||||
log: vi.fn(),
|
||||
error: vi.fn(),
|
||||
exit: vi.fn(),
|
||||
};
|
||||
const accountId = "recoverable-callback";
|
||||
|
||||
const monitorPromise = monitorWebSocket({
|
||||
account: createAccount(accountId),
|
||||
accountId,
|
||||
runtime,
|
||||
abortSignal: abortController.signal,
|
||||
eventDispatcher: {} as never,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(wsClient.start).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.get(accountId)).toBe(wsClient);
|
||||
});
|
||||
|
||||
const callbacks = createFeishuWSClientMock.mock.calls[0]?.[1] as
|
||||
| { onError?: (err: Error) => void }
|
||||
| undefined;
|
||||
callbacks?.onError?.(new Error("temporary callback failure\nBearer token_abc"));
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
|
||||
expect(createFeishuWSClientMock).toHaveBeenCalledTimes(1);
|
||||
expect(wsClient.close).not.toHaveBeenCalled();
|
||||
expect(wsClients.get(accountId)).toBe(wsClient);
|
||||
const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? "");
|
||||
expect(errorMessage).toContain("WebSocket SDK reported recoverable error");
|
||||
expect(errorMessage).toContain("Bearer [redacted]");
|
||||
expect(errorMessage).not.toContain("\n");
|
||||
expect(errorMessage).not.toContain("token_abc");
|
||||
|
||||
abortController.abort();
|
||||
await monitorPromise;
|
||||
|
||||
expect(createFeishuWSClientMock).toHaveBeenCalledTimes(1);
|
||||
expect(wsClient.close).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("clears identity without recreating a websocket when aborted during reconnect backoff", async () => {
|
||||
vi.useFakeTimers();
|
||||
const exhaustedClient = createWsClient();
|
||||
createFeishuWSClientMock.mockResolvedValueOnce(exhaustedClient);
|
||||
|
||||
const abortController = new AbortController();
|
||||
const accountId = "abort-backoff";
|
||||
botOpenIds.set(accountId, "ou_abort");
|
||||
botNames.set(accountId, "Abort");
|
||||
|
||||
const monitorPromise = monitorWebSocket({
|
||||
account: createAccount(accountId),
|
||||
accountId,
|
||||
runtime: {
|
||||
log: vi.fn(),
|
||||
error: vi.fn(),
|
||||
exit: vi.fn(),
|
||||
},
|
||||
abortSignal: abortController.signal,
|
||||
eventDispatcher: {} as never,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(exhaustedClient.start).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
const callbacks = createFeishuWSClientMock.mock.calls[0]?.[1] as
|
||||
| { onError?: (err: Error) => void }
|
||||
| undefined;
|
||||
callbacks?.onError?.(new Error("WebSocket reconnect exhausted after 3 attempts"));
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(exhaustedClient.close).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
abortController.abort();
|
||||
await monitorPromise;
|
||||
|
||||
expect(createFeishuWSClientMock).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.has(accountId)).toBe(false);
|
||||
expect(botOpenIds.has(accountId)).toBe(false);
|
||||
expect(botNames.has(accountId)).toBe(false);
|
||||
});
|
||||
|
||||
it("redacts websocket close errors during abort cleanup", async () => {
|
||||
const wsClient = createWsClient();
|
||||
wsClient.close.mockImplementationOnce(() => {
|
||||
|
||||
@@ -33,6 +33,9 @@ export type MonitorTransportParams = {
|
||||
const FEISHU_WS_RECONNECT_INITIAL_DELAY_MS = 1_000;
|
||||
const FEISHU_WS_RECONNECT_MAX_DELAY_MS = 30_000;
|
||||
const FEISHU_WS_LOG_ERROR_MAX_LENGTH = 500;
|
||||
const FEISHU_WS_RECONNECT_EXHAUSTED_RE = /^WebSocket reconnect exhausted after \d+ attempts?/;
|
||||
const FEISHU_WS_AUTORECONNECT_DISABLED_ERROR =
|
||||
"WebSocket connect failed and autoReconnect is disabled";
|
||||
|
||||
function isFeishuWebhookPayload(value: unknown): value is Record<string, unknown> {
|
||||
return !!value && typeof value === "object" && !Array.isArray(value);
|
||||
@@ -120,12 +123,21 @@ function formatFeishuWsErrorForLog(err: unknown): string {
|
||||
return `${redacted.slice(0, FEISHU_WS_LOG_ERROR_MAX_LENGTH)}...`;
|
||||
}
|
||||
|
||||
function isFeishuWsTerminalError(err: Error): boolean {
|
||||
const message = err.message.trim();
|
||||
return (
|
||||
FEISHU_WS_RECONNECT_EXHAUSTED_RE.test(message) ||
|
||||
message.startsWith(FEISHU_WS_AUTORECONNECT_DISABLED_ERROR)
|
||||
);
|
||||
}
|
||||
|
||||
function cleanupFeishuWsClient(params: {
|
||||
accountId: string;
|
||||
wsClient?: Lark.WSClient;
|
||||
error: (message: string) => void;
|
||||
clearIdentity: boolean;
|
||||
}): void {
|
||||
const { accountId, wsClient, error } = params;
|
||||
const { accountId, wsClient, error, clearIdentity } = params;
|
||||
if (wsClient) {
|
||||
try {
|
||||
wsClient.close();
|
||||
@@ -136,27 +148,43 @@ function cleanupFeishuWsClient(params: {
|
||||
}
|
||||
}
|
||||
wsClients.delete(accountId);
|
||||
botOpenIds.delete(accountId);
|
||||
botNames.delete(accountId);
|
||||
if (clearIdentity) {
|
||||
botOpenIds.delete(accountId);
|
||||
botNames.delete(accountId);
|
||||
}
|
||||
}
|
||||
|
||||
function waitForFeishuWsAbort(abortSignal?: AbortSignal): Promise<void> {
|
||||
if (abortSignal?.aborted) {
|
||||
return Promise.resolve();
|
||||
function waitForFeishuWsCycleEnd(params: {
|
||||
abortSignal?: AbortSignal;
|
||||
terminalError: Promise<Error>;
|
||||
}): Promise<"abort" | Error> {
|
||||
if (params.abortSignal?.aborted) {
|
||||
return Promise.resolve("abort");
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
if (!abortSignal) {
|
||||
// No external lifecycle owner was provided, so keep the SDK-managed connection alive.
|
||||
let settled = false;
|
||||
let handleAbort: (() => void) | undefined;
|
||||
|
||||
const finish = (result: "abort" | Error) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
if (handleAbort) {
|
||||
params.abortSignal?.removeEventListener("abort", handleAbort);
|
||||
}
|
||||
resolve(result);
|
||||
};
|
||||
|
||||
handleAbort = () => finish("abort");
|
||||
params.abortSignal?.addEventListener("abort", handleAbort, { once: true });
|
||||
if (params.abortSignal?.aborted) {
|
||||
finish("abort");
|
||||
return;
|
||||
}
|
||||
const handleAbort = () => {
|
||||
abortSignal.removeEventListener("abort", handleAbort);
|
||||
resolve();
|
||||
};
|
||||
abortSignal.addEventListener("abort", handleAbort, { once: true });
|
||||
if (abortSignal.aborted) {
|
||||
handleAbort();
|
||||
}
|
||||
|
||||
void params.terminalError.then(finish);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -178,22 +206,55 @@ export async function monitorWebSocket({
|
||||
|
||||
let wsClient: Lark.WSClient | undefined;
|
||||
try {
|
||||
let reportTerminalError: (err: Error) => void = () => {};
|
||||
const terminalError = new Promise<Error>((resolve) => {
|
||||
reportTerminalError = resolve;
|
||||
});
|
||||
const handleWsError = (err: Error) => {
|
||||
if (isFeishuWsTerminalError(err)) {
|
||||
reportTerminalError(err);
|
||||
return;
|
||||
}
|
||||
|
||||
error(
|
||||
`feishu[${accountId}]: WebSocket SDK reported recoverable error: ${formatFeishuWsErrorForLog(err)}`,
|
||||
);
|
||||
};
|
||||
log(`feishu[${accountId}]: starting WebSocket connection...`);
|
||||
wsClient = await createFeishuWSClient(account);
|
||||
wsClient = await createFeishuWSClient(account, {
|
||||
onError: handleWsError,
|
||||
});
|
||||
if (abortSignal?.aborted) {
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error });
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: true });
|
||||
break;
|
||||
}
|
||||
wsClients.set(accountId, wsClient);
|
||||
await wsClient.start({ eventDispatcher });
|
||||
attempt = 0;
|
||||
log(`feishu[${accountId}]: WebSocket client started`);
|
||||
await waitForFeishuWsAbort(abortSignal);
|
||||
log(`feishu[${accountId}]: abort signal received, stopping`);
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error });
|
||||
return;
|
||||
const cycleEnd = await waitForFeishuWsCycleEnd({ abortSignal, terminalError });
|
||||
if (cycleEnd === "abort") {
|
||||
log(`feishu[${accountId}]: abort signal received, stopping`);
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: true });
|
||||
return;
|
||||
}
|
||||
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: false });
|
||||
if (abortSignal?.aborted) {
|
||||
break;
|
||||
}
|
||||
|
||||
attempt += 1;
|
||||
const delayMs = getFeishuWsReconnectDelayMs(attempt);
|
||||
error(
|
||||
`feishu[${accountId}]: WebSocket connection ended, recreating client in ${delayMs}ms: ${formatFeishuWsErrorForLog(cycleEnd)}`,
|
||||
);
|
||||
const shouldRetry = await waitForAbortableDelay(delayMs, abortSignal);
|
||||
if (!shouldRetry) {
|
||||
break;
|
||||
}
|
||||
} catch (err) {
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error });
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: false });
|
||||
if (abortSignal?.aborted) {
|
||||
break;
|
||||
}
|
||||
@@ -209,6 +270,7 @@ export async function monitorWebSocket({
|
||||
}
|
||||
}
|
||||
}
|
||||
cleanupFeishuWsClient({ accountId, wsClient: undefined, error, clearIdentity: true });
|
||||
}
|
||||
|
||||
export async function monitorWebhook({
|
||||
|
||||
Reference in New Issue
Block a user