import fs from "node:fs/promises";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { loadModelCatalog } from "../agents/model-catalog.js";
import type { OpenClawConfig } from "../config/config.js";
import { withTempHome as withTempHomeHarness } from "../config/home-env.test-harness.js";
import { getReplyFromConfig } from "./reply.js";

// Types derived from the real embedded-agent entry point so the mock
// implementations below stay in sync with its signature.
type RunEmbeddedPiAgent = typeof import("../agents/pi-embedded.js").runEmbeddedPiAgent;
type RunEmbeddedPiAgentParams = Parameters<RunEmbeddedPiAgent>[0];
type RunEmbeddedPiAgentReply = Awaited<ReturnType<RunEmbeddedPiAgent>>;

// Created through vi.hoisted because vi.mock factories are hoisted above the
// imports; the shared mock object must already exist when they run.
const piEmbeddedMock = vi.hoisted(() => ({
  abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
  runEmbeddedPiAgent: vi.fn(),
  queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
  resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
  isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
  isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
}));

// The same mock is registered under two specifiers.
// NOTE(review): presumably so that either way the module gets resolved hits the mock — confirm.
vi.mock("/src/agents/pi-embedded.js", () => piEmbeddedMock);
vi.mock("../agents/pi-embedded.js", () => piEmbeddedMock);
vi.mock("../agents/model-catalog.js", () => ({
  loadModelCatalog: vi.fn(),
}));

/** Options object accepted as the second argument of `getReplyFromConfig`. */
type GetReplyOptions = NonNullable<Parameters<typeof getReplyFromConfig>[1]>;

/**
 * Builds a canned embedded-agent result with a single text payload.
 *
 * @param text - Text of the only payload in the reply.
 * @returns A reply with fixed placeholder metadata (5 ms duration, session "s",
 *   provider "p", model "m").
 */
function createEmbeddedReply(text: string): RunEmbeddedPiAgentReply {
  return {
    payloads: [{ text }],
    meta: {
      durationMs: 5,
      agentMeta: { sessionId: "s", provider: "p", model: "m" },
    },
  };
}

/**
 * Builds the inbound message used by every test: a "ping" body on the
 * telegram provider with fixed sender and recipient.
 *
 * @param messageSid - Value for the message's `MessageSid` field.
 */
function createTelegramMessage(messageSid: string) {
  return {
    Body: "ping",
    From: "+1004",
    To: "+2000",
    MessageSid: messageSid,
    Provider: "telegram",
  } as const;
}

/**
 * Builds a config whose agent workspace and session store live under `home`.
 *
 * @param home - Directory that hosts the `openclaw` workspace and `sessions.json`.
 * @param streamMode - Optional telegram `streamMode`; left undefined when omitted.
 */
function createReplyConfig(home: string, streamMode?: "block"): OpenClawConfig {
  return {
    agents: {
      defaults: {
        model: { primary: "anthropic/claude-opus-4-5" },
        workspace: path.join(home, "openclaw"),
      },
    },
    channels: { telegram: { allowFrom: ["*"], streamMode } },
    session: { store: path.join(home, "sessions.json") },
  };
}

/**
 * Calls `getReplyFromConfig` with the standard telegram message, the given
 * callbacks/flags, and a config rooted at `params.home`.
 *
 * @returns Whatever `getReplyFromConfig` resolves to.
 */
async function runTelegramReply(params: {
  home: string;
  messageSid: string;
  onBlockReply?: GetReplyOptions["onBlockReply"];
  onReplyStart?: GetReplyOptions["onReplyStart"];
  disableBlockStreaming?: boolean;
  streamMode?: "block";
}) {
  return getReplyFromConfig(
    createTelegramMessage(params.messageSid),
    {
      onReplyStart: params.onReplyStart,
      onBlockReply: params.onBlockReply,
      disableBlockStreaming: params.disableBlockStreaming,
    },
    createReplyConfig(params.home, params.streamMode),
  );
}

/**
 * Runs `fn` inside the shared temp-home harness (prefix "openclaw-stream-")
 * after creating `.openclaw/agents/main/sessions` under that home.
 *
 * @param fn - Test body; receives the temp home path.
 * @returns The value `fn` resolves to.
 *   NOTE(review): assumes the harness forwards the callback's result — confirm.
 */
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
  return withTempHomeHarness("openclaw-stream-", async (home) => {
    await fs.mkdir(path.join(home, ".openclaw", "agents", "main", "sessions"), { recursive: true });
    return fn(home);
  });
}

