Reply: preserve phased block metadata

This commit is contained in:
Gustavo Madeira Santana
2026-04-14 23:44:04 -04:00
parent 0bc4472b7e
commit 8db4bb7583
8 changed files with 313 additions and 67 deletions

View File

@@ -21,6 +21,7 @@ function createMessageUpdateContext(
onAgentEvent?: ReturnType<typeof vi.fn>;
onPartialReply?: ReturnType<typeof vi.fn>;
flushBlockReplyBuffer?: ReturnType<typeof vi.fn>;
resetAssistantMessageState?: ReturnType<typeof vi.fn>;
debug?: ReturnType<typeof vi.fn>;
shouldEmitPartialReplies?: boolean;
} = {},
@@ -50,6 +51,8 @@ function createMessageUpdateContext(
shouldEmitPartialReplies: params.shouldEmitPartialReplies ?? true,
blockReplyBreak: "text_end",
assistantMessageIndex: 0,
lastAssistantStreamItemId: undefined,
assistantTexts: [],
},
log: { debug: params.debug ?? vi.fn() },
noteLastAssistant: vi.fn(),
@@ -57,6 +60,7 @@ function createMessageUpdateContext(
consumePartialReplyDirectives: vi.fn(() => null),
emitReasoningStream: vi.fn(),
flushBlockReplyBuffer: params.flushBlockReplyBuffer ?? vi.fn(),
resetAssistantMessageState: params.resetAssistantMessageState ?? vi.fn(),
} as unknown as EmbeddedPiSubscribeContext;
}
@@ -190,6 +194,58 @@ describe("buildAssistantStreamData", () => {
});
});
describe("handleMessageUpdate", () => {
// Regression test for phased streaming: when the provider's partial message
// switches to a new text block (different textSignature id), the subscriber
// must treat it as a new assistant message rather than appending to the old one.
it("treats phased textSignature item changes as assistant-message boundaries", () => {
const flushBlockReplyBuffer = vi.fn();
const resetAssistantMessageState = vi.fn();
const onAssistantMessageStart = vi.fn();
const context = createMessageUpdateContext({
flushBlockReplyBuffer,
resetAssistantMessageState,
});
context.params.onAssistantMessageStart = onAssistantMessageStart;
// Simulate that "item-1" is the block currently being streamed and that the
// session is already on assistant message index 7.
context.state.lastAssistantStreamItemId = "item-1";
context.state.assistantMessageIndex = 7;
// Deliver a delta whose partial message now contains a second text block
// ("item-2") at contentIndex 1 — the boundary the handler must detect.
handleMessageUpdate(context, {
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
contentIndex: 1,
delta: "Second block",
partial: {
role: "assistant",
phase: "final_answer",
content: [
createOpenAiResponsesTextBlock({
text: "First block",
id: "item-1",
phase: "final_answer",
}),
createOpenAiResponsesTextBlock({
text: "Second block",
id: "item-2",
phase: "final_answer",
}),
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
} as never);
// The previous block must be flushed under its own message index…
expect(flushBlockReplyBuffer).toHaveBeenCalledWith({ assistantMessageIndex: 7 });
// …state reset with assistantTexts.length (0 — the fixture starts empty)…
expect(resetAssistantMessageState).toHaveBeenCalledWith(0);
// …a fresh assistant message started, and the new stream item id recorded.
expect(onAssistantMessageStart).toHaveBeenCalledTimes(1);
expect(context.state.lastAssistantStreamItemId).toBe("item-2");
});
});
describe("consumePendingToolMediaIntoReply", () => {
it("attaches queued tool media to the next assistant reply", () => {
const state = {

View File

@@ -6,6 +6,7 @@ import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { createInlineCodeState } from "../markdown/code-spans.js";
import {
parseAssistantTextSignature,
resolveAssistantMessagePhase,
type AssistantPhase,
} from "../shared/chat-message-content.js";
@@ -84,6 +85,38 @@ function isTranscriptOnlyOpenClawAssistantMessage(message: AgentMessage | undefi
return provider === "openclaw" && (model === "delivery-mirror" || model === "gateway-injected");
}
/**
 * Finds the stream item id of the text block currently being streamed.
 *
 * When a valid `contentIndex` is supplied, only that block is inspected;
 * otherwise the content array is scanned from the end (the newest block
 * wins). Returns the id parsed from the block's `textSignature`, or
 * `undefined` when no text block carries a parsable signature id.
 */
function resolveAssistantStreamItemId(params: {
  contentIndex?: unknown;
  message: AgentMessage | undefined;
}): string | undefined {
  const content = (params.message as { content?: unknown } | undefined)?.content;
  if (!Array.isArray(content)) {
    return undefined;
  }
  const index = params.contentIndex;
  // A usable index must be a non-negative integer; anything else falls back
  // to a newest-first scan of every block.
  const candidates =
    typeof index === "number" && Number.isInteger(index) && index >= 0
      ? [content[index]]
      : content.toReversed();
  for (const candidate of candidates) {
    if (!candidate || typeof candidate !== "object") {
      continue;
    }
    const { type, textSignature } = candidate as { type?: unknown; textSignature?: unknown };
    if (type !== "text") {
      continue;
    }
    const parsed = parseAssistantTextSignature(textSignature);
    if (parsed?.id) {
      return parsed.id;
    }
  }
  return undefined;
}
function emitReasoningEnd(ctx: EmbeddedPiSubscribeContext) {
if (!ctx.state.reasoningStreamOpen) {
return;
@@ -92,6 +125,66 @@ function emitReasoningEnd(ctx: EmbeddedPiSubscribeContext) {
void ctx.params.onReasoningEnd?.();
}
/** Marks the reasoning stream as open so subsequent thinking deltas are routed to it. */
function openReasoningStream(ctx: EmbeddedPiSubscribeContext) {
  ctx.state.reasoningStreamOpen = true;
}
/**
 * Whether assistant output must be suppressed because a deterministic
 * approval prompt is pending or has already been delivered.
 */
function shouldSuppressDeterministicApprovalOutput(
  state: Pick<
    EmbeddedPiSubscribeState,
    "deterministicApprovalPromptPending" | "deterministicApprovalPromptSent"
  >,
): boolean {
  const { deterministicApprovalPromptPending, deterministicApprovalPromptSent } = state;
  return Boolean(deterministicApprovalPromptPending || deterministicApprovalPromptSent);
}
/** Routes a streamed chunk into the block chunker when one exists, otherwise into the raw block buffer. */
function appendBlockReplyChunk(ctx: EmbeddedPiSubscribeContext, chunk: string) {
  const chunker = ctx.blockChunker;
  if (chunker) {
    chunker.append(chunk);
  } else {
    ctx.state.blockBuffer += chunk;
  }
}
/** Replaces all buffered block-reply text with `text`, resetting the chunker first when one is present. */
function replaceBlockReplyBuffer(ctx: EmbeddedPiSubscribeContext, text: string) {
  const chunker = ctx.blockChunker;
  if (!chunker) {
    ctx.state.blockBuffer = text;
    return;
  }
  chunker.reset();
  chunker.append(text);
}
/**
 * Computes the text to append for an assistant text event, keeping output
 * monotonic when providers resend full content on `text_start`/`text_end`.
 *
 * - `text_delta` events (and any non-empty delta) pass the delta through.
 * - Otherwise, if `content` extends what was already accumulated, only the
 *   new suffix is returned; content already covered by the accumulated text
 *   yields nothing; genuinely new content is returned whole.
 */
function resolveAssistantTextChunk(params: {
  evtType: "text_delta" | "text_start" | "text_end";
  delta: string;
  content: string;
  accumulatedText: string;
}): string {
  const { evtType, delta, content, accumulatedText } = params;
  // Plain deltas (and any explicit delta on start/end) pass straight through.
  if (evtType === "text_delta" || delta) {
    return delta;
  }
  if (!content) {
    return "";
  }
  // KNOWN: Some providers resend full content on `text_end`.
  // We only append a suffix (or nothing) to keep output monotonic.
  if (content.startsWith(accumulatedText)) {
    return content.slice(accumulatedText.length);
  }
  // Content already present anywhere in the accumulated text is a resend.
  return accumulatedText.includes(content) ? "" : content;
}
export function resolveSilentReplyFallbackText(params: {
text: unknown;
messagingToolSentTexts: string[];
@@ -219,8 +312,7 @@ export function handleMessageUpdate(
if (suppressVisibleAssistantOutput) {
return;
}
const suppressDeterministicApprovalOutput =
ctx.state.deterministicApprovalPromptPending || ctx.state.deterministicApprovalPromptSent;
const suppressDeterministicApprovalOutput = shouldSuppressDeterministicApprovalOutput(ctx.state);
const assistantEvent = evt.assistantMessageEvent;
const assistantPhase = resolveAssistantMessagePhase(msg);
@@ -232,7 +324,7 @@ export function handleMessageUpdate(
if (evtType === "thinking_start" || evtType === "thinking_delta" || evtType === "thinking_end") {
if (evtType === "thinking_start" || evtType === "thinking_delta") {
ctx.state.reasoningStreamOpen = true;
openReasoningStream(ctx);
}
const thinkingDelta = typeof assistantRecord?.delta === "string" ? assistantRecord.delta : "";
const thinkingContent =
@@ -253,7 +345,7 @@ export function handleMessageUpdate(
}
if (evtType === "thinking_end") {
if (!ctx.state.reasoningStreamOpen) {
ctx.state.reasoningStreamOpen = true;
openReasoningStream(ctx);
}
emitReasoningEnd(ctx);
}
@@ -277,30 +369,31 @@ export function handleMessageUpdate(
content,
});
let chunk = "";
if (evtType === "text_delta") {
chunk = delta;
} else if (evtType === "text_start" || evtType === "text_end") {
if (delta) {
chunk = delta;
} else if (content) {
// KNOWN: Some providers resend full content on `text_end`.
// We only append a suffix (or nothing) to keep output monotonic.
if (content.startsWith(ctx.state.deltaBuffer)) {
chunk = content.slice(ctx.state.deltaBuffer.length);
} else if (ctx.state.deltaBuffer.startsWith(content)) {
chunk = "";
} else if (!ctx.state.deltaBuffer.includes(content)) {
chunk = content;
}
}
}
const chunk = resolveAssistantTextChunk({
evtType,
delta,
content,
accumulatedText: ctx.state.deltaBuffer,
});
const partialAssistant =
assistantRecord?.partial && typeof assistantRecord.partial === "object"
? (assistantRecord.partial as AssistantMessage)
: msg;
const deliveryPhase = resolveAssistantMessagePhase(partialAssistant);
const streamItemId = resolveAssistantStreamItemId({
contentIndex: assistantRecord?.contentIndex,
message: partialAssistant,
});
if (deliveryPhase && streamItemId) {
const previousStreamItemId = ctx.state.lastAssistantStreamItemId;
if (previousStreamItemId && previousStreamItemId !== streamItemId) {
void ctx.flushBlockReplyBuffer({ assistantMessageIndex: ctx.state.assistantMessageIndex });
ctx.resetAssistantMessageState(ctx.state.assistantTexts.length);
void ctx.params.onAssistantMessageStart?.();
}
ctx.state.lastAssistantStreamItemId = streamItemId;
}
if (deliveryPhase === "commentary") {
return;
}
@@ -310,11 +403,7 @@ export function handleMessageUpdate(
if (chunk) {
ctx.state.deltaBuffer += chunk;
if (!shouldUsePhaseAwareBlockReply) {
if (ctx.blockChunker) {
ctx.blockChunker.append(chunk);
} else {
ctx.state.blockBuffer += chunk;
}
appendBlockReplyChunk(ctx, chunk);
}
}
@@ -337,7 +426,7 @@ export function handleMessageUpdate(
const wasThinking = ctx.state.partialBlockState.thinking;
const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : "";
if (!wasThinking && ctx.state.partialBlockState.thinking) {
ctx.state.reasoningStreamOpen = true;
openReasoningStream(ctx);
}
// Detect when thinking block ends (</think> tag processed)
if (wasThinking && !ctx.state.partialBlockState.thinking) {
@@ -370,20 +459,11 @@ export function handleMessageUpdate(
}
const blockReplyChunk = replace ? cleanedText : deltaText;
if (blockReplyChunk) {
if (ctx.blockChunker) {
ctx.blockChunker.append(blockReplyChunk);
} else {
ctx.state.blockBuffer += blockReplyChunk;
}
appendBlockReplyChunk(ctx, blockReplyChunk);
}
if (evtType === "text_end" && !ctx.state.lastBlockReplyText && cleanedText) {
if (ctx.blockChunker) {
ctx.blockChunker.reset();
ctx.blockChunker.append(cleanedText);
} else {
ctx.state.blockBuffer = cleanedText;
}
replaceBlockReplyBuffer(ctx, cleanedText);
}
}
@@ -455,8 +535,7 @@ export function handleMessageEnd(
const assistantMessage = msg;
const assistantPhase = resolveAssistantMessagePhase(assistantMessage);
const suppressVisibleAssistantOutput = shouldSuppressAssistantVisibleOutput(assistantMessage);
const suppressDeterministicApprovalOutput =
ctx.state.deterministicApprovalPromptPending || ctx.state.deterministicApprovalPromptSent;
const suppressDeterministicApprovalOutput = shouldSuppressDeterministicApprovalOutput(ctx.state);
ctx.noteLastAssistant(assistantMessage);
ctx.recordAssistantUsage((assistantMessage as { usage?: unknown }).usage);
if (suppressVisibleAssistantOutput) {

View File

@@ -53,6 +53,7 @@ export type EmbeddedPiSubscribeState = {
lastBlockReplyText?: string;
reasoningStreamOpen: boolean;
assistantMessageIndex: number;
lastAssistantStreamItemId?: string;
lastAssistantTextMessageIndex: number;
lastAssistantTextNormalized?: string;
lastAssistantTextTrimmed?: string;

View File

@@ -97,6 +97,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
lastBlockReplyText: undefined,
reasoningStreamOpen: false,
assistantMessageIndex: 0,
lastAssistantStreamItemId: undefined,
lastAssistantTextMessageIndex: -1,
lastAssistantTextNormalized: undefined,
lastAssistantTextTrimmed: undefined,
@@ -206,6 +207,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.reasoningStreamOpen = false;
state.suppressBlockChunks = false;
state.assistantMessageIndex += 1;
state.lastAssistantStreamItemId = undefined;
state.lastAssistantTextMessageIndex = -1;
state.lastAssistantTextNormalized = undefined;
state.lastAssistantTextTrimmed = undefined;
@@ -322,26 +324,26 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
ensureCompactionPromise();
};
// Settles the shared compaction-wait promise once no retries remain queued
// and no compaction pass is in flight; no-op otherwise.
const resolveCompactionPromiseIfIdle = () => {
if (state.pendingCompactionRetry !== 0 || state.compactionInFlight) {
return;
}
// Resolve first, then clear all retry bookkeeping so a later compaction
// can install a fresh promise.
state.compactionRetryResolve?.();
state.compactionRetryResolve = undefined;
state.compactionRetryReject = undefined;
state.compactionRetryPromise = null;
};
const resolveCompactionRetry = () => {
if (state.pendingCompactionRetry <= 0) {
return;
}
state.pendingCompactionRetry -= 1;
if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) {
state.compactionRetryResolve?.();
state.compactionRetryResolve = undefined;
state.compactionRetryReject = undefined;
state.compactionRetryPromise = null;
}
resolveCompactionPromiseIfIdle();
};
const maybeResolveCompactionWait = () => {
if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) {
state.compactionRetryResolve?.();
state.compactionRetryResolve = undefined;
state.compactionRetryReject = undefined;
state.compactionRetryPromise = null;
}
resolveCompactionPromiseIfIdle();
};
const recordAssistantUsage = (usageLike: unknown) => {
const usage = normalizeUsage((usageLike ?? undefined) as UsageLike | undefined);

View File

@@ -1,4 +1,5 @@
import { describe, expect, it } from "vitest";
import { setReplyPayloadMetadata } from "../reply-payload.js";
import {
createBlockReplyContentKey,
createBlockReplyPayloadKey,
@@ -55,7 +56,7 @@ describe("createBlockReplyPipeline dedup with threading", () => {
pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" });
pipeline.enqueue({ text: "response text", replyToId: undefined });
await pipeline.flush();
await pipeline.flush({ force: true });
expect(sent).toEqual([
{ text: "response text", replyToId: "thread-root-1" },
@@ -70,10 +71,32 @@ describe("createBlockReplyPipeline dedup with threading", () => {
});
pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" });
await pipeline.flush();
await pipeline.flush({ force: true });
// Final payload with no replyToId should be recognized as already sent
expect(pipeline.hasSentPayload({ text: "response text" })).toBe(true);
expect(pipeline.hasSentPayload({ text: "response text", replyToId: "other-id" })).toBe(true);
});
// Two payloads tagged with different assistantMessageIndex metadata must be
// delivered as separate replies, even though the coalescer's minChars/idleMs
// settings would otherwise merge such short texts into one message.
it("does not coalesce logical assistant blocks across assistantMessageIndex boundaries", async () => {
const sent: string[] = [];
const pipeline = createBlockReplyPipeline({
onBlockReply: async (payload) => {
sent.push(payload.text ?? "");
},
timeoutMs: 5000,
// Generous thresholds: "Alpha" + "Beta" is well under minChars, so only the
// metadata boundary can explain a split.
coalescing: {
minChars: 100,
maxChars: 200,
idleMs: 1000,
joiner: "\n\n",
},
});
pipeline.enqueue(setReplyPayloadMetadata({ text: "Alpha" }, { assistantMessageIndex: 0 }));
pipeline.enqueue(setReplyPayloadMetadata({ text: "Beta" }, { assistantMessageIndex: 1 }));
await pipeline.flush({ force: true });
expect(sent).toEqual(["Alpha", "Beta"]);
});
});

View File

@@ -1,5 +1,6 @@
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { logVerbose } from "../../globals.js";
import { getReplyPayloadMetadata } from "../reply-payload.js";
import type { ReplyPayload } from "../types.js";
import { createBlockReplyCoalescer } from "./block-reply-coalescer.js";
import type { BlockStreamingCoalescing } from "./block-streaming.js";
@@ -90,11 +91,20 @@ export function createBlockReplyPipeline(params: {
const bufferedKeys = new Set<string>();
const bufferedPayloadKeys = new Set<string>();
const bufferedPayloads: ReplyPayload[] = [];
let bufferedAssistantMessageIndex: number | undefined;
let sendChain: Promise<void> = Promise.resolve();
let aborted = false;
let didStream = false;
let didLogTimeout = false;
// A payload key counts as handled once it has been seen, sent, or queued.
const hasSeenOrQueuedPayloadKey = (payloadKey: string) =>
  [seenKeys, sentKeys, pendingKeys].some((keys) => keys.has(payloadKey));
// Ends the current logical assistant block: forget which assistant message
// the buffer belonged to, then force the coalescer to emit whatever it holds
// so the next block starts its own reply.
const flushBufferedAssistantBlock = () => {
bufferedAssistantMessageIndex = undefined;
void coalescer?.flush({ force: true });
};
const sendPayload = (payload: ReplyPayload, bypassSeenCheck: boolean = false) => {
if (aborted) {
return;
@@ -163,6 +173,7 @@ export function createBlockReplyPipeline(params: {
config: coalescing,
shouldAbort: () => aborted,
onFlush: (payload) => {
bufferedAssistantMessageIndex = undefined;
bufferedKeys.clear();
sendPayload(payload, /* bypassSeenCheck */ true);
},
@@ -175,12 +186,7 @@ export function createBlockReplyPipeline(params: {
return false;
}
const payloadKey = createBlockReplyPayloadKey(payload);
if (
seenKeys.has(payloadKey) ||
sentKeys.has(payloadKey) ||
pendingKeys.has(payloadKey) ||
bufferedPayloadKeys.has(payloadKey)
) {
if (hasSeenOrQueuedPayloadKey(payloadKey) || bufferedPayloadKeys.has(payloadKey)) {
return true;
}
seenKeys.add(payloadKey);
@@ -215,12 +221,25 @@ export function createBlockReplyPipeline(params: {
return;
}
if (coalescer) {
const assistantMessageIndex = getReplyPayloadMetadata(payload)?.assistantMessageIndex;
if (
assistantMessageIndex !== undefined &&
bufferedAssistantMessageIndex !== undefined &&
assistantMessageIndex !== bufferedAssistantMessageIndex &&
coalescer.hasBuffered()
) {
// Logical assistant blocks must not be merged together by the generic
// coalescer. Force-flush the previous buffered block before starting a
// new assistant-message block.
flushBufferedAssistantBlock();
}
const payloadKey = createBlockReplyPayloadKey(payload);
if (seenKeys.has(payloadKey) || pendingKeys.has(payloadKey) || bufferedKeys.has(payloadKey)) {
if (hasSeenOrQueuedPayloadKey(payloadKey) || bufferedKeys.has(payloadKey)) {
return;
}
seenKeys.add(payloadKey);
bufferedKeys.add(payloadKey);
bufferedAssistantMessageIndex = assistantMessageIndex;
coalescer.enqueue(payload);
return;
}
@@ -229,6 +248,7 @@ export function createBlockReplyPipeline(params: {
// Drains the pipeline: flush the coalescer (honoring `force`), clear the
// assistant-block boundary, push any buffered payloads into the send chain,
// then wait for all queued sends to complete.
const flush = async (options?: { force?: boolean }) => {
await coalescer?.flush(options);
bufferedAssistantMessageIndex = undefined;
flushBuffered();
await sendChain;
};

View File

@@ -1,5 +1,6 @@
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import { getReplyPayloadMetadata, setReplyPayloadMetadata } from "../reply-payload.js";
import { createBlockReplyContentKey } from "./block-reply-pipeline.js";
import {
createBlockReplyDeliveryHandler,
@@ -159,4 +160,41 @@ describe("createBlockReplyDeliveryHandler", () => {
audioAsVoice: false,
});
});
// The delivery handler rebuilds the payload via normalizeStreamingText and
// applyReplyToMode; this test pins that the (non-enumerable) reply-payload
// metadata set before delivery survives that normalization.
it("preserves reply payload metadata across block-reply normalization", async () => {
const enqueue = vi.fn();
const blockReplyPipeline = {
enqueue,
} as unknown as BlockReplyPipelineLike;
const handler = createBlockReplyDeliveryHandler({
onBlockReply: vi.fn(async () => {}),
normalizeStreamingText: (payload) => ({ text: payload.text, skip: false }),
// Tag the payload so we can observe that applyReplyToMode's result is what
// gets enqueued (with metadata re-attached).
applyReplyToMode: (payload) => ({ ...payload, replyToTag: true }),
typingSignals: {
signalTextDelta: vi.fn(async () => {}),
} as unknown as TypingSignaler,
blockStreamingEnabled: true,
blockReplyPipeline,
directlySentBlockKeys: new Set(),
});
const payload = setReplyPayloadMetadata({ text: "Alpha" }, { assistantMessageIndex: 7 });
await handler(payload);
const enqueuedPayload = enqueue.mock.calls[0]?.[0];
// Metadata is not part of the enumerable payload shape…
expect(enqueuedPayload).toEqual({
text: "Alpha",
mediaUrl: undefined,
replyToId: undefined,
replyToCurrent: undefined,
replyToTag: true,
audioAsVoice: false,
mediaUrls: undefined,
});
// …but it must still be readable through the metadata accessor.
expect(getReplyPayloadMetadata(enqueuedPayload)).toEqual({
assistantMessageIndex: 7,
});
});
});

View File

@@ -1,5 +1,6 @@
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { logVerbose } from "../../globals.js";
import { getReplyPayloadMetadata, setReplyPayloadMetadata } from "../reply-payload.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { BlockReplyContext, ReplyPayload } from "../types.js";
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
@@ -58,6 +59,21 @@ export function normalizeReplyPayloadDirectives(params: {
};
}
/** Copies reply-payload metadata from `source` onto `target`; returns `target` unchanged when `source` has none. */
function carryReplyPayloadMetadata(source: ReplyPayload, target: ReplyPayload): ReplyPayload {
  const metadata = getReplyPayloadMetadata(source);
  if (!metadata) {
    return target;
  }
  return setReplyPayloadMetadata(target, metadata);
}
/**
 * Sends a block reply directly (bypassing the pipeline) while recording the
 * tracking payload's content key so later final-reply passes can dedupe it.
 */
async function sendDirectBlockReply(params: {
  onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise<void> | void;
  directlySentBlockKeys: Set<string>;
  trackingPayload: ReplyPayload;
  payload: ReplyPayload;
}) {
  const { onBlockReply, directlySentBlockKeys, trackingPayload, payload } = params;
  // Record the key before sending so the block is marked even if delivery is slow.
  directlySentBlockKeys.add(createBlockReplyContentKey(trackingPayload));
  await onBlockReply(payload);
}
export function createBlockReplyDeliveryHandler(params: {
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise<void> | void;
currentMessageId?: string;
@@ -103,7 +119,10 @@ export function createBlockReplyDeliveryHandler(params: {
const mediaNormalizedPayload = params.normalizeMediaPaths
? await params.normalizeMediaPaths(normalized.payload)
: normalized.payload;
const blockPayload = params.applyReplyToMode(mediaNormalizedPayload);
const blockPayload = carryReplyPayloadMetadata(
payload,
params.applyReplyToMode(mediaNormalizedPayload),
);
const blockHasMedia = resolveSendableOutboundReplyParts(blockPayload).hasMedia;
// Skip empty payloads unless they have audioAsVoice flag (need to track it).
@@ -126,14 +145,22 @@ export function createBlockReplyDeliveryHandler(params: {
} else if (params.blockStreamingEnabled) {
// Send directly when flushing before tool execution (no pipeline but streaming enabled).
// Track sent key to avoid duplicate in final payloads.
params.directlySentBlockKeys.add(createBlockReplyContentKey(blockPayload));
await params.onBlockReply(blockPayload);
await sendDirectBlockReply({
onBlockReply: params.onBlockReply,
directlySentBlockKeys: params.directlySentBlockKeys,
trackingPayload: blockPayload,
payload: blockPayload,
});
} else if (blockHasMedia) {
// When block streaming is disabled, text-only block replies are accumulated into the
// final response. Media cannot be reconstructed later, so send it immediately and let
// the assistant's final text arrive through the normal final-reply path.
params.directlySentBlockKeys.add(createBlockReplyContentKey(blockPayload));
await params.onBlockReply({ ...blockPayload, text: undefined });
await sendDirectBlockReply({
onBlockReply: params.onBlockReply,
directlySentBlockKeys: params.directlySentBlockKeys,
trackingPayload: blockPayload,
payload: { ...blockPayload, text: undefined },
});
}
// When streaming is disabled entirely, text-only blocks are accumulated in final text.
};