diff --git a/src/commands/doctor/legacy/cron-store-migration.ts b/src/commands/doctor/legacy/cron-store-migration.ts index 1c56d8caf14..9c40570f70e 100644 --- a/src/commands/doctor/legacy/cron-store-migration.ts +++ b/src/commands/doctor/legacy/cron-store-migration.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import { parseAbsoluteTimeMs } from "../../../cron/parse.js"; +import { getInvalidPersistedCronJobReason } from "../../../cron/persisted-shape.js"; import { coerceFiniteScheduleNumber } from "../../../cron/schedule.js"; import { inferLegacyName } from "../../../cron/service/normalize.js"; import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../../../cron/stagger.js"; @@ -26,7 +27,9 @@ type CronStoreIssueKey = | "legacyPayloadProvider" | "legacyTopLevelPayloadFields" | "legacyTopLevelDeliveryFields" - | "legacyDeliveryMode"; + | "legacyDeliveryMode" + | "invalidSchedule" + | "invalidPayload"; type CronStoreIssues = Partial>; @@ -234,6 +237,7 @@ export function normalizeStoredCronJobs( jobs: Array>, ): NormalizeCronStoreJobsResult { const issues: CronStoreIssues = {}; + const keptJobs: Array> = []; let mutated = false; for (const raw of jobs) { @@ -560,6 +564,29 @@ export function normalizeStoredCronJobs( raw.delivery = normalizedLegacy.delivery; mutated = true; } + + const invalidPersistedReason = getInvalidPersistedCronJobReason(raw); + if ( + invalidPersistedReason === "missing-schedule" || + invalidPersistedReason === "invalid-schedule" + ) { + trackIssue("invalidSchedule"); + mutated = true; + continue; + } + if ( + invalidPersistedReason === "missing-payload" || + invalidPersistedReason === "invalid-payload" + ) { + trackIssue("invalidPayload"); + mutated = true; + continue; + } + keptJobs.push(raw); + } + + if (keptJobs.length !== jobs.length) { + jobs.splice(0, jobs.length, ...keptJobs); } return { issues, jobs, mutated }; diff --git a/src/config/sessions/store-entry-shape.ts b/src/config/sessions/store-entry-shape.ts deleted file mode 100644 index 987bbc77462..00000000000 --- a/src/config/sessions/store-entry-shape.ts +++ /dev/null @@ -1,71 +0,0 @@ -import { isRecord } from "../../shared/record-coerce.js"; -import { validateSessionId } from "./paths.js"; -import type { SessionEntry } from "./types.js"; - -function isSafeSessionId(value: unknown): value is string { - if (typeof value !== "string") { - return false; - } - const trimmed = value.trim(); - if (!trimmed || trimmed.length > 255) { - return false; - } - if (trimmed.includes("/") || trimmed.includes("\\") || trimmed === "." || trimmed === "..") { - return false; - } - return /^[A-Za-z0-9][A-Za-z0-9._:@-]*$/.test(trimmed); -} - -function normalizeTranscriptSessionId(value: string): string | undefined { - try { - return validateSessionId(value); - } catch { - return undefined; - } -} - -function normalizeOptionalTimestamp(value: unknown): number | undefined { - if (value === undefined) { - return undefined; - } - return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : 0; -} - -export function normalizePersistedSessionEntryShape(value: unknown): SessionEntry | undefined { - if (!isRecord(value)) { - return undefined; - } - - let next = value as unknown as SessionEntry; - const sessionFile = typeof value.sessionFile === "string" ? value.sessionFile.trim() : undefined; - if (value.sessionId !== undefined) { - if (!isSafeSessionId(value.sessionId)) { - return undefined; - } - const sessionId = value.sessionId.trim(); - const transcriptSessionId = normalizeTranscriptSessionId(sessionId); - if (!transcriptSessionId && !sessionFile) { - const { sessionId: _dropSessionId, ...rest } = next; - next = rest as SessionEntry; - } else if (sessionId !== value.sessionId) { - next = { ...next, sessionId }; - } - } - - if (value.sessionFile !== undefined && typeof value.sessionFile !== "string") { - if (next === value) { - next = { ...next }; - } - delete next.sessionFile; - } - - const updatedAt = normalizeOptionalTimestamp(value.updatedAt); - if (updatedAt !== value.updatedAt) { - if (next === value) { - next = { ...next }; - } - next.updatedAt = updatedAt ?? 0; - } - - return next; -} diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts index cfe9ea47b5b..2439be2c852 100644 --- a/src/config/sessions/transcript.test.ts +++ b/src/config/sessions/transcript.test.ts @@ -136,151 +136,6 @@ describe("appendAssistantMessageToSessionTranscript", () => { } }); - it("uses spawned cwd when creating a missing transcript header", async () => { - const taskCwd = path.join(fixture.sessionsDir(), "task-repo"); - fs.mkdirSync(taskCwd, { recursive: true }); - fs.writeFileSync( - fixture.storePath(), - JSON.stringify({ - [sessionKey]: { - sessionId, - chatType: "direct", - channel: "discord", - spawnedCwd: taskCwd, - }, - }), - "utf-8", - ); - - const result = await appendAssistantMessageToSessionTranscript({ - sessionKey, - text: "Hello from task cwd!", - storePath: fixture.storePath(), - }); - - expect(result.ok).toBe(true); - if (result.ok) { - const [headerLine] = fs.readFileSync(result.sessionFile, "utf-8").trim().split("\n"); - const header = JSON.parse(headerLine ?? "{}") as { cwd?: string }; - expect(header.cwd).toBe(taskCwd); - } - }); - - it("runs matching owned transcript appends through the active session write lock", async () => { - writeTranscriptStore(); - const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()); - const events: string[] = []; - - const result = await withOwnedSessionTranscriptWrites( - { - sessionFile, - sessionKey, - withSessionWriteLock: async (run) => { - events.push("lock"); - return await run(); - }, - }, - async () => - await appendAssistantMessageToSessionTranscript({ - sessionKey, - text: "Hello under lock", - storePath: fixture.storePath(), - }), - ); - - expect(result.ok).toBe(true); - expect(events).toEqual(["lock", "lock", "lock"]); - }); - - it("keeps matching owned transcript appends locked from bound callbacks", async () => { - const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()); - const events: string[] = []; - const callback = bindOwnedSessionTranscriptWrites( - { - sessionFile, - sessionKey, - withSessionWriteLock: async (run) => { - events.push("lock"); - return await run(); - }, - }, - async () => - await appendSessionTranscriptMessage({ - transcriptPath: sessionFile, - message: { - role: "assistant", - content: "Hello from bound delivery", - timestamp: Date.now(), - stopReason: "stop", - }, - }), - ); - - const result = await callback(); - - expect(result.messageId).toBeTruthy(); - expect(events).toEqual(["lock"]); - }); - - it("appends to legacy lowercase Signal group session entries", async () => { - const mixedGroupId = "VWATodkf2hc8zdOS76q9Tb0+5Bi522E03qLdaQ/9ypg="; - const signalSessionKey = `agent:main:signal:group:${mixedGroupId}`; - const legacySignalSessionKey = signalSessionKey.toLowerCase(); - fs.writeFileSync( - fixture.storePath(), - JSON.stringify({ - [legacySignalSessionKey]: { - sessionId, - chatType: "group", - channel: "signal", - }, - }), - "utf-8", - ); - - const result = await appendAssistantMessageToSessionTranscript({ - sessionKey: signalSessionKey, - text: "Hello Signal group", - storePath: fixture.storePath(), - }); - - expect(result.ok).toBe(true); - if (result.ok) { - const lines = fs.readFileSync(result.sessionFile, "utf-8").trim().split("\n"); - expect(lines).toHaveLength(2); - const messageLine = JSON.parse(lines[1]); - expect(messageLine.message.content[0].text).toBe("Hello Signal group"); - } - }); - - it("falls back to the canonical transcript path for malformed persisted sessionFile metadata", async () => { - fs.writeFileSync( - fixture.storePath(), - JSON.stringify({ - [sessionKey]: { - sessionId, - sessionFile: { path: "../../escaped.jsonl" }, - updatedAt: Date.now(), - }, - }), - "utf-8", - ); - - const result = await appendAssistantMessageToSessionTranscript({ - sessionKey, - text: "Hello from a repaired metadata boundary", - storePath: fixture.storePath(), - }); - - expect(result.ok).toBe(true); - if (result.ok) { - expect(result.sessionFile).toBe( - resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()), - ); - expect(fs.existsSync(result.sessionFile)).toBe(true); - } - }); - it("emits transcript update events for delivery mirrors", async () => { await writeTranscriptStore(); const emitSpy = vi.spyOn(transcriptEvents, "emitSessionTranscriptUpdate"); diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index a97a9ed1c6f..681922ac932 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -336,7 +336,6 @@ describe("gateway session utils", () => { test("session rows ignore malformed compaction checkpoints", () => { const row = buildGatewaySessionRow({ cfg: createModelDefaultsConfig({ primary: "openai/gpt-5.4" }), - storePath: "", store: {}, key: "agent:main:main", entry: { diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 0be3353a83c..b7aafdb28f5 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -50,6 +50,7 @@ import { resolveAgentSessionDatabaseTargetsSync, resolveAgentMainSessionKey, resolveFreshSessionTotalTokens, + type SessionCompactionCheckpointReason, type SessionEntry, type SessionScope, } from "../config/sessions.js"; @@ -246,11 +247,45 @@ function resolveNonNegativeNumber(value: number | null | undefined): number | un return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : undefined; } +const VALID_COMPACTION_CHECKPOINT_REASONS = new Set([ + "manual", + "auto-threshold", + "overflow-retry", + "timeout-retry", +]); + +function isSessionCompactionCheckpoint( + checkpoint: unknown, +): checkpoint is NonNullable[number] { + if (!checkpoint || typeof checkpoint !== "object" || Array.isArray(checkpoint)) { + return false; + } + const candidate = checkpoint as Partial< + NonNullable[number] + >; + return ( + typeof candidate.checkpointId === "string" && + candidate.checkpointId.length > 0 && + typeof candidate.createdAt === "number" && + Number.isFinite(candidate.createdAt) && + typeof candidate.reason === "string" && + VALID_COMPACTION_CHECKPOINT_REASONS.has(candidate.reason as SessionCompactionCheckpointReason) + ); +} + +function normalizedCompactionCheckpoints( + entry?: Pick | null, +): NonNullable { + return Array.isArray(entry?.compactionCheckpoints) + ? entry.compactionCheckpoints.filter(isSessionCompactionCheckpoint) + : []; +} + function resolveLatestCompactionCheckpoint( entry?: Pick | null, ): NonNullable[number] | undefined { - const checkpoints = entry?.compactionCheckpoints; - if (!Array.isArray(checkpoints) || checkpoints.length === 0) { + const checkpoints = normalizedCompactionCheckpoints(entry); + if (checkpoints.length === 0) { return undefined; } return checkpoints.reduce((latest, checkpoint) => @@ -1568,7 +1603,7 @@ export function buildGatewaySessionRow(params: { lastTo: deliveryFields.lastTo, lastAccountId: deliveryFields.lastAccountId, lastThreadId: deliveryFields.lastThreadId, - compactionCheckpointCount: entry?.compactionCheckpoints?.length, + compactionCheckpointCount: normalizedCompactionCheckpoints(entry).length, latestCompactionCheckpoint, pluginExtensions: pluginExtensions.length > 0 ? pluginExtensions : undefined, };