From 2eee70e0a64b4dcef7de908abcd52d6cd5e51d87 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 16:06:22 +0100 Subject: [PATCH] refactor: run prepared Discord and Slack turns Route Discord and Slack prepared message turns through the core prepared-turn runner directly. Local proof before landing: - node scripts/run-vitest.mjs src/channels/turn/kernel.test.ts extensions/discord/src/monitor/message-handler.process.test.ts extensions/slack/src/monitor/message-handler/prepare.test.ts extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts - node scripts/run-tsgo.mjs -p tsconfig.core.json --incremental false - node scripts/run-tsgo.mjs -p tsconfig.extensions.json --incremental false - OPENCLAW_TESTBOX_REMOTE_RUN=1 OPENCLAW_VITEST_MAX_WORKERS=1 pnpm check:changed - codex-review clean after accepted Slack bot-loop history cleanup finding was fixed in core GitHub checks had no failures; Blacksmith/GitHub runner jobs were still queued when maintainer approved landing based on local proof. --- .../src/monitor/message-handler.process.ts | 337 +++++++++--------- .../src/monitor/message-handler/dispatch.ts | 312 ++++++++-------- .../src/monitor/message-handler/prepare.ts | 9 + .../src/monitor/message-handler/types.ts | 7 + src/channels/turn/kernel.test.ts | 10 + src/channels/turn/kernel.ts | 1 + 6 files changed, 329 insertions(+), 347 deletions(-) diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index 31ca9947789..f76344784c3 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -26,7 +26,7 @@ import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime"; import { hasFinalInboundReplyDispatch, recordChannelBotPairLoopAndCheckSuppression, - runInboundReplyTurn, + runPreparedInboundReplyTurn, } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime"; import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime"; @@ -631,184 +631,169 @@ export async function processDiscordMessage( await settleDispatchBeforeStart(); return; } - const preparedResult = await runInboundReplyTurn({ + const preparedResult = await runPreparedInboundReplyTurn({ channel: "discord", accountId: route.accountId, - raw: ctx, - adapter: { - ingest: () => ({ - id: message.id, - timestamp: message.timestamp ? Date.parse(message.timestamp) : undefined, - rawText: text, - textForAgent: ctxPayload.BodyForAgent, - textForCommands: ctxPayload.CommandBody, - raw: message, - }), - resolveTurn: () => ({ - channel: "discord", - accountId: route.accountId, - routeSessionKey: persistedSessionKey, - storePath: turn.storePath, - ctxPayload, - recordInboundSession, - record: turn.record, - history: { - isGroup: isGuildMessage, - historyKey: messageChannelId, - historyMap: guildHistories, - limit: historyLimit, - }, - onPreDispatchFailure: settleDispatchBeforeStart, - runDispatch: async () => { - return await dispatchInboundMessage({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - abortSignal, - skillFilter: channelConfig?.skills, - sourceReplyDeliveryMode, - disableBlockStreaming: sourceRepliesAreToolOnly - ? true - : (draftPreview.disableBlockStreamingForDraft ?? - (typeof resolvedBlockStreamingEnabled === "boolean" - ? !resolvedBlockStreamingEnabled - : undefined)), - onPartialReply: draftPreview.draftStream - ? (payload) => draftPreview.updateFromPartial(payload.text) - : undefined, - onAssistantMessageStart: draftPreview.draftStream - ? () => draftPreview.handleAssistantMessageBoundary() - : undefined, - onReasoningEnd: draftPreview.draftStream - ? () => draftPreview.handleAssistantMessageBoundary() - : undefined, - onModelSelected, - suppressDefaultToolProgressMessages: - draftPreview.suppressDefaultToolProgressMessages ? true : undefined, - onReasoningStream: async (payload) => { - await statusReactions.setThinking(); - const formattedText = payload?.text - ? formatReasoningMessage(payload.text) - : undefined; - await draftPreview.pushReasoningProgress(formattedText); - }, - onToolStart: async (payload) => { - if (isProcessAborted(abortSignal)) { - return; - } - await maybeBindStatusReactionsToToolReaction(payload); - await statusReactions.setTool(payload.name); - await draftPreview.pushToolProgress( - buildChannelProgressDraftLineForEntry( - discordConfig, - { - event: "tool", - name: payload.name, - phase: payload.phase, - args: payload.args, - }, - payload.detailMode ? { detailMode: payload.detailMode } : undefined, - ), - { toolName: payload.name }, - ); - }, - onItemEvent: async (payload) => { - await draftPreview.pushToolProgress( - buildChannelProgressDraftLineForEntry(discordConfig, { - event: "item", - itemId: payload.itemId, - itemKind: payload.kind, - title: payload.title, - name: payload.name, - phase: payload.phase, - status: payload.status, - summary: payload.summary, - progressText: payload.progressText, - meta: payload.meta, - }), - ); - }, - onPlanUpdate: async (payload) => { - if (payload.phase !== "update") { - return; - } - await draftPreview.pushToolProgress( - buildChannelProgressDraftLine({ - event: "plan", - phase: payload.phase, - title: payload.title, - explanation: payload.explanation, - steps: payload.steps, - }), - ); - }, - onApprovalEvent: async (payload) => { - if (payload.phase !== "requested") { - return; - } - await draftPreview.pushToolProgress( - buildChannelProgressDraftLine({ - event: "approval", - phase: payload.phase, - title: payload.title, - command: payload.command, - reason: payload.reason, - message: payload.message, - }), - ); - }, - onCommandOutput: async (payload) => { - if (payload.phase !== "end") { - return; - } - await draftPreview.pushToolProgress( - buildChannelProgressDraftLine({ - event: "command-output", - phase: payload.phase, - title: payload.title, - name: payload.name, - status: payload.status, - exitCode: payload.exitCode, - }), - ); - }, - onPatchSummary: async (payload) => { - if (payload.phase !== "end") { - return; - } - await draftPreview.pushToolProgress( - buildChannelProgressDraftLine({ - event: "patch", - phase: payload.phase, - title: payload.title, - name: payload.name, - added: payload.added, - modified: payload.modified, - deleted: payload.deleted, - summary: payload.summary, - }), - ); - }, - onCompactionStart: async () => { - if (isProcessAborted(abortSignal)) { - return; - } - await statusReactions.setCompacting(); - }, - onCompactionEnd: async () => { - if (isProcessAborted(abortSignal)) { - return; - } - statusReactions.cancelPending(); - await statusReactions.setThinking(); - }, - }, - }); - }, - }), + routeSessionKey: persistedSessionKey, + storePath: turn.storePath, + ctxPayload, + recordInboundSession, + record: turn.record, + history: { + isGroup: isGuildMessage, + historyKey: messageChannelId, + historyMap: guildHistories, + limit: historyLimit, }, + onPreDispatchFailure: settleDispatchBeforeStart, + runDispatch: async () => + await dispatchInboundMessage({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + abortSignal, + skillFilter: channelConfig?.skills, + sourceReplyDeliveryMode, + disableBlockStreaming: sourceRepliesAreToolOnly + ? true + : (draftPreview.disableBlockStreamingForDraft ?? + (typeof resolvedBlockStreamingEnabled === "boolean" + ? !resolvedBlockStreamingEnabled + : undefined)), + onPartialReply: draftPreview.draftStream + ? (payload) => draftPreview.updateFromPartial(payload.text) + : undefined, + onAssistantMessageStart: draftPreview.draftStream + ? () => draftPreview.handleAssistantMessageBoundary() + : undefined, + onReasoningEnd: draftPreview.draftStream + ? () => draftPreview.handleAssistantMessageBoundary() + : undefined, + onModelSelected, + suppressDefaultToolProgressMessages: draftPreview.suppressDefaultToolProgressMessages + ? true + : undefined, + onReasoningStream: async (payload) => { + await statusReactions.setThinking(); + const formattedText = payload?.text + ? formatReasoningMessage(payload.text) + : undefined; + await draftPreview.pushReasoningProgress(formattedText); + }, + onToolStart: async (payload) => { + if (isProcessAborted(abortSignal)) { + return; + } + await maybeBindStatusReactionsToToolReaction(payload); + await statusReactions.setTool(payload.name); + await draftPreview.pushToolProgress( + buildChannelProgressDraftLineForEntry( + discordConfig, + { + event: "tool", + name: payload.name, + phase: payload.phase, + args: payload.args, + }, + payload.detailMode ? { detailMode: payload.detailMode } : undefined, + ), + { toolName: payload.name }, + ); + }, + onItemEvent: async (payload) => { + await draftPreview.pushToolProgress( + buildChannelProgressDraftLineForEntry(discordConfig, { + event: "item", + itemId: payload.itemId, + itemKind: payload.kind, + title: payload.title, + name: payload.name, + phase: payload.phase, + status: payload.status, + summary: payload.summary, + progressText: payload.progressText, + meta: payload.meta, + }), + ); + }, + onPlanUpdate: async (payload) => { + if (payload.phase !== "update") { + return; + } + await draftPreview.pushToolProgress( + buildChannelProgressDraftLine({ + event: "plan", + phase: payload.phase, + title: payload.title, + explanation: payload.explanation, + steps: payload.steps, + }), + ); + }, + onApprovalEvent: async (payload) => { + if (payload.phase !== "requested") { + return; + } + await draftPreview.pushToolProgress( + buildChannelProgressDraftLine({ + event: "approval", + phase: payload.phase, + title: payload.title, + command: payload.command, + reason: payload.reason, + message: payload.message, + }), + ); + }, + onCommandOutput: async (payload) => { + if (payload.phase !== "end") { + return; + } + await draftPreview.pushToolProgress( + buildChannelProgressDraftLine({ + event: "command-output", + phase: payload.phase, + title: payload.title, + name: payload.name, + status: payload.status, + exitCode: payload.exitCode, + }), + ); + }, + onPatchSummary: async (payload) => { + if (payload.phase !== "end") { + return; + } + await draftPreview.pushToolProgress( + buildChannelProgressDraftLine({ + event: "patch", + phase: payload.phase, + title: payload.title, + name: payload.name, + added: payload.added, + modified: payload.modified, + deleted: payload.deleted, + summary: payload.summary, + }), + ); + }, + onCompactionStart: async () => { + if (isProcessAborted(abortSignal)) { + return; + } + await statusReactions.setCompacting(); + }, + onCompactionEnd: async () => { + if (isProcessAborted(abortSignal)) { + return; + } + statusReactions.cancelPending(); + await statusReactions.setThinking(); + }, + }, + }), }); if (!preparedResult.dispatched) { return; diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 21cbbd930e4..c94f637fa51 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -34,11 +34,10 @@ import { type ChannelBotLoopProtectionFacts, type ChannelTurnRecordOptions, hasVisibleInboundReplyDispatch, - runInboundReplyTurn, + runPreparedInboundReplyTurn, } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime"; import { mergePairLoopGuardConfig } from "openclaw/plugin-sdk/pair-loop-guard-runtime"; -import { createChannelHistoryWindow } from "openclaw/plugin-sdk/reply-history"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import type { ReplyDispatchKind, ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import { resolveInboundLastRouteSessionKey } from "openclaw/plugin-sdk/routing"; @@ -1157,163 +1156,149 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag let counts: { final?: number; block?: number } = {}; let dispatchSettledBeforeStart = false; try { - const turnResult = await runInboundReplyTurn({ + const turnResult = await runPreparedInboundReplyTurn({ channel: "slack", accountId: route.accountId, - raw: prepared.message, - adapter: { - ingest: () => ({ - id: prepared.message.ts ?? `${prepared.ctxPayload.From}:${Date.now()}`, - timestamp: prepared.message.ts ? Number(prepared.message.ts) * 1000 : undefined, - rawText: prepared.ctxPayload.RawBody ?? "", - textForAgent: prepared.ctxPayload.BodyForAgent, - textForCommands: prepared.ctxPayload.CommandBody, - raw: prepared.message, - }), - resolveTurn: () => ({ - channel: "slack", - accountId: route.accountId, - routeSessionKey: route.sessionKey, - storePath: prepared.turn.storePath, - ctxPayload: prepared.ctxPayload, - recordInboundSession, - record: prepared.turn.record as ChannelTurnRecordOptions, - botLoopProtection: resolveSlackBotLoopProtection(prepared), - onPreDispatchFailure: async () => { - dispatchSettledBeforeStart = true; - await settleReplyDispatcher({ - dispatcher, - onSettled: () => markDispatchIdle(), - }); - }, - runDispatch: () => - dispatchInboundMessage({ - ctx: prepared.ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - skillFilter: prepared.channelConfig?.skills, - sourceReplyDeliveryMode, - hasRepliedRef, - disableBlockStreaming, - onModelSelected, - suppressDefaultToolProgressMessages: suppressDefaultToolProgressMessages - ? true - : undefined, - onPartialReply: useStreaming - ? undefined - : !previewStreamingEnabled - ? undefined - : async (payload) => { - updateDraftFromPartial(payload.text); - }, - onAssistantMessageStart: onDraftBoundary, - onReasoningEnd: onDraftBoundary, - onReasoningStream: statusReactionsEnabled - ? async () => { - await statusReactions.setThinking(); - } - : undefined, - onToolStart: async (payload) => { - if (statusReactionsEnabled) { - await statusReactions.setTool(payload.name); - } - await pushPreviewToolProgress( - buildChannelProgressDraftLineForEntry( - account.config, - { - event: "tool", - name: payload.name, - phase: payload.phase, - args: payload.args, - }, - payload.detailMode ? { detailMode: payload.detailMode } : undefined, - ), - { toolName: payload.name }, - ); - }, - onItemEvent: async (payload) => { - await pushPreviewToolProgress( - buildChannelProgressDraftLineForEntry(account.config, { - event: "item", - itemId: payload.itemId, - itemKind: payload.kind, - title: payload.title, - name: payload.name, - phase: payload.phase, - status: payload.status, - summary: payload.summary, - progressText: payload.progressText, - meta: payload.meta, - }), - ); - }, - onPlanUpdate: async (payload) => { - if (payload.phase !== "update") { - return; - } - await pushPreviewToolProgress( - buildChannelProgressDraftLine({ - event: "plan", - phase: payload.phase, - title: payload.title, - explanation: payload.explanation, - steps: payload.steps, - }), - ); - }, - onApprovalEvent: async (payload) => { - if (payload.phase !== "requested") { - return; - } - await pushPreviewToolProgress( - buildChannelProgressDraftLine({ - event: "approval", - phase: payload.phase, - title: payload.title, - command: payload.command, - reason: payload.reason, - message: payload.message, - }), - ); - }, - onCommandOutput: async (payload) => { - if (payload.phase !== "end") { - return; - } - await pushPreviewToolProgress( - buildChannelProgressDraftLine({ - event: "command-output", - phase: payload.phase, - title: payload.title, - name: payload.name, - status: payload.status, - exitCode: payload.exitCode, - }), - ); - }, - onPatchSummary: async (payload) => { - if (payload.phase !== "end") { - return; - } - await pushPreviewToolProgress( - buildChannelProgressDraftLine({ - event: "patch", - phase: payload.phase, - title: payload.title, - name: payload.name, - added: payload.added, - modified: payload.modified, - deleted: payload.deleted, - summary: payload.summary, - }), - ); - }, - }, - }), - }), + routeSessionKey: route.sessionKey, + storePath: prepared.turn.storePath, + ctxPayload: prepared.ctxPayload, + recordInboundSession, + record: prepared.turn.record as ChannelTurnRecordOptions, + history: prepared.turn.history, + botLoopProtection: resolveSlackBotLoopProtection(prepared), + onPreDispatchFailure: async () => { + dispatchSettledBeforeStart = true; + await settleReplyDispatcher({ + dispatcher, + onSettled: () => markDispatchIdle(), + }); }, + runDispatch: () => + dispatchInboundMessage({ + ctx: prepared.ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: prepared.channelConfig?.skills, + sourceReplyDeliveryMode, + hasRepliedRef, + disableBlockStreaming, + onModelSelected, + suppressDefaultToolProgressMessages: suppressDefaultToolProgressMessages + ? true + : undefined, + onPartialReply: useStreaming + ? undefined + : !previewStreamingEnabled + ? undefined + : async (payload) => { + updateDraftFromPartial(payload.text); + }, + onAssistantMessageStart: onDraftBoundary, + onReasoningEnd: onDraftBoundary, + onReasoningStream: statusReactionsEnabled + ? async () => { + await statusReactions.setThinking(); + } + : undefined, + onToolStart: async (payload) => { + if (statusReactionsEnabled) { + await statusReactions.setTool(payload.name); + } + await pushPreviewToolProgress( + buildChannelProgressDraftLineForEntry( + account.config, + { + event: "tool", + name: payload.name, + phase: payload.phase, + args: payload.args, + }, + payload.detailMode ? { detailMode: payload.detailMode } : undefined, + ), + { toolName: payload.name }, + ); + }, + onItemEvent: async (payload) => { + await pushPreviewToolProgress( + buildChannelProgressDraftLineForEntry(account.config, { + event: "item", + itemId: payload.itemId, + itemKind: payload.kind, + title: payload.title, + name: payload.name, + phase: payload.phase, + status: payload.status, + summary: payload.summary, + progressText: payload.progressText, + meta: payload.meta, + }), + ); + }, + onPlanUpdate: async (payload) => { + if (payload.phase !== "update") { + return; + } + await pushPreviewToolProgress( + buildChannelProgressDraftLine({ + event: "plan", + phase: payload.phase, + title: payload.title, + explanation: payload.explanation, + steps: payload.steps, + }), + ); + }, + onApprovalEvent: async (payload) => { + if (payload.phase !== "requested") { + return; + } + await pushPreviewToolProgress( + buildChannelProgressDraftLine({ + event: "approval", + phase: payload.phase, + title: payload.title, + command: payload.command, + reason: payload.reason, + message: payload.message, + }), + ); + }, + onCommandOutput: async (payload) => { + if (payload.phase !== "end") { + return; + } + await pushPreviewToolProgress( + buildChannelProgressDraftLine({ + event: "command-output", + phase: payload.phase, + title: payload.title, + name: payload.name, + status: payload.status, + exitCode: payload.exitCode, + }), + ); + }, + onPatchSummary: async (payload) => { + if (payload.phase !== "end") { + return; + } + await pushPreviewToolProgress( + buildChannelProgressDraftLine({ + event: "patch", + phase: payload.phase, + title: payload.title, + name: payload.name, + added: payload.added, + modified: payload.modified, + deleted: payload.deleted, + summary: payload.summary, + }), + ); + }, + }, + }), }); if (turnResult.dispatched) { const result = turnResult.dispatchResult; @@ -1396,16 +1381,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag agentId: route.agentId, }); } - const channelHistory = createChannelHistoryWindow({ historyMap: ctx.channelHistories }); - if (!anyReplyDelivered) { await draftStream?.clear(); - if (prepared.isRoomish && prepared.requireMention) { - channelHistory.clear({ - historyKey: prepared.historyKey, - limit: ctx.historyLimit, - }); - } return; } @@ -1441,11 +1418,4 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag }, }); } - - if (prepared.isRoomish && prepared.requireMention) { - channelHistory.clear({ - historyKey: prepared.historyKey, - limit: ctx.historyLimit, - }); - } } diff --git a/extensions/slack/src/monitor/message-handler/prepare.ts b/extensions/slack/src/monitor/message-handler/prepare.ts index 17680f58fcb..7f6dcbf0d0f 100644 --- a/extensions/slack/src/monitor/message-handler/prepare.ts +++ b/extensions/slack/src/monitor/message-handler/prepare.ts @@ -1242,6 +1242,15 @@ export async function prepareSlackMessage(params: { ); }, }, + history: + isRoomish && shouldRequireMention + ? { + isGroup: true, + historyKey, + historyMap: ctx.channelHistories, + limit: ctx.historyLimit, + } + : undefined, }, replyToMode, requireMention: shouldRequireMention, diff --git a/extensions/slack/src/monitor/message-handler/types.ts b/extensions/slack/src/monitor/message-handler/types.ts index a948271452e..6bfc401952e 100644 --- a/extensions/slack/src/monitor/message-handler/types.ts +++ b/extensions/slack/src/monitor/message-handler/types.ts @@ -1,3 +1,4 @@ +import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history"; import type { FinalizedMsgContext } from "openclaw/plugin-sdk/reply-runtime"; import type { ResolvedAgentRoute } from "openclaw/plugin-sdk/routing"; import type { ResolvedSlackAccount } from "../../accounts.js"; @@ -16,6 +17,12 @@ export type PreparedSlackMessage = { turn: { storePath: string; record: unknown; + history?: { + isGroup?: boolean; + historyKey?: string; + historyMap?: Map; + limit?: number; + }; }; replyToMode: "off" | "first" | "all" | "batched"; requireMention: boolean; diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index 43094d789c6..55ee139e1f6 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -597,6 +597,9 @@ describe("channel turn kernel", () => { it("drops direct prepared turns with bot-loop protection before record and dispatch", async () => { const events: string[] = []; const log = vi.fn(); + const historyMap = new Map([ + ["room", [{ sender: "User", body: "queued before suppression" }]], + ]); const recordInboundSession = createRecordInboundSession(events); const runDispatch = vi.fn(async () => { events.push("dispatch"); @@ -633,6 +636,12 @@ describe("channel turn kernel", () => { log, messageId: "msg-loop", botLoopProtection: { ...botLoopProtection, nowMs: 1_001 }, + history: { + isGroup: true, + historyKey: "room", + historyMap, + limit: 50, + }, }); expect(first.dispatched).toBe(true); @@ -644,6 +653,7 @@ describe("channel turn kernel", () => { expect(events).toEqual(["record", "dispatch"]); expect(recordInboundSession).toHaveBeenCalledTimes(1); expect(runDispatch).toHaveBeenCalledTimes(1); + expect(historyMap.get("room")).toStrictEqual([]); expect(loggedEvents(log)).toEqual([ { stage: "authorize", event: "drop", messageId: "msg-loop" }, ]); diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index 4626e9e6fb0..c67f0d187c8 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -420,6 +420,7 @@ async function runPreparedChannelTurnCore< const admission = params.admission ?? ({ kind: "dispatch" } as const); const botLoopDrop = resolveBotLoopProtectionDrop(params); if (botLoopDrop) { + clearPendingHistoryAfterTurn(params.history); return botLoopDrop; } emit({