// Mirror of https://github.com/openclaw/openclaw.git
// Synced 2026-03-22 07:20:59 +00:00
// 315 lines, 6.7 KiB, TypeScript
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
|
import type { DedupeEntry } from "../server-shared.js";
|
|
import {
|
|
__testing,
|
|
readTerminalSnapshotFromGatewayDedupe,
|
|
setGatewayDedupeEntry,
|
|
waitForTerminalGatewayDedupe,
|
|
} from "./agent-wait-dedupe.js";
|
|
|
|
describe("agent wait dedupe helper", () => {
|
|
/**
 * Test helper: stores a dedupe entry for a run through `setGatewayDedupeEntry`.
 *
 * The entry is keyed as `<kind>:<runId>`. When `ts` is omitted the current
 * `Date.now()` value is used, and when `ok` is omitted it is stored as `true`.
 */
function setRunEntry(params: {
  dedupe: Map<string, DedupeEntry>;
  kind: "agent" | "chat";
  runId: string;
  ts?: number;
  ok?: boolean;
  payload: Record<string, unknown>;
}) {
  const { dedupe, kind, runId, payload } = params;
  const entry: DedupeEntry = {
    ts: params.ts ?? Date.now(),
    ok: params.ok ?? true,
    payload,
  };
  setGatewayDedupeEntry({ dedupe, key: `${kind}:${runId}`, entry });
}
|
|
|
|
// Per-test setup: reset the waiter registry through the testing hook, then
// switch vitest over to fake timers so timeouts can be driven manually.
beforeEach(() => {
  __testing.resetWaiters();
  vi.useFakeTimers();
});
|
|
|
|
// Per-test teardown: reset the waiter registry again, then hand timers back
// to the real implementation.
afterEach(() => {
  __testing.resetWaiters();
  vi.useRealTimers();
});
|
|
|
|
it("unblocks waiters when a terminal chat dedupe entry is written", async () => {
  const runId = "run-chat-terminal";
  const dedupe = new Map();
  const pending = waitForTerminalGatewayDedupe({ dedupe, runId, timeoutMs: 1_000 });

  // Yield one microtask tick before checking that the waiter is registered.
  await Promise.resolve();
  expect(__testing.getWaiterCount(runId)).toBe(1);

  // Write a chat entry carrying a terminal ("ok") payload for the same run.
  const terminalPayload = { runId, status: "ok", startedAt: 100, endedAt: 200 };
  setRunEntry({ dedupe, kind: "chat", runId, payload: terminalPayload });

  await expect(pending).resolves.toEqual({
    status: "ok",
    startedAt: 100,
    endedAt: 200,
    error: undefined,
  });
  // The waiter registration must be gone once the wait has resolved.
  expect(__testing.getWaiterCount(runId)).toBe(0);
});
|
|
|
|
it("keeps stale chat dedupe blocked while agent dedupe is in-flight", async () => {
  const runId = "run-stale-chat";
  const dedupe = new Map();

  // Seed a chat entry with status "ok", followed by an agent entry that is
  // still only "accepted".
  setRunEntry({ dedupe, kind: "chat", runId, payload: { runId, status: "ok" } });
  setRunEntry({ dedupe, kind: "agent", runId, payload: { runId, status: "accepted" } });

  expect(readTerminalSnapshotFromGatewayDedupe({ dedupe, runId })).toBeNull();

  const blockedWait = waitForTerminalGatewayDedupe({ dedupe, runId, timeoutMs: 25 });

  // Move the fake clock past the 25 ms timeout so the wait settles.
  await vi.advanceTimersByTimeAsync(30);
  await expect(blockedWait).resolves.toBeNull();
  expect(__testing.getWaiterCount(runId)).toBe(0);
});
|
|
|
|
it("uses newer terminal chat snapshot when agent entry is non-terminal", () => {
  const runId = "run-nonterminal-agent-with-newer-chat";
  const dedupe = new Map();

  // Agent entry at ts 100 is still "accepted".
  setRunEntry({
    dedupe,
    kind: "agent",
    runId,
    ts: 100,
    payload: { runId, status: "accepted" },
  });
  // Chat entry at ts 200 carries a terminal "ok" payload.
  setRunEntry({
    dedupe,
    kind: "chat",
    runId,
    ts: 200,
    payload: { runId, status: "ok", startedAt: 1, endedAt: 2 },
  });

  const snapshot = readTerminalSnapshotFromGatewayDedupe({ dedupe, runId });
  expect(snapshot).toEqual({
    status: "ok",
    startedAt: 1,
    endedAt: 2,
    error: undefined,
  });
});
|
|
|
|
it("ignores stale agent snapshots when waiting for an active chat run", async () => {
  const runId = "run-chat-active-ignore-agent";
  const dedupe = new Map();

  // Only an agent entry with status "ok" exists at the start.
  setRunEntry({ dedupe, kind: "agent", runId, payload: { runId, status: "ok" } });

  // With ignoreAgentTerminalSnapshot set, that agent entry must not be reported.
  const initialSnapshot = readTerminalSnapshotFromGatewayDedupe({
    dedupe,
    runId,
    ignoreAgentTerminalSnapshot: true,
  });
  expect(initialSnapshot).toBeNull();

  const wait = waitForTerminalGatewayDedupe({
    dedupe,
    runId,
    timeoutMs: 1_000,
    ignoreAgentTerminalSnapshot: true,
  });
  // Yield one microtask tick before checking that the waiter is registered.
  await Promise.resolve();
  expect(__testing.getWaiterCount(runId)).toBe(1);

  // Now write the chat entry the waiter is expected to resolve with.
  const chatPayload = { runId, status: "ok", startedAt: 123, endedAt: 456 };
  setRunEntry({ dedupe, kind: "chat", runId, payload: chatPayload });

  await expect(wait).resolves.toEqual({
    status: "ok",
    startedAt: 123,
    endedAt: 456,
    error: undefined,
  });
});
|
|
|
|
it("prefers the freshest terminal snapshot when agent/chat dedupe keys collide", () => {
  const runId = "run-collision";

  // Case 1: agent entry at ts 100, failed chat entry at ts 200.
  const dedupe = new Map();
  setRunEntry({
    dedupe,
    kind: "agent",
    runId,
    ts: 100,
    payload: { runId, status: "ok", startedAt: 10, endedAt: 20 },
  });
  setRunEntry({
    dedupe,
    kind: "chat",
    runId,
    ts: 200,
    ok: false,
    payload: { runId, status: "error", startedAt: 30, endedAt: 40, error: "chat failed" },
  });

  const chatNewest = readTerminalSnapshotFromGatewayDedupe({ dedupe, runId });
  expect(chatNewest).toEqual({
    status: "error",
    startedAt: 30,
    endedAt: 40,
    error: "chat failed",
  });

  // Case 2: the mirror image, with the chat entry at ts 100 and the agent entry at ts 200.
  const dedupeReverse = new Map();
  setRunEntry({
    dedupe: dedupeReverse,
    kind: "chat",
    runId,
    ts: 100,
    payload: { runId, status: "ok", startedAt: 1, endedAt: 2 },
  });
  setRunEntry({
    dedupe: dedupeReverse,
    kind: "agent",
    runId,
    ts: 200,
    payload: { runId, status: "timeout", startedAt: 3, endedAt: 4, error: "still running" },
  });

  const agentNewest = readTerminalSnapshotFromGatewayDedupe({ dedupe: dedupeReverse, runId });
  expect(agentNewest).toEqual({
    status: "timeout",
    startedAt: 3,
    endedAt: 4,
    error: "still running",
  });
});
|
|
|
|
it("resolves multiple waiters for the same run id", async () => {
  const runId = "run-multi";
  const dedupe = new Map();

  // Register two independent waiters against the same run id.
  const first = waitForTerminalGatewayDedupe({ dedupe, runId, timeoutMs: 1_000 });
  const second = waitForTerminalGatewayDedupe({ dedupe, runId, timeoutMs: 1_000 });

  await Promise.resolve();
  expect(__testing.getWaiterCount(runId)).toBe(2);

  setRunEntry({ dedupe, kind: "chat", runId, payload: { runId, status: "ok" } });

  // Both waiters are checked, in order, against the same partial matcher.
  const okSnapshot = expect.objectContaining({ status: "ok" });
  await expect(first).resolves.toEqual(okSnapshot);
  await expect(second).resolves.toEqual(okSnapshot);
  expect(__testing.getWaiterCount(runId)).toBe(0);
});
|
|
|
|
it("cleans up waiter registration on timeout", async () => {
  const runId = "run-timeout";
  const dedupe = new Map();
  const wait = waitForTerminalGatewayDedupe({ dedupe, runId, timeoutMs: 20 });

  // Yield one microtask tick before checking that the waiter is registered.
  await Promise.resolve();
  expect(__testing.getWaiterCount(runId)).toBe(1);

  // No entry is ever written; advance the fake clock past the 20 ms timeout.
  await vi.advanceTimersByTimeAsync(25);
  await expect(wait).resolves.toBeNull();
  expect(__testing.getWaiterCount(runId)).toBe(0);
});
|
|
});
|