fix: centralize source reply delivery mode

This commit is contained in:
Peter Steinberger
2026-04-28 09:13:49 +01:00
parent 1257e0e4ae
commit 67b16a4a6d
21 changed files with 568 additions and 103 deletions

View File

@@ -675,6 +675,7 @@ describe("processDiscordMessage ack reactions", () => {
await processDiscordMessage(ctx as any);
await vi.waitFor(() => expect(sendMocks.removeReactionDiscord).toHaveBeenCalled());
expectRemoveAckCallAt(0, "👀", {
accountId: "default",
ackReaction: "👀",
@@ -861,7 +862,7 @@ describe("processDiscordMessage session routing", () => {
...createDirectMessageContextOverrides(),
})) as any,
);
expect(getLastDispatchReplyOptions()?.sourceReplyDeliveryMode).toBeUndefined();
expect(getLastDispatchReplyOptions()?.sourceReplyDeliveryMode).toBe("automatic");
});
it("prefers bound session keys and sets MessageThreadId for bound thread messages", async () => {

View File

@@ -16,7 +16,10 @@ import {
resolveEnvelopeFormatOptions,
} from "openclaw/plugin-sdk/channel-inbound";
import { deliverFinalizableDraftPreview } from "openclaw/plugin-sdk/channel-lifecycle";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import {
createChannelReplyPipeline,
resolveChannelSourceReplyDeliveryMode,
} from "openclaw/plugin-sdk/channel-reply-pipeline";
import {
resolveChannelStreamingBlockEnabled,
resolveChannelStreamingPreviewToolProgress,
@@ -206,11 +209,11 @@ export async function processDiscordMessage(
if (boundThreadId && typeof threadBindings.touchThread === "function") {
threadBindings.touchThread({ threadId: boundThreadId });
}
const sourceReplyDeliveryMode = isGuildMessage
? cfg.messages?.groupChat?.visibleReplies === "automatic"
? ("automatic" as const)
: ("message_tool_only" as const)
: undefined;
const { createReplyDispatcherWithTyping, dispatchInboundMessage } = await loadReplyRuntime();
const sourceReplyDeliveryMode = resolveChannelSourceReplyDeliveryMode({
cfg,
ctx: { ChatType: isGuildMessage ? "channel" : undefined },
});
const sourceRepliesAreToolOnly = sourceReplyDeliveryMode === "message_tool_only";
const ackReaction = resolveAckReaction(cfg, route.agentId, {
channel: "discord",
@@ -279,8 +282,6 @@ export async function processDiscordMessage(
reactionAdapter: discordAdapter,
target: `${messageChannelId}/${message.id}`,
});
const { createReplyDispatcherWithTyping, dispatchInboundMessage } = await loadReplyRuntime();
const fromLabel = isDirectMessage
? buildDirectLabel(author)
: buildGuildLabel({

View File

@@ -230,6 +230,7 @@ describe("monitorSlackProvider tool results", () => {
responsePrefix: "PFX",
ackReaction: "👀",
ackReactionScope: "group-mentions",
groupChat: { visibleReplies: "automatic" },
removeAckAfterReply: true,
statusReactions: statusReactionsEnabled
? { enabled: true, timing: { debounceMs: 0, doneHoldMs: 0, errorHoldMs: 0 } }
@@ -521,6 +522,38 @@ describe("monitorSlackProvider tool results", () => {
expect(sendMock).toHaveBeenCalledTimes(1);
});
it("keeps always-on channel messages private by default", async () => {
slackTestState.config = {
messages: {
ackReaction: "👀",
ackReactionScope: "all",
statusReactions: {
enabled: true,
timing: { debounceMs: 0, doneHoldMs: 0, errorHoldMs: 0 },
},
},
channels: {
slack: {
dm: { enabled: true, policy: "open", allowFrom: ["*"] },
groupPolicy: "open",
requireMention: false,
},
},
};
replyMock.mockResolvedValue({ text: "quiet" });
await runSlackMessageOnce(monitorSlackProvider, {
event: makeSlackMessageEvent({
channel_type: "channel",
}),
});
await flush();
expect(replyMock).toHaveBeenCalledTimes(1);
expect(sendMock).not.toHaveBeenCalled();
expect(reactMock).not.toHaveBeenCalled();
});
it("treats control commands as mentions for group bypass", async () => {
replyMock.mockResolvedValue({ text: "ok" });
await runChannelMessageEvent("/elevated off");
@@ -584,6 +617,20 @@ describe("monitorSlackProvider tool results", () => {
it("reacts to mention-gated room messages when ackReaction is enabled", async () => {
replyMock.mockResolvedValue(undefined);
slackTestState.config = {
messages: {
responsePrefix: "PFX",
ackReaction: "👀",
ackReactionScope: "group-mentions",
groupChat: { visibleReplies: "automatic" },
},
channels: {
slack: {
dm: { enabled: true, policy: "open", allowFrom: ["*"] },
groupPolicy: "open",
},
},
};
const client = getSlackClient();
if (!client) {
throw new Error("Slack client not registered");

View File

@@ -146,6 +146,22 @@ vi.mock("openclaw/plugin-sdk/channel-reply-pipeline", () => ({
},
onModelSelected: undefined,
}),
resolveChannelSourceReplyDeliveryMode: (params: {
cfg?: { messages?: { groupChat?: { visibleReplies?: string } } };
ctx?: { ChatType?: string };
requested?: "automatic" | "message_tool_only";
}) => {
if (params.requested) {
return params.requested;
}
const chatType = params.ctx?.ChatType;
if (chatType === "group" || chatType === "channel") {
return params.cfg?.messages?.groupChat?.visibleReplies === "automatic"
? "automatic"
: "message_tool_only";
}
return "automatic";
},
}));
vi.mock("openclaw/plugin-sdk/channel-streaming", () => ({

View File

@@ -8,7 +8,10 @@ import {
type StatusReactionAdapter,
} from "openclaw/plugin-sdk/channel-feedback";
import { deliverFinalizableDraftPreview } from "openclaw/plugin-sdk/channel-lifecycle";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import {
createChannelReplyPipeline,
resolveChannelSourceReplyDeliveryMode,
} from "openclaw/plugin-sdk/channel-reply-pipeline";
import {
resolveChannelStreamingBlockEnabled,
resolveChannelStreamingNativeTransport,
@@ -282,12 +285,18 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
message,
replyToMode: prepared.replyToMode,
});
const sourceReplyDeliveryMode = resolveChannelSourceReplyDeliveryMode({
cfg,
ctx: prepared.ctxPayload,
});
const sourceRepliesAreToolOnly = sourceReplyDeliveryMode === "message_tool_only";
const reactionMessageTs = prepared.ackReactionMessageTs;
const messageTs = message.ts ?? message.event_ts;
const incomingThreadTs = message.thread_ts;
let didSetStatus = false;
const statusReactionsEnabled =
!sourceRepliesAreToolOnly &&
Boolean(prepared.ackReactionPromise) &&
Boolean(reactionMessageTs) &&
cfg.messages?.statusReactions?.enabled !== false;
@@ -361,57 +370,59 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
isSlackInteractiveRepliesEnabled({ cfg, accountId: route.accountId })
? compileSlackInteractiveReplies(payload)
: payload,
typing: {
start: async () => {
didSetStatus = true;
await ctx.setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "is typing...",
});
if (typingReaction && message.ts) {
await reactSlackMessage(message.channel, message.ts, typingReaction, {
token: ctx.botToken,
client: ctx.app.client,
}).catch(() => {});
}
},
stop: async () => {
if (!didSetStatus) {
return;
}
didSetStatus = false;
await ctx.setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "",
});
if (typingReaction && message.ts) {
await removeSlackReaction(message.channel, message.ts, typingReaction, {
token: ctx.botToken,
client: ctx.app.client,
}).catch(() => {});
}
},
onStartError: (err) => {
logTypingFailure({
log: (message) => runtime.error?.(danger(message)),
channel: "slack",
action: "start",
target: typingTarget,
error: err,
});
},
onStopError: (err) => {
logTypingFailure({
log: (message) => runtime.error?.(danger(message)),
channel: "slack",
action: "stop",
target: typingTarget,
error: err,
});
},
},
typing: sourceRepliesAreToolOnly
? undefined
: {
start: async () => {
didSetStatus = true;
await ctx.setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "is typing...",
});
if (typingReaction && message.ts) {
await reactSlackMessage(message.channel, message.ts, typingReaction, {
token: ctx.botToken,
client: ctx.app.client,
}).catch(() => {});
}
},
stop: async () => {
if (!didSetStatus) {
return;
}
didSetStatus = false;
await ctx.setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "",
});
if (typingReaction && message.ts) {
await removeSlackReaction(message.channel, message.ts, typingReaction, {
token: ctx.botToken,
client: ctx.app.client,
}).catch(() => {});
}
},
onStartError: (err) => {
logTypingFailure({
log: (message) => runtime.error?.(danger(message)),
channel: "slack",
action: "start",
target: typingTarget,
error: err,
});
},
onStopError: (err) => {
logTypingFailure({
log: (message) => runtime.error?.(danger(message)),
channel: "slack",
action: "stop",
target: typingTarget,
error: err,
});
},
},
});
const slackStreaming = resolveSlackStreamingConfig({
@@ -424,15 +435,19 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
messageTs,
isThreadReply,
});
const previewStreamingEnabled = shouldEnableSlackPreviewStreaming({
mode: slackStreaming.mode,
isDirectMessage: prepared.isDirectMessage,
threadTs: streamThreadHint,
});
const streamingEnabled = isSlackStreamingEnabled({
mode: slackStreaming.mode,
nativeStreaming: slackStreaming.nativeStreaming,
});
const previewStreamingEnabled =
!sourceRepliesAreToolOnly &&
shouldEnableSlackPreviewStreaming({
mode: slackStreaming.mode,
isDirectMessage: prepared.isDirectMessage,
threadTs: streamThreadHint,
});
const streamingEnabled =
!sourceRepliesAreToolOnly &&
isSlackStreamingEnabled({
mode: slackStreaming.mode,
nativeStreaming: slackStreaming.nativeStreaming,
});
const useStreaming = shouldUseStreaming({
streamingEnabled,
threadTs: streamThreadHint,
@@ -442,11 +457,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
useStreaming,
});
const blockStreamingEnabled = resolveChannelStreamingBlockEnabled(account.config);
const disableBlockStreaming = resolveSlackDisableBlockStreaming({
useStreaming,
shouldUseDraftStream,
blockStreamingEnabled,
});
const disableBlockStreaming = sourceRepliesAreToolOnly
? true
: resolveSlackDisableBlockStreaming({
useStreaming,
shouldUseDraftStream,
blockStreamingEnabled,
});
let streamSession: SlackStreamSession | null = null;
let streamFailed = false;
let usedReplyThreadTs: string | undefined;
@@ -967,6 +984,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
replyOptions: {
...replyOptions,
skillFilter: prepared.channelConfig?.skills,
sourceReplyDeliveryMode,
hasRepliedRef,
disableBlockStreaming,
onModelSelected,

View File

@@ -12,6 +12,7 @@ import {
resolveEnvelopeFormatOptions,
resolveInboundMentionDecision,
} from "openclaw/plugin-sdk/channel-inbound";
import { resolveChannelSourceReplyDeliveryMode } from "openclaw/plugin-sdk/channel-reply-pipeline";
import { hasControlCommand } from "openclaw/plugin-sdk/command-detection";
import { resolveControlCommandGate } from "openclaw/plugin-sdk/command-gating";
import { shouldHandleTextCommands } from "openclaw/plugin-sdk/command-surface";
@@ -524,12 +525,16 @@ export async function prepareSlackMessage(params: {
return null;
}
const { rawBody, effectiveDirectMedia } = resolvedMessageContent;
const chatType = resolveSlackChatType(conversation.resolvedChannelType);
const ackReaction = resolveAckReaction(cfg, route.agentId, {
channel: "slack",
accountId: account.accountId,
});
const ackReactionValue = ackReaction ?? "";
const sourceRepliesAreToolOnly =
resolveChannelSourceReplyDeliveryMode({ cfg, ctx: { ChatType: chatType } }) ===
"message_tool_only";
const shouldAckReaction = () =>
Boolean(
@@ -547,12 +552,13 @@ export async function prepareSlackMessage(params: {
);
const ackReactionMessageTs = message.ts;
const shouldSendAckReaction = !sourceRepliesAreToolOnly && shouldAckReaction();
const statusReactionsWillHandle =
Boolean(ackReactionMessageTs) &&
cfg.messages?.statusReactions?.enabled !== false &&
shouldAckReaction();
shouldSendAckReaction;
const ackReactionPromise =
!statusReactionsWillHandle && shouldAckReaction() && ackReactionMessageTs && ackReactionValue
!statusReactionsWillHandle && shouldSendAckReaction && ackReactionMessageTs && ackReactionValue
? reactSlackMessage(message.channel, ackReactionMessageTs, ackReactionValue, {
token: ctx.botToken,
client: ctx.app.client,
@@ -571,7 +577,6 @@ export async function prepareSlackMessage(params: {
const roomLabel = channelName ? `#${channelName}` : `#${message.channel}`;
const senderName = await resolveSenderName();
const chatType = resolveSlackChatType(conversation.resolvedChannelType);
const preview = rawBody.replace(/\s+/g, " ").slice(0, 160);
const inboundLabel = isDirectMessage
? `Slack DM from ${senderName}`