@ervwalter
Last active September 3, 2024 17:57
Migrate files in CephFS to a new file layout pool recursively
import os
import shutil
import logging
import sys
from concurrent.futures import ThreadPoolExecutor
import threading
import uuid
import xattr
from pathlib import Path

start_directory = '.'  # current directory
scratch_directory = '.scratch'
max_parallel_threads = 4

def has_ceph_pool_attr(file_path, pool_value):
    """ Check if the file has the specified ceph pool attribute value using xattr. """
    try:
        attributes = xattr.xattr(file_path)
        ceph_pool = attributes.get('ceph.file.layout.pool').decode('utf-8')
        return pool_value in ceph_pool
    except (IOError, KeyError):
        # IOError for inaccessible files, KeyError if the attribute does not exist
        return False

def process_file(file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock):
    """ Process each file in a separate thread, appending a unique identifier to filenames to avoid overwrites. """
    try:
        if has_ceph_pool_attr(file_path, ceph_pool_value):
            logging.debug(f"Skipping file with specified ceph pool attribute: {file_path}")
            return

        logging.info(f"Processing file: {file_path}")

        # after replacing file, parent folder atime and mtime are modified
        # keep them to replace them
        parent_path = Path(file_path).parent.absolute()
        parent_stat_info = os.stat(parent_path, follow_symlinks=False)
        parent_mtime = parent_stat_info.st_mtime
        parent_atime = parent_stat_info.st_atime

        # Generate a unique identifier and append it to the filename
        unique_suffix = uuid.uuid4().hex
        scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
        scratch_file_path = os.path.join(scratch_dir, scratch_file_name)

        stat_info = os.stat(file_path, follow_symlinks=False)
        inode = stat_info.st_ino
        nlink = stat_info.st_nlink

        if nlink > 1 or inode in hard_links:
            with lock:
                if inode in hard_links:
                    os.remove(file_path)
                    os.link(hard_links[inode], file_path)
                    logging.info(f"Hard link recreated for file: {file_path}")
                    return
                else:
                    logging.info(f"Hard link added to list for file: {file_path}")
                    hard_links[inode] = file_path

        if os.path.islink(file_path):
            link_target = os.readlink(file_path)
            os.unlink(file_path)
            os.symlink(link_target, file_path)
            os.lchown(file_path, uid, gid)
        else:
            shutil.copy2(file_path, scratch_file_path)
            shutil.copystat(file_path, scratch_file_path)
            os.remove(file_path)
            shutil.move(scratch_file_path, file_path)
            os.chown(file_path, uid, gid)

        # update parent atime and mtime
        os.utime(parent_path, (parent_atime, parent_mtime))

    except Exception as e:
        logging.error(f"Error processing {file_path}: {e}")

def process_files(start_dir, scratch_dir, ceph_pool_value):
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    if not os.path.exists(scratch_dir):
        os.makedirs(scratch_dir)

    hard_links = {}
    lock = threading.Lock()

    with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor:
        futures = []
        for root, dirs, files in os.walk(start_dir):
            dirs.sort()
            files.sort()
            for file in files:
                file_path = os.path.join(root, file)
                if scratch_dir in file_path:
                    continue
                stat_info = os.stat(file_path, follow_symlinks=False)
                uid = stat_info.st_uid
                gid = stat_info.st_gid
                future = executor.submit(process_file, file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock)
                futures.append(future)
        for future in futures:
            future.result()

    if os.path.exists(scratch_dir):
        shutil.rmtree(scratch_dir)

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python script.py <ceph_pool_value>")
        sys.exit(1)
    ceph_pool_value = sys.argv[1]
    process_files(start_directory, scratch_directory, ceph_pool_value)
@ervwalter
Author

Note: the latest version uses the xattr library, so you need to run pip install xattr before you can use this.
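
For reference, here is a minimal standalone sketch of the same attribute read that has_ceph_pool_attr performs (the file path is hypothetical; this is just to show what the library call returns on a CephFS mount):

import xattr

# Read the CephFS layout pool of a single file. Like the script, this
# relies on IOError/KeyError being raised when the attribute is missing
# or the file is inaccessible.
attrs = xattr.xattr('some/file/on/cephfs')  # hypothetical path
pool = attrs.get('ceph.file.layout.pool').decode('utf-8')
print(pool)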

@PopiBrossard

Hello,

Thanks for the amazing script!

I've just noticed that, with this script, the parent folders' atime and mtime are modified.

Because I'm not sure it's intended (personally, I want this operation to be as transparent as possible to the end user), I've patched the script (see below).
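
To make the issue concrete, here is a small standalone sketch (the paths are made up, nothing CephFS-specific is assumed) showing that replacing an entry bumps the parent directory's mtime, and that capturing and restoring it with os.utime hides the change, which is what the patch below does:

import os
import time

parent = '/tmp/atime_demo'           # hypothetical scratch directory
os.makedirs(parent, exist_ok=True)
target = os.path.join(parent, 'file.txt')
with open(target, 'w') as f:
    f.write('old contents')

# capture the parent's times before touching its entries
st = os.stat(parent)
parent_atime, parent_mtime = st.st_atime, st.st_mtime

time.sleep(1)
os.remove(target)                    # replacing the file...
with open(target, 'w') as f:
    f.write('new contents')
print(os.stat(parent).st_mtime)      # newer than parent_mtime: the directory changed

os.utime(parent, (parent_atime, parent_mtime))
print(os.stat(parent).st_mtime)      # back to the captured value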

I've also found 2 bugs in the "hard link" part of the script:

  • The file is not removed before re-creating the hard link. This causes an error: ERROR - Error processing ./3/a: [Errno 17] File exists: './1/c' -> './3/a'. The file needs to be removed first.
  • Because the script destroys one hard link at a time (possibly; I think under some specific race conditions this error doesn't occur), the last hard link ends up with nlink = 1. The script should also check whether the file's inode is already referenced in the hard_links dict.

For the second point, let's take a little example:

  1. 1/a and 2/b are hard links; they share the same inode, 123456789. For each one, nlink = 2.
  2. We launch the script (with only one thread). It first reads 1/a. Because nlink > 1, the script enters the if branch and adds it to the dict: hard_links[inode] = file_path.
  3. 1/a is then treated as a regular file (the file is copied to .scratch, then this copy replaces 1/a). Because 1/a is now a new file, its inode changes to 987654321.
  4. The script then takes care of 2/b. Now nlink = 1 (it's the only file left with inode 123456789), so we never enter the special hard-link if (a standalone reproduction follows right after this list).
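
The same behaviour can be reproduced outside the script with a few lines (temporary files only, nothing CephFS-specific is assumed):

import os
import shutil
import tempfile
import uuid

d = tempfile.mkdtemp()
a = os.path.join(d, 'a')
b = os.path.join(d, 'b')
with open(a, 'w') as f:
    f.write('data')
os.link(a, b)                              # a and b share one inode
print(os.stat(b).st_nlink)                 # -> 2

# replace a the same way the script does: copy aside, remove, move back
scratch = os.path.join(d, f"a_{uuid.uuid4().hex}")
shutil.copy2(a, scratch)
os.remove(a)
shutil.move(scratch, a)                    # a now points at a brand-new inode

print(os.stat(b).st_nlink)                 # -> 1, so a check on nlink > 1 alone would miss b
shutil.rmtree(d)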

I'm proposing the following patch; feel free to add it if you want!

--- /tmp/migrate.py.old	2024-08-23 14:47:47.920283815 +0200
+++ /tmp/migrate.py	2024-08-23 14:47:34.656527176 +0200
@@ -7,6 +7,7 @@
 import threading
 import uuid
 import xattr
+from pathlib import Path
 
 start_directory = '.'  # current directory
 scratch_directory = '.scratch'
@@ -31,6 +32,13 @@
 
         logging.info(f"Processing file: {file_path}")
 
+        # after replacing file, parent folder atime and mtime are modified
+        # keep them to replace them
+        parent_path = Path(file_path).parent.absolute()
+        parent_stat_info = os.stat(parent_path, follow_symlinks=False)
+        parent_mtime = parent_stat_info.st_mtime
+        parent_atime = parent_stat_info.st_atime
+
         # Generate a unique identifier and append it to the filename
         unique_suffix = uuid.uuid4().hex
         scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
@@ -40,13 +48,15 @@
         inode = stat_info.st_ino
         nlink = stat_info.st_nlink
 
-        if nlink > 1:
+        if nlink > 1 or inode in hard_links:
             with lock:
                 if inode in hard_links:
+                    os.remove(file_path)
                     os.link(hard_links[inode], file_path)
                     logging.info(f"Hard link recreated for file: {file_path}")
                     return
                 else:
+                    logging.info(f"Hard link added to list for file: {file_path}")
                     hard_links[inode] = file_path
 
         if os.path.islink(file_path):
@@ -60,6 +70,9 @@
             os.remove(file_path)
             shutil.move(scratch_file_path, file_path)
             os.chown(file_path, uid, gid)
+        
+        # update parent atime and mtime
+        os.utime(parent_path, (parent_atime, parent_mtime))
 
     except Exception as e:
         logging.error(f"Error processing {file_path}: {e}")

With this patch, it now works as intended.

Regards

@ervwalter
Author

I'm proposing the following patch; feel free to add it if you want!

Brilliant fixes. I incorporated them in my version. Thanks!
