diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 0312f7a979a..24eb0f55e6d 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -ae28566c922ce79527943b069abc199de28e3898ec08eea12c4ff6050795f276 plugin-sdk-api-baseline.json -79446b23832949553b23e7cf92be37b81c69d123fc09bed6f8fc04bd98e9257d plugin-sdk-api-baseline.jsonl +d26a70c9ea3bd277135a1712556f07195fb464b5cd846d04f18c2166c319a73d plugin-sdk-api-baseline.json +9fe2cb122fb3de17eaaf54c7768f268aa689063cf9091bd4b0be9422550a70a8 plugin-sdk-api-baseline.jsonl diff --git a/extensions/imessage/src/monitor/monitor-provider.ts b/extensions/imessage/src/monitor/monitor-provider.ts index b7e0cebd522..50a56c7c100 100644 --- a/extensions/imessage/src/monitor/monitor-provider.ts +++ b/extensions/imessage/src/monitor/monitor-provider.ts @@ -12,16 +12,9 @@ import { } from "openclaw/plugin-sdk/conversation-runtime"; import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime"; import { normalizeScpRemoteHost } from "openclaw/plugin-sdk/host-runtime"; -import { - hasFinalInboundReplyDispatch, - runPreparedInboundReplyTurn, -} from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { isInboundPathAllowed, kindFromMime } from "openclaw/plugin-sdk/media-runtime"; -import { - clearHistoryEntriesIfEnabled, - DEFAULT_GROUP_HISTORY_LIMIT, - type HistoryEntry, -} from "openclaw/plugin-sdk/reply-history"; +import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "openclaw/plugin-sdk/reply-history"; import { resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-runtime"; import { dispatchInboundMessage } from "openclaw/plugin-sdk/reply-runtime"; import { createReplyDispatcher } from "openclaw/plugin-sdk/reply-runtime"; @@ -442,7 +435,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }, }); - const { dispatchResult } = await runPreparedInboundReplyTurn({ + await runPreparedInboundReplyTurn({ channel: "imessage", accountId: decision.route.accountId, routeSessionKey: decision.route.sessionKey, @@ -475,6 +468,12 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P logVerbose(`imessage: failed updating session meta: ${String(err)}`); }, }, + history: { + isGroup: decision.isGroup, + historyKey: decision.historyKey, + historyMap: groupHistories, + limit: historyLimit, + }, onPreDispatchFailure: () => settleReplyDispatcher({ dispatcher }), runDispatch: () => dispatchInboundMessage({ @@ -490,23 +489,6 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }, }), }); - if (!hasFinalInboundReplyDispatch(dispatchResult)) { - if (decision.isGroup && decision.historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: groupHistories, - historyKey: decision.historyKey, - limit: historyLimit, - }); - } - return; - } - if (decision.isGroup && decision.historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: groupHistories, - historyKey: decision.historyKey, - limit: historyLimit, - }); - } } const handleMessage = async (raw: unknown) => { diff --git a/extensions/line/src/monitor.ts b/extensions/line/src/monitor.ts index 00380f12f5d..4894f21739e 100644 --- a/extensions/line/src/monitor.ts +++ b/extensions/line/src/monitor.ts @@ -1,15 +1,8 @@ import type { webhook } from "@line/bot-sdk"; import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; -import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime"; -import { - hasFinalInboundReplyDispatch, - runPreparedInboundReplyTurn, -} from "openclaw/plugin-sdk/inbound-reply-dispatch"; -import { - dispatchReplyWithBufferedBlockDispatcher, - chunkMarkdownText, -} from "openclaw/plugin-sdk/reply-runtime"; +import { hasFinalInboundReplyDispatch } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { chunkMarkdownText } from "openclaw/plugin-sdk/reply-runtime"; import { danger, logVerbose, @@ -32,6 +25,7 @@ import { deliverLineAutoReply } from "./auto-reply-delivery.js"; import { createLineBot } from "./bot.js"; import { processLineMessage } from "./markdown-to-line.js"; import { sendLineReplyChunks } from "./reply-chunks.js"; +import { getLineRuntime } from "./runtime.js"; import { createFlexMessage, createImageMessage, @@ -236,21 +230,36 @@ export async function monitorLineProvider( accountId: route.accountId, }); - const { dispatchResult } = await runPreparedInboundReplyTurn({ + const core = getLineRuntime(); + const { dispatchResult } = await core.channel.turn.run({ channel: "line", accountId: route.accountId, - routeSessionKey: route.sessionKey, - storePath: ctx.turn.storePath, - ctxPayload, - recordInboundSession, - record: ctx.turn.record, - runDispatch: () => - dispatchReplyWithBufferedBlockDispatcher({ - ctx: ctxPayload, + raw: ctx, + adapter: { + ingest: () => ({ + id: ctxPayload.MessageSid ?? `${ctxPayload.From}:${Date.now()}`, + rawText: ctxPayload.RawBody ?? ctxPayload.BodyForAgent ?? "", + }), + resolveTurn: () => ({ cfg: config, + channel: "line", + accountId: route.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath: ctx.turn.storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + record: ctx.turn.record, dispatcherOptions: { ...replyPipeline, - deliver: async (payload, _info) => { + }, + replyOptions: { + onModelSelected, + }, + delivery: { + deliver: async (payload) => { const lineData = (payload.channelData?.line as LineChannelData | undefined) ?? {}; if (ctx.userId && !ctx.isGroup) { @@ -304,10 +313,8 @@ export async function monitorLineProvider( runtime.error?.(danger(`line ${info.kind} reply failed: ${String(err)}`)); }, }, - replyOptions: { - onModelSelected, - }, }), + }, }); if (!hasFinalInboundReplyDispatch(dispatchResult)) { logVerbose(`line: no response generated for message from ${ctxPayload.From}`); diff --git a/extensions/signal/src/monitor/event-handler.ts b/extensions/signal/src/monitor/event-handler.ts index e05da655040..a81b550b6cc 100644 --- a/extensions/signal/src/monitor/event-handler.ts +++ b/extensions/signal/src/monitor/event-handler.ts @@ -25,14 +25,10 @@ import { toInternalMessageReceivedContext, triggerInternalHook, } from "openclaw/plugin-sdk/hook-runtime"; -import { - hasFinalInboundReplyDispatch, - runPreparedInboundReplyTurn, -} from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { kindFromMime } from "openclaw/plugin-sdk/media-runtime"; import { buildPendingHistoryContextFromMap, - clearHistoryEntriesIfEnabled, recordPendingHistoryEntryIfEnabled, } from "openclaw/plugin-sdk/reply-history"; import { dispatchInboundMessage } from "openclaw/plugin-sdk/reply-runtime"; @@ -292,7 +288,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { }, }); - const { dispatchResult } = await runPreparedInboundReplyTurn({ + await runPreparedInboundReplyTurn({ channel: "signal", accountId: route.accountId, routeSessionKey: route.sessionKey, @@ -331,6 +327,12 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { logVerbose(`signal: failed updating session meta: ${String(err)}`); }, }, + history: { + isGroup: entry.isGroup, + historyKey, + historyMap: deps.groupHistories, + limit: deps.historyLimit, + }, onPreDispatchFailure: () => settleReplyDispatcher({ dispatcher, @@ -354,23 +356,6 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { } }, }); - if (!hasFinalInboundReplyDispatch(dispatchResult)) { - if (entry.isGroup && historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: deps.groupHistories, - historyKey, - limit: deps.historyLimit, - }); - } - return; - } - if (entry.isGroup && historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: deps.groupHistories, - historyKey, - limit: deps.historyLimit, - }); - } } const { debouncer: inboundDebouncer } = createChannelInboundDebouncer({ diff --git a/src/channels/turn/context.test.ts b/src/channels/turn/context.test.ts index cec36b15223..24353024f33 100644 --- a/src/channels/turn/context.test.ts +++ b/src/channels/turn/context.test.ts @@ -1,5 +1,40 @@ import { describe, expect, it } from "vitest"; -import { buildChannelTurnContext } from "./context.js"; +import { buildChannelTurnContext, type BuildChannelTurnContextParams } from "./context.js"; + +function createBaseContextParams( + overrides: Partial = {}, +): BuildChannelTurnContextParams { + return { + channel: "test", + accountId: "acct", + messageId: "msg-1", + from: "test:user:u1", + sender: { + id: "u1", + }, + conversation: { + kind: "group", + id: "room-1", + routePeer: { + kind: "group", + id: "room-1", + }, + }, + route: { + agentId: "main", + routeSessionKey: "agent:main:test:group:room-1", + }, + reply: { + to: "test:room:room-1", + originatingTo: "test:room:room-1", + }, + message: { + rawBody: "hello", + envelopeFrom: "User One", + }, + ...overrides, + }; +} describe("buildChannelTurnContext", () => { it("maps normalized turn facts into a finalized message context", () => { @@ -139,4 +174,92 @@ describe("buildChannelTurnContext", () => { }), ); }); + + it("filters supplemental context with channel visibility policy", () => { + const ctx = buildChannelTurnContext( + createBaseContextParams({ + supplemental: { + quote: { + id: "quote-1", + body: "quoted", + sender: "Quoted User", + senderAllowed: false, + isQuote: true, + }, + forwarded: { + from: "Forwarded User", + fromId: "f1", + senderAllowed: false, + }, + thread: { + starterBody: "thread starter", + historyBody: "thread history", + senderAllowed: false, + }, + }, + contextVisibility: "allowlist", + }), + ); + + expect(ctx.ReplyToBody).toBeUndefined(); + expect(ctx.ReplyToSender).toBeUndefined(); + expect(ctx.ForwardedFrom).toBeUndefined(); + expect(ctx.ThreadStarterBody).toBeUndefined(); + expect(ctx.ThreadHistoryBody).toBeUndefined(); + }); + + it("keeps quoted context in allowlist_quote mode", () => { + const ctx = buildChannelTurnContext( + createBaseContextParams({ + supplemental: { + quote: { + id: "quote-1", + body: "quoted", + sender: "Quoted User", + senderAllowed: false, + isQuote: true, + }, + thread: { + starterBody: "thread starter", + senderAllowed: false, + }, + }, + contextVisibility: "allowlist_quote", + }), + ); + + expect(ctx.ReplyToBody).toBe("quoted"); + expect(ctx.ReplyToSender).toBe("Quoted User"); + expect(ctx.ThreadStarterBody).toBeUndefined(); + }); + + it("drops supplemental context with unknown sender allow state in restrictive modes", () => { + const ctx = buildChannelTurnContext( + createBaseContextParams({ + supplemental: { + quote: { + id: "quote-1", + body: "quoted", + sender: "Quoted User", + isQuote: true, + }, + forwarded: { + from: "Forwarded User", + fromId: "f1", + }, + thread: { + starterBody: "thread starter", + historyBody: "thread history", + }, + }, + contextVisibility: "allowlist_quote", + }), + ); + + expect(ctx.ReplyToBody).toBeUndefined(); + expect(ctx.ReplyToSender).toBeUndefined(); + expect(ctx.ForwardedFrom).toBeUndefined(); + expect(ctx.ThreadStarterBody).toBeUndefined(); + expect(ctx.ThreadHistoryBody).toBeUndefined(); + }); }); diff --git a/src/channels/turn/context.ts b/src/channels/turn/context.ts index 306a32be257..8a29db43eb9 100644 --- a/src/channels/turn/context.ts +++ b/src/channels/turn/context.ts @@ -1,5 +1,7 @@ import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import type { FinalizedMsgContext } from "../../auto-reply/templating.js"; +import type { ContextVisibilityMode } from "../../config/types.base.js"; +import { shouldIncludeSupplementalContext } from "../../security/context-visibility.js"; import type { AccessFacts, ConversationFacts, @@ -28,6 +30,7 @@ export type BuildChannelTurnContextParams = { access?: AccessFacts; media?: InboundMediaFacts[]; supplemental?: SupplementalContextFacts; + contextVisibility?: ContextVisibilityMode; extra?: Record; }; @@ -51,11 +54,70 @@ function commandAuthorized(access: AccessFacts | undefined): boolean | undefined return commands.authorizers.some((entry) => entry.allowed); } +function keepSupplementalContext(params: { + mode?: ContextVisibilityMode; + kind: "quote" | "forwarded" | "thread"; + senderAllowed?: boolean; +}): boolean { + if (!params.mode || params.mode === "all") { + return true; + } + if (params.senderAllowed === undefined) { + return false; + } + return shouldIncludeSupplementalContext({ + mode: params.mode, + kind: params.kind, + senderAllowed: params.senderAllowed, + }); +} + +export function filterChannelTurnSupplementalContext(params: { + supplemental?: SupplementalContextFacts; + contextVisibility?: ContextVisibilityMode; +}): SupplementalContextFacts | undefined { + const supplemental = params.supplemental; + if (!supplemental) { + return undefined; + } + const quote = keepSupplementalContext({ + mode: params.contextVisibility, + kind: "quote", + senderAllowed: supplemental.quote?.senderAllowed, + }) + ? supplemental.quote + : undefined; + const forwarded = keepSupplementalContext({ + mode: params.contextVisibility, + kind: "forwarded", + senderAllowed: supplemental.forwarded?.senderAllowed, + }) + ? supplemental.forwarded + : undefined; + const thread = keepSupplementalContext({ + mode: params.contextVisibility, + kind: "thread", + senderAllowed: supplemental.thread?.senderAllowed, + }) + ? supplemental.thread + : undefined; + + return { + ...supplemental, + quote, + forwarded, + thread, + }; +} + export function buildChannelTurnContext( params: BuildChannelTurnContextParams, ): FinalizedMsgContext { const media = params.media ?? []; - const supplemental = params.supplemental; + const supplemental = filterChannelTurnSupplementalContext({ + supplemental: params.supplemental, + contextVisibility: params.contextVisibility, + }); const body = params.message.body ?? params.message.rawBody; return finalizeInboundContext({ diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index 37df367f9be..58e10ceb9ec 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -127,6 +127,30 @@ describe("channel turn kernel", () => { ); }); + it("clears pending group history after a successful prepared turn", async () => { + const historyMap = new Map([["room-1", [{ sender: "User", body: "queued before reply" }]]]); + + await runPreparedChannelTurn({ + channel: "test", + routeSessionKey: "agent:main:test:group:room-1", + storePath: "/tmp/sessions.json", + ctxPayload: createCtx(), + recordInboundSession: createRecordInboundSession(), + runDispatch: vi.fn(async () => ({ + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + })), + history: { + isGroup: true, + historyKey: "room-1", + historyMap, + limit: 50, + }, + }); + + expect(historyMap.get("room-1")).toEqual([]); + }); + it("cleans up pre-created dispatchers when session recording fails", async () => { const events: string[] = []; const recordError = new Error("session store failed"); diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index ebc4f7e88de..2e5afe56fd7 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -1,11 +1,13 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; -export { buildChannelTurnContext } from "./context.js"; +import { clearHistoryEntriesIfEnabled } from "../../auto-reply/reply/history.js"; +export { buildChannelTurnContext, filterChannelTurnSupplementalContext } from "./context.js"; export type { BuildChannelTurnContextParams } from "./context.js"; import type { AssembledChannelTurn, ChannelEventClass, ChannelTurnAdmission, ChannelTurnDeliveryAdapter, + ChannelTurnHistoryFinalizeOptions, ChannelTurnLogEvent, ChannelTurnResult, DispatchedChannelTurnResult, @@ -31,6 +33,7 @@ export type { ChannelTurnAdapter, ChannelTurnAdmission, ChannelTurnDeliveryAdapter, + ChannelTurnHistoryFinalizeOptions, ChannelTurnDispatcherOptions, ChannelTurnLogEvent, ChannelTurnRecordOptions, @@ -97,6 +100,17 @@ export function createNoopChannelTurnDeliveryAdapter(): ChannelTurnDeliveryAdapt }; } +function clearPendingHistoryAfterTurn(params?: ChannelTurnHistoryFinalizeOptions): void { + if (!params?.isGroup || !params.historyKey || !params.historyMap || params.limit === undefined) { + return; + } + clearHistoryEntriesIfEnabled({ + historyMap: params.historyMap, + historyKey: params.historyKey, + limit: params.limit, + }); +} + export async function dispatchAssembledChannelTurn( params: AssembledChannelTurn, ): Promise { @@ -108,6 +122,7 @@ export async function dispatchAssembledChannelTurn( ctxPayload: params.ctxPayload, recordInboundSession: params.recordInboundSession, record: params.record, + history: params.history, admission: params.admission, log: params.log, messageId: params.messageId, @@ -222,6 +237,7 @@ export async function runPreparedChannelTurn< admission: admission.kind, }, }); + clearPendingHistoryAfterTurn(params.history); return { admission, diff --git a/src/channels/turn/types.ts b/src/channels/turn/types.ts index 5714c6c5c47..b6e60b15b0d 100644 --- a/src/channels/turn/types.ts +++ b/src/channels/turn/types.ts @@ -2,6 +2,7 @@ import type { GetReplyOptions } from "../../auto-reply/get-reply-options.types.j import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import type { DispatchFromConfigResult } from "../../auto-reply/reply/dispatch-from-config.types.js"; import type { GetReplyFromConfig } from "../../auto-reply/reply/get-reply.types.js"; +import type { HistoryEntry } from "../../auto-reply/reply/history.js"; import type { DispatchReplyWithBufferedBlockDispatcher } from "../../auto-reply/reply/provider-dispatcher.types.js"; import type { ReplyDispatcherWithTypingOptions } from "../../auto-reply/reply/reply-dispatcher.js"; import type { ReplyDispatchKind } from "../../auto-reply/reply/reply-dispatcher.types.js"; @@ -133,6 +134,7 @@ export type SupplementalContextFacts = { fromType?: string; fromId?: string; date?: number; + senderAllowed?: boolean; }; thread?: { id?: string; @@ -189,6 +191,13 @@ export type ChannelTurnRecordOptions = { trackSessionMetaTask?: (task: Promise) => void; }; +export type ChannelTurnHistoryFinalizeOptions = { + isGroup?: boolean; + historyKey?: string; + historyMap?: Map; + limit?: number; +}; + export type ChannelTurnDispatcherOptions = Omit< ReplyDispatcherWithTypingOptions, "deliver" | "onError" @@ -209,6 +218,7 @@ export type AssembledChannelTurn = { replyOptions?: Omit; replyResolver?: GetReplyFromConfig; record?: ChannelTurnRecordOptions; + history?: ChannelTurnHistoryFinalizeOptions; admission?: Extract; log?: (event: ChannelTurnLogEvent) => void; messageId?: string; @@ -222,6 +232,7 @@ export type PreparedChannelTurn = { ctxPayload: FinalizedMsgContext; recordInboundSession: RecordInboundSession; record?: ChannelTurnRecordOptions; + history?: ChannelTurnHistoryFinalizeOptions; onPreDispatchFailure?: (err: unknown) => void | Promise; runDispatch: () => Promise; admission?: Extract;