From a35dcf608e8ec82d76cfc7759be27cc07f4bcf66 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 23 Mar 2026 09:45:21 -0700 Subject: [PATCH] fix(reply): refresh followup drain callbacks --- src/auto-reply/reply/agent-runner.ts | 14 ++++++++- src/auto-reply/reply/queue/drain.ts | 9 +++++- src/auto-reply/reply/queue/enqueue.ts | 6 +++- src/auto-reply/reply/reply-flow.test.ts | 39 +++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 3 deletions(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 659045a3136..ecd1ff31207 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -215,13 +215,25 @@ export async function runReplyAgent(params: { queueMode: resolvedQueue.mode, }); + const queuedRunFollowupTurn = createFollowupRunner({ + opts, + typing, + typingMode, + sessionEntry: activeSessionEntry, + sessionStore: activeSessionStore, + sessionKey, + storePath, + defaultModel, + agentCfgContextTokens, + }); + if (activeRunQueueAction === "drop") { typing.cleanup(); return undefined; } if (activeRunQueueAction === "enqueue-followup") { - enqueueFollowupRun(queueKey, followupRun, resolvedQueue); + enqueueFollowupRun(queueKey, followupRun, resolvedQueue, "message-id", queuedRunFollowupTurn); await touchActiveSessionEntry(); typing.cleanup(); return undefined; diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 1e2fb33e4e0..a178723ba0f 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -22,6 +22,13 @@ const FOLLOWUP_RUN_CALLBACKS = resolveGlobalMap Pr FOLLOWUP_DRAIN_CALLBACKS_KEY, ); +export function rememberFollowupDrainCallback( + key: string, + runFollowup: (run: FollowupRun) => Promise, +): void { + FOLLOWUP_RUN_CALLBACKS.set(key, runFollowup); +} + export function clearFollowupDrainCallback(key: string): void { FOLLOWUP_RUN_CALLBACKS.delete(key); } @@ -78,7 +85,7 @@ export function scheduleFollowupDrain( } // Cache callback only when a drain actually starts. Avoid keeping stale // callbacks around from finalize calls where no queue work is pending. - FOLLOWUP_RUN_CALLBACKS.set(key, runFollowup); + rememberFollowupDrainCallback(key, runFollowup); void (async () => { try { const collectState = { forceIndividualCollect: false }; diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index ed37c1d2904..fca25b8f79d 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,6 +1,6 @@ import { resolveGlobalDedupeCache } from "../../../infra/dedupe.js"; import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; -import { kickFollowupDrainIfIdle } from "./drain.js"; +import { kickFollowupDrainIfIdle, rememberFollowupDrainCallback } from "./drain.js"; import { getExistingFollowupQueue, getFollowupQueue } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; @@ -59,6 +59,7 @@ export function enqueueFollowupRun( run: FollowupRun, settings: QueueSettings, dedupeMode: QueueDedupeMode = "message-id", + runFollowup?: (run: FollowupRun) => Promise, ): boolean { const queue = getFollowupQueue(key, settings); const recentMessageIdKey = dedupeMode !== "none" ? buildRecentMessageIdKey(run, key) : undefined; @@ -92,6 +93,9 @@ export function enqueueFollowupRun( if (recentMessageIdKey) { RECENT_QUEUE_MESSAGE_IDS.check(recentMessageIdKey); } + if (runFollowup) { + rememberFollowupDrainCallback(key, runFollowup); + } // If drain finished and deleted the queue before this item arrived, a new queue // object was created (draining: false) but nobody scheduled a drain for it. // Use the cached callback to restart the drain now. diff --git a/src/auto-reply/reply/reply-flow.test.ts b/src/auto-reply/reply/reply-flow.test.ts index 21a22faf8b2..41e639264d7 100644 --- a/src/auto-reply/reply/reply-flow.test.ts +++ b/src/auto-reply/reply/reply-flow.test.ts @@ -1496,6 +1496,45 @@ describe("followup queue drain restart after idle window", () => { expect(calls[1]?.prompt).toBe("after-idle"); }); + it("restarts an idle drain with the newest followup callback", async () => { + const key = `test-idle-window-fresh-callback-${Date.now()}`; + const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 }; + const staleCalls: FollowupRun[] = []; + const freshCalls: FollowupRun[] = []; + const firstProcessed = createDeferred(); + const secondProcessed = createDeferred(); + + const staleFollowup = async (run: FollowupRun) => { + staleCalls.push(run); + if (staleCalls.length === 1) { + firstProcessed.resolve(); + } + }; + const freshFollowup = async (run: FollowupRun) => { + freshCalls.push(run); + secondProcessed.resolve(); + }; + + enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings); + scheduleFollowupDrain(key, staleFollowup); + await firstProcessed.promise; + await new Promise((resolve) => setImmediate(resolve)); + + enqueueFollowupRun( + key, + createRun({ prompt: "after-idle" }), + settings, + "message-id", + freshFollowup, + ); + await secondProcessed.promise; + + expect(staleCalls).toHaveLength(1); + expect(staleCalls[0]?.prompt).toBe("before-idle"); + expect(freshCalls).toHaveLength(1); + expect(freshCalls[0]?.prompt).toBe("after-idle"); + }); + it("restarts an idle drain across distinct enqueue and drain module instances", async () => { const drainA = await importFreshModule( import.meta.url,