fix(gateway): preserve rpc abort terminal snapshots

Co-authored-by: openclaw-clawsweeper[bot] <280122609+openclaw-clawsweeper[bot]@users.noreply.github.com>
This commit is contained in:
clawsweeper[bot]
2026-04-29 22:27:44 -07:00
committed by GitHub
parent a34ba362c6
commit 0459206c40
6 changed files with 185 additions and 15 deletions

View File

@@ -1013,6 +1013,27 @@ describe("agent event handler", () => {
resetAgentRunContextForTest();
});
it("keeps aborted chat run markers through terminal lifecycle cleanup", () => {
const { broadcast, chatRunState, handler } = createHarness();
chatRunState.registry.add("run-aborted", {
sessionKey: "session-aborted",
clientRunId: "client-aborted",
});
chatRunState.abortedRuns.set("client-aborted", 1_000);
handler({
runId: "run-aborted",
seq: 2,
stream: "lifecycle",
ts: 1_500,
data: { phase: "end", aborted: true, stopReason: "rpc" },
});
expect(chatRunState.abortedRuns.has("client-aborted")).toBe(true);
expect(chatRunState.registry.peek("run-aborted")).toBeUndefined();
expect(chatBroadcastCalls(broadcast)).toHaveLength(0);
});
it("keeps live session setting metadata at the top level for lifecycle updates", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "session-finished",

View File

@@ -341,8 +341,6 @@ export function createAgentEventHandler({
);
}
} else {
chatRunState.abortedRuns.delete(clientRunId);
chatRunState.abortedRuns.delete(evt.runId);
clearBufferedChatState(clientRunId);
if (chatLink) {
chatRunState.registry.remove(evt.runId, clientRunId, sessionKey);

View File

@@ -297,6 +297,71 @@ describe("agent wait dedupe helper", () => {
});
});
it("preserves an RPC cancel snapshot when late completion writes the same key", () => {
const dedupe = new Map();
const runId = "run-cancel-wins";
setRunEntry({
dedupe,
kind: "agent",
runId,
ts: 100,
payload: { runId, status: "timeout", stopReason: "rpc", endedAt: 100 },
});
setRunEntry({
dedupe,
kind: "agent",
runId,
ts: 200,
payload: { runId, status: "ok", endedAt: 200 },
});
expect(
readTerminalSnapshotFromGatewayDedupe({
dedupe,
runId,
}),
).toEqual({
status: "timeout",
endedAt: 100,
error: undefined,
stopReason: "rpc",
});
});
it("preserves an RPC cancel snapshot when late rejection writes the same chat key", () => {
const dedupe = new Map();
const runId = "run-cancel-chat-error";
setRunEntry({
dedupe,
kind: "chat",
runId,
ts: 100,
payload: { runId, status: "timeout", stopReason: "rpc", endedAt: 100 },
});
setRunEntry({
dedupe,
kind: "chat",
runId,
ts: 200,
ok: false,
payload: { runId, status: "error", summary: "late failure", endedAt: 200 },
});
expect(
readTerminalSnapshotFromGatewayDedupe({
dedupe,
runId,
}),
).toEqual({
status: "timeout",
endedAt: 100,
error: undefined,
stopReason: "rpc",
});
});
it("resolves multiple waiters for the same run id", async () => {
const dedupe = new Map();
const runId = "run-multi";

View File

@@ -235,13 +235,18 @@ export function setGatewayDedupeEntry(params: {
key: string;
entry: DedupeEntry;
}) {
const existing = params.dedupe.get(params.key);
const existingSnapshot = existing ? readTerminalSnapshotFromDedupeEntry(existing) : null;
const incomingSnapshot = readTerminalSnapshotFromDedupeEntry(params.entry);
if (existingSnapshot?.status === "timeout" && existingSnapshot.stopReason === "rpc") {
return;
}
params.dedupe.set(params.key, params.entry);
const runId = parseRunIdFromDedupeKey(params.key);
if (!runId) {
return;
}
const snapshot = readTerminalSnapshotFromDedupeEntry(params.entry);
if (!snapshot) {
if (!incomingSnapshot) {
return;
}
notifyWaiters(runId);

View File

@@ -12,6 +12,7 @@ import {
resetTaskRegistryForTests,
} from "../../tasks/task-registry.js";
import { withTempDir } from "../../test-helpers/temp-dir.js";
import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js";
import { agentHandlers } from "./agent.js";
import { chatHandlers } from "./chat.js";
import { expectSubagentFollowupReactivation } from "./subagent-followup.test-helpers.js";
@@ -1451,6 +1452,7 @@ describe("gateway agent handler", () => {
payloads: [],
meta: { durationMs: 100, aborted: true },
});
const context = makeContext();
await invokeAgent(
{
@@ -1458,7 +1460,7 @@ describe("gateway agent handler", () => {
sessionKey: "agent:main:main",
idempotencyKey: "task-registry-agent-run-aborted",
},
{ reqId: "task-registry-agent-run-aborted" },
{ context, reqId: "task-registry-agent-run-aborted" },
);
await waitForAssertion(() => {
@@ -1468,6 +1470,11 @@ describe("gateway agent handler", () => {
status: "timed_out",
terminalSummary: "aborted",
});
expect(context.dedupe.get("agent:task-registry-agent-run-aborted")?.payload).toMatchObject({
runId: "task-registry-agent-run-aborted",
status: "timeout",
summary: "aborted",
});
});
});
});
@@ -1480,6 +1487,7 @@ describe("gateway agent handler", () => {
const abortError = new Error("This operation was aborted");
abortError.name = "AbortError";
mocks.agentCommand.mockRejectedValueOnce(abortError);
const context = makeContext();
await invokeAgent(
{
@@ -1487,7 +1495,7 @@ describe("gateway agent handler", () => {
sessionKey: "agent:main:main",
idempotencyKey: "task-registry-agent-run-abort-error",
},
{ reqId: "task-registry-agent-run-abort-error" },
{ context, reqId: "task-registry-agent-run-abort-error" },
);
await waitForAssertion(() => {
@@ -1497,6 +1505,13 @@ describe("gateway agent handler", () => {
status: "timed_out",
error: "AbortError: This operation was aborted",
});
expect(
context.dedupe.get("agent:task-registry-agent-run-abort-error")?.payload,
).toMatchObject({
runId: "task-registry-agent-run-abort-error",
status: "timeout",
summary: "aborted",
});
});
});
});
@@ -2896,6 +2911,69 @@ describe("gateway agent handler chat.abort integration", () => {
expect(context.chatAbortControllers.has(runId)).toBe(false);
});
it("keeps the sessions.abort wait snapshot after late agent completion", async () => {
prime();
let capturedSignal: AbortSignal | undefined;
let resolveRun:
| ((value: { payloads: Array<{ text: string }>; meta: { durationMs: number } }) => void)
| undefined;
mocks.agentCommand.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => {
capturedSignal = opts.abortSignal;
return new Promise((resolve) => {
resolveRun = resolve;
});
});
const context = makeContext();
const runId = "idem-abort-snapshot-wins";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
);
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main", runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: null,
isWebchatConnect: () => false,
});
expect(capturedSignal?.aborted).toBe(true);
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `agent:${runId}`,
entry: {
ts: 100,
ok: true,
payload: {
runId,
status: "timeout",
stopReason: "rpc",
endedAt: 100,
},
},
});
resolveRun?.({ payloads: [{ text: "late ok" }], meta: { durationMs: 1 } });
await waitForAssertion(() => {
expect(context.dedupe.get(`agent:${runId}`)?.payload).toMatchObject({
runId,
status: "timeout",
stopReason: "rpc",
endedAt: 100,
});
});
});
it("chat.abort without runId aborts the active agent run for the sessionKey", async () => {
prime();
let capturedSignal: AbortSignal | undefined;

View File

@@ -413,8 +413,8 @@ function dispatchAgentRunFromGateway(params: {
}
void agentCommandFromIngress(params.ingressOpts, defaultRuntime, params.context.deps)
.then((result) => {
const aborted = result?.meta?.aborted === true;
if (shouldTrackTask) {
const aborted = result?.meta?.aborted === true;
tryFinalizeTrackedAgentTask({
runId: params.runId,
status: aborted ? "timed_out" : "succeeded",
@@ -423,8 +423,9 @@ function dispatchAgentRunFromGateway(params: {
}
const payload = {
runId: params.runId,
status: "ok" as const,
summary: "completed",
status: aborted ? ("timeout" as const) : ("ok" as const),
summary: aborted ? "aborted" : "completed",
...(aborted ? { stopReason: result?.meta?.stopReason ?? "rpc" } : {}),
result,
};
setGatewayDedupeEntry({
@@ -441,6 +442,7 @@ function dispatchAgentRunFromGateway(params: {
params.respond(true, payload, undefined, { runId: params.runId });
})
.catch((err) => {
const aborted = isAbortError(err);
if (shouldTrackTask) {
const error = String(err);
tryFinalizeTrackedAgentTask({
@@ -453,22 +455,23 @@ function dispatchAgentRunFromGateway(params: {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId: params.runId,
status: "error" as const,
summary: String(err),
status: aborted ? ("timeout" as const) : ("error" as const),
summary: aborted ? "aborted" : String(err),
...(aborted ? { stopReason: "rpc" } : {}),
};
setGatewayDedupeEntry({
dedupe: params.context.dedupe,
key: `agent:${params.idempotencyKey}`,
entry: {
ts: Date.now(),
ok: false,
ok: aborted,
payload,
error,
...(aborted ? {} : { error }),
},
});
params.respond(false, payload, error, {
params.respond(aborted, payload, aborted ? undefined : error, {
runId: params.runId,
error: formatForLog(err),
...(aborted ? {} : { error: formatForLog(err) }),
});
})
.finally(() => {