refactor: dedupe thread id normalizers

This commit is contained in:
Peter Steinberger
2026-04-07 09:57:48 +01:00
parent edfc8eb91a
commit 255abc57b9
10 changed files with 43 additions and 89 deletions

View File

@@ -1,5 +1,5 @@
import type { CronFailureDestinationConfig } from "../config/types.cron.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { normalizeOptionalString, normalizeOptionalThreadValue } from "../shared/string-coerce.js";
import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js";
export type CronDeliveryPlan = {
@@ -24,16 +24,6 @@ function normalizeChannel(value: unknown): CronMessageChannel | undefined {
return trimmed as CronMessageChannel;
}
function normalizeThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value !== "string") {
return undefined;
}
return normalizeOptionalString(value);
}
export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
const delivery = job.delivery;
const hasDelivery = delivery && typeof delivery === "object";
@@ -54,7 +44,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
(delivery as { channel?: unknown } | undefined)?.channel,
);
const deliveryTo = normalizeOptionalString((delivery as { to?: unknown } | undefined)?.to);
const deliveryThreadId = normalizeThreadId(
const deliveryThreadId = normalizeOptionalThreadValue(
(delivery as { threadId?: unknown } | undefined)?.threadId,
);
const channel = deliveryChannel ?? "last";

View File

@@ -1,6 +1,9 @@
import crypto from "node:crypto";
import { normalizeAgentId } from "../../routing/session-key.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import {
normalizeOptionalString,
normalizeOptionalThreadValue,
} from "../../shared/string-coerce.js";
import { parseAbsoluteTimeMs } from "../parse.js";
import {
coerceFiniteScheduleNumber,
@@ -715,13 +718,6 @@ function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload {
};
}
function normalizeOptionalThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
return normalizeOptionalString(value);
}
function mergeCronDelivery(
existing: CronDelivery | undefined,
patch: CronDeliveryPatch,
@@ -746,7 +742,7 @@ function mergeCronDelivery(
next.to = normalizeOptionalString(patch.to);
}
if ("threadId" in patch) {
next.threadId = normalizeOptionalThreadId(patch.threadId);
next.threadId = normalizeOptionalThreadValue(patch.threadId);
}
if ("accountId" in patch) {
next.accountId = normalizeOptionalString(patch.accountId);