mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 12:30:44 +00:00
refactor(telegram): split preview lifecycle from cleanup
This commit is contained in:
@@ -963,6 +963,74 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
expect(deliverReplies).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("still finalizes the active preview after an archived final edit is retained", async () => {
|
||||
let answerMessageId: number | undefined;
|
||||
let answerDraftParams:
|
||||
| {
|
||||
onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void;
|
||||
}
|
||||
| undefined;
|
||||
const answerDraftStream = {
|
||||
update: vi.fn().mockImplementation((text: string) => {
|
||||
if (text.includes("Message B")) {
|
||||
answerMessageId = 1002;
|
||||
}
|
||||
}),
|
||||
flush: vi.fn().mockResolvedValue(undefined),
|
||||
messageId: vi.fn().mockImplementation(() => answerMessageId),
|
||||
clear: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
forceNewMessage: vi.fn().mockImplementation(() => {
|
||||
answerMessageId = undefined;
|
||||
}),
|
||||
};
|
||||
const reasoningDraftStream = createDraftStream();
|
||||
createTelegramDraftStream
|
||||
.mockImplementationOnce((params) => {
|
||||
answerDraftParams = params as typeof answerDraftParams;
|
||||
return answerDraftStream;
|
||||
})
|
||||
.mockImplementationOnce(() => reasoningDraftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
|
||||
answerDraftParams?.onSupersededPreview?.({
|
||||
messageId: 1001,
|
||||
textSnapshot: "Message A partial",
|
||||
});
|
||||
|
||||
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
|
||||
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram
|
||||
.mockRejectedValueOnce(new Error("400: Bad Request: message to edit not found"))
|
||||
.mockResolvedValueOnce({ ok: true, chatId: "123", messageId: "1002" });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
|
||||
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
123,
|
||||
1001,
|
||||
"Message A final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(editMessageTelegram).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
123,
|
||||
1002,
|
||||
"Message B final",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(answerDraftStream.clear).not.toHaveBeenCalled();
|
||||
expect(deliverReplies).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it.each(["partial", "block"] as const)(
|
||||
"keeps finalized text preview when the next assistant message is media-only (%s mode)",
|
||||
async (streamMode) => {
|
||||
|
||||
@@ -38,7 +38,7 @@ import {
|
||||
createLaneTextDeliverer,
|
||||
type DraftLaneState,
|
||||
type LaneName,
|
||||
type LanePreviewDisposition,
|
||||
type LanePreviewLifecycle,
|
||||
} from "./lane-delivery.js";
|
||||
import {
|
||||
createTelegramReasoningStepState,
|
||||
@@ -240,10 +240,17 @@ export const dispatchTelegramMessage = async ({
|
||||
answer: createDraftLane("answer", canStreamAnswerDraft),
|
||||
reasoning: createDraftLane("reasoning", canStreamReasoningDraft),
|
||||
};
|
||||
const previewDispositionByLane: Record<LaneName, LanePreviewDisposition> = {
|
||||
// Active preview lifecycle answers "can this current preview still be
|
||||
// finalized?" Cleanup retention is separate so archived-preview decisions do
|
||||
// not poison the active lane.
|
||||
const activePreviewLifecycleByLane: Record<LaneName, LanePreviewLifecycle> = {
|
||||
answer: "transient",
|
||||
reasoning: "transient",
|
||||
};
|
||||
const retainPreviewOnCleanupByLane: Record<LaneName, boolean> = {
|
||||
answer: false,
|
||||
reasoning: false,
|
||||
};
|
||||
const answerLane = lanes.answer;
|
||||
const reasoningLane = lanes.reasoning;
|
||||
let splitReasoningOnNextStream = false;
|
||||
@@ -289,7 +296,10 @@ export const dispatchTelegramMessage = async ({
|
||||
// so it remains visible across tool boundaries.
|
||||
const materializedId = await answerLane.stream?.materialize?.();
|
||||
const previewMessageId = materializedId ?? answerLane.stream?.messageId();
|
||||
if (typeof previewMessageId === "number" && previewDispositionByLane.answer === "transient") {
|
||||
if (
|
||||
typeof previewMessageId === "number" &&
|
||||
activePreviewLifecycleByLane.answer === "transient"
|
||||
) {
|
||||
archivedAnswerPreviews.push({
|
||||
messageId: previewMessageId,
|
||||
textSnapshot: answerLane.lastPartialText,
|
||||
@@ -302,7 +312,8 @@ export const dispatchTelegramMessage = async ({
|
||||
resetDraftLaneState(answerLane);
|
||||
if (didForceNewMessage) {
|
||||
// New assistant message boundary: this lane now tracks a fresh preview lifecycle.
|
||||
previewDispositionByLane.answer = "transient";
|
||||
activePreviewLifecycleByLane.answer = "transient";
|
||||
retainPreviewOnCleanupByLane.answer = false;
|
||||
}
|
||||
return didForceNewMessage;
|
||||
};
|
||||
@@ -332,7 +343,7 @@ export const dispatchTelegramMessage = async ({
|
||||
const ingestDraftLaneSegments = async (text: string | undefined) => {
|
||||
const split = splitTextIntoLaneSegments(text);
|
||||
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
|
||||
if (hasAnswerSegment && previewDispositionByLane.answer !== "transient") {
|
||||
if (hasAnswerSegment && activePreviewLifecycleByLane.answer !== "transient") {
|
||||
// Some providers can emit the first partial of a new assistant message before
|
||||
// onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
|
||||
// the previously finalized preview message with the next message's text.
|
||||
@@ -470,7 +481,8 @@ export const dispatchTelegramMessage = async ({
|
||||
const deliverLaneText = createLaneTextDeliverer({
|
||||
lanes,
|
||||
archivedAnswerPreviews,
|
||||
previewDispositionByLane,
|
||||
activePreviewLifecycleByLane,
|
||||
retainPreviewOnCleanupByLane,
|
||||
draftMaxChars,
|
||||
applyTextToPayload,
|
||||
sendPayload,
|
||||
@@ -597,7 +609,8 @@ export const dispatchTelegramMessage = async ({
|
||||
}
|
||||
if (info.kind === "final") {
|
||||
if (reasoningLane.hasStreamedMessage) {
|
||||
previewDispositionByLane.reasoning = "finalized";
|
||||
activePreviewLifecycleByLane.reasoning = "complete";
|
||||
retainPreviewOnCleanupByLane.reasoning = true;
|
||||
}
|
||||
reasoningStepState.resetForNextStep();
|
||||
}
|
||||
@@ -675,7 +688,8 @@ export const dispatchTelegramMessage = async ({
|
||||
reasoningStepState.resetForNextStep();
|
||||
if (skipNextAnswerMessageStartRotation) {
|
||||
skipNextAnswerMessageStartRotation = false;
|
||||
previewDispositionByLane.answer = "transient";
|
||||
activePreviewLifecycleByLane.answer = "transient";
|
||||
retainPreviewOnCleanupByLane.answer = false;
|
||||
return;
|
||||
}
|
||||
await rotateAnswerLaneForNewAssistantMessage();
|
||||
@@ -683,7 +697,8 @@ export const dispatchTelegramMessage = async ({
|
||||
// Even when no forceNewMessage happened (e.g. prior answer had no
|
||||
// streamed partials), the next partial belongs to a fresh lifecycle
|
||||
// and must not trigger late pre-rotation mid-message.
|
||||
previewDispositionByLane.answer = "transient";
|
||||
activePreviewLifecycleByLane.answer = "transient";
|
||||
retainPreviewOnCleanupByLane.answer = false;
|
||||
})
|
||||
: undefined,
|
||||
onReasoningEnd: reasoningLane.stream
|
||||
@@ -732,8 +747,7 @@ export const dispatchTelegramMessage = async ({
|
||||
(p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId,
|
||||
);
|
||||
const shouldClear =
|
||||
previewDispositionByLane[laneState.laneName] === "transient" &&
|
||||
!hasBoundaryFinalizedActivePreview;
|
||||
!retainPreviewOnCleanupByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview;
|
||||
const existing = streamCleanupStates.get(stream);
|
||||
if (!existing) {
|
||||
streamCleanupStates.set(stream, { shouldClear });
|
||||
|
||||
@@ -49,7 +49,7 @@ export type ArchivedPreview = {
|
||||
deleteIfUnused?: boolean;
|
||||
};
|
||||
|
||||
export type LanePreviewDisposition = "transient" | "retained" | "finalized";
|
||||
export type LanePreviewLifecycle = "transient" | "complete";
|
||||
|
||||
export type LaneDeliveryResult =
|
||||
| "preview-finalized"
|
||||
@@ -61,7 +61,8 @@ export type LaneDeliveryResult =
|
||||
type CreateLaneTextDelivererParams = {
|
||||
lanes: Record<LaneName, DraftLaneState>;
|
||||
archivedAnswerPreviews: ArchivedPreview[];
|
||||
previewDispositionByLane: Record<LaneName, LanePreviewDisposition>;
|
||||
activePreviewLifecycleByLane: Record<LaneName, LanePreviewLifecycle>;
|
||||
retainPreviewOnCleanupByLane: Record<LaneName, boolean>;
|
||||
draftMaxChars: number;
|
||||
applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload;
|
||||
sendPayload: (payload: ReplyPayload) => Promise<boolean>;
|
||||
@@ -162,6 +163,10 @@ function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTarget
|
||||
|
||||
export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText;
|
||||
const markActivePreviewComplete = (laneName: LaneName) => {
|
||||
params.activePreviewLifecycleByLane[laneName] = "complete";
|
||||
params.retainPreviewOnCleanupByLane[laneName] = true;
|
||||
};
|
||||
const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft";
|
||||
const canMaterializeDraftFinal = (
|
||||
lane: DraftLaneState,
|
||||
@@ -401,7 +406,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
return "preview-finalized";
|
||||
}
|
||||
if (finalized === "retained") {
|
||||
params.previewDispositionByLane.answer = "retained";
|
||||
params.retainPreviewOnCleanupByLane.answer = true;
|
||||
return "preview-retained";
|
||||
}
|
||||
}
|
||||
@@ -448,7 +453,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
return archivedResult;
|
||||
}
|
||||
}
|
||||
if (canEditViaPreview && params.previewDispositionByLane[laneName] === "transient") {
|
||||
if (canEditViaPreview && params.activePreviewLifecycleByLane[laneName] === "transient") {
|
||||
await params.flushDraftLane(lane);
|
||||
if (laneName === "answer") {
|
||||
const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({
|
||||
@@ -469,7 +474,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
text,
|
||||
});
|
||||
if (materialized) {
|
||||
params.previewDispositionByLane[laneName] = "finalized";
|
||||
markActivePreviewComplete(laneName);
|
||||
return "preview-finalized";
|
||||
}
|
||||
}
|
||||
@@ -483,11 +488,11 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
context: "final",
|
||||
});
|
||||
if (finalized === "edited") {
|
||||
params.previewDispositionByLane[laneName] = "finalized";
|
||||
markActivePreviewComplete(laneName);
|
||||
return "preview-finalized";
|
||||
}
|
||||
if (finalized === "retained") {
|
||||
params.previewDispositionByLane[laneName] = "retained";
|
||||
markActivePreviewComplete(laneName);
|
||||
return "preview-retained";
|
||||
}
|
||||
} else if (!hasMedia && !payload.isError && text.length > params.draftMaxChars) {
|
||||
|
||||
@@ -42,7 +42,8 @@ function createHarness(params?: {
|
||||
const deletePreviewMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const log = vi.fn();
|
||||
const markDelivered = vi.fn();
|
||||
const previewDispositionByLane = { answer: "transient", reasoning: "transient" } as const;
|
||||
const activePreviewLifecycleByLane = { answer: "transient", reasoning: "transient" } as const;
|
||||
const retainPreviewOnCleanupByLane = { answer: false, reasoning: false } as const;
|
||||
const archivedAnswerPreviews: Array<{
|
||||
messageId: number;
|
||||
textSnapshot: string;
|
||||
@@ -52,7 +53,8 @@ function createHarness(params?: {
|
||||
const deliverLaneText = createLaneTextDeliverer({
|
||||
lanes,
|
||||
archivedAnswerPreviews,
|
||||
previewDispositionByLane: { ...previewDispositionByLane },
|
||||
activePreviewLifecycleByLane: { ...activePreviewLifecycleByLane },
|
||||
retainPreviewOnCleanupByLane: { ...retainPreviewOnCleanupByLane },
|
||||
draftMaxChars: params?.draftMaxChars ?? 4_096,
|
||||
applyTextToPayload: (payload: ReplyPayload, text: string) => ({ ...payload, text }),
|
||||
sendPayload,
|
||||
|
||||
@@ -4,7 +4,7 @@ export {
|
||||
type DraftLaneState,
|
||||
type LaneDeliveryResult,
|
||||
type LaneName,
|
||||
type LanePreviewDisposition,
|
||||
type LanePreviewLifecycle,
|
||||
} from "./lane-delivery-text-deliverer.js";
|
||||
export {
|
||||
createLaneDeliveryStateTracker,
|
||||
|
||||
Reference in New Issue
Block a user