mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix: unify telegram streaming answer delivery
This commit is contained in:
@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes.
|
||||
- Telegram/streaming: keep one answer preview lane per inbound turn and never send a replacement final text bubble when preview finalization edits fail, fixing duplicate split replies during streamed answers.
|
||||
|
||||
## 2026.3.8
|
||||
|
||||
|
||||
@@ -405,7 +405,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
});
|
||||
|
||||
it.each(["block", "partial"] as const)(
|
||||
"forces new message when assistant message restarts (%s mode)",
|
||||
"keeps one answer preview when assistant message restarts (%s mode)",
|
||||
async (streamMode) => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
@@ -419,146 +419,22 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode });
|
||||
|
||||
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
|
||||
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
|
||||
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(
|
||||
123,
|
||||
999,
|
||||
"First response After tool call",
|
||||
expect.any(Object),
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it("materializes boundary preview and keeps it when no matching final arrives", async () => {
|
||||
const answerDraftStream = createDraftStream(999);
|
||||
answerDraftStream.materialize.mockResolvedValue(4321);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce(() => answerDraftStream)
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Before tool boundary" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
return { queuedFinal: false };
|
||||
});
|
||||
|
||||
const bot = createBot();
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
|
||||
|
||||
expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1);
|
||||
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
|
||||
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
|
||||
const deleteMessageCalls = (
|
||||
bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } }
|
||||
).deleteMessage.mock.calls;
|
||||
expect(deleteMessageCalls).not.toContainEqual([123, 4321]);
|
||||
});
|
||||
|
||||
it("waits for queued boundary rotation before final lane delivery", async () => {
|
||||
const answerDraftStream = createSequencedDraftStream(1001);
|
||||
let resolveMaterialize: ((value: number | undefined) => void) | undefined;
|
||||
const materializePromise = new Promise<number | undefined>((resolve) => {
|
||||
resolveMaterialize = resolve;
|
||||
});
|
||||
answerDraftStream.materialize.mockImplementation(() => materializePromise);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce(() => answerDraftStream)
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
const startPromise = replyOptions?.onAssistantMessageStart?.();
|
||||
const finalPromise = dispatcherOptions.deliver(
|
||||
{ text: "Message B final" },
|
||||
{ kind: "final" },
|
||||
);
|
||||
resolveMaterialize?.(1001);
|
||||
await startPromise;
|
||||
await finalPromise;
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
|
||||
expect(editMessageTelegram).toHaveBeenCalledTimes(2);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
123,
|
||||
1002,
|
||||
"Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it("clears active preview even when an unrelated boundary archive exists", async () => {
|
||||
const answerDraftStream = createDraftStream(999);
|
||||
answerDraftStream.materialize.mockResolvedValue(4321);
|
||||
answerDraftStream.forceNewMessage.mockImplementation(() => {
|
||||
answerDraftStream.setMessageId(5555);
|
||||
});
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce(() => answerDraftStream)
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Before tool boundary" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Unfinalized next preview" });
|
||||
return { queuedFinal: false };
|
||||
});
|
||||
|
||||
const bot = createBot();
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
|
||||
|
||||
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
|
||||
const deleteMessageCalls = (
|
||||
bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } }
|
||||
).deleteMessage.mock.calls;
|
||||
expect(deleteMessageCalls).not.toContainEqual([123, 4321]);
|
||||
});
|
||||
|
||||
it("queues late partials behind async boundary materialization", async () => {
|
||||
const answerDraftStream = createDraftStream(999);
|
||||
let resolveMaterialize: ((value: number | undefined) => void) | undefined;
|
||||
const materializePromise = new Promise<number | undefined>((resolve) => {
|
||||
resolveMaterialize = resolve;
|
||||
});
|
||||
answerDraftStream.materialize.mockImplementation(() => materializePromise);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce(() => answerDraftStream)
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
|
||||
|
||||
// Simulate provider fire-and-forget ordering: boundary callback starts
|
||||
// and a new partial arrives before boundary materialization resolves.
|
||||
const startPromise = replyOptions?.onAssistantMessageStart?.();
|
||||
const nextPartialPromise = replyOptions?.onPartialReply?.({ text: "Message B early" });
|
||||
|
||||
expect(answerDraftStream.update).toHaveBeenCalledTimes(1);
|
||||
resolveMaterialize?.(4321);
|
||||
|
||||
await startPromise;
|
||||
await nextPartialPromise;
|
||||
return { queuedFinal: false };
|
||||
});
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1);
|
||||
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
|
||||
expect(answerDraftStream.update).toHaveBeenCalledTimes(2);
|
||||
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early");
|
||||
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
|
||||
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
|
||||
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
|
||||
});
|
||||
|
||||
it("keeps final-only preview lane finalized until a real boundary rotation happens", async () => {
|
||||
it("keeps one answer lane when a later partial arrives before assistant message start", async () => {
|
||||
const answerDraftStream = createSequencedDraftStream(1001);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
@@ -566,9 +442,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// Final-only first response (no streamed partials).
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
// Simulate provider ordering bug: first chunk arrives before message-start callback.
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B early" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
|
||||
@@ -581,173 +455,22 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
123,
|
||||
1001,
|
||||
"Message A final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
123,
|
||||
1002,
|
||||
"Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not force new message on first assistant message start", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// First assistant message starts (no previous output)
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
// Partial updates
|
||||
await replyOptions?.onPartialReply?.({ text: "Hello" });
|
||||
await replyOptions?.onPartialReply?.({ text: "Hello world" });
|
||||
await dispatcherOptions.deliver({ text: "Hello world" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "block" });
|
||||
|
||||
// First message start shouldn't trigger forceNewMessage (no previous output)
|
||||
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("rotates before a late second-message partial so finalized preview is not overwritten", async () => {
|
||||
const answerDraftStream = createSequencedDraftStream(1001);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce(() => answerDraftStream)
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
// Simulate provider ordering bug: first chunk arrives before message-start callback.
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B early" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
|
||||
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
|
||||
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early");
|
||||
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
|
||||
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
|
||||
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
123,
|
||||
1001,
|
||||
"Message A final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
123,
|
||||
1002,
|
||||
"Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not skip message-start rotation when pre-rotation did not force a new message", async () => {
|
||||
const answerDraftStream = createSequencedDraftStream(1002);
|
||||
answerDraftStream.setMessageId(1001);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce(() => answerDraftStream)
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// First message has only final text (no streamed partials), so answer lane
|
||||
// reaches finalized state with hasStreamedMessage still false.
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
// Provider ordering bug: next message partial arrives before message-start.
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B early" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
|
||||
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
|
||||
const bot = createBot();
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
|
||||
|
||||
// Early pre-rotation could not force (no streamed partials yet), so the
|
||||
// real assistant message_start must still rotate once.
|
||||
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
|
||||
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message B early");
|
||||
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B partial");
|
||||
const earlyUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[0];
|
||||
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
|
||||
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
|
||||
expect(earlyUpdateOrder).toBeLessThan(boundaryRotationOrder);
|
||||
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
123,
|
||||
1001,
|
||||
"Message A final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
123,
|
||||
1002,
|
||||
"Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect((bot.api.deleteMessage as ReturnType<typeof vi.fn>).mock.calls).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("does not trigger late pre-rotation mid-message after an explicit assistant message start", async () => {
|
||||
const answerDraftStream = createDraftStream(1001);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce(() => answerDraftStream)
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// Message A finalizes without streamed partials.
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
// Message B starts normally before partials.
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B first chunk" });
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B second chunk" });
|
||||
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
// The explicit message_start boundary must clear finalized state so
|
||||
// same-message partials do not force a new preview mid-stream.
|
||||
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
|
||||
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message B first chunk");
|
||||
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B second chunk");
|
||||
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message A final Message B early");
|
||||
expect(answerDraftStream.update).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
"Message A final Message B partial",
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(
|
||||
123,
|
||||
1001,
|
||||
"Message A final Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it("finalizes multi-message assistant stream to matching preview messages in order", async () => {
|
||||
it("collapses multi-message assistant finals into one final edit", async () => {
|
||||
const answerDraftStream = createSequencedDraftStream(1001);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
@@ -760,7 +483,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Message C partial" });
|
||||
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
|
||||
await dispatcherOptions.deliver({ text: "Message C final" }, { kind: "final" });
|
||||
@@ -772,111 +494,20 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
123,
|
||||
1001,
|
||||
"Message A final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
123,
|
||||
1002,
|
||||
"Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
3,
|
||||
123,
|
||||
1003,
|
||||
"Message C final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(deliverReplies).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("maps finals correctly when first preview id resolves after message boundary", async () => {
|
||||
let answerMessageId: number | undefined;
|
||||
let answerDraftParams:
|
||||
| {
|
||||
onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void;
|
||||
}
|
||||
| undefined;
|
||||
const answerDraftStream = {
|
||||
update: vi.fn().mockImplementation((text: string) => {
|
||||
if (text.includes("Message B")) {
|
||||
answerMessageId = 1002;
|
||||
}
|
||||
}),
|
||||
flush: vi.fn().mockResolvedValue(undefined),
|
||||
messageId: vi.fn().mockImplementation(() => answerMessageId),
|
||||
clear: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
forceNewMessage: vi.fn().mockImplementation(() => {
|
||||
answerMessageId = undefined;
|
||||
}),
|
||||
};
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce((params) => {
|
||||
answerDraftParams = params as typeof answerDraftParams;
|
||||
return answerDraftStream;
|
||||
})
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
|
||||
// Simulate late resolution of message A preview ID after boundary rotation.
|
||||
answerDraftParams?.onSupersededPreview?.({
|
||||
messageId: 1001,
|
||||
textSnapshot: "Message A partial",
|
||||
});
|
||||
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
123,
|
||||
1001,
|
||||
"Message A final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
123,
|
||||
1002,
|
||||
"Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
|
||||
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
|
||||
expect(editMessageTelegram.mock.calls[0]?.[0]).toBe(123);
|
||||
expect(editMessageTelegram.mock.calls[0]?.[1]).toBe(1001);
|
||||
expect(editMessageTelegram.mock.calls[0]?.[2]).toContain("Message A final");
|
||||
expect(editMessageTelegram.mock.calls[0]?.[2]).toContain("Message B final");
|
||||
expect(editMessageTelegram.mock.calls[0]?.[2]).toContain("Message C final");
|
||||
expect(deliverReplies).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it.each(["partial", "block"] as const)(
|
||||
"keeps finalized text preview when the next assistant message is media-only (%s mode)",
|
||||
async (streamMode) => {
|
||||
let answerMessageId: number | undefined = 1001;
|
||||
const answerDraftStream = {
|
||||
update: vi.fn(),
|
||||
flush: vi.fn().mockResolvedValue(undefined),
|
||||
messageId: vi.fn().mockImplementation(() => answerMessageId),
|
||||
clear: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
forceNewMessage: vi.fn().mockImplementation(() => {
|
||||
answerMessageId = undefined;
|
||||
}),
|
||||
};
|
||||
const answerDraftStream = createDraftStream(1001);
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce(() => answerDraftStream)
|
||||
@@ -896,6 +527,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode, bot });
|
||||
|
||||
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(
|
||||
123,
|
||||
1001,
|
||||
@@ -909,75 +541,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
},
|
||||
);
|
||||
|
||||
it("maps finals correctly when archived preview id arrives during final flush", async () => {
|
||||
let answerMessageId: number | undefined;
|
||||
let answerDraftParams:
|
||||
| {
|
||||
onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void;
|
||||
}
|
||||
| undefined;
|
||||
let emittedSupersededPreview = false;
|
||||
const answerDraftStream = {
|
||||
update: vi.fn().mockImplementation((text: string) => {
|
||||
if (text.includes("Message B")) {
|
||||
answerMessageId = 1002;
|
||||
}
|
||||
}),
|
||||
flush: vi.fn().mockImplementation(async () => {
|
||||
if (!emittedSupersededPreview) {
|
||||
emittedSupersededPreview = true;
|
||||
answerDraftParams?.onSupersededPreview?.({
|
||||
messageId: 1001,
|
||||
textSnapshot: "Message A partial",
|
||||
});
|
||||
}
|
||||
}),
|
||||
messageId: vi.fn().mockImplementation(() => answerMessageId),
|
||||
clear: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
forceNewMessage: vi.fn().mockImplementation(() => {
|
||||
answerMessageId = undefined;
|
||||
}),
|
||||
};
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce((params) => {
|
||||
answerDraftParams = params as typeof answerDraftParams;
|
||||
return answerDraftStream;
|
||||
})
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
123,
|
||||
1001,
|
||||
"Message A final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
123,
|
||||
1002,
|
||||
"Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(deliverReplies).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it.each(["block", "partial"] as const)(
|
||||
"splits reasoning lane only when a later reasoning block starts (%s mode)",
|
||||
async (streamMode) => {
|
||||
@@ -1349,9 +912,8 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
|
||||
await dispatchWithContext({ context: createReasoningStreamContext(), streamMode: "partial" });
|
||||
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(1, 123, 999, "3", expect.any(Object));
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "3", expect.any(Object));
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(
|
||||
123,
|
||||
111,
|
||||
"Reasoning:\nIf I count r in strawberry, I see positions 3, 8, and 9. So the total is 3.",
|
||||
@@ -1712,7 +1274,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
expect.objectContaining({
|
||||
replies: [
|
||||
expect.objectContaining({
|
||||
text: expect.stringContaining("No response"),
|
||||
text: "Something went wrong while processing your request. Please try again.",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
@@ -1741,7 +1303,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
expect.objectContaining({
|
||||
replies: [
|
||||
expect.objectContaining({
|
||||
text: expect.stringContaining("No response"),
|
||||
text: "Something went wrong while processing your request. Please try again.",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
|
||||
@@ -32,7 +32,6 @@ import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { createTelegramDraftStream } from "./draft-stream.js";
|
||||
import { renderTelegramHtmlText } from "./format.js";
|
||||
import {
|
||||
type ArchivedPreview,
|
||||
createLaneDeliveryStateTracker,
|
||||
createLaneTextDeliverer,
|
||||
type DraftLaneState,
|
||||
@@ -49,6 +48,24 @@ const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
|
||||
|
||||
/** Minimum chars before sending first streaming message (improves push notification UX) */
|
||||
const DRAFT_MIN_INITIAL_CHARS = 30;
|
||||
const ANSWER_SEGMENT_NO_SPACE_BEFORE_RE = /^[,.;:!?)}\]]/;
|
||||
|
||||
function appendAnswerSegment(prefix: string, segment: string): string {
|
||||
if (!prefix) {
|
||||
return segment;
|
||||
}
|
||||
if (!segment) {
|
||||
return prefix;
|
||||
}
|
||||
if (
|
||||
/\s$/.test(prefix) ||
|
||||
/^\s/.test(segment) ||
|
||||
ANSWER_SEGMENT_NO_SPACE_BEFORE_RE.test(segment)
|
||||
) {
|
||||
return `${prefix}${segment}`;
|
||||
}
|
||||
return `${prefix} ${segment}`;
|
||||
}
|
||||
|
||||
async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) {
|
||||
try {
|
||||
@@ -195,7 +212,6 @@ export const dispatchTelegramMessage = async ({
|
||||
// a visible duplicate flash at finalize time.
|
||||
const useMessagePreviewTransportForDm = threadSpec?.scope === "dm" && canStreamAnswerDraft;
|
||||
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
|
||||
const archivedAnswerPreviews: ArchivedPreview[] = [];
|
||||
const archivedReasoningPreviewIds: number[] = [];
|
||||
const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => {
|
||||
const stream = enabled
|
||||
@@ -209,19 +225,11 @@ export const dispatchTelegramMessage = async ({
|
||||
minInitialChars: draftMinInitialChars,
|
||||
renderText: renderDraftPreview,
|
||||
onSupersededPreview:
|
||||
laneName === "answer" || laneName === "reasoning"
|
||||
laneName === "reasoning"
|
||||
? (preview) => {
|
||||
if (laneName === "reasoning") {
|
||||
if (!archivedReasoningPreviewIds.includes(preview.messageId)) {
|
||||
archivedReasoningPreviewIds.push(preview.messageId);
|
||||
}
|
||||
return;
|
||||
if (!archivedReasoningPreviewIds.includes(preview.messageId)) {
|
||||
archivedReasoningPreviewIds.push(preview.messageId);
|
||||
}
|
||||
archivedAnswerPreviews.push({
|
||||
messageId: preview.messageId,
|
||||
textSnapshot: preview.textSnapshot,
|
||||
deleteIfUnused: true,
|
||||
});
|
||||
}
|
||||
: undefined,
|
||||
log: logVerbose,
|
||||
@@ -245,7 +253,14 @@ export const dispatchTelegramMessage = async ({
|
||||
const answerLane = lanes.answer;
|
||||
const reasoningLane = lanes.reasoning;
|
||||
let splitReasoningOnNextStream = false;
|
||||
let skipNextAnswerMessageStartRotation = false;
|
||||
let answerSegmentPrefixText = "";
|
||||
let pendingAnswerFinalSlots = 1;
|
||||
let bufferedAnswerFinal:
|
||||
| {
|
||||
payload: ReplyPayload;
|
||||
text: string;
|
||||
}
|
||||
| undefined;
|
||||
let draftLaneEventQueue = Promise.resolve();
|
||||
const reasoningStepState = createTelegramReasoningStepState();
|
||||
const enqueueDraftLaneEvent = (task: () => Promise<void>): Promise<void> => {
|
||||
@@ -276,34 +291,20 @@ export const dispatchTelegramMessage = async ({
|
||||
Boolean(split.reasoningText) && suppressReasoning && !split.answerText,
|
||||
};
|
||||
};
|
||||
const getCurrentAnswerText = () => bufferedAnswerFinal?.text ?? answerLane.lastPartialText;
|
||||
const composeAnswerSegmentText = (text: string) =>
|
||||
appendAnswerSegment(answerSegmentPrefixText, text);
|
||||
const rememberAnswerBoundary = () => {
|
||||
answerSegmentPrefixText = getCurrentAnswerText();
|
||||
};
|
||||
const bufferAnswerFinal = (payload: ReplyPayload, text: string) => {
|
||||
bufferedAnswerFinal = { payload, text };
|
||||
answerSegmentPrefixText = text;
|
||||
};
|
||||
const resetDraftLaneState = (lane: DraftLaneState) => {
|
||||
lane.lastPartialText = "";
|
||||
lane.hasStreamedMessage = false;
|
||||
};
|
||||
const rotateAnswerLaneForNewAssistantMessage = async () => {
|
||||
let didForceNewMessage = false;
|
||||
if (answerLane.hasStreamedMessage) {
|
||||
// Materialize the current streamed draft into a permanent message
|
||||
// so it remains visible across tool boundaries.
|
||||
const materializedId = await answerLane.stream?.materialize?.();
|
||||
const previewMessageId = materializedId ?? answerLane.stream?.messageId();
|
||||
if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) {
|
||||
archivedAnswerPreviews.push({
|
||||
messageId: previewMessageId,
|
||||
textSnapshot: answerLane.lastPartialText,
|
||||
deleteIfUnused: false,
|
||||
});
|
||||
}
|
||||
answerLane.stream?.forceNewMessage();
|
||||
didForceNewMessage = true;
|
||||
}
|
||||
resetDraftLaneState(answerLane);
|
||||
if (didForceNewMessage) {
|
||||
// New assistant message boundary: this lane now tracks a fresh preview lifecycle.
|
||||
finalizedPreviewByLane.answer = false;
|
||||
}
|
||||
return didForceNewMessage;
|
||||
};
|
||||
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
|
||||
const laneStream = lane.stream;
|
||||
if (!laneStream || !text) {
|
||||
@@ -329,19 +330,14 @@ export const dispatchTelegramMessage = async ({
|
||||
};
|
||||
const ingestDraftLaneSegments = async (text: string | undefined) => {
|
||||
const split = splitTextIntoLaneSegments(text);
|
||||
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
|
||||
if (hasAnswerSegment && finalizedPreviewByLane.answer) {
|
||||
// Some providers can emit the first partial of a new assistant message before
|
||||
// onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
|
||||
// the previously finalized preview message with the next message's text.
|
||||
skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage();
|
||||
}
|
||||
for (const segment of split.segments) {
|
||||
if (segment.lane === "reasoning") {
|
||||
reasoningStepState.noteReasoningHint();
|
||||
reasoningStepState.noteReasoningDelivered();
|
||||
updateDraftFromPartial(lanes.reasoning, segment.text);
|
||||
continue;
|
||||
}
|
||||
updateDraftFromPartial(lanes[segment.lane], segment.text);
|
||||
updateDraftFromPartial(lanes.answer, composeAnswerSegmentText(segment.text));
|
||||
}
|
||||
};
|
||||
const flushDraftLane = async (lane: DraftLaneState) => {
|
||||
@@ -464,7 +460,6 @@ export const dispatchTelegramMessage = async ({
|
||||
};
|
||||
const deliverLaneText = createLaneTextDeliverer({
|
||||
lanes,
|
||||
archivedAnswerPreviews,
|
||||
finalizedPreviewByLane,
|
||||
draftMaxChars,
|
||||
applyTextToPayload,
|
||||
@@ -482,14 +477,29 @@ export const dispatchTelegramMessage = async ({
|
||||
buttons: previewButtons,
|
||||
});
|
||||
},
|
||||
deletePreviewMessage: async (messageId) => {
|
||||
await bot.api.deleteMessage(chatId, messageId);
|
||||
},
|
||||
log: logVerbose,
|
||||
markDelivered: () => {
|
||||
deliveryState.markDelivered();
|
||||
},
|
||||
});
|
||||
const flushBufferedAnswerFinal = async () => {
|
||||
if (!bufferedAnswerFinal) {
|
||||
return;
|
||||
}
|
||||
const { payload, text } = bufferedAnswerFinal;
|
||||
bufferedAnswerFinal = undefined;
|
||||
const previewButtons = (
|
||||
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
|
||||
)?.buttons;
|
||||
await deliverLaneText({
|
||||
laneName: "answer",
|
||||
text,
|
||||
payload,
|
||||
infoKind: "final",
|
||||
previewButtons,
|
||||
});
|
||||
reasoningStepState.resetForNextStep();
|
||||
};
|
||||
|
||||
let queuedFinal = false;
|
||||
|
||||
@@ -530,59 +540,39 @@ export const dispatchTelegramMessage = async ({
|
||||
const segments = split.segments;
|
||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
|
||||
const flushBufferedFinalAnswer = async () => {
|
||||
const buffered = reasoningStepState.takeBufferedFinalAnswer();
|
||||
if (!buffered) {
|
||||
return;
|
||||
}
|
||||
const bufferedButtons = (
|
||||
buffered.payload.channelData?.telegram as
|
||||
| { buttons?: TelegramInlineButtons }
|
||||
| undefined
|
||||
)?.buttons;
|
||||
await deliverLaneText({
|
||||
laneName: "answer",
|
||||
text: buffered.text,
|
||||
payload: buffered.payload,
|
||||
infoKind: "final",
|
||||
previewButtons: bufferedButtons,
|
||||
});
|
||||
reasoningStepState.resetForNextStep();
|
||||
};
|
||||
|
||||
for (const segment of segments) {
|
||||
if (
|
||||
segment.lane === "answer" &&
|
||||
info.kind === "final" &&
|
||||
reasoningStepState.shouldBufferFinalAnswer()
|
||||
) {
|
||||
reasoningStepState.bufferFinalAnswer({ payload, text: segment.text });
|
||||
continue;
|
||||
}
|
||||
if (segment.lane === "reasoning") {
|
||||
reasoningStepState.noteReasoningHint();
|
||||
const result = await deliverLaneText({
|
||||
laneName: "reasoning",
|
||||
text: segment.text,
|
||||
payload,
|
||||
infoKind: info.kind,
|
||||
previewButtons,
|
||||
allowPreviewUpdateForNonFinal: true,
|
||||
});
|
||||
if (result !== "skipped") {
|
||||
reasoningStepState.noteReasoningDelivered();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const result = await deliverLaneText({
|
||||
laneName: segment.lane,
|
||||
text: segment.text,
|
||||
const answerText = composeAnswerSegmentText(segment.text);
|
||||
if (info.kind === "final") {
|
||||
if (pendingAnswerFinalSlots <= 0) {
|
||||
await sendPayload(payload);
|
||||
continue;
|
||||
}
|
||||
pendingAnswerFinalSlots -= 1;
|
||||
bufferAnswerFinal(payload, answerText);
|
||||
continue;
|
||||
}
|
||||
await deliverLaneText({
|
||||
laneName: "answer",
|
||||
text: answerText,
|
||||
payload,
|
||||
infoKind: info.kind,
|
||||
previewButtons,
|
||||
allowPreviewUpdateForNonFinal: segment.lane === "reasoning",
|
||||
});
|
||||
if (segment.lane === "reasoning") {
|
||||
if (result !== "skipped") {
|
||||
reasoningStepState.noteReasoningDelivered();
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (info.kind === "final") {
|
||||
if (reasoningLane.hasStreamedMessage) {
|
||||
finalizedPreviewByLane.reasoning = true;
|
||||
}
|
||||
reasoningStepState.resetForNextStep();
|
||||
}
|
||||
}
|
||||
if (segments.length > 0) {
|
||||
return;
|
||||
@@ -593,9 +583,6 @@ export const dispatchTelegramMessage = async ({
|
||||
typeof payload.text === "string" ? { ...payload, text: "" } : payload;
|
||||
await sendPayload(payloadWithoutSuppressedReasoning);
|
||||
}
|
||||
if (info.kind === "final") {
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -607,15 +594,9 @@ export const dispatchTelegramMessage = async ({
|
||||
const canSendAsIs =
|
||||
hasMedia || (typeof payload.text === "string" && payload.text.length > 0);
|
||||
if (!canSendAsIs) {
|
||||
if (info.kind === "final") {
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
return;
|
||||
}
|
||||
await sendPayload(payload);
|
||||
if (info.kind === "final") {
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
},
|
||||
onSkip: (_payload, info) => {
|
||||
if (info.reason !== "silent") {
|
||||
@@ -655,17 +636,10 @@ export const dispatchTelegramMessage = async ({
|
||||
? () =>
|
||||
enqueueDraftLaneEvent(async () => {
|
||||
reasoningStepState.resetForNextStep();
|
||||
if (skipNextAnswerMessageStartRotation) {
|
||||
skipNextAnswerMessageStartRotation = false;
|
||||
finalizedPreviewByLane.answer = false;
|
||||
return;
|
||||
if (getCurrentAnswerText()) {
|
||||
pendingAnswerFinalSlots += 1;
|
||||
rememberAnswerBoundary();
|
||||
}
|
||||
await rotateAnswerLaneForNewAssistantMessage();
|
||||
// Message-start is an explicit assistant-message boundary.
|
||||
// Even when no forceNewMessage happened (e.g. prior answer had no
|
||||
// streamed partials), the next partial belongs to a fresh lifecycle
|
||||
// and must not trigger late pre-rotation mid-message.
|
||||
finalizedPreviewByLane.answer = false;
|
||||
})
|
||||
: undefined,
|
||||
onReasoningEnd: reasoningLane.stream
|
||||
@@ -683,6 +657,10 @@ export const dispatchTelegramMessage = async ({
|
||||
onModelSelected,
|
||||
},
|
||||
}));
|
||||
await flushBufferedAnswerFinal();
|
||||
if (reasoningLane.hasStreamedMessage) {
|
||||
finalizedPreviewByLane.reasoning = true;
|
||||
}
|
||||
} catch (err) {
|
||||
dispatchError = err;
|
||||
runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`));
|
||||
@@ -704,17 +682,7 @@ export const dispatchTelegramMessage = async ({
|
||||
if (!stream) {
|
||||
continue;
|
||||
}
|
||||
// Don't clear (delete) the stream if: (a) it was finalized, or
|
||||
// (b) the active stream message is itself a boundary-finalized archive.
|
||||
const activePreviewMessageId = stream.messageId();
|
||||
const hasBoundaryFinalizedActivePreview =
|
||||
laneState.laneName === "answer" &&
|
||||
typeof activePreviewMessageId === "number" &&
|
||||
archivedAnswerPreviews.some(
|
||||
(p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId,
|
||||
);
|
||||
const shouldClear =
|
||||
!finalizedPreviewByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview;
|
||||
const shouldClear = !finalizedPreviewByLane[laneState.laneName];
|
||||
const existing = streamCleanupStates.get(stream);
|
||||
if (!existing) {
|
||||
streamCleanupStates.set(stream, { shouldClear });
|
||||
@@ -728,18 +696,6 @@ export const dispatchTelegramMessage = async ({
|
||||
await stream.clear();
|
||||
}
|
||||
}
|
||||
for (const archivedPreview of archivedAnswerPreviews) {
|
||||
if (archivedPreview.deleteIfUnused === false) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
for (const messageId of archivedReasoningPreviewIds) {
|
||||
try {
|
||||
await bot.api.deleteMessage(chatId, messageId);
|
||||
|
||||
@@ -27,19 +27,10 @@ export type DraftLaneState = {
|
||||
hasStreamedMessage: boolean;
|
||||
};
|
||||
|
||||
export type ArchivedPreview = {
|
||||
messageId: number;
|
||||
textSnapshot: string;
|
||||
// Boundary-finalized previews should remain visible even if no matching
|
||||
// final edit arrives; superseded previews can be safely deleted.
|
||||
deleteIfUnused?: boolean;
|
||||
};
|
||||
|
||||
export type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped";
|
||||
|
||||
type CreateLaneTextDelivererParams = {
|
||||
lanes: Record<LaneName, DraftLaneState>;
|
||||
archivedAnswerPreviews: ArchivedPreview[];
|
||||
finalizedPreviewByLane: Record<LaneName, boolean>;
|
||||
draftMaxChars: number;
|
||||
applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload;
|
||||
@@ -53,7 +44,6 @@ type CreateLaneTextDelivererParams = {
|
||||
context: "final" | "update";
|
||||
previewButtons?: TelegramInlineButtons;
|
||||
}) => Promise<void>;
|
||||
deletePreviewMessage: (messageId: number) => Promise<void>;
|
||||
log: (message: string) => void;
|
||||
markDelivered: () => void;
|
||||
};
|
||||
@@ -80,14 +70,6 @@ type TryUpdatePreviewParams = {
|
||||
previewTextSnapshot?: string;
|
||||
};
|
||||
|
||||
type ConsumeArchivedAnswerPreviewParams = {
|
||||
lane: DraftLaneState;
|
||||
text: string;
|
||||
payload: ReplyPayload;
|
||||
previewButtons?: TelegramInlineButtons;
|
||||
canEditViaPreview: boolean;
|
||||
};
|
||||
|
||||
type PreviewUpdateContext = "final" | "update";
|
||||
type RegressiveSkipMode = "always" | "existingOnly";
|
||||
|
||||
@@ -140,6 +122,10 @@ function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTarget
|
||||
export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText;
|
||||
const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft";
|
||||
const markLaneDelivered = (lane: DraftLaneState, text: string) => {
|
||||
lane.lastPartialText = text;
|
||||
params.markDelivered();
|
||||
};
|
||||
const canMaterializeDraftFinal = (
|
||||
lane: DraftLaneState,
|
||||
previewButtons?: TelegramInlineButtons,
|
||||
@@ -171,8 +157,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
);
|
||||
return false;
|
||||
}
|
||||
args.lane.lastPartialText = args.text;
|
||||
params.markDelivered();
|
||||
markLaneDelivered(args.lane, args.text);
|
||||
return true;
|
||||
};
|
||||
|
||||
@@ -194,13 +179,16 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
previewButtons: args.previewButtons,
|
||||
context: args.context,
|
||||
});
|
||||
if (args.updateLaneSnapshot) {
|
||||
if (args.updateLaneSnapshot || args.context === "final") {
|
||||
args.lane.lastPartialText = args.text;
|
||||
}
|
||||
params.markDelivered();
|
||||
return true;
|
||||
} catch (err) {
|
||||
if (isMessageNotModifiedError(err)) {
|
||||
if (args.context === "final") {
|
||||
args.lane.lastPartialText = args.text;
|
||||
}
|
||||
params.log(
|
||||
`telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`,
|
||||
);
|
||||
@@ -208,8 +196,11 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
return true;
|
||||
}
|
||||
if (args.treatEditFailureAsDelivered) {
|
||||
if (args.context === "final") {
|
||||
args.lane.lastPartialText = args.text;
|
||||
}
|
||||
params.log(
|
||||
`telegram: ${args.laneName} preview ${args.context} edit failed after stop-created flush; treating as delivered (${String(err)})`,
|
||||
`telegram: ${args.laneName} preview ${args.context} edit failed; keeping existing preview (${String(err)})`,
|
||||
);
|
||||
params.markDelivered();
|
||||
return true;
|
||||
@@ -300,55 +291,11 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
}
|
||||
return finalizePreview(
|
||||
previewTargetAfterStop.previewMessageId,
|
||||
false,
|
||||
context === "final",
|
||||
previewTargetAfterStop.hadPreviewMessage,
|
||||
);
|
||||
};
|
||||
|
||||
const consumeArchivedAnswerPreviewForFinal = async ({
|
||||
lane,
|
||||
text,
|
||||
payload,
|
||||
previewButtons,
|
||||
canEditViaPreview,
|
||||
}: ConsumeArchivedAnswerPreviewParams): Promise<LaneDeliveryResult | undefined> => {
|
||||
const archivedPreview = params.archivedAnswerPreviews.shift();
|
||||
if (!archivedPreview) {
|
||||
return undefined;
|
||||
}
|
||||
if (canEditViaPreview) {
|
||||
const finalized = await tryUpdatePreviewForLane({
|
||||
lane,
|
||||
laneName: "answer",
|
||||
text,
|
||||
previewButtons,
|
||||
stopBeforeEdit: false,
|
||||
skipRegressive: "existingOnly",
|
||||
context: "final",
|
||||
previewMessageId: archivedPreview.messageId,
|
||||
previewTextSnapshot: archivedPreview.textSnapshot,
|
||||
});
|
||||
if (finalized) {
|
||||
return "preview-finalized";
|
||||
}
|
||||
}
|
||||
// Send the replacement message first, then clean up the old preview.
|
||||
// This avoids the visual "disappear then reappear" flash.
|
||||
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
|
||||
// Once this archived preview is consumed by a fallback final send, delete it
|
||||
// regardless of deleteIfUnused. That flag only applies to unconsumed boundaries.
|
||||
if (delivered || archivedPreview.deleteIfUnused !== false) {
|
||||
try {
|
||||
await params.deletePreviewMessage(archivedPreview.messageId);
|
||||
} catch (err) {
|
||||
params.log(
|
||||
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
return delivered ? "sent" : "skipped";
|
||||
};
|
||||
|
||||
return async ({
|
||||
laneName,
|
||||
text,
|
||||
@@ -363,32 +310,8 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
!hasMedia && text.length > 0 && text.length <= params.draftMaxChars && !payload.isError;
|
||||
|
||||
if (infoKind === "final") {
|
||||
if (laneName === "answer") {
|
||||
const archivedResult = await consumeArchivedAnswerPreviewForFinal({
|
||||
lane,
|
||||
text,
|
||||
payload,
|
||||
previewButtons,
|
||||
canEditViaPreview,
|
||||
});
|
||||
if (archivedResult) {
|
||||
return archivedResult;
|
||||
}
|
||||
}
|
||||
if (canEditViaPreview && !params.finalizedPreviewByLane[laneName]) {
|
||||
await params.flushDraftLane(lane);
|
||||
if (laneName === "answer") {
|
||||
const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({
|
||||
lane,
|
||||
text,
|
||||
payload,
|
||||
previewButtons,
|
||||
canEditViaPreview,
|
||||
});
|
||||
if (archivedResultAfterFlush) {
|
||||
return archivedResultAfterFlush;
|
||||
}
|
||||
}
|
||||
if (canMaterializeDraftFinal(lane, previewButtons)) {
|
||||
const materialized = await tryMaterializeDraftPreviewForFinal({
|
||||
lane,
|
||||
@@ -420,6 +343,9 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
}
|
||||
await params.stopDraftLane(lane);
|
||||
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
|
||||
if (delivered) {
|
||||
lane.lastPartialText = text;
|
||||
}
|
||||
return delivered ? "sent" : "skipped";
|
||||
}
|
||||
|
||||
|
||||
@@ -39,19 +39,12 @@ function createHarness(params?: {
|
||||
await lane.stream?.stop();
|
||||
});
|
||||
const editPreview = vi.fn().mockResolvedValue(undefined);
|
||||
const deletePreviewMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const log = vi.fn();
|
||||
const markDelivered = vi.fn();
|
||||
const finalizedPreviewByLane: Record<LaneName, boolean> = { answer: false, reasoning: false };
|
||||
const archivedAnswerPreviews: Array<{
|
||||
messageId: number;
|
||||
textSnapshot: string;
|
||||
deleteIfUnused?: boolean;
|
||||
}> = [];
|
||||
|
||||
const deliverLaneText = createLaneTextDeliverer({
|
||||
lanes,
|
||||
archivedAnswerPreviews,
|
||||
finalizedPreviewByLane,
|
||||
draftMaxChars: params?.draftMaxChars ?? 4_096,
|
||||
applyTextToPayload: (payload: ReplyPayload, text: string) => ({ ...payload, text }),
|
||||
@@ -59,7 +52,6 @@ function createHarness(params?: {
|
||||
flushDraftLane,
|
||||
stopDraftLane,
|
||||
editPreview,
|
||||
deletePreviewMessage,
|
||||
log,
|
||||
markDelivered,
|
||||
});
|
||||
@@ -75,10 +67,8 @@ function createHarness(params?: {
|
||||
flushDraftLane,
|
||||
stopDraftLane,
|
||||
editPreview,
|
||||
deletePreviewMessage,
|
||||
log,
|
||||
markDelivered,
|
||||
archivedAnswerPreviews,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -143,7 +133,7 @@ describe("createLaneTextDeliverer", () => {
|
||||
expect(result).toBe("preview-finalized");
|
||||
expect(harness.editPreview).toHaveBeenCalledTimes(1);
|
||||
expect(harness.sendPayload).not.toHaveBeenCalled();
|
||||
expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("treating as delivered"));
|
||||
expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("keeping existing preview"));
|
||||
});
|
||||
|
||||
it("treats 'message is not modified' preview edit errors as delivered", async () => {
|
||||
@@ -170,7 +160,7 @@ describe("createLaneTextDeliverer", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to normal delivery when editing an existing preview fails", async () => {
|
||||
it("keeps existing preview when editing an existing preview fails", async () => {
|
||||
const harness = createHarness({ answerMessageId: 999 });
|
||||
harness.editPreview.mockRejectedValue(new Error("500: preview edit failed"));
|
||||
|
||||
@@ -181,11 +171,10 @@ describe("createLaneTextDeliverer", () => {
|
||||
infoKind: "final",
|
||||
});
|
||||
|
||||
expect(result).toBe("sent");
|
||||
expect(result).toBe("preview-finalized");
|
||||
expect(harness.editPreview).toHaveBeenCalledTimes(1);
|
||||
expect(harness.sendPayload).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ text: "Hello final" }),
|
||||
);
|
||||
expect(harness.sendPayload).not.toHaveBeenCalled();
|
||||
expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("keeping existing preview"));
|
||||
});
|
||||
|
||||
it("falls back to normal delivery when stop-created preview has no message id", async () => {
|
||||
@@ -361,26 +350,4 @@ describe("createLaneTextDeliverer", () => {
|
||||
);
|
||||
expect(harness.markDelivered).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("deletes consumed boundary previews after fallback final send", async () => {
|
||||
const harness = createHarness();
|
||||
harness.archivedAnswerPreviews.push({
|
||||
messageId: 4444,
|
||||
textSnapshot: "Boundary preview",
|
||||
deleteIfUnused: false,
|
||||
});
|
||||
|
||||
const result = await harness.deliverLaneText({
|
||||
laneName: "answer",
|
||||
text: "Final with media",
|
||||
payload: { text: "Final with media", mediaUrl: "file:///tmp/example.png" },
|
||||
infoKind: "final",
|
||||
});
|
||||
|
||||
expect(result).toBe("sent");
|
||||
expect(harness.sendPayload).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ text: "Final with media", mediaUrl: "file:///tmp/example.png" }),
|
||||
);
|
||||
expect(harness.deletePreviewMessage).toHaveBeenCalledWith(4444);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
export {
|
||||
type ArchivedPreview,
|
||||
createLaneTextDeliverer,
|
||||
type DraftLaneState,
|
||||
type LaneDeliveryResult,
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { formatReasoningMessage } from "../agents/pi-embedded-utils.js";
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
import { findCodeRegions, isInsideCode } from "../shared/text/code-regions.js";
|
||||
import { stripReasoningTagsFromText } from "../shared/text/reasoning-tags.js";
|
||||
|
||||
@@ -87,14 +86,8 @@ export function splitTelegramReasoningText(text?: string): TelegramReasoningSpli
|
||||
return { reasoningText, answerText };
|
||||
}
|
||||
|
||||
export type BufferedFinalAnswer = {
|
||||
payload: ReplyPayload;
|
||||
text: string;
|
||||
};
|
||||
|
||||
export function createTelegramReasoningStepState() {
|
||||
let reasoningStatus: "none" | "hinted" | "delivered" = "none";
|
||||
let bufferedFinalAnswer: BufferedFinalAnswer | undefined;
|
||||
|
||||
const noteReasoningHint = () => {
|
||||
if (reasoningStatus === "none") {
|
||||
@@ -106,31 +99,13 @@ export function createTelegramReasoningStepState() {
|
||||
reasoningStatus = "delivered";
|
||||
};
|
||||
|
||||
const shouldBufferFinalAnswer = () => {
|
||||
return reasoningStatus === "hinted" && !bufferedFinalAnswer;
|
||||
};
|
||||
|
||||
const bufferFinalAnswer = (value: BufferedFinalAnswer) => {
|
||||
bufferedFinalAnswer = value;
|
||||
};
|
||||
|
||||
const takeBufferedFinalAnswer = (): BufferedFinalAnswer | undefined => {
|
||||
const value = bufferedFinalAnswer;
|
||||
bufferedFinalAnswer = undefined;
|
||||
return value;
|
||||
};
|
||||
|
||||
const resetForNextStep = () => {
|
||||
reasoningStatus = "none";
|
||||
bufferedFinalAnswer = undefined;
|
||||
};
|
||||
|
||||
return {
|
||||
noteReasoningHint,
|
||||
noteReasoningDelivered,
|
||||
shouldBufferFinalAnswer,
|
||||
bufferFinalAnswer,
|
||||
takeBufferedFinalAnswer,
|
||||
resetForNextStep,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user