Last active
September 3, 2024 17:57
-
-
Save ervwalter/5ff6632c930c27a1eb6b07c986d7439b to your computer and use it in GitHub Desktop.
Migrate files in cephfs to a new file layout pool recursively
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shutil | |
import logging | |
import sys | |
from concurrent.futures import ThreadPoolExecutor | |
import threading | |
import uuid | |
import xattr | |
from pathlib import Path | |
start_directory = '.'  # root of the tree to migrate (current directory)
scratch_directory = '.scratch'  # working dir for temporary copies; removed when the run finishes
max_parallel_threads = 4  # worker threads in the migration ThreadPoolExecutor
def has_ceph_pool_attr(file_path, pool_value):
    """Return True if the file's ceph layout-pool xattr contains pool_value.

    Reads the 'ceph.file.layout.pool' extended attribute via the xattr
    library and tests whether pool_value occurs in its decoded value.
    Any failure to read the attribute (inaccessible file, attribute
    absent) is treated as "not in the pool" and yields False.
    """
    try:
        raw_pool = xattr.xattr(file_path).get('ceph.file.layout.pool')
        return pool_value in raw_pool.decode('utf-8')
    except (IOError, KeyError):
        # IOError: file unreadable; KeyError: attribute does not exist.
        return False
def process_file(file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock):
    """Rewrite one file so it is re-created under the new ceph layout pool.

    The migration works by copying the file into ``scratch_dir`` and moving
    the copy back over the original, so the data is rewritten as a new file.
    Files already carrying ``ceph_pool_value`` in their layout xattr are
    skipped.  Hard-link groups are rewritten once: the first member seen is
    migrated normally and recorded in ``hard_links`` (inode -> path); every
    later member is simply re-linked to that representative.

    Args:
        file_path: Path of the file to migrate.
        scratch_dir: Directory used for the temporary copy.
        uid, gid: Ownership to restore on the rewritten file.
        ceph_pool_value: Pool name that marks a file as already migrated.
        hard_links: Shared dict mapping inode -> already-migrated path.
        lock: threading.Lock guarding ``hard_links``.

    Errors are logged and swallowed so one bad file does not stop the run.
    """
    try:
        if has_ceph_pool_attr(file_path, ceph_pool_value):
            logging.debug(f"Skipping file with specified ceph pool attribute: {file_path}")
            return
        logging.info(f"Processing file: {file_path}")

        # Replacing a file updates the parent directory's atime/mtime;
        # capture them now so they can be restored afterwards.
        parent_path = Path(file_path).parent.absolute()
        parent_stat_info = os.stat(parent_path, follow_symlinks=False)
        parent_mtime = parent_stat_info.st_mtime
        parent_atime = parent_stat_info.st_atime

        # Unique scratch name so parallel workers never collide.
        unique_suffix = uuid.uuid4().hex
        scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
        scratch_file_path = os.path.join(scratch_dir, scratch_file_name)

        stat_info = os.stat(file_path, follow_symlinks=False)
        inode = stat_info.st_ino
        nlink = stat_info.st_nlink
        # 'inode in hard_links' also catches the LAST member of a group:
        # by then the representative was rewritten, so this path's nlink
        # has already dropped back to 1.
        if nlink > 1 or inode in hard_links:
            with lock:
                if inode in hard_links:
                    # A sibling was already migrated; replace this path
                    # with a fresh hard link to the migrated file.
                    os.remove(file_path)
                    os.link(hard_links[inode], file_path)
                    logging.info(f"Hard link recreated for file: {file_path}")
                    return
                else:
                    # First member of the group: record it and fall
                    # through to the normal rewrite below.
                    # NOTE(review): the rewrite itself runs outside the
                    # lock, so a sibling thread could attempt os.link()
                    # during the brief remove/move window -- TODO confirm
                    # this race is acceptable for the workload.
                    logging.info(f"Hard link added to list for file: {file_path}")
                    hard_links[inode] = file_path
        if os.path.islink(file_path):
            # Symlinks are recreated in place (no data to migrate).
            link_target = os.readlink(file_path)
            os.unlink(file_path)
            os.symlink(link_target, file_path)
            os.lchown(file_path, uid, gid)
        else:
            # shutil.copy2 already copies metadata (it is copyfile +
            # copystat), so the original's extra copystat call was
            # redundant and has been dropped.
            shutil.copy2(file_path, scratch_file_path)
            os.remove(file_path)
            shutil.move(scratch_file_path, file_path)
            os.chown(file_path, uid, gid)

        # Restore the parent directory's timestamps.
        os.utime(parent_path, (parent_atime, parent_mtime))
    except Exception as e:
        # Best effort per file: log the failure and keep other workers going.
        logging.error(f"Error processing {file_path}: {e}")
