From 0459206c40927cecc28f641689b1a27d52077847 Mon Sep 17 00:00:00 2001 From: "clawsweeper[bot]" <274271284+clawsweeper[bot]@users.noreply.github.com> Date: Wed, 29 Apr 2026 22:27:44 -0700 Subject: [PATCH] fix(gateway): preserve rpc abort terminal snapshots Co-authored-by: openclaw-clawsweeper[bot] <280122609+openclaw-clawsweeper[bot]@users.noreply.github.com> --- src/gateway/server-chat.agent-events.test.ts | 21 +++++ src/gateway/server-chat.ts | 2 - .../server-methods/agent-wait-dedupe.test.ts | 65 +++++++++++++++ .../server-methods/agent-wait-dedupe.ts | 9 +- src/gateway/server-methods/agent.test.ts | 82 ++++++++++++++++++- src/gateway/server-methods/agent.ts | 21 +++-- 6 files changed, 185 insertions(+), 15 deletions(-) diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index a2828f255c4..437bd0da421 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -1013,6 +1013,27 @@ describe("agent event handler", () => { resetAgentRunContextForTest(); }); + it("keeps aborted chat run markers through terminal lifecycle cleanup", () => { + const { broadcast, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-aborted", { + sessionKey: "session-aborted", + clientRunId: "client-aborted", + }); + chatRunState.abortedRuns.set("client-aborted", 1_000); + + handler({ + runId: "run-aborted", + seq: 2, + stream: "lifecycle", + ts: 1_500, + data: { phase: "end", aborted: true, stopReason: "rpc" }, + }); + + expect(chatRunState.abortedRuns.has("client-aborted")).toBe(true); + expect(chatRunState.registry.peek("run-aborted")).toBeUndefined(); + expect(chatBroadcastCalls(broadcast)).toHaveLength(0); + }); + it("keeps live session setting metadata at the top level for lifecycle updates", () => { vi.mocked(loadGatewaySessionRow).mockReturnValue({ key: "session-finished", diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 1f31ce8ddf2..dcdefe54eb1 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -341,8 +341,6 @@ export function createAgentEventHandler({ ); } } else { - chatRunState.abortedRuns.delete(clientRunId); - chatRunState.abortedRuns.delete(evt.runId); clearBufferedChatState(clientRunId); if (chatLink) { chatRunState.registry.remove(evt.runId, clientRunId, sessionKey); diff --git a/src/gateway/server-methods/agent-wait-dedupe.test.ts b/src/gateway/server-methods/agent-wait-dedupe.test.ts index ad94f1888cb..0564bc96c80 100644 --- a/src/gateway/server-methods/agent-wait-dedupe.test.ts +++ b/src/gateway/server-methods/agent-wait-dedupe.test.ts @@ -297,6 +297,71 @@ describe("agent wait dedupe helper", () => { }); }); + it("preserves an RPC cancel snapshot when late completion writes the same key", () => { + const dedupe = new Map(); + const runId = "run-cancel-wins"; + + setRunEntry({ + dedupe, + kind: "agent", + runId, + ts: 100, + payload: { runId, status: "timeout", stopReason: "rpc", endedAt: 100 }, + }); + setRunEntry({ + dedupe, + kind: "agent", + runId, + ts: 200, + payload: { runId, status: "ok", endedAt: 200 }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + }), + ).toEqual({ + status: "timeout", + endedAt: 100, + error: undefined, + stopReason: "rpc", + }); + }); + + it("preserves an RPC cancel snapshot when late rejection writes the same chat key", () => { + const dedupe = new Map(); + const runId = "run-cancel-chat-error"; + + setRunEntry({ + dedupe, + kind: "chat", + runId, + ts: 100, + payload: { runId, status: "timeout", stopReason: "rpc", endedAt: 100 }, + }); + setRunEntry({ + dedupe, + kind: "chat", + runId, + ts: 200, + ok: false, + payload: { runId, status: "error", summary: "late failure", endedAt: 200 }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + }), + ).toEqual({ + status: "timeout", + endedAt: 100, + error: undefined, + stopReason: "rpc", + }); + }); + it("resolves multiple waiters for the same run id", async () => { const dedupe = new Map(); const runId = "run-multi"; diff --git a/src/gateway/server-methods/agent-wait-dedupe.ts b/src/gateway/server-methods/agent-wait-dedupe.ts index b5491044b7a..89d16b65fc8 100644 --- a/src/gateway/server-methods/agent-wait-dedupe.ts +++ b/src/gateway/server-methods/agent-wait-dedupe.ts @@ -235,13 +235,18 @@ export function setGatewayDedupeEntry(params: { key: string; entry: DedupeEntry; }) { + const existing = params.dedupe.get(params.key); + const existingSnapshot = existing ? readTerminalSnapshotFromDedupeEntry(existing) : null; + const incomingSnapshot = readTerminalSnapshotFromDedupeEntry(params.entry); + if (existingSnapshot?.status === "timeout" && existingSnapshot.stopReason === "rpc") { + return; + } params.dedupe.set(params.key, params.entry); const runId = parseRunIdFromDedupeKey(params.key); if (!runId) { return; } - const snapshot = readTerminalSnapshotFromDedupeEntry(params.entry); - if (!snapshot) { + if (!incomingSnapshot) { return; } notifyWaiters(runId); diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 72de8ebe1b7..045310c18d7 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -12,6 +12,7 @@ import { resetTaskRegistryForTests, } from "../../tasks/task-registry.js"; import { withTempDir } from "../../test-helpers/temp-dir.js"; +import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js"; import { agentHandlers } from "./agent.js"; import { chatHandlers } from "./chat.js"; import { expectSubagentFollowupReactivation } from "./subagent-followup.test-helpers.js"; @@ -1451,6 +1452,7 @@ describe("gateway agent handler", () => { payloads: [], meta: { durationMs: 100, aborted: true }, }); + const context = makeContext(); await invokeAgent( { @@ -1458,7 +1460,7 @@ describe("gateway agent handler", () => { sessionKey: "agent:main:main", idempotencyKey: "task-registry-agent-run-aborted", }, - { reqId: "task-registry-agent-run-aborted" }, + { context, reqId: "task-registry-agent-run-aborted" }, ); await waitForAssertion(() => { @@ -1468,6 +1470,11 @@ describe("gateway agent handler", () => { status: "timed_out", terminalSummary: "aborted", }); + expect(context.dedupe.get("agent:task-registry-agent-run-aborted")?.payload).toMatchObject({ + runId: "task-registry-agent-run-aborted", + status: "timeout", + summary: "aborted", + }); }); }); }); @@ -1480,6 +1487,7 @@ describe("gateway agent handler", () => { const abortError = new Error("This operation was aborted"); abortError.name = "AbortError"; mocks.agentCommand.mockRejectedValueOnce(abortError); + const context = makeContext(); await invokeAgent( { @@ -1487,7 +1495,7 @@ describe("gateway agent handler", () => { sessionKey: "agent:main:main", idempotencyKey: "task-registry-agent-run-abort-error", }, - { reqId: "task-registry-agent-run-abort-error" }, + { context, reqId: "task-registry-agent-run-abort-error" }, ); await waitForAssertion(() => { @@ -1497,6 +1505,13 @@ describe("gateway agent handler", () => { status: "timed_out", error: "AbortError: This operation was aborted", }); + expect( + context.dedupe.get("agent:task-registry-agent-run-abort-error")?.payload, + ).toMatchObject({ + runId: "task-registry-agent-run-abort-error", + status: "timeout", + summary: "aborted", + }); }); }); }); @@ -2896,6 +2911,69 @@ describe("gateway agent handler chat.abort integration", () => { expect(context.chatAbortControllers.has(runId)).toBe(false); }); + it("keeps the sessions.abort wait snapshot after late agent completion", async () => { + prime(); + let capturedSignal: AbortSignal | undefined; + let resolveRun: + | ((value: { payloads: Array<{ text: string }>; meta: { durationMs: number } }) => void) + | undefined; + mocks.agentCommand.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => { + capturedSignal = opts.abortSignal; + return new Promise((resolve) => { + resolveRun = resolve; + }); + }); + + const context = makeContext(); + const runId = "idem-abort-snapshot-wins"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId }, + ); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main", runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: null, + isWebchatConnect: () => false, + }); + expect(capturedSignal?.aborted).toBe(true); + + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `agent:${runId}`, + entry: { + ts: 100, + ok: true, + payload: { + runId, + status: "timeout", + stopReason: "rpc", + endedAt: 100, + }, + }, + }); + + resolveRun?.({ payloads: [{ text: "late ok" }], meta: { durationMs: 1 } }); + + await waitForAssertion(() => { + expect(context.dedupe.get(`agent:${runId}`)?.payload).toMatchObject({ + runId, + status: "timeout", + stopReason: "rpc", + endedAt: 100, + }); + }); + }); + it("chat.abort without runId aborts the active agent run for the sessionKey", async () => { prime(); let capturedSignal: AbortSignal | undefined; diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 17008eb119f..d3ec30084e0 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -413,8 +413,8 @@ function dispatchAgentRunFromGateway(params: { } void agentCommandFromIngress(params.ingressOpts, defaultRuntime, params.context.deps) .then((result) => { + const aborted = result?.meta?.aborted === true; if (shouldTrackTask) { - const aborted = result?.meta?.aborted === true; tryFinalizeTrackedAgentTask({ runId: params.runId, status: aborted ? "timed_out" : "succeeded", @@ -423,8 +423,9 @@ function dispatchAgentRunFromGateway(params: { } const payload = { runId: params.runId, - status: "ok" as const, - summary: "completed", + status: aborted ? ("timeout" as const) : ("ok" as const), + summary: aborted ? "aborted" : "completed", + ...(aborted ? { stopReason: result?.meta?.stopReason ?? "rpc" } : {}), result, }; setGatewayDedupeEntry({ @@ -441,6 +442,7 @@ function dispatchAgentRunFromGateway(params: { params.respond(true, payload, undefined, { runId: params.runId }); }) .catch((err) => { + const aborted = isAbortError(err); if (shouldTrackTask) { const error = String(err); tryFinalizeTrackedAgentTask({ @@ -453,22 +455,23 @@ function dispatchAgentRunFromGateway(params: { const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); const payload = { runId: params.runId, - status: "error" as const, - summary: String(err), + status: aborted ? ("timeout" as const) : ("error" as const), + summary: aborted ? "aborted" : String(err), + ...(aborted ? { stopReason: "rpc" } : {}), }; setGatewayDedupeEntry({ dedupe: params.context.dedupe, key: `agent:${params.idempotencyKey}`, entry: { ts: Date.now(), - ok: false, + ok: aborted, payload, - error, + ...(aborted ? {} : { error }), }, }); - params.respond(false, payload, error, { + params.respond(aborted, payload, aborted ? undefined : error, { runId: params.runId, - error: formatForLog(err), + ...(aborted ? {} : { error: formatForLog(err) }), }); }) .finally(() => {