mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-08 15:51:06 +00:00
test: split reply flow coverage by owner surface
This commit is contained in:
182
src/auto-reply/reply/inbound-context.test.ts
Normal file
182
src/auto-reply/reply/inbound-context.test.ts
Normal file
@@ -0,0 +1,182 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { expectChannelInboundContextContract as expectInboundContextContract } from "../../channels/plugins/contracts/test-helpers.js";
|
||||
import type { MsgContext } from "../templating.js";
|
||||
import { finalizeInboundContext } from "./inbound-context.js";
|
||||
import { normalizeInboundTextNewlines } from "./inbound-text.js";
|
||||
|
||||
describe("normalizeInboundTextNewlines", () => {
|
||||
it("normalizes real newlines and preserves literal backslash-n sequences", () => {
|
||||
const cases = [
|
||||
{ input: "hello\r\nworld", expected: "hello\nworld" },
|
||||
{ input: "hello\rworld", expected: "hello\nworld" },
|
||||
{ input: "C:\\Work\\nxxx\\README.md", expected: "C:\\Work\\nxxx\\README.md" },
|
||||
{
|
||||
input: "Please read the file at C:\\Work\\nxxx\\README.md",
|
||||
expected: "Please read the file at C:\\Work\\nxxx\\README.md",
|
||||
},
|
||||
{ input: "C:\\new\\notes\\nested", expected: "C:\\new\\notes\\nested" },
|
||||
{ input: "Line 1\r\nC:\\Work\\nxxx", expected: "Line 1\nC:\\Work\\nxxx" },
|
||||
] as const;
|
||||
|
||||
for (const testCase of cases) {
|
||||
expect(normalizeInboundTextNewlines(testCase.input)).toBe(testCase.expected);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("inbound context contract (providers + extensions)", () => {
|
||||
const cases: Array<{ name: string; ctx: MsgContext }> = [
|
||||
{
|
||||
name: "whatsapp group",
|
||||
ctx: {
|
||||
Provider: "whatsapp",
|
||||
Surface: "whatsapp",
|
||||
ChatType: "group",
|
||||
From: "123@g.us",
|
||||
To: "+15550001111",
|
||||
Body: "[WhatsApp 123@g.us] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
SenderName: "Alice",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "telegram group",
|
||||
ctx: {
|
||||
Provider: "telegram",
|
||||
Surface: "telegram",
|
||||
ChatType: "group",
|
||||
From: "group:123",
|
||||
To: "telegram:123",
|
||||
Body: "[Telegram group:123] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
GroupSubject: "Telegram Group",
|
||||
SenderName: "Alice",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "slack channel",
|
||||
ctx: {
|
||||
Provider: "slack",
|
||||
Surface: "slack",
|
||||
ChatType: "channel",
|
||||
From: "slack:channel:C123",
|
||||
To: "channel:C123",
|
||||
Body: "[Slack #general] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
GroupSubject: "#general",
|
||||
SenderName: "Alice",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "discord channel",
|
||||
ctx: {
|
||||
Provider: "discord",
|
||||
Surface: "discord",
|
||||
ChatType: "channel",
|
||||
From: "group:123",
|
||||
To: "channel:123",
|
||||
Body: "[Discord #general] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
GroupSubject: "#general",
|
||||
SenderName: "Alice",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "signal dm",
|
||||
ctx: {
|
||||
Provider: "signal",
|
||||
Surface: "signal",
|
||||
ChatType: "direct",
|
||||
From: "signal:+15550001111",
|
||||
To: "signal:+15550002222",
|
||||
Body: "[Signal] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "imessage group",
|
||||
ctx: {
|
||||
Provider: "imessage",
|
||||
Surface: "imessage",
|
||||
ChatType: "group",
|
||||
From: "group:chat_id:123",
|
||||
To: "chat_id:123",
|
||||
Body: "[iMessage Group] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
GroupSubject: "iMessage Group",
|
||||
SenderName: "Alice",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "matrix channel",
|
||||
ctx: {
|
||||
Provider: "matrix",
|
||||
Surface: "matrix",
|
||||
ChatType: "channel",
|
||||
From: "matrix:channel:!room:example.org",
|
||||
To: "room:!room:example.org",
|
||||
Body: "[Matrix] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
GroupSubject: "#general",
|
||||
SenderName: "Alice",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "msteams channel",
|
||||
ctx: {
|
||||
Provider: "msteams",
|
||||
Surface: "msteams",
|
||||
ChatType: "channel",
|
||||
From: "msteams:channel:19:abc@thread.tacv2",
|
||||
To: "msteams:channel:19:abc@thread.tacv2",
|
||||
Body: "[Teams] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
GroupSubject: "Teams Channel",
|
||||
SenderName: "Alice",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "zalo dm",
|
||||
ctx: {
|
||||
Provider: "zalo",
|
||||
Surface: "zalo",
|
||||
ChatType: "direct",
|
||||
From: "zalo:123",
|
||||
To: "zalo:123",
|
||||
Body: "[Zalo] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "zalouser group",
|
||||
ctx: {
|
||||
Provider: "zalouser",
|
||||
Surface: "zalouser",
|
||||
ChatType: "group",
|
||||
From: "group:123",
|
||||
To: "zalouser:123",
|
||||
Body: "[Zalo Personal] hi",
|
||||
RawBody: "hi",
|
||||
CommandBody: "hi",
|
||||
GroupSubject: "Zalouser Group",
|
||||
SenderName: "Alice",
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
for (const entry of cases) {
|
||||
it(entry.name, () => {
|
||||
const ctx = finalizeInboundContext({ ...entry.ctx });
|
||||
expectInboundContextContract(ctx);
|
||||
});
|
||||
}
|
||||
});
|
||||
387
src/auto-reply/reply/line-directives.test.ts
Normal file
387
src/auto-reply/reply/line-directives.test.ts
Normal file
@@ -0,0 +1,387 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { hasLineDirectives, parseLineDirectives } from "./line-directives.js";
|
||||
|
||||
const getLineData = (result: ReturnType<typeof parseLineDirectives>) =>
|
||||
(result.channelData?.line as Record<string, unknown> | undefined) ?? {};
|
||||
|
||||
describe("hasLineDirectives", () => {
|
||||
it("matches expected detection across directive patterns", () => {
|
||||
const cases: Array<{ text: string; expected: boolean }> = [
|
||||
{ text: "Here are options [[quick_replies: A, B, C]]", expected: true },
|
||||
{ text: "[[location: Place | Address | 35.6 | 139.7]]", expected: true },
|
||||
{ text: "[[confirm: Continue? | Yes | No]]", expected: true },
|
||||
{ text: "[[buttons: Menu | Choose | Opt1:data1, Opt2:data2]]", expected: true },
|
||||
{ text: "Just regular text", expected: false },
|
||||
{ text: "[[not_a_directive: something]]", expected: false },
|
||||
{ text: "[[media_player: Song | Artist | Speaker]]", expected: true },
|
||||
{ text: "[[event: Meeting | Jan 24 | 2pm]]", expected: true },
|
||||
{ text: "[[agenda: Today | Meeting:9am, Lunch:12pm]]", expected: true },
|
||||
{ text: "[[device: TV | Room]]", expected: true },
|
||||
{ text: "[[appletv_remote: Apple TV | Playing]]", expected: true },
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
expect(hasLineDirectives(testCase.text)).toBe(testCase.expected);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("parseLineDirectives", () => {
|
||||
describe("quick_replies", () => {
|
||||
it("parses quick replies variants", () => {
|
||||
const cases: Array<{
|
||||
text: string;
|
||||
channelData?: { line: { quickReplies: string[] } };
|
||||
quickReplies: string[];
|
||||
outputText?: string;
|
||||
}> = [
|
||||
{
|
||||
text: "Choose one:\n[[quick_replies: Option A, Option B, Option C]]",
|
||||
quickReplies: ["Option A", "Option B", "Option C"],
|
||||
outputText: "Choose one:",
|
||||
},
|
||||
{
|
||||
text: "Before [[quick_replies: A, B]] After",
|
||||
quickReplies: ["A", "B"],
|
||||
outputText: "Before After",
|
||||
},
|
||||
{
|
||||
text: "Text [[quick_replies: C, D]]",
|
||||
channelData: { line: { quickReplies: ["A", "B"] } },
|
||||
quickReplies: ["A", "B", "C", "D"],
|
||||
outputText: "Text",
|
||||
},
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({
|
||||
text: testCase.text,
|
||||
channelData: testCase.channelData,
|
||||
});
|
||||
expect(getLineData(result).quickReplies).toEqual(testCase.quickReplies);
|
||||
if (testCase.outputText !== undefined) {
|
||||
expect(result.text).toBe(testCase.outputText);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("location", () => {
|
||||
it("parses location variants", () => {
|
||||
const existing = { title: "Existing", address: "Addr", latitude: 1, longitude: 2 };
|
||||
const cases: Array<{
|
||||
text: string;
|
||||
channelData?: { line: { location: typeof existing } };
|
||||
location?: typeof existing;
|
||||
outputText?: string;
|
||||
}> = [
|
||||
{
|
||||
text: "Here's the location:\n[[location: Tokyo Station | Tokyo, Japan | 35.6812 | 139.7671]]",
|
||||
location: {
|
||||
title: "Tokyo Station",
|
||||
address: "Tokyo, Japan",
|
||||
latitude: 35.6812,
|
||||
longitude: 139.7671,
|
||||
},
|
||||
outputText: "Here's the location:",
|
||||
},
|
||||
{
|
||||
text: "[[location: Place | Address | invalid | 139.7]]",
|
||||
location: undefined,
|
||||
},
|
||||
{
|
||||
text: "[[location: New | New Addr | 35.6 | 139.7]]",
|
||||
channelData: { line: { location: existing } },
|
||||
location: existing,
|
||||
},
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({
|
||||
text: testCase.text,
|
||||
channelData: testCase.channelData,
|
||||
});
|
||||
expect(getLineData(result).location).toEqual(testCase.location);
|
||||
if (testCase.outputText !== undefined) {
|
||||
expect(result.text).toBe(testCase.outputText);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("confirm", () => {
|
||||
it("parses confirm directives with default and custom action payloads", () => {
|
||||
const cases = [
|
||||
{
|
||||
name: "default yes/no data",
|
||||
text: "[[confirm: Delete this item? | Yes | No]]",
|
||||
expectedTemplate: {
|
||||
type: "confirm",
|
||||
text: "Delete this item?",
|
||||
confirmLabel: "Yes",
|
||||
confirmData: "yes",
|
||||
cancelLabel: "No",
|
||||
cancelData: "no",
|
||||
altText: "Delete this item?",
|
||||
},
|
||||
expectedText: undefined,
|
||||
},
|
||||
{
|
||||
name: "custom action data",
|
||||
text: "[[confirm: Proceed? | OK:action=confirm | Cancel:action=cancel]]",
|
||||
expectedTemplate: {
|
||||
type: "confirm",
|
||||
text: "Proceed?",
|
||||
confirmLabel: "OK",
|
||||
confirmData: "action=confirm",
|
||||
cancelLabel: "Cancel",
|
||||
cancelData: "action=cancel",
|
||||
altText: "Proceed?",
|
||||
},
|
||||
expectedText: undefined,
|
||||
},
|
||||
] as const;
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({ text: testCase.text });
|
||||
expect(getLineData(result).templateMessage, testCase.name).toEqual(
|
||||
testCase.expectedTemplate,
|
||||
);
|
||||
expect(result.text, testCase.name).toBe(testCase.expectedText);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("buttons", () => {
|
||||
it("parses message/uri/postback button actions and enforces action caps", () => {
|
||||
const cases = [
|
||||
{
|
||||
name: "message actions",
|
||||
text: "[[buttons: Menu | Select an option | Help:/help, Status:/status]]",
|
||||
expectedTemplate: {
|
||||
type: "buttons",
|
||||
title: "Menu",
|
||||
text: "Select an option",
|
||||
actions: [
|
||||
{ type: "message", label: "Help", data: "/help" },
|
||||
{ type: "message", label: "Status", data: "/status" },
|
||||
],
|
||||
altText: "Menu: Select an option",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "uri action",
|
||||
text: "[[buttons: Links | Visit us | Site:https://example.com]]",
|
||||
expectedFirstAction: {
|
||||
type: "uri",
|
||||
label: "Site",
|
||||
uri: "https://example.com",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "postback action",
|
||||
text: "[[buttons: Actions | Choose | Select:action=select&id=1]]",
|
||||
expectedFirstAction: {
|
||||
type: "postback",
|
||||
label: "Select",
|
||||
data: "action=select&id=1",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "action cap",
|
||||
text: "[[buttons: Menu | Text | A:a, B:b, C:c, D:d, E:e, F:f]]",
|
||||
expectedActionCount: 4,
|
||||
},
|
||||
] as const;
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({ text: testCase.text });
|
||||
const templateMessage = getLineData(result).templateMessage as {
|
||||
type?: string;
|
||||
actions?: Array<Record<string, unknown>>;
|
||||
};
|
||||
expect(templateMessage?.type, testCase.name).toBe("buttons");
|
||||
if ("expectedTemplate" in testCase) {
|
||||
expect(templateMessage, testCase.name).toEqual(testCase.expectedTemplate);
|
||||
}
|
||||
if ("expectedFirstAction" in testCase) {
|
||||
expect(templateMessage?.actions?.[0], testCase.name).toEqual(
|
||||
testCase.expectedFirstAction,
|
||||
);
|
||||
}
|
||||
if ("expectedActionCount" in testCase) {
|
||||
expect(templateMessage?.actions?.length, testCase.name).toBe(
|
||||
testCase.expectedActionCount,
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("media_player", () => {
|
||||
it("parses media_player directives across full/minimal/paused variants", () => {
|
||||
const cases = [
|
||||
{
|
||||
name: "all fields",
|
||||
text: "Now playing:\n[[media_player: Bohemian Rhapsody | Queen | Speaker | https://example.com/album.jpg | playing]]",
|
||||
expectedAltText: "🎵 Bohemian Rhapsody - Queen",
|
||||
expectedText: "Now playing:",
|
||||
expectFooter: true,
|
||||
expectBodyContents: false,
|
||||
},
|
||||
{
|
||||
name: "minimal",
|
||||
text: "[[media_player: Unknown Track]]",
|
||||
expectedAltText: "🎵 Unknown Track",
|
||||
expectedText: undefined,
|
||||
expectFooter: false,
|
||||
expectBodyContents: false,
|
||||
},
|
||||
{
|
||||
name: "paused status",
|
||||
text: "[[media_player: Song | Artist | Player | | paused]]",
|
||||
expectedAltText: undefined,
|
||||
expectedText: undefined,
|
||||
expectFooter: false,
|
||||
expectBodyContents: true,
|
||||
},
|
||||
] as const;
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({ text: testCase.text });
|
||||
const flexMessage = getLineData(result).flexMessage as {
|
||||
altText?: string;
|
||||
contents?: { footer?: { contents?: unknown[] }; body?: { contents?: unknown[] } };
|
||||
};
|
||||
expect(flexMessage, testCase.name).toBeDefined();
|
||||
if (testCase.expectedAltText !== undefined) {
|
||||
expect(flexMessage?.altText, testCase.name).toBe(testCase.expectedAltText);
|
||||
}
|
||||
if (testCase.expectedText !== undefined) {
|
||||
expect(result.text, testCase.name).toBe(testCase.expectedText);
|
||||
}
|
||||
if (testCase.expectFooter) {
|
||||
expect(flexMessage?.contents?.footer?.contents?.length, testCase.name).toBeGreaterThan(0);
|
||||
}
|
||||
if ("expectBodyContents" in testCase && testCase.expectBodyContents) {
|
||||
expect(flexMessage?.contents?.body?.contents, testCase.name).toBeDefined();
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("event", () => {
|
||||
it("parses event variants", () => {
|
||||
const cases = [
|
||||
{
|
||||
text: "[[event: Team Meeting | January 24, 2026 | 2:00 PM - 3:00 PM | Conference Room A | Discuss Q1 roadmap]]",
|
||||
altText: "📅 Team Meeting - January 24, 2026 2:00 PM - 3:00 PM",
|
||||
},
|
||||
{
|
||||
text: "[[event: Birthday Party | March 15]]",
|
||||
altText: "📅 Birthday Party - March 15",
|
||||
},
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({ text: testCase.text });
|
||||
const flexMessage = getLineData(result).flexMessage as { altText?: string };
|
||||
expect(flexMessage).toBeDefined();
|
||||
expect(flexMessage?.altText).toBe(testCase.altText);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("agenda", () => {
|
||||
it("parses agenda variants", () => {
|
||||
const cases = [
|
||||
{
|
||||
text: "[[agenda: Today's Schedule | Team Meeting:9:00 AM, Lunch:12:00 PM, Review:3:00 PM]]",
|
||||
altText: "📋 Today's Schedule (3 events)",
|
||||
},
|
||||
{
|
||||
text: "[[agenda: Tasks | Buy groceries, Call mom, Workout]]",
|
||||
altText: "📋 Tasks (3 events)",
|
||||
},
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({ text: testCase.text });
|
||||
const flexMessage = getLineData(result).flexMessage as { altText?: string };
|
||||
expect(flexMessage).toBeDefined();
|
||||
expect(flexMessage?.altText).toBe(testCase.altText);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("device", () => {
|
||||
it("parses device variants", () => {
|
||||
const cases = [
|
||||
{
|
||||
text: "[[device: TV | Streaming Box | Playing | Play/Pause:toggle, Menu:menu]]",
|
||||
altText: "📱 TV: Playing",
|
||||
},
|
||||
{
|
||||
text: "[[device: Speaker]]",
|
||||
altText: "📱 Speaker",
|
||||
},
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({ text: testCase.text });
|
||||
const flexMessage = getLineData(result).flexMessage as { altText?: string };
|
||||
expect(flexMessage).toBeDefined();
|
||||
expect(flexMessage?.altText).toBe(testCase.altText);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("appletv_remote", () => {
|
||||
it("parses appletv remote variants", () => {
|
||||
const cases = [
|
||||
{
|
||||
text: "[[appletv_remote: Apple TV | Playing]]",
|
||||
contains: "Apple TV",
|
||||
},
|
||||
{
|
||||
text: "[[appletv_remote: Apple TV]]",
|
||||
contains: undefined,
|
||||
},
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
const result = parseLineDirectives({ text: testCase.text });
|
||||
const flexMessage = getLineData(result).flexMessage as { altText?: string };
|
||||
expect(flexMessage).toBeDefined();
|
||||
if (testCase.contains) {
|
||||
expect(flexMessage?.altText).toContain(testCase.contains);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("combined directives", () => {
|
||||
it("handles text with no directives", () => {
|
||||
const result = parseLineDirectives({
|
||||
text: "Just plain text here",
|
||||
});
|
||||
|
||||
expect(result.text).toBe("Just plain text here");
|
||||
expect(getLineData(result).quickReplies).toBeUndefined();
|
||||
expect(getLineData(result).location).toBeUndefined();
|
||||
expect(getLineData(result).templateMessage).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves other payload fields", () => {
|
||||
const result = parseLineDirectives({
|
||||
text: "Hello [[quick_replies: A, B]]",
|
||||
mediaUrl: "https://example.com/image.jpg",
|
||||
replyToId: "msg123",
|
||||
});
|
||||
|
||||
expect(result.mediaUrl).toBe("https://example.com/image.jpg");
|
||||
expect(result.replyToId).toBe("msg123");
|
||||
expect(getLineData(result).quickReplies).toEqual(["A", "B"]);
|
||||
});
|
||||
});
|
||||
});
|
||||
338
src/auto-reply/reply/queue.collect.test.ts
Normal file
338
src/auto-reply/reply/queue.collect.test.ts
Normal file
@@ -0,0 +1,338 @@
|
||||
import { afterAll, beforeAll, describe, expect, it } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import type { FollowupRun, QueueSettings } from "./queue.js";
|
||||
import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js";
|
||||
|
||||
function createDeferred<T>() {
|
||||
let resolve!: (value: T) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
const promise = new Promise<T>((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
function createRun(params: {
|
||||
prompt: string;
|
||||
messageId?: string;
|
||||
originatingChannel?: FollowupRun["originatingChannel"];
|
||||
originatingTo?: string;
|
||||
originatingAccountId?: string;
|
||||
originatingThreadId?: string | number;
|
||||
}): FollowupRun {
|
||||
return {
|
||||
prompt: params.prompt,
|
||||
messageId: params.messageId,
|
||||
enqueuedAt: Date.now(),
|
||||
originatingChannel: params.originatingChannel,
|
||||
originatingTo: params.originatingTo,
|
||||
originatingAccountId: params.originatingAccountId,
|
||||
originatingThreadId: params.originatingThreadId,
|
||||
run: {
|
||||
agentId: "agent",
|
||||
agentDir: "/tmp",
|
||||
sessionId: "sess",
|
||||
sessionFile: "/tmp/session.json",
|
||||
workspaceDir: "/tmp",
|
||||
config: {} as OpenClawConfig,
|
||||
provider: "openai",
|
||||
model: "gpt-test",
|
||||
timeoutMs: 10_000,
|
||||
blockReplyBreak: "text_end",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
let previousRuntimeError: typeof defaultRuntime.error;
|
||||
|
||||
beforeAll(() => {
|
||||
previousRuntimeError = defaultRuntime.error;
|
||||
defaultRuntime.error = (() => {}) as typeof defaultRuntime.error;
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
defaultRuntime.error = previousRuntimeError;
|
||||
});
|
||||
|
||||
describe("followup queue collect routing", () => {
|
||||
it("does not collect when destinations differ", async () => {
|
||||
const key = `test-collect-diff-to-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const expectedCalls = 2;
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
if (calls.length >= expectedCalls) {
|
||||
done.resolve();
|
||||
}
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "one",
|
||||
originatingChannel: "slack",
|
||||
originatingTo: "channel:A",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "two",
|
||||
originatingChannel: "slack",
|
||||
originatingTo: "channel:B",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
expect(calls[0]?.prompt).toBe("one");
|
||||
expect(calls[1]?.prompt).toBe("two");
|
||||
});
|
||||
|
||||
it("collects when channel+destination match", async () => {
|
||||
const key = `test-collect-same-to-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "one",
|
||||
originatingChannel: "slack",
|
||||
originatingTo: "channel:A",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "two",
|
||||
originatingChannel: "slack",
|
||||
originatingTo: "channel:A",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]");
|
||||
expect(calls[0]?.originatingChannel).toBe("slack");
|
||||
expect(calls[0]?.originatingTo).toBe("channel:A");
|
||||
});
|
||||
|
||||
it("collects Slack messages in same thread and preserves string thread id", async () => {
|
||||
const key = `test-collect-slack-thread-same-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "one",
|
||||
originatingChannel: "slack",
|
||||
originatingTo: "channel:A",
|
||||
originatingThreadId: "1706000000.000001",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "two",
|
||||
originatingChannel: "slack",
|
||||
originatingTo: "channel:A",
|
||||
originatingThreadId: "1706000000.000001",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]");
|
||||
expect(calls[0]?.originatingThreadId).toBe("1706000000.000001");
|
||||
});
|
||||
|
||||
it("does not collect Slack messages when thread ids differ", async () => {
|
||||
const key = `test-collect-slack-thread-diff-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const expectedCalls = 2;
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
if (calls.length >= expectedCalls) {
|
||||
done.resolve();
|
||||
}
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "one",
|
||||
originatingChannel: "slack",
|
||||
originatingTo: "channel:A",
|
||||
originatingThreadId: "1706000000.000001",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "two",
|
||||
originatingChannel: "slack",
|
||||
originatingTo: "channel:A",
|
||||
originatingThreadId: "1706000000.000002",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
expect(calls[0]?.prompt).toBe("one");
|
||||
expect(calls[1]?.prompt).toBe("two");
|
||||
expect(calls[0]?.originatingThreadId).toBe("1706000000.000001");
|
||||
expect(calls[1]?.originatingThreadId).toBe("1706000000.000002");
|
||||
});
|
||||
|
||||
it("retries collect-mode batches without losing queued items", async () => {
|
||||
const key = `test-collect-retry-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
let attempt = 0;
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
attempt += 1;
|
||||
if (attempt === 1) {
|
||||
throw new Error("transient failure");
|
||||
}
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "one" }), settings);
|
||||
enqueueFollowupRun(key, createRun({ prompt: "two" }), settings);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
expect(calls[0]?.prompt).toContain("Queued #1\none");
|
||||
expect(calls[0]?.prompt).toContain("Queued #2\ntwo");
|
||||
});
|
||||
|
||||
it("retries overflow summary delivery without losing dropped previews", async () => {
|
||||
const key = `test-overflow-summary-retry-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
let attempt = 0;
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
attempt += 1;
|
||||
if (attempt === 1) {
|
||||
throw new Error("transient failure");
|
||||
}
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 1,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "first" }), settings);
|
||||
enqueueFollowupRun(key, createRun({ prompt: "second" }), settings);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
|
||||
expect(calls[0]?.prompt).toContain("- first");
|
||||
});
|
||||
|
||||
it("preserves routing metadata on overflow summary followups", async () => {
|
||||
const key = `test-overflow-summary-routing-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 1,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first",
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:C1",
|
||||
originatingAccountId: "work",
|
||||
originatingThreadId: "1739142736.000100",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "second",
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:C1",
|
||||
originatingAccountId: "work",
|
||||
originatingThreadId: "1739142736.000100",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
|
||||
expect(calls[0]?.originatingChannel).toBe("discord");
|
||||
expect(calls[0]?.originatingTo).toBe("channel:C1");
|
||||
expect(calls[0]?.originatingAccountId).toBe("work");
|
||||
expect(calls[0]?.originatingThreadId).toBe("1739142736.000100");
|
||||
expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
|
||||
});
|
||||
});
|
||||
389
src/auto-reply/reply/queue.dedupe.test.ts
Normal file
389
src/auto-reply/reply/queue.dedupe.test.ts
Normal file
@@ -0,0 +1,389 @@
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest";
|
||||
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import type { FollowupRun, QueueSettings } from "./queue.js";
|
||||
import {
|
||||
enqueueFollowupRun,
|
||||
resetRecentQueuedMessageIdDedupe,
|
||||
scheduleFollowupDrain,
|
||||
} from "./queue.js";
|
||||
|
||||
function createDeferred<T>() {
|
||||
let resolve!: (value: T) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
const promise = new Promise<T>((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
function createRun(params: {
|
||||
prompt: string;
|
||||
messageId?: string;
|
||||
originatingChannel?: FollowupRun["originatingChannel"];
|
||||
originatingTo?: string;
|
||||
originatingAccountId?: string;
|
||||
originatingThreadId?: string | number;
|
||||
}): FollowupRun {
|
||||
return {
|
||||
prompt: params.prompt,
|
||||
messageId: params.messageId,
|
||||
enqueuedAt: Date.now(),
|
||||
originatingChannel: params.originatingChannel,
|
||||
originatingTo: params.originatingTo,
|
||||
originatingAccountId: params.originatingAccountId,
|
||||
originatingThreadId: params.originatingThreadId,
|
||||
run: {
|
||||
agentId: "agent",
|
||||
agentDir: "/tmp",
|
||||
sessionId: "sess",
|
||||
sessionFile: "/tmp/session.json",
|
||||
workspaceDir: "/tmp",
|
||||
config: {} as OpenClawConfig,
|
||||
provider: "openai",
|
||||
model: "gpt-test",
|
||||
timeoutMs: 10_000,
|
||||
blockReplyBreak: "text_end",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
let previousRuntimeError: typeof defaultRuntime.error;
|
||||
|
||||
beforeAll(() => {
|
||||
previousRuntimeError = defaultRuntime.error;
|
||||
defaultRuntime.error = (() => {}) as typeof defaultRuntime.error;
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
defaultRuntime.error = previousRuntimeError;
|
||||
});
|
||||
|
||||
describe("followup queue deduplication", () => {
|
||||
beforeEach(() => {
|
||||
resetRecentQueuedMessageIdDedupe();
|
||||
});
|
||||
|
||||
it("deduplicates messages with same Discord message_id", async () => {
|
||||
const key = `test-dedup-message-id-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const expectedCalls = 1;
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
if (calls.length >= expectedCalls) {
|
||||
done.resolve();
|
||||
}
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
const first = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "[Discord Guild #test channel id:123] Hello",
|
||||
messageId: "m1",
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:123",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(first).toBe(true);
|
||||
|
||||
const second = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "[Discord Guild #test channel id:123] Hello (dupe)",
|
||||
messageId: "m1",
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:123",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(second).toBe(false);
|
||||
|
||||
const third = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "[Discord Guild #test channel id:123] World",
|
||||
messageId: "m2",
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:123",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(third).toBe(true);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]");
|
||||
});
|
||||
|
||||
it("deduplicates same message_id after queue drain restarts", async () => {
|
||||
const key = `test-dedup-after-drain-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
const first = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first",
|
||||
messageId: "same-id",
|
||||
originatingChannel: "signal",
|
||||
originatingTo: "+10000000000",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(first).toBe(true);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
|
||||
const redelivery = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first-redelivery",
|
||||
messageId: "same-id",
|
||||
originatingChannel: "signal",
|
||||
originatingTo: "+10000000000",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
|
||||
expect(redelivery).toBe(false);
|
||||
expect(calls).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("deduplicates same message_id across distinct enqueue module instances", async () => {
|
||||
const enqueueA = await importFreshModule<typeof import("./queue/enqueue.js")>(
|
||||
import.meta.url,
|
||||
"./queue/enqueue.js?scope=dedupe-a",
|
||||
);
|
||||
const enqueueB = await importFreshModule<typeof import("./queue/enqueue.js")>(
|
||||
import.meta.url,
|
||||
"./queue/enqueue.js?scope=dedupe-b",
|
||||
);
|
||||
const { clearSessionQueues } = await import("./queue.js");
|
||||
const key = `test-dedup-cross-module-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueA.resetRecentQueuedMessageIdDedupe();
|
||||
enqueueB.resetRecentQueuedMessageIdDedupe();
|
||||
|
||||
try {
|
||||
expect(
|
||||
enqueueA.enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first",
|
||||
messageId: "same-id",
|
||||
originatingChannel: "signal",
|
||||
originatingTo: "+10000000000",
|
||||
}),
|
||||
settings,
|
||||
),
|
||||
).toBe(true);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(
|
||||
enqueueB.enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first-redelivery",
|
||||
messageId: "same-id",
|
||||
originatingChannel: "signal",
|
||||
originatingTo: "+10000000000",
|
||||
}),
|
||||
settings,
|
||||
),
|
||||
).toBe(false);
|
||||
expect(calls).toHaveLength(1);
|
||||
} finally {
|
||||
clearSessionQueues([key]);
|
||||
enqueueA.resetRecentQueuedMessageIdDedupe();
|
||||
enqueueB.resetRecentQueuedMessageIdDedupe();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not collide recent message-id keys when routing contains delimiters", async () => {
|
||||
const key = `test-dedup-key-collision-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
const first = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first",
|
||||
messageId: "same-id",
|
||||
originatingChannel: "signal|group",
|
||||
originatingTo: "peer",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(first).toBe(true);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
|
||||
const second = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "second",
|
||||
messageId: "same-id",
|
||||
originatingChannel: "signal",
|
||||
originatingTo: "group|peer",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(second).toBe(true);
|
||||
});
|
||||
|
||||
it("deduplicates exact prompt when routing matches and no message id", async () => {
|
||||
const key = `test-dedup-whatsapp-${Date.now()}`;
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
const first = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "Hello world",
|
||||
originatingChannel: "whatsapp",
|
||||
originatingTo: "+1234567890",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(first).toBe(true);
|
||||
|
||||
const second = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "Hello world",
|
||||
originatingChannel: "whatsapp",
|
||||
originatingTo: "+1234567890",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(second).toBe(true);
|
||||
|
||||
const third = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "Hello world 2",
|
||||
originatingChannel: "whatsapp",
|
||||
originatingTo: "+1234567890",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(third).toBe(true);
|
||||
});
|
||||
|
||||
it("does not deduplicate across different providers without message id", async () => {
|
||||
const key = `test-dedup-cross-provider-${Date.now()}`;
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
const first = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "Same text",
|
||||
originatingChannel: "whatsapp",
|
||||
originatingTo: "+1234567890",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(first).toBe(true);
|
||||
|
||||
const second = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "Same text",
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:123",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
expect(second).toBe(true);
|
||||
});
|
||||
|
||||
it("can opt-in to prompt-based dedupe when message id is absent", async () => {
|
||||
const key = `test-dedup-prompt-mode-${Date.now()}`;
|
||||
const settings: QueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 50,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
const first = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "Hello world",
|
||||
originatingChannel: "whatsapp",
|
||||
originatingTo: "+1234567890",
|
||||
}),
|
||||
settings,
|
||||
"prompt",
|
||||
);
|
||||
expect(first).toBe(true);
|
||||
|
||||
const second = enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "Hello world",
|
||||
originatingChannel: "whatsapp",
|
||||
originatingTo: "+1234567890",
|
||||
}),
|
||||
settings,
|
||||
"prompt",
|
||||
);
|
||||
expect(second).toBe(false);
|
||||
});
|
||||
});
|
||||
331
src/auto-reply/reply/queue.drain-restart.test.ts
Normal file
331
src/auto-reply/reply/queue.drain-restart.test.ts
Normal file
@@ -0,0 +1,331 @@
|
||||
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import type { FollowupRun, QueueSettings } from "./queue.js";
|
||||
import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js";
|
||||
|
||||
function createDeferred<T>() {
|
||||
let resolve!: (value: T) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
const promise = new Promise<T>((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
function createRun(params: {
|
||||
prompt: string;
|
||||
messageId?: string;
|
||||
originatingChannel?: FollowupRun["originatingChannel"];
|
||||
originatingTo?: string;
|
||||
originatingAccountId?: string;
|
||||
originatingThreadId?: string | number;
|
||||
}): FollowupRun {
|
||||
return {
|
||||
prompt: params.prompt,
|
||||
messageId: params.messageId,
|
||||
enqueuedAt: Date.now(),
|
||||
originatingChannel: params.originatingChannel,
|
||||
originatingTo: params.originatingTo,
|
||||
originatingAccountId: params.originatingAccountId,
|
||||
originatingThreadId: params.originatingThreadId,
|
||||
run: {
|
||||
agentId: "agent",
|
||||
agentDir: "/tmp",
|
||||
sessionId: "sess",
|
||||
sessionFile: "/tmp/session.json",
|
||||
workspaceDir: "/tmp",
|
||||
config: {} as OpenClawConfig,
|
||||
provider: "openai",
|
||||
model: "gpt-test",
|
||||
timeoutMs: 10_000,
|
||||
blockReplyBreak: "text_end",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
let previousRuntimeError: typeof defaultRuntime.error;
|
||||
|
||||
beforeAll(() => {
|
||||
previousRuntimeError = defaultRuntime.error;
|
||||
defaultRuntime.error = (() => {}) as typeof defaultRuntime.error;
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
defaultRuntime.error = previousRuntimeError;
|
||||
});
|
||||
|
||||
describe("followup queue drain restart after idle window", () => {
|
||||
it("does not retain stale callbacks when scheduleFollowupDrain runs with an empty queue", async () => {
|
||||
const key = `test-no-stale-callback-${Date.now()}`;
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
const staleCalls: FollowupRun[] = [];
|
||||
const freshCalls: FollowupRun[] = [];
|
||||
const drained = createDeferred<void>();
|
||||
|
||||
scheduleFollowupDrain(key, async (run) => {
|
||||
staleCalls.push(run);
|
||||
});
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "after-empty-schedule" }), settings);
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
expect(staleCalls).toHaveLength(0);
|
||||
|
||||
scheduleFollowupDrain(key, async (run) => {
|
||||
freshCalls.push(run);
|
||||
drained.resolve();
|
||||
});
|
||||
await drained.promise;
|
||||
|
||||
expect(staleCalls).toHaveLength(0);
|
||||
expect(freshCalls).toHaveLength(1);
|
||||
expect(freshCalls[0]?.prompt).toBe("after-empty-schedule");
|
||||
});
|
||||
|
||||
it("processes a message enqueued after the drain empties when enqueue refreshes the callback", async () => {
|
||||
const key = `test-idle-window-race-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
|
||||
const firstProcessed = createDeferred<void>();
|
||||
const secondProcessed = createDeferred<void>();
|
||||
let callCount = 0;
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
callCount++;
|
||||
calls.push(run);
|
||||
if (callCount === 1) {
|
||||
firstProcessed.resolve();
|
||||
}
|
||||
if (callCount === 2) {
|
||||
secondProcessed.resolve();
|
||||
}
|
||||
};
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings);
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await firstProcessed.promise;
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({ prompt: "after-idle" }),
|
||||
settings,
|
||||
"message-id",
|
||||
runFollowup,
|
||||
);
|
||||
|
||||
await secondProcessed.promise;
|
||||
|
||||
expect(calls).toHaveLength(2);
|
||||
expect(calls[0]?.prompt).toBe("before-idle");
|
||||
expect(calls[1]?.prompt).toBe("after-idle");
|
||||
});
|
||||
|
||||
it("restarts an idle drain with the newest followup callback", async () => {
|
||||
const key = `test-idle-window-fresh-callback-${Date.now()}`;
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
const staleCalls: FollowupRun[] = [];
|
||||
const freshCalls: FollowupRun[] = [];
|
||||
const firstProcessed = createDeferred<void>();
|
||||
const secondProcessed = createDeferred<void>();
|
||||
|
||||
const staleFollowup = async (run: FollowupRun) => {
|
||||
staleCalls.push(run);
|
||||
if (staleCalls.length === 1) {
|
||||
firstProcessed.resolve();
|
||||
}
|
||||
};
|
||||
const freshFollowup = async (run: FollowupRun) => {
|
||||
freshCalls.push(run);
|
||||
secondProcessed.resolve();
|
||||
};
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings);
|
||||
scheduleFollowupDrain(key, staleFollowup);
|
||||
await firstProcessed.promise;
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({ prompt: "after-idle" }),
|
||||
settings,
|
||||
"message-id",
|
||||
freshFollowup,
|
||||
);
|
||||
await secondProcessed.promise;
|
||||
|
||||
expect(staleCalls).toHaveLength(1);
|
||||
expect(staleCalls[0]?.prompt).toBe("before-idle");
|
||||
expect(freshCalls).toHaveLength(1);
|
||||
expect(freshCalls[0]?.prompt).toBe("after-idle");
|
||||
});
|
||||
|
||||
it("does not auto-start a drain when a busy run only refreshes the callback", async () => {
|
||||
const key = `test-busy-run-refreshes-callback-${Date.now()}`;
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
const staleCalls: FollowupRun[] = [];
|
||||
const freshCalls: FollowupRun[] = [];
|
||||
|
||||
const staleFollowup = async (run: FollowupRun) => {
|
||||
staleCalls.push(run);
|
||||
};
|
||||
const freshFollowup = async (run: FollowupRun) => {
|
||||
freshCalls.push(run);
|
||||
};
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({ prompt: "queued-while-busy" }),
|
||||
settings,
|
||||
"message-id",
|
||||
freshFollowup,
|
||||
false,
|
||||
);
|
||||
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
expect(freshCalls).toHaveLength(0);
|
||||
|
||||
scheduleFollowupDrain(key, staleFollowup);
|
||||
await vi.waitFor(() => {
|
||||
expect(freshCalls).toHaveLength(1);
|
||||
});
|
||||
|
||||
expect(staleCalls).toHaveLength(0);
|
||||
expect(freshCalls[0]?.prompt).toBe("queued-while-busy");
|
||||
});
|
||||
|
||||
it("restarts an idle drain across distinct enqueue and drain module instances when enqueue refreshes the callback", async () => {
|
||||
const drainA = await importFreshModule<typeof import("./queue/drain.js")>(
|
||||
import.meta.url,
|
||||
"./queue/drain.js?scope=restart-a",
|
||||
);
|
||||
const enqueueB = await importFreshModule<typeof import("./queue/enqueue.js")>(
|
||||
import.meta.url,
|
||||
"./queue/enqueue.js?scope=restart-b",
|
||||
);
|
||||
const { clearSessionQueues } = await import("./queue.js");
|
||||
const key = `test-idle-window-cross-module-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
const firstProcessed = createDeferred<void>();
|
||||
|
||||
enqueueB.resetRecentQueuedMessageIdDedupe();
|
||||
|
||||
try {
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
if (calls.length === 1) {
|
||||
firstProcessed.resolve();
|
||||
}
|
||||
};
|
||||
|
||||
enqueueB.enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings);
|
||||
drainA.scheduleFollowupDrain(key, runFollowup);
|
||||
await firstProcessed.promise;
|
||||
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
enqueueB.enqueueFollowupRun(
|
||||
key,
|
||||
createRun({ prompt: "after-idle" }),
|
||||
settings,
|
||||
"message-id",
|
||||
runFollowup,
|
||||
);
|
||||
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(calls).toHaveLength(2);
|
||||
},
|
||||
{ timeout: 1_000 },
|
||||
);
|
||||
|
||||
expect(calls[0]?.prompt).toBe("before-idle");
|
||||
expect(calls[1]?.prompt).toBe("after-idle");
|
||||
} finally {
|
||||
clearSessionQueues([key]);
|
||||
drainA.clearFollowupDrainCallback(key);
|
||||
enqueueB.resetRecentQueuedMessageIdDedupe();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not double-drain when a message arrives while drain is still running", async () => {
|
||||
const key = `test-no-double-drain-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
|
||||
const allProcessed = createDeferred<void>();
|
||||
let runFollowupResolve!: () => void;
|
||||
const runFollowupGate = new Promise<void>((res) => {
|
||||
runFollowupResolve = res;
|
||||
});
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
await runFollowupGate;
|
||||
calls.push(run);
|
||||
if (calls.length >= 2) {
|
||||
allProcessed.resolve();
|
||||
}
|
||||
};
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "first" }), settings);
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
enqueueFollowupRun(key, createRun({ prompt: "second" }), settings);
|
||||
runFollowupResolve();
|
||||
|
||||
await allProcessed.promise;
|
||||
expect(calls).toHaveLength(2);
|
||||
expect(calls[0]?.prompt).toBe("first");
|
||||
expect(calls[1]?.prompt).toBe("second");
|
||||
});
|
||||
|
||||
it("does not process messages after clearSessionQueues clears the callback", async () => {
|
||||
const key = `test-clear-callback-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
|
||||
const firstProcessed = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
firstProcessed.resolve();
|
||||
};
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "before-clear" }), settings);
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await firstProcessed.promise;
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
const { clearSessionQueues } = await import("./queue.js");
|
||||
clearSessionQueues([key]);
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "after-clear" }), settings);
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(calls).toHaveLength(1);
|
||||
expect(calls[0]?.prompt).toBe("before-clear");
|
||||
});
|
||||
|
||||
it("clears the remembered callback after a queue drains fully", async () => {
|
||||
const key = `test-auto-clear-callback-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
|
||||
const firstProcessed = createDeferred<void>();
|
||||
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
firstProcessed.resolve();
|
||||
};
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings);
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await firstProcessed.promise;
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
enqueueFollowupRun(key, createRun({ prompt: "after-idle" }), settings);
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(calls).toHaveLength(1);
|
||||
expect(calls[0]?.prompt).toBe("before-idle");
|
||||
});
|
||||
});
|
||||
File diff suppressed because it is too large
Load Diff
220
src/auto-reply/reply/slack-directives.test.ts
Normal file
220
src/auto-reply/reply/slack-directives.test.ts
Normal file
@@ -0,0 +1,220 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { hasSlackDirectives, parseSlackDirectives } from "./slack-directives.js";
|
||||
|
||||
const getSlackInteractive = (result: ReturnType<typeof parseSlackDirectives>) =>
|
||||
result.interactive?.blocks ?? [];
|
||||
|
||||
describe("hasSlackDirectives", () => {
|
||||
it("matches expected detection across Slack directive patterns", () => {
|
||||
const cases: Array<{ text: string; expected: boolean }> = [
|
||||
{ text: "Pick one [[slack_buttons: Approve:approve, Reject:reject]]", expected: true },
|
||||
{
|
||||
text: "[[slack_select: Choose a project | Alpha:alpha, Beta:beta]]",
|
||||
expected: true,
|
||||
},
|
||||
{ text: "Just regular text", expected: false },
|
||||
{ text: "[[buttons: Menu | Choose | A:a]]", expected: false },
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
expect(hasSlackDirectives(testCase.text)).toBe(testCase.expected);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("parseSlackDirectives", () => {
|
||||
it("builds shared text and button blocks from slack_buttons directives", () => {
|
||||
const result = parseSlackDirectives({
|
||||
text: "Choose an action [[slack_buttons: Approve:approve, Reject:reject]]",
|
||||
});
|
||||
|
||||
expect(result.text).toBe("Choose an action");
|
||||
expect(getSlackInteractive(result)).toEqual([
|
||||
{
|
||||
type: "text",
|
||||
text: "Choose an action",
|
||||
},
|
||||
{
|
||||
type: "buttons",
|
||||
buttons: [
|
||||
{
|
||||
label: "Approve",
|
||||
value: "approve",
|
||||
},
|
||||
{
|
||||
label: "Reject",
|
||||
value: "reject",
|
||||
},
|
||||
],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("builds shared select blocks from slack_select directives", () => {
|
||||
const result = parseSlackDirectives({
|
||||
text: "[[slack_select: Choose a project | Alpha:alpha, Beta:beta]]",
|
||||
});
|
||||
|
||||
expect(result.text).toBeUndefined();
|
||||
expect(getSlackInteractive(result)).toEqual([
|
||||
{
|
||||
type: "select",
|
||||
placeholder: "Choose a project",
|
||||
options: [
|
||||
{ label: "Alpha", value: "alpha" },
|
||||
{ label: "Beta", value: "beta" },
|
||||
],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("leaves existing slack blocks in channelData and appends shared interactive blocks", () => {
|
||||
const result = parseSlackDirectives({
|
||||
text: "Act now [[slack_buttons: Retry:retry]]",
|
||||
channelData: {
|
||||
slack: {
|
||||
blocks: [{ type: "divider" }],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(result.text).toBe("Act now");
|
||||
expect(result.channelData).toEqual({
|
||||
slack: {
|
||||
blocks: [{ type: "divider" }],
|
||||
},
|
||||
});
|
||||
expect(getSlackInteractive(result)).toEqual([
|
||||
{
|
||||
type: "text",
|
||||
text: "Act now",
|
||||
},
|
||||
{
|
||||
type: "buttons",
|
||||
buttons: [{ label: "Retry", value: "retry" }],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("preserves authored order for mixed Slack directives", () => {
|
||||
const result = parseSlackDirectives({
|
||||
text: "[[slack_select: Pick one | Alpha:alpha]] then [[slack_buttons: Retry:retry]]",
|
||||
});
|
||||
|
||||
expect(getSlackInteractive(result)).toEqual([
|
||||
{
|
||||
type: "select",
|
||||
placeholder: "Pick one",
|
||||
options: [{ label: "Alpha", value: "alpha" }],
|
||||
},
|
||||
{
|
||||
type: "text",
|
||||
text: "then",
|
||||
},
|
||||
{
|
||||
type: "buttons",
|
||||
buttons: [{ label: "Retry", value: "retry" }],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("preserves long Slack directive values in the shared interactive model", () => {
|
||||
const long = "x".repeat(120);
|
||||
const result = parseSlackDirectives({
|
||||
text: `${"y".repeat(3100)} [[slack_select: ${long} | ${long}:${long}]] [[slack_buttons: ${long}:${long}]]`,
|
||||
});
|
||||
|
||||
expect(getSlackInteractive(result)).toEqual([
|
||||
{
|
||||
type: "text",
|
||||
text: "y".repeat(3100),
|
||||
},
|
||||
{
|
||||
type: "select",
|
||||
placeholder: long,
|
||||
options: [{ label: long, value: long }],
|
||||
},
|
||||
{
|
||||
type: "buttons",
|
||||
buttons: [{ label: long, value: long }],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("parses optional Slack button styles without truncating callback values", () => {
|
||||
const result = parseSlackDirectives({
|
||||
text: "[[slack_buttons: Approve:pluginbind:approval-123:o:primary, Reject:deny:danger, Skip:skip:secondary]]",
|
||||
});
|
||||
|
||||
expect(getSlackInteractive(result)).toEqual([
|
||||
{
|
||||
type: "buttons",
|
||||
buttons: [
|
||||
{
|
||||
label: "Approve",
|
||||
value: "pluginbind:approval-123:o",
|
||||
style: "primary",
|
||||
},
|
||||
{
|
||||
label: "Reject",
|
||||
value: "deny",
|
||||
style: "danger",
|
||||
},
|
||||
{
|
||||
label: "Skip",
|
||||
value: "skip",
|
||||
style: "secondary",
|
||||
},
|
||||
],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("preserves slack_select values that end in style-like suffixes", () => {
|
||||
const result = parseSlackDirectives({
|
||||
text: "[[slack_select: Choose one | Queue:queue:danger, Archive:archive:primary]]",
|
||||
});
|
||||
|
||||
expect(getSlackInteractive(result)).toEqual([
|
||||
{
|
||||
type: "select",
|
||||
placeholder: "Choose one",
|
||||
options: [
|
||||
{
|
||||
label: "Queue",
|
||||
value: "queue:danger",
|
||||
},
|
||||
{
|
||||
label: "Archive",
|
||||
value: "archive:primary",
|
||||
},
|
||||
],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("keeps existing interactive blocks when compiling additional Slack directives", () => {
|
||||
const result = parseSlackDirectives({
|
||||
text: "Choose [[slack_buttons: Retry:retry]]",
|
||||
interactive: {
|
||||
blocks: [{ type: "text", text: "Existing" }],
|
||||
},
|
||||
});
|
||||
|
||||
expect(getSlackInteractive(result)).toEqual([
|
||||
{ type: "text", text: "Existing" },
|
||||
{ type: "text", text: "Choose" },
|
||||
{ type: "buttons", buttons: [{ label: "Retry", value: "retry" }] },
|
||||
]);
|
||||
});
|
||||
|
||||
it("ignores malformed directive choices when none remain", () => {
|
||||
const result = parseSlackDirectives({
|
||||
text: "Choose [[slack_buttons: : , : ]]",
|
||||
});
|
||||
|
||||
expect(result).toEqual({
|
||||
text: "Choose [[slack_buttons: : , : ]]",
|
||||
});
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user