Files
openclaw/src/infra/session-delivery-queue.storage.test.ts
2026-05-09 05:48:53 +01:00

96 lines
3.3 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
ackSessionDelivery,
enqueueSessionDelivery,
failSessionDelivery,
loadPendingSessionDeliveries,
resolveSessionDeliveryQueueDir,
} from "./session-delivery-queue.js";
describe("session-delivery queue storage", () => {
it("dedupes entries when an idempotency key is reused", async () => {
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
const firstId = await enqueueSessionDelivery(
{
kind: "agentTurn",
sessionKey: "agent:main:main",
message: "continue after restart",
messageId: "restart-sentinel:agent:main:main:agentTurn:123",
idempotencyKey: "restart-sentinel:agent:main:main:agentTurn:123",
},
tempDir,
);
const secondId = await enqueueSessionDelivery(
{
kind: "agentTurn",
sessionKey: "agent:main:main",
message: "continue after restart",
messageId: "restart-sentinel:agent:main:main:agentTurn:123",
idempotencyKey: "restart-sentinel:agent:main:main:agentTurn:123",
},
tempDir,
);
expect(secondId).toBe(firstId);
expect(await loadPendingSessionDeliveries(tempDir)).toHaveLength(1);
});
});
it("persists retry metadata and removes acked entries", async () => {
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
const id = await enqueueSessionDelivery(
{
kind: "systemEvent",
sessionKey: "agent:main:main",
text: "restart complete",
},
tempDir,
);
await failSessionDelivery(id, "dispatch failed", tempDir);
const [failedEntry] = await loadPendingSessionDeliveries(tempDir);
expect(failedEntry?.retryCount).toBe(1);
expect(failedEntry?.lastError).toBe("dispatch failed");
await ackSessionDelivery(id, tempDir);
expect(await loadPendingSessionDeliveries(tempDir)).toStrictEqual([]);
});
});
it("cleans up orphaned temporary queue files during load", async () => {
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
await enqueueSessionDelivery(
{
kind: "systemEvent",
sessionKey: "agent:main:main",
text: "restart complete",
},
tempDir,
);
const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "orphan-entry.tmp");
fs.writeFileSync(tmpPath, "stale tmp");
const staleAt = new Date(Date.now() - 60_000);
fs.utimesSync(tmpPath, staleAt, staleAt);
await loadPendingSessionDeliveries(tempDir);
expect(fs.existsSync(tmpPath)).toBe(false);
});
});
it("keeps fresh temporary queue files while a write may still be in flight", async () => {
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "active-entry.tmp");
fs.mkdirSync(path.dirname(tmpPath), { recursive: true });
fs.writeFileSync(tmpPath, "active tmp");
await loadPendingSessionDeliveries(tempDir);
expect(fs.existsSync(tmpPath)).toBe(true);
});
});
});