feat(slack): status reaction lifecycle for tool/thinking progress indicators (#56430)

Merged via squash.

Prepared head SHA: 1ba5df3e3b
Co-authored-by: hsiaoa <70124331+hsiaoa@users.noreply.github.com>
Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com>
Reviewed-by: @frankekn
This commit is contained in:
Hsiao A
2026-03-29 16:49:53 +08:00
committed by GitHub
parent e28fdb08b8
commit cea7162490
7 changed files with 617 additions and 53 deletions

View File

@@ -1,8 +1,11 @@
import { resolveHumanDelayConfig } from "openclaw/plugin-sdk/agent-runtime";
import {
createStatusReactionController,
DEFAULT_TIMING,
logAckFailure,
logTypingFailure,
removeAckReactionAfterReply,
type StatusReactionAdapter,
} from "openclaw/plugin-sdk/channel-feedback";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import { resolveStorePath, updateLastRoute } from "openclaw/plugin-sdk/config-runtime";
@@ -36,6 +39,39 @@ import {
} from "../replies.js";
import type { PreparedSlackMessage } from "./types.js";
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// Slack reactions.add/remove expect shortcode names, not raw unicode emoji.
const UNICODE_TO_SLACK: Record<string, string> = {
"👀": "eyes",
"🤔": "thinking_face",
"🔥": "fire",
"👨‍💻": "male-technologist",
"👨💻": "male-technologist",
"👩‍💻": "female-technologist",
"⚡": "zap",
"🌐": "globe_with_meridians",
"✅": "white_check_mark",
"👍": "thumbsup",
"❌": "x",
"😱": "scream",
"🥱": "yawning_face",
"😨": "fearful",
"⏳": "hourglass_flowing_sand",
"⚠️": "warning",
"✍": "writing_hand",
"🧠": "brain",
"🛠️": "hammer_and_wrench",
"💻": "computer",
};
function toSlackEmojiName(emoji: string): string {
const trimmed = emoji.trim().replace(/^:+|:+$/g, "");
return UNICODE_TO_SLACK[trimmed] ?? trimmed;
}
function hasMedia(payload: ReplyPayload): boolean {
return resolveSendableOutboundReplyParts(payload).hasMedia;
}
@@ -154,9 +190,61 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
replyToMode: prepared.replyToMode,
});
const reactionMessageTs = prepared.ackReactionMessageTs;
const messageTs = message.ts ?? message.event_ts;
const incomingThreadTs = message.thread_ts;
let didSetStatus = false;
const statusReactionsEnabled =
Boolean(prepared.ackReactionPromise) &&
Boolean(reactionMessageTs) &&
cfg.messages?.statusReactions?.enabled !== false;
const slackStatusAdapter: StatusReactionAdapter = {
setReaction: async (emoji) => {
await reactSlackMessage(message.channel, reactionMessageTs ?? "", toSlackEmojiName(emoji), {
token: ctx.botToken,
client: ctx.app.client,
}).catch((err) => {
if (String(err).includes("already_reacted")) {
return;
}
throw err;
});
},
removeReaction: async (emoji) => {
await removeSlackReaction(message.channel, reactionMessageTs ?? "", toSlackEmojiName(emoji), {
token: ctx.botToken,
client: ctx.app.client,
}).catch((err) => {
if (String(err).includes("no_reaction")) {
return;
}
throw err;
});
},
};
const statusReactionTiming = {
...DEFAULT_TIMING,
...cfg.messages?.statusReactions?.timing,
};
const statusReactions = createStatusReactionController({
enabled: statusReactionsEnabled,
adapter: slackStatusAdapter,
initialEmoji: prepared.ackReactionValue || "eyes",
emojis: cfg.messages?.statusReactions?.emojis,
timing: cfg.messages?.statusReactions?.timing,
onError: (err) => {
logAckFailure({
log: logVerbose,
channel: "slack",
target: `${message.channel}/${message.ts}`,
error: err,
});
},
});
if (statusReactionsEnabled) {
void statusReactions.setQueued();
}
// Shared mutable ref for "replyToMode=first". Both tool + auto-reply flows
// mark this to ensure only the first reply is threaded.
@@ -260,6 +348,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
let streamSession: SlackStreamSession | null = null;
let streamFailed = false;
let usedReplyThreadTs: string | undefined;
let observedReplyDelivery = false;
const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise<void> => {
const replyThreadTs = forcedThreadTs ?? replyPlan.nextThreadTs();
@@ -274,6 +363,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
replyToMode: prepared.replyToMode,
...(slackIdentity ? { identity: slackIdentity } : {}),
});
observedReplyDelivery = true;
// Record the thread ts only after confirmed delivery success.
if (replyThreadTs) {
usedReplyThreadTs ??= replyThreadTs;
@@ -311,6 +401,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
teamId: ctx.teamId,
userId: message.user,
});
observedReplyDelivery = true;
usedReplyThreadTs ??= streamThreadTs;
replyPlan.markSent();
return;
@@ -367,6 +458,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
...(slackBlocks?.length ? { blocks: slackBlocks } : {}),
},
);
observedReplyDelivery = true;
return;
} catch (err) {
logVerbose(
@@ -471,34 +563,54 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
}
};
const { queuedFinal, counts } = await dispatchInboundMessage({
ctx: prepared.ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: prepared.channelConfig?.skills,
hasRepliedRef,
disableBlockStreaming: useStreaming
? true
: typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
onModelSelected,
onPartialReply: useStreaming
? undefined
: !previewStreamingEnabled
let dispatchError: unknown;
let queuedFinal = false;
let counts: { final?: number; block?: number } = {};
try {
const result = await dispatchInboundMessage({
ctx: prepared.ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: prepared.channelConfig?.skills,
hasRepliedRef,
disableBlockStreaming: useStreaming
? true
: typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
onModelSelected,
onPartialReply: useStreaming
? undefined
: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: onDraftBoundary,
onReasoningEnd: onDraftBoundary,
},
});
await draftStream?.flush();
draftStream?.stop();
markDispatchIdle();
: !previewStreamingEnabled
? undefined
: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: onDraftBoundary,
onReasoningEnd: onDraftBoundary,
onReasoningStream: statusReactionsEnabled
? async () => {
await statusReactions.setThinking();
}
: undefined,
onToolStart: statusReactionsEnabled
? async (payload) => {
await statusReactions.setTool(payload.name);
}
: undefined,
},
});
queuedFinal = result.queuedFinal;
counts = result.counts;
} catch (err) {
dispatchError = err;
} finally {
await draftStream?.flush();
draftStream?.stop();
markDispatchIdle();
}
// -----------------------------------------------------------------------
// Finalize the stream if one was started
@@ -512,7 +624,44 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
}
}
const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0;
const anyReplyDelivered =
observedReplyDelivery || queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0;
if (statusReactionsEnabled) {
if (dispatchError) {
await statusReactions.setError();
if (ctx.removeAckAfterReply) {
void (async () => {
await sleep(statusReactionTiming.errorHoldMs);
if (anyReplyDelivered) {
await statusReactions.clear();
return;
}
await statusReactions.restoreInitial();
})();
} else {
void statusReactions.restoreInitial();
}
} else if (anyReplyDelivered) {
await statusReactions.setDone();
if (ctx.removeAckAfterReply) {
void (async () => {
await sleep(statusReactionTiming.doneHoldMs);
await statusReactions.clear();
})();
} else {
void statusReactions.restoreInitial();
}
} else {
// Silent success should preserve queued state and clear any stall timers
// instead of transitioning to terminal/stall reactions after return.
await statusReactions.restoreInitial();
}
}
if (dispatchError) {
throw dispatchError;
}
// Record thread participation only when we actually delivered a reply and
// know the thread ts that was used (set by deliverNormally, streaming start,
@@ -541,29 +690,31 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
);
}
removeAckReactionAfterReply({
removeAfterReply: ctx.removeAckAfterReply,
ackReactionPromise: prepared.ackReactionPromise,
ackReactionValue: prepared.ackReactionValue,
remove: () =>
removeSlackReaction(
message.channel,
prepared.ackReactionMessageTs ?? "",
prepared.ackReactionValue,
{
token: ctx.botToken,
client: ctx.app.client,
},
),
onError: (err) => {
logAckFailure({
log: logVerbose,
channel: "slack",
target: `${message.channel}/${message.ts}`,
error: err,
});
},
});
if (!statusReactionsEnabled) {
removeAckReactionAfterReply({
removeAfterReply: ctx.removeAckAfterReply && anyReplyDelivered,
ackReactionPromise: prepared.ackReactionPromise,
ackReactionValue: prepared.ackReactionValue,
remove: () =>
removeSlackReaction(
message.channel,
prepared.ackReactionMessageTs ?? "",
prepared.ackReactionValue,
{
token: ctx.botToken,
client: ctx.app.client,
},
),
onError: (err) => {
logAckFailure({
log: logVerbose,
channel: "slack",
target: `${message.channel}/${message.ts}`,
error: err,
});
},
});
}
if (prepared.isRoomish) {
clearHistoryEntriesIfEnabled({

View File

@@ -214,6 +214,33 @@ describe("slack prepareSlackMessage inbound contract", () => {
expectInboundContextContract(prepared!.ctxPayload as any);
});
it("does not enable Slack status reactions when the message timestamp is missing", async () => {
const slackCtx = createInboundSlackCtx({
cfg: {
messages: {
ackReaction: "👀",
ackReactionScope: "all",
statusReactions: { enabled: true },
},
channels: { slack: { enabled: true } },
} as OpenClawConfig,
});
// oxlint-disable-next-line typescript/no-explicit-any
slackCtx.resolveUserName = async () => ({ name: "Alice" }) as any;
const prepared = await prepareMessageWith(slackCtx, defaultAccount, {
channel: "D123",
channel_type: "im",
user: "U1",
text: "hi",
event_ts: "1.000",
} as SlackMessageEvent);
expect(prepared).toBeTruthy();
expect(prepared?.ackReactionMessageTs).toBeUndefined();
expect(prepared?.ackReactionPromise).toBeNull();
});
it("includes forwarded shared attachment text in raw body", async () => {
const prepared = await prepareWithDefaultCtx(
createSlackMessage({

View File

@@ -555,8 +555,12 @@ export async function prepareSlackMessage(params: {
);
const ackReactionMessageTs = message.ts;
const statusReactionsWillHandle =
Boolean(ackReactionMessageTs) &&
cfg.messages?.statusReactions?.enabled !== false &&
shouldAckReaction();
const ackReactionPromise =
shouldAckReaction() && ackReactionMessageTs && ackReactionValue
!statusReactionsWillHandle && shouldAckReaction() && ackReactionMessageTs && ackReactionValue
? reactSlackMessage(message.channel, ackReactionMessageTs, ackReactionValue, {
token: ctx.botToken,
client: ctx.app.client,
@@ -567,7 +571,9 @@ export async function prepareSlackMessage(params: {
return false;
},
)
: null;
: statusReactionsWillHandle
? Promise.resolve(true)
: null;
const roomLabel = channelName ? `#${channelName}` : `#${message.channel}`;
const senderName = await resolveSenderName();