refactor: dedupe reply session readers

This commit is contained in:
Peter Steinberger
2026-04-07 05:44:08 +01:00
parent 808c34b374
commit d9f1c61361
11 changed files with 130 additions and 122 deletions

View File

@@ -1,4 +1,5 @@
import type { SessionEntry } from "../../config/sessions/types.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import type { MsgContext } from "../templating.js";
export type AbortCutoff = {
@@ -10,9 +11,7 @@ type SessionAbortCutoffEntry = Pick<SessionEntry, "abortCutoffMessageSid" | "abo
export function resolveAbortCutoffFromContext(ctx: MsgContext): AbortCutoff | undefined {
const messageSid =
(typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) ||
(typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) ||
undefined;
normalizeOptionalString(ctx.MessageSidFull) ?? normalizeOptionalString(ctx.MessageSid);
const timestamp =
typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined;
if (!messageSid && timestamp === undefined) {
@@ -27,7 +26,7 @@ export function readAbortCutoffFromSessionEntry(
if (!entry) {
return undefined;
}
const messageSid = entry.abortCutoffMessageSid?.trim() || undefined;
const messageSid = normalizeOptionalString(entry.abortCutoffMessageSid);
const timestamp =
typeof entry.abortCutoffTimestamp === "number" && Number.isFinite(entry.abortCutoffTimestamp)
? entry.abortCutoffTimestamp
@@ -51,7 +50,7 @@ export function applyAbortCutoffToSessionEntry(
}
function toNumericMessageSid(value: string | undefined): bigint | undefined {
const trimmed = value?.trim();
const trimmed = normalizeOptionalString(value);
if (!trimmed || !/^\d+$/.test(trimmed)) {
return undefined;
}
@@ -68,8 +67,8 @@ export function shouldSkipMessageByAbortCutoff(params: {
messageSid?: string;
timestamp?: number;
}): boolean {
const cutoffSid = params.cutoffMessageSid?.trim();
const currentSid = params.messageSid?.trim();
const cutoffSid = normalizeOptionalString(params.cutoffMessageSid);
const currentSid = normalizeOptionalString(params.messageSid);
if (cutoffSid && currentSid) {
const cutoffNumeric = toNumericMessageSid(cutoffSid);
const currentNumeric = toNumericMessageSid(currentSid);
@@ -95,8 +94,8 @@ export function shouldPersistAbortCutoff(params: {
commandSessionKey?: string;
targetSessionKey?: string;
}): boolean {
const commandSessionKey = params.commandSessionKey?.trim();
const targetSessionKey = params.targetSessionKey?.trim();
const commandSessionKey = normalizeOptionalString(params.commandSessionKey);
const targetSessionKey = normalizeOptionalString(params.targetSessionKey);
if (!commandSessionKey || !targetSessionKey) {
return true;
}

View File

@@ -3,6 +3,7 @@ import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js"
import { formatToolSummary, resolveToolDisplay } from "../../agents/tool-display.js";
import type { OpenClawConfig } from "../../config/config.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import type { ReplyPayload } from "../types.js";
import {
type AcpHiddenBoundarySeparator,
@@ -140,15 +141,15 @@ function shouldFlushLiveBufferOnIdle(text: string): boolean {
function renderToolSummaryText(event: Extract<AcpRuntimeEvent, { type: "tool_call" }>): string {
const detailParts: string[] = [];
const title = event.title?.trim();
const title = normalizeOptionalString(event.title);
if (title) {
detailParts.push(title);
}
const status = event.status?.trim();
const status = normalizeOptionalString(event.status);
if (status) {
detailParts.push(`status=${status}`);
}
const fallback = event.text?.trim();
const fallback = normalizeOptionalString(event.text);
if (detailParts.length === 0 && fallback) {
detailParts.push(fallback);
}
@@ -334,7 +335,7 @@ export function createAcpReplyProjector(params: {
const renderedToolSummary = renderToolSummaryText(event);
const toolSummary = truncateText(renderedToolSummary, settings.maxSessionUpdateChars);
const hash = hashText(renderedToolSummary);
const toolCallId = event.toolCallId?.trim() || undefined;
const toolCallId = normalizeOptionalString(event.toolCallId);
const status = normalizeToolStatus(event.status);
const isTerminal = status ? TERMINAL_TOOL_STATUSES.has(status) : false;
const isStart = status === "in_progress" || event.tag === "tool_call";

View File

@@ -3,6 +3,7 @@ import { getChannelPlugin } from "../../channels/plugins/index.js";
import type { ChannelId, ChannelThreadingToolContext } from "../../channels/plugins/types.js";
import { normalizeAnyChannelId, normalizeChannelId } from "../../channels/registry.js";
import type { OpenClawConfig } from "../../config/config.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import type { TemplateContext } from "../templating.js";
import {
@@ -38,7 +39,7 @@ export function buildThreadingToolContext(params: {
currentMessageId,
};
}
const rawProvider = originProvider?.trim().toLowerCase();
const rawProvider = normalizeOptionalString(originProvider)?.toLowerCase();
if (!rawProvider) {
return {
currentMessageId,
@@ -49,7 +50,7 @@ export function buildThreadingToolContext(params: {
const threading = provider ? getChannelPlugin(provider)?.threading : undefined;
if (!threading?.buildToolContext) {
return {
currentChannelId: originTo?.trim() || undefined,
currentChannelId: normalizeOptionalString(originTo),
currentChannelProvider: provider ?? (rawProvider as ChannelId),
currentMessageId,
hasRepliedRef,
@@ -184,10 +185,10 @@ export function buildEmbeddedContextFromTemplate(params: {
export function buildTemplateSenderContext(sessionCtx: TemplateContext) {
return {
senderId: sessionCtx.SenderId?.trim() || undefined,
senderName: sessionCtx.SenderName?.trim() || undefined,
senderUsername: sessionCtx.SenderUsername?.trim() || undefined,
senderE164: sessionCtx.SenderE164?.trim() || undefined,
senderId: normalizeOptionalString(sessionCtx.SenderId),
senderName: normalizeOptionalString(sessionCtx.SenderName),
senderUsername: normalizeOptionalString(sessionCtx.SenderUsername),
senderE164: normalizeOptionalString(sessionCtx.SenderE164),
};
}

View File

@@ -41,6 +41,7 @@ import {
type SessionBindingRecord,
type SessionBindingService,
} from "../../../infra/outbound/session-binding-service.js";
import { normalizeOptionalString } from "../../../shared/string-coerce.js";
import type { CommandHandlerResult, HandleCommandsParams } from "../commands-types.js";
import {
resolveAcpCommandAccountId,
@@ -240,7 +241,7 @@ async function bindSpawnedAcpSessionToCurrentConversation(params: {
};
}
const currentConversationId = bindingContext.conversationId?.trim() || "";
const currentConversationId = normalizeOptionalString(bindingContext.conversationId) ?? "";
if (!currentConversationId) {
return {
ok: false,
@@ -248,8 +249,8 @@ async function bindSpawnedAcpSessionToCurrentConversation(params: {
};
}
const senderId = params.commandParams.command.senderId?.trim() || "";
const parentConversationId = bindingContext.parentConversationId?.trim() || undefined;
const senderId = normalizeOptionalString(params.commandParams.command.senderId) ?? "";
const parentConversationId = normalizeOptionalString(bindingContext.parentConversationId);
const conversationRef = {
channel: bindingPolicy.channel,
accountId: bindingPolicy.accountId,
@@ -259,10 +260,7 @@ async function bindSpawnedAcpSessionToCurrentConversation(params: {
: {}),
};
const existingBinding = bindingService.resolveByConversation(conversationRef);
const boundBy =
typeof existingBinding?.metadata?.boundBy === "string"
? existingBinding.metadata.boundBy.trim()
: "";
const boundBy = normalizeOptionalString(existingBinding?.metadata?.boundBy) ?? "";
if (existingBinding && boundBy && boundBy !== "system" && senderId && senderId !== boundBy) {
const currentLabel = resolveAcpBindingLabelNoun({
placement: "current",
@@ -364,7 +362,7 @@ async function bindSpawnedAcpSessionToThread(params: {
}
const currentThreadId = bindingContext.threadId ?? "";
const currentConversationId = bindingContext.conversationId?.trim() || "";
const currentConversationId = normalizeOptionalString(bindingContext.conversationId) ?? "";
const requiresThreadIdForHere = requiresNativeThreadContextForThreadHere(channel);
if (
threadMode === "here" &&
@@ -394,8 +392,8 @@ async function bindSpawnedAcpSessionToThread(params: {
};
}
const senderId = commandParams.command.senderId?.trim() || "";
const parentConversationId = bindingContext.parentConversationId?.trim() || undefined;
const senderId = normalizeOptionalString(commandParams.command.senderId) ?? "";
const parentConversationId = normalizeOptionalString(bindingContext.parentConversationId);
const conversationRef = {
channel: spawnPolicy.channel,
accountId: spawnPolicy.accountId,
@@ -406,10 +404,7 @@ async function bindSpawnedAcpSessionToThread(params: {
};
if (placement === "current") {
const existingBinding = bindingService.resolveByConversation(conversationRef);
const boundBy =
typeof existingBinding?.metadata?.boundBy === "string"
? existingBinding.metadata.boundBy.trim()
: "";
const boundBy = normalizeOptionalString(existingBinding?.metadata?.boundBy) ?? "";
if (existingBinding && boundBy && boundBy !== "system" && senderId && senderId !== boundBy) {
const currentLabel = resolveAcpBindingLabelNoun({
placement,
@@ -460,7 +455,7 @@ async function persistSpawnedSessionLabel(params: {
sessionKey: string;
label?: string;
}): Promise<void> {
const label = params.label?.trim();
const label = normalizeOptionalString(params.label);
if (!label) {
return;
}
@@ -616,7 +611,8 @@ export async function handleAcpSpawnAction(
`✅ Spawned ACP session ${sessionKey} (${spawn.mode}, backend ${initializedBackend}).`,
];
if (binding) {
const currentConversationId = resolveAcpCommandConversationId(params)?.trim() || "";
const currentConversationId =
normalizeOptionalString(resolveAcpCommandConversationId(params)) ?? "";
const boundConversationId = binding.conversation.conversationId.trim();
const bindingPlacement =
currentConversationId && boundConversationId === currentConversationId ? "current" : "child";
@@ -683,7 +679,7 @@ async function resolveAcpTokenTargetSessionKeyOrStop(params: {
commandParams: HandleCommandsParams;
restTokens: string[];
}): Promise<string | CommandHandlerResult> {
const token = params.restTokens.join(" ").trim() || undefined;
const token = normalizeOptionalString(params.restTokens.join(" "));
const target = await resolveAcpTargetSessionKey({
commandParams: params.commandParams,
token,

View File

@@ -5,6 +5,7 @@ import type { AcpRuntimeSessionMode } from "../../../acp/runtime/types.js";
import { supportsAutomaticThreadBindingSpawn } from "../../../channels/thread-bindings-policy.js";
import type { AcpSessionRuntimeOptions } from "../../../config/sessions/types.js";
import { normalizeAgentId } from "../../../routing/session-key.js";
import { normalizeOptionalString } from "../../../shared/string-coerce.js";
import type { CommandHandlerResult, HandleCommandsParams } from "../commands-types.js";
import { resolveAcpCommandChannel, resolveAcpCommandThreadId } from "./context.js";
export { resolveAcpInstallCommandHint, resolveConfiguredAcpBackendId } from "./install-hints.js";
@@ -90,7 +91,7 @@ export function stopWithText(text: string): CommandHandlerResult {
}
export function resolveAcpAction(tokens: string[]): AcpAction {
const action = tokens[0]?.trim().toLowerCase();
const action = normalizeOptionalString(tokens[0])?.toLowerCase();
if (
action === "spawn" ||
action === "cancel" ||
@@ -199,7 +200,7 @@ export function parseSpawnInput(
if (modeOption.error) {
return { ok: false, error: `${modeOption.error}. ${ACP_SPAWN_USAGE}` };
}
const raw = modeOption.value?.trim().toLowerCase();
const raw = normalizeOptionalString(modeOption.value)?.toLowerCase();
if (raw !== "persistent" && raw !== "oneshot") {
return {
ok: false,
@@ -216,7 +217,7 @@ export function parseSpawnInput(
if (bindOption.error) {
return { ok: false, error: `${bindOption.error}. ${ACP_SPAWN_USAGE}` };
}
const raw = bindOption.value?.trim().toLowerCase();
const raw = normalizeOptionalString(bindOption.value)?.toLowerCase();
if (raw !== "here" && raw !== "off") {
return {
ok: false,
@@ -237,7 +238,7 @@ export function parseSpawnInput(
if (threadOption.error) {
return { ok: false, error: `${threadOption.error}. ${ACP_SPAWN_USAGE}` };
}
const raw = threadOption.value?.trim().toLowerCase();
const raw = normalizeOptionalString(threadOption.value)?.toLowerCase();
if (raw !== "auto" && raw !== "here" && raw !== "off") {
return {
ok: false,
@@ -255,7 +256,7 @@ export function parseSpawnInput(
if (cwdOption.error) {
return { ok: false, error: `${cwdOption.error}. ${ACP_SPAWN_USAGE}` };
}
cwd = cwdOption.value?.trim();
cwd = normalizeOptionalString(cwdOption.value);
i = cwdOption.nextIndex;
continue;
}
@@ -265,7 +266,7 @@ export function parseSpawnInput(
if (labelOption.error) {
return { ok: false, error: `${labelOption.error}. ${ACP_SPAWN_USAGE}` };
}
label = labelOption.value?.trim();
label = normalizeOptionalString(labelOption.value);
i = labelOption.nextIndex;
continue;
}
@@ -278,7 +279,7 @@ export function parseSpawnInput(
}
if (!rawAgentId) {
rawAgentId = token.trim();
rawAgentId = normalizeOptionalString(token);
i += 1;
continue;
}
@@ -289,8 +290,8 @@ export function parseSpawnInput(
};
}
const fallbackAgent = params.cfg.acp?.defaultAgent?.trim() || "";
const selectedAgent = (rawAgentId?.trim() || fallbackAgent).trim();
const fallbackAgent = normalizeOptionalString(params.cfg.acp?.defaultAgent) ?? "";
const selectedAgent = normalizeOptionalString(rawAgentId) ?? fallbackAgent;
if (!selectedAgent) {
return {
ok: false,
@@ -316,7 +317,7 @@ export function parseSpawnInput(
thread,
bind,
cwd,
label: label || undefined,
label,
},
};
}
@@ -341,7 +342,7 @@ export function parseSteerInput(
error: `${sessionOption.error}. ${ACP_STEER_USAGE}`,
};
}
sessionToken = sessionOption.value?.trim() || undefined;
sessionToken = normalizeOptionalString(sessionOption.value);
i = sessionOption.nextIndex;
continue;
}
@@ -371,14 +372,14 @@ export function parseSingleValueCommandInput(
tokens: string[],
usage: string,
): { ok: true; value: ParsedSingleValueCommandInput } | { ok: false; error: string } {
const value = tokens[0]?.trim() || "";
const value = normalizeOptionalString(tokens[0]) ?? "";
if (!value) {
return { ok: false, error: usage };
}
if (tokens.length > 2) {
return { ok: false, error: usage };
}
const sessionToken = tokens[1]?.trim() || undefined;
const sessionToken = normalizeOptionalString(tokens[1]);
return {
ok: true,
value: {
@@ -391,8 +392,8 @@ export function parseSingleValueCommandInput(
export function parseSetCommandInput(
tokens: string[],
): { ok: true; value: ParsedSetCommandInput } | { ok: false; error: string } {
const key = tokens[0]?.trim() || "";
const value = tokens[1]?.trim() || "";
const key = normalizeOptionalString(tokens[0]) ?? "";
const value = normalizeOptionalString(tokens[1]) ?? "";
if (!key || !value) {
return {
ok: false,
@@ -405,7 +406,7 @@ export function parseSetCommandInput(
error: ACP_SET_USAGE,
};
}
const sessionToken = tokens[2]?.trim() || undefined;
const sessionToken = normalizeOptionalString(tokens[2]);
return {
ok: true,
value: {
@@ -423,7 +424,7 @@ export function parseOptionalSingleTarget(
if (tokens.length > 1) {
return { ok: false, error: usage };
}
const token = tokens[0]?.trim() || "";
const token = normalizeOptionalString(tokens[0]) ?? "";
return {
ok: true,
...(token ? { sessionToken: token } : {}),
@@ -492,8 +493,11 @@ export function resolveCommandRequestId(params: HandleCommandsParams): string {
params.ctx.MessageSid ??
params.ctx.MessageSidFirst ??
params.ctx.MessageSidLast;
if (typeof value === "string" && value.trim()) {
return value.trim();
if (typeof value === "string") {
const normalizedValue = normalizeOptionalString(value);
if (normalizedValue) {
return normalizedValue;
}
}
if (typeof value === "number" || typeof value === "bigint") {
return String(value);

View File

@@ -6,6 +6,7 @@
*/
import { matchPluginCommand, executePluginCommand } from "../../plugins/commands.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import type { CommandHandler, CommandHandlerResult } from "./commands-types.js";
/**
@@ -50,7 +51,7 @@ export const handlePluginCommand: CommandHandler = async (
typeof params.ctx.MessageThreadId === "number"
? params.ctx.MessageThreadId
: undefined,
threadParentId: params.ctx.ThreadParentId?.trim() || undefined,
threadParentId: normalizeOptionalString(params.ctx.ThreadParentId),
});
return {

View File

@@ -14,6 +14,7 @@ import { formatErrorMessage } from "../../infra/errors.js";
import { generateSecureUuid } from "../../infra/secure-random.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { resolveStatusTtsSnapshot } from "../../tts/status-config.js";
import { resolveConfiguredTtsMode } from "../../tts/tts-config.js";
import type { FinalizedMsgContext } from "../templating.js";
@@ -94,8 +95,11 @@ function hasInboundMediaForAcp(ctx: FinalizedMsgContext): boolean {
function resolveAcpRequestId(ctx: FinalizedMsgContext): string {
const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
if (typeof id === "string" && id.trim()) {
return id.trim();
if (typeof id === "string") {
const normalizedId = normalizeOptionalString(id);
if (normalizedId) {
return normalizedId;
}
}
if (typeof id === "number" || typeof id === "bigint") {
return String(id);
@@ -109,33 +113,24 @@ async function hasBoundConversationForSession(params: {
channelRaw: string | undefined;
accountIdRaw: string | undefined;
}): Promise<boolean> {
const channel = String(params.channelRaw ?? "")
.trim()
.toLowerCase();
const channel = normalizeOptionalString(params.channelRaw)?.toLowerCase() ?? "";
if (!channel) {
return false;
}
const accountId = String(params.accountIdRaw ?? "")
.trim()
.toLowerCase();
const accountId = normalizeOptionalString(params.accountIdRaw)?.toLowerCase() ?? "";
const channels = params.cfg.channels as Record<string, { defaultAccount?: unknown } | undefined>;
const configuredDefaultAccountId = channels?.[channel]?.defaultAccount;
const normalizedAccountId =
accountId ||
(typeof configuredDefaultAccountId === "string" && configuredDefaultAccountId.trim()
? configuredDefaultAccountId.trim().toLowerCase()
: "default");
accountId || normalizeOptionalString(configuredDefaultAccountId)?.toLowerCase() || "default";
const { getSessionBindingService } = await loadDispatchAcpManagerRuntime();
const bindingService = getSessionBindingService();
const bindings = bindingService.listBySession(params.sessionKey);
return bindings.some((binding) => {
const bindingChannel = String(binding.conversation.channel ?? "")
.trim()
.toLowerCase();
const bindingAccountId = String(binding.conversation.accountId ?? "")
.trim()
.toLowerCase();
const conversationId = String(binding.conversation.conversationId ?? "").trim();
const bindingChannel =
normalizeOptionalString(binding.conversation.channel)?.toLowerCase() ?? "";
const bindingAccountId =
normalizeOptionalString(binding.conversation.accountId)?.toLowerCase() ?? "";
const conversationId = normalizeOptionalString(binding.conversation.conversationId) ?? "";
return (
bindingChannel === channel &&
(bindingAccountId || "default") === normalizedAccountId &&
@@ -149,21 +144,17 @@ function resolveDispatchAccountId(params: {
channelRaw: string | undefined;
accountIdRaw: string | undefined;
}): string | undefined {
const channel = String(params.channelRaw ?? "")
.trim()
.toLowerCase();
const channel = normalizeOptionalString(params.channelRaw)?.toLowerCase() ?? "";
if (!channel) {
return params.accountIdRaw?.trim() || undefined;
return normalizeOptionalString(params.accountIdRaw);
}
const explicit = params.accountIdRaw?.trim();
const explicit = normalizeOptionalString(params.accountIdRaw);
if (explicit) {
return explicit;
}
const channels = params.cfg.channels as Record<string, { defaultAccount?: unknown } | undefined>;
const configuredDefaultAccountId = channels?.[channel]?.defaultAccount;
return typeof configuredDefaultAccountId === "string" && configuredDefaultAccountId.trim()
? configuredDefaultAccountId.trim()
: undefined;
return normalizeOptionalString(configuredDefaultAccountId);
}
export type AcpDispatchAttemptResult = {
@@ -315,7 +306,7 @@ export async function tryDispatchAcpReply(params: {
recordProcessed: DispatchProcessedRecorder;
markIdle: (reason: string) => void;
}): Promise<AcpDispatchAttemptResult | null> {
const sessionKey = params.sessionKey?.trim();
const sessionKey = normalizeOptionalString(params.sessionKey);
if (!sessionKey || params.bypassForCommand) {
return null;
}
@@ -362,11 +353,9 @@ export async function tryDispatchAcpReply(params: {
const resolvedAcpAgent =
acpResolution.kind === "ready"
? (
acpResolution.meta.agent?.trim() ||
params.cfg.acp?.defaultAgent?.trim() ||
resolveAgentIdFromSessionKey(canonicalSessionKey)
).trim()
? (normalizeOptionalString(acpResolution.meta.agent) ??
normalizeOptionalString(params.cfg.acp?.defaultAgent) ??
resolveAgentIdFromSessionKey(canonicalSessionKey))
: resolveAgentIdFromSessionKey(canonicalSessionKey);
const effectiveDispatchAccountId = resolveDispatchAccountId({
cfg: params.cfg,
@@ -470,9 +459,10 @@ export async function tryDispatchAcpReply(params: {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
if (params.runId?.trim()) {
const runId = normalizeOptionalString(params.runId);
if (runId) {
emitAgentEvent({
runId: params.runId.trim(),
runId,
sessionKey,
stream: "lifecycle",
data: {
@@ -507,9 +497,10 @@ export async function tryDispatchAcpReply(params: {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
if (params.runId?.trim()) {
const runId = normalizeOptionalString(params.runId);
if (runId) {
emitAgentEvent({
runId: params.runId.trim(),
runId,
sessionKey,
stream: "lifecycle",
data: {

View File

@@ -5,6 +5,7 @@ import { normalizeAnyChannelId } from "../../channels/registry.js";
import type { OpenClawConfig } from "../../config/config.js";
import { applyMergePatch } from "../../config/merge-patch.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { normalizeCommandBody } from "../commands-registry.js";
import type { MsgContext, TemplateContext } from "../templating.js";
import type { CommandContext } from "./commands-types.js";
@@ -26,12 +27,14 @@ function isSlowReplyTestAllowed(env: NodeJS.ProcessEnv = process.env): boolean {
}
function resolveFastSessionKey(ctx: MsgContext): string {
const existing = ctx.SessionKey?.trim();
const existing = normalizeOptionalString(ctx.SessionKey);
if (existing) {
return existing;
}
const provider = ctx.Provider?.trim() || ctx.Surface?.trim() || "main";
const destination = ctx.To?.trim() || ctx.From?.trim() || "default";
const provider =
normalizeOptionalString(ctx.Provider) ?? normalizeOptionalString(ctx.Surface) ?? "main";
const destination =
normalizeOptionalString(ctx.To) ?? normalizeOptionalString(ctx.From) ?? "default";
return `agent:main:${provider}:${destination}`;
}
@@ -152,10 +155,10 @@ export function buildFastReplyCommandContext(params: {
}): CommandContext {
const { ctx, cfg, agentId, sessionKey, isGroup, triggerBodyNormalized, commandAuthorized } =
params;
const surface = (ctx.Surface ?? ctx.Provider ?? "").trim().toLowerCase();
const channel = (ctx.Provider ?? surface).trim().toLowerCase();
const from = ctx.From?.trim() || undefined;
const to = ctx.To?.trim() || undefined;
const surface = normalizeOptionalString(ctx.Surface ?? ctx.Provider)?.toLowerCase() ?? "";
const channel = normalizeOptionalString(ctx.Provider ?? surface)?.toLowerCase() ?? "";
const from = normalizeOptionalString(ctx.From);
const to = normalizeOptionalString(ctx.To);
return {
surface,
channel,
@@ -212,9 +215,15 @@ export function initFastReplySessionState(params: {
sessionFile,
updatedAt: now,
...(normalizedChatType ? { chatType: normalizedChatType } : {}),
...(ctx.Provider?.trim() ? { channel: ctx.Provider.trim() } : {}),
...(ctx.GroupSubject?.trim() ? { subject: ctx.GroupSubject.trim() } : {}),
...(ctx.GroupChannel?.trim() ? { groupChannel: ctx.GroupChannel.trim() } : {}),
...(normalizeOptionalString(ctx.Provider)
? { channel: normalizeOptionalString(ctx.Provider) }
: {}),
...(normalizeOptionalString(ctx.GroupSubject)
? { subject: normalizeOptionalString(ctx.GroupSubject) }
: {}),
...(normalizeOptionalString(ctx.GroupChannel)
? { groupChannel: normalizeOptionalString(ctx.GroupChannel) }
: {}),
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
const sessionCtx: TemplateContext = {
@@ -235,7 +244,7 @@ export function initFastReplySessionState(params: {
resetTriggered,
systemSent: false,
abortedLastRun: false,
storePath: cfg.session?.store?.trim() ?? "",
storePath: normalizeOptionalString(cfg.session?.store) ?? "",
sessionScope,
groupResolution: undefined,
isGroup,

View File

@@ -13,6 +13,7 @@ import type { SessionEntry } from "../../config/sessions/types.js";
import { logVerbose } from "../../globals.js";
import { clearCommandLane, getQueueSize } from "../../process/command-queue.js";
import { normalizeMainKey } from "../../routing/session-key.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import { hasControlCommand } from "../command-detection.js";
import { resolveEnvelopeFormatOptions } from "../envelope.js";
@@ -220,7 +221,7 @@ export async function runPreparedReply(
silentToken: SILENT_REPLY_TOKEN,
})
: "";
const groupSystemPrompt = sessionCtx.GroupSystemPrompt?.trim() ?? "";
const groupSystemPrompt = normalizeOptionalString(sessionCtx.GroupSystemPrompt) ?? "";
const inboundMetaPrompt = buildInboundMetaSystemPrompt(
isNewSession ? sessionCtx : { ...sessionCtx, ThreadStarterBody: undefined },
{ includeFormattingHints: !useFastReplyRuntime },
@@ -256,7 +257,7 @@ export async function runPreparedReply(
isNewSession
? {
...sessionCtx,
...(sessionCtx.ThreadHistoryBody?.trim()
...(normalizeOptionalString(sessionCtx.ThreadHistoryBody)
? { InboundHistory: undefined, ThreadStarterBody: undefined }
: {}),
}
@@ -306,8 +307,8 @@ export async function runPreparedReply(
}
}
const prefixedBodyCore = prefixedBodyBase;
const threadStarterBody = ctx.ThreadStarterBody?.trim();
const threadHistoryBody = ctx.ThreadHistoryBody?.trim();
const threadStarterBody = normalizeOptionalString(ctx.ThreadStarterBody);
const threadHistoryBody = normalizeOptionalString(ctx.ThreadHistoryBody);
const threadContextNote = threadHistoryBody
? `[Thread history - for context]\n${threadHistoryBody}`
: threadStarterBody
@@ -542,12 +543,14 @@ export async function runPreparedReply(
}),
agentAccountId: sessionCtx.AccountId,
groupId: resolveGroupSessionKey(sessionCtx)?.id ?? undefined,
groupChannel: sessionCtx.GroupChannel?.trim() ?? sessionCtx.GroupSubject?.trim(),
groupSpace: sessionCtx.GroupSpace?.trim() ?? undefined,
senderId: sessionCtx.SenderId?.trim() || undefined,
senderName: sessionCtx.SenderName?.trim() || undefined,
senderUsername: sessionCtx.SenderUsername?.trim() || undefined,
senderE164: sessionCtx.SenderE164?.trim() || undefined,
groupChannel:
normalizeOptionalString(sessionCtx.GroupChannel) ??
normalizeOptionalString(sessionCtx.GroupSubject),
groupSpace: normalizeOptionalString(sessionCtx.GroupSpace),
senderId: normalizeOptionalString(sessionCtx.SenderId),
senderName: normalizeOptionalString(sessionCtx.SenderName),
senderUsername: normalizeOptionalString(sessionCtx.SenderUsername),
senderE164: normalizeOptionalString(sessionCtx.SenderE164),
senderIsOwner: command.senderIsOwner,
sessionFile: preparedSessionState.sessionFile,
workspaceDir,

View File

@@ -1,4 +1,5 @@
import { resolveGlobalMap } from "../../../shared/global-singleton.js";
import { normalizeOptionalString } from "../../../shared/string-coerce.js";
import { applyQueueRuntimeSettings } from "../../../utils/queue-helpers.js";
import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./types.js";
@@ -123,8 +124,9 @@ export function refreshQueuedFollowupSession(params: {
}
if (shouldRewriteSession && run.sessionId === params.previousSessionId) {
run.sessionId = params.nextSessionId!;
if (params.nextSessionFile?.trim()) {
run.sessionFile = params.nextSessionFile;
const nextSessionFile = normalizeOptionalString(params.nextSessionFile);
if (nextSessionFile) {
run.sessionFile = nextSessionFile;
}
}
if (shouldRewriteSelection) {
@@ -135,7 +137,7 @@ export function refreshQueuedFollowupSession(params: {
run.model = params.nextModel;
}
if (Object.hasOwn(params, "nextAuthProfileId")) {
run.authProfileId = params.nextAuthProfileId?.trim() || undefined;
run.authProfileId = normalizeOptionalString(params.nextAuthProfileId);
}
if (Object.hasOwn(params, "nextAuthProfileIdSource")) {
run.authProfileIdSource = run.authProfileId ? params.nextAuthProfileIdSource : undefined;

View File

@@ -1,5 +1,6 @@
import type { ReplyToMode } from "../../config/types.js";
import { hasReplyPayloadContent } from "../../interactive/payload.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js";
import { extractReplyToTag } from "./reply-tags.js";
@@ -9,11 +10,11 @@ import {
} from "./reply-threading.js";
export function formatBtwTextForExternalDelivery(payload: ReplyPayload): string | undefined {
const text = payload.text?.trim();
const text = normalizeOptionalString(payload.text);
if (!text) {
return payload.text;
}
const question = payload.btw?.question?.trim();
const question = normalizeOptionalString(payload.btw?.question);
if (!question) {
return payload.text;
}
@@ -28,8 +29,8 @@ function resolveReplyThreadingForPayload(params: {
currentMessageId?: string;
replyThreading?: ReplyThreadingPolicy;
}): ReplyPayload {
const implicitReplyToId = params.implicitReplyToId?.trim() || undefined;
const currentMessageId = params.currentMessageId?.trim() || undefined;
const implicitReplyToId = normalizeOptionalString(params.implicitReplyToId);
const currentMessageId = normalizeOptionalString(params.currentMessageId);
const allowImplicitReplyToCurrentMessage = resolveImplicitCurrentMessageReplyAllowance(
params.replyToMode,
params.replyThreading,
@@ -91,7 +92,7 @@ export function applyReplyThreading(params: {
}): ReplyPayload[] {
const { payloads, replyToMode, replyToChannel, currentMessageId, replyThreading } = params;
const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel);
const implicitReplyToId = currentMessageId?.trim() || undefined;
const implicitReplyToId = normalizeOptionalString(currentMessageId);
return payloads
.map((payload) =>
resolveReplyThreadingForPayload({