mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
refactor: consolidate session history sanitization
This commit is contained in:
@@ -12,8 +12,8 @@ function asMessages(messages: unknown[]): AgentMessage[] {
|
||||
|
||||
function makeDualToolUseAssistantContent() {
|
||||
return [
|
||||
{ type: "toolUse", id: "tool-1", name: "test1", input: {} },
|
||||
{ type: "toolUse", id: "tool-2", name: "test2", input: {} },
|
||||
{ type: "toolUse", id: "tool-1", name: "test1", arguments: {} },
|
||||
{ type: "toolUse", id: "tool-2", name: "test2", arguments: {} },
|
||||
{ type: "text", text: "Done" },
|
||||
];
|
||||
}
|
||||
@@ -368,7 +368,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "toolUse", id: "tool-1", name: "test", input: {} },
|
||||
{ type: "toolUse", id: "tool-1", name: "test", arguments: {} },
|
||||
{ type: "text", text: "I'll check that" },
|
||||
],
|
||||
},
|
||||
@@ -389,7 +389,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "toolUse", id: "tool-1", name: "test", input: {} },
|
||||
{ type: "toolUse", id: "tool-1", name: "test", arguments: {} },
|
||||
{ type: "text", text: "Here's result" },
|
||||
],
|
||||
},
|
||||
@@ -408,7 +408,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
|
||||
// tool_use should be preserved because matching tool_result exists
|
||||
const assistantContent = (result[1] as { content?: unknown[] }).content;
|
||||
expect(assistantContent).toEqual([
|
||||
{ type: "toolUse", id: "tool-1", name: "test", input: {} },
|
||||
{ type: "toolUse", id: "tool-1", name: "test", arguments: {} },
|
||||
{ type: "text", text: "Here's result" },
|
||||
]);
|
||||
});
|
||||
@@ -418,7 +418,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
|
||||
{ role: "user", content: [{ type: "text", text: "Use tool" }] },
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }],
|
||||
content: [{ type: "toolUse", id: "tool-1", name: "test", arguments: {} }],
|
||||
},
|
||||
{ role: "user", content: [{ type: "text", text: "Hello" }] },
|
||||
]);
|
||||
@@ -431,6 +431,23 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
|
||||
expect(assistantContent).toEqual([{ type: "text", text: "[tool calls omitted]" }]);
|
||||
});
|
||||
|
||||
it("leaves aborted tool-only assistant turns empty instead of synthesizing fallback text", () => {
|
||||
const msgs = asMessages([
|
||||
{ role: "user", content: [{ type: "text", text: "Use tool" }] },
|
||||
{
|
||||
role: "assistant",
|
||||
stopReason: "aborted",
|
||||
content: [{ type: "toolCall", id: "tool-1", name: "test", arguments: {} }],
|
||||
},
|
||||
{ role: "user", content: [{ type: "text", text: "Hello" }] },
|
||||
]);
|
||||
|
||||
const result = validateAnthropicTurns(msgs);
|
||||
|
||||
expect(result).toHaveLength(3);
|
||||
expect((result[1] as { content?: unknown[] }).content).toEqual([]);
|
||||
});
|
||||
|
||||
it("should handle multiple dangling tool_use blocks", () => {
|
||||
const msgs = makeDualToolAnthropicTurns([{ type: "text", text: "OK" }]);
|
||||
|
||||
@@ -458,28 +475,54 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
|
||||
// tool-1 should be preserved (has matching tool_result), tool-2 stripped, text preserved
|
||||
const assistantContent = (result[1] as { content?: unknown[] }).content;
|
||||
expect(assistantContent).toEqual([
|
||||
{ type: "toolUse", id: "tool-1", name: "test1", input: {} },
|
||||
{ type: "toolUse", id: "tool-1", name: "test1", arguments: {} },
|
||||
{ type: "text", text: "Done" },
|
||||
]);
|
||||
});
|
||||
|
||||
it("should not modify messages when next is not user", () => {
|
||||
it("matches standalone toolResult messages before the next assistant turn", () => {
|
||||
const msgs = asMessages([
|
||||
{ role: "user", content: [{ type: "text", text: "Use tool" }] },
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }],
|
||||
content: [{ type: "toolCall", id: "tool-1", name: "test", arguments: {} }],
|
||||
},
|
||||
// Next is assistant, not user - should not strip
|
||||
{ role: "assistant", content: [{ type: "text", text: "Continue" }] },
|
||||
{ role: "toolResult", toolCallId: "tool-1", content: [{ type: "text", text: "data" }] },
|
||||
{ role: "user", content: [{ type: "text", text: "Continue" }] },
|
||||
]);
|
||||
|
||||
const result = validateAnthropicTurns(msgs);
|
||||
|
||||
expect(result).toHaveLength(3);
|
||||
// Original tool_use should be preserved
|
||||
expect(result).toHaveLength(4);
|
||||
const assistantContent = (result[1] as { content?: unknown[] }).content;
|
||||
expect(assistantContent).toEqual([{ type: "toolUse", id: "tool-1", name: "test", input: {} }]);
|
||||
expect(assistantContent).toEqual([
|
||||
{ type: "toolCall", id: "tool-1", name: "test", arguments: {} },
|
||||
]);
|
||||
});
|
||||
|
||||
it("matches tool result blocks across intermediate non-assistant messages", () => {
|
||||
const msgs = asMessages([
|
||||
{ role: "user", content: [{ type: "text", text: "Use tool" }] },
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "functionCall", id: "tool-1", name: "test", arguments: {} },
|
||||
{ type: "text", text: "Checking" },
|
||||
],
|
||||
},
|
||||
{ role: "user", content: [{ type: "text", text: "still waiting" }] },
|
||||
{ role: "tool", toolCallId: "tool-1", content: [{ type: "text", text: "data" }] },
|
||||
{ role: "user", content: [{ type: "text", text: "Continue" }] },
|
||||
]);
|
||||
|
||||
const result = validateAnthropicTurns(msgs);
|
||||
|
||||
expect(result).toHaveLength(5);
|
||||
const assistantContent = (result[1] as { content?: unknown[] }).content;
|
||||
expect(assistantContent).toEqual([
|
||||
{ type: "functionCall", id: "tool-1", name: "test", arguments: {} },
|
||||
{ type: "text", text: "Checking" },
|
||||
]);
|
||||
});
|
||||
|
||||
it("is replay-safe across repeated validation passes", () => {
|
||||
|
||||
@@ -1,16 +1,101 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { extractToolCallsFromAssistant, extractToolResultId } from "../tool-call-id.js";
|
||||
|
||||
type AnthropicContentBlock = {
|
||||
type: "text" | "toolUse" | "toolResult";
|
||||
type: "text" | "toolUse" | "toolCall" | "functionCall" | "toolResult" | "tool";
|
||||
text?: string;
|
||||
id?: string;
|
||||
name?: string;
|
||||
toolUseId?: string;
|
||||
toolCallId?: string;
|
||||
};
|
||||
|
||||
function trimNonEmptyString(value: unknown): string | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = value.trim();
|
||||
return trimmed || undefined;
|
||||
}
|
||||
|
||||
function isToolCallBlock(block: AnthropicContentBlock): boolean {
|
||||
return block.type === "toolUse" || block.type === "toolCall" || block.type === "functionCall";
|
||||
}
|
||||
|
||||
function isAbortedAssistantTurn(message: AgentMessage): boolean {
|
||||
const stopReason = (message as { stopReason?: unknown }).stopReason;
|
||||
return stopReason === "aborted" || stopReason === "error";
|
||||
}
|
||||
|
||||
function extractToolResultIdsFromRecord(record: Record<string, unknown>): string[] {
|
||||
const ids = [
|
||||
trimNonEmptyString(record.toolUseId),
|
||||
trimNonEmptyString(record.toolCallId),
|
||||
trimNonEmptyString(record.tool_use_id),
|
||||
trimNonEmptyString(record.tool_call_id),
|
||||
trimNonEmptyString(record.callId),
|
||||
trimNonEmptyString(record.call_id),
|
||||
].filter((value): value is string => typeof value === "string");
|
||||
return [...new Set(ids)];
|
||||
}
|
||||
|
||||
function collectMatchingToolResultIds(message: AgentMessage): Set<string> {
|
||||
const ids = new Set<string>();
|
||||
const role = (message as { role?: unknown }).role;
|
||||
if (role === "toolResult") {
|
||||
const toolResultId = extractToolResultId(
|
||||
message as Extract<AgentMessage, { role: "toolResult" }>,
|
||||
);
|
||||
if (toolResultId) {
|
||||
ids.add(toolResultId);
|
||||
}
|
||||
} else if (role === "tool") {
|
||||
for (const id of extractToolResultIdsFromRecord(message as Record<string, unknown>)) {
|
||||
ids.add(id);
|
||||
}
|
||||
}
|
||||
|
||||
const content = (message as { content?: unknown }).content;
|
||||
if (!Array.isArray(content)) {
|
||||
return ids;
|
||||
}
|
||||
|
||||
for (const block of content) {
|
||||
if (!block || typeof block !== "object") {
|
||||
continue;
|
||||
}
|
||||
const record = block as Record<string, unknown>;
|
||||
if (record.type !== "toolResult" && record.type !== "tool") {
|
||||
continue;
|
||||
}
|
||||
for (const id of extractToolResultIdsFromRecord(record)) {
|
||||
ids.add(id);
|
||||
}
|
||||
}
|
||||
|
||||
return ids;
|
||||
}
|
||||
|
||||
function collectFutureToolResultIds(messages: AgentMessage[], startIndex: number): Set<string> {
|
||||
const ids = new Set<string>();
|
||||
for (let index = startIndex + 1; index < messages.length; index += 1) {
|
||||
const candidate = messages[index];
|
||||
if (!candidate || typeof candidate !== "object") {
|
||||
continue;
|
||||
}
|
||||
if ((candidate as { role?: unknown }).role === "assistant") {
|
||||
break;
|
||||
}
|
||||
for (const id of collectMatchingToolResultIds(candidate)) {
|
||||
ids.add(id);
|
||||
}
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
/**
|
||||
* Strips dangling tool_use blocks from assistant messages when the immediately
|
||||
* following user message does not contain a matching tool_result block.
|
||||
* Strips dangling tool-call blocks from assistant messages when no later
|
||||
* tool-result span before the next assistant turn resolves them.
|
||||
* This fixes the "tool_use ids found without tool_result blocks" error from Anthropic.
|
||||
*/
|
||||
function stripDanglingAnthropicToolUses(messages: AgentMessage[]): AgentMessage[] {
|
||||
@@ -32,51 +117,42 @@ function stripDanglingAnthropicToolUses(messages: AgentMessage[]): AgentMessage[
|
||||
const assistantMsg = msg as {
|
||||
content?: AnthropicContentBlock[];
|
||||
};
|
||||
|
||||
// Get the next message to check for tool_result blocks
|
||||
const nextMsg = messages[i + 1];
|
||||
const nextMsgRole =
|
||||
nextMsg && typeof nextMsg === "object"
|
||||
? ((nextMsg as { role?: unknown }).role as string | undefined)
|
||||
: undefined;
|
||||
|
||||
// If next message is not user, keep the assistant message as-is
|
||||
if (nextMsgRole !== "user") {
|
||||
const originalContent = Array.isArray(assistantMsg.content) ? assistantMsg.content : [];
|
||||
if (originalContent.length === 0) {
|
||||
result.push(msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Collect tool_use_ids from the next user message's tool_result blocks
|
||||
const nextUserMsg = nextMsg as {
|
||||
content?: AnthropicContentBlock[];
|
||||
};
|
||||
const validToolUseIds = new Set<string>();
|
||||
if (Array.isArray(nextUserMsg.content)) {
|
||||
for (const block of nextUserMsg.content) {
|
||||
if (block && block.type === "toolResult" && block.toolUseId) {
|
||||
validToolUseIds.add(block.toolUseId);
|
||||
}
|
||||
}
|
||||
if (
|
||||
extractToolCallsFromAssistant(msg as Extract<AgentMessage, { role: "assistant" }>).length ===
|
||||
0
|
||||
) {
|
||||
result.push(msg);
|
||||
continue;
|
||||
}
|
||||
const validToolUseIds = collectFutureToolResultIds(messages, i);
|
||||
|
||||
// Filter out tool_use blocks that don't have matching tool_result
|
||||
const originalContent = Array.isArray(assistantMsg.content) ? assistantMsg.content : [];
|
||||
const filteredContent = originalContent.filter((block) => {
|
||||
if (!block) {
|
||||
return false;
|
||||
}
|
||||
if (block.type !== "toolUse") {
|
||||
if (!isToolCallBlock(block)) {
|
||||
return true;
|
||||
}
|
||||
// Keep tool_use if its id is in the valid set
|
||||
return validToolUseIds.has(block.id || "");
|
||||
const blockId = trimNonEmptyString(block.id);
|
||||
return blockId ? validToolUseIds.has(blockId) : false;
|
||||
});
|
||||
|
||||
// If all content would be removed, insert a minimal fallback text block
|
||||
if (filteredContent.length === originalContent.length) {
|
||||
result.push(msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (originalContent.length > 0 && filteredContent.length === 0) {
|
||||
result.push({
|
||||
...assistantMsg,
|
||||
content: [{ type: "text", text: "[tool calls omitted]" }],
|
||||
content: isAbortedAssistantTurn(msg)
|
||||
? []
|
||||
: ([{ type: "text", text: "[tool calls omitted]" }] as AnthropicContentBlock[]),
|
||||
} as AgentMessage);
|
||||
} else {
|
||||
result.push({
|
||||
@@ -190,7 +266,7 @@ export function mergeConsecutiveUserTurns(
|
||||
* Also strips dangling tool_use blocks that lack corresponding tool_result blocks.
|
||||
*/
|
||||
export function validateAnthropicTurns(messages: AgentMessage[]): AgentMessage[] {
|
||||
// First, strip dangling tool_use blocks from assistant messages
|
||||
// First, strip dangling tool-call blocks from assistant messages.
|
||||
const stripped = stripDanglingAnthropicToolUses(messages);
|
||||
|
||||
return validateTurnsWithConsecutiveMerge({
|
||||
|
||||
@@ -31,6 +31,25 @@ export function sanitizeTextContent(text: string): string {
|
||||
);
|
||||
}
|
||||
|
||||
export function hasAssistantPhaseMetadata(message: unknown): boolean {
|
||||
if (!message || typeof message !== "object") {
|
||||
return false;
|
||||
}
|
||||
if ((message as { role?: unknown }).role !== "assistant") {
|
||||
return false;
|
||||
}
|
||||
const content = (message as { content?: unknown }).content;
|
||||
if (!Array.isArray(content)) {
|
||||
return false;
|
||||
}
|
||||
return content.some(
|
||||
(block) =>
|
||||
block &&
|
||||
typeof block === "object" &&
|
||||
typeof (block as { textSignature?: unknown }).textSignature === "string",
|
||||
);
|
||||
}
|
||||
|
||||
export function extractAssistantText(message: unknown): string | undefined {
|
||||
if (!message || typeof message !== "object") {
|
||||
return undefined;
|
||||
@@ -38,29 +57,26 @@ export function extractAssistantText(message: unknown): string | undefined {
|
||||
if ((message as { role?: unknown }).role !== "assistant") {
|
||||
return undefined;
|
||||
}
|
||||
const content = (message as { content?: unknown }).content;
|
||||
if (!Array.isArray(content)) {
|
||||
return undefined;
|
||||
if (hasAssistantPhaseMetadata(message)) {
|
||||
const visibleText = extractAssistantVisibleText(
|
||||
message as Parameters<typeof extractAssistantVisibleText>[0],
|
||||
);
|
||||
return visibleText?.trim() ? visibleText : undefined;
|
||||
}
|
||||
const hasPhaseMetadata = content.some(
|
||||
(block) =>
|
||||
block && typeof block === "object" && typeof (block as { textSignature?: unknown }).textSignature === "string",
|
||||
);
|
||||
const joined = hasPhaseMetadata
|
||||
? (extractAssistantVisibleText(message as Parameters<typeof extractAssistantVisibleText>[0]) ?? "")
|
||||
: (
|
||||
extractTextFromChatContent(content, {
|
||||
sanitizeText: sanitizeTextContent,
|
||||
joinWith: "",
|
||||
normalizeText: (text) => text.trim(),
|
||||
}) ?? ""
|
||||
const content = (message as { content?: unknown }).content;
|
||||
const joined = Array.isArray(content)
|
||||
? (extractTextFromChatContent(content, {
|
||||
sanitizeText: sanitizeTextContent,
|
||||
joinWith: "",
|
||||
normalizeText: (text) => text.trim(),
|
||||
}) ?? "")
|
||||
: sanitizeTextContent(
|
||||
extractAssistantVisibleText(message as Parameters<typeof extractAssistantVisibleText>[0]) ??
|
||||
"",
|
||||
);
|
||||
if (!joined.trim()) {
|
||||
return undefined;
|
||||
}
|
||||
if (hasPhaseMetadata) {
|
||||
return joined;
|
||||
}
|
||||
const stopReason = (message as { stopReason?: unknown }).stopReason;
|
||||
// Gate on stopReason only — a non-error response with a stale/background errorMessage
|
||||
// should not have its content rewritten with error templates (#13935).
|
||||
|
||||
@@ -1,50 +1,5 @@
|
||||
import { extractTextFromChatContent } from "../../shared/chat-content.js";
|
||||
import { sanitizeUserFacingText } from "../pi-embedded-helpers.js";
|
||||
import {
|
||||
stripDowngradedToolCallText,
|
||||
stripMinimaxToolCallXml,
|
||||
stripModelSpecialTokens,
|
||||
stripThinkingTagsFromText,
|
||||
} from "../pi-embedded-utils.js";
|
||||
|
||||
export function stripToolMessages(messages: unknown[]): unknown[] {
|
||||
return messages.filter((msg) => {
|
||||
if (!msg || typeof msg !== "object") {
|
||||
return true;
|
||||
}
|
||||
const role = (msg as { role?: unknown }).role;
|
||||
return role !== "toolResult" && role !== "tool";
|
||||
});
|
||||
}
|
||||
|
||||
export function sanitizeTextContent(text: string): string {
|
||||
if (!text) {
|
||||
return text;
|
||||
}
|
||||
return stripThinkingTagsFromText(
|
||||
stripDowngradedToolCallText(stripModelSpecialTokens(stripMinimaxToolCallXml(text))),
|
||||
);
|
||||
}
|
||||
|
||||
export function extractAssistantText(message: unknown): string | undefined {
|
||||
if (!message || typeof message !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
if ((message as { role?: unknown }).role !== "assistant") {
|
||||
return undefined;
|
||||
}
|
||||
const content = (message as { content?: unknown }).content;
|
||||
if (!Array.isArray(content)) {
|
||||
return undefined;
|
||||
}
|
||||
const joined =
|
||||
extractTextFromChatContent(content, {
|
||||
sanitizeText: sanitizeTextContent,
|
||||
joinWith: "",
|
||||
normalizeText: (text) => text.trim(),
|
||||
}) ?? "";
|
||||
const stopReason = (message as { stopReason?: unknown }).stopReason;
|
||||
const errorContext = stopReason === "error";
|
||||
|
||||
return joined ? sanitizeUserFacingText(joined, { errorContext }) : undefined;
|
||||
}
|
||||
export {
|
||||
extractAssistantText,
|
||||
sanitizeTextContent,
|
||||
stripToolMessages,
|
||||
} from "./chat-history-text.js";
|
||||
|
||||
@@ -218,6 +218,25 @@ describe("extractAssistantText", () => {
|
||||
};
|
||||
expect(extractAssistantText(message)).toBe("Handle payment required errors in your API.");
|
||||
});
|
||||
|
||||
it("prefers final_answer text when phased assistant history is present", () => {
|
||||
const message = {
|
||||
role: "assistant",
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: "internal reasoning",
|
||||
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
|
||||
},
|
||||
{
|
||||
type: "text",
|
||||
text: "Done.",
|
||||
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
|
||||
},
|
||||
],
|
||||
};
|
||||
expect(extractAssistantText(message)).toBe("Done.");
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveAnnounceTarget", () => {
|
||||
|
||||
@@ -1 +1,4 @@
|
||||
export { appendAssistantMessageToSessionTranscript } from "./transcript.js";
|
||||
export {
|
||||
appendAssistantMessageToSessionTranscript,
|
||||
appendExactAssistantMessageToSessionTranscript,
|
||||
} from "./transcript.js";
|
||||
|
||||
@@ -3,7 +3,10 @@ import { describe, expect, it, vi } from "vitest";
|
||||
import * as transcriptEvents from "../../sessions/transcript-events.js";
|
||||
import { resolveSessionTranscriptPathInDir } from "./paths.js";
|
||||
import { useTempSessionsFixture } from "./test-helpers.js";
|
||||
import { appendAssistantMessageToSessionTranscript } from "./transcript.js";
|
||||
import {
|
||||
appendAssistantMessageToSessionTranscript,
|
||||
appendExactAssistantMessageToSessionTranscript,
|
||||
} from "./transcript.js";
|
||||
|
||||
describe("appendAssistantMessageToSessionTranscript", () => {
|
||||
const fixture = useTempSessionsFixture("transcript-test-");
|
||||
@@ -193,4 +196,79 @@ describe("appendAssistantMessageToSessionTranscript", () => {
|
||||
const lines = fs.readFileSync(sessionFile, "utf-8").trim().split("\n");
|
||||
expect(lines.length).toBe(3);
|
||||
});
|
||||
|
||||
it("appends exact assistant transcript messages without rewriting phased content", async () => {
|
||||
writeTranscriptStore();
|
||||
|
||||
const result = await appendExactAssistantMessageToSessionTranscript({
|
||||
sessionKey,
|
||||
storePath: fixture.storePath(),
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: "internal reasoning",
|
||||
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
|
||||
},
|
||||
{
|
||||
type: "text",
|
||||
text: "Done.",
|
||||
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
|
||||
},
|
||||
],
|
||||
api: "openai-responses",
|
||||
provider: "openclaw",
|
||||
model: "delivery-mirror",
|
||||
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 },
|
||||
stopReason: "stop",
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
if (result.ok) {
|
||||
const lines = fs.readFileSync(result.sessionFile, "utf-8").trim().split("\n");
|
||||
const messageLine = JSON.parse(lines[1]);
|
||||
expect(messageLine.message.content).toEqual([
|
||||
{
|
||||
type: "text",
|
||||
text: "internal reasoning",
|
||||
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
|
||||
},
|
||||
{
|
||||
type: "text",
|
||||
text: "Done.",
|
||||
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
|
||||
},
|
||||
]);
|
||||
}
|
||||
});
|
||||
|
||||
it("can emit file-only transcript refresh events for exact assistant appends", async () => {
|
||||
writeTranscriptStore();
|
||||
const emitSpy = vi.spyOn(transcriptEvents, "emitSessionTranscriptUpdate");
|
||||
|
||||
const result = await appendExactAssistantMessageToSessionTranscript({
|
||||
sessionKey,
|
||||
storePath: fixture.storePath(),
|
||||
updateMode: "file-only",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "Done." }],
|
||||
api: "openai-responses",
|
||||
provider: "openclaw",
|
||||
model: "delivery-mirror",
|
||||
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 },
|
||||
stopReason: "stop",
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
if (result.ok) {
|
||||
expect(emitSpy).toHaveBeenCalledWith(result.sessionFile);
|
||||
}
|
||||
emitSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -35,6 +35,16 @@ async function ensureSessionHeader(params: {
|
||||
});
|
||||
}
|
||||
|
||||
export type SessionTranscriptAppendResult =
|
||||
| { ok: true; sessionFile: string; messageId: string }
|
||||
| { ok: false; reason: string };
|
||||
|
||||
export type SessionTranscriptUpdateMode = "inline" | "file-only" | "none";
|
||||
|
||||
export type SessionTranscriptAssistantMessage = Parameters<SessionManager["appendMessage"]>[0] & {
|
||||
role: "assistant";
|
||||
};
|
||||
|
||||
export async function resolveSessionTranscriptFile(params: {
|
||||
sessionId: string;
|
||||
sessionKey: string;
|
||||
@@ -88,7 +98,8 @@ export async function appendAssistantMessageToSessionTranscript(params: {
|
||||
idempotencyKey?: string;
|
||||
/** Optional override for store path (mostly for tests). */
|
||||
storePath?: string;
|
||||
}): Promise<{ ok: true; sessionFile: string; messageId: string } | { ok: false; reason: string }> {
|
||||
updateMode?: SessionTranscriptUpdateMode;
|
||||
}): Promise<SessionTranscriptAppendResult> {
|
||||
const sessionKey = params.sessionKey.trim();
|
||||
if (!sessionKey) {
|
||||
return { ok: false, reason: "missing sessionKey" };
|
||||
@@ -102,6 +113,54 @@ export async function appendAssistantMessageToSessionTranscript(params: {
|
||||
return { ok: false, reason: "empty text" };
|
||||
}
|
||||
|
||||
return appendExactAssistantMessageToSessionTranscript({
|
||||
agentId: params.agentId,
|
||||
sessionKey,
|
||||
storePath: params.storePath,
|
||||
idempotencyKey: params.idempotencyKey,
|
||||
updateMode: params.updateMode,
|
||||
message: {
|
||||
role: "assistant" as const,
|
||||
content: [{ type: "text", text: mirrorText }],
|
||||
api: "openai-responses",
|
||||
provider: "openclaw",
|
||||
model: "delivery-mirror",
|
||||
usage: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
total: 0,
|
||||
},
|
||||
},
|
||||
stopReason: "stop" as const,
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
agentId?: string;
|
||||
sessionKey: string;
|
||||
message: SessionTranscriptAssistantMessage;
|
||||
idempotencyKey?: string;
|
||||
storePath?: string;
|
||||
updateMode?: SessionTranscriptUpdateMode;
|
||||
}): Promise<SessionTranscriptAppendResult> {
|
||||
const sessionKey = params.sessionKey.trim();
|
||||
if (!sessionKey) {
|
||||
return { ok: false, reason: "missing sessionKey" };
|
||||
}
|
||||
if (params.message.role !== "assistant") {
|
||||
return { ok: false, reason: "message role must be assistant" };
|
||||
}
|
||||
|
||||
const storePath = params.storePath ?? resolveDefaultSessionStorePath(params.agentId);
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
const normalizedKey = normalizeStoreSessionKey(sessionKey);
|
||||
@@ -131,41 +190,33 @@ export async function appendAssistantMessageToSessionTranscript(params: {
|
||||
|
||||
await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId });
|
||||
|
||||
const existingMessageId = params.idempotencyKey
|
||||
? await transcriptHasIdempotencyKey(sessionFile, params.idempotencyKey)
|
||||
const explicitIdempotencyKey =
|
||||
params.idempotencyKey ??
|
||||
((params.message as { idempotencyKey?: unknown }).idempotencyKey as string | undefined);
|
||||
const existingMessageId = explicitIdempotencyKey
|
||||
? await transcriptHasIdempotencyKey(sessionFile, explicitIdempotencyKey)
|
||||
: undefined;
|
||||
if (existingMessageId) {
|
||||
return { ok: true, sessionFile, messageId: existingMessageId };
|
||||
}
|
||||
|
||||
const message = {
|
||||
role: "assistant" as const,
|
||||
content: [{ type: "text", text: mirrorText }],
|
||||
api: "openai-responses",
|
||||
provider: "openclaw",
|
||||
model: "delivery-mirror",
|
||||
usage: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
total: 0,
|
||||
},
|
||||
},
|
||||
stopReason: "stop" as const,
|
||||
timestamp: Date.now(),
|
||||
...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}),
|
||||
...params.message,
|
||||
...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}),
|
||||
} as Parameters<SessionManager["appendMessage"]>[0];
|
||||
const sessionManager = SessionManager.open(sessionFile);
|
||||
const messageId = sessionManager.appendMessage(message);
|
||||
|
||||
emitSessionTranscriptUpdate({ sessionFile, sessionKey, message, messageId });
|
||||
switch (params.updateMode ?? "inline") {
|
||||
case "inline":
|
||||
emitSessionTranscriptUpdate({ sessionFile, sessionKey, message, messageId });
|
||||
break;
|
||||
case "file-only":
|
||||
emitSessionTranscriptUpdate(sessionFile);
|
||||
break;
|
||||
case "none":
|
||||
break;
|
||||
}
|
||||
return { ok: true, sessionFile, messageId };
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,10 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
||||
import { resolveThinkingDefault } from "../../agents/model-selection.js";
|
||||
import { rewriteTranscriptEntriesInSessionFile } from "../../agents/pi-embedded-runner/transcript-rewrite.js";
|
||||
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
|
||||
import {
|
||||
extractAssistantText as extractAssistantHistoryText,
|
||||
hasAssistantPhaseMetadata,
|
||||
} from "../../agents/tools/chat-history-text.js";
|
||||
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
|
||||
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
|
||||
import type { MsgContext } from "../../auto-reply/templating.js";
|
||||
@@ -19,7 +23,6 @@ import { normalizeInputProvenance, type InputProvenance } from "../../sessions/i
|
||||
import { resolveSendPolicy } from "../../sessions/send-policy.js";
|
||||
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
|
||||
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
|
||||
import { extractAssistantVisibleText } from "../../shared/chat-message-content.js";
|
||||
import {
|
||||
stripInlineDirectiveTagsForDisplay,
|
||||
stripInlineDirectiveTagsFromMessageForDisplay,
|
||||
@@ -660,18 +663,9 @@ function sanitizeChatHistoryMessage(
|
||||
} else if (Array.isArray(entry.content)) {
|
||||
const updated = entry.content.map((block) => sanitizeChatHistoryContentBlock(block, maxChars));
|
||||
const sanitizedBlocks = updated.map((item) => item.block);
|
||||
const hasPhaseMetadata =
|
||||
entry.role === "assistant" &&
|
||||
entry.content.some(
|
||||
(block) =>
|
||||
block &&
|
||||
typeof block === "object" &&
|
||||
typeof (block as { textSignature?: unknown }).textSignature === "string",
|
||||
);
|
||||
const hasPhaseMetadata = hasAssistantPhaseMetadata(entry);
|
||||
if (hasPhaseMetadata) {
|
||||
const stripped = stripInlineDirectiveTagsForDisplay(
|
||||
extractAssistantVisibleText(entry as Parameters<typeof extractAssistantVisibleText>[0]),
|
||||
);
|
||||
const stripped = stripInlineDirectiveTagsForDisplay(extractAssistantHistoryText(entry));
|
||||
const res = truncateChatHistoryText(stripped.text, maxChars);
|
||||
const nonTextBlocks = sanitizedBlocks.filter(
|
||||
(block) =>
|
||||
@@ -704,7 +698,7 @@ function sanitizeChatHistoryMessage(
|
||||
* dropping messages that carry real text alongside a stale `content: "NO_REPLY"`.
|
||||
*/
|
||||
function extractAssistantTextForSilentCheck(message: unknown): string | undefined {
|
||||
return extractAssistantVisibleText(message);
|
||||
return extractAssistantHistoryText(message);
|
||||
}
|
||||
|
||||
export function sanitizeChatHistoryMessages(messages: unknown[], maxChars: number): unknown[] {
|
||||
|
||||
166
src/gateway/session-history-state.ts
Normal file
166
src/gateway/session-history-state.ts
Normal file
@@ -0,0 +1,166 @@
|
||||
import {
|
||||
DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
|
||||
sanitizeChatHistoryMessages,
|
||||
} from "./server-methods/chat.js";
|
||||
import { attachOpenClawTranscriptMeta, readSessionMessages } from "./session-utils.js";
|
||||
|
||||
export type PaginatedSessionHistory = {
|
||||
items: unknown[];
|
||||
messages: unknown[];
|
||||
nextCursor?: string;
|
||||
hasMore: boolean;
|
||||
};
|
||||
|
||||
type SessionHistoryTranscriptTarget = {
|
||||
sessionId: string;
|
||||
storePath?: string;
|
||||
sessionFile?: string;
|
||||
};
|
||||
|
||||
function resolveCursorSeq(cursor: string | undefined): number | undefined {
|
||||
if (!cursor) {
|
||||
return undefined;
|
||||
}
|
||||
const normalized = cursor.startsWith("seq:") ? cursor.slice(4) : cursor;
|
||||
const value = Number.parseInt(normalized, 10);
|
||||
return Number.isFinite(value) && value > 0 ? value : undefined;
|
||||
}
|
||||
|
||||
export function resolveMessageSeq(message: unknown): number | undefined {
|
||||
if (!message || typeof message !== "object" || Array.isArray(message)) {
|
||||
return undefined;
|
||||
}
|
||||
const meta = (message as { __openclaw?: unknown }).__openclaw;
|
||||
if (!meta || typeof meta !== "object" || Array.isArray(meta)) {
|
||||
return undefined;
|
||||
}
|
||||
const seq = (meta as { seq?: unknown }).seq;
|
||||
return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined;
|
||||
}
|
||||
|
||||
export function paginateSessionMessages(
|
||||
messages: unknown[],
|
||||
limit: number | undefined,
|
||||
cursor: string | undefined,
|
||||
): PaginatedSessionHistory {
|
||||
const cursorSeq = resolveCursorSeq(cursor);
|
||||
let endExclusive = messages.length;
|
||||
if (typeof cursorSeq === "number") {
|
||||
endExclusive = messages.findIndex((message, index) => {
|
||||
const seq = resolveMessageSeq(message);
|
||||
if (typeof seq === "number") {
|
||||
return seq >= cursorSeq;
|
||||
}
|
||||
return index + 1 >= cursorSeq;
|
||||
});
|
||||
if (endExclusive < 0) {
|
||||
endExclusive = messages.length;
|
||||
}
|
||||
}
|
||||
const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0;
|
||||
const items = messages.slice(start, endExclusive);
|
||||
const firstSeq = resolveMessageSeq(items[0]);
|
||||
return {
|
||||
items,
|
||||
messages: items,
|
||||
hasMore: start > 0,
|
||||
...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function sanitizeRawTranscriptMessages(params: {
|
||||
rawMessages: unknown[];
|
||||
maxChars?: number;
|
||||
limit?: number;
|
||||
cursor?: string;
|
||||
}): PaginatedSessionHistory {
|
||||
return paginateSessionMessages(
|
||||
sanitizeChatHistoryMessages(
|
||||
params.rawMessages,
|
||||
params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
|
||||
),
|
||||
params.limit,
|
||||
params.cursor,
|
||||
);
|
||||
}
|
||||
|
||||
export class SessionHistorySseState {
|
||||
private readonly target: SessionHistoryTranscriptTarget;
|
||||
private readonly maxChars: number;
|
||||
private readonly limit: number | undefined;
|
||||
private readonly cursor: string | undefined;
|
||||
private sentHistory: PaginatedSessionHistory;
|
||||
private rawTranscriptSeq: number;
|
||||
|
||||
constructor(params: {
|
||||
target: SessionHistoryTranscriptTarget;
|
||||
maxChars?: number;
|
||||
limit?: number;
|
||||
cursor?: string;
|
||||
}) {
|
||||
this.target = params.target;
|
||||
this.maxChars = params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS;
|
||||
this.limit = params.limit;
|
||||
this.cursor = params.cursor;
|
||||
const rawMessages = this.readRawMessages();
|
||||
this.sentHistory = sanitizeRawTranscriptMessages({
|
||||
rawMessages,
|
||||
maxChars: this.maxChars,
|
||||
limit: this.limit,
|
||||
cursor: this.cursor,
|
||||
});
|
||||
this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length;
|
||||
}
|
||||
|
||||
snapshot(): PaginatedSessionHistory {
|
||||
return this.sentHistory;
|
||||
}
|
||||
|
||||
appendInlineMessage(update: {
|
||||
message: unknown;
|
||||
messageId?: string;
|
||||
}): { message: unknown; messageSeq?: number } | null {
|
||||
if (this.limit !== undefined || this.cursor !== undefined) {
|
||||
return null;
|
||||
}
|
||||
this.rawTranscriptSeq += 1;
|
||||
const nextMessage = attachOpenClawTranscriptMeta(update.message, {
|
||||
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
|
||||
seq: this.rawTranscriptSeq,
|
||||
});
|
||||
const sanitized = sanitizeChatHistoryMessages([nextMessage], this.maxChars);
|
||||
if (sanitized.length === 0) {
|
||||
return null;
|
||||
}
|
||||
const sanitizedMessage = sanitized[0];
|
||||
this.sentHistory = {
|
||||
items: [...this.sentHistory.items, sanitizedMessage],
|
||||
messages: [...this.sentHistory.items, sanitizedMessage],
|
||||
hasMore: false,
|
||||
};
|
||||
return {
|
||||
message: sanitizedMessage,
|
||||
messageSeq: resolveMessageSeq(sanitizedMessage),
|
||||
};
|
||||
}
|
||||
|
||||
refresh(): PaginatedSessionHistory {
|
||||
const rawMessages = this.readRawMessages();
|
||||
this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length;
|
||||
this.sentHistory = sanitizeRawTranscriptMessages({
|
||||
rawMessages,
|
||||
maxChars: this.maxChars,
|
||||
limit: this.limit,
|
||||
cursor: this.cursor,
|
||||
});
|
||||
return this.sentHistory;
|
||||
}
|
||||
|
||||
private readRawMessages(): unknown[] {
|
||||
return readSessionMessages(
|
||||
this.target.sessionId,
|
||||
this.target.storePath,
|
||||
this.target.sessionFile,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,11 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { afterEach, describe, expect, test } from "vitest";
|
||||
import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js";
|
||||
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import {
|
||||
appendAssistantMessageToSessionTranscript,
|
||||
appendExactAssistantMessageToSessionTranscript,
|
||||
} from "../config/sessions/transcript.js";
|
||||
import { testState } from "./test-helpers.runtime-state.js";
|
||||
import {
|
||||
connectReq,
|
||||
@@ -85,25 +86,23 @@ function makeTranscriptAssistantMessage(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function appendTranscriptMessage(params: {
|
||||
sessionFile: string;
|
||||
async function appendTranscriptMessage(params: {
|
||||
sessionKey: string;
|
||||
message: ReturnType<typeof makeTranscriptAssistantMessage>;
|
||||
emitInlineMessage?: boolean;
|
||||
}): string {
|
||||
const sessionManager = SessionManager.open(params.sessionFile);
|
||||
const messageId = sessionManager.appendMessage(params.message);
|
||||
emitSessionTranscriptUpdate(
|
||||
params.emitInlineMessage === false
|
||||
? params.sessionFile
|
||||
: {
|
||||
sessionFile: params.sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
message: params.message,
|
||||
messageId,
|
||||
},
|
||||
);
|
||||
return messageId;
|
||||
storePath?: string;
|
||||
}): Promise<string> {
|
||||
const appended = await appendExactAssistantMessageToSessionTranscript({
|
||||
sessionKey: params.sessionKey,
|
||||
storePath: params.storePath ?? testState.sessionStorePath,
|
||||
updateMode: params.emitInlineMessage === false ? "file-only" : "inline",
|
||||
message: params.message,
|
||||
});
|
||||
expect(appended.ok).toBe(true);
|
||||
if (!appended.ok) {
|
||||
throw new Error(`append failed: ${appended.reason}`);
|
||||
}
|
||||
return appended.messageId;
|
||||
}
|
||||
|
||||
async function fetchSessionHistory(
|
||||
@@ -404,9 +403,9 @@ describe("session history HTTP endpoints", () => {
|
||||
if (!hidden.ok) {
|
||||
throw new Error(`append failed: ${hidden.reason}`);
|
||||
}
|
||||
const visibleMessageId = appendTranscriptMessage({
|
||||
sessionFile: hidden.sessionFile,
|
||||
const visibleMessageId = await appendTranscriptMessage({
|
||||
sessionKey: "agent:main:main",
|
||||
storePath,
|
||||
message: makeTranscriptAssistantMessage({
|
||||
text: "Done.",
|
||||
content: [
|
||||
@@ -582,9 +581,9 @@ describe("session history HTTP endpoints", () => {
|
||||
if (!second.ok) {
|
||||
throw new Error(`append failed: ${second.reason}`);
|
||||
}
|
||||
appendTranscriptMessage({
|
||||
sessionFile: second.sessionFile,
|
||||
await appendTranscriptMessage({
|
||||
sessionKey: "agent:main:main",
|
||||
storePath,
|
||||
message: makeTranscriptAssistantMessage({ text: "NO_REPLY" }),
|
||||
emitInlineMessage: false,
|
||||
});
|
||||
|
||||
@@ -22,8 +22,8 @@ import {
|
||||
DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
|
||||
sanitizeChatHistoryMessages,
|
||||
} from "./server-methods/chat.js";
|
||||
import { paginateSessionMessages, SessionHistorySseState } from "./session-history-state.js";
|
||||
import {
|
||||
attachOpenClawTranscriptMeta,
|
||||
readSessionMessages,
|
||||
resolveFreshestSessionEntryFromStoreKeys,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
@@ -72,64 +72,6 @@ function resolveCursor(req: IncomingMessage): string | undefined {
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
type PaginatedSessionHistory = {
|
||||
items: unknown[];
|
||||
messages: unknown[];
|
||||
nextCursor?: string;
|
||||
hasMore: boolean;
|
||||
};
|
||||
|
||||
function resolveCursorSeq(cursor: string | undefined): number | undefined {
|
||||
if (!cursor) {
|
||||
return undefined;
|
||||
}
|
||||
const normalized = cursor.startsWith("seq:") ? cursor.slice(4) : cursor;
|
||||
const value = Number.parseInt(normalized, 10);
|
||||
return Number.isFinite(value) && value > 0 ? value : undefined;
|
||||
}
|
||||
|
||||
function resolveMessageSeq(message: unknown): number | undefined {
|
||||
if (!message || typeof message !== "object" || Array.isArray(message)) {
|
||||
return undefined;
|
||||
}
|
||||
const meta = (message as { __openclaw?: unknown }).__openclaw;
|
||||
if (!meta || typeof meta !== "object" || Array.isArray(meta)) {
|
||||
return undefined;
|
||||
}
|
||||
const seq = (meta as { seq?: unknown }).seq;
|
||||
return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined;
|
||||
}
|
||||
|
||||
function paginateSessionMessages(
|
||||
messages: unknown[],
|
||||
limit: number | undefined,
|
||||
cursor: string | undefined,
|
||||
): PaginatedSessionHistory {
|
||||
const cursorSeq = resolveCursorSeq(cursor);
|
||||
let endExclusive = messages.length;
|
||||
if (typeof cursorSeq === "number") {
|
||||
endExclusive = messages.findIndex((message, index) => {
|
||||
const seq = resolveMessageSeq(message);
|
||||
if (typeof seq === "number") {
|
||||
return seq >= cursorSeq;
|
||||
}
|
||||
return index + 1 >= cursorSeq;
|
||||
});
|
||||
if (endExclusive < 0) {
|
||||
endExclusive = messages.length;
|
||||
}
|
||||
}
|
||||
const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0;
|
||||
const items = messages.slice(start, endExclusive);
|
||||
const firstSeq = resolveMessageSeq(items[0]);
|
||||
return {
|
||||
items,
|
||||
messages: items,
|
||||
hasMore: start > 0,
|
||||
...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function canonicalizePath(value: string | undefined): string | undefined {
|
||||
const trimmed = value?.trim();
|
||||
if (!trimmed) {
|
||||
@@ -248,13 +190,17 @@ export async function handleSessionHistoryHttpRequest(
|
||||
: new Set<string>();
|
||||
|
||||
let sentHistory = history;
|
||||
// Initialize rawTranscriptSeq from the raw transcript's last __openclaw.seq
|
||||
// value, not the sanitized history tail, so seq numbering stays correct even
|
||||
// when sanitization drops messages (e.g. silent replies).
|
||||
const rawMessages = entry?.sessionId
|
||||
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
|
||||
: [];
|
||||
let rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length;
|
||||
const sseState = new SessionHistorySseState({
|
||||
target: {
|
||||
sessionId: entry.sessionId,
|
||||
storePath: target.storePath,
|
||||
sessionFile: entry.sessionFile,
|
||||
},
|
||||
maxChars: effectiveMaxChars,
|
||||
limit,
|
||||
cursor,
|
||||
});
|
||||
sentHistory = sseState.snapshot();
|
||||
setSseHeaders(res);
|
||||
res.write("retry: 1000\n\n");
|
||||
sseWrite(res, "history", {
|
||||
@@ -277,48 +223,25 @@ export async function handleSessionHistoryHttpRequest(
|
||||
return;
|
||||
}
|
||||
if (update.message !== undefined) {
|
||||
rawTranscriptSeq += 1;
|
||||
const nextMessage = attachOpenClawTranscriptMeta(update.message, {
|
||||
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
|
||||
seq: rawTranscriptSeq,
|
||||
});
|
||||
if (limit === undefined && cursor === undefined) {
|
||||
const sanitized = sanitizeChatHistoryMessages([nextMessage], effectiveMaxChars);
|
||||
if (sanitized.length === 0) {
|
||||
const nextEvent = sseState.appendInlineMessage({
|
||||
message: update.message,
|
||||
messageId: update.messageId,
|
||||
});
|
||||
if (!nextEvent) {
|
||||
return;
|
||||
}
|
||||
const sanitizedMsg = sanitized[0];
|
||||
sentHistory = {
|
||||
items: [...sentHistory.items, sanitizedMsg],
|
||||
messages: [...sentHistory.items, sanitizedMsg],
|
||||
hasMore: false,
|
||||
};
|
||||
sentHistory = sseState.snapshot();
|
||||
sseWrite(res, "message", {
|
||||
sessionKey: target.canonicalKey,
|
||||
message: sanitizedMsg,
|
||||
message: nextEvent.message,
|
||||
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
|
||||
messageSeq: resolveMessageSeq(sanitizedMsg),
|
||||
messageSeq: nextEvent.messageSeq,
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Transcript-only updates can advance the raw transcript without carrying
|
||||
// an inline message payload. Resync the raw seq counter before the next
|
||||
// fast-path append so later messageSeq values stay monotonic.
|
||||
const refreshedRawMessages = readSessionMessages(
|
||||
entry.sessionId,
|
||||
target.storePath,
|
||||
entry.sessionFile,
|
||||
);
|
||||
rawTranscriptSeq =
|
||||
resolveMessageSeq(refreshedRawMessages.at(-1)) ?? refreshedRawMessages.length;
|
||||
// Bounded SSE history refreshes: apply sanitizeChatHistoryMessages before
|
||||
// pagination, consistent with the unbounded path.
|
||||
sentHistory = paginateSessionMessages(
|
||||
sanitizeChatHistoryMessages(refreshedRawMessages, effectiveMaxChars),
|
||||
limit,
|
||||
cursor,
|
||||
);
|
||||
sentHistory = sseState.refresh();
|
||||
sseWrite(res, "history", {
|
||||
sessionKey: target.canonicalKey,
|
||||
...sentHistory,
|
||||
|
||||
Reference in New Issue
Block a user