From d62fb9eac315148ad2891169157b89bd9fe0a27e Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 3 May 2026 15:52:10 -0700 Subject: [PATCH] test(agents): stabilize anthropic abort live harness --- .../anthropic-transport-stream.live.test.ts | 33 ++++++++----------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/src/agents/anthropic-transport-stream.live.test.ts b/src/agents/anthropic-transport-stream.live.test.ts index d61e612ceb5..c861d0cfcd7 100644 --- a/src/agents/anthropic-transport-stream.live.test.ts +++ b/src/agents/anthropic-transport-stream.live.test.ts @@ -62,22 +62,12 @@ describeLive("anthropic transport stream live", () => { const abortReason = new Error("live anthropic stream abort"); let requestBody = ""; let requestBodyPromise: Promise | undefined; - let connectionClosed = false; - let resolveConnectionClosed: (() => void) | undefined; - const connectionClosedPromise = new Promise((resolve) => { - resolveConnectionClosed = resolve; + let resolveResponseStarted: (() => void) | undefined; + const responseStartedPromise = new Promise((resolve) => { + resolveResponseStarted = resolve; }); const server = http.createServer((request, response) => { - const markConnectionClosed = () => { - connectionClosed = true; - resolveConnectionClosed?.(); - }; - request.on("aborted", markConnectionClosed); - request.on("close", markConnectionClosed); - response.on("close", () => { - markConnectionClosed(); - }); requestBodyPromise = readRequestBody(request).then((body) => { requestBody = body; response.writeHead(200, { @@ -87,13 +77,13 @@ describeLive("anthropic transport stream live", () => { response.write( 'data: {"type":"message_start","message":{"id":"msg_live","usage":{"input_tokens":1,"output_tokens":0}}}\n\n', ); + resolveResponseStarted?.(); return body; }); }); const port = await waitForServerListening(server); try { - setTimeout(() => controller.abort(abortReason), 50); const model: AnthropicMessagesModel = { id: "claude-sonnet-4-6", name: "Claude Sonnet 4.6", @@ -118,19 +108,21 @@ describeLive("anthropic transport stream live", () => { ), ); + const responseStarted = await Promise.race([ + responseStartedPromise.then(() => true), + delay(1_000, false), + ]); + expect(responseStarted).toBe(true); + controller.abort(abortReason); + const timedOut = Symbol("timed out"); const result = await Promise.race([stream.result(), delay(1_000, timedOut)]); if (result === timedOut) { throw new Error("Anthropic live SSE stream did not abort within 1000ms"); } - const observedConnectionClose = await Promise.race([ - connectionClosedPromise.then(() => true), - delay(2_000, false), - ]); expect(result.stopReason).toBe("aborted"); expect(result.errorMessage).toBe("live anthropic stream abort"); - expect(observedConnectionClose || connectionClosed).toBe(true); const capturedRequestBody = requestBodyPromise ? await Promise.race([requestBodyPromise, delay(500, requestBody)]) : requestBody; @@ -141,6 +133,9 @@ describeLive("anthropic transport stream live", () => { }); } } finally { + if (!controller.signal.aborted) { + controller.abort(abortReason); + } await closeServer(server); } }, 10_000);