Files
openclaw/src/plugins/runtime.channel-pin.test.ts
Andy Ye b2c5ba6d4c fix(outbound): resolve send-capable channel registry (#83733)
Summary:
- The PR changes outbound channel registry loading and bootstrap to fall back from pinned setup-only channel entries to the active runtime registry, with regression tests and a changelog entry.
- Reproducibility: yes. at source level. Current main can select a pinned setup-only channel entry and skip th ... module live output showing delivery after the fallback; I did not run local tests in this read-only review.

Automerge notes:
- PR branch already contained follow-up commit before automerge: fix(outbound): resolve send-capable channel registry

Validation:
- ClawSweeper review passed for head 67c20aa72b.
- Required merge gates passed before the squash merge.

Prepared head SHA: 67c20aa72b
Review: https://github.com/openclaw/openclaw/pull/83733#issuecomment-4481084888

Co-authored-by: Andy Ye <35905412+TurboTheTurtle@users.noreply.github.com>
Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com>
Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com>
Approved-by: takhoffman
Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
2026-05-18 20:30:51 +00:00

349 lines
12 KiB
TypeScript

import { afterEach, describe, expect, it } from "vitest";
import { loadChannelOutboundAdapter } from "../channels/plugins/outbound/load.js";
import { getChannelPlugin } from "../channels/plugins/registry.js";
import { emitAgentEvent, resetAgentEventsForTest } from "../infra/agent-events.js";
import { createEmptyPluginRegistry } from "./registry-empty.js";
import { isPluginRegistryRetired } from "./registry-lifecycle.js";
import {
getActivePluginChannelRegistryVersion,
getActivePluginRegistryVersion,
getActivePluginChannelRegistry,
pinActivePluginChannelRegistry,
releasePinnedPluginChannelRegistry,
requireActivePluginChannelRegistry,
resetPluginRuntimeStateForTest,
setActivePluginRegistry,
} from "./runtime.js";
function createRegistryWithChannel(pluginId = "demo-channel") {
const registry = createEmptyPluginRegistry();
const plugin = { id: pluginId, meta: {} } as never;
registry.channels = [{ plugin }] as never;
return { registry, plugin };
}
function createChannelRegistryPair(pluginId = "demo-channel") {
return {
first: createRegistryWithChannel(pluginId),
second: createRegistryWithChannel(pluginId),
};
}
function createRegistrySet() {
return {
startup: createEmptyPluginRegistry(),
replacement: createEmptyPluginRegistry(),
unrelated: createEmptyPluginRegistry(),
};
}
function expectActiveChannelRegistry(registry: ReturnType<typeof createEmptyPluginRegistry>) {
expect(getActivePluginChannelRegistry()).toBe(registry);
}
function expectPinnedChannelRegistry(
startupRegistry: ReturnType<typeof createEmptyPluginRegistry>,
replacementRegistry: ReturnType<typeof createEmptyPluginRegistry>,
) {
setActivePluginRegistry(startupRegistry);
pinActivePluginChannelRegistry(startupRegistry);
setActivePluginRegistry(replacementRegistry);
expectActiveChannelRegistry(startupRegistry);
}
function expectResetClearsPinnedChannelRegistry(params: {
startupRegistry: ReturnType<typeof createEmptyPluginRegistry>;
freshRegistry: ReturnType<typeof createEmptyPluginRegistry>;
}) {
setActivePluginRegistry(params.startupRegistry);
pinActivePluginChannelRegistry(params.startupRegistry);
resetPluginRuntimeStateForTest();
setActivePluginRegistry(params.freshRegistry);
expectActiveChannelRegistry(params.freshRegistry);
}
function expectChannelRegistrySwap(params: {
startupRegistry: ReturnType<typeof createEmptyPluginRegistry>;
replacementRegistry: ReturnType<typeof createEmptyPluginRegistry>;
pin?: boolean;
releaseRegistry?: ReturnType<typeof createEmptyPluginRegistry>;
expectedDuringSwap: ReturnType<typeof createEmptyPluginRegistry>;
expectedAfterRelease: ReturnType<typeof createEmptyPluginRegistry>;
}) {
setActivePluginRegistry(params.startupRegistry);
if (params.pin) {
pinActivePluginChannelRegistry(params.startupRegistry);
}
setActivePluginRegistry(params.replacementRegistry);
expectActiveChannelRegistry(params.expectedDuringSwap);
if (params.pin && params.releaseRegistry) {
releasePinnedPluginChannelRegistry(params.releaseRegistry);
}
expectActiveChannelRegistry(params.expectedAfterRelease);
}
describe("channel registry pinning", () => {
afterEach(() => {
resetAgentEventsForTest();
resetPluginRuntimeStateForTest();
});
it("returns the active registry when not pinned", () => {
const registry = createEmptyPluginRegistry();
setActivePluginRegistry(registry);
expectActiveChannelRegistry(registry);
});
it("preserves pinned channel registry across setActivePluginRegistry calls", () => {
const { registry: startup } = createRegistryWithChannel();
// A subsequent registry swap (e.g., config-schema load) must not evict channels.
const replacement = createEmptyPluginRegistry();
expectPinnedChannelRegistry(startup, replacement);
expect(getActivePluginChannelRegistry()!.channels).toHaveLength(1);
});
it("keeps pinned channel registries live until they are released", () => {
const { registry: startup } = createRegistryWithChannel();
const replacement = createEmptyPluginRegistry();
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
setActivePluginRegistry(replacement);
expect(getActivePluginChannelRegistry()).toBe(startup);
expect(isPluginRegistryRetired(startup)).toBe(false);
releasePinnedPluginChannelRegistry(startup);
expect(getActivePluginChannelRegistry()).toBe(replacement);
expect(isPluginRegistryRetired(startup)).toBe(true);
});
it("falls back to the active channel registry when the pinned registry is empty", () => {
const startup = createEmptyPluginRegistry();
const { registry: replacement } = createRegistryWithChannel("replacement-channel");
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
const channelVersionBeforeSwap = getActivePluginChannelRegistryVersion();
setActivePluginRegistry(replacement);
expectActiveChannelRegistry(replacement);
expect(getActivePluginChannelRegistryVersion()).not.toBe(channelVersionBeforeSwap);
});
it("re-pin invalidates cached channel lookups", () => {
const { first, second } = createChannelRegistryPair();
const { registry: setup, plugin: setupPlugin } = first;
setActivePluginRegistry(setup);
pinActivePluginChannelRegistry(setup);
expect(getChannelPlugin("demo-channel")).toBe(setupPlugin);
const { registry: full, plugin: fullPlugin } = second;
setActivePluginRegistry(full);
expect(getChannelPlugin("demo-channel")).toBe(setupPlugin);
const activeVersionBeforeRepin = getActivePluginRegistryVersion();
const channelVersionBeforeRepin = getActivePluginChannelRegistryVersion();
pinActivePluginChannelRegistry(full);
expect(getActivePluginRegistryVersion()).toBe(activeVersionBeforeRepin);
expect(getActivePluginChannelRegistryVersion()).toBe(channelVersionBeforeRepin + 1);
expect(getChannelPlugin("demo-channel")).toBe(fullPlugin);
});
it.each([
{
name: "updates channel registry on swap when not pinned",
pin: false,
releasePinnedRegistry: false,
expectDuringPin: false,
expectAfterSwap: "second",
},
{
name: "release restores live-tracking behavior",
pin: true,
releasePinnedRegistry: true,
expectDuringPin: true,
expectAfterSwap: "second",
},
{
name: "release is a no-op when the pinned registry does not match",
pin: true,
releasePinnedRegistry: false,
expectDuringPin: true,
expectAfterSwap: "first",
},
] as const)("$name", ({ pin, releasePinnedRegistry, expectDuringPin, expectAfterSwap }) => {
const { startup, replacement, unrelated } = createRegistrySet();
expectChannelRegistrySwap({
startupRegistry: startup,
replacementRegistry: replacement,
...(pin ? { pin: true } : {}),
...(pin ? { releaseRegistry: releasePinnedRegistry ? startup : unrelated } : {}),
expectedDuringSwap: expectDuringPin ? startup : replacement,
expectedAfterRelease: expectAfterSwap === "second" ? replacement : startup,
});
});
it("requireActivePluginChannelRegistry creates a registry when none exists", () => {
resetPluginRuntimeStateForTest();
const registry = requireActivePluginChannelRegistry();
expect(registry).toEqual(createEmptyPluginRegistry());
});
it("resetPluginRuntimeStateForTest clears channel pin", () => {
const { startup, replacement: fresh } = createRegistrySet();
expectResetClearsPinnedChannelRegistry({
startupRegistry: startup,
freshRegistry: fresh,
});
});
it("loadChannelOutboundAdapter resolves from pinned registry after active registry replacement", async () => {
const outboundAdapter = { send: async () => ({ messageId: "1" }) };
const startup = createEmptyPluginRegistry();
startup.channels = [
{
pluginId: "telegram",
plugin: { id: "telegram", meta: {}, outbound: outboundAdapter },
source: "test",
},
] as never;
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
// Simulate a post-boot registry replacement (e.g. config-schema load, plugin status query).
const replacement = createEmptyPluginRegistry();
setActivePluginRegistry(replacement);
// The outbound loader must still find the telegram adapter from the pinned registry.
const adapter = await loadChannelOutboundAdapter("telegram");
expect(adapter).toBe(outboundAdapter);
});
it("loadChannelOutboundAdapter falls back to active registry when pinned setup entry cannot send", async () => {
const outboundAdapter = { sendText: async () => ({ messageId: "1" }) };
const startup = createEmptyPluginRegistry();
startup.channels = [
{
pluginId: "discord",
plugin: { id: "discord", meta: {} },
source: "setup",
},
] as never;
const replacement = createEmptyPluginRegistry();
replacement.channels = [
{
pluginId: "discord",
plugin: { id: "discord", meta: {}, outbound: outboundAdapter },
source: "runtime",
},
] as never;
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
setActivePluginRegistry(replacement);
const adapter = await loadChannelOutboundAdapter("discord");
expect(adapter).toBe(outboundAdapter);
});
it("keeps pinned channel registry agent-event subscriptions live after active registry replacement", () => {
const observed: string[] = [];
const startup = createEmptyPluginRegistry();
startup.agentEventSubscriptions = [
{
pluginId: "startup-plugin",
pluginName: "Startup Plugin",
source: "test",
subscription: {
id: "startup-subscription",
handle: (event) => {
observed.push(`startup:${event.stream}`);
},
},
},
];
const replacement = createEmptyPluginRegistry();
replacement.agentEventSubscriptions = [
{
pluginId: "replacement-plugin",
pluginName: "Replacement Plugin",
source: "test",
subscription: {
id: "replacement-subscription",
handle: (event) => {
observed.push(`replacement:${event.stream}`);
},
},
},
];
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
setActivePluginRegistry(replacement);
emitAgentEvent({
runId: "run-pinned-agent-events",
stream: "approval",
data: { state: "queued" },
});
expect(observed.toSorted()).toEqual(["replacement:approval", "startup:approval"]);
});
it("dedupes the agent-event bridge across multiple runtime module instances", async () => {
const observed: string[] = [];
const runtimeA = await import(new URL("./runtime.ts?runtimeA", import.meta.url).href);
const runtimeB = await import(new URL("./runtime.ts?runtimeB", import.meta.url).href);
const startup = createEmptyPluginRegistry();
startup.agentEventSubscriptions = [
{
pluginId: "startup-plugin",
pluginName: "Startup Plugin",
source: "test",
subscription: {
id: "startup-subscription",
handle: (event) => {
observed.push(`startup:${event.stream}`);
},
},
},
];
const replacement = createEmptyPluginRegistry();
replacement.agentEventSubscriptions = [
{
pluginId: "replacement-plugin",
pluginName: "Replacement Plugin",
source: "test",
subscription: {
id: "replacement-subscription",
handle: (event) => {
observed.push(`replacement:${event.stream}`);
},
},
},
];
runtimeA.setActivePluginRegistry(startup);
runtimeA.pinActivePluginChannelRegistry(startup);
runtimeB.setActivePluginRegistry(replacement);
emitAgentEvent({
runId: "run-cross-module-pinned-agent-events",
stream: "approval",
data: { state: "queued" },
});
expect(observed.toSorted()).toEqual(["replacement:approval", "startup:approval"]);
});
});