// Mirror of https://github.com/openclaw/openclaw.git
// Synced 2026-05-21 17:04:46 +00:00
// 324 lines, 10 KiB, TypeScript
import { afterEach, describe, expect, it, vi } from "vitest";
|
|
import { TeamsHttpStream } from "./streaming-message.js";
|
|
|
|
/**
 * Advances vitest's fake clock by one millisecond and waits for the timers
 * that become due (including async ones) to finish.
 *
 * Tests that call this construct the stream with `throttleMs: 1`; presumably a
 * single tick is enough for the stream's throttled send to run — confirm
 * against streaming-message.ts. `vi.useFakeTimers()` must be active.
 */
async function flushStreamTimer(): Promise<void> {
  await vi.advanceTimersByTimeAsync(1);
}
|
|
|
|
/**
 * Returns the first captured activity whose `type` is `"message"`.
 *
 * @param sent - Activities recorded by a `sendActivity` mock, in send order.
 * @returns The first activity with `type === "message"`.
 * @throws Error when no such activity was captured.
 */
function requireMessageActivity(sent: unknown[]): Record<string, unknown> {
  const activity = sent.find(
    (entry): entry is Record<string, unknown> =>
      // Skip null/primitive entries so a stray value surfaces as the explicit
      // error below rather than a TypeError on the property read.
      typeof entry === "object" &&
      entry !== null &&
      (entry as Record<string, unknown>).type === "message",
  );
  if (!activity) {
    throw new Error("expected final Teams message activity");
  }
  return activity;
}
|
|
|
|
/**
 * Returns the `entities` array of an activity.
 *
 * @param activity - A captured activity.
 * @returns The value of `activity.entities`, typed as an array of records.
 *   The elements themselves are not validated.
 * @throws Error when `activity.entities` is missing or not an array.
 */
function requireEntities(activity: Record<string, unknown>): Array<Record<string, unknown>> {
  const { entities } = activity;
  if (!Array.isArray(entities)) {
    throw new Error("expected Teams activity entities");
  }
  return entities as Array<Record<string, unknown>>;
}
|
|
|
|
/**
 * Returns the first entity of an activity that satisfies `predicate`.
 *
 * @param activity - A captured activity; its `entities` must be an array.
 * @param predicate - Selects the wanted entity.
 * @param label - Human-readable entity name, used only in the error message.
 * @returns The first matching entity.
 * @throws Error when the activity has no entities array (raised by
 *   `requireEntities`) or when no entity matches.
 */
function requireEntity(
  activity: Record<string, unknown>,
  predicate: (entity: Record<string, unknown>) => boolean,
  label: string,
): Record<string, unknown> {
  const entity = requireEntities(activity).find(predicate);
  if (!entity) {
    throw new Error(`expected ${label} entity`);
  }
  return entity;
}
|
|
|
|
/**
 * Returns the first activity passed to a mocked `sendActivity` that satisfies
 * `predicate`.
 *
 * Unlike a `sent` array filled from inside the mock body, this reads the
 * mock's recorded call arguments.
 *
 * @param sendActivity - The vitest mock used as the stream's `sendActivity`.
 * @param predicate - Selects the wanted activity (first argument of each call).
 * @param label - Human-readable name, used only in the error message.
 * @returns The first matching activity.
 * @throws Error when no recorded call matches.
 */
function requireSendActivity(
  sendActivity: ReturnType<typeof vi.fn>,
  predicate: (activity: Record<string, unknown>) => boolean,
  label: string,
): Record<string, unknown> {
  const activity = sendActivity.mock.calls
    .map(([sent]) => sent as Record<string, unknown>)
    .find(predicate);
  if (!activity) {
    throw new Error(`expected ${label} sendActivity call`);
  }
  return activity;
}
|
|
|
|
/**
 * Tests for `TeamsHttpStream`, driven through a mocked `sendActivity`
 * callback so every activity the stream emits can be inspected.
 */
