Last active May 24, 2024 04:42
Recursively migrate files in CephFS to a new file layout pool
import os
import shutil
import logging
import sys
from concurrent.futures import ThreadPoolExecutor
import threading
import uuid
import xattr
# Root of the tree to migrate ('.' = the current working directory).
start_directory: str = '.'
# Directory (relative to the working directory) that holds temporary copies
# before they are renamed over the originals.
scratch_directory: str = '.scratch'
# Number of worker threads used to process files concurrently.
max_parallel_threads: int = 4
def has_ceph_pool_attr(file_path, pool_value):
    """Return True if file_path's ``ceph.file.layout.pool`` xattr equals pool_value.

    The attribute is read with the ``xattr`` package (which follows symlinks).
    Returns False when the attribute cannot be read, e.g. the file is
    inaccessible or the attribute does not exist.
    """
    try:
        attributes = xattr.xattr(file_path)
        ceph_pool = attributes.get('ceph.file.layout.pool').decode('utf-8')
    except (OSError, KeyError):
        # OSError (a.k.a. IOError) for inaccessible files or a missing attribute;
        # KeyError if the attribute does not exist in a mapping-style lookup.
        return False
    # Exact comparison: a substring test would treat e.g. "data" as already
    # matching a file stored in "data_ssd" and wrongly skip it.
    return ceph_pool == pool_value
class _HardLinkGroup:
    """Shared state for one multiply-linked inode: first-seen path and copy outcome."""

    def __init__(self, primary_path):
        self.primary_path = primary_path  # the name whose data is actually copied
        self.done = threading.Event()     # set once the copy attempt has finished
        self.migrated = False             # True only if that copy succeeded


def _remove_if_present(path):
    """Delete path, ignoring the case where it does not exist."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def _copy_into_place(file_path, scratch_dir, uid, gid):
    """Copy file_path into scratch_dir, restore owner/metadata, then rename it over file_path."""
    # Name the temporary by UUID alone: appending it to a long basename could
    # exceed the filesystem's maximum file-name length.
    scratch_file_path = os.path.join(scratch_dir, uuid.uuid4().hex)
    try:
        shutil.copyfile(file_path, scratch_file_path)
        # chown before copystat: on Linux chown clears setuid/setgid bits, so the
        # mode must be copied afterwards to survive. Doing it on the temporary
        # also means the file never appears in place with the wrong owner.
        os.chown(scratch_file_path, uid, gid)
        shutil.copystat(file_path, scratch_file_path)
        # Atomic rename on the same filesystem; unlike shutil.move it fails
        # (EXDEV) instead of falling back to rewriting the original file.
        os.replace(scratch_file_path, file_path)
    except BaseException:
        # Do not leave orphaned copies behind in the scratch directory.
        _remove_if_present(scratch_file_path)
        raise


def _relink(primary_path, file_path, scratch_dir):
    """Atomically replace file_path with a new hard link to primary_path."""
    scratch_link_path = os.path.join(scratch_dir, uuid.uuid4().hex)
    os.link(primary_path, scratch_link_path)
    try:
        os.replace(scratch_link_path, file_path)
    finally:
        # rename(2) is a silent no-op when both names already refer to the same
        # inode, which would leave the temporary link behind; always clean up.
        _remove_if_present(scratch_link_path)


def process_file(file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock):
    """Rewrite one file through scratch_dir so its data is re-created there.

    The data is copied to a uniquely named temporary in scratch_dir (so the new
    inode takes scratch_dir's layout), ownership and metadata are restored, and
    the copy is atomically renamed over file_path.

    Args:
        file_path: File to migrate.
        scratch_dir: Temporary directory on the same filesystem as file_path.
        uid: Owner to restore on the rewritten file.
        gid: Group to restore on the rewritten file.
        ceph_pool_value: Files whose pool already equals this are skipped.
        hard_links: Dict shared by all workers, keyed by inode number, so every
            name of a multiply-linked inode ends up linked to one migrated copy.
        lock: Lock guarding hard_links.

    Errors are logged rather than raised, so one bad file does not stop the run.
    """
    try:
        # Symlinks hold no file data, so there is nothing to move. Check this
        # first: the xattr lookup and the copy would otherwise follow the link.
        if os.path.islink(file_path):
            logging.debug(f"Skipping symlink: {file_path}")
            return
        if has_ceph_pool_attr(file_path, ceph_pool_value):
            logging.debug(f"Skipping file with specified ceph pool attribute: {file_path}")
            return
        logging.info(f"Processing file: {file_path}")

        stat_info = os.stat(file_path, follow_symlinks=False)
        inode = stat_info.st_ino
        own_group = None
        with lock:
            existing = hard_links.get(inode)
            # Look up regardless of st_nlink: once the first name has been
            # renamed away, the old inode's link count may already have dropped to 1.
            if existing is None and stat_info.st_nlink > 1:
                own_group = hard_links[inode] = _HardLinkGroup(file_path)

        if existing is not None:
            # Another worker owns this inode's copy; wait so we never link to the
            # old inode while that copy is still in flight.
            existing.done.wait()
            if not existing.migrated:
                logging.error(f"Not relinking {file_path}: migration of {existing.primary_path} failed")
                return
            _relink(existing.primary_path, file_path, scratch_dir)
            logging.info(f"Hard link recreated for file: {file_path}")
            return

        # NOTE(review): names of a hard-linked inode that lie outside the walked
        # tree are never seen and keep pointing at the old data.
        try:
            _copy_into_place(file_path, scratch_dir, uid, gid)
            if own_group is not None:
                own_group.migrated = True
        finally:
            if own_group is not None:
                own_group.done.set()
    except Exception as e:
        logging.error(f"Error processing {file_path}: {e}")
def process_files(start_dir, scratch_dir, ceph_pool_value, max_workers=None):
    """Migrate every file under start_dir that is not already in ceph_pool_value.

    Each file is handed to process_file on a thread pool. New copies are created
    in scratch_dir and so take that directory's layout; this function does not
    set any layout itself.

    Args:
        start_dir: Root of the tree to walk (symlinked directories are not followed).
        scratch_dir: Temporary directory; should be on the same filesystem as
            start_dir. Created if missing; removed afterwards only if this call
            created it and it is empty.
        ceph_pool_value: Target pool name; files already in it are skipped.
        max_workers: Thread count; defaults to the module-level max_parallel_threads.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    created_scratch = not os.path.exists(scratch_dir)
    os.makedirs(scratch_dir, exist_ok=True)
    scratch_abs = os.path.abspath(scratch_dir)
    if max_workers is None:
        max_workers = max_parallel_threads

    hard_links = {}
    lock = threading.Lock()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for root, dirs, files in os.walk(start_dir):
            root_abs = os.path.abspath(root)
            # Prune the scratch directory itself so our temporaries are never
            # migrated (a substring test on paths would also skip unrelated files).
            dirs[:] = [d for d in dirs if os.path.join(root_abs, d) != scratch_abs]
            for file in files:
                file_path = os.path.join(root, file)
                try:
                    stat_info = os.stat(file_path, follow_symlinks=False)
                except OSError as e:
                    # The file may have vanished since its directory was listed.
                    logging.error(f"Error processing {file_path}: {e}")
                    continue
                futures.append(executor.submit(
                    process_file, file_path, scratch_dir,
                    stat_info.st_uid, stat_info.st_gid,
                    ceph_pool_value, hard_links, lock,
                ))
        for future in futures:
            # Surface anything the worker did not handle itself.
            future.result()

    if created_scratch:
        try:
            os.rmdir(scratch_dir)
        except OSError as e:
            logging.warning(f"Could not remove scratch directory {scratch_dir}: {e}")
def main(argv=None):
    """Command-line entry point: migrate files under start_directory to a pool.

    Args:
        argv: Full argument vector (program name first); defaults to sys.argv.

    Returns:
        Process exit status: 0 after a run, 1 on a usage error.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) != 2 or not argv[1].strip():
        prog = argv[0] if argv else "script.py"
        # Stop here: otherwise argv[1] would raise IndexError or extra
        # arguments would be silently ignored.
        print(f"Usage: python {prog} <ceph_pool_value>", file=sys.stderr)
        return 1
    process_files(start_directory, scratch_directory, argv[1])
    return 0


if __name__ == "__main__":
    sys.exit(main())
Note: the latest version uses the xattr library, so you need to run `pip install xattr` before using this script.

