mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 18:30:42 +00:00
fix(gateway): preserve active agent dedupe retries
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { HealthSummary } from "../commands/health.js";
|
||||
import type { ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
import { DEDUPE_MAX } from "./server-constants.js";
|
||||
import { DEDUPE_MAX, DEDUPE_TTL_MS } from "./server-constants.js";
|
||||
|
||||
const cleanOldMediaMock = vi.fn(async () => {});
|
||||
|
||||
@@ -16,7 +16,10 @@ vi.mock("../media/store.js", async () => {
|
||||
const MEDIA_CLEANUP_TTL_MS = 24 * 60 * 60_000;
|
||||
const ABORTED_RUN_TTL_MS = 60 * 60_000;
|
||||
|
||||
function createActiveRun(sessionKey: string): ChatAbortControllerEntry {
|
||||
function createActiveRun(
|
||||
sessionKey: string,
|
||||
kind?: ChatAbortControllerEntry["kind"],
|
||||
): ChatAbortControllerEntry {
|
||||
const now = Date.now();
|
||||
return {
|
||||
controller: new AbortController(),
|
||||
@@ -24,6 +27,7 @@ function createActiveRun(sessionKey: string): ChatAbortControllerEntry {
|
||||
sessionKey,
|
||||
startedAtMs: now,
|
||||
expiresAtMs: now + ABORTED_RUN_TTL_MS,
|
||||
kind,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -224,6 +228,34 @@ describe("startGatewayMaintenanceTimers", () => {
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
// Verifies that TTL-based dedupe pruning skips an "agent:" entry whose run id
// is still registered in chatAbortControllers, while an equally old entry
// without a registered run is removed.
it("keeps active agent dedupe entries past the normal ttl", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
|
||||
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
|
||||
const deps = createMaintenanceTimerDeps();
|
||||
const now = Date.now();
|
||||
// Register a run of kind "agent" under the id "active-agent"; the dedupe key
// below is that id with an "agent:" prefix.
deps.chatAbortControllers.set("active-agent", createActiveRun("agent:main:main", "agent"));
|
||||
// Both dedupe entries are 1 ms older than DEDUPE_TTL_MS, so age alone cannot
// explain why one survives and the other does not.
deps.dedupe.set("agent:active-agent", {
|
||||
ts: now - DEDUPE_TTL_MS - 1,
|
||||
ok: true,
|
||||
payload: { runId: "active-agent", status: "accepted" },
|
||||
});
|
||||
// No chatAbortControllers entry exists for "stale-agent".
deps.dedupe.set("agent:stale-agent", {
|
||||
ts: now - DEDUPE_TTL_MS - 1,
|
||||
ok: true,
|
||||
payload: { runId: "stale-agent", status: "accepted" },
|
||||
});
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
// NOTE(review): presumably 60 s of fake time is enough for at least one
// dedupe cleanup tick — confirm against the interval used by the timer.
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
expect(deps.dedupe.has("agent:active-agent")).toBe(true);
|
||||
expect(deps.dedupe.has("agent:stale-agent")).toBe(false);
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
it("evicts dedupe overflow by oldest timestamp even after reinsertion", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
|
||||
@@ -306,4 +338,35 @@ describe("startGatewayMaintenanceTimers", () => {
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
// Verifies that overflow trimming (size > DEDUPE_MAX) never removes the dedupe
// entry of a still-registered agent run, even when that entry has the oldest
// timestamp in the map.
it("does not evict active agent dedupe entries while trimming overflow", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
|
||||
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
|
||||
const deps = createMaintenanceTimerDeps();
|
||||
const now = Date.now();
|
||||
|
||||
// Fill the map to exactly DEDUPE_MAX with strictly increasing timestamps, so
// stable-0 and stable-1 are the two oldest "stable" entries.
for (let index = 0; index < DEDUPE_MAX; index += 1) {
|
||||
deps.dedupe.set(`stable-${index}`, { ts: now - 1_000 + index, ok: true });
|
||||
}
|
||||
deps.chatAbortControllers.set("active-oldest", createActiveRun("agent:main:main", "agent"));
|
||||
// ts of now - 10_000 is older than every stable-* entry (all >= now - 1_000).
deps.dedupe.set("agent:active-oldest", {
|
||||
ts: now - 10_000,
|
||||
ok: true,
|
||||
payload: { runId: "active-oldest", status: "accepted" },
|
||||
});
|
||||
// Together with the active entry this puts the map two entries over DEDUPE_MAX.
deps.dedupe.set("overflow-newest", { ts: now, ok: true });
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
// NOTE(review): assumes a cleanup tick fires within 60 s and that
// DEDUPE_TTL_MS is long enough that TTL pruning removes none of these
// entries — confirm against server-constants and the timer interval.
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
// Two entries must go; they are expected to be the two oldest entries that
// are not tied to the registered run.
expect(deps.dedupe.size).toBe(DEDUPE_MAX);
|
||||
expect(deps.dedupe.has("agent:active-oldest")).toBe(true);
|
||||
expect(deps.dedupe.has("stable-0")).toBe(false);
|
||||
expect(deps.dedupe.has("stable-1")).toBe(false);
|
||||
expect(deps.dedupe.has("overflow-newest")).toBe(true);
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -84,16 +84,29 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
const dedupeCleanup = setInterval(() => {
|
||||
const AGENT_RUN_SEQ_MAX = 10_000;
|
||||
const now = Date.now();
|
||||
// Reports whether a dedupe key refers to a run that still has an entry in
// params.chatAbortControllers. Returns true only when the key is prefixed with
// "agent:" or "chat:", the text after the first ":" is a registered run id,
// and the registered entry's kind agrees with the prefix.
const isActiveRunDedupeKey = (key: string) => {
|
||||
// Keys with any other prefix are never treated as active-run keys.
if (!key.startsWith("agent:") && !key.startsWith("chat:")) {
|
||||
return false;
|
||||
}
|
||||
// Everything after the first ":" is used as the run id; later colons are
// kept as part of the id.
const runId = key.slice(key.indexOf(":") + 1);
|
||||
// An empty run id (key is just the prefix) skips the lookup entirely.
const entry = runId ? params.chatAbortControllers.get(runId) : undefined;
|
||||
if (!entry) {
|
||||
return false;
|
||||
}
|
||||
// "agent:" keys match only entries of kind "agent"; "chat:" keys match any
// other kind, which includes entries whose kind is undefined.
return key.startsWith("agent:") ? entry.kind === "agent" : entry.kind !== "agent";
|
||||
};
|
||||
for (const [k, v] of params.dedupe) {
|
||||
if (isActiveRunDedupeKey(k)) {
|
||||
continue;
|
||||
}
|
||||
if (now - v.ts > DEDUPE_TTL_MS) {
|
||||
params.dedupe.delete(k);
|
||||
}
|
||||
}
|
||||
if (params.dedupe.size > DEDUPE_MAX) {
|
||||
const excess = params.dedupe.size - DEDUPE_MAX;
|
||||
// Keep overflow eviction aligned with the entry timestamp, not Map
|
||||
// insertion order, so refresh/reinsert paths still prune the oldest data.
|
||||
const oldestKeys = [...params.dedupe.entries()]
|
||||
.filter(([key]) => !isActiveRunDedupeKey(key))
|
||||
.toSorted(([, left], [, right]) => left.ts - right.ts)
|
||||
.slice(0, excess)
|
||||
.map(([key]) => key);
|
||||
|
||||
@@ -3321,7 +3321,7 @@ describe("gateway agent handler chat.abort integration", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("does not overwrite or evict a pre-existing chatAbortControllers entry with the same runId", async () => {
|
||||
it("does not dispatch a duplicate agent run when dedupe was evicted but the run is active", async () => {
|
||||
prime();
|
||||
mocks.agentCommand.mockResolvedValueOnce({
|
||||
payloads: [{ text: "ok" }],
|
||||
@@ -3340,6 +3340,8 @@ describe("gateway agent handler chat.abort integration", () => {
|
||||
ownerDeviceId: undefined,
|
||||
};
|
||||
context.chatAbortControllers.set(runId, preExisting);
|
||||
context.dedupe.delete(`agent:${runId}`);
|
||||
const respond = vi.fn();
|
||||
|
||||
await invokeAgent(
|
||||
{
|
||||
@@ -3348,16 +3350,14 @@ describe("gateway agent handler chat.abort integration", () => {
|
||||
sessionKey: "agent:main:main",
|
||||
idempotencyKey: runId,
|
||||
},
|
||||
{ context, reqId: runId },
|
||||
{ context, reqId: runId, respond },
|
||||
);
|
||||
|
||||
expect(context.chatAbortControllers.get(runId)).toBe(preExisting);
|
||||
// Cleanup after the agent run completes must not evict the pre-existing
|
||||
// entry owned by a concurrent chat.send.
|
||||
await waitForAssertion(() => {
|
||||
expect(mocks.agentCommand).toHaveBeenCalled();
|
||||
expect(mocks.agentCommand).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(true, { runId, status: "in_flight" }, undefined, {
|
||||
cached: true,
|
||||
runId,
|
||||
});
|
||||
await new Promise((resolve) => setImmediate(resolve));
|
||||
expect(context.chatAbortControllers.get(runId)).toBe(preExisting);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1284,6 +1284,13 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined,
|
||||
kind: "agent",
|
||||
});
|
||||
if (!activeRunAbort.registered && context.chatAbortControllers.has(runId)) {
|
||||
respond(true, { runId, status: "in_flight" as const }, undefined, {
|
||||
cached: true,
|
||||
runId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const accepted = {
|
||||
runId,
|
||||
|
||||
Reference in New Issue
Block a user