mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-30 12:41:04 +00:00
fix: thread managed image state dir through sqlite stores
This commit is contained in:
@@ -4,6 +4,7 @@ import type { AddressInfo } from "node:net";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { upsertSessionEntry } from "../config/sessions/store.js";
|
||||
import { replaceSqliteSessionTranscriptEvents } from "../config/sessions/transcript-store.sqlite.js";
|
||||
import {
|
||||
executeSqliteQuerySync,
|
||||
@@ -13,10 +14,12 @@ import {
|
||||
import { createPinnedLookup } from "../infra/net/ssrf.js";
|
||||
import { resolvePreferredOpenClawTmpDir } from "../infra/tmp-openclaw-dir.js";
|
||||
import {
|
||||
readMediaBuffer,
|
||||
resolveMediaBufferPath,
|
||||
saveMediaBuffer,
|
||||
setMediaStoreNetworkDepsForTest,
|
||||
} from "../media/store.js";
|
||||
import { DEFAULT_AGENT_ID, resolveAgentIdFromSessionKey } from "../routing/session-key.js";
|
||||
import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js";
|
||||
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
|
||||
import {
|
||||
@@ -149,7 +152,13 @@ async function createFixture(
|
||||
},
|
||||
};
|
||||
await writeManagedImageRecord(record as Parameters<typeof writeManagedImageRecord>[0], stateDir);
|
||||
return { attachmentId, sessionKey, originalPath };
|
||||
upsertSessionEntry({
|
||||
agentId: resolveAgentIdFromSessionKey(sessionKey) ?? DEFAULT_AGENT_ID,
|
||||
env: { ...process.env, OPENCLAW_STATE_DIR: stateDir },
|
||||
sessionKey,
|
||||
entry: { sessionId: "sess-main", updatedAt: Date.now() },
|
||||
});
|
||||
return { attachmentId, mediaId: savedOriginal.id, sessionKey, originalPath };
|
||||
}
|
||||
|
||||
type ManagedImageTestDatabase = Pick<OpenClawStateKyselyDatabase, "managed_outgoing_image_records">;
|
||||
@@ -276,6 +285,23 @@ async function requestManagedImage(params: {
|
||||
},
|
||||
],
|
||||
);
|
||||
const encodedSessionKey = params.pathName.split("/").at(-3);
|
||||
if (encodedSessionKey) {
|
||||
try {
|
||||
const sessionKey = decodeURIComponent(encodedSessionKey);
|
||||
upsertSessionEntry({
|
||||
agentId: resolveAgentIdFromSessionKey(sessionKey) ?? DEFAULT_AGENT_ID,
|
||||
env: { ...process.env, OPENCLAW_STATE_DIR: params.stateDir },
|
||||
sessionKey,
|
||||
entry: {
|
||||
sessionId: params.sessionEntry?.sessionId ?? "sess-1",
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
});
|
||||
} catch {
|
||||
// Malformed encoded session-key tests should reach the HTTP handler.
|
||||
}
|
||||
}
|
||||
|
||||
const auth = { mode: "test" } as never;
|
||||
const server = http.createServer(async (req, res) => {
|
||||
@@ -1073,9 +1099,6 @@ describe("cleanupManagedOutgoingImageRecords", () => {
|
||||
|
||||
it("cleans up dereferenced records and original files", async () => {
|
||||
const fixture = await createFixture(stateDir);
|
||||
loadSessionEntryMock.mockReturnValue({
|
||||
entry: { sessionId: "sess-main" },
|
||||
});
|
||||
readSessionMessagesMock.mockReturnValue([]);
|
||||
|
||||
const result = await cleanupManagedOutgoingImageRecords({ stateDir });
|
||||
@@ -1084,13 +1107,15 @@ describe("cleanupManagedOutgoingImageRecords", () => {
|
||||
expect(result.deletedFileCount).toBe(1);
|
||||
expect(result.retainedCount).toBe(0);
|
||||
await expectPathMissing(fixture.originalPath);
|
||||
await expect(
|
||||
readMediaBuffer(fixture.mediaId, "outgoing/originals", undefined, {
|
||||
env: { ...process.env, OPENCLAW_STATE_DIR: stateDir },
|
||||
}),
|
||||
).rejects.toThrow(/does not resolve to a file/u);
|
||||
});
|
||||
|
||||
it("retains committed records that are still referenced by a full-image block", async () => {
|
||||
const fixture = await createFixture(stateDir);
|
||||
loadSessionEntryMock.mockReturnValue({
|
||||
entry: { sessionId: "sess-main" },
|
||||
});
|
||||
readSessionMessagesMock.mockReturnValue([
|
||||
{
|
||||
__openclaw: { id: "msg-1" },
|
||||
@@ -1121,9 +1146,6 @@ describe("cleanupManagedOutgoingImageRecords", () => {
|
||||
attachmentId: "22222222-2222-4222-8222-222222222222",
|
||||
filename: "att-2.png",
|
||||
});
|
||||
loadSessionEntryMock.mockReturnValue({
|
||||
entry: { sessionId: "sess-main" },
|
||||
});
|
||||
readSessionMessagesMock.mockReturnValue([
|
||||
{
|
||||
__openclaw: { id: "msg-1" },
|
||||
|
||||
@@ -3,6 +3,7 @@ import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import path from "node:path";
|
||||
import type { Insertable, Selectable } from "kysely";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import { getSessionEntry } from "../config/sessions.js";
|
||||
import { getSqliteSessionTranscriptStats } from "../config/sessions/transcript-store.sqlite.js";
|
||||
import { readLocalFileSafely } from "../infra/fs-safe.js";
|
||||
import {
|
||||
@@ -26,6 +27,7 @@ import {
|
||||
saveMediaSource,
|
||||
} from "../media/store.js";
|
||||
import { DEFAULT_AGENT_ID, resolveAgentIdFromSessionKey } from "../routing/session-key.js";
|
||||
import { resolveOpenClawAgentSqlitePath } from "../state/openclaw-agent-db.paths.js";
|
||||
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
|
||||
import {
|
||||
openOpenClawStateDatabase,
|
||||
@@ -253,6 +255,13 @@ function managedImageRecordDbOptions(stateDir: string): OpenClawStateDatabaseOpt
|
||||
return { env: { ...process.env, OPENCLAW_STATE_DIR: stateDir } };
|
||||
}
|
||||
|
||||
function managedImageAgentDbPath(agentId: string, stateDir: string): string {
|
||||
return resolveOpenClawAgentSqlitePath({
|
||||
agentId,
|
||||
env: managedImageRecordDbOptions(stateDir).env,
|
||||
});
|
||||
}
|
||||
|
||||
function normalizeManagedOutgoingOriginalSubdir(value: string | undefined): string {
|
||||
return value === MANAGED_OUTGOING_ORIGINALS_SUBDIR ? value : MANAGED_OUTGOING_ORIGINALS_SUBDIR;
|
||||
}
|
||||
@@ -339,15 +348,29 @@ async function getVariantStats(filePath: string) {
|
||||
};
|
||||
}
|
||||
|
||||
async function readManagedImageOriginalBuffer(record: ManagedImageRecord): Promise<Buffer> {
|
||||
async function readManagedImageOriginalBuffer(
|
||||
record: ManagedImageRecord,
|
||||
stateDir = resolveStateDir(),
|
||||
): Promise<Buffer> {
|
||||
const subdir = normalizeManagedOutgoingOriginalSubdir(record.original.mediaSubdir);
|
||||
return (await readMediaBuffer(record.original.mediaId, subdir)).buffer;
|
||||
return (
|
||||
await readMediaBuffer(
|
||||
record.original.mediaId,
|
||||
subdir,
|
||||
DEFAULT_MANAGED_IMAGE_ATTACHMENT_LIMITS.maxBytes,
|
||||
managedImageRecordDbOptions(stateDir),
|
||||
)
|
||||
).buffer;
|
||||
}
|
||||
|
||||
async function deleteManagedImageOriginal(original: ManagedImageRecordVariant): Promise<number> {
|
||||
async function deleteManagedImageOriginal(
|
||||
original: ManagedImageRecordVariant,
|
||||
stateDir = resolveStateDir(),
|
||||
): Promise<number> {
|
||||
await deleteMediaBuffer(
|
||||
original.mediaId,
|
||||
normalizeManagedOutgoingOriginalSubdir(original.mediaSubdir),
|
||||
managedImageRecordDbOptions(stateDir),
|
||||
);
|
||||
return 1;
|
||||
}
|
||||
@@ -439,7 +462,7 @@ async function deleteManagedImageRecordArtifacts(
|
||||
record: ManagedImageRecord,
|
||||
stateDir = resolveStateDir(),
|
||||
) {
|
||||
const deletedFileCount = await deleteManagedImageOriginal(record.original);
|
||||
const deletedFileCount = await deleteManagedImageOriginal(record.original, stateDir);
|
||||
const { database, db } = managedImageRecordDatabase(stateDir);
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
@@ -671,14 +694,20 @@ async function getSessionManagedOutgoingAttachmentIndex(
|
||||
if (cache?.has(sessionKey)) {
|
||||
return cache.get(sessionKey) ?? null;
|
||||
}
|
||||
const { entry } = loadSessionEntry(sessionKey);
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey) ?? DEFAULT_AGENT_ID;
|
||||
const entry = stateDir
|
||||
? getSessionEntry({
|
||||
agentId,
|
||||
env: managedImageRecordDbOptions(stateDir).env,
|
||||
sessionKey,
|
||||
})
|
||||
: loadSessionEntry(sessionKey).entry;
|
||||
const sessionId = entry?.sessionId;
|
||||
if (!sessionId) {
|
||||
cache?.set(sessionKey, null);
|
||||
return null;
|
||||
}
|
||||
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey) ?? DEFAULT_AGENT_ID;
|
||||
const transcriptStat = getSqliteSessionTranscriptStats({
|
||||
agentId,
|
||||
sessionId,
|
||||
@@ -695,7 +724,11 @@ async function getSessionManagedOutgoingAttachmentIndex(
|
||||
}
|
||||
|
||||
const messages = await readSessionMessagesAsync(
|
||||
{ agentId, sessionId },
|
||||
{
|
||||
agentId,
|
||||
sessionId,
|
||||
...(stateDir ? { path: managedImageAgentDbPath(agentId, stateDir) } : {}),
|
||||
},
|
||||
{
|
||||
mode: "full",
|
||||
reason: "managed outgoing attachment index",
|
||||
@@ -1067,7 +1100,7 @@ export async function handleManagedOutgoingImageHttpRequest(
|
||||
|
||||
let body: Buffer;
|
||||
try {
|
||||
body = await readManagedImageOriginalBuffer(record);
|
||||
body = await readManagedImageOriginalBuffer(record, opts.stateDir);
|
||||
} catch {
|
||||
sendStatus(res, 404, "not found");
|
||||
return true;
|
||||
|
||||
@@ -117,8 +117,12 @@ function getMediaKysely(db: DatabaseSync) {
|
||||
return getNodeSqliteKysely<MediaKyselyDatabase>(db);
|
||||
}
|
||||
|
||||
function getMediaBlobRow(params: { subdir: string; id: string }): MediaBlobRow | undefined {
|
||||
const database = openOpenClawStateDatabase();
|
||||
function getMediaBlobRow(params: {
|
||||
subdir: string;
|
||||
id: string;
|
||||
state?: OpenClawStateDatabaseOptions;
|
||||
}): MediaBlobRow | undefined {
|
||||
const database = openOpenClawStateDatabase(params.state);
|
||||
return executeSqliteQueryTakeFirstSync(
|
||||
database.db,
|
||||
getMediaKysely(database.db)
|
||||
@@ -968,10 +972,14 @@ export async function saveMediaStream(
|
||||
* Prefer readMediaBuffer when the caller needs the bytes; this path-returning
|
||||
* helper is for channel surfaces that need a stable local attachment path.
|
||||
*/
|
||||
export async function resolveMediaBufferPath(id: string, subdir = "inbound"): Promise<string> {
|
||||
export async function resolveMediaBufferPath(
|
||||
id: string,
|
||||
subdir = "inbound",
|
||||
state?: OpenClawStateDatabaseOptions,
|
||||
): Promise<string> {
|
||||
const safeSubdir = resolveMediaSubdir(subdir, "resolveMediaBufferPath");
|
||||
resolveMediaRelativePath(id, subdir, "resolveMediaBufferPath");
|
||||
const row = getMediaBlobRow({ subdir: safeSubdir, id });
|
||||
const row = getMediaBlobRow({ subdir: safeSubdir, id, state });
|
||||
if (!row) {
|
||||
throw new Error(
|
||||
`resolveMediaBufferPath: media ID does not resolve to a file: ${JSON.stringify(id)}`,
|
||||
@@ -995,10 +1003,11 @@ export async function readMediaBuffer(
|
||||
id: string,
|
||||
subdir: string = "inbound",
|
||||
maxBytes = MAX_BYTES,
|
||||
state?: OpenClawStateDatabaseOptions,
|
||||
): Promise<ReadMediaBufferResult> {
|
||||
const safeSubdir = resolveMediaSubdir(subdir, "readMediaBuffer");
|
||||
resolveMediaRelativePath(id, subdir, "readMediaBuffer");
|
||||
const row = getMediaBlobRow({ subdir: safeSubdir, id });
|
||||
const row = getMediaBlobRow({ subdir: safeSubdir, id, state });
|
||||
if (!row) {
|
||||
throw new Error(`readMediaBuffer: media ID does not resolve to a file: ${JSON.stringify(id)}`);
|
||||
}
|
||||
@@ -1035,7 +1044,11 @@ export async function readMediaBuffer(
|
||||
* @param id The media ID as returned by SavedMedia.id.
|
||||
* @param subdir The subdirectory the file was saved into (default "inbound").
|
||||
*/
|
||||
export async function deleteMediaBuffer(id: string, subdir = "inbound"): Promise<void> {
|
||||
export async function deleteMediaBuffer(
|
||||
id: string,
|
||||
subdir = "inbound",
|
||||
state?: OpenClawStateDatabaseOptions,
|
||||
): Promise<void> {
|
||||
const safeSubdir = resolveMediaSubdir(subdir, "deleteMediaBuffer");
|
||||
resolveMediaRelativePath(id, subdir, "deleteMediaBuffer");
|
||||
runOpenClawStateWriteTransaction((database) => {
|
||||
@@ -1046,7 +1059,7 @@ export async function deleteMediaBuffer(id: string, subdir = "inbound"): Promise
|
||||
.where("subdir", "=", safeSubdir)
|
||||
.where("id", "=", id),
|
||||
);
|
||||
});
|
||||
}, state);
|
||||
await fs.rm(path.join(resolveMediaScopedDir(subdir, "deleteMediaBuffer"), id), {
|
||||
force: true,
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user