From 8d0bed458ee58a3f74e3b97acf4aad3d5aec1c54 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 3 Apr 2026 23:04:34 +0900 Subject: [PATCH] refactor: simplify reply-threading and test helpers --- .../tool-result-truncation.test.ts | 28 ++--- .../transcript-rewrite.test.ts | 28 ++--- src/auto-reply/reply/reply-flow.test.ts | 67 +---------- src/auto-reply/reply/reply-threading.test.ts | 113 ++++++++++++++++++ src/auto-reply/reply/reply-threading.ts | 100 ++++++++++++++-- src/auto-reply/reply/session.test.ts | 14 +-- .../chat.directive-tags.test.ts | 18 ++- .../session-write-lock-module-mock.ts | 28 +++++ 8 files changed, 277 insertions(+), 119 deletions(-) create mode 100644 src/auto-reply/reply/reply-threading.test.ts create mode 100644 src/test-utils/session-write-lock-module-mock.ts diff --git a/src/agents/pi-embedded-runner/tool-result-truncation.test.ts b/src/agents/pi-embedded-runner/tool-result-truncation.test.ts index 69c84aa8a20..d81f13ecb71 100644 --- a/src/agents/pi-embedded-runner/tool-result-truncation.test.ts +++ b/src/agents/pi-embedded-runner/tool-result-truncation.test.ts @@ -2,6 +2,10 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { AssistantMessage, ToolResultMessage, UserMessage } from "@mariozechner/pi-ai"; import { SessionManager } from "@mariozechner/pi-coding-agent"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + buildSessionWriteLockModuleMock, + resetModulesWithSessionWriteLockDoMock, +} from "../../test-utils/session-write-lock-module-mock.js"; import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js"; const acquireSessionWriteLockReleaseMock = vi.hoisted(() => vi.fn(async () => {})); @@ -9,13 +13,12 @@ const acquireSessionWriteLockMock = vi.hoisted(() => vi.fn(async (_params?: unknown) => ({ release: acquireSessionWriteLockReleaseMock })), ); -vi.mock("../session-write-lock.js", async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, - acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params), - }; -}); +vi.mock("../session-write-lock.js", (importOriginal) => + buildSessionWriteLockModuleMock( + importOriginal as () => Promise, + (params) => acquireSessionWriteLockMock(params), + ), +); let truncateToolResultText: typeof import("./tool-result-truncation.js").truncateToolResultText; let truncateToolResultMessage: typeof import("./tool-result-truncation.js").truncateToolResultMessage; @@ -29,14 +32,9 @@ let HARD_MAX_TOOL_RESULT_CHARS: typeof import("./tool-result-truncation.js").HAR let onSessionTranscriptUpdate: typeof import("../../sessions/transcript-events.js").onSessionTranscriptUpdate; async function loadFreshToolResultTruncationModuleForTest() { - vi.resetModules(); - vi.doMock("../session-write-lock.js", async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, - acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params), - }; - }); + resetModulesWithSessionWriteLockDoMock("../session-write-lock.js", (params) => + acquireSessionWriteLockMock(params), + ); ({ onSessionTranscriptUpdate } = await import("../../sessions/transcript-events.js")); ({ truncateToolResultText, diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts index 4d99fbb0625..db7bb00c709 100644 --- a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts +++ b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts @@ -1,19 +1,22 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { SessionManager } from "@mariozechner/pi-coding-agent"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + buildSessionWriteLockModuleMock, + resetModulesWithSessionWriteLockDoMock, +} from "../../test-utils/session-write-lock-module-mock.js"; const acquireSessionWriteLockReleaseMock = vi.hoisted(() => vi.fn(async () => {})); const acquireSessionWriteLockMock = vi.hoisted(() => vi.fn(async (_params?: unknown) => ({ release: acquireSessionWriteLockReleaseMock })), ); -vi.mock("../session-write-lock.js", async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, - acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params), - }; -}); +vi.mock("../session-write-lock.js", (importOriginal) => + buildSessionWriteLockModuleMock( + importOriginal as () => Promise, + (params) => acquireSessionWriteLockMock(params), + ), +); let rewriteTranscriptEntriesInSessionFile: typeof import("./transcript-rewrite.js").rewriteTranscriptEntriesInSessionFile; let rewriteTranscriptEntriesInSessionManager: typeof import("./transcript-rewrite.js").rewriteTranscriptEntriesInSessionManager; @@ -21,14 +24,9 @@ let onSessionTranscriptUpdate: typeof import("../../sessions/transcript-events.j let installSessionToolResultGuard: typeof import("../session-tool-result-guard.js").installSessionToolResultGuard; async function loadFreshTranscriptRewriteModuleForTest() { - vi.resetModules(); - vi.doMock("../session-write-lock.js", async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, - acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params), - }; - }); + resetModulesWithSessionWriteLockDoMock("../session-write-lock.js", (params) => + acquireSessionWriteLockMock(params), + ); ({ onSessionTranscriptUpdate } = await import("../../sessions/transcript-events.js")); ({ installSessionToolResultGuard } = await import("../session-tool-result-guard.js")); ({ rewriteTranscriptEntriesInSessionFile, rewriteTranscriptEntriesInSessionManager } = diff --git a/src/auto-reply/reply/reply-flow.test.ts b/src/auto-reply/reply/reply-flow.test.ts index 4795a07905c..5aca9f5f469 100644 --- a/src/auto-reply/reply/reply-flow.test.ts +++ b/src/auto-reply/reply/reply-flow.test.ts @@ -15,7 +15,7 @@ import { scheduleFollowupDrain, } from "./queue.js"; import { createReplyDispatcher } from "./reply-dispatcher.js"; -import { createReplyToModeFilter, resolveReplyToMode } from "./reply-threading.js"; +import { createReplyToModeFilter } from "./reply-threading.js"; import { parseSlackDirectives, hasSlackDirectives } from "./slack-directives.js"; describe("normalizeInboundTextNewlines", () => { @@ -1778,8 +1778,6 @@ describe("followup queue drain restart after idle window", () => { }); }); -const emptyCfg = {} as OpenClawConfig; - describe("createReplyDispatcher", () => { it("drops empty payloads and exact silent tokens without media", async () => { const deliver = vi.fn().mockResolvedValue(undefined); @@ -1963,69 +1961,6 @@ describe("createReplyDispatcher", () => { }); }); -describe("resolveReplyToMode", () => { - it("falls back to all when channel threading plugins are unavailable", () => { - const configuredCfg = { - channels: { - telegram: { replyToMode: "all" }, - discord: { replyToMode: "first" }, - slack: { replyToMode: "all" }, - }, - } as OpenClawConfig; - const chatTypeCfg = { - channels: { - slack: { - replyToMode: "off", - replyToModeByChatType: { direct: "all", group: "first" }, - }, - }, - } as OpenClawConfig; - const topLevelFallbackCfg = { - channels: { - slack: { - replyToMode: "first", - }, - }, - } as OpenClawConfig; - const legacyDmCfg = { - channels: { - slack: { - replyToMode: "off", - dm: { replyToMode: "all" }, - }, - }, - } as OpenClawConfig; - - const cases: Array<{ - cfg: OpenClawConfig; - channel?: "telegram" | "discord" | "slack"; - chatType?: "direct" | "group" | "channel"; - expected: "off" | "all" | "first"; - }> = [ - { cfg: emptyCfg, channel: "telegram", expected: "all" }, - { cfg: emptyCfg, channel: "discord", expected: "all" }, - { cfg: emptyCfg, channel: "slack", expected: "all" }, - { cfg: emptyCfg, channel: undefined, expected: "all" }, - { cfg: configuredCfg, channel: "telegram", expected: "all" }, - { cfg: configuredCfg, channel: "discord", expected: "all" }, - { cfg: configuredCfg, channel: "slack", expected: "all" }, - { cfg: chatTypeCfg, channel: "slack", chatType: "direct", expected: "all" }, - { cfg: chatTypeCfg, channel: "slack", chatType: "group", expected: "all" }, - { cfg: chatTypeCfg, channel: "slack", chatType: "channel", expected: "all" }, - { cfg: chatTypeCfg, channel: "slack", chatType: undefined, expected: "all" }, - { cfg: topLevelFallbackCfg, channel: "slack", chatType: "direct", expected: "all" }, - { cfg: topLevelFallbackCfg, channel: "slack", chatType: "channel", expected: "all" }, - { cfg: legacyDmCfg, channel: "slack", chatType: "direct", expected: "all" }, - { cfg: legacyDmCfg, channel: "slack", chatType: "channel", expected: "all" }, - ]; - for (const testCase of cases) { - expect(resolveReplyToMode(testCase.cfg, testCase.channel, null, testCase.chatType)).toBe( - testCase.expected, - ); - } - }); -}); - describe("createReplyToModeFilter", () => { it("handles off/all mode behavior for replyToId", () => { const cases: Array<{ diff --git a/src/auto-reply/reply/reply-threading.test.ts b/src/auto-reply/reply/reply-threading.test.ts new file mode 100644 index 00000000000..648ba956f0f --- /dev/null +++ b/src/auto-reply/reply/reply-threading.test.ts @@ -0,0 +1,113 @@ +import { describe, expect, it } from "vitest"; +import type { OpenClawConfig } from "../../config/config.js"; +import { + resolveConfiguredReplyToMode, + resolveReplyToMode, + resolveReplyToModeWithThreading, +} from "./reply-threading.js"; + +const emptyCfg = {} as OpenClawConfig; + +describe("resolveReplyToMode", () => { + it("falls back to configured channel defaults when channel threading plugins are unavailable", () => { + const configuredCfg = { + channels: { + telegram: { replyToMode: "all" }, + discord: { replyToMode: "first" }, + slack: { replyToMode: "all" }, + }, + } as OpenClawConfig; + const chatTypeCfg = { + channels: { + slack: { + replyToMode: "off", + replyToModeByChatType: { direct: "all", group: "first" }, + }, + }, + } as OpenClawConfig; + const topLevelFallbackCfg = { + channels: { + slack: { + replyToMode: "first", + }, + }, + } as OpenClawConfig; + const legacyDmCfg = { + channels: { + slack: { + replyToMode: "off", + dm: { replyToMode: "all" }, + }, + }, + } as OpenClawConfig; + + const cases: Array<{ + cfg: OpenClawConfig; + channel?: "telegram" | "discord" | "slack"; + chatType?: "direct" | "group" | "channel"; + expected: "off" | "all" | "first"; + }> = [ + { cfg: emptyCfg, channel: "telegram", expected: "all" }, + { cfg: emptyCfg, channel: "discord", expected: "all" }, + { cfg: emptyCfg, channel: "slack", expected: "all" }, + { cfg: emptyCfg, channel: undefined, expected: "all" }, + { cfg: configuredCfg, channel: "telegram", expected: "all" }, + { cfg: configuredCfg, channel: "discord", expected: "first" }, + { cfg: configuredCfg, channel: "slack", expected: "all" }, + { cfg: chatTypeCfg, channel: "slack", chatType: "direct", expected: "all" }, + { cfg: chatTypeCfg, channel: "slack", chatType: "group", expected: "first" }, + { cfg: chatTypeCfg, channel: "slack", chatType: "channel", expected: "off" }, + { cfg: chatTypeCfg, channel: "slack", chatType: undefined, expected: "off" }, + { cfg: topLevelFallbackCfg, channel: "slack", chatType: "direct", expected: "first" }, + { cfg: topLevelFallbackCfg, channel: "slack", chatType: "channel", expected: "first" }, + { cfg: legacyDmCfg, channel: "slack", chatType: "direct", expected: "all" }, + { cfg: legacyDmCfg, channel: "slack", chatType: "channel", expected: "off" }, + ]; + for (const testCase of cases) { + expect(resolveReplyToMode(testCase.cfg, testCase.channel, null, testCase.chatType)).toBe( + testCase.expected, + ); + } + }); + + it("prefers plugin threading adapters over config fallback when available", () => { + expect( + resolveReplyToModeWithThreading( + { + channels: { + slack: { + replyToMode: "off", + }, + }, + } as OpenClawConfig, + { + resolveReplyToMode: () => "first", + }, + { + channel: "slack", + accountId: "acct-1", + chatType: "direct", + }, + ), + ).toBe("first"); + }); +}); + +describe("resolveConfiguredReplyToMode", () => { + it("handles top-level, chat-type, and legacy DM fallback without plugin registry access", () => { + const cfg = { + channels: { + slack: { + replyToMode: "off", + replyToModeByChatType: { direct: "all", group: "first" }, + dm: { replyToMode: "all" }, + }, + }, + } as OpenClawConfig; + + expect(resolveConfiguredReplyToMode(cfg, "slack", "direct")).toBe("all"); + expect(resolveConfiguredReplyToMode(cfg, "slack", "group")).toBe("first"); + expect(resolveConfiguredReplyToMode(cfg, "slack", "channel")).toBe("off"); + expect(resolveConfiguredReplyToMode(cfg, "slack", undefined)).toBe("off"); + }); +}); diff --git a/src/auto-reply/reply/reply-threading.ts b/src/auto-reply/reply/reply-threading.ts index 5c0e1e423bc..c422c7f001f 100644 --- a/src/auto-reply/reply/reply-threading.ts +++ b/src/auto-reply/reply/reply-threading.ts @@ -1,25 +1,103 @@ -import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js"; +import { + getChannelPlugin, + normalizeChannelId as normalizePluginChannelId, +} from "../../channels/plugins/index.js"; +import type { ChannelThreadingAdapter } from "../../channels/plugins/types.core.js"; +import { normalizeChannelId as normalizeBuiltInChannelId } from "../../channels/registry.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { ReplyToMode } from "../../config/types.js"; import type { OriginatingChannelType } from "../templating.js"; import type { ReplyPayload } from "../types.js"; +type ReplyToModeChannelConfig = { + replyToMode?: ReplyToMode; + replyToModeByChatType?: Partial>; + dm?: { + replyToMode?: ReplyToMode; + }; +}; + +function normalizeReplyToModeChatType( + chatType?: string | null, +): "direct" | "group" | "channel" | undefined { + return chatType === "direct" || chatType === "group" || chatType === "channel" + ? chatType + : undefined; +} + +function resolveReplyToModeChannelKey(channel?: OriginatingChannelType): string | undefined { + if (typeof channel !== "string") { + return undefined; + } + return ( + (normalizeBuiltInChannelId(channel) ?? + normalizePluginChannelId(channel) ?? + channel.trim().toLowerCase()) || + undefined + ); +} + +export function resolveConfiguredReplyToMode( + cfg: OpenClawConfig, + channel?: OriginatingChannelType, + chatType?: string | null, +): ReplyToMode { + const provider = resolveReplyToModeChannelKey(channel); + if (!provider) { + return "all"; + } + const channelConfig = (cfg.channels as Record | undefined)?.[ + provider + ]; + const normalizedChatType = normalizeReplyToModeChatType(chatType); + if (normalizedChatType) { + const scopedMode = channelConfig?.replyToModeByChatType?.[normalizedChatType]; + if (scopedMode !== undefined) { + return scopedMode; + } + } + if (normalizedChatType === "direct") { + const legacyDirectMode = channelConfig?.dm?.replyToMode; + if (legacyDirectMode !== undefined) { + return legacyDirectMode; + } + } + return channelConfig?.replyToMode ?? "all"; +} + +export function resolveReplyToModeWithThreading( + cfg: OpenClawConfig, + threading: ChannelThreadingAdapter | undefined, + params: { + channel?: OriginatingChannelType; + accountId?: string | null; + chatType?: string | null; + } = {}, +): ReplyToMode { + const resolved = threading?.resolveReplyToMode?.({ + cfg, + accountId: params.accountId, + chatType: params.chatType, + }); + return resolved ?? resolveConfiguredReplyToMode(cfg, params.channel, params.chatType); +} + export function resolveReplyToMode( cfg: OpenClawConfig, channel?: OriginatingChannelType, accountId?: string | null, chatType?: string | null, ): ReplyToMode { - const provider = normalizeChannelId(channel); - if (!provider) { - return "all"; - } - const resolved = getChannelPlugin(provider)?.threading?.resolveReplyToMode?.({ + const provider = normalizePluginChannelId(channel); + return resolveReplyToModeWithThreading( cfg, - accountId, - chatType, - }); - return resolved ?? "all"; + provider ? getChannelPlugin(provider)?.threading : undefined, + { + channel, + accountId, + chatType, + }, + ); } export function createReplyToModeFilter( @@ -70,7 +148,7 @@ export function createReplyToModeFilterForChannel( mode: ReplyToMode, channel?: OriginatingChannelType, ) { - const provider = normalizeChannelId(channel); + const provider = normalizePluginChannelId(channel); const normalized = typeof channel === "string" ? channel.trim().toLowerCase() : undefined; const isWebchat = normalized === "webchat"; // Default: allow explicit reply tags/directives even when replyToMode is "off". diff --git a/src/auto-reply/reply/session.test.ts b/src/auto-reply/reply/session.test.ts index a70cabca8fc..e24262a0fa9 100644 --- a/src/auto-reply/reply/session.test.ts +++ b/src/auto-reply/reply/session.test.ts @@ -21,18 +21,18 @@ import { createChannelTestPluginBase, createTestRegistry, } from "../../test-utils/channel-plugins.js"; +import { buildSessionWriteLockModuleMock } from "../../test-utils/session-write-lock-module-mock.js"; import { drainFormattedSystemEvents } from "./session-updates.js"; import { persistSessionUsageUpdate } from "./session-usage.js"; import { initSessionState } from "./session.js"; // Perf: session-store locks are exercised elsewhere; most session tests don't need FS lock files. -vi.mock("../../agents/session-write-lock.js", async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, - acquireSessionWriteLock: async () => ({ release: async () => {} }), - }; -}); +vi.mock("../../agents/session-write-lock.js", (importOriginal) => + buildSessionWriteLockModuleMock( + importOriginal as () => Promise, + async () => ({ release: async () => {} }), + ), +); vi.mock("../../agents/model-catalog.js", () => ({ loadModelCatalog: vi.fn(async () => [ diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index 0b688880642..2b1965ee555 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -246,6 +246,7 @@ function createChatContext(): Pick< } type ChatContext = ReturnType; +type NonStreamingChatSendWaitFor = "broadcast" | "dedupe" | "none"; async function runNonStreamingChatSend(params: { context: ChatContext; @@ -259,6 +260,7 @@ async function runNonStreamingChatSend(params: { requestParams?: Record; waitForCompletion?: boolean; waitForDedupe?: boolean; + waitFor?: NonStreamingChatSendWaitFor; }) { const sendParams: { sessionKey: string; @@ -287,11 +289,17 @@ async function runNonStreamingChatSend(params: { context: params.context as GatewayRequestContext, }); - const shouldExpectBroadcast = params.expectBroadcast ?? true; - if (!shouldExpectBroadcast) { - if (params.waitForCompletion === false || params.waitForDedupe === false) { - return undefined; - } + const waitFor = + params.waitFor ?? + (params.waitForCompletion === false || params.waitForDedupe === false + ? "none" + : params.expectBroadcast === false + ? "dedupe" + : "broadcast"); + if (waitFor === "none") { + return undefined; + } + if (waitFor === "dedupe") { await waitForAssertion(() => { expect(params.context.dedupe.has(`chat:${params.idempotencyKey}`)).toBe(true); }); diff --git a/src/test-utils/session-write-lock-module-mock.ts b/src/test-utils/session-write-lock-module-mock.ts new file mode 100644 index 00000000000..0155b3b1411 --- /dev/null +++ b/src/test-utils/session-write-lock-module-mock.ts @@ -0,0 +1,28 @@ +import { vi } from "vitest"; +import type * as SessionWriteLockModule from "../agents/session-write-lock.js"; + +type SessionWriteLockModuleShape = typeof SessionWriteLockModule; + +export async function buildSessionWriteLockModuleMock( + importOriginal: () => Promise, + acquireSessionWriteLock: SessionWriteLockModuleShape["acquireSessionWriteLock"], +): Promise { + const original = await importOriginal(); + return { + ...original, + acquireSessionWriteLock, + }; +} + +export function resetModulesWithSessionWriteLockDoMock( + modulePath: string, + acquireSessionWriteLock: SessionWriteLockModuleShape["acquireSessionWriteLock"], +): void { + vi.resetModules(); + vi.doMock(modulePath, (importOriginal) => + buildSessionWriteLockModuleMock( + importOriginal as () => Promise, + acquireSessionWriteLock, + ), + ); +}