fix(proxy): preserve capture store compatibility

This commit is contained in:
Vincent Koc
2026-06-18 10:49:21 +02:00
parent 6e6bd5633f
commit d37de2a39a
3 changed files with 458 additions and 42 deletions

View File

@@ -79,6 +79,62 @@ describe("DebugProxyCaptureStore", () => {
second.release();
});
it("preserves the shipped path-based Plugin SDK overloads", () => {
const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-legacy-sdk-"));
cleanupDirs.push(root);
const dbPath = path.join(root, "capture.sqlite");
const blobDir = path.join(root, "blobs");
const lease = acquireDebugProxyCaptureStore(dbPath, blobDir);
expect(getDebugProxyCaptureStore(dbPath, blobDir)).toBe(lease.store);
lease.store.upsertSession({
id: "legacy-sdk-session",
startedAt: 1,
mode: "sdk",
sourceScope: "openclaw",
sourceProcess: "plugin",
dbPath,
blobDir,
});
const blob = lease.store.persistPayload(Buffer.from("legacy sdk payload"), "text/plain");
lease.store.recordEvent({
sessionId: "legacy-sdk-session",
ts: 2,
sourceScope: "openclaw",
sourceProcess: "plugin",
protocol: "https",
direction: "outbound",
kind: "request",
flowId: "legacy-sdk-flow",
dataBlobId: blob.blobId,
dataSha256: blob.sha256,
});
expect(lease.store.readBlob(blob.blobId)).toBe("legacy sdk payload");
expect(blob.path).toBe(path.join(blobDir, `${blob.blobId}.bin.gz`));
expect(fs.existsSync(dbPath)).toBe(true);
expect(fs.existsSync(blob.path ?? "")).toBe(true);
expect(
lease.store.db
.prepare("SELECT db_path AS dbPath, blob_dir AS blobDir FROM capture_sessions WHERE id = ?")
.get("legacy-sdk-session"),
).toEqual({ dbPath, blobDir });
expect(
lease.store.db
.prepare("SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'capture_blobs'")
.get(),
).toBeUndefined();
expect(lease.store.deleteSessions(["legacy-sdk-session"])).toEqual({
sessions: 1,
events: 1,
blobs: 1,
});
expect(fs.existsSync(blob.path ?? "")).toBe(false);
lease.release();
expect(lease.store.isClosed).toBe(true);
});
it("uses rollback journaling for captures on NFS-backed volumes", () => {
vi.spyOn(fs, "statfsSync").mockReturnValue({
type: 0x6969,

View File

@@ -1,10 +1,19 @@
// Proxy capture SQLite store persists capture metadata and replayable exchanges.
import { createHash } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import { gunzipSync, gzipSync } from "node:zlib";
import { normalizeNullableString as normalizeObservedValue } from "@openclaw/normalization-core/string-coerce";
import { normalizeUniqueStringEntries } from "@openclaw/normalization-core/string-normalization";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { applyPrivateModeSync } from "../infra/private-mode.js";
import { resolveSqliteDatabaseFilePaths } from "../infra/sqlite-files.js";
import { runSqliteImmediateTransactionSync } from "../infra/sqlite-transaction.js";
import {
configureSqliteConnectionPragmas,
type SqliteWalMaintenance,
} from "../infra/sqlite-wal.js";
import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js";
import type {
CaptureBlobRecord,
@@ -22,6 +31,129 @@ export type DebugProxyCaptureStoreOptions = {
env?: NodeJS.ProcessEnv;
};
type PathBasedDebugProxyCaptureStore = {
blobDir: string;
walMaintenance: SqliteWalMaintenance;
};
const DEBUG_PROXY_CAPTURE_DIR_MODE = 0o700;
const DEBUG_PROXY_CAPTURE_FILE_MODE = 0o600;
function isInMemoryDatabasePath(dbPath: string): boolean {
if (dbPath === ":memory:") {
return true;
}
if (!dbPath.startsWith("file:")) {
return false;
}
const fragmentIndex = dbPath.indexOf("#");
const uriWithoutFragment = fragmentIndex === -1 ? dbPath : dbPath.slice(0, fragmentIndex);
const queryIndex = uriWithoutFragment.indexOf("?");
const uriPath =
queryIndex === -1 ? uriWithoutFragment : uriWithoutFragment.slice(0, queryIndex);
try {
if (decodeURIComponent(uriPath.slice("file:".length)) === ":memory:") {
return true;
}
} catch {
// Malformed escapes cannot identify a memory URI; retain file-backed handling.
}
return (
queryIndex !== -1 &&
new URLSearchParams(uriWithoutFragment.slice(queryIndex + 1)).get("mode") === "memory"
);
}
function hardenLegacyDatabaseFiles(dbPath: string): void {
for (const candidate of resolveSqliteDatabaseFilePaths(dbPath)) {
if (fs.existsSync(candidate)) {
applyPrivateModeSync(candidate, DEBUG_PROXY_CAPTURE_FILE_MODE);
}
}
}
function openPathBasedDebugProxyCaptureStore(
dbPath: string,
blobDir: string,
): { db: DatabaseSync; pathBased: PathBasedDebugProxyCaptureStore } {
const fileBackedPath = isInMemoryDatabasePath(dbPath) ? undefined : dbPath;
if (fileBackedPath) {
fs.mkdirSync(path.dirname(fileBackedPath), {
recursive: true,
mode: DEBUG_PROXY_CAPTURE_DIR_MODE,
});
if (!fs.existsSync(fileBackedPath)) {
fs.closeSync(fs.openSync(fileBackedPath, "a", DEBUG_PROXY_CAPTURE_FILE_MODE));
}
}
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(dbPath);
let walMaintenance: SqliteWalMaintenance | undefined;
try {
if (fileBackedPath) {
applyPrivateModeSync(fileBackedPath, DEBUG_PROXY_CAPTURE_FILE_MODE);
}
walMaintenance = configureSqliteConnectionPragmas(db, {
busyTimeoutMs: 5000,
databaseLabel: "debug-proxy-capture-sdk",
...(fileBackedPath ? { databasePath: fileBackedPath } : {}),
foreignKeys: true,
});
db.exec(`
CREATE TABLE IF NOT EXISTS capture_sessions (
id TEXT PRIMARY KEY,
started_at INTEGER NOT NULL,
ended_at INTEGER,
mode TEXT NOT NULL,
source_scope TEXT NOT NULL,
source_process TEXT NOT NULL,
proxy_url TEXT,
db_path TEXT NOT NULL,
blob_dir TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS capture_events (
id INTEGER PRIMARY KEY,
session_id TEXT NOT NULL,
ts INTEGER NOT NULL,
source_scope TEXT NOT NULL,
source_process TEXT NOT NULL,
protocol TEXT NOT NULL,
direction TEXT NOT NULL,
kind TEXT NOT NULL,
flow_id TEXT NOT NULL,
method TEXT,
host TEXT,
path TEXT,
status INTEGER,
close_code INTEGER,
content_type TEXT,
headers_json TEXT,
data_text TEXT,
data_blob_id TEXT,
data_sha256 TEXT,
error_text TEXT,
meta_json TEXT
);
CREATE INDEX IF NOT EXISTS capture_events_session_ts_idx ON capture_events(session_id, ts);
CREATE INDEX IF NOT EXISTS capture_events_flow_idx ON capture_events(flow_id, ts);
`);
if (fileBackedPath) {
hardenLegacyDatabaseFiles(fileBackedPath);
}
return {
db,
pathBased: {
blobDir,
walMaintenance,
},
};
} catch (err) {
walMaintenance?.close();
db.close();
throw err;
}
}
function serializeJson(value: unknown): string | null {
return value == null ? null : JSON.stringify(value);
}
@@ -49,18 +181,43 @@ function sortObservedCounts(counts: Map<string, number>): CaptureObservedDimensi
export class DebugProxyCaptureStore {
readonly db: DatabaseSync;
readonly dbPath: string;
readonly blobDir: string;
private readonly pathBased?: PathBasedDebugProxyCaptureStore;
private closed = false;
constructor(options: DebugProxyCaptureStoreOptions = {}) {
const database = openOpenClawStateDatabase({ env: options.env });
constructor(options?: DebugProxyCaptureStoreOptions);
/** @deprecated Use the options overload so capture data lives in shared SQLite state. */
constructor(dbPath: string, blobDir: string);
constructor(
optionsOrDbPath: DebugProxyCaptureStoreOptions | string = {},
legacyBlobDir?: string,
) {
if (typeof optionsOrDbPath === "string") {
if (!legacyBlobDir) {
throw new TypeError("legacy debug proxy capture store requires a blob directory");
}
const opened = openPathBasedDebugProxyCaptureStore(optionsOrDbPath, legacyBlobDir);
this.db = opened.db;
this.dbPath = optionsOrDbPath;
this.blobDir = legacyBlobDir;
this.pathBased = opened.pathBased;
return;
}
const database = openOpenClawStateDatabase({ env: optionsOrDbPath.env });
this.db = database.db;
this.dbPath = database.path;
// Retain the shipped public property while shared-state blobs live in this DB.
this.blobDir = database.path;
}
close(): void {
if (this.closed) {
return;
}
if (this.pathBased) {
this.pathBased.walMaintenance.close();
this.db.close();
}
this.closed = true;
}
@@ -69,6 +226,30 @@ export class DebugProxyCaptureStore {
}
upsertSession(session: CaptureSessionRecord): void {
if (this.pathBased) {
this.db
.prepare(
`INSERT INTO capture_sessions (
id, started_at, ended_at, mode, source_scope, source_process, proxy_url, db_path, blob_dir
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
ended_at=excluded.ended_at,
proxy_url=excluded.proxy_url,
source_process=excluded.source_process`,
)
.run(
session.id,
session.startedAt,
session.endedAt ?? null,
session.mode,
session.sourceScope,
session.sourceProcess,
session.proxyUrl ?? null,
session.dbPath ?? this.dbPath,
session.blobDir ?? this.pathBased.blobDir,
);
return;
}
this.db
.prepare(
`INSERT INTO capture_sessions (
@@ -104,6 +285,27 @@ export class DebugProxyCaptureStore {
persistPayload(data: Buffer, contentType?: string): CaptureBlobRecord {
const sha256 = createHash("sha256").update(data).digest("hex");
const blobId = sha256.slice(0, 24);
if (this.pathBased) {
fs.mkdirSync(this.pathBased.blobDir, {
recursive: true,
mode: DEBUG_PROXY_CAPTURE_DIR_MODE,
});
const outputPath = path.join(this.pathBased.blobDir, `${blobId}.bin.gz`);
if (!fs.existsSync(outputPath)) {
fs.writeFileSync(outputPath, gzipSync(data), {
mode: DEBUG_PROXY_CAPTURE_FILE_MODE,
});
}
applyPrivateModeSync(outputPath, DEBUG_PROXY_CAPTURE_FILE_MODE);
return {
blobId,
path: outputPath,
encoding: "gzip",
sizeBytes: data.byteLength,
sha256,
...(contentType ? { contentType } : {}),
};
}
this.db
.prepare(
`INSERT OR IGNORE INTO capture_blobs (
@@ -129,6 +331,10 @@ export class DebugProxyCaptureStore {
}
recordEvent(event: CaptureEventRecord): void {
if (this.pathBased) {
this.insertEvent(event, event.dataBlobId ?? null);
return;
}
runSqliteImmediateTransactionSync(this.db, () => {
// Capture can be invoked directly by provider seams before the top-level
// runtime initializes. Keep the shared-schema foreign key valid without
@@ -147,39 +353,43 @@ export class DebugProxyCaptureStore {
this.db.prepare(`SELECT 1 FROM capture_blobs WHERE blob_id = ?`).get(event.dataBlobId)
? event.dataBlobId
: null;
this.db
.prepare(
`INSERT INTO capture_events (
session_id, ts, source_scope, source_process, protocol, direction, kind, flow_id,
method, host, path, status, close_code, content_type, headers_json,
data_text, data_blob_id, data_sha256, error_text, meta_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
)
.run(
event.sessionId,
event.ts,
event.sourceScope,
event.sourceProcess,
event.protocol,
event.direction,
event.kind,
event.flowId,
event.method ?? null,
event.host ?? null,
event.path ?? null,
event.status ?? null,
event.closeCode ?? null,
event.contentType ?? null,
event.headersJson ?? null,
event.dataText ?? null,
dataBlobId,
event.dataSha256 ?? null,
event.errorText ?? null,
event.metaJson ?? null,
);
this.insertEvent(event, dataBlobId);
});
}
private insertEvent(event: CaptureEventRecord, dataBlobId: string | null): void {
this.db
.prepare(
`INSERT INTO capture_events (
session_id, ts, source_scope, source_process, protocol, direction, kind, flow_id,
method, host, path, status, close_code, content_type, headers_json,
data_text, data_blob_id, data_sha256, error_text, meta_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
)
.run(
event.sessionId,
event.ts,
event.sourceScope,
event.sourceProcess,
event.protocol,
event.direction,
event.kind,
event.flowId,
event.method ?? null,
event.host ?? null,
event.path ?? null,
event.status ?? null,
event.closeCode ?? null,
event.contentType ?? null,
event.headersJson ?? null,
event.dataText ?? null,
dataBlobId,
event.dataSha256 ?? null,
event.errorText ?? null,
event.metaJson ?? null,
);
}
listSessions(limit = 50): CaptureSessionSummary[] {
return this.db
.prepare(
@@ -274,14 +484,26 @@ export class DebugProxyCaptureStore {
}
readBlob(blobId: string): string | null {
if (this.pathBased) {
const legacyRow = this.db
.prepare(`SELECT data_blob_id AS blobId FROM capture_events WHERE data_blob_id = ? LIMIT 1`)
.get(blobId) as { blobId?: string } | undefined;
if (!legacyRow?.blobId) {
return null;
}
const blobPath = path.join(this.pathBased.blobDir, `${legacyRow.blobId}.bin.gz`);
return fs.existsSync(blobPath)
? gunzipSync(fs.readFileSync(blobPath)).toString("utf8")
: null;
}
const row = this.db
.prepare(`SELECT encoding, data FROM capture_blobs WHERE blob_id = ?`)
.get(blobId) as { data?: Uint8Array; encoding?: string } | undefined;
if (!row?.data) {
return null;
if (row?.data) {
const data = Buffer.from(row.data);
return (row.encoding === "gzip" ? gunzipSync(data) : data).toString("utf8");
}
const data = Buffer.from(row.data);
return (row.encoding === "gzip" ? gunzipSync(data) : data).toString("utf8");
return null;
}
queryPreset(preset: CaptureQueryPreset, sessionId?: string): CaptureQueryRow[] {
@@ -365,6 +587,26 @@ export class DebugProxyCaptureStore {
}
purgeAll(): { sessions: number; events: number; blobs: number } {
if (this.pathBased) {
const sessionCount =
(
this.db.prepare(`SELECT COUNT(*) AS count FROM capture_sessions`).get() as {
count: number;
}
).count ?? 0;
const eventCount =
(this.db.prepare(`SELECT COUNT(*) AS count FROM capture_events`).get() as { count: number })
.count ?? 0;
this.db.exec(`DELETE FROM capture_events; DELETE FROM capture_sessions;`);
let blobs = 0;
if (fs.existsSync(this.pathBased.blobDir)) {
for (const entry of fs.readdirSync(this.pathBased.blobDir)) {
fs.rmSync(path.join(this.pathBased.blobDir, entry), { force: true });
blobs += 1;
}
}
return { sessions: sessionCount, events: eventCount, blobs };
}
return runSqliteImmediateTransactionSync(this.db, () => {
const sessionCount =
(
@@ -390,6 +632,9 @@ export class DebugProxyCaptureStore {
if (uniqueSessionIds.length === 0) {
return { sessions: 0, events: 0, blobs: 0 };
}
if (this.pathBased) {
return this.deletePathBasedSessions(uniqueSessionIds);
}
return runSqliteImmediateTransactionSync(this.db, () => {
const placeholders = uniqueSessionIds.map(() => "?").join(", ");
const blobRows = this.db
@@ -461,6 +706,82 @@ export class DebugProxyCaptureStore {
return { sessions: sessionCount, events: eventCount, blobs };
});
}
private deletePathBasedSessions(sessionIds: string[]): {
sessions: number;
events: number;
blobs: number;
} {
const pathBased = this.pathBased;
if (!pathBased) {
throw new Error("path-based debug proxy capture store is unavailable");
}
const placeholders = sessionIds.map(() => "?").join(", ");
const blobRows = this.db
.prepare(
`SELECT DISTINCT data_blob_id AS blobId
FROM capture_events
WHERE session_id IN (${placeholders})
AND data_blob_id IS NOT NULL`,
)
.all(...sessionIds) as Array<{ blobId?: string | null }>;
const eventCount =
(
this.db
.prepare(
`SELECT COUNT(*) AS count
FROM capture_events
WHERE session_id IN (${placeholders})`,
)
.get(...sessionIds) as { count: number }
).count ?? 0;
const sessionCount =
(
this.db
.prepare(
`SELECT COUNT(*) AS count
FROM capture_sessions
WHERE id IN (${placeholders})`,
)
.get(...sessionIds) as { count: number }
).count ?? 0;
this.db.prepare(`DELETE FROM capture_events WHERE session_id IN (${placeholders})`).run(
...sessionIds,
);
this.db.prepare(`DELETE FROM capture_sessions WHERE id IN (${placeholders})`).run(...sessionIds);
const candidateBlobIds = blobRows
.map((row) => row.blobId?.trim())
.filter((blobId): blobId is string => Boolean(blobId));
const remainingBlobRefs =
candidateBlobIds.length > 0
? new Set(
(
this.db
.prepare(
`SELECT DISTINCT data_blob_id AS blobId
FROM capture_events
WHERE data_blob_id IN (${candidateBlobIds.map(() => "?").join(", ")})
AND data_blob_id IS NOT NULL`,
)
.all(...candidateBlobIds) as Array<{ blobId?: string | null }>
)
.map((row) => row.blobId?.trim())
.filter((blobId): blobId is string => Boolean(blobId)),
)
: new Set<string>();
let blobs = 0;
for (const blobId of candidateBlobIds) {
if (remainingBlobRefs.has(blobId)) {
continue;
}
const blobPath = path.join(pathBased.blobDir, `${blobId}.bin.gz`);
if (fs.existsSync(blobPath)) {
fs.rmSync(blobPath, { force: true });
blobs += 1;
}
}
return { sessions: sessionCount, events: eventCount, blobs };
}
}
type CachedStoreEntry = {
@@ -470,15 +791,33 @@ type CachedStoreEntry = {
const cachedStores = new Map<string, CachedStoreEntry>();
function resolveDebugProxyCaptureStoreKey(
optionsOrDbPath: DebugProxyCaptureStoreOptions | string,
legacyBlobDir?: string,
): string {
return typeof optionsOrDbPath === "string"
? `legacy:${optionsOrDbPath}:${legacyBlobDir ?? ""}`
: `shared:${openOpenClawStateDatabase({ env: optionsOrDbPath.env }).path}`;
}
export function getDebugProxyCaptureStore(
options: DebugProxyCaptureStoreOptions = {},
options?: DebugProxyCaptureStoreOptions,
): DebugProxyCaptureStore;
/** @deprecated Use the options overload so capture data lives in shared SQLite state. */
export function getDebugProxyCaptureStore(dbPath: string, blobDir: string): DebugProxyCaptureStore;
export function getDebugProxyCaptureStore(
optionsOrDbPath: DebugProxyCaptureStoreOptions | string = {},
legacyBlobDir?: string,
): DebugProxyCaptureStore {
const key = openOpenClawStateDatabase({ env: options.env }).path;
const key = resolveDebugProxyCaptureStoreKey(optionsOrDbPath, legacyBlobDir);
const cached = cachedStores.get(key);
if (cached && !cached.store.isClosed) {
return cached.store;
}
const store = new DebugProxyCaptureStore(options);
const store =
typeof optionsOrDbPath === "string"
? new DebugProxyCaptureStore(optionsOrDbPath, legacyBlobDir!)
: new DebugProxyCaptureStore(optionsOrDbPath);
cachedStores.set(key, { store, leases: 0 });
return store;
}
@@ -492,12 +831,27 @@ export function closeDebugProxyCaptureStore(): void {
// Lease API keeps one cached capture-store wrapper alive across related
// operations, then releases it without closing the shared state database.
export function acquireDebugProxyCaptureStore(options: DebugProxyCaptureStoreOptions = {}): {
export function acquireDebugProxyCaptureStore(options?: DebugProxyCaptureStoreOptions): {
store: DebugProxyCaptureStore;
release: () => void;
};
/** @deprecated Use the options overload so capture data lives in shared SQLite state. */
export function acquireDebugProxyCaptureStore(dbPath: string, blobDir: string): {
store: DebugProxyCaptureStore;
release: () => void;
};
export function acquireDebugProxyCaptureStore(
optionsOrDbPath: DebugProxyCaptureStoreOptions | string = {},
legacyBlobDir?: string,
): {
store: DebugProxyCaptureStore;
release: () => void;
} {
const store = getDebugProxyCaptureStore(options);
const key = store.dbPath;
const key = resolveDebugProxyCaptureStoreKey(optionsOrDbPath, legacyBlobDir);
const store =
typeof optionsOrDbPath === "string"
? getDebugProxyCaptureStore(optionsOrDbPath, legacyBlobDir!)
: getDebugProxyCaptureStore(optionsOrDbPath);
const cached = cachedStores.get(key);
if (!cached || cached.store !== store) {
throw new Error("debug proxy capture store cache changed while acquiring a lease");

View File

@@ -23,10 +23,16 @@ export type CaptureSessionRecord = {
sourceScope: "openclaw";
sourceProcess: string;
proxyUrl?: string;
/** @deprecated Capture storage now lives in the shared state database. */
dbPath?: string;
/** @deprecated Capture payloads now live in the shared state database. */
blobDir?: string;
};
export type CaptureBlobRecord = {
blobId: string;
/** @deprecated Shared-state capture blobs do not have a standalone file path. */
path?: string;
encoding: "gzip";
sizeBytes: number;
sha256: string;