fix(auto-reply): honor recovered system-event reply routes

This commit is contained in:
Peter Steinberger
2026-04-23 20:11:30 +01:00
parent 87a08dd4c2
commit ce933f3bbc
2 changed files with 133 additions and 23 deletions

View File

@@ -748,6 +748,102 @@ describe("dispatchReplyFromConfig", () => {
accountId: "acc-1",
}),
);
expect(replyMediaPathMocks.createReplyMediaPathNormalizer).toHaveBeenCalledWith(
expect.objectContaining({
messageProvider: "telegram",
accountId: "acc-1",
}),
);
expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith(
expect.objectContaining({
shouldRouteToOriginating: true,
originatingChannel: "telegram",
originatingTo: "telegram:999",
}),
expect.any(Object),
);
});
it("routes exec-event replies using last route fields when delivery context is missing", async () => {
setNoAbort();
mocks.routeReply.mockClear();
sessionStoreMocks.currentEntry = {
lastChannel: "discord",
lastTo: "channel:123",
lastAccountId: "default",
};
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "exec-event",
Surface: "exec-event",
SessionKey: "agent:main:main",
AccountId: undefined,
OriginatingChannel: undefined,
OriginatingTo: undefined,
});
const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload;
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(mocks.routeReply).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
to: "channel:123",
accountId: "default",
}),
);
});
it("honors sendPolicy deny for recovered exec-event delivery channel", async () => {
setNoAbort();
mocks.routeReply.mockClear();
sessionStoreMocks.currentEntry = {
deliveryContext: {
channel: "telegram",
to: "telegram:999",
accountId: "acc-1",
},
lastChannel: "telegram",
lastTo: "telegram:999",
lastAccountId: "acc-1",
};
const cfg = {
session: {
sendPolicy: {
default: "allow",
rules: [{ action: "deny", match: { channel: "telegram" } }],
},
},
} as OpenClawConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "exec-event",
Surface: "exec-event",
SessionKey: "agent:main:main",
AccountId: undefined,
OriginatingChannel: undefined,
OriginatingTo: undefined,
});
const replyResolver = vi.fn(async () => ({ text: "hi" }) satisfies ReplyPayload);
const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(mocks.routeReply).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(result.queuedFinal).toBe(false);
expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith(
expect.objectContaining({
sendPolicy: "deny",
suppressUserDelivery: true,
shouldRouteToOriginating: true,
originatingChannel: "telegram",
originatingTo: "telegram:999",
}),
expect.any(Object),
);
});
it("falls back to thread-scoped session key when current ctx has no MessageThreadId", async () => {

View File

@@ -105,6 +105,33 @@ function loadReplyMediaPathsRuntime() {
return replyMediaPathsRuntimePromise;
}
function isSystemEventProvider(provider?: string): boolean {
return provider === "heartbeat" || provider === "cron-event" || provider === "exec-event";
}
function resolveEffectiveReplyRoute(params: {
ctx: Pick<FinalizedMsgContext, "Provider" | "OriginatingChannel" | "OriginatingTo" | "AccountId">;
entry?: Pick<SessionEntry, "deliveryContext" | "lastChannel" | "lastTo" | "lastAccountId">;
}): { channel?: string; to?: string; accountId?: string } {
if (!isSystemEventProvider(params.ctx.Provider)) {
return {
channel: params.ctx.OriginatingChannel,
to: params.ctx.OriginatingTo,
accountId: params.ctx.AccountId,
};
}
const persistedDeliveryContext = params.entry?.deliveryContext;
return {
channel:
params.ctx.OriginatingChannel ??
persistedDeliveryContext?.channel ??
params.entry?.lastChannel,
to: params.ctx.OriginatingTo ?? persistedDeliveryContext?.to ?? params.entry?.lastTo,
accountId:
params.ctx.AccountId ?? persistedDeliveryContext?.accountId ?? params.entry?.lastAccountId,
};
}
async function maybeApplyTtsToReplyPayload(
params: Parameters<Awaited<ReturnType<typeof loadTtsRuntime>>["maybeApplyTtsToPayload"]>[0],
) {
@@ -287,18 +314,7 @@ export async function dispatchReplyFromConfig(
"",
) ?? "off",
});
const isSystemEventTurn =
ctx.Provider === "heartbeat" || ctx.Provider === "cron-event" || ctx.Provider === "exec-event";
const persistedDeliveryContext = sessionStoreEntry.entry?.deliveryContext;
const fallbackOriginatingChannel = isSystemEventTurn
? (persistedDeliveryContext?.channel ?? sessionStoreEntry.entry?.lastChannel)
: undefined;
const fallbackOriginatingTo = isSystemEventTurn
? (persistedDeliveryContext?.to ?? sessionStoreEntry.entry?.lastTo)
: undefined;
const fallbackOriginatingAccountId = isSystemEventTurn
? (persistedDeliveryContext?.accountId ?? sessionStoreEntry.entry?.lastAccountId)
: undefined;
const effectiveReplyRoute = resolveEffectiveReplyRoute({ ctx, entry: sessionStoreEntry.entry });
// Restore route thread context only from the active turn or the thread-scoped session key.
// Do not read thread ids from the normalised session store here: `origin.threadId` can be
// folded back into lastThreadId/deliveryContext during store normalisation and resurrect a
@@ -331,10 +347,7 @@ export async function dispatchReplyFromConfig(
//
// Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts`
const suppressAcpChildUserDelivery = isParentOwnedBackgroundAcpSession(sessionStoreEntry.entry);
const effectiveOriginatingChannel = ctx.OriginatingChannel ?? fallbackOriginatingChannel;
const effectiveOriginatingTo = ctx.OriginatingTo ?? fallbackOriginatingTo;
const routeAccountId = ctx.AccountId ?? fallbackOriginatingAccountId;
const normalizedOriginatingChannel = normalizeMessageChannel(effectiveOriginatingChannel);
const normalizedOriginatingChannel = normalizeMessageChannel(effectiveReplyRoute.channel);
const normalizedProviderChannel = normalizeMessageChannel(ctx.Provider);
const normalizedSurfaceChannel = normalizeMessageChannel(ctx.Surface);
const normalizedCurrentSurface = normalizedProviderChannel ?? normalizedSurfaceChannel;
@@ -346,7 +359,7 @@ export async function dispatchReplyFromConfig(
!suppressAcpChildUserDelivery &&
!isInternalWebchatTurn &&
normalizedOriginatingChannel &&
effectiveOriginatingTo &&
effectiveReplyRoute.to &&
normalizedOriginatingChannel !== normalizedCurrentSurface,
);
const routeReplyRuntime = hasRouteReplyCandidate ? await loadRouteReplyRuntime() : undefined;
@@ -355,12 +368,12 @@ export async function dispatchReplyFromConfig(
provider: ctx.Provider,
surface: ctx.Surface,
explicitDeliverRoute: ctx.ExplicitDeliverRoute,
originatingChannel: effectiveOriginatingChannel,
originatingTo: effectiveOriginatingTo,
originatingChannel: effectiveReplyRoute.channel,
originatingTo: effectiveReplyRoute.to,
suppressDirectUserDelivery: suppressAcpChildUserDelivery,
isRoutableChannel: routeReplyRuntime?.isRoutableChannel ?? (() => false),
});
const originatingTo = effectiveOriginatingTo;
const originatingTo = effectiveReplyRoute.to;
const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface;
const { createReplyMediaPathNormalizer } = await loadReplyMediaPathsRuntime();
const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({
@@ -368,7 +381,7 @@ export async function dispatchReplyFromConfig(
sessionKey: acpDispatchSessionKey,
workspaceDir: resolveAgentWorkspaceDir(cfg, sessionAgentId),
messageProvider: ttsChannel,
accountId: routeAccountId,
accountId: effectiveReplyRoute.accountId,
groupId,
groupChannel: ctx.GroupChannel,
groupSpace: ctx.GroupSpace,
@@ -400,7 +413,7 @@ export async function dispatchReplyFromConfig(
ctx.CommandSource === "native"
? (ctx.CommandTargetSessionKey ?? ctx.SessionKey)
: ctx.SessionKey,
accountId: routeAccountId,
accountId: effectiveReplyRoute.accountId,
requesterSenderId: ctx.SenderId,
requesterSenderName: ctx.SenderName,
requesterSenderUsername: ctx.SenderUsername,
@@ -487,8 +500,9 @@ export async function dispatchReplyFromConfig(
entry: sessionStoreEntry.entry,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
channel:
(shouldRouteToOriginating ? originatingChannel : undefined) ??
sessionStoreEntry.entry?.channel ??
ctx.OriginatingChannel ??
effectiveReplyRoute.channel ??
ctx.Surface ??
ctx.Provider ??
undefined,