From 7544beea17943fa16b43bba670ffe089ba00091f Mon Sep 17 00:00:00 2001 From: Shakker Date: Wed, 6 May 2026 06:03:30 +0100 Subject: [PATCH] fix: preserve embedded dispatcher timeouts --- .../run/attempt-http-runtime.ts | 4 +- .../attempt.spawn-workspace.test-support.ts | 6 ++ .../attempt.spawn-workspace.timeout.test.ts | 10 +- .../net/undici-global-dispatcher.test.ts | 22 +++++ src/infra/net/undici-global-dispatcher.ts | 97 +++++++++++++------ 5 files changed, 103 insertions(+), 36 deletions(-) diff --git a/src/agents/pi-embedded-runner/run/attempt-http-runtime.ts b/src/agents/pi-embedded-runner/run/attempt-http-runtime.ts index 95453742a87..3c79be7b1d9 100644 --- a/src/agents/pi-embedded-runner/run/attempt-http-runtime.ts +++ b/src/agents/pi-embedded-runner/run/attempt-http-runtime.ts @@ -1,14 +1,14 @@ import { DEFAULT_UNDICI_STREAM_TIMEOUT_MS, + ensureGlobalUndiciDispatcherStreamTimeouts, ensureGlobalUndiciEnvProxyDispatcher, - ensureGlobalUndiciStreamTimeouts, } from "../../../infra/net/undici-global-dispatcher.js"; export function configureEmbeddedAttemptHttpRuntime(params: { timeoutMs: number }): void { // Proxy bootstrap must happen before timeout tuning so the timeouts wrap the // active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher. ensureGlobalUndiciEnvProxyDispatcher(); - ensureGlobalUndiciStreamTimeouts({ + ensureGlobalUndiciDispatcherStreamTimeouts({ timeoutMs: Math.max(params.timeoutMs, DEFAULT_UNDICI_STREAM_TIMEOUT_MS), }); } diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts index 4c38126520a..43ac30b51da 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts @@ -60,6 +60,7 @@ type AttemptSpawnWorkspaceHoisted = { sessionManagerOpenMock: UnknownMock; resolveSandboxContextMock: UnknownMock; ensureGlobalUndiciEnvProxyDispatcherMock: UnknownMock; + ensureGlobalUndiciDispatcherStreamTimeoutsMock: UnknownMock; ensureGlobalUndiciStreamTimeoutsMock: UnknownMock; buildEmbeddedMessageActionDiscoveryInputMock: UnknownMock; createOpenClawCodingToolsMock: UnknownMock; @@ -125,6 +126,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => { const sessionManagerOpenMock = vi.fn(); const resolveSandboxContextMock = vi.fn(); const ensureGlobalUndiciEnvProxyDispatcherMock = vi.fn(); + const ensureGlobalUndiciDispatcherStreamTimeoutsMock = vi.fn(); const ensureGlobalUndiciStreamTimeoutsMock = vi.fn(); const buildEmbeddedMessageActionDiscoveryInputMock = vi.fn((params: unknown) => params); const createOpenClawCodingToolsMock = vi.fn(() => []); @@ -193,6 +195,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => { sessionManagerOpenMock, resolveSandboxContextMock, ensureGlobalUndiciEnvProxyDispatcherMock, + ensureGlobalUndiciDispatcherStreamTimeoutsMock, ensureGlobalUndiciStreamTimeoutsMock, buildEmbeddedMessageActionDiscoveryInputMock, createOpenClawCodingToolsMock, @@ -287,6 +290,8 @@ vi.mock("../../../infra/net/undici-global-dispatcher.js", () => ({ DEFAULT_UNDICI_STREAM_TIMEOUT_MS: 120_000, ensureGlobalUndiciEnvProxyDispatcher: (...args: unknown[]) => hoisted.ensureGlobalUndiciEnvProxyDispatcherMock(...args), + ensureGlobalUndiciDispatcherStreamTimeouts: (...args: unknown[]) => + hoisted.ensureGlobalUndiciDispatcherStreamTimeoutsMock(...args), ensureGlobalUndiciStreamTimeouts: (...args: unknown[]) => hoisted.ensureGlobalUndiciStreamTimeoutsMock(...args), })); @@ -792,6 +797,7 @@ export function resetEmbeddedAttemptHarness( hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager); hoisted.resolveSandboxContextMock.mockReset(); hoisted.ensureGlobalUndiciEnvProxyDispatcherMock.mockReset(); + hoisted.ensureGlobalUndiciDispatcherStreamTimeoutsMock.mockReset(); hoisted.ensureGlobalUndiciStreamTimeoutsMock.mockReset(); hoisted.buildEmbeddedMessageActionDiscoveryInputMock .mockReset() diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.timeout.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.timeout.test.ts index 5db5b486423..536c9adf0cd 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.timeout.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.timeout.test.ts @@ -2,14 +2,14 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const mocks = vi.hoisted(() => ({ DEFAULT_UNDICI_STREAM_TIMEOUT_MS: 30 * 60 * 1000, + ensureGlobalUndiciDispatcherStreamTimeouts: vi.fn(), ensureGlobalUndiciEnvProxyDispatcher: vi.fn(), - ensureGlobalUndiciStreamTimeouts: vi.fn(), })); vi.mock("../../../infra/net/undici-global-dispatcher.js", () => ({ DEFAULT_UNDICI_STREAM_TIMEOUT_MS: mocks.DEFAULT_UNDICI_STREAM_TIMEOUT_MS, + ensureGlobalUndiciDispatcherStreamTimeouts: mocks.ensureGlobalUndiciDispatcherStreamTimeouts, ensureGlobalUndiciEnvProxyDispatcher: mocks.ensureGlobalUndiciEnvProxyDispatcher, - ensureGlobalUndiciStreamTimeouts: mocks.ensureGlobalUndiciStreamTimeouts, })); import { configureEmbeddedAttemptHttpRuntime } from "./attempt-http-runtime.js"; @@ -17,14 +17,14 @@ import { configureEmbeddedAttemptHttpRuntime } from "./attempt-http-runtime.js"; describe("runEmbeddedAttempt undici timeout wiring", () => { beforeEach(() => { mocks.ensureGlobalUndiciEnvProxyDispatcher.mockReset(); - mocks.ensureGlobalUndiciStreamTimeouts.mockReset(); + mocks.ensureGlobalUndiciDispatcherStreamTimeouts.mockReset(); }); it("does not lower global undici stream tuning below the shared default", () => { configureEmbeddedAttemptHttpRuntime({ timeoutMs: 123_456 }); expect(mocks.ensureGlobalUndiciEnvProxyDispatcher).toHaveBeenCalledOnce(); - expect(mocks.ensureGlobalUndiciStreamTimeouts).toHaveBeenCalledWith({ + expect(mocks.ensureGlobalUndiciDispatcherStreamTimeouts).toHaveBeenCalledWith({ timeoutMs: mocks.DEFAULT_UNDICI_STREAM_TIMEOUT_MS, }); }); @@ -35,7 +35,7 @@ describe("runEmbeddedAttempt undici timeout wiring", () => { configureEmbeddedAttemptHttpRuntime({ timeoutMs }); expect(mocks.ensureGlobalUndiciEnvProxyDispatcher).toHaveBeenCalledOnce(); - expect(mocks.ensureGlobalUndiciStreamTimeouts).toHaveBeenCalledWith({ + expect(mocks.ensureGlobalUndiciDispatcherStreamTimeouts).toHaveBeenCalledWith({ timeoutMs, }); }); diff --git a/src/infra/net/undici-global-dispatcher.test.ts b/src/infra/net/undici-global-dispatcher.test.ts index 01db4cc5f00..6c941359ea6 100644 --- a/src/infra/net/undici-global-dispatcher.test.ts +++ b/src/infra/net/undici-global-dispatcher.test.ts @@ -79,6 +79,7 @@ vi.mock("../wsl.js", () => ({ import { isWSL2Sync } from "../wsl.js"; import { hasEnvHttpProxyAgentConfigured, resolveEnvHttpProxyAgentOptions } from "./proxy-env.js"; let DEFAULT_UNDICI_STREAM_TIMEOUT_MS: typeof import("./undici-global-dispatcher.js").DEFAULT_UNDICI_STREAM_TIMEOUT_MS; +let ensureGlobalUndiciDispatcherStreamTimeouts: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciDispatcherStreamTimeouts; let ensureGlobalUndiciEnvProxyDispatcher: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciEnvProxyDispatcher; let ensureGlobalUndiciStreamTimeouts: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciStreamTimeouts; let forceResetGlobalDispatcher: typeof import("./undici-global-dispatcher.js").forceResetGlobalDispatcher; @@ -90,6 +91,7 @@ describe("ensureGlobalUndiciStreamTimeouts", () => { undiciGlobalDispatcherModule = await import("./undici-global-dispatcher.js"); ({ DEFAULT_UNDICI_STREAM_TIMEOUT_MS, + ensureGlobalUndiciDispatcherStreamTimeouts, ensureGlobalUndiciEnvProxyDispatcher, ensureGlobalUndiciStreamTimeouts, forceResetGlobalDispatcher, @@ -150,6 +152,26 @@ describe("ensureGlobalUndiciStreamTimeouts", () => { expect(output.trim()).toBe("ok"); }); + it("explicitly tunes the global dispatcher when requested for embedded attempts", () => { + getDefaultAutoSelectFamily.mockReturnValue(false); + + ensureGlobalUndiciDispatcherStreamTimeouts({ timeoutMs: 1_900_000 }); + + expect(loadUndiciGlobalDispatcherDeps).toHaveBeenCalledTimes(1); + expect(setGlobalDispatcher).toHaveBeenCalledTimes(1); + const next = getCurrentDispatcher() as { options?: Record }; + expect(next).toBeInstanceOf(Agent); + expect(next.options).toEqual({ + bodyTimeout: 1_900_000, + headersTimeout: 1_900_000, + connect: { + autoSelectFamily: false, + autoSelectFamilyAttemptTimeout: 300, + }, + }); + expect(undiciGlobalDispatcherModule._globalUndiciStreamTimeoutMs).toBe(1_900_000); + }); + it("replaces EnvHttpProxyAgent dispatcher while preserving env-proxy mode", () => { getDefaultAutoSelectFamily.mockReturnValue(false); vi.mocked(hasEnvHttpProxyAgentConfigured).mockReturnValue(true); diff --git a/src/infra/net/undici-global-dispatcher.ts b/src/infra/net/undici-global-dispatcher.ts index 47660ebfde9..c75847633eb 100644 --- a/src/infra/net/undici-global-dispatcher.ts +++ b/src/infra/net/undici-global-dispatcher.ts @@ -49,9 +49,17 @@ function resolveDispatcherKey(params: { return `${params.kind}:${params.timeoutMs}:${autoSelectToken}`; } +function resolveStreamTimeoutMs(opts?: { timeoutMs?: number }): number | null { + const timeoutMsRaw = opts?.timeoutMs ?? DEFAULT_UNDICI_STREAM_TIMEOUT_MS; + if (!Number.isFinite(timeoutMsRaw)) { + return null; + } + return Math.max(DEFAULT_UNDICI_STREAM_TIMEOUT_MS, Math.floor(timeoutMsRaw)); +} + function resolveCurrentDispatcherKind( runtime: Pick, -): DispatcherKind | null { +): Exclude | null { let dispatcher: unknown; try { dispatcher = runtime.getGlobalDispatcher(); @@ -92,27 +100,12 @@ export function ensureGlobalUndiciEnvProxyDispatcher(): void { } } -export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }): void { - const timeoutMsRaw = opts?.timeoutMs ?? DEFAULT_UNDICI_STREAM_TIMEOUT_MS; - if (!Number.isFinite(timeoutMsRaw)) { - return; - } - const timeoutMs = Math.max(DEFAULT_UNDICI_STREAM_TIMEOUT_MS, Math.floor(timeoutMsRaw)); - _globalUndiciStreamTimeoutMs = timeoutMs; - if (!hasEnvHttpProxyAgentConfigured()) { - lastAppliedTimeoutKey = null; - return; - } - const runtime = loadUndiciGlobalDispatcherDeps(); - const { EnvHttpProxyAgent, setGlobalDispatcher } = runtime; - const kind = resolveCurrentDispatcherKind(runtime); - if (kind === null) { - return; - } - if (kind !== "env-proxy") { - return; - } - +function applyGlobalDispatcherStreamTimeouts(params: { + runtime: UndiciGlobalDispatcherDeps; + kind: Exclude; + timeoutMs: number; +}): void { + const { runtime, kind, timeoutMs } = params; const autoSelectFamily = resolveUndiciAutoSelectFamily(); const nextKey = resolveDispatcherKey({ kind, timeoutMs, autoSelectFamily }); if (lastAppliedTimeoutKey === nextKey) { @@ -121,19 +114,65 @@ export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }): const connect = createUndiciAutoSelectFamilyConnectOptions(autoSelectFamily); try { - const proxyOptions = { - ...resolveEnvHttpProxyAgentOptions(), - bodyTimeout: timeoutMs, - headersTimeout: timeoutMs, - ...(connect ? { connect } : {}), - } as ConstructorParameters[0]; - setGlobalDispatcher(new EnvHttpProxyAgent(proxyOptions)); + if (kind === "env-proxy") { + const proxyOptions = { + ...resolveEnvHttpProxyAgentOptions(), + bodyTimeout: timeoutMs, + headersTimeout: timeoutMs, + ...(connect ? { connect } : {}), + } as ConstructorParameters[0]; + runtime.setGlobalDispatcher(new runtime.EnvHttpProxyAgent(proxyOptions)); + } else { + runtime.setGlobalDispatcher( + new runtime.Agent({ + bodyTimeout: timeoutMs, + headersTimeout: timeoutMs, + ...(connect ? { connect } : {}), + }), + ); + } lastAppliedTimeoutKey = nextKey; } catch { // Best-effort hardening only. } } +export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }): void { + const timeoutMs = resolveStreamTimeoutMs(opts); + if (timeoutMs === null) { + return; + } + _globalUndiciStreamTimeoutMs = timeoutMs; + if (!hasEnvHttpProxyAgentConfigured()) { + lastAppliedTimeoutKey = null; + return; + } + const runtime = loadUndiciGlobalDispatcherDeps(); + const kind = resolveCurrentDispatcherKind(runtime); + if (kind === null) { + return; + } + if (kind !== "env-proxy") { + return; + } + + applyGlobalDispatcherStreamTimeouts({ runtime, kind, timeoutMs }); +} + +export function ensureGlobalUndiciDispatcherStreamTimeouts(opts?: { timeoutMs?: number }): void { + const timeoutMs = resolveStreamTimeoutMs(opts); + if (timeoutMs === null) { + return; + } + _globalUndiciStreamTimeoutMs = timeoutMs; + const runtime = loadUndiciGlobalDispatcherDeps(); + const kind = resolveCurrentDispatcherKind(runtime); + if (kind === null) { + return; + } + applyGlobalDispatcherStreamTimeouts({ runtime, kind, timeoutMs }); +} + export function resetGlobalUndiciStreamTimeoutsForTests(): void { lastAppliedTimeoutKey = null; lastAppliedProxyBootstrap = false;