fix(gateway): unify session history snapshots

This commit is contained in:
Peter Steinberger
2026-04-06 15:26:26 +01:00
parent 8838fdc916
commit 9e0d632928
4 changed files with 180 additions and 53 deletions

View File

@@ -1,5 +1,5 @@
import { describe, expect, test, vi } from "vitest";
import { SessionHistorySseState } from "./session-history-state.js";
import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js";
import * as sessionUtils from "./session-utils.js";
describe("SessionHistorySseState", () => {
@@ -12,9 +12,9 @@ describe("SessionHistorySseState", () => {
},
]);
try {
const state = new SessionHistorySseState({
const state = SessionHistorySseState.fromRawSnapshot({
target: { sessionId: "sess-main" },
initialRawMessages: [
rawMessages: [
{
role: "assistant",
content: [{ type: "text", text: "fresh snapshot message" }],
@@ -53,4 +53,26 @@ describe("SessionHistorySseState", () => {
readSpy.mockRestore();
}
});
test("reuses one canonical array for items and messages", () => {
const snapshot = buildSessionHistorySnapshot({
rawMessages: [
{
role: "assistant",
content: [{ type: "text", text: "first" }],
__openclaw: { seq: 1 },
},
{
role: "assistant",
content: [{ type: "text", text: "second" }],
__openclaw: { seq: 2 },
},
],
limit: 1,
});
expect(snapshot.history.items).toBe(snapshot.history.messages);
expect(snapshot.history.messages[0]?.__openclaw?.seq).toBe(2);
expect(snapshot.rawTranscriptSeq).toBe(2);
});
});

View File

@@ -4,13 +4,26 @@ import {
} from "./server-methods/chat.js";
import { attachOpenClawTranscriptMeta, readSessionMessages } from "./session-utils.js";
type SessionHistoryTranscriptMeta = {
seq?: number;
};
export type SessionHistoryMessage = Record<string, unknown> & {
__openclaw?: SessionHistoryTranscriptMeta;
};
export type PaginatedSessionHistory = {
items: unknown[];
messages: unknown[];
items: SessionHistoryMessage[];
messages: SessionHistoryMessage[];
nextCursor?: string;
hasMore: boolean;
};
export type SessionHistorySnapshot = {
history: PaginatedSessionHistory;
rawTranscriptSeq: number;
};
type SessionHistoryTranscriptTarget = {
sessionId: string;
storePath?: string;
@@ -26,20 +39,33 @@ function resolveCursorSeq(cursor: string | undefined): number | undefined {
return Number.isFinite(value) && value > 0 ? value : undefined;
}
export function resolveMessageSeq(message: unknown): number | undefined {
if (!message || typeof message !== "object" || Array.isArray(message)) {
return undefined;
}
const meta = (message as { __openclaw?: unknown }).__openclaw;
if (!meta || typeof meta !== "object" || Array.isArray(meta)) {
return undefined;
}
const seq = (meta as { seq?: unknown }).seq;
function toSessionHistoryMessages(messages: unknown[]): SessionHistoryMessage[] {
return messages.filter(
(message): message is SessionHistoryMessage =>
Boolean(message) && typeof message === "object" && !Array.isArray(message),
);
}
function buildPaginatedSessionHistory(params: {
messages: SessionHistoryMessage[];
hasMore: boolean;
nextCursor?: string;
}): PaginatedSessionHistory {
return {
items: params.messages,
messages: params.messages,
hasMore: params.hasMore,
...(params.nextCursor ? { nextCursor: params.nextCursor } : {}),
};
}
export function resolveMessageSeq(message: SessionHistoryMessage | undefined): number | undefined {
const seq = message?.__openclaw?.seq;
return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined;
}
export function paginateSessionMessages(
messages: unknown[],
messages: SessionHistoryMessage[],
limit: number | undefined,
cursor: string | undefined,
): PaginatedSessionHistory {
@@ -58,30 +84,36 @@ export function paginateSessionMessages(
}
}
const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0;
const items = messages.slice(start, endExclusive);
const firstSeq = resolveMessageSeq(items[0]);
return {
items,
messages: items,
const paginatedMessages = messages.slice(start, endExclusive);
const firstSeq = resolveMessageSeq(paginatedMessages[0]);
return buildPaginatedSessionHistory({
messages: paginatedMessages,
hasMore: start > 0,
...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}),
};
});
}
function sanitizeRawTranscriptMessages(params: {
export function buildSessionHistorySnapshot(params: {
rawMessages: unknown[];
maxChars?: number;
limit?: number;
cursor?: string;
}): PaginatedSessionHistory {
return paginateSessionMessages(
sanitizeChatHistoryMessages(
params.rawMessages,
params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
}): SessionHistorySnapshot {
const history = paginateSessionMessages(
toSessionHistoryMessages(
sanitizeChatHistoryMessages(
params.rawMessages,
params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
),
),
params.limit,
params.cursor,
);
const rawHistoryMessages = toSessionHistoryMessages(params.rawMessages);
return {
history,
rawTranscriptSeq: resolveMessageSeq(rawHistoryMessages.at(-1)) ?? rawHistoryMessages.length,
};
}
export class SessionHistorySseState {
@@ -92,6 +124,22 @@ export class SessionHistorySseState {
private sentHistory: PaginatedSessionHistory;
private rawTranscriptSeq: number;
static fromRawSnapshot(params: {
target: SessionHistoryTranscriptTarget;
rawMessages: unknown[];
maxChars?: number;
limit?: number;
cursor?: string;
}): SessionHistorySseState {
return new SessionHistorySseState({
target: params.target,
maxChars: params.maxChars,
limit: params.limit,
cursor: params.cursor,
initialRawMessages: params.rawMessages,
});
}
constructor(params: {
target: SessionHistoryTranscriptTarget;
maxChars?: number;
@@ -104,13 +152,14 @@ export class SessionHistorySseState {
this.limit = params.limit;
this.cursor = params.cursor;
const rawMessages = params.initialRawMessages ?? this.readRawMessages();
this.sentHistory = sanitizeRawTranscriptMessages({
const snapshot = buildSessionHistorySnapshot({
rawMessages,
maxChars: this.maxChars,
limit: this.limit,
cursor: this.cursor,
});
this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length;
this.sentHistory = snapshot.history;
this.rawTranscriptSeq = snapshot.rawTranscriptSeq;
}
snapshot(): PaginatedSessionHistory {
@@ -133,12 +182,15 @@ export class SessionHistorySseState {
if (sanitized.length === 0) {
return null;
}
const sanitizedMessage = sanitized[0];
this.sentHistory = {
items: [...this.sentHistory.items, sanitizedMessage],
messages: [...this.sentHistory.items, sanitizedMessage],
const [sanitizedMessage] = toSessionHistoryMessages(sanitized);
if (!sanitizedMessage) {
return null;
}
const nextMessages = [...this.sentHistory.messages, sanitizedMessage];
this.sentHistory = buildPaginatedSessionHistory({
messages: nextMessages,
hasMore: false,
};
});
return {
message: sanitizedMessage,
messageSeq: resolveMessageSeq(sanitizedMessage),
@@ -146,15 +198,15 @@ export class SessionHistorySseState {
}
refresh(): PaginatedSessionHistory {
const rawMessages = this.readRawMessages();
this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length;
this.sentHistory = sanitizeRawTranscriptMessages({
rawMessages,
const snapshot = buildSessionHistorySnapshot({
rawMessages: this.readRawMessages(),
maxChars: this.maxChars,
limit: this.limit,
cursor: this.cursor,
});
return this.sentHistory;
this.rawTranscriptSeq = snapshot.rawTranscriptSeq;
this.sentHistory = snapshot.history;
return snapshot.history;
}
private readRawMessages(): unknown[] {

View File

@@ -13,6 +13,7 @@ import {
createGatewaySuiteHarness,
installGatewayTestHooks,
rpcReq,
startServerWithClient,
writeSessionStore,
} from "./test-helpers.server.js";
@@ -33,6 +34,10 @@ async function createSessionStoreFile(): Promise<string> {
cleanupDirs.push(dir);
const storePath = path.join(dir, "sessions.json");
testState.sessionStorePath = storePath;
await writeSessionStore({
entries: {},
storePath,
});
return storePath;
}
@@ -504,6 +509,51 @@ describe("session history HTTP endpoints", () => {
});
});
test("seeds SSE raw sequence state from startup snapshots, not only visible history", async () => {
const { storePath } = await seedSession({ text: "first message" });
await appendTranscriptMessage({
sessionKey: "agent:main:main",
storePath,
message: makeTranscriptAssistantMessage({ text: "NO_REPLY" }),
emitInlineMessage: false,
});
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
const historyEvent = await readSseEvent(reader!, streamState);
expect(historyEvent.event).toBe("history");
expect(
(
historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }
).messages?.map((message) => message.content?.[0]?.text),
).toEqual(["first message"]);
const visible = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "third visible message",
storePath,
});
expect(visible.ok).toBe(true);
const messageEvent = await readSseEvent(reader!, streamState);
expect(messageEvent.event).toBe("message");
expect(
(messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("third visible message");
expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(3);
await reader?.cancel();
});
});
test("suppresses NO_REPLY-only SSE fast-path updates while preserving raw sequence numbering", async () => {
const { storePath } = await seedSession({ text: "first message" });
@@ -629,8 +679,8 @@ describe("session history HTTP endpoints", () => {
test("rejects session history when operator.read is not requested", async () => {
await seedSession({ text: "scope-guarded history" });
const harness = await createGatewaySuiteHarness();
const ws = await harness.openWs();
const started = await startServerWithClient("test-gateway-token-1234567890");
const { server, ws, port, envSnapshot } = started;
try {
const connect = await connectReq(ws, {
token: "test-gateway-token-1234567890",
@@ -646,7 +696,7 @@ describe("session history HTTP endpoints", () => {
expect(wsHistory.error?.message).toBe("missing scope: operator.read");
const httpHistory = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
`http://127.0.0.1:${port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
{
headers: {
...AUTH_HEADER,
@@ -664,7 +714,7 @@ describe("session history HTTP endpoints", () => {
});
const httpHistoryWithoutScopes = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
`http://127.0.0.1:${port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
{
headers: AUTH_HEADER,
},
@@ -679,7 +729,8 @@ describe("session history HTTP endpoints", () => {
});
} finally {
ws.close();
await harness.close();
await server.close();
envSnapshot.restore();
}
});
});

View File

@@ -18,11 +18,8 @@ import {
resolveTrustedHttpOperatorScopes,
} from "./http-utils.js";
import { authorizeOperatorScopesForMethod } from "./method-scopes.js";
import {
DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
sanitizeChatHistoryMessages,
} from "./server-methods/chat.js";
import { paginateSessionMessages, SessionHistorySseState } from "./session-history-state.js";
import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS } from "./server-methods/chat.js";
import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js";
import {
readSessionMessages,
resolveFreshestSessionEntryFromStoreKeys,
@@ -166,8 +163,13 @@ export async function handleSessionHistoryHttpRequest(
const rawSnapshot = entry?.sessionId
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
: [];
const sanitizedMessages = sanitizeChatHistoryMessages(rawSnapshot, effectiveMaxChars);
const history = paginateSessionMessages(sanitizedMessages, limit, cursor);
const historySnapshot = buildSessionHistorySnapshot({
rawMessages: rawSnapshot,
maxChars: effectiveMaxChars,
limit,
cursor,
});
const history = historySnapshot.history;
if (!shouldStreamSse(req)) {
sendJson(res, 200, {
@@ -191,16 +193,16 @@ export async function handleSessionHistoryHttpRequest(
: new Set<string>();
let sentHistory = history;
const sseState = new SessionHistorySseState({
const sseState = SessionHistorySseState.fromRawSnapshot({
target: {
sessionId: entry.sessionId,
storePath: target.storePath,
sessionFile: entry.sessionFile,
},
rawMessages: rawSnapshot,
maxChars: effectiveMaxChars,
limit,
cursor,
initialRawMessages: rawSnapshot,
});
sentHistory = sseState.snapshot();
setSseHeaders(res);