diff --git a/CHANGELOG.md b/CHANGELOG.md
index 91c1d61d7b7..bb904f69b7f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai
 - Gateway/pairing: shared-secret loopback CLI clients now silently auto-approve `metadata-upgrade` pairing (platform / device family refresh) instead of being disconnected with `1008 pairing required`. This matches the scope-upgrade and role-upgrade behavior added in #69431 and unblocks non-interactive CLI automation when a paired-device record has a stale platform string (e.g. device key replicated across hosts, install migrated between OSes, or platform-string format changed between OpenClaw versions). Browser / Control-UI clients keep the existing approval-required flow for metadata changes.
 - Gateway/pairing: treat any forwarded-header evidence (`Forwarded`, `X-Forwarded-*`, or `X-Real-IP`) as proxied WebSocket traffic before pairing locality checks, so reverse-proxy topologies cannot use the loopback shared-secret helper auto-pairing path.
 - Agents/OpenAI: treat exact `NO_REPLY` assistant output as a deliberate silent reply in embedded runs, so GPT-5.4 turns with signed reasoning plus a silent final no longer surface a false incomplete-turn error.
+- Auto-reply/streaming: preserve streamed reply directives through chunk boundaries and phase-aware `final_answer` delivery, so split `MEDIA:` lines, voice tags, and reply targets reach channel delivery instead of leaking as text or being dropped. (#70243) Thanks @zqchris.
 - Gateway/pairing webchat: render `/pair qr` replies as structured media instead of raw markdown text, preserve inline reply threading and silent-control handling on media replies, avoid persisting sensitive QR images into transcript history, and keep local webchat media embedding behind internal-only trust markers. (#70047) Thanks @BunsDev.
 - Codex harness: default app-server runs to unchained local execution, so OpenAI heartbeats can use network and shell tools without stalling behind native Codex approvals or the workspace-write sandbox.
 - Codex harness: apply the GPT-5 behavior and heartbeat prompt overlay to native Codex app-server runs, so `codex/gpt-5.x` sessions get the same follow-through, tool-use, and proactive heartbeat guidance as OpenAI GPT-5 runs.
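Reviewer note: a minimal sketch of the chunk-boundary contract this changelog entry describes, mirroring the tests added below. The chunk strings and relative import path are illustrative, not part of the patch.

// TypeScript sketch (assumes the module exported by this patch).
import { createStreamingDirectiveAccumulator } from "./src/auto-reply/reply/streaming-directives.js";

const accumulator = createStreamingDirectiveAccumulator();

// Chunk 1 ends mid-directive: prose is emitted, the bare "MEDIA" token is held back.
const first = accumulator.consume("Here you go.\n\nMEDIA");
console.log(first?.text); // "Here you go."

// Chunk 2 completes the directive line; nothing is emitted yet.
const second = accumulator.consume(":/tmp/cover.png");
console.log(second); // null

// End of stream: the final flush resolves the buffered directive into media.
const finalResult = accumulator.consume("", { final: true });
console.log(finalResult?.mediaUrls); // ["/tmp/cover.png"]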
diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts
index db315eca0d7..37151f0fbba 100644
--- a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts
+++ b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts
@@ -1,12 +1,15 @@
 import { describe, expect, it, vi } from "vitest";
+import { createStreamingDirectiveAccumulator } from "../auto-reply/reply/streaming-directives.js";
 import { createInlineCodeState } from "../markdown/code-spans.js";
 import {
   buildAssistantStreamData,
+  consumePendingAssistantReplyDirectivesIntoReply,
   consumePendingToolMediaIntoReply,
   consumePendingToolMediaReply,
   handleMessageEnd,
   handleMessageUpdate,
   hasAssistantVisibleReply,
+  recordPendingAssistantReplyDirectives,
   resolveSilentReplyFallbackText,
 } from "./pi-embedded-subscribe.handlers.messages.js";
 import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
@@ -24,6 +27,8 @@ function createMessageUpdateContext(
     resetAssistantMessageState?: ReturnType<typeof vi.fn>;
     debug?: ReturnType<typeof vi.fn>;
     shouldEmitPartialReplies?: boolean;
+    consumePartialReplyDirectives?: ReturnType<typeof vi.fn>;
+    state?: Record<string, unknown>;
   } = {},
 ) {
   return {
@@ -53,11 +58,13 @@
       assistantMessageIndex: 0,
       lastAssistantStreamItemId: undefined,
       assistantTexts: [],
+      pendingAssistantReplyDirectives: undefined,
+      ...params.state,
     },
     log: { debug: params.debug ?? vi.fn() },
     noteLastAssistant: vi.fn(),
     stripBlockTags: (text: string) => text,
-    consumePartialReplyDirectives: vi.fn(() => null),
+    consumePartialReplyDirectives: params.consumePartialReplyDirectives ?? vi.fn(() => null),
    emitReasoningStream: vi.fn(),
     flushBlockReplyBuffer: params.flushBlockReplyBuffer ?? vi.fn(),
     resetAssistantMessageState: params.resetAssistantMessageState ?? vi.fn(),
@@ -194,6 +201,54 @@ describe("buildAssistantStreamData", () => {
   });
 });
+describe("pending assistant reply directives", () => {
+  it("merges directive metadata into the next non-reasoning block reply", () => {
+    const state = { pendingAssistantReplyDirectives: undefined };
+
+    recordPendingAssistantReplyDirectives(state, {
+      text: "",
+      mediaUrls: ["/tmp/reply.ogg"],
+      replyToCurrent: true,
+      replyToTag: true,
+      audioAsVoice: true,
+      isSilent: false,
+    });
+
+    expect(
+      consumePendingAssistantReplyDirectivesIntoReply(state, {
+        text: "Done.",
+      }),
+    ).toEqual({
+      text: "Done.",
+      mediaUrls: ["/tmp/reply.ogg"],
+      audioAsVoice: true,
+      replyToId: undefined,
+      replyToTag: true,
+      replyToCurrent: true,
+    });
+    expect(state.pendingAssistantReplyDirectives).toBeUndefined();
+  });
+
+  it("does not consume pending directive metadata on reasoning replies", () => {
+    const state = {
+      pendingAssistantReplyDirectives: {
+        mediaUrls: ["/tmp/reply.png"],
+      },
+    };
+
+    expect(
+      consumePendingAssistantReplyDirectivesIntoReply(state, {
+        text: "Thinking...",
+        isReasoning: true,
+      }),
+    ).toEqual({
+      text: "Thinking...",
+      isReasoning: true,
+    });
+    expect(state.pendingAssistantReplyDirectives?.mediaUrls).toEqual(["/tmp/reply.png"]);
+  });
+});
+
 describe("handleMessageUpdate", () => {
   it("treats phased textSignature item changes as assistant-message boundaries", () => {
     const flushBlockReplyBuffer = vi.fn();
   });
@@ -244,6 +299,54 @@ describe("handleMessageUpdate", () => {
     expect(onAssistantMessageStart).toHaveBeenCalledTimes(1);
     expect(context.state.lastAssistantStreamItemId).toBe("item-2");
   });
+
+  it("preserves phase-aware media, voice, and reply directives for block delivery", () => {
+    const accumulator = createStreamingDirectiveAccumulator();
+    const ctx = createMessageUpdateContext({
+      consumePartialReplyDirectives: vi.fn((text: string, options?: { final?: boolean }) =>
+        accumulator.consume(text, options),
+      ),
+      state: {
+        blockReplyBreak: "message_end",
+      },
+    });
+    const replyText = "Done.\n\n[[reply_to_current]]\n[[audio_as_voice]]\nMEDIA:/tmp/reply.ogg";
+
+    handleMessageUpdate(
+      ctx,
+      createTextUpdateEvent({
+        type: "text_delta",
+        text: replyText,
+        id: "item-final",
+        signaturePhase: "final_answer",
+        partialPhase: "final_answer",
+      }),
+    );
+    handleMessageUpdate(
+      ctx,
+      createTextUpdateEvent({
+        type: "text_end",
+        text: replyText,
+        id: "item-final",
+        signaturePhase: "final_answer",
+        partialPhase: "final_answer",
+      }),
+    );
+
+    expect(ctx.state.blockBuffer).toBe("Done.");
+    expect(
+      consumePendingAssistantReplyDirectivesIntoReply(ctx.state, {
+        text: "Done.",
+      }),
+    ).toEqual({
+      text: "Done.",
+      mediaUrls: ["/tmp/reply.ogg"],
+      audioAsVoice: true,
+      replyToId: undefined,
+      replyToTag: true,
+      replyToCurrent: true,
+    });
+  });
 });
 
 describe("consumePendingToolMediaIntoReply", () => {
diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts
index eb8819b3c8b..11859f7d62d 100644
--- a/src/agents/pi-embedded-subscribe.handlers.messages.ts
+++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts
@@ -1,7 +1,11 @@
 import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
 import type { AssistantMessage } from "@mariozechner/pi-ai";
 import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
-import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
+import {
+  parseReplyDirectives,
+  type ReplyDirectiveParseResult,
+} from "../auto-reply/reply/reply-directives.js";
"../auto-reply/reply/reply-directives.js"; +import { splitTrailingDirective } from "../auto-reply/reply/streaming-directives.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { createInlineCodeState } from "../markdown/code-spans.js"; @@ -33,21 +37,6 @@ import { promoteThinkingTagsToBlocks, } from "./pi-embedded-utils.js"; -const stripTrailingDirective = (text: string): string => { - const openIndex = text.lastIndexOf("[["); - if (openIndex < 0) { - if (text.endsWith("[")) { - return text.slice(0, -1); - } - return text; - } - const closeIndex = text.indexOf("]]", openIndex + 2); - if (closeIndex >= 0) { - return text; - } - return text.slice(0, openIndex); -}; - function shouldSuppressAssistantVisibleOutput(message: AgentMessage | undefined): boolean { return resolveAssistantMessagePhase(message) === "commentary"; } @@ -242,6 +231,88 @@ export function consumePendingToolMediaReply( return payload; } +function hasReplyDirectiveMetadata(parsed: ReplyDirectiveParseResult | null | undefined): boolean { + return Boolean( + parsed && + ((parsed.mediaUrls?.length ?? 0) > 0 || + parsed.audioAsVoice || + parsed.replyToId || + parsed.replyToTag || + parsed.replyToCurrent), + ); +} + +function hasReplyDirectiveMetadataResult( + parsed: ReplyDirectiveParseResult | null | undefined, +): parsed is ReplyDirectiveParseResult { + return hasReplyDirectiveMetadata(parsed); +} + +function mergeReplyDirectiveResults( + first: ReplyDirectiveParseResult | null | undefined, + second: ReplyDirectiveParseResult | null | undefined, +): ReplyDirectiveParseResult | null { + if (!first) { + return second ?? null; + } + if (!second) { + return first; + } + const mediaUrls = Array.from(new Set([...(first.mediaUrls ?? []), ...(second.mediaUrls ?? [])])); + return { + text: `${first.text ?? ""}${second.text ?? ""}`, + mediaUrls: mediaUrls.length ? mediaUrls : undefined, + mediaUrl: mediaUrls[0] ?? first.mediaUrl ?? second.mediaUrl, + replyToId: second.replyToId ?? first.replyToId, + replyToCurrent: first.replyToCurrent || second.replyToCurrent, + replyToTag: first.replyToTag || second.replyToTag, + audioAsVoice: first.audioAsVoice || second.audioAsVoice || undefined, + isSilent: first.isSilent || second.isSilent, + }; +} + +export function recordPendingAssistantReplyDirectives( + state: Pick, + parsed: ReplyDirectiveParseResult | null | undefined, +) { + if (!hasReplyDirectiveMetadataResult(parsed)) { + return; + } + const current = state.pendingAssistantReplyDirectives; + const mediaUrls = Array.from( + new Set([...(current?.mediaUrls ?? []), ...(parsed.mediaUrls ?? [])]), + ); + state.pendingAssistantReplyDirectives = { + mediaUrls: mediaUrls.length ? mediaUrls : undefined, + audioAsVoice: current?.audioAsVoice || parsed?.audioAsVoice || undefined, + replyToId: parsed?.replyToId ?? current?.replyToId, + replyToTag: current?.replyToTag || parsed.replyToTag || undefined, + replyToCurrent: current?.replyToCurrent || parsed.replyToCurrent || undefined, + }; +} + +export function consumePendingAssistantReplyDirectivesIntoReply( + state: Pick, + payload: BlockReplyPayload, +): BlockReplyPayload { + if (payload.isReasoning || !state.pendingAssistantReplyDirectives) { + return payload; + } + const pending = state.pendingAssistantReplyDirectives; + const mediaUrls = Array.from( + new Set([...(payload.mediaUrls ?? []), ...(pending.mediaUrls ?? 
+  state.pendingAssistantReplyDirectives = undefined;
+  return {
+    ...payload,
+    mediaUrls: mediaUrls.length ? mediaUrls : undefined,
+    audioAsVoice: payload.audioAsVoice || pending.audioAsVoice || undefined,
+    replyToId: payload.replyToId ?? pending.replyToId,
+    replyToTag: Boolean(payload.replyToTag || pending.replyToTag) || undefined,
+    replyToCurrent: Boolean(payload.replyToCurrent || pending.replyToCurrent) || undefined,
+  };
+}
+
 export function hasAssistantVisibleReply(params: {
   text?: string;
   mediaUrls?: string[];
@@ -431,10 +502,16 @@ export function handleMessageUpdate(
      emitReasoningEnd(ctx);
    }
    const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null;
-    const parsedFull = parseReplyDirectives(stripTrailingDirective(next));
+    const finalParsedDelta =
+      evtType === "text_end" ? ctx.consumePartialReplyDirectives("", { final: true }) : null;
+    const parsedStreamDirectives = mergeReplyDirectiveResults(parsedDelta, finalParsedDelta);
+    if (shouldUsePhaseAwareBlockReply) {
+      recordPendingAssistantReplyDirectives(ctx.state, parsedStreamDirectives);
+    }
+    const parsedFull = parseReplyDirectives(splitTrailingDirective(next).text);
    const cleanedText = parsedFull.text;
-    const { mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedDelta ?? {});
-    const hasAudio = Boolean(parsedDelta?.audioAsVoice);
+    const { mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedStreamDirectives ?? {});
+    const hasAudio = Boolean(parsedStreamDirectives?.audioAsVoice);
    const previousCleaned = ctx.state.lastStreamedAssistantCleaned ?? "";
 
    let shouldEmit = false;
@@ -562,7 +639,9 @@ export function handleMessageEnd(
    : "";
  const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : "";
  const trimmedText = text.trim();
-  const parsedText = trimmedText ? parseReplyDirectives(stripTrailingDirective(trimmedText)) : null;
+  const parsedText = trimmedText
+    ? parseReplyDirectives(splitTrailingDirective(trimmedText, { final: true }).text)
+    : null;
  let cleanedText = parsedText?.text ?? "";
  let { mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedText ?? {});
 
@@ -695,6 +774,14 @@
  if (hasBufferedBlockReply && ctx.blockChunker?.hasBuffered()) {
    ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
    ctx.blockChunker.reset();
+    // Final-flush the streaming directive accumulator so any partial
+    // directive tail held back by splitTrailingDirective (for example a
+    // trailing `MEDIA:` that arrived without a closing newline)
+    // gets emitted here. Without this, a reply ending in a directive
+    // line whose URL is complete but un-terminated would sit in
+    // pendingTail forever and the attachment would be silently dropped
+    // on the message_end / blockReplyChunking path.
+    emitSplitResultAsBlockReply(ctx.consumeReplyDirectives("", { final: true }));
  } else if (text !== ctx.state.lastBlockReplyText) {
    // Guard: for text_end channels, if text_end already delivered content
    // (lastBlockReplyText is set), skip this safety send. The text comparison
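Reviewer note: the record/consume pair above hands directive metadata from the phase-aware stream to the next block reply; a sketch of the flow using the same values as the unit test (the bare state literal and relative import path are illustrative):

// TypeScript sketch of the pending-directive handoff added in this file.
import {
  consumePendingAssistantReplyDirectivesIntoReply,
  recordPendingAssistantReplyDirectives,
} from "./src/agents/pi-embedded-subscribe.handlers.messages.js";

const state = { pendingAssistantReplyDirectives: undefined };

// During streaming, parsed directive metadata is parked on the session state.
recordPendingAssistantReplyDirectives(state, {
  text: "",
  mediaUrls: ["/tmp/reply.ogg"],
  audioAsVoice: true,
  replyToCurrent: true,
  replyToTag: true,
  isSilent: false,
});

// The next non-reasoning block reply absorbs it exactly once; the pending
// slot is cleared so a later reply cannot re-attach the same media.
const merged = consumePendingAssistantReplyDirectivesIntoReply(state, { text: "Done." });
// merged.mediaUrls deep-equals ["/tmp/reply.ogg"], merged.audioAsVoice is true,
// and state.pendingAssistantReplyDirectives is reset to undefined.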
diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts
index e0e544e4b7b..f541578ec23 100644
--- a/src/agents/pi-embedded-subscribe.handlers.types.ts
+++ b/src/agents/pi-embedded-subscribe.handlers.types.ts
@@ -82,6 +82,10 @@ export type EmbeddedPiSubscribeState = {
   pendingToolMediaUrls: string[];
   pendingToolAudioAsVoice: boolean;
   pendingToolTrustedLocalMedia: boolean;
+  pendingAssistantReplyDirectives?: Pick<
+    BlockReplyPayload,
+    "mediaUrls" | "audioAsVoice" | "replyToId" | "replyToTag" | "replyToCurrent"
+  >;
   deterministicApprovalPromptPending: boolean;
   deterministicApprovalPromptSent: boolean;
   lastAssistant?: AgentMessage;
diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts
index 2b069a4c001..58ef500b83f 100644
--- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts
+++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts
@@ -446,7 +446,7 @@ describe("subscribeEmbeddedPiSession", () => {
     expect(payloads).toHaveLength(1);
   });
 
-  it("emits a replacement snapshot when cleaned text rewinds mid-stream", () => {
+  it("emits one cleaned media snapshot when a streamed MEDIA line resolves to caption text", () => {
     const { emit, onAgentEvent } = createAgentEventHarness();
 
     emit({ type: "message_start", message: { role: "assistant" } });
@@ -454,20 +454,26 @@
     emitAssistantTextDelta(emit, " https://example.com/a.png\nCaption");
 
     const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls);
-    expect(payloads).toHaveLength(2);
-    expect(payloads[0]?.text).toBe("MEDIA:");
-    expect(payloads[0]?.delta).toBe("MEDIA:");
+    expect(payloads).toHaveLength(1);
+    expect(payloads[0]?.text).toBe("Caption");
+    expect(payloads[0]?.delta).toBe("Caption");
     expect(payloads[0]?.replace).toBeUndefined();
-    expect(payloads[1]?.text).toBe("Caption");
-    expect(payloads[1]?.delta).toBe("");
-    expect(payloads[1]?.replace).toBe(true);
+    expect(payloads[0]?.mediaUrls).toEqual(["https://example.com/a.png"]);
   });
 
-  it("emits agent events when media arrives without text", () => {
+  it("emits agent events when media-only text is finalized", () => {
     const { emit, onAgentEvent } = createAgentEventHarness();
 
     emit({ type: "message_start", message: { role: "assistant" } });
     emitAssistantTextDelta(emit, "MEDIA: https://example.com/a.png");
+    emit({
+      type: "message_update",
+      message: { role: "assistant" },
+      assistantMessageEvent: {
+        type: "text_end",
+        content: "MEDIA: https://example.com/a.png",
+      },
+    });
 
     const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls);
     expect(payloads).toHaveLength(1);
diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts
index 3197f575f2c..c17376de978 100644
--- a/src/agents/pi-embedded-subscribe.ts
+++ b/src/agents/pi-embedded-subscribe.ts
@@ -21,7 +21,10 @@ import {
 } from "./pi-embedded-runner/replay-state.js";
 import type { EmbeddedRunLivenessState } from "./pi-embedded-runner/types.js";
 import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js";
-import { consumePendingToolMediaIntoReply } from "./pi-embedded-subscribe.handlers.messages.js";
+import {
+  consumePendingAssistantReplyDirectivesIntoReply,
+  consumePendingToolMediaIntoReply,
+} from "./pi-embedded-subscribe.handlers.messages.js";
"./pi-embedded-subscribe.handlers.messages.js"; import type { EmbeddedPiSubscribeContext, EmbeddedPiSubscribeState, @@ -124,6 +127,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar pendingToolMediaUrls: initialPendingToolMediaUrls, pendingToolAudioAsVoice: false, pendingToolTrustedLocalMedia: false, + pendingAssistantReplyDirectives: undefined, deterministicApprovalPromptPending: false, deterministicApprovalPromptSent: false, }; @@ -184,7 +188,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar payload: BlockReplyPayload, options?: { assistantMessageIndex?: number }, ) => { - emitBlockReplySafely(consumePendingToolMediaIntoReply(state, payload), options); + const withAssistantDirectives = consumePendingAssistantReplyDirectivesIntoReply(state, payload); + emitBlockReplySafely(consumePendingToolMediaIntoReply(state, withAssistantDirectives), options); }; const resetAssistantMessageState = (nextAssistantTextBaseline: number) => { @@ -213,6 +218,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.lastAssistantTextNormalized = undefined; state.lastAssistantTextTrimmed = undefined; state.assistantTextBaseline = nextAssistantTextBaseline; + state.pendingAssistantReplyDirectives = undefined; }; const rememberAssistantText = (text: string) => { @@ -710,6 +716,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.pendingMessagingMediaUrls.clear(); state.pendingToolMediaUrls = []; state.pendingToolAudioAsVoice = false; + state.pendingAssistantReplyDirectives = undefined; state.deterministicApprovalPromptPending = false; state.deterministicApprovalPromptSent = false; state.replayState = mergeEmbeddedRunReplayState(state.replayState, params.initialReplayState); diff --git a/src/auto-reply/reply/reply-utils.test.ts b/src/auto-reply/reply/reply-utils.test.ts index 09a008b74f0..6c9f1fa82bc 100644 --- a/src/auto-reply/reply/reply-utils.test.ts +++ b/src/auto-reply/reply/reply-utils.test.ts @@ -10,7 +10,10 @@ import { hasTemplateVariables, resolveResponsePrefixTemplate, } from "./response-prefix-template.js"; -import { createStreamingDirectiveAccumulator } from "./streaming-directives.js"; +import { + createStreamingDirectiveAccumulator, + splitTrailingDirective, +} from "./streaming-directives.js"; import { createMockTypingController } from "./test-helpers.js"; import { createTypingSignaler, resolveTypingMode } from "./typing-mode.js"; import { createTypingController } from "./typing.js"; @@ -989,6 +992,15 @@ describe("createStreamingDirectiveAccumulator", () => { expect(result?.replyToCurrent).toBe(true); }); + it("handles reply tags split before the second bracket", () => { + const accumulator = createStreamingDirectiveAccumulator(); + expect(accumulator.consume("[")).toBeNull(); + + const result = accumulator.consume("[reply_to_current]] Yo"); + expect(result?.text).toBe("Yo"); + expect(result?.replyToCurrent).toBe(true); + }); + it("propagates explicit reply ids across current and subsequent chunks", () => { const accumulator = createStreamingDirectiveAccumulator(); @@ -1033,6 +1045,135 @@ describe("createStreamingDirectiveAccumulator", () => { expect(result?.text).toBe("NO_REPLY: explanation"); }); + + it("reassembles MEDIA: directives split between the token and the colon", () => { + const accumulator = createStreamingDirectiveAccumulator(); + + const first = accumulator.consume("这次直接发图。\n\nMEDIA"); + expect(first?.text).toBe("这次直接发图。"); + 
+    expect(first?.mediaUrls).toBeUndefined();
+
+    const second = accumulator.consume(":/tmp/spy-family.png");
+    expect(second).toBeNull();
+
+    const finalResult = accumulator.consume("", { final: true });
+    expect(finalResult?.mediaUrls).toEqual(["/tmp/spy-family.png"]);
+    expect((finalResult?.text ?? "").includes("MEDIA")).toBe(false);
+  });
+
+  it("reassembles MEDIA: directives split inside the URL path", () => {
+    const accumulator = createStreamingDirectiveAccumulator();
+
+    const first = accumulator.consume("Preview below.\n\nMEDIA:/var/folders/tool-image");
+    expect(first?.text).toBe("Preview below.");
+    expect(first?.mediaUrls).toBeUndefined();
+
+    const second = accumulator.consume("-generation/cover.png");
+    expect(second).toBeNull();
+
+    const finalResult = accumulator.consume("", { final: true });
+    expect(finalResult?.mediaUrls).toEqual(["/var/folders/tool-image-generation/cover.png"]);
+  });
+
+  it("buffers partial MEDIA prefixes (M/ME/MED/MEDI) across chunk boundaries", () => {
+    for (const prefix of ["M", "ME", "MED", "MEDI"]) {
+      const accumulator = createStreamingDirectiveAccumulator();
+      const head = `Here is the file.\n\n${prefix}`;
+      const headResult = accumulator.consume(head);
+      expect(headResult?.text, `prefix=${prefix} head emits text`).toBe("Here is the file.");
+
+      const rest = `MEDIA:/tmp/file.png`.slice(prefix.length);
+      const restResult = accumulator.consume(rest);
+      expect(restResult, `prefix=${prefix} mid returns null`).toBeNull();
+
+      const finalResult = accumulator.consume("", { final: true });
+      expect(finalResult?.mediaUrls, `prefix=${prefix} final mediaUrls`).toEqual(["/tmp/file.png"]);
+    }
+  });
+
+  it("does not buffer a trailing letter that appears mid-line", () => {
+    const accumulator = createStreamingDirectiveAccumulator();
+
+    // "I am" ends in "m". The prefix guard only anchors to line-start (`^`
+    // or immediately after `\n`), so ordinary prose whose last character
+    // happens to be an `M|ME|MED|MEDI|MEDIA` letter stays in the emitted
+    // text.
+    const result = accumulator.consume("I am");
+    expect(result?.text).toBe("I am");
+    expect(result?.mediaUrls).toBeUndefined();
+  });
+
+  it("does not buffer prose that merely contains the token MEDIA:", () => {
+    const accumulator = createStreamingDirectiveAccumulator();
+
+    // Matches what upstream `splitMediaFromOutput` considers a directive:
+    // only lines whose trimmed start is `MEDIA:`. A line that merely
+    // contains "MEDIA:" mid-sentence is ordinary prose and must flush
+    // immediately — otherwise on a stream-item boundary (which may call
+    // `reset()` without a preceding `consume("", { final: true })`) the
+    // buffered prose would be silently dropped.
+    const result = accumulator.consume("See the MEDIA: section for details");
+    expect(result?.text).toBe("See the MEDIA: section for details");
+    expect(result?.mediaUrls).toBeUndefined();
+  });
+
+  it("still buffers an indented MEDIA directive line that is mid-stream", () => {
+    const accumulator = createStreamingDirectiveAccumulator();
+
+    // Upstream parser treats `line.trimStart().startsWith("MEDIA:")` as a
+    // directive, so the guard must also buffer the indented form across
+    // a chunk boundary.
+    const first = accumulator.consume("Preview:\n MEDIA:/tmp/cover");
+    expect(first?.text).toBe("Preview:");
+    expect(first?.mediaUrls).toBeUndefined();
+
+    const second = accumulator.consume(".png");
+    expect(second).toBeNull();
+
+    const finalResult = accumulator.consume("", { final: true });
+    expect(finalResult?.mediaUrls).toEqual(["/tmp/cover.png"]);
+  });
+
+  it("does not rewrite mid-prose MEDIA into a directive across chunks", () => {
+    const accumulator = createStreamingDirectiveAccumulator();
+
+    // A chunk can legitimately end with `MEDIA` mid-sentence (e.g. "this
+    // uses legacy MEDIA"). A later chunk starting with `:` must NOT join
+    // with the buffered token to synthesize a `MEDIA:` directive —
+    // upstream `MEDIA_TOKEN_RE` captures `[^\n]+`, and treating the rest
+    // of that sentence as a media path would invent a media reply the
+    // agent never authored.
+    const first = accumulator.consume("The legacy pipeline uses MEDIA");
+    expect(first?.text).toBe("The legacy pipeline uses MEDIA");
+    expect(first?.mediaUrls).toBeUndefined();
+
+    const second = accumulator.consume(": kind=disk capacity=1TB");
+    expect(second?.text).toBe(": kind=disk capacity=1TB");
+    expect(second?.mediaUrls).toBeUndefined();
+  });
+
+  it("passes plain text through when there is no incomplete directive tail", () => {
+    const accumulator = createStreamingDirectiveAccumulator();
+
+    const result = accumulator.consume("Hello world.\nThis is a complete block.");
+    expect(result?.text).toBe("Hello world.\nThis is a complete block.");
+    expect(result?.mediaUrls).toBeUndefined();
+  });
+
+  it("keeps MEDIA directives that arrive in a single complete chunk working", () => {
+    const accumulator = createStreamingDirectiveAccumulator();
+
+    const result = accumulator.consume("Here it is.\n\nMEDIA:/tmp/complete.png\n");
+    expect(result?.text.includes("MEDIA")).toBe(false);
+    expect(result?.mediaUrls).toEqual(["/tmp/complete.png"]);
+  });
+
+  it("does not strip a complete final MEDIA line when parsing final text", () => {
+    expect(splitTrailingDirective("Here.\nMEDIA:/tmp/final.png", { final: true })).toEqual({
+      text: "Here.\nMEDIA:/tmp/final.png",
+      tail: "",
+    });
+  });
 });
 
 describe("extractShortModelName", () => {
diff --git a/src/auto-reply/reply/streaming-directives.ts b/src/auto-reply/reply/streaming-directives.ts
index 9feb194deaa..5fe4c9680e4 100644
--- a/src/auto-reply/reply/streaming-directives.ts
+++ b/src/auto-reply/reply/streaming-directives.ts
@@ -25,18 +25,84 @@ type ConsumeOptions = {
   silentToken?: string;
 };
 
-const splitTrailingDirective = (text: string): { text: string; tail: string } => {
+type SplitTrailingDirectiveOptions = {
+  final?: boolean;
+};
+
+// Holds back incomplete streaming-directive tails so parseChunk only ever sees
+// complete directives. Otherwise, upstream token boundaries can split markers
+// like `MEDIA:` between chunks and cause the first half to be emitted as
+// plain text (e.g. the `MEDIA` token leaking into a channel reply while the
+// matching file path is silently dropped on the next chunk).
+export const splitTrailingDirective = (
+  text: string,
+  options: SplitTrailingDirectiveOptions = {},
+): { text: string; tail: string } => {
+  let bufferStart = text.length;
+
+  // 1. Unclosed `[[…` reply/audio directive tail.
   const openIndex = text.lastIndexOf("[[");
-  if (openIndex < 0) {
-    return { text, tail: "" };
-  }
-  const closeIndex = text.indexOf("]]", openIndex + 2);
-  if (closeIndex >= 0) {
+  if (openIndex >= 0 && text.indexOf("]]", openIndex + 2) < 0) {
+    if (openIndex < bufferStart) {
+      bufferStart = openIndex;
+    }
+  }
+  if (text.endsWith("[") && text.length - 1 < bufferStart) {
+    bufferStart = text.length - 1;
+  }
+
+  if (options.final) {
+    if (bufferStart >= text.length) {
+      return { text, tail: "" };
+    }
+
+    return {
+      text: text.slice(0, bufferStart),
+      tail: text.slice(bufferStart),
+    };
+  }
+
+  // 2. `MEDIA:` line without a trailing newline — the URL may still be
+  // streaming. `splitMediaFromOutput` in src/media/parse.ts treats a
+  // line as a media directive only when `line.trimStart()` begins with
+  // `MEDIA:`, so we match the same shape here: only buffer when the
+  // last line looks like an actual directive line (optional leading
+  // whitespace, then `MEDIA:`). Prose such as
+  // "See the MEDIA: section for details" does NOT qualify and is
+  // flushed as ordinary text — otherwise it could sit in pendingTail
+  // and be silently dropped if a stream-item boundary calls `reset()`
+  // without a preceding `consume("", { final: true })`.
+  const lastNewline = text.lastIndexOf("\n");
+  const lastLine = lastNewline < 0 ? text : text.slice(lastNewline + 1);
+  if (/^\s*MEDIA:/i.test(lastLine)) {
+    const mediaLineStart = lastNewline < 0 ? 0 : lastNewline + 1;
+    if (mediaLineStart < bufferStart) {
+      bufferStart = mediaLineStart;
+    }
+  }
+
+  // 3. Trailing `M|ME|MED|MEDI|MEDIA` prefix (no colon yet) at the start of
+  // a line — the next chunk might turn this into `MEDIA:`. Only a
+  // line-start anchor (`^` or immediately after `\n`) is accepted so
+  // mid-prose tokens like "_M", "3ME", or "token MEDIA" are not
+  // speculatively buffered and cannot accidentally be glued to a
+  // following `:` into a synthetic directive. Matches the canonical
+  // MEDIA directive placement (own line after `\n\n`).
+  const prefixMatch = text.match(/(?:^|\n)(MEDIA|MEDI|MED|ME|M)$/i);
+  if (prefixMatch) {
+    const prefixStart = text.length - prefixMatch[1].length;
+    if (prefixStart < bufferStart) {
+      bufferStart = prefixStart;
+    }
+  }
+
+  if (bufferStart >= text.length) {
     return { text, tail: "" };
   }
+
   return {
-    text: text.slice(0, openIndex),
-    tail: text.slice(openIndex),
+    text: text.slice(0, bufferStart),
+    tail: text.slice(bufferStart),
   };
 };
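Reviewer note: a behavior sketch for the reworked splitTrailingDirective, with outputs derived from the rules above (the input strings are illustrative):

// TypeScript sketch (relative import path assumed).
import { splitTrailingDirective } from "./src/auto-reply/reply/streaming-directives.js";

// Mid-stream, a line-start "MEDIA" token with no colon yet is held back as tail.
splitTrailingDirective("Done.\n\nMEDIA");
// returns { text: "Done.\n\n", tail: "MEDIA" }

// Mid-stream, an unterminated MEDIA: line is buffered whole while the URL streams.
splitTrailingDirective("Done.\n\nMEDIA:/tmp/reply.o");
// returns { text: "Done.\n\n", tail: "MEDIA:/tmp/reply.o" }

// Mid-prose "MEDIA:" is not a directive line and flushes as ordinary text.
splitTrailingDirective("See the MEDIA: section for details");
// returns { text: "See the MEDIA: section for details", tail: "" }

// On the final flush only unclosed [[...]] tails are withheld; a complete
// MEDIA line passes through untouched for the downstream media parser.
splitTrailingDirective("Here.\nMEDIA:/tmp/final.png", { final: true });
// returns { text: "Here.\nMEDIA:/tmp/final.png", tail: "" }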