mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 08:20:43 +00:00
fix(gateway): align sessions abort wait semantics (#74751) thanks @BunsDev
Co-authored-by: Val Alexander <68980965+BunsDev@users.noreply.github.com>
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
|
||||
const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000;
|
||||
|
||||
@@ -177,6 +178,19 @@ export function abortChatRunById(
|
||||
ops.chatDeltaLastBroadcastLen.delete(runId);
|
||||
const removed = ops.removeChatRun(runId, runId, sessionKey);
|
||||
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
sessionKey,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
status: "cancelled",
|
||||
aborted: true,
|
||||
stopReason,
|
||||
startedAt: active.startedAtMs,
|
||||
endedAt: Date.now(),
|
||||
},
|
||||
});
|
||||
ops.agentRunSeq.delete(runId);
|
||||
if (removed?.clientRunId) {
|
||||
ops.agentRunSeq.delete(removed.clientRunId);
|
||||
|
||||
@@ -25,6 +25,7 @@ const mocks = vi.hoisted(() => ({
|
||||
updateSessionStore: vi.fn(),
|
||||
agentCommand: vi.fn(),
|
||||
registerAgentRunContext: vi.fn(),
|
||||
emitAgentEvent: vi.fn(),
|
||||
performGatewaySessionReset: vi.fn(),
|
||||
getLatestSubagentRunByChildSessionKey: vi.fn(),
|
||||
replaceSubagentRunAfterSteer: vi.fn(),
|
||||
@@ -102,6 +103,7 @@ vi.mock("../../auto-reply/reply/session-reset-prompt.js", async () => {
|
||||
});
|
||||
|
||||
vi.mock("../../infra/agent-events.js", () => ({
|
||||
emitAgentEvent: mocks.emitAgentEvent,
|
||||
registerAgentRunContext: mocks.registerAgentRunContext,
|
||||
onAgentEvent: vi.fn(),
|
||||
}));
|
||||
|
||||
@@ -2519,15 +2519,17 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
} else {
|
||||
void emitUserTranscriptUpdate();
|
||||
}
|
||||
setGatewayDedupeEntry({
|
||||
dedupe: context.dedupe,
|
||||
key: `chat:${clientRunId}`,
|
||||
entry: {
|
||||
ts: Date.now(),
|
||||
ok: true,
|
||||
payload: { runId: clientRunId, status: "ok" as const },
|
||||
},
|
||||
});
|
||||
if (!context.chatAbortedRuns.has(clientRunId)) {
|
||||
setGatewayDedupeEntry({
|
||||
dedupe: context.dedupe,
|
||||
key: `chat:${clientRunId}`,
|
||||
entry: {
|
||||
ts: Date.now(),
|
||||
ok: true,
|
||||
payload: { runId: clientRunId, status: "ok" as const },
|
||||
},
|
||||
});
|
||||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
void rewriteUserTranscriptMedia().catch((rewriteErr) => {
|
||||
|
||||
@@ -91,6 +91,7 @@ import {
|
||||
} from "../session-utils.js";
|
||||
import { applySessionsPatchToStore } from "../sessions-patch.js";
|
||||
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
|
||||
import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js";
|
||||
import { chatHandlers } from "./chat.js";
|
||||
import type {
|
||||
GatewayClient,
|
||||
@@ -1314,6 +1315,19 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
canonicalKey,
|
||||
runId: requestedRunId,
|
||||
});
|
||||
// Capture run kinds before the abort because abortChatRunById deletes entries
|
||||
// from chatAbortControllers synchronously. We use this snapshot to choose the
|
||||
// correct dedupe namespace: agent-kind runs use "agent:" (their runId equals
|
||||
// their idempotency key), while chat-send runs use "chat:" so the abort
|
||||
// snapshot does not collide with the agent RPC dedupe cache.
|
||||
const preAbortRunKinds = new Map<string, "chat-send" | "agent" | undefined>();
|
||||
if (requestedRunId) {
|
||||
preAbortRunKinds.set(requestedRunId, context.chatAbortControllers.get(requestedRunId)?.kind);
|
||||
} else {
|
||||
for (const [rid, entry] of context.chatAbortControllers) {
|
||||
preAbortRunKinds.set(rid, entry.kind);
|
||||
}
|
||||
}
|
||||
let abortedRunId: string | null = null;
|
||||
await chatHandlers["chat.abort"]({
|
||||
req,
|
||||
@@ -1334,7 +1348,27 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
Boolean(normalizeOptionalString(value)),
|
||||
)
|
||||
: [];
|
||||
abortedRunId = runIds[0] ?? null;
|
||||
const firstAbortedRunId = runIds[0] ?? null;
|
||||
abortedRunId = firstAbortedRunId;
|
||||
if (firstAbortedRunId) {
|
||||
const endedAt = Date.now();
|
||||
const runKind = preAbortRunKinds.get(firstAbortedRunId);
|
||||
const dedupePrefix = runKind === "agent" ? "agent" : "chat";
|
||||
setGatewayDedupeEntry({
|
||||
dedupe: context.dedupe,
|
||||
key: `${dedupePrefix}:${firstAbortedRunId}`,
|
||||
entry: {
|
||||
ts: endedAt,
|
||||
ok: true,
|
||||
payload: {
|
||||
status: "timeout",
|
||||
runId: firstAbortedRunId,
|
||||
stopReason: "rpc",
|
||||
endedAt,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
respond(
|
||||
true,
|
||||
{
|
||||
|
||||
@@ -78,7 +78,11 @@ async function withGatewayChatHarness(
|
||||
clearConfigCache();
|
||||
testState.sessionStorePath = undefined;
|
||||
ws.close();
|
||||
await Promise.all(tempDirs.map((dir) => fs.rm(dir, { recursive: true, force: true })));
|
||||
await Promise.all(
|
||||
tempDirs.map((dir) =>
|
||||
fs.rm(dir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -275,6 +275,26 @@ describe("gateway server chat", () => {
|
||||
});
|
||||
expect(sendRes.ok).toBe(true);
|
||||
|
||||
const cancelledEventP = onceMessage(
|
||||
ws,
|
||||
(o) => {
|
||||
const data =
|
||||
o.payload?.data && typeof o.payload.data === "object"
|
||||
? (o.payload.data as Record<string, unknown>)
|
||||
: {};
|
||||
return (
|
||||
o.type === "event" &&
|
||||
o.event === "agent" &&
|
||||
o.payload?.runId === "idem-sessions-abort-1" &&
|
||||
o.payload?.stream === "lifecycle" &&
|
||||
data.phase === "end" &&
|
||||
data.stopReason === "rpc"
|
||||
);
|
||||
},
|
||||
8000,
|
||||
);
|
||||
void cancelledEventP.catch(() => undefined);
|
||||
|
||||
const abortRes = await rpcReq(ws, "sessions.abort", {
|
||||
key: "agent:main:dashboard:test-abort",
|
||||
runId: "idem-sessions-abort-1",
|
||||
@@ -283,6 +303,23 @@ describe("gateway server chat", () => {
|
||||
expect(["aborted", "no-active-run"]).toContain(abortRes.payload?.status);
|
||||
if (abortRes.payload?.status === "aborted") {
|
||||
expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1");
|
||||
const cancelledEvent = await cancelledEventP;
|
||||
expect(cancelledEvent.payload?.data).toMatchObject({
|
||||
phase: "end",
|
||||
status: "cancelled",
|
||||
aborted: true,
|
||||
stopReason: "rpc",
|
||||
});
|
||||
const waitRes = await rpcReq(ws, "agent.wait", {
|
||||
runId: "idem-sessions-abort-1",
|
||||
timeoutMs: 0,
|
||||
});
|
||||
expect(waitRes.ok).toBe(true);
|
||||
expect(waitRes.payload).toMatchObject({
|
||||
runId: "idem-sessions-abort-1",
|
||||
status: "timeout",
|
||||
stopReason: "rpc",
|
||||
});
|
||||
} else {
|
||||
expect(abortRes.payload?.abortedRunId).toBeNull();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user