fix: align sqlite rebase with main

This commit is contained in:
Peter Steinberger
2026-05-16 06:29:53 +01:00
parent fcca98d9d1
commit b02056a32f
5 changed files with 66 additions and 221 deletions

View File

@@ -1,5 +1,6 @@
import { randomUUID } from "node:crypto";
import { parseAbsoluteTimeMs } from "../../../cron/parse.js";
import { getInvalidPersistedCronJobReason } from "../../../cron/persisted-shape.js";
import { coerceFiniteScheduleNumber } from "../../../cron/schedule.js";
import { inferLegacyName } from "../../../cron/service/normalize.js";
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../../../cron/stagger.js";
@@ -26,7 +27,9 @@ type CronStoreIssueKey =
| "legacyPayloadProvider"
| "legacyTopLevelPayloadFields"
| "legacyTopLevelDeliveryFields"
| "legacyDeliveryMode";
| "legacyDeliveryMode"
| "invalidSchedule"
| "invalidPayload";
type CronStoreIssues = Partial<Record<CronStoreIssueKey, number>>;
@@ -234,6 +237,7 @@ export function normalizeStoredCronJobs(
jobs: Array<Record<string, unknown>>,
): NormalizeCronStoreJobsResult {
const issues: CronStoreIssues = {};
const keptJobs: Array<Record<string, unknown>> = [];
let mutated = false;
for (const raw of jobs) {
@@ -560,6 +564,29 @@ export function normalizeStoredCronJobs(
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
const invalidPersistedReason = getInvalidPersistedCronJobReason(raw);
if (
invalidPersistedReason === "missing-schedule" ||
invalidPersistedReason === "invalid-schedule"
) {
trackIssue("invalidSchedule");
mutated = true;
continue;
}
if (
invalidPersistedReason === "missing-payload" ||
invalidPersistedReason === "invalid-payload"
) {
trackIssue("invalidPayload");
mutated = true;
continue;
}
keptJobs.push(raw);
}
if (keptJobs.length !== jobs.length) {
jobs.splice(0, jobs.length, ...keptJobs);
}
return { issues, jobs, mutated };

View File

@@ -1,71 +0,0 @@
import { isRecord } from "../../shared/record-coerce.js";
import { validateSessionId } from "./paths.js";
import type { SessionEntry } from "./types.js";
function isSafeSessionId(value: unknown): value is string {
if (typeof value !== "string") {
return false;
}
const trimmed = value.trim();
if (!trimmed || trimmed.length > 255) {
return false;
}
if (trimmed.includes("/") || trimmed.includes("\\") || trimmed === "." || trimmed === "..") {
return false;
}
return /^[A-Za-z0-9][A-Za-z0-9._:@-]*$/.test(trimmed);
}
function normalizeTranscriptSessionId(value: string): string | undefined {
try {
return validateSessionId(value);
} catch {
return undefined;
}
}
function normalizeOptionalTimestamp(value: unknown): number | undefined {
if (value === undefined) {
return undefined;
}
return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : 0;
}
export function normalizePersistedSessionEntryShape(value: unknown): SessionEntry | undefined {
if (!isRecord(value)) {
return undefined;
}
let next = value as unknown as SessionEntry;
const sessionFile = typeof value.sessionFile === "string" ? value.sessionFile.trim() : undefined;
if (value.sessionId !== undefined) {
if (!isSafeSessionId(value.sessionId)) {
return undefined;
}
const sessionId = value.sessionId.trim();
const transcriptSessionId = normalizeTranscriptSessionId(sessionId);
if (!transcriptSessionId && !sessionFile) {
const { sessionId: _dropSessionId, ...rest } = next;
next = rest as SessionEntry;
} else if (sessionId !== value.sessionId) {
next = { ...next, sessionId };
}
}
if (value.sessionFile !== undefined && typeof value.sessionFile !== "string") {
if (next === value) {
next = { ...next };
}
delete next.sessionFile;
}
const updatedAt = normalizeOptionalTimestamp(value.updatedAt);
if (updatedAt !== value.updatedAt) {
if (next === value) {
next = { ...next };
}
next.updatedAt = updatedAt ?? 0;
}
return next;
}

View File

@@ -136,151 +136,6 @@ describe("appendAssistantMessageToSessionTranscript", () => {
}
});
it("uses spawned cwd when creating a missing transcript header", async () => {
const taskCwd = path.join(fixture.sessionsDir(), "task-repo");
fs.mkdirSync(taskCwd, { recursive: true });
fs.writeFileSync(
fixture.storePath(),
JSON.stringify({
[sessionKey]: {
sessionId,
chatType: "direct",
channel: "discord",
spawnedCwd: taskCwd,
},
}),
"utf-8",
);
const result = await appendAssistantMessageToSessionTranscript({
sessionKey,
text: "Hello from task cwd!",
storePath: fixture.storePath(),
});
expect(result.ok).toBe(true);
if (result.ok) {
const [headerLine] = fs.readFileSync(result.sessionFile, "utf-8").trim().split("\n");
const header = JSON.parse(headerLine ?? "{}") as { cwd?: string };
expect(header.cwd).toBe(taskCwd);
}
});
it("runs matching owned transcript appends through the active session write lock", async () => {
writeTranscriptStore();
const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir());
const events: string[] = [];
const result = await withOwnedSessionTranscriptWrites(
{
sessionFile,
sessionKey,
withSessionWriteLock: async (run) => {
events.push("lock");
return await run();
},
},
async () =>
await appendAssistantMessageToSessionTranscript({
sessionKey,
text: "Hello under lock",
storePath: fixture.storePath(),
}),
);
expect(result.ok).toBe(true);
expect(events).toEqual(["lock", "lock", "lock"]);
});
it("keeps matching owned transcript appends locked from bound callbacks", async () => {
const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir());
const events: string[] = [];
const callback = bindOwnedSessionTranscriptWrites(
{
sessionFile,
sessionKey,
withSessionWriteLock: async (run) => {
events.push("lock");
return await run();
},
},
async () =>
await appendSessionTranscriptMessage({
transcriptPath: sessionFile,
message: {
role: "assistant",
content: "Hello from bound delivery",
timestamp: Date.now(),
stopReason: "stop",
},
}),
);
const result = await callback();
expect(result.messageId).toBeTruthy();
expect(events).toEqual(["lock"]);
});
it("appends to legacy lowercase Signal group session entries", async () => {
const mixedGroupId = "VWATodkf2hc8zdOS76q9Tb0+5Bi522E03qLdaQ/9ypg=";
const signalSessionKey = `agent:main:signal:group:${mixedGroupId}`;
const legacySignalSessionKey = signalSessionKey.toLowerCase();
fs.writeFileSync(
fixture.storePath(),
JSON.stringify({
[legacySignalSessionKey]: {
sessionId,
chatType: "group",
channel: "signal",
},
}),
"utf-8",
);
const result = await appendAssistantMessageToSessionTranscript({
sessionKey: signalSessionKey,
text: "Hello Signal group",
storePath: fixture.storePath(),
});
expect(result.ok).toBe(true);
if (result.ok) {
const lines = fs.readFileSync(result.sessionFile, "utf-8").trim().split("\n");
expect(lines).toHaveLength(2);
const messageLine = JSON.parse(lines[1]);
expect(messageLine.message.content[0].text).toBe("Hello Signal group");
}
});
it("falls back to the canonical transcript path for malformed persisted sessionFile metadata", async () => {
fs.writeFileSync(
fixture.storePath(),
JSON.stringify({
[sessionKey]: {
sessionId,
sessionFile: { path: "../../escaped.jsonl" },
updatedAt: Date.now(),
},
}),
"utf-8",
);
const result = await appendAssistantMessageToSessionTranscript({
sessionKey,
text: "Hello from a repaired metadata boundary",
storePath: fixture.storePath(),
});
expect(result.ok).toBe(true);
if (result.ok) {
expect(result.sessionFile).toBe(
resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()),
);
expect(fs.existsSync(result.sessionFile)).toBe(true);
}
});
it("emits transcript update events for delivery mirrors", async () => {
await writeTranscriptStore();
const emitSpy = vi.spyOn(transcriptEvents, "emitSessionTranscriptUpdate");

View File

@@ -336,7 +336,6 @@ describe("gateway session utils", () => {
test("session rows ignore malformed compaction checkpoints", () => {
const row = buildGatewaySessionRow({
cfg: createModelDefaultsConfig({ primary: "openai/gpt-5.4" }),
storePath: "",
store: {},
key: "agent:main:main",
entry: {

View File

@@ -50,6 +50,7 @@ import {
resolveAgentSessionDatabaseTargetsSync,
resolveAgentMainSessionKey,
resolveFreshSessionTotalTokens,
type SessionCompactionCheckpointReason,
type SessionEntry,
type SessionScope,
} from "../config/sessions.js";
@@ -246,11 +247,45 @@ function resolveNonNegativeNumber(value: number | null | undefined): number | un
return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : undefined;
}
const VALID_COMPACTION_CHECKPOINT_REASONS = new Set<SessionCompactionCheckpointReason>([
"manual",
"auto-threshold",
"overflow-retry",
"timeout-retry",
]);
function isSessionCompactionCheckpoint(
checkpoint: unknown,
): checkpoint is NonNullable<SessionEntry["compactionCheckpoints"]>[number] {
if (!checkpoint || typeof checkpoint !== "object" || Array.isArray(checkpoint)) {
return false;
}
const candidate = checkpoint as Partial<
NonNullable<SessionEntry["compactionCheckpoints"]>[number]
>;
return (
typeof candidate.checkpointId === "string" &&
candidate.checkpointId.length > 0 &&
typeof candidate.createdAt === "number" &&
Number.isFinite(candidate.createdAt) &&
typeof candidate.reason === "string" &&
VALID_COMPACTION_CHECKPOINT_REASONS.has(candidate.reason as SessionCompactionCheckpointReason)
);
}
function normalizedCompactionCheckpoints(
entry?: Pick<SessionEntry, "compactionCheckpoints"> | null,
): NonNullable<SessionEntry["compactionCheckpoints"]> {
return Array.isArray(entry?.compactionCheckpoints)
? entry.compactionCheckpoints.filter(isSessionCompactionCheckpoint)
: [];
}
function resolveLatestCompactionCheckpoint(
entry?: Pick<SessionEntry, "compactionCheckpoints"> | null,
): NonNullable<SessionEntry["compactionCheckpoints"]>[number] | undefined {
const checkpoints = entry?.compactionCheckpoints;
if (!Array.isArray(checkpoints) || checkpoints.length === 0) {
const checkpoints = normalizedCompactionCheckpoints(entry);
if (checkpoints.length === 0) {
return undefined;
}
return checkpoints.reduce((latest, checkpoint) =>
@@ -1568,7 +1603,7 @@ export function buildGatewaySessionRow(params: {
lastTo: deliveryFields.lastTo,
lastAccountId: deliveryFields.lastAccountId,
lastThreadId: deliveryFields.lastThreadId,
compactionCheckpointCount: entry?.compactionCheckpoints?.length,
compactionCheckpointCount: normalizedCompactionCheckpoints(entry).length,
latestCompactionCheckpoint,
pluginExtensions: pluginExtensions.length > 0 ? pluginExtensions : undefined,
};