Fix Telegram Pi tool progress timing

This commit is contained in:
Kelaw - Keshav's Agent
2026-05-05 23:56:04 +05:30
committed by Peter Steinberger
parent e483115131
commit 14044edf28
4 changed files with 91 additions and 6 deletions

View File

@@ -1018,6 +1018,39 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.flush).toHaveBeenCalled();
});
it("renders Telegram progress drafts before slow status reactions resolve", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
let releaseSetTool: (() => void) | undefined;
const statusReactionController = createStatusReactionController();
statusReactionController.setTool.mockImplementation(
() =>
new Promise<void>((resolve) => {
releaseSetTool = resolve;
}),
);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
const pendingToolStart = replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await Promise.resolve();
await Promise.resolve();
const updateBeforeStatusReaction = draftStream.update.mock.calls.at(-1)?.[0];
releaseSetTool?.();
await pendingToolStart;
expect(updateBeforeStatusReaction).toMatch(/^Shelling\n`🛠️ Exec`$/);
return { queuedFinal: false };
});
await dispatchWithContext({
context: createContext({
statusReactionController: statusReactionController as never,
}),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(statusReactionController.setTool).toHaveBeenCalledWith("exec");
});
it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);

View File

@@ -1207,10 +1207,7 @@ export const dispatchTelegramMessage = async ({
!streamDeliveryEnabled || Boolean(answerLane.stream),
onToolStart: async (payload) => {
const toolName = payload.name?.trim();
if (statusReactionController && toolName) {
await statusReactionController.setTool(toolName);
}
await pushStreamToolProgress(
const progressPromise = pushStreamToolProgress(
formatChannelProgressDraftLineForEntry(
telegramCfg,
{
@@ -1223,6 +1220,10 @@ export const dispatchTelegramMessage = async ({
),
{ toolName, startImmediately: true },
);
if (statusReactionController && toolName) {
await statusReactionController.setTool(toolName);
}
await progressPromise;
},
onItemEvent: async (payload) => {
await pushStreamToolProgress(

View File

@@ -1178,6 +1178,54 @@ describe("runAgentTurnWithFallback", () => {
});
});
it("fires tool-start progress before slow typing signals resolve for best-effort Pi events", async () => {
const onToolStart = vi.fn(async () => {});
let releaseTyping: (() => void) | undefined;
const typingSignals = createMockTypingSignaler();
vi.mocked(typingSignals.signalToolStart).mockImplementation(
() =>
new Promise<void>((resolve) => {
releaseTyping = resolve;
}),
);
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
void params.onAgentEvent?.({
stream: "tool",
data: {
name: "exec",
phase: "start",
args: { command: "echo hi" },
},
});
await Promise.resolve();
await Promise.resolve();
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
...createMinimalRunAgentTurnParams({
opts: {
onToolStart,
} satisfies GetReplyOptions,
}),
typingSignals,
});
try {
expect(result.kind).toBe("success");
expect(onToolStart).toHaveBeenCalledWith({
name: "exec",
phase: "start",
args: { command: "echo hi" },
detailMode: undefined,
});
} finally {
releaseTyping?.();
await Promise.resolve();
}
});
it("leaves Codex app-server telemetry publication to the harness", async () => {
const agentEvents = await import("../../infra/agent-events.js");
const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent);

View File

@@ -1660,8 +1660,7 @@ export async function runAgentTurnWithFallback(params: {
const phase = readStringValue(evt.data.phase) ?? "";
const name = readStringValue(evt.data.name);
if (phase === "start" || phase === "update") {
await params.typingSignals.signalToolStart();
await params.opts?.onToolStart?.({
const toolStartProgressPromise = params.opts?.onToolStart?.({
name,
phase,
args:
@@ -1670,6 +1669,10 @@ export async function runAgentTurnWithFallback(params: {
: undefined,
detailMode: params.toolProgressDetail,
});
await Promise.all([
params.typingSignals.signalToolStart(),
toolStartProgressPromise,
]);
}
}
if (evt.stream === "item") {