diff --git a/src/channels/registry.ts b/src/channels/registry.ts index 162c1088745..a9359918b3c 100644 --- a/src/channels/registry.ts +++ b/src/channels/registry.ts @@ -1,4 +1,7 @@ -import { getActivePluginChannelRegistry, getActivePluginRegistry } from "../plugins/runtime.js"; +import { + getActivePluginChannelRegistryFromState, + getPluginRegistryState, +} from "../plugins/runtime-state.js"; import { normalizeOptionalLowercaseString, normalizeOptionalString, @@ -23,11 +26,11 @@ type RegisteredChannelPluginEntry = { }; function listRegisteredChannelPluginEntries(): RegisteredChannelPluginEntry[] { - const channelRegistry = getActivePluginChannelRegistry(); + const channelRegistry = getActivePluginChannelRegistryFromState(); if (channelRegistry && channelRegistry.channels && channelRegistry.channels.length > 0) { return channelRegistry.channels; } - return getActivePluginRegistry()?.channels ?? []; + return getPluginRegistryState()?.activeRegistry?.channels ?? []; } function findRegisteredChannelPluginEntry( diff --git a/src/config/group-policy.ts b/src/config/group-policy.ts index 9b03e62f009..0f57554a0fa 100644 --- a/src/config/group-policy.ts +++ b/src/config/group-policy.ts @@ -5,7 +5,7 @@ import { normalizeLowercaseStringOrEmpty, normalizeOptionalString, } from "../shared/string-coerce.js"; -import type { OpenClawConfig } from "./config.js"; +import type { OpenClawConfig } from "./types.openclaw.js"; import { parseToolsBySenderTypedKey, type GroupToolPolicyBySenderConfig, diff --git a/src/config/sessions/runtime-types.ts b/src/config/sessions/runtime-types.ts new file mode 100644 index 00000000000..ad415929c31 --- /dev/null +++ b/src/config/sessions/runtime-types.ts @@ -0,0 +1,28 @@ +import type { MsgContext } from "../../auto-reply/templating.js"; +import type { DeliveryContext } from "../../utils/delivery-context.shared.js"; +import type { SessionEntry, GroupKeyResolution } from "./types.js"; + +export type ReadSessionUpdatedAt = (params: { + storePath: string; + sessionKey: string; +}) => number | undefined; + +export type RecordSessionMetaFromInbound = (params: { + storePath: string; + sessionKey: string; + ctx: MsgContext; + groupResolution?: GroupKeyResolution | null; + createIfMissing?: boolean; +}) => Promise; + +export type UpdateLastRoute = (params: { + storePath: string; + sessionKey: string; + channel?: SessionEntry["lastChannel"]; + to?: string; + accountId?: string; + threadId?: string | number; + deliveryContext?: DeliveryContext; + ctx?: MsgContext; + groupResolution?: GroupKeyResolution | null; +}) => Promise; diff --git a/src/plugins/runtime-state.ts b/src/plugins/runtime-state.ts index da376ef9bcf..fdb6de5909c 100644 --- a/src/plugins/runtime-state.ts +++ b/src/plugins/runtime-state.ts @@ -1,15 +1,35 @@ -import type { PluginRegistry } from "./registry.js"; - export const PLUGIN_REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState"); +export type RuntimeTrackedPluginRecord = { + id: string; + status?: string; + format?: string; +}; + +export type RuntimeTrackedChannelEntry = { + plugin: { + id?: string | null; + meta?: { + aliases?: string[]; + markdownCapable?: boolean; + } | null; + }; +}; + +export type RuntimeTrackedPluginRegistry = { + plugins: RuntimeTrackedPluginRecord[]; + httpRoutes?: unknown[]; + channels?: RuntimeTrackedChannelEntry[]; +}; + export type RegistrySurfaceState = { - registry: PluginRegistry | null; + registry: RuntimeTrackedPluginRegistry | null; pinned: boolean; version: number; }; export type RegistryState = { - activeRegistry: PluginRegistry | null; + activeRegistry: RuntimeTrackedPluginRegistry | null; activeVersion: number; httpRoute: RegistrySurfaceState; channel: RegistrySurfaceState; @@ -27,7 +47,7 @@ export function getPluginRegistryState(): RegistryState | undefined { return (globalThis as GlobalRegistryState)[PLUGIN_REGISTRY_STATE]; } -export function getActivePluginChannelRegistryFromState(): PluginRegistry | null { +export function getActivePluginChannelRegistryFromState(): RuntimeTrackedPluginRegistry | null { const state = getPluginRegistryState(); return state?.channel.registry ?? state?.activeRegistry ?? null; } diff --git a/src/plugins/runtime.ts b/src/plugins/runtime.ts index 4140854ea39..68b24115101 100644 --- a/src/plugins/runtime.ts +++ b/src/plugins/runtime.ts @@ -6,6 +6,10 @@ import { type RegistrySurfaceState, } from "./runtime-state.js"; +function asPluginRegistry(registry: RegistryState["activeRegistry"]): PluginRegistry | null { + return registry as PluginRegistry | null; +} + const state: RegistryState = (() => { const globalState = globalThis as typeof globalThis & { [PLUGIN_REGISTRY_STATE]?: RegistryState; @@ -85,7 +89,7 @@ export function setActivePluginRegistry( } export function getActivePluginRegistry(): PluginRegistry | null { - return state.activeRegistry; + return asPluginRegistry(state.activeRegistry); } export function getActivePluginRegistryWorkspaceDir(): string | undefined { @@ -114,7 +118,7 @@ export function releasePinnedPluginHttpRouteRegistry(registry?: PluginRegistry) } export function getActivePluginHttpRouteRegistry(): PluginRegistry | null { - return state.httpRoute.registry ?? state.activeRegistry; + return asPluginRegistry(state.httpRoute.registry ?? state.activeRegistry); } export function getActivePluginHttpRouteRegistryVersion(): number { @@ -163,7 +167,7 @@ export function releasePinnedPluginChannelRegistry(registry?: PluginRegistry) { * When pinned, this returns the startup registry regardless of subsequent * `setActivePluginRegistry` calls. */ export function getActivePluginChannelRegistry(): PluginRegistry | null { - return state.channel.registry ?? state.activeRegistry; + return asPluginRegistry(state.channel.registry ?? state.activeRegistry); } export function getActivePluginChannelRegistryVersion(): number { diff --git a/src/plugins/runtime/runtime-taskflow.ts b/src/plugins/runtime/runtime-taskflow.ts index 7d8b00337e6..f5e33df137a 100644 --- a/src/plugins/runtime/runtime-taskflow.ts +++ b/src/plugins/runtime/runtime-taskflow.ts @@ -28,7 +28,7 @@ import type { TaskRegistrySummary, TaskRuntime, } from "../../tasks/task-registry.types.js"; -import { normalizeDeliveryContext } from "../../utils/delivery-context.js"; +import { normalizeDeliveryContext } from "../../utils/delivery-context.shared.js"; import type { OpenClawPluginToolContext } from "../tool-types.js"; export type ManagedTaskFlowRecord = TaskFlowRecord & { diff --git a/src/plugins/runtime/runtime-tasks.ts b/src/plugins/runtime/runtime-tasks.ts index ddc3f2d246d..a1ba7a130db 100644 --- a/src/plugins/runtime/runtime-tasks.ts +++ b/src/plugins/runtime/runtime-tasks.ts @@ -20,7 +20,7 @@ import { listTasksForRelatedSessionKeyForOwner, resolveTaskForLookupTokenForOwner, } from "../../tasks/task-owner-access.js"; -import { normalizeDeliveryContext } from "../../utils/delivery-context.js"; +import { normalizeDeliveryContext } from "../../utils/delivery-context.shared.js"; import type { OpenClawPluginToolContext } from "../tool-types.js"; import type { PluginRuntimeTaskFlow } from "./runtime-taskflow.js"; import type { diff --git a/src/plugins/runtime/types-channel.ts b/src/plugins/runtime/types-channel.ts index 324aa637071..b575426d40f 100644 --- a/src/plugins/runtime/types-channel.ts +++ b/src/plugins/runtime/types-channel.ts @@ -9,6 +9,10 @@ type ReadChannelAllowFromStore = typeof import("../../pairing/pairing-store.js").readChannelAllowFromStore; type UpsertChannelPairingRequest = typeof import("../../pairing/pairing-store.js").upsertChannelPairingRequest; +type ReadSessionUpdatedAt = import("../../config/sessions/runtime-types.js").ReadSessionUpdatedAt; +type RecordSessionMetaFromInbound = + import("../../config/sessions/runtime-types.js").RecordSessionMetaFromInbound; +type UpdateLastRoute = import("../../config/sessions/runtime-types.js").UpdateLastRoute; type ReadChannelAllowFromStoreForAccount = (params: { channel: Parameters[0]; @@ -105,11 +109,11 @@ export type PluginRuntimeChannel = { get: typeof import("../../infra/channel-activity.js").getChannelActivity; }; session: { - resolveStorePath: typeof import("../../config/sessions.js").resolveStorePath; - readSessionUpdatedAt: typeof import("../../config/sessions.js").readSessionUpdatedAt; - recordSessionMetaFromInbound: typeof import("../../config/sessions.js").recordSessionMetaFromInbound; + resolveStorePath: typeof import("../../config/sessions/paths.js").resolveStorePath; + readSessionUpdatedAt: ReadSessionUpdatedAt; + recordSessionMetaFromInbound: RecordSessionMetaFromInbound; recordInboundSession: typeof import("../../channels/session.js").recordInboundSession; - updateLastRoute: typeof import("../../config/sessions.js").updateLastRoute; + updateLastRoute: UpdateLastRoute; }; mentions: { buildMentionRegexes: typeof import("../../auto-reply/reply/mentions.js").buildMentionRegexes; diff --git a/src/plugins/runtime/types-core.ts b/src/plugins/runtime/types-core.ts index 25469e42052..e842ecaa1a6 100644 --- a/src/plugins/runtime/types-core.ts +++ b/src/plugins/runtime/types-core.ts @@ -54,10 +54,10 @@ export type PluginRuntimeCore = { resolveAgentTimeoutMs: typeof import("../../agents/timeout.js").resolveAgentTimeoutMs; ensureAgentWorkspace: typeof import("../../agents/workspace.js").ensureAgentWorkspace; session: { - resolveStorePath: typeof import("../../config/sessions.js").resolveStorePath; - loadSessionStore: typeof import("../../config/sessions.js").loadSessionStore; - saveSessionStore: typeof import("../../config/sessions.js").saveSessionStore; - resolveSessionFilePath: typeof import("../../config/sessions.js").resolveSessionFilePath; + resolveStorePath: typeof import("../../config/sessions/paths.js").resolveStorePath; + loadSessionStore: typeof import("../../config/sessions/store-load.js").loadSessionStore; + saveSessionStore: typeof import("../../config/sessions/store.js").saveSessionStore; + resolveSessionFilePath: typeof import("../../config/sessions/paths.js").resolveSessionFilePath; }; }; system: { diff --git a/src/plugins/tool-types.ts b/src/plugins/tool-types.ts index 263a5cc6965..217d62b264a 100644 --- a/src/plugins/tool-types.ts +++ b/src/plugins/tool-types.ts @@ -2,7 +2,7 @@ import type { ToolFsPolicy } from "../agents/tool-fs-policy.js"; import type { AnyAgentTool } from "../agents/tools/common.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import type { HookEntry } from "../hooks/types.js"; -import type { DeliveryContext } from "../utils/delivery-context.js"; +import type { DeliveryContext } from "../utils/delivery-context.shared.js"; /** Trusted execution context passed to plugin-owned agent tool factories. */ export type OpenClawPluginToolContext = { diff --git a/src/tasks/task-flow-registry.types.ts b/src/tasks/task-flow-registry.types.ts index f1fed9b7f1e..7f38a5e5638 100644 --- a/src/tasks/task-flow-registry.types.ts +++ b/src/tasks/task-flow-registry.types.ts @@ -1,4 +1,4 @@ -import type { DeliveryContext } from "../utils/delivery-context.js"; +import type { DeliveryContext } from "../utils/delivery-context.shared.js"; import type { TaskNotifyPolicy } from "./task-registry.types.js"; export type JsonValue = diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 5c5ff6151e9..be0c7a2a1fc 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -7,7 +7,7 @@ import { enqueueSystemEvent } from "../infra/system-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; -import { normalizeDeliveryContext } from "../utils/delivery-context.js"; +import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; import { formatTaskBlockedFollowupMessage, diff --git a/src/tasks/task-registry.types.ts b/src/tasks/task-registry.types.ts index 0a770a8229f..b14bf8da3cf 100644 --- a/src/tasks/task-registry.types.ts +++ b/src/tasks/task-registry.types.ts @@ -1,4 +1,4 @@ -import type { DeliveryContext } from "../utils/delivery-context.js"; +import type { DeliveryContext } from "../utils/delivery-context.shared.js"; export type TaskRuntime = "subagent" | "acp" | "cli" | "cron"; diff --git a/src/utils/delivery-context.shared.ts b/src/utils/delivery-context.shared.ts new file mode 100644 index 00000000000..dd94fa731cf --- /dev/null +++ b/src/utils/delivery-context.shared.ts @@ -0,0 +1,159 @@ +import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { normalizeAccountId } from "./account-id.js"; +import { normalizeMessageChannel } from "./message-channel.js"; + +export type DeliveryContext = { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; +}; + +export type DeliveryContextSessionSource = { + channel?: string; + lastChannel?: string; + lastTo?: string; + lastAccountId?: string; + lastThreadId?: string | number; + origin?: { + provider?: string; + accountId?: string; + threadId?: string | number; + }; + deliveryContext?: DeliveryContext; +}; + +export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined { + if (!context) { + return undefined; + } + const channel = + typeof context.channel === "string" + ? (normalizeMessageChannel(context.channel) ?? context.channel.trim()) + : undefined; + const to = normalizeOptionalString(context.to); + const accountId = normalizeAccountId(context.accountId); + const threadId = + typeof context.threadId === "number" && Number.isFinite(context.threadId) + ? Math.trunc(context.threadId) + : typeof context.threadId === "string" + ? normalizeOptionalString(context.threadId) + : undefined; + const normalizedThreadId = + typeof threadId === "string" ? (threadId ? threadId : undefined) : threadId; + if (!channel && !to && !accountId && normalizedThreadId == null) { + return undefined; + } + const normalized: DeliveryContext = { + channel: channel || undefined, + to: to || undefined, + accountId, + }; + if (normalizedThreadId != null) { + normalized.threadId = normalizedThreadId; + } + return normalized; +} + +export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSource): { + deliveryContext?: DeliveryContext; + lastChannel?: string; + lastTo?: string; + lastAccountId?: string; + lastThreadId?: string | number; +} { + if (!source) { + return { + deliveryContext: undefined, + lastChannel: undefined, + lastTo: undefined, + lastAccountId: undefined, + lastThreadId: undefined, + }; + } + + const merged = mergeDeliveryContext( + normalizeDeliveryContext({ + channel: source.lastChannel ?? source.channel, + to: source.lastTo, + accountId: source.lastAccountId, + threadId: source.lastThreadId, + }), + normalizeDeliveryContext(source.deliveryContext), + ); + + if (!merged) { + return { + deliveryContext: undefined, + lastChannel: undefined, + lastTo: undefined, + lastAccountId: undefined, + lastThreadId: undefined, + }; + } + + return { + deliveryContext: merged, + lastChannel: merged.channel, + lastTo: merged.to, + lastAccountId: merged.accountId, + lastThreadId: merged.threadId, + }; +} + +export function deliveryContextFromSession( + entry?: DeliveryContextSessionSource, +): DeliveryContext | undefined { + if (!entry) { + return undefined; + } + const source: DeliveryContextSessionSource = { + channel: entry.channel ?? entry.origin?.provider, + lastChannel: entry.lastChannel, + lastTo: entry.lastTo, + lastAccountId: entry.lastAccountId ?? entry.origin?.accountId, + lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId, + origin: entry.origin, + deliveryContext: entry.deliveryContext, + }; + return normalizeSessionDeliveryFields(source).deliveryContext; +} + +export function mergeDeliveryContext( + primary?: DeliveryContext, + fallback?: DeliveryContext, +): DeliveryContext | undefined { + const normalizedPrimary = normalizeDeliveryContext(primary); + const normalizedFallback = normalizeDeliveryContext(fallback); + if (!normalizedPrimary && !normalizedFallback) { + return undefined; + } + const channelsConflict = + normalizedPrimary?.channel && + normalizedFallback?.channel && + normalizedPrimary.channel !== normalizedFallback.channel; + return normalizeDeliveryContext({ + channel: normalizedPrimary?.channel ?? normalizedFallback?.channel, + // Keep route fields paired to their channel; avoid crossing fields between + // unrelated channels during session context merges. + to: channelsConflict + ? normalizedPrimary?.to + : (normalizedPrimary?.to ?? normalizedFallback?.to), + accountId: channelsConflict + ? normalizedPrimary?.accountId + : (normalizedPrimary?.accountId ?? normalizedFallback?.accountId), + threadId: channelsConflict + ? normalizedPrimary?.threadId + : (normalizedPrimary?.threadId ?? normalizedFallback?.threadId), + }); +} + +export function deliveryContextKey(context?: DeliveryContext): string | undefined { + const normalized = normalizeDeliveryContext(context); + if (!normalized?.channel || !normalized?.to) { + return undefined; + } + const threadId = + normalized.threadId != null && normalized.threadId !== "" ? String(normalized.threadId) : ""; + return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}|${threadId}`; +} diff --git a/src/utils/delivery-context.ts b/src/utils/delivery-context.ts index d4a0f9dfe08..635df4481a6 100644 --- a/src/utils/delivery-context.ts +++ b/src/utils/delivery-context.ts @@ -1,60 +1,14 @@ import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; -import { normalizeAccountId } from "./account-id.js"; -import { normalizeMessageChannel } from "./message-channel.js"; - -export type DeliveryContext = { - channel?: string; - to?: string; - accountId?: string; - threadId?: string | number; -}; - -export type DeliveryContextSessionSource = { - channel?: string; - lastChannel?: string; - lastTo?: string; - lastAccountId?: string; - lastThreadId?: string | number; - origin?: { - provider?: string; - accountId?: string; - threadId?: string | number; - }; - deliveryContext?: DeliveryContext; -}; - -export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined { - if (!context) { - return undefined; - } - const channel = - typeof context.channel === "string" - ? (normalizeMessageChannel(context.channel) ?? context.channel.trim()) - : undefined; - const to = normalizeOptionalString(context.to); - const accountId = normalizeAccountId(context.accountId); - const threadId = - typeof context.threadId === "number" && Number.isFinite(context.threadId) - ? Math.trunc(context.threadId) - : typeof context.threadId === "string" - ? normalizeOptionalString(context.threadId) - : undefined; - const normalizedThreadId = - typeof threadId === "string" ? (threadId ? threadId : undefined) : threadId; - if (!channel && !to && !accountId && normalizedThreadId == null) { - return undefined; - } - const normalized: DeliveryContext = { - channel: channel || undefined, - to: to || undefined, - accountId, - }; - if (normalizedThreadId != null) { - normalized.threadId = normalizedThreadId; - } - return normalized; -} +export { + deliveryContextFromSession, + deliveryContextKey, + mergeDeliveryContext, + normalizeDeliveryContext, + normalizeSessionDeliveryFields, + type DeliveryContext, + type DeliveryContextSessionSource, +} from "./delivery-context.shared.js"; export function formatConversationTarget(params: { channel?: string; @@ -146,106 +100,3 @@ export function resolveConversationDeliveryTarget(params: { const to = formatConversationTarget(params); return { to }; } - -export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSource): { - deliveryContext?: DeliveryContext; - lastChannel?: string; - lastTo?: string; - lastAccountId?: string; - lastThreadId?: string | number; -} { - if (!source) { - return { - deliveryContext: undefined, - lastChannel: undefined, - lastTo: undefined, - lastAccountId: undefined, - lastThreadId: undefined, - }; - } - - const merged = mergeDeliveryContext( - normalizeDeliveryContext({ - channel: source.lastChannel ?? source.channel, - to: source.lastTo, - accountId: source.lastAccountId, - threadId: source.lastThreadId, - }), - normalizeDeliveryContext(source.deliveryContext), - ); - - if (!merged) { - return { - deliveryContext: undefined, - lastChannel: undefined, - lastTo: undefined, - lastAccountId: undefined, - lastThreadId: undefined, - }; - } - - return { - deliveryContext: merged, - lastChannel: merged.channel, - lastTo: merged.to, - lastAccountId: merged.accountId, - lastThreadId: merged.threadId, - }; -} - -export function deliveryContextFromSession( - entry?: DeliveryContextSessionSource, -): DeliveryContext | undefined { - if (!entry) { - return undefined; - } - const source: DeliveryContextSessionSource = { - channel: entry.channel ?? entry.origin?.provider, - lastChannel: entry.lastChannel, - lastTo: entry.lastTo, - lastAccountId: entry.lastAccountId ?? entry.origin?.accountId, - lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId, - origin: entry.origin, - deliveryContext: entry.deliveryContext, - }; - return normalizeSessionDeliveryFields(source).deliveryContext; -} - -export function mergeDeliveryContext( - primary?: DeliveryContext, - fallback?: DeliveryContext, -): DeliveryContext | undefined { - const normalizedPrimary = normalizeDeliveryContext(primary); - const normalizedFallback = normalizeDeliveryContext(fallback); - if (!normalizedPrimary && !normalizedFallback) { - return undefined; - } - const channelsConflict = - normalizedPrimary?.channel && - normalizedFallback?.channel && - normalizedPrimary.channel !== normalizedFallback.channel; - return normalizeDeliveryContext({ - channel: normalizedPrimary?.channel ?? normalizedFallback?.channel, - // Keep route fields paired to their channel; avoid crossing fields between - // unrelated channels during session context merges. - to: channelsConflict - ? normalizedPrimary?.to - : (normalizedPrimary?.to ?? normalizedFallback?.to), - accountId: channelsConflict - ? normalizedPrimary?.accountId - : (normalizedPrimary?.accountId ?? normalizedFallback?.accountId), - threadId: channelsConflict - ? normalizedPrimary?.threadId - : (normalizedPrimary?.threadId ?? normalizedFallback?.threadId), - }); -} - -export function deliveryContextKey(context?: DeliveryContext): string | undefined { - const normalized = normalizeDeliveryContext(context); - if (!normalized?.channel || !normalized?.to) { - return undefined; - } - const threadId = - normalized.threadId != null && normalized.threadId !== "" ? String(normalized.threadId) : ""; - return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}|${threadId}`; -}