fix(cycles): split runtime delivery and registry seams

This commit is contained in:
Vincent Koc
2026-04-11 13:20:41 +01:00
parent 41ab0f7d5c
commit 543c14a4ed
15 changed files with 253 additions and 184 deletions

View File

@@ -1,4 +1,7 @@
import { getActivePluginChannelRegistry, getActivePluginRegistry } from "../plugins/runtime.js";
import {
getActivePluginChannelRegistryFromState,
getPluginRegistryState,
} from "../plugins/runtime-state.js";
import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
@@ -23,11 +26,11 @@ type RegisteredChannelPluginEntry = {
};
function listRegisteredChannelPluginEntries(): RegisteredChannelPluginEntry[] {
const channelRegistry = getActivePluginChannelRegistry();
const channelRegistry = getActivePluginChannelRegistryFromState();
if (channelRegistry && channelRegistry.channels && channelRegistry.channels.length > 0) {
return channelRegistry.channels;
}
return getActivePluginRegistry()?.channels ?? [];
return getPluginRegistryState()?.activeRegistry?.channels ?? [];
}
function findRegisteredChannelPluginEntry(

View File

@@ -5,7 +5,7 @@ import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalString,
} from "../shared/string-coerce.js";
import type { OpenClawConfig } from "./config.js";
import type { OpenClawConfig } from "./types.openclaw.js";
import {
parseToolsBySenderTypedKey,
type GroupToolPolicyBySenderConfig,

View File

@@ -0,0 +1,28 @@
import type { MsgContext } from "../../auto-reply/templating.js";
import type { DeliveryContext } from "../../utils/delivery-context.shared.js";
import type { SessionEntry, GroupKeyResolution } from "./types.js";
export type ReadSessionUpdatedAt = (params: {
storePath: string;
sessionKey: string;
}) => number | undefined;
export type RecordSessionMetaFromInbound = (params: {
storePath: string;
sessionKey: string;
ctx: MsgContext;
groupResolution?: GroupKeyResolution | null;
createIfMissing?: boolean;
}) => Promise<SessionEntry | null>;
export type UpdateLastRoute = (params: {
storePath: string;
sessionKey: string;
channel?: SessionEntry["lastChannel"];
to?: string;
accountId?: string;
threadId?: string | number;
deliveryContext?: DeliveryContext;
ctx?: MsgContext;
groupResolution?: GroupKeyResolution | null;
}) => Promise<SessionEntry>;

View File

@@ -1,15 +1,35 @@
import type { PluginRegistry } from "./registry.js";
export const PLUGIN_REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState");
export type RuntimeTrackedPluginRecord = {
id: string;
status?: string;
format?: string;
};
export type RuntimeTrackedChannelEntry = {
plugin: {
id?: string | null;
meta?: {
aliases?: string[];
markdownCapable?: boolean;
} | null;
};
};
export type RuntimeTrackedPluginRegistry = {
plugins: RuntimeTrackedPluginRecord[];
httpRoutes?: unknown[];
channels?: RuntimeTrackedChannelEntry[];
};
export type RegistrySurfaceState = {
registry: PluginRegistry | null;
registry: RuntimeTrackedPluginRegistry | null;
pinned: boolean;
version: number;
};
export type RegistryState = {
activeRegistry: PluginRegistry | null;
activeRegistry: RuntimeTrackedPluginRegistry | null;
activeVersion: number;
httpRoute: RegistrySurfaceState;
channel: RegistrySurfaceState;
@@ -27,7 +47,7 @@ export function getPluginRegistryState(): RegistryState | undefined {
return (globalThis as GlobalRegistryState)[PLUGIN_REGISTRY_STATE];
}
export function getActivePluginChannelRegistryFromState(): PluginRegistry | null {
export function getActivePluginChannelRegistryFromState(): RuntimeTrackedPluginRegistry | null {
const state = getPluginRegistryState();
return state?.channel.registry ?? state?.activeRegistry ?? null;
}

View File

@@ -6,6 +6,10 @@ import {
type RegistrySurfaceState,
} from "./runtime-state.js";
function asPluginRegistry(registry: RegistryState["activeRegistry"]): PluginRegistry | null {
return registry as PluginRegistry | null;
}
const state: RegistryState = (() => {
const globalState = globalThis as typeof globalThis & {
[PLUGIN_REGISTRY_STATE]?: RegistryState;
@@ -85,7 +89,7 @@ export function setActivePluginRegistry(
}
export function getActivePluginRegistry(): PluginRegistry | null {
return state.activeRegistry;
return asPluginRegistry(state.activeRegistry);
}
export function getActivePluginRegistryWorkspaceDir(): string | undefined {
@@ -114,7 +118,7 @@ export function releasePinnedPluginHttpRouteRegistry(registry?: PluginRegistry)
}
export function getActivePluginHttpRouteRegistry(): PluginRegistry | null {
return state.httpRoute.registry ?? state.activeRegistry;
return asPluginRegistry(state.httpRoute.registry ?? state.activeRegistry);
}
export function getActivePluginHttpRouteRegistryVersion(): number {
@@ -163,7 +167,7 @@ export function releasePinnedPluginChannelRegistry(registry?: PluginRegistry) {
* When pinned, this returns the startup registry regardless of subsequent
* `setActivePluginRegistry` calls. */
export function getActivePluginChannelRegistry(): PluginRegistry | null {
return state.channel.registry ?? state.activeRegistry;
return asPluginRegistry(state.channel.registry ?? state.activeRegistry);
}
export function getActivePluginChannelRegistryVersion(): number {

View File

@@ -28,7 +28,7 @@ import type {
TaskRegistrySummary,
TaskRuntime,
} from "../../tasks/task-registry.types.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.shared.js";
import type { OpenClawPluginToolContext } from "../tool-types.js";
export type ManagedTaskFlowRecord = TaskFlowRecord & {

View File

@@ -20,7 +20,7 @@ import {
listTasksForRelatedSessionKeyForOwner,
resolveTaskForLookupTokenForOwner,
} from "../../tasks/task-owner-access.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.shared.js";
import type { OpenClawPluginToolContext } from "../tool-types.js";
import type { PluginRuntimeTaskFlow } from "./runtime-taskflow.js";
import type {

View File

@@ -9,6 +9,10 @@ type ReadChannelAllowFromStore =
typeof import("../../pairing/pairing-store.js").readChannelAllowFromStore;
type UpsertChannelPairingRequest =
typeof import("../../pairing/pairing-store.js").upsertChannelPairingRequest;
type ReadSessionUpdatedAt = import("../../config/sessions/runtime-types.js").ReadSessionUpdatedAt;
type RecordSessionMetaFromInbound =
import("../../config/sessions/runtime-types.js").RecordSessionMetaFromInbound;
type UpdateLastRoute = import("../../config/sessions/runtime-types.js").UpdateLastRoute;
type ReadChannelAllowFromStoreForAccount = (params: {
channel: Parameters<ReadChannelAllowFromStore>[0];
@@ -105,11 +109,11 @@ export type PluginRuntimeChannel = {
get: typeof import("../../infra/channel-activity.js").getChannelActivity;
};
session: {
resolveStorePath: typeof import("../../config/sessions.js").resolveStorePath;
readSessionUpdatedAt: typeof import("../../config/sessions.js").readSessionUpdatedAt;
recordSessionMetaFromInbound: typeof import("../../config/sessions.js").recordSessionMetaFromInbound;
resolveStorePath: typeof import("../../config/sessions/paths.js").resolveStorePath;
readSessionUpdatedAt: ReadSessionUpdatedAt;
recordSessionMetaFromInbound: RecordSessionMetaFromInbound;
recordInboundSession: typeof import("../../channels/session.js").recordInboundSession;
updateLastRoute: typeof import("../../config/sessions.js").updateLastRoute;
updateLastRoute: UpdateLastRoute;
};
mentions: {
buildMentionRegexes: typeof import("../../auto-reply/reply/mentions.js").buildMentionRegexes;

View File

@@ -54,10 +54,10 @@ export type PluginRuntimeCore = {
resolveAgentTimeoutMs: typeof import("../../agents/timeout.js").resolveAgentTimeoutMs;
ensureAgentWorkspace: typeof import("../../agents/workspace.js").ensureAgentWorkspace;
session: {
resolveStorePath: typeof import("../../config/sessions.js").resolveStorePath;
loadSessionStore: typeof import("../../config/sessions.js").loadSessionStore;
saveSessionStore: typeof import("../../config/sessions.js").saveSessionStore;
resolveSessionFilePath: typeof import("../../config/sessions.js").resolveSessionFilePath;
resolveStorePath: typeof import("../../config/sessions/paths.js").resolveStorePath;
loadSessionStore: typeof import("../../config/sessions/store-load.js").loadSessionStore;
saveSessionStore: typeof import("../../config/sessions/store.js").saveSessionStore;
resolveSessionFilePath: typeof import("../../config/sessions/paths.js").resolveSessionFilePath;
};
};
system: {

View File

@@ -2,7 +2,7 @@ import type { ToolFsPolicy } from "../agents/tool-fs-policy.js";
import type { AnyAgentTool } from "../agents/tools/common.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { HookEntry } from "../hooks/types.js";
import type { DeliveryContext } from "../utils/delivery-context.js";
import type { DeliveryContext } from "../utils/delivery-context.shared.js";
/** Trusted execution context passed to plugin-owned agent tool factories. */
export type OpenClawPluginToolContext = {

View File

@@ -1,4 +1,4 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
import type { DeliveryContext } from "../utils/delivery-context.shared.js";
import type { TaskNotifyPolicy } from "./task-registry.types.js";
export type JsonValue =

View File

@@ -7,7 +7,7 @@ import { enqueueSystemEvent } from "../infra/system-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import {
formatTaskBlockedFollowupMessage,

View File

@@ -1,4 +1,4 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
import type { DeliveryContext } from "../utils/delivery-context.shared.js";
export type TaskRuntime = "subagent" | "acp" | "cli" | "cron";

View File

@@ -0,0 +1,159 @@
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { normalizeAccountId } from "./account-id.js";
import { normalizeMessageChannel } from "./message-channel.js";
export type DeliveryContext = {
channel?: string;
to?: string;
accountId?: string;
threadId?: string | number;
};
export type DeliveryContextSessionSource = {
channel?: string;
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
lastThreadId?: string | number;
origin?: {
provider?: string;
accountId?: string;
threadId?: string | number;
};
deliveryContext?: DeliveryContext;
};
export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
if (!context) {
return undefined;
}
const channel =
typeof context.channel === "string"
? (normalizeMessageChannel(context.channel) ?? context.channel.trim())
: undefined;
const to = normalizeOptionalString(context.to);
const accountId = normalizeAccountId(context.accountId);
const threadId =
typeof context.threadId === "number" && Number.isFinite(context.threadId)
? Math.trunc(context.threadId)
: typeof context.threadId === "string"
? normalizeOptionalString(context.threadId)
: undefined;
const normalizedThreadId =
typeof threadId === "string" ? (threadId ? threadId : undefined) : threadId;
if (!channel && !to && !accountId && normalizedThreadId == null) {
return undefined;
}
const normalized: DeliveryContext = {
channel: channel || undefined,
to: to || undefined,
accountId,
};
if (normalizedThreadId != null) {
normalized.threadId = normalizedThreadId;
}
return normalized;
}
export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSource): {
deliveryContext?: DeliveryContext;
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
lastThreadId?: string | number;
} {
if (!source) {
return {
deliveryContext: undefined,
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
lastThreadId: undefined,
};
}
const merged = mergeDeliveryContext(
normalizeDeliveryContext({
channel: source.lastChannel ?? source.channel,
to: source.lastTo,
accountId: source.lastAccountId,
threadId: source.lastThreadId,
}),
normalizeDeliveryContext(source.deliveryContext),
);
if (!merged) {
return {
deliveryContext: undefined,
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
lastThreadId: undefined,
};
}
return {
deliveryContext: merged,
lastChannel: merged.channel,
lastTo: merged.to,
lastAccountId: merged.accountId,
lastThreadId: merged.threadId,
};
}
export function deliveryContextFromSession(
entry?: DeliveryContextSessionSource,
): DeliveryContext | undefined {
if (!entry) {
return undefined;
}
const source: DeliveryContextSessionSource = {
channel: entry.channel ?? entry.origin?.provider,
lastChannel: entry.lastChannel,
lastTo: entry.lastTo,
lastAccountId: entry.lastAccountId ?? entry.origin?.accountId,
lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId,
origin: entry.origin,
deliveryContext: entry.deliveryContext,
};
return normalizeSessionDeliveryFields(source).deliveryContext;
}
export function mergeDeliveryContext(
primary?: DeliveryContext,
fallback?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedPrimary = normalizeDeliveryContext(primary);
const normalizedFallback = normalizeDeliveryContext(fallback);
if (!normalizedPrimary && !normalizedFallback) {
return undefined;
}
const channelsConflict =
normalizedPrimary?.channel &&
normalizedFallback?.channel &&
normalizedPrimary.channel !== normalizedFallback.channel;
return normalizeDeliveryContext({
channel: normalizedPrimary?.channel ?? normalizedFallback?.channel,
// Keep route fields paired to their channel; avoid crossing fields between
// unrelated channels during session context merges.
to: channelsConflict
? normalizedPrimary?.to
: (normalizedPrimary?.to ?? normalizedFallback?.to),
accountId: channelsConflict
? normalizedPrimary?.accountId
: (normalizedPrimary?.accountId ?? normalizedFallback?.accountId),
threadId: channelsConflict
? normalizedPrimary?.threadId
: (normalizedPrimary?.threadId ?? normalizedFallback?.threadId),
});
}
export function deliveryContextKey(context?: DeliveryContext): string | undefined {
const normalized = normalizeDeliveryContext(context);
if (!normalized?.channel || !normalized?.to) {
return undefined;
}
const threadId =
normalized.threadId != null && normalized.threadId !== "" ? String(normalized.threadId) : "";
return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}|${threadId}`;
}

View File

@@ -1,60 +1,14 @@
import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { normalizeAccountId } from "./account-id.js";
import { normalizeMessageChannel } from "./message-channel.js";
export type DeliveryContext = {
channel?: string;
to?: string;
accountId?: string;
threadId?: string | number;
};
export type DeliveryContextSessionSource = {
channel?: string;
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
lastThreadId?: string | number;
origin?: {
provider?: string;
accountId?: string;
threadId?: string | number;
};
deliveryContext?: DeliveryContext;
};
export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
if (!context) {
return undefined;
}
const channel =
typeof context.channel === "string"
? (normalizeMessageChannel(context.channel) ?? context.channel.trim())
: undefined;
const to = normalizeOptionalString(context.to);
const accountId = normalizeAccountId(context.accountId);
const threadId =
typeof context.threadId === "number" && Number.isFinite(context.threadId)
? Math.trunc(context.threadId)
: typeof context.threadId === "string"
? normalizeOptionalString(context.threadId)
: undefined;
const normalizedThreadId =
typeof threadId === "string" ? (threadId ? threadId : undefined) : threadId;
if (!channel && !to && !accountId && normalizedThreadId == null) {
return undefined;
}
const normalized: DeliveryContext = {
channel: channel || undefined,
to: to || undefined,
accountId,
};
if (normalizedThreadId != null) {
normalized.threadId = normalizedThreadId;
}
return normalized;
}
export {
deliveryContextFromSession,
deliveryContextKey,
mergeDeliveryContext,
normalizeDeliveryContext,
normalizeSessionDeliveryFields,
type DeliveryContext,
type DeliveryContextSessionSource,
} from "./delivery-context.shared.js";
export function formatConversationTarget(params: {
channel?: string;
@@ -146,106 +100,3 @@ export function resolveConversationDeliveryTarget(params: {
const to = formatConversationTarget(params);
return { to };
}
export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSource): {
deliveryContext?: DeliveryContext;
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
lastThreadId?: string | number;
} {
if (!source) {
return {
deliveryContext: undefined,
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
lastThreadId: undefined,
};
}
const merged = mergeDeliveryContext(
normalizeDeliveryContext({
channel: source.lastChannel ?? source.channel,
to: source.lastTo,
accountId: source.lastAccountId,
threadId: source.lastThreadId,
}),
normalizeDeliveryContext(source.deliveryContext),
);
if (!merged) {
return {
deliveryContext: undefined,
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
lastThreadId: undefined,
};
}
return {
deliveryContext: merged,
lastChannel: merged.channel,
lastTo: merged.to,
lastAccountId: merged.accountId,
lastThreadId: merged.threadId,
};
}
export function deliveryContextFromSession(
entry?: DeliveryContextSessionSource,
): DeliveryContext | undefined {
if (!entry) {
return undefined;
}
const source: DeliveryContextSessionSource = {
channel: entry.channel ?? entry.origin?.provider,
lastChannel: entry.lastChannel,
lastTo: entry.lastTo,
lastAccountId: entry.lastAccountId ?? entry.origin?.accountId,
lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId,
origin: entry.origin,
deliveryContext: entry.deliveryContext,
};
return normalizeSessionDeliveryFields(source).deliveryContext;
}
export function mergeDeliveryContext(
primary?: DeliveryContext,
fallback?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedPrimary = normalizeDeliveryContext(primary);
const normalizedFallback = normalizeDeliveryContext(fallback);
if (!normalizedPrimary && !normalizedFallback) {
return undefined;
}
const channelsConflict =
normalizedPrimary?.channel &&
normalizedFallback?.channel &&
normalizedPrimary.channel !== normalizedFallback.channel;
return normalizeDeliveryContext({
channel: normalizedPrimary?.channel ?? normalizedFallback?.channel,
// Keep route fields paired to their channel; avoid crossing fields between
// unrelated channels during session context merges.
to: channelsConflict
? normalizedPrimary?.to
: (normalizedPrimary?.to ?? normalizedFallback?.to),
accountId: channelsConflict
? normalizedPrimary?.accountId
: (normalizedPrimary?.accountId ?? normalizedFallback?.accountId),
threadId: channelsConflict
? normalizedPrimary?.threadId
: (normalizedPrimary?.threadId ?? normalizedFallback?.threadId),
});
}
export function deliveryContextKey(context?: DeliveryContext): string | undefined {
const normalized = normalizeDeliveryContext(context);
if (!normalized?.channel || !normalized?.to) {
return undefined;
}
const threadId =
normalized.threadId != null && normalized.threadId !== "" ? String(normalized.threadId) : "";
return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}|${threadId}`;
}