def process_files(start_dir, scratch_dir, ceph_pool_value):
    """Walk start_dir and migrate every regular file to the new layout pool.

    Files are handed to a small thread pool; the ``hard_links`` dict and
    its lock are shared so hard-link groups are rewritten exactly once.
    The scratch directory is created up front and removed at the end.

    Args:
        start_dir: Root of the tree to migrate.
        scratch_dir: Working directory for temporary copies; it is pruned
            from the walk so scratch copies are never migrated themselves.
        ceph_pool_value: Pool name marking already-migrated files.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    if not os.path.exists(scratch_dir):
        os.makedirs(scratch_dir)
    scratch_abs = os.path.abspath(scratch_dir)
    hard_links = {}
    lock = threading.Lock()
    with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor:
        futures = []
        for root, dirs, files in os.walk(start_dir):
            # Deterministic traversal order across runs.
            dirs.sort()
            files.sort()
            # Prune the scratch directory itself from the walk.  The
            # original substring test ('scratch_dir in file_path') also
            # skipped any unrelated path that merely contained the
            # substring (e.g. './foo.scratchpad/x' for '.scratch').
            dirs[:] = [d for d in dirs
                       if os.path.abspath(os.path.join(root, d)) != scratch_abs]
            for file in files:
                file_path = os.path.join(root, file)
                stat_info = os.stat(file_path, follow_symlinks=False)
                uid = stat_info.st_uid
                gid = stat_info.st_gid
                future = executor.submit(process_file, file_path, scratch_dir,
                                         uid, gid, ceph_pool_value, hard_links, lock)
                futures.append(future)
        # Propagate any exception a worker raised.
        for future in futures:
            future.result()
    if os.path.exists(scratch_dir):
        shutil.rmtree(scratch_dir)
if __name__ == "__main__":
    # Exactly one CLI argument is required: the target ceph pool name.
    args = sys.argv[1:]
    if len(args) != 1:
        print("Usage: python script.py <ceph_pool_value>")
        sys.exit(1)
    ceph_pool_value = args[0]
    process_files(start_directory, scratch_directory, ceph_pool_value)
Hello,
Thanks for the amazing script !
I've just noticed that, with this script, the parent folders atime
and mtime
are modified.
Because I'm not sure its intended (personally, I want this operation to be as much transparent as possible to the final user), I've patched the script (see below).
I've also found 2 bugs in the "hard link" part of the script:
- The file is not removed before re-creating the hard link. It creates an error
ERROR - Error processing ./3/a: [Errno 17] File exists: './1/c' -> './3/a'
. It first needs to remove the file. - Because the script destroys one hard link at a time (possibly — I think in some specific race conditions this error doesn't occur), the last "hard link" ends up with
nlink = 1
. The script should also check whether the file is referenced in the `hard_links[]`
list.
For the second point, let's take a little example:
1/a
and2/b
are hard links, they have the same123456789
inode. For each one,nlink = 2
.- We launch the script (with only one thread). It first reads
1/a
. Becausenlink > 1
the script enter the if, and add it to the listhard_links[inode] = file_path
. 1/a
is then treated as a regular file (the file is copied to.scratch
, then this copy replaces1/a
). Because1/a
is a new file, the inode changed to987654321
.- The script now takes care of
2/b
. Nownlink = 1
(it's the only file withinode=123456789
), so we won't enter the special hard_linkif
.
I'm proposing the following patch, feel free to add it if you want !
--- /tmp/migrate.py.old 2024-08-23 14:47:47.920283815 +0200
+++ /tmp/migrate.py 2024-08-23 14:47:34.656527176 +0200
@@ -7,6 +7,7 @@
import threading
import uuid
import xattr
+from pathlib import Path
start_directory = '.' # current directory
scratch_directory = '.scratch'
@@ -31,6 +32,13 @@
logging.info(f"Processing file: {file_path}")
+ # after replacing file, parent folder atime and mtime are modified
+ # keep them to replace them
+ parent_path = Path(file_path).parent.absolute()
+ parent_stat_info = os.stat(parent_path, follow_symlinks=False)
+ parent_mtime = parent_stat_info.st_mtime
+ parent_atime = parent_stat_info.st_atime
+
# Generate a unique identifier and append it to the filename
unique_suffix = uuid.uuid4().hex
scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
@@ -40,13 +48,15 @@
inode = stat_info.st_ino
nlink = stat_info.st_nlink
- if nlink > 1:
+ if nlink > 1 or inode in hard_links:
with lock:
if inode in hard_links:
+ os.remove(file_path)
os.link(hard_links[inode], file_path)
logging.info(f"Hard link recreated for file: {file_path}")
return
else:
+ logging.info(f"Hard link added to list for file: {file_path}")
hard_links[inode] = file_path
if os.path.islink(file_path):
@@ -60,6 +70,9 @@
os.remove(file_path)
shutil.move(scratch_file_path, file_path)
os.chown(file_path, uid, gid)
+
+ # update parent atime and mtime
+ os.utime(parent_path, (parent_atime, parent_mtime))
except Exception as e:
logging.error(f"Error processing {file_path}: {e}")
For now on, this is working as intended.
Regards
I'm proposing the following patch, feel free to add it if you want !
Brilliant fixes. I incorporated them in my version. Thanks!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note, the latest version uses the xattr library, so you need to
pip install xattr
before you can use this.