Skip to content

Instantly share code, notes, and snippets.

@ervwalter
Last active June 25, 2024 07:57
Show Gist options
  • Save ervwalter/5ff6632c930c27a1eb6b07c986d7439b to your computer and use it in GitHub Desktop.
Save ervwalter/5ff6632c930c27a1eb6b07c986d7439b to your computer and use it in GitHub Desktop.
Migrate files in CephFS to a new file layout pool recursively
import os
import shutil
import logging
import sys
from concurrent.futures import ThreadPoolExecutor
import threading
import uuid
import xattr
# Root of the tree to walk; passed to process_files() as start_dir.
start_directory = '.' # current directory
# Temporary directory passed to process_files() as scratch_dir; it is created
# if missing and removed again at the end of the run.
scratch_directory = '.scratch'
# Worker count for the ThreadPoolExecutor used by process_files().
max_parallel_threads = 4
def has_ceph_pool_attr(file_path, pool_value):
    """Return True if the file's ``ceph.file.layout.pool`` xattr equals ``pool_value``.

    Args:
        file_path: Path of the file to inspect.
        pool_value: Name of the pool the file is expected to be in.

    Returns:
        True when the attribute can be read and names exactly ``pool_value``;
        False when it names a different pool, is missing, or the file cannot
        be accessed.
    """
    try:
        raw_pool = xattr.xattr(file_path).get('ceph.file.layout.pool')
    except (IOError, KeyError):
        # IOError for inaccessible files, KeyError if the attribute does not exist
        return False
    if raw_pool is None:
        return False
    # Defensive: drop a trailing NUL terminator if the raw value carries one.
    ceph_pool = raw_pool.decode('utf-8').rstrip('\x00')
    # Exact comparison on purpose: a substring test would treat pool "data" as
    # already satisfied by a file that lives in "data_ec" and skip it.
    return ceph_pool == pool_value
def _rewrite_entry(file_path, scratch_file_path, uid, gid, mode):
    """Replace one symlink or regular file with a freshly created copy of itself."""
    import errno  # local import: only needed for the cross-device fallback below

    if os.path.islink(file_path):
        link_target = os.readlink(file_path)
        os.unlink(file_path)
        os.symlink(link_target, file_path)
        os.lchown(file_path, uid, gid)
        return

    # copy2 copies data and metadata (mode, timestamps), so no extra copystat.
    shutil.copy2(file_path, scratch_file_path)
    # Fix ownership on the scratch copy *before* it becomes visible under the
    # real name; chown may clear set-user-ID/set-group-ID bits, so the
    # permission bits are re-applied afterwards.
    os.chown(scratch_file_path, uid, gid)
    os.chmod(scratch_file_path, mode)
    try:
        # Atomic swap: the original is never missing, so a failure here cannot
        # leave the only copy of the data in the (later deleted) scratch dir.
        os.replace(scratch_file_path, file_path)
    except OSError as exc:
        if exc.errno != errno.EXDEV:
            raise
        # Scratch directory is on another filesystem, so a rename is not
        # possible; fall back to the delete-then-move sequence.
        os.remove(file_path)
        shutil.move(scratch_file_path, file_path)
        os.chown(file_path, uid, gid)
        os.chmod(file_path, mode)


