diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index a9698616c8b..dcc317cbaa9 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -1018,6 +1018,39 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.flush).toHaveBeenCalled(); }); + it("renders Telegram progress drafts before slow status reactions resolve", async () => { + const draftStream = createSequencedDraftStream(2001); + createTelegramDraftStream.mockReturnValue(draftStream); + let releaseSetTool: (() => void) | undefined; + const statusReactionController = createStatusReactionController(); + statusReactionController.setTool.mockImplementation( + () => + new Promise((resolve) => { + releaseSetTool = resolve; + }), + ); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { + const pendingToolStart = replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + await Promise.resolve(); + await Promise.resolve(); + const updateBeforeStatusReaction = draftStream.update.mock.calls.at(-1)?.[0]; + releaseSetTool?.(); + await pendingToolStart; + expect(updateBeforeStatusReaction).toMatch(/^Shelling\n`🛠️ Exec`$/); + return { queuedFinal: false }; + }); + + await dispatchWithContext({ + context: createContext({ + statusReactionController: statusReactionController as never, + }), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + }); + + expect(statusReactionController.setTool).toHaveBeenCalledWith("exec"); + }); + it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => { const draftStream = createSequencedDraftStream(2001); createTelegramDraftStream.mockReturnValue(draftStream); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index d537d1f945c..805ca1f332d 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -1207,10 +1207,7 @@ export const dispatchTelegramMessage = async ({ !streamDeliveryEnabled || Boolean(answerLane.stream), onToolStart: async (payload) => { const toolName = payload.name?.trim(); - if (statusReactionController && toolName) { - await statusReactionController.setTool(toolName); - } - await pushStreamToolProgress( + const progressPromise = pushStreamToolProgress( formatChannelProgressDraftLineForEntry( telegramCfg, { @@ -1223,6 +1220,10 @@ export const dispatchTelegramMessage = async ({ ), { toolName, startImmediately: true }, ); + if (statusReactionController && toolName) { + await statusReactionController.setTool(toolName); + } + await progressPromise; }, onItemEvent: async (payload) => { await pushStreamToolProgress( diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index 7ecb22af4d4..72ccfc26518 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -1178,6 +1178,54 @@ describe("runAgentTurnWithFallback", () => { }); }); + it("fires tool-start progress before slow typing signals resolve for best-effort Pi events", async () => { + const onToolStart = vi.fn(async () => {}); + let releaseTyping: (() => void) | undefined; + const typingSignals = createMockTypingSignaler(); + vi.mocked(typingSignals.signalToolStart).mockImplementation( + () => + new Promise((resolve) => { + releaseTyping = resolve; + }), + ); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + void params.onAgentEvent?.({ + stream: "tool", + data: { + name: "exec", + phase: "start", + args: { command: "echo hi" }, + }, + }); + await Promise.resolve(); + await Promise.resolve(); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + ...createMinimalRunAgentTurnParams({ + opts: { + onToolStart, + } satisfies GetReplyOptions, + }), + typingSignals, + }); + + try { + expect(result.kind).toBe("success"); + expect(onToolStart).toHaveBeenCalledWith({ + name: "exec", + phase: "start", + args: { command: "echo hi" }, + detailMode: undefined, + }); + } finally { + releaseTyping?.(); + await Promise.resolve(); + } + }); + it("leaves Codex app-server telemetry publication to the harness", async () => { const agentEvents = await import("../../infra/agent-events.js"); const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent); diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index a962203c19d..53126ec75b1 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -1660,8 +1660,7 @@ export async function runAgentTurnWithFallback(params: { const phase = readStringValue(evt.data.phase) ?? ""; const name = readStringValue(evt.data.name); if (phase === "start" || phase === "update") { - await params.typingSignals.signalToolStart(); - await params.opts?.onToolStart?.({ + const toolStartProgressPromise = params.opts?.onToolStart?.({ name, phase, args: @@ -1670,6 +1669,10 @@ export async function runAgentTurnWithFallback(params: { : undefined, detailMode: params.toolProgressDetail, }); + await Promise.all([ + params.typingSignals.signalToolStart(), + toolStartProgressPromise, + ]); } } if (evt.stream === "item") {