diff --git a/CHANGELOG.md b/CHANGELOG.md index 0819c276084..6738589f2b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai - CLI/browser: preserve parent flags while lazy-loading browser subcommands, so `openclaw browser --json open` and `openclaw browser --json tabs` keep machine-readable output after reparsing. Fixes #74574. Thanks @devintegeritsm. - Plugins/runtime-deps: add `openclaw plugins deps` inspection and repair with script-free package-manager defaults shared across plugin installers, so operators can repair missing bundled runtime deps without corrupting JSON output or blocking unrelated conflict-free deps. Thanks @vincentkoc. - Agents/output: strip internal `[tool calls omitted]` replay placeholders from user-facing replies while preserving visible reply whitespace. Fixes #74573. Thanks @blaspat. +- Providers/Google Vertex: route authorized_user ADC credentials through OpenClaw's REST transport so Docker installs using gcloud application-default credentials no longer crash in the Google SDK before requests are sent. Fixes #74628. Thanks @frankhal2001-design. - Agents/sessions: emit a terminal lifecycle backstop when embedded timeout/error turns return without `agent_end`, so Gateway sessions no longer stay stuck in `running` after failover surfaces a timeout. Fixes #74607. Thanks @millerc79. - Gateway/diagnostics: include stuck-session reason hints and recovery skip causes in warnings, so operators can tell whether a lane is waiting on active work, queued work, or stale bookkeeping. Thanks @vincentkoc. - Agents/Codex: bound embedded-run cleanup, trajectory flushing, and command-lane task timeouts after runtime failures, so Discord and other chat sessions return to idle instead of staying stuck in processing. Thanks @vincentkoc. diff --git a/extensions/google/provider-registration.ts b/extensions/google/provider-registration.ts index e429e4ef55e..f97d0cb9d26 100644 --- a/extensions/google/provider-registration.ts +++ b/extensions/google/provider-registration.ts @@ -9,7 +9,11 @@ import { normalizeGoogleProviderConfig, resolveGoogleGenerativeAiTransport, } from "./provider-policy.js"; -import { createGoogleGenerativeAiTransportStreamFn } from "./transport-stream.js"; +import { + createGoogleGenerativeAiTransportStreamFn, + createGoogleVertexTransportStreamFn, +} from "./transport-stream.js"; +import { hasGoogleVertexAuthorizedUserAdcSync } from "./vertex-adc.js"; export function buildGoogleProvider(): ProviderPlugin { return { @@ -49,10 +53,15 @@ export function buildGoogleProvider(): ProviderPlugin { providerId: ctx.provider, ctx, }), - createStreamFn: ({ model }) => - model.api === "google-generative-ai" - ? createGoogleGenerativeAiTransportStreamFn() - : undefined, + createStreamFn: ({ model }) => { + if (model.api === "google-generative-ai") { + return createGoogleGenerativeAiTransportStreamFn(); + } + if (model.api === "google-vertex" && hasGoogleVertexAuthorizedUserAdcSync()) { + return createGoogleVertexTransportStreamFn(); + } + return undefined; + }, ...GOOGLE_GEMINI_PROVIDER_HOOKS, isModernModelRef: ({ modelId }) => isModernGoogleModel(modelId), }; diff --git a/extensions/google/transport-stream.test.ts b/extensions/google/transport-stream.test.ts index 4e40b5a4683..c72fa6c0365 100644 --- a/extensions/google/transport-stream.test.ts +++ b/extensions/google/transport-stream.test.ts @@ -1,5 +1,8 @@ +import { mkdtemp, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import type { Model } from "@mariozechner/pi-ai"; -import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; const { buildGuardedModelFetchMock, guardedFetchMock } = vi.hoisted(() => ({ buildGuardedModelFetchMock: vi.fn(), @@ -13,6 +16,8 @@ vi.mock("openclaw/plugin-sdk/provider-transport-runtime", async (importOriginal) let buildGoogleGenerativeAiParams: typeof import("./transport-stream.js").buildGoogleGenerativeAiParams; let createGoogleGenerativeAiTransportStreamFn: typeof import("./transport-stream.js").createGoogleGenerativeAiTransportStreamFn; +let createGoogleVertexTransportStreamFn: typeof import("./transport-stream.js").createGoogleVertexTransportStreamFn; +let hasGoogleVertexAuthorizedUserAdcSync: typeof import("./vertex-adc.js").hasGoogleVertexAuthorizedUserAdcSync; const MODEL_PROVIDER_REQUEST_TRANSPORT_SYMBOL = Symbol.for( "openclaw.modelProviderRequestTransport", @@ -63,8 +68,12 @@ function buildSseResponse(events: unknown[]): Response { describe("google transport stream", () => { beforeAll(async () => { - ({ buildGoogleGenerativeAiParams, createGoogleGenerativeAiTransportStreamFn } = - await import("./transport-stream.js")); + ({ + buildGoogleGenerativeAiParams, + createGoogleGenerativeAiTransportStreamFn, + createGoogleVertexTransportStreamFn, + } = await import("./transport-stream.js")); + ({ hasGoogleVertexAuthorizedUserAdcSync } = await import("./vertex-adc.js")); }); beforeEach(() => { @@ -73,6 +82,10 @@ describe("google transport stream", () => { buildGuardedModelFetchMock.mockReturnValue(guardedFetchMock); }); + afterEach(() => { + vi.unstubAllEnvs(); + }); + it("uses the guarded fetch transport and parses Gemini SSE output", async () => { guardedFetchMock.mockResolvedValueOnce( buildSseResponse([ @@ -257,6 +270,89 @@ describe("google transport stream", () => { ); }); + it("refreshes authorized_user ADC before Google Vertex requests", async () => { + const tempDir = await mkdtemp(path.join(os.tmpdir(), "openclaw-google-vertex-adc-")); + const credentialsPath = path.join(tempDir, "application_default_credentials.json"); + await writeFile( + credentialsPath, + JSON.stringify({ + type: "authorized_user", + client_id: "client-id", + client_secret: "client-secret", + refresh_token: "refresh-token", + }), + "utf8", + ); + vi.stubEnv("GOOGLE_APPLICATION_CREDENTIALS", credentialsPath); + vi.stubEnv("GOOGLE_CLOUD_PROJECT", "vertex-project"); + vi.stubEnv("GOOGLE_CLOUD_LOCATION", "global"); + const tokenFetchMock = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ access_token: "ya29.vertex-token", expires_in: 3600 }), { + status: 200, + headers: { "content-type": "application/json" }, + }), + ); + guardedFetchMock.mockResolvedValueOnce( + buildSseResponse([ + { + candidates: [{ content: { parts: [{ text: "ok" }] }, finishReason: "STOP" }], + }, + ]), + ); + + expect(hasGoogleVertexAuthorizedUserAdcSync()).toBe(true); + + const model = { + id: "gemini-3.1-pro-preview", + name: "Gemini 3.1 Pro Preview", + api: "google-vertex", + provider: "google-vertex", + baseUrl: "https://{location}-aiplatform.googleapis.com", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 128000, + maxTokens: 8192, + } satisfies Model<"google-vertex">; + + const streamFn = createGoogleVertexTransportStreamFn(); + const stream = await Promise.resolve( + streamFn( + model, + { + messages: [{ role: "user", content: "hello", timestamp: 0 }], + } as Parameters[1], + { + apiKey: "gcp-vertex-credentials", + fetch: tokenFetchMock, + } as Parameters[2], + ), + ); + const result = await stream.result(); + + expect(tokenFetchMock).toHaveBeenCalledWith( + "https://oauth2.googleapis.com/token", + expect.objectContaining({ method: "POST" }), + ); + expect(guardedFetchMock).toHaveBeenCalledWith( + "https://aiplatform.googleapis.com/v1/projects/vertex-project/locations/global/publishers/google/models/gemini-3.1-pro-preview:streamGenerateContent?alt=sse", + expect.objectContaining({ + method: "POST", + headers: expect.objectContaining({ + Authorization: "Bearer ya29.vertex-token", + "Content-Type": "application/json", + accept: "text/event-stream", + }), + }), + ); + expect(result).toMatchObject({ + api: "google-vertex", + provider: "google-vertex", + stopReason: "stop", + content: [{ type: "text", text: "ok" }], + }); + }); + it("coerces replayed malformed tool-call args to an object for Google payloads", () => { const params = buildGoogleGenerativeAiParams(buildGeminiModel(), { messages: [ diff --git a/extensions/google/transport-stream.ts b/extensions/google/transport-stream.ts index 9ba85ce9eaa..6544d3df92a 100644 --- a/extensions/google/transport-stream.ts +++ b/extensions/google/transport-stream.ts @@ -33,8 +33,14 @@ import { type GoogleThinkingInputLevel, type GoogleThinkingLevel, } from "./thinking-api.js"; +import { + isGoogleVertexCredentialsMarker, + resolveGoogleVertexAuthorizedUserHeaders, +} from "./vertex-adc.js"; -type GoogleTransportModel = Model<"google-generative-ai"> & { +type GoogleTransportApi = "google-generative-ai" | "google-vertex"; + +type GoogleTransportModel = Model & { headers?: Record; provider: string; }; @@ -82,7 +88,7 @@ type GoogleTransportContentBlock = type MutableAssistantOutput = { role: "assistant"; content: Array; - api: "google-generative-ai"; + api: GoogleTransportApi; provider: string; model: string; usage: { @@ -99,6 +105,8 @@ type MutableAssistantOutput = { errorMessage?: string; }; +const GOOGLE_VERTEX_DEFAULT_API_VERSION = "v1"; + type GoogleSseChunk = { responseId?: string; candidates?: Array<{ @@ -127,6 +135,10 @@ type GoogleSseChunk = { let toolCallCounter = 0; +function normalizeOptionalString(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} + function requiresToolCallId(modelId: string): boolean { return modelId.startsWith("claude-") || modelId.startsWith("gpt-oss-"); } @@ -188,11 +200,66 @@ function resolveGoogleModelPath(modelId: string): string { return `models/${modelId}`; } -function buildGoogleRequestUrl(model: GoogleTransportModel): string { +function buildGoogleGenerativeAiRequestUrl(model: GoogleTransportModel): string { const baseUrl = normalizeGoogleApiBaseUrl(model.baseUrl); return `${baseUrl}/${resolveGoogleModelPath(model.id)}:streamGenerateContent?alt=sse`; } +function resolveGoogleVertexProject(options: GoogleTransportOptions | undefined): string { + const project = + normalizeOptionalString((options as { project?: unknown } | undefined)?.project) || + normalizeOptionalString(process.env.GOOGLE_CLOUD_PROJECT) || + normalizeOptionalString(process.env.GCLOUD_PROJECT); + if (!project) { + throw new Error( + "Vertex AI requires a project ID. Set GOOGLE_CLOUD_PROJECT/GCLOUD_PROJECT or pass project in options.", + ); + } + return project; +} + +function resolveGoogleVertexLocation(options: GoogleTransportOptions | undefined): string { + const location = + normalizeOptionalString((options as { location?: unknown } | undefined)?.location) || + normalizeOptionalString(process.env.GOOGLE_CLOUD_LOCATION); + if (!location) { + throw new Error( + "Vertex AI requires a location. Set GOOGLE_CLOUD_LOCATION or pass location in options.", + ); + } + return location; +} + +function resolveGoogleVertexBaseOrigin(model: GoogleTransportModel, location: string): string { + const configured = normalizeOptionalString(model.baseUrl); + if (configured && !configured.includes("{location}")) { + try { + const url = new URL(configured); + url.pathname = ""; + url.search = ""; + url.hash = ""; + return url.toString().replace(/\/$/u, ""); + } catch { + return configured.replace(/\/+$/u, ""); + } + } + if (location === "global") { + return "https://aiplatform.googleapis.com"; + } + return `https://${location}-aiplatform.googleapis.com`; +} + +function buildGoogleVertexRequestUrl( + model: GoogleTransportModel, + options: GoogleTransportOptions | undefined, +): string { + const project = encodeURIComponent(resolveGoogleVertexProject(options)); + const location = encodeURIComponent(resolveGoogleVertexLocation(options)); + const modelId = encodeURIComponent(model.id); + const origin = resolveGoogleVertexBaseOrigin(model, decodeURIComponent(location)); + return `${origin}/${GOOGLE_VERTEX_DEFAULT_API_VERSION}/projects/${project}/locations/${location}/publishers/google/models/${modelId}:streamGenerateContent?alt=sse`; +} + function resolveThinkingLevel(level: ThinkingLevel, modelId: string): GoogleThinkingLevel { const resolved = resolveGoogleGemini3ThinkingLevel({ modelId, thinkingLevel: level }); if (resolved) { @@ -515,17 +582,71 @@ function buildGoogleHeaders( return ( mergeTransportHeaders( { + "Content-Type": "application/json", accept: "text/event-stream", }, authHeaders, model.headers, optionHeaders, ) ?? { + "Content-Type": "application/json", accept: "text/event-stream", } ); } +async function buildGoogleVertexHeaders( + model: GoogleTransportModel, + apiKey: string | undefined, + optionHeaders: Record | undefined, + fetchImpl?: typeof fetch, +): Promise> { + const authHeaders = isGoogleVertexCredentialsMarker(apiKey) + ? await resolveGoogleVertexAuthorizedUserHeaders(fetchImpl) + : { "x-goog-api-key": apiKey }; + return ( + mergeTransportHeaders( + { + "Content-Type": "application/json", + accept: "text/event-stream", + }, + authHeaders, + model.headers, + optionHeaders, + ) ?? { + "Content-Type": "application/json", + accept: "text/event-stream", + } + ); +} + +function buildGoogleTransportRequestUrl( + kind: GoogleTransportApi, + model: GoogleTransportModel, + options: GoogleTransportOptions | undefined, +): string { + return kind === "google-vertex" + ? buildGoogleVertexRequestUrl(model, options) + : buildGoogleGenerativeAiRequestUrl(model); +} + +async function buildGoogleTransportHeaders(params: { + kind: GoogleTransportApi; + model: GoogleTransportModel; + apiKey: string | undefined; + optionHeaders: Record | undefined; + fetchImpl?: typeof fetch; +}): Promise> { + return params.kind === "google-vertex" + ? await buildGoogleVertexHeaders( + params.model, + params.apiKey, + params.optionHeaders, + params.fetchImpl, + ) + : buildGoogleHeaders(params.model, params.apiKey, params.optionHeaders); +} + async function* parseGoogleSseChunks( response: Response, signal?: AbortSignal, @@ -621,7 +742,7 @@ function pushTextBlockEnd( } } -export function createGoogleGenerativeAiTransportStreamFn(): StreamFn { +function createGoogleTransportStreamFn(kind: GoogleTransportApi): StreamFn { return (rawModel, context, rawOptions) => { const model = rawModel as GoogleTransportModel; const options = rawOptions as GoogleTransportOptions | undefined; @@ -630,7 +751,7 @@ export function createGoogleGenerativeAiTransportStreamFn(): StreamFn { const output: MutableAssistantOutput = { role: "assistant", content: [], - api: "google-generative-ai", + api: kind, provider: model.provider, model: model.id, usage: createEmptyTransportUsage(), @@ -645,14 +766,25 @@ export function createGoogleGenerativeAiTransportStreamFn(): StreamFn { if (nextParams !== undefined) { params = nextParams as GoogleGenerateContentRequest; } - const response = await guardedFetch(buildGoogleRequestUrl(model), { + const response = await guardedFetch(buildGoogleTransportRequestUrl(kind, model, options), { method: "POST", - headers: buildGoogleHeaders(model, apiKey, options?.headers), + headers: await buildGoogleTransportHeaders({ + kind, + model, + apiKey, + optionHeaders: options?.headers, + fetchImpl: (options as { fetch?: typeof fetch } | undefined)?.fetch, + }), body: JSON.stringify(params), signal: options?.signal, }); if (!response.ok) { - throw await createProviderHttpError(response, "Google Generative AI API error"); + throw await createProviderHttpError( + response, + kind === "google-vertex" + ? "Google Vertex AI API error" + : "Google Generative AI API error", + ); } stream.push({ type: "start", partial: output as never }); let currentBlockIndex = -1; @@ -779,3 +911,11 @@ export function createGoogleGenerativeAiTransportStreamFn(): StreamFn { return eventStream as unknown as ReturnType; }; } + +export function createGoogleGenerativeAiTransportStreamFn(): StreamFn { + return createGoogleTransportStreamFn("google-generative-ai"); +} + +export function createGoogleVertexTransportStreamFn(): StreamFn { + return createGoogleTransportStreamFn("google-vertex"); +} diff --git a/extensions/google/vertex-adc.ts b/extensions/google/vertex-adc.ts new file mode 100644 index 00000000000..36d822330f6 --- /dev/null +++ b/extensions/google/vertex-adc.ts @@ -0,0 +1,171 @@ +import { existsSync, readFileSync } from "node:fs"; +import { readFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +type GoogleAuthorizedUserCredentials = { + type: "authorized_user"; + client_id?: string; + client_secret?: string; + refresh_token?: string; +}; + +type GoogleVertexAuthorizedUserToken = { + token: string; + expiresAtMs: number; + credentialsPath: string; + refreshToken: string; +}; + +const GCP_VERTEX_CREDENTIALS_MARKER = "gcp-vertex-credentials"; +const GOOGLE_OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token"; + +let cachedGoogleVertexAuthorizedUserToken: GoogleVertexAuthorizedUserToken | undefined; + +function normalizeOptionalString(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} + +export function isGoogleVertexCredentialsMarker( + apiKey: string | undefined, +): apiKey is undefined | typeof GCP_VERTEX_CREDENTIALS_MARKER { + return apiKey === undefined || apiKey === GCP_VERTEX_CREDENTIALS_MARKER; +} + +function resolveGoogleApplicationCredentialsPath( + env: NodeJS.ProcessEnv = process.env, +): string | undefined { + const explicit = normalizeOptionalString(env.GOOGLE_APPLICATION_CREDENTIALS); + if (explicit) { + return existsSync(explicit) ? explicit : undefined; + } + const homeDir = normalizeOptionalString(env.HOME) ?? os.homedir(); + const fallback = path.join(homeDir, ".config", "gcloud", "application_default_credentials.json"); + return existsSync(fallback) ? fallback : undefined; +} + +async function readGoogleAuthorizedUserCredentials( + credentialsPath: string, +): Promise { + let parsed: unknown; + try { + parsed = JSON.parse(await readFile(credentialsPath, "utf8")) as unknown; + } catch { + return undefined; + } + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return undefined; + } + const record = parsed as Record; + if (record.type !== "authorized_user") { + return undefined; + } + return { + type: "authorized_user", + client_id: normalizeOptionalString(record.client_id), + client_secret: normalizeOptionalString(record.client_secret), + refresh_token: normalizeOptionalString(record.refresh_token), + }; +} + +export function hasGoogleVertexAuthorizedUserAdcSync( + env: NodeJS.ProcessEnv = process.env, +): boolean { + const credentialsPath = resolveGoogleApplicationCredentialsPath(env); + if (!credentialsPath) { + return false; + } + try { + const parsed = JSON.parse(readFileSync(credentialsPath, "utf8")) as unknown; + return ( + Boolean(parsed) && + typeof parsed === "object" && + !Array.isArray(parsed) && + (parsed as { type?: unknown }).type === "authorized_user" + ); + } catch { + return false; + } +} + +async function refreshGoogleVertexAuthorizedUserAccessToken(params: { + credentialsPath: string; + credentials: GoogleAuthorizedUserCredentials; + fetchImpl?: typeof fetch; +}): Promise { + const clientId = normalizeOptionalString(params.credentials.client_id); + const clientSecret = normalizeOptionalString(params.credentials.client_secret); + const refreshToken = normalizeOptionalString(params.credentials.refresh_token); + if (!clientId || !clientSecret || !refreshToken) { + throw new Error( + "Google Vertex authorized_user ADC is missing client_id, client_secret, or refresh_token.", + ); + } + + const cached = cachedGoogleVertexAuthorizedUserToken; + if ( + cached?.credentialsPath === params.credentialsPath && + cached.refreshToken === refreshToken && + cached.expiresAtMs - Date.now() > 60_000 + ) { + return cached.token; + } + + const body = new URLSearchParams({ + client_id: clientId, + client_secret: clientSecret, + refresh_token: refreshToken, + grant_type: "refresh_token", + }); + const response = await (params.fetchImpl ?? fetch)(GOOGLE_OAUTH_TOKEN_URL, { + method: "POST", + headers: { "Content-Type": "application/x-www-form-urlencoded" }, + body, + }); + const payload = (await response.json().catch(() => undefined)) as + | { access_token?: unknown; expires_in?: unknown; error?: unknown; error_description?: unknown } + | undefined; + if (!response.ok) { + const description = normalizeOptionalString(payload?.error_description); + const code = normalizeOptionalString(payload?.error); + throw new Error( + `Google Vertex ADC token refresh failed: ${response.status}${code ? ` ${code}` : ""}${description ? ` (${description})` : ""}`, + ); + } + const token = normalizeOptionalString(payload?.access_token); + if (!token) { + throw new Error("Google Vertex ADC token refresh response did not include an access_token."); + } + const expiresInSeconds = + typeof payload?.expires_in === "number" && Number.isFinite(payload.expires_in) + ? payload.expires_in + : 3600; + cachedGoogleVertexAuthorizedUserToken = { + token, + expiresAtMs: Date.now() + Math.max(1, expiresInSeconds) * 1000, + credentialsPath: params.credentialsPath, + refreshToken, + }; + return token; +} + +export async function resolveGoogleVertexAuthorizedUserHeaders( + fetchImpl?: typeof fetch, +): Promise> { + const credentialsPath = resolveGoogleApplicationCredentialsPath(); + if (!credentialsPath) { + throw new Error( + "Google Vertex ADC credentials not found. Set GOOGLE_APPLICATION_CREDENTIALS or run gcloud auth application-default login.", + ); + } + const credentials = await readGoogleAuthorizedUserCredentials(credentialsPath); + if (!credentials) { + throw new Error("Google Vertex ADC fallback requires an authorized_user credentials file."); + } + const token = await refreshGoogleVertexAuthorizedUserAccessToken({ + credentialsPath, + credentials, + fetchImpl, + }); + return { Authorization: `Bearer ${token}` }; +}