diff --git a/scripts/generate-base-config-schema.ts b/scripts/generate-base-config-schema.ts index c1817b30f42..4c0b1ed8c4f 100644 --- a/scripts/generate-base-config-schema.ts +++ b/scripts/generate-base-config-schema.ts @@ -1,9 +1,9 @@ #!/usr/bin/env node -import { spawnSync } from "node:child_process"; import fs from "node:fs"; import path from "node:path"; import { fileURLToPath } from "node:url"; import { computeBaseConfigSchemaResponse } from "../src/config/schema-base.js"; +import { formatGeneratedModule } from "./lib/format-generated-module.mjs"; const GENERATED_BY = "scripts/generate-base-config-schema.ts"; const DEFAULT_OUTPUT_PATH = "src/config/schema.base.generated.ts"; @@ -18,26 +18,11 @@ function readIfExists(filePath: string): string | null { function formatTypeScriptModule(source: string, outputPath: string): string { const repoRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), ".."); - const formatter = spawnSync( - process.platform === "win32" ? "pnpm" : "pnpm", - ["exec", "oxfmt", "--stdin-filepath", outputPath], - { - cwd: repoRoot, - input: source, - encoding: "utf8", - // Windows requires a shell to launch package-manager shim scripts reliably. - ...(process.platform === "win32" ? { shell: true } : {}), - }, - ); - if (formatter.status !== 0) { - const details = - formatter.stderr?.trim() || - formatter.stdout?.trim() || - formatter.error?.message || - "unknown formatter failure"; - throw new Error(`failed to format generated base config schema: ${details}`); - } - return formatter.stdout; + return formatGeneratedModule(source, { + repoRoot, + outputPath, + errorLabel: "base config schema", + }); } export function renderBaseConfigSchemaModule(params?: { generatedAt?: string }): string { diff --git a/scripts/generate-bundled-plugin-metadata.mjs b/scripts/generate-bundled-plugin-metadata.mjs index 7dd67308acd..e214c5a7061 100644 --- a/scripts/generate-bundled-plugin-metadata.mjs +++ b/scripts/generate-bundled-plugin-metadata.mjs @@ -1,14 +1,13 @@ -import { spawnSync } from "node:child_process"; import fs from "node:fs"; import path from "node:path"; import { pathToFileURL } from "node:url"; +import { formatGeneratedModule } from "./lib/format-generated-module.mjs"; import { writeTextFileIfChanged } from "./runtime-postbuild-shared.mjs"; const GENERATED_BY = "scripts/generate-bundled-plugin-metadata.mjs"; const DEFAULT_OUTPUT_PATH = "src/plugins/bundled-plugin-metadata.generated.ts"; const MANIFEST_KEY = "openclaw"; const FORMATTER_CWD = path.resolve(import.meta.dirname, ".."); -const OXFMT_BIN = path.join(FORMATTER_CWD, "node_modules", ".bin", "oxfmt"); const CANONICAL_PACKAGE_ID_ALIASES = { "elevenlabs-speech": "elevenlabs", "microsoft-speech": "microsoft", @@ -128,28 +127,11 @@ function normalizePluginManifest(raw) { } function formatTypeScriptModule(source, { outputPath }) { - const formatterPath = path.relative(FORMATTER_CWD, outputPath) || outputPath; - const useDirectFormatter = process.platform !== "win32" && fs.existsSync(OXFMT_BIN); - const command = useDirectFormatter ? OXFMT_BIN : "pnpm"; - const args = useDirectFormatter - ? ["--stdin-filepath", formatterPath] - : ["exec", "oxfmt", "--stdin-filepath", formatterPath]; - const formatter = spawnSync(command, args, { - cwd: FORMATTER_CWD, - input: source, - encoding: "utf8", - // Windows requires a shell to launch package-manager shim scripts reliably. - ...(process.platform === "win32" ? { shell: true } : {}), + return formatGeneratedModule(source, { + repoRoot: FORMATTER_CWD, + outputPath, + errorLabel: "bundled plugin metadata", }); - if (formatter.status !== 0) { - const details = - formatter.stderr?.trim() || - formatter.stdout?.trim() || - formatter.error?.message || - "unknown formatter failure"; - throw new Error(`failed to format generated bundled plugin metadata: ${details}`); - } - return formatter.stdout; } export function collectBundledPluginMetadata(params = {}) { diff --git a/scripts/lib/format-generated-module.mjs b/scripts/lib/format-generated-module.mjs new file mode 100644 index 00000000000..bfd65997d63 --- /dev/null +++ b/scripts/lib/format-generated-module.mjs @@ -0,0 +1,34 @@ +import { spawnSync } from "node:child_process"; +import fs from "node:fs"; +import path from "node:path"; + +export function formatGeneratedModule(source, { repoRoot, outputPath, errorLabel }) { + const resolvedRepoRoot = path.resolve(repoRoot); + const resolvedOutputPath = path.resolve( + resolvedRepoRoot, + path.isAbsolute(outputPath) ? path.relative(resolvedRepoRoot, outputPath) : outputPath, + ); + const formatterPath = path.relative(resolvedRepoRoot, resolvedOutputPath) || resolvedOutputPath; + const directFormatterPath = path.join(resolvedRepoRoot, "node_modules", ".bin", "oxfmt"); + const useDirectFormatter = process.platform !== "win32" && fs.existsSync(directFormatterPath); + const command = useDirectFormatter ? directFormatterPath : "pnpm"; + const args = useDirectFormatter + ? ["--stdin-filepath", formatterPath] + : ["exec", "oxfmt", "--stdin-filepath", formatterPath]; + const formatter = spawnSync(command, args, { + cwd: resolvedRepoRoot, + input: source, + encoding: "utf8", + // Windows requires a shell to launch package-manager shim scripts reliably. + ...(process.platform === "win32" ? { shell: true } : {}), + }); + if (formatter.status !== 0) { + const details = + formatter.stderr?.trim() || + formatter.stdout?.trim() || + formatter.error?.message || + "unknown formatter failure"; + throw new Error(`failed to format generated ${errorLabel}: ${details}`); + } + return formatter.stdout; +} diff --git a/src/infra/outbound/delivery-queue-recovery.ts b/src/infra/outbound/delivery-queue-recovery.ts new file mode 100644 index 00000000000..390c53eeb5f --- /dev/null +++ b/src/infra/outbound/delivery-queue-recovery.ts @@ -0,0 +1,220 @@ +import type { OpenClawConfig } from "../../config/config.js"; +import { + ackDelivery, + failDelivery, + loadPendingDeliveries, + moveToFailed, + type QueuedDelivery, + type QueuedDeliveryPayload, +} from "./delivery-queue-storage.js"; + +export type RecoverySummary = { + recovered: number; + failed: number; + skippedMaxRetries: number; + deferredBackoff: number; +}; + +export type DeliverFn = ( + params: { + cfg: OpenClawConfig; + } & QueuedDeliveryPayload & { + skipQueue?: boolean; + }, +) => Promise; + +export interface RecoveryLogger { + info(msg: string): void; + warn(msg: string): void; + error(msg: string): void; +} + +const MAX_RETRIES = 5; + +/** Backoff delays in milliseconds indexed by retry count (1-based). */ +const BACKOFF_MS: readonly number[] = [ + 5_000, // retry 1: 5s + 25_000, // retry 2: 25s + 120_000, // retry 3: 2m + 600_000, // retry 4: 10m +]; + +const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [ + /no conversation reference found/i, + /chat not found/i, + /user not found/i, + /bot was blocked by the user/i, + /forbidden: bot was kicked/i, + /chat_id is empty/i, + /recipient is not a valid/i, + /outbound not configured for channel/i, + /ambiguous discord recipient/i, +]; + +function createEmptyRecoverySummary(): RecoverySummary { + return { + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }; +} + +function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) { + return { + cfg, + channel: entry.channel, + to: entry.to, + accountId: entry.accountId, + payloads: entry.payloads, + threadId: entry.threadId, + replyToId: entry.replyToId, + bestEffort: entry.bestEffort, + gifPlayback: entry.gifPlayback, + forceDocument: entry.forceDocument, + silent: entry.silent, + mirror: entry.mirror, + skipQueue: true, // Prevent re-enqueueing during recovery. + } satisfies Parameters[0]; +} + +async function moveEntryToFailedWithLogging( + entryId: string, + log: RecoveryLogger, + stateDir?: string, +): Promise { + try { + await moveToFailed(entryId, stateDir); + } catch (err) { + log.error(`Failed to move entry ${entryId} to failed/: ${String(err)}`); + } +} + +async function deferRemainingEntriesForBudget( + entries: readonly QueuedDelivery[], + stateDir: string | undefined, +): Promise { + // Increment retryCount so entries that are repeatedly deferred by the + // recovery budget eventually hit MAX_RETRIES and get pruned. + await Promise.allSettled( + entries.map((entry) => failDelivery(entry.id, "recovery time budget exceeded", stateDir)), + ); +} + +/** Compute the backoff delay in ms for a given retry count. */ +export function computeBackoffMs(retryCount: number): number { + if (retryCount <= 0) { + return 0; + } + return BACKOFF_MS[Math.min(retryCount - 1, BACKOFF_MS.length - 1)] ?? BACKOFF_MS.at(-1) ?? 0; +} + +export function isEntryEligibleForRecoveryRetry( + entry: QueuedDelivery, + now: number, +): { eligible: true } | { eligible: false; remainingBackoffMs: number } { + const backoff = computeBackoffMs(entry.retryCount + 1); + if (backoff <= 0) { + return { eligible: true }; + } + const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined; + if (firstReplayAfterCrash) { + return { eligible: true }; + } + const hasAttemptTimestamp = + typeof entry.lastAttemptAt === "number" && + Number.isFinite(entry.lastAttemptAt) && + entry.lastAttemptAt > 0; + const baseAttemptAt = hasAttemptTimestamp + ? (entry.lastAttemptAt ?? entry.enqueuedAt) + : entry.enqueuedAt; + const nextEligibleAt = baseAttemptAt + backoff; + if (now >= nextEligibleAt) { + return { eligible: true }; + } + return { eligible: false, remainingBackoffMs: nextEligibleAt - now }; +} + +export function isPermanentDeliveryError(error: string): boolean { + return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error)); +} + +/** + * On gateway startup, scan the delivery queue and retry any pending entries. + * Uses exponential backoff and moves entries that exceed MAX_RETRIES to failed/. + */ +export async function recoverPendingDeliveries(opts: { + deliver: DeliverFn; + log: RecoveryLogger; + cfg: OpenClawConfig; + stateDir?: string; + /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next startup. Default: 60 000. */ + maxRecoveryMs?: number; +}): Promise { + const pending = await loadPendingDeliveries(opts.stateDir); + if (pending.length === 0) { + return createEmptyRecoverySummary(); + } + + pending.sort((a, b) => a.enqueuedAt - b.enqueuedAt); + opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`); + + const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000); + const summary = createEmptyRecoverySummary(); + + for (let i = 0; i < pending.length; i++) { + const entry = pending[i]; + const now = Date.now(); + if (now >= deadline) { + opts.log.warn(`Recovery time budget exceeded — remaining entries deferred to next startup`); + await deferRemainingEntriesForBudget(pending.slice(i), opts.stateDir); + break; + } + if (entry.retryCount >= MAX_RETRIES) { + opts.log.warn( + `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`, + ); + await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir); + summary.skippedMaxRetries += 1; + continue; + } + + const retryEligibility = isEntryEligibleForRecoveryRetry(entry, now); + if (!retryEligibility.eligible) { + summary.deferredBackoff += 1; + opts.log.info( + `Delivery ${entry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`, + ); + continue; + } + + try { + await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg)); + await ackDelivery(entry.id, opts.stateDir); + summary.recovered += 1; + opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + if (isPermanentDeliveryError(errMsg)) { + opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`); + await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir); + summary.failed += 1; + continue; + } + try { + await failDelivery(entry.id, errMsg, opts.stateDir); + } catch { + // Best-effort update. + } + summary.failed += 1; + opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`); + } + } + + opts.log.info( + `Delivery recovery complete: ${summary.recovered} recovered, ${summary.failed} failed, ${summary.skippedMaxRetries} skipped (max retries), ${summary.deferredBackoff} deferred (backoff)`, + ); + return summary; +} + +export { MAX_RETRIES }; diff --git a/src/infra/outbound/delivery-queue-storage.ts b/src/infra/outbound/delivery-queue-storage.ts new file mode 100644 index 00000000000..ce8eba05b7b --- /dev/null +++ b/src/infra/outbound/delivery-queue-storage.ts @@ -0,0 +1,241 @@ +import fs from "node:fs"; +import path from "node:path"; +import type { ReplyPayload } from "../../auto-reply/types.js"; +import { resolveStateDir } from "../../config/paths.js"; +import { generateSecureUuid } from "../secure-random.js"; +import type { OutboundMirror } from "./mirror.js"; +import type { OutboundChannel } from "./targets.js"; + +const QUEUE_DIRNAME = "delivery-queue"; +const FAILED_DIRNAME = "failed"; + +export type QueuedDeliveryPayload = { + channel: Exclude; + to: string; + accountId?: string; + /** + * Original payloads before plugin hooks. On recovery, hooks re-run on these + * payloads — this is intentional since hooks are stateless transforms and + * should produce the same result on replay. + */ + payloads: ReplyPayload[]; + threadId?: string | number | null; + replyToId?: string | null; + bestEffort?: boolean; + gifPlayback?: boolean; + forceDocument?: boolean; + silent?: boolean; + mirror?: OutboundMirror; +}; + +export interface QueuedDelivery extends QueuedDeliveryPayload { + id: string; + enqueuedAt: number; + retryCount: number; + lastAttemptAt?: number; + lastError?: string; +} + +function resolveQueueDir(stateDir?: string): string { + const base = stateDir ?? resolveStateDir(); + return path.join(base, QUEUE_DIRNAME); +} + +function resolveFailedDir(stateDir?: string): string { + return path.join(resolveQueueDir(stateDir), FAILED_DIRNAME); +} + +function resolveQueueEntryPaths( + id: string, + stateDir?: string, +): { + jsonPath: string; + deliveredPath: string; +} { + const queueDir = resolveQueueDir(stateDir); + return { + jsonPath: path.join(queueDir, `${id}.json`), + deliveredPath: path.join(queueDir, `${id}.delivered`), + }; +} + +function getErrnoCode(err: unknown): string | null { + return err && typeof err === "object" && "code" in err + ? String((err as { code?: unknown }).code) + : null; +} + +async function unlinkBestEffort(filePath: string): Promise { + try { + await fs.promises.unlink(filePath); + } catch { + // Best-effort cleanup. + } +} + +async function writeQueueEntry(filePath: string, entry: QueuedDelivery): Promise { + const tmp = `${filePath}.${process.pid}.tmp`; + await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), { + encoding: "utf-8", + mode: 0o600, + }); + await fs.promises.rename(tmp, filePath); +} + +async function readQueueEntry(filePath: string): Promise { + return JSON.parse(await fs.promises.readFile(filePath, "utf-8")) as QueuedDelivery; +} + +function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): { + entry: QueuedDelivery; + migrated: boolean; +} { + const hasAttemptTimestamp = + typeof entry.lastAttemptAt === "number" && + Number.isFinite(entry.lastAttemptAt) && + entry.lastAttemptAt > 0; + if (hasAttemptTimestamp || entry.retryCount <= 0) { + return { entry, migrated: false }; + } + const hasEnqueuedTimestamp = + typeof entry.enqueuedAt === "number" && + Number.isFinite(entry.enqueuedAt) && + entry.enqueuedAt > 0; + if (!hasEnqueuedTimestamp) { + return { entry, migrated: false }; + } + return { + entry: { + ...entry, + lastAttemptAt: entry.enqueuedAt, + }, + migrated: true, + }; +} + +/** Ensure the queue directory (and failed/ subdirectory) exist. */ +export async function ensureQueueDir(stateDir?: string): Promise { + const queueDir = resolveQueueDir(stateDir); + await fs.promises.mkdir(queueDir, { recursive: true, mode: 0o700 }); + await fs.promises.mkdir(resolveFailedDir(stateDir), { recursive: true, mode: 0o700 }); + return queueDir; +} + +/** Persist a delivery entry to disk before attempting send. Returns the entry ID. */ +export async function enqueueDelivery( + params: QueuedDeliveryPayload, + stateDir?: string, +): Promise { + const queueDir = await ensureQueueDir(stateDir); + const id = generateSecureUuid(); + await writeQueueEntry(path.join(queueDir, `${id}.json`), { + id, + enqueuedAt: Date.now(), + channel: params.channel, + to: params.to, + accountId: params.accountId, + payloads: params.payloads, + threadId: params.threadId, + replyToId: params.replyToId, + bestEffort: params.bestEffort, + gifPlayback: params.gifPlayback, + forceDocument: params.forceDocument, + silent: params.silent, + mirror: params.mirror, + retryCount: 0, + }); + return id; +} + +/** Remove a successfully delivered entry from the queue. + * + * Uses a two-phase approach so that a crash between delivery and cleanup + * does not cause the message to be replayed on the next recovery scan: + * Phase 1: atomic rename {id}.json → {id}.delivered + * Phase 2: unlink the .delivered marker + * If the process dies between phase 1 and phase 2 the marker is cleaned up + * by {@link loadPendingDeliveries} on the next startup without re-sending. + */ +export async function ackDelivery(id: string, stateDir?: string): Promise { + const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir); + try { + // Phase 1: atomic rename marks the delivery as complete. + await fs.promises.rename(jsonPath, deliveredPath); + } catch (err) { + const code = getErrnoCode(err); + if (code === "ENOENT") { + // .json already gone — may have been renamed by a previous ack attempt. + // Try to clean up a leftover .delivered marker if present. + await unlinkBestEffort(deliveredPath); + return; + } + throw err; + } + // Phase 2: remove the marker file. + await unlinkBestEffort(deliveredPath); +} + +/** Update a queue entry after a failed delivery attempt. */ +export async function failDelivery(id: string, error: string, stateDir?: string): Promise { + const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`); + const entry = await readQueueEntry(filePath); + entry.retryCount += 1; + entry.lastAttemptAt = Date.now(); + entry.lastError = error; + await writeQueueEntry(filePath, entry); +} + +/** Load all pending delivery entries from the queue directory. */ +export async function loadPendingDeliveries(stateDir?: string): Promise { + const queueDir = resolveQueueDir(stateDir); + let files: string[]; + try { + files = await fs.promises.readdir(queueDir); + } catch (err) { + const code = getErrnoCode(err); + if (code === "ENOENT") { + return []; + } + throw err; + } + + // Clean up .delivered markers left by ackDelivery if the process crashed + // between the rename and the unlink. + for (const file of files) { + if (file.endsWith(".delivered")) { + await unlinkBestEffort(path.join(queueDir, file)); + } + } + + const entries: QueuedDelivery[] = []; + for (const file of files) { + if (!file.endsWith(".json")) { + continue; + } + const filePath = path.join(queueDir, file); + try { + const stat = await fs.promises.stat(filePath); + if (!stat.isFile()) { + continue; + } + const { entry, migrated } = normalizeLegacyQueuedDeliveryEntry( + await readQueueEntry(filePath), + ); + if (migrated) { + await writeQueueEntry(filePath, entry); + } + entries.push(entry); + } catch { + // Skip malformed or inaccessible entries. + } + } + return entries; +} + +/** Move a queue entry to the failed/ subdirectory. */ +export async function moveToFailed(id: string, stateDir?: string): Promise { + const queueDir = resolveQueueDir(stateDir); + const failedDir = resolveFailedDir(stateDir); + await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 }); + await fs.promises.rename(path.join(queueDir, `${id}.json`), path.join(failedDir, `${id}.json`)); +} diff --git a/src/infra/outbound/delivery-queue.policy.test.ts b/src/infra/outbound/delivery-queue.policy.test.ts new file mode 100644 index 00000000000..5319dc6bf2a --- /dev/null +++ b/src/infra/outbound/delivery-queue.policy.test.ts @@ -0,0 +1,90 @@ +import { describe, expect, it } from "vitest"; +import { + computeBackoffMs, + isEntryEligibleForRecoveryRetry, + isPermanentDeliveryError, +} from "./delivery-queue.js"; + +describe("delivery-queue policy", () => { + describe("isPermanentDeliveryError", () => { + it.each([ + "No conversation reference found for user:abc", + "Telegram send failed: chat not found (chat_id=user:123)", + "user not found", + "Bot was blocked by the user", + "Forbidden: bot was kicked from the group chat", + "chat_id is empty", + "Outbound not configured for channel: msteams", + ])("returns true for permanent error: %s", (msg) => { + expect(isPermanentDeliveryError(msg)).toBe(true); + }); + + it.each([ + "network down", + "ETIMEDOUT", + "socket hang up", + "rate limited", + "500 Internal Server Error", + ])("returns false for transient error: %s", (msg) => { + expect(isPermanentDeliveryError(msg)).toBe(false); + }); + }); + + describe("computeBackoffMs", () => { + it("returns scheduled backoff values and clamps at max retry", () => { + const cases = [ + { retryCount: 0, expected: 0 }, + { retryCount: 1, expected: 5_000 }, + { retryCount: 2, expected: 25_000 }, + { retryCount: 3, expected: 120_000 }, + { retryCount: 4, expected: 600_000 }, + { retryCount: 5, expected: 600_000 }, + ] as const; + + for (const testCase of cases) { + expect(computeBackoffMs(testCase.retryCount), String(testCase.retryCount)).toBe( + testCase.expected, + ); + } + }); + }); + + describe("isEntryEligibleForRecoveryRetry", () => { + it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => { + const now = Date.now(); + const result = isEntryEligibleForRecoveryRetry( + { + id: "entry-1", + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + enqueuedAt: now, + retryCount: 0, + }, + now, + ); + expect(result).toEqual({ eligible: true }); + }); + + it("defers retry entries until backoff window elapses", () => { + const now = Date.now(); + const result = isEntryEligibleForRecoveryRetry( + { + id: "entry-2", + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + enqueuedAt: now - 30_000, + retryCount: 3, + lastAttemptAt: now, + }, + now, + ); + expect(result.eligible).toBe(false); + if (result.eligible) { + throw new Error("Expected ineligible retry entry"); + } + expect(result.remainingBackoffMs).toBeGreaterThan(0); + }); + }); +}); diff --git a/src/infra/outbound/delivery-queue.recovery.test.ts b/src/infra/outbound/delivery-queue.recovery.test.ts new file mode 100644 index 00000000000..c22b4cf5da4 --- /dev/null +++ b/src/infra/outbound/delivery-queue.recovery.test.ts @@ -0,0 +1,287 @@ +import fs from "node:fs"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; +import { + enqueueDelivery, + loadPendingDeliveries, + MAX_RETRIES, + recoverPendingDeliveries, +} from "./delivery-queue.js"; +import { + asDeliverFn, + createRecoveryLog, + installDeliveryQueueTmpDirHooks, + setQueuedEntryState, +} from "./delivery-queue.test-helpers.js"; + +describe("delivery-queue recovery", () => { + const { tmpDir } = installDeliveryQueueTmpDirHooks(); + const baseCfg = {}; + + const enqueueCrashRecoveryEntries = async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir()); + await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir()); + }; + + const runRecovery = async ({ + deliver, + log = createRecoveryLog(), + maxRecoveryMs, + }: { + deliver: ReturnType; + log?: ReturnType; + maxRecoveryMs?: number; + }) => { + const result = await recoverPendingDeliveries({ + deliver: asDeliverFn(deliver), + log, + cfg: baseCfg, + stateDir: tmpDir(), + ...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }), + }); + return { result, log }; + }; + + it("recovers entries from a simulated crash", async () => { + await enqueueCrashRecoveryEntries(); + const deliver = vi.fn().mockResolvedValue([]); + const { result } = await runRecovery({ deliver }); + + expect(deliver).toHaveBeenCalledTimes(2); + expect(result).toEqual({ + recovered: 2, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0); + }); + + it("moves entries that exceeded max retries to failed/", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { retryCount: MAX_RETRIES }); + + const deliver = vi.fn(); + const { result } = await runRecovery({ deliver }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result.skippedMaxRetries).toBe(1); + expect(result.deferredBackoff).toBe(0); + expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true); + }); + + it("increments retryCount on failed recovery attempt", async () => { + await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir()); + + const deliver = vi.fn().mockRejectedValue(new Error("network down")); + const { result } = await runRecovery({ deliver }); + + expect(result.failed).toBe(1); + expect(result.recovered).toBe(0); + + const entries = await loadPendingDeliveries(tmpDir()); + expect(entries).toHaveLength(1); + expect(entries[0]?.retryCount).toBe(1); + expect(entries[0]?.lastError).toBe("network down"); + }); + + it("moves entries to failed/ immediately on permanent delivery errors", async () => { + const id = await enqueueDelivery( + { channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] }, + tmpDir(), + ); + const deliver = vi + .fn() + .mockRejectedValue(new Error("No conversation reference found for user:abc")); + const log = createRecoveryLog(); + const { result } = await runRecovery({ deliver, log }); + + expect(result.failed).toBe(1); + expect(result.recovered).toBe(0); + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0); + expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error")); + }); + + it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir()); + + const deliver = vi.fn().mockResolvedValue([]); + await runRecovery({ deliver }); + + expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true })); + }); + + it("replays stored delivery options during recovery", async () => { + await enqueueDelivery( + { + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "a", + mediaUrls: ["https://example.com/a.png"], + }, + }, + tmpDir(), + ); + + const deliver = vi.fn().mockResolvedValue([]); + await runRecovery({ deliver }); + + expect(deliver).toHaveBeenCalledWith( + expect.objectContaining({ + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "a", + mediaUrls: ["https://example.com/a.png"], + }, + }), + ); + }); + + it("respects maxRecoveryMs time budget and bumps deferred retries", async () => { + await enqueueCrashRecoveryEntries(); + await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir()); + + const deliver = vi.fn().mockResolvedValue([]); + const { result, log } = await runRecovery({ + deliver, + maxRecoveryMs: 0, + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + + const remaining = await loadPendingDeliveries(tmpDir()); + expect(remaining).toHaveLength(3); + expect(remaining.every((entry) => entry.retryCount === 1)).toBe(true); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next startup")); + }); + + it("defers entries until backoff becomes eligible", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { retryCount: 3, lastAttemptAt: Date.now() }); + + const deliver = vi.fn().mockResolvedValue([]); + const { result, log } = await runRecovery({ + deliver, + maxRecoveryMs: 60_000, + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(1); + expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet")); + }); + + it("continues past high-backoff entries and recovers ready entries behind them", async () => { + const now = Date.now(); + const blockedId = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] }, + tmpDir(), + ); + const readyId = await enqueueDelivery( + { channel: "telegram", to: "2", payloads: [{ text: "ready" }] }, + tmpDir(), + ); + + setQueuedEntryState(tmpDir(), blockedId, { + retryCount: 3, + lastAttemptAt: now, + enqueuedAt: now - 30_000, + }); + setQueuedEntryState(tmpDir(), readyId, { retryCount: 0, enqueuedAt: now - 10_000 }); + + const deliver = vi.fn().mockResolvedValue([]); + const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 }); + + expect(result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); + expect(deliver).toHaveBeenCalledTimes(1); + expect(deliver).toHaveBeenCalledWith( + expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }), + ); + + const remaining = await loadPendingDeliveries(tmpDir()); + expect(remaining).toHaveLength(1); + expect(remaining[0]?.id).toBe(blockedId); + }); + + it("recovers deferred entries on a later restart once backoff elapsed", async () => { + vi.useFakeTimers(); + const start = new Date("2026-01-01T00:00:00.000Z"); + vi.setSystemTime(start); + + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { retryCount: 3, lastAttemptAt: start.getTime() }); + + const firstDeliver = vi.fn().mockResolvedValue([]); + const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 }); + expect(firstRun.result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); + expect(firstDeliver).not.toHaveBeenCalled(); + + vi.setSystemTime(new Date(start.getTime() + 600_000 + 1)); + const secondDeliver = vi.fn().mockResolvedValue([]); + const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 }); + expect(secondRun.result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + expect(secondDeliver).toHaveBeenCalledTimes(1); + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0); + + vi.useRealTimers(); + }); + + it("returns zeros when queue is empty", async () => { + const deliver = vi.fn(); + const { result } = await runRecovery({ deliver }); + + expect(result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + expect(deliver).not.toHaveBeenCalled(); + }); +}); diff --git a/src/infra/outbound/delivery-queue.storage.test.ts b/src/infra/outbound/delivery-queue.storage.test.ts new file mode 100644 index 00000000000..a0719aa240f --- /dev/null +++ b/src/infra/outbound/delivery-queue.storage.test.ts @@ -0,0 +1,179 @@ +import fs from "node:fs"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { + ackDelivery, + enqueueDelivery, + failDelivery, + loadPendingDeliveries, + moveToFailed, +} from "./delivery-queue.js"; +import { installDeliveryQueueTmpDirHooks, readQueuedEntry } from "./delivery-queue.test-helpers.js"; + +describe("delivery-queue storage", () => { + const { tmpDir } = installDeliveryQueueTmpDirHooks(); + + describe("enqueue + ack lifecycle", () => { + it("creates and removes a queue entry", async () => { + const id = await enqueueDelivery( + { + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "hello", + mediaUrls: ["https://example.com/file.png"], + }, + }, + tmpDir(), + ); + + const queueDir = path.join(tmpDir(), "delivery-queue"); + const files = fs.readdirSync(queueDir).filter((file) => file.endsWith(".json")); + expect(files).toHaveLength(1); + expect(files[0]).toBe(`${id}.json`); + + const entry = readQueuedEntry(tmpDir(), id); + expect(entry).toMatchObject({ + id, + channel: "whatsapp", + to: "+1555", + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "hello", + mediaUrls: ["https://example.com/file.png"], + }, + retryCount: 0, + }); + expect(entry.payloads).toEqual([{ text: "hello" }]); + + await ackDelivery(id, tmpDir()); + expect(fs.readdirSync(queueDir).filter((file) => file.endsWith(".json"))).toHaveLength(0); + }); + + it("ack is idempotent (no error on missing file)", async () => { + await expect(ackDelivery("nonexistent-id", tmpDir())).resolves.toBeUndefined(); + }); + + it("ack cleans up leftover .delivered marker when .json is already gone", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "stale-marker" }] }, + tmpDir(), + ); + const queueDir = path.join(tmpDir(), "delivery-queue"); + + fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); + await expect(ackDelivery(id, tmpDir())).resolves.toBeUndefined(); + + expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); + }); + + it("ack removes .delivered marker so recovery does not replay", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] }, + tmpDir(), + ); + const queueDir = path.join(tmpDir(), "delivery-queue"); + + await ackDelivery(id, tmpDir()); + + expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); + expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); + }); + + it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => { + const id = await enqueueDelivery( + { channel: "telegram", to: "99", payloads: [{ text: "stale" }] }, + tmpDir(), + ); + const queueDir = path.join(tmpDir(), "delivery-queue"); + + fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); + + const entries = await loadPendingDeliveries(tmpDir()); + + expect(entries).toHaveLength(0); + expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); + }); + }); + + describe("failDelivery", () => { + it("increments retryCount, records attempt time, and sets lastError", async () => { + const id = await enqueueDelivery( + { + channel: "telegram", + to: "123", + payloads: [{ text: "test" }], + }, + tmpDir(), + ); + + await failDelivery(id, "connection refused", tmpDir()); + + const entry = readQueuedEntry(tmpDir(), id); + expect(entry.retryCount).toBe(1); + expect(typeof entry.lastAttemptAt).toBe("number"); + expect((entry.lastAttemptAt as number) > 0).toBe(true); + expect(entry.lastError).toBe("connection refused"); + }); + }); + + describe("moveToFailed", () => { + it("moves entry to failed/ subdirectory", async () => { + const id = await enqueueDelivery( + { + channel: "slack", + to: "#general", + payloads: [{ text: "hi" }], + }, + tmpDir(), + ); + + await moveToFailed(id, tmpDir()); + + const queueDir = path.join(tmpDir(), "delivery-queue"); + const failedDir = path.join(queueDir, "failed"); + expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); + expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); + }); + }); + + describe("loadPendingDeliveries", () => { + it("returns empty array when queue directory does not exist", async () => { + expect(await loadPendingDeliveries(path.join(tmpDir(), "no-such-dir"))).toEqual([]); + }); + + it("loads multiple entries", async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir()); + await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir()); + + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(2); + }); + + it("backfills lastAttemptAt for legacy retry entries during load", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] }, + tmpDir(), + ); + const filePath = path.join(tmpDir(), "delivery-queue", `${id}.json`); + const legacyEntry = readQueuedEntry(tmpDir(), id); + legacyEntry.retryCount = 2; + delete legacyEntry.lastAttemptAt; + fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8"); + + const entries = await loadPendingDeliveries(tmpDir()); + expect(entries).toHaveLength(1); + expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt); + + const persisted = readQueuedEntry(tmpDir(), id); + expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt); + }); + }); +}); diff --git a/src/infra/outbound/delivery-queue.test-helpers.ts b/src/infra/outbound/delivery-queue.test-helpers.ts new file mode 100644 index 00000000000..f2bf02120e6 --- /dev/null +++ b/src/infra/outbound/delivery-queue.test-helpers.ts @@ -0,0 +1,73 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterAll, beforeAll, beforeEach, vi } from "vitest"; +import type { DeliverFn, RecoveryLogger } from "./delivery-queue.js"; + +export function installDeliveryQueueTmpDirHooks(): { readonly tmpDir: () => string } { + let tmpDir = ""; + let fixtureRoot = ""; + let fixtureCount = 0; + + beforeAll(() => { + fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-suite-")); + }); + + beforeEach(() => { + tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`); + fs.mkdirSync(tmpDir, { recursive: true }); + }); + + afterAll(() => { + if (!fixtureRoot) { + return; + } + fs.rmSync(fixtureRoot, { recursive: true, force: true }); + fixtureRoot = ""; + }); + + return { + tmpDir: () => tmpDir, + }; +} + +export function readQueuedEntry(tmpDir: string, id: string): Record { + return JSON.parse( + fs.readFileSync(path.join(tmpDir, "delivery-queue", `${id}.json`), "utf-8"), + ) as Record; +} + +export function setQueuedEntryState( + tmpDir: string, + id: string, + state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number }, +): void { + const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); + const entry = readQueuedEntry(tmpDir, id); + entry.retryCount = state.retryCount; + if (state.lastAttemptAt === undefined) { + delete entry.lastAttemptAt; + } else { + entry.lastAttemptAt = state.lastAttemptAt; + } + if (state.enqueuedAt !== undefined) { + entry.enqueuedAt = state.enqueuedAt; + } + fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8"); +} + +export function createRecoveryLog(): RecoveryLogger & { + info: ReturnType void>>; + warn: ReturnType void>>; + error: ReturnType void>>; +} { + return { + info: vi.fn<(msg: string) => void>(), + warn: vi.fn<(msg: string) => void>(), + error: vi.fn<(msg: string) => void>(), + }; +} + +export function asDeliverFn(deliver: ReturnType): DeliverFn { + return deliver as DeliverFn; +} diff --git a/src/infra/outbound/delivery-queue.test.ts b/src/infra/outbound/delivery-queue.test.ts deleted file mode 100644 index 011d4e4ba58..00000000000 --- a/src/infra/outbound/delivery-queue.test.ts +++ /dev/null @@ -1,580 +0,0 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { - ackDelivery, - computeBackoffMs, - type DeliverFn, - enqueueDelivery, - failDelivery, - isEntryEligibleForRecoveryRetry, - isPermanentDeliveryError, - loadPendingDeliveries, - MAX_RETRIES, - moveToFailed, - recoverPendingDeliveries, -} from "./delivery-queue.js"; - -describe("delivery-queue", () => { - let tmpDir: string; - let fixtureRoot = ""; - let fixtureCount = 0; - - beforeAll(() => { - fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-suite-")); - }); - - beforeEach(() => { - tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`); - fs.mkdirSync(tmpDir, { recursive: true }); - }); - - afterAll(() => { - if (!fixtureRoot) { - return; - } - fs.rmSync(fixtureRoot, { recursive: true, force: true }); - fixtureRoot = ""; - }); - - describe("enqueue + ack lifecycle", () => { - it("creates and removes a queue entry", async () => { - const id = await enqueueDelivery( - { - channel: "whatsapp", - to: "+1555", - payloads: [{ text: "hello" }], - bestEffort: true, - gifPlayback: true, - silent: true, - mirror: { - sessionKey: "agent:main:main", - text: "hello", - mediaUrls: ["https://example.com/file.png"], - }, - }, - tmpDir, - ); - - const queueDir = path.join(tmpDir, "delivery-queue"); - const files = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json")); - expect(files).toHaveLength(1); - expect(files[0]).toBe(`${id}.json`); - - const entry = JSON.parse(fs.readFileSync(path.join(queueDir, files[0]), "utf-8")); - expect(entry).toMatchObject({ - id, - channel: "whatsapp", - to: "+1555", - bestEffort: true, - gifPlayback: true, - silent: true, - mirror: { - sessionKey: "agent:main:main", - text: "hello", - mediaUrls: ["https://example.com/file.png"], - }, - retryCount: 0, - }); - expect(entry.payloads).toEqual([{ text: "hello" }]); - - await ackDelivery(id, tmpDir); - const remaining = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json")); - expect(remaining).toHaveLength(0); - }); - - it("ack is idempotent (no error on missing file)", async () => { - await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined(); - }); - - it("ack cleans up leftover .delivered marker when .json is already gone", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "stale-marker" }] }, - tmpDir, - ); - const queueDir = path.join(tmpDir, "delivery-queue"); - - fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); - await expect(ackDelivery(id, tmpDir)).resolves.toBeUndefined(); - - expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); - }); - - it("ack removes .delivered marker so recovery does not replay", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] }, - tmpDir, - ); - const queueDir = path.join(tmpDir, "delivery-queue"); - - await ackDelivery(id, tmpDir); - - expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); - expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); - }); - - it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => { - const id = await enqueueDelivery( - { channel: "telegram", to: "99", payloads: [{ text: "stale" }] }, - tmpDir, - ); - const queueDir = path.join(tmpDir, "delivery-queue"); - - fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); - - const entries = await loadPendingDeliveries(tmpDir); - - expect(entries).toHaveLength(0); - expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); - }); - }); - - describe("failDelivery", () => { - it("increments retryCount, records attempt time, and sets lastError", async () => { - const id = await enqueueDelivery( - { - channel: "telegram", - to: "123", - payloads: [{ text: "test" }], - }, - tmpDir, - ); - - await failDelivery(id, "connection refused", tmpDir); - - const queueDir = path.join(tmpDir, "delivery-queue"); - const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8")); - expect(entry.retryCount).toBe(1); - expect(typeof entry.lastAttemptAt).toBe("number"); - expect(entry.lastAttemptAt).toBeGreaterThan(0); - expect(entry.lastError).toBe("connection refused"); - }); - }); - - describe("moveToFailed", () => { - it("moves entry to failed/ subdirectory", async () => { - const id = await enqueueDelivery( - { - channel: "slack", - to: "#general", - payloads: [{ text: "hi" }], - }, - tmpDir, - ); - - await moveToFailed(id, tmpDir); - - const queueDir = path.join(tmpDir, "delivery-queue"); - const failedDir = path.join(queueDir, "failed"); - expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); - expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); - }); - }); - - describe("isPermanentDeliveryError", () => { - it.each([ - "No conversation reference found for user:abc", - "Telegram send failed: chat not found (chat_id=user:123)", - "user not found", - "Bot was blocked by the user", - "Forbidden: bot was kicked from the group chat", - "chat_id is empty", - "Outbound not configured for channel: msteams", - ])("returns true for permanent error: %s", (msg) => { - expect(isPermanentDeliveryError(msg)).toBe(true); - }); - - it.each([ - "network down", - "ETIMEDOUT", - "socket hang up", - "rate limited", - "500 Internal Server Error", - ])("returns false for transient error: %s", (msg) => { - expect(isPermanentDeliveryError(msg)).toBe(false); - }); - }); - - describe("loadPendingDeliveries", () => { - it("returns empty array when queue directory does not exist", async () => { - const nonexistent = path.join(tmpDir, "no-such-dir"); - const entries = await loadPendingDeliveries(nonexistent); - expect(entries).toEqual([]); - }); - - it("loads multiple entries", async () => { - await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); - await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); - - const entries = await loadPendingDeliveries(tmpDir); - expect(entries).toHaveLength(2); - }); - - it("backfills lastAttemptAt for legacy retry entries during load", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] }, - tmpDir, - ); - const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); - const legacyEntry = JSON.parse(fs.readFileSync(filePath, "utf-8")); - legacyEntry.retryCount = 2; - delete legacyEntry.lastAttemptAt; - fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8"); - - const entries = await loadPendingDeliveries(tmpDir); - expect(entries).toHaveLength(1); - expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt); - - const persisted = JSON.parse(fs.readFileSync(filePath, "utf-8")); - expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt); - }); - }); - - describe("computeBackoffMs", () => { - it("returns scheduled backoff values and clamps at max retry", () => { - const cases = [ - { retryCount: 0, expected: 0 }, - { retryCount: 1, expected: 5_000 }, - { retryCount: 2, expected: 25_000 }, - { retryCount: 3, expected: 120_000 }, - { retryCount: 4, expected: 600_000 }, - { retryCount: 5, expected: 600_000 }, - ] as const; - - for (const testCase of cases) { - expect(computeBackoffMs(testCase.retryCount), String(testCase.retryCount)).toBe( - testCase.expected, - ); - } - }); - }); - - describe("isEntryEligibleForRecoveryRetry", () => { - it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => { - const now = Date.now(); - const result = isEntryEligibleForRecoveryRetry( - { - id: "entry-1", - channel: "whatsapp", - to: "+1", - payloads: [{ text: "a" }], - enqueuedAt: now, - retryCount: 0, - }, - now, - ); - expect(result).toEqual({ eligible: true }); - }); - - it("defers retry entries until backoff window elapses", () => { - const now = Date.now(); - const result = isEntryEligibleForRecoveryRetry( - { - id: "entry-2", - channel: "whatsapp", - to: "+1", - payloads: [{ text: "a" }], - enqueuedAt: now - 30_000, - retryCount: 3, - lastAttemptAt: now, - }, - now, - ); - expect(result.eligible).toBe(false); - if (result.eligible) { - throw new Error("Expected ineligible retry entry"); - } - expect(result.remainingBackoffMs).toBeGreaterThan(0); - }); - }); - - describe("recoverPendingDeliveries", () => { - const baseCfg = {}; - const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }); - const enqueueCrashRecoveryEntries = async () => { - await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); - await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); - }; - const setEntryState = ( - id: string, - state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number }, - ) => { - const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); - const entry = JSON.parse(fs.readFileSync(filePath, "utf-8")); - entry.retryCount = state.retryCount; - if (state.lastAttemptAt === undefined) { - delete entry.lastAttemptAt; - } else { - entry.lastAttemptAt = state.lastAttemptAt; - } - if (state.enqueuedAt !== undefined) { - entry.enqueuedAt = state.enqueuedAt; - } - fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8"); - }; - const runRecovery = async ({ - deliver, - log = createLog(), - maxRecoveryMs, - }: { - deliver: ReturnType; - log?: ReturnType; - maxRecoveryMs?: number; - }) => { - const result = await recoverPendingDeliveries({ - deliver: deliver as DeliverFn, - log, - cfg: baseCfg, - stateDir: tmpDir, - ...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }), - }); - return { result, log }; - }; - - it("recovers entries from a simulated crash", async () => { - await enqueueCrashRecoveryEntries(); - const deliver = vi.fn().mockResolvedValue([]); - const { result } = await runRecovery({ deliver }); - - expect(deliver).toHaveBeenCalledTimes(2); - expect(result.recovered).toBe(2); - expect(result.failed).toBe(0); - expect(result.skippedMaxRetries).toBe(0); - expect(result.deferredBackoff).toBe(0); - - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(0); - }); - - it("moves entries that exceeded max retries to failed/", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, - tmpDir, - ); - setEntryState(id, { retryCount: MAX_RETRIES }); - - const deliver = vi.fn(); - const { result } = await runRecovery({ deliver }); - - expect(deliver).not.toHaveBeenCalled(); - expect(result.skippedMaxRetries).toBe(1); - expect(result.deferredBackoff).toBe(0); - - const failedDir = path.join(tmpDir, "delivery-queue", "failed"); - expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); - }); - - it("increments retryCount on failed recovery attempt", async () => { - await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir); - - const deliver = vi.fn().mockRejectedValue(new Error("network down")); - const { result } = await runRecovery({ deliver }); - - expect(result.failed).toBe(1); - expect(result.recovered).toBe(0); - - const entries = await loadPendingDeliveries(tmpDir); - expect(entries).toHaveLength(1); - expect(entries[0].retryCount).toBe(1); - expect(entries[0].lastError).toBe("network down"); - }); - - it("moves entries to failed/ immediately on permanent delivery errors", async () => { - const id = await enqueueDelivery( - { channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] }, - tmpDir, - ); - const deliver = vi - .fn() - .mockRejectedValue(new Error("No conversation reference found for user:abc")); - const log = createLog(); - const { result } = await runRecovery({ deliver, log }); - - expect(result.failed).toBe(1); - expect(result.recovered).toBe(0); - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(0); - const failedDir = path.join(tmpDir, "delivery-queue", "failed"); - expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); - expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error")); - }); - - it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => { - await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); - - const deliver = vi.fn().mockResolvedValue([]); - await runRecovery({ deliver }); - - expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true })); - }); - - it("replays stored delivery options during recovery", async () => { - await enqueueDelivery( - { - channel: "whatsapp", - to: "+1", - payloads: [{ text: "a" }], - bestEffort: true, - gifPlayback: true, - silent: true, - mirror: { - sessionKey: "agent:main:main", - text: "a", - mediaUrls: ["https://example.com/a.png"], - }, - }, - tmpDir, - ); - - const deliver = vi.fn().mockResolvedValue([]); - await runRecovery({ deliver }); - - expect(deliver).toHaveBeenCalledWith( - expect.objectContaining({ - bestEffort: true, - gifPlayback: true, - silent: true, - mirror: { - sessionKey: "agent:main:main", - text: "a", - mediaUrls: ["https://example.com/a.png"], - }, - }), - ); - }); - - it("respects maxRecoveryMs time budget", async () => { - await enqueueCrashRecoveryEntries(); - await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir); - - const deliver = vi.fn().mockResolvedValue([]); - const { result, log } = await runRecovery({ - deliver, - maxRecoveryMs: 0, - }); - - expect(deliver).not.toHaveBeenCalled(); - expect(result.recovered).toBe(0); - expect(result.failed).toBe(0); - expect(result.skippedMaxRetries).toBe(0); - expect(result.deferredBackoff).toBe(0); - - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(3); - - expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next startup")); - }); - - it("defers entries until backoff becomes eligible", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, - tmpDir, - ); - setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() }); - - const deliver = vi.fn().mockResolvedValue([]); - const { result, log } = await runRecovery({ - deliver, - maxRecoveryMs: 60_000, - }); - - expect(deliver).not.toHaveBeenCalled(); - expect(result).toEqual({ - recovered: 0, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 1, - }); - - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(1); - - expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet")); - }); - - it("continues past high-backoff entries and recovers ready entries behind them", async () => { - const now = Date.now(); - const blockedId = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] }, - tmpDir, - ); - const readyId = await enqueueDelivery( - { channel: "telegram", to: "2", payloads: [{ text: "ready" }] }, - tmpDir, - ); - - setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 }); - setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 }); - - const deliver = vi.fn().mockResolvedValue([]); - const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 }); - - expect(result).toEqual({ - recovered: 1, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 1, - }); - expect(deliver).toHaveBeenCalledTimes(1); - expect(deliver).toHaveBeenCalledWith( - expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }), - ); - - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(1); - expect(remaining[0]?.id).toBe(blockedId); - }); - - it("recovers deferred entries on a later restart once backoff elapsed", async () => { - vi.useFakeTimers(); - const start = new Date("2026-01-01T00:00:00.000Z"); - vi.setSystemTime(start); - - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] }, - tmpDir, - ); - setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() }); - - const firstDeliver = vi.fn().mockResolvedValue([]); - const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 }); - expect(firstRun.result).toEqual({ - recovered: 0, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 1, - }); - expect(firstDeliver).not.toHaveBeenCalled(); - - vi.setSystemTime(new Date(start.getTime() + 600_000 + 1)); - const secondDeliver = vi.fn().mockResolvedValue([]); - const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 }); - expect(secondRun.result).toEqual({ - recovered: 1, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 0, - }); - expect(secondDeliver).toHaveBeenCalledTimes(1); - - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(0); - - vi.useRealTimers(); - }); - - it("returns zeros when queue is empty", async () => { - const deliver = vi.fn(); - const { result } = await runRecovery({ deliver }); - - expect(result).toEqual({ - recovered: 0, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 0, - }); - expect(deliver).not.toHaveBeenCalled(); - }); - }); -}); diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index a2968bdcc30..f4da3e991ed 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -1,444 +1,17 @@ -import fs from "node:fs"; -import path from "node:path"; -import type { ReplyPayload } from "../../auto-reply/types.js"; -import type { OpenClawConfig } from "../../config/config.js"; -import { resolveStateDir } from "../../config/paths.js"; -import { generateSecureUuid } from "../secure-random.js"; -import type { OutboundMirror } from "./mirror.js"; -import type { OutboundChannel } from "./targets.js"; - -const QUEUE_DIRNAME = "delivery-queue"; -const FAILED_DIRNAME = "failed"; -const MAX_RETRIES = 5; - -/** Backoff delays in milliseconds indexed by retry count (1-based). */ -const BACKOFF_MS: readonly number[] = [ - 5_000, // retry 1: 5s - 25_000, // retry 2: 25s - 120_000, // retry 3: 2m - 600_000, // retry 4: 10m -]; - -type QueuedDeliveryPayload = { - channel: Exclude; - to: string; - accountId?: string; - /** - * Original payloads before plugin hooks. On recovery, hooks re-run on these - * payloads — this is intentional since hooks are stateless transforms and - * should produce the same result on replay. - */ - payloads: ReplyPayload[]; - threadId?: string | number | null; - replyToId?: string | null; - bestEffort?: boolean; - gifPlayback?: boolean; - forceDocument?: boolean; - silent?: boolean; - mirror?: OutboundMirror; -}; - -export interface QueuedDelivery extends QueuedDeliveryPayload { - id: string; - enqueuedAt: number; - retryCount: number; - lastAttemptAt?: number; - lastError?: string; -} - -export type RecoverySummary = { - recovered: number; - failed: number; - skippedMaxRetries: number; - deferredBackoff: number; -}; - -function resolveQueueDir(stateDir?: string): string { - const base = stateDir ?? resolveStateDir(); - return path.join(base, QUEUE_DIRNAME); -} - -function resolveFailedDir(stateDir?: string): string { - return path.join(resolveQueueDir(stateDir), FAILED_DIRNAME); -} - -function resolveQueueEntryPaths( - id: string, - stateDir?: string, -): { - jsonPath: string; - deliveredPath: string; -} { - const queueDir = resolveQueueDir(stateDir); - return { - jsonPath: path.join(queueDir, `${id}.json`), - deliveredPath: path.join(queueDir, `${id}.delivered`), - }; -} - -function getErrnoCode(err: unknown): string | null { - return err && typeof err === "object" && "code" in err - ? String((err as { code?: unknown }).code) - : null; -} - -async function unlinkBestEffort(filePath: string): Promise { - try { - await fs.promises.unlink(filePath); - } catch { - // Best-effort cleanup. - } -} - -/** Ensure the queue directory (and failed/ subdirectory) exist. */ -export async function ensureQueueDir(stateDir?: string): Promise { - const queueDir = resolveQueueDir(stateDir); - await fs.promises.mkdir(queueDir, { recursive: true, mode: 0o700 }); - await fs.promises.mkdir(resolveFailedDir(stateDir), { recursive: true, mode: 0o700 }); - return queueDir; -} - -/** Persist a delivery entry to disk before attempting send. Returns the entry ID. */ -type QueuedDeliveryParams = QueuedDeliveryPayload; - -export async function enqueueDelivery( - params: QueuedDeliveryParams, - stateDir?: string, -): Promise { - const queueDir = await ensureQueueDir(stateDir); - const id = generateSecureUuid(); - const entry: QueuedDelivery = { - id, - enqueuedAt: Date.now(), - channel: params.channel, - to: params.to, - accountId: params.accountId, - payloads: params.payloads, - threadId: params.threadId, - replyToId: params.replyToId, - bestEffort: params.bestEffort, - gifPlayback: params.gifPlayback, - forceDocument: params.forceDocument, - silent: params.silent, - mirror: params.mirror, - retryCount: 0, - }; - const filePath = path.join(queueDir, `${id}.json`); - const tmp = `${filePath}.${process.pid}.tmp`; - const json = JSON.stringify(entry, null, 2); - await fs.promises.writeFile(tmp, json, { encoding: "utf-8", mode: 0o600 }); - await fs.promises.rename(tmp, filePath); - return id; -} - -/** Remove a successfully delivered entry from the queue. - * - * Uses a two-phase approach so that a crash between delivery and cleanup - * does not cause the message to be replayed on the next recovery scan: - * Phase 1: atomic rename {id}.json → {id}.delivered - * Phase 2: unlink the .delivered marker - * If the process dies between phase 1 and phase 2 the marker is cleaned up - * by {@link loadPendingDeliveries} on the next startup without re-sending. - */ -export async function ackDelivery(id: string, stateDir?: string): Promise { - const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir); - try { - // Phase 1: atomic rename marks the delivery as complete. - await fs.promises.rename(jsonPath, deliveredPath); - } catch (err) { - const code = getErrnoCode(err); - if (code === "ENOENT") { - // .json already gone — may have been renamed by a previous ack attempt. - // Try to clean up a leftover .delivered marker if present. - await unlinkBestEffort(deliveredPath); - return; - } - throw err; - } - // Phase 2: remove the marker file. - await unlinkBestEffort(deliveredPath); -} - -/** Update a queue entry after a failed delivery attempt. */ -export async function failDelivery(id: string, error: string, stateDir?: string): Promise { - const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`); - const raw = await fs.promises.readFile(filePath, "utf-8"); - const entry: QueuedDelivery = JSON.parse(raw); - entry.retryCount += 1; - entry.lastAttemptAt = Date.now(); - entry.lastError = error; - const tmp = `${filePath}.${process.pid}.tmp`; - await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), { - encoding: "utf-8", - mode: 0o600, - }); - await fs.promises.rename(tmp, filePath); -} - -/** Load all pending delivery entries from the queue directory. */ -export async function loadPendingDeliveries(stateDir?: string): Promise { - const queueDir = resolveQueueDir(stateDir); - let files: string[]; - try { - files = await fs.promises.readdir(queueDir); - } catch (err) { - const code = getErrnoCode(err); - if (code === "ENOENT") { - return []; - } - throw err; - } - // Clean up .delivered markers left by ackDelivery if the process crashed - // between the rename and the unlink. - for (const file of files) { - if (!file.endsWith(".delivered")) { - continue; - } - await unlinkBestEffort(path.join(queueDir, file)); - } - - const entries: QueuedDelivery[] = []; - for (const file of files) { - if (!file.endsWith(".json")) { - continue; - } - const filePath = path.join(queueDir, file); - try { - const stat = await fs.promises.stat(filePath); - if (!stat.isFile()) { - continue; - } - const raw = await fs.promises.readFile(filePath, "utf-8"); - const parsed = JSON.parse(raw) as QueuedDelivery; - const { entry, migrated } = normalizeLegacyQueuedDeliveryEntry(parsed); - if (migrated) { - const tmp = `${filePath}.${process.pid}.tmp`; - await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), { - encoding: "utf-8", - mode: 0o600, - }); - await fs.promises.rename(tmp, filePath); - } - entries.push(entry); - } catch { - // Skip malformed or inaccessible entries. - } - } - return entries; -} - -/** Move a queue entry to the failed/ subdirectory. */ -export async function moveToFailed(id: string, stateDir?: string): Promise { - const queueDir = resolveQueueDir(stateDir); - const failedDir = resolveFailedDir(stateDir); - await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 }); - const src = path.join(queueDir, `${id}.json`); - const dest = path.join(failedDir, `${id}.json`); - await fs.promises.rename(src, dest); -} - -/** Compute the backoff delay in ms for a given retry count. */ -export function computeBackoffMs(retryCount: number): number { - if (retryCount <= 0) { - return 0; - } - return BACKOFF_MS[Math.min(retryCount - 1, BACKOFF_MS.length - 1)] ?? BACKOFF_MS.at(-1) ?? 0; -} - -export function isEntryEligibleForRecoveryRetry( - entry: QueuedDelivery, - now: number, -): { eligible: true } | { eligible: false; remainingBackoffMs: number } { - const backoff = computeBackoffMs(entry.retryCount + 1); - if (backoff <= 0) { - return { eligible: true }; - } - const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined; - if (firstReplayAfterCrash) { - return { eligible: true }; - } - const hasAttemptTimestamp = - typeof entry.lastAttemptAt === "number" && - Number.isFinite(entry.lastAttemptAt) && - entry.lastAttemptAt > 0; - const baseAttemptAt = hasAttemptTimestamp - ? (entry.lastAttemptAt ?? entry.enqueuedAt) - : entry.enqueuedAt; - const nextEligibleAt = baseAttemptAt + backoff; - if (now >= nextEligibleAt) { - return { eligible: true }; - } - return { eligible: false, remainingBackoffMs: nextEligibleAt - now }; -} - -function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): { - entry: QueuedDelivery; - migrated: boolean; -} { - const hasAttemptTimestamp = - typeof entry.lastAttemptAt === "number" && - Number.isFinite(entry.lastAttemptAt) && - entry.lastAttemptAt > 0; - if (hasAttemptTimestamp || entry.retryCount <= 0) { - return { entry, migrated: false }; - } - const hasEnqueuedTimestamp = - typeof entry.enqueuedAt === "number" && - Number.isFinite(entry.enqueuedAt) && - entry.enqueuedAt > 0; - if (!hasEnqueuedTimestamp) { - return { entry, migrated: false }; - } - return { - entry: { - ...entry, - lastAttemptAt: entry.enqueuedAt, - }, - migrated: true, - }; -} - -export type DeliverFn = ( - params: { - cfg: OpenClawConfig; - } & QueuedDeliveryParams & { - skipQueue?: boolean; - }, -) => Promise; - -export interface RecoveryLogger { - info(msg: string): void; - warn(msg: string): void; - error(msg: string): void; -} - -/** - * On gateway startup, scan the delivery queue and retry any pending entries. - * Uses exponential backoff and moves entries that exceed MAX_RETRIES to failed/. - */ -export async function recoverPendingDeliveries(opts: { - deliver: DeliverFn; - log: RecoveryLogger; - cfg: OpenClawConfig; - stateDir?: string; - /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next startup. Default: 60 000. */ - maxRecoveryMs?: number; -}): Promise { - const pending = await loadPendingDeliveries(opts.stateDir); - if (pending.length === 0) { - return { recovered: 0, failed: 0, skippedMaxRetries: 0, deferredBackoff: 0 }; - } - - // Process oldest first. - pending.sort((a, b) => a.enqueuedAt - b.enqueuedAt); - - opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`); - - const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000); - - let recovered = 0; - let failed = 0; - let skippedMaxRetries = 0; - let deferredBackoff = 0; - - for (let i = 0; i < pending.length; i++) { - const entry = pending[i]; - const now = Date.now(); - if (now >= deadline) { - opts.log.warn(`Recovery time budget exceeded — remaining entries deferred to next startup`); - // Increment retryCount for all remaining entries so that entries which - // are consistently deferred by the time budget eventually reach - // MAX_RETRIES and are pruned rather than looping forever. - await Promise.allSettled( - pending - .slice(i) - .map((e) => failDelivery(e.id, "recovery time budget exceeded", opts.stateDir)), - ); - break; - } - if (entry.retryCount >= MAX_RETRIES) { - opts.log.warn( - `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`, - ); - try { - await moveToFailed(entry.id, opts.stateDir); - } catch (err) { - opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(err)}`); - } - skippedMaxRetries += 1; - continue; - } - - const retryEligibility = isEntryEligibleForRecoveryRetry(entry, now); - if (!retryEligibility.eligible) { - deferredBackoff += 1; - opts.log.info( - `Delivery ${entry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`, - ); - continue; - } - - try { - await opts.deliver({ - cfg: opts.cfg, - channel: entry.channel, - to: entry.to, - accountId: entry.accountId, - payloads: entry.payloads, - threadId: entry.threadId, - replyToId: entry.replyToId, - bestEffort: entry.bestEffort, - gifPlayback: entry.gifPlayback, - forceDocument: entry.forceDocument, - silent: entry.silent, - mirror: entry.mirror, - skipQueue: true, // Prevent re-enqueueing during recovery - }); - await ackDelivery(entry.id, opts.stateDir); - recovered += 1; - opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`); - } catch (err) { - const errMsg = err instanceof Error ? err.message : String(err); - if (isPermanentDeliveryError(errMsg)) { - opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`); - try { - await moveToFailed(entry.id, opts.stateDir); - } catch (moveErr) { - opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(moveErr)}`); - } - failed += 1; - continue; - } - try { - await failDelivery(entry.id, errMsg, opts.stateDir); - } catch { - // Best-effort update. - } - failed += 1; - opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`); - } - } - - opts.log.info( - `Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skippedMaxRetries} skipped (max retries), ${deferredBackoff} deferred (backoff)`, - ); - return { recovered, failed, skippedMaxRetries, deferredBackoff }; -} - -export { MAX_RETRIES }; - -const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [ - /no conversation reference found/i, - /chat not found/i, - /user not found/i, - /bot was blocked by the user/i, - /forbidden: bot was kicked/i, - /chat_id is empty/i, - /recipient is not a valid/i, - /outbound not configured for channel/i, - /ambiguous discord recipient/i, -]; - -export function isPermanentDeliveryError(error: string): boolean { - return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error)); -} +export { + ackDelivery, + enqueueDelivery, + ensureQueueDir, + failDelivery, + loadPendingDeliveries, + moveToFailed, +} from "./delivery-queue-storage.js"; +export type { QueuedDelivery, QueuedDeliveryPayload } from "./delivery-queue-storage.js"; +export { + computeBackoffMs, + isEntryEligibleForRecoveryRetry, + isPermanentDeliveryError, + MAX_RETRIES, + recoverPendingDeliveries, +} from "./delivery-queue-recovery.js"; +export type { DeliverFn, RecoveryLogger, RecoverySummary } from "./delivery-queue-recovery.js"; diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts index 1f6a5f4649c..9de658e2b7e 100644 --- a/src/infra/outbound/outbound.test.ts +++ b/src/infra/outbound/outbound.test.ts @@ -1,26 +1,10 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; -import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { ReplyPayload } from "../../auto-reply/types.js"; import { setDefaultChannelPluginRegistryForTests } from "../../commands/channel-test-helpers.js"; import type { OpenClawConfig } from "../../config/config.js"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; import { createTestRegistry } from "../../test-utils/channel-plugins.js"; import { typedCases } from "../../test-utils/typed-cases.js"; -import { - ackDelivery, - computeBackoffMs, - type DeliverFn, - enqueueDelivery, - failDelivery, - isEntryEligibleForRecoveryRetry, - isPermanentDeliveryError, - loadPendingDeliveries, - MAX_RETRIES, - moveToFailed, - recoverPendingDeliveries, -} from "./delivery-queue.js"; import { DirectoryCache } from "./directory-cache.js"; import { buildOutboundResultEnvelope } from "./envelope.js"; import type { OutboundDeliveryJson } from "./format.js"; @@ -46,589 +30,6 @@ beforeEach(() => { setActivePluginRegistry(createTestRegistry([])); }); -describe("delivery-queue", () => { - let tmpDir: string; - let fixtureRoot = ""; - let fixtureCount = 0; - - beforeAll(() => { - fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-suite-")); - }); - - beforeEach(() => { - tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`); - fs.mkdirSync(tmpDir, { recursive: true }); - }); - - afterAll(() => { - if (!fixtureRoot) { - return; - } - fs.rmSync(fixtureRoot, { recursive: true, force: true }); - fixtureRoot = ""; - }); - - describe("enqueue + ack lifecycle", () => { - it("creates and removes a queue entry", async () => { - const id = await enqueueDelivery( - { - channel: "whatsapp", - to: "+1555", - payloads: [{ text: "hello" }], - bestEffort: true, - gifPlayback: true, - silent: true, - mirror: { - sessionKey: "agent:main:main", - text: "hello", - mediaUrls: ["https://example.com/file.png"], - }, - }, - tmpDir, - ); - - // Entry file exists after enqueue. - const queueDir = path.join(tmpDir, "delivery-queue"); - const files = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json")); - expect(files).toHaveLength(1); - expect(files[0]).toBe(`${id}.json`); - - // Entry contents are correct. - const entry = JSON.parse(fs.readFileSync(path.join(queueDir, files[0]), "utf-8")); - expect(entry).toMatchObject({ - id, - channel: "whatsapp", - to: "+1555", - bestEffort: true, - gifPlayback: true, - silent: true, - mirror: { - sessionKey: "agent:main:main", - text: "hello", - mediaUrls: ["https://example.com/file.png"], - }, - retryCount: 0, - }); - expect(entry.payloads).toEqual([{ text: "hello" }]); - - // Ack removes the file. - await ackDelivery(id, tmpDir); - const remaining = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json")); - expect(remaining).toHaveLength(0); - }); - - it("ack is idempotent (no error on missing file)", async () => { - await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined(); - }); - - it("ack cleans up leftover .delivered marker when .json is already gone", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "stale-marker" }] }, - tmpDir, - ); - const queueDir = path.join(tmpDir, "delivery-queue"); - - fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); - await expect(ackDelivery(id, tmpDir)).resolves.toBeUndefined(); - - expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); - }); - - it("ack removes .delivered marker so recovery does not replay", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] }, - tmpDir, - ); - const queueDir = path.join(tmpDir, "delivery-queue"); - - await ackDelivery(id, tmpDir); - - // Neither .json nor .delivered should remain. - expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); - expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); - }); - - it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => { - const id = await enqueueDelivery( - { channel: "telegram", to: "99", payloads: [{ text: "stale" }] }, - tmpDir, - ); - const queueDir = path.join(tmpDir, "delivery-queue"); - - // Simulate crash between ack phase 1 (rename) and phase 2 (unlink): - // rename .json → .delivered, then pretend the process died. - fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); - - const entries = await loadPendingDeliveries(tmpDir); - - // The .delivered entry must NOT appear as pending. - expect(entries).toHaveLength(0); - // And the marker file should have been cleaned up. - expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); - }); - }); - - describe("failDelivery", () => { - it("increments retryCount, records attempt time, and sets lastError", async () => { - const id = await enqueueDelivery( - { - channel: "telegram", - to: "123", - payloads: [{ text: "test" }], - }, - tmpDir, - ); - - await failDelivery(id, "connection refused", tmpDir); - - const queueDir = path.join(tmpDir, "delivery-queue"); - const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8")); - expect(entry.retryCount).toBe(1); - expect(typeof entry.lastAttemptAt).toBe("number"); - expect(entry.lastAttemptAt).toBeGreaterThan(0); - expect(entry.lastError).toBe("connection refused"); - }); - }); - - describe("moveToFailed", () => { - it("moves entry to failed/ subdirectory", async () => { - const id = await enqueueDelivery( - { - channel: "slack", - to: "#general", - payloads: [{ text: "hi" }], - }, - tmpDir, - ); - - await moveToFailed(id, tmpDir); - - const queueDir = path.join(tmpDir, "delivery-queue"); - const failedDir = path.join(queueDir, "failed"); - expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); - expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); - }); - }); - - describe("isPermanentDeliveryError", () => { - it.each([ - "No conversation reference found for user:abc", - "Telegram send failed: chat not found (chat_id=user:123)", - "user not found", - "Bot was blocked by the user", - "Forbidden: bot was kicked from the group chat", - "chat_id is empty", - "Outbound not configured for channel: msteams", - ])("returns true for permanent error: %s", (msg) => { - expect(isPermanentDeliveryError(msg)).toBe(true); - }); - - it.each([ - "network down", - "ETIMEDOUT", - "socket hang up", - "rate limited", - "500 Internal Server Error", - ])("returns false for transient error: %s", (msg) => { - expect(isPermanentDeliveryError(msg)).toBe(false); - }); - }); - - describe("loadPendingDeliveries", () => { - it("returns empty array when queue directory does not exist", async () => { - const nonexistent = path.join(tmpDir, "no-such-dir"); - const entries = await loadPendingDeliveries(nonexistent); - expect(entries).toEqual([]); - }); - - it("loads multiple entries", async () => { - await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); - await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); - - const entries = await loadPendingDeliveries(tmpDir); - expect(entries).toHaveLength(2); - }); - - it("backfills lastAttemptAt for legacy retry entries during load", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] }, - tmpDir, - ); - const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); - const legacyEntry = JSON.parse(fs.readFileSync(filePath, "utf-8")); - legacyEntry.retryCount = 2; - delete legacyEntry.lastAttemptAt; - fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8"); - - const entries = await loadPendingDeliveries(tmpDir); - expect(entries).toHaveLength(1); - expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt); - - const persisted = JSON.parse(fs.readFileSync(filePath, "utf-8")); - expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt); - }); - }); - - describe("computeBackoffMs", () => { - it("returns scheduled backoff values and clamps at max retry", () => { - const cases = [ - { retryCount: 0, expected: 0 }, - { retryCount: 1, expected: 5_000 }, - { retryCount: 2, expected: 25_000 }, - { retryCount: 3, expected: 120_000 }, - { retryCount: 4, expected: 600_000 }, - // Beyond defined schedule -- clamps to last value. - { retryCount: 5, expected: 600_000 }, - ] as const; - - for (const testCase of cases) { - expect(computeBackoffMs(testCase.retryCount), String(testCase.retryCount)).toBe( - testCase.expected, - ); - } - }); - }); - - describe("isEntryEligibleForRecoveryRetry", () => { - it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => { - const now = Date.now(); - const result = isEntryEligibleForRecoveryRetry( - { - id: "entry-1", - channel: "whatsapp", - to: "+1", - payloads: [{ text: "a" }], - enqueuedAt: now, - retryCount: 0, - }, - now, - ); - expect(result).toEqual({ eligible: true }); - }); - - it("defers retry entries until backoff window elapses", () => { - const now = Date.now(); - const result = isEntryEligibleForRecoveryRetry( - { - id: "entry-2", - channel: "whatsapp", - to: "+1", - payloads: [{ text: "a" }], - enqueuedAt: now - 30_000, - retryCount: 3, - lastAttemptAt: now, - }, - now, - ); - expect(result.eligible).toBe(false); - if (result.eligible) { - throw new Error("Expected ineligible retry entry"); - } - expect(result.remainingBackoffMs).toBeGreaterThan(0); - }); - }); - - describe("recoverPendingDeliveries", () => { - const baseCfg = {}; - const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }); - const enqueueCrashRecoveryEntries = async () => { - await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); - await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); - }; - const setEntryState = ( - id: string, - state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number }, - ) => { - const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); - const entry = JSON.parse(fs.readFileSync(filePath, "utf-8")); - entry.retryCount = state.retryCount; - if (state.lastAttemptAt === undefined) { - delete entry.lastAttemptAt; - } else { - entry.lastAttemptAt = state.lastAttemptAt; - } - if (state.enqueuedAt !== undefined) { - entry.enqueuedAt = state.enqueuedAt; - } - fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8"); - }; - const runRecovery = async ({ - deliver, - log = createLog(), - maxRecoveryMs, - }: { - deliver: ReturnType; - log?: ReturnType; - maxRecoveryMs?: number; - }) => { - const result = await recoverPendingDeliveries({ - deliver: deliver as DeliverFn, - log, - cfg: baseCfg, - stateDir: tmpDir, - ...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }), - }); - return { result, log }; - }; - - it("recovers entries from a simulated crash", async () => { - // Manually create queue entries as if gateway crashed before delivery. - await enqueueCrashRecoveryEntries(); - const deliver = vi.fn().mockResolvedValue([]); - const { result } = await runRecovery({ deliver }); - - expect(deliver).toHaveBeenCalledTimes(2); - expect(result.recovered).toBe(2); - expect(result.failed).toBe(0); - expect(result.skippedMaxRetries).toBe(0); - expect(result.deferredBackoff).toBe(0); - - // Queue should be empty after recovery. - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(0); - }); - - it("moves entries that exceeded max retries to failed/", async () => { - // Create an entry and manually set retryCount to MAX_RETRIES. - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, - tmpDir, - ); - setEntryState(id, { retryCount: MAX_RETRIES }); - - const deliver = vi.fn(); - const { result } = await runRecovery({ deliver }); - - expect(deliver).not.toHaveBeenCalled(); - expect(result.skippedMaxRetries).toBe(1); - expect(result.deferredBackoff).toBe(0); - - // Entry should be in failed/ directory. - const failedDir = path.join(tmpDir, "delivery-queue", "failed"); - expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); - }); - - it("increments retryCount on failed recovery attempt", async () => { - await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir); - - const deliver = vi.fn().mockRejectedValue(new Error("network down")); - const { result } = await runRecovery({ deliver }); - - expect(result.failed).toBe(1); - expect(result.recovered).toBe(0); - - // Entry should still be in queue with incremented retryCount. - const entries = await loadPendingDeliveries(tmpDir); - expect(entries).toHaveLength(1); - expect(entries[0].retryCount).toBe(1); - expect(entries[0].lastError).toBe("network down"); - }); - - it("moves entries to failed/ immediately on permanent delivery errors", async () => { - const id = await enqueueDelivery( - { channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] }, - tmpDir, - ); - const deliver = vi - .fn() - .mockRejectedValue(new Error("No conversation reference found for user:abc")); - const log = createLog(); - const { result } = await runRecovery({ deliver, log }); - - expect(result.failed).toBe(1); - expect(result.recovered).toBe(0); - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(0); - const failedDir = path.join(tmpDir, "delivery-queue", "failed"); - expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); - expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error")); - }); - - it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => { - await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); - - const deliver = vi.fn().mockResolvedValue([]); - await runRecovery({ deliver }); - - expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true })); - }); - - it("replays stored delivery options during recovery", async () => { - await enqueueDelivery( - { - channel: "whatsapp", - to: "+1", - payloads: [{ text: "a" }], - bestEffort: true, - gifPlayback: true, - silent: true, - mirror: { - sessionKey: "agent:main:main", - text: "a", - mediaUrls: ["https://example.com/a.png"], - }, - }, - tmpDir, - ); - - const deliver = vi.fn().mockResolvedValue([]); - await runRecovery({ deliver }); - - expect(deliver).toHaveBeenCalledWith( - expect.objectContaining({ - bestEffort: true, - gifPlayback: true, - silent: true, - mirror: { - sessionKey: "agent:main:main", - text: "a", - mediaUrls: ["https://example.com/a.png"], - }, - }), - ); - }); - - it("respects maxRecoveryMs time budget", async () => { - await enqueueCrashRecoveryEntries(); - await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir); - - const deliver = vi.fn().mockResolvedValue([]); - const { result, log } = await runRecovery({ - deliver, - maxRecoveryMs: 0, // Immediate timeout -- no entries should be processed. - }); - - expect(deliver).not.toHaveBeenCalled(); - expect(result.recovered).toBe(0); - expect(result.failed).toBe(0); - expect(result.skippedMaxRetries).toBe(0); - expect(result.deferredBackoff).toBe(0); - - // All entries should still be in the queue (retryCount < MAX_RETRIES). - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(3); - - // retryCount should be incremented on all deferred entries so they - // eventually reach MAX_RETRIES and are pruned rather than looping forever. - expect(remaining.every((e) => e.retryCount === 1)).toBe(true); - - // Should have logged a warning about deferred entries. - expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next startup")); - }); - - it("defers entries until backoff becomes eligible", async () => { - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, - tmpDir, - ); - setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() }); - - const deliver = vi.fn().mockResolvedValue([]); - const { result, log } = await runRecovery({ - deliver, - maxRecoveryMs: 60_000, - }); - - expect(deliver).not.toHaveBeenCalled(); - expect(result).toEqual({ - recovered: 0, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 1, - }); - - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(1); - - expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet")); - }); - - it("continues past high-backoff entries and recovers ready entries behind them", async () => { - const now = Date.now(); - const blockedId = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] }, - tmpDir, - ); - const readyId = await enqueueDelivery( - { channel: "telegram", to: "2", payloads: [{ text: "ready" }] }, - tmpDir, - ); - - setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 }); - setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 }); - - const deliver = vi.fn().mockResolvedValue([]); - const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 }); - - expect(result).toEqual({ - recovered: 1, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 1, - }); - expect(deliver).toHaveBeenCalledTimes(1); - expect(deliver).toHaveBeenCalledWith( - expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }), - ); - - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(1); - expect(remaining[0]?.id).toBe(blockedId); - }); - - it("recovers deferred entries on a later restart once backoff elapsed", async () => { - vi.useFakeTimers(); - const start = new Date("2026-01-01T00:00:00.000Z"); - vi.setSystemTime(start); - - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] }, - tmpDir, - ); - setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() }); - - const firstDeliver = vi.fn().mockResolvedValue([]); - const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 }); - expect(firstRun.result).toEqual({ - recovered: 0, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 1, - }); - expect(firstDeliver).not.toHaveBeenCalled(); - - vi.setSystemTime(new Date(start.getTime() + 600_000 + 1)); - const secondDeliver = vi.fn().mockResolvedValue([]); - const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 }); - expect(secondRun.result).toEqual({ - recovered: 1, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 0, - }); - expect(secondDeliver).toHaveBeenCalledTimes(1); - - const remaining = await loadPendingDeliveries(tmpDir); - expect(remaining).toHaveLength(0); - - vi.useRealTimers(); - }); - - it("returns zeros when queue is empty", async () => { - const deliver = vi.fn(); - const { result } = await runRecovery({ deliver }); - - expect(result).toEqual({ - recovered: 0, - failed: 0, - skippedMaxRetries: 0, - deferredBackoff: 0, - }); - expect(deliver).not.toHaveBeenCalled(); - }); - }); -}); - describe("DirectoryCache", () => { const cfg = {} as OpenClawConfig;