fix(sessions): preserve sqlite scope paths

This commit is contained in:
Peter Steinberger
2026-05-15 20:35:49 +01:00
parent c3b862627c
commit c0cb00bb23
6 changed files with 330 additions and 92 deletions

View File

@@ -171,6 +171,7 @@ function createLegacyStateMigrationDetectionResult(params?: {
legacyStorePath: "/tmp/state/sessions/sessions.json",
agentLegacyDir: "/tmp/state/agents/main/sessions",
agentLegacyStorePath: "/tmp/state/agents/main/sessions/sessions.json",
agentLegacyStores: [],
hasLegacy: params?.hasLegacySessions ?? false,
legacyKeys: [],
},

View File

@@ -327,4 +327,84 @@ describe("state migrations", () => {
await expectMissingPath(resolveLegacyChannelAllowFromPath("chatapp", env, "default"));
await expectMissingPath(resolveLegacyChannelAllowFromPath("chatapp", env, "beta"));
});
it("migrates legacy sessions for every configured agent", async () => {
const root = await createTempDir();
const stateDir = path.join(root, ".openclaw");
const env = createEnv(stateDir);
const cfg = {
...createConfig(),
agents: {
list: [{ id: "worker-1", default: true }, { id: "reviewer" }],
},
} as OpenClawConfig;
await fs.mkdir(path.join(stateDir, "agents", "reviewer", "sessions"), { recursive: true });
await fs.writeFile(
path.join(stateDir, "agents", "reviewer", "sessions", "sessions.json"),
`${JSON.stringify(
{
"group:mobile-reviewer": { sessionId: "reviewer-group", updatedAt: 9 },
},
null,
2,
)}\n`,
"utf8",
);
await fs.writeFile(
path.join(stateDir, "agents", "reviewer", "sessions", "reviewer-trace.jsonl"),
[
JSON.stringify({
type: "session",
id: "reviewer-trace",
timestamp: "2026-05-06T00:00:00.000Z",
}),
JSON.stringify({
type: "message",
timestamp: "2026-05-06T00:00:01.000Z",
message: { role: "user", content: "review me", timestamp: 1 },
}),
].join("\n") + "\n",
"utf8",
);
const detected = await detectLegacyStateMigrations({
cfg,
env,
homedir: () => root,
});
expect(detected.sessions.hasLegacy).toBe(true);
expect(detected.preview).toContain(
`- Sessions: import ${path.join(stateDir, "agents", "reviewer", "sessions", "sessions.json")} into SQLite`,
);
const result = await runLegacyStateMigrations({
detected,
now: () => 1234,
});
expect(result.warnings).toStrictEqual([]);
expect(result.changes).toEqual([
"Imported 1 session index row(s) into SQLite for agent reviewer",
"Canonicalized 1 legacy session key(s)",
"Imported canonical reviewer-trace.jsonl transcript (2 event(s)) into SQLite for agent reviewer",
]);
const reviewerStore = loadSqliteSessionEntries({
agentId: "reviewer",
env,
}) as Record<string, { sessionId: string }>;
expect(reviewerStore["agent:reviewer:mobileauth:group:mobile-reviewer"]?.sessionId).toBe(
"reviewer-group",
);
const importedTranscriptEvents = loadSqliteSessionTranscriptEvents({
agentId: "reviewer",
sessionId: "reviewer-trace",
env,
});
expect(importedTranscriptEvents).toHaveLength(2);
await expectMissingPath(path.join(stateDir, "agents", "reviewer", "sessions", "sessions.json"));
await expectMissingPath(
path.join(stateDir, "agents", "reviewer", "sessions", "reviewer-trace.jsonl"),
);
});
});

View File

@@ -79,6 +79,7 @@ export type LegacyStateDetection = {
legacyStorePath: string;
agentLegacyDir: string;
agentLegacyStorePath: string;
agentLegacyStores: LegacyAgentSessionStoreDetection[];
hasLegacy: boolean;
legacyKeys: string[];
};
@@ -104,6 +105,14 @@ type DoctorSessionMigrationSurface = {
}) => string | null | undefined;
};
type LegacyAgentSessionStoreDetection = {
agentId: string;
legacyDir: string;
legacyStorePath: string;
hasLegacyJsonSessionStore: boolean;
legacyKeys: string[];
};
type MigrationSourceReport = {
kind: string;
sourcePath?: string;
@@ -1788,6 +1797,44 @@ function listLegacySessionKeys(params: {
return legacy;
}
function listLegacySessionMigrationAgentIds(cfg: OpenClawConfig, targetAgentId: string): string[] {
return Array.from(new Set([targetAgentId, ...listAgentIds(cfg)].map(normalizeAgentId)));
}
function detectAgentLegacySessionStores(params: {
cfg: OpenClawConfig;
includeSessions: boolean;
stateDir: string;
targetAgentId: string;
targetMainKey: string;
targetScope?: SessionScope;
}): LegacyAgentSessionStoreDetection[] {
return listLegacySessionMigrationAgentIds(params.cfg, params.targetAgentId).map((agentId) => {
const legacyDir = path.join(params.stateDir, "agents", agentId, "sessions");
const legacyStorePath = path.join(legacyDir, "sessions.json");
const hasLegacyJsonSessionStore = params.includeSessions && fileExists(legacyStorePath);
const parsed = hasLegacyJsonSessionStore
? readSessionStoreJson5(legacyStorePath)
: { store: {}, ok: true };
const legacyKeys =
params.includeSessions && parsed.ok
? listLegacySessionKeys({
store: parsed.store,
agentId,
mainKey: params.targetMainKey,
scope: params.targetScope,
})
: [];
return {
agentId,
legacyDir,
legacyStorePath,
hasLegacyJsonSessionStore,
legacyKeys,
};
});
}
function emptyDirOrMissing(dir: string): boolean {
if (!existsDir(dir)) {
return true;
@@ -1838,18 +1885,23 @@ function collectLegacyMigrationSources(detected: LegacyStateDetection): Migratio
: undefined,
}),
);
add(
statFileSource({
kind: "session-index",
sourcePath: detected.sessions.agentLegacyStorePath,
targetTable: "agent.session_entries",
recordCount: fileExists(detected.sessions.agentLegacyStorePath)
? countSessionStoreRecords(detected.sessions.agentLegacyStorePath)
: undefined,
}),
);
for (const agentStore of detected.sessions.agentLegacyStores) {
add(
statFileSource({
kind: "session-index",
sourcePath: agentStore.legacyStorePath,
targetTable: "agent.session_entries",
recordCount: fileExists(agentStore.legacyStorePath)
? countSessionStoreRecords(agentStore.legacyStorePath)
: undefined,
}),
);
}
for (const dir of [detected.sessions.legacyDir, detected.sessions.agentLegacyDir]) {
for (const dir of [
detected.sessions.legacyDir,
...detected.sessions.agentLegacyStores.map((store) => store.legacyDir),
]) {
for (const entry of safeReadDir(dir)) {
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
continue;
@@ -2186,27 +2238,31 @@ export async function detectLegacyStateMigrations(params: {
const sessionsLegacyDir = path.join(stateDir, "sessions");
const sessionsLegacyStorePath = path.join(sessionsLegacyDir, "sessions.json");
const sessionsAgentLegacyDir = path.join(stateDir, "agents", targetAgentId, "sessions");
const sessionsAgentLegacyStorePath = path.join(sessionsAgentLegacyDir, "sessions.json");
const hasAgentLegacyJsonSessionStore =
includeSessions && fileExists(sessionsAgentLegacyStorePath);
const legacySessionEntries = includeSessions ? safeReadDir(sessionsLegacyDir) : [];
const hasLegacySessions =
(includeSessions && fileExists(sessionsLegacyStorePath)) ||
legacySessionEntries.some((e) => e.isFile() && e.name.endsWith(".jsonl"));
const targetSessionParsed = hasAgentLegacyJsonSessionStore
? readSessionStoreJson5(sessionsAgentLegacyStorePath)
: { store: {}, ok: true };
const legacyKeys =
includeSessions && targetSessionParsed.ok
? listLegacySessionKeys({
store: targetSessionParsed.store,
agentId: targetAgentId,
mainKey: targetMainKey,
scope: targetScope,
})
: [];
const agentLegacyStores = detectAgentLegacySessionStores({
cfg: params.cfg,
includeSessions,
stateDir,
targetAgentId,
targetMainKey,
targetScope,
});
const targetAgentLegacyStore =
agentLegacyStores.find((store) => store.agentId === targetAgentId) ?? agentLegacyStores[0];
const sessionsAgentLegacyDir =
targetAgentLegacyStore?.legacyDir ?? path.join(stateDir, "agents", targetAgentId, "sessions");
const sessionsAgentLegacyStorePath =
targetAgentLegacyStore?.legacyStorePath ?? path.join(sessionsAgentLegacyDir, "sessions.json");
const legacyKeys = agentLegacyStores.flatMap((store) => store.legacyKeys);
const hasAgentLegacyJsonSessionStore = agentLegacyStores.some(
(store) => store.hasLegacyJsonSessionStore,
);
const hasAgentLegacyJsonlSessions = agentLegacyStores.some((store) =>
safeReadDir(store.legacyDir).some((entry) => entry.isFile() && entry.name.endsWith(".jsonl")),
);
const legacyAgentDir = path.join(stateDir, "agent");
const targetAgentDir = path.join(stateDir, "agents", targetAgentId, "agent");
@@ -2225,10 +2281,14 @@ export async function detectLegacyStateMigrations(params: {
preview.push(`- Sessions: import ${sessionsLegacyDir} into SQLite`);
}
if (legacyKeys.length > 0) {
preview.push(`- Sessions: canonicalize legacy keys in ${sessionsAgentLegacyStorePath}`);
for (const store of agentLegacyStores.filter((store) => store.legacyKeys.length > 0)) {
preview.push(`- Sessions: canonicalize legacy keys in ${store.legacyStorePath}`);
}
}
if (hasAgentLegacyJsonSessionStore) {
preview.push(`- Sessions: import ${sessionsAgentLegacyStorePath} into SQLite`);
for (const store of agentLegacyStores.filter((store) => store.hasLegacyJsonSessionStore)) {
preview.push(`- Sessions: import ${store.legacyStorePath} into SQLite`);
}
}
if (hasLegacyAgentDir) {
preview.push(`- Agent dir: ${legacyAgentDir}${targetAgentDir}`);
@@ -2250,7 +2310,12 @@ export async function detectLegacyStateMigrations(params: {
legacyStorePath: sessionsLegacyStorePath,
agentLegacyDir: sessionsAgentLegacyDir,
agentLegacyStorePath: sessionsAgentLegacyStorePath,
hasLegacy: hasLegacySessions || legacyKeys.length > 0 || hasAgentLegacyJsonSessionStore,
agentLegacyStores,
hasLegacy:
hasLegacySessions ||
legacyKeys.length > 0 ||
hasAgentLegacyJsonSessionStore ||
hasAgentLegacyJsonlSessions,
legacyKeys,
},
agentDir: {
@@ -2278,19 +2343,7 @@ async function migrateLegacySessions(
const legacyParsed = fileExists(detected.sessions.legacyStorePath)
? readSessionStoreJson5(detected.sessions.legacyStorePath)
: { store: {}, ok: true };
const agentLegacyParsed = fileExists(detected.sessions.agentLegacyStorePath)
? readSessionStoreJson5(detected.sessions.agentLegacyStorePath)
: { store: {}, ok: true };
const hasAgentLegacySessionStoreFile = fileExists(detected.sessions.agentLegacyStorePath);
const legacyStore = legacyParsed.store;
const agentLegacyStore = agentLegacyParsed.store;
const canonicalizedAgentLegacy = canonicalizeSessionStore({
store: agentLegacyStore,
agentId: detected.targetAgentId,
mainKey: detected.targetMainKey,
scope: detected.targetScope,
});
const canonicalizedLegacy = canonicalizeSessionStore({
store: legacyStore,
agentId: detected.targetAgentId,
@@ -2298,39 +2351,71 @@ async function migrateLegacySessions(
scope: detected.targetScope,
});
const merged: Record<string, SessionEntryLike> = { ...canonicalizedAgentLegacy.store };
for (const [key, entry] of Object.entries(canonicalizedLegacy.store)) {
merged[key] = mergeSessionEntry({
existing: merged[key],
incoming: entry,
preferIncomingOnTie: false,
});
}
const mainKey = buildAgentMainSessionKey({
agentId: detected.targetAgentId,
mainKey: detected.targetMainKey,
});
if (!merged[mainKey]) {
const latest = pickLatestLegacyDirectEntry(legacyStore);
if (latest?.sessionId) {
merged[mainKey] = latest;
changes.push(`Migrated latest direct-chat session → ${mainKey}`);
}
}
if (!legacyParsed.ok) {
warnings.push(
`Legacy sessions store unreadable; left in place at ${detected.sessions.legacyStorePath}`,
);
}
if (
(legacyParsed.ok || agentLegacyParsed.ok) &&
(Object.keys(legacyStore).length > 0 ||
for (const agentStore of detected.sessions.agentLegacyStores) {
const agentLegacyParsed = fileExists(agentStore.legacyStorePath)
? readSessionStoreJson5(agentStore.legacyStorePath)
: { store: {}, ok: true };
const agentLegacyStore = agentLegacyParsed.store;
const canonicalizedAgentLegacy = canonicalizeSessionStore({
store: agentLegacyStore,
agentId: agentStore.agentId,
mainKey: detected.targetMainKey,
scope: detected.targetScope,
});
const merged: Record<string, SessionEntryLike> = { ...canonicalizedAgentLegacy.store };
if (agentStore.agentId === detected.targetAgentId) {
for (const [key, entry] of Object.entries(canonicalizedLegacy.store)) {
merged[key] = mergeSessionEntry({
existing: merged[key],
incoming: entry,
preferIncomingOnTie: false,
});
}
const mainKey = buildAgentMainSessionKey({
agentId: detected.targetAgentId,
mainKey: detected.targetMainKey,
});
if (!merged[mainKey]) {
const latest = pickLatestLegacyDirectEntry(legacyStore);
if (latest?.sessionId) {
merged[mainKey] = latest;
changes.push(`Migrated latest direct-chat session → ${mainKey}`);
}
}
}
if (!agentLegacyParsed.ok) {
warnings.push(
`Legacy sessions store unreadable; left in place at ${agentStore.legacyStorePath}`,
);
}
const hasRootLegacyForTarget =
agentStore.agentId === detected.targetAgentId && Object.keys(legacyStore).length > 0;
const hasAgentLegacyRows =
Object.keys(agentLegacyStore).length > 0 ||
(hasAgentLegacySessionStoreFile && agentLegacyParsed.ok))
) {
(agentStore.hasLegacyJsonSessionStore && agentLegacyParsed.ok);
if (!hasRootLegacyForTarget && !hasAgentLegacyRows) {
continue;
}
if (
agentStore.agentId === detected.targetAgentId &&
!legacyParsed.ok &&
!agentLegacyParsed.ok
) {
continue;
}
if (agentStore.agentId !== detected.targetAgentId && !agentLegacyParsed.ok) {
continue;
}
const normalized: Record<string, SessionEntry> = {};
for (const [key, entry] of Object.entries(merged)) {
const normalizedEntry = normalizeSessionEntry(entry);
@@ -2341,17 +2426,17 @@ async function migrateLegacySessions(
}
const imported = mergeSqliteSessionEntries(
{
agentId: detected.targetAgentId,
agentId: agentStore.agentId,
env: detected.env,
},
normalized,
);
changes.push(
`Imported ${imported.imported} session index row(s) into SQLite for agent ${detected.targetAgentId}`,
`Imported ${imported.imported} session index row(s) into SQLite for agent ${agentStore.agentId}`,
);
if (agentLegacyParsed.ok && fileExists(detected.sessions.agentLegacyStorePath)) {
if (agentLegacyParsed.ok && fileExists(agentStore.legacyStorePath)) {
try {
fs.rmSync(detected.sessions.agentLegacyStorePath, { force: true });
fs.rmSync(agentStore.legacyStorePath, { force: true });
} catch {
// ignore
}
@@ -2390,24 +2475,26 @@ async function migrateLegacySessions(
}
}
const agentLegacyEntries = safeReadDir(detected.sessions.agentLegacyDir);
for (const entry of agentLegacyEntries) {
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
continue;
}
const transcriptPath = path.join(detected.sessions.agentLegacyDir, entry.name);
try {
const imported = importLegacyTranscriptFileToSqlite({
sourcePath: transcriptPath,
agentId: detected.targetAgentId,
env: detected.env,
});
fs.rmSync(transcriptPath, { force: true });
changes.push(
`Imported canonical ${entry.name} transcript (${imported.imported} event(s)) into SQLite for agent ${detected.targetAgentId}`,
);
} catch (err) {
warnings.push(`Failed importing transcript ${transcriptPath}: ${String(err)}`);
for (const agentStore of detected.sessions.agentLegacyStores) {
const agentLegacyEntries = safeReadDir(agentStore.legacyDir);
for (const entry of agentLegacyEntries) {
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
continue;
}
const transcriptPath = path.join(agentStore.legacyDir, entry.name);
try {
const imported = importLegacyTranscriptFileToSqlite({
sourcePath: transcriptPath,
agentId: agentStore.agentId,
env: detected.env,
});
fs.rmSync(transcriptPath, { force: true });
changes.push(
`Imported canonical ${entry.name} transcript (${imported.imported} event(s)) into SQLite for agent ${agentStore.agentId}`,
);
} catch (err) {
warnings.push(`Failed importing transcript ${transcriptPath}: ${String(err)}`);
}
}
}
@@ -2422,6 +2509,9 @@ async function migrateLegacySessions(
}
removeDirIfEmpty(detected.sessions.legacyDir);
for (const agentStore of detected.sessions.agentLegacyStores) {
removeDirIfEmpty(agentStore.legacyDir);
}
const legacyLeft = safeReadDir(detected.sessions.legacyDir).filter((e) => e.isFile());
if (legacyLeft.length > 0) {
warnings.push(
@@ -2430,6 +2520,16 @@ async function migrateLegacySessions(
.join(", ")}`,
);
}
for (const agentStore of detected.sessions.agentLegacyStores) {
const agentLegacyLeft = safeReadDir(agentStore.legacyDir).filter((e) => e.isFile());
if (agentLegacyLeft.length > 0) {
warnings.push(
`Left legacy sessions in place at ${agentStore.legacyDir}: ${agentLegacyLeft
.map((entry) => entry.name)
.join(", ")}`,
);
}
}
return { changes, warnings };
}

View File

@@ -53,6 +53,7 @@ export type LoadSqliteSessionTranscriptTailEventsOptions = SqliteSessionTranscri
export type SqliteSessionTranscriptScope = {
agentId: string;
path?: string;
sessionId: string;
};
@@ -427,6 +428,7 @@ export function resolveSqliteSessionTranscriptScope(
if (options.agentId?.trim()) {
return {
agentId: normalizeAgentId(options.agentId),
path: options.path,
sessionId,
};
}

View File

@@ -54,6 +54,7 @@ function setupState(prefix = "openclaw-session-utils-sqlite-") {
function seedTranscript(params: {
sessionId: string;
agentId?: string;
path?: string;
events: TranscriptEvent[];
}) {
if (!stateDir) {
@@ -62,6 +63,7 @@ function seedTranscript(params: {
const agentId = params.agentId ?? "main";
replaceSqliteSessionTranscriptEvents({
agentId,
path: params.path,
sessionId: params.sessionId,
events: params.events,
now: () => 1_778_100_000_000,
@@ -247,6 +249,45 @@ describe("SQLite transcript readers", () => {
});
});
test("preserves explicit SQLite database paths for scoped transcript reads", () => {
setupState();
const sessionId = "custom-db-session";
const customPath = path.join(stateDir, "registered", "ops.sqlite");
const scope = { agentId: "ops", path: customPath, sessionId };
seedTranscript({
agentId: "ops",
path: customPath,
sessionId,
events: [
header(sessionId),
message("user", "from custom db"),
message("assistant", "custom reply", {
provider: "openai",
model: "gpt-5.4",
usage: { input: 1, output: 2 },
}),
],
});
expect(readSessionMessages({ agentId: "ops", sessionId })).toEqual([]);
expect(readSessionMessages(scope)).toEqual([
expect.objectContaining({ content: "from custom db" }),
expect.objectContaining({ content: "custom reply" }),
]);
expect(readRecentSessionMessages(scope, { maxMessages: 1 })).toEqual([
expect.objectContaining({ content: "custom reply" }),
]);
expect(readRecentSessionMessagesWithStats(scope, { maxMessages: 1 })).toMatchObject({
totalMessages: 2,
messages: [{ __openclaw: { seq: 2 }, content: "custom reply" }],
});
expect(readSessionMessageCount(scope)).toBe(2);
expect(readLatestSessionUsageFromTranscript(scope)).toMatchObject({
inputTokens: 1,
outputTokens: 2,
});
});
test("reads transcript event windows from SQLite for manual compaction", () => {
setupState();
const sessionId = "manual-window";

View File

@@ -126,6 +126,7 @@ function sqliteTranscriptEventToRecord(event: unknown): TailTranscriptRecord | n
function loadScopedTranscriptRecords(params: {
agentId?: string;
path?: string;
sessionId: string;
}): TailTranscriptRecord[] | undefined {
return loadScopedTranscriptEvents(params)?.flatMap((event) => {
@@ -136,6 +137,7 @@ function loadScopedTranscriptRecords(params: {
function loadScopedTranscriptTailRecords(params: {
agentId?: string;
path?: string;
maxBytes?: number;
maxEvents: number;
sessionId: string;
@@ -245,6 +247,7 @@ function transcriptRecordsToMessages(records: TailTranscriptRecord[]): unknown[]
function loadScopedSessionMessages(params: {
agentId?: string;
path?: string;
sessionId: string;
}): unknown[] | undefined {
const records = loadScopedTranscriptRecords(params);
@@ -253,6 +256,7 @@ function loadScopedSessionMessages(params: {
function loadScopedRecentSessionMessages(params: {
agentId?: string;
path?: string;
maxBytes?: number;
maxMessages: number;
maxLines?: number;
@@ -264,6 +268,7 @@ function loadScopedRecentSessionMessages(params: {
);
const records = loadScopedTranscriptTailRecords({
agentId: params.agentId,
path: params.path,
maxEvents,
sessionId: params.sessionId,
...(params.maxBytes !== undefined ? { maxBytes: params.maxBytes } : {}),
@@ -275,6 +280,7 @@ function loadScopedRecentSessionMessages(params: {
function countScopedSessionMessages(params: {
agentId?: string;
path?: string;
sessionId: string;
}): number | undefined {
if (!params.sessionId.trim()) {
@@ -283,6 +289,7 @@ function countScopedSessionMessages(params: {
try {
const scope = resolveSqliteSessionTranscriptScope({
agentId: params.agentId,
path: params.path,
sessionId: params.sessionId,
});
if (!scope || !hasSqliteSessionTranscriptEvents(scope)) {
@@ -330,6 +337,7 @@ export function readRecentSessionMessages(
return (
loadScopedRecentSessionMessages({
agentId: scope.agentId,
path: scope.path,
sessionId: scope.sessionId,
maxMessages,
...(opts?.maxBytes !== undefined ? { maxBytes: opts.maxBytes } : {}),
@@ -758,7 +766,13 @@ function extractLatestUsageFromTranscriptEvents(
return latest;
}
function loadUsageEvents(params: { sessionId: string; agentId?: string }): unknown[] | undefined {
function loadUsageEvents(params: {
sessionId: string;
agentId?: string;
path?: string;
}): unknown[] | undefined {
// Usage callers pass SessionTranscriptReadScope; keep the signature aligned so custom DB paths
// stay scoped even though most call sites only need the default agent database.
return loadScopedTranscriptEvents(params);
}