mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:40:44 +00:00
fix: clear stale telegram ACP bindings on startup (#67822) (thanks @chinar-amrutkar)
* fix(telegram): clean up thread bindings to stale/failed ACP sessions on startup When loading persisted thread bindings on manager creation, validate each ACP session against the session store. Remove bindings where: - Session entry doesn't exist (deleted externally) - Session status is failed/killed/timeout - ACP runtime state is 'error' This addresses issue #60102 where Telegram DMs remained routed to stale ACP sessions even after restart, because the binding file persisted across restarts without validating the target session was still valid. * fix(telegram): guard against null session entry and transient store read failures Address review comments on PR #67822: 1. Skip bindings when readAcpSessionEntry returns null or when session store is temporarily unreadable (storeReadFailed: true). Without this, a transient I/O error would mark all ACP bindings as stale and delete them on every startup. 2. Only set needsPersist when bindings were actually removed. Previously, stale session keys from OTHER accounts could set needsPersist=true even when zero bindings were removed for the current account — causing spurious disk writes. Also clean up redundant optional chaining on entry.status now that we guard against undefined/nullable sessionEntry. * perf(telegram): dedupe ACP session reads in startup cleanup Cache readAcpSessionEntry calls by targetSessionKey. Multiple bindings to the same ACP session now result in a single session store read instead of one read per binding. Addresses chatgpt-codex-connector P2 review comment on PR #67822. * fix(telegram): skip non-ACP session keys in stale binding cleanup Address chatgpt-codex-connector P1 review comment on PR #67822: Plugin-bound Telegram conversations use "plugin-binding:*" keys with targetKind === "acp", but these are NOT ACP runtime sessions. readAcpSessionEntry() returns no entry for them, so !sessionEntry.entry would classify them as stale and delete them on every startup. Now checks isAcpSessionKey(binding.targetSessionKey) to skip plugin-bound sessions from the stale session cleanup scan. Also clarifies the comment to explain why we use targetKind === "acp" // together with isAcpSessionKey() check. * fix(telegram): import isAcpSessionKey from sessions/session-key-utils isAcpSessionKey is not re-exported from openclaw/plugin-sdk/routing. Fix import to use the correct subpath: openclaw/sessions/session-key-utils. Addresses chatgpt-codex-connector P1 review comment on PR #67822. * fix(telegram): import from relative path, remove unused variable - Import isAcpSessionKey from relative path ../../sessions/session-key-utils.js (not openclaw/sessions/session-key-utils which doesn't exist) - Remove unused 'bindings' variable in for-of loop Addresses CI failures on PR #67822. * fix(telegram): export isAcpSessionKey from plugin-sdk/routing isAcpSessionKey lives in src/routing/session-key.ts, which is already exported via openclaw/plugin-sdk/routing. Re-export it from routing.ts so extensions can import via the public plugin-sdk path. Fixes chatgpt-codex-connector P1: relative path ../../sessions/session-key-utils.js doesn't exist in the build output, making the Telegram extension fail module resolution before startup cleanup can run. * test(telegram): cover startup ACP binding cleanup * fix: clear stale telegram ACP bindings on startup (#67822) (thanks @chinar-amrutkar) --------- Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Models/config: preserve an existing `models.json` provider `baseUrl` during merge-mode regeneration so custom endpoints do not get reset on restart. (#67893) Thanks @lawrence3699.
|
||||
- Plugins/discovery: reuse bundled and global plugin discovery results across workspace cache misses so Windows multi-workspace startup stops redoing the shared synchronous scan. (#67940) Thanks @obviyus.
|
||||
- Plugins/webhooks: enforce synchronous plugin registration with full rollback of failed plugin side effects, and cache SecretRef-backed webhook auth per route so plugin startup and inbound webhook auth stay deterministic. (#67941) Thanks @obviyus.
|
||||
- Telegram/ACP bindings: drop persisted DM bindings that still point at missing or failed ACP sessions on restart, while preserving plugin-owned bindings and uncertain store reads. (#67822) Thanks @chinar-amrutkar.
|
||||
- Telegram/streaming: keep a transient preview on the same Telegram message when auto-compaction retries an in-flight answer, so streamed replies no longer appear duplicated after compaction. (#66939) Thanks @rubencu.
|
||||
|
||||
## 2026.4.15
|
||||
|
||||
@@ -7,6 +7,18 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
|
||||
|
||||
const writeJsonFileAtomicallyMock = vi.hoisted(() => vi.fn());
|
||||
const readAcpSessionEntryMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/acp-runtime", async () => {
|
||||
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/acp-runtime")>(
|
||||
"openclaw/plugin-sdk/acp-runtime",
|
||||
);
|
||||
readAcpSessionEntryMock.mockImplementation(actual.readAcpSessionEntry);
|
||||
return {
|
||||
...actual,
|
||||
readAcpSessionEntry: readAcpSessionEntryMock,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/json-store", async () => {
|
||||
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/json-store")>(
|
||||
@@ -36,6 +48,11 @@ describe("telegram thread bindings", () => {
|
||||
|
||||
beforeEach(async () => {
|
||||
writeJsonFileAtomicallyMock.mockClear();
|
||||
readAcpSessionEntryMock.mockReset();
|
||||
const acpRuntime = await vi.importActual<typeof import("openclaw/plugin-sdk/acp-runtime")>(
|
||||
"openclaw/plugin-sdk/acp-runtime",
|
||||
);
|
||||
readAcpSessionEntryMock.mockImplementation(acpRuntime.readAcpSessionEntry);
|
||||
await __testing.resetTelegramThreadBindingsForTests();
|
||||
});
|
||||
|
||||
@@ -293,6 +310,136 @@ describe("telegram thread bindings", () => {
|
||||
expect(reloaded.getByConversationId("8460800771")).toBeUndefined();
|
||||
});
|
||||
|
||||
it("cleans up stale ACP bindings before restart routing can reuse them", async () => {
|
||||
stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
|
||||
process.env.OPENCLAW_STATE_DIR = stateDirOverride;
|
||||
|
||||
createTelegramThreadBindingManager({
|
||||
accountId: "default",
|
||||
persist: true,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:acp:stale-1",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "cleanup-me",
|
||||
},
|
||||
});
|
||||
|
||||
await __testing.resetTelegramThreadBindingsForTests();
|
||||
readAcpSessionEntryMock.mockReturnValue({
|
||||
cfg: {} as never,
|
||||
storePath: "/tmp/acp-store.json",
|
||||
sessionKey: "agent:main:acp:stale-1",
|
||||
storeSessionKey: "agent:main:acp:stale-1",
|
||||
entry: undefined,
|
||||
acp: undefined,
|
||||
storeReadFailed: false,
|
||||
});
|
||||
|
||||
const reloaded = createTelegramThreadBindingManager({
|
||||
accountId: "default",
|
||||
persist: true,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
expect(reloaded.getByConversationId("cleanup-me")).toBeUndefined();
|
||||
await __testing.resetTelegramThreadBindingsForTests();
|
||||
const persisted = JSON.parse(
|
||||
fs.readFileSync(
|
||||
path.join(
|
||||
resolveStateDir(process.env, os.homedir),
|
||||
"telegram",
|
||||
"thread-bindings-default.json",
|
||||
),
|
||||
"utf8",
|
||||
),
|
||||
) as { bindings?: Array<{ conversationId?: string }> };
|
||||
expect(persisted.bindings?.map((binding) => binding.conversationId)).not.toContain(
|
||||
"cleanup-me",
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps plugin-owned bindings when ACP cleanup runs on startup", async () => {
|
||||
stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
|
||||
process.env.OPENCLAW_STATE_DIR = stateDirOverride;
|
||||
|
||||
createTelegramThreadBindingManager({
|
||||
accountId: "default",
|
||||
persist: true,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: "plugin-binding:openclaw-codex-app-server:still-valid",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "plugin-binding-convo",
|
||||
},
|
||||
});
|
||||
|
||||
await __testing.resetTelegramThreadBindingsForTests();
|
||||
|
||||
const reloaded = createTelegramThreadBindingManager({
|
||||
accountId: "default",
|
||||
persist: true,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
expect(reloaded.getByConversationId("plugin-binding-convo")?.targetSessionKey).toBe(
|
||||
"plugin-binding:openclaw-codex-app-server:still-valid",
|
||||
);
|
||||
expect(readAcpSessionEntryMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps ACP bindings when the session store cannot be read during startup cleanup", async () => {
|
||||
stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
|
||||
process.env.OPENCLAW_STATE_DIR = stateDirOverride;
|
||||
|
||||
createTelegramThreadBindingManager({
|
||||
accountId: "default",
|
||||
persist: true,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:acp:read-failed",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "keep-on-read-failure",
|
||||
},
|
||||
});
|
||||
|
||||
await __testing.resetTelegramThreadBindingsForTests();
|
||||
readAcpSessionEntryMock.mockReturnValue({
|
||||
cfg: {} as never,
|
||||
storePath: "/tmp/acp-store.json",
|
||||
sessionKey: "agent:main:acp:read-failed",
|
||||
storeSessionKey: "agent:main:acp:read-failed",
|
||||
entry: undefined,
|
||||
acp: undefined,
|
||||
storeReadFailed: true,
|
||||
});
|
||||
|
||||
const reloaded = createTelegramThreadBindingManager({
|
||||
accountId: "default",
|
||||
persist: true,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
expect(reloaded.getByConversationId("keep-on-read-failure")?.targetSessionKey).toBe(
|
||||
"agent:main:acp:read-failed",
|
||||
);
|
||||
});
|
||||
|
||||
it("flushes pending lifecycle update persists before test reset", async () => {
|
||||
stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
|
||||
process.env.OPENCLAW_STATE_DIR = stateDirOverride;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { readAcpSessionEntry } from "openclaw/plugin-sdk/acp-runtime";
|
||||
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
|
||||
import {
|
||||
formatThreadBindingDurationLabel,
|
||||
@@ -14,7 +15,7 @@ import {
|
||||
} from "openclaw/plugin-sdk/conversation-runtime";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
|
||||
import { normalizeAccountId } from "openclaw/plugin-sdk/routing";
|
||||
import { normalizeAccountId, isAcpSessionKey } from "openclaw/plugin-sdk/routing";
|
||||
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
|
||||
@@ -440,6 +441,58 @@ export function createTelegramThreadBindingManager(
|
||||
});
|
||||
}
|
||||
|
||||
const acpSessionKeys = new Set<string>();
|
||||
for (const binding of getThreadBindingsState().bindingsByAccountConversation.values()) {
|
||||
if (binding.targetKind !== "acp" || !isAcpSessionKey(binding.targetSessionKey)) {
|
||||
continue;
|
||||
}
|
||||
acpSessionKeys.add(binding.targetSessionKey);
|
||||
}
|
||||
|
||||
const staleSessionKeys = new Set<string>();
|
||||
for (const targetSessionKey of acpSessionKeys) {
|
||||
const sessionEntry = readAcpSessionEntry({ sessionKey: targetSessionKey });
|
||||
if (!sessionEntry || sessionEntry.storeReadFailed) {
|
||||
continue;
|
||||
}
|
||||
const isStale =
|
||||
!sessionEntry.entry ||
|
||||
sessionEntry.entry.status === "failed" ||
|
||||
sessionEntry.entry.status === "killed" ||
|
||||
sessionEntry.entry.status === "timeout" ||
|
||||
sessionEntry.entry.acp?.state === "error";
|
||||
if (isStale) {
|
||||
staleSessionKeys.add(targetSessionKey);
|
||||
}
|
||||
}
|
||||
|
||||
let needsPersist = false;
|
||||
for (const sessionKey of staleSessionKeys) {
|
||||
const bindingsToRemove = listBindingsForAccount(accountId).filter(
|
||||
(b) => b.targetSessionKey === sessionKey,
|
||||
);
|
||||
for (const binding of bindingsToRemove) {
|
||||
getThreadBindingsState().bindingsByAccountConversation.delete(
|
||||
resolveBindingKey({ accountId, conversationId: binding.conversationId }),
|
||||
);
|
||||
}
|
||||
if (bindingsToRemove.length > 0) {
|
||||
needsPersist = true;
|
||||
logVerbose(
|
||||
`telegram thread binding: cleaned up ${bindingsToRemove.length} stale binding(s) for session ${sessionKey}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (needsPersist && persist) {
|
||||
persistBindingsSafely({
|
||||
accountId,
|
||||
persist: true,
|
||||
bindings: listBindingsForAccount(accountId),
|
||||
reason: "cleanup-stale",
|
||||
});
|
||||
}
|
||||
|
||||
let sweepTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
const manager: TelegramThreadBindingManager = {
|
||||
|
||||
@@ -13,6 +13,7 @@ export {
|
||||
DEFAULT_MAIN_KEY,
|
||||
buildGroupHistoryKey,
|
||||
isCronSessionKey,
|
||||
isAcpSessionKey,
|
||||
isSubagentSessionKey,
|
||||
normalizeAccountId,
|
||||
normalizeAgentId,
|
||||
|
||||
Reference in New Issue
Block a user