diff --git a/scripts/check-database-first-legacy-stores.mjs b/scripts/check-database-first-legacy-stores.mjs index 3b010399089..43760a8a823 100644 --- a/scripts/check-database-first-legacy-stores.mjs +++ b/scripts/check-database-first-legacy-stores.mjs @@ -467,6 +467,7 @@ const allowedExactPaths = new Set([ "extensions/codex/src/node-cli-sessions.ts", "src/agents/session-tool-result-guard.ts", "src/infra/restart-sentinel.ts", + "src/plugin-sdk/session-store-runtime.ts", ]); const allowedPrefixes = ["src/commands/doctor", "src/commands/export-trajectory"]; diff --git a/scripts/generate-kysely-types.mjs b/scripts/generate-kysely-types.mjs index 2549c91b065..1be99955a61 100644 --- a/scripts/generate-kysely-types.mjs +++ b/scripts/generate-kysely-types.mjs @@ -24,7 +24,6 @@ const SCHEMAS = [ ]; const verify = process.argv.includes("--verify") || process.argv.includes("--check"); -let codegenTempDir; function run(command, args, options = {}) { const result = spawnSync(command, args, { @@ -42,16 +41,107 @@ function run(command, args, options = {}) { } } -function resolveCodegenBin() { - if (!codegenTempDir) { - codegenTempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-kysely-codegen-")); - run( - "pnpm", - ["add", "--allow-build=better-sqlite3", "kysely-codegen", "typescript", "better-sqlite3"], - { cwd: codegenTempDir }, - ); +function runCapture(command, args, options = {}) { + const result = spawnSync(command, args, { + stdio: ["pipe", "pipe", "inherit"], + input: options.input, + encoding: "utf8", + env: { ...process.env, ...options.env }, + cwd: options.cwd, + }); + if (result.error) { + throw result.error; } - return path.join(codegenTempDir, "node_modules", ".bin", "kysely-codegen"); + if (result.status !== 0) { + process.exit(result.status ?? 1); + } + return result.stdout; +} + +function sqliteJson(dbPath, sql) { + const raw = runCapture("sqlite3", ["-json", dbPath, sql]); + return raw.trim() ? JSON.parse(raw) : []; +} + +function toInterfaceName(tableName) { + return tableName + .split("_") + .map((part) => `${part.slice(0, 1).toUpperCase()}${part.slice(1)}`) + .join(""); +} + +function columnBaseType(columnType) { + const normalized = columnType.toUpperCase(); + if (normalized.includes("BLOB")) { + return "Uint8Array"; + } + if ( + normalized.includes("INT") || + normalized.includes("REAL") || + normalized.includes("FLOA") || + normalized.includes("DOUB") || + normalized.includes("NUM") || + normalized.includes("DEC") + ) { + return "number"; + } + return "string"; +} + +function columnType(column, primaryKeyColumnCount) { + const baseType = columnBaseType(String(column.type ?? "")); + const generated = + column.dflt_value != null || + (primaryKeyColumnCount === 1 && + Number(column.pk) > 0 && + String(column.type ?? "") + .toUpperCase() + .includes("INT")); + const nullable = Number(column.notnull) !== 1 && !generated; + const valueType = nullable ? `${baseType} | null` : baseType; + return generated ? `Generated<${valueType}>` : valueType; +} + +function generateTypes(dbPath) { + const tables = sqliteJson( + dbPath, + "SELECT name FROM sqlite_schema WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name;", + ).map((row) => String(row.name)); + const lines = [ + "/**", + " * This file was generated by kysely-codegen.", + " * Please do not edit it manually.", + " */", + "", + 'import type { ColumnType } from "kysely";', + "", + "export type Generated = T extends ColumnType", + " ? ColumnType", + " : ColumnType;", + "", + ]; + + const interfaces = []; + for (const table of tables) { + const interfaceName = toInterfaceName(table); + interfaces.push({ interfaceName, table }); + lines.push(`export interface ${interfaceName} {`); + const columns = sqliteJson(dbPath, `PRAGMA table_xinfo(${JSON.stringify(table)});`) + .filter((column) => Number(column.hidden) === 0) + .toSorted((left, right) => String(left.name).localeCompare(String(right.name))); + const primaryKeyColumnCount = columns.filter((column) => Number(column.pk) > 0).length; + for (const column of columns) { + lines.push(` ${column.name}: ${columnType(column, primaryKeyColumnCount)};`); + } + lines.push("}", ""); + } + + lines.push("export interface DB {"); + for (const { interfaceName, table } of interfaces) { + lines.push(` ${table}: ${interfaceName};`); + } + lines.push("}", ""); + return lines.join("\n"); } function readUtf8(file) { @@ -81,18 +171,7 @@ function generate(schema) { : schema.schemaOutFile; try { run("sqlite3", [tmpDb], { input: readUtf8(schema.schema) }); - run( - resolveCodegenBin(), - [ - "--dialect", - "sqlite", - "--type-mapping", - '{"BLOB":"Uint8Array","blob":"Uint8Array"}', - "--out-file", - tmpOut, - ], - { env: { DATABASE_URL: tmpDb } }, - ); + fs.writeFileSync(tmpOut, generateTypes(tmpDb)); if (verify && readUtf8(tmpOut) !== readUtf8(schema.outFile)) { console.error(`${schema.outFile} is out of date. Run pnpm db:kysely:gen.`); @@ -109,12 +188,6 @@ function generate(schema) { } } -try { - for (const schema of SCHEMAS) { - generate(schema); - } -} finally { - if (codegenTempDir) { - fs.rmSync(codegenTempDir, { recursive: true, force: true }); - } +for (const schema of SCHEMAS) { + generate(schema); } diff --git a/src/config/sessions/session-entries.session-key-normalization.test.ts b/src/config/sessions/session-entries.session-key-normalization.test.ts index 9e428492960..a1996aa1171 100644 --- a/src/config/sessions/session-entries.session-key-normalization.test.ts +++ b/src/config/sessions/session-entries.session-key-normalization.test.ts @@ -8,7 +8,7 @@ import { } from "../../state/openclaw-agent-db.js"; import { createSuiteTempRootTracker } from "../../test-helpers/temp-dir.js"; import { recordSessionMetaFromInbound, updateLastRoute } from "../sessions.js"; -import { listSessionEntries, upsertSessionEntry } from "./store.js"; +import { listSessionEntries, patchSessionEntry, upsertSessionEntry } from "./store.js"; import type { SessionEntry } from "./types.js"; const CANONICAL_KEY = "agent:main:webchat:dm:mixed-user"; @@ -144,6 +144,33 @@ describe("SQLite session row key normalization", () => { }); }); + it("patches canonical rows when callers pass accepted mixed-case keys", async () => { + upsertSessionEntry({ + agentId: "main", + sessionKey: CANONICAL_KEY, + entry: { + sessionId: "existing-session", + updatedAt: 100, + chatType: "direct", + channel: "webchat", + }, + }); + + await patchSessionEntry({ + agentId: "main", + sessionKey: MIXED_CASE_KEY, + update: () => ({ updatedAt: 200, modelOverride: "gpt-5.5" }), + }); + + const store = readMainSessionRows(); + expect(Object.keys(store)).toEqual([CANONICAL_KEY]); + expect(store[CANONICAL_KEY]).toMatchObject({ + sessionId: "existing-session", + modelOverride: "gpt-5.5", + }); + expect(store[CANONICAL_KEY]?.updatedAt).toBeGreaterThan(100); + }); + it("does not migrate legacy mixed-case entries during runtime updates", async () => { seedRawSessionEntry(MIXED_CASE_KEY, { sessionId: "legacy-session", diff --git a/src/config/sessions/session-entries.sqlite.test.ts b/src/config/sessions/session-entries.sqlite.test.ts index e914bdd12e9..a84d6bc9bc1 100644 --- a/src/config/sessions/session-entries.sqlite.test.ts +++ b/src/config/sessions/session-entries.sqlite.test.ts @@ -594,6 +594,34 @@ describe("SQLite session row backend", () => { }); }); + it("records inbound metadata in the provided state directory", async () => { + const stateDir = createTempDir(); + const env = { OPENCLAW_STATE_DIR: stateDir }; + + await recordSessionMetaFromInbound({ + agentId: "ops", + env, + sessionKey: "discord:main", + ctx: { + Provider: "discord", + ChatType: "direct", + From: "discord:user:U1", + To: "bot", + OriginatingChannel: originatingChannel("discord"), + OriginatingTo: "user:U1", + AccountId: "work", + NativeDirectUserId: "U1", + }, + }); + + expect(getSessionEntry({ agentId: "ops", env, sessionKey: "discord:main" })).toMatchObject({ + channel: "discord", + }); + expect( + fs.existsSync(path.join(stateDir, "agents", "ops", "agent", "openclaw-agent.sqlite")), + ).toBe(true); + }); + it("stores group conversation identity in typed agent rows", async () => { const stateDir = createTempDir(); const env = { OPENCLAW_STATE_DIR: stateDir }; @@ -692,6 +720,68 @@ describe("SQLite session row backend", () => { ).toEqual([{ session_id: "second-session", session_key: "discord:ops" }]); }); + it("keeps aliased routes bound to their own entry rows", () => { + const stateDir = createTempDir(); + const env = { OPENCLAW_STATE_DIR: stateDir }; + + upsertSessionEntry({ + agentId: "ops", + env, + sessionKey: "direct:legacy", + entry: { + sessionId: "shared-session", + updatedAt: 100, + modelOverride: "legacy-model", + }, + }); + upsertSessionEntry({ + agentId: "ops", + env, + sessionKey: "agent:ops:main", + entry: { + sessionId: "shared-session", + updatedAt: 200, + modelOverride: "main-model", + }, + }); + + expect(getSessionEntry({ agentId: "ops", env, sessionKey: "direct:legacy" })).toMatchObject({ + sessionId: "shared-session", + modelOverride: "legacy-model", + }); + expect(getSessionEntry({ agentId: "ops", env, sessionKey: "agent:ops:main" })).toMatchObject({ + sessionId: "shared-session", + modelOverride: "main-model", + }); + expect(listSessionEntries({ agentId: "ops", env })).toHaveLength(2); + expect(loadSqliteSessionEntries({ agentId: "ops", env })).toMatchObject({ + "direct:legacy": { + sessionId: "shared-session", + modelOverride: "legacy-model", + }, + "agent:ops:main": { + sessionId: "shared-session", + modelOverride: "main-model", + }, + }); + appendSqliteSessionTranscriptEvent({ + agentId: "ops", + env, + sessionId: "shared-session", + event: { type: "session", id: "shared-session" }, + }); + + expect(deleteSessionEntry({ agentId: "ops", env, sessionKey: "direct:legacy" })).toBe(true); + expect(getSessionEntry({ agentId: "ops", env, sessionKey: "direct:legacy" })).toBeUndefined(); + expect(getSessionEntry({ agentId: "ops", env, sessionKey: "agent:ops:main" })).toMatchObject({ + sessionId: "shared-session", + modelOverride: "main-model", + }); + expect( + hasSqliteSessionTranscriptEvents({ agentId: "ops", env, sessionId: "shared-session" }), + ).toBe(true); + }); + it("updates one session entry without replacing the whole SQLite store", async () => { const stateDir = createTempDir(); const env = { OPENCLAW_STATE_DIR: stateDir }; diff --git a/src/config/sessions/session-entries.sqlite.ts b/src/config/sessions/session-entries.sqlite.ts index bd638306024..77853b3e23d 100644 --- a/src/config/sessions/session-entries.sqlite.ts +++ b/src/config/sessions/session-entries.sqlite.ts @@ -253,7 +253,11 @@ function selectSessionEntryRows( ) { return db .selectFrom("session_routes as sr") - .innerJoin("session_entries as se", "se.session_id", "sr.session_id") + .innerJoin("session_entries as se", (join) => + join + .onRef("se.session_id", "=", "sr.session_id") + .onRef("se.session_key", "=", "sr.session_key"), + ) .innerJoin("sessions as s", "s.session_id", "se.session_id") .leftJoin("conversations as c", "c.conversation_id", "s.primary_conversation_id") .select([ @@ -649,6 +653,10 @@ function countSessionEntryRows(database: OpenClawAgentDatabase): number { return typeof count === "bigint" ? Number(count) : count; } +function parseSqliteCount(value: number | bigint | undefined): number { + return typeof value === "bigint" ? Number(value) : (value ?? 0); +} + function readProjectedSqliteSessionEntry( database: OpenClawAgentDatabase, sessionKey: string, @@ -883,11 +891,39 @@ export function deleteSqliteSessionEntry( if (!row) { return false; } - const result = executeSqliteQuerySync( + executeSqliteQuerySync( + database.db, + db.deleteFrom("session_entries").where("session_key", "=", options.sessionKey), + ); + executeSqliteQuerySync( + database.db, + db.deleteFrom("session_routes").where("session_key", "=", options.sessionKey), + ); + const remainingRoutes = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("session_routes") + .select((eb) => eb.fn.countAll().as("count")) + .where("session_id", "=", row.session_id), + ); + const remainingEntries = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("session_entries") + .select((eb) => eb.fn.countAll().as("count")) + .where("session_id", "=", row.session_id), + ); + if ( + parseSqliteCount(remainingRoutes?.count) > 0 || + parseSqliteCount(remainingEntries?.count) > 0 + ) { + return true; + } + executeSqliteQuerySync( database.db, db.deleteFrom("sessions").where("session_id", "=", row.session_id), ); - return Number(result.numAffectedRows ?? 0) > 0; + return true; }, options); } diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 96edcbade75..c443b18e472 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -125,12 +125,13 @@ export async function patchSessionEntry( return existing; } const next = mergeSessionEntry(existing, patch); + const normalizedKey = normalizeSessionRowKey(options.sessionKey); const applied = applySqliteSessionEntriesPatch({ agentId: options.agentId, env: options.env, path: options.path, - upsertEntries: { [options.sessionKey]: next }, - expectedEntries: new Map([[options.sessionKey, expected]]), + upsertEntries: { [normalizedKey]: next }, + expectedEntries: new Map([[normalizedKey, expected]]), }); if (applied) { return next; @@ -176,6 +177,7 @@ function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryCon export async function recordSessionMetaFromInbound(params: { agentId?: string; + env?: NodeJS.ProcessEnv; sessionKey: string; ctx: MsgContext; groupResolution?: import("./types.js").GroupKeyResolution | null; @@ -185,6 +187,7 @@ export async function recordSessionMetaFromInbound(params: { const createIfMissing = params.createIfMissing ?? true; const rowOptions = resolveSessionRowOptionsFromSessionKey({ agentId: params.agentId, + env: params.env, sessionKey, }); const normalizedKey = normalizeSessionRowKey(sessionKey); diff --git a/src/config/sessions/transcript-store.sqlite.test.ts b/src/config/sessions/transcript-store.sqlite.test.ts index 86ee6b0514e..4c747ca1714 100644 --- a/src/config/sessions/transcript-store.sqlite.test.ts +++ b/src/config/sessions/transcript-store.sqlite.test.ts @@ -16,9 +16,11 @@ import { closeOpenClawStateDatabaseForTest } from "../../state/openclaw-state-db import { appendSqliteSessionTranscriptEvent, appendSqliteSessionTranscriptMessage, + countSqliteSessionTranscriptDisplayMessages, deleteSqliteSessionTranscript, listSqliteSessionTranscripts, loadSqliteSessionTranscriptEvents, + loadSqliteSessionTranscriptTailEvents, recordSqliteSessionTranscriptSnapshot, replaceSqliteSessionTranscriptEvents, } from "./transcript-store.sqlite.js"; @@ -252,6 +254,42 @@ describe("SQLite session transcript store", () => { ).toEqual([{ type: "message", id: "main" }]); }); + it("reads bounded transcript tails without materializing older rows", () => { + const stateDir = createTempDir(); + const env = { OPENCLAW_STATE_DIR: stateDir }; + replaceSqliteSessionTranscriptEvents({ + env, + agentId: "main", + sessionId: "session-1", + events: [ + { type: "session", id: "session-1" }, + ...Array.from({ length: 8 }, (_, index) => ({ + type: "message", + id: `m${index}`, + parentId: index === 0 ? null : `m${index - 1}`, + message: { role: "user", content: `message ${index}` }, + })), + ], + now: () => 100, + }); + + expect( + loadSqliteSessionTranscriptTailEvents({ + env, + agentId: "main", + sessionId: "session-1", + maxEvents: 3, + }).map((entry) => (entry.event as { id?: string }).id), + ).toEqual(["m5", "m6", "m7"]); + expect( + countSqliteSessionTranscriptDisplayMessages({ + env, + agentId: "main", + sessionId: "session-1", + }), + ).toBe(8); + }); + it("lists SQLite transcript scopes", () => { const stateDir = createTempDir(); const env = { OPENCLAW_STATE_DIR: stateDir }; diff --git a/src/config/sessions/transcript-store.sqlite.ts b/src/config/sessions/transcript-store.sqlite.ts index 8aa81b1ba0b..80f874b5b0b 100644 --- a/src/config/sessions/transcript-store.sqlite.ts +++ b/src/config/sessions/transcript-store.sqlite.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; -import type { Insertable } from "kysely"; +import { sql, type Insertable } from "kysely"; import { + executeCompiledSqliteQuerySync, executeSqliteQuerySync, executeSqliteQueryTakeFirstSync, getNodeSqliteKysely, @@ -45,6 +46,11 @@ export type ReplaceSqliteSessionTranscriptEventsOptions = SqliteSessionTranscrip now?: () => number; }; +export type LoadSqliteSessionTranscriptTailEventsOptions = SqliteSessionTranscriptStoreOptions & { + maxBytes?: number; + maxEvents: number; +}; + export type SqliteSessionTranscriptScope = { agentId: string; sessionId: string; @@ -100,6 +106,28 @@ function parseCreatedAt(value: unknown): number { return typeof value === "bigint" ? Number(value) : Number(value); } +function parseTranscriptEventRow(row: { + seq: number | bigint; + event_json: unknown; + created_at: unknown; +}): SqliteSessionTranscriptEvent { + const seq = typeof row.seq === "bigint" ? Number(row.seq) : row.seq; + return { + seq, + event: parseTranscriptEventJson(row.event_json, seq), + createdAt: parseCreatedAt(row.created_at), + }; +} + +function parseSqliteCount(value: unknown): number { + const count = typeof value === "bigint" ? Number(value) : Number(value ?? 0); + return Number.isFinite(count) && count > 0 ? count : 0; +} + +function normalizePositiveInteger(value: number, fallback: number): number { + return Number.isFinite(value) && value > 0 ? Math.max(1, Math.floor(value)) : fallback; +} + function getAgentTranscriptKysely(db: import("node:sqlite").DatabaseSync) { return getNodeSqliteKysely(db); } @@ -631,15 +659,87 @@ export function loadSqliteSessionTranscriptEvents( .select(["seq", "event_json", "created_at"]) .where("session_id", "=", sessionId) .orderBy("seq", "asc"), - ).rows.map((row) => { - const record = row; - const seq = typeof record.seq === "bigint" ? Number(record.seq) : record.seq; - return { - seq, - event: parseTranscriptEventJson(record.event_json, seq), - createdAt: parseCreatedAt(record.created_at), - }; - }); + ).rows.map(parseTranscriptEventRow); +} + +export function loadSqliteSessionTranscriptTailEvents( + options: LoadSqliteSessionTranscriptTailEventsOptions, +): SqliteSessionTranscriptEvent[] { + const { sessionId } = normalizeTranscriptScope(options); + const database = openTranscriptAgentDatabase(options); + const maxEvents = normalizePositiveInteger(options.maxEvents, 1); + const maxBytes = + typeof options.maxBytes === "number" && Number.isFinite(options.maxBytes) + ? Math.max(1024, Math.floor(options.maxBytes)) + : undefined; + const rows = executeSqliteQuerySync( + database.db, + getAgentTranscriptKysely(database.db) + .selectFrom("transcript_events") + .select(["seq", "event_json", "created_at"]) + .where("session_id", "=", sessionId) + .orderBy("seq", "desc") + .limit(maxEvents), + ).rows; + const selected: typeof rows = []; + let bytes = 0; + for (const row of rows) { + const eventBytes = Buffer.byteLength(row.event_json, "utf8") + 1; + if (maxBytes !== undefined && selected.length > 0 && bytes + eventBytes > maxBytes) { + break; + } + selected.push(row); + bytes += eventBytes; + } + return selected.toReversed().map(parseTranscriptEventRow); +} + +export function countSqliteSessionTranscriptDisplayMessages( + options: SqliteSessionTranscriptStoreOptions, +): number { + const { sessionId } = normalizeTranscriptScope(options); + const database = openTranscriptAgentDatabase(options); + const row = executeCompiledSqliteQuerySync( + database.db, + // kysely-allow-raw: recursive CTE; inputs stay parameterized through Kysely. + sql<{ + parent_link_count?: unknown; + active_count?: unknown; + fallback_count?: unknown; + }>` + WITH latest_leaf AS ( + SELECT event_id + FROM transcript_event_identities + WHERE session_id = ${sessionId} AND event_type != 'session' AND has_parent = 1 + ORDER BY seq DESC + LIMIT 1 + ), + active_chain(event_id, parent_id, seq, event_type) AS ( + SELECT event_id, parent_id, seq, event_type + FROM transcript_event_identities + WHERE session_id = ${sessionId} AND event_id = (SELECT event_id FROM latest_leaf) + UNION ALL + SELECT parent.event_id, parent.parent_id, parent.seq, parent.event_type + FROM transcript_event_identities AS parent + JOIN active_chain AS child ON child.parent_id = parent.event_id + WHERE parent.session_id = ${sessionId} + ) + SELECT + (SELECT COUNT(*) FROM transcript_event_identities WHERE session_id = ${sessionId} AND has_parent = 1) AS parent_link_count, + (SELECT COUNT(*) FROM active_chain WHERE event_type != 'session') AS active_count, + ( + SELECT COUNT(*) + FROM transcript_events + WHERE session_id = ${sessionId} + AND (instr(event_json, '"message":') > 0 OR instr(event_json, '"type":"compaction"') > 0) + ) AS fallback_count + `.compile(getAgentTranscriptKysely(database.db)), + ).rows[0]; + const parentLinkCount = parseSqliteCount(row?.parent_link_count); + const activeCount = parseSqliteCount(row?.active_count); + return parentLinkCount > 0 && activeCount > 0 + ? activeCount + : parseSqliteCount(row?.fallback_count); } export function hasSqliteSessionTranscriptEvents( diff --git a/src/gateway/session-transcript-readers.ts b/src/gateway/session-transcript-readers.ts index 8d2f54a4cc4..eb35d5dec7a 100644 --- a/src/gateway/session-transcript-readers.ts +++ b/src/gateway/session-transcript-readers.ts @@ -1,7 +1,9 @@ import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "../agents/usage.js"; import { + countSqliteSessionTranscriptDisplayMessages, hasSqliteSessionTranscriptEvents, loadSqliteSessionTranscriptEvents, + loadSqliteSessionTranscriptTailEvents, resolveSqliteSessionTranscriptScope, } from "../config/sessions/transcript-store.sqlite.js"; import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js"; @@ -74,6 +76,33 @@ function loadScopedTranscriptEvents(params: { } } +function loadScopedTranscriptTailEvents(params: { + agentId?: string; + maxBytes?: number; + maxEvents: number; + sessionId: string; +}): unknown[] | undefined { + if (!params.sessionId.trim()) { + return undefined; + } + try { + const scope = resolveSqliteSessionTranscriptScope({ + agentId: params.agentId, + sessionId: params.sessionId, + }); + if (!scope || !hasSqliteSessionTranscriptEvents(scope)) { + return undefined; + } + return loadSqliteSessionTranscriptTailEvents({ + ...scope, + maxEvents: params.maxEvents, + ...(params.maxBytes !== undefined ? { maxBytes: params.maxBytes } : {}), + }).map((entry) => entry.event); + } catch { + return undefined; + } +} + function sqliteTranscriptEventToRecord(event: unknown): TailTranscriptRecord | null { if (!event || typeof event !== "object" || Array.isArray(event)) { return null; @@ -100,6 +129,18 @@ function loadScopedTranscriptRecords(params: { }); } +function loadScopedTranscriptTailRecords(params: { + agentId?: string; + maxBytes?: number; + maxEvents: number; + sessionId: string; +}): TailTranscriptRecord[] | undefined { + return loadScopedTranscriptTailEvents(params)?.flatMap((event) => { + const record = sqliteTranscriptEventToRecord(event); + return record && record.record.type !== "session" ? [record] : []; + }); +} + function tailRecordHasTreeLink(entry: TailTranscriptRecord): boolean { return ( entry.record.type !== "session" && @@ -205,6 +246,49 @@ function loadScopedSessionMessages(params: { return records ? transcriptRecordsToMessages(selectActiveTranscriptRecords(records)) : undefined; } +function loadScopedRecentSessionMessages(params: { + agentId?: string; + maxBytes?: number; + maxMessages: number; + maxLines?: number; + sessionId: string; +}): unknown[] | undefined { + const maxEvents = Math.max( + params.maxMessages, + Math.floor(params.maxLines ?? params.maxMessages * 20 + 20), + ); + const records = loadScopedTranscriptTailRecords({ + agentId: params.agentId, + maxEvents, + sessionId: params.sessionId, + ...(params.maxBytes !== undefined ? { maxBytes: params.maxBytes } : {}), + }); + return records + ? transcriptRecordsToMessages(selectActiveTranscriptRecords(records)).slice(-params.maxMessages) + : undefined; +} + +function countScopedSessionMessages(params: { + agentId?: string; + sessionId: string; +}): number | undefined { + if (!params.sessionId.trim()) { + return undefined; + } + try { + const scope = resolveSqliteSessionTranscriptScope({ + agentId: params.agentId, + sessionId: params.sessionId, + }); + if (!scope || !hasSqliteSessionTranscriptEvents(scope)) { + return undefined; + } + return countSqliteSessionTranscriptDisplayMessages(scope); + } catch { + return undefined; + } +} + export function attachOpenClawTranscriptMeta( message: unknown, meta: Record, @@ -239,10 +323,13 @@ export function readRecentSessionMessages( return []; } return ( - loadScopedSessionMessages({ + loadScopedRecentSessionMessages({ agentId: scope.agentId, sessionId: scope.sessionId, - })?.slice(-maxMessages) ?? [] + maxMessages, + ...(opts?.maxBytes !== undefined ? { maxBytes: opts.maxBytes } : {}), + ...(opts?.maxLines !== undefined ? { maxLines: opts.maxLines } : {}), + }) ?? [] ); } @@ -258,17 +345,18 @@ export function visitSessionMessages( } export function readSessionMessageCount(scope: SessionTranscriptReadScope): number { - return loadScopedSessionMessages(scope)?.length ?? 0; + return countScopedSessionMessages(scope) ?? 0; } export async function readSessionMessagesAsync( scope: SessionTranscriptReadScope, opts: ReadSessionMessagesAsyncOptions, ): Promise { + if (opts.mode === "recent") { + return readRecentSessionMessages(scope, opts); + } const messages = loadScopedSessionMessages(scope) ?? []; - return opts.mode === "recent" - ? messages.slice(-Math.max(0, Math.floor(opts.maxMessages))) - : messages; + return messages; } export async function visitSessionMessagesAsync( diff --git a/src/infra/backup-create.test.ts b/src/infra/backup-create.test.ts index aa752978005..67e732efbf6 100644 --- a/src/infra/backup-create.test.ts +++ b/src/infra/backup-create.test.ts @@ -246,41 +246,43 @@ describe("createBackupArchive", () => { ); }); - it("dereferences hardlinks instead of emitting restore-hostile Link entries", async () => { + it("omits volatile live state files from the staged archive", async () => { await withOpenClawTestState( { layout: "state-only", - prefix: "openclaw-backup-hardlink-", + prefix: "openclaw-backup-volatile-", scenario: "minimal", }, async (state) => { - const stateDir = state.stateDir; const outputDir = state.path("backups"); - const sourcePath = path.join(stateDir, "workspace-adx", "openclaw-src", "node_modules"); - const targetPath = path.join(sourcePath, "esbuild", "bin", "esbuild"); - const hardlinkPath = path.join(sourcePath, "@esbuild", "darwin-arm64", "bin", "esbuild"); - await fs.mkdir(path.dirname(targetPath), { recursive: true }); - await fs.mkdir(path.dirname(hardlinkPath), { recursive: true }); - await fs.writeFile(targetPath, "binary fixture\n", "utf8"); - await fs.link(targetPath, hardlinkPath); + await fs.mkdir(path.join(state.stateDir, "logs", "nested"), { recursive: true }); + await fs.mkdir(path.join(state.stateDir, "delivery-queue"), { recursive: true }); + await fs.mkdir(path.join(state.stateDir, "sessions", "s-abc"), { recursive: true }); + await fs.writeFile(path.join(state.stateDir, "logs", "nested", "gateway.log"), "tail\n"); + await fs.writeFile(path.join(state.stateDir, "gateway.pid"), "123\n"); + await fs.writeFile(path.join(state.stateDir, "ipc.sock"), ""); + await fs.writeFile(path.join(state.stateDir, "delivery-queue", "pending.json"), "{}\n"); + await fs.writeFile(path.join(state.stateDir, "sessions", "s-abc", "meta.json"), "{}\n"); await fs.mkdir(outputDir, { recursive: true }); const result = await createBackupArchive({ output: outputDir, includeWorkspace: false, - nowMs: Date.UTC(2026, 3, 29, 12, 0, 0), + nowMs: Date.UTC(2026, 4, 10, 12, 0, 0), }); - const entries = await listArchiveEntryDetails(result.archivePath); + const entries = await listArchiveEntries(result.archivePath); - expect(entries.filter((entry) => entry.type === "Link")).toStrictEqual([]); - expect(entries.some((entry) => entry.path.endsWith("/esbuild/bin/esbuild"))).toBe(true); - expect( - entries.some((entry) => entry.path.endsWith("/@esbuild/darwin-arm64/bin/esbuild")), - ).toBe(true); - - const runtime: RuntimeEnv = { log: vi.fn(), error: vi.fn(), exit: vi.fn() }; - const verification = await backupVerifyCommand(runtime, { archive: result.archivePath }); - expect(verification.ok).toBe(true); + expect(entries.some((entry) => entry.endsWith("/state/logs/nested/gateway.log"))).toBe( + false, + ); + expect(entries.some((entry) => entry.endsWith("/state/gateway.pid"))).toBe(false); + expect(entries.some((entry) => entry.endsWith("/state/ipc.sock"))).toBe(false); + expect(entries.some((entry) => entry.endsWith("/state/delivery-queue/pending.json"))).toBe( + false, + ); + expect(entries.some((entry) => entry.endsWith("/state/sessions/s-abc/meta.json"))).toBe( + true, + ); }, ); }); diff --git a/src/infra/backup-create.ts b/src/infra/backup-create.ts index 4418783b67b..d82cb0719bf 100644 --- a/src/infra/backup-create.ts +++ b/src/infra/backup-create.ts @@ -11,9 +11,13 @@ import { resolveBackupPlanFromDisk, } from "../commands/backup-shared.js"; import { isPathWithin } from "../commands/cleanup-utils.js"; -import { recordOpenClawStateBackupRun } from "../state/openclaw-state-db.js"; +import { + OPENCLAW_SQLITE_BUSY_TIMEOUT_MS, + recordOpenClawStateBackupRun, +} from "../state/openclaw-state-db.js"; import { resolveHomeDir, resolveUserPath } from "../utils.js"; import { resolveRuntimeServiceVersion } from "../version.js"; +import { isVolatileBackupPath } from "./backup-volatile-filter.js"; import { writeJson } from "./json-files.js"; import { requireNodeSqlite } from "./node-sqlite.js"; import { assertSqliteIntegrityOk } from "./sqlite-integrity.js"; @@ -47,6 +51,7 @@ export type BackupCreateOptions = { verify?: boolean; json?: boolean; nowMs?: number; + log?: (message: string) => void; }; type BackupManifestAsset = { @@ -161,6 +166,71 @@ function buildTempArchivePath(outputPath: string): string { return `${outputPath}.${randomUUID()}.tmp`; } +const BACKUP_TAR_MAX_ATTEMPTS = 3; +const BACKUP_TAR_BACKOFF_MS = [250, 1000] as const; + +function isTarEofRaceError(err: unknown): boolean { + const code = (err as NodeJS.ErrnoException | undefined)?.code; + if (code === "ENOENT" || code === "EOF" || code === "TAR_BAD_ARCHIVE") { + return true; + } + const message = (err as Error | undefined)?.message ?? ""; + return /(did not encounter expected|encountered unexpected) EOF|TAR_BAD_ARCHIVE/i.test(message); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export type BackupTarRetryLogger = (message: string) => void; + +async function writeTarArchiveWithRetry(params: { + tempArchivePath: string; + runTar: () => Promise; + log?: BackupTarRetryLogger; + sleepMs?: (ms: number) => Promise; +}): Promise { + const sleepFn = params.sleepMs ?? sleep; + let lastErr: unknown; + for (let attempt = 1; attempt <= BACKUP_TAR_MAX_ATTEMPTS; attempt += 1) { + try { + await params.runTar(); + return; + } catch (err) { + lastErr = err; + if (!isTarEofRaceError(err) || attempt === BACKUP_TAR_MAX_ATTEMPTS) { + break; + } + try { + await fs.rm(params.tempArchivePath, { force: true }); + } catch (cleanupErr) { + const code = (cleanupErr as NodeJS.ErrnoException).code; + if (code && code !== "ENOENT") { + params.log?.( + `Backup archiver could not remove temp archive ${params.tempArchivePath} between retries: ${code}. Continuing.`, + ); + } + } + const backoff = BACKUP_TAR_BACKOFF_MS[attempt - 1] ?? 0; + const offendingPath = (err as NodeJS.ErrnoException).path; + params.log?.( + `Backup archiver hit a live-write race${ + offendingPath ? ` on ${offendingPath}` : "" + } (attempt ${attempt}/${BACKUP_TAR_MAX_ATTEMPTS}); retrying in ${Math.round(backoff / 1000)}s.`, + ); + await sleepFn(backoff); + } + } + const final = lastErr instanceof Error ? lastErr : new Error(String(lastErr)); + const offendingPath = (lastErr as NodeJS.ErrnoException | undefined)?.path; + const suffix = offendingPath + ? ` (last offending path: ${offendingPath}, after ${BACKUP_TAR_MAX_ATTEMPTS} attempts)` + : ` (after ${BACKUP_TAR_MAX_ATTEMPTS} attempts)`; + throw new Error(`Backup archive write failed: ${final.message}${suffix}`, { cause: final }); +} + +export const __test = { writeTarArchiveWithRetry, isTarEofRaceError }; + // The temp manifest is passed to `tar.c` alongside the asset source paths. If // the temp file lives inside any asset, recursive traversal pulls it in a // second time and both copies remap to `/manifest.json`, which @@ -433,6 +503,7 @@ async function snapshotSqliteDatabase(params: { const sqlite = requireNodeSqlite(); const db = new sqlite.DatabaseSync(params.sourcePath); try { + db.exec(`PRAGMA busy_timeout = ${OPENCLAW_SQLITE_BUSY_TIMEOUT_MS};`); try { db.exec("PRAGMA wal_checkpoint(FULL);"); } catch { @@ -470,10 +541,14 @@ async function stageBackupAssets(params: { } const stagedPath = path.join(params.tempDir, "state-snapshot"); + const volatilePlan = { stateDirs: [asset.sourcePath] }; await fs.cp(asset.sourcePath, stagedPath, { recursive: true, verbatimSymlinks: true, - filter: (source) => !isSqliteDatabasePath(source) && !isSqliteSidecarPath(source), + filter: (source) => + !isSqliteDatabasePath(source) && + !isSqliteSidecarPath(source) && + !isVolatileBackupPath(source, volatilePlan), }); for (const sqlitePath of await listSqliteDatabasePaths(asset.sourcePath)) { @@ -588,24 +663,29 @@ export async function createBackupArchive( const filter = stagedAssets.state ? buildExtensionsNodeModulesFilter(stagedAssets.state.stagedPath) : undefined; - await tar.c( - { - file: tempArchivePath, - ...(filter ? { filter } : {}), - gzip: true, - portable: true, - preservePaths: true, - onWriteEntry: (entry) => { - entry.path = remapArchiveEntryPath({ - entryPath: entry.path, - manifestPath, - archiveRoot, - stagedAssets, - }); - }, - }, - [manifestPath, ...stagedAssets.archivePaths], - ); + await writeTarArchiveWithRetry({ + tempArchivePath, + log: opts.log, + runTar: () => + tar.c( + { + file: tempArchivePath, + ...(filter ? { filter } : {}), + gzip: true, + portable: true, + preservePaths: true, + onWriteEntry: (entry) => { + entry.path = remapArchiveEntryPath({ + entryPath: entry.path, + manifestPath, + archiveRoot, + stagedAssets, + }); + }, + }, + [manifestPath, ...stagedAssets.archivePaths], + ), + }); await publishTempArchive({ tempArchivePath, outputPath }); if (manifest && result.assets.some((asset) => asset.kind === "state")) { recordOpenClawStateBackupRun({ diff --git a/src/infra/backup-volatile-filter.test.ts b/src/infra/backup-volatile-filter.test.ts new file mode 100644 index 00000000000..996122c3a29 --- /dev/null +++ b/src/infra/backup-volatile-filter.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from "vitest"; +import { isVolatileBackupPath } from "./backup-volatile-filter.js"; + +const stateDir = "/opt/openclaw/state"; +const plan = { stateDirs: [stateDir] }; + +describe("isVolatileBackupPath", () => { + it.each([ + [`${stateDir}/sessions/s-abc/transcript.jsonl`, true], + [`${stateDir}/sessions/s-abc/run.log`, true], + [`${stateDir}/agents/main/sessions/transcript.jsonl`, true], + [`${stateDir}/agents/ops/sessions/run.log`, true], + [`${stateDir}/cron/runs/2026-01-01/job.log`, true], + [`${stateDir}/cron/runs/nightly.jsonl`, true], + [`${stateDir}/logs/gateway.jsonl`, true], + [`${stateDir}/logs/nested/gateway.log`, true], + [`${stateDir}/ipc/gateway.sock`, true], + [`${stateDir}/gateway.pid`, true], + [`${stateDir}/tmp/pending.tmp`, true], + [`${stateDir}/delivery-queue/pending.tmp`, true], + [`${stateDir}/delivery-queue/pending.json`, true], + [`${stateDir}/session-delivery-queue/pending.tmp`, true], + [`${stateDir}/session-delivery-queue/pending.json`, true], + [`${stateDir}/sessions/s-abc/meta.json`, false], + [`${stateDir}/agents/main/sessions/sessions.json`, false], + [`${stateDir}/cron/jobs.json`, false], + [`${stateDir}/cron/runs/2026-01-01/job.json`, false], + [`${stateDir}/config.json`, false], + ["/home/user/project/README.md", false], + ["/home/user/project/pending.tmp", false], + ["/home/user/notes/daily.log", false], + ])("classifies %s as volatile=%s", (p, expected) => { + expect(isVolatileBackupPath(p, plan)).toBe(expected); + }); + + it("returns false when no state dirs are provided", () => { + expect( + isVolatileBackupPath(`${stateDir}/sessions/s-abc/transcript.jsonl`, { stateDirs: [] }), + ).toBe(false); + }); + + it("does not match paths that escape the anchor via `..`", () => { + expect(isVolatileBackupPath(`${stateDir}/sessions/../config.jsonl`, plan)).toBe(false); + expect(isVolatileBackupPath(`${stateDir}/cron/runs/../jobs.log`, plan)).toBe(false); + expect(isVolatileBackupPath(`${stateDir}/logs/../notes.jsonl`, plan)).toBe(false); + }); + + it("normalizes Windows-style separators before anchor checks", () => { + const winStateDir = "C:\\openclaw\\state"; + const winPlan = { stateDirs: [winStateDir] }; + expect(isVolatileBackupPath(`${winStateDir}\\sessions\\s-abc\\transcript.jsonl`, winPlan)).toBe( + true, + ); + expect(isVolatileBackupPath(`${winStateDir}\\agents\\main\\sessions\\s.jsonl`, winPlan)).toBe( + true, + ); + expect(isVolatileBackupPath(`${winStateDir}\\cron\\runs\\2026\\job.jsonl`, winPlan)).toBe(true); + expect(isVolatileBackupPath(`${winStateDir}\\sessions\\..\\config.jsonl`, winPlan)).toBe(false); + }); + + it("matches tar filter paths when node-tar omits the leading slash", () => { + expect( + isVolatileBackupPath("opt/openclaw/state/agents/main/sessions/transcript.jsonl", plan), + ).toBe(true); + }); +}); diff --git a/src/infra/backup-volatile-filter.ts b/src/infra/backup-volatile-filter.ts new file mode 100644 index 00000000000..844c6578247 --- /dev/null +++ b/src/infra/backup-volatile-filter.ts @@ -0,0 +1,108 @@ +import path from "node:path"; + +const STATE_TRANSIENT_EXTENSIONS = new Set([".sock", ".pid", ".tmp"]); + +function normalizePosix(input: string): string { + if (!input) { + return input; + } + // Swap Windows-style separators, then collapse `.`/`..` segments so ancestry + // checks cannot be bypassed by a path that traverses out of the anchor. + return path.posix.normalize(input.replaceAll("\\", "/")); +} + +function isUnder(childPosix: string, parentPosix: string): boolean { + if (!parentPosix) { + return false; + } + const p = parentPosix.endsWith("/") ? parentPosix : `${parentPosix}/`; + return childPosix === parentPosix || childPosix.startsWith(p); +} + +function hasExtension(filePosix: string, extensions: readonly string[]): boolean { + const ext = path.posix.extname(filePosix).toLowerCase(); + return extensions.includes(ext); +} + +function hasExtensionInSet(filePosix: string, extensions: ReadonlySet): boolean { + return extensions.has(path.posix.extname(filePosix).toLowerCase()); +} + +function isAgentSessionTranscriptPath(filePosix: string, stateDirPosix: string): boolean { + const agentsRoot = path.posix.join(stateDirPosix, "agents"); + if (!isUnder(filePosix, agentsRoot)) { + return false; + } + const relative = path.posix.relative(agentsRoot, filePosix); + const parts = relative.split("/").filter(Boolean); + return parts.length >= 3 && parts[1] === "sessions"; +} + +function filePathCandidates(input: string): string[] { + const normalized = normalizePosix(input); + if (normalized.startsWith("/") || /^[A-Za-z]:\//u.test(normalized)) { + return [normalized]; + } + // node-tar may pass absolute input paths to filters without the leading + // slash, even when the source list used absolute paths. + return [normalized, normalizePosix(`/${normalized}`)]; +} + +export type VolatileFilterPlan = { + /** Canonical state directories the filter should treat as volatile anchors. */ + stateDirs: string[]; +}; + +export function isVolatileBackupPath(absolutePath: string, plan: VolatileFilterPlan): boolean { + if (!absolutePath) { + return false; + } + const candidates = filePathCandidates(absolutePath); + + for (const stateDir of plan.stateDirs) { + if (!stateDir) { + continue; + } + const stateDirPosix = normalizePosix(stateDir); + + for (const filePosix of candidates) { + const sessionsRoot = path.posix.join(stateDirPosix, "sessions"); + if (isUnder(filePosix, sessionsRoot) && hasExtension(filePosix, [".jsonl", ".log"])) { + return true; + } + + if ( + isAgentSessionTranscriptPath(filePosix, stateDirPosix) && + hasExtension(filePosix, [".jsonl", ".log"]) + ) { + return true; + } + + const cronRunsRoot = path.posix.join(stateDirPosix, "cron", "runs"); + if (isUnder(filePosix, cronRunsRoot) && hasExtension(filePosix, [".jsonl", ".log"])) { + return true; + } + + const logsRoot = path.posix.join(stateDirPosix, "logs"); + if (isUnder(filePosix, logsRoot) && hasExtension(filePosix, [".jsonl", ".log"])) { + return true; + } + + for (const queueDir of ["delivery-queue", "session-delivery-queue"]) { + const queueRoot = path.posix.join(stateDirPosix, queueDir); + if (isUnder(filePosix, queueRoot) && hasExtension(filePosix, [".json", ".tmp"])) { + return true; + } + } + + if ( + isUnder(filePosix, stateDirPosix) && + hasExtensionInSet(filePosix, STATE_TRANSIENT_EXTENSIONS) + ) { + return true; + } + } + } + + return false; +} diff --git a/src/infra/stale-lock-file.ts b/src/infra/stale-lock-file.ts new file mode 100644 index 00000000000..79ea94cc3e1 --- /dev/null +++ b/src/infra/stale-lock-file.ts @@ -0,0 +1,35 @@ +import { isPidDefinitelyDead as defaultIsPidDefinitelyDead } from "../shared/pid-alive.js"; + +export type LockFileOwnerPayload = { + pid?: number; + createdAt?: string; +}; + +export function readLockFileOwnerPayload( + payload: Record | null, +): LockFileOwnerPayload | null { + if (!payload) { + return null; + } + return { + pid: typeof payload.pid === "number" ? payload.pid : undefined, + createdAt: typeof payload.createdAt === "string" ? payload.createdAt : undefined, + }; +} + +export function shouldRemoveDeadOwnerOrExpiredLock(params: { + payload: Record | null; + staleMs: number; + nowMs?: number; + isPidDefinitelyDead?: (pid: number) => boolean; +}): boolean { + const payload = readLockFileOwnerPayload(params.payload); + if (payload?.pid) { + return (params.isPidDefinitelyDead ?? defaultIsPidDefinitelyDead)(payload.pid); + } + if (payload?.createdAt) { + const createdAt = Date.parse(payload.createdAt); + return !Number.isFinite(createdAt) || (params.nowMs ?? Date.now()) - createdAt > params.staleMs; + } + return false; +}