fix: harden agent core sdk proxy surfaces

This commit is contained in:
Peter Steinberger
2026-05-27 00:05:19 +01:00
parent c5a50b06ce
commit e29b564bd7
4 changed files with 155 additions and 11 deletions

View File

@@ -0,0 +1,94 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { Context, Model, Usage } from "../../llm/types.js";
import { streamProxy } from "./proxy.js";
const usage: Usage = {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
};
const model: Model = {
id: "test-model",
provider: "test",
api: "openai-responses",
maxTokens: 1024,
};
const context: Context = {
messages: [{ role: "user", content: "hello" }],
};
function responseFromText(text: string): Response {
return new Response(
new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode(text));
controller.close();
},
}),
{ status: 200 },
);
}
describe("streamProxy", () => {
afterEach(() => {
vi.unstubAllGlobals();
});
it("flushes a final SSE frame without a trailing newline", async () => {
vi.stubGlobal(
"fetch",
vi.fn(async () =>
responseFromText(
`data: ${JSON.stringify({
type: "done",
reason: "stop",
usage,
})}`,
),
),
);
const stream = streamProxy(model, context, {
authToken: "token",
proxyUrl: "https://proxy.example",
});
const events = [];
for await (const event of stream) {
events.push(event);
}
expect(events.at(-1)?.type).toBe("done");
await expect(stream.result()).resolves.toMatchObject({
role: "assistant",
stopReason: "stop",
usage,
});
});
it("returns an error result when EOF arrives without a terminal event", async () => {
vi.stubGlobal(
"fetch",
vi.fn(async () => responseFromText(`data: ${JSON.stringify({ type: "start" })}`)),
);
const stream = streamProxy(model, context, {
authToken: "token",
proxyUrl: "https://proxy.example",
});
const events = [];
for await (const event of stream) {
events.push(event);
}
expect(events.at(-1)?.type).toBe("error");
await expect(stream.result()).resolves.toMatchObject({
stopReason: "error",
errorMessage: "Proxy stream ended before terminal event",
});
});
});

View File

@@ -189,6 +189,24 @@ export function streamProxy(
reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
let terminalEventSeen = false;
const processSseLine = (line: string) => {
if (!line.startsWith("data: ")) {
return;
}
const data = line.slice(6).trim();
if (!data) {
return;
}
const proxyEvent = JSON.parse(data) as ProxyAssistantMessageEvent;
const event = processProxyEvent(proxyEvent, partial);
if (!event) {
return;
}
terminalEventSeen = event.type === "done" || event.type === "error";
stream.push(event);
};
while (true) {
const { done, value } = await reader.read();
@@ -205,22 +223,20 @@ export function streamProxy(
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6).trim();
if (data) {
const proxyEvent = JSON.parse(data) as ProxyAssistantMessageEvent;
const event = processProxyEvent(proxyEvent, partial);
if (event) {
stream.push(event);
}
}
}
processSseLine(line);
}
}
if (options.signal?.aborted) {
throw new Error("Request aborted by user");
}
buffer += decoder.decode();
if (buffer.trim()) {
processSseLine(buffer);
}
if (!terminalEventSeen) {
throw new Error("Proxy stream ended before terminal event");
}
stream.end();
} catch (error) {

View File

@@ -0,0 +1,12 @@
import { readFileSync } from "node:fs";
import { resolve } from "node:path";
import { describe, expect, it } from "vitest";
describe("plugin-sdk/agent-core", () => {
it("keeps public declaration imports package-relative", () => {
const source = readFileSync(resolve(process.cwd(), "src/plugin-sdk/agent-core.ts"), "utf8");
expect(source).toContain("../../packages/agent-core/src/index.js");
expect(source).not.toContain("../agents/runtime/index.js");
});
});

View File

@@ -1 +1,23 @@
export * from "../agents/runtime/index.js";
import {
Agent as CoreAgent,
type AgentOptions as CoreAgentOptions,
} from "../../packages/agent-core/src/agent.js";
import type { CompleteSimpleFn, StreamFn } from "../../packages/agent-core/src/llm.js";
import type { AgentCoreRuntimeDeps } from "../../packages/agent-core/src/runtime-deps.js";
import { completeSimple, streamSimple } from "./llm.js";
export const openClawAgentCoreRuntime = {
completeSimple: completeSimple as unknown as CompleteSimpleFn,
streamSimple: streamSimple as unknown as StreamFn,
} satisfies AgentCoreRuntimeDeps;
export class Agent extends CoreAgent {
constructor(options: CoreAgentOptions = {}) {
super({ runtime: openClawAgentCoreRuntime, ...options });
}
}
// OpenClaw-owned reusable agent core
export * from "../../packages/agent-core/src/index.js";
// Proxy utilities
export * from "../agents/runtime/proxy.js";