refactor(channels): move more turn policy into kernel

This commit is contained in:
Peter Steinberger
2026-04-30 03:44:18 +01:00
parent a6390efeba
commit c403ea9063
9 changed files with 287 additions and 77 deletions

View File

@@ -1,2 +1,2 @@
ae28566c922ce79527943b069abc199de28e3898ec08eea12c4ff6050795f276 plugin-sdk-api-baseline.json
79446b23832949553b23e7cf92be37b81c69d123fc09bed6f8fc04bd98e9257d plugin-sdk-api-baseline.jsonl
d26a70c9ea3bd277135a1712556f07195fb464b5cd846d04f18c2166c319a73d plugin-sdk-api-baseline.json
9fe2cb122fb3de17eaaf54c7768f268aa689063cf9091bd4b0be9422550a70a8 plugin-sdk-api-baseline.jsonl

View File

@@ -12,16 +12,9 @@ import {
} from "openclaw/plugin-sdk/conversation-runtime";
import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
import { normalizeScpRemoteHost } from "openclaw/plugin-sdk/host-runtime";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { isInboundPathAllowed, kindFromMime } from "openclaw/plugin-sdk/media-runtime";
import {
clearHistoryEntriesIfEnabled,
DEFAULT_GROUP_HISTORY_LIMIT,
type HistoryEntry,
} from "openclaw/plugin-sdk/reply-history";
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "openclaw/plugin-sdk/reply-history";
import { resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-runtime";
import { dispatchInboundMessage } from "openclaw/plugin-sdk/reply-runtime";
import { createReplyDispatcher } from "openclaw/plugin-sdk/reply-runtime";
@@ -442,7 +435,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
},
});
const { dispatchResult } = await runPreparedInboundReplyTurn({
await runPreparedInboundReplyTurn({
channel: "imessage",
accountId: decision.route.accountId,
routeSessionKey: decision.route.sessionKey,
@@ -475,6 +468,12 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
logVerbose(`imessage: failed updating session meta: ${String(err)}`);
},
},
history: {
isGroup: decision.isGroup,
historyKey: decision.historyKey,
historyMap: groupHistories,
limit: historyLimit,
},
onPreDispatchFailure: () => settleReplyDispatcher({ dispatcher }),
runDispatch: () =>
dispatchInboundMessage({
@@ -490,23 +489,6 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
},
}),
});
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
if (decision.isGroup && decision.historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: groupHistories,
historyKey: decision.historyKey,
limit: historyLimit,
});
}
return;
}
if (decision.isGroup && decision.historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: groupHistories,
historyKey: decision.historyKey,
limit: historyLimit,
});
}
}
const handleMessage = async (raw: unknown) => {

View File

@@ -1,15 +1,8 @@
import type { webhook } from "@line/bot-sdk";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
dispatchReplyWithBufferedBlockDispatcher,
chunkMarkdownText,
} from "openclaw/plugin-sdk/reply-runtime";
import { hasFinalInboundReplyDispatch } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { chunkMarkdownText } from "openclaw/plugin-sdk/reply-runtime";
import {
danger,
logVerbose,
@@ -32,6 +25,7 @@ import { deliverLineAutoReply } from "./auto-reply-delivery.js";
import { createLineBot } from "./bot.js";
import { processLineMessage } from "./markdown-to-line.js";
import { sendLineReplyChunks } from "./reply-chunks.js";
import { getLineRuntime } from "./runtime.js";
import {
createFlexMessage,
createImageMessage,
@@ -236,21 +230,36 @@ export async function monitorLineProvider(
accountId: route.accountId,
});
const { dispatchResult } = await runPreparedInboundReplyTurn({
const core = getLineRuntime();
const { dispatchResult } = await core.channel.turn.run({
channel: "line",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath: ctx.turn.storePath,
ctxPayload,
recordInboundSession,
record: ctx.turn.record,
runDispatch: () =>
dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
raw: ctx,
adapter: {
ingest: () => ({
id: ctxPayload.MessageSid ?? `${ctxPayload.From}:${Date.now()}`,
rawText: ctxPayload.RawBody ?? ctxPayload.BodyForAgent ?? "",
}),
resolveTurn: () => ({
cfg: config,
channel: "line",
accountId: route.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath: ctx.turn.storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
record: ctx.turn.record,
dispatcherOptions: {
...replyPipeline,
deliver: async (payload, _info) => {
},
replyOptions: {
onModelSelected,
},
delivery: {
deliver: async (payload) => {
const lineData = (payload.channelData?.line as LineChannelData | undefined) ?? {};
if (ctx.userId && !ctx.isGroup) {
@@ -304,10 +313,8 @@ export async function monitorLineProvider(
runtime.error?.(danger(`line ${info.kind} reply failed: ${String(err)}`));
},
},
replyOptions: {
onModelSelected,
},
}),
},
});
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
logVerbose(`line: no response generated for message from ${ctxPayload.From}`);

View File

@@ -25,14 +25,10 @@ import {
toInternalMessageReceivedContext,
triggerInternalHook,
} from "openclaw/plugin-sdk/hook-runtime";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { kindFromMime } from "openclaw/plugin-sdk/media-runtime";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
recordPendingHistoryEntryIfEnabled,
} from "openclaw/plugin-sdk/reply-history";
import { dispatchInboundMessage } from "openclaw/plugin-sdk/reply-runtime";
@@ -292,7 +288,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
},
});
const { dispatchResult } = await runPreparedInboundReplyTurn({
await runPreparedInboundReplyTurn({
channel: "signal",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
@@ -331,6 +327,12 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
logVerbose(`signal: failed updating session meta: ${String(err)}`);
},
},
history: {
isGroup: entry.isGroup,
historyKey,
historyMap: deps.groupHistories,
limit: deps.historyLimit,
},
onPreDispatchFailure: () =>
settleReplyDispatcher({
dispatcher,
@@ -354,23 +356,6 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
}
},
});
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
if (entry.isGroup && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: deps.groupHistories,
historyKey,
limit: deps.historyLimit,
});
}
return;
}
if (entry.isGroup && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: deps.groupHistories,
historyKey,
limit: deps.historyLimit,
});
}
}
const { debouncer: inboundDebouncer } = createChannelInboundDebouncer<SignalInboundEntry>({

View File

@@ -1,5 +1,40 @@
import { describe, expect, it } from "vitest";
import { buildChannelTurnContext } from "./context.js";
import { buildChannelTurnContext, type BuildChannelTurnContextParams } from "./context.js";
function createBaseContextParams(
overrides: Partial<BuildChannelTurnContextParams> = {},
): BuildChannelTurnContextParams {
return {
channel: "test",
accountId: "acct",
messageId: "msg-1",
from: "test:user:u1",
sender: {
id: "u1",
},
conversation: {
kind: "group",
id: "room-1",
routePeer: {
kind: "group",
id: "room-1",
},
},
route: {
agentId: "main",
routeSessionKey: "agent:main:test:group:room-1",
},
reply: {
to: "test:room:room-1",
originatingTo: "test:room:room-1",
},
message: {
rawBody: "hello",
envelopeFrom: "User One",
},
...overrides,
};
}
describe("buildChannelTurnContext", () => {
it("maps normalized turn facts into a finalized message context", () => {
@@ -139,4 +174,92 @@ describe("buildChannelTurnContext", () => {
}),
);
});
it("filters supplemental context with channel visibility policy", () => {
const ctx = buildChannelTurnContext(
createBaseContextParams({
supplemental: {
quote: {
id: "quote-1",
body: "quoted",
sender: "Quoted User",
senderAllowed: false,
isQuote: true,
},
forwarded: {
from: "Forwarded User",
fromId: "f1",
senderAllowed: false,
},
thread: {
starterBody: "thread starter",
historyBody: "thread history",
senderAllowed: false,
},
},
contextVisibility: "allowlist",
}),
);
expect(ctx.ReplyToBody).toBeUndefined();
expect(ctx.ReplyToSender).toBeUndefined();
expect(ctx.ForwardedFrom).toBeUndefined();
expect(ctx.ThreadStarterBody).toBeUndefined();
expect(ctx.ThreadHistoryBody).toBeUndefined();
});
it("keeps quoted context in allowlist_quote mode", () => {
const ctx = buildChannelTurnContext(
createBaseContextParams({
supplemental: {
quote: {
id: "quote-1",
body: "quoted",
sender: "Quoted User",
senderAllowed: false,
isQuote: true,
},
thread: {
starterBody: "thread starter",
senderAllowed: false,
},
},
contextVisibility: "allowlist_quote",
}),
);
expect(ctx.ReplyToBody).toBe("quoted");
expect(ctx.ReplyToSender).toBe("Quoted User");
expect(ctx.ThreadStarterBody).toBeUndefined();
});
it("drops supplemental context with unknown sender allow state in restrictive modes", () => {
const ctx = buildChannelTurnContext(
createBaseContextParams({
supplemental: {
quote: {
id: "quote-1",
body: "quoted",
sender: "Quoted User",
isQuote: true,
},
forwarded: {
from: "Forwarded User",
fromId: "f1",
},
thread: {
starterBody: "thread starter",
historyBody: "thread history",
},
},
contextVisibility: "allowlist_quote",
}),
);
expect(ctx.ReplyToBody).toBeUndefined();
expect(ctx.ReplyToSender).toBeUndefined();
expect(ctx.ForwardedFrom).toBeUndefined();
expect(ctx.ThreadStarterBody).toBeUndefined();
expect(ctx.ThreadHistoryBody).toBeUndefined();
});
});

View File

@@ -1,5 +1,7 @@
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import type { FinalizedMsgContext } from "../../auto-reply/templating.js";
import type { ContextVisibilityMode } from "../../config/types.base.js";
import { shouldIncludeSupplementalContext } from "../../security/context-visibility.js";
import type {
AccessFacts,
ConversationFacts,
@@ -28,6 +30,7 @@ export type BuildChannelTurnContextParams = {
access?: AccessFacts;
media?: InboundMediaFacts[];
supplemental?: SupplementalContextFacts;
contextVisibility?: ContextVisibilityMode;
extra?: Record<string, unknown>;
};
@@ -51,11 +54,70 @@ function commandAuthorized(access: AccessFacts | undefined): boolean | undefined
return commands.authorizers.some((entry) => entry.allowed);
}
function keepSupplementalContext(params: {
mode?: ContextVisibilityMode;
kind: "quote" | "forwarded" | "thread";
senderAllowed?: boolean;
}): boolean {
if (!params.mode || params.mode === "all") {
return true;
}
if (params.senderAllowed === undefined) {
return false;
}
return shouldIncludeSupplementalContext({
mode: params.mode,
kind: params.kind,
senderAllowed: params.senderAllowed,
});
}
export function filterChannelTurnSupplementalContext(params: {
supplemental?: SupplementalContextFacts;
contextVisibility?: ContextVisibilityMode;
}): SupplementalContextFacts | undefined {
const supplemental = params.supplemental;
if (!supplemental) {
return undefined;
}
const quote = keepSupplementalContext({
mode: params.contextVisibility,
kind: "quote",
senderAllowed: supplemental.quote?.senderAllowed,
})
? supplemental.quote
: undefined;
const forwarded = keepSupplementalContext({
mode: params.contextVisibility,
kind: "forwarded",
senderAllowed: supplemental.forwarded?.senderAllowed,
})
? supplemental.forwarded
: undefined;
const thread = keepSupplementalContext({
mode: params.contextVisibility,
kind: "thread",
senderAllowed: supplemental.thread?.senderAllowed,
})
? supplemental.thread
: undefined;
return {
...supplemental,
quote,
forwarded,
thread,
};
}
export function buildChannelTurnContext(
params: BuildChannelTurnContextParams,
): FinalizedMsgContext {
const media = params.media ?? [];
const supplemental = params.supplemental;
const supplemental = filterChannelTurnSupplementalContext({
supplemental: params.supplemental,
contextVisibility: params.contextVisibility,
});
const body = params.message.body ?? params.message.rawBody;
return finalizeInboundContext({

View File

@@ -127,6 +127,30 @@ describe("channel turn kernel", () => {
);
});
it("clears pending group history after a successful prepared turn", async () => {
const historyMap = new Map([["room-1", [{ sender: "User", body: "queued before reply" }]]]);
await runPreparedChannelTurn({
channel: "test",
routeSessionKey: "agent:main:test:group:room-1",
storePath: "/tmp/sessions.json",
ctxPayload: createCtx(),
recordInboundSession: createRecordInboundSession(),
runDispatch: vi.fn(async () => ({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
})),
history: {
isGroup: true,
historyKey: "room-1",
historyMap,
limit: 50,
},
});
expect(historyMap.get("room-1")).toEqual([]);
});
it("cleans up pre-created dispatchers when session recording fails", async () => {
const events: string[] = [];
const recordError = new Error("session store failed");

View File

@@ -1,11 +1,13 @@
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
export { buildChannelTurnContext } from "./context.js";
import { clearHistoryEntriesIfEnabled } from "../../auto-reply/reply/history.js";
export { buildChannelTurnContext, filterChannelTurnSupplementalContext } from "./context.js";
export type { BuildChannelTurnContextParams } from "./context.js";
import type {
AssembledChannelTurn,
ChannelEventClass,
ChannelTurnAdmission,
ChannelTurnDeliveryAdapter,
ChannelTurnHistoryFinalizeOptions,
ChannelTurnLogEvent,
ChannelTurnResult,
DispatchedChannelTurnResult,
@@ -31,6 +33,7 @@ export type {
ChannelTurnAdapter,
ChannelTurnAdmission,
ChannelTurnDeliveryAdapter,
ChannelTurnHistoryFinalizeOptions,
ChannelTurnDispatcherOptions,
ChannelTurnLogEvent,
ChannelTurnRecordOptions,
@@ -97,6 +100,17 @@ export function createNoopChannelTurnDeliveryAdapter(): ChannelTurnDeliveryAdapt
};
}
function clearPendingHistoryAfterTurn(params?: ChannelTurnHistoryFinalizeOptions): void {
if (!params?.isGroup || !params.historyKey || !params.historyMap || params.limit === undefined) {
return;
}
clearHistoryEntriesIfEnabled({
historyMap: params.historyMap,
historyKey: params.historyKey,
limit: params.limit,
});
}
export async function dispatchAssembledChannelTurn(
params: AssembledChannelTurn,
): Promise<DispatchedChannelTurnResult> {
@@ -108,6 +122,7 @@ export async function dispatchAssembledChannelTurn(
ctxPayload: params.ctxPayload,
recordInboundSession: params.recordInboundSession,
record: params.record,
history: params.history,
admission: params.admission,
log: params.log,
messageId: params.messageId,
@@ -222,6 +237,7 @@ export async function runPreparedChannelTurn<
admission: admission.kind,
},
});
clearPendingHistoryAfterTurn(params.history);
return {
admission,

View File

@@ -2,6 +2,7 @@ import type { GetReplyOptions } from "../../auto-reply/get-reply-options.types.j
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import type { DispatchFromConfigResult } from "../../auto-reply/reply/dispatch-from-config.types.js";
import type { GetReplyFromConfig } from "../../auto-reply/reply/get-reply.types.js";
import type { HistoryEntry } from "../../auto-reply/reply/history.js";
import type { DispatchReplyWithBufferedBlockDispatcher } from "../../auto-reply/reply/provider-dispatcher.types.js";
import type { ReplyDispatcherWithTypingOptions } from "../../auto-reply/reply/reply-dispatcher.js";
import type { ReplyDispatchKind } from "../../auto-reply/reply/reply-dispatcher.types.js";
@@ -133,6 +134,7 @@ export type SupplementalContextFacts = {
fromType?: string;
fromId?: string;
date?: number;
senderAllowed?: boolean;
};
thread?: {
id?: string;
@@ -189,6 +191,13 @@ export type ChannelTurnRecordOptions = {
trackSessionMetaTask?: (task: Promise<unknown>) => void;
};
export type ChannelTurnHistoryFinalizeOptions = {
isGroup?: boolean;
historyKey?: string;
historyMap?: Map<string, HistoryEntry[]>;
limit?: number;
};
export type ChannelTurnDispatcherOptions = Omit<
ReplyDispatcherWithTypingOptions,
"deliver" | "onError"
@@ -209,6 +218,7 @@ export type AssembledChannelTurn = {
replyOptions?: Omit<GetReplyOptions, "onBlockReply">;
replyResolver?: GetReplyFromConfig;
record?: ChannelTurnRecordOptions;
history?: ChannelTurnHistoryFinalizeOptions;
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
log?: (event: ChannelTurnLogEvent) => void;
messageId?: string;
@@ -222,6 +232,7 @@ export type PreparedChannelTurn<TDispatchResult = DispatchFromConfigResult> = {
ctxPayload: FinalizedMsgContext;
recordInboundSession: RecordInboundSession;
record?: ChannelTurnRecordOptions;
history?: ChannelTurnHistoryFinalizeOptions;
onPreDispatchFailure?: (err: unknown) => void | Promise<void>;
runDispatch: () => Promise<TDispatchResult>;
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;