fix(gateway): clear reply run before followup drain

This commit is contained in:
Peter Steinberger
2026-05-04 22:00:59 +01:00
parent e2eb8e3cfe
commit a9817a5f97
5 changed files with 82 additions and 55 deletions

View File

@@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai
- Diagnostics: grant the internal diagnostics event bus to official installed diagnostics exporter plugins, so npm-installed `@openclaw/diagnostics-prometheus` can emit metrics without broadening the capability to arbitrary global plugins. Fixes #76628. Thanks @RayWoo.
- Browser: enforce strict SSRF current-URL checks before existing-session screenshots, matching existing-session snapshot handling. Thanks @vincentkoc.
- Active Memory: give timeout partial transcript recovery enough abort-settle headroom so temporary recall summaries are returned before cleanup. Thanks @vincentkoc.
- Gateway/chat: clear the active reply-run guard before draining queued same-session follow-up turns, so sequential `chat.send` calls no longer trip `ReplyRunAlreadyActiveError` on every other request. Fixes #77485. Thanks @bws14email.
- Doctor/config: restore legacy group chat config migrations for `routing.allowFrom`, `routing.groupChat.*`, and `channels.telegram.requireMention` so upgrades keep WhatsApp, Telegram, and iMessage group mention gates and history settings instead of leaving configs invalid or silently blocked. Thanks @scoootscooob.
- CLI/update: make package-update follow-up processes write completion results and exit explicitly, so Windows packaged upgrades do not hang after the new package finishes post-core plugin work. Thanks @vincentkoc.
- Release validation: skip Slack live QA unless Slack credentials are explicitly configured, so release gates can keep proving non-Slack surfaces while Slack is still local and credential-gated. Thanks @vincentkoc.

View File

@@ -4,8 +4,7 @@ import type { TypingSignaler } from "./typing-mode.js";
const hoisted = vi.hoisted(() => {
const loadSessionStoreMock = vi.fn();
const scheduleFollowupDrainMock = vi.fn();
return { loadSessionStoreMock, scheduleFollowupDrainMock };
return { loadSessionStoreMock };
});
vi.mock("../../config/sessions.js", async () => {
@@ -18,18 +17,9 @@ vi.mock("../../config/sessions.js", async () => {
};
});
vi.mock("./queue.js", async () => {
const actual = await vi.importActual<typeof import("./queue.js")>("./queue.js");
return {
...actual,
scheduleFollowupDrain: (...args: unknown[]) => hoisted.scheduleFollowupDrainMock(...args),
};
});
const {
createShouldEmitToolOutput,
createShouldEmitToolResult,
finalizeWithFollowup,
isAudioPayload,
signalTypingIfNeeded,
} = await import("./agent-runner-helpers.js");
@@ -38,7 +28,6 @@ describe("agent runner helpers", () => {
beforeEach(() => {
vi.useRealTimers();
hoisted.loadSessionStoreMock.mockReset();
hoisted.scheduleFollowupDrainMock.mockReset();
});
it("detects audio payloads from mediaUrl/mediaUrls", () => {
@@ -119,13 +108,6 @@ describe("agent runner helpers", () => {
expect(fallbackFull()).toBe(true);
});
// Verifies finalizeWithFollowup's two effects: it forwards the queue key and the
// follow-up callback to the (mocked) scheduleFollowupDrain, and it returns its
// first argument as-is.
it("schedules followup drain and returns the original value", () => {
const runFollowupTurn = vi.fn();
const value = { ok: true };
// toBe compares by identity, so the helper must return the same object, not a copy.
expect(finalizeWithFollowup(value, "queue-key", runFollowupTurn)).toBe(value);
expect(hoisted.scheduleFollowupDrainMock).toHaveBeenCalledWith("queue-key", runFollowupTurn);
});
it("signals typing only when any payload has text or media", async () => {
const signalRunStart = vi.fn().mockResolvedValue(undefined);
const typingSignals = { signalRunStart } as unknown as TypingSignaler;

View File

@@ -6,7 +6,6 @@ import { loadSessionStore } from "../../config/sessions.js";
import { isAudioFileName } from "../../media/mime.js";
import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js";
import type { ReplyPayload } from "../types.js";
import { scheduleFollowupDrain } from "./queue.js";
import type { TypingSignaler } from "./typing-mode.js";
const hasAudioMedia = (urls?: string[]): boolean =>
@@ -78,15 +77,6 @@ export const createShouldEmitToolOutput = (params: VerboseGateParams): (() => bo
return createVerboseGate(params, (level) => level === "full");
};
/**
 * Schedules a follow-up drain for `queueKey` and passes `value` through untouched,
 * so a call site can schedule the drain and return its result in one expression.
 *
 * @param value - Result to hand back to the caller unchanged.
 * @param queueKey - Key forwarded to `scheduleFollowupDrain` as its first argument.
 * @param runFollowupTurn - Callback forwarded to `scheduleFollowupDrain` as its second argument.
 * @returns The same `value` that was passed in.
 */
export function finalizeWithFollowup<T>(
  value: T,
  queueKey: string,
  runFollowupTurn: Parameters<typeof scheduleFollowupDrain>[1],
): T {
  scheduleFollowupDrain(queueKey, runFollowupTurn);
  return value;
}
export const signalTypingIfNeeded = async (
payloads: ReplyPayload[],
typingSignals: TypingSignaler,

View File

@@ -22,7 +22,8 @@ import {
} from "../../plugins/memory-state.js";
import type { TemplateContext } from "../templating.js";
import type { FollowupRun, QueueSettings } from "./queue.js";
import { __testing as replyRunRegistryTesting } from "./reply-run-registry.js";
import { scheduleFollowupDrain } from "./queue.js";
import { __testing as replyRunRegistryTesting, replyRunRegistry } from "./reply-run-registry.js";
import { createMockTypingController } from "./test-helpers.js";
function createCliBackendTestConfig() {
@@ -165,6 +166,7 @@ beforeEach(() => {
clearSessionQueuesMock.mockReturnValue({ followupCleared: 0, laneCleared: 0, keys: [] });
refreshQueuedFollowupSessionMock.mockReset();
refreshQueuedFollowupSessionMock.mockResolvedValue(undefined);
vi.mocked(scheduleFollowupDrain).mockReset();
loadCronStoreMock.mockClear();
// Default: no cron jobs in store.
loadCronStoreMock.mockResolvedValue({ version: 1, jobs: [] });
@@ -326,6 +328,56 @@ describe("runReplyAgent auto-compaction token update", () => {
expect(stored[sessionKey].totalTokens).toBe(55_000);
});
// Regression test for the ordering between the reply-run registry and the
// follow-up drain: by the time scheduleFollowupDrain is invoked for a session,
// replyRunRegistry must no longer hold an entry for that session key.
it("starts queued followup drain only after clearing the active reply operation", async () => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 50_000,
};
// The agent run resolves with a single text payload.
runEmbeddedPiAgentMock.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: { agentMeta: {} },
});
// The ordering check lives inside the mock so it runs at the exact moment the
// drain is scheduled, not after runReplyAgent has already returned.
vi.mocked(scheduleFollowupDrain).mockImplementation((key) => {
expect(key).toBe(sessionKey);
expect(replyRunRegistry.get(sessionKey)).toBeUndefined();
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath: "",
sessionEntry,
});
const result = await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: sessionKey,
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
defaultModel: "anthropic/claude-opus-4-6",
agentCfgContextTokens: 200_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(result).toMatchObject({ text: "ok" });
// Exactly one drain is expected; this also proves the mock (and its assertions) ran.
expect(scheduleFollowupDrain).toHaveBeenCalledTimes(1);
});
it("reports live diagnostic context from promptTokens, not provider usage totals", async () => {
const { usageEvent } = await runBaseReplyWithAgentMeta({
tmpPrefix: "openclaw-usage-diagnostic-",

View File

@@ -46,7 +46,6 @@ import { runAgentTurnWithFallback } from "./agent-runner-execution.js";
import {
createShouldEmitToolOutput,
createShouldEmitToolResult,
finalizeWithFollowup,
isAudioPayload,
signalTypingIfNeeded,
} from "./agent-runner-helpers.js";
@@ -71,6 +70,7 @@ import {
enqueueFollowupRun,
refreshQueuedFollowupSession,
resolvePiSteeringModeForQueueMode,
scheduleFollowupDrain,
type FollowupRun,
type QueueSettings,
} from "./queue.js";
@@ -1064,7 +1064,7 @@ export async function runReplyAgent(params: {
// the followup queue idle if the original run already finished.
const queuedBehindActiveRun = isRunActive?.() === true;
if (!queuedBehindActiveRun) {
finalizeWithFollowup(undefined, queueKey, queuedRunFollowupTurn);
scheduleFollowupDrain(queueKey, queuedRunFollowupTurn);
}
await touchActiveSessionEntry();
if (queuedBehindActiveRun) {
@@ -1148,6 +1148,11 @@ export async function runReplyAgent(params: {
throw error;
}
let runFollowupTurn = queuedRunFollowupTurn;
// Deferred-drain flag: exit paths set it through the helper below instead of
// scheduling the follow-up drain themselves. This function's `finally` block
// reads it and calls scheduleFollowupDrain only after replyOperation.complete().
let shouldDrainFollowupsAfterReplyOperationClears = false;
// Marks that a follow-up drain is wanted and passes `value` through unchanged,
// so it can wrap a `return` expression.
const returnAfterReplyOperationClearsThenDrainFollowups = <T>(value: T): T => {
shouldDrainFollowupsAfterReplyOperationClears = true;
return value;
};
const prePreflightCompactionCount = activeSessionEntry?.compactionCount ?? 0;
let preflightCompactionApplied = false;
@@ -1283,7 +1288,7 @@ export async function runReplyAgent(params: {
if (!replyOperation.result) {
replyOperation.fail("run_failed", new Error("reply operation exited with final payload"));
}
return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn);
return returnAfterReplyOperationClearsThenDrainFollowups(runOutcome.payload);
}
const {
@@ -1416,7 +1421,7 @@ export async function runReplyAgent(params: {
// Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
// keep the typing indicator stuck.
if (payloadArray.length === 0) {
return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
return returnAfterReplyOperationClearsThenDrainFollowups(undefined);
}
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
@@ -1448,7 +1453,7 @@ export async function runReplyAgent(params: {
didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip;
if (replyPayloads.length === 0) {
return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
return returnAfterReplyOperationClearsThenDrainFollowups(undefined);
}
const successfulCronAdds = runResult.successfulCronAdds ?? 0;
@@ -1865,10 +1870,8 @@ export async function runReplyAgent(params: {
}
}
const result = finalizeWithFollowup(
const result = returnAfterReplyOperationClearsThenDrainFollowups(
finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
queueKey,
runFollowupTurn,
);
return result;
@@ -1877,38 +1880,37 @@ export async function runReplyAgent(params: {
replyOperation.result?.kind === "aborted" &&
replyOperation.result.code === "aborted_for_restart"
) {
return finalizeWithFollowup(
{ text: "⚠️ Gateway is restarting. Please wait a few seconds and try again." },
queueKey,
runFollowupTurn,
);
return returnAfterReplyOperationClearsThenDrainFollowups({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
}
if (replyOperation.result?.kind === "aborted") {
return finalizeWithFollowup({ text: SILENT_REPLY_TOKEN }, queueKey, runFollowupTurn);
return returnAfterReplyOperationClearsThenDrainFollowups({ text: SILENT_REPLY_TOKEN });
}
if (error instanceof GatewayDrainingError) {
replyOperation.fail("gateway_draining", error);
return finalizeWithFollowup(
{ text: "⚠️ Gateway is restarting. Please wait a few seconds and try again." },
queueKey,
runFollowupTurn,
);
return returnAfterReplyOperationClearsThenDrainFollowups({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
}
if (error instanceof CommandLaneClearedError) {
replyOperation.fail("command_lane_cleared", error);
return finalizeWithFollowup(
{ text: "⚠️ Gateway is restarting. Please wait a few seconds and try again." },
queueKey,
runFollowupTurn,
);
return returnAfterReplyOperationClearsThenDrainFollowups({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
}
replyOperation.fail("run_failed", error);
// Keep the followup queue moving even when an unexpected exception escapes
// the run path; the caller still receives the original error.
finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
returnAfterReplyOperationClearsThenDrainFollowups(undefined);
throw error;
} finally {
replyOperation.complete();
if (shouldDrainFollowupsAfterReplyOperationClears) {
// Same-session follow-up turns create their own ReplyOperation; start them
// only after this run clears the active-run guard.
scheduleFollowupDrain(queueKey, runFollowupTurn);
}
blockReplyPipeline?.stop();
typing.markRunComplete();
// Safety net: the dispatcher's onIdle callback normally fires