From c223374b16fba09357a22c22d855da60c9bfcbb2 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 23:52:56 +0100 Subject: [PATCH] fix: keep fresh sqlite media materializations --- src/media/store.test.ts | 48 ++++++++++++++++++++++++++++++++++++++++- src/media/store.ts | 41 ++++++++++++++++++++++++++--------- 2 files changed, 78 insertions(+), 11 deletions(-) diff --git a/src/media/store.test.ts b/src/media/store.test.ts index e705fcbbd8e..2cdf7eea502 100644 --- a/src/media/store.test.ts +++ b/src/media/store.test.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from "node:events"; import fs from "node:fs/promises"; import path from "node:path"; import { Readable } from "node:stream"; @@ -545,7 +546,7 @@ describe("media store", () => { ); return { removedFiles: [oldNested.path, oldFlat.path], - preservedFiles: [], + preservedFiles: [freshNested.path], preservedMediaRefs: [ { id: freshNested.id, @@ -685,6 +686,51 @@ describe("media store", () => { }); }); + it("rejects HTTP media downloads when the response stream errors", async () => { + await withTempStore(async (store) => { + const response = new Readable({ read() {} }) as Readable & { + statusCode: number; + headers: Record; + }; + response.statusCode = 200; + response.headers = { "content-type": "text/plain" }; + const request = new EventEmitter() as EventEmitter & { + end: () => void; + destroy: (error?: Error) => void; + }; + request.end = vi.fn(); + request.destroy = vi.fn((error?: Error) => { + if (error) { + request.emit("error", error); + } + }); + const httpRequest = vi.fn((_url: URL, _options: unknown, onResponse: unknown) => { + queueMicrotask(() => { + (onResponse as (res: typeof response) => void)(response); + response.emit("data", Buffer.from("partial")); + response.emit("error", new Error("socket reset")); + }); + return request; + }); + type NetworkDeps = NonNullable[0]>; + store.setMediaStoreNetworkDepsForTest({ + httpRequest: httpRequest as unknown as NetworkDeps["httpRequest"], + resolvePinnedHostname: (async () => ({ + hostname: "example.test", + addresses: ["93.184.216.34"], + lookup: vi.fn(), + })) as unknown as NetworkDeps["resolvePinnedHostname"], + }); + try { + await expect(store.saveMediaSource("http://example.test/media.txt")).rejects.toThrow( + "socket reset", + ); + } finally { + store.setMediaStoreNetworkDepsForTest(); + } + }); + }); + describe("extractOriginalFilename", () => { it.each([ { diff --git a/src/media/store.ts b/src/media/store.ts index 404ce434740..0a50bfe098a 100644 --- a/src/media/store.ts +++ b/src/media/store.ts @@ -57,6 +57,7 @@ export type LegacyMediaImportResult = { removed: number; skipped: number; }; +type ExpiredMediaBlobRow = Pick; const defaultHttpRequestImpl: RequestImpl = httpRequest; const defaultHttpsRequestImpl: RequestImpl = httpsRequest; @@ -358,19 +359,38 @@ async function retryAfterRecreatingDir(dir: string, run: () => Promise): P export async function cleanOldMedia(ttlMs = DEFAULT_TTL_MS, options: CleanOldMediaOptions = {}) { const cutoff = Date.now() - ttlMs; - runOpenClawStateWriteTransaction((database) => { - const query = getMediaKysely(database.db) - .deleteFrom("media_blobs") + const expiredRows = runOpenClawStateWriteTransaction((database) => { + const baseQuery = getMediaKysely(database.db) + .selectFrom("media_blobs") + .select(["id", "subdir"]) .where("created_at", "<", cutoff); - executeSqliteQuerySync( - database.db, - options.recursive === false ? query.where("subdir", "not like", `%${path.sep}%`) : query, - ); + const query = + options.recursive === false + ? baseQuery.where("subdir", "not like", `%${path.sep}%`) + : baseQuery; + const rows = executeSqliteQuerySync(database.db, query).rows as ExpiredMediaBlobRow[]; + for (const row of rows) { + executeSqliteQuerySync( + database.db, + getMediaKysely(database.db) + .deleteFrom("media_blobs") + .where("subdir", "=", row.subdir) + .where("id", "=", row.id), + ); + } + return rows; }); + const materializationRoot = resolveMediaMaterializationRoot(); + const removedDirs = new Set(); + for (const row of expiredRows) { + const filePath = path.join(resolveMediaScopedDir(row.subdir, "cleanOldMedia"), row.id); + await fs.rm(filePath, { force: true }).catch(() => {}); + removedDirs.add(path.dirname(filePath)); + } if (options.pruneEmptyDirs || options.recursive !== false) { - await fs - .rm(resolveMediaMaterializationRoot(), { recursive: true, force: true }) - .catch(() => {}); + for (const dir of removedDirs) { + await pruneEmptyMediaDirs(dir, materializationRoot); + } } } @@ -460,6 +480,7 @@ async function downloadToBuffer( reject(err); } }); + res.on("error", reject); }); req.on("error", reject); req.end();