describe("block streaming", () => {
  beforeEach(() => {
    vi.stubEnv("OPENCLAW_TEST_FAST", "1");
    // Reset call history and restore the default `false` return on each mock.
    piEmbeddedMock.abortEmbeddedPiRun.mockClear().mockReturnValue(false);
    piEmbeddedMock.queueEmbeddedPiMessage.mockClear().mockReturnValue(false);
    piEmbeddedMock.isEmbeddedPiRunActive.mockClear().mockReturnValue(false);
    piEmbeddedMock.isEmbeddedPiRunStreaming.mockClear().mockReturnValue(false);
    piEmbeddedMock.runEmbeddedPiAgent.mockClear();
    vi.mocked(loadModelCatalog).mockResolvedValue([
      { id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" },
      { id: "gpt-4.1-mini", name: "GPT-4.1 Mini", provider: "openai" },
    ]);
  });

  it("handles ordering, timeout fallback, and telegram streamMode block", async () => {
    await withTempHome(async (home) => {
      // Gate returned from onReplyStart; it stays pending until the test releases it.
      let releaseTyping: (() => void) | undefined;
      const typingGate = new Promise<void>((resolve) => {
        releaseTyping = resolve;
      });
      // Signals the test that onReplyStart has been invoked.
      let resolveOnReplyStart: (() => void) | undefined;
      const onReplyStartCalled = new Promise<void>((resolve) => {
        resolveOnReplyStart = resolve;
      });
      const onReplyStart = vi.fn(() => {
        resolveOnReplyStart?.();
        return typingGate;
      });
      const seen: string[] = [];
      const onBlockReply = vi.fn(async (payload) => {
        seen.push(payload.text ?? "");
      });

      // The fake agent emits two block replies without awaiting them, then
      // returns the same two texts as its final payloads.
      const impl = async (params: RunEmbeddedPiAgentParams) => {
        void params.onBlockReply?.({ text: "first" });
        void params.onBlockReply?.({ text: "second" });
        return {
          payloads: [{ text: "first" }, { text: "second" }],
          meta: createEmbeddedReply("first").meta,
        };
      };
      piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl);

      const replyPromise = runTelegramReply({
        home,
        messageSid: "msg-123",
        onReplyStart,
        onBlockReply,
        disableBlockStreaming: false,
      });

      await onReplyStartCalled;
      releaseTyping?.();

      const res = await replyPromise;
      // Nothing is returned as a final reply; both blocks arrive through
      // onBlockReply as a single combined text, in emission order.
      expect(res).toBeUndefined();
      expect(seen).toEqual(["first\n\nsecond"]);

      // Second scenario: telegram streamMode "block" with no explicit
      // disableBlockStreaming. The reply comes back as the final payload and
      // the block callback is never invoked.
      const onBlockReplyStreamMode = vi.fn().mockResolvedValue(undefined);
      piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(async () =>
        createEmbeddedReply("final"),
      );

      const resStreamMode = await runTelegramReply({
        home,
        messageSid: "msg-127",
        onBlockReply: onBlockReplyStreamMode,
        streamMode: "block",
      });

      // The result may be a single payload or an array of payloads.
      const streamPayload = Array.isArray(resStreamMode) ? resStreamMode[0] : resStreamMode;
      expect(streamPayload?.text).toBe("final");
      expect(onBlockReplyStreamMode).not.toHaveBeenCalled();
    });
  });

  it("trims leading whitespace in block-streamed replies", async () => {
    await withTempHome(async (home) => {
      const seen: string[] = [];
      const onBlockReply = vi.fn(async (payload) => {
        seen.push(payload.text ?? "");
      });

      piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(
        async (params: RunEmbeddedPiAgentParams) => {
          void params.onBlockReply?.({ text: "\n\n Hello from stream" });
          return createEmbeddedReply("\n\n Hello from stream");
        },
      );

      const res = await runTelegramReply({
        home,
        messageSid: "msg-128",
        onBlockReply,
        disableBlockStreaming: false,
      });

      expect(res).toBeUndefined();
      expect(onBlockReply).toHaveBeenCalledTimes(1);
      expect(seen).toEqual(["Hello from stream"]);
    });
  });

  it("still parses media directives for direct block payloads", async () => {
    await withTempHome(async (home) => {
      const onBlockReply = vi.fn();

      piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(
        async (params: RunEmbeddedPiAgentParams) => {
          void params.onBlockReply?.({ text: "Result\nMEDIA: ./image.png" });
          return createEmbeddedReply("Result\nMEDIA: ./image.png");
        },
      );

      const res = await runTelegramReply({
        home,
        messageSid: "msg-129",
        onBlockReply,
        disableBlockStreaming: false,
      });

      expect(res).toBeUndefined();
      expect(onBlockReply).toHaveBeenCalledTimes(1);
      // The MEDIA line is removed from the text and surfaced as a path under
      // the configured workspace (`<home>/openclaw`).
      expect(onBlockReply.mock.calls[0][0]).toMatchObject({
        text: "Result",
        mediaUrls: [path.join(home, "openclaw", "image.png")],
      });
    });
  });
});