From f48d040bf523a23ce20b5141ca2d991bbcb82aa6 Mon Sep 17 00:00:00 2001 From: Sebastian B Otaegui Date: Mon, 20 Apr 2026 15:55:17 -0300 Subject: [PATCH] feat: send compaction start and completion notices (#67830) Merged via squash. Prepared head SHA: abedf6cf1104df231a629bb880987b916c7eae3a Co-authored-by: feniix <91633+feniix@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 1 + docs/.generated/config-baseline.sha256 | 4 +- docs/concepts/compaction.md | 10 +- docs/gateway/configuration-reference.md | 4 +- .../reply/agent-runner-execution.test.ts | 193 ++++++++++++++++++ .../reply/agent-runner-execution.ts | 65 +++--- src/config/schema.base.generated.ts | 4 +- src/config/schema.help.ts | 2 +- src/config/types.agent-defaults.ts | 2 +- 9 files changed, 248 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46ff90a4fb9..1b0e9ab8f49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai - Plugins/tests: reuse plugin loader alias and Jiti config resolution across repeated same-context loads, reducing import-heavy test overhead. (#69316) Thanks @amknight. - Cron: split runtime execution state into `jobs-state.json` so `jobs.json` stays stable for git-tracked job definitions. (#63105) Thanks @Feelw00. +- Agents/compaction: send opt-in start and completion notices during context compaction. (#67830) Thanks @feniix. ### Fixes diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index fb3a8eddf13..acffc2bf123 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -747a24d0acf12f95ec75feabb47dad8f03ff0e3a7173b4d277c648f75d956ce5 config-baseline.json -cbb9a6ee1cb69068d5eb63f00f95512ba19778415ea5b2eabe056aaea38978b5 config-baseline.core.json +ab40431597e9f7c09a9f010f267bab250c7f9c570c4a100776de98869f589a92 config-baseline.json +04a82c2208bf69e0a195e7712e3a518a8255c1bb002c31f712cb95003325635b config-baseline.core.json e239cc20f20f8d0172812bc0ad3ee6df52da88e2e2702e3d03a47e01561132ae config-baseline.channel.json b695cb31b4c0cf1d31f842f2892e99cc3ff8d84263ae72b72977cae844b81d6e config-baseline.plugin.json diff --git a/docs/concepts/compaction.md b/docs/concepts/compaction.md index 977d50cc70f..0f68089c410 100644 --- a/docs/concepts/compaction.md +++ b/docs/concepts/compaction.md @@ -132,10 +132,10 @@ capable model for better summaries: } ``` -## Compaction start notice +## Compaction notices -By default, compaction runs silently. To show a brief notice when compaction -starts, enable `notifyUser`: +By default, compaction runs silently. To show brief notices when compaction +starts and when it completes, enable `notifyUser`: ```json5 { @@ -149,8 +149,8 @@ starts, enable `notifyUser`: } ``` -When enabled, the user sees a short message (for example, "Compacting -context...") at the start of each compaction run. +When enabled, the user sees short status messages around each compaction run +(for example, "Compacting context..." and "Compaction complete"). ## Compaction vs pruning diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index f2a28276944..959cec5e85f 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -1392,7 +1392,7 @@ Periodic heartbeat runs. identifierInstructions: "Preserve deployment IDs, ticket IDs, and host:port pairs exactly.", // used when identifierPolicy=custom postCompactionSections: ["Session Startup", "Red Lines"], // [] disables reinjection model: "openrouter/anthropic/claude-sonnet-4-6", // optional compaction-only model override - notifyUser: true, // send a brief notice when compaction starts (default: false) + notifyUser: true, // send brief notices when compaction starts and completes (default: false) memoryFlush: { enabled: true, softThresholdTokens: 6000, @@ -1412,7 +1412,7 @@ Periodic heartbeat runs. - `identifierInstructions`: optional custom identifier-preservation text used when `identifierPolicy=custom`. - `postCompactionSections`: optional AGENTS.md H2/H3 section names to re-inject after compaction. Defaults to `["Session Startup", "Red Lines"]`; set `[]` to disable reinjection. When unset or explicitly set to that default pair, older `Every Session`/`Safety` headings are also accepted as a legacy fallback. - `model`: optional `provider/model-id` override for compaction summarization only. Use this when the main session should keep one model but compaction summaries should run on another; when unset, compaction uses the session's primary model. -- `notifyUser`: when `true`, sends a brief notice to the user when compaction starts (for example, "Compacting context..."). Disabled by default to keep compaction silent. +- `notifyUser`: when `true`, sends brief notices to the user when compaction starts and when it completes (for example, "Compacting context..." and "Compaction complete"). Disabled by default to keep compaction silent. - `memoryFlush`: silent agentic turn before auto-compaction to store durable memories. Skipped when workspace is read-only. ### `agents.defaults.contextPruning` diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index 7dfc2d142ce..ed1e46c86c5 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -863,6 +863,199 @@ describe("runAgentTurnWithFallback", () => { ); }); + it("emits a compaction completion notice when notifyUser is enabled", async () => { + const onBlockReply = vi.fn(); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + await params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } }); + await params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", completed: true }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const followupRun = createFollowupRun(); + followupRun.run.config = { + agents: { + defaults: { + compaction: { + notifyUser: true, + }, + }, + }, + }; + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun, + sessionCtx: { + Provider: "whatsapp", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: { onBlockReply }, + typingSignals: createMockTypingSignaler(), + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks: new Set(), + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + expect(result.kind).toBe("success"); + expect(onBlockReply).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + text: "🧹 Compacting context...", + replyToId: "msg", + replyToCurrent: true, + isCompactionNotice: true, + }), + ); + expect(onBlockReply).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + text: "🧹 Compaction complete", + replyToId: "msg", + replyToCurrent: true, + isCompactionNotice: true, + }), + ); + }); + + it("prefers onCompactionEnd callback over default notice when notifyUser is enabled", async () => { + const onBlockReply = vi.fn(); + const onCompactionEnd = vi.fn(); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + await params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } }); + await params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", completed: true }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const followupRun = createFollowupRun(); + followupRun.run.config = { + agents: { + defaults: { + compaction: { + notifyUser: true, + }, + }, + }, + }; + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun, + sessionCtx: { + Provider: "whatsapp", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: { onBlockReply, onCompactionEnd }, + typingSignals: createMockTypingSignaler(), + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks: new Set(), + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + expect(result.kind).toBe("success"); + expect(onCompactionEnd).toHaveBeenCalledTimes(1); + // The start notice still fires (no onCompactionStart callback provided), + // but the completion notice is suppressed in favor of the callback. + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply).toHaveBeenCalledWith( + expect.objectContaining({ + text: "🧹 Compacting context...", + isCompactionNotice: true, + }), + ); + }); + + it("emits an incomplete compaction notice when compaction ends without completing", async () => { + const onBlockReply = vi.fn(); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + await params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } }); + await params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", completed: false }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const followupRun = createFollowupRun(); + followupRun.run.config = { + agents: { + defaults: { + compaction: { + notifyUser: true, + }, + }, + }, + }; + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun, + sessionCtx: { + Provider: "whatsapp", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: { onBlockReply }, + typingSignals: createMockTypingSignaler(), + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks: new Set(), + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + expect(result.kind).toBe("success"); + expect(onBlockReply).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + text: "🧹 Compacting context...", + isCompactionNotice: true, + }), + ); + expect(onBlockReply).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + text: "🧹 Compaction incomplete", + isCompactionNotice: true, + }), + ); + }); + it("does not show a rate-limit countdown for mixed-cause fallback exhaustion", async () => { state.runWithModelFallbackMock.mockRejectedValueOnce( Object.assign( diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 7b0eb3acece..2f9f6aa34bf 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -623,6 +623,33 @@ export async function runAgentTurnWithFallback(params: { didNotifyAgentRunStart = true; params.opts?.onAgentRunStart?.(runId); }; + const currentMessageId = params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid; + const shouldNotifyUserAboutCompaction = + runtimeConfig?.agents?.defaults?.compaction?.notifyUser === true; + const sendCompactionNotice = async (phase: "start" | "end" | "incomplete") => { + if (!params.opts?.onBlockReply) { + return; + } + const text = + phase === "start" + ? "🧹 Compacting context..." + : phase === "end" + ? "🧹 Compaction complete" + : "🧹 Compaction incomplete"; + const noticePayload = params.applyReplyToMode({ + text, + replyToId: currentMessageId, + replyToCurrent: true, + isCompactionNotice: true, + }); + try { + await params.opts.onBlockReply(noticePayload); + } catch (err) { + // Non-critical notice delivery failure should not bubble out of the + // fire-and-forget event handler. + logVerbose(`compaction ${phase} notice delivery failed (non-fatal): ${String(err)}`); + } + }; const shouldSurfaceToControlUi = isInternalMessageChannel( params.followupRun.run.messageProvider ?? params.sessionCtx.Surface ?? @@ -1142,37 +1169,27 @@ export async function runAgentTurnWithFallback(params: { if (phase === "start") { // Keep custom compaction callbacks active, but gate the // fallback user-facing notice behind explicit opt-in. - const notifyUser = - runtimeConfig?.agents?.defaults?.compaction?.notifyUser === true; if (params.opts?.onCompactionStart) { await params.opts.onCompactionStart(); - } else if (notifyUser && params.opts?.onBlockReply) { + } else if (shouldNotifyUserAboutCompaction) { // Send directly via opts.onBlockReply (bypassing the // pipeline) so the notice does not cause final payloads // to be discarded on non-streaming model paths. - const currentMessageId = - params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid; - const noticePayload = params.applyReplyToMode({ - text: "🧹 Compacting context...", - replyToId: currentMessageId, - replyToCurrent: true, - isCompactionNotice: true, - }); - try { - await params.opts.onBlockReply(noticePayload); - } catch (err) { - // Non-critical notice delivery failure should not - // bubble out of the fire-and-forget event handler. - logVerbose( - `compaction start notice delivery failed (non-fatal): ${String(err)}`, - ); - } + await sendCompactionNotice("start"); } } - const completed = evt.data?.completed === true; - if (phase === "end" && completed) { - attemptCompactionCount += 1; - await params.opts?.onCompactionEnd?.(); + if (phase === "end") { + const completed = evt.data?.completed === true; + if (completed) { + attemptCompactionCount += 1; + if (params.opts?.onCompactionEnd) { + await params.opts.onCompactionEnd(); + } else if (shouldNotifyUserAboutCompaction) { + await sendCompactionNotice("end"); + } + } else if (shouldNotifyUserAboutCompaction) { + await sendCompactionNotice("incomplete"); + } } } }, diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index a0cf9dfbac6..70d36406a17 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -4590,7 +4590,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { type: "boolean", title: "Compaction Notify User", description: - "When enabled, sends a brief compaction notice to the user (e.g. '🧹 Compacting context...') when compaction starts. Disabled by default to keep compaction silent and non-intrusive.", + "When enabled, sends brief compaction notices to the user when compaction starts and when it completes (for example, '🧹 Compacting context...' and '🧹 Compaction complete'). Disabled by default to keep compaction silent and non-intrusive.", }, }, additionalProperties: false, @@ -25716,7 +25716,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { }, "agents.defaults.compaction.notifyUser": { label: "Compaction Notify User", - help: "When enabled, sends a brief compaction notice to the user (e.g. '🧹 Compacting context...') when compaction starts. Disabled by default to keep compaction silent and non-intrusive.", + help: "When enabled, sends brief compaction notices to the user when compaction starts and when it completes (for example, '🧹 Compacting context...' and '🧹 Compaction complete'). Disabled by default to keep compaction silent and non-intrusive.", tags: ["advanced"], }, "agents.defaults.compaction.memoryFlush": { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 786e204a701..91ae848a02c 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -1216,7 +1216,7 @@ export const FIELD_HELP: Record = { "agents.defaults.compaction.truncateAfterCompaction": "When enabled, rewrites the session JSONL file after compaction to remove entries that were summarized. Prevents unbounded file growth in long-running sessions with many compaction cycles. Default: false.", "agents.defaults.compaction.notifyUser": - "When enabled, sends a brief compaction notice to the user (e.g. '🧹 Compacting context...') when compaction starts. Disabled by default to keep compaction silent and non-intrusive.", + "When enabled, sends brief compaction notices to the user when compaction starts and when it completes (for example, '🧹 Compacting context...' and '🧹 Compaction complete'). Disabled by default to keep compaction silent and non-intrusive.", "agents.defaults.compaction.memoryFlush": "Pre-compaction memory flush settings that run an agentic memory write before heavy compaction. Keep enabled for long sessions so salient context is persisted before aggressive trimming.", "agents.defaults.compaction.memoryFlush.enabled": diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index e0c98d05a67..dff4d97027c 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -450,7 +450,7 @@ export type AgentCompactionConfig = { */ truncateAfterCompaction?: boolean; /** - * Send a "🧹 Compacting context..." notice to the user when compaction starts. + * Send brief compaction notices to the user when compaction starts and completes. * Default: false (silent by default). */ notifyUser?: boolean;