fix(agents): abort stalled Anthropic SSE reads

This commit is contained in:
Peter Steinberger
2026-04-28 09:00:16 +01:00
parent a8b64b7d52
commit f3191b7962
5 changed files with 281 additions and 3 deletions

View File

@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Control UI/WebChat: keep large attachment payloads out of Lit state and optimistic chat messages, using object URL previews plus send-time payload serialization so PDF/image uploads no longer trigger `RangeError: Maximum call stack size exceeded`. Fixes #73360; refs #54378 and #63432. Thanks @hejunhui-73, @Ansub, and @christianhernandez3-afk.
- Agents/Anthropic: cancel stalled Anthropic Messages SSE body reads when abort signals fire, so active-memory timeouts release transport resources instead of leaving hidden recall runs parked on `reader.read()`. Refs #72965 and #73120. Thanks @wdeveloper16.
- Agents/models: keep per-agent primary models strict when `fallbacks` is omitted, so probe-only custom providers are not tried as hidden fallback candidates unless the agent explicitly opts in. Fixes #73332. Thanks @haumanto.
- Gateway/models: add `models.pricing.enabled` so offline or restricted-network installs can skip startup OpenRouter and LiteLLM pricing-catalog fetches while keeping explicit model costs working. Fixes #53639. Thanks @callebtc, @palewire, and @rjdjohnston.
- Onboarding: pin interactive and non-interactive health checks to the just-configured setup token/password so stale `OPENCLAW_GATEWAY_TOKEN` or `OPENCLAW_GATEWAY_PASSWORD` values do not produce false gateway-token-mismatch failures after setup. Fixes #72203. Thanks @galiniliev.

View File

@@ -1879,7 +1879,7 @@ async function maybeResolveActiveRecall(params: {
if (controller.signal.aborted) {
const result: ActiveRecallResult = {
status: "timeout",
elapsedMs: Date.now() - startedAt,
elapsedMs: params.config.timeoutMs,
summary: null,
};
if (params.config.logging) {

View File

@@ -0,0 +1,132 @@
import http from "node:http";
import type { Model } from "@mariozechner/pi-ai";
import { describe, expect, it } from "vitest";
import { createAnthropicMessagesTransportStreamFn } from "./anthropic-transport-stream.js";
import { isLiveTestEnabled } from "./live-test-helpers.js";
// Live tests are opt-in: they only run when ANTHROPIC_TRANSPORT_LIVE_TEST is set.
const LIVE = isLiveTestEnabled(["ANTHROPIC_TRANSPORT_LIVE_TEST"]);
const describeLive = LIVE ? describe : describe.skip;
// Aliases derived from the transport factory so test call sites can cast
// literals without restating the pi-ai generic parameters.
type AnthropicMessagesModel = Model<"anthropic-messages">;
type AnthropicStreamFn = ReturnType<typeof createAnthropicMessagesTransportStreamFn>;
type AnthropicStreamContext = Parameters<AnthropicStreamFn>[1];
type AnthropicStreamOptions = Parameters<AnthropicStreamFn>[2];
/** Resolves with `value` after `ms` milliseconds. */
function delay<T>(ms: number, value: T): Promise<T> {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}
/**
 * Starts `server` on an ephemeral loopback port and resolves with the bound
 * port once it is accepting connections. Rejects if listening fails or the
 * bound address is not a TCP address.
 */
function waitForServerListening(server: http.Server): Promise<number> {
  return new Promise((resolve, reject) => {
    server.once("error", reject);
    server.listen(0, "127.0.0.1", () => {
      // Listening succeeded; stop routing startup errors to this promise.
      server.off("error", reject);
      const address = server.address();
      if (address && typeof address !== "string") {
        resolve(address.port);
        return;
      }
      reject(new Error("Expected loopback server to listen on a TCP port"));
    });
  });
}
/** Closes `server` if it is listening; resolves once shutdown completes. */
async function closeServer(server: http.Server): Promise<void> {
  if (!server.listening) {
    return;
  }
  await new Promise<void>((resolve, reject) => {
    server.close((error) => (error ? reject(error) : resolve()));
  });
}
/**
 * Buffers the incoming HTTP request body and returns it decoded as UTF-8.
 * Non-Buffer chunks are normalized to Buffers before concatenation so
 * multi-byte characters split across chunk boundaries decode correctly.
 */
async function readRequestBody(request: http.IncomingMessage): Promise<string> {
  const collected: Buffer[] = [];
  for await (const piece of request) {
    if (Buffer.isBuffer(piece)) {
      collected.push(piece);
    } else {
      collected.push(Buffer.from(piece));
    }
  }
  return Buffer.concat(collected).toString("utf8");
}
// Opt-in live test (see `describeLive`): spins up a real loopback HTTP server
// that emits one SSE event and then stalls, and verifies the transport's abort
// signal tears down the in-flight body read instead of hanging on it.
describeLive("anthropic transport stream live", () => {
  it("cancels an in-flight SSE body read over a real HTTP stream", async () => {
    const controller = new AbortController();
    const abortReason = new Error("live anthropic stream abort");
    let requestBody = "";
    let requestClosed = false;
    let resolveRequestClosed: (() => void) | undefined;
    // Settles when the server observes the request socket close — evidence
    // that the client-side abort actually released the connection.
    const requestClosedPromise = new Promise<void>((resolve) => {
      resolveRequestClosed = resolve;
    });
    const server = http.createServer((request, response) => {
      request.on("close", () => {
        requestClosed = true;
        resolveRequestClosed?.();
      });
      // Send a single message_start event and never end the response, so the
      // client stays parked on the next body read until the abort fires.
      void readRequestBody(request).then((body) => {
        requestBody = body;
        response.writeHead(200, {
          "content-type": "text/event-stream",
          "cache-control": "no-cache",
        });
        response.write(
          'data: {"type":"message_start","message":{"id":"msg_live","usage":{"input_tokens":1,"output_tokens":0}}}\n\n',
        );
      });
    });
    const port = await waitForServerListening(server);
    try {
      // Abort shortly after the stream starts, while the body read is stalled.
      setTimeout(() => controller.abort(abortReason), 50);
      const model: AnthropicMessagesModel = {
        id: "claude-sonnet-4-6",
        name: "Claude Sonnet 4.6",
        api: "anthropic-messages",
        provider: "anthropic",
        baseUrl: `http://127.0.0.1:${port}/v1`,
        reasoning: true,
        input: ["text"],
        cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
        contextWindow: 200000,
        maxTokens: 8192,
      };
      const streamFn = createAnthropicMessagesTransportStreamFn();
      const stream = await Promise.resolve(
        streamFn(
          model,
          { messages: [{ role: "user", content: "hello" }] } as AnthropicStreamContext,
          {
            apiKey: "sk-ant-live-transport-test",
            signal: controller.signal,
          } as AnthropicStreamOptions,
        ),
      );
      const timedOut = Symbol("timed out");
      // The abort must settle the stream result well before the 1s guard.
      const result = await Promise.race([stream.result(), delay(1_000, timedOut)]);
      if (result === timedOut) {
        throw new Error("Anthropic live SSE stream did not abort within 1000ms");
      }
      // Give the server a moment to observe the socket close before asserting.
      await Promise.race([requestClosedPromise, delay(1_000, undefined)]);
      expect(result.stopReason).toBe("aborted");
      expect(result.errorMessage).toBe("live anthropic stream abort");
      expect(requestClosed).toBe(true);
      expect(JSON.parse(requestBody)).toMatchObject({
        model: "claude-sonnet-4-6",
        stream: true,
      });
    } finally {
      await closeServer(server);
    }
  }, 10_000);
});

View File

@@ -27,6 +27,32 @@ function createSseResponse(events: Record<string, unknown>[] = []): Response {
});
}
/**
 * Builds a 200 SSE Response whose body emits one `message_start` event and
 * then stalls forever (the stream is never closed). The `onCancel` hook
 * receives the reason passed to the stream's cancel(), letting tests assert
 * that abort reasons propagate into body cancellation.
 */
function createStalledSseResponse(params: { onCancel: (reason: unknown) => void }): Response {
  const firstEvent =
    'data: {"type":"message_start","message":{"id":"msg_1","usage":{"input_tokens":1,"output_tokens":0}}}\n\n';
  const body = new ReadableStream<Uint8Array>({
    start(controller) {
      // Enqueue one event and intentionally never close the controller.
      controller.enqueue(new TextEncoder().encode(firstEvent));
    },
    cancel(reason) {
      params.onCancel(reason);
    },
  });
  return new Response(body, {
    status: 200,
    headers: { "content-type": "text/event-stream" },
  });
}
/** Resolves with `value` once `ms` milliseconds have elapsed. */
function delay<T>(ms: number, value: T): Promise<T> {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}
function latestAnthropicRequest() {
const [, init] = guardedFetchMock.mock.calls.at(-1) ?? [];
const body = init?.body;
@@ -514,6 +540,64 @@ describe("anthropic transport stream", () => {
);
});
  // Regression test: an abort that fires while the transport is awaiting the
  // next SSE chunk must cancel the body reader promptly instead of waiting
  // for a next event that never arrives.
  it("cancels stalled SSE body reads when the abort signal fires mid-stream", async () => {
    const controller = new AbortController();
    const abortReason = new Error("anthropic test abort");
    let cancelReason: unknown;
    // Serve one message_start event, then stall; record the reason handed to
    // the stream's cancel() hook so abort propagation can be asserted.
    guardedFetchMock.mockResolvedValueOnce(
      createStalledSseResponse({
        onCancel: (reason) => {
          cancelReason = reason;
        },
      }),
    );
    setTimeout(() => controller.abort(abortReason), 50);
    const timedOut = Symbol("timed out");
    const startedAt = Date.now();
    // Race the transport against a 1s guard so a regression fails fast
    // instead of hanging the test run.
    const result = await Promise.race([
      runTransportStream(
        makeAnthropicTransportModel(),
        { messages: [{ role: "user", content: "hello" }] } as AnthropicStreamContext,
        { apiKey: "sk-ant-api", signal: controller.signal } as AnthropicStreamOptions,
      ),
      delay(1_000, timedOut),
    ]);
    if (result === timedOut) {
      throw new Error("Anthropic SSE stream did not abort within 1000ms");
    }
    expect(Date.now() - startedAt).toBeLessThan(1_000);
    expect(result.stopReason).toBe("aborted");
    expect(result.errorMessage).toBe("anthropic test abort");
    // The abort reason must be forwarded into the body's cancellation.
    expect(cancelReason).toBe(abortReason);
  });
  // A signal that was aborted before the stream is consumed must surface as an
  // abort result immediately, without ever blocking on a reader.read().
  it("treats already-aborted signals as abort errors before reading SSE chunks", async () => {
    const controller = new AbortController();
    const abortReason = new Error("pre-aborted stream");
    let cancelReason: unknown;
    guardedFetchMock.mockResolvedValueOnce(
      createStalledSseResponse({
        onCancel: (reason) => {
          cancelReason = reason;
        },
      }),
    );
    // Abort up front, before the transport ever touches the body.
    controller.abort(abortReason);
    const result = await runTransportStream(
      makeAnthropicTransportModel(),
      { messages: [{ role: "user", content: "hello" }] } as AnthropicStreamContext,
      { apiKey: "sk-ant-api", signal: controller.signal } as AnthropicStreamOptions,
    );
    expect(result.stopReason).toBe("aborted");
    expect(result.errorMessage).toBe("pre-aborted stream");
    // Even for a pre-aborted signal, the reason reaches the cancel() hook.
    expect(cancelReason).toBe(abortReason);
  });
it("maps adaptive thinking effort for Claude 4.6 transport runs", async () => {
const model = makeAnthropicTransportModel({
id: "claude-opus-4-6",

View File

@@ -452,15 +452,76 @@ function resolveAnthropicMessagesUrl(baseUrl?: string): string {
return normalized.endsWith("/v1") ? `${normalized}/messages` : `${normalized}/v1/messages`;
}
/**
 * Normalizes an aborted signal's reason into an Error. An Error reason is
 * returned unchanged (so caller-supplied messages survive); any other reason
 * is wrapped in an "AbortError"-named Error, carried as `cause` when present.
 */
function createAbortError(signal: AbortSignal): Error {
  const { reason } = signal;
  if (reason instanceof Error) {
    return reason;
  }
  let abortError: Error;
  if (reason === undefined) {
    abortError = new Error("Request was aborted");
  } else {
    abortError = new Error("Request was aborted", { cause: reason });
  }
  abortError.name = "AbortError";
  return abortError;
}
/**
 * Reads the next chunk from an SSE body reader while honoring an optional
 * abort signal. Without a signal this is a plain `reader.read()`; with one,
 * the read races the signal so an abort cancels the reader and rejects with
 * an abort error instead of leaving the caller parked on a stalled stream.
 */
function readAnthropicSseChunk(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  signal?: AbortSignal,
): Promise<ReadableStreamReadResult<Uint8Array>> {
  if (!signal) {
    return reader.read();
  }
  return new Promise((resolve, reject) => {
    // Guards against double-settling when the abort and the read settle
    // close together.
    let settled = false;
    const onAbort = () => {
      if (settled) {
        return;
      }
      settled = true;
      signal.removeEventListener("abort", onAbort);
      // Cancel the underlying body read with the abort reason; swallow cancel
      // failures since the rejection below is what callers should observe.
      reader.cancel(signal.reason).catch(() => undefined);
      reject(createAbortError(signal));
    };
    // Handle signals that were already aborted before this read started,
    // before wiring up any listener.
    if (signal.aborted) {
      onAbort();
      return;
    }
    signal.addEventListener("abort", onAbort, { once: true });
    reader.read().then(
      (result) => {
        if (settled) {
          return;
        }
        settled = true;
        // Read won the race: detach the abort listener and hand back the chunk.
        signal.removeEventListener("abort", onAbort);
        resolve(result);
      },
      (error: unknown) => {
        if (settled) {
          return;
        }
        settled = true;
        signal.removeEventListener("abort", onAbort);
        reject(error);
      },
    );
  });
}
async function* parseAnthropicSseBody(
body: ReadableStream<Uint8Array>,
signal?: AbortSignal,
): AsyncIterable<Record<string, unknown>> {
const reader = body.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
const { done, value } = await readAnthropicSseChunk(reader, signal);
if (done) {
break;
}
@@ -531,7 +592,7 @@ function createAnthropicMessagesClient(params: {
if (!response.body) {
return;
}
yield* parseAnthropicSseBody(response.body);
yield* parseAnthropicSseBody(response.body, options?.signal);
},
},
};