fix(channels): keep typing indicators off reply critical path

This commit is contained in:
Ayaan Zaidi
2026-05-01 07:34:30 +05:30
parent 40b0b1bfe0
commit 45b8645079
3 changed files with 50 additions and 10 deletions

View File

@@ -46,6 +46,7 @@ export function createTypingController(params: {
let active = false;
let runComplete = false;
let dispatchIdle = false;
let triggerInFlight = false;
// Important: callbacks (tool/block streaming) can fire late (after the run completed),
// especially when upstream event emitters don't await async listeners.
// Once we stop typing, we "seal" the controller so late events can't restart typing forever.
@@ -120,9 +121,24 @@ export function createTypingController(params: {
});
const triggerTyping = async () => {
await startGuard.run(async () => {
await onReplyStart?.();
});
if (triggerInFlight) {
return;
}
triggerInFlight = true;
try {
await startGuard.run(async () => {
await onReplyStart?.();
});
} catch (err) {
log?.(`typing start failed: ${String(err)}`);
} finally {
triggerInFlight = false;
}
};
const scheduleTyping = async () => {
void triggerTyping();
await Promise.resolve();
};
const typingLoop = createTypingKeepaliveLoop({
@@ -145,7 +161,7 @@ export function createTypingController(params: {
return;
}
started = true;
await triggerTyping();
await scheduleTyping();
};
const maybeStopOnIdle = () => {

View File

@@ -67,10 +67,28 @@ describe("createTypingCallbacks", () => {
});
await callbacks.onReplyStart();
await flushMicrotasks();
expect(onStartError).toHaveBeenCalledTimes(1);
});
it("does not block reply start on a pending typing request", async () => {
let resolveStart!: () => void;
const { start, callbacks } = createTypingHarness({
start: vi.fn(
() =>
new Promise<void>((resolve) => {
resolveStart = resolve;
}),
),
});
await callbacks.onReplyStart();
expect(start).toHaveBeenCalledTimes(1);
resolveStart();
});
it("invokes stop on idle and reports stop errors", async () => {
const { stop, onStopError, callbacks } = createTypingHarness({
stop: vi.fn().mockRejectedValue(new Error("stop")),
@@ -113,6 +131,7 @@ describe("createTypingCallbacks", () => {
start: vi.fn().mockRejectedValue(new Error("gone")),
});
await callbacks.onReplyStart();
await flushMicrotasks();
expect(start).toHaveBeenCalledTimes(1);
expect(onStartError).toHaveBeenCalledTimes(1);
@@ -133,6 +152,7 @@ describe("createTypingCallbacks", () => {
});
await callbacks.onReplyStart();
await flushMicrotasks();
expect(start).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(9_000);
@@ -154,6 +174,7 @@ describe("createTypingCallbacks", () => {
maxConsecutiveFailures: 2,
});
await callbacks.onReplyStart(); // fail
await flushMicrotasks();
await vi.advanceTimersByTimeAsync(3_000); // success
await vi.advanceTimersByTimeAsync(3_000); // fail
await vi.advanceTimersByTimeAsync(3_000); // success

View File

@@ -76,12 +76,15 @@ export function createTypingCallbacks(params: CreateTypingCallbacksParams): Typi
startGuard.reset();
keepaliveLoop.stop();
clearTtlTimer();
await fireStart();
if (startGuard.isTripped()) {
return;
}
keepaliveLoop.start();
startTtlTimer(); // Start TTL safety timer
const startPromise = fireStart();
void startPromise.then(() => {
if (closed || startGuard.isTripped()) {
return;
}
keepaliveLoop.start();
startTtlTimer();
});
await Promise.resolve();
};
const fireStop = () => {