Skip to content

Instantly share code, notes, and snippets.

@mainframe
Forked from wido/rbd-export-import.py
Created December 6, 2016 13:25
Show Gist options
  • Save mainframe/0a9a96692e1d1eaa98f2bcf58bab27c4 to your computer and use it in GitHub Desktop.
Save mainframe/0a9a96692e1d1eaa98f2bcf58bab27c4 to your computer and use it in GitHub Desktop.
Multi-threaded RBD copy
#!/usr/bin/env python
"""
Python script to multi-threaded copy a RBD image from one cluster to another
This script requires a configuration file and a RBD image to copy.
It will copy the RBD image from the source pool to destination pool as
specified in the configuration file.
It assumes the destination image already exists and is at least the size of the
source image.
Example config (rbd-export-import.conf):
[source]
pool = rbd
mon_host = 2001:db8::100
id = admin
key = AQAI1s5XK0q0ChAA9FDEBluyt4E0OI26oGVsGQ==
[destination]
pool = rbd
mon_host = 2001:db8::200
id = admin
key = AQCuASJXa27KDhAAg3Ww25Jn/ZLmlDcwhVLNXg==
Author: Wido den Hollander <wido@42on.com>
"""
import rbd
import rados
import argparse
import logging
import sys
import threading
import time
import Queue
from ConfigParser import ConfigParser
LOGGER = logging.getLogger()
# Read in 1GB segment sizes
SEGMENT_SIZE = 1073741824
def rbd_copy_worker(i, q, source, dest, chunk_size):
    """Worker loop: pull segment dicts from *q* and copy them.

    Each queue item is a dict with 'offset' and 'length' keys describing a
    byte range to copy from *source* to *dest* (rbd.Image-like objects
    exposing read(offset, length) and write(data, offset)).  The range is
    copied in pieces of at most *chunk_size* bytes.  Runs forever, so it is
    meant to be started as a daemon thread; q.task_done() is called once per
    segment so the producer can wait with Queue.join().

    :param i: worker index, used only in log messages
    :param q: queue of {'offset': int, 'length': int} segment dicts
    :param source: image object to read from
    :param dest: image object to write to
    :param chunk_size: maximum number of bytes per read/write call
    """
    log = logging.getLogger()
    while True:
        segment = q.get()
        offset = segment['offset']
        length = segment['length']
        log.debug('Worker %d offset: %d length: %d', i, offset, length)
        done = 0
        while done < length:
            # Clamp the read to the bytes remaining in THIS segment.  The
            # original always read chunk_size bytes, overrunning into the
            # next worker's segment and past the image end on the last one.
            to_read = min(chunk_size, length - done)
            data = source.read(offset, to_read)
            nread = len(data)
            if nread == 0:
                # Short read at end of image; avoid spinning forever.
                log.warning('Worker %d got empty read at offset %d', i, offset)
                break
            dest.write(data, offset)
            offset += nread
            done += nread
        q.task_done()
def create_rados_connection(mon_host, rados_id, key):
    """Open and return a connected rados.Rados cluster handle.

    The monitor address and cephx key are injected via conf_set rather than
    read from a ceph.conf file, so no local configuration file is required.

    :param mon_host: monitor address to connect to
    :param rados_id: cephx user id to authenticate as
    :param key: secret key for that user
    :return: a connected rados.Rados object
    """
    cluster = rados.Rados(rados_id=rados_id)
    for option, value in (('mon_host', mon_host), ('key', key)):
        cluster.conf_set(option, value)
    cluster.connect()
    return cluster
def main(config, img_source, img_dest, workers, chunk_size):
    """Copy RBD image *img_source* to *img_dest* with multiple threads.

    Connects to the source and destination clusters described in *config*,
    splits the source image into SEGMENT_SIZE segments, and hands the
    segments to *workers* daemon threads which copy them in *chunk_size*
    byte pieces.  The destination image must already exist and be at least
    as large as the source.

    :param config: ConfigParser with [source] and [destination] sections
                   (pool, mon_host, id, key)
    :param img_source: name of the RBD image to read
    :param img_dest: name of the RBD image to write
    :param workers: number of copy threads to spawn
    :param chunk_size: bytes per individual read/write call
    :raises Exception: if the destination image is smaller than the source
    """
    source = create_rados_connection(config.get('source', 'mon_host'),
                                     config.get('source', 'id'),
                                     config.get('source', 'key'))
    dest = create_rados_connection(config.get('destination', 'mon_host'),
                                   config.get('destination', 'id'),
                                   config.get('destination', 'key'))
    LOGGER.info('Spawning %d workers to copy %s to %s', workers, img_source,
                img_dest)
    LOGGER.debug('Creating RADOS IoCTX')
    source_io = source.open_ioctx(config.get('source', 'pool'))
    dest_io = dest.open_ioctx(config.get('destination', 'pool'))
    LOGGER.debug('Opening RBD images')
    source_rbd = rbd.Image(source_io, img_source, read_only=True)
    dest_rbd = rbd.Image(dest_io, img_dest)
    size = source_rbd.size()
    LOGGER.info('Size of source image is %d', size)
    if dest_rbd.size() < size:
        raise Exception('Destination image is smaller than source')
    LOGGER.info('Will use %d byte chunks to read and write', chunk_size)
    LOGGER.info('Splitting up into %d sized segments', SEGMENT_SIZE)
    threads = []
    chunk_queue = Queue.Queue()
    for i in range(workers):
        worker = threading.Thread(target=rbd_copy_worker,
                                  args=(i, chunk_queue, source_rbd, dest_rbd,
                                        chunk_size,))
        threads.append(worker)
        worker.daemon = True
        worker.start()
    offset = 0
    while offset < size:
        # The final segment may be shorter than SEGMENT_SIZE.  NOTE: the
        # original computed size - (offset + SEGMENT_SIZE) here, which is
        # negative for the trailing segment; min() gives the correct length.
        length = min(SEGMENT_SIZE, size - offset)
        chunk_queue.put({'offset': offset, 'length': length})
        offset += length
    LOGGER.info('Waiting for Queue to be empty')
    chunk_queue.join()
    LOGGER.debug('Closing RBD images')
    source_rbd.close()
    dest_rbd.close()
    LOGGER.debug('Closing RADOS IoCTX')
    source_io.close()
    dest_io.close()
    LOGGER.debug('Closing RADOS connections')
    source.shutdown()
    dest.shutdown()
if __name__ == "__main__":
    # Command-line entry point: parse arguments, load the cluster config
    # and kick off the multi-threaded copy.
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    parser = argparse.ArgumentParser(description='Ceph RBD import/export')
    parser.add_argument('--config', action='store', dest='conffile',
                        default='rbd-export-import.conf',
                        help='Configuration file')
    parser.add_argument('--source-image', action='store', dest='img_source',
                        help='The source image')
    parser.add_argument('--dest-image', action='store', dest='img_dest',
                        help='The destination image')
    # argparse requires a callable for type=; the original passed the
    # string 'int', which crashes at startup with "'int' is not callable".
    parser.add_argument('--workers', action='store', dest='workers', type=int,
                        help='Number of worker threads to run', default=10)
    parser.add_argument('--chunk-size', action='store', dest='chunk_size',
                        type=int, default=262144)
    args = parser.parse_args()
    conf = ConfigParser()
    # Use a with-block so the config file handle is closed, not leaked.
    with open(args.conffile) as fp:
        conf.readfp(fp)
    main(conf, args.img_source, args.img_dest, args.workers, args.chunk_size)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment