mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:40:43 +00:00
refactor: clarify APNs proxy tunnel flow
This commit is contained in:
@@ -114,7 +114,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
const { openHttpConnectTunnel } = await import("./http-connect-tunnel.js");
|
||||
|
||||
const result = await openHttpConnectTunnel({
|
||||
proxyUrl: "http://proxy.example:8080",
|
||||
proxyUrl: new URL("http://proxy.example:8080"),
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
timeoutMs: 10_000,
|
||||
@@ -146,7 +146,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
const { openHttpConnectTunnel } = await import("./http-connect-tunnel.js");
|
||||
|
||||
await openHttpConnectTunnel({
|
||||
proxyUrl: "https://proxy.example:8443",
|
||||
proxyUrl: new URL("https://proxy.example:8443"),
|
||||
targetHost: "api.sandbox.push.apple.com",
|
||||
targetPort: 443,
|
||||
});
|
||||
@@ -171,7 +171,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
|
||||
await expect(
|
||||
openHttpConnectTunnel({
|
||||
proxyUrl: "http://user:secret@proxy.example:8080",
|
||||
proxyUrl: new URL("http://user:secret@proxy.example:8080"),
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
}),
|
||||
@@ -192,7 +192,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
let caught: unknown;
|
||||
try {
|
||||
await openHttpConnectTunnel({
|
||||
proxyUrl: "http://user:secret@proxy.example:8080/?token=hidden#fragment",
|
||||
proxyUrl: new URL("http://user:secret@proxy.example:8080/?token=hidden#fragment"),
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
});
|
||||
@@ -219,7 +219,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
|
||||
await expect(
|
||||
openHttpConnectTunnel({
|
||||
proxyUrl: "http://%E0%A4%A@proxy.example:8080",
|
||||
proxyUrl: new URL("http://%E0%A4%A@proxy.example:8080"),
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
}),
|
||||
@@ -234,7 +234,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
|
||||
await expect(
|
||||
openHttpConnectTunnel({
|
||||
proxyUrl: "http://proxy.example:8080",
|
||||
proxyUrl: new URL("http://proxy.example:8080"),
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
}),
|
||||
@@ -253,7 +253,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
|
||||
let resolved = false;
|
||||
const tunnel = openHttpConnectTunnel({
|
||||
proxyUrl: "http://proxy.example:8080",
|
||||
proxyUrl: new URL("http://proxy.example:8080"),
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
}).then((socket) => {
|
||||
@@ -278,7 +278,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
|
||||
await expect(
|
||||
openHttpConnectTunnel({
|
||||
proxyUrl: "http://proxy.example:8080",
|
||||
proxyUrl: new URL("http://proxy.example:8080"),
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
}),
|
||||
@@ -295,7 +295,7 @@ describe("openHttpConnectTunnel", () => {
|
||||
|
||||
await expect(
|
||||
openHttpConnectTunnel({
|
||||
proxyUrl: "http://proxy.example:8080",
|
||||
proxyUrl: new URL("http://proxy.example:8080"),
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
timeoutMs: 1,
|
||||
|
||||
@@ -2,7 +2,7 @@ import * as net from "node:net";
|
||||
import * as tls from "node:tls";
|
||||
|
||||
export type HttpConnectTunnelParams = {
|
||||
proxyUrl: string;
|
||||
proxyUrl: URL;
|
||||
targetHost: string;
|
||||
targetPort: number;
|
||||
timeoutMs?: number;
|
||||
@@ -10,10 +10,24 @@ export type HttpConnectTunnelParams = {
|
||||
|
||||
const MAX_CONNECT_RESPONSE_HEADER_BYTES = 16 * 1024;
|
||||
|
||||
function redactProxyUrl(proxyUrl: string): string {
|
||||
type ProxySocket = net.Socket | tls.TLSSocket;
|
||||
type ConnectResponseBuffer = Buffer;
|
||||
|
||||
type ProxyConnectReadResult =
|
||||
| {
|
||||
kind: "incomplete";
|
||||
responseBuffer: ConnectResponseBuffer;
|
||||
}
|
||||
| {
|
||||
kind: "complete";
|
||||
responseBuffer: ConnectResponseBuffer;
|
||||
statusLine: string;
|
||||
tunneledBytes: ConnectResponseBuffer | undefined;
|
||||
};
|
||||
|
||||
function redactProxyUrl(proxyUrl: URL): string {
|
||||
try {
|
||||
const parsed = new URL(proxyUrl);
|
||||
return parsed.origin;
|
||||
return proxyUrl.origin;
|
||||
} catch {
|
||||
return "<invalid proxy URL>";
|
||||
}
|
||||
@@ -39,7 +53,7 @@ function resolveProxyAuthorization(proxy: URL): string | undefined {
|
||||
return `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`;
|
||||
}
|
||||
|
||||
function formatTunnelFailure(proxyUrl: string, err: unknown): Error {
|
||||
function formatTunnelFailure(proxyUrl: URL, err: unknown): Error {
|
||||
return new Error(
|
||||
`Proxy CONNECT failed via ${redactProxyUrl(proxyUrl)}: ${err instanceof Error ? err.message : String(err)}`,
|
||||
{ cause: err },
|
||||
@@ -55,195 +69,247 @@ function writeConnectRequest(socket: net.Socket, proxy: URL, target: string): vo
|
||||
socket.write([...headers, "", ""].join("\r\n"));
|
||||
}
|
||||
|
||||
function assertConnectHeaderBytesWithinLimit(size: number): void {
|
||||
if (size > MAX_CONNECT_RESPONSE_HEADER_BYTES) {
|
||||
throw new Error(
|
||||
`Proxy CONNECT response headers exceeded ${MAX_CONNECT_RESPONSE_HEADER_BYTES} bytes`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function readProxyConnectResponse(
|
||||
responseBuffer: ConnectResponseBuffer,
|
||||
chunk: ConnectResponseBuffer,
|
||||
): ProxyConnectReadResult {
|
||||
const nextBuffer = Buffer.concat([responseBuffer, chunk]);
|
||||
const headerEnd = nextBuffer.indexOf("\r\n\r\n");
|
||||
if (headerEnd === -1) {
|
||||
assertConnectHeaderBytesWithinLimit(nextBuffer.length);
|
||||
return { kind: "incomplete", responseBuffer: nextBuffer };
|
||||
}
|
||||
|
||||
const bodyOffset = headerEnd + 4;
|
||||
assertConnectHeaderBytesWithinLimit(bodyOffset);
|
||||
|
||||
const responseHeader = nextBuffer.subarray(0, bodyOffset).toString("latin1");
|
||||
const statusLine = responseHeader.split("\r\n", 1)[0] ?? "";
|
||||
const tunneledBytes =
|
||||
nextBuffer.length > bodyOffset ? nextBuffer.subarray(bodyOffset) : undefined;
|
||||
return {
|
||||
kind: "complete",
|
||||
responseBuffer: nextBuffer,
|
||||
statusLine,
|
||||
tunneledBytes,
|
||||
};
|
||||
}
|
||||
|
||||
function isSuccessfulConnectStatusLine(statusLine: string): boolean {
|
||||
return /^HTTP\/1\.[01] 2\d\d\b/.test(statusLine);
|
||||
}
|
||||
|
||||
function connectToProxy(proxy: URL): ProxySocket {
|
||||
const proxyHost = resolveProxyHost(proxy);
|
||||
const connectOptions = {
|
||||
host: proxyHost,
|
||||
port: resolveProxyPort(proxy),
|
||||
};
|
||||
if (proxy.protocol === "https:") {
|
||||
return tls.connect({
|
||||
...connectOptions,
|
||||
servername: proxyHost,
|
||||
ALPNProtocols: ["http/1.1"],
|
||||
});
|
||||
}
|
||||
return net.connect(connectOptions);
|
||||
}
|
||||
|
||||
class HttpConnectTunnelAttempt {
|
||||
private proxySocket: ProxySocket | undefined;
|
||||
private targetTlsSocket: tls.TLSSocket | undefined;
|
||||
private timeout: NodeJS.Timeout | undefined;
|
||||
private settled = false;
|
||||
private responseBuffer: ConnectResponseBuffer = Buffer.alloc(0);
|
||||
|
||||
constructor(
|
||||
private readonly params: HttpConnectTunnelParams,
|
||||
private readonly proxy: URL,
|
||||
private readonly resolve: (socket: tls.TLSSocket) => void,
|
||||
private readonly reject: (reason?: unknown) => void,
|
||||
) {}
|
||||
|
||||
public start(): void {
|
||||
try {
|
||||
this.startTimeout();
|
||||
this.proxySocket = connectToProxy(this.proxy);
|
||||
this.proxySocket.once(
|
||||
this.proxy.protocol === "https:" ? "secureConnect" : "connect",
|
||||
this.onProxyConnected,
|
||||
);
|
||||
this.proxySocket.on("data", this.onProxyData);
|
||||
this.proxySocket.once("end", this.onProxyClosedBeforeConnect);
|
||||
this.proxySocket.once("error", this.fail);
|
||||
this.proxySocket.once("close", this.onProxyClosedBeforeConnect);
|
||||
} catch (err) {
|
||||
this.fail(err);
|
||||
}
|
||||
}
|
||||
|
||||
private startTimeout(): void {
|
||||
const timeoutMs = this.params.timeoutMs;
|
||||
if (timeoutMs && Number.isFinite(timeoutMs) && timeoutMs > 0) {
|
||||
this.timeout = setTimeout(() => {
|
||||
this.fail(new Error(`Proxy CONNECT timed out after ${Math.trunc(timeoutMs)}ms`));
|
||||
}, Math.trunc(timeoutMs));
|
||||
}
|
||||
}
|
||||
|
||||
private clearTimer(): void {
|
||||
if (this.timeout) {
|
||||
clearTimeout(this.timeout);
|
||||
this.timeout = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
private cleanupProxyListeners(): void {
|
||||
const socket = this.proxySocket;
|
||||
if (!socket) {
|
||||
return;
|
||||
}
|
||||
socket.off("data", this.onProxyData);
|
||||
socket.off("end", this.onProxyClosedBeforeConnect);
|
||||
socket.off("error", this.fail);
|
||||
socket.off("close", this.onProxyClosedBeforeConnect);
|
||||
socket.off("connect", this.onProxyConnected);
|
||||
socket.off("secureConnect", this.onProxyConnected);
|
||||
}
|
||||
|
||||
private cleanupTargetTlsListeners(): void {
|
||||
const socket = this.targetTlsSocket;
|
||||
if (!socket) {
|
||||
return;
|
||||
}
|
||||
socket.off("secureConnect", this.onTargetSecureConnect);
|
||||
socket.off("error", this.fail);
|
||||
socket.off("close", this.onTargetTlsClosedBeforeSecureConnect);
|
||||
}
|
||||
|
||||
private readonly fail = (err: unknown): void => {
|
||||
if (this.settled) {
|
||||
return;
|
||||
}
|
||||
this.settled = true;
|
||||
this.clearTimer();
|
||||
this.cleanupProxyListeners();
|
||||
this.cleanupTargetTlsListeners();
|
||||
this.targetTlsSocket?.destroy();
|
||||
this.proxySocket?.destroy();
|
||||
this.reject(formatTunnelFailure(this.params.proxyUrl, err));
|
||||
};
|
||||
|
||||
private succeed(socket: tls.TLSSocket): void {
|
||||
if (this.settled) {
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
this.settled = true;
|
||||
this.clearTimer();
|
||||
this.cleanupProxyListeners();
|
||||
this.cleanupTargetTlsListeners();
|
||||
this.resolve(socket);
|
||||
}
|
||||
|
||||
private readonly onProxyConnected = (): void => {
|
||||
const socket = this.proxySocket;
|
||||
if (!socket) {
|
||||
this.fail(new Error("Proxy socket missing after connect"));
|
||||
return;
|
||||
}
|
||||
const target = `${this.params.targetHost}:${this.params.targetPort}`;
|
||||
try {
|
||||
writeConnectRequest(socket, this.proxy, target);
|
||||
} catch (err) {
|
||||
this.fail(err);
|
||||
}
|
||||
};
|
||||
|
||||
private readonly onProxyData = (chunk: Buffer): void => {
|
||||
let result: ProxyConnectReadResult;
|
||||
try {
|
||||
result = readProxyConnectResponse(this.responseBuffer, chunk);
|
||||
} catch (err) {
|
||||
this.fail(err);
|
||||
return;
|
||||
}
|
||||
|
||||
this.responseBuffer = result.responseBuffer;
|
||||
if (result.kind === "incomplete") {
|
||||
return;
|
||||
}
|
||||
|
||||
const socket = this.proxySocket;
|
||||
if (!socket) {
|
||||
this.fail(new Error("Proxy socket missing after CONNECT response"));
|
||||
return;
|
||||
}
|
||||
if (result.tunneledBytes) {
|
||||
socket.unshift(result.tunneledBytes);
|
||||
}
|
||||
if (!isSuccessfulConnectStatusLine(result.statusLine)) {
|
||||
this.fail(new Error(result.statusLine || "Proxy returned an invalid CONNECT response"));
|
||||
return;
|
||||
}
|
||||
|
||||
this.cleanupProxyListeners();
|
||||
this.startTargetTls(socket);
|
||||
};
|
||||
|
||||
private startTargetTls(socket: ProxySocket): void {
|
||||
try {
|
||||
this.targetTlsSocket = tls.connect({
|
||||
socket,
|
||||
servername: this.params.targetHost,
|
||||
ALPNProtocols: ["h2"],
|
||||
});
|
||||
this.targetTlsSocket.once("secureConnect", this.onTargetSecureConnect);
|
||||
this.targetTlsSocket.once("error", this.fail);
|
||||
this.targetTlsSocket.once("close", this.onTargetTlsClosedBeforeSecureConnect);
|
||||
} catch (err) {
|
||||
this.fail(err);
|
||||
}
|
||||
}
|
||||
|
||||
private readonly onTargetSecureConnect = (): void => {
|
||||
const socket = this.targetTlsSocket;
|
||||
if (!socket) {
|
||||
this.fail(new Error("APNs TLS socket missing after secureConnect"));
|
||||
return;
|
||||
}
|
||||
if (socket.alpnProtocol !== "h2") {
|
||||
const negotiated = socket.alpnProtocol || "no ALPN protocol";
|
||||
this.fail(new Error(`APNs TLS tunnel negotiated ${negotiated} instead of h2`));
|
||||
return;
|
||||
}
|
||||
this.succeed(socket);
|
||||
};
|
||||
|
||||
private readonly onTargetTlsClosedBeforeSecureConnect = (): void => {
|
||||
this.fail(new Error("APNs TLS tunnel closed before secureConnect"));
|
||||
};
|
||||
|
||||
private readonly onProxyClosedBeforeConnect = (): void => {
|
||||
this.fail(new Error("Proxy closed before CONNECT response"));
|
||||
};
|
||||
}
|
||||
|
||||
export async function openHttpConnectTunnel(
|
||||
params: HttpConnectTunnelParams,
|
||||
): Promise<tls.TLSSocket> {
|
||||
const proxy = new URL(params.proxyUrl);
|
||||
const proxy = new URL(params.proxyUrl.href);
|
||||
if (proxy.protocol !== "http:" && proxy.protocol !== "https:") {
|
||||
throw new Error(`Unsupported proxy protocol for APNs HTTP/2 CONNECT tunnel: ${proxy.protocol}`);
|
||||
}
|
||||
|
||||
return await new Promise<tls.TLSSocket>((resolve, reject) => {
|
||||
let proxySocket: net.Socket | tls.TLSSocket | undefined;
|
||||
let targetTlsSocket: tls.TLSSocket | undefined;
|
||||
let timeout: NodeJS.Timeout | undefined;
|
||||
let settled = false;
|
||||
let responseBuffer = Buffer.alloc(0);
|
||||
|
||||
const clearTimer = () => {
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
timeout = undefined;
|
||||
}
|
||||
};
|
||||
|
||||
const cleanupProxyListeners = () => {
|
||||
proxySocket?.off("data", onData);
|
||||
proxySocket?.off("end", onEnd);
|
||||
proxySocket?.off("error", onError);
|
||||
proxySocket?.off("close", onClose);
|
||||
proxySocket?.off("connect", onConnected);
|
||||
proxySocket?.off("secureConnect", onConnected);
|
||||
};
|
||||
|
||||
const cleanupTargetTlsListeners = () => {
|
||||
targetTlsSocket?.off("secureConnect", onTargetSecureConnect);
|
||||
targetTlsSocket?.off("error", onTargetTlsError);
|
||||
targetTlsSocket?.off("close", onTargetTlsClose);
|
||||
};
|
||||
|
||||
const fail = (err: unknown) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimer();
|
||||
cleanupProxyListeners();
|
||||
cleanupTargetTlsListeners();
|
||||
targetTlsSocket?.destroy();
|
||||
proxySocket?.destroy();
|
||||
reject(formatTunnelFailure(params.proxyUrl, err));
|
||||
};
|
||||
|
||||
const succeed = (socket: tls.TLSSocket) => {
|
||||
if (settled) {
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimer();
|
||||
cleanupProxyListeners();
|
||||
cleanupTargetTlsListeners();
|
||||
resolve(socket);
|
||||
};
|
||||
|
||||
function onConnected(): void {
|
||||
if (!proxySocket) {
|
||||
fail(new Error("Proxy socket missing after connect"));
|
||||
return;
|
||||
}
|
||||
const target = `${params.targetHost}:${params.targetPort}`;
|
||||
try {
|
||||
writeConnectRequest(proxySocket, proxy, target);
|
||||
} catch (err) {
|
||||
fail(err);
|
||||
}
|
||||
}
|
||||
|
||||
function onData(chunk: Buffer | string): void {
|
||||
const nextChunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, "latin1");
|
||||
responseBuffer = Buffer.concat([responseBuffer, nextChunk]);
|
||||
const headerEnd = responseBuffer.indexOf("\r\n\r\n");
|
||||
if (headerEnd === -1) {
|
||||
if (responseBuffer.length > MAX_CONNECT_RESPONSE_HEADER_BYTES) {
|
||||
fail(
|
||||
new Error(
|
||||
`Proxy CONNECT response headers exceeded ${MAX_CONNECT_RESPONSE_HEADER_BYTES} bytes`,
|
||||
),
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!proxySocket) {
|
||||
fail(new Error("Proxy socket missing after CONNECT response"));
|
||||
return;
|
||||
}
|
||||
const bodyOffset = headerEnd + 4;
|
||||
if (bodyOffset > MAX_CONNECT_RESPONSE_HEADER_BYTES) {
|
||||
fail(
|
||||
new Error(
|
||||
`Proxy CONNECT response headers exceeded ${MAX_CONNECT_RESPONSE_HEADER_BYTES} bytes`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (responseBuffer.length > bodyOffset) {
|
||||
proxySocket.unshift(responseBuffer.subarray(bodyOffset));
|
||||
}
|
||||
const responseHeader = responseBuffer.subarray(0, bodyOffset).toString("latin1");
|
||||
const statusLine = responseHeader.split("\r\n", 1)[0] ?? "";
|
||||
if (!/^HTTP\/1\.[01] 2\d\d\b/.test(statusLine)) {
|
||||
fail(new Error(statusLine || "Proxy returned an invalid CONNECT response"));
|
||||
return;
|
||||
}
|
||||
|
||||
cleanupProxyListeners();
|
||||
try {
|
||||
targetTlsSocket = tls.connect({
|
||||
socket: proxySocket,
|
||||
servername: params.targetHost,
|
||||
ALPNProtocols: ["h2"],
|
||||
});
|
||||
targetTlsSocket.once("secureConnect", onTargetSecureConnect);
|
||||
targetTlsSocket.once("error", onTargetTlsError);
|
||||
targetTlsSocket.once("close", onTargetTlsClose);
|
||||
} catch (err) {
|
||||
fail(err);
|
||||
}
|
||||
}
|
||||
|
||||
function onTargetSecureConnect(): void {
|
||||
if (!targetTlsSocket) {
|
||||
fail(new Error("APNs TLS socket missing after secureConnect"));
|
||||
return;
|
||||
}
|
||||
if (targetTlsSocket.alpnProtocol !== "h2") {
|
||||
const negotiated = targetTlsSocket.alpnProtocol || "no ALPN protocol";
|
||||
fail(new Error(`APNs TLS tunnel negotiated ${negotiated} instead of h2`));
|
||||
return;
|
||||
}
|
||||
succeed(targetTlsSocket);
|
||||
}
|
||||
|
||||
function onTargetTlsError(err: Error): void {
|
||||
fail(err);
|
||||
}
|
||||
|
||||
function onTargetTlsClose(): void {
|
||||
fail(new Error("APNs TLS tunnel closed before secureConnect"));
|
||||
}
|
||||
|
||||
function onEnd(): void {
|
||||
fail(new Error("Proxy closed before CONNECT response"));
|
||||
}
|
||||
|
||||
function onClose(): void {
|
||||
fail(new Error("Proxy closed before CONNECT response"));
|
||||
}
|
||||
|
||||
function onError(err: Error): void {
|
||||
fail(err);
|
||||
}
|
||||
|
||||
try {
|
||||
if (params.timeoutMs && Number.isFinite(params.timeoutMs) && params.timeoutMs > 0) {
|
||||
timeout = setTimeout(() => {
|
||||
fail(new Error(`Proxy CONNECT timed out after ${Math.trunc(params.timeoutMs ?? 0)}ms`));
|
||||
}, Math.trunc(params.timeoutMs));
|
||||
}
|
||||
|
||||
const proxyHost = resolveProxyHost(proxy);
|
||||
const connectOptions = {
|
||||
host: proxyHost,
|
||||
port: resolveProxyPort(proxy),
|
||||
};
|
||||
proxySocket =
|
||||
proxy.protocol === "https:"
|
||||
? tls.connect({
|
||||
...connectOptions,
|
||||
servername: proxyHost,
|
||||
ALPNProtocols: ["http/1.1"],
|
||||
})
|
||||
: net.connect(connectOptions);
|
||||
|
||||
proxySocket.once(proxy.protocol === "https:" ? "secureConnect" : "connect", onConnected);
|
||||
proxySocket.on("data", onData);
|
||||
proxySocket.once("end", onEnd);
|
||||
proxySocket.once("error", onError);
|
||||
proxySocket.once("close", onClose);
|
||||
} catch (err) {
|
||||
fail(err);
|
||||
}
|
||||
new HttpConnectTunnelAttempt(params, proxy, resolve, reject).start();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,22 +1,22 @@
|
||||
export type ActiveManagedProxyUrl = Readonly<URL>;
|
||||
|
||||
export type ActiveManagedProxyRegistration = {
|
||||
proxyUrl: string;
|
||||
proxyUrl: ActiveManagedProxyUrl;
|
||||
stopped: boolean;
|
||||
};
|
||||
|
||||
let activeProxyUrl: string | undefined;
|
||||
let activeProxyHandleCount = 0;
|
||||
let activeProxyUrl: ActiveManagedProxyUrl | undefined;
|
||||
|
||||
export function registerActiveManagedProxyUrl(proxyUrl: string): ActiveManagedProxyRegistration {
|
||||
if (activeProxyUrl !== undefined && activeProxyUrl !== proxyUrl) {
|
||||
export function registerActiveManagedProxyUrl(proxyUrl: URL): ActiveManagedProxyRegistration {
|
||||
if (activeProxyUrl !== undefined) {
|
||||
throw new Error(
|
||||
"proxy: cannot activate a different managed proxy while another proxy is active; " +
|
||||
"proxy: cannot activate a managed proxy while another proxy is active; " +
|
||||
"stop the current proxy before changing proxy.proxyUrl.",
|
||||
);
|
||||
}
|
||||
|
||||
activeProxyUrl = proxyUrl;
|
||||
activeProxyHandleCount += 1;
|
||||
return { proxyUrl, stopped: false };
|
||||
activeProxyUrl = new URL(proxyUrl.href);
|
||||
return { proxyUrl: activeProxyUrl, stopped: false };
|
||||
}
|
||||
|
||||
export function stopActiveManagedProxyRegistration(
|
||||
@@ -26,19 +26,15 @@ export function stopActiveManagedProxyRegistration(
|
||||
return;
|
||||
}
|
||||
registration.stopped = true;
|
||||
if (activeProxyHandleCount > 0) {
|
||||
activeProxyHandleCount -= 1;
|
||||
}
|
||||
if (activeProxyHandleCount === 0) {
|
||||
if (activeProxyUrl?.href === registration.proxyUrl.href) {
|
||||
activeProxyUrl = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export function getActiveManagedProxyUrl(): string | undefined {
|
||||
export function getActiveManagedProxyUrl(): ActiveManagedProxyUrl | undefined {
|
||||
return activeProxyUrl;
|
||||
}
|
||||
|
||||
export function _resetActiveManagedProxyStateForTests(): void {
|
||||
activeProxyUrl = undefined;
|
||||
activeProxyHandleCount = 0;
|
||||
}
|
||||
|
||||
@@ -125,7 +125,7 @@ describe("startProxy", () => {
|
||||
proxyUrl: "http://127.0.0.1:3128",
|
||||
});
|
||||
|
||||
expect(getActiveManagedProxyUrl()).toBe("http://127.0.0.1:3128");
|
||||
expect(getActiveManagedProxyUrl()?.href).toBe("http://127.0.0.1:3128/");
|
||||
|
||||
await stopProxy(handle);
|
||||
|
||||
@@ -291,7 +291,7 @@ describe("startProxy", () => {
|
||||
expect((global as Record<string, unknown>)["GLOBAL_AGENT"]).toBeUndefined();
|
||||
});
|
||||
|
||||
it("keeps process-wide proxy hooks active until the last same-URL overlapping handle stops", async () => {
|
||||
it("rejects overlapping handles with the same managed proxy URL", async () => {
|
||||
const patchedHttpRequest = vi.fn() as unknown as typeof http.request;
|
||||
const patchedHttpGet = vi.fn() as unknown as typeof http.get;
|
||||
const patchedHttpsRequest = vi.fn() as unknown as typeof https.request;
|
||||
@@ -311,23 +311,20 @@ describe("startProxy", () => {
|
||||
enabled: true,
|
||||
proxyUrl: "http://127.0.0.1:3128",
|
||||
});
|
||||
const secondHandle = await startProxy({
|
||||
enabled: true,
|
||||
proxyUrl: "http://127.0.0.1:3128",
|
||||
});
|
||||
|
||||
expect(http.request).toBe(patchedHttpRequest);
|
||||
expect(https.request).toBe(patchedHttpsRequest);
|
||||
expect(process.env["HTTP_PROXY"]).toBe("http://127.0.0.1:3128");
|
||||
|
||||
await stopProxy(firstHandle);
|
||||
await expect(
|
||||
startProxy({
|
||||
enabled: true,
|
||||
proxyUrl: "http://127.0.0.1:3128",
|
||||
}),
|
||||
).rejects.toThrow("cannot activate a managed proxy");
|
||||
|
||||
expect(http.request).toBe(patchedHttpRequest);
|
||||
expect(https.request).toBe(patchedHttpsRequest);
|
||||
expect(process.env["HTTP_PROXY"]).toBe("http://127.0.0.1:3128");
|
||||
expect(process.env["OPENCLAW_PROXY_ACTIVE"]).toBe("1");
|
||||
|
||||
await stopProxy(secondHandle);
|
||||
await stopProxy(firstHandle);
|
||||
|
||||
expect(http.request).toBe(originalHttpRequest);
|
||||
expect(http.get).toBe(originalHttpGet);
|
||||
@@ -348,7 +345,7 @@ describe("startProxy", () => {
|
||||
enabled: true,
|
||||
proxyUrl: "http://127.0.0.1:3129",
|
||||
}),
|
||||
).rejects.toThrow("cannot activate a different managed proxy");
|
||||
).rejects.toThrow("cannot activate a managed proxy");
|
||||
|
||||
expect(process.env["HTTP_PROXY"]).toBe("http://127.0.0.1:3128");
|
||||
expect(process.env["OPENCLAW_PROXY_ACTIVE"]).toBe("1");
|
||||
|
||||
@@ -326,16 +326,6 @@ function restoreNodeHttpStackForProxyLifecycle(): void {
|
||||
}
|
||||
}
|
||||
|
||||
function reapplyActiveProxyRuntime(proxyUrl: string): void {
|
||||
applyProxyEnv(proxyUrl);
|
||||
resetUndiciDispatcherForProxyLifecycle();
|
||||
try {
|
||||
bootstrapNodeHttpStack(proxyUrl);
|
||||
} catch (err) {
|
||||
logWarn(`proxy: failed to refresh node HTTP proxy hooks: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
function restoreInactiveProxyRuntime(snapshot: ProxyEnvSnapshot): void {
|
||||
restoreProxyEnv(snapshot);
|
||||
resetUndiciDispatcherForProxyLifecycle();
|
||||
@@ -343,14 +333,7 @@ function restoreInactiveProxyRuntime(snapshot: ProxyEnvSnapshot): void {
|
||||
restoreNodeHttpStackForProxyLifecycle();
|
||||
}
|
||||
|
||||
function restoreAfterFailedProxyActivation(
|
||||
previousActiveProxyUrl: string | undefined,
|
||||
restoreSnapshot: ProxyEnvSnapshot,
|
||||
): void {
|
||||
if (previousActiveProxyUrl) {
|
||||
reapplyActiveProxyRuntime(previousActiveProxyUrl);
|
||||
return;
|
||||
}
|
||||
function restoreAfterFailedProxyActivation(restoreSnapshot: ProxyEnvSnapshot): void {
|
||||
restoreInactiveProxyRuntime(restoreSnapshot);
|
||||
baseProxyEnvSnapshot = null;
|
||||
}
|
||||
@@ -361,12 +344,6 @@ function stopActiveProxyRegistration(registration: ActiveManagedProxyRegistratio
|
||||
}
|
||||
stopActiveManagedProxyRegistration(registration);
|
||||
|
||||
const nextActiveProxyUrl = getActiveManagedProxyUrl();
|
||||
if (nextActiveProxyUrl) {
|
||||
reapplyActiveProxyRuntime(nextActiveProxyUrl);
|
||||
return;
|
||||
}
|
||||
|
||||
const restoreSnapshot = baseProxyEnvSnapshot ?? captureProxyEnv();
|
||||
baseProxyEnvSnapshot = null;
|
||||
restoreInactiveProxyRuntime(restoreSnapshot);
|
||||
@@ -413,7 +390,12 @@ export async function startProxy(config: ProxyConfig | undefined): Promise<Proxy
|
||||
}
|
||||
|
||||
const proxyUrl = resolveProxyUrl(config);
|
||||
const previousActiveProxyUrl = getActiveManagedProxyUrl();
|
||||
if (getActiveManagedProxyUrl()) {
|
||||
throw new Error(
|
||||
"proxy: cannot activate a managed proxy while another proxy is active; " +
|
||||
"stop the current proxy before changing proxy.proxyUrl.",
|
||||
);
|
||||
}
|
||||
baseProxyEnvSnapshot ??= captureProxyEnv();
|
||||
const lifecycleBaseEnvSnapshot = baseProxyEnvSnapshot;
|
||||
let injectedEnvSnapshot = captureProxyEnv();
|
||||
@@ -423,9 +405,9 @@ export async function startProxy(config: ProxyConfig | undefined): Promise<Proxy
|
||||
injectedEnvSnapshot = injectProxyEnv(proxyUrl);
|
||||
forceResetGlobalDispatcher();
|
||||
bootstrapNodeHttpStack(proxyUrl);
|
||||
registration = registerActiveManagedProxyUrl(proxyUrl);
|
||||
registration = registerActiveManagedProxyUrl(new URL(proxyUrl));
|
||||
} catch (err) {
|
||||
restoreAfterFailedProxyActivation(previousActiveProxyUrl, lifecycleBaseEnvSnapshot);
|
||||
restoreAfterFailedProxyActivation(lifecycleBaseEnvSnapshot);
|
||||
throw new Error(`proxy: failed to activate external proxy routing: ${String(err)}`, {
|
||||
cause: err,
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type http2 from "node:http2";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { HttpConnectTunnelParams } from "./net/http-connect-tunnel.js";
|
||||
import {
|
||||
_resetActiveManagedProxyStateForTests,
|
||||
registerActiveManagedProxyUrl,
|
||||
@@ -69,7 +70,7 @@ const { connectSpy, tunnelSpy, fakeRequest, fakeSession, fakeTlsSocket } = vi.ho
|
||||
fakeSession,
|
||||
fakeTlsSocket,
|
||||
connectSpy: vi.fn(() => fakeSession),
|
||||
tunnelSpy: vi.fn(async () => fakeTlsSocket),
|
||||
tunnelSpy: vi.fn(async (_params: HttpConnectTunnelParams) => fakeTlsSocket),
|
||||
};
|
||||
});
|
||||
|
||||
@@ -112,7 +113,7 @@ describe("connectApnsHttp2Session", () => {
|
||||
});
|
||||
|
||||
it("uses an HTTP CONNECT tunnel when managed proxy is active", async () => {
|
||||
const registration = registerActiveManagedProxyUrl("http://proxy.example:8080");
|
||||
const registration = registerActiveManagedProxyUrl(new URL("http://proxy.example:8080"));
|
||||
const { connectApnsHttp2Session } = await import("./push-apns-http2.js");
|
||||
|
||||
const session = await connectApnsHttp2Session({
|
||||
@@ -122,12 +123,16 @@ describe("connectApnsHttp2Session", () => {
|
||||
stopActiveManagedProxyRegistration(registration);
|
||||
|
||||
expect(session).toBe(fakeSession);
|
||||
expect(tunnelSpy).toHaveBeenCalledWith({
|
||||
proxyUrl: "http://proxy.example:8080",
|
||||
targetHost: "api.push.apple.com",
|
||||
targetPort: 443,
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
const tunnelCall = tunnelSpy.mock.calls.at(-1)?.[0];
|
||||
const proxyUrl = tunnelCall?.proxyUrl;
|
||||
expect(proxyUrl).toBeInstanceOf(URL);
|
||||
if (!(proxyUrl instanceof URL)) {
|
||||
throw new Error("expected active managed proxy URL");
|
||||
}
|
||||
expect(proxyUrl.href).toBe("http://proxy.example:8080/");
|
||||
expect(tunnelCall?.targetHost).toBe("api.push.apple.com");
|
||||
expect(tunnelCall?.targetPort).toBe(443);
|
||||
expect(tunnelCall?.timeoutMs).toBe(10_000);
|
||||
expect(connectSpy).toHaveBeenCalledWith("https://api.push.apple.com", {
|
||||
createConnection: expect.any(Function),
|
||||
});
|
||||
@@ -170,12 +175,16 @@ describe("connectApnsHttp2Session", () => {
|
||||
});
|
||||
|
||||
expect(result).toEqual({ status: 403, body: '{"reason":"InvalidProviderToken"}' });
|
||||
expect(tunnelSpy).toHaveBeenCalledWith({
|
||||
proxyUrl: "http://proxy.example:8080",
|
||||
targetHost: "api.sandbox.push.apple.com",
|
||||
targetPort: 443,
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
const tunnelCall = tunnelSpy.mock.calls.at(-1)?.[0];
|
||||
const proxyUrl = tunnelCall?.proxyUrl;
|
||||
expect(proxyUrl).toBeInstanceOf(URL);
|
||||
if (!(proxyUrl instanceof URL)) {
|
||||
throw new Error("expected explicit proxy URL");
|
||||
}
|
||||
expect(proxyUrl.href).toBe("http://proxy.example:8080/");
|
||||
expect(tunnelCall?.targetHost).toBe("api.sandbox.push.apple.com");
|
||||
expect(tunnelCall?.targetPort).toBe(443);
|
||||
expect(tunnelCall?.timeoutMs).toBe(10_000);
|
||||
expect(fakeSession.request).toHaveBeenCalledWith({
|
||||
":method": "POST",
|
||||
":path": `/3/device/${"0".repeat(64)}`,
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import http2 from "node:http2";
|
||||
import { openHttpConnectTunnel } from "./net/http-connect-tunnel.js";
|
||||
import { getActiveManagedProxyUrl } from "./net/proxy/active-proxy-state.js";
|
||||
import {
|
||||
getActiveManagedProxyUrl,
|
||||
type ActiveManagedProxyUrl,
|
||||
} from "./net/proxy/active-proxy-state.js";
|
||||
|
||||
const APNS_AUTHORITIES = new Set([
|
||||
"https://api.push.apple.com",
|
||||
@@ -43,7 +46,7 @@ function assertApnsAuthority(authority: string): ApnsAuthority {
|
||||
|
||||
async function openProxiedApnsHttp2Session(params: {
|
||||
authority: ApnsAuthority;
|
||||
proxyUrl: string;
|
||||
proxyUrl: ActiveManagedProxyUrl;
|
||||
timeoutMs: number;
|
||||
}): Promise<http2.ClientHttp2Session> {
|
||||
const apnsHost = new URL(params.authority).hostname;
|
||||
@@ -81,7 +84,7 @@ export async function probeApnsHttp2ReachabilityViaProxy(
|
||||
const authority = assertApnsAuthority(params.authority);
|
||||
const session = await openProxiedApnsHttp2Session({
|
||||
authority,
|
||||
proxyUrl: params.proxyUrl,
|
||||
proxyUrl: new URL(params.proxyUrl),
|
||||
timeoutMs: params.timeoutMs,
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user