fix(config): normalize channel streaming config shape (#61381)

* feat(config): add canonical streaming config helpers

* refactor(runtime): prefer canonical streaming accessors

* feat(config): normalize preview channel streaming shape

* test(config): lock streaming normalization followups

* fix(config): polish streaming migration edges

* chore(config): refresh streaming baseline hash
This commit is contained in:
Vincent Koc
2026-04-06 05:08:20 +01:00
committed by GitHub
parent 93ddcb37de
commit 0fdf9e874b
48 changed files with 3012 additions and 705 deletions

View File

@@ -5,6 +5,7 @@ import {
removeAckReactionAfterReply,
} from "openclaw/plugin-sdk/channel-feedback";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import { resolveChannelStreamingBlockEnabled } from "openclaw/plugin-sdk/channel-streaming";
import type {
OpenClawConfig,
ReplyToMode,
@@ -199,9 +200,8 @@ export const dispatchTelegramMessage = async ({
parseMode: "HTML" as const,
});
const accountBlockStreamingEnabled =
typeof telegramCfg.blockStreaming === "boolean"
? telegramCfg.blockStreaming
: cfg.agents?.defaults?.blockStreamingDefault === "on";
resolveChannelStreamingBlockEnabled(telegramCfg) ??
cfg.agents?.defaults?.blockStreamingDefault === "on";
const resolvedReasoningLevel = resolveTelegramReasoningLevel({
cfg,
sessionKey: ctxPayload.SessionKey,
@@ -389,12 +389,13 @@ export const dispatchTelegramMessage = async ({
await lane.stream.flush();
};
const resolvedBlockStreamingEnabled = resolveChannelStreamingBlockEnabled(telegramCfg);
const disableBlockStreaming = !previewStreamingEnabled
? true
: forceBlockStreamingForReasoning
? false
: typeof telegramCfg.blockStreaming === "boolean"
? !telegramCfg.blockStreaming
: typeof resolvedBlockStreamingEnabled === "boolean"
? !resolvedBlockStreamingEnabled
: canStreamAnswerDraft
? true
: undefined;

View File

@@ -8,6 +8,7 @@ import { pluginCommandMocks, resetPluginCommandMocks } from "./test-support/plug
let registerTelegramNativeCommands: typeof import("./bot-native-commands.js").registerTelegramNativeCommands;
let parseTelegramNativeCommandCallbackData: typeof import("./bot-native-commands.js").parseTelegramNativeCommandCallbackData;
let resolveTelegramNativeCommandDisableBlockStreaming: typeof import("./bot-native-commands.js").resolveTelegramNativeCommandDisableBlockStreaming;
import {
createCommandBot,
createNativeCommandTestParams,
@@ -22,8 +23,11 @@ import {
describe("registerTelegramNativeCommands", () => {
beforeAll(async () => {
({ registerTelegramNativeCommands, parseTelegramNativeCommandCallbackData } =
await import("./bot-native-commands.js"));
({
registerTelegramNativeCommands,
parseTelegramNativeCommandCallbackData,
resolveTelegramNativeCommandDisableBlockStreaming,
} = await import("./bot-native-commands.js"));
});
beforeEach(() => {
@@ -281,6 +285,27 @@ describe("registerTelegramNativeCommands", () => {
expect(sendMessage).not.toHaveBeenCalledWith(123, "Command not found.");
});
it("uses nested streaming.block.enabled for native command block-streaming behavior", () => {
expect(
resolveTelegramNativeCommandDisableBlockStreaming({
streaming: {
block: {
enabled: false,
},
},
} as TelegramAccountConfig),
).toBe(true);
expect(
resolveTelegramNativeCommandDisableBlockStreaming({
streaming: {
block: {
enabled: true,
},
},
} as TelegramAccountConfig),
).toBe(false);
});
it("uses plugin command metadata to send and edit a Telegram progress placeholder", async () => {
const { bot, commandHandlers, sendMessage, deleteMessage } = createCommandBot();

View File

@@ -1,4 +1,5 @@
import type { Bot, Context } from "grammy";
import { resolveChannelStreamingBlockEnabled } from "openclaw/plugin-sdk/channel-streaming";
import {
resolveCommandAuthorization,
resolveCommandAuthorizedFromAuthorizers,
@@ -213,6 +214,13 @@ export function parseTelegramNativeCommandCallbackData(data?: string | null): st
return commandText.startsWith("/") ? commandText : null;
}
export function resolveTelegramNativeCommandDisableBlockStreaming(
telegramCfg: TelegramAccountConfig,
): boolean | undefined {
const blockStreamingEnabled = resolveChannelStreamingBlockEnabled(telegramCfg);
return typeof blockStreamingEnabled === "boolean" ? !blockStreamingEnabled : undefined;
}
export type RegisterTelegramNativeCommandsParams = {
bot: Bot;
cfg: OpenClawConfig;
@@ -900,9 +908,7 @@ export const registerTelegramNativeCommands = ({
});
const disableBlockStreaming =
typeof runtimeTelegramCfg.blockStreaming === "boolean"
? !runtimeTelegramCfg.blockStreaming
: undefined;
resolveTelegramNativeCommandDisableBlockStreaming(runtimeTelegramCfg);
const deliveryState = {
delivered: false,
skippedNonSilent: 0,

View File

@@ -33,6 +33,34 @@ export const telegramChannelConfigUiHints = {
label: "Telegram Streaming Mode",
help: 'Unified Telegram stream preview mode: "off" | "partial" | "block" | "progress" (default: "partial"). "progress" maps to "partial" on Telegram. Legacy boolean/streamMode keys are auto-mapped.',
},
"streaming.mode": {
label: "Telegram Streaming Mode",
help: 'Canonical Telegram preview mode: "off" | "partial" | "block" | "progress" (default: "partial"). "progress" maps to "partial" on Telegram.',
},
"streaming.chunkMode": {
label: "Telegram Chunk Mode",
help: 'Chunking mode for outbound Telegram text delivery: "length" (default) or "newline".',
},
"streaming.block.enabled": {
label: "Telegram Block Streaming Enabled",
help: 'Enable chunked block-style Telegram preview delivery when channels.telegram.streaming.mode="block".',
},
"streaming.block.coalesce": {
label: "Telegram Block Streaming Coalesce",
help: "Merge streamed Telegram block replies before sending final delivery.",
},
"streaming.preview.chunk.minChars": {
label: "Telegram Draft Chunk Min Chars",
help: 'Minimum chars before emitting a Telegram block preview chunk when channels.telegram.streaming.mode="block".',
},
"streaming.preview.chunk.maxChars": {
label: "Telegram Draft Chunk Max Chars",
help: 'Target max size for a Telegram block preview chunk when channels.telegram.streaming.mode="block".',
},
"streaming.preview.chunk.breakPreference": {
label: "Telegram Draft Chunk Break Preference",
help: "Preferred breakpoints for Telegram draft chunks (paragraph | newline | sentence).",
},
"retry.attempts": {
label: "Telegram Retry Attempts",
help: "Max retry attempts for outbound Telegram API calls (default: 3).",

View File

@@ -11,43 +11,108 @@ function asObjectRecord(value: unknown): Record<string, unknown> | null {
: null;
}
function ensureNestedRecord(owner: Record<string, unknown>, key: string): Record<string, unknown> {
const existing = asObjectRecord(owner[key]);
if (existing) {
return { ...existing };
}
return {};
}
function normalizeTelegramStreamingAliases(params: {
entry: Record<string, unknown>;
pathPrefix: string;
changes: string[];
}): { entry: Record<string, unknown>; changed: boolean } {
let updated = params.entry;
const hadLegacyStreamMode = updated.streamMode !== undefined;
const beforeStreaming = updated.streaming;
const resolved = resolveTelegramPreviewStreamMode(updated);
const beforeStreaming = params.entry.streaming;
const hadLegacyStreamMode = params.entry.streamMode !== undefined;
const hasLegacyFlatFields =
params.entry.chunkMode !== undefined ||
params.entry.blockStreaming !== undefined ||
params.entry.draftChunk !== undefined ||
params.entry.blockStreamingCoalesce !== undefined;
const resolved = resolveTelegramPreviewStreamMode(params.entry);
const shouldNormalize =
hadLegacyStreamMode ||
typeof beforeStreaming === "boolean" ||
(typeof beforeStreaming === "string" && beforeStreaming !== resolved);
typeof beforeStreaming === "string" ||
hasLegacyFlatFields;
if (!shouldNormalize) {
return { entry: updated, changed: false };
return { entry: params.entry, changed: false };
}
let updated = { ...params.entry };
let changed = false;
if (beforeStreaming !== resolved) {
updated = { ...updated, streaming: resolved };
const streaming = ensureNestedRecord(updated, "streaming");
const block = ensureNestedRecord(streaming, "block");
const preview = ensureNestedRecord(streaming, "preview");
if (
(hadLegacyStreamMode ||
typeof beforeStreaming === "boolean" ||
typeof beforeStreaming === "string") &&
streaming.mode === undefined
) {
streaming.mode = resolved;
if (hadLegacyStreamMode) {
params.changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming.mode (${resolved}).`,
);
} else if (typeof beforeStreaming === "boolean") {
params.changes.push(
`Moved ${params.pathPrefix}.streaming (boolean) → ${params.pathPrefix}.streaming.mode (${resolved}).`,
);
} else if (typeof beforeStreaming === "string") {
params.changes.push(
`Moved ${params.pathPrefix}.streaming (scalar) → ${params.pathPrefix}.streaming.mode (${resolved}).`,
);
}
changed = true;
}
if (hadLegacyStreamMode) {
const { streamMode: _ignored, ...rest } = updated;
updated = rest;
delete updated.streamMode;
changed = true;
params.changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`,
);
}
if (typeof beforeStreaming === "boolean") {
params.changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`);
} else if (typeof beforeStreaming === "string" && beforeStreaming !== resolved) {
if (updated.chunkMode !== undefined && streaming.chunkMode === undefined) {
streaming.chunkMode = updated.chunkMode;
delete updated.chunkMode;
params.changes.push(
`Normalized ${params.pathPrefix}.streaming (${beforeStreaming}) → (${resolved}).`,
`Moved ${params.pathPrefix}.chunkMode → ${params.pathPrefix}.streaming.chunkMode.`,
);
changed = true;
}
if (updated.blockStreaming !== undefined && block.enabled === undefined) {
block.enabled = updated.blockStreaming;
delete updated.blockStreaming;
params.changes.push(
`Moved ${params.pathPrefix}.blockStreaming → ${params.pathPrefix}.streaming.block.enabled.`,
);
changed = true;
}
if (updated.draftChunk !== undefined && preview.chunk === undefined) {
preview.chunk = updated.draftChunk;
delete updated.draftChunk;
params.changes.push(
`Moved ${params.pathPrefix}.draftChunk → ${params.pathPrefix}.streaming.preview.chunk.`,
);
changed = true;
}
if (updated.blockStreamingCoalesce !== undefined && block.coalesce === undefined) {
block.coalesce = updated.blockStreamingCoalesce;
delete updated.blockStreamingCoalesce;
params.changes.push(
`Moved ${params.pathPrefix}.blockStreamingCoalesce → ${params.pathPrefix}.streaming.block.coalesce.`,
);
changed = true;
}
if (Object.keys(preview).length > 0) {
streaming.preview = preview;
}
if (Object.keys(block).length > 0) {
streaming.block = block;
}
updated.streaming = streaming;
return { entry: updated, changed };
}
@@ -59,8 +124,11 @@ function hasLegacyTelegramStreamingAliases(value: unknown): boolean {
return (
entry.streamMode !== undefined ||
typeof entry.streaming === "boolean" ||
(typeof entry.streaming === "string" &&
entry.streaming !== resolveTelegramPreviewStreamMode(entry))
typeof entry.streaming === "string" ||
entry.chunkMode !== undefined ||
entry.blockStreaming !== undefined ||
entry.draftChunk !== undefined ||
entry.blockStreamingCoalesce !== undefined
);
}
@@ -99,13 +167,13 @@ export const legacyConfigRules: ChannelDoctorLegacyConfigRule[] = [
{
path: ["channels", "telegram"],
message:
'channels.telegram.streamMode and boolean channels.telegram.streaming are legacy; use channels.telegram.streaming="off|partial|block".',
"channels.telegram.streamMode, channels.telegram.streaming (scalar), chunkMode, blockStreaming, draftChunk, and blockStreamingCoalesce are legacy; use channels.telegram.streaming.{mode,chunkMode,preview.chunk,block.enabled,block.coalesce}.",
match: hasLegacyTelegramStreamingAliases,
},
{
path: ["channels", "telegram", "accounts"],
message:
'channels.telegram.accounts.<id>.streamMode and boolean channels.telegram.accounts.<id>.streaming are legacy; use channels.telegram.accounts.<id>.streaming="off|partial|block".',
"channels.telegram.accounts.<id>.streamMode, streaming (scalar), chunkMode, blockStreaming, draftChunk, and blockStreamingCoalesce are legacy; use channels.telegram.accounts.<id>.streaming.{mode,chunkMode,preview.chunk,block.enabled,block.coalesce}.",
match: hasLegacyTelegramAccountStreamingAliases,
},
];

View File

@@ -6,6 +6,7 @@ import {
collectTelegramGroupPolicyWarnings,
maybeRepairTelegramAllowFromUsernames,
scanTelegramAllowFromUsernameEntries,
telegramDoctor,
} from "./doctor.js";
const resolveCommandSecretRefsViaGatewayMock = vi.hoisted(() => vi.fn());
@@ -66,6 +67,94 @@ describe("telegram doctor", () => {
lookupTelegramChatIdMock.mockReset();
});
it("normalizes legacy telegram streaming aliases into the nested streaming shape", () => {
const normalize = telegramDoctor.normalizeCompatibilityConfig;
expect(normalize).toBeDefined();
if (!normalize) {
return;
}
const result = normalize({
cfg: {
channels: {
telegram: {
streamMode: "block",
chunkMode: "newline",
blockStreaming: true,
draftChunk: {
minChars: 120,
},
accounts: {
work: {
streaming: false,
blockStreamingCoalesce: {
idleMs: 250,
},
},
},
},
},
} as never,
});
expect(result.config.channels?.telegram?.streaming).toEqual({
mode: "block",
chunkMode: "newline",
block: {
enabled: true,
},
preview: {
chunk: {
minChars: 120,
},
},
});
expect(result.config.channels?.telegram?.accounts?.work?.streaming).toEqual({
mode: "off",
block: {
coalesce: {
idleMs: 250,
},
},
});
expect(result.changes).toEqual(
expect.arrayContaining([
"Moved channels.telegram.streamMode → channels.telegram.streaming.mode (block).",
"Moved channels.telegram.chunkMode → channels.telegram.streaming.chunkMode.",
"Moved channels.telegram.blockStreaming → channels.telegram.streaming.block.enabled.",
"Moved channels.telegram.draftChunk → channels.telegram.streaming.preview.chunk.",
"Moved channels.telegram.accounts.work.streaming (boolean) → channels.telegram.accounts.work.streaming.mode (off).",
"Moved channels.telegram.accounts.work.blockStreamingCoalesce → channels.telegram.accounts.work.streaming.block.coalesce.",
]),
);
});
it("does not duplicate streaming.mode change messages when streamMode wins over boolean streaming", () => {
const normalize = telegramDoctor.normalizeCompatibilityConfig;
expect(normalize).toBeDefined();
if (!normalize) {
return;
}
const result = normalize({
cfg: {
channels: {
telegram: {
streamMode: "block",
streaming: false,
},
},
} as never,
});
expect(result.config.channels?.telegram?.streaming).toEqual({
mode: "block",
});
expect(
result.changes.filter((change) => change.includes("channels.telegram.streaming.mode")),
).toEqual(["Moved channels.telegram.streamMode → channels.telegram.streaming.mode (block)."]);
});
it("finds username allowFrom entries across scopes", () => {
const hits = scanTelegramAllowFromUsernameEntries({
channels: {

View File

@@ -27,6 +27,14 @@ function asObjectRecord(value: unknown): Record<string, unknown> | null {
: null;
}
function ensureNestedRecord(owner: Record<string, unknown>, key: string): Record<string, unknown> {
const existing = asObjectRecord(owner[key]);
if (existing) {
return { ...existing };
}
return {};
}
function sanitizeForLog(value: string): string {
return value.replace(/[\u0000-\u001f\u007f]+/g, " ").trim();
}
@@ -40,38 +48,95 @@ function normalizeTelegramStreamingAliases(params: {
pathPrefix: string;
changes: string[];
}): { entry: Record<string, unknown>; changed: boolean } {
let updated = params.entry;
const hadLegacyStreamMode = updated.streamMode !== undefined;
const beforeStreaming = updated.streaming;
const resolved = resolveTelegramPreviewStreamMode(updated);
const beforeStreaming = params.entry.streaming;
const hadLegacyStreamMode = params.entry.streamMode !== undefined;
const hasLegacyFlatFields =
params.entry.chunkMode !== undefined ||
params.entry.blockStreaming !== undefined ||
params.entry.draftChunk !== undefined ||
params.entry.blockStreamingCoalesce !== undefined;
const resolved = resolveTelegramPreviewStreamMode(params.entry);
const shouldNormalize =
hadLegacyStreamMode ||
typeof beforeStreaming === "boolean" ||
(typeof beforeStreaming === "string" && beforeStreaming !== resolved);
typeof beforeStreaming === "string" ||
hasLegacyFlatFields;
if (!shouldNormalize) {
return { entry: updated, changed: false };
return { entry: params.entry, changed: false };
}
let updated = { ...params.entry };
let changed = false;
if (beforeStreaming !== resolved) {
updated = { ...updated, streaming: resolved };
const streaming = ensureNestedRecord(updated, "streaming");
const block = ensureNestedRecord(streaming, "block");
const preview = ensureNestedRecord(streaming, "preview");
if (
(hadLegacyStreamMode ||
typeof beforeStreaming === "boolean" ||
typeof beforeStreaming === "string") &&
streaming.mode === undefined
) {
streaming.mode = resolved;
if (hadLegacyStreamMode) {
params.changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming.mode (${resolved}).`,
);
} else if (typeof beforeStreaming === "boolean") {
params.changes.push(
`Moved ${params.pathPrefix}.streaming (boolean) → ${params.pathPrefix}.streaming.mode (${resolved}).`,
);
} else if (typeof beforeStreaming === "string") {
params.changes.push(
`Moved ${params.pathPrefix}.streaming (scalar) → ${params.pathPrefix}.streaming.mode (${resolved}).`,
);
}
changed = true;
}
if (hadLegacyStreamMode) {
const { streamMode: _ignored, ...rest } = updated;
updated = rest;
delete updated.streamMode;
changed = true;
params.changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`,
);
}
if (typeof beforeStreaming === "boolean") {
params.changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`);
} else if (typeof beforeStreaming === "string" && beforeStreaming !== resolved) {
if (updated.chunkMode !== undefined && streaming.chunkMode === undefined) {
streaming.chunkMode = updated.chunkMode;
delete updated.chunkMode;
params.changes.push(
`Normalized ${params.pathPrefix}.streaming (${beforeStreaming}) → (${resolved}).`,
`Moved ${params.pathPrefix}.chunkMode → ${params.pathPrefix}.streaming.chunkMode.`,
);
changed = true;
}
if (updated.blockStreaming !== undefined && block.enabled === undefined) {
block.enabled = updated.blockStreaming;
delete updated.blockStreaming;
params.changes.push(
`Moved ${params.pathPrefix}.blockStreaming → ${params.pathPrefix}.streaming.block.enabled.`,
);
changed = true;
}
if (updated.draftChunk !== undefined && preview.chunk === undefined) {
preview.chunk = updated.draftChunk;
delete updated.draftChunk;
params.changes.push(
`Moved ${params.pathPrefix}.draftChunk → ${params.pathPrefix}.streaming.preview.chunk.`,
);
changed = true;
}
if (updated.blockStreamingCoalesce !== undefined && block.coalesce === undefined) {
block.coalesce = updated.blockStreamingCoalesce;
delete updated.blockStreamingCoalesce;
params.changes.push(
`Moved ${params.pathPrefix}.blockStreamingCoalesce → ${params.pathPrefix}.streaming.block.coalesce.`,
);
changed = true;
}
if (Object.keys(preview).length > 0) {
streaming.preview = preview;
}
if (Object.keys(block).length > 0) {
streaming.block = block;
}
updated.streaming = streaming;
return { entry: updated, changed };
}
@@ -458,8 +523,11 @@ function hasLegacyTelegramStreamingAliases(value: unknown): boolean {
return (
entry.streamMode !== undefined ||
typeof entry.streaming === "boolean" ||
(typeof entry.streaming === "string" &&
entry.streaming !== resolveTelegramPreviewStreamMode(entry))
typeof entry.streaming === "string" ||
entry.chunkMode !== undefined ||
entry.blockStreaming !== undefined ||
entry.draftChunk !== undefined ||
entry.blockStreamingCoalesce !== undefined
);
}
@@ -475,13 +543,13 @@ const TELEGRAM_LEGACY_CONFIG_RULES: ChannelDoctorLegacyConfigRule[] = [
{
path: ["channels", "telegram"],
message:
'channels.telegram.streamMode and boolean channels.telegram.streaming are legacy; use channels.telegram.streaming="off|partial|block".',
"channels.telegram.streamMode, channels.telegram.streaming (scalar), chunkMode, blockStreaming, draftChunk, and blockStreamingCoalesce are legacy; use channels.telegram.streaming.{mode,chunkMode,preview.chunk,block.enabled,block.coalesce}.",
match: hasLegacyTelegramStreamingAliases,
},
{
path: ["channels", "telegram", "accounts"],
message:
'channels.telegram.accounts.<id>.streamMode and boolean channels.telegram.accounts.<id>.streaming are legacy; use channels.telegram.accounts.<id>.streaming="off|partial|block".',
"channels.telegram.accounts.<id>.streamMode, streaming (scalar), chunkMode, blockStreaming, draftChunk, and blockStreamingCoalesce are legacy; use channels.telegram.accounts.<id>.streaming.{mode,chunkMode,preview.chunk,block.enabled,block.coalesce}.",
match: hasLegacyTelegramAccountStreamingAliases,
},
];

View File

@@ -1,3 +1,4 @@
import { resolveChannelStreamingPreviewChunk } from "openclaw/plugin-sdk/channel-streaming";
import { type OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-chunking";
import { resolveAccountEntry } from "openclaw/plugin-sdk/routing";
@@ -20,7 +21,11 @@ export function resolveTelegramDraftStreamingChunking(
});
const normalizedAccountId = normalizeAccountId(accountId);
const accountCfg = resolveAccountEntry(cfg?.channels?.telegram?.accounts, normalizedAccountId);
const draftCfg = accountCfg?.draftChunk ?? cfg?.channels?.telegram?.draftChunk;
const draftCfg =
resolveChannelStreamingPreviewChunk(accountCfg) ??
resolveChannelStreamingPreviewChunk(cfg?.channels?.telegram) ??
accountCfg?.draftChunk ??
cfg?.channels?.telegram?.draftChunk;
const maxRequested = Math.max(
1,

View File

@@ -1,3 +1,5 @@
import { getChannelStreamingConfigObject } from "openclaw/plugin-sdk/channel-streaming";
export type TelegramPreviewStreamMode = "off" | "partial" | "block";
function normalizeStreamingMode(value: unknown): string | null {
@@ -35,7 +37,9 @@ export function resolveTelegramPreviewStreamMode(
streaming?: unknown;
} = {},
): TelegramPreviewStreamMode {
const parsedStreaming = parseStreamingMode(params.streaming);
const parsedStreaming = parseStreamingMode(
getChannelStreamingConfigObject(params)?.mode ?? params.streaming,
);
if (parsedStreaming) {
if (parsedStreaming === "progress") {
return "partial";