mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 04:36:22 +00:00
fix: preserve sqlite transcript discovery
This commit is contained in:
@@ -30,7 +30,7 @@ export function openMemoryDatabaseAtPath(
|
||||
// failing immediately with SQLITE_BUSY.
|
||||
db.exec(`PRAGMA busy_timeout = ${MEMORY_SQLITE_BUSY_TIMEOUT_MS}`);
|
||||
if (agentId) {
|
||||
ensureOpenClawAgentDatabaseSchema(db, { agentId, path: dbPath, register: true });
|
||||
ensureOpenClawAgentDatabaseSchema(db, { agentId, path: dbPath, register: false });
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,10 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
|
||||
import {
|
||||
closeOpenClawStateDatabaseForTest,
|
||||
openOpenClawStateDatabase,
|
||||
} from "openclaw/plugin-sdk/sqlite-runtime";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { MEMORY_SQLITE_BUSY_TIMEOUT_MS, openMemoryDatabaseAtPath } from "./manager-db.js";
|
||||
import {
|
||||
@@ -158,6 +162,7 @@ describe("memory manager readonly recovery", () => {
|
||||
|
||||
afterEach(async () => {
|
||||
vi.restoreAllMocks();
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
await fs.rm(workspaceDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
@@ -235,6 +240,20 @@ describe("memory manager readonly recovery", () => {
|
||||
db.close();
|
||||
});
|
||||
|
||||
it("does not register memory sqlite as the agent transcript database", () => {
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state"));
|
||||
|
||||
const db = openMemoryDatabaseAtPath(indexPath, false, "main");
|
||||
db.close();
|
||||
|
||||
const stateDb = openOpenClawStateDatabase().db;
|
||||
const registered = stateDb
|
||||
.prepare("SELECT path FROM agent_databases WHERE agent_id = ?")
|
||||
.get("main");
|
||||
|
||||
expect(registered).toBeUndefined();
|
||||
});
|
||||
|
||||
it("queues targeted session scopes behind an in-flight sync", async () => {
|
||||
let releaseSync = () => {};
|
||||
const pendingSync = new Promise<void>((resolve) => {
|
||||
|
||||
@@ -290,6 +290,49 @@ describe("SQLite session transcript store", () => {
|
||||
).toBe(8);
|
||||
});
|
||||
|
||||
it("preserves event timestamps when replacing transcript rows", () => {
|
||||
const stateDir = createTempDir();
|
||||
const env = { OPENCLAW_STATE_DIR: stateDir };
|
||||
const sessionStartedAt = Date.parse("2026-02-05T10:00:00.000Z");
|
||||
const lastMessageAt = Date.parse("2026-02-05T10:05:00.000Z");
|
||||
|
||||
replaceSqliteSessionTranscriptEvents({
|
||||
env,
|
||||
agentId: "main",
|
||||
sessionId: "session-1",
|
||||
events: [
|
||||
{
|
||||
type: "session",
|
||||
id: "session-1",
|
||||
},
|
||||
{
|
||||
type: "message",
|
||||
id: "m1",
|
||||
timestamp: "2026-02-05T10:00:00.000Z",
|
||||
message: { role: "user", content: "hi" },
|
||||
},
|
||||
{
|
||||
type: "message",
|
||||
id: "m2",
|
||||
timestamp: "2026-02-05T10:05:00.000Z",
|
||||
message: { role: "assistant", content: "ok" },
|
||||
},
|
||||
],
|
||||
now: () => Date.parse("2026-02-10T00:00:00.000Z"),
|
||||
});
|
||||
|
||||
expect(
|
||||
loadSqliteSessionTranscriptEvents({
|
||||
env,
|
||||
agentId: "main",
|
||||
sessionId: "session-1",
|
||||
}).map((entry) => entry.createdAt),
|
||||
).toEqual([sessionStartedAt, sessionStartedAt, lastMessageAt]);
|
||||
expect(listSqliteSessionTranscripts({ env, agentId: "main" })[0]?.updatedAt).toBe(
|
||||
lastMessageAt,
|
||||
);
|
||||
});
|
||||
|
||||
it("lists SQLite transcript scopes", () => {
|
||||
const stateDir = createTempDir();
|
||||
const env = { OPENCLAW_STATE_DIR: stateDir };
|
||||
|
||||
@@ -107,6 +107,30 @@ function parseCreatedAt(value: unknown): number {
|
||||
return typeof value === "bigint" ? Number(value) : Number(value);
|
||||
}
|
||||
|
||||
function parseTranscriptTimestampMs(value: unknown): number | undefined {
|
||||
if (typeof value === "number") {
|
||||
return Number.isFinite(value) && value >= 0 ? Math.floor(value) : undefined;
|
||||
}
|
||||
if (typeof value !== "string" || !value.trim()) {
|
||||
return undefined;
|
||||
}
|
||||
const parsed = Date.parse(value);
|
||||
return Number.isFinite(parsed) && parsed >= 0 ? parsed : undefined;
|
||||
}
|
||||
|
||||
function readTranscriptEventTimestampMs(event: unknown): number | undefined {
|
||||
if (!event || typeof event !== "object" || Array.isArray(event)) {
|
||||
return undefined;
|
||||
}
|
||||
const record = event as Record<string, unknown>;
|
||||
return (
|
||||
parseTranscriptTimestampMs(record.timestamp) ??
|
||||
(record.message && typeof record.message === "object"
|
||||
? parseTranscriptTimestampMs((record.message as Record<string, unknown>).timestamp)
|
||||
: undefined)
|
||||
);
|
||||
}
|
||||
|
||||
function parseTranscriptEventRow(row: {
|
||||
seq: number | bigint;
|
||||
event_json: unknown;
|
||||
@@ -633,16 +657,34 @@ export function replaceSqliteSessionTranscriptEvents(
|
||||
): { replaced: number } {
|
||||
const { sessionId } = normalizeTranscriptScope(options);
|
||||
const now = options.now?.() ?? Date.now();
|
||||
const timestamps = options.events.map(readTranscriptEventTimestampMs);
|
||||
let fallbackCreatedAt = timestamps.find((timestamp) => timestamp !== undefined) ?? now;
|
||||
const entries = options.events.map((event, seq) => {
|
||||
const createdAt = timestamps[seq] ?? fallbackCreatedAt;
|
||||
fallbackCreatedAt = createdAt;
|
||||
return {
|
||||
event,
|
||||
seq,
|
||||
createdAt,
|
||||
};
|
||||
});
|
||||
const updatedAt = entries.length > 0 ? Math.max(...entries.map((entry) => entry.createdAt)) : now;
|
||||
runOpenClawAgentWriteTransaction((database) => {
|
||||
ensureTranscriptSessionRoot({ database, sessionId, updatedAt: now });
|
||||
ensureTranscriptSessionRoot({ database, sessionId, updatedAt });
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
getAgentTranscriptKysely(database.db)
|
||||
.deleteFrom("transcript_events")
|
||||
.where("session_id", "=", sessionId),
|
||||
);
|
||||
options.events.forEach((event, seq) => {
|
||||
insertTranscriptEvent({ database, sessionId, seq, event, createdAt: now });
|
||||
entries.forEach((entry) => {
|
||||
insertTranscriptEvent({
|
||||
database,
|
||||
sessionId,
|
||||
seq: entry.seq,
|
||||
event: entry.event,
|
||||
createdAt: entry.createdAt,
|
||||
});
|
||||
});
|
||||
}, options);
|
||||
|
||||
|
||||
@@ -122,6 +122,38 @@ describe("session cost usage", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("discovers sessions that continued after the requested end", async () => {
|
||||
const root = await makeRoot("discover-continued");
|
||||
await withStateDir(root, async () => {
|
||||
writeTranscript({
|
||||
sessionId: "sess-continued",
|
||||
events: [
|
||||
{
|
||||
type: "message",
|
||||
timestamp: "2026-02-05T12:00:00.000Z",
|
||||
message: { role: "user", content: "Summarize this range" },
|
||||
},
|
||||
{
|
||||
type: "message",
|
||||
timestamp: "2026-02-07T12:00:00.000Z",
|
||||
message: { role: "assistant", content: "continued" },
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const sessions = await discoverAllSessions({
|
||||
startMs: Date.parse("2026-02-05T00:00:00.000Z"),
|
||||
endMs: Date.parse("2026-02-06T00:00:00.000Z"),
|
||||
});
|
||||
expect(sessions).toHaveLength(1);
|
||||
expect(sessions[0]).toMatchObject({
|
||||
agentId: "main",
|
||||
sessionId: "sess-continued",
|
||||
firstUserMessage: "Summarize this range",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("loads aggregate usage directly from SQLite transcript events", async () => {
|
||||
const root = await makeRoot("aggregate");
|
||||
await withStateDir(root, async () => {
|
||||
|
||||
@@ -518,9 +518,6 @@ export async function discoverAllSessions(params?: {
|
||||
if (params?.startMs && transcript.updatedAt < params.startMs) {
|
||||
continue;
|
||||
}
|
||||
if (params?.endMs && transcript.updatedAt > params.endMs) {
|
||||
continue;
|
||||
}
|
||||
let firstUserMessage: string | undefined;
|
||||
if (params?.includeFirstUserMessage !== false) {
|
||||
for (const event of loadSqliteSessionTranscriptEvents(transcript)) {
|
||||
|
||||
Reference in New Issue
Block a user