Skip to content

Instantly share code, notes, and snippets.

@wido
Created September 6, 2016 18:46
Show Gist options
  • Save wido/4accce6623381f23079479f5ac06bae0 to your computer and use it in GitHub Desktop.
Save wido/4accce6623381f23079479f5ac06bae0 to your computer and use it in GitHub Desktop.
Multi threaded RBD copy
#!/usr/bin/env python
"""
Python script to multi-threaded copy a RBD image from one cluster to another
This script requires a configuration file and a RBD image to copy.
It will copy the RBD image from the source pool to destination pool as
specified in the configuration file.
It assumes the destination image already exists and is at least the size of the
source image.
Example config (rbd-export-import.conf):
[source]
pool = rbd
mon_host = 2001:db8::100
id = admin
key = AQAI1s5XK0q0ChAA9FDEBluyt4E0OI26oGVsGQ==
[destination]
pool = rbd
mon_host = 2001:db8::200
id = admin
key = AQCuASJXa27KDhAAg3Ww25Jn/ZLmlDcwhVLNXg==
Author: Wido den Hollander <wido@42on.com>
"""
import rbd
import rados
import argparse
import logging
import sys
import threading
import time
import Queue
from ConfigParser import ConfigParser
LOGGER = logging.getLogger()
# Read in 1GB segment sizes
SEGMENT_SIZE = 1073741824
def rbd_copy_worker(i, q, source, dest, chunk_size):
    """Copy queued byte ranges from the source RBD image to the destination.

    Runs forever as a daemon thread: pulls {'offset': int, 'length': int}
    dicts from *q*, copies that byte range from *source* to *dest* in
    reads/writes of at most *chunk_size* bytes, then marks the item done.

    :param i: Worker index, used only in log messages.
    :param q: Queue.Queue of segment dicts produced by main().
    :param source: Opened (read-only) rbd.Image to read from.
    :param dest: Opened rbd.Image to write to.
    :param chunk_size: Maximum number of bytes per read/write call.
    """
    while True:
        chunk = q.get()
        offset = chunk['offset']
        length = chunk['length']
        LOGGER.debug('Worker %d offset: %d length: %d', i, offset, length)
        done = 0
        while done < length:
            # Never read beyond this segment: the final read may have to be
            # smaller than chunk_size. (Previously a full chunk_size was
            # always read, crossing into the neighbouring segment -- copied
            # twice by concurrent workers -- and potentially reading past
            # the end of the image on the last segment.)
            data = source.read(offset, min(chunk_size, length - done))
            read = len(data)
            dest.write(data, offset)
            offset += read
            done += read
        q.task_done()
def create_rados_connection(mon_host, rados_id, key):
    """Open and return a connected rados.Rados handle.

    :param mon_host: Monitor address(es) for the 'mon_host' option.
    :param rados_id: Ceph client id (e.g. 'admin') to authenticate as.
    :param key: Cephx secret key for that client.
    :return: A connected rados.Rados object; the caller must shutdown() it.
    """
    connection = rados.Rados(rados_id=rados_id)
    for option, value in (('mon_host', mon_host), ('key', key)):
        connection.conf_set(option, value)
    connection.connect()
    return connection
