From 405df6f1663c52855424324685884b9de05b9fc2 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 17 Jun 2026 16:40:43 +0200 Subject: [PATCH] fix(clickclack): clear gateway status after poll failures --- extensions/clickclack/src/gateway.test.ts | 20 +++++ extensions/clickclack/src/gateway.ts | 101 ++++++++++++---------- 2 files changed, 73 insertions(+), 48 deletions(-) diff --git a/extensions/clickclack/src/gateway.test.ts b/extensions/clickclack/src/gateway.test.ts index af26c84ed36..7e1742f72d4 100644 --- a/extensions/clickclack/src/gateway.test.ts +++ b/extensions/clickclack/src/gateway.test.ts @@ -190,4 +190,24 @@ describe("ClickClack gateway", () => { abort.abort(); await run; }); + + it("clears running status when backlog polling fails", async () => { + mocks.client.events.mockRejectedValue(new Error("clickclack unavailable")); + const abort = new AbortController(); + const ctx = createGatewayContext(abort.signal); + + await expect(startClickClackGatewayAccount(ctx)).rejects.toThrow("clickclack unavailable"); + + expect(ctx.setStatus).toHaveBeenCalledWith({ + accountId: "default", + running: true, + configured: true, + enabled: true, + baseUrl: "https://clickclack.example", + }); + expect(ctx.setStatus).toHaveBeenLastCalledWith({ + accountId: "default", + running: false, + }); + }); }); diff --git a/extensions/clickclack/src/gateway.ts b/extensions/clickclack/src/gateway.ts index 230b58cc356..8005a8f4fac 100644 --- a/extensions/clickclack/src/gateway.ts +++ b/extensions/clickclack/src/gateway.ts @@ -146,62 +146,67 @@ export async function startClickClackGatewayAccount( }); let afterCursor = ""; let initialized = false; - while (!ctx.abortSignal.aborted) { - const backlog = await client.events(workspaceId, afterCursor); - if (!initialized) { - // First pass establishes the cursor without replaying historical backlog - // into fresh gateway sessions. - for (const event of backlog) { - afterCursor = event.cursor || afterCursor; - } - initialized = true; - } else { - for (const event of backlog) { - afterCursor = event.cursor || afterCursor; - await processEvent({ - account, - config: ctx.cfg, - client, - event, - botUserId: account.botUserId, - }); - } - } - const socket = client.websocket(workspaceId, afterCursor); - await new Promise((resolve, reject) => { - const abort = () => { - socket.close(); - resolve(); - }; - ctx.abortSignal.addEventListener("abort", abort, { once: true }); - socket.on("message", (data) => { - void (async () => { - const event = parseSocketEvent(data); - if (!event) { - ctx.log?.warn?.(`[${account.accountId}] skipped malformed ClickClack websocket event`); - return; - } + try { + while (!ctx.abortSignal.aborted) { + const backlog = await client.events(workspaceId, afterCursor); + if (!initialized) { + // First pass establishes the cursor without replaying historical backlog + // into fresh gateway sessions. + for (const event of backlog) { + afterCursor = event.cursor || afterCursor; + } + initialized = true; + } else { + for (const event of backlog) { afterCursor = event.cursor || afterCursor; await processEvent({ account, config: ctx.cfg, client, event, - botUserId: account.botUserId ?? "", + botUserId: account.botUserId, }); - })().catch(reject); - }); - socket.on("close", () => { - ctx.abortSignal.removeEventListener("abort", abort); - resolve(); - }); - socket.on("error", reject); - }); - if (!ctx.abortSignal.aborted) { - await new Promise((resolve) => { - setTimeout(resolve, account.reconnectMs); + } + } + const socket = client.websocket(workspaceId, afterCursor); + await new Promise((resolve, reject) => { + const abort = () => { + socket.close(); + resolve(); + }; + ctx.abortSignal.addEventListener("abort", abort, { once: true }); + socket.on("message", (data) => { + void (async () => { + const event = parseSocketEvent(data); + if (!event) { + ctx.log?.warn?.( + `[${account.accountId}] skipped malformed ClickClack websocket event`, + ); + return; + } + afterCursor = event.cursor || afterCursor; + await processEvent({ + account, + config: ctx.cfg, + client, + event, + botUserId: account.botUserId ?? "", + }); + })().catch(reject); + }); + socket.on("close", () => { + ctx.abortSignal.removeEventListener("abort", abort); + resolve(); + }); + socket.on("error", reject); }); + if (!ctx.abortSignal.aborted) { + await new Promise((resolve) => { + setTimeout(resolve, account.reconnectMs); + }); + } } + } finally { + ctx.setStatus({ accountId: account.accountId, running: false }); } - ctx.setStatus({ accountId: account.accountId, running: false }); }