fix(clickclack): clear gateway status after poll failures

This commit is contained in:
Vincent Koc
2026-06-17 16:40:43 +02:00
parent 45d7167ea2
commit 405df6f166
2 changed files with 73 additions and 48 deletions

View File

@@ -190,4 +190,24 @@ describe("ClickClack gateway", () => {
abort.abort();
await run;
});
it("clears running status when backlog polling fails", async () => {
mocks.client.events.mockRejectedValue(new Error("clickclack unavailable"));
const abort = new AbortController();
const ctx = createGatewayContext(abort.signal);
await expect(startClickClackGatewayAccount(ctx)).rejects.toThrow("clickclack unavailable");
expect(ctx.setStatus).toHaveBeenCalledWith({
accountId: "default",
running: true,
configured: true,
enabled: true,
baseUrl: "https://clickclack.example",
});
expect(ctx.setStatus).toHaveBeenLastCalledWith({
accountId: "default",
running: false,
});
});
});

View File

@@ -146,62 +146,67 @@ export async function startClickClackGatewayAccount(
});
let afterCursor = "";
let initialized = false;
while (!ctx.abortSignal.aborted) {
const backlog = await client.events(workspaceId, afterCursor);
if (!initialized) {
// First pass establishes the cursor without replaying historical backlog
// into fresh gateway sessions.
for (const event of backlog) {
afterCursor = event.cursor || afterCursor;
}
initialized = true;
} else {
for (const event of backlog) {
afterCursor = event.cursor || afterCursor;
await processEvent({
account,
config: ctx.cfg,
client,
event,
botUserId: account.botUserId,
});
}
}
const socket = client.websocket(workspaceId, afterCursor);
await new Promise<void>((resolve, reject) => {
const abort = () => {
socket.close();
resolve();
};
ctx.abortSignal.addEventListener("abort", abort, { once: true });
socket.on("message", (data) => {
void (async () => {
const event = parseSocketEvent(data);
if (!event) {
ctx.log?.warn?.(`[${account.accountId}] skipped malformed ClickClack websocket event`);
return;
}
try {
while (!ctx.abortSignal.aborted) {
const backlog = await client.events(workspaceId, afterCursor);
if (!initialized) {
// First pass establishes the cursor without replaying historical backlog
// into fresh gateway sessions.
for (const event of backlog) {
afterCursor = event.cursor || afterCursor;
}
initialized = true;
} else {
for (const event of backlog) {
afterCursor = event.cursor || afterCursor;
await processEvent({
account,
config: ctx.cfg,
client,
event,
botUserId: account.botUserId ?? "",
botUserId: account.botUserId,
});
})().catch(reject);
});
socket.on("close", () => {
ctx.abortSignal.removeEventListener("abort", abort);
resolve();
});
socket.on("error", reject);
});
if (!ctx.abortSignal.aborted) {
await new Promise((resolve) => {
setTimeout(resolve, account.reconnectMs);
}
}
const socket = client.websocket(workspaceId, afterCursor);
await new Promise<void>((resolve, reject) => {
const abort = () => {
socket.close();
resolve();
};
ctx.abortSignal.addEventListener("abort", abort, { once: true });
socket.on("message", (data) => {
void (async () => {
const event = parseSocketEvent(data);
if (!event) {
ctx.log?.warn?.(
`[${account.accountId}] skipped malformed ClickClack websocket event`,
);
return;
}
afterCursor = event.cursor || afterCursor;
await processEvent({
account,
config: ctx.cfg,
client,
event,
botUserId: account.botUserId ?? "",
});
})().catch(reject);
});
socket.on("close", () => {
ctx.abortSignal.removeEventListener("abort", abort);
resolve();
});
socket.on("error", reject);
});
if (!ctx.abortSignal.aborted) {
await new Promise((resolve) => {
setTimeout(resolve, account.reconnectMs);
});
}
}
} finally {
ctx.setStatus({ accountId: account.accountId, running: false });
}
ctx.setStatus({ accountId: account.accountId, running: false });
}