mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-04 04:00:24 +00:00
fix: defer plugin runtime globals until use
This commit is contained in:
@@ -77,17 +77,19 @@ type TelegramThreadBindingsState = {
|
||||
*/
|
||||
const TELEGRAM_THREAD_BINDINGS_STATE_KEY = Symbol.for("openclaw.telegramThreadBindingsState");
|
||||
|
||||
const threadBindingsState = resolveGlobalSingleton<TelegramThreadBindingsState>(
|
||||
TELEGRAM_THREAD_BINDINGS_STATE_KEY,
|
||||
() => ({
|
||||
managersByAccountId: new Map<string, TelegramThreadBindingManager>(),
|
||||
bindingsByAccountConversation: new Map<string, TelegramThreadBindingRecord>(),
|
||||
persistQueueByAccountId: new Map<string, Promise<void>>(),
|
||||
}),
|
||||
);
|
||||
const MANAGERS_BY_ACCOUNT_ID = threadBindingsState.managersByAccountId;
|
||||
const BINDINGS_BY_ACCOUNT_CONVERSATION = threadBindingsState.bindingsByAccountConversation;
|
||||
const PERSIST_QUEUE_BY_ACCOUNT_ID = threadBindingsState.persistQueueByAccountId;
|
||||
let threadBindingsState: TelegramThreadBindingsState | undefined;
|
||||
|
||||
function getThreadBindingsState(): TelegramThreadBindingsState {
|
||||
threadBindingsState ??= resolveGlobalSingleton<TelegramThreadBindingsState>(
|
||||
TELEGRAM_THREAD_BINDINGS_STATE_KEY,
|
||||
() => ({
|
||||
managersByAccountId: new Map<string, TelegramThreadBindingManager>(),
|
||||
bindingsByAccountConversation: new Map<string, TelegramThreadBindingRecord>(),
|
||||
persistQueueByAccountId: new Map<string, Promise<void>>(),
|
||||
}),
|
||||
);
|
||||
return threadBindingsState;
|
||||
}
|
||||
|
||||
function normalizeDurationMs(raw: unknown, fallback: number): number {
|
||||
if (typeof raw !== "number" || !Number.isFinite(raw)) {
|
||||
@@ -168,7 +170,7 @@ function fromSessionBindingInput(params: {
|
||||
}): TelegramThreadBindingRecord {
|
||||
const now = Date.now();
|
||||
const metadata = params.input.metadata ?? {};
|
||||
const existing = BINDINGS_BY_ACCOUNT_CONVERSATION.get(
|
||||
const existing = getThreadBindingsState().bindingsByAccountConversation.get(
|
||||
resolveBindingKey({
|
||||
accountId: params.accountId,
|
||||
conversationId: params.input.conversationId,
|
||||
@@ -310,7 +312,7 @@ async function persistBindingsToDisk(params: {
|
||||
version: STORE_VERSION,
|
||||
bindings:
|
||||
params.bindings ??
|
||||
[...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter(
|
||||
[...getThreadBindingsState().bindingsByAccountConversation.values()].filter(
|
||||
(entry) => entry.accountId === params.accountId,
|
||||
),
|
||||
};
|
||||
@@ -322,7 +324,7 @@ async function persistBindingsToDisk(params: {
|
||||
}
|
||||
|
||||
function listBindingsForAccount(accountId: string): TelegramThreadBindingRecord[] {
|
||||
return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter(
|
||||
return [...getThreadBindingsState().bindingsByAccountConversation.values()].filter(
|
||||
(entry) => entry.accountId === accountId,
|
||||
);
|
||||
}
|
||||
@@ -335,16 +337,17 @@ function enqueuePersistBindings(params: {
|
||||
if (!params.persist) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
const previous = PERSIST_QUEUE_BY_ACCOUNT_ID.get(params.accountId) ?? Promise.resolve();
|
||||
const previous =
|
||||
getThreadBindingsState().persistQueueByAccountId.get(params.accountId) ?? Promise.resolve();
|
||||
const next = previous
|
||||
.catch(() => undefined)
|
||||
.then(async () => {
|
||||
await persistBindingsToDisk(params);
|
||||
});
|
||||
PERSIST_QUEUE_BY_ACCOUNT_ID.set(params.accountId, next);
|
||||
getThreadBindingsState().persistQueueByAccountId.set(params.accountId, next);
|
||||
void next.finally(() => {
|
||||
if (PERSIST_QUEUE_BY_ACCOUNT_ID.get(params.accountId) === next) {
|
||||
PERSIST_QUEUE_BY_ACCOUNT_ID.delete(params.accountId);
|
||||
if (getThreadBindingsState().persistQueueByAccountId.get(params.accountId) === next) {
|
||||
getThreadBindingsState().persistQueueByAccountId.delete(params.accountId);
|
||||
}
|
||||
});
|
||||
return next;
|
||||
@@ -412,7 +415,7 @@ export function createTelegramThreadBindingManager(
|
||||
} = {},
|
||||
): TelegramThreadBindingManager {
|
||||
const accountId = normalizeAccountId(params.accountId);
|
||||
const existing = MANAGERS_BY_ACCOUNT_ID.get(accountId);
|
||||
const existing = getThreadBindingsState().managersByAccountId.get(accountId);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
@@ -430,7 +433,7 @@ export function createTelegramThreadBindingManager(
|
||||
accountId,
|
||||
conversationId: entry.conversationId,
|
||||
});
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.set(key, {
|
||||
getThreadBindingsState().bindingsByAccountConversation.set(key, {
|
||||
...entry,
|
||||
accountId,
|
||||
});
|
||||
@@ -448,7 +451,7 @@ export function createTelegramThreadBindingManager(
|
||||
if (!conversationId) {
|
||||
return undefined;
|
||||
}
|
||||
return BINDINGS_BY_ACCOUNT_CONVERSATION.get(
|
||||
return getThreadBindingsState().bindingsByAccountConversation.get(
|
||||
resolveBindingKey({
|
||||
accountId,
|
||||
conversationId,
|
||||
@@ -471,7 +474,7 @@ export function createTelegramThreadBindingManager(
|
||||
return null;
|
||||
}
|
||||
const key = resolveBindingKey({ accountId, conversationId });
|
||||
const existing = BINDINGS_BY_ACCOUNT_CONVERSATION.get(key);
|
||||
const existing = getThreadBindingsState().bindingsByAccountConversation.get(key);
|
||||
if (!existing) {
|
||||
return null;
|
||||
}
|
||||
@@ -479,7 +482,7 @@ export function createTelegramThreadBindingManager(
|
||||
...existing,
|
||||
lastActivityAt: normalizeTimestampMs(at ?? Date.now()),
|
||||
};
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.set(key, nextRecord);
|
||||
getThreadBindingsState().bindingsByAccountConversation.set(key, nextRecord);
|
||||
persistBindingsSafely({
|
||||
accountId,
|
||||
persist: manager.shouldPersistMutations(),
|
||||
@@ -494,11 +497,11 @@ export function createTelegramThreadBindingManager(
|
||||
return null;
|
||||
}
|
||||
const key = resolveBindingKey({ accountId, conversationId });
|
||||
const removed = BINDINGS_BY_ACCOUNT_CONVERSATION.get(key) ?? null;
|
||||
const removed = getThreadBindingsState().bindingsByAccountConversation.get(key) ?? null;
|
||||
if (!removed) {
|
||||
return null;
|
||||
}
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.delete(key);
|
||||
getThreadBindingsState().bindingsByAccountConversation.delete(key);
|
||||
persistBindingsSafely({
|
||||
accountId,
|
||||
persist: manager.shouldPersistMutations(),
|
||||
@@ -521,7 +524,7 @@ export function createTelegramThreadBindingManager(
|
||||
accountId,
|
||||
conversationId: entry.conversationId,
|
||||
});
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.delete(key);
|
||||
getThreadBindingsState().bindingsByAccountConversation.delete(key);
|
||||
removed.push(entry);
|
||||
}
|
||||
if (removed.length > 0) {
|
||||
@@ -540,9 +543,9 @@ export function createTelegramThreadBindingManager(
|
||||
sweepTimer = null;
|
||||
}
|
||||
unregisterSessionBindingAdapter({ channel: "telegram", accountId });
|
||||
const existingManager = MANAGERS_BY_ACCOUNT_ID.get(accountId);
|
||||
const existingManager = getThreadBindingsState().managersByAccountId.get(accountId);
|
||||
if (existingManager === manager) {
|
||||
MANAGERS_BY_ACCOUNT_ID.delete(accountId);
|
||||
getThreadBindingsState().managersByAccountId.delete(accountId);
|
||||
}
|
||||
},
|
||||
};
|
||||
@@ -574,7 +577,7 @@ export function createTelegramThreadBindingManager(
|
||||
metadata: input.metadata,
|
||||
},
|
||||
});
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.set(
|
||||
getThreadBindingsState().bindingsByAccountConversation.set(
|
||||
resolveBindingKey({ accountId, conversationId }),
|
||||
record,
|
||||
);
|
||||
@@ -714,14 +717,14 @@ export function createTelegramThreadBindingManager(
|
||||
sweepTimer.unref?.();
|
||||
}
|
||||
|
||||
MANAGERS_BY_ACCOUNT_ID.set(accountId, manager);
|
||||
getThreadBindingsState().managersByAccountId.set(accountId, manager);
|
||||
return manager;
|
||||
}
|
||||
|
||||
export function getTelegramThreadBindingManager(
|
||||
accountId?: string,
|
||||
): TelegramThreadBindingManager | null {
|
||||
return MANAGERS_BY_ACCOUNT_ID.get(normalizeAccountId(accountId)) ?? null;
|
||||
return getThreadBindingsState().managersByAccountId.get(normalizeAccountId(accountId)) ?? null;
|
||||
}
|
||||
|
||||
function updateTelegramBindingsBySessionKey(params: {
|
||||
@@ -741,7 +744,7 @@ function updateTelegramBindingsBySessionKey(params: {
|
||||
conversationId: entry.conversationId,
|
||||
});
|
||||
const next = params.update(entry, now);
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.set(key, next);
|
||||
getThreadBindingsState().bindingsByAccountConversation.set(key, next);
|
||||
updated.push(next);
|
||||
}
|
||||
if (updated.length > 0) {
|
||||
@@ -799,12 +802,12 @@ export function setTelegramThreadBindingMaxAgeBySessionKey(params: {
|
||||
|
||||
export const __testing = {
|
||||
async resetTelegramThreadBindingsForTests() {
|
||||
for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) {
|
||||
for (const manager of getThreadBindingsState().managersByAccountId.values()) {
|
||||
manager.stop();
|
||||
}
|
||||
await Promise.allSettled(PERSIST_QUEUE_BY_ACCOUNT_ID.values());
|
||||
PERSIST_QUEUE_BY_ACCOUNT_ID.clear();
|
||||
MANAGERS_BY_ACCOUNT_ID.clear();
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.clear();
|
||||
await Promise.allSettled(getThreadBindingsState().persistQueueByAccountId.values());
|
||||
getThreadBindingsState().persistQueueByAccountId.clear();
|
||||
getThreadBindingsState().managersByAccountId.clear();
|
||||
getThreadBindingsState().bindingsByAccountConversation.clear();
|
||||
},
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user