mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-17 20:21:13 +00:00
fix: abort HTTP gateway turns on client disconnect (#54388) (thanks @Lellansin)
* fix: abort in-flight HTTP requests on client disconnect Abort running agent commands when the HTTP client disconnects for both /v1/chat/completions and /v1/responses endpoints. - Listen on res "close" instead of req "close" (the request body is already consumed so IncomingMessage auto-destroys before we get here). - Non-streaming: guard with !signal.aborted so the abort fires on genuine disconnects; a spurious abort after sendJson is harmless. - Streaming: guard with !closed so normal res.end() completions do not abort post-turn work still in flight. - Skip error logging and response writes when the signal is already aborted. Made-with: Cursor * fix: correct event listener name and improve error handling in HTTP requests Updated the event listener for client disconnects to use the correct name and enhanced error handling logic. The changes ensure that abort signals are properly checked before logging errors and returning responses, preventing unnecessary operations on aborted requests. Made-with: Cursor * fix: use correct 'close' event name for non-streaming disconnect handler * fix: watch socket close for HTTP aborts --------- Co-authored-by: 冰森 <dingheng.huang@urbanic.com> Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
@@ -106,3 +106,35 @@ export function setSseHeaders(res: ServerResponse) {
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.flushHeaders?.();
|
||||
}
|
||||
|
||||
export function watchClientDisconnect(
|
||||
req: IncomingMessage,
|
||||
res: ServerResponse,
|
||||
abortController: AbortController,
|
||||
onDisconnect?: () => void,
|
||||
) {
|
||||
const sockets = Array.from(
|
||||
new Set(
|
||||
[req.socket, res.socket].filter(
|
||||
(socket): socket is NonNullable<typeof socket> => socket !== null,
|
||||
),
|
||||
),
|
||||
);
|
||||
if (sockets.length === 0) {
|
||||
return () => {};
|
||||
}
|
||||
const handleClose = () => {
|
||||
onDisconnect?.();
|
||||
if (!abortController.signal.aborted) {
|
||||
abortController.abort();
|
||||
}
|
||||
};
|
||||
for (const socket of sockets) {
|
||||
socket.on("close", handleClose);
|
||||
}
|
||||
return () => {
|
||||
for (const socket of sockets) {
|
||||
socket.off("close", handleClose);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs/promises";
|
||||
import http from "node:http";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, describe, expect, it } from "vitest";
|
||||
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js";
|
||||
import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
@@ -881,4 +882,109 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
|
||||
await server.close({ reason: "openai token auth owner test done" });
|
||||
}
|
||||
});
|
||||
|
||||
it("aborts agent command when streaming client disconnects", { timeout: 15_000 }, async () => {
|
||||
const port = enabledPort;
|
||||
let serverAbortSignal: AbortSignal | undefined;
|
||||
|
||||
agentCommand.mockClear();
|
||||
agentCommand.mockImplementationOnce(
|
||||
(opts: unknown) =>
|
||||
new Promise<undefined>((resolve) => {
|
||||
const signal = (opts as { abortSignal?: AbortSignal } | undefined)?.abortSignal;
|
||||
serverAbortSignal = signal;
|
||||
if (signal?.aborted) {
|
||||
resolve(undefined);
|
||||
return;
|
||||
}
|
||||
signal?.addEventListener("abort", () => resolve(undefined), { once: true });
|
||||
}),
|
||||
);
|
||||
|
||||
const clientReq = http.request({
|
||||
hostname: "127.0.0.1",
|
||||
port,
|
||||
path: "/v1/chat/completions",
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
authorization: "Bearer secret",
|
||||
},
|
||||
});
|
||||
clientReq.on("error", () => {});
|
||||
clientReq.end(
|
||||
JSON.stringify({
|
||||
stream: true,
|
||||
model: "openclaw",
|
||||
messages: [{ role: "user", content: "hi" }],
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(agentCommand).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
clientReq.destroy();
|
||||
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(serverAbortSignal?.aborted).toBe(true);
|
||||
},
|
||||
{ timeout: 5_000, interval: 50 },
|
||||
);
|
||||
});
|
||||
|
||||
it(
|
||||
"aborts agent command when non-streaming client disconnects",
|
||||
{ timeout: 15_000 },
|
||||
async () => {
|
||||
const port = enabledPort;
|
||||
let serverAbortSignal: AbortSignal | undefined;
|
||||
|
||||
agentCommand.mockClear();
|
||||
agentCommand.mockImplementationOnce(
|
||||
(opts: unknown) =>
|
||||
new Promise<undefined>((resolve) => {
|
||||
const signal = (opts as { abortSignal?: AbortSignal } | undefined)?.abortSignal;
|
||||
serverAbortSignal = signal;
|
||||
if (signal?.aborted) {
|
||||
resolve(undefined);
|
||||
return;
|
||||
}
|
||||
signal?.addEventListener("abort", () => resolve(undefined), { once: true });
|
||||
}),
|
||||
);
|
||||
|
||||
const clientReq = http.request({
|
||||
hostname: "127.0.0.1",
|
||||
port,
|
||||
path: "/v1/chat/completions",
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
authorization: "Bearer secret",
|
||||
},
|
||||
});
|
||||
clientReq.on("error", () => {});
|
||||
clientReq.end(
|
||||
JSON.stringify({
|
||||
model: "openclaw",
|
||||
messages: [{ role: "user", content: "hi" }],
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(agentCommand).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
clientReq.destroy();
|
||||
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(serverAbortSignal?.aborted).toBe(true);
|
||||
},
|
||||
{ timeout: 5_000, interval: 50 },
|
||||
);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -25,7 +25,7 @@ import {
|
||||
} from "./agent-prompt.js";
|
||||
import type { AuthRateLimiter } from "./auth-rate-limit.js";
|
||||
import type { ResolvedGatewayAuth } from "./auth.js";
|
||||
import { sendJson, setSseHeaders, writeDone } from "./http-common.js";
|
||||
import { sendJson, setSseHeaders, watchClientDisconnect, writeDone } from "./http-common.js";
|
||||
import { handleGatewayPostJsonEndpoint } from "./http-endpoint-helpers.js";
|
||||
import {
|
||||
resolveGatewayRequestContext,
|
||||
@@ -112,6 +112,7 @@ function buildAgentCommandInput(params: {
|
||||
runId: string;
|
||||
messageChannel: string;
|
||||
senderIsOwner: boolean;
|
||||
abortSignal?: AbortSignal;
|
||||
}) {
|
||||
return {
|
||||
message: params.prompt.message,
|
||||
@@ -125,6 +126,7 @@ function buildAgentCommandInput(params: {
|
||||
bestEffortDeliver: false as const,
|
||||
senderIsOwner: params.senderIsOwner,
|
||||
allowModelOverride: true as const,
|
||||
abortSignal: params.abortSignal,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -493,6 +495,7 @@ export async function handleOpenAiHttpRequest(
|
||||
|
||||
const runId = `chatcmpl_${randomUUID()}`;
|
||||
const deps = createDefaultDeps();
|
||||
const abortController = new AbortController();
|
||||
const commandInput = buildAgentCommandInput({
|
||||
prompt: {
|
||||
message: prompt.message,
|
||||
@@ -503,13 +506,19 @@ export async function handleOpenAiHttpRequest(
|
||||
sessionKey,
|
||||
runId,
|
||||
messageChannel,
|
||||
abortSignal: abortController.signal,
|
||||
senderIsOwner,
|
||||
});
|
||||
|
||||
if (!stream) {
|
||||
const stopWatchingDisconnect = watchClientDisconnect(req, res, abortController);
|
||||
try {
|
||||
const result = await agentCommandFromIngress(commandInput, defaultRuntime, deps);
|
||||
|
||||
if (abortController.signal.aborted) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const content = resolveAgentResponseText(result);
|
||||
|
||||
sendJson(res, 200, {
|
||||
@@ -527,10 +536,15 @@ export async function handleOpenAiHttpRequest(
|
||||
usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
|
||||
});
|
||||
} catch (err) {
|
||||
if (abortController.signal.aborted) {
|
||||
return true;
|
||||
}
|
||||
logWarn(`openai-compat: chat completion failed: ${String(err)}`);
|
||||
sendJson(res, 500, {
|
||||
error: { message: "internal error", type: "api_error" },
|
||||
});
|
||||
} finally {
|
||||
stopWatchingDisconnect();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -540,6 +554,7 @@ export async function handleOpenAiHttpRequest(
|
||||
let wroteRole = false;
|
||||
let sawAssistantDelta = false;
|
||||
let closed = false;
|
||||
let stopWatchingDisconnect = () => {};
|
||||
|
||||
const unsubscribe = onAgentEvent((evt) => {
|
||||
if (evt.runId !== runId) {
|
||||
@@ -574,6 +589,7 @@ export async function handleOpenAiHttpRequest(
|
||||
const phase = evt.data?.phase;
|
||||
if (phase === "end" || phase === "error") {
|
||||
closed = true;
|
||||
stopWatchingDisconnect();
|
||||
unsubscribe();
|
||||
writeDone(res);
|
||||
res.end();
|
||||
@@ -581,7 +597,7 @@ export async function handleOpenAiHttpRequest(
|
||||
}
|
||||
});
|
||||
|
||||
req.on("close", () => {
|
||||
stopWatchingDisconnect = watchClientDisconnect(req, res, abortController, () => {
|
||||
closed = true;
|
||||
unsubscribe();
|
||||
});
|
||||
@@ -611,10 +627,10 @@ export async function handleOpenAiHttpRequest(
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
logWarn(`openai-compat: streaming chat completion failed: ${String(err)}`);
|
||||
if (closed) {
|
||||
if (closed || abortController.signal.aborted) {
|
||||
return;
|
||||
}
|
||||
logWarn(`openai-compat: streaming chat completion failed: ${String(err)}`);
|
||||
writeAssistantContentChunk(res, {
|
||||
runId,
|
||||
model,
|
||||
@@ -629,6 +645,7 @@ export async function handleOpenAiHttpRequest(
|
||||
} finally {
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
stopWatchingDisconnect();
|
||||
unsubscribe();
|
||||
writeDone(res);
|
||||
res.end();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs/promises";
|
||||
import http from "node:http";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js";
|
||||
import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
@@ -1169,4 +1170,109 @@ describe("OpenResponses HTTP API (e2e)", () => {
|
||||
await capServer.close({ reason: "responses url cap hardening test done" });
|
||||
}
|
||||
});
|
||||
|
||||
it("aborts agent command when streaming client disconnects", { timeout: 15_000 }, async () => {
|
||||
const port = enabledPort;
|
||||
let serverAbortSignal: AbortSignal | undefined;
|
||||
|
||||
agentCommand.mockClear();
|
||||
agentCommand.mockImplementationOnce(
|
||||
(opts: unknown) =>
|
||||
new Promise<undefined>((resolve) => {
|
||||
const signal = (opts as { abortSignal?: AbortSignal } | undefined)?.abortSignal;
|
||||
serverAbortSignal = signal;
|
||||
if (signal?.aborted) {
|
||||
resolve(undefined);
|
||||
return;
|
||||
}
|
||||
signal?.addEventListener("abort", () => resolve(undefined), { once: true });
|
||||
}),
|
||||
);
|
||||
|
||||
const clientReq = http.request({
|
||||
hostname: "127.0.0.1",
|
||||
port,
|
||||
path: "/v1/responses",
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
authorization: "Bearer secret",
|
||||
},
|
||||
});
|
||||
clientReq.on("error", () => {});
|
||||
clientReq.end(
|
||||
JSON.stringify({
|
||||
stream: true,
|
||||
model: "openclaw",
|
||||
input: "hi",
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(agentCommand).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
clientReq.destroy();
|
||||
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(serverAbortSignal?.aborted).toBe(true);
|
||||
},
|
||||
{ timeout: 5_000, interval: 50 },
|
||||
);
|
||||
});
|
||||
|
||||
it(
|
||||
"aborts agent command when non-streaming client disconnects",
|
||||
{ timeout: 15_000 },
|
||||
async () => {
|
||||
const port = enabledPort;
|
||||
let serverAbortSignal: AbortSignal | undefined;
|
||||
|
||||
agentCommand.mockClear();
|
||||
agentCommand.mockImplementationOnce(
|
||||
(opts: unknown) =>
|
||||
new Promise<undefined>((resolve) => {
|
||||
const signal = (opts as { abortSignal?: AbortSignal } | undefined)?.abortSignal;
|
||||
serverAbortSignal = signal;
|
||||
if (signal?.aborted) {
|
||||
resolve(undefined);
|
||||
return;
|
||||
}
|
||||
signal?.addEventListener("abort", () => resolve(undefined), { once: true });
|
||||
}),
|
||||
);
|
||||
|
||||
const clientReq = http.request({
|
||||
hostname: "127.0.0.1",
|
||||
port,
|
||||
path: "/v1/responses",
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
authorization: "Bearer secret",
|
||||
},
|
||||
});
|
||||
clientReq.on("error", () => {});
|
||||
clientReq.end(
|
||||
JSON.stringify({
|
||||
model: "openclaw",
|
||||
input: "hi",
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(agentCommand).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
clientReq.destroy();
|
||||
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(serverAbortSignal?.aborted).toBe(true);
|
||||
},
|
||||
{ timeout: 5_000, interval: 50 },
|
||||
);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -34,7 +34,7 @@ import { wrapExternalContent } from "../security/external-content.js";
|
||||
import { resolveAssistantStreamDeltaText } from "./agent-event-assistant-text.js";
|
||||
import type { AuthRateLimiter } from "./auth-rate-limit.js";
|
||||
import type { ResolvedGatewayAuth } from "./auth.js";
|
||||
import { sendJson, setSseHeaders, writeDone } from "./http-common.js";
|
||||
import { sendJson, setSseHeaders, watchClientDisconnect, writeDone } from "./http-common.js";
|
||||
import { handleGatewayPostJsonEndpoint } from "./http-endpoint-helpers.js";
|
||||
import {
|
||||
getBearerToken,
|
||||
@@ -407,6 +407,7 @@ async function runResponsesAgentCommand(params: {
|
||||
messageChannel: string;
|
||||
senderIsOwner: boolean;
|
||||
deps: ReturnType<typeof createDefaultDeps>;
|
||||
abortSignal?: AbortSignal;
|
||||
}) {
|
||||
return agentCommandFromIngress(
|
||||
{
|
||||
@@ -423,6 +424,7 @@ async function runResponsesAgentCommand(params: {
|
||||
bestEffortDeliver: false,
|
||||
senderIsOwner: params.senderIsOwner,
|
||||
allowModelOverride: true,
|
||||
abortSignal: params.abortSignal,
|
||||
},
|
||||
defaultRuntime,
|
||||
params.deps,
|
||||
@@ -675,12 +677,14 @@ export async function handleOpenResponsesHttpRequest(
|
||||
storeResponseSession(responseId, sessionKey, responseSessionScope);
|
||||
const outputItemId = `msg_${randomUUID()}`;
|
||||
const deps = createDefaultDeps();
|
||||
const abortController = new AbortController();
|
||||
const streamParams =
|
||||
typeof payload.max_output_tokens === "number"
|
||||
? { maxTokens: payload.max_output_tokens }
|
||||
: undefined;
|
||||
|
||||
if (!stream) {
|
||||
const stopWatchingDisconnect = watchClientDisconnect(req, res, abortController);
|
||||
try {
|
||||
const result = await runResponsesAgentCommand({
|
||||
message: prompt.message,
|
||||
@@ -694,8 +698,13 @@ export async function handleOpenResponsesHttpRequest(
|
||||
messageChannel,
|
||||
senderIsOwner,
|
||||
deps,
|
||||
abortSignal: abortController.signal,
|
||||
});
|
||||
|
||||
if (abortController.signal.aborted) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const payloads = (result as { payloads?: Array<{ text?: string }> } | null)?.payloads;
|
||||
const usage = extractUsageFromResult(result);
|
||||
const meta = (result as { meta?: unknown } | null)?.meta;
|
||||
@@ -772,6 +781,9 @@ export async function handleOpenResponsesHttpRequest(
|
||||
rememberResponseSession();
|
||||
sendJson(res, 200, response);
|
||||
} catch (err) {
|
||||
if (abortController.signal.aborted) {
|
||||
return true;
|
||||
}
|
||||
logWarn(`openresponses: non-stream response failed: ${String(err)}`);
|
||||
const response = createResponseResource({
|
||||
id: responseId,
|
||||
@@ -782,6 +794,8 @@ export async function handleOpenResponsesHttpRequest(
|
||||
});
|
||||
rememberResponseSession();
|
||||
sendJson(res, 500, response);
|
||||
} finally {
|
||||
stopWatchingDisconnect();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -796,6 +810,7 @@ export async function handleOpenResponsesHttpRequest(
|
||||
let sawAssistantDelta = false;
|
||||
let closed = false;
|
||||
let unsubscribe = () => {};
|
||||
let stopWatchingDisconnect = () => {};
|
||||
let finalUsage: Usage | undefined;
|
||||
let finalizeRequested: { status: ResponseResource["status"]; text: string } | null = null;
|
||||
|
||||
@@ -812,6 +827,7 @@ export async function handleOpenResponsesHttpRequest(
|
||||
const usage = finalUsage;
|
||||
|
||||
closed = true;
|
||||
stopWatchingDisconnect();
|
||||
unsubscribe();
|
||||
|
||||
writeSseEvent(res, {
|
||||
@@ -940,7 +956,7 @@ export async function handleOpenResponsesHttpRequest(
|
||||
}
|
||||
});
|
||||
|
||||
req.on("close", () => {
|
||||
stopWatchingDisconnect = watchClientDisconnect(req, res, abortController, () => {
|
||||
closed = true;
|
||||
unsubscribe();
|
||||
});
|
||||
@@ -959,6 +975,7 @@ export async function handleOpenResponsesHttpRequest(
|
||||
messageChannel,
|
||||
senderIsOwner,
|
||||
deps,
|
||||
abortSignal: abortController.signal,
|
||||
});
|
||||
|
||||
finalUsage = extractUsageFromResult(result);
|
||||
@@ -1046,6 +1063,7 @@ export async function handleOpenResponsesHttpRequest(
|
||||
usage,
|
||||
});
|
||||
closed = true;
|
||||
stopWatchingDisconnect();
|
||||
unsubscribe();
|
||||
rememberResponseSession();
|
||||
writeSseEvent(res, { type: "response.completed", response: incompleteResponse });
|
||||
@@ -1083,10 +1101,10 @@ export async function handleOpenResponsesHttpRequest(
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
logWarn(`openresponses: streaming response failed: ${String(err)}`);
|
||||
if (closed) {
|
||||
if (closed || abortController.signal.aborted) {
|
||||
return;
|
||||
}
|
||||
logWarn(`openresponses: streaming response failed: ${String(err)}`);
|
||||
|
||||
finalUsage = finalUsage ?? createEmptyUsage();
|
||||
const errorResponse = createResponseResource({
|
||||
|
||||
Reference in New Issue
Block a user