mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:00:43 +00:00
fix(feishu): repair WebSocket reconnect and heartbeat config (#72411)
This commit is contained in:
@@ -114,10 +114,8 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugins: share package entrypoint resolution between install and discovery, reject mismatched `runtimeExtensions`, and cache bundled runtime-dependency manifest reads during scans. Thanks @vincentkoc.
|
||||
- WhatsApp/Web: keep quiet but healthy linked-device sessions connected by basing the watchdog on WhatsApp Web transport activity, while retaining a longer app-silence cap so frame activity cannot mask a stuck session forever. Fixes #70678; carries forward the focused #71466 approach and keeps #63939 as related configurable-timeout follow-up. Thanks @vincentkoc and @oromeis.
|
||||
- Discord/gateway: count failed health-monitor restart attempts toward cooldown and hourly caps, and evict stale account lifecycle state during channel reloads so repeated Discord gateway recovery cannot loop on old status. Fixes #38596. (#40413) Thanks @jellyAI-dev and @vashquez.
|
||||
|
||||
### Fixes
|
||||
|
||||
- TTS/BlueBubbles: pre-transcode synthesized MP3 audio to opus-in-CAF (mono, 24 kHz — validated against macOS 15.x Messages.app's native voice-memo CAF descriptor) on macOS hosts before handing the file to BlueBubbles, so iMessage renders the result as a native voice-memo bubble with proper duration and waveform UI instead of a plain file attachment. Adds an opt-in `tts.voice.preferAudioFileFormat` channel capability and a magic-byte sniff for the CAF container so the host-local-media validator (which uses `file-type` and didn't recognize CAF natively) can verify the pre-transcoded buffer. Channels that don't opt in are unaffected. (#72586) Fixes #72506. Thanks @omarshahine.
|
||||
- Feishu: retry WebSocket startup failures with monitor-owned backoff while preserving SDK-local heartbeat defaults, so persistent-connection startup failures no longer leave the monitor hung. Fixes #68766; related #42354 and #55532. Thanks @alex-xuweilong, @120106835, @sirfengyu, and @tianhaocui.
|
||||
|
||||
## 2026.4.26
|
||||
|
||||
|
||||
@@ -6,6 +6,14 @@ afterEach(() => {
|
||||
});
|
||||
|
||||
describe("waitForAbortableDelay", () => {
|
||||
it("resolves false immediately when already aborted", async () => {
|
||||
vi.useFakeTimers();
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
await expect(waitForAbortableDelay(60_000, abortController.signal)).resolves.toBe(false);
|
||||
});
|
||||
|
||||
it("resolves false immediately when aborted during backoff", async () => {
|
||||
vi.useFakeTimers();
|
||||
const abortController = new AbortController();
|
||||
|
||||
@@ -70,17 +70,35 @@ export function waitForAbortableDelay(
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const handleAbort = () => {
|
||||
clearTimeout(timer);
|
||||
resolve(false);
|
||||
let settled = false;
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
let handleAbort: (() => void) | undefined;
|
||||
|
||||
const finish = (value: boolean) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
if (handleAbort) {
|
||||
abortSignal?.removeEventListener("abort", handleAbort);
|
||||
}
|
||||
resolve(value);
|
||||
};
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
abortSignal?.removeEventListener("abort", handleAbort);
|
||||
resolve(true);
|
||||
}, delayMs);
|
||||
timer.unref?.();
|
||||
handleAbort = () => {
|
||||
finish(false);
|
||||
};
|
||||
|
||||
abortSignal?.addEventListener("abort", handleAbort, { once: true });
|
||||
if (abortSignal?.aborted) {
|
||||
finish(false);
|
||||
return;
|
||||
}
|
||||
|
||||
timer = setTimeout(() => finish(true), delayMs);
|
||||
timer.unref?.();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -119,9 +119,9 @@ function readCallOptions(
|
||||
return isRecord(call) ? call : {};
|
||||
}
|
||||
|
||||
function firstWsClientOptions(): { agent?: unknown } {
|
||||
function firstWsClientOptions(): { agent?: unknown; wsConfig?: unknown } {
|
||||
const options = readCallOptions(wsClientCtorMock, 0);
|
||||
return { agent: options.agent };
|
||||
return { agent: options.agent, wsConfig: options.wsConfig };
|
||||
}
|
||||
|
||||
beforeAll(async () => {
|
||||
@@ -345,6 +345,16 @@ describe("createFeishuClient HTTP timeout", () => {
|
||||
});
|
||||
|
||||
describe("createFeishuWSClient proxy handling", () => {
|
||||
it("passes heartbeat wsConfig defaults to Lark.WSClient", async () => {
|
||||
await createFeishuWSClient(baseAccount);
|
||||
|
||||
const options = firstWsClientOptions();
|
||||
expect(options.wsConfig).toEqual({
|
||||
PingInterval: 30,
|
||||
PingTimeout: 3,
|
||||
});
|
||||
});
|
||||
|
||||
it("does not set a ws proxy agent when proxy env is absent", async () => {
|
||||
await createFeishuWSClient(baseAccount);
|
||||
|
||||
|
||||
@@ -15,6 +15,11 @@ export { pluginVersion };
|
||||
const FEISHU_USER_AGENT = `openclaw-feishu-builtin/${pluginVersion}/${process.platform}`;
|
||||
export { FEISHU_USER_AGENT };
|
||||
|
||||
const FEISHU_WS_CONFIG = {
|
||||
PingInterval: 30,
|
||||
PingTimeout: 3,
|
||||
} as const;
|
||||
|
||||
/** User-Agent header value for all Feishu API requests. */
|
||||
export function getFeishuUserAgent(): string {
|
||||
return FEISHU_USER_AGENT;
|
||||
@@ -232,7 +237,10 @@ export async function createFeishuWSClient(account: ResolvedFeishuAccount): Prom
|
||||
appSecret,
|
||||
domain: resolveDomain(domain),
|
||||
loggerLevel: feishuClientSdk.LoggerLevel.info,
|
||||
wsConfig: FEISHU_WS_CONFIG,
|
||||
...(agent ? { agent } : {}),
|
||||
} as ConstructorParameters<typeof feishuClientSdk.WSClient>[0] & {
|
||||
wsConfig: typeof FEISHU_WS_CONFIG;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ function createWsClient(): MockWsClient {
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
stopFeishuMonitorState();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
@@ -79,6 +80,98 @@ describe("feishu websocket cleanup", () => {
|
||||
expect(botNames.has(accountId)).toBe(false);
|
||||
});
|
||||
|
||||
it("retries with backoff after websocket start rejects", async () => {
|
||||
vi.useFakeTimers();
|
||||
const failedClient = createWsClient();
|
||||
failedClient.start.mockRejectedValueOnce(
|
||||
new Error("connect failed\nAuthorization: Bearer token_abc appSecret=secret_abc"),
|
||||
);
|
||||
const recoveredClient = createWsClient();
|
||||
createFeishuWSClientMock
|
||||
.mockResolvedValueOnce(failedClient)
|
||||
.mockResolvedValueOnce(recoveredClient);
|
||||
|
||||
const abortController = new AbortController();
|
||||
const runtime = {
|
||||
log: vi.fn(),
|
||||
error: vi.fn(),
|
||||
exit: vi.fn(),
|
||||
};
|
||||
const accountId = "retry";
|
||||
|
||||
const monitorPromise = monitorWebSocket({
|
||||
account: createAccount(accountId),
|
||||
accountId,
|
||||
runtime,
|
||||
abortSignal: abortController.signal,
|
||||
eventDispatcher: {} as never,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(failedClient.start).toHaveBeenCalledTimes(1);
|
||||
expect(failedClient.close).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.has(accountId)).toBe(false);
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(recoveredClient.start).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.get(accountId)).toBe(recoveredClient);
|
||||
});
|
||||
|
||||
abortController.abort();
|
||||
await monitorPromise;
|
||||
|
||||
expect(createFeishuWSClientMock).toHaveBeenCalledTimes(2);
|
||||
expect(recoveredClient.close).toHaveBeenCalledTimes(1);
|
||||
expect(runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("WebSocket start failed, retrying in 1000ms"),
|
||||
);
|
||||
const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? "");
|
||||
expect(errorMessage).not.toContain("\n");
|
||||
expect(errorMessage).not.toContain("token_abc");
|
||||
expect(errorMessage).not.toContain("secret_abc");
|
||||
expect(errorMessage).toContain("Authorization: Bearer [redacted]");
|
||||
expect(errorMessage).toContain("appSecret=[redacted]");
|
||||
});
|
||||
|
||||
it("redacts websocket close errors during abort cleanup", async () => {
|
||||
const wsClient = createWsClient();
|
||||
wsClient.close.mockImplementationOnce(() => {
|
||||
throw new Error("close failed\naccess_token=secret_token");
|
||||
});
|
||||
createFeishuWSClientMock.mockReturnValue(wsClient);
|
||||
|
||||
const abortController = new AbortController();
|
||||
const runtime = {
|
||||
log: vi.fn(),
|
||||
error: vi.fn(),
|
||||
exit: vi.fn(),
|
||||
};
|
||||
|
||||
const monitorPromise = monitorWebSocket({
|
||||
account: createAccount("close-error"),
|
||||
accountId: "close-error",
|
||||
runtime,
|
||||
abortSignal: abortController.signal,
|
||||
eventDispatcher: {} as never,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(wsClient.start).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
abortController.abort();
|
||||
await monitorPromise;
|
||||
|
||||
const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? "");
|
||||
expect(errorMessage).toContain("error closing WebSocket client");
|
||||
expect(errorMessage).toContain("access_token=[redacted]");
|
||||
expect(errorMessage).not.toContain("\n");
|
||||
expect(errorMessage).not.toContain("secret_token");
|
||||
});
|
||||
|
||||
it("closes targeted websocket clients during stop cleanup", () => {
|
||||
const alphaClient = createWsClient();
|
||||
const betaClient = createWsClient();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import crypto from "node:crypto";
|
||||
import * as http from "node:http";
|
||||
import * as Lark from "@larksuiteoapi/node-sdk";
|
||||
import { waitForAbortableDelay } from "./async.js";
|
||||
import { createFeishuWSClient } from "./client.js";
|
||||
import {
|
||||
applyBasicWebhookRequestGuards,
|
||||
@@ -29,6 +30,10 @@ export type MonitorTransportParams = {
|
||||
eventDispatcher: Lark.EventDispatcher;
|
||||
};
|
||||
|
||||
const FEISHU_WS_RECONNECT_INITIAL_DELAY_MS = 1_000;
|
||||
const FEISHU_WS_RECONNECT_MAX_DELAY_MS = 30_000;
|
||||
const FEISHU_WS_LOG_ERROR_MAX_LENGTH = 500;
|
||||
|
||||
function isFeishuWebhookPayload(value: unknown): value is Record<string, unknown> {
|
||||
return !!value && typeof value === "object" && !Array.isArray(value);
|
||||
}
|
||||
@@ -82,6 +87,79 @@ function respondText(res: http.ServerResponse, statusCode: number, body: string)
|
||||
res.end(body);
|
||||
}
|
||||
|
||||
function getFeishuWsReconnectDelayMs(attempt: number): number {
|
||||
return Math.min(
|
||||
FEISHU_WS_RECONNECT_INITIAL_DELAY_MS * 2 ** Math.max(0, attempt - 1),
|
||||
FEISHU_WS_RECONNECT_MAX_DELAY_MS,
|
||||
);
|
||||
}
|
||||
|
||||
function formatFeishuWsErrorForLog(err: unknown): string {
|
||||
const raw = err instanceof Error ? err.message || err.name : String(err);
|
||||
const singleLine = Array.from(raw, (char) => {
|
||||
const code = char.charCodeAt(0);
|
||||
return code <= 31 || code === 127 ? " " : char;
|
||||
}).join("");
|
||||
const redacted = singleLine
|
||||
.replace(/:\/\/[^:@/\s]+:[^@/\s]+@/g, "://[redacted]@")
|
||||
.replace(/\b(authorization\s*[:=]\s*Bearer\s+)[^\s,;]+/gi, "$1[redacted]")
|
||||
.replace(/\b(Bearer\s+)[A-Za-z0-9._~+/-]+=*/g, "$1[redacted]")
|
||||
.replace(
|
||||
/\b((?:app[_-]?secret|tenant[_-]?access[_-]?token|access[_-]?token|refresh[_-]?token|token|secret|password)\s*[:=]\s*)[^\s&;,]+/gi,
|
||||
"$1[redacted]",
|
||||
)
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
|
||||
if (!redacted) {
|
||||
return "unknown error";
|
||||
}
|
||||
if (redacted.length <= FEISHU_WS_LOG_ERROR_MAX_LENGTH) {
|
||||
return redacted;
|
||||
}
|
||||
return `${redacted.slice(0, FEISHU_WS_LOG_ERROR_MAX_LENGTH)}...`;
|
||||
}
|
||||
|
||||
function cleanupFeishuWsClient(params: {
|
||||
accountId: string;
|
||||
wsClient?: Lark.WSClient;
|
||||
error: (message: string) => void;
|
||||
}): void {
|
||||
const { accountId, wsClient, error } = params;
|
||||
if (wsClient) {
|
||||
try {
|
||||
wsClient.close();
|
||||
} catch (err) {
|
||||
error(
|
||||
`feishu[${accountId}]: error closing WebSocket client: ${formatFeishuWsErrorForLog(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
wsClients.delete(accountId);
|
||||
botOpenIds.delete(accountId);
|
||||
botNames.delete(accountId);
|
||||
}
|
||||
|
||||
function waitForFeishuWsAbort(abortSignal?: AbortSignal): Promise<void> {
|
||||
if (abortSignal?.aborted) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
return new Promise((resolve) => {
|
||||
if (!abortSignal) {
|
||||
// No external lifecycle owner was provided, so keep the SDK-managed connection alive.
|
||||
return;
|
||||
}
|
||||
const handleAbort = () => {
|
||||
abortSignal.removeEventListener("abort", handleAbort);
|
||||
resolve();
|
||||
};
|
||||
abortSignal.addEventListener("abort", handleAbort, { once: true });
|
||||
if (abortSignal.aborted) {
|
||||
handleAbort();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export async function monitorWebSocket({
|
||||
account,
|
||||
accountId,
|
||||
@@ -91,53 +169,46 @@ export async function monitorWebSocket({
|
||||
}: MonitorTransportParams): Promise<void> {
|
||||
const log = runtime?.log ?? console.log;
|
||||
const error = runtime?.error ?? console.error;
|
||||
log(`feishu[${accountId}]: starting WebSocket connection...`);
|
||||
|
||||
const wsClient = await createFeishuWSClient(account);
|
||||
wsClients.set(accountId, wsClient);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
let cleanedUp = false;
|
||||
|
||||
const cleanup = () => {
|
||||
if (cleanedUp) {
|
||||
return;
|
||||
}
|
||||
cleanedUp = true;
|
||||
abortSignal?.removeEventListener("abort", handleAbort);
|
||||
try {
|
||||
wsClient.close();
|
||||
} catch (err) {
|
||||
error(`feishu[${accountId}]: error closing WebSocket client: ${String(err)}`);
|
||||
} finally {
|
||||
wsClients.delete(accountId);
|
||||
botOpenIds.delete(accountId);
|
||||
botNames.delete(accountId);
|
||||
}
|
||||
};
|
||||
|
||||
function handleAbort() {
|
||||
log(`feishu[${accountId}]: abort signal received, stopping`);
|
||||
cleanup();
|
||||
resolve();
|
||||
}
|
||||
|
||||
let attempt = 0;
|
||||
while (true) {
|
||||
if (abortSignal?.aborted) {
|
||||
cleanup();
|
||||
resolve();
|
||||
return;
|
||||
break;
|
||||
}
|
||||
|
||||
abortSignal?.addEventListener("abort", handleAbort, { once: true });
|
||||
|
||||
let wsClient: Lark.WSClient | undefined;
|
||||
try {
|
||||
void wsClient.start({ eventDispatcher });
|
||||
log(`feishu[${accountId}]: starting WebSocket connection...`);
|
||||
wsClient = await createFeishuWSClient(account);
|
||||
if (abortSignal?.aborted) {
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error });
|
||||
break;
|
||||
}
|
||||
wsClients.set(accountId, wsClient);
|
||||
await wsClient.start({ eventDispatcher });
|
||||
attempt = 0;
|
||||
log(`feishu[${accountId}]: WebSocket client started`);
|
||||
await waitForFeishuWsAbort(abortSignal);
|
||||
log(`feishu[${accountId}]: abort signal received, stopping`);
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error });
|
||||
return;
|
||||
} catch (err) {
|
||||
cleanup();
|
||||
reject(err);
|
||||
cleanupFeishuWsClient({ accountId, wsClient, error });
|
||||
if (abortSignal?.aborted) {
|
||||
break;
|
||||
}
|
||||
|
||||
attempt += 1;
|
||||
const delayMs = getFeishuWsReconnectDelayMs(attempt);
|
||||
error(
|
||||
`feishu[${accountId}]: WebSocket start failed, retrying in ${delayMs}ms: ${formatFeishuWsErrorForLog(err)}`,
|
||||
);
|
||||
const shouldRetry = await waitForAbortableDelay(delayMs, abortSignal);
|
||||
if (!shouldRetry) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function monitorWebhook({
|
||||
|
||||
Reference in New Issue
Block a user