describe("TeamsHttpStream", () => {
  afterEach(() => {
    // Several tests install fake timers; restore real ones so they never leak
    // into the next test.
    vi.useRealTimers();
  });

  it("sends first chunk as typing activity with streaminfo", async () => {
    vi.useFakeTimers();

    const sent: unknown[] = [];
    const stream = new TeamsHttpStream({
      sendActivity: vi.fn(async (activity) => {
        sent.push(activity);
        return { id: "stream-1" };
      }),
      throttleMs: 1,
    });

    // Enough text to pass MIN_INITIAL_CHARS threshold
    stream.update("Hello, this is a test response that is long enough.");
    await flushStreamTimer();

    expect(sent.length).toBeGreaterThanOrEqual(1);
    const firstActivity = sent[0] as Record<string, unknown>;
    expect(firstActivity.type).toBe("typing");
    expect(typeof firstActivity.text).toBe("string");
    expect(firstActivity.text as string).toContain("Hello");
    // Should have streaminfo entity
    const streamInfo = requireEntity(
      firstActivity,
      (entity) => entity.type === "streaminfo",
      "streaminfo",
    );
    expect(streamInfo.streamType).toBe("streaming");
  });

  it("sends final message activity on finalize", async () => {
    vi.useFakeTimers();

    const sent: unknown[] = [];
    const stream = new TeamsHttpStream({
      sendActivity: vi.fn(async (activity) => {
        sent.push(activity);
        return { id: "stream-1" };
      }),
      throttleMs: 1,
    });

    stream.update("Hello, this is a complete response for finalization testing.");
    await flushStreamTimer();

    await stream.finalize();

    // Find the final message activity
    const finalActivity = requireMessageActivity(sent);

    expect(finalActivity.text).toBe("Hello, this is a complete response for finalization testing.");
    // No cursor in final
    expect(finalActivity.text as string).not.toContain("\u258D");

    // Should have AI-generated entity
    const aiGenerated = requireEntity(
      finalActivity,
      (entity) =>
        Array.isArray(entity.additionalType) &&
        entity.additionalType.includes("AIGeneratedContent"),
      "AI-generated content",
    );
    expect(aiGenerated.additionalType).toEqual(["AIGeneratedContent"]);

    // Should have streaminfo with final type
    const streamInfo = requireEntity(
      finalActivity,
      (entity) => entity.type === "streaminfo",
      "streaminfo",
    );
    expect(streamInfo.streamType).toBe("final");
  });

  it("does not send below MIN_INITIAL_CHARS", async () => {
    vi.useFakeTimers();

    const sendActivity = vi.fn(async () => ({ id: "x" }));
    const stream = new TeamsHttpStream({ sendActivity, throttleMs: 1 });

    stream.update("Hi");
    await flushStreamTimer();

    expect(sendActivity).not.toHaveBeenCalled();
  });

  it("finalize with no content does nothing", async () => {
    const sendActivity = vi.fn(async () => ({ id: "x" }));
    const stream = new TeamsHttpStream({ sendActivity });

    await stream.finalize();
    expect(sendActivity).not.toHaveBeenCalled();
  });

  it("finalize sends content even if no chunks were streamed", async () => {
    const sent: unknown[] = [];
    const stream = new TeamsHttpStream({
      sendActivity: vi.fn(async (activity) => {
        sent.push(activity);
        return { id: "msg-1" };
      }),
    });

    // Short text — below MIN_INITIAL_CHARS, so no streaming chunk sent
    stream.update("Short");
    await stream.finalize();

    // Should send final message even though no chunks were streamed
    expect(sent.length).toBe(1);
    const activity = sent[0] as Record<string, unknown>;
    expect(activity.type).toBe("message");
    expect(activity.text).toBe("Short");
  });

  it("sets feedbackLoopEnabled on final message", async () => {
    vi.useFakeTimers();

    const sent: unknown[] = [];
    const stream = new TeamsHttpStream({
      sendActivity: vi.fn(async (activity) => {
        sent.push(activity);
        return { id: "stream-1" };
      }),
      feedbackLoopEnabled: true,
      throttleMs: 1,
    });

    stream.update("A response long enough to pass the minimum character threshold for streaming.");
    await flushStreamTimer();
    await stream.finalize();

    // Use the shared helper so a missing final message fails with a clear
    // error instead of a TypeError on an undefined activity.
    const finalActivity = requireMessageActivity(sent);

    // toMatchObject also fails cleanly when channelData is absent.
    expect(finalActivity.channelData).toMatchObject({ feedbackLoopEnabled: true });
  });

  it("sends informative update with streamType informative", async () => {
    const sent: unknown[] = [];
    const stream = new TeamsHttpStream({
      sendActivity: vi.fn(async (activity) => {
        sent.push(activity);
        return { id: "stream-1" };
      }),
    });

    await stream.sendInformativeUpdate("Thinking...");

    expect(sent.length).toBe(1);
    const activity = sent[0] as Record<string, unknown>;
    expect(activity.type).toBe("typing");
    expect(activity.text).toBe("Thinking...");
    const streamInfo = requireEntity(
      activity,
      (entity) => entity.type === "streaminfo",
      "streaminfo",
    );
    expect(streamInfo.streamType).toBe("informative");
    expect(streamInfo.streamSequence).toBe(1);
  });

  it("informative update establishes streamId for subsequent chunks", async () => {
    vi.useFakeTimers();

    const sent: unknown[] = [];
    const stream = new TeamsHttpStream({
      sendActivity: vi.fn(async (activity) => {
        sent.push(activity);
        return { id: "stream-1" };
      }),
      throttleMs: 1,
    });

    await stream.sendInformativeUpdate("Working...");
    stream.update("Hello, this is a long enough response for streaming to begin.");
    await flushStreamTimer();

    // Second activity (streaming chunk) should have the streamId from the informative update
    expect(sent.length).toBeGreaterThanOrEqual(2);
    const chunk = sent[1] as Record<string, unknown>;
    const streamInfo = requireEntity(chunk, (entity) => entity.type === "streaminfo", "streaminfo");
    expect(streamInfo.streamId).toBe("stream-1");
  });

  it("reports failure when replacing informative progress with final text fails", async () => {
    // Typing activities succeed; only the final "message" send is rejected.
    const sendActivity = vi.fn(async (activity: Record<string, unknown>) => {
      if (activity.type === "message") {
        throw new Error("final send rejected");
      }
      return { id: "stream-1" };
    });
    const stream = new TeamsHttpStream({ sendActivity, throttleMs: 1 });

    await stream.sendInformativeUpdate("Thinking");
    const carried = await stream.replaceInformativeWithFinal(
      "Final response long enough to stream before the final message send fails.",
    );

    expect(carried).toBe(false);
    expect(stream.isFailed).toBe(true);
    // The mock throws before returning, so inspect its recorded call arguments.
    const finalSend = requireSendActivity(
      sendActivity,
      (activity) => activity.type === "message",
      "final message",
    );
    expect(finalSend.type).toBe("message");
    expect(finalSend.text).toBe(
      "Final response long enough to stream before the final message send fails.",
    );
  });

  it("hasContent is true after update", () => {
    const stream = new TeamsHttpStream({
      sendActivity: vi.fn(async () => ({ id: "x" })),
    });

    expect(stream.hasContent).toBe(false);
    stream.update("some text");
    expect(stream.hasContent).toBe(true);
  });

  it("double finalize is a no-op", async () => {
    const sendActivity = vi.fn(async () => ({ id: "x" }));
    const stream = new TeamsHttpStream({ sendActivity });

    stream.update("A response long enough to pass the minimum character threshold.");
    await stream.finalize();
    const callCount = sendActivity.mock.calls.length;

    await stream.finalize();
    expect(sendActivity.mock.calls.length).toBe(callCount);
  });

  it("stops streaming before stream age timeout and finalizes with last good text", async () => {
    vi.useFakeTimers();

    const sent: unknown[] = [];
    const sendActivity = vi.fn(async (activity) => {
      sent.push(activity);
      return { id: "stream-1" };
    });
    const stream = new TeamsHttpStream({ sendActivity, throttleMs: 1 });

    stream.update("Hello, this is a long enough response for streaming to begin.");
    await flushStreamTimer();

    stream.update(
      "Hello, this is a long enough response for streaming to begin. More text before timeout.",
    );
    await flushStreamTimer();

    // Jump the mocked clock forward by 45_001 ms — presumably just past the
    // stream's maximum age; confirm the limit in streaming-message.ts.
    vi.setSystemTime(new Date(Date.now() + 45_001));
    stream.update(
      "Hello, this is a long enough response for streaming to begin. More text before timeout. Even more text after timeout.",
    );
    await flushStreamTimer();

    expect(stream.isFailed).toBe(true);

    const finalActivity = requireMessageActivity(sent);

    // The text supplied after the clock jump must not appear in the final message.
    expect(finalActivity.text).toBe(
      "Hello, this is a long enough response for streaming to begin. More text before timeout.",
    );
  });
});
|