mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 12:10:42 +00:00
refactor(auto-reply): extract effective reply route resolution
This commit is contained in:
@@ -237,6 +237,17 @@ const TOOLING_TEST_TARGETS = new Map([
|
||||
]);
|
||||
const SOURCE_TEST_TARGETS = new Map([
|
||||
["src/agents/live-model-turn-probes.ts", ["src/agents/live-model-turn-probes.test.ts"]],
|
||||
[
|
||||
"src/auto-reply/reply/dispatch-from-config.ts",
|
||||
["src/auto-reply/reply/dispatch-from-config.test.ts"],
|
||||
],
|
||||
[
|
||||
"src/auto-reply/reply/effective-reply-route.ts",
|
||||
[
|
||||
"src/auto-reply/reply/effective-reply-route.test.ts",
|
||||
"src/auto-reply/reply/dispatch-from-config.test.ts",
|
||||
],
|
||||
],
|
||||
]);
|
||||
const GENERATED_CHANGED_TEST_TARGETS = new Set([
|
||||
"src/canvas-host/a2ui/.bundle.hash",
|
||||
|
||||
@@ -67,6 +67,7 @@ import type {
|
||||
DispatchFromConfigParams,
|
||||
DispatchFromConfigResult,
|
||||
} from "./dispatch-from-config.types.js";
|
||||
import { resolveEffectiveReplyRoute } from "./effective-reply-route.js";
|
||||
import { claimInboundDedupe, commitInboundDedupe, releaseInboundDedupe } from "./inbound-dedupe.js";
|
||||
import { resolveReplyRoutingDecision } from "./routing-policy.js";
|
||||
import { resolveRunTypingPolicy } from "./typing-policy.js";
|
||||
@@ -105,33 +106,6 @@ function loadReplyMediaPathsRuntime() {
|
||||
return replyMediaPathsRuntimePromise;
|
||||
}
|
||||
|
||||
function isSystemEventProvider(provider?: string): boolean {
|
||||
return provider === "heartbeat" || provider === "cron-event" || provider === "exec-event";
|
||||
}
|
||||
|
||||
function resolveEffectiveReplyRoute(params: {
|
||||
ctx: Pick<FinalizedMsgContext, "Provider" | "OriginatingChannel" | "OriginatingTo" | "AccountId">;
|
||||
entry?: Pick<SessionEntry, "deliveryContext" | "lastChannel" | "lastTo" | "lastAccountId">;
|
||||
}): { channel?: string; to?: string; accountId?: string } {
|
||||
if (!isSystemEventProvider(params.ctx.Provider)) {
|
||||
return {
|
||||
channel: params.ctx.OriginatingChannel,
|
||||
to: params.ctx.OriginatingTo,
|
||||
accountId: params.ctx.AccountId,
|
||||
};
|
||||
}
|
||||
const persistedDeliveryContext = params.entry?.deliveryContext;
|
||||
return {
|
||||
channel:
|
||||
params.ctx.OriginatingChannel ??
|
||||
persistedDeliveryContext?.channel ??
|
||||
params.entry?.lastChannel,
|
||||
to: params.ctx.OriginatingTo ?? persistedDeliveryContext?.to ?? params.entry?.lastTo,
|
||||
accountId:
|
||||
params.ctx.AccountId ?? persistedDeliveryContext?.accountId ?? params.entry?.lastAccountId,
|
||||
};
|
||||
}
|
||||
|
||||
async function maybeApplyTtsToReplyPayload(
|
||||
params: Parameters<Awaited<ReturnType<typeof loadTtsRuntime>>["maybeApplyTtsToPayload"]>[0],
|
||||
) {
|
||||
@@ -314,7 +288,7 @@ export async function dispatchReplyFromConfig(
|
||||
"",
|
||||
) ?? "off",
|
||||
});
|
||||
const effectiveReplyRoute = resolveEffectiveReplyRoute({ ctx, entry: sessionStoreEntry.entry });
|
||||
const replyRoute = resolveEffectiveReplyRoute({ ctx, entry: sessionStoreEntry.entry });
|
||||
// Restore route thread context only from the active turn or the thread-scoped session key.
|
||||
// Do not read thread ids from the normalised session store here: `origin.threadId` can be
|
||||
// folded back into lastThreadId/deliveryContext during store normalisation and resurrect a
|
||||
@@ -347,7 +321,7 @@ export async function dispatchReplyFromConfig(
|
||||
//
|
||||
// Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts`
|
||||
const suppressAcpChildUserDelivery = isParentOwnedBackgroundAcpSession(sessionStoreEntry.entry);
|
||||
const normalizedOriginatingChannel = normalizeMessageChannel(effectiveReplyRoute.channel);
|
||||
const normalizedRouteReplyChannel = normalizeMessageChannel(replyRoute.channel);
|
||||
const normalizedProviderChannel = normalizeMessageChannel(ctx.Provider);
|
||||
const normalizedSurfaceChannel = normalizeMessageChannel(ctx.Surface);
|
||||
const normalizedCurrentSurface = normalizedProviderChannel ?? normalizedSurfaceChannel;
|
||||
@@ -358,30 +332,34 @@ export async function dispatchReplyFromConfig(
|
||||
const hasRouteReplyCandidate = Boolean(
|
||||
!suppressAcpChildUserDelivery &&
|
||||
!isInternalWebchatTurn &&
|
||||
normalizedOriginatingChannel &&
|
||||
effectiveReplyRoute.to &&
|
||||
normalizedOriginatingChannel !== normalizedCurrentSurface,
|
||||
normalizedRouteReplyChannel &&
|
||||
replyRoute.to &&
|
||||
normalizedRouteReplyChannel !== normalizedCurrentSurface,
|
||||
);
|
||||
const routeReplyRuntime = hasRouteReplyCandidate ? await loadRouteReplyRuntime() : undefined;
|
||||
const { originatingChannel, currentSurface, shouldRouteToOriginating, shouldSuppressTyping } =
|
||||
resolveReplyRoutingDecision({
|
||||
provider: ctx.Provider,
|
||||
surface: ctx.Surface,
|
||||
explicitDeliverRoute: ctx.ExplicitDeliverRoute,
|
||||
originatingChannel: effectiveReplyRoute.channel,
|
||||
originatingTo: effectiveReplyRoute.to,
|
||||
suppressDirectUserDelivery: suppressAcpChildUserDelivery,
|
||||
isRoutableChannel: routeReplyRuntime?.isRoutableChannel ?? (() => false),
|
||||
});
|
||||
const originatingTo = effectiveReplyRoute.to;
|
||||
const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface;
|
||||
const {
|
||||
originatingChannel: routeReplyChannel,
|
||||
currentSurface,
|
||||
shouldRouteToOriginating,
|
||||
shouldSuppressTyping,
|
||||
} = resolveReplyRoutingDecision({
|
||||
provider: ctx.Provider,
|
||||
surface: ctx.Surface,
|
||||
explicitDeliverRoute: ctx.ExplicitDeliverRoute,
|
||||
originatingChannel: replyRoute.channel,
|
||||
originatingTo: replyRoute.to,
|
||||
suppressDirectUserDelivery: suppressAcpChildUserDelivery,
|
||||
isRoutableChannel: routeReplyRuntime?.isRoutableChannel ?? (() => false),
|
||||
});
|
||||
const routeReplyTo = replyRoute.to;
|
||||
const deliveryChannel = shouldRouteToOriginating ? routeReplyChannel : currentSurface;
|
||||
const { createReplyMediaPathNormalizer } = await loadReplyMediaPathsRuntime();
|
||||
const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({
|
||||
cfg,
|
||||
sessionKey: acpDispatchSessionKey,
|
||||
workspaceDir: resolveAgentWorkspaceDir(cfg, sessionAgentId),
|
||||
messageProvider: ttsChannel,
|
||||
accountId: effectiveReplyRoute.accountId,
|
||||
messageProvider: deliveryChannel,
|
||||
accountId: replyRoute.accountId,
|
||||
groupId,
|
||||
groupChannel: ctx.GroupChannel,
|
||||
groupSpace: ctx.GroupSpace,
|
||||
@@ -401,19 +379,19 @@ export async function dispatchReplyFromConfig(
|
||||
payload: ReplyPayload,
|
||||
options?: { abortSignal?: AbortSignal; mirror?: boolean },
|
||||
) => {
|
||||
if (!shouldRouteToOriginating || !originatingChannel || !originatingTo || !routeReplyRuntime) {
|
||||
if (!shouldRouteToOriginating || !routeReplyChannel || !routeReplyTo || !routeReplyRuntime) {
|
||||
return null;
|
||||
}
|
||||
return await routeReplyRuntime.routeReply({
|
||||
payload,
|
||||
channel: originatingChannel,
|
||||
to: originatingTo,
|
||||
channel: routeReplyChannel,
|
||||
to: routeReplyTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
policySessionKey:
|
||||
ctx.CommandSource === "native"
|
||||
? (ctx.CommandTargetSessionKey ?? ctx.SessionKey)
|
||||
: ctx.SessionKey,
|
||||
accountId: effectiveReplyRoute.accountId,
|
||||
accountId: replyRoute.accountId,
|
||||
requesterSenderId: ctx.SenderId,
|
||||
requesterSenderName: ctx.SenderName,
|
||||
requesterSenderUsername: ctx.SenderUsername,
|
||||
@@ -431,7 +409,7 @@ export async function dispatchReplyFromConfig(
|
||||
* Helper to send a payload via route-reply (async).
|
||||
* Only used when actually routing to a different provider.
|
||||
* Note: Only called when shouldRouteToOriginating is true, so
|
||||
* originatingChannel and originatingTo are guaranteed to be defined.
|
||||
* routeReplyChannel and routeReplyTo are guaranteed to be defined.
|
||||
*/
|
||||
const sendPayloadAsync = async (
|
||||
payload: ReplyPayload,
|
||||
@@ -440,7 +418,7 @@ export async function dispatchReplyFromConfig(
|
||||
): Promise<void> => {
|
||||
// Keep the runtime guard explicit because this helper is called from nested
|
||||
// reply callbacks where TypeScript cannot narrow shouldRouteToOriginating.
|
||||
if (!routeReplyRuntime || !originatingChannel || !originatingTo) {
|
||||
if (!routeReplyRuntime || !routeReplyChannel || !routeReplyTo) {
|
||||
return;
|
||||
}
|
||||
if (abortSignal?.aborted) {
|
||||
@@ -500,9 +478,9 @@ export async function dispatchReplyFromConfig(
|
||||
entry: sessionStoreEntry.entry,
|
||||
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
|
||||
channel:
|
||||
(shouldRouteToOriginating ? originatingChannel : undefined) ??
|
||||
(shouldRouteToOriginating ? routeReplyChannel : undefined) ??
|
||||
sessionStoreEntry.entry?.channel ??
|
||||
effectiveReplyRoute.channel ??
|
||||
replyRoute.channel ??
|
||||
ctx.Surface ??
|
||||
ctx.Provider ??
|
||||
undefined,
|
||||
@@ -670,7 +648,7 @@ export async function dispatchReplyFromConfig(
|
||||
const ttsPayload = await maybeApplyTtsToReplyPayload({
|
||||
payload,
|
||||
cfg,
|
||||
channel: ttsChannel,
|
||||
channel: deliveryChannel,
|
||||
kind: "final",
|
||||
inboundAudio,
|
||||
ttsAuto: sessionTtsAuto,
|
||||
@@ -740,11 +718,11 @@ export async function dispatchReplyFromConfig(
|
||||
images: params.replyOptions?.images,
|
||||
inboundAudio,
|
||||
sessionTtsAuto,
|
||||
ttsChannel,
|
||||
ttsChannel: deliveryChannel,
|
||||
suppressUserDelivery: suppressHookUserDelivery,
|
||||
shouldRouteToOriginating,
|
||||
originatingChannel,
|
||||
originatingTo,
|
||||
originatingChannel: routeReplyChannel,
|
||||
originatingTo: routeReplyTo,
|
||||
shouldSendToolSummaries,
|
||||
sendPolicy,
|
||||
},
|
||||
@@ -910,7 +888,7 @@ export async function dispatchReplyFromConfig(
|
||||
requestedPolicy: params.replyOptions?.typingPolicy,
|
||||
suppressTyping:
|
||||
suppressDelivery || params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
|
||||
originatingChannel,
|
||||
originatingChannel: routeReplyChannel,
|
||||
systemEvent: shouldRouteToOriginating,
|
||||
});
|
||||
const suppressDefaultToolProgressMessages =
|
||||
@@ -937,7 +915,7 @@ export async function dispatchReplyFromConfig(
|
||||
const ttsPayload = await maybeApplyTtsToReplyPayload({
|
||||
payload,
|
||||
cfg,
|
||||
channel: ttsChannel,
|
||||
channel: deliveryChannel,
|
||||
kind: "tool",
|
||||
inboundAudio,
|
||||
ttsAuto: sessionTtsAuto,
|
||||
@@ -1038,7 +1016,7 @@ export async function dispatchReplyFromConfig(
|
||||
const ttsPayload = await maybeApplyTtsToReplyPayload({
|
||||
payload,
|
||||
cfg,
|
||||
channel: ttsChannel,
|
||||
channel: deliveryChannel,
|
||||
kind: "block",
|
||||
inboundAudio,
|
||||
ttsAuto: sessionTtsAuto,
|
||||
@@ -1069,11 +1047,11 @@ export async function dispatchReplyFromConfig(
|
||||
images: params.replyOptions?.images,
|
||||
inboundAudio,
|
||||
sessionTtsAuto,
|
||||
ttsChannel,
|
||||
ttsChannel: deliveryChannel,
|
||||
suppressUserDelivery: suppressHookUserDelivery,
|
||||
shouldRouteToOriginating,
|
||||
originatingChannel,
|
||||
originatingTo,
|
||||
originatingChannel: routeReplyChannel,
|
||||
originatingTo: routeReplyTo,
|
||||
shouldSendToolSummaries,
|
||||
sendPolicy,
|
||||
isTailDispatch: true,
|
||||
@@ -1126,7 +1104,7 @@ export async function dispatchReplyFromConfig(
|
||||
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
|
||||
payload: { text: accumulatedBlockText },
|
||||
cfg,
|
||||
channel: ttsChannel,
|
||||
channel: deliveryChannel,
|
||||
kind: "final",
|
||||
inboundAudio,
|
||||
ttsAuto: sessionTtsAuto,
|
||||
|
||||
160
src/auto-reply/reply/effective-reply-route.test.ts
Normal file
160
src/auto-reply/reply/effective-reply-route.test.ts
Normal file
@@ -0,0 +1,160 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
isSystemEventProvider,
|
||||
resolveEffectiveReplyRoute,
|
||||
type EffectiveReplyRouteContext,
|
||||
type EffectiveReplyRouteEntry,
|
||||
} from "./effective-reply-route.js";
|
||||
|
||||
const ctx = (params: EffectiveReplyRouteContext): EffectiveReplyRouteContext => params;
|
||||
const entry = (params: EffectiveReplyRouteEntry): EffectiveReplyRouteEntry => params;
|
||||
|
||||
describe("resolveEffectiveReplyRoute", () => {
|
||||
it("uses live origin context for normal providers", () => {
|
||||
expect(
|
||||
resolveEffectiveReplyRoute({
|
||||
ctx: ctx({
|
||||
Provider: "slack",
|
||||
OriginatingChannel: "discord",
|
||||
OriginatingTo: "channel:live",
|
||||
AccountId: "live-account",
|
||||
}),
|
||||
entry: entry({
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "chat:persisted",
|
||||
accountId: "persisted-account",
|
||||
},
|
||||
lastChannel: "whatsapp",
|
||||
lastTo: "last-to",
|
||||
lastAccountId: "last-account",
|
||||
}),
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "discord",
|
||||
to: "channel:live",
|
||||
accountId: "live-account",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not use persisted fallbacks for normal providers", () => {
|
||||
expect(
|
||||
resolveEffectiveReplyRoute({
|
||||
ctx: ctx({ Provider: "slack" }),
|
||||
entry: entry({
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "chat:persisted",
|
||||
accountId: "persisted-account",
|
||||
},
|
||||
lastChannel: "whatsapp",
|
||||
lastTo: "last-to",
|
||||
lastAccountId: "last-account",
|
||||
}),
|
||||
}),
|
||||
).toEqual({
|
||||
channel: undefined,
|
||||
to: undefined,
|
||||
accountId: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("prefers live origin context for exec-event replies", () => {
|
||||
expect(
|
||||
resolveEffectiveReplyRoute({
|
||||
ctx: ctx({
|
||||
Provider: "exec-event",
|
||||
OriginatingChannel: "telegram",
|
||||
OriginatingTo: "chat:live",
|
||||
AccountId: "live-account",
|
||||
}),
|
||||
entry: entry({
|
||||
deliveryContext: {
|
||||
channel: "discord",
|
||||
to: "channel:persisted",
|
||||
accountId: "persisted-account",
|
||||
},
|
||||
lastChannel: "slack",
|
||||
lastTo: "last-to",
|
||||
lastAccountId: "last-account",
|
||||
}),
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "telegram",
|
||||
to: "chat:live",
|
||||
accountId: "live-account",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to deliveryContext for exec-event replies", () => {
|
||||
expect(
|
||||
resolveEffectiveReplyRoute({
|
||||
ctx: ctx({ Provider: "exec-event" }),
|
||||
entry: entry({
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "chat:persisted",
|
||||
accountId: "persisted-account",
|
||||
},
|
||||
lastChannel: "slack",
|
||||
lastTo: "last-to",
|
||||
lastAccountId: "last-account",
|
||||
}),
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "telegram",
|
||||
to: "chat:persisted",
|
||||
accountId: "persisted-account",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to legacy last route fields for exec-event replies", () => {
|
||||
expect(
|
||||
resolveEffectiveReplyRoute({
|
||||
ctx: ctx({ Provider: "exec-event" }),
|
||||
entry: entry({
|
||||
lastChannel: "slack",
|
||||
lastTo: "last-to",
|
||||
lastAccountId: "last-account",
|
||||
}),
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "slack",
|
||||
to: "last-to",
|
||||
accountId: "last-account",
|
||||
});
|
||||
});
|
||||
|
||||
it("fills partial exec-event route from persisted context", () => {
|
||||
expect(
|
||||
resolveEffectiveReplyRoute({
|
||||
ctx: ctx({
|
||||
Provider: "exec-event",
|
||||
OriginatingChannel: "telegram",
|
||||
OriginatingTo: "chat:live",
|
||||
}),
|
||||
entry: entry({
|
||||
deliveryContext: {
|
||||
channel: "discord",
|
||||
to: "channel:persisted",
|
||||
accountId: "persisted-account",
|
||||
},
|
||||
}),
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "telegram",
|
||||
to: "chat:live",
|
||||
accountId: "persisted-account",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("isSystemEventProvider", () => {
|
||||
it("recognizes persisted-delivery event providers", () => {
|
||||
expect(isSystemEventProvider("heartbeat")).toBe(true);
|
||||
expect(isSystemEventProvider("cron-event")).toBe(true);
|
||||
expect(isSystemEventProvider("exec-event")).toBe(true);
|
||||
expect(isSystemEventProvider("slack")).toBe(false);
|
||||
expect(isSystemEventProvider(undefined)).toBe(false);
|
||||
});
|
||||
});
|
||||
45
src/auto-reply/reply/effective-reply-route.ts
Normal file
45
src/auto-reply/reply/effective-reply-route.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { FinalizedMsgContext } from "../templating.js";
|
||||
|
||||
export type EffectiveReplyRouteContext = Pick<
|
||||
FinalizedMsgContext,
|
||||
"Provider" | "OriginatingChannel" | "OriginatingTo" | "AccountId"
|
||||
>;
|
||||
|
||||
export type EffectiveReplyRouteEntry = Pick<
|
||||
SessionEntry,
|
||||
"deliveryContext" | "lastChannel" | "lastTo" | "lastAccountId"
|
||||
>;
|
||||
|
||||
export type EffectiveReplyRoute = {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
};
|
||||
|
||||
export function isSystemEventProvider(provider?: string): boolean {
|
||||
return provider === "heartbeat" || provider === "cron-event" || provider === "exec-event";
|
||||
}
|
||||
|
||||
export function resolveEffectiveReplyRoute(params: {
|
||||
ctx: EffectiveReplyRouteContext;
|
||||
entry?: EffectiveReplyRouteEntry;
|
||||
}): EffectiveReplyRoute {
|
||||
if (!isSystemEventProvider(params.ctx.Provider)) {
|
||||
return {
|
||||
channel: params.ctx.OriginatingChannel,
|
||||
to: params.ctx.OriginatingTo,
|
||||
accountId: params.ctx.AccountId,
|
||||
};
|
||||
}
|
||||
const persistedDeliveryContext = params.entry?.deliveryContext;
|
||||
return {
|
||||
channel:
|
||||
params.ctx.OriginatingChannel ??
|
||||
persistedDeliveryContext?.channel ??
|
||||
params.entry?.lastChannel,
|
||||
to: params.ctx.OriginatingTo ?? persistedDeliveryContext?.to ?? params.entry?.lastTo,
|
||||
accountId:
|
||||
params.ctx.AccountId ?? persistedDeliveryContext?.accountId ?? params.entry?.lastAccountId,
|
||||
};
|
||||
}
|
||||
@@ -244,6 +244,22 @@ describe("scripts/test-projects changed-target routing", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("routes auto-reply route source files to route regression tests", () => {
|
||||
expect(
|
||||
resolveChangedTestTargetPlan([
|
||||
"src/auto-reply/reply/dispatch-from-config.ts",
|
||||
"src/auto-reply/reply/effective-reply-route.ts",
|
||||
"src/auto-reply/reply/effective-reply-route.test.ts",
|
||||
]),
|
||||
).toEqual({
|
||||
mode: "targets",
|
||||
targets: [
|
||||
"src/auto-reply/reply/dispatch-from-config.test.ts",
|
||||
"src/auto-reply/reply/effective-reply-route.test.ts",
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it("routes changed utils and shared files to their light scoped lanes", () => {
|
||||
const plans = buildVitestRunPlans(["--changed", "origin/main"], process.cwd(), () => [
|
||||
"src/shared/string-normalization.ts",
|
||||
|
||||
Reference in New Issue
Block a user