refactor: simplify reply-threading and test helpers

This commit is contained in:
Peter Steinberger
2026-04-03 23:04:34 +09:00
parent 1125cd3b97
commit 8d0bed458e
8 changed files with 277 additions and 119 deletions

View File

@@ -2,6 +2,10 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage, ToolResultMessage, UserMessage } from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
buildSessionWriteLockModuleMock,
resetModulesWithSessionWriteLockDoMock,
} from "../../test-utils/session-write-lock-module-mock.js";
import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js";
const acquireSessionWriteLockReleaseMock = vi.hoisted(() => vi.fn(async () => {}));
@@ -9,13 +13,12 @@ const acquireSessionWriteLockMock = vi.hoisted(() =>
vi.fn(async (_params?: unknown) => ({ release: acquireSessionWriteLockReleaseMock })),
);
vi.mock("../session-write-lock.js", async (importOriginal) => {
const original = await importOriginal<typeof import("../session-write-lock.js")>();
return {
...original,
acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params),
};
});
vi.mock("../session-write-lock.js", (importOriginal) =>
buildSessionWriteLockModuleMock(
importOriginal as () => Promise<typeof import("../session-write-lock.js")>,
(params) => acquireSessionWriteLockMock(params),
),
);
let truncateToolResultText: typeof import("./tool-result-truncation.js").truncateToolResultText;
let truncateToolResultMessage: typeof import("./tool-result-truncation.js").truncateToolResultMessage;
@@ -29,14 +32,9 @@ let HARD_MAX_TOOL_RESULT_CHARS: typeof import("./tool-result-truncation.js").HAR
let onSessionTranscriptUpdate: typeof import("../../sessions/transcript-events.js").onSessionTranscriptUpdate;
async function loadFreshToolResultTruncationModuleForTest() {
vi.resetModules();
vi.doMock("../session-write-lock.js", async (importOriginal) => {
const original = await importOriginal<typeof import("../session-write-lock.js")>();
return {
...original,
acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params),
};
});
resetModulesWithSessionWriteLockDoMock("../session-write-lock.js", (params) =>
acquireSessionWriteLockMock(params),
);
({ onSessionTranscriptUpdate } = await import("../../sessions/transcript-events.js"));
({
truncateToolResultText,

View File

@@ -1,19 +1,22 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
buildSessionWriteLockModuleMock,
resetModulesWithSessionWriteLockDoMock,
} from "../../test-utils/session-write-lock-module-mock.js";
const acquireSessionWriteLockReleaseMock = vi.hoisted(() => vi.fn(async () => {}));
const acquireSessionWriteLockMock = vi.hoisted(() =>
vi.fn(async (_params?: unknown) => ({ release: acquireSessionWriteLockReleaseMock })),
);
vi.mock("../session-write-lock.js", async (importOriginal) => {
const original = await importOriginal<typeof import("../session-write-lock.js")>();
return {
...original,
acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params),
};
});
vi.mock("../session-write-lock.js", (importOriginal) =>
buildSessionWriteLockModuleMock(
importOriginal as () => Promise<typeof import("../session-write-lock.js")>,
(params) => acquireSessionWriteLockMock(params),
),
);
let rewriteTranscriptEntriesInSessionFile: typeof import("./transcript-rewrite.js").rewriteTranscriptEntriesInSessionFile;
let rewriteTranscriptEntriesInSessionManager: typeof import("./transcript-rewrite.js").rewriteTranscriptEntriesInSessionManager;
@@ -21,14 +24,9 @@ let onSessionTranscriptUpdate: typeof import("../../sessions/transcript-events.j
let installSessionToolResultGuard: typeof import("../session-tool-result-guard.js").installSessionToolResultGuard;
async function loadFreshTranscriptRewriteModuleForTest() {
vi.resetModules();
vi.doMock("../session-write-lock.js", async (importOriginal) => {
const original = await importOriginal<typeof import("../session-write-lock.js")>();
return {
...original,
acquireSessionWriteLock: (params: unknown) => acquireSessionWriteLockMock(params),
};
});
resetModulesWithSessionWriteLockDoMock("../session-write-lock.js", (params) =>
acquireSessionWriteLockMock(params),
);
({ onSessionTranscriptUpdate } = await import("../../sessions/transcript-events.js"));
({ installSessionToolResultGuard } = await import("../session-tool-result-guard.js"));
({ rewriteTranscriptEntriesInSessionFile, rewriteTranscriptEntriesInSessionManager } =

View File

@@ -15,7 +15,7 @@ import {
scheduleFollowupDrain,
} from "./queue.js";
import { createReplyDispatcher } from "./reply-dispatcher.js";
import { createReplyToModeFilter, resolveReplyToMode } from "./reply-threading.js";
import { createReplyToModeFilter } from "./reply-threading.js";
import { parseSlackDirectives, hasSlackDirectives } from "./slack-directives.js";
describe("normalizeInboundTextNewlines", () => {
@@ -1778,8 +1778,6 @@ describe("followup queue drain restart after idle window", () => {
});
});
const emptyCfg = {} as OpenClawConfig;
describe("createReplyDispatcher", () => {
it("drops empty payloads and exact silent tokens without media", async () => {
const deliver = vi.fn().mockResolvedValue(undefined);
@@ -1963,69 +1961,6 @@ describe("createReplyDispatcher", () => {
});
});
describe("resolveReplyToMode", () => {
it("falls back to all when channel threading plugins are unavailable", () => {
const configuredCfg = {
channels: {
telegram: { replyToMode: "all" },
discord: { replyToMode: "first" },
slack: { replyToMode: "all" },
},
} as OpenClawConfig;
const chatTypeCfg = {
channels: {
slack: {
replyToMode: "off",
replyToModeByChatType: { direct: "all", group: "first" },
},
},
} as OpenClawConfig;
const topLevelFallbackCfg = {
channels: {
slack: {
replyToMode: "first",
},
},
} as OpenClawConfig;
const legacyDmCfg = {
channels: {
slack: {
replyToMode: "off",
dm: { replyToMode: "all" },
},
},
} as OpenClawConfig;
const cases: Array<{
cfg: OpenClawConfig;
channel?: "telegram" | "discord" | "slack";
chatType?: "direct" | "group" | "channel";
expected: "off" | "all" | "first";
}> = [
{ cfg: emptyCfg, channel: "telegram", expected: "all" },
{ cfg: emptyCfg, channel: "discord", expected: "all" },
{ cfg: emptyCfg, channel: "slack", expected: "all" },
{ cfg: emptyCfg, channel: undefined, expected: "all" },
{ cfg: configuredCfg, channel: "telegram", expected: "all" },
{ cfg: configuredCfg, channel: "discord", expected: "all" },
{ cfg: configuredCfg, channel: "slack", expected: "all" },
{ cfg: chatTypeCfg, channel: "slack", chatType: "direct", expected: "all" },
{ cfg: chatTypeCfg, channel: "slack", chatType: "group", expected: "all" },
{ cfg: chatTypeCfg, channel: "slack", chatType: "channel", expected: "all" },
{ cfg: chatTypeCfg, channel: "slack", chatType: undefined, expected: "all" },
{ cfg: topLevelFallbackCfg, channel: "slack", chatType: "direct", expected: "all" },
{ cfg: topLevelFallbackCfg, channel: "slack", chatType: "channel", expected: "all" },
{ cfg: legacyDmCfg, channel: "slack", chatType: "direct", expected: "all" },
{ cfg: legacyDmCfg, channel: "slack", chatType: "channel", expected: "all" },
];
for (const testCase of cases) {
expect(resolveReplyToMode(testCase.cfg, testCase.channel, null, testCase.chatType)).toBe(
testCase.expected,
);
}
});
});
describe("createReplyToModeFilter", () => {
it("handles off/all mode behavior for replyToId", () => {
const cases: Array<{

View File

@@ -0,0 +1,113 @@
import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import {
resolveConfiguredReplyToMode,
resolveReplyToMode,
resolveReplyToModeWithThreading,
} from "./reply-threading.js";
const emptyCfg = {} as OpenClawConfig;
describe("resolveReplyToMode", () => {
it("falls back to configured channel defaults when channel threading plugins are unavailable", () => {
const configuredCfg = {
channels: {
telegram: { replyToMode: "all" },
discord: { replyToMode: "first" },
slack: { replyToMode: "all" },
},
} as OpenClawConfig;
const chatTypeCfg = {
channels: {
slack: {
replyToMode: "off",
replyToModeByChatType: { direct: "all", group: "first" },
},
},
} as OpenClawConfig;
const topLevelFallbackCfg = {
channels: {
slack: {
replyToMode: "first",
},
},
} as OpenClawConfig;
const legacyDmCfg = {
channels: {
slack: {
replyToMode: "off",
dm: { replyToMode: "all" },
},
},
} as OpenClawConfig;
const cases: Array<{
cfg: OpenClawConfig;
channel?: "telegram" | "discord" | "slack";
chatType?: "direct" | "group" | "channel";
expected: "off" | "all" | "first";
}> = [
{ cfg: emptyCfg, channel: "telegram", expected: "all" },
{ cfg: emptyCfg, channel: "discord", expected: "all" },
{ cfg: emptyCfg, channel: "slack", expected: "all" },
{ cfg: emptyCfg, channel: undefined, expected: "all" },
{ cfg: configuredCfg, channel: "telegram", expected: "all" },
{ cfg: configuredCfg, channel: "discord", expected: "first" },
{ cfg: configuredCfg, channel: "slack", expected: "all" },
{ cfg: chatTypeCfg, channel: "slack", chatType: "direct", expected: "all" },
{ cfg: chatTypeCfg, channel: "slack", chatType: "group", expected: "first" },
{ cfg: chatTypeCfg, channel: "slack", chatType: "channel", expected: "off" },
{ cfg: chatTypeCfg, channel: "slack", chatType: undefined, expected: "off" },
{ cfg: topLevelFallbackCfg, channel: "slack", chatType: "direct", expected: "first" },
{ cfg: topLevelFallbackCfg, channel: "slack", chatType: "channel", expected: "first" },
{ cfg: legacyDmCfg, channel: "slack", chatType: "direct", expected: "all" },
{ cfg: legacyDmCfg, channel: "slack", chatType: "channel", expected: "off" },
];
for (const testCase of cases) {
expect(resolveReplyToMode(testCase.cfg, testCase.channel, null, testCase.chatType)).toBe(
testCase.expected,
);
}
});
it("prefers plugin threading adapters over config fallback when available", () => {
expect(
resolveReplyToModeWithThreading(
{
channels: {
slack: {
replyToMode: "off",
},
},
} as OpenClawConfig,
{
resolveReplyToMode: () => "first",
},
{
channel: "slack",
accountId: "acct-1",
chatType: "direct",
},
),
).toBe("first");
});
});
describe("resolveConfiguredReplyToMode", () => {
it("handles top-level, chat-type, and legacy DM fallback without plugin registry access", () => {
const cfg = {
channels: {
slack: {
replyToMode: "off",
replyToModeByChatType: { direct: "all", group: "first" },
dm: { replyToMode: "all" },
},
},
} as OpenClawConfig;
expect(resolveConfiguredReplyToMode(cfg, "slack", "direct")).toBe("all");
expect(resolveConfiguredReplyToMode(cfg, "slack", "group")).toBe("first");
expect(resolveConfiguredReplyToMode(cfg, "slack", "channel")).toBe("off");
expect(resolveConfiguredReplyToMode(cfg, "slack", undefined)).toBe("off");
});
});

View File

@@ -1,25 +1,103 @@
import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js";
import {
getChannelPlugin,
normalizeChannelId as normalizePluginChannelId,
} from "../../channels/plugins/index.js";
import type { ChannelThreadingAdapter } from "../../channels/plugins/types.core.js";
import { normalizeChannelId as normalizeBuiltInChannelId } from "../../channels/registry.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { ReplyToMode } from "../../config/types.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
type ReplyToModeChannelConfig = {
replyToMode?: ReplyToMode;
replyToModeByChatType?: Partial<Record<"direct" | "group" | "channel", ReplyToMode>>;
dm?: {
replyToMode?: ReplyToMode;
};
};
function normalizeReplyToModeChatType(
chatType?: string | null,
): "direct" | "group" | "channel" | undefined {
return chatType === "direct" || chatType === "group" || chatType === "channel"
? chatType
: undefined;
}
function resolveReplyToModeChannelKey(channel?: OriginatingChannelType): string | undefined {
if (typeof channel !== "string") {
return undefined;
}
return (
(normalizeBuiltInChannelId(channel) ??
normalizePluginChannelId(channel) ??
channel.trim().toLowerCase()) ||
undefined
);
}
export function resolveConfiguredReplyToMode(
cfg: OpenClawConfig,
channel?: OriginatingChannelType,
chatType?: string | null,
): ReplyToMode {
const provider = resolveReplyToModeChannelKey(channel);
if (!provider) {
return "all";
}
const channelConfig = (cfg.channels as Record<string, ReplyToModeChannelConfig> | undefined)?.[
provider
];
const normalizedChatType = normalizeReplyToModeChatType(chatType);
if (normalizedChatType) {
const scopedMode = channelConfig?.replyToModeByChatType?.[normalizedChatType];
if (scopedMode !== undefined) {
return scopedMode;
}
}
if (normalizedChatType === "direct") {
const legacyDirectMode = channelConfig?.dm?.replyToMode;
if (legacyDirectMode !== undefined) {
return legacyDirectMode;
}
}
return channelConfig?.replyToMode ?? "all";
}
export function resolveReplyToModeWithThreading(
cfg: OpenClawConfig,
threading: ChannelThreadingAdapter | undefined,
params: {
channel?: OriginatingChannelType;
accountId?: string | null;
chatType?: string | null;
} = {},
): ReplyToMode {
const resolved = threading?.resolveReplyToMode?.({
cfg,
accountId: params.accountId,
chatType: params.chatType,
});
return resolved ?? resolveConfiguredReplyToMode(cfg, params.channel, params.chatType);
}
export function resolveReplyToMode(
cfg: OpenClawConfig,
channel?: OriginatingChannelType,
accountId?: string | null,
chatType?: string | null,
): ReplyToMode {
const provider = normalizeChannelId(channel);
if (!provider) {
return "all";
}
const resolved = getChannelPlugin(provider)?.threading?.resolveReplyToMode?.({
const provider = normalizePluginChannelId(channel);
return resolveReplyToModeWithThreading(
cfg,
accountId,
chatType,
});
return resolved ?? "all";
provider ? getChannelPlugin(provider)?.threading : undefined,
{
channel,
accountId,
chatType,
},
);
}
export function createReplyToModeFilter(
@@ -70,7 +148,7 @@ export function createReplyToModeFilterForChannel(
mode: ReplyToMode,
channel?: OriginatingChannelType,
) {
const provider = normalizeChannelId(channel);
const provider = normalizePluginChannelId(channel);
const normalized = typeof channel === "string" ? channel.trim().toLowerCase() : undefined;
const isWebchat = normalized === "webchat";
// Default: allow explicit reply tags/directives even when replyToMode is "off".

View File

@@ -21,18 +21,18 @@ import {
createChannelTestPluginBase,
createTestRegistry,
} from "../../test-utils/channel-plugins.js";
import { buildSessionWriteLockModuleMock } from "../../test-utils/session-write-lock-module-mock.js";
import { drainFormattedSystemEvents } from "./session-updates.js";
import { persistSessionUsageUpdate } from "./session-usage.js";
import { initSessionState } from "./session.js";
// Perf: session-store locks are exercised elsewhere; most session tests don't need FS lock files.
vi.mock("../../agents/session-write-lock.js", async (importOriginal) => {
const original = await importOriginal<typeof import("../../agents/session-write-lock.js")>();
return {
...original,
acquireSessionWriteLock: async () => ({ release: async () => {} }),
};
});
vi.mock("../../agents/session-write-lock.js", (importOriginal) =>
buildSessionWriteLockModuleMock(
importOriginal as () => Promise<typeof import("../../agents/session-write-lock.js")>,
async () => ({ release: async () => {} }),
),
);
vi.mock("../../agents/model-catalog.js", () => ({
loadModelCatalog: vi.fn(async () => [

View File

@@ -246,6 +246,7 @@ function createChatContext(): Pick<
}
type ChatContext = ReturnType<typeof createChatContext>;
type NonStreamingChatSendWaitFor = "broadcast" | "dedupe" | "none";
async function runNonStreamingChatSend(params: {
context: ChatContext;
@@ -259,6 +260,7 @@ async function runNonStreamingChatSend(params: {
requestParams?: Record<string, unknown>;
waitForCompletion?: boolean;
waitForDedupe?: boolean;
waitFor?: NonStreamingChatSendWaitFor;
}) {
const sendParams: {
sessionKey: string;
@@ -287,11 +289,17 @@ async function runNonStreamingChatSend(params: {
context: params.context as GatewayRequestContext,
});
const shouldExpectBroadcast = params.expectBroadcast ?? true;
if (!shouldExpectBroadcast) {
if (params.waitForCompletion === false || params.waitForDedupe === false) {
return undefined;
}
const waitFor =
params.waitFor ??
(params.waitForCompletion === false || params.waitForDedupe === false
? "none"
: params.expectBroadcast === false
? "dedupe"
: "broadcast");
if (waitFor === "none") {
return undefined;
}
if (waitFor === "dedupe") {
await waitForAssertion(() => {
expect(params.context.dedupe.has(`chat:${params.idempotencyKey}`)).toBe(true);
});

View File

@@ -0,0 +1,28 @@
import { vi } from "vitest";
import type * as SessionWriteLockModule from "../agents/session-write-lock.js";
type SessionWriteLockModuleShape = typeof SessionWriteLockModule;
export async function buildSessionWriteLockModuleMock(
importOriginal: () => Promise<SessionWriteLockModuleShape>,
acquireSessionWriteLock: SessionWriteLockModuleShape["acquireSessionWriteLock"],
): Promise<SessionWriteLockModuleShape> {
const original = await importOriginal();
return {
...original,
acquireSessionWriteLock,
};
}
export function resetModulesWithSessionWriteLockDoMock(
modulePath: string,
acquireSessionWriteLock: SessionWriteLockModuleShape["acquireSessionWriteLock"],
): void {
vi.resetModules();
vi.doMock(modulePath, (importOriginal) =>
buildSessionWriteLockModuleMock(
importOriginal as () => Promise<SessionWriteLockModuleShape>,
acquireSessionWriteLock,
),
);
}