fix(tlon): release replay claims after handler failures

This commit is contained in:
Vincent Koc
2026-04-13 16:45:51 +01:00
parent eddcf722da
commit 74b4a08592
3 changed files with 297 additions and 190 deletions

View File

@@ -22,7 +22,10 @@ import { createTlonCitationResolver } from "./cites.js";
import { fetchAllChannels, fetchInitData } from "./discovery.js";
import { cacheMessage, fetchThreadHistory, getChannelHistory } from "./history.js";
import { downloadMessageImages } from "./media.js";
import { createProcessedMessageTracker } from "./processed-messages.js";
import {
createProcessedMessageTracker,
runWithProcessedMessageClaim,
} from "./processed-messages.js";
import {
applyTlonSettingsOverrides,
buildTlonSettingsMigrations,
@@ -725,105 +728,113 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
return;
}
if (!processedTracker.mark(messageId)) {
return;
}
const senderShip = normalizeShip(readString(content, "author") ?? "");
if (!senderShip || senderShip === botShipName) {
return;
}
const rawText = extractMessageText(content.content);
if (!rawText.trim()) {
return;
}
const contentBody = content.content;
const sentAt = readNumber(content, "sent") ?? Date.now();
cacheMessage(nest, {
author: senderShip,
content: rawText,
timestamp: sentAt,
const processed = await runWithProcessedMessageClaim({
tracker: processedTracker,
id: messageId,
});
// Get thread info early for participation check
const seal = isThreadReply ? asRecord(replySet?.seal) : asRecord(set?.seal);
const parentId = readString(seal, "parent-id") ?? readString(seal, "parent") ?? null;
// Check if we should respond:
// 1. Direct mention always triggers response
// 2. Thread replies where we've participated - respond if relevant (let agent decide)
const mentioned = isBotMentioned(rawText, botShipName, botNickname ?? undefined);
const inParticipatedThread = isThreadReply && parentId && participatedThreads.has(parentId);
if (!mentioned && !inParticipatedThread) {
return;
}
// Log why we're responding
if (inParticipatedThread && !mentioned) {
runtime.log?.(`[tlon] Responding to thread we participated in (no mention): ${parentId}`);
}
// Owner is always allowed
if (isOwner(senderShip)) {
runtime.log?.(`[tlon] Owner ${senderShip} is always allowed in channels`);
} else {
const { mode, allowedShips } = resolveChannelAuthorization(cfg, nest, currentSettings);
if (mode === "restricted") {
const normalizedAllowed = allowedShips.map(normalizeShip);
if (!normalizedAllowed.includes(senderShip)) {
// If owner is configured, queue approval request
if (effectiveOwnerShip) {
const approval = createPendingApproval({
type: "channel",
requestingShip: senderShip,
channelNest: nest,
messagePreview: rawText.substring(0, 100),
originalMessage: {
messageId: messageId ?? "",
messageText: rawText,
messageContent: contentBody,
timestamp: sentAt,
parentId: parentId ?? undefined,
isThreadReply,
},
});
await queueApprovalRequest(approval);
} else {
runtime.log?.(
`[tlon] Access denied: ${senderShip} in ${nest} (allowed: ${allowedShips.join(", ")})`,
);
}
task: async () => {
const senderShip = normalizeShip(readString(content, "author") ?? "");
if (!senderShip || senderShip === botShipName) {
return;
}
}
const rawText = extractMessageText(content.content);
if (!rawText.trim()) {
return;
}
const contentBody = content.content;
const sentAt = readNumber(content, "sent") ?? Date.now();
cacheMessage(nest, {
author: senderShip,
content: rawText,
timestamp: sentAt,
id: messageId,
});
// Get thread info early for participation check
const seal = isThreadReply ? asRecord(replySet?.seal) : asRecord(set?.seal);
const parentId = readString(seal, "parent-id") ?? readString(seal, "parent") ?? null;
// Check if we should respond:
// 1. Direct mention always triggers response
// 2. Thread replies where we've participated - respond if relevant (let agent decide)
const mentioned = isBotMentioned(rawText, botShipName, botNickname ?? undefined);
const inParticipatedThread =
isThreadReply && parentId && participatedThreads.has(parentId);
if (!mentioned && !inParticipatedThread) {
return;
}
// Log why we're responding
if (inParticipatedThread && !mentioned) {
runtime.log?.(
`[tlon] Responding to thread we participated in (no mention): ${parentId}`,
);
}
// Owner is always allowed
if (isOwner(senderShip)) {
runtime.log?.(`[tlon] Owner ${senderShip} is always allowed in channels`);
} else {
const { mode, allowedShips } = resolveChannelAuthorization(cfg, nest, currentSettings);
if (mode === "restricted") {
const normalizedAllowed = allowedShips.map(normalizeShip);
if (!normalizedAllowed.includes(senderShip)) {
// If owner is configured, queue approval request
if (effectiveOwnerShip) {
const approval = createPendingApproval({
type: "channel",
requestingShip: senderShip,
channelNest: nest,
messagePreview: rawText.substring(0, 100),
originalMessage: {
messageId: messageId ?? "",
messageText: rawText,
messageContent: contentBody,
timestamp: sentAt,
parentId: parentId ?? undefined,
isThreadReply,
},
});
await queueApprovalRequest(approval);
} else {
runtime.log?.(
`[tlon] Access denied: ${senderShip} in ${nest} (allowed: ${allowedShips.join(", ")})`,
);
}
return;
}
}
}
const messageText = await resolveAuthorizedMessageText({
rawText,
content: contentBody,
authorizedForCites: true,
resolveAllCites,
});
const parsed = parseChannelNest(nest);
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
messageContent: contentBody, // Pass raw content for media extraction
isGroup: true,
channelNest: nest,
hostShip: parsed?.hostShip,
channelName: parsed?.channelName,
timestamp: sentAt,
parentId,
isThreadReply,
});
},
});
if (processed.kind === "duplicate") {
return;
}
const messageText = await resolveAuthorizedMessageText({
rawText,
content: contentBody,
authorizedForCites: true,
resolveAllCites,
});
const parsed = parseChannelNest(nest);
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
messageContent: contentBody, // Pass raw content for media extraction
isGroup: true,
channelNest: nest,
hostShip: parsed?.hostShip,
channelName: parsed?.channelName,
timestamp: sentAt,
parentId,
isThreadReply,
});
} catch (error: unknown) {
runtime.error?.(`[tlon] Error handling channel firehose event: ${formatErrorMessage(error)}`);
}
@@ -906,108 +917,113 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
return;
}
if (!processedTracker.mark(messageId)) {
return;
}
const processed = await runWithProcessedMessageClaim({
tracker: processedTracker,
id: messageId,
task: async () => {
const authorShip = normalizeShip(readString(essay, "author") ?? "");
const partnerShip = extractDmPartnerShip(whom);
const senderShip = partnerShip || authorShip;
const authorShip = normalizeShip(readString(essay, "author") ?? "");
const partnerShip = extractDmPartnerShip(whom);
const senderShip = partnerShip || authorShip;
// Ignore the bot's own outbound DM events.
if (authorShip === botShipName) {
return;
}
if (!senderShip || senderShip === botShipName) {
return;
}
// Ignore the bot's own outbound DM events.
if (authorShip === botShipName) {
return;
}
if (!senderShip || senderShip === botShipName) {
return;
}
// Log mismatch between author and partner for debugging
if (authorShip && partnerShip && authorShip !== partnerShip) {
runtime.log?.(
`[tlon] DM ship mismatch (author=${authorShip}, partner=${partnerShip}) - routing to partner`,
);
}
// Log mismatch between author and partner for debugging
if (authorShip && partnerShip && authorShip !== partnerShip) {
runtime.log?.(
`[tlon] DM ship mismatch (author=${authorShip}, partner=${partnerShip}) - routing to partner`,
);
}
const rawText = extractMessageText(essay.content);
if (!rawText.trim()) {
return;
}
const rawText = extractMessageText(essay.content);
if (!rawText.trim()) {
return;
}
// Check if this is the owner sending an approval response
const messageText = rawText;
if (isOwner(senderShip) && isApprovalResponse(messageText)) {
const handled = await handleApprovalResponse(messageText);
if (handled) {
runtime.log?.(`[tlon] Processed approval response from owner: ${messageText}`);
return;
}
}
// Check if this is the owner sending an approval response
const messageText = rawText;
if (isOwner(senderShip) && isApprovalResponse(messageText)) {
const handled = await handleApprovalResponse(messageText);
if (handled) {
runtime.log?.(`[tlon] Processed approval response from owner: ${messageText}`);
return;
}
}
// Check if this is the owner sending an admin command
if (isOwner(senderShip) && isAdminCommand(messageText)) {
const handled = await handleAdminCommand(messageText);
if (handled) {
runtime.log?.(`[tlon] Processed admin command from owner: ${messageText}`);
return;
}
}
// Check if this is the owner sending an admin command
if (isOwner(senderShip) && isAdminCommand(messageText)) {
const handled = await handleAdminCommand(messageText);
if (handled) {
runtime.log?.(`[tlon] Processed admin command from owner: ${messageText}`);
return;
}
}
// Owner is always allowed to DM (bypass allowlist)
if (isOwner(senderShip)) {
const resolvedMessageText = await resolveAuthorizedMessageText({
rawText,
content: essay.content,
authorizedForCites: true,
resolveAllCites,
});
runtime.log?.(`[tlon] Processing DM from owner ${senderShip}`);
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText: resolvedMessageText,
messageContent: essay.content,
isGroup: false,
timestamp: readNumber(essay, "sent") ?? Date.now(),
});
return;
}
// For DMs from others, check allowlist
if (!isDmAllowed(senderShip, effectiveDmAllowlist)) {
// If owner is configured, queue approval request
if (effectiveOwnerShip) {
const approval = createPendingApproval({
type: "dm",
requestingShip: senderShip,
messagePreview: messageText.substring(0, 100),
originalMessage: {
// Owner is always allowed to DM (bypass allowlist)
if (isOwner(senderShip)) {
const resolvedMessageText = await resolveAuthorizedMessageText({
rawText,
content: essay.content,
authorizedForCites: true,
resolveAllCites,
});
runtime.log?.(`[tlon] Processing DM from owner ${senderShip}`);
await processMessage({
messageId: messageId ?? "",
messageText,
senderShip,
messageText: resolvedMessageText,
messageContent: essay.content,
isGroup: false,
timestamp: readNumber(essay, "sent") ?? Date.now(),
},
});
return;
}
// For DMs from others, check allowlist
if (!isDmAllowed(senderShip, effectiveDmAllowlist)) {
// If owner is configured, queue approval request
if (effectiveOwnerShip) {
const approval = createPendingApproval({
type: "dm",
requestingShip: senderShip,
messagePreview: messageText.substring(0, 100),
originalMessage: {
messageId: messageId ?? "",
messageText,
messageContent: essay.content,
timestamp: readNumber(essay, "sent") ?? Date.now(),
},
});
await queueApprovalRequest(approval);
} else {
runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`);
}
return;
}
await processMessage({
messageText: await resolveAuthorizedMessageText({
rawText,
content: essay.content,
authorizedForCites: true,
resolveAllCites,
}),
messageId: messageId ?? "",
senderShip,
messageContent: essay.content, // Pass raw content for media extraction
isGroup: false,
timestamp: readNumber(essay, "sent") ?? Date.now(),
});
await queueApprovalRequest(approval);
} else {
runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`);
}
},
});
if (processed.kind === "duplicate") {
return;
}
await processMessage({
messageText: await resolveAuthorizedMessageText({
rawText,
content: essay.content,
authorizedForCites: true,
resolveAllCites,
}),
messageId: messageId ?? "",
senderShip,
messageContent: essay.content, // Pass raw content for media extraction
isGroup: false,
timestamp: readNumber(essay, "sent") ?? Date.now(),
});
} catch (error: unknown) {
runtime.error?.(`[tlon] Error handling chat firehose event: ${formatErrorMessage(error)}`);
}

View File

@@ -1,5 +1,8 @@
import { describe, expect, it } from "vitest";
import { createProcessedMessageTracker } from "./processed-messages.js";
import {
createProcessedMessageTracker,
runWithProcessedMessageClaim,
} from "./processed-messages.js";
describe("createProcessedMessageTracker", () => {
it("dedupes and evicts oldest entries", () => {
@@ -20,4 +23,36 @@ describe("createProcessedMessageTracker", () => {
expect(tracker.has("c")).toBe(true);
expect(tracker.has("d")).toBe(true);
});
it("releases failed claims so retries can run again", async () => {
const tracker = createProcessedMessageTracker();
await expect(
runWithProcessedMessageClaim({
tracker,
id: "evt-1",
task: async () => {
throw new Error("boom");
},
}),
).rejects.toThrow("boom");
expect(tracker.has("evt-1")).toBe(false);
expect(tracker.claim("evt-1")).toEqual({ kind: "claimed" });
});
it("keeps successful claims deduped", async () => {
const tracker = createProcessedMessageTracker();
await expect(
runWithProcessedMessageClaim({
tracker,
id: "evt-2",
task: async () => undefined,
}),
).resolves.toEqual({ kind: "processed", value: undefined });
expect(tracker.has("evt-2")).toBe(true);
expect(tracker.claim("evt-2")).toEqual({ kind: "duplicate" });
});
});

View File

@@ -1,6 +1,9 @@
import { createDedupeCache } from "../../runtime-api.js";
export type ProcessedMessageTracker = {
claim: (id?: string | null) => { kind: "claimed" } | { kind: "duplicate" };
commit: (id?: string | null) => void;
release: (id?: string | null) => void;
mark: (id?: string | null) => boolean;
has: (id?: string | null) => boolean;
size: () => number;
@@ -8,13 +11,44 @@ export type ProcessedMessageTracker = {
export function createProcessedMessageTracker(limit = 2000): ProcessedMessageTracker {
const dedupe = createDedupeCache({ ttlMs: 0, maxSize: limit });
const inFlight = new Set<string>();
const mark = (id?: string | null) => {
const claim = (id?: string | null) => {
const trimmed = id?.trim();
if (!trimmed) {
return true;
return { kind: "claimed" } as const;
}
return !dedupe.check(trimmed);
if (inFlight.has(trimmed) || dedupe.peek(trimmed)) {
return { kind: "duplicate" } as const;
}
inFlight.add(trimmed);
return { kind: "claimed" } as const;
};
const commit = (id?: string | null) => {
const trimmed = id?.trim();
if (!trimmed) {
return;
}
inFlight.delete(trimmed);
dedupe.check(trimmed);
};
const release = (id?: string | null) => {
const trimmed = id?.trim();
if (!trimmed) {
return;
}
inFlight.delete(trimmed);
};
const mark = (id?: string | null) => {
const claimed = claim(id);
if (claimed.kind === "duplicate") {
return false;
}
commit(id);
return true;
};
const has = (id?: string | null) => {
@@ -26,8 +60,30 @@ export function createProcessedMessageTracker(limit = 2000): ProcessedMessageTra
};
return {
claim,
commit,
release,
mark,
has,
size: () => dedupe.size(),
};
}
export async function runWithProcessedMessageClaim<T>(params: {
tracker: ProcessedMessageTracker;
id?: string | null;
task: () => Promise<T>;
}): Promise<{ kind: "processed"; value: T } | { kind: "duplicate" }> {
const claim = params.tracker.claim(params.id);
if (claim.kind === "duplicate") {
return claim;
}
try {
const value = await params.task();
params.tracker.commit(params.id);
return { kind: "processed", value };
} catch (error) {
params.tracker.release(params.id);
throw error;
}
}