From af8d2f2948d365d498cd9df0d3303b93157689ed Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 16 May 2026 11:14:54 +0800 Subject: [PATCH] fix(config): harden persisted boundary repair --- CHANGELOG.md | 1 + .../auth-profiles/persisted-boundary.test.ts | 105 ++++++++++++ src/agents/auth-profiles/persisted.ts | 130 +++++++++++++- src/agents/auth-profiles/state.ts | 161 +++++++++++++++--- .../doctor-cron-store-migration.test.ts | 28 +++ src/commands/doctor-cron-store-migration.ts | 29 +++- src/commands/doctor-cron.ts | 10 ++ src/cron/persisted-shape.ts | 66 +++++++ .../store.load-missing-session-target.test.ts | 41 ++++- src/cron/service/store.ts | 39 ++--- .../session-compaction-checkpoints.test.ts | 68 ++++++++ src/gateway/session-compaction-checkpoints.ts | 6 +- 12 files changed, 624 insertions(+), 60 deletions(-) create mode 100644 src/agents/auth-profiles/persisted-boundary.test.ts create mode 100644 src/cron/persisted-shape.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c54d7bb97a..d6727f8bb94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Agents/subagents: warn and continue completion announce cleanup when lifecycle cleanup fails, preventing ended subagent runs from becoming silent ghosts. Fixes #82306. Thanks @SebTardif. - Telegram: let authorized text `/stop` commands use the fast-abort path before queued agent work, so active turns stop immediately instead of processing the abort after the turn finishes; foreign-bot `/stop@otherbot` mentions now stay on the regular topic lane instead of being routed into our control lane. Fixes #82162. Thanks @civiltox. - Sessions: drop persisted entries with invalid session ids and strip malformed transcript file metadata before hydrating session runtime state. +- Config persistence: normalize malformed auth profile credential fields/state, skip JSON-valid garbage transcript checkpoint rows, and let `openclaw doctor --fix` remove unrepairable cron job rows. - Cron: skip persisted job rows with malformed schedule or payload shapes in memory, leaving the store for `openclaw doctor --fix` instead of hydrating them into runtime state. - Task persistence: drop malformed array/scalar requester-origin JSON from task and task-flow SQLite sidecars instead of restoring it as delivery metadata. - Agents/timeouts: clarify model idle-timeout errors and docs so provider `timeoutSeconds` is shown as bounded by the whole agent/run timeout ceiling. diff --git a/src/agents/auth-profiles/persisted-boundary.test.ts b/src/agents/auth-profiles/persisted-boundary.test.ts new file mode 100644 index 00000000000..a7edff62f2f --- /dev/null +++ b/src/agents/auth-profiles/persisted-boundary.test.ts @@ -0,0 +1,105 @@ +import { describe, expect, it } from "vitest"; +import { AUTH_STORE_VERSION } from "./constants.js"; +import { coercePersistedAuthProfileStore } from "./persisted.js"; + +describe("persisted auth profile boundary", () => { + it("normalizes malformed persisted credentials and state before runtime use", () => { + const store = coercePersistedAuthProfileStore({ + version: "not-a-version", + profiles: { + "openai:default": { + type: "api_key", + provider: " OpenAI ", + key: 42, + keyRef: { source: "env", id: "OPENAI_API_KEY" }, + metadata: { account: "acct_123", bad: 123 }, + copyToAgents: "yes", + email: ["wrong"], + displayName: "Work", + }, + "minimax:default": { + type: "token", + provider: "minimax", + token: ["wrong"], + tokenRef: { source: "env", provider: "default", id: "MINIMAX_TOKEN" }, + expires: "tomorrow", + }, + "codex:default": { + type: "oauth", + provider: "openai-codex", + access: ["wrong"], + refresh: "refresh-token", + expires: "later", + oauthRef: { + source: "openclaw-credentials", + provider: "openai-codex", + id: "not-a-secret-id", + }, + }, + "broken:array": [], + }, + order: { + OpenAI: [" openai:default ", 5, ""], + minimax: "wrong", + }, + lastGood: { + OpenAI: " openai:default ", + minimax: 5, + }, + usageStats: { + "openai:default": { + cooldownUntil: "later", + disabledUntil: 123, + disabledReason: "billing", + failureCounts: { + billing: 2, + nope: 4, + }, + }, + "minimax:default": "wrong", + }, + }); + + expect(store).toMatchObject({ + version: AUTH_STORE_VERSION, + profiles: { + "openai:default": { + type: "api_key", + provider: "openai", + keyRef: { source: "env", provider: "default", id: "OPENAI_API_KEY" }, + metadata: { account: "acct_123" }, + displayName: "Work", + }, + "minimax:default": { + type: "token", + provider: "minimax", + tokenRef: { source: "env", provider: "default", id: "MINIMAX_TOKEN" }, + expires: 0, + }, + "codex:default": { + type: "oauth", + provider: "openai-codex", + refresh: "refresh-token", + expires: 0, + }, + }, + order: { + openai: ["openai:default"], + }, + lastGood: { + openai: "openai:default", + }, + usageStats: { + "openai:default": { + disabledUntil: 123, + disabledReason: "billing", + failureCounts: { billing: 2 }, + }, + }, + }); + expect(store?.profiles["broken:array"]).toBeUndefined(); + expect(store?.profiles["openai:default"]).not.toHaveProperty("key"); + expect(store?.profiles["openai:default"]).not.toHaveProperty("copyToAgents"); + expect(store?.profiles["codex:default"]).not.toHaveProperty("oauthRef"); + }); +}); diff --git a/src/agents/auth-profiles/persisted.ts b/src/agents/auth-profiles/persisted.ts index f573ed8115b..d38e392f230 100644 --- a/src/agents/auth-profiles/persisted.ts +++ b/src/agents/auth-profiles/persisted.ts @@ -77,6 +77,38 @@ function isRecord(value: unknown): value is Record { return !!value && typeof value === "object" && !Array.isArray(value); } +function normalizeOptionalCredentialString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? value : undefined; +} + +function normalizeOptionalCredentialBoolean(value: unknown): boolean | undefined { + return typeof value === "boolean" ? value : undefined; +} + +function normalizeExpiryField(value: unknown): number | undefined { + if (value === undefined) { + return undefined; + } + return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : 0; +} + +function normalizeCredentialMetadata(value: unknown): Record | undefined { + if (!isRecord(value)) { + return undefined; + } + const metadata: Record = {}; + for (const [key, entry] of Object.entries(value)) { + if (typeof entry === "string") { + metadata[key] = entry; + } + } + return Object.keys(metadata).length > 0 ? metadata : undefined; +} + function normalizeSecretBackedField(params: { entry: Record; valueField: "key" | "token"; @@ -93,6 +125,25 @@ function normalizeSecretBackedField(params: { delete params.entry[params.valueField]; } +function normalizeCommonCredentialFields(entry: Record): Record { + const normalized: Record = { + provider: typeof entry.provider === "string" ? normalizeProviderId(entry.provider) : "", + }; + const copyToAgents = normalizeOptionalCredentialBoolean(entry.copyToAgents); + if (copyToAgents !== undefined) { + normalized.copyToAgents = copyToAgents; + } + const email = normalizeOptionalCredentialString(entry.email); + if (email !== undefined) { + normalized.email = email; + } + const displayName = normalizeOptionalCredentialString(entry.displayName); + if (displayName !== undefined) { + normalized.displayName = displayName; + } + return normalized; +} + function normalizeRawCredentialEntry(raw: Record): Partial { const entry = { ...raw } as Record; if (!("type" in entry) && typeof entry["mode"] === "string") { @@ -103,6 +154,73 @@ function normalizeRawCredentialEntry(raw: Record): Partial = { + type: "api_key", + ...normalizeCommonCredentialFields(entry), + }; + const key = normalizeOptionalCredentialString(entry.key); + const keyRef = coerceSecretRef(entry.keyRef); + const metadata = normalizeCredentialMetadata(entry.metadata); + if (key !== undefined) { + normalized.key = key; + } + if (keyRef) { + normalized.keyRef = keyRef; + } + if (metadata) { + normalized.metadata = metadata; + } + return normalized as Partial; + } + if (entry.type === "token") { + const normalized: Record = { + type: "token", + ...normalizeCommonCredentialFields(entry), + }; + const token = normalizeOptionalCredentialString(entry.token); + const tokenRef = coerceSecretRef(entry.tokenRef); + const expires = normalizeExpiryField(entry.expires); + if (token !== undefined) { + normalized.token = token; + } + if (tokenRef) { + normalized.tokenRef = tokenRef; + } + if (expires !== undefined) { + normalized.expires = expires; + } + return normalized as Partial; + } + if (entry.type === "oauth") { + const normalized: Record = { + type: "oauth", + ...normalizeCommonCredentialFields(entry), + }; + for (const field of [ + "access", + "refresh", + "idToken", + "clientId", + "enterpriseUrl", + "projectId", + "accountId", + "chatgptPlanType", + ] as const) { + const value = normalizeOptionalCredentialString(entry[field]); + if (value !== undefined) { + normalized[field] = value; + } + } + const expires = normalizeExpiryField(entry.expires); + if (expires !== undefined) { + normalized.expires = expires; + } + if (isOAuthProfileSecretRef(entry.oauthRef)) { + normalized.oauthRef = entry.oauthRef; + } + return normalized; + } return entry as Partial; } @@ -528,22 +646,23 @@ function parseCredentialEntry( raw: unknown, fallbackProvider?: string, ): { ok: true; credential: AuthProfileCredential } | { ok: false; reason: CredentialRejectReason } { - if (!raw || typeof raw !== "object") { + if (!isRecord(raw)) { return { ok: false, reason: "non_object" }; } - const typed = normalizeRawCredentialEntry(raw as Record); + const typed = normalizeRawCredentialEntry(raw); if (!AUTH_PROFILE_TYPES.has(typed.type as AuthProfileCredential["type"])) { return { ok: false, reason: "invalid_type" }; } const provider = typed.provider ?? fallbackProvider; - if (typeof provider !== "string" || provider.trim().length === 0) { + const normalizedProvider = typeof provider === "string" ? normalizeProviderId(provider) : ""; + if (!normalizedProvider) { return { ok: false, reason: "missing_provider" }; } return { ok: true, credential: { ...typed, - provider, + provider: normalizedProvider, } as AuthProfileCredential, }; } @@ -609,8 +728,9 @@ export function coercePersistedAuthProfileStore(raw: unknown): AuthProfileStore normalized[key] = parsed.credential; } warnRejectedCredentialEntries("auth-profiles.json", rejected); + const version = Number(record.version ?? AUTH_STORE_VERSION); return { - version: Number(record.version ?? AUTH_STORE_VERSION), + version: Number.isFinite(version) && version > 0 ? version : AUTH_STORE_VERSION, profiles: normalized, ...coerceAuthProfileState(record), }; diff --git a/src/agents/auth-profiles/state.ts b/src/agents/auth-profiles/state.ts index 96210e53ffa..3ea6125ac97 100644 --- a/src/agents/auth-profiles/state.ts +++ b/src/agents/auth-profiles/state.ts @@ -1,44 +1,159 @@ import fs from "node:fs"; import { loadJsonFile, saveJsonFile } from "../../infra/json-file.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; +import { normalizeProviderId } from "../provider-id.js"; import { AUTH_STORE_VERSION } from "./constants.js"; import { resolveAuthStatePath } from "./paths.js"; -import type { AuthProfileState, AuthProfileStateStore, ProfileUsageStats } from "./types.js"; +import type { + AuthProfileBlockedReason, + AuthProfileBlockedSource, + AuthProfileFailureReason, + AuthProfileState, + AuthProfileStateStore, + ProfileUsageStats, +} from "./types.js"; -function normalizeAuthProfileOrder(raw: unknown): AuthProfileState["order"] { - if (!raw || typeof raw !== "object") { +const AUTH_FAILURE_REASONS = new Set([ + "auth", + "auth_permanent", + "format", + "overloaded", + "rate_limit", + "billing", + "timeout", + "model_not_found", + "session_expired", + "empty_response", + "no_error_details", + "unclassified", + "unknown", +]); +const AUTH_BLOCKED_REASONS = new Set(["subscription_limit"]); +const AUTH_BLOCKED_SOURCES = new Set(["codex_rate_limits", "wham"]); + +function isRecord(value: unknown): value is Record { + return !!value && typeof value === "object" && !Array.isArray(value); +} + +function normalizeFiniteNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function normalizeEnumValue(value: unknown, allowed: Set): T | undefined { + if (typeof value !== "string") { return undefined; } - const normalized = Object.entries(raw as Record).reduce< - Record - >((acc, [provider, value]) => { - if (!Array.isArray(value)) { + return allowed.has(value as T) ? (value as T) : undefined; +} + +function normalizeFailureCounts(raw: unknown): ProfileUsageStats["failureCounts"] { + if (!isRecord(raw)) { + return undefined; + } + const normalized: NonNullable = {}; + for (const [reason, count] of Object.entries(raw)) { + if (!AUTH_FAILURE_REASONS.has(reason as AuthProfileFailureReason)) { + continue; + } + if (typeof count !== "number" || !Number.isFinite(count) || count <= 0) { + continue; + } + normalized[reason as AuthProfileFailureReason] = Math.trunc(count); + } + return Object.keys(normalized).length > 0 ? normalized : undefined; +} + +function normalizeAuthProfileOrder(raw: unknown): AuthProfileState["order"] { + if (!isRecord(raw)) { + return undefined; + } + const normalized = Object.entries(raw).reduce>( + (acc, [provider, value]) => { + if (!Array.isArray(value)) { + return acc; + } + const providerKey = normalizeProviderId(provider); + if (!providerKey) { + return acc; + } + const list = value.map((entry) => normalizeOptionalString(entry) ?? "").filter(Boolean); + if (list.length > 0) { + acc[providerKey] = list; + } return acc; + }, + {}, + ); + return Object.keys(normalized).length > 0 ? normalized : undefined; +} + +function normalizeLastGood(raw: unknown): AuthProfileState["lastGood"] { + if (!isRecord(raw)) { + return undefined; + } + const normalized: Record = {}; + for (const [provider, profileId] of Object.entries(raw)) { + const providerKey = normalizeProviderId(provider); + const normalizedProfileId = normalizeOptionalString(profileId); + if (!providerKey || !normalizedProfileId) { + continue; } - const list = value.map((entry) => normalizeOptionalString(entry) ?? "").filter(Boolean); - if (list.length > 0) { - acc[provider] = list; + normalized[providerKey] = normalizedProfileId; + } + return Object.keys(normalized).length > 0 ? normalized : undefined; +} + +function normalizeUsageStatsEntry(raw: unknown): ProfileUsageStats | undefined { + if (!isRecord(raw)) { + return undefined; + } + const stats: ProfileUsageStats = { + lastUsed: normalizeFiniteNumber(raw.lastUsed), + blockedUntil: normalizeFiniteNumber(raw.blockedUntil), + blockedReason: normalizeEnumValue(raw.blockedReason, AUTH_BLOCKED_REASONS), + blockedSource: normalizeEnumValue(raw.blockedSource, AUTH_BLOCKED_SOURCES), + blockedModel: normalizeOptionalString(raw.blockedModel), + cooldownUntil: normalizeFiniteNumber(raw.cooldownUntil), + cooldownReason: normalizeEnumValue(raw.cooldownReason, AUTH_FAILURE_REASONS), + cooldownModel: normalizeOptionalString(raw.cooldownModel), + disabledUntil: normalizeFiniteNumber(raw.disabledUntil), + disabledReason: normalizeEnumValue(raw.disabledReason, AUTH_FAILURE_REASONS), + errorCount: normalizeFiniteNumber(raw.errorCount), + failureCounts: normalizeFailureCounts(raw.failureCounts), + lastFailureAt: normalizeFiniteNumber(raw.lastFailureAt), + }; + for (const key of Object.keys(stats) as Array) { + if (stats[key] === undefined) { + delete stats[key]; } - return acc; - }, {}); + } + return Object.keys(stats).length > 0 ? stats : undefined; +} + +function normalizeUsageStats(raw: unknown): AuthProfileState["usageStats"] { + if (!isRecord(raw)) { + return undefined; + } + const normalized: Record = {}; + for (const [profileId, value] of Object.entries(raw)) { + const normalizedProfileId = normalizeOptionalString(profileId); + const stats = normalizeUsageStatsEntry(value); + if (!normalizedProfileId || !stats) { + continue; + } + normalized[normalizedProfileId] = stats; + } return Object.keys(normalized).length > 0 ? normalized : undefined; } export function coerceAuthProfileState(raw: unknown): AuthProfileState { - if (!raw || typeof raw !== "object") { + if (!isRecord(raw)) { return {}; } - const record = raw as Record; return { - order: normalizeAuthProfileOrder(record.order), - lastGood: - record.lastGood && typeof record.lastGood === "object" - ? (record.lastGood as Record) - : undefined, - usageStats: - record.usageStats && typeof record.usageStats === "object" - ? (record.usageStats as Record) - : undefined, + order: normalizeAuthProfileOrder(raw.order), + lastGood: normalizeLastGood(raw.lastGood), + usageStats: normalizeUsageStats(raw.usageStats), }; } diff --git a/src/commands/doctor-cron-store-migration.test.ts b/src/commands/doctor-cron-store-migration.test.ts index 79acbae34a9..cfa83962484 100644 --- a/src/commands/doctor-cron-store-migration.test.ts +++ b/src/commands/doctor-cron-store-migration.test.ts @@ -142,6 +142,34 @@ describe("normalizeStoredCronJobs", () => { expect(result.issues.legacyPayloadKind).toBeUndefined(); }); + it("removes unrepairable persisted schedule and payload shapes", () => { + const jobs = [ + makeLegacyJob({ + id: "valid", + schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 }, + payload: { kind: "systemEvent", text: "tick" }, + }), + makeLegacyJob({ + id: "bad-schedule", + schedule: { kind: "cron", expr: [] }, + payload: { kind: "systemEvent", text: "tick" }, + }), + makeLegacyJob({ + id: "bad-payload", + schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 }, + payload: { kind: "agentTurn", message: ["tick"] }, + }), + ]; + + const result = normalizeStoredCronJobs(jobs); + + expect(result.mutated).toBe(true); + expect(result.issues.invalidSchedule).toBe(1); + expect(result.issues.invalidPayload).toBe(1); + expect(jobs.map((job) => job.id)).toEqual(["valid"]); + expect(result.jobs.map((job) => job.id)).toEqual(["valid"]); + }); + it("normalizes whitespace-padded and non-canonical payload kinds", () => { const jobs = [ { diff --git a/src/commands/doctor-cron-store-migration.ts b/src/commands/doctor-cron-store-migration.ts index 84e6e67a7ee..4bf0f9a56d5 100644 --- a/src/commands/doctor-cron-store-migration.ts +++ b/src/commands/doctor-cron-store-migration.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import { parseAbsoluteTimeMs } from "../cron/parse.js"; +import { getInvalidPersistedCronJobReason } from "../cron/persisted-shape.js"; import { coerceFiniteScheduleNumber } from "../cron/schedule.js"; import { inferLegacyName } from "../cron/service/normalize.js"; import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../cron/stagger.js"; @@ -26,7 +27,9 @@ type CronStoreIssueKey = | "legacyPayloadProvider" | "legacyTopLevelPayloadFields" | "legacyTopLevelDeliveryFields" - | "legacyDeliveryMode"; + | "legacyDeliveryMode" + | "invalidSchedule" + | "invalidPayload"; type CronStoreIssues = Partial>; @@ -235,6 +238,7 @@ export function normalizeStoredCronJobs( ): NormalizeCronStoreJobsResult { const issues: CronStoreIssues = {}; let mutated = false; + const keptJobs: Array> = []; for (const raw of jobs) { const jobIssues = new Set(); @@ -560,6 +564,29 @@ export function normalizeStoredCronJobs( raw.delivery = normalizedLegacy.delivery; mutated = true; } + + const invalidPersistedReason = getInvalidPersistedCronJobReason(raw); + if ( + invalidPersistedReason === "missing-schedule" || + invalidPersistedReason === "invalid-schedule" + ) { + trackIssue("invalidSchedule"); + mutated = true; + continue; + } + if ( + invalidPersistedReason === "missing-payload" || + invalidPersistedReason === "invalid-payload" + ) { + trackIssue("invalidPayload"); + mutated = true; + continue; + } + keptJobs.push(raw); + } + + if (keptJobs.length !== jobs.length) { + jobs.splice(0, jobs.length, ...keptJobs); } return { issues, jobs, mutated }; diff --git a/src/commands/doctor-cron.ts b/src/commands/doctor-cron.ts index 11aaf1662a5..056ba4d89eb 100644 --- a/src/commands/doctor-cron.ts +++ b/src/commands/doctor-cron.ts @@ -81,6 +81,16 @@ function formatLegacyIssuePreview(issues: Partial>): stri `- ${pluralize(issues.legacyDeliveryMode, "job")} still uses delivery mode \`deliver\``, ); } + if (issues.invalidSchedule) { + lines.push( + `- ${pluralize(issues.invalidSchedule, "job")} has an invalid persisted schedule and will be removed`, + ); + } + if (issues.invalidPayload) { + lines.push( + `- ${pluralize(issues.invalidPayload, "job")} has an invalid persisted payload and will be removed`, + ); + } return lines; } diff --git a/src/cron/persisted-shape.ts b/src/cron/persisted-shape.ts new file mode 100644 index 00000000000..1b59b6ac547 --- /dev/null +++ b/src/cron/persisted-shape.ts @@ -0,0 +1,66 @@ +import { parseAbsoluteTimeMs } from "./parse.js"; + +export type InvalidPersistedCronJobReason = + | "missing-id" + | "missing-schedule" + | "invalid-schedule" + | "missing-payload" + | "invalid-payload"; + +export function getInvalidPersistedCronJobReason( + candidate: Record, +): InvalidPersistedCronJobReason | null { + const id = candidate.id; + if (typeof id !== "string" || !id.trim()) { + return "missing-id"; + } + const schedule = candidate.schedule; + if (!schedule || typeof schedule !== "object" || Array.isArray(schedule)) { + return "missing-schedule"; + } + const scheduleRecord = schedule as Record; + const scheduleKind = scheduleRecord.kind; + if (scheduleKind !== "at" && scheduleKind !== "every" && scheduleKind !== "cron") { + return "invalid-schedule"; + } + if (scheduleKind === "at") { + const at = scheduleRecord.at; + if (typeof at !== "string" || parseAbsoluteTimeMs(at) === null) { + return "invalid-schedule"; + } + } + if (scheduleKind === "every") { + const everyMs = scheduleRecord.everyMs; + if (typeof everyMs !== "number" || !Number.isFinite(everyMs) || everyMs <= 0) { + return "invalid-schedule"; + } + } + if (scheduleKind === "cron") { + const expr = scheduleRecord.expr; + if (typeof expr !== "string" || expr.trim().length === 0) { + return "invalid-schedule"; + } + } + const payload = candidate.payload; + if (!payload || typeof payload !== "object" || Array.isArray(payload)) { + return "missing-payload"; + } + const payloadRecord = payload as Record; + const payloadKind = payloadRecord.kind; + if (payloadKind !== "systemEvent" && payloadKind !== "agentTurn") { + return "invalid-payload"; + } + if (payloadKind === "systemEvent") { + const text = payloadRecord.text; + if (typeof text !== "string" || text.trim().length === 0) { + return "invalid-payload"; + } + } + if (payloadKind === "agentTurn") { + const message = payloadRecord.message; + if (typeof message !== "string" || message.trim().length === 0) { + return "invalid-payload"; + } + } + return null; +} diff --git a/src/cron/service/store.load-missing-session-target.test.ts b/src/cron/service/store.load-missing-session-target.test.ts index 9ba02e25a24..ec9eff81d40 100644 --- a/src/cron/service/store.load-missing-session-target.test.ts +++ b/src/cron/service/store.load-missing-session-target.test.ts @@ -162,6 +162,42 @@ describe("cron service store load: missing sessionTarget", () => { payload: ["systemEvent", "tick"], state: {}, }, + { + id: "bad-cron-expr", + name: "bad cron expr", + enabled: true, + createdAtMs: STORE_TEST_NOW - 60_000, + updatedAtMs: STORE_TEST_NOW - 60_000, + schedule: { kind: "cron", expr: [] }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }, + { + id: "bad-system-event-text", + name: "bad system event text", + enabled: true, + createdAtMs: STORE_TEST_NOW - 60_000, + updatedAtMs: STORE_TEST_NOW - 60_000, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: ["tick"] }, + state: {}, + }, + { + id: "bad-agent-turn-message", + name: "bad agent turn message", + enabled: true, + createdAtMs: STORE_TEST_NOW - 60_000, + updatedAtMs: STORE_TEST_NOW - 60_000, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: { text: "tick" } }, + state: {}, + }, ]); const beforeRaw = await fs.readFile(storePath, "utf-8"); const warnSpy = vi.spyOn(logger, "warn"); @@ -178,10 +214,13 @@ describe("cron service store load: missing sessionTarget", () => { const msg = typeof call[1] === "string" ? call[1] : ""; return msg.includes("skipped invalid persisted job"); }); - expect(invalidShapeWarns).toHaveLength(2); + expect(invalidShapeWarns).toHaveLength(5); expect(invalidShapeWarns.map((call) => (call[0] as { reason?: string }).reason)).toEqual([ "missing-schedule", "missing-payload", + "invalid-schedule", + "invalid-payload", + "invalid-payload", ]); warnSpy.mockRestore(); }); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index afe13410097..37fd1b7197a 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import { normalizeCronJobIdentityFields } from "../normalize-job-identity.js"; import { normalizeCronJobInput } from "../normalize.js"; +import { getInvalidPersistedCronJobReason } from "../persisted-shape.js"; import { cronSchedulingInputsEqual } from "../schedule-identity.js"; import { isInvalidCronSessionTargetIdError } from "../session-target.js"; import { loadCronStore, saveCronStore } from "../store.js"; @@ -20,30 +21,6 @@ function invalidateStaleNextRunOnScheduleChange(params: { params.hydrated.state.nextRunAtMs = undefined; } -function getInvalidPersistedCronJobReason(candidate: Record): string | null { - const id = candidate.id; - if (typeof id !== "string" || !id.trim()) { - return "missing-id"; - } - const schedule = candidate.schedule; - if (!schedule || typeof schedule !== "object" || Array.isArray(schedule)) { - return "missing-schedule"; - } - const scheduleKind = (schedule as { kind?: unknown }).kind; - if (scheduleKind !== "at" && scheduleKind !== "every" && scheduleKind !== "cron") { - return "invalid-schedule"; - } - const payload = candidate.payload; - if (!payload || typeof payload !== "object" || Array.isArray(payload)) { - return "missing-payload"; - } - const payloadKind = (payload as { kind?: unknown }).kind; - if (payloadKind !== "systemEvent" && payloadKind !== "agentTurn") { - return "invalid-payload"; - } - return null; -} - function warnInvalidPersistedCronJob(params: { state: CronServiceState; raw: Record; @@ -57,7 +34,12 @@ function warnInvalidPersistedCronJob(params: { } params.state.warnedInvalidPersistedJobKeys.add(dedupeKey); params.state.deps.log.warn( - { storePath: params.state.deps.storePath, jobId, jobIndex: params.index, reason: params.reason }, + { + storePath: params.state.deps.storePath, + jobId, + jobIndex: params.index, + reason: params.reason, + }, "cron: skipped invalid persisted job; run openclaw doctor --fix to repair", ); } @@ -114,10 +96,9 @@ export async function ensureLoaded( } const hydrated = normalized && typeof normalized === "object" ? (normalized as unknown as CronJob) : job; - const invalidReason = getInvalidPersistedCronJobReason(hydrated as unknown as Record< - string, - unknown - >); + const invalidReason = getInvalidPersistedCronJobReason( + hydrated as unknown as Record, + ); if (invalidReason) { warnInvalidPersistedCronJob({ state, raw, index, reason: invalidReason }); continue; diff --git a/src/gateway/session-compaction-checkpoints.test.ts b/src/gateway/session-compaction-checkpoints.test.ts index 753aed6af32..3185269b7b2 100644 --- a/src/gateway/session-compaction-checkpoints.test.ts +++ b/src/gateway/session-compaction-checkpoints.test.ts @@ -342,6 +342,74 @@ describe("session-compaction-checkpoints", () => { ]); }); + test("async fork skips JSON-valid garbage transcript entries", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-garbage-fork-")); + tempDirs.push(dir); + + const sourceFile = path.join(dir, "garbage.jsonl"); + const firstMessage = { + type: "message", + id: "first", + parentId: null, + message: { + role: "user", + content: "first", + timestamp: 1, + }, + }; + const secondMessage = { + type: "message", + id: "second", + parentId: "first", + message: { + role: "assistant", + content: "second", + api: "responses", + provider: "openai", + model: "gpt-test", + timestamp: 2, + }, + }; + await fs.writeFile( + sourceFile, + [ + JSON.stringify({ + type: "session", + version: CURRENT_SESSION_VERSION, + id: "source-session", + timestamp: new Date(0).toISOString(), + cwd: dir, + }), + JSON.stringify(firstMessage), + "null", + "[]", + '"garbage"', + JSON.stringify(secondMessage), + "{truncated-json", + "", + ].join("\n"), + "utf-8", + ); + + const forked = await forkCompactionCheckpointTranscriptAsync({ + sourceFile, + sessionDir: dir, + }); + + if (!forked) { + throw new Error("expected forked checkpoint transcript"); + } + const forkedEntries = (await fs.readFile(forked.sessionFile, "utf-8")) + .trim() + .split(/\r?\n/) + .map((line) => JSON.parse(line) as Record); + expect(forkedEntries.map((entry) => entry.type)).toEqual(["session", "message", "message"]); + expect(requireRecord(forkedEntries[1]?.message, "first forked message").content).toBe("first"); + expect(requireRecord(forkedEntries[2]?.message, "second forked message").content).toBe( + "second", + ); + }); + test("persist trims old checkpoint metadata and removes trimmed snapshot files", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-trim-")); tempDirs.push(dir); diff --git a/src/gateway/session-compaction-checkpoints.ts b/src/gateway/session-compaction-checkpoints.ts index 713ee14f1d9..d9ce7de532f 100644 --- a/src/gateway/session-compaction-checkpoints.ts +++ b/src/gateway/session-compaction-checkpoints.ts @@ -155,7 +155,11 @@ async function readTranscriptEntriesForForkAsync( try { for await (const line of streamSessionTranscriptLines(sessionFile)) { try { - entries.push(JSON.parse(line) as PiSessionFileEntry); + const parsed = JSON.parse(line) as unknown; + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + continue; + } + entries.push(parsed as PiSessionFileEntry); } catch { // Match pi-coding-agent's loader: malformed JSONL entries are ignored. }