mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-02 12:51:57 +00:00
fix(tlon): restore SSRF protection with event ack tracking
- Restore context.ts and channel-ops.ts for SSRF support - Restore sse-client.ts with urbitFetch for SSRF-protected requests - Add event ack tracking from openclaw-tlon (acks every 20 events) - Pass ssrfPolicy through authenticate() and UrbitSSEClient - Fixes security regression from sync with openclaw-tlon
This commit is contained in:
committed by
Josh Lehman
parent
0995118ec6
commit
bb8547e9b8
@@ -16,6 +16,7 @@ import { tlonOnboardingAdapter } from "./onboarding.js";
|
||||
import { formatTargetHint, normalizeShip, parseTlonTarget } from "./targets.js";
|
||||
import { resolveTlonAccount, listTlonAccountIds } from "./types.js";
|
||||
import { authenticate } from "./urbit/auth.js";
|
||||
import { ssrfPolicyFromAllowPrivateNetwork } from "./urbit/context.js";
|
||||
import { ensureUrbitConnectPatched, Urbit } from "./urbit/http-api.js";
|
||||
import {
|
||||
buildMediaStory,
|
||||
@@ -27,8 +28,14 @@ import {
|
||||
import { uploadImageFromUrl } from "./urbit/upload.js";
|
||||
|
||||
// Simple HTTP-only poke that doesn't open an EventSource (avoids conflict with monitor's SSE)
|
||||
async function createHttpPokeApi(params: { url: string; code: string; ship: string }) {
|
||||
const cookie = await authenticate(params.url, params.code);
|
||||
async function createHttpPokeApi(params: {
|
||||
url: string;
|
||||
code: string;
|
||||
ship: string;
|
||||
allowPrivateNetwork?: boolean;
|
||||
}) {
|
||||
const ssrfPolicy = ssrfPolicyFromAllowPrivateNetwork(params.allowPrivateNetwork);
|
||||
const cookie = await authenticate(params.url, params.code, { ssrfPolicy });
|
||||
const channelId = `${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(2, 8)}`;
|
||||
const channelUrl = `${params.url}/~/channel/${channelId}`;
|
||||
const shipName = params.ship.replace(/^~/, "");
|
||||
@@ -172,6 +179,7 @@ const tlonOutbound: ChannelOutboundAdapter = {
|
||||
url: account.url,
|
||||
ship: account.ship,
|
||||
code: account.code,
|
||||
allowPrivateNetwork: account.allowPrivateNetwork ?? undefined,
|
||||
});
|
||||
|
||||
try {
|
||||
@@ -226,6 +234,7 @@ const tlonOutbound: ChannelOutboundAdapter = {
|
||||
url: account.url,
|
||||
ship: account.ship,
|
||||
code: account.code,
|
||||
allowPrivateNetwork: account.allowPrivateNetwork ?? undefined,
|
||||
});
|
||||
|
||||
try {
|
||||
|
||||
@@ -5,6 +5,7 @@ import { createSettingsManager, type TlonSettingsStore } from "../settings.js";
|
||||
import { normalizeShip, parseChannelNest } from "../targets.js";
|
||||
import { resolveTlonAccount } from "../types.js";
|
||||
import { authenticate } from "../urbit/auth.js";
|
||||
import { ssrfPolicyFromAllowPrivateNetwork } from "../urbit/context.js";
|
||||
import type { Foreigns, DmInvite } from "../urbit/foreigns.js";
|
||||
import { sendDm, sendGroupMessage } from "../urbit/send.js";
|
||||
import { UrbitSSEClient } from "../urbit/sse-client.js";
|
||||
@@ -104,10 +105,12 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
|
||||
|
||||
let api: UrbitSSEClient | null = null;
|
||||
try {
|
||||
const ssrfPolicy = ssrfPolicyFromAllowPrivateNetwork(account.allowPrivateNetwork);
|
||||
runtime.log?.(`[tlon] Attempting authentication to ${account.url}...`);
|
||||
const cookie = await authenticate(account.url, account.code);
|
||||
const cookie = await authenticate(account.url, account.code, { ssrfPolicy });
|
||||
api = new UrbitSSEClient(account.url, cookie, {
|
||||
ship: botShipName,
|
||||
ssrfPolicy,
|
||||
logger: {
|
||||
log: (message) => runtime.log?.(message),
|
||||
error: (message) => runtime.error?.(message),
|
||||
|
||||
164
extensions/tlon/src/urbit/channel-ops.ts
Normal file
164
extensions/tlon/src/urbit/channel-ops.ts
Normal file
@@ -0,0 +1,164 @@
|
||||
import type { LookupFn, SsrFPolicy } from "openclaw/plugin-sdk";
|
||||
import { UrbitHttpError } from "./errors.js";
|
||||
import { urbitFetch } from "./fetch.js";
|
||||
|
||||
export type UrbitChannelDeps = {
|
||||
baseUrl: string;
|
||||
cookie: string;
|
||||
ship: string;
|
||||
channelId: string;
|
||||
ssrfPolicy?: SsrFPolicy;
|
||||
lookupFn?: LookupFn;
|
||||
fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;
|
||||
};
|
||||
|
||||
export async function pokeUrbitChannel(
|
||||
deps: UrbitChannelDeps,
|
||||
params: { app: string; mark: string; json: unknown; auditContext: string },
|
||||
): Promise<number> {
|
||||
const pokeId = Date.now();
|
||||
const pokeData = {
|
||||
id: pokeId,
|
||||
action: "poke",
|
||||
ship: deps.ship,
|
||||
app: params.app,
|
||||
mark: params.mark,
|
||||
json: params.json,
|
||||
};
|
||||
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: deps.baseUrl,
|
||||
path: `/~/channel/${deps.channelId}`,
|
||||
init: {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: deps.cookie,
|
||||
},
|
||||
body: JSON.stringify([pokeData]),
|
||||
},
|
||||
ssrfPolicy: deps.ssrfPolicy,
|
||||
lookupFn: deps.lookupFn,
|
||||
fetchImpl: deps.fetchImpl,
|
||||
timeoutMs: 30_000,
|
||||
auditContext: params.auditContext,
|
||||
});
|
||||
|
||||
try {
|
||||
if (!response.ok && response.status !== 204) {
|
||||
const errorText = await response.text().catch(() => "");
|
||||
throw new Error(`Poke failed: ${response.status}${errorText ? ` - ${errorText}` : ""}`);
|
||||
}
|
||||
return pokeId;
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
export async function scryUrbitPath(
|
||||
deps: Pick<UrbitChannelDeps, "baseUrl" | "cookie" | "ssrfPolicy" | "lookupFn" | "fetchImpl">,
|
||||
params: { path: string; auditContext: string },
|
||||
): Promise<unknown> {
|
||||
const scryPath = `/~/scry${params.path}`;
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: deps.baseUrl,
|
||||
path: scryPath,
|
||||
init: {
|
||||
method: "GET",
|
||||
headers: { Cookie: deps.cookie },
|
||||
},
|
||||
ssrfPolicy: deps.ssrfPolicy,
|
||||
lookupFn: deps.lookupFn,
|
||||
fetchImpl: deps.fetchImpl,
|
||||
timeoutMs: 30_000,
|
||||
auditContext: params.auditContext,
|
||||
});
|
||||
|
||||
try {
|
||||
if (!response.ok) {
|
||||
throw new Error(`Scry failed: ${response.status} for path ${params.path}`);
|
||||
}
|
||||
return await response.json();
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
export async function createUrbitChannel(
|
||||
deps: UrbitChannelDeps,
|
||||
params: { body: unknown; auditContext: string },
|
||||
): Promise<void> {
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: deps.baseUrl,
|
||||
path: `/~/channel/${deps.channelId}`,
|
||||
init: {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: deps.cookie,
|
||||
},
|
||||
body: JSON.stringify(params.body),
|
||||
},
|
||||
ssrfPolicy: deps.ssrfPolicy,
|
||||
lookupFn: deps.lookupFn,
|
||||
fetchImpl: deps.fetchImpl,
|
||||
timeoutMs: 30_000,
|
||||
auditContext: params.auditContext,
|
||||
});
|
||||
|
||||
try {
|
||||
if (!response.ok && response.status !== 204) {
|
||||
throw new UrbitHttpError({ operation: "Channel creation", status: response.status });
|
||||
}
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
export async function wakeUrbitChannel(deps: UrbitChannelDeps): Promise<void> {
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: deps.baseUrl,
|
||||
path: `/~/channel/${deps.channelId}`,
|
||||
init: {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: deps.cookie,
|
||||
},
|
||||
body: JSON.stringify([
|
||||
{
|
||||
id: Date.now(),
|
||||
action: "poke",
|
||||
ship: deps.ship,
|
||||
app: "hood",
|
||||
mark: "helm-hi",
|
||||
json: "Opening API channel",
|
||||
},
|
||||
]),
|
||||
},
|
||||
ssrfPolicy: deps.ssrfPolicy,
|
||||
lookupFn: deps.lookupFn,
|
||||
fetchImpl: deps.fetchImpl,
|
||||
timeoutMs: 30_000,
|
||||
auditContext: "tlon-urbit-channel-wake",
|
||||
});
|
||||
|
||||
try {
|
||||
if (!response.ok && response.status !== 204) {
|
||||
throw new UrbitHttpError({ operation: "Channel activation", status: response.status });
|
||||
}
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
export async function ensureUrbitChannelOpen(
|
||||
deps: UrbitChannelDeps,
|
||||
params: { createBody: unknown; createAuditContext: string },
|
||||
): Promise<void> {
|
||||
await createUrbitChannel(deps, {
|
||||
body: params.createBody,
|
||||
auditContext: params.createAuditContext,
|
||||
});
|
||||
await wakeUrbitChannel(deps);
|
||||
}
|
||||
47
extensions/tlon/src/urbit/context.ts
Normal file
47
extensions/tlon/src/urbit/context.ts
Normal file
@@ -0,0 +1,47 @@
|
||||
import type { SsrFPolicy } from "openclaw/plugin-sdk";
|
||||
import { validateUrbitBaseUrl } from "./base-url.js";
|
||||
import { UrbitUrlError } from "./errors.js";
|
||||
|
||||
export type UrbitContext = {
|
||||
baseUrl: string;
|
||||
hostname: string;
|
||||
ship: string;
|
||||
};
|
||||
|
||||
export function resolveShipFromHostname(hostname: string): string {
|
||||
const trimmed = hostname.trim().toLowerCase().replace(/\.$/, "");
|
||||
if (!trimmed) {
|
||||
return "";
|
||||
}
|
||||
if (trimmed.includes(".")) {
|
||||
return trimmed.split(".")[0] ?? trimmed;
|
||||
}
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
export function normalizeUrbitShip(ship: string | undefined, hostname: string): string {
|
||||
const raw = ship?.replace(/^~/, "") ?? resolveShipFromHostname(hostname);
|
||||
return raw.trim();
|
||||
}
|
||||
|
||||
export function normalizeUrbitCookie(cookie: string): string {
|
||||
return cookie.split(";")[0] ?? cookie;
|
||||
}
|
||||
|
||||
export function getUrbitContext(url: string, ship?: string): UrbitContext {
|
||||
const validated = validateUrbitBaseUrl(url);
|
||||
if (!validated.ok) {
|
||||
throw new UrbitUrlError(validated.error);
|
||||
}
|
||||
return {
|
||||
baseUrl: validated.baseUrl,
|
||||
hostname: validated.hostname,
|
||||
ship: normalizeUrbitShip(ship, validated.hostname),
|
||||
};
|
||||
}
|
||||
|
||||
export function ssrfPolicyFromAllowPrivateNetwork(
|
||||
allowPrivateNetwork: boolean | null | undefined,
|
||||
): SsrFPolicy | undefined {
|
||||
return allowPrivateNetwork ? { allowPrivateNetwork: true } : undefined;
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { Readable } from "node:stream";
|
||||
import type { LookupFn, SsrFPolicy } from "openclaw/plugin-sdk";
|
||||
import { ensureUrbitChannelOpen, pokeUrbitChannel, scryUrbitPath } from "./channel-ops.js";
|
||||
import { getUrbitContext, normalizeUrbitCookie } from "./context.js";
|
||||
import { urbitFetch } from "./fetch.js";
|
||||
|
||||
export type UrbitSseLogger = {
|
||||
log?: (message: string) => void;
|
||||
@@ -9,6 +11,9 @@ export type UrbitSseLogger = {
|
||||
|
||||
type UrbitSseOptions = {
|
||||
ship?: string;
|
||||
ssrfPolicy?: SsrFPolicy;
|
||||
lookupFn?: LookupFn;
|
||||
fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;
|
||||
onReconnect?: (client: UrbitSSEClient) => Promise<void> | void;
|
||||
autoReconnect?: boolean;
|
||||
maxReconnectAttempts?: number;
|
||||
@@ -44,6 +49,10 @@ export class UrbitSSEClient {
|
||||
maxReconnectDelay: number;
|
||||
isConnected = false;
|
||||
logger: UrbitSseLogger;
|
||||
ssrfPolicy?: SsrFPolicy;
|
||||
lookupFn?: LookupFn;
|
||||
fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;
|
||||
streamRelease: (() => Promise<void>) | null = null;
|
||||
|
||||
// Event ack tracking - must ack every ~50 events to keep channel healthy
|
||||
private lastHeardEventId = -1;
|
||||
@@ -55,7 +64,7 @@ export class UrbitSSEClient {
|
||||
this.url = ctx.baseUrl;
|
||||
this.cookie = normalizeUrbitCookie(cookie);
|
||||
this.ship = ctx.ship;
|
||||
this.channelId = `${Math.floor(Date.now() / 1000)}-${randomUUID()}`;
|
||||
this.channelId = `${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(2, 8)}`;
|
||||
this.channelUrl = new URL(`/~/channel/${this.channelId}`, this.url).toString();
|
||||
this.onReconnect = options.onReconnect ?? null;
|
||||
this.autoReconnect = options.autoReconnect !== false;
|
||||
@@ -63,6 +72,9 @@ export class UrbitSSEClient {
|
||||
this.reconnectDelay = options.reconnectDelay ?? 1000;
|
||||
this.maxReconnectDelay = options.maxReconnectDelay ?? 30000;
|
||||
this.logger = options.logger ?? {};
|
||||
this.ssrfPolicy = options.ssrfPolicy;
|
||||
this.lookupFn = options.lookupFn;
|
||||
this.fetchImpl = options.fetchImpl;
|
||||
}
|
||||
|
||||
async subscribe(params: {
|
||||
@@ -102,56 +114,52 @@ export class UrbitSSEClient {
|
||||
app: string;
|
||||
path: string;
|
||||
}) {
|
||||
const response = await fetch(this.channelUrl, {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: this.url,
|
||||
path: `/~/channel/${this.channelId}`,
|
||||
init: {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
body: JSON.stringify([subscription]),
|
||||
},
|
||||
body: JSON.stringify([subscription]),
|
||||
ssrfPolicy: this.ssrfPolicy,
|
||||
lookupFn: this.lookupFn,
|
||||
fetchImpl: this.fetchImpl,
|
||||
timeoutMs: 30_000,
|
||||
auditContext: "tlon-urbit-subscribe",
|
||||
});
|
||||
|
||||
if (!response.ok && response.status !== 204) {
|
||||
const errorText = await response.text();
|
||||
throw new Error(`Subscribe failed: ${response.status} - ${errorText}`);
|
||||
try {
|
||||
if (!response.ok && response.status !== 204) {
|
||||
const errorText = await response.text().catch(() => "");
|
||||
throw new Error(
|
||||
`Subscribe failed: ${response.status}${errorText ? ` - ${errorText}` : ""}`,
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
async connect() {
|
||||
const createResp = await fetch(this.channelUrl, {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
await ensureUrbitChannelOpen(
|
||||
{
|
||||
baseUrl: this.url,
|
||||
cookie: this.cookie,
|
||||
ship: this.ship,
|
||||
channelId: this.channelId,
|
||||
ssrfPolicy: this.ssrfPolicy,
|
||||
lookupFn: this.lookupFn,
|
||||
fetchImpl: this.fetchImpl,
|
||||
},
|
||||
body: JSON.stringify(this.subscriptions),
|
||||
});
|
||||
|
||||
if (!createResp.ok && createResp.status !== 204) {
|
||||
throw new Error(`Channel creation failed: ${createResp.status}`);
|
||||
}
|
||||
|
||||
const pokeResp = await fetch(this.channelUrl, {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
{
|
||||
createBody: this.subscriptions,
|
||||
createAuditContext: "tlon-urbit-channel-create",
|
||||
},
|
||||
body: JSON.stringify([
|
||||
{
|
||||
id: Date.now(),
|
||||
action: "poke",
|
||||
ship: this.ship,
|
||||
app: "hood",
|
||||
mark: "helm-hi",
|
||||
json: "Opening API channel",
|
||||
},
|
||||
]),
|
||||
});
|
||||
|
||||
if (!pokeResp.ok && pokeResp.status !== 204) {
|
||||
throw new Error(`Channel activation failed: ${pokeResp.status}`);
|
||||
}
|
||||
);
|
||||
|
||||
await this.openStream();
|
||||
this.isConnected = true;
|
||||
@@ -159,15 +167,38 @@ export class UrbitSSEClient {
|
||||
}
|
||||
|
||||
async openStream() {
|
||||
const response = await fetch(this.channelUrl, {
|
||||
method: "GET",
|
||||
headers: {
|
||||
Accept: "text/event-stream",
|
||||
Cookie: this.cookie,
|
||||
// Use AbortController with manual timeout so we only abort during initial connection,
|
||||
// not after the SSE stream is established and actively streaming.
|
||||
const controller = new AbortController();
|
||||
const timeoutId = setTimeout(() => controller.abort(), 60_000);
|
||||
|
||||
this.streamController = controller;
|
||||
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: this.url,
|
||||
path: `/~/channel/${this.channelId}`,
|
||||
init: {
|
||||
method: "GET",
|
||||
headers: {
|
||||
Accept: "text/event-stream",
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
},
|
||||
ssrfPolicy: this.ssrfPolicy,
|
||||
lookupFn: this.lookupFn,
|
||||
fetchImpl: this.fetchImpl,
|
||||
signal: controller.signal,
|
||||
auditContext: "tlon-urbit-sse-stream",
|
||||
});
|
||||
|
||||
this.streamRelease = release;
|
||||
|
||||
// Clear timeout once connection established (headers received).
|
||||
clearTimeout(timeoutId);
|
||||
|
||||
if (!response.ok) {
|
||||
await release();
|
||||
this.streamRelease = null;
|
||||
throw new Error(`Stream connection failed: ${response.status}`);
|
||||
}
|
||||
|
||||
@@ -187,8 +218,8 @@ export class UrbitSSEClient {
|
||||
if (!body) {
|
||||
return;
|
||||
}
|
||||
// @ts-expect-error - ReadableStream type variance issue between DOM and Node types
|
||||
const stream = body instanceof ReadableStream ? Readable.fromWeb(body) : body;
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
const stream = body instanceof ReadableStream ? Readable.fromWeb(body as any) : body;
|
||||
let buffer = "";
|
||||
|
||||
try {
|
||||
@@ -205,6 +236,12 @@ export class UrbitSSEClient {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (this.streamRelease) {
|
||||
const release = this.streamRelease;
|
||||
this.streamRelease = null;
|
||||
await release();
|
||||
}
|
||||
this.streamController = null;
|
||||
if (!this.aborted && this.autoReconnect) {
|
||||
this.isConnected = false;
|
||||
this.logger.log?.("[SSE] Stream ended, attempting reconnection...");
|
||||
@@ -219,12 +256,12 @@ export class UrbitSSEClient {
|
||||
let eventId: number | null = null;
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.startsWith("data: ")) {
|
||||
data = line.substring(6);
|
||||
}
|
||||
if (line.startsWith("id: ")) {
|
||||
eventId = parseInt(line.substring(4), 10);
|
||||
}
|
||||
if (line.startsWith("data: ")) {
|
||||
data = line.substring(6);
|
||||
}
|
||||
}
|
||||
|
||||
if (!data) {
|
||||
@@ -273,72 +310,68 @@ export class UrbitSSEClient {
|
||||
}
|
||||
}
|
||||
|
||||
async poke(params: { app: string; mark: string; json: unknown }) {
|
||||
return await pokeUrbitChannel(
|
||||
{
|
||||
baseUrl: this.url,
|
||||
cookie: this.cookie,
|
||||
ship: this.ship,
|
||||
channelId: this.channelId,
|
||||
ssrfPolicy: this.ssrfPolicy,
|
||||
lookupFn: this.lookupFn,
|
||||
fetchImpl: this.fetchImpl,
|
||||
},
|
||||
{ ...params, auditContext: "tlon-urbit-poke" },
|
||||
);
|
||||
}
|
||||
|
||||
async scry(path: string) {
|
||||
return await scryUrbitPath(
|
||||
{
|
||||
baseUrl: this.url,
|
||||
cookie: this.cookie,
|
||||
ssrfPolicy: this.ssrfPolicy,
|
||||
lookupFn: this.lookupFn,
|
||||
fetchImpl: this.fetchImpl,
|
||||
},
|
||||
{ path, auditContext: "tlon-urbit-scry" },
|
||||
);
|
||||
}
|
||||
|
||||
private async ack(eventId: number): Promise<void> {
|
||||
this.lastAcknowledgedEventId = eventId;
|
||||
|
||||
const ackData = {
|
||||
id: Date.now(),
|
||||
action: "ack",
|
||||
"event-id": eventId,
|
||||
};
|
||||
|
||||
const response = await fetch(this.channelUrl, {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: this.url,
|
||||
path: `/~/channel/${this.channelId}`,
|
||||
init: {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
body: JSON.stringify([ackData]),
|
||||
},
|
||||
body: JSON.stringify([ackData]),
|
||||
ssrfPolicy: this.ssrfPolicy,
|
||||
lookupFn: this.lookupFn,
|
||||
fetchImpl: this.fetchImpl,
|
||||
timeoutMs: 10_000,
|
||||
auditContext: "tlon-urbit-ack",
|
||||
});
|
||||
|
||||
if (!response.ok && response.status !== 204) {
|
||||
throw new Error(`Ack failed: ${response.status}`);
|
||||
try {
|
||||
if (!response.ok) {
|
||||
throw new Error(`Ack failed with status ${response.status}`);
|
||||
}
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
|
||||
this.logger.log?.(`[SSE] Acked event ${eventId}`);
|
||||
}
|
||||
|
||||
async poke(params: { app: string; mark: string; json: unknown }) {
|
||||
const pokeId = Date.now();
|
||||
const pokeData = {
|
||||
id: pokeId,
|
||||
action: "poke",
|
||||
ship: this.ship,
|
||||
app: params.app,
|
||||
mark: params.mark,
|
||||
json: params.json,
|
||||
};
|
||||
|
||||
const response = await fetch(this.channelUrl, {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
body: JSON.stringify([pokeData]),
|
||||
});
|
||||
|
||||
if (!response.ok && response.status !== 204) {
|
||||
const errorText = await response.text();
|
||||
throw new Error(`Poke failed: ${response.status} - ${errorText}`);
|
||||
}
|
||||
|
||||
return pokeId;
|
||||
}
|
||||
|
||||
async scry(path: string) {
|
||||
const scryUrl = `${this.url}/~/scry${path}`;
|
||||
const response = await fetch(scryUrl, {
|
||||
method: "GET",
|
||||
headers: {
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Scry failed: ${response.status} for path ${path}`);
|
||||
}
|
||||
|
||||
return await response.json();
|
||||
}
|
||||
|
||||
async attemptReconnect() {
|
||||
@@ -347,16 +380,11 @@ export class UrbitSSEClient {
|
||||
return;
|
||||
}
|
||||
|
||||
// Reset after max attempts with extended backoff, then continue trying forever
|
||||
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
|
||||
this.logger.log?.(
|
||||
`[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Will reset and retry after extended backoff...`,
|
||||
this.logger.error?.(
|
||||
`[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Giving up.`,
|
||||
);
|
||||
// Wait 10 seconds before resetting and trying again
|
||||
const extendedBackoff = 10000; // 10 seconds
|
||||
await new Promise((resolve) => setTimeout(resolve, extendedBackoff));
|
||||
this.reconnectAttempts = 0; // Reset counter to continue trying
|
||||
this.logger.log?.("[SSE] Reconnection attempts reset, resuming reconnection...");
|
||||
return;
|
||||
}
|
||||
|
||||
this.reconnectAttempts += 1;
|
||||
@@ -372,7 +400,7 @@ export class UrbitSSEClient {
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
|
||||
try {
|
||||
this.channelId = `${Math.floor(Date.now() / 1000)}-${randomUUID()}`;
|
||||
this.channelId = `${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(2, 8)}`;
|
||||
this.channelUrl = new URL(`/~/channel/${this.channelId}`, this.url).toString();
|
||||
|
||||
if (this.onReconnect) {
|
||||
@@ -390,6 +418,7 @@ export class UrbitSSEClient {
|
||||
async close() {
|
||||
this.aborted = true;
|
||||
this.isConnected = false;
|
||||
this.streamController?.abort();
|
||||
|
||||
try {
|
||||
const unsubscribes = this.subscriptions.map((sub) => ({
|
||||
@@ -398,23 +427,61 @@ export class UrbitSSEClient {
|
||||
subscription: sub.id,
|
||||
}));
|
||||
|
||||
await fetch(this.channelUrl, {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
body: JSON.stringify(unsubscribes),
|
||||
});
|
||||
{
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: this.url,
|
||||
path: `/~/channel/${this.channelId}`,
|
||||
init: {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
body: JSON.stringify(unsubscribes),
|
||||
},
|
||||
ssrfPolicy: this.ssrfPolicy,
|
||||
lookupFn: this.lookupFn,
|
||||
fetchImpl: this.fetchImpl,
|
||||
timeoutMs: 30_000,
|
||||
auditContext: "tlon-urbit-unsubscribe",
|
||||
});
|
||||
try {
|
||||
void response.body?.cancel();
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
await fetch(this.channelUrl, {
|
||||
method: "DELETE",
|
||||
headers: {
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
});
|
||||
{
|
||||
const { response, release } = await urbitFetch({
|
||||
baseUrl: this.url,
|
||||
path: `/~/channel/${this.channelId}`,
|
||||
init: {
|
||||
method: "DELETE",
|
||||
headers: {
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
},
|
||||
ssrfPolicy: this.ssrfPolicy,
|
||||
lookupFn: this.lookupFn,
|
||||
fetchImpl: this.fetchImpl,
|
||||
timeoutMs: 30_000,
|
||||
auditContext: "tlon-urbit-channel-close",
|
||||
});
|
||||
try {
|
||||
void response.body?.cancel();
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error?.(`Error closing channel: ${String(error)}`);
|
||||
}
|
||||
|
||||
if (this.streamRelease) {
|
||||
const release = this.streamRelease;
|
||||
this.streamRelease = null;
|
||||
await release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user