mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
Matrix: scope dispatcher compatibility fallback
This commit is contained in:
@@ -1,15 +1,13 @@
|
||||
import {
|
||||
buildNoCapabilityModelConfiguredMessage,
|
||||
resolveCapabilityModelCandidates,
|
||||
throwCapabilityGenerationFailure,
|
||||
} from "../../../src/media-generation/runtime-shared.js";
|
||||
import {
|
||||
createSubsystemLogger,
|
||||
describeFailoverError,
|
||||
getImageGenerationProvider,
|
||||
isFailoverError,
|
||||
listImageGenerationProviders,
|
||||
parseImageGenerationModelRef,
|
||||
resolveCapabilityModelCandidates,
|
||||
throwCapabilityGenerationFailure,
|
||||
type AuthProfileStore,
|
||||
type FallbackAttempt,
|
||||
type GeneratedImageAsset,
|
||||
|
||||
@@ -12,10 +12,8 @@ export * from "./src/storage-paths.js";
|
||||
export { ensureMatrixSdkInstalled, isMatrixSdkAvailable } from "./src/matrix/deps.js";
|
||||
export {
|
||||
assertHttpUrlTargetsPrivateNetwork,
|
||||
canBypassPinnedDispatcherForCompatibility,
|
||||
closeDispatcher,
|
||||
createPinnedDispatcher,
|
||||
isPinnedDispatcherRuntimeCompatibilityError,
|
||||
resolvePinnedHostnameWithPolicy,
|
||||
ssrfPolicyFromDangerouslyAllowPrivateNetwork,
|
||||
ssrfPolicyFromAllowPrivateNetwork,
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
import type { PinnedDispatcherPolicy } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import {
|
||||
buildTimeoutAbortSignal,
|
||||
canBypassPinnedDispatcherForCompatibility,
|
||||
closeDispatcher,
|
||||
createPinnedDispatcher,
|
||||
isPinnedDispatcherRuntimeCompatibilityError,
|
||||
resolvePinnedHostnameWithPolicy,
|
||||
type SsrFPolicy,
|
||||
} from "../../runtime-api.js";
|
||||
@@ -86,10 +84,43 @@ function buildBufferedResponse(params: {
|
||||
return response;
|
||||
}
|
||||
|
||||
type ErrorWithCause = {
|
||||
code?: unknown;
|
||||
message?: unknown;
|
||||
cause?: unknown;
|
||||
};
|
||||
|
||||
function* iterateErrorCauseChain(error: unknown): Generator<ErrorWithCause> {
|
||||
const seen = new Set<unknown>();
|
||||
let current: unknown = error;
|
||||
while (current && typeof current === "object" && !seen.has(current)) {
|
||||
seen.add(current);
|
||||
yield current as ErrorWithCause;
|
||||
current = (current as ErrorWithCause).cause;
|
||||
}
|
||||
}
|
||||
|
||||
function canBypassPinnedDispatcherForCompatibility(policy?: PinnedDispatcherPolicy): boolean {
|
||||
return !policy || policy.mode === "direct";
|
||||
}
|
||||
|
||||
function isPinnedDispatcherRuntimeCompatibilityError(error: unknown): boolean {
|
||||
for (const candidate of iterateErrorCauseChain(error)) {
|
||||
const message = typeof candidate.message === "string" ? candidate.message : "";
|
||||
if (
|
||||
candidate.code === "UND_ERR_INVALID_ARG" &&
|
||||
message.toLowerCase().includes("onrequeststart")
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async function fetchWithPinnedDispatcherCompatibilityRetry(params: {
|
||||
url: string;
|
||||
init: RequestInit & { dispatcher?: unknown };
|
||||
canBypassPinnedDispatcher: boolean;
|
||||
dispatcherPolicy?: PinnedDispatcherPolicy;
|
||||
dispatcher: ReturnType<typeof createPinnedDispatcher> | undefined;
|
||||
}): Promise<Response> {
|
||||
try {
|
||||
@@ -97,7 +128,7 @@ async function fetchWithPinnedDispatcherCompatibilityRetry(params: {
|
||||
} catch (error) {
|
||||
if (
|
||||
!params.dispatcher ||
|
||||
!params.canBypassPinnedDispatcher ||
|
||||
!canBypassPinnedDispatcherForCompatibility(params.dispatcherPolicy) ||
|
||||
!isPinnedDispatcherRuntimeCompatibilityError(error)
|
||||
) {
|
||||
throw error;
|
||||
@@ -136,9 +167,7 @@ async function fetchWithMatrixGuardedRedirects(params: {
|
||||
dispatcher = createPinnedDispatcher(pinned, params.dispatcherPolicy, params.ssrfPolicy);
|
||||
const response = await fetchWithPinnedDispatcherCompatibilityRetry({
|
||||
url: currentUrl.toString(),
|
||||
canBypassPinnedDispatcher: canBypassPinnedDispatcherForCompatibility(
|
||||
params.dispatcherPolicy,
|
||||
),
|
||||
dispatcherPolicy: params.dispatcherPolicy,
|
||||
dispatcher,
|
||||
init: {
|
||||
...params.init,
|
||||
|
||||
@@ -61,10 +61,8 @@ export {
|
||||
export type { RuntimeEnv } from "openclaw/plugin-sdk/runtime";
|
||||
export {
|
||||
assertHttpUrlTargetsPrivateNetwork,
|
||||
canBypassPinnedDispatcherForCompatibility,
|
||||
closeDispatcher,
|
||||
createPinnedDispatcher,
|
||||
isPinnedDispatcherRuntimeCompatibilityError,
|
||||
isPrivateOrLoopbackHost,
|
||||
resolvePinnedHostnameWithPolicy,
|
||||
ssrfPolicyFromDangerouslyAllowPrivateNetwork,
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
import {
|
||||
buildNoCapabilityModelConfiguredMessage,
|
||||
resolveCapabilityModelCandidates,
|
||||
throwCapabilityGenerationFailure,
|
||||
} from "../../../src/media-generation/runtime-shared.js";
|
||||
import {
|
||||
createSubsystemLogger,
|
||||
describeFailoverError,
|
||||
getVideoGenerationProvider,
|
||||
isFailoverError,
|
||||
listVideoGenerationProviders,
|
||||
parseVideoGenerationModelRef,
|
||||
resolveCapabilityModelCandidates,
|
||||
throwCapabilityGenerationFailure,
|
||||
type AuthProfileStore,
|
||||
type FallbackAttempt,
|
||||
type GeneratedVideoAsset,
|
||||
|
||||
@@ -169,6 +169,8 @@
|
||||
"memory-core-host-runtime-files",
|
||||
"memory-lancedb",
|
||||
"msteams",
|
||||
"music-generation",
|
||||
"music-generation-core",
|
||||
"models-provider-runtime",
|
||||
"skill-commands-runtime",
|
||||
"native-command-registry",
|
||||
|
||||
@@ -317,7 +317,7 @@ describe("fetchWithSsrFGuard hardening", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("retries without the direct pinned dispatcher when the runtime rejects that dispatcher shape", async () => {
|
||||
it("fails closed when the runtime rejects the pinned dispatcher shape", async () => {
|
||||
const fetchImpl = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => {
|
||||
const requestInit = init as RequestInit & { dispatcher?: unknown };
|
||||
if (requestInit.dispatcher) {
|
||||
@@ -326,42 +326,11 @@ describe("fetchWithSsrFGuard hardening", () => {
|
||||
return okResponse();
|
||||
});
|
||||
|
||||
const result = await fetchWithSsrFGuard({
|
||||
url: "https://public.example/resource",
|
||||
fetchImpl,
|
||||
lookupFn: createPublicLookup(),
|
||||
});
|
||||
|
||||
expect(fetchImpl).toHaveBeenCalledTimes(2);
|
||||
expect(
|
||||
(fetchImpl.mock.calls[0]?.[1] as RequestInit & { dispatcher?: unknown })?.dispatcher,
|
||||
).toBeDefined();
|
||||
expect(
|
||||
(fetchImpl.mock.calls[1]?.[1] as RequestInit & { dispatcher?: unknown })?.dispatcher,
|
||||
).toBeUndefined();
|
||||
await result.release();
|
||||
});
|
||||
|
||||
it("does not bypass proxy routing when proxy dispatchers fail the same way", async () => {
|
||||
const fetchImpl = vi.fn(async () => {
|
||||
throw createPinnedDispatcherCompatibilityError();
|
||||
});
|
||||
const lookupFn = vi.fn(async (hostname: string) => [
|
||||
{
|
||||
address: hostname === "proxy.example" ? "93.184.216.35" : "93.184.216.34",
|
||||
family: 4,
|
||||
},
|
||||
]) as unknown as LookupFn;
|
||||
|
||||
await expect(
|
||||
fetchWithSsrFGuard({
|
||||
url: "https://public.example/resource",
|
||||
fetchImpl,
|
||||
lookupFn,
|
||||
dispatcherPolicy: {
|
||||
mode: "explicit-proxy",
|
||||
proxyUrl: "http://proxy.example:7890",
|
||||
},
|
||||
lookupFn: createPublicLookup(),
|
||||
}),
|
||||
).rejects.toThrow("fetch failed");
|
||||
expect(fetchImpl).toHaveBeenCalledTimes(1);
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
import type { Dispatcher } from "undici";
|
||||
import { logWarn } from "../../logger.js";
|
||||
import { buildTimeoutAbortSignal } from "../../utils/fetch-timeout.js";
|
||||
import {
|
||||
canBypassPinnedDispatcherForCompatibility,
|
||||
isPinnedDispatcherRuntimeCompatibilityError,
|
||||
} from "./pinned-dispatcher-compat.ts";
|
||||
import { hasProxyEnvConfigured } from "./proxy-env.js";
|
||||
import { retainSafeHeadersForCrossOriginRedirect as retainSafeRedirectHeaders } from "./redirect-headers.js";
|
||||
import {
|
||||
@@ -190,29 +186,6 @@ function retainSafeHeadersForCrossOriginRedirect(init?: RequestInit): RequestIni
|
||||
return { ...init, headers: retainSafeRedirectHeaders(init.headers) };
|
||||
}
|
||||
|
||||
async function fetchWithPinnedDispatcherCompatibilityRetry(params: {
|
||||
url: string;
|
||||
init: DispatcherAwareRequestInit;
|
||||
dispatcher: Dispatcher | null;
|
||||
fetchImpl: FetchLike;
|
||||
canBypassPinnedDispatcher: boolean;
|
||||
}): Promise<Response> {
|
||||
try {
|
||||
return await params.fetchImpl(params.url, params.init);
|
||||
} catch (error) {
|
||||
if (
|
||||
!params.dispatcher ||
|
||||
!params.canBypassPinnedDispatcher ||
|
||||
!isPinnedDispatcherRuntimeCompatibilityError(error)
|
||||
) {
|
||||
throw error;
|
||||
}
|
||||
await closeDispatcher(params.dispatcher);
|
||||
const { dispatcher: _dispatcher, ...retryInit } = params.init;
|
||||
return await params.fetchImpl(params.url, retryInit);
|
||||
}
|
||||
}
|
||||
|
||||
function dropBodyHeaders(headers?: HeadersInit): HeadersInit | undefined {
|
||||
if (!headers) {
|
||||
return headers;
|
||||
@@ -349,24 +322,8 @@ export async function fetchWithSsrFGuard(params: GuardedFetchOptions): Promise<G
|
||||
// dispatchers.
|
||||
const shouldUseRuntimeFetch = Boolean(dispatcher) && !supportsDispatcherInit;
|
||||
const response = shouldUseRuntimeFetch
|
||||
? await fetchWithPinnedDispatcherCompatibilityRetry({
|
||||
url: parsedUrl.toString(),
|
||||
init,
|
||||
dispatcher,
|
||||
canBypassPinnedDispatcher: canBypassPinnedDispatcherForCompatibility(
|
||||
params.dispatcherPolicy,
|
||||
),
|
||||
fetchImpl: fetchWithRuntimeDispatcher,
|
||||
})
|
||||
: await fetchWithPinnedDispatcherCompatibilityRetry({
|
||||
url: parsedUrl.toString(),
|
||||
init,
|
||||
dispatcher,
|
||||
canBypassPinnedDispatcher: canBypassPinnedDispatcherForCompatibility(
|
||||
params.dispatcherPolicy,
|
||||
),
|
||||
fetchImpl: defaultFetch,
|
||||
});
|
||||
? await fetchWithRuntimeDispatcher(parsedUrl.toString(), init)
|
||||
: await defaultFetch(parsedUrl.toString(), init);
|
||||
|
||||
if (isRedirectStatus(response.status)) {
|
||||
const location = response.headers.get("location");
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
import type { PinnedDispatcherPolicy } from "./ssrf.js";
|
||||
|
||||
type ErrorWithCause = {
|
||||
code?: unknown;
|
||||
message?: unknown;
|
||||
cause?: unknown;
|
||||
};
|
||||
|
||||
function* iterateErrorCauseChain(error: unknown): Generator<ErrorWithCause> {
|
||||
const seen = new Set<unknown>();
|
||||
let current: unknown = error;
|
||||
while (current && typeof current === "object" && !seen.has(current)) {
|
||||
seen.add(current);
|
||||
yield current as ErrorWithCause;
|
||||
current = (current as ErrorWithCause).cause;
|
||||
}
|
||||
}
|
||||
|
||||
export function canBypassPinnedDispatcherForCompatibility(
|
||||
policy?: PinnedDispatcherPolicy,
|
||||
): boolean {
|
||||
return !policy || policy.mode === "direct";
|
||||
}
|
||||
|
||||
export function isPinnedDispatcherRuntimeCompatibilityError(error: unknown): boolean {
|
||||
for (const candidate of iterateErrorCauseChain(error)) {
|
||||
const message = typeof candidate.message === "string" ? candidate.message : "";
|
||||
if (
|
||||
candidate.code === "UND_ERR_INVALID_ARG" &&
|
||||
message.toLowerCase().includes("onrequeststart")
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@@ -15,6 +15,11 @@ export type {
|
||||
export type { OpenClawConfig } from "../config/config.js";
|
||||
|
||||
export { describeFailoverError, isFailoverError } from "../agents/failover-error.js";
|
||||
export {
|
||||
buildNoCapabilityModelConfiguredMessage,
|
||||
resolveCapabilityModelCandidates,
|
||||
throwCapabilityGenerationFailure,
|
||||
} from "../media-generation/runtime-shared.js";
|
||||
export {
|
||||
resolveAgentModelFallbackValues,
|
||||
resolveAgentModelPrimaryValue,
|
||||
|
||||
@@ -10,10 +10,6 @@ export {
|
||||
type LookupFn,
|
||||
type SsrFPolicy,
|
||||
} from "../infra/net/ssrf.js";
|
||||
export {
|
||||
canBypassPinnedDispatcherForCompatibility,
|
||||
isPinnedDispatcherRuntimeCompatibilityError,
|
||||
} from "../infra/net/pinned-dispatcher-compat.ts";
|
||||
export { formatErrorMessage } from "../infra/errors.js";
|
||||
export { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js";
|
||||
export {
|
||||
|
||||
@@ -16,6 +16,11 @@ export type {
|
||||
export type { OpenClawConfig } from "../config/config.js";
|
||||
|
||||
export { describeFailoverError, isFailoverError } from "../agents/failover-error.js";
|
||||
export {
|
||||
buildNoCapabilityModelConfiguredMessage,
|
||||
resolveCapabilityModelCandidates,
|
||||
throwCapabilityGenerationFailure,
|
||||
} from "../media-generation/runtime-shared.js";
|
||||
export {
|
||||
resolveAgentModelFallbackValues,
|
||||
resolveAgentModelPrimaryValue,
|
||||
|
||||
@@ -41,6 +41,7 @@ const EXPECTED_SHARED_FAMILY_CONTRACTS: Record<string, ExpectedSharedFamilyContr
|
||||
google: {
|
||||
replayFamilies: ["google-gemini"],
|
||||
streamFamilies: ["google-thinking"],
|
||||
toolCompatFamilies: ["gemini"],
|
||||
},
|
||||
kilocode: {
|
||||
replayFamilies: ["passthrough-gemini"],
|
||||
|
||||
@@ -142,7 +142,7 @@ describe("shared runtime seam contracts", () => {
|
||||
};
|
||||
|
||||
const lookupFn = vi.fn(
|
||||
async () => ({ address: "93.184.216.34", family: 4 }) as const,
|
||||
async () => [{ address: "93.184.216.34", family: 4 }] as const,
|
||||
) as unknown as NonNullable<Parameters<typeof fetchWithSsrFGuard>[0]["lookupFn"]>;
|
||||
|
||||
const result = await fetchWithSsrFGuard({
|
||||
|
||||
Reference in New Issue
Block a user