diff --git a/extensions/tlon/src/monitor/index.ts b/extensions/tlon/src/monitor/index.ts index f5940a1b2b7..b1d8d546248 100644 --- a/extensions/tlon/src/monitor/index.ts +++ b/extensions/tlon/src/monitor/index.ts @@ -22,7 +22,10 @@ import { createTlonCitationResolver } from "./cites.js"; import { fetchAllChannels, fetchInitData } from "./discovery.js"; import { cacheMessage, fetchThreadHistory, getChannelHistory } from "./history.js"; import { downloadMessageImages } from "./media.js"; -import { createProcessedMessageTracker } from "./processed-messages.js"; +import { + createProcessedMessageTracker, + runWithProcessedMessageClaim, +} from "./processed-messages.js"; import { applyTlonSettingsOverrides, buildTlonSettingsMigrations, @@ -725,105 +728,113 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise { + const senderShip = normalizeShip(readString(content, "author") ?? ""); + if (!senderShip || senderShip === botShipName) { return; } - } + + const rawText = extractMessageText(content.content); + if (!rawText.trim()) { + return; + } + + const contentBody = content.content; + const sentAt = readNumber(content, "sent") ?? Date.now(); + + cacheMessage(nest, { + author: senderShip, + content: rawText, + timestamp: sentAt, + id: messageId, + }); + + // Get thread info early for participation check + const seal = isThreadReply ? asRecord(replySet?.seal) : asRecord(set?.seal); + const parentId = readString(seal, "parent-id") ?? readString(seal, "parent") ?? null; + + // Check if we should respond: + // 1. Direct mention always triggers response + // 2. Thread replies where we've participated - respond if relevant (let agent decide) + const mentioned = isBotMentioned(rawText, botShipName, botNickname ?? undefined); + const inParticipatedThread = + isThreadReply && parentId && participatedThreads.has(parentId); + + if (!mentioned && !inParticipatedThread) { + return; + } + + // Log why we're responding + if (inParticipatedThread && !mentioned) { + runtime.log?.( + `[tlon] Responding to thread we participated in (no mention): ${parentId}`, + ); + } + + // Owner is always allowed + if (isOwner(senderShip)) { + runtime.log?.(`[tlon] Owner ${senderShip} is always allowed in channels`); + } else { + const { mode, allowedShips } = resolveChannelAuthorization(cfg, nest, currentSettings); + if (mode === "restricted") { + const normalizedAllowed = allowedShips.map(normalizeShip); + if (!normalizedAllowed.includes(senderShip)) { + // If owner is configured, queue approval request + if (effectiveOwnerShip) { + const approval = createPendingApproval({ + type: "channel", + requestingShip: senderShip, + channelNest: nest, + messagePreview: rawText.substring(0, 100), + originalMessage: { + messageId: messageId ?? "", + messageText: rawText, + messageContent: contentBody, + timestamp: sentAt, + parentId: parentId ?? undefined, + isThreadReply, + }, + }); + await queueApprovalRequest(approval); + } else { + runtime.log?.( + `[tlon] Access denied: ${senderShip} in ${nest} (allowed: ${allowedShips.join(", ")})`, + ); + } + return; + } + } + } + + const messageText = await resolveAuthorizedMessageText({ + rawText, + content: contentBody, + authorizedForCites: true, + resolveAllCites, + }); + + const parsed = parseChannelNest(nest); + await processMessage({ + messageId: messageId ?? "", + senderShip, + messageText, + messageContent: contentBody, // Pass raw content for media extraction + isGroup: true, + channelNest: nest, + hostShip: parsed?.hostShip, + channelName: parsed?.channelName, + timestamp: sentAt, + parentId, + isThreadReply, + }); + }, + }); + if (processed.kind === "duplicate") { + return; } - - const messageText = await resolveAuthorizedMessageText({ - rawText, - content: contentBody, - authorizedForCites: true, - resolveAllCites, - }); - - const parsed = parseChannelNest(nest); - await processMessage({ - messageId: messageId ?? "", - senderShip, - messageText, - messageContent: contentBody, // Pass raw content for media extraction - isGroup: true, - channelNest: nest, - hostShip: parsed?.hostShip, - channelName: parsed?.channelName, - timestamp: sentAt, - parentId, - isThreadReply, - }); } catch (error: unknown) { runtime.error?.(`[tlon] Error handling channel firehose event: ${formatErrorMessage(error)}`); } @@ -906,108 +917,113 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise { + const authorShip = normalizeShip(readString(essay, "author") ?? ""); + const partnerShip = extractDmPartnerShip(whom); + const senderShip = partnerShip || authorShip; - const authorShip = normalizeShip(readString(essay, "author") ?? ""); - const partnerShip = extractDmPartnerShip(whom); - const senderShip = partnerShip || authorShip; + // Ignore the bot's own outbound DM events. + if (authorShip === botShipName) { + return; + } + if (!senderShip || senderShip === botShipName) { + return; + } - // Ignore the bot's own outbound DM events. - if (authorShip === botShipName) { - return; - } - if (!senderShip || senderShip === botShipName) { - return; - } + // Log mismatch between author and partner for debugging + if (authorShip && partnerShip && authorShip !== partnerShip) { + runtime.log?.( + `[tlon] DM ship mismatch (author=${authorShip}, partner=${partnerShip}) - routing to partner`, + ); + } - // Log mismatch between author and partner for debugging - if (authorShip && partnerShip && authorShip !== partnerShip) { - runtime.log?.( - `[tlon] DM ship mismatch (author=${authorShip}, partner=${partnerShip}) - routing to partner`, - ); - } + const rawText = extractMessageText(essay.content); + if (!rawText.trim()) { + return; + } - const rawText = extractMessageText(essay.content); - if (!rawText.trim()) { - return; - } + // Check if this is the owner sending an approval response + const messageText = rawText; + if (isOwner(senderShip) && isApprovalResponse(messageText)) { + const handled = await handleApprovalResponse(messageText); + if (handled) { + runtime.log?.(`[tlon] Processed approval response from owner: ${messageText}`); + return; + } + } - // Check if this is the owner sending an approval response - const messageText = rawText; - if (isOwner(senderShip) && isApprovalResponse(messageText)) { - const handled = await handleApprovalResponse(messageText); - if (handled) { - runtime.log?.(`[tlon] Processed approval response from owner: ${messageText}`); - return; - } - } + // Check if this is the owner sending an admin command + if (isOwner(senderShip) && isAdminCommand(messageText)) { + const handled = await handleAdminCommand(messageText); + if (handled) { + runtime.log?.(`[tlon] Processed admin command from owner: ${messageText}`); + return; + } + } - // Check if this is the owner sending an admin command - if (isOwner(senderShip) && isAdminCommand(messageText)) { - const handled = await handleAdminCommand(messageText); - if (handled) { - runtime.log?.(`[tlon] Processed admin command from owner: ${messageText}`); - return; - } - } - - // Owner is always allowed to DM (bypass allowlist) - if (isOwner(senderShip)) { - const resolvedMessageText = await resolveAuthorizedMessageText({ - rawText, - content: essay.content, - authorizedForCites: true, - resolveAllCites, - }); - runtime.log?.(`[tlon] Processing DM from owner ${senderShip}`); - await processMessage({ - messageId: messageId ?? "", - senderShip, - messageText: resolvedMessageText, - messageContent: essay.content, - isGroup: false, - timestamp: readNumber(essay, "sent") ?? Date.now(), - }); - return; - } - - // For DMs from others, check allowlist - if (!isDmAllowed(senderShip, effectiveDmAllowlist)) { - // If owner is configured, queue approval request - if (effectiveOwnerShip) { - const approval = createPendingApproval({ - type: "dm", - requestingShip: senderShip, - messagePreview: messageText.substring(0, 100), - originalMessage: { + // Owner is always allowed to DM (bypass allowlist) + if (isOwner(senderShip)) { + const resolvedMessageText = await resolveAuthorizedMessageText({ + rawText, + content: essay.content, + authorizedForCites: true, + resolveAllCites, + }); + runtime.log?.(`[tlon] Processing DM from owner ${senderShip}`); + await processMessage({ messageId: messageId ?? "", - messageText, + senderShip, + messageText: resolvedMessageText, messageContent: essay.content, + isGroup: false, timestamp: readNumber(essay, "sent") ?? Date.now(), - }, + }); + return; + } + + // For DMs from others, check allowlist + if (!isDmAllowed(senderShip, effectiveDmAllowlist)) { + // If owner is configured, queue approval request + if (effectiveOwnerShip) { + const approval = createPendingApproval({ + type: "dm", + requestingShip: senderShip, + messagePreview: messageText.substring(0, 100), + originalMessage: { + messageId: messageId ?? "", + messageText, + messageContent: essay.content, + timestamp: readNumber(essay, "sent") ?? Date.now(), + }, + }); + await queueApprovalRequest(approval); + } else { + runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`); + } + return; + } + + await processMessage({ + messageText: await resolveAuthorizedMessageText({ + rawText, + content: essay.content, + authorizedForCites: true, + resolveAllCites, + }), + messageId: messageId ?? "", + senderShip, + messageContent: essay.content, // Pass raw content for media extraction + isGroup: false, + timestamp: readNumber(essay, "sent") ?? Date.now(), }); - await queueApprovalRequest(approval); - } else { - runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`); - } + }, + }); + if (processed.kind === "duplicate") { return; } - - await processMessage({ - messageText: await resolveAuthorizedMessageText({ - rawText, - content: essay.content, - authorizedForCites: true, - resolveAllCites, - }), - messageId: messageId ?? "", - senderShip, - messageContent: essay.content, // Pass raw content for media extraction - isGroup: false, - timestamp: readNumber(essay, "sent") ?? Date.now(), - }); } catch (error: unknown) { runtime.error?.(`[tlon] Error handling chat firehose event: ${formatErrorMessage(error)}`); } diff --git a/extensions/tlon/src/monitor/processed-messages.test.ts b/extensions/tlon/src/monitor/processed-messages.test.ts index 00855690445..e4e350cadee 100644 --- a/extensions/tlon/src/monitor/processed-messages.test.ts +++ b/extensions/tlon/src/monitor/processed-messages.test.ts @@ -1,5 +1,8 @@ import { describe, expect, it } from "vitest"; -import { createProcessedMessageTracker } from "./processed-messages.js"; +import { + createProcessedMessageTracker, + runWithProcessedMessageClaim, +} from "./processed-messages.js"; describe("createProcessedMessageTracker", () => { it("dedupes and evicts oldest entries", () => { @@ -20,4 +23,36 @@ describe("createProcessedMessageTracker", () => { expect(tracker.has("c")).toBe(true); expect(tracker.has("d")).toBe(true); }); + + it("releases failed claims so retries can run again", async () => { + const tracker = createProcessedMessageTracker(); + + await expect( + runWithProcessedMessageClaim({ + tracker, + id: "evt-1", + task: async () => { + throw new Error("boom"); + }, + }), + ).rejects.toThrow("boom"); + + expect(tracker.has("evt-1")).toBe(false); + expect(tracker.claim("evt-1")).toEqual({ kind: "claimed" }); + }); + + it("keeps successful claims deduped", async () => { + const tracker = createProcessedMessageTracker(); + + await expect( + runWithProcessedMessageClaim({ + tracker, + id: "evt-2", + task: async () => undefined, + }), + ).resolves.toEqual({ kind: "processed", value: undefined }); + + expect(tracker.has("evt-2")).toBe(true); + expect(tracker.claim("evt-2")).toEqual({ kind: "duplicate" }); + }); }); diff --git a/extensions/tlon/src/monitor/processed-messages.ts b/extensions/tlon/src/monitor/processed-messages.ts index ba5caa979a7..8aaf56ebf02 100644 --- a/extensions/tlon/src/monitor/processed-messages.ts +++ b/extensions/tlon/src/monitor/processed-messages.ts @@ -1,6 +1,9 @@ import { createDedupeCache } from "../../runtime-api.js"; export type ProcessedMessageTracker = { + claim: (id?: string | null) => { kind: "claimed" } | { kind: "duplicate" }; + commit: (id?: string | null) => void; + release: (id?: string | null) => void; mark: (id?: string | null) => boolean; has: (id?: string | null) => boolean; size: () => number; @@ -8,13 +11,44 @@ export type ProcessedMessageTracker = { export function createProcessedMessageTracker(limit = 2000): ProcessedMessageTracker { const dedupe = createDedupeCache({ ttlMs: 0, maxSize: limit }); + const inFlight = new Set(); - const mark = (id?: string | null) => { + const claim = (id?: string | null) => { const trimmed = id?.trim(); if (!trimmed) { - return true; + return { kind: "claimed" } as const; } - return !dedupe.check(trimmed); + if (inFlight.has(trimmed) || dedupe.peek(trimmed)) { + return { kind: "duplicate" } as const; + } + inFlight.add(trimmed); + return { kind: "claimed" } as const; + }; + + const commit = (id?: string | null) => { + const trimmed = id?.trim(); + if (!trimmed) { + return; + } + inFlight.delete(trimmed); + dedupe.check(trimmed); + }; + + const release = (id?: string | null) => { + const trimmed = id?.trim(); + if (!trimmed) { + return; + } + inFlight.delete(trimmed); + }; + + const mark = (id?: string | null) => { + const claimed = claim(id); + if (claimed.kind === "duplicate") { + return false; + } + commit(id); + return true; }; const has = (id?: string | null) => { @@ -26,8 +60,30 @@ export function createProcessedMessageTracker(limit = 2000): ProcessedMessageTra }; return { + claim, + commit, + release, mark, has, size: () => dedupe.size(), }; } + +export async function runWithProcessedMessageClaim(params: { + tracker: ProcessedMessageTracker; + id?: string | null; + task: () => Promise; +}): Promise<{ kind: "processed"; value: T } | { kind: "duplicate" }> { + const claim = params.tracker.claim(params.id); + if (claim.kind === "duplicate") { + return claim; + } + try { + const value = await params.task(); + params.tracker.commit(params.id); + return { kind: "processed", value }; + } catch (error) { + params.tracker.release(params.id); + throw error; + } +}