mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-27 22:12:13 +00:00
fix: scrub volatile queues from backups
This commit is contained in:
@@ -18,6 +18,7 @@ import {
|
||||
type BackupCreateResult,
|
||||
} from "./backup-create.js";
|
||||
import { executeSqliteQuerySync, getNodeSqliteKysely } from "./kysely-sync.js";
|
||||
import { requireNodeSqlite } from "./node-sqlite.js";
|
||||
|
||||
function makeResult(overrides: Partial<BackupCreateResult> = {}): BackupCreateResult {
|
||||
return {
|
||||
@@ -37,7 +38,7 @@ function makeResult(overrides: Partial<BackupCreateResult> = {}): BackupCreateRe
|
||||
|
||||
type BackupCreateTestDatabase = Pick<
|
||||
OpenClawStateKyselyDatabase,
|
||||
"diagnostic_events" | "backup_runs"
|
||||
"diagnostic_events" | "backup_runs" | "delivery_queue_entries"
|
||||
>;
|
||||
|
||||
async function listArchiveEntries(archivePath: string): Promise<string[]> {
|
||||
@@ -53,6 +54,30 @@ async function listArchiveEntries(archivePath: string): Promise<string[]> {
|
||||
return entries;
|
||||
}
|
||||
|
||||
async function extractArchiveToTemp(archivePath: string): Promise<string> {
|
||||
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-backup-test-extract-"));
|
||||
await tar.x({
|
||||
file: archivePath,
|
||||
gzip: true,
|
||||
cwd: tempDir,
|
||||
});
|
||||
return tempDir;
|
||||
}
|
||||
|
||||
function countDeliveryQueueRows(sqlitePath: string): number {
|
||||
const sqlite = requireNodeSqlite();
|
||||
const db = new sqlite.DatabaseSync(sqlitePath, { readOnly: true });
|
||||
try {
|
||||
const row = db.prepare("SELECT COUNT(*) AS count FROM delivery_queue_entries").get() as
|
||||
| { count?: number | bigint }
|
||||
| undefined;
|
||||
const count = row?.count ?? 0;
|
||||
return typeof count === "bigint" ? Number(count) : count;
|
||||
} finally {
|
||||
db.close();
|
||||
}
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
});
|
||||
@@ -289,6 +314,58 @@ describe("createBackupArchive", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("scrubs volatile delivery queue rows from SQLite snapshots", async () => {
|
||||
await withOpenClawTestState(
|
||||
{
|
||||
layout: "state-only",
|
||||
prefix: "openclaw-backup-queue-scrub-",
|
||||
scenario: "minimal",
|
||||
},
|
||||
async (state) => {
|
||||
const outputDir = state.path("backups");
|
||||
await fs.mkdir(outputDir, { recursive: true });
|
||||
const database = openOpenClawStateDatabase();
|
||||
const db = getNodeSqliteKysely<BackupCreateTestDatabase>(database.db);
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
db.insertInto("delivery_queue_entries").values({
|
||||
queue_name: "outbound",
|
||||
id: "queued-send",
|
||||
status: "pending",
|
||||
entry_kind: "message",
|
||||
session_key: "session-1",
|
||||
channel: "telegram",
|
||||
target: "chat-1",
|
||||
account_id: null,
|
||||
entry_json: JSON.stringify({ text: "do not replay" }),
|
||||
enqueued_at: 1,
|
||||
updated_at: 1,
|
||||
}),
|
||||
);
|
||||
|
||||
const result = await createBackupArchive({
|
||||
output: outputDir,
|
||||
includeWorkspace: false,
|
||||
nowMs: Date.UTC(2026, 4, 11, 12, 0, 0),
|
||||
});
|
||||
const stateAsset = result.assets.find((asset) => asset.kind === "state");
|
||||
expect(stateAsset).toBeDefined();
|
||||
const extractDir = await extractArchiveToTemp(result.archivePath);
|
||||
try {
|
||||
const archivedStateDb = path.join(
|
||||
extractDir,
|
||||
stateAsset!.archivePath,
|
||||
"state",
|
||||
"openclaw.sqlite",
|
||||
);
|
||||
expect(countDeliveryQueueRows(archivedStateDb)).toBe(0);
|
||||
} finally {
|
||||
await fs.rm(extractDir, { recursive: true, force: true });
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("does not duplicate the root manifest when the system tempdir lives inside the state dir", async () => {
|
||||
await withOpenClawTestState(
|
||||
{
|
||||
|
||||
@@ -169,6 +169,7 @@ function buildTempArchivePath(outputPath: string): string {
|
||||
|
||||
const BACKUP_TAR_MAX_ATTEMPTS = 3;
|
||||
const BACKUP_TAR_BACKOFF_MS = [250, 1000] as const;
|
||||
const VOLATILE_SQLITE_TABLES = ["delivery_queue_entries"] as const;
|
||||
|
||||
function isTarEofRaceError(err: unknown): boolean {
|
||||
const code = (err as NodeJS.ErrnoException | undefined)?.code;
|
||||
@@ -466,6 +467,27 @@ function sqlStringLiteral(value: string): string {
|
||||
return `'${value.replaceAll("'", "''")}'`;
|
||||
}
|
||||
|
||||
function sqliteTableExists(db: import("node:sqlite").DatabaseSync, tableName: string): boolean {
|
||||
const row = db
|
||||
.prepare("SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ? LIMIT 1")
|
||||
.get(tableName);
|
||||
return row !== undefined;
|
||||
}
|
||||
|
||||
function scrubVolatileSqliteSnapshotRows(db: import("node:sqlite").DatabaseSync): void {
|
||||
let scrubbed = false;
|
||||
for (const tableName of VOLATILE_SQLITE_TABLES) {
|
||||
if (!sqliteTableExists(db, tableName)) {
|
||||
continue;
|
||||
}
|
||||
db.exec(`DELETE FROM ${tableName};`);
|
||||
scrubbed = true;
|
||||
}
|
||||
if (scrubbed) {
|
||||
db.exec("VACUUM;");
|
||||
}
|
||||
}
|
||||
|
||||
async function listSqliteDatabasePaths(root: string): Promise<string[]> {
|
||||
const results: string[] = [];
|
||||
async function walk(dir: string): Promise<void> {
|
||||
@@ -531,6 +553,16 @@ async function snapshotSqliteDatabase(params: {
|
||||
} finally {
|
||||
integrityDb.close();
|
||||
}
|
||||
const snapshotDb = new sqlite.DatabaseSync(params.snapshotPath);
|
||||
try {
|
||||
scrubVolatileSqliteSnapshotRows(snapshotDb);
|
||||
assertSqliteIntegrityOk(
|
||||
snapshotDb,
|
||||
`SQLite integrity check failed after volatile backup scrub: ${params.sourcePath}`,
|
||||
);
|
||||
} finally {
|
||||
snapshotDb.close();
|
||||
}
|
||||
const stat = await fs.stat(params.snapshotPath);
|
||||
return { byteSize: stat.size };
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user