refactor: run prepared Discord and Slack turns

Route Discord and Slack prepared message turns through the core prepared-turn runner directly.

Local proof before landing:
- node scripts/run-vitest.mjs src/channels/turn/kernel.test.ts extensions/discord/src/monitor/message-handler.process.test.ts extensions/slack/src/monitor/message-handler/prepare.test.ts extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts
- node scripts/run-tsgo.mjs -p tsconfig.core.json --incremental false
- node scripts/run-tsgo.mjs -p tsconfig.extensions.json --incremental false
- OPENCLAW_TESTBOX_REMOTE_RUN=1 OPENCLAW_VITEST_MAX_WORKERS=1 pnpm check:changed
- codex-review clean after accepted Slack bot-loop history cleanup finding was fixed in core

GitHub checks had no failures; Blacksmith/GitHub runner jobs were still queued when maintainer approved landing based on local proof.
This commit is contained in:
Peter Steinberger
2026-05-15 16:06:22 +01:00
committed by GitHub
parent 369917ff79
commit 2eee70e0a6
6 changed files with 329 additions and 347 deletions

View File

@@ -26,7 +26,7 @@ import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
import {
hasFinalInboundReplyDispatch,
recordChannelBotPairLoopAndCheckSuppression,
runInboundReplyTurn,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime";
import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime";
@@ -631,184 +631,169 @@ export async function processDiscordMessage(
await settleDispatchBeforeStart();
return;
}
const preparedResult = await runInboundReplyTurn({
const preparedResult = await runPreparedInboundReplyTurn({
channel: "discord",
accountId: route.accountId,
raw: ctx,
adapter: {
ingest: () => ({
id: message.id,
timestamp: message.timestamp ? Date.parse(message.timestamp) : undefined,
rawText: text,
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: message,
}),
resolveTurn: () => ({
channel: "discord",
accountId: route.accountId,
routeSessionKey: persistedSessionKey,
storePath: turn.storePath,
ctxPayload,
recordInboundSession,
record: turn.record,
history: {
isGroup: isGuildMessage,
historyKey: messageChannelId,
historyMap: guildHistories,
limit: historyLimit,
},
onPreDispatchFailure: settleDispatchBeforeStart,
runDispatch: async () => {
return await dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
abortSignal,
skillFilter: channelConfig?.skills,
sourceReplyDeliveryMode,
disableBlockStreaming: sourceRepliesAreToolOnly
? true
: (draftPreview.disableBlockStreamingForDraft ??
(typeof resolvedBlockStreamingEnabled === "boolean"
? !resolvedBlockStreamingEnabled
: undefined)),
onPartialReply: draftPreview.draftStream
? (payload) => draftPreview.updateFromPartial(payload.text)
: undefined,
onAssistantMessageStart: draftPreview.draftStream
? () => draftPreview.handleAssistantMessageBoundary()
: undefined,
onReasoningEnd: draftPreview.draftStream
? () => draftPreview.handleAssistantMessageBoundary()
: undefined,
onModelSelected,
suppressDefaultToolProgressMessages:
draftPreview.suppressDefaultToolProgressMessages ? true : undefined,
onReasoningStream: async (payload) => {
await statusReactions.setThinking();
const formattedText = payload?.text
? formatReasoningMessage(payload.text)
: undefined;
await draftPreview.pushReasoningProgress(formattedText);
},
onToolStart: async (payload) => {
if (isProcessAborted(abortSignal)) {
return;
}
await maybeBindStatusReactionsToToolReaction(payload);
await statusReactions.setTool(payload.name);
await draftPreview.pushToolProgress(
buildChannelProgressDraftLineForEntry(
discordConfig,
{
event: "tool",
name: payload.name,
phase: payload.phase,
args: payload.args,
},
payload.detailMode ? { detailMode: payload.detailMode } : undefined,
),
{ toolName: payload.name },
);
},
onItemEvent: async (payload) => {
await draftPreview.pushToolProgress(
buildChannelProgressDraftLineForEntry(discordConfig, {
event: "item",
itemId: payload.itemId,
itemKind: payload.kind,
title: payload.title,
name: payload.name,
phase: payload.phase,
status: payload.status,
summary: payload.summary,
progressText: payload.progressText,
meta: payload.meta,
}),
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
await draftPreview.pushToolProgress(
buildChannelProgressDraftLine({
event: "plan",
phase: payload.phase,
title: payload.title,
explanation: payload.explanation,
steps: payload.steps,
}),
);
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
await draftPreview.pushToolProgress(
buildChannelProgressDraftLine({
event: "approval",
phase: payload.phase,
title: payload.title,
command: payload.command,
reason: payload.reason,
message: payload.message,
}),
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
await draftPreview.pushToolProgress(
buildChannelProgressDraftLine({
event: "command-output",
phase: payload.phase,
title: payload.title,
name: payload.name,
status: payload.status,
exitCode: payload.exitCode,
}),
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
await draftPreview.pushToolProgress(
buildChannelProgressDraftLine({
event: "patch",
phase: payload.phase,
title: payload.title,
name: payload.name,
added: payload.added,
modified: payload.modified,
deleted: payload.deleted,
summary: payload.summary,
}),
);
},
onCompactionStart: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
await statusReactions.setCompacting();
},
onCompactionEnd: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
statusReactions.cancelPending();
await statusReactions.setThinking();
},
},
});
},
}),
routeSessionKey: persistedSessionKey,
storePath: turn.storePath,
ctxPayload,
recordInboundSession,
record: turn.record,
history: {
isGroup: isGuildMessage,
historyKey: messageChannelId,
historyMap: guildHistories,
limit: historyLimit,
},
onPreDispatchFailure: settleDispatchBeforeStart,
runDispatch: async () =>
await dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
abortSignal,
skillFilter: channelConfig?.skills,
sourceReplyDeliveryMode,
disableBlockStreaming: sourceRepliesAreToolOnly
? true
: (draftPreview.disableBlockStreamingForDraft ??
(typeof resolvedBlockStreamingEnabled === "boolean"
? !resolvedBlockStreamingEnabled
: undefined)),
onPartialReply: draftPreview.draftStream
? (payload) => draftPreview.updateFromPartial(payload.text)
: undefined,
onAssistantMessageStart: draftPreview.draftStream
? () => draftPreview.handleAssistantMessageBoundary()
: undefined,
onReasoningEnd: draftPreview.draftStream
? () => draftPreview.handleAssistantMessageBoundary()
: undefined,
onModelSelected,
suppressDefaultToolProgressMessages: draftPreview.suppressDefaultToolProgressMessages
? true
: undefined,
onReasoningStream: async (payload) => {
await statusReactions.setThinking();
const formattedText = payload?.text
? formatReasoningMessage(payload.text)
: undefined;
await draftPreview.pushReasoningProgress(formattedText);
},
onToolStart: async (payload) => {
if (isProcessAborted(abortSignal)) {
return;
}
await maybeBindStatusReactionsToToolReaction(payload);
await statusReactions.setTool(payload.name);
await draftPreview.pushToolProgress(
buildChannelProgressDraftLineForEntry(
discordConfig,
{
event: "tool",
name: payload.name,
phase: payload.phase,
args: payload.args,
},
payload.detailMode ? { detailMode: payload.detailMode } : undefined,
),
{ toolName: payload.name },
);
},
onItemEvent: async (payload) => {
await draftPreview.pushToolProgress(
buildChannelProgressDraftLineForEntry(discordConfig, {
event: "item",
itemId: payload.itemId,
itemKind: payload.kind,
title: payload.title,
name: payload.name,
phase: payload.phase,
status: payload.status,
summary: payload.summary,
progressText: payload.progressText,
meta: payload.meta,
}),
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
await draftPreview.pushToolProgress(
buildChannelProgressDraftLine({
event: "plan",
phase: payload.phase,
title: payload.title,
explanation: payload.explanation,
steps: payload.steps,
}),
);
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
await draftPreview.pushToolProgress(
buildChannelProgressDraftLine({
event: "approval",
phase: payload.phase,
title: payload.title,
command: payload.command,
reason: payload.reason,
message: payload.message,
}),
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
await draftPreview.pushToolProgress(
buildChannelProgressDraftLine({
event: "command-output",
phase: payload.phase,
title: payload.title,
name: payload.name,
status: payload.status,
exitCode: payload.exitCode,
}),
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
await draftPreview.pushToolProgress(
buildChannelProgressDraftLine({
event: "patch",
phase: payload.phase,
title: payload.title,
name: payload.name,
added: payload.added,
modified: payload.modified,
deleted: payload.deleted,
summary: payload.summary,
}),
);
},
onCompactionStart: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
await statusReactions.setCompacting();
},
onCompactionEnd: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
statusReactions.cancelPending();
await statusReactions.setThinking();
},
},
}),
});
if (!preparedResult.dispatched) {
return;

View File

@@ -34,11 +34,10 @@ import {
type ChannelBotLoopProtectionFacts,
type ChannelTurnRecordOptions,
hasVisibleInboundReplyDispatch,
runInboundReplyTurn,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime";
import { mergePairLoopGuardConfig } from "openclaw/plugin-sdk/pair-loop-guard-runtime";
import { createChannelHistoryWindow } from "openclaw/plugin-sdk/reply-history";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import type { ReplyDispatchKind, ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import { resolveInboundLastRouteSessionKey } from "openclaw/plugin-sdk/routing";
@@ -1157,163 +1156,149 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
let counts: { final?: number; block?: number } = {};
let dispatchSettledBeforeStart = false;
try {
const turnResult = await runInboundReplyTurn({
const turnResult = await runPreparedInboundReplyTurn({
channel: "slack",
accountId: route.accountId,
raw: prepared.message,
adapter: {
ingest: () => ({
id: prepared.message.ts ?? `${prepared.ctxPayload.From}:${Date.now()}`,
timestamp: prepared.message.ts ? Number(prepared.message.ts) * 1000 : undefined,
rawText: prepared.ctxPayload.RawBody ?? "",
textForAgent: prepared.ctxPayload.BodyForAgent,
textForCommands: prepared.ctxPayload.CommandBody,
raw: prepared.message,
}),
resolveTurn: () => ({
channel: "slack",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath: prepared.turn.storePath,
ctxPayload: prepared.ctxPayload,
recordInboundSession,
record: prepared.turn.record as ChannelTurnRecordOptions,
botLoopProtection: resolveSlackBotLoopProtection(prepared),
onPreDispatchFailure: async () => {
dispatchSettledBeforeStart = true;
await settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
});
},
runDispatch: () =>
dispatchInboundMessage({
ctx: prepared.ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: prepared.channelConfig?.skills,
sourceReplyDeliveryMode,
hasRepliedRef,
disableBlockStreaming,
onModelSelected,
suppressDefaultToolProgressMessages: suppressDefaultToolProgressMessages
? true
: undefined,
onPartialReply: useStreaming
? undefined
: !previewStreamingEnabled
? undefined
: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: onDraftBoundary,
onReasoningEnd: onDraftBoundary,
onReasoningStream: statusReactionsEnabled
? async () => {
await statusReactions.setThinking();
}
: undefined,
onToolStart: async (payload) => {
if (statusReactionsEnabled) {
await statusReactions.setTool(payload.name);
}
await pushPreviewToolProgress(
buildChannelProgressDraftLineForEntry(
account.config,
{
event: "tool",
name: payload.name,
phase: payload.phase,
args: payload.args,
},
payload.detailMode ? { detailMode: payload.detailMode } : undefined,
),
{ toolName: payload.name },
);
},
onItemEvent: async (payload) => {
await pushPreviewToolProgress(
buildChannelProgressDraftLineForEntry(account.config, {
event: "item",
itemId: payload.itemId,
itemKind: payload.kind,
title: payload.title,
name: payload.name,
phase: payload.phase,
status: payload.status,
summary: payload.summary,
progressText: payload.progressText,
meta: payload.meta,
}),
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
await pushPreviewToolProgress(
buildChannelProgressDraftLine({
event: "plan",
phase: payload.phase,
title: payload.title,
explanation: payload.explanation,
steps: payload.steps,
}),
);
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
await pushPreviewToolProgress(
buildChannelProgressDraftLine({
event: "approval",
phase: payload.phase,
title: payload.title,
command: payload.command,
reason: payload.reason,
message: payload.message,
}),
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
await pushPreviewToolProgress(
buildChannelProgressDraftLine({
event: "command-output",
phase: payload.phase,
title: payload.title,
name: payload.name,
status: payload.status,
exitCode: payload.exitCode,
}),
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
await pushPreviewToolProgress(
buildChannelProgressDraftLine({
event: "patch",
phase: payload.phase,
title: payload.title,
name: payload.name,
added: payload.added,
modified: payload.modified,
deleted: payload.deleted,
summary: payload.summary,
}),
);
},
},
}),
}),
routeSessionKey: route.sessionKey,
storePath: prepared.turn.storePath,
ctxPayload: prepared.ctxPayload,
recordInboundSession,
record: prepared.turn.record as ChannelTurnRecordOptions,
history: prepared.turn.history,
botLoopProtection: resolveSlackBotLoopProtection(prepared),
onPreDispatchFailure: async () => {
dispatchSettledBeforeStart = true;
await settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
});
},
runDispatch: () =>
dispatchInboundMessage({
ctx: prepared.ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: prepared.channelConfig?.skills,
sourceReplyDeliveryMode,
hasRepliedRef,
disableBlockStreaming,
onModelSelected,
suppressDefaultToolProgressMessages: suppressDefaultToolProgressMessages
? true
: undefined,
onPartialReply: useStreaming
? undefined
: !previewStreamingEnabled
? undefined
: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: onDraftBoundary,
onReasoningEnd: onDraftBoundary,
onReasoningStream: statusReactionsEnabled
? async () => {
await statusReactions.setThinking();
}
: undefined,
onToolStart: async (payload) => {
if (statusReactionsEnabled) {
await statusReactions.setTool(payload.name);
}
await pushPreviewToolProgress(
buildChannelProgressDraftLineForEntry(
account.config,
{
event: "tool",
name: payload.name,
phase: payload.phase,
args: payload.args,
},
payload.detailMode ? { detailMode: payload.detailMode } : undefined,
),
{ toolName: payload.name },
);
},
onItemEvent: async (payload) => {
await pushPreviewToolProgress(
buildChannelProgressDraftLineForEntry(account.config, {
event: "item",
itemId: payload.itemId,
itemKind: payload.kind,
title: payload.title,
name: payload.name,
phase: payload.phase,
status: payload.status,
summary: payload.summary,
progressText: payload.progressText,
meta: payload.meta,
}),
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
await pushPreviewToolProgress(
buildChannelProgressDraftLine({
event: "plan",
phase: payload.phase,
title: payload.title,
explanation: payload.explanation,
steps: payload.steps,
}),
);
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
await pushPreviewToolProgress(
buildChannelProgressDraftLine({
event: "approval",
phase: payload.phase,
title: payload.title,
command: payload.command,
reason: payload.reason,
message: payload.message,
}),
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
await pushPreviewToolProgress(
buildChannelProgressDraftLine({
event: "command-output",
phase: payload.phase,
title: payload.title,
name: payload.name,
status: payload.status,
exitCode: payload.exitCode,
}),
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
await pushPreviewToolProgress(
buildChannelProgressDraftLine({
event: "patch",
phase: payload.phase,
title: payload.title,
name: payload.name,
added: payload.added,
modified: payload.modified,
deleted: payload.deleted,
summary: payload.summary,
}),
);
},
},
}),
});
if (turnResult.dispatched) {
const result = turnResult.dispatchResult;
@@ -1396,16 +1381,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
agentId: route.agentId,
});
}
const channelHistory = createChannelHistoryWindow({ historyMap: ctx.channelHistories });
if (!anyReplyDelivered) {
await draftStream?.clear();
if (prepared.isRoomish && prepared.requireMention) {
channelHistory.clear({
historyKey: prepared.historyKey,
limit: ctx.historyLimit,
});
}
return;
}
@@ -1441,11 +1418,4 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
},
});
}
if (prepared.isRoomish && prepared.requireMention) {
channelHistory.clear({
historyKey: prepared.historyKey,
limit: ctx.historyLimit,
});
}
}

