Skip to content

Instantly share code, notes, and snippets.

@arnobroekhof
Last active May 28, 2018 04:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arnobroekhof/aeec55342cb395a8f721 to your computer and use it in GitHub Desktop.
Save arnobroekhof/aeec55342cb395a8f721 to your computer and use it in GitHub Desktop.
Copying files to rados
#!/usr/bin/env python
# Standard-library modules are imported unguarded: if one of them were
# missing, the handler below could not run either (it would need ``sys``).
import argparse
import hashlib
import logging
import os
import sys

try:
    # python-rados is the only non-stdlib dependency, so it is the only
    # import worth guarding.
    import rados
except ImportError:
    # The original followed this raise with a stderr write and exit(1) that
    # could never execute; the raise alone already aborts the script with a
    # non-zero status and this message.
    raise ImportError('unable to import modules')

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_argparser():
    """
    Build the command-line parser used by this script.

    The Ceph connection options take their defaults from the CEPH_POOL,
    CEPH_ID, CEPH_CONF and CEPH_KEYRING environment variables, which are
    read at the moment the parser is built.

    :return: argparse.ArgumentParser() object
    """
    p = argparse.ArgumentParser()
    p.add_argument('--directory', help='Directory to copy', required=True)
    p.add_argument('--key-prefix', help='rados object key prefix', default=None)
    p.add_argument('--ceph-pool',
                   default=os.environ.get('CEPH_POOL'),
                   help='Ceph pool to use, defaults to environment variable CEPH_POOL')
    p.add_argument('--ceph-id',
                   default=os.environ.get('CEPH_ID'),
                   help='Ceph id, defaults to environment variable CEPH_ID, note: the name must start with client.')
    p.add_argument('--ceph-conf',
                   default=os.environ.get('CEPH_CONF'),
                   help='Ceph configuration file, default to environment variable CEPH_CONF')
    p.add_argument('--ceph-keyring',
                   default=os.environ.get('CEPH_KEYRING'),
                   help='Ceph keyring file, default to environment variable CEPH_KEYRING')
    # type=int is required: without it a value given on the command line
    # stays a string and file.read(blocksize) later fails with a TypeError.
    p.add_argument('--block-size',
                   type=int,
                   default=65536,
                   help='Block size used to copy, defaults to 65536 bytes')
    return p
def copy_to_ceph(src, key, ioctx_handler, blocksize=65536, key_prefix=None):
    """
    Copy file to rados and calculate md5 on the fly and put it as xattr attribute

    The file is read in chunks of ``blocksize`` bytes; each chunk is written
    to the object at its running byte offset and fed to the MD5 digest, so
    the file is read only once. After the last chunk the hex digest is
    stored on the object as the xattr "MD5".

    :param src: path of the file
    :param key: the object name for rados
    :param ioctx_handler: rados IOctx handler
    :param blocksize: blocksize defaults to 65536
    :param key_prefix: optional string prepended to ``key`` (no separator is
        inserted); ignored when None
    :return: True if there where no errors, False otherwise (the error is logged)
    """
    if key_prefix is not None:
        key = "%s%s" % (key_prefix, key)

    digest = hashlib.md5()
    offset = 0
    try:
        # Binary mode: the payload is arbitrary bytes and must reach rados
        # and the digest unmodified. ``with`` closes the file on every path.
        with open(src, 'rb') as f:
            # iter() with a sentinel keeps calling read() until it returns
            # the empty bytes string, i.e. end of file.
            for buf in iter(lambda: f.read(blocksize), b''):
                digest.update(buf)
                ioctx_handler.write(key, buf, offset)
                offset += len(buf)
        # NOTE(review): chunks are written at explicit offsets; if an object
        # with this key already exists and is longer than the file, its tail
        # presumably survives and the MD5 xattr would not match the object.
        # Confirm whether a truncate is needed here.
        ioctx_handler.set_xattr(key, "MD5", digest.hexdigest())
    except Exception as e:
        # Deliberately broad: this is the per-file boundary and the caller
        # only needs a success flag. The message names the real failure
        # instead of always claiming the file could not be opened.
        logger.error('Error copying file %s to object %s: %s' % (src, key, e))
        return False
    return True
