feat: send compaction start and completion notices (#67830)

Merged via squash.

Prepared head SHA: abedf6cf11
Co-authored-by: feniix <91633+feniix@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
Sebastian B Otaegui
2026-04-20 15:55:17 -03:00
committed by GitHub
parent 1603577dfd
commit f48d040bf5
9 changed files with 248 additions and 37 deletions

View File

@@ -863,6 +863,199 @@ describe("runAgentTurnWithFallback", () => {
);
});
it("emits a compaction completion notice when notifyUser is enabled", async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } });
await params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", completed: true },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const followupRun = createFollowupRun();
followupRun.run.config = {
agents: {
defaults: {
compaction: {
notifyUser: true,
},
},
},
};
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun,
sessionCtx: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: { onBlockReply },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(result.kind).toBe("success");
expect(onBlockReply).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
text: "🧹 Compacting context...",
replyToId: "msg",
replyToCurrent: true,
isCompactionNotice: true,
}),
);
expect(onBlockReply).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
text: "🧹 Compaction complete",
replyToId: "msg",
replyToCurrent: true,
isCompactionNotice: true,
}),
);
});
it("prefers onCompactionEnd callback over default notice when notifyUser is enabled", async () => {
const onBlockReply = vi.fn();
const onCompactionEnd = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } });
await params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", completed: true },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const followupRun = createFollowupRun();
followupRun.run.config = {
agents: {
defaults: {
compaction: {
notifyUser: true,
},
},
},
};
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun,
sessionCtx: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: { onBlockReply, onCompactionEnd },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(result.kind).toBe("success");
expect(onCompactionEnd).toHaveBeenCalledTimes(1);
// The start notice still fires (no onCompactionStart callback provided),
// but the completion notice is suppressed in favor of the callback.
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply).toHaveBeenCalledWith(
expect.objectContaining({
text: "🧹 Compacting context...",
isCompactionNotice: true,
}),
);
});
it("emits an incomplete compaction notice when compaction ends without completing", async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } });
await params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", completed: false },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const followupRun = createFollowupRun();
followupRun.run.config = {
agents: {
defaults: {
compaction: {
notifyUser: true,
},
},
},
};
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun,
sessionCtx: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: { onBlockReply },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(result.kind).toBe("success");
expect(onBlockReply).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
text: "🧹 Compacting context...",
isCompactionNotice: true,
}),
);
expect(onBlockReply).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
text: "🧹 Compaction incomplete",
isCompactionNotice: true,
}),
);
});
it("does not show a rate-limit countdown for mixed-cause fallback exhaustion", async () => {
state.runWithModelFallbackMock.mockRejectedValueOnce(
Object.assign(

View File

@@ -623,6 +623,33 @@ export async function runAgentTurnWithFallback(params: {
didNotifyAgentRunStart = true;
params.opts?.onAgentRunStart?.(runId);
};
const currentMessageId = params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
const shouldNotifyUserAboutCompaction =
runtimeConfig?.agents?.defaults?.compaction?.notifyUser === true;
const sendCompactionNotice = async (phase: "start" | "end" | "incomplete") => {
if (!params.opts?.onBlockReply) {
return;
}
const text =
phase === "start"
? "🧹 Compacting context..."
: phase === "end"
? "🧹 Compaction complete"
: "🧹 Compaction incomplete";
const noticePayload = params.applyReplyToMode({
text,
replyToId: currentMessageId,
replyToCurrent: true,
isCompactionNotice: true,
});
try {
await params.opts.onBlockReply(noticePayload);
} catch (err) {
// Non-critical notice delivery failure should not bubble out of the
// fire-and-forget event handler.
logVerbose(`compaction ${phase} notice delivery failed (non-fatal): ${String(err)}`);
}
};
const shouldSurfaceToControlUi = isInternalMessageChannel(
params.followupRun.run.messageProvider ??
params.sessionCtx.Surface ??
@@ -1142,37 +1169,27 @@ export async function runAgentTurnWithFallback(params: {
if (phase === "start") {
// Keep custom compaction callbacks active, but gate the
// fallback user-facing notice behind explicit opt-in.
const notifyUser =
runtimeConfig?.agents?.defaults?.compaction?.notifyUser === true;
if (params.opts?.onCompactionStart) {
await params.opts.onCompactionStart();
} else if (notifyUser && params.opts?.onBlockReply) {
} else if (shouldNotifyUserAboutCompaction) {
// Send directly via opts.onBlockReply (bypassing the
// pipeline) so the notice does not cause final payloads
// to be discarded on non-streaming model paths.
const currentMessageId =
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
const noticePayload = params.applyReplyToMode({
text: "🧹 Compacting context...",
replyToId: currentMessageId,
replyToCurrent: true,
isCompactionNotice: true,
});
try {
await params.opts.onBlockReply(noticePayload);
} catch (err) {
// Non-critical notice delivery failure should not
// bubble out of the fire-and-forget event handler.
logVerbose(
`compaction start notice delivery failed (non-fatal): ${String(err)}`,
);
}
await sendCompactionNotice("start");
}
}
const completed = evt.data?.completed === true;
if (phase === "end" && completed) {
attemptCompactionCount += 1;
await params.opts?.onCompactionEnd?.();
if (phase === "end") {
const completed = evt.data?.completed === true;
if (completed) {
attemptCompactionCount += 1;
if (params.opts?.onCompactionEnd) {
await params.opts.onCompactionEnd();
} else if (shouldNotifyUserAboutCompaction) {
await sendCompactionNotice("end");
}
} else if (shouldNotifyUserAboutCompaction) {
await sendCompactionNotice("incomplete");
}
}
}
},