Files
openclaw/extensions/telegram/src/thread-bindings.test.ts
Chinar Amrutkar 8205de84a9 fix: clear stale telegram ACP bindings on startup (#67822) (thanks @chinar-amrutkar)
* fix(telegram): clean up thread bindings to stale/failed ACP sessions on startup

When loading persisted thread bindings on manager creation, validate each
ACP session against the session store. Remove bindings where:
- Session entry doesn't exist (deleted externally)
- Session status is failed/killed/timeout
- ACP runtime state is 'error'

This addresses issue #60102 where Telegram DMs remained routed to stale
ACP sessions even after restart, because the binding file persisted
across restarts without validating the target session was still valid.

* fix(telegram): guard against null session entry and transient store read failures

Address review comments on PR #67822:

1. Skip bindings when readAcpSessionEntry returns null or when the
   session store is temporarily unreadable (storeReadFailed: true).
   Without this, a transient I/O error would mark all ACP bindings
   as stale and delete them on every startup.

2. Only set needsPersist when bindings were actually removed.
   Previously, stale session keys from OTHER accounts could set
   needsPersist=true even when zero bindings were removed for
   the current account — causing spurious disk writes.

Also clean up redundant optional chaining on entry.status now
that we guard against undefined/nullable sessionEntry.

* perf(telegram): dedupe ACP session reads in startup cleanup

Cache readAcpSessionEntry calls by targetSessionKey. Multiple bindings
to the same ACP session now result in a single session store read instead
of one read per binding.

Addresses chatgpt-codex-connector P2 review comment on PR #67822.

* fix(telegram): skip non-ACP session keys in stale binding cleanup

Address chatgpt-codex-connector P1 review comment on PR #67822:

Plugin-bound Telegram conversations use "plugin-binding:*" keys
with targetKind === "acp", but these are NOT ACP runtime sessions.
readAcpSessionEntry() returns no entry for them, so !sessionEntry.entry
would classify them as stale and delete them on every startup.

Now checks isAcpSessionKey(binding.targetSessionKey) to skip plugin-bound
sessions from the stale session cleanup scan.

Also clarifies the comment to explain why we use targetKind === "acp"
together with the isAcpSessionKey() check.

* fix(telegram): import isAcpSessionKey from sessions/session-key-utils

isAcpSessionKey is not re-exported from openclaw/plugin-sdk/routing.
Fix import to use the correct subpath: openclaw/sessions/session-key-utils.

Addresses chatgpt-codex-connector P1 review comment on PR #67822.

* fix(telegram): import from relative path, remove unused variable

- Import isAcpSessionKey from relative path ../../sessions/session-key-utils.js
  (not openclaw/sessions/session-key-utils which doesn't exist)
- Remove unused 'bindings' variable in for-of loop

Addresses CI failures on PR #67822.

* fix(telegram): export isAcpSessionKey from plugin-sdk/routing

isAcpSessionKey lives in src/routing/session-key.ts, which is already
exported via openclaw/plugin-sdk/routing. Re-export it from routing.ts
so extensions can import via the public plugin-sdk path.

Fixes chatgpt-codex-connector P1: relative path ../../sessions/session-key-utils.js
doesn't exist in the build output, making the Telegram extension fail
module resolution before startup cleanup can run.

* test(telegram): cover startup ACP binding cleanup

* fix: clear stale telegram ACP bindings on startup (#67822) (thanks @chinar-amrutkar)

---------

Co-authored-by: Ayaan Zaidi <hi@obviy.us>
2026-04-17 11:03:36 +05:30

523 lines
16 KiB
TypeScript

import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
// Spies shared between the module mocks below and the tests. They are created
// through vi.hoisted so they already exist when the vi.mock factories (which
// Vitest hoists above the regular imports) are evaluated.
const writeJsonFileAtomicallyMock = vi.hoisted(() => vi.fn());
const readAcpSessionEntryMock = vi.hoisted(() => vi.fn());
// Partial mock of the ACP runtime module: every export is passed through
// unchanged except readAcpSessionEntry, which is replaced by a spy.
vi.mock("openclaw/plugin-sdk/acp-runtime", async () => {
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/acp-runtime")>(
"openclaw/plugin-sdk/acp-runtime",
);
// Default to the real implementation; individual tests override the return value.
readAcpSessionEntryMock.mockImplementation(actual.readAcpSessionEntry);
return {
...actual,
readAcpSessionEntry: readAcpSessionEntryMock,
};
});
// Partial mock of the JSON store module: only writeJsonFileAtomically is
// wrapped, so tests can observe writes or make a single write fail.
vi.mock("openclaw/plugin-sdk/json-store", async () => {
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/json-store")>(
"openclaw/plugin-sdk/json-store",
);
// Default to the real implementation so persistence still reaches disk.
writeJsonFileAtomicallyMock.mockImplementation(actual.writeJsonFileAtomically);
return {
...actual,
writeJsonFileAtomically: writeJsonFileAtomicallyMock,
};
});
import {
__testing,
createTelegramThreadBindingManager,
setTelegramThreadBindingIdleTimeoutBySessionKey,
setTelegramThreadBindingMaxAgeBySessionKey,
} from "./thread-bindings.js";
/**
 * Yields to the microtask queue twice: once via an already-resolved promise
 * and once via an explicitly queued microtask, so callbacks that were pending
 * when this was called get a chance to run before the caller continues.
 */
async function flushMicrotasks(): Promise<void> {
  const alreadySettled = Promise.resolve();
  await alreadySettled;

  const afterQueuedMicrotask = new Promise<void>((done) => {
    queueMicrotask(done);
  });
  await afterQueuedMicrotask;
}
describe("telegram thread bindings", () => {
// Temp state directory created by the current test (if any); removed in afterEach.
let stateDirOverride: string | undefined;
// OPENCLAW_STATE_DIR as it was before the current test ran, so afterEach can
// restore it instead of unconditionally deleting a value the caller had set.
let stateDirEnvBeforeTest: string | undefined;

beforeEach(async () => {
  stateDirEnvBeforeTest = process.env.OPENCLAW_STATE_DIR;
  writeJsonFileAtomicallyMock.mockClear();
  // mockReset drops any per-test return value, so re-install the real
  // readAcpSessionEntry as the default implementation afterwards.
  readAcpSessionEntryMock.mockReset();
  const acpRuntime = await vi.importActual<typeof import("openclaw/plugin-sdk/acp-runtime")>(
    "openclaw/plugin-sdk/acp-runtime",
  );
  readAcpSessionEntryMock.mockImplementation(acpRuntime.readAcpSessionEntry);
  await __testing.resetTelegramThreadBindingsForTests();
});

afterEach(async () => {
  vi.useRealTimers();
  try {
    // Reset while the state-dir override is still in place.
    await __testing.resetTelegramThreadBindingsForTests();
  } finally {
    // Runs even when the reset rejects, so a failing test cannot leak its temp
    // directory or its environment override into later tests.
    if (stateDirOverride) {
      if (stateDirEnvBeforeTest === undefined) {
        // Assigning undefined would store the string "undefined"; delete instead.
        delete process.env.OPENCLAW_STATE_DIR;
      } else {
        process.env.OPENCLAW_STATE_DIR = stateDirEnvBeforeTest;
      }
      fs.rmSync(stateDirOverride, { recursive: true, force: true });
      stateDirOverride = undefined;
    }
  }
});
it("registers a telegram binding adapter and binds current conversations", async () => {
  const accountId = "work";
  const topicConversationId = "-100200300:topic:77";
  const childSessionKey = "agent:main:subagent:child-1";

  const bindingManager = createTelegramThreadBindingManager({
    accountId,
    persist: false,
    enableSweeper: false,
    idleTimeoutMs: 30_000,
    maxAgeMs: 0,
  });

  const binding = await getSessionBindingService().bind({
    targetSessionKey: childSessionKey,
    targetKind: "subagent",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId: topicConversationId,
    },
    placement: "current",
    metadata: {
      boundBy: "user-1",
    },
  });

  // The binding must be reflected both in the record returned by the service
  // and in the manager's own lookup by conversation id.
  const { conversation } = binding;
  expect(conversation.channel).toBe("telegram");
  expect(conversation.accountId).toBe(accountId);
  expect(conversation.conversationId).toBe(topicConversationId);
  expect(binding.targetSessionKey).toBe(childSessionKey);
  expect(bindingManager.getByConversationId(topicConversationId)?.boundBy).toBe("user-1");
});
it("rejects child placement when conversationId is a bare topic ID with no group context", async () => {
  const accountId = "default";
  createTelegramThreadBindingManager({
    accountId,
    persist: false,
    enableSweeper: false,
  });

  // "77" has no chat prefix, unlike the "<chat>:topic:<id>" ids used by the
  // other tests in this file.
  const bindAttempt = getSessionBindingService().bind({
    targetSessionKey: "agent:main:subagent:child-1",
    targetKind: "subagent",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId: "77",
    },
    placement: "child",
  });

  await expect(bindAttempt).rejects.toMatchObject({
    code: "BINDING_CREATE_FAILED",
  });
});
it("rejects child placement when parentConversationId is also a bare topic ID", async () => {
  const accountId = "default";
  createTelegramThreadBindingManager({
    accountId,
    persist: false,
    enableSweeper: false,
  });

  // Neither the conversation id nor the parent id carries a chat prefix.
  const bindAttempt = getSessionBindingService().bind({
    targetSessionKey: "agent:main:acp:child-acp-1",
    targetKind: "session",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId: "77",
      parentConversationId: "99",
    },
    placement: "child",
  });

  await expect(bindAttempt).rejects.toMatchObject({
    code: "BINDING_CREATE_FAILED",
  });
});
it("shares binding state across distinct module instances", async () => {
  type ThreadBindingsModule = typeof import("./thread-bindings.js");
  const sharedAccountId = "shared-runtime";
  const topicConversationId = "-100200300:topic:44";
  const childSessionKey = "agent:main:subagent:child-shared";

  // Load the module twice under different query strings; per the test name
  // these are expected to be distinct module instances.
  const firstModule = await importFreshModule<ThreadBindingsModule>(
    import.meta.url,
    "./thread-bindings.js?scope=shared-a",
  );
  const secondModule = await importFreshModule<ThreadBindingsModule>(
    import.meta.url,
    "./thread-bindings.js?scope=shared-b",
  );

  await firstModule.__testing.resetTelegramThreadBindingsForTests();
  try {
    const firstManager = firstModule.createTelegramThreadBindingManager({
      accountId: sharedAccountId,
      persist: false,
      enableSweeper: false,
    });
    const secondManager = secondModule.createTelegramThreadBindingManager({
      accountId: sharedAccountId,
      persist: false,
      enableSweeper: false,
    });
    // Both module instances must hand back the very same manager object.
    expect(secondManager).toBe(firstManager);

    await getSessionBindingService().bind({
      targetSessionKey: childSessionKey,
      targetKind: "subagent",
      conversation: {
        channel: "telegram",
        accountId: sharedAccountId,
        conversationId: topicConversationId,
      },
      placement: "current",
    });

    // A binding created through the service is visible via the second module.
    const managerSeenBySecond = secondModule.getTelegramThreadBindingManager(sharedAccountId);
    expect(managerSeenBySecond?.getByConversationId(topicConversationId)?.targetSessionKey).toBe(
      childSessionKey,
    );
  } finally {
    await firstModule.__testing.resetTelegramThreadBindingsForTests();
  }
});
it("updates lifecycle windows by session key", async () => {
  const accountId = "work";
  const sessionKey = "agent:main:subagent:child-1";
  const twoHoursMs = 2 * 60 * 60 * 1000;
  const sixHoursMs = 6 * 60 * 60 * 1000;
  const maxAgeUpdateTime = "2026-03-06T12:00:00.000Z";

  vi.useFakeTimers();
  vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));

  const bindingManager = createTelegramThreadBindingManager({
    accountId,
    persist: false,
    enableSweeper: false,
  });
  await getSessionBindingService().bind({
    targetSessionKey: sessionKey,
    targetKind: "subagent",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId: "1234",
    },
  });
  const initialBinding = bindingManager.listBySessionKey(sessionKey)[0];
  expect(initialBinding).toBeDefined();

  const afterIdleUpdate = setTelegramThreadBindingIdleTimeoutBySessionKey({
    accountId,
    targetSessionKey: sessionKey,
    idleTimeoutMs: twoHoursMs,
  });

  // Advance the fake clock before the second update so its activity timestamp
  // can be told apart from the bind time.
  vi.setSystemTime(new Date(maxAgeUpdateTime));
  const afterMaxAgeUpdate = setTelegramThreadBindingMaxAgeBySessionKey({
    accountId,
    targetSessionKey: sessionKey,
    maxAgeMs: sixHoursMs,
  });

  expect(afterIdleUpdate).toHaveLength(1);
  expect(afterIdleUpdate[0]?.idleTimeoutMs).toBe(twoHoursMs);
  expect(afterMaxAgeUpdate).toHaveLength(1);
  expect(afterMaxAgeUpdate[0]?.maxAgeMs).toBe(sixHoursMs);
  // boundAt is unchanged by the update; lastActivityAt matches the advanced clock.
  expect(afterMaxAgeUpdate[0]?.boundAt).toBe(initialBinding?.boundAt);
  expect(afterMaxAgeUpdate[0]?.lastActivityAt).toBe(Date.parse(maxAgeUpdateTime));
  expect(bindingManager.listBySessionKey(sessionKey)[0]?.maxAgeMs).toBe(sixHoursMs);
});
it("does not persist lifecycle updates when manager persistence is disabled", async () => {
  const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
  stateDirOverride = tempStateDir;
  process.env.OPENCLAW_STATE_DIR = tempStateDir;

  const accountId = "no-persist";
  const sessionKey = "agent:main:subagent:child-2";

  vi.useFakeTimers();
  vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));

  createTelegramThreadBindingManager({
    accountId,
    persist: false,
    enableSweeper: false,
  });
  await getSessionBindingService().bind({
    targetSessionKey: sessionKey,
    targetKind: "subagent",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId: "-100200300:topic:88",
    },
  });

  setTelegramThreadBindingIdleTimeoutBySessionKey({
    accountId,
    targetSessionKey: sessionKey,
    idleTimeoutMs: 60 * 60 * 1000,
  });
  setTelegramThreadBindingMaxAgeBySessionKey({
    accountId,
    targetSessionKey: sessionKey,
    maxAgeMs: 2 * 60 * 60 * 1000,
  });

  // With persist: false, no bindings file may appear for this account.
  const resolvedStateDir = resolveStateDir(process.env, os.homedir);
  const bindingsFile = path.join(resolvedStateDir, "telegram", "thread-bindings-no-persist.json");
  expect(fs.existsSync(bindingsFile)).toBe(false);
});
it("persists unbinds before restart so removed bindings do not come back", async () => {
  const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
  stateDirOverride = tempStateDir;
  process.env.OPENCLAW_STATE_DIR = tempStateDir;

  const accountId = "default";
  const conversationId = "8460800771";

  createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });
  const { bindingId } = await getSessionBindingService().bind({
    targetSessionKey: "plugin-binding:openclaw-codex-app-server:abc123",
    targetKind: "session",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId,
    },
  });
  await getSessionBindingService().unbind({
    bindingId,
    reason: "test-detach",
  });

  // Resetting and re-creating the manager stands in for a process restart.
  await __testing.resetTelegramThreadBindingsForTests();
  const managerAfterRestart = createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });

  expect(managerAfterRestart.getByConversationId(conversationId)).toBeUndefined();
});
it("cleans up stale ACP bindings before restart routing can reuse them", async () => {
  const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
  stateDirOverride = tempStateDir;
  process.env.OPENCLAW_STATE_DIR = tempStateDir;

  const accountId = "default";
  const staleSessionKey = "agent:main:acp:stale-1";
  const conversationId = "cleanup-me";

  createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });
  await getSessionBindingService().bind({
    targetSessionKey: staleSessionKey,
    targetKind: "session",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId,
    },
  });
  await __testing.resetTelegramThreadBindingsForTests();

  // Simulate a readable session store that no longer has an entry for the session.
  readAcpSessionEntryMock.mockReturnValue({
    cfg: {} as never,
    storePath: "/tmp/acp-store.json",
    sessionKey: staleSessionKey,
    storeSessionKey: staleSessionKey,
    entry: undefined,
    acp: undefined,
    storeReadFailed: false,
  });

  const managerAfterRestart = createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });
  expect(managerAfterRestart.getByConversationId(conversationId)).toBeUndefined();

  // Reset again before reading the file back, then check the binding is gone on disk too.
  await __testing.resetTelegramThreadBindingsForTests();
  const bindingsFile = path.join(
    resolveStateDir(process.env, os.homedir),
    "telegram",
    "thread-bindings-default.json",
  );
  const persisted = JSON.parse(fs.readFileSync(bindingsFile, "utf8")) as {
    bindings?: Array<{ conversationId?: string }>;
  };
  const persistedConversationIds = persisted.bindings?.map((binding) => binding.conversationId);
  expect(persistedConversationIds).not.toContain(conversationId);
});
it("keeps plugin-owned bindings when ACP cleanup runs on startup", async () => {
  const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
  stateDirOverride = tempStateDir;
  process.env.OPENCLAW_STATE_DIR = tempStateDir;

  const accountId = "default";
  const pluginSessionKey = "plugin-binding:openclaw-codex-app-server:still-valid";
  const conversationId = "plugin-binding-convo";

  createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });
  await getSessionBindingService().bind({
    targetSessionKey: pluginSessionKey,
    targetKind: "session",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId,
    },
  });
  await __testing.resetTelegramThreadBindingsForTests();

  const managerAfterRestart = createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });

  // The plugin-bound conversation survives the reload, and the ACP session
  // lookup is never consulted for it.
  const survivingBinding = managerAfterRestart.getByConversationId(conversationId);
  expect(survivingBinding?.targetSessionKey).toBe(pluginSessionKey);
  expect(readAcpSessionEntryMock).not.toHaveBeenCalled();
});
it("keeps ACP bindings when the session store cannot be read during startup cleanup", async () => {
  const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
  stateDirOverride = tempStateDir;
  process.env.OPENCLAW_STATE_DIR = tempStateDir;

  const accountId = "default";
  const acpSessionKey = "agent:main:acp:read-failed";
  const conversationId = "keep-on-read-failure";

  createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });
  await getSessionBindingService().bind({
    targetSessionKey: acpSessionKey,
    targetKind: "session",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId,
    },
  });
  await __testing.resetTelegramThreadBindingsForTests();

  // Same "no entry" shape as a missing session, but flagged as a failed store read.
  readAcpSessionEntryMock.mockReturnValue({
    cfg: {} as never,
    storePath: "/tmp/acp-store.json",
    sessionKey: acpSessionKey,
    storeSessionKey: acpSessionKey,
    entry: undefined,
    acp: undefined,
    storeReadFailed: true,
  });

  const managerAfterRestart = createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });

  const survivingBinding = managerAfterRestart.getByConversationId(conversationId);
  expect(survivingBinding?.targetSessionKey).toBe(acpSessionKey);
});
it("flushes pending lifecycle update persists before test reset", async () => {
  const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
  stateDirOverride = tempStateDir;
  process.env.OPENCLAW_STATE_DIR = tempStateDir;

  const accountId = "persist-reset";
  const sessionKey = "agent:main:subagent:child-3";

  vi.useFakeTimers();
  vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));

  createTelegramThreadBindingManager({
    accountId,
    persist: true,
    enableSweeper: false,
  });
  await getSessionBindingService().bind({
    targetSessionKey: sessionKey,
    targetKind: "subagent",
    conversation: {
      channel: "telegram",
      accountId,
      conversationId: "-100200300:topic:99",
    },
  });
  setTelegramThreadBindingIdleTimeoutBySessionKey({
    accountId,
    targetSessionKey: sessionKey,
    idleTimeoutMs: 90_000,
  });

  // The reset is awaited before the file is read; the updated timeout must be on disk by then.
  await __testing.resetTelegramThreadBindingsForTests();

  const bindingsFile = path.join(
    resolveStateDir(process.env, os.homedir),
    "telegram",
    "thread-bindings-persist-reset.json",
  );
  const rawState = fs.readFileSync(bindingsFile, "utf8");
  const persisted = JSON.parse(rawState) as {
    bindings?: Array<{ idleTimeoutMs?: number }>;
  };
  expect(persisted.bindings?.[0]?.idleTimeoutMs).toBe(90_000);
});
it("does not leak unhandled rejections when a persist write fails", async () => {
  const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
  stateDirOverride = tempStateDir;
  process.env.OPENCLAW_STATE_DIR = tempStateDir;

  const accountId = "persist-failure";
  const conversationId = "-100200300:topic:100";

  // Collect anything that surfaces on the process-level unhandledRejection event.
  const observedRejections: unknown[] = [];
  const recordRejection = (reason: unknown): void => {
    observedRejections.push(reason);
  };
  process.on("unhandledRejection", recordRejection);

  try {
    const bindingManager = createTelegramThreadBindingManager({
      accountId,
      persist: true,
      enableSweeper: false,
    });
    await getSessionBindingService().bind({
      targetSessionKey: "agent:main:subagent:child-persist-failure",
      targetKind: "subagent",
      conversation: {
        channel: "telegram",
        accountId,
        conversationId,
      },
    });

    // Make only the next atomic write reject, then touch the conversation
    // (presumably scheduling a persist — confirm against the manager implementation).
    writeJsonFileAtomicallyMock.mockImplementationOnce(() =>
      Promise.reject(new Error("persist boom")),
    );
    bindingManager.touchConversation(conversationId);

    await __testing.resetTelegramThreadBindingsForTests();
    await flushMicrotasks();

    expect(observedRejections).toEqual([]);
  } finally {
    process.off("unhandledRejection", recordRejection);
  }
});
});