def get_filepaths(directory):
    """
    Collect the paths of all files below ``directory``.

    The tree is traversed with os.walk(); every file name is joined to the
    directory it was found in, so each returned path starts with
    ``directory`` as it was passed in. Directories themselves are not
    included in the result.

    :param directory: root of the tree to scan
    :return: list of file paths, in the order os.walk() reports them
    """
    return [
        os.path.join(root, filename)
        for root, _directories, files in os.walk(directory)
        for filename in files
    ]
def connect_to_ceph(ceph_conf, ceph_id, ceph_pool, ceph_keyring):
    """
    Connect to a ceph cluster and open an I/O context on a pool.

    :param ceph_conf: path of the ceph configuration file
    :param ceph_id: ceph client name; 'client.' is prepended when the name
        does not already start with it
    :param ceph_pool: name of the pool to open
    :param ceph_keyring: path of the keyring file; when empty no keyring
        override is passed to rados
    :return: the rados I/O context returned by open_ioctx() for ``ceph_pool``
    :raises ValueError: if ``ceph_id`` or ``ceph_pool`` is empty or None
    :raises RuntimeError: if the connection fails or the pool does not exist
    """
    # Both values default to environment variables that may be unset, so
    # fail with a clear message rather than a TypeError further down.
    if not ceph_id:
        raise ValueError('no ceph id given (use --ceph-id or CEPH_ID)')
    if not ceph_pool:
        raise ValueError('no ceph pool given (use --ceph-pool or CEPH_POOL)')

    # The name must *start* with 'client.'; a substring test would wrongly
    # accept an id that merely contains it.
    if not ceph_id.startswith('client.'):
        ceph_id = 'client.%s' % ceph_id

    # Only override the keyring when one was actually supplied.
    conf = dict(keyring=ceph_keyring) if ceph_keyring else dict()
    cluster = rados.Rados(conffile=ceph_conf, conf=conf, name=ceph_id)
    try:
        cluster.connect()
        logger.info("Connect to ceph cluster with id: %s " % cluster.get_fsid())
    except Exception as e:
        # Previously the error was printed and execution continued with an
        # unconnected handle; nothing after this point can work, so stop.
        raise RuntimeError('Unable to connect to ceph cluster: %s' % e)

    # pool_exists() reports its answer as a return value, so it has to be
    # checked rather than wrapped in try/except.
    if not cluster.pool_exists(ceph_pool):
        cluster.shutdown()
        raise RuntimeError('Ceph pool %s does not exist' % ceph_pool)
    return cluster.open_ioctx(ceph_pool)
def parse_args():
    """
    Build the script's argument parser and run it over the process
    command line.

    :return: the namespace produced by ArgumentParser.parse_args()
    """
    return create_argparser().parse_args()
def start_copy(files, ioctx, block_size, key_prefix=None):
    """
    Start the copy to rados

    Each path is passed to copy_to_ceph() as both the source file and the
    object key. A failed copy is logged and the loop continues with the
    next file.

    :param files: A sequence of file paths to be copied
    :param ioctx: The rados IOCtx Handler
    :param block_size: The block size for copying.
    :param key_prefix: optional prefix handed through to copy_to_ceph()
    :return: None
    """
    count = len(files)
    count_done = 0
    logger.info('Start copying: %s files' % count)
    for f in files:
        if copy_to_ceph(f, f, ioctx, block_size, key_prefix):
            # Increment before logging so the first success reads "1 of N"
            # instead of "0 of N".
            count_done += 1
            logger.info('Status copying: %s of %s' % (count_done, count))
        else:
            logger.error('Error copying file %s' % f)
    # Report how many files were actually copied, not just the total.
    logger.info('Finished copying %s of %s files' % (count_done, count))
def main():
    """
    Script entry point: parse the command line, connect to ceph, collect
    the files below --directory and copy them into the pool.
    """
    args = parse_args()
    ioctx = connect_to_ceph(args.ceph_conf, args.ceph_id, args.ceph_pool, args.ceph_keyring)
    try:
        files = get_filepaths(args.directory)
        start_copy(files, ioctx, args.block_size, args.key_prefix)
    finally:
        # Release the I/O context even if the copy aborts with an exception.
        ioctx.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment