fix: sanitize SSE history fast path and preserve cursor paging

This commit is contained in:
Eva
2026-04-06 02:09:44 +07:00
committed by Peter Steinberger
parent e7311334cb
commit dea515e833
4 changed files with 104 additions and 13 deletions

View File

@@ -1,4 +1,3 @@
import { extractTextFromChatContent } from "../../shared/chat-content.js";
import { sanitizeUserFacingText } from "../pi-embedded-helpers.js";
import {
stripDowngradedToolCallText,

View File

@@ -684,7 +684,7 @@ function extractAssistantTextForSilentCheck(message: unknown): string | undefine
return extractAssistantVisibleText(message);
}
function sanitizeChatHistoryMessages(messages: unknown[], maxChars: number): unknown[] {
export function sanitizeChatHistoryMessages(messages: unknown[], maxChars: number): unknown[] {
if (messages.length === 0) {
return messages;
}

View File

@@ -329,6 +329,81 @@ describe("session history HTTP endpoints", () => {
});
});
test("sanitizes unbounded SSE push updates before emitting them", async () => {
const storePath = await createSessionStoreFile();
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
},
storePath,
});
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
const historyEvent = await readSseEvent(reader!, streamState);
expect(historyEvent.event).toBe("history");
expect((historyEvent.data as { messages?: unknown[] }).messages ?? []).toHaveLength(0);
const hidden = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "NO_REPLY",
storePath,
});
expect(hidden.ok).toBe(true);
const visible = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: JSON.stringify({
role: "assistant",
content: [
{
type: "text",
text: "internal reasoning",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
}),
storePath,
});
expect(visible.ok).toBe(true);
const messageEvent = await readSseEvent(reader!, streamState);
expect(messageEvent.event).toBe("message");
expect(
(messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message?.content?.[0]
?.text,
).toBe("Done.");
expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(2);
expect(
(
messageEvent.data as {
message?: { __openclaw?: { id?: string; seq?: number } };
}
).message?.__openclaw,
).toMatchObject({
id: visible.ok ? visible.messageId : undefined,
seq: 2,
});
await reader?.cancel();
});
});
test("streams session history updates over SSE", async () => {
const { storePath } = await seedSession({ text: "first message" });

View File

@@ -103,10 +103,15 @@ function paginateSessionMessages(
const cursorSeq = resolveCursorSeq(cursor);
const endExclusive =
typeof cursorSeq === "number"
? Math.max(0, Math.min(messages.length, cursorSeq - 1))
: messages.length;
const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0;
const items = messages.slice(start, endExclusive);
? messages.findIndex((message) => {
const seq = resolveMessageSeq(message);
return typeof seq === "number" && seq >= cursorSeq;
})
: -1;
const boundedEndExclusive = endExclusive >= 0 ? endExclusive : messages.length;
const start =
typeof limit === "number" && limit > 0 ? Math.max(0, boundedEndExclusive - limit) : 0;
const items = messages.slice(start, boundedEndExclusive);
const firstSeq = resolveMessageSeq(items[0]);
return {
items,
@@ -200,10 +205,17 @@ export async function handleSessionHistoryHttpRequest(
}
const limit = resolveLimit(req);
const cursor = resolveCursor(req);
const effectiveMaxChars =
typeof cfg.gateway?.webchat?.chatHistoryMaxChars === "number"
? cfg.gateway.webchat.chatHistoryMaxChars
: DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS;
const history = paginateSessionMessages(
entry?.sessionId
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
: [],
sanitizeChatHistoryMessages(
entry?.sessionId
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
: [],
effectiveMaxChars,
),
limit,
cursor,
);
@@ -261,16 +273,21 @@ export async function handleSessionHistoryHttpRequest(
: readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile).length,
});
if (limit === undefined && cursor === undefined) {
const sanitized = sanitizeChatHistoryMessages([nextMessage], effectiveMaxChars);
if (sanitized.length === 0) {
return;
}
const sanitizedMessage = sanitized[0];
sentHistory = {
items: [...sentHistory.items, nextMessage],
messages: [...sentHistory.items, nextMessage],
items: [...sentHistory.items, sanitizedMessage],
messages: [...sentHistory.items, sanitizedMessage],
hasMore: false,
};
sseWrite(res, "message", {
sessionKey: target.canonicalKey,
message: nextMessage,
message: sanitizedMessage,
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
messageSeq: resolveMessageSeq(nextMessage),
messageSeq: resolveMessageSeq(sanitizedMessage),
});
return;
}