diff --git a/src/commands/agent.acp.test.ts b/src/commands/agent.acp.test.ts index cde0ab54a94..1576e758251 100644 --- a/src/commands/agent.acp.test.ts +++ b/src/commands/agent.acp.test.ts @@ -7,6 +7,7 @@ import { AcpRuntimeError } from "../acp/runtime/errors.js"; import * as embeddedModule from "../agents/pi-embedded.js"; import type { OpenClawConfig } from "../config/config.js"; import * as configModule from "../config/config.js"; +import { onAgentEvent } from "../infra/agent-events.js"; import type { RuntimeEnv } from "../runtime.js"; import { agentCommand } from "./agent.js"; @@ -195,6 +196,95 @@ describe("agentCommand ACP runtime routing", () => { }); }); + it("suppresses ACP NO_REPLY lead fragments before emitting assistant text", async () => { + await withTempHome(async (home) => { + const storePath = path.join(home, "sessions.json"); + writeAcpSessionStore(storePath); + mockConfig(home, storePath); + + const assistantEvents: Array<{ text?: string; delta?: string }> = []; + const stop = onAgentEvent((evt) => { + if (evt.stream !== "assistant") { + return; + } + assistantEvents.push({ + text: typeof evt.data?.text === "string" ? evt.data.text : undefined, + delta: typeof evt.data?.delta === "string" ? evt.data.delta : undefined, + }); + }); + + const runTurn = vi.fn(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { + onEvent?: (event: { type: string; text?: string; stopReason?: string }) => Promise; + }; + for (const text of ["NO", "NO_", "NO_RE", "NO_REPLY", "Actual answer"]) { + await params.onEvent?.({ type: "text_delta", text }); + } + await params.onEvent?.({ type: "done", stopReason: "stop" }); + }); + + mockAcpManager({ + runTurn: (params: unknown) => runTurn(params), + }); + + try { + await agentCommand({ message: "ping", sessionKey: "agent:codex:acp:test" }, runtime); + } finally { + stop(); + } + + expect(assistantEvents).toEqual([{ text: "Actual answer", delta: "Actual answer" }]); + + const logLines = vi.mocked(runtime.log).mock.calls.map(([first]) => String(first)); + expect(logLines.some((line) => line.includes("NO_REPLY"))).toBe(false); + expect(logLines.some((line) => line.includes("Actual answer"))).toBe(true); + }); + }); + + it("keeps silent-only ACP turns out of assistant output", async () => { + await withTempHome(async (home) => { + const storePath = path.join(home, "sessions.json"); + writeAcpSessionStore(storePath); + mockConfig(home, storePath); + + const assistantEvents: string[] = []; + const stop = onAgentEvent((evt) => { + if (evt.stream !== "assistant") { + return; + } + if (typeof evt.data?.text === "string") { + assistantEvents.push(evt.data.text); + } + }); + + const runTurn = vi.fn(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { + onEvent?: (event: { type: string; text?: string; stopReason?: string }) => Promise; + }; + for (const text of ["NO", "NO_", "NO_RE", "NO_REPLY"]) { + await params.onEvent?.({ type: "text_delta", text }); + } + await params.onEvent?.({ type: "done", stopReason: "stop" }); + }); + + mockAcpManager({ + runTurn: (params: unknown) => runTurn(params), + }); + + try { + await agentCommand({ message: "ping", sessionKey: "agent:codex:acp:test" }, runtime); + } finally { + stop(); + } + + expect(assistantEvents).toEqual([]); + + const logLines = vi.mocked(runtime.log).mock.calls.map(([first]) => String(first)); + expect(logLines.some((line) => line.includes("NO_REPLY"))).toBe(false); + expect(logLines.some((line) => line.includes("No reply from agent."))).toBe(true); + }); + }); + it("fails closed for ACP-shaped session keys missing ACP metadata", async () => { await withTempHome(async (home) => { const storePath = path.join(home, "sessions.json"); diff --git a/src/commands/agent.ts b/src/commands/agent.ts index fcbe593ec03..f7f735724ad 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -38,6 +38,7 @@ import { buildWorkspaceSkillSnapshot } from "../agents/skills.js"; import { getSkillsSnapshotVersion } from "../agents/skills/refresh.js"; import { resolveAgentTimeoutMs } from "../agents/timeout.js"; import { ensureAgentWorkspace } from "../agents/workspace.js"; +import { normalizeReplyPayload } from "../auto-reply/reply/normalize-reply.js"; import { formatThinkingLevels, formatXHighModelHint, @@ -47,6 +48,11 @@ import { type ThinkLevel, type VerboseLevel, } from "../auto-reply/thinking.js"; +import { + isSilentReplyPrefixText, + isSilentReplyText, + SILENT_REPLY_TOKEN, +} from "../auto-reply/tokens.js"; import { formatCliCommand } from "../cli/command-format.js"; import { resolveCommandSecretRefsViaGateway } from "../cli/command-secret-gateway.js"; import { getAgentRuntimeCommandSecretTargetIds } from "../cli/command-secret-targets.js"; @@ -148,6 +154,76 @@ function prependInternalEventContext( return [renderedEvents, body].filter(Boolean).join("\n\n"); } +function appendUniqueSuffix(base: string, suffix: string): { text: string; delta: string } { + if (!suffix) { + return { text: base, delta: "" }; + } + if (!base) { + return { text: suffix, delta: suffix }; + } + if (base.endsWith(suffix)) { + return { text: base, delta: "" }; + } + const maxOverlap = Math.min(base.length, suffix.length); + for (let overlap = maxOverlap; overlap > 0; overlap -= 1) { + if (base.slice(-overlap) === suffix.slice(0, overlap)) { + const delta = suffix.slice(overlap); + return { + text: `${base}${delta}`, + delta, + }; + } + } + return { + text: `${base}${suffix}`, + delta: suffix, + }; +} + +function createAcpVisibleTextAccumulator() { + let pendingSilentPrefix = ""; + let visibleText = ""; + + return { + consume(chunk: string): { text: string; delta: string } | null { + if (!chunk) { + return null; + } + + if (!visibleText) { + const leadCandidate = appendUniqueSuffix(pendingSilentPrefix, chunk); + const trimmedLeadCandidate = leadCandidate.text.trim(); + if ( + isSilentReplyText(trimmedLeadCandidate, SILENT_REPLY_TOKEN) || + isSilentReplyPrefixText(trimmedLeadCandidate, SILENT_REPLY_TOKEN) + ) { + pendingSilentPrefix = leadCandidate.text; + return null; + } + if (pendingSilentPrefix) { + const visibleDelta = leadCandidate.text.startsWith(pendingSilentPrefix) + ? leadCandidate.text.slice(pendingSilentPrefix.length) + : chunk; + pendingSilentPrefix = ""; + if (!visibleDelta) { + return null; + } + const nextVisible = appendUniqueSuffix(visibleText, visibleDelta); + visibleText = nextVisible.text; + return nextVisible.delta ? nextVisible : null; + } + } + + const nextVisible = appendUniqueSuffix(visibleText, chunk); + visibleText = nextVisible.text; + return nextVisible.delta ? nextVisible : null; + }, + finalize(): string { + return visibleText.trim(); + }, + }; +} + function runAgentAttempt(params: { providerOverride: string; modelOverride: string; @@ -492,7 +568,7 @@ async function agentCommandInternal( }, }); - let streamedText = ""; + const visibleTextAccumulator = createAcpVisibleTextAccumulator(); let stopReason: string | undefined; try { const dispatchPolicyError = resolveAcpDispatchPolicyError(cfg); @@ -528,13 +604,16 @@ async function agentCommandInternal( if (!event.text) { return; } - streamedText += event.text; + const visibleUpdate = visibleTextAccumulator.consume(event.text); + if (!visibleUpdate) { + return; + } emitAgentEvent({ runId, stream: "assistant", data: { - text: streamedText, - delta: event.text, + text: visibleUpdate.text, + delta: visibleUpdate.delta, }, }); }, @@ -566,14 +645,10 @@ async function agentCommandInternal( }, }); - const finalText = streamedText.trim(); - const payloads = finalText - ? [ - { - text: finalText, - }, - ] - : []; + const normalizedFinalPayload = normalizeReplyPayload({ + text: visibleTextAccumulator.finalize(), + }); + const payloads = normalizedFinalPayload ? [normalizedFinalPayload] : []; const result = { payloads, meta: { diff --git a/src/telegram/send.ts b/src/telegram/send.ts index e5b868cc5fc..61292f66608 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -6,7 +6,6 @@ import type { } from "@grammyjs/types"; import { type ApiClientOptions, Bot, HttpError, InputFile } from "grammy"; import { loadConfig } from "../config/config.js"; -import { isSilentReplyText } from "../auto-reply/tokens.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; import { logVerbose } from "../globals.js"; import { recordChannelActivity } from "../infra/channel-activity.js"; @@ -464,15 +463,6 @@ export async function sendMessageTelegram( text: string, opts: TelegramSendOpts = {}, ): Promise { - const trimmedText = text?.trim() ?? ""; - if (isSilentReplyText(trimmedText) && !opts.mediaUrl) { - logVerbose("telegram send: suppressed NO_REPLY token before API call"); - return { - messageId: "suppressed", - chatId: "", - }; - } - const { cfg, account, api } = resolveTelegramApiContext(opts); const target = parseTelegramTarget(to); const chatId = await resolveAndPersistChatId({