View File

@@ -1242,6 +1242,15 @@ export async function prepareSlackMessage(params: {
);
},
},
history:
isRoomish && shouldRequireMention
? {
isGroup: true,
historyKey,
historyMap: ctx.channelHistories,
limit: ctx.historyLimit,
}
: undefined,
},
replyToMode,
requireMention: shouldRequireMention,

View File

@@ -1,3 +1,4 @@
import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history";
import type { FinalizedMsgContext } from "openclaw/plugin-sdk/reply-runtime";
import type { ResolvedAgentRoute } from "openclaw/plugin-sdk/routing";
import type { ResolvedSlackAccount } from "../../accounts.js";
@@ -16,6 +17,12 @@ export type PreparedSlackMessage = {
turn: {
storePath: string;
record: unknown;
history?: {
isGroup?: boolean;
historyKey?: string;
historyMap?: Map<string, HistoryEntry[]>;
limit?: number;
};
};
replyToMode: "off" | "first" | "all" | "batched";
requireMention: boolean;

View File

@@ -597,6 +597,9 @@ describe("channel turn kernel", () => {
it("drops direct prepared turns with bot-loop protection before record and dispatch", async () => {
const events: string[] = [];
const log = vi.fn();
const historyMap = new Map<string, HistoryEntry[]>([
["room", [{ sender: "User", body: "queued before suppression" }]],
]);
const recordInboundSession = createRecordInboundSession(events);
const runDispatch = vi.fn(async () => {
events.push("dispatch");
@@ -633,6 +636,12 @@ describe("channel turn kernel", () => {
log,
messageId: "msg-loop",
botLoopProtection: { ...botLoopProtection, nowMs: 1_001 },
history: {
isGroup: true,
historyKey: "room",
historyMap,
limit: 50,
},
});
expect(first.dispatched).toBe(true);
@@ -644,6 +653,7 @@ describe("channel turn kernel", () => {
expect(events).toEqual(["record", "dispatch"]);
expect(recordInboundSession).toHaveBeenCalledTimes(1);
expect(runDispatch).toHaveBeenCalledTimes(1);
expect(historyMap.get("room")).toStrictEqual([]);
expect(loggedEvents(log)).toEqual([
{ stage: "authorize", event: "drop", messageId: "msg-loop" },
]);

View File

@@ -420,6 +420,7 @@ async function runPreparedChannelTurnCore<
const admission = params.admission ?? ({ kind: "dispatch" } as const);
const botLoopDrop = resolveBotLoopProtectionDrop(params);
if (botLoopDrop) {
clearPendingHistoryAfterTurn(params.history);
return botLoopDrop;
}
emit({