From b534dfa3e0aa4367b174567e3a160905b6a540fc Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 22:05:39 +0000 Subject: [PATCH] fix(slack,web): harden thread hints and monitor tuning --- src/slack/monitor/message-handler/dispatch.ts | 3 + src/slack/monitor/replies.ts | 16 ++-- ...compresses-common-formats-jpeg-cap.test.ts | 86 +++++++++++++++---- ....reconnects-after-connection-close.test.ts | 37 ++++---- src/web/auto-reply/monitor.ts | 7 +- src/web/auto-reply/types.ts | 2 + 6 files changed, 107 insertions(+), 44 deletions(-) diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 3e0a9136e46..d726f804c10 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -40,12 +40,14 @@ export function resolveSlackStreamingThreadHint(params: { replyToMode: "off" | "first" | "all"; incomingThreadTs: string | undefined; messageTs: string | undefined; + isThreadReply?: boolean; }): string | undefined { return resolveSlackThreadTs({ replyToMode: params.replyToMode, incomingThreadTs: params.incomingThreadTs, messageTs: params.messageTs, hasReplied: false, + isThreadReply: params.isThreadReply, }); } @@ -168,6 +170,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag replyToMode: ctx.replyToMode, incomingThreadTs, messageTs, + isThreadReply, }); const useStreaming = shouldUseStreaming({ streamingEnabled, diff --git a/src/slack/monitor/replies.ts b/src/slack/monitor/replies.ts index 38ccc638dfd..bc89942d29c 100644 --- a/src/slack/monitor/replies.ts +++ b/src/slack/monitor/replies.ts @@ -74,17 +74,12 @@ export function resolveSlackThreadTs(params: { hasReplied: boolean; isThreadReply?: boolean; }): string | undefined { - const isThreadReply = - params.isThreadReply ?? - (typeof params.incomingThreadTs === "string" && - params.incomingThreadTs.length > 0 && - params.incomingThreadTs !== params.messageTs); const planner = createSlackReplyReferencePlanner({ replyToMode: params.replyToMode, incomingThreadTs: params.incomingThreadTs, messageTs: params.messageTs, hasReplied: params.hasReplied, - isThreadReply, + isThreadReply: params.isThreadReply, }); return planner.use(); } @@ -101,9 +96,12 @@ function createSlackReplyReferencePlanner(params: { hasReplied?: boolean; isThreadReply?: boolean; }) { - // Only force threading for real user thread replies. If Slack auto-populates - // thread_ts on top-level messages, preserve the configured reply mode. - const effectiveMode = params.isThreadReply ? "all" : params.replyToMode; + // Keep backward-compatible behavior: when a thread id is present and caller + // does not provide explicit classification, stay in thread. Callers that can + // distinguish Slack's auto-populated top-level thread_ts should pass + // `isThreadReply: false` to preserve replyToMode behavior. + const effectiveIsThreadReply = params.isThreadReply ?? Boolean(params.incomingThreadTs); + const effectiveMode = effectiveIsThreadReply ? "all" : params.replyToMode; return createReplyReferencePlanner({ replyToMode: effectiveMode, existingId: params.incomingThreadTs, diff --git a/src/web/auto-reply.web-auto-reply.compresses-common-formats-jpeg-cap.test.ts b/src/web/auto-reply.web-auto-reply.compresses-common-formats-jpeg-cap.test.ts index a4a1d38bf46..6bc441272a5 100644 --- a/src/web/auto-reply.web-auto-reply.compresses-common-formats-jpeg-cap.test.ts +++ b/src/web/auto-reply.web-auto-reply.compresses-common-formats-jpeg-cap.test.ts @@ -16,6 +16,8 @@ installWebAutoReplyTestHomeHooks(); describe("web auto-reply", () => { installWebAutoReplyUnitTestHooks({ pinDns: true }); type ListenerFactory = NonNullable[1]>; + const SMALL_MEDIA_CAP_MB = 0.1; + const SMALL_MEDIA_CAP_BYTES = Math.floor(SMALL_MEDIA_CAP_MB * 1024 * 1024); async function setupSingleInboundMessage(params: { resolverValue: { text: string; mediaUrl: string }; @@ -37,7 +39,12 @@ describe("web auto-reply", () => { return { reply, - dispatch: async (id = "msg1") => { + dispatch: async ( + id = "msg1", + overrides?: Partial< + Pick + >, + ) => { await capturedOnMessage?.({ body: "hello", from: "+1", @@ -46,6 +53,7 @@ describe("web auto-reply", () => { accountId: "default", chatType: "direct", chatId: "+1", + ...overrides, id, sendComposing, reply, @@ -143,39 +151,87 @@ describe("web auto-reply", () => { }, ] as const; - const width = 1150; - const height = 1150; + const width = 360; + const height = 360; const sharedRaw = crypto.randomBytes(width * height * 3); - for (const fmt of formats) { - const big = await fmt.make(sharedRaw, { width, height }); - expect(big.length).toBeGreaterThan(1 * 1024 * 1024); - await expectCompressedImageWithinCap({ - mediaUrl: `https://example.com/big.${fmt.name}`, - mime: fmt.mime, - image: big, - messageId: `msg-${fmt.name}`, + const renderedFormats = await Promise.all( + formats.map(async (fmt) => ({ + ...fmt, + image: await fmt.make(sharedRaw, { width, height }), + })), + ); + + await withMediaCap(SMALL_MEDIA_CAP_MB, async () => { + const sendMedia = vi.fn(); + const { reply, dispatch } = await setupSingleInboundMessage({ + resolverValue: { + text: "hi", + mediaUrl: "https://example.com/big.image", + }, + sendMedia, }); - } + let fetchIndex = 0; + + const fetchMock = vi.spyOn(globalThis, "fetch").mockImplementation(async () => { + const matched = + renderedFormats[Math.min(fetchIndex, renderedFormats.length - 1)] ?? renderedFormats[0]; + fetchIndex += 1; + const { image, mime } = matched; + return { + ok: true, + body: true, + arrayBuffer: async () => + image.buffer.slice(image.byteOffset, image.byteOffset + image.byteLength), + headers: { get: () => mime }, + status: 200, + } as unknown as Response; + }); + + try { + for (const [index, fmt] of renderedFormats.entries()) { + expect(fmt.image.length).toBeGreaterThan(SMALL_MEDIA_CAP_BYTES); + const beforeCalls = sendMedia.mock.calls.length; + await dispatch(`msg-${fmt.name}-${index}`, { + from: `+1${index}`, + conversationId: `conv-${index}`, + chatId: `conv-${index}`, + }); + expect(sendMedia).toHaveBeenCalledTimes(beforeCalls + 1); + const payload = sendMedia.mock.calls[beforeCalls]?.[0] as { + image: Buffer; + caption?: string; + mimetype?: string; + }; + expect(payload.image.length).toBeLessThanOrEqual(SMALL_MEDIA_CAP_BYTES); + expect(payload.mimetype).toBe("image/jpeg"); + } + expect(sendMedia).toHaveBeenCalledTimes(renderedFormats.length); + expect(reply).not.toHaveBeenCalled(); + } finally { + fetchMock.mockRestore(); + } + }); }); it("honors mediaMaxMb from config", async () => { const bigPng = await sharp({ create: { - width: 1200, - height: 1200, + width: 256, + height: 256, channels: 3, background: { r: 0, g: 0, b: 255 }, }, }) .png({ compressionLevel: 0 }) .toBuffer(); - expect(bigPng.length).toBeGreaterThan(1 * 1024 * 1024); + expect(bigPng.length).toBeGreaterThan(SMALL_MEDIA_CAP_BYTES); await expectCompressedImageWithinCap({ mediaUrl: "https://example.com/big.png", mime: "image/png", image: bigPng, messageId: "msg1", + mediaMaxMb: SMALL_MEDIA_CAP_MB, }); }); it("falls back to text when media is unsupported", async () => { diff --git a/src/web/auto-reply.web-auto-reply.reconnects-after-connection-close.test.ts b/src/web/auto-reply.web-auto-reply.reconnects-after-connection-close.test.ts index f2e545fc680..a599429ba12 100644 --- a/src/web/auto-reply.web-auto-reply.reconnects-after-connection-close.test.ts +++ b/src/web/auto-reply.web-auto-reply.reconnects-after-connection-close.test.ts @@ -26,6 +26,8 @@ function startMonitorWebChannel(params: { sleep: ReturnType; signal?: AbortSignal; heartbeatSeconds?: number; + messageTimeoutMs?: number; + watchdogCheckMs?: number; reconnect?: { initialMs: number; maxMs: number; maxAttempts: number; factor: number }; }) { const runtime = createRuntime(); @@ -39,6 +41,8 @@ function startMonitorWebChannel(params: { params.signal ?? controller.signal, { heartbeatSeconds: params.heartbeatSeconds ?? 1, + messageTimeoutMs: params.messageTimeoutMs, + watchdogCheckMs: params.watchdogCheckMs, reconnect: params.reconnect ?? { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 }, sleep: params.sleep, }, @@ -127,7 +131,6 @@ describe("web auto-reply", () => { try { const sleep = vi.fn(async () => {}); const closeResolvers: Array<(reason: unknown) => void> = []; - const signalCloseSpy = vi.fn(); let capturedOnMessage: | ((msg: import("./inbound.js").WebInboundMessage) => Promise) | undefined; @@ -144,22 +147,27 @@ describe("web auto-reply", () => { return { close: vi.fn(), onClose, - signalClose: (reason?: unknown) => { - signalCloseSpy(reason); - resolveClose(reason); - }, + signalClose: (reason?: unknown) => resolveClose(reason), }; }, ); - const { runtime, controller, run } = startMonitorWebChannel({ + const { controller, run } = startMonitorWebChannel({ monitorWebChannelFn: monitorWebChannel as never, listenerFactory, sleep, heartbeatSeconds: 60, + messageTimeoutMs: 30, + watchdogCheckMs: 5, }); await Promise.resolve(); expect(listenerFactory).toHaveBeenCalledTimes(1); + await vi.waitFor( + () => { + expect(capturedOnMessage).toBeTypeOf("function"); + }, + { timeout: 500, interval: 5 }, + ); const reply = vi.fn().mockResolvedValue(undefined); const sendComposing = vi.fn(); @@ -179,19 +187,14 @@ describe("web auto-reply", () => { }), ); - await vi.advanceTimersByTimeAsync(31 * 60 * 1000); + await vi.advanceTimersByTimeAsync(200); await Promise.resolve(); - - await vi.advanceTimersByTimeAsync(1); - expect(signalCloseSpy).toHaveBeenCalledWith( - expect.objectContaining({ status: 499, isLoggedOut: false, error: "watchdog-timeout" }), + await vi.waitFor( + () => { + expect(listenerFactory).toHaveBeenCalledTimes(2); + }, + { timeout: 500, interval: 5 }, ); - for (let i = 0; i < 20 && listenerFactory.mock.calls.length < 2; i += 1) { - await vi.advanceTimersByTimeAsync(50); - await Promise.resolve(); - } - expect(listenerFactory.mock.calls.length).toBeGreaterThanOrEqual(2); - expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("Retry 1")); controller.abort(); closeResolvers[1]?.({ status: 499, isLoggedOut: false }); diff --git a/src/web/auto-reply/monitor.ts b/src/web/auto-reply/monitor.ts index 8ee3c8a85c2..cab3490fedd 100644 --- a/src/web/auto-reply/monitor.ts +++ b/src/web/auto-reply/monitor.ts @@ -154,9 +154,10 @@ export async function monitorWebChannel( let _lastInboundMsg: WebInboundMsg | null = null; let unregisterUnhandled: (() => void) | null = null; - // Watchdog to detect stuck message processing (e.g., event emitter died) - const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages - const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute + // Watchdog to detect stuck message processing (e.g., event emitter died). + // Tuning overrides are test-oriented; production defaults remain unchanged. + const MESSAGE_TIMEOUT_MS = tuning.messageTimeoutMs ?? 30 * 60 * 1000; // 30m default + const WATCHDOG_CHECK_MS = tuning.watchdogCheckMs ?? 60 * 1000; // 1m default const backgroundTasks = new Set>(); const onMessage = createWebOnMessageHandler({ diff --git a/src/web/auto-reply/types.ts b/src/web/auto-reply/types.ts index cb6ce4ce415..df3d19e021a 100644 --- a/src/web/auto-reply/types.ts +++ b/src/web/auto-reply/types.ts @@ -26,6 +26,8 @@ export type WebChannelStatus = { export type WebMonitorTuning = { reconnect?: Partial; heartbeatSeconds?: number; + messageTimeoutMs?: number; + watchdogCheckMs?: number; sleep?: (ms: number, signal?: AbortSignal) => Promise; statusSink?: (status: WebChannelStatus) => void; /** WhatsApp account id. Default: "default". */