diff --git a/CHANGELOG.md b/CHANGELOG.md index 72ecbbf0e83..b29cdaebcf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai - Security/dependency audit: patch transitive Hono vulnerabilities by pinning `hono` to `4.12.5` and `@hono/node-server` to `1.19.10` in production resolution paths. Thanks @shakkernerd. - Security/dependency audit: bump `tar` to `7.5.10` (from `7.5.9`) to address the high-severity hardlink path traversal advisory (`GHSA-qffp-2rhf-9h96`). Thanks @shakkernerd. +- Cron/announce delivery robustness: bypass pending-descendant announce guards for cron completion sends, ensure named-agent announce routes have outbound session entries, and fall back to direct delivery only when an announce send was actually attempted and failed. (from #35185, #32443, #34987) Thanks @Sid-Qin, @scoootscooob, and @bmendonca3. - Auto-reply/system events: restore runtime system events to the message timeline (`System:` lines), preserve think-hint parsing with prepended events, and carry events into deferred followup/collect/steer-backlog prompts to keep cache behavior stable without dropping queued metadata. (#34794) Thanks @anisoptera. - Security/audit account handling: avoid prototype-chain account IDs in audit validation by using own-property checks for `accounts`. (#34982) Thanks @HOYALIM. - Agents/session usage tracking: preserve accumulated usage metadata on embedded Pi runner error exits so failed turns still update session `totalTokens` from real usage instead of stale prior values. (#34275) thanks @RealKai42. diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index be1d287aa3c..1f1698c4722 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -469,6 +469,53 @@ describe("subagent announce formatting", () => { expect(agentSpy).not.toHaveBeenCalled(); }); + it("keeps cron completion direct delivery even when sibling runs are still active", async () => { + sessionStore = { + "agent:main:subagent:test": { + sessionId: "child-session-cron-direct", + }, + "agent:main:main": { + sessionId: "requester-session-cron-direct", + }, + }; + readLatestAssistantReplyMock.mockResolvedValue(""); + chatHistoryMock.mockResolvedValueOnce({ + messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: cron" }] }], + }); + subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:main" ? 1 : 0, + ); + subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:main" ? 1 : 0, + ); + subagentRegistryMock.countPendingDescendantRunsExcludingRun.mockImplementation( + (sessionKey: string, runId: string) => + sessionKey === "agent:main:main" && runId === "run-direct-cron-active-siblings" ? 1 : 0, + ); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-direct-cron-active-siblings", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" }, + announceType: "cron job", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + }); + + expect(didAnnounce).toBe(true); + expect(sendSpy).toHaveBeenCalledTimes(1); + expect(agentSpy).not.toHaveBeenCalled(); + const call = sendSpy.mock.calls[0]?.[0] as { params?: Record }; + const rawMessage = call?.params?.message; + const msg = typeof rawMessage === "string" ? rawMessage : ""; + expect(call?.params?.channel).toBe("discord"); + expect(call?.params?.to).toBe("channel:12345"); + expect(msg).toContain("final answer: cron"); + expect(msg).not.toContain("There are still 1 active subagent run for this session."); + }); + it("suppresses completion delivery when subagent reply is ANNOUNCE_SKIP", async () => { const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:test", diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index bbb618b3239..8b0c432db3b 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -736,6 +736,7 @@ async function sendSubagentAnnounceDirectly(params: { bestEffortDeliver?: boolean; completionRouteMode?: "bound" | "fallback" | "hook"; spawnMode?: SpawnSubagentMode; + announceType?: SubagentAnnounceType; directIdempotencyKey: string; currentRunId?: string; completionDirectOrigin?: DeliveryContext; @@ -778,8 +779,9 @@ async function sendSubagentAnnounceDirectly(params: { const forceBoundSessionDirectDelivery = params.spawnMode === "session" && (params.completionRouteMode === "bound" || params.completionRouteMode === "hook"); + const forceCronDirectDelivery = params.announceType === "cron job"; let shouldSendCompletionDirectly = true; - if (!forceBoundSessionDirectDelivery) { + if (!forceBoundSessionDirectDelivery && !forceCronDirectDelivery) { let pendingDescendantRuns = 0; try { const { countPendingDescendantRuns, countPendingDescendantRunsExcludingRun } = @@ -919,6 +921,7 @@ async function deliverSubagentAnnouncement(params: { bestEffortDeliver?: boolean; completionRouteMode?: "bound" | "fallback" | "hook"; spawnMode?: SpawnSubagentMode; + announceType?: SubagentAnnounceType; directIdempotencyKey: string; currentRunId?: string; signal?: AbortSignal; @@ -948,6 +951,7 @@ async function deliverSubagentAnnouncement(params: { completionDirectOrigin: params.completionDirectOrigin, completionRouteMode: params.completionRouteMode, spawnMode: params.spawnMode, + announceType: params.announceType, directOrigin: params.directOrigin, requesterIsSubagent: params.requesterIsSubagent, expectsCompletionMessage: params.expectsCompletionMessage, @@ -1233,7 +1237,8 @@ export async function runSubagentAnnounceFlow(params: { } catch { // Best-effort only; fall back to direct announce behavior when unavailable. } - if (pendingChildDescendantRuns > 0) { + const isCronAnnounce = params.announceType === "cron job"; + if (pendingChildDescendantRuns > 0 && !isCronAnnounce) { // The finished run still has pending descendant subagents (either active, // or ended but still finishing their own announce and cleanup flow). Defer // announcing this run until descendants fully settle. @@ -1406,6 +1411,7 @@ export async function runSubagentAnnounceFlow(params: { bestEffortDeliver: params.bestEffortDeliver, completionRouteMode: completionResolution.routeMode, spawnMode: params.spawnMode, + announceType, directIdempotencyKey, currentRunId: params.childRunId, signal: params.signal, diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index 06daf55bb45..a4522279c63 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -393,7 +393,7 @@ describe("runCronIsolatedAgentTurn", () => { }); }); - it("returns ok when announce delivery reports false and best-effort is disabled", async () => { + it("falls back to direct delivery when announce reports false and best-effort is disabled", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); const deps = createCliDeps(); @@ -412,13 +412,12 @@ describe("runCronIsolatedAgentTurn", () => { }, }); - // Announce delivery failure should not mark a successful agent execution - // as error. The execution succeeded; only delivery failed. + // When announce delivery fails, the direct-delivery fallback fires + // so the message still reaches the target channel. expect(res.status).toBe("ok"); - expect(res.delivered).toBe(false); + expect(res.delivered).toBe(true); expect(res.deliveryAttempted).toBe(true); - expect(res.error).toBe("cron announce delivery failed"); - expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); }); }); @@ -431,7 +430,7 @@ describe("runCronIsolatedAgentTurn", () => { expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); - it("returns ok when announce flow throws and best-effort is disabled", async () => { + it("falls back to direct delivery when announce flow throws and best-effort is disabled", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); const deps = createCliDeps(); @@ -452,13 +451,12 @@ describe("runCronIsolatedAgentTurn", () => { }, }); - // Even when announce throws (e.g. "pairing required"), the agent - // execution succeeded so the job status should be ok. + // When announce throws (e.g. "pairing required"), the direct-delivery + // fallback fires so the message still reaches the target channel. expect(res.status).toBe("ok"); - expect(res.delivered).toBe(false); + expect(res.delivered).toBe(true); expect(res.deliveryAttempted).toBe(true); - expect(res.error).toContain("pairing required"); - expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); }); }); diff --git a/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts b/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts new file mode 100644 index 00000000000..6de82039241 --- /dev/null +++ b/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts @@ -0,0 +1,99 @@ +import { describe, expect, it, vi } from "vitest"; +import { matchesMessagingToolDeliveryTarget } from "./delivery-dispatch.js"; + +// Mock the announce flow dependencies to test the fallback behavior. +vi.mock("../../agents/subagent-announce.js", () => ({ + runSubagentAnnounceFlow: vi.fn(), +})); +vi.mock("../../agents/subagent-registry.js", () => ({ + countActiveDescendantRuns: vi.fn().mockReturnValue(0), +})); + +describe("matchesMessagingToolDeliveryTarget", () => { + it("matches when channel and to agree", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "123456" }, + { channel: "telegram", to: "123456" }, + ), + ).toBe(true); + }); + + it("rejects when channel differs", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "whatsapp", to: "123456" }, + { channel: "telegram", to: "123456" }, + ), + ).toBe(false); + }); + + it("rejects when to is missing from delivery", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "123456" }, + { channel: "telegram", to: undefined }, + ), + ).toBe(false); + }); + + it("rejects when channel is missing from delivery", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "123456" }, + { channel: undefined, to: "123456" }, + ), + ).toBe(false); + }); + + it("strips :topic:NNN suffix from target.to before comparing", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "-1003597428309:topic:462" }, + { channel: "telegram", to: "-1003597428309" }, + ), + ).toBe(true); + }); + + it("matches when provider is 'message' (generic)", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "message", to: "123456" }, + { channel: "telegram", to: "123456" }, + ), + ).toBe(true); + }); + + it("rejects when accountIds differ", () => { + expect( + matchesMessagingToolDeliveryTarget( + { provider: "telegram", to: "123456", accountId: "bot-a" }, + { channel: "telegram", to: "123456", accountId: "bot-b" }, + ), + ).toBe(false); + }); +}); + +describe("resolveCronDeliveryBestEffort", () => { + // Import dynamically to avoid top-level side effects + it("returns false by default (no bestEffort set)", async () => { + const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js"); + const job = { delivery: {}, payload: { kind: "agentTurn" } } as never; + expect(resolveCronDeliveryBestEffort(job)).toBe(false); + }); + + it("returns true when delivery.bestEffort is true", async () => { + const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js"); + const job = { delivery: { bestEffort: true }, payload: { kind: "agentTurn" } } as never; + expect(resolveCronDeliveryBestEffort(job)).toBe(true); + }); + + it("returns true when payload.bestEffortDeliver is true and no delivery.bestEffort", async () => { + const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js"); + const job = { + delivery: {}, + payload: { kind: "agentTurn", bestEffortDeliver: true }, + } as never; + expect(resolveCronDeliveryBestEffort(job)).toBe(true); + }); +}); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index 39ab40843c4..0fc301cc2b7 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -7,7 +7,10 @@ import type { OpenClawConfig } from "../../config/config.js"; import { resolveAgentMainSessionKey } from "../../config/sessions.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; -import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js"; +import { + ensureOutboundSessionEntry, + resolveOutboundSessionRoute, +} from "../../infra/outbound/outbound-session.js"; import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; import { logWarn } from "../../logger.js"; import type { CronJob, CronRunTelemetry } from "../types.js"; @@ -93,7 +96,20 @@ async function resolveCronAnnounceSessionKey(params: { threadId: params.delivery.threadId, }); const resolved = route?.sessionKey?.trim(); - if (resolved) { + if (route && resolved) { + // Ensure the session entry exists so downstream announce / queue delivery + // can look up channel metadata (lastChannel, to, sessionId). Named agents + // may not have a session entry for this target yet, causing announce + // delivery to silently fail (#32432). + await ensureOutboundSessionEntry({ + cfg: params.cfg, + agentId: params.agentId, + channel: params.delivery.channel, + accountId: params.delivery.accountId, + route, + }).catch(() => { + // Best-effort: don't block delivery on session entry creation. + }); return resolved; } } catch { @@ -156,6 +172,12 @@ export async function dispatchCronDelivery( // Keep this strict so timer fallback can safely decide whether to wake main. let delivered = params.skipMessagingToolDelivery; let deliveryAttempted = params.skipMessagingToolDelivery; + // Tracks whether `runSubagentAnnounceFlow` was actually called. Early + // returns from `deliverViaAnnounce` (active subagents, interim suppression, + // SILENT_REPLY_TOKEN) are intentional suppressions — not delivery failures — + // so the direct-delivery fallback must only fire when the announce send was + // actually attempted and failed. + let announceDeliveryWasAttempted = false; const failDeliveryTarget = (error: string) => params.withRunSession({ status: "error", @@ -313,6 +335,7 @@ export async function dispatchCronDelivery( }); } deliveryAttempted = true; + announceDeliveryWasAttempted = true; const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: params.agentSessionKey, childRunId: `${params.job.id}:${params.runSessionId}:${params.runStartedAt}`, @@ -443,6 +466,38 @@ export async function dispatchCronDelivery( } else { const announceResult = await deliverViaAnnounce(params.resolvedDelivery); if (announceResult) { + // Fall back to direct delivery only when the announce send was + // actually attempted and failed. Early returns from + // deliverViaAnnounce (active subagents, interim suppression, + // SILENT_REPLY_TOKEN) are intentional suppressions that must NOT + // trigger direct delivery — doing so would bypass the suppression + // guard and leak partial/stale content to the channel. (#32432) + if (announceDeliveryWasAttempted && !delivered && !params.isAborted()) { + const directFallback = await deliverViaDirect(params.resolvedDelivery); + if (directFallback) { + return { + result: directFallback, + delivered, + deliveryAttempted, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; + } + // If direct delivery succeeded (returned null without error), + // `delivered` has been set to true by deliverViaDirect. + if (delivered) { + return { + delivered, + deliveryAttempted, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; + } + } return { result: announceResult, delivered,