mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:00:42 +00:00
fix(slack): make inbound retries explicit
This commit is contained in:
@@ -68,6 +68,7 @@ export type SlackMonitorContext = {
|
||||
|
||||
logger: ReturnType<typeof getChildLogger>;
|
||||
markMessageSeen: (channelId: string | undefined, ts?: string) => boolean;
|
||||
releaseSeenMessage: (channelId: string | undefined, ts?: string) => void;
|
||||
shouldDropMismatchedSlackEvent: (body: unknown) => boolean;
|
||||
resolveSlackSystemEventSessionKey: (params: {
|
||||
channelId?: string | null;
|
||||
@@ -160,6 +161,13 @@ export function createSlackMonitorContext(params: {
|
||||
return seenMessages.check(`${channelId}:${ts}`);
|
||||
};
|
||||
|
||||
const releaseSeenMessage = (channelId: string | undefined, ts?: string) => {
|
||||
if (!channelId || !ts) {
|
||||
return;
|
||||
}
|
||||
seenMessages.delete(`${channelId}:${ts}`);
|
||||
};
|
||||
|
||||
const resolveSlackSystemEventSessionKey = (p: {
|
||||
channelId?: string | null;
|
||||
channelType?: string | null;
|
||||
@@ -433,6 +441,7 @@ export function createSlackMonitorContext(params: {
|
||||
removeAckAfterReply: params.removeAckAfterReply,
|
||||
logger,
|
||||
markMessageSeen,
|
||||
releaseSeenMessage,
|
||||
shouldDropMismatchedSlackEvent,
|
||||
resolveSlackSystemEventSessionKey,
|
||||
isChannelAllowed,
|
||||
|
||||
@@ -51,30 +51,41 @@ vi.mock("./message-handler/dispatch.js", () => ({
|
||||
}));
|
||||
|
||||
let createSlackMessageHandler: typeof import("./message-handler.js").createSlackMessageHandler;
|
||||
let SlackRetryableInboundError: typeof import("./message-handler.js").SlackRetryableInboundError;
|
||||
|
||||
function createMarkMessageSeen() {
|
||||
const seen = new Set<string>();
|
||||
return (channel: string | undefined, ts: string | undefined) => {
|
||||
if (!channel || !ts) {
|
||||
return {
|
||||
markMessageSeen(channel: string | undefined, ts: string | undefined) {
|
||||
if (!channel || !ts) {
|
||||
return false;
|
||||
}
|
||||
const key = `${channel}:${ts}`;
|
||||
if (seen.has(key)) {
|
||||
return true;
|
||||
}
|
||||
seen.add(key);
|
||||
return false;
|
||||
}
|
||||
const key = `${channel}:${ts}`;
|
||||
if (seen.has(key)) {
|
||||
return true;
|
||||
}
|
||||
seen.add(key);
|
||||
return false;
|
||||
},
|
||||
releaseSeenMessage(channel: string | undefined, ts: string | undefined) {
|
||||
if (!channel || !ts) {
|
||||
return;
|
||||
}
|
||||
seen.delete(`${channel}:${ts}`);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function createTestHandler() {
|
||||
const seenMessages = createMarkMessageSeen();
|
||||
return createSlackMessageHandler({
|
||||
ctx: {
|
||||
cfg: {},
|
||||
accountId: "default",
|
||||
app: { client: {} },
|
||||
runtime: {},
|
||||
markMessageSeen: createMarkMessageSeen(),
|
||||
markMessageSeen: seenMessages.markMessageSeen,
|
||||
releaseSeenMessage: seenMessages.releaseSeenMessage,
|
||||
} as Parameters<typeof createSlackMessageHandler>[0]["ctx"],
|
||||
account: { accountId: "default" } as Parameters<typeof createSlackMessageHandler>[0]["account"],
|
||||
});
|
||||
@@ -118,7 +129,8 @@ async function createInFlightMessageScenario(ts: string) {
|
||||
|
||||
describe("createSlackMessageHandler app_mention race handling", () => {
|
||||
beforeAll(async () => {
|
||||
({ createSlackMessageHandler } = await import("./message-handler.js"));
|
||||
({ createSlackMessageHandler, SlackRetryableInboundError } =
|
||||
await import("./message-handler.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
@@ -183,4 +195,34 @@ describe("createSlackMessageHandler app_mention race handling", () => {
|
||||
expect(prepareSlackMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("retries message replay after an explicit retryable dispatch failure", async () => {
|
||||
prepareSlackMessageMock.mockResolvedValue({ ctxPayload: {} });
|
||||
dispatchPreparedSlackMessageMock
|
||||
.mockRejectedValueOnce(new SlackRetryableInboundError("retry me"))
|
||||
.mockResolvedValueOnce(undefined);
|
||||
|
||||
const handler = createTestHandler();
|
||||
|
||||
await expect(sendMessageEvent(handler, "1700000000.000250")).rejects.toThrow("retry me");
|
||||
await expect(sendMessageEvent(handler, "1700000000.000250")).resolves.toBeUndefined();
|
||||
|
||||
expect(prepareSlackMessageMock).toHaveBeenCalledTimes(2);
|
||||
expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("keeps message replay deduped after a non-retryable dispatch failure", async () => {
|
||||
prepareSlackMessageMock.mockResolvedValue({ ctxPayload: {} });
|
||||
dispatchPreparedSlackMessageMock.mockRejectedValueOnce(new Error("post-send failure"));
|
||||
|
||||
const handler = createTestHandler();
|
||||
|
||||
await expect(sendMessageEvent(handler, "1700000000.000300")).rejects.toThrow(
|
||||
"post-send failure",
|
||||
);
|
||||
await sendMessageEvent(handler, "1700000000.000300");
|
||||
|
||||
expect(prepareSlackMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -32,6 +32,7 @@ vi.mock("./thread-resolution.js", () => ({
|
||||
|
||||
function createContext(overrides?: {
|
||||
markMessageSeen?: (channel: string | undefined, ts: string | undefined) => boolean;
|
||||
releaseSeenMessage?: (channel: string | undefined, ts: string | undefined) => void;
|
||||
}) {
|
||||
return {
|
||||
cfg: {},
|
||||
@@ -42,11 +43,14 @@ function createContext(overrides?: {
|
||||
runtime: {},
|
||||
markMessageSeen: (channel: string | undefined, ts: string | undefined) =>
|
||||
overrides?.markMessageSeen?.(channel, ts) ?? false,
|
||||
releaseSeenMessage: (channel: string | undefined, ts: string | undefined) =>
|
||||
overrides?.releaseSeenMessage?.(channel, ts),
|
||||
} as Parameters<typeof createSlackMessageHandler>[0]["ctx"];
|
||||
}
|
||||
|
||||
function createHandlerWithTracker(overrides?: {
|
||||
markMessageSeen?: (channel: string | undefined, ts: string | undefined) => boolean;
|
||||
releaseSeenMessage?: (channel: string | undefined, ts: string | undefined) => void;
|
||||
}) {
|
||||
const trackEvent = vi.fn();
|
||||
const handler = createSlackMessageHandler({
|
||||
|
||||
@@ -17,6 +17,13 @@ export type SlackMessageHandler = (
|
||||
|
||||
const APP_MENTION_RETRY_TTL_MS = 60_000;
|
||||
|
||||
export class SlackRetryableInboundError extends Error {
|
||||
constructor(message: string, options?: ErrorOptions) {
|
||||
super(message, options);
|
||||
this.name = "SlackRetryableInboundError";
|
||||
}
|
||||
}
|
||||
|
||||
function resolveSlackSenderId(message: SlackMessageEvent): string | null {
|
||||
return message.user ?? message.bot_id ?? null;
|
||||
}
|
||||
@@ -133,40 +140,53 @@ export function createSlackMessageHandler(params: {
|
||||
...last.message,
|
||||
text: combinedText,
|
||||
};
|
||||
const prepared = await prepareSlackMessage({
|
||||
ctx,
|
||||
account,
|
||||
message: syntheticMessage,
|
||||
opts: {
|
||||
...last.opts,
|
||||
wasMentioned: combinedMentioned || last.opts.wasMentioned,
|
||||
},
|
||||
});
|
||||
const seenMessageKey = buildSeenMessageKey(last.message.channel, last.message.ts);
|
||||
if (!prepared) {
|
||||
return;
|
||||
}
|
||||
if (seenMessageKey) {
|
||||
pruneAppMentionRetryKeys(Date.now());
|
||||
if (last.opts.source === "app_mention") {
|
||||
// If app_mention wins the race and dispatches first, drop the later message dispatch.
|
||||
appMentionDispatchedKeys.set(seenMessageKey, Date.now() + APP_MENTION_RETRY_TTL_MS);
|
||||
} else if (last.opts.source === "message" && appMentionDispatchedKeys.has(seenMessageKey)) {
|
||||
appMentionDispatchedKeys.delete(seenMessageKey);
|
||||
appMentionRetryKeys.delete(seenMessageKey);
|
||||
try {
|
||||
const prepared = await prepareSlackMessage({
|
||||
ctx,
|
||||
account,
|
||||
message: syntheticMessage,
|
||||
opts: {
|
||||
...last.opts,
|
||||
wasMentioned: combinedMentioned || last.opts.wasMentioned,
|
||||
},
|
||||
});
|
||||
if (!prepared) {
|
||||
return;
|
||||
}
|
||||
appMentionRetryKeys.delete(seenMessageKey);
|
||||
}
|
||||
if (entries.length > 1) {
|
||||
const ids = entries.map((entry) => entry.message.ts).filter(Boolean) as string[];
|
||||
if (ids.length > 0) {
|
||||
prepared.ctxPayload.MessageSids = ids;
|
||||
prepared.ctxPayload.MessageSidFirst = ids[0];
|
||||
prepared.ctxPayload.MessageSidLast = ids[ids.length - 1];
|
||||
if (seenMessageKey) {
|
||||
pruneAppMentionRetryKeys(Date.now());
|
||||
if (last.opts.source === "app_mention") {
|
||||
// If app_mention wins the race and dispatches first, drop the later message dispatch.
|
||||
appMentionDispatchedKeys.set(seenMessageKey, Date.now() + APP_MENTION_RETRY_TTL_MS);
|
||||
} else if (
|
||||
last.opts.source === "message" &&
|
||||
appMentionDispatchedKeys.has(seenMessageKey)
|
||||
) {
|
||||
appMentionDispatchedKeys.delete(seenMessageKey);
|
||||
appMentionRetryKeys.delete(seenMessageKey);
|
||||
return;
|
||||
}
|
||||
appMentionRetryKeys.delete(seenMessageKey);
|
||||
}
|
||||
if (entries.length > 1) {
|
||||
const ids = entries.map((entry) => entry.message.ts).filter(Boolean) as string[];
|
||||
if (ids.length > 0) {
|
||||
prepared.ctxPayload.MessageSids = ids;
|
||||
prepared.ctxPayload.MessageSidFirst = ids[0];
|
||||
prepared.ctxPayload.MessageSidLast = ids[ids.length - 1];
|
||||
}
|
||||
}
|
||||
await dispatchPreparedSlackMessage(prepared);
|
||||
} catch (error) {
|
||||
if (error instanceof SlackRetryableInboundError) {
|
||||
if (seenMessageKey) {
|
||||
appMentionDispatchedKeys.delete(seenMessageKey);
|
||||
}
|
||||
ctx.releaseSeenMessage(last.message.channel, last.message.ts);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
await dispatchPreparedSlackMessage(prepared);
|
||||
},
|
||||
onError: (err) => {
|
||||
ctx.runtime.error?.(`slack inbound debounce flush failed: ${String(err)}`);
|
||||
|
||||
@@ -642,6 +642,7 @@ describe("prepareSlackMessage sender prefix", () => {
|
||||
removeAckAfterReply: false,
|
||||
logger: { info: vi.fn(), warn: vi.fn() },
|
||||
markMessageSeen: () => false,
|
||||
releaseSeenMessage: () => {},
|
||||
shouldDropMismatchedSlackEvent: () => false,
|
||||
resolveSlackSystemEventSessionKey: () => "agent:main:slack:channel:c1",
|
||||
isChannelAllowed: () => true,
|
||||
|
||||
Reference in New Issue
Block a user