fix(agents): resolve compaction wait before channel flush (#59308)

Merged via squash.

Prepared head SHA: bf17502df8
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Gustavo Madeira Santana
2026-04-01 21:40:23 -04:00
committed by GitHub
parent 326490ab76
commit 32fa5c3be5
3 changed files with 50 additions and 11 deletions

View File

@@ -9,7 +9,10 @@ vi.mock("../infra/agent-events.js", () => ({
function createContext(
lastAssistant: unknown,
overrides?: { onAgentEvent?: (event: unknown) => void },
overrides?: {
onAgentEvent?: (event: unknown) => void;
onBlockReplyFlush?: () => void | Promise<void>;
},
): EmbeddedPiSubscribeContext {
const onBlockReply = vi.fn();
return {
@@ -19,6 +22,7 @@ function createContext(
sessionKey: "agent:main:main",
onAgentEvent: overrides?.onAgentEvent,
onBlockReply,
onBlockReplyFlush: overrides?.onBlockReplyFlush,
},
state: {
lastAssistant: lastAssistant as EmbeddedPiSubscribeContext["state"]["lastAssistant"],
@@ -179,4 +183,45 @@ describe("handleAgentEnd", () => {
expect(ctx.state.pendingToolMediaUrls).toEqual([]);
expect(ctx.state.pendingToolAudioAsVoice).toBe(false);
});
it("resolves compaction wait before awaiting an async block reply flush", async () => {
let resolveFlush: (() => void) | undefined;
const ctx = createContext(undefined);
ctx.flushBlockReplyBuffer = vi
.fn()
.mockImplementationOnce(
() =>
new Promise<void>((resolve) => {
resolveFlush = resolve;
}),
)
.mockImplementation(() => {});
const endPromise = handleAgentEnd(ctx);
expect(ctx.maybeResolveCompactionWait).toHaveBeenCalledTimes(1);
expect(ctx.resolveCompactionRetry).not.toHaveBeenCalled();
resolveFlush?.();
await endPromise;
});
it("resolves compaction wait before awaiting an async channel flush", async () => {
let resolveChannelFlush: (() => void) | undefined;
const onBlockReplyFlush = vi.fn(
() =>
new Promise<void>((resolve) => {
resolveChannelFlush = resolve;
}),
);
const ctx = createContext(undefined, { onBlockReplyFlush });
const endPromise = handleAgentEnd(ctx);
expect(ctx.maybeResolveCompactionWait).toHaveBeenCalledTimes(1);
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
resolveChannelFlush?.();
await endPromise;
});
});

View File

@@ -136,20 +136,13 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
};
const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer();
finalizeAgentEnd();
if (isPromiseLike<void>(flushBlockReplyBufferResult)) {
return flushBlockReplyBufferResult
.then(() => flushPendingMediaAndChannel())
.finally(() => {
finalizeAgentEnd();
});
return flushBlockReplyBufferResult.then(() => flushPendingMediaAndChannel());
}
const flushPendingMediaAndChannelResult = flushPendingMediaAndChannel();
if (isPromiseLike<void>(flushPendingMediaAndChannelResult)) {
return flushPendingMediaAndChannelResult.finally(() => {
finalizeAgentEnd();
});
return flushPendingMediaAndChannelResult;
}
finalizeAgentEnd();
}