Files
openclaw/src/agents/sandbox/fs-bridge-write-helper.ts
2026-03-11 01:15:47 +00:00

110 lines
3.7 KiB
TypeScript

import type { PathSafetyCheck, PinnedSandboxWriteEntry } from "./fs-bridge-path-safety.js";
import type { SandboxFsCommandPlan } from "./fs-bridge-shell-command-plans.js";
export const SANDBOX_PINNED_WRITE_PYTHON = [
"import errno",
"import os",
"import secrets",
"import sys",
"",
"mount_root = sys.argv[1]",
"relative_parent = sys.argv[2]",
"basename = sys.argv[3]",
'mkdir_enabled = sys.argv[4] == "1"',
"",
"DIR_FLAGS = os.O_RDONLY",
"if hasattr(os, 'O_DIRECTORY'):",
" DIR_FLAGS |= os.O_DIRECTORY",
"if hasattr(os, 'O_NOFOLLOW'):",
" DIR_FLAGS |= os.O_NOFOLLOW",
"",
"WRITE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL",
"if hasattr(os, 'O_NOFOLLOW'):",
" WRITE_FLAGS |= os.O_NOFOLLOW",
"",
"def open_dir(path, dir_fd=None):",
" return os.open(path, DIR_FLAGS, dir_fd=dir_fd)",
"",
"def walk_parent(root_fd, rel_parent, mkdir_enabled):",
" current_fd = os.dup(root_fd)",
" try:",
" segments = [segment for segment in rel_parent.split('/') if segment and segment != '.']",
" for segment in segments:",
" if segment == '..':",
" raise OSError(errno.EPERM, 'path traversal is not allowed', segment)",
" try:",
" next_fd = open_dir(segment, dir_fd=current_fd)",
" except FileNotFoundError:",
" if not mkdir_enabled:",
" raise",
" os.mkdir(segment, 0o777, dir_fd=current_fd)",
" next_fd = open_dir(segment, dir_fd=current_fd)",
" os.close(current_fd)",
" current_fd = next_fd",
" return current_fd",
" except Exception:",
" os.close(current_fd)",
" raise",
"",
"def create_temp_file(parent_fd, basename):",
" prefix = '.openclaw-write-' + basename + '.'",
" for _ in range(128):",
" candidate = prefix + secrets.token_hex(6)",
" try:",
" fd = os.open(candidate, WRITE_FLAGS, 0o600, dir_fd=parent_fd)",
" return candidate, fd",
" except FileExistsError:",
" continue",
" raise RuntimeError('failed to allocate sandbox temp file')",
"",
"root_fd = open_dir(mount_root)",
"parent_fd = None",
"temp_fd = None",
"temp_name = None",
"try:",
" parent_fd = walk_parent(root_fd, relative_parent, mkdir_enabled)",
" temp_name, temp_fd = create_temp_file(parent_fd, basename)",
" while True:",
" chunk = sys.stdin.buffer.read(65536)",
" if not chunk:",
" break",
" os.write(temp_fd, chunk)",
" os.fsync(temp_fd)",
" os.close(temp_fd)",
" temp_fd = None",
" os.replace(temp_name, basename, src_dir_fd=parent_fd, dst_dir_fd=parent_fd)",
" os.fsync(parent_fd)",
"except Exception:",
" if temp_fd is not None:",
" os.close(temp_fd)",
" temp_fd = None",
" if temp_name is not None and parent_fd is not None:",
" try:",
" os.unlink(temp_name, dir_fd=parent_fd)",
" except FileNotFoundError:",
" pass",
" raise",
"finally:",
" if parent_fd is not None:",
" os.close(parent_fd)",
" os.close(root_fd)",
].join("\n");
export function buildPinnedWritePlan(params: {
check: PathSafetyCheck;
pinned: PinnedSandboxWriteEntry;
mkdir: boolean;
}): SandboxFsCommandPlan & { stdin?: Buffer | string } {
return {
checks: [params.check],
recheckBeforeCommand: true,
script: ["set -eu", "python3 - \"$@\" <<'PY'", SANDBOX_PINNED_WRITE_PYTHON, "PY"].join("\n"),
args: [
params.pinned.mountRootPath,
params.pinned.relativeParentPath,
params.pinned.basename,
params.mkdir ? "1" : "0",
],
};
}