test(agents): stabilize anthropic abort live harness

This commit is contained in:
Vincent Koc
2026-05-03 15:52:10 -07:00
parent 9ba7183b63
commit d62fb9eac3

View File

@@ -62,22 +62,12 @@ describeLive("anthropic transport stream live", () => {
const abortReason = new Error("live anthropic stream abort");
let requestBody = "";
let requestBodyPromise: Promise<string> | undefined;
let connectionClosed = false;
let resolveConnectionClosed: (() => void) | undefined;
const connectionClosedPromise = new Promise<void>((resolve) => {
resolveConnectionClosed = resolve;
let resolveResponseStarted: (() => void) | undefined;
const responseStartedPromise = new Promise<void>((resolve) => {
resolveResponseStarted = resolve;
});
const server = http.createServer((request, response) => {
const markConnectionClosed = () => {
connectionClosed = true;
resolveConnectionClosed?.();
};
request.on("aborted", markConnectionClosed);
request.on("close", markConnectionClosed);
response.on("close", () => {
markConnectionClosed();
});
requestBodyPromise = readRequestBody(request).then((body) => {
requestBody = body;
response.writeHead(200, {
@@ -87,13 +77,13 @@ describeLive("anthropic transport stream live", () => {
response.write(
'data: {"type":"message_start","message":{"id":"msg_live","usage":{"input_tokens":1,"output_tokens":0}}}\n\n',
);
resolveResponseStarted?.();
return body;
});
});
const port = await waitForServerListening(server);
try {
setTimeout(() => controller.abort(abortReason), 50);
const model: AnthropicMessagesModel = {
id: "claude-sonnet-4-6",
name: "Claude Sonnet 4.6",
@@ -118,19 +108,21 @@ describeLive("anthropic transport stream live", () => {
),
);
const responseStarted = await Promise.race([
responseStartedPromise.then(() => true),
delay(1_000, false),
]);
expect(responseStarted).toBe(true);
controller.abort(abortReason);
const timedOut = Symbol("timed out");
const result = await Promise.race([stream.result(), delay(1_000, timedOut)]);
if (result === timedOut) {
throw new Error("Anthropic live SSE stream did not abort within 1000ms");
}
const observedConnectionClose = await Promise.race([
connectionClosedPromise.then(() => true),
delay(2_000, false),
]);
expect(result.stopReason).toBe("aborted");
expect(result.errorMessage).toBe("live anthropic stream abort");
expect(observedConnectionClose || connectionClosed).toBe(true);
const capturedRequestBody = requestBodyPromise
? await Promise.race([requestBodyPromise, delay(500, requestBody)])
: requestBody;
@@ -141,6 +133,9 @@ describeLive("anthropic transport stream live", () => {
});
}
} finally {
if (!controller.signal.aborted) {
controller.abort(abortReason);
}
await closeServer(server);
}
}, 10_000);