mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-25 05:39:32 +00:00
refactor(proxy): store captures in shared state database
This commit is contained in:
@@ -99,8 +99,7 @@ describe("listMicrosoftVoices", () => {
|
||||
const tempDir = mkdtempSync(path.join(os.tmpdir(), "microsoft-voices-capture-"));
|
||||
proxyReset.captureProxyEnv();
|
||||
process.env.OPENCLAW_DEBUG_PROXY_ENABLED = "1";
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite");
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs");
|
||||
process.env.OPENCLAW_STATE_DIR = tempDir;
|
||||
process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID = "ms-voices-session";
|
||||
|
||||
globalThis.fetch = vi
|
||||
@@ -109,18 +108,13 @@ describe("listMicrosoftVoices", () => {
|
||||
new Response(JSON.stringify([{ ShortName: "en-US-AvaNeural" }]), { status: 200 }),
|
||||
) as unknown as typeof globalThis.fetch;
|
||||
|
||||
const store = getDebugProxyCaptureStore(
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH,
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR,
|
||||
);
|
||||
const store = getDebugProxyCaptureStore();
|
||||
store.upsertSession({
|
||||
id: "ms-voices-session",
|
||||
startedAt: Date.now(),
|
||||
mode: "test",
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH,
|
||||
blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR,
|
||||
});
|
||||
|
||||
await listMicrosoftVoices();
|
||||
@@ -144,26 +138,20 @@ describe("listMicrosoftVoices", () => {
|
||||
const tempDir = mkdtempSync(path.join(os.tmpdir(), "microsoft-voices-global-"));
|
||||
proxyReset.captureProxyEnv();
|
||||
process.env.OPENCLAW_DEBUG_PROXY_ENABLED = "1";
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite");
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs");
|
||||
process.env.OPENCLAW_STATE_DIR = tempDir;
|
||||
process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID = "ms-voices-global-session";
|
||||
|
||||
globalThis.fetch = vi.fn(
|
||||
async () => new Response(JSON.stringify([{ ShortName: "en-US-AvaNeural" }]), { status: 200 }),
|
||||
) as unknown as typeof globalThis.fetch;
|
||||
|
||||
const store = getDebugProxyCaptureStore(
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH,
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR,
|
||||
);
|
||||
const store = getDebugProxyCaptureStore();
|
||||
store.upsertSession({
|
||||
id: "ms-voices-global-session",
|
||||
startedAt: Date.now(),
|
||||
mode: "test",
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH,
|
||||
blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR,
|
||||
});
|
||||
initializeDebugProxyCapture("test");
|
||||
|
||||
|
||||
@@ -378,8 +378,7 @@ describe("openai tts", () => {
|
||||
const tempDir = mkdtempSync(path.join(os.tmpdir(), "openai-tts-capture-"));
|
||||
proxyReset.captureProxyEnv();
|
||||
process.env.OPENCLAW_DEBUG_PROXY_ENABLED = "1";
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite");
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs");
|
||||
process.env.OPENCLAW_STATE_DIR = tempDir;
|
||||
process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID = "tts-session";
|
||||
|
||||
globalThis.fetch = vi
|
||||
@@ -388,18 +387,13 @@ describe("openai tts", () => {
|
||||
new Response(Buffer.from("audio-bytes"), { status: 200 }),
|
||||
) as unknown as typeof globalThis.fetch;
|
||||
|
||||
const store = getDebugProxyCaptureStore(
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH,
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR,
|
||||
);
|
||||
const store = getDebugProxyCaptureStore();
|
||||
store.upsertSession({
|
||||
id: "tts-session",
|
||||
startedAt: Date.now(),
|
||||
mode: "test",
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH,
|
||||
blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR,
|
||||
});
|
||||
|
||||
await openaiTTS({
|
||||
@@ -427,8 +421,7 @@ describe("openai tts", () => {
|
||||
const tempDir = mkdtempSync(path.join(os.tmpdir(), "openai-tts-patched-capture-"));
|
||||
proxyReset.captureProxyEnv();
|
||||
process.env.OPENCLAW_DEBUG_PROXY_ENABLED = "1";
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite");
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs");
|
||||
process.env.OPENCLAW_STATE_DIR = tempDir;
|
||||
process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID = "tts-patched-session";
|
||||
|
||||
globalThis.fetch = vi
|
||||
@@ -449,10 +442,7 @@ describe("openai tts", () => {
|
||||
timeoutMs: 5_000,
|
||||
});
|
||||
|
||||
const store = getDebugProxyCaptureStore(
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH,
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR,
|
||||
);
|
||||
const store = getDebugProxyCaptureStore();
|
||||
let events: Array<Record<string, unknown>> = [];
|
||||
try {
|
||||
await vi.waitFor(() => {
|
||||
|
||||
@@ -150,8 +150,6 @@ vi.mock("openclaw/plugin-sdk/proxy-capture", () => ({
|
||||
}),
|
||||
getDebugProxyCaptureStore: () => captureMock.store,
|
||||
resolveDebugProxySettings: () => ({
|
||||
dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH ?? "",
|
||||
blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR ?? "",
|
||||
proxyUrl: process.env.OPENCLAW_DEBUG_PROXY_URL ?? "",
|
||||
sessionId: "qa-lab-test",
|
||||
}),
|
||||
@@ -876,8 +874,6 @@ describe("qa-lab server", () => {
|
||||
cleanups.push(async () => {
|
||||
await rm(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite");
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs");
|
||||
const store = captureMock.store;
|
||||
store.upsertSession({
|
||||
id: "qa-capture-session",
|
||||
@@ -885,8 +881,6 @@ describe("qa-lab server", () => {
|
||||
mode: "proxy-run",
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
dbPath: process.env.OPENCLAW_DEBUG_PROXY_DB_PATH,
|
||||
blobDir: process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR,
|
||||
});
|
||||
store.recordEvent({
|
||||
sessionId: "qa-capture-session",
|
||||
@@ -954,8 +948,6 @@ describe("qa-lab server", () => {
|
||||
port: 0,
|
||||
});
|
||||
cleanups.push(async () => {
|
||||
delete process.env.OPENCLAW_DEBUG_PROXY_DB_PATH;
|
||||
delete process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR;
|
||||
await lab.stop();
|
||||
});
|
||||
|
||||
|
||||
@@ -219,10 +219,7 @@ export async function startQaLabServer(
|
||||
): Promise<QaLabServerHandle> {
|
||||
const repoRoot = path.resolve(params?.repoRoot ?? process.cwd());
|
||||
const captureSettings = resolveDebugProxySettings();
|
||||
const captureStoreLease = acquireDebugProxyCaptureStore(
|
||||
captureSettings.dbPath,
|
||||
captureSettings.blobDir,
|
||||
);
|
||||
const captureStoreLease = acquireDebugProxyCaptureStore();
|
||||
const captureStore = captureStoreLease.store;
|
||||
const state = createQaBusState();
|
||||
let latestReport: QaLabLatestReport | null = null;
|
||||
|
||||
@@ -3,9 +3,8 @@ import { afterEach, vi } from "vitest";
|
||||
|
||||
const DEBUG_PROXY_ENV_KEYS = [
|
||||
"OPENCLAW_DEBUG_PROXY_ENABLED",
|
||||
"OPENCLAW_DEBUG_PROXY_DB_PATH",
|
||||
"OPENCLAW_DEBUG_PROXY_BLOB_DIR",
|
||||
"OPENCLAW_DEBUG_PROXY_SESSION_ID",
|
||||
"OPENCLAW_STATE_DIR",
|
||||
] as const;
|
||||
|
||||
type DebugProxyEnvKey = (typeof DEBUG_PROXY_ENV_KEYS)[number];
|
||||
@@ -30,13 +29,19 @@ function restoreDebugProxyEnv(snapshot: DebugProxyEnvSnapshot): void {
|
||||
|
||||
export function installDebugProxyTestResetHooks() {
|
||||
const originalFetch = globalThis.fetch;
|
||||
let priorProxyEnv: DebugProxyEnvSnapshot = {};
|
||||
const originalProxyEnv = snapshotDebugProxyEnv();
|
||||
let priorProxyEnv = originalProxyEnv;
|
||||
|
||||
afterEach(() => {
|
||||
afterEach(async () => {
|
||||
const { closeDebugProxyCaptureStore } = await import("openclaw/plugin-sdk/proxy-capture");
|
||||
const { closeOpenClawStateDatabaseForTest } =
|
||||
await import("openclaw/plugin-sdk/sqlite-runtime-testing");
|
||||
closeDebugProxyCaptureStore();
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
globalThis.fetch = originalFetch;
|
||||
vi.restoreAllMocks();
|
||||
restoreDebugProxyEnv(priorProxyEnv);
|
||||
priorProxyEnv = {};
|
||||
priorProxyEnv = originalProxyEnv;
|
||||
});
|
||||
|
||||
return {
|
||||
|
||||
@@ -39,8 +39,7 @@ vi.mock("../infra/net/proxy/proxy-validation.js", () => ({
|
||||
|
||||
describe("proxy cli runtime", () => {
|
||||
const envKeys = [
|
||||
"OPENCLAW_DEBUG_PROXY_DB_PATH",
|
||||
"OPENCLAW_DEBUG_PROXY_BLOB_DIR",
|
||||
"OPENCLAW_STATE_DIR",
|
||||
"OPENCLAW_DEBUG_PROXY_CERT_DIR",
|
||||
"OPENCLAW_DEBUG_PROXY_SESSION_ID",
|
||||
"OPENCLAW_DEBUG_PROXY_ENABLED",
|
||||
@@ -52,8 +51,7 @@ describe("proxy cli runtime", () => {
|
||||
|
||||
beforeEach(() => {
|
||||
tempDir = mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-cli-runtime-"));
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH = path.join(tempDir, "capture.sqlite");
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR = path.join(tempDir, "blobs");
|
||||
process.env.OPENCLAW_STATE_DIR = tempDir;
|
||||
process.env.OPENCLAW_DEBUG_PROXY_CERT_DIR = path.join(tempDir, "certs");
|
||||
delete process.env.OPENCLAW_DEBUG_PROXY_ENABLED;
|
||||
delete process.env.OPENCLAW_DEBUG_PROXY_SESSION_ID;
|
||||
@@ -92,7 +90,9 @@ describe("proxy cli runtime", () => {
|
||||
|
||||
afterEach(async () => {
|
||||
const { closeDebugProxyCaptureStore } = await import("../proxy-capture/store.sqlite.js");
|
||||
const { closeOpenClawStateDatabaseForTest } = await import("../state/openclaw-state-db.js");
|
||||
closeDebugProxyCaptureStore();
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
vi.restoreAllMocks();
|
||||
vi.resetModules();
|
||||
process.exitCode = undefined;
|
||||
@@ -503,10 +503,7 @@ describe("proxy cli runtime", () => {
|
||||
|
||||
expect(serverStopSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
const store = getDebugProxyCaptureStore(
|
||||
process.env.OPENCLAW_DEBUG_PROXY_DB_PATH!,
|
||||
process.env.OPENCLAW_DEBUG_PROXY_BLOB_DIR!,
|
||||
);
|
||||
const store = getDebugProxyCaptureStore();
|
||||
const [session] = store.listSessions(5);
|
||||
expect(session?.mode).toBe("proxy-run");
|
||||
expect(session?.endedAt).toBeGreaterThanOrEqual(beforeRun);
|
||||
|
||||
@@ -24,7 +24,7 @@ import type { CaptureQueryPreset } from "../proxy-capture/types.js";
|
||||
|
||||
export async function runDebugProxyStartCommand(opts: { host?: string; port?: number }) {
|
||||
const settings = resolveDebugProxySettings();
|
||||
const store = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir);
|
||||
const store = getDebugProxyCaptureStore();
|
||||
store.upsertSession({
|
||||
id: settings.sessionId,
|
||||
startedAt: Date.now(),
|
||||
@@ -32,8 +32,6 @@ export async function runDebugProxyStartCommand(opts: { host?: string; port?: nu
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
proxyUrl: settings.proxyUrl,
|
||||
dbPath: settings.dbPath,
|
||||
blobDir: settings.blobDir,
|
||||
});
|
||||
initializeDebugProxyCapture("proxy-start", settings);
|
||||
const ca = await ensureDebugProxyCa(settings.certDir);
|
||||
@@ -44,7 +42,7 @@ export async function runDebugProxyStartCommand(opts: { host?: string; port?: nu
|
||||
});
|
||||
process.stdout.write(`Debug proxy: ${server.proxyUrl}\n`);
|
||||
process.stdout.write(`CA cert: ${ca.certPath}\n`);
|
||||
process.stdout.write(`Capture DB: ${settings.dbPath}\n`);
|
||||
process.stdout.write(`Capture DB: ${store.dbPath}\n`);
|
||||
process.stdout.write("Press Ctrl+C to stop.\n");
|
||||
const shutdown = async () => {
|
||||
process.off("SIGINT", onSignal);
|
||||
@@ -81,15 +79,13 @@ export async function runDebugProxyRunCommand(opts: {
|
||||
...baseSettings,
|
||||
sessionId,
|
||||
};
|
||||
getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).upsertSession({
|
||||
getDebugProxyCaptureStore().upsertSession({
|
||||
id: sessionId,
|
||||
startedAt: Date.now(),
|
||||
mode: "proxy-run",
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
proxyUrl: undefined,
|
||||
dbPath: settings.dbPath,
|
||||
blobDir: settings.blobDir,
|
||||
});
|
||||
const server = await startDebugProxyServer({
|
||||
host: opts.host,
|
||||
@@ -100,8 +96,6 @@ export async function runDebugProxyRunCommand(opts: {
|
||||
const childEnv = applyDebugProxyEnv(process.env, {
|
||||
proxyUrl: server.proxyUrl,
|
||||
sessionId,
|
||||
dbPath: settings.dbPath,
|
||||
blobDir: settings.blobDir,
|
||||
certDir: settings.certDir,
|
||||
});
|
||||
try {
|
||||
@@ -119,7 +113,7 @@ export async function runDebugProxyRunCommand(opts: {
|
||||
});
|
||||
} finally {
|
||||
await server.stop();
|
||||
getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).endSession(sessionId);
|
||||
getDebugProxyCaptureStore().endSession(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,10 +286,7 @@ export async function runProxyValidateCommand(opts: {
|
||||
}
|
||||
|
||||
export async function runDebugProxySessionsCommand(opts: { limit?: number }) {
|
||||
const settings = resolveDebugProxySettings();
|
||||
const sessions = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).listSessions(
|
||||
opts.limit ?? 20,
|
||||
);
|
||||
const sessions = getDebugProxyCaptureStore().listSessions(opts.limit ?? 20);
|
||||
process.stdout.write(`${JSON.stringify(sessions, null, 2)}\n`);
|
||||
closeDebugProxyCaptureStore();
|
||||
}
|
||||
@@ -304,11 +295,7 @@ export async function runDebugProxyQueryCommand(opts: {
|
||||
preset: CaptureQueryPreset;
|
||||
sessionId?: string;
|
||||
}) {
|
||||
const settings = resolveDebugProxySettings();
|
||||
const rows = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).queryPreset(
|
||||
opts.preset,
|
||||
opts.sessionId,
|
||||
);
|
||||
const rows = getDebugProxyCaptureStore().queryPreset(opts.preset, opts.sessionId);
|
||||
process.stdout.write(`${JSON.stringify(rows, null, 2)}\n`);
|
||||
closeDebugProxyCaptureStore();
|
||||
}
|
||||
@@ -319,17 +306,13 @@ export async function runDebugProxyCoverageCommand() {
|
||||
}
|
||||
|
||||
export async function runDebugProxyPurgeCommand() {
|
||||
const settings = resolveDebugProxySettings();
|
||||
const result = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).purgeAll();
|
||||
const result = getDebugProxyCaptureStore().purgeAll();
|
||||
process.stdout.write(`${JSON.stringify(result, null, 2)}\n`);
|
||||
closeDebugProxyCaptureStore();
|
||||
}
|
||||
|
||||
export async function readDebugProxyBlobCommand(opts: { blobId: string }) {
|
||||
const settings = resolveDebugProxySettings();
|
||||
const content = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).readBlob(
|
||||
opts.blobId,
|
||||
);
|
||||
const content = getDebugProxyCaptureStore().readBlob(opts.blobId);
|
||||
if (content == null) {
|
||||
closeDebugProxyCaptureStore();
|
||||
throw new Error(`Unknown blob: ${opts.blobId}`);
|
||||
|
||||
@@ -1,45 +0,0 @@
|
||||
// Proxy capture blob store persists captured request and response bodies by hash.
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { gzipSync, gunzipSync } from "node:zlib";
|
||||
import { applyPrivateModeSync } from "../infra/private-mode.js";
|
||||
import type { CaptureBlobRecord } from "./types.js";
|
||||
|
||||
// Capture blobs store request/response bodies by content hash, gzip-compressed
|
||||
// on disk, so repeated payloads can share one file across events.
|
||||
const DEBUG_PROXY_CAPTURE_DIR_MODE = 0o700;
|
||||
const DEBUG_PROXY_CAPTURE_FILE_MODE = 0o600;
|
||||
|
||||
function ensureDir(dir: string) {
|
||||
fs.mkdirSync(dir, { recursive: true, mode: DEBUG_PROXY_CAPTURE_DIR_MODE });
|
||||
}
|
||||
|
||||
export function writeCaptureBlob(params: {
|
||||
blobDir: string;
|
||||
data: Buffer;
|
||||
contentType?: string;
|
||||
}): CaptureBlobRecord {
|
||||
ensureDir(params.blobDir);
|
||||
const sha256 = createHash("sha256").update(params.data).digest("hex");
|
||||
const blobId = sha256.slice(0, 24);
|
||||
const outputPath = path.join(params.blobDir, `${blobId}.bin.gz`);
|
||||
if (!fs.existsSync(outputPath)) {
|
||||
fs.writeFileSync(outputPath, gzipSync(params.data), { mode: DEBUG_PROXY_CAPTURE_FILE_MODE });
|
||||
}
|
||||
applyPrivateModeSync(outputPath, DEBUG_PROXY_CAPTURE_FILE_MODE);
|
||||
return {
|
||||
blobId,
|
||||
path: outputPath,
|
||||
encoding: "gzip",
|
||||
sizeBytes: params.data.byteLength,
|
||||
sha256,
|
||||
...(params.contentType ? { contentType: params.contentType } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
// Debug CLI reads blobs as UTF-8 previews. Binary payloads still remain
|
||||
// available via the compressed file path recorded in the blob metadata.
|
||||
export function readCaptureBlobText(blobPath: string): string {
|
||||
return gunzipSync(fs.readFileSync(blobPath)).toString("utf8");
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
// Proxy capture env tests cover environment variable generation for capture sessions.
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
applyDebugProxyEnv,
|
||||
OPENCLAW_DEBUG_PROXY_ENABLED,
|
||||
OPENCLAW_DEBUG_PROXY_SESSION_ID,
|
||||
resolveDebugProxySettings,
|
||||
@@ -26,4 +27,30 @@ describe("resolveDebugProxySettings", () => {
|
||||
|
||||
expect(settings.sessionId).toBe("session-explicit");
|
||||
});
|
||||
|
||||
it("ignores obsolete capture storage overrides", () => {
|
||||
const settings = resolveDebugProxySettings({
|
||||
OPENCLAW_DEBUG_PROXY_DB_PATH: "/tmp/legacy-capture.sqlite",
|
||||
OPENCLAW_DEBUG_PROXY_BLOB_DIR: "/tmp/legacy-capture-blobs",
|
||||
});
|
||||
|
||||
expect(settings).not.toHaveProperty("dbPath");
|
||||
expect(settings).not.toHaveProperty("blobDir");
|
||||
});
|
||||
|
||||
it("does not pass obsolete capture storage overrides to child processes", () => {
|
||||
const env = applyDebugProxyEnv(
|
||||
{
|
||||
OPENCLAW_DEBUG_PROXY_DB_PATH: "/tmp/legacy-capture.sqlite",
|
||||
OPENCLAW_DEBUG_PROXY_BLOB_DIR: "/tmp/legacy-capture-blobs",
|
||||
},
|
||||
{
|
||||
proxyUrl: "http://127.0.0.1:7799",
|
||||
sessionId: "session-child",
|
||||
},
|
||||
);
|
||||
|
||||
expect(env.OPENCLAW_DEBUG_PROXY_DB_PATH).toBeUndefined();
|
||||
expect(env.OPENCLAW_DEBUG_PROXY_BLOB_DIR).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,18 +3,12 @@ import { randomUUID } from "node:crypto";
|
||||
import type { Agent } from "node:http";
|
||||
import process from "node:process";
|
||||
import { createAmbientNodeProxyAgent } from "@openclaw/proxyline";
|
||||
import {
|
||||
resolveDebugProxyBlobDir,
|
||||
resolveDebugProxyCertDir,
|
||||
resolveDebugProxyDbPath,
|
||||
} from "./paths.js";
|
||||
import { resolveDebugProxyCertDir } from "./paths.js";
|
||||
|
||||
// Environment contract for debug proxy capture. These vars are passed to child
|
||||
// processes and provider transports so capture sessions share one store/proxy.
|
||||
export const OPENCLAW_DEBUG_PROXY_ENABLED = "OPENCLAW_DEBUG_PROXY_ENABLED";
|
||||
export const OPENCLAW_DEBUG_PROXY_URL = "OPENCLAW_DEBUG_PROXY_URL";
|
||||
export const OPENCLAW_DEBUG_PROXY_DB_PATH = "OPENCLAW_DEBUG_PROXY_DB_PATH";
|
||||
export const OPENCLAW_DEBUG_PROXY_BLOB_DIR = "OPENCLAW_DEBUG_PROXY_BLOB_DIR";
|
||||
export const OPENCLAW_DEBUG_PROXY_CERT_DIR = "OPENCLAW_DEBUG_PROXY_CERT_DIR";
|
||||
export const OPENCLAW_DEBUG_PROXY_SESSION_ID = "OPENCLAW_DEBUG_PROXY_SESSION_ID";
|
||||
export const OPENCLAW_DEBUG_PROXY_REQUIRE = "OPENCLAW_DEBUG_PROXY_REQUIRE";
|
||||
@@ -23,8 +17,6 @@ export type DebugProxySettings = {
|
||||
enabled: boolean;
|
||||
required: boolean;
|
||||
proxyUrl?: string;
|
||||
dbPath: string;
|
||||
blobDir: string;
|
||||
certDir: string;
|
||||
sessionId: string;
|
||||
sourceProcess: string;
|
||||
@@ -48,8 +40,6 @@ export function resolveDebugProxySettings(
|
||||
enabled,
|
||||
required: isTruthy(env[OPENCLAW_DEBUG_PROXY_REQUIRE]),
|
||||
proxyUrl: env[OPENCLAW_DEBUG_PROXY_URL]?.trim() || undefined,
|
||||
dbPath: env[OPENCLAW_DEBUG_PROXY_DB_PATH]?.trim() || resolveDebugProxyDbPath(env),
|
||||
blobDir: env[OPENCLAW_DEBUG_PROXY_BLOB_DIR]?.trim() || resolveDebugProxyBlobDir(env),
|
||||
certDir: env[OPENCLAW_DEBUG_PROXY_CERT_DIR]?.trim() || resolveDebugProxyCertDir(env),
|
||||
sessionId,
|
||||
sourceProcess: "openclaw",
|
||||
@@ -61,20 +51,19 @@ export function applyDebugProxyEnv(
|
||||
params: {
|
||||
proxyUrl: string;
|
||||
sessionId: string;
|
||||
dbPath?: string;
|
||||
blobDir?: string;
|
||||
certDir?: string;
|
||||
},
|
||||
): NodeJS.ProcessEnv {
|
||||
// Child process env forces proxy capture and standard proxy variables while
|
||||
// preserving unrelated environment values.
|
||||
const baseEnv = { ...env };
|
||||
delete baseEnv.OPENCLAW_DEBUG_PROXY_DB_PATH;
|
||||
delete baseEnv.OPENCLAW_DEBUG_PROXY_BLOB_DIR;
|
||||
return {
|
||||
...env,
|
||||
...baseEnv,
|
||||
[OPENCLAW_DEBUG_PROXY_ENABLED]: "1",
|
||||
[OPENCLAW_DEBUG_PROXY_REQUIRE]: "1",
|
||||
[OPENCLAW_DEBUG_PROXY_URL]: params.proxyUrl,
|
||||
[OPENCLAW_DEBUG_PROXY_DB_PATH]: params.dbPath ?? resolveDebugProxyDbPath(env),
|
||||
[OPENCLAW_DEBUG_PROXY_BLOB_DIR]: params.blobDir ?? resolveDebugProxyBlobDir(env),
|
||||
[OPENCLAW_DEBUG_PROXY_CERT_DIR]: params.certDir ?? resolveDebugProxyCertDir(env),
|
||||
[OPENCLAW_DEBUG_PROXY_SESSION_ID]: params.sessionId,
|
||||
HTTP_PROXY: params.proxyUrl,
|
||||
|
||||
@@ -1,21 +1,13 @@
|
||||
// Proxy capture path helpers resolve capture directories and database paths.
|
||||
// Proxy capture path helpers resolve certificate artifacts.
|
||||
import path from "node:path";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
|
||||
// Debug proxy capture artifacts live under OpenClaw state so DB, blobs, and CA
|
||||
// files are grouped and easy to purge.
|
||||
// Debug proxy CA files live under OpenClaw state. Capture data lives in the
|
||||
// shared global state database.
|
||||
function resolveDebugProxyRootDir(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return path.join(resolveStateDir(env), "debug-proxy");
|
||||
}
|
||||
|
||||
export function resolveDebugProxyDbPath(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return path.join(resolveDebugProxyRootDir(env), "capture.sqlite");
|
||||
}
|
||||
|
||||
export function resolveDebugProxyBlobDir(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return path.join(resolveDebugProxyRootDir(env), "blobs");
|
||||
}
|
||||
|
||||
export function resolveDebugProxyCertDir(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return path.join(resolveDebugProxyRootDir(env), "certs");
|
||||
}
|
||||
|
||||
@@ -5,11 +5,21 @@ import { Socket, type AddressInfo } from "node:net";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
|
||||
import { assertDebugProxyDirectUpstreamAllowed, startDebugProxyServer } from "./proxy-server.js";
|
||||
import { closeDebugProxyCaptureStore } from "./store.sqlite.js";
|
||||
|
||||
let testRoot: string | undefined;
|
||||
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
|
||||
|
||||
async function cleanupTestDirs(): Promise<void> {
|
||||
closeDebugProxyCaptureStore();
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
if (originalStateDir === undefined) {
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
} else {
|
||||
process.env.OPENCLAW_STATE_DIR = originalStateDir;
|
||||
}
|
||||
if (!testRoot) {
|
||||
return;
|
||||
}
|
||||
@@ -24,11 +34,10 @@ async function makeSettings() {
|
||||
await mkdir(certDir, { recursive: true });
|
||||
await writeFile(join(certDir, "root-ca.pem"), "test root cert\n", "utf8");
|
||||
await writeFile(join(certDir, "root-ca-key.pem"), "test root key\n", "utf8");
|
||||
process.env.OPENCLAW_STATE_DIR = testRoot;
|
||||
return {
|
||||
enabled: true,
|
||||
required: false,
|
||||
dbPath: ":memory:",
|
||||
blobDir: join(testRoot, "blobs"),
|
||||
certDir,
|
||||
sessionId: "debug-proxy-managed-proxy-test",
|
||||
sourceProcess: "test",
|
||||
|
||||
@@ -5,14 +5,22 @@ import type { AddressInfo } from "node:net";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
|
||||
import type { DebugProxySettings } from "./env.js";
|
||||
import { parseConnectTarget, startDebugProxyServer } from "./proxy-server.js";
|
||||
import { closeDebugProxyCaptureStore, getDebugProxyCaptureStore } from "./store.sqlite.js";
|
||||
|
||||
let testRoot: string | undefined;
|
||||
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
|
||||
|
||||
async function cleanupTestRoot(): Promise<void> {
|
||||
closeDebugProxyCaptureStore();
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
if (originalStateDir === undefined) {
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
} else {
|
||||
process.env.OPENCLAW_STATE_DIR = originalStateDir;
|
||||
}
|
||||
if (!testRoot) {
|
||||
return;
|
||||
}
|
||||
@@ -27,11 +35,10 @@ async function makeSettings(): Promise<DebugProxySettings> {
|
||||
await mkdir(certDir, { recursive: true });
|
||||
await writeFile(join(certDir, "root-ca.pem"), "test root cert\n", "utf8");
|
||||
await writeFile(join(certDir, "root-ca-key.pem"), "test root key\n", "utf8");
|
||||
process.env.OPENCLAW_STATE_DIR = testRoot;
|
||||
return {
|
||||
enabled: true,
|
||||
required: false,
|
||||
dbPath: ":memory:",
|
||||
blobDir: join(testRoot, "blobs"),
|
||||
certDir,
|
||||
sessionId: "debug-proxy-server-test",
|
||||
sourceProcess: "test",
|
||||
@@ -160,10 +167,7 @@ describe("startDebugProxyServer", () => {
|
||||
|
||||
expect(origin.receivedRequestBody()).toBe(requestBody);
|
||||
expect(responseBody).toBe(origin.responseBody);
|
||||
const events = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir).getSessionEvents(
|
||||
settings.sessionId,
|
||||
10,
|
||||
);
|
||||
const events = getDebugProxyCaptureStore().getSessionEvents(settings.sessionId, 10);
|
||||
const capturedRequest = events.find((event) => event.kind === "request");
|
||||
const capturedResponse = events.find((event) => event.kind === "response");
|
||||
expect(capturedRequest?.dataText).toBe("q".repeat(8192));
|
||||
|
||||
@@ -154,7 +154,7 @@ export async function startDebugProxyServer(params: {
|
||||
settings: DebugProxySettings;
|
||||
}): Promise<DebugProxyServerHandle> {
|
||||
await ensureDebugProxyCa(params.settings.certDir);
|
||||
const store = getDebugProxyCaptureStore(params.settings.dbPath, params.settings.blobDir);
|
||||
const store = getDebugProxyCaptureStore();
|
||||
const recordProxyEvent = createProxyCaptureRecorder({ store, settings: params.settings });
|
||||
const host = params.host?.trim() || "127.0.0.1";
|
||||
|
||||
|
||||
@@ -13,8 +13,6 @@ type StoreCall = { name: string; args: unknown[] };
|
||||
const settings: DebugProxySettings = {
|
||||
enabled: true,
|
||||
required: false,
|
||||
dbPath: "/tmp/openclaw-proxy-runtime-test.sqlite",
|
||||
blobDir: "/tmp/openclaw-proxy-runtime-test-blobs",
|
||||
certDir: "/tmp/openclaw-proxy-runtime-test-certs",
|
||||
sessionId: "runtime-test-session",
|
||||
sourceProcess: "runtime-test",
|
||||
|
||||
@@ -42,7 +42,7 @@ const SENSITIVE_CAPTURE_HEADER_NAME_FRAGMENTS = [
|
||||
];
|
||||
|
||||
// Runtime capture records HTTP/fetch and websocket events into the SQLite store,
|
||||
// redacting sensitive headers and persisting bodies through the blob store.
|
||||
// redacting sensitive headers and persisting bodies in capture_blobs.
|
||||
type GlobalFetchPatchedState = {
|
||||
originalFetch: typeof globalThis.fetch;
|
||||
};
|
||||
@@ -57,7 +57,7 @@ type DebugProxyCaptureStoreLike = Pick<
|
||||
>;
|
||||
|
||||
export type DebugProxyCaptureRuntimeDeps = {
|
||||
getStore?: (dbPath: string, blobDir: string) => DebugProxyCaptureStoreLike;
|
||||
getStore?: () => DebugProxyCaptureStoreLike;
|
||||
closeStore?: typeof closeDebugProxyCaptureStore;
|
||||
persistEventPayload?: (
|
||||
store: DebugProxyCaptureStoreLike,
|
||||
@@ -221,7 +221,7 @@ function installDebugProxyGlobalFetchPatch(
|
||||
return response;
|
||||
} catch (error) {
|
||||
if (url && /^https?:/i.test(url)) {
|
||||
const store = runtime.getStore(settings.dbPath, settings.blobDir);
|
||||
const store = runtime.getStore();
|
||||
const parsed = new URL(url);
|
||||
store.recordEvent({
|
||||
sessionId: settings.sessionId,
|
||||
@@ -278,15 +278,13 @@ export function initializeDebugProxyCapture(
|
||||
if (!settings.enabled) {
|
||||
return;
|
||||
}
|
||||
resolveRuntimeDeps(deps).getStore(settings.dbPath, settings.blobDir).upsertSession({
|
||||
resolveRuntimeDeps(deps).getStore().upsertSession({
|
||||
id: settings.sessionId,
|
||||
startedAt: Date.now(),
|
||||
mode,
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: settings.sourceProcess,
|
||||
proxyUrl: settings.proxyUrl,
|
||||
dbPath: settings.dbPath,
|
||||
blobDir: settings.blobDir,
|
||||
});
|
||||
installDebugProxyGlobalFetchPatch(settings, deps);
|
||||
}
|
||||
@@ -302,7 +300,7 @@ export function finalizeDebugProxyCapture(
|
||||
return;
|
||||
}
|
||||
const runtime = resolveRuntimeDeps(deps);
|
||||
runtime.getStore(settings.dbPath, settings.blobDir).endSession(settings.sessionId);
|
||||
runtime.getStore().endSession(settings.sessionId);
|
||||
uninstallDebugProxyGlobalFetchPatch(deps);
|
||||
runtime.closeStore();
|
||||
}
|
||||
@@ -326,7 +324,7 @@ export function captureHttpExchange(
|
||||
return;
|
||||
}
|
||||
const runtime = resolveRuntimeDeps(deps);
|
||||
const store = runtime.getStore(settings.dbPath, settings.blobDir);
|
||||
const store = runtime.getStore();
|
||||
const flowId = params.flowId ?? randomUUID();
|
||||
const url = new URL(params.url);
|
||||
const requestBody =
|
||||
@@ -449,7 +447,7 @@ export function captureWsEvent(params: {
|
||||
if (!settings.enabled) {
|
||||
return;
|
||||
}
|
||||
const store = getDebugProxyCaptureStore(settings.dbPath, settings.blobDir);
|
||||
const store = getDebugProxyCaptureStore();
|
||||
const url = new URL(params.url);
|
||||
const payload = persistEventPayload(store, {
|
||||
data: params.payload,
|
||||
|
||||
@@ -3,8 +3,8 @@ import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { listOpenFileDescriptorsForPath } from "../infra/open-file-descriptors.test-support.js";
|
||||
import { resolveSqliteDatabaseFilePaths } from "../infra/sqlite-files.js";
|
||||
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
|
||||
import {
|
||||
acquireDebugProxyCaptureStore,
|
||||
closeDebugProxyCaptureStore,
|
||||
@@ -17,6 +17,7 @@ const cleanupDirs: string[] = [];
|
||||
|
||||
afterEach(() => {
|
||||
closeDebugProxyCaptureStore();
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
vi.restoreAllMocks();
|
||||
while (cleanupDirs.length > 0) {
|
||||
const dir = cleanupDirs.pop();
|
||||
@@ -29,7 +30,13 @@ afterEach(() => {
|
||||
function makeStore() {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-"));
|
||||
cleanupDirs.push(root);
|
||||
return new DebugProxyCaptureStore(path.join(root, "capture.sqlite"), path.join(root, "blobs"));
|
||||
return new DebugProxyCaptureStore({ env: { OPENCLAW_STATE_DIR: root } });
|
||||
}
|
||||
|
||||
function makeStateEnv(prefix: string): NodeJS.ProcessEnv {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), prefix));
|
||||
cleanupDirs.push(root);
|
||||
return { OPENCLAW_STATE_DIR: root };
|
||||
}
|
||||
|
||||
function readMode(target: string): number {
|
||||
@@ -37,51 +44,11 @@ function readMode(target: string): number {
|
||||
}
|
||||
|
||||
describe("DebugProxyCaptureStore", () => {
|
||||
it.each([
|
||||
":memory:",
|
||||
"file::memory:?cache=shared",
|
||||
"file:%3Amemory:?cache=shared",
|
||||
"file:proxy-capture?mode=memory&cache=shared",
|
||||
"file:proxy-capture?mode=memory#ignored",
|
||||
])(
|
||||
"keeps SQLite memory path %s off the filesystem",
|
||||
(dbPath) => {
|
||||
const mkdirSync = vi.spyOn(fs, "mkdirSync");
|
||||
const openSync = vi.spyOn(fs, "openSync");
|
||||
const existsSync = vi.spyOn(fs, "existsSync");
|
||||
|
||||
const store = new DebugProxyCaptureStore(dbPath, "unused");
|
||||
try {
|
||||
expect(store.db.prepare("PRAGMA database_list").get()).toMatchObject({ file: "" });
|
||||
expect(mkdirSync).not.toHaveBeenCalled();
|
||||
expect(openSync).not.toHaveBeenCalled();
|
||||
expect(existsSync).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
store.close();
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
it.runIf(process.platform === "linux")("closes the database when initialization fails", () => {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-failed-open-"));
|
||||
cleanupDirs.push(root);
|
||||
const dbPath = path.join(root, "capture.sqlite");
|
||||
fs.writeFileSync(dbPath, "not a sqlite database");
|
||||
|
||||
expect(() => new DebugProxyCaptureStore(dbPath, path.join(root, "blobs"))).toThrow(
|
||||
"file is not a database",
|
||||
);
|
||||
expect(listOpenFileDescriptorsForPath(dbPath)).toEqual([]);
|
||||
});
|
||||
|
||||
it("keeps the cached store open until the last lease releases", () => {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-lease-"));
|
||||
cleanupDirs.push(root);
|
||||
const dbPath = path.join(root, "capture.sqlite");
|
||||
const blobDir = path.join(root, "blobs");
|
||||
const options = { env: makeStateEnv("openclaw-proxy-capture-lease-") };
|
||||
|
||||
const first = acquireDebugProxyCaptureStore(dbPath, blobDir);
|
||||
const second = acquireDebugProxyCaptureStore(dbPath, blobDir);
|
||||
const first = acquireDebugProxyCaptureStore(options);
|
||||
const second = acquireDebugProxyCaptureStore(options);
|
||||
|
||||
expect(second.store).toBe(first.store);
|
||||
first.release();
|
||||
@@ -90,22 +57,18 @@ describe("DebugProxyCaptureStore", () => {
|
||||
second.release();
|
||||
expect(first.store.isClosed).toBe(true);
|
||||
|
||||
const reopened = getDebugProxyCaptureStore(dbPath, blobDir);
|
||||
const reopened = getDebugProxyCaptureStore(options);
|
||||
expect(Object.is(reopened, first.store)).toBe(false);
|
||||
expect(reopened.isClosed).toBe(false);
|
||||
});
|
||||
|
||||
it("tracks and closes cached stores independently across paths", () => {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-paths-"));
|
||||
cleanupDirs.push(root);
|
||||
const first = acquireDebugProxyCaptureStore(
|
||||
path.join(root, "first.sqlite"),
|
||||
path.join(root, "first-blobs"),
|
||||
);
|
||||
const second = acquireDebugProxyCaptureStore(
|
||||
path.join(root, "second.sqlite"),
|
||||
path.join(root, "second-blobs"),
|
||||
);
|
||||
const first = acquireDebugProxyCaptureStore({
|
||||
env: makeStateEnv("openclaw-proxy-capture-first-"),
|
||||
});
|
||||
const second = acquireDebugProxyCaptureStore({
|
||||
env: makeStateEnv("openclaw-proxy-capture-second-"),
|
||||
});
|
||||
|
||||
first.release();
|
||||
expect(first.store.isClosed).toBe(true);
|
||||
@@ -117,8 +80,6 @@ describe("DebugProxyCaptureStore", () => {
|
||||
});
|
||||
|
||||
it("uses rollback journaling for captures on NFS-backed volumes", () => {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-nfs-"));
|
||||
cleanupDirs.push(root);
|
||||
vi.spyOn(fs, "statfsSync").mockReturnValue({
|
||||
type: 0x6969,
|
||||
bsize: 1024,
|
||||
@@ -129,10 +90,9 @@ describe("DebugProxyCaptureStore", () => {
|
||||
ffree: 0,
|
||||
});
|
||||
|
||||
const store = new DebugProxyCaptureStore(
|
||||
path.join(root, "capture.sqlite"),
|
||||
path.join(root, "blobs"),
|
||||
);
|
||||
const store = new DebugProxyCaptureStore({
|
||||
env: makeStateEnv("openclaw-proxy-capture-nfs-"),
|
||||
});
|
||||
try {
|
||||
expect(store.db.prepare("PRAGMA journal_mode").get()).toMatchObject({
|
||||
journal_mode: "delete",
|
||||
@@ -142,31 +102,40 @@ describe("DebugProxyCaptureStore", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it.runIf(process.platform !== "win32")("keeps capture databases and blobs private", () => {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-permissions-"));
|
||||
cleanupDirs.push(root);
|
||||
const dbDir = path.join(root, "db");
|
||||
const dbPath = path.join(dbDir, "capture.sqlite");
|
||||
const blobDir = path.join(root, "blobs");
|
||||
const store = new DebugProxyCaptureStore(dbPath, blobDir);
|
||||
const blob = store.persistPayload(Buffer.from("authorization: Bearer secret"));
|
||||
it.runIf(process.platform !== "win32")(
|
||||
"stores capture blobs in the private shared state database",
|
||||
() => {
|
||||
const env = makeStateEnv("openclaw-proxy-capture-permissions-");
|
||||
const root = env.OPENCLAW_STATE_DIR!;
|
||||
const store = new DebugProxyCaptureStore({ env });
|
||||
const blob = store.persistPayload(Buffer.from("authorization: Bearer secret"));
|
||||
const row = store.db
|
||||
.prepare(
|
||||
`SELECT encoding, size_bytes AS sizeBytes, sha256, data
|
||||
FROM capture_blobs
|
||||
WHERE blob_id = ?`,
|
||||
)
|
||||
.get(blob.blobId) as
|
||||
| { data: Uint8Array; encoding: string; sha256: string; sizeBytes: number }
|
||||
| undefined;
|
||||
|
||||
expect(readMode(dbDir)).toBe(0o700);
|
||||
expect(readMode(blobDir)).toBe(0o700);
|
||||
for (const databaseFile of resolveSqliteDatabaseFilePaths(dbPath).filter(fs.existsSync)) {
|
||||
expect(readMode(databaseFile)).toBe(0o600);
|
||||
}
|
||||
expect(readMode(blob.path)).toBe(0o600);
|
||||
|
||||
store.close();
|
||||
fs.chmodSync(dbPath, 0o644);
|
||||
fs.chmodSync(blob.path, 0o644);
|
||||
const reopened = new DebugProxyCaptureStore(dbPath, blobDir);
|
||||
reopened.persistPayload(Buffer.from("authorization: Bearer secret"));
|
||||
expect(readMode(dbPath)).toBe(0o600);
|
||||
expect(readMode(blob.path)).toBe(0o600);
|
||||
reopened.close();
|
||||
});
|
||||
expect(store.dbPath).toBe(path.join(root, "state", "openclaw.sqlite"));
|
||||
expect(fs.existsSync(path.join(root, "debug-proxy", "capture.sqlite"))).toBe(false);
|
||||
expect(fs.existsSync(path.join(root, "debug-proxy", "blobs"))).toBe(false);
|
||||
expect(row).toMatchObject({
|
||||
encoding: "gzip",
|
||||
sha256: blob.sha256,
|
||||
sizeBytes: blob.sizeBytes,
|
||||
});
|
||||
expect(Buffer.from(row?.data ?? []).toString("utf8")).not.toContain("Bearer secret");
|
||||
expect(readMode(path.dirname(store.dbPath))).toBe(0o700);
|
||||
for (const databaseFile of resolveSqliteDatabaseFilePaths(store.dbPath).filter(
|
||||
fs.existsSync,
|
||||
)) {
|
||||
expect(readMode(databaseFile)).toBe(0o600);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
it("ignores duplicate close calls", () => {
|
||||
const store = makeStore();
|
||||
@@ -184,8 +153,6 @@ describe("DebugProxyCaptureStore", () => {
|
||||
mode: "proxy-run",
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
dbPath: store.dbPath,
|
||||
blobDir: store.blobDir,
|
||||
});
|
||||
const firstPayload = persistEventPayload(store, {
|
||||
data: '{"ok":true}',
|
||||
@@ -230,6 +197,43 @@ describe("DebugProxyCaptureStore", () => {
|
||||
expect(store.readBlob(firstPayload.dataBlobId ?? "")).toContain('"ok":true');
|
||||
});
|
||||
|
||||
it("creates and later upgrades an implicit session for direct event capture", () => {
|
||||
const store = makeStore();
|
||||
store.recordEvent({
|
||||
sessionId: "session-direct",
|
||||
ts: 20,
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "provider",
|
||||
protocol: "https",
|
||||
direction: "outbound",
|
||||
kind: "request",
|
||||
flowId: "flow-direct",
|
||||
dataBlobId: "already-purged",
|
||||
});
|
||||
|
||||
expect(store.listSessions(10)[0]).toMatchObject({
|
||||
id: "session-direct",
|
||||
mode: "implicit",
|
||||
});
|
||||
expect(store.getSessionEvents("session-direct", 10)[0]).toMatchObject({
|
||||
dataBlobId: null,
|
||||
});
|
||||
|
||||
store.upsertSession({
|
||||
id: "session-direct",
|
||||
startedAt: 10,
|
||||
mode: "runtime",
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
});
|
||||
|
||||
expect(store.listSessions(10)[0]).toMatchObject({
|
||||
id: "session-direct",
|
||||
mode: "runtime",
|
||||
startedAt: 10,
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps shared blobs when deleting one of multiple referencing sessions", () => {
|
||||
const store = makeStore();
|
||||
const sharedPayload = persistEventPayload(store, {
|
||||
@@ -244,8 +248,6 @@ describe("DebugProxyCaptureStore", () => {
|
||||
mode: "proxy-run",
|
||||
sourceScope: "openclaw",
|
||||
sourceProcess: "openclaw",
|
||||
dbPath: store.dbPath,
|
||||
blobDir: store.blobDir,
|
||||
});
|
||||
store.recordEvent({
|
||||
sessionId,
|
||||
@@ -270,5 +272,12 @@ describe("DebugProxyCaptureStore", () => {
|
||||
expect(result.blobs).toBe(0);
|
||||
expect(store.readBlob(sharedPayload.dataBlobId ?? "")).toContain('"shared":true');
|
||||
expect(store.listSessions(10).map((session) => session.id)).toEqual(["session-b"]);
|
||||
|
||||
expect(store.deleteSessions(["session-b"])).toEqual({
|
||||
sessions: 1,
|
||||
events: 1,
|
||||
blobs: 1,
|
||||
});
|
||||
expect(store.readBlob(sharedPayload.dataBlobId ?? "")).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,17 +1,11 @@
|
||||
// Proxy capture SQLite store persists capture metadata and replayable exchanges.
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { createHash } from "node:crypto";
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { gunzipSync, gzipSync } from "node:zlib";
|
||||
import { normalizeNullableString as normalizeObservedValue } from "@openclaw/normalization-core/string-coerce";
|
||||
import { normalizeUniqueStringEntries } from "@openclaw/normalization-core/string-normalization";
|
||||
import { requireNodeSqlite } from "../infra/node-sqlite.js";
|
||||
import { applyPrivateModeSync } from "../infra/private-mode.js";
|
||||
import { resolveSqliteDatabaseFilePaths } from "../infra/sqlite-files.js";
|
||||
import {
|
||||
configureSqliteConnectionPragmas,
|
||||
type SqliteWalMaintenance,
|
||||
} from "../infra/sqlite-wal.js";
|
||||
import { readCaptureBlobText, writeCaptureBlob } from "./blob-store.js";
|
||||
import { runSqliteImmediateTransactionSync } from "../infra/sqlite-transaction.js";
|
||||
import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js";
|
||||
import type {
|
||||
CaptureBlobRecord,
|
||||
CaptureEventRecord,
|
||||
@@ -23,122 +17,11 @@ import type {
|
||||
CaptureSessionSummary,
|
||||
} from "./types.js";
|
||||
|
||||
// SQLite-backed debug proxy store. Metadata stays in SQLite; large payloads are
|
||||
// compressed into the blob directory and referenced by hash.
|
||||
const DEBUG_PROXY_CAPTURE_DIR_MODE = 0o700;
|
||||
const DEBUG_PROXY_CAPTURE_FILE_MODE = 0o600;
|
||||
|
||||
function isInMemoryDatabasePath(dbPath: string): boolean {
|
||||
if (dbPath === ":memory:") {
|
||||
return true;
|
||||
}
|
||||
if (!dbPath.startsWith("file:")) {
|
||||
return false;
|
||||
}
|
||||
const fragmentIndex = dbPath.indexOf("#");
|
||||
const uriWithoutFragment = fragmentIndex === -1 ? dbPath : dbPath.slice(0, fragmentIndex);
|
||||
const queryIndex = uriWithoutFragment.indexOf("?");
|
||||
const uriPath =
|
||||
queryIndex === -1 ? uriWithoutFragment : uriWithoutFragment.slice(0, queryIndex);
|
||||
try {
|
||||
if (decodeURIComponent(uriPath.slice("file:".length)) === ":memory:") {
|
||||
return true;
|
||||
}
|
||||
} catch {
|
||||
// Malformed escapes cannot identify a memory URI; retain file-backed handling.
|
||||
}
|
||||
return (
|
||||
queryIndex !== -1 &&
|
||||
new URLSearchParams(uriWithoutFragment.slice(queryIndex + 1)).get("mode") === "memory"
|
||||
);
|
||||
}
|
||||
|
||||
function ensureParentDir(filePath: string) {
|
||||
fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: DEBUG_PROXY_CAPTURE_DIR_MODE });
|
||||
}
|
||||
|
||||
function hardenDatabaseFiles(dbPath: string): void {
|
||||
for (const candidate of resolveSqliteDatabaseFilePaths(dbPath)) {
|
||||
if (fs.existsSync(candidate)) {
|
||||
applyPrivateModeSync(candidate, DEBUG_PROXY_CAPTURE_FILE_MODE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type OpenedDatabase = {
|
||||
db: DatabaseSync;
|
||||
walMaintenance: SqliteWalMaintenance;
|
||||
// Capture rows and compressed payload BLOBs live in the shared global state DB.
|
||||
export type DebugProxyCaptureStoreOptions = {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
};
|
||||
|
||||
function openDatabase(dbPath: string): OpenedDatabase {
|
||||
const fileBackedPath = isInMemoryDatabasePath(dbPath) ? undefined : dbPath;
|
||||
if (fileBackedPath) {
|
||||
ensureParentDir(fileBackedPath);
|
||||
if (!fs.existsSync(fileBackedPath)) {
|
||||
fs.closeSync(fs.openSync(fileBackedPath, "a", DEBUG_PROXY_CAPTURE_FILE_MODE));
|
||||
}
|
||||
}
|
||||
const { DatabaseSync } = requireNodeSqlite();
|
||||
const db = new DatabaseSync(dbPath);
|
||||
let walMaintenance: SqliteWalMaintenance | undefined;
|
||||
try {
|
||||
if (fileBackedPath) {
|
||||
applyPrivateModeSync(fileBackedPath, DEBUG_PROXY_CAPTURE_FILE_MODE);
|
||||
}
|
||||
walMaintenance = configureSqliteConnectionPragmas(db, {
|
||||
busyTimeoutMs: 5000,
|
||||
databaseLabel: "debug-proxy-capture",
|
||||
...(fileBackedPath ? { databasePath: fileBackedPath } : {}),
|
||||
});
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS capture_sessions (
|
||||
id TEXT PRIMARY KEY,
|
||||
started_at INTEGER NOT NULL,
|
||||
ended_at INTEGER,
|
||||
mode TEXT NOT NULL,
|
||||
source_scope TEXT NOT NULL,
|
||||
source_process TEXT NOT NULL,
|
||||
proxy_url TEXT,
|
||||
db_path TEXT NOT NULL,
|
||||
blob_dir TEXT NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS capture_events (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session_id TEXT NOT NULL,
|
||||
ts INTEGER NOT NULL,
|
||||
source_scope TEXT NOT NULL,
|
||||
source_process TEXT NOT NULL,
|
||||
protocol TEXT NOT NULL,
|
||||
direction TEXT NOT NULL,
|
||||
kind TEXT NOT NULL,
|
||||
flow_id TEXT NOT NULL,
|
||||
method TEXT,
|
||||
host TEXT,
|
||||
path TEXT,
|
||||
status INTEGER,
|
||||
close_code INTEGER,
|
||||
content_type TEXT,
|
||||
headers_json TEXT,
|
||||
data_text TEXT,
|
||||
data_blob_id TEXT,
|
||||
data_sha256 TEXT,
|
||||
error_text TEXT,
|
||||
meta_json TEXT
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS capture_events_session_ts_idx ON capture_events(session_id, ts);
|
||||
CREATE INDEX IF NOT EXISTS capture_events_flow_idx ON capture_events(flow_id, ts);
|
||||
`);
|
||||
if (fileBackedPath) {
|
||||
hardenDatabaseFiles(fileBackedPath);
|
||||
}
|
||||
return { db, walMaintenance };
|
||||
} catch (err) {
|
||||
walMaintenance?.close();
|
||||
db.close();
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
function serializeJson(value: unknown): string | null {
|
||||
return value == null ? null : JSON.stringify(value);
|
||||
}
|
||||
@@ -165,24 +48,19 @@ function sortObservedCounts(counts: Map<string, number>): CaptureObservedDimensi
|
||||
|
||||
export class DebugProxyCaptureStore {
|
||||
readonly db: DatabaseSync;
|
||||
private readonly walMaintenance: SqliteWalMaintenance;
|
||||
readonly dbPath: string;
|
||||
private closed = false;
|
||||
|
||||
constructor(
|
||||
readonly dbPath: string,
|
||||
readonly blobDir: string,
|
||||
) {
|
||||
const opened = openDatabase(dbPath);
|
||||
this.db = opened.db;
|
||||
this.walMaintenance = opened.walMaintenance;
|
||||
constructor(options: DebugProxyCaptureStoreOptions = {}) {
|
||||
const database = openOpenClawStateDatabase({ env: options.env });
|
||||
this.db = database.db;
|
||||
this.dbPath = database.path;
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.walMaintenance.close();
|
||||
this.db.close();
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
@@ -194,10 +72,15 @@ export class DebugProxyCaptureStore {
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO capture_sessions (
|
||||
id, started_at, ended_at, mode, source_scope, source_process, proxy_url, db_path, blob_dir
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
id, started_at, ended_at, mode, source_scope, source_process, proxy_url
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
started_at=MIN(capture_sessions.started_at, excluded.started_at),
|
||||
ended_at=excluded.ended_at,
|
||||
mode=CASE
|
||||
WHEN capture_sessions.mode = 'implicit' THEN excluded.mode
|
||||
ELSE capture_sessions.mode
|
||||
END,
|
||||
proxy_url=excluded.proxy_url,
|
||||
source_process=excluded.source_process`,
|
||||
)
|
||||
@@ -209,8 +92,6 @@ export class DebugProxyCaptureStore {
|
||||
session.sourceScope,
|
||||
session.sourceProcess,
|
||||
session.proxyUrl ?? null,
|
||||
session.dbPath,
|
||||
session.blobDir,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -221,40 +102,82 @@ export class DebugProxyCaptureStore {
|
||||
}
|
||||
|
||||
persistPayload(data: Buffer, contentType?: string): CaptureBlobRecord {
|
||||
return writeCaptureBlob({ blobDir: this.blobDir, data, contentType });
|
||||
const sha256 = createHash("sha256").update(data).digest("hex");
|
||||
const blobId = sha256.slice(0, 24);
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT OR IGNORE INTO capture_blobs (
|
||||
blob_id, content_type, encoding, size_bytes, sha256, data, created_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
)
|
||||
.run(
|
||||
blobId,
|
||||
contentType ?? null,
|
||||
"gzip",
|
||||
data.byteLength,
|
||||
sha256,
|
||||
gzipSync(data),
|
||||
Date.now(),
|
||||
);
|
||||
return {
|
||||
blobId,
|
||||
encoding: "gzip",
|
||||
sizeBytes: data.byteLength,
|
||||
sha256,
|
||||
...(contentType ? { contentType } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
recordEvent(event: CaptureEventRecord): void {
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO capture_events (
|
||||
session_id, ts, source_scope, source_process, protocol, direction, kind, flow_id,
|
||||
method, host, path, status, close_code, content_type, headers_json,
|
||||
data_text, data_blob_id, data_sha256, error_text, meta_json
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
)
|
||||
.run(
|
||||
event.sessionId,
|
||||
event.ts,
|
||||
event.sourceScope,
|
||||
event.sourceProcess,
|
||||
event.protocol,
|
||||
event.direction,
|
||||
event.kind,
|
||||
event.flowId,
|
||||
event.method ?? null,
|
||||
event.host ?? null,
|
||||
event.path ?? null,
|
||||
event.status ?? null,
|
||||
event.closeCode ?? null,
|
||||
event.contentType ?? null,
|
||||
event.headersJson ?? null,
|
||||
event.dataText ?? null,
|
||||
event.dataBlobId ?? null,
|
||||
event.dataSha256 ?? null,
|
||||
event.errorText ?? null,
|
||||
event.metaJson ?? null,
|
||||
);
|
||||
runSqliteImmediateTransactionSync(this.db, () => {
|
||||
// Capture can be invoked directly by provider seams before the top-level
|
||||
// runtime initializes. Keep the shared-schema foreign key valid without
|
||||
// making diagnostics break the request they are observing.
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT OR IGNORE INTO capture_sessions (
|
||||
id, started_at, mode, source_scope, source_process
|
||||
) VALUES (?, ?, 'implicit', ?, ?)`,
|
||||
)
|
||||
.run(event.sessionId, event.ts, event.sourceScope, event.sourceProcess);
|
||||
// A concurrent purge can remove a payload before its event is recorded.
|
||||
// Keep the inline preview instead of failing the observed request.
|
||||
const dataBlobId =
|
||||
event.dataBlobId &&
|
||||
this.db.prepare(`SELECT 1 FROM capture_blobs WHERE blob_id = ?`).get(event.dataBlobId)
|
||||
? event.dataBlobId
|
||||
: null;
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO capture_events (
|
||||
session_id, ts, source_scope, source_process, protocol, direction, kind, flow_id,
|
||||
method, host, path, status, close_code, content_type, headers_json,
|
||||
data_text, data_blob_id, data_sha256, error_text, meta_json
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
)
|
||||
.run(
|
||||
event.sessionId,
|
||||
event.ts,
|
||||
event.sourceScope,
|
||||
event.sourceProcess,
|
||||
event.protocol,
|
||||
event.direction,
|
||||
event.kind,
|
||||
event.flowId,
|
||||
event.method ?? null,
|
||||
event.host ?? null,
|
||||
event.path ?? null,
|
||||
event.status ?? null,
|
||||
event.closeCode ?? null,
|
||||
event.contentType ?? null,
|
||||
event.headersJson ?? null,
|
||||
event.dataText ?? null,
|
||||
dataBlobId,
|
||||
event.dataSha256 ?? null,
|
||||
event.errorText ?? null,
|
||||
event.metaJson ?? null,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
listSessions(limit = 50): CaptureSessionSummary[] {
|
||||
@@ -352,13 +275,13 @@ export class DebugProxyCaptureStore {
|
||||
|
||||
readBlob(blobId: string): string | null {
|
||||
const row = this.db
|
||||
.prepare(`SELECT data_blob_id AS blobId FROM capture_events WHERE data_blob_id = ? LIMIT 1`)
|
||||
.get(blobId) as { blobId?: string } | undefined;
|
||||
if (!row?.blobId) {
|
||||
.prepare(`SELECT encoding, data FROM capture_blobs WHERE blob_id = ?`)
|
||||
.get(blobId) as { data?: Uint8Array; encoding?: string } | undefined;
|
||||
if (!row?.data) {
|
||||
return null;
|
||||
}
|
||||
const blobPath = path.join(this.blobDir, `${row.blobId}.bin.gz`);
|
||||
return fs.existsSync(blobPath) ? readCaptureBlobText(blobPath) : null;
|
||||
const data = Buffer.from(row.data);
|
||||
return (row.encoding === "gzip" ? gunzipSync(data) : data).toString("utf8");
|
||||
}
|
||||
|
||||
queryPreset(preset: CaptureQueryPreset, sessionId?: string): CaptureQueryRow[] {
|
||||
@@ -442,21 +365,24 @@ export class DebugProxyCaptureStore {
|
||||
}
|
||||
|
||||
purgeAll(): { sessions: number; events: number; blobs: number } {
|
||||
const sessionCount =
|
||||
(this.db.prepare(`SELECT COUNT(*) AS count FROM capture_sessions`).get() as { count: number })
|
||||
.count ?? 0;
|
||||
const eventCount =
|
||||
(this.db.prepare(`SELECT COUNT(*) AS count FROM capture_events`).get() as { count: number })
|
||||
.count ?? 0;
|
||||
this.db.exec(`DELETE FROM capture_events; DELETE FROM capture_sessions;`);
|
||||
let blobs = 0;
|
||||
if (fs.existsSync(this.blobDir)) {
|
||||
for (const entry of fs.readdirSync(this.blobDir)) {
|
||||
fs.rmSync(path.join(this.blobDir, entry), { force: true });
|
||||
blobs += 1;
|
||||
}
|
||||
}
|
||||
return { sessions: sessionCount, events: eventCount, blobs };
|
||||
return runSqliteImmediateTransactionSync(this.db, () => {
|
||||
const sessionCount =
|
||||
(
|
||||
this.db.prepare(`SELECT COUNT(*) AS count FROM capture_sessions`).get() as {
|
||||
count: number;
|
||||
}
|
||||
).count ?? 0;
|
||||
const eventCount =
|
||||
(this.db.prepare(`SELECT COUNT(*) AS count FROM capture_events`).get() as { count: number })
|
||||
.count ?? 0;
|
||||
const blobCount =
|
||||
(this.db.prepare(`SELECT COUNT(*) AS count FROM capture_blobs`).get() as { count: number })
|
||||
.count ?? 0;
|
||||
this.db.exec(
|
||||
`DELETE FROM capture_events; DELETE FROM capture_sessions; DELETE FROM capture_blobs;`,
|
||||
);
|
||||
return { sessions: sessionCount, events: eventCount, blobs: blobCount };
|
||||
});
|
||||
}
|
||||
|
||||
deleteSessions(sessionIds: string[]): { sessions: number; events: number; blobs: number } {
|
||||
@@ -464,75 +390,76 @@ export class DebugProxyCaptureStore {
|
||||
if (uniqueSessionIds.length === 0) {
|
||||
return { sessions: 0, events: 0, blobs: 0 };
|
||||
}
|
||||
const placeholders = uniqueSessionIds.map(() => "?").join(", ");
|
||||
const blobRows = this.db
|
||||
.prepare(
|
||||
`SELECT DISTINCT data_blob_id AS blobId
|
||||
FROM capture_events
|
||||
WHERE session_id IN (${placeholders})
|
||||
AND data_blob_id IS NOT NULL`,
|
||||
)
|
||||
.all(...uniqueSessionIds) as Array<{ blobId?: string | null }>;
|
||||
const eventCount =
|
||||
(
|
||||
this.db
|
||||
.prepare(
|
||||
`SELECT COUNT(*) AS count
|
||||
FROM capture_events
|
||||
WHERE session_id IN (${placeholders})`,
|
||||
)
|
||||
.get(...uniqueSessionIds) as { count: number }
|
||||
).count ?? 0;
|
||||
const sessionCount =
|
||||
(
|
||||
this.db
|
||||
.prepare(
|
||||
`SELECT COUNT(*) AS count
|
||||
FROM capture_sessions
|
||||
WHERE id IN (${placeholders})`,
|
||||
)
|
||||
.get(...uniqueSessionIds) as { count: number }
|
||||
).count ?? 0;
|
||||
this.db
|
||||
.prepare(`DELETE FROM capture_events WHERE session_id IN (${placeholders})`)
|
||||
.run(...uniqueSessionIds);
|
||||
this.db
|
||||
.prepare(`DELETE FROM capture_sessions WHERE id IN (${placeholders})`)
|
||||
.run(...uniqueSessionIds);
|
||||
const candidateBlobIds = blobRows
|
||||
.map((row) => row.blobId?.trim())
|
||||
.filter((blobId): blobId is string => Boolean(blobId));
|
||||
const remainingBlobRefs =
|
||||
// Shared blobs are deleted only when no surviving event references them.
|
||||
candidateBlobIds.length > 0
|
||||
? new Set(
|
||||
(
|
||||
this.db
|
||||
.prepare(
|
||||
`SELECT DISTINCT data_blob_id AS blobId
|
||||
FROM capture_events
|
||||
WHERE data_blob_id IN (${candidateBlobIds.map(() => "?").join(", ")})
|
||||
AND data_blob_id IS NOT NULL`,
|
||||
)
|
||||
.all(...candidateBlobIds) as Array<{ blobId?: string | null }>
|
||||
return runSqliteImmediateTransactionSync(this.db, () => {
|
||||
const placeholders = uniqueSessionIds.map(() => "?").join(", ");
|
||||
const blobRows = this.db
|
||||
.prepare(
|
||||
`SELECT DISTINCT data_blob_id AS blobId
|
||||
FROM capture_events
|
||||
WHERE session_id IN (${placeholders})
|
||||
AND data_blob_id IS NOT NULL`,
|
||||
)
|
||||
.all(...uniqueSessionIds) as Array<{ blobId?: string | null }>;
|
||||
const eventCount =
|
||||
(
|
||||
this.db
|
||||
.prepare(
|
||||
`SELECT COUNT(*) AS count
|
||||
FROM capture_events
|
||||
WHERE session_id IN (${placeholders})`,
|
||||
)
|
||||
.map((row) => row.blobId?.trim())
|
||||
.filter((blobId): blobId is string => Boolean(blobId)),
|
||||
)
|
||||
: new Set<string>();
|
||||
let blobs = 0;
|
||||
for (const row of blobRows) {
|
||||
const blobId = row.blobId?.trim();
|
||||
if (!blobId || remainingBlobRefs.has(blobId)) {
|
||||
continue;
|
||||
.get(...uniqueSessionIds) as { count: number }
|
||||
).count ?? 0;
|
||||
const sessionCount =
|
||||
(
|
||||
this.db
|
||||
.prepare(
|
||||
`SELECT COUNT(*) AS count
|
||||
FROM capture_sessions
|
||||
WHERE id IN (${placeholders})`,
|
||||
)
|
||||
.get(...uniqueSessionIds) as { count: number }
|
||||
).count ?? 0;
|
||||
this.db
|
||||
.prepare(`DELETE FROM capture_events WHERE session_id IN (${placeholders})`)
|
||||
.run(...uniqueSessionIds);
|
||||
this.db
|
||||
.prepare(`DELETE FROM capture_sessions WHERE id IN (${placeholders})`)
|
||||
.run(...uniqueSessionIds);
|
||||
const candidateBlobIds = blobRows
|
||||
.map((row) => row.blobId?.trim())
|
||||
.filter((blobId): blobId is string => Boolean(blobId));
|
||||
const remainingBlobRefs =
|
||||
// Shared blobs are deleted only when no surviving event references them.
|
||||
candidateBlobIds.length > 0
|
||||
? new Set(
|
||||
(
|
||||
this.db
|
||||
.prepare(
|
||||
`SELECT DISTINCT data_blob_id AS blobId
|
||||
FROM capture_events
|
||||
WHERE data_blob_id IN (${candidateBlobIds.map(() => "?").join(", ")})
|
||||
AND data_blob_id IS NOT NULL`,
|
||||
)
|
||||
.all(...candidateBlobIds) as Array<{ blobId?: string | null }>
|
||||
)
|
||||
.map((row) => row.blobId?.trim())
|
||||
.filter((blobId): blobId is string => Boolean(blobId)),
|
||||
)
|
||||
: new Set<string>();
|
||||
let blobs = 0;
|
||||
const deleteBlob = this.db.prepare(`DELETE FROM capture_blobs WHERE blob_id = ?`);
|
||||
for (const blobId of candidateBlobIds) {
|
||||
if (remainingBlobRefs.has(blobId)) {
|
||||
continue;
|
||||
}
|
||||
const result = deleteBlob.run(blobId);
|
||||
if (Number(result.changes) > 0) {
|
||||
blobs += 1;
|
||||
}
|
||||
}
|
||||
const blobPath = path.join(this.blobDir, `${blobId}.bin.gz`);
|
||||
if (fs.existsSync(blobPath)) {
|
||||
fs.rmSync(blobPath, { force: true });
|
||||
blobs += 1;
|
||||
}
|
||||
}
|
||||
return { sessions: sessionCount, events: eventCount, blobs };
|
||||
return { sessions: sessionCount, events: eventCount, blobs };
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -543,13 +470,15 @@ type CachedStoreEntry = {
|
||||
|
||||
const cachedStores = new Map<string, CachedStoreEntry>();
|
||||
|
||||
export function getDebugProxyCaptureStore(dbPath: string, blobDir: string): DebugProxyCaptureStore {
|
||||
const key = `${dbPath}:${blobDir}`;
|
||||
export function getDebugProxyCaptureStore(
|
||||
options: DebugProxyCaptureStoreOptions = {},
|
||||
): DebugProxyCaptureStore {
|
||||
const key = openOpenClawStateDatabase({ env: options.env }).path;
|
||||
const cached = cachedStores.get(key);
|
||||
if (cached && !cached.store.isClosed) {
|
||||
return cached.store;
|
||||
}
|
||||
const store = new DebugProxyCaptureStore(dbPath, blobDir);
|
||||
const store = new DebugProxyCaptureStore(options);
|
||||
cachedStores.set(key, { store, leases: 0 });
|
||||
return store;
|
||||
}
|
||||
@@ -561,14 +490,14 @@ export function closeDebugProxyCaptureStore(): void {
|
||||
cachedStores.clear();
|
||||
}
|
||||
|
||||
// Lease API keeps one cached synchronous SQLite connection alive across related
|
||||
// capture operations, then closes it when the last owner releases.
|
||||
export function acquireDebugProxyCaptureStore(
|
||||
dbPath: string,
|
||||
blobDir: string,
|
||||
): { store: DebugProxyCaptureStore; release: () => void } {
|
||||
const key = `${dbPath}:${blobDir}`;
|
||||
const store = getDebugProxyCaptureStore(dbPath, blobDir);
|
||||
// Lease API keeps one cached capture-store wrapper alive across related
|
||||
// operations, then releases it without closing the shared state database.
|
||||
export function acquireDebugProxyCaptureStore(options: DebugProxyCaptureStoreOptions = {}): {
|
||||
store: DebugProxyCaptureStore;
|
||||
release: () => void;
|
||||
} {
|
||||
const store = getDebugProxyCaptureStore(options);
|
||||
const key = store.dbPath;
|
||||
const cached = cachedStores.get(key);
|
||||
if (!cached || cached.store !== store) {
|
||||
throw new Error("debug proxy capture store cache changed while acquiring a lease");
|
||||
|
||||
@@ -23,13 +23,10 @@ export type CaptureSessionRecord = {
|
||||
sourceScope: "openclaw";
|
||||
sourceProcess: string;
|
||||
proxyUrl?: string;
|
||||
dbPath: string;
|
||||
blobDir: string;
|
||||
};
|
||||
|
||||
export type CaptureBlobRecord = {
|
||||
blobId: string;
|
||||
path: string;
|
||||
encoding: "gzip";
|
||||
sizeBytes: number;
|
||||
sha256: string;
|
||||
|
||||
Reference in New Issue
Block a user