fix: keep Telegram DM thread IDs on flat sessions

This commit is contained in:
Peter Steinberger
2026-05-02 10:48:34 +01:00
parent f727fbc775
commit 7db255150c
15 changed files with 264 additions and 28 deletions

View File

@@ -56,7 +56,7 @@ Docs: https://docs.openclaw.ai
- Control UI/Talk: allow the OpenAI Realtime WebRTC offer endpoint through the Control UI CSP, configure browser sessions with explicit VAD/transcription input settings, and surface OpenAI realtime error/lifecycle events instead of leaving Talk stuck as live with no diagnostic. Fixes #73427.
- Plugins: clarify config-selected duplicate plugin override diagnostics and document manifest schema updates for bundled-plugin forks. Fixes #8582. Thanks @sachah.
- CLI backends/Claude: make live-session JSONL turn caps bounded and configurable via `reliability.outputLimits`, raising the default guard for tool-heavy Claude CLI turns while preserving memory limits. Fixes #75838. Thanks @hcordoba840.
- Telegram/DMs: keep incidental `message_thread_id` reply-with-quote metadata on the flat DM session by default while preserving opt-in DM topic isolation for configured topics. Fixes #75975. Thanks @ProjectEvolutionEVE.
- Telegram/DMs: keep incidental `message_thread_id` reply-with-quote metadata on the flat DM session by default while preserving opt-in DM topic isolation for configured topics, `dm.threadReplies`, and `direct.<chatId>.threadReplies`. Fixes #75975. Thanks @ProjectEvolutionEVE.
- Providers/OpenAI: resolve `keychain:<service>:<account>` `OPENAI_API_KEY` refs before creating OpenAI Realtime browser sessions or voice bridges, with a bounded cached Keychain lookup. Fixes #72120. Thanks @ctbritt.
- Discord/gateway: reconnect when the gateway socket closes while waiting for the shared IDENTIFY concurrency window, instead of silently skipping IDENTIFY and leaving the bot online but unresponsive. Fixes #74617. Thanks @zeeskdr-ai.
- Voice Call: add `sessionScope: "per-call"` for fresh per-call agent memory while preserving the default per-phone caller history. Fixes #45280. Thanks @pondcountry.

View File

@@ -1,4 +1,4 @@
ba41e5775c361dba63fec0441f943106d8cd0cb0f10c10fee36becc1555d5059 config-baseline.json
7d12cd37e835641b78986fa909208e34990a51b0cfe71480ecbd073db4576a0a config-baseline.json
7b1716d578d22e5b4388f56140b50d326f61327b760f8c580bdd9b971335fb85 config-baseline.core.json
74632b512b6470a155652c7d15b9e430738a05df3b5a85dca16cc4d84dcea764 config-baseline.channel.json
a2a949a99f5cc5960d4d7ae0159b6b48c4d6b1f813be67cda196457ab2f88034 config-baseline.channel.json
fffe0e74eab92a88c3c57952a70bc932438ce3a7f5f9982688437f2cdaee0bcb config-baseline.plugin.json

View File

