diff --git a/CHANGELOG.md b/CHANGELOG.md index b6ec1a7a771..c5ef367a1bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai - Models/config: preserve an existing `models.json` provider `baseUrl` during merge-mode regeneration so custom endpoints do not get reset on restart. (#67893) Thanks @lawrence3699. - Plugins/discovery: reuse bundled and global plugin discovery results across workspace cache misses so Windows multi-workspace startup stops redoing the shared synchronous scan. (#67940) Thanks @obviyus. - Plugins/webhooks: enforce synchronous plugin registration with full rollback of failed plugin side effects, and cache SecretRef-backed webhook auth per route so plugin startup and inbound webhook auth stay deterministic. (#67941) Thanks @obviyus. +- Telegram/ACP bindings: drop persisted DM bindings that still point at missing or failed ACP sessions on restart, while preserving plugin-owned bindings and uncertain store reads. (#67822) Thanks @chinar-amrutkar. - Telegram/streaming: keep a transient preview on the same Telegram message when auto-compaction retries an in-flight answer, so streamed replies no longer appear duplicated after compaction. (#66939) Thanks @rubencu. ## 2026.4.15 diff --git a/extensions/telegram/src/thread-bindings.test.ts b/extensions/telegram/src/thread-bindings.test.ts index b5d78f16c01..b18059da371 100644 --- a/extensions/telegram/src/thread-bindings.test.ts +++ b/extensions/telegram/src/thread-bindings.test.ts @@ -7,6 +7,18 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { importFreshModule } from "../../../test/helpers/import-fresh.js"; const writeJsonFileAtomicallyMock = vi.hoisted(() => vi.fn()); +const readAcpSessionEntryMock = vi.hoisted(() => vi.fn()); + +vi.mock("openclaw/plugin-sdk/acp-runtime", async () => { + const actual = await vi.importActual( + "openclaw/plugin-sdk/acp-runtime", + ); + readAcpSessionEntryMock.mockImplementation(actual.readAcpSessionEntry); + return { + ...actual, + readAcpSessionEntry: readAcpSessionEntryMock, + }; +}); vi.mock("openclaw/plugin-sdk/json-store", async () => { const actual = await vi.importActual( @@ -36,6 +48,11 @@ describe("telegram thread bindings", () => { beforeEach(async () => { writeJsonFileAtomicallyMock.mockClear(); + readAcpSessionEntryMock.mockReset(); + const acpRuntime = await vi.importActual( + "openclaw/plugin-sdk/acp-runtime", + ); + readAcpSessionEntryMock.mockImplementation(acpRuntime.readAcpSessionEntry); await __testing.resetTelegramThreadBindingsForTests(); }); @@ -293,6 +310,136 @@ describe("telegram thread bindings", () => { expect(reloaded.getByConversationId("8460800771")).toBeUndefined(); }); + it("cleans up stale ACP bindings before restart routing can reuse them", async () => { + stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); + process.env.OPENCLAW_STATE_DIR = stateDirOverride; + + createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:main:acp:stale-1", + targetKind: "session", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "cleanup-me", + }, + }); + + await __testing.resetTelegramThreadBindingsForTests(); + readAcpSessionEntryMock.mockReturnValue({ + cfg: {} as never, + storePath: "/tmp/acp-store.json", + sessionKey: "agent:main:acp:stale-1", + storeSessionKey: "agent:main:acp:stale-1", + entry: undefined, + acp: undefined, + storeReadFailed: false, + }); + + const reloaded = createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + expect(reloaded.getByConversationId("cleanup-me")).toBeUndefined(); + await __testing.resetTelegramThreadBindingsForTests(); + const persisted = JSON.parse( + fs.readFileSync( + path.join( + resolveStateDir(process.env, os.homedir), + "telegram", + "thread-bindings-default.json", + ), + "utf8", + ), + ) as { bindings?: Array<{ conversationId?: string }> }; + expect(persisted.bindings?.map((binding) => binding.conversationId)).not.toContain( + "cleanup-me", + ); + }); + + it("keeps plugin-owned bindings when ACP cleanup runs on startup", async () => { + stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); + process.env.OPENCLAW_STATE_DIR = stateDirOverride; + + createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "plugin-binding:openclaw-codex-app-server:still-valid", + targetKind: "session", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "plugin-binding-convo", + }, + }); + + await __testing.resetTelegramThreadBindingsForTests(); + + const reloaded = createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + expect(reloaded.getByConversationId("plugin-binding-convo")?.targetSessionKey).toBe( + "plugin-binding:openclaw-codex-app-server:still-valid", + ); + expect(readAcpSessionEntryMock).not.toHaveBeenCalled(); + }); + + it("keeps ACP bindings when the session store cannot be read during startup cleanup", async () => { + stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); + process.env.OPENCLAW_STATE_DIR = stateDirOverride; + + createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:main:acp:read-failed", + targetKind: "session", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "keep-on-read-failure", + }, + }); + + await __testing.resetTelegramThreadBindingsForTests(); + readAcpSessionEntryMock.mockReturnValue({ + cfg: {} as never, + storePath: "/tmp/acp-store.json", + sessionKey: "agent:main:acp:read-failed", + storeSessionKey: "agent:main:acp:read-failed", + entry: undefined, + acp: undefined, + storeReadFailed: true, + }); + + const reloaded = createTelegramThreadBindingManager({ + accountId: "default", + persist: true, + enableSweeper: false, + }); + + expect(reloaded.getByConversationId("keep-on-read-failure")?.targetSessionKey).toBe( + "agent:main:acp:read-failed", + ); + }); + it("flushes pending lifecycle update persists before test reset", async () => { stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); process.env.OPENCLAW_STATE_DIR = stateDirOverride; diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index 38e40eea017..e789cdd169b 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { readAcpSessionEntry } from "openclaw/plugin-sdk/acp-runtime"; import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; import { formatThreadBindingDurationLabel, @@ -14,7 +15,7 @@ import { } from "openclaw/plugin-sdk/conversation-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; -import { normalizeAccountId } from "openclaw/plugin-sdk/routing"; +import { normalizeAccountId, isAcpSessionKey } from "openclaw/plugin-sdk/routing"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; @@ -440,6 +441,58 @@ export function createTelegramThreadBindingManager( }); } + const acpSessionKeys = new Set(); + for (const binding of getThreadBindingsState().bindingsByAccountConversation.values()) { + if (binding.targetKind !== "acp" || !isAcpSessionKey(binding.targetSessionKey)) { + continue; + } + acpSessionKeys.add(binding.targetSessionKey); + } + + const staleSessionKeys = new Set(); + for (const targetSessionKey of acpSessionKeys) { + const sessionEntry = readAcpSessionEntry({ sessionKey: targetSessionKey }); + if (!sessionEntry || sessionEntry.storeReadFailed) { + continue; + } + const isStale = + !sessionEntry.entry || + sessionEntry.entry.status === "failed" || + sessionEntry.entry.status === "killed" || + sessionEntry.entry.status === "timeout" || + sessionEntry.entry.acp?.state === "error"; + if (isStale) { + staleSessionKeys.add(targetSessionKey); + } + } + + let needsPersist = false; + for (const sessionKey of staleSessionKeys) { + const bindingsToRemove = listBindingsForAccount(accountId).filter( + (b) => b.targetSessionKey === sessionKey, + ); + for (const binding of bindingsToRemove) { + getThreadBindingsState().bindingsByAccountConversation.delete( + resolveBindingKey({ accountId, conversationId: binding.conversationId }), + ); + } + if (bindingsToRemove.length > 0) { + needsPersist = true; + logVerbose( + `telegram thread binding: cleaned up ${bindingsToRemove.length} stale binding(s) for session ${sessionKey}`, + ); + } + } + + if (needsPersist && persist) { + persistBindingsSafely({ + accountId, + persist: true, + bindings: listBindingsForAccount(accountId), + reason: "cleanup-stale", + }); + } + let sweepTimer: NodeJS.Timeout | null = null; const manager: TelegramThreadBindingManager = { diff --git a/src/plugin-sdk/routing.ts b/src/plugin-sdk/routing.ts index 6bf7917170b..b1c6d3b55ed 100644 --- a/src/plugin-sdk/routing.ts +++ b/src/plugin-sdk/routing.ts @@ -13,6 +13,7 @@ export { DEFAULT_MAIN_KEY, buildGroupHistoryKey, isCronSessionKey, + isAcpSessionKey, isSubagentSessionKey, normalizeAccountId, normalizeAgentId,