fix(auto-reply): preserve streaming reply directives (#70243)

Preserve streamed MEDIA/reply/audio directives across chunk boundaries and phase-aware final_answer delivery.

Thanks @zqchris.
zqchris
2026-04-23 03:11:00 +08:00
committed by GitHub
parent b1b1979841
commit b24ae8b18b
8 changed files with 455 additions and 40 deletions

View File

@@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai
- Gateway/pairing: shared-secret loopback CLI clients now silently auto-approve `metadata-upgrade` pairing (platform / device family refresh) instead of being disconnected with `1008 pairing required`. This matches the scope-upgrade and role-upgrade behavior added in #69431 and unblocks non-interactive CLI automation when a paired-device record has a stale platform string (e.g. device key replicated across hosts, install migrated between OSes, or platform-string format changed between OpenClaw versions). Browser / Control-UI clients keep the existing approval-required flow for metadata changes.
- Gateway/pairing: treat any forwarded-header evidence (`Forwarded`, `X-Forwarded-*`, or `X-Real-IP`) as proxied WebSocket traffic before pairing locality checks, so reverse-proxy topologies cannot use the loopback shared-secret helper auto-pairing path.
- Agents/OpenAI: treat exact `NO_REPLY` assistant output as a deliberate silent reply in embedded runs, so GPT-5.4 turns with signed reasoning plus a silent final no longer surface a false incomplete-turn error.
- Auto-reply/streaming: preserve streamed reply directives across chunk boundaries and phase-aware `final_answer` delivery, so split `MEDIA:<path>` lines, voice tags, and reply targets reach channel delivery instead of leaking as text or being dropped (see the sketch after this list). (#70243) Thanks @zqchris.
- Gateway/pairing webchat: render `/pair qr` replies as structured media instead of raw markdown text, preserve inline reply threading and silent-control handling on media replies, avoid persisting sensitive QR images into transcript history, and keep local webchat media embedding behind internal-only trust markers. (#70047) Thanks @BunsDev.
- Codex harness: default app-server runs to unchained local execution, so OpenAI heartbeats can use network and shell tools without stalling behind native Codex approvals or the workspace-write sandbox.
- Codex harness: apply the GPT-5 behavior and heartbeat prompt overlay to native Codex app-server runs, so `codex/gpt-5.x` sessions get the same follow-through, tool-use, and proactive heartbeat guidance as OpenAI GPT-5 runs.
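
For the auto-reply/streaming fix, the chunk-boundary behavior is easiest to see against the accumulator contract the new tests exercise. A minimal sketch: the import path and values mirror the `streaming-directives` test additions further down, and the file paths are illustrative.

```ts
import { createStreamingDirectiveAccumulator } from "./streaming-directives.js";

const accumulator = createStreamingDirectiveAccumulator();

// Chunk 1 ends mid-directive: the prose is released, the bare `MEDIA` token is held back.
const first = accumulator.consume("Here is the cover.\n\nMEDIA");
// first?.text === "Here is the cover." (no "MEDIA" leaks into the channel text)

// Chunk 2 completes the token, but the URL may still be streaming, so nothing is emitted yet.
const second = accumulator.consume(":/tmp/cover.png");
// second === null

// The final flush (text_end / message_end path) releases the reassembled directive.
const done = accumulator.consume("", { final: true });
// done?.mediaUrls?.[0] === "/tmp/cover.png", and done?.text no longer contains "MEDIA"
```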

View File

@@ -1,12 +1,15 @@
import { describe, expect, it, vi } from "vitest";
import { createStreamingDirectiveAccumulator } from "../auto-reply/reply/streaming-directives.js";
import { createInlineCodeState } from "../markdown/code-spans.js";
import {
buildAssistantStreamData,
consumePendingAssistantReplyDirectivesIntoReply,
consumePendingToolMediaIntoReply,
consumePendingToolMediaReply,
handleMessageEnd,
handleMessageUpdate,
hasAssistantVisibleReply,
recordPendingAssistantReplyDirectives,
resolveSilentReplyFallbackText,
} from "./pi-embedded-subscribe.handlers.messages.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
@@ -24,6 +27,8 @@ function createMessageUpdateContext(
resetAssistantMessageState?: ReturnType<typeof vi.fn>;
debug?: ReturnType<typeof vi.fn>;
shouldEmitPartialReplies?: boolean;
consumePartialReplyDirectives?: ReturnType<typeof vi.fn>;
state?: Record<string, unknown>;
} = {},
) {
return {
@@ -53,11 +58,13 @@ function createMessageUpdateContext(
assistantMessageIndex: 0,
lastAssistantStreamItemId: undefined,
assistantTexts: [],
pendingAssistantReplyDirectives: undefined,
...params.state,
},
log: { debug: params.debug ?? vi.fn() },
noteLastAssistant: vi.fn(),
stripBlockTags: (text: string) => text,
consumePartialReplyDirectives: vi.fn(() => null),
consumePartialReplyDirectives: params.consumePartialReplyDirectives ?? vi.fn(() => null),
emitReasoningStream: vi.fn(),
flushBlockReplyBuffer: params.flushBlockReplyBuffer ?? vi.fn(),
resetAssistantMessageState: params.resetAssistantMessageState ?? vi.fn(),
@@ -194,6 +201,54 @@ describe("buildAssistantStreamData", () => {
});
});
describe("pending assistant reply directives", () => {
it("merges directive metadata into the next non-reasoning block reply", () => {
const state = { pendingAssistantReplyDirectives: undefined };
recordPendingAssistantReplyDirectives(state, {
text: "",
mediaUrls: ["/tmp/reply.ogg"],
replyToCurrent: true,
replyToTag: true,
audioAsVoice: true,
isSilent: false,
});
expect(
consumePendingAssistantReplyDirectivesIntoReply(state, {
text: "Done.",
}),
).toEqual({
text: "Done.",
mediaUrls: ["/tmp/reply.ogg"],
audioAsVoice: true,
replyToId: undefined,
replyToTag: true,
replyToCurrent: true,
});
expect(state.pendingAssistantReplyDirectives).toBeUndefined();
});
it("does not consume pending directive metadata on reasoning replies", () => {
const state = {
pendingAssistantReplyDirectives: {
mediaUrls: ["/tmp/reply.png"],
},
};
expect(
consumePendingAssistantReplyDirectivesIntoReply(state, {
text: "Thinking...",
isReasoning: true,
}),
).toEqual({
text: "Thinking...",
isReasoning: true,
});
expect(state.pendingAssistantReplyDirectives?.mediaUrls).toEqual(["/tmp/reply.png"]);
});
});
describe("handleMessageUpdate", () => {
it("treats phased textSignature item changes as assistant-message boundaries", () => {
const flushBlockReplyBuffer = vi.fn();
@@ -244,6 +299,54 @@ describe("handleMessageUpdate", () => {
expect(onAssistantMessageStart).toHaveBeenCalledTimes(1);
expect(context.state.lastAssistantStreamItemId).toBe("item-2");
});
it("preserves phase-aware media, voice, and reply directives for block delivery", () => {
const accumulator = createStreamingDirectiveAccumulator();
const ctx = createMessageUpdateContext({
consumePartialReplyDirectives: vi.fn((text: string, options?: { final?: boolean }) =>
accumulator.consume(text, options),
),
state: {
blockReplyBreak: "message_end",
},
});
const replyText = "Done.\n\n[[reply_to_current]]\n[[audio_as_voice]]\nMEDIA:/tmp/reply.ogg";
handleMessageUpdate(
ctx,
createTextUpdateEvent({
type: "text_delta",
text: replyText,
id: "item-final",
signaturePhase: "final_answer",
partialPhase: "final_answer",
}),
);
handleMessageUpdate(
ctx,
createTextUpdateEvent({
type: "text_end",
text: replyText,
id: "item-final",
signaturePhase: "final_answer",
partialPhase: "final_answer",
}),
);
expect(ctx.state.blockBuffer).toBe("Done.");
expect(
consumePendingAssistantReplyDirectivesIntoReply(ctx.state, {
text: "Done.",
}),
).toEqual({
text: "Done.",
mediaUrls: ["/tmp/reply.ogg"],
audioAsVoice: true,
replyToId: undefined,
replyToTag: true,
replyToCurrent: true,
});
});
});
describe("consumePendingToolMediaIntoReply", () => {

View File

@@ -1,7 +1,11 @@
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
import {
parseReplyDirectives,
type ReplyDirectiveParseResult,
} from "../auto-reply/reply/reply-directives.js";
import { splitTrailingDirective } from "../auto-reply/reply/streaming-directives.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { createInlineCodeState } from "../markdown/code-spans.js";
@@ -33,21 +37,6 @@ import {
promoteThinkingTagsToBlocks,
} from "./pi-embedded-utils.js";
const stripTrailingDirective = (text: string): string => {
const openIndex = text.lastIndexOf("[[");
if (openIndex < 0) {
if (text.endsWith("[")) {
return text.slice(0, -1);
}
return text;
}
const closeIndex = text.indexOf("]]", openIndex + 2);
if (closeIndex >= 0) {
return text;
}
return text.slice(0, openIndex);
};
function shouldSuppressAssistantVisibleOutput(message: AgentMessage | undefined): boolean {
return resolveAssistantMessagePhase(message) === "commentary";
}
@@ -242,6 +231,88 @@ export function consumePendingToolMediaReply(
return payload;
}
function hasReplyDirectiveMetadata(parsed: ReplyDirectiveParseResult | null | undefined): boolean {
return Boolean(
parsed &&
((parsed.mediaUrls?.length ?? 0) > 0 ||
parsed.audioAsVoice ||
parsed.replyToId ||
parsed.replyToTag ||
parsed.replyToCurrent),
);
}
function hasReplyDirectiveMetadataResult(
parsed: ReplyDirectiveParseResult | null | undefined,
): parsed is ReplyDirectiveParseResult {
return hasReplyDirectiveMetadata(parsed);
}
function mergeReplyDirectiveResults(
first: ReplyDirectiveParseResult | null | undefined,
second: ReplyDirectiveParseResult | null | undefined,
): ReplyDirectiveParseResult | null {
if (!first) {
return second ?? null;
}
if (!second) {
return first;
}
const mediaUrls = Array.from(new Set([...(first.mediaUrls ?? []), ...(second.mediaUrls ?? [])]));
return {
text: `${first.text ?? ""}${second.text ?? ""}`,
mediaUrls: mediaUrls.length ? mediaUrls : undefined,
mediaUrl: mediaUrls[0] ?? first.mediaUrl ?? second.mediaUrl,
replyToId: second.replyToId ?? first.replyToId,
replyToCurrent: first.replyToCurrent || second.replyToCurrent,
replyToTag: first.replyToTag || second.replyToTag,
audioAsVoice: first.audioAsVoice || second.audioAsVoice || undefined,
isSilent: first.isSilent || second.isSilent,
};
}
export function recordPendingAssistantReplyDirectives(
state: Pick<EmbeddedPiSubscribeState, "pendingAssistantReplyDirectives">,
parsed: ReplyDirectiveParseResult | null | undefined,
) {
if (!hasReplyDirectiveMetadataResult(parsed)) {
return;
}
const current = state.pendingAssistantReplyDirectives;
const mediaUrls = Array.from(
new Set([...(current?.mediaUrls ?? []), ...(parsed.mediaUrls ?? [])]),
);
state.pendingAssistantReplyDirectives = {
mediaUrls: mediaUrls.length ? mediaUrls : undefined,
audioAsVoice: current?.audioAsVoice || parsed?.audioAsVoice || undefined,
replyToId: parsed?.replyToId ?? current?.replyToId,
replyToTag: current?.replyToTag || parsed.replyToTag || undefined,
replyToCurrent: current?.replyToCurrent || parsed.replyToCurrent || undefined,
};
}
export function consumePendingAssistantReplyDirectivesIntoReply(
state: Pick<EmbeddedPiSubscribeState, "pendingAssistantReplyDirectives">,
payload: BlockReplyPayload,
): BlockReplyPayload {
if (payload.isReasoning || !state.pendingAssistantReplyDirectives) {
return payload;
}
const pending = state.pendingAssistantReplyDirectives;
const mediaUrls = Array.from(
new Set([...(payload.mediaUrls ?? []), ...(pending.mediaUrls ?? [])]),
);
state.pendingAssistantReplyDirectives = undefined;
return {
...payload,
mediaUrls: mediaUrls.length ? mediaUrls : undefined,
audioAsVoice: payload.audioAsVoice || pending.audioAsVoice || undefined,
replyToId: payload.replyToId ?? pending.replyToId,
replyToTag: Boolean(payload.replyToTag || pending.replyToTag) || undefined,
replyToCurrent: Boolean(payload.replyToCurrent || pending.replyToCurrent) || undefined,
};
}
export function hasAssistantVisibleReply(params: {
text?: string;
mediaUrls?: string[];
@@ -431,10 +502,16 @@ export function handleMessageUpdate(
emitReasoningEnd(ctx);
}
const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null;
const parsedFull = parseReplyDirectives(stripTrailingDirective(next));
const finalParsedDelta =
evtType === "text_end" ? ctx.consumePartialReplyDirectives("", { final: true }) : null;
const parsedStreamDirectives = mergeReplyDirectiveResults(parsedDelta, finalParsedDelta);
if (shouldUsePhaseAwareBlockReply) {
recordPendingAssistantReplyDirectives(ctx.state, parsedStreamDirectives);
}
const parsedFull = parseReplyDirectives(splitTrailingDirective(next).text);
const cleanedText = parsedFull.text;
const { mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedDelta ?? {});
const hasAudio = Boolean(parsedDelta?.audioAsVoice);
const { mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedStreamDirectives ?? {});
const hasAudio = Boolean(parsedStreamDirectives?.audioAsVoice);
const previousCleaned = ctx.state.lastStreamedAssistantCleaned ?? "";
let shouldEmit = false;
@@ -562,7 +639,9 @@ export function handleMessageEnd(
: "";
const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : "";
const trimmedText = text.trim();
const parsedText = trimmedText ? parseReplyDirectives(stripTrailingDirective(trimmedText)) : null;
const parsedText = trimmedText
? parseReplyDirectives(splitTrailingDirective(trimmedText, { final: true }).text)
: null;
let cleanedText = parsedText?.text ?? "";
let { mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedText ?? {});
@@ -695,6 +774,14 @@ export function handleMessageEnd(
if (hasBufferedBlockReply && ctx.blockChunker?.hasBuffered()) {
ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
ctx.blockChunker.reset();
// Final-flush the streaming directive accumulator so any partial
// directive tail held back by splitTrailingDirective (for example a
// trailing `MEDIA:<path>` that arrived without a closing newline)
// gets emitted here. Without this, a reply ending in a directive
// line whose URL is complete but un-terminated would sit in
// pendingTail forever and the attachment would be silently dropped
// on the message_end / blockReplyChunking path.
emitSplitResultAsBlockReply(ctx.consumeReplyDirectives("", { final: true }));
} else if (text !== ctx.state.lastBlockReplyText) {
// Guard: for text_end channels, if text_end already delivered content
// (lastBlockReplyText is set), skip this safety send. The text comparison
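
How the delta parse and the final flush combine before being recorded for phase-aware delivery comes down to `mergeReplyDirectiveResults` above. A conceptual trace with illustrative values; the helper is module-local, and optional `ReplyDirectiveParseResult` fields that stay unset are omitted here:

```ts
// Hypothetical parse results for one streamed block (not captured from a real run).
const parsedDelta = { text: "Done.", mediaUrls: ["/tmp/reply.ogg"], replyToCurrent: true };
const finalFlush = { text: "", audioAsVoice: true };

const merged = mergeReplyDirectiveResults(parsedDelta, finalFlush);
// merged.text === "Done."                (texts concatenate)
// merged.mediaUrls → ["/tmp/reply.ogg"]  (Set-based union, duplicates collapse)
// merged.replyToCurrent === true         (boolean flags OR across both sides)
// merged.audioAsVoice === true           (OR, left undefined when neither side set it)
// merged.replyToId === undefined         (second.replyToId ?? first.replyToId)
```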

View File

@@ -82,6 +82,10 @@ export type EmbeddedPiSubscribeState = {
pendingToolMediaUrls: string[];
pendingToolAudioAsVoice: boolean;
pendingToolTrustedLocalMedia: boolean;
pendingAssistantReplyDirectives?: Pick<
BlockReplyPayload,
"mediaUrls" | "audioAsVoice" | "replyToId" | "replyToTag" | "replyToCurrent"
>;
deterministicApprovalPromptPending: boolean;
deterministicApprovalPromptSent: boolean;
lastAssistant?: AgentMessage;

View File

@@ -446,7 +446,7 @@ describe("subscribeEmbeddedPiSession", () => {
expect(payloads).toHaveLength(1);
});
it("emits a replacement snapshot when cleaned text rewinds mid-stream", () => {
it("emits one cleaned media snapshot when a streamed MEDIA line resolves to caption text", () => {
const { emit, onAgentEvent } = createAgentEventHarness();
emit({ type: "message_start", message: { role: "assistant" } });
@@ -454,20 +454,26 @@ describe("subscribeEmbeddedPiSession", () => {
emitAssistantTextDelta(emit, " https://example.com/a.png\nCaption");
const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls);
expect(payloads).toHaveLength(2);
expect(payloads[0]?.text).toBe("MEDIA:");
expect(payloads[0]?.delta).toBe("MEDIA:");
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe("Caption");
expect(payloads[0]?.delta).toBe("Caption");
expect(payloads[0]?.replace).toBeUndefined();
expect(payloads[1]?.text).toBe("Caption");
expect(payloads[1]?.delta).toBe("");
expect(payloads[1]?.replace).toBe(true);
expect(payloads[0]?.mediaUrls).toEqual(["https://example.com/a.png"]);
});
it("emits agent events when media arrives without text", () => {
it("emits agent events when media-only text is finalized", () => {
const { emit, onAgentEvent } = createAgentEventHarness();
emit({ type: "message_start", message: { role: "assistant" } });
emitAssistantTextDelta(emit, "MEDIA: https://example.com/a.png");
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content: "MEDIA: https://example.com/a.png",
},
});
const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls);
expect(payloads).toHaveLength(1);

View File

@@ -21,7 +21,10 @@ import {
} from "./pi-embedded-runner/replay-state.js";
import type { EmbeddedRunLivenessState } from "./pi-embedded-runner/types.js";
import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js";
import { consumePendingToolMediaIntoReply } from "./pi-embedded-subscribe.handlers.messages.js";
import {
consumePendingAssistantReplyDirectivesIntoReply,
consumePendingToolMediaIntoReply,
} from "./pi-embedded-subscribe.handlers.messages.js";
import type {
EmbeddedPiSubscribeContext,
EmbeddedPiSubscribeState,
@@ -124,6 +127,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
pendingToolMediaUrls: initialPendingToolMediaUrls,
pendingToolAudioAsVoice: false,
pendingToolTrustedLocalMedia: false,
pendingAssistantReplyDirectives: undefined,
deterministicApprovalPromptPending: false,
deterministicApprovalPromptSent: false,
};
@@ -184,7 +188,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
payload: BlockReplyPayload,
options?: { assistantMessageIndex?: number },
) => {
emitBlockReplySafely(consumePendingToolMediaIntoReply(state, payload), options);
const withAssistantDirectives = consumePendingAssistantReplyDirectivesIntoReply(state, payload);
emitBlockReplySafely(consumePendingToolMediaIntoReply(state, withAssistantDirectives), options);
};
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
@@ -213,6 +218,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.lastAssistantTextNormalized = undefined;
state.lastAssistantTextTrimmed = undefined;
state.assistantTextBaseline = nextAssistantTextBaseline;
state.pendingAssistantReplyDirectives = undefined;
};
const rememberAssistantText = (text: string) => {
@@ -710,6 +716,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.pendingMessagingMediaUrls.clear();
state.pendingToolMediaUrls = [];
state.pendingToolAudioAsVoice = false;
state.pendingAssistantReplyDirectives = undefined;
state.deterministicApprovalPromptPending = false;
state.deterministicApprovalPromptSent = false;
state.replayState = mergeEmbeddedRunReplayState(state.replayState, params.initialReplayState);

View File

@@ -10,7 +10,10 @@ import {
hasTemplateVariables,
resolveResponsePrefixTemplate,
} from "./response-prefix-template.js";
import { createStreamingDirectiveAccumulator } from "./streaming-directives.js";
import {
createStreamingDirectiveAccumulator,
splitTrailingDirective,
} from "./streaming-directives.js";
import { createMockTypingController } from "./test-helpers.js";
import { createTypingSignaler, resolveTypingMode } from "./typing-mode.js";
import { createTypingController } from "./typing.js";
@@ -989,6 +992,15 @@ describe("createStreamingDirectiveAccumulator", () => {
expect(result?.replyToCurrent).toBe(true);
});
it("handles reply tags split before the second bracket", () => {
const accumulator = createStreamingDirectiveAccumulator();
expect(accumulator.consume("[")).toBeNull();
const result = accumulator.consume("[reply_to_current]] Yo");
expect(result?.text).toBe("Yo");
expect(result?.replyToCurrent).toBe(true);
});
it("propagates explicit reply ids across current and subsequent chunks", () => {
const accumulator = createStreamingDirectiveAccumulator();
@@ -1033,6 +1045,135 @@ describe("createStreamingDirectiveAccumulator", () => {
expect(result?.text).toBe("NO_REPLY: explanation");
});
it("reassembles MEDIA: directives split between the token and the colon", () => {
const accumulator = createStreamingDirectiveAccumulator();
const first = accumulator.consume("这次直接发图。\n\nMEDIA");
expect(first?.text).toBe("这次直接发图。");
expect(first?.mediaUrls).toBeUndefined();
const second = accumulator.consume(":/tmp/spy-family.png");
expect(second).toBeNull();
const finalResult = accumulator.consume("", { final: true });
expect(finalResult?.mediaUrls).toEqual(["/tmp/spy-family.png"]);
expect((finalResult?.text ?? "").includes("MEDIA")).toBe(false);
});
it("reassembles MEDIA: directives split inside the URL path", () => {
const accumulator = createStreamingDirectiveAccumulator();
const first = accumulator.consume("Preview below.\n\nMEDIA:/var/folders/tool-image");
expect(first?.text).toBe("Preview below.");
expect(first?.mediaUrls).toBeUndefined();
const second = accumulator.consume("-generation/cover.png");
expect(second).toBeNull();
const finalResult = accumulator.consume("", { final: true });
expect(finalResult?.mediaUrls).toEqual(["/var/folders/tool-image-generation/cover.png"]);
});
it("buffers partial MEDIA prefixes (M/ME/MED/MEDI) across chunk boundaries", () => {
for (const prefix of ["M", "ME", "MED", "MEDI"]) {
const accumulator = createStreamingDirectiveAccumulator();
const head = `Here is the file.\n\n${prefix}`;
const headResult = accumulator.consume(head);
expect(headResult?.text, `prefix=${prefix} head emits text`).toBe("Here is the file.");
const rest = `MEDIA:/tmp/file.png`.slice(prefix.length);
const restResult = accumulator.consume(rest);
expect(restResult, `prefix=${prefix} mid returns null`).toBeNull();
const finalResult = accumulator.consume("", { final: true });
expect(finalResult?.mediaUrls, `prefix=${prefix} final mediaUrls`).toEqual(["/tmp/file.png"]);
}
});
it("does not buffer a trailing letter that appears mid-line", () => {
const accumulator = createStreamingDirectiveAccumulator();
// "I am" ends in "m". The prefix guard only anchors to line-start (`^`
// or immediately after `\n`), so ordinary prose whose last character
// happens to be an `M|ME|MED|MEDI|MEDIA` letter stays in the emitted
// text.
const result = accumulator.consume("I am");
expect(result?.text).toBe("I am");
expect(result?.mediaUrls).toBeUndefined();
});
it("does not buffer prose that merely contains the token MEDIA:", () => {
const accumulator = createStreamingDirectiveAccumulator();
// Matches what upstream `splitMediaFromOutput` considers a directive:
// only lines whose trimmed start is `MEDIA:`. A line that merely
// contains "MEDIA:" mid-sentence is ordinary prose and must flush
// immediately — otherwise on a stream-item boundary (which may call
// `reset()` without a preceding `consume("", { final: true })`) the
// buffered prose would be silently dropped.
const result = accumulator.consume("See the MEDIA: section for details");
expect(result?.text).toBe("See the MEDIA: section for details");
expect(result?.mediaUrls).toBeUndefined();
});
it("still buffers an indented MEDIA directive line that is mid-stream", () => {
const accumulator = createStreamingDirectiveAccumulator();
// Upstream parser treats `line.trimStart().startsWith("MEDIA:")` as a
// directive, so the guard must also buffer the indented form across
// a chunk boundary.
const first = accumulator.consume("Preview:\n MEDIA:/tmp/cover");
expect(first?.text).toBe("Preview:");
expect(first?.mediaUrls).toBeUndefined();
const second = accumulator.consume(".png");
expect(second).toBeNull();
const finalResult = accumulator.consume("", { final: true });
expect(finalResult?.mediaUrls).toEqual(["/tmp/cover.png"]);
});
it("does not rewrite mid-prose MEDIA into a directive across chunks", () => {
const accumulator = createStreamingDirectiveAccumulator();
// A chunk can legitimately end with `MEDIA` mid-sentence (e.g. "this
// uses legacy MEDIA"). A later chunk starting with `:` must NOT join
// with the buffered token to synthesize a `MEDIA:<rest>` directive —
// upstream `MEDIA_TOKEN_RE` captures `[^\n]+`, and treating the rest
// of that sentence as a media path would invent a media reply the
// agent never authored.
const first = accumulator.consume("The legacy pipeline uses MEDIA");
expect(first?.text).toBe("The legacy pipeline uses MEDIA");
expect(first?.mediaUrls).toBeUndefined();
const second = accumulator.consume(": kind=disk capacity=1TB");
expect(second?.text).toBe(": kind=disk capacity=1TB");
expect(second?.mediaUrls).toBeUndefined();
});
it("passes plain text through when there is no incomplete directive tail", () => {
const accumulator = createStreamingDirectiveAccumulator();
const result = accumulator.consume("Hello world.\nThis is a complete block.");
expect(result?.text).toBe("Hello world.\nThis is a complete block.");
expect(result?.mediaUrls).toBeUndefined();
});
it("keeps MEDIA directives that arrive in a single complete chunk working", () => {
const accumulator = createStreamingDirectiveAccumulator();
const result = accumulator.consume("Here it is.\n\nMEDIA:/tmp/complete.png\n");
expect(result?.text.includes("MEDIA")).toBe(false);
expect(result?.mediaUrls).toEqual(["/tmp/complete.png"]);
});
it("does not strip a complete final MEDIA line when parsing final text", () => {
expect(splitTrailingDirective("Here.\nMEDIA:/tmp/final.png", { final: true })).toEqual({
text: "Here.\nMEDIA:/tmp/final.png",
tail: "",
});
});
});
describe("extractShortModelName", () => {

View File

@@ -25,18 +25,84 @@ type ConsumeOptions = {
silentToken?: string;
};
const splitTrailingDirective = (text: string): { text: string; tail: string } => {
type SplitTrailingDirectiveOptions = {
final?: boolean;
};
// Holds back incomplete streaming-directive tails so parseChunk only ever sees
// complete directives. Otherwise, upstream token boundaries can split markers
// like `MEDIA:<path>` between chunks and cause the first half to be emitted as
// plain text (e.g. the `MEDIA` token leaking into a channel reply while the
// matching file path is silently dropped on the next chunk).
export const splitTrailingDirective = (
text: string,
options: SplitTrailingDirectiveOptions = {},
): { text: string; tail: string } => {
let bufferStart = text.length;
// 1. Unclosed `[[…` reply/audio directive tail.
const openIndex = text.lastIndexOf("[[");
if (openIndex < 0) {
return { text, tail: "" };
}
const closeIndex = text.indexOf("]]", openIndex + 2);
if (closeIndex >= 0) {
if (openIndex >= 0 && text.indexOf("]]", openIndex + 2) < 0) {
if (openIndex < bufferStart) {
bufferStart = openIndex;
}
}
if (text.endsWith("[") && text.length - 1 < bufferStart) {
bufferStart = text.length - 1;
}
if (options.final) {
if (bufferStart >= text.length) {
return { text, tail: "" };
}
return {
text: text.slice(0, bufferStart),
tail: text.slice(bufferStart),
};
}
// 2. `MEDIA:` line without a trailing newline — the URL may still be
// streaming. `splitMediaFromOutput` in src/media/parse.ts treats a
// line as a media directive only when `line.trimStart()` begins with
// `MEDIA:`, so we match the same shape here: only buffer when the
// last line looks like an actual directive line (optional leading
// whitespace, then `MEDIA:`). Prose such as
// "See the MEDIA: section for details" does NOT qualify and is
// flushed as ordinary text — otherwise it could sit in pendingTail
// and be silently dropped if a stream-item boundary calls `reset()`
// without a preceding `consume("", { final: true })`.
const lastNewline = text.lastIndexOf("\n");
const lastLine = lastNewline < 0 ? text : text.slice(lastNewline + 1);
if (/^\s*MEDIA:/i.test(lastLine)) {
const mediaLineStart = lastNewline < 0 ? 0 : lastNewline + 1;
if (mediaLineStart < bufferStart) {
bufferStart = mediaLineStart;
}
}
// 3. Trailing `M|ME|MED|MEDI|MEDIA` prefix (no colon yet) at the start of
// a line — the next chunk might turn this into `MEDIA:<url>`. Only a
// line-start anchor (`^` or immediately after `\n`) is accepted so
// mid-prose tokens like "_M", "3ME", or "token MEDIA" are not
// speculatively buffered and cannot accidentally be glued to a
// following `:` into a synthetic directive. Matches the canonical
// MEDIA directive placement (own line after `\n\n`).
const prefixMatch = text.match(/(?:^|\n)(MEDIA|MEDI|MED|ME|M)$/i);
if (prefixMatch) {
const prefixStart = text.length - prefixMatch[1].length;
if (prefixStart < bufferStart) {
bufferStart = prefixStart;
}
}
if (bufferStart >= text.length) {
return { text, tail: "" };
}
return {
text: text.slice(0, openIndex),
tail: text.slice(openIndex),
text: text.slice(0, bufferStart),
tail: text.slice(bufferStart),
};
};
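
A small usage sketch of the widened `splitTrailingDirective`, with inputs chosen to mirror the tests above:

```ts
// Streaming mode: an un-terminated MEDIA line is held back as `tail`,
// so the path fragment never leaks into emitted text.
splitTrailingDirective("Preview below.\n\nMEDIA:/var/folders/tool-image");
// → { text: "Preview below.\n\n", tail: "MEDIA:/var/folders/tool-image" }

// Final mode: only an unclosed `[[` tail is still stripped; a complete trailing
// MEDIA line stays in `text` for parseReplyDirectives to resolve into media.
splitTrailingDirective("Here.\nMEDIA:/tmp/final.png", { final: true });
// → { text: "Here.\nMEDIA:/tmp/final.png", tail: "" }
```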