@@ -260,7 +260,7 @@ curl "https://api.telegram.org/bot<bot_token>/getUpdates"
- Routing is deterministic: Telegram inbound replies back to Telegram (the model does not pick channels).
- Inbound messages normalize into the shared channel envelope with reply metadata and media placeholders.
- Group sessions are isolated by group ID. Forum topics append `:topic:<threadId>` to keep topics isolated.
- DM messages can carry `message_thread_id`; OpenClaw preserves the thread ID for replies but keeps DMs on the flat session by default. Configure `channels.telegram.direct.<chatId>.threadReplies: "inbound"` or `requireTopic: true` when you intentionally want DM topic session isolation.
- DM messages can carry `message_thread_id`; OpenClaw preserves the thread ID for replies but keeps DMs on the flat session by default. Configure `channels.telegram.dm.threadReplies: "inbound"`, `channels.telegram.direct.<chatId>.threadReplies: "inbound"`, `requireTopic: true`, or a matching topic config when you intentionally want DM topic session isolation.
- Long polling uses grammY runner with per-chat/per-thread sequencing. Overall runner sink concurrency uses `agents.defaults.maxConcurrent`.
- Long polling is guarded inside each gateway process so only one active poller can use a bot token at a time. If you still see `getUpdates` 409 conflicts, another OpenClaw gateway, script, or external poller is likely using the same token.
- Long-polling watchdog restarts trigger after 120 seconds without completed `getUpdates` liveness by default. Increase `channels.telegram.pollingStallThresholdMs` only if your deployment still sees false polling-stall restarts during long-running work. The value is in milliseconds and is allowed from `30000` to `600000`; per-account overrides are supported.
@@ -542,7 +542,7 @@ curl "https://api.telegram.org/bot<bot_token>/getUpdates"
**Thread-bound ACP spawn from chat**: `/acp spawn <agent> --thread here|auto` binds the current topic to a new ACP session; follow-ups route there directly. OpenClaw pins the spawn confirmation in-topic. Requires `channels.telegram.threadBindings.spawnSessions` to remain enabled (default: `true`).
Template context exposes `MessageThreadId` and `IsForum`. DM chats with `message_thread_id` keep DM routing and reply metadata; they only use thread-aware session keys when the DM is configured with `threadReplies: "inbound"`, `threadReplies: "always"`, `requireTopic: true`, or a matching topic config.
Template context exposes `MessageThreadId` and `IsForum`. DM chats with `message_thread_id` keep DM routing and reply metadata on flat sessions by default; they only use thread-aware session keys when configured with `threadReplies: "inbound"`, `threadReplies: "always"`, `requireTopic: true`, or a matching topic config. Use top-level `channels.telegram.dm.threadReplies` for the account default, or `direct.<chatId>.threadReplies` for one DM.
</Accordion>
@@ -941,7 +941,7 @@ Primary reference: [Configuration reference - Telegram](/gateway/config-channels
- access control: `dmPolicy`, `allowFrom`, `groupPolicy`, `groupAllowFrom`, `groups`, `groups.*.topics.*`, top-level `bindings[]` (`type: "acp"`)
- exec approvals: `execApprovals`, `accounts.*.execApprovals`
- command/menu: `commands.native`, `commands.nativeSkills`, `customCommands`
- threading/replies: `replyToMode`
- threading/replies: `replyToMode`, `dm.threadReplies`, `direct.*.threadReplies`
- streaming: `streaming` (preview), `streaming.preview.toolProgress`, `blockStreaming`
- formatting/delivery: `textChunkLimit`, `chunkMode`, `linkPreview`, `responsePrefix`
- media/network: `mediaMaxMb`, `timeoutSeconds`, `pollingStallThresholdMs`, `retry`, `network.autoSelectFamily`, `network.dangerouslyAllowPrivateNetwork`, `proxy`

View File

@@ -34,7 +34,7 @@ import {
resolveSessionStoreEntry,
updateSessionStore,
} from "openclaw/plugin-sdk/session-store-runtime";
import { resolveTelegramMediaRuntimeOptions } from "./accounts.js";
import { resolveTelegramAccount, resolveTelegramMediaRuntimeOptions } from "./accounts.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import {
isSenderAllowed,
@@ -329,6 +329,12 @@ export const registerTelegramHandlers = ({
const directConfig = !params.isGroup
? (groupConfig as TelegramDirectConfig | undefined)
: undefined;
let accountConfig = telegramCfg;
try {
accountConfig = resolveTelegramAccount({ cfg: runtimeCfg, accountId }).config;
} catch {
// Keep the startup snapshot when live config is temporarily unavailable.
}
const { route } = resolveTelegramConversationRoute({
cfg: runtimeCfg,
accountId,
@@ -346,9 +352,11 @@ export const registerTelegramHandlers = ({
isGroup: params.isGroup,
senderId: params.senderId,
});
const threadKeys = shouldUseTelegramDmThreadSession({ dmThreadId, directConfig, topicConfig })
? resolveThreadSessionKeys({ baseSessionKey, threadId: `${params.chatId}:${dmThreadId}` })
: null;
const threadKeys =
shouldUseTelegramDmThreadSession({ dmThreadId, accountConfig, directConfig, topicConfig }) &&
dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: `${params.chatId}:${dmThreadId}` })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const storePath = telegramDeps.resolveStorePath(runtimeCfg.session?.store, {
agentId: route.agentId,

View File

@@ -63,7 +63,7 @@ describe("buildTelegramMessageContext dm thread sessions", () => {
message: Record<string, unknown>,
params?: Pick<
Parameters<typeof buildTelegramMessageContextForTest>[0],
"resolveTelegramGroupConfig"
"cfg" | "resolveTelegramGroupConfig"
>,
) =>
await buildTelegramMessageContextForTest({
@@ -109,6 +109,75 @@ describe("buildTelegramMessageContext dm thread sessions", () => {
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main:thread:1234:42");
});
it("uses thread session key for DM topics when dm.threadReplies is inbound", async () => {
const ctx = await buildContext(
{
message_id: 1,
chat: { id: 1234, type: "private" },
date: 1700000000,
text: "hello",
message_thread_id: 42,
from: { id: 42, first_name: "Alice" },
},
{
cfg: {
agents: {
defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/openclaw" },
},
channels: {
telegram: {
dmPolicy: "open",
allowFrom: ["*"],
dm: { threadReplies: "inbound" },
},
},
messages: { groupChat: { mentionPatterns: [] } },
},
},
);
expect(ctx).not.toBeNull();
expect(ctx?.ctxPayload?.MessageThreadId).toBe(42);
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main:thread:1234:42");
});
it("lets direct chat config opt one DM back into thread session keys", async () => {
const cfg = {
agents: { defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/openclaw" } },
channels: {
telegram: {
dmPolicy: "open",
allowFrom: ["*"],
direct: {
"1234": {
threadReplies: "inbound",
},
},
},
},
messages: { groupChat: { mentionPatterns: [] } },
};
const ctx = await buildTelegramMessageContextForTest({
cfg,
message: {
message_id: 1,
chat: { id: 1234, type: "private" },
date: 1700000000,
text: "hello",
message_thread_id: 42,
from: { id: 42, first_name: "Alice" },
},
resolveTelegramGroupConfig: () => ({
groupConfig: { threadReplies: "inbound" },
topicConfig: undefined,
}),
});
expect(ctx).not.toBeNull();
expect(ctx?.ctxPayload?.MessageThreadId).toBe(42);
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main:thread:1234:42");
});
it("uses the main session key when no thread id", async () => {
const ctx = await buildContext({
message_id: 2,

View File

@@ -8,7 +8,7 @@ import type { TelegramDirectConfig, TelegramGroupConfig } from "openclaw/plugin-
import { deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing";
import { normalizeAccountId, resolveThreadSessionKeys } from "openclaw/plugin-sdk/routing";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { resolveDefaultTelegramAccountId } from "./accounts.js";
import { mergeTelegramAccountConfig, resolveDefaultTelegramAccountId } from "./accounts.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { firstDefined, normalizeAllowFrom, normalizeDmAllowFromWithStore } from "./bot-access.js";
import { resolveTelegramInboundBody } from "./bot-message-context.body.js";
@@ -225,6 +225,7 @@ export const buildTelegramMessageContext = async ({
const freshCfg =
loadFreshConfig?.() ??
(runtime?.getRuntimeConfig ?? (await loadTelegramMessageContextRuntime()).getRuntimeConfig)();
const telegramCfg = mergeTelegramAccountConfig(freshCfg, account.accountId);
let { route, configuredBinding, configuredBindingSessionKey } = resolveTelegramConversationRoute({
cfg: freshCfg,
accountId: account.accountId,
@@ -384,12 +385,14 @@ export const buildTelegramMessageContext = async ({
});
const useDmThreadSession = shouldUseTelegramDmThreadSession({
dmThreadId,
accountConfig: telegramCfg,
directConfig,
topicConfig,
});
const threadKeys = useDmThreadSession
? resolveThreadSessionKeys({ baseSessionKey, threadId: `${chatId}:${dmThreadId}` })
: null;
const threadKeys =
useDmThreadSession && dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: `${chatId}:${dmThreadId}` })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
route = {
...route,

View File

@@ -888,16 +888,21 @@ export const registerTelegramNativeCommands = ({
senderId,
});
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
const threadKeys = shouldUseTelegramDmThreadSession({
dmThreadId,
directConfig: !isGroup ? (groupConfig as TelegramDirectConfig | undefined) : undefined,
topicConfig,
})
? (await resolveNativeCommandRuntime()).resolveThreadSessionKeys({
baseSessionKey,
threadId: `${chatId}:${dmThreadId}`,
})
: null;
const directConfig = !isGroup
? (groupConfig as TelegramDirectConfig | undefined)
: undefined;
const threadKeys =
shouldUseTelegramDmThreadSession({
dmThreadId,
accountConfig: runtimeTelegramCfg,
directConfig,
topicConfig,
}) && dmThreadId != null
? (await resolveNativeCommandRuntime()).resolveThreadSessionKeys({
baseSessionKey,
threadId: `${chatId}:${dmThreadId}`,
})
: null;
cachedTargetSessionKey = threadKeys?.sessionKey ?? baseSessionKey;
return cachedTargetSessionKey;
};

View File

@@ -1,8 +1,10 @@
import type { Chat, Message } from "@grammyjs/types";
import { formatLocationText } from "openclaw/plugin-sdk/channel-inbound";
import type {
TelegramAccountConfig,
TelegramDirectConfig,
TelegramGroupConfig,
TelegramDmThreadReplies,
TelegramTopicConfig,
} from "openclaw/plugin-sdk/config-types";
import { readChannelAllowFromStore } from "openclaw/plugin-sdk/conversation-runtime";
@@ -75,6 +77,36 @@ export type TelegramThreadSpec = {
scope: "dm" | "forum" | "none";
};
function normalizeTelegramDmThreadReplies(value: unknown): TelegramDmThreadReplies | undefined {
return value === "off" || value === "inbound" || value === "always" ? value : undefined;
}
export function resolveTelegramDmThreadReplies(params: {
accountConfig?: TelegramAccountConfig;
directConfig?: TelegramDirectConfig;
}): TelegramDmThreadReplies {
return (
normalizeTelegramDmThreadReplies(params.directConfig?.threadReplies) ??
normalizeTelegramDmThreadReplies(params.accountConfig?.dm?.threadReplies) ??
"off"
);
}
export function shouldUseTelegramDmThreadSession(params: {
dmThreadId?: number;
accountConfig?: TelegramAccountConfig;
directConfig?: TelegramDirectConfig;
topicConfig?: TelegramTopicConfig;
}): boolean {
if (params.dmThreadId == null) {
return false;
}
if (params.directConfig?.requireTopic === true || params.topicConfig) {
return true;
}
return resolveTelegramDmThreadReplies(params) !== "off";
}
export function extractTelegramForumFlag(value: unknown): boolean | undefined {
if (!value || typeof value !== "object" || !("is_forum" in value)) {
return undefined;

View File

@@ -66,6 +66,43 @@ describe("telegram custom commands schema", () => {
}
});
it("accepts DM thread reply policy overrides", () => {
const res = TelegramConfigSchema.safeParse({
dm: { threadReplies: "off" },
direct: {
"123456789": {
threadReplies: "inbound",
},
},
accounts: {
ops: {
dm: { threadReplies: "always" },
},
},
});
expect(res.success).toBe(true);
if (res.success) {
expect(res.data.dm?.threadReplies).toBe("off");
expect(res.data.direct?.["123456789"]?.threadReplies).toBe("inbound");
expect(res.data.accounts?.ops?.dm?.threadReplies).toBe("always");
}
});
it("rejects unknown DM thread reply policy values", () => {
expectTelegramConfigIssue({ dm: { threadReplies: "first" } }, "dm.threadReplies");
expectTelegramConfigIssue(
{
direct: {
"123456789": {
threadReplies: "first",
},
},
},
"direct.123456789.threadReplies",
);
});
it("rejects pollingStallThresholdMs outside the watchdog bounds", () => {
expectTelegramConfigIssue({ pollingStallThresholdMs: 29_999 }, "pollingStallThresholdMs");
expectTelegramConfigIssue({ pollingStallThresholdMs: 600_001 }, "pollingStallThresholdMs");

View File

@@ -17,6 +17,14 @@ export const telegramChannelConfigUiHints = {
label: "Telegram DM Policy",
help: 'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].',
},
"dm.threadReplies": {
label: "Telegram DM Thread Replies",
help: 'Controls whether Telegram DMs with message_thread_id use flat sessions ("off", default) or thread-scoped sessions ("inbound" or "always"). Thread IDs are still preserved for replies when sessions stay flat.',
},
"direct.*.threadReplies": {
label: "Telegram Per-DM Thread Replies",
help: 'Per-DM override for message_thread_id session threading. Use "inbound" only when a specific direct chat intentionally uses Telegram DM topics as separate sessions.',
},
configWrites: {
label: "Telegram Config Writes",
help: "Allow Telegram to write config in response to channel events/commands (default: true).",

View File

@@ -146,9 +146,26 @@ export async function loadChannelConfigSurfaceModule(
});
return jiti(resolvedPath) as Record<string, unknown>;
};
const loadFromPath = (
const loadViaNativeImport = async (candidatePath: string) => {
const imported = (await import(pathToFileURL(path.resolve(candidatePath)).href)) as Record<
string,
unknown
>;
return resolveConfigSchemaExport(imported);
};
const loadFromPath = async (
candidatePath: string,
): { schema: Record<string, unknown>; uiHints?: Record<string, unknown> } | null => {
): Promise<{ schema: Record<string, unknown>; uiHints?: Record<string, unknown> } | null> => {
try {
const resolved = await loadViaNativeImport(candidatePath);
if (resolved) {
return resolved;
}
} catch {
// Fall through to the compatibility loaders when the module needs custom
// plugin SDK aliasing or cannot be imported by the current Node loader.
}
try {
// Prefer the source-aware Jiti path so generated config metadata stays
// stable before and after build output exists in the repo.

View File

@@ -12962,6 +12962,16 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
},
],
},
dm: {
type: "object",
properties: {
threadReplies: {
type: "string",
enum: ["off", "inbound", "always"],
},
},
additionalProperties: false,
},
groups: {
type: "object",
propertyNames: {
@@ -13221,6 +13231,10 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
type: "string",
enum: ["pairing", "allowlist", "open", "disabled"],
},
threadReplies: {
type: "string",
enum: ["off", "inbound", "always"],
},
tools: {
type: "object",
properties: {
@@ -14010,6 +14024,16 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
},
],
},
dm: {
type: "object",
properties: {
threadReplies: {
type: "string",
enum: ["off", "inbound", "always"],
},
},
additionalProperties: false,
},
groups: {
type: "object",
propertyNames: {
@@ -14269,6 +14293,10 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
type: "string",
enum: ["pairing", "allowlist", "open", "disabled"],
},
threadReplies: {
type: "string",
enum: ["off", "inbound", "always"],
},
tools: {
type: "object",
properties: {
@@ -14864,6 +14892,14 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
label: "Telegram DM Policy",
help: 'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].',
},
"dm.threadReplies": {
label: "Telegram DM Thread Replies",
help: 'Controls whether Telegram DMs with message_thread_id use flat sessions ("off", default) or thread-scoped sessions ("inbound" or "always"). Thread IDs are still preserved for replies when sessions stay flat.',
},
"direct.*.threadReplies": {
label: "Telegram Per-DM Thread Replies",
help: 'Per-DM override for message_thread_id session threading. Use "inbound" only when a specific direct chat intentionally uses Telegram DM topics as separate sessions.',
},
configWrites: {
label: "Telegram Config Writes",
help: "Allow Telegram to write config in response to channel events/commands (default: true).",

View File

@@ -149,7 +149,7 @@ describe("loadChannelConfigSurfaceModule", () => {
it("falls back to bun when the source-aware loader fails", async () => {
await withTempDir({ prefix: "openclaw-config-surface-" }, async (repoRoot) => {
const { modulePath } = createDemoConfigSchemaModule(repoRoot);
const { modulePath } = createDemoConfigSchemaModule(repoRoot, ["export const = ;"]);
const {
loadChannelConfigSurfaceModule: loadWithFailingJiti,

View File

@@ -119,6 +119,8 @@ export type TelegramAccountConfig = {
tokenFile?: string;
/** Control reply threading when reply tags are present (off|first|all|batched). */
replyToMode?: ReplyToMode;
/** Direct-message threading behavior. Defaults to flat DM sessions. */
dm?: TelegramDmConfig;
groups?: Record<string, TelegramGroupConfig>;
/** Per-DM configuration for Telegram DM topics (key is chat ID). */
direct?: Record<string, TelegramDirectConfig>;
@@ -218,6 +220,13 @@ export type TelegramAccountConfig = {
autoTopicLabel?: AutoTopicLabelConfig;
};
export type TelegramDmThreadReplies = "off" | "inbound" | "always";
export type TelegramDmConfig = {
/** DM-only session threading override for message_thread_id (off|inbound|always). Default: off. */
threadReplies?: TelegramDmThreadReplies;
};
export type TelegramTopicConfig = {
requireMention?: boolean;
/** Emit internal message hooks for mention-skipped topic messages. */
@@ -290,6 +299,8 @@ export type TelegramDirectConfig = {
skills?: string[];
/** Per-topic configuration for DM topics (key is message_thread_id as string) */
topics?: Record<string, TelegramTopicConfig>;
/** Per-DM override for message_thread_id session threading. */
threadReplies?: TelegramDmThreadReplies;
/** If false, disable the bot for this DM (and its topics). */
enabled?: boolean;
/** If true, require messages to be from a topic when topics are enabled. */

View File

@@ -147,6 +147,14 @@ export const TelegramGroupSchema = z
})
.strict();
const TelegramDmThreadRepliesSchema = z.enum(["off", "inbound", "always"]);
const TelegramDmSchema = z
.object({
threadReplies: TelegramDmThreadRepliesSchema.optional(),
})
.strict();
const AutoTopicLabelSchema = z
.union([
z.boolean(),
@@ -170,6 +178,7 @@ export const TelegramDirectSchema = z
allowFrom: z.array(z.union([z.string(), z.number()])).optional(),
systemPrompt: z.string().optional(),
topics: z.record(z.string(), TelegramTopicSchema.optional()).optional(),
threadReplies: TelegramDmThreadRepliesSchema.optional(),
errorPolicy: TelegramErrorPolicySchema,
errorCooldownMs: z.number().int().nonnegative().optional(),
requireTopic: z.boolean().optional(),
@@ -229,6 +238,7 @@ export const TelegramAccountSchemaBase = z
botToken: SecretInputSchema.optional().register(sensitive),
tokenFile: z.string().optional(),
replyToMode: ReplyToModeSchema.optional(),
dm: TelegramDmSchema.optional(),
groups: z.record(z.string(), TelegramGroupSchema.optional()).optional(),
allowFrom: z.array(z.union([z.string(), z.number()])).optional(),
defaultTo: z.union([z.string(), z.number()]).optional(),