From 8b8167d54751cf2761ea26d3538bf0cc6cf5bb64 Mon Sep 17 00:00:00 2001 From: Sid Date: Thu, 5 Mar 2026 11:31:33 +0800 Subject: [PATCH] fix(agents): bypass pendingDescendantRuns guard for cron announce delivery (#35185) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(agents): bypass pendingDescendantRuns guard for cron announce delivery Standalone cron job completions were blocked from direct channel delivery when the cron run had spawned subagents that were still registered as pending. The pendingDescendantRuns guard exists for live orchestration coordination and should not apply to fire-and-forget cron announce sends. Thread the announceType through the delivery chain and skip both the child-descendant and requester-descendant pending-run guards when the announce originates from a cron job. Closes #34966 * fix: ensure outbound session entry for cron announce with named agents (#32432) Named agents may not have a session entry for their delivery target, causing the announce flow to silently fail (delivered=false, no error). Two fixes: 1. Call ensureOutboundSessionEntry when resolving the cron announce session key so downstream delivery can find channel metadata. 2. Fall back to direct outbound delivery when announce delivery fails to ensure cron output reaches the target channel. Closes #32432 Co-Authored-By: Claude Opus 4.6 * fix: guard announce direct-delivery fallback against suppression leaks (#32432) The `!delivered` fallback condition was too broad — it caught intentional suppressions (active subagents, interim messages, SILENT_REPLY_TOKEN) in addition to actual announce delivery failures. Add an `announceDeliveryWasAttempted` flag so the direct-delivery fallback only fires when `runSubagentAnnounceFlow` was actually called and failed. Also remove the redundant `if (route)` guard in `resolveCronAnnounceSessionKey` since `resolved` being truthy guarantees `route` is non-null. Co-Authored-By: Claude Opus 4.6 * fix(cron): harden announce synthesis follow-ups --------- Co-authored-by: scoootscooob Co-authored-by: Claude Opus 4.6 Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com> --- CHANGELOG.md | 1 + .../subagent-announce.format.e2e.test.ts | 47 +++++++++ src/agents/subagent-announce.ts | 10 +- ...p-recipient-besteffortdeliver-true.test.ts | 22 ++--- .../delivery-dispatch.named-agent.test.ts | 99 +++++++++++++++++++ src/cron/isolated-agent/delivery-dispatch.ts | 59 ++++++++++- 6 files changed, 222 insertions(+), 16 deletions(-) create mode 100644 src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 72ecbbf0e83..b29cdaebcf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai - Security/dependency audit: patch transitive Hono vulnerabilities by pinning `hono` to `4.12.5` and `@hono/node-server` to `1.19.10` in production resolution paths. Thanks @shakkernerd. - Security/dependency audit: bump `tar` to `7.5.10` (from `7.5.9`) to address the high-severity hardlink path traversal advisory (`GHSA-qffp-2rhf-9h96`). Thanks @shakkernerd. +- Cron/announce delivery robustness: bypass pending-descendant announce guards for cron completion sends, ensure named-agent announce routes have outbound session entries, and fall back to direct delivery only when an announce send was actually attempted and failed. (from #35185, #32443, #34987) Thanks @Sid-Qin, @scoootscooob, and @bmendonca3. - Auto-reply/system events: restore runtime system events to the message timeline (`System:` lines), preserve think-hint parsing with prepended events, and carry events into deferred followup/collect/steer-backlog prompts to keep cache behavior stable without dropping queued metadata. (#34794) Thanks @anisoptera. - Security/audit account handling: avoid prototype-chain account IDs in audit validation by using own-property checks for `accounts`. (#34982) Thanks @HOYALIM. - Agents/session usage tracking: preserve accumulated usage metadata on embedded Pi runner error exits so failed turns still update session `totalTokens` from real usage instead of stale prior values. (#34275) thanks @RealKai42. diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index be1d287aa3c..1f1698c4722 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -469,6 +469,53 @@ describe("subagent announce formatting", () => { expect(agentSpy).not.toHaveBeenCalled(); }); + it("keeps cron completion direct delivery even when sibling runs are still active", async () => { + sessionStore = { + "agent:main:subagent:test": { + sessionId: "child-session-cron-direct", + }, + "agent:main:main": { + sessionId: "requester-session-cron-direct", + }, + }; + readLatestAssistantReplyMock.mockResolvedValue(""); + chatHistoryMock.mockResolvedValueOnce({ + messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: cron" }] }], + }); + subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:main" ? 1 : 0, + ); + subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:main" ? 1 : 0, + ); + subagentRegistryMock.countPendingDescendantRunsExcludingRun.mockImplementation( + (sessionKey: string, runId: string) => + sessionKey === "agent:main:main" && runId === "run-direct-cron-active-siblings" ? 1 : 0, + ); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-direct-cron-active-siblings", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" }, + announceType: "cron job", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + }); + + expect(didAnnounce).toBe(true); + expect(sendSpy).toHaveBeenCalledTimes(1); + expect(agentSpy).not.toHaveBeenCalled(); + const call = sendSpy.mock.calls[0]?.[0] as { params?: Record }; + const rawMessage = call?.params?.message; + const msg = typeof rawMessage === "string" ? rawMessage : ""; + expect(call?.params?.channel).toBe("discord"); + expect(call?.params?.to).toBe("channel:12345"); + expect(msg).toContain("final answer: cron"); + expect(msg).not.toContain("There are still 1 active subagent run for this session."); + }); + it("suppresses completion delivery when subagent reply is ANNOUNCE_SKIP", async () => { const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:test", diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index bbb618b3239..8b0c432db3b 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -736,6 +736,7 @@ async function sendSubagentAnnounceDirectly(params: { bestEffortDeliver?: boolean; completionRouteMode?: "bound" | "fallback" | "hook"; spawnMode?: SpawnSubagentMode; + announceType?: SubagentAnnounceType; directIdempotencyKey: string; currentRunId?: string; completionDirectOrigin?: DeliveryContext; @@ -778,8 +779,9 @@ async function sendSubagentAnnounceDirectly(params: { const forceBoundSessionDirectDelivery = params.spawnMode === "session" && (params.completionRouteMode === "bound" || params.completionRouteMode === "hook"); + const forceCronDirectDelivery = params.announceType === "cron job"; let shouldSendCompletionDirectly = true; - if (!forceBoundSessionDirectDelivery) { + if (!forceBoundSessionDirectDelivery && !forceCronDirectDelivery) { let pendingDescendantRuns = 0; try { const { countPendingDescendantRuns, countPendingDescendantRunsExcludingRun } = @@ -919,6 +921,7 @@ async function deliverSubagentAnnouncement(params: { bestEffortDeliver?: boolean; completionRouteMode?: "bound" | "fallback" | "hook"; spawnMode?: SpawnSubagentMode; + announceType?: SubagentAnnounceType; directIdempotencyKey: string; currentRunId?: string; signal?: AbortSignal; @@ -948,6 +951,7 @@ async function deliverSubagentAnnouncement(params: { completionDirectOrigin: params.completionDirectOrigin, completionRouteMode: params.completionRouteMode, spawnMode: params.spawnMode, + announceType: params.announceType, directOrigin: params.directOrigin, requesterIsSubagent: params.requesterIsSubagent, expectsCompletionMessage: params.expectsCompletionMessage, @@ -1233,7 +1237,8 @@ export async function runSubagentAnnounceFlow(params: { } catch { // Best-effort only; fall back to direct announce behavior when unavailable. } - if (pendingChildDescendantRuns > 0) { + const isCronAnnounce = params.announceType === "cron job"; + if (pendingChildDescendantRuns > 0 && !isCronAnnounce) { // The finished run still has pending descendant subagents (either active, // or ended but still finishing their own announce and cleanup flow). Defer // announcing this run until descendants fully settle. @@ -1406,6 +1411,7 @@ export async function runSubagentAnnounceFlow(params: { bestEffortDeliver: params.bestEffortDeliver, completionRouteMode: completionResolution.routeMode, spawnMode: params.spawnMode, + announceType, directIdempotencyKey, currentRunId: params.childRunId, signal: params.signal, diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index 06daf55bb45..a4522279c63 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -393,7 +393,7 @@ describe("runCronIsolatedAgentTurn", () => { }); }); - it("returns ok when announce delivery reports false and best-effort is disabled", async () => { + it("falls back to direct delivery when announce reports false and best-effort is disabled", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); const deps = createCliDeps(); @@ -412,13 +412,12 @@ describe("runCronIsolatedAgentTurn", () => { }, }); - // Announce delivery failure should not mark a successful agent execution - // as error. The execution succeeded; only delivery failed. + // When announce delivery fails, the direct-delivery fallback fires + // so the message still reaches the target channel. expect(res.status).toBe("ok"); - expect(res.delivered).toBe(false); + expect(res.delivered).toBe(true); expect(res.deliveryAttempted).toBe(true); - expect(res.error).toBe("cron announce delivery failed"); - expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); }); }); @@ -431,7 +430,7 @@ describe("runCronIsolatedAgentTurn", () => { expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); - it("returns ok when announce flow throws and best-effort is disabled", async () => { + it("falls back to direct delivery when announce flow throws and best-effort is disabled", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); const deps = createCliDeps(); @@ -452,13 +451,12 @@ describe("runCronIsolatedAgentTurn", () => { }, }); - // Even when announce throws (e.g. "pairing required"), the agent - // execution succeeded so the job status should be ok. + // When announce throws (e.g. "pairing required"), the direct-delivery + // fallback fires so the message still reaches the target channel. expect(res.status).toBe("ok"); - expect(res.delivered).toBe(false); + expect(res.delivered).toBe(true); expect(res.deliveryAttempted).toBe(true); - expect(res.error).toContain("pairing required"); - expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); }); }); diff --git a/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts b/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts new file mode 100644 index 00000000000..6de82039241 --- /dev/null +++ b/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts @@ -0,0 +1,99 @@ +import { describe, expect, it, vi } from "vitest"; +import { matchesMessagingToolDeliveryTarget } from "./delivery-dispatch.js"; + +// Mock the announce flow dependencies to test the fallback behavior. +vi.mock("../../agents/subagent-announce.js", () => ({ + runSubagentAnnounceFlow: vi.fn(), +})); +vi.mock("../../agents/subagent-registry.js", () => ({ + countActiveDescendantRuns: vi.fn().mockReturnValue(0), +})); + +describe("matchesMessagingToolDeliveryTarget", () => { + it("matches when channel and to agree", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "123456" }, + { channel: "telegram", to: "123456" }, + ), + ).toBe(true); + }); + + it("rejects when channel differs", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "whatsapp", to: "123456" }, + { channel: "telegram", to: "123456" }, + ), + ).toBe(false); + }); + + it("rejects when to is missing from delivery", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "123456" }, + { channel: "telegram", to: undefined }, + ), + ).toBe(false); + }); + + it("rejects when channel is missing from delivery", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "123456" }, + { channel: undefined, to: "123456" }, + ), + ).toBe(false); + }); + + it("strips :topic:NNN suffix from target.to before comparing", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "-1003597428309:topic:462" }, + { channel: "telegram", to: "-1003597428309" }, + ), + ).toBe(true); + }); + + it("matches when provider is 'message' (generic)", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "message", to: "123456" }, + { channel: "telegram", to: "123456" }, + ), + ).toBe(true); + }); + + it("rejects when accountIds differ", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "123456", accountId: "bot-a" }, + { channel: "telegram", to: "123456", accountId: "bot-b" }, + ), + ).toBe(false); + }); +}); + +describe("resolveCronDeliveryBestEffort", () => { + // Import dynamically to avoid top-level side effects + it("returns false by default (no bestEffort set)", async () => { + const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js"); + const job = { delivery: {}, payload: { kind: "agentTurn" } } as never; + expect(resolveCronDeliveryBestEffort(job)).toBe(false); + }); + + it("returns true when delivery.bestEffort is true", async () => { + const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js"); + const job = { delivery: { bestEffort: true }, payload: { kind: "agentTurn" } } as never; + expect(resolveCronDeliveryBestEffort(job)).toBe(true); + }); + + it("returns true when payload.bestEffortDeliver is true and no delivery.bestEffort", async () => { + const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js"); + const job = { + delivery: {}, + payload: { kind: "agentTurn", bestEffortDeliver: true }, + } as never; + expect(resolveCronDeliveryBestEffort(job)).toBe(true); + }); +}); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index 39ab40843c4..0fc301cc2b7 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -7,7 +7,10 @@ import type { OpenClawConfig } from "../../config/config.js"; import { resolveAgentMainSessionKey } from "../../config/sessions.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; -import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js"; +import { + ensureOutboundSessionEntry, + resolveOutboundSessionRoute, +} from "../../infra/outbound/outbound-session.js"; import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; import { logWarn } from "../../logger.js"; import type { CronJob, CronRunTelemetry } from "../types.js"; @@ -93,7 +96,20 @@ async function resolveCronAnnounceSessionKey(params: { threadId: params.delivery.threadId, }); const resolved = route?.sessionKey?.trim(); - if (resolved) { + if (route && resolved) { + // Ensure the session entry exists so downstream announce / queue delivery + // can look up channel metadata (lastChannel, to, sessionId). Named agents + // may not have a session entry for this target yet, causing announce + // delivery to silently fail (#32432). + await ensureOutboundSessionEntry({ + cfg: params.cfg, + agentId: params.agentId, + channel: params.delivery.channel, + accountId: params.delivery.accountId, + route, + }).catch(() => { + // Best-effort: don't block delivery on session entry creation. + }); return resolved; } } catch { @@ -156,6 +172,12 @@ export async function dispatchCronDelivery( // Keep this strict so timer fallback can safely decide whether to wake main. let delivered = params.skipMessagingToolDelivery; let deliveryAttempted = params.skipMessagingToolDelivery; + // Tracks whether `runSubagentAnnounceFlow` was actually called. Early + // returns from `deliverViaAnnounce` (active subagents, interim suppression, + // SILENT_REPLY_TOKEN) are intentional suppressions — not delivery failures — + // so the direct-delivery fallback must only fire when the announce send was + // actually attempted and failed. + let announceDeliveryWasAttempted = false; const failDeliveryTarget = (error: string) => params.withRunSession({ status: "error", @@ -313,6 +335,7 @@ export async function dispatchCronDelivery( }); } deliveryAttempted = true; + announceDeliveryWasAttempted = true; const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: params.agentSessionKey, childRunId: `${params.job.id}:${params.runSessionId}:${params.runStartedAt}`, @@ -443,6 +466,38 @@ export async function dispatchCronDelivery( } else { const announceResult = await deliverViaAnnounce(params.resolvedDelivery); if (announceResult) { + // Fall back to direct delivery only when the announce send was + // actually attempted and failed. Early returns from + // deliverViaAnnounce (active subagents, interim suppression, + // SILENT_REPLY_TOKEN) are intentional suppressions that must NOT + // trigger direct delivery — doing so would bypass the suppression + // guard and leak partial/stale content to the channel. (#32432) + if (announceDeliveryWasAttempted && !delivered && !params.isAborted()) { + const directFallback = await deliverViaDirect(params.resolvedDelivery); + if (directFallback) { + return { + result: directFallback, + delivered, + deliveryAttempted, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; + } + // If direct delivery succeeded (returned null without error), + // `delivered` has been set to true by deliverViaDirect. + if (delivered) { + return { + delivered, + deliveryAttempted, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; + } + } return { result: announceResult, delivered,