fix(sessions): preserve registered sqlite targets

This commit is contained in:
Peter Steinberger
2026-05-15 19:03:28 +01:00
parent 380eb4d35b
commit 2f4da78be3
9 changed files with 116 additions and 17 deletions

View File

@@ -162,7 +162,11 @@ export async function compactEmbeddedAgentSession(
agentId: params.agentId,
config: params.config,
});
const transcriptScope = { agentId: agentIds.sessionAgentId, sessionId: params.sessionId };
const transcriptScope = {
agentId: agentIds.sessionAgentId,
path: params.path,
sessionId: params.sessionId,
};
const agentDir = params.agentDir ?? resolveAgentDir(params.config ?? {}, agentIds.sessionAgentId);
const resolvedWorkspaceDir = resolveUserPath(params.workspaceDir);
const contextEngine = await resolveContextEngine(params.config, {
@@ -268,6 +272,7 @@ export async function compactEmbeddedAgentSession(
checkpointSnapshot = engineOwnsCompaction
? await captureCompactionCheckpointSnapshotAsync({
agentId: sessionAgentId,
path: params.path,
sessionId: params.sessionId,
})
: null;
@@ -317,6 +322,7 @@ export async function compactEmbeddedAgentSession(
let postCompactionSessionId = delegatedSessionId ?? params.sessionId;
let postCompactionTranscriptScope = {
agentId: agentIds.sessionAgentId,
path: params.path,
sessionId: postCompactionSessionId,
};
let postCompactionLeafId: string | undefined;
@@ -325,12 +331,14 @@ export async function compactEmbeddedAgentSession(
try {
const rotation = await rotateSqliteTranscriptAfterCompaction({
agentId: agentIds.sessionAgentId,
path: params.path,
sessionId: params.sessionId,
});
if (rotation.rotated) {
postCompactionSessionId = rotation.sessionId ?? postCompactionSessionId;
postCompactionTranscriptScope = {
agentId: agentIds.sessionAgentId,
path: params.path,
sessionId: postCompactionSessionId,
};
postCompactionLeafId = rotation.leafId;

View File

@@ -10,6 +10,7 @@ import type { SkillSnapshot } from "../skills.js";
export type CompactEmbeddedAgentSessionParams = {
sessionId: string;
agentId?: string;
path?: string;
runId?: string;
sessionKey?: string;
/** Session key used only for runtime policy/sandbox resolution. Defaults to sessionKey. */

View File

@@ -11,6 +11,7 @@ import { log } from "./logger.js";
type TranscriptScope = {
agentId: string;
path?: string;
sessionId: string;
};

View File

@@ -35,6 +35,7 @@ export function shouldRotateCompactionTranscript(config?: OpenClawConfig): boole
export async function rotateTranscriptAfterCompaction(params: {
sessionManager: ReadonlySessionManagerForRotation;
agentId: string;
path?: string;
sessionId: string;
now?: () => Date;
}): Promise<CompactionTranscriptRotation> {
@@ -71,6 +72,7 @@ export async function rotateTranscriptAfterCompaction(params: {
});
replaceSqliteSessionTranscriptEvents({
agentId,
path: params.path,
sessionId,
events: [header, ...successorEntries],
});
@@ -87,6 +89,7 @@ export async function rotateTranscriptAfterCompaction(params: {
export async function rotateSqliteTranscriptAfterCompaction(params: {
agentId: string;
path?: string;
sessionId: string;
now?: () => Date;
}): Promise<CompactionTranscriptRotation> {
@@ -97,6 +100,7 @@ export async function rotateSqliteTranscriptAfterCompaction(params: {
return rotateTranscriptAfterCompaction({
sessionManager: state,
agentId: params.agentId,
path: params.path,
sessionId: params.sessionId,
...(params.now ? { now: params.now } : {}),
});
@@ -104,6 +108,7 @@ export async function rotateSqliteTranscriptAfterCompaction(params: {
function loadTranscriptStateFromSqlite(params: {
agentId: string;
path?: string;
sessionId: string;
}): TranscriptState | null {
const sessionId = params.sessionId.trim();
@@ -111,7 +116,7 @@ function loadTranscriptStateFromSqlite(params: {
return null;
}
const agentId = normalizeAgentId(params.agentId);
const events = loadSqliteSessionTranscriptEvents({ agentId, sessionId }).map(
const events = loadSqliteSessionTranscriptEvents({ agentId, path: params.path, sessionId }).map(
(entry) => entry.event,
);
if (events.length === 0) {

View File

@@ -31,11 +31,12 @@ function resolveSessionDatabaseTarget(params: {
};
}
function dedupeTargetsByAgentId(targets: SessionDatabaseTarget[]): SessionDatabaseTarget[] {
function dedupeTargetsByDatabase(targets: SessionDatabaseTarget[]): SessionDatabaseTarget[] {
const deduped = new Map<string, SessionDatabaseTarget>();
for (const target of targets) {
if (!deduped.has(target.agentId)) {
deduped.set(target.agentId, target);
const key = `${target.agentId}\0${target.databasePath}`;
if (!deduped.has(key)) {
deduped.set(key, target);
}
}
return [...deduped.values()];
@@ -103,7 +104,7 @@ export function resolveAllAgentSessionDatabaseTargetsSync(
params: { env?: NodeJS.ProcessEnv } = {},
): SessionDatabaseTarget[] {
const env = params.env ?? process.env;
return dedupeTargetsByAgentId([
return dedupeTargetsByDatabase([
...resolveSessionStoreDiscoveryState(cfg, env),
...resolveRegisteredAgentDatabaseTargets(env),
]);
@@ -120,7 +121,7 @@ export function resolveAgentSessionDatabaseTargetsSync(
const registered = resolveRegisteredAgentDatabaseTargets(env).filter(
(target) => normalizeAgentId(target.agentId) === requested,
);
return dedupeTargetsByAgentId([...configured, ...registered]);
return dedupeTargetsByDatabase([...configured, ...registered]);
}
export async function resolveAllAgentSessionDatabaseTargets(

View File

@@ -152,6 +152,7 @@ export type ContextEngineMaintenanceResult = TranscriptRewriteResult;
export type ContextEngineTranscriptScope = {
agentId: string;
path?: string;
sessionId: string;
};

View File

@@ -243,6 +243,7 @@ function loadSessionRowsForTarget(target: ReturnType<typeof resolveGatewaySessio
} {
const entry = getSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: target.canonicalKey,
});
const store = entry ? { [target.canonicalKey]: entry } : {};
@@ -253,9 +254,14 @@ function loadSessionRowsForTarget(target: ReturnType<typeof resolveGatewaySessio
};
}
function loadAgentSessionRows(agentId: string): Record<string, SessionEntry> {
function loadAgentSessionRows(params: {
agentId: string;
databasePath: string;
}): Record<string, SessionEntry> {
return Object.fromEntries(
listSessionEntries({ agentId }).map(({ sessionKey, entry }) => [sessionKey, entry]),
listSessionEntries({ agentId: params.agentId, path: params.databasePath }).map(
({ sessionKey, entry }) => [sessionKey, entry],
),
);
}
@@ -428,10 +434,15 @@ function cloneCheckpointSessionEntry(params: {
function ensureSessionTranscriptScope(params: {
sessionId: string;
agentId: string;
databasePath: string;
}): { ok: true } | { ok: false; error: string } {
try {
if (
!hasSqliteSessionTranscriptEvents({ agentId: params.agentId, sessionId: params.sessionId })
!hasSqliteSessionTranscriptEvents({
agentId: params.agentId,
path: params.databasePath,
sessionId: params.sessionId,
})
) {
const header = {
type: "session",
@@ -442,6 +453,7 @@ function ensureSessionTranscriptScope(params: {
};
appendSqliteSessionTranscriptEvent({
agentId: params.agentId,
path: params.databasePath,
sessionId: params.sessionId,
event: header,
});
@@ -472,7 +484,7 @@ async function createAgentMainSessionForSend(params: {
cfg: params.cfg,
key: params.canonicalKey,
});
const store = loadAgentSessionRows(target.agentId);
const store = loadAgentSessionRows(target);
const patched = await applySessionsPatchToStore({
cfg: params.cfg,
store,
@@ -485,11 +497,13 @@ async function createAgentMainSessionForSend(params: {
}
upsertSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: target.canonicalKey,
entry: patched.entry,
});
const ensured = ensureSessionTranscriptScope({
agentId: target.agentId,
databasePath: target.databasePath,
sessionId: patched.entry.sessionId,
});
if (!ensured.ok) {
@@ -1023,7 +1037,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
continue;
}
const items = readSessionPreviewItemsFromTranscript(
{ agentId: target.agentId, sessionId: entry.sessionId },
{ agentId: target.agentId, path: target.databasePath, sessionId: entry.sessionId },
limit,
maxChars,
);
@@ -1265,7 +1279,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
: buildDashboardSessionKey(agentId);
const target = resolveGatewaySessionDatabaseTarget({ cfg, key });
const targetAgentId = resolveAgentIdFromSessionKey(target.canonicalKey);
const createdStore = loadAgentSessionRows(target.agentId);
const createdStore = loadAgentSessionRows(target);
const patched = await applySessionsPatchToStore({
cfg,
store: createdStore,
@@ -1291,6 +1305,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
if (created.ok) {
upsertSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: target.canonicalKey,
entry: created.entry,
});
@@ -1302,10 +1317,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const ensured = ensureSessionTranscriptScope({
sessionId: created.entry.sessionId,
agentId: targetAgentId,
databasePath: target.databasePath,
});
if (!ensured.ok) {
deleteSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: target.canonicalKey,
});
respond(
@@ -1325,6 +1342,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const messageSeq = initialMessage
? (await readSessionMessageCountAsync({
agentId: target.agentId,
path: target.databasePath,
sessionId: createdEntry.sessionId,
})) + 1
: undefined;
@@ -1452,6 +1470,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
const branchedSession = await forkCompactionCheckpointTranscriptAsync({
agentId: target.agentId,
path: target.databasePath,
sourceSessionId: checkpoint.preCompaction.sessionId,
});
if (!branchedSession?.sessionId) {
@@ -1474,6 +1493,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
upsertSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: nextKey,
entry: nextEntry,
});
@@ -1567,6 +1587,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const target = resolveGatewaySessionDatabaseTarget({ cfg: loaded.cfg, key: canonicalKey });
const restoredSession = await forkCompactionCheckpointTranscriptAsync({
agentId: target.agentId,
path: target.databasePath,
sourceSessionId: checkpoint.preCompaction.sessionId,
});
if (!restoredSession?.sessionId) {
@@ -1586,6 +1607,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
upsertSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: canonicalKey,
entry: nextEntry,
});
@@ -1795,7 +1817,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const { cfg, target } = resolveGatewaySessionTargetFromKey(key, context.getRuntimeConfig());
const loaded = loadSessionRowsForTarget(target);
const patchStore = loadAgentSessionRows(target.agentId);
const patchStore = loadAgentSessionRows(target);
if (loaded.entry) {
patchStore[target.canonicalKey] = loaded.entry;
}
@@ -1812,6 +1834,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
upsertSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: target.canonicalKey,
entry: applied.entry,
});
@@ -2010,6 +2033,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const deleted = entry
? deleteSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: deleteKey,
})
: false;
@@ -2056,7 +2080,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const { messages } = await readRecentSessionMessagesWithStatsAsync(
{ agentId: target.agentId, sessionId: entry.sessionId },
{ agentId: target.agentId, path: target.databasePath, sessionId: entry.sessionId },
{
maxMessages: limit,
maxLines: limit * 20 + 20,
@@ -2100,7 +2124,13 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
if (!hasSqliteSessionTranscriptEvents({ agentId: target.agentId, sessionId })) {
if (
!hasSqliteSessionTranscriptEvents({
agentId: target.agentId,
path: target.databasePath,
sessionId,
})
) {
respond(
true,
{
@@ -2136,6 +2166,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const result = await compactEmbeddedPiSession({
sessionId,
agentId: target.agentId,
path: target.databasePath,
sessionKey: target.canonicalKey,
allowGatewaySubagentBinding: true,
workspaceDir,
@@ -2156,6 +2187,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
if (result.ok && result.compacted) {
await patchSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: target.canonicalKey,
fallbackEntry: entry,
update: (entryToUpdate) => {
@@ -2205,6 +2237,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const tail = readRecentSessionTranscriptEvents({
sessionId,
agentId: target.agentId,
path: target.databasePath,
maxEvents: maxLines,
});
const events = tail?.events ?? [];
@@ -2225,12 +2258,14 @@ export const sessionsHandlers: GatewayRequestHandlers = {
replaceSqliteSessionTranscriptEvents({
agentId: target.agentId,
path: target.databasePath,
sessionId,
events,
});
await patchSessionEntry({
agentId: target.agentId,
path: target.databasePath,
sessionKey: target.canonicalKey,
fallbackEntry: entry,
update: (entryToUpdate) => {

View File

@@ -34,6 +34,7 @@ export type ReadRecentSessionMessagesOptions = {
export type SessionTranscriptReadScope = {
agentId?: string;
path?: string;
sessionId: string;
};
@@ -57,6 +58,7 @@ function normalizeTailEntryString(value: unknown): string | undefined {
function loadScopedTranscriptEvents(params: {
agentId?: string;
path?: string;
sessionId: string;
}): unknown[] | undefined {
if (!params.sessionId.trim()) {
@@ -65,6 +67,7 @@ function loadScopedTranscriptEvents(params: {
try {
const scope = resolveSqliteSessionTranscriptScope({
agentId: params.agentId,
path: params.path,
sessionId: params.sessionId,
});
if (!scope || !hasSqliteSessionTranscriptEvents(scope)) {
@@ -78,6 +81,7 @@ function loadScopedTranscriptEvents(params: {
function loadScopedTranscriptTailEvents(params: {
agentId?: string;
path?: string;
maxBytes?: number;
maxEvents: number;
sessionId: string;
@@ -88,6 +92,7 @@ function loadScopedTranscriptTailEvents(params: {
try {
const scope = resolveSqliteSessionTranscriptScope({
agentId: params.agentId,
path: params.path,
sessionId: params.sessionId,
});
if (!scope || !hasSqliteSessionTranscriptEvents(scope)) {
@@ -409,10 +414,12 @@ export async function readRecentSessionMessagesWithStatsAsync(
export function readRecentSessionTranscriptEvents(params: {
sessionId: string;
agentId?: string;
path?: string;
maxEvents: number;
}): { events: unknown[]; totalEvents: number } | null {
const events = loadScopedTranscriptEvents({
agentId: params.agentId,
path: params.path,
sessionId: params.sessionId,
});
if (!events) {

View File

@@ -4,10 +4,11 @@ import path from "node:path";
import { afterEach, describe, expect, test, vi } from "vitest";
import { resetConfigRuntimeState } from "../config/config.js";
import type { OpenClawConfig } from "../config/config.js";
import type { SessionEntry } from "../config/sessions.js";
import { upsertSessionEntry, type SessionEntry } from "../config/sessions.js";
import { replaceSqliteSessionTranscriptEvents } from "../config/sessions/transcript-store.sqlite.js";
import { createEmptyPluginRegistry } from "../plugins/registry-empty.js";
import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../plugins/runtime.js";
import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js";
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
import { withStateDirEnv } from "../test-helpers/state-dir-env.js";
import {
@@ -749,6 +750,45 @@ describe("gateway session utils", () => {
expect(target.canonicalKey).toBe("agent:ops:main");
});
test("resolveGatewaySessionDatabaseTarget preserves registered database path for existing rows", () => {
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "session-target-registered-db-"));
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = tmpDir;
try {
const databasePath = path.join(tmpDir, "retired", "openclaw-agent.sqlite");
const cfg = {
session: { mainKey: "main" },
agents: { list: [{ id: "main", default: true }] },
} as OpenClawConfig;
upsertSessionEntry({
agentId: "main",
path: databasePath,
sessionKey: "agent:main:archived",
entry: {
sessionId: "archived-session",
updatedAt: Date.parse("2026-05-01T00:00:00.000Z"),
},
});
const target = resolveGatewaySessionDatabaseTarget({
cfg,
key: "agent:main:archived",
});
expect(target.databasePath).toBe(databasePath);
expect(target.canonicalKey).toBe("agent:main:archived");
} finally {
closeOpenClawAgentDatabasesForTest();
closeOpenClawStateDatabaseForTest();
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
fs.rmSync(tmpDir, { recursive: true, force: true });
}
});
test("listAgentsForGateway rejects avatar symlink escapes outside workspace", () => {
const root = fs.mkdtempSync(path.join(os.tmpdir(), "session-utils-avatar-outside-"));
const workspace = path.join(root, "workspace");