diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fa7c3b34af..3b517c21ba4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -463,6 +463,7 @@ Docs: https://docs.openclaw.ai - Plugins/ClawHub: fall back to version metadata when the artifact resolver route is missing and keep the Docker ClawHub fixture aligned with npm-pack artifact resolution, avoiding false version-not-found failures during plugin install validation. Thanks @vincentkoc. - Providers/openai-codex: honor `providerConfig.baseUrl` in the dynamic-model synthesis fallback so codex providers configured with a custom upstream (for example a forwarding proxy) no longer silently bypass the configured URL when the registry has no template row to clone for the requested model id. (#76428) Thanks @arniesaha. - Status/channels: show configured channels in `openclaw status` and config-only `openclaw channels status` output even when the Gateway is unreachable, avoiding empty Channels tables on WSL and other no-Gateway paths. Thanks @vincentkoc. +- Agents/main-session: keep pending final delivery markers until the final reply is actually routed or queued, so restart and heartbeat recovery can retry failed delivery. Refs #65037. - Plugins/ClawHub: explain unavailable explicit ClawHub ClawPack artifact downloads with a temporary npm install hint while ClawHub artifact routing rolls out. Thanks @vincentkoc. - Media: accept home-relative `MEDIA:~/...` attachment paths while preserving existing file-read policy, traversal checks, and media type validation. Fixes #73796. Thanks @fabkury. - Onboarding/search: install official external web-search plugins such as Brave before saving provider config, and make doctor repair reconcile selected external search providers whose npm payload is missing. Thanks @vincentkoc. 
diff --git a/src/agents/agent-command.ts b/src/agents/agent-command.ts index dfde018cd13..e47193dedc6 100644 --- a/src/agents/agent-command.ts +++ b/src/agents/agent-command.ts @@ -17,8 +17,11 @@ import { import { formatErrorMessage } from "../infra/errors.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { normalizeAgentId } from "../routing/session-key.js"; -import { resolveAgentIdFromSessionKey } from "../routing/session-key.js"; +import { + isSubagentSessionKey, + normalizeAgentId, + resolveAgentIdFromSessionKey, +} from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { applyVerboseOverride } from "../sessions/level-overrides.js"; import { applyModelOverrideToSessionEntry } from "../sessions/model-overrides.js"; @@ -1246,8 +1249,43 @@ async function agentCommandInternal( } const payloads = result.payloads ?? []; + + // Phase 2: Persist pending final delivery for main sessions before attempting delivery. + // This ensures that if the process restarts during delivery, the payload is durable. + if ( + opts.deliver === true && + sessionStore && + sessionKey && + payloads.length > 0 && + !isSubagentSessionKey(sessionKey) + ) { + const now = Date.now(); + const combinedPayload = payloads + .map((p) => (typeof p.text === "string" ? p.text : "")) + .filter(Boolean) + .join("\n\n"); + + if (combinedPayload) { + const entry = sessionStore[sessionKey] ?? 
sessionEntry; + const next: SessionEntry = { + ...entry, + pendingFinalDelivery: true, + pendingFinalDeliveryText: combinedPayload, + pendingFinalDeliveryCreatedAt: now, + updatedAt: now, + }; + await persistSessionEntry({ + sessionStore, + sessionKey, + storePath, + entry: next, + }); + sessionEntry = next; + } + } + const { deliverAgentCommandResult } = await loadDeliveryRuntime(); - return await deliverAgentCommandResult({ + const deliveryResult = await deliverAgentCommandResult({ cfg, deps: resolvedDeps, runtime, @@ -1257,6 +1295,32 @@ async function agentCommandInternal( result, payloads, }); + + // Phase 2: Clear pending delivery payload after successful delivery. + if ( + deliveryResult?.deliverySucceeded === true && + sessionStore && + sessionKey && + !isSubagentSessionKey(sessionKey) + ) { + const entry = sessionStore[sessionKey] ?? sessionEntry; + const next: SessionEntry = { + ...entry, + pendingFinalDelivery: undefined, + pendingFinalDeliveryText: undefined, + pendingFinalDeliveryCreatedAt: undefined, + updatedAt: Date.now(), + }; + await persistSessionEntry({ + sessionStore, + sessionKey, + storePath, + entry: next, + }); + sessionEntry = next; + } + + return deliveryResult; } finally { clearAgentRunContext(runId); } diff --git a/src/agents/command/delivery.test.ts b/src/agents/command/delivery.test.ts index 97eeaf7415b..2e82a135ead 100644 --- a/src/agents/command/delivery.test.ts +++ b/src/agents/command/delivery.test.ts @@ -215,6 +215,52 @@ describe("normalizeAgentCommandReplyPayloads", () => { }); }); + it("reports successful requested delivery", async () => { + deliverOutboundPayloadsMock.mockResolvedValue([]); + + const delivered = await deliverMediaReplyForTest({ + key: "agent:tester:slack:direct:alice", + agentId: "tester", + } as never); + + expect(delivered.deliverySucceeded).toBe(true); + }); + + it("does not report success when best-effort delivery records an error", async () => { + deliverOutboundPayloadsMock.mockImplementationOnce(async 
(params: unknown) => { + (params as { onError?: (err: unknown) => void }).onError?.(new Error("send failed")); + return []; + }); + + const runtime = { log: vi.fn(), error: vi.fn() }; + const delivered = await deliverAgentCommandResult({ + cfg: { + agents: { + list: [{ id: "tester", workspace: "/tmp/agent-workspace" }], + }, + } as OpenClawConfig, + deps: {} as CliDeps, + runtime: runtime as never, + opts: { + message: "go", + deliver: true, + bestEffortDeliver: true, + replyChannel: "slack", + replyTo: "#general", + } as AgentCommandOpts, + outboundSession: { + key: "agent:tester:slack:direct:alice", + agentId: "tester", + } as never, + sessionEntry: undefined, + payloads: [{ text: "here you go" }], + result: createResult(), + }); + + expect(delivered.deliverySucceeded).toBe(false); + expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("send failed")); + }); + it("threads agentId into the normalizer when sessionKey is unresolved", async () => { createReplyMediaPathNormalizerMock.mockReturnValue(async (payload: ReplyPayload) => payload); deliverOutboundPayloadsMock.mockResolvedValue([]); diff --git a/src/agents/command/delivery.ts b/src/agents/command/delivery.ts index 9a1f3c06524..2cad2a18179 100644 --- a/src/agents/command/delivery.ts +++ b/src/agents/command/delivery.ts @@ -354,6 +354,8 @@ export async function deliverAgentCommandResult(params: { } const deliveryPayloads = projectOutboundPayloadPlanForOutbound(outboundPayloadPlan); + let deliverySucceeded = false; + let deliveryHadError = false; const logPayload = (payload: NormalizedOutboundPayload) => { if (opts.json) { return; @@ -368,6 +370,10 @@ export async function deliverAgentCommandResult(params: { } runtime.log(output); }; + const markDeliveryError = (err: unknown) => { + deliveryHadError = true; + logDeliveryError(err); + }; if (!deliver) { for (const payload of deliveryPayloads) { logPayload(payload); @@ -385,12 +391,13 @@ export async function deliverAgentCommandResult(params: { 
replyToId: resolvedReplyToId ?? null, threadId: resolvedThreadTarget ?? null, bestEffort: bestEffortDeliver, - onError: (err) => logDeliveryError(err), + onError: markDeliveryError, onPayload: logPayload, deps: createOutboundSendDeps(deps), }); + deliverySucceeded = !deliveryHadError; } } - return { payloads: normalizedPayloads, meta: resultMeta }; + return { payloads: normalizedPayloads, meta: resultMeta, deliverySucceeded }; } diff --git a/src/agents/main-session-restart-recovery.test.ts b/src/agents/main-session-restart-recovery.test.ts index 0bf84c978ba..2aaba3004cd 100644 --- a/src/agents/main-session-restart-recovery.test.ts +++ b/src/agents/main-session-restart-recovery.test.ts @@ -278,6 +278,43 @@ describe("main-session-restart-recovery", () => { expect(store["agent:main:main"]?.abortedLastRun).toBe(true); }); + it("resumes marked sessions with a durable pending final delivery payload (Phase 2)", async () => { + const sessionsDir = await makeSessionsDir(); + const pendingPayload = "The final answer is 42."; + await writeStore(sessionsDir, { + "agent:main:main": { + sessionId: "main-session", + updatedAt: Date.now() - 10_000, + status: "running", + abortedLastRun: true, + pendingFinalDelivery: true, + pendingFinalDeliveryText: pendingPayload, + pendingFinalDeliveryCreatedAt: Date.now() - 5_000, + }, + }); + await writeTranscript(sessionsDir, "main-session", [ + { role: "user", content: "calculate the answer" }, + { role: "assistant", content: [{ type: "toolCall", id: "call-1", name: "calc" }] }, + { role: "toolResult", content: "42" }, + ]); + + const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir }); + + expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 }); + expect(callGateway).toHaveBeenCalledOnce(); + const callParams = vi.mocked(callGateway).mock.calls[0]?.[0].params as { message?: string }; + expect(callParams.message).toContain(pendingPayload); + + const store = loadSessionStore(path.join(sessionsDir, "sessions.json")); 
+ expect(store["agent:main:main"]?.abortedLastRun).toBe(false); + expect(store["agent:main:main"]?.pendingFinalDelivery).toBe(true); + expect(store["agent:main:main"]?.pendingFinalDeliveryText).toBe(pendingPayload); + expect(store["agent:main:main"]?.pendingFinalDeliveryCreatedAt).toBeDefined(); + expect(store["agent:main:main"]?.pendingFinalDeliveryAttemptCount).toBe(1); + expect(store["agent:main:main"]?.pendingFinalDeliveryLastAttemptAt).toBeDefined(); + expect(store["agent:main:main"]?.pendingFinalDeliveryLastError).toBeNull(); + }); + it("does not scan ordinary running sessions without the restart-aborted marker", async () => { const sessionsDir = await makeSessionsDir(); await writeStore(sessionsDir, { diff --git a/src/agents/main-session-restart-recovery.ts b/src/agents/main-session-restart-recovery.ts index 585a931a286..68d0607511a 100644 --- a/src/agents/main-session-restart-recovery.ts +++ b/src/agents/main-session-restart-recovery.ts @@ -116,12 +116,15 @@ function resolveMainSessionResumeBlockReason(messages: unknown[]): string | null return null; } -function buildResumeMessage(): string { - return ( +function buildResumeMessage(pendingFinalDeliveryText?: string | null): string { + const base = "[System] Your previous turn was interrupted by a gateway restart while " + "OpenClaw was waiting on tool/model work. Continue from the existing " + - "transcript and finish the interrupted response." 
- ); + "transcript and finish the interrupted response."; + if (pendingFinalDeliveryText) { + return `${base}\n\nNote: The interrupted final reply was captured: "${pendingFinalDeliveryText}"`; + } + return base; } async function markSessionFailed(params: { @@ -140,6 +143,13 @@ async function markSessionFailed(params: { entry.abortedLastRun = true; entry.endedAt = Date.now(); entry.updatedAt = entry.endedAt; + entry.pendingFinalDelivery = undefined; + entry.pendingFinalDeliveryText = undefined; + entry.pendingFinalDeliveryCreatedAt = undefined; + entry.pendingFinalDeliveryLastAttemptAt = undefined; + entry.pendingFinalDeliveryAttemptCount = undefined; + entry.pendingFinalDeliveryLastError = undefined; + entry.pendingFinalDeliveryContext = undefined; store[params.sessionKey] = entry; }, { skipMaintenance: true }, @@ -150,12 +160,13 @@ async function markSessionFailed(params: { async function resumeMainSession(params: { storePath: string; sessionKey: string; + pendingFinalDeliveryText?: string | null; }): Promise<boolean> { try { await callGateway<{ runId: string }>({ method: "agent", params: { - message: buildResumeMessage(), + message: buildResumeMessage(params.pendingFinalDeliveryText), sessionKey: params.sessionKey, idempotencyKey: crypto.randomUUID(), deliver: false, @@ -170,13 +181,24 @@ async function resumeMainSession(params: { if (!entry) { return; } + const now = Date.now(); entry.abortedLastRun = false; - entry.updatedAt = Date.now(); + entry.updatedAt = now; + if (entry.pendingFinalDelivery || entry.pendingFinalDeliveryText) { + entry.pendingFinalDeliveryLastAttemptAt = now; + entry.pendingFinalDeliveryAttemptCount = + (entry.pendingFinalDeliveryAttemptCount ?? 0) + 1; + entry.pendingFinalDeliveryLastError = null; + } store[params.sessionKey] = entry; }, { skipMaintenance: true }, ); - log.info(`resumed interrupted main session: ${params.sessionKey}`); + log.info( + `resumed interrupted main session: ${params.sessionKey}${ + params.pendingFinalDeliveryText ? 
" (with pending payload)" : "" + }`, + ); return true; } catch (err) { log.warn(`failed to resume interrupted main session ${params.sessionKey}: ${String(err)}`); @@ -290,6 +312,7 @@ async function recoverStore(params: { const resumed = await resumeMainSession({ storePath: params.storePath, sessionKey, + pendingFinalDeliveryText: entry.pendingFinalDeliveryText, }); if (resumed) { params.resumedSessionKeys.add(sessionKey); diff --git a/src/agents/subagent-registry-lifecycle.ts b/src/agents/subagent-registry-lifecycle.ts index 30753bdcc5e..e629366fa83 100644 --- a/src/agents/subagent-registry-lifecycle.ts +++ b/src/agents/subagent-registry-lifecycle.ts @@ -34,7 +34,7 @@ import { resolveAnnounceRetryDelayMs, safeRemoveAttachmentsDir, } from "./subagent-registry-helpers.js"; -import type { SubagentRunRecord } from "./subagent-registry.types.js"; +import type { PendingFinalDeliveryPayload, SubagentRunRecord } from "./subagent-registry.types.js"; import { deleteSubagentSessionForCleanup } from "./subagent-session-cleanup.js"; type CaptureSubagentCompletionReply = @@ -315,11 +315,64 @@ export function createSubagentRegistryLifecycleController(params: { } }; + const clearPendingFinalDelivery = (entry: SubagentRunRecord) => { + entry.pendingFinalDelivery = undefined; + entry.pendingFinalDeliveryCreatedAt = undefined; + entry.pendingFinalDeliveryLastAttemptAt = undefined; + entry.pendingFinalDeliveryAttemptCount = undefined; + entry.pendingFinalDeliveryLastError = undefined; + entry.pendingFinalDeliveryPayload = undefined; + }; + + const loadPendingFinalDeliveryPayload = ( + entry: SubagentRunRecord, + ): PendingFinalDeliveryPayload => { + return { + requesterSessionKey: + entry.pendingFinalDeliveryPayload?.requesterSessionKey ?? entry.requesterSessionKey, + requesterOrigin: entry.pendingFinalDeliveryPayload?.requesterOrigin ?? entry.requesterOrigin, + requesterDisplayKey: + entry.pendingFinalDeliveryPayload?.requesterDisplayKey ?? 
entry.requesterDisplayKey, + childSessionKey: entry.pendingFinalDeliveryPayload?.childSessionKey ?? entry.childSessionKey, + childRunId: entry.pendingFinalDeliveryPayload?.childRunId ?? entry.runId, + task: entry.pendingFinalDeliveryPayload?.task ?? entry.task, + label: entry.pendingFinalDeliveryPayload?.label ?? entry.label, + startedAt: entry.pendingFinalDeliveryPayload?.startedAt ?? entry.startedAt, + endedAt: entry.pendingFinalDeliveryPayload?.endedAt ?? entry.endedAt, + outcome: entry.pendingFinalDeliveryPayload?.outcome ?? entry.outcome, + expectsCompletionMessage: + entry.pendingFinalDeliveryPayload?.expectsCompletionMessage ?? + entry.expectsCompletionMessage, + spawnMode: entry.pendingFinalDeliveryPayload?.spawnMode ?? entry.spawnMode, + frozenResultText: + entry.pendingFinalDeliveryPayload?.frozenResultText ?? entry.frozenResultText, + fallbackFrozenResultText: + entry.pendingFinalDeliveryPayload?.fallbackFrozenResultText ?? + entry.fallbackFrozenResultText, + wakeOnDescendantSettle: + entry.pendingFinalDeliveryPayload?.wakeOnDescendantSettle ?? entry.wakeOnDescendantSettle, + }; + }; + + const markPendingFinalDelivery = (args: { entry: SubagentRunRecord; error?: string }) => { + const now = Date.now(); + const payload: PendingFinalDeliveryPayload = loadPendingFinalDeliveryPayload(args.entry); + + args.entry.pendingFinalDelivery = true; + args.entry.pendingFinalDeliveryCreatedAt ??= now; + args.entry.pendingFinalDeliveryLastAttemptAt = now; + args.entry.pendingFinalDeliveryAttemptCount = + (args.entry.pendingFinalDeliveryAttemptCount ?? 0) + 1; + args.entry.pendingFinalDeliveryLastError = args.error ?? 
null; + args.entry.pendingFinalDeliveryPayload = payload; + }; + const finalizeResumedAnnounceGiveUp = async (giveUpParams: { runId: string; entry: SubagentRunRecord; reason: "retry-limit" | "expiry"; }) => { + clearPendingFinalDelivery(giveUpParams.entry); safeSetSubagentTaskDeliveryStatus({ runId: giveUpParams.runId, childSessionKey: giveUpParams.entry.childSessionKey, @@ -486,6 +539,7 @@ export function createSubagentRegistryLifecycleController(params: { entry.completionAnnouncedAt = Date.now(); params.persist(); } + clearPendingFinalDelivery(entry); if (!options?.skipDeliveryStatus) { safeSetSubagentTaskDeliveryStatus({ runId, @@ -544,6 +598,7 @@ export function createSubagentRegistryLifecycleController(params: { } if (deferredDecision.kind === "give-up") { + clearPendingFinalDelivery(entry); safeSetSubagentTaskDeliveryStatus({ runId, childSessionKey: entry.childSessionKey, @@ -571,6 +626,10 @@ export function createSubagentRegistryLifecycleController(params: { return; } + markPendingFinalDelivery({ + entry, + error: didAnnounce ? 
undefined : "announce deferred or direct delivery failed", + }); entry.cleanupHandled = false; params.resumedRuns.delete(runId); params.persist(); @@ -631,7 +690,8 @@ export function createSubagentRegistryLifecycleController(params: { }); return true; } - const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin); + const pendingPayload = loadPendingFinalDeliveryPayload(entry); + const requesterOrigin = normalizeDeliveryContext(pendingPayload.requesterOrigin); let latestDeliveryError = entry.lastAnnounceDeliveryError; const finalizeAnnounceCleanup = (didAnnounce: boolean) => { if (!didAnnounce && latestDeliveryError) { @@ -650,24 +710,24 @@ export function createSubagentRegistryLifecycleController(params: { void params .runSubagentAnnounceFlow({ - childSessionKey: entry.childSessionKey, - childRunId: entry.runId, - requesterSessionKey: entry.requesterSessionKey, + childSessionKey: pendingPayload.childSessionKey, + childRunId: pendingPayload.childRunId, + requesterSessionKey: pendingPayload.requesterSessionKey, requesterOrigin, - requesterDisplayKey: entry.requesterDisplayKey, - task: entry.task, + requesterDisplayKey: pendingPayload.requesterDisplayKey, + task: pendingPayload.task, timeoutMs: params.subagentAnnounceTimeoutMs, cleanup: entry.cleanup, - roundOneReply: entry.frozenResultText ?? undefined, - fallbackReply: entry.fallbackFrozenResultText ?? undefined, + roundOneReply: pendingPayload.frozenResultText ?? undefined, + fallbackReply: pendingPayload.fallbackFrozenResultText ?? 
undefined, waitForCompletion: false, - startedAt: entry.startedAt, - endedAt: entry.endedAt, - label: entry.label, - outcome: entry.outcome, - spawnMode: entry.spawnMode, - expectsCompletionMessage: entry.expectsCompletionMessage, - wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true, + startedAt: pendingPayload.startedAt, + endedAt: pendingPayload.endedAt, + label: pendingPayload.label, + outcome: pendingPayload.outcome, + spawnMode: pendingPayload.spawnMode, + expectsCompletionMessage: pendingPayload.expectsCompletionMessage, + wakeOnDescendantSettle: pendingPayload.wakeOnDescendantSettle === true, onDeliveryResult: (delivery) => { if (delivery.delivered) { if (entry.lastAnnounceDeliveryError !== undefined) { diff --git a/src/agents/subagent-registry.types.ts b/src/agents/subagent-registry.types.ts index 19577062ec1..f8640a9db22 100644 --- a/src/agents/subagent-registry.types.ts +++ b/src/agents/subagent-registry.types.ts @@ -3,6 +3,24 @@ import type { SubagentRunOutcome } from "./subagent-announce-output.js"; import type { SubagentLifecycleEndedReason } from "./subagent-lifecycle-events.js"; import type { SpawnSubagentMode } from "./subagent-spawn.types.js"; +export type PendingFinalDeliveryPayload = { + requesterSessionKey: string; + requesterOrigin?: DeliveryContext; + requesterDisplayKey: string; + childSessionKey: string; + childRunId: string; + task: string; + label?: string; + startedAt?: number; + endedAt?: number; + outcome?: SubagentRunOutcome; + expectsCompletionMessage?: boolean; + spawnMode?: SpawnSubagentMode; + frozenResultText?: string | null; + fallbackFrozenResultText?: string | null; + wakeOnDescendantSettle?: boolean; +}; + export type SubagentRunRecord = { runId: string; childSessionKey: string; @@ -39,7 +57,15 @@ export type SubagentRunRecord = { frozenResultCapturedAt?: number; fallbackFrozenResultText?: string | null; fallbackFrozenResultCapturedAt?: number; + /** Set after the subagent_ended hook has been emitted successfully 
once. */ endedHookEmittedAt?: number; + /** Durable marker that final user delivery still needs a retry/resume pass. */ + pendingFinalDelivery?: boolean; + pendingFinalDeliveryCreatedAt?: number; + pendingFinalDeliveryLastAttemptAt?: number; + pendingFinalDeliveryAttemptCount?: number; + pendingFinalDeliveryLastError?: string | null; + pendingFinalDeliveryPayload?: PendingFinalDeliveryPayload; completionAnnouncedAt?: number; attachmentsDir?: string; attachmentsRootDir?: string; diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts index 9aa9d6a56c8..78ee122be98 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts @@ -1,3 +1,6 @@ +import { mkdtemp, readFile, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; @@ -112,6 +115,7 @@ function createMinimalRun(params?: { isRunActive?: () => boolean; shouldFollowup?: boolean; resolvedQueueMode?: string; + sessionCtx?: Partial<TemplateContext>; runOverrides?: Partial; }) { const typing = createMockTypingController(); @@ -119,6 +123,7 @@ function createMinimalRun(params?: { const sessionCtx = { Provider: "whatsapp", MessageSid: "msg", + ...params?.sessionCtx, } as unknown as TemplateContext; const resolvedQueue = { mode: params?.resolvedQueueMode ?? 
"interrupt", @@ -277,6 +282,100 @@ describe("runReplyAgent heartbeat followup guard", () => { }); }); +describe("runReplyAgent pending final delivery capture", () => { + async function createSessionStoreFile(entry: SessionEntry) { + const dir = await mkdtemp(join(tmpdir(), "openclaw-agent-runner-pending-")); + const storePath = join(dir, "sessions.json"); + await writeFile(storePath, JSON.stringify({ main: entry }), "utf8"); + return storePath; + } + + async function readStoredMainSession(storePath: string): Promise<SessionEntry> { + const raw = await readFile(storePath, "utf8"); + return JSON.parse(raw).main as SessionEntry; + } + + it("does not persist message-tool-only final replies for heartbeat replay", async () => { + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore = { main: sessionEntry }; + const storePath = await createSessionStoreFile(sessionEntry); + state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "private final" }], + meta: {}, + }); + + const { run } = createMinimalRun({ + opts: { sourceReplyDeliveryMode: "message_tool_only" }, + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + + await run(); + + const stored = await readStoredMainSession(storePath); + expect(stored.pendingFinalDelivery).toBeUndefined(); + expect(stored.pendingFinalDeliveryText).toBeUndefined(); + }); + + it("does not persist sendPolicy-denied final replies for heartbeat replay", async () => { + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + sendPolicy: "deny", + }; + const sessionStore = { main: sessionEntry }; + const storePath = await createSessionStoreFile(sessionEntry); + state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "denied final" }], + meta: {}, + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + + await run(); + + const stored = await 
readStoredMainSession(storePath); + expect(stored.pendingFinalDelivery).toBeUndefined(); + expect(stored.pendingFinalDeliveryText).toBeUndefined(); + }); + + it("persists only visible non-reasoning final reply text", async () => { + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore = { main: sessionEntry }; + const storePath = await createSessionStoreFile(sessionEntry); + state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hidden reasoning", isReasoning: true }, { text: "visible final" }], + meta: {}, + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + + await run(); + + const stored = await readStoredMainSession(storePath); + expect(stored.pendingFinalDelivery).toBe(true); + expect(stored.pendingFinalDeliveryText).toBe("visible final"); + }); +}); + describe("runReplyAgent typing (heartbeat)", () => { it("signals typing for normal runs", async () => { const onPartialReply = vi.fn(); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 5581a00e79a..2485061d593 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -26,6 +26,7 @@ import { } from "../../infra/diagnostic-trace-context.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js"; +import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { estimateUsageCost, @@ -82,6 +83,7 @@ import { } from "./reply-run-registry.js"; import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js"; +import { resolveSourceReplyVisibilityPolicy } from 
"./source-reply-delivery-mode.js"; import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; @@ -804,6 +806,14 @@ function joinCommitmentAssistantText(payloads: ReplyPayload[]): string { .trim(); } +function buildPendingFinalDeliveryText(payloads: ReplyPayload[]): string { + return payloads + .filter((payload) => payload.isReasoning !== true) + .map((payload) => payload.text) + .filter((text): text is string => Boolean(text)) + .join("\n\n"); +} + function enqueueCommitmentExtractionForTurn(params: { cfg: OpenClawConfig; commandBody: string; @@ -1817,11 +1827,51 @@ export async function runReplyAgent(params: { finalPayloads = appendUsageLine(finalPayloads, responseUsageLine); } - return finalizeWithFollowup( + // Capture only policy-visible final payloads in session store to support + // durable delivery retries. Hidden reasoning, message-tool-only replies, + // and sendPolicy-denied replies must not become heartbeat-replayable text. + if (sessionKey && storePath && finalPayloads.length > 0) { + const sendPolicy = resolveSendPolicy({ + cfg, + entry: activeSessionEntry, + sessionKey: params.runtimePolicySessionKey ?? sessionKey, + channel: + sessionCtx.OriginatingChannel ?? + sessionCtx.Surface ?? + sessionCtx.Provider ?? + activeSessionEntry?.channel, + chatType: activeSessionEntry?.chatType, + }); + const sourceReplyPolicy = resolveSourceReplyVisibilityPolicy({ + cfg, + ctx: sessionCtx, + requested: opts?.sourceReplyDeliveryMode, + sendPolicy, + }); + const pendingText = sourceReplyPolicy.suppressDelivery + ? "" + : buildPendingFinalDeliveryText(finalPayloads); + if (pendingText) { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async () => ({ + pendingFinalDelivery: true, + pendingFinalDeliveryText: pendingText, + pendingFinalDeliveryCreatedAt: Date.now(), + updatedAt: Date.now(), + }), + }); + } + } + + const result = finalizeWithFollowup( finalPayloads.length === 1 ? 
finalPayloads[0] : finalPayloads, queueKey, runFollowupTurn, ); + + return result; } catch (error) { if ( replyOperation.result?.kind === "aborted" && diff --git a/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts b/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts index 51b7b32503d..4e36f370733 100644 --- a/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts @@ -62,6 +62,7 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => { sessionStoreMocks.loadSessionStore.mockReset().mockReturnValue({}); sessionStoreMocks.resolveStorePath.mockReset().mockReturnValue("/tmp/mock-sessions.json"); sessionStoreMocks.resolveSessionStoreEntry.mockReset().mockReturnValue({ existing: undefined }); + sessionStoreMocks.updateSessionStoreEntry.mockClear(); acpManagerRuntimeMocks.getAcpSessionManager.mockReset(); acpManagerRuntimeMocks.getAcpSessionManager.mockImplementation(() => ({ resolveSession: () => ({ kind: "none" as const }), @@ -149,4 +150,67 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => { counts: { tool: 0, block: 0, final: 0 }, }); }); + + it("clears pending final delivery after final dispatch succeeds", async () => { + hookMocks.runner.hasHooks.mockReturnValue(false); + sessionStoreMocks.currentEntry = { + sessionKey: "agent:test:session", + pendingFinalDelivery: true, + pendingFinalDeliveryText: "durable reply", + pendingFinalDeliveryCreatedAt: 1, + pendingFinalDeliveryLastAttemptAt: 2, + pendingFinalDeliveryAttemptCount: 3, + pendingFinalDeliveryLastError: "previous failure", + pendingFinalDeliveryContext: { source: "heartbeat" }, + }; + sessionStoreMocks.resolveSessionStoreEntry.mockReturnValue({ + existing: sessionStoreMocks.currentEntry, + }); + mocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" }); + + const result = await dispatchReplyFromConfig({ + ctx: createHookCtx(), + cfg: emptyConfig, + dispatcher: 
createDispatcher(), + replyResolver: async () => ({ text: "durable reply" }), + }); + + expect(result.queuedFinal).toBe(true); + expect(sessionStoreMocks.updateSessionStoreEntry).toHaveBeenCalledOnce(); + expect(sessionStoreMocks.currentEntry?.pendingFinalDelivery).toBeUndefined(); + expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryText).toBeUndefined(); + expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryCreatedAt).toBeUndefined(); + expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryLastAttemptAt).toBeUndefined(); + expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryAttemptCount).toBeUndefined(); + expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryLastError).toBeUndefined(); + expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryContext).toBeUndefined(); + }); + + it("preserves pending final delivery when final dispatch fails", async () => { + hookMocks.runner.hasHooks.mockReturnValue(false); + sessionStoreMocks.currentEntry = { + sessionKey: "agent:test:session", + pendingFinalDelivery: true, + pendingFinalDeliveryText: "durable reply", + pendingFinalDeliveryCreatedAt: 1, + }; + sessionStoreMocks.resolveSessionStoreEntry.mockReturnValue({ + existing: sessionStoreMocks.currentEntry, + }); + const dispatcher = createDispatcher(); + vi.mocked(dispatcher.sendFinalReply).mockReturnValue(false); + + const result = await dispatchReplyFromConfig({ + ctx: createHookCtx(), + cfg: emptyConfig, + dispatcher, + replyResolver: async () => ({ text: "durable reply" }), + }); + + expect(result.queuedFinal).toBe(false); + expect(sessionStoreMocks.updateSessionStoreEntry).not.toHaveBeenCalled(); + expect(sessionStoreMocks.currentEntry?.pendingFinalDelivery).toBe(true); + expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryText).toBe("durable reply"); + expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryCreatedAt).toBe(1); + }); }); diff --git a/src/auto-reply/reply/dispatch-from-config.runtime.ts 
b/src/auto-reply/reply/dispatch-from-config.runtime.ts index 7e14375d12e..3ce5f38ff46 100644 --- a/src/auto-reply/reply/dispatch-from-config.runtime.ts +++ b/src/auto-reply/reply/dispatch-from-config.runtime.ts @@ -1,3 +1,7 @@ export { resolveStorePath } from "../../config/sessions/paths.js"; -export { loadSessionStore, resolveSessionStoreEntry } from "../../config/sessions/store.js"; +export { + loadSessionStore, + resolveSessionStoreEntry, + updateSessionStoreEntry, +} from "../../config/sessions/store.js"; export { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; diff --git a/src/auto-reply/reply/dispatch-from-config.shared.test-harness.ts b/src/auto-reply/reply/dispatch-from-config.shared.test-harness.ts index 2e1bc8c2ac6..0db92384e75 100644 --- a/src/auto-reply/reply/dispatch-from-config.shared.test-harness.ts +++ b/src/auto-reply/reply/dispatch-from-config.shared.test-harness.ts @@ -92,6 +92,21 @@ const sessionStoreMocks = vi.hoisted(() => ({ loadSessionStore: vi.fn(() => ({})), resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"), resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })), + updateSessionStoreEntry: vi.fn( + async (params: { + update: (entry: Record) => Promise | null>; + }) => { + if (!sessionStoreMocks.currentEntry) { + return null; + } + const patch = await params.update(sessionStoreMocks.currentEntry); + if (!patch) { + return sessionStoreMocks.currentEntry; + } + sessionStoreMocks.currentEntry = { ...sessionStoreMocks.currentEntry, ...patch }; + return sessionStoreMocks.currentEntry; + }, + ), })); const acpManagerRuntimeMocks = vi.hoisted(() => ({ getAcpSessionManager: vi.fn(), @@ -192,6 +207,7 @@ vi.mock("./dispatch-from-config.runtime.js", () => ({ resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry, resolveStorePath: sessionStoreMocks.resolveStorePath, triggerInternalHook: internalHookMocks.triggerInternalHook, + updateSessionStoreEntry: 
sessionStoreMocks.updateSessionStoreEntry, })); vi.mock("../../plugins/hook-runner-global.js", () => ({ initializeGlobalHookRunner: vi.fn(), diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index edb37f73ad6..40f5d4427cd 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -109,6 +109,21 @@ const sessionStoreMocks = vi.hoisted(() => ({ loadSessionStore: vi.fn(() => ({})), resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"), resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })), + updateSessionStoreEntry: vi.fn( + async (params: { + update: (entry: Record) => Promise | null>; + }) => { + if (!sessionStoreMocks.currentEntry) { + return null; + } + const patch = await params.update(sessionStoreMocks.currentEntry); + if (!patch) { + return sessionStoreMocks.currentEntry; + } + sessionStoreMocks.currentEntry = { ...sessionStoreMocks.currentEntry, ...patch }; + return sessionStoreMocks.currentEntry; + }, + ), })); const acpManagerRuntimeMocks = vi.hoisted(() => ({ getAcpSessionManager: vi.fn(), @@ -358,6 +373,7 @@ vi.mock("./dispatch-from-config.runtime.js", () => ({ resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry, resolveStorePath: sessionStoreMocks.resolveStorePath, triggerInternalHook: internalHookMocks.triggerInternalHook, + updateSessionStoreEntry: sessionStoreMocks.updateSessionStoreEntry, })); vi.mock("../../plugins/hook-runner-global.js", () => ({ diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index fccb38f84b6..7ca3c5cea9a 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -84,6 +84,7 @@ import { resolveSessionStoreEntry, resolveStorePath, triggerInternalHook, + updateSessionStoreEntry, } from "./dispatch-from-config.runtime.js"; import type { 
DispatchFromConfigParams, @@ -326,6 +327,34 @@ const resolveHarnessSourceVisibleRepliesDefault = (params: { } }; +async function clearPendingFinalDeliveryAfterSuccess(params: { + storePath?: string; + sessionKey?: string; +}): Promise { + if (!params.storePath || !params.sessionKey) { + return; + } + await updateSessionStoreEntry({ + storePath: params.storePath, + sessionKey: params.sessionKey, + update: async (entry) => { + if (!entry.pendingFinalDelivery && !entry.pendingFinalDeliveryText) { + return null; + } + return { + pendingFinalDelivery: undefined, + pendingFinalDeliveryText: undefined, + pendingFinalDeliveryCreatedAt: undefined, + pendingFinalDeliveryLastAttemptAt: undefined, + pendingFinalDeliveryAttemptCount: undefined, + pendingFinalDeliveryLastError: undefined, + pendingFinalDeliveryContext: undefined, + updatedAt: Date.now(), + }; + }, + }); +} + export type { DispatchFromConfigParams, DispatchFromConfigResult, @@ -1470,6 +1499,8 @@ export async function dispatchReplyFromConfig( let queuedFinal = false; let routedFinalCount = 0; + let attemptedFinalDelivery = false; + let finalDeliveryFailed = false; if (!suppressDelivery) { for (const reply of replies) { // Suppress reasoning payloads from channel delivery — channels using this @@ -1477,9 +1508,20 @@ export async function dispatchReplyFromConfig( if (reply.isReasoning === true) { continue; } + attemptedFinalDelivery = true; const finalReply = await sendFinalPayload(reply); queuedFinal = finalReply.queuedFinal || queuedFinal; routedFinalCount += finalReply.routedFinalCount; + if (!finalReply.queuedFinal && finalReply.routedFinalCount === 0) { + finalDeliveryFailed = true; + } + } + + if (attemptedFinalDelivery && !finalDeliveryFailed) { + await clearPendingFinalDeliveryAfterSuccess({ + storePath: sessionStoreEntry.storePath, + sessionKey: sessionStoreEntry.sessionKey ?? 
sessionKey, + }); } const ttsMode = resolveConfiguredTtsMode(cfg, { diff --git a/src/auto-reply/reply/get-reply.ts b/src/auto-reply/reply/get-reply.ts index 1bbaf5e93ca..d81b02f4646 100644 --- a/src/auto-reply/reply/get-reply.ts +++ b/src/auto-reply/reply/get-reply.ts @@ -310,6 +310,40 @@ export async function getReplyFromConfig( triggerBodyNormalized, bodyStripped, } = sessionState; + + if (sessionEntry?.pendingFinalDelivery && sessionEntry.pendingFinalDeliveryText) { + const text = sessionEntry.pendingFinalDeliveryText; + + // If it's a heartbeat, we definitely want to try delivering the lost reply now. + // If it's a user message, we deliver the lost reply first, then continue. + // For now, let's just return the lost reply if it's a heartbeat. + if (opts?.isHeartbeat) { + const updatedAt = Date.now(); + const attemptCount = (sessionEntry.pendingFinalDeliveryAttemptCount ?? 0) + 1; + sessionEntry.pendingFinalDeliveryLastAttemptAt = updatedAt; + sessionEntry.pendingFinalDeliveryAttemptCount = attemptCount; + sessionEntry.pendingFinalDeliveryLastError = null; + sessionEntry.updatedAt = updatedAt; + if (sessionKey && sessionStore) { + sessionStore[sessionKey] = sessionEntry; + } + if (sessionKey && storePath) { + const { updateSessionStoreEntry } = await import("../../config/sessions.js"); + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async () => ({ + pendingFinalDeliveryLastAttemptAt: updatedAt, + pendingFinalDeliveryAttemptCount: attemptCount, + pendingFinalDeliveryLastError: null, + updatedAt, + }), + }); + } + return { text }; + } + } + if (resetTriggered && normalizeOptionalString(bodyStripped)) { const { applyResetModelOverride } = await loadSessionResetModelRuntime(); await applyResetModelOverride({ diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index fcffc810ab6..6e8062a5a86 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -267,6 +267,16 @@ export type SessionEntry = { 
inputTokens?: number; outputTokens?: number; totalTokens?: number; + /** Durable marker that final user reply delivery still needs a retry/resume pass. */ + pendingFinalDelivery?: boolean; + pendingFinalDeliveryCreatedAt?: number; + pendingFinalDeliveryLastAttemptAt?: number; + pendingFinalDeliveryAttemptCount?: number; + pendingFinalDeliveryLastError?: string | null; + /** Frozen reply text that needs delivery. */ + pendingFinalDeliveryText?: string | null; + /** Original delivery context (channel, recipient, etc). */ + pendingFinalDeliveryContext?: DeliveryContext; /** * Whether totalTokens reflects a fresh context snapshot for the latest run. * Undefined means legacy/unknown freshness; false forces consumers to treat diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index c7ef602e922..1e0dd20c75e 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -1208,6 +1208,24 @@ export async function runHeartbeatOnce(opts: { return { status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY }; } + // Phase 2: Stronger heartbeat deferral while a final delivery replay is pending. + // Plain `updatedAt` changes are normal for heartbeat sessions and should not + // suppress heartbeat runs; only defer when final delivery recovery is active. + const { entry: recentSessionEntry } = resolveHeartbeatSession( + cfg, + agentId, + heartbeat, + opts.sessionKey, + ); + const HEARTBEAT_DEFER_WINDOW_MS = 30_000; + if ( + recentSessionEntry?.pendingFinalDelivery === true && + recentSessionEntry?.updatedAt && + startedAt - recentSessionEntry.updatedAt < HEARTBEAT_DEFER_WINDOW_MS + ) { + return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT }; + } + // Preflight centralizes trigger classification, event inspection, and HEARTBEAT.md gating. const preflight = await resolveHeartbeatPreflight({ cfg,