Refactor: centralize native approval lifecycle assembly (#62135)

Merged via squash.

Prepared head SHA: b7c20a7398
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Gustavo Madeira Santana
2026-04-07 14:40:26 -04:00
committed by GitHub
parent 4108901932
commit d78512b09d
128 changed files with 8839 additions and 3995 deletions

View File

@@ -442,7 +442,7 @@ describe("plugin-sdk subpath exports", () => {
resolve(REPO_ROOT, "extensions"),
resolve(REPO_ROOT, "test"),
],
pattern: /openclaw\/plugin-sdk\/channel-runtime/u,
pattern: /openclaw\/plugin-sdk\/channel-runtime(?=["'])/u,
exclude: ["src/plugins/sdk-alias.test.ts"],
});
expect(matches).toEqual([]);

View File

@@ -0,0 +1,226 @@
import { describe, expect, it, vi } from "vitest";
import { createRuntimeChannel } from "./runtime-channel.js";
describe("runtimeContexts", () => {
it("registers, resolves, watches, and unregisters contexts", () => {
const channel = createRuntimeChannel();
const onEvent = vi.fn();
const unsubscribe = channel.runtimeContexts.watch({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
onEvent,
});
const lease = channel.runtimeContexts.register({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
context: { client: "ok" },
});
expect(
channel.runtimeContexts.get<{ client: string }>({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
}),
).toEqual({ client: "ok" });
expect(onEvent).toHaveBeenCalledWith({
type: "registered",
key: {
channelId: "matrix",
accountId: "default",
capability: "approval.native",
},
context: { client: "ok" },
});
lease.dispose();
expect(
channel.runtimeContexts.get({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
}),
).toBeUndefined();
expect(onEvent).toHaveBeenLastCalledWith({
type: "unregistered",
key: {
channelId: "matrix",
accountId: "default",
capability: "approval.native",
},
});
unsubscribe();
});
it("auto-disposes registrations when the abort signal fires", () => {
const channel = createRuntimeChannel();
const controller = new AbortController();
const lease = channel.runtimeContexts.register({
channelId: "telegram",
accountId: "default",
capability: "approval.native",
context: { token: "abc" },
abortSignal: controller.signal,
});
controller.abort();
expect(
channel.runtimeContexts.get({
channelId: "telegram",
accountId: "default",
capability: "approval.native",
}),
).toBeUndefined();
lease.dispose();
});
it("does not register contexts when the abort signal is already aborted", () => {
const channel = createRuntimeChannel();
const onEvent = vi.fn();
const controller = new AbortController();
controller.abort();
channel.runtimeContexts.watch({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
onEvent,
});
const lease = channel.runtimeContexts.register({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
context: { client: "stale" },
abortSignal: controller.signal,
});
expect(
channel.runtimeContexts.get({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
}),
).toBeUndefined();
expect(onEvent).not.toHaveBeenCalled();
lease.dispose();
});
it("isolates watcher exceptions so registration and disposal still complete", () => {
const channel = createRuntimeChannel();
const badWatcher = vi.fn((event) => {
throw new Error(`boom:${event.type}`);
});
const goodWatcher = vi.fn();
channel.runtimeContexts.watch({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
onEvent: badWatcher,
});
channel.runtimeContexts.watch({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
onEvent: goodWatcher,
});
const lease = channel.runtimeContexts.register({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
context: { client: "ok" },
});
expect(
channel.runtimeContexts.get({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
}),
).toEqual({ client: "ok" });
expect(badWatcher).toHaveBeenCalledWith(
expect.objectContaining({
type: "registered",
}),
);
expect(goodWatcher).toHaveBeenCalledWith(
expect.objectContaining({
type: "registered",
}),
);
lease.dispose();
expect(
channel.runtimeContexts.get({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
}),
).toBeUndefined();
expect(badWatcher).toHaveBeenCalledWith(
expect.objectContaining({
type: "unregistered",
}),
);
expect(goodWatcher).toHaveBeenCalledWith(
expect.objectContaining({
type: "unregistered",
}),
);
});
it("auto-disposes when a watcher aborts during the registered event", () => {
const channel = createRuntimeChannel();
const controller = new AbortController();
const onEvent = vi.fn((event) => {
if (event.type === "registered") {
controller.abort();
}
});
channel.runtimeContexts.watch({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
onEvent,
});
const lease = channel.runtimeContexts.register({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
context: { client: "ok" },
abortSignal: controller.signal,
});
expect(
channel.runtimeContexts.get({
channelId: "matrix",
accountId: "default",
capability: "approval.native",
}),
).toBeUndefined();
expect(onEvent).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
type: "registered",
}),
);
expect(onEvent).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
type: "unregistered",
}),
);
lease.dispose();
});
});

View File

@@ -57,6 +57,7 @@ import {
updateLastRoute,
} from "../../config/sessions.js";
import { getChannelActivity, recordChannelActivity } from "../../infra/channel-activity.js";
import { createSubsystemLogger } from "../../logging.js";
import { convertMarkdownTables } from "../../markdown/tables.js";
import { fetchRemoteMedia } from "../../media/fetch.js";
import { saveMediaBuffer } from "../../media/store.js";
@@ -66,9 +67,103 @@ import {
upsertChannelPairingRequest,
} from "../../pairing/pairing-store.js";
import { buildAgentSessionKey, resolveAgentRoute } from "../../routing/resolve-route.js";
import type {
PluginRuntimeChannelContextEvent,
PluginRuntimeChannelContextKey,
} from "./types-channel.js";
import type { PluginRuntime } from "./types.js";
type StoredRuntimeContext = {
token: symbol;
context: unknown;
normalizedKey: {
channelId: string;
accountId?: string;
capability: string;
};
};
const log = createSubsystemLogger("plugins/runtime-channel");
function normalizeRuntimeContextString(value: string | null | undefined): string {
return value?.trim() ?? "";
}
function normalizeRuntimeContextKey(params: PluginRuntimeChannelContextKey): {
mapKey: string;
normalizedKey: {
channelId: string;
accountId?: string;
capability: string;
};
} | null {
const channelId = normalizeRuntimeContextString(params.channelId);
const capability = normalizeRuntimeContextString(params.capability);
const accountId = normalizeRuntimeContextString(params.accountId);
if (!channelId || !capability) {
return null;
}
return {
mapKey: `${channelId}\u0000${accountId}\u0000${capability}`,
normalizedKey: {
channelId,
capability,
...(accountId ? { accountId } : {}),
},
};
}
function doesRuntimeContextWatcherMatch(params: {
watcher: {
channelId?: string;
accountId?: string;
capability?: string;
};
event: PluginRuntimeChannelContextEvent;
}): boolean {
if (params.watcher.channelId && params.watcher.channelId !== params.event.key.channelId) {
return false;
}
if (
params.watcher.accountId !== undefined &&
params.watcher.accountId !== (params.event.key.accountId ?? "")
) {
return false;
}
if (params.watcher.capability && params.watcher.capability !== params.event.key.capability) {
return false;
}
return true;
}
export function createRuntimeChannel(): PluginRuntime["channel"] {
const runtimeContexts = new Map<string, StoredRuntimeContext>();
const runtimeContextWatchers = new Set<{
filter: {
channelId?: string;
accountId?: string;
capability?: string;
};
onEvent: (event: PluginRuntimeChannelContextEvent) => void;
}>();
const emitRuntimeContextEvent = (event: PluginRuntimeChannelContextEvent) => {
for (const watcher of runtimeContextWatchers) {
if (!doesRuntimeContextWatcherMatch({ watcher: watcher.filter, event })) {
continue;
}
try {
watcher.onEvent(event);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
log.error(
`runtime context watcher failed during ${event.type} ` +
`channel=${event.key.channelId} capability=${event.key.capability}` +
(event.key.accountId ? ` account=${event.key.accountId}` : "") +
`: ${message}`,
);
}
}
};
const channelRuntime = {
text: {
chunkByNewline,
@@ -172,6 +267,74 @@ export function createRuntimeChannel(): PluginRuntime["channel"] {
maxAgeMs,
}),
},
runtimeContexts: {
register: (params) => {
const normalized = normalizeRuntimeContextKey(params);
if (!normalized) {
return { dispose: () => {} };
}
if (params.abortSignal?.aborted) {
return { dispose: () => {} };
}
const token = Symbol(normalized.mapKey);
let disposed = false;
const dispose = () => {
if (disposed) {
return;
}
disposed = true;
const current = runtimeContexts.get(normalized.mapKey);
if (!current || current.token !== token) {
return;
}
runtimeContexts.delete(normalized.mapKey);
emitRuntimeContextEvent({
type: "unregistered",
key: normalized.normalizedKey,
});
};
params.abortSignal?.addEventListener("abort", dispose, { once: true });
if (params.abortSignal?.aborted) {
dispose();
return { dispose };
}
runtimeContexts.set(normalized.mapKey, {
token,
context: params.context,
normalizedKey: normalized.normalizedKey,
});
if (disposed) {
return { dispose };
}
emitRuntimeContextEvent({
type: "registered",
key: normalized.normalizedKey,
context: params.context,
});
return { dispose };
},
get: <T = unknown>(params: PluginRuntimeChannelContextKey) => {
const normalized = normalizeRuntimeContextKey(params);
if (!normalized) {
return undefined;
}
return runtimeContexts.get(normalized.mapKey)?.context as T | undefined;
},
watch: (params) => {
const watcher = {
filter: {
...(params.channelId?.trim() ? { channelId: params.channelId.trim() } : {}),
...(params.accountId != null ? { accountId: params.accountId.trim() } : {}),
...(params.capability?.trim() ? { capability: params.capability.trim() } : {}),
},
onEvent: params.onEvent,
};
runtimeContextWatchers.add(watcher);
return () => {
runtimeContextWatchers.delete(watcher);
};
},
},
} satisfies PluginRuntime["channel"];
return channelRuntime as PluginRuntime["channel"];

View File

@@ -29,6 +29,38 @@ export type RuntimeThreadBindingLifecycleRecord =
maxAgeMs?: number;
};
export type PluginRuntimeChannelContextKey = {
channelId: string;
accountId?: string | null;
capability: string;
};
export type PluginRuntimeChannelContextEvent = {
type: "registered" | "unregistered";
key: {
channelId: string;
accountId?: string;
capability: string;
};
context?: unknown;
};
export type PluginRuntimeChannelContextRegistry = {
register: (
params: PluginRuntimeChannelContextKey & {
context: unknown;
abortSignal?: AbortSignal;
},
) => { dispose: () => void };
get: <T = unknown>(params: PluginRuntimeChannelContextKey) => T | undefined;
watch: (params: {
channelId?: string;
accountId?: string | null;
capability?: string;
onEvent: (event: PluginRuntimeChannelContextEvent) => void;
}) => () => void;
};
export type PluginRuntimeChannel = {
text: {
chunkByNewline: typeof import("../../auto-reply/chunk.js").chunkByNewline;
@@ -121,4 +153,5 @@ export type PluginRuntimeChannel = {
maxAgeMs: number;
}) => RuntimeThreadBindingLifecycleRecord[];
};
runtimeContexts: PluginRuntimeChannelContextRegistry;
};