mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 16:34:07 +00:00
fix: serialize skill proposal lifecycle mutations
This commit is contained in:
@@ -344,98 +344,96 @@ export async function reviseSkillProposal(
|
||||
input: SkillProposalReviseInput,
|
||||
): Promise<SkillProposalReadResult> {
|
||||
const config = resolveSkillWorkshopConfig(input.config);
|
||||
const read = await readRequiredProposal(input.proposalId, input.workspaceDir);
|
||||
const { record } = read;
|
||||
if (record.status !== "pending") {
|
||||
throw new Error(`Only pending proposals can be revised. Current status: ${record.status}.`);
|
||||
}
|
||||
assertInsideWorkspace(input.workspaceDir, record.target.skillFile, "skill file");
|
||||
assertInsideWorkspace(input.workspaceDir, record.target.skillDir, "skill directory");
|
||||
return await withPendingSkillProposalMutation(input, "revised", async (read) => {
|
||||
const { record } = read;
|
||||
assertInsideWorkspace(input.workspaceDir, record.target.skillFile, "skill file");
|
||||
assertInsideWorkspace(input.workspaceDir, record.target.skillDir, "skill directory");
|
||||
|
||||
if (record.kind === "create") {
|
||||
const currentContent = await readWorkspaceSkillFile(record.target.skillFile);
|
||||
if (currentContent !== null) {
|
||||
await markProposalStale(record, "Target skill was created after proposal creation.");
|
||||
throw new Error("Target skill was created after proposal creation; proposal marked stale.");
|
||||
if (record.kind === "create") {
|
||||
const currentContent = await readWorkspaceSkillFile(record.target.skillFile);
|
||||
if (currentContent !== null) {
|
||||
await markProposalStale(record, "Target skill was created after proposal creation.");
|
||||
throw new Error("Target skill was created after proposal creation; proposal marked stale.");
|
||||
}
|
||||
} else {
|
||||
const currentContent = await readWorkspaceSkillFile(record.target.skillFile);
|
||||
if (currentContent === null) {
|
||||
throw new Error(`Target skill is missing: ${record.target.skillFile}`);
|
||||
}
|
||||
if (
|
||||
record.target.currentContentHash &&
|
||||
hashSkillProposalContent(currentContent) !== record.target.currentContentHash
|
||||
) {
|
||||
await markProposalStale(record, "Target skill changed after proposal creation.");
|
||||
throw new Error("Target skill changed after proposal creation; proposal marked stale.");
|
||||
}
|
||||
await assertSupportTargetsUnchanged(record);
|
||||
}
|
||||
} else {
|
||||
const currentContent = await readWorkspaceSkillFile(record.target.skillFile);
|
||||
if (currentContent === null) {
|
||||
throw new Error(`Target skill is missing: ${record.target.skillFile}`);
|
||||
}
|
||||
if (
|
||||
record.target.currentContentHash &&
|
||||
hashSkillProposalContent(currentContent) !== record.target.currentContentHash
|
||||
) {
|
||||
await markProposalStale(record, "Target skill changed after proposal creation.");
|
||||
throw new Error("Target skill changed after proposal creation; proposal marked stale.");
|
||||
}
|
||||
await assertSupportTargetsUnchanged(record);
|
||||
}
|
||||
|
||||
const supportFiles =
|
||||
input.supportFiles === undefined
|
||||
? await readProposalSupportFiles(record)
|
||||
: prepareSkillProposalSupportFiles(input.supportFiles);
|
||||
assertProposalContentWithinLimit(input.content, config.maxSkillBytes);
|
||||
const supportFileMetadata =
|
||||
supportFiles.length > 0
|
||||
? await buildSupportFileMetadata(
|
||||
supportFiles,
|
||||
record.kind === "update" ? record.target.skillDir : undefined,
|
||||
)
|
||||
: [];
|
||||
const nextVersion = nextProposalVersion(record.proposedVersion);
|
||||
const description = normalizeOptionalString(input.description) ?? record.description;
|
||||
assertProposalDescriptionWithinLimit(description);
|
||||
const now = new Date().toISOString();
|
||||
const proposalContent = renderProposalMarkdown({
|
||||
name: record.target.skillKey,
|
||||
description,
|
||||
content: input.content,
|
||||
fallbackFrontmatterContent: read.content,
|
||||
version: nextVersion,
|
||||
date: now,
|
||||
const supportFiles =
|
||||
input.supportFiles === undefined
|
||||
? await readProposalSupportFiles(record)
|
||||
: prepareSkillProposalSupportFiles(input.supportFiles);
|
||||
assertProposalContentWithinLimit(input.content, config.maxSkillBytes);
|
||||
const supportFileMetadata =
|
||||
supportFiles.length > 0
|
||||
? await buildSupportFileMetadata(
|
||||
supportFiles,
|
||||
record.kind === "update" ? record.target.skillDir : undefined,
|
||||
)
|
||||
: [];
|
||||
const nextVersion = nextProposalVersion(record.proposedVersion);
|
||||
const description = normalizeOptionalString(input.description) ?? record.description;
|
||||
assertProposalDescriptionWithinLimit(description);
|
||||
const now = new Date().toISOString();
|
||||
const proposalContent = renderProposalMarkdown({
|
||||
name: record.target.skillKey,
|
||||
description,
|
||||
content: input.content,
|
||||
fallbackFrontmatterContent: read.content,
|
||||
version: nextVersion,
|
||||
date: now,
|
||||
});
|
||||
const goal =
|
||||
input.goal === undefined
|
||||
? normalizeOptionalString(record.goal)
|
||||
: normalizeOptionalString(input.goal);
|
||||
const evidence =
|
||||
input.evidence === undefined
|
||||
? normalizeOptionalString(record.evidence)
|
||||
: normalizeOptionalString(input.evidence);
|
||||
const previousSupportFiles = record.supportFiles;
|
||||
const revised: SkillProposalRecord = {
|
||||
...record,
|
||||
description,
|
||||
updatedAt: now,
|
||||
proposedVersion: nextVersion,
|
||||
draftHash: hashSkillProposalContent(proposalContent),
|
||||
scan: scanProposalBundle(proposalContent, supportFiles),
|
||||
};
|
||||
if (supportFiles.length > 0) {
|
||||
revised.supportFiles = supportFileMetadata;
|
||||
} else {
|
||||
delete revised.supportFiles;
|
||||
}
|
||||
if (goal) {
|
||||
revised.goal = goal;
|
||||
} else {
|
||||
delete revised.goal;
|
||||
}
|
||||
if (evidence) {
|
||||
revised.evidence = evidence;
|
||||
} else {
|
||||
delete revised.evidence;
|
||||
}
|
||||
await replaceSkillProposalDraft({
|
||||
record: revised,
|
||||
previousSupportFiles,
|
||||
content: proposalContent,
|
||||
supportFiles,
|
||||
});
|
||||
return { record: revised, content: proposalContent };
|
||||
});
|
||||
const goal =
|
||||
input.goal === undefined
|
||||
? normalizeOptionalString(record.goal)
|
||||
: normalizeOptionalString(input.goal);
|
||||
const evidence =
|
||||
input.evidence === undefined
|
||||
? normalizeOptionalString(record.evidence)
|
||||
: normalizeOptionalString(input.evidence);
|
||||
const previousSupportFiles = record.supportFiles;
|
||||
const revised: SkillProposalRecord = {
|
||||
...record,
|
||||
description,
|
||||
updatedAt: now,
|
||||
proposedVersion: nextVersion,
|
||||
draftHash: hashSkillProposalContent(proposalContent),
|
||||
scan: scanProposalBundle(proposalContent, supportFiles),
|
||||
};
|
||||
if (supportFiles.length > 0) {
|
||||
revised.supportFiles = supportFileMetadata;
|
||||
} else {
|
||||
delete revised.supportFiles;
|
||||
}
|
||||
if (goal) {
|
||||
revised.goal = goal;
|
||||
} else {
|
||||
delete revised.goal;
|
||||
}
|
||||
if (evidence) {
|
||||
revised.evidence = evidence;
|
||||
} else {
|
||||
delete revised.evidence;
|
||||
}
|
||||
await replaceSkillProposalDraft({
|
||||
record: revised,
|
||||
previousSupportFiles,
|
||||
content: proposalContent,
|
||||
supportFiles,
|
||||
});
|
||||
return { record: revised, content: proposalContent };
|
||||
}
|
||||
|
||||
export async function rejectSkillProposal(
|
||||
@@ -447,38 +445,29 @@ export async function rejectSkillProposal(
|
||||
export async function quarantineSkillProposal(
|
||||
input: SkillProposalActionInput,
|
||||
): Promise<SkillProposalRecord> {
|
||||
const read = await readRequiredProposal(input.proposalId, input.workspaceDir);
|
||||
if (read.record.status !== "pending") {
|
||||
throw new Error(
|
||||
`Only pending proposals can be quarantined. Current status: ${read.record.status}.`,
|
||||
);
|
||||
}
|
||||
const now = new Date().toISOString();
|
||||
const record: SkillProposalRecord = {
|
||||
...read.record,
|
||||
status: "quarantined",
|
||||
updatedAt: now,
|
||||
quarantinedAt: now,
|
||||
statusReason: normalizeOptionalString(input.reason),
|
||||
scan: {
|
||||
...read.record.scan,
|
||||
state: "quarantined",
|
||||
},
|
||||
};
|
||||
await updateSkillProposalRecord({ record });
|
||||
return record;
|
||||
return await withPendingSkillProposalMutation(input, "quarantined", async (read) => {
|
||||
const now = new Date().toISOString();
|
||||
const record: SkillProposalRecord = {
|
||||
...read.record,
|
||||
status: "quarantined",
|
||||
updatedAt: now,
|
||||
quarantinedAt: now,
|
||||
statusReason: normalizeOptionalString(input.reason),
|
||||
scan: {
|
||||
...read.record.scan,
|
||||
state: "quarantined",
|
||||
},
|
||||
};
|
||||
await updateSkillProposalRecord({ record });
|
||||
return record;
|
||||
});
|
||||
}
|
||||
|
||||
export async function applySkillProposal(
|
||||
input: SkillProposalActionInput,
|
||||
): Promise<SkillProposalApplyResult> {
|
||||
const initial = await readRequiredProposal(input.proposalId, input.workspaceDir);
|
||||
return await withSkillProposalTargetLock(initial.record, async () => {
|
||||
const read = await readRequiredProposal(input.proposalId, input.workspaceDir);
|
||||
return await withPendingSkillProposalMutation(input, "applied", async (read) => {
|
||||
const { record, content } = read;
|
||||
if (record.status !== "pending") {
|
||||
throw new Error(`Only pending proposals can be applied. Current status: ${record.status}.`);
|
||||
}
|
||||
const draftHash = hashSkillProposalContent(content);
|
||||
if (draftHash !== record.draftHash) {
|
||||
throw new Error("Proposal draft changed without updating proposal metadata.");
|
||||
@@ -785,22 +774,35 @@ async function markProposal(
|
||||
input: SkillProposalActionInput,
|
||||
status: "rejected",
|
||||
): Promise<SkillProposalRecord> {
|
||||
const read = await readRequiredProposal(input.proposalId, input.workspaceDir);
|
||||
if (read.record.status !== "pending") {
|
||||
throw new Error(
|
||||
`Only pending proposals can be rejected. Current status: ${read.record.status}.`,
|
||||
);
|
||||
}
|
||||
const now = new Date().toISOString();
|
||||
const record: SkillProposalRecord = {
|
||||
...read.record,
|
||||
status,
|
||||
updatedAt: now,
|
||||
rejectedAt: now,
|
||||
statusReason: normalizeOptionalString(input.reason),
|
||||
};
|
||||
await updateSkillProposalRecord({ record });
|
||||
return record;
|
||||
return await withPendingSkillProposalMutation(input, status, async (read) => {
|
||||
const now = new Date().toISOString();
|
||||
const record: SkillProposalRecord = {
|
||||
...read.record,
|
||||
status,
|
||||
updatedAt: now,
|
||||
rejectedAt: now,
|
||||
statusReason: normalizeOptionalString(input.reason),
|
||||
};
|
||||
await updateSkillProposalRecord({ record });
|
||||
return record;
|
||||
});
|
||||
}
|
||||
|
||||
async function withPendingSkillProposalMutation<T>(
|
||||
input: Pick<SkillProposalActionInput, "proposalId" | "workspaceDir">,
|
||||
action: "applied" | "quarantined" | "rejected" | "revised",
|
||||
fn: (read: SkillProposalReadResult) => Promise<T>,
|
||||
): Promise<T> {
|
||||
const initial = await readRequiredProposal(input.proposalId, input.workspaceDir);
|
||||
return await withSkillProposalTargetLock(initial.record, async () => {
|
||||
const read = await readRequiredProposal(input.proposalId, input.workspaceDir);
|
||||
if (read.record.status !== "pending") {
|
||||
throw new Error(
|
||||
`Only pending proposals can be ${action}. Current status: ${read.record.status}.`,
|
||||
);
|
||||
}
|
||||
return await fn(read);
|
||||
});
|
||||
}
|
||||
|
||||
async function assertSupportTargetUnchanged(params: {
|
||||
|
||||
@@ -51,6 +51,7 @@ const SKILL_WORKSHOP_LOCK_OPTIONS: FileLockOptions = {
|
||||
},
|
||||
stale: 60_000,
|
||||
};
|
||||
const skillWorkshopProcessLocks = new Map<string, Promise<void>>();
|
||||
|
||||
type SkillWorkshopStoreOptions = {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
@@ -332,8 +333,7 @@ export async function withSkillProposalTargetLock<T>(
|
||||
TARGET_LOCKS_REL_DIR,
|
||||
`${hashSkillProposalContent(record.target.skillFile)}.target`,
|
||||
);
|
||||
await fs.mkdir(path.dirname(lockFile), { recursive: true });
|
||||
return await withFileLock(lockFile, SKILL_WORKSHOP_LOCK_OPTIONS, fn);
|
||||
return await withSkillWorkshopLock(lockFile, fn);
|
||||
}
|
||||
|
||||
export async function writeSkillProposalRollback(params: {
|
||||
@@ -404,8 +404,29 @@ async function withSkillProposalManifestLock<T>(
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const lockFile = path.join(resolveSkillWorkshopStateDir(options), MANIFEST_LOCK_REL_PATH);
|
||||
return await withSkillWorkshopLock(lockFile, fn);
|
||||
}
|
||||
|
||||
async function withSkillWorkshopLock<T>(lockFile: string, fn: () => Promise<T>): Promise<T> {
|
||||
const lockKey = path.resolve(lockFile);
|
||||
const previous = skillWorkshopProcessLocks.get(lockKey) ?? Promise.resolve();
|
||||
let releaseQueued!: () => void;
|
||||
const current = new Promise<void>((resolve) => {
|
||||
releaseQueued = resolve;
|
||||
});
|
||||
const previousDone = previous.catch(() => undefined);
|
||||
const queued = previousDone.then(() => current);
|
||||
skillWorkshopProcessLocks.set(lockKey, queued);
|
||||
await previousDone;
|
||||
await fs.mkdir(path.dirname(lockFile), { recursive: true });
|
||||
return await withFileLock(lockFile, SKILL_WORKSHOP_LOCK_OPTIONS, fn);
|
||||
try {
|
||||
return await withFileLock(lockFile, SKILL_WORKSHOP_LOCK_OPTIONS, fn);
|
||||
} finally {
|
||||
releaseQueued();
|
||||
if (skillWorkshopProcessLocks.get(lockKey) === queued) {
|
||||
skillWorkshopProcessLocks.delete(lockKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function readWorkspaceSkillFile(filePath: string): Promise<string | null> {
|
||||
|
||||
Reference in New Issue
Block a user