diff --git a/CHANGELOG.md b/CHANGELOG.md index 62b16df83e7..e101f7dd8b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Control UI/WebChat: keep large attachment payloads out of Lit state and optimistic chat messages, using object URL previews plus send-time payload serialization so PDF/image uploads no longer trigger `RangeError: Maximum call stack size exceeded`. Fixes #73360; refs #54378 and #63432. Thanks @hejunhui-73, @Ansub, and @christianhernandez3-afk. +- Agents/Anthropic: cancel stalled Anthropic Messages SSE body reads when abort signals fire, so active-memory timeouts release transport resources instead of leaving hidden recall runs parked on `reader.read()`. Refs #72965 and #73120. Thanks @wdeveloper16. - Agents/models: keep per-agent primary models strict when `fallbacks` is omitted, so probe-only custom providers are not tried as hidden fallback candidates unless the agent explicitly opts in. Fixes #73332. Thanks @haumanto. - Gateway/models: add `models.pricing.enabled` so offline or restricted-network installs can skip startup OpenRouter and LiteLLM pricing-catalog fetches while keeping explicit model costs working. Fixes #53639. Thanks @callebtc, @palewire, and @rjdjohnston. - Onboarding: pin interactive and non-interactive health checks to the just-configured setup token/password so stale `OPENCLAW_GATEWAY_TOKEN` or `OPENCLAW_GATEWAY_PASSWORD` values do not produce false gateway-token-mismatch failures after setup. Fixes #72203. Thanks @galiniliev. diff --git a/extensions/active-memory/index.ts b/extensions/active-memory/index.ts index d9f552d1a09..0677dc14724 100644 --- a/extensions/active-memory/index.ts +++ b/extensions/active-memory/index.ts @@ -1879,7 +1879,7 @@ async function maybeResolveActiveRecall(params: { if (controller.signal.aborted) { const result: ActiveRecallResult = { status: "timeout", - elapsedMs: Date.now() - startedAt, + elapsedMs: params.config.timeoutMs, summary: null, }; if (params.config.logging) { diff --git a/src/agents/anthropic-transport-stream.live.test.ts b/src/agents/anthropic-transport-stream.live.test.ts new file mode 100644 index 00000000000..13cec1f4902 --- /dev/null +++ b/src/agents/anthropic-transport-stream.live.test.ts @@ -0,0 +1,132 @@ +import http from "node:http"; +import type { Model } from "@mariozechner/pi-ai"; +import { describe, expect, it } from "vitest"; +import { createAnthropicMessagesTransportStreamFn } from "./anthropic-transport-stream.js"; +import { isLiveTestEnabled } from "./live-test-helpers.js"; + +const LIVE = isLiveTestEnabled(["ANTHROPIC_TRANSPORT_LIVE_TEST"]); +const describeLive = LIVE ? describe : describe.skip; + +type AnthropicMessagesModel = Model<"anthropic-messages">; +type AnthropicStreamFn = ReturnType; +type AnthropicStreamContext = Parameters[1]; +type AnthropicStreamOptions = Parameters[2]; + +function delay(ms: number, value: T): Promise { + return new Promise((resolve) => { + setTimeout(() => resolve(value), ms); + }); +} + +function waitForServerListening(server: http.Server): Promise { + return new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + server.off("error", reject); + const address = server.address(); + if (!address || typeof address === "string") { + reject(new Error("Expected loopback server to listen on a TCP port")); + return; + } + resolve(address.port); + }); + }); +} + +async function closeServer(server: http.Server): Promise { + if (!server.listening) { + return; + } + await new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); +} + +async function readRequestBody(request: http.IncomingMessage): Promise { + const chunks: Buffer[] = []; + for await (const chunk of request) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + } + return Buffer.concat(chunks).toString("utf8"); +} + +describeLive("anthropic transport stream live", () => { + it("cancels an in-flight SSE body read over a real HTTP stream", async () => { + const controller = new AbortController(); + const abortReason = new Error("live anthropic stream abort"); + let requestBody = ""; + let requestClosed = false; + let resolveRequestClosed: (() => void) | undefined; + const requestClosedPromise = new Promise((resolve) => { + resolveRequestClosed = resolve; + }); + + const server = http.createServer((request, response) => { + request.on("close", () => { + requestClosed = true; + resolveRequestClosed?.(); + }); + void readRequestBody(request).then((body) => { + requestBody = body; + response.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + }); + response.write( + 'data: {"type":"message_start","message":{"id":"msg_live","usage":{"input_tokens":1,"output_tokens":0}}}\n\n', + ); + }); + }); + + const port = await waitForServerListening(server); + try { + setTimeout(() => controller.abort(abortReason), 50); + const model: AnthropicMessagesModel = { + id: "claude-sonnet-4-6", + name: "Claude Sonnet 4.6", + api: "anthropic-messages", + provider: "anthropic", + baseUrl: `http://127.0.0.1:${port}/v1`, + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + }; + const streamFn = createAnthropicMessagesTransportStreamFn(); + const stream = await Promise.resolve( + streamFn( + model, + { messages: [{ role: "user", content: "hello" }] } as AnthropicStreamContext, + { + apiKey: "sk-ant-live-transport-test", + signal: controller.signal, + } as AnthropicStreamOptions, + ), + ); + + const timedOut = Symbol("timed out"); + const result = await Promise.race([stream.result(), delay(1_000, timedOut)]); + if (result === timedOut) { + throw new Error("Anthropic live SSE stream did not abort within 1000ms"); + } + await Promise.race([requestClosedPromise, delay(1_000, undefined)]); + + expect(result.stopReason).toBe("aborted"); + expect(result.errorMessage).toBe("live anthropic stream abort"); + expect(requestClosed).toBe(true); + expect(JSON.parse(requestBody)).toMatchObject({ + model: "claude-sonnet-4-6", + stream: true, + }); + } finally { + await closeServer(server); + } + }, 10_000); +}); diff --git a/src/agents/anthropic-transport-stream.test.ts b/src/agents/anthropic-transport-stream.test.ts index a07cd59ea8b..d6cc77f4b95 100644 --- a/src/agents/anthropic-transport-stream.test.ts +++ b/src/agents/anthropic-transport-stream.test.ts @@ -27,6 +27,32 @@ function createSseResponse(events: Record[] = []): Response { }); } +function createStalledSseResponse(params: { onCancel: (reason: unknown) => void }): Response { + const encoder = new TextEncoder(); + const body = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + 'data: {"type":"message_start","message":{"id":"msg_1","usage":{"input_tokens":1,"output_tokens":0}}}\n\n', + ), + ); + }, + cancel(reason) { + params.onCancel(reason); + }, + }); + return new Response(body, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +function delay(ms: number, value: T): Promise { + return new Promise((resolve) => { + setTimeout(() => resolve(value), ms); + }); +} + function latestAnthropicRequest() { const [, init] = guardedFetchMock.mock.calls.at(-1) ?? []; const body = init?.body; @@ -514,6 +540,64 @@ describe("anthropic transport stream", () => { ); }); + it("cancels stalled SSE body reads when the abort signal fires mid-stream", async () => { + const controller = new AbortController(); + const abortReason = new Error("anthropic test abort"); + let cancelReason: unknown; + guardedFetchMock.mockResolvedValueOnce( + createStalledSseResponse({ + onCancel: (reason) => { + cancelReason = reason; + }, + }), + ); + + setTimeout(() => controller.abort(abortReason), 50); + + const timedOut = Symbol("timed out"); + const startedAt = Date.now(); + const result = await Promise.race([ + runTransportStream( + makeAnthropicTransportModel(), + { messages: [{ role: "user", content: "hello" }] } as AnthropicStreamContext, + { apiKey: "sk-ant-api", signal: controller.signal } as AnthropicStreamOptions, + ), + delay(1_000, timedOut), + ]); + + if (result === timedOut) { + throw new Error("Anthropic SSE stream did not abort within 1000ms"); + } + expect(Date.now() - startedAt).toBeLessThan(1_000); + expect(result.stopReason).toBe("aborted"); + expect(result.errorMessage).toBe("anthropic test abort"); + expect(cancelReason).toBe(abortReason); + }); + + it("treats already-aborted signals as abort errors before reading SSE chunks", async () => { + const controller = new AbortController(); + const abortReason = new Error("pre-aborted stream"); + let cancelReason: unknown; + guardedFetchMock.mockResolvedValueOnce( + createStalledSseResponse({ + onCancel: (reason) => { + cancelReason = reason; + }, + }), + ); + controller.abort(abortReason); + + const result = await runTransportStream( + makeAnthropicTransportModel(), + { messages: [{ role: "user", content: "hello" }] } as AnthropicStreamContext, + { apiKey: "sk-ant-api", signal: controller.signal } as AnthropicStreamOptions, + ); + + expect(result.stopReason).toBe("aborted"); + expect(result.errorMessage).toBe("pre-aborted stream"); + expect(cancelReason).toBe(abortReason); + }); + it("maps adaptive thinking effort for Claude 4.6 transport runs", async () => { const model = makeAnthropicTransportModel({ id: "claude-opus-4-6", diff --git a/src/agents/anthropic-transport-stream.ts b/src/agents/anthropic-transport-stream.ts index a91c8ee3fbd..d07d8a1da27 100644 --- a/src/agents/anthropic-transport-stream.ts +++ b/src/agents/anthropic-transport-stream.ts @@ -452,15 +452,76 @@ function resolveAnthropicMessagesUrl(baseUrl?: string): string { return normalized.endsWith("/v1") ? `${normalized}/messages` : `${normalized}/v1/messages`; } +function createAbortError(signal: AbortSignal): Error { + const reason = signal.reason; + if (reason instanceof Error) { + return reason; + } + const error = + reason === undefined + ? new Error("Request was aborted") + : new Error("Request was aborted", { cause: reason }); + error.name = "AbortError"; + return error; +} + +function readAnthropicSseChunk( + reader: ReadableStreamDefaultReader, + signal?: AbortSignal, +): Promise> { + if (!signal) { + return reader.read(); + } + + return new Promise((resolve, reject) => { + let settled = false; + const onAbort = () => { + if (settled) { + return; + } + settled = true; + signal.removeEventListener("abort", onAbort); + reader.cancel(signal.reason).catch(() => undefined); + reject(createAbortError(signal)); + }; + + if (signal.aborted) { + onAbort(); + return; + } + + signal.addEventListener("abort", onAbort, { once: true }); + reader.read().then( + (result) => { + if (settled) { + return; + } + settled = true; + signal.removeEventListener("abort", onAbort); + resolve(result); + }, + (error: unknown) => { + if (settled) { + return; + } + settled = true; + signal.removeEventListener("abort", onAbort); + reject(error); + }, + ); + }); +} + async function* parseAnthropicSseBody( body: ReadableStream, + signal?: AbortSignal, ): AsyncIterable> { const reader = body.getReader(); const decoder = new TextDecoder(); let buffer = ""; try { while (true) { - const { done, value } = await reader.read(); + const { done, value } = await readAnthropicSseChunk(reader, signal); if (done) { break; } @@ -531,7 +592,7 @@ function createAnthropicMessagesClient(params: { if (!response.body) { return; } - yield* parseAnthropicSseBody(response.body); + yield* parseAnthropicSseBody(response.body, options?.signal); }, }, };