def process_file(file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock):
    """Rewrite a single file so that its data lives in a newly created file.

    Intended to run in a worker thread. Files whose ``ceph.file.layout.pool``
    attribute already matches ``ceph_pool_value`` are skipped. Every other
    file is copied to a uniquely named file in ``scratch_dir`` and then swapped
    over the original; symlinks are re-created in place.

    Hard links are preserved: the first path seen for a multiply-linked inode
    is rewritten, and every later path to the same inode is replaced by a hard
    link to that rewritten file.

    Args:
        file_path: Path of the file to process.
        scratch_dir: Existing directory used for temporary copies.
        uid: Owner to apply to the rewritten file.
        gid: Group to apply to the rewritten file.
        ceph_pool_value: Pool name that marks a file as already migrated.
        hard_links: Dict shared between workers, mapping an inode identity to
            the path that has already been rewritten for it.
        lock: Lock guarding ``hard_links``.

    Returns:
        None. Any error is logged and swallowed so one bad file does not stop
        the run.
    """
    try:
        if has_ceph_pool_attr(file_path, ceph_pool_value):
            logging.debug(f"Skipping file with specified ceph pool attribute: {file_path}")
            return

        logging.info(f"Processing file: {file_path}")

        # Generate a unique identifier and append it to the filename so that
        # equally named files from different directories cannot collide.
        unique_suffix = uuid.uuid4().hex
        scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
        scratch_file_path = os.path.join(scratch_dir, scratch_file_name)

        stat_info = os.stat(file_path, follow_symlinks=False)
        mode = stat_info.st_mode & 0o7777

        if stat_info.st_nlink <= 1:
            _rewrite_entry(file_path, scratch_file_path, uid, gid, mode)
            return

        inode_key = (stat_info.st_dev, stat_info.st_ino)
        # The lock is held for the whole operation so that another worker can
        # never link to a path that is still in the middle of being rewritten.
        with lock:
            migrated_path = hard_links.get(inode_key)
            if migrated_path is None:
                _rewrite_entry(file_path, scratch_file_path, uid, gid, mode)
                # Register only after success, so a failed rewrite is retried
                # by the next path that refers to the same inode.
                hard_links[inode_key] = file_path
            else:
                # os.link() refuses to overwrite an existing name, so create
                # the link under the scratch name and rename it into place.
                os.link(migrated_path, scratch_file_path)
                os.replace(scratch_file_path, file_path)
                logging.info(f"Hard link recreated for file: {file_path}")
    except Exception as e:
        logging.error(f"Error processing {file_path}: {e}")
def process_files(start_dir, scratch_dir, ceph_pool_value):
    """Walk ``start_dir`` and hand every file to ``process_file`` on a thread pool.

    Args:
        start_dir: Root directory of the tree to process.
        scratch_dir: Directory for temporary copies; created if missing and
            removed when the run ends, whether or not it succeeded.
        ceph_pool_value: Pool name forwarded to ``process_file`` to decide
            which files are skipped.

    Returns:
        None.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    os.makedirs(scratch_dir, exist_ok=True)
    scratch_real = os.path.realpath(scratch_dir)
    scratch_name = os.path.basename(scratch_real)

    def is_scratch(parent, name):
        # Cheap name comparison first; resolve the path only for candidates.
        return (name == scratch_name
                and os.path.realpath(os.path.join(parent, name)) == scratch_real)

    hard_links = {}
    lock = threading.Lock()

    try:
        with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor:
            futures = []
            for root, dirs, files in os.walk(start_dir):
                # Prune the scratch directory itself instead of substring
                # matching on paths, which also skipped unrelated files such
                # as "notes.scratch.txt". In-place assignment is required for
                # os.walk to honour the pruning; sorting keeps the order stable.
                dirs[:] = sorted(d for d in dirs if not is_scratch(root, d))
                files.sort()
                for file in files:
                    file_path = os.path.join(root, file)
                    try:
                        stat_info = os.stat(file_path, follow_symlinks=False)
                    except OSError as e:
                        # The file may have vanished since the directory was
                        # listed; skip it rather than abort the whole walk.
                        logging.error(f"Error processing {file_path}: {e}")
                        continue
                    uid = stat_info.st_uid
                    gid = stat_info.st_gid
                    future = executor.submit(process_file, file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock)
                    futures.append(future)

            for future in futures:
                future.result()
    finally:
        # Leaving the executor block has already waited for all workers, so
        # nothing is writing into the scratch directory any more.
        if os.path.exists(scratch_dir):
            shutil.rmtree(scratch_dir)
if __name__ == "__main__":
    # Exactly one positional argument is expected: the target pool value.
    if len(sys.argv) == 2:
        ceph_pool_value = sys.argv[1]
        process_files(start_directory, scratch_directory, ceph_pool_value)
    else:
        print("Usage: python script.py <ceph_pool_value>")
        sys.exit(1)
@ervwalter
Copy link
Author

Note: the latest version uses the xattr library, so you need to run `pip install xattr` before you can use this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment