diff --git a/extensions/google/transport-stream.test.ts b/extensions/google/transport-stream.test.ts index 0134b97a8b6..fcdaf4e9b32 100644 --- a/extensions/google/transport-stream.test.ts +++ b/extensions/google/transport-stream.test.ts @@ -2,6 +2,7 @@ import { mkdir, mkdtemp, writeFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; +import { gzipSync } from "node:zlib"; import type { Model } from "openclaw/plugin-sdk/llm"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; @@ -957,6 +958,64 @@ describe("google transport stream", () => { expect(result.content).toEqual([{ type: "text", text: "ok" }]); }); + it("refreshes authorized_user ADC from a compressed token response", async () => { + const tempDir = await mkdtemp(path.join(os.tmpdir(), "openclaw-google-vertex-adc-gzip-")); + const credentialsPath = path.join(tempDir, "application_default_credentials.json"); + await writeFile( + credentialsPath, + JSON.stringify({ + type: "authorized_user", + client_id: "client-id", + client_secret: "client-secret", + refresh_token: "gzip-refresh-token", + }), + "utf8", + ); + vi.stubEnv("GOOGLE_APPLICATION_CREDENTIALS", credentialsPath); + vi.stubEnv("GOOGLE_CLOUD_PROJECT", "vertex-project"); + vi.stubEnv("GOOGLE_CLOUD_LOCATION", "global"); + const tokenFetchMock = vi.fn().mockResolvedValue( + new Response( + gzipSync(JSON.stringify({ access_token: "ya29.gzip-token", expires_in: 3600 })), + { + status: 200, + headers: { + "content-encoding": "gzip", + "content-type": "application/json", + }, + }, + ), + ); + guardedFetchMock.mockResolvedValueOnce( + buildSseResponse([ + { + candidates: [{ content: { parts: [{ text: "ok" }] }, finishReason: "STOP" }], + }, + ]), + ); + + const streamFn = createGoogleVertexTransportStreamFn(); + const stream = await Promise.resolve( + streamFn( + buildGoogleVertexModel(), + { + messages: [{ role: "user", content: "hello", timestamp: 0 }], + } as Parameters[1], + { + apiKey: "gcp-vertex-credentials", + fetch: tokenFetchMock, + } as Parameters[2], + ), + ); + await stream.result(); + + expect(tokenFetchMock).toHaveBeenCalledTimes(1); + const guardedCall = requireMockCall(guardedFetchMock, 0, "guarded fetch"); + expectHeaders(requireRequestInit(guardedCall, "guarded fetch"), { + Authorization: "Bearer ya29.gzip-token", + }); + }); + it("does not reuse authorized_user ADC tokens with unsafe expiry lifetimes", async () => { const tempDir = await mkdtemp(path.join(os.tmpdir(), "openclaw-google-vertex-unsafe-adc-")); const credentialsPath = path.join(tempDir, "application_default_credentials.json"); diff --git a/extensions/google/vertex-adc.ts b/extensions/google/vertex-adc.ts index c30678ab00c..9bf2e45c799 100644 --- a/extensions/google/vertex-adc.ts +++ b/extensions/google/vertex-adc.ts @@ -3,6 +3,7 @@ import { existsSync, readFileSync } from "node:fs"; import { readFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; +import { gunzipSync } from "node:zlib"; import { asDateTimestampMs, resolveExpiresAtMsFromDurationMs, @@ -29,6 +30,13 @@ type GoogleVertexAdcToken = { expiresAtMs: number; }; +type GoogleOauthTokenResponsePayload = { + access_token?: unknown; + expires_in?: unknown; + error?: unknown; + error_description?: unknown; +}; + const GCP_VERTEX_CREDENTIALS_MARKER = "gcp-vertex-credentials"; const GOOGLE_OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token"; const GOOGLE_VERTEX_OAUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; @@ -238,9 +246,7 @@ async function refreshGoogleVertexAuthorizedUserAccessToken(params: { headers: { "Content-Type": "application/x-www-form-urlencoded" }, body, }); - const payload = (await response.json().catch(() => undefined)) as - | { access_token?: unknown; expires_in?: unknown; error?: unknown; error_description?: unknown } - | undefined; + const payload = await readGoogleOauthTokenResponsePayload(response); if (!response.ok) { const description = normalizeOptionalString(payload?.error_description); const code = normalizeOptionalString(payload?.error); @@ -248,6 +254,9 @@ async function refreshGoogleVertexAuthorizedUserAccessToken(params: { `Google Vertex ADC token refresh failed: ${response.status}${code ? ` ${code}` : ""}${description ? ` (${description})` : ""}`, ); } + if (!payload) { + throw new Error("Google Vertex ADC token refresh response could not be parsed as JSON."); + } const token = normalizeOptionalString(payload?.access_token); if (!token) { throw new Error("Google Vertex ADC token refresh response did not include an access_token."); @@ -265,6 +274,45 @@ async function refreshGoogleVertexAuthorizedUserAccessToken(params: { return token; } +async function readGoogleOauthTokenResponsePayload( + response: Response, +): Promise { + const bytes = Buffer.from(await response.arrayBuffer()); + const text = decodeGoogleOauthTokenResponseBody(bytes, response.headers.get("content-encoding")); + if (!text.trim()) { + return undefined; + } + try { + return JSON.parse(text) as GoogleOauthTokenResponsePayload; + } catch { + return undefined; + } +} + +function decodeGoogleOauthTokenResponseBody(bytes: Buffer, contentEncoding: string | null): string { + if (shouldGunzipGoogleOauthTokenResponse(bytes, contentEncoding)) { + try { + return gunzipSync(bytes).toString("utf8"); + } catch { + return bytes.toString("utf8"); + } + } + return bytes.toString("utf8"); +} + +function shouldGunzipGoogleOauthTokenResponse( + bytes: Buffer, + contentEncoding: string | null, +): boolean { + if (bytes[0] === 0x1f && bytes[1] === 0x8b) { + return true; + } + return (contentEncoding ?? "") + .split(",") + .map((encoding) => encoding.trim().toLowerCase()) + .includes("gzip"); +} + async function resolveGoogleVertexAccessTokenViaGoogleAuth(): Promise { // Lazy-import + cache so we don't pay the google-auth-library load cost on // gateway startup; only when we actually need a non-authorized_user token.