mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
Tests: cover shared followup queue runtime state
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
|
||||
import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
@@ -743,6 +744,71 @@ describe("followup queue deduplication", () => {
|
||||
expect(calls).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("deduplicates same message_id across distinct enqueue module instances", async () => {
|
||||
const enqueueA = await importFreshModule<typeof import("./queue/enqueue.js")>(
|
||||
import.meta.url,
|
||||
"./queue/enqueue.js?scope=dedupe-a",
|
||||
);
|
||||
const enqueueB = await importFreshModule<typeof import("./queue/enqueue.js")>(
|
||||
import.meta.url,
|
||||
"./queue/enqueue.js?scope=dedupe-b",
|
||||
);
|
||||
const { clearSessionQueues } = await import("./queue.js");
|
||||
const key = `test-dedup-cross-module-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueA.resetRecentQueuedMessageIdDedupe();
|
||||
enqueueB.resetRecentQueuedMessageIdDedupe();
|
||||
|
||||
try {
|
||||
expect(
|
||||
enqueueA.enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first",
|
||||
messageId: "same-id",
|
||||
originatingChannel: "signal",
|
||||
originatingTo: "+10000000000",
|
||||
}),
|
||||
settings,
|
||||
),
|
||||
).toBe(true);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(
|
||||
enqueueB.enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first-redelivery",
|
||||
messageId: "same-id",
|
||||
originatingChannel: "signal",
|
||||
originatingTo: "+10000000000",
|
||||
}),
|
||||
settings,
|
||||
),
|
||||
).toBe(false);
|
||||
expect(calls).toHaveLength(1);
|
||||
} finally {
|
||||
clearSessionQueues([key]);
|
||||
enqueueA.resetRecentQueuedMessageIdDedupe();
|
||||
enqueueB.resetRecentQueuedMessageIdDedupe();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not collide recent message-id keys when routing contains delimiters", async () => {
|
||||
const key = `test-dedup-key-collision-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
@@ -1264,6 +1330,55 @@ describe("followup queue drain restart after idle window", () => {
|
||||
expect(calls[1]?.prompt).toBe("after-idle");
|
||||
});
|
||||
|
||||
it("restarts an idle drain across distinct enqueue and drain module instances", async () => {
|
||||
const drainA = await importFreshModule<typeof import("./queue/drain.js")>(
|
||||
import.meta.url,
|
||||
"./queue/drain.js?scope=restart-a",
|
||||
);
|
||||
const enqueueB = await importFreshModule<typeof import("./queue/enqueue.js")>(
|
||||
import.meta.url,
|
||||
"./queue/enqueue.js?scope=restart-b",
|
||||
);
|
||||
const { clearSessionQueues } = await import("./queue.js");
|
||||
const key = `test-idle-window-cross-module-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
const firstProcessed = createDeferred<void>();
|
||||
|
||||
enqueueB.resetRecentQueuedMessageIdDedupe();
|
||||
|
||||
try {
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
if (calls.length === 1) {
|
||||
firstProcessed.resolve();
|
||||
}
|
||||
};
|
||||
|
||||
enqueueB.enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings);
|
||||
drainA.scheduleFollowupDrain(key, runFollowup);
|
||||
await firstProcessed.promise;
|
||||
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
enqueueB.enqueueFollowupRun(key, createRun({ prompt: "after-idle" }), settings);
|
||||
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(calls).toHaveLength(2);
|
||||
},
|
||||
{ timeout: 1_000 },
|
||||
);
|
||||
|
||||
expect(calls[0]?.prompt).toBe("before-idle");
|
||||
expect(calls[1]?.prompt).toBe("after-idle");
|
||||
} finally {
|
||||
clearSessionQueues([key]);
|
||||
drainA.clearFollowupDrainCallback(key);
|
||||
enqueueB.resetRecentQueuedMessageIdDedupe();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not double-drain when a message arrives while drain is still running", async () => {
|
||||
const key = `test-no-double-drain-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
|
||||
Reference in New Issue
Block a user