From d37de2a39aed7a915e2eebe15d66bb6df624a9c8 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Thu, 18 Jun 2026 10:49:21 +0200 Subject: [PATCH] fix(proxy): preserve capture store compatibility --- src/proxy-capture/store.sqlite.test.ts | 56 ++++ src/proxy-capture/store.sqlite.ts | 438 ++++++++++++++++++++++--- src/proxy-capture/types.ts | 6 + 3 files changed, 458 insertions(+), 42 deletions(-) diff --git a/src/proxy-capture/store.sqlite.test.ts b/src/proxy-capture/store.sqlite.test.ts index f6b5907e407..675a9a4837c 100644 --- a/src/proxy-capture/store.sqlite.test.ts +++ b/src/proxy-capture/store.sqlite.test.ts @@ -79,6 +79,62 @@ describe("DebugProxyCaptureStore", () => { second.release(); }); + it("preserves the shipped path-based Plugin SDK overloads", () => { + const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-legacy-sdk-")); + cleanupDirs.push(root); + const dbPath = path.join(root, "capture.sqlite"); + const blobDir = path.join(root, "blobs"); + const lease = acquireDebugProxyCaptureStore(dbPath, blobDir); + + expect(getDebugProxyCaptureStore(dbPath, blobDir)).toBe(lease.store); + lease.store.upsertSession({ + id: "legacy-sdk-session", + startedAt: 1, + mode: "sdk", + sourceScope: "openclaw", + sourceProcess: "plugin", + dbPath, + blobDir, + }); + const blob = lease.store.persistPayload(Buffer.from("legacy sdk payload"), "text/plain"); + lease.store.recordEvent({ + sessionId: "legacy-sdk-session", + ts: 2, + sourceScope: "openclaw", + sourceProcess: "plugin", + protocol: "https", + direction: "outbound", + kind: "request", + flowId: "legacy-sdk-flow", + dataBlobId: blob.blobId, + dataSha256: blob.sha256, + }); + + expect(lease.store.readBlob(blob.blobId)).toBe("legacy sdk payload"); + expect(blob.path).toBe(path.join(blobDir, `${blob.blobId}.bin.gz`)); + expect(fs.existsSync(dbPath)).toBe(true); + expect(fs.existsSync(blob.path ?? "")).toBe(true); + expect( + lease.store.db + .prepare("SELECT db_path AS dbPath, blob_dir AS blobDir FROM capture_sessions WHERE id = ?") + .get("legacy-sdk-session"), + ).toEqual({ dbPath, blobDir }); + expect( + lease.store.db + .prepare("SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'capture_blobs'") + .get(), + ).toBeUndefined(); + expect(lease.store.deleteSessions(["legacy-sdk-session"])).toEqual({ + sessions: 1, + events: 1, + blobs: 1, + }); + expect(fs.existsSync(blob.path ?? "")).toBe(false); + + lease.release(); + expect(lease.store.isClosed).toBe(true); + }); + it("uses rollback journaling for captures on NFS-backed volumes", () => { vi.spyOn(fs, "statfsSync").mockReturnValue({ type: 0x6969, diff --git a/src/proxy-capture/store.sqlite.ts b/src/proxy-capture/store.sqlite.ts index 23d6030cb70..a1bdb8f5999 100644 --- a/src/proxy-capture/store.sqlite.ts +++ b/src/proxy-capture/store.sqlite.ts @@ -1,10 +1,19 @@ // Proxy capture SQLite store persists capture metadata and replayable exchanges. import { createHash } from "node:crypto"; +import fs from "node:fs"; +import path from "node:path"; import type { DatabaseSync } from "node:sqlite"; import { gunzipSync, gzipSync } from "node:zlib"; import { normalizeNullableString as normalizeObservedValue } from "@openclaw/normalization-core/string-coerce"; import { normalizeUniqueStringEntries } from "@openclaw/normalization-core/string-normalization"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; +import { applyPrivateModeSync } from "../infra/private-mode.js"; +import { resolveSqliteDatabaseFilePaths } from "../infra/sqlite-files.js"; import { runSqliteImmediateTransactionSync } from "../infra/sqlite-transaction.js"; +import { + configureSqliteConnectionPragmas, + type SqliteWalMaintenance, +} from "../infra/sqlite-wal.js"; import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js"; import type { CaptureBlobRecord, @@ -22,6 +31,129 @@ export type DebugProxyCaptureStoreOptions = { env?: NodeJS.ProcessEnv; }; +type PathBasedDebugProxyCaptureStore = { + blobDir: string; + walMaintenance: SqliteWalMaintenance; +}; + +const DEBUG_PROXY_CAPTURE_DIR_MODE = 0o700; +const DEBUG_PROXY_CAPTURE_FILE_MODE = 0o600; + +function isInMemoryDatabasePath(dbPath: string): boolean { + if (dbPath === ":memory:") { + return true; + } + if (!dbPath.startsWith("file:")) { + return false; + } + const fragmentIndex = dbPath.indexOf("#"); + const uriWithoutFragment = fragmentIndex === -1 ? dbPath : dbPath.slice(0, fragmentIndex); + const queryIndex = uriWithoutFragment.indexOf("?"); + const uriPath = + queryIndex === -1 ? uriWithoutFragment : uriWithoutFragment.slice(0, queryIndex); + try { + if (decodeURIComponent(uriPath.slice("file:".length)) === ":memory:") { + return true; + } + } catch { + // Malformed escapes cannot identify a memory URI; retain file-backed handling. + } + return ( + queryIndex !== -1 && + new URLSearchParams(uriWithoutFragment.slice(queryIndex + 1)).get("mode") === "memory" + ); +} + +function hardenLegacyDatabaseFiles(dbPath: string): void { + for (const candidate of resolveSqliteDatabaseFilePaths(dbPath)) { + if (fs.existsSync(candidate)) { + applyPrivateModeSync(candidate, DEBUG_PROXY_CAPTURE_FILE_MODE); + } + } +} + +function openPathBasedDebugProxyCaptureStore( + dbPath: string, + blobDir: string, +): { db: DatabaseSync; pathBased: PathBasedDebugProxyCaptureStore } { + const fileBackedPath = isInMemoryDatabasePath(dbPath) ? undefined : dbPath; + if (fileBackedPath) { + fs.mkdirSync(path.dirname(fileBackedPath), { + recursive: true, + mode: DEBUG_PROXY_CAPTURE_DIR_MODE, + }); + if (!fs.existsSync(fileBackedPath)) { + fs.closeSync(fs.openSync(fileBackedPath, "a", DEBUG_PROXY_CAPTURE_FILE_MODE)); + } + } + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(dbPath); + let walMaintenance: SqliteWalMaintenance | undefined; + try { + if (fileBackedPath) { + applyPrivateModeSync(fileBackedPath, DEBUG_PROXY_CAPTURE_FILE_MODE); + } + walMaintenance = configureSqliteConnectionPragmas(db, { + busyTimeoutMs: 5000, + databaseLabel: "debug-proxy-capture-sdk", + ...(fileBackedPath ? { databasePath: fileBackedPath } : {}), + foreignKeys: true, + }); + db.exec(` + CREATE TABLE IF NOT EXISTS capture_sessions ( + id TEXT PRIMARY KEY, + started_at INTEGER NOT NULL, + ended_at INTEGER, + mode TEXT NOT NULL, + source_scope TEXT NOT NULL, + source_process TEXT NOT NULL, + proxy_url TEXT, + db_path TEXT NOT NULL, + blob_dir TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS capture_events ( + id INTEGER PRIMARY KEY, + session_id TEXT NOT NULL, + ts INTEGER NOT NULL, + source_scope TEXT NOT NULL, + source_process TEXT NOT NULL, + protocol TEXT NOT NULL, + direction TEXT NOT NULL, + kind TEXT NOT NULL, + flow_id TEXT NOT NULL, + method TEXT, + host TEXT, + path TEXT, + status INTEGER, + close_code INTEGER, + content_type TEXT, + headers_json TEXT, + data_text TEXT, + data_blob_id TEXT, + data_sha256 TEXT, + error_text TEXT, + meta_json TEXT + ); + CREATE INDEX IF NOT EXISTS capture_events_session_ts_idx ON capture_events(session_id, ts); + CREATE INDEX IF NOT EXISTS capture_events_flow_idx ON capture_events(flow_id, ts); + `); + if (fileBackedPath) { + hardenLegacyDatabaseFiles(fileBackedPath); + } + return { + db, + pathBased: { + blobDir, + walMaintenance, + }, + }; + } catch (err) { + walMaintenance?.close(); + db.close(); + throw err; + } +} + function serializeJson(value: unknown): string | null { return value == null ? null : JSON.stringify(value); } @@ -49,18 +181,43 @@ function sortObservedCounts(counts: Map): CaptureObservedDimensi export class DebugProxyCaptureStore { readonly db: DatabaseSync; readonly dbPath: string; + readonly blobDir: string; + private readonly pathBased?: PathBasedDebugProxyCaptureStore; private closed = false; - constructor(options: DebugProxyCaptureStoreOptions = {}) { - const database = openOpenClawStateDatabase({ env: options.env }); + constructor(options?: DebugProxyCaptureStoreOptions); + /** @deprecated Use the options overload so capture data lives in shared SQLite state. */ + constructor(dbPath: string, blobDir: string); + constructor( + optionsOrDbPath: DebugProxyCaptureStoreOptions | string = {}, + legacyBlobDir?: string, + ) { + if (typeof optionsOrDbPath === "string") { + if (!legacyBlobDir) { + throw new TypeError("legacy debug proxy capture store requires a blob directory"); + } + const opened = openPathBasedDebugProxyCaptureStore(optionsOrDbPath, legacyBlobDir); + this.db = opened.db; + this.dbPath = optionsOrDbPath; + this.blobDir = legacyBlobDir; + this.pathBased = opened.pathBased; + return; + } + const database = openOpenClawStateDatabase({ env: optionsOrDbPath.env }); this.db = database.db; this.dbPath = database.path; + // Retain the shipped public property while shared-state blobs live in this DB. + this.blobDir = database.path; } close(): void { if (this.closed) { return; } + if (this.pathBased) { + this.pathBased.walMaintenance.close(); + this.db.close(); + } this.closed = true; } @@ -69,6 +226,30 @@ export class DebugProxyCaptureStore { } upsertSession(session: CaptureSessionRecord): void { + if (this.pathBased) { + this.db + .prepare( + `INSERT INTO capture_sessions ( + id, started_at, ended_at, mode, source_scope, source_process, proxy_url, db_path, blob_dir + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + ended_at=excluded.ended_at, + proxy_url=excluded.proxy_url, + source_process=excluded.source_process`, + ) + .run( + session.id, + session.startedAt, + session.endedAt ?? null, + session.mode, + session.sourceScope, + session.sourceProcess, + session.proxyUrl ?? null, + session.dbPath ?? this.dbPath, + session.blobDir ?? this.pathBased.blobDir, + ); + return; + } this.db .prepare( `INSERT INTO capture_sessions ( @@ -104,6 +285,27 @@ export class DebugProxyCaptureStore { persistPayload(data: Buffer, contentType?: string): CaptureBlobRecord { const sha256 = createHash("sha256").update(data).digest("hex"); const blobId = sha256.slice(0, 24); + if (this.pathBased) { + fs.mkdirSync(this.pathBased.blobDir, { + recursive: true, + mode: DEBUG_PROXY_CAPTURE_DIR_MODE, + }); + const outputPath = path.join(this.pathBased.blobDir, `${blobId}.bin.gz`); + if (!fs.existsSync(outputPath)) { + fs.writeFileSync(outputPath, gzipSync(data), { + mode: DEBUG_PROXY_CAPTURE_FILE_MODE, + }); + } + applyPrivateModeSync(outputPath, DEBUG_PROXY_CAPTURE_FILE_MODE); + return { + blobId, + path: outputPath, + encoding: "gzip", + sizeBytes: data.byteLength, + sha256, + ...(contentType ? { contentType } : {}), + }; + } this.db .prepare( `INSERT OR IGNORE INTO capture_blobs ( @@ -129,6 +331,10 @@ export class DebugProxyCaptureStore { } recordEvent(event: CaptureEventRecord): void { + if (this.pathBased) { + this.insertEvent(event, event.dataBlobId ?? null); + return; + } runSqliteImmediateTransactionSync(this.db, () => { // Capture can be invoked directly by provider seams before the top-level // runtime initializes. Keep the shared-schema foreign key valid without @@ -147,39 +353,43 @@ export class DebugProxyCaptureStore { this.db.prepare(`SELECT 1 FROM capture_blobs WHERE blob_id = ?`).get(event.dataBlobId) ? event.dataBlobId : null; - this.db - .prepare( - `INSERT INTO capture_events ( - session_id, ts, source_scope, source_process, protocol, direction, kind, flow_id, - method, host, path, status, close_code, content_type, headers_json, - data_text, data_blob_id, data_sha256, error_text, meta_json - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - ) - .run( - event.sessionId, - event.ts, - event.sourceScope, - event.sourceProcess, - event.protocol, - event.direction, - event.kind, - event.flowId, - event.method ?? null, - event.host ?? null, - event.path ?? null, - event.status ?? null, - event.closeCode ?? null, - event.contentType ?? null, - event.headersJson ?? null, - event.dataText ?? null, - dataBlobId, - event.dataSha256 ?? null, - event.errorText ?? null, - event.metaJson ?? null, - ); + this.insertEvent(event, dataBlobId); }); } + private insertEvent(event: CaptureEventRecord, dataBlobId: string | null): void { + this.db + .prepare( + `INSERT INTO capture_events ( + session_id, ts, source_scope, source_process, protocol, direction, kind, flow_id, + method, host, path, status, close_code, content_type, headers_json, + data_text, data_blob_id, data_sha256, error_text, meta_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ) + .run( + event.sessionId, + event.ts, + event.sourceScope, + event.sourceProcess, + event.protocol, + event.direction, + event.kind, + event.flowId, + event.method ?? null, + event.host ?? null, + event.path ?? null, + event.status ?? null, + event.closeCode ?? null, + event.contentType ?? null, + event.headersJson ?? null, + event.dataText ?? null, + dataBlobId, + event.dataSha256 ?? null, + event.errorText ?? null, + event.metaJson ?? null, + ); + } + listSessions(limit = 50): CaptureSessionSummary[] { return this.db .prepare( @@ -274,14 +484,26 @@ export class DebugProxyCaptureStore { } readBlob(blobId: string): string | null { + if (this.pathBased) { + const legacyRow = this.db + .prepare(`SELECT data_blob_id AS blobId FROM capture_events WHERE data_blob_id = ? LIMIT 1`) + .get(blobId) as { blobId?: string } | undefined; + if (!legacyRow?.blobId) { + return null; + } + const blobPath = path.join(this.pathBased.blobDir, `${legacyRow.blobId}.bin.gz`); + return fs.existsSync(blobPath) + ? gunzipSync(fs.readFileSync(blobPath)).toString("utf8") + : null; + } const row = this.db .prepare(`SELECT encoding, data FROM capture_blobs WHERE blob_id = ?`) .get(blobId) as { data?: Uint8Array; encoding?: string } | undefined; - if (!row?.data) { - return null; + if (row?.data) { + const data = Buffer.from(row.data); + return (row.encoding === "gzip" ? gunzipSync(data) : data).toString("utf8"); } - const data = Buffer.from(row.data); - return (row.encoding === "gzip" ? gunzipSync(data) : data).toString("utf8"); + return null; } queryPreset(preset: CaptureQueryPreset, sessionId?: string): CaptureQueryRow[] { @@ -365,6 +587,26 @@ export class DebugProxyCaptureStore { } purgeAll(): { sessions: number; events: number; blobs: number } { + if (this.pathBased) { + const sessionCount = + ( + this.db.prepare(`SELECT COUNT(*) AS count FROM capture_sessions`).get() as { + count: number; + } + ).count ?? 0; + const eventCount = + (this.db.prepare(`SELECT COUNT(*) AS count FROM capture_events`).get() as { count: number }) + .count ?? 0; + this.db.exec(`DELETE FROM capture_events; DELETE FROM capture_sessions;`); + let blobs = 0; + if (fs.existsSync(this.pathBased.blobDir)) { + for (const entry of fs.readdirSync(this.pathBased.blobDir)) { + fs.rmSync(path.join(this.pathBased.blobDir, entry), { force: true }); + blobs += 1; + } + } + return { sessions: sessionCount, events: eventCount, blobs }; + } return runSqliteImmediateTransactionSync(this.db, () => { const sessionCount = ( @@ -390,6 +632,9 @@ export class DebugProxyCaptureStore { if (uniqueSessionIds.length === 0) { return { sessions: 0, events: 0, blobs: 0 }; } + if (this.pathBased) { + return this.deletePathBasedSessions(uniqueSessionIds); + } return runSqliteImmediateTransactionSync(this.db, () => { const placeholders = uniqueSessionIds.map(() => "?").join(", "); const blobRows = this.db @@ -461,6 +706,82 @@ export class DebugProxyCaptureStore { return { sessions: sessionCount, events: eventCount, blobs }; }); } + + private deletePathBasedSessions(sessionIds: string[]): { + sessions: number; + events: number; + blobs: number; + } { + const pathBased = this.pathBased; + if (!pathBased) { + throw new Error("path-based debug proxy capture store is unavailable"); + } + const placeholders = sessionIds.map(() => "?").join(", "); + const blobRows = this.db + .prepare( + `SELECT DISTINCT data_blob_id AS blobId + FROM capture_events + WHERE session_id IN (${placeholders}) + AND data_blob_id IS NOT NULL`, + ) + .all(...sessionIds) as Array<{ blobId?: string | null }>; + const eventCount = + ( + this.db + .prepare( + `SELECT COUNT(*) AS count + FROM capture_events + WHERE session_id IN (${placeholders})`, + ) + .get(...sessionIds) as { count: number } + ).count ?? 0; + const sessionCount = + ( + this.db + .prepare( + `SELECT COUNT(*) AS count + FROM capture_sessions + WHERE id IN (${placeholders})`, + ) + .get(...sessionIds) as { count: number } + ).count ?? 0; + this.db.prepare(`DELETE FROM capture_events WHERE session_id IN (${placeholders})`).run( + ...sessionIds, + ); + this.db.prepare(`DELETE FROM capture_sessions WHERE id IN (${placeholders})`).run(...sessionIds); + const candidateBlobIds = blobRows + .map((row) => row.blobId?.trim()) + .filter((blobId): blobId is string => Boolean(blobId)); + const remainingBlobRefs = + candidateBlobIds.length > 0 + ? new Set( + ( + this.db + .prepare( + `SELECT DISTINCT data_blob_id AS blobId + FROM capture_events + WHERE data_blob_id IN (${candidateBlobIds.map(() => "?").join(", ")}) + AND data_blob_id IS NOT NULL`, + ) + .all(...candidateBlobIds) as Array<{ blobId?: string | null }> + ) + .map((row) => row.blobId?.trim()) + .filter((blobId): blobId is string => Boolean(blobId)), + ) + : new Set(); + let blobs = 0; + for (const blobId of candidateBlobIds) { + if (remainingBlobRefs.has(blobId)) { + continue; + } + const blobPath = path.join(pathBased.blobDir, `${blobId}.bin.gz`); + if (fs.existsSync(blobPath)) { + fs.rmSync(blobPath, { force: true }); + blobs += 1; + } + } + return { sessions: sessionCount, events: eventCount, blobs }; + } } type CachedStoreEntry = { @@ -470,15 +791,33 @@ type CachedStoreEntry = { const cachedStores = new Map(); +function resolveDebugProxyCaptureStoreKey( + optionsOrDbPath: DebugProxyCaptureStoreOptions | string, + legacyBlobDir?: string, +): string { + return typeof optionsOrDbPath === "string" + ? `legacy:${optionsOrDbPath}:${legacyBlobDir ?? ""}` + : `shared:${openOpenClawStateDatabase({ env: optionsOrDbPath.env }).path}`; +} + export function getDebugProxyCaptureStore( - options: DebugProxyCaptureStoreOptions = {}, + options?: DebugProxyCaptureStoreOptions, +): DebugProxyCaptureStore; +/** @deprecated Use the options overload so capture data lives in shared SQLite state. */ +export function getDebugProxyCaptureStore(dbPath: string, blobDir: string): DebugProxyCaptureStore; +export function getDebugProxyCaptureStore( + optionsOrDbPath: DebugProxyCaptureStoreOptions | string = {}, + legacyBlobDir?: string, ): DebugProxyCaptureStore { - const key = openOpenClawStateDatabase({ env: options.env }).path; + const key = resolveDebugProxyCaptureStoreKey(optionsOrDbPath, legacyBlobDir); const cached = cachedStores.get(key); if (cached && !cached.store.isClosed) { return cached.store; } - const store = new DebugProxyCaptureStore(options); + const store = + typeof optionsOrDbPath === "string" + ? new DebugProxyCaptureStore(optionsOrDbPath, legacyBlobDir!) + : new DebugProxyCaptureStore(optionsOrDbPath); cachedStores.set(key, { store, leases: 0 }); return store; } @@ -492,12 +831,27 @@ export function closeDebugProxyCaptureStore(): void { // Lease API keeps one cached capture-store wrapper alive across related // operations, then releases it without closing the shared state database. -export function acquireDebugProxyCaptureStore(options: DebugProxyCaptureStoreOptions = {}): { +export function acquireDebugProxyCaptureStore(options?: DebugProxyCaptureStoreOptions): { + store: DebugProxyCaptureStore; + release: () => void; +}; +/** @deprecated Use the options overload so capture data lives in shared SQLite state. */ +export function acquireDebugProxyCaptureStore(dbPath: string, blobDir: string): { + store: DebugProxyCaptureStore; + release: () => void; +}; +export function acquireDebugProxyCaptureStore( + optionsOrDbPath: DebugProxyCaptureStoreOptions | string = {}, + legacyBlobDir?: string, +): { store: DebugProxyCaptureStore; release: () => void; } { - const store = getDebugProxyCaptureStore(options); - const key = store.dbPath; + const key = resolveDebugProxyCaptureStoreKey(optionsOrDbPath, legacyBlobDir); + const store = + typeof optionsOrDbPath === "string" + ? getDebugProxyCaptureStore(optionsOrDbPath, legacyBlobDir!) + : getDebugProxyCaptureStore(optionsOrDbPath); const cached = cachedStores.get(key); if (!cached || cached.store !== store) { throw new Error("debug proxy capture store cache changed while acquiring a lease"); diff --git a/src/proxy-capture/types.ts b/src/proxy-capture/types.ts index f41b4a73810..1744529180f 100644 --- a/src/proxy-capture/types.ts +++ b/src/proxy-capture/types.ts @@ -23,10 +23,16 @@ export type CaptureSessionRecord = { sourceScope: "openclaw"; sourceProcess: string; proxyUrl?: string; + /** @deprecated Capture storage now lives in the shared state database. */ + dbPath?: string; + /** @deprecated Capture payloads now live in the shared state database. */ + blobDir?: string; }; export type CaptureBlobRecord = { blobId: string; + /** @deprecated Shared-state capture blobs do not have a standalone file path. */ + path?: string; encoding: "gzip"; sizeBytes: number; sha256: string;