diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index c855e21282d..0274293bab8 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -272,6 +272,7 @@ function createMockReplyOperation(): { attachBackend: vi.fn(), detachBackend: vi.fn(), complete: vi.fn(), + completeThen: vi.fn((afterClear: () => void) => afterClear()), fail: failMock, abortByUser: vi.fn(), abortForRestart: vi.fn(), diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 022c13106b7..eddefddae0a 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -1148,11 +1148,14 @@ export async function runReplyAgent(params: { throw error; } let runFollowupTurn = queuedRunFollowupTurn; - let shouldDrainFollowupsAfterReplyOperationClears = false; - const returnAfterReplyOperationClearsThenDrainFollowups = (value: T): T => { - shouldDrainFollowupsAfterReplyOperationClears = true; + let shouldDrainQueuedFollowupsAfterClear = false; + const returnWithQueuedFollowupDrain = (value: T): T => { + shouldDrainQueuedFollowupsAfterClear = true; return value; }; + const drainQueuedFollowupsAfterClear = () => { + scheduleFollowupDrain(queueKey, runFollowupTurn); + }; const prePreflightCompactionCount = activeSessionEntry?.compactionCount ?? 0; let preflightCompactionApplied = false; @@ -1288,7 +1291,7 @@ export async function runReplyAgent(params: { if (!replyOperation.result) { replyOperation.fail("run_failed", new Error("reply operation exited with final payload")); } - return returnAfterReplyOperationClearsThenDrainFollowups(runOutcome.payload); + return returnWithQueuedFollowupDrain(runOutcome.payload); } const { @@ -1421,7 +1424,7 @@ export async function runReplyAgent(params: { // Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and // keep the typing indicator stuck. if (payloadArray.length === 0) { - return returnAfterReplyOperationClearsThenDrainFollowups(undefined); + return returnWithQueuedFollowupDrain(undefined); } const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; @@ -1453,7 +1456,7 @@ export async function runReplyAgent(params: { didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip; if (replyPayloads.length === 0) { - return returnAfterReplyOperationClearsThenDrainFollowups(undefined); + return returnWithQueuedFollowupDrain(undefined); } const successfulCronAdds = runResult.successfulCronAdds ?? 0; @@ -1870,7 +1873,7 @@ export async function runReplyAgent(params: { } } - const result = returnAfterReplyOperationClearsThenDrainFollowups( + const result = returnWithQueuedFollowupDrain( finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads, ); @@ -1880,36 +1883,35 @@ export async function runReplyAgent(params: { replyOperation.result?.kind === "aborted" && replyOperation.result.code === "aborted_for_restart" ) { - return returnAfterReplyOperationClearsThenDrainFollowups({ + return returnWithQueuedFollowupDrain({ text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", }); } if (replyOperation.result?.kind === "aborted") { - return returnAfterReplyOperationClearsThenDrainFollowups({ text: SILENT_REPLY_TOKEN }); + return returnWithQueuedFollowupDrain({ text: SILENT_REPLY_TOKEN }); } if (error instanceof GatewayDrainingError) { replyOperation.fail("gateway_draining", error); - return returnAfterReplyOperationClearsThenDrainFollowups({ + return returnWithQueuedFollowupDrain({ text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", }); } if (error instanceof CommandLaneClearedError) { replyOperation.fail("command_lane_cleared", error); - return returnAfterReplyOperationClearsThenDrainFollowups({ + return returnWithQueuedFollowupDrain({ text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", }); } replyOperation.fail("run_failed", error); // Keep the followup queue moving even when an unexpected exception escapes // the run path; the caller still receives the original error. - returnAfterReplyOperationClearsThenDrainFollowups(undefined); + returnWithQueuedFollowupDrain(undefined); throw error; } finally { - replyOperation.complete(); - if (shouldDrainFollowupsAfterReplyOperationClears) { - // Same-session follow-up turns create their own ReplyOperation; start them - // only after this run clears the active-run guard. - scheduleFollowupDrain(queueKey, runFollowupTurn); + if (shouldDrainQueuedFollowupsAfterClear) { + replyOperation.completeThen(drainQueuedFollowupsAfterClear); + } else { + replyOperation.complete(); } blockReplyPipeline?.stop(); typing.markRunComplete(); diff --git a/src/auto-reply/reply/reply-run-registry.test.ts b/src/auto-reply/reply/reply-run-registry.test.ts index 848cb368f21..59f89c96b67 100644 --- a/src/auto-reply/reply/reply-run-registry.test.ts +++ b/src/auto-reply/reply/reply-run-registry.test.ts @@ -66,6 +66,23 @@ describe("reply run registry", () => { expect(replyRunRegistry.isActive("agent:main:main")).toBe(false); }); + it("runs completeThen callbacks after active state clears", () => { + const operation = createReplyOperation({ + sessionKey: "agent:main:main", + sessionId: "session-complete", + resetTriggered: false, + }); + const afterClear = vi.fn(() => { + expect(replyRunRegistry.isActive("agent:main:main")).toBe(false); + expect(isReplyRunActiveForSessionId("session-complete")).toBe(false); + }); + + operation.completeThen(afterClear); + + expect(operation.result).toEqual({ kind: "completed" }); + expect(afterClear).toHaveBeenCalledTimes(1); + }); + it("force-clears a running operation after abort without backend cleanup", async () => { vi.useFakeTimers(); try { diff --git a/src/auto-reply/reply/reply-run-registry.ts b/src/auto-reply/reply/reply-run-registry.ts index f0c0127c872..4a37f4b7025 100644 --- a/src/auto-reply/reply/reply-run-registry.ts +++ b/src/auto-reply/reply/reply-run-registry.ts @@ -54,6 +54,11 @@ export type ReplyOperation = { attachBackend(handle: ReplyBackendHandle): void; detachBackend(handle: ReplyBackendHandle): void; complete(): void; + /** + * Complete the operation, clear active-run state, then run follow-up work. + * Use when the follow-up can create another ReplyOperation for this session. + */ + completeThen(afterClear: () => void): void; fail(code: Exclude, cause?: unknown): void; abortByUser(): void; abortForRestart(): void; @@ -332,6 +337,10 @@ export function createReplyOperation(params: { } clearState(); }, + completeThen(afterClear) { + operation.complete(); + afterClear(); + }, fail(code, cause) { if (!result) { result = { kind: "failed", code, cause }; diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index eab2c1eda32..35af73e73c0 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -439,6 +439,109 @@ describe("gateway server chat", () => { } }); + test("chat.send starts the next WebChat turn after the prior internal run finishes", async () => { + const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); + try { + testState.sessionStorePath = path.join(sessionDir, "sessions.json"); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + }); + + const responses: Array<{ id: string; ok: boolean; payload?: unknown; error?: unknown }> = []; + const context = { + loadGatewayModelCatalog: vi.fn(), + logGateway: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, + agentRunSeq: new Map(), + chatAbortControllers: new Map(), + chatAbortedRuns: new Map(), + chatRunBuffers: new Map(), + chatDeltaSentAt: new Map(), + chatDeltaLastBroadcastLen: new Map(), + addChatRun: vi.fn(), + removeChatRun: vi.fn(), + broadcast: vi.fn(), + nodeSendToSession: vi.fn(), + registerToolEventRecipient: vi.fn(), + dedupe: new Map(), + } as unknown as GatewayRequestContext; + dispatchInboundMessageMock.mockResolvedValue(undefined); + + const { chatHandlers } = await import("./server-methods/chat.js"); + const callSend = (id: string, message: string, idempotencyKey: string) => + chatHandlers["chat.send"]({ + req: { + type: "req", + id, + method: "chat.send", + params: { + sessionKey: "main", + message, + idempotencyKey, + }, + }, + params: { + sessionKey: "main", + message, + idempotencyKey, + }, + client: { + connect: { + client: { + id: GATEWAY_CLIENT_NAMES.CONTROL_UI, + mode: GATEWAY_CLIENT_MODES.WEBCHAT, + }, + scopes: ["operator.write"], + }, + } as never, + isWebchatConnect: () => true, + respond: ((ok, payload, error) => { + responses.push({ id, ok, payload, error }); + }) as RespondFn, + context, + }); + + await callSend("first", "first message", "idem-sequential-a"); + await vi.waitFor(() => { + expect(context.removeChatRun).toHaveBeenCalledTimes(1); + }, FAST_WAIT_OPTS); + + await callSend("second", "second message", "idem-sequential-b"); + await vi.waitFor(() => { + expect(context.removeChatRun).toHaveBeenCalledTimes(2); + }, FAST_WAIT_OPTS); + + expect(responses).toContainEqual({ + id: "first", + ok: true, + payload: { runId: "idem-sequential-a", status: "started" }, + error: undefined, + }); + expect(responses).toContainEqual({ + id: "second", + ok: true, + payload: { runId: "idem-sequential-b", status: "started" }, + error: undefined, + }); + expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2); + expect(context.addChatRun).toHaveBeenCalledTimes(2); + } finally { + dispatchInboundMessageMock.mockReset(); + testState.sessionStorePath = undefined; + clearConfigCache(); + await fs.rm(sessionDir, { recursive: true, force: true }); + } + }); + test("chat.history backfills claude-cli sessions from Claude project files", async () => { await withGatewayChatHarness(async ({ ws, createSessionDir }) => { await connectOk(ws);