Fix Slack retry for session init conflicts (#99647)

* Fix Slack retry for session init conflicts

* fix(slack): separate native and relay retries

* style(slack): make completion exits explicit

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Peter Steinberger
2026-07-03 15:45:36 -07:00
committed by GitHub
parent 0cd936cc77
commit fbdfd19622
2 changed files with 223 additions and 14 deletions

View File

@@ -6,6 +6,8 @@ const flushKeyMock = vi.fn(async (_key: string) => {});
const onFlushCallbacks: Array<(entries: Array<Record<string, unknown>>) => Promise<void>> = [];
const prepareSlackMessageMock = vi.fn(async () => ({ ctxPayload: {} }));
const dispatchPreparedSlackMessageMock = vi.fn(async () => {});
const hasSlackInboundMessageDeliveryMock = vi.fn(async () => false);
const recordSlackInboundMessageDeliveriesMock = vi.fn(async () => {});
const resolveThreadTsMock = vi.fn(async ({ message }: { message: Record<string, unknown> }) => ({
...message,
}));
@@ -45,8 +47,8 @@ vi.mock("./message-handler/pipeline.runtime.js", () => ({
}));
vi.mock("./inbound-delivery-state.js", () => ({
hasSlackInboundMessageDelivery: vi.fn(async () => false),
recordSlackInboundMessageDeliveries: vi.fn(async () => {}),
hasSlackInboundMessageDelivery: hasSlackInboundMessageDeliveryMock,
recordSlackInboundMessageDeliveries: recordSlackInboundMessageDeliveriesMock,
}));
function createContext(overrides?: {
@@ -101,6 +103,9 @@ describe("createSlackMessageHandler", () => {
onFlushCallbacks.length = 0;
prepareSlackMessageMock.mockClear();
dispatchPreparedSlackMessageMock.mockClear();
hasSlackInboundMessageDeliveryMock.mockReset();
hasSlackInboundMessageDeliveryMock.mockResolvedValue(false);
recordSlackInboundMessageDeliveriesMock.mockClear();
resolveThreadTsMock.mockClear();
});
@@ -270,4 +275,138 @@ describe("createSlackMessageHandler", () => {
const flushFailure = expect(onFlushCallbacks[0]?.([entry])).rejects.toThrow("dispatch failed");
await Promise.all([handledFailure, flushFailure]);
});
it("retries native session initialization conflicts through the delivery gates", async () => {
const releaseSeenMessage = vi.fn();
dispatchPreparedSlackMessageMock.mockRejectedValueOnce(
new Error("Slack dispatch failed", {
cause: new Error(
"reply session initialization conflicted for agent:main:main:thread:123.456",
),
}),
);
const { handler } = createHandlerWithTracker({ releaseSeenMessage });
await handler(
{
type: "message",
channel: "C111",
user: "U111",
ts: "1709000000.000700",
text: "native message",
} as never,
{ source: "message" },
);
const entry = enqueueMock.mock.calls[0]?.[0] as Record<string, unknown>;
vi.useFakeTimers();
try {
await expect(onFlushCallbacks[0]?.([entry])).rejects.toThrow("Slack dispatch failed");
await vi.advanceTimersByTimeAsync(1000);
expect(releaseSeenMessage).toHaveBeenCalledWith("C111", "1709000000.000700");
expect(recordSlackInboundMessageDeliveriesMock).not.toHaveBeenCalled();
expect(hasSlackInboundMessageDeliveryMock).toHaveBeenCalledTimes(2);
expect(enqueueMock).toHaveBeenCalledTimes(2);
expect(enqueueMock.mock.calls[1]?.[0]).toMatchObject({
opts: {
retryAttempt: 1,
},
});
expect(enqueueMock.mock.calls[1]?.[0]).not.toHaveProperty("opts.dispatchCompletion");
} finally {
vi.useRealTimers();
}
});
it("leaves relay session conflict retries to unacknowledged redelivery", async () => {
const releaseSeenMessage = vi.fn();
dispatchPreparedSlackMessageMock.mockRejectedValueOnce(
new Error("Slack dispatch failed", {
cause: new Error(
"reply session initialization conflicted for agent:main:main:thread:123.456",
),
}),
);
const { handler } = createHandlerWithTracker({ releaseSeenMessage });
const handled = handler(
{
type: "message",
channel: "C111",
user: "U111",
ts: "1709000000.000800",
text: "relay message",
} as never,
{ source: "message", awaitDispatch: true },
);
await vi.waitFor(() => expect(enqueueMock).toHaveBeenCalledTimes(1));
const entry = enqueueMock.mock.calls[0]?.[0] as Record<string, unknown>;
vi.useFakeTimers();
try {
const handledFailure = expect(handled).rejects.toThrow("Slack dispatch failed");
const flushFailure = expect(onFlushCallbacks[0]?.([entry])).rejects.toThrow(
"Slack dispatch failed",
);
await Promise.all([handledFailure, flushFailure]);
await vi.advanceTimersByTimeAsync(1000);
expect(releaseSeenMessage).toHaveBeenCalledWith("C111", "1709000000.000800");
expect(recordSlackInboundMessageDeliveriesMock).not.toHaveBeenCalled();
expect(enqueueMock).toHaveBeenCalledTimes(1);
} finally {
vi.useRealTimers();
}
});
it("settles an already-delivered relay event without enqueueing", async () => {
hasSlackInboundMessageDeliveryMock.mockResolvedValueOnce(true);
const { handler } = createHandlerWithTracker();
await expect(
handler(
{
type: "message",
channel: "C111",
user: "U111",
ts: "1709000000.000850",
text: "relay replay",
} as never,
{ source: "message", awaitDispatch: true },
),
).resolves.toBeUndefined();
expect(enqueueMock).not.toHaveBeenCalled();
});
it("skips a native retry when another delivery already succeeded", async () => {
dispatchPreparedSlackMessageMock.mockRejectedValueOnce(
new Error("reply session initialization conflicted for agent:main:main:thread:123.456"),
);
hasSlackInboundMessageDeliveryMock.mockResolvedValueOnce(false).mockResolvedValueOnce(true);
const { handler } = createHandlerWithTracker();
await handler(
{
type: "message",
channel: "C111",
user: "U111",
ts: "1709000000.000900",
text: "native message",
} as never,
{ source: "message" },
);
const entry = enqueueMock.mock.calls[0]?.[0] as Record<string, unknown>;
vi.useFakeTimers();
try {
await expect(onFlushCallbacks[0]?.([entry])).rejects.toThrow(
"reply session initialization conflicted",
);
await vi.advanceTimersByTimeAsync(1000);
expect(hasSlackInboundMessageDeliveryMock).toHaveBeenCalledTimes(2);
expect(enqueueMock).toHaveBeenCalledTimes(1);
} finally {
vi.useRealTimers();
}
});
});

View File

@@ -3,7 +3,7 @@ import {
createChannelInboundDebouncer,
shouldDebounceTextInbound,
} from "openclaw/plugin-sdk/channel-inbound";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { collectErrorGraphCandidates, formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { createLazyRuntimeModule } from "openclaw/plugin-sdk/lazy-runtime";
import {
asDateTimestampMs,
@@ -45,7 +45,11 @@ type SlackDispatchCompletion = {
reject: (error: unknown) => void;
};
type QueuedSlackMessageOptions = Parameters<SlackMessageHandler>[1] & {
type IngressSlackMessageOptions = Parameters<SlackMessageHandler>[1] & {
retryAttempt?: number;
};
type QueuedSlackMessageOptions = IngressSlackMessageOptions & {
dispatchCompletion?: Omit<SlackDispatchCompletion, "promise">;
};
@@ -60,6 +64,9 @@ function createSlackDispatchCompletion(): SlackDispatchCompletion {
}
const APP_MENTION_RETRY_TTL_MS = 60_000;
const RETRYABLE_FLUSH_MAX_ATTEMPTS = 3;
const RETRYABLE_FLUSH_RETRY_DELAY_MS = 1_000;
const REPLY_SESSION_INIT_CONFLICT_MESSAGE_RE = /reply session initialization conflicted for \S+/u;
export class SlackRetryableInboundError extends Error {
constructor(message: string, options?: ErrorOptions) {
@@ -68,6 +75,15 @@ export class SlackRetryableInboundError extends Error {
}
}
function isRetryableSlackInboundError(error: unknown): boolean {
if (error instanceof SlackRetryableInboundError) {
return true;
}
return collectErrorGraphCandidates(error, (current) => [current.cause, current.error]).some(
(candidate) => REPLY_SESSION_INIT_CONFLICT_MESSAGE_RE.test(formatErrorMessage(candidate)),
);
}
function shouldDebounceSlackMessage(message: SlackMessageEvent, cfg: SlackMonitorContext["cfg"]) {
const text = message.text ?? "";
const textForCommandDetection = stripSlackMentionsForCommandDetection(text);
@@ -101,6 +117,46 @@ export function createSlackMessageHandler(params: {
buildKey: (entry) => buildSlackDebounceKey(entry.message, ctx.accountId),
shouldDebounce: (entry) => shouldDebounceSlackMessage(entry.message, ctx.cfg),
onFlush: async (entries) => {
const retryEntries = (sourceError: unknown): boolean => {
if (!isRetryableSlackInboundError(sourceError)) {
return false;
}
const nextEntries = entries
.map((entry) => {
// Relay delivery owns retry until its dispatch completion is acknowledged.
// Scheduling here as well can race the router redelivery and duplicate a reply.
if (entry.opts.dispatchCompletion) {
return null;
}
const retryAttempt = entry.opts.retryAttempt ?? 0;
if (retryAttempt >= RETRYABLE_FLUSH_MAX_ATTEMPTS) {
return null;
}
const { dispatchCompletion: _dispatchCompletion, ...retryOpts } = entry.opts;
return {
...entry,
opts: {
...retryOpts,
retryAttempt: retryAttempt + 1,
},
};
})
.filter((entry) => entry !== null);
if (nextEntries.length === 0) {
return false;
}
const retryTimer = setTimeout(() => {
for (const entry of nextEntries) {
// Re-enter ingress so a relay replay or another successful attempt wins
// through the normal delivery and seen-message gates before dispatch.
void enqueueSlackMessage(entry.message, entry.opts).catch((err: unknown) => {
ctx.runtime.error?.(`slack inbound retry enqueue failed: ${formatErrorMessage(err)}`);
});
}
}, RETRYABLE_FLUSH_RETRY_DELAY_MS);
retryTimer.unref?.();
return true;
};
const completions = entries
.map((entry) => entry.opts.dispatchCompletion)
.filter((completion) => completion !== undefined);
@@ -187,7 +243,7 @@ export function createSlackMessageHandler(params: {
messages: entries.map((entry) => entry.message),
});
} catch (error) {
if (!(error instanceof SlackRetryableInboundError)) {
if (!isRetryableSlackInboundError(error)) {
await recordSlackInboundMessageDeliveries({
accountId: ctx.accountId,
messages: entries.map((entry) => entry.message),
@@ -196,11 +252,16 @@ export function createSlackMessageHandler(params: {
throw error;
}
} catch (error) {
if (error instanceof SlackRetryableInboundError) {
if (seenMessageKey) {
appMentionDispatchedKeys.delete(seenMessageKey);
if (isRetryableSlackInboundError(error)) {
// Every buffered event passed the seen gate before this combined dispatch.
// Release all of them so the retry can rebuild the same batch.
for (const entry of entries) {
const entrySeenKey = buildSeenMessageKey(entry.message.channel, entry.message.ts);
if (entrySeenKey) {
appMentionDispatchedKeys.delete(entrySeenKey);
}
ctx.releaseSeenMessage(entry.message.channel, entry.message.ts);
}
ctx.releaseSeenMessage(last.message.channel, last.message.ts);
}
throw error;
}
@@ -209,6 +270,7 @@ export function createSlackMessageHandler(params: {
completion.resolve();
}
} catch (error) {
retryEntries(error);
for (const completion of completions) {
completion.reject(error);
}
@@ -271,9 +333,12 @@ export function createSlackMessageHandler(params: {
return true;
};
return async (message, opts) => {
async function enqueueSlackMessage(
message: SlackMessageEvent,
opts: IngressSlackMessageOptions,
): Promise<SlackDispatchCompletion | undefined> {
if (opts.source === "message" && message.type !== "message") {
return;
return undefined;
}
if (
opts.source === "message" &&
@@ -282,7 +347,7 @@ export function createSlackMessageHandler(params: {
message.subtype !== "bot_message" &&
message.subtype !== "thread_broadcast"
) {
return;
return undefined;
}
const seenMessageKey = buildSeenMessageKey(message.channel, message.ts);
if (
@@ -293,7 +358,7 @@ export function createSlackMessageHandler(params: {
ts: message.ts,
}))
) {
return;
return undefined;
}
const wasSeen = seenMessageKey ? ctx.markMessageSeen(message.channel, message.ts) : false;
if (seenMessageKey && opts.source === "message" && !wasSeen) {
@@ -305,7 +370,7 @@ export function createSlackMessageHandler(params: {
// Allow exactly one app_mention retry if the same ts was previously dropped
// from the message stream before it reached dispatch.
if (opts.source !== "app_mention" || !consumeAppMentionRetryKey(seenMessageKey)) {
return;
return undefined;
}
}
trackEvent?.();
@@ -342,6 +407,11 @@ export function createSlackMessageHandler(params: {
: {}),
},
});
return dispatchCompletion;
}
return async (message, opts) => {
const dispatchCompletion = await enqueueSlackMessage(message, opts);
await dispatchCompletion?.promise;
};
}