fix: fence Telegram stale reply delivery after abort (#68100) (thanks @rubencu)

* fix(telegram): fence stale reply delivery after abort

* refactor(telegram): narrow abort fence scope

* fix(telegram): ignore stale reply finalization after abort

* fix(telegram): close abort supersession races

* fix(telegram): release abort fences on setup errors

* fix(telegram): discard superseded draft cleanup

* refactor(telegram): distill abort fence cleanup

* fix: fence Telegram stale reply delivery after abort (#68100) (thanks @rubencu)

---------

Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
Rubén Cuevas
2026-04-18 00:32:38 -04:00
committed by GitHub
parent 2c3542e315
commit 996eb9a024
6 changed files with 1301 additions and 539 deletions

View File

@@ -58,6 +58,14 @@ const wasSentByBot = vi.hoisted(() => vi.fn(() => false));
const loadSessionStore = vi.hoisted(() => vi.fn());
const resolveStorePath = vi.hoisted(() => vi.fn(() => "/tmp/sessions.json"));
const generateTopicLabel = vi.hoisted(() => vi.fn());
const describeStickerImage = vi.hoisted(() => vi.fn(async () => null));
const loadModelCatalog = vi.hoisted(() => vi.fn(async () => ({})));
const findModelInCatalog = vi.hoisted(() => vi.fn(() => null));
const modelSupportsVision = vi.hoisted(() => vi.fn(() => false));
const resolveAgentDir = vi.hoisted(() => vi.fn(() => "/tmp/agent"));
const resolveDefaultModelForAgent = vi.hoisted(() =>
vi.fn(() => ({ provider: "openai", model: "gpt-test" })),
);
vi.mock("./draft-stream.js", () => ({
createTelegramDraftStream,
@@ -95,16 +103,26 @@ vi.mock("./bot-message-dispatch.runtime.js", () => ({
resolveStorePath,
}));
vi.mock("./bot-message-dispatch.agent.runtime.js", () => ({
findModelInCatalog,
loadModelCatalog,
modelSupportsVision,
resolveAgentDir,
resolveDefaultModelForAgent,
}));
vi.mock("./sticker-cache.js", () => ({
cacheSticker: vi.fn(),
getCachedSticker: () => null,
getCacheStats: () => ({ count: 0 }),
searchStickers: () => [],
getAllCachedStickers: () => [],
describeStickerImage: vi.fn(),
describeStickerImage,
}));
let dispatchTelegramMessage: typeof import("./bot-message-dispatch.js").dispatchTelegramMessage;
let getTelegramAbortFenceSizeForTests: typeof import("./bot-message-dispatch.js").getTelegramAbortFenceSizeForTests;
let resetTelegramAbortFenceForTests: typeof import("./bot-message-dispatch.js").resetTelegramAbortFenceForTests;
const telegramDepsForTest: TelegramBotDeps = {
loadConfig: loadConfig as TelegramBotDeps["loadConfig"],
@@ -135,10 +153,15 @@ describe("dispatchTelegramMessage draft streaming", () => {
type TelegramMessageContext = Parameters<typeof dispatchTelegramMessage>[0]["context"];
beforeAll(async () => {
({ dispatchTelegramMessage } = await import("./bot-message-dispatch.js"));
({
dispatchTelegramMessage,
getTelegramAbortFenceSizeForTests,
resetTelegramAbortFenceForTests,
} = await import("./bot-message-dispatch.js"));
});
beforeEach(() => {
resetTelegramAbortFenceForTests();
createTelegramDraftStream.mockReset();
dispatchReplyWithBufferedBlockDispatcher.mockReset();
deliverReplies.mockReset();
@@ -162,6 +185,12 @@ describe("dispatchTelegramMessage draft streaming", () => {
loadSessionStore.mockReset();
resolveStorePath.mockReset();
generateTopicLabel.mockReset();
describeStickerImage.mockReset();
loadModelCatalog.mockReset();
findModelInCatalog.mockReset();
modelSupportsVision.mockReset();
resolveAgentDir.mockReset();
resolveDefaultModelForAgent.mockReset();
loadConfig.mockReturnValue({});
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
@@ -199,6 +228,15 @@ describe("dispatchTelegramMessage draft streaming", () => {
resolveStorePath.mockReturnValue("/tmp/sessions.json");
loadSessionStore.mockReturnValue({});
generateTopicLabel.mockResolvedValue("Topic label");
describeStickerImage.mockResolvedValue(null);
loadModelCatalog.mockResolvedValue({});
findModelInCatalog.mockReturnValue(null);
modelSupportsVision.mockReturnValue(false);
resolveAgentDir.mockReturnValue("/tmp/agent");
resolveDefaultModelForAgent.mockReturnValue({
provider: "openai",
model: "gpt-test",
});
});
const createDraftStream = (messageId?: number) => createTestDraftStream({ messageId });
@@ -2683,6 +2721,608 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftB.clear).toHaveBeenCalledTimes(1);
});
it("ignores stale answer finalization after an abort dispatch supersedes the same session", async () => {
let releaseFirstFinal!: () => void;
const firstFinalGate = new Promise<void>((resolve) => {
releaseFirstFinal = resolve;
});
let resolvePreviewVisible!: () => void;
const previewVisible = new Promise<void>((resolve) => {
resolvePreviewVisible = resolve;
});
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
onUpdate: (text) => {
if (text === "Old reply partial") {
resolvePreviewVisible();
}
},
});
const firstReasoningDraft = createDraftStream();
const abortAnswerDraft = createDraftStream();
const abortReasoningDraft = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => abortAnswerDraft)
.mockImplementationOnce(() => abortReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Old reply partial" });
await firstFinalGate;
await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" });
return { queuedFinal: true };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
} as never,
}),
});
await previewVisible;
const abortPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "abort",
RawBody: "abort",
CommandBody: "abort",
} as never,
}),
});
await vi.waitFor(() => {
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [{ text: "⚙️ Agent was aborted." }],
}),
);
});
releaseFirstFinal();
await Promise.all([firstPromise, abortPromise]);
expect(editMessageTelegram).not.toHaveBeenCalledWith(
123,
1001,
"Old reply final",
expect.any(Object),
);
expect(firstAnswerDraft.clear).not.toHaveBeenCalled();
});
it("discards hidden short partials instead of flushing a stale preview after abort", async () => {
let releaseFirstCleanup!: () => void;
const firstCleanupGate = new Promise<void>((resolve) => {
releaseFirstCleanup = resolve;
});
let resolveShortPartialQueued!: () => void;
const shortPartialQueued = new Promise<void>((resolve) => {
resolveShortPartialQueued = resolve;
});
const firstAnswerDraft = createTestDraftStream({
onUpdate: (text) => {
if (text === "tiny") {
resolveShortPartialQueued();
}
},
onStop: () => {
throw new Error("superseded cleanup should discard instead of stop");
},
});
const firstReasoningDraft = createDraftStream();
const abortAnswerDraft = createDraftStream();
const abortReasoningDraft = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => abortAnswerDraft)
.mockImplementationOnce(() => abortReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "tiny" });
await firstCleanupGate;
return { queuedFinal: false };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
} as never,
}),
});
await shortPartialQueued;
const abortPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "abort",
RawBody: "abort",
CommandBody: "abort",
} as never,
}),
});
await vi.waitFor(() => {
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [{ text: "⚙️ Agent was aborted." }],
}),
);
});
releaseFirstCleanup();
await Promise.all([firstPromise, abortPromise]);
expect(firstAnswerDraft.discard).toHaveBeenCalledTimes(1);
expect(firstAnswerDraft.stop).not.toHaveBeenCalled();
expect(firstAnswerDraft.clear).not.toHaveBeenCalled();
});
it("suppresses stale replies when abort lands during async pre-dispatch work", async () => {
let releaseCatalogLoad!: () => void;
const catalogLoadGate = new Promise<Record<string, never>>((resolve) => {
releaseCatalogLoad = () => resolve({});
});
let resolveCatalogLoadStarted!: () => void;
const catalogLoadStarted = new Promise<void>((resolve) => {
resolveCatalogLoadStarted = resolve;
});
loadModelCatalog.mockImplementationOnce(async () => {
resolveCatalogLoadStarted();
return await catalogLoadGate;
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ ctx, dispatcherOptions }) => {
if (ctx.CommandBody === "abort") {
await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" });
return { queuedFinal: true };
}
await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
MediaPath: "/tmp/sticker.png",
Sticker: {
fileId: "file-id",
fileUniqueId: "file-unique-id",
},
} as never,
}),
});
await catalogLoadStarted;
const abortPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "abort",
RawBody: "abort",
CommandBody: "abort",
} as never,
}),
});
await vi.waitFor(() => {
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [{ text: "⚙️ Agent was aborted." }],
}),
);
});
releaseCatalogLoad();
await Promise.all([firstPromise, abortPromise]);
expect(deliverReplies).not.toHaveBeenCalledWith(
expect.objectContaining({
replies: [{ text: "Old reply final" }],
}),
);
});
it("releases the abort fence when pre-dispatch setup throws", async () => {
describeStickerImage.mockRejectedValueOnce(new Error("sticker setup failed"));
await expect(
dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
MediaPath: "/tmp/sticker.png",
Sticker: {
fileId: "file-id",
fileUniqueId: "file-unique-id",
},
} as never,
}),
}),
).rejects.toThrow("sticker setup failed");
expect(getTelegramAbortFenceSizeForTests()).toBe(0);
});
it("keeps older answer finalization when abort targets a different session", async () => {
let releaseFirstFinal!: () => void;
const firstFinalGate = new Promise<void>((resolve) => {
releaseFirstFinal = resolve;
});
let resolvePreviewVisible!: () => void;
const previewVisible = new Promise<void>((resolve) => {
resolvePreviewVisible = resolve;
});
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
onUpdate: (text) => {
if (text === "Old reply partial") {
resolvePreviewVisible();
}
},
});
const firstReasoningDraft = createDraftStream();
const abortAnswerDraft = createDraftStream();
const abortReasoningDraft = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => abortAnswerDraft)
.mockImplementationOnce(() => abortReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Old reply partial" });
await firstFinalGate;
await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" });
return { queuedFinal: true };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
} as never,
}),
});
await previewVisible;
const abortPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s2",
CommandTargetSessionKey: "s2",
Body: "abort",
RawBody: "abort",
CommandBody: "abort",
} as never,
}),
});
await vi.waitFor(() => {
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [{ text: "⚙️ Agent was aborted." }],
}),
);
});
releaseFirstFinal();
await Promise.all([firstPromise, abortPromise]);
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
1001,
"Old reply final",
expect.any(Object),
);
});
it("finalizes stale status reactions when an abort supersedes the same session", async () => {
let releaseFirstFinal!: () => void;
const firstFinalGate = new Promise<void>((resolve) => {
releaseFirstFinal = resolve;
});
let resolvePreviewVisible!: () => void;
const previewVisible = new Promise<void>((resolve) => {
resolvePreviewVisible = resolve;
});
const statusReactionController = {
setQueued: vi.fn(),
setThinking: vi.fn(async () => {}),
setTool: vi.fn(async () => {}),
setCompacting: vi.fn(async () => {}),
cancelPending: vi.fn(),
setError: vi.fn(async () => {}),
setDone: vi.fn(async () => {}),
};
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
onUpdate: (text) => {
if (text === "Old reply partial") {
resolvePreviewVisible();
}
},
});
const firstReasoningDraft = createDraftStream();
const abortAnswerDraft = createDraftStream();
const abortReasoningDraft = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => abortAnswerDraft)
.mockImplementationOnce(() => abortReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Old reply partial" });
await firstFinalGate;
await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" });
return { queuedFinal: true };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
const firstPromise = dispatchWithContext({
context: createContext({
statusReactionController: statusReactionController as never,
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
} as never,
}),
});
await previewVisible;
const abortPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "abort",
RawBody: "abort",
CommandBody: "abort",
} as never,
}),
});
await vi.waitFor(() => {
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [{ text: "⚙️ Agent was aborted." }],
}),
);
});
releaseFirstFinal();
await Promise.all([firstPromise, abortPromise]);
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.setError).not.toHaveBeenCalled();
});
it("keeps an existing preview when abort arrives during queued draft-lane cleanup", async () => {
let releaseMaterialize!: () => void;
const materializeGate = new Promise<void>((resolve) => {
releaseMaterialize = resolve;
});
let resolveMaterializeStarted!: () => void;
const materializeStarted = new Promise<void>((resolve) => {
resolveMaterializeStarted = resolve;
});
let resolvePreviewVisible!: () => void;
const previewVisible = new Promise<void>((resolve) => {
resolvePreviewVisible = resolve;
});
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
clearMessageIdOnForceNew: true,
onUpdate: (text) => {
if (text === "Old reply partial") {
resolvePreviewVisible();
}
},
});
firstAnswerDraft.materialize.mockImplementation(async () => {
resolveMaterializeStarted();
await materializeGate;
return 1001;
});
const firstReasoningDraft = createDraftStream();
const abortAnswerDraft = createDraftStream();
const abortReasoningDraft = createDraftStream();
const bot = createBot();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => abortAnswerDraft)
.mockImplementationOnce(() => abortReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Old reply partial" });
void replyOptions?.onAssistantMessageStart?.();
return { queuedFinal: false };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
} as never,
}),
bot,
});
await previewVisible;
await materializeStarted;
const abortPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "abort",
RawBody: "abort",
CommandBody: "abort",
} as never,
}),
bot,
});
await vi.waitFor(() => {
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [{ text: "⚙️ Agent was aborted." }],
}),
);
});
releaseMaterialize();
await Promise.all([firstPromise, abortPromise]);
expect(firstAnswerDraft.clear).not.toHaveBeenCalled();
expect(bot.api.deleteMessage as ReturnType<typeof vi.fn>).not.toHaveBeenCalledWith(123, 1001);
});
it("ignores stale answer finalization when abort targets the session via CommandTargetSessionKey", async () => {
let releaseFirstFinal!: () => void;
const firstFinalGate = new Promise<void>((resolve) => {
releaseFirstFinal = resolve;
});
let resolvePreviewVisible!: () => void;
const previewVisible = new Promise<void>((resolve) => {
resolvePreviewVisible = resolve;
});
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
onUpdate: (text) => {
if (text === "Old reply partial") {
resolvePreviewVisible();
}
},
});
const firstReasoningDraft = createDraftStream();
const abortAnswerDraft = createDraftStream();
const abortReasoningDraft = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => abortAnswerDraft)
.mockImplementationOnce(() => abortReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Old reply partial" });
await firstFinalGate;
await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" });
return { queuedFinal: true };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
} as never,
}),
});
await previewVisible;
const abortPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "telegram:123:control",
CommandTargetSessionKey: "s1",
Body: "abort",
RawBody: "abort",
CommandBody: "abort",
} as never,
}),
});
await vi.waitFor(() => {
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [{ text: "⚙️ Agent was aborted." }],
}),
);
});
releaseFirstFinal();
await Promise.all([firstPromise, abortPromise]);
expect(editMessageTelegram).not.toHaveBeenCalledWith(
123,
1001,
"Old reply final",
expect.any(Object),
);
expect(firstAnswerDraft.clear).not.toHaveBeenCalled();
});
it("swallows post-connect network timeout on preview edit to prevent duplicate messages", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);

