-
-
Save ervwalter/5ff6632c930c27a1eb6b07c986d7439b to your computer and use it in GitHub Desktop.
import logging
import os
import shutil
import sys
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import xattr
# Root of the directory tree that gets walked; '.' is the current directory.
start_directory = '.'
# Relative path of the staging directory that receives the temporary copies.
scratch_directory = '.scratch'
# Number of worker threads processing files concurrently.
max_parallel_threads = 4
def has_ceph_pool_attr(file_path, pool_value):
    """Return True if the file's ``ceph.file.layout.pool`` xattr equals *pool_value*.

    Args:
        file_path: Path of the file whose extended attributes are inspected.
        pool_value: Pool name to compare the attribute against.

    Returns:
        True only when the attribute could be read and matches *pool_value*.
        False when the file is inaccessible, the attribute is missing, or its
        value is not valid UTF-8 (such a value cannot equal *pool_value*).
    """
    try:
        raw_pool = xattr.xattr(file_path).get('ceph.file.layout.pool')
    except (OSError, KeyError):
        # OSError (alias of IOError) for inaccessible files, KeyError if the
        # attribute does not exist.
        return False

    # NOTE(review): depending on the xattr library version, a missing
    # attribute may presumably come back as None instead of raising; treat
    # that as "attribute not set" rather than crashing on .decode().
    if raw_pool is None:
        return False

    if isinstance(raw_pool, bytes):
        try:
            raw_pool = raw_pool.decode('utf-8')
        except UnicodeDecodeError:
            return False

    return raw_pool == pool_value
def _replace_with_fresh_copy(file_path, scratch_file_path, uid, gid):
    """Replace *file_path* with a newly created copy of itself, staged in scratch."""
    if os.path.islink(file_path):
        # Symbolic links are simply recreated in place with the same target.
        link_target = os.readlink(file_path)
        os.unlink(file_path)
        os.symlink(link_target, file_path)
        os.lchown(file_path, uid, gid)
        return

    try:
        shutil.copy2(file_path, scratch_file_path)
        # Set the owner on the staged copy BEFORE copying the mode back:
        # chown clears the setuid/setgid bits on Linux, so the stat copy has
        # to come last for those bits to survive.
        os.chown(scratch_file_path, uid, gid)
        shutil.copystat(file_path, scratch_file_path)
    except Exception:
        # The original is still untouched here, so a partial staged copy is
        # disposable.
        if os.path.lexists(scratch_file_path):
            os.remove(scratch_file_path)
        raise

    try:
        # Rename over the original so the path never disappears and there is
        # no moment at which the only copy of the data lives in scratch.
        os.replace(scratch_file_path, file_path)
    except OSError:
        # Typically the scratch directory is on another filesystem. Fall back
        # to remove-then-move; shutil.move then copies, so owner and mode
        # bits have to be applied again on the final file.
        mode_bits = os.stat(scratch_file_path).st_mode & 0o7777
        os.remove(file_path)
        shutil.move(scratch_file_path, file_path)
        os.chown(file_path, uid, gid)
        os.chmod(file_path, mode_bits)


def _restore_hard_link(primary_path, file_path, scratch_file_path):
    """Turn *file_path* into a hard link to *primary_path* without deleting it first."""
    try:
        # Create the link under a scratch name and rename it over the old
        # entry, so a failing link() cannot leave the name missing.
        os.link(primary_path, scratch_file_path)
        os.replace(scratch_file_path, file_path)
    except OSError:
        # E.g. scratch on another filesystem: remove, then link in place.
        os.remove(file_path)
        os.link(primary_path, file_path)
    finally:
        # A failed replace, or a rename between two names of the same inode
        # (a no-op for rename), leaves the temporary link behind.
        if os.path.lexists(scratch_file_path):
            os.remove(scratch_file_path)


def process_file(file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock):
    """Rewrite one file so that it is re-created, preserving metadata.

    Files whose ``ceph.file.layout.pool`` attribute already equals
    *ceph_pool_value* are skipped. Otherwise the file is copied to a uniquely
    named file in *scratch_dir* and swapped over the original; symbolic links
    are recreated; additional names of an already processed hard-linked inode
    are re-linked to the first processed name.

    Args:
        file_path: Path of the file to process.
        scratch_dir: Directory that receives the temporary copy.
        uid: Owner user id to apply to the replacement.
        gid: Owner group id to apply to the replacement.
        ceph_pool_value: Pool name; files already reporting it are skipped.
        hard_links: Dict shared between workers mapping an original inode
            number to the path that was processed first for that inode.
        lock: Lock guarding *hard_links* and the hard-link handling.

    Returns:
        None. Any exception is logged and swallowed so that one failing file
        does not stop the other workers.
    """
    try:
        if has_ceph_pool_attr(file_path, ceph_pool_value):
            logging.debug(f"Skipping file with specified ceph pool attribute: {file_path}")
            return

        logging.info(f"Processing file: {file_path}")

        # Replacing a directory entry updates the parent's atime/mtime.
        # Remember them (following symlinks, like os.utime below does) so
        # they can be put back afterwards.
        parent_path = Path(file_path).parent.absolute()
        parent_stat_info = os.stat(parent_path)

        # A unique suffix keeps equally named files from different
        # directories from overwriting each other in the scratch directory.
        unique_suffix = uuid.uuid4().hex
        scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
        if len(os.fsencode(scratch_file_name)) > 255:
            # The suffix would push a long name past the usual 255-byte
            # file-name limit; the unique id alone is sufficient.
            scratch_file_name = unique_suffix
        scratch_file_path = os.path.join(scratch_dir, scratch_file_name)

        try:
            stat_info = os.stat(file_path, follow_symlinks=False)
            inode = stat_info.st_ino

            # The last remaining name of a processed inode reports
            # st_nlink == 1, so the lookup in hard_links is needed as well.
            if stat_info.st_nlink > 1 or inode in hard_links:
                # The lock is held for the whole operation: the first name is
                # only registered once its replacement exists, so no other
                # worker can re-link against the not-yet-replaced inode.
                with lock:
                    primary_path = hard_links.get(inode)
                    if primary_path is None:
                        _replace_with_fresh_copy(file_path, scratch_file_path, uid, gid)
                        hard_links[inode] = file_path
                        logging.info(f"Hard link added to list for file: {file_path}")
                    else:
                        _restore_hard_link(primary_path, file_path, scratch_file_path)
                        logging.info(f"Hard link recreated for file: {file_path}")
            else:
                _replace_with_fresh_copy(file_path, scratch_file_path, uid, gid)
        finally:
            # Restore the parent timestamps on every path (copy, symlink,
            # hard link, failure) with full nanosecond precision.
            os.utime(
                parent_path,
                ns=(parent_stat_info.st_atime_ns, parent_stat_info.st_mtime_ns),
            )
    except Exception as e:
        logging.error(f"Error processing {file_path}: {e}")
def handler(future):
    """Done-callback for a submitted task: ask the future for its result.

    The returned value is discarded; the call is made so that an exception
    stored on the future is raised here instead of staying unnoticed.
    """
    _ = future.result()
def process_files(start_dir, scratch_dir, ceph_pool_value):
    """Walk *start_dir* and hand every file to ``process_file`` on a thread pool.

    Args:
        start_dir: Root of the directory tree to walk.
        scratch_dir: Staging directory for temporary copies; created if
            missing and never walked itself.
        ceph_pool_value: Pool name forwarded to ``process_file``.

    Returns:
        None. The function returns after all submitted tasks have finished.
        The scratch directory is removed afterwards only if it is empty.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    os.makedirs(scratch_dir, exist_ok=True)

    # Compare resolved paths instead of testing for a substring: a substring
    # test would also skip unrelated paths that merely contain the scratch
    # directory's name.
    scratch_real = os.path.realpath(scratch_dir)
    start_real = os.path.realpath(start_dir)
    start_in_scratch = (
        start_real == scratch_real
        or start_real.startswith(scratch_real + os.sep)
    )
    tree = () if start_in_scratch else os.walk(start_dir)

    hard_links = {}
    lock = threading.Lock()

    # The executor's work queue is unbounded and walking is much faster than
    # processing, so cap the number of submitted-but-unfinished tasks to keep
    # memory flat on trees with millions of files.
    pending_slots = threading.BoundedSemaphore(max_parallel_threads * 32)

    def release_slot(_finished):
        pending_slots.release()

    with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor:
        for root, dirs, files in tree:
            # In-place assignment prunes the scratch directory from the walk
            # and keeps the traversal order deterministic.
            dirs[:] = sorted(
                d for d in dirs
                if os.path.realpath(os.path.join(root, d)) != scratch_real
            )
            files.sort()

            for file in files:
                file_path = os.path.join(root, file)
                try:
                    stat_info = os.stat(file_path, follow_symlinks=False)
                except OSError as e:
                    # A file that vanished or cannot be stat'ed must not
                    # abort the whole walk.
                    logging.error(f"Error processing {file_path}: {e}")
                    continue

                pending_slots.acquire()
                try:
                    future = executor.submit(
                        process_file,
                        file_path,
                        scratch_dir,
                        stat_info.st_uid,
                        stat_info.st_gid,
                        ceph_pool_value,
                        hard_links,
                        lock,
                    )
                except BaseException:
                    pending_slots.release()
                    raise
                future.add_done_callback(handler)
                future.add_done_callback(release_slot)

    # Leaving the with-block waits for every task. Only remove the scratch
    # directory when it is empty: anything still in it may be the sole
    # remaining copy of a file whose swap failed half-way.
    try:
        os.rmdir(scratch_dir)
    except FileNotFoundError:
        pass
    except OSError:
        logging.warning(
            f"Scratch directory {scratch_dir} is not empty and was left in place; "
            "check it for files whose replacement did not complete"
        )
if __name__ == "__main__":
    # Exactly one command-line argument is expected: the ceph pool value.
    if len(sys.argv) == 2:
        ceph_pool_value = sys.argv[1]
        process_files(start_directory, scratch_directory, ceph_pool_value)
    else:
        print("Usage: python script.py <ceph_pool_value>")
        sys.exit(1)
Hello,
Thanks for the amazing script !
I've just noticed that, with this script, the parent folders' atime and mtime are modified.
Because I'm not sure it's intended (personally, I want this operation to be as transparent as possible to the final user), I've patched the script (see below).
I've also found 2 bugs in the "hard link" part of the script:
- The file is not removed before re-creating the hard link, which causes an error: `ERROR - Error processing ./3/a: [Errno 17] File exists: './1/c' -> './3/a'`. The file needs to be removed first.
- Because the script destroys one hard link at a time (I think that in some specific race conditions this error doesn't occur), the last "hard link" ends up with `nlink = 1`. The script should also check whether the file's inode is referenced in the `hard_links[]` list.
For the second point, let's take a little example:
- `1/a` and `2/b` are hard links; they have the same inode, `123456789`. For each one, `nlink = 2`.
- We launch the script (with only one thread). It first reads `1/a`. Because `nlink > 1`, the script enters the `if` and adds it to the list: `hard_links[inode] = file_path`.
- `1/a` is then treated as a regular file (the file is copied to `.scratch`, then this copy replaces `1/a`). Because `1/a` is now a new file, its inode changes to `987654321`.
- The script now takes care of `2/b`. Now `nlink = 1` (it's the only file with `inode=123456789`), so we won't enter the special hard-link `if`.
I'm proposing the following patch, feel free to add it if you want !
--- /tmp/migrate.py.old 2024-08-23 14:47:47.920283815 +0200
+++ /tmp/migrate.py 2024-08-23 14:47:34.656527176 +0200
@@ -7,6 +7,7 @@
import threading
import uuid
import xattr
+from pathlib import Path
start_directory = '.' # current directory
scratch_directory = '.scratch'
@@ -31,6 +32,13 @@
logging.info(f"Processing file: {file_path}")
+ # after replacing file, parent folder atime and mtime are modified
+ # keep them to replace them
+ parent_path = Path(file_path).parent.absolute()
+ parent_stat_info = os.stat(parent_path, follow_symlinks=False)
+ parent_mtime = parent_stat_info.st_mtime
+ parent_atime = parent_stat_info.st_atime
+
# Generate a unique identifier and append it to the filename
unique_suffix = uuid.uuid4().hex
scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
@@ -40,13 +48,15 @@
inode = stat_info.st_ino
nlink = stat_info.st_nlink
- if nlink > 1:
+ if nlink > 1 or inode in hard_links:
with lock:
if inode in hard_links:
+ os.remove(file_path)
os.link(hard_links[inode], file_path)
logging.info(f"Hard link recreated for file: {file_path}")
return
else:
+ logging.info(f"Hard link added to list for file: {file_path}")
hard_links[inode] = file_path
if os.path.islink(file_path):
@@ -60,6 +70,9 @@
os.remove(file_path)
shutil.move(scratch_file_path, file_path)
os.chown(file_path, uid, gid)
+
+ # update parent atime and mtime
+ os.utime(parent_path, (parent_atime, parent_mtime))
except Exception as e:
logging.error(f"Error processing {file_path}: {e}")
So far, this is working as intended.
Regards
I'm proposing the following patch, feel free to add it if you want !
Brilliant fixes. I incorporated them in my version. Thanks!
Migrated 160TB of data between two pools. Took the better part of 4 days on my setup, but ran without a hitch. Thanks!
Hi,
After using this script on big folders (200-300TB), I saw that the memory consumption was really high and could trigger the OOM killer (I'm talking about easily 40-50GB of RAM usage, or even more).
After digging into this, I found out that the `futures = []` list was really big (its final size will be the total number of files × the size of a `Future` object). Near the end of a run on a big folder, we are talking about millions and millions of objects stored in this list.
When using the `with` syntax (I've tested this to make sure it was true), you won't exit the `with` block before every thread has finished its work. It means you don't really need to store all the `future` objects and call `future.result()` on them to make sure all the files have been processed.
If you have the same memory issue, you should try using the following patch:
--- migrate.py.old 2024-10-22 09:33:06.920808260 +0200
+++ migrate.py.new 2024-10-22 09:36:00.865790973 +0200
@@ -76,6 +89,10 @@
except Exception as e:
logging.error(f"Error processing {file_path}: {e}")
+def handler(future):
+ future.result()
+ return
+
def process_files(start_dir, scratch_dir, ceph_pool_value):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -86,7 +103,6 @@
lock = threading.Lock()
with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor:
- futures = []
for root, dirs, files in os.walk(start_dir):
dirs.sort()
files.sort()
@@ -100,10 +116,7 @@
gid = stat_info.st_gid
future = executor.submit(process_file, file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock)
+ future.add_done_callback(handler)
- futures.append(future)
-
- for future in futures:
- future.result()
if os.path.exists(scratch_dir):
shutil.rmtree(scratch_dir)
I'm still calling `future.result()`, but using a callback. This may not even be necessary.
Feel free to add it to your gist if you want!
Hey all,
As I am getting ready to do something similar in our environment (changing the pool that a directory's CephFS data is stored on, because we need to use a different EC policy), is there a difference between using this script and method versus doing something like an rsync between the directory on the old pool and a directory on the new pool?
Also, assuming we update the file attribute on the directory to make new files go to the new pool: if we use this script and some client comes in (while the script is running) and edits an existing file (so one still on the old pool), won't there be an issue, since the script has already copied the file? Or does this process assume the existing files will not be updated/touched by clients during the migration?
Thanks!
is there a difference between using this script and method, versus doing like an rsync between the directory on the old pool and a directory on the new pool?
This method is for cases where you don't actually want to move the file to a new folder and just want to leave it where it is but change the underlying pool. If you use rsync to move the file to an entirely new folder path, then you don't need this as copying the file to the new place will make the new file follow whatever rules are on the destination folder.
does this process assume the existing files will not be updated/touched by clients, during this migration script?
This script does assume no one else is using the files while it's running. There is a race condition where bad things could happen if someone tried to edit a file at exactly the wrong time. The same problem occurs when using rsync. In my case, I am the only user of my homelab so I just shut down my services while I made the change and then started them back up and the only person I inconvenienced was me. But if you have a large business with many concurrent users, you won't have that option and probably need to schedule this for a maintenance window off hours, etc.
is there a difference between using this script and method, versus doing like an rsync between the directory on the old pool and a directory on the new pool?
This method is for cases where you don't actually want to move the file to a new folder and just want to leave it where it is but change the underlying pool. If you use rsync to move the file to an entirely new folder path, then you don't need this as copying the file to the new place will make the new file follow whatever rules are on the destination folder.
does this process assume the existing files will not be updated/touched by clients, during this migration script?
This script does assume no one else is using the files while it's running. There is a race condition where bad things could happen if someone tried to edit a file at exactly the wrong time. The same problem occurs when using rsync. In my case, I am the only user of my homelab so I just shut down my services while I made the change and then started them back up and the only person I inconvenienced was me. But if you have a large business with many concurrent users, you won't have that option and probably need to schedule this for a maintenance window off hours, etc.
Thanks for the reply! Curious, did you entertain any of the native ceph ways of copying stuff between pools? Like the rados cppool or import/export?
Thanks for the reply! Curious, did you entertain any of the native ceph ways of copying stuff between pools? Like the rados cppool or import/export?
I looked for one before I made this and never found a path. The consensus in the community seemed to be "copy the files somewhere and then copy them back". I didn't want to do it by hand, and I saw someone else had made their own python script to do something like this and so I went this path as well (their script didn't work for me).
Note, the latest version uses the xattr library, so you need to
pip install xattr
before you can use this.