Files
openclaw/extensions/telegram/src/bot-update-tracker.ts
Peter Steinberger 1507a9701b refactor: centralize inbound supplemental context
* refactor: centralize inbound supplemental context

* refactor: trim supplemental finalizer typing

* docs: clarify supplemental context projection

* refactor: move inbound finalization into core

* refactor: simplify channel inbound facts

* refactor: fold supplemental media into inbound finalizer

* refactor: migrate channel inbound callers to builder

* docs: mark inbound finalizer compat types deprecated

* refactor: wire runtime turn context builder

* refactor: replace channel turn runtime API

* fix: respect discord quote visibility

* fix: avoid deprecated line dispatch helper

* refactor: deprecate channel message SDK seams

* docs: trim channel outbound SDK page

* test: migrate irc inbound assertion

* refactor: deprecate outbound SDK facades

* refactor: deprecate channel helper SDK facades

* refactor: deprecate channel streaming SDK facade

* refactor: move direct dm helpers into inbound SDK

* chore: mark legacy test-utils SDK alias deprecated

* refactor: remove unused allow-from read helper

* refactor: route remaining channel dispatch through core

* refactor: enforce modern extension SDK imports

* test: give slow image root tests more time

* ci: support node fallback on windows

* fix: add transcripts tool display metadata

* refactor: trim legacy channel test seams

* fix: preserve channel compat after rebase

* fix: keep deprecated channel inbound aliases

* fix: preserve discord thread context visibility

* fix: clean final rebase conflicts

* fix: preserve channel message dispatch aliases

* fix: sync channel refactor after rebase

* fix: sync channel refactor after latest main

* fix: dedupe memory-core subagent mock

* test: align clickclack inbound dispatch assertions

* fix: sync plugin sdk api hash after rebase

* fix: sync channel refactor after latest main

* fix: sync plugin sdk api hash after rebase

* fix: sync plugin sdk api hash after latest main

* test: remove stale inbound context awaits
2026-05-27 09:26:06 +01:00

302 lines
9.3 KiB
TypeScript

import {
createMessageReceiveContext,
type MessageAckPolicy,
type MessageReceiveContext,
} from "openclaw/plugin-sdk/channel-outbound";
import {
buildTelegramUpdateKey,
createTelegramUpdateDedupe,
resolveTelegramUpdateId,
type TelegramUpdateKeyContext,
} from "./bot-updates.js";
type PersistUpdateId = (updateId: number) => void | Promise<void>;
type TelegramUpdateTrackerOptions = {
initialUpdateId?: number | null;
persistenceFloorUpdateId?: number | null;
ackPolicy?: MessageAckPolicy;
onAcceptedUpdateId?: PersistUpdateId;
onPersistError?: (error: unknown) => void;
onSkip?: (key: string) => void;
};
type AcceptedTelegramUpdate = {
key?: string;
updateId?: number;
receiveContext?: MessageReceiveContext<TelegramUpdateKeyContext>;
};
type BeginUpdateResult =
| {
accepted: true;
update: AcceptedTelegramUpdate;
}
| {
accepted: false;
reason: "accepted-watermark" | "semantic-dedupe";
};
type FinishUpdateOptions = {
completed: boolean;
};
export type TelegramUpdateTrackerState = {
highestAcceptedUpdateId: number | null;
highestPersistedAcceptedUpdateId: number | null;
highestCompletedUpdateId: number | null;
safeCompletedUpdateId: number | null;
pendingUpdateIds: number[];
failedUpdateIds: number[];
};
function sortedIds(ids: Set<number>): number[] {
return [...ids].toSorted((a, b) => a - b);
}
export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOptions = {}) {
const initialUpdateId =
typeof options.initialUpdateId === "number" ? options.initialUpdateId : null;
const persistenceFloorUpdateId =
typeof options.persistenceFloorUpdateId === "number"
? options.persistenceFloorUpdateId
: initialUpdateId;
const ackPolicy = options.ackPolicy ?? "after_receive_record";
const recentUpdates = createTelegramUpdateDedupe();
const pendingUpdateKeys = new Set<string>();
const activeHandledUpdateKeys = new Map<string, boolean>();
const pendingUpdateIds = new Set<number>();
const failedUpdateIds = new Set<number>();
const completedFloorReplayUpdateIds = new Set<number>();
let highestAcceptedUpdateId: number | null = initialUpdateId;
let highestPersistedAcceptedUpdateId: number | null = persistenceFloorUpdateId;
let highestPersistenceRequestedUpdateId: number | null = persistenceFloorUpdateId;
let highestCompletedUpdateId: number | null = persistenceFloorUpdateId;
let persistInFlight = false;
let persistTargetUpdateId: number | null = null;
const skip = (key: string) => {
options.onSkip?.(key);
};
const drainPersistQueue = async () => {
const persist = options.onAcceptedUpdateId;
if (persistInFlight || typeof persist !== "function") {
return;
}
persistInFlight = true;
try {
while (persistTargetUpdateId !== null) {
const updateId = persistTargetUpdateId;
persistTargetUpdateId = null;
try {
await persist(updateId);
if (
highestPersistedAcceptedUpdateId === null ||
updateId > highestPersistedAcceptedUpdateId
) {
highestPersistedAcceptedUpdateId = updateId;
}
} catch (err) {
options.onPersistError?.(err);
}
}
} finally {
persistInFlight = false;
}
};
const requestPersistAcceptedUpdateId = (updateId: number) => {
if (typeof options.onAcceptedUpdateId !== "function") {
return;
}
if (
highestPersistenceRequestedUpdateId !== null &&
updateId <= highestPersistenceRequestedUpdateId
) {
return;
}
highestPersistenceRequestedUpdateId = updateId;
persistTargetUpdateId = updateId;
void drainPersistQueue().catch((err) => {
options.onPersistError?.(err);
});
};
const acceptUpdateId = (updateId: number) => {
if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) {
return;
}
highestAcceptedUpdateId = updateId;
};
const isFloorReplayUpdateId = (updateId: number) =>
initialUpdateId === null &&
persistenceFloorUpdateId !== null &&
updateId <= persistenceFloorUpdateId;
function resolveSafeCompletedUpdateId() {
if (highestCompletedUpdateId === null) {
return null;
}
let safeCompletedUpdateId = highestCompletedUpdateId;
for (const updateId of pendingUpdateIds) {
if (persistenceFloorUpdateId !== null && updateId <= persistenceFloorUpdateId) {
continue;
}
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
for (const updateId of failedUpdateIds) {
if (persistenceFloorUpdateId !== null && updateId <= persistenceFloorUpdateId) {
continue;
}
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
return safeCompletedUpdateId;
}
const persistUpdateIdAfterAck = async (updateId: number) => {
const persistUpdateId =
ackPolicy === "after_agent_dispatch" ? resolveSafeCompletedUpdateId() : updateId;
if (persistUpdateId !== null) {
requestPersistAcceptedUpdateId(persistUpdateId);
}
};
const ackUpdateAfterStage = (
receiveContext: MessageReceiveContext<TelegramUpdateKeyContext> | undefined,
stage: "receive_record" | "agent_dispatch",
) => {
if (!receiveContext?.shouldAckAfter(stage)) {
return;
}
void receiveContext.ack().catch((err) => {
options.onPersistError?.(err);
});
};
const beginUpdate = (ctx: TelegramUpdateKeyContext): BeginUpdateResult => {
const updateId = resolveTelegramUpdateId(ctx);
const updateKey = buildTelegramUpdateKey(ctx);
if (typeof updateId === "number") {
if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) {
const floorReplay = isFloorReplayUpdateId(updateId);
if (!floorReplay && !failedUpdateIds.has(updateId)) {
skip(`update:${updateId}`);
return { accepted: false, reason: "accepted-watermark" };
}
if (floorReplay && completedFloorReplayUpdateIds.has(updateId)) {
skip(`update:${updateId}`);
return { accepted: false, reason: "accepted-watermark" };
}
} else {
failedUpdateIds.delete(updateId);
}
}
if (updateKey) {
if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) {
skip(updateKey);
return { accepted: false, reason: "semantic-dedupe" };
}
pendingUpdateKeys.add(updateKey);
activeHandledUpdateKeys.set(updateKey, false);
}
let receiveContext: MessageReceiveContext<TelegramUpdateKeyContext> | undefined;
if (typeof updateId === "number") {
pendingUpdateIds.add(updateId);
acceptUpdateId(updateId);
receiveContext = createMessageReceiveContext({
id: updateKey ?? `telegram:update:${updateId}`,
channel: "telegram",
message: ctx,
ackPolicy,
onAck: () => persistUpdateIdAfterAck(updateId),
});
ackUpdateAfterStage(receiveContext, "receive_record");
}
return {
accepted: true,
update: {
...(updateKey ? { key: updateKey } : {}),
...(typeof updateId === "number" ? { updateId } : {}),
...(receiveContext ? { receiveContext } : {}),
},
};
};
const finishUpdate = (update: AcceptedTelegramUpdate, finish: FinishUpdateOptions) => {
if (update.key) {
activeHandledUpdateKeys.delete(update.key);
if (finish.completed) {
recentUpdates.check(update.key);
}
pendingUpdateKeys.delete(update.key);
}
if (typeof update.updateId === "number") {
pendingUpdateIds.delete(update.updateId);
if (finish.completed) {
failedUpdateIds.delete(update.updateId);
if (isFloorReplayUpdateId(update.updateId)) {
completedFloorReplayUpdateIds.add(update.updateId);
}
if (highestCompletedUpdateId === null || update.updateId > highestCompletedUpdateId) {
highestCompletedUpdateId = update.updateId;
}
ackUpdateAfterStage(update.receiveContext, "agent_dispatch");
} else {
failedUpdateIds.add(update.updateId);
void update.receiveContext
?.nack(new Error("Telegram update handler did not complete"))
.catch((err) => {
options.onPersistError?.(err);
});
}
}
};
const shouldSkipHandlerDispatch = (ctx: TelegramUpdateKeyContext) => {
const updateId = resolveTelegramUpdateId(ctx);
if (typeof updateId === "number" && initialUpdateId !== null && updateId <= initialUpdateId) {
return true;
}
const key = buildTelegramUpdateKey(ctx);
if (!key) {
return false;
}
const handled = activeHandledUpdateKeys.get(key);
if (handled != null) {
if (handled) {
skip(key);
return true;
}
activeHandledUpdateKeys.set(key, true);
return false;
}
const skipped = recentUpdates.check(key);
if (skipped) {
skip(key);
}
return skipped;
};
const getState = (): TelegramUpdateTrackerState => ({
highestAcceptedUpdateId,
highestPersistedAcceptedUpdateId,
highestCompletedUpdateId,
safeCompletedUpdateId: resolveSafeCompletedUpdateId(),
pendingUpdateIds: sortedIds(pendingUpdateIds),
failedUpdateIds: sortedIds(failedUpdateIds),
});
return {
beginUpdate,
finishUpdate,
getState,
shouldSkipHandlerDispatch,
};
}