diff --git a/src/gateway/server-maintenance.test.ts b/src/gateway/server-maintenance.test.ts index a7dc52d6e1b..67032214df2 100644 --- a/src/gateway/server-maintenance.test.ts +++ b/src/gateway/server-maintenance.test.ts @@ -1,7 +1,7 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import type { HealthSummary } from "../commands/health.js"; import type { ChatAbortControllerEntry } from "./chat-abort.js"; -import { DEDUPE_MAX } from "./server-constants.js"; +import { DEDUPE_MAX, DEDUPE_TTL_MS } from "./server-constants.js"; const cleanOldMediaMock = vi.fn(async () => {}); @@ -16,7 +16,10 @@ vi.mock("../media/store.js", async () => { const MEDIA_CLEANUP_TTL_MS = 24 * 60 * 60_000; const ABORTED_RUN_TTL_MS = 60 * 60_000; -function createActiveRun(sessionKey: string): ChatAbortControllerEntry { +function createActiveRun( + sessionKey: string, + kind?: ChatAbortControllerEntry["kind"], +): ChatAbortControllerEntry { const now = Date.now(); return { controller: new AbortController(), @@ -24,6 +27,7 @@ function createActiveRun(sessionKey: string): ChatAbortControllerEntry { sessionKey, startedAtMs: now, expiresAtMs: now + ABORTED_RUN_TTL_MS, + kind, }; } @@ -224,6 +228,34 @@ describe("startGatewayMaintenanceTimers", () => { stopMaintenanceTimers(timers); }); + it("keeps active agent dedupe entries past the normal ttl", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const now = Date.now(); + deps.chatAbortControllers.set("active-agent", createActiveRun("agent:main:main", "agent")); + deps.dedupe.set("agent:active-agent", { + ts: now - DEDUPE_TTL_MS - 1, + ok: true, + payload: { runId: "active-agent", status: "accepted" }, + }); + deps.dedupe.set("agent:stale-agent", { + ts: now - DEDUPE_TTL_MS - 1, + ok: true, + payload: { runId: "stale-agent", status: "accepted" }, + }); + + const timers = startGatewayMaintenanceTimers(deps); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.dedupe.has("agent:active-agent")).toBe(true); + expect(deps.dedupe.has("agent:stale-agent")).toBe(false); + + stopMaintenanceTimers(timers); + }); + it("evicts dedupe overflow by oldest timestamp even after reinsertion", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); @@ -306,4 +338,35 @@ describe("startGatewayMaintenanceTimers", () => { stopMaintenanceTimers(timers); }); + + it("does not evict active agent dedupe entries while trimming overflow", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const now = Date.now(); + + for (let index = 0; index < DEDUPE_MAX; index += 1) { + deps.dedupe.set(`stable-${index}`, { ts: now - 1_000 + index, ok: true }); + } + deps.chatAbortControllers.set("active-oldest", createActiveRun("agent:main:main", "agent")); + deps.dedupe.set("agent:active-oldest", { + ts: now - 10_000, + ok: true, + payload: { runId: "active-oldest", status: "accepted" }, + }); + deps.dedupe.set("overflow-newest", { ts: now, ok: true }); + + const timers = startGatewayMaintenanceTimers(deps); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.dedupe.size).toBe(DEDUPE_MAX); + expect(deps.dedupe.has("agent:active-oldest")).toBe(true); + expect(deps.dedupe.has("stable-0")).toBe(false); + expect(deps.dedupe.has("stable-1")).toBe(false); + expect(deps.dedupe.has("overflow-newest")).toBe(true); + + stopMaintenanceTimers(timers); + }); }); diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index ef7d284cb49..914c64ee89b 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -84,16 +84,29 @@ export function startGatewayMaintenanceTimers(params: { const dedupeCleanup = setInterval(() => { const AGENT_RUN_SEQ_MAX = 10_000; const now = Date.now(); + const isActiveRunDedupeKey = (key: string) => { + if (!key.startsWith("agent:") && !key.startsWith("chat:")) { + return false; + } + const runId = key.slice(key.indexOf(":") + 1); + const entry = runId ? params.chatAbortControllers.get(runId) : undefined; + if (!entry) { + return false; + } + return key.startsWith("agent:") ? entry.kind === "agent" : entry.kind !== "agent"; + }; for (const [k, v] of params.dedupe) { + if (isActiveRunDedupeKey(k)) { + continue; + } if (now - v.ts > DEDUPE_TTL_MS) { params.dedupe.delete(k); } } if (params.dedupe.size > DEDUPE_MAX) { const excess = params.dedupe.size - DEDUPE_MAX; - // Keep overflow eviction aligned with the entry timestamp, not Map - // insertion order, so refresh/reinsert paths still prune the oldest data. const oldestKeys = [...params.dedupe.entries()] + .filter(([key]) => !isActiveRunDedupeKey(key)) .toSorted(([, left], [, right]) => left.ts - right.ts) .slice(0, excess) .map(([key]) => key); diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index d93036c5d92..02fdf8da503 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -3321,7 +3321,7 @@ describe("gateway agent handler chat.abort integration", () => { ); }); - it("does not overwrite or evict a pre-existing chatAbortControllers entry with the same runId", async () => { + it("does not dispatch a duplicate agent run when dedupe was evicted but the run is active", async () => { prime(); mocks.agentCommand.mockResolvedValueOnce({ payloads: [{ text: "ok" }], @@ -3340,6 +3340,8 @@ describe("gateway agent handler chat.abort integration", () => { ownerDeviceId: undefined, }; context.chatAbortControllers.set(runId, preExisting); + context.dedupe.delete(`agent:${runId}`); + const respond = vi.fn(); await invokeAgent( { @@ -3348,16 +3350,14 @@ describe("gateway agent handler chat.abort integration", () => { sessionKey: "agent:main:main", idempotencyKey: runId, }, - { context, reqId: runId }, + { context, reqId: runId, respond }, ); expect(context.chatAbortControllers.get(runId)).toBe(preExisting); - // Cleanup after the agent run completes must not evict the pre-existing - // entry owned by a concurrent chat.send. - await waitForAssertion(() => { - expect(mocks.agentCommand).toHaveBeenCalled(); + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(respond).toHaveBeenCalledWith(true, { runId, status: "in_flight" }, undefined, { + cached: true, + runId, }); - await new Promise((resolve) => setImmediate(resolve)); - expect(context.chatAbortControllers.get(runId)).toBe(preExisting); }); }); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index af5cb41dae9..b295e48bf40 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -1284,6 +1284,13 @@ export const agentHandlers: GatewayRequestHandlers = { typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined, kind: "agent", }); + if (!activeRunAbort.registered && context.chatAbortControllers.has(runId)) { + respond(true, { runId, status: "in_flight" as const }, undefined, { + cached: true, + runId, + }); + return; + } const accepted = { runId,