fix: preserve sqlite migration metadata

This commit is contained in:
Peter Steinberger
2026-05-17 02:45:47 +01:00
parent 104bbf8aa3
commit 7a7e5ceb10
6 changed files with 269 additions and 24 deletions

View File

@@ -458,6 +458,124 @@ describe("state migrations", () => {
await expectMissingPath(legacyPluginStatePath);
});
it("preserves typed metadata when importing legacy delivery queues", async () => {
const root = await createTempDir();
const stateDir = path.join(root, ".openclaw");
const env = createEnv(stateDir);
const cfg = createConfig();
const outboundQueueDir = path.join(stateDir, "delivery-queue");
const sessionQueueDir = path.join(stateDir, "session-delivery-queue");
await fs.mkdir(outboundQueueDir, { recursive: true });
await fs.mkdir(sessionQueueDir, { recursive: true });
await fs.writeFile(
path.join(outboundQueueDir, "outbound-1.json"),
`${JSON.stringify({
id: "outbound-1",
channel: "discord",
to: "channel-1",
accountId: "account-1",
session: { key: "agent:main:desk", requesterAccountId: "session-account" },
retryCount: 3,
lastAttemptAt: 111,
lastError: "network",
recoveryState: "unknown_after_send",
platformSendStartedAt: 222,
})}\n`,
"utf8",
);
await fs.writeFile(
path.join(sessionQueueDir, "session-1.json"),
`${JSON.stringify({
id: "session-1",
kind: "agentTurn",
sessionKey: "agent:main:desk",
route: { channel: "slack", to: "thread-1", accountId: "workspace-1" },
retryCount: 2,
lastAttemptAt: 333,
lastError: "rate limited",
})}\n`,
"utf8",
);
const detected = await detectLegacyStateMigrations({
cfg,
env,
homedir: () => root,
});
expect(detected.preview).toEqual([
`- Outbound delivery queue: ${outboundQueueDir} → SQLite`,
`- Session delivery queue: ${sessionQueueDir} → SQLite`,
]);
const result = await runLegacyStateMigrations({
detected,
now: () => 1234,
});
expect(result.warnings).toStrictEqual([]);
expect(result.changes).toEqual([
"Imported 1 outbound delivery queue row(s) into SQLite",
"Imported 1 session delivery queue row(s) into SQLite",
]);
const stateDatabase = openOpenClawStateDatabase({ env });
const db = getNodeSqliteKysely<DeliveryQueueTestDatabase>(stateDatabase.db);
const rows = executeSqliteQuerySync(
stateDatabase.db,
db
.selectFrom("delivery_queue_entries")
.select([
"queue_name",
"id",
"entry_kind",
"session_key",
"channel",
"target",
"account_id",
"retry_count",
"last_attempt_at",
"last_error",
"recovery_state",
"platform_send_started_at",
])
.orderBy("queue_name", "asc"),
).rows;
expect(rows).toEqual([
{
queue_name: "outbound-delivery",
id: "outbound-1",
entry_kind: "outbound",
session_key: "agent:main:desk",
channel: "discord",
target: "channel-1",
account_id: "account-1",
retry_count: 3,
last_attempt_at: 111,
last_error: "network",
recovery_state: "unknown_after_send",
platform_send_started_at: 222,
},
{
queue_name: "session-delivery",
id: "session-1",
entry_kind: "agentTurn",
session_key: "agent:main:desk",
channel: "slack",
target: "thread-1",
account_id: "workspace-1",
retry_count: 2,
last_attempt_at: 333,
last_error: "rate limited",
recovery_state: null,
platform_send_started_at: null,
},
]);
await expectMissingPath(outboundQueueDir);
await expectMissingPath(sessionQueueDir);
});
it("imports legacy Active Memory session toggles into unified plugin state", async () => {
const root = await createTempDir();
const stateDir = path.join(root, ".openclaw");

View File

@@ -412,6 +412,27 @@ type LegacyDeliveryQueueSpec = {
sourcePath: string;
};
type LegacyDeliveryQueueRow = {
queue_name: string;
id: string;
status: "pending" | "failed";
entry_kind: string | null;
session_key: string | null;
channel: string | null;
target: string | null;
account_id: string | null;
retry_count: number;
last_attempt_at: number | null;
last_error: string | null;
recovery_state: string | null;
platform_send_started_at: number | null;
entry_json: string;
enqueued_at: number;
updated_at: number;
failed_at: number | null;
sourcePath: string;
};
type LegacyCurrentConversationBindingsFile = {
version?: unknown;
bindings?: unknown;
@@ -419,17 +440,93 @@ type LegacyCurrentConversationBindingsFile = {
const CURRENT_CONVERSATION_BINDINGS_ID_PREFIX = "generic:";
function readLegacyQueueJson(filePath: string, id: string, enqueuedAt: number): string {
function readLegacyQueueRecord(
filePath: string,
id: string,
enqueuedAt: number,
): Record<string, unknown> {
const parsed = JSON.parse(fs.readFileSync(filePath, "utf-8")) as unknown;
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return JSON.stringify({ id, enqueuedAt, retryCount: 0 });
return { id, enqueuedAt, retryCount: 0 };
}
return JSON.stringify({
return {
id,
enqueuedAt,
retryCount: 0,
...(parsed as Record<string, unknown>),
});
};
}
function legacyQueueText(value: unknown): string | null {
return typeof value === "string" && value.trim() ? value.trim() : null;
}
function legacyQueueNumber(value: unknown): number | null {
return typeof value === "number" && Number.isFinite(value) ? Math.trunc(value) : null;
}
function legacyQueueRecord(value: unknown): Record<string, unknown> {
return value && typeof value === "object" && !Array.isArray(value)
? (value as Record<string, unknown>)
: {};
}
function legacyQueueNestedText(
record: Record<string, unknown>,
key: string,
field: string,
): string | null {
return legacyQueueText(legacyQueueRecord(record[key])[field]);
}
function legacyDeliveryQueueRow(params: {
spec: LegacyDeliveryQueueSpec;
filePath: string;
status: "pending" | "failed";
enqueuedAt: number;
failedAt: number | null;
}): LegacyDeliveryQueueRow {
const id = path.basename(params.filePath, ".json");
const record = readLegacyQueueRecord(params.filePath, id, params.enqueuedAt);
const sessionKey =
legacyQueueText(record.sessionKey) ??
legacyQueueNestedText(record, "session", "key") ??
legacyQueueNestedText(record, "mirror", "sessionKey");
const channel =
legacyQueueText(record.channel) ??
legacyQueueNestedText(record, "route", "channel") ??
legacyQueueNestedText(record, "deliveryContext", "channel");
const target =
legacyQueueText(record.to) ??
legacyQueueNestedText(record, "route", "to") ??
legacyQueueNestedText(record, "deliveryContext", "to");
const accountId =
legacyQueueText(record.accountId) ??
legacyQueueNestedText(record, "session", "requesterAccountId") ??
legacyQueueNestedText(record, "route", "accountId") ??
legacyQueueNestedText(record, "deliveryContext", "accountId");
return {
queue_name: params.spec.queueName,
id,
status: params.status,
entry_kind:
legacyQueueText(record.kind) ??
(params.spec.queueName === "outbound-delivery" ? "outbound" : null),
session_key: sessionKey,
channel,
target,
account_id: accountId,
retry_count: legacyQueueNumber(record.retryCount) ?? 0,
last_attempt_at: legacyQueueNumber(record.lastAttemptAt),
last_error: legacyQueueText(record.lastError),
recovery_state: legacyQueueText(record.recoveryState),
platform_send_started_at: legacyQueueNumber(record.platformSendStartedAt),
entry_json: JSON.stringify(record),
enqueued_at: params.enqueuedAt,
updated_at: Date.now(),
failed_at: params.failedAt,
sourcePath: params.filePath,
};
}
function listLegacyQueueFiles(queueDir: string): {
@@ -462,21 +559,9 @@ function importLegacyDeliveryQueueToSqlite(
].flatMap(({ filePath, status }) => {
try {
const stat = fs.statSync(filePath);
const id = path.basename(filePath, ".json");
const enqueuedAt = stat.mtimeMs > 0 ? Math.trunc(stat.mtimeMs) : Date.now();
const failedAt = status === "failed" ? enqueuedAt : null;
return [
{
queue_name: spec.queueName,
id,
status,
entry_json: readLegacyQueueJson(filePath, id, enqueuedAt),
enqueued_at: enqueuedAt,
updated_at: Date.now(),
failed_at: failedAt,
sourcePath: filePath,
},
];
return [legacyDeliveryQueueRow({ spec, filePath, status, enqueuedAt, failedAt })];
} catch {
return [];
}
@@ -494,6 +579,16 @@ function importLegacyDeliveryQueueToSqlite(
queue_name: row.queue_name,
id: row.id,
status: row.status,
entry_kind: row.entry_kind,
session_key: row.session_key,
channel: row.channel,
target: row.target,
account_id: row.account_id,
retry_count: row.retry_count,
last_attempt_at: row.last_attempt_at,
last_error: row.last_error,
recovery_state: row.recovery_state,
platform_send_started_at: row.platform_send_started_at,
entry_json: row.entry_json,
enqueued_at: row.enqueued_at,
updated_at: row.updated_at,
@@ -502,6 +597,16 @@ function importLegacyDeliveryQueueToSqlite(
.onConflict((conflict) =>
conflict.columns(["queue_name", "id"]).doUpdateSet({
status: row.status,
entry_kind: row.entry_kind,
session_key: row.session_key,
channel: row.channel,
target: row.target,
account_id: row.account_id,
retry_count: row.retry_count,
last_attempt_at: row.last_attempt_at,
last_error: row.last_error,
recovery_state: row.recovery_state,
platform_send_started_at: row.platform_send_started_at,
entry_json: row.entry_json,
enqueued_at: row.enqueued_at,
updated_at: row.updated_at,

View File

@@ -91,6 +91,27 @@ describe("openclaw agent database", () => {
expect(registered?.sizeBytes).toBeGreaterThan(0);
});
it("keeps multiple registered paths for the same agent", () => {
const stateDir = createTempStateDir();
const env = { OPENCLAW_STATE_DIR: stateDir };
const relocatedPath = path.join(stateDir, "relocated", "worker-1.sqlite");
const relocated = openOpenClawAgentDatabase({
agentId: "worker-1",
env,
path: relocatedPath,
});
const defaultDatabase = openOpenClawAgentDatabase({
agentId: "worker-1",
env,
});
expect(
listOpenClawRegisteredAgentDatabases({ env })
.filter((entry) => entry.agentId === "worker-1")
.map((entry) => entry.path),
).toEqual([defaultDatabase.path, relocated.path].toSorted());
});
it("configures durable SQLite connection pragmas", () => {
const stateDir = createTempStateDir();
const database = openOpenClawAgentDatabase({

View File

@@ -146,8 +146,7 @@ function registerAgentDatabase(params: {
size_bytes: sizeBytes,
})
.onConflict((conflict) =>
conflict.column("agent_id").doUpdateSet({
path: params.path,
conflict.columns(["agent_id", "path"]).doUpdateSet({
schema_version: OPENCLAW_AGENT_SCHEMA_VERSION,
last_seen_at: lastSeenAt,
size_bytes: sizeBytes,
@@ -166,7 +165,7 @@ export function listOpenClawRegisteredAgentDatabases(
const db = getNodeSqliteKysely<OpenClawAgentRegistryDatabase>(database.db);
const rows = executeSqliteQuerySync(
database.db,
db.selectFrom("agent_databases").selectAll().orderBy("agent_id", "asc"),
db.selectFrom("agent_databases").selectAll().orderBy("agent_id", "asc").orderBy("path", "asc"),
).rows;
return rows.map((row) => ({
agentId: normalizeAgentId(row.agent_id),

View File

@@ -555,11 +555,12 @@ CREATE INDEX IF NOT EXISTS idx_acp_replay_events_session_seq
ON acp_replay_events(session_id, seq);
CREATE TABLE IF NOT EXISTS agent_databases (
agent_id TEXT NOT NULL PRIMARY KEY,
agent_id TEXT NOT NULL,
path TEXT NOT NULL,
schema_version INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
size_bytes INTEGER
size_bytes INTEGER,
PRIMARY KEY (agent_id, path)
);
CREATE TABLE IF NOT EXISTS plugin_state_entries (

View File

@@ -550,11 +550,12 @@ CREATE INDEX IF NOT EXISTS idx_acp_replay_events_session_seq
ON acp_replay_events(session_id, seq);
CREATE TABLE IF NOT EXISTS agent_databases (
agent_id TEXT NOT NULL PRIMARY KEY,
agent_id TEXT NOT NULL,
path TEXT NOT NULL,
schema_version INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
size_bytes INTEGER
size_bytes INTEGER,
PRIMARY KEY (agent_id, path)
);
CREATE TABLE IF NOT EXISTS plugin_state_entries (