def main(config, img_source, img_dest, workers, chunk_size):
    """Copy an RBD image between two Ceph clusters using worker threads.

    Connects to the source and destination clusters configured in *config*,
    splits the source image into SEGMENT_SIZE byte segments, queues them,
    and lets *workers* daemon threads copy the segments in parallel. The
    destination image must already exist and be at least as large as the
    source image.

    :param config: ConfigParser with [source] and [destination] sections,
        each providing mon_host, id, key and pool options.
    :param img_source: Name of the RBD image to copy from.
    :param img_dest: Name of the (pre-created) RBD image to copy to.
    :param workers: Number of copy threads to spawn.
    :param chunk_size: Bytes per read/write call inside the workers.
    :raises Exception: If the destination image is smaller than the source.
    """
    source = create_rados_connection(config.get('source', 'mon_host'),
                                     config.get('source', 'id'),
                                     config.get('source', 'key'))
    dest = create_rados_connection(config.get('destination', 'mon_host'),
                                   config.get('destination', 'id'),
                                   config.get('destination', 'key'))
    LOGGER.info('Spawning %d workers to copy %s to %s', workers, img_source,
                img_dest)
    LOGGER.debug('Creating RADOS IoCTX')
    source_io = source.open_ioctx(config.get('source', 'pool'))
    dest_io = dest.open_ioctx(config.get('destination', 'pool'))
    LOGGER.debug('Opening RBD images')
    source_rbd = rbd.Image(source_io, img_source, read_only=True)
    dest_rbd = rbd.Image(dest_io, img_dest)
    size = source_rbd.size()
    LOGGER.info('Size of source image is %d', size)
    if dest_rbd.size() < size:
        raise Exception('Destination image is smaller than source')
    LOGGER.info('Will use %d byte chunks to read and write', chunk_size)
    LOGGER.info('Splitting up into %d sized segments', SEGMENT_SIZE)
    threads = []
    chunk_queue = Queue.Queue()
    for i in range(workers):
        worker = threading.Thread(target=rbd_copy_worker,
                                  args=(i, chunk_queue, source_rbd, dest_rbd,
                                        chunk_size,))
        threads.append(worker)
        # Daemon threads: they block forever on q.get(), so they must not
        # keep the process alive after the queue has been drained.
        worker.daemon = True
        worker.start()
    offset = 0
    while offset < size:
        # BUG FIX: the tail segment length was previously computed as
        # size - (offset + SEGMENT_SIZE), which is NEGATIVE whenever the
        # image size is not an exact multiple of SEGMENT_SIZE. The length
        # of a segment is simply the bytes remaining, capped at
        # SEGMENT_SIZE.
        length = min(SEGMENT_SIZE, size - offset)
        chunk = {'offset': offset, 'length': length}
        chunk_queue.put(chunk)
        offset += length
    LOGGER.info('Waiting for Queue to be empty')
    chunk_queue.join()
    LOGGER.debug('Closing RBD images')
    source_rbd.close()
    dest_rbd.close()
    LOGGER.debug('Closing RADOS IoCTX')
    source_io.close()
    dest_io.close()
    LOGGER.debug('Closing RADOS connections')
    source.shutdown()
    dest.shutdown()
if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    parser = argparse.ArgumentParser(description='Ceph RBD import/export')
    parser.add_argument('--config', action='store', dest='conffile',
                        default='rbd-export-import.conf',
                        help='Configuration file')
    parser.add_argument('--source-image', action='store', dest='img_source',
                        help='The source image')
    parser.add_argument('--dest-image', action='store', dest='img_dest',
                        help='The destination image')
    # BUG FIX: argparse requires a callable for 'type'; the string 'int'
    # (an optparse-ism) makes parsing of these options fail.
    parser.add_argument('--workers', action='store', dest='workers', type=int,
                        help='Number of worker threads to run', default=10)
    parser.add_argument('--chunk-size', action='store', dest='chunk_size',
                        type=int, default=262144,
                        help='Bytes per read/write call (default: 262144)')
    args = parser.parse_args()
    conf = ConfigParser()
    # Close the config file handle instead of leaking it.
    with open(args.conffile) as conffp:
        conf.readfp(conffp)
    main(conf, args.img_source, args.img_dest, args.workers, args.chunk_size)
@ImageWQ
Copy link

ImageWQ commented Apr 29, 2022

Your code is well worth learning, but it runs very slowly: where `rbd export` and `rbd import` need 200s, your method needs 700s. How should I adjust it? — from a CEPH enthusiast

@wido
Copy link
Author

wido commented Apr 29, 2022

Your code is well worth learning, but it runs very slowly. When rbd export and import needs 200s, your method needs 700s. How should I adjust it?——from CEPH enthusiasts

I have never tested the performance. This was a PoC for me. Maybe you can tune the SEGMENT_SIZE

@ImageWQ
Copy link

ImageWQ commented Apr 29, 2022

您的代码非常值得学习,但运行速度非常慢。当rbd导出导入需要200s时,你的方法需要700s。应该如何调整?——来自CEPH爱好者

我从未测试过性能。这对我来说是一个 PoC。也许你可以调整 SEGMENT_SIZE

Thank you very much for this reply. I will continue to study export and import.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment