diff --git a/CHANGELOG.md b/CHANGELOG.md index 11d2e1c8a0e..c78d18f83f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -330,6 +330,7 @@ Docs: https://docs.openclaw.ai - Agents/embed: keep message_end safety delivery armed when a silent text_end chunk produces no block reply, fixing dropped Telegram/forum replies. Fixes #77833. (#77840) Thanks @neeravmakwana. - Install/postinstall: skip noisy compile-cache prune warnings when `EACCES`/`EPERM` prevent removing shared `/tmp/node-compile-cache` entries owned by another user. Fixes #76353. (#76362) Thanks @RayWoo and @neeravmakwana. - Agents/messaging: surface CLI subprocess watchdog/turn timeout messages to chat users when verbose failures are off, instead of collapsing them into generic external-run failure copy. Fixes #77007. (#77015) Thanks @neeravmakwana. +- Agents/sessions: after embedded Pi runs, append assistant-visible reply text to session JSONL only when Pi did not already persist an equivalent tail assistant entry, without re-mirroring the user prompt Pi owns. Fixes #77823. (#77839) Thanks @neeravmakwana. ## 2026.5.3-1 diff --git a/src/agents/agent-command.ts b/src/agents/agent-command.ts index d2b0ae4f488..584ba721769 100644 --- a/src/agents/agent-command.ts +++ b/src/agents/agent-command.ts @@ -1196,7 +1196,8 @@ async function agentCommandInternal( sessionEntry = sessionStore[sessionKey] ?? sessionEntry; } - if (result.meta.executionTrace?.runner === "cli") { + const transcriptPersistenceRunner = result.meta.executionTrace?.runner; + if (transcriptPersistenceRunner === "cli" || transcriptPersistenceRunner === "embedded") { try { sessionEntry = await attemptExecutionRuntime.persistCliTurnTranscript({ body, @@ -1211,6 +1212,7 @@ async function agentCommandInternal( threadId: opts.threadId, sessionCwd: workspaceDir, config: cfg, + embeddedAssistantGapFill: transcriptPersistenceRunner === "embedded", }); sessionEntry = await ( await loadCliCompactionRuntime() @@ -1235,7 +1237,7 @@ async function agentCommandInternal( }); } catch (error) { log.warn( - `CLI transcript persistence failed for ${sessionKey ?? sessionId}: ${error instanceof Error ? error.message : String(error)}`, + `Turn transcript persistence failed for ${sessionKey ?? sessionId}: ${error instanceof Error ? error.message : String(error)}`, ); } } diff --git a/src/agents/command/attempt-execution.cli.test.ts b/src/agents/command/attempt-execution.cli.test.ts index f383e723436..087385545df 100644 --- a/src/agents/command/attempt-execution.cli.test.ts +++ b/src/agents/command/attempt-execution.cli.test.ts @@ -3,6 +3,7 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../../config/sessions.js"; +import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { FailoverError } from "../failover-error.js"; import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js"; @@ -418,6 +419,130 @@ describe("CLI attempt execution", () => { }); }); + it("embedded assistant gap-fill skips user mirror and dedupes identical assistant tails", async () => { + const sessionKey = "agent:main:subagent:embedded-gap-fill"; + const sessionEntry: SessionEntry = { + sessionId: "session-embedded-gap-fill", + updatedAt: Date.now(), + }; + const sessionStore: Record = { [sessionKey]: sessionEntry }; + await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8"); + + const result = makeCliResult("already mirrored"); + result.meta.executionTrace = { + winnerProvider: "anthropic", + winnerModel: "claude-opus-4-6", + fallbackUsed: false, + runner: "embedded", + }; + + const updatedFirst = await persistCliTurnTranscript({ + body: "ignored for gap fill", + transcriptBody: "also ignored", + result, + sessionId: sessionEntry.sessionId, + sessionKey, + sessionEntry, + sessionStore, + storePath, + sessionAgentId: "main", + sessionCwd: tmpDir, + config: {}, + embeddedAssistantGapFill: true, + }); + + let messages = await readSessionMessages(updatedFirst?.sessionFile ?? ""); + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ + role: "assistant", + content: [{ type: "text", text: "already mirrored" }], + }); + + await persistCliTurnTranscript({ + body: "still ignored", + result, + sessionId: sessionEntry.sessionId, + sessionKey, + sessionEntry: updatedFirst, + sessionStore, + storePath, + sessionAgentId: "main", + sessionCwd: tmpDir, + config: {}, + embeddedAssistantGapFill: true, + }); + + messages = await readSessionMessages(updatedFirst?.sessionFile ?? ""); + expect(messages).toHaveLength(1); + }); + + it("embedded assistant gap-fill appends repeated replies after a user tail", async () => { + const sessionKey = "agent:main:subagent:embedded-repeated-reply"; + const sessionEntry: SessionEntry = { + sessionId: "session-embedded-repeated-reply", + updatedAt: Date.now(), + }; + const sessionStore: Record = { [sessionKey]: sessionEntry }; + await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8"); + + const result = makeCliResult("same answer"); + result.meta.executionTrace = { + winnerProvider: "anthropic", + winnerModel: "claude-opus-4-6", + fallbackUsed: false, + runner: "embedded", + }; + + const updatedFirst = await persistCliTurnTranscript({ + body: "ignored for gap fill", + result, + sessionId: sessionEntry.sessionId, + sessionKey, + sessionEntry, + sessionStore, + storePath, + sessionAgentId: "main", + sessionCwd: tmpDir, + config: {}, + embeddedAssistantGapFill: true, + }); + const sessionFile = updatedFirst?.sessionFile; + expect(sessionFile).toBeTruthy(); + + await appendSessionTranscriptMessage({ + transcriptPath: sessionFile!, + sessionId: sessionEntry.sessionId, + cwd: tmpDir, + config: {}, + message: { + role: "user", + content: "next prompt", + timestamp: Date.now(), + }, + }); + + await persistCliTurnTranscript({ + body: "still ignored", + result, + sessionId: sessionEntry.sessionId, + sessionKey, + sessionEntry: updatedFirst, + sessionStore, + storePath, + sessionAgentId: "main", + sessionCwd: tmpDir, + config: {}, + embeddedAssistantGapFill: true, + }); + + const messages = await readSessionMessages(sessionFile!); + expect(messages).toHaveLength(3); + expect(messages.map((message) => message.role)).toEqual(["assistant", "user", "assistant"]); + expect(messages[2]).toMatchObject({ + content: [{ type: "text", text: "same answer" }], + }); + }); + it("persists the transcript body instead of runtime-only CLI prompt context", async () => { const sessionKey = "agent:main:subagent:cli-transcript-clean"; const sessionEntry: SessionEntry = { diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index ebd84a4e0a9..b602936fd7a 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -2,7 +2,10 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js"; import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js"; import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js"; -import { resolveSessionTranscriptFile } from "../../config/sessions/transcript.js"; +import { + readTailAssistantTextFromSessionTranscript, + resolveSessionTranscriptFile, +} from "../../config/sessions/transcript.js"; import type { SessionEntry } from "../../config/sessions/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; @@ -45,6 +48,10 @@ export { const log = createSubsystemLogger("agents/agent-command"); +function normalizeTranscriptMirrorText(value: string): string { + return value.trim().replace(/\s+/gu, " "); +} + const ACP_TRANSCRIPT_USAGE = { input: 0, output: 0, @@ -81,6 +88,7 @@ type PersistTextTurnTranscriptParams = { threadId?: string | number; sessionCwd: string; config: OpenClawConfig; + embeddedAssistantGapFill?: boolean; assistant: { api: string; provider: string; @@ -217,22 +225,33 @@ async function persistTextTurnTranscript( } if (replyText) { - await appendSessionTranscriptMessage({ - transcriptPath: sessionFile, - sessionId: params.sessionId, - cwd: params.sessionCwd, - config: params.config, - message: { - role: "assistant", - content: [{ type: "text", text: replyText }], - api: params.assistant.api, - provider: params.assistant.provider, - model: params.assistant.model, - usage: resolveTranscriptUsage(params.assistant.usage), - stopReason: "stop", - timestamp: Date.now(), - }, - }); + let appendAssistant = true; + if (params.embeddedAssistantGapFill) { + const latest = await readTailAssistantTextFromSessionTranscript(sessionFile); + const normalizedReply = normalizeTranscriptMirrorText(replyText); + const normalizedLatest = latest?.text ? normalizeTranscriptMirrorText(latest.text) : ""; + if (normalizedLatest && normalizedLatest === normalizedReply) { + appendAssistant = false; + } + } + if (appendAssistant) { + await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + sessionId: params.sessionId, + cwd: params.sessionCwd, + config: params.config, + message: { + role: "assistant", + content: [{ type: "text", text: replyText }], + api: params.assistant.api, + provider: params.assistant.provider, + model: params.assistant.model, + usage: resolveTranscriptUsage(params.assistant.usage), + stopReason: "stop", + timestamp: Date.now(), + }, + }); + } } } finally { await lock.release(); @@ -296,14 +315,16 @@ export async function persistCliTurnTranscript(params: { threadId?: string | number; sessionCwd: string; config: OpenClawConfig; + embeddedAssistantGapFill?: boolean; }): Promise { const replyText = resolveCliTranscriptReplyText(params.result); const provider = params.result.meta.agentMeta?.provider?.trim() ?? "cli"; const model = params.result.meta.agentMeta?.model?.trim() ?? "default"; + const gapFill = params.embeddedAssistantGapFill ?? false; return await persistTextTurnTranscript({ - body: params.body, - transcriptBody: params.transcriptBody, + body: gapFill ? "" : params.body, + transcriptBody: gapFill ? undefined : params.transcriptBody, finalText: replyText, sessionId: params.sessionId, sessionKey: params.sessionKey, @@ -314,6 +335,7 @@ export async function persistCliTurnTranscript(params: { threadId: params.threadId, sessionCwd: params.sessionCwd, config: params.config, + embeddedAssistantGapFill: gapFill, assistant: { api: "cli", provider, diff --git a/src/commands/agent.test.ts b/src/commands/agent.test.ts index 74eabf2ea52..2295cc5e117 100644 --- a/src/commands/agent.test.ts +++ b/src/commands/agent.test.ts @@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, type MockInstance, vi } from "vitest" import "./agent-command.test-mocks.js"; import { __testing as acpManagerTesting } from "../acp/control-plane/manager.js"; import * as authProfileStoreModule from "../agents/auth-profiles/store.js"; +import * as attemptExecutionRuntime from "../agents/command/attempt-execution.runtime.js"; import { loadManifestModelCatalog, loadModelCatalog } from "../agents/model-catalog.js"; import * as modelSelectionModule from "../agents/model-selection.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; @@ -392,6 +393,30 @@ describe("agentCommand", () => { }); }); + it("persists embedded-runner turns to the session transcript", async () => { + await withTempHome(async (home) => { + const store = path.join(home, "sessions.json"); + mockConfig(home, store); + const base = createDefaultAgentResult({ payloads: [{ text: "assistant-visible" }] }); + vi.mocked(runEmbeddedPiAgent).mockResolvedValueOnce({ + ...base, + meta: { + ...base.meta, + executionTrace: { runner: "embedded" }, + }, + }); + + await agentCommand({ message: "hello from user", agentId: "main" }, runtime); + + expect(vi.mocked(attemptExecutionRuntime.persistCliTurnTranscript)).toHaveBeenCalledTimes(1); + const persistArgs = vi.mocked(attemptExecutionRuntime.persistCliTurnTranscript).mock + .calls[0]?.[0]; + expect(persistArgs?.embeddedAssistantGapFill).toBe(true); + expect(persistArgs?.body).toBe("hello from user"); + expect(persistArgs?.result.meta?.executionTrace?.runner).toBe("embedded"); + }); + }); + it("passes configured fast mode to embedded runs", async () => { await withTempHome(async (home) => { const store = path.join(home, "sessions.json"); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index c174d78b2be..7ebd91e490e 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -58,12 +58,37 @@ export type SessionTranscriptAssistantMessage = Parameters= 0; index -= 1) { - const line = lines[index]; + for (const line of raw.split(/\r?\n/).toReversed()) { if (!line.trim()) { continue; } try { - const parsed = JSON.parse(line) as { - id?: unknown; - message?: unknown; - }; - const message = parsed.message as { role?: unknown; timestamp?: unknown } | undefined; - if (!message || message.role !== "assistant") { - continue; + const assistantText = parseAssistantTranscriptText(line); + if (assistantText) { + return assistantText; } - const text = extractAssistantVisibleText(message)?.trim(); - if (!text) { - continue; - } - return { - ...(typeof parsed.id === "string" && parsed.id ? { id: parsed.id } : {}), - text, - ...(typeof message.timestamp === "number" && Number.isFinite(message.timestamp) - ? { timestamp: message.timestamp } - : {}), - }; } catch { continue; } @@ -156,6 +164,33 @@ export async function readLatestAssistantTextFromSessionTranscript( return undefined; } +export async function readTailAssistantTextFromSessionTranscript( + sessionFile: string | undefined, +): Promise { + if (!sessionFile?.trim()) { + return undefined; + } + + let raw: string; + try { + raw = await fs.promises.readFile(sessionFile, "utf-8"); + } catch { + return undefined; + } + + for (const line of raw.split(/\r?\n/).toReversed()) { + if (!line.trim()) { + continue; + } + try { + return parseAssistantTranscriptText(line); + } catch { + return undefined; + } + } + return undefined; +} + export async function appendAssistantMessageToSessionTranscript(params: { agentId?: string; sessionKey: string;