From b04dd6d05c3d630fe0c00c26208a562eb5eabdfd Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 6 Apr 2026 13:23:18 +0100 Subject: [PATCH] refactor: consolidate session history sanitization --- ...pi-embedded-helpers.validate-turns.test.ts | 71 ++++++-- src/agents/pi-embedded-helpers/turns.ts | 142 +++++++++++---- src/agents/tools/chat-history-text.ts | 52 ++++-- src/agents/tools/session-message-text.ts | 55 +----- src/agents/tools/sessions.test.ts | 19 ++ src/config/sessions/transcript.runtime.ts | 5 +- src/config/sessions/transcript.test.ts | 80 ++++++++- src/config/sessions/transcript.ts | 103 ++++++++--- src/gateway/server-methods/chat.ts | 20 +-- src/gateway/session-history-state.ts | 166 ++++++++++++++++++ src/gateway/sessions-history-http.test.ts | 45 +++-- src/gateway/sessions-history-http.ts | 119 +++---------- 12 files changed, 600 insertions(+), 277 deletions(-) create mode 100644 src/gateway/session-history-state.ts diff --git a/src/agents/pi-embedded-helpers.validate-turns.test.ts b/src/agents/pi-embedded-helpers.validate-turns.test.ts index 342dbc8dfef..a073d5b1f3d 100644 --- a/src/agents/pi-embedded-helpers.validate-turns.test.ts +++ b/src/agents/pi-embedded-helpers.validate-turns.test.ts @@ -12,8 +12,8 @@ function asMessages(messages: unknown[]): AgentMessage[] { function makeDualToolUseAssistantContent() { return [ - { type: "toolUse", id: "tool-1", name: "test1", input: {} }, - { type: "toolUse", id: "tool-2", name: "test2", input: {} }, + { type: "toolUse", id: "tool-1", name: "test1", arguments: {} }, + { type: "toolUse", id: "tool-2", name: "test2", arguments: {} }, { type: "text", text: "Done" }, ]; } @@ -368,7 +368,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => { { role: "assistant", content: [ - { type: "toolUse", id: "tool-1", name: "test", input: {} }, + { type: "toolUse", id: "tool-1", name: "test", arguments: {} }, { type: "text", text: "I'll check that" }, ], }, @@ -389,7 +389,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => { { role: "assistant", content: [ - { type: "toolUse", id: "tool-1", name: "test", input: {} }, + { type: "toolUse", id: "tool-1", name: "test", arguments: {} }, { type: "text", text: "Here's result" }, ], }, @@ -408,7 +408,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => { // tool_use should be preserved because matching tool_result exists const assistantContent = (result[1] as { content?: unknown[] }).content; expect(assistantContent).toEqual([ - { type: "toolUse", id: "tool-1", name: "test", input: {} }, + { type: "toolUse", id: "tool-1", name: "test", arguments: {} }, { type: "text", text: "Here's result" }, ]); }); @@ -418,7 +418,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => { { role: "user", content: [{ type: "text", text: "Use tool" }] }, { role: "assistant", - content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }], + content: [{ type: "toolUse", id: "tool-1", name: "test", arguments: {} }], }, { role: "user", content: [{ type: "text", text: "Hello" }] }, ]); @@ -431,6 +431,23 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => { expect(assistantContent).toEqual([{ type: "text", text: "[tool calls omitted]" }]); }); + it("leaves aborted tool-only assistant turns empty instead of synthesizing fallback text", () => { + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tool" }] }, + { + role: "assistant", + stopReason: "aborted", + content: [{ type: "toolCall", id: "tool-1", name: "test", arguments: {} }], + }, + { role: "user", content: [{ type: "text", text: "Hello" }] }, + ]); + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(3); + expect((result[1] as { content?: unknown[] }).content).toEqual([]); + }); + it("should handle multiple dangling tool_use blocks", () => { const msgs = makeDualToolAnthropicTurns([{ type: "text", text: "OK" }]); @@ -458,28 +475,54 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => { // tool-1 should be preserved (has matching tool_result), tool-2 stripped, text preserved const assistantContent = (result[1] as { content?: unknown[] }).content; expect(assistantContent).toEqual([ - { type: "toolUse", id: "tool-1", name: "test1", input: {} }, + { type: "toolUse", id: "tool-1", name: "test1", arguments: {} }, { type: "text", text: "Done" }, ]); }); - it("should not modify messages when next is not user", () => { + it("matches standalone toolResult messages before the next assistant turn", () => { const msgs = asMessages([ { role: "user", content: [{ type: "text", text: "Use tool" }] }, { role: "assistant", - content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }], + content: [{ type: "toolCall", id: "tool-1", name: "test", arguments: {} }], }, - // Next is assistant, not user - should not strip - { role: "assistant", content: [{ type: "text", text: "Continue" }] }, + { role: "toolResult", toolCallId: "tool-1", content: [{ type: "text", text: "data" }] }, + { role: "user", content: [{ type: "text", text: "Continue" }] }, ]); const result = validateAnthropicTurns(msgs); - expect(result).toHaveLength(3); - // Original tool_use should be preserved + expect(result).toHaveLength(4); const assistantContent = (result[1] as { content?: unknown[] }).content; - expect(assistantContent).toEqual([{ type: "toolUse", id: "tool-1", name: "test", input: {} }]); + expect(assistantContent).toEqual([ + { type: "toolCall", id: "tool-1", name: "test", arguments: {} }, + ]); + }); + + it("matches tool result blocks across intermediate non-assistant messages", () => { + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tool" }] }, + { + role: "assistant", + content: [ + { type: "functionCall", id: "tool-1", name: "test", arguments: {} }, + { type: "text", text: "Checking" }, + ], + }, + { role: "user", content: [{ type: "text", text: "still waiting" }] }, + { role: "tool", toolCallId: "tool-1", content: [{ type: "text", text: "data" }] }, + { role: "user", content: [{ type: "text", text: "Continue" }] }, + ]); + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(5); + const assistantContent = (result[1] as { content?: unknown[] }).content; + expect(assistantContent).toEqual([ + { type: "functionCall", id: "tool-1", name: "test", arguments: {} }, + { type: "text", text: "Checking" }, + ]); }); it("is replay-safe across repeated validation passes", () => { diff --git a/src/agents/pi-embedded-helpers/turns.ts b/src/agents/pi-embedded-helpers/turns.ts index df90ee30dfb..20fbbd387d8 100644 --- a/src/agents/pi-embedded-helpers/turns.ts +++ b/src/agents/pi-embedded-helpers/turns.ts @@ -1,16 +1,101 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { extractToolCallsFromAssistant, extractToolResultId } from "../tool-call-id.js"; type AnthropicContentBlock = { - type: "text" | "toolUse" | "toolResult"; + type: "text" | "toolUse" | "toolCall" | "functionCall" | "toolResult" | "tool"; text?: string; id?: string; name?: string; toolUseId?: string; + toolCallId?: string; }; +function trimNonEmptyString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed || undefined; +} + +function isToolCallBlock(block: AnthropicContentBlock): boolean { + return block.type === "toolUse" || block.type === "toolCall" || block.type === "functionCall"; +} + +function isAbortedAssistantTurn(message: AgentMessage): boolean { + const stopReason = (message as { stopReason?: unknown }).stopReason; + return stopReason === "aborted" || stopReason === "error"; +} + +function extractToolResultIdsFromRecord(record: Record): string[] { + const ids = [ + trimNonEmptyString(record.toolUseId), + trimNonEmptyString(record.toolCallId), + trimNonEmptyString(record.tool_use_id), + trimNonEmptyString(record.tool_call_id), + trimNonEmptyString(record.callId), + trimNonEmptyString(record.call_id), + ].filter((value): value is string => typeof value === "string"); + return [...new Set(ids)]; +} + +function collectMatchingToolResultIds(message: AgentMessage): Set { + const ids = new Set(); + const role = (message as { role?: unknown }).role; + if (role === "toolResult") { + const toolResultId = extractToolResultId( + message as Extract, + ); + if (toolResultId) { + ids.add(toolResultId); + } + } else if (role === "tool") { + for (const id of extractToolResultIdsFromRecord(message as Record)) { + ids.add(id); + } + } + + const content = (message as { content?: unknown }).content; + if (!Array.isArray(content)) { + return ids; + } + + for (const block of content) { + if (!block || typeof block !== "object") { + continue; + } + const record = block as Record; + if (record.type !== "toolResult" && record.type !== "tool") { + continue; + } + for (const id of extractToolResultIdsFromRecord(record)) { + ids.add(id); + } + } + + return ids; +} + +function collectFutureToolResultIds(messages: AgentMessage[], startIndex: number): Set { + const ids = new Set(); + for (let index = startIndex + 1; index < messages.length; index += 1) { + const candidate = messages[index]; + if (!candidate || typeof candidate !== "object") { + continue; + } + if ((candidate as { role?: unknown }).role === "assistant") { + break; + } + for (const id of collectMatchingToolResultIds(candidate)) { + ids.add(id); + } + } + return ids; +} + /** - * Strips dangling tool_use blocks from assistant messages when the immediately - * following user message does not contain a matching tool_result block. + * Strips dangling tool-call blocks from assistant messages when no later + * tool-result span before the next assistant turn resolves them. * This fixes the "tool_use ids found without tool_result blocks" error from Anthropic. */ function stripDanglingAnthropicToolUses(messages: AgentMessage[]): AgentMessage[] { @@ -32,51 +117,42 @@ function stripDanglingAnthropicToolUses(messages: AgentMessage[]): AgentMessage[ const assistantMsg = msg as { content?: AnthropicContentBlock[]; }; - - // Get the next message to check for tool_result blocks - const nextMsg = messages[i + 1]; - const nextMsgRole = - nextMsg && typeof nextMsg === "object" - ? ((nextMsg as { role?: unknown }).role as string | undefined) - : undefined; - - // If next message is not user, keep the assistant message as-is - if (nextMsgRole !== "user") { + const originalContent = Array.isArray(assistantMsg.content) ? assistantMsg.content : []; + if (originalContent.length === 0) { result.push(msg); continue; } - - // Collect tool_use_ids from the next user message's tool_result blocks - const nextUserMsg = nextMsg as { - content?: AnthropicContentBlock[]; - }; - const validToolUseIds = new Set(); - if (Array.isArray(nextUserMsg.content)) { - for (const block of nextUserMsg.content) { - if (block && block.type === "toolResult" && block.toolUseId) { - validToolUseIds.add(block.toolUseId); - } - } + if ( + extractToolCallsFromAssistant(msg as Extract).length === + 0 + ) { + result.push(msg); + continue; } + const validToolUseIds = collectFutureToolResultIds(messages, i); - // Filter out tool_use blocks that don't have matching tool_result - const originalContent = Array.isArray(assistantMsg.content) ? assistantMsg.content : []; const filteredContent = originalContent.filter((block) => { if (!block) { return false; } - if (block.type !== "toolUse") { + if (!isToolCallBlock(block)) { return true; } - // Keep tool_use if its id is in the valid set - return validToolUseIds.has(block.id || ""); + const blockId = trimNonEmptyString(block.id); + return blockId ? validToolUseIds.has(blockId) : false; }); - // If all content would be removed, insert a minimal fallback text block + if (filteredContent.length === originalContent.length) { + result.push(msg); + continue; + } + if (originalContent.length > 0 && filteredContent.length === 0) { result.push({ ...assistantMsg, - content: [{ type: "text", text: "[tool calls omitted]" }], + content: isAbortedAssistantTurn(msg) + ? [] + : ([{ type: "text", text: "[tool calls omitted]" }] as AnthropicContentBlock[]), } as AgentMessage); } else { result.push({ @@ -190,7 +266,7 @@ export function mergeConsecutiveUserTurns( * Also strips dangling tool_use blocks that lack corresponding tool_result blocks. */ export function validateAnthropicTurns(messages: AgentMessage[]): AgentMessage[] { - // First, strip dangling tool_use blocks from assistant messages + // First, strip dangling tool-call blocks from assistant messages. const stripped = stripDanglingAnthropicToolUses(messages); return validateTurnsWithConsecutiveMerge({ diff --git a/src/agents/tools/chat-history-text.ts b/src/agents/tools/chat-history-text.ts index fc903c6edf1..9cf661df0bc 100644 --- a/src/agents/tools/chat-history-text.ts +++ b/src/agents/tools/chat-history-text.ts @@ -31,6 +31,25 @@ export function sanitizeTextContent(text: string): string { ); } +export function hasAssistantPhaseMetadata(message: unknown): boolean { + if (!message || typeof message !== "object") { + return false; + } + if ((message as { role?: unknown }).role !== "assistant") { + return false; + } + const content = (message as { content?: unknown }).content; + if (!Array.isArray(content)) { + return false; + } + return content.some( + (block) => + block && + typeof block === "object" && + typeof (block as { textSignature?: unknown }).textSignature === "string", + ); +} + export function extractAssistantText(message: unknown): string | undefined { if (!message || typeof message !== "object") { return undefined; @@ -38,29 +57,26 @@ export function extractAssistantText(message: unknown): string | undefined { if ((message as { role?: unknown }).role !== "assistant") { return undefined; } - const content = (message as { content?: unknown }).content; - if (!Array.isArray(content)) { - return undefined; + if (hasAssistantPhaseMetadata(message)) { + const visibleText = extractAssistantVisibleText( + message as Parameters[0], + ); + return visibleText?.trim() ? visibleText : undefined; } - const hasPhaseMetadata = content.some( - (block) => - block && typeof block === "object" && typeof (block as { textSignature?: unknown }).textSignature === "string", - ); - const joined = hasPhaseMetadata - ? (extractAssistantVisibleText(message as Parameters[0]) ?? "") - : ( - extractTextFromChatContent(content, { - sanitizeText: sanitizeTextContent, - joinWith: "", - normalizeText: (text) => text.trim(), - }) ?? "" + const content = (message as { content?: unknown }).content; + const joined = Array.isArray(content) + ? (extractTextFromChatContent(content, { + sanitizeText: sanitizeTextContent, + joinWith: "", + normalizeText: (text) => text.trim(), + }) ?? "") + : sanitizeTextContent( + extractAssistantVisibleText(message as Parameters[0]) ?? + "", ); if (!joined.trim()) { return undefined; } - if (hasPhaseMetadata) { - return joined; - } const stopReason = (message as { stopReason?: unknown }).stopReason; // Gate on stopReason only — a non-error response with a stale/background errorMessage // should not have its content rewritten with error templates (#13935). diff --git a/src/agents/tools/session-message-text.ts b/src/agents/tools/session-message-text.ts index f92211a49e7..f4fcc6a3a7a 100644 --- a/src/agents/tools/session-message-text.ts +++ b/src/agents/tools/session-message-text.ts @@ -1,50 +1,5 @@ -import { extractTextFromChatContent } from "../../shared/chat-content.js"; -import { sanitizeUserFacingText } from "../pi-embedded-helpers.js"; -import { - stripDowngradedToolCallText, - stripMinimaxToolCallXml, - stripModelSpecialTokens, - stripThinkingTagsFromText, -} from "../pi-embedded-utils.js"; - -export function stripToolMessages(messages: unknown[]): unknown[] { - return messages.filter((msg) => { - if (!msg || typeof msg !== "object") { - return true; - } - const role = (msg as { role?: unknown }).role; - return role !== "toolResult" && role !== "tool"; - }); -} - -export function sanitizeTextContent(text: string): string { - if (!text) { - return text; - } - return stripThinkingTagsFromText( - stripDowngradedToolCallText(stripModelSpecialTokens(stripMinimaxToolCallXml(text))), - ); -} - -export function extractAssistantText(message: unknown): string | undefined { - if (!message || typeof message !== "object") { - return undefined; - } - if ((message as { role?: unknown }).role !== "assistant") { - return undefined; - } - const content = (message as { content?: unknown }).content; - if (!Array.isArray(content)) { - return undefined; - } - const joined = - extractTextFromChatContent(content, { - sanitizeText: sanitizeTextContent, - joinWith: "", - normalizeText: (text) => text.trim(), - }) ?? ""; - const stopReason = (message as { stopReason?: unknown }).stopReason; - const errorContext = stopReason === "error"; - - return joined ? sanitizeUserFacingText(joined, { errorContext }) : undefined; -} +export { + extractAssistantText, + sanitizeTextContent, + stripToolMessages, +} from "./chat-history-text.js"; diff --git a/src/agents/tools/sessions.test.ts b/src/agents/tools/sessions.test.ts index d4556f07d59..4ab10a1813c 100644 --- a/src/agents/tools/sessions.test.ts +++ b/src/agents/tools/sessions.test.ts @@ -218,6 +218,25 @@ describe("extractAssistantText", () => { }; expect(extractAssistantText(message)).toBe("Handle payment required errors in your API."); }); + + it("prefers final_answer text when phased assistant history is present", () => { + const message = { + role: "assistant", + content: [ + { + type: "text", + text: "internal reasoning", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + }; + expect(extractAssistantText(message)).toBe("Done."); + }); }); describe("resolveAnnounceTarget", () => { diff --git a/src/config/sessions/transcript.runtime.ts b/src/config/sessions/transcript.runtime.ts index b886816b75c..2bcd5a6073a 100644 --- a/src/config/sessions/transcript.runtime.ts +++ b/src/config/sessions/transcript.runtime.ts @@ -1 +1,4 @@ -export { appendAssistantMessageToSessionTranscript } from "./transcript.js"; +export { + appendAssistantMessageToSessionTranscript, + appendExactAssistantMessageToSessionTranscript, +} from "./transcript.js"; diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts index bba7f57ae0c..5ae342231e6 100644 --- a/src/config/sessions/transcript.test.ts +++ b/src/config/sessions/transcript.test.ts @@ -3,7 +3,10 @@ import { describe, expect, it, vi } from "vitest"; import * as transcriptEvents from "../../sessions/transcript-events.js"; import { resolveSessionTranscriptPathInDir } from "./paths.js"; import { useTempSessionsFixture } from "./test-helpers.js"; -import { appendAssistantMessageToSessionTranscript } from "./transcript.js"; +import { + appendAssistantMessageToSessionTranscript, + appendExactAssistantMessageToSessionTranscript, +} from "./transcript.js"; describe("appendAssistantMessageToSessionTranscript", () => { const fixture = useTempSessionsFixture("transcript-test-"); @@ -193,4 +196,79 @@ describe("appendAssistantMessageToSessionTranscript", () => { const lines = fs.readFileSync(sessionFile, "utf-8").trim().split("\n"); expect(lines.length).toBe(3); }); + + it("appends exact assistant transcript messages without rewriting phased content", async () => { + writeTranscriptStore(); + + const result = await appendExactAssistantMessageToSessionTranscript({ + sessionKey, + storePath: fixture.storePath(), + message: { + role: "assistant", + content: [ + { + type: "text", + text: "internal reasoning", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + api: "openai-responses", + provider: "openclaw", + model: "delivery-mirror", + usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 }, + stopReason: "stop", + timestamp: Date.now(), + }, + }); + + expect(result.ok).toBe(true); + if (result.ok) { + const lines = fs.readFileSync(result.sessionFile, "utf-8").trim().split("\n"); + const messageLine = JSON.parse(lines[1]); + expect(messageLine.message.content).toEqual([ + { + type: "text", + text: "internal reasoning", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ]); + } + }); + + it("can emit file-only transcript refresh events for exact assistant appends", async () => { + writeTranscriptStore(); + const emitSpy = vi.spyOn(transcriptEvents, "emitSessionTranscriptUpdate"); + + const result = await appendExactAssistantMessageToSessionTranscript({ + sessionKey, + storePath: fixture.storePath(), + updateMode: "file-only", + message: { + role: "assistant", + content: [{ type: "text", text: "Done." }], + api: "openai-responses", + provider: "openclaw", + model: "delivery-mirror", + usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 }, + stopReason: "stop", + timestamp: Date.now(), + }, + }); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(emitSpy).toHaveBeenCalledWith(result.sessionFile); + } + emitSpy.mockRestore(); + }); }); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index c478c850bc8..454feded0fa 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -35,6 +35,16 @@ async function ensureSessionHeader(params: { }); } +export type SessionTranscriptAppendResult = + | { ok: true; sessionFile: string; messageId: string } + | { ok: false; reason: string }; + +export type SessionTranscriptUpdateMode = "inline" | "file-only" | "none"; + +export type SessionTranscriptAssistantMessage = Parameters[0] & { + role: "assistant"; +}; + export async function resolveSessionTranscriptFile(params: { sessionId: string; sessionKey: string; @@ -88,7 +98,8 @@ export async function appendAssistantMessageToSessionTranscript(params: { idempotencyKey?: string; /** Optional override for store path (mostly for tests). */ storePath?: string; -}): Promise<{ ok: true; sessionFile: string; messageId: string } | { ok: false; reason: string }> { + updateMode?: SessionTranscriptUpdateMode; +}): Promise { const sessionKey = params.sessionKey.trim(); if (!sessionKey) { return { ok: false, reason: "missing sessionKey" }; @@ -102,6 +113,54 @@ export async function appendAssistantMessageToSessionTranscript(params: { return { ok: false, reason: "empty text" }; } + return appendExactAssistantMessageToSessionTranscript({ + agentId: params.agentId, + sessionKey, + storePath: params.storePath, + idempotencyKey: params.idempotencyKey, + updateMode: params.updateMode, + message: { + role: "assistant" as const, + content: [{ type: "text", text: mirrorText }], + api: "openai-responses", + provider: "openclaw", + model: "delivery-mirror", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, + }, + stopReason: "stop" as const, + timestamp: Date.now(), + }, + }); +} + +export async function appendExactAssistantMessageToSessionTranscript(params: { + agentId?: string; + sessionKey: string; + message: SessionTranscriptAssistantMessage; + idempotencyKey?: string; + storePath?: string; + updateMode?: SessionTranscriptUpdateMode; +}): Promise { + const sessionKey = params.sessionKey.trim(); + if (!sessionKey) { + return { ok: false, reason: "missing sessionKey" }; + } + if (params.message.role !== "assistant") { + return { ok: false, reason: "message role must be assistant" }; + } + const storePath = params.storePath ?? resolveDefaultSessionStorePath(params.agentId); const store = loadSessionStore(storePath, { skipCache: true }); const normalizedKey = normalizeStoreSessionKey(sessionKey); @@ -131,41 +190,33 @@ export async function appendAssistantMessageToSessionTranscript(params: { await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId }); - const existingMessageId = params.idempotencyKey - ? await transcriptHasIdempotencyKey(sessionFile, params.idempotencyKey) + const explicitIdempotencyKey = + params.idempotencyKey ?? + ((params.message as { idempotencyKey?: unknown }).idempotencyKey as string | undefined); + const existingMessageId = explicitIdempotencyKey + ? await transcriptHasIdempotencyKey(sessionFile, explicitIdempotencyKey) : undefined; if (existingMessageId) { return { ok: true, sessionFile, messageId: existingMessageId }; } const message = { - role: "assistant" as const, - content: [{ type: "text", text: mirrorText }], - api: "openai-responses", - provider: "openclaw", - model: "delivery-mirror", - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - total: 0, - }, - }, - stopReason: "stop" as const, - timestamp: Date.now(), - ...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}), + ...params.message, + ...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}), } as Parameters[0]; const sessionManager = SessionManager.open(sessionFile); const messageId = sessionManager.appendMessage(message); - emitSessionTranscriptUpdate({ sessionFile, sessionKey, message, messageId }); + switch (params.updateMode ?? "inline") { + case "inline": + emitSessionTranscriptUpdate({ sessionFile, sessionKey, message, messageId }); + break; + case "file-only": + emitSessionTranscriptUpdate(sessionFile); + break; + case "none": + break; + } return { ok: true, sessionFile, messageId }; } diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index b8ff09e446b..c51ab09cc17 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -5,6 +5,10 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { resolveThinkingDefault } from "../../agents/model-selection.js"; import { rewriteTranscriptEntriesInSessionFile } from "../../agents/pi-embedded-runner/transcript-rewrite.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; +import { + extractAssistantText as extractAssistantHistoryText, + hasAssistantPhaseMetadata, +} from "../../agents/tools/chat-history-text.js"; import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; import type { MsgContext } from "../../auto-reply/templating.js"; @@ -19,7 +23,6 @@ import { normalizeInputProvenance, type InputProvenance } from "../../sessions/i import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; -import { extractAssistantVisibleText } from "../../shared/chat-message-content.js"; import { stripInlineDirectiveTagsForDisplay, stripInlineDirectiveTagsFromMessageForDisplay, @@ -660,18 +663,9 @@ function sanitizeChatHistoryMessage( } else if (Array.isArray(entry.content)) { const updated = entry.content.map((block) => sanitizeChatHistoryContentBlock(block, maxChars)); const sanitizedBlocks = updated.map((item) => item.block); - const hasPhaseMetadata = - entry.role === "assistant" && - entry.content.some( - (block) => - block && - typeof block === "object" && - typeof (block as { textSignature?: unknown }).textSignature === "string", - ); + const hasPhaseMetadata = hasAssistantPhaseMetadata(entry); if (hasPhaseMetadata) { - const stripped = stripInlineDirectiveTagsForDisplay( - extractAssistantVisibleText(entry as Parameters[0]), - ); + const stripped = stripInlineDirectiveTagsForDisplay(extractAssistantHistoryText(entry)); const res = truncateChatHistoryText(stripped.text, maxChars); const nonTextBlocks = sanitizedBlocks.filter( (block) => @@ -704,7 +698,7 @@ function sanitizeChatHistoryMessage( * dropping messages that carry real text alongside a stale `content: "NO_REPLY"`. */ function extractAssistantTextForSilentCheck(message: unknown): string | undefined { - return extractAssistantVisibleText(message); + return extractAssistantHistoryText(message); } export function sanitizeChatHistoryMessages(messages: unknown[], maxChars: number): unknown[] { diff --git a/src/gateway/session-history-state.ts b/src/gateway/session-history-state.ts new file mode 100644 index 00000000000..7c139927728 --- /dev/null +++ b/src/gateway/session-history-state.ts @@ -0,0 +1,166 @@ +import { + DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, + sanitizeChatHistoryMessages, +} from "./server-methods/chat.js"; +import { attachOpenClawTranscriptMeta, readSessionMessages } from "./session-utils.js"; + +export type PaginatedSessionHistory = { + items: unknown[]; + messages: unknown[]; + nextCursor?: string; + hasMore: boolean; +}; + +type SessionHistoryTranscriptTarget = { + sessionId: string; + storePath?: string; + sessionFile?: string; +}; + +function resolveCursorSeq(cursor: string | undefined): number | undefined { + if (!cursor) { + return undefined; + } + const normalized = cursor.startsWith("seq:") ? cursor.slice(4) : cursor; + const value = Number.parseInt(normalized, 10); + return Number.isFinite(value) && value > 0 ? value : undefined; +} + +export function resolveMessageSeq(message: unknown): number | undefined { + if (!message || typeof message !== "object" || Array.isArray(message)) { + return undefined; + } + const meta = (message as { __openclaw?: unknown }).__openclaw; + if (!meta || typeof meta !== "object" || Array.isArray(meta)) { + return undefined; + } + const seq = (meta as { seq?: unknown }).seq; + return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined; +} + +export function paginateSessionMessages( + messages: unknown[], + limit: number | undefined, + cursor: string | undefined, +): PaginatedSessionHistory { + const cursorSeq = resolveCursorSeq(cursor); + let endExclusive = messages.length; + if (typeof cursorSeq === "number") { + endExclusive = messages.findIndex((message, index) => { + const seq = resolveMessageSeq(message); + if (typeof seq === "number") { + return seq >= cursorSeq; + } + return index + 1 >= cursorSeq; + }); + if (endExclusive < 0) { + endExclusive = messages.length; + } + } + const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0; + const items = messages.slice(start, endExclusive); + const firstSeq = resolveMessageSeq(items[0]); + return { + items, + messages: items, + hasMore: start > 0, + ...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}), + }; +} + +function sanitizeRawTranscriptMessages(params: { + rawMessages: unknown[]; + maxChars?: number; + limit?: number; + cursor?: string; +}): PaginatedSessionHistory { + return paginateSessionMessages( + sanitizeChatHistoryMessages( + params.rawMessages, + params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, + ), + params.limit, + params.cursor, + ); +} + +export class SessionHistorySseState { + private readonly target: SessionHistoryTranscriptTarget; + private readonly maxChars: number; + private readonly limit: number | undefined; + private readonly cursor: string | undefined; + private sentHistory: PaginatedSessionHistory; + private rawTranscriptSeq: number; + + constructor(params: { + target: SessionHistoryTranscriptTarget; + maxChars?: number; + limit?: number; + cursor?: string; + }) { + this.target = params.target; + this.maxChars = params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS; + this.limit = params.limit; + this.cursor = params.cursor; + const rawMessages = this.readRawMessages(); + this.sentHistory = sanitizeRawTranscriptMessages({ + rawMessages, + maxChars: this.maxChars, + limit: this.limit, + cursor: this.cursor, + }); + this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length; + } + + snapshot(): PaginatedSessionHistory { + return this.sentHistory; + } + + appendInlineMessage(update: { + message: unknown; + messageId?: string; + }): { message: unknown; messageSeq?: number } | null { + if (this.limit !== undefined || this.cursor !== undefined) { + return null; + } + this.rawTranscriptSeq += 1; + const nextMessage = attachOpenClawTranscriptMeta(update.message, { + ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), + seq: this.rawTranscriptSeq, + }); + const sanitized = sanitizeChatHistoryMessages([nextMessage], this.maxChars); + if (sanitized.length === 0) { + return null; + } + const sanitizedMessage = sanitized[0]; + this.sentHistory = { + items: [...this.sentHistory.items, sanitizedMessage], + messages: [...this.sentHistory.items, sanitizedMessage], + hasMore: false, + }; + return { + message: sanitizedMessage, + messageSeq: resolveMessageSeq(sanitizedMessage), + }; + } + + refresh(): PaginatedSessionHistory { + const rawMessages = this.readRawMessages(); + this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length; + this.sentHistory = sanitizeRawTranscriptMessages({ + rawMessages, + maxChars: this.maxChars, + limit: this.limit, + cursor: this.cursor, + }); + return this.sentHistory; + } + + private readRawMessages(): unknown[] { + return readSessionMessages( + this.target.sessionId, + this.target.storePath, + this.target.sessionFile, + ); + } +} diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts index 75653115ed3..11781cfcd39 100644 --- a/src/gateway/sessions-history-http.test.ts +++ b/src/gateway/sessions-history-http.test.ts @@ -1,10 +1,11 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; import { afterEach, describe, expect, test } from "vitest"; -import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js"; -import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import { + appendAssistantMessageToSessionTranscript, + appendExactAssistantMessageToSessionTranscript, +} from "../config/sessions/transcript.js"; import { testState } from "./test-helpers.runtime-state.js"; import { connectReq, @@ -85,25 +86,23 @@ function makeTranscriptAssistantMessage(params: { }; } -function appendTranscriptMessage(params: { - sessionFile: string; +async function appendTranscriptMessage(params: { sessionKey: string; message: ReturnType; emitInlineMessage?: boolean; -}): string { - const sessionManager = SessionManager.open(params.sessionFile); - const messageId = sessionManager.appendMessage(params.message); - emitSessionTranscriptUpdate( - params.emitInlineMessage === false - ? params.sessionFile - : { - sessionFile: params.sessionFile, - sessionKey: params.sessionKey, - message: params.message, - messageId, - }, - ); - return messageId; + storePath?: string; +}): Promise { + const appended = await appendExactAssistantMessageToSessionTranscript({ + sessionKey: params.sessionKey, + storePath: params.storePath ?? testState.sessionStorePath, + updateMode: params.emitInlineMessage === false ? "file-only" : "inline", + message: params.message, + }); + expect(appended.ok).toBe(true); + if (!appended.ok) { + throw new Error(`append failed: ${appended.reason}`); + } + return appended.messageId; } async function fetchSessionHistory( @@ -404,9 +403,9 @@ describe("session history HTTP endpoints", () => { if (!hidden.ok) { throw new Error(`append failed: ${hidden.reason}`); } - const visibleMessageId = appendTranscriptMessage({ - sessionFile: hidden.sessionFile, + const visibleMessageId = await appendTranscriptMessage({ sessionKey: "agent:main:main", + storePath, message: makeTranscriptAssistantMessage({ text: "Done.", content: [ @@ -582,9 +581,9 @@ describe("session history HTTP endpoints", () => { if (!second.ok) { throw new Error(`append failed: ${second.reason}`); } - appendTranscriptMessage({ - sessionFile: second.sessionFile, + await appendTranscriptMessage({ sessionKey: "agent:main:main", + storePath, message: makeTranscriptAssistantMessage({ text: "NO_REPLY" }), emitInlineMessage: false, }); diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index b9231e41e94..bf2eaffa7bc 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -22,8 +22,8 @@ import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, sanitizeChatHistoryMessages, } from "./server-methods/chat.js"; +import { paginateSessionMessages, SessionHistorySseState } from "./session-history-state.js"; import { - attachOpenClawTranscriptMeta, readSessionMessages, resolveFreshestSessionEntryFromStoreKeys, resolveGatewaySessionStoreTarget, @@ -72,64 +72,6 @@ function resolveCursor(req: IncomingMessage): string | undefined { return trimmed ? trimmed : undefined; } -type PaginatedSessionHistory = { - items: unknown[]; - messages: unknown[]; - nextCursor?: string; - hasMore: boolean; -}; - -function resolveCursorSeq(cursor: string | undefined): number | undefined { - if (!cursor) { - return undefined; - } - const normalized = cursor.startsWith("seq:") ? cursor.slice(4) : cursor; - const value = Number.parseInt(normalized, 10); - return Number.isFinite(value) && value > 0 ? value : undefined; -} - -function resolveMessageSeq(message: unknown): number | undefined { - if (!message || typeof message !== "object" || Array.isArray(message)) { - return undefined; - } - const meta = (message as { __openclaw?: unknown }).__openclaw; - if (!meta || typeof meta !== "object" || Array.isArray(meta)) { - return undefined; - } - const seq = (meta as { seq?: unknown }).seq; - return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined; -} - -function paginateSessionMessages( - messages: unknown[], - limit: number | undefined, - cursor: string | undefined, -): PaginatedSessionHistory { - const cursorSeq = resolveCursorSeq(cursor); - let endExclusive = messages.length; - if (typeof cursorSeq === "number") { - endExclusive = messages.findIndex((message, index) => { - const seq = resolveMessageSeq(message); - if (typeof seq === "number") { - return seq >= cursorSeq; - } - return index + 1 >= cursorSeq; - }); - if (endExclusive < 0) { - endExclusive = messages.length; - } - } - const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0; - const items = messages.slice(start, endExclusive); - const firstSeq = resolveMessageSeq(items[0]); - return { - items, - messages: items, - hasMore: start > 0, - ...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}), - }; -} - function canonicalizePath(value: string | undefined): string | undefined { const trimmed = value?.trim(); if (!trimmed) { @@ -248,13 +190,17 @@ export async function handleSessionHistoryHttpRequest( : new Set(); let sentHistory = history; - // Initialize rawTranscriptSeq from the raw transcript's last __openclaw.seq - // value, not the sanitized history tail, so seq numbering stays correct even - // when sanitization drops messages (e.g. silent replies). - const rawMessages = entry?.sessionId - ? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile) - : []; - let rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length; + const sseState = new SessionHistorySseState({ + target: { + sessionId: entry.sessionId, + storePath: target.storePath, + sessionFile: entry.sessionFile, + }, + maxChars: effectiveMaxChars, + limit, + cursor, + }); + sentHistory = sseState.snapshot(); setSseHeaders(res); res.write("retry: 1000\n\n"); sseWrite(res, "history", { @@ -277,48 +223,25 @@ export async function handleSessionHistoryHttpRequest( return; } if (update.message !== undefined) { - rawTranscriptSeq += 1; - const nextMessage = attachOpenClawTranscriptMeta(update.message, { - ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), - seq: rawTranscriptSeq, - }); if (limit === undefined && cursor === undefined) { - const sanitized = sanitizeChatHistoryMessages([nextMessage], effectiveMaxChars); - if (sanitized.length === 0) { + const nextEvent = sseState.appendInlineMessage({ + message: update.message, + messageId: update.messageId, + }); + if (!nextEvent) { return; } - const sanitizedMsg = sanitized[0]; - sentHistory = { - items: [...sentHistory.items, sanitizedMsg], - messages: [...sentHistory.items, sanitizedMsg], - hasMore: false, - }; + sentHistory = sseState.snapshot(); sseWrite(res, "message", { sessionKey: target.canonicalKey, - message: sanitizedMsg, + message: nextEvent.message, ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), - messageSeq: resolveMessageSeq(sanitizedMsg), + messageSeq: nextEvent.messageSeq, }); return; } } - // Transcript-only updates can advance the raw transcript without carrying - // an inline message payload. Resync the raw seq counter before the next - // fast-path append so later messageSeq values stay monotonic. - const refreshedRawMessages = readSessionMessages( - entry.sessionId, - target.storePath, - entry.sessionFile, - ); - rawTranscriptSeq = - resolveMessageSeq(refreshedRawMessages.at(-1)) ?? refreshedRawMessages.length; - // Bounded SSE history refreshes: apply sanitizeChatHistoryMessages before - // pagination, consistent with the unbounded path. - sentHistory = paginateSessionMessages( - sanitizeChatHistoryMessages(refreshedRawMessages, effectiveMaxChars), - limit, - cursor, - ); + sentHistory = sseState.refresh(); sseWrite(res, "history", { sessionKey: target.canonicalKey, ...sentHistory,