File diff suppressed because it is too large Load Diff

View File

@@ -11,6 +11,7 @@ export type TestDraftStream = {
lastDeliveredText: ReturnType<typeof vi.fn<() => string>>;
clear: ReturnType<typeof vi.fn<() => Promise<void>>>;
stop: ReturnType<typeof vi.fn<() => Promise<void>>>;
discard: ReturnType<typeof vi.fn<() => Promise<void>>>;
materialize: ReturnType<typeof vi.fn<() => Promise<number | undefined>>>;
forceNewMessage: ReturnType<typeof vi.fn<() => void>>;
sendMayHaveLanded: ReturnType<typeof vi.fn<() => boolean>>;
@@ -22,6 +23,7 @@ export function createTestDraftStream(params?: {
previewMode?: DraftPreviewMode;
onUpdate?: (text: string) => void;
onStop?: () => void | Promise<void>;
onDiscard?: () => void | Promise<void>;
clearMessageIdOnForceNew?: boolean;
}): TestDraftStream {
let messageId = params?.messageId;
@@ -42,6 +44,9 @@ export function createTestDraftStream(params?: {
stop: vi.fn().mockImplementation(async () => {
await params?.onStop?.();
}),
discard: vi.fn().mockImplementation(async () => {
await params?.onDiscard?.();
}),
materialize: vi.fn().mockImplementation(async () => messageId),
forceNewMessage: vi.fn().mockImplementation(() => {
if (params?.clearMessageIdOnForceNew) {
@@ -75,6 +80,7 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft
lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn().mockResolvedValue(undefined),
discard: vi.fn().mockResolvedValue(undefined),
materialize: vi.fn().mockImplementation(async () => activeMessageId),
forceNewMessage: vi.fn().mockImplementation(() => {
activeMessageId = undefined;

View File

@@ -624,17 +624,28 @@ describe("draft stream initial message debounce", () => {
const api = createMockApi();
const stream = createDebouncedStream(api);
stream.update("Processing"); // 10 chars, below 30
stream.update("Processing");
await stream.flush();
expect(api.sendMessage).not.toHaveBeenCalled();
});
it("does not send a first message when discard() supersedes a short partial", async () => {
const api = createMockApi();
const stream = createDebouncedStream(api);
stream.update("Processing");
await stream.discard?.();
await stream.flush();
expect(api.sendMessage).not.toHaveBeenCalled();
expect(api.editMessageText).not.toHaveBeenCalled();
});
it("sends first message when reaching threshold", async () => {
const api = createMockApi();
const stream = createDebouncedStream(api);
// Exactly 30 chars
stream.update("I am processing your request..");
await stream.flush();
@@ -645,7 +656,7 @@ describe("draft stream initial message debounce", () => {
const api = createMockApi();
const stream = createDebouncedStream(api);
stream.update("I am processing your request, please wait a moment"); // 50 chars
stream.update("I am processing your request, please wait a moment");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalled();
@@ -657,17 +668,15 @@ describe("draft stream initial message debounce", () => {
const api = createMockApi();
const stream = createDebouncedStream(api);
// First message at threshold (30 chars)
stream.update("I am processing your request..");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(1);
// Subsequent updates should edit, not wait for threshold
stream.update("I am processing your request.. and summarizing");
await stream.flush();
expect(api.editMessageText).toHaveBeenCalled();
expect(api.sendMessage).toHaveBeenCalledTimes(1); // still only 1 send
expect(api.sendMessage).toHaveBeenCalledTimes(1);
});
});
@@ -677,7 +686,6 @@ describe("draft stream initial message debounce", () => {
const stream = createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
// no minInitialChars (backward-compatible behavior)
});
stream.update("Hi");

View File

@@ -1,5 +1,8 @@
import type { Bot } from "grammy";
import { createFinalizableDraftLifecycle } from "openclaw/plugin-sdk/channel-lifecycle";
import {
clearFinalizableDraftMessage,
createFinalizableDraftStreamControlsForState,
} from "openclaw/plugin-sdk/channel-lifecycle";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js";
@@ -96,6 +99,8 @@ export type TelegramDraftStream = {
lastDeliveredText?: () => string;
clear: () => Promise<void>;
stop: () => Promise<void>;
/** Stop without a final flush or delete. */
discard?: () => Promise<void>;
/** Convert the current draft preview into a permanent message (sendMessage). */
materialize?: () => Promise<number | undefined>;
/** Reset internal state so the next update creates a new message instead of editing. */
@@ -240,9 +245,6 @@ export function createTelegramDraftStream(params: {
"telegram stream preview send failed with message_thread_id, retrying without thread",
}));
} catch (err) {
// Pre-connect failures (DNS, refused) and explicit Telegram rejections (4xx)
// guarantee the message was never delivered — clear the flag so
// sendMayHaveLanded() doesn't suppress fallback.
if (isSafeToRetrySendError(err) || isTelegramClientRejection(err)) {
messageSendAttempted = false;
}
@@ -288,7 +290,6 @@ export function createTelegramDraftStream(params: {
};
const sendOrEditStreamMessage = async (text: string): Promise<boolean> => {
// Allow final flush even if stopped (e.g., after clear()).
if (streamState.stopped && !streamState.final) {
return false;
}
@@ -303,8 +304,6 @@ export function createTelegramDraftStream(params: {
return false;
}
if (renderedText.length > maxChars) {
// Telegram text messages/edits cap at 4096 chars.
// Stop streaming once we exceed the cap to avoid repeated API failures.
streamState.stopped = true;
params.warn?.(
`telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`,
@@ -316,7 +315,6 @@ export function createTelegramDraftStream(params: {
}
const sendGeneration = generation;
// Debounce first preview send for better push notification quality.
if (typeof streamMessageId !== "number" && minInitialChars != null && !streamState.final) {
if (renderedText.length < minInitialChars) {
return false;
@@ -368,29 +366,37 @@ export function createTelegramDraftStream(params: {
}
};
const { loop, update, stop, clear } = createFinalizableDraftLifecycle({
const { loop, update, stop, stopForClear } = createFinalizableDraftStreamControlsForState({
throttleMs,
state: streamState,
sendOrEditStreamMessage,
readMessageId: () => streamMessageId,
clearMessageId: () => {
streamMessageId = undefined;
},
isValidMessageId: (value): value is number =>
typeof value === "number" && Number.isFinite(value),
deleteMessage: async (messageId) => {
await params.api.deleteMessage(chatId, messageId);
},
onDeleteSuccess: (messageId) => {
params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`);
},
warn: params.warn,
warnPrefix: "telegram stream preview cleanup failed",
});
const clear = async () => {
await clearFinalizableDraftMessage({
stopForClear,
readMessageId: () => streamMessageId,
clearMessageId: () => {
streamMessageId = undefined;
},
isValidMessageId: (value): value is number =>
typeof value === "number" && Number.isFinite(value),
deleteMessage: async (messageId) => {
await params.api.deleteMessage(chatId, messageId);
},
onDeleteSuccess: (messageId) => {
params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`);
},
warn: params.warn,
warnPrefix: "telegram stream preview cleanup failed",
});
};
const discard = async () => {
await stopForClear();
};
const forceNewMessage = () => {
// Boundary rotation may call stop() to finalize the previous draft.
// Re-open the stream lifecycle for the next assistant segment.
streamState.final = false;
generation += 1;
messageSendAttempted = false;
@@ -404,20 +410,11 @@ export function createTelegramDraftStream(params: {
loop.resetThrottleWindow();
};
/**
* Materialize the current draft into a permanent message.
* For draft transport: sends the accumulated text as a real sendMessage.
* For message transport: the message is already permanent (noop).
* Returns the permanent message id, or undefined if nothing to materialize.
*/
const materialize = async (): Promise<number | undefined> => {
await stop();
// If using message transport, the streamMessageId is already a real message.
if (previewTransport === "message" && typeof streamMessageId === "number") {
return streamMessageId;
}
// For draft transport, use the rendered snapshot first so parse_mode stays
// aligned with the text being materialized.
const renderedText = lastSentText || lastDeliveredText;
if (!renderedText) {
return undefined;
@@ -433,8 +430,6 @@ export function createTelegramDraftStream(params: {
const sentId = sent?.message_id;
if (typeof sentId === "number" && Number.isFinite(sentId)) {
streamMessageId = Math.trunc(sentId);
// Clear the draft so Telegram's input area doesn't briefly show a
// stale copy alongside the newly materialized real message.
if (resolvedDraftApi != null && streamDraftId != null) {
const clearDraftId = streamDraftId;
const clearThreadParams =
@@ -443,9 +438,7 @@ export function createTelegramDraftStream(params: {
: undefined;
try {
await resolvedDraftApi(chatId, clearDraftId, "", clearThreadParams);
} catch {
// Best-effort cleanup; draft clear failure is cosmetic.
}
} catch {}
}
return streamMessageId;
}
@@ -466,6 +459,7 @@ export function createTelegramDraftStream(params: {
lastDeliveredText: () => lastDeliveredText,
clear,
stop,
discard,
materialize,
forceNewMessage,
sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number",