Skip to content

Instantly share code, notes, and snippets.

@ervwalter
Last active November 13, 2024 20:25
Show Gist options
  • Save ervwalter/5ff6632c930c27a1eb6b07c986d7439b to your computer and use it in GitHub Desktop.
Save ervwalter/5ff6632c930c27a1eb6b07c986d7439b to your computer and use it in GitHub Desktop.
Migrate files in cephfs to a new file layout pool recursivly
import os
import shutil
import logging
import sys
from concurrent.futures import ThreadPoolExecutor
import threading
import uuid
import xattr
from pathlib import Path
start_directory = '.' # current directory
scratch_directory = '.scratch'
max_parallel_threads = 4
def has_ceph_pool_attr(file_path, pool_value):
""" Check if the file has the specified ceph pool attribute value using xattr. """
try:
attributes = xattr.xattr(file_path)
ceph_pool = attributes.get('ceph.file.layout.pool').decode('utf-8')
return ceph_pool == pool_value
except (IOError, KeyError):
# IOError for inaccessible files, KeyError if the attribute does not exist
return False
def process_file(file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock):
""" Process each file in a separate thread, appending a unique identifier to filenames to avoid overwrites. """
try:
if has_ceph_pool_attr(file_path, ceph_pool_value):
logging.debug(f"Skipping file with specified ceph pool attribute: {file_path}")
return
logging.info(f"Processing file: {file_path}")
# after replacing file, parent folder atime and mtime are modified
# keep them to replace them
parent_path = Path(file_path).parent.absolute()
parent_stat_info = os.stat(parent_path, follow_symlinks=False)
parent_mtime = parent_stat_info.st_mtime
parent_atime = parent_stat_info.st_atime
# Generate a unique identifier and append it to the filename
unique_suffix = uuid.uuid4().hex
scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
scratch_file_path = os.path.join(scratch_dir, scratch_file_name)
stat_info = os.stat(file_path, follow_symlinks=False)
inode = stat_info.st_ino
nlink = stat_info.st_nlink
if nlink > 1 or inode in hard_links:
with lock:
if inode in hard_links:
os.remove(file_path)
os.link(hard_links[inode], file_path)
logging.info(f"Hard link recreated for file: {file_path}")
return
else:
logging.info(f"Hard link added to list for file: {file_path}")
hard_links[inode] = file_path
if os.path.islink(file_path):
link_target = os.readlink(file_path)
os.unlink(file_path)
os.symlink(link_target, file_path)
os.lchown(file_path, uid, gid)
else:
shutil.copy2(file_path, scratch_file_path)
shutil.copystat(file_path, scratch_file_path)
os.remove(file_path)
shutil.move(scratch_file_path, file_path)
os.chown(file_path, uid, gid)
# update parent atime and mtime
os.utime(parent_path, (parent_atime, parent_mtime))
except Exception as e:
logging.error(f"Error processing {file_path}: {e}")
def handler(future):
future.result()
return
def process_files(start_dir, scratch_dir, ceph_pool_value):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
if not os.path.exists(scratch_dir):
os.makedirs(scratch_dir)
hard_links = {}
lock = threading.Lock()
with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor:
for root, dirs, files in os.walk(start_dir):
dirs.sort()
files.sort()
for file in files:
file_path = os.path.join(root, file)
if scratch_dir in file_path:
continue
stat_info = os.stat(file_path, follow_symlinks=False)
uid = stat_info.st_uid
gid = stat_info.st_gid
future = executor.submit(process_file, file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock)
future.add_done_callback(handler)
if os.path.exists(scratch_dir):
shutil.rmtree(scratch_dir)
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python script.py <ceph_pool_value>")
sys.exit(1)
ceph_pool_value = sys.argv[1]
process_files(start_directory, scratch_directory, ceph_pool_value)
@ervwalter
Copy link
Author

Note, the latest version uses the xattr library, so you need to pip install xattr before you can use this.

@PopiBrossard
Copy link

Hello,

Thanks for the amazing script !

I've just noticed that, with this script, the parent folders atime and mtime are modified.

Because I'm not sure its intended (personally, I want this operation to be as much transparent as possible to the final user), I've patched the script (see below).

I've also found 2 bugs in the "hard link" part of the script:

  • The file is not removed before re-creating the hard link. It creates an error ERROR - Error processing ./3/a: [Errno 17] File exists: './1/c' -> './3/a'. It needs first to remove the file.
  • Because the script is (possibly, I think in some specific race conditions this error doesn't occurs) destroying one hard link at a time, the last "hard link" got nlink = 1. The script should also check if the file is referenced in the hard_links[] list.

For the second point, let's take a little example:

  1. 1/a and 2/b are hard links, they have the same 123456789 inode. For each one, nlink = 2.
  2. We launch the script (with only one thread). It first reads 1/a. Because nlink > 1 the script enter the if, and add it to the list hard_links[inode] = file_path.
  3. 1/a is then treated as a regular file (the file is copied to .scratch, then this copy replaces 1/a). Because 1/a is a new file, the inode changed to 987654321.
  4. The script now take care of 2/b. Now nlink = 1 (it's the only file with inode=123456789), so we won't enter the special hard_link if.

I'm proposing the following patch, feel free to add it if you want !

--- /tmp/migrate.py.old	2024-08-23 14:47:47.920283815 +0200
+++ /tmp/migrate.py	2024-08-23 14:47:34.656527176 +0200
@@ -7,6 +7,7 @@
 import threading
 import uuid
 import xattr
+from pathlib import Path
 
 start_directory = '.'  # current directory
 scratch_directory = '.scratch'
@@ -31,6 +32,13 @@
 
         logging.info(f"Processing file: {file_path}")
 
+        # after replacing file, parent folder atime and mtime are modified
+        # keep them to replace them
+        parent_path = Path(file_path).parent.absolute()
+        parent_stat_info = os.stat(parent_path, follow_symlinks=False)
+        parent_mtime = parent_stat_info.st_mtime
+        parent_atime = parent_stat_info.st_atime
+
         # Generate a unique identifier and append it to the filename
         unique_suffix = uuid.uuid4().hex
         scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
@@ -40,13 +48,15 @@
         inode = stat_info.st_ino
         nlink = stat_info.st_nlink
 
-        if nlink > 1:
+        if nlink > 1 or inode in hard_links:
             with lock:
                 if inode in hard_links:
+                    os.remove(file_path)
                     os.link(hard_links[inode], file_path)
                     logging.info(f"Hard link recreated for file: {file_path}")
                     return
                 else:
+                    logging.info(f"Hard link added to list for file: {file_path}")
                     hard_links[inode] = file_path
 
         if os.path.islink(file_path):
@@ -60,6 +70,9 @@
             os.remove(file_path)
             shutil.move(scratch_file_path, file_path)
             os.chown(file_path, uid, gid)
+        
+        # update parent atime and mtime
+        os.utime(parent_path, (parent_atime, parent_mtime))
 
     except Exception as e:
         logging.error(f"Error processing {file_path}: {e}")

For now on, this is working as intended.

Regards

@ervwalter
Copy link
Author

I'm proposing the following patch, feel free to add it if you want !

Brilliant fixes. I incorporated them in my version. Thanks!

@solsys
Copy link

solsys commented Sep 29, 2024

Migrated 160TB of data between two pools. Took the better part of 4 days on my setup, but ran without a hitch. Thanks!

@PopiBrossard
Copy link

PopiBrossard commented Oct 22, 2024

Hi,
After using this script on big folders (200-300TB), I saw that the memory consumption was really high, could lead to activate the OOM killer (I'm talking about easily 40-50GB of RAM usage, or even more).

After digging this, I found out that the futures = [] table was really big (it's final size will be the "total number of files x size of object Future"). Near the end of a run, on a big folder, we talk about millions and millions of objects stored in this array.

When using the with syntax (I've tested this to make sure it was true), you won't exit the with before every thread as finished it's work. It means you don't really need to store all the future objects to call future.result() on them to make sure all the files have been processed.

If you have the same memory issue, you should try using the following patch:

--- migrate.py.old	2024-10-22 09:33:06.920808260 +0200
+++ migrate.py.new	2024-10-22 09:36:00.865790973 +0200
@@ -76,6 +89,10 @@
     except Exception as e:
         logging.error(f"Error processing {file_path}: {e}")
 
+def handler(future):
+    future.result()
+    return
+
 def process_files(start_dir, scratch_dir, ceph_pool_value):
     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
@@ -86,7 +103,6 @@
     lock = threading.Lock()
 
     with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor:
-        futures = []
         for root, dirs, files in os.walk(start_dir):
             dirs.sort()
             files.sort()
@@ -100,10 +116,7 @@
                 gid = stat_info.st_gid
 
                 future = executor.submit(process_file, file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock)
+                future.add_done_callback(handler)
-                futures.append(future)
-
-        for future in futures:
-            future.result()
 
     if os.path.exists(scratch_dir):
         shutil.rmtree(scratch_dir)

I'm still calling future.result(), but using a callback. This may not even be necessary

Feel free to add it to your gist if you wan't !

@nshah011393
Copy link

nshah011393 commented Oct 24, 2024

Hey all,

As I am getting ready to do something similar in our environment (change the pools which the cephfs data pool for a directory is on, because we need to use a different EC policy), is there a difference between using this script and method, versus doing like an rsync between the directory on the old pool and a directory on the new pool?

Also, assuming we update the file attr on the directory to make new files go to the new pool, if we use this script and some client comes in (while the script is running) and edits an existing file (so the old pool), won't there be an issue , since the script already copied the file? Or does this process assume the existing files will not be updated/touched by clients, during this migration script?

Thanks!

@ervwalter
Copy link
Author

is there a difference between using this script and method, versus doing like an rsync between the directory on the old pool and a directory on the new pool?

This method is for cases where you don't actually want to move the file to a new folder and just want to leave it where it is but change the underlying pool. If you use rsync to move the file to an entirely new folder path, then you don't need this as copying the file to the new place will make the new file follow whatever rules are on the destination folder.

does this process assume the existing files will not be updated/touched by clients, during this migration script?

This script does assume no one else is using the files while it's running. There is a race condition where bad things could happen if someone tried to edit a file at exactly the wrong time. The same problem occurs when using rsync. In my case, I am the only user of my homelab so I just shut down my services while I made the change and then started them back up and the only person I inconvenienced was me. But if you have a large business with many concurrent users, you won't have that option and probably need to schedule this for a maintenance window off hours, etc.

@nshah011393
Copy link

is there a difference between using this script and method, versus doing like an rsync between the directory on the old pool and a directory on the new pool?

This method is for cases where you don't actually want to move the file to a new folder and just want to leave it where it is but change the underlying pool. If you use rsync to move the file to an entirely new folder path, then you don't need this as copying the file to the new place will make the new file follow whatever rules are on the destination folder.

does this process assume the existing files will not be updated/touched by clients, during this migration script?

This script does assume no one else is using the files while it's running. There is a race condition where bad things could happen if someone tried to edit a file at exactly the wrong time. The same problem occurs when using rsync. In my case, I am the only user of my homelab so I just shut down my services while I made the change and then started them back up and the only person I inconvenienced was me. But if you have a large business with many concurrent users, you won't have that option and probably need to schedule this for a maintenance window off hours, etc.

Thanks for the reply! Curious, did you entertain any of the native ceph ways of copying stuff between pools? Like the rados cppool or import/export?

@ervwalter
Copy link
Author

Thanks for the reply! Curious, did you entertain any of the native ceph ways of copying stuff between pools? Like the rados cppool or import/export?

I looked for one before I made this and never found a path. The consensus in the community seemed to be "copy the files somewhere and then copy them back". I didn't want to do it by hand, and I saw someone else had made their own python script to do something like this and so I went this path as well (their script didn't work for me).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment