diff --git a/extensions/tlon/index.ts b/extensions/tlon/index.ts index 2a31956dd39..f96b40d51e7 100644 --- a/extensions/tlon/index.ts +++ b/extensions/tlon/index.ts @@ -1,8 +1,113 @@ +import { spawn } from "node:child_process"; +import { existsSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import { emptyPluginConfigSchema } from "openclaw/plugin-sdk"; import { tlonPlugin } from "./src/channel.js"; import { setTlonRuntime } from "./src/runtime.js"; +const __dirname = dirname(fileURLToPath(import.meta.url)); + +/** + * Find the tlon binary from the skill package + */ +function findTlonBinary(): string { + // Check in node_modules/.bin + const skillBin = join(__dirname, "node_modules", ".bin", "tlon"); + console.log(`[tlon] Checking for binary at: ${skillBin}, exists: ${existsSync(skillBin)}`); + if (existsSync(skillBin)) return skillBin; + + // Check for platform-specific binary directly + const platform = process.platform; + const arch = process.arch; + const platformPkg = `@tloncorp/tlon-skill-${platform}-${arch}`; + const platformBin = join(__dirname, "node_modules", platformPkg, "tlon"); + console.log( + `[tlon] Checking for platform binary at: ${platformBin}, exists: ${existsSync(platformBin)}`, + ); + if (existsSync(platformBin)) return platformBin; + + // Fallback to PATH + console.log(`[tlon] Falling back to PATH lookup for 'tlon'`); + return "tlon"; +} + +/** + * Shell-like argument splitter that respects quotes + */ +function shellSplit(str: string): string[] { + const args: string[] = []; + let cur = ""; + let inDouble = false; + let inSingle = false; + let escape = false; + + for (const ch of str) { + if (escape) { + cur += ch; + escape = false; + continue; + } + if (ch === "\\" && !inSingle) { + escape = true; + continue; + } + if (ch === '"' && !inSingle) { + inDouble = !inDouble; + continue; + } + if (ch === "'" && !inDouble) { + inSingle = !inSingle; + continue; + } + if (/\s/.test(ch) && !inDouble && !inSingle) { + if (cur) { + args.push(cur); + cur = ""; + } + continue; + } + cur += ch; + } + if (cur) args.push(cur); + return args; +} + +/** + * Run the tlon command and return the result + */ +function runTlonCommand(binary: string, args: string[]): Promise { + return new Promise((resolve, reject) => { + const child = spawn(binary, args, { + env: process.env, + }); + + let stdout = ""; + let stderr = ""; + + child.stdout.on("data", (data) => { + stdout += data.toString(); + }); + + child.stderr.on("data", (data) => { + stderr += data.toString(); + }); + + child.on("error", (err) => { + reject(new Error(`Failed to run tlon: ${err.message}`)); + }); + + child.on("close", (code) => { + if (code !== 0) { + reject(new Error(stderr || `tlon exited with code ${code}`)); + } else { + resolve(stdout); + } + }); + }); +} + const plugin = { id: "tlon", name: "Tlon", @@ -11,6 +116,44 @@ const plugin = { register(api: OpenClawPluginApi) { setTlonRuntime(api.runtime); api.registerChannel({ plugin: tlonPlugin }); + + // Register the tlon tool + const tlonBinary = findTlonBinary(); + api.logger.info(`[tlon] Registering tlon tool, binary: ${tlonBinary}`); + api.registerTool({ + name: "tlon", + label: "Tlon CLI", + description: + "Tlon/Urbit API operations: activity, channels, contacts, groups, messages, dms, posts, notebook, settings. " + + "Examples: 'activity mentions --limit 10', 'channels groups', 'contacts self', 'groups list'", + parameters: { + type: "object", + properties: { + command: { + type: "string", + description: + "The tlon command and arguments. " + + "Examples: 'activity mentions --limit 10', 'contacts get ~sampel-palnet', 'groups list'", + }, + }, + required: ["command"], + }, + async execute(_id: string, params: { command: string }) { + try { + const args = shellSplit(params.command); + const output = await runTlonCommand(tlonBinary, args); + return { + content: [{ type: "text" as const, text: output }], + details: undefined, + }; + } catch (error: any) { + return { + content: [{ type: "text" as const, text: `Error: ${error.message}` }], + details: { error: true }, + }; + } + }, + }); }, }; diff --git a/extensions/tlon/openclaw.plugin.json b/extensions/tlon/openclaw.plugin.json index aa4e78dfbb2..799cc0b184c 100644 --- a/extensions/tlon/openclaw.plugin.json +++ b/extensions/tlon/openclaw.plugin.json @@ -1,6 +1,7 @@ { "id": "tlon", "channels": ["tlon"], + "skills": ["node_modules/@tloncorp/tlon-skill"], "configSchema": { "type": "object", "additionalProperties": false, diff --git a/extensions/tlon/package.json b/extensions/tlon/package.json index 99c952536c9..fabadbb8b7c 100644 --- a/extensions/tlon/package.json +++ b/extensions/tlon/package.json @@ -4,7 +4,10 @@ "description": "OpenClaw Tlon/Urbit channel plugin", "type": "module", "dependencies": { - "@urbit/aura": "^3.0.0" + "@tloncorp/api": "github:tloncorp/api-beta#main", + "@tloncorp/tlon-skill": "0.1.9", + "@urbit/aura": "^3.0.0", + "@urbit/http-api": "^3.0.0" }, "openclaw": { "extensions": [ diff --git a/extensions/tlon/src/channel.ts b/extensions/tlon/src/channel.ts index cc7f14ea3e5..b7cbdc9963f 100644 --- a/extensions/tlon/src/channel.ts +++ b/extensions/tlon/src/channel.ts @@ -1,25 +1,72 @@ import type { - ChannelAccountSnapshot, ChannelOutboundAdapter, ChannelPlugin, ChannelSetupInput, OpenClawConfig, } from "openclaw/plugin-sdk"; +// NOTE: configureClient not available in current @tloncorp/api-beta version +// import { configureClient } from "@tloncorp/api"; import { applyAccountNameToChannelSection, DEFAULT_ACCOUNT_ID, normalizeAccountId, } from "openclaw/plugin-sdk"; -import { buildTlonAccountFields } from "./account-fields.js"; import { tlonChannelConfigSchema } from "./config-schema.js"; import { monitorTlonProvider } from "./monitor/index.js"; import { tlonOnboardingAdapter } from "./onboarding.js"; import { formatTargetHint, normalizeShip, parseTlonTarget } from "./targets.js"; import { resolveTlonAccount, listTlonAccountIds } from "./types.js"; import { authenticate } from "./urbit/auth.js"; -import { UrbitChannelClient } from "./urbit/channel-client.js"; -import { ssrfPolicyFromAllowPrivateNetwork } from "./urbit/context.js"; -import { buildMediaText, sendDm, sendGroupMessage } from "./urbit/send.js"; +import { ensureUrbitConnectPatched, Urbit } from "./urbit/http-api.js"; +import { + buildMediaStory, + sendDm, + sendGroupMessage, + sendDmWithStory, + sendGroupMessageWithStory, +} from "./urbit/send.js"; +import { uploadImageFromUrl } from "./urbit/upload.js"; + +// Simple HTTP-only poke that doesn't open an EventSource (avoids conflict with monitor's SSE) +async function createHttpPokeApi(params: { url: string; code: string; ship: string }) { + const cookie = await authenticate(params.url, params.code); + const channelId = `${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(2, 8)}`; + const channelUrl = `${params.url}/~/channel/${channelId}`; + const shipName = params.ship.replace(/^~/, ""); + + return { + poke: async (pokeParams: { app: string; mark: string; json: unknown }) => { + const pokeId = Date.now(); + const pokeData = { + id: pokeId, + action: "poke", + ship: shipName, + app: pokeParams.app, + mark: pokeParams.mark, + json: pokeParams.json, + }; + + const response = await fetch(channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: cookie.split(";")[0], + }, + body: JSON.stringify([pokeData]), + }); + + if (!response.ok && response.status !== 204) { + const errorText = await response.text(); + throw new Error(`Poke failed: ${response.status} - ${errorText}`); + } + + return pokeId; + }, + delete: async () => { + // No-op for HTTP-only client + }, + }; +} const TLON_CHANNEL_ID = "tlon" as const; @@ -27,7 +74,6 @@ type TlonSetupInput = ChannelSetupInput & { ship?: string; url?: string; code?: string; - allowPrivateNetwork?: boolean; groupChannels?: string[]; dmAllowlist?: string[]; autoDiscoverChannels?: boolean; @@ -48,7 +94,16 @@ function applyTlonSetupConfig(params: { }); const base = namedConfig.channels?.tlon ?? {}; - const payload = buildTlonAccountFields(input); + const payload = { + ...(input.ship ? { ship: input.ship } : {}), + ...(input.url ? { url: input.url } : {}), + ...(input.code ? { code: input.code } : {}), + ...(input.groupChannels ? { groupChannels: input.groupChannels } : {}), + ...(input.dmAllowlist ? { dmAllowlist: input.dmAllowlist } : {}), + ...(typeof input.autoDiscoverChannels === "boolean" + ? { autoDiscoverChannels: input.autoDiscoverChannels } + : {}), + }; if (useDefault) { return { @@ -97,7 +152,7 @@ const tlonOutbound: ChannelOutboundAdapter = { error: new Error(`Invalid Tlon target. Use ${formatTargetHint()}`), }; } - if (parsed.kind === "direct") { + if (parsed.kind === "dm") { return { ok: true, to: parsed.ship }; } return { ok: true, to: parsed.nest }; @@ -113,16 +168,16 @@ const tlonOutbound: ChannelOutboundAdapter = { throw new Error(`Invalid Tlon target. Use ${formatTargetHint()}`); } - const ssrfPolicy = ssrfPolicyFromAllowPrivateNetwork(account.allowPrivateNetwork); - const cookie = await authenticate(account.url, account.code, { ssrfPolicy }); - const api = new UrbitChannelClient(account.url, cookie, { - ship: account.ship.replace(/^~/, ""), - ssrfPolicy, + // Use HTTP-only poke (no EventSource) to avoid conflicts with monitor's SSE connection + const api = await createHttpPokeApi({ + url: account.url, + ship: account.ship, + code: account.code, }); try { const fromShip = normalizeShip(account.ship); - if (parsed.kind === "direct") { + if (parsed.kind === "dm") { return await sendDm({ api, fromShip, @@ -140,19 +195,68 @@ const tlonOutbound: ChannelOutboundAdapter = { replyToId: replyId, }); } finally { - await api.close(); + try { + await api.delete(); + } catch { + // ignore cleanup errors + } } }, sendMedia: async ({ cfg, to, text, mediaUrl, accountId, replyToId, threadId }) => { - const mergedText = buildMediaText(text, mediaUrl); - return await tlonOutbound.sendText!({ - cfg, - to, - text: mergedText, - accountId, - replyToId, - threadId, + const account = resolveTlonAccount(cfg, accountId ?? undefined); + if (!account.configured || !account.ship || !account.url || !account.code) { + throw new Error("Tlon account not configured"); + } + + const parsed = parseTlonTarget(to); + if (!parsed) { + throw new Error(`Invalid Tlon target. Use ${formatTargetHint()}`); + } + + // NOTE: configureClient not available in current @tloncorp/api-beta version + // configureClient({ + // shipUrl: account.url, + // shipName: account.ship.replace(/^~/, ""), + // verbose: false, + // getCode: async () => account.code!, + // }); + + const uploadedUrl = mediaUrl ? await uploadImageFromUrl(mediaUrl) : undefined; + + const api = await createHttpPokeApi({ + url: account.url, + ship: account.ship, + code: account.code, }); + + try { + const fromShip = normalizeShip(account.ship); + const story = buildMediaStory(text, uploadedUrl); + + if (parsed.kind === "dm") { + return await sendDmWithStory({ + api, + fromShip, + toShip: parsed.ship, + story, + }); + } + const replyId = (replyToId ?? threadId) ? String(replyToId ?? threadId) : undefined; + return await sendGroupMessageWithStory({ + api, + fromShip, + hostShip: parsed.hostShip, + channelName: parsed.channelName, + story, + replyToId: replyId, + }); + } finally { + try { + await api.delete(); + } catch { + // ignore cleanup errors + } + } }, }; @@ -170,7 +274,7 @@ export const tlonPlugin: ChannelPlugin = { }, capabilities: { chatTypes: ["direct", "group", "thread"], - media: false, + media: true, reply: true, threads: true, }, @@ -189,7 +293,7 @@ export const tlonPlugin: ChannelPlugin = { channels: { ...cfg.channels, tlon: { - ...(cfg.channels?.tlon as Record), + ...cfg.channels?.tlon, enabled, }, }, @@ -200,7 +304,7 @@ export const tlonPlugin: ChannelPlugin = { channels: { ...cfg.channels, tlon: { - ...(cfg.channels?.tlon as Record), + ...cfg.channels?.tlon, accounts: { ...cfg.channels?.tlon?.accounts, [accountId]: { @@ -215,11 +319,13 @@ export const tlonPlugin: ChannelPlugin = { deleteAccount: ({ cfg, accountId }) => { const useDefault = !accountId || accountId === "default"; if (useDefault) { - // oxlint-disable-next-line no-unused-vars - const { ship, code, url, name, ...rest } = (cfg.channels?.tlon ?? {}) as Record< - string, - unknown - >; + const { + ship: _ship, + code: _code, + url: _url, + name: _name, + ...rest + } = cfg.channels?.tlon ?? {}; return { ...cfg, channels: { @@ -228,15 +334,13 @@ export const tlonPlugin: ChannelPlugin = { }, } as OpenClawConfig; } - // oxlint-disable-next-line no-unused-vars - const { [accountId]: removed, ...remainingAccounts } = (cfg.channels?.tlon?.accounts ?? - {}) as Record; + const { [accountId]: _removed, ...remainingAccounts } = cfg.channels?.tlon?.accounts ?? {}; return { ...cfg, channels: { ...cfg.channels, tlon: { - ...(cfg.channels?.tlon as Record), + ...cfg.channels?.tlon, accounts: remainingAccounts, }, }, @@ -291,7 +395,7 @@ export const tlonPlugin: ChannelPlugin = { if (!parsed) { return target.trim(); } - if (parsed.kind === "direct") { + if (parsed.kind === "dm") { return parsed.ship; } return parsed.nest; @@ -325,45 +429,53 @@ export const tlonPlugin: ChannelPlugin = { return []; }); }, - buildChannelSummary: ({ snapshot }) => ({ - configured: snapshot.configured ?? false, - ship: (snapshot as { ship?: string | null }).ship ?? null, - url: (snapshot as { url?: string | null }).url ?? null, - }), + buildChannelSummary: ({ snapshot }) => { + const s = snapshot as { configured?: boolean; ship?: string; url?: string }; + return { + configured: s.configured ?? false, + ship: s.ship ?? null, + url: s.url ?? null, + }; + }, probeAccount: async ({ account }) => { if (!account.configured || !account.ship || !account.url || !account.code) { return { ok: false, error: "Not configured" }; } try { - const ssrfPolicy = ssrfPolicyFromAllowPrivateNetwork(account.allowPrivateNetwork); - const cookie = await authenticate(account.url, account.code, { ssrfPolicy }); - const api = new UrbitChannelClient(account.url, cookie, { + ensureUrbitConnectPatched(); + const api = await Urbit.authenticate({ ship: account.ship.replace(/^~/, ""), - ssrfPolicy, + url: account.url, + code: account.code, + verbose: false, }); try { await api.getOurName(); return { ok: true }; } finally { - await api.close(); + await api.delete(); } - } catch (error) { - return { ok: false, error: (error as { message?: string })?.message ?? String(error) }; + } catch (error: any) { + return { ok: false, error: error?.message ?? String(error) }; } }, - buildAccountSnapshot: ({ account, runtime, probe }) => ({ - accountId: account.accountId, - name: account.name, - enabled: account.enabled, - configured: account.configured, - ship: account.ship, - url: account.url, - running: runtime?.running ?? false, - lastStartAt: runtime?.lastStartAt ?? null, - lastStopAt: runtime?.lastStopAt ?? null, - lastError: runtime?.lastError ?? null, - probe, - }), + buildAccountSnapshot: ({ account, runtime, probe }) => { + // Tlon-specific snapshot with ship/url for status display + const snapshot = { + accountId: account.accountId, + name: account.name, + enabled: account.enabled, + configured: account.configured, + ship: account.ship, + url: account.url, + running: runtime?.running ?? false, + lastStartAt: runtime?.lastStartAt ?? null, + lastStopAt: runtime?.lastStopAt ?? null, + lastError: runtime?.lastError ?? null, + probe, + }; + return snapshot as import("openclaw/plugin-sdk").ChannelAccountSnapshot; + }, }, gateway: { startAccount: async (ctx) => { @@ -372,7 +484,7 @@ export const tlonPlugin: ChannelPlugin = { accountId: account.accountId, ship: account.ship, url: account.url, - } as ChannelAccountSnapshot); + } as import("openclaw/plugin-sdk").ChannelAccountSnapshot); ctx.log?.info(`[${account.accountId}] starting Tlon provider for ${account.ship ?? "tlon"}`); return monitorTlonProvider({ runtime: ctx.runtime, diff --git a/extensions/tlon/src/config-schema.ts b/extensions/tlon/src/config-schema.ts index ea80212088d..4a091c8f650 100644 --- a/extensions/tlon/src/config-schema.ts +++ b/extensions/tlon/src/config-schema.ts @@ -25,6 +25,11 @@ const tlonCommonConfigFields = { autoDiscoverChannels: z.boolean().optional(), showModelSignature: z.boolean().optional(), responsePrefix: z.string().optional(), + // Auto-accept settings + autoAcceptDmInvites: z.boolean().optional(), // Auto-accept DMs from ships in dmAllowlist + autoAcceptGroupInvites: z.boolean().optional(), // Auto-accept all group invites + // Owner ship for approval system + ownerShip: ShipSchema.optional(), // Ship that receives approval requests and can approve/deny } satisfies z.ZodRawShape; export const TlonAccountSchema = z.object({ diff --git a/extensions/tlon/src/monitor/approval.ts b/extensions/tlon/src/monitor/approval.ts new file mode 100644 index 00000000000..549be04a88a --- /dev/null +++ b/extensions/tlon/src/monitor/approval.ts @@ -0,0 +1,278 @@ +/** + * Approval system for managing DM, channel mention, and group invite approvals. + * + * When an unknown ship tries to interact with the bot, the owner receives + * a notification and can approve or deny the request. + */ + +import type { PendingApproval } from "../settings.js"; + +export type { PendingApproval }; + +export type ApprovalType = "dm" | "channel" | "group"; + +export type CreateApprovalParams = { + type: ApprovalType; + requestingShip: string; + channelNest?: string; + groupFlag?: string; + messagePreview?: string; + originalMessage?: { + messageId: string; + messageText: string; + messageContent: unknown; + timestamp: number; + parentId?: string; + isThreadReply?: boolean; + }; +}; + +/** + * Generate a unique approval ID in the format: {type}-{timestamp}-{shortHash} + */ +export function generateApprovalId(type: ApprovalType): string { + const timestamp = Date.now(); + const randomPart = Math.random().toString(36).substring(2, 6); + return `${type}-${timestamp}-${randomPart}`; +} + +/** + * Create a pending approval object. + */ +export function createPendingApproval(params: CreateApprovalParams): PendingApproval { + return { + id: generateApprovalId(params.type), + type: params.type, + requestingShip: params.requestingShip, + channelNest: params.channelNest, + groupFlag: params.groupFlag, + messagePreview: params.messagePreview, + originalMessage: params.originalMessage, + timestamp: Date.now(), + }; +} + +/** + * Truncate text to a maximum length with ellipsis. + */ +function truncate(text: string, maxLength: number): string { + if (text.length <= maxLength) { + return text; + } + return text.substring(0, maxLength - 3) + "..."; +} + +/** + * Format a notification message for the owner about a pending approval. + */ +export function formatApprovalRequest(approval: PendingApproval): string { + const preview = approval.messagePreview ? `\n"${truncate(approval.messagePreview, 100)}"` : ""; + + switch (approval.type) { + case "dm": + return ( + `New DM request from ${approval.requestingShip}:${preview}\n\n` + + `Reply "approve", "deny", or "block" (ID: ${approval.id})` + ); + + case "channel": + return ( + `${approval.requestingShip} mentioned you in ${approval.channelNest}:${preview}\n\n` + + `Reply "approve", "deny", or "block"\n` + + `(ID: ${approval.id})` + ); + + case "group": + return ( + `Group invite from ${approval.requestingShip} to join ${approval.groupFlag}\n\n` + + `Reply "approve", "deny", or "block"\n` + + `(ID: ${approval.id})` + ); + } +} + +export type ApprovalResponse = { + action: "approve" | "deny" | "block"; + id?: string; +}; + +/** + * Parse an owner's response to an approval request. + * Supports formats: + * - "approve" / "deny" / "block" (applies to most recent pending) + * - "approve dm-1234567890-abc" / "deny dm-1234567890-abc" (specific ID) + * - "block" permanently blocks the ship via Tlon's native blocking + */ +export function parseApprovalResponse(text: string): ApprovalResponse | null { + const trimmed = text.trim().toLowerCase(); + + // Match "approve", "deny", or "block" optionally followed by an ID + const match = trimmed.match(/^(approve|deny|block)(?:\s+(.+))?$/); + if (!match) { + return null; + } + + const action = match[1] as "approve" | "deny" | "block"; + const id = match[2]?.trim(); + + return { action, id }; +} + +/** + * Check if a message text looks like an approval response. + * Used to determine if we should intercept the message before normal processing. + */ +export function isApprovalResponse(text: string): boolean { + const trimmed = text.trim().toLowerCase(); + return trimmed.startsWith("approve") || trimmed.startsWith("deny") || trimmed.startsWith("block"); +} + +/** + * Find a pending approval by ID, or return the most recent if no ID specified. + */ +export function findPendingApproval( + pendingApprovals: PendingApproval[], + id?: string, +): PendingApproval | undefined { + if (id) { + return pendingApprovals.find((a) => a.id === id); + } + // Return most recent + return pendingApprovals[pendingApprovals.length - 1]; +} + +/** + * Check if there's already a pending approval for the same ship/channel/group combo. + * Used to avoid sending duplicate notifications. + */ +export function hasDuplicatePending( + pendingApprovals: PendingApproval[], + type: ApprovalType, + requestingShip: string, + channelNest?: string, + groupFlag?: string, +): boolean { + return pendingApprovals.some((approval) => { + if (approval.type !== type || approval.requestingShip !== requestingShip) { + return false; + } + if (type === "channel" && approval.channelNest !== channelNest) { + return false; + } + if (type === "group" && approval.groupFlag !== groupFlag) { + return false; + } + return true; + }); +} + +/** + * Remove a pending approval from the list by ID. + */ +export function removePendingApproval( + pendingApprovals: PendingApproval[], + id: string, +): PendingApproval[] { + return pendingApprovals.filter((a) => a.id !== id); +} + +/** + * Format a confirmation message after an approval action. + */ +export function formatApprovalConfirmation( + approval: PendingApproval, + action: "approve" | "deny" | "block", +): string { + if (action === "block") { + return `Blocked ${approval.requestingShip}. They will no longer be able to contact the bot.`; + } + + const actionText = action === "approve" ? "Approved" : "Denied"; + + switch (approval.type) { + case "dm": + if (action === "approve") { + return `${actionText} DM access for ${approval.requestingShip}. They can now message the bot.`; + } + return `${actionText} DM request from ${approval.requestingShip}.`; + + case "channel": + if (action === "approve") { + return `${actionText} ${approval.requestingShip} for ${approval.channelNest}. They can now interact in this channel.`; + } + return `${actionText} ${approval.requestingShip} for ${approval.channelNest}.`; + + case "group": + if (action === "approve") { + return `${actionText} group invite from ${approval.requestingShip} to ${approval.groupFlag}. Joining group...`; + } + return `${actionText} group invite from ${approval.requestingShip} to ${approval.groupFlag}.`; + } +} + +// ============================================================================ +// Admin Commands +// ============================================================================ + +export type AdminCommand = + | { type: "unblock"; ship: string } + | { type: "blocked" } + | { type: "pending" }; + +/** + * Parse an admin command from owner message. + * Supports: + * - "unblock ~ship" - unblock a specific ship + * - "blocked" - list all blocked ships + * - "pending" - list all pending approvals + */ +export function parseAdminCommand(text: string): AdminCommand | null { + const trimmed = text.trim().toLowerCase(); + + // "blocked" - list blocked ships + if (trimmed === "blocked") { + return { type: "blocked" }; + } + + // "pending" - list pending approvals + if (trimmed === "pending") { + return { type: "pending" }; + } + + // "unblock ~ship" - unblock a specific ship + const unblockMatch = trimmed.match(/^unblock\s+(~[\w-]+)$/); + if (unblockMatch) { + return { type: "unblock", ship: unblockMatch[1] }; + } + + return null; +} + +/** + * Check if a message text looks like an admin command. + */ +export function isAdminCommand(text: string): boolean { + return parseAdminCommand(text) !== null; +} + +/** + * Format the list of blocked ships for display to owner. + */ +export function formatBlockedList(ships: string[]): string { + if (ships.length === 0) { + return "No ships are currently blocked."; + } + return `Blocked ships (${ships.length}):\n${ships.map((s) => `• ${s}`).join("\n")}`; +} + +/** + * Format the list of pending approvals for display to owner. + */ +export function formatPendingList(approvals: PendingApproval[]): string { + if (approvals.length === 0) { + return "No pending approval requests."; + } + return `Pending approvals (${approvals.length}):\n${approvals + .map((a) => `• ${a.id}: ${a.type} from ${a.requestingShip}`) + .join("\n")}`; +} diff --git a/extensions/tlon/src/monitor/discovery.ts b/extensions/tlon/src/monitor/discovery.ts index cc7f5d6b213..cce767ea4db 100644 --- a/extensions/tlon/src/monitor/discovery.ts +++ b/extensions/tlon/src/monitor/discovery.ts @@ -1,4 +1,5 @@ import type { RuntimeEnv } from "openclaw/plugin-sdk"; +import type { Foreigns } from "../urbit/foreigns.js"; import { formatChangesDate } from "./utils.js"; export async function fetchGroupChanges( @@ -15,34 +16,33 @@ export async function fetchGroupChanges( return changes; } return null; - } catch (error) { + } catch (error: any) { runtime.log?.( - `[tlon] Failed to fetch changes (falling back to full init): ${(error as { message?: string })?.message ?? String(error)}`, + `[tlon] Failed to fetch changes (falling back to full init): ${error?.message ?? String(error)}`, ); return null; } } -export async function fetchAllChannels( +export interface InitData { + channels: string[]; + foreigns: Foreigns | null; +} + +/** + * Fetch groups-ui init data, returning channels and foreigns. + * This is a single scry that provides both channel discovery and pending invites. + */ +export async function fetchInitData( api: { scry: (path: string) => Promise }, runtime: RuntimeEnv, -): Promise { +): Promise { try { - runtime.log?.("[tlon] Attempting auto-discovery of group channels..."); - const changes = await fetchGroupChanges(api, runtime, 5); - - // oxlint-disable-next-line typescript/no-explicit-any - let initData: any; - if (changes) { - runtime.log?.("[tlon] Changes data received, using full init for channel extraction"); - initData = await api.scry("/groups-ui/v6/init.json"); - } else { - initData = await api.scry("/groups-ui/v6/init.json"); - } + runtime.log?.("[tlon] Fetching groups-ui init data..."); + const initData = (await api.scry("/groups-ui/v6/init.json")) as any; const channels: string[] = []; - if (initData && initData.groups) { - // oxlint-disable-next-line typescript/no-explicit-any + if (initData?.groups) { for (const groupData of Object.values(initData.groups as Record)) { if (groupData && typeof groupData === "object" && groupData.channels) { for (const channelNest of Object.keys(groupData.channels)) { @@ -56,23 +56,31 @@ export async function fetchAllChannels( if (channels.length > 0) { runtime.log?.(`[tlon] Auto-discovered ${channels.length} chat channel(s)`); - runtime.log?.( - `[tlon] Channels: ${channels.slice(0, 5).join(", ")}${channels.length > 5 ? "..." : ""}`, - ); } else { runtime.log?.("[tlon] No chat channels found via auto-discovery"); - runtime.log?.("[tlon] Add channels manually to config: channels.tlon.groupChannels"); } - return channels; - } catch (error) { - runtime.log?.( - `[tlon] Auto-discovery failed: ${(error as { message?: string })?.message ?? String(error)}`, - ); - runtime.log?.( - "[tlon] To monitor group channels, add them to config: channels.tlon.groupChannels", - ); - runtime.log?.('[tlon] Example: ["chat/~host-ship/channel-name"]'); - return []; + const foreigns = (initData?.foreigns as Foreigns) || null; + if (foreigns) { + const pendingCount = Object.values(foreigns).filter((f) => + f.invites?.some((i) => i.valid), + ).length; + if (pendingCount > 0) { + runtime.log?.(`[tlon] Found ${pendingCount} pending group invite(s)`); + } + } + + return { channels, foreigns }; + } catch (error: any) { + runtime.log?.(`[tlon] Init data fetch failed: ${error?.message ?? String(error)}`); + return { channels: [], foreigns: null }; } } + +export async function fetchAllChannels( + api: { scry: (path: string) => Promise }, + runtime: RuntimeEnv, +): Promise { + const { channels } = await fetchInitData(api, runtime); + return channels; +} diff --git a/extensions/tlon/src/monitor/history.ts b/extensions/tlon/src/monitor/history.ts index 03360a12a6d..3674b175b3c 100644 --- a/extensions/tlon/src/monitor/history.ts +++ b/extensions/tlon/src/monitor/history.ts @@ -1,6 +1,25 @@ import type { RuntimeEnv } from "openclaw/plugin-sdk"; import { extractMessageText } from "./utils.js"; +/** + * Format a number as @ud (with dots every 3 digits from the right) + * e.g., 170141184507799509469114119040828178432 -> 170.141.184.507.799.509.469.114.119.040.828.178.432 + */ +function formatUd(id: string | number): string { + const str = String(id).replace(/\./g, ""); // Remove any existing dots + const reversed = str.split("").toReversed(); + const chunks: string[] = []; + for (let i = 0; i < reversed.length; i += 3) { + chunks.push( + reversed + .slice(i, i + 3) + .toReversed() + .join(""), + ); + } + return chunks.toReversed().join("."); +} + export type TlonHistoryEntry = { author: string; content: string; @@ -35,13 +54,11 @@ export async function fetchChannelHistory( const scryPath = `/channels/v4/${channelNest}/posts/newest/${count}/outline.json`; runtime?.log?.(`[tlon] Fetching history: ${scryPath}`); - // oxlint-disable-next-line typescript/no-explicit-any const data: any = await api.scry(scryPath); if (!data) { return []; } - // oxlint-disable-next-line typescript/no-explicit-any let posts: any[] = []; if (Array.isArray(data)) { posts = data; @@ -67,10 +84,8 @@ export async function fetchChannelHistory( runtime?.log?.(`[tlon] Extracted ${messages.length} messages from history`); return messages; - } catch (error) { - runtime?.log?.( - `[tlon] Error fetching channel history: ${(error as { message?: string })?.message ?? String(error)}`, - ); + } catch (error: any) { + runtime?.log?.(`[tlon] Error fetching channel history: ${error?.message ?? String(error)}`); return []; } } @@ -90,3 +105,87 @@ export async function getChannelHistory( runtime?.log?.(`[tlon] Cache has ${cache.length} messages, need ${count}, fetching from scry...`); return await fetchChannelHistory(api, channelNest, count, runtime); } + +/** + * Fetch thread/reply history for a specific parent post. + * Used to get context when entering a thread conversation. + */ +export async function fetchThreadHistory( + api: { scry: (path: string) => Promise }, + channelNest: string, + parentId: string, + count = 50, + runtime?: RuntimeEnv, +): Promise { + try { + // Tlon API: fetch replies to a specific post + // Format: /channels/v4/{nest}/posts/post/{parentId}/replies/newest/{count}.json + // parentId needs @ud formatting (dots every 3 digits) + const formattedParentId = formatUd(parentId); + runtime?.log?.( + `[tlon] Thread history - parentId: ${parentId} -> formatted: ${formattedParentId}`, + ); + + const scryPath = `/channels/v4/${channelNest}/posts/post/id/${formattedParentId}/replies/newest/${count}.json`; + runtime?.log?.(`[tlon] Fetching thread history: ${scryPath}`); + + const data: any = await api.scry(scryPath); + if (!data) { + runtime?.log?.(`[tlon] No thread history data returned`); + return []; + } + + let replies: any[] = []; + if (Array.isArray(data)) { + replies = data; + } else if (data.replies && Array.isArray(data.replies)) { + replies = data.replies; + } else if (typeof data === "object") { + replies = Object.values(data); + } + + const messages = replies + .map((item) => { + // Thread replies use 'memo' structure + const memo = item.memo || item["r-reply"]?.set?.memo || item; + const seal = item.seal || item["r-reply"]?.set?.seal; + + return { + author: memo?.author || "unknown", + content: extractMessageText(memo?.content || []), + timestamp: memo?.sent || Date.now(), + id: seal?.id || item.id, + } as TlonHistoryEntry; + }) + .filter((msg) => msg.content); + + runtime?.log?.(`[tlon] Extracted ${messages.length} thread replies from history`); + return messages; + } catch (error: any) { + runtime?.log?.(`[tlon] Error fetching thread history: ${error?.message ?? String(error)}`); + // Fall back to trying alternate path structure + try { + const altPath = `/channels/v4/${channelNest}/posts/post/id/${formatUd(parentId)}.json`; + runtime?.log?.(`[tlon] Trying alternate path: ${altPath}`); + const data: any = await api.scry(altPath); + + if (data?.seal?.meta?.replyCount > 0 && data?.replies) { + const replies = Array.isArray(data.replies) ? data.replies : Object.values(data.replies); + const messages = replies + .map((reply: any) => ({ + author: reply.memo?.author || "unknown", + content: extractMessageText(reply.memo?.content || []), + timestamp: reply.memo?.sent || Date.now(), + id: reply.seal?.id, + })) + .filter((msg: TlonHistoryEntry) => msg.content); + + runtime?.log?.(`[tlon] Extracted ${messages.length} replies from post data`); + return messages; + } + } catch (altError: any) { + runtime?.log?.(`[tlon] Alternate path also failed: ${altError?.message ?? String(altError)}`); + } + return []; + } +} diff --git a/extensions/tlon/src/monitor/index.ts b/extensions/tlon/src/monitor/index.ts index 7d2e8dbd31f..cc438bb57ec 100644 --- a/extensions/tlon/src/monitor/index.ts +++ b/extensions/tlon/src/monitor/index.ts @@ -1,28 +1,42 @@ import type { RuntimeEnv, ReplyPayload, OpenClawConfig } from "openclaw/plugin-sdk"; import { createLoggerBackedRuntime, createReplyPrefixOptions } from "openclaw/plugin-sdk"; import { getTlonRuntime } from "../runtime.js"; +import { createSettingsManager, type TlonSettingsStore } from "../settings.js"; import { normalizeShip, parseChannelNest } from "../targets.js"; import { resolveTlonAccount } from "../types.js"; import { authenticate } from "../urbit/auth.js"; -import { ssrfPolicyFromAllowPrivateNetwork } from "../urbit/context.js"; +import type { Foreigns, DmInvite } from "../urbit/foreigns.js"; import { sendDm, sendGroupMessage } from "../urbit/send.js"; import { UrbitSSEClient } from "../urbit/sse-client.js"; -import { fetchAllChannels } from "./discovery.js"; -import { cacheMessage, getChannelHistory } from "./history.js"; +import { + type PendingApproval, + type AdminCommand, + createPendingApproval, + formatApprovalRequest, + formatApprovalConfirmation, + parseApprovalResponse, + isApprovalResponse, + findPendingApproval, + removePendingApproval, + parseAdminCommand, + isAdminCommand, + formatBlockedList, + formatPendingList, +} from "./approval.js"; +import { fetchAllChannels, fetchInitData } from "./discovery.js"; +import { cacheMessage, getChannelHistory, fetchThreadHistory } from "./history.js"; +import { downloadMessageImages } from "./media.js"; import { createProcessedMessageTracker } from "./processed-messages.js"; import { extractMessageText, + extractCites, formatModelName, isBotMentioned, isDmAllowed, isSummarizationRequest, + type ParsedCite, } from "./utils.js"; -function formatError(err: unknown): string { - if (err instanceof Error) return err.message; - return String(err); -} - export type MonitorTlonOpts = { runtime?: RuntimeEnv; abortSignal?: AbortSignal; @@ -34,37 +48,14 @@ type ChannelAuthorization = { allowedShips?: string[]; }; -type UrbitMemo = { - author?: string; - content?: unknown; - sent?: number; -}; - -type UrbitSeal = { - "parent-id"?: string; - parent?: string; -}; - -type UrbitUpdate = { - id?: string | number; - response?: { - add?: { memo?: UrbitMemo }; - post?: { - id?: string | number; - "r-post"?: { - set?: { essay?: UrbitMemo; seal?: UrbitSeal }; - reply?: { - id?: string | number; - "r-reply"?: { set?: { memo?: UrbitMemo; seal?: UrbitSeal } }; - }; - }; - }; - }; -}; - +/** + * Resolve channel authorization by merging file config with settings store. + * Settings store takes precedence for fields it defines. + */ function resolveChannelAuthorization( cfg: OpenClawConfig, channelNest: string, + settings?: TlonSettingsStore, ): { mode: "restricted" | "open"; allowedShips: string[] } { const tlonConfig = cfg.channels?.tlon as | { @@ -72,16 +63,23 @@ function resolveChannelAuthorization( defaultAuthorizedShips?: string[]; } | undefined; - const rules = tlonConfig?.authorization?.channelRules ?? {}; - const rule = rules[channelNest]; - const allowedShips = rule?.allowedShips ?? tlonConfig?.defaultAuthorizedShips ?? []; + + // Merge channel rules: settings override file config + const fileRules = tlonConfig?.authorization?.channelRules ?? {}; + const settingsRules = settings?.channelRules ?? {}; + const rule = settingsRules[channelNest] ?? fileRules[channelNest]; + + // Merge default authorized ships: settings override file config + const defaultShips = settings?.defaultAuthorizedShips ?? tlonConfig?.defaultAuthorizedShips ?? []; + + const allowedShips = rule?.allowedShips ?? defaultShips; const mode = rule?.mode ?? "restricted"; return { mode, allowedShips }; } export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise { const core = getTlonRuntime(); - const cfg = core.config.loadConfig(); + const cfg = core.config.loadConfig() as OpenClawConfig; if (cfg.channels?.tlon?.enabled === false) { return; } @@ -106,39 +104,232 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise runtime.log?.(message), error: (message) => runtime.error?.(message), }, }); - } catch (error) { - runtime.error?.(`[tlon] Failed to authenticate: ${formatError(error)}`); + } catch (error: any) { + runtime.error?.(`[tlon] Failed to authenticate: ${error?.message ?? String(error)}`); throw error; } const processedTracker = createProcessedMessageTracker(2000); let groupChannels: string[] = []; + let botNickname: string | null = null; - if (account.autoDiscoverChannels !== false) { - try { - const discoveredChannels = await fetchAllChannels(api, runtime); - if (discoveredChannels.length > 0) { - groupChannels = discoveredChannels; + // Settings store manager for hot-reloading config + const settingsManager = createSettingsManager(api, { + log: (msg) => runtime.log?.(msg), + error: (msg) => runtime.error?.(msg), + }); + + // Reactive state that can be updated via settings store + let effectiveDmAllowlist: string[] = account.dmAllowlist; + let effectiveShowModelSig: boolean = account.showModelSignature ?? false; + let effectiveAutoAcceptDmInvites: boolean = account.autoAcceptDmInvites ?? false; + let effectiveAutoAcceptGroupInvites: boolean = account.autoAcceptGroupInvites ?? false; + let effectiveGroupInviteAllowlist: string[] = account.groupInviteAllowlist; + let effectiveAutoDiscoverChannels: boolean = account.autoDiscoverChannels ?? false; + let effectiveOwnerShip: string | null = account.ownerShip + ? normalizeShip(account.ownerShip) + : null; + let pendingApprovals: PendingApproval[] = []; + let currentSettings: TlonSettingsStore = {}; + + // Track threads we've participated in (by parentId) - respond without mention requirement + const participatedThreads = new Set(); + + // Track DM senders per session to detect shared sessions (security warning) + const dmSendersBySession = new Map>(); + let sharedSessionWarningSent = false; + + // Fetch bot's nickname from contacts + try { + const selfProfile = await api.scry("/contacts/v1/self.json"); + if (selfProfile && typeof selfProfile === "object") { + const profile = selfProfile as { nickname?: { value?: string } }; + botNickname = profile.nickname?.value || null; + if (botNickname) { + runtime.log?.(`[tlon] Bot nickname: ${botNickname}`); + } + } + } catch (error: any) { + runtime.log?.(`[tlon] Could not fetch nickname: ${error?.message ?? String(error)}`); + } + + // Store init foreigns for processing after settings are loaded + let initForeigns: Foreigns | null = null; + + // Migrate file config to settings store (seed on first run) + async function migrateConfigToSettings() { + const migrations: Array<{ key: string; fileValue: unknown; settingsValue: unknown }> = [ + { + key: "dmAllowlist", + fileValue: account.dmAllowlist, + settingsValue: currentSettings.dmAllowlist, + }, + { + key: "groupInviteAllowlist", + fileValue: account.groupInviteAllowlist, + settingsValue: currentSettings.groupInviteAllowlist, + }, + { + key: "groupChannels", + fileValue: account.groupChannels, + settingsValue: currentSettings.groupChannels, + }, + { + key: "defaultAuthorizedShips", + fileValue: account.defaultAuthorizedShips, + settingsValue: currentSettings.defaultAuthorizedShips, + }, + { + key: "autoDiscoverChannels", + fileValue: account.autoDiscoverChannels, + settingsValue: currentSettings.autoDiscoverChannels, + }, + { + key: "autoAcceptDmInvites", + fileValue: account.autoAcceptDmInvites, + settingsValue: currentSettings.autoAcceptDmInvites, + }, + { + key: "autoAcceptGroupInvites", + fileValue: account.autoAcceptGroupInvites, + settingsValue: currentSettings.autoAcceptGroupInvites, + }, + { + key: "showModelSig", + fileValue: account.showModelSignature, + settingsValue: currentSettings.showModelSig, + }, + ]; + + for (const { key, fileValue, settingsValue } of migrations) { + // Only migrate if file has a value and settings store doesn't + const hasFileValue = Array.isArray(fileValue) ? fileValue.length > 0 : fileValue != null; + const hasSettingsValue = Array.isArray(settingsValue) + ? settingsValue.length > 0 + : settingsValue != null; + + if (hasFileValue && !hasSettingsValue) { + try { + await api!.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + "bucket-key": "tlon", + "entry-key": key, + value: fileValue, + desk: "moltbot", + }, + }, + }); + runtime.log?.(`[tlon] Migrated ${key} from config to settings store`); + } catch (err) { + runtime.log?.(`[tlon] Failed to migrate ${key}: ${String(err)}`); + } } - } catch (error) { - runtime.error?.(`[tlon] Auto-discovery failed: ${formatError(error)}`); } } - if (groupChannels.length === 0 && account.groupChannels.length > 0) { - groupChannels = account.groupChannels; - runtime.log?.(`[tlon] Using manual groupChannels config: ${groupChannels.join(", ")}`); + // Load settings from settings store (hot-reloadable config) + try { + currentSettings = await settingsManager.load(); + + // Migrate file config to settings store if not already present + await migrateConfigToSettings(); + + // Apply settings overrides + // Note: groupChannels from settings store are merged AFTER discovery runs (below) + if (currentSettings.defaultAuthorizedShips?.length) { + runtime.log?.( + `[tlon] Using defaultAuthorizedShips from settings store: ${currentSettings.defaultAuthorizedShips.join(", ")}`, + ); + } + if (currentSettings.autoDiscoverChannels !== undefined) { + effectiveAutoDiscoverChannels = currentSettings.autoDiscoverChannels; + runtime.log?.( + `[tlon] Using autoDiscoverChannels from settings store: ${effectiveAutoDiscoverChannels}`, + ); + } + if (currentSettings.dmAllowlist?.length) { + effectiveDmAllowlist = currentSettings.dmAllowlist; + runtime.log?.( + `[tlon] Using dmAllowlist from settings store: ${effectiveDmAllowlist.join(", ")}`, + ); + } + if (currentSettings.showModelSig !== undefined) { + effectiveShowModelSig = currentSettings.showModelSig; + } + if (currentSettings.autoAcceptDmInvites !== undefined) { + effectiveAutoAcceptDmInvites = currentSettings.autoAcceptDmInvites; + runtime.log?.( + `[tlon] Using autoAcceptDmInvites from settings store: ${effectiveAutoAcceptDmInvites}`, + ); + } + if (currentSettings.autoAcceptGroupInvites !== undefined) { + effectiveAutoAcceptGroupInvites = currentSettings.autoAcceptGroupInvites; + runtime.log?.( + `[tlon] Using autoAcceptGroupInvites from settings store: ${effectiveAutoAcceptGroupInvites}`, + ); + } + if (currentSettings.groupInviteAllowlist?.length) { + effectiveGroupInviteAllowlist = currentSettings.groupInviteAllowlist; + runtime.log?.( + `[tlon] Using groupInviteAllowlist from settings store: ${effectiveGroupInviteAllowlist.join(", ")}`, + ); + } + if (currentSettings.ownerShip) { + effectiveOwnerShip = normalizeShip(currentSettings.ownerShip); + runtime.log?.(`[tlon] Using ownerShip from settings store: ${effectiveOwnerShip}`); + } + if (currentSettings.pendingApprovals?.length) { + pendingApprovals = currentSettings.pendingApprovals; + runtime.log?.(`[tlon] Loaded ${pendingApprovals.length} pending approval(s) from settings`); + } + } catch (err) { + runtime.log?.(`[tlon] Settings store not available, using file config: ${String(err)}`); + } + + // Run channel discovery AFTER settings are loaded (so settings store value is used) + if (effectiveAutoDiscoverChannels) { + try { + const initData = await fetchInitData(api, runtime); + if (initData.channels.length > 0) { + groupChannels = initData.channels; + } + initForeigns = initData.foreigns; + } catch (error: any) { + runtime.error?.(`[tlon] Auto-discovery failed: ${error?.message ?? String(error)}`); + } + } + + // Merge manual config with auto-discovered channels + if (account.groupChannels.length > 0) { + for (const ch of account.groupChannels) { + if (!groupChannels.includes(ch)) { + groupChannels.push(ch); + } + } + runtime.log?.( + `[tlon] Added ${account.groupChannels.length} manual groupChannels to monitoring`, + ); + } + + // Also merge settings store groupChannels (may have been set via tlon settings command) + if (currentSettings.groupChannels?.length) { + for (const ch of currentSettings.groupChannels) { + if (!groupChannels.includes(ch)) { + groupChannels.push(ch); + } + } } if (groupChannels.length > 0) { @@ -149,142 +340,485 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise { - try { - const memo = update?.response?.add?.memo; - if (!memo) { - return; - } - - const messageId = update.id != null ? String(update.id) : undefined; - if (!processedTracker.mark(messageId)) { - return; - } - - const senderShip = normalizeShip(memo.author ?? ""); - if (!senderShip || senderShip === botShipName) { - return; - } - - const messageText = extractMessageText(memo.content); - if (!messageText) { - return; - } - - if (!isDmAllowed(senderShip, account.dmAllowlist)) { - runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`); - return; - } - - await processMessage({ - messageId: messageId ?? "", - senderShip, - messageText, - isGroup: false, - timestamp: memo.sent || Date.now(), - }); - } catch (error) { - runtime.error?.(`[tlon] Error handling DM: ${formatError(error)}`); + // Helper to resolve cited message content + async function resolveCiteContent(cite: ParsedCite): Promise { + if (cite.type !== "chan" || !cite.nest || !cite.postId) { + return null; } - }; - const handleIncomingGroupMessage = (channelNest: string) => async (update: UrbitUpdate) => { try { - const parsed = parseChannelNest(channelNest); - if (!parsed) { - return; + // Scry for the specific post: /v4/{nest}/posts/post/{postId} + const scryPath = `/channels/v4/${cite.nest}/posts/post/${cite.postId}.json`; + runtime.log?.(`[tlon] Fetching cited post: ${scryPath}`); + + const data: any = await api!.scry(scryPath); + + // Extract text from the post's essay content + if (data?.essay?.content) { + const text = extractMessageText(data.essay.content); + return text || null; } - const post = update?.response?.post?.["r-post"]; - const essay = post?.set?.essay; - const memo = post?.reply?.["r-reply"]?.set?.memo; - if (!essay && !memo) { - return; - } - - const content = memo || essay; - if (!content) { - return; - } - const isThreadReply = Boolean(memo); - const rawMessageId = isThreadReply ? post?.reply?.id : update?.response?.post?.id; - const messageId = rawMessageId != null ? String(rawMessageId) : undefined; - - if (!processedTracker.mark(messageId)) { - return; - } - - const senderShip = normalizeShip(content.author ?? ""); - if (!senderShip || senderShip === botShipName) { - return; - } - - const messageText = extractMessageText(content.content); - if (!messageText) { - return; - } - - cacheMessage(channelNest, { - author: senderShip, - content: messageText, - timestamp: content.sent || Date.now(), - id: messageId, - }); - - const mentioned = isBotMentioned(messageText, botShipName); - if (!mentioned) { - return; - } - - const { mode, allowedShips } = resolveChannelAuthorization(cfg, channelNest); - if (mode === "restricted") { - if (allowedShips.length === 0) { - runtime.log?.(`[tlon] Access denied: ${senderShip} in ${channelNest} (no allowlist)`); - return; - } - const normalizedAllowed = allowedShips.map(normalizeShip); - if (!normalizedAllowed.includes(senderShip)) { - runtime.log?.( - `[tlon] Access denied: ${senderShip} in ${channelNest} (allowed: ${allowedShips.join(", ")})`, - ); - return; - } - } - - const seal = isThreadReply - ? update?.response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.seal - : update?.response?.post?.["r-post"]?.set?.seal; - - const parentId = seal?.["parent-id"] || seal?.parent || null; - - await processMessage({ - messageId: messageId ?? "", - senderShip, - messageText, - isGroup: true, - groupChannel: channelNest, - groupName: `${parsed.hostShip}/${parsed.channelName}`, - timestamp: content.sent || Date.now(), - parentId, - }); - } catch (error) { - runtime.error?.(`[tlon] Error handling group message: ${formatError(error)}`); + return null; + } catch (err) { + runtime.log?.(`[tlon] Failed to fetch cited post: ${String(err)}`); + return null; } - }; + } + + // Resolve all cites in message content and return quoted text + async function resolveAllCites(content: unknown): Promise { + const cites = extractCites(content); + if (cites.length === 0) { + return ""; + } + + const resolved: string[] = []; + for (const cite of cites) { + const text = await resolveCiteContent(cite); + if (text) { + const author = cite.author || "unknown"; + resolved.push(`> ${author} wrote: ${text}`); + } + } + + return resolved.length > 0 ? resolved.join("\n") + "\n\n" : ""; + } + + // Helper to save pending approvals to settings store + async function savePendingApprovals(): Promise { + try { + await api!.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + desk: "moltbot", + "bucket-key": "tlon", + "entry-key": "pendingApprovals", + value: JSON.stringify(pendingApprovals), + }, + }, + }); + } catch (err) { + runtime.error?.(`[tlon] Failed to save pending approvals: ${String(err)}`); + } + } + + // Helper to update dmAllowlist in settings store + async function addToDmAllowlist(ship: string): Promise { + const normalizedShip = normalizeShip(ship); + if (!effectiveDmAllowlist.includes(normalizedShip)) { + effectiveDmAllowlist = [...effectiveDmAllowlist, normalizedShip]; + } + try { + await api!.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + desk: "moltbot", + "bucket-key": "tlon", + "entry-key": "dmAllowlist", + value: effectiveDmAllowlist, + }, + }, + }); + runtime.log?.(`[tlon] Added ${normalizedShip} to dmAllowlist`); + } catch (err) { + runtime.error?.(`[tlon] Failed to update dmAllowlist: ${String(err)}`); + } + } + + // Helper to update channelRules in settings store + async function addToChannelAllowlist(ship: string, channelNest: string): Promise { + const normalizedShip = normalizeShip(ship); + const channelRules = currentSettings.channelRules ?? {}; + const rule = channelRules[channelNest] ?? { mode: "restricted", allowedShips: [] }; + const allowedShips = [...(rule.allowedShips ?? [])]; // Clone to avoid mutation + + if (!allowedShips.includes(normalizedShip)) { + allowedShips.push(normalizedShip); + } + + const updatedRules = { + ...channelRules, + [channelNest]: { ...rule, allowedShips }, + }; + + // Update local state immediately (don't wait for settings subscription) + currentSettings = { ...currentSettings, channelRules: updatedRules }; + + try { + await api!.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + desk: "moltbot", + "bucket-key": "tlon", + "entry-key": "channelRules", + value: JSON.stringify(updatedRules), + }, + }, + }); + runtime.log?.(`[tlon] Added ${normalizedShip} to ${channelNest} allowlist`); + } catch (err) { + runtime.error?.(`[tlon] Failed to update channelRules: ${String(err)}`); + } + } + + // Helper to block a ship using Tlon's native blocking + async function blockShip(ship: string): Promise { + const normalizedShip = normalizeShip(ship); + try { + await api!.poke({ + app: "chat", + mark: "chat-block-ship", + json: { ship: normalizedShip }, + }); + runtime.log?.(`[tlon] Blocked ship ${normalizedShip}`); + } catch (err) { + runtime.error?.(`[tlon] Failed to block ship ${normalizedShip}: ${String(err)}`); + } + } + + // Check if a ship is blocked using Tlon's native block list + async function isShipBlocked(ship: string): Promise { + const normalizedShip = normalizeShip(ship); + try { + const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined; + return Array.isArray(blocked) && blocked.some((s) => normalizeShip(s) === normalizedShip); + } catch (err) { + runtime.log?.(`[tlon] Failed to check blocked list: ${String(err)}`); + return false; + } + } + + // Get all blocked ships + async function getBlockedShips(): Promise { + try { + const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined; + return Array.isArray(blocked) ? blocked : []; + } catch (err) { + runtime.log?.(`[tlon] Failed to get blocked list: ${String(err)}`); + return []; + } + } + + // Helper to unblock a ship using Tlon's native blocking + async function unblockShip(ship: string): Promise { + const normalizedShip = normalizeShip(ship); + try { + await api!.poke({ + app: "chat", + mark: "chat-unblock-ship", + json: { ship: normalizedShip }, + }); + runtime.log?.(`[tlon] Unblocked ship ${normalizedShip}`); + return true; + } catch (err) { + runtime.error?.(`[tlon] Failed to unblock ship ${normalizedShip}: ${String(err)}`); + return false; + } + } + + // Helper to send DM notification to owner + async function sendOwnerNotification(message: string): Promise { + if (!effectiveOwnerShip) { + runtime.log?.("[tlon] No ownerShip configured, cannot send notification"); + return; + } + try { + await sendDm({ + api: api!, + fromShip: botShipName, + toShip: effectiveOwnerShip, + text: message, + }); + runtime.log?.(`[tlon] Sent notification to owner ${effectiveOwnerShip}`); + } catch (err) { + runtime.error?.(`[tlon] Failed to send notification to owner: ${String(err)}`); + } + } + + // Queue a new approval request and notify the owner + async function queueApprovalRequest(approval: PendingApproval): Promise { + // Check if ship is blocked - silently ignore + if (await isShipBlocked(approval.requestingShip)) { + runtime.log?.(`[tlon] Ignoring request from blocked ship ${approval.requestingShip}`); + return; + } + + // Check for duplicate - if found, update it with new content and re-notify + const existingIndex = pendingApprovals.findIndex( + (a) => + a.type === approval.type && + a.requestingShip === approval.requestingShip && + (approval.type !== "channel" || a.channelNest === approval.channelNest) && + (approval.type !== "group" || a.groupFlag === approval.groupFlag), + ); + + if (existingIndex !== -1) { + // Update existing approval with new content (preserves the original ID) + const existing = pendingApprovals[existingIndex]; + if (approval.originalMessage) { + existing.originalMessage = approval.originalMessage; + existing.messagePreview = approval.messagePreview; + } + runtime.log?.( + `[tlon] Updated existing approval for ${approval.requestingShip} (${approval.type}) - re-sending notification`, + ); + await savePendingApprovals(); + const message = formatApprovalRequest(existing); + await sendOwnerNotification(message); + return; + } + + pendingApprovals.push(approval); + await savePendingApprovals(); + + const message = formatApprovalRequest(approval); + await sendOwnerNotification(message); + runtime.log?.( + `[tlon] Queued approval request: ${approval.id} (${approval.type} from ${approval.requestingShip})`, + ); + } + + // Process the owner's approval response + async function handleApprovalResponse(text: string): Promise { + const parsed = parseApprovalResponse(text); + if (!parsed) { + return false; + } + + const approval = findPendingApproval(pendingApprovals, parsed.id); + if (!approval) { + await sendOwnerNotification( + "No pending approval found" + (parsed.id ? ` for ID: ${parsed.id}` : ""), + ); + return true; // Still consumed the message + } + + if (parsed.action === "approve") { + switch (approval.type) { + case "dm": + await addToDmAllowlist(approval.requestingShip); + // Process the original message if available + if (approval.originalMessage) { + runtime.log?.( + `[tlon] Processing original message from ${approval.requestingShip} after approval`, + ); + await processMessage({ + messageId: approval.originalMessage.messageId, + senderShip: approval.requestingShip, + messageText: approval.originalMessage.messageText, + messageContent: approval.originalMessage.messageContent, + isGroup: false, + timestamp: approval.originalMessage.timestamp, + }); + } + break; + + case "channel": + if (approval.channelNest) { + await addToChannelAllowlist(approval.requestingShip, approval.channelNest); + // Process the original message if available + if (approval.originalMessage) { + const parsed = parseChannelNest(approval.channelNest); + runtime.log?.( + `[tlon] Processing original message from ${approval.requestingShip} in ${approval.channelNest} after approval`, + ); + await processMessage({ + messageId: approval.originalMessage.messageId, + senderShip: approval.requestingShip, + messageText: approval.originalMessage.messageText, + messageContent: approval.originalMessage.messageContent, + isGroup: true, + channelNest: approval.channelNest, + hostShip: parsed?.hostShip, + channelName: parsed?.channelName, + timestamp: approval.originalMessage.timestamp, + parentId: approval.originalMessage.parentId, + isThreadReply: approval.originalMessage.isThreadReply, + }); + } + } + break; + + case "group": + // Accept the group invite (don't add to allowlist - each invite requires approval) + if (approval.groupFlag) { + try { + await api!.poke({ + app: "groups", + mark: "group-join", + json: { + flag: approval.groupFlag, + "join-all": true, + }, + }); + runtime.log?.(`[tlon] Joined group ${approval.groupFlag} after approval`); + + // Immediately discover channels from the newly joined group + // Small delay to allow the join to propagate + setTimeout(async () => { + try { + const discoveredChannels = await fetchAllChannels(api!, runtime); + let newCount = 0; + for (const channelNest of discoveredChannels) { + if (!watchedChannels.has(channelNest)) { + watchedChannels.add(channelNest); + newCount++; + } + } + if (newCount > 0) { + runtime.log?.( + `[tlon] Discovered ${newCount} new channel(s) after joining group`, + ); + } + } catch (err) { + runtime.log?.(`[tlon] Channel discovery after group join failed: ${String(err)}`); + } + }, 2000); + } catch (err) { + runtime.error?.(`[tlon] Failed to join group ${approval.groupFlag}: ${String(err)}`); + } + } + break; + } + + await sendOwnerNotification(formatApprovalConfirmation(approval, "approve")); + } else if (parsed.action === "block") { + // Block the ship using Tlon's native blocking + await blockShip(approval.requestingShip); + await sendOwnerNotification(formatApprovalConfirmation(approval, "block")); + } else { + // Denied - just remove from pending, no notification to requester + await sendOwnerNotification(formatApprovalConfirmation(approval, "deny")); + } + + // Remove from pending + pendingApprovals = removePendingApproval(pendingApprovals, approval.id); + await savePendingApprovals(); + + return true; + } + + // Handle admin commands from owner (unblock, blocked, pending) + async function handleAdminCommand(text: string): Promise { + const command = parseAdminCommand(text); + if (!command) { + return false; + } + + switch (command.type) { + case "blocked": { + const blockedShips = await getBlockedShips(); + await sendOwnerNotification(formatBlockedList(blockedShips)); + runtime.log?.(`[tlon] Owner requested blocked ships list (${blockedShips.length} ships)`); + return true; + } + + case "pending": { + await sendOwnerNotification(formatPendingList(pendingApprovals)); + runtime.log?.( + `[tlon] Owner requested pending approvals list (${pendingApprovals.length} pending)`, + ); + return true; + } + + case "unblock": { + const shipToUnblock = command.ship; + const isBlocked = await isShipBlocked(shipToUnblock); + if (!isBlocked) { + await sendOwnerNotification(`${shipToUnblock} is not blocked.`); + return true; + } + const success = await unblockShip(shipToUnblock); + if (success) { + await sendOwnerNotification(`Unblocked ${shipToUnblock}.`); + } else { + await sendOwnerNotification(`Failed to unblock ${shipToUnblock}.`); + } + return true; + } + } + } + + // Check if a ship is the owner (always allowed to DM) + function isOwner(ship: string): boolean { + if (!effectiveOwnerShip) { + return false; + } + return normalizeShip(ship) === effectiveOwnerShip; + } const processMessage = async (params: { messageId: string; senderShip: string; messageText: string; + messageContent?: unknown; // Raw Tlon content for media extraction isGroup: boolean; - groupChannel?: string; - groupName?: string; + channelNest?: string; + hostShip?: string; + channelName?: string; timestamp: number; parentId?: string | null; + isThreadReply?: boolean; }) => { - const { messageId, senderShip, isGroup, groupChannel, groupName, timestamp, parentId } = params; + const { + messageId, + senderShip, + isGroup, + channelNest, + hostShip, + channelName, + timestamp, + parentId, + isThreadReply, + messageContent, + } = params; + const groupChannel = channelNest; // For compatibility let messageText = params.messageText; + // Download any images from the message content + let attachments: Array<{ path: string; contentType: string }> = []; + if (messageContent) { + try { + attachments = await downloadMessageImages(messageContent); + if (attachments.length > 0) { + runtime.log?.(`[tlon] Downloaded ${attachments.length} image(s) from message`); + } + } catch (error: any) { + runtime.log?.(`[tlon] Failed to download images: ${error?.message ?? String(error)}`); + } + } + + // Fetch thread context when entering a thread for the first time + if (isThreadReply && parentId && groupChannel) { + try { + const threadHistory = await fetchThreadHistory(api, groupChannel, parentId, 20, runtime); + if (threadHistory.length > 0) { + const threadContext = threadHistory + .slice(-10) // Last 10 messages for context + .map((msg) => `${msg.author}: ${msg.content}`) + .join("\n"); + + // Prepend thread context to the message + // Include note about ongoing conversation for agent judgment + const contextNote = `[Thread conversation - ${threadHistory.length} previous replies. You are participating in this thread. Only respond if relevant or helpful - you don't need to reply to every message.]`; + messageText = `${contextNote}\n\n[Previous messages]\n${threadContext}\n\n[Current message]\n${messageText}`; + runtime?.log?.( + `[tlon] Added thread context (${threadHistory.length} replies) to message`, + ); + } + } catch (error: any) { + runtime?.log?.(`[tlon] Could not fetch thread context: ${error?.message ?? String(error)}`); + // Continue without thread context - not critical + } + } + if (isGroup && groupChannel && isSummarizationRequest(messageText)) { try { const history = await getChannelHistory(api, groupChannel, 50, runtime); @@ -326,8 +860,8 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise 0 && !senders.has(senderShip)) { + // Log warning + runtime.log?.( + `[tlon] ⚠️ SECURITY: Multiple users sharing DM session. ` + + `Configure "session.dmScope: per-channel-peer" in OpenClaw config.`, + ); + + // Notify owner via DM (once per monitor session) + if (!sharedSessionWarningSent && effectiveOwnerShip) { + sharedSessionWarningSent = true; + const warningMsg = + `⚠️ Security Warning: Multiple users are sharing a DM session with this bot. ` + + `This can leak conversation context between users.\n\n` + + `Fix: Add to your OpenClaw config:\n` + + `session:\n dmScope: "per-channel-peer"\n\n` + + `Docs: https://docs.openclaw.ai/concepts/session#secure-dm-mode`; + + // Send async, don't block message processing + sendDm({ + api, + fromShip: botShipName, + toShip: effectiveOwnerShip, + text: warningMsg, + }).catch((err) => + runtime.error?.(`[tlon] Failed to send security warning to owner: ${err}`), + ); + } + } + senders.add(senderShip); + } + + const senderRole = isOwner(senderShip) ? "owner" : "user"; + const fromLabel = isGroup + ? `${senderShip} [${senderRole}] in ${channelNest}` + : `${senderShip} [${senderRole}]`; + + // Prepend attachment annotations to message body (similar to Signal format) + let bodyWithAttachments = messageText; + if (attachments.length > 0) { + const mediaLines = attachments + .map((a) => `[media attached: ${a.path} (${a.contentType}) | ${a.path}]`) + .join("\n"); + bodyWithAttachments = mediaLines + "\n" + messageText; + } + const body = core.channel.reply.formatAgentEnvelope({ channel: "Tlon", from: fromLabel, timestamp, - body: messageText, + body: bodyWithAttachments, }); const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: body, - BodyForAgent: messageText, RawBody: messageText, CommandBody: messageText, From: isGroup ? `tlon:group:${groupChannel}` : `tlon:${senderShip}`, @@ -377,28 +961,31 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise 0 && { Attachments: attachments }), OriginatingChannel: "tlon", OriginatingTo: `tlon:${isGroup ? groupChannel : botShipName}`, + // Include thread context for automatic reply routing + ...(parentId && { ThreadId: String(parentId), ReplyToId: String(parentId) }), }); const dispatchStartTime = Date.now(); - const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ + const responsePrefix = core.channel.reply.resolveEffectiveMessagesConfig( cfg, - agentId: route.agentId, - channel: "tlon", - accountId: route.accountId, - }); + route.agentId, + ).responsePrefix; const humanDelay = core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId); await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg, dispatcherOptions: { - ...prefixOptions, + responsePrefix, humanDelay, deliver: async (payload: ReplyPayload) => { let replyText = payload.text; @@ -406,8 +993,8 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise(); - const subscribedDMs = new Set(); - - async function subscribeToChannel(channelNest: string) { - if (subscribedChannels.has(channelNest)) { - return; - } - const parsed = parseChannelNest(channelNest); - if (!parsed) { - runtime.error?.(`[tlon] Invalid channel format: ${channelNest}`); - return; - } + // Track which channels we're interested in for filtering firehose events + const watchedChannels = new Set(groupChannels); + const _watchedDMs = new Set(); + // Firehose handler for all channel messages (/v2) + const handleChannelsFirehose = async (event: any) => { try { - await api!.subscribe({ - app: "channels", - path: `/${channelNest}`, - event: (data: unknown) => { - handleIncomingGroupMessage(channelNest)(data as UrbitUpdate); - }, - err: (error) => { - runtime.error?.(`[tlon] Group subscription error for ${channelNest}: ${String(error)}`); - }, - quit: () => { - runtime.log?.(`[tlon] Group subscription ended for ${channelNest}`); - subscribedChannels.delete(channelNest); - }, + const nest = event?.nest; + if (!nest) { + return; + } + + // Only process channels we're watching + if (!watchedChannels.has(nest)) { + return; + } + + const response = event?.response; + if (!response) { + return; + } + + // Handle post responses (new posts and replies) + const essay = response?.post?.["r-post"]?.set?.essay; + const memo = response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.memo; + if (!essay && !memo) { + return; + } + + const content = memo || essay; + const isThreadReply = Boolean(memo); + const messageId = isThreadReply ? response?.post?.["r-post"]?.reply?.id : response?.post?.id; + + if (!processedTracker.mark(messageId)) { + return; + } + + const senderShip = normalizeShip(content.author ?? ""); + if (!senderShip || senderShip === botShipName) { + return; + } + + // Resolve any cited/quoted messages first + const citedContent = await resolveAllCites(content.content); + const rawText = extractMessageText(content.content); + const messageText = citedContent + rawText; + if (!messageText.trim()) { + return; + } + + cacheMessage(nest, { + author: senderShip, + content: messageText, + timestamp: content.sent || Date.now(), + id: messageId, }); - subscribedChannels.add(channelNest); - runtime.log?.(`[tlon] Subscribed to group channel: ${channelNest}`); - } catch (error) { - runtime.error?.(`[tlon] Failed to subscribe to ${channelNest}: ${formatError(error)}`); - } - } - async function subscribeToDM(dmShip: string) { - if (subscribedDMs.has(dmShip)) { - return; - } - try { - await api!.subscribe({ - app: "chat", - path: `/dm/${dmShip}`, - event: (data: unknown) => { - handleIncomingDM(data as UrbitUpdate); - }, - err: (error) => { - runtime.error?.(`[tlon] DM subscription error for ${dmShip}: ${String(error)}`); - }, - quit: () => { - runtime.log?.(`[tlon] DM subscription ended for ${dmShip}`); - subscribedDMs.delete(dmShip); - }, - }); - subscribedDMs.add(dmShip); - runtime.log?.(`[tlon] Subscribed to DM with ${dmShip}`); - } catch (error) { - runtime.error?.(`[tlon] Failed to subscribe to DM with ${dmShip}: ${formatError(error)}`); - } - } + // Get thread info early for participation check + const seal = isThreadReply + ? response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.seal + : response?.post?.["r-post"]?.set?.seal; + const parentId = seal?.["parent-id"] || seal?.parent || null; - async function refreshChannelSubscriptions() { - try { - const dmShips = await api!.scry("/chat/dm.json"); - if (Array.isArray(dmShips)) { - for (const dmShip of dmShips) { - await subscribeToDM(dmShip); + // Check if we should respond: + // 1. Direct mention always triggers response + // 2. Thread replies where we've participated - respond if relevant (let agent decide) + const mentioned = isBotMentioned(messageText, botShipName, botNickname ?? undefined); + const inParticipatedThread = + isThreadReply && parentId && participatedThreads.has(String(parentId)); + + if (!mentioned && !inParticipatedThread) { + return; + } + + // Log why we're responding + if (inParticipatedThread && !mentioned) { + runtime.log?.(`[tlon] Responding to thread we participated in (no mention): ${parentId}`); + } + + // Owner is always allowed + if (isOwner(senderShip)) { + runtime.log?.(`[tlon] Owner ${senderShip} is always allowed in channels`); + } else { + const { mode, allowedShips } = resolveChannelAuthorization(cfg, nest, currentSettings); + if (mode === "restricted") { + const normalizedAllowed = allowedShips.map(normalizeShip); + if (!normalizedAllowed.includes(senderShip)) { + // If owner is configured, queue approval request + if (effectiveOwnerShip) { + const approval = createPendingApproval({ + type: "channel", + requestingShip: senderShip, + channelNest: nest, + messagePreview: messageText.substring(0, 100), + originalMessage: { + messageId: messageId ?? "", + messageText, + messageContent: content.content, + timestamp: content.sent || Date.now(), + parentId: parentId ?? undefined, + isThreadReply, + }, + }); + await queueApprovalRequest(approval); + } else { + runtime.log?.( + `[tlon] Access denied: ${senderShip} in ${nest} (allowed: ${allowedShips.join(", ")})`, + ); + } + return; + } } } - if (account.autoDiscoverChannels !== false) { - const discoveredChannels = await fetchAllChannels(api!, runtime); - for (const channelNest of discoveredChannels) { - await subscribeToChannel(channelNest); + const parsed = parseChannelNest(nest); + await processMessage({ + messageId: messageId ?? "", + senderShip, + messageText, + messageContent: content.content, // Pass raw content for media extraction + isGroup: true, + channelNest: nest, + hostShip: parsed?.hostShip, + channelName: parsed?.channelName, + timestamp: content.sent || Date.now(), + parentId, + isThreadReply, + }); + } catch (error: any) { + runtime.error?.( + `[tlon] Error handling channel firehose event: ${error?.message ?? String(error)}`, + ); + } + }; + + // Firehose handler for all DM messages (/v3) + // Track which DM invites we've already processed to avoid duplicate accepts + const processedDmInvites = new Set(); + + const handleChatFirehose = async (event: any) => { + try { + // Handle DM invite lists (arrays) + if (Array.isArray(event)) { + for (const invite of event as DmInvite[]) { + const ship = normalizeShip(invite.ship || ""); + if (!ship || processedDmInvites.has(ship)) { + continue; + } + + // Owner is always allowed + if (isOwner(ship)) { + try { + await api.poke({ + app: "chat", + mark: "chat-dm-rsvp", + json: { ship, ok: true }, + }); + processedDmInvites.add(ship); + runtime.log?.(`[tlon] Auto-accepted DM invite from owner ${ship}`); + } catch (err) { + runtime.error?.(`[tlon] Failed to auto-accept DM from owner: ${String(err)}`); + } + continue; + } + + // Auto-accept if on allowlist and auto-accept is enabled + if (effectiveAutoAcceptDmInvites && isDmAllowed(ship, effectiveDmAllowlist)) { + try { + await api.poke({ + app: "chat", + mark: "chat-dm-rsvp", + json: { ship, ok: true }, + }); + processedDmInvites.add(ship); + runtime.log?.(`[tlon] Auto-accepted DM invite from ${ship}`); + } catch (err) { + runtime.error?.(`[tlon] Failed to auto-accept DM from ${ship}: ${String(err)}`); + } + continue; + } + + // If owner is configured and ship is not on allowlist, queue approval + if (effectiveOwnerShip && !isDmAllowed(ship, effectiveDmAllowlist)) { + const approval = createPendingApproval({ + type: "dm", + requestingShip: ship, + messagePreview: "(DM invite - no message yet)", + }); + await queueApprovalRequest(approval); + processedDmInvites.add(ship); // Mark as processed to avoid duplicate notifications + } + } + return; + } + if (!("whom" in event) || !("response" in event)) { + return; + } + + const _whom = event.whom; // DM partner ship or club ID + const messageId = event.id; + const response = event.response; + + // Handle add events (new messages) + const essay = response?.add?.essay; + if (!essay) { + return; + } + + if (!processedTracker.mark(messageId)) { + return; + } + + const senderShip = normalizeShip(essay.author ?? ""); + if (!senderShip || senderShip === botShipName) { + return; + } + + // Resolve any cited/quoted messages first + const citedContent = await resolveAllCites(essay.content); + const rawText = extractMessageText(essay.content); + const messageText = citedContent + rawText; + if (!messageText.trim()) { + return; + } + + // Check if this is the owner sending an approval response + if (isOwner(senderShip) && isApprovalResponse(messageText)) { + const handled = await handleApprovalResponse(messageText); + if (handled) { + runtime.log?.(`[tlon] Processed approval response from owner: ${messageText}`); + return; } } - } catch (error) { - runtime.error?.(`[tlon] Channel refresh failed: ${formatError(error)}`); + + // Check if this is the owner sending an admin command + if (isOwner(senderShip) && isAdminCommand(messageText)) { + const handled = await handleAdminCommand(messageText); + if (handled) { + runtime.log?.(`[tlon] Processed admin command from owner: ${messageText}`); + return; + } + } + + // Owner is always allowed to DM (bypass allowlist) + if (isOwner(senderShip)) { + runtime.log?.(`[tlon] Processing DM from owner ${senderShip}`); + await processMessage({ + messageId: messageId ?? "", + senderShip, + messageText, + messageContent: essay.content, + isGroup: false, + timestamp: essay.sent || Date.now(), + }); + return; + } + + // For DMs from others, check allowlist + if (!isDmAllowed(senderShip, effectiveDmAllowlist)) { + // If owner is configured, queue approval request + if (effectiveOwnerShip) { + const approval = createPendingApproval({ + type: "dm", + requestingShip: senderShip, + messagePreview: messageText.substring(0, 100), + originalMessage: { + messageId: messageId ?? "", + messageText, + messageContent: essay.content, + timestamp: essay.sent || Date.now(), + }, + }); + await queueApprovalRequest(approval); + } else { + runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`); + } + return; + } + + await processMessage({ + messageId: messageId ?? "", + senderShip, + messageText, + messageContent: essay.content, // Pass raw content for media extraction + isGroup: false, + timestamp: essay.sent || Date.now(), + }); + } catch (error: any) { + runtime.error?.( + `[tlon] Error handling chat firehose event: ${error?.message ?? String(error)}`, + ); } - } + }; try { - runtime.log?.("[tlon] Subscribing to updates..."); + runtime.log?.("[tlon] Subscribing to firehose updates..."); - let dmShips: string[] = []; - try { - const dmList = await api.scry("/chat/dm.json"); - if (Array.isArray(dmList)) { - dmShips = dmList; - runtime.log?.(`[tlon] Found ${dmShips.length} DM conversation(s)`); + // Subscribe to channels firehose (/v2) + await api.subscribe({ + app: "channels", + path: "/v2", + event: handleChannelsFirehose, + err: (error) => { + runtime.error?.(`[tlon] Channels firehose error: ${String(error)}`); + }, + quit: () => { + runtime.log?.("[tlon] Channels firehose subscription ended"); + }, + }); + runtime.log?.("[tlon] Subscribed to channels firehose (/v2)"); + + // Subscribe to chat/DM firehose (/v3) + await api.subscribe({ + app: "chat", + path: "/v3", + event: handleChatFirehose, + err: (error) => { + runtime.error?.(`[tlon] Chat firehose error: ${String(error)}`); + }, + quit: () => { + runtime.log?.("[tlon] Chat firehose subscription ended"); + }, + }); + runtime.log?.("[tlon] Subscribed to chat firehose (/v3)"); + + // Subscribe to contacts updates to track nickname changes + await api.subscribe({ + app: "contacts", + path: "/v1/news", + event: (event: any) => { + try { + // Look for self profile updates + if (event?.self) { + const selfUpdate = event.self; + if (selfUpdate?.contact?.nickname?.value !== undefined) { + const newNickname = selfUpdate.contact.nickname.value || null; + if (newNickname !== botNickname) { + botNickname = newNickname; + runtime.log?.(`[tlon] Nickname updated: ${botNickname}`); + } + } + } + } catch (error: any) { + runtime.error?.( + `[tlon] Error handling contacts event: ${error?.message ?? String(error)}`, + ); + } + }, + err: (error) => { + runtime.error?.(`[tlon] Contacts subscription error: ${String(error)}`); + }, + quit: () => { + runtime.log?.("[tlon] Contacts subscription ended"); + }, + }); + runtime.log?.("[tlon] Subscribed to contacts updates (/v1/news)"); + + // Subscribe to settings store for hot-reloading config + settingsManager.onChange((newSettings) => { + currentSettings = newSettings; + + // Update watched channels if settings changed + if (newSettings.groupChannels?.length) { + const newChannels = newSettings.groupChannels; + for (const ch of newChannels) { + if (!watchedChannels.has(ch)) { + watchedChannels.add(ch); + runtime.log?.(`[tlon] Settings: now watching channel ${ch}`); + } + } + // Note: we don't remove channels from watchedChannels to avoid missing messages + // during transitions. The authorization check handles access control. } - } catch (error) { - runtime.error?.(`[tlon] Failed to fetch DM list: ${formatError(error)}`); + + // Update DM allowlist + if (newSettings.dmAllowlist !== undefined) { + effectiveDmAllowlist = + newSettings.dmAllowlist.length > 0 ? newSettings.dmAllowlist : account.dmAllowlist; + runtime.log?.(`[tlon] Settings: dmAllowlist updated to ${effectiveDmAllowlist.join(", ")}`); + } + + // Update model signature setting + if (newSettings.showModelSig !== undefined) { + effectiveShowModelSig = newSettings.showModelSig; + runtime.log?.(`[tlon] Settings: showModelSig = ${effectiveShowModelSig}`); + } + + // Update auto-accept DM invites setting + if (newSettings.autoAcceptDmInvites !== undefined) { + effectiveAutoAcceptDmInvites = newSettings.autoAcceptDmInvites; + runtime.log?.(`[tlon] Settings: autoAcceptDmInvites = ${effectiveAutoAcceptDmInvites}`); + } + + // Update auto-accept group invites setting + if (newSettings.autoAcceptGroupInvites !== undefined) { + effectiveAutoAcceptGroupInvites = newSettings.autoAcceptGroupInvites; + runtime.log?.( + `[tlon] Settings: autoAcceptGroupInvites = ${effectiveAutoAcceptGroupInvites}`, + ); + } + + // Update group invite allowlist + if (newSettings.groupInviteAllowlist !== undefined) { + effectiveGroupInviteAllowlist = + newSettings.groupInviteAllowlist.length > 0 + ? newSettings.groupInviteAllowlist + : account.groupInviteAllowlist; + runtime.log?.( + `[tlon] Settings: groupInviteAllowlist updated to ${effectiveGroupInviteAllowlist.join(", ")}`, + ); + } + + if (newSettings.defaultAuthorizedShips !== undefined) { + runtime.log?.( + `[tlon] Settings: defaultAuthorizedShips updated to ${(newSettings.defaultAuthorizedShips || []).join(", ")}`, + ); + } + + // Update auto-discover channels + if (newSettings.autoDiscoverChannels !== undefined) { + effectiveAutoDiscoverChannels = newSettings.autoDiscoverChannels; + runtime.log?.(`[tlon] Settings: autoDiscoverChannels = ${effectiveAutoDiscoverChannels}`); + } + + // Update owner ship + if (newSettings.ownerShip !== undefined) { + effectiveOwnerShip = newSettings.ownerShip + ? normalizeShip(newSettings.ownerShip) + : account.ownerShip + ? normalizeShip(account.ownerShip) + : null; + runtime.log?.(`[tlon] Settings: ownerShip = ${effectiveOwnerShip}`); + } + + // Update pending approvals + if (newSettings.pendingApprovals !== undefined) { + pendingApprovals = newSettings.pendingApprovals; + runtime.log?.( + `[tlon] Settings: pendingApprovals updated (${pendingApprovals.length} items)`, + ); + } + }); + + try { + await settingsManager.startSubscription(); + } catch (err) { + // Settings subscription is optional - don't fail if it doesn't work + runtime.log?.(`[tlon] Settings subscription not available: ${String(err)}`); } - for (const dmShip of dmShips) { - await subscribeToDM(dmShip); + // Subscribe to groups-ui for real-time channel additions (when invites are accepted) + try { + await api.subscribe({ + app: "groups", + path: "/groups/ui", + event: async (event: any) => { + try { + // Handle group/channel join events + // Event structure: { group: { flag: "~host/group-name", ... }, channels: { ... } } + if (event && typeof event === "object") { + // Check for new channels being added to groups + if (event.channels && typeof event.channels === "object") { + const channels = event.channels as Record; + for (const [channelNest, _channelData] of Object.entries(channels)) { + // Only monitor chat channels + if (!channelNest.startsWith("chat/")) { + continue; + } + + // If this is a new channel we're not watching yet, add it + if (!watchedChannels.has(channelNest)) { + watchedChannels.add(channelNest); + runtime.log?.( + `[tlon] Auto-detected new channel (invite accepted): ${channelNest}`, + ); + + // Persist to settings store so it survives restarts + if (effectiveAutoAcceptGroupInvites) { + try { + const currentChannels = currentSettings.groupChannels || []; + if (!currentChannels.includes(channelNest)) { + const updatedChannels = [...currentChannels, channelNest]; + // Poke settings store to persist + await api.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + "bucket-key": "tlon", + "entry-key": "groupChannels", + value: updatedChannels, + desk: "moltbot", + }, + }, + }); + runtime.log?.(`[tlon] Persisted ${channelNest} to settings store`); + } + } catch (err) { + runtime.error?.( + `[tlon] Failed to persist channel to settings: ${String(err)}`, + ); + } + } + } + } + } + + // Also check for the "join" event structure + if (event.join && typeof event.join === "object") { + const join = event.join as { group?: string; channels?: string[] }; + if (join.channels) { + for (const channelNest of join.channels) { + if (!channelNest.startsWith("chat/")) { + continue; + } + if (!watchedChannels.has(channelNest)) { + watchedChannels.add(channelNest); + runtime.log?.(`[tlon] Auto-detected joined channel: ${channelNest}`); + + // Persist to settings store + if (effectiveAutoAcceptGroupInvites) { + try { + const currentChannels = currentSettings.groupChannels || []; + if (!currentChannels.includes(channelNest)) { + const updatedChannels = [...currentChannels, channelNest]; + await api.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + "bucket-key": "tlon", + "entry-key": "groupChannels", + value: updatedChannels, + desk: "moltbot", + }, + }, + }); + runtime.log?.(`[tlon] Persisted ${channelNest} to settings store`); + } + } catch (err) { + runtime.error?.( + `[tlon] Failed to persist channel to settings: ${String(err)}`, + ); + } + } + } + } + } + } + } + } catch (error: any) { + runtime.error?.( + `[tlon] Error handling groups-ui event: ${error?.message ?? String(error)}`, + ); + } + }, + err: (error) => { + runtime.error?.(`[tlon] Groups-ui subscription error: ${String(error)}`); + }, + quit: () => { + runtime.log?.("[tlon] Groups-ui subscription ended"); + }, + }); + runtime.log?.("[tlon] Subscribed to groups-ui for real-time channel detection"); + } catch (err) { + // Groups-ui subscription is optional - channel discovery will still work via polling + runtime.log?.(`[tlon] Groups-ui subscription failed (will rely on polling): ${String(err)}`); } - for (const channelNest of groupChannels) { - await subscribeToChannel(channelNest); + // Subscribe to foreigns for auto-accepting group invites + // Always subscribe so we can hot-reload the setting via settings store + { + const processedGroupInvites = new Set(); + + // Helper to process pending invites + const processPendingInvites = async (foreigns: Foreigns) => { + if (!foreigns || typeof foreigns !== "object") { + return; + } + + for (const [groupFlag, foreign] of Object.entries(foreigns)) { + if (processedGroupInvites.has(groupFlag)) { + continue; + } + if (!foreign.invites || foreign.invites.length === 0) { + continue; + } + + const validInvite = foreign.invites.find((inv) => inv.valid); + if (!validInvite) { + continue; + } + + const inviterShip = validInvite.from; + const normalizedInviter = normalizeShip(inviterShip); + + // Owner invites are always accepted + if (isOwner(inviterShip)) { + try { + await api.poke({ + app: "groups", + mark: "group-join", + json: { + flag: groupFlag, + "join-all": true, + }, + }); + processedGroupInvites.add(groupFlag); + runtime.log?.(`[tlon] Auto-accepted group invite from owner: ${groupFlag}`); + } catch (err) { + runtime.error?.(`[tlon] Failed to accept group invite from owner: ${String(err)}`); + } + continue; + } + + // Skip if auto-accept is disabled + if (!effectiveAutoAcceptGroupInvites) { + // If owner is configured, queue approval + if (effectiveOwnerShip) { + const approval = createPendingApproval({ + type: "group", + requestingShip: inviterShip, + groupFlag, + }); + await queueApprovalRequest(approval); + processedGroupInvites.add(groupFlag); + } + continue; + } + + // Check if inviter is on allowlist + const isAllowed = + effectiveGroupInviteAllowlist.length > 0 + ? effectiveGroupInviteAllowlist + .map((s) => normalizeShip(s)) + .some((s) => s === normalizedInviter) + : false; // Fail-safe: empty allowlist means deny + + if (!isAllowed) { + // If owner is configured, queue approval + if (effectiveOwnerShip) { + const approval = createPendingApproval({ + type: "group", + requestingShip: inviterShip, + groupFlag, + }); + await queueApprovalRequest(approval); + processedGroupInvites.add(groupFlag); + } else { + runtime.log?.( + `[tlon] Rejected group invite from ${inviterShip} (not in groupInviteAllowlist): ${groupFlag}`, + ); + processedGroupInvites.add(groupFlag); + } + continue; + } + + // Inviter is on allowlist - accept the invite + try { + await api.poke({ + app: "groups", + mark: "group-join", + json: { + flag: groupFlag, + "join-all": true, + }, + }); + processedGroupInvites.add(groupFlag); + runtime.log?.( + `[tlon] Auto-accepted group invite: ${groupFlag} (from ${validInvite.from})`, + ); + } catch (err) { + runtime.error?.(`[tlon] Failed to auto-accept group ${groupFlag}: ${String(err)}`); + } + } + }; + + // Process existing pending invites from init data + if (initForeigns) { + await processPendingInvites(initForeigns); + } + + try { + await api.subscribe({ + app: "groups", + path: "/v1/foreigns", + event: (data: unknown) => { + void (async () => { + try { + await processPendingInvites(data as Foreigns); + } catch (error: any) { + runtime.error?.( + `[tlon] Error handling foreigns event: ${error?.message ?? String(error)}`, + ); + } + })(); + }, + err: (error) => { + runtime.error?.(`[tlon] Foreigns subscription error: ${String(error)}`); + }, + quit: () => { + runtime.log?.("[tlon] Foreigns subscription ended"); + }, + }); + runtime.log?.( + "[tlon] Subscribed to foreigns (/v1/foreigns) for auto-accepting group invites", + ); + } catch (err) { + runtime.log?.(`[tlon] Foreigns subscription failed: ${String(err)}`); + } + } + + // Discover channels to watch + if (effectiveAutoDiscoverChannels) { + const discoveredChannels = await fetchAllChannels(api, runtime); + for (const channelNest of discoveredChannels) { + watchedChannels.add(channelNest); + } + runtime.log?.(`[tlon] Watching ${watchedChannels.size} channel(s)`); + } + + // Log watched channels + for (const channelNest of watchedChannels) { + runtime.log?.(`[tlon] Watching channel: ${channelNest}`); } runtime.log?.("[tlon] All subscriptions registered, connecting to SSE stream..."); await api.connect(); - runtime.log?.("[tlon] Connected! All subscriptions active"); + runtime.log?.("[tlon] Connected! Firehose subscriptions active"); + // Periodically refresh channel discovery const pollInterval = setInterval( - () => { + async () => { if (!opts.abortSignal?.aborted) { - refreshChannelSubscriptions().catch((error) => { - runtime.error?.(`[tlon] Channel refresh error: ${formatError(error)}`); - }); + try { + if (effectiveAutoDiscoverChannels) { + const discoveredChannels = await fetchAllChannels(api, runtime); + for (const channelNest of discoveredChannels) { + if (!watchedChannels.has(channelNest)) { + watchedChannels.add(channelNest); + runtime.log?.(`[tlon] Now watching new channel: ${channelNest}`); + } + } + } + } catch (error: any) { + runtime.error?.(`[tlon] Channel refresh error: ${error?.message ?? String(error)}`); + } } }, 2 * 60 * 1000, @@ -589,8 +1816,8 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise { + try { + // Ensure media directory exists + await mkdir(mediaDir, { recursive: true }); + + // Fetch the image + const response = await fetch(url); + if (!response.ok) { + console.error(`[tlon-media] Failed to fetch ${url}: ${response.status}`); + return null; + } + + // Determine content type and extension + const contentType = response.headers.get("content-type") || "application/octet-stream"; + const ext = getExtensionFromContentType(contentType) || getExtensionFromUrl(url) || "bin"; + + // Generate unique filename + const filename = `${randomUUID()}.${ext}`; + const localPath = path.join(mediaDir, filename); + + // Stream to file + const body = response.body; + if (!body) { + console.error(`[tlon-media] No response body for ${url}`); + return null; + } + + const writeStream = createWriteStream(localPath); + await pipeline(Readable.fromWeb(body as any), writeStream); + + return { + localPath, + contentType, + originalUrl: url, + }; + } catch (error: any) { + console.error(`[tlon-media] Error downloading ${url}: ${error?.message ?? String(error)}`); + return null; + } +} + +function getExtensionFromContentType(contentType: string): string | null { + const map: Record = { + "image/jpeg": "jpg", + "image/jpg": "jpg", + "image/png": "png", + "image/gif": "gif", + "image/webp": "webp", + "image/svg+xml": "svg", + "video/mp4": "mp4", + "video/webm": "webm", + "audio/mpeg": "mp3", + "audio/ogg": "ogg", + }; + return map[contentType.split(";")[0].trim()] ?? null; +} + +function getExtensionFromUrl(url: string): string | null { + try { + const pathname = new URL(url).pathname; + const match = pathname.match(/\.([a-z0-9]+)$/i); + return match ? match[1].toLowerCase() : null; + } catch { + return null; + } +} + +/** + * Download all images from a message and return attachment metadata. + * Format matches OpenClaw's expected attachment structure. + */ +export async function downloadMessageImages( + content: unknown, + mediaDir?: string, +): Promise> { + const images = extractImageBlocks(content); + if (images.length === 0) { + return []; + } + + const attachments: Array<{ path: string; contentType: string }> = []; + + for (const image of images) { + const downloaded = await downloadMedia(image.url, mediaDir); + if (downloaded) { + attachments.push({ + path: downloaded.localPath, + contentType: downloaded.contentType, + }); + } + } + + return attachments; +} diff --git a/extensions/tlon/src/monitor/utils.ts b/extensions/tlon/src/monitor/utils.ts index 3c0103a7235..3995fbb727d 100644 --- a/extensions/tlon/src/monitor/utils.ts +++ b/extensions/tlon/src/monitor/utils.ts @@ -1,12 +1,76 @@ import { normalizeShip } from "../targets.js"; +// Cite types for message references +export interface ChanCite { + chan: { nest: string; where: string }; +} +export interface GroupCite { + group: string; +} +export interface DeskCite { + desk: { flag: string; where: string }; +} +export interface BaitCite { + bait: { group: string; graph: string; where: string }; +} +export type Cite = ChanCite | GroupCite | DeskCite | BaitCite; + +export interface ParsedCite { + type: "chan" | "group" | "desk" | "bait"; + nest?: string; + author?: string; + postId?: string; + group?: string; + flag?: string; + where?: string; +} + +// Extract all cites from message content +export function extractCites(content: unknown): ParsedCite[] { + if (!content || !Array.isArray(content)) { + return []; + } + + const cites: ParsedCite[] = []; + + for (const verse of content) { + if (verse?.block?.cite && typeof verse.block.cite === "object") { + const cite = verse.block.cite; + + if (cite.chan && typeof cite.chan === "object") { + const { nest, where } = cite.chan; + const whereMatch = where?.match(/\/msg\/(~[a-z-]+)\/(.+)/); + cites.push({ + type: "chan", + nest, + where, + author: whereMatch?.[1], + postId: whereMatch?.[2], + }); + } else if (cite.group && typeof cite.group === "string") { + cites.push({ type: "group", group: cite.group }); + } else if (cite.desk && typeof cite.desk === "object") { + cites.push({ type: "desk", flag: cite.desk.flag, where: cite.desk.where }); + } else if (cite.bait && typeof cite.bait === "object") { + cites.push({ + type: "bait", + group: cite.bait.group, + nest: cite.bait.graph, + where: cite.bait.where, + }); + } + } + } + + return cites; +} + export function formatModelName(modelString?: string | null): string { if (!modelString) { return "AI"; } const modelName = modelString.includes("/") ? modelString.split("/")[1] : modelString; const modelMappings: Record = { - "claude-opus-4-6": "Claude Opus 4.6", "claude-opus-4-5": "Claude Opus 4.5", "claude-sonnet-4-5": "Claude Sonnet 4.5", "claude-sonnet-3-5": "Claude Sonnet 3.5", @@ -27,62 +91,225 @@ export function formatModelName(modelString?: string | null): string { .join(" "); } -export function isBotMentioned(messageText: string, botShipName: string): boolean { +export function isBotMentioned( + messageText: string, + botShipName: string, + nickname?: string, +): boolean { if (!messageText || !botShipName) { return false; } + + // Check for @all mention + if (/@all\b/i.test(messageText)) { + return true; + } + + // Check for ship mention const normalizedBotShip = normalizeShip(botShipName); const escapedShip = normalizedBotShip.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); const mentionPattern = new RegExp(`(^|\\s)${escapedShip}(?=\\s|$)`, "i"); - return mentionPattern.test(messageText); + if (mentionPattern.test(messageText)) { + return true; + } + + // Check for nickname mention (case-insensitive, word boundary) + if (nickname) { + const escapedNickname = nickname.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const nicknamePattern = new RegExp(`(^|\\s)${escapedNickname}(?=\\s|$|[,!?.])`, "i"); + if (nicknamePattern.test(messageText)) { + return true; + } + } + + return false; } export function isDmAllowed(senderShip: string, allowlist: string[] | undefined): boolean { if (!allowlist || allowlist.length === 0) { - return true; + return false; } const normalizedSender = normalizeShip(senderShip); return allowlist.map((ship) => normalizeShip(ship)).some((ship) => ship === normalizedSender); } +/** + * Check if a group invite from a ship should be auto-accepted. + * + * SECURITY: Fail-safe to deny. If allowlist is empty or undefined, + * ALL invites are rejected - even if autoAcceptGroupInvites is enabled. + * This prevents misconfigured bots from accepting malicious invites. + */ +export function isGroupInviteAllowed( + inviterShip: string, + allowlist: string[] | undefined, +): boolean { + // SECURITY: Fail-safe to deny when no allowlist configured + if (!allowlist || allowlist.length === 0) { + return false; + } + const normalizedInviter = normalizeShip(inviterShip); + return allowlist.map((ship) => normalizeShip(ship)).some((ship) => ship === normalizedInviter); +} + +// Helper to recursively extract text from inline content +function extractInlineText(items: any[]): string { + return items + .map((item: any) => { + if (typeof item === "string") { + return item; + } + if (item && typeof item === "object") { + if (item.ship) { + return item.ship; + } + if ("sect" in item) { + return `@${item.sect || "all"}`; + } + if (item["inline-code"]) { + return `\`${item["inline-code"]}\``; + } + if (item.code) { + return `\`${item.code}\``; + } + if (item.link && item.link.href) { + return item.link.content || item.link.href; + } + if (item.bold && Array.isArray(item.bold)) { + return `**${extractInlineText(item.bold)}**`; + } + if (item.italics && Array.isArray(item.italics)) { + return `*${extractInlineText(item.italics)}*`; + } + if (item.strike && Array.isArray(item.strike)) { + return `~~${extractInlineText(item.strike)}~~`; + } + } + return ""; + }) + .join(""); +} + export function extractMessageText(content: unknown): string { if (!content || !Array.isArray(content)) { return ""; } - return ( - content - // oxlint-disable-next-line typescript/no-explicit-any - .map((block: any) => { - if (block.inline && Array.isArray(block.inline)) { - return ( - block.inline - // oxlint-disable-next-line typescript/no-explicit-any - .map((item: any) => { - if (typeof item === "string") { - return item; - } - if (item && typeof item === "object") { - if (item.ship) { - return item.ship; - } - if (item.break !== undefined) { - return "\n"; - } - if (item.link && item.link.href) { - return item.link.href; - } - } - return ""; - }) - .join("") - ); + return content + .map((verse: any) => { + // Handle inline content (text, ships, links, etc.) + if (verse.inline && Array.isArray(verse.inline)) { + return verse.inline + .map((item: any) => { + if (typeof item === "string") { + return item; + } + if (item && typeof item === "object") { + if (item.ship) { + return item.ship; + } + // Handle sect (role mentions like @all) + if ("sect" in item) { + return `@${item.sect || "all"}`; + } + if (item.break !== undefined) { + return "\n"; + } + if (item.link && item.link.href) { + return item.link.href; + } + // Handle inline code (Tlon uses "inline-code" key) + if (item["inline-code"]) { + return `\`${item["inline-code"]}\``; + } + if (item.code) { + return `\`${item.code}\``; + } + // Handle bold/italic/strike - recursively extract text + if (item.bold && Array.isArray(item.bold)) { + return `**${extractInlineText(item.bold)}**`; + } + if (item.italics && Array.isArray(item.italics)) { + return `*${extractInlineText(item.italics)}*`; + } + if (item.strike && Array.isArray(item.strike)) { + return `~~${extractInlineText(item.strike)}~~`; + } + // Handle blockquote inline + if (item.blockquote && Array.isArray(item.blockquote)) { + return `> ${extractInlineText(item.blockquote)}`; + } + } + return ""; + }) + .join(""); + } + + // Handle block content (images, code blocks, etc.) + if (verse.block && typeof verse.block === "object") { + const block = verse.block; + + // Image blocks + if (block.image && block.image.src) { + const alt = block.image.alt ? ` (${block.image.alt})` : ""; + return `\n${block.image.src}${alt}\n`; } - return ""; - }) - .join("\n") - .trim() - ); + + // Code blocks + if (block.code && typeof block.code === "object") { + const lang = block.code.lang || ""; + const code = block.code.code || ""; + return `\n\`\`\`${lang}\n${code}\n\`\`\`\n`; + } + + // Header blocks + if (block.header && typeof block.header === "object") { + const text = + block.header.content + ?.map((item: any) => (typeof item === "string" ? item : "")) + .join("") || ""; + return `\n## ${text}\n`; + } + + // Cite/quote blocks - parse the reference structure + if (block.cite && typeof block.cite === "object") { + const cite = block.cite; + + // ChanCite - reference to a channel message + if (cite.chan && typeof cite.chan === "object") { + const { nest, where } = cite.chan; + // where is typically /msg/~author/timestamp + const whereMatch = where?.match(/\/msg\/(~[a-z-]+)\/(.+)/); + if (whereMatch) { + const [, author, _postId] = whereMatch; + return `\n> [quoted: ${author} in ${nest}]\n`; + } + return `\n> [quoted from ${nest}]\n`; + } + + // GroupCite - reference to a group + if (cite.group && typeof cite.group === "string") { + return `\n> [ref: group ${cite.group}]\n`; + } + + // DeskCite - reference to an app/desk + if (cite.desk && typeof cite.desk === "object") { + return `\n> [ref: ${cite.desk.flag}]\n`; + } + + // BaitCite - reference with group+graph context + if (cite.bait && typeof cite.bait === "object") { + return `\n> [ref: ${cite.bait.graph} in ${cite.bait.group}]\n`; + } + + return `\n> [quoted message]\n`; + } + } + + return ""; + }) + .join("\n") + .trim(); } export function isSummarizationRequest(messageText: string): boolean { diff --git a/extensions/tlon/src/settings.ts b/extensions/tlon/src/settings.ts new file mode 100644 index 00000000000..8e74009049d --- /dev/null +++ b/extensions/tlon/src/settings.ts @@ -0,0 +1,391 @@ +/** + * Settings Store integration for hot-reloading Tlon plugin config. + * + * Settings are stored in Urbit's %settings agent under: + * desk: "moltbot" + * bucket: "tlon" + * + * This allows config changes via poke from any Landscape client + * without requiring a gateway restart. + */ + +import type { UrbitSSEClient } from "./urbit/sse-client.js"; + +/** Pending approval request stored for persistence */ +export type PendingApproval = { + id: string; + type: "dm" | "channel" | "group"; + requestingShip: string; + channelNest?: string; + groupFlag?: string; + messagePreview?: string; + /** Full message context for processing after approval */ + originalMessage?: { + messageId: string; + messageText: string; + messageContent: unknown; + timestamp: number; + parentId?: string; + isThreadReply?: boolean; + }; + timestamp: number; +}; + +export type TlonSettingsStore = { + groupChannels?: string[]; + dmAllowlist?: string[]; + autoDiscover?: boolean; + showModelSig?: boolean; + autoAcceptDmInvites?: boolean; + autoDiscoverChannels?: boolean; + autoAcceptGroupInvites?: boolean; + /** Ships allowed to invite us to groups (when autoAcceptGroupInvites is true) */ + groupInviteAllowlist?: string[]; + channelRules?: Record< + string, + { + mode?: "restricted" | "open"; + allowedShips?: string[]; + } + >; + defaultAuthorizedShips?: string[]; + /** Ship that receives approval requests for DMs, channel mentions, and group invites */ + ownerShip?: string; + /** Pending approval requests awaiting owner response */ + pendingApprovals?: PendingApproval[]; +}; + +export type TlonSettingsState = { + current: TlonSettingsStore; + loaded: boolean; +}; + +const SETTINGS_DESK = "moltbot"; +const SETTINGS_BUCKET = "tlon"; + +/** + * Parse channelRules - handles both JSON string and object formats. + * Settings-store doesn't support nested objects, so we store as JSON string. + */ +function parseChannelRules( + value: unknown, +): Record | undefined { + if (!value) { + return undefined; + } + + // If it's a string, try to parse as JSON + if (typeof value === "string") { + try { + const parsed = JSON.parse(value); + if (isChannelRulesObject(parsed)) { + return parsed; + } + } catch { + return undefined; + } + } + + // If it's already an object, use directly + if (isChannelRulesObject(value)) { + return value; + } + + return undefined; +} + +/** + * Parse settings from the raw Urbit settings-store response. + * The response shape is: { [bucket]: { [key]: value } } + */ +function parseSettingsResponse(raw: unknown): TlonSettingsStore { + if (!raw || typeof raw !== "object") { + return {}; + } + + const desk = raw as Record; + const bucket = desk[SETTINGS_BUCKET]; + if (!bucket || typeof bucket !== "object") { + return {}; + } + + const settings = bucket as Record; + + return { + groupChannels: Array.isArray(settings.groupChannels) + ? settings.groupChannels.filter((x): x is string => typeof x === "string") + : undefined, + dmAllowlist: Array.isArray(settings.dmAllowlist) + ? settings.dmAllowlist.filter((x): x is string => typeof x === "string") + : undefined, + autoDiscover: typeof settings.autoDiscover === "boolean" ? settings.autoDiscover : undefined, + showModelSig: typeof settings.showModelSig === "boolean" ? settings.showModelSig : undefined, + autoAcceptDmInvites: + typeof settings.autoAcceptDmInvites === "boolean" ? settings.autoAcceptDmInvites : undefined, + autoAcceptGroupInvites: + typeof settings.autoAcceptGroupInvites === "boolean" + ? settings.autoAcceptGroupInvites + : undefined, + groupInviteAllowlist: Array.isArray(settings.groupInviteAllowlist) + ? settings.groupInviteAllowlist.filter((x): x is string => typeof x === "string") + : undefined, + channelRules: parseChannelRules(settings.channelRules), + defaultAuthorizedShips: Array.isArray(settings.defaultAuthorizedShips) + ? settings.defaultAuthorizedShips.filter((x): x is string => typeof x === "string") + : undefined, + ownerShip: typeof settings.ownerShip === "string" ? settings.ownerShip : undefined, + pendingApprovals: parsePendingApprovals(settings.pendingApprovals), + }; +} + +function isChannelRulesObject( + val: unknown, +): val is Record { + if (!val || typeof val !== "object" || Array.isArray(val)) { + return false; + } + for (const [, rule] of Object.entries(val)) { + if (!rule || typeof rule !== "object") { + return false; + } + } + return true; +} + +/** + * Parse pendingApprovals - handles both JSON string and array formats. + * Settings-store stores complex objects as JSON strings. + */ +function parsePendingApprovals(value: unknown): PendingApproval[] | undefined { + if (!value) { + return undefined; + } + + // If it's a string, try to parse as JSON + let parsed: unknown = value; + if (typeof value === "string") { + try { + parsed = JSON.parse(value); + } catch { + return undefined; + } + } + + // Validate it's an array + if (!Array.isArray(parsed)) { + return undefined; + } + + // Filter to valid PendingApproval objects + return parsed.filter((item): item is PendingApproval => { + if (!item || typeof item !== "object") { + return false; + } + const obj = item as Record; + return ( + typeof obj.id === "string" && + (obj.type === "dm" || obj.type === "channel" || obj.type === "group") && + typeof obj.requestingShip === "string" && + typeof obj.timestamp === "number" + ); + }); +} + +/** + * Parse a single settings entry update event. + */ +function parseSettingsEvent(event: unknown): { key: string; value: unknown } | null { + if (!event || typeof event !== "object") { + return null; + } + + const evt = event as Record; + + // Handle put-entry events + if (evt["put-entry"]) { + const put = evt["put-entry"] as Record; + if (put.desk !== SETTINGS_DESK || put["bucket-key"] !== SETTINGS_BUCKET) { + return null; + } + return { + key: String(put["entry-key"] ?? ""), + value: put.value, + }; + } + + // Handle del-entry events + if (evt["del-entry"]) { + const del = evt["del-entry"] as Record; + if (del.desk !== SETTINGS_DESK || del["bucket-key"] !== SETTINGS_BUCKET) { + return null; + } + return { + key: String(del["entry-key"] ?? ""), + value: undefined, + }; + } + + return null; +} + +/** + * Apply a single settings update to the current state. + */ +function applySettingsUpdate( + current: TlonSettingsStore, + key: string, + value: unknown, +): TlonSettingsStore { + const next = { ...current }; + + switch (key) { + case "groupChannels": + next.groupChannels = Array.isArray(value) + ? value.filter((x): x is string => typeof x === "string") + : undefined; + break; + case "dmAllowlist": + next.dmAllowlist = Array.isArray(value) + ? value.filter((x): x is string => typeof x === "string") + : undefined; + break; + case "autoDiscover": + next.autoDiscover = typeof value === "boolean" ? value : undefined; + break; + case "showModelSig": + next.showModelSig = typeof value === "boolean" ? value : undefined; + break; + case "autoAcceptDmInvites": + next.autoAcceptDmInvites = typeof value === "boolean" ? value : undefined; + break; + case "autoAcceptGroupInvites": + next.autoAcceptGroupInvites = typeof value === "boolean" ? value : undefined; + break; + case "groupInviteAllowlist": + next.groupInviteAllowlist = Array.isArray(value) + ? value.filter((x): x is string => typeof x === "string") + : undefined; + break; + case "channelRules": + next.channelRules = parseChannelRules(value); + break; + case "defaultAuthorizedShips": + next.defaultAuthorizedShips = Array.isArray(value) + ? value.filter((x): x is string => typeof x === "string") + : undefined; + break; + case "ownerShip": + next.ownerShip = typeof value === "string" ? value : undefined; + break; + case "pendingApprovals": + next.pendingApprovals = parsePendingApprovals(value); + break; + } + + return next; +} + +export type SettingsLogger = { + log?: (msg: string) => void; + error?: (msg: string) => void; +}; + +/** + * Create a settings store subscription manager. + * + * Usage: + * const settings = createSettingsManager(api, logger); + * await settings.load(); + * settings.subscribe((newSettings) => { ... }); + */ +export function createSettingsManager(api: UrbitSSEClient, logger?: SettingsLogger) { + let state: TlonSettingsState = { + current: {}, + loaded: false, + }; + + const listeners = new Set<(settings: TlonSettingsStore) => void>(); + + const notify = () => { + for (const listener of listeners) { + try { + listener(state.current); + } catch (err) { + logger?.error?.(`[settings] Listener error: ${String(err)}`); + } + } + }; + + return { + /** + * Get current settings (may be empty if not loaded yet). + */ + get current(): TlonSettingsStore { + return state.current; + }, + + /** + * Whether initial settings have been loaded. + */ + get loaded(): boolean { + return state.loaded; + }, + + /** + * Load initial settings via scry. + */ + async load(): Promise { + try { + const raw = await api.scry("/settings/all.json"); + // Response shape: { all: { [desk]: { [bucket]: { [key]: value } } } } + const allData = raw as { all?: Record> }; + const deskData = allData?.all?.[SETTINGS_DESK]; + state.current = parseSettingsResponse(deskData ?? {}); + state.loaded = true; + logger?.log?.(`[settings] Loaded: ${JSON.stringify(state.current)}`); + return state.current; + } catch (err) { + // Settings desk may not exist yet - that's fine, use defaults + logger?.log?.(`[settings] No settings found (using defaults): ${String(err)}`); + state.current = {}; + state.loaded = true; + return state.current; + } + }, + + /** + * Subscribe to settings changes. + */ + async startSubscription(): Promise { + await api.subscribe({ + app: "settings", + path: "/desk/" + SETTINGS_DESK, + event: (event) => { + const update = parseSettingsEvent(event); + if (!update) { + return; + } + + logger?.log?.(`[settings] Update: ${update.key} = ${JSON.stringify(update.value)}`); + state.current = applySettingsUpdate(state.current, update.key, update.value); + notify(); + }, + err: (error) => { + logger?.error?.(`[settings] Subscription error: ${String(error)}`); + }, + quit: () => { + logger?.log?.("[settings] Subscription ended"); + }, + }); + logger?.log?.("[settings] Subscribed to settings updates"); + }, + + /** + * Register a listener for settings changes. + */ + onChange(listener: (settings: TlonSettingsStore) => void): () => void { + listeners.add(listener); + return () => listeners.delete(listener); + }, + }; +} diff --git a/extensions/tlon/src/targets.ts b/extensions/tlon/src/targets.ts index b93ede64bae..bacc6d576c0 100644 --- a/extensions/tlon/src/targets.ts +++ b/extensions/tlon/src/targets.ts @@ -1,5 +1,5 @@ export type TlonTarget = - | { kind: "direct"; ship: string } + | { kind: "dm"; ship: string } | { kind: "group"; nest: string; hostShip: string; channelName: string }; const SHIP_RE = /^~?[a-z-]+$/i; @@ -32,7 +32,7 @@ export function parseTlonTarget(raw?: string | null): TlonTarget | null { const dmPrefix = withoutPrefix.match(/^dm[/:](.+)$/i); if (dmPrefix) { - return { kind: "direct", ship: normalizeShip(dmPrefix[1]) }; + return { kind: "dm", ship: normalizeShip(dmPrefix[1]) }; } const groupPrefix = withoutPrefix.match(/^(group|room)[/:](.+)$/i); @@ -78,7 +78,7 @@ export function parseTlonTarget(raw?: string | null): TlonTarget | null { } if (SHIP_RE.test(withoutPrefix)) { - return { kind: "direct", ship: normalizeShip(withoutPrefix) }; + return { kind: "dm", ship: normalizeShip(withoutPrefix) }; } return null; diff --git a/extensions/tlon/src/types.ts b/extensions/tlon/src/types.ts index 9447e6c9b8a..81f38adc76b 100644 --- a/extensions/tlon/src/types.ts +++ b/extensions/tlon/src/types.ts @@ -11,8 +11,15 @@ export type TlonResolvedAccount = { allowPrivateNetwork: boolean | null; groupChannels: string[]; dmAllowlist: string[]; + /** Ships allowed to invite us to groups (security: prevent malicious group invites) */ + groupInviteAllowlist: string[]; autoDiscoverChannels: boolean | null; showModelSignature: boolean | null; + autoAcceptDmInvites: boolean | null; + autoAcceptGroupInvites: boolean | null; + defaultAuthorizedShips: string[]; + /** Ship that receives approval requests for DMs, channel mentions, and group invites */ + ownerShip: string | null; }; export function resolveTlonAccount( @@ -29,8 +36,12 @@ export function resolveTlonAccount( allowPrivateNetwork?: boolean; groupChannels?: string[]; dmAllowlist?: string[]; + groupInviteAllowlist?: string[]; autoDiscoverChannels?: boolean; showModelSignature?: boolean; + autoAcceptDmInvites?: boolean; + autoAcceptGroupInvites?: boolean; + ownerShip?: string; accounts?: Record>; } | undefined; @@ -47,8 +58,13 @@ export function resolveTlonAccount( allowPrivateNetwork: null, groupChannels: [], dmAllowlist: [], + groupInviteAllowlist: [], autoDiscoverChannels: null, showModelSignature: null, + autoAcceptDmInvites: null, + autoAcceptGroupInvites: null, + defaultAuthorizedShips: [], + ownerShip: null, }; } @@ -63,12 +79,25 @@ export function resolveTlonAccount( | null; const groupChannels = (account?.groupChannels ?? base.groupChannels ?? []) as string[]; const dmAllowlist = (account?.dmAllowlist ?? base.dmAllowlist ?? []) as string[]; + const groupInviteAllowlist = (account?.groupInviteAllowlist ?? + base.groupInviteAllowlist ?? + []) as string[]; const autoDiscoverChannels = (account?.autoDiscoverChannels ?? base.autoDiscoverChannels ?? null) as boolean | null; const showModelSignature = (account?.showModelSignature ?? base.showModelSignature ?? null) as | boolean | null; + const autoAcceptDmInvites = (account?.autoAcceptDmInvites ?? base.autoAcceptDmInvites ?? null) as + | boolean + | null; + const autoAcceptGroupInvites = (account?.autoAcceptGroupInvites ?? + base.autoAcceptGroupInvites ?? + null) as boolean | null; + const ownerShip = (account?.ownerShip ?? base.ownerShip ?? null) as string | null; + const defaultAuthorizedShips = ((account as Record)?.defaultAuthorizedShips ?? + (base as Record)?.defaultAuthorizedShips ?? + []) as string[]; const configured = Boolean(ship && url && code); return { @@ -82,8 +111,13 @@ export function resolveTlonAccount( allowPrivateNetwork, groupChannels, dmAllowlist, + groupInviteAllowlist, autoDiscoverChannels, showModelSignature, + autoAcceptDmInvites, + autoAcceptGroupInvites, + defaultAuthorizedShips, + ownerShip, }; } diff --git a/extensions/tlon/src/urbit/foreigns.ts b/extensions/tlon/src/urbit/foreigns.ts new file mode 100644 index 00000000000..c9ce7c5002a --- /dev/null +++ b/extensions/tlon/src/urbit/foreigns.ts @@ -0,0 +1,49 @@ +/** + * Types for Urbit groups foreigns (group invites) + * Based on packages/shared/src/urbit/groups.ts from homestead + */ + +export interface GroupPreviewV7 { + meta: { + title: string; + description: string; + image: string; + cover: string; + }; + "channel-count": number; + "member-count": number; + admissions: { + privacy: "public" | "private" | "secret"; + }; +} + +export interface ForeignInvite { + flag: string; // group flag e.g. "~host/group-name" + time: number; // timestamp + from: string; // ship that sent invite + token: string | null; + note: string | null; + preview: GroupPreviewV7; + valid: boolean; // tracks if invite has been revoked +} + +export type Lookup = "preview" | "done" | "error"; +export type Progress = "ask" | "join" | "watch" | "done" | "error"; + +export interface Foreign { + invites: ForeignInvite[]; + lookup: Lookup | null; + preview: GroupPreviewV7 | null; + progress: Progress | null; + token: string | null; +} + +export interface Foreigns { + [flag: string]: Foreign; +} + +// DM invite structure from chat /v3 firehose +export interface DmInvite { + ship: string; + // Additional fields may be present +} diff --git a/extensions/tlon/src/urbit/http-api.ts b/extensions/tlon/src/urbit/http-api.ts new file mode 100644 index 00000000000..13edb97b805 --- /dev/null +++ b/extensions/tlon/src/urbit/http-api.ts @@ -0,0 +1,38 @@ +import { Urbit } from "@urbit/http-api"; + +let patched = false; + +export function ensureUrbitConnectPatched() { + if (patched) { + return; + } + patched = true; + Urbit.prototype.connect = async function patchedConnect() { + const resp = await fetch(`${this.url}/~/login`, { + method: "POST", + body: `password=${this.code}`, + credentials: "include", + }); + + if (resp.status >= 400) { + throw new Error(`Login failed with status ${resp.status}`); + } + + const cookie = resp.headers.get("set-cookie"); + if (cookie) { + const match = /urbauth-~([\w-]+)/.exec(cookie); + if (match) { + if (!(this as unknown as { ship?: string | null }).ship) { + (this as unknown as { ship?: string | null }).ship = match[1]; + } + (this as unknown as { nodeId?: string }).nodeId = match[1]; + } + (this as unknown as { cookie?: string }).cookie = cookie; + } + + await (this as typeof Urbit.prototype).getShipName(); + await (this as typeof Urbit.prototype).getOurName(); + }; +} + +export { Urbit }; diff --git a/extensions/tlon/src/urbit/send.ts b/extensions/tlon/src/urbit/send.ts index b848e99f4e4..70a16ce57d3 100644 --- a/extensions/tlon/src/urbit/send.ts +++ b/extensions/tlon/src/urbit/send.ts @@ -1,4 +1,5 @@ import { scot, da } from "@urbit/aura"; +import { markdownToStory, createImageBlock, isImageUrl, type Story } from "./story.js"; export type TlonPokeApi = { poke: (params: { app: string; mark: string; json: unknown }) => Promise; @@ -11,8 +12,19 @@ type SendTextParams = { text: string; }; +type SendStoryParams = { + api: TlonPokeApi; + fromShip: string; + toShip: string; + story: Story; +}; + export async function sendDm({ api, fromShip, toShip, text }: SendTextParams) { - const story = [{ inline: [text] }]; + const story: Story = markdownToStory(text); + return sendDmWithStory({ api, fromShip, toShip, story }); +} + +export async function sendDmWithStory({ api, fromShip, toShip, story }: SendStoryParams) { const sentAt = Date.now(); const idUd = scot("ud", da.fromUnix(sentAt)); const id = `${fromShip}/${idUd}`; @@ -52,6 +64,15 @@ type SendGroupParams = { replyToId?: string | null; }; +type SendGroupStoryParams = { + api: TlonPokeApi; + fromShip: string; + hostShip: string; + channelName: string; + story: Story; + replyToId?: string | null; +}; + export async function sendGroupMessage({ api, fromShip, @@ -60,13 +81,25 @@ export async function sendGroupMessage({ text, replyToId, }: SendGroupParams) { - const story = [{ inline: [text] }]; + const story: Story = markdownToStory(text); + return sendGroupMessageWithStory({ api, fromShip, hostShip, channelName, story, replyToId }); +} + +export async function sendGroupMessageWithStory({ + api, + fromShip, + hostShip, + channelName, + story, + replyToId, +}: SendGroupStoryParams) { const sentAt = Date.now(); // Format reply ID as @ud (with dots) - required for Tlon to recognize thread replies let formattedReplyId = replyToId; if (replyToId && /^\d+$/.test(replyToId)) { try { + // scot('ud', n) formats a number as @ud with dots formattedReplyId = scot("ud", BigInt(replyToId)); } catch { // Fall back to raw ID if formatting fails @@ -129,3 +162,27 @@ export function buildMediaText(text: string | undefined, mediaUrl: string | unde } return cleanText; } + +/** + * Build a story with text and optional media (image) + */ +export function buildMediaStory(text: string | undefined, mediaUrl: string | undefined): Story { + const story: Story = []; + const cleanText = text?.trim() ?? ""; + const cleanUrl = mediaUrl?.trim() ?? ""; + + // Add text content if present + if (cleanText) { + story.push(...markdownToStory(cleanText)); + } + + // Add image block if URL looks like an image + if (cleanUrl && isImageUrl(cleanUrl)) { + story.push(createImageBlock(cleanUrl, "")); + } else if (cleanUrl) { + // For non-image URLs, add as a link + story.push({ inline: [{ link: { href: cleanUrl, content: cleanUrl } }] }); + } + + return story.length > 0 ? story : [{ inline: [""] }]; +} diff --git a/extensions/tlon/src/urbit/sse-client.test.ts b/extensions/tlon/src/urbit/sse-client.test.ts deleted file mode 100644 index b37c3be05f8..00000000000 --- a/extensions/tlon/src/urbit/sse-client.test.ts +++ /dev/null @@ -1,44 +0,0 @@ -import type { LookupFn } from "openclaw/plugin-sdk"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { UrbitSSEClient } from "./sse-client.js"; - -const mockFetch = vi.fn(); - -describe("UrbitSSEClient", () => { - beforeEach(() => { - vi.stubGlobal("fetch", mockFetch); - mockFetch.mockReset(); - }); - - afterEach(() => { - vi.unstubAllGlobals(); - }); - - it("sends subscriptions added after connect", async () => { - mockFetch.mockResolvedValue({ ok: true, status: 200, text: async () => "" }); - const lookupFn = (async () => [{ address: "1.1.1.1", family: 4 }]) as unknown as LookupFn; - - const client = new UrbitSSEClient("https://example.com", "urbauth-~zod=123", { - lookupFn, - }); - (client as { isConnected: boolean }).isConnected = true; - - await client.subscribe({ - app: "chat", - path: "/dm/~zod", - event: () => {}, - }); - - expect(mockFetch).toHaveBeenCalledTimes(1); - const [url, init] = mockFetch.mock.calls[0]; - expect(url).toBe(client.channelUrl); - expect(init.method).toBe("PUT"); - const body = JSON.parse(init.body as string); - expect(body).toHaveLength(1); - expect(body[0]).toMatchObject({ - action: "subscribe", - app: "chat", - path: "/dm/~zod", - }); - }); -}); diff --git a/extensions/tlon/src/urbit/sse-client.ts b/extensions/tlon/src/urbit/sse-client.ts index df128e51b87..fc793f8b40f 100644 --- a/extensions/tlon/src/urbit/sse-client.ts +++ b/extensions/tlon/src/urbit/sse-client.ts @@ -1,9 +1,6 @@ import { randomUUID } from "node:crypto"; import { Readable } from "node:stream"; -import type { LookupFn, SsrFPolicy } from "openclaw/plugin-sdk"; -import { ensureUrbitChannelOpen, pokeUrbitChannel, scryUrbitPath } from "./channel-ops.js"; import { getUrbitContext, normalizeUrbitCookie } from "./context.js"; -import { urbitFetch } from "./fetch.js"; export type UrbitSseLogger = { log?: (message: string) => void; @@ -12,9 +9,6 @@ export type UrbitSseLogger = { type UrbitSseOptions = { ship?: string; - ssrfPolicy?: SsrFPolicy; - lookupFn?: LookupFn; - fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise; onReconnect?: (client: UrbitSSEClient) => Promise | void; autoReconnect?: boolean; maxReconnectAttempts?: number; @@ -50,10 +44,11 @@ export class UrbitSSEClient { maxReconnectDelay: number; isConnected = false; logger: UrbitSseLogger; - ssrfPolicy?: SsrFPolicy; - lookupFn?: LookupFn; - fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise; - streamRelease: (() => Promise) | null = null; + + // Event ack tracking - must ack every ~50 events to keep channel healthy + private lastHeardEventId = -1; + private lastAcknowledgedEventId = -1; + private readonly ackThreshold = 20; constructor(url: string, cookie: string, options: UrbitSseOptions = {}) { const ctx = getUrbitContext(url, options.ship); @@ -68,9 +63,6 @@ export class UrbitSSEClient { this.reconnectDelay = options.reconnectDelay ?? 1000; this.maxReconnectDelay = options.maxReconnectDelay ?? 30000; this.logger = options.logger ?? {}; - this.ssrfPolicy = options.ssrfPolicy; - this.lookupFn = options.lookupFn; - this.fetchImpl = options.fetchImpl; } async subscribe(params: { @@ -110,52 +102,56 @@ export class UrbitSSEClient { app: string; path: string; }) { - const { response, release } = await urbitFetch({ - baseUrl: this.url, - path: `/~/channel/${this.channelId}`, - init: { - method: "PUT", - headers: { - "Content-Type": "application/json", - Cookie: this.cookie, - }, - body: JSON.stringify([subscription]), + const response = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, }, - ssrfPolicy: this.ssrfPolicy, - lookupFn: this.lookupFn, - fetchImpl: this.fetchImpl, - timeoutMs: 30_000, - auditContext: "tlon-urbit-subscribe", + body: JSON.stringify([subscription]), }); - try { - if (!response.ok && response.status !== 204) { - const errorText = await response.text().catch(() => ""); - throw new Error( - `Subscribe failed: ${response.status}${errorText ? ` - ${errorText}` : ""}`, - ); - } - } finally { - await release(); + if (!response.ok && response.status !== 204) { + const errorText = await response.text(); + throw new Error(`Subscribe failed: ${response.status} - ${errorText}`); } } async connect() { - await ensureUrbitChannelOpen( - { - baseUrl: this.url, - cookie: this.cookie, - ship: this.ship, - channelId: this.channelId, - ssrfPolicy: this.ssrfPolicy, - lookupFn: this.lookupFn, - fetchImpl: this.fetchImpl, + const createResp = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, }, - { - createBody: this.subscriptions, - createAuditContext: "tlon-urbit-channel-create", + body: JSON.stringify(this.subscriptions), + }); + + if (!createResp.ok && createResp.status !== 204) { + throw new Error(`Channel creation failed: ${createResp.status}`); + } + + const pokeResp = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, }, - ); + body: JSON.stringify([ + { + id: Date.now(), + action: "poke", + ship: this.ship, + app: "hood", + mark: "helm-hi", + json: "Opening API channel", + }, + ]), + }); + + if (!pokeResp.ok && pokeResp.status !== 204) { + throw new Error(`Channel activation failed: ${pokeResp.status}`); + } await this.openStream(); this.isConnected = true; @@ -163,38 +159,15 @@ export class UrbitSSEClient { } async openStream() { - // Use AbortController with manual timeout so we only abort during initial connection, - // not after the SSE stream is established and actively streaming. - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 60_000); - - this.streamController = controller; - - const { response, release } = await urbitFetch({ - baseUrl: this.url, - path: `/~/channel/${this.channelId}`, - init: { - method: "GET", - headers: { - Accept: "text/event-stream", - Cookie: this.cookie, - }, + const response = await fetch(this.channelUrl, { + method: "GET", + headers: { + Accept: "text/event-stream", + Cookie: this.cookie, }, - ssrfPolicy: this.ssrfPolicy, - lookupFn: this.lookupFn, - fetchImpl: this.fetchImpl, - signal: controller.signal, - auditContext: "tlon-urbit-sse-stream", }); - this.streamRelease = release; - - // Clear timeout once connection established (headers received). - clearTimeout(timeoutId); - if (!response.ok) { - await release(); - this.streamRelease = null; throw new Error(`Stream connection failed: ${response.status}`); } @@ -214,8 +187,8 @@ export class UrbitSSEClient { if (!body) { return; } - // oxlint-disable-next-line typescript/no-explicit-any - const stream = body instanceof ReadableStream ? Readable.fromWeb(body as any) : body; + // @ts-expect-error - ReadableStream type variance issue between DOM and Node types + const stream = body instanceof ReadableStream ? Readable.fromWeb(body) : body; let buffer = ""; try { @@ -232,12 +205,6 @@ export class UrbitSSEClient { } } } finally { - if (this.streamRelease) { - const release = this.streamRelease; - this.streamRelease = null; - await release(); - } - this.streamController = null; if (!this.aborted && this.autoReconnect) { this.isConnected = false; this.logger.log?.("[SSE] Stream ended, attempting reconnection..."); @@ -249,17 +216,33 @@ export class UrbitSSEClient { processEvent(eventData: string) { const lines = eventData.split("\n"); let data: string | null = null; + let eventId: number | null = null; for (const line of lines) { if (line.startsWith("data: ")) { data = line.substring(6); } + if (line.startsWith("id: ")) { + eventId = parseInt(line.substring(4), 10); + } } if (!data) { return; } + // Track event ID and send ack if needed + if (eventId !== null && !isNaN(eventId)) { + if (eventId > this.lastHeardEventId) { + this.lastHeardEventId = eventId; + if (eventId - this.lastAcknowledgedEventId > this.ackThreshold) { + this.ack(eventId).catch((err) => { + this.logger.error?.(`Failed to ack event ${eventId}: ${String(err)}`); + }); + } + } + } + try { const parsed = JSON.parse(data) as { id?: number; json?: unknown; response?: string }; @@ -290,32 +273,72 @@ export class UrbitSSEClient { } } - async poke(params: { app: string; mark: string; json: unknown }) { - return await pokeUrbitChannel( - { - baseUrl: this.url, - cookie: this.cookie, - ship: this.ship, - channelId: this.channelId, - ssrfPolicy: this.ssrfPolicy, - lookupFn: this.lookupFn, - fetchImpl: this.fetchImpl, + private async ack(eventId: number): Promise { + this.lastAcknowledgedEventId = eventId; + + const ackData = { + action: "ack", + "event-id": eventId, + }; + + const response = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, }, - { ...params, auditContext: "tlon-urbit-poke" }, - ); + body: JSON.stringify([ackData]), + }); + + if (!response.ok && response.status !== 204) { + throw new Error(`Ack failed: ${response.status}`); + } + + this.logger.log?.(`[SSE] Acked event ${eventId}`); + } + + async poke(params: { app: string; mark: string; json: unknown }) { + const pokeId = Date.now(); + const pokeData = { + id: pokeId, + action: "poke", + ship: this.ship, + app: params.app, + mark: params.mark, + json: params.json, + }; + + const response = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify([pokeData]), + }); + + if (!response.ok && response.status !== 204) { + const errorText = await response.text(); + throw new Error(`Poke failed: ${response.status} - ${errorText}`); + } + + return pokeId; } async scry(path: string) { - return await scryUrbitPath( - { - baseUrl: this.url, - cookie: this.cookie, - ssrfPolicy: this.ssrfPolicy, - lookupFn: this.lookupFn, - fetchImpl: this.fetchImpl, + const scryUrl = `${this.url}/~/scry${path}`; + const response = await fetch(scryUrl, { + method: "GET", + headers: { + Cookie: this.cookie, }, - { path, auditContext: "tlon-urbit-scry" }, - ); + }); + + if (!response.ok) { + throw new Error(`Scry failed: ${response.status} for path ${path}`); + } + + return await response.json(); } async attemptReconnect() { @@ -324,11 +347,16 @@ export class UrbitSSEClient { return; } + // Reset after max attempts with extended backoff, then continue trying forever if (this.reconnectAttempts >= this.maxReconnectAttempts) { - this.logger.error?.( - `[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Giving up.`, + this.logger.log?.( + `[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Will reset and retry after extended backoff...`, ); - return; + // Wait 10 seconds before resetting and trying again + const extendedBackoff = 10000; // 10 seconds + await new Promise((resolve) => setTimeout(resolve, extendedBackoff)); + this.reconnectAttempts = 0; // Reset counter to continue trying + this.logger.log?.("[SSE] Reconnection attempts reset, resuming reconnection..."); } this.reconnectAttempts += 1; @@ -362,7 +390,6 @@ export class UrbitSSEClient { async close() { this.aborted = true; this.isConnected = false; - this.streamController?.abort(); try { const unsubscribes = this.subscriptions.map((sub) => ({ @@ -371,61 +398,23 @@ export class UrbitSSEClient { subscription: sub.id, })); - { - const { response, release } = await urbitFetch({ - baseUrl: this.url, - path: `/~/channel/${this.channelId}`, - init: { - method: "PUT", - headers: { - "Content-Type": "application/json", - Cookie: this.cookie, - }, - body: JSON.stringify(unsubscribes), - }, - ssrfPolicy: this.ssrfPolicy, - lookupFn: this.lookupFn, - fetchImpl: this.fetchImpl, - timeoutMs: 30_000, - auditContext: "tlon-urbit-unsubscribe", - }); - try { - void response.body?.cancel(); - } finally { - await release(); - } - } + await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify(unsubscribes), + }); - { - const { response, release } = await urbitFetch({ - baseUrl: this.url, - path: `/~/channel/${this.channelId}`, - init: { - method: "DELETE", - headers: { - Cookie: this.cookie, - }, - }, - ssrfPolicy: this.ssrfPolicy, - lookupFn: this.lookupFn, - fetchImpl: this.fetchImpl, - timeoutMs: 30_000, - auditContext: "tlon-urbit-channel-close", - }); - try { - void response.body?.cancel(); - } finally { - await release(); - } - } + await fetch(this.channelUrl, { + method: "DELETE", + headers: { + Cookie: this.cookie, + }, + }); } catch (error) { this.logger.error?.(`Error closing channel: ${String(error)}`); } - - if (this.streamRelease) { - const release = this.streamRelease; - this.streamRelease = null; - await release(); - } } } diff --git a/extensions/tlon/src/urbit/story.ts b/extensions/tlon/src/urbit/story.ts new file mode 100644 index 00000000000..01a18c2eb09 --- /dev/null +++ b/extensions/tlon/src/urbit/story.ts @@ -0,0 +1,347 @@ +/** + * Tlon Story Format - Rich text converter + * + * Converts markdown-like text to Tlon's story format. + */ + +// Inline content types +export type StoryInline = + | string + | { bold: StoryInline[] } + | { italics: StoryInline[] } + | { strike: StoryInline[] } + | { blockquote: StoryInline[] } + | { "inline-code": string } + | { code: string } + | { ship: string } + | { link: { href: string; content: string } } + | { break: null } + | { tag: string }; + +// Block content types +export type StoryBlock = + | { header: { tag: "h1" | "h2" | "h3" | "h4" | "h5" | "h6"; content: StoryInline[] } } + | { code: { code: string; lang: string } } + | { image: { src: string; height: number; width: number; alt: string } } + | { rule: null } + | { listing: StoryListing }; + +export type StoryListing = + | { + list: { + type: "ordered" | "unordered" | "tasklist"; + items: StoryListing[]; + contents: StoryInline[]; + }; + } + | { item: StoryInline[] }; + +// A verse is either a block or inline content +export type StoryVerse = { block: StoryBlock } | { inline: StoryInline[] }; + +// A story is a list of verses +export type Story = StoryVerse[]; + +/** + * Parse inline markdown formatting (bold, italic, code, links, mentions) + */ +function parseInlineMarkdown(text: string): StoryInline[] { + const result: StoryInline[] = []; + let remaining = text; + + while (remaining.length > 0) { + // Ship mentions: ~sampel-palnet + const shipMatch = remaining.match(/^(~[a-z][-a-z0-9]*)/); + if (shipMatch) { + result.push({ ship: shipMatch[1] }); + remaining = remaining.slice(shipMatch[0].length); + continue; + } + + // Bold: **text** or __text__ + const boldMatch = remaining.match(/^\*\*(.+?)\*\*|^__(.+?)__/); + if (boldMatch) { + const content = boldMatch[1] || boldMatch[2]; + result.push({ bold: parseInlineMarkdown(content) }); + remaining = remaining.slice(boldMatch[0].length); + continue; + } + + // Italics: *text* or _text_ (but not inside words for _) + const italicsMatch = remaining.match(/^\*([^*]+?)\*|^_([^_]+?)_(?![a-zA-Z0-9])/); + if (italicsMatch) { + const content = italicsMatch[1] || italicsMatch[2]; + result.push({ italics: parseInlineMarkdown(content) }); + remaining = remaining.slice(italicsMatch[0].length); + continue; + } + + // Strikethrough: ~~text~~ + const strikeMatch = remaining.match(/^~~(.+?)~~/); + if (strikeMatch) { + result.push({ strike: parseInlineMarkdown(strikeMatch[1]) }); + remaining = remaining.slice(strikeMatch[0].length); + continue; + } + + // Inline code: `code` + const codeMatch = remaining.match(/^`([^`]+)`/); + if (codeMatch) { + result.push({ "inline-code": codeMatch[1] }); + remaining = remaining.slice(codeMatch[0].length); + continue; + } + + // Links: [text](url) + const linkMatch = remaining.match(/^\[([^\]]+)\]\(([^)]+)\)/); + if (linkMatch) { + result.push({ link: { href: linkMatch[2], content: linkMatch[1] } }); + remaining = remaining.slice(linkMatch[0].length); + continue; + } + + // Markdown images: ![alt](url) + const imageMatch = remaining.match(/^!\[([^\]]*)\]\(([^)]+)\)/); + if (imageMatch) { + // Return a special marker that will be hoisted to a block + result.push({ + __image: { src: imageMatch[2], alt: imageMatch[1] }, + } as unknown as StoryInline); + remaining = remaining.slice(imageMatch[0].length); + continue; + } + + // Plain URL detection + const urlMatch = remaining.match(/^(https?:\/\/[^\s<>"\]]+)/); + if (urlMatch) { + result.push({ link: { href: urlMatch[1], content: urlMatch[1] } }); + remaining = remaining.slice(urlMatch[0].length); + continue; + } + + // Hashtags: #tag - disabled, chat UI doesn't render them + // const tagMatch = remaining.match(/^#([a-zA-Z][a-zA-Z0-9_-]*)/); + // if (tagMatch) { + // result.push({ tag: tagMatch[1] }); + // remaining = remaining.slice(tagMatch[0].length); + // continue; + // } + + // Plain text: consume until next special character or URL start + // Exclude : and / to allow URL detection to work (stops before https://) + const plainMatch = remaining.match(/^[^*_`~[#~\n:/]+/); + if (plainMatch) { + result.push(plainMatch[0]); + remaining = remaining.slice(plainMatch[0].length); + continue; + } + + // Single special char that didn't match a pattern + result.push(remaining[0]); + remaining = remaining.slice(1); + } + + // Merge adjacent strings + return mergeAdjacentStrings(result); +} + +/** + * Merge adjacent string elements in an inline array + */ +function mergeAdjacentStrings(inlines: StoryInline[]): StoryInline[] { + const result: StoryInline[] = []; + for (const item of inlines) { + if (typeof item === "string" && typeof result[result.length - 1] === "string") { + result[result.length - 1] = (result[result.length - 1] as string) + item; + } else { + result.push(item); + } + } + return result; +} + +/** + * Create an image block + */ +export function createImageBlock( + src: string, + alt: string = "", + height: number = 0, + width: number = 0, +): StoryVerse { + return { + block: { + image: { src, height, width, alt }, + }, + }; +} + +/** + * Check if URL looks like an image + */ +export function isImageUrl(url: string): boolean { + const imageExtensions = /\.(jpg|jpeg|png|gif|webp|svg|bmp|ico)(\?.*)?$/i; + return imageExtensions.test(url); +} + +/** + * Process inlines and extract any image markers into blocks + */ +function processInlinesForImages(inlines: StoryInline[]): { + inlines: StoryInline[]; + imageBlocks: StoryVerse[]; +} { + const cleanInlines: StoryInline[] = []; + const imageBlocks: StoryVerse[] = []; + + for (const inline of inlines) { + if (typeof inline === "object" && "__image" in inline) { + const img = (inline as unknown as { __image: { src: string; alt: string } }).__image; + imageBlocks.push(createImageBlock(img.src, img.alt)); + } else { + cleanInlines.push(inline); + } + } + + return { inlines: cleanInlines, imageBlocks }; +} + +/** + * Convert markdown text to Tlon story format + */ +export function markdownToStory(markdown: string): Story { + const story: Story = []; + const lines = markdown.split("\n"); + let i = 0; + + while (i < lines.length) { + const line = lines[i]; + + // Code block: ```lang\ncode\n``` + if (line.startsWith("```")) { + const lang = line.slice(3).trim() || "plaintext"; + const codeLines: string[] = []; + i++; + while (i < lines.length && !lines[i].startsWith("```")) { + codeLines.push(lines[i]); + i++; + } + story.push({ + block: { + code: { + code: codeLines.join("\n"), + lang, + }, + }, + }); + i++; // skip closing ``` + continue; + } + + // Headers: # H1, ## H2, etc. + const headerMatch = line.match(/^(#{1,6})\s+(.+)$/); + if (headerMatch) { + const level = headerMatch[1].length as 1 | 2 | 3 | 4 | 5 | 6; + const tag = `h${level}` as "h1" | "h2" | "h3" | "h4" | "h5" | "h6"; + story.push({ + block: { + header: { + tag, + content: parseInlineMarkdown(headerMatch[2]), + }, + }, + }); + i++; + continue; + } + + // Horizontal rule: --- or *** + if (/^(-{3,}|\*{3,})$/.test(line.trim())) { + story.push({ block: { rule: null } }); + i++; + continue; + } + + // Blockquote: > text + if (line.startsWith("> ")) { + const quoteLines: string[] = []; + while (i < lines.length && lines[i].startsWith("> ")) { + quoteLines.push(lines[i].slice(2)); + i++; + } + const quoteText = quoteLines.join("\n"); + story.push({ + inline: [{ blockquote: parseInlineMarkdown(quoteText) }], + }); + continue; + } + + // Empty line - skip + if (line.trim() === "") { + i++; + continue; + } + + // Regular paragraph - collect consecutive non-empty lines + const paragraphLines: string[] = []; + while ( + i < lines.length && + lines[i].trim() !== "" && + !lines[i].startsWith("#") && + !lines[i].startsWith("```") && + !lines[i].startsWith("> ") && + !/^(-{3,}|\*{3,})$/.test(lines[i].trim()) + ) { + paragraphLines.push(lines[i]); + i++; + } + + if (paragraphLines.length > 0) { + const paragraphText = paragraphLines.join("\n"); + // Convert newlines within paragraph to break elements + const inlines = parseInlineMarkdown(paragraphText); + // Replace \n in strings with break elements + const withBreaks: StoryInline[] = []; + for (const inline of inlines) { + if (typeof inline === "string" && inline.includes("\n")) { + const parts = inline.split("\n"); + for (let j = 0; j < parts.length; j++) { + if (parts[j]) { + withBreaks.push(parts[j]); + } + if (j < parts.length - 1) { + withBreaks.push({ break: null }); + } + } + } else { + withBreaks.push(inline); + } + } + + // Extract any images from inlines and add as separate blocks + const { inlines: cleanInlines, imageBlocks } = processInlinesForImages(withBreaks); + + if (cleanInlines.length > 0) { + story.push({ inline: cleanInlines }); + } + story.push(...imageBlocks); + } + } + + return story; +} + +/** + * Convert plain text to simple story (no markdown parsing) + */ +export function textToStory(text: string): Story { + return [{ inline: [text] }]; +} + +/** + * Check if text contains markdown formatting + */ +export function hasMarkdown(text: string): boolean { + // Check for common markdown patterns + return /(\*\*|__|~~|`|^#{1,6}\s|^```|^\s*[-*]\s|\[.*\]\(.*\)|^>\s)/m.test(text); +} diff --git a/extensions/tlon/src/urbit/upload.ts b/extensions/tlon/src/urbit/upload.ts new file mode 100644 index 00000000000..7c3e1cb6c6a --- /dev/null +++ b/extensions/tlon/src/urbit/upload.ts @@ -0,0 +1,17 @@ +/** + * Upload an image from a URL to Tlon storage. + * + * NOTE: uploadFile is not yet available in @tloncorp/api-beta. + * For now, this returns the original URL. Once the API supports uploads, + * we can implement proper Tlon storage uploads. + */ + +/** + * Fetch an image from a URL and upload it to Tlon storage. + * Returns the uploaded URL, or falls back to the original URL on error. + */ +export async function uploadImageFromUrl(imageUrl: string): Promise { + // TODO: Implement once @tloncorp/api exports uploadFile + // For now, just return the original URL + return imageUrl; +}