diff --git a/extensions/discord/src/monitor/message-handler.draft-preview.ts b/extensions/discord/src/monitor/message-handler.draft-preview.ts index 3e6f19ecfec..c1ab9b2672e 100644 --- a/extensions/discord/src/monitor/message-handler.draft-preview.ts +++ b/extensions/discord/src/monitor/message-handler.draft-preview.ts @@ -256,12 +256,14 @@ export function createDiscordDraftPreviewController(params: { ); } const alreadyStarted = progressDraftGate.hasStarted; + let progressActive = false; if (shouldStartDiscordProgressDraftNow(line)) { await progressDraftGate.startNow(); + progressActive = progressDraftGate.hasStarted; } else { - await progressDraftGate.noteWork(); + progressActive = await progressDraftGate.noteWork(); } - if (alreadyStarted && progressDraftGate.hasStarted) { + if ((alreadyStarted || progressActive) && progressDraftGate.hasStarted) { await renderProgressDraft(); } }, @@ -294,9 +296,8 @@ export function createDiscordDraftPreviewController(params: { } lastReasoningProgressLine = normalized; } - const alreadyStarted = progressDraftGate.hasStarted; - await progressDraftGate.noteWork(); - if (alreadyStarted && progressDraftGate.hasStarted) { + const progressActive = await progressDraftGate.noteWork(); + if (progressActive && progressDraftGate.hasStarted) { await renderProgressDraft(); } }, diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index ebeddc69c76..26992f01e35 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1650,8 +1650,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam ); } const alreadyStarted = progressDraftGate.hasStarted; - await progressDraftGate.noteWork(); - if (alreadyStarted && progressDraftGate.hasStarted) { + const progressActive = await progressDraftGate.noteWork(); + if ((alreadyStarted || progressActive) && progressDraftGate.hasStarted) { renderProgressDraft(); } }; diff --git a/extensions/msteams/src/reply-stream-controller.ts b/extensions/msteams/src/reply-stream-controller.ts index 36c879e9459..27c886ddb91 100644 --- a/extensions/msteams/src/reply-stream-controller.ts +++ b/extensions/msteams/src/reply-stream-controller.ts @@ -215,10 +215,10 @@ export function createTeamsReplyStreamController(params: { return; } const hadStarted = progressDraftGate.hasStarted; - await progressDraftGate.noteWork(); + const progressActive = await progressDraftGate.noteWork(); // If the gate was already started, the call above is a no-op — refresh // the informative line manually so the latest progress lines render. - if (hadStarted && progressDraftGate.hasStarted) { + if ((hadStarted || progressActive) && progressDraftGate.hasStarted) { renderInformativeUpdate(); } }, @@ -252,8 +252,8 @@ export function createTeamsReplyStreamController(params: { } } const hadStarted = progressDraftGate.hasStarted; - await progressDraftGate.noteWork(); - if (hadStarted && progressDraftGate.hasStarted) { + const progressActive = await progressDraftGate.noteWork(); + if ((hadStarted || progressActive) && progressDraftGate.hasStarted) { renderInformativeUpdate(); } }, diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index f01b95fb6dc..3a492973648 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -1492,8 +1492,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag return; } const alreadyStarted = progressDraftGate.hasStarted; - await progressDraftGate.noteWork(); - if (alreadyStarted && progressDraftGate.hasStarted) { + const progressActive = await progressDraftGate.noteWork(); + if ((alreadyStarted || progressActive) && progressDraftGate.hasStarted) { await refreshStartedProgressDraft(); } return; @@ -1533,12 +1533,15 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag await updateNativeProgressStream(); } else { await progressDraftGate.startNow(); + if (progressDraftGate.hasStarted) { + await updateNativeProgressStream(); + } } return; } const alreadyStarted = progressDraftGate.hasStarted; - await progressDraftGate.noteWork(); - if (alreadyStarted && progressDraftGate.hasStarted) { + const progressActive = await progressDraftGate.noteWork(); + if ((alreadyStarted || progressActive) && progressDraftGate.hasStarted) { await refreshStartedProgressDraft(); } }; diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 568fa449820..7aef61810a2 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -1046,17 +1046,16 @@ export const dispatchTelegramMessage = async ({ } streamToolProgressLines = nextLines; if (options?.startImmediately) { - const alreadyStarted = progressDraftGate.hasStarted; await progressDraftGate.startNow(); - if (alreadyStarted && progressDraftGate.hasStarted) { + if (progressDraftGate.hasStarted) { await renderProgressDraft(); return true; } return progressDraftGate.hasStarted; } const alreadyStarted = progressDraftGate.hasStarted; - await progressDraftGate.noteWork(); - if (alreadyStarted && progressDraftGate.hasStarted) { + const progressActive = await progressDraftGate.noteWork(); + if ((alreadyStarted || progressActive) && progressDraftGate.hasStarted) { await renderProgressDraft(); return true; } diff --git a/src/channels/streaming.ts b/src/channels/streaming.ts index 72baeddbbff..ce266928891 100644 --- a/src/channels/streaming.ts +++ b/src/channels/streaming.ts @@ -539,9 +539,29 @@ export function createChannelProgressDraftGate(params: { if (disposed || started) { return startPromise ?? Promise.resolve(); } - started = true; + if (startPromise) { + return startPromise; + } clearTimer(); - startPromise = Promise.resolve().then(params.onStart); + started = true; + const nextStart = Promise.resolve() + .then(params.onStart) + .then(() => { + if (disposed) { + started = false; + } + if (startPromise === nextStart) { + startPromise = undefined; + } + }) + .catch((error: unknown) => { + if (startPromise === nextStart) { + startPromise = undefined; + } + started = false; + throw error; + }); + startPromise = nextStart; return startPromise; }; @@ -567,12 +587,16 @@ export function createChannelProgressDraftGate(params: { return false; } workEvents += 1; + if (startPromise) { + await startPromise; + return started; + } if (started) { return true; } if (workEvents > 1) { await start(); - return true; + return started; } schedule(); return false; @@ -582,6 +606,7 @@ export function createChannelProgressDraftGate(params: { }, cancel(): void { disposed = true; + started = false; clearTimer(); }, }; diff --git a/src/plugin-sdk/channel-streaming.test.ts b/src/plugin-sdk/channel-streaming.test.ts index 1539876accd..58bed9b6341 100644 --- a/src/plugin-sdk/channel-streaming.test.ts +++ b/src/plugin-sdk/channel-streaming.test.ts @@ -590,6 +590,103 @@ describe("channel-streaming", () => { expect(onStart).toHaveBeenCalledTimes(1); }); + it("does not report started when delayed progress startup rejects", async () => { + vi.useFakeTimers(); + const onStart = vi + .fn<() => Promise>() + .mockRejectedValueOnce(new Error("draft unavailable")) + .mockResolvedValueOnce(undefined); + const gate = createChannelProgressDraftGate({ onStart }); + + await expect(gate.noteWork()).resolves.toBe(false); + await vi.advanceTimersByTimeAsync(5_000); + + expect(onStart).toHaveBeenCalledTimes(1); + expect(gate.hasStarted).toBe(false); + + await expect(gate.noteWork()).resolves.toBe(true); + + expect(onStart).toHaveBeenCalledTimes(2); + expect(gate.hasStarted).toBe(true); + }); + + it("keeps concurrent progress startup single-flight until onStart resolves", async () => { + vi.useFakeTimers(); + let resolveStart: (() => void) | undefined; + const onStart = vi.fn( + () => + new Promise((resolve) => { + resolveStart = resolve; + }), + ); + const gate = createChannelProgressDraftGate({ onStart }); + + await gate.noteWork(); + const firstStart = gate.noteWork(); + const secondStart = gate.startNow(); + await Promise.resolve(); + + expect(onStart).toHaveBeenCalledTimes(1); + expect(gate.hasStarted).toBe(true); + + resolveStart?.(); + await expect(firstStart).resolves.toBe(true); + await expect(secondStart).resolves.toBeUndefined(); + + expect(onStart).toHaveBeenCalledTimes(1); + expect(gate.hasStarted).toBe(true); + }); + + it("does not report active when cancel wins the startup race", async () => { + vi.useFakeTimers(); + let resolveStart: (() => void) | undefined; + const onStart = vi.fn( + () => + new Promise((resolve) => { + resolveStart = resolve; + }), + ); + const gate = createChannelProgressDraftGate({ onStart }); + + await gate.noteWork(); + const startResult = gate.noteWork(); + await Promise.resolve(); + + expect(onStart).toHaveBeenCalledTimes(1); + gate.cancel(); + + resolveStart?.(); + + await expect(startResult).resolves.toBe(false); + expect(gate.hasStarted).toBe(false); + }); + + it("joins explicit startup before applying the first-work delay", async () => { + vi.useFakeTimers(); + let resolveStart: (() => void) | undefined; + const onStart = vi.fn( + () => + new Promise((resolve) => { + resolveStart = resolve; + }), + ); + const gate = createChannelProgressDraftGate({ onStart }); + + const explicitStart = gate.startNow(); + await Promise.resolve(); + const workDuringStart = gate.noteWork(); + + expect(onStart).toHaveBeenCalledTimes(1); + expect(gate.hasStarted).toBe(true); + + resolveStart?.(); + + await expect(explicitStart).resolves.toBeUndefined(); + await expect(workDuringStart).resolves.toBe(true); + expect(onStart).toHaveBeenCalledTimes(1); + expect(gate.hasStarted).toBe(true); + }); + it("ignores message-like tools for progress draft work", () => { expect(isChannelProgressDraftWorkToolName("message")).toBe(false); expect(isChannelProgressDraftWorkToolName("react")).toBe(false);