From 8205de84a9b9d46a1d06282a6dbae8671f55fd1a Mon Sep 17 00:00:00 2001 From: Chinar Amrutkar Date: Fri, 17 Apr 2026 06:33:36 +0100 Subject: [PATCH] fix: clear stale telegram ACP bindings on startup (#67822) (thanks @chinar-amrutkar) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(telegram): clean up thread bindings to stale/failed ACP sessions on startup When loading persisted thread bindings on manager creation, validate each ACP session against the session store. Remove bindings where: - Session entry doesn't exist (deleted externally) - Session status is failed/killed/timeout - ACP runtime state is 'error' This addresses issue #60102 where Telegram DMs remained routed to stale ACP sessions even after restart, because the binding file persisted across restarts without validating the target session was still valid. * fix(telegram): guard against null session entry and transient store read failures Address review comments on PR #67822: 1. Skip bindings when readAcpSessionEntry returns null or when session store is temporarily unreadable (storeReadFailed: true). Without this, a transient I/O error would mark all ACP bindings as stale and delete them on every startup. 2. Only set needsPersist when bindings were actually removed. Previously, stale session keys from OTHER accounts could set needsPersist=true even when zero bindings were removed for the current account — causing spurious disk writes. Also clean up redundant optional chaining on entry.status now that we guard against undefined/nullable sessionEntry. * perf(telegram): dedupe ACP session reads in startup cleanup Cache readAcpSessionEntry calls by targetSessionKey. Multiple bindings to the same ACP session now result in a single session store read instead of one read per binding. Addresses chatgpt-codex-connector P2 review comment on PR #67822. 
* fix(telegram): skip non-ACP session keys in stale binding cleanup Address chatgpt-codex-connector P1 review comment on PR #67822: Plugin-bound Telegram conversations use "plugin-binding:*" keys with targetKind === "acp", but these are NOT ACP runtime sessions. readAcpSessionEntry() returns no entry for them, so !sessionEntry.entry would classify them as stale and delete them on every startup. Now checks isAcpSessionKey(binding.targetSessionKey) to exclude plugin-bound sessions from the stale session cleanup scan. Also clarifies the comment to explain why we use targetKind === "acp" together with the isAcpSessionKey() check. * fix(telegram): import isAcpSessionKey from sessions/session-key-utils isAcpSessionKey is not re-exported from openclaw/plugin-sdk/routing. Fix import to use the correct subpath: openclaw/sessions/session-key-utils. Addresses chatgpt-codex-connector P1 review comment on PR #67822. * fix(telegram): import from relative path, remove unused variable - Import isAcpSessionKey from relative path ../../sessions/session-key-utils.js (not openclaw/sessions/session-key-utils which doesn't exist) - Remove unused 'bindings' variable in for-of loop Addresses CI failures on PR #67822. * fix(telegram): export isAcpSessionKey from plugin-sdk/routing isAcpSessionKey lives in src/routing/session-key.ts, which is already exported via openclaw/plugin-sdk/routing. Re-export it from routing.ts so extensions can import via the public plugin-sdk path. Fixes chatgpt-codex-connector P1: relative path ../../sessions/session-key-utils.js doesn't exist in the build output, making the Telegram extension fail module resolution before startup cleanup can run. 
* test(telegram): cover startup ACP binding cleanup * fix: clear stale telegram ACP bindings on startup (#67822) (thanks @chinar-amrutkar) --------- Co-authored-by: Ayaan Zaidi --- CHANGELOG.md | 1 + .../telegram/src/thread-bindings.test.ts | 147 ++++++++++++++++++ extensions/telegram/src/thread-bindings.ts | 55 ++++++- src/plugin-sdk/routing.ts | 1 + 4 files changed, 203 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b6ec1a7a771..c5ef367a1bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai - Models/config: preserve an existing `models.json` provider `baseUrl` during merge-mode regeneration so custom endpoints do not get reset on restart. (#67893) Thanks @lawrence3699. - Plugins/discovery: reuse bundled and global plugin discovery results across workspace cache misses so Windows multi-workspace startup stops redoing the shared synchronous scan. (#67940) Thanks @obviyus. - Plugins/webhooks: enforce synchronous plugin registration with full rollback of failed plugin side effects, and cache SecretRef-backed webhook auth per route so plugin startup and inbound webhook auth stay deterministic. (#67941) Thanks @obviyus. +- Telegram/ACP bindings: drop persisted DM bindings that still point at missing or failed ACP sessions on restart, while preserving plugin-owned bindings and uncertain store reads. (#67822) Thanks @chinar-amrutkar. - Telegram/streaming: keep a transient preview on the same Telegram message when auto-compaction retries an in-flight answer, so streamed replies no longer appear duplicated after compaction. (#66939) Thanks @rubencu. 
## 2026.4.15 diff --git a/extensions/telegram/src/thread-bindings.test.ts b/extensions/telegram/src/thread-bindings.test.ts index b5d78f16c01..b18059da371 100644 --- a/extensions/telegram/src/thread-bindings.test.ts +++ b/extensions/telegram/src/thread-bindings.test.ts @@ -7,6 +7,18 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { importFreshModule } from "../../../test/helpers/import-fresh.js"; const writeJsonFileAtomicallyMock = vi.hoisted(() => vi.fn()); +const readAcpSessionEntryMock = vi.hoisted(() => vi.fn()); + +vi.mock("openclaw/plugin-sdk/acp-runtime", async () => { + const actual = await vi.importActual( + "openclaw/plugin-sdk/acp-runtime", + ); + readAcpSessionEntryMock.mockImplementation(actual.readAcpSessionEntry); + return { + ...actual, + readAcpSessionEntry: readAcpSessionEntryMock, + }; +}); vi.mock("openclaw/plugin-sdk/json-store", async () => { const actual = await vi.importActual( @@ -36,6 +48,11 @@ describe("telegram thread bindings", () => { beforeEach(async () => { writeJsonFileAtomicallyMock.mockClear(); + readAcpSessionEntryMock.mockReset(); + const acpRuntime = await vi.importActual( + "openclaw/plugin-sdk/acp-runtime", + ); + readAcpSessionEntryMock.mockImplementation(acpRuntime.readAcpSessionEntry); await __testing.resetTelegramThreadBindingsForTests(); }); @@ -293,6 +310,136 @@ describe("telegram thread bindings", () => { expect(reloaded.getByConversationId("8460800771")).toBeUndefined(); }); + it("cleans up stale ACP bindings before restart routing can reuse them", async () => { + stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); + process.env.OPENCLAW_STATE_DIR = stateDirOverride; + + createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:main:acp:stale-1", + targetKind: "session", + conversation: { + channel: "telegram", + accountId: 
"default", + conversationId: "cleanup-me", + }, + }); + + await __testing.resetTelegramThreadBindingsForTests(); + readAcpSessionEntryMock.mockReturnValue({ + cfg: {} as never, + storePath: "/tmp/acp-store.json", + sessionKey: "agent:main:acp:stale-1", + storeSessionKey: "agent:main:acp:stale-1", + entry: undefined, + acp: undefined, + storeReadFailed: false, + }); + + const reloaded = createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + expect(reloaded.getByConversationId("cleanup-me")).toBeUndefined(); + await __testing.resetTelegramThreadBindingsForTests(); + const persisted = JSON.parse( + fs.readFileSync( + path.join( + resolveStateDir(process.env, os.homedir), + "telegram", + "thread-bindings-default.json", + ), + "utf8", + ), + ) as { bindings?: Array<{ conversationId?: string }> }; + expect(persisted.bindings?.map((binding) => binding.conversationId)).not.toContain( + "cleanup-me", + ); + }); + + it("keeps plugin-owned bindings when ACP cleanup runs on startup", async () => { + stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); + process.env.OPENCLAW_STATE_DIR = stateDirOverride; + + createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "plugin-binding:openclaw-codex-app-server:still-valid", + targetKind: "session", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "plugin-binding-convo", + }, + }); + + await __testing.resetTelegramThreadBindingsForTests(); + + const reloaded = createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + expect(reloaded.getByConversationId("plugin-binding-convo")?.targetSessionKey).toBe( + "plugin-binding:openclaw-codex-app-server:still-valid", + ); + expect(readAcpSessionEntryMock).not.toHaveBeenCalled(); + }); + + it("keeps ACP 
bindings when the session store cannot be read during startup cleanup", async () => { + stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); + process.env.OPENCLAW_STATE_DIR = stateDirOverride; + + createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:main:acp:read-failed", + targetKind: "session", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "keep-on-read-failure", + }, + }); + + await __testing.resetTelegramThreadBindingsForTests(); + readAcpSessionEntryMock.mockReturnValue({ + cfg: {} as never, + storePath: "/tmp/acp-store.json", + sessionKey: "agent:main:acp:read-failed", + storeSessionKey: "agent:main:acp:read-failed", + entry: undefined, + acp: undefined, + storeReadFailed: true, + }); + + const reloaded = createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + expect(reloaded.getByConversationId("keep-on-read-failure")?.targetSessionKey).toBe( + "agent:main:acp:read-failed", + ); + }); + it("flushes pending lifecycle update persists before test reset", async () => { stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); process.env.OPENCLAW_STATE_DIR = stateDirOverride; diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index 38e40eea017..e789cdd169b 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { readAcpSessionEntry } from "openclaw/plugin-sdk/acp-runtime"; import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; import { formatThreadBindingDurationLabel, @@ -14,7 +15,7 @@ import { } from "openclaw/plugin-sdk/conversation-runtime"; import { 
formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; -import { normalizeAccountId } from "openclaw/plugin-sdk/routing"; +import { normalizeAccountId, isAcpSessionKey } from "openclaw/plugin-sdk/routing"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; @@ -440,6 +441,58 @@ export function createTelegramThreadBindingManager( }); } + const acpSessionKeys = new Set(); + for (const binding of getThreadBindingsState().bindingsByAccountConversation.values()) { + if (binding.targetKind !== "acp" || !isAcpSessionKey(binding.targetSessionKey)) { + continue; + } + acpSessionKeys.add(binding.targetSessionKey); + } + + const staleSessionKeys = new Set(); + for (const targetSessionKey of acpSessionKeys) { + const sessionEntry = readAcpSessionEntry({ sessionKey: targetSessionKey }); + if (!sessionEntry || sessionEntry.storeReadFailed) { + continue; + } + const isStale = + !sessionEntry.entry || + sessionEntry.entry.status === "failed" || + sessionEntry.entry.status === "killed" || + sessionEntry.entry.status === "timeout" || + sessionEntry.entry.acp?.state === "error"; + if (isStale) { + staleSessionKeys.add(targetSessionKey); + } + } + + let needsPersist = false; + for (const sessionKey of staleSessionKeys) { + const bindingsToRemove = listBindingsForAccount(accountId).filter( + (b) => b.targetSessionKey === sessionKey, + ); + for (const binding of bindingsToRemove) { + getThreadBindingsState().bindingsByAccountConversation.delete( + resolveBindingKey({ accountId, conversationId: binding.conversationId }), + ); + } + if (bindingsToRemove.length > 0) { + needsPersist = true; + logVerbose( + `telegram thread binding: cleaned up ${bindingsToRemove.length} stale binding(s) for session ${sessionKey}`, + ); + } + } + + if (needsPersist && persist) 
{ + persistBindingsSafely({ + accountId, + persist: true, + bindings: listBindingsForAccount(accountId), + reason: "cleanup-stale", + }); + } + let sweepTimer: NodeJS.Timeout | null = null; const manager: TelegramThreadBindingManager = { diff --git a/src/plugin-sdk/routing.ts b/src/plugin-sdk/routing.ts index 6bf7917170b..b1c6d3b55ed 100644 --- a/src/plugin-sdk/routing.ts +++ b/src/plugin-sdk/routing.ts @@ -13,6 +13,7 @@ export { DEFAULT_MAIN_KEY, buildGroupHistoryKey, isCronSessionKey, + isAcpSessionKey, isSubagentSessionKey, normalizeAccountId, normalizeAgentId,