From ce933f3bbc1df4eb1902bd91b756d87d050a79a2 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 23 Apr 2026 20:11:30 +0100 Subject: [PATCH] fix(auto-reply): honor recovered system-event reply routes --- .../reply/dispatch-from-config.test.ts | 96 +++++++++++++++++++ src/auto-reply/reply/dispatch-from-config.ts | 60 +++++++----- 2 files changed, 133 insertions(+), 23 deletions(-) diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index d290d31d955..6355c62825d 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -748,6 +748,102 @@ describe("dispatchReplyFromConfig", () => { accountId: "acc-1", }), ); + expect(replyMediaPathMocks.createReplyMediaPathNormalizer).toHaveBeenCalledWith( + expect.objectContaining({ + messageProvider: "telegram", + accountId: "acc-1", + }), + ); + expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith( + expect.objectContaining({ + shouldRouteToOriginating: true, + originatingChannel: "telegram", + originatingTo: "telegram:999", + }), + expect.any(Object), + ); + }); + + it("routes exec-event replies using last route fields when delivery context is missing", async () => { + setNoAbort(); + mocks.routeReply.mockClear(); + sessionStoreMocks.currentEntry = { + lastChannel: "discord", + lastTo: "channel:123", + lastAccountId: "default", + }; + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "exec-event", + Surface: "exec-event", + SessionKey: "agent:main:main", + AccountId: undefined, + OriginatingChannel: undefined, + OriginatingTo: undefined, + }); + + const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(mocks.routeReply).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "discord", + to: "channel:123", + accountId: "default", + }), + ); + }); + + it("honors sendPolicy deny for recovered exec-event delivery channel", async () => { + setNoAbort(); + mocks.routeReply.mockClear(); + sessionStoreMocks.currentEntry = { + deliveryContext: { + channel: "telegram", + to: "telegram:999", + accountId: "acc-1", + }, + lastChannel: "telegram", + lastTo: "telegram:999", + lastAccountId: "acc-1", + }; + const cfg = { + session: { + sendPolicy: { + default: "allow", + rules: [{ action: "deny", match: { channel: "telegram" } }], + }, + }, + } as OpenClawConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "exec-event", + Surface: "exec-event", + SessionKey: "agent:main:main", + AccountId: undefined, + OriginatingChannel: undefined, + OriginatingTo: undefined, + }); + + const replyResolver = vi.fn(async () => ({ text: "hi" }) satisfies ReplyPayload); + const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(mocks.routeReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(result.queuedFinal).toBe(false); + expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith( + expect.objectContaining({ + sendPolicy: "deny", + suppressUserDelivery: true, + shouldRouteToOriginating: true, + originatingChannel: "telegram", + originatingTo: "telegram:999", + }), + expect.any(Object), + ); }); it("falls back to thread-scoped session key when current ctx has no MessageThreadId", async () => { diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index e7e0ea6af37..cc923fd0dd6 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -105,6 +105,33 @@ function loadReplyMediaPathsRuntime() { return replyMediaPathsRuntimePromise; } +function isSystemEventProvider(provider?: string): boolean { + return provider === "heartbeat" || provider === "cron-event" || provider === "exec-event"; +} + +function resolveEffectiveReplyRoute(params: { + ctx: Pick; + entry?: Pick; +}): { channel?: string; to?: string; accountId?: string } { + if (!isSystemEventProvider(params.ctx.Provider)) { + return { + channel: params.ctx.OriginatingChannel, + to: params.ctx.OriginatingTo, + accountId: params.ctx.AccountId, + }; + } + const persistedDeliveryContext = params.entry?.deliveryContext; + return { + channel: + params.ctx.OriginatingChannel ?? + persistedDeliveryContext?.channel ?? + params.entry?.lastChannel, + to: params.ctx.OriginatingTo ?? persistedDeliveryContext?.to ?? params.entry?.lastTo, + accountId: + params.ctx.AccountId ?? persistedDeliveryContext?.accountId ?? params.entry?.lastAccountId, + }; +} + async function maybeApplyTtsToReplyPayload( params: Parameters>["maybeApplyTtsToPayload"]>[0], ) { @@ -287,18 +314,7 @@ export async function dispatchReplyFromConfig( "", ) ?? "off", }); - const isSystemEventTurn = - ctx.Provider === "heartbeat" || ctx.Provider === "cron-event" || ctx.Provider === "exec-event"; - const persistedDeliveryContext = sessionStoreEntry.entry?.deliveryContext; - const fallbackOriginatingChannel = isSystemEventTurn - ? (persistedDeliveryContext?.channel ?? sessionStoreEntry.entry?.lastChannel) - : undefined; - const fallbackOriginatingTo = isSystemEventTurn - ? (persistedDeliveryContext?.to ?? sessionStoreEntry.entry?.lastTo) - : undefined; - const fallbackOriginatingAccountId = isSystemEventTurn - ? (persistedDeliveryContext?.accountId ?? sessionStoreEntry.entry?.lastAccountId) - : undefined; + const effectiveReplyRoute = resolveEffectiveReplyRoute({ ctx, entry: sessionStoreEntry.entry }); // Restore route thread context only from the active turn or the thread-scoped session key. // Do not read thread ids from the normalised session store here: `origin.threadId` can be // folded back into lastThreadId/deliveryContext during store normalisation and resurrect a @@ -331,10 +347,7 @@ export async function dispatchReplyFromConfig( // // Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts` const suppressAcpChildUserDelivery = isParentOwnedBackgroundAcpSession(sessionStoreEntry.entry); - const effectiveOriginatingChannel = ctx.OriginatingChannel ?? fallbackOriginatingChannel; - const effectiveOriginatingTo = ctx.OriginatingTo ?? fallbackOriginatingTo; - const routeAccountId = ctx.AccountId ?? fallbackOriginatingAccountId; - const normalizedOriginatingChannel = normalizeMessageChannel(effectiveOriginatingChannel); + const normalizedOriginatingChannel = normalizeMessageChannel(effectiveReplyRoute.channel); const normalizedProviderChannel = normalizeMessageChannel(ctx.Provider); const normalizedSurfaceChannel = normalizeMessageChannel(ctx.Surface); const normalizedCurrentSurface = normalizedProviderChannel ?? normalizedSurfaceChannel; @@ -346,7 +359,7 @@ export async function dispatchReplyFromConfig( !suppressAcpChildUserDelivery && !isInternalWebchatTurn && normalizedOriginatingChannel && - effectiveOriginatingTo && + effectiveReplyRoute.to && normalizedOriginatingChannel !== normalizedCurrentSurface, ); const routeReplyRuntime = hasRouteReplyCandidate ? await loadRouteReplyRuntime() : undefined; @@ -355,12 +368,12 @@ export async function dispatchReplyFromConfig( provider: ctx.Provider, surface: ctx.Surface, explicitDeliverRoute: ctx.ExplicitDeliverRoute, - originatingChannel: effectiveOriginatingChannel, - originatingTo: effectiveOriginatingTo, + originatingChannel: effectiveReplyRoute.channel, + originatingTo: effectiveReplyRoute.to, suppressDirectUserDelivery: suppressAcpChildUserDelivery, isRoutableChannel: routeReplyRuntime?.isRoutableChannel ?? (() => false), }); - const originatingTo = effectiveOriginatingTo; + const originatingTo = effectiveReplyRoute.to; const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface; const { createReplyMediaPathNormalizer } = await loadReplyMediaPathsRuntime(); const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({ @@ -368,7 +381,7 @@ export async function dispatchReplyFromConfig( sessionKey: acpDispatchSessionKey, workspaceDir: resolveAgentWorkspaceDir(cfg, sessionAgentId), messageProvider: ttsChannel, - accountId: routeAccountId, + accountId: effectiveReplyRoute.accountId, groupId, groupChannel: ctx.GroupChannel, groupSpace: ctx.GroupSpace, @@ -400,7 +413,7 @@ export async function dispatchReplyFromConfig( ctx.CommandSource === "native" ? (ctx.CommandTargetSessionKey ?? ctx.SessionKey) : ctx.SessionKey, - accountId: routeAccountId, + accountId: effectiveReplyRoute.accountId, requesterSenderId: ctx.SenderId, requesterSenderName: ctx.SenderName, requesterSenderUsername: ctx.SenderUsername, @@ -487,8 +500,9 @@ export async function dispatchReplyFromConfig( entry: sessionStoreEntry.entry, sessionKey: sessionStoreEntry.sessionKey ?? sessionKey, channel: + (shouldRouteToOriginating ? originatingChannel : undefined) ?? sessionStoreEntry.entry?.channel ?? - ctx.OriginatingChannel ?? + effectiveReplyRoute.channel ?? ctx.Surface ?? ctx.Provider ?? undefined,