From bb8547e9b82ce163deae86f59844a1048917c7bb Mon Sep 17 00:00:00 2001 From: Hunter Miller Date: Thu, 19 Feb 2026 11:02:19 -0600 Subject: [PATCH] fix(tlon): restore SSRF protection with event ack tracking - Restore context.ts and channel-ops.ts for SSRF support - Restore sse-client.ts with urbitFetch for SSRF-protected requests - Add event ack tracking from openclaw-tlon (acks every 20 events) - Pass ssrfPolicy through authenticate() and UrbitSSEClient - Fixes security regression from sync with openclaw-tlon --- extensions/tlon/src/channel.ts | 13 +- extensions/tlon/src/monitor/index.ts | 5 +- extensions/tlon/src/urbit/channel-ops.ts | 164 ++++++++++++ extensions/tlon/src/urbit/context.ts | 47 ++++ extensions/tlon/src/urbit/sse-client.ts | 327 ++++++++++++++--------- 5 files changed, 423 insertions(+), 133 deletions(-) create mode 100644 extensions/tlon/src/urbit/channel-ops.ts create mode 100644 extensions/tlon/src/urbit/context.ts diff --git a/extensions/tlon/src/channel.ts b/extensions/tlon/src/channel.ts index ac5881273c3..cecf1dd536b 100644 --- a/extensions/tlon/src/channel.ts +++ b/extensions/tlon/src/channel.ts @@ -16,6 +16,7 @@ import { tlonOnboardingAdapter } from "./onboarding.js"; import { formatTargetHint, normalizeShip, parseTlonTarget } from "./targets.js"; import { resolveTlonAccount, listTlonAccountIds } from "./types.js"; import { authenticate } from "./urbit/auth.js"; +import { ssrfPolicyFromAllowPrivateNetwork } from "./urbit/context.js"; import { ensureUrbitConnectPatched, Urbit } from "./urbit/http-api.js"; import { buildMediaStory, @@ -27,8 +28,14 @@ import { import { uploadImageFromUrl } from "./urbit/upload.js"; // Simple HTTP-only poke that doesn't open an EventSource (avoids conflict with monitor's SSE) -async function createHttpPokeApi(params: { url: string; code: string; ship: string }) { - const cookie = await authenticate(params.url, params.code); +async function createHttpPokeApi(params: { + url: string; + code: string; + ship: string; + allowPrivateNetwork?: boolean; +}) { + const ssrfPolicy = ssrfPolicyFromAllowPrivateNetwork(params.allowPrivateNetwork); + const cookie = await authenticate(params.url, params.code, { ssrfPolicy }); const channelId = `${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(2, 8)}`; const channelUrl = `${params.url}/~/channel/${channelId}`; const shipName = params.ship.replace(/^~/, ""); @@ -172,6 +179,7 @@ const tlonOutbound: ChannelOutboundAdapter = { url: account.url, ship: account.ship, code: account.code, + allowPrivateNetwork: account.allowPrivateNetwork ?? undefined, }); try { @@ -226,6 +234,7 @@ const tlonOutbound: ChannelOutboundAdapter = { url: account.url, ship: account.ship, code: account.code, + allowPrivateNetwork: account.allowPrivateNetwork ?? undefined, }); try { diff --git a/extensions/tlon/src/monitor/index.ts b/extensions/tlon/src/monitor/index.ts index cc438bb57ec..ee74d8a3495 100644 --- a/extensions/tlon/src/monitor/index.ts +++ b/extensions/tlon/src/monitor/index.ts @@ -5,6 +5,7 @@ import { createSettingsManager, type TlonSettingsStore } from "../settings.js"; import { normalizeShip, parseChannelNest } from "../targets.js"; import { resolveTlonAccount } from "../types.js"; import { authenticate } from "../urbit/auth.js"; +import { ssrfPolicyFromAllowPrivateNetwork } from "../urbit/context.js"; import type { Foreigns, DmInvite } from "../urbit/foreigns.js"; import { sendDm, sendGroupMessage } from "../urbit/send.js"; import { UrbitSSEClient } from "../urbit/sse-client.js"; @@ -104,10 +105,12 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise runtime.log?.(message), error: (message) => runtime.error?.(message), diff --git a/extensions/tlon/src/urbit/channel-ops.ts b/extensions/tlon/src/urbit/channel-ops.ts new file mode 100644 index 00000000000..077e8d01816 --- /dev/null +++ b/extensions/tlon/src/urbit/channel-ops.ts @@ -0,0 +1,164 @@ +import type { LookupFn, SsrFPolicy } from "openclaw/plugin-sdk"; +import { UrbitHttpError } from "./errors.js"; +import { urbitFetch } from "./fetch.js"; + +export type UrbitChannelDeps = { + baseUrl: string; + cookie: string; + ship: string; + channelId: string; + ssrfPolicy?: SsrFPolicy; + lookupFn?: LookupFn; + fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise; +}; + +export async function pokeUrbitChannel( + deps: UrbitChannelDeps, + params: { app: string; mark: string; json: unknown; auditContext: string }, +): Promise { + const pokeId = Date.now(); + const pokeData = { + id: pokeId, + action: "poke", + ship: deps.ship, + app: params.app, + mark: params.mark, + json: params.json, + }; + + const { response, release } = await urbitFetch({ + baseUrl: deps.baseUrl, + path: `/~/channel/${deps.channelId}`, + init: { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: deps.cookie, + }, + body: JSON.stringify([pokeData]), + }, + ssrfPolicy: deps.ssrfPolicy, + lookupFn: deps.lookupFn, + fetchImpl: deps.fetchImpl, + timeoutMs: 30_000, + auditContext: params.auditContext, + }); + + try { + if (!response.ok && response.status !== 204) { + const errorText = await response.text().catch(() => ""); + throw new Error(`Poke failed: ${response.status}${errorText ? ` - ${errorText}` : ""}`); + } + return pokeId; + } finally { + await release(); + } +} + +export async function scryUrbitPath( + deps: Pick, + params: { path: string; auditContext: string }, +): Promise { + const scryPath = `/~/scry${params.path}`; + const { response, release } = await urbitFetch({ + baseUrl: deps.baseUrl, + path: scryPath, + init: { + method: "GET", + headers: { Cookie: deps.cookie }, + }, + ssrfPolicy: deps.ssrfPolicy, + lookupFn: deps.lookupFn, + fetchImpl: deps.fetchImpl, + timeoutMs: 30_000, + auditContext: params.auditContext, + }); + + try { + if (!response.ok) { + throw new Error(`Scry failed: ${response.status} for path ${params.path}`); + } + return await response.json(); + } finally { + await release(); + } +} + +export async function createUrbitChannel( + deps: UrbitChannelDeps, + params: { body: unknown; auditContext: string }, +): Promise { + const { response, release } = await urbitFetch({ + baseUrl: deps.baseUrl, + path: `/~/channel/${deps.channelId}`, + init: { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: deps.cookie, + }, + body: JSON.stringify(params.body), + }, + ssrfPolicy: deps.ssrfPolicy, + lookupFn: deps.lookupFn, + fetchImpl: deps.fetchImpl, + timeoutMs: 30_000, + auditContext: params.auditContext, + }); + + try { + if (!response.ok && response.status !== 204) { + throw new UrbitHttpError({ operation: "Channel creation", status: response.status }); + } + } finally { + await release(); + } +} + +export async function wakeUrbitChannel(deps: UrbitChannelDeps): Promise { + const { response, release } = await urbitFetch({ + baseUrl: deps.baseUrl, + path: `/~/channel/${deps.channelId}`, + init: { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: deps.cookie, + }, + body: JSON.stringify([ + { + id: Date.now(), + action: "poke", + ship: deps.ship, + app: "hood", + mark: "helm-hi", + json: "Opening API channel", + }, + ]), + }, + ssrfPolicy: deps.ssrfPolicy, + lookupFn: deps.lookupFn, + fetchImpl: deps.fetchImpl, + timeoutMs: 30_000, + auditContext: "tlon-urbit-channel-wake", + }); + + try { + if (!response.ok && response.status !== 204) { + throw new UrbitHttpError({ operation: "Channel activation", status: response.status }); + } + } finally { + await release(); + } +} + +export async function ensureUrbitChannelOpen( + deps: UrbitChannelDeps, + params: { createBody: unknown; createAuditContext: string }, +): Promise { + await createUrbitChannel(deps, { + body: params.createBody, + auditContext: params.createAuditContext, + }); + await wakeUrbitChannel(deps); +} diff --git a/extensions/tlon/src/urbit/context.ts b/extensions/tlon/src/urbit/context.ts new file mode 100644 index 00000000000..90c2721c7b8 --- /dev/null +++ b/extensions/tlon/src/urbit/context.ts @@ -0,0 +1,47 @@ +import type { SsrFPolicy } from "openclaw/plugin-sdk"; +import { validateUrbitBaseUrl } from "./base-url.js"; +import { UrbitUrlError } from "./errors.js"; + +export type UrbitContext = { + baseUrl: string; + hostname: string; + ship: string; +}; + +export function resolveShipFromHostname(hostname: string): string { + const trimmed = hostname.trim().toLowerCase().replace(/\.$/, ""); + if (!trimmed) { + return ""; + } + if (trimmed.includes(".")) { + return trimmed.split(".")[0] ?? trimmed; + } + return trimmed; +} + +export function normalizeUrbitShip(ship: string | undefined, hostname: string): string { + const raw = ship?.replace(/^~/, "") ?? resolveShipFromHostname(hostname); + return raw.trim(); +} + +export function normalizeUrbitCookie(cookie: string): string { + return cookie.split(";")[0] ?? cookie; +} + +export function getUrbitContext(url: string, ship?: string): UrbitContext { + const validated = validateUrbitBaseUrl(url); + if (!validated.ok) { + throw new UrbitUrlError(validated.error); + } + return { + baseUrl: validated.baseUrl, + hostname: validated.hostname, + ship: normalizeUrbitShip(ship, validated.hostname), + }; +} + +export function ssrfPolicyFromAllowPrivateNetwork( + allowPrivateNetwork: boolean | null | undefined, +): SsrFPolicy | undefined { + return allowPrivateNetwork ? { allowPrivateNetwork: true } : undefined; +} diff --git a/extensions/tlon/src/urbit/sse-client.ts b/extensions/tlon/src/urbit/sse-client.ts index fc793f8b40f..65736905a65 100644 --- a/extensions/tlon/src/urbit/sse-client.ts +++ b/extensions/tlon/src/urbit/sse-client.ts @@ -1,6 +1,8 @@ -import { randomUUID } from "node:crypto"; import { Readable } from "node:stream"; +import type { LookupFn, SsrFPolicy } from "openclaw/plugin-sdk"; +import { ensureUrbitChannelOpen, pokeUrbitChannel, scryUrbitPath } from "./channel-ops.js"; import { getUrbitContext, normalizeUrbitCookie } from "./context.js"; +import { urbitFetch } from "./fetch.js"; export type UrbitSseLogger = { log?: (message: string) => void; @@ -9,6 +11,9 @@ export type UrbitSseLogger = { type UrbitSseOptions = { ship?: string; + ssrfPolicy?: SsrFPolicy; + lookupFn?: LookupFn; + fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise; onReconnect?: (client: UrbitSSEClient) => Promise | void; autoReconnect?: boolean; maxReconnectAttempts?: number; @@ -44,6 +49,10 @@ export class UrbitSSEClient { maxReconnectDelay: number; isConnected = false; logger: UrbitSseLogger; + ssrfPolicy?: SsrFPolicy; + lookupFn?: LookupFn; + fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise; + streamRelease: (() => Promise) | null = null; // Event ack tracking - must ack every ~50 events to keep channel healthy private lastHeardEventId = -1; @@ -55,7 +64,7 @@ export class UrbitSSEClient { this.url = ctx.baseUrl; this.cookie = normalizeUrbitCookie(cookie); this.ship = ctx.ship; - this.channelId = `${Math.floor(Date.now() / 1000)}-${randomUUID()}`; + this.channelId = `${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(2, 8)}`; this.channelUrl = new URL(`/~/channel/${this.channelId}`, this.url).toString(); this.onReconnect = options.onReconnect ?? null; this.autoReconnect = options.autoReconnect !== false; @@ -63,6 +72,9 @@ export class UrbitSSEClient { this.reconnectDelay = options.reconnectDelay ?? 1000; this.maxReconnectDelay = options.maxReconnectDelay ?? 30000; this.logger = options.logger ?? {}; + this.ssrfPolicy = options.ssrfPolicy; + this.lookupFn = options.lookupFn; + this.fetchImpl = options.fetchImpl; } async subscribe(params: { @@ -102,56 +114,52 @@ export class UrbitSSEClient { app: string; path: string; }) { - const response = await fetch(this.channelUrl, { - method: "PUT", - headers: { - "Content-Type": "application/json", - Cookie: this.cookie, + const { response, release } = await urbitFetch({ + baseUrl: this.url, + path: `/~/channel/${this.channelId}`, + init: { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify([subscription]), }, - body: JSON.stringify([subscription]), + ssrfPolicy: this.ssrfPolicy, + lookupFn: this.lookupFn, + fetchImpl: this.fetchImpl, + timeoutMs: 30_000, + auditContext: "tlon-urbit-subscribe", }); - if (!response.ok && response.status !== 204) { - const errorText = await response.text(); - throw new Error(`Subscribe failed: ${response.status} - ${errorText}`); + try { + if (!response.ok && response.status !== 204) { + const errorText = await response.text().catch(() => ""); + throw new Error( + `Subscribe failed: ${response.status}${errorText ? ` - ${errorText}` : ""}`, + ); + } + } finally { + await release(); } } async connect() { - const createResp = await fetch(this.channelUrl, { - method: "PUT", - headers: { - "Content-Type": "application/json", - Cookie: this.cookie, + await ensureUrbitChannelOpen( + { + baseUrl: this.url, + cookie: this.cookie, + ship: this.ship, + channelId: this.channelId, + ssrfPolicy: this.ssrfPolicy, + lookupFn: this.lookupFn, + fetchImpl: this.fetchImpl, }, - body: JSON.stringify(this.subscriptions), - }); - - if (!createResp.ok && createResp.status !== 204) { - throw new Error(`Channel creation failed: ${createResp.status}`); - } - - const pokeResp = await fetch(this.channelUrl, { - method: "PUT", - headers: { - "Content-Type": "application/json", - Cookie: this.cookie, + { + createBody: this.subscriptions, + createAuditContext: "tlon-urbit-channel-create", }, - body: JSON.stringify([ - { - id: Date.now(), - action: "poke", - ship: this.ship, - app: "hood", - mark: "helm-hi", - json: "Opening API channel", - }, - ]), - }); - - if (!pokeResp.ok && pokeResp.status !== 204) { - throw new Error(`Channel activation failed: ${pokeResp.status}`); - } + ); await this.openStream(); this.isConnected = true; @@ -159,15 +167,38 @@ export class UrbitSSEClient { } async openStream() { - const response = await fetch(this.channelUrl, { - method: "GET", - headers: { - Accept: "text/event-stream", - Cookie: this.cookie, + // Use AbortController with manual timeout so we only abort during initial connection, + // not after the SSE stream is established and actively streaming. + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 60_000); + + this.streamController = controller; + + const { response, release } = await urbitFetch({ + baseUrl: this.url, + path: `/~/channel/${this.channelId}`, + init: { + method: "GET", + headers: { + Accept: "text/event-stream", + Cookie: this.cookie, + }, }, + ssrfPolicy: this.ssrfPolicy, + lookupFn: this.lookupFn, + fetchImpl: this.fetchImpl, + signal: controller.signal, + auditContext: "tlon-urbit-sse-stream", }); + this.streamRelease = release; + + // Clear timeout once connection established (headers received). + clearTimeout(timeoutId); + if (!response.ok) { + await release(); + this.streamRelease = null; throw new Error(`Stream connection failed: ${response.status}`); } @@ -187,8 +218,8 @@ export class UrbitSSEClient { if (!body) { return; } - // @ts-expect-error - ReadableStream type variance issue between DOM and Node types - const stream = body instanceof ReadableStream ? Readable.fromWeb(body) : body; + // oxlint-disable-next-line typescript/no-explicit-any + const stream = body instanceof ReadableStream ? Readable.fromWeb(body as any) : body; let buffer = ""; try { @@ -205,6 +236,12 @@ export class UrbitSSEClient { } } } finally { + if (this.streamRelease) { + const release = this.streamRelease; + this.streamRelease = null; + await release(); + } + this.streamController = null; if (!this.aborted && this.autoReconnect) { this.isConnected = false; this.logger.log?.("[SSE] Stream ended, attempting reconnection..."); @@ -219,12 +256,12 @@ export class UrbitSSEClient { let eventId: number | null = null; for (const line of lines) { - if (line.startsWith("data: ")) { - data = line.substring(6); - } if (line.startsWith("id: ")) { eventId = parseInt(line.substring(4), 10); } + if (line.startsWith("data: ")) { + data = line.substring(6); + } } if (!data) { @@ -273,72 +310,68 @@ export class UrbitSSEClient { } } + async poke(params: { app: string; mark: string; json: unknown }) { + return await pokeUrbitChannel( + { + baseUrl: this.url, + cookie: this.cookie, + ship: this.ship, + channelId: this.channelId, + ssrfPolicy: this.ssrfPolicy, + lookupFn: this.lookupFn, + fetchImpl: this.fetchImpl, + }, + { ...params, auditContext: "tlon-urbit-poke" }, + ); + } + + async scry(path: string) { + return await scryUrbitPath( + { + baseUrl: this.url, + cookie: this.cookie, + ssrfPolicy: this.ssrfPolicy, + lookupFn: this.lookupFn, + fetchImpl: this.fetchImpl, + }, + { path, auditContext: "tlon-urbit-scry" }, + ); + } + private async ack(eventId: number): Promise { this.lastAcknowledgedEventId = eventId; const ackData = { + id: Date.now(), action: "ack", "event-id": eventId, }; - const response = await fetch(this.channelUrl, { - method: "PUT", - headers: { - "Content-Type": "application/json", - Cookie: this.cookie, + const { response, release } = await urbitFetch({ + baseUrl: this.url, + path: `/~/channel/${this.channelId}`, + init: { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify([ackData]), }, - body: JSON.stringify([ackData]), + ssrfPolicy: this.ssrfPolicy, + lookupFn: this.lookupFn, + fetchImpl: this.fetchImpl, + timeoutMs: 10_000, + auditContext: "tlon-urbit-ack", }); - if (!response.ok && response.status !== 204) { - throw new Error(`Ack failed: ${response.status}`); + try { + if (!response.ok) { + throw new Error(`Ack failed with status ${response.status}`); + } + } finally { + await release(); } - - this.logger.log?.(`[SSE] Acked event ${eventId}`); - } - - async poke(params: { app: string; mark: string; json: unknown }) { - const pokeId = Date.now(); - const pokeData = { - id: pokeId, - action: "poke", - ship: this.ship, - app: params.app, - mark: params.mark, - json: params.json, - }; - - const response = await fetch(this.channelUrl, { - method: "PUT", - headers: { - "Content-Type": "application/json", - Cookie: this.cookie, - }, - body: JSON.stringify([pokeData]), - }); - - if (!response.ok && response.status !== 204) { - const errorText = await response.text(); - throw new Error(`Poke failed: ${response.status} - ${errorText}`); - } - - return pokeId; - } - - async scry(path: string) { - const scryUrl = `${this.url}/~/scry${path}`; - const response = await fetch(scryUrl, { - method: "GET", - headers: { - Cookie: this.cookie, - }, - }); - - if (!response.ok) { - throw new Error(`Scry failed: ${response.status} for path ${path}`); - } - - return await response.json(); } async attemptReconnect() { @@ -347,16 +380,11 @@ export class UrbitSSEClient { return; } - // Reset after max attempts with extended backoff, then continue trying forever if (this.reconnectAttempts >= this.maxReconnectAttempts) { - this.logger.log?.( - `[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Will reset and retry after extended backoff...`, + this.logger.error?.( + `[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Giving up.`, ); - // Wait 10 seconds before resetting and trying again - const extendedBackoff = 10000; // 10 seconds - await new Promise((resolve) => setTimeout(resolve, extendedBackoff)); - this.reconnectAttempts = 0; // Reset counter to continue trying - this.logger.log?.("[SSE] Reconnection attempts reset, resuming reconnection..."); + return; } this.reconnectAttempts += 1; @@ -372,7 +400,7 @@ export class UrbitSSEClient { await new Promise((resolve) => setTimeout(resolve, delay)); try { - this.channelId = `${Math.floor(Date.now() / 1000)}-${randomUUID()}`; + this.channelId = `${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(2, 8)}`; this.channelUrl = new URL(`/~/channel/${this.channelId}`, this.url).toString(); if (this.onReconnect) { @@ -390,6 +418,7 @@ export class UrbitSSEClient { async close() { this.aborted = true; this.isConnected = false; + this.streamController?.abort(); try { const unsubscribes = this.subscriptions.map((sub) => ({ @@ -398,23 +427,61 @@ export class UrbitSSEClient { subscription: sub.id, })); - await fetch(this.channelUrl, { - method: "PUT", - headers: { - "Content-Type": "application/json", - Cookie: this.cookie, - }, - body: JSON.stringify(unsubscribes), - }); + { + const { response, release } = await urbitFetch({ + baseUrl: this.url, + path: `/~/channel/${this.channelId}`, + init: { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify(unsubscribes), + }, + ssrfPolicy: this.ssrfPolicy, + lookupFn: this.lookupFn, + fetchImpl: this.fetchImpl, + timeoutMs: 30_000, + auditContext: "tlon-urbit-unsubscribe", + }); + try { + void response.body?.cancel(); + } finally { + await release(); + } + } - await fetch(this.channelUrl, { - method: "DELETE", - headers: { - Cookie: this.cookie, - }, - }); + { + const { response, release } = await urbitFetch({ + baseUrl: this.url, + path: `/~/channel/${this.channelId}`, + init: { + method: "DELETE", + headers: { + Cookie: this.cookie, + }, + }, + ssrfPolicy: this.ssrfPolicy, + lookupFn: this.lookupFn, + fetchImpl: this.fetchImpl, + timeoutMs: 30_000, + auditContext: "tlon-urbit-channel-close", + }); + try { + void response.body?.cancel(); + } finally { + await release(); + } + } } catch (error) { this.logger.error?.(`Error closing channel: ${String(error)}`); } + + if (this.streamRelease) { + const release = this.streamRelease; + this.streamRelease = null; + await release(); + } } }