From c441dcd47a05983318b9e2fdf34669948cadf49e Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 16:38:58 +0100 Subject: [PATCH] fix(telegram): avoid leaking thread binding persist cleanup --- .../telegram/src/thread-bindings.test.ts | 54 +++++++++++++++++++ extensions/telegram/src/thread-bindings.ts | 5 +- 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/extensions/telegram/src/thread-bindings.test.ts b/extensions/telegram/src/thread-bindings.test.ts index bbb8497c853..669121f6cc0 100644 --- a/extensions/telegram/src/thread-bindings.test.ts +++ b/extensions/telegram/src/thread-bindings.test.ts @@ -5,6 +5,20 @@ import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runti import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { importFreshModule } from "../../../test/helpers/import-fresh.js"; + +const writeJsonFileAtomicallyMock = vi.hoisted(() => vi.fn()); + +vi.mock("openclaw/plugin-sdk/json-store", async () => { + const actual = await vi.importActual( + "openclaw/plugin-sdk/json-store", + ); + writeJsonFileAtomicallyMock.mockImplementation(actual.writeJsonFileAtomically); + return { + ...actual, + writeJsonFileAtomically: writeJsonFileAtomicallyMock, + }; +}); + import { __testing, createTelegramThreadBindingManager, @@ -16,6 +30,7 @@ describe("telegram thread bindings", () => { let stateDirOverride: string | undefined; beforeEach(async () => { + writeJsonFileAtomicallyMock.mockClear(); await __testing.resetTelegramThreadBindingsForTests(); }); @@ -313,4 +328,43 @@ describe("telegram thread bindings", () => { }; expect(persisted.bindings?.[0]?.idleTimeoutMs).toBe(90_000); }); + + it("does not leak unhandled rejections when a persist write fails", async () => { + stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); + process.env.OPENCLAW_STATE_DIR = stateDirOverride; + const unhandled: unknown[] = []; + const onUnhandledRejection = (reason: unknown) => { + unhandled.push(reason); + }; + process.on("unhandledRejection", onUnhandledRejection); + + try { + const manager = createTelegramThreadBindingManager({ + accountId: "persist-failure", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-persist-failure", + targetKind: "subagent", + conversation: { + channel: "telegram", + accountId: "persist-failure", + conversationId: "-100200300:topic:100", + }, + }); + + writeJsonFileAtomicallyMock.mockImplementationOnce(async () => { + throw new Error("persist boom"); + }); + manager.touchConversation("-100200300:topic:100"); + + await __testing.resetTelegramThreadBindingsForTests(); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(unhandled).toEqual([]); + } finally { + process.off("unhandledRejection", onUnhandledRejection); + } + }); }); diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index e2b338f44c6..38e40eea017 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -345,11 +345,12 @@ function enqueuePersistBindings(params: { await persistBindingsToDisk(params); }); getThreadBindingsState().persistQueueByAccountId.set(params.accountId, next); - void next.finally(() => { + const cleanup = () => { if (getThreadBindingsState().persistQueueByAccountId.get(params.accountId) === next) { getThreadBindingsState().persistQueueByAccountId.delete(params.accountId); } - }); + }; + next.then(cleanup, cleanup); return next; }