From 0876ed59e03e170c34ce66d46271ed2bd8a2e8a0 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 17 Jun 2026 06:46:53 +0200 Subject: [PATCH] refactor(proxy): store captures in shared state database --- extensions/microsoft/speech-provider.test.ts | 20 +- extensions/openai/tts.test.ts | 18 +- extensions/qa-lab/src/lab-server.test.ts | 8 - extensions/qa-lab/src/lab-server.ts | 5 +- .../debug-proxy-env-test-helpers.ts | 15 +- src/cli/proxy-cli.runtime.test.ts | 13 +- src/cli/proxy-cli.runtime.ts | 33 +- src/proxy-capture/blob-store.ts | 45 -- src/proxy-capture/env.test.ts | 27 + src/proxy-capture/env.ts | 21 +- src/proxy-capture/paths.ts | 14 +- .../proxy-server.managed-proxy.test.ts | 13 +- src/proxy-capture/proxy-server.test.ts | 16 +- src/proxy-capture/proxy-server.ts | 2 +- src/proxy-capture/runtime.test.ts | 2 - src/proxy-capture/runtime.ts | 16 +- src/proxy-capture/store.sqlite.test.ts | 189 +++---- src/proxy-capture/store.sqlite.ts | 463 ++++++++---------- src/proxy-capture/types.ts | 3 - 19 files changed, 391 insertions(+), 532 deletions(-) delete mode 100644 src/proxy-capture/blob-store.ts diff --git a/extensions/microsoft/speech-provider.test.ts b/extensions/microsoft/speech-provider.test.ts index 8ebfc153a34..822facbca75 100644 --- a/extensions/microsoft/speech-provider.test.ts +++ b/extensions/microsoft/speech-provider.test.ts @@ -99,8 +99,7 @@ describe("listMicrosoftVoices", () => { const tempDir = mkdtempSync(path.join(os.tmpdir(), "microsoft-voices-capture-")); proxyReset.captureProxyEnv(); process.env.OPENCLAW_DEBUG_PROXY_ENABLED = "1"; - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite"); - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs"); + process.env.OPENCLAW_STATE_DIR = tempDir; process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID = "ms-voices-session"; globalThis.fetch = vi @@ -109,18 +108,13 @@ describe("listMicrosoftVoices", () => { new Response(JSON.stringify([{ ShortName: "en-US-AvaNeural" }]), { status: 200 }), ) as unknown as typeof globalThis.fetch; - const store = getDebugProxyCaptureStore( - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH, - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR, - ); + const store = getDebugProxyCaptureStore(); store.upsertSession({ id: "ms-voices-session", startedAt: Date.now(), mode: "test", sourceScope: "openclaw", sourceProcess: "openclaw", - dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH, - blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR, }); await listMicrosoftVoices(); @@ -144,26 +138,20 @@ describe("listMicrosoftVoices", () => { const tempDir = mkdtempSync(path.join(os.tmpdir(), "microsoft-voices-global-")); proxyReset.captureProxyEnv(); process.env.OPENCLAW_DEBUG_PROXY_ENABLED = "1"; - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite"); - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs"); + process.env.OPENCLAW_STATE_DIR = tempDir; process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID = "ms-voices-global-session"; globalThis.fetch = vi.fn( async () => new Response(JSON.stringify([{ ShortName: "en-US-AvaNeural" }]), { status: 200 }), ) as unknown as typeof globalThis.fetch; - const store = getDebugProxyCaptureStore( - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH, - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR, - ); + const store = getDebugProxyCaptureStore(); store.upsertSession({ id: "ms-voices-global-session", startedAt: Date.now(), mode: "test", sourceScope: "openclaw", sourceProcess: "openclaw", - dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH, - blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR, }); initializeDebugProxyCapture("test"); diff --git a/extensions/openai/tts.test.ts b/extensions/openai/tts.test.ts index 758cddd08f9..f1ea2ba9213 100644 --- a/extensions/openai/tts.test.ts +++ b/extensions/openai/tts.test.ts @@ -378,8 +378,7 @@ describe("openai tts", () => { const tempDir = mkdtempSync(path.join(os.tmpdir(), "openai-tts-capture-")); proxyReset.captureProxyEnv(); process.env.OPENCLAW_DEBUG_PROXY_ENABLED = "1"; - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite"); - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs"); + process.env.OPENCLAW_STATE_DIR = tempDir; process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID = "tts-session"; globalThis.fetch = vi @@ -388,18 +387,13 @@ describe("openai tts", () => { new Response(Buffer.from("audio-bytes"), { status: 200 }), ) as unknown as typeof globalThis.fetch; - const store = getDebugProxyCaptureStore( - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH, - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR, - ); + const store = getDebugProxyCaptureStore(); store.upsertSession({ id: "tts-session", startedAt: Date.now(), mode: "test", sourceScope: "openclaw", sourceProcess: "openclaw", - dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH, - blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR, }); await openaiTTS({ @@ -427,8 +421,7 @@ describe("openai tts", () => { const tempDir = mkdtempSync(path.join(os.tmpdir(), "openai-tts-patched-capture-")); proxyReset.captureProxyEnv(); process.env.OPENCLAW_DEBUG_PROXY_ENABLED = "1"; - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite"); - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs"); + process.env.OPENCLAW_STATE_DIR = tempDir; process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID = "tts-patched-session"; globalThis.fetch = vi @@ -449,10 +442,7 @@ describe("openai tts", () => { timeoutMs: 5_000, }); - const store = getDebugProxyCaptureStore( - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH, - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR, - ); + const store = getDebugProxyCaptureStore(); let events: Array> = []; try { await vi.waitFor(() => { diff --git a/extensions/qa-lab/src/lab-server.test.ts b/extensions/qa-lab/src/lab-server.test.ts index 9f5331a559f..d48c4239135 100644 --- a/extensions/qa-lab/src/lab-server.test.ts +++ b/extensions/qa-lab/src/lab-server.test.ts @@ -150,8 +150,6 @@ vi.mock("openclaw/plugin-sdk/proxy-capture", () => ({ }), getDebugProxyCaptureStore: () => captureMock.store, resolveDebugProxySettings: () => ({ - dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH ?? "", - blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR ?? "", proxyUrl: process.env.OPENCLAW_DEBUG_PROXY_URL ?? "", sessionId: "qa-lab-test", }), @@ -876,8 +874,6 @@ describe("qa-lab server", () => { cleanups.push(async () => { await rm(tempDir, { recursive: true, force: true }); }); - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite"); - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs"); const store = captureMock.store; store.upsertSession({ id: "qa-capture-session", @@ -885,8 +881,6 @@ describe("qa-lab server", () => { mode: "proxy-run", sourceScope: "openclaw", sourceProcess: "openclaw", - dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH, - blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR, }); store.recordEvent({ sessionId: "qa-capture-session", @@ -954,8 +948,6 @@ describe("qa-lab server", () => { port: 0, }); cleanups.push(async () => { - delete process.env.OPENCLAW_DEBUG_PROXY_DB_PATH; - delete process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR; await lab.stop(); }); diff --git a/extensions/qa-lab/src/lab-server.ts b/extensions/qa-lab/src/lab-server.ts index b82454872a6..918a6840d8d 100644 --- a/extensions/qa-lab/src/lab-server.ts +++ b/extensions/qa-lab/src/lab-server.ts @@ -219,10 +219,7 @@ export async function startQaLabServer( ): Promise { const repoRoot = path.resolve(params?.repoRoot ?? process.cwd()); const captureSettings = resolveDebugProxySettings(); - const captureStoreLease = acquireDebugProxyCaptureStore( - captureSettings.dbPath, - captureSettings.blobDir, - ); + const captureStoreLease = acquireDebugProxyCaptureStore(); const captureStore = captureStoreLease.store; const state = createQaBusState(); let latestReport: QaLabLatestReport | null = null; diff --git a/extensions/test-support/debug-proxy-env-test-helpers.ts b/extensions/test-support/debug-proxy-env-test-helpers.ts index d81acb273b1..c17f7cff326 100644 --- a/extensions/test-support/debug-proxy-env-test-helpers.ts +++ b/extensions/test-support/debug-proxy-env-test-helpers.ts @@ -3,9 +3,8 @@ import { afterEach, vi } from "vitest"; const DEBUG_PROXY_ENV_KEYS = [ "OPENCLAW_DEBUG_PROXY_ENABLED", - "OPENCLAW_DEBUG_PROXY_DB_PATH", - "OPENCLAW_DEBUG_PROXY_BLOB_DIR", "OPENCLAW_DEBUG_PROXY_SESSION_ID", + "OPENCLAW_STATE_DIR", ] as const; type DebugProxyEnvKey = (typeof DEBUG_PROXY_ENV_KEYS)[number]; @@ -30,13 +29,19 @@ function restoreDebugProxyEnv(snapshot: DebugProxyEnvSnapshot): void { export function installDebugProxyTestResetHooks() { const originalFetch = globalThis.fetch; - let priorProxyEnv: DebugProxyEnvSnapshot = {}; + const originalProxyEnv = snapshotDebugProxyEnv(); + let priorProxyEnv = originalProxyEnv; - afterEach(() => { + afterEach(async () => { + const { closeDebugProxyCaptureStore } = await import("openclaw/plugin-sdk/proxy-capture"); + const { closeOpenClawStateDatabaseForTest } = + await import("openclaw/plugin-sdk/sqlite-runtime-testing"); + closeDebugProxyCaptureStore(); + closeOpenClawStateDatabaseForTest(); globalThis.fetch = originalFetch; vi.restoreAllMocks(); restoreDebugProxyEnv(priorProxyEnv); - priorProxyEnv = {}; + priorProxyEnv = originalProxyEnv; }); return { diff --git a/src/cli/proxy-cli.runtime.test.ts b/src/cli/proxy-cli.runtime.test.ts index de8b5371905..d4cc048ccf7 100644 --- a/src/cli/proxy-cli.runtime.test.ts +++ b/src/cli/proxy-cli.runtime.test.ts @@ -39,8 +39,7 @@ vi.mock("../infra/net/proxy/proxy-validation.js", () => ({ describe("proxy cli runtime", () => { const envKeys = [ - "OPENCLAW_DEBUG_PROXY_DB_PATH", - "OPENCLAW_DEBUG_PROXY_BLOB_DIR", + "OPENCLAW_STATE_DIR", "OPENCLAW_DEBUG_PROXY_CERT_DIR", "OPENCLAW_DEBUG_PROXY_SESSION_ID", "OPENCLAW_DEBUG_PROXY_ENABLED", @@ -52,8 +51,7 @@ describe("proxy cli runtime", () => { beforeEach(() => { tempDir = mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-cli-runtime-")); - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite"); - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs"); + process.env.OPENCLAW_STATE_DIR = tempDir; process.env.OPENCLAW_DEBUG_PROXY_CERT_DIR = path.join(tempDir, "certs"); delete process.env.OPENCLAW_DEBUG_PROXY_ENABLED; delete process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID; @@ -92,7 +90,9 @@ describe("proxy cli runtime", () => { afterEach(async () => { const { closeDebugProxyCaptureStore } = await import("../proxy-capture/store.sqlite.js"); + const { closeOpenClawStateDatabaseForTest } = await import("../state/openclaw-state-db.js"); closeDebugProxyCaptureStore(); + closeOpenClawStateDatabaseForTest(); vi.restoreAllMocks(); vi.resetModules(); process.exitCode = undefined; @@ -503,10 +503,7 @@ describe("proxy cli runtime", () => { expect(serverStopSpy).toHaveBeenCalledTimes(1); - const store = getDebugProxyCaptureStore( - process.env.OPENCLAW_DEBUG_PROXY_DB_PATH!, - process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR!, - ); + const store = getDebugProxyCaptureStore(); const [session] = store.listSessions(5); expect(session?.mode).toBe("proxy-run"); expect(session?.endedAt).toBeGreaterThanOrEqual(beforeRun); diff --git a/src/cli/proxy-cli.runtime.ts b/src/cli/proxy-cli.runtime.ts index 6f4464c5c19..40639db181b 100644 --- a/src/cli/proxy-cli.runtime.ts +++ b/src/cli/proxy-cli.runtime.ts @@ -24,7 +24,7 @@ import type { CaptureQueryPreset } from "../proxy-capture/types.js"; export async function runDebugProxyStartCommand(opts: { host?: string; port?: number }) { const settings = resolveDebugProxySettings(); - const store = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir); + const store = getDebugProxyCaptureStore(); store.upsertSession({ id: settings.sessionId, startedAt: Date.now(), @@ -32,8 +32,6 @@ export async function runDebugProxyStartCommand(opts: { host?: string; port?: nu sourceScope: "openclaw", sourceProcess: "openclaw", proxyUrl: settings.proxyUrl, - dbPath: settings.dbPath, - blobDir: settings.blobDir, }); initializeDebugProxyCapture("proxy-start", settings); const ca = await ensureDebugProxyCa(settings.certDir); @@ -44,7 +42,7 @@ export async function runDebugProxyStartCommand(opts: { host?: string; port?: nu }); process.stdout.write(`Debug proxy: ${server.proxyUrl}\n`); process.stdout.write(`CA cert: ${ca.certPath}\n`); - process.stdout.write(`Capture DB: ${settings.dbPath}\n`); + process.stdout.write(`Capture DB: ${store.dbPath}\n`); process.stdout.write("Press Ctrl+C to stop.\n"); const shutdown = async () => { process.off("SIGINT", onSignal); @@ -81,15 +79,13 @@ export async function runDebugProxyRunCommand(opts: { ...baseSettings, sessionId, }; - getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).upsertSession({ + getDebugProxyCaptureStore().upsertSession({ id: sessionId, startedAt: Date.now(), mode: "proxy-run", sourceScope: "openclaw", sourceProcess: "openclaw", proxyUrl: undefined, - dbPath: settings.dbPath, - blobDir: settings.blobDir, }); const server = await startDebugProxyServer({ host: opts.host, @@ -100,8 +96,6 @@ export async function runDebugProxyRunCommand(opts: { const childEnv = applyDebugProxyEnv(process.env, { proxyUrl: server.proxyUrl, sessionId, - dbPath: settings.dbPath, - blobDir: settings.blobDir, certDir: settings.certDir, }); try { @@ -119,7 +113,7 @@ export async function runDebugProxyRunCommand(opts: { }); } finally { await server.stop(); - getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).endSession(sessionId); + getDebugProxyCaptureStore().endSession(sessionId); } } @@ -292,10 +286,7 @@ export async function runProxyValidateCommand(opts: { } export async function runDebugProxySessionsCommand(opts: { limit?: number }) { - const settings = resolveDebugProxySettings(); - const sessions = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).listSessions( - opts.limit ?? 20, - ); + const sessions = getDebugProxyCaptureStore().listSessions(opts.limit ?? 20); process.stdout.write(`${JSON.stringify(sessions, null, 2)}\n`); closeDebugProxyCaptureStore(); } @@ -304,11 +295,7 @@ export async function runDebugProxyQueryCommand(opts: { preset: CaptureQueryPreset; sessionId?: string; }) { - const settings = resolveDebugProxySettings(); - const rows = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).queryPreset( - opts.preset, - opts.sessionId, - ); + const rows = getDebugProxyCaptureStore().queryPreset(opts.preset, opts.sessionId); process.stdout.write(`${JSON.stringify(rows, null, 2)}\n`); closeDebugProxyCaptureStore(); } @@ -319,17 +306,13 @@ export async function runDebugProxyCoverageCommand() { } export async function runDebugProxyPurgeCommand() { - const settings = resolveDebugProxySettings(); - const result = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).purgeAll(); + const result = getDebugProxyCaptureStore().purgeAll(); process.stdout.write(`${JSON.stringify(result, null, 2)}\n`); closeDebugProxyCaptureStore(); } export async function readDebugProxyBlobCommand(opts: { blobId: string }) { - const settings = resolveDebugProxySettings(); - const content = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).readBlob( - opts.blobId, - ); + const content = getDebugProxyCaptureStore().readBlob(opts.blobId); if (content == null) { closeDebugProxyCaptureStore(); throw new Error(`Unknown blob: ${opts.blobId}`); diff --git a/src/proxy-capture/blob-store.ts b/src/proxy-capture/blob-store.ts deleted file mode 100644 index 3432595f079..00000000000 --- a/src/proxy-capture/blob-store.ts +++ /dev/null @@ -1,45 +0,0 @@ -// Proxy capture blob store persists captured request and response bodies by hash. -import { createHash } from "node:crypto"; -import fs from "node:fs"; -import path from "node:path"; -import { gzipSync, gunzipSync } from "node:zlib"; -import { applyPrivateModeSync } from "../infra/private-mode.js"; -import type { CaptureBlobRecord } from "./types.js"; - -// Capture blobs store request/response bodies by content hash, gzip-compressed -// on disk, so repeated payloads can share one file across events. -const DEBUG_PROXY_CAPTURE_DIR_MODE = 0o700; -const DEBUG_PROXY_CAPTURE_FILE_MODE = 0o600; - -function ensureDir(dir: string) { - fs.mkdirSync(dir, { recursive: true, mode: DEBUG_PROXY_CAPTURE_DIR_MODE }); -} - -export function writeCaptureBlob(params: { - blobDir: string; - data: Buffer; - contentType?: string; -}): CaptureBlobRecord { - ensureDir(params.blobDir); - const sha256 = createHash("sha256").update(params.data).digest("hex"); - const blobId = sha256.slice(0, 24); - const outputPath = path.join(params.blobDir, `${blobId}.bin.gz`); - if (!fs.existsSync(outputPath)) { - fs.writeFileSync(outputPath, gzipSync(params.data), { mode: DEBUG_PROXY_CAPTURE_FILE_MODE }); - } - applyPrivateModeSync(outputPath, DEBUG_PROXY_CAPTURE_FILE_MODE); - return { - blobId, - path: outputPath, - encoding: "gzip", - sizeBytes: params.data.byteLength, - sha256, - ...(params.contentType ? { contentType: params.contentType } : {}), - }; -} - -// Debug CLI reads blobs as UTF-8 previews. Binary payloads still remain -// available via the compressed file path recorded in the blob metadata. -export function readCaptureBlobText(blobPath: string): string { - return gunzipSync(fs.readFileSync(blobPath)).toString("utf8"); -} diff --git a/src/proxy-capture/env.test.ts b/src/proxy-capture/env.test.ts index d93634220c3..ed131ca9b47 100644 --- a/src/proxy-capture/env.test.ts +++ b/src/proxy-capture/env.test.ts @@ -1,6 +1,7 @@ // Proxy capture env tests cover environment variable generation for capture sessions. import { describe, expect, it } from "vitest"; import { + applyDebugProxyEnv, OPENCLAW_DEBUG_PROXY_ENABLED, OPENCLAW_DEBUG_PROXY_SESSION_ID, resolveDebugProxySettings, @@ -26,4 +27,30 @@ describe("resolveDebugProxySettings", () => { expect(settings.sessionId).toBe("session-explicit"); }); + + it("ignores obsolete capture storage overrides", () => { + const settings = resolveDebugProxySettings({ + OPENCLAW_DEBUG_PROXY_DB_PATH: "/tmp/legacy-capture.sqlite", + OPENCLAW_DEBUG_PROXY_BLOB_DIR: "/tmp/legacy-capture-blobs", + }); + + expect(settings).not.toHaveProperty("dbPath"); + expect(settings).not.toHaveProperty("blobDir"); + }); + + it("does not pass obsolete capture storage overrides to child processes", () => { + const env = applyDebugProxyEnv( + { + OPENCLAW_DEBUG_PROXY_DB_PATH: "/tmp/legacy-capture.sqlite", + OPENCLAW_DEBUG_PROXY_BLOB_DIR: "/tmp/legacy-capture-blobs", + }, + { + proxyUrl: "http://127.0.0.1:7799", + sessionId: "session-child", + }, + ); + + expect(env.OPENCLAW_DEBUG_PROXY_DB_PATH).toBeUndefined(); + expect(env.OPENCLAW_DEBUG_PROXY_BLOB_DIR).toBeUndefined(); + }); }); diff --git a/src/proxy-capture/env.ts b/src/proxy-capture/env.ts index 5567dc8c6b0..b2f101a5a67 100644 --- a/src/proxy-capture/env.ts +++ b/src/proxy-capture/env.ts @@ -3,18 +3,12 @@ import { randomUUID } from "node:crypto"; import type { Agent } from "node:http"; import process from "node:process"; import { createAmbientNodeProxyAgent } from "@openclaw/proxyline"; -import { - resolveDebugProxyBlobDir, - resolveDebugProxyCertDir, - resolveDebugProxyDbPath, -} from "./paths.js"; +import { resolveDebugProxyCertDir } from "./paths.js"; // Environment contract for debug proxy capture. These vars are passed to child // processes and provider transports so capture sessions share one store/proxy. export const OPENCLAW_DEBUG_PROXY_ENABLED = "OPENCLAW_DEBUG_PROXY_ENABLED"; export const OPENCLAW_DEBUG_PROXY_URL = "OPENCLAW_DEBUG_PROXY_URL"; -export const OPENCLAW_DEBUG_PROXY_DB_PATH = "OPENCLAW_DEBUG_PROXY_DB_PATH"; -export const OPENCLAW_DEBUG_PROXY_BLOB_DIR = "OPENCLAW_DEBUG_PROXY_BLOB_DIR"; export const OPENCLAW_DEBUG_PROXY_CERT_DIR = "OPENCLAW_DEBUG_PROXY_CERT_DIR"; export const OPENCLAW_DEBUG_PROXY_SESSION_ID = "OPENCLAW_DEBUG_PROXY_SESSION_ID"; export const OPENCLAW_DEBUG_PROXY_REQUIRE = "OPENCLAW_DEBUG_PROXY_REQUIRE"; @@ -23,8 +17,6 @@ export type DebugProxySettings = { enabled: boolean; required: boolean; proxyUrl?: string; - dbPath: string; - blobDir: string; certDir: string; sessionId: string; sourceProcess: string; @@ -48,8 +40,6 @@ export function resolveDebugProxySettings( enabled, required: isTruthy(env[OPENCLAW_DEBUG_PROXY_REQUIRE]), proxyUrl: env[OPENCLAW_DEBUG_PROXY_URL]?.trim() || undefined, - dbPath: env[OPENCLAW_DEBUG_PROXY_DB_PATH]?.trim() || resolveDebugProxyDbPath(env), - blobDir: env[OPENCLAW_DEBUG_PROXY_BLOB_DIR]?.trim() || resolveDebugProxyBlobDir(env), certDir: env[OPENCLAW_DEBUG_PROXY_CERT_DIR]?.trim() || resolveDebugProxyCertDir(env), sessionId, sourceProcess: "openclaw", @@ -61,20 +51,19 @@ export function applyDebugProxyEnv( params: { proxyUrl: string; sessionId: string; - dbPath?: string; - blobDir?: string; certDir?: string; }, ): NodeJS.ProcessEnv { // Child process env forces proxy capture and standard proxy variables while // preserving unrelated environment values. + const baseEnv = { ...env }; + delete baseEnv.OPENCLAW_DEBUG_PROXY_DB_PATH; + delete baseEnv.OPENCLAW_DEBUG_PROXY_BLOB_DIR; return { - ...env, + ...baseEnv, [OPENCLAW_DEBUG_PROXY_ENABLED]: "1", [OPENCLAW_DEBUG_PROXY_REQUIRE]: "1", [OPENCLAW_DEBUG_PROXY_URL]: params.proxyUrl, - [OPENCLAW_DEBUG_PROXY_DB_PATH]: params.dbPath ?? resolveDebugProxyDbPath(env), - [OPENCLAW_DEBUG_PROXY_BLOB_DIR]: params.blobDir ?? resolveDebugProxyBlobDir(env), [OPENCLAW_DEBUG_PROXY_CERT_DIR]: params.certDir ?? resolveDebugProxyCertDir(env), [OPENCLAW_DEBUG_PROXY_SESSION_ID]: params.sessionId, HTTP_PROXY: params.proxyUrl, diff --git a/src/proxy-capture/paths.ts b/src/proxy-capture/paths.ts index ce3dce3a929..f7ddbfff1bb 100644 --- a/src/proxy-capture/paths.ts +++ b/src/proxy-capture/paths.ts @@ -1,21 +1,13 @@ -// Proxy capture path helpers resolve capture directories and database paths. +// Proxy capture path helpers resolve certificate artifacts. import path from "node:path"; import { resolveStateDir } from "../config/paths.js"; -// Debug proxy capture artifacts live under OpenClaw state so DB, blobs, and CA -// files are grouped and easy to purge. +// Debug proxy CA files live under OpenClaw state. Capture data lives in the +// shared global state database. function resolveDebugProxyRootDir(env: NodeJS.ProcessEnv = process.env): string { return path.join(resolveStateDir(env), "debug-proxy"); } -export function resolveDebugProxyDbPath(env: NodeJS.ProcessEnv = process.env): string { - return path.join(resolveDebugProxyRootDir(env), "capture.sqlite"); -} - -export function resolveDebugProxyBlobDir(env: NodeJS.ProcessEnv = process.env): string { - return path.join(resolveDebugProxyRootDir(env), "blobs"); -} - export function resolveDebugProxyCertDir(env: NodeJS.ProcessEnv = process.env): string { return path.join(resolveDebugProxyRootDir(env), "certs"); } diff --git a/src/proxy-capture/proxy-server.managed-proxy.test.ts b/src/proxy-capture/proxy-server.managed-proxy.test.ts index 084915e2a0c..6c26ec98caa 100644 --- a/src/proxy-capture/proxy-server.managed-proxy.test.ts +++ b/src/proxy-capture/proxy-server.managed-proxy.test.ts @@ -5,11 +5,21 @@ import { Socket, type AddressInfo } from "node:net"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import { assertDebugProxyDirectUpstreamAllowed, startDebugProxyServer } from "./proxy-server.js"; +import { closeDebugProxyCaptureStore } from "./store.sqlite.js"; let testRoot: string | undefined; +const originalStateDir = process.env.OPENCLAW_STATE_DIR; async function cleanupTestDirs(): Promise { + closeDebugProxyCaptureStore(); + closeOpenClawStateDatabaseForTest(); + if (originalStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = originalStateDir; + } if (!testRoot) { return; } @@ -24,11 +34,10 @@ async function makeSettings() { await mkdir(certDir, { recursive: true }); await writeFile(join(certDir, "root-ca.pem"), "test root cert\n", "utf8"); await writeFile(join(certDir, "root-ca-key.pem"), "test root key\n", "utf8"); + process.env.OPENCLAW_STATE_DIR = testRoot; return { enabled: true, required: false, - dbPath: ":memory:", - blobDir: join(testRoot, "blobs"), certDir, sessionId: "debug-proxy-managed-proxy-test", sourceProcess: "test", diff --git a/src/proxy-capture/proxy-server.test.ts b/src/proxy-capture/proxy-server.test.ts index 61dbe7af46c..f4b78a295f9 100644 --- a/src/proxy-capture/proxy-server.test.ts +++ b/src/proxy-capture/proxy-server.test.ts @@ -5,14 +5,22 @@ import type { AddressInfo } from "node:net"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, describe, expect, it } from "vitest"; +import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import type { DebugProxySettings } from "./env.js"; import { parseConnectTarget, startDebugProxyServer } from "./proxy-server.js"; import { closeDebugProxyCaptureStore, getDebugProxyCaptureStore } from "./store.sqlite.js"; let testRoot: string | undefined; +const originalStateDir = process.env.OPENCLAW_STATE_DIR; async function cleanupTestRoot(): Promise { closeDebugProxyCaptureStore(); + closeOpenClawStateDatabaseForTest(); + if (originalStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = originalStateDir; + } if (!testRoot) { return; } @@ -27,11 +35,10 @@ async function makeSettings(): Promise { await mkdir(certDir, { recursive: true }); await writeFile(join(certDir, "root-ca.pem"), "test root cert\n", "utf8"); await writeFile(join(certDir, "root-ca-key.pem"), "test root key\n", "utf8"); + process.env.OPENCLAW_STATE_DIR = testRoot; return { enabled: true, required: false, - dbPath: ":memory:", - blobDir: join(testRoot, "blobs"), certDir, sessionId: "debug-proxy-server-test", sourceProcess: "test", @@ -160,10 +167,7 @@ describe("startDebugProxyServer", () => { expect(origin.receivedRequestBody()).toBe(requestBody); expect(responseBody).toBe(origin.responseBody); - const events = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).getSessionEvents( - settings.sessionId, - 10, - ); + const events = getDebugProxyCaptureStore().getSessionEvents(settings.sessionId, 10); const capturedRequest = events.find((event) => event.kind === "request"); const capturedResponse = events.find((event) => event.kind === "response"); expect(capturedRequest?.dataText).toBe("q".repeat(8192)); diff --git a/src/proxy-capture/proxy-server.ts b/src/proxy-capture/proxy-server.ts index 98364abc29b..a4d4b607a40 100644 --- a/src/proxy-capture/proxy-server.ts +++ b/src/proxy-capture/proxy-server.ts @@ -154,7 +154,7 @@ export async function startDebugProxyServer(params: { settings: DebugProxySettings; }): Promise { await ensureDebugProxyCa(params.settings.certDir); - const store = getDebugProxyCaptureStore(params.settings.dbPath, params.settings.blobDir); + const store = getDebugProxyCaptureStore(); const recordProxyEvent = createProxyCaptureRecorder({ store, settings: params.settings }); const host = params.host?.trim() || "127.0.0.1"; diff --git a/src/proxy-capture/runtime.test.ts b/src/proxy-capture/runtime.test.ts index eb71fbc4a04..9de79fa27a8 100644 --- a/src/proxy-capture/runtime.test.ts +++ b/src/proxy-capture/runtime.test.ts @@ -13,8 +13,6 @@ type StoreCall = { name: string; args: unknown[] }; const settings: DebugProxySettings = { enabled: true, required: false, - dbPath: "/tmp/openclaw-proxy-runtime-test.sqlite", - blobDir: "/tmp/openclaw-proxy-runtime-test-blobs", certDir: "/tmp/openclaw-proxy-runtime-test-certs", sessionId: "runtime-test-session", sourceProcess: "runtime-test", diff --git a/src/proxy-capture/runtime.ts b/src/proxy-capture/runtime.ts index 45e111d0407..2af3f17cef5 100644 --- a/src/proxy-capture/runtime.ts +++ b/src/proxy-capture/runtime.ts @@ -42,7 +42,7 @@ const SENSITIVE_CAPTURE_HEADER_NAME_FRAGMENTS = [ ]; // Runtime capture records HTTP/fetch and websocket events into the SQLite store, -// redacting sensitive headers and persisting bodies through the blob store. +// redacting sensitive headers and persisting bodies in capture_blobs. type GlobalFetchPatchedState = { originalFetch: typeof globalThis.fetch; }; @@ -57,7 +57,7 @@ type DebugProxyCaptureStoreLike = Pick< >; export type DebugProxyCaptureRuntimeDeps = { - getStore?: (dbPath: string, blobDir: string) => DebugProxyCaptureStoreLike; + getStore?: () => DebugProxyCaptureStoreLike; closeStore?: typeof closeDebugProxyCaptureStore; persistEventPayload?: ( store: DebugProxyCaptureStoreLike, @@ -221,7 +221,7 @@ function installDebugProxyGlobalFetchPatch( return response; } catch (error) { if (url && /^https?:/i.test(url)) { - const store = runtime.getStore(settings.dbPath, settings.blobDir); + const store = runtime.getStore(); const parsed = new URL(url); store.recordEvent({ sessionId: settings.sessionId, @@ -278,15 +278,13 @@ export function initializeDebugProxyCapture( if (!settings.enabled) { return; } - resolveRuntimeDeps(deps).getStore(settings.dbPath, settings.blobDir).upsertSession({ + resolveRuntimeDeps(deps).getStore().upsertSession({ id: settings.sessionId, startedAt: Date.now(), mode, sourceScope: "openclaw", sourceProcess: settings.sourceProcess, proxyUrl: settings.proxyUrl, - dbPath: settings.dbPath, - blobDir: settings.blobDir, }); installDebugProxyGlobalFetchPatch(settings, deps); } @@ -302,7 +300,7 @@ export function finalizeDebugProxyCapture( return; } const runtime = resolveRuntimeDeps(deps); - runtime.getStore(settings.dbPath, settings.blobDir).endSession(settings.sessionId); + runtime.getStore().endSession(settings.sessionId); uninstallDebugProxyGlobalFetchPatch(deps); runtime.closeStore(); } @@ -326,7 +324,7 @@ export function captureHttpExchange( return; } const runtime = resolveRuntimeDeps(deps); - const store = runtime.getStore(settings.dbPath, settings.blobDir); + const store = runtime.getStore(); const flowId = params.flowId ?? randomUUID(); const url = new URL(params.url); const requestBody = @@ -449,7 +447,7 @@ export function captureWsEvent(params: { if (!settings.enabled) { return; } - const store = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir); + const store = getDebugProxyCaptureStore(); const url = new URL(params.url); const payload = persistEventPayload(store, { data: params.payload, diff --git a/src/proxy-capture/store.sqlite.test.ts b/src/proxy-capture/store.sqlite.test.ts index b15163fa039..f6b5907e407 100644 --- a/src/proxy-capture/store.sqlite.test.ts +++ b/src/proxy-capture/store.sqlite.test.ts @@ -3,8 +3,8 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; -import { listOpenFileDescriptorsForPath } from "../infra/open-file-descriptors.test-support.js"; import { resolveSqliteDatabaseFilePaths } from "../infra/sqlite-files.js"; +import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import { acquireDebugProxyCaptureStore, closeDebugProxyCaptureStore, @@ -17,6 +17,7 @@ const cleanupDirs: string[] = []; afterEach(() => { closeDebugProxyCaptureStore(); + closeOpenClawStateDatabaseForTest(); vi.restoreAllMocks(); while (cleanupDirs.length > 0) { const dir = cleanupDirs.pop(); @@ -29,7 +30,13 @@ afterEach(() => { function makeStore() { const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-")); cleanupDirs.push(root); - return new DebugProxyCaptureStore(path.join(root, "capture.sqlite"), path.join(root, "blobs")); + return new DebugProxyCaptureStore({ env: { OPENCLAW_STATE_DIR: root } }); +} + +function makeStateEnv(prefix: string): NodeJS.ProcessEnv { + const root = fs.mkdtempSync(path.join(os.tmpdir(), prefix)); + cleanupDirs.push(root); + return { OPENCLAW_STATE_DIR: root }; } function readMode(target: string): number { @@ -37,51 +44,11 @@ function readMode(target: string): number { } describe("DebugProxyCaptureStore", () => { - it.each([ - ":memory:", - "file::memory:?cache=shared", - "file:%3Amemory:?cache=shared", - "file:proxy-capture?mode=memory&cache=shared", - "file:proxy-capture?mode=memory#ignored", - ])( - "keeps SQLite memory path %s off the filesystem", - (dbPath) => { - const mkdirSync = vi.spyOn(fs, "mkdirSync"); - const openSync = vi.spyOn(fs, "openSync"); - const existsSync = vi.spyOn(fs, "existsSync"); - - const store = new DebugProxyCaptureStore(dbPath, "unused"); - try { - expect(store.db.prepare("PRAGMA database_list").get()).toMatchObject({ file: "" }); - expect(mkdirSync).not.toHaveBeenCalled(); - expect(openSync).not.toHaveBeenCalled(); - expect(existsSync).not.toHaveBeenCalled(); - } finally { - store.close(); - } - }, - ); - - it.runIf(process.platform === "linux")("closes the database when initialization fails", () => { - const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-failed-open-")); - cleanupDirs.push(root); - const dbPath = path.join(root, "capture.sqlite"); - fs.writeFileSync(dbPath, "not a sqlite database"); - - expect(() => new DebugProxyCaptureStore(dbPath, path.join(root, "blobs"))).toThrow( - "file is not a database", - ); - expect(listOpenFileDescriptorsForPath(dbPath)).toEqual([]); - }); - it("keeps the cached store open until the last lease releases", () => { - const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-lease-")); - cleanupDirs.push(root); - const dbPath = path.join(root, "capture.sqlite"); - const blobDir = path.join(root, "blobs"); + const options = { env: makeStateEnv("openclaw-proxy-capture-lease-") }; - const first = acquireDebugProxyCaptureStore(dbPath, blobDir); - const second = acquireDebugProxyCaptureStore(dbPath, blobDir); + const first = acquireDebugProxyCaptureStore(options); + const second = acquireDebugProxyCaptureStore(options); expect(second.store).toBe(first.store); first.release(); @@ -90,22 +57,18 @@ describe("DebugProxyCaptureStore", () => { second.release(); expect(first.store.isClosed).toBe(true); - const reopened = getDebugProxyCaptureStore(dbPath, blobDir); + const reopened = getDebugProxyCaptureStore(options); expect(Object.is(reopened, first.store)).toBe(false); expect(reopened.isClosed).toBe(false); }); it("tracks and closes cached stores independently across paths", () => { - const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-paths-")); - cleanupDirs.push(root); - const first = acquireDebugProxyCaptureStore( - path.join(root, "first.sqlite"), - path.join(root, "first-blobs"), - ); - const second = acquireDebugProxyCaptureStore( - path.join(root, "second.sqlite"), - path.join(root, "second-blobs"), - ); + const first = acquireDebugProxyCaptureStore({ + env: makeStateEnv("openclaw-proxy-capture-first-"), + }); + const second = acquireDebugProxyCaptureStore({ + env: makeStateEnv("openclaw-proxy-capture-second-"), + }); first.release(); expect(first.store.isClosed).toBe(true); @@ -117,8 +80,6 @@ describe("DebugProxyCaptureStore", () => { }); it("uses rollback journaling for captures on NFS-backed volumes", () => { - const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-nfs-")); - cleanupDirs.push(root); vi.spyOn(fs, "statfsSync").mockReturnValue({ type: 0x6969, bsize: 1024, @@ -129,10 +90,9 @@ describe("DebugProxyCaptureStore", () => { ffree: 0, }); - const store = new DebugProxyCaptureStore( - path.join(root, "capture.sqlite"), - path.join(root, "blobs"), - ); + const store = new DebugProxyCaptureStore({ + env: makeStateEnv("openclaw-proxy-capture-nfs-"), + }); try { expect(store.db.prepare("PRAGMA journal_mode").get()).toMatchObject({ journal_mode: "delete", @@ -142,31 +102,40 @@ describe("DebugProxyCaptureStore", () => { } }); - it.runIf(process.platform !== "win32")("keeps capture databases and blobs private", () => { - const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-permissions-")); - cleanupDirs.push(root); - const dbDir = path.join(root, "db"); - const dbPath = path.join(dbDir, "capture.sqlite"); - const blobDir = path.join(root, "blobs"); - const store = new DebugProxyCaptureStore(dbPath, blobDir); - const blob = store.persistPayload(Buffer.from("authorization: Bearer secret")); + it.runIf(process.platform !== "win32")( + "stores capture blobs in the private shared state database", + () => { + const env = makeStateEnv("openclaw-proxy-capture-permissions-"); + const root = env.OPENCLAW_STATE_DIR!; + const store = new DebugProxyCaptureStore({ env }); + const blob = store.persistPayload(Buffer.from("authorization: Bearer secret")); + const row = store.db + .prepare( + `SELECT encoding, size_bytes AS sizeBytes, sha256, data + FROM capture_blobs + WHERE blob_id = ?`, + ) + .get(blob.blobId) as + | { data: Uint8Array; encoding: string; sha256: string; sizeBytes: number } + | undefined; - expect(readMode(dbDir)).toBe(0o700); - expect(readMode(blobDir)).toBe(0o700); - for (const databaseFile of resolveSqliteDatabaseFilePaths(dbPath).filter(fs.existsSync)) { - expect(readMode(databaseFile)).toBe(0o600); - } - expect(readMode(blob.path)).toBe(0o600); - - store.close(); - fs.chmodSync(dbPath, 0o644); - fs.chmodSync(blob.path, 0o644); - const reopened = new DebugProxyCaptureStore(dbPath, blobDir); - reopened.persistPayload(Buffer.from("authorization: Bearer secret")); - expect(readMode(dbPath)).toBe(0o600); - expect(readMode(blob.path)).toBe(0o600); - reopened.close(); - }); + expect(store.dbPath).toBe(path.join(root, "state", "openclaw.sqlite")); + expect(fs.existsSync(path.join(root, "debug-proxy", "capture.sqlite"))).toBe(false); + expect(fs.existsSync(path.join(root, "debug-proxy", "blobs"))).toBe(false); + expect(row).toMatchObject({ + encoding: "gzip", + sha256: blob.sha256, + sizeBytes: blob.sizeBytes, + }); + expect(Buffer.from(row?.data ?? []).toString("utf8")).not.toContain("Bearer secret"); + expect(readMode(path.dirname(store.dbPath))).toBe(0o700); + for (const databaseFile of resolveSqliteDatabaseFilePaths(store.dbPath).filter( + fs.existsSync, + )) { + expect(readMode(databaseFile)).toBe(0o600); + } + }, + ); it("ignores duplicate close calls", () => { const store = makeStore(); @@ -184,8 +153,6 @@ describe("DebugProxyCaptureStore", () => { mode: "proxy-run", sourceScope: "openclaw", sourceProcess: "openclaw", - dbPath: store.dbPath, - blobDir: store.blobDir, }); const firstPayload = persistEventPayload(store, { data: '{"ok":true}', @@ -230,6 +197,43 @@ describe("DebugProxyCaptureStore", () => { expect(store.readBlob(firstPayload.dataBlobId ?? "")).toContain('"ok":true'); }); + it("creates and later upgrades an implicit session for direct event capture", () => { + const store = makeStore(); + store.recordEvent({ + sessionId: "session-direct", + ts: 20, + sourceScope: "openclaw", + sourceProcess: "provider", + protocol: "https", + direction: "outbound", + kind: "request", + flowId: "flow-direct", + dataBlobId: "already-purged", + }); + + expect(store.listSessions(10)[0]).toMatchObject({ + id: "session-direct", + mode: "implicit", + }); + expect(store.getSessionEvents("session-direct", 10)[0]).toMatchObject({ + dataBlobId: null, + }); + + store.upsertSession({ + id: "session-direct", + startedAt: 10, + mode: "runtime", + sourceScope: "openclaw", + sourceProcess: "openclaw", + }); + + expect(store.listSessions(10)[0]).toMatchObject({ + id: "session-direct", + mode: "runtime", + startedAt: 10, + }); + }); + it("keeps shared blobs when deleting one of multiple referencing sessions", () => { const store = makeStore(); const sharedPayload = persistEventPayload(store, { @@ -244,8 +248,6 @@ describe("DebugProxyCaptureStore", () => { mode: "proxy-run", sourceScope: "openclaw", sourceProcess: "openclaw", - dbPath: store.dbPath, - blobDir: store.blobDir, }); store.recordEvent({ sessionId, @@ -270,5 +272,12 @@ describe("DebugProxyCaptureStore", () => { expect(result.blobs).toBe(0); expect(store.readBlob(sharedPayload.dataBlobId ?? "")).toContain('"shared":true'); expect(store.listSessions(10).map((session) => session.id)).toEqual(["session-b"]); + + expect(store.deleteSessions(["session-b"])).toEqual({ + sessions: 1, + events: 1, + blobs: 1, + }); + expect(store.readBlob(sharedPayload.dataBlobId ?? "")).toBeNull(); }); }); diff --git a/src/proxy-capture/store.sqlite.ts b/src/proxy-capture/store.sqlite.ts index 946a1427cf9..23d6030cb70 100644 --- a/src/proxy-capture/store.sqlite.ts +++ b/src/proxy-capture/store.sqlite.ts @@ -1,17 +1,11 @@ // Proxy capture SQLite store persists capture metadata and replayable exchanges. -import fs from "node:fs"; -import path from "node:path"; +import { createHash } from "node:crypto"; import type { DatabaseSync } from "node:sqlite"; +import { gunzipSync, gzipSync } from "node:zlib"; import { normalizeNullableString as normalizeObservedValue } from "@openclaw/normalization-core/string-coerce"; import { normalizeUniqueStringEntries } from "@openclaw/normalization-core/string-normalization"; -import { requireNodeSqlite } from "../infra/node-sqlite.js"; -import { applyPrivateModeSync } from "../infra/private-mode.js"; -import { resolveSqliteDatabaseFilePaths } from "../infra/sqlite-files.js"; -import { - configureSqliteConnectionPragmas, - type SqliteWalMaintenance, -} from "../infra/sqlite-wal.js"; -import { readCaptureBlobText, writeCaptureBlob } from "./blob-store.js"; +import { runSqliteImmediateTransactionSync } from "../infra/sqlite-transaction.js"; +import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js"; import type { CaptureBlobRecord, CaptureEventRecord, @@ -23,122 +17,11 @@ import type { CaptureSessionSummary, } from "./types.js"; -// SQLite-backed debug proxy store. Metadata stays in SQLite; large payloads are -// compressed into the blob directory and referenced by hash. -const DEBUG_PROXY_CAPTURE_DIR_MODE = 0o700; -const DEBUG_PROXY_CAPTURE_FILE_MODE = 0o600; - -function isInMemoryDatabasePath(dbPath: string): boolean { - if (dbPath === ":memory:") { - return true; - } - if (!dbPath.startsWith("file:")) { - return false; - } - const fragmentIndex = dbPath.indexOf("#"); - const uriWithoutFragment = fragmentIndex === -1 ? dbPath : dbPath.slice(0, fragmentIndex); - const queryIndex = uriWithoutFragment.indexOf("?"); - const uriPath = - queryIndex === -1 ? uriWithoutFragment : uriWithoutFragment.slice(0, queryIndex); - try { - if (decodeURIComponent(uriPath.slice("file:".length)) === ":memory:") { - return true; - } - } catch { - // Malformed escapes cannot identify a memory URI; retain file-backed handling. - } - return ( - queryIndex !== -1 && - new URLSearchParams(uriWithoutFragment.slice(queryIndex + 1)).get("mode") === "memory" - ); -} - -function ensureParentDir(filePath: string) { - fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: DEBUG_PROXY_CAPTURE_DIR_MODE }); -} - -function hardenDatabaseFiles(dbPath: string): void { - for (const candidate of resolveSqliteDatabaseFilePaths(dbPath)) { - if (fs.existsSync(candidate)) { - applyPrivateModeSync(candidate, DEBUG_PROXY_CAPTURE_FILE_MODE); - } - } -} - -type OpenedDatabase = { - db: DatabaseSync; - walMaintenance: SqliteWalMaintenance; +// Capture rows and compressed payload BLOBs live in the shared global state DB. +export type DebugProxyCaptureStoreOptions = { + env?: NodeJS.ProcessEnv; }; -function openDatabase(dbPath: string): OpenedDatabase { - const fileBackedPath = isInMemoryDatabasePath(dbPath) ? undefined : dbPath; - if (fileBackedPath) { - ensureParentDir(fileBackedPath); - if (!fs.existsSync(fileBackedPath)) { - fs.closeSync(fs.openSync(fileBackedPath, "a", DEBUG_PROXY_CAPTURE_FILE_MODE)); - } - } - const { DatabaseSync } = requireNodeSqlite(); - const db = new DatabaseSync(dbPath); - let walMaintenance: SqliteWalMaintenance | undefined; - try { - if (fileBackedPath) { - applyPrivateModeSync(fileBackedPath, DEBUG_PROXY_CAPTURE_FILE_MODE); - } - walMaintenance = configureSqliteConnectionPragmas(db, { - busyTimeoutMs: 5000, - databaseLabel: "debug-proxy-capture", - ...(fileBackedPath ? { databasePath: fileBackedPath } : {}), - }); - db.exec(` - CREATE TABLE IF NOT EXISTS capture_sessions ( - id TEXT PRIMARY KEY, - started_at INTEGER NOT NULL, - ended_at INTEGER, - mode TEXT NOT NULL, - source_scope TEXT NOT NULL, - source_process TEXT NOT NULL, - proxy_url TEXT, - db_path TEXT NOT NULL, - blob_dir TEXT NOT NULL - ); - CREATE TABLE IF NOT EXISTS capture_events ( - id INTEGER PRIMARY KEY, - session_id TEXT NOT NULL, - ts INTEGER NOT NULL, - source_scope TEXT NOT NULL, - source_process TEXT NOT NULL, - protocol TEXT NOT NULL, - direction TEXT NOT NULL, - kind TEXT NOT NULL, - flow_id TEXT NOT NULL, - method TEXT, - host TEXT, - path TEXT, - status INTEGER, - close_code INTEGER, - content_type TEXT, - headers_json TEXT, - data_text TEXT, - data_blob_id TEXT, - data_sha256 TEXT, - error_text TEXT, - meta_json TEXT - ); - CREATE INDEX IF NOT EXISTS capture_events_session_ts_idx ON capture_events(session_id, ts); - CREATE INDEX IF NOT EXISTS capture_events_flow_idx ON capture_events(flow_id, ts); - `); - if (fileBackedPath) { - hardenDatabaseFiles(fileBackedPath); - } - return { db, walMaintenance }; - } catch (err) { - walMaintenance?.close(); - db.close(); - throw err; - } -} - function serializeJson(value: unknown): string | null { return value == null ? null : JSON.stringify(value); } @@ -165,24 +48,19 @@ function sortObservedCounts(counts: Map): CaptureObservedDimensi export class DebugProxyCaptureStore { readonly db: DatabaseSync; - private readonly walMaintenance: SqliteWalMaintenance; + readonly dbPath: string; private closed = false; - constructor( - readonly dbPath: string, - readonly blobDir: string, - ) { - const opened = openDatabase(dbPath); - this.db = opened.db; - this.walMaintenance = opened.walMaintenance; + constructor(options: DebugProxyCaptureStoreOptions = {}) { + const database = openOpenClawStateDatabase({ env: options.env }); + this.db = database.db; + this.dbPath = database.path; } close(): void { if (this.closed) { return; } - this.walMaintenance.close(); - this.db.close(); this.closed = true; } @@ -194,10 +72,15 @@ export class DebugProxyCaptureStore { this.db .prepare( `INSERT INTO capture_sessions ( - id, started_at, ended_at, mode, source_scope, source_process, proxy_url, db_path, blob_dir - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + id, started_at, ended_at, mode, source_scope, source_process, proxy_url + ) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET + started_at=MIN(capture_sessions.started_at, excluded.started_at), ended_at=excluded.ended_at, + mode=CASE + WHEN capture_sessions.mode = 'implicit' THEN excluded.mode + ELSE capture_sessions.mode + END, proxy_url=excluded.proxy_url, source_process=excluded.source_process`, ) @@ -209,8 +92,6 @@ export class DebugProxyCaptureStore { session.sourceScope, session.sourceProcess, session.proxyUrl ?? null, - session.dbPath, - session.blobDir, ); } @@ -221,40 +102,82 @@ export class DebugProxyCaptureStore { } persistPayload(data: Buffer, contentType?: string): CaptureBlobRecord { - return writeCaptureBlob({ blobDir: this.blobDir, data, contentType }); + const sha256 = createHash("sha256").update(data).digest("hex"); + const blobId = sha256.slice(0, 24); + this.db + .prepare( + `INSERT OR IGNORE INTO capture_blobs ( + blob_id, content_type, encoding, size_bytes, sha256, data, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, + ) + .run( + blobId, + contentType ?? null, + "gzip", + data.byteLength, + sha256, + gzipSync(data), + Date.now(), + ); + return { + blobId, + encoding: "gzip", + sizeBytes: data.byteLength, + sha256, + ...(contentType ? { contentType } : {}), + }; } recordEvent(event: CaptureEventRecord): void { - this.db - .prepare( - `INSERT INTO capture_events ( - session_id, ts, source_scope, source_process, protocol, direction, kind, flow_id, - method, host, path, status, close_code, content_type, headers_json, - data_text, data_blob_id, data_sha256, error_text, meta_json - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - ) - .run( - event.sessionId, - event.ts, - event.sourceScope, - event.sourceProcess, - event.protocol, - event.direction, - event.kind, - event.flowId, - event.method ?? null, - event.host ?? null, - event.path ?? null, - event.status ?? null, - event.closeCode ?? null, - event.contentType ?? null, - event.headersJson ?? null, - event.dataText ?? null, - event.dataBlobId ?? null, - event.dataSha256 ?? null, - event.errorText ?? null, - event.metaJson ?? null, - ); + runSqliteImmediateTransactionSync(this.db, () => { + // Capture can be invoked directly by provider seams before the top-level + // runtime initializes. Keep the shared-schema foreign key valid without + // making diagnostics break the request they are observing. + this.db + .prepare( + `INSERT OR IGNORE INTO capture_sessions ( + id, started_at, mode, source_scope, source_process + ) VALUES (?, ?, 'implicit', ?, ?)`, + ) + .run(event.sessionId, event.ts, event.sourceScope, event.sourceProcess); + // A concurrent purge can remove a payload before its event is recorded. + // Keep the inline preview instead of failing the observed request. + const dataBlobId = + event.dataBlobId && + this.db.prepare(`SELECT 1 FROM capture_blobs WHERE blob_id = ?`).get(event.dataBlobId) + ? event.dataBlobId + : null; + this.db + .prepare( + `INSERT INTO capture_events ( + session_id, ts, source_scope, source_process, protocol, direction, kind, flow_id, + method, host, path, status, close_code, content_type, headers_json, + data_text, data_blob_id, data_sha256, error_text, meta_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ) + .run( + event.sessionId, + event.ts, + event.sourceScope, + event.sourceProcess, + event.protocol, + event.direction, + event.kind, + event.flowId, + event.method ?? null, + event.host ?? null, + event.path ?? null, + event.status ?? null, + event.closeCode ?? null, + event.contentType ?? null, + event.headersJson ?? null, + event.dataText ?? null, + dataBlobId, + event.dataSha256 ?? null, + event.errorText ?? null, + event.metaJson ?? null, + ); + }); } listSessions(limit = 50): CaptureSessionSummary[] { @@ -352,13 +275,13 @@ export class DebugProxyCaptureStore { readBlob(blobId: string): string | null { const row = this.db - .prepare(`SELECT data_blob_id AS blobId FROM capture_events WHERE data_blob_id = ? LIMIT 1`) - .get(blobId) as { blobId?: string } | undefined; - if (!row?.blobId) { + .prepare(`SELECT encoding, data FROM capture_blobs WHERE blob_id = ?`) + .get(blobId) as { data?: Uint8Array; encoding?: string } | undefined; + if (!row?.data) { return null; } - const blobPath = path.join(this.blobDir, `${row.blobId}.bin.gz`); - return fs.existsSync(blobPath) ? readCaptureBlobText(blobPath) : null; + const data = Buffer.from(row.data); + return (row.encoding === "gzip" ? gunzipSync(data) : data).toString("utf8"); } queryPreset(preset: CaptureQueryPreset, sessionId?: string): CaptureQueryRow[] { @@ -442,21 +365,24 @@ export class DebugProxyCaptureStore { } purgeAll(): { sessions: number; events: number; blobs: number } { - const sessionCount = - (this.db.prepare(`SELECT COUNT(*) AS count FROM capture_sessions`).get() as { count: number }) - .count ?? 0; - const eventCount = - (this.db.prepare(`SELECT COUNT(*) AS count FROM capture_events`).get() as { count: number }) - .count ?? 0; - this.db.exec(`DELETE FROM capture_events; DELETE FROM capture_sessions;`); - let blobs = 0; - if (fs.existsSync(this.blobDir)) { - for (const entry of fs.readdirSync(this.blobDir)) { - fs.rmSync(path.join(this.blobDir, entry), { force: true }); - blobs += 1; - } - } - return { sessions: sessionCount, events: eventCount, blobs }; + return runSqliteImmediateTransactionSync(this.db, () => { + const sessionCount = + ( + this.db.prepare(`SELECT COUNT(*) AS count FROM capture_sessions`).get() as { + count: number; + } + ).count ?? 0; + const eventCount = + (this.db.prepare(`SELECT COUNT(*) AS count FROM capture_events`).get() as { count: number }) + .count ?? 0; + const blobCount = + (this.db.prepare(`SELECT COUNT(*) AS count FROM capture_blobs`).get() as { count: number }) + .count ?? 0; + this.db.exec( + `DELETE FROM capture_events; DELETE FROM capture_sessions; DELETE FROM capture_blobs;`, + ); + return { sessions: sessionCount, events: eventCount, blobs: blobCount }; + }); } deleteSessions(sessionIds: string[]): { sessions: number; events: number; blobs: number } { @@ -464,75 +390,76 @@ export class DebugProxyCaptureStore { if (uniqueSessionIds.length === 0) { return { sessions: 0, events: 0, blobs: 0 }; } - const placeholders = uniqueSessionIds.map(() => "?").join(", "); - const blobRows = this.db - .prepare( - `SELECT DISTINCT data_blob_id AS blobId - FROM capture_events - WHERE session_id IN (${placeholders}) - AND data_blob_id IS NOT NULL`, - ) - .all(...uniqueSessionIds) as Array<{ blobId?: string | null }>; - const eventCount = - ( - this.db - .prepare( - `SELECT COUNT(*) AS count - FROM capture_events - WHERE session_id IN (${placeholders})`, - ) - .get(...uniqueSessionIds) as { count: number } - ).count ?? 0; - const sessionCount = - ( - this.db - .prepare( - `SELECT COUNT(*) AS count - FROM capture_sessions - WHERE id IN (${placeholders})`, - ) - .get(...uniqueSessionIds) as { count: number } - ).count ?? 0; - this.db - .prepare(`DELETE FROM capture_events WHERE session_id IN (${placeholders})`) - .run(...uniqueSessionIds); - this.db - .prepare(`DELETE FROM capture_sessions WHERE id IN (${placeholders})`) - .run(...uniqueSessionIds); - const candidateBlobIds = blobRows - .map((row) => row.blobId?.trim()) - .filter((blobId): blobId is string => Boolean(blobId)); - const remainingBlobRefs = - // Shared blobs are deleted only when no surviving event references them. - candidateBlobIds.length > 0 - ? new Set( - ( - this.db - .prepare( - `SELECT DISTINCT data_blob_id AS blobId - FROM capture_events - WHERE data_blob_id IN (${candidateBlobIds.map(() => "?").join(", ")}) - AND data_blob_id IS NOT NULL`, - ) - .all(...candidateBlobIds) as Array<{ blobId?: string | null }> + return runSqliteImmediateTransactionSync(this.db, () => { + const placeholders = uniqueSessionIds.map(() => "?").join(", "); + const blobRows = this.db + .prepare( + `SELECT DISTINCT data_blob_id AS blobId + FROM capture_events + WHERE session_id IN (${placeholders}) + AND data_blob_id IS NOT NULL`, + ) + .all(...uniqueSessionIds) as Array<{ blobId?: string | null }>; + const eventCount = + ( + this.db + .prepare( + `SELECT COUNT(*) AS count + FROM capture_events + WHERE session_id IN (${placeholders})`, ) - .map((row) => row.blobId?.trim()) - .filter((blobId): blobId is string => Boolean(blobId)), - ) - : new Set(); - let blobs = 0; - for (const row of blobRows) { - const blobId = row.blobId?.trim(); - if (!blobId || remainingBlobRefs.has(blobId)) { - continue; + .get(...uniqueSessionIds) as { count: number } + ).count ?? 0; + const sessionCount = + ( + this.db + .prepare( + `SELECT COUNT(*) AS count + FROM capture_sessions + WHERE id IN (${placeholders})`, + ) + .get(...uniqueSessionIds) as { count: number } + ).count ?? 0; + this.db + .prepare(`DELETE FROM capture_events WHERE session_id IN (${placeholders})`) + .run(...uniqueSessionIds); + this.db + .prepare(`DELETE FROM capture_sessions WHERE id IN (${placeholders})`) + .run(...uniqueSessionIds); + const candidateBlobIds = blobRows + .map((row) => row.blobId?.trim()) + .filter((blobId): blobId is string => Boolean(blobId)); + const remainingBlobRefs = + // Shared blobs are deleted only when no surviving event references them. + candidateBlobIds.length > 0 + ? new Set( + ( + this.db + .prepare( + `SELECT DISTINCT data_blob_id AS blobId + FROM capture_events + WHERE data_blob_id IN (${candidateBlobIds.map(() => "?").join(", ")}) + AND data_blob_id IS NOT NULL`, + ) + .all(...candidateBlobIds) as Array<{ blobId?: string | null }> + ) + .map((row) => row.blobId?.trim()) + .filter((blobId): blobId is string => Boolean(blobId)), + ) + : new Set(); + let blobs = 0; + const deleteBlob = this.db.prepare(`DELETE FROM capture_blobs WHERE blob_id = ?`); + for (const blobId of candidateBlobIds) { + if (remainingBlobRefs.has(blobId)) { + continue; + } + const result = deleteBlob.run(blobId); + if (Number(result.changes) > 0) { + blobs += 1; + } } - const blobPath = path.join(this.blobDir, `${blobId}.bin.gz`); - if (fs.existsSync(blobPath)) { - fs.rmSync(blobPath, { force: true }); - blobs += 1; - } - } - return { sessions: sessionCount, events: eventCount, blobs }; + return { sessions: sessionCount, events: eventCount, blobs }; + }); } } @@ -543,13 +470,15 @@ type CachedStoreEntry = { const cachedStores = new Map(); -export function getDebugProxyCaptureStore(dbPath: string, blobDir: string): DebugProxyCaptureStore { - const key = `${dbPath}:${blobDir}`; +export function getDebugProxyCaptureStore( + options: DebugProxyCaptureStoreOptions = {}, +): DebugProxyCaptureStore { + const key = openOpenClawStateDatabase({ env: options.env }).path; const cached = cachedStores.get(key); if (cached && !cached.store.isClosed) { return cached.store; } - const store = new DebugProxyCaptureStore(dbPath, blobDir); + const store = new DebugProxyCaptureStore(options); cachedStores.set(key, { store, leases: 0 }); return store; } @@ -561,14 +490,14 @@ export function closeDebugProxyCaptureStore(): void { cachedStores.clear(); } -// Lease API keeps one cached synchronous SQLite connection alive across related -// capture operations, then closes it when the last owner releases. -export function acquireDebugProxyCaptureStore( - dbPath: string, - blobDir: string, -): { store: DebugProxyCaptureStore; release: () => void } { - const key = `${dbPath}:${blobDir}`; - const store = getDebugProxyCaptureStore(dbPath, blobDir); +// Lease API keeps one cached capture-store wrapper alive across related +// operations, then releases it without closing the shared state database. +export function acquireDebugProxyCaptureStore(options: DebugProxyCaptureStoreOptions = {}): { + store: DebugProxyCaptureStore; + release: () => void; +} { + const store = getDebugProxyCaptureStore(options); + const key = store.dbPath; const cached = cachedStores.get(key); if (!cached || cached.store !== store) { throw new Error("debug proxy capture store cache changed while acquiring a lease"); diff --git a/src/proxy-capture/types.ts b/src/proxy-capture/types.ts index 4ec6fc4e90a..f41b4a73810 100644 --- a/src/proxy-capture/types.ts +++ b/src/proxy-capture/types.ts @@ -23,13 +23,10 @@ export type CaptureSessionRecord = { sourceScope: "openclaw"; sourceProcess: string; proxyUrl?: string; - dbPath: string; - blobDir: string; }; export type CaptureBlobRecord = { blobId: string; - path: string; encoding: "gzip"; sizeBytes: number; sha256: string;