From 27afd01577a250145771c1b4700837e32197a487 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 12 Apr 2026 18:17:37 +0100 Subject: [PATCH] test(gateway): share session history sse helpers --- src/gateway/sessions-history-http.test.ts | 277 +++++++++------------- 1 file changed, 114 insertions(+), 163 deletions(-) diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts index e40c3d8bd8e..aa6079c1a6c 100644 --- a/src/gateway/sessions-history-http.test.ts +++ b/src/gateway/sessions-history-http.test.ts @@ -195,6 +195,78 @@ async function readSseEvent( } } +type SessionHistorySseStream = { + reader: ReadableStreamDefaultReader; + streamState: { buffer: string }; +}; + +async function openSessionHistorySse( + port: number, + sessionKey: string, + params?: { query?: string }, +): Promise { + const res = await fetchSessionHistory(port, sessionKey, { + query: params?.query, + headers: { Accept: "text/event-stream" }, + }); + expect(res.status).toBe(200); + const reader = res.body?.getReader(); + expect(reader).toBeTruthy(); + return { reader: reader!, streamState: { buffer: "" } }; +} + +async function expectHistoryEventTexts(stream: SessionHistorySseStream, expectedTexts: string[]) { + const event = await readSseEvent(stream.reader, stream.streamState); + expect(event.event).toBe("history"); + expect( + (event.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }).messages?.map( + (message) => message.content?.[0]?.text, + ), + ).toEqual(expectedTexts); + return event; +} + +async function expectMessageEventMatch( + stream: SessionHistorySseStream, + params: { text: string; seq: number; id?: string }, +) { + const event = await readSseEvent(stream.reader, stream.streamState); + expect(event.event).toBe("message"); + expect( + (event.data as { message?: { content?: Array<{ text?: string }> } }).message?.content?.[0] + ?.text, + ).toBe(params.text); + expect((event.data as { messageSeq?: number }).messageSeq).toBe(params.seq); + if (params.id !== undefined) { + expect( + (event.data as { message?: { __openclaw?: { id?: string; seq?: number } } }).message + ?.__openclaw, + ).toMatchObject({ + id: params.id, + seq: params.seq, + }); + } + return event; +} + +async function openBoundedHistoryStreamWithSecondMessage( + harnessPort: number, + storePath: string, +): Promise { + const second = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "second message", + storePath, + }); + expect(second.ok).toBe(true); + + const stream = await openSessionHistorySse(harnessPort, "agent:main:main", { + query: "?limit=1", + }); + await expectHistoryEventTexts(stream, ["second message"]); + return stream; +} + describe("session history HTTP endpoints", () => { test("returns session history over direct REST", async () => { await seedSession({ text: "hello from history" }); @@ -344,29 +416,9 @@ describe("session history HTTP endpoints", () => { test("streams bounded history windows over SSE", async () => { const { storePath } = await seedSession({ text: "first message" }); - const second = await appendAssistantMessageToSessionTranscript({ - sessionKey: "agent:main:main", - text: "second message", - storePath, - }); - expect(second.ok).toBe(true); await withGatewayHarness(async (harness) => { - const res = await fetchSessionHistory(harness.port, "agent:main:main", { - query: "?limit=1", - headers: { Accept: "text/event-stream" }, - }); - - expect(res.status).toBe(200); - const reader = res.body?.getReader(); - expect(reader).toBeTruthy(); - const streamState = { buffer: "" }; - const historyEvent = await readSseEvent(reader!, streamState); - expect(historyEvent.event).toBe("history"); - expect( - (historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }) - .messages?.[0]?.content?.[0]?.text, - ).toBe("second message"); + const stream = await openBoundedHistoryStreamWithSecondMessage(harness.port, storePath); const thirdMessageId = await appendTranscriptMessage({ sessionKey: "agent:main:main", @@ -375,7 +427,7 @@ describe("session history HTTP endpoints", () => { message: makeTranscriptAssistantMessage({ text: "third message" }), }); - const nextEvent = await readSseEvent(reader!, streamState); + const nextEvent = await readSseEvent(stream.reader, stream.streamState); expect(nextEvent.event).toBe("history"); const nextData = nextEvent.data as { messages?: Array<{ @@ -389,35 +441,15 @@ describe("session history HTTP endpoints", () => { seq: 3, }); - await reader?.cancel(); + await stream.reader.cancel(); }); }); test("seeds bounded SSE windows from visible history when transcript refreshes are silent", async () => { const { storePath } = await seedSession({ text: "first message" }); - const second = await appendAssistantMessageToSessionTranscript({ - sessionKey: "agent:main:main", - text: "second message", - storePath, - }); - expect(second.ok).toBe(true); await withGatewayHarness(async (harness) => { - const res = await fetchSessionHistory(harness.port, "agent:main:main", { - query: "?limit=1", - headers: { Accept: "text/event-stream" }, - }); - - expect(res.status).toBe(200); - const reader = res.body?.getReader(); - expect(reader).toBeTruthy(); - const streamState = { buffer: "" }; - const historyEvent = await readSseEvent(reader!, streamState); - expect(historyEvent.event).toBe("history"); - expect( - (historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }) - .messages?.[0]?.content?.[0]?.text, - ).toBe("second message"); + const stream = await openBoundedHistoryStreamWithSecondMessage(harness.port, storePath); await appendTranscriptMessage({ sessionKey: "agent:main:main", @@ -426,7 +458,7 @@ describe("session history HTTP endpoints", () => { message: makeTranscriptAssistantMessage({ text: "NO_REPLY" }), }); - const refreshEvent = await readSseEvent(reader!, streamState); + const refreshEvent = await readSseEvent(stream.reader, stream.streamState); expect(refreshEvent.event).toBe("history"); const refreshData = refreshEvent.data as { messages?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>; @@ -434,7 +466,7 @@ describe("session history HTTP endpoints", () => { expect(refreshData.messages?.[0]?.content?.[0]?.text).toBe("second message"); expect(refreshData.messages?.[0]?.__openclaw?.seq).toBe(2); - await reader?.cancel(); + await stream.reader.cancel(); }); }); @@ -505,21 +537,8 @@ describe("session history HTTP endpoints", () => { const { storePath } = await seedSession({ text: "first message" }); await withGatewayHarness(async (harness) => { - const res = await fetchSessionHistory(harness.port, "agent:main:main", { - headers: { Accept: "text/event-stream" }, - }); - - expect(res.status).toBe(200); - expect(res.headers.get("content-type") ?? "").toContain("text/event-stream"); - const reader = res.body?.getReader(); - expect(reader).toBeTruthy(); - const streamState = { buffer: "" }; - const historyEvent = await readSseEvent(reader!, streamState); - expect(historyEvent.event).toBe("history"); - expect( - (historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }) - .messages?.[0]?.content?.[0]?.text, - ).toBe("first message"); + const stream = await openSessionHistorySse(harness.port, "agent:main:main"); + await expectHistoryEventTexts(stream, ["first message"]); const appended = await appendAssistantMessageToSessionTranscript({ sessionKey: "agent:main:main", @@ -528,36 +547,16 @@ describe("session history HTTP endpoints", () => { }); expect(appended.ok).toBe(true); - const messageEvent = await readSseEvent(reader!, streamState); - expect(messageEvent.event).toBe("message"); - expect( - ( - messageEvent.data as { - sessionKey?: string; - message?: { content?: Array<{ text?: string }> }; - } - ).sessionKey, - ).toBe("agent:main:main"); - expect( - (messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message - ?.content?.[0]?.text, - ).toBe("second message"); - expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(2); if (!appended.ok) { throw new Error(`append failed: ${appended.reason}`); } - expect( - ( - messageEvent.data as { - message?: { __openclaw?: { id?: string; seq?: number } }; - } - ).message?.__openclaw, - ).toMatchObject({ - id: appended.ok ? appended.messageId : undefined, + await expectMessageEventMatch(stream, { + text: "second message", seq: 2, + id: appended.messageId, }); - await reader?.cancel(); + await stream.reader.cancel(); }); }); @@ -571,21 +570,8 @@ describe("session history HTTP endpoints", () => { }); await withGatewayHarness(async (harness) => { - const res = await fetchSessionHistory(harness.port, "agent:main:main", { - headers: { Accept: "text/event-stream" }, - }); - - expect(res.status).toBe(200); - const reader = res.body?.getReader(); - expect(reader).toBeTruthy(); - const streamState = { buffer: "" }; - const historyEvent = await readSseEvent(reader!, streamState); - expect(historyEvent.event).toBe("history"); - expect( - ( - historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> } - ).messages?.map((message) => message.content?.[0]?.text), - ).toEqual(["first message"]); + const stream = await openSessionHistorySse(harness.port, "agent:main:main"); + await expectHistoryEventTexts(stream, ["first message"]); const visible = await appendAssistantMessageToSessionTranscript({ sessionKey: "agent:main:main", @@ -594,15 +580,12 @@ describe("session history HTTP endpoints", () => { }); expect(visible.ok).toBe(true); - const messageEvent = await readSseEvent(reader!, streamState); - expect(messageEvent.event).toBe("message"); - expect( - (messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message - ?.content?.[0]?.text, - ).toBe("third visible message"); - expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(3); + await expectMessageEventMatch(stream, { + text: "third visible message", + seq: 3, + }); - await reader?.cancel(); + await stream.reader.cancel(); }); }); @@ -610,15 +593,8 @@ describe("session history HTTP endpoints", () => { const { storePath } = await seedSession({ text: "first message" }); await withGatewayHarness(async (harness) => { - const res = await fetchSessionHistory(harness.port, "agent:main:main", { - headers: { Accept: "text/event-stream" }, - }); - - expect(res.status).toBe(200); - const reader = res.body?.getReader(); - expect(reader).toBeTruthy(); - const streamState = { buffer: "" }; - await readSseEvent(reader!, streamState); + const stream = await openSessionHistorySse(harness.port, "agent:main:main"); + await expectHistoryEventTexts(stream, ["first message"]); const silent = await appendAssistantMessageToSessionTranscript({ sessionKey: "agent:main:main", @@ -634,25 +610,16 @@ describe("session history HTTP endpoints", () => { }); expect(visible.ok).toBe(true); - const messageEvent = await readSseEvent(reader!, streamState); - expect(messageEvent.event).toBe("message"); - expect( - (messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message - ?.content?.[0]?.text, - ).toBe("third visible message"); - expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(3); - expect( - ( - messageEvent.data as { - message?: { __openclaw?: { id?: string; seq?: number } }; - } - ).message?.__openclaw, - ).toMatchObject({ - id: visible.ok ? visible.messageId : undefined, + if (!visible.ok) { + throw new Error(`append failed: ${visible.reason}`); + } + await expectMessageEventMatch(stream, { + text: "third visible message", seq: 3, + id: visible.messageId, }); - await reader?.cancel(); + await stream.reader.cancel(); }); }); @@ -660,15 +627,8 @@ describe("session history HTTP endpoints", () => { const { storePath } = await seedSession({ text: "first message" }); await withGatewayHarness(async (harness) => { - const res = await fetchSessionHistory(harness.port, "agent:main:main", { - headers: { Accept: "text/event-stream" }, - }); - - expect(res.status).toBe(200); - const reader = res.body?.getReader(); - expect(reader).toBeTruthy(); - const streamState = { buffer: "" }; - await readSseEvent(reader!, streamState); + const stream = await openSessionHistorySse(harness.port, "agent:main:main"); + await expectHistoryEventTexts(stream, ["first message"]); const second = await appendAssistantMessageToSessionTranscript({ sessionKey: "agent:main:main", @@ -677,13 +637,13 @@ describe("session history HTTP endpoints", () => { }); expect(second.ok).toBe(true); - const secondEvent = await readSseEvent(reader!, streamState); - expect(secondEvent.event).toBe("message"); - expect((secondEvent.data as { messageSeq?: number }).messageSeq).toBe(2); - if (!second.ok) { throw new Error(`append failed: ${second.reason}`); } + await expectMessageEventMatch(stream, { + text: "second visible message", + seq: 2, + }); await appendTranscriptMessage({ sessionKey: "agent:main:main", storePath, @@ -691,7 +651,7 @@ describe("session history HTTP endpoints", () => { emitInlineMessage: false, }); - const refreshEvent = await readSseEvent(reader!, streamState); + const refreshEvent = await readSseEvent(stream.reader, stream.streamState); expect(refreshEvent.event).toBe("history"); expect( ( @@ -706,25 +666,16 @@ describe("session history HTTP endpoints", () => { }); expect(third.ok).toBe(true); - const thirdEvent = await readSseEvent(reader!, streamState); - expect(thirdEvent.event).toBe("message"); - expect( - (thirdEvent.data as { message?: { content?: Array<{ text?: string }> } }).message - ?.content?.[0]?.text, - ).toBe("third visible message"); - expect((thirdEvent.data as { messageSeq?: number }).messageSeq).toBe(4); - expect( - ( - thirdEvent.data as { - message?: { __openclaw?: { id?: string; seq?: number } }; - } - ).message?.__openclaw, - ).toMatchObject({ - id: third.ok ? third.messageId : undefined, + if (!third.ok) { + throw new Error(`append failed: ${third.reason}`); + } + await expectMessageEventMatch(stream, { + text: "third visible message", seq: 4, + id: third.messageId, }); - await reader?.cancel(); + await stream.reader.cancel(); }); });