refactor: centralize reply followup drain lifecycle

This commit is contained in:
Peter Steinberger
2026-05-04 22:25:12 +01:00
parent 86385f72e9
commit 0909df1a4f
5 changed files with 149 additions and 17 deletions

View File

@@ -272,6 +272,7 @@ function createMockReplyOperation(): {
attachBackend: vi.fn(),
detachBackend: vi.fn(),
complete: vi.fn(),
completeThen: vi.fn((afterClear: () => void) => afterClear()),
fail: failMock,
abortByUser: vi.fn(),
abortForRestart: vi.fn(),

View File

@@ -1148,11 +1148,14 @@ export async function runReplyAgent(params: {
throw error;
}
let runFollowupTurn = queuedRunFollowupTurn;
let shouldDrainFollowupsAfterReplyOperationClears = false;
const returnAfterReplyOperationClearsThenDrainFollowups = <T>(value: T): T => {
shouldDrainFollowupsAfterReplyOperationClears = true;
let shouldDrainQueuedFollowupsAfterClear = false;
const returnWithQueuedFollowupDrain = <T>(value: T): T => {
shouldDrainQueuedFollowupsAfterClear = true;
return value;
};
// Schedules the queued follow-up drain for this session's queue key.
// Same-session follow-up turns create their own ReplyOperation, so this callback
// is intended to run only after the current operation has cleared its active-run
// state (it is handed to replyOperation.completeThen in the finally block).
const drainQueuedFollowupsAfterClear = () => {
scheduleFollowupDrain(queueKey, runFollowupTurn);
};
const prePreflightCompactionCount = activeSessionEntry?.compactionCount ?? 0;
let preflightCompactionApplied = false;
@@ -1288,7 +1291,7 @@ export async function runReplyAgent(params: {
if (!replyOperation.result) {
replyOperation.fail("run_failed", new Error("reply operation exited with final payload"));
}
return returnAfterReplyOperationClearsThenDrainFollowups(runOutcome.payload);
return returnWithQueuedFollowupDrain(runOutcome.payload);
}
const {
@@ -1421,7 +1424,7 @@ export async function runReplyAgent(params: {
// Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
// keep the typing indicator stuck.
if (payloadArray.length === 0) {
return returnAfterReplyOperationClearsThenDrainFollowups(undefined);
return returnWithQueuedFollowupDrain(undefined);
}
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
@@ -1453,7 +1456,7 @@ export async function runReplyAgent(params: {
didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip;
if (replyPayloads.length === 0) {
return returnAfterReplyOperationClearsThenDrainFollowups(undefined);
return returnWithQueuedFollowupDrain(undefined);
}
const successfulCronAdds = runResult.successfulCronAdds ?? 0;
@@ -1870,7 +1873,7 @@ export async function runReplyAgent(params: {
}
}
const result = returnAfterReplyOperationClearsThenDrainFollowups(
const result = returnWithQueuedFollowupDrain(
finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
);
@@ -1880,36 +1883,35 @@ export async function runReplyAgent(params: {
replyOperation.result?.kind === "aborted" &&
replyOperation.result.code === "aborted_for_restart"
) {
return returnAfterReplyOperationClearsThenDrainFollowups({
return returnWithQueuedFollowupDrain({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
}
if (replyOperation.result?.kind === "aborted") {
return returnAfterReplyOperationClearsThenDrainFollowups({ text: SILENT_REPLY_TOKEN });
return returnWithQueuedFollowupDrain({ text: SILENT_REPLY_TOKEN });
}
if (error instanceof GatewayDrainingError) {
replyOperation.fail("gateway_draining", error);
return returnAfterReplyOperationClearsThenDrainFollowups({
return returnWithQueuedFollowupDrain({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
}
if (error instanceof CommandLaneClearedError) {
replyOperation.fail("command_lane_cleared", error);
return returnAfterReplyOperationClearsThenDrainFollowups({
return returnWithQueuedFollowupDrain({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
}
replyOperation.fail("run_failed", error);
// Keep the followup queue moving even when an unexpected exception escapes
// the run path; the caller still receives the original error.
returnAfterReplyOperationClearsThenDrainFollowups(undefined);
returnWithQueuedFollowupDrain(undefined);
throw error;
} finally {
replyOperation.complete();
if (shouldDrainFollowupsAfterReplyOperationClears) {
// Same-session follow-up turns create their own ReplyOperation; start them
// only after this run clears the active-run guard.
scheduleFollowupDrain(queueKey, runFollowupTurn);
if (shouldDrainQueuedFollowupsAfterClear) {
replyOperation.completeThen(drainQueuedFollowupsAfterClear);
} else {
replyOperation.complete();
}
blockReplyPipeline?.stop();
typing.markRunComplete();

View File

@@ -66,6 +66,23 @@ describe("reply run registry", () => {
expect(replyRunRegistry.isActive("agent:main:main")).toBe(false);
});
// Pins the ordering contract of completeThen: when the callback runs, neither the
// session key nor the session id is reported as active any more.
it("runs completeThen callbacks after active state clears", () => {
const operation = createReplyOperation({
sessionKey: "agent:main:main",
sessionId: "session-complete",
resetTriggered: false,
});
// The assertions live inside the callback so they observe state at callback time,
// not after completeThen has returned.
const afterClear = vi.fn(() => {
expect(replyRunRegistry.isActive("agent:main:main")).toBe(false);
expect(isReplyRunActiveForSessionId("session-complete")).toBe(false);
});
operation.completeThen(afterClear);
// The operation must end up completed, and the callback must have fired exactly once.
expect(operation.result).toEqual({ kind: "completed" });
expect(afterClear).toHaveBeenCalledTimes(1);
});
it("force-clears a running operation after abort without backend cleanup", async () => {
vi.useFakeTimers();
try {

View File

@@ -54,6 +54,11 @@ export type ReplyOperation = {
attachBackend(handle: ReplyBackendHandle): void;
detachBackend(handle: ReplyBackendHandle): void;
complete(): void;
/**
* Complete the operation, clear active-run state, then run follow-up work.
* Use when the follow-up can create another ReplyOperation for this session.
*/
completeThen(afterClear: () => void): void;
fail(code: Exclude<ReplyOperationFailureCode, "aborted_by_user">, cause?: unknown): void;
abortByUser(): void;
abortForRestart(): void;
@@ -332,6 +337,10 @@ export function createReplyOperation(params: {
}
clearState();
},
/**
 * Completes this operation, then synchronously invokes `afterClear`.
 *
 * @param afterClear - Follow-up work to run once `complete()` has returned.
 */
completeThen(afterClear) {
// Order matters: complete first so the callback may start a new ReplyOperation
// for the same session (see the completeThen contract on the ReplyOperation type).
operation.complete();
afterClear();
},
fail(code, cause) {
if (!result) {
result = { kind: "failed", code, cause };

View File

@@ -439,6 +439,109 @@ describe("gateway server chat", () => {
}
});
// Sends two chat.send requests back-to-back on the same session key and checks that
// the second one is accepted ("started") once the first run has been removed.
test("chat.send starts the next WebChat turn after the prior internal run finishes", async () => {
// Isolated temp directory for the session store; removed in the finally block.
const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-"));
try {
testState.sessionStorePath = path.join(sessionDir, "sessions.json");
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
},
});
// Collects every respond() call, tagged with the request id that produced it.
const responses: Array<{ id: string; ok: boolean; payload?: unknown; error?: unknown }> = [];
// Hand-built request context: spies for the hooks asserted on below, empty maps
// for the bookkeeping state. Cast because only part of the context is provided.
const context = {
loadGatewayModelCatalog: vi.fn<GatewayRequestContext["loadGatewayModelCatalog"]>(),
logGateway: {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
},
agentRunSeq: new Map<string, number>(),
chatAbortControllers: new Map(),
chatAbortedRuns: new Map(),
chatRunBuffers: new Map(),
chatDeltaSentAt: new Map(),
chatDeltaLastBroadcastLen: new Map(),
addChatRun: vi.fn(),
removeChatRun: vi.fn(),
broadcast: vi.fn(),
nodeSendToSession: vi.fn(),
registerToolEventRecipient: vi.fn(),
dedupe: new Map(),
} as unknown as GatewayRequestContext;
// The dispatch mock resolves with undefined, so each run finishes without a payload.
dispatchInboundMessageMock.mockResolvedValue(undefined);
const { chatHandlers } = await import("./server-methods/chat.js");
// Invokes the chat.send handler directly as a WebChat client on session "main".
const callSend = (id: string, message: string, idempotencyKey: string) =>
chatHandlers["chat.send"]({
req: {
type: "req",
id,
method: "chat.send",
params: {
sessionKey: "main",
message,
idempotencyKey,
},
},
params: {
sessionKey: "main",
message,
idempotencyKey,
},
client: {
connect: {
client: {
id: GATEWAY_CLIENT_NAMES.CONTROL_UI,
mode: GATEWAY_CLIENT_MODES.WEBCHAT,
},
scopes: ["operator.write"],
},
} as never,
isWebchatConnect: () => true,
respond: ((ok, payload, error) => {
responses.push({ id, ok, payload, error });
}) as RespondFn,
context,
});
// First turn: wait until its chat run has been removed before sending the next one.
await callSend("first", "first message", "idem-sequential-a");
await vi.waitFor(() => {
expect(context.removeChatRun).toHaveBeenCalledTimes(1);
}, FAST_WAIT_OPTS);
// Second turn on the same session; it must also run through to removal.
await callSend("second", "second message", "idem-sequential-b");
await vi.waitFor(() => {
expect(context.removeChatRun).toHaveBeenCalledTimes(2);
}, FAST_WAIT_OPTS);
// Both requests were acknowledged as started, with the idempotency key as run id.
expect(responses).toContainEqual({
id: "first",
ok: true,
payload: { runId: "idem-sequential-a", status: "started" },
error: undefined,
});
expect(responses).toContainEqual({
id: "second",
ok: true,
payload: { runId: "idem-sequential-b", status: "started" },
error: undefined,
});
// Each send dispatched exactly once and registered exactly one chat run.
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2);
expect(context.addChatRun).toHaveBeenCalledTimes(2);
} finally {
// Restore shared test state so later tests are not affected by this one.
dispatchInboundMessageMock.mockReset();
testState.sessionStorePath = undefined;
clearConfigCache();
await fs.rm(sessionDir, { recursive: true, force: true });
}
});
test("chat.history backfills claude-cli sessions from Claude project files", async () => {
await withGatewayChatHarness(async ({ ws, createSessionDir }) => {
await connectOk(ws);