diff --git a/src/agents/runtime/proxy.test.ts b/src/agents/runtime/proxy.test.ts new file mode 100644 index 00000000000..a89ef58b946 --- /dev/null +++ b/src/agents/runtime/proxy.test.ts @@ -0,0 +1,94 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { Context, Model, Usage } from "../../llm/types.js"; +import { streamProxy } from "./proxy.js"; + +const usage: Usage = { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, +}; + +const model: Model = { + id: "test-model", + provider: "test", + api: "openai-responses", + maxTokens: 1024, +}; + +const context: Context = { + messages: [{ role: "user", content: "hello" }], +}; + +function responseFromText(text: string): Response { + return new Response( + new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(text)); + controller.close(); + }, + }), + { status: 200 }, + ); +} + +describe("streamProxy", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("flushes a final SSE frame without a trailing newline", async () => { + vi.stubGlobal( + "fetch", + vi.fn(async () => + responseFromText( + `data: ${JSON.stringify({ + type: "done", + reason: "stop", + usage, + })}`, + ), + ), + ); + + const stream = streamProxy(model, context, { + authToken: "token", + proxyUrl: "https://proxy.example", + }); + const events = []; + for await (const event of stream) { + events.push(event); + } + + expect(events.at(-1)?.type).toBe("done"); + await expect(stream.result()).resolves.toMatchObject({ + role: "assistant", + stopReason: "stop", + usage, + }); + }); + + it("returns an error result when EOF arrives without a terminal event", async () => { + vi.stubGlobal( + "fetch", + vi.fn(async () => responseFromText(`data: ${JSON.stringify({ type: "start" })}`)), + ); + + const stream = streamProxy(model, context, { + authToken: "token", + proxyUrl: "https://proxy.example", + }); + const events = []; + for await (const event of stream) { + events.push(event); + } + + expect(events.at(-1)?.type).toBe("error"); + await expect(stream.result()).resolves.toMatchObject({ + stopReason: "error", + errorMessage: "Proxy stream ended before terminal event", + }); + }); +}); diff --git a/src/agents/runtime/proxy.ts b/src/agents/runtime/proxy.ts index d7ca996fd04..e624963ed36 100644 --- a/src/agents/runtime/proxy.ts +++ b/src/agents/runtime/proxy.ts @@ -189,6 +189,24 @@ export function streamProxy( reader = response.body!.getReader(); const decoder = new TextDecoder(); let buffer = ""; + let terminalEventSeen = false; + + const processSseLine = (line: string) => { + if (!line.startsWith("data: ")) { + return; + } + const data = line.slice(6).trim(); + if (!data) { + return; + } + const proxyEvent = JSON.parse(data) as ProxyAssistantMessageEvent; + const event = processProxyEvent(proxyEvent, partial); + if (!event) { + return; + } + terminalEventSeen = event.type === "done" || event.type === "error"; + stream.push(event); + }; while (true) { const { done, value } = await reader.read(); @@ -205,22 +223,20 @@ export function streamProxy( buffer = lines.pop() || ""; for (const line of lines) { - if (line.startsWith("data: ")) { - const data = line.slice(6).trim(); - if (data) { - const proxyEvent = JSON.parse(data) as ProxyAssistantMessageEvent; - const event = processProxyEvent(proxyEvent, partial); - if (event) { - stream.push(event); - } - } - } + processSseLine(line); } } if (options.signal?.aborted) { throw new Error("Request aborted by user"); } + buffer += decoder.decode(); + if (buffer.trim()) { + processSseLine(buffer); + } + if (!terminalEventSeen) { + throw new Error("Proxy stream ended before terminal event"); + } stream.end(); } catch (error) { diff --git a/src/plugin-sdk/agent-core.test.ts b/src/plugin-sdk/agent-core.test.ts new file mode 100644 index 00000000000..9c3e6af3cb4 --- /dev/null +++ b/src/plugin-sdk/agent-core.test.ts @@ -0,0 +1,12 @@ +import { readFileSync } from "node:fs"; +import { resolve } from "node:path"; +import { describe, expect, it } from "vitest"; + +describe("plugin-sdk/agent-core", () => { + it("keeps public declaration imports package-relative", () => { + const source = readFileSync(resolve(process.cwd(), "src/plugin-sdk/agent-core.ts"), "utf8"); + + expect(source).toContain("../../packages/agent-core/src/index.js"); + expect(source).not.toContain("../agents/runtime/index.js"); + }); +}); diff --git a/src/plugin-sdk/agent-core.ts b/src/plugin-sdk/agent-core.ts index dc1ce0f0faa..d3f1c8b56ab 100644 --- a/src/plugin-sdk/agent-core.ts +++ b/src/plugin-sdk/agent-core.ts @@ -1 +1,23 @@ -export * from "../agents/runtime/index.js"; +import { + Agent as CoreAgent, + type AgentOptions as CoreAgentOptions, +} from "../../packages/agent-core/src/agent.js"; +import type { CompleteSimpleFn, StreamFn } from "../../packages/agent-core/src/llm.js"; +import type { AgentCoreRuntimeDeps } from "../../packages/agent-core/src/runtime-deps.js"; +import { completeSimple, streamSimple } from "./llm.js"; + +export const openClawAgentCoreRuntime = { + completeSimple: completeSimple as unknown as CompleteSimpleFn, + streamSimple: streamSimple as unknown as StreamFn, +} satisfies AgentCoreRuntimeDeps; + +export class Agent extends CoreAgent { + constructor(options: CoreAgentOptions = {}) { + super({ runtime: openClawAgentCoreRuntime, ...options }); + } +} + +// OpenClaw-owned reusable agent core +export * from "../../packages/agent-core/src/index.js"; +// Proxy utilities +export * from "../agents/runtime/proxy.js";