From 9e0d6329283f39b07909085f93d3b730fd445a97 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 6 Apr 2026 15:26:26 +0100 Subject: [PATCH] fix(gateway): unify session history snapshots --- src/gateway/session-history-state.test.ts | 28 ++++- src/gateway/session-history-state.ts | 124 +++++++++++++++------- src/gateway/sessions-history-http.test.ts | 61 ++++++++++- src/gateway/sessions-history-http.ts | 20 ++-- 4 files changed, 180 insertions(+), 53 deletions(-) diff --git a/src/gateway/session-history-state.test.ts b/src/gateway/session-history-state.test.ts index 9471b4daa51..844dbb6546f 100644 --- a/src/gateway/session-history-state.test.ts +++ b/src/gateway/session-history-state.test.ts @@ -1,5 +1,5 @@ import { describe, expect, test, vi } from "vitest"; -import { SessionHistorySseState } from "./session-history-state.js"; +import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js"; import * as sessionUtils from "./session-utils.js"; describe("SessionHistorySseState", () => { @@ -12,9 +12,9 @@ describe("SessionHistorySseState", () => { }, ]); try { - const state = new SessionHistorySseState({ + const state = SessionHistorySseState.fromRawSnapshot({ target: { sessionId: "sess-main" }, - initialRawMessages: [ + rawMessages: [ { role: "assistant", content: [{ type: "text", text: "fresh snapshot message" }], @@ -53,4 +53,26 @@ describe("SessionHistorySseState", () => { readSpy.mockRestore(); } }); + + test("reuses one canonical array for items and messages", () => { + const snapshot = buildSessionHistorySnapshot({ + rawMessages: [ + { + role: "assistant", + content: [{ type: "text", text: "first" }], + __openclaw: { seq: 1 }, + }, + { + role: "assistant", + content: [{ type: "text", text: "second" }], + __openclaw: { seq: 2 }, + }, + ], + limit: 1, + }); + + expect(snapshot.history.items).toBe(snapshot.history.messages); + expect(snapshot.history.messages[0]?.__openclaw?.seq).toBe(2); + expect(snapshot.rawTranscriptSeq).toBe(2); + }); }); diff --git a/src/gateway/session-history-state.ts b/src/gateway/session-history-state.ts index 2bf830a54bf..3c85f3ae1f1 100644 --- a/src/gateway/session-history-state.ts +++ b/src/gateway/session-history-state.ts @@ -4,13 +4,26 @@ import { } from "./server-methods/chat.js"; import { attachOpenClawTranscriptMeta, readSessionMessages } from "./session-utils.js"; +type SessionHistoryTranscriptMeta = { + seq?: number; +}; + +export type SessionHistoryMessage = Record & { + __openclaw?: SessionHistoryTranscriptMeta; +}; + export type PaginatedSessionHistory = { - items: unknown[]; - messages: unknown[]; + items: SessionHistoryMessage[]; + messages: SessionHistoryMessage[]; nextCursor?: string; hasMore: boolean; }; +export type SessionHistorySnapshot = { + history: PaginatedSessionHistory; + rawTranscriptSeq: number; +}; + type SessionHistoryTranscriptTarget = { sessionId: string; storePath?: string; @@ -26,20 +39,33 @@ function resolveCursorSeq(cursor: string | undefined): number | undefined { return Number.isFinite(value) && value > 0 ? value : undefined; } -export function resolveMessageSeq(message: unknown): number | undefined { - if (!message || typeof message !== "object" || Array.isArray(message)) { - return undefined; - } - const meta = (message as { __openclaw?: unknown }).__openclaw; - if (!meta || typeof meta !== "object" || Array.isArray(meta)) { - return undefined; - } - const seq = (meta as { seq?: unknown }).seq; +function toSessionHistoryMessages(messages: unknown[]): SessionHistoryMessage[] { + return messages.filter( + (message): message is SessionHistoryMessage => + Boolean(message) && typeof message === "object" && !Array.isArray(message), + ); +} + +function buildPaginatedSessionHistory(params: { + messages: SessionHistoryMessage[]; + hasMore: boolean; + nextCursor?: string; +}): PaginatedSessionHistory { + return { + items: params.messages, + messages: params.messages, + hasMore: params.hasMore, + ...(params.nextCursor ? { nextCursor: params.nextCursor } : {}), + }; +} + +export function resolveMessageSeq(message: SessionHistoryMessage | undefined): number | undefined { + const seq = message?.__openclaw?.seq; return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined; } export function paginateSessionMessages( - messages: unknown[], + messages: SessionHistoryMessage[], limit: number | undefined, cursor: string | undefined, ): PaginatedSessionHistory { @@ -58,30 +84,36 @@ export function paginateSessionMessages( } } const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0; - const items = messages.slice(start, endExclusive); - const firstSeq = resolveMessageSeq(items[0]); - return { - items, - messages: items, + const paginatedMessages = messages.slice(start, endExclusive); + const firstSeq = resolveMessageSeq(paginatedMessages[0]); + return buildPaginatedSessionHistory({ + messages: paginatedMessages, hasMore: start > 0, ...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}), - }; + }); } -function sanitizeRawTranscriptMessages(params: { +export function buildSessionHistorySnapshot(params: { rawMessages: unknown[]; maxChars?: number; limit?: number; cursor?: string; -}): PaginatedSessionHistory { - return paginateSessionMessages( - sanitizeChatHistoryMessages( - params.rawMessages, - params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, +}): SessionHistorySnapshot { + const history = paginateSessionMessages( + toSessionHistoryMessages( + sanitizeChatHistoryMessages( + params.rawMessages, + params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, + ), ), params.limit, params.cursor, ); + const rawHistoryMessages = toSessionHistoryMessages(params.rawMessages); + return { + history, + rawTranscriptSeq: resolveMessageSeq(rawHistoryMessages.at(-1)) ?? rawHistoryMessages.length, + }; } export class SessionHistorySseState { @@ -92,6 +124,22 @@ export class SessionHistorySseState { private sentHistory: PaginatedSessionHistory; private rawTranscriptSeq: number; + static fromRawSnapshot(params: { + target: SessionHistoryTranscriptTarget; + rawMessages: unknown[]; + maxChars?: number; + limit?: number; + cursor?: string; + }): SessionHistorySseState { + return new SessionHistorySseState({ + target: params.target, + maxChars: params.maxChars, + limit: params.limit, + cursor: params.cursor, + initialRawMessages: params.rawMessages, + }); + } + constructor(params: { target: SessionHistoryTranscriptTarget; maxChars?: number; @@ -104,13 +152,14 @@ export class SessionHistorySseState { this.limit = params.limit; this.cursor = params.cursor; const rawMessages = params.initialRawMessages ?? this.readRawMessages(); - this.sentHistory = sanitizeRawTranscriptMessages({ + const snapshot = buildSessionHistorySnapshot({ rawMessages, maxChars: this.maxChars, limit: this.limit, cursor: this.cursor, }); - this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length; + this.sentHistory = snapshot.history; + this.rawTranscriptSeq = snapshot.rawTranscriptSeq; } snapshot(): PaginatedSessionHistory { @@ -133,12 +182,15 @@ export class SessionHistorySseState { if (sanitized.length === 0) { return null; } - const sanitizedMessage = sanitized[0]; - this.sentHistory = { - items: [...this.sentHistory.items, sanitizedMessage], - messages: [...this.sentHistory.items, sanitizedMessage], + const [sanitizedMessage] = toSessionHistoryMessages(sanitized); + if (!sanitizedMessage) { + return null; + } + const nextMessages = [...this.sentHistory.messages, sanitizedMessage]; + this.sentHistory = buildPaginatedSessionHistory({ + messages: nextMessages, hasMore: false, - }; + }); return { message: sanitizedMessage, messageSeq: resolveMessageSeq(sanitizedMessage), @@ -146,15 +198,15 @@ export class SessionHistorySseState { } refresh(): PaginatedSessionHistory { - const rawMessages = this.readRawMessages(); - this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length; - this.sentHistory = sanitizeRawTranscriptMessages({ - rawMessages, + const snapshot = buildSessionHistorySnapshot({ + rawMessages: this.readRawMessages(), maxChars: this.maxChars, limit: this.limit, cursor: this.cursor, }); - return this.sentHistory; + this.rawTranscriptSeq = snapshot.rawTranscriptSeq; + this.sentHistory = snapshot.history; + return snapshot.history; } private readRawMessages(): unknown[] { diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts index 0075bdc370e..33c2b984adc 100644 --- a/src/gateway/sessions-history-http.test.ts +++ b/src/gateway/sessions-history-http.test.ts @@ -13,6 +13,7 @@ import { createGatewaySuiteHarness, installGatewayTestHooks, rpcReq, + startServerWithClient, writeSessionStore, } from "./test-helpers.server.js"; @@ -33,6 +34,10 @@ async function createSessionStoreFile(): Promise { cleanupDirs.push(dir); const storePath = path.join(dir, "sessions.json"); testState.sessionStorePath = storePath; + await writeSessionStore({ + entries: {}, + storePath, + }); return storePath; } @@ -504,6 +509,51 @@ describe("session history HTTP endpoints", () => { }); }); + test("seeds SSE raw sequence state from startup snapshots, not only visible history", async () => { + const { storePath } = await seedSession({ text: "first message" }); + await appendTranscriptMessage({ + sessionKey: "agent:main:main", + storePath, + message: makeTranscriptAssistantMessage({ text: "NO_REPLY" }), + emitInlineMessage: false, + }); + + await withGatewayHarness(async (harness) => { + const res = await fetchSessionHistory(harness.port, "agent:main:main", { + headers: { Accept: "text/event-stream" }, + }); + + expect(res.status).toBe(200); + const reader = res.body?.getReader(); + expect(reader).toBeTruthy(); + const streamState = { buffer: "" }; + const historyEvent = await readSseEvent(reader!, streamState); + expect(historyEvent.event).toBe("history"); + expect( + ( + historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> } + ).messages?.map((message) => message.content?.[0]?.text), + ).toEqual(["first message"]); + + const visible = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "third visible message", + storePath, + }); + expect(visible.ok).toBe(true); + + const messageEvent = await readSseEvent(reader!, streamState); + expect(messageEvent.event).toBe("message"); + expect( + (messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message + ?.content?.[0]?.text, + ).toBe("third visible message"); + expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(3); + + await reader?.cancel(); + }); + }); + test("suppresses NO_REPLY-only SSE fast-path updates while preserving raw sequence numbering", async () => { const { storePath } = await seedSession({ text: "first message" }); @@ -629,8 +679,8 @@ describe("session history HTTP endpoints", () => { test("rejects session history when operator.read is not requested", async () => { await seedSession({ text: "scope-guarded history" }); - const harness = await createGatewaySuiteHarness(); - const ws = await harness.openWs(); + const started = await startServerWithClient("test-gateway-token-1234567890"); + const { server, ws, port, envSnapshot } = started; try { const connect = await connectReq(ws, { token: "test-gateway-token-1234567890", @@ -646,7 +696,7 @@ describe("session history HTTP endpoints", () => { expect(wsHistory.error?.message).toBe("missing scope: operator.read"); const httpHistory = await fetch( - `http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`, + `http://127.0.0.1:${port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`, { headers: { ...AUTH_HEADER, @@ -664,7 +714,7 @@ describe("session history HTTP endpoints", () => { }); const httpHistoryWithoutScopes = await fetch( - `http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`, + `http://127.0.0.1:${port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`, { headers: AUTH_HEADER, }, @@ -679,7 +729,8 @@ describe("session history HTTP endpoints", () => { }); } finally { ws.close(); - await harness.close(); + await server.close(); + envSnapshot.restore(); } }); }); diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index bf59bcb233e..d48b5d6ec62 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -18,11 +18,8 @@ import { resolveTrustedHttpOperatorScopes, } from "./http-utils.js"; import { authorizeOperatorScopesForMethod } from "./method-scopes.js"; -import { - DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, - sanitizeChatHistoryMessages, -} from "./server-methods/chat.js"; -import { paginateSessionMessages, SessionHistorySseState } from "./session-history-state.js"; +import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS } from "./server-methods/chat.js"; +import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js"; import { readSessionMessages, resolveFreshestSessionEntryFromStoreKeys, @@ -166,8 +163,13 @@ export async function handleSessionHistoryHttpRequest( const rawSnapshot = entry?.sessionId ? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile) : []; - const sanitizedMessages = sanitizeChatHistoryMessages(rawSnapshot, effectiveMaxChars); - const history = paginateSessionMessages(sanitizedMessages, limit, cursor); + const historySnapshot = buildSessionHistorySnapshot({ + rawMessages: rawSnapshot, + maxChars: effectiveMaxChars, + limit, + cursor, + }); + const history = historySnapshot.history; if (!shouldStreamSse(req)) { sendJson(res, 200, { @@ -191,16 +193,16 @@ export async function handleSessionHistoryHttpRequest( : new Set(); let sentHistory = history; - const sseState = new SessionHistorySseState({ + const sseState = SessionHistorySseState.fromRawSnapshot({ target: { sessionId: entry.sessionId, storePath: target.storePath, sessionFile: entry.sessionFile, }, + rawMessages: rawSnapshot, maxChars: effectiveMaxChars, limit, cursor, - initialRawMessages: rawSnapshot, }); sentHistory = sseState.snapshot(); setSseHeaders(res);