Feat/main session durable delivery pr (#75280)

* feat: generalize pending-final-delivery for subagents and main session

(cherry picked from commit 677fcbfaf87c8cd6de8b5bd02099b29b7d49e916)

* feat(agents): implement Phase 2 durable final delivery for main sessions

(cherry picked from commit b4e39f0ddf6dbd3f0d3b9226df8e714ad722f751)

* fix(agents): narrow heartbeat deferral to pending final delivery

* fix(agents): clear final delivery after dispatch

* fix(agents): gate durable delivery retry capture

---------

Co-authored-by: Mert Basar <MertBasar0@users.noreply.github.com>
This commit is contained in:
Mert Başar
2026-05-04 20:44:11 +03:00
committed by GitHub
parent 7b8315d18e
commit c240e718e9
18 changed files with 647 additions and 30 deletions

View File

@@ -17,8 +17,11 @@ import {
import { formatErrorMessage } from "../infra/errors.js";
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { resolveAgentIdFromSessionKey } from "../routing/session-key.js";
import {
isSubagentSessionKey,
normalizeAgentId,
resolveAgentIdFromSessionKey,
} from "../routing/session-key.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { applyVerboseOverride } from "../sessions/level-overrides.js";
import { applyModelOverrideToSessionEntry } from "../sessions/model-overrides.js";
@@ -1246,8 +1249,43 @@ async function agentCommandInternal(
}
const payloads = result.payloads ?? [];
// Phase 2: Persist pending final delivery for main sessions before attempting delivery.
// This ensures that if the process restarts during delivery, the payload is durable.
if (
opts.deliver === true &&
sessionStore &&
sessionKey &&
payloads.length > 0 &&
!isSubagentSessionKey(sessionKey)
) {
const now = Date.now();
const combinedPayload = payloads
.map((p) => (typeof p.text === "string" ? p.text : ""))
.filter(Boolean)
.join("\n\n");
if (combinedPayload) {
const entry = sessionStore[sessionKey] ?? sessionEntry;
const next: SessionEntry = {
...entry,
pendingFinalDelivery: true,
pendingFinalDeliveryText: combinedPayload,
pendingFinalDeliveryCreatedAt: now,
updatedAt: now,
};
await persistSessionEntry({
sessionStore,
sessionKey,
storePath,
entry: next,
});
sessionEntry = next;
}
}
const { deliverAgentCommandResult } = await loadDeliveryRuntime();
return await deliverAgentCommandResult({
const deliveryResult = await deliverAgentCommandResult({
cfg,
deps: resolvedDeps,
runtime,
@@ -1257,6 +1295,32 @@ async function agentCommandInternal(
result,
payloads,
});
// Phase 2: Clear pending delivery payload after successful delivery.
if (
deliveryResult?.deliverySucceeded === true &&
sessionStore &&
sessionKey &&
!isSubagentSessionKey(sessionKey)
) {
const entry = sessionStore[sessionKey] ?? sessionEntry;
const next: SessionEntry = {
...entry,
pendingFinalDelivery: undefined,
pendingFinalDeliveryText: undefined,
pendingFinalDeliveryCreatedAt: undefined,
updatedAt: Date.now(),
};
await persistSessionEntry({
sessionStore,
sessionKey,
storePath,
entry: next,
});
sessionEntry = next;
}
return deliveryResult;
} finally {
clearAgentRunContext(runId);
}

View File

@@ -215,6 +215,52 @@ describe("normalizeAgentCommandReplyPayloads", () => {
});
});
it("reports successful requested delivery", async () => {
deliverOutboundPayloadsMock.mockResolvedValue([]);
const delivered = await deliverMediaReplyForTest({
key: "agent:tester:slack:direct:alice",
agentId: "tester",
} as never);
expect(delivered.deliverySucceeded).toBe(true);
});
it("does not report success when best-effort delivery records an error", async () => {
deliverOutboundPayloadsMock.mockImplementationOnce(async (params: unknown) => {
(params as { onError?: (err: unknown) => void }).onError?.(new Error("send failed"));
return [];
});
const runtime = { log: vi.fn(), error: vi.fn() };
const delivered = await deliverAgentCommandResult({
cfg: {
agents: {
list: [{ id: "tester", workspace: "/tmp/agent-workspace" }],
},
} as OpenClawConfig,
deps: {} as CliDeps,
runtime: runtime as never,
opts: {
message: "go",
deliver: true,
bestEffortDeliver: true,
replyChannel: "slack",
replyTo: "#general",
} as AgentCommandOpts,
outboundSession: {
key: "agent:tester:slack:direct:alice",
agentId: "tester",
} as never,
sessionEntry: undefined,
payloads: [{ text: "here you go" }],
result: createResult(),
});
expect(delivered.deliverySucceeded).toBe(false);
expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("send failed"));
});
it("threads agentId into the normalizer when sessionKey is unresolved", async () => {
createReplyMediaPathNormalizerMock.mockReturnValue(async (payload: ReplyPayload) => payload);
deliverOutboundPayloadsMock.mockResolvedValue([]);

View File

@@ -354,6 +354,8 @@ export async function deliverAgentCommandResult(params: {
}
const deliveryPayloads = projectOutboundPayloadPlanForOutbound(outboundPayloadPlan);
let deliverySucceeded = false;
let deliveryHadError = false;
const logPayload = (payload: NormalizedOutboundPayload) => {
if (opts.json) {
return;
@@ -368,6 +370,10 @@ export async function deliverAgentCommandResult(params: {
}
runtime.log(output);
};
const markDeliveryError = (err: unknown) => {
deliveryHadError = true;
logDeliveryError(err);
};
if (!deliver) {
for (const payload of deliveryPayloads) {
logPayload(payload);
@@ -385,12 +391,13 @@ export async function deliverAgentCommandResult(params: {
replyToId: resolvedReplyToId ?? null,
threadId: resolvedThreadTarget ?? null,
bestEffort: bestEffortDeliver,
onError: (err) => logDeliveryError(err),
onError: markDeliveryError,
onPayload: logPayload,
deps: createOutboundSendDeps(deps),
});
deliverySucceeded = !deliveryHadError;
}
}
return { payloads: normalizedPayloads, meta: resultMeta };
return { payloads: normalizedPayloads, meta: resultMeta, deliverySucceeded };
}

View File

@@ -278,6 +278,43 @@ describe("main-session-restart-recovery", () => {
expect(store["agent:main:main"]?.abortedLastRun).toBe(true);
});
it("resumes marked sessions with a durable pending final delivery payload (Phase 2)", async () => {
const sessionsDir = await makeSessionsDir();
const pendingPayload = "The final answer is 42.";
await writeStore(sessionsDir, {
"agent:main:main": {
sessionId: "main-session",
updatedAt: Date.now() - 10_000,
status: "running",
abortedLastRun: true,
pendingFinalDelivery: true,
pendingFinalDeliveryText: pendingPayload,
pendingFinalDeliveryCreatedAt: Date.now() - 5_000,
},
});
await writeTranscript(sessionsDir, "main-session", [
{ role: "user", content: "calculate the answer" },
{ role: "assistant", content: [{ type: "toolCall", id: "call-1", name: "calc" }] },
{ role: "toolResult", content: "42" },
]);
const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
expect(callGateway).toHaveBeenCalledOnce();
const callParams = vi.mocked(callGateway).mock.calls[0]?.[0].params as { message?: string };
expect(callParams.message).toContain(pendingPayload);
const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
expect(store["agent:main:main"]?.abortedLastRun).toBe(false);
expect(store["agent:main:main"]?.pendingFinalDelivery).toBe(true);
expect(store["agent:main:main"]?.pendingFinalDeliveryText).toBe(pendingPayload);
expect(store["agent:main:main"]?.pendingFinalDeliveryCreatedAt).toBeDefined();
expect(store["agent:main:main"]?.pendingFinalDeliveryAttemptCount).toBe(1);
expect(store["agent:main:main"]?.pendingFinalDeliveryLastAttemptAt).toBeDefined();
expect(store["agent:main:main"]?.pendingFinalDeliveryLastError).toBeNull();
});
it("does not scan ordinary running sessions without the restart-aborted marker", async () => {
const sessionsDir = await makeSessionsDir();
await writeStore(sessionsDir, {

View File

@@ -116,12 +116,15 @@ function resolveMainSessionResumeBlockReason(messages: unknown[]): string | null
return null;
}
function buildResumeMessage(): string {
return (
function buildResumeMessage(pendingFinalDeliveryText?: string | null): string {
const base =
"[System] Your previous turn was interrupted by a gateway restart while " +
"OpenClaw was waiting on tool/model work. Continue from the existing " +
"transcript and finish the interrupted response."
);
"transcript and finish the interrupted response.";
if (pendingFinalDeliveryText) {
return `${base}\n\nNote: The interrupted final reply was captured: "${pendingFinalDeliveryText}"`;
}
return base;
}
async function markSessionFailed(params: {
@@ -140,6 +143,13 @@ async function markSessionFailed(params: {
entry.abortedLastRun = true;
entry.endedAt = Date.now();
entry.updatedAt = entry.endedAt;
entry.pendingFinalDelivery = undefined;
entry.pendingFinalDeliveryText = undefined;
entry.pendingFinalDeliveryCreatedAt = undefined;
entry.pendingFinalDeliveryLastAttemptAt = undefined;
entry.pendingFinalDeliveryAttemptCount = undefined;
entry.pendingFinalDeliveryLastError = undefined;
entry.pendingFinalDeliveryContext = undefined;
store[params.sessionKey] = entry;
},
{ skipMaintenance: true },
@@ -150,12 +160,13 @@ async function markSessionFailed(params: {
async function resumeMainSession(params: {
storePath: string;
sessionKey: string;
pendingFinalDeliveryText?: string | null;
}): Promise<boolean> {
try {
await callGateway<{ runId: string }>({
method: "agent",
params: {
message: buildResumeMessage(),
message: buildResumeMessage(params.pendingFinalDeliveryText),
sessionKey: params.sessionKey,
idempotencyKey: crypto.randomUUID(),
deliver: false,
@@ -170,13 +181,24 @@ async function resumeMainSession(params: {
if (!entry) {
return;
}
const now = Date.now();
entry.abortedLastRun = false;
entry.updatedAt = Date.now();
entry.updatedAt = now;
if (entry.pendingFinalDelivery || entry.pendingFinalDeliveryText) {
entry.pendingFinalDeliveryLastAttemptAt = now;
entry.pendingFinalDeliveryAttemptCount =
(entry.pendingFinalDeliveryAttemptCount ?? 0) + 1;
entry.pendingFinalDeliveryLastError = null;
}
store[params.sessionKey] = entry;
},
{ skipMaintenance: true },
);
log.info(`resumed interrupted main session: ${params.sessionKey}`);
log.info(
`resumed interrupted main session: ${params.sessionKey}${
params.pendingFinalDeliveryText ? " (with pending payload)" : ""
}`,
);
return true;
} catch (err) {
log.warn(`failed to resume interrupted main session ${params.sessionKey}: ${String(err)}`);
@@ -290,6 +312,7 @@ async function recoverStore(params: {
const resumed = await resumeMainSession({
storePath: params.storePath,
sessionKey,
pendingFinalDeliveryText: entry.pendingFinalDeliveryText,
});
if (resumed) {
params.resumedSessionKeys.add(sessionKey);

View File

@@ -34,7 +34,7 @@ import {
resolveAnnounceRetryDelayMs,
safeRemoveAttachmentsDir,
} from "./subagent-registry-helpers.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
import type { PendingFinalDeliveryPayload, SubagentRunRecord } from "./subagent-registry.types.js";
import { deleteSubagentSessionForCleanup } from "./subagent-session-cleanup.js";
type CaptureSubagentCompletionReply =
@@ -315,11 +315,64 @@ export function createSubagentRegistryLifecycleController(params: {
}
};
const clearPendingFinalDelivery = (entry: SubagentRunRecord) => {
entry.pendingFinalDelivery = undefined;
entry.pendingFinalDeliveryCreatedAt = undefined;
entry.pendingFinalDeliveryLastAttemptAt = undefined;
entry.pendingFinalDeliveryAttemptCount = undefined;
entry.pendingFinalDeliveryLastError = undefined;
entry.pendingFinalDeliveryPayload = undefined;
};
const loadPendingFinalDeliveryPayload = (
entry: SubagentRunRecord,
): PendingFinalDeliveryPayload => {
return {
requesterSessionKey:
entry.pendingFinalDeliveryPayload?.requesterSessionKey ?? entry.requesterSessionKey,
requesterOrigin: entry.pendingFinalDeliveryPayload?.requesterOrigin ?? entry.requesterOrigin,
requesterDisplayKey:
entry.pendingFinalDeliveryPayload?.requesterDisplayKey ?? entry.requesterDisplayKey,
childSessionKey: entry.pendingFinalDeliveryPayload?.childSessionKey ?? entry.childSessionKey,
childRunId: entry.pendingFinalDeliveryPayload?.childRunId ?? entry.runId,
task: entry.pendingFinalDeliveryPayload?.task ?? entry.task,
label: entry.pendingFinalDeliveryPayload?.label ?? entry.label,
startedAt: entry.pendingFinalDeliveryPayload?.startedAt ?? entry.startedAt,
endedAt: entry.pendingFinalDeliveryPayload?.endedAt ?? entry.endedAt,
outcome: entry.pendingFinalDeliveryPayload?.outcome ?? entry.outcome,
expectsCompletionMessage:
entry.pendingFinalDeliveryPayload?.expectsCompletionMessage ??
entry.expectsCompletionMessage,
spawnMode: entry.pendingFinalDeliveryPayload?.spawnMode ?? entry.spawnMode,
frozenResultText:
entry.pendingFinalDeliveryPayload?.frozenResultText ?? entry.frozenResultText,
fallbackFrozenResultText:
entry.pendingFinalDeliveryPayload?.fallbackFrozenResultText ??
entry.fallbackFrozenResultText,
wakeOnDescendantSettle:
entry.pendingFinalDeliveryPayload?.wakeOnDescendantSettle ?? entry.wakeOnDescendantSettle,
};
};
const markPendingFinalDelivery = (args: { entry: SubagentRunRecord; error?: string }) => {
const now = Date.now();
const payload: PendingFinalDeliveryPayload = loadPendingFinalDeliveryPayload(args.entry);
args.entry.pendingFinalDelivery = true;
args.entry.pendingFinalDeliveryCreatedAt ??= now;
args.entry.pendingFinalDeliveryLastAttemptAt = now;
args.entry.pendingFinalDeliveryAttemptCount =
(args.entry.pendingFinalDeliveryAttemptCount ?? 0) + 1;
args.entry.pendingFinalDeliveryLastError = args.error ?? null;
args.entry.pendingFinalDeliveryPayload = payload;
};
const finalizeResumedAnnounceGiveUp = async (giveUpParams: {
runId: string;
entry: SubagentRunRecord;
reason: "retry-limit" | "expiry";
}) => {
clearPendingFinalDelivery(giveUpParams.entry);
safeSetSubagentTaskDeliveryStatus({
runId: giveUpParams.runId,
childSessionKey: giveUpParams.entry.childSessionKey,
@@ -486,6 +539,7 @@ export function createSubagentRegistryLifecycleController(params: {
entry.completionAnnouncedAt = Date.now();
params.persist();
}
clearPendingFinalDelivery(entry);
if (!options?.skipDeliveryStatus) {
safeSetSubagentTaskDeliveryStatus({
runId,
@@ -544,6 +598,7 @@ export function createSubagentRegistryLifecycleController(params: {
}
if (deferredDecision.kind === "give-up") {
clearPendingFinalDelivery(entry);
safeSetSubagentTaskDeliveryStatus({
runId,
childSessionKey: entry.childSessionKey,
@@ -571,6 +626,10 @@ export function createSubagentRegistryLifecycleController(params: {
return;
}
markPendingFinalDelivery({
entry,
error: didAnnounce ? undefined : "announce deferred or direct delivery failed",
});
entry.cleanupHandled = false;
params.resumedRuns.delete(runId);
params.persist();
@@ -631,7 +690,8 @@ export function createSubagentRegistryLifecycleController(params: {
});
return true;
}
const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
const pendingPayload = loadPendingFinalDeliveryPayload(entry);
const requesterOrigin = normalizeDeliveryContext(pendingPayload.requesterOrigin);
let latestDeliveryError = entry.lastAnnounceDeliveryError;
const finalizeAnnounceCleanup = (didAnnounce: boolean) => {
if (!didAnnounce && latestDeliveryError) {
@@ -650,24 +710,24 @@ export function createSubagentRegistryLifecycleController(params: {
void params
.runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey,
childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey,
childSessionKey: pendingPayload.childSessionKey,
childRunId: pendingPayload.childRunId,
requesterSessionKey: pendingPayload.requesterSessionKey,
requesterOrigin,
requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task,
requesterDisplayKey: pendingPayload.requesterDisplayKey,
task: pendingPayload.task,
timeoutMs: params.subagentAnnounceTimeoutMs,
cleanup: entry.cleanup,
roundOneReply: entry.frozenResultText ?? undefined,
fallbackReply: entry.fallbackFrozenResultText ?? undefined,
roundOneReply: pendingPayload.frozenResultText ?? undefined,
fallbackReply: pendingPayload.fallbackFrozenResultText ?? undefined,
waitForCompletion: false,
startedAt: entry.startedAt,
endedAt: entry.endedAt,
label: entry.label,
outcome: entry.outcome,
spawnMode: entry.spawnMode,
expectsCompletionMessage: entry.expectsCompletionMessage,
wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
startedAt: pendingPayload.startedAt,
endedAt: pendingPayload.endedAt,
label: pendingPayload.label,
outcome: pendingPayload.outcome,
spawnMode: pendingPayload.spawnMode,
expectsCompletionMessage: pendingPayload.expectsCompletionMessage,
wakeOnDescendantSettle: pendingPayload.wakeOnDescendantSettle === true,
onDeliveryResult: (delivery) => {
if (delivery.delivered) {
if (entry.lastAnnounceDeliveryError !== undefined) {

View File

@@ -3,6 +3,24 @@ import type { SubagentRunOutcome } from "./subagent-announce-output.js";
import type { SubagentLifecycleEndedReason } from "./subagent-lifecycle-events.js";
import type { SpawnSubagentMode } from "./subagent-spawn.types.js";
export type PendingFinalDeliveryPayload = {
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
requesterDisplayKey: string;
childSessionKey: string;
childRunId: string;
task: string;
label?: string;
startedAt?: number;
endedAt?: number;
outcome?: SubagentRunOutcome;
expectsCompletionMessage?: boolean;
spawnMode?: SpawnSubagentMode;
frozenResultText?: string | null;
fallbackFrozenResultText?: string | null;
wakeOnDescendantSettle?: boolean;
};
export type SubagentRunRecord = {
runId: string;
childSessionKey: string;
@@ -39,7 +57,15 @@ export type SubagentRunRecord = {
frozenResultCapturedAt?: number;
fallbackFrozenResultText?: string | null;
fallbackFrozenResultCapturedAt?: number;
/** Set after the subagent_ended hook has been emitted successfully once. */
endedHookEmittedAt?: number;
/** Durable marker that final user delivery still needs a retry/resume pass. */
pendingFinalDelivery?: boolean;
pendingFinalDeliveryCreatedAt?: number;
pendingFinalDeliveryLastAttemptAt?: number;
pendingFinalDeliveryAttemptCount?: number;
pendingFinalDeliveryLastError?: string | null;
pendingFinalDeliveryPayload?: PendingFinalDeliveryPayload;
completionAnnouncedAt?: number;
attachmentsDir?: string;
attachmentsRootDir?: string;

View File

@@ -1,3 +1,6 @@
import { mkdtemp, readFile, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
@@ -112,6 +115,7 @@ function createMinimalRun(params?: {
isRunActive?: () => boolean;
shouldFollowup?: boolean;
resolvedQueueMode?: string;
sessionCtx?: Partial<TemplateContext>;
runOverrides?: Partial<FollowupRun["run"]>;
}) {
const typing = createMockTypingController();
@@ -119,6 +123,7 @@ function createMinimalRun(params?: {
const sessionCtx = {
Provider: "whatsapp",
MessageSid: "msg",
...params?.sessionCtx,
} as unknown as TemplateContext;
const resolvedQueue = {
mode: params?.resolvedQueueMode ?? "interrupt",
@@ -277,6 +282,100 @@ describe("runReplyAgent heartbeat followup guard", () => {
});
});
describe("runReplyAgent pending final delivery capture", () => {
async function createSessionStoreFile(entry: SessionEntry) {
const dir = await mkdtemp(join(tmpdir(), "openclaw-agent-runner-pending-"));
const storePath = join(dir, "sessions.json");
await writeFile(storePath, JSON.stringify({ main: entry }), "utf8");
return storePath;
}
async function readStoredMainSession(storePath: string): Promise<SessionEntry> {
const raw = await readFile(storePath, "utf8");
return JSON.parse(raw).main as SessionEntry;
}
it("does not persist message-tool-only final replies for heartbeat replay", async () => {
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
};
const sessionStore = { main: sessionEntry };
const storePath = await createSessionStoreFile(sessionEntry);
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "private final" }],
meta: {},
});
const { run } = createMinimalRun({
opts: { sourceReplyDeliveryMode: "message_tool_only" },
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
await run();
const stored = await readStoredMainSession(storePath);
expect(stored.pendingFinalDelivery).toBeUndefined();
expect(stored.pendingFinalDeliveryText).toBeUndefined();
});
it("does not persist sendPolicy-denied final replies for heartbeat replay", async () => {
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
sendPolicy: "deny",
};
const sessionStore = { main: sessionEntry };
const storePath = await createSessionStoreFile(sessionEntry);
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "denied final" }],
meta: {},
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
await run();
const stored = await readStoredMainSession(storePath);
expect(stored.pendingFinalDelivery).toBeUndefined();
expect(stored.pendingFinalDeliveryText).toBeUndefined();
});
it("persists only visible non-reasoning final reply text", async () => {
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
};
const sessionStore = { main: sessionEntry };
const storePath = await createSessionStoreFile(sessionEntry);
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hidden reasoning", isReasoning: true }, { text: "visible final" }],
meta: {},
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
await run();
const stored = await readStoredMainSession(storePath);
expect(stored.pendingFinalDelivery).toBe(true);
expect(stored.pendingFinalDeliveryText).toBe("visible final");
});
});
describe("runReplyAgent typing (heartbeat)", () => {
it("signals typing for normal runs", async () => {
const onPartialReply = vi.fn();

View File

@@ -26,6 +26,7 @@ import {
} from "../../infra/diagnostic-trace-context.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import {
estimateUsageCost,
@@ -82,6 +83,7 @@ import {
} from "./reply-run-registry.js";
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js";
import { resolveSourceReplyVisibilityPolicy } from "./source-reply-delivery-mode.js";
import { createTypingSignaler } from "./typing-mode.js";
import type { TypingController } from "./typing.js";
@@ -804,6 +806,14 @@ function joinCommitmentAssistantText(payloads: ReplyPayload[]): string {
.trim();
}
function buildPendingFinalDeliveryText(payloads: ReplyPayload[]): string {
return payloads
.filter((payload) => payload.isReasoning !== true)
.map((payload) => payload.text)
.filter((text): text is string => Boolean(text))
.join("\n\n");
}
function enqueueCommitmentExtractionForTurn(params: {
cfg: OpenClawConfig;
commandBody: string;
@@ -1817,11 +1827,51 @@ export async function runReplyAgent(params: {
finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
}
return finalizeWithFollowup(
// Capture only policy-visible final payloads in session store to support
// durable delivery retries. Hidden reasoning, message-tool-only replies,
// and sendPolicy-denied replies must not become heartbeat-replayable text.
if (sessionKey && storePath && finalPayloads.length > 0) {
const sendPolicy = resolveSendPolicy({
cfg,
entry: activeSessionEntry,
sessionKey: params.runtimePolicySessionKey ?? sessionKey,
channel:
sessionCtx.OriginatingChannel ??
sessionCtx.Surface ??
sessionCtx.Provider ??
activeSessionEntry?.channel,
chatType: activeSessionEntry?.chatType,
});
const sourceReplyPolicy = resolveSourceReplyVisibilityPolicy({
cfg,
ctx: sessionCtx,
requested: opts?.sourceReplyDeliveryMode,
sendPolicy,
});
const pendingText = sourceReplyPolicy.suppressDelivery
? ""
: buildPendingFinalDeliveryText(finalPayloads);
if (pendingText) {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async () => ({
pendingFinalDelivery: true,
pendingFinalDeliveryText: pendingText,
pendingFinalDeliveryCreatedAt: Date.now(),
updatedAt: Date.now(),
}),
});
}
}
const result = finalizeWithFollowup(
finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
queueKey,
runFollowupTurn,
);
return result;
} catch (error) {
if (
replyOperation.result?.kind === "aborted" &&

View File

@@ -62,6 +62,7 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => {
sessionStoreMocks.loadSessionStore.mockReset().mockReturnValue({});
sessionStoreMocks.resolveStorePath.mockReset().mockReturnValue("/tmp/mock-sessions.json");
sessionStoreMocks.resolveSessionStoreEntry.mockReset().mockReturnValue({ existing: undefined });
sessionStoreMocks.updateSessionStoreEntry.mockClear();
acpManagerRuntimeMocks.getAcpSessionManager.mockReset();
acpManagerRuntimeMocks.getAcpSessionManager.mockImplementation(() => ({
resolveSession: () => ({ kind: "none" as const }),
@@ -149,4 +150,67 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => {
counts: { tool: 0, block: 0, final: 0 },
});
});
it("clears pending final delivery after final dispatch succeeds", async () => {
hookMocks.runner.hasHooks.mockReturnValue(false);
sessionStoreMocks.currentEntry = {
sessionKey: "agent:test:session",
pendingFinalDelivery: true,
pendingFinalDeliveryText: "durable reply",
pendingFinalDeliveryCreatedAt: 1,
pendingFinalDeliveryLastAttemptAt: 2,
pendingFinalDeliveryAttemptCount: 3,
pendingFinalDeliveryLastError: "previous failure",
pendingFinalDeliveryContext: { source: "heartbeat" },
};
sessionStoreMocks.resolveSessionStoreEntry.mockReturnValue({
existing: sessionStoreMocks.currentEntry,
});
mocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" });
const result = await dispatchReplyFromConfig({
ctx: createHookCtx(),
cfg: emptyConfig,
dispatcher: createDispatcher(),
replyResolver: async () => ({ text: "durable reply" }),
});
expect(result.queuedFinal).toBe(true);
expect(sessionStoreMocks.updateSessionStoreEntry).toHaveBeenCalledOnce();
expect(sessionStoreMocks.currentEntry?.pendingFinalDelivery).toBeUndefined();
expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryText).toBeUndefined();
expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryCreatedAt).toBeUndefined();
expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryLastAttemptAt).toBeUndefined();
expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryAttemptCount).toBeUndefined();
expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryLastError).toBeUndefined();
expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryContext).toBeUndefined();
});
it("preserves pending final delivery when final dispatch fails", async () => {
hookMocks.runner.hasHooks.mockReturnValue(false);
sessionStoreMocks.currentEntry = {
sessionKey: "agent:test:session",
pendingFinalDelivery: true,
pendingFinalDeliveryText: "durable reply",
pendingFinalDeliveryCreatedAt: 1,
};
sessionStoreMocks.resolveSessionStoreEntry.mockReturnValue({
existing: sessionStoreMocks.currentEntry,
});
const dispatcher = createDispatcher();
vi.mocked(dispatcher.sendFinalReply).mockReturnValue(false);
const result = await dispatchReplyFromConfig({
ctx: createHookCtx(),
cfg: emptyConfig,
dispatcher,
replyResolver: async () => ({ text: "durable reply" }),
});
expect(result.queuedFinal).toBe(false);
expect(sessionStoreMocks.updateSessionStoreEntry).not.toHaveBeenCalled();
expect(sessionStoreMocks.currentEntry?.pendingFinalDelivery).toBe(true);
expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryText).toBe("durable reply");
expect(sessionStoreMocks.currentEntry?.pendingFinalDeliveryCreatedAt).toBe(1);
});
});

View File

@@ -1,3 +1,7 @@
export { resolveStorePath } from "../../config/sessions/paths.js";
export { loadSessionStore, resolveSessionStoreEntry } from "../../config/sessions/store.js";
export {
loadSessionStore,
resolveSessionStoreEntry,
updateSessionStoreEntry,
} from "../../config/sessions/store.js";
export { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";

View File

@@ -92,6 +92,21 @@ const sessionStoreMocks = vi.hoisted(() => ({
loadSessionStore: vi.fn(() => ({})),
resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"),
resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })),
updateSessionStoreEntry: vi.fn(
async (params: {
update: (entry: Record<string, unknown>) => Promise<Record<string, unknown> | null>;
}) => {
if (!sessionStoreMocks.currentEntry) {
return null;
}
const patch = await params.update(sessionStoreMocks.currentEntry);
if (!patch) {
return sessionStoreMocks.currentEntry;
}
sessionStoreMocks.currentEntry = { ...sessionStoreMocks.currentEntry, ...patch };
return sessionStoreMocks.currentEntry;
},
),
}));
const acpManagerRuntimeMocks = vi.hoisted(() => ({
getAcpSessionManager: vi.fn(),
@@ -192,6 +207,7 @@ vi.mock("./dispatch-from-config.runtime.js", () => ({
resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry,
resolveStorePath: sessionStoreMocks.resolveStorePath,
triggerInternalHook: internalHookMocks.triggerInternalHook,
updateSessionStoreEntry: sessionStoreMocks.updateSessionStoreEntry,
}));
vi.mock("../../plugins/hook-runner-global.js", () => ({
initializeGlobalHookRunner: vi.fn(),

View File

@@ -109,6 +109,21 @@ const sessionStoreMocks = vi.hoisted(() => ({
loadSessionStore: vi.fn(() => ({})),
resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"),
resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })),
updateSessionStoreEntry: vi.fn(
async (params: {
update: (entry: Record<string, unknown>) => Promise<Record<string, unknown> | null>;
}) => {
if (!sessionStoreMocks.currentEntry) {
return null;
}
const patch = await params.update(sessionStoreMocks.currentEntry);
if (!patch) {
return sessionStoreMocks.currentEntry;
}
sessionStoreMocks.currentEntry = { ...sessionStoreMocks.currentEntry, ...patch };
return sessionStoreMocks.currentEntry;
},
),
}));
const acpManagerRuntimeMocks = vi.hoisted(() => ({
getAcpSessionManager: vi.fn(),
@@ -358,6 +373,7 @@ vi.mock("./dispatch-from-config.runtime.js", () => ({
resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry,
resolveStorePath: sessionStoreMocks.resolveStorePath,
triggerInternalHook: internalHookMocks.triggerInternalHook,
updateSessionStoreEntry: sessionStoreMocks.updateSessionStoreEntry,
}));
vi.mock("../../plugins/hook-runner-global.js", () => ({

View File

@@ -84,6 +84,7 @@ import {
resolveSessionStoreEntry,
resolveStorePath,
triggerInternalHook,
updateSessionStoreEntry,
} from "./dispatch-from-config.runtime.js";
import type {
DispatchFromConfigParams,
@@ -326,6 +327,34 @@ const resolveHarnessSourceVisibleRepliesDefault = (params: {
}
};
async function clearPendingFinalDeliveryAfterSuccess(params: {
storePath?: string;
sessionKey?: string;
}): Promise<void> {
if (!params.storePath || !params.sessionKey) {
return;
}
await updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: async (entry) => {
if (!entry.pendingFinalDelivery && !entry.pendingFinalDeliveryText) {
return null;
}
return {
pendingFinalDelivery: undefined,
pendingFinalDeliveryText: undefined,
pendingFinalDeliveryCreatedAt: undefined,
pendingFinalDeliveryLastAttemptAt: undefined,
pendingFinalDeliveryAttemptCount: undefined,
pendingFinalDeliveryLastError: undefined,
pendingFinalDeliveryContext: undefined,
updatedAt: Date.now(),
};
},
});
}
export type {
DispatchFromConfigParams,
DispatchFromConfigResult,
@@ -1470,6 +1499,8 @@ export async function dispatchReplyFromConfig(
let queuedFinal = false;
let routedFinalCount = 0;
let attemptedFinalDelivery = false;
let finalDeliveryFailed = false;
if (!suppressDelivery) {
for (const reply of replies) {
// Suppress reasoning payloads from channel delivery — channels using this
@@ -1477,9 +1508,20 @@ export async function dispatchReplyFromConfig(
if (reply.isReasoning === true) {
continue;
}
attemptedFinalDelivery = true;
const finalReply = await sendFinalPayload(reply);
queuedFinal = finalReply.queuedFinal || queuedFinal;
routedFinalCount += finalReply.routedFinalCount;
if (!finalReply.queuedFinal && finalReply.routedFinalCount === 0) {
finalDeliveryFailed = true;
}
}
if (attemptedFinalDelivery && !finalDeliveryFailed) {
await clearPendingFinalDeliveryAfterSuccess({
storePath: sessionStoreEntry.storePath,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
});
}
const ttsMode = resolveConfiguredTtsMode(cfg, {

View File

@@ -310,6 +310,40 @@ export async function getReplyFromConfig(
triggerBodyNormalized,
bodyStripped,
} = sessionState;
if (sessionEntry?.pendingFinalDelivery && sessionEntry.pendingFinalDeliveryText) {
const text = sessionEntry.pendingFinalDeliveryText;
// If it's a heartbeat, we definitely want to try delivering the lost reply now.
// If it's a user message, we deliver the lost reply first, then continue.
// For now, let's just return the lost reply if it's a heartbeat.
if (opts?.isHeartbeat) {
const updatedAt = Date.now();
const attemptCount = (sessionEntry.pendingFinalDeliveryAttemptCount ?? 0) + 1;
sessionEntry.pendingFinalDeliveryLastAttemptAt = updatedAt;
sessionEntry.pendingFinalDeliveryAttemptCount = attemptCount;
sessionEntry.pendingFinalDeliveryLastError = null;
sessionEntry.updatedAt = updatedAt;
if (sessionKey && sessionStore) {
sessionStore[sessionKey] = sessionEntry;
}
if (sessionKey && storePath) {
const { updateSessionStoreEntry } = await import("../../config/sessions.js");
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async () => ({
pendingFinalDeliveryLastAttemptAt: updatedAt,
pendingFinalDeliveryAttemptCount: attemptCount,
pendingFinalDeliveryLastError: null,
updatedAt,
}),
});
}
return { text };
}
}
if (resetTriggered && normalizeOptionalString(bodyStripped)) {
const { applyResetModelOverride } = await loadSessionResetModelRuntime();
await applyResetModelOverride({

View File

@@ -267,6 +267,16 @@ export type SessionEntry = {
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
/** Durable marker that final user reply delivery still needs a retry/resume pass. */
pendingFinalDelivery?: boolean;
pendingFinalDeliveryCreatedAt?: number;
pendingFinalDeliveryLastAttemptAt?: number;
pendingFinalDeliveryAttemptCount?: number;
pendingFinalDeliveryLastError?: string | null;
/** Frozen reply text that needs delivery. */
pendingFinalDeliveryText?: string | null;
/** Original delivery context (channel, recipient, etc). */
pendingFinalDeliveryContext?: DeliveryContext;
/**
* Whether totalTokens reflects a fresh context snapshot for the latest run.
* Undefined means legacy/unknown freshness; false forces consumers to treat

View File

@@ -1208,6 +1208,24 @@ export async function runHeartbeatOnce(opts: {
return { status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY };
}
// Phase 2: Stronger heartbeat deferral while a final delivery replay is pending.
// Plain `updatedAt` changes are normal for heartbeat sessions and should not
// suppress heartbeat runs; only defer when final delivery recovery is active.
const { entry: recentSessionEntry } = resolveHeartbeatSession(
cfg,
agentId,
heartbeat,
opts.sessionKey,
);
const HEARTBEAT_DEFER_WINDOW_MS = 30_000;
if (
recentSessionEntry?.pendingFinalDelivery === true &&
recentSessionEntry?.updatedAt &&
startedAt - recentSessionEntry.updatedAt < HEARTBEAT_DEFER_WINDOW_MS
) {
return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT };
}
// Preflight centralizes trigger classification, event inspection, and HEARTBEAT.md gating.
const preflight = await resolveHeartbeatPreflight({
cfg,