mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-23 07:51:33 +00:00
feat: notify user when context compaction starts and completes (#38805)
Merged via squash.
Prepared head SHA: 0f48c1bbf6
Co-authored-by: zidongdesign <81469543+zidongdesign@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
@@ -58,6 +58,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Telegram/topics: auto-rename DM forum topics on first message with LLM-generated labels, with per-account and per-DM `autoTopicLabel` overrides. (#51502) Thanks @Lukavyi.
|
||||
- Docs/plugins: add the community wecom plugin listing to the docs catalog. (#29905) Thanks @sliverp.
|
||||
- Models/GitHub Copilot: allow forward-compat dynamic model ids without code updates, while preserving configured provider and per-model overrides for those synthetic models. (#51325) Thanks @fuller-stack-dev.
|
||||
- Agents/compaction: notify users when followup auto-compaction starts and finishes, keeping those notices out of TTS and preserving reply threading for the real assistant reply. (#38805) Thanks @zidongdesign.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
@@ -199,6 +199,23 @@ export async function runAgentTurnWithFallback(params: {
|
||||
return text;
|
||||
};
|
||||
const blockReplyPipeline = params.blockReplyPipeline;
|
||||
// Build the delivery handler once so both onAgentEvent (compaction start
|
||||
// notice) and the onBlockReply field share the same instance. This
|
||||
// ensures replyToId threading (replyToMode=all|first) is applied to
|
||||
// compaction notices just like every other block reply.
|
||||
const blockReplyHandler = params.opts?.onBlockReply
|
||||
? createBlockReplyDeliveryHandler({
|
||||
onBlockReply: params.opts.onBlockReply,
|
||||
currentMessageId: params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
|
||||
normalizeStreamingText,
|
||||
applyReplyToMode: params.applyReplyToMode,
|
||||
normalizeMediaPaths: normalizeReplyMediaPaths,
|
||||
typingSignals: params.typingSignals,
|
||||
blockStreamingEnabled: params.blockStreamingEnabled,
|
||||
blockReplyPipeline,
|
||||
directlySentBlockKeys,
|
||||
})
|
||||
: undefined;
|
||||
const onToolResult = params.opts?.onToolResult;
|
||||
const fallbackResult = await runWithModelFallback({
|
||||
...resolveModelFallbackOptions(params.followupRun.run),
|
||||
@@ -394,11 +411,34 @@ export async function runAgentTurnWithFallback(params: {
|
||||
await params.opts?.onToolStart?.({ name, phase });
|
||||
}
|
||||
}
|
||||
// Track auto-compaction completion and notify UI layer.
|
||||
// Track auto-compaction and notify higher layers.
|
||||
if (evt.stream === "compaction") {
|
||||
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
|
||||
if (phase === "start") {
|
||||
await params.opts?.onCompactionStart?.();
|
||||
if (params.opts?.onCompactionStart) {
|
||||
await params.opts.onCompactionStart();
|
||||
} else if (params.opts?.onBlockReply) {
|
||||
// Send directly via opts.onBlockReply (bypassing the
|
||||
// pipeline) so the notice does not cause final payloads
|
||||
// to be discarded on non-streaming model paths.
|
||||
const currentMessageId =
|
||||
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
|
||||
const noticePayload = params.applyReplyToMode({
|
||||
text: "🧹 Compacting context...",
|
||||
replyToId: currentMessageId,
|
||||
replyToCurrent: true,
|
||||
isCompactionNotice: true,
|
||||
});
|
||||
try {
|
||||
await params.opts.onBlockReply(noticePayload);
|
||||
} catch (err) {
|
||||
// Non-critical notice delivery failure should not
|
||||
// bubble out of the fire-and-forget event handler.
|
||||
logVerbose(
|
||||
`compaction start notice delivery failed (non-fatal): ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
const completed = evt.data?.completed === true;
|
||||
if (phase === "end" && completed) {
|
||||
@@ -410,20 +450,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
// Always pass onBlockReply so flushBlockReplyBuffer works before tool execution,
|
||||
// even when regular block streaming is disabled. The handler sends directly
|
||||
// via opts.onBlockReply when the pipeline isn't available.
|
||||
onBlockReply: params.opts?.onBlockReply
|
||||
? createBlockReplyDeliveryHandler({
|
||||
onBlockReply: params.opts.onBlockReply,
|
||||
currentMessageId:
|
||||
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
|
||||
normalizeStreamingText,
|
||||
applyReplyToMode: params.applyReplyToMode,
|
||||
normalizeMediaPaths: normalizeReplyMediaPaths,
|
||||
typingSignals: params.typingSignals,
|
||||
blockStreamingEnabled: params.blockStreamingEnabled,
|
||||
blockReplyPipeline,
|
||||
directlySentBlockKeys,
|
||||
})
|
||||
: undefined,
|
||||
onBlockReply: blockReplyHandler,
|
||||
onBlockReplyFlush:
|
||||
params.blockStreamingEnabled && blockReplyPipeline
|
||||
? async () => {
|
||||
|
||||
@@ -418,6 +418,11 @@ export async function runReplyAgent(params: {
|
||||
await blockReplyPipeline.flush({ force: true });
|
||||
blockReplyPipeline.stop();
|
||||
}
|
||||
|
||||
// NOTE: The compaction completion notice for block-streaming mode is sent
|
||||
// further below — after incrementRunCompactionCount — so it can include
|
||||
// the `(count N)` suffix. Sending it here (before the count is known)
|
||||
// would omit that information.
|
||||
if (pendingToolTasks.size > 0) {
|
||||
await Promise.allSettled(pendingToolTasks);
|
||||
}
|
||||
@@ -697,9 +702,48 @@ export async function runReplyAgent(params: {
|
||||
});
|
||||
}
|
||||
|
||||
if (verboseEnabled) {
|
||||
const suffix = typeof count === "number" ? ` (count ${count})` : "";
|
||||
verboseNotices.push({ text: `🧹 Auto-compaction complete${suffix}.` });
|
||||
// Always notify the user when compaction completes — not just in verbose
|
||||
// mode. The "🧹 Compacting context..." notice was already sent at start,
|
||||
// so the completion message closes the loop for every user regardless of
|
||||
// their verbose setting.
|
||||
const suffix = typeof count === "number" ? ` (count ${count})` : "";
|
||||
const completionText = verboseEnabled
|
||||
? `🧹 Auto-compaction complete${suffix}.`
|
||||
: `✅ Context compacted${suffix}.`;
|
||||
|
||||
if (blockReplyPipeline && opts?.onBlockReply) {
|
||||
// In block-streaming mode, send the completion notice via
|
||||
// fire-and-forget *after* the pipeline has flushed (so it does not set
|
||||
// didStream()=true and cause buildReplyPayloads to discard the real
|
||||
// assistant reply). Now that the count is known we can include it.
|
||||
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
|
||||
const noticePayload = applyReplyToMode({
|
||||
text: completionText,
|
||||
replyToId: currentMessageId,
|
||||
replyToCurrent: true,
|
||||
isCompactionNotice: true,
|
||||
});
|
||||
void Promise.race([
|
||||
opts.onBlockReply(noticePayload),
|
||||
new Promise<void>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("compaction notice timeout")), blockReplyTimeoutMs),
|
||||
),
|
||||
]).catch(() => {
|
||||
// Intentionally swallowed — the notice is informational only.
|
||||
});
|
||||
} else {
|
||||
// Non-streaming: push into verboseNotices with full compaction metadata
|
||||
// so threading exemptions apply and replyToMode=first does not thread
|
||||
// the notice instead of the real assistant reply.
|
||||
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
|
||||
verboseNotices.push(
|
||||
applyReplyToMode({
|
||||
text: completionText,
|
||||
replyToId: currentMessageId,
|
||||
replyToCurrent: true,
|
||||
isCompactionNotice: true,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
if (verboseNotices.length > 0) {
|
||||
|
||||
@@ -582,8 +582,10 @@ export async function dispatchReplyFromConfig(params: {
|
||||
if (shouldSuppressReasoningPayload(payload)) {
|
||||
return;
|
||||
}
|
||||
// Accumulate block text for TTS generation after streaming
|
||||
if (payload.text) {
|
||||
// Accumulate block text for TTS generation after streaming.
|
||||
// Exclude compaction status notices — they are informational UI
|
||||
// signals and must not be synthesised into the spoken reply.
|
||||
if (payload.text && !payload.isCompactionNotice) {
|
||||
if (accumulatedBlockText.length > 0) {
|
||||
accumulatedBlockText += "\n";
|
||||
}
|
||||
|
||||
@@ -70,6 +70,10 @@ function mockCompactionRun(params: {
|
||||
async (args: {
|
||||
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
|
||||
}) => {
|
||||
args.onAgentEvent?.({
|
||||
stream: "compaction",
|
||||
data: { phase: "start" },
|
||||
});
|
||||
args.onAgentEvent?.({
|
||||
stream: "compaction",
|
||||
data: { phase: "end", willRetry: params.willRetry, completed: true },
|
||||
@@ -84,7 +88,7 @@ function createAsyncReplySpy() {
|
||||
}
|
||||
|
||||
describe("createFollowupRunner compaction", () => {
|
||||
it("adds verbose auto-compaction notice and tracks count", async () => {
|
||||
it("adds compaction notices and tracks count in verbose mode", async () => {
|
||||
const storePath = path.join(
|
||||
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-")),
|
||||
"sessions.json",
|
||||
@@ -122,9 +126,15 @@ describe("createFollowupRunner compaction", () => {
|
||||
|
||||
await runner(queued);
|
||||
|
||||
expect(onBlockReply).toHaveBeenCalled();
|
||||
const firstCall = (onBlockReply.mock.calls as unknown as Array<Array<{ text?: string }>>)[0];
|
||||
expect(firstCall?.[0]?.text).toContain("Auto-compaction complete");
|
||||
expect(onBlockReply).toHaveBeenCalledTimes(3);
|
||||
const calls = onBlockReply.mock.calls as unknown as Array<
|
||||
Array<{ text?: string; isCompactionNotice?: boolean }>
|
||||
>;
|
||||
expect(calls[0]?.[0]?.text).toBe("🧹 Compacting context...");
|
||||
expect(calls[0]?.[0]?.isCompactionNotice).toBe(true);
|
||||
expect(calls[1]?.[0]?.text).toContain("Auto-compaction complete");
|
||||
expect(calls[1]?.[0]?.isCompactionNotice).toBe(true);
|
||||
expect(calls[2]?.[0]?.text).toBe("final");
|
||||
expect(sessionStore.main.compactionCount).toBe(1);
|
||||
});
|
||||
|
||||
@@ -171,12 +181,84 @@ describe("createFollowupRunner compaction", () => {
|
||||
|
||||
await runner(queued);
|
||||
|
||||
expect(onBlockReply).toHaveBeenCalled();
|
||||
const firstCall = (onBlockReply.mock.calls as unknown as Array<Array<{ text?: string }>>)[0];
|
||||
expect(firstCall?.[0]?.text).toContain("Auto-compaction complete");
|
||||
expect(onBlockReply).toHaveBeenCalledTimes(2);
|
||||
const calls = onBlockReply.mock.calls as unknown as Array<
|
||||
Array<{ text?: string; isCompactionNotice?: boolean }>
|
||||
>;
|
||||
expect(calls[0]?.[0]?.text).toContain("Auto-compaction complete");
|
||||
expect(calls[0]?.[0]?.isCompactionNotice).toBe(true);
|
||||
expect(calls[1]?.[0]?.text).toBe("final");
|
||||
expect(sessionStore.main.compactionCount).toBe(2);
|
||||
});
|
||||
|
||||
it("threads followup compaction notices without consuming the first reply slot", async () => {
|
||||
const storePath = path.join(
|
||||
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-threading-")),
|
||||
"sessions.json",
|
||||
);
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
const sessionStore: Record<string, SessionEntry> = {
|
||||
main: sessionEntry,
|
||||
};
|
||||
const onBlockReply = vi.fn(async () => {});
|
||||
|
||||
mockCompactionRun({
|
||||
willRetry: true,
|
||||
result: { payloads: [{ text: "final" }], meta: {} },
|
||||
});
|
||||
|
||||
const runner = createFollowupRunner({
|
||||
opts: { onBlockReply },
|
||||
typing: createMockTypingController(),
|
||||
typingMode: "instant",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
storePath,
|
||||
defaultModel: "anthropic/claude-opus-4-5",
|
||||
});
|
||||
|
||||
const queued = createQueuedRun({
|
||||
messageId: "msg-42",
|
||||
run: {
|
||||
messageProvider: "discord",
|
||||
config: {
|
||||
channels: {
|
||||
discord: {
|
||||
replyToMode: "first",
|
||||
},
|
||||
},
|
||||
},
|
||||
verboseLevel: "off",
|
||||
},
|
||||
});
|
||||
|
||||
await runner(queued);
|
||||
|
||||
expect(onBlockReply).toHaveBeenCalledTimes(3);
|
||||
const calls = onBlockReply.mock.calls as unknown as Array<
|
||||
Array<{ text?: string; replyToId?: string; isCompactionNotice?: boolean }>
|
||||
>;
|
||||
expect(calls[0]?.[0]).toMatchObject({
|
||||
text: "🧹 Compacting context...",
|
||||
replyToId: "msg-42",
|
||||
isCompactionNotice: true,
|
||||
});
|
||||
expect(calls[1]?.[0]).toMatchObject({
|
||||
text: "✅ Context compacted (count 1).",
|
||||
replyToId: "msg-42",
|
||||
isCompactionNotice: true,
|
||||
});
|
||||
expect(calls[2]?.[0]).toMatchObject({
|
||||
text: "final",
|
||||
replyToId: "msg-42",
|
||||
});
|
||||
expect(calls[2]?.[0]?.isCompactionNotice).toBeUndefined();
|
||||
});
|
||||
|
||||
it("does not count failed compaction end events in followup runs", async () => {
|
||||
const storePath = path.join(
|
||||
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-failed-")),
|
||||
|
||||
@@ -148,6 +148,43 @@ export function createFollowupRunner(params: {
|
||||
isControlUiVisible: shouldSurfaceToControlUi,
|
||||
});
|
||||
}
|
||||
const replyToChannel = resolveOriginMessageProvider({
|
||||
originatingChannel: queued.originatingChannel,
|
||||
provider: queued.run.messageProvider,
|
||||
}) as OriginatingChannelType | undefined;
|
||||
const replyToMode = resolveReplyToMode(
|
||||
queued.run.config,
|
||||
replyToChannel,
|
||||
queued.originatingAccountId,
|
||||
queued.originatingChatType,
|
||||
);
|
||||
const currentMessageId = queued.messageId?.trim() || undefined;
|
||||
// Runs reply threading over `payloads` using the reply mode, reply channel and
// current message id already resolved for this queued followup run.
const applyFollowupReplyThreading = (payloads: ReplyPayload[]) => {
  const threadingArgs = { payloads, replyToMode, replyToChannel, currentMessageId };
  return applyReplyThreading(threadingArgs);
};
|
||||
/**
 * Sends a compaction status notice for this followup run on a best-effort basis.
 *
 * The notice is tagged with `replyToCurrent` and `isCompactionNotice`, passed
 * through `applyFollowupReplyThreading`, and delivered with
 * `sendFollowupPayloads`. Nothing is sent when threading yields no payloads.
 *
 * The agent-event handler invokes this fire-and-forget
 * (`void sendCompactionNotice(...)`), so the returned promise must never
 * reject. Both the threading step and the delivery therefore run inside the
 * `try`; any failure is logged through `logVerbose` and swallowed.
 *
 * @param text - Notice text to deliver to the user.
 */
const sendCompactionNotice = async (text: string) => {
  try {
    // Threading is inside the try as well: a throw here would otherwise turn
    // into an unhandled rejection because callers discard the promise.
    const noticePayloads = applyFollowupReplyThreading([
      {
        text,
        replyToCurrent: true,
        isCompactionNotice: true,
      },
    ]);
    if (noticePayloads.length === 0) {
      return;
    }
    await sendFollowupPayloads(noticePayloads, queued);
  } catch (err) {
    logVerbose(
      `followup queue: compaction notice delivery failed (non-fatal): ${String(err)}`,
    );
  }
};
|
||||
let autoCompactionCount = 0;
|
||||
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
|
||||
let fallbackProvider = queued.run.provider;
|
||||
@@ -229,6 +266,9 @@ export function createFollowupRunner(params: {
|
||||
return;
|
||||
}
|
||||
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
|
||||
if (phase === "start") {
|
||||
void sendCompactionNotice("🧹 Compacting context...");
|
||||
}
|
||||
const completed = evt.data?.completed === true;
|
||||
if (phase === "end" && completed) {
|
||||
attemptCompactionCount += 1;
|
||||
@@ -284,9 +324,6 @@ export function createFollowupRunner(params: {
|
||||
}
|
||||
|
||||
const payloadArray = runResult.payloads ?? [];
|
||||
if (payloadArray.length === 0) {
|
||||
return;
|
||||
}
|
||||
const sanitizedPayloads = payloadArray.flatMap((payload) => {
|
||||
const text = payload.text;
|
||||
if (!text || !text.includes("HEARTBEAT_OK")) {
|
||||
@@ -299,22 +336,7 @@ export function createFollowupRunner(params: {
|
||||
}
|
||||
return [{ ...payload, text: stripped.text }];
|
||||
});
|
||||
const replyToChannel = resolveOriginMessageProvider({
|
||||
originatingChannel: queued.originatingChannel,
|
||||
provider: queued.run.messageProvider,
|
||||
}) as OriginatingChannelType | undefined;
|
||||
const replyToMode = resolveReplyToMode(
|
||||
queued.run.config,
|
||||
replyToChannel,
|
||||
queued.originatingAccountId,
|
||||
queued.originatingChatType,
|
||||
);
|
||||
|
||||
const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({
|
||||
payloads: sanitizedPayloads,
|
||||
replyToMode,
|
||||
replyToChannel,
|
||||
});
|
||||
const replyTaggedPayloads = applyFollowupReplyThreading(sanitizedPayloads);
|
||||
|
||||
const dedupedPayloads = filterMessagingToolDuplicates({
|
||||
payloads: replyTaggedPayloads,
|
||||
@@ -338,11 +360,7 @@ export function createFollowupRunner(params: {
|
||||
accountId: queued.run.agentAccountId,
|
||||
}),
|
||||
});
|
||||
const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads;
|
||||
|
||||
if (finalPayloads.length === 0) {
|
||||
return;
|
||||
}
|
||||
let finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads;
|
||||
|
||||
if (autoCompactionCount > 0) {
|
||||
const count = await incrementRunCompactionCount({
|
||||
@@ -354,12 +372,25 @@ export function createFollowupRunner(params: {
|
||||
lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
|
||||
contextTokensUsed,
|
||||
});
|
||||
if (queued.run.verboseLevel && queued.run.verboseLevel !== "off") {
|
||||
const suffix = typeof count === "number" ? ` (count ${count})` : "";
|
||||
finalPayloads.unshift({
|
||||
text: `🧹 Auto-compaction complete${suffix}.`,
|
||||
});
|
||||
}
|
||||
const suffix = typeof count === "number" ? ` (count ${count})` : "";
|
||||
const completionText =
|
||||
queued.run.verboseLevel && queued.run.verboseLevel !== "off"
|
||||
? `🧹 Auto-compaction complete${suffix}.`
|
||||
: `✅ Context compacted${suffix}.`;
|
||||
finalPayloads = [
|
||||
...applyFollowupReplyThreading([
|
||||
{
|
||||
text: completionText,
|
||||
replyToCurrent: true,
|
||||
isCompactionNotice: true,
|
||||
},
|
||||
]),
|
||||
...finalPayloads,
|
||||
];
|
||||
}
|
||||
|
||||
if (finalPayloads.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
await sendFollowupPayloads(finalPayloads, queued);
|
||||
|
||||
@@ -33,7 +33,12 @@ export function createReplyToModeFilter(
|
||||
}
|
||||
if (mode === "off") {
|
||||
const isExplicit = Boolean(payload.replyToTag) || Boolean(payload.replyToCurrent);
|
||||
if (opts.allowExplicitReplyTagsWhenOff && isExplicit) {
|
||||
// Compaction notices must never be threaded when replyToMode=off — even
|
||||
// if they carry explicit reply tags (replyToCurrent). Honouring the
|
||||
// explicit tag here would make status notices appear in-thread while
|
||||
// normal assistant replies stay off-thread, contradicting the off-mode
|
||||
// expectation. Strip replyToId unconditionally for compaction payloads.
|
||||
if (opts.allowExplicitReplyTagsWhenOff && isExplicit && !payload.isCompactionNotice) {
|
||||
return payload;
|
||||
}
|
||||
return { ...payload, replyToId: undefined };
|
||||
@@ -42,9 +47,21 @@ export function createReplyToModeFilter(
|
||||
return payload;
|
||||
}
|
||||
if (hasThreaded) {
|
||||
// Compaction notices are transient status messages that should always
|
||||
// appear in-thread, even after the first assistant block has already
|
||||
// consumed the "first" slot. Let them keep their replyToId.
|
||||
if (payload.isCompactionNotice) {
|
||||
return payload;
|
||||
}
|
||||
return { ...payload, replyToId: undefined };
|
||||
}
|
||||
hasThreaded = true;
|
||||
// Compaction notices are transient status messages — they should be
|
||||
// threaded (so they appear in-context), but they must not consume the
|
||||
// "first" slot of the replyToMode=first filter. Skip advancing
|
||||
// hasThreaded so the real assistant reply still gets replyToId.
|
||||
if (!payload.isCompactionNotice) {
|
||||
hasThreaded = true;
|
||||
}
|
||||
return payload;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -91,6 +91,10 @@ export type ReplyPayload = {
|
||||
/** Marks this payload as a reasoning/thinking block. Channels that do not
|
||||
* have a dedicated reasoning lane (e.g. WhatsApp, web) should suppress it. */
|
||||
isReasoning?: boolean;
|
||||
/** Marks this payload as a compaction status notice (start/end).
|
||||
* Should be excluded from TTS transcript accumulation so compaction
|
||||
* status lines are not synthesised into the spoken assistant reply. */
|
||||
isCompactionNotice?: boolean;
|
||||
/** Channel-specific payload data (per-channel envelope). */
|
||||
channelData?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
@@ -825,6 +825,10 @@ export async function maybeApplyTtsToPayload(params: {
|
||||
inboundAudio?: boolean;
|
||||
ttsAuto?: string;
|
||||
}): Promise<ReplyPayload> {
|
||||
// Compaction notices are informational UI signals — never synthesise them as speech.
|
||||
if (params.payload.isCompactionNotice) {
|
||||
return params.payload;
|
||||
}
|
||||
const config = resolveTtsConfig(params.cfg);
|
||||
const prefsPath = resolveTtsPrefsPath(config);
|
||||
const autoMode = resolveTtsAutoMode({
|
||||
|
||||
Reference in New Issue
Block a user