Files
openclaw/src/infra/outbound/delivery-queue-storage.ts
Devin Robison b7d70ade3b Fix/telegram writeback admin scope gate (#54561)
* fix(telegram): require operator.admin for legacy target writeback persistence

* Address claude feedback

* Update extensions/telegram/src/target-writeback.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Remove stray brace

* Add updated docs

* Add missing test file, address codex concerns

* Fix test formatting error

* Address comments, fix tests

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-25 12:12:09 -06:00

245 lines
7.6 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveStateDir } from "../../config/paths.js";
import { generateSecureUuid } from "../secure-random.js";
import type { OutboundMirror } from "./mirror.js";
import type { OutboundChannel } from "./targets.js";
/** Name of the queue directory; resolveQueueDir joins it onto the state directory. */
const QUEUE_DIRNAME = "delivery-queue";
/** Name of the subdirectory inside the queue directory that moveToFailed renames entries into. */
const FAILED_DIRNAME = "failed";
/**
 * Caller-supplied description of one outbound delivery. enqueueDelivery copies
 * every field declared here into the JSON entry it writes to the queue
 * directory, so anything placed in this object ends up on disk.
 */
export type QueuedDeliveryPayload = {
/** Outbound channel for the delivery; the "none" member of OutboundChannel is excluded by the type. */
channel: Exclude<OutboundChannel, "none">;
/** Delivery target — presumably a channel-specific recipient identifier; confirm against the senders. */
to: string;
/** Optional account identifier — presumably selects the sending account; TODO confirm with callers. */
accountId?: string;
/**
* Original payloads before plugin hooks. On recovery, hooks re-run on these
* payloads — this is intentional since hooks are stateless transforms and
* should produce the same result on replay.
*/
payloads: ReplyPayload[];
// NOTE(review): the optional fields from threadId through mirror are stored
// verbatim by enqueueDelivery; their meaning is defined by the code that
// consumes queue entries and is not visible in this file.
threadId?: string | number | null;
replyToId?: string | null;
bestEffort?: boolean;
gifPlayback?: boolean;
forceDocument?: boolean;
silent?: boolean;
mirror?: OutboundMirror;
/** Gateway caller scopes at enqueue time, preserved for recovery replay. */
gatewayClientScopes?: readonly string[];
};
/**
 * Shape of a queue entry as written to and read from `<id>.json`: the caller's
 * payload plus the bookkeeping fields maintained by this module.
 */
export interface QueuedDelivery extends QueuedDeliveryPayload {
/** Identifier produced by generateSecureUuid() in enqueueDelivery; also the base name of the entry's file. */
id: string;
/** Date.now() value captured by enqueueDelivery when the entry was created. */
enqueuedAt: number;
/** Set to 0 by enqueueDelivery; failDelivery adds 1 each time it is called for the entry. */
retryCount: number;
/**
 * Date.now() value recorded by the most recent failDelivery call. Entries with
 * retryCount > 0 but no valid value here are backfilled from enqueuedAt when
 * loadPendingDeliveries reads them.
 */
lastAttemptAt?: number;
/** Error string passed to the most recent failDelivery call. */
lastError?: string;
}
/**
 * Compute the delivery-queue directory path.
 *
 * @param stateDir Explicit state directory; when omitted (or nullish) the
 *   value of resolveStateDir() is used instead.
 * @returns The queue directory path under the chosen state directory.
 */
function resolveQueueDir(stateDir?: string): string {
  return path.join(stateDir ?? resolveStateDir(), QUEUE_DIRNAME);
}
/**
 * Compute the path of the failed-entries subdirectory, which lives directly
 * inside the queue directory.
 *
 * @param stateDir Optional state directory override, forwarded to resolveQueueDir.
 */
function resolveFailedDir(stateDir?: string): string {
  const queueDir = resolveQueueDir(stateDir);
  return path.join(queueDir, FAILED_DIRNAME);
}
/**
 * Build the two file paths associated with a queue entry.
 *
 * @param id Entry identifier (used as the file base name).
 * @param stateDir Optional state directory override.
 * @returns `jsonPath` — the pending entry file (`<id>.json`);
 *   `deliveredPath` — the marker file (`<id>.delivered`) that ackDelivery
 *   renames the entry to.
 */
function resolveQueueEntryPaths(
  id: string,
  stateDir?: string,
): {
  jsonPath: string;
  deliveredPath: string;
} {
  const queueDir = resolveQueueDir(stateDir);
  const withExtension = (ext: string): string => path.join(queueDir, `${id}.${ext}`);
  return {
    jsonPath: withExtension("json"),
    deliveredPath: withExtension("delivered"),
  };
}
/**
 * Extract the `code` property from a thrown value, if it has one.
 *
 * @param err Any caught value.
 * @returns The `code` property converted with String() when `err` is a
 *   non-null object that has a `code` key (own or inherited); otherwise null.
 *   Note that a present-but-undefined code yields the string "undefined".
 */
function getErrnoCode(err: unknown): string | null {
  if (typeof err !== "object" || err === null) {
    return null;
  }
  if (!("code" in err)) {
    return null;
  }
  const { code } = err as { code?: unknown };
  return String(code);
}
/**
 * Delete a file, ignoring every failure (missing file, permissions, ...).
 * Used for cleanup steps where a leftover file is harmless.
 *
 * @param filePath Path of the file to remove.
 */
async function unlinkBestEffort(filePath: string): Promise<void> {
  // Any rejection is deliberately discarded: this is best-effort cleanup.
  await fs.promises.unlink(filePath).catch(() => undefined);
}
/**
 * Persist a queue entry by writing it to a sibling temp file and then renaming
 * the temp file over the target, so the target path never holds a partially
 * written entry.
 *
 * The temp file is named `<filePath>.<pid>.tmp` and created with mode 0o600.
 * If serialising, writing, or renaming fails, the temp file is removed
 * (best-effort) before the error is rethrown, so failed writes do not leave
 * stray `.tmp` files behind in the queue directory.
 *
 * @param filePath Destination path of the entry file.
 * @param entry Entry to serialise as pretty-printed JSON.
 * @throws Whatever error the serialise/write/rename step raised.
 */
async function writeQueueEntry(filePath: string, entry: QueuedDelivery): Promise<void> {
  const tmp = `${filePath}.${process.pid}.tmp`;
  try {
    await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
      encoding: "utf-8",
      mode: 0o600,
    });
    await fs.promises.rename(tmp, filePath);
  } catch (err) {
    // Nothing else in the queue ever deletes `.tmp` files, so clean up here.
    // The unlink itself is best-effort (the temp file may not exist yet).
    await fs.promises.unlink(tmp).catch(() => undefined);
    throw err;
  }
}
/**
 * Read and JSON-parse a queue entry file.
 *
 * The parsed value is returned as-is; its shape is NOT validated here, so
 * callers must tolerate malformed content.
 *
 * @param filePath Path of the entry file.
 * @throws If the file cannot be read or does not contain valid JSON.
 */
async function readQueueEntry(filePath: string): Promise<QueuedDelivery> {
  const raw = await fs.promises.readFile(filePath, "utf-8");
  const parsed: unknown = JSON.parse(raw);
  return parsed as QueuedDelivery;
}
/**
 * Backfill `lastAttemptAt` on entries that have been retried but carry no
 * usable attempt timestamp.
 *
 * An entry is migrated only when all of the following hold:
 *  - `lastAttemptAt` is not a finite number greater than 0,
 *  - `retryCount` is not `<= 0`,
 *  - `enqueuedAt` is a finite number greater than 0.
 * In that case a copy of the entry with `lastAttemptAt = enqueuedAt` is
 * returned; otherwise the very same entry object is returned untouched.
 *
 * @param entry Entry as read from disk.
 * @returns The (possibly copied) entry and whether it was changed.
 */
function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): {
  entry: QueuedDelivery;
  migrated: boolean;
} {
  const isPositiveTimestamp = (value: unknown): boolean =>
    typeof value === "number" && Number.isFinite(value) && value > 0;

  // Already has an attempt timestamp: nothing to backfill.
  if (isPositiveTimestamp(entry.lastAttemptAt)) {
    return { entry, migrated: false };
  }
  // Never retried: a missing attempt timestamp is expected.
  if (entry.retryCount <= 0) {
    return { entry, migrated: false };
  }
  // No trustworthy enqueue time to fall back on.
  if (!isPositiveTimestamp(entry.enqueuedAt)) {
    return { entry, migrated: false };
  }
  const backfilled = { ...entry, lastAttemptAt: entry.enqueuedAt };
  return { entry: backfilled, migrated: true };
}
/**
 * Create the queue directory and its failed-entries subdirectory if they do
 * not exist yet (recursive mkdir, mode 0o700 for newly created directories).
 *
 * @param stateDir Optional state directory override.
 * @returns The queue directory path.
 */
export async function ensureQueueDir(stateDir?: string): Promise<string> {
  const dirOptions = { recursive: true, mode: 0o700 } as const;
  const queueDir = resolveQueueDir(stateDir);
  await fs.promises.mkdir(queueDir, dirOptions);
  const failedDir = resolveFailedDir(stateDir);
  await fs.promises.mkdir(failedDir, dirOptions);
  return queueDir;
}
/**
 * Write a new delivery entry to the queue directory so it survives a crash
 * before (or during) the actual send.
 *
 * The entry gets a freshly generated id, `enqueuedAt = Date.now()` and
 * `retryCount = 0`; only the known payload fields are copied from `params`.
 *
 * @param params Delivery description to persist.
 * @param stateDir Optional state directory override.
 * @returns The id of the new entry (also the base name of its `.json` file).
 */
export async function enqueueDelivery(
  params: QueuedDeliveryPayload,
  stateDir?: string,
): Promise<string> {
  const queueDir = await ensureQueueDir(stateDir);
  const id = generateSecureUuid();
  const {
    channel,
    to,
    accountId,
    payloads,
    threadId,
    replyToId,
    bestEffort,
    gifPlayback,
    forceDocument,
    silent,
    mirror,
    gatewayClientScopes,
  } = params;
  const entryPath = path.join(queueDir, `${id}.json`);
  // Key order below is the order the fields appear in the persisted JSON.
  await writeQueueEntry(entryPath, {
    id,
    enqueuedAt: Date.now(),
    channel,
    to,
    accountId,
    payloads,
    threadId,
    replyToId,
    bestEffort,
    gifPlayback,
    forceDocument,
    silent,
    mirror,
    gatewayClientScopes,
    retryCount: 0,
  });
  return id;
}
/**
 * Drop an entry from the queue after it has been delivered.
 *
 * Removal happens in two steps so that a crash right after a successful send
 * cannot lead to the message being replayed by the next recovery scan:
 *   1. rename `{id}.json` → `{id}.delivered` (the entry stops being pending);
 *   2. unlink the `.delivered` marker (best-effort).
 * A marker left behind by a crash between the two steps is removed by
 * {@link loadPendingDeliveries} without re-sending.
 *
 * If `{id}.json` is already gone (ENOENT) — e.g. an earlier ack got as far as
 * the rename — only the marker cleanup is attempted.
 *
 * @param id Entry identifier.
 * @param stateDir Optional state directory override.
 * @throws Any rename error other than ENOENT.
 */
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
  const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir);
  try {
    // Step 1: the rename is what marks the delivery as complete.
    await fs.promises.rename(jsonPath, deliveredPath);
  } catch (err) {
    // A missing .json is fine (already renamed earlier); anything else is not.
    if (getErrnoCode(err) !== "ENOENT") {
      throw err;
    }
  }
  // Step 2: remove the marker, whether it was created just now or left over
  // from a previous ack attempt.
  await unlinkBestEffort(deliveredPath);
}
/**
 * Record a failed delivery attempt on a queued entry: bump `retryCount`,
 * stamp `lastAttemptAt` with the current time, store the error text, and
 * write the entry back to disk.
 *
 * @param id Entry identifier.
 * @param error Error description to store as `lastError`.
 * @param stateDir Optional state directory override.
 * @throws If the entry file cannot be read, parsed, or rewritten.
 */
export async function failDelivery(id: string, error: string, stateDir?: string): Promise<void> {
  const entryPath = path.join(resolveQueueDir(stateDir), `${id}.json`);
  const entry = await readQueueEntry(entryPath);
  entry.retryCount = entry.retryCount + 1;
  const attemptedAt = Date.now();
  entry.lastAttemptAt = attemptedAt;
  entry.lastError = error;
  await writeQueueEntry(entryPath, entry);
}
/**
 * Read one candidate entry file for {@link loadPendingDeliveries}.
 *
 * @returns The normalised entry plus its migration flag, or null when the path
 *   is not a regular file, cannot be read/parsed, or does not contain a JSON
 *   object.
 */
async function loadQueueEntryFile(
  filePath: string,
): Promise<{ entry: QueuedDelivery; migrated: boolean } | null> {
  try {
    const stat = await fs.promises.stat(filePath);
    if (!stat.isFile()) {
      return null;
    }
    const parsed: unknown = await readQueueEntry(filePath);
    // Valid JSON is not necessarily an entry: reject primitives, null, arrays.
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      return null;
    }
    return normalizeLegacyQueuedDeliveryEntry(parsed as QueuedDelivery);
  } catch {
    // Malformed or inaccessible entry.
    return null;
  }
}

/**
 * Load all pending delivery entries from the queue directory.
 *
 * Steps:
 *  1. List the queue directory; a missing directory means an empty queue.
 *  2. Remove `.delivered` markers left behind when the process died between
 *     the rename and the unlink in ackDelivery (best-effort).
 *  3. Read every `*.json` regular file, skipping malformed or unreadable ones.
 *     Legacy entries that needed a `lastAttemptAt` backfill are written back;
 *     that write is best-effort, so a failure to persist the migration does
 *     not drop an otherwise valid pending delivery.
 *
 * @param stateDir Optional state directory override.
 * @returns The pending entries, in directory-listing order.
 * @throws Any readdir error other than ENOENT.
 */
export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDelivery[]> {
  const queueDir = resolveQueueDir(stateDir);
  let files: string[];
  try {
    files = await fs.promises.readdir(queueDir);
  } catch (err) {
    if (getErrnoCode(err) === "ENOENT") {
      return [];
    }
    throw err;
  }
  // Clean up .delivered markers left by ackDelivery if the process crashed
  // between the rename and the unlink.
  for (const file of files) {
    if (file.endsWith(".delivered")) {
      await unlinkBestEffort(path.join(queueDir, file));
    }
  }
  const entries: QueuedDelivery[] = [];
  for (const file of files) {
    if (!file.endsWith(".json")) {
      continue;
    }
    const filePath = path.join(queueDir, file);
    const loaded = await loadQueueEntryFile(filePath);
    if (loaded === null) {
      continue;
    }
    if (loaded.migrated) {
      try {
        await writeQueueEntry(filePath, loaded.entry);
      } catch {
        // Best-effort: the in-memory entry is already normalised, and the
        // backfill will simply be retried on the next load.
      }
    }
    entries.push(loaded.entry);
  }
  return entries;
}
/**
 * Take an entry out of the pending queue by renaming its `.json` file into
 * the failed-entries subdirectory (which is created first if necessary).
 *
 * @param id Entry identifier.
 * @param stateDir Optional state directory override.
 * @throws If the subdirectory cannot be created or the rename fails (for
 *   example because the entry file does not exist).
 */
export async function moveToFailed(id: string, stateDir?: string): Promise<void> {
  const entryFile = `${id}.json`;
  const pendingPath = path.join(resolveQueueDir(stateDir), entryFile);
  const failedDir = resolveFailedDir(stateDir);
  await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 });
  await fs.promises.rename(pendingPath, path.join(failedDir, entryFile));
}