refactor: dedupe gateway agent trimmed readers

This commit is contained in:
Peter Steinberger
2026-04-07 22:55:41 +01:00
parent 7999767a0f
commit fdf60c06b0
6 changed files with 75 additions and 107 deletions

View File

@@ -224,7 +224,7 @@ function resolvePlacementWithoutChannelPlugin(params: {
}
function normalizeLineConversationIdFallback(value: string | undefined): string | undefined {
const trimmed = value?.trim() ?? "";
const trimmed = normalizeOptionalString(value) ?? "";
if (!trimmed) {
return undefined;
}
@@ -249,7 +249,7 @@ function normalizeTelegramConversationIdFallback(params: {
return `${explicitGroupId}:topic:${explicitThreadId}`;
}
const trimmed = params.to?.trim() ?? "";
const trimmed = normalizeOptionalString(params.to) ?? "";
if (!trimmed) {
return undefined;
}
@@ -303,7 +303,7 @@ function isHeartbeatEnabledForSessionAgent(params: {
resolveAgentConfig(params.cfg, requesterAgentId)?.heartbeat?.every ??
params.cfg.agents?.defaults?.heartbeat?.every ??
DEFAULT_HEARTBEAT_EVERY;
const trimmedEvery = typeof heartbeatEvery === "string" ? heartbeatEvery.trim() : "";
const trimmedEvery = normalizeOptionalString(heartbeatEvery) ?? "";
if (!trimmedEvery) {
return false;
}
@@ -349,10 +349,10 @@ function hasSessionLocalHeartbeatRelayRoute(params: {
// Explicit delivery overrides are not session-local and can route updates
// to unrelated destinations (for example a pinned ops channel).
if (typeof heartbeat?.to === "string" && heartbeat.to.trim().length > 0) {
if (normalizeOptionalString(heartbeat?.to)) {
return false;
}
if (typeof heartbeat?.accountId === "string" && heartbeat.accountId.trim().length > 0) {
if (normalizeOptionalString(heartbeat?.accountId)) {
return false;
}
@@ -387,7 +387,7 @@ function resolveTargetAcpAgentId(params: {
}
function normalizeOptionalAgentId(value: string | undefined | null): string | undefined {
const trimmed = (value ?? "").trim();
const trimmed = normalizeOptionalString(value) ?? "";
if (!trimmed) {
return undefined;
}
@@ -430,7 +430,7 @@ async function resolveRuntimeCwdForAcpSpawn(params: {
if (!params.resolvedCwd) {
return undefined;
}
if (typeof params.explicitCwd === "string" && params.explicitCwd.trim()) {
if (normalizeOptionalString(params.explicitCwd)) {
return params.resolvedCwd;
}
try {
@@ -449,7 +449,7 @@ function resolveRequesterInternalSessionKey(params: {
requesterSessionKey?: string;
}): string {
const { mainKey, alias } = resolveMainSessionAlias(params.cfg);
const requesterSessionKey = params.requesterSessionKey?.trim();
const requesterSessionKey = normalizeOptionalString(params.requesterSessionKey);
return requesterSessionKey
? resolveInternalSessionKey({
key: requesterSessionKey,
@@ -505,8 +505,8 @@ function resolveConversationIdForThreadBinding(params: {
isGroup: true,
})?.conversationId
: null;
if (pluginResolvedConversationId?.trim()) {
return pluginResolvedConversationId.trim();
if (normalizeOptionalString(pluginResolvedConversationId)) {
return normalizeOptionalString(pluginResolvedConversationId);
}
if (channelKey === "line") {
const lineConversationId = normalizeLineConversationIdFallback(params.groupId ?? params.to);
@@ -536,7 +536,7 @@ function resolveAcpSpawnChannelAccountId(params: {
accountId?: string;
}): string | undefined {
const channel = normalizeOptionalLowercaseString(params.channel);
const explicitAccountId = params.accountId?.trim();
const explicitAccountId = normalizeOptionalString(params.accountId);
if (explicitAccountId) {
return explicitAccountId;
}
@@ -545,9 +545,7 @@ function resolveAcpSpawnChannelAccountId(params: {
}
const channels = params.cfg.channels as Record<string, { defaultAccount?: unknown } | undefined>;
const configuredDefaultAccountId = channels?.[channel]?.defaultAccount;
return typeof configuredDefaultAccountId === "string" && configuredDefaultAccountId.trim()
? configuredDefaultAccountId.trim()
: "default";
return normalizeOptionalString(configuredDefaultAccountId) ?? "default";
}
function prepareAcpThreadBinding(params: {
@@ -663,7 +661,7 @@ function resolveAcpSpawnRequesterState(params: {
: false;
const hasThreadContext =
typeof params.ctx.agentThreadId === "string"
? params.ctx.agentThreadId.trim().length > 0
? Boolean(normalizeOptionalString(params.ctx.agentThreadId))
: params.ctx.agentThreadId != null;
const requesterAgentId = requesterParsedSession?.agentId;
@@ -887,9 +885,11 @@ function resolveAcpSpawnBootstrapDeliveryPlan(params: {
parentConversationId: params.binding?.conversation.parentConversationId,
});
const inferredDeliveryTo =
(bindingMatchesRequesterConversation ? params.requester.origin?.to?.trim() : undefined) ??
(bindingMatchesRequesterConversation
? normalizeOptionalString(params.requester.origin?.to)
: undefined) ??
boundDeliveryTarget.to ??
params.requester.origin?.to?.trim() ??
normalizeOptionalString(params.requester.origin?.to) ??
formatConversationTarget({
channel: params.requester.origin?.channel,
conversationId: deliveryThreadId,
@@ -932,7 +932,7 @@ export async function spawnAcpDirect(
});
}
const streamToParentRequested = params.streamTo === "parent";
const parentSessionKey = ctx.agentSessionKey?.trim();
const parentSessionKey = normalizeOptionalString(ctx.agentSessionKey);
if (streamToParentRequested && !parentSessionKey) {
return createAcpSpawnFailure({
status: "error",
@@ -1136,8 +1136,9 @@ export async function spawnAcpDirect(
},
timeoutMs: 10_000,
});
if (typeof response?.runId === "string" && response.runId.trim()) {
childRunId = response.runId.trim();
const responseRunId = normalizeOptionalString(response?.runId);
if (responseRunId) {
childRunId = responseRunId;
}
} catch (err) {
parentRelay?.dispose();
@@ -1181,7 +1182,7 @@ export async function spawnAcpDirect(
label: params.label,
task: params.task,
preferMetadata: true,
deliveryStatus: requesterInternalKey.trim() ? "pending" : "parent_missing",
deliveryStatus: requesterInternalKey ? "pending" : "parent_missing",
startedAt: Date.now(),
});
} catch (error) {
@@ -1213,7 +1214,7 @@ export async function spawnAcpDirect(
label: params.label,
task: params.task,
preferMetadata: true,
deliveryStatus: requesterInternalKey.trim() ? "pending" : "parent_missing",
deliveryStatus: requesterInternalKey ? "pending" : "parent_missing",
startedAt: Date.now(),
});
} catch (error) {

View File

@@ -82,7 +82,7 @@ function extractErrorField(value: unknown): string | undefined {
if (direct) {
return direct;
}
const status = typeof record.status === "string" ? record.status.trim() : "";
const status = normalizeOptionalString(record.status) ?? "";
if (!status || !isErrorLikeStatus(status)) {
return undefined;
}
@@ -318,7 +318,7 @@ export function extractToolResultMediaArtifact(
// structured media details or MEDIA: text.
if (hasImageContent) {
const details = record.details as Record<string, unknown> | undefined;
const p = typeof details?.path === "string" ? details.path.trim() : "";
const p = normalizeOptionalString(details?.path) ?? "";
if (p) {
return { mediaUrls: [p] };
}
@@ -389,7 +389,7 @@ export function extractMessagingToolSend(
args: Record<string, unknown>,
): MessagingToolSend | undefined {
// Provider docking: new provider tools must implement plugin.actions.extractToolSend.
const action = typeof args.action === "string" ? args.action.trim() : "";
const action = normalizeOptionalString(args.action) ?? "";
const accountId = normalizeOptionalString(args.accountId);
if (toolName === "message") {
if (action !== "send" && action !== "thread-reply") {
@@ -399,8 +399,8 @@ export function extractMessagingToolSend(
if (!toRaw) {
return undefined;
}
const providerRaw = typeof args.provider === "string" ? args.provider.trim() : "";
const channelRaw = typeof args.channel === "string" ? args.channel.trim() : "";
const providerRaw = normalizeOptionalString(args.provider) ?? "";
const channelRaw = normalizeOptionalString(args.channel) ?? "";
const providerHint = providerRaw || channelRaw;
const providerId = providerHint ? normalizeChannelId(providerHint) : null;
const provider = providerId ?? normalizeOptionalLowercaseString(providerHint) ?? "message";

View File

@@ -116,16 +116,13 @@ export function registerCronAddCommand(cron: Command) {
tz: opts.tz,
});
const wakeModeRaw = typeof opts.wake === "string" ? opts.wake : "now";
const wakeMode = wakeModeRaw.trim() || "now";
const wakeMode = normalizeOptionalString(opts.wake) ?? "now";
if (wakeMode !== "now" && wakeMode !== "next-heartbeat") {
throw new Error("--wake must be now or next-heartbeat");
}
const agentId =
typeof opts.agent === "string" && opts.agent.trim()
? sanitizeAgentId(opts.agent.trim())
: undefined;
const rawAgentId = normalizeOptionalString(opts.agent);
const agentId = rawAgentId ? sanitizeAgentId(rawAgentId) : undefined;
const hasAnnounce = Boolean(opts.announce) || opts.deliver === true;
const hasNoDeliver = opts.deliver === false;
@@ -135,8 +132,8 @@ export function registerCronAddCommand(cron: Command) {
}
const payload = (() => {
const systemEvent = typeof opts.systemEvent === "string" ? opts.systemEvent.trim() : "";
const message = typeof opts.message === "string" ? opts.message.trim() : "";
const systemEvent = normalizeOptionalString(opts.systemEvent) ?? "";
const message = normalizeOptionalString(opts.message) ?? "";
const chosen = [Boolean(systemEvent), Boolean(message)].filter(Boolean).length;
if (chosen !== 1) {
throw new Error("Choose exactly one payload: --system-event or --message");
@@ -157,8 +154,8 @@ export function registerCronAddCommand(cron: Command) {
typeof opts.tools === "string" && opts.tools.trim()
? opts.tools
.split(",")
.map((t: string) => t.trim())
.filter(Boolean)
.map((t: string) => normalizeOptionalString(t))
.filter((t): t is string => Boolean(t))
: undefined,
};
})();
@@ -168,13 +165,13 @@ export function registerCronAddCommand(cron: Command) {
? (name: string) => cmd.getOptionValueSource(name)
: () => undefined;
const sessionSource = optionSource("session");
const sessionTargetRaw = typeof opts.session === "string" ? opts.session.trim() : "";
const sessionTargetRaw = normalizeOptionalString(opts.session) ?? "";
const inferredSessionTarget = payload.kind === "agentTurn" ? "isolated" : "main";
const sessionTarget =
sessionSource === "cli" ? sessionTargetRaw || "" : inferredSessionTarget;
const isCustomSessionTarget =
normalizeLowercaseStringOrEmpty(sessionTarget).startsWith("session:") &&
sessionTarget.slice(8).trim().length > 0;
Boolean(normalizeOptionalString(sessionTarget.slice(8)));
const isIsolatedLikeSessionTarget =
sessionTarget === "isolated" || sessionTarget === "current" || isCustomSessionTarget;
if (sessionTarget !== "main" && !isIsolatedLikeSessionTarget) {
@@ -198,10 +195,7 @@ export function registerCronAddCommand(cron: Command) {
throw new Error("--announce/--no-deliver require a non-main agentTurn session target.");
}
const accountId =
typeof opts.account === "string" && opts.account.trim()
? opts.account.trim()
: undefined;
const accountId = normalizeOptionalString(opts.account);
if (accountId && (!isIsolatedLikeSessionTarget || payload.kind !== "agentTurn")) {
throw new Error("--account requires a non-main agentTurn job with delivery.");
@@ -216,21 +210,14 @@ export function registerCronAddCommand(cron: Command) {
: "announce"
: undefined;
const nameRaw = typeof opts.name === "string" ? opts.name : "";
const name = nameRaw.trim();
const name = normalizeOptionalString(opts.name) ?? "";
if (!name) {
throw new Error("--name is required");
}
const description =
typeof opts.description === "string" && opts.description.trim()
? opts.description.trim()
: undefined;
const description = normalizeOptionalString(opts.description);
const sessionKey =
typeof opts.sessionKey === "string" && opts.sessionKey.trim()
? opts.sessionKey.trim()
: undefined;
const sessionKey = normalizeOptionalString(opts.sessionKey);
const params = {
name,
@@ -246,10 +233,7 @@ export function registerCronAddCommand(cron: Command) {
delivery: deliveryMode
? {
mode: deliveryMode,
channel:
typeof opts.channel === "string" && opts.channel.trim()
? opts.channel.trim()
: undefined,
channel: normalizeOptionalString(opts.channel),
to: normalizeOptionalString(opts.to),
accountId,
bestEffort: opts.bestEffortDeliver ? true : undefined,

View File

@@ -20,6 +20,7 @@ import { inspectReadOnlyChannelAccount } from "../../channels/read-only-account-
import type { OpenClawConfig } from "../../config/config.js";
import { sha256HexPrefix } from "../../logging/redact-identifier.js";
import { asRecord } from "../../shared/record-coerce.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { formatTimeAgo } from "./format.js";
export type ChannelRow = {
@@ -62,7 +63,7 @@ function summarizeSources(sources: Array<string | undefined>): {
}
function existsSyncMaybe(p: string | undefined): boolean | null {
const path = p?.trim() || "";
const path = normalizeOptionalString(p) ?? "";
if (!path) {
return null;
}
@@ -355,14 +356,14 @@ function summarizeTokenConfig(params: {
const unavailable = enabled.filter((a) => hasConfiguredUnavailableCredentialStatus(a.account));
const ready = enabled.filter((a) => {
const rec = asRecord(a.account);
const bot = typeof rec.botToken === "string" ? rec.botToken.trim() : "";
const app = typeof rec.appToken === "string" ? rec.appToken.trim() : "";
const bot = normalizeOptionalString(rec.botToken) ?? "";
const app = normalizeOptionalString(rec.appToken) ?? "";
return Boolean(bot) && Boolean(app);
});
const partial = enabled.filter((a) => {
const rec = asRecord(a.account);
const bot = typeof rec.botToken === "string" ? rec.botToken.trim() : "";
const app = typeof rec.appToken === "string" ? rec.appToken.trim() : "";
const bot = normalizeOptionalString(rec.botToken) ?? "";
const app = normalizeOptionalString(rec.appToken) ?? "";
const hasBot = Boolean(bot);
const hasApp = Boolean(app);
return (hasBot && !hasApp) || (!hasBot && hasApp);
@@ -410,7 +411,7 @@ function summarizeTokenConfig(params: {
const unavailable = enabled.filter((a) => hasConfiguredUnavailableCredentialStatus(a.account));
const ready = enabled.filter((a) => {
const rec = asRecord(a.account);
const bot = typeof rec.botToken === "string" ? rec.botToken.trim() : "";
const bot = normalizeOptionalString(rec.botToken) ?? "";
return Boolean(bot);
});
@@ -441,7 +442,7 @@ function summarizeTokenConfig(params: {
const unavailable = enabled.filter((a) => hasConfiguredUnavailableCredentialStatus(a.account));
const ready = enabled.filter((a) => {
const rec = asRecord(a.account);
return typeof rec.token === "string" ? Boolean(rec.token.trim()) : false;
return Boolean(normalizeOptionalString(rec.token));
});
if (unavailable.length > 0) {
return {

View File

@@ -16,7 +16,11 @@ import { buildOutboundSessionContext } from "../../infra/outbound/session-contex
import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js";
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
import { normalizePollInput } from "../../polls.js";
import { normalizeOptionalLowercaseString, readStringValue } from "../../shared/string-coerce.js";
import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
readStringValue,
} from "../../shared/string-coerce.js";
import {
ErrorCodes,
errorShape,
@@ -218,16 +222,13 @@ export const sendHandlers: GatewayRequestHandlers = {
respond(result.ok, result.payload, result.error, meta);
return;
}
const to = request.to.trim();
const message = typeof request.message === "string" ? request.message.trim() : "";
const mediaUrl =
typeof request.mediaUrl === "string" && request.mediaUrl.trim().length > 0
? request.mediaUrl.trim()
: undefined;
const to = normalizeOptionalString(request.to) ?? "";
const message = normalizeOptionalString(request.message) ?? "";
const mediaUrl = normalizeOptionalString(request.mediaUrl);
const mediaUrls = Array.isArray(request.mediaUrls)
? request.mediaUrls
.map((entry) => (typeof entry === "string" ? entry.trim() : ""))
.filter((entry) => entry.length > 0)
.map((entry) => normalizeOptionalString(entry))
.filter((entry): entry is string => Boolean(entry))
: undefined;
if (!message && !mediaUrl && (mediaUrls?.length ?? 0) === 0) {
respond(
@@ -247,14 +248,8 @@ export const sendHandlers: GatewayRequestHandlers = {
return;
}
const { cfg, channel } = resolvedChannel;
const accountId =
typeof request.accountId === "string" && request.accountId.trim().length
? request.accountId.trim()
: undefined;
const threadId =
typeof request.threadId === "string" && request.threadId.trim().length
? request.threadId.trim()
: undefined;
const accountId = normalizeOptionalString(request.accountId);
const threadId = normalizeOptionalString(request.threadId);
const outboundChannel = channel;
const plugin = resolveOutboundChannelPlugin({ channel, cfg });
if (!plugin) {
@@ -299,14 +294,8 @@ export const sendHandlers: GatewayRequestHandlers = {
const mirrorMediaUrls = mirrorPayloads.flatMap(
(payload) => resolveSendableOutboundReplyParts(payload).mediaUrls,
);
const providedSessionKey =
typeof request.sessionKey === "string" && request.sessionKey.trim()
? (normalizeOptionalLowercaseString(request.sessionKey) ?? undefined)
: undefined;
const explicitAgentId =
typeof request.agentId === "string" && request.agentId.trim()
? request.agentId.trim()
: undefined;
const providedSessionKey = normalizeOptionalLowercaseString(request.sessionKey);
const explicitAgentId = normalizeOptionalString(request.agentId);
const sessionAgentId = providedSessionKey
? resolveSessionAgentId({ sessionKey: providedSessionKey, config: cfg })
: undefined;
@@ -469,14 +458,8 @@ export const sendHandlers: GatewayRequestHandlers = {
durationSeconds: request.durationSeconds,
durationHours: request.durationHours,
};
const threadId =
typeof request.threadId === "string" && request.threadId.trim().length
? request.threadId.trim()
: undefined;
const accountId =
typeof request.accountId === "string" && request.accountId.trim().length
? request.accountId.trim()
: undefined;
const threadId = normalizeOptionalString(request.threadId);
const accountId = normalizeOptionalString(request.accountId);
try {
if (!outbound?.sendPoll) {
respond(

View File

@@ -102,7 +102,7 @@ function requireSessionKey(key: unknown, respond: RespondFn): string | null {
: typeof key === "bigint"
? String(key)
: "";
const normalized = raw.trim();
const normalized = normalizeOptionalString(raw) ?? "";
if (!normalized) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required"));
return null;
@@ -634,8 +634,8 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const p = params;
const keysRaw = Array.isArray(p.keys) ? p.keys : [];
const keys = keysRaw
.map((key) => String(key ?? "").trim())
.filter(Boolean)
.map((key) => normalizeOptionalString(String(key ?? "")))
.filter((key): key is string => Boolean(key))
.slice(0, 64);
const limit =
typeof p.limit === "number" && Number.isFinite(p.limit) ? Math.max(1, p.limit) : 12;
@@ -745,8 +745,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
if (!key) {
return;
}
const checkpointId =
typeof p.checkpointId === "string" && p.checkpointId.trim() ? p.checkpointId.trim() : "";
const checkpointId = normalizeOptionalString(p.checkpointId) ?? "";
if (!checkpointId) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "checkpointId required"));
return;
@@ -828,8 +827,8 @@ export const sessionsHandlers: GatewayRequestHandlers = {
storeKey: target.canonicalKey,
patch: {
key: target.canonicalKey,
label: typeof p.label === "string" ? p.label.trim() : undefined,
model: typeof p.model === "string" ? p.model.trim() : undefined,
label: normalizeOptionalString(p.label),
model: normalizeOptionalString(p.model),
},
loadGatewayModelCatalog: context.loadGatewayModelCatalog,
});
@@ -1228,8 +1227,8 @@ export const sessionsHandlers: GatewayRequestHandlers = {
payload &&
typeof payload === "object" &&
Array.isArray((payload as { runIds?: unknown[] }).runIds)
? (payload as { runIds: unknown[] }).runIds.filter(
(value): value is string => typeof value === "string" && value.trim().length > 0,
? (payload as { runIds: unknown[] }).runIds.filter((value): value is string =>
Boolean(normalizeOptionalString(value)),
)
: [];
abortedRunId = runIds[0] ?? null;
@@ -1603,7 +1602,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
const raw = fs.readFileSync(filePath, "utf-8");
const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0);
const lines = raw.split(/\r?\n/).filter((l) => Boolean(normalizeOptionalString(l)));
if (lines.length <= maxLines) {
respond(
true,