test(gateway): share session history sse helpers

This commit is contained in:
Vincent Koc
2026-04-12 18:17:37 +01:00
parent 077cfca229
commit 27afd01577

View File

@@ -195,6 +195,78 @@ async function readSseEvent(
}
}
type SessionHistorySseStream = {
reader: ReadableStreamDefaultReader<Uint8Array>;
streamState: { buffer: string };
};
async function openSessionHistorySse(
port: number,
sessionKey: string,
params?: { query?: string },
): Promise<SessionHistorySseStream> {
const res = await fetchSessionHistory(port, sessionKey, {
query: params?.query,
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
return { reader: reader!, streamState: { buffer: "" } };
}
async function expectHistoryEventTexts(stream: SessionHistorySseStream, expectedTexts: string[]) {
const event = await readSseEvent(stream.reader, stream.streamState);
expect(event.event).toBe("history");
expect(
(event.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }).messages?.map(
(message) => message.content?.[0]?.text,
),
).toEqual(expectedTexts);
return event;
}
async function expectMessageEventMatch(
stream: SessionHistorySseStream,
params: { text: string; seq: number; id?: string },
) {
const event = await readSseEvent(stream.reader, stream.streamState);
expect(event.event).toBe("message");
expect(
(event.data as { message?: { content?: Array<{ text?: string }> } }).message?.content?.[0]
?.text,
).toBe(params.text);
expect((event.data as { messageSeq?: number }).messageSeq).toBe(params.seq);
if (params.id !== undefined) {
expect(
(event.data as { message?: { __openclaw?: { id?: string; seq?: number } } }).message
?.__openclaw,
).toMatchObject({
id: params.id,
seq: params.seq,
});
}
return event;
}
async function openBoundedHistoryStreamWithSecondMessage(
harnessPort: number,
storePath: string,
): Promise<SessionHistorySseStream> {
const second = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "second message",
storePath,
});
expect(second.ok).toBe(true);
const stream = await openSessionHistorySse(harnessPort, "agent:main:main", {
query: "?limit=1",
});
await expectHistoryEventTexts(stream, ["second message"]);
return stream;
}
describe("session history HTTP endpoints", () => {
test("returns session history over direct REST", async () => {
await seedSession({ text: "hello from history" });
@@ -344,29 +416,9 @@ describe("session history HTTP endpoints", () => {
test("streams bounded history windows over SSE", async () => {
const { storePath } = await seedSession({ text: "first message" });
const second = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "second message",
storePath,
});
expect(second.ok).toBe(true);
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
query: "?limit=1",
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
const historyEvent = await readSseEvent(reader!, streamState);
expect(historyEvent.event).toBe("history");
expect(
(historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> })
.messages?.[0]?.content?.[0]?.text,
).toBe("second message");
const stream = await openBoundedHistoryStreamWithSecondMessage(harness.port, storePath);
const thirdMessageId = await appendTranscriptMessage({
sessionKey: "agent:main:main",
@@ -375,7 +427,7 @@ describe("session history HTTP endpoints", () => {
message: makeTranscriptAssistantMessage({ text: "third message" }),
});
const nextEvent = await readSseEvent(reader!, streamState);
const nextEvent = await readSseEvent(stream.reader, stream.streamState);
expect(nextEvent.event).toBe("history");
const nextData = nextEvent.data as {
messages?: Array<{
@@ -389,35 +441,15 @@ describe("session history HTTP endpoints", () => {
seq: 3,
});
await reader?.cancel();
await stream.reader.cancel();
});
});
test("seeds bounded SSE windows from visible history when transcript refreshes are silent", async () => {
const { storePath } = await seedSession({ text: "first message" });
const second = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "second message",
storePath,
});
expect(second.ok).toBe(true);
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
query: "?limit=1",
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
const historyEvent = await readSseEvent(reader!, streamState);
expect(historyEvent.event).toBe("history");
expect(
(historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> })
.messages?.[0]?.content?.[0]?.text,
).toBe("second message");
const stream = await openBoundedHistoryStreamWithSecondMessage(harness.port, storePath);
await appendTranscriptMessage({
sessionKey: "agent:main:main",
@@ -426,7 +458,7 @@ describe("session history HTTP endpoints", () => {
message: makeTranscriptAssistantMessage({ text: "NO_REPLY" }),
});
const refreshEvent = await readSseEvent(reader!, streamState);
const refreshEvent = await readSseEvent(stream.reader, stream.streamState);
expect(refreshEvent.event).toBe("history");
const refreshData = refreshEvent.data as {
messages?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>;
@@ -434,7 +466,7 @@ describe("session history HTTP endpoints", () => {
expect(refreshData.messages?.[0]?.content?.[0]?.text).toBe("second message");
expect(refreshData.messages?.[0]?.__openclaw?.seq).toBe(2);
await reader?.cancel();
await stream.reader.cancel();
});
});
@@ -505,21 +537,8 @@ describe("session history HTTP endpoints", () => {
const { storePath } = await seedSession({ text: "first message" });
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
expect(res.headers.get("content-type") ?? "").toContain("text/event-stream");
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
const historyEvent = await readSseEvent(reader!, streamState);
expect(historyEvent.event).toBe("history");
expect(
(historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> })
.messages?.[0]?.content?.[0]?.text,
).toBe("first message");
const stream = await openSessionHistorySse(harness.port, "agent:main:main");
await expectHistoryEventTexts(stream, ["first message"]);
const appended = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
@@ -528,36 +547,16 @@ describe("session history HTTP endpoints", () => {
});
expect(appended.ok).toBe(true);
const messageEvent = await readSseEvent(reader!, streamState);
expect(messageEvent.event).toBe("message");
expect(
(
messageEvent.data as {
sessionKey?: string;
message?: { content?: Array<{ text?: string }> };
}
).sessionKey,
).toBe("agent:main:main");
expect(
(messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("second message");
expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(2);
if (!appended.ok) {
throw new Error(`append failed: ${appended.reason}`);
}
expect(
(
messageEvent.data as {
message?: { __openclaw?: { id?: string; seq?: number } };
}
).message?.__openclaw,
).toMatchObject({
id: appended.ok ? appended.messageId : undefined,
await expectMessageEventMatch(stream, {
text: "second message",
seq: 2,
id: appended.messageId,
});
await reader?.cancel();
await stream.reader.cancel();
});
});
@@ -571,21 +570,8 @@ describe("session history HTTP endpoints", () => {
});
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
const historyEvent = await readSseEvent(reader!, streamState);
expect(historyEvent.event).toBe("history");
expect(
(
historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }
).messages?.map((message) => message.content?.[0]?.text),
).toEqual(["first message"]);
const stream = await openSessionHistorySse(harness.port, "agent:main:main");
await expectHistoryEventTexts(stream, ["first message"]);
const visible = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
@@ -594,15 +580,12 @@ describe("session history HTTP endpoints", () => {
});
expect(visible.ok).toBe(true);
const messageEvent = await readSseEvent(reader!, streamState);
expect(messageEvent.event).toBe("message");
expect(
(messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("third visible message");
expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(3);
await expectMessageEventMatch(stream, {
text: "third visible message",
seq: 3,
});
await reader?.cancel();
await stream.reader.cancel();
});
});
@@ -610,15 +593,8 @@ describe("session history HTTP endpoints", () => {
const { storePath } = await seedSession({ text: "first message" });
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
await readSseEvent(reader!, streamState);
const stream = await openSessionHistorySse(harness.port, "agent:main:main");
await expectHistoryEventTexts(stream, ["first message"]);
const silent = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
@@ -634,25 +610,16 @@ describe("session history HTTP endpoints", () => {
});
expect(visible.ok).toBe(true);
const messageEvent = await readSseEvent(reader!, streamState);
expect(messageEvent.event).toBe("message");
expect(
(messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("third visible message");
expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(3);
expect(
(
messageEvent.data as {
message?: { __openclaw?: { id?: string; seq?: number } };
}
).message?.__openclaw,
).toMatchObject({
id: visible.ok ? visible.messageId : undefined,
if (!visible.ok) {
throw new Error(`append failed: ${visible.reason}`);
}
await expectMessageEventMatch(stream, {
text: "third visible message",
seq: 3,
id: visible.messageId,
});
await reader?.cancel();
await stream.reader.cancel();
});
});
@@ -660,15 +627,8 @@ describe("session history HTTP endpoints", () => {
const { storePath } = await seedSession({ text: "first message" });
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
await readSseEvent(reader!, streamState);
const stream = await openSessionHistorySse(harness.port, "agent:main:main");
await expectHistoryEventTexts(stream, ["first message"]);
const second = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
@@ -677,13 +637,13 @@ describe("session history HTTP endpoints", () => {
});
expect(second.ok).toBe(true);
const secondEvent = await readSseEvent(reader!, streamState);
expect(secondEvent.event).toBe("message");
expect((secondEvent.data as { messageSeq?: number }).messageSeq).toBe(2);
if (!second.ok) {
throw new Error(`append failed: ${second.reason}`);
}
await expectMessageEventMatch(stream, {
text: "second visible message",
seq: 2,
});
await appendTranscriptMessage({
sessionKey: "agent:main:main",
storePath,
@@ -691,7 +651,7 @@ describe("session history HTTP endpoints", () => {
emitInlineMessage: false,
});
const refreshEvent = await readSseEvent(reader!, streamState);
const refreshEvent = await readSseEvent(stream.reader, stream.streamState);
expect(refreshEvent.event).toBe("history");
expect(
(
@@ -706,25 +666,16 @@ describe("session history HTTP endpoints", () => {
});
expect(third.ok).toBe(true);
const thirdEvent = await readSseEvent(reader!, streamState);
expect(thirdEvent.event).toBe("message");
expect(
(thirdEvent.data as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("third visible message");
expect((thirdEvent.data as { messageSeq?: number }).messageSeq).toBe(4);
expect(
(
thirdEvent.data as {
message?: { __openclaw?: { id?: string; seq?: number } };
}
).message?.__openclaw,
).toMatchObject({
id: third.ok ? third.messageId : undefined,
if (!third.ok) {
throw new Error(`append failed: ${third.reason}`);
}
await expectMessageEventMatch(stream, {
text: "third visible message",
seq: 4,
id: third.messageId,
});
await reader?.cancel();
await stream.reader.cancel();
});
});