mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-05 18:50:24 +00:00
refactor: eliminate remaining duplicate blocks across draft streams and tests
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import type { Bot } from "grammy";
|
||||
import { createDraftStreamLoop } from "../channels/draft-stream-loop.js";
|
||||
import { createFinalizableDraftLifecycle } from "../channels/draft-stream-controls.js";
|
||||
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
|
||||
|
||||
const TELEGRAM_STREAM_MAX_CHARS = 4096;
|
||||
@@ -55,16 +55,15 @@ export function createTelegramDraftStream(params: {
|
||||
? { ...threadParams, reply_to_message_id: params.replyToMessageId }
|
||||
: threadParams;
|
||||
|
||||
const streamState = { stopped: false, final: false };
|
||||
let streamMessageId: number | undefined;
|
||||
let lastSentText = "";
|
||||
let lastSentParseMode: "HTML" | undefined;
|
||||
let stopped = false;
|
||||
let isFinal = false;
|
||||
let generation = 0;
|
||||
|
||||
const sendOrEditStreamMessage = async (text: string): Promise<boolean> => {
|
||||
// Allow final flush even if stopped (e.g., after clear()).
|
||||
if (stopped && !isFinal) {
|
||||
if (streamState.stopped && !streamState.final) {
|
||||
return false;
|
||||
}
|
||||
const trimmed = text.trimEnd();
|
||||
@@ -80,7 +79,7 @@ export function createTelegramDraftStream(params: {
|
||||
if (renderedText.length > maxChars) {
|
||||
// Telegram text messages/edits cap at 4096 chars.
|
||||
// Stop streaming once we exceed the cap to avoid repeated API failures.
|
||||
stopped = true;
|
||||
streamState.stopped = true;
|
||||
params.warn?.(
|
||||
`telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`,
|
||||
);
|
||||
@@ -92,7 +91,7 @@ export function createTelegramDraftStream(params: {
|
||||
const sendGeneration = generation;
|
||||
|
||||
// Debounce first preview send for better push notification quality.
|
||||
if (typeof streamMessageId !== "number" && minInitialChars != null && !isFinal) {
|
||||
if (typeof streamMessageId !== "number" && minInitialChars != null && !streamState.final) {
|
||||
if (renderedText.length < minInitialChars) {
|
||||
return false;
|
||||
}
|
||||
@@ -120,7 +119,7 @@ export function createTelegramDraftStream(params: {
|
||||
const sent = await params.api.sendMessage(chatId, renderedText, sendParams);
|
||||
const sentMessageId = sent?.message_id;
|
||||
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
|
||||
stopped = true;
|
||||
streamState.stopped = true;
|
||||
params.warn?.("telegram stream preview stopped (missing message id from sendMessage)");
|
||||
return false;
|
||||
}
|
||||
@@ -136,7 +135,7 @@ export function createTelegramDraftStream(params: {
|
||||
streamMessageId = normalizedMessageId;
|
||||
return true;
|
||||
} catch (err) {
|
||||
stopped = true;
|
||||
streamState.stopped = true;
|
||||
params.warn?.(
|
||||
`telegram stream preview failed: ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
@@ -144,42 +143,23 @@ export function createTelegramDraftStream(params: {
|
||||
}
|
||||
};
|
||||
|
||||
const loop = createDraftStreamLoop({
|
||||
const { loop, update, stop, clear } = createFinalizableDraftLifecycle({
|
||||
throttleMs,
|
||||
isStopped: () => stopped,
|
||||
state: streamState,
|
||||
sendOrEditStreamMessage,
|
||||
});
|
||||
|
||||
const update = (text: string) => {
|
||||
if (stopped || isFinal) {
|
||||
return;
|
||||
}
|
||||
loop.update(text);
|
||||
};
|
||||
|
||||
const stop = async (): Promise<void> => {
|
||||
isFinal = true;
|
||||
await loop.flush();
|
||||
};
|
||||
|
||||
const clear = async () => {
|
||||
stopped = true;
|
||||
loop.stop();
|
||||
await loop.waitForInFlight();
|
||||
const messageId = streamMessageId;
|
||||
streamMessageId = undefined;
|
||||
if (typeof messageId !== "number") {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await params.api.deleteMessage(chatId, messageId);
|
||||
readMessageId: () => streamMessageId,
|
||||
clearMessageId: () => {
|
||||
streamMessageId = undefined;
|
||||
},
|
||||
isValidMessageId: (value): value is number =>
|
||||
typeof value === "number" && Number.isFinite(value),
|
||||
deleteMessage: (messageId) => params.api.deleteMessage(chatId, messageId),
|
||||
onDeleteSuccess: (messageId) => {
|
||||
params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`);
|
||||
} catch (err) {
|
||||
params.warn?.(
|
||||
`telegram stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
}
|
||||
};
|
||||
},
|
||||
warn: params.warn,
|
||||
warnPrefix: "telegram stream preview cleanup failed",
|
||||
});
|
||||
|
||||
const forceNewMessage = () => {
|
||||
generation += 1;
|
||||
|
||||
Reference in New Issue
Block a user