mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:10:45 +00:00
fix(gateway): align sessions abort wait semantics (#74751) thanks @BunsDev
Co-authored-by: Val Alexander <68980965+BunsDev@users.noreply.github.com>
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js";
|
import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js";
|
||||||
|
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||||
|
|
||||||
const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000;
|
const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000;
|
||||||
|
|
||||||
@@ -177,6 +178,19 @@ export function abortChatRunById(
|
|||||||
ops.chatDeltaLastBroadcastLen.delete(runId);
|
ops.chatDeltaLastBroadcastLen.delete(runId);
|
||||||
const removed = ops.removeChatRun(runId, runId, sessionKey);
|
const removed = ops.removeChatRun(runId, runId, sessionKey);
|
||||||
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
|
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
|
||||||
|
emitAgentEvent({
|
||||||
|
runId,
|
||||||
|
sessionKey,
|
||||||
|
stream: "lifecycle",
|
||||||
|
data: {
|
||||||
|
phase: "end",
|
||||||
|
status: "cancelled",
|
||||||
|
aborted: true,
|
||||||
|
stopReason,
|
||||||
|
startedAt: active.startedAtMs,
|
||||||
|
endedAt: Date.now(),
|
||||||
|
},
|
||||||
|
});
|
||||||
ops.agentRunSeq.delete(runId);
|
ops.agentRunSeq.delete(runId);
|
||||||
if (removed?.clientRunId) {
|
if (removed?.clientRunId) {
|
||||||
ops.agentRunSeq.delete(removed.clientRunId);
|
ops.agentRunSeq.delete(removed.clientRunId);
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ const mocks = vi.hoisted(() => ({
|
|||||||
updateSessionStore: vi.fn(),
|
updateSessionStore: vi.fn(),
|
||||||
agentCommand: vi.fn(),
|
agentCommand: vi.fn(),
|
||||||
registerAgentRunContext: vi.fn(),
|
registerAgentRunContext: vi.fn(),
|
||||||
|
emitAgentEvent: vi.fn(),
|
||||||
performGatewaySessionReset: vi.fn(),
|
performGatewaySessionReset: vi.fn(),
|
||||||
getLatestSubagentRunByChildSessionKey: vi.fn(),
|
getLatestSubagentRunByChildSessionKey: vi.fn(),
|
||||||
replaceSubagentRunAfterSteer: vi.fn(),
|
replaceSubagentRunAfterSteer: vi.fn(),
|
||||||
@@ -102,6 +103,7 @@ vi.mock("../../auto-reply/reply/session-reset-prompt.js", async () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
vi.mock("../../infra/agent-events.js", () => ({
|
vi.mock("../../infra/agent-events.js", () => ({
|
||||||
|
emitAgentEvent: mocks.emitAgentEvent,
|
||||||
registerAgentRunContext: mocks.registerAgentRunContext,
|
registerAgentRunContext: mocks.registerAgentRunContext,
|
||||||
onAgentEvent: vi.fn(),
|
onAgentEvent: vi.fn(),
|
||||||
}));
|
}));
|
||||||
|
|||||||
@@ -2519,15 +2519,17 @@ export const chatHandlers: GatewayRequestHandlers = {
|
|||||||
} else {
|
} else {
|
||||||
void emitUserTranscriptUpdate();
|
void emitUserTranscriptUpdate();
|
||||||
}
|
}
|
||||||
setGatewayDedupeEntry({
|
if (!context.chatAbortedRuns.has(clientRunId)) {
|
||||||
dedupe: context.dedupe,
|
setGatewayDedupeEntry({
|
||||||
key: `chat:${clientRunId}`,
|
dedupe: context.dedupe,
|
||||||
entry: {
|
key: `chat:${clientRunId}`,
|
||||||
ts: Date.now(),
|
entry: {
|
||||||
ok: true,
|
ts: Date.now(),
|
||||||
payload: { runId: clientRunId, status: "ok" as const },
|
ok: true,
|
||||||
},
|
payload: { runId: clientRunId, status: "ok" as const },
|
||||||
});
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
void rewriteUserTranscriptMedia().catch((rewriteErr) => {
|
void rewriteUserTranscriptMedia().catch((rewriteErr) => {
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ import {
|
|||||||
} from "../session-utils.js";
|
} from "../session-utils.js";
|
||||||
import { applySessionsPatchToStore } from "../sessions-patch.js";
|
import { applySessionsPatchToStore } from "../sessions-patch.js";
|
||||||
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
|
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
|
||||||
|
import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js";
|
||||||
import { chatHandlers } from "./chat.js";
|
import { chatHandlers } from "./chat.js";
|
||||||
import type {
|
import type {
|
||||||
GatewayClient,
|
GatewayClient,
|
||||||
@@ -1314,6 +1315,19 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
|||||||
canonicalKey,
|
canonicalKey,
|
||||||
runId: requestedRunId,
|
runId: requestedRunId,
|
||||||
});
|
});
|
||||||
|
// Capture run kinds before the abort because abortChatRunById deletes entries
|
||||||
|
// from chatAbortControllers synchronously. We use this snapshot to choose the
|
||||||
|
// correct dedupe namespace: agent-kind runs use "agent:" (their runId equals
|
||||||
|
// their idempotency key), while chat-send runs use "chat:" so the abort
|
||||||
|
// snapshot does not collide with the agent RPC dedupe cache.
|
||||||
|
const preAbortRunKinds = new Map<string, "chat-send" | "agent" | undefined>();
|
||||||
|
if (requestedRunId) {
|
||||||
|
preAbortRunKinds.set(requestedRunId, context.chatAbortControllers.get(requestedRunId)?.kind);
|
||||||
|
} else {
|
||||||
|
for (const [rid, entry] of context.chatAbortControllers) {
|
||||||
|
preAbortRunKinds.set(rid, entry.kind);
|
||||||
|
}
|
||||||
|
}
|
||||||
let abortedRunId: string | null = null;
|
let abortedRunId: string | null = null;
|
||||||
await chatHandlers["chat.abort"]({
|
await chatHandlers["chat.abort"]({
|
||||||
req,
|
req,
|
||||||
@@ -1334,7 +1348,27 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
|||||||
Boolean(normalizeOptionalString(value)),
|
Boolean(normalizeOptionalString(value)),
|
||||||
)
|
)
|
||||||
: [];
|
: [];
|
||||||
abortedRunId = runIds[0] ?? null;
|
const firstAbortedRunId = runIds[0] ?? null;
|
||||||
|
abortedRunId = firstAbortedRunId;
|
||||||
|
if (firstAbortedRunId) {
|
||||||
|
const endedAt = Date.now();
|
||||||
|
const runKind = preAbortRunKinds.get(firstAbortedRunId);
|
||||||
|
const dedupePrefix = runKind === "agent" ? "agent" : "chat";
|
||||||
|
setGatewayDedupeEntry({
|
||||||
|
dedupe: context.dedupe,
|
||||||
|
key: `${dedupePrefix}:${firstAbortedRunId}`,
|
||||||
|
entry: {
|
||||||
|
ts: endedAt,
|
||||||
|
ok: true,
|
||||||
|
payload: {
|
||||||
|
status: "timeout",
|
||||||
|
runId: firstAbortedRunId,
|
||||||
|
stopReason: "rpc",
|
||||||
|
endedAt,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
respond(
|
respond(
|
||||||
true,
|
true,
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -78,7 +78,11 @@ async function withGatewayChatHarness(
|
|||||||
clearConfigCache();
|
clearConfigCache();
|
||||||
testState.sessionStorePath = undefined;
|
testState.sessionStorePath = undefined;
|
||||||
ws.close();
|
ws.close();
|
||||||
await Promise.all(tempDirs.map((dir) => fs.rm(dir, { recursive: true, force: true })));
|
await Promise.all(
|
||||||
|
tempDirs.map((dir) =>
|
||||||
|
fs.rm(dir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }),
|
||||||
|
),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -275,6 +275,26 @@ describe("gateway server chat", () => {
|
|||||||
});
|
});
|
||||||
expect(sendRes.ok).toBe(true);
|
expect(sendRes.ok).toBe(true);
|
||||||
|
|
||||||
|
const cancelledEventP = onceMessage(
|
||||||
|
ws,
|
||||||
|
(o) => {
|
||||||
|
const data =
|
||||||
|
o.payload?.data && typeof o.payload.data === "object"
|
||||||
|
? (o.payload.data as Record<string, unknown>)
|
||||||
|
: {};
|
||||||
|
return (
|
||||||
|
o.type === "event" &&
|
||||||
|
o.event === "agent" &&
|
||||||
|
o.payload?.runId === "idem-sessions-abort-1" &&
|
||||||
|
o.payload?.stream === "lifecycle" &&
|
||||||
|
data.phase === "end" &&
|
||||||
|
data.stopReason === "rpc"
|
||||||
|
);
|
||||||
|
},
|
||||||
|
8000,
|
||||||
|
);
|
||||||
|
void cancelledEventP.catch(() => undefined);
|
||||||
|
|
||||||
const abortRes = await rpcReq(ws, "sessions.abort", {
|
const abortRes = await rpcReq(ws, "sessions.abort", {
|
||||||
key: "agent:main:dashboard:test-abort",
|
key: "agent:main:dashboard:test-abort",
|
||||||
runId: "idem-sessions-abort-1",
|
runId: "idem-sessions-abort-1",
|
||||||
@@ -283,6 +303,23 @@ describe("gateway server chat", () => {
|
|||||||
expect(["aborted", "no-active-run"]).toContain(abortRes.payload?.status);
|
expect(["aborted", "no-active-run"]).toContain(abortRes.payload?.status);
|
||||||
if (abortRes.payload?.status === "aborted") {
|
if (abortRes.payload?.status === "aborted") {
|
||||||
expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1");
|
expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1");
|
||||||
|
const cancelledEvent = await cancelledEventP;
|
||||||
|
expect(cancelledEvent.payload?.data).toMatchObject({
|
||||||
|
phase: "end",
|
||||||
|
status: "cancelled",
|
||||||
|
aborted: true,
|
||||||
|
stopReason: "rpc",
|
||||||
|
});
|
||||||
|
const waitRes = await rpcReq(ws, "agent.wait", {
|
||||||
|
runId: "idem-sessions-abort-1",
|
||||||
|
timeoutMs: 0,
|
||||||
|
});
|
||||||
|
expect(waitRes.ok).toBe(true);
|
||||||
|
expect(waitRes.payload).toMatchObject({
|
||||||
|
runId: "idem-sessions-abort-1",
|
||||||
|
status: "timeout",
|
||||||
|
stopReason: "rpc",
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
expect(abortRes.payload?.abortedRunId).toBeNull();
|
expect(abortRes.payload?.abortedRunId).toBeNull();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user