From 8d9a2f82a464070659edd07a7f7ace89294278c8 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 28 Apr 2026 01:44:32 +0100 Subject: [PATCH] fix(gateway): keep bundled channel startup light --- CHANGELOG.md | 1 + src/channels/plugins/index.ts | 1 + src/channels/plugins/registry-loaded.ts | 28 ++- src/channels/plugins/registry.ts | 14 +- src/channels/plugins/types.adapters.ts | 6 +- src/gateway/server-channels.test.ts | 70 +++++++- src/gateway/server-channels.ts | 31 +++- src/gateway/server.impl.ts | 13 ++ src/plugins/channel-registry-state.types.ts | 2 + .../runtime/channel-runtime-contexts.ts | 168 ++++++++++++++++++ src/plugins/runtime/runtime-channel.ts | 167 +---------------- 11 files changed, 324 insertions(+), 177 deletions(-) create mode 100644 src/plugins/runtime/channel-runtime-contexts.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b62339943e..43b5ef10f7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ Docs: https://docs.openclaw.ai - Channels/commands: make generated `/dock-*` commands switch the active session reply route through `session.identityLinks` instead of falling through to normal chat. Fixes #69206; carries forward #73033. Thanks @clawbones and @michaelatamuk. - Providers/Cloudflare AI Gateway: strip assistant prefill turns from Anthropic Messages payloads when thinking is enabled, so Claude requests through Cloudflare AI Gateway no longer fail Anthropic conversation-ending validation. Fixes #72905; carries forward #73005. Thanks @AaronFaby and @sahilsatralkar. - Gateway/startup: keep primary-model startup prewarm on scoped metadata preparation, let native approval bootstraps retry outside channel startup, and skip the global hook runner when no `gateway_start` hook is registered, so clean post-ready sidecar work stays off the critical path. Refs #72846. Thanks @RayWoo, @livekm0309, and @mrz1836. +- Gateway/channels: start bundled channel accounts with a lightweight `runtimeContexts` surface instead of importing the full reply/routing/session channel runtime before `startAccount`, so Discord, Telegram, Slack, Matrix, and QQBot startup no longer block on unrelated channel helper graphs. Refs #72846 and #72960. Thanks @mrz1836, @RayWoo, and @rollingshmily. - Gateway/supervisor: exit cleanly when a supervised restart finds an existing healthy gateway and bound retries when the existing gateway stays unhealthy, so stale lock contention cannot loop indefinitely. Refs #72846. Thanks @azgardtek. - Gateway/startup: scope primary-model provider discovery during channel prewarm to the configured provider owner and add split startup trace timings, so boot avoids staging unrelated bundled provider dependencies while setup discovery remains broad. Fixes #73002. Thanks @Schnup03. - Plugins/runtime deps: declare retained staged bundled plugin dependencies in the npm staging manifest while installing only newly missing packages, so Gateway restarts avoid reinstalling the full retained dependency set when one runtime dependency is absent. Fixes #73055. Thanks @GCorp2026. diff --git a/src/channels/plugins/index.ts b/src/channels/plugins/index.ts index 0541ae77837..3ec7836c8e2 100644 --- a/src/channels/plugins/index.ts +++ b/src/channels/plugins/index.ts @@ -1,6 +1,7 @@ export { getChannelPlugin, getLoadedChannelPlugin, + getLoadedChannelPluginOrigin, listChannelPlugins, normalizeChannelId, } from "./registry.js"; diff --git a/src/channels/plugins/registry-loaded.ts b/src/channels/plugins/registry-loaded.ts index 7ad0b045235..66c3f851436 100644 --- a/src/channels/plugins/registry-loaded.ts +++ b/src/channels/plugins/registry-loaded.ts @@ -1,4 +1,7 @@ -import type { ActiveChannelPluginRuntimeShape } from "../../plugins/channel-registry-state.types.js"; +import type { + ActiveChannelPluginRuntimeShape, + ActivePluginChannelRegistration, +} from "../../plugins/channel-registry-state.types.js"; import { getActivePluginChannelRegistryFromState, getActivePluginChannelRegistryVersionFromState, @@ -11,11 +14,16 @@ export type LoadedChannelPlugin = ActiveChannelPluginRuntimeShape & { meta: NonNullable; }; +export type LoadedChannelPluginEntry = ActivePluginChannelRegistration & { + plugin: LoadedChannelPlugin; +}; + type CachedChannelPlugins = { registryVersion: number; registryRef: object | null; sorted: LoadedChannelPlugin[]; byId: Map; + entriesById: Map; }; const EMPTY_CHANNEL_PLUGIN_CACHE: CachedChannelPlugins = { @@ -23,6 +31,7 @@ const EMPTY_CHANNEL_PLUGIN_CACHE: CachedChannelPlugins = { registryRef: null, sorted: [], byId: new Map(), + entriesById: new Map(), }; let cachedChannelPlugins = EMPTY_CHANNEL_PLUGIN_CACHE; @@ -63,11 +72,13 @@ function resolveCachedChannelPlugins(): CachedChannelPlugins { } const channelPlugins: LoadedChannelPlugin[] = []; + const pluginEntries: LoadedChannelPluginEntry[] = []; if (registry && Array.isArray(registry.channels)) { for (const entry of registry.channels) { const plugin = coerceLoadedChannelPlugin(entry?.plugin); if (plugin) { channelPlugins.push(plugin); + pluginEntries.push({ ...entry, plugin }); } } } @@ -83,8 +94,14 @@ function resolveCachedChannelPlugins(): CachedChannelPlugins { return a.id.localeCompare(b.id); }); const byId = new Map(); + const entriesById = new Map(); + const unsortedEntriesById = new Map(pluginEntries.map((entry) => [entry.plugin.id, entry])); for (const plugin of sorted) { byId.set(plugin.id, plugin); + const entry = unsortedEntriesById.get(plugin.id); + if (entry) { + entriesById.set(plugin.id, entry); + } } const next: CachedChannelPlugins = { @@ -92,6 +109,7 @@ function resolveCachedChannelPlugins(): CachedChannelPlugins { registryRef: registry, sorted, byId, + entriesById, }; cachedChannelPlugins = next; return next; @@ -108,3 +126,11 @@ export function getLoadedChannelPluginById(id: string): LoadedChannelPlugin | un } return resolveCachedChannelPlugins().byId.get(resolvedId); } + +export function getLoadedChannelPluginEntryById(id: string): LoadedChannelPluginEntry | undefined { + const resolvedId = normalizeOptionalString(id) ?? ""; + if (!resolvedId) { + return undefined; + } + return resolveCachedChannelPlugins().entriesById.get(resolvedId); +} diff --git a/src/channels/plugins/registry.ts b/src/channels/plugins/registry.ts index 772252e428a..2ceeec069ea 100644 --- a/src/channels/plugins/registry.ts +++ b/src/channels/plugins/registry.ts @@ -1,7 +1,11 @@ import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { normalizeAnyChannelId } from "../registry.js"; import { getBundledChannelPlugin } from "./bundled.js"; -import { getLoadedChannelPluginById, listLoadedChannelPlugins } from "./registry-loaded.js"; +import { + getLoadedChannelPluginById, + getLoadedChannelPluginEntryById, + listLoadedChannelPlugins, +} from "./registry-loaded.js"; import type { ChannelPlugin } from "./types.plugin.js"; import type { ChannelId } from "./types.public.js"; @@ -17,6 +21,14 @@ export function getLoadedChannelPlugin(id: ChannelId): ChannelPlugin | undefined return getLoadedChannelPluginById(resolvedId) as ChannelPlugin | undefined; } +export function getLoadedChannelPluginOrigin(id: ChannelId): string | undefined { + const resolvedId = normalizeOptionalString(id) ?? ""; + if (!resolvedId) { + return undefined; + } + return normalizeOptionalString(getLoadedChannelPluginEntryById(resolvedId)?.origin) ?? undefined; +} + export function getChannelPlugin(id: ChannelId): ChannelPlugin | undefined { const resolvedId = normalizeOptionalString(id) ?? ""; if (!resolvedId) { diff --git a/src/channels/plugins/types.adapters.ts b/src/channels/plugins/types.adapters.ts index 88421ea0eb4..4592ee226d1 100644 --- a/src/channels/plugins/types.adapters.ts +++ b/src/channels/plugins/types.adapters.ts @@ -301,8 +301,10 @@ export type ChannelGatewayContext = { * - Bundled channels typically don't use this field * because they can directly import internal modules * - External plugins should check for undefined before using - * - When provided, this must be a full `createPluginRuntime().channel` surface; - * partial stubs are not supported + * - `runtimeContexts` is the stable startup-safe subset. Bundled channels + * may receive only that subset during provider boot. + * - External channel plugins that need reply/routing/session helpers receive + * a full `createPluginRuntime().channel` surface from the Gateway. * * @since Plugin SDK 2026.2.19 * @see {@link https://docs.openclaw.ai/plugins/building-plugins | Plugin SDK documentation} diff --git a/src/gateway/server-channels.test.ts b/src/gateway/server-channels.test.ts index 17e4dcef111..b7a6e20ea13 100644 --- a/src/gateway/server-channels.test.ts +++ b/src/gateway/server-channels.test.ts @@ -1,4 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ChannelRuntimeSurface } from "../channels/plugins/channel-runtime-surface.types.js"; import { type ChannelGatewayContext, type ChannelId, @@ -11,6 +12,7 @@ import { } from "../logging/subsystem.js"; import { createEmptyPluginRegistry, type PluginRegistry } from "../plugins/registry.js"; import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js"; +import { createChannelRuntimeContextRegistry } from "../plugins/runtime/channel-runtime-contexts.js"; import { createRuntimeChannel } from "../plugins/runtime/runtime-channel.js"; import type { PluginRuntime } from "../plugins/runtime/types.js"; import { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js"; @@ -102,11 +104,17 @@ function createDeferred(): { promise: Promise; resolve: () => void } { return { promise, resolve: resolvePromise }; } -function installTestRegistry(...plugins: ChannelPlugin[]) { +function installTestRegistry( + ...plugins: Array< + ChannelPlugin | { plugin: ChannelPlugin; origin: string } + > +) { const registry = createEmptyPluginRegistry(); - for (const plugin of plugins) { + for (const candidate of plugins) { + const plugin = "plugin" in candidate ? candidate.plugin : candidate; registry.channels.push({ pluginId: plugin.id, + ...("origin" in candidate ? { origin: candidate.origin as never } : {}), source: "test", plugin, }); @@ -115,8 +123,9 @@ function installTestRegistry(...plugins: ChannelPlugin[]) { } function createManager(options?: { - channelRuntime?: PluginRuntime["channel"]; - resolveChannelRuntime?: () => PluginRuntime["channel"] | Promise; + channelRuntime?: ChannelRuntimeSurface; + resolveChannelRuntime?: () => ChannelRuntimeSurface | Promise; + resolveStartupChannelRuntime?: () => ChannelRuntimeSurface | Promise; getRuntimeConfig?: () => Record; channelIds?: ChannelId[]; startupTrace?: { measure: (name: string, run: () => T | Promise) => Promise }; @@ -138,6 +147,9 @@ function createManager(options?: { ...(options?.resolveChannelRuntime ? { resolveChannelRuntime: options.resolveChannelRuntime } : {}), + ...(options?.resolveStartupChannelRuntime + ? { resolveStartupChannelRuntime: options.resolveStartupChannelRuntime } + : {}), ...(options?.startupTrace ? { startupTrace: options.startupTrace } : {}), }); } @@ -377,6 +389,56 @@ describe("server-channels auto restart", () => { expect(ctx?.channelRuntime).not.toBe(channelRuntime); }); + it("uses a lightweight startup runtime for bundled channels", async () => { + const fullRuntime = { + ...createRuntimeChannel(), + marker: "full-channel-runtime", + } as PluginRuntime["channel"] & { marker: string }; + const startupRuntime = { + runtimeContexts: createChannelRuntimeContextRegistry(), + marker: "startup-channel-runtime", + }; + const resolveChannelRuntime = vi.fn(() => fullRuntime); + const resolveStartupChannelRuntime = vi.fn(() => startupRuntime); + const startAccount = vi.fn(async (_ctx: ChannelGatewayContext) => {}); + + installTestRegistry({ plugin: createTestPlugin({ startAccount }), origin: "bundled" }); + const manager = createManager({ resolveChannelRuntime, resolveStartupChannelRuntime }); + + await manager.startChannels(); + + expect(resolveStartupChannelRuntime).toHaveBeenCalledTimes(1); + expect(resolveChannelRuntime).not.toHaveBeenCalled(); + expect(startAccount).toHaveBeenCalledTimes(1); + const [ctx] = startAccount.mock.calls[0] ?? []; + expect(ctx?.channelRuntime).toMatchObject({ marker: "startup-channel-runtime" }); + expect(ctx?.channelRuntime).not.toBe(startupRuntime); + }); + + it("keeps the full runtime path for non-bundled channels", async () => { + const fullRuntime = { + ...createRuntimeChannel(), + marker: "full-channel-runtime", + } as PluginRuntime["channel"] & { marker: string }; + const startupRuntime = { + runtimeContexts: createChannelRuntimeContextRegistry(), + marker: "startup-channel-runtime", + }; + const resolveChannelRuntime = vi.fn(() => fullRuntime); + const resolveStartupChannelRuntime = vi.fn(() => startupRuntime); + const startAccount = vi.fn(async (_ctx: ChannelGatewayContext) => {}); + + installTestRegistry({ plugin: createTestPlugin({ startAccount }), origin: "workspace" }); + const manager = createManager({ resolveChannelRuntime, resolveStartupChannelRuntime }); + + await manager.startChannels(); + + expect(resolveStartupChannelRuntime).not.toHaveBeenCalled(); + expect(resolveChannelRuntime).toHaveBeenCalledTimes(1); + const [ctx] = startAccount.mock.calls[0] ?? []; + expect(ctx?.channelRuntime).toMatchObject({ marker: "full-channel-runtime" }); + }); + it("does not resolve channelRuntime for disabled accounts", async () => { const channelRuntime = createRuntimeChannel(); const resolveChannelRuntime = vi.fn(() => channelRuntime); diff --git a/src/gateway/server-channels.ts b/src/gateway/server-channels.ts index 77969a32497..0bf74a2e5bf 100644 --- a/src/gateway/server-channels.ts +++ b/src/gateway/server-channels.ts @@ -1,6 +1,11 @@ import type { ChannelRuntimeSurface } from "../channels/plugins/channel-runtime-surface.types.js"; import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js"; -import { type ChannelId, getChannelPlugin, listChannelPlugins } from "../channels/plugins/index.js"; +import { + type ChannelId, + getChannelPlugin, + getLoadedChannelPluginOrigin, + listChannelPlugins, +} from "../channels/plugins/index.js"; import type { ChannelAccountSnapshot } from "../channels/plugins/types.public.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { startChannelApprovalHandlerBootstrap } from "../infra/approval-handler-bootstrap.js"; @@ -165,6 +170,12 @@ type ChannelManagerOptions = { * `createPluginRuntime().channel` surface. */ resolveChannelRuntime?: () => ChannelRuntimeSurface | Promise; + /** + * Lightweight channel runtime used for bundled channel startup. Bundled + * channels only need `runtimeContexts` while booting, so this avoids pulling + * the full reply/routing/session runtime graph onto the critical path. + */ + resolveStartupChannelRuntime?: () => ChannelRuntimeSurface | Promise; startupTrace?: GatewayStartupTrace; }; @@ -192,6 +203,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage channelRuntimeEnvs, channelRuntime, resolveChannelRuntime, + resolveStartupChannelRuntime, startupTrace, } = opts; @@ -289,8 +301,19 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage return next; }; - const getChannelRuntime = async (): Promise => { - return channelRuntime ?? (await resolveChannelRuntime?.()); + const getChannelRuntime = async ( + channelId: ChannelId, + ): Promise => { + if (channelRuntime) { + return channelRuntime; + } + if (getLoadedChannelPluginOrigin(channelId) === "bundled") { + const startupRuntime = await resolveStartupChannelRuntime?.(); + if (startupRuntime) { + return startupRuntime; + } + } + return await resolveChannelRuntime?.(); }; const measureStartup = async (name: string, run: () => T | Promise): Promise => { return startupTrace ? startupTrace.measure(name, run) : await run(); @@ -437,7 +460,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage scopedChannelRuntime = await measureStartup(`channels.${channelId}.runtime`, async () => createTaskScopedChannelRuntime({ - channelRuntime: await getChannelRuntime(), + channelRuntime: await getChannelRuntime(channelId), }), ); channelRuntimeForTask = scopedChannelRuntime.channelRuntime; diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 743ccccd61c..1cc59c7afd2 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -2,6 +2,7 @@ import { monitorEventLoopDelay } from "node:perf_hooks"; import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js"; import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js"; import type { CanvasHostServer } from "../canvas-host/server.js"; +import type { ChannelRuntimeSurface } from "../channels/plugins/channel-runtime-surface.types.js"; import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; import { createDefaultDeps } from "../cli/deps.js"; import { isRestartEnabled } from "../config/commands.flags.js"; @@ -122,6 +123,7 @@ const logTailscale = log.child("tailscale"); const logChannels = log.child("channels"); let cachedChannelRuntimePromise: Promise | null = null; +let cachedStartupChannelRuntimePromise: Promise | null = null; function getChannelRuntime() { cachedChannelRuntimePromise ??= import("../plugins/runtime/runtime-channel.js").then( @@ -130,6 +132,16 @@ function getChannelRuntime() { return cachedChannelRuntimePromise; } +function getStartupChannelRuntime() { + cachedStartupChannelRuntimePromise ??= + import("../plugins/runtime/channel-runtime-contexts.js").then( + ({ createChannelRuntimeContextRegistry }) => ({ + runtimeContexts: createChannelRuntimeContextRegistry(), + }), + ); + return cachedStartupChannelRuntimePromise; +} + async function closeMcpLoopbackServerOnDemand(): Promise { const { closeMcpLoopbackServer } = await import("./mcp-http.js"); await closeMcpLoopbackServer(); @@ -558,6 +570,7 @@ export async function startGatewayServer( channelLogs, channelRuntimeEnvs, resolveChannelRuntime: getChannelRuntime, + resolveStartupChannelRuntime: getStartupChannelRuntime, startupTrace, }); const getReadiness = createReadinessChecker({ diff --git a/src/plugins/channel-registry-state.types.ts b/src/plugins/channel-registry-state.types.ts index d5416d4ca2a..f2606a07cd0 100644 --- a/src/plugins/channel-registry-state.types.ts +++ b/src/plugins/channel-registry-state.types.ts @@ -15,6 +15,8 @@ export type ActiveChannelPluginRuntimeShape = { export type ActivePluginChannelRegistration = { plugin: ActiveChannelPluginRuntimeShape; + pluginId?: string | null; + origin?: string | null; }; export type ActivePluginChannelRegistry = { diff --git a/src/plugins/runtime/channel-runtime-contexts.ts b/src/plugins/runtime/channel-runtime-contexts.ts new file mode 100644 index 00000000000..a1cdaceb87e --- /dev/null +++ b/src/plugins/runtime/channel-runtime-contexts.ts @@ -0,0 +1,168 @@ +import type { + ChannelRuntimeContextEvent, + ChannelRuntimeContextKey, + ChannelRuntimeContextRegistry, +} from "../../channels/plugins/channel-runtime-surface.types.js"; +import { createSubsystemLogger } from "../../logging.js"; +import { normalizeOptionalString } from "../../shared/string-coerce.js"; + +type StoredRuntimeContext = { + token: symbol; + context: unknown; + normalizedKey: { + channelId: string; + accountId?: string; + capability: string; + }; +}; + +const log = createSubsystemLogger("plugins/runtime-channel"); + +function normalizeRuntimeContextString(value: string | null | undefined): string { + return normalizeOptionalString(value) ?? ""; +} + +function normalizeRuntimeContextKey(params: ChannelRuntimeContextKey): { + mapKey: string; + normalizedKey: { + channelId: string; + accountId?: string; + capability: string; + }; +} | null { + const channelId = normalizeRuntimeContextString(params.channelId); + const capability = normalizeRuntimeContextString(params.capability); + const accountId = normalizeRuntimeContextString(params.accountId); + if (!channelId || !capability) { + return null; + } + return { + mapKey: `${channelId}\u0000${accountId}\u0000${capability}`, + normalizedKey: { + channelId, + capability, + ...(accountId ? { accountId } : {}), + }, + }; +} + +function doesRuntimeContextWatcherMatch(params: { + watcher: { + channelId?: string; + accountId?: string; + capability?: string; + }; + event: ChannelRuntimeContextEvent; +}): boolean { + if (params.watcher.channelId && params.watcher.channelId !== params.event.key.channelId) { + return false; + } + if ( + params.watcher.accountId !== undefined && + params.watcher.accountId !== (params.event.key.accountId ?? "") + ) { + return false; + } + if (params.watcher.capability && params.watcher.capability !== params.event.key.capability) { + return false; + } + return true; +} + +export function createChannelRuntimeContextRegistry(): ChannelRuntimeContextRegistry { + const runtimeContexts = new Map(); + const runtimeContextWatchers = new Set<{ + filter: { + channelId?: string; + accountId?: string; + capability?: string; + }; + onEvent: (event: ChannelRuntimeContextEvent) => void; + }>(); + const emitRuntimeContextEvent = (event: ChannelRuntimeContextEvent) => { + for (const watcher of runtimeContextWatchers) { + if (!doesRuntimeContextWatcherMatch({ watcher: watcher.filter, event })) { + continue; + } + try { + watcher.onEvent(event); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + log.error( + `runtime context watcher failed during ${event.type} ` + + `channel=${event.key.channelId} capability=${event.key.capability}` + + (event.key.accountId ? ` account=${event.key.accountId}` : "") + + `: ${message}`, + ); + } + } + }; + return { + register: (params) => { + const normalized = normalizeRuntimeContextKey(params); + if (!normalized) { + return { dispose: () => {} }; + } + if (params.abortSignal?.aborted) { + return { dispose: () => {} }; + } + const token = Symbol(normalized.mapKey); + let disposed = false; + const dispose = () => { + if (disposed) { + return; + } + disposed = true; + const current = runtimeContexts.get(normalized.mapKey); + if (!current || current.token !== token) { + return; + } + runtimeContexts.delete(normalized.mapKey); + emitRuntimeContextEvent({ + type: "unregistered", + key: normalized.normalizedKey, + }); + }; + params.abortSignal?.addEventListener("abort", dispose, { once: true }); + if (params.abortSignal?.aborted) { + dispose(); + return { dispose }; + } + runtimeContexts.set(normalized.mapKey, { + token, + context: params.context, + normalizedKey: normalized.normalizedKey, + }); + if (disposed) { + return { dispose }; + } + emitRuntimeContextEvent({ + type: "registered", + key: normalized.normalizedKey, + context: params.context, + }); + return { dispose }; + }, + get: (params) => { + const normalized = normalizeRuntimeContextKey(params); + if (!normalized) { + return undefined; + } + return runtimeContexts.get(normalized.mapKey)?.context as never; + }, + watch: (params) => { + const watcher = { + filter: { + ...(params.channelId?.trim() ? { channelId: params.channelId.trim() } : {}), + ...(params.accountId != null ? { accountId: params.accountId.trim() } : {}), + ...(params.capability?.trim() ? { capability: params.capability.trim() } : {}), + }, + onEvent: params.onEvent, + }; + runtimeContextWatchers.add(watcher); + return () => { + runtimeContextWatchers.delete(watcher); + }; + }, + }; +} diff --git a/src/plugins/runtime/runtime-channel.ts b/src/plugins/runtime/runtime-channel.ts index 4fd895f9a5e..981c75ae7dd 100644 --- a/src/plugins/runtime/runtime-channel.ts +++ b/src/plugins/runtime/runtime-channel.ts @@ -62,7 +62,6 @@ import { updateLastRoute, } from "../../config/sessions.js"; import { getChannelActivity, recordChannelActivity } from "../../infra/channel-activity.js"; -import { createSubsystemLogger } from "../../logging.js"; import { convertMarkdownTables } from "../../markdown/tables.js"; import { fetchRemoteMedia } from "../../media/fetch.js"; import { saveMediaBuffer } from "../../media/store.js"; @@ -72,104 +71,10 @@ import { upsertChannelPairingRequest, } from "../../pairing/pairing-store.js"; import { buildAgentSessionKey, resolveAgentRoute } from "../../routing/resolve-route.js"; -import { normalizeOptionalString } from "../../shared/string-coerce.js"; -import type { - PluginRuntimeChannelContextEvent, - PluginRuntimeChannelContextKey, -} from "./types-channel.js"; +import { createChannelRuntimeContextRegistry } from "./channel-runtime-contexts.js"; import type { PluginRuntime } from "./types.js"; -type StoredRuntimeContext = { - token: symbol; - context: unknown; - normalizedKey: { - channelId: string; - accountId?: string; - capability: string; - }; -}; - -const log = createSubsystemLogger("plugins/runtime-channel"); - -function normalizeRuntimeContextString(value: string | null | undefined): string { - return normalizeOptionalString(value) ?? ""; -} - -function normalizeRuntimeContextKey(params: PluginRuntimeChannelContextKey): { - mapKey: string; - normalizedKey: { - channelId: string; - accountId?: string; - capability: string; - }; -} | null { - const channelId = normalizeRuntimeContextString(params.channelId); - const capability = normalizeRuntimeContextString(params.capability); - const accountId = normalizeRuntimeContextString(params.accountId); - if (!channelId || !capability) { - return null; - } - return { - mapKey: `${channelId}\u0000${accountId}\u0000${capability}`, - normalizedKey: { - channelId, - capability, - ...(accountId ? { accountId } : {}), - }, - }; -} - -function doesRuntimeContextWatcherMatch(params: { - watcher: { - channelId?: string; - accountId?: string; - capability?: string; - }; - event: PluginRuntimeChannelContextEvent; -}): boolean { - if (params.watcher.channelId && params.watcher.channelId !== params.event.key.channelId) { - return false; - } - if ( - params.watcher.accountId !== undefined && - params.watcher.accountId !== (params.event.key.accountId ?? "") - ) { - return false; - } - if (params.watcher.capability && params.watcher.capability !== params.event.key.capability) { - return false; - } - return true; -} - export function createRuntimeChannel(): PluginRuntime["channel"] { - const runtimeContexts = new Map(); - const runtimeContextWatchers = new Set<{ - filter: { - channelId?: string; - accountId?: string; - capability?: string; - }; - onEvent: (event: PluginRuntimeChannelContextEvent) => void; - }>(); - const emitRuntimeContextEvent = (event: PluginRuntimeChannelContextEvent) => { - for (const watcher of runtimeContextWatchers) { - if (!doesRuntimeContextWatcherMatch({ watcher: watcher.filter, event })) { - continue; - } - try { - watcher.onEvent(event); - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - log.error( - `runtime context watcher failed during ${event.type} ` + - `channel=${event.key.channelId} capability=${event.key.capability}` + - (event.key.accountId ? ` account=${event.key.accountId}` : "") + - `: ${message}`, - ); - } - } - }; const channelRuntime = { text: { chunkByNewline, @@ -275,75 +180,7 @@ export function createRuntimeChannel(): PluginRuntime["channel"] { maxAgeMs, }), }, - runtimeContexts: { - register: (params) => { - const normalized = normalizeRuntimeContextKey(params); - if (!normalized) { - return { dispose: () => {} }; - } - if (params.abortSignal?.aborted) { - return { dispose: () => {} }; - } - const token = Symbol(normalized.mapKey); - let disposed = false; - const dispose = () => { - if (disposed) { - return; - } - disposed = true; - const current = runtimeContexts.get(normalized.mapKey); - if (!current || current.token !== token) { - return; - } - runtimeContexts.delete(normalized.mapKey); - emitRuntimeContextEvent({ - type: "unregistered", - key: normalized.normalizedKey, - }); - }; - params.abortSignal?.addEventListener("abort", dispose, { once: true }); - if (params.abortSignal?.aborted) { - dispose(); - return { dispose }; - } - runtimeContexts.set(normalized.mapKey, { - token, - context: params.context, - normalizedKey: normalized.normalizedKey, - }); - if (disposed) { - return { dispose }; - } - emitRuntimeContextEvent({ - type: "registered", - key: normalized.normalizedKey, - context: params.context, - }); - return { dispose }; - }, - // oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Runtime context values are caller-typed by key. - get: (params: PluginRuntimeChannelContextKey) => { - const normalized = normalizeRuntimeContextKey(params); - if (!normalized) { - return undefined; - } - return runtimeContexts.get(normalized.mapKey)?.context as T | undefined; - }, - watch: (params) => { - const watcher = { - filter: { - ...(params.channelId?.trim() ? { channelId: params.channelId.trim() } : {}), - ...(params.accountId != null ? { accountId: params.accountId.trim() } : {}), - ...(params.capability?.trim() ? { capability: params.capability.trim() } : {}), - }, - onEvent: params.onEvent, - }; - runtimeContextWatchers.add(watcher); - return () => { - runtimeContextWatchers.delete(watcher); - }; - }, - }, + runtimeContexts: createChannelRuntimeContextRegistry(), } satisfies PluginRuntime["channel"]; return channelRuntime as PluginRuntime["channel"];