From a9817a5f97bd68680b009f40bb63401859fb00b9 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 4 May 2026 22:00:59 +0100 Subject: [PATCH] fix(gateway): clear reply run before followup drain --- CHANGELOG.md | 1 + .../reply/agent-runner-helpers.test.ts | 20 +------ src/auto-reply/reply/agent-runner-helpers.ts | 10 ---- .../agent-runner.misc.runreplyagent.test.ts | 54 ++++++++++++++++++- src/auto-reply/reply/agent-runner.ts | 52 +++++++++--------- 5 files changed, 82 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f9c67ad290..e88dc969c0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai - Diagnostics: grant the internal diagnostics event bus to official installed diagnostics exporter plugins, so npm-installed `@openclaw/diagnostics-prometheus` can emit metrics without broadening the capability to arbitrary global plugins. Fixes #76628. Thanks @RayWoo. - Browser: enforce strict SSRF current-URL checks before existing-session screenshots, matching existing-session snapshot handling. Thanks @vincentkoc. - Active Memory: give timeout partial transcript recovery enough abort-settle headroom so temporary recall summaries are returned before cleanup. Thanks @vincentkoc. +- Gateway/chat: clear the active reply-run guard before draining queued same-session follow-up turns, so sequential `chat.send` calls no longer trip `ReplyRunAlreadyActiveError` every other request. Fixes #77485. Thanks @bws14email. - Doctor/config: restore legacy group chat config migrations for `routing.allowFrom`, `routing.groupChat.*`, and `channels.telegram.requireMention` so upgrades keep WhatsApp, Telegram, and iMessage group mention gates and history settings instead of leaving configs invalid or silently blocked. Thanks @scoootscooob. - CLI/update: make package-update follow-up processes write completion results and exit explicitly, so Windows packaged upgrades do not hang after the new package finishes post-core plugin work. Thanks @vincentkoc. - Release validation: skip Slack live QA unless Slack credentials are explicitly configured, so release gates can keep proving non-Slack surfaces while Slack is still local and credential-gated. Thanks @vincentkoc. diff --git a/src/auto-reply/reply/agent-runner-helpers.test.ts b/src/auto-reply/reply/agent-runner-helpers.test.ts index 1d7ceaf61ff..d478af66985 100644 --- a/src/auto-reply/reply/agent-runner-helpers.test.ts +++ b/src/auto-reply/reply/agent-runner-helpers.test.ts @@ -4,8 +4,7 @@ import type { TypingSignaler } from "./typing-mode.js"; const hoisted = vi.hoisted(() => { const loadSessionStoreMock = vi.fn(); - const scheduleFollowupDrainMock = vi.fn(); - return { loadSessionStoreMock, scheduleFollowupDrainMock }; + return { loadSessionStoreMock }; }); vi.mock("../../config/sessions.js", async () => { @@ -18,18 +17,9 @@ vi.mock("../../config/sessions.js", async () => { }; }); -vi.mock("./queue.js", async () => { - const actual = await vi.importActual("./queue.js"); - return { - ...actual, - scheduleFollowupDrain: (...args: unknown[]) => hoisted.scheduleFollowupDrainMock(...args), - }; -}); - const { createShouldEmitToolOutput, createShouldEmitToolResult, - finalizeWithFollowup, isAudioPayload, signalTypingIfNeeded, } = await import("./agent-runner-helpers.js"); @@ -38,7 +28,6 @@ describe("agent runner helpers", () => { beforeEach(() => { vi.useRealTimers(); hoisted.loadSessionStoreMock.mockReset(); - hoisted.scheduleFollowupDrainMock.mockReset(); }); it("detects audio payloads from mediaUrl/mediaUrls", () => { @@ -119,13 +108,6 @@ describe("agent runner helpers", () => { expect(fallbackFull()).toBe(true); }); - it("schedules followup drain and returns the original value", () => { - const runFollowupTurn = vi.fn(); - const value = { ok: true }; - expect(finalizeWithFollowup(value, "queue-key", runFollowupTurn)).toBe(value); - expect(hoisted.scheduleFollowupDrainMock).toHaveBeenCalledWith("queue-key", runFollowupTurn); - }); - it("signals typing only when any payload has text or media", async () => { const signalRunStart = vi.fn().mockResolvedValue(undefined); const typingSignals = { signalRunStart } as unknown as TypingSignaler; diff --git a/src/auto-reply/reply/agent-runner-helpers.ts b/src/auto-reply/reply/agent-runner-helpers.ts index 00cd20ce43c..12a35381278 100644 --- a/src/auto-reply/reply/agent-runner-helpers.ts +++ b/src/auto-reply/reply/agent-runner-helpers.ts @@ -6,7 +6,6 @@ import { loadSessionStore } from "../../config/sessions.js"; import { isAudioFileName } from "../../media/mime.js"; import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js"; import type { ReplyPayload } from "../types.js"; -import { scheduleFollowupDrain } from "./queue.js"; import type { TypingSignaler } from "./typing-mode.js"; const hasAudioMedia = (urls?: string[]): boolean => @@ -78,15 +77,6 @@ export const createShouldEmitToolOutput = (params: VerboseGateParams): (() => bo return createVerboseGate(params, (level) => level === "full"); }; -export const finalizeWithFollowup = ( - value: T, - queueKey: string, - runFollowupTurn: Parameters[1], -): T => { - scheduleFollowupDrain(queueKey, runFollowupTurn); - return value; -}; - export const signalTypingIfNeeded = async ( payloads: ReplyPayload[], typingSignals: TypingSignaler, diff --git a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts index 3fce0ad29a7..c1d0ba7cfd6 100644 --- a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts @@ -22,7 +22,8 @@ import { } from "../../plugins/memory-state.js"; import type { TemplateContext } from "../templating.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; -import { __testing as replyRunRegistryTesting } from "./reply-run-registry.js"; +import { scheduleFollowupDrain } from "./queue.js"; +import { __testing as replyRunRegistryTesting, replyRunRegistry } from "./reply-run-registry.js"; import { createMockTypingController } from "./test-helpers.js"; function createCliBackendTestConfig() { @@ -165,6 +166,7 @@ beforeEach(() => { clearSessionQueuesMock.mockReturnValue({ followupCleared: 0, laneCleared: 0, keys: [] }); refreshQueuedFollowupSessionMock.mockReset(); refreshQueuedFollowupSessionMock.mockResolvedValue(undefined); + vi.mocked(scheduleFollowupDrain).mockReset(); loadCronStoreMock.mockClear(); // Default: no cron jobs in store. loadCronStoreMock.mockResolvedValue({ version: 1, jobs: [] }); @@ -326,6 +328,56 @@ describe("runReplyAgent auto-compaction token update", () => { expect(stored[sessionKey].totalTokens).toBe(55_000); }); + it("starts queued followup drain only after clearing the active reply operation", async () => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 50_000, + }; + runEmbeddedPiAgentMock.mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { agentMeta: {} }, + }); + + vi.mocked(scheduleFollowupDrain).mockImplementation((key) => { + expect(key).toBe(sessionKey); + expect(replyRunRegistry.get(sessionKey)).toBeUndefined(); + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath: "", + sessionEntry, + }); + + const result = await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: sessionKey, + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + defaultModel: "anthropic/claude-opus-4-6", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(result).toMatchObject({ text: "ok" }); + expect(scheduleFollowupDrain).toHaveBeenCalledTimes(1); + }); + it("reports live diagnostic context from promptTokens, not provider usage totals", async () => { const { usageEvent } = await runBaseReplyWithAgentMeta({ tmpPrefix: "openclaw-usage-diagnostic-", diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 2485061d593..022c13106b7 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -46,7 +46,6 @@ import { runAgentTurnWithFallback } from "./agent-runner-execution.js"; import { createShouldEmitToolOutput, createShouldEmitToolResult, - finalizeWithFollowup, isAudioPayload, signalTypingIfNeeded, } from "./agent-runner-helpers.js"; @@ -71,6 +70,7 @@ import { enqueueFollowupRun, refreshQueuedFollowupSession, resolvePiSteeringModeForQueueMode, + scheduleFollowupDrain, type FollowupRun, type QueueSettings, } from "./queue.js"; @@ -1064,7 +1064,7 @@ export async function runReplyAgent(params: { // the followup queue idle if the original run already finished. const queuedBehindActiveRun = isRunActive?.() === true; if (!queuedBehindActiveRun) { - finalizeWithFollowup(undefined, queueKey, queuedRunFollowupTurn); + scheduleFollowupDrain(queueKey, queuedRunFollowupTurn); } await touchActiveSessionEntry(); if (queuedBehindActiveRun) { @@ -1148,6 +1148,11 @@ export async function runReplyAgent(params: { throw error; } let runFollowupTurn = queuedRunFollowupTurn; + let shouldDrainFollowupsAfterReplyOperationClears = false; + const returnAfterReplyOperationClearsThenDrainFollowups = (value: T): T => { + shouldDrainFollowupsAfterReplyOperationClears = true; + return value; + }; const prePreflightCompactionCount = activeSessionEntry?.compactionCount ?? 0; let preflightCompactionApplied = false; @@ -1283,7 +1288,7 @@ export async function runReplyAgent(params: { if (!replyOperation.result) { replyOperation.fail("run_failed", new Error("reply operation exited with final payload")); } - return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn); + return returnAfterReplyOperationClearsThenDrainFollowups(runOutcome.payload); } const { @@ -1416,7 +1421,7 @@ export async function runReplyAgent(params: { // Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and // keep the typing indicator stuck. if (payloadArray.length === 0) { - return finalizeWithFollowup(undefined, queueKey, runFollowupTurn); + return returnAfterReplyOperationClearsThenDrainFollowups(undefined); } const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; @@ -1448,7 +1453,7 @@ export async function runReplyAgent(params: { didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip; if (replyPayloads.length === 0) { - return finalizeWithFollowup(undefined, queueKey, runFollowupTurn); + return returnAfterReplyOperationClearsThenDrainFollowups(undefined); } const successfulCronAdds = runResult.successfulCronAdds ?? 0; @@ -1865,10 +1870,8 @@ export async function runReplyAgent(params: { } } - const result = finalizeWithFollowup( + const result = returnAfterReplyOperationClearsThenDrainFollowups( finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads, - queueKey, - runFollowupTurn, ); return result; @@ -1877,38 +1880,37 @@ export async function runReplyAgent(params: { replyOperation.result?.kind === "aborted" && replyOperation.result.code === "aborted_for_restart" ) { - return finalizeWithFollowup( - { text: "⚠️ Gateway is restarting. Please wait a few seconds and try again." }, - queueKey, - runFollowupTurn, - ); + return returnAfterReplyOperationClearsThenDrainFollowups({ + text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", + }); } if (replyOperation.result?.kind === "aborted") { - return finalizeWithFollowup({ text: SILENT_REPLY_TOKEN }, queueKey, runFollowupTurn); + return returnAfterReplyOperationClearsThenDrainFollowups({ text: SILENT_REPLY_TOKEN }); } if (error instanceof GatewayDrainingError) { replyOperation.fail("gateway_draining", error); - return finalizeWithFollowup( - { text: "⚠️ Gateway is restarting. Please wait a few seconds and try again." }, - queueKey, - runFollowupTurn, - ); + return returnAfterReplyOperationClearsThenDrainFollowups({ + text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", + }); } if (error instanceof CommandLaneClearedError) { replyOperation.fail("command_lane_cleared", error); - return finalizeWithFollowup( - { text: "⚠️ Gateway is restarting. Please wait a few seconds and try again." }, - queueKey, - runFollowupTurn, - ); + return returnAfterReplyOperationClearsThenDrainFollowups({ + text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", + }); } replyOperation.fail("run_failed", error); // Keep the followup queue moving even when an unexpected exception escapes // the run path; the caller still receives the original error. - finalizeWithFollowup(undefined, queueKey, runFollowupTurn); + returnAfterReplyOperationClearsThenDrainFollowups(undefined); throw error; } finally { replyOperation.complete(); + if (shouldDrainFollowupsAfterReplyOperationClears) { + // Same-session follow-up turns create their own ReplyOperation; start them + // only after this run clears the active-run guard. + scheduleFollowupDrain(queueKey, runFollowupTurn); + } blockReplyPipeline?.stop(); typing.markRunComplete(); // Safety net: the dispatcher's onIdle callback normally fires