refactor: converge plugin sdk channel helpers

This commit is contained in:
Peter Steinberger
2026-03-19 00:25:12 +00:00
parent 62b7b350c9
commit c70837f07d
16 changed files with 166 additions and 170 deletions

View File

@@ -38,10 +38,9 @@ import { normalizeBlueBubblesReactionInput, sendBlueBubblesReaction } from "./re
import type { OpenClawConfig } from "./runtime-api.js";
import {
DM_GROUP_ACCESS_REASON,
createScopedPairingAccess,
createReplyPrefixOptions,
createChannelPairingController,
createChannelReplyPipeline,
evictOldHistoryKeys,
issuePairingChallenge,
logAckFailure,
logInboundDrop,
logTypingFailure,
@@ -452,7 +451,7 @@ export async function processMessage(
target: WebhookTarget,
): Promise<void> {
const { account, config, runtime, core, statusSink } = target;
const pairing = createScopedPairingAccess({
const pairing = createChannelPairingController({
core,
channel: "bluebubbles",
accountId: account.accountId,
@@ -654,12 +653,10 @@ export async function processMessage(
}
if (accessDecision.decision === "pairing") {
await issuePairingChallenge({
channel: "bluebubbles",
await pairing.issueChallenge({
senderId: message.senderId,
senderIdLine: `Your BlueBubbles sender id: ${message.senderId}`,
meta: { name: message.senderName },
upsertPairingRequest: pairing.upsertPairingRequest,
onCreated: () => {
runtime.log?.(`[bluebubbles] pairing request sender=${message.senderId} created=true`);
logVerbose(core, runtime, `bluebubbles pairing request sender=${message.senderId}`);
@@ -1228,17 +1225,47 @@ export async function processMessage(
}, typingRestartDelayMs);
};
try {
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg: config,
agentId: route.agentId,
channel: "bluebubbles",
accountId: account.accountId,
typingCallbacks: {
onReplyStart: async () => {
if (!chatGuidForActions) {
return;
}
if (!baseUrl || !password) {
return;
}
streamingActive = true;
clearTypingRestartTimer();
try {
await sendBlueBubblesTyping(chatGuidForActions, true, {
cfg: config,
accountId: account.accountId,
});
} catch (err) {
runtime.error?.(`[bluebubbles] typing start failed: ${String(err)}`);
}
},
onIdle: () => {
if (!chatGuidForActions) {
return;
}
if (!baseUrl || !password) {
return;
}
// Intentionally no-op for block streaming. We stop typing in finally
// after the run completes to avoid flicker between paragraph blocks.
},
},
});
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: config,
dispatcherOptions: {
...prefixOptions,
...replyPipeline,
deliver: async (payload, info) => {
const rawReplyToId =
privateApiEnabled && typeof payload.replyToId === "string"
@@ -1356,34 +1383,8 @@ export async function processMessage(
}
}
},
onReplyStart: async () => {
if (!chatGuidForActions) {
return;
}
if (!baseUrl || !password) {
return;
}
streamingActive = true;
clearTypingRestartTimer();
try {
await sendBlueBubblesTyping(chatGuidForActions, true, {
cfg: config,
accountId: account.accountId,
});
} catch (err) {
runtime.error?.(`[bluebubbles] typing start failed: ${String(err)}`);
}
},
onIdle: async () => {
if (!chatGuidForActions) {
return;
}
if (!baseUrl || !password) {
return;
}
// Intentionally no-op for block streaming. We stop typing in finally
// after the run completes to avoid flicker between paragraph blocks.
},
onReplyStart: typingCallbacks?.onReplyStart,
onIdle: typingCallbacks?.onIdle,
onError: (err, info) => {
runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${String(err)}`);
},
@@ -1447,7 +1448,7 @@ export async function processReaction(
target: WebhookTarget,
): Promise<void> {
const { account, config, runtime, core } = target;
const pairing = createScopedPairingAccess({
const pairing = createChannelPairingController({
core,
channel: "bluebubbles",
accountId: account.accountId,

View File

@@ -1,7 +1,6 @@
import { KILOCODE_BASE_URL, KILOCODE_DEFAULT_MODEL_REF } from "openclaw/plugin-sdk/provider-models";
import {
applyAgentDefaultModelPrimary,
applyProviderConfigWithModelCatalog,
applyProviderConfigWithModelCatalogPreset,
type OpenClawConfig,
} from "openclaw/plugin-sdk/provider-onboard";
import { buildKilocodeProvider } from "./provider-catalog.js";
@@ -9,24 +8,22 @@ import { buildKilocodeProvider } from "./provider-catalog.js";
export { KILOCODE_BASE_URL, KILOCODE_DEFAULT_MODEL_REF };
export function applyKilocodeProviderConfig(cfg: OpenClawConfig): OpenClawConfig {
const models = { ...cfg.agents?.defaults?.models };
models[KILOCODE_DEFAULT_MODEL_REF] = {
...models[KILOCODE_DEFAULT_MODEL_REF],
alias: models[KILOCODE_DEFAULT_MODEL_REF]?.alias ?? "Kilo Gateway",
};
return applyProviderConfigWithModelCatalog(cfg, {
agentModels: models,
return applyProviderConfigWithModelCatalogPreset(cfg, {
providerId: "kilocode",
api: "openai-completions",
baseUrl: KILOCODE_BASE_URL,
catalogModels: buildKilocodeProvider().models ?? [],
aliases: [{ modelRef: KILOCODE_DEFAULT_MODEL_REF, alias: "Kilo Gateway" }],
});
}
export function applyKilocodeConfig(cfg: OpenClawConfig): OpenClawConfig {
return applyAgentDefaultModelPrimary(
applyKilocodeProviderConfig(cfg),
KILOCODE_DEFAULT_MODEL_REF,
);
return applyProviderConfigWithModelCatalogPreset(cfg, {
providerId: "kilocode",
api: "openai-completions",
baseUrl: KILOCODE_BASE_URL,
catalogModels: buildKilocodeProvider().models ?? [],
aliases: [{ modelRef: KILOCODE_DEFAULT_MODEL_REF, alias: "Kilo Gateway" }],
primaryModelRef: KILOCODE_DEFAULT_MODEL_REF,
});
}

View File

@@ -1,6 +1,6 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../runtime-api.js";
import { createReplyPrefixOptions } from "../runtime-api.js";
import { createChannelReplyPipeline } from "../runtime-api.js";
const { sendMessageMattermostMock } = vi.hoisted(() => ({
sendMessageMattermostMock: vi.fn(),
}));
@@ -431,7 +431,7 @@ describe("mattermostPlugin", () => {
},
};
const prefixContext = createReplyPrefixOptions({
const prefixContext = createChannelReplyPipeline({
cfg,
agentId: "main",
channel: "mattermost",

View File

@@ -9,9 +9,8 @@ import {
buildAgentMediaPayload,
buildModelsProviderData,
DM_GROUP_ACCESS_REASON,
createScopedPairingAccess,
createReplyPrefixOptions,
createTypingCallbacks,
createChannelPairingController,
createChannelReplyPipeline,
logInboundDrop,
logTypingFailure,
buildPendingHistoryContextFromMap,
@@ -245,7 +244,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
cfg,
accountId: opts.accountId,
});
const pairing = createScopedPairingAccess({
const pairing = createChannelPairingController({
core,
channel: "mattermost",
accountId: account.accountId,
@@ -462,26 +461,26 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
channel: "mattermost",
accountId: account.accountId,
});
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: route.agentId,
channel: "mattermost",
accountId: account.accountId,
});
const typingCallbacks = createTypingCallbacks({
start: () => sendTypingIndicator(opts.channelId, threadContext.effectiveReplyToId),
onStartError: (err) => {
logTypingFailure({
log: (message) => logger.debug?.(message),
channel: "mattermost",
target: opts.channelId,
error: err,
});
typing: {
start: () => sendTypingIndicator(opts.channelId, threadContext.effectiveReplyToId),
onStartError: (err) => {
logTypingFailure({
log: (message) => logger.debug?.(message),
channel: "mattermost",
target: opts.channelId,
error: err,
});
},
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload: ReplyPayload) => {
await deliverMattermostReplyPayload({
@@ -504,7 +503,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
onError: (err, info) => {
runtime.error?.(`mattermost button-click ${info.kind} reply failed: ${String(err)}`);
},
onReplyStart: typingCallbacks.onReplyStart,
onReplyStart: typingCallbacks?.onReplyStart,
});
await core.channel.reply.dispatchReplyFromConfig({
@@ -653,30 +652,30 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
fallbackLimit: account.textChunkLimit ?? 4000,
},
);
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
const shouldDeliverReplies = params.deliverReplies === true;
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: params.route.agentId,
channel: "mattermost",
accountId: account.accountId,
typing: shouldDeliverReplies
? {
start: () => sendTypingIndicator(params.channelId, params.effectiveReplyToId),
onStartError: (err) => {
logTypingFailure({
log: (message) => logger.debug?.(message),
channel: "mattermost",
target: params.channelId,
error: err,
});
},
}
: undefined,
});
const shouldDeliverReplies = params.deliverReplies === true;
const capturedTexts: string[] = [];
const typingCallbacks = shouldDeliverReplies
? createTypingCallbacks({
start: () => sendTypingIndicator(params.channelId, params.effectiveReplyToId),
onStartError: (err) => {
logTypingFailure({
log: (message) => logger.debug?.(message),
channel: "mattermost",
target: params.channelId,
error: err,
});
},
})
: undefined;
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
...replyPipeline,
// Picker-triggered confirmations should stay immediate.
deliver: async (payload: ReplyPayload) => {
const trimmedPayload = {
@@ -1379,27 +1378,26 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
accountId: account.accountId,
});
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: route.agentId,
channel: "mattermost",
accountId: account.accountId,
});
const typingCallbacks = createTypingCallbacks({
start: () => sendTypingIndicator(channelId, effectiveReplyToId),
onStartError: (err) => {
logTypingFailure({
log: (message) => logger.debug?.(message),
channel: "mattermost",
target: channelId,
error: err,
});
typing: {
start: () => sendTypingIndicator(channelId, effectiveReplyToId),
onStartError: (err) => {
logTypingFailure({
log: (message) => logger.debug?.(message),
channel: "mattermost",
target: channelId,
error: err,
});
},
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
typingCallbacks,
deliver: async (payload: ReplyPayload) => {

View File

@@ -9,8 +9,7 @@ import type { IncomingMessage, ServerResponse } from "node:http";
import type { ResolvedMattermostAccount } from "../mattermost/accounts.js";
import {
buildModelsProviderData,
createReplyPrefixOptions,
createTypingCallbacks,
createChannelReplyPipeline,
isRequestBodyLimitError,
logTypingFailure,
readRequestBodyWithLimit,
@@ -466,29 +465,28 @@ async function handleSlashCommandAsync(params: {
accountId: account.accountId,
});
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: route.agentId,
channel: "mattermost",
accountId: account.accountId,
typing: {
start: () => sendMattermostTyping(client, { channelId }),
onStartError: (err) => {
logTypingFailure({
log: (message) => log?.(message),
channel: "mattermost",
target: channelId,
error: err,
});
},
},
});
const humanDelay = core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId);
const typingCallbacks = createTypingCallbacks({
start: () => sendMattermostTyping(client, { channelId }),
onStartError: (err) => {
logTypingFailure({
log: (message) => log?.(message),
channel: "mattermost",
target: channelId,
error: err,
});
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
...replyPipeline,
humanDelay,
deliver: async (payload: ReplyPayload) => {
await deliverMattermostReplyPayload({
@@ -507,7 +505,7 @@ async function handleSlashCommandAsync(params: {
onError: (err, info) => {
runtime.error?.(`mattermost slash ${info.kind} reply failed: ${String(err)}`);
},
onReplyStart: typingCallbacks.onReplyStart,
onReplyStart: typingCallbacks?.onReplyStart,
});
await core.channel.reply.withReplyDispatcher({

View File

@@ -1,3 +1,4 @@
export type { SecretInput } from "openclaw/plugin-sdk/secret-input";
export {
buildSecretInputSchema,
hasConfiguredSecretInput,

View File

@@ -5,11 +5,11 @@ import {
applyAccountNameToChannelSection,
applySetupAccountConfigPatch,
DEFAULT_ACCOUNT_ID,
hasConfiguredSecretInput,
migrateBaseNameToDefaultAccount,
normalizeAccountId,
type OpenClawConfig,
} from "./runtime-api.js";
import { hasConfiguredSecretInput } from "./secret-input.js";
const channel = "mattermost" as const;

View File

@@ -5,9 +5,9 @@ import { normalizeMattermostBaseUrl } from "./mattermost/client.js";
import {
applySetupAccountConfigPatch,
DEFAULT_ACCOUNT_ID,
hasConfiguredSecretInput,
type OpenClawConfig,
} from "./runtime-api.js";
import { hasConfiguredSecretInput } from "./secret-input.js";
import {
isMattermostConfigured,
mattermostSetupAdapter,

View File

@@ -1,9 +1,5 @@
import type {
BlockStreamingCoalesceConfig,
DmPolicy,
GroupPolicy,
SecretInput,
} from "./runtime-api.js";
import type { BlockStreamingCoalesceConfig, DmPolicy, GroupPolicy } from "./runtime-api.js";
import type { SecretInput } from "./secret-input.js";
export type MattermostReplyToMode = "off" | "first" | "all";
export type MattermostChatTypeKey = "direct" | "channel" | "group";

View File

@@ -2,9 +2,9 @@ import {
DEFAULT_ACCOUNT_ID,
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
createChannelPairingController,
dispatchReplyFromConfigWithSettledDispatcher,
DEFAULT_GROUP_HISTORY_LIMIT,
createScopedPairingAccess,
logInboundDrop,
evaluateSenderGroupAccessForPolicy,
resolveSenderScopedGroupPolicy,
@@ -63,7 +63,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
log,
} = deps;
const core = getMSTeamsRuntime();
const pairing = createScopedPairingAccess({
const pairing = createChannelPairingController({
core,
channel: "msteams",
accountId: DEFAULT_ACCOUNT_ID,

View File

@@ -1,6 +1,5 @@
import {
createReplyPrefixOptions,
createTypingCallbacks,
createChannelReplyPipeline,
logTypingFailure,
resolveChannelMediaMaxBytes,
type OpenClawConfig,
@@ -73,28 +72,28 @@ export function createMSTeamsReplyDispatcher(params: {
});
};
const typingCallbacks = createTypingCallbacks({
start: sendTypingIndicator,
onStartError: (err) => {
logTypingFailure({
log: (message) => params.log.debug?.(message),
channel: "msteams",
action: "start",
error: err,
});
},
});
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg: params.cfg,
agentId: params.agentId,
channel: "msteams",
accountId: params.accountId,
typing: {
start: sendTypingIndicator,
onStartError: (err) => {
logTypingFailure({
log: (message) => params.log.debug?.(message),
channel: "msteams",
action: "start",
error: err,
});
},
},
});
const chunkMode = core.channel.text.resolveChunkMode(params.cfg, "msteams");
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
typingCallbacks,
deliver: async (payload) => {

View File

@@ -51,15 +51,9 @@ export type {
ChannelMessageActionName,
} from "../channels/plugins/types.js";
export type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
export { createReplyPrefixOptions } from "../channels/reply-prefix.js";
export { createChannelReplyPipeline } from "./channel-reply-pipeline.js";
export type { OpenClawConfig } from "../config/config.js";
export type { DmPolicy, GroupPolicy } from "../config/types.js";
export {
hasConfiguredSecretInput,
normalizeResolvedSecretInputString,
normalizeSecretInputString,
} from "../config/types.secrets.js";
export { buildSecretInputSchema } from "./secret-input-schema.js";
export { ToolPolicySchema } from "../config/zod-schema.agent-runtime.js";
export { MarkdownConfigSchema } from "../config/zod-schema.core.js";
export type { ParsedChatTarget } from "../../extensions/imessage/api.js";
@@ -85,23 +79,19 @@ export type { WizardPrompter } from "../wizard/prompts.js";
export { isAllowedParsedChatSender } from "./allow-from.js";
export { readBooleanParam } from "./boolean-param.js";
export { mapAllowFromEntries } from "./channel-config-helpers.js";
export { createScopedPairingAccess } from "./pairing-access.js";
export { issuePairingChallenge } from "../pairing/pairing-challenge.js";
export { createChannelPairingController } from "./channel-pairing.js";
export { resolveRequestUrl } from "./request-url.js";
export {
buildComputedAccountStatusSnapshot,
buildProbeChannelStatusSummary,
} from "./status-helpers.js";
export { extractToolSend } from "./tool-send.js";
export { normalizeWebhookPath } from "./webhook-path.js";
export {
beginWebhookRequestPipelineOrReject,
createWebhookInFlightLimiter,
normalizeWebhookPath,
readWebhookBodyOrReject,
} from "./webhook-request-guards.js";
export {
registerWebhookTargetWithPluginRoute,
resolveWebhookTargets,
resolveWebhookTargetWithAuthOrRejectSync,
withResolvedWebhookRequestPipeline,
} from "./webhook-targets.js";
} from "./webhook-ingress.js";

View File

@@ -36,4 +36,24 @@ describe("createChannelReplyPipeline", () => {
expect(start).toHaveBeenCalled();
expect(stop).toHaveBeenCalled();
});
it("preserves explicit typing callbacks when a channel needs custom lifecycle hooks", async () => {
const onReplyStart = vi.fn(async () => {});
const onIdle = vi.fn(() => {});
const pipeline = createChannelReplyPipeline({
cfg: {},
agentId: "main",
channel: "bluebubbles",
typingCallbacks: {
onReplyStart,
onIdle,
},
});
await pipeline.typingCallbacks?.onReplyStart();
pipeline.typingCallbacks?.onIdle?.();
expect(onReplyStart).toHaveBeenCalledTimes(1);
expect(onIdle).toHaveBeenCalledTimes(1);
});
});

View File

@@ -25,6 +25,7 @@ export function createChannelReplyPipeline(params: {
channel?: string;
accountId?: string;
typing?: CreateTypingCallbacksParams;
typingCallbacks?: TypingCallbacks;
}): ChannelReplyPipeline {
return {
...createReplyPrefixOptions({
@@ -33,6 +34,10 @@ export function createChannelReplyPipeline(params: {
channel: params.channel,
accountId: params.accountId,
}),
...(params.typing ? { typingCallbacks: createTypingCallbacks(params.typing) } : {}),
...(params.typingCallbacks
? { typingCallbacks: params.typingCallbacks }
: params.typing
? { typingCallbacks: createTypingCallbacks(params.typing) }
: {}),
};
}

View File

@@ -50,8 +50,7 @@ export type {
} from "../channels/plugins/types.js";
export type { ChannelDirectoryEntry } from "../channels/plugins/types.core.js";
export type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
export { createReplyPrefixOptions } from "../channels/reply-prefix.js";
export { createTypingCallbacks } from "../channels/typing.js";
export { createChannelReplyPipeline } from "./channel-reply-pipeline.js";
export type { OpenClawConfig } from "../config/config.js";
export { isDangerousNameMatchingEnabled } from "../config/dangerous-name-matching.js";
export { loadSessionStore, resolveStorePath } from "../config/sessions.js";
@@ -61,13 +60,6 @@ export {
warnMissingProviderGroupPolicyFallbackOnce,
} from "../config/runtime-group-policy.js";
export type { BlockStreamingCoalesceConfig, DmPolicy, GroupPolicy } from "../config/types.js";
export type { SecretInput } from "../config/types.secrets.js";
export {
hasConfiguredSecretInput,
normalizeResolvedSecretInputString,
normalizeSecretInputString,
} from "../config/types.secrets.js";
export { buildSecretInputSchema } from "./secret-input-schema.js";
export {
BlockStreamingCoalesceSchema,
DmPolicySchema,
@@ -100,5 +92,5 @@ export type { WizardPrompter } from "../wizard/prompts.js";
export { buildAgentMediaPayload } from "./agent-media-payload.js";
export { getAgentScopedMediaLocalRoots } from "../media/local-roots.js";
export { loadOutboundMediaFromUrl } from "./outbound-media.js";
export { createScopedPairingAccess } from "./pairing-access.js";
export { createChannelPairingController } from "./channel-pairing.js";
export { isRequestBodyLimitError, readRequestBodyWithLimit } from "../infra/http-body.js";

View File

@@ -52,8 +52,7 @@ export type {
ChannelOutboundAdapter,
} from "../channels/plugins/types.js";
export type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
export { createChannelReplyPipeline, createReplyPrefixOptions } from "./channel-reply-pipeline.js";
export { createTypingCallbacks } from "./channel-reply-pipeline.js";
export { createChannelReplyPipeline } from "./channel-reply-pipeline.js";
export type { OpenClawConfig } from "../config/config.js";
export { isDangerousNameMatchingEnabled } from "../config/dangerous-name-matching.js";
export { resolveToolsBySender } from "../config/group-policy.js";
@@ -106,7 +105,7 @@ export { withFileLock } from "./file-lock.js";
export { dispatchReplyFromConfigWithSettledDispatcher } from "./inbound-reply-dispatch.js";
export { readJsonFileWithFallback, writeJsonFileAtomically } from "./json-store.js";
export { loadOutboundMediaFromUrl } from "./outbound-media.js";
export { createChannelPairingController, createScopedPairingAccess } from "./channel-pairing.js";
export { createChannelPairingController } from "./channel-pairing.js";
export { resolveInboundSessionEnvelopeContext } from "../channels/session-envelope.js";
export {
buildHostnameAllowlistPolicyFromSuffixAllowlist,