mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 15:34:46 +00:00
fix(config): harden persisted boundary repair
This commit is contained in:
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/subagents: warn and continue completion announce cleanup when lifecycle cleanup fails, preventing ended subagent runs from becoming silent ghosts. Fixes #82306. Thanks @SebTardif.
|
||||
- Telegram: let authorized text `/stop` commands use the fast-abort path before queued agent work, so active turns stop immediately instead of processing the abort after the turn finishes; foreign-bot `/stop@otherbot` mentions now stay on the regular topic lane instead of being routed into our control lane. Fixes #82162. Thanks @civiltox.
|
||||
- Sessions: drop persisted entries with invalid session ids and strip malformed transcript file metadata before hydrating session runtime state.
|
||||
- Config persistence: normalize malformed auth profile credential fields/state, skip JSON-valid garbage transcript checkpoint rows, and let `openclaw doctor --fix` remove unrepairable cron job rows.
|
||||
- Cron: skip persisted job rows with malformed schedule or payload shapes in memory, leaving the store for `openclaw doctor --fix` instead of hydrating them into runtime state.
|
||||
- Task persistence: drop malformed array/scalar requester-origin JSON from task and task-flow SQLite sidecars instead of restoring it as delivery metadata.
|
||||
- Agents/timeouts: clarify model idle-timeout errors and docs so provider `timeoutSeconds` is shown as bounded by the whole agent/run timeout ceiling.
|
||||
|
||||
105
src/agents/auth-profiles/persisted-boundary.test.ts
Normal file
105
src/agents/auth-profiles/persisted-boundary.test.ts
Normal file
@@ -0,0 +1,105 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { AUTH_STORE_VERSION } from "./constants.js";
|
||||
import { coercePersistedAuthProfileStore } from "./persisted.js";
|
||||
|
||||
describe("persisted auth profile boundary", () => {
|
||||
it("normalizes malformed persisted credentials and state before runtime use", () => {
|
||||
const store = coercePersistedAuthProfileStore({
|
||||
version: "not-a-version",
|
||||
profiles: {
|
||||
"openai:default": {
|
||||
type: "api_key",
|
||||
provider: " OpenAI ",
|
||||
key: 42,
|
||||
keyRef: { source: "env", id: "OPENAI_API_KEY" },
|
||||
metadata: { account: "acct_123", bad: 123 },
|
||||
copyToAgents: "yes",
|
||||
email: ["wrong"],
|
||||
displayName: "Work",
|
||||
},
|
||||
"minimax:default": {
|
||||
type: "token",
|
||||
provider: "minimax",
|
||||
token: ["wrong"],
|
||||
tokenRef: { source: "env", provider: "default", id: "MINIMAX_TOKEN" },
|
||||
expires: "tomorrow",
|
||||
},
|
||||
"codex:default": {
|
||||
type: "oauth",
|
||||
provider: "openai-codex",
|
||||
access: ["wrong"],
|
||||
refresh: "refresh-token",
|
||||
expires: "later",
|
||||
oauthRef: {
|
||||
source: "openclaw-credentials",
|
||||
provider: "openai-codex",
|
||||
id: "not-a-secret-id",
|
||||
},
|
||||
},
|
||||
"broken:array": [],
|
||||
},
|
||||
order: {
|
||||
OpenAI: [" openai:default ", 5, ""],
|
||||
minimax: "wrong",
|
||||
},
|
||||
lastGood: {
|
||||
OpenAI: " openai:default ",
|
||||
minimax: 5,
|
||||
},
|
||||
usageStats: {
|
||||
"openai:default": {
|
||||
cooldownUntil: "later",
|
||||
disabledUntil: 123,
|
||||
disabledReason: "billing",
|
||||
failureCounts: {
|
||||
billing: 2,
|
||||
nope: 4,
|
||||
},
|
||||
},
|
||||
"minimax:default": "wrong",
|
||||
},
|
||||
});
|
||||
|
||||
expect(store).toMatchObject({
|
||||
version: AUTH_STORE_VERSION,
|
||||
profiles: {
|
||||
"openai:default": {
|
||||
type: "api_key",
|
||||
provider: "openai",
|
||||
keyRef: { source: "env", provider: "default", id: "OPENAI_API_KEY" },
|
||||
metadata: { account: "acct_123" },
|
||||
displayName: "Work",
|
||||
},
|
||||
"minimax:default": {
|
||||
type: "token",
|
||||
provider: "minimax",
|
||||
tokenRef: { source: "env", provider: "default", id: "MINIMAX_TOKEN" },
|
||||
expires: 0,
|
||||
},
|
||||
"codex:default": {
|
||||
type: "oauth",
|
||||
provider: "openai-codex",
|
||||
refresh: "refresh-token",
|
||||
expires: 0,
|
||||
},
|
||||
},
|
||||
order: {
|
||||
openai: ["openai:default"],
|
||||
},
|
||||
lastGood: {
|
||||
openai: "openai:default",
|
||||
},
|
||||
usageStats: {
|
||||
"openai:default": {
|
||||
disabledUntil: 123,
|
||||
disabledReason: "billing",
|
||||
failureCounts: { billing: 2 },
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(store?.profiles["broken:array"]).toBeUndefined();
|
||||
expect(store?.profiles["openai:default"]).not.toHaveProperty("key");
|
||||
expect(store?.profiles["openai:default"]).not.toHaveProperty("copyToAgents");
|
||||
expect(store?.profiles["codex:default"]).not.toHaveProperty("oauthRef");
|
||||
});
|
||||
});
|
||||
@@ -77,6 +77,38 @@ function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return !!value && typeof value === "object" && !Array.isArray(value);
|
||||
}
|
||||
|
||||
function normalizeOptionalCredentialString(value: unknown): string | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = value.trim();
|
||||
return trimmed ? value : undefined;
|
||||
}
|
||||
|
||||
function normalizeOptionalCredentialBoolean(value: unknown): boolean | undefined {
|
||||
return typeof value === "boolean" ? value : undefined;
|
||||
}
|
||||
|
||||
function normalizeExpiryField(value: unknown): number | undefined {
|
||||
if (value === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : 0;
|
||||
}
|
||||
|
||||
function normalizeCredentialMetadata(value: unknown): Record<string, string> | undefined {
|
||||
if (!isRecord(value)) {
|
||||
return undefined;
|
||||
}
|
||||
const metadata: Record<string, string> = {};
|
||||
for (const [key, entry] of Object.entries(value)) {
|
||||
if (typeof entry === "string") {
|
||||
metadata[key] = entry;
|
||||
}
|
||||
}
|
||||
return Object.keys(metadata).length > 0 ? metadata : undefined;
|
||||
}
|
||||
|
||||
function normalizeSecretBackedField(params: {
|
||||
entry: Record<string, unknown>;
|
||||
valueField: "key" | "token";
|
||||
@@ -93,6 +125,25 @@ function normalizeSecretBackedField(params: {
|
||||
delete params.entry[params.valueField];
|
||||
}
|
||||
|
||||
function normalizeCommonCredentialFields(entry: Record<string, unknown>): Record<string, unknown> {
|
||||
const normalized: Record<string, unknown> = {
|
||||
provider: typeof entry.provider === "string" ? normalizeProviderId(entry.provider) : "",
|
||||
};
|
||||
const copyToAgents = normalizeOptionalCredentialBoolean(entry.copyToAgents);
|
||||
if (copyToAgents !== undefined) {
|
||||
normalized.copyToAgents = copyToAgents;
|
||||
}
|
||||
const email = normalizeOptionalCredentialString(entry.email);
|
||||
if (email !== undefined) {
|
||||
normalized.email = email;
|
||||
}
|
||||
const displayName = normalizeOptionalCredentialString(entry.displayName);
|
||||
if (displayName !== undefined) {
|
||||
normalized.displayName = displayName;
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function normalizeRawCredentialEntry(raw: Record<string, unknown>): Partial<AuthProfileCredential> {
|
||||
const entry = { ...raw } as Record<string, unknown>;
|
||||
if (!("type" in entry) && typeof entry["mode"] === "string") {
|
||||
@@ -103,6 +154,73 @@ function normalizeRawCredentialEntry(raw: Record<string, unknown>): Partial<Auth
|
||||
}
|
||||
normalizeSecretBackedField({ entry, valueField: "key", refField: "keyRef" });
|
||||
normalizeSecretBackedField({ entry, valueField: "token", refField: "tokenRef" });
|
||||
if (entry.type === "api_key") {
|
||||
const normalized: Record<string, unknown> = {
|
||||
type: "api_key",
|
||||
...normalizeCommonCredentialFields(entry),
|
||||
};
|
||||
const key = normalizeOptionalCredentialString(entry.key);
|
||||
const keyRef = coerceSecretRef(entry.keyRef);
|
||||
const metadata = normalizeCredentialMetadata(entry.metadata);
|
||||
if (key !== undefined) {
|
||||
normalized.key = key;
|
||||
}
|
||||
if (keyRef) {
|
||||
normalized.keyRef = keyRef;
|
||||
}
|
||||
if (metadata) {
|
||||
normalized.metadata = metadata;
|
||||
}
|
||||
return normalized as Partial<AuthProfileCredential>;
|
||||
}
|
||||
if (entry.type === "token") {
|
||||
const normalized: Record<string, unknown> = {
|
||||
type: "token",
|
||||
...normalizeCommonCredentialFields(entry),
|
||||
};
|
||||
const token = normalizeOptionalCredentialString(entry.token);
|
||||
const tokenRef = coerceSecretRef(entry.tokenRef);
|
||||
const expires = normalizeExpiryField(entry.expires);
|
||||
if (token !== undefined) {
|
||||
normalized.token = token;
|
||||
}
|
||||
if (tokenRef) {
|
||||
normalized.tokenRef = tokenRef;
|
||||
}
|
||||
if (expires !== undefined) {
|
||||
normalized.expires = expires;
|
||||
}
|
||||
return normalized as Partial<AuthProfileCredential>;
|
||||
}
|
||||
if (entry.type === "oauth") {
|
||||
const normalized: Record<string, unknown> = {
|
||||
type: "oauth",
|
||||
...normalizeCommonCredentialFields(entry),
|
||||
};
|
||||
for (const field of [
|
||||
"access",
|
||||
"refresh",
|
||||
"idToken",
|
||||
"clientId",
|
||||
"enterpriseUrl",
|
||||
"projectId",
|
||||
"accountId",
|
||||
"chatgptPlanType",
|
||||
] as const) {
|
||||
const value = normalizeOptionalCredentialString(entry[field]);
|
||||
if (value !== undefined) {
|
||||
normalized[field] = value;
|
||||
}
|
||||
}
|
||||
const expires = normalizeExpiryField(entry.expires);
|
||||
if (expires !== undefined) {
|
||||
normalized.expires = expires;
|
||||
}
|
||||
if (isOAuthProfileSecretRef(entry.oauthRef)) {
|
||||
normalized.oauthRef = entry.oauthRef;
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
return entry as Partial<AuthProfileCredential>;
|
||||
}
|
||||
|
||||
@@ -528,22 +646,23 @@ function parseCredentialEntry(
|
||||
raw: unknown,
|
||||
fallbackProvider?: string,
|
||||
): { ok: true; credential: AuthProfileCredential } | { ok: false; reason: CredentialRejectReason } {
|
||||
if (!raw || typeof raw !== "object") {
|
||||
if (!isRecord(raw)) {
|
||||
return { ok: false, reason: "non_object" };
|
||||
}
|
||||
const typed = normalizeRawCredentialEntry(raw as Record<string, unknown>);
|
||||
const typed = normalizeRawCredentialEntry(raw);
|
||||
if (!AUTH_PROFILE_TYPES.has(typed.type as AuthProfileCredential["type"])) {
|
||||
return { ok: false, reason: "invalid_type" };
|
||||
}
|
||||
const provider = typed.provider ?? fallbackProvider;
|
||||
if (typeof provider !== "string" || provider.trim().length === 0) {
|
||||
const normalizedProvider = typeof provider === "string" ? normalizeProviderId(provider) : "";
|
||||
if (!normalizedProvider) {
|
||||
return { ok: false, reason: "missing_provider" };
|
||||
}
|
||||
return {
|
||||
ok: true,
|
||||
credential: {
|
||||
...typed,
|
||||
provider,
|
||||
provider: normalizedProvider,
|
||||
} as AuthProfileCredential,
|
||||
};
|
||||
}
|
||||
@@ -609,8 +728,9 @@ export function coercePersistedAuthProfileStore(raw: unknown): AuthProfileStore
|
||||
normalized[key] = parsed.credential;
|
||||
}
|
||||
warnRejectedCredentialEntries("auth-profiles.json", rejected);
|
||||
const version = Number(record.version ?? AUTH_STORE_VERSION);
|
||||
return {
|
||||
version: Number(record.version ?? AUTH_STORE_VERSION),
|
||||
version: Number.isFinite(version) && version > 0 ? version : AUTH_STORE_VERSION,
|
||||
profiles: normalized,
|
||||
...coerceAuthProfileState(record),
|
||||
};
|
||||
|
||||
@@ -1,44 +1,159 @@
|
||||
import fs from "node:fs";
|
||||
import { loadJsonFile, saveJsonFile } from "../../infra/json-file.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import { normalizeProviderId } from "../provider-id.js";
|
||||
import { AUTH_STORE_VERSION } from "./constants.js";
|
||||
import { resolveAuthStatePath } from "./paths.js";
|
||||
import type { AuthProfileState, AuthProfileStateStore, ProfileUsageStats } from "./types.js";
|
||||
import type {
|
||||
AuthProfileBlockedReason,
|
||||
AuthProfileBlockedSource,
|
||||
AuthProfileFailureReason,
|
||||
AuthProfileState,
|
||||
AuthProfileStateStore,
|
||||
ProfileUsageStats,
|
||||
} from "./types.js";
|
||||
|
||||
function normalizeAuthProfileOrder(raw: unknown): AuthProfileState["order"] {
|
||||
if (!raw || typeof raw !== "object") {
|
||||
const AUTH_FAILURE_REASONS = new Set<AuthProfileFailureReason>([
|
||||
"auth",
|
||||
"auth_permanent",
|
||||
"format",
|
||||
"overloaded",
|
||||
"rate_limit",
|
||||
"billing",
|
||||
"timeout",
|
||||
"model_not_found",
|
||||
"session_expired",
|
||||
"empty_response",
|
||||
"no_error_details",
|
||||
"unclassified",
|
||||
"unknown",
|
||||
]);
|
||||
const AUTH_BLOCKED_REASONS = new Set<AuthProfileBlockedReason>(["subscription_limit"]);
|
||||
const AUTH_BLOCKED_SOURCES = new Set<AuthProfileBlockedSource>(["codex_rate_limits", "wham"]);
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return !!value && typeof value === "object" && !Array.isArray(value);
|
||||
}
|
||||
|
||||
function normalizeFiniteNumber(value: unknown): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
|
||||
}
|
||||
|
||||
function normalizeEnumValue<T extends string>(value: unknown, allowed: Set<T>): T | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const normalized = Object.entries(raw as Record<string, unknown>).reduce<
|
||||
Record<string, string[]>
|
||||
>((acc, [provider, value]) => {
|
||||
if (!Array.isArray(value)) {
|
||||
return allowed.has(value as T) ? (value as T) : undefined;
|
||||
}
|
||||
|
||||
function normalizeFailureCounts(raw: unknown): ProfileUsageStats["failureCounts"] {
|
||||
if (!isRecord(raw)) {
|
||||
return undefined;
|
||||
}
|
||||
const normalized: NonNullable<ProfileUsageStats["failureCounts"]> = {};
|
||||
for (const [reason, count] of Object.entries(raw)) {
|
||||
if (!AUTH_FAILURE_REASONS.has(reason as AuthProfileFailureReason)) {
|
||||
continue;
|
||||
}
|
||||
if (typeof count !== "number" || !Number.isFinite(count) || count <= 0) {
|
||||
continue;
|
||||
}
|
||||
normalized[reason as AuthProfileFailureReason] = Math.trunc(count);
|
||||
}
|
||||
return Object.keys(normalized).length > 0 ? normalized : undefined;
|
||||
}
|
||||
|
||||
function normalizeAuthProfileOrder(raw: unknown): AuthProfileState["order"] {
|
||||
if (!isRecord(raw)) {
|
||||
return undefined;
|
||||
}
|
||||
const normalized = Object.entries(raw).reduce<Record<string, string[]>>(
|
||||
(acc, [provider, value]) => {
|
||||
if (!Array.isArray(value)) {
|
||||
return acc;
|
||||
}
|
||||
const providerKey = normalizeProviderId(provider);
|
||||
if (!providerKey) {
|
||||
return acc;
|
||||
}
|
||||
const list = value.map((entry) => normalizeOptionalString(entry) ?? "").filter(Boolean);
|
||||
if (list.length > 0) {
|
||||
acc[providerKey] = list;
|
||||
}
|
||||
return acc;
|
||||
},
|
||||
{},
|
||||
);
|
||||
return Object.keys(normalized).length > 0 ? normalized : undefined;
|
||||
}
|
||||
|
||||
function normalizeLastGood(raw: unknown): AuthProfileState["lastGood"] {
|
||||
if (!isRecord(raw)) {
|
||||
return undefined;
|
||||
}
|
||||
const normalized: Record<string, string> = {};
|
||||
for (const [provider, profileId] of Object.entries(raw)) {
|
||||
const providerKey = normalizeProviderId(provider);
|
||||
const normalizedProfileId = normalizeOptionalString(profileId);
|
||||
if (!providerKey || !normalizedProfileId) {
|
||||
continue;
|
||||
}
|
||||
const list = value.map((entry) => normalizeOptionalString(entry) ?? "").filter(Boolean);
|
||||
if (list.length > 0) {
|
||||
acc[provider] = list;
|
||||
normalized[providerKey] = normalizedProfileId;
|
||||
}
|
||||
return Object.keys(normalized).length > 0 ? normalized : undefined;
|
||||
}
|
||||
|
||||
function normalizeUsageStatsEntry(raw: unknown): ProfileUsageStats | undefined {
|
||||
if (!isRecord(raw)) {
|
||||
return undefined;
|
||||
}
|
||||
const stats: ProfileUsageStats = {
|
||||
lastUsed: normalizeFiniteNumber(raw.lastUsed),
|
||||
blockedUntil: normalizeFiniteNumber(raw.blockedUntil),
|
||||
blockedReason: normalizeEnumValue(raw.blockedReason, AUTH_BLOCKED_REASONS),
|
||||
blockedSource: normalizeEnumValue(raw.blockedSource, AUTH_BLOCKED_SOURCES),
|
||||
blockedModel: normalizeOptionalString(raw.blockedModel),
|
||||
cooldownUntil: normalizeFiniteNumber(raw.cooldownUntil),
|
||||
cooldownReason: normalizeEnumValue(raw.cooldownReason, AUTH_FAILURE_REASONS),
|
||||
cooldownModel: normalizeOptionalString(raw.cooldownModel),
|
||||
disabledUntil: normalizeFiniteNumber(raw.disabledUntil),
|
||||
disabledReason: normalizeEnumValue(raw.disabledReason, AUTH_FAILURE_REASONS),
|
||||
errorCount: normalizeFiniteNumber(raw.errorCount),
|
||||
failureCounts: normalizeFailureCounts(raw.failureCounts),
|
||||
lastFailureAt: normalizeFiniteNumber(raw.lastFailureAt),
|
||||
};
|
||||
for (const key of Object.keys(stats) as Array<keyof ProfileUsageStats>) {
|
||||
if (stats[key] === undefined) {
|
||||
delete stats[key];
|
||||
}
|
||||
return acc;
|
||||
}, {});
|
||||
}
|
||||
return Object.keys(stats).length > 0 ? stats : undefined;
|
||||
}
|
||||
|
||||
function normalizeUsageStats(raw: unknown): AuthProfileState["usageStats"] {
|
||||
if (!isRecord(raw)) {
|
||||
return undefined;
|
||||
}
|
||||
const normalized: Record<string, ProfileUsageStats> = {};
|
||||
for (const [profileId, value] of Object.entries(raw)) {
|
||||
const normalizedProfileId = normalizeOptionalString(profileId);
|
||||
const stats = normalizeUsageStatsEntry(value);
|
||||
if (!normalizedProfileId || !stats) {
|
||||
continue;
|
||||
}
|
||||
normalized[normalizedProfileId] = stats;
|
||||
}
|
||||
return Object.keys(normalized).length > 0 ? normalized : undefined;
|
||||
}
|
||||
|
||||
export function coerceAuthProfileState(raw: unknown): AuthProfileState {
|
||||
if (!raw || typeof raw !== "object") {
|
||||
if (!isRecord(raw)) {
|
||||
return {};
|
||||
}
|
||||
const record = raw as Record<string, unknown>;
|
||||
return {
|
||||
order: normalizeAuthProfileOrder(record.order),
|
||||
lastGood:
|
||||
record.lastGood && typeof record.lastGood === "object"
|
||||
? (record.lastGood as Record<string, string>)
|
||||
: undefined,
|
||||
usageStats:
|
||||
record.usageStats && typeof record.usageStats === "object"
|
||||
? (record.usageStats as Record<string, ProfileUsageStats>)
|
||||
: undefined,
|
||||
order: normalizeAuthProfileOrder(raw.order),
|
||||
lastGood: normalizeLastGood(raw.lastGood),
|
||||
usageStats: normalizeUsageStats(raw.usageStats),
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -142,6 +142,34 @@ describe("normalizeStoredCronJobs", () => {
|
||||
expect(result.issues.legacyPayloadKind).toBeUndefined();
|
||||
});
|
||||
|
||||
it("removes unrepairable persisted schedule and payload shapes", () => {
|
||||
const jobs = [
|
||||
makeLegacyJob({
|
||||
id: "valid",
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 },
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
}),
|
||||
makeLegacyJob({
|
||||
id: "bad-schedule",
|
||||
schedule: { kind: "cron", expr: [] },
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
}),
|
||||
makeLegacyJob({
|
||||
id: "bad-payload",
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 },
|
||||
payload: { kind: "agentTurn", message: ["tick"] },
|
||||
}),
|
||||
];
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(true);
|
||||
expect(result.issues.invalidSchedule).toBe(1);
|
||||
expect(result.issues.invalidPayload).toBe(1);
|
||||
expect(jobs.map((job) => job.id)).toEqual(["valid"]);
|
||||
expect(result.jobs.map((job) => job.id)).toEqual(["valid"]);
|
||||
});
|
||||
|
||||
it("normalizes whitespace-padded and non-canonical payload kinds", () => {
|
||||
const jobs = [
|
||||
{
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { parseAbsoluteTimeMs } from "../cron/parse.js";
|
||||
import { getInvalidPersistedCronJobReason } from "../cron/persisted-shape.js";
|
||||
import { coerceFiniteScheduleNumber } from "../cron/schedule.js";
|
||||
import { inferLegacyName } from "../cron/service/normalize.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../cron/stagger.js";
|
||||
@@ -26,7 +27,9 @@ type CronStoreIssueKey =
|
||||
| "legacyPayloadProvider"
|
||||
| "legacyTopLevelPayloadFields"
|
||||
| "legacyTopLevelDeliveryFields"
|
||||
| "legacyDeliveryMode";
|
||||
| "legacyDeliveryMode"
|
||||
| "invalidSchedule"
|
||||
| "invalidPayload";
|
||||
|
||||
type CronStoreIssues = Partial<Record<CronStoreIssueKey, number>>;
|
||||
|
||||
@@ -235,6 +238,7 @@ export function normalizeStoredCronJobs(
|
||||
): NormalizeCronStoreJobsResult {
|
||||
const issues: CronStoreIssues = {};
|
||||
let mutated = false;
|
||||
const keptJobs: Array<Record<string, unknown>> = [];
|
||||
|
||||
for (const raw of jobs) {
|
||||
const jobIssues = new Set<CronStoreIssueKey>();
|
||||
@@ -560,6 +564,29 @@ export function normalizeStoredCronJobs(
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const invalidPersistedReason = getInvalidPersistedCronJobReason(raw);
|
||||
if (
|
||||
invalidPersistedReason === "missing-schedule" ||
|
||||
invalidPersistedReason === "invalid-schedule"
|
||||
) {
|
||||
trackIssue("invalidSchedule");
|
||||
mutated = true;
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
invalidPersistedReason === "missing-payload" ||
|
||||
invalidPersistedReason === "invalid-payload"
|
||||
) {
|
||||
trackIssue("invalidPayload");
|
||||
mutated = true;
|
||||
continue;
|
||||
}
|
||||
keptJobs.push(raw);
|
||||
}
|
||||
|
||||
if (keptJobs.length !== jobs.length) {
|
||||
jobs.splice(0, jobs.length, ...keptJobs);
|
||||
}
|
||||
|
||||
return { issues, jobs, mutated };
|
||||
|
||||
@@ -81,6 +81,16 @@ function formatLegacyIssuePreview(issues: Partial<Record<string, number>>): stri
|
||||
`- ${pluralize(issues.legacyDeliveryMode, "job")} still uses delivery mode \`deliver\``,
|
||||
);
|
||||
}
|
||||
if (issues.invalidSchedule) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.invalidSchedule, "job")} has an invalid persisted schedule and will be removed`,
|
||||
);
|
||||
}
|
||||
if (issues.invalidPayload) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.invalidPayload, "job")} has an invalid persisted payload and will be removed`,
|
||||
);
|
||||
}
|
||||
return lines;
|
||||
}
|
||||
|
||||
|
||||
66
src/cron/persisted-shape.ts
Normal file
66
src/cron/persisted-shape.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import { parseAbsoluteTimeMs } from "./parse.js";
|
||||
|
||||
export type InvalidPersistedCronJobReason =
|
||||
| "missing-id"
|
||||
| "missing-schedule"
|
||||
| "invalid-schedule"
|
||||
| "missing-payload"
|
||||
| "invalid-payload";
|
||||
|
||||
export function getInvalidPersistedCronJobReason(
|
||||
candidate: Record<string, unknown>,
|
||||
): InvalidPersistedCronJobReason | null {
|
||||
const id = candidate.id;
|
||||
if (typeof id !== "string" || !id.trim()) {
|
||||
return "missing-id";
|
||||
}
|
||||
const schedule = candidate.schedule;
|
||||
if (!schedule || typeof schedule !== "object" || Array.isArray(schedule)) {
|
||||
return "missing-schedule";
|
||||
}
|
||||
const scheduleRecord = schedule as Record<string, unknown>;
|
||||
const scheduleKind = scheduleRecord.kind;
|
||||
if (scheduleKind !== "at" && scheduleKind !== "every" && scheduleKind !== "cron") {
|
||||
return "invalid-schedule";
|
||||
}
|
||||
if (scheduleKind === "at") {
|
||||
const at = scheduleRecord.at;
|
||||
if (typeof at !== "string" || parseAbsoluteTimeMs(at) === null) {
|
||||
return "invalid-schedule";
|
||||
}
|
||||
}
|
||||
if (scheduleKind === "every") {
|
||||
const everyMs = scheduleRecord.everyMs;
|
||||
if (typeof everyMs !== "number" || !Number.isFinite(everyMs) || everyMs <= 0) {
|
||||
return "invalid-schedule";
|
||||
}
|
||||
}
|
||||
if (scheduleKind === "cron") {
|
||||
const expr = scheduleRecord.expr;
|
||||
if (typeof expr !== "string" || expr.trim().length === 0) {
|
||||
return "invalid-schedule";
|
||||
}
|
||||
}
|
||||
const payload = candidate.payload;
|
||||
if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
|
||||
return "missing-payload";
|
||||
}
|
||||
const payloadRecord = payload as Record<string, unknown>;
|
||||
const payloadKind = payloadRecord.kind;
|
||||
if (payloadKind !== "systemEvent" && payloadKind !== "agentTurn") {
|
||||
return "invalid-payload";
|
||||
}
|
||||
if (payloadKind === "systemEvent") {
|
||||
const text = payloadRecord.text;
|
||||
if (typeof text !== "string" || text.trim().length === 0) {
|
||||
return "invalid-payload";
|
||||
}
|
||||
}
|
||||
if (payloadKind === "agentTurn") {
|
||||
const message = payloadRecord.message;
|
||||
if (typeof message !== "string" || message.trim().length === 0) {
|
||||
return "invalid-payload";
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@@ -162,6 +162,42 @@ describe("cron service store load: missing sessionTarget", () => {
|
||||
payload: ["systemEvent", "tick"],
|
||||
state: {},
|
||||
},
|
||||
{
|
||||
id: "bad-cron-expr",
|
||||
name: "bad cron expr",
|
||||
enabled: true,
|
||||
createdAtMs: STORE_TEST_NOW - 60_000,
|
||||
updatedAtMs: STORE_TEST_NOW - 60_000,
|
||||
schedule: { kind: "cron", expr: [] },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {},
|
||||
},
|
||||
{
|
||||
id: "bad-system-event-text",
|
||||
name: "bad system event text",
|
||||
enabled: true,
|
||||
createdAtMs: STORE_TEST_NOW - 60_000,
|
||||
updatedAtMs: STORE_TEST_NOW - 60_000,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: ["tick"] },
|
||||
state: {},
|
||||
},
|
||||
{
|
||||
id: "bad-agent-turn-message",
|
||||
name: "bad agent turn message",
|
||||
enabled: true,
|
||||
createdAtMs: STORE_TEST_NOW - 60_000,
|
||||
updatedAtMs: STORE_TEST_NOW - 60_000,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "agentTurn", message: { text: "tick" } },
|
||||
state: {},
|
||||
},
|
||||
]);
|
||||
const beforeRaw = await fs.readFile(storePath, "utf-8");
|
||||
const warnSpy = vi.spyOn(logger, "warn");
|
||||
@@ -178,10 +214,13 @@ describe("cron service store load: missing sessionTarget", () => {
|
||||
const msg = typeof call[1] === "string" ? call[1] : "";
|
||||
return msg.includes("skipped invalid persisted job");
|
||||
});
|
||||
expect(invalidShapeWarns).toHaveLength(2);
|
||||
expect(invalidShapeWarns).toHaveLength(5);
|
||||
expect(invalidShapeWarns.map((call) => (call[0] as { reason?: string }).reason)).toEqual([
|
||||
"missing-schedule",
|
||||
"missing-payload",
|
||||
"invalid-schedule",
|
||||
"invalid-payload",
|
||||
"invalid-payload",
|
||||
]);
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs";
|
||||
import { normalizeCronJobIdentityFields } from "../normalize-job-identity.js";
|
||||
import { normalizeCronJobInput } from "../normalize.js";
|
||||
import { getInvalidPersistedCronJobReason } from "../persisted-shape.js";
|
||||
import { cronSchedulingInputsEqual } from "../schedule-identity.js";
|
||||
import { isInvalidCronSessionTargetIdError } from "../session-target.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
@@ -20,30 +21,6 @@ function invalidateStaleNextRunOnScheduleChange(params: {
|
||||
params.hydrated.state.nextRunAtMs = undefined;
|
||||
}
|
||||
|
||||
function getInvalidPersistedCronJobReason(candidate: Record<string, unknown>): string | null {
|
||||
const id = candidate.id;
|
||||
if (typeof id !== "string" || !id.trim()) {
|
||||
return "missing-id";
|
||||
}
|
||||
const schedule = candidate.schedule;
|
||||
if (!schedule || typeof schedule !== "object" || Array.isArray(schedule)) {
|
||||
return "missing-schedule";
|
||||
}
|
||||
const scheduleKind = (schedule as { kind?: unknown }).kind;
|
||||
if (scheduleKind !== "at" && scheduleKind !== "every" && scheduleKind !== "cron") {
|
||||
return "invalid-schedule";
|
||||
}
|
||||
const payload = candidate.payload;
|
||||
if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
|
||||
return "missing-payload";
|
||||
}
|
||||
const payloadKind = (payload as { kind?: unknown }).kind;
|
||||
if (payloadKind !== "systemEvent" && payloadKind !== "agentTurn") {
|
||||
return "invalid-payload";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function warnInvalidPersistedCronJob(params: {
|
||||
state: CronServiceState;
|
||||
raw: Record<string, unknown>;
|
||||
@@ -57,7 +34,12 @@ function warnInvalidPersistedCronJob(params: {
|
||||
}
|
||||
params.state.warnedInvalidPersistedJobKeys.add(dedupeKey);
|
||||
params.state.deps.log.warn(
|
||||
{ storePath: params.state.deps.storePath, jobId, jobIndex: params.index, reason: params.reason },
|
||||
{
|
||||
storePath: params.state.deps.storePath,
|
||||
jobId,
|
||||
jobIndex: params.index,
|
||||
reason: params.reason,
|
||||
},
|
||||
"cron: skipped invalid persisted job; run openclaw doctor --fix to repair",
|
||||
);
|
||||
}
|
||||
@@ -114,10 +96,9 @@ export async function ensureLoaded(
|
||||
}
|
||||
const hydrated =
|
||||
normalized && typeof normalized === "object" ? (normalized as unknown as CronJob) : job;
|
||||
const invalidReason = getInvalidPersistedCronJobReason(hydrated as unknown as Record<
|
||||
string,
|
||||
unknown
|
||||
>);
|
||||
const invalidReason = getInvalidPersistedCronJobReason(
|
||||
hydrated as unknown as Record<string, unknown>,
|
||||
);
|
||||
if (invalidReason) {
|
||||
warnInvalidPersistedCronJob({ state, raw, index, reason: invalidReason });
|
||||
continue;
|
||||
|
||||
@@ -342,6 +342,74 @@ describe("session-compaction-checkpoints", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
test("async fork skips JSON-valid garbage transcript entries", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-garbage-fork-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
const sourceFile = path.join(dir, "garbage.jsonl");
|
||||
const firstMessage = {
|
||||
type: "message",
|
||||
id: "first",
|
||||
parentId: null,
|
||||
message: {
|
||||
role: "user",
|
||||
content: "first",
|
||||
timestamp: 1,
|
||||
},
|
||||
};
|
||||
const secondMessage = {
|
||||
type: "message",
|
||||
id: "second",
|
||||
parentId: "first",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: "second",
|
||||
api: "responses",
|
||||
provider: "openai",
|
||||
model: "gpt-test",
|
||||
timestamp: 2,
|
||||
},
|
||||
};
|
||||
await fs.writeFile(
|
||||
sourceFile,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "session",
|
||||
version: CURRENT_SESSION_VERSION,
|
||||
id: "source-session",
|
||||
timestamp: new Date(0).toISOString(),
|
||||
cwd: dir,
|
||||
}),
|
||||
JSON.stringify(firstMessage),
|
||||
"null",
|
||||
"[]",
|
||||
'"garbage"',
|
||||
JSON.stringify(secondMessage),
|
||||
"{truncated-json",
|
||||
"",
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const forked = await forkCompactionCheckpointTranscriptAsync({
|
||||
sourceFile,
|
||||
sessionDir: dir,
|
||||
});
|
||||
|
||||
if (!forked) {
|
||||
throw new Error("expected forked checkpoint transcript");
|
||||
}
|
||||
const forkedEntries = (await fs.readFile(forked.sessionFile, "utf-8"))
|
||||
.trim()
|
||||
.split(/\r?\n/)
|
||||
.map((line) => JSON.parse(line) as Record<string, unknown>);
|
||||
expect(forkedEntries.map((entry) => entry.type)).toEqual(["session", "message", "message"]);
|
||||
expect(requireRecord(forkedEntries[1]?.message, "first forked message").content).toBe("first");
|
||||
expect(requireRecord(forkedEntries[2]?.message, "second forked message").content).toBe(
|
||||
"second",
|
||||
);
|
||||
});
|
||||
|
||||
test("persist trims old checkpoint metadata and removes trimmed snapshot files", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-trim-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
@@ -155,7 +155,11 @@ async function readTranscriptEntriesForForkAsync(
|
||||
try {
|
||||
for await (const line of streamSessionTranscriptLines(sessionFile)) {
|
||||
try {
|
||||
entries.push(JSON.parse(line) as PiSessionFileEntry);
|
||||
const parsed = JSON.parse(line) as unknown;
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
continue;
|
||||
}
|
||||
entries.push(parsed as PiSessionFileEntry);
|
||||
} catch {
|
||||
// Match pi-coding-agent's loader: malformed JSONL entries are ignored.
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user