mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 03:00:34 +00:00
fix: keep fresh sqlite media materializations
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { Readable } from "node:stream";
|
||||
@@ -545,7 +546,7 @@ describe("media store", () => {
|
||||
);
|
||||
return {
|
||||
removedFiles: [oldNested.path, oldFlat.path],
|
||||
preservedFiles: [],
|
||||
preservedFiles: [freshNested.path],
|
||||
preservedMediaRefs: [
|
||||
{
|
||||
id: freshNested.id,
|
||||
@@ -685,6 +686,51 @@ describe("media store", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects HTTP media downloads when the response stream errors", async () => {
|
||||
await withTempStore(async (store) => {
|
||||
const response = new Readable({ read() {} }) as Readable & {
|
||||
statusCode: number;
|
||||
headers: Record<string, string>;
|
||||
};
|
||||
response.statusCode = 200;
|
||||
response.headers = { "content-type": "text/plain" };
|
||||
const request = new EventEmitter() as EventEmitter & {
|
||||
end: () => void;
|
||||
destroy: (error?: Error) => void;
|
||||
};
|
||||
request.end = vi.fn();
|
||||
request.destroy = vi.fn((error?: Error) => {
|
||||
if (error) {
|
||||
request.emit("error", error);
|
||||
}
|
||||
});
|
||||
const httpRequest = vi.fn((_url: URL, _options: unknown, onResponse: unknown) => {
|
||||
queueMicrotask(() => {
|
||||
(onResponse as (res: typeof response) => void)(response);
|
||||
response.emit("data", Buffer.from("partial"));
|
||||
response.emit("error", new Error("socket reset"));
|
||||
});
|
||||
return request;
|
||||
});
|
||||
type NetworkDeps = NonNullable<Parameters<typeof store.setMediaStoreNetworkDepsForTest>[0]>;
|
||||
store.setMediaStoreNetworkDepsForTest({
|
||||
httpRequest: httpRequest as unknown as NetworkDeps["httpRequest"],
|
||||
resolvePinnedHostname: (async () => ({
|
||||
hostname: "example.test",
|
||||
addresses: ["93.184.216.34"],
|
||||
lookup: vi.fn(),
|
||||
})) as unknown as NetworkDeps["resolvePinnedHostname"],
|
||||
});
|
||||
try {
|
||||
await expect(store.saveMediaSource("http://example.test/media.txt")).rejects.toThrow(
|
||||
"socket reset",
|
||||
);
|
||||
} finally {
|
||||
store.setMediaStoreNetworkDepsForTest();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("extractOriginalFilename", () => {
|
||||
it.each([
|
||||
{
|
||||
|
||||
@@ -57,6 +57,7 @@ export type LegacyMediaImportResult = {
|
||||
removed: number;
|
||||
skipped: number;
|
||||
};
|
||||
type ExpiredMediaBlobRow = Pick<MediaBlobRow, "id" | "subdir">;
|
||||
|
||||
const defaultHttpRequestImpl: RequestImpl = httpRequest;
|
||||
const defaultHttpsRequestImpl: RequestImpl = httpsRequest;
|
||||
@@ -358,19 +359,38 @@ async function retryAfterRecreatingDir<T>(dir: string, run: () => Promise<T>): P
|
||||
|
||||
export async function cleanOldMedia(ttlMs = DEFAULT_TTL_MS, options: CleanOldMediaOptions = {}) {
|
||||
const cutoff = Date.now() - ttlMs;
|
||||
runOpenClawStateWriteTransaction((database) => {
|
||||
const query = getMediaKysely(database.db)
|
||||
.deleteFrom("media_blobs")
|
||||
const expiredRows = runOpenClawStateWriteTransaction((database) => {
|
||||
const baseQuery = getMediaKysely(database.db)
|
||||
.selectFrom("media_blobs")
|
||||
.select(["id", "subdir"])
|
||||
.where("created_at", "<", cutoff);
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
options.recursive === false ? query.where("subdir", "not like", `%${path.sep}%`) : query,
|
||||
);
|
||||
const query =
|
||||
options.recursive === false
|
||||
? baseQuery.where("subdir", "not like", `%${path.sep}%`)
|
||||
: baseQuery;
|
||||
const rows = executeSqliteQuerySync(database.db, query).rows as ExpiredMediaBlobRow[];
|
||||
for (const row of rows) {
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
getMediaKysely(database.db)
|
||||
.deleteFrom("media_blobs")
|
||||
.where("subdir", "=", row.subdir)
|
||||
.where("id", "=", row.id),
|
||||
);
|
||||
}
|
||||
return rows;
|
||||
});
|
||||
const materializationRoot = resolveMediaMaterializationRoot();
|
||||
const removedDirs = new Set<string>();
|
||||
for (const row of expiredRows) {
|
||||
const filePath = path.join(resolveMediaScopedDir(row.subdir, "cleanOldMedia"), row.id);
|
||||
await fs.rm(filePath, { force: true }).catch(() => {});
|
||||
removedDirs.add(path.dirname(filePath));
|
||||
}
|
||||
if (options.pruneEmptyDirs || options.recursive !== false) {
|
||||
await fs
|
||||
.rm(resolveMediaMaterializationRoot(), { recursive: true, force: true })
|
||||
.catch(() => {});
|
||||
for (const dir of removedDirs) {
|
||||
await pruneEmptyMediaDirs(dir, materializationRoot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -460,6 +480,7 @@ async function downloadToBuffer(
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
res.on("error", reject);
|
||||
});
|
||||
req.on("error", reject);
|
||||
req.end();
|
||||
|
||||
Reference in New Issue
Block a user