refactor(reply): type reply threading policy

This commit is contained in:
Peter Steinberger
2026-04-05 21:40:46 +01:00
parent 456ad889c7
commit 9b7002ee59
24 changed files with 128 additions and 86 deletions

View File

@@ -5,7 +5,7 @@ import { logVerbose } from "../../globals.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import type { OriginatingChannelType } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyPayload } from "../types.js";
import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js";
import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js";
import { createBlockReplyContentKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import {
@@ -99,7 +99,7 @@ export async function buildReplyPayloads(params: {
replyToMode: ReplyToMode;
replyToChannel?: OriginatingChannelType;
currentMessageId?: string;
allowImplicitReplyToCurrentMessage?: boolean;
replyThreading?: ReplyThreadingPolicy;
messageProvider?: string;
messagingToolSentTexts?: string[];
messagingToolSentMediaUrls?: string[];
@@ -141,7 +141,7 @@ export async function buildReplyPayloads(params: {
replyToMode: params.replyToMode,
replyToChannel: params.replyToChannel,
currentMessageId: params.currentMessageId,
allowImplicitReplyToCurrentMessage: params.allowImplicitReplyToCurrentMessage,
replyThreading: params.replyThreading,
}).map(async (payload) => {
const parsed = normalizeReplyPayloadDirectives({
payload,

View File

@@ -595,7 +595,7 @@ export async function runReplyAgent(params: {
replyToMode,
replyToChannel,
currentMessageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,
allowImplicitReplyToCurrentMessage: sessionCtx.AllowImplicitReplyToCurrentMessage,
replyThreading: sessionCtx.ReplyThreading,
messageProvider: followupRun.run.messageProvider,
messagingToolSentTexts: runResult.messagingToolSentTexts,
messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls,

View File

@@ -1,9 +1,12 @@
import type { ReplyToMode } from "../../config/types.js";
import { hasReplyPayloadContent } from "../../interactive/payload.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js";
import { extractReplyToTag } from "./reply-tags.js";
import { createReplyToModeFilterForChannel } from "./reply-threading.js";
import {
createReplyToModeFilterForChannel,
resolveImplicitCurrentMessageReplyAllowance,
} from "./reply-threading.js";
export function formatBtwTextForExternalDelivery(payload: ReplyPayload): string | undefined {
const text = payload.text?.trim();
@@ -23,14 +26,14 @@ function resolveReplyThreadingForPayload(params: {
replyToMode?: ReplyToMode;
implicitReplyToId?: string;
currentMessageId?: string;
allowImplicitReplyToCurrentMessage?: boolean;
replyThreading?: ReplyThreadingPolicy;
}): ReplyPayload {
const implicitReplyToId = params.implicitReplyToId?.trim() || undefined;
const currentMessageId = params.currentMessageId?.trim() || undefined;
const allowImplicitReplyToCurrentMessage =
params.replyToMode === "batched"
? params.allowImplicitReplyToCurrentMessage === true
: params.allowImplicitReplyToCurrentMessage !== false;
const allowImplicitReplyToCurrentMessage = resolveImplicitCurrentMessageReplyAllowance(
params.replyToMode,
params.replyThreading,
);
let resolved: ReplyPayload =
params.payload.replyToId ||
@@ -84,15 +87,9 @@ export function applyReplyThreading(params: {
replyToMode: ReplyToMode;
replyToChannel?: OriginatingChannelType;
currentMessageId?: string;
allowImplicitReplyToCurrentMessage?: boolean;
replyThreading?: ReplyThreadingPolicy;
}): ReplyPayload[] {
const {
payloads,
replyToMode,
replyToChannel,
currentMessageId,
allowImplicitReplyToCurrentMessage,
} = params;
const { payloads, replyToMode, replyToChannel, currentMessageId, replyThreading } = params;
const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel);
const implicitReplyToId = currentMessageId?.trim() || undefined;
return payloads
@@ -102,7 +99,7 @@ export function applyReplyThreading(params: {
replyToMode,
implicitReplyToId,
currentMessageId,
allowImplicitReplyToCurrentMessage,
replyThreading,
}),
)
.filter(isRenderablePayload)

View File

@@ -224,7 +224,7 @@ describe("applyReplyThreading auto-threading", () => {
payloads: [{ text: "A" }, { text: "B" }],
replyToMode: "batched",
currentMessageId: "42",
allowImplicitReplyToCurrentMessage: true,
replyThreading: { implicitCurrentMessage: "allow" },
});
expect(result).toHaveLength(2);
@@ -237,7 +237,7 @@ describe("applyReplyThreading auto-threading", () => {
payloads: [{ text: "Hello" }],
replyToMode: "batched",
currentMessageId: "42",
allowImplicitReplyToCurrentMessage: false,
replyThreading: { implicitCurrentMessage: "deny" },
});
expect(result).toHaveLength(1);
@@ -249,7 +249,7 @@ describe("applyReplyThreading auto-threading", () => {
payloads: [{ text: "Hello [[reply_to_current]]" }],
replyToMode: "batched",
currentMessageId: "42",
allowImplicitReplyToCurrentMessage: false,
replyThreading: { implicitCurrentMessage: "deny" },
});
expect(result).toHaveLength(1);
@@ -300,7 +300,7 @@ describe("applyReplyThreading auto-threading", () => {
});
expect(result).toHaveLength(1);
expect(result[0].replyToId).toBeUndefined();
expect(result[0].replyToId).toBe("42");
expect(result[0].replyToTag).toBe(true);
});
@@ -313,7 +313,7 @@ describe("applyReplyThreading auto-threading", () => {
});
expect(result).toHaveLength(1);
expect(result[0].replyToId).toBeUndefined();
expect(result[0].replyToId).toBe("42");
expect(result[0].replyToTag).toBe(true);
});

View File

@@ -9,6 +9,10 @@ export type ReplyReferencePlanner = {
hasReplied(): boolean;
};
export function isSingleUseReplyToMode(mode: ReplyToMode): boolean {
return mode === "first" || mode === "batched";
}
export function createReplyReferencePlanner(options: {
replyToMode: ReplyToMode;
/** Existing thread/reference id (preferred when allowed by replyToMode). */
@@ -40,12 +44,11 @@ export function createReplyReferencePlanner(options: {
hasReplied = true;
return id;
}
// "first" and "batched": only the first eligible reply gets a reference.
if (!hasReplied) {
hasReplied = true;
return id;
if (isSingleUseReplyToMode(options.replyToMode) && hasReplied) {
return undefined;
}
return undefined;
hasReplied = true;
return id;
};
const markSent = () => {

View File

@@ -7,7 +7,8 @@ import { normalizeChannelId as normalizeBuiltInChannelId } from "../../channels/
import type { OpenClawConfig } from "../../config/config.js";
import type { ReplyToMode } from "../../config/types.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js";
import { isSingleUseReplyToMode } from "./reply-reference.js";
type ReplyToModeChannelConfig = {
replyToMode?: ReplyToMode;
@@ -124,8 +125,7 @@ export function createReplyToModeFilter(
if (mode === "all") {
return payload;
}
// "first" and "batched" both keep only the first eligible physical send.
if (hasThreaded) {
if (isSingleUseReplyToMode(mode) && hasThreaded) {
// Compaction notices are transient status messages that should always
// appear in-thread, even after the first assistant block has already
// consumed the "first" slot. Let them keep their replyToId.
@@ -138,13 +138,39 @@ export function createReplyToModeFilter(
// threaded (so they appear in-context), but they must not consume the
// "first" slot of the replyToMode=first|batched filter. Skip advancing
// hasThreaded so the real assistant reply still gets replyToId.
if (!payload.isCompactionNotice) {
if (isSingleUseReplyToMode(mode) && !payload.isCompactionNotice) {
hasThreaded = true;
}
return payload;
};
}
export function resolveImplicitCurrentMessageReplyAllowance(
mode: ReplyToMode | undefined,
policy?: ReplyThreadingPolicy,
): boolean {
const implicitCurrentMessage = policy?.implicitCurrentMessage ?? "default";
if (implicitCurrentMessage === "allow") {
return true;
}
if (implicitCurrentMessage === "deny") {
return false;
}
return mode !== "batched";
}
export function resolveBatchedReplyThreadingPolicy(
mode: ReplyToMode,
isBatched: boolean,
): ReplyThreadingPolicy | undefined {
if (mode !== "batched") {
return undefined;
}
return {
implicitCurrentMessage: isBatched ? "allow" : "deny",
};
}
export function createReplyToModeFilterForChannel(
mode: ReplyToMode,
channel?: OriginatingChannelType,

View File

@@ -4,7 +4,7 @@ import { parseAudioTag } from "./audio-tags.js";
import { createBlockReplyCoalescer } from "./block-reply-coalescer.js";
import { matchesMentionWithExplicit } from "./mentions.js";
import { normalizeReplyPayload } from "./normalize-reply.js";
import { createReplyReferencePlanner } from "./reply-reference.js";
import { createReplyReferencePlanner, isSingleUseReplyToMode } from "./reply-reference.js";
import {
extractShortModelName,
hasTemplateVariables,
@@ -888,6 +888,13 @@ describe("createReplyReferencePlanner", () => {
});
expect(existingIdPlanner.use()).toBe("thread-1");
expect(existingIdPlanner.use()).toBeUndefined();
const batchedPlanner = createReplyReferencePlanner({
replyToMode: "batched",
startId: "parent",
});
expect(batchedPlanner.use()).toBe("parent");
expect(batchedPlanner.use()).toBeUndefined();
});
it("honors allowReference=false", () => {
@@ -903,6 +910,15 @@ describe("createReplyReferencePlanner", () => {
});
});
describe("isSingleUseReplyToMode", () => {
it("treats first and batched as single-use reply modes", () => {
expect(isSingleUseReplyToMode("off")).toBe(false);
expect(isSingleUseReplyToMode("all")).toBe(false);
expect(isSingleUseReplyToMode("first")).toBe(true);
expect(isSingleUseReplyToMode("batched")).toBe(true);
});
});
describe("createStreamingDirectiveAccumulator", () => {
it("stashes reply_to_current until a renderable chunk arrives", () => {
const accumulator = createStreamingDirectiveAccumulator();

View File

@@ -5,6 +5,7 @@ import type {
} from "../media-understanding/types.js";
import type { InputProvenance } from "../sessions/input-provenance.js";
import type { CommandArgs } from "./commands-registry.types.js";
import type { ReplyThreadingPolicy } from "./types.js";
/** Valid message channels for routing. */
export type OriginatingChannelType = ChannelId;
@@ -64,11 +65,8 @@ export type MsgContext = {
MessageSids?: string[];
MessageSidFirst?: string;
MessageSidLast?: string;
/**
* Whether this inbound turn should implicitly reply to the current message
* when reply threading is enabled. Undefined preserves legacy behavior.
*/
AllowImplicitReplyToCurrentMessage?: boolean;
/** Per-turn reply-threading overrides. */
ReplyThreading?: ReplyThreadingPolicy;
ReplyToId?: string;
/**
* Root message id for thread reconstruction (used by Feishu for root_id).

View File

@@ -24,6 +24,11 @@ export type TypingPolicy =
| "internal_webchat"
| "heartbeat";
export type ReplyThreadingPolicy = {
/** Override implicit reply-to-current behavior for the current turn. */
implicitCurrentMessage?: "default" | "allow" | "deny";
};
export type GetReplyOptions = {
/** Override run id for agent events (defaults to random UUID